use thiserror::Error;
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
#[derive(Debug, Error)]
pub enum InspectorError {
#[error("Failed to join/terminate the inspector task over stream '{stream_name}': {source}")]
TaskJoin {
stream_name: &'static str,
#[source]
source: tokio::task::JoinError,
},
}
pub struct Inspector {
pub(crate) stream_name: &'static str,
pub(crate) task: Option<JoinHandle<()>>,
pub(crate) task_termination_sender: Option<Sender<()>>,
}
impl Inspector {
#[must_use]
pub fn is_finished(&self) -> bool {
self.task
.as_ref()
.is_none_or(tokio::task::JoinHandle::is_finished)
}
pub async fn wait(mut self) -> Result<(), InspectorError> {
let tts = self.task_termination_sender.take();
let result = self
.task
.take()
.expect("`task` to be present.")
.await
.map_err(|err| InspectorError::TaskJoin {
stream_name: self.stream_name,
source: err,
});
drop(tts);
result
}
pub async fn cancel(mut self) -> Result<(), InspectorError> {
let _res = self
.task_termination_sender
.take()
.expect("`task_termination_sender` to be present.")
.send(());
self.wait().await
}
}
impl Drop for Inspector {
fn drop(&mut self) {
if let Some(task_termination_sender) = self.task_termination_sender.take() {
let _res = task_termination_sender.send(());
}
if let Some(task) = self.task.take() {
task.abort();
}
}
}