veilid_tools/
deferred_stream_processor.rs1use futures_util::{
2 future::{select, Either},
3 stream::FuturesUnordered,
4 StreamExt,
5};
6use stop_token::future::FutureExt as _;
7
8use super::*;
9
10#[derive(Debug)]
11struct DeferredStreamProcessorInner {
12 opt_deferred_stream_channel: Option<flume::Sender<PinBoxFutureStatic<()>>>,
13 opt_stopper: Option<StopSource>,
14 opt_join_handle: Option<MustJoinHandle<()>>,
15}
16
17#[derive(Debug, Clone, Copy, Eq, PartialEq)]
18pub enum DeferredStreamResult {
19 Done,
20 Continue,
21}
22
23#[derive(Debug)]
26pub struct DeferredStreamProcessor {
27 inner: Mutex<DeferredStreamProcessorInner>,
28}
29
30impl DeferredStreamProcessor {
31 #[must_use]
33 pub fn new() -> Self {
34 Self {
35 inner: Mutex::new(DeferredStreamProcessorInner {
36 opt_deferred_stream_channel: None,
37 opt_stopper: None,
38 opt_join_handle: None,
39 }),
40 }
41 }
42
43 pub fn init(&self) {
45 let stopper = StopSource::new();
46 let stop_token = stopper.token();
47
48 let mut inner = self.inner.lock();
49 inner.opt_stopper = Some(stopper);
50 let (dsc_tx, dsc_rx) = flume::unbounded::<PinBoxFutureStatic<()>>();
51 inner.opt_deferred_stream_channel = Some(dsc_tx);
52 inner.opt_join_handle = Some(spawn(
53 "deferred stream processor",
54 Self::processor(stop_token, dsc_rx),
55 ));
56 }
57
58 pub async fn terminate(&self) {
60 let opt_jh = {
61 let mut inner = self.inner.lock();
62 drop(inner.opt_deferred_stream_channel.take());
63 drop(inner.opt_stopper.take());
64 inner.opt_join_handle.take()
65 };
66 if let Some(jh) = opt_jh {
67 jh.await;
68 }
69 }
70
71 async fn processor(stop_token: StopToken, dsc_rx: flume::Receiver<PinBoxFutureStatic<()>>) {
72 let mut unord = FuturesUnordered::<PinBoxFutureStatic<()>>::new();
73
74 unord.push(Box::pin(stop_token));
76
77 let mut unord_fut = unord.next();
79 let mut dsc_fut = dsc_rx.recv_async();
80 loop {
81 let res = select(unord_fut, dsc_fut).await;
82 match res {
83 Either::Left((x, old_dsc_fut)) => {
84 if x.is_none() {
86 break;
87 }
88
89 unord_fut = unord.next();
91 dsc_fut = old_dsc_fut;
93 }
94 Either::Right((new_proc, old_unord_fut)) => {
95 drop(old_unord_fut);
98 let Ok(new_proc) = new_proc else {
99 break;
100 };
101
102 unord.push(new_proc);
104
105 unord_fut = unord.next();
109 dsc_fut = dsc_rx.recv_async();
111 }
112 }
113 }
114 }
115
116 pub fn add_stream<
123 T: Send + 'static,
124 S: futures_util::Stream<Item = T> + Unpin + Send + 'static,
125 >(
126 &self,
127 mut receiver: S,
128 mut handler: impl FnMut(T) -> PinBoxFutureStatic<DeferredStreamResult> + Send + 'static,
129 ) -> bool {
130 let (st, dsc_tx) = {
131 let inner = self.inner.lock();
132 let Some(st) = inner.opt_stopper.as_ref().map(|s| s.token()) else {
133 return false;
134 };
135 let Some(dsc_tx) = inner.opt_deferred_stream_channel.clone() else {
136 return false;
137 };
138 (st, dsc_tx)
139 };
140 let drp = Box::pin(async move {
141 while let Ok(Some(res)) = receiver.next().timeout_at(st.clone()).await {
142 if matches!(handler(res).await, DeferredStreamResult::Done) {
143 break;
144 }
145 }
146 });
147 if dsc_tx.send(drp).is_err() {
148 return false;
149 }
150 true
151 }
152
153 pub fn add_future<F>(&self, fut: F) -> bool
155 where
156 F: Future<Output = ()> + Send + 'static,
157 {
158 let dsc_tx = {
159 let inner = self.inner.lock();
160 let Some(dsc_tx) = inner.opt_deferred_stream_channel.clone() else {
161 return false;
162 };
163 dsc_tx
164 };
165 if dsc_tx.send(Box::pin(fut)).is_err() {
166 return false;
167 }
168 true
169 }
170}
171
172impl Default for DeferredStreamProcessor {
173 fn default() -> Self {
174 Self::new()
175 }
176}