indusagi-core 0.1.0

Cross-cutting primitives every indusagi crate depends on: cancellation, env registry, brand, locator, canonical-JSON, version, ids, errors, re-iterable channel.
Documentation
//! Cooperative cancellation — the framework's single cancellation currency.
//!
//! Replaces the TypeScript framework's native `AbortController`/`AbortSignal`
//! pair, which was chained **parent → round → per-call child** in
//! `runtime/dispatch/scheduler.ts`. There is no `CancelToken` *class* in the TS
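//! source; the chaining convention is collapsed here onto a re-export of
//! [`tokio_util::sync::CancellationToken`] plus a small set of helpers (the
//! chain-naming functions and the [`CancelExt`] extension trait). The token
//! itself is already cooperative, queryable, multi-listener, and chainable via
//! `child_token()`.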
//!
//! ## The cooperative-cancel contract (the cross-cutting invariant)
//!
//! Cancellation is **cooperative**: holders poll [`CancellationToken::is_cancelled`]
//! or `await` [`CancellationToken::cancelled`]; nothing is forcibly killed. The
//! invariant every subsystem must honor:
//!
//! - A cancelled **tool** returns a typed `is_error` outcome — never `Err`,
//!   never a panic.
//! - A cancelled **connector** yields a terminal `Emission::Error` — it never
//!   throws out of the stream.
//! - When core code itself needs to *signal* "I observed cancel", it yields the
//!   typed [`CoreError::Cancelled`] (see [`CancelExt::error_if_cancelled`] /
//!   [`CancelExt::guard`], the extension methods this module adds to the
//!   token). Cancellation is a typed value, never a panic and never silently
//!   folded into an `Ok`.
//!
//! Rust is safer here than the Python port: there is no ambient exception to
//! swallow, so the sharpest Python footgun (the `except CancelledError: raise`
//! discipline) simply does not exist. The typed-error discipline still has to be
//! enforced — that is what this module's helpers are for.
//!
//! ## The chain
//!
//! ```text
//!   parent  ── child_token() ──▶  round  ── child_token() ──▶  per-call child
//! ```
//!
//! Cancellation propagates *down* the chain (cancelling a parent cancels every
//! descendant) but never *up* (cancelling a child leaves the parent live). This
//! is exactly the scheduler's parent→round→child semantics, and the chain is
//! drop-safe: dropping a token does not cancel it, but a `DropGuard` can cancel
//! on scope exit when wanted.

use std::future::Future;

pub use tokio_util::sync::CancellationToken;

use crate::errors::CoreError;

/// Create the root of a cancellation chain.
///
/// The returned token starts out uncancelled. It plays the role of the
/// `.signal` of a fresh `new AbortController()` in the TypeScript framework:
/// hand it to the top of the chain and derive narrower scopes from it.
pub fn root_token() -> CancellationToken {
    // `Default` for the token is defined as `CancellationToken::new()`.
    CancellationToken::default()
}

/// Derive a *round* token from a `parent`. Cancelling `parent` cancels the
/// returned token (and everything chained beneath it); cancelling the returned
/// token leaves `parent` live. This is `parent.child_token()`, named for the
/// scheduler's parent→round step.
pub fn round_token(parent: &CancellationToken) -> CancellationToken {
    parent.child_token()
}

/// Derive a *per-call* token from a `round`. Same propagation rules; named for
/// the scheduler's round→child step. (Functionally identical to
/// [`round_token`]; the two names document intent at the call site.)
pub fn child_token(round: &CancellationToken) -> CancellationToken {
    round.child_token()
}

/// Extension helpers on [`CancellationToken`] that enforce the typed-error
/// contract. Implemented as an extension trait so the canonical tokio-util type
/// flows through the whole framework unchanged.
///
/// Bring the trait into scope (`use ...::CancelExt`) to call these methods on a
/// token; this module provides the implementation for [`CancellationToken`].
pub trait CancelExt {
    /// Return `Err(CoreError::Cancelled)` iff the token is already cancelled,
    /// else `Ok(())`. The cooperative analogue of `signal.aborted` that yields a
    /// *typed* error instead of a bool — call it at loop tops in find/grep walks
    /// and framer reads.
    ///
    /// This is a point-in-time check: it does not wait, so an `Ok(())` only
    /// says the token had not been cancelled at the moment of the call.
    ///
    /// # Errors
    ///
    /// Returns the cancel sentinel [`CoreError`] when the token is cancelled.
    fn error_if_cancelled(&self) -> Result<(), CoreError>;

    /// Run `fut` to completion unless the token cancels first. On cancel, the
    /// in-flight future is dropped (hard teardown of that future only) and a
    /// typed [`CoreError::Cancelled`] is returned — never a panic, never a
    /// swallowed value. This is the `select! { cancelled => Err(..), res => res }`
    /// pattern packaged so call sites cannot accidentally forget the cancel arm.
    ///
    /// The output of `fut` is wrapped in `Ok` untouched; if `fut` itself
    /// produces a `Result`, the caller receives a nested result.
    ///
    /// Both `fut` and the returned future are required to be `Send`.
    ///
    /// # Errors
    ///
    /// Resolves to the cancel sentinel [`CoreError`] when cancellation is
    /// observed before `fut` completes.
    fn guard<F, T>(&self, fut: F) -> impl Future<Output = Result<T, CoreError>> + Send
    where
        F: Future<Output = T> + Send;
}

impl CancelExt for CancellationToken {
    /// Point-in-time check: `Ok(())` while the token is live, the typed cancel
    /// error once it has been cancelled. Never blocks and never panics.
    fn error_if_cancelled(&self) -> Result<(), CoreError> {
        if !self.is_cancelled() {
            return Ok(());
        }
        Err(CoreError::cancelled())
    }

    /// Race `fut` against this token's cancellation.
    ///
    /// Resolves to `Ok(value)` when `fut` finishes first, or to the typed
    /// cancel error when cancellation is observed first; in the latter case
    /// `fut` is dropped without being driven any further.
    async fn guard<F, T>(&self, fut: F) -> Result<T, CoreError>
    where
        F: Future<Output = T> + Send,
    {
        tokio::select! {
            // `biased` makes the branches poll top-to-bottom, so the cancel
            // branch is always checked before `fut` is polled.
            biased;
            _ = self.cancelled() => Err(CoreError::cancelled()),
            value = fut => Ok(value),
        }
    }
}

/// Unit tests for the cancellation chain helpers and the [`CancelExt`]
/// typed-error contract.
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// A fresh root token is live and passes the typed check.
    #[test]
    fn root_starts_uncancelled() {
        let t = root_token();
        assert!(!t.is_cancelled());
        assert!(t.error_if_cancelled().is_ok());
    }

    /// Cancelling the top of the chain cancels every descendant.
    #[test]
    fn cancel_down_the_chain_propagates_parent_to_child() {
        let parent = root_token();
        let round = round_token(&parent);
        let child = child_token(&round);

        assert!(!child.is_cancelled());
        parent.cancel();
        // Down-propagation: cancelling the parent cancels the whole chain.
        assert!(parent.is_cancelled());
        assert!(round.is_cancelled());
        assert!(child.is_cancelled());
    }

    /// Cancelling the bottom of the chain leaves its ancestors live.
    #[test]
    fn cancel_does_not_propagate_up() {
        let parent = root_token();
        let round = round_token(&parent);
        let child = child_token(&round);

        child.cancel();
        // Up-propagation must NOT happen: a failing per-call child never tears
        // down the round or the parent.
        assert!(child.is_cancelled());
        assert!(!round.is_cancelled());
        assert!(!parent.is_cancelled());
    }

    /// Tokens derived *after* the parent was cancelled must already be
    /// cancelled, so late-started rounds/calls cannot outlive a cancelled run.
    #[test]
    fn tokens_derived_from_cancelled_parent_start_cancelled() {
        let parent = root_token();
        parent.cancel();

        let round = round_token(&parent);
        let child = child_token(&round);
        assert!(round.is_cancelled());
        assert!(child.is_cancelled());
        assert!(child.error_if_cancelled().unwrap_err().is_cancelled());
    }

    /// The typed check surfaces cancellation as the cancel sentinel error.
    #[test]
    fn error_if_cancelled_yields_typed_error_not_panic() {
        let t = root_token();
        t.cancel();
        let err = t.error_if_cancelled().unwrap_err();
        assert!(err.is_cancelled());
        assert_eq!(err.to_string(), "cancelled");
    }

    /// With a live token, `guard` passes the future's output through in `Ok`.
    #[tokio::test]
    async fn guard_returns_ok_when_future_wins() {
        let t = root_token();
        let out = t.guard(async { 7 }).await;
        assert_eq!(out.unwrap(), 7);
    }

    /// Pins the `biased` ordering in `guard`: when the token is already
    /// cancelled, cancellation wins even against an immediately-ready future.
    #[tokio::test]
    async fn guard_prefers_cancel_when_token_already_cancelled() {
        let t = root_token();
        t.cancel();
        let out = t.guard(async { 7 }).await;
        assert!(out.unwrap_err().is_cancelled());
    }

    /// A token cancelled mid-flight interrupts a long-running guarded future.
    #[tokio::test]
    async fn guard_returns_typed_cancel_when_token_wins() {
        let t = root_token();
        let t2 = t.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(5)).await;
            t2.cancel();
        });
        // A slow future that observes cancel resolves to a TYPED error, never a
        // panic and never an Err that isn't the cancel sentinel.
        let out: Result<(), CoreError> = t
            .guard(async {
                tokio::time::sleep(Duration::from_secs(3600)).await;
            })
            .await;
        assert!(out.unwrap_err().is_cancelled());
    }
}