Skip to main content

atomr_streams/
lifecycle.rs

1//! Lifecycle operators on `Source<T>`.
2//!
3//! Operators: `WatchTermination`, `Monitor`, `Log`. Each one wraps a
4//! source and surfaces side-channel signals (completion, every
5//! element, log line) without altering the elements themselves.
6
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::Arc;
9
10use futures::stream::StreamExt;
11use tokio::sync::oneshot;
12
13use crate::source::Source;
14
15/// `watch_termination(src)` returns the original source plus a
16/// `oneshot::Receiver<()>` that fires when upstream completes
17/// (whether by exhaustion or by the receiver being polled past the
18/// final element).
19///
20pub fn watch_termination<T: Send + 'static>(src: Source<T>) -> (Source<T>, oneshot::Receiver<()>) {
21    let (tx, rx) = oneshot::channel();
22    let inner = src.into_boxed();
23    let mut tx_holder = Some(tx);
24    // `chain` a single synthetic element through a `filter_map` that
25    // (a) drops the synthetic element so downstream sees only real
26    // ones, and (b) fires the `tx` exactly once.
27    let terminator = futures::stream::once(async {}).filter_map(move |()| {
28        if let Some(tx) = tx_holder.take() {
29            let _ = tx.send(());
30        }
31        std::future::ready(None::<T>)
32    });
33    let stream = inner.chain(terminator).boxed();
34    (Source { inner: stream }, rx)
35}
36
37/// `monitor(src, on_each)` — invoke `on_each(&item)` for every
38/// element flowing through, without consuming or transforming it.
39/// Useful for telemetry instrumentation.
40///
41pub fn monitor<T, F>(src: Source<T>, mut on_each: F) -> Source<T>
42where
43    T: Send + 'static,
44    F: FnMut(&T) + Send + 'static,
45{
46    let inner = src.into_boxed();
47    Source { inner: inner.inspect(move |item| on_each(item)).boxed() }
48}
49
50/// `count_elements(src)` — convenience: returns the source unchanged
51/// plus an `Arc<AtomicU64>` that totals every element.
52///
53/// typically expressed as `monitor(. |_| counter.inc())`.
54pub fn count_elements<T: Send + 'static>(src: Source<T>) -> (Source<T>, Arc<AtomicU64>) {
55    let counter = Arc::new(AtomicU64::new(0));
56    let c2 = counter.clone();
57    (
58        monitor(src, move |_| {
59            c2.fetch_add(1, Ordering::Relaxed);
60        }),
61        counter,
62    )
63}
64
65#[cfg(test)]
66mod tests {
67    use super::*;
68    use crate::sink::Sink;
69    use std::time::Duration;
70
71    #[tokio::test]
72    async fn watch_termination_fires_when_source_exhausts() {
73        let s = Source::from_iter(vec![1, 2, 3]);
74        let (src, term) = watch_termination(s);
75        let collected = Sink::collect(src).await;
76        assert_eq!(collected, vec![1, 2, 3]);
77        tokio::time::timeout(Duration::from_millis(100), term)
78            .await
79            .expect("termination signal not received")
80            .unwrap();
81    }
82
83    #[tokio::test]
84    async fn monitor_observes_every_element() {
85        let s = Source::from_iter(vec![10, 20, 30]);
86        let observed = Arc::new(parking_lot::Mutex::new(Vec::<i32>::new()));
87        let o2 = observed.clone();
88        let m = monitor(s, move |x| o2.lock().push(*x));
89        let collected = Sink::collect(m).await;
90        assert_eq!(collected, vec![10, 20, 30]);
91        assert_eq!(*observed.lock(), vec![10, 20, 30]);
92    }
93
94    #[tokio::test]
95    async fn count_elements_totals_emitted_items() {
96        let s = Source::from_iter(0..100i32);
97        let (src, counter) = count_elements(s);
98        let _ = Sink::collect(src).await;
99        assert_eq!(counter.load(Ordering::Relaxed), 100);
100    }
101}