tokio_process_tools/
collector.rs

1use std::error::Error;
2use std::fmt::Debug;
3use std::future::Future;
4use std::pin::Pin;
5use thiserror::Error;
6use tokio::sync::oneshot::Sender;
7use tokio::task::JoinHandle;
8
9#[derive(Debug, Error)]
10pub enum CollectorError {
11    #[error("The collector task could not be joined/terminated: {0}")]
12    TaskJoin(#[source] tokio::task::JoinError),
13}
14
15pub trait Sink: Debug + Send + Sync + 'static {}
16
17impl<T> Sink for T where T: Debug + Send + Sync + 'static {}
18
19pub type CollectError = Box<dyn Error + Send + Sync>;
20
21pub type AsyncCollectFn<'a> = Pin<Box<dyn Future<Output = Result<(), CollectError>> + Send + 'a>>;
22
23/// A collector for output lines.
24///
25/// For proper cleanup, call `abort()` which gracefully waits for the collecting task to complete.
26/// It should terminate fast, as an internal termination signal is send to it.
27/// If dropped without calling `abort()`, the termination will be sent as well,
28/// but the task will be aborted (forceful, not waiting for completion).
29pub struct Collector<T: Sink> {
30    pub(crate) task: Option<JoinHandle<T>>,
31    pub(crate) task_termination_sender: Option<Sender<()>>,
32}
33
34impl<T: Sink> Collector<T> {
35    pub async fn abort(mut self) -> Result<T, CollectorError> {
36        if let Some(task_termination_sender) = self.task_termination_sender.take() {
37            // Safety: In normal (non-panic-) scenarios, this call SHOULD never fail.
38            // The receiver lives in the tokio task, and is only dropped after once receiving
39            // the termination signal.
40            // The task is only awaited-and-dropped after THIS send and only ONCE, gated by taking
41            // it out the `Option`, which can only be done once.
42            // It might fail when a panic occurs.
43            if let Err(_err) = task_termination_sender.send(()) {
44                tracing::trace!(
45                    "Unexpected failure when sending termination signal to collector task."
46                );
47            };
48        }
49        if let Some(task) = self.task.take() {
50            return task.await.map_err(CollectorError::TaskJoin);
51        }
52        unreachable!("The collector task was already aborted");
53    }
54}
55
56impl<T: Sink> Drop for Collector<T> {
57    fn drop(&mut self) {
58        if let Some(task_termination_sender) = self.task_termination_sender.take() {
59            // Safety: In normal (non-panic-) scenarios, this call SHOULD never fail.
60            // The receiver lives in the tokio task, and is only dropped after once receiving
61            // the termination signal.
62            // The task is only awaited-and-dropped after THIS send and only ONCE, gated by taking
63            // it out the `Option`, which can only be done once.
64            // It might fail when a panic occurs.
65            if let Err(_err) = task_termination_sender.send(()) {
66                tracing::trace!(
67                    "Unexpected failure when sending termination signal to collector task."
68                );
69            }
70        }
71        if let Some(task) = self.task.take() {
72            task.abort();
73        }
74    }
75}