1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
//! Core of the actor
//!
//! This object is essentially the actor itself. It encapsulates the actor object, the mailbox and
//! the dispatcher, and holds references to the actor system and other elements of the system.
//!

use crate::common::tsafe::TSafe;
use crate::actors::dispatcher::Dispatcher;
use crate::actors::mailbox::Mailbox;
use crate::actors::actor_context::ActorContext;
use crate::actors::actor::Actor;
use crate::actors::actor_path::ActorPath;
use crate::actors::abstract_actor_system::AbstractActorSystem;
use crate::actors::envelope::Envelope;
use crate::actors::abstract_actor_ref::AbstractActorRef;
use crate::actors::abstract_actor_ref::ActorRef;
use crate::actors::local_actor_ref::LocalActorRef;
use crate::actors::watcher::WatchingEvents;
use crate::actors::message::Message;
use crate::actors::error::Error;
use crate::actors::supervision::SupervisionStrategy;
use std::collections::HashMap;
use std::any::Any;

/// Runtime container of a single actor.
///
/// Bundles the user supplied actor object with its mailbox and dispatcher and keeps the links to
/// the actor system and to the neighbouring cells (parent and children) of the actors hierarchy.
pub struct ActorCell {

    /// Reference to the message dispatcher. It hands out the bid in the start method and
    /// schedules the envelopes created by the send methods.
    pub dispatcher: TSafe<dyn Dispatcher + Send>,

    /// Actor mailbox. It is passed to the dispatcher together with every envelope.
    pub mailbox: TSafe<dyn Mailbox + Send>,

    /// Executor asynchronous block id. Actually this value represents the id of the thread on
    /// which the actor messages will be processed. See default_dispatcher for more info about
    /// the actor's async mechanics.
    pub bid: usize,

    /// Object which implements the actor trait and contains the application logic
    pub actor: TSafe<dyn Actor + Send>,

    /// Actor path object that represents the actor position in the actors hierarchy
    pub path: TSafe<ActorPath>,

    /// Reference to the actor system
    pub system: TSafe<dyn AbstractActorSystem + Send>,

    /// Suspend flag. While it is set, the send method redirects messages to dead letters. See
    /// the suspend method description for more details
    pub suspended: bool,

    /// Stop flag. While it is set, the send method redirects messages to dead letters. See the
    /// start and stop methods description for more details
    pub stopped: bool,

    /// Parent actor cell of the actor, or None if the cell has no parent
    pub parent: Option<TSafe<ActorCell>>,

    /// Child cells of the actor (presumably keyed by the child name - TODO confirm)
    pub childs: HashMap<String, TSafe<ActorCell>>,

    /// Actor's supervision strategy. It is applied by the fail method
    pub supervision_strategy: SupervisionStrategy
}

impl ActorCell {

    /// Creates a new actor cell. This is the internal constructor and should never be used in
    /// user code.
    ///
    /// The cell is created stopped, not suspended and without children. The stopped flag is
    /// dropped later by the start method, which also replaces `bid` with a value obtained from
    /// the dispatcher.
    pub fn new(system: TSafe<dyn AbstractActorSystem + Send>,
        path: TSafe<ActorPath>,
        actor: TSafe<dyn Actor + Send>,
        bid: usize,
        dispatcher: TSafe<dyn Dispatcher + Send>,
        mailbox: TSafe<dyn Mailbox + Send>,
        parent: Option<TSafe<ActorCell>>,
        supervision_strategy: SupervisionStrategy) -> ActorCell {

        ActorCell {
            actor,
            bid,
            dispatcher,
            mailbox,
            path,
            system,
            suspended: false,
            // Messages are not accepted until the start method has been run
            stopped: true,
            parent,
            childs: HashMap::new(),
            supervision_strategy
        }
    }

