1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
use futures_util::{
    future::{select, Either},
    stream::FuturesUnordered,
    StreamExt,
};
use stop_token::future::FutureExt as _;

use super::*;

/// Background processor for streams
/// Handles streams to completion, passing each item from the stream to a callback
pub struct DeferredStreamProcessor {
    pub opt_deferred_stream_channel: Option<flume::Sender<SendPinBoxFuture<()>>>,
    pub opt_stopper: Option<StopSource>,
    pub opt_join_handle: Option<MustJoinHandle<()>>,
}

impl DeferredStreamProcessor {
    /// Create a new DeferredStreamProcessor
    pub fn new() -> Self {
        Self {
            opt_deferred_stream_channel: None,
            opt_stopper: None,
            opt_join_handle: None,
        }
    }

    /// Initialize the processor before use
    pub async fn init(&mut self) {
        let stopper = StopSource::new();
        let stop_token = stopper.token();
        self.opt_stopper = Some(stopper);
        let (dsc_tx, dsc_rx) = flume::unbounded::<SendPinBoxFuture<()>>();
        self.opt_deferred_stream_channel = Some(dsc_tx);
        self.opt_join_handle = Some(spawn(
            "deferred stream processor",
            Self::processor(stop_token, dsc_rx),
        ));
    }

    /// Terminate the processor and ensure all streams are closed
    pub async fn terminate(&mut self) {
        drop(self.opt_deferred_stream_channel.take());
        drop(self.opt_stopper.take());
        if let Some(jh) = self.opt_join_handle.take() {
            jh.await;
        }
    }

    async fn processor(stop_token: StopToken, dsc_rx: flume::Receiver<SendPinBoxFuture<()>>) {
        let mut unord = FuturesUnordered::<SendPinBoxFuture<()>>::new();

        // Ensure the unord never finishes
        unord.push(Box::pin(std::future::pending()));

        // Processor loop
        let mut unord_fut = unord.next();
        let mut dsc_fut = dsc_rx.recv_async();
        while let Ok(res) = select(unord_fut, dsc_fut)
            .timeout_at(stop_token.clone())
            .await
        {
            match res {
                Either::Left((x, old_dsc_fut)) => {
                    // Unord future processor should never get empty
                    assert!(x.is_some());

                    // Make another unord future to process
                    unord_fut = unord.next();
                    // put back the other future and keep going
                    dsc_fut = old_dsc_fut;
                }
                Either::Right((new_proc, old_unord_fut)) => {
                    // Immediately drop the old unord future
                    // because we never care about it completing
                    drop(old_unord_fut);
                    let Ok(new_proc) = new_proc else {
                        break;
                    };

                    // Add a new stream to process
                    unord.push(new_proc);

                    // Make a new unord future because we don't care about the
                    // completion of the last unord future, they never return
                    // anything.
                    unord_fut = unord.next();
                    // Make a new receiver future
                    dsc_fut = dsc_rx.recv_async();
                }
            }
        }
    }

    /// Queue a stream to process in the background
    ///
    /// * 'receiver' is the stream to process
    /// * 'handler' is the callback to handle each item from the stream
    ///
    /// Returns 'true' if the stream was added for processing, and 'false' if the stream could not be added, possibly due to not being initialized.
    pub fn add<T: Send + 'static>(
        &mut self,
        receiver: flume::Receiver<T>,
        mut handler: impl FnMut(T) -> SendPinBoxFuture<bool> + Send + 'static,
    ) -> bool {
        let Some(st) = self.opt_stopper.as_ref().map(|s| s.token()) else {
            return false;
        };
        let Some(dsc_tx) = self.opt_deferred_stream_channel.clone() else {
            return false;
        };
        let drp = Box::pin(async move {
            while let Ok(Ok(res)) = receiver.recv_async().timeout_at(st.clone()).await {
                if !handler(res).await {
                    break;
                }
            }
        });
        if dsc_tx.send(drp).is_err() {
            return false;
        }
        true
    }
}

impl Default for DeferredStreamProcessor {
    fn default() -> Self {
        Self::new()
    }
}