Skip to main content

co_actor/
task_handle.rs

1// SPDX-License-Identifier: AGPL-3.0-only
2// Copyright (C) 2026 1io BRANDGUARDIAN GmbH
3
4use futures::{
5	future::{BoxFuture, LocalBoxFuture},
6	FutureExt,
7};
8use std::{
9	fmt::Debug,
10	future::Future,
11	pin::Pin,
12	task::{Context, Poll},
13};
14#[cfg(feature = "js")]
15use tokio_with_wasm::alias as tokio;
16
17#[derive(Debug, thiserror::Error)]
18pub enum TaskError {
19	#[error("Task has cancelled")]
20	Cancelled,
21}
22
23pub struct TaskHandle<O: Send + 'static> {
24	join: tokio::sync::oneshot::Receiver<O>,
25}
26impl<O: Send + 'static> Debug for TaskHandle<O> {
27	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28		f.debug_struct("TaskHandle").finish()
29	}
30}
31impl<O: Send + 'static> TaskHandle<O> {
32	pub fn handle<F>(task: F) -> (BoxFuture<'static, ()>, TaskHandle<O>)
33	where
34		O: Send + 'static,
35		F: Future<Output = O> + Send + 'static,
36	{
37		let (tx, rx) = tokio::sync::oneshot::channel::<O>();
38		let task = async move {
39			tx.send(task.await).ok();
40		};
41		let task_handle = TaskHandle { join: rx };
42		(task.boxed(), task_handle)
43	}
44
45	pub fn handle_local<F>(task: F) -> (LocalBoxFuture<'static, ()>, TaskHandle<O>)
46	where
47		O: Send + 'static,
48		F: Future<Output = O> + 'static,
49	{
50		let (tx, rx) = tokio::sync::oneshot::channel::<O>();
51		let task = async move {
52			tx.send(task.await).ok();
53		};
54		let task_handle = TaskHandle { join: rx };
55		(task.boxed_local(), task_handle)
56	}
57}
58impl<O: Send + 'static> Future for TaskHandle<O> {
59	type Output = Result<O, TaskError>;
60
61	fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
62		match self.join.poll_unpin(cx) {
63			Poll::Ready(Ok(result)) => Poll::Ready(Ok(result)),
64			Poll::Ready(Err(_)) => Poll::Ready(Err(TaskError::Cancelled)),
65			Poll::Pending => Poll::Pending,
66		}
67	}
68}
// Compile-time check that `TaskHandle<()>` implements `Send`.
static_assertions::assert_impl_all!(TaskHandle<()>: Send);