crb_routine/
runtime.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
use crate::finalizer::{BoxFinalizer, Finalizer};
use crate::Routine;
use anyhow::Error;
use async_trait::async_trait;
use crb_core::time::Duration;
use crb_runtime::kit::{
    Context, Controller, Entrypoint, Failures, Interruptor, ManagedContext, OpenRuntime, Runtime,
};

pub struct RoutineRuntime<R: Routine> {
    pub routine: R,
    pub context: R::Context,
    pub failures: Failures,
}

impl<R: Routine> RoutineRuntime<R> {
    pub fn new(routine: R) -> Self
    where
        R::Context: Default,
    {
        Self {
            routine,
            context: R::Context::default(),
            failures: Failures::default(),
        }
    }
}

#[async_trait]
impl<R: Routine> OpenRuntime for RoutineRuntime<R> {
    type Context = R::Context;

    fn address(&self) -> <Self::Context as Context>::Address {
        self.context.address().clone()
    }
}

#[async_trait]
impl<R: Routine> Runtime for RoutineRuntime<R> {
    fn get_interruptor(&mut self) -> Interruptor {
        self.context.session().controller().interruptor.clone()
    }

    async fn routine(&mut self) {
        let ctx = &mut self.context;
        let output = self.routine.routine(ctx).await;
        let result = if let Some(mut finalizer) = self.context.session().finalizer.take() {
            finalizer.finalize(output).await
        } else {
            output.map(drop).map_err(Error::from)
        };
        self.failures.put(result);
    }
}

pub struct RoutineSession<R: Routine> {
    controller: Controller,
    /// Interval between repeatable routine calls
    interval: Duration,
    finalizer: Option<BoxFinalizer<R::Output>>,
}

impl<R: Routine> RoutineSession<R> {
    /// Set repeat interval.
    pub fn set_interval(&mut self, interval: Duration) {
        self.interval = interval;
    }

    pub fn interval(&self) -> Duration {
        self.interval
    }

    pub fn set_finalizer(&mut self, finalizer: impl Finalizer<R::Output>) {
        self.finalizer = Some(Box::new(finalizer));
    }

    pub fn take_finalizer(&mut self) -> Option<BoxFinalizer<R::Output>> {
        self.finalizer.take()
    }
}

impl<R: Routine> Context for RoutineSession<R> {
    // TODO: TaskAddress that uses a controller internally
    type Address = ();

    fn address(&self) -> &Self::Address {
        &()
    }
}

impl<R: Routine> ManagedContext for RoutineSession<R> {
    fn controller(&mut self) -> &mut Controller {
        &mut self.controller
    }

    fn shutdown(&mut self) {
        self.controller.stop(false).ok();
    }
}

pub trait RoutineContext<R: Routine>: Context {
    fn session(&mut self) -> &mut RoutineSession<R>;
}

impl<R: Routine> RoutineContext<R> for RoutineSession<R> {
    fn session(&mut self) -> &mut RoutineSession<R> {
        self
    }
}

pub trait Standalone: Routine {
    fn spawn(self)
    where
        Self::Context: Default;
}

impl<T: Routine + 'static> Standalone for T {
    fn spawn(self)
    where
        Self::Context: Default,
    {
        let mut runtime = RoutineRuntime::new(self);
        let address = runtime.context.session().address().clone();
        crb_core::spawn(runtime.entrypoint());
        address
    }
}