hojicha_runtime/stream_builders.rs
1//! Stream builder utilities for common async patterns
2//!
3//! This module provides helper functions for creating common types of streams
4//! that can be used with the `Program::subscribe()` method.
5
6use futures::stream::{Stream, StreamExt};
7use std::time::Duration;
8
9/// Create a stream that emits at regular intervals
10///
11/// This is useful for creating timers, periodic updates, or any task that needs
12/// to run at regular intervals.
13///
14/// # Example
15/// ```
16/// use hojicha_runtime::stream_builders::interval_stream;
17/// use std::time::Duration;
18/// use futures::StreamExt;
19///
20/// # tokio_test::block_on(async {
21/// let mut stream = interval_stream(Duration::from_millis(10));
22/// let first = stream.next().await;
23/// assert_eq!(first, Some(()));
24/// # });
25/// ```
26pub fn interval_stream(duration: Duration) -> impl Stream<Item = ()> {
27 use tokio_stream::wrappers::IntervalStream;
28
29 let interval = tokio::time::interval(duration);
30 IntervalStream::new(interval).map(|_| ())
31}
32
33/// Create a stream from a channel receiver
34///
35/// This allows you to bridge between channel-based APIs and the stream-based
36/// subscription system.
37///
38/// # Example
39/// ```
40/// use hojicha_runtime::stream_builders::channel_stream;
41/// use futures::StreamExt;
42///
43/// # tokio_test::block_on(async {
44/// let (tx, rx) = tokio::sync::mpsc::channel(1);
45/// let mut stream = channel_stream(rx);
46///
47/// tx.send(42).await.unwrap();
48/// let value = stream.next().await;
49/// assert_eq!(value, Some(42));
50/// # });
51/// ```
52pub fn channel_stream<T>(rx: tokio::sync::mpsc::Receiver<T>) -> impl Stream<Item = T> + Send + Unpin
53where
54 T: Send + 'static,
55{
56 Box::pin(futures::stream::unfold(rx, |mut rx| async move {
57 rx.recv().await.map(|item| (item, rx))
58 }))
59}
60
61/// Create a stream from an unbounded channel receiver
62///
63/// Similar to `channel_stream` but for unbounded channels.
64///
65/// # Example
66/// ```
67/// use hojicha_runtime::stream_builders::unbounded_channel_stream;
68/// use futures::StreamExt;
69///
70/// # tokio_test::block_on(async {
71/// let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
72/// let mut stream = unbounded_channel_stream(rx);
73///
74/// tx.send(42).unwrap();
75/// let value = stream.next().await;
76/// assert_eq!(value, Some(42));
77/// # });
78/// ```
79pub fn unbounded_channel_stream<T>(
80 rx: tokio::sync::mpsc::UnboundedReceiver<T>,
81) -> impl Stream<Item = T> + Send + Unpin
82where
83 T: Send + 'static,
84{
85 Box::pin(futures::stream::unfold(rx, |mut rx| async move {
86 rx.recv().await.map(|item| (item, rx))
87 }))
88}
89
90/// Create a stream that emits once after a delay
91///
92/// This is useful for delayed actions or one-time timeouts.
93///
94/// # Example
95/// ```
96/// use hojicha_runtime::stream_builders::delayed_stream;
97/// use std::time::Duration;
98/// use futures::StreamExt;
99///
100/// # tokio_test::block_on(async {
101/// let mut stream = delayed_stream(Duration::from_millis(10));
102/// let result = stream.next().await;
103/// assert_eq!(result, Some(()));
104/// # });
105/// ```
106pub fn delayed_stream(duration: Duration) -> impl Stream<Item = ()> + Send + Unpin {
107 Box::pin(futures::stream::once(async move {
108 tokio::time::sleep(duration).await;
109 }))
110}
111
112/// Create a stream that completes after a delay (alias for delayed_stream)
113///
114/// This is useful for timeouts or delayed actions.
115///
116/// # Example
117/// ```
118/// use hojicha_runtime::stream_builders::timeout_stream;
119/// use std::time::Duration;
120/// use futures::StreamExt;
121///
122/// # tokio_test::block_on(async {
123/// let mut stream = timeout_stream(Duration::from_millis(10));
124/// let result = stream.next().await;
125/// assert_eq!(result, Some(()));
126/// # });
127/// ```
128pub fn timeout_stream(duration: Duration) -> impl Stream<Item = ()> + Send + Unpin {
129 delayed_stream(duration)
130}
131
132/// Create a stream that merges multiple streams
133///
134/// This allows you to combine multiple event sources into a single stream.
135///
136/// # Example
137/// ```
138/// use hojicha_runtime::stream_builders::{interval_stream, merge_streams};
139/// use std::time::Duration;
140/// use futures::StreamExt;
141///
142/// # tokio_test::block_on(async {
143/// let stream1 = interval_stream(Duration::from_millis(10)).take(2).map(|_| "fast");
144/// let stream2 = interval_stream(Duration::from_millis(15)).take(1).map(|_| "slow");
145///
146/// let merged = merge_streams(vec![
147/// Box::new(stream1) as Box<dyn futures::Stream<Item = &str> + Send + Unpin>,
148/// Box::new(stream2) as Box<dyn futures::Stream<Item = &str> + Send + Unpin>,
149/// ]);
150/// let results: Vec<_> = merged.collect().await;
151/// assert_eq!(results.len(), 3);
152/// # });
153/// ```
154pub fn merge_streams<T>(
155 streams: Vec<Box<dyn Stream<Item = T> + Send + Unpin>>,
156) -> impl Stream<Item = T>
157where
158 T: Send + 'static,
159{
160 use futures::stream::SelectAll;
161
162 let mut select_all = SelectAll::new();
163 for stream in streams {
164 select_all.push(stream);
165 }
166 select_all
167}
168
169/// Create a stream from a watch channel
170///
171/// This creates a stream that emits whenever the watched value changes.
172///
173/// # Example
174/// ```
175/// use hojicha_runtime::stream_builders::watch_stream;
176/// use std::time::Duration;
177/// use futures::StreamExt;
178///
179/// # tokio_test::block_on(async {
180/// let (tx, rx) = tokio::sync::watch::channel(0);
181///
182/// // Update the watched value
183/// tokio::spawn(async move {
184/// tokio::time::sleep(Duration::from_millis(5)).await;
185/// tx.send(42).ok();
186/// });
187///
188/// let mut stream = watch_stream(rx).take(1);
189/// let value = stream.next().await;
190/// assert_eq!(value, Some(42));
191/// # });
192/// ```
193pub fn watch_stream<T>(rx: tokio::sync::watch::Receiver<T>) -> impl Stream<Item = T> + Send + Unpin
194where
195 T: Clone + Send + Sync + 'static,
196{
197 Box::pin(futures::stream::unfold(rx, |mut rx| async move {
198 rx.changed().await.ok()?;
199 let value = rx.borrow().clone();
200 Some((value, rx))
201 }))
202}
203
#[cfg(test)]
mod tests {
    //! Unit tests for the stream builders.
    //!
    //! Channel-based tests queue their values (and drop the senders) before
    //! consuming the stream, so they do not depend on task scheduling. Timer
    //! tests use short durations to stay fast.

