1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
//!
//! Supervisor related code. Including their management, creation, and destruction.

use crate::child::{BastionChildren, BastionClosure, Message};
use crate::context::BastionContext;
use crossbeam_channel::unbounded;
use std::cmp::Ordering;
use std::panic::AssertUnwindSafe;
use tokio::prelude::future::FutureResult;
use tokio::prelude::*;
use uuid::Uuid;

///
/// Identifier struct for Supervisor
/// System uses this to assemble resource name for children and supervisors.
#[derive(Clone, PartialOrd, PartialEq, Eq, Debug)]
pub struct SupervisorURN {
    /// Supervisor's system name
    pub sys: String,
    /// Supervisor's name
    pub name: String,
    /// Supervisor's unique identifier
    pub res: String,
}

impl Default for SupervisorURN {
    fn default() -> Self {
        let uuid_gen = Uuid::new_v4();
        SupervisorURN {
            sys: "bastion".to_owned(),
            name: "default-supervisor".to_owned(),
            res: uuid_gen.to_string(),
        }
    }
}

impl Ord for SupervisorURN {
    fn cmp(&self, other: &Self) -> Ordering {
        self.sys
            .cmp(&other.sys)
            .then(self.name.cmp(&other.name))
            .then(self.res.cmp(&other.res))
    }
}

///
/// Possible supervision strategies to pass to the supervisor.
///
/// **OneForOne**: If a child gets killed only that child will be restarted under the supervisor.
///
/// **OneForAll**: If a child gets killed all children at the same level under the supervision will be restarted.
///
/// **RestForOne**: If a child gets killed restart the rest of the children at the same level under the supervisor.
#[derive(Clone, Debug)]
pub enum SupervisionStrategy {
    /// If a child gets killed only that child will be restarted under the supervisor.
    ///
    /// Example is from [learnyousomeerlang.com](https://learnyousomeerlang.com):
    ///
    /// ![](https://learnyousomeerlang.com/static/img/restart-one-for-one.png)
    OneForOne,
    /// If a child gets killed all children at the same level under the supervision will be restarted.
    ///
    /// Example is from [learnyousomeerlang.com](https://learnyousomeerlang.com):
    ///
    /// ![](https://learnyousomeerlang.com/static/img/restart-one-for-all.png)
    OneForAll,
    /// If a child gets killed restart the rest of the children at the same level under the supervisor.
    ///
    /// Example is from [learnyousomeerlang.com](https://learnyousomeerlang.com):
    ///
    /// ![](https://learnyousomeerlang.com/static/img/restart-rest-for-one.png)
    RestForOne,
}

impl Default for SupervisionStrategy {
    fn default() -> Self {
        SupervisionStrategy::OneForOne
    }
}

///
/// Supervisor definition to keep track of supervisor information. e.g:
/// * context
/// * children
/// * strategy
/// * supervisor identifier
#[derive(Default, Clone, Debug)]
pub struct Supervisor {
    /// Supervisor's URN scheme
    pub urn: SupervisorURN,
    /// Supervisor's context
    pub(crate) ctx: BastionContext,
    /// Supervisor's strategy
    pub(crate) strategy: SupervisionStrategy,
}

///
/// Builder pattern for supervisors.
impl Supervisor {
    ///
    /// Assign properties of the supervisor
    ///
    /// # Arguments
    /// * `name` - name of the supervisor
    /// * `system` - system name for the supervisor. Can be used for grouping dependent supervisors.
    ///
    /// # Example
    /// ```rust
    ///# use bastion::prelude::*;
    ///# let name = "supervisor-name";
    ///# let system = "system-name";
    /// Supervisor::default().props(name.into(), system.into());
    /// ```
    pub fn props(mut self, name: String, system: String) -> Self {
        let mut urn = SupervisorURN::default();
        urn.name = name;
        self.urn = urn;
        self.urn.sys = system;
        self
    }

    ///
    /// Assigns strategy for this supervisor
    ///
    /// # Arguments
    /// * `strategy` - An [SupervisionStrategy] for supervisor to define how to operate on failures.
    ///
    /// # Example
    /// ```rust
    ///# use bastion::prelude::*;
    ///# let name = "supervisor-name";
    ///# let system = "system-name";
    /// Supervisor::default().props(name.into(), system.into())
    ///    .strategy(SupervisionStrategy::RestForOne);
    /// ```
    pub fn strategy(mut self, strategy: SupervisionStrategy) -> Self {
        self.strategy = strategy;
        self
    }

