indusagi-core 0.1.0

Cross-cutting primitives every indusagi crate depends on: cancellation, env registry, brand, locator, canonical-JSON, version, ids, errors, re-iterable channel.
Documentation
//! The re-iterable [`Channel`] — a stream *factory*, not a stream.
//!
//! Ports the gateway's `channelOf` (`llmgateway/streaming/channel.ts`). The
//! critical contract: a `Channel` wraps a generator **factory** so that each
//! iteration re-issues the underlying request. `channelOf(make)` returns an
//! object whose `[Symbol.asyncIterator]` calls `make()` *afresh*; iterating the
//! same channel twice fires two requests. This is what makes retries work
//! (collect, fail, re-iterate) without the caller rebuilding the pipeline.
//!
//! In Rust we store the factory closure behind an `Arc` and re-run it on every
//! [`Channel::iter`] call. The closure is `Fn` (not `FnOnce`), so it must clone
//! whatever per-iteration state it needs (a fresh `Conversation`/`StreamOptions`
//! per request). Re-iteration is rare, so the clone cost is acceptable.
//!
//! This module is generic over the item type so it can carry gateway
//! `Emission`s, runtime `Signal`s, or anything else; the gateway crate
//! specializes it to `Channel<Emission>` and adds `collect_reply`.

use std::pin::Pin;
use std::sync::Arc;

use futures::Stream;

/// A boxed, `Send` stream of `T` — the concrete output of a [`Channel`]'s
/// factory. Mirrors the gateway's `EmissionStream` but generic.
pub type BoxStream<T> = Pin<Box<dyn Stream<Item = T> + Send>>;

/// A re-iterable channel: a cloneable handle around a stream factory. Each call
/// to [`Channel::iter`] re-runs the factory, producing a brand-new stream — so
/// iterating the channel twice issues the underlying work twice.
#[derive(Clone)]
pub struct Channel<T> {
    factory: Arc<dyn Fn() -> BoxStream<T> + Send + Sync>,
}

impl<T> Channel<T> {
    /// Build a channel from a stream `factory`. The factory is invoked once per
    /// [`Channel::iter`] call; it must be re-runnable (`Fn`, not `FnOnce`) and
    /// should clone any per-iteration inputs it captures.
    pub fn of<F>(factory: F) -> Self
    where
        F: Fn() -> BoxStream<T> + Send + Sync + 'static,
    {
        Self {
            factory: Arc::new(factory),
        }
    }

    /// Produce a fresh stream by **re-running the factory**. Calling this twice
    /// runs the factory twice — the re-iterable contract.
    pub fn iter(&self) -> BoxStream<T> {
        (self.factory)()
    }
}

/// Build a channel from a `Vec` factory — a scripted/test convenience that
/// re-emits the same sequence on each iteration. The `make` closure produces the
/// items afresh per iteration so callers can observe the re-run (e.g. by
/// incrementing a counter inside `make`).
pub fn channel_of_vec<T, F>(make: F) -> Channel<T>
where
    T: Send + 'static,
    F: Fn() -> Vec<T> + Send + Sync + 'static,
{
    Channel::of(move || Box::pin(futures::stream::iter(make())) as BoxStream<T>)
}

#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    use std::sync::atomic::{AtomicUsize, Ordering};

    // --- SPIKE 1: re-iterable Channel ----------------------------------------

    #[tokio::test]
    async fn iterating_twice_reruns_the_factory_and_yields_full_sequence_each_time() {
        // A counter the factory increments on every invocation lets us prove the
        // factory RE-RAN (rather than replaying a cached stream).
        let runs = Arc::new(AtomicUsize::new(0));
        let runs_for_factory = runs.clone();

        let channel: Channel<i32> = channel_of_vec(move || {
            runs_for_factory.fetch_add(1, Ordering::SeqCst);
            vec![1, 2, 3]
        });

        // First iteration.
        let first: Vec<i32> = channel.iter().collect().await;
        assert_eq!(first, vec![1, 2, 3]);
        assert_eq!(
            runs.load(Ordering::SeqCst),
            1,
            "factory ran once after first iter"
        );

        // Second iteration of the SAME channel: factory re-runs, full sequence again.
        let second: Vec<i32> = channel.iter().collect().await;
        assert_eq!(second, vec![1, 2, 3]);
        assert_eq!(
            runs.load(Ordering::SeqCst),
            2,
            "factory RE-RAN on the second iteration (re-iterable contract)"
        );
    }

    #[tokio::test]
    async fn channel_is_cloneable_and_clones_share_the_same_factory() {
        let runs = Arc::new(AtomicUsize::new(0));
        let r = runs.clone();
        let channel: Channel<u8> = channel_of_vec(move || {
            r.fetch_add(1, Ordering::SeqCst);
            vec![7]
        });
        let clone = channel.clone();

        let a: Vec<u8> = channel.iter().collect().await;
        let b: Vec<u8> = clone.iter().collect().await;
        assert_eq!(a, vec![7]);
        assert_eq!(b, vec![7]);
        // Two iterations across the original + clone => factory ran twice.
        assert_eq!(runs.load(Ordering::SeqCst), 2);
    }

    #[tokio::test]
    async fn factory_closure_can_capture_per_iteration_state() {
        // Each iteration produces a sequence derived from the run index, proving
        // the factory observes fresh state per call.
        let counter = Arc::new(AtomicUsize::new(0));
        let c = counter.clone();
        let channel: Channel<usize> = channel_of_vec(move || {
            let n = c.fetch_add(1, Ordering::SeqCst);
            vec![n, n + 1]
        });

        let first: Vec<usize> = channel.iter().collect().await;
        let second: Vec<usize> = channel.iter().collect().await;
        assert_eq!(first, vec![0, 1]);
        assert_eq!(second, vec![1, 2]);
    }
}