crb_routine/
runtime.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
use crate::finalizer::{BoxFinalizer, Finalizer};
use crate::routine::Routine;
use async_trait::async_trait;
use crb_core::time::Duration;
use crb_runtime::kit::{
    Context, Controller, Entrypoint, Failures, InteractiveRuntime, Interruptor, ManagedContext,
    Runtime,
};

/// Runtime that drives a single [`Routine`] together with its context.
pub struct RoutineRuntime<R>
where
    R: Routine,
{
    /// The routine instance executed by this runtime.
    pub routine: R,
    /// Context handed to the routine when it is run.
    pub context: R::Context,
    /// Collector that receives the result of a routine run.
    pub failures: Failures,
}

impl<R> RoutineRuntime<R>
where
    R: Routine,
{
    /// Builds a runtime around `routine`.
    ///
    /// The context is created with `R::Context::default()` and the failure
    /// collector with `Failures::default()`, which is why the context type
    /// must implement [`Default`].
    pub fn new(routine: R) -> Self
    where
        R::Context: Default,
    {
        let context = R::Context::default();
        let failures = Failures::default();
        Self {
            routine,
            context,
            failures,
        }
    }
}

#[async_trait]
impl<R> InteractiveRuntime for RoutineRuntime<R>
where
    R: Routine,
{
    type Context = R::Context;

    /// Returns an owned clone of the address exposed by the context.
    fn address(&self) -> <Self::Context as Context>::Address {
        let address = self.context.address();
        address.clone()
    }
}

#[async_trait]
impl<R> Runtime for RoutineRuntime<R>
where
    R: Routine,
{
    /// Returns a clone of the interruptor stored in the session's controller.
    fn get_interruptor(&mut self) -> Interruptor {
        let session = self.context.session();
        let controller = session.controller();
        controller.interruptor.clone()
    }

    /// Runs the routine against the context and records its result in
    /// `failures`.
    async fn routine(&mut self) {
        // `routine` and `context` are distinct fields, so both can be
        // borrowed mutably for the duration of the call.
        let outcome = self.routine.routine(&mut self.context).await;
        self.failures.put(outcome);
    }
}

/// Session data kept for a routine: its controller, the repeat interval and
/// an optional finalizer.
pub struct RoutineSession<R>
where
    R: Routine,
{
    /// Controller of this session.
    controller: Controller,
    /// Interval between repeatable routine calls
    interval: Duration,
    /// Finalizer for the routine's output, if one has been set.
    finalizer: Option<BoxFinalizer<R::Output>>,
}

impl<R> Default for RoutineSession<R>
where
    R: Routine,
{
    /// Creates a session with a default controller, a 5 second repeat
    /// interval and no finalizer.
    fn default() -> Self {
        Self {
            controller: Controller::default(),
            interval: Duration::from_secs(5),
            finalizer: None,
        }
    }
}

impl<R> RoutineSession<R>
where
    R: Routine,
{
    /// Set repeat interval.
    pub fn set_interval(&mut self, interval: Duration) {
        self.interval = interval;
    }

    /// Returns the currently configured repeat interval.
    pub fn interval(&self) -> Duration {
        self.interval
    }

    /// Stores `finalizer` in the session, replacing any previously set one.
    pub fn set_finalizer(&mut self, finalizer: impl Finalizer<R::Output>) {
        let boxed: BoxFinalizer<R::Output> = Box::new(finalizer);
        self.finalizer = Some(boxed);
    }

    /// Removes the stored finalizer and returns it, leaving `None` behind.
    pub fn take_finalizer(&mut self) -> Option<BoxFinalizer<R::Output>> {
        std::mem::take(&mut self.finalizer)
    }
}

impl<R> Context for RoutineSession<R>
where
    R: Routine,
{
    // TODO: TaskAddress that uses a controller internally
    /// Placeholder address type: the session currently exposes the unit value.
    type Address = ();

    /// Returns a reference to the unit address.
    fn address(&self) -> &Self::Address {
        &()
    }
}

impl<R> ManagedContext for RoutineSession<R>
where
    R: Routine,
{
    /// Gives mutable access to the session's controller.
    fn controller(&mut self) -> &mut Controller {
        &mut self.controller
    }

    /// Asks the controller to stop, passing `false` to `Controller::stop`.
    ///
    /// The value returned by `stop` is deliberately discarded, so shutdown
    /// is best-effort.
    fn shutdown(&mut self) {
        let _ = self.controller.stop(false);
    }
}

/// A [`Context`] that can hand out the [`RoutineSession`] of routine `R`.
pub trait RoutineContext<R>: Context
where
    R: Routine,
{
    /// Returns mutable access to the routine session held by this context.
    fn session(&mut self) -> &mut RoutineSession<R>;
}

impl<R> RoutineContext<R> for RoutineSession<R>
where
    R: Routine,
{
    /// A session is its own routine context, so this returns `self`.
    fn session(&mut self) -> &mut RoutineSession<R> {
        self
    }
}

/// Extension of [`Routine`] for launching a routine on a runtime of its own.
pub trait Standalone: Routine {
    /// Consumes the routine and spawns it.
    ///
    /// Requires a context type that can be created with [`Default`].
    fn spawn(self)
    where
        Self::Context: Default;
}

impl<T: Routine + 'static> Standalone for T {
    fn spawn(self)
    where
        Self::Context: Default,
    {
        let mut runtime = RoutineRuntime::new(self);
        let address = runtime.context.session().address().clone();
        crb_core::spawn(runtime.entrypoint());
        address
    }
}