crb_task/
task.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
use crate::runtime::{Task, TaskRuntime};
use anyhow::{Error, Result};
use async_trait::async_trait;
use crb_core::JoinHandle;
use crb_runtime::kit::{Entrypoint, Interruptor, Runtime};
use derive_more::{Deref, DerefMut};
use futures::Future;
use std::marker::PhantomData;

/// Handle to a spawned task that keeps the task type `T` as a type-level marker.
///
/// Dereferences (immutably and mutably) to the wrapped [`TypelessTask`], so the
/// methods of `TypelessTask` can be called directly on a `TypedTask`.
#[derive(Deref, DerefMut)]
pub struct TypedTask<T> {
    /// The untyped handle this wrapper derefs to.
    #[deref]
    #[deref_mut]
    task: TypelessTask,
    /// Zero-sized marker recording `T`; no value of `T` is stored here.
    _run: PhantomData<T>,
}

impl<T: Task> TypedTask<T> {
    /// Wraps `task` in a [`TaskRuntime`], passes the runtime's entrypoint to
    /// `crb_core::spawn`, and returns a handle to the spawned work.
    ///
    /// The handle starts with `cancel_on_drop` set to `false`.
    pub fn spawn(task: T) -> Self {
        let mut rt = TaskRuntime::new(task);
        // Take the interruptor while the runtime is still in hand;
        // `entrypoint()` presumably consumes the runtime — confirm against crb_runtime.
        let interruptor = rt.get_interruptor();
        let inner = TypelessTask {
            interruptor,
            handle: crb_core::spawn(rt.entrypoint()),
            cancel_on_drop: false,
        };
        Self { task: inner, _run: PhantomData }
    }
}

impl<T> From<TypedTask<T>> for TypelessTask {
    fn from(typed: TypedTask<T>) -> Self {
        typed.task
    }
}

/// Handle to a spawned task that carries no information about the task's type.
pub struct TypelessTask {
    /// Used by `interrupt` to ask the task to stop.
    interruptor: Interruptor,
    /// Join handle returned by `crb_core::spawn`; `abort` is called on it on drop
    /// when `cancel_on_drop` is set.
    handle: JoinHandle<()>,
    /// When `true`, dropping this handle aborts the spawned task.
    cancel_on_drop: bool,
}

impl TypelessTask {
    /// Chooses whether the spawned task is aborted when this handle is dropped.
    pub fn cancel_on_drop(&mut self, cancel: bool) {
        self.cancel_on_drop = cancel;
    }

    /// Asks the task to stop through its interruptor.
    ///
    /// Best-effort: whatever `stop` returns is deliberately discarded.
    pub fn interrupt(&mut self) {
        // NOTE(review): the `true` argument looks like a "force" flag — confirm against crb_runtime.
        let _ = self.interruptor.stop(true);
    }
}

impl Drop for TypelessTask {
    /// Calls `abort` on the join handle, but only when `cancel_on_drop` is enabled.
    fn drop(&mut self) {
        if !self.cancel_on_drop {
            // Not opted in: leave the spawned task alone.
            return;
        }
        self.handle.abort();
    }
}

impl TypelessTask {
    /// Spawns a bare future as a task and returns an untyped handle to it.
    ///
    /// The future is wrapped in an internal `FnTask`, spawned through
    /// `TypedTask::spawn`, and the typed handle is then converted to `TypelessTask`.
    pub fn spawn<T>(fut: T) -> Self
    where
        T: Future<Output = Result<()>> + Send + 'static,
    {
        let typed = TypedTask::spawn(FnTask { fut: Some(fut) });
        Self::from(typed)
    }
}

/// Adapter that lets a plain future be run as a `Task`.
struct FnTask<T> {
    /// The wrapped future; `routine` takes it out, leaving `None` behind.
    fut: Option<T>,
}

#[async_trait]
impl<T> Task for FnTask<T>
where
    T: Future<Output = Result<()>> + Send + 'static,
{
    async fn routine(&mut self) -> Result<()> {
        self.fut
            .take()
            .ok_or_else(|| Error::msg("Future has taken already"))?
            .await
    }
}