robots/actors/
actor_cell.rs

1/// This module contains most of the internals of actors.
2///
3/// It is used to handle messages, system messages, termination, initialization, restarting and
4/// creation of actors.
5
6use std::any::Any;
7use std::collections::{HashMap, VecDeque};
8use std::mem;
9use std::sync::{Arc, Mutex, RwLock, Weak};
10
11use actors::{Actor, ActorPath, ActorRef, ActorSystem, Message, Props};
12use actors::future::{Computation, Complete, Future, FutureState};
13use actors::name_resolver::ResolveRequest;
14use actors::props::ActorFactory;
15
16/// Closure to handle failure of an Actor.
17pub type FailureHandler = Arc<Fn(Failure, ActorCell) + Send + Sync>;
18
19enum Ref<T: ?Sized> {
20    StrongRef(Arc<T>),
21    WeakRef(Weak<T>),
22}
23
24macro_rules! unwrap_inner {
25    ($r:expr, $b:block) => {
26        match $r {
27            Ref::StrongRef(ref inner) => inner.clone(),
28            Ref::WeakRef(ref inner) => match inner.upgrade() {
29                Some(inner) => inner.clone(),
30                None => {
31                    $b
32                },
33            }
34        }
35    }
36}
37
38/// Main interface for interractiong with an Actor for the internals.
39pub struct ActorCell {
40    // We have an inner structure in order to be able to generate new ActorCell easily.
41    inner_cell: Ref<InnerActorCell>,
42}
43
44impl Clone for ActorCell {
45    fn clone(&self) -> ActorCell {
46        ActorCell {
47            inner_cell: Ref::WeakRef(match self.inner_cell {
48                Ref::StrongRef(ref inner) => Arc::downgrade(&inner),
49                Ref::WeakRef(ref inner) => inner.clone(),
50            }),
51        }
52    }
53}
54
55
56impl ActorCell {
57    /// Creates a new ActorCell.
58    pub fn new( props: Arc<ActorFactory>,
59               system: ActorSystem,
60               father: ActorRef,
61               path: Arc<ActorPath>)
62               -> ActorCell {
63        ActorCell {
64            inner_cell: Ref::StrongRef(Arc::new(InnerActorCell::new(props,
65                                                                    system,
66                                                                    father,
67                                                                    path))),
68        }
69    }
70
71    /// Puts a message with its sender in the Actor's mailbox and schedules the Actor.
72    pub fn receive_message(&self, message: InnerMessage, sender: ActorRef) {
73        let inner = unwrap_inner!(self.inner_cell, {
74            warn!("A message was send to a ref to a stopped actor");
75            return;
76        });
77        inner.receive_message(message, sender);
78        inner.system.enqueue_actor(self.actor_ref());
79    }
80
81    /// Puts a system message with its sender in the Actor's system mailbox and schedules the Actor.
82    pub fn receive_system_message(&self, system_message: SystemMessage) {
83        let inner = unwrap_inner!(self.inner_cell, {
84            warn!("A message was send to a ref to a stopped actor");
85            return;
86        });
87        inner.receive_system_message(system_message);
88        inner.system.enqueue_actor(self.actor_ref());
89    }
90
91    /// Makes the Actor handle an envelope in its mailbox.
92    pub fn handle_envelope(&self) {
93        let inner = unwrap_inner!(self.inner_cell, {
94            warn!("A message was send to a ref to a stopped actor");
95            return;
96        });
97        inner.handle_envelope(self.clone());
98    }
99}
100
101/// This is the API that Actors are supposed to see of their context while handling a message.
102pub trait ActorContext {
103    /// Returns an ActorRef to the Actor.
104    fn actor_ref(&self) -> ActorRef;
105
106    /// Spawns a child actor.
107    fn actor_of(&self, props: Arc<ActorFactory>, name: String) -> Result<ActorRef, &'static str>;
108
109    /// Sends a Message to the targeted ActorRef.
110    fn tell<MessageTo: Message>(&self, to: ActorRef, message: MessageTo);
111
112    /// Creates a Future, this Future will send the message to the targetted ActorRef (and thus be
113    /// the sender of the message).
114    fn ask<MessageTo: Message>(&self, to: ActorRef, message: MessageTo, future_name: String) -> ActorRef;
115
116    /// Completes a Future.
117    fn complete<MessageTo: Message>(&self, to: ActorRef, complete: MessageTo);
118
119    /// Tells a future to forward its result to another Actor.
120    /// The Future is then dropped.
121    fn forward_result<T: Message>(&self, future: ActorRef, to: ActorRef);
122
123    /// Tells a future to forward its result to another Future that will be completed with this
124    /// result.
125    /// The Future is then dropped.
126    fn forward_result_to_future<T: Message>(&self, future: ActorRef, to: ActorRef);
127
128    /// Sends the Future a closure to apply on its value, the value will be updated with the output
129    /// of the closure.
130    fn do_computation<T: Message, F: Fn(Box<Any + Send>, ActorCell) -> T + Send + Sync + 'static>
131        (&self, future: ActorRef, closure: F);
132
133    /// Requests the targeted actor to stop.
134    fn stop(&self, actor_ref: ActorRef);
135
136    /// Asks the father of the actor to terminate it.
137    fn kill_me(&self);
138
139    /// Returns an Arc to the sender of the message being handled.
140    fn sender(&self) -> ActorRef;
141
142    /// Father of the actor.
143    fn father(&self) -> ActorRef;
144
145    /// Children of the actor.
146    fn children(&self) -> HashMap<Arc<ActorPath>, ActorRef>;
147
148    /// Lifecycle monitoring, list of monitored actors.
149    fn monitoring(&self) -> HashMap<Arc<ActorPath>, (ActorRef, FailureHandler)>;
150
151    /// Actors monitoring this actor.
152    fn monitored_by(&self) -> Vec<ActorRef>;
153
154    /// Monitor an actor with the given handler.
155    fn monitor(&self, actor: ActorRef, handler: FailureHandler);
156
157    /// Logical path to the actor, such as `/user/foo/bar/baz`
158    fn path(&self) -> Arc<ActorPath>;
159
160    /// Future containing an Option<ActorRef> with an ActtorRef to the Actor with the given logical
161    /// path.
162    ///
163    /// The future will have the path: `$actor/$name_request`
164    fn identify_actor(&self, logical_path: String, request_name: String) -> ActorRef;
165
166    /// Sends a control message to the given actor.
167    fn tell_control(&self, actor: ActorRef, message: ControlMessage);
168
169    /// Puts the actor in a state of failure with the given reason.
170    fn fail(&self, reason: &'static str);
171}
172
173impl ActorContext for ActorCell {
174    fn actor_ref(&self) -> ActorRef {
175        ActorRef::with_cell(self.clone(), self.path())
176    }
177
178    fn actor_of(&self, props: Arc<ActorFactory>, name: String) -> Result<ActorRef, &'static str> {
179        let inner = unwrap_inner!(self.inner_cell, {
180            panic!("Tried to create an actor from the context of a no longer existing actor");
181        });
182
183        // We check that there is no path traversal.
184        if name.find("/") != None {
185            return Err("Used a '/' in the name of an actor, this is not allowed");
186        }
187
188        let path = self.path().child(name);
189        info!("creating actor {}", path.logical_path());
190        let inner_cell = InnerActorCell::new(props,
191                                             inner.system.clone(),
192                                             self.actor_ref(),
193                                             path.clone());
194        let actor_cell = ActorCell { inner_cell: Ref::StrongRef(Arc::new(inner_cell)) };
195        let internal_ref = ActorRef::with_cell(actor_cell, path.clone());
196        let external_ref = internal_ref.clone();
197        inner.children.lock().unwrap().insert(path.clone(), internal_ref);
198        inner.monitoring.lock().unwrap().insert(path.clone(), (external_ref.clone(), Arc::new(InnerActorCell::restart_child)));
199        external_ref.receive_system_message(SystemMessage::Start);
200        // This is a bit messy, but we have a chicken / egg issue otherwise when creating the name
201        // resolver actor.
202        if *(path.logical_path()) != "/system/name_resolver" {
203            self.tell(inner.system.name_resolver(), ResolveRequest::Add(external_ref.clone()));
204        }
205        Ok(external_ref)
206    }
207
208    fn tell<MessageTo: Message>(&self, to: ActorRef, message: MessageTo) {
209        // FIXME(gamazeps): Code duplication.
210        let path = to.path();
211        match *path {
212            ActorPath::Local(_) => to.receive(InnerMessage::Message(Box::new(message)), self.actor_ref()),
213            ActorPath::Distant(ref path) => {
214                info!("Sent a message of size {} to distant actor {}:{}", mem::size_of::<MessageTo>(),
215                path.distant_logical_path(), path.addr_port());
216            },
217        }
218    }
219
220    fn ask<MessageTo: Message>(&self, to: ActorRef, message: MessageTo, name: String) -> ActorRef {
221        let future = self.actor_of(Props::new(Arc::new(Future::new), ()), name).unwrap();
222        future.tell_to(to, message);
223        future
224    }
225
226    fn complete<MessageTo: Message>(&self, future: ActorRef, complete: MessageTo) {
227        // FIXME(gamazeps): Code duplication.
228        // This is a copy of the code in tell, but we need to do that in order to put a Box<Any> in
229        // the mailbox.
230        let path = future.path();
231        match *path {
232            ActorPath::Local(_) => future.receive(InnerMessage::Message(Box::new(Complete::new(Box::new(complete)))), self.actor_ref()),
233            ActorPath::Distant(ref path) => {
234                info!("Sent a message of size {} to distant future {}:{}", mem::size_of::<MessageTo>(),
235                path.distant_logical_path(), path.addr_port());
236            },
237        }
238    }
239
240    fn forward_result<T: Message>(&self, future: ActorRef, actor: ActorRef) {
241        self.tell(future, Computation::Forward(actor, Arc::new(move |value, context, to| {
242            let value = Box::<Any + Send>::downcast::<T>(value).expect("Message of the wrong type");
243            context.tell(to, *value);
244            FutureState::Extracted
245        })));
246    }
247
248    fn forward_result_to_future<T: Message>(&self, future: ActorRef, actor: ActorRef) {
249        self.tell(future, Computation::Forward(actor, Arc::new(move |value, context, to| {
250            let value = Box::<Any + Send>::downcast::<T>(value).expect("Message of the wrong type");
251            context.complete(to, *value);
252            FutureState::Extracted
253        })));
254    }
255
256    fn do_computation<T: Message, F: Fn(Box<Any + Send>, ActorCell) -> T + Send + Sync + 'static>
257        (&self, future: ActorRef, closure: F) {
258        self.tell(future, Computation::Computation(Arc::new(move |value, context| {
259            let v = closure(value, context);
260            FutureState::Computing(Box::new(v))
261        })));
262    }
263
264    fn sender(&self) -> ActorRef {
265        let inner = unwrap_inner!(self.inner_cell, {
266            panic!("Tried to get a sender from the context of a no longer existing actor");
267        });
268        // This is weird but this is for clippy.
269        let current_sender = inner.current_sender.lock().unwrap();
270        current_sender.as_ref().unwrap().clone()
271    }
272
273    fn tell_control(&self, actor: ActorRef, message: ControlMessage) {
274        let path = actor.path();
275        match *path {
276            ActorPath::Local(_) => actor.receive(InnerMessage::Control(message), self.actor_ref()),
277            ActorPath::Distant(_) => {},
278        }
279    }
280
281    fn stop(&self, actor_ref: ActorRef) {
282        self.tell_control(actor_ref, ControlMessage::PoisonPill);
283    }
284
285    fn kill_me(&self) {
286        self.tell_control(self.father(), ControlMessage::KillMe(self.actor_ref()));
287    }
288
289    fn father(&self) -> ActorRef {
290        let inner = unwrap_inner!(self.inner_cell, {
291            panic!("Tried to get the father from the context of a no longer existing actor");
292        });
293        inner.father.clone()
294    }
295
296    fn children(&self) -> HashMap<Arc<ActorPath>, ActorRef> {
297        let inner = unwrap_inner!(self.inner_cell, {
298            panic!("Tried to get the children from the context of a no longer existing actor");
299        });
300        let children = inner.children.lock().unwrap();
301        children.clone()
302    }
303
304    fn monitoring(&self) -> HashMap<Arc<ActorPath>, (ActorRef, FailureHandler)> {
305        let inner = unwrap_inner!(self.inner_cell, {
306            panic!("Tried to get the monitored actors from the context of a no longer existing \
307                    actor");
308        });
309        let monitoring = inner.monitoring.lock().unwrap();
310        monitoring.clone()
311    }
312
313    fn monitored_by(&self) -> Vec<ActorRef> {
314        let inner = unwrap_inner!(self.inner_cell, {
315            panic!("Tried to get the monitoring actors from the context of a no longer existing actor");
316        });
317        let monitored_by = inner.monitored_by.lock().unwrap();
318        monitored_by.clone()
319    }
320
321    fn monitor(&self, actor: ActorRef, handler: FailureHandler) {
322        let inner = unwrap_inner!(self.inner_cell, {
323            panic!("tried to have a no longer existing actor monitor an other actor?")
324        });
325        self.tell_control(actor.clone(), ControlMessage::RegisterMonitoring);
326        let mut monitoring = inner.monitoring.lock().unwrap();
327        monitoring.insert(actor.path(), (actor, handler));
328    }
329
330    fn path(&self) -> Arc<ActorPath> {
331        let inner = unwrap_inner!(self.inner_cell, {
332            panic!("Tried to get the path from the context of a no longer existing actor");
333        });
334        inner.path.clone()
335    }
336
337    fn identify_actor(&self, name: String, request_name: String) -> ActorRef {
338        let inner = unwrap_inner!(self.inner_cell, {
339            panic!("Tried to get the actor system of a no longer existing actor while resolving \
340                    a path. This should *never* happen");
341        });
342        self.ask(inner.system.name_resolver(), ResolveRequest::Get(name), request_name)
343    }
344
345    fn fail(&self, reason: &'static str) {
346        let inner = unwrap_inner!(self.inner_cell, {
347            panic!("Tried to get the state of a no longer existing actor while resolving \
348                    a path. This should *never* happen");
349        });
350        {*inner.actor_state.write().unwrap() = ActorState::Failed;}
351        for actor in self.monitored_by().iter() {
352            self.tell_control(actor.clone(), ControlMessage::Failure(Failure::new(self.actor_ref(), reason)));
353        }
354    }
355}
356
357#[derive(PartialEq, Copy, Clone)]
358/// Interna representation of the actor's state.
359enum ActorState {
360    /// The actor has panicked and has not yet been restarded.
361    Failed,
362    /// The actor is up and running.
363    Running,
364    /// The actor is in a clean state, but has not initiazed itself yet.
365    Unstarted,
366}
367
368/// Structure used to send a failure message when the actor panics.
369struct Failsafe {
370    context: ActorCell,
371    active: bool,
372}
373
374impl Failsafe {
375    fn new(context: ActorCell) -> Failsafe {
376        Failsafe {
377            context: context,
378            active: true,
379        }
380    }
381
382    /// Cancels the failsafe, means that everything went normally.
383    fn cancel(mut self) {
384        self.active = false;
385    }
386}
387
388impl Drop for Failsafe {
389    fn drop(&mut self) {
390        if self.active {
391            self.context.fail("panic");
392        }
393    }
394}
395
396/// Special messages issued by the actor system.
397/// Note that these are treated with the highest priority and will thus be handled before any
398/// InnerMessage is handled.
399#[derive(Clone, Copy)]
400pub enum SystemMessage {
401    /// Restarts the actor by replacing it with a new version created with its ActorFactory.
402    Restart,
403
404    /// Tells the actor to initialize itself.
405    /// Note that the initialization is not done by the father for fairness reasons.
406    Start,
407}
408
409/// Structure used to store a message and its sender.
410struct Envelope {
411    message: InnerMessage,
412    sender: ActorRef,
413}
414
415/// Types of message that can be sent to an actor that will be treated normally.
416pub enum InnerMessage {
417    /// Regular message.
418    Message(Box<Any + Send>),
419
420    /// Control messages.
421    Control(ControlMessage),
422}
423
424/// Control Messages.
425#[derive(Clone)]
426pub enum ControlMessage {
427    /// Requests the termination of the actor.
428    /// This is what is sent when the `context.stop(actor_ref)` is called.
429    PoisonPill,
430
431    /// Tells an actor another failed.
432    Failure(Failure),
433
434    /// Message sent to the father of an actor to request being terminated.
435    KillMe(ActorRef),
436
437    /// Message sent to be notified of failures.
438    RegisterMonitoring,
439}
440
441#[derive(Clone)]
442/// Structurer containing Actor Failure informations.
443pub struct Failure {
444    source: ActorRef,
445    reason: &'static str,
446}
447
448impl Failure {
449    fn new(source: ActorRef, reason: &'static str) -> Failure {
450        Failure {
451            source: source,
452            reason: reason,
453        }
454    }
455    /// Actor that failed.
456    pub fn actor(&self) -> ActorRef {self.source.clone()}
457    /// Reason of failure.
458    pub fn reason(&self) -> &'static str {self.reason}
459}
460
461struct InnerActorCell {
462    mailbox: Mutex<VecDeque<Envelope>>,
463    system_mailbox: Mutex<VecDeque<SystemMessage>>,
464    props: Arc<ActorFactory>,
465    system: ActorSystem,
466    path: Arc<ActorPath>,
467    current_sender: Mutex<Option<ActorRef>>,
468    busy: Mutex<()>,
469    father: ActorRef,
470    children: Mutex<HashMap<Arc<ActorPath>, ActorRef>>,
471    monitoring: Mutex<HashMap<Arc<ActorPath>, (ActorRef, FailureHandler)>>,
472    actor_state: Arc<RwLock<ActorState>>,
473    monitored_by: Mutex<Vec<ActorRef>>,
474    actor: RwLock<Arc<Actor>>,
475}
476
477impl InnerActorCell {
478    /// Constructor.
479    fn new(props: Arc<ActorFactory>,
480           system: ActorSystem,
481           father: ActorRef,
482           path: Arc<ActorPath>)
483           -> InnerActorCell {
484        InnerActorCell {
485            actor: RwLock::new(props.create()),
486            mailbox: Mutex::new(VecDeque::new()),
487            system_mailbox: Mutex::new(VecDeque::new()),
488            props: props,
489            system: system,
490            path: path,
491            current_sender: Mutex::new(None),
492            busy: Mutex::new(()),
493            father: father.clone(),
494            children: Mutex::new(HashMap::new()),
495            monitoring: Mutex::new(HashMap::new()),
496            actor_state: Arc::new(RwLock::new(ActorState::Unstarted)),
497            monitored_by: Mutex::new(vec![father.clone()]),
498        }
499    }
500
501    fn receive_envelope(&self, envelope: Envelope) {
502        self.mailbox.lock().unwrap().push_back(envelope);
503    }
504
505    fn receive_message(&self, message: InnerMessage, sender: ActorRef) {
506        self.receive_envelope(Envelope {
507            message: message,
508            sender: sender,
509        });
510    }
511
512    fn receive_system_message(&self, system_message: SystemMessage) {
513        self.system_mailbox.lock().unwrap().push_back(system_message);
514    }
515
516    fn handle_envelope(&self, context: ActorCell) {
517        // Now we do not want users to be able to touch current_sender while the actor is busy.
518        let _lock = self.busy.lock();
519        let failsafe = Failsafe::new(context.clone());
520        // System messages are handled first, so that we can restart an actor if he failed without
521        // loosing the messages in the mailbox.
522        // NOTE: This does not break the fact that messages sent by the same actor are treated in
523        // the order they are sent (if all to the same target actor), as system messages must not
524        // be sent by other actors by the user.
525        if let Some(message) = self.system_mailbox.lock().unwrap().pop_front() {
526            match message {
527                SystemMessage::Restart => self.restart(context),
528                SystemMessage::Start => self.start(context),
529            }
530            failsafe.cancel();
531            return;
532        }
533
534        let state = {self.actor_state.read().unwrap().clone()};
535        if state == ActorState::Running {
536            let envelope = match self.mailbox.lock().unwrap().pop_front() {
537                Some(envelope) => envelope,
538                None => {
539                    failsafe.cancel();
540                    return;
541                }
542            };
543            {
544                let mut current_sender = self.current_sender.lock().unwrap();
545                *current_sender = Some(envelope.sender.clone());
546            };
547            {
548                let actor = self.actor.read().unwrap();
549                match envelope.message {
550                    InnerMessage::Message(message) => {
551                        actor.receive(message, context);
552                    },
553                    InnerMessage::Control(message) => {
554                        match message {
555                            ControlMessage::PoisonPill => context.kill_me(),
556                            ControlMessage::Failure(failure) => {
557                                let monitoring = self.monitoring.lock().unwrap();
558                                let handler = monitoring.get(&failure.actor().path())
559                                    .expect("Received a failure notification from an unknown actor");
560                                (*handler.1)(failure, context);
561                            },
562                            ControlMessage::KillMe(actor_ref) => self.kill(actor_ref, context),
563                            ControlMessage::RegisterMonitoring => {
564                                let mut mon = self.monitored_by.lock().unwrap();
565                                mon.push(context.sender());
566                            },
567                        }
568                    }
569                }
570            }
571        } else {
572            self.system.enqueue_actor(context.actor_ref());
573        }
574
575        failsafe.cancel();
576    }
577
578    fn kill(&self, actor: ActorRef, context: ActorCell) {
579        self.children.lock().unwrap().remove(&actor.path()).expect(&format!("actor {} was asked to kill {} and cannot do that",
580                                             self.path.logical_path(),
581                                             actor.path().logical_path()));
582        context.tell(self.system.name_resolver(), ResolveRequest::Remove(actor.path()));
583    }
584
585    fn start(&self, context: ActorCell) {
586        self.actor.write().unwrap().pre_start(context);
587        *self.actor_state.write().unwrap() = ActorState::Running;
588    }
589
590    fn restart(&self, context: ActorCell) {
591        let mut actor = self.actor.write().unwrap();
592        actor.pre_restart(context.clone());
593        *actor = self.props.create();
594        actor.post_restart(context);
595        *self.actor_state.write().unwrap() = ActorState::Running;
596    }
597
598    fn restart_child(failure: Failure, _context: ActorCell) {
599        failure.actor().receive_system_message(SystemMessage::Restart);
600    }
601}
602
603impl Drop for InnerActorCell {
604    fn drop(&mut self) {
605        // FIXME(gamazeps) Looking at the logs it seems as though fathers are killed before their
606        // children, that is not the intended behaviour.
607        let actor = self.actor.write().unwrap();
608        info!("Actor {} is dropped", *self.path.logical_path());
609        actor.post_stop();
610    }
611}