use crate::error::PoolError;
use crate::task::TaskLabel;
use fibre::oneshot;
use std::collections::HashSet;
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use tracing;
#[derive(Debug)]
pub struct TaskHandle<R: Send + 'static> {
pub(crate) task_id: u64,
pub(crate) cancellation_token: CancellationToken,
pub(crate) result_receiver: Option<oneshot::Receiver<Result<R, PoolError>>>,
pub(crate) labels: Arc<HashSet<TaskLabel>>,
pub(crate) is_detached: bool,
}
impl<R: Send + 'static> TaskHandle<R> {
pub fn id(&self) -> u64 {
self.task_id
}
pub fn labels(&self) -> HashSet<TaskLabel> {
(*self.labels).clone() }
pub fn is_cancellation_requested(&self) -> bool {
self.cancellation_token.is_cancelled()
}
pub fn cancel(&self) {
tracing::debug!(task_id = %self.task_id, "TaskHandle: Cancellation requested.");
self.cancellation_token.cancel();
}
pub fn detach(mut self) {
self.is_detached = true;
self.result_receiver.take();
tracing::trace!(task_id = %self.task_id, "TaskHandle: Task detached.");
}
pub async fn await_result(mut self) -> Result<R, PoolError> {
match self.result_receiver.take() {
Some(rx) => {
match rx.recv().await {
Ok(task_outcome_result) => task_outcome_result, Err(oneshot_recv_error) => {
tracing::warn!(task_id = %self.task_id, "Result channel receive error: {}", oneshot_recv_error);
Err(PoolError::ResultChannelError(format!(
"Task (id: {}) result channel unexpectedly closed: {}",
self.task_id, oneshot_recv_error
)))
}
}
}
None => Err(PoolError::ResultUnavailable),
}
}
}