    use super::*;
    use futures::StreamExt;
    use std::time::Duration;

    /// Boxed stream type accepted by `merge_streams`, fixed to `i32` items.
    type BoxedInts = Box<dyn futures::Stream<Item = i32> + Send + Unpin>;

    #[tokio::test]
    async fn test_interval_stream() {
        let mut stream = interval_stream(Duration::from_millis(10)).take(3);

        let mut count = 0;
        while stream.next().await.is_some() {
            count += 1;
        }

        assert_eq!(count, 3);
    }

    #[tokio::test]
    async fn test_timeout_stream() {
        let mut stream = timeout_stream(Duration::from_millis(10));

        let result = stream.next().await;
        assert!(result.is_some());

        let result = stream.next().await;
        assert!(result.is_none()); // Stream should be done
    }

    #[tokio::test]
    async fn test_delayed_stream() {
        let mut stream = delayed_stream(Duration::from_millis(10));

        // Exactly one item, then the stream is finished.
        assert_eq!(stream.next().await, Some(()));
        assert_eq!(stream.next().await, None);
    }

    #[tokio::test]
    async fn test_channel_stream() {
        let (tx, rx) = tokio::sync::mpsc::channel(10);

        tx.send(1).await.unwrap();
        tx.send(2).await.unwrap();
        tx.send(3).await.unwrap();
        drop(tx);

        let mut stream = channel_stream(rx);

        assert_eq!(stream.next().await, Some(1));
        assert_eq!(stream.next().await, Some(2));
        assert_eq!(stream.next().await, Some(3));
        assert_eq!(stream.next().await, None);
    }

    #[tokio::test]
    async fn test_unbounded_channel_stream() {
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();

        for value in 1..=3 {
            tx.send(value).unwrap();
        }
        drop(tx);

        let received: Vec<i32> = unbounded_channel_stream(rx).collect().await;
        assert_eq!(received, vec![1, 2, 3]);
    }

    #[tokio::test]
    async fn test_merge_streams() {
        let (tx_a, rx_a) = tokio::sync::mpsc::channel(4);
        let (tx_b, rx_b) = tokio::sync::mpsc::channel(4);

        tx_a.send(1).await.unwrap();
        tx_a.send(2).await.unwrap();
        tx_b.send(3).await.unwrap();
        drop(tx_a);
        drop(tx_b);

        let sources = vec![
            Box::new(channel_stream(rx_a)) as BoxedInts,
            Box::new(channel_stream(rx_b)) as BoxedInts,
        ];

        // Interleaving between inputs is not specified, so compare sorted.
        let mut merged: Vec<i32> = merge_streams(sources).collect().await;
        merged.sort_unstable();
        assert_eq!(merged, vec![1, 2, 3]);
    }

    #[tokio::test]
    async fn test_merge_streams_empty() {
        let sources: Vec<BoxedInts> = Vec::new();

        let merged: Vec<i32> = merge_streams(sources).collect().await;
        assert!(merged.is_empty());
    }

    #[tokio::test]
    async fn test_watch_stream() {
        let (tx, rx) = tokio::sync::watch::channel(0);
        let mut stream = watch_stream(rx);

        // The initial value (0) is skipped; only the update is observed.
        tx.send(42).unwrap();
        assert_eq!(stream.next().await, Some(42));

        // Dropping the sender ends the stream.
        drop(tx);
        assert_eq!(stream.next().await, None);
    }
}