robots/actors/
actor_system.rs

1use std::sync::{Arc, Mutex, RwLock};
2use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
3use std::thread;
4
5use actors::{ActorPath, ActorRef, Message, Props};
6use actors::actor_cell::{ActorCell, SystemMessage};
7use actors::cthulhu::Cthulhu;
8use actors::future::{Future, FutureExtractor};
9use actors::name_resolver::NameResolver;
10use actors::props::ActorFactory;
11use actors::root_actor::RootActor;
12
13/// This is failsafe used to relaunch consumer threads if they panic!.
14struct Relauncher {
15    actor_system: ActorSystem,
16    active: bool,
17}
18
19impl Relauncher {
20    fn new(actor_system: ActorSystem) -> Relauncher {
21        Relauncher {
22            actor_system: actor_system,
23            active: true,
24        }
25    }
26
27    fn cancel(mut self) {
28        self.active = false;
29    }
30}
31
32impl Drop for Relauncher {
33    fn drop(&mut self) {
34        if self.active {
35            self.actor_system.spawn_thread();
36        }
37    }
38}
39
40/// The actor system is the struct that manages:
41///
42///   * The creation of the root actors.
43///   * The consumer threads.
44///   * Scheduling the actors.
45///
46/// It needs to be instantiated once at the beggining of the application. Then we need to specify
47/// the number of consumer threads that will be allocated.
48///
49/// Calling `shutdown`, will drop all the actors and terminate the consumer threads.
50/// Note that it will shut down the system even if some actors have still messages to handle.
51pub struct ActorSystem {
52    inner: Arc<InnerActorSystem>,
53}
54
55impl ActorSystem {
56    /// Creates a new ActorSystem.
57    ///
58    /// Note that one thread is started.
59    pub fn new(name: String) -> ActorSystem {
60        let actor_system = ActorSystem { inner: Arc::new(InnerActorSystem::new(name)) };
61        let cthulhu = Cthulhu::new(actor_system.clone());
62        let cthulhu = ActorRef::with_cthulhu(cthulhu);
63        info!("Created cthulhu");
64        *actor_system.inner.cthulhu.write().unwrap() = Some(cthulhu.clone());
65        let user_actor_path = ActorPath::new_local("/user".to_owned());
66        let user_actor_cell = ActorCell::new(Props::new(Arc::new(RootActor::new), ()),
67                                                actor_system.clone(),
68                                                cthulhu.clone(),
69                                                user_actor_path.clone());
70        let user_actor = ActorRef::with_cell(user_actor_cell, user_actor_path);
71        user_actor.receive_system_message(SystemMessage::Start);
72        info!("Created /user actor");
73        *actor_system.inner.user_actor.write().unwrap() = Some(user_actor);
74        let system_actor_path = ActorPath::new_local("/system".to_owned());
75        let system_actor_cell = ActorCell::new(Props::new(Arc::new(RootActor::new), ()),
76                                                actor_system.clone(),
77                                                cthulhu.clone(),
78                                                system_actor_path.clone());
79        let system_actor = ActorRef::with_cell(system_actor_cell, system_actor_path);
80        system_actor.receive_system_message(SystemMessage::Start);
81        info!("Created /system actor");
82        *actor_system.inner.system_actor.write().unwrap() = Some(system_actor);
83        actor_system.spawn_threads(1);
84        info!("Launched the first thread");
85        let name_resolver = actor_system.system_actor_of(Props::new(Arc::new(NameResolver::new), ()), "name_resolver".to_owned());
86        info!("Created the /system/name_resolver actor");
87        *actor_system.inner.name_resolver.write().unwrap() = Some(name_resolver);
88        actor_system
89    }
90
91    /// Spawns an Actor created using the Props given for the user.
92    pub fn actor_of(&self, props: Arc<ActorFactory>, name: String) -> ActorRef {
93        self.inner.actor_of(props, name)
94    }
95
96    /// Spawns an Actor created using the Props given for the system.
97    pub fn system_actor_of(&self, props: Arc<ActorFactory>, name: String) -> ActorRef {
98        self.inner.system_actor_of(props, name)
99    }
100
101    /// Shuts the actor system down.
102    ///
103    /// It will terminate all the actors (whether they still have messages to handle or not) and
104    /// then terminate the consumer threads.
105    pub fn shutdown(&self) {
106        self.inner.shutdown();
107    }
108
109    /// Enqueues the given ActorRef in the queue of ActorRef with message to handle.
110    pub fn enqueue_actor(&self, actor_ref: ActorRef) {
111        self.inner.enqueue_actor(actor_ref);
112    }
113
114    /// Spawns a thread that will have ActorRef handle their messages.
115    ///
116    /// This thread can be terminated by calling `terminate_thread`.
117    pub fn spawn_thread(&self) {
118        let actors_queue = self.inner.actors_queue_receiver.clone();
119        let rx = self.inner.consumer_threads_receiver.clone();
120        let actor_system = self.clone();
121        let _ = thread::spawn(move || {
122            // This is a failsafe used to relaunch a consumer thread if it panic!
123            let relauncher = Relauncher::new(actor_system.clone());
124            loop {
125                // We check if we received a termination request.
126                match rx.lock().unwrap().try_recv() {
127                    Ok(_) | Err(TryRecvError::Disconnected) => {
128                        relauncher.cancel();
129                        break;
130                    }
131                    Err(TryRecvError::Empty) => {}
132                };
133
134                // We try to get an ActorRef with a message to handle.
135                let actor_ref = {
136                    let lock = actors_queue.lock().unwrap();
137                    lock.try_recv()
138                };
139
140                match actor_ref {
141                    Ok(actor_ref) => actor_ref.handle(),
142                    Err(TryRecvError::Empty) => continue,
143                    Err(TryRecvError::Disconnected) => {
144                        relauncher.cancel();
145                        actor_system.shutdown();
146                        panic!("The actors queue failed, something is very wrong");
147                    }
148                }
149            }
150        });
151        *self.inner.n_threads.lock().unwrap() += 1;
152    }
153
154    /// Kills a consumer thread.
155    pub fn terminate_thread(&self) {
156        self.inner.terminate_thread();
157    }
158
159    /// Spawns n consumer threads.
160    pub fn spawn_threads(&self, n: u32) {
161        for _ in 0..n {
162            self.spawn_thread();
163        }
164    }
165
166    /// Kills n consumer threads.
167    pub fn terminate_threads(&self, n: u32) {
168        self.inner.terminate_threads(n);
169    }
170
171    /// Gives the ActorRef of the name resolver actor.
172    pub fn name_resolver(&self) -> ActorRef {
173        match self.inner.name_resolver.read().unwrap().as_ref() {
174            None => panic!("The name resolver is not initialized."),
175            Some(resolver) => resolver.clone(),
176        }
177    }
178
179    /// Sends a message to the given actor.
180    ///
181    /// The sender of the message is the user_actor, thus this expects that no answer will be
182    /// given.
183    pub fn tell<M: Message>(&self, to: ActorRef, message: M) {
184        match self.inner.user_actor.read().unwrap().as_ref() {
185            Some(user_actor) => user_actor.tell_to(to, message),
186            None => unreachable!(),
187        }
188    }
189
190    /// Creates a Future that will send the message to the targetted actor.
191    ///
192    /// The father of this Future is the user_actor.
193    pub fn ask<M: Message>(&self, to: ActorRef, message: M, name: String) -> ActorRef {
194        let future = self.actor_of(Props::new(Arc::new(Future::new), ()), name);
195        future.tell_to(to, message);
196        future
197    }
198
199    /// Extracts the result from a Future.
200    ///
201    /// This is not supposed to be used a lot as this is a synchronous call (if an actor wants to
202    /// get the result of a fututure it should use forward_result instead).
203    ///
204    /// The extraction creates an Extractor actor whose father is the user_actor.
205    pub fn extract_result<M: Message>(&self, future: ActorRef) -> M {
206        // NOTE: this creates a lot of things but this is not meant to be used outside of
207        // tests or examples so this is fine by my book.
208        let (tx, rx) = channel();
209        let _extractor = self.actor_of(Props::new(Arc::new(FutureExtractor::new), (future, Arc::new(Mutex::new(tx)))), "extractor".to_owned());
210        rx.recv().unwrap()
211    }
212}
213
214impl Clone for ActorSystem {
215    fn clone(&self) -> ActorSystem {
216        ActorSystem { inner: self.inner.clone() }
217    }
218}
219
220struct InnerActorSystem {
221    _name: String,
222    // Communication channels to the co,sumer threads.
223    consumer_threads_sender: Mutex<Sender<()>>,
224    consumer_threads_receiver: Arc<Mutex<Receiver<()>>>,
225    n_threads: Mutex<u32>,
226    // Sends ActorRefs to be handled on that channel.
227    actors_queue_sender: Mutex<Sender<ActorRef>>,
228    // Receiving end to give to the thread pool.
229    actors_queue_receiver: Arc<Mutex<Receiver<ActorRef>>>,
230    cthulhu: RwLock<Option<ActorRef >>,
231    user_actor: RwLock<Option<ActorRef>>,
232    system_actor: RwLock<Option<ActorRef>>,
233    // ActorRef to the name resolver.
234    name_resolver: RwLock<Option<ActorRef>>,
235}
236
237impl InnerActorSystem {
238    fn new(name: String) -> InnerActorSystem {
239        let (tx_queue, rx_queue) = channel();
240        let (tx_thread, rx_thread) = channel();
241        InnerActorSystem {
242            _name: name,
243            consumer_threads_sender: Mutex::new(tx_thread),
244            consumer_threads_receiver: Arc::new(Mutex::new(rx_thread)),
245            n_threads: Mutex::new(0u32),
246            actors_queue_sender: Mutex::new(tx_queue),
247            actors_queue_receiver: Arc::new(Mutex::new(rx_queue)),
248            cthulhu: RwLock::new(None),
249            user_actor: RwLock::new(None),
250            system_actor: RwLock::new(None),
251            name_resolver: RwLock::new(None),
252        }
253    }
254
255    /// Spawns an Actor for the user with the given ActorFactory.
256    ///
257    /// This will be part of the user cator hierarchy.
258    fn actor_of(&self, props: Arc<ActorFactory>, name: String) -> ActorRef {
259        // Not having the user actor in a Mutex is ok because the actor_of function already has
260        // mutual exclusion, so we are in the clear.
261        match self.user_actor.read().unwrap().clone() {
262            Some(user_actor) => {
263                // NOTE: this creates a lot of things but this is not meant to be used outside of
264                // the initialisation of the system so this is fine by my book.
265                let (tx, rx) = channel::<Result<ActorRef, &'static str>>();
266                info!("Created the channel to get an ActorRef from a root actor");
267                self.cthulhu.read().unwrap().as_ref().unwrap().tell_to(user_actor, (props, name, Arc::new(Mutex::new(tx))));
268                rx.recv().unwrap().unwrap()
269            },
270            None => panic!("The user actor is not initialised"),
271        }
272    }
273
274    fn system_actor_of(&self, props: Arc<ActorFactory>, name: String) -> ActorRef {
275        // Not having the user actor in a Mutex is ok because the actor_of function already has
276        // mutual exclusion, so we are in the clear.
277        match self.system_actor.read().unwrap().clone() {
278            Some(system_actor) => {
279                // NOTE: this creates a lot of things but this is not meant to be used outside of
280                // the initialisation of the system so this is fine by my book.
281                let (tx, rx) = channel::<Result<ActorRef, &'static str>>();
282                info!("Created the channel to get an ActorRef from a root actor");
283                self.cthulhu.read().unwrap().as_ref().unwrap().tell_to(system_actor, (props, name, Arc::new(Mutex::new(tx))));
284                rx.recv().unwrap().unwrap()
285            },
286            None => panic!("The system actor is not initialised"),
287        }
288    }
289
290    /// Shuts the actor system down.
291    fn shutdown(&self) {
292        // We have to get this out of the mutex, because terminate_threads would deadlock on
293        // n_thread.
294        let n = {*self.n_threads.lock().unwrap()};
295        self.terminate_threads(n);
296        *self.user_actor.write().unwrap() = None;
297        *self.system_actor.write().unwrap() = None;
298        *self.cthulhu.write().unwrap() = None;
299    }
300
301    /// Enqueues the given ActorRef in the list of ActorRef with messages to be handled.
302    fn enqueue_actor(&self, actor_ref: ActorRef) {
303        match self.actors_queue_sender.lock().unwrap().send(actor_ref) {
304            Ok(_) => return,
305            Err(_) => {
306                self.shutdown();
307                panic!("The communication channel for messages is disconnected, this is bad!");
308            }
309        }
310    }
311
312    /// Kills a consumer thread.
313    fn terminate_thread(&self) {
314        let _ = self.consumer_threads_sender.lock().unwrap().send(());
315        *self.n_threads.lock().unwrap() -= 1;
316    }
317
318    /// Kills n consumer threads.
319    fn terminate_threads(&self, n: u32) {
320        for _ in 0..n {
321            self.terminate_thread();
322        }
323    }
324}
325
326impl Drop for InnerActorSystem {
327    fn drop(&mut self) { }
328}