1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
use std::time::Duration;

use actix_rt::ArbiterHandle;
use futures_core::stream::Stream;
use log::error;

use crate::address::{channel, Addr};
use crate::context::Context;
use crate::contextitems::{ActorDelayedMessageItem, ActorMessageItem, ActorMessageStreamItem};
use crate::fut::{ActorFuture, ActorStreamExt};
use crate::handler::{Handler, Message};
use crate::mailbox::DEFAULT_CAPACITY;
use crate::stream::StreamHandler;
use crate::utils::{IntervalFunc, TimerFunc};

/// Actors are objects which encapsulate state and behavior.
///
/// Actors run within a specific execution context
/// [`Context<A>`](struct.Context.html). The context object is available
/// only during execution. Each actor has a separate execution
/// context. The execution context also controls the lifecycle of an
/// actor.
///
/// Actors communicate exclusively by exchanging messages. The sender
/// actor can wait for a response. Actors are not referenced directly,
/// but by address [`Addr`](struct.Addr.html) To be able to handle a
/// specific message actor has to provide a
/// [`Handler<M>`](trait.Handler.html) implementation for this
/// message. All messages are statically typed. A message can be
/// handled in asynchronous fashion. An actor can spawn other actors
/// or add futures or streams to the execution context. The actor
/// trait provides several methods that allow controlling the actor
/// lifecycle.
///
/// # Actor lifecycle
///
/// ## Started
///
/// An actor starts in the `Started` state, during this state the
/// `started` method gets called.
///
/// ## Running
///
/// After an actor's `started` method got called, the actor
/// transitions to the `Running` state. An actor can stay in the
/// `running` state for an indefinite amount of time.
///
/// ## Stopping
///
/// The actor's execution state changes to `stopping` in the following
/// situations:
///
/// * `Context::stop` gets called by actor itself
/// * all addresses to the actor get dropped
/// * no evented objects are registered in its context.
///
/// An actor can return from the `stopping` state to the `running`
/// state by creating a new address or adding an evented object, like
/// a future or stream, in its `Actor::stopping` method.
///
/// If an actor changed to a `stopping` state because
/// `Context::stop()` got called, the context then immediately stops
/// processing incoming messages and calls the `Actor::stopping()`
/// method. If an actor does not return back to a `running` state,
/// all unprocessed messages get dropped.
///
/// ## Stopped
///
/// If an actor does not modify execution context while in stopping
/// state, the actor state changes to `Stopped`. This state is
/// considered final and at this point the actor gets dropped.
#[allow(unused_variables)]
pub trait Actor: Sized + Unpin + 'static {
    /// Actor execution context type
    type Context: ActorContext;

    /// Called when an actor gets polled the first time.
    fn started(&mut self, ctx: &mut Self::Context) {}

    /// Called after an actor is in `Actor::Stopping` state.
    ///
    /// There can be several reasons for stopping:
    ///
    /// - `Context::stop` gets called by the actor itself.
    /// - All addresses to the current actor get dropped and no more
    ///   evented objects are left in the context.
    ///
    /// An actor can return from the stopping state to the running
    /// state by returning `Running::Continue`.
    fn stopping(&mut self, ctx: &mut Self::Context) -> Running {
        Running::Stop
    }

    /// Called after an actor is stopped.
    ///
    /// This method can be used to perform any needed cleanup work or
    /// to spawn more actors. This is the final state, after this
    /// method got called, the actor will be dropped.
    fn stopped(&mut self, ctx: &mut Self::Context) {}

    /// Start a new asynchronous actor, returning its address.
    ///
    /// # Examples
    ///
    /// ```
    /// use actix::*;
    ///
    /// struct MyActor;
    /// impl Actor for MyActor {
    ///     type Context = Context<Self>;
    /// }
    ///
    /// fn main() {
    ///     // initialize system
    ///     System::new().block_on(async {
    ///         let addr = MyActor.start(); // <- start actor and get its address
    /// #       System::current().stop();
    ///     });
    /// }
    /// ```
    fn start(self) -> Addr<Self>
    where
        Self: Actor<Context = Context<Self>>,
    {
        Context::new().run(self)
    }

    /// Construct and start a new asynchronous actor, returning its
    /// address.
    ///
    /// This is constructs a new actor using the `Default` trait, and
    /// invokes its `start` method.
    fn start_default() -> Addr<Self>
    where
        Self: Actor<Context = Context<Self>> + Default,
    {
        Self::default().start()
    }

    /// Start new actor in arbiter's thread.
    fn start_in_arbiter<F>(wrk: &ArbiterHandle, f: F) -> Addr<Self>
    where
        Self: Actor<Context = Context<Self>>,
        F: FnOnce(&mut Context<Self>) -> Self + Send + 'static,
    {
        let (tx, rx) = channel::channel(DEFAULT_CAPACITY);

        // create actor
        wrk.spawn_fn(move || {
            let mut ctx = Context::with_receiver(rx);
            let act = f(&mut ctx);
            let fut = ctx.into_future(act);

            actix_rt::spawn(fut);
        });

        Addr::new(tx)
    }

    /// Start a new asynchronous actor given a `Context`.
    ///
    /// Use this method if you need the `Context` object during actor
    /// initialization.
    ///
    /// # Examples
    ///
    /// ```
    /// use actix::*;
    ///
    /// struct MyActor {
    ///     val: usize,
    /// }
    /// impl Actor for MyActor {
    ///     type Context = Context<Self>;
    /// }
    ///
    /// fn main() {
    ///     // initialize system
    ///     System::new().block_on(async {
    ///         let addr = MyActor::create(|ctx: &mut Context<MyActor>| MyActor { val: 10 });
    /// #       System::current().stop();
    ///     });
    /// }
    /// ```
    fn create<F>(f: F) -> Addr<Self>
    where
        Self: Actor<Context = Context<Self>>,
        F: FnOnce(&mut Context<Self>) -> Self,
    {
        let mut ctx = Context::new();
        let act = f(&mut ctx);
        ctx.run(act)
    }
}

