use crate::error::ExecutorError;
use crate::item::ExecutableItem;
use crate::trigger::{TriggerDecl, TriggerDeclarer};
/// Opaque handle to a vertex that was registered with a [`GraphBuilder`].
///
/// The wrapped value is the vertex's position in the builder's item list, so
/// handles are cheap to copy, compare and hash.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct Vertex(pub(crate) usize);
/// A validated, executable DAG of items together with the per-run scheduling
/// state used to execute it on a worker pool.
///
/// The first group of fields (`items` .. `decls`) is fixed when the graph is
/// built; the remaining fields are reset at the start of every run and mutated
/// concurrently by the per-vertex jobs.
#[allow(clippy::redundant_pub_crate)]
pub(crate) struct Graph {
/// Vertex bodies; a vertex index is a position in this list.
pub(crate) items: Vec<Box<dyn ExecutableItem>>,
// `successors[u]` lists the vertices with an edge from `u` (one entry per edge).
// `in_degree[v]` is the number of edges ending at `v`.
// `root` is the index of the vertex the builder designated as root.
pub(crate) successors: Vec<Vec<usize>>, pub(crate) in_degree: Vec<usize>, pub(crate) root: usize,
/// Trigger declarations collected from the root item when the graph was built.
pub(crate) decls: Vec<TriggerDecl>,
/// Raw pointer to each entry of `items`, dereferenced by the vertex jobs.
vertex_ptrs: Vec<VertexPtr>,
/// Per-vertex count of predecessors that have not completed in the current
/// run; `usize::MAX` marks a vertex that has been cancelled.
counters: Vec<AtomicUsize>,
/// Number of vertices that have not yet run, been skipped or been cancelled
/// in the current run; the run is over when this reaches zero.
pending: AtomicUsize,
/// Set once any vertex returns `StopChain` or an error; later vertices are
/// skipped or cancelled instead of executed.
stop_flag: AtomicBool,
/// Set when a vertex returned `StopChain` during the current run.
stop_chain_seen: AtomicBool,
/// First item error recorded during the current run, if any.
first_err: Mutex<Option<crate::error::ItemError>>,
/// Mutex/condvar pair on which the running thread waits for `pending` to hit zero.
done_cv: (Mutex<()>, Condvar),
/// Vertices whose predecessors have all completed and that await dispatch.
ready_ring: crate::ready_ring::ReadyRing,
/// One reusable job closure per vertex; empty until `prepare_dispatch` runs.
vertex_jobs: Vec<Box<dyn FnMut() + Send + 'static>>,
}
impl core::fmt::Debug for Graph {
    /// Prints the graph's topology only: the number of items, the adjacency
    /// lists, the in-degrees and the root index. The items themselves and all
    /// run-time scheduling state are elided (non-exhaustive output).
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let Self {
            items,
            successors,
            in_degree,
            root,
            ..
        } = self;
        let mut out = f.debug_struct("Graph");
        out.field("n_items", &items.len());
        out.field("successors", successors);
        out.field("in_degree", in_degree);
        out.field("root", root);
        out.finish_non_exhaustive()
    }
}
impl Graph {
    /// Returns the task id reported by the root vertex's item, or `None` when
    /// that item does not provide one.
    pub(crate) fn root_task_id(&self) -> Option<&str> {
        let Self { items, root, .. } = self;
        items[*root].task_id()
    }
}
/// Incrementally collects the vertices, edges and root of a graph before it is
/// validated and turned into an executable `Graph`.
pub struct GraphBuilder {
/// Vertex bodies in insertion order; a `Vertex` handle indexes into this list.
items: Vec<Box<dyn ExecutableItem>>,
/// Directed edges as `(from, to)` vertex indices, in the order they were added.
edges: Vec<(usize, usize)>,
/// Index of the designated root vertex, or `None` until `root` is called.
root: Option<usize>,
}
impl GraphBuilder {
    /// Creates an empty builder: no vertices, no edges and no root.
    pub(crate) fn new() -> Self {
        Self {
            items: Vec::new(),
            edges: Vec::new(),
            root: None,
        }
    }

    /// Registers `item` as a new vertex and returns its handle.
    ///
    /// Handles are dense indices handed out in insertion order.
    pub fn vertex<I: ExecutableItem>(&mut self, item: I) -> Vertex {
        let idx = self.items.len();
        self.items.push(Box::new(item));
        Vertex(idx)
    }

