tokio_process_tools/
inspector.rs

1use thiserror::Error;
2use tokio::sync::oneshot::Sender;
3use tokio::task::JoinHandle;
4
5#[derive(Debug, Error)]
6pub enum InspectorError {
7    #[error("The inspector task could not be joined/terminated: {0}")]
8    TaskJoin(#[from] tokio::task::JoinError),
9}
10
11/// A collector for stream data, inspecting it chunk by chunk.
12///
13/// See the `inspect_*` functions on `BroadcastOutputStream` and `SingleOutputStream`.
14///
15/// For proper cleanup, call
16/// - `wait()`, which waits for the collection task to complete.
17/// - `cancel()`, which sends a termination signal and then waits for the collection task to complete.
18///
19/// If not cleaned up, the termination signal will be sent when dropping this collector,
20/// but the task will be aborted (forceful, not waiting for its regular completion).
21pub struct Inspector {
22    pub(crate) task: Option<JoinHandle<()>>,
23    pub(crate) task_termination_sender: Option<Sender<()>>,
24}
25
26impl Inspector {
27    /// Checks if this task has finished.
28    pub fn is_finished(&self) -> bool {
29        self.task.as_ref().map(|t| t.is_finished()).unwrap_or(true)
30    }
31
32    /// Wait for the inspector to terminate naturally.
33    ///
34    /// An inspector will automatically terminate when either:
35    ///
36    /// 1. The underlying write-side of the stream is dropped.
37    /// 2. The underlying stream is closed (by receiving an EOF / final read of 0 bytes).
38    /// 3. The first `Next::Break` is observed.
39    ///
40    /// If none of these may occur in your case, this could/will hang forever!
41    pub async fn wait(mut self) -> Result<(), InspectorError> {
42        // Take the `task_termination_sender`. Let's make sure nobody can ever interfere with us
43        // waiting here. DO NOT drop it, or the task will terminate (at least if it also takes the
44        // receive-error as a signal to terminate)!
45        let tts = self.task_termination_sender.take();
46
47        let result = self
48            .task
49            .take()
50            .expect("`task` to be present.")
51            .await
52            .map_err(InspectorError::TaskJoin);
53
54        // Drop the termination sender, we don't need it. Task is now terminated.
55        drop(tts);
56
57        result
58    }
59
60    /// Sends a cancellation event to the inspector, letting it shut down.
61    pub async fn cancel(mut self) -> Result<(), InspectorError> {
62        // We ignore any potential error here.
63        // Sending may fail if the task is already terminated (for example, by reaching EOF),
64        // which in turn dropped the receiver end!
65        let _res = self
66            .task_termination_sender
67            .take()
68            .expect("`task_termination_sender` to be present.")
69            .send(());
70
71        self.wait().await
72    }
73}
74
75impl Drop for Inspector {
76    fn drop(&mut self) {
77        if let Some(task_termination_sender) = self.task_termination_sender.take() {
78            // We ignore any potential error here.
79            // Sending may fail if the task is already terminated (for example, by reaching EOF),
80            // which in turn dropped the receiver end!
81            let _res = task_termination_sender.send(());
82        }
83        if let Some(task) = self.task.take() {
84            task.abort();
85        }
86    }
87}