Skip to main content

indusagi_core/
channel.rs

1//! The re-iterable [`Channel`] — a stream *factory*, not a stream.
2//!
3//! Ports the gateway's `channelOf` (`llmgateway/streaming/channel.ts`). The
4//! critical contract: a `Channel` wraps a generator **factory** so that each
5//! iteration re-issues the underlying request. `channelOf(make)` returns an
6//! object whose `[Symbol.asyncIterator]` calls `make()` *afresh*; iterating the
7//! same channel twice fires two requests. This is what makes retries work
8//! (collect, fail, re-iterate) without the caller rebuilding the pipeline.
9//!
10//! In Rust we store the factory closure behind an `Arc` and re-run it on every
11//! [`Channel::iter`] call. The closure is `Fn` (not `FnOnce`), so it must clone
12//! whatever per-iteration state it needs (a fresh `Conversation`/`StreamOptions`
13//! per request). Re-iteration is rare, so the clone cost is acceptable.
14//!
15//! This module is generic over the item type so it can carry gateway
16//! `Emission`s, runtime `Signal`s, or anything else; the gateway crate
17//! specializes it to `Channel<Emission>` and adds `collect_reply`.
18
19use std::pin::Pin;
20use std::sync::Arc;
21
22use futures::Stream;
23
24/// A boxed, `Send` stream of `T` — the concrete output of a [`Channel`]'s
25/// factory. Mirrors the gateway's `EmissionStream` but generic.
26pub type BoxStream<T> = Pin<Box<dyn Stream<Item = T> + Send>>;
27
28/// A re-iterable channel: a cloneable handle around a stream factory. Each call
29/// to [`Channel::iter`] re-runs the factory, producing a brand-new stream — so
30/// iterating the channel twice issues the underlying work twice.
31#[derive(Clone)]
32pub struct Channel<T> {
33    factory: Arc<dyn Fn() -> BoxStream<T> + Send + Sync>,
34}
35
36impl<T> Channel<T> {
37    /// Build a channel from a stream `factory`. The factory is invoked once per
38    /// [`Channel::iter`] call; it must be re-runnable (`Fn`, not `FnOnce`) and
39    /// should clone any per-iteration inputs it captures.
40    pub fn of<F>(factory: F) -> Self
41    where
42        F: Fn() -> BoxStream<T> + Send + Sync + 'static,
43    {
44        Self {
45            factory: Arc::new(factory),
46        }
47    }
48
49    /// Produce a fresh stream by **re-running the factory**. Calling this twice
50    /// runs the factory twice — the re-iterable contract.
51    pub fn iter(&self) -> BoxStream<T> {
52        (self.factory)()
53    }
54}
55
56/// Build a channel from a `Vec` factory — a scripted/test convenience that
57/// re-emits the same sequence on each iteration. The `make` closure produces the
58/// items afresh per iteration so callers can observe the re-run (e.g. by
59/// incrementing a counter inside `make`).
60pub fn channel_of_vec<T, F>(make: F) -> Channel<T>
61where
62    T: Send + 'static,
63    F: Fn() -> Vec<T> + Send + Sync + 'static,
64{
65    Channel::of(move || Box::pin(futures::stream::iter(make())) as BoxStream<T>)
66}
67
68#[cfg(test)]
69mod tests {
70    use super::*;
71    use futures::StreamExt;
72    use std::sync::atomic::{AtomicUsize, Ordering};
73
74    // --- SPIKE 1: re-iterable Channel ----------------------------------------
75
76    #[tokio::test]
77    async fn iterating_twice_reruns_the_factory_and_yields_full_sequence_each_time() {
78        // A counter the factory increments on every invocation lets us prove the
79        // factory RE-RAN (rather than replaying a cached stream).
80        let runs = Arc::new(AtomicUsize::new(0));
81        let runs_for_factory = runs.clone();
82
83        let channel: Channel<i32> = channel_of_vec(move || {
84            runs_for_factory.fetch_add(1, Ordering::SeqCst);
85            vec![1, 2, 3]
86        });
87
88        // First iteration.
89        let first: Vec<i32> = channel.iter().collect().await;
90        assert_eq!(first, vec![1, 2, 3]);
91        assert_eq!(
92            runs.load(Ordering::SeqCst),
93            1,
94            "factory ran once after first iter"
95        );
96
97        // Second iteration of the SAME channel: factory re-runs, full sequence again.
98        let second: Vec<i32> = channel.iter().collect().await;
99        assert_eq!(second, vec![1, 2, 3]);
100        assert_eq!(
101            runs.load(Ordering::SeqCst),
102            2,
103            "factory RE-RAN on the second iteration (re-iterable contract)"
104        );
105    }
106
107    #[tokio::test]
108    async fn channel_is_cloneable_and_clones_share_the_same_factory() {
109        let runs = Arc::new(AtomicUsize::new(0));
110        let r = runs.clone();
111        let channel: Channel<u8> = channel_of_vec(move || {
112            r.fetch_add(1, Ordering::SeqCst);
113            vec![7]
114        });
115        let clone = channel.clone();
116
117        let a: Vec<u8> = channel.iter().collect().await;
118        let b: Vec<u8> = clone.iter().collect().await;
119        assert_eq!(a, vec![7]);
120        assert_eq!(b, vec![7]);
121        // Two iterations across the original + clone => factory ran twice.
122        assert_eq!(runs.load(Ordering::SeqCst), 2);
123    }
124
125    #[tokio::test]
126    async fn factory_closure_can_capture_per_iteration_state() {
127        // Each iteration produces a sequence derived from the run index, proving
128        // the factory observes fresh state per call.
129        let counter = Arc::new(AtomicUsize::new(0));
130        let c = counter.clone();
131        let channel: Channel<usize> = channel_of_vec(move || {
132            let n = c.fetch_add(1, Ordering::SeqCst);
133            vec![n, n + 1]
134        });
135
136        let first: Vec<usize> = channel.iter().collect().await;
137        let second: Vec<usize> = channel.iter().collect().await;
138        assert_eq!(first, vec![0, 1]);
139        assert_eq!(second, vec![1, 2]);
140    }
141}