veilid_tools/
deferred_stream_processor.rs

1use futures_util::{
2    future::{select, Either},
3    stream::FuturesUnordered,
4    StreamExt,
5};
6use stop_token::future::FutureExt as _;
7
8use super::*;
9
10#[derive(Debug)]
11struct DeferredStreamProcessorInner {
12    opt_deferred_stream_channel: Option<flume::Sender<SendPinBoxFuture<()>>>,
13    opt_stopper: Option<StopSource>,
14    opt_join_handle: Option<MustJoinHandle<()>>,
15}
16
17/// Background processor for streams
18/// Handles streams to completion, passing each item from the stream to a callback
19#[derive(Debug)]
20pub struct DeferredStreamProcessor {
21    inner: Mutex<DeferredStreamProcessorInner>,
22}
23
24impl DeferredStreamProcessor {
25    /// Create a new DeferredStreamProcessor
26    pub fn new() -> Self {
27        Self {
28            inner: Mutex::new(DeferredStreamProcessorInner {
29                opt_deferred_stream_channel: None,
30                opt_stopper: None,
31                opt_join_handle: None,
32            }),
33        }
34    }
35
36    /// Initialize the processor before use
37    pub async fn init(&self) {
38        let stopper = StopSource::new();
39        let stop_token = stopper.token();
40
41        let mut inner = self.inner.lock();
42        inner.opt_stopper = Some(stopper);
43        let (dsc_tx, dsc_rx) = flume::unbounded::<SendPinBoxFuture<()>>();
44        inner.opt_deferred_stream_channel = Some(dsc_tx);
45        inner.opt_join_handle = Some(spawn(
46            "deferred stream processor",
47            Self::processor(stop_token, dsc_rx),
48        ));
49    }
50
51    /// Terminate the processor and ensure all streams are closed
52    pub async fn terminate(&self) {
53        let opt_jh = {
54            let mut inner = self.inner.lock();
55            drop(inner.opt_deferred_stream_channel.take());
56            drop(inner.opt_stopper.take());
57            inner.opt_join_handle.take()
58        };
59        if let Some(jh) = opt_jh {
60            jh.await;
61        }
62    }
63
64    async fn processor(stop_token: StopToken, dsc_rx: flume::Receiver<SendPinBoxFuture<()>>) {
65        let mut unord = FuturesUnordered::<SendPinBoxFuture<()>>::new();
66
67        // Ensure the unord never finishes
68        unord.push(Box::pin(std::future::pending()));
69
70        // Processor loop
71        let mut unord_fut = unord.next();
72        let mut dsc_fut = dsc_rx.recv_async();
73        while let Ok(res) = select(unord_fut, dsc_fut)
74            .timeout_at(stop_token.clone())
75            .await
76        {
77            match res {
78                Either::Left((x, old_dsc_fut)) => {
79                    // Unord future processor should never get empty
80                    assert!(x.is_some());
81
82                    // Make another unord future to process
83                    unord_fut = unord.next();
84                    // put back the other future and keep going
85                    dsc_fut = old_dsc_fut;
86                }
87                Either::Right((new_proc, old_unord_fut)) => {
88                    // Immediately drop the old unord future
89                    // because we never care about it completing
90                    drop(old_unord_fut);
91                    let Ok(new_proc) = new_proc else {
92                        break;
93                    };
94
95                    // Add a new stream to process
96                    unord.push(new_proc);
97
98                    // Make a new unord future because we don't care about the
99                    // completion of the last unord future, they never return
100                    // anything.
101                    unord_fut = unord.next();
102                    // Make a new receiver future
103                    dsc_fut = dsc_rx.recv_async();
104                }
105            }
106        }
107    }
108
109    /// Queue a stream to process in the background
110    ///
111    /// * 'receiver' is the stream to process
112    /// * 'handler' is the callback to handle each item from the stream
113    ///
114    /// Returns 'true' if the stream was added for processing, and 'false' if the stream could not be added, possibly due to not being initialized.
115    pub fn add<T: Send + 'static, S: futures_util::Stream<Item = T> + Unpin + Send + 'static>(
116        &self,
117        mut receiver: S,
118        mut handler: impl FnMut(T) -> SendPinBoxFuture<bool> + Send + 'static,
119    ) -> bool {
120        let (st, dsc_tx) = {
121            let inner = self.inner.lock();
122            let Some(st) = inner.opt_stopper.as_ref().map(|s| s.token()) else {
123                return false;
124            };
125            let Some(dsc_tx) = inner.opt_deferred_stream_channel.clone() else {
126                return false;
127            };
128            (st, dsc_tx)
129        };
130        let drp = Box::pin(async move {
131            while let Ok(Some(res)) = receiver.next().timeout_at(st.clone()).await {
132                if !handler(res).await {
133                    break;
134                }
135            }
136        });
137        if dsc_tx.send(drp).is_err() {
138            return false;
139        }
140        true
141    }
142}
143
144impl Default for DeferredStreamProcessor {
145    fn default() -> Self {
146        Self::new()
147    }
148}