    /// Fail handler of the actor. This method is called when a message handler has completed
    /// with an error. It realizes the supervision strategy of the actor and decides what to do
    /// next.
    ///
    /// The method itself only snapshots the system, the path and the strategy of the cell. All
    /// the work is done by the returned closure, which runs the pre_fail hook of the actor and
    /// then acts according to the strategy:
    ///
    /// * `Resume` - nothing more is done;
    /// * `Restart` - the actor is restarted;
    /// * `Stop` - the actor is stopped;
    /// * `Escalate` - the error is passed to the fail handler of the parent cell. A cell without
    ///   a parent has nobody to escalate to, so it is stopped instead of panicking.
    pub fn fail(&mut self, err: Error, boxed_self: TSafe<ActorCell>) -> impl FnOnce() -> () {
        let system = self.system.clone();
        let path = self.path.clone();
        let supervision_strategy = self.supervision_strategy.clone();

        move || {
            let self_: ActorRef = Box::new(LocalActorRef::new(boxed_self.clone(), path));
            let sender = system.lock().unwrap().dead_letters();
            let ctx = ActorContext::new(sender, self_, system, boxed_self.clone());

            // Take everything needed out of the cell in one short critical section, so the cell
            // is not locked while the hooks below are running.
            let (actor, parent) = {
                let cell = boxed_self.lock().unwrap();
                (cell.actor.clone(), cell.parent.clone())
            };

            match supervision_strategy {
                SupervisionStrategy::Resume => {
                    actor.lock().unwrap().pre_fail(ctx, err, SupervisionStrategy::Resume);
                },
                SupervisionStrategy::Restart => {
                    actor.lock().unwrap().pre_fail(ctx, err, SupervisionStrategy::Restart);
                    let f = boxed_self.lock().unwrap().restart(boxed_self.clone());
                    f();
                },
                SupervisionStrategy::Stop => {
                    actor.lock().unwrap().pre_fail(ctx, err, SupervisionStrategy::Stop);
                    let f = boxed_self.lock().unwrap().stop(boxed_self.clone());
                    f();
                },
                SupervisionStrategy::Escalate => {
                    // The error is needed twice: by the hook and by the parent
                    actor.lock().unwrap().pre_fail(ctx, err.clone(), SupervisionStrategy::Escalate);

                    match parent {
                        Some(parent) => {
                            let f = parent.lock().unwrap().fail(err, parent.clone());
                            f();
                        },
                        None => {
                            // Top of the hierarchy: there is no supervisor left to decide, stop
                            // the failed actor rather than panic on the missing parent.
                            let f = boxed_self.lock().unwrap().stop(boxed_self.clone());
                            f();
                        }
                    }
                }
            };
        }
    }

    /// Starts the actor. Obtains a bid from the dispatcher and creates the actor context right
    /// away. The returned closure runs the pre_start hook and then permits message receiving
    /// by dropping the stopped flag.
    pub fn start(self: &mut Self, boxed_self: TSafe<ActorCell>) -> impl FnOnce() -> () {
        self.bid = self.dispatcher.lock().unwrap().obtain_bid();

        // The context uses dead letters as the sender
        let own_ref = Box::new(LocalActorRef::new(boxed_self.clone(), self.path.clone()));
        let dead_letters = self.system.lock().unwrap().dead_letters();
        let ctx = ActorContext::new(dead_letters, own_ref, self.system.clone(), boxed_self.clone());

        move || {
            // The cell lock is released before the hook is called
            let actor = boxed_self.lock().unwrap().actor.clone();
            actor.lock().unwrap().pre_start(ctx);

            boxed_self.lock().unwrap().stopped = false;
        }
    }

    /// Restarts the actor. The actor context is created right away. The returned closure stops
    /// the actor, starts it again and finally runs the post_restart hook.
    pub fn restart(self: &mut Self, boxed_self: TSafe<ActorCell>) -> impl FnOnce() -> () {
        let own_ref = Box::new(LocalActorRef::new(boxed_self.clone(), self.path.clone()));
        let dead_letters = self.system.lock().unwrap().dead_letters();
        let ctx = ActorContext::new(dead_letters, own_ref, self.system.clone(), boxed_self.clone());

        move || {
            // Each step locks the cell only while its closure is being built. The closure itself
            // runs with the cell unlocked.
            let stop_actor = boxed_self.lock().unwrap().stop(boxed_self.clone());
            stop_actor();

            let start_actor = boxed_self.lock().unwrap().start(boxed_self.clone());
            start_actor();

            let actor = boxed_self.lock().unwrap().actor.clone();
            actor.lock().unwrap().post_restart(ctx);
        }
    }