    /// Records a directed edge `from -> to`.
    ///
    /// Nothing is validated here; out-of-range handles, self-loops and cycles
    /// are reported by `finish`. Adding the same edge twice records it twice.
    pub fn edge(&mut self, from: Vertex, to: Vertex) -> &mut Self {
        self.edges.push((from.0, to.0));
        self
    }

    /// Designates `v` as the root vertex, replacing any earlier choice.
    pub const fn root(&mut self, v: Vertex) -> &mut Self {
        self.root = Some(v.0);
        self
    }

    /// Validates the collected topology and builds the executable [`Graph`].
    ///
    /// # Errors
    ///
    /// Returns `ExecutorError::InvalidGraph` — checked in this order — when
    /// the graph has no vertices, no root was set, the root index is out of
    /// bounds, an edge references an unknown vertex, an edge is a self-loop,
    /// the edges form a cycle, or some vertex cannot be reached from the root.
    /// An error from the root item's `declare_triggers` is propagated as well.
    pub(crate) fn finish(mut self) -> Result<Graph, ExecutorError> {
        let n = self.items.len();
        if n == 0 {
            return Err(ExecutorError::InvalidGraph("graph has no vertices".into()));
        }
        let root = self
            .root
            .ok_or_else(|| ExecutorError::InvalidGraph("no root vertex set".into()))?;
        if root >= n {
            return Err(ExecutorError::InvalidGraph(
                "root index out of bounds".into(),
            ));
        }

        let (successors, in_degree) = Self::adjacency(n, &self.edges)?;
        if !Self::is_acyclic(&successors, &in_degree) {
            return Err(ExecutorError::InvalidGraph("graph contains a cycle".into()));
        }
        if !Self::all_reachable(&successors, root) {
            return Err(ExecutorError::InvalidGraph(
                "every vertex must be reachable from the root".into(),
            ));
        }

        // Only the root's trigger declarations are kept.
        let mut decl = TriggerDeclarer::new_internal();
        self.items[root].declare_triggers(&mut decl)?;
        let decls = decl.into_decls();

        // Every other vertex is still asked to declare, but into a throw-away
        // declarer: its result is deliberately ignored and anything it
        // declared only produces a warning.
        for (i, body) in self.items.iter_mut().enumerate() {
            if i == root {
                continue;
            }
            let mut spurious = TriggerDeclarer::new_internal();
            let _ = body.declare_triggers(&mut spurious);
            if !spurious.is_empty() {
                #[cfg(feature = "tracing")]
                tracing::warn!(target: "taktora-executor", vertex = i,
                    "non-root graph vertex declared triggers; ignored");
            }
        }

        let mut items = self.items;
        // The pointers target the boxed items, not the `Vec` buffer, so they
        // stay valid when `items` is moved into the graph below.
        let vertex_ptrs: Vec<VertexPtr> = items
            .iter_mut()
            .map(|b| VertexPtr(std::ptr::from_mut(b.as_mut())))
            .collect();
        let counters: Vec<AtomicUsize> = in_degree.iter().map(|d| AtomicUsize::new(*d)).collect();

        Ok(Graph {
            items,
            successors,
            in_degree,
            root,
            decls,
            vertex_ptrs,
            counters,
            pending: AtomicUsize::new(n),
            stop_flag: AtomicBool::new(false),
            stop_chain_seen: AtomicBool::new(false),
            first_err: Mutex::new(None),
            done_cv: (Mutex::new(()), Condvar::new()),
            ready_ring: crate::ready_ring::ReadyRing::new(n),
            vertex_jobs: Vec::new(),
        })
    }

    /// Builds the successor lists and in-degree table for `n` vertices,
    /// rejecting edges with an out-of-range endpoint and self-loops.
    fn adjacency(
        n: usize,
        edges: &[(usize, usize)],
    ) -> Result<(Vec<Vec<usize>>, Vec<usize>), ExecutorError> {
        let mut successors = vec![Vec::<usize>::new(); n];
        let mut in_degree = vec![0_usize; n];
        for &(from, to) in edges {
            if from >= n || to >= n {
                return Err(ExecutorError::InvalidGraph(
                    "edge index out of bounds".into(),
                ));
            }
            if from == to {
                return Err(ExecutorError::InvalidGraph(
                    "self-loops are not allowed".into(),
                ));
            }
            successors[from].push(to);
            in_degree[to] += 1;
        }
        Ok((successors, in_degree))
    }

