1use std::any::Any;
7use std::collections::{HashMap, VecDeque};
8use std::mem;
9use std::sync::{Arc, Mutex, RwLock, Weak};
10
11use actors::{Actor, ActorPath, ActorRef, ActorSystem, Message, Props};
12use actors::future::{Computation, Complete, Future, FutureState};
13use actors::name_resolver::ResolveRequest;
14use actors::props::ActorFactory;
15
16pub type FailureHandler = Arc<Fn(Failure, ActorCell) + Send + Sync>;
18
19enum Ref<T: ?Sized> {
20 StrongRef(Arc<T>),
21 WeakRef(Weak<T>),
22}
23
24macro_rules! unwrap_inner {
25 ($r:expr, $b:block) => {
26 match $r {
27 Ref::StrongRef(ref inner) => inner.clone(),
28 Ref::WeakRef(ref inner) => match inner.upgrade() {
29 Some(inner) => inner.clone(),
30 None => {
31 $b
32 },
33 }
34 }
35 }
36}
37
38pub struct ActorCell {
40 inner_cell: Ref<InnerActorCell>,
42}
43
44impl Clone for ActorCell {
45 fn clone(&self) -> ActorCell {
46 ActorCell {
47 inner_cell: Ref::WeakRef(match self.inner_cell {
48 Ref::StrongRef(ref inner) => Arc::downgrade(&inner),
49 Ref::WeakRef(ref inner) => inner.clone(),
50 }),
51 }
52 }
53}
54
55
56impl ActorCell {
57 pub fn new( props: Arc<ActorFactory>,
59 system: ActorSystem,
60 father: ActorRef,
61 path: Arc<ActorPath>)
62 -> ActorCell {
63 ActorCell {
64 inner_cell: Ref::StrongRef(Arc::new(InnerActorCell::new(props,
65 system,
66 father,
67 path))),
68 }
69 }
70
71 pub fn receive_message(&self, message: InnerMessage, sender: ActorRef) {
73 let inner = unwrap_inner!(self.inner_cell, {
74 warn!("A message was send to a ref to a stopped actor");
75 return;
76 });
77 inner.receive_message(message, sender);
78 inner.system.enqueue_actor(self.actor_ref());
79 }
80
81 pub fn receive_system_message(&self, system_message: SystemMessage) {
83 let inner = unwrap_inner!(self.inner_cell, {
84 warn!("A message was send to a ref to a stopped actor");
85 return;
86 });
87 inner.receive_system_message(system_message);
88 inner.system.enqueue_actor(self.actor_ref());
89 }
90
91 pub fn handle_envelope(&self) {
93 let inner = unwrap_inner!(self.inner_cell, {
94 warn!("A message was send to a ref to a stopped actor");
95 return;
96 });
97 inner.handle_envelope(self.clone());
98 }
99}
100
101pub trait ActorContext {
103 fn actor_ref(&self) -> ActorRef;
105
106 fn actor_of(&self, props: Arc<ActorFactory>, name: String) -> Result<ActorRef, &'static str>;
108
109 fn tell<MessageTo: Message>(&self, to: ActorRef, message: MessageTo);
111
112 fn ask<MessageTo: Message>(&self, to: ActorRef, message: MessageTo, future_name: String) -> ActorRef;
115
116 fn complete<MessageTo: Message>(&self, to: ActorRef, complete: MessageTo);
118
119 fn forward_result<T: Message>(&self, future: ActorRef, to: ActorRef);
122
123 fn forward_result_to_future<T: Message>(&self, future: ActorRef, to: ActorRef);
127
128 fn do_computation<T: Message, F: Fn(Box<Any + Send>, ActorCell) -> T + Send + Sync + 'static>
131 (&self, future: ActorRef, closure: F);
132
133 fn stop(&self, actor_ref: ActorRef);
135
136 fn kill_me(&self);
138
139 fn sender(&self) -> ActorRef;
141
142 fn father(&self) -> ActorRef;
144
145 fn children(&self) -> HashMap<Arc<ActorPath>, ActorRef>;
147
148 fn monitoring(&self) -> HashMap<Arc<ActorPath>, (ActorRef, FailureHandler)>;
150
151 fn monitored_by(&self) -> Vec<ActorRef>;
153
154 fn monitor(&self, actor: ActorRef, handler: FailureHandler);
156
157 fn path(&self) -> Arc<ActorPath>;
159
160 fn identify_actor(&self, logical_path: String, request_name: String) -> ActorRef;
165
166 fn tell_control(&self, actor: ActorRef, message: ControlMessage);
168
169 fn fail(&self, reason: &'static str);
171}
172
173impl ActorContext for ActorCell {
174 fn actor_ref(&self) -> ActorRef {
175 ActorRef::with_cell(self.clone(), self.path())
176 }
177
178 fn actor_of(&self, props: Arc<ActorFactory>, name: String) -> Result<ActorRef, &'static str> {
179 let inner = unwrap_inner!(self.inner_cell, {
180 panic!("Tried to create an actor from the context of a no longer existing actor");
181 });
182
183 if name.find("/") != None {
185 return Err("Used a '/' in the name of an actor, this is not allowed");
186 }
187
188 let path = self.path().child(name);
189 info!("creating actor {}", path.logical_path());
190 let inner_cell = InnerActorCell::new(props,
191 inner.system.clone(),
192 self.actor_ref(),
193 path.clone());
194 let actor_cell = ActorCell { inner_cell: Ref::StrongRef(Arc::new(inner_cell)) };
195 let internal_ref = ActorRef::with_cell(actor_cell, path.clone());
196 let external_ref = internal_ref.clone();
197 inner.children.lock().unwrap().insert(path.clone(), internal_ref);
198 inner.monitoring.lock().unwrap().insert(path.clone(), (external_ref.clone(), Arc::new(InnerActorCell::restart_child)));
199 external_ref.receive_system_message(SystemMessage::Start);
200 if *(path.logical_path()) != "/system/name_resolver" {
203 self.tell(inner.system.name_resolver(), ResolveRequest::Add(external_ref.clone()));
204 }
205 Ok(external_ref)
206 }
207
208 fn tell<MessageTo: Message>(&self, to: ActorRef, message: MessageTo) {
209 let path = to.path();
211 match *path {
212 ActorPath::Local(_) => to.receive(InnerMessage::Message(Box::new(message)), self.actor_ref()),
213 ActorPath::Distant(ref path) => {
214 info!("Sent a message of size {} to distant actor {}:{}", mem::size_of::<MessageTo>(),
215 path.distant_logical_path(), path.addr_port());
216 },
217 }
218 }
219
220 fn ask<MessageTo: Message>(&self, to: ActorRef, message: MessageTo, name: String) -> ActorRef {
221 let future = self.actor_of(Props::new(Arc::new(Future::new), ()), name).unwrap();
222 future.tell_to(to, message);
223 future
224 }
225
226 fn complete<MessageTo: Message>(&self, future: ActorRef, complete: MessageTo) {
227 let path = future.path();
231 match *path {
232 ActorPath::Local(_) => future.receive(InnerMessage::Message(Box::new(Complete::new(Box::new(complete)))), self.actor_ref()),
233 ActorPath::Distant(ref path) => {
234 info!("Sent a message of size {} to distant future {}:{}", mem::size_of::<MessageTo>(),
235 path.distant_logical_path(), path.addr_port());
236 },
237 }
238 }
239
240 fn forward_result<T: Message>(&self, future: ActorRef, actor: ActorRef) {
241 self.tell(future, Computation::Forward(actor, Arc::new(move |value, context, to| {
242 let value = Box::<Any + Send>::downcast::<T>(value).expect("Message of the wrong type");
243 context.tell(to, *value);
244 FutureState::Extracted
245 })));
246 }
247
248 fn forward_result_to_future<T: Message>(&self, future: ActorRef, actor: ActorRef) {
249 self.tell(future, Computation::Forward(actor, Arc::new(move |value, context, to| {
250 let value = Box::<Any + Send>::downcast::<T>(value).expect("Message of the wrong type");
251 context.complete(to, *value);
252 FutureState::Extracted
253 })));
254 }
255
256 fn do_computation<T: Message, F: Fn(Box<Any + Send>, ActorCell) -> T + Send + Sync + 'static>
257 (&self, future: ActorRef, closure: F) {
258 self.tell(future, Computation::Computation(Arc::new(move |value, context| {
259 let v = closure(value, context);
260 FutureState::Computing(Box::new(v))
261 })));
262 }
263
264 fn sender(&self) -> ActorRef {
265 let inner = unwrap_inner!(self.inner_cell, {
266 panic!("Tried to get a sender from the context of a no longer existing actor");
267 });
268 let current_sender = inner.current_sender.lock().unwrap();
270 current_sender.as_ref().unwrap().clone()
271 }
272
273 fn tell_control(&self, actor: ActorRef, message: ControlMessage) {
274 let path = actor.path();
275 match *path {
276 ActorPath::Local(_) => actor.receive(InnerMessage::Control(message), self.actor_ref()),
277 ActorPath::Distant(_) => {},
278 }
279 }
280
281 fn stop(&self, actor_ref: ActorRef) {
282 self.tell_control(actor_ref, ControlMessage::PoisonPill);
283 }
284
285 fn kill_me(&self) {
286 self.tell_control(self.father(), ControlMessage::KillMe(self.actor_ref()));
287 }
288
289 fn father(&self) -> ActorRef {
290 let inner = unwrap_inner!(self.inner_cell, {
291 panic!("Tried to get the father from the context of a no longer existing actor");
292 });
293 inner.father.clone()
294 }
295
296 fn children(&self) -> HashMap<Arc<ActorPath>, ActorRef> {
297 let inner = unwrap_inner!(self.inner_cell, {
298 panic!("Tried to get the children from the context of a no longer existing actor");
299 });
300 let children = inner.children.lock().unwrap();
301 children.clone()
302 }
303
304 fn monitoring(&self) -> HashMap<Arc<ActorPath>, (ActorRef, FailureHandler)> {
305 let inner = unwrap_inner!(self.inner_cell, {
306 panic!("Tried to get the monitored actors from the context of a no longer existing \
307 actor");
308 });
309 let monitoring = inner.monitoring.lock().unwrap();
310 monitoring.clone()
311 }
312
313 fn monitored_by(&self) -> Vec<ActorRef> {
314 let inner = unwrap_inner!(self.inner_cell, {
315 panic!("Tried to get the monitoring actors from the context of a no longer existing actor");
316 });
317 let monitored_by = inner.monitored_by.lock().unwrap();
318 monitored_by.clone()
319 }
320
321 fn monitor(&self, actor: ActorRef, handler: FailureHandler) {
322 let inner = unwrap_inner!(self.inner_cell, {
323 panic!("tried to have a no longer existing actor monitor an other actor?")
324 });
325 self.tell_control(actor.clone(), ControlMessage::RegisterMonitoring);
326 let mut monitoring = inner.monitoring.lock().unwrap();
327 monitoring.insert(actor.path(), (actor, handler));
328 }
329
330 fn path(&self) -> Arc<ActorPath> {
331 let inner = unwrap_inner!(self.inner_cell, {
332 panic!("Tried to get the path from the context of a no longer existing actor");
333 });
334 inner.path.clone()
335 }
336
337 fn identify_actor(&self, name: String, request_name: String) -> ActorRef {
338 let inner = unwrap_inner!(self.inner_cell, {
339 panic!("Tried to get the actor system of a no longer existing actor while resolving \
340 a path. This should *never* happen");
341 });
342 self.ask(inner.system.name_resolver(), ResolveRequest::Get(name), request_name)
343 }
344
345 fn fail(&self, reason: &'static str) {
346 let inner = unwrap_inner!(self.inner_cell, {
347 panic!("Tried to get the state of a no longer existing actor while resolving \
348 a path. This should *never* happen");
349 });
350 {*inner.actor_state.write().unwrap() = ActorState::Failed;}
351 for actor in self.monitored_by().iter() {
352 self.tell_control(actor.clone(), ControlMessage::Failure(Failure::new(self.actor_ref(), reason)));
353 }
354 }
355}
356
357#[derive(PartialEq, Copy, Clone)]
358enum ActorState {
360 Failed,
362 Running,
364 Unstarted,
366}
367
368struct Failsafe {
370 context: ActorCell,
371 active: bool,
372}
373
374impl Failsafe {
375 fn new(context: ActorCell) -> Failsafe {
376 Failsafe {
377 context: context,
378 active: true,
379 }
380 }
381
382 fn cancel(mut self) {
384 self.active = false;
385 }
386}
387
388impl Drop for Failsafe {
389 fn drop(&mut self) {
390 if self.active {
391 self.context.fail("panic");
392 }
393 }
394}
395
396#[derive(Clone, Copy)]
400pub enum SystemMessage {
401 Restart,
403
404 Start,
407}
408
409struct Envelope {
411 message: InnerMessage,
412 sender: ActorRef,
413}
414
415pub enum InnerMessage {
417 Message(Box<Any + Send>),
419
420 Control(ControlMessage),
422}
423
424#[derive(Clone)]
426pub enum ControlMessage {
427 PoisonPill,
430
431 Failure(Failure),
433
434 KillMe(ActorRef),
436
437 RegisterMonitoring,
439}
440
441#[derive(Clone)]
442pub struct Failure {
444 source: ActorRef,
445 reason: &'static str,
446}
447
448impl Failure {
449 fn new(source: ActorRef, reason: &'static str) -> Failure {
450 Failure {
451 source: source,
452 reason: reason,
453 }
454 }
455 pub fn actor(&self) -> ActorRef {self.source.clone()}
457 pub fn reason(&self) -> &'static str {self.reason}
459}
460
461struct InnerActorCell {
462 mailbox: Mutex<VecDeque<Envelope>>,
463 system_mailbox: Mutex<VecDeque<SystemMessage>>,
464 props: Arc<ActorFactory>,
465 system: ActorSystem,
466 path: Arc<ActorPath>,
467 current_sender: Mutex<Option<ActorRef>>,
468 busy: Mutex<()>,
469 father: ActorRef,
470 children: Mutex<HashMap<Arc<ActorPath>, ActorRef>>,
471 monitoring: Mutex<HashMap<Arc<ActorPath>, (ActorRef, FailureHandler)>>,
472 actor_state: Arc<RwLock<ActorState>>,
473 monitored_by: Mutex<Vec<ActorRef>>,
474 actor: RwLock<Arc<Actor>>,
475}
476
477impl InnerActorCell {
478 fn new(props: Arc<ActorFactory>,
480 system: ActorSystem,
481 father: ActorRef,
482 path: Arc<ActorPath>)
483 -> InnerActorCell {
484 InnerActorCell {
485 actor: RwLock::new(props.create()),
486 mailbox: Mutex::new(VecDeque::new()),
487 system_mailbox: Mutex::new(VecDeque::new()),
488 props: props,
489 system: system,
490 path: path,
491 current_sender: Mutex::new(None),
492 busy: Mutex::new(()),
493 father: father.clone(),
494 children: Mutex::new(HashMap::new()),
495 monitoring: Mutex::new(HashMap::new()),
496 actor_state: Arc::new(RwLock::new(ActorState::Unstarted)),
497 monitored_by: Mutex::new(vec![father.clone()]),
498 }
499 }
500
501 fn receive_envelope(&self, envelope: Envelope) {
502 self.mailbox.lock().unwrap().push_back(envelope);
503 }
504
505 fn receive_message(&self, message: InnerMessage, sender: ActorRef) {
506 self.receive_envelope(Envelope {
507 message: message,
508 sender: sender,
509 });
510 }
511
512 fn receive_system_message(&self, system_message: SystemMessage) {
513 self.system_mailbox.lock().unwrap().push_back(system_message);
514 }
515
516 fn handle_envelope(&self, context: ActorCell) {
517 let _lock = self.busy.lock();
519 let failsafe = Failsafe::new(context.clone());
520 if let Some(message) = self.system_mailbox.lock().unwrap().pop_front() {
526 match message {
527 SystemMessage::Restart => self.restart(context),
528 SystemMessage::Start => self.start(context),
529 }
530 failsafe.cancel();
531 return;
532 }
533
534 let state = {self.actor_state.read().unwrap().clone()};
535 if state == ActorState::Running {
536 let envelope = match self.mailbox.lock().unwrap().pop_front() {
537 Some(envelope) => envelope,
538 None => {
539 failsafe.cancel();
540 return;
541 }
542 };
543 {
544 let mut current_sender = self.current_sender.lock().unwrap();
545 *current_sender = Some(envelope.sender.clone());
546 };
547 {
548 let actor = self.actor.read().unwrap();
549 match envelope.message {
550 InnerMessage::Message(message) => {
551 actor.receive(message, context);
552 },
553 InnerMessage::Control(message) => {
554 match message {
555 ControlMessage::PoisonPill => context.kill_me(),
556 ControlMessage::Failure(failure) => {
557 let monitoring = self.monitoring.lock().unwrap();
558 let handler = monitoring.get(&failure.actor().path())
559 .expect("Received a failure notification from an unknown actor");
560 (*handler.1)(failure, context);
561 },
562 ControlMessage::KillMe(actor_ref) => self.kill(actor_ref, context),
563 ControlMessage::RegisterMonitoring => {
564 let mut mon = self.monitored_by.lock().unwrap();
565 mon.push(context.sender());
566 },
567 }
568 }
569 }
570 }
571 } else {
572 self.system.enqueue_actor(context.actor_ref());
573 }
574
575 failsafe.cancel();
576 }
577
578 fn kill(&self, actor: ActorRef, context: ActorCell) {
579 self.children.lock().unwrap().remove(&actor.path()).expect(&format!("actor {} was asked to kill {} and cannot do that",
580 self.path.logical_path(),
581 actor.path().logical_path()));
582 context.tell(self.system.name_resolver(), ResolveRequest::Remove(actor.path()));
583 }
584
585 fn start(&self, context: ActorCell) {
586 self.actor.write().unwrap().pre_start(context);
587 *self.actor_state.write().unwrap() = ActorState::Running;
588 }
589
590 fn restart(&self, context: ActorCell) {
591 let mut actor = self.actor.write().unwrap();
592 actor.pre_restart(context.clone());
593 *actor = self.props.create();
594 actor.post_restart(context);
595 *self.actor_state.write().unwrap() = ActorState::Running;
596 }
597
598 fn restart_child(failure: Failure, _context: ActorCell) {
599 failure.actor().receive_system_message(SystemMessage::Restart);
600 }
601}
602
603impl Drop for InnerActorCell {
604 fn drop(&mut self) {
605 let actor = self.actor.write().unwrap();
608 info!("Actor {} is dropped", *self.path.logical_path());
609 actor.post_stop();
610 }
611}