Skip to main content

tokio_process_tools/
inspector.rs

1use thiserror::Error;
2use tokio::sync::oneshot::Sender;
3use tokio::task::JoinHandle;
4
5/// Errors that can occur when inspecting stream data.
6#[derive(Debug, Error)]
7pub enum InspectorError {
8    /// The inspector task could not be joined/terminated.
9    #[error("Failed to join/terminate the inspector task over stream '{stream_name}': {source}")]
10    TaskJoin {
11        /// The name of the stream this inspector operates on.
12        stream_name: &'static str,
13
14        /// The source error.
15        #[source]
16        source: tokio::task::JoinError,
17    },
18}
19
20/// An inspector for stream data, inspecting it chunk by chunk.
21///
22/// See the `inspect_*` functions on `BroadcastOutputStream` and `SingleOutputStream`.
23///
24/// For proper cleanup, call
25/// - `wait()`, which waits for the collection task to complete.
26/// - `cancel()`, which sends a termination signal and then waits for the collection task to complete.
27///
28/// If not cleaned up, the termination signal will be sent when dropping this inspector,
29/// but the task will be aborted (forceful, not waiting for its regular completion).
30pub struct Inspector {
31    /// The name of the stream this inspector operates on.
32    pub(crate) stream_name: &'static str,
33
34    pub(crate) task: Option<JoinHandle<()>>,
35    pub(crate) task_termination_sender: Option<Sender<()>>,
36}
37
38impl Inspector {
39    /// Checks if this task has finished.
40    #[must_use]
41    pub fn is_finished(&self) -> bool {
42        self.task
43            .as_ref()
44            .is_none_or(tokio::task::JoinHandle::is_finished)
45    }
46
47    /// Wait for the inspector to terminate naturally.
48    ///
49    /// An inspector will automatically terminate when either:
50    ///
51    /// 1. The underlying write-side of the stream is dropped.
52    /// 2. The underlying stream is closed (by receiving an EOF / final read of 0 bytes).
53    /// 3. The first `Next::Break` is observed.
54    ///
55    /// If none of these may occur in your case, this could/will hang forever!
56    ///
57    /// # Errors
58    ///
59    /// Returns [`InspectorError::TaskJoin`] if the inspector task cannot be joined.
60    ///
61    /// # Panics
62    ///
63    /// Panics if the inspector's internal task has already been taken.
64    pub async fn wait(mut self) -> Result<(), InspectorError> {
65        // Take the `task_termination_sender`. Let's make sure nobody can ever interfere with us
66        // waiting here. DO NOT drop it, or the task will terminate (at least if it also takes the
67        // receive-error as a signal to terminate)!
68        let tts = self.task_termination_sender.take();
69
70        let result = self
71            .task
72            .take()
73            .expect("`task` to be present.")
74            .await
75            .map_err(|err| InspectorError::TaskJoin {
76                stream_name: self.stream_name,
77                source: err,
78            });
79
80        // Drop the termination sender, we don't need it. Task is now terminated.
81        drop(tts);
82
83        result
84    }
85
86    /// Sends a cancellation event to the inspector, letting it shut down.
87    ///
88    /// # Errors
89    ///
90    /// Returns [`InspectorError::TaskJoin`] if the inspector task cannot be joined.
91    ///
92    /// # Panics
93    ///
94    /// Panics if the inspector's internal cancellation sender has already been taken.
95    pub async fn cancel(mut self) -> Result<(), InspectorError> {
96        // We ignore any potential error here.
97        // Sending may fail if the task is already terminated (for example, by reaching EOF),
98        // which in turn dropped the receiver end!
99        let _res = self
100            .task_termination_sender
101            .take()
102            .expect("`task_termination_sender` to be present.")
103            .send(());
104
105        self.wait().await
106    }
107}
108
109impl Drop for Inspector {
110    fn drop(&mut self) {
111        if let Some(task_termination_sender) = self.task_termination_sender.take() {
112            // We ignore any potential error here.
113            // Sending may fail if the task is already terminated (for example, by reaching EOF),
114            // which in turn dropped the receiver end!
115            let _res = task_termination_sender.send(());
116        }
117        if let Some(task) = self.task.take() {
118            task.abort();
119        }
120    }
121}