1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
use futures_util::{
future::{select, Either},
stream::FuturesUnordered,
StreamExt,
};
use stop_token::future::FutureExt as _;
use super::*;
/// Background processor for streams
/// Handles streams to completion, passing each item from the stream to a callback
pub struct DeferredStreamProcessor {
pub opt_deferred_stream_channel: Option<flume::Sender<SendPinBoxFuture<()>>>,
pub opt_stopper: Option<StopSource>,
pub opt_join_handle: Option<MustJoinHandle<()>>,
}
impl DeferredStreamProcessor {
/// Create a new DeferredStreamProcessor
pub fn new() -> Self {
Self {
opt_deferred_stream_channel: None,
opt_stopper: None,
opt_join_handle: None,
}
}
/// Initialize the processor before use
pub async fn init(&mut self) {
let stopper = StopSource::new();
let stop_token = stopper.token();
self.opt_stopper = Some(stopper);
let (dsc_tx, dsc_rx) = flume::unbounded::<SendPinBoxFuture<()>>();
self.opt_deferred_stream_channel = Some(dsc_tx);
self.opt_join_handle = Some(spawn(
"deferred stream processor",
Self::processor(stop_token, dsc_rx),
));
}
/// Terminate the processor and ensure all streams are closed
pub async fn terminate(&mut self) {
drop(self.opt_deferred_stream_channel.take());
drop(self.opt_stopper.take());
if let Some(jh) = self.opt_join_handle.take() {
jh.await;
}
}
async fn processor(stop_token: StopToken, dsc_rx: flume::Receiver<SendPinBoxFuture<()>>) {
let mut unord = FuturesUnordered::<SendPinBoxFuture<()>>::new();
// Ensure the unord never finishes
unord.push(Box::pin(std::future::pending()));
// Processor loop
let mut unord_fut = unord.next();
let mut dsc_fut = dsc_rx.recv_async();
while let Ok(res) = select(unord_fut, dsc_fut)
.timeout_at(stop_token.clone())
.await
{
match res {
Either::Left((x, old_dsc_fut)) => {
// Unord future processor should never get empty
assert!(x.is_some());
// Make another unord future to process
unord_fut = unord.next();
// put back the other future and keep going
dsc_fut = old_dsc_fut;
}
Either::Right((new_proc, old_unord_fut)) => {
// Immediately drop the old unord future
// because we never care about it completing
drop(old_unord_fut);
let Ok(new_proc) = new_proc else {
break;
};
// Add a new stream to process
unord.push(new_proc);
// Make a new unord future because we don't care about the
// completion of the last unord future, they never return
// anything.
unord_fut = unord.next();
// Make a new receiver future
dsc_fut = dsc_rx.recv_async();
}
}
}
}
/// Queue a stream to process in the background
///
/// * 'receiver' is the stream to process
/// * 'handler' is the callback to handle each item from the stream
///
/// Returns 'true' if the stream was added for processing, and 'false' if the stream could not be added, possibly due to not being initialized.
pub fn add<T: Send + 'static>(
&mut self,
receiver: flume::Receiver<T>,
mut handler: impl FnMut(T) -> SendPinBoxFuture<bool> + Send + 'static,
) -> bool {
let Some(st) = self.opt_stopper.as_ref().map(|s| s.token()) else {
return false;
};
let Some(dsc_tx) = self.opt_deferred_stream_channel.clone() else {
return false;
};
let drp = Box::pin(async move {
while let Ok(Ok(res)) = receiver.recv_async().timeout_at(st.clone()).await {
if !handler(res).await {
break;
}
}
});
if dsc_tx.send(drp).is_err() {
return false;
}
true
}
}
impl Default for DeferredStreamProcessor {
fn default() -> Self {
Self::new()
}
}