tokio_process_tools/
collector.rs1use std::error::Error;
2use std::fmt::Debug;
3use std::future::Future;
4use std::pin::Pin;
5use thiserror::Error;
6use tokio::sync::oneshot::Sender;
7use tokio::task::JoinHandle;
8
9#[derive(Debug, Error)]
10pub enum CollectorError {
11 #[error("The collector task could not be joined/terminated: {0}")]
12 TaskJoin(#[source] tokio::task::JoinError),
13}
14
/// Marker trait for the value a collector task produces.
///
/// It declares no methods of its own; it only gives the bound set
/// `Debug + Send + Sync + 'static` a single name.
pub trait Sink: Debug + Send + Sync + 'static {}

/// Blanket implementation: every type satisfying the bounds is automatically a [`Sink`].
impl<T: Debug + Send + Sync + 'static> Sink for T {}
18
/// Type-erased, heap-allocated error that is both `Send` and `Sync`.
pub type CollectError = Box<dyn Error + Send + Sync>;

/// Pinned, boxed, `Send` future that resolves to `Ok(())` or a [`CollectError`].
///
/// The lifetime `'a` bounds whatever the future borrows.
/// NOTE(review): presumably the return type of asynchronous collect callbacks;
/// confirm against the call sites, which are not visible in this file.
pub type AsyncCollectFn<'a> =
    Pin<Box<dyn Future<Output = Result<(), CollectError>> + Send + 'a>>;
22
23pub struct Collector<T: Sink> {
30 pub(crate) task: Option<JoinHandle<T>>,
31 pub(crate) task_termination_sender: Option<Sender<()>>,
32}
33
34impl<T: Sink> Collector<T> {
35 pub async fn abort(mut self) -> Result<T, CollectorError> {
36 if let Some(task_termination_sender) = self.task_termination_sender.take() {
37 if let Err(_err) = task_termination_sender.send(()) {
44 tracing::trace!(
45 "Unexpected failure when sending termination signal to collector task."
46 );
47 };
48 }
49 if let Some(task) = self.task.take() {
50 return task.await.map_err(CollectorError::TaskJoin);
51 }
52 unreachable!("The collector task was already aborted");
53 }
54}
55
56impl<T: Sink> Drop for Collector<T> {
57 fn drop(&mut self) {
58 if let Some(task_termination_sender) = self.task_termination_sender.take() {
59 if let Err(_err) = task_termination_sender.send(()) {
66 tracing::trace!(
67 "Unexpected failure when sending termination signal to collector task."
68 );
69 }
70 }
71 if let Some(task) = self.task.take() {
72 task.abort();
73 }
74 }
75}