1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
use crate::child::{BastionChildren, BastionClosure, Message};
use crate::context::BastionContext;
use crossbeam_channel::unbounded;
use std::cmp::Ordering;
use std::panic::AssertUnwindSafe;
use tokio::prelude::future::FutureResult;
use tokio::prelude::*;
use uuid::Uuid;

/// Name under which a supervisor is identified.
///
/// It is made of three string parts: the system name, the supervisor name and
/// an identifier. The derived `PartialOrd` compares the fields in declaration
/// order (`sys`, then `name`, then `res`), so the field order below is
/// significant for ordering.
#[derive(Clone, PartialOrd, PartialEq, Eq, Debug)]
pub struct SupervisorURN {
    pub sys: String,  // Supervisor System Name
    pub name: String, // Supervisor Name
    pub res: String,  // Supervisor Identifier
}

impl Default for SupervisorURN {
    /// Builds the URN used when no explicit properties are given: system
    /// `"bastion"`, name `"default-supervisor"` and a freshly generated
    /// version-4 UUID (in its string form) as the identifier.
    fn default() -> Self {
        Self {
            sys: String::from("bastion"),
            name: String::from("default-supervisor"),
            // Every call yields a new identifier, so two defaults never compare equal
            // unless the UUIDs collide.
            res: Uuid::new_v4().to_string(),
        }
    }
}

impl Ord for SupervisorURN {
    /// Total order over URNs: lexicographic on `sys`, then `name`, then `res`.
    fn cmp(&self, other: &Self) -> Ordering {
        // Tuples compare element by element, left to right, which gives the
        // same "first difference wins" ordering as chaining the comparisons.
        let lhs = (self.sys.as_str(), self.name.as_str(), self.res.as_str());
        let rhs = (other.sys.as_str(), other.name.as_str(), other.res.as_str());
        lhs.cmp(&rhs)
    }
}

/// Strategy selector stored on a [`Supervisor`].
///
/// NOTE(review): the variant names match the classic Erlang/OTP restart
/// strategies (restart only the failed child / restart all children / restart
/// the failed child and those started after it), but the code that interprets
/// the strategy is not in this section of the file — confirm against the
/// fault-recovery implementation before relying on that reading.
#[derive(Clone, Debug)]
pub enum SupervisionStrategy {
    OneForOne,
    OneForAll,
    RestForOne,
}

impl Default for SupervisionStrategy {
    fn default() -> Self {
        SupervisionStrategy::OneForOne
    }
}

/// A supervisor: a named owner of groups of children that it launches and
/// whose panics it reports for recovery.
///
/// It is configured by chaining the consuming builder methods `props`,
/// `strategy` and `children`, and started with `launch`.
#[derive(Default, Clone, Debug)]
pub struct Supervisor {
    // Identity of this supervisor (system name, supervisor name, identifier).
    pub urn: SupervisorURN,
    // Children groups registered through `children`; `launch` iterates over them.
    pub(crate) descendants: Vec<BastionChildren>,
    // Children recorded by `launch`'s completion handler before fault recovery
    // is invoked.
    pub(crate) killed: Vec<BastionChildren>,
    // Strategy chosen through `strategy`; defaults to `OneForOne`.
    pub(crate) strategy: SupervisionStrategy,
}

impl Supervisor {
    /// Replaces this supervisor's URN with one carrying the given `name` and
    /// `system`.
    ///
    /// The identifier part (`res`) is taken from a fresh
    /// `SupervisorURN::default()`, so calling this discards whatever identifier
    /// the supervisor had before.
    pub fn props(mut self, name: String, system: String) -> Self {
        // Start from the default URN purely to obtain a newly generated `res`.
        let mut urn = SupervisorURN::default();
        urn.sys = system;
        urn.name = name;
        self.urn = urn;
        self
    }

    /// Sets the supervision strategy and returns the supervisor for chaining.
    pub fn strategy(mut self, strategy: SupervisionStrategy) -> Self {
        self.strategy = strategy;
        self
    }