#[allow(unused_variables)]
/// Actors with the ability to restart after failure.
///
/// Supervised actors can be managed by a
/// [`Supervisor`](struct.Supervisor.html). As an additional lifecycle
/// event, the `restarting` method can be implemented.
///
/// If a supervised actor fails, its supervisor creates new execution
/// context and restarts the actor, invoking its `restarting` method.
/// After a call to this method, the actor's execution state changes
/// to `Started` and the regular lifecycle process starts.
///
/// The `restarting` method gets called with the newly constructed
/// `Context` object.
pub trait Supervised: Actor {
    /// Called when the supervisor restarts a failed actor.
    fn restarting(&mut self, ctx: &mut <Self as Actor>::Context) {}
}

/// Actor execution state
#[derive(PartialEq, Debug, Copy, Clone)]
pub enum ActorState {
    /// Actor is started.
    Started,
    /// Actor is running.
    Running,
    /// Actor is stopping.
    Stopping,
    /// Actor is stopped.
    Stopped,
}

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Running {
    Stop,
    Continue,
}

impl ActorState {
    /// Indicates whether the actor is alive.
    pub fn alive(self) -> bool {
        self == ActorState::Started || self == ActorState::Running
    }
    /// Indicates whether the actor is stopped or stopping.
    pub fn stopping(self) -> bool {
        self == ActorState::Stopping || self == ActorState::Stopped
    }
}

/// Actor execution context.
///
/// Each actor runs within a specific execution context. The actor's
/// associated type `Actor::Context` defines the context to use for
/// the actor, and must implement the `ActorContext` trait.
///
/// The execution context defines the type of execution, and the
/// actor's communication channels (message handling).
pub trait ActorContext: Sized {
    /// Immediately stop processing incoming messages and switch to a
    /// `stopping` state. This only affects actors that are currently
    /// `running`. Future attempts to queue messages will fail.
    fn stop(&mut self);

    /// Terminate actor execution unconditionally. This sets the actor
    /// into the `stopped` state. This causes future attempts to queue
    /// messages to fail.
    fn terminate(&mut self);

    /// Retrieve the current Actor execution state.
    fn state(&self) -> ActorState;
}

