1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
use std::{
    future::Future,
    pin::Pin,
    task::{self, Poll},
};

use actix_rt::ArbiterHandle;
use pin_project_lite::pin_project;

use crate::{
    actor::{Actor, AsyncContext, Supervised},
    address::{channel, Addr},
    context::Context,
    context_impl::ContextFut,
    mailbox::DEFAULT_CAPACITY,
};

pin_project! {
    /// Actor supervisor
    ///
    /// A Supervisor manages incoming messages for an actor. In case of actor failure,
    /// the supervisor creates a new execution context and restarts the actor's lifecycle.
    /// A Supervisor does not re-create its actor; it just calls the `restarting()`
    /// method.
    ///
    /// Supervisors have the same lifecycle as actors. If all addresses to
    /// a supervisor get dropped and its actor does not execute anything, the supervisor
    /// terminates.
    ///
    /// Supervisors cannot guarantee that their actors successfully process incoming
    /// messages. If the actor fails during message processing, the message cannot be
    /// recovered. The sender would receive an `Err(Cancelled)` error in this situation.
    ///
    /// # Examples
    ///
    /// ```
    /// # use actix::prelude::*;
    /// #[derive(Message)]
    /// #[rtype(result = "()")]
    /// struct Die;
    ///
    /// struct MyActor;
    ///
    /// impl Actor for MyActor {
    ///     type Context = Context<Self>;
    /// }
    ///
    /// // To use actor with supervisor actor has to implement `Supervised` trait
    /// impl actix::Supervised for MyActor {
    ///     fn restarting(&mut self, ctx: &mut Context<MyActor>) {
    ///         println!("restarting");
    ///     }
    /// }
    ///
    /// impl Handler<Die> for MyActor {
    ///     type Result = ();
    ///
    ///     fn handle(&mut self, _: Die, ctx: &mut Context<MyActor>) {
    ///         ctx.stop();
    /// #       System::current().stop();
    ///     }
    /// }
    ///
    /// fn main() {
    ///     let mut sys = System::new();
    ///
    ///     let addr = sys.block_on(async { actix::Supervisor::start(|_| MyActor) });
    ///     addr.do_send(Die);
    ///
    ///     sys.run();
    /// }
    /// ```
    #[derive(Debug)]
    pub struct Supervisor<A>
    where
        // NOTE(review): the two bounds on `A` are written as separate predicates;
        // presumably `pin_project!` does not accept `A: X + Y` here. Confirm
        // against the macro's documentation before merging them.
        A: Supervised,
        A: Actor<Context = Context<A>>
    {
        // Future driving the supervised actor's context. It is structurally
        // pinned because the `Future` impl polls it through `Pin` and calls
        // `restart()` on it each time it completes.
        #[pin]
        fut: ContextFut<A, Context<A>>,
    }
}

impl<A> Supervisor<A>
where
    A: Supervised + Actor<Context = Context<A>>,
{
    /// Starts a new supervised actor and returns its address.
    ///
    /// A fresh [`Context`] is created and passed to `f`, which must return the
    /// actor instance. The actor's [`Addr`] is taken from that context, and the
    /// supervisor future wrapping the context is then handed to
    /// `actix_rt::spawn`.
    ///
    /// # Examples
    ///
    /// ```
    /// # use actix::prelude::*;
    /// struct MyActor;
    ///
    /// impl Actor for MyActor {
    ///     type Context = Context<Self>;
    /// }
    ///
    /// # impl actix::Supervised for MyActor {}
    /// # fn main() {
    /// #    System::new().block_on(async {
    /// // Get `Addr` of a MyActor actor
    /// let addr = actix::Supervisor::start(|_| MyActor);
    /// #         System::current().stop();
    /// # });}
    /// ```
    pub fn start<F>(f: F) -> Addr<A>
    where
        F: FnOnce(&mut A::Context) -> A + 'static,
    {
        // The context must exist before the actor so the factory closure can
        // use it while constructing the actor.
        let mut ctx = Context::new();
        let act = f(&mut ctx);

        // Grab the address before `into_future` consumes the context.
        let addr = ctx.address();
        let fut = ctx.into_future(act);

        // NOTE(review): `actix_rt::spawn` presumably requires a running actix
        // runtime on the current thread (the doc example calls this inside
        // `block_on`) — confirm.
        actix_rt::spawn(Self { fut });

        addr
    }

    /// Starts a new supervised actor on the given arbiter's thread.
    ///
    /// The mailbox channel is created on the calling thread with
    /// `DEFAULT_CAPACITY`, so the returned [`Addr`] is available immediately.
    /// The actor itself is constructed later: `f` runs inside the closure sent
    /// to the arbiter through `spawn_fn`, where the context is built around the
    /// receiving half of the channel and the supervisor future is spawned.
    ///
    /// `f` must be `Send` because it is moved into the closure passed to
    /// `spawn_fn`.
    pub fn start_in_arbiter<F>(sys: &ArbiterHandle, f: F) -> Addr<A>
    where
        F: FnOnce(&mut Context<A>) -> A + Send + 'static,
    {
        let (tx, rx) = channel::channel(DEFAULT_CAPACITY);

        // NOTE(review): the result of `spawn_fn` is not inspected here. If the
        // arbiter can reject the closure (e.g. because it has already stopped),
        // the returned address would never be backed by an actor — confirm
        // against the actix-rt documentation.
        sys.spawn_fn(move || {
            let mut ctx = Context::with_receiver(rx);
            let act = f(&mut ctx);
            let fut = ctx.into_future(act);

            actix_rt::spawn(Self { fut });
        });

        Addr::new(tx)
    }
}

#[doc(hidden)]
impl<A> Future for Supervisor<A>
where
    A: Supervised + Actor<Context = Context<A>>,
{
    type Output = ();

    // Drives the inner context future. Each time it completes, `restart()` is
    // called on it; the supervisor resolves only once `restart()` returns
    // `false`, otherwise the inner future is polled again right away.
    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        let mut inner = self.project().fut;

        loop {
            // Nothing more to do until the inner future wakes us up.
            if inner.as_mut().poll(cx).is_pending() {
                return Poll::Pending;
            }

            // The inner future finished: stop if the context's address is not
            // connected (signalled by `restart()` returning `false`).
            let restarted = inner.restart();
            if !restarted {
                return Poll::Ready(());
            }
        }
    }
}