    /// Stops the actor. Prohibits receiving new messages, stops and forgets all the children
    /// right away. The returned closure calls the post_stop hook and registers the Terminated
    /// watch event in the actor system.
    pub fn stop(self: &mut Self, boxed_self: TSafe<ActorCell>) -> impl FnOnce() -> () {
        self.stopped = true;

        //FIXME this is a potential memory leak place! What happens if an actor is stopped but its mailbox is not empty?
        //self.mailbox.lock().unwrap().clean_up();

        // Children are stopped completely (including their hooks) before the actor itself
        for child in self.childs.values() {
            let stop_child = child.lock().unwrap().stop(child.clone());
            stop_child();
        }
        self.childs.clear();

        let self_: ActorRef = Box::new(LocalActorRef::new(boxed_self.clone(), self.path.clone()));
        let dead_letters = self.system.lock().unwrap().dead_letters();
        let ctx = ActorContext::new(dead_letters, self_.clone(), self.system.clone(), boxed_self.clone());

        move || {
            let (actor, system) = {
                let cell = boxed_self.lock().unwrap();
                (cell.actor.clone(), cell.system.clone())
            };

            actor.lock().unwrap().post_stop(ctx);
            system.lock().unwrap().register_watch_event(&self_, WatchingEvents::Terminated);
        }
    }

    /// Suspends the actor by raising the suspended flag. From this moment the send method
    /// redirects all new messages to dead letters.
    pub fn suspend(self: &mut Self) {
        self.suspended = true;
    }

    /// Sends the message to the actor. Creates a new envelope with the message and tells the
    /// dispatcher to schedule the execution of this envelope. The message is delivered only if
    /// both the suspended and the stopped flags are dropped. Otherwise the message is redirected
    /// to the dead letters cell.
    ///
    /// * `boxed_self` - shared handle of this cell, it is passed on to the dispatcher
    /// * `msg` - message to send
    /// * `rself` - presumably the reference to the sender of the message (TODO confirm)
    /// * `to_ref` - presumably the reference to the receiver of the message (TODO confirm)
    pub fn send(&mut self,
                boxed_self: &TSafe<ActorCell>,
                msg: Message,
                rself: Option<ActorRef>,
                to_ref: Box<dyn AbstractActorRef + Send>) {

        // If the cell does not receive new messages, drop the message to dead letters
        if self.stopped || self.suspended {
            let mut dead_letters = self.system.lock().unwrap().dead_letters();

            // NOTE(review): if the dead letters cell itself may ever be stopped or suspended,
            // this call re-enters it while it is already locked - confirm that cannot happen.
            let dead_cell = dead_letters.cell();
            dead_cell.lock().unwrap().send(&dead_cell, msg, rself, to_ref);
            return;
        }

        let envelope = Envelope::new(
            msg,
            rself,
            to_ref,
            self.system.clone());

        self.dispatcher.lock().unwrap().dispatch(
            boxed_self.clone(),
            self.bid,
            self.mailbox.clone(),
            self.actor.clone(), envelope);
    }

    /// Does the same as the send method, but ignores the stopped and suspended flags: the
    /// envelope is always created and passed to the dispatcher.
    ///
    /// * `boxed_self` - shared handle of this cell, it is passed on to the dispatcher
    /// * `msg` - message to send
    /// * `rself` - presumably the reference to the sender of the message (TODO confirm)
    /// * `to_ref` - presumably the reference to the receiver of the message (TODO confirm)
    pub fn force_send(&mut self,
                      boxed_self: TSafe<ActorCell>,
                      msg: Message,
                      rself: Option<Box<dyn AbstractActorRef + Send>>,
                      to_ref: Box<dyn AbstractActorRef + Send>) {

        let envelope = Envelope::new(
            msg,
            rself,
            to_ref,
            self.system.clone());

        self.dispatcher.lock().unwrap().dispatch(
            boxed_self,
            self.bid,
            self.mailbox.clone(),
            self.actor.clone(), envelope);
    }
}

//impl Drop for ActorCell {
//    fn drop(&mut self) {
//        println!("ActorCell dropped")
//    }
//}
//


// Attention! This object must not be cloned. Only the boxed (TSafe) value of the cell may be
// cloned.