1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
use crate::{channel, Controller, Id, Operator, Signal};
use anyhow::Error;
use async_trait::async_trait;
use derive_more::{From, Into};
use futures::{select_biased, FutureExt, StreamExt};
use tokio::sync::watch;
use uuid::Uuid;
#[derive(Debug, From, Into)]
pub struct ShutdownReceiver {
status: watch::Receiver<LiteStatus>,
}
impl ShutdownReceiver {
pub async fn just_done(mut self) {
while self.status.recv().await.is_some() {
if self.status.borrow().is_done() {
break;
}
}
}
}
fn task<L: LiteTask>(lite_task: L, supervisor: Option<impl Into<Controller>>) -> Controller {
let id = Id::of_task(&lite_task);
let supervisor = supervisor.map(Into::into);
let (controller, operator) = channel::pair(id, supervisor);
let id = controller.id();
let runtime = LiteRuntime {
id,
lite_task: Some(lite_task),
operator,
};
tokio::spawn(runtime.entrypoint());
controller
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum LiteStatus {
Alive,
Stop,
}
impl LiteStatus {
pub fn is_done(&self) -> bool {
*self == LiteStatus::Stop
}
}
#[async_trait]
pub trait LiteTask: Sized + Send + 'static {
fn name(&self) -> String {
let uuid = Uuid::new_v4();
format!("Task:{}({})", std::any::type_name::<Self>(), uuid)
}
fn start<T>(self, supervisor: Option<T>) -> Controller
where
T: Into<Controller> + Send,
{
task(self, supervisor)
}
async fn routine(self, signal: ShutdownReceiver) -> Result<(), Error>;
}
struct LiteRuntime<L: LiteTask> {
id: Id,
lite_task: Option<L>,
operator: Operator,
}
impl<L: LiteTask> LiteRuntime<L> {
async fn entrypoint(mut self) {
self.operator.initialize();
log::info!("Starting task: {:?}", self.id);
self.routine().await;
log::info!("Task finished: {:?}", self.id);
self.operator.finalize();
}
async fn routine(&mut self) {
let (shutdown_tx, shutdown_rx) = watch::channel(LiteStatus::Alive);
let lite_task = self.lite_task.take().unwrap();
let mut routine = lite_task.routine(shutdown_rx.into()).fuse();
let mut shutdown = Some(shutdown_tx);
loop {
select_biased! {
event = self.operator.next() => {
log::trace!("Stop signal received: {:?} for task {:?}", event, self.id);
let signal = event.expect("task controller couldn't be closed");
match signal {
Signal::Shutdown => {
}
Signal::Finished { .. } => {
panic!("Tasks don't support supervised childs.");
}
}
if let Some(tx) = shutdown.take() {
if tx.broadcast(LiteStatus::Stop).is_err() {
log::error!("Can't send shutdown signal to: {:?}", self.id);
}
}
}
done = routine => {
if let Err(err) = done {
log::error!("LiteTask {:?} failed: {}", self.id, err);
}
break;
}
}
}
}
}