use futures::{Future, Async, Poll};

use actor::{Actor, Supervised};
use arbiter::Arbiter;
use address::{sync_channel, ActorAddress, Addr, Syn};
use context::Context;
use mailbox::DEFAULT_CAPACITY;
use msgs::Execute;

/// Actor supervisor
///
/// Supervisor manages incoming messages for an actor. If the actor fails, the supervisor
/// creates a new execution context and restarts the actor lifecycle. The supervisor does
/// not re-create the actor; it only calls the `restarting()` method.
///
/// Supervisor has the same lifecycle as the actor. When all addresses to the supervisor
/// are dropped and the actor is not executing anything, the supervisor terminates.
///
/// `Supervisor` cannot guarantee that the actor successfully processes an incoming message.
/// If the actor fails while processing a message, that message cannot be recovered. The
/// sender receives an `Err(Cancelled)` error in this situation.
///
/// ## Example
///
/// ```rust
/// # #[macro_use] extern crate actix;
/// # use actix::prelude::*;
/// #[derive(Message)]
/// struct Die;
///
/// struct MyActor;
///
/// impl Actor for MyActor {
///     type Context = Context<Self>;
/// }
///
/// // To run under a supervisor, the actor has to implement the `Supervised` trait
/// impl actix::Supervised for MyActor {
///     fn restarting(&mut self, ctx: &mut Context<MyActor>) {
///         println!("restarting");
///     }
/// }
///
/// impl Handler<Die> for MyActor {
///     type Result = ();
///
///     fn handle(&mut self, _: Die, ctx: &mut Context<MyActor>) {
///         ctx.stop();
/// #       Arbiter::system().do_send(actix::msgs::SystemExit(0));
///     }
/// }
///
/// fn main() {
///     let sys = System::new("test");
///
///     let addr: Addr<Unsync, _> = actix::Supervisor::start(|_| MyActor);
///
///     addr.do_send(Die);
///     sys.run();
/// }
/// ```
pub struct Supervisor<A> where A: Supervised + Actor<Context=Context<A>> {
    ctx: A::Context
}

impl<A> Supervisor<A> where A: Supervised + Actor<Context=Context<A>>
{
    /// Start new supervised actor in current Arbiter.
    ///
    /// The type of the returned address depends on the type of the variable it is
    /// assigned to. For example, to get an `Addr<Syn, _>` for the newly created actor,
    /// annotate the variable explicitly as `Addr<Syn, _>`.
    ///
    /// ```rust
    /// # #[macro_use] extern crate actix;
    /// # use actix::prelude::*;
    /// struct MyActor;
    ///
    /// impl Actor for MyActor {
    ///     type Context = Context<Self>;
    /// }
    ///
    /// # impl actix::Supervised for MyActor {}
    /// # fn main() {
    /// #    let sys = System::new("test");
    /// // Get `Addr<Unsync, _>` of a MyActor actor
    /// let addr1: Addr<Unsync, _> = actix::Supervisor::start(|_| MyActor);
    ///
    /// // Get `Addr<Syn, _>` of a MyActor actor
    /// let addr2: Addr<Syn, _> = actix::Supervisor::start(|_| MyActor);
    /// # }
    /// ```
    pub fn start<Addr, F>(f: F) -> Addr
        where F: FnOnce(&mut A::Context) -> A + 'static,
              A: Actor<Context=Context<A>> + ActorAddress<A, Addr>
    {
        // create actor
        let mut ctx = Context::new(None);
        let act = f(&mut ctx);
        let addr = <A as ActorAddress<A, Addr>>::get(&mut ctx);
        ctx.set_actor(act);

        // create supervisor
        Arbiter::handle().spawn(Supervisor::<A>{ ctx });

        addr
    }

    /// Start new supervised actor in arbiter's thread.
    pub fn start_in<F>(addr: &Addr<Syn, Arbiter>, f: F) -> Addr<Syn, A>
        where A: Actor<Context=Context<A>>,
              F: FnOnce(&mut Context<A>) -> A + Send + 'static
    {
        let (tx, rx) = sync_channel::channel(DEFAULT_CAPACITY);

        // build the actor and its supervisor on the target arbiter's thread
        addr.do_send(Execute::new(move || -> Result<(), ()> {
            let mut ctx = Context::with_receiver(None, rx);
            let act = f(&mut ctx);
            ctx.set_actor(act);
            Arbiter::handle().spawn(Supervisor::<A>{ ctx });
            Ok(())
        }));

        Addr::new(tx)
    }
}

#[doc(hidden)]
impl<A> Future for Supervisor<A> where A: Supervised + Actor<Context=Context<A>>
{
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        loop {
            match self.ctx.poll() {
                Ok(Async::NotReady) => return Ok(Async::NotReady),
                // the actor stopped or failed: try to restart it
                Ok(Async::Ready(_)) | Err(_) => {
                    // stop if context's address is not connected
                    if !self.ctx.restart() {
                        return Ok(Async::Ready(()))
                    }
                }
            }
        }
    }
}