atomr_streams/
lifecycle.rs1use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::Arc;
10
11use futures::stream::StreamExt;
12use tokio::sync::oneshot;
13
14use crate::source::Source;
15
16pub fn watch_termination<T: Send + 'static>(src: Source<T>) -> (Source<T>, oneshot::Receiver<()>) {
23 let (tx, rx) = oneshot::channel();
24 let inner = src.into_boxed();
25 let mut tx_holder = Some(tx);
26 let terminator = futures::stream::once(async {}).filter_map(move |()| {
30 if let Some(tx) = tx_holder.take() {
31 let _ = tx.send(());
32 }
33 std::future::ready(None::<T>)
34 });
35 let stream = inner.chain(terminator).boxed();
36 (Source { inner: stream }, rx)
37}
38
39pub fn monitor<T, F>(src: Source<T>, mut on_each: F) -> Source<T>
45where
46 T: Send + 'static,
47 F: FnMut(&T) + Send + 'static,
48{
49 let inner = src.into_boxed();
50 Source { inner: inner.inspect(move |item| on_each(item)).boxed() }
51}
52
53pub fn count_elements<T: Send + 'static>(src: Source<T>) -> (Source<T>, Arc<AtomicU64>) {
58 let counter = Arc::new(AtomicU64::new(0));
59 let c2 = counter.clone();
60 (
61 monitor(src, move |_| {
62 c2.fetch_add(1, Ordering::Relaxed);
63 }),
64 counter,
65 )
66}
67
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sink::Sink;
    use std::time::Duration;

    /// Draining a watched source yields all elements and then resolves the
    /// termination receiver.
    #[tokio::test]
    async fn watch_termination_fires_when_source_exhausts() {
        let (watched, done) = watch_termination(Source::from_iter(vec![1, 2, 3]));
        let items = Sink::collect(watched).await;
        assert_eq!(items, vec![1, 2, 3]);

        // Bound the wait so a missing signal fails the test instead of hanging it.
        let signal = tokio::time::timeout(Duration::from_millis(100), done).await;
        signal.expect("termination signal not received").unwrap();
    }

    /// The monitor callback sees each element, in order, while the elements
    /// still reach the sink unchanged.
    #[tokio::test]
    async fn monitor_observes_every_element() {
        let seen = Arc::new(parking_lot::Mutex::new(Vec::<i32>::new()));
        let recorder = Arc::clone(&seen);
        let monitored = monitor(Source::from_iter(vec![10, 20, 30]), move |value| {
            recorder.lock().push(*value)
        });

        let items = Sink::collect(monitored).await;
        assert_eq!(items, vec![10, 20, 30]);
        assert_eq!(*seen.lock(), vec![10, 20, 30]);
    }

    /// After the source is fully drained the counter equals the number of
    /// emitted elements.
    #[tokio::test]
    async fn count_elements_totals_emitted_items() {
        let (counted, total) = count_elements(Source::from_iter(0..100i32));
        let _ = Sink::collect(counted).await;
        assert_eq!(total.load(Ordering::Relaxed), 100);
    }
}