1use std::sync::{Arc, Mutex, RwLock};
2use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
3use std::thread;
4
5use actors::{ActorPath, ActorRef, Message, Props};
6use actors::actor_cell::{ActorCell, SystemMessage};
7use actors::cthulhu::Cthulhu;
8use actors::future::{Future, FutureExtractor};
9use actors::name_resolver::NameResolver;
10use actors::props::ActorFactory;
11use actors::root_actor::RootActor;
12
13struct Relauncher {
15 actor_system: ActorSystem,
16 active: bool,
17}
18
19impl Relauncher {
20 fn new(actor_system: ActorSystem) -> Relauncher {
21 Relauncher {
22 actor_system: actor_system,
23 active: true,
24 }
25 }
26
27 fn cancel(mut self) {
28 self.active = false;
29 }
30}
31
32impl Drop for Relauncher {
33 fn drop(&mut self) {
34 if self.active {
35 self.actor_system.spawn_thread();
36 }
37 }
38}
39
40pub struct ActorSystem {
52 inner: Arc<InnerActorSystem>,
53}
54
55impl ActorSystem {
56 pub fn new(name: String) -> ActorSystem {
60 let actor_system = ActorSystem { inner: Arc::new(InnerActorSystem::new(name)) };
61 let cthulhu = Cthulhu::new(actor_system.clone());
62 let cthulhu = ActorRef::with_cthulhu(cthulhu);
63 info!("Created cthulhu");
64 *actor_system.inner.cthulhu.write().unwrap() = Some(cthulhu.clone());
65 let user_actor_path = ActorPath::new_local("/user".to_owned());
66 let user_actor_cell = ActorCell::new(Props::new(Arc::new(RootActor::new), ()),
67 actor_system.clone(),
68 cthulhu.clone(),
69 user_actor_path.clone());
70 let user_actor = ActorRef::with_cell(user_actor_cell, user_actor_path);
71 user_actor.receive_system_message(SystemMessage::Start);
72 info!("Created /user actor");
73 *actor_system.inner.user_actor.write().unwrap() = Some(user_actor);
74 let system_actor_path = ActorPath::new_local("/system".to_owned());
75 let system_actor_cell = ActorCell::new(Props::new(Arc::new(RootActor::new), ()),
76 actor_system.clone(),
77 cthulhu.clone(),
78 system_actor_path.clone());
79 let system_actor = ActorRef::with_cell(system_actor_cell, system_actor_path);
80 system_actor.receive_system_message(SystemMessage::Start);
81 info!("Created /system actor");
82 *actor_system.inner.system_actor.write().unwrap() = Some(system_actor);
83 actor_system.spawn_threads(1);
84 info!("Launched the first thread");
85 let name_resolver = actor_system.system_actor_of(Props::new(Arc::new(NameResolver::new), ()), "name_resolver".to_owned());
86 info!("Created the /system/name_resolver actor");
87 *actor_system.inner.name_resolver.write().unwrap() = Some(name_resolver);
88 actor_system
89 }
90
91 pub fn actor_of(&self, props: Arc<ActorFactory>, name: String) -> ActorRef {
93 self.inner.actor_of(props, name)
94 }
95
96 pub fn system_actor_of(&self, props: Arc<ActorFactory>, name: String) -> ActorRef {
98 self.inner.system_actor_of(props, name)
99 }
100
101 pub fn shutdown(&self) {
106 self.inner.shutdown();
107 }
108
109 pub fn enqueue_actor(&self, actor_ref: ActorRef) {
111 self.inner.enqueue_actor(actor_ref);
112 }
113
114 pub fn spawn_thread(&self) {
118 let actors_queue = self.inner.actors_queue_receiver.clone();
119 let rx = self.inner.consumer_threads_receiver.clone();
120 let actor_system = self.clone();
121 let _ = thread::spawn(move || {
122 let relauncher = Relauncher::new(actor_system.clone());
124 loop {
125 match rx.lock().unwrap().try_recv() {
127 Ok(_) | Err(TryRecvError::Disconnected) => {
128 relauncher.cancel();
129 break;
130 }
131 Err(TryRecvError::Empty) => {}
132 };
133
134 let actor_ref = {
136 let lock = actors_queue.lock().unwrap();
137 lock.try_recv()
138 };
139
140 match actor_ref {
141 Ok(actor_ref) => actor_ref.handle(),
142 Err(TryRecvError::Empty) => continue,
143 Err(TryRecvError::Disconnected) => {
144 relauncher.cancel();
145 actor_system.shutdown();
146 panic!("The actors queue failed, something is very wrong");
147 }
148 }
149 }
150 });
151 *self.inner.n_threads.lock().unwrap() += 1;
152 }
153
154 pub fn terminate_thread(&self) {
156 self.inner.terminate_thread();
157 }
158
159 pub fn spawn_threads(&self, n: u32) {
161 for _ in 0..n {
162 self.spawn_thread();
163 }
164 }
165
166 pub fn terminate_threads(&self, n: u32) {
168 self.inner.terminate_threads(n);
169 }
170
171 pub fn name_resolver(&self) -> ActorRef {
173 match self.inner.name_resolver.read().unwrap().as_ref() {
174 None => panic!("The name resolver is not initialized."),
175 Some(resolver) => resolver.clone(),
176 }
177 }
178
179 pub fn tell<M: Message>(&self, to: ActorRef, message: M) {
184 match self.inner.user_actor.read().unwrap().as_ref() {
185 Some(user_actor) => user_actor.tell_to(to, message),
186 None => unreachable!(),
187 }
188 }
189
190 pub fn ask<M: Message>(&self, to: ActorRef, message: M, name: String) -> ActorRef {
194 let future = self.actor_of(Props::new(Arc::new(Future::new), ()), name);
195 future.tell_to(to, message);
196 future
197 }
198
199 pub fn extract_result<M: Message>(&self, future: ActorRef) -> M {
206 let (tx, rx) = channel();
209 let _extractor = self.actor_of(Props::new(Arc::new(FutureExtractor::new), (future, Arc::new(Mutex::new(tx)))), "extractor".to_owned());
210 rx.recv().unwrap()
211 }
212}
213
214impl Clone for ActorSystem {
215 fn clone(&self) -> ActorSystem {
216 ActorSystem { inner: self.inner.clone() }
217 }
218}
219
220struct InnerActorSystem {
221 _name: String,
222 consumer_threads_sender: Mutex<Sender<()>>,
224 consumer_threads_receiver: Arc<Mutex<Receiver<()>>>,
225 n_threads: Mutex<u32>,
226 actors_queue_sender: Mutex<Sender<ActorRef>>,
228 actors_queue_receiver: Arc<Mutex<Receiver<ActorRef>>>,
230 cthulhu: RwLock<Option<ActorRef >>,
231 user_actor: RwLock<Option<ActorRef>>,
232 system_actor: RwLock<Option<ActorRef>>,
233 name_resolver: RwLock<Option<ActorRef>>,
235}
236
237impl InnerActorSystem {
238 fn new(name: String) -> InnerActorSystem {
239 let (tx_queue, rx_queue) = channel();
240 let (tx_thread, rx_thread) = channel();
241 InnerActorSystem {
242 _name: name,
243 consumer_threads_sender: Mutex::new(tx_thread),
244 consumer_threads_receiver: Arc::new(Mutex::new(rx_thread)),
245 n_threads: Mutex::new(0u32),
246 actors_queue_sender: Mutex::new(tx_queue),
247 actors_queue_receiver: Arc::new(Mutex::new(rx_queue)),
248 cthulhu: RwLock::new(None),
249 user_actor: RwLock::new(None),
250 system_actor: RwLock::new(None),
251 name_resolver: RwLock::new(None),
252 }
253 }
254
255 fn actor_of(&self, props: Arc<ActorFactory>, name: String) -> ActorRef {
259 match self.user_actor.read().unwrap().clone() {
262 Some(user_actor) => {
263 let (tx, rx) = channel::<Result<ActorRef, &'static str>>();
266 info!("Created the channel to get an ActorRef from a root actor");
267 self.cthulhu.read().unwrap().as_ref().unwrap().tell_to(user_actor, (props, name, Arc::new(Mutex::new(tx))));
268 rx.recv().unwrap().unwrap()
269 },
270 None => panic!("The user actor is not initialised"),
271 }
272 }
273
274 fn system_actor_of(&self, props: Arc<ActorFactory>, name: String) -> ActorRef {
275 match self.system_actor.read().unwrap().clone() {
278 Some(system_actor) => {
279 let (tx, rx) = channel::<Result<ActorRef, &'static str>>();
282 info!("Created the channel to get an ActorRef from a root actor");
283 self.cthulhu.read().unwrap().as_ref().unwrap().tell_to(system_actor, (props, name, Arc::new(Mutex::new(tx))));
284 rx.recv().unwrap().unwrap()
285 },
286 None => panic!("The system actor is not initialised"),
287 }
288 }
289
290 fn shutdown(&self) {
292 let n = {*self.n_threads.lock().unwrap()};
295 self.terminate_threads(n);
296 *self.user_actor.write().unwrap() = None;
297 *self.system_actor.write().unwrap() = None;
298 *self.cthulhu.write().unwrap() = None;
299 }
300
301 fn enqueue_actor(&self, actor_ref: ActorRef) {
303 match self.actors_queue_sender.lock().unwrap().send(actor_ref) {
304 Ok(_) => return,
305 Err(_) => {
306 self.shutdown();
307 panic!("The communication channel for messages is disconnected, this is bad!");
308 }
309 }
310 }
311
312 fn terminate_thread(&self) {
314 let _ = self.consumer_threads_sender.lock().unwrap().send(());
315 *self.n_threads.lock().unwrap() -= 1;
316 }
317
318 fn terminate_threads(&self, n: u32) {
320 for _ in 0..n {
321 self.terminate_thread();
322 }
323 }
324}
325
326impl Drop for InnerActorSystem {
327 fn drop(&mut self) { }
328}