tokio_process_tools/
inspector.rs

1use thiserror::Error;
2use tokio::sync::oneshot::Sender;
3use tokio::task::JoinHandle;
4
5/// Errors that can occur when inspecting stream data.
6#[derive(Debug, Error)]
7pub enum InspectorError {
8    /// The inspector task could not be joined/terminated.
9    #[error("The inspector task could not be joined/terminated: {0}")]
10    TaskJoin(#[from] tokio::task::JoinError),
11}
12
13/// A collector for stream data, inspecting it chunk by chunk.
14///
15/// See the `inspect_*` functions on `BroadcastOutputStream` and `SingleOutputStream`.
16///
17/// For proper cleanup, call
18/// - `wait()`, which waits for the collection task to complete.
19/// - `cancel()`, which sends a termination signal and then waits for the collection task to complete.
20///
21/// If not cleaned up, the termination signal will be sent when dropping this collector,
22/// but the task will be aborted (forceful, not waiting for its regular completion).
23pub struct Inspector {
24    pub(crate) task: Option<JoinHandle<()>>,
25    pub(crate) task_termination_sender: Option<Sender<()>>,
26}
27
28impl Inspector {
29    /// Checks if this task has finished.
30    pub fn is_finished(&self) -> bool {
31        self.task.as_ref().map(|t| t.is_finished()).unwrap_or(true)
32    }
33
34    /// Wait for the inspector to terminate naturally.
35    ///
36    /// An inspector will automatically terminate when either:
37    ///
38    /// 1. The underlying write-side of the stream is dropped.
39    /// 2. The underlying stream is closed (by receiving an EOF / final read of 0 bytes).
40    /// 3. The first `Next::Break` is observed.
41    ///
42    /// If none of these may occur in your case, this could/will hang forever!
43    pub async fn wait(mut self) -> Result<(), InspectorError> {
44        // Take the `task_termination_sender`. Let's make sure nobody can ever interfere with us
45        // waiting here. DO NOT drop it, or the task will terminate (at least if it also takes the
46        // receive-error as a signal to terminate)!
47        let tts = self.task_termination_sender.take();
48
49        let result = self
50            .task
51            .take()
52            .expect("`task` to be present.")
53            .await
54            .map_err(InspectorError::TaskJoin);
55
56        // Drop the termination sender, we don't need it. Task is now terminated.
57        drop(tts);
58
59        result
60    }
61
62    /// Sends a cancellation event to the inspector, letting it shut down.
63    pub async fn cancel(mut self) -> Result<(), InspectorError> {
64        // We ignore any potential error here.
65        // Sending may fail if the task is already terminated (for example, by reaching EOF),
66        // which in turn dropped the receiver end!
67        let _res = self
68            .task_termination_sender
69            .take()
70            .expect("`task_termination_sender` to be present.")
71            .send(());
72
73        self.wait().await
74    }
75}
76
77impl Drop for Inspector {
78    fn drop(&mut self) {
79        if let Some(task_termination_sender) = self.task_termination_sender.take() {
80            // We ignore any potential error here.
81            // Sending may fail if the task is already terminated (for example, by reaching EOF),
82            // which in turn dropped the receiver end!
83            let _res = task_termination_sender.send(());
84        }
85        if let Some(task) = self.task.take() {
86            task.abort();
87        }
88    }
89}