// dag-executor 0.1.0
//
// A production-ready DAG executor with state management and advanced patterns.
// Documentation
//! Shared execution context passed to every task.
//!
//! A [`Context`] is created once per run and shared (via `Arc`) across all
//! tasks. It provides three things tasks commonly need:
//!
//! * a typed key/value **blackboard** for passing data between tasks,
//! * **cooperative cancellation** (e.g. for graceful shutdown),
//! * a lightweight **event bus** used by [`crate::tasks::EventDrivenTask`].

use crate::utils::Config;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::sync::Notify;

/// A cooperative cancellation token.
///
/// Cloning yields another handle to the *same* underlying flag, so cancelling
/// any clone cancels them all. Tasks should poll [`CancelToken::is_cancelled`]
/// at safe points or `await` [`CancelToken::cancelled`].
///
/// `Default` produces an un-cancelled token, the same as [`CancelToken::new`].
#[derive(Clone, Default)]
pub struct CancelToken {
    // Shared state. `Clone` copies the `Arc`, not the flag itself, which is
    // what makes every clone observe the same cancellation.
    inner: Arc<CancelInner>,
}

/// State shared by every clone of a [`CancelToken`].
#[derive(Default)]
struct CancelInner {
    /// Set to `true` by [`CancelToken::cancel`]; starts out `false`.
    cancelled: AtomicBool,
    /// Wakes futures parked in [`CancelToken::cancelled`].
    notify: Notify,
}

impl CancelToken {
    /// Create a fresh, un-cancelled token.
    pub fn new() -> Self {
        Self::default()
    }

    /// Signal cancellation and wake all waiters.
    pub fn cancel(&self) {
        self.inner.cancelled.store(true, Ordering::SeqCst);
        self.inner.notify.notify_waiters();
    }

    /// Whether cancellation has been requested.
    pub fn is_cancelled(&self) -> bool {
        self.inner.cancelled.load(Ordering::SeqCst)
    }

    /// Resolve once cancellation is requested. Returns immediately if already
    /// cancelled.
    pub async fn cancelled(&self) {
        if self.is_cancelled() {
            return;
        }
        // Re-check after registering for the notification to avoid a lost wakeup.
        let notified = self.inner.notify.notified();
        if self.is_cancelled() {
            return;
        }
        notified.await;
    }
}

/// Per-run shared state handed to tasks.
///
/// The blackboard, the event table and the cancellation token are private and
/// reached through the methods on [`Context`].
pub struct Context {
    /// Immutable run configuration.
    pub config: Arc<Config>,
    /// Unique id for this run.
    pub run_id: String,
    // Key/value store behind `set` / `get` / `get_as`; values are JSON.
    blackboard: RwLock<HashMap<String, serde_json::Value>>,
    // One shared `Notify` per event name, created lazily by `event_handle`.
    events: RwLock<HashMap<String, Arc<Notify>>>,
    // Run-wide cancellation flag, exposed through `cancel_token`.
    cancel: CancelToken,
}

impl Context {
    /// Build a context for a new run.
    ///
    /// The run gets a freshly generated v4 UUID as its `run_id`, an empty
    /// blackboard, an empty event table and an un-cancelled token.
    pub fn new(config: Arc<Config>) -> Self {
        Context {
            config,
            run_id: uuid::Uuid::new_v4().to_string(),
            blackboard: RwLock::new(HashMap::new()),
            events: RwLock::new(HashMap::new()),
            cancel: CancelToken::new(),
        }
    }

    /// A context with default configuration — handy in tests.
    pub fn for_tests() -> Arc<Self> {
        Arc::new(Context::new(Arc::new(Config::default())))
    }

    /// The cancellation token for this run.
    ///
    /// Clone the returned token to hand it to code that should not hold a
    /// reference to the whole context.
    pub fn cancel_token(&self) -> &CancelToken {
        &self.cancel
    }

    /// Request cancellation of the whole run.
    pub fn cancel(&self) {
        self.cancel.cancel();
    }

    /// Whether the run has been cancelled.
    pub fn is_cancelled(&self) -> bool {
        self.cancel.is_cancelled()
    }

    // ----- blackboard -----

    /// Store a value on the blackboard, overwriting any existing entry.
    pub fn set(&self, key: impl Into<String>, value: serde_json::Value) {
        self.blackboard.write().insert(key.into(), value);
    }

    /// Fetch (and clone) a value from the blackboard.
    ///
    /// Returns `None` when nothing is stored under `key`.
    pub fn get(&self, key: &str) -> Option<serde_json::Value> {
        self.blackboard.read().get(key).cloned()
    }

    /// Deserialize a blackboard value into a concrete type.
    ///
    /// Returns `None` both when `key` is absent and when the stored value
    /// cannot be deserialized into `T`; the deserialization error is
    /// discarded. Call [`Context::get`] first if the two cases need to be
    /// told apart.
    pub fn get_as<T: serde::de::DeserializeOwned>(&self, key: &str) -> Option<T> {
        self.get(key).and_then(|v| serde_json::from_value(v).ok())
    }

    // ----- event bus -----

    /// Return the shared `Notify` for `name`, creating it on first use.
    fn event_handle(&self, name: &str) -> Arc<Notify> {
        // Fast path under the read lock. The guard is a temporary of this
        // statement, so it is released before the write lock is requested.
        let existing = self.events.read().get(name).cloned();
        if let Some(handle) = existing {
            return handle;
        }
        // `entry` re-checks under the write lock, so if another thread
        // inserted the handle in between, that one is returned instead of a
        // second instance.
        self.events
            .write()
            .entry(name.to_string())
            .or_insert_with(|| Arc::new(Notify::new()))
            .clone()
    }

    /// Emit a named event, waking any tasks waiting on it.
    ///
    /// Events are not buffered: only tasks already waiting in
    /// [`Context::wait_for`] are woken, and an emit with no waiters is a
    /// no-op.
    pub fn emit(&self, name: &str) {
        // Look the handle up without creating it. A waiter always registers
        // its handle through `event_handle` before it starts waiting, so a
        // missing entry means nobody can be listening. Inserting here would
        // only grow the table by one entry per distinct emitted name and take
        // the write lock for nothing.
        let handle = self.events.read().get(name).cloned();
        if let Some(notify) = handle {
            // The read guard is already released, so wakers never run under
            // the table lock.
            notify.notify_waiters();
        }
    }

    /// Wait for a named event to be emitted.
    ///
    /// Resolves early (returning `false`) if the run is cancelled first;
    /// returns `true` when the event actually fires. An event emitted before
    /// this method is called is not observed.
    pub async fn wait_for(&self, name: &str) -> bool {
        let notify = self.event_handle(name);
        // Create the notification future before entering the select so the
        // waiter is in place as early as possible.
        let notified = notify.notified();
        tokio::select! {
            _ = notified => true,
            _ = self.cancel.cancelled() => false,
        }
    }
}