atomr_streams/
lifecycle.rs1use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::Arc;
9
10use futures::stream::StreamExt;
11use tokio::sync::oneshot;
12
13use crate::source::Source;
14
15pub fn watch_termination<T: Send + 'static>(src: Source<T>) -> (Source<T>, oneshot::Receiver<()>) {
21 let (tx, rx) = oneshot::channel();
22 let inner = src.into_boxed();
23 let mut tx_holder = Some(tx);
24 let terminator = futures::stream::once(async {}).filter_map(move |()| {
28 if let Some(tx) = tx_holder.take() {
29 let _ = tx.send(());
30 }
31 std::future::ready(None::<T>)
32 });
33 let stream = inner.chain(terminator).boxed();
34 (Source { inner: stream }, rx)
35}
36
37pub fn monitor<T, F>(src: Source<T>, mut on_each: F) -> Source<T>
42where
43 T: Send + 'static,
44 F: FnMut(&T) + Send + 'static,
45{
46 let inner = src.into_boxed();
47 Source { inner: inner.inspect(move |item| on_each(item)).boxed() }
48}
49
50pub fn count_elements<T: Send + 'static>(src: Source<T>) -> (Source<T>, Arc<AtomicU64>) {
55 let counter = Arc::new(AtomicU64::new(0));
56 let c2 = counter.clone();
57 (
58 monitor(src, move |_| {
59 c2.fetch_add(1, Ordering::Relaxed);
60 }),
61 counter,
62 )
63}
64
65#[cfg(test)]
66mod tests {
67 use super::*;
68 use crate::sink::Sink;
69 use std::time::Duration;
70
71 #[tokio::test]
72 async fn watch_termination_fires_when_source_exhausts() {
73 let s = Source::from_iter(vec![1, 2, 3]);
74 let (src, term) = watch_termination(s);
75 let collected = Sink::collect(src).await;
76 assert_eq!(collected, vec![1, 2, 3]);
77 tokio::time::timeout(Duration::from_millis(100), term)
78 .await
79 .expect("termination signal not received")
80 .unwrap();
81 }
82
83 #[tokio::test]
84 async fn monitor_observes_every_element() {
85 let s = Source::from_iter(vec![10, 20, 30]);
86 let observed = Arc::new(parking_lot::Mutex::new(Vec::<i32>::new()));
87 let o2 = observed.clone();
88 let m = monitor(s, move |x| o2.lock().push(*x));
89 let collected = Sink::collect(m).await;
90 assert_eq!(collected, vec![10, 20, 30]);
91 assert_eq!(*observed.lock(), vec![10, 20, 30]);
92 }
93
94 #[tokio::test]
95 async fn count_elements_totals_emitted_items() {
96 let s = Source::from_iter(0..100i32);
97 let (src, counter) = count_elements(s);
98 let _ = Sink::collect(src).await;
99 assert_eq!(counter.load(Ordering::Relaxed), 100);
100 }
101}