1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
use crate::child::{BastionChildren, BastionClosure, Message};
use crate::context::BastionContext;
use crossbeam_channel::unbounded;
use std::cmp::Ordering;
use std::panic::AssertUnwindSafe;
use tokio::prelude::future::FutureResult;
use tokio::prelude::*;
use uuid::Uuid;
#[derive(Clone, PartialOrd, PartialEq, Eq, Debug)]
pub struct SupervisorURN {
pub sys: String,
pub name: String,
pub res: String,
}
impl Default for SupervisorURN {
fn default() -> Self {
let uuid_gen = Uuid::new_v4();
SupervisorURN {
sys: "bastion".to_owned(),
name: "default-supervisor".to_owned(),
res: uuid_gen.to_string(),
}
}
}
impl Ord for SupervisorURN {
fn cmp(&self, other: &Self) -> Ordering {
self.sys
.cmp(&other.sys)
.then(self.name.cmp(&other.name))
.then(self.res.cmp(&other.res))
}
}
#[derive(Clone, Debug)]
pub enum SupervisionStrategy {
OneForOne,
OneForAll,
RestForOne,
}
impl Default for SupervisionStrategy {
fn default() -> Self {
SupervisionStrategy::OneForOne
}
}
#[derive(Default, Clone, Debug)]
pub struct Supervisor {
pub urn: SupervisorURN,
pub(crate) descendants: Vec<BastionChildren>,
pub(crate) killed: Vec<BastionChildren>,
pub(crate) strategy: SupervisionStrategy,
}
impl Supervisor {
pub fn props(mut self, name: String, system: String) -> Self {
let mut urn = SupervisorURN::default();
urn.name = name;
self.urn = urn;
self.urn.sys = system;
self
}
pub fn strategy(mut self, strategy: SupervisionStrategy) -> Self {
self.strategy = strategy;
self
}
pub fn children<F, M>(mut self, thunk: F, msg: M, scale: i32) -> Self
where
F: BastionClosure,
M: Message,
{
let bt = Box::new(thunk);
let msg_box = Box::new(msg);
let (p, c) = unbounded();
let children = BastionChildren {
id: Uuid::new_v4().to_string(),
tx: Some(p),
rx: Some(c),
redundancy: scale,
msg: objekt::clone_box(&*msg_box),
thunk: objekt::clone_box(&*bt),
};
self.descendants.push(children);
self
}
pub fn launch(self) {
for descendant in &self.descendants {
let descendant = descendant.clone();
for child_id in 0..descendant.redundancy {
let tx = descendant.tx.as_ref().unwrap().clone();
let rx = descendant.rx.clone().unwrap();
let nt = objekt::clone_box(&*descendant.thunk);
let msgr = objekt::clone_box(&*descendant.msg);
let msgr_panic_handler = objekt::clone_box(&*descendant.msg);
let mut if_killed = descendant.clone();
if_killed.id = format!("{}::{}", if_killed.id, child_id);
let mut this_spv = self.clone();
let f = future::lazy(move || {
nt(
BastionContext {
bcast_rx: Some(rx.clone()),
bcast_tx: Some(tx.clone()),
},
msgr,
);
future::ok::<(), ()>(())
});
let k = AssertUnwindSafe(f)
.catch_unwind()
.then(|result| -> FutureResult<(), ()> {
this_spv.killed.push(if_killed);
if let Err(err) = result {
error!("Panic happened in supervised child - {:?}", err);
crate::bastion::Bastion::fault_recovery(this_spv, msgr_panic_handler);
}
future::ok(())
});
let ark = crate::bastion::PLATFORM.clone();
let mut runtime = ark.lock().unwrap();
let shared_runtime = &mut runtime.runtime;
shared_runtime.spawn(k);
}
}
}
}