tokio_process_tools/
collector.rsuse std::error::Error;
use std::fmt::Debug;
use std::future::Future;
use std::pin::Pin;
use thiserror::Error;
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
#[derive(Debug, Error)]
pub enum CollectorError {
#[error("The collector task could not be joined/terminated: {0}")]
TaskJoin(#[source] tokio::task::JoinError),
}
pub trait Sink: Debug + Send + Sync + 'static {}
impl<T> Sink for T where T: Debug + Send + Sync + 'static {}
pub type CollectError = Box<dyn Error + Send + Sync>;
pub type AsyncCollectFn<'a> = Pin<Box<dyn Future<Output = Result<(), CollectError>> + Send + 'a>>;
pub struct Collector<T: Sink> {
pub(crate) task: Option<JoinHandle<T>>,
pub(crate) task_termination_sender: Option<Sender<()>>,
}
impl<T: Sink> Collector<T> {
pub async fn abort(mut self) -> Result<T, CollectorError> {
if let Some(task_termination_sender) = self.task_termination_sender.take() {
if let Err(_err) = task_termination_sender.send(()) {
tracing::error!(
"Unexpected failure when sending termination signal to collector task."
);
};
}
if let Some(task) = self.task.take() {
return task.await.map_err(CollectorError::TaskJoin);
}
unreachable!("The collector task was already aborted");
}
}
impl<T: Sink> Drop for Collector<T> {
fn drop(&mut self) {
if let Some(task_termination_sender) = self.task_termination_sender.take() {
if let Err(_err) = task_termination_sender.send(()) {
tracing::error!(
"Unexpected failure when sending termination signal to collector task."
);
}
}
if let Some(task) = self.task.take() {
task.abort();
}
}
}