Skip to main content

atomr_streams/
lifecycle.rs

1//! Lifecycle operators on `Source<T>`.
2//!
3//! Phase 12.8 of `docs/full-port-plan.md`. Akka.NET / Akka Streams
4//! parity: `WatchTermination`, `Monitor`, `Log`. Each one wraps a
5//! source and surfaces side-channel signals (completion, every
6//! element, log line) without altering the elements themselves.
7
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::Arc;
10
11use futures::stream::StreamExt;
12use tokio::sync::oneshot;
13
14use crate::source::Source;
15
16/// `watch_termination(src)` returns the original source plus a
17/// `oneshot::Receiver<()>` that fires when upstream completes
18/// (whether by exhaustion or by the receiver being polled past the
19/// final element).
20///
21/// Akka.NET: `Source.WatchTermination`.
22pub fn watch_termination<T: Send + 'static>(src: Source<T>) -> (Source<T>, oneshot::Receiver<()>) {
23    let (tx, rx) = oneshot::channel();
24    let inner = src.into_boxed();
25    let mut tx_holder = Some(tx);
26    // `chain` a single synthetic element through a `filter_map` that
27    // (a) drops the synthetic element so downstream sees only real
28    // ones, and (b) fires the `tx` exactly once.
29    let terminator = futures::stream::once(async {}).filter_map(move |()| {
30        if let Some(tx) = tx_holder.take() {
31            let _ = tx.send(());
32        }
33        std::future::ready(None::<T>)
34    });
35    let stream = inner.chain(terminator).boxed();
36    (Source { inner: stream }, rx)
37}
38
39/// `monitor(src, on_each)` — invoke `on_each(&item)` for every
40/// element flowing through, without consuming or transforming it.
41/// Useful for telemetry instrumentation.
42///
43/// Akka.NET: `Source.Monitor`.
44pub fn monitor<T, F>(src: Source<T>, mut on_each: F) -> Source<T>
45where
46    T: Send + 'static,
47    F: FnMut(&T) + Send + 'static,
48{
49    let inner = src.into_boxed();
50    Source { inner: inner.inspect(move |item| on_each(item)).boxed() }
51}
52
53/// `count_elements(src)` — convenience: returns the source unchanged
54/// plus an `Arc<AtomicU64>` that totals every element.
55///
56/// Akka.NET: typically expressed as `monitor(... |_| counter.inc())`.
57pub fn count_elements<T: Send + 'static>(src: Source<T>) -> (Source<T>, Arc<AtomicU64>) {
58    let counter = Arc::new(AtomicU64::new(0));
59    let c2 = counter.clone();
60    (
61        monitor(src, move |_| {
62            c2.fetch_add(1, Ordering::Relaxed);
63        }),
64        counter,
65    )
66}
67
68#[cfg(test)]
69mod tests {
70    use super::*;
71    use crate::sink::Sink;
72    use std::time::Duration;
73
74    #[tokio::test]
75    async fn watch_termination_fires_when_source_exhausts() {
76        let s = Source::from_iter(vec![1, 2, 3]);
77        let (src, term) = watch_termination(s);
78        let collected = Sink::collect(src).await;
79        assert_eq!(collected, vec![1, 2, 3]);
80        tokio::time::timeout(Duration::from_millis(100), term)
81            .await
82            .expect("termination signal not received")
83            .unwrap();
84    }
85
86    #[tokio::test]
87    async fn monitor_observes_every_element() {
88        let s = Source::from_iter(vec![10, 20, 30]);
89        let observed = Arc::new(parking_lot::Mutex::new(Vec::<i32>::new()));
90        let o2 = observed.clone();
91        let m = monitor(s, move |x| o2.lock().push(*x));
92        let collected = Sink::collect(m).await;
93        assert_eq!(collected, vec![10, 20, 30]);
94        assert_eq!(*observed.lock(), vec![10, 20, 30]);
95    }
96
97    #[tokio::test]
98    async fn count_elements_totals_emitted_items() {
99        let s = Source::from_iter(0..100i32);
100        let (src, counter) = count_elements(s);
101        let _ = Sink::collect(src).await;
102        assert_eq!(counter.load(Ordering::Relaxed), 100);
103    }
104}