1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
use crate::envelope::MessageEnvelope;
use crate::{Actor, Address, Context, WeakAddress};
use futures::channel::mpsc;
use futures::StreamExt;
use std::sync::Arc;

/// A message that can be sent by an [`Address`](struct.Address.html) to the
/// [`ActorManager`](struct.ActorManager.html).
///
/// Two of the three variants carry a boxed, type-erased envelope
/// (`Box<dyn MessageEnvelope<Actor = A>>`) addressed to the actor type `A`.
pub(crate) enum ManagerMessage<A: Actor> {
    /// The address sending this is being dropped and is the only external strong address in
    /// existence other than the one held by the [`Context`](struct.Context.html). This notifies
    /// the [`ActorManager`](struct.ActorManager.html) so that it can check whether the actor
    /// should be dropped.
    LastAddress,
    /// A message being sent to the actor. To read about envelopes and why we use them, check out
    /// `envelope.rs`.
    Message(Box<dyn MessageEnvelope<Actor = A>>),
    /// A notification queued with `Context::notify_later`.
    LateNotification(Box<dyn MessageEnvelope<Actor = A>>),
}

/// If and how to continue the manage loop.
///
/// `ActorManager::manage` inspects this value after every handled message to decide what to do
/// next.
#[derive(PartialEq, Eq, Debug, Copy, Clone)]
pub(crate) enum ContinueManageLoop {
    /// Keep going: wait for (or take) the next message.
    Yes,
    /// Stop managing right away. In the main loop `manage` returns on the spot; in the final
    /// draining loop it stops draining.
    ExitImmediately,
    /// Leave the main (awaiting) loop and move on to the final pass, which handles the messages
    /// that are already queued without waiting for new ones.
    ProcessNotifications,
}

/// A manager for the actor which handles incoming messages and stores the context. Its managing
/// loop can be started with [`ActorManager::manage`](struct.ActorManager.html#method.manage).
pub struct ActorManager<A: Actor> {
    /// The managed actor. The manager calls its `started` and `stopped` hooks and hands it to the
    /// context whenever a message is handled.
    actor: A,
    /// The actor's context. It holds the receiving end of the message channel that the manage
    /// loop reads from.
    ctx: Context<A>,
}

impl<A: Actor> Drop for ActorManager<A> {
    fn drop(&mut self) {
        self.actor.stopped(&mut self.ctx);
    }
}

impl<A: Actor> ActorManager<A> {
    /// Wraps `actor` in a manager and returns it together with a strong address to the actor.
    ///
    /// Nothing runs yet: the `ActorManager::manage` future has to be executed for the actor to
    /// actually start.
    pub(crate) fn start(actor: A) -> (Address<A>, ActorManager<A>) {
        let (tx, rx) = mpsc::unbounded();
        let strong = Arc::new(());

        // The context receives a weak address to its own actor, the receiving half of the
        // channel, and its own strong handle on the reference counter.
        let weak_addr = WeakAddress {
            sender: tx.clone(),
            ref_counter: Arc::downgrade(&strong),
        };
        let manager = ActorManager {
            actor,
            ctx: Context::new(weak_addr, rx, Arc::clone(&strong)),
        };

        // The caller gets the original sender and reference counter as the strong address.
        let address = Address {
            sender: tx,
            ref_counter: strong,
        };

        (address, manager)
    }

    /// Starts the manager loop. This will start the actor and allow it to respond to messages.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use xtra::prelude::*;
    /// struct MyActor;
    /// impl Actor for MyActor {}
    ///
    /// #[smol_potat::main]
    /// async fn main() {
    ///     let (addr, mgr) = MyActor.create();
    ///     smol::Task::spawn(mgr.manage()).detach(); // Actually spawn the actor onto an executor
    /// }
    /// ```
    pub async fn manage(mut self) {
        self.actor.started(&mut self.ctx);

        // The actor may have called `ctx.stop()` from within `started`. Unusual, but it must be
        // honoured before any message is processed.
        let still_running = self.ctx.check_running(&mut self.actor);
        if !still_running {
            return;
        }

        // Main phase: wait for messages and dispatch them one at a time.
        loop {
            let incoming = match self.ctx.receiver.next().await {
                Some(msg) => msg,
                // The stream has ended, so there is nothing left to wait for.
                None => break,
            };

            match self.ctx.handle_message(incoming, &mut self.actor).await {
                ContinueManageLoop::Yes => continue,
                ContinueManageLoop::ProcessNotifications => break,
                ContinueManageLoop::ExitImmediately => return,
            }
        }

        // Final phase: handle late notifications that were sent after the last strong address was
        // dropped. Awaiting the receiver here would mean awaiting forever, so the queue is polled
        // with `try_next` instead. The context is marked as stopped by now, so no new addresses
        // can be created and nothing is sending concurrently; every late notification has
        // therefore been fully sent. Anything other than `Ok(Some(_))` from `try_next` means
        // there is nothing more to handle.
        loop {
            let queued = match self.ctx.receiver.try_next() {
                Ok(Some(msg)) => msg,
                Ok(None) | Err(_) => break,
            };

            let outcome = self.ctx.handle_message(queued, &mut self.actor).await;
            if let ContinueManageLoop::ExitImmediately = outcome {
                break;
            }
        }
    }
}