Skip to main content

veilid_tools/
deferred_stream_processor.rs

1use futures_util::{
2    future::{select, Either},
3    stream::FuturesUnordered,
4    StreamExt,
5};
6use stop_token::future::FutureExt as _;
7
8use super::*;
9
10#[derive(Debug)]
11struct DeferredStreamProcessorInner {
12    opt_deferred_stream_channel: Option<flume::Sender<PinBoxFutureStatic<()>>>,
13    opt_stopper: Option<StopSource>,
14    opt_join_handle: Option<MustJoinHandle<()>>,
15}
16
/// Verdict returned by a stream item handler after it has processed one item.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DeferredStreamResult {
    /// Stop processing the stream; no further items are pulled from it.
    Done,
    /// Keep going and pull the next item from the stream.
    Continue,
}
22
23/// Background processor for streams
24/// Handles streams to completion, passing each item from the stream to a callback
25#[derive(Debug)]
26pub struct DeferredStreamProcessor {
27    inner: Mutex<DeferredStreamProcessorInner>,
28}
29
30impl DeferredStreamProcessor {
31    /// Create a new DeferredStreamProcessor
32    #[must_use]
33    pub fn new() -> Self {
34        Self {
35            inner: Mutex::new(DeferredStreamProcessorInner {
36                opt_deferred_stream_channel: None,
37                opt_stopper: None,
38                opt_join_handle: None,
39            }),
40        }
41    }
42
43    /// Initialize the processor before use
44    pub fn init(&self) {
45        let stopper = StopSource::new();
46        let stop_token = stopper.token();
47
48        let mut inner = self.inner.lock();
49        inner.opt_stopper = Some(stopper);
50        let (dsc_tx, dsc_rx) = flume::unbounded::<PinBoxFutureStatic<()>>();
51        inner.opt_deferred_stream_channel = Some(dsc_tx);
52        inner.opt_join_handle = Some(spawn(
53            "deferred stream processor",
54            Self::processor(stop_token, dsc_rx),
55        ));
56    }
57
58    /// Terminate the processor and ensure all streams are closed
59    pub async fn terminate(&self) {
60        let opt_jh = {
61            let mut inner = self.inner.lock();
62            drop(inner.opt_deferred_stream_channel.take());
63            drop(inner.opt_stopper.take());
64            inner.opt_join_handle.take()
65        };
66        if let Some(jh) = opt_jh {
67            jh.await;
68        }
69    }
70
71    async fn processor(stop_token: StopToken, dsc_rx: flume::Receiver<PinBoxFutureStatic<()>>) {
72        let mut unord = FuturesUnordered::<PinBoxFutureStatic<()>>::new();
73
74        // Ensure the unord never finishes unless the stop source got dropped
75        unord.push(Box::pin(stop_token));
76
77        // Processor loop
78        let mut unord_fut = unord.next();
79        let mut dsc_fut = dsc_rx.recv_async();
80        loop {
81            let res = select(unord_fut, dsc_fut).await;
82            match res {
83                Either::Left((x, old_dsc_fut)) => {
84                    // If the unord future gets empty, the stop token got dropped and all the other tasks finished
85                    if x.is_none() {
86                        break;
87                    }
88
89                    // Make another unord future to process
90                    unord_fut = unord.next();
91                    // put back the other future and keep going
92                    dsc_fut = old_dsc_fut;
93                }
94                Either::Right((new_proc, old_unord_fut)) => {
95                    // Immediately drop the old unord future
96                    // because we never care about it completing
97                    drop(old_unord_fut);
98                    let Ok(new_proc) = new_proc else {
99                        break;
100                    };
101
102                    // Add a new stream to process
103                    unord.push(new_proc);
104
105                    // Make a new unord future because we don't care about the
106                    // completion of the last unord future, they never return
107                    // anything.
108                    unord_fut = unord.next();
109                    // Make a new receiver future
110                    dsc_fut = dsc_rx.recv_async();
111                }
112            }
113        }
114    }
115
116    /// Queue a stream to process in the background
117    ///
118    /// * 'receiver' is the stream to process
119    /// * 'handler' is the callback to handle each item from the stream
120    ///
121    /// Returns 'true' if the stream was added for processing, and 'false' if the stream could not be added, possibly due to not being initialized.
122    pub fn add_stream<
123        T: Send + 'static,
124        S: futures_util::Stream<Item = T> + Unpin + Send + 'static,
125    >(
126        &self,
127        mut receiver: S,
128        mut handler: impl FnMut(T) -> PinBoxFutureStatic<DeferredStreamResult> + Send + 'static,
129    ) -> bool {
130        let (st, dsc_tx) = {
131            let inner = self.inner.lock();
132            let Some(st) = inner.opt_stopper.as_ref().map(|s| s.token()) else {
133                return false;
134            };
135            let Some(dsc_tx) = inner.opt_deferred_stream_channel.clone() else {
136                return false;
137            };
138            (st, dsc_tx)
139        };
140        let drp = Box::pin(async move {
141            while let Ok(Some(res)) = receiver.next().timeout_at(st.clone()).await {
142                if matches!(handler(res).await, DeferredStreamResult::Done) {
143                    break;
144                }
145            }
146        });
147        if dsc_tx.send(drp).is_err() {
148            return false;
149        }
150        true
151    }
152
153    /// Queue a single future to process in the background
154    pub fn add_future<F>(&self, fut: F) -> bool
155    where
156        F: Future<Output = ()> + Send + 'static,
157    {
158        let dsc_tx = {
159            let inner = self.inner.lock();
160            let Some(dsc_tx) = inner.opt_deferred_stream_channel.clone() else {
161                return false;
162            };
163            dsc_tx
164        };
165        if dsc_tx.send(Box::pin(fut)).is_err() {
166            return false;
167        }
168        true
169    }
170}
171
172impl Default for DeferredStreamProcessor {
173    fn default() -> Self {
174        Self::new()
175    }
176}