indusagi_core/channel.rs
1//! The re-iterable [`Channel`] — a stream *factory*, not a stream.
2//!
3//! Ports the gateway's `channelOf` (`llmgateway/streaming/channel.ts`). The
4//! critical contract: a `Channel` wraps a generator **factory** so that each
5//! iteration re-issues the underlying request. `channelOf(make)` returns an
6//! object whose `[Symbol.asyncIterator]` calls `make()` *afresh*; iterating the
7//! same channel twice fires two requests. This is what makes retries work
8//! (collect, fail, re-iterate) without the caller rebuilding the pipeline.
9//!
10//! In Rust we store the factory closure behind an `Arc` and re-run it on every
11//! [`Channel::iter`] call. The closure is `Fn` (not `FnOnce`), so it must clone
12//! whatever per-iteration state it needs (a fresh `Conversation`/`StreamOptions`
13//! per request). Re-iteration is rare, so the clone cost is acceptable.
14//!
15//! This module is generic over the item type so it can carry gateway
16//! `Emission`s, runtime `Signal`s, or anything else; the gateway crate
17//! specializes it to `Channel<Emission>` and adds `collect_reply`.
18
19use std::pin::Pin;
20use std::sync::Arc;
21
22use futures::Stream;
23
24/// A boxed, `Send` stream of `T` — the concrete output of a [`Channel`]'s
25/// factory. Mirrors the gateway's `EmissionStream` but generic.
26pub type BoxStream<T> = Pin<Box<dyn Stream<Item = T> + Send>>;
27
28/// A re-iterable channel: a cloneable handle around a stream factory. Each call
29/// to [`Channel::iter`] re-runs the factory, producing a brand-new stream — so
30/// iterating the channel twice issues the underlying work twice.
31#[derive(Clone)]
32pub struct Channel<T> {
33 factory: Arc<dyn Fn() -> BoxStream<T> + Send + Sync>,
34}
35
36impl<T> Channel<T> {
37 /// Build a channel from a stream `factory`. The factory is invoked once per
38 /// [`Channel::iter`] call; it must be re-runnable (`Fn`, not `FnOnce`) and
39 /// should clone any per-iteration inputs it captures.
40 pub fn of<F>(factory: F) -> Self
41 where
42 F: Fn() -> BoxStream<T> + Send + Sync + 'static,
43 {
44 Self {
45 factory: Arc::new(factory),
46 }
47 }
48
49 /// Produce a fresh stream by **re-running the factory**. Calling this twice
50 /// runs the factory twice — the re-iterable contract.
51 pub fn iter(&self) -> BoxStream<T> {
52 (self.factory)()
53 }
54}
55
56/// Build a channel from a `Vec` factory — a scripted/test convenience that
57/// re-emits the same sequence on each iteration. The `make` closure produces the
58/// items afresh per iteration so callers can observe the re-run (e.g. by
59/// incrementing a counter inside `make`).
60pub fn channel_of_vec<T, F>(make: F) -> Channel<T>
61where
62 T: Send + 'static,
63 F: Fn() -> Vec<T> + Send + Sync + 'static,
64{
65 Channel::of(move || Box::pin(futures::stream::iter(make())) as BoxStream<T>)
66}
67
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    use std::sync::atomic::{AtomicUsize, Ordering};

    // --- SPIKE 1: re-iterable Channel ----------------------------------------

    #[tokio::test]
    async fn iterating_twice_reruns_the_factory_and_yields_full_sequence_each_time() {
        // The factory bumps a shared counter on every call, which lets the test
        // tell a genuine re-run apart from a replayed, cached stream.
        let invocations = Arc::new(AtomicUsize::new(0));
        let channel: Channel<i32> = {
            let invocations = Arc::clone(&invocations);
            channel_of_vec(move || {
                invocations.fetch_add(1, Ordering::SeqCst);
                vec![1, 2, 3]
            })
        };

        // Pass one over the channel.
        let first = channel.iter().collect::<Vec<i32>>().await;
        assert_eq!(first, vec![1, 2, 3]);
        let after_first = invocations.load(Ordering::SeqCst);
        assert_eq!(after_first, 1, "factory ran once after first iter");

        // Pass two over the SAME channel: the factory runs again and the whole
        // sequence comes back.
        let second = channel.iter().collect::<Vec<i32>>().await;
        assert_eq!(second, vec![1, 2, 3]);
        let after_second = invocations.load(Ordering::SeqCst);
        assert_eq!(
            after_second, 2,
            "factory RE-RAN on the second iteration (re-iterable contract)"
        );
    }

    #[tokio::test]
    async fn channel_is_cloneable_and_clones_share_the_same_factory() {
        let invocations = Arc::new(AtomicUsize::new(0));
        let original: Channel<u8> = {
            let invocations = Arc::clone(&invocations);
            channel_of_vec(move || {
                invocations.fetch_add(1, Ordering::SeqCst);
                vec![7]
            })
        };
        let duplicate = original.clone();

        let from_original = original.iter().collect::<Vec<u8>>().await;
        let from_duplicate = duplicate.iter().collect::<Vec<u8>>().await;
        assert_eq!(from_original, vec![7]);
        assert_eq!(from_duplicate, vec![7]);
        // One iteration on the original plus one on the clone => two factory runs.
        assert_eq!(invocations.load(Ordering::SeqCst), 2);
    }

    #[tokio::test]
    async fn factory_closure_can_capture_per_iteration_state() {
        // The emitted values depend on how many times the factory has already
        // run, so each iteration must be seeing fresh state.
        let run_index = Arc::new(AtomicUsize::new(0));
        let channel: Channel<usize> = {
            let run_index = Arc::clone(&run_index);
            channel_of_vec(move || {
                let start = run_index.fetch_add(1, Ordering::SeqCst);
                vec![start, start + 1]
            })
        };

        let first = channel.iter().collect::<Vec<usize>>().await;
        let second = channel.iter().collect::<Vec<usize>>().await;
        assert_eq!(first, vec![0, 1]);
        assert_eq!(second, vec![1, 2]);
    }
}