veilid_tools/
deferred_stream_processor.rs

1use futures_util::{
2    future::{select, Either},
3    stream::FuturesUnordered,
4    StreamExt,
5};
6use stop_token::future::FutureExt as _;
7
8use super::*;
9
10#[derive(Debug)]
11struct DeferredStreamProcessorInner {
12    opt_deferred_stream_channel: Option<flume::Sender<PinBoxFutureStatic<()>>>,
13    opt_stopper: Option<StopSource>,
14    opt_join_handle: Option<MustJoinHandle<()>>,
15}
16
17/// Background processor for streams
18/// Handles streams to completion, passing each item from the stream to a callback
19#[derive(Debug)]
20pub struct DeferredStreamProcessor {
21    inner: Mutex<DeferredStreamProcessorInner>,
22}
23
24impl DeferredStreamProcessor {
25    /// Create a new DeferredStreamProcessor
26    #[must_use]
27    pub fn new() -> Self {
28        Self {
29            inner: Mutex::new(DeferredStreamProcessorInner {
30                opt_deferred_stream_channel: None,
31                opt_stopper: None,
32                opt_join_handle: None,
33            }),
34        }
35    }
36
37    /// Initialize the processor before use
38    pub fn init(&self) {
39        let stopper = StopSource::new();
40        let stop_token = stopper.token();
41
42        let mut inner = self.inner.lock();
43        inner.opt_stopper = Some(stopper);
44        let (dsc_tx, dsc_rx) = flume::unbounded::<PinBoxFutureStatic<()>>();
45        inner.opt_deferred_stream_channel = Some(dsc_tx);
46        inner.opt_join_handle = Some(spawn(
47            "deferred stream processor",
48            Self::processor(stop_token, dsc_rx),
49        ));
50    }
51
52    /// Terminate the processor and ensure all streams are closed
53    pub async fn terminate(&self) {
54        let opt_jh = {
55            let mut inner = self.inner.lock();
56            drop(inner.opt_deferred_stream_channel.take());
57            drop(inner.opt_stopper.take());
58            inner.opt_join_handle.take()
59        };
60        if let Some(jh) = opt_jh {
61            jh.await;
62        }
63    }
64
65    async fn processor(stop_token: StopToken, dsc_rx: flume::Receiver<PinBoxFutureStatic<()>>) {
66        let mut unord = FuturesUnordered::<PinBoxFutureStatic<()>>::new();
67
68        // Ensure the unord never finishes
69        unord.push(Box::pin(std::future::pending()));
70
71        // Processor loop
72        let mut unord_fut = unord.next();
73        let mut dsc_fut = dsc_rx.recv_async();
74        while let Ok(res) = select(unord_fut, dsc_fut)
75            .timeout_at(stop_token.clone())
76            .await
77        {
78            match res {
79                Either::Left((x, old_dsc_fut)) => {
80                    // Unord future processor should never get empty
81                    assert!(x.is_some());
82
83                    // Make another unord future to process
84                    unord_fut = unord.next();
85                    // put back the other future and keep going
86                    dsc_fut = old_dsc_fut;
87                }
88                Either::Right((new_proc, old_unord_fut)) => {
89                    // Immediately drop the old unord future
90                    // because we never care about it completing
91                    drop(old_unord_fut);
92                    let Ok(new_proc) = new_proc else {
93                        break;
94                    };
95
96                    // Add a new stream to process
97                    unord.push(new_proc);
98
99                    // Make a new unord future because we don't care about the
100                    // completion of the last unord future, they never return
101                    // anything.
102                    unord_fut = unord.next();
103                    // Make a new receiver future
104                    dsc_fut = dsc_rx.recv_async();
105                }
106            }
107        }
108    }
109
110    /// Queue a stream to process in the background
111    ///
112    /// * 'receiver' is the stream to process
113    /// * 'handler' is the callback to handle each item from the stream
114    ///
115    /// Returns 'true' if the stream was added for processing, and 'false' if the stream could not be added, possibly due to not being initialized.
116    pub fn add<T: Send + 'static, S: futures_util::Stream<Item = T> + Unpin + Send + 'static>(
117        &self,
118        mut receiver: S,
119        mut handler: impl FnMut(T) -> PinBoxFutureStatic<bool> + Send + 'static,
120    ) -> bool {
121        let (st, dsc_tx) = {
122            let inner = self.inner.lock();
123            let Some(st) = inner.opt_stopper.as_ref().map(|s| s.token()) else {
124                return false;
125            };
126            let Some(dsc_tx) = inner.opt_deferred_stream_channel.clone() else {
127                return false;
128            };
129            (st, dsc_tx)
130        };
131        let drp = Box::pin(async move {
132            while let Ok(Some(res)) = receiver.next().timeout_at(st.clone()).await {
133                if !handler(res).await {
134                    break;
135                }
136            }
137        });
138        if dsc_tx.send(drp).is_err() {
139            return false;
140        }
141        true
142    }
143}
144
145impl Default for DeferredStreamProcessor {
146    fn default() -> Self {
147        Self::new()
148    }
149}