    /// Registers a group of children on this supervisor.
    ///
    /// * `thunk` – closure every child of the group runs when launched.
    /// * `msg` – initial message; each launched child receives its own clone.
    /// * `scale` – stored as the group's redundancy, i.e. how many children
    ///   `launch` starts for it. A value of zero or less starts none.
    ///
    /// A new unbounded channel is created for the group and both of its ends
    /// are stored with it, together with a freshly generated UUID as group id.
    /// Nothing is spawned here; see `launch`.
    pub fn children<F, M>(mut self, thunk: F, msg: M, scale: i32) -> Self
    where
        F: BastionClosure,
        M: Message,
    {
        let (tx, rx) = unbounded();

        // `thunk` and `msg` are owned, so they are boxed as-is; cloning them
        // into a second box only to drop the first one would be wasted work.
        self.descendants.push(BastionChildren {
            id: Uuid::new_v4().to_string(),
            tx: Some(tx),
            rx: Some(rx),
            redundancy: scale,
            msg: Box::new(msg),
            thunk: Box::new(thunk),
        });

        self
    }

    /// Spawns every registered child on the shared platform runtime.
    ///
    /// For each descendant group, `redundancy` futures are spawned. Each one
    /// calls the group's thunk with a `BastionContext` holding clones of the
    /// group's channel ends, plus a clone of the group's initial message.
    ///
    /// A panic inside the thunk is caught. When that happens the panic is
    /// logged, the child (group id suffixed with `::<index>`) is appended to
    /// the `killed` list of a clone of this supervisor, and that clone is
    /// handed to `Bastion::fault_recovery` along with another clone of the
    /// initial message.
    ///
    /// # Panics
    ///
    /// Panics if a descendant has no sender or receiver stored, or if the
    /// platform lock cannot be acquired.
    pub fn launch(self) {
        for descendant in &self.descendants {
            for child_id in 0..descendant.redundancy {
                let tx = descendant
                    .tx
                    .clone()
                    .expect("descendant must carry a broadcast sender (set by `children`)");
                let rx = descendant
                    .rx
                    .clone()
                    .expect("descendant must carry a broadcast receiver (set by `children`)");

                let thunk = objekt::clone_box(&*descendant.thunk);
                let msg = objekt::clone_box(&*descendant.msg);
                // Second copy of the message, reserved for the recovery path.
                let msg_for_recovery = objekt::clone_box(&*descendant.msg);

                // Record describing this particular child, distinguished from
                // its siblings by the index appended to the group id.
                let mut if_killed = descendant.clone();
                if_killed.id = format!("{}::{}", if_killed.id, child_id);

                // The completion handler needs an owned supervisor because it
                // runs on the runtime after `launch` has returned.
                let mut this_spv = self.clone();

                // `lazy` defers the thunk call until the runtime polls the
                // future; the closure is `FnOnce`, so the channel ends and the
                // message are moved straight into the call.
                let f = future::lazy(move || {
                    thunk(
                        BastionContext {
                            bcast_rx: Some(rx),
                            bcast_tx: Some(tx),
                        },
                        msg,
                    );
                    future::ok::<(), ()>(())
                });

                let k = AssertUnwindSafe(f)
                    .catch_unwind()
                    .then(|result| -> FutureResult<(), ()> {
                        // Already re-entrant code
                        if let Err(err) = result {
                            // Only a panicked child is recorded: on normal
                            // completion the supervisor clone is dropped and
                            // nobody would observe the entry.
                            this_spv.killed.push(if_killed);
                            error!("Panic happened in supervised child - {:?}", err);
                            crate::bastion::Bastion::fault_recovery(this_spv, msg_for_recovery);
                        }
                        future::ok(())
                    });

                // The lock is taken per spawn and released at the end of the
                // iteration, so it is never held while preparing a child.
                let ark = crate::bastion::PLATFORM.clone();
                let mut runtime = ark
                    .lock()
                    .expect("bastion platform lock could not be acquired");
                let shared_runtime = &mut runtime.runtime;
                shared_runtime.spawn(k);
            }
        }
    }
}