tokio_process_tools/
inspector.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
use thiserror::Error;
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;

#[derive(Debug, Error)]
pub enum InspectorError {
    #[error("The inspector task could not be joined/terminated: {0}")]
    TaskJoin(#[from] tokio::task::JoinError),
}

/// An inspector for output lines.
///
/// For proper cleanup, call `abort()` which gracefully waits for the collecting task to complete.
/// It should terminate fast, as an internal termination signal is send to it.
/// If dropped without calling `abort()`, the termination will be sent as well,
/// but the task will be aborted (forceful, not waiting for completion).
pub struct Inspector {
    pub(crate) task: Option<JoinHandle<()>>,
    pub(crate) task_termination_sender: Option<Sender<()>>,
}

impl Inspector {
    pub async fn abort(mut self) -> Result<(), InspectorError> {
        if let Some(task_termination_sender) = self.task_termination_sender.take() {
            // Safety: This `expect` call SHOULD neve fail. The receiver lives in the tokio task,
            // and is only dropped after receiving the termination signal.
            // The task is only awaited-and-dropped after THIS send and only ONCE, gated by taking
            // it out the `Option`, which can only be done once.
            if let Err(_err) = task_termination_sender.send(()) {
                tracing::error!(
                    "Unexpected failure when sending termination signal to inspector task."
                );
            };
        }
        if let Some(task) = self.task.take() {
            return task.await.map_err(InspectorError::TaskJoin);
        }
        unreachable!("The inspector task was already aborted");
    }
}

impl Drop for Inspector {
    fn drop(&mut self) {
        if let Some(task_termination_sender) = self.task_termination_sender.take() {
            if let Err(_err) = task_termination_sender.send(()) {
                tracing::error!(
                    "Unexpected failure when sending termination signal to inspector task."
                );
            }
        }
        if let Some(task) = self.task.take() {
            task.abort();
        }
    }
}