/// Asynchronous execution context.
pub trait AsyncContext<A>: ActorContext
where
    A: Actor<Context = Self>,
{
    /// Returns the address of the context.
    fn address(&self) -> Addr<A>;

    /// Spawns a future into the context.
    ///
    /// Returns a handle of the spawned future, which can be used for
    /// cancelling its execution.
    ///
    /// All futures spawned into an actor's context are cancelled
    /// during the actor's stopping stage.
    fn spawn<F>(&mut self, fut: F) -> SpawnHandle
    where
        F: ActorFuture<A, Output = ()> + 'static;

    /// Spawns a future into the context, waiting for it to resolve.
    ///
    /// This stops processing any incoming events until the future
    /// resolves.
    fn wait<F>(&mut self, fut: F)
    where
        F: ActorFuture<A, Output = ()> + 'static;

    /// Checks if the context is paused (waiting for future completion or stopping).
    fn waiting(&self) -> bool;

    /// Cancels a spawned future.
    ///
    /// The `handle` is a value returned by the `spawn` method.
    fn cancel_future(&mut self, handle: SpawnHandle) -> bool;

    /// Registers a stream with the context.
    ///
    /// This allows handling a `Stream` in a way similar to normal
    /// actor messages.
    ///
    /// ```
    /// # use std::io;
    /// use actix::prelude::*;
    /// use futures_util::stream::once;
    ///
    /// #[derive(Message)]
    /// #[rtype(result = "()")]
    /// struct Ping;
    ///
    /// struct MyActor;
    ///
    /// impl StreamHandler<Ping> for MyActor {
    ///
    ///     fn handle(&mut self, item: Ping, ctx: &mut Context<MyActor>) {
    ///         println!("PING");
    ///         System::current().stop();
    ///     }
    ///
    ///     fn finished(&mut self, ctx: &mut Self::Context) {
    ///         println!("finished");
    ///     }
    /// }
    ///
    /// impl Actor for MyActor {
    ///    type Context = Context<Self>;
    ///
    ///    fn started(&mut self, ctx: &mut Context<Self>) {
    ///        // add stream
    ///        ctx.add_stream(once(async { Ping }));
    ///    }
    /// }
    ///
    /// fn main() {
    ///     let mut sys = System::new();
    ///     let addr = sys.block_on(async { MyActor.start() });
    ///     sys.run();
    ///  }
    /// ```
    fn add_stream<S>(&mut self, fut: S) -> SpawnHandle
    where
        S: Stream + 'static,
        A: StreamHandler<S::Item>,
    {
        <A as StreamHandler<S::Item>>::add_stream(fut, self)
    }

    /// Registers a stream with the context, ignoring errors.
    ///
    /// This method is similar to `add_stream` but it skips stream
    /// errors.
    ///
    /// ```
    /// use actix::prelude::*;
    /// use futures_util::stream::once;
    ///
    /// #[derive(Message)]
    /// #[rtype(result = "()")]
    /// struct Ping;
    ///
    /// struct MyActor;
    ///
    /// impl Handler<Ping> for MyActor {
    ///     type Result = ();
    ///
    ///     fn handle(&mut self, msg: Ping, ctx: &mut Context<MyActor>) {
    ///         println!("PING");
    /// #       System::current().stop();
    ///     }
    /// }
    ///
    /// impl Actor for MyActor {
    ///     type Context = Context<Self>;
    ///
    ///     fn started(&mut self, ctx: &mut Context<Self>) {
    ///         // add messages stream
    ///         ctx.add_message_stream(once(async { Ping }));
    ///     }
    /// }
    ///
    /// fn main() {
    ///    System::new().block_on(async {
    ///        let addr = MyActor.start();
    ///    });
    /// }
    /// ```
    fn add_message_stream<S>(&mut self, fut: S)
    where
        S: Stream + 'static,
        S::Item: Message,
        A: Handler<S::Item>,
    {
        if self.state() == ActorState::Stopped {
            error!("Context::add_message_stream called for stopped actor.");
        } else {
            self.spawn(ActorMessageStreamItem::new(fut));
        }
    }

    /// Sends the message `msg` to self. This bypasses the mailbox capacity, and
    /// will always queue the message. If the actor is in the `stopped` state, an
    /// error will be raised.
    fn notify<M>(&mut self, msg: M)
    where
        A: Handler<M>,
        M: Message + 'static,
    {
        if self.state() == ActorState::Stopped {
            error!("Context::notify called for stopped actor.");
        } else {
            self.spawn(ActorMessageItem::new(msg));
        }
    }

    /// Sends the message `msg` to self after a specified period of time.
    ///
    /// Returns a spawn handle which can be used for cancellation. The
    /// notification gets cancelled if the context's stop method gets
    /// called. This bypasses the mailbox capacity, and
    /// will always queue the message. If the actor is in the `stopped` state, an
    /// error will be raised.
    fn notify_later<M>(&mut self, msg: M, after: Duration) -> SpawnHandle
    where
        A: Handler<M>,
        M: Message + 'static,
    {
        if self.state() == ActorState::Stopped {
            error!("Context::notify_later called for stopped actor.");
            SpawnHandle::default()
        } else {
            self.spawn(ActorDelayedMessageItem::new(msg, after))
        }
    }

    /// Executes a closure after a specified period of time.
    ///
    /// The closure gets passed the same actor and its
    /// context. Execution gets cancelled if the context's stop method
    /// gets called.
    fn run_later<F>(&mut self, dur: Duration, f: F) -> SpawnHandle
    where
        F: FnOnce(&mut A, &mut A::Context) + 'static,
    {
        self.spawn(TimerFunc::new(dur, f))
    }

    /// Spawns a job to execute the given closure periodically, at a
    /// specified fixed interval.
    fn run_interval<F>(&mut self, dur: Duration, f: F) -> SpawnHandle
    where
        F: FnMut(&mut A, &mut A::Context) + 'static,
    {
        self.spawn(IntervalFunc::new(dur, f).finish())
    }
}

/// A handle to a spawned future.
///
/// Can be used to cancel the future.
#[derive(Eq, PartialEq, Debug, Copy, Clone, Hash)]
pub struct SpawnHandle(usize);

impl SpawnHandle {
    /// Gets the next handle.
    pub fn next(self) -> SpawnHandle {
        SpawnHandle(self.0 + 1)
    }
    #[doc(hidden)]
    pub fn into_usize(self) -> usize {
        self.0
    }
}

impl Default for SpawnHandle {
    fn default() -> SpawnHandle {
        SpawnHandle(0)
    }
}