tokio_process_tools/inspector.rs
1use thiserror::Error;
2use tokio::sync::oneshot::Sender;
3use tokio::task::JoinHandle;
4
5/// Errors that can occur when inspecting stream data.
6#[derive(Debug, Error)]
7pub enum InspectorError {
8 /// The inspector task could not be joined/terminated.
9 #[error("Failed to join/terminate the inspector task over stream '{stream_name}': {source}")]
10 TaskJoin {
11 /// The name of the stream this inspector operates on.
12 stream_name: &'static str,
13
14 /// The source error.
15 #[source]
16 source: tokio::task::JoinError,
17 },
18}
19
20/// An inspector for stream data, inspecting it chunk by chunk.
21///
22/// See the `inspect_*` functions on `BroadcastOutputStream` and `SingleOutputStream`.
23///
24/// For proper cleanup, call
25/// - `wait()`, which waits for the collection task to complete.
26/// - `cancel()`, which sends a termination signal and then waits for the collection task to complete.
27///
28/// If not cleaned up, the termination signal will be sent when dropping this inspector,
29/// but the task will be aborted (forceful, not waiting for its regular completion).
30pub struct Inspector {
31 /// The name of the stream this inspector operates on.
32 pub(crate) stream_name: &'static str,
33
34 pub(crate) task: Option<JoinHandle<()>>,
35 pub(crate) task_termination_sender: Option<Sender<()>>,
36}
37
38impl Inspector {
39 /// Checks if this task has finished.
40 pub fn is_finished(&self) -> bool {
41 self.task.as_ref().map(|t| t.is_finished()).unwrap_or(true)
42 }
43
44 /// Wait for the inspector to terminate naturally.
45 ///
46 /// An inspector will automatically terminate when either:
47 ///
48 /// 1. The underlying write-side of the stream is dropped.
49 /// 2. The underlying stream is closed (by receiving an EOF / final read of 0 bytes).
50 /// 3. The first `Next::Break` is observed.
51 ///
52 /// If none of these may occur in your case, this could/will hang forever!
53 pub async fn wait(mut self) -> Result<(), InspectorError> {
54 // Take the `task_termination_sender`. Let's make sure nobody can ever interfere with us
55 // waiting here. DO NOT drop it, or the task will terminate (at least if it also takes the
56 // receive-error as a signal to terminate)!
57 let tts = self.task_termination_sender.take();
58
59 let result = self
60 .task
61 .take()
62 .expect("`task` to be present.")
63 .await
64 .map_err(|err| InspectorError::TaskJoin {
65 stream_name: self.stream_name,
66 source: err,
67 });
68
69 // Drop the termination sender, we don't need it. Task is now terminated.
70 drop(tts);
71
72 result
73 }
74
75 /// Sends a cancellation event to the inspector, letting it shut down.
76 pub async fn cancel(mut self) -> Result<(), InspectorError> {
77 // We ignore any potential error here.
78 // Sending may fail if the task is already terminated (for example, by reaching EOF),
79 // which in turn dropped the receiver end!
80 let _res = self
81 .task_termination_sender
82 .take()
83 .expect("`task_termination_sender` to be present.")
84 .send(());
85
86 self.wait().await
87 }
88}
89
90impl Drop for Inspector {
91 fn drop(&mut self) {
92 if let Some(task_termination_sender) = self.task_termination_sender.take() {
93 // We ignore any potential error here.
94 // Sending may fail if the task is already terminated (for example, by reaching EOF),
95 // which in turn dropped the receiver end!
96 let _res = task_termination_sender.send(());
97 }
98 if let Some(task) = self.task.take() {
99 task.abort();
100 }
101 }
102}