    ///
    /// [Supervisor] level spawn function for child generation from the parent context.
    /// This context carries global broadcast for the system.
    /// Every context directly tied to the parent process.
    /// If you listen broadcast tx/rx pair in the parent process,
    /// you can communicate with the children with specific message type.
    ///
    /// Bastion doesn't enforce you to use specific Message type or force you to implement traits.
    /// Dynamic dispatch is made over heap fat ptrs and that means all message objects can be
    /// passed around with heap constructs.
    ///
    /// # Arguments
    /// * `thunk` - User code which will be executed inside the process.
    /// * `msg` - Initial message which will be passed to the thunk.
    /// * `scale` - How many children will be spawn with given `thunk` and `msg` as process body.
    ///
    /// # Examples
    /// ```
    ///# use bastion::prelude::*;
    ///# use std::{fs, thread};
    ///#
    ///# fn main() {
    ///#     Bastion::platform();
    ///#
    ///#     let message = "Supervision Message".to_string();
    ///#
    ///#     /// Name of the supervisor, and system of the new supervisor
    ///#     /// By default if you don't specify Supervisors use "One for One".
    ///#     /// Let's look at "One for One".
    ///Bastion::supervisor("file-reader", "remote-fs")
    ///    .strategy(SupervisionStrategy::OneForOne)
    ///    .children(
    ///        |p: BastionContext, _msg| {
    ///            println!("File below doesn't exist so it will panic.");
    ///            fs::read_to_string("cacophony").unwrap();
    ///
    ///            /// Hook to rebind to the system.
    ///            p.hook();
    ///        },
    ///        message, // Message for all redundant children
    ///        1_i32,   // Redundancy level
    ///    );
    ///# }
    /// ```
    pub fn children<F, M>(mut self, thunk: F, msg: M, scale: i32) -> Self
    where
        F: BastionClosure,
        M: Message,
    {
        let bt = Box::new(thunk);
        let msg_box = Box::new(msg);
        let (p, c) = unbounded();

        let children = BastionChildren {
            id: Uuid::new_v4().to_string(),
            tx: Some(p.clone()),
            rx: Some(c.clone()),
            redundancy: scale,
            msg: objekt::clone_box(&*msg_box),
            thunk: objekt::clone_box(&*bt),
        };

        self.ctx.descendants.push(children);
        self.ctx.bcast_rx = Some(c.clone());
        self.ctx.bcast_tx = Some(p.clone());
        self.ctx.parent = Some(Box::new(self.clone()));

        self
    }

    ///
    /// Launch completes the builder pattern.
    /// It is the main finalizer that sets necessary arguments prepares
    /// channels and registers supervisor to the runtime.
    ///
    /// Runtime can't be notified without calling [launch](struct.Supervisor.html#method.launch).
    ///
    /// # Examples
    /// ```
    ///# use bastion::prelude::*;
    ///# use std::{fs, thread};
    ///#
    ///# fn main() {
    ///#     Bastion::platform();
    ///#
    ///#     let message = "Supervision Message".to_string();
    ///#
    ///#     /// Name of the supervisor, and system of the new supervisor
    ///#     /// By default if you don't specify Supervisors use "One for One".
    ///#     /// Let's look at "One for One".
    ///Bastion::supervisor("file-reader", "remote-fs")
    ///    .strategy(SupervisionStrategy::OneForOne)
    ///    .children(
    ///        |p: BastionContext, _msg| {
    ///            println!("File below doesn't exist so it will panic.");
    ///            fs::read_to_string("cacophony").unwrap();
    ///
    ///            /// Hook to rebind to the system.
    ///            p.hook();
    ///        },
    ///        message, // Message for all redundant children
    ///        1_i32,   // Redundancy level
    ///    )
    ///    .launch(); // Launch finalizes supervisor build and registers to definition.
    ///# }
    /// ```
    pub fn launch(mut self) {
        for descendant in &self.ctx.descendants {
            let descendant = descendant.clone();

            for child_id in 0..descendant.redundancy {
                let tx = descendant.tx.as_ref().unwrap().clone();
                let rx = descendant.rx.clone().unwrap();

                let nt = objekt::clone_box(&*descendant.thunk);
                let msgr = objekt::clone_box(&*descendant.msg);
                let msgr_panic_handler = objekt::clone_box(&*descendant.msg);
                let mut if_killed = descendant.clone();
                if_killed.id = format!("{}::{}", if_killed.id, child_id);

                let mut this_spv = self.clone();
                let context_spv = self.clone();

                let f = future::lazy(move || {
                    nt(
                        BastionContext {
                            parent: Some(Box::new(context_spv.clone())),
                            descendants: context_spv.ctx.descendants,
                            killed: context_spv.ctx.killed,
                            bcast_rx: Some(rx.clone()),
                            bcast_tx: Some(tx.clone()),
                        },
                        msgr,
                    );
                    future::ok::<(), ()>(())
                });

                let k = AssertUnwindSafe(f)
                    .catch_unwind()
                    .then(|result| -> FutureResult<(), ()> {
                        this_spv.ctx.killed.push(if_killed);

                        // Already re-entrant code
                        if let Err(err) = result {
                            error!("Panic happened in supervised child - {:?}", err);
                            crate::bastion::Bastion::fault_recovery(this_spv, msgr_panic_handler);
                        }
                        future::ok(())
                    });

                let ark = crate::bastion::PLATFORM.clone();
                let mut runtime = ark.lock();
                let shared_runtime = &mut runtime.runtime;
                shared_runtime.spawn(k);
            }
        }


        // FIXME: There might be discrepancy between passed self and referenced self.
        // Fix this with either passing reference without Box (lifetimes sigh!)
        // Or use channels to send back to the supervision tree.
        self.ctx.parent = Some(Box::new(self.clone()));
    }
}