    /// Kahn's algorithm: the graph is acyclic exactly when repeatedly removing
    /// vertices without remaining predecessors eventually removes all of them.
    fn is_acyclic(successors: &[Vec<usize>], in_degree: &[usize]) -> bool {
        let mut remaining = in_degree.to_vec();
        let mut ready: Vec<usize> = remaining
            .iter()
            .enumerate()
            .filter_map(|(i, d)| (*d == 0).then_some(i))
            .collect();
        let mut visited = 0_usize;
        while let Some(u) = ready.pop() {
            visited += 1;
            for &v in &successors[u] {
                remaining[v] -= 1;
                if remaining[v] == 0 {
                    ready.push(v);
                }
            }
        }
        visited == successors.len()
    }

    /// Depth-first walk from `root`; true when it visits every vertex.
    fn all_reachable(successors: &[Vec<usize>], root: usize) -> bool {
        let mut reach = vec![false; successors.len()];
        let mut stack = vec![root];
        while let Some(u) = stack.pop() {
            if reach[u] {
                continue;
            }
            reach[u] = true;
            stack.extend(successors[u].iter().copied());
        }
        reach.iter().all(|r| *r)
    }
}
use crate::context::Stoppable;
use crate::monitor::ExecutionMonitor;
use crate::observer::Observer;
use crate::pool::Pool;
use crate::task_id::TaskId;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
/// Result of executing a graph once.
#[allow(clippy::redundant_pub_crate)]
pub(crate) struct GraphRunOutcome {
/// First error recorded by any vertex during the run, if one failed.
#[allow(clippy::redundant_pub_crate)]
pub(crate) error: Option<crate::error::ItemError>,
/// `true` when some vertex returned `ControlFlow::StopChain` during the run.
#[allow(clippy::redundant_pub_crate)]
pub(crate) stopped_chain: bool,
}
/// Raw mutable pointer to one boxed vertex body, stored so that a vertex job
/// can obtain `&mut` access to its item through a shared `&Graph`.
struct VertexPtr(*mut dyn ExecutableItem);
// SAFETY (NOTE(review): assumption, not enforced by the type — confirm): the
// pointee is owned by the same `Graph` that stores this pointer, and each
// pointer is presumably dereferenced by at most one job at a time because a
// vertex is dispatched once per run.
#[allow(unsafe_code)]
unsafe impl Send for VertexPtr {}
// SAFETY: see the note on the `Send` impl above; sharing the wrapper itself
// only exposes the raw pointer value.
#[allow(unsafe_code)]
unsafe impl Sync for VertexPtr {}
/// Copyable wrapper around a `*const Graph` that can be moved into the
/// `Send` job closures built by `prepare_dispatch`.
#[allow(unsafe_code)]
#[derive(Copy, Clone)]
struct SendGraphPtr(*const Graph);
impl SendGraphPtr {
/// Returns the wrapped pointer.
///
/// NOTE(review): presumably an accessor is used (rather than `.0`) so that a
/// closure captures the whole `Send` wrapper and not the bare raw pointer
/// field — confirm.
const fn get(&self) -> *const Graph {
self.0
}
}
// SAFETY (NOTE(review): assumption — confirm with the owner of the graph): the
// pointed-to `Graph` must stay alive and at the same address for as long as
// any job holding this pointer can run.
#[allow(unsafe_code)]
unsafe impl Send for SendGraphPtr {}
// SAFETY: see the note on the `Send` impl above.
#[allow(unsafe_code)]
unsafe impl Sync for SendGraphPtr {}
impl Graph {
/// Accounts for vertex `i` when its job was dispatched but is not executed
/// (the job calls this when it finds `stop_flag` set).
///
/// Decrements `pending` for `i` itself. If `i` was the last outstanding vertex
/// the waiter is woken and nothing else needs doing; otherwise every successor
/// subtree is cancelled, since `i` will never release its successors.
fn finalise_skipped(&self, i: usize) {
if self.pending.fetch_sub(1, Ordering::AcqRel) == 1 {
self.notify_done();
return;
}
for &j in &self.successors[i] {
self.cancel_subtree(j);
}
}
/// Cancels `root` (the root of the subtree, not the graph's root) and every
/// vertex reachable from it for the current run.
///
/// A vertex is cancelled by swapping its counter to the sentinel `usize::MAX`.
/// Only a swap that observes a non-sentinel previous value decrements
/// `pending` and descends into the successors, so a vertex reached through
/// several paths is counted once. The walk stops as soon as `pending` reaches
/// zero, after waking the waiter.
///
/// NOTE(review): the once-only accounting relies on no other code modifying a
/// counter after it holds the sentinel; a later decrement would turn it into
/// `usize::MAX - 1`, and a subsequent cancel would count the vertex again.
fn cancel_subtree(&self, root: usize) {
let mut stack = vec![root];
while let Some(u) = stack.pop() {
let prev = self.counters[u].swap(usize::MAX, Ordering::AcqRel);
if prev != usize::MAX {
if self.pending.fetch_sub(1, Ordering::AcqRel) == 1 {
self.notify_done();
return;
}
for &v in &self.successors[u] {
stack.push(v);
}
}
}
}
/// Wakes every thread waiting on `done_cv`.
///
/// The mutex is taken first so the notification cannot slip in between a
/// waiter's check of `pending` (made while holding the same mutex) and its
/// wait. Panics if the mutex is poisoned.
fn notify_done(&self) {
let _g = self.done_cv.0.lock().unwrap();
self.done_cv.1.notify_all();
}
}
impl Graph {
    /// Builds one reusable job closure per vertex and stores them in
    /// `vertex_jobs`, replacing any previously prepared jobs.
    ///
    /// Each job, when run for vertex `i`:
    /// 1. skips the vertex (see `finalise_skipped`) if `stop_flag` is set;
    /// 2. otherwise executes the item, bracketed by the observer and monitor
    ///    callbacks;
    /// 3. raises `stop_flag` on `StopChain` or error and records the first
    ///    error in `first_err`;
    /// 4. decrements `pending`, then either wakes the waiter (last vertex),
    ///    cancels its successors (stop requested) or releases successors
    ///    whose predecessor count drops to zero into `ready_ring`.
    ///
    /// `err_slot` is captured by every job but not otherwise used.
    ///
    /// The jobs hold a raw pointer to this boxed graph.
    /// NOTE(review): the caller must therefore keep the `Box<Graph>` alive and
    /// must not move the graph out of it while any job can still run — this is
    /// not enforced by the signature; confirm against callers.
    ///
    /// # Panics
    ///
    /// A job panics if `first_err` is poisoned or if `ready_ring` rejects a
    /// push.
    #[allow(clippy::too_many_lines, clippy::needless_pass_by_value)]
    pub(crate) fn prepare_dispatch(
        self: &mut Box<Self>,
        task_id: TaskId,
        stop: Stoppable,
        observer: Arc<dyn Observer>,
        monitor: Arc<dyn ExecutionMonitor>,
        err_slot: Arc<Mutex<Option<crate::error::ExecutorError>>>,
    ) {
        let n = self.items.len();
        #[allow(unsafe_code)]
        let graph_ptr = SendGraphPtr(std::ptr::from_ref::<Self>(self.as_ref()));
        let mut jobs: Vec<Box<dyn FnMut() + Send + 'static>> = Vec::with_capacity(n);
        for i in 0..n {
            let task_id = task_id.clone();
            let stop = stop.clone();
            let observer = Arc::clone(&observer);
            let monitor = Arc::clone(&monitor);
            let err_slot = Arc::clone(&err_slot);
            let job: Box<dyn FnMut() + Send + 'static> = Box::new(move || {
                // SAFETY (assumed — see the method docs): the boxed graph
                // outlives every job and stays at the same address.
                #[allow(unsafe_code)]
                let g: &Self = unsafe { &*graph_ptr.get() };

                // A stop was requested before this vertex started: account for
                // it without executing the item.
                if g.stop_flag.load(Ordering::Acquire) {
                    g.finalise_skipped(i);
                    return;
                }

                let mut ctx = crate::context::Context::new(&task_id, &stop, observer.as_ref());
                let ptr = g.vertex_ptrs[i].0;
                // SAFETY (assumed): `ptr` targets the boxed item `i` owned by
                // the graph, and only this job touches that item during a run.
                #[allow(unsafe_code)]
                let app_id = unsafe { (*ptr).app_id() };
                #[allow(unsafe_code)]
                let app_inst = unsafe { (*ptr).app_instance_id() };
                if let Some(aid) = app_id {
                    observer.on_app_start(task_id.clone(), aid, app_inst);
                }

                let started = std::time::Instant::now();
                monitor.pre_execute(task_id.clone(), started);
                #[allow(unsafe_code)]
                let res =
                    crate::executor::run_item_catch_unwind_external(unsafe { &mut *ptr }, &mut ctx);
                let took = started.elapsed();
                monitor.post_execute(task_id.clone(), started, took, res.is_ok());
                if let Err(ref e) = res {
                    observer.on_app_error(task_id.clone(), e.as_ref());
                }
                if app_id.is_some() {
                    observer.on_app_stop(task_id.clone());
                }

                // Raise the stop flag before touching `pending` so that the
                // branch below sees it.
                match &res {
                    Ok(crate::ControlFlow::Continue) => {}
                    Ok(crate::ControlFlow::StopChain) => {
                        g.stop_chain_seen.store(true, Ordering::Release);
                        g.stop_flag.store(true, Ordering::Release);
                    }
                    Err(_) => g.stop_flag.store(true, Ordering::Release),
                }
                // Only the first error of a run is kept.
                if let Err(e) = res {
                    let mut fe = g.first_err.lock().unwrap();
                    if fe.is_none() {
                        *fe = Some(e);
                    }
                }

                if g.pending.fetch_sub(1, Ordering::AcqRel) == 1 {
                    g.notify_done();
                } else if g.stop_flag.load(Ordering::Acquire) {
                    for &j in &g.successors[i] {
                        g.cancel_subtree(j);
                    }
                } else {
                    for &j in &g.successors[i] {
                        // Never decrement a counter that already holds the
                        // cancellation sentinel. Another job may have raised
                        // `stop_flag` and cancelled `j` after the check above;
                        // an unconditional `fetch_sub` would turn `usize::MAX`
                        // into `usize::MAX - 1`, and a later `cancel_subtree`
                        // would then count `j` against `pending` a second time.
                        let became_ready = g.counters[j]
                            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |c| {
                                (c != usize::MAX).then(|| c.wrapping_sub(1))
                            })
                            == Ok(1);
                        if became_ready {
                            g.ready_ring
                                .push(j)
                                .expect("ready_ring sized to n_vertices");
                        }
                    }
                }
                // Keeps `err_slot` captured by the closure.
                let _ = &err_slot;
            });
            jobs.push(job);
        }
        self.vertex_jobs = jobs;
    }

    /// Executes the graph once on `pool` and blocks until every vertex has
    /// run, been skipped or been cancelled.
    ///
    /// Resets all per-run state, dispatches the vertices without
    /// predecessors, then keeps dispatching whatever the jobs release into
    /// `ready_ring` until `pending` reaches zero. Requires `prepare_dispatch`
    /// to have populated `vertex_jobs`.
    ///
    /// # Panics
    ///
    /// Panics if `first_err` or the `done_cv` mutex is poisoned, or if no job
    /// has been prepared for a vertex.
    #[allow(unsafe_code)]
    pub(crate) fn run_once_borrowed(&mut self, pool: &Pool) -> GraphRunOutcome {
        let n = self.items.len();
        // Reset the per-run state; no job is in flight yet.
        for (c, d) in self.counters.iter().zip(self.in_degree.iter()) {
            c.store(*d, Ordering::Relaxed);
        }
        self.pending.store(n, Ordering::Relaxed);
        self.stop_flag.store(false, Ordering::Relaxed);
        self.stop_chain_seen.store(false, Ordering::Relaxed);
        *self.first_err.lock().unwrap() = None;
        self.ready_ring.reset();

        for i in 0..n {
            if self.in_degree[i] == 0 {
                self.dispatch_vertex(pool, i);
            }
        }

        loop {
            while let Some(i) = self.ready_ring.pop() {
                self.dispatch_vertex(pool, i);
            }
            if self.pending.load(Ordering::Acquire) == 0 {
                break;
            }
            // Re-check under the lock so a `notify_done` issued between the
            // check above and the wait below is not missed.
            let guard = self.done_cv.0.lock().unwrap();
            if self.pending.load(Ordering::Acquire) == 0 {
                drop(guard);
                break;
            }
            // Pushes to `ready_ring` do not signal the condvar, so the timed
            // wait also serves as the poll interval for newly ready vertices.
            drop(
                self.done_cv
                    .1
                    .wait_timeout(guard, std::time::Duration::from_millis(5))
                    .unwrap()
                    .0,
            );
        }

        // Discard anything still queued.
        while self.ready_ring.pop().is_some() {}
        let mut first_err = self.first_err.lock().unwrap();
        GraphRunOutcome {
            error: first_err.take(),
            stopped_chain: self.stop_chain_seen.load(Ordering::Acquire),
        }
    }

    /// Submits the prepared job of vertex `i` to `pool` as a borrowed job.
    ///
    /// Panics if `i` has no entry in `vertex_jobs`.
    #[allow(unsafe_code)]
    fn dispatch_vertex(&mut self, pool: &Pool, i: usize) {
        let job_ptr: *mut (dyn FnMut() + Send) =
            std::ptr::from_mut::<dyn FnMut() + Send>(self.vertex_jobs[i].as_mut());
        // SAFETY (assumed — confirm against `Pool::submit_borrowed`): the job
        // is owned by `self.vertex_jobs`, which is not modified while a run is
        // in progress, and `run_once_borrowed` does not return before
        // `pending` reaches zero.
        unsafe {
            pool.submit_borrowed(crate::pool::BorrowedJob::new(job_ptr));
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{ControlFlow, item};

    /// A builder without any vertex cannot be finished.
    #[test]
    fn empty_graph_rejected() {
        let builder = GraphBuilder::new();
        let failure = builder.finish().expect_err("empty graph");
        assert!(failure.to_string().contains("no vertices"));
    }

    /// A root must be chosen explicitly, even for a single-vertex graph.
    #[test]
    fn missing_root_rejected() {
        let mut builder = GraphBuilder::new();
        builder.vertex(item(|_| Ok(ControlFlow::Continue)));
        let failure = builder.finish().expect_err("missing root");
        assert!(failure.to_string().contains("no root"));
    }

    /// Two vertices pointing at each other form a cycle.
    #[test]
    fn cycle_rejected() {
        let mut builder = GraphBuilder::new();
        let first = builder.vertex(item(|_| Ok(ControlFlow::Continue)));
        let second = builder.vertex(item(|_| Ok(ControlFlow::Continue)));
        builder.edge(first, second);
        builder.edge(second, first);
        builder.root(first);
        let failure = builder.finish().expect_err("cycle");
        assert!(failure.to_string().contains("cycle"));
    }

    /// A vertex with no path from the root is rejected.
    #[test]
    fn unreachable_vertex_rejected() {
        let mut builder = GraphBuilder::new();
        let root = builder.vertex(item(|_| Ok(ControlFlow::Continue)));
        let _orphan = builder.vertex(item(|_| Ok(ControlFlow::Continue)));
        builder.root(root);
        let failure = builder.finish().expect_err("unreachable");
        assert!(failure.to_string().contains("reachable"));
    }

    /// root -> {left, right} -> bottom builds and keeps edge insertion order.
    #[test]
    fn diamond_graph_builds() {
        let mut builder = GraphBuilder::new();
        let top = builder.vertex(item(|_| Ok(ControlFlow::Continue)));
        let left = builder.vertex(item(|_| Ok(ControlFlow::Continue)));
        let right = builder.vertex(item(|_| Ok(ControlFlow::Continue)));
        let bottom = builder.vertex(item(|_| Ok(ControlFlow::Continue)));
        builder.edge(top, left);
        builder.edge(top, right);
        builder.edge(left, bottom);
        builder.edge(right, bottom);
        builder.root(top);
        let graph = builder.finish().expect("diamond");
        assert_eq!(graph.successors[top.0], vec![left.0, right.0]);
        assert_eq!(graph.in_degree[bottom.0], 2);
    }
}