tokio_process_tools/
collector.rs

1use crate::output_stream::Next;
2use std::fmt::Debug;
3use std::future::Future;
4use std::pin::Pin;
5use thiserror::Error;
6use tokio::sync::oneshot::Sender;
7use tokio::task::JoinHandle;
8
9/// Errors that can occur when collecting stream data.
10#[derive(Debug, Error)]
11pub enum CollectorError {
12    /// The collector task could not be joined/terminated.
13    #[error("The collector task could not be joined/terminated: {0}")]
14    TaskJoin(#[source] tokio::task::JoinError),
15}
16
17/// A trait for types that can act as sinks for collected stream data.
18///
19/// This is automatically implemented for any type that is `Debug + Send + Sync + 'static`.
20pub trait Sink: Debug + Send + Sync + 'static {}
21
22impl<T> Sink for T where T: Debug + Send + Sync + 'static {}
23
24// NOTE: We use Pin<Box> here to force usage of Higher-Rank Trait Bounds (HRTBs).
25// The returned futures will most-likely capture the `&mut T`and are therefore poised
26// by its lifetime. Without the trait-object usage, this would not work.
27pub type AsyncCollectFn<'a> = Pin<Box<dyn Future<Output = Next> + Send + 'a>>;
28
29/// A collector for stream data, inspecting it chunk by chunk but also providing mutable access
30/// to a sink in which the data can be stored.
31///
32/// See the `collect_*` functions on `BroadcastOutputStream` and `SingleOutputStream`.
33///
34/// For proper cleanup, call
35/// - `wait()`, which waits for the collection task to complete.
36/// - `cancel()`, which sends a termination signal and then waits for the collection task to complete.
37///
38/// If not cleaned up, the termination signal will be sent when dropping this collector,
39/// but the task will be aborted (forceful, not waiting for its regular completion).
40pub struct Collector<S: Sink> {
41    pub(crate) task: Option<JoinHandle<S>>,
42    pub(crate) task_termination_sender: Option<Sender<()>>,
43}
44
45impl<S: Sink> Collector<S> {
46    /// Checks if this task has finished.
47    pub fn is_finished(&self) -> bool {
48        self.task.as_ref().map(|t| t.is_finished()).unwrap_or(true)
49    }
50
51    /// Wait for the collector to terminate naturally.
52    ///
53    /// A collector will automatically terminate when either:
54    ///
55    /// 1. The underlying write-side of the stream is dropped.
56    /// 2. The underlying stream is closed (by receiving an EOF / final read of 0 bytes).
57    /// 3. The first `Next::Break` is observed.
58    ///
59    /// If none of these may occur in your case, this could/will hang forever!
60    pub async fn wait(mut self) -> Result<S, CollectorError> {
61        // Take the `task_termination_sender`. Let's make sure nobody can ever interfere with us
62        // waiting here. DO NOT drop it, or the task will terminate (at least if it also takes the
63        // receive-error as a signal to terminate)!
64        let tts = self.task_termination_sender.take();
65
66        let sink = self
67            .task
68            .take()
69            .expect("`task` to be present.")
70            .await
71            .map_err(CollectorError::TaskJoin);
72
73        // Drop the termination sender, we don't need it. Task is now terminated.
74        drop(tts);
75
76        sink
77    }
78
79    /// Sends a cancellation event to the collector, letting it shut down.
80    pub async fn cancel(mut self) -> Result<S, CollectorError> {
81        // We ignore any potential error here.
82        // Sending may fail if the task is already terminated (for example, by reaching EOF),
83        // which in turn dropped the receiver end!
84        let _res = self
85            .task_termination_sender
86            .take()
87            .expect("`task_termination_sender` to be present.")
88            .send(());
89
90        self.wait().await
91    }
92}
93
94impl<S: Sink> Drop for Collector<S> {
95    fn drop(&mut self) {
96        if let Some(task_termination_sender) = self.task_termination_sender.take() {
97            // We ignore any potential error here.
98            // Sending may fail if the task is already terminated (for example, by reaching EOF),
99            // which in turn dropped the receiver end!
100            let _res = task_termination_sender.send(());
101        }
102        if let Some(task) = self.task.take() {
103            task.abort();
104        }
105    }
106}