crb_runtime/
task.rs

1use crate::context::ReachableContext;
2use crate::interruptor::Interruptor;
3use crate::runtime::{InteractiveRuntime, Runtime};
4use async_trait::async_trait;
5use crb_core::JoinHandle;
6use derive_more::{Deref, DerefMut};
7use std::marker::PhantomData;
8
9pub trait InteractiveTask<T>: Task<T> + InteractiveRuntime {
10    fn spawn_connected(self) -> <Self::Context as ReachableContext>::Address {
11        let address = self.address();
12        self.spawn();
13        address
14    }
15}
16
17#[async_trait]
18pub trait Task<T = ()>: Runtime + Sized {
19    fn spawn(mut self) -> TaskHandle<T> {
20        let interruptor = self.get_interruptor();
21        let handle = crb_core::spawn(async move {
22            self.routine().await;
23        });
24        let job = JobHandle {
25            interruptor,
26            handle,
27            cancel_on_drop: false,
28        };
29        TaskHandle {
30            job,
31            _task: PhantomData,
32        }
33    }
34
35    async fn run(mut self) {
36        self.routine().await;
37    }
38}
39
/// Handle to a spawned task, tagged with the marker type `T`.
///
/// Dereferences (immutably and mutably) to the wrapped [`JobHandle`], so the
/// `JobHandle` methods can be called on it directly.
#[derive(Deref, DerefMut)]
pub struct TaskHandle<T = ()> {
    /// The untyped handle that both derived deref impls target.
    #[deref]
    #[deref_mut]
    job: JobHandle,
    /// Zero-sized marker carrying `T`; no value of `T` is stored.
    _task: PhantomData<T>,
}
47
48impl<T> TaskHandle<T> {
49    pub fn job(self) -> JobHandle {
50        self.into()
51    }
52}
53
54impl<T> From<TaskHandle<T>> for JobHandle {
55    fn from(task_handle: TaskHandle<T>) -> Self {
56        task_handle.job
57    }
58}
59
/// Untyped handle to a spawned routine.
///
/// It can interrupt the routine through its interruptor and, when
/// `cancel_on_drop` is set, aborts the routine when the handle is dropped.
pub struct JobHandle {
    /// Invoked by [`JobHandle::interrupt`].
    interruptor: Box<dyn Interruptor>,
    /// Join handle of the spawned routine; `abort` is called on it from `Drop`
    /// when `cancel_on_drop` is `true`.
    handle: JoinHandle<()>,
    /// Whether dropping this handle aborts the routine.
    cancel_on_drop: bool,
}
65
66impl JobHandle {
67    pub fn cancel_on_drop(&mut self, cancel: bool) {
68        self.cancel_on_drop = cancel;
69    }
70
71    pub fn interrupt(&mut self) {
72        self.interruptor.interrupt();
73    }
74}
75
76impl Drop for JobHandle {
77    fn drop(&mut self) {
78        if self.cancel_on_drop {
79            self.handle.abort();
80        }
81    }
82}