1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
use std::{
    any::{type_name, Any},
    collections::VecDeque,
};

use anyhow::{Context as _, Error};
use tracing::{event, Level};

use crate::{Actor, Context, State};

/// Interface to an actor entry whose method signatures do not name the actor's
/// concrete type: messages are handed over through `dyn Any`.
pub trait AnyActorEntry {
    /// Returns a static name for the entry (judging by the method name, meant
    /// for debug output).
    ///
    /// The `ActorEntry` implementation returns the actor's type name.
    fn debug_name(&self) -> &'static str;

    /// Add a message to be handled to the actor's internal queue.
    ///
    /// `slot` carries the message in type-erased form. The `ActorEntry`
    /// implementation expects it to be an `Option` of the actor's message type
    /// and takes the message out of it, leaving `None` behind.
    ///
    /// # Errors
    ///
    /// The `ActorEntry` implementation returns an error when `slot` is not of
    /// the expected type, or when it does not currently hold a message.
    fn enqueue(&mut self, slot: &mut dyn Any) -> Result<(), Error>;

    /// Process pending messages.
    ///
    /// Nothing is returned to the caller; the `ActorEntry` implementation
    /// reports a failing actor through a log event instead.
    fn process(&mut self, ctx: &mut Context);
}

/// Pairs an actor with the `State` that holds its queue of pending messages.
pub struct ActorEntry<S>
where
    S: Actor,
{
    /// The actor whose `process` method is invoked to handle messages.
    actor: S,
    /// State passed to the actor on every `process` call; enqueued messages
    /// are pushed onto its `queue`.
    state: State<S>,
}

impl<S> ActorEntry<S>
where
    S: Actor,
{
    /// Wraps `actor` in a new entry whose message queue starts out empty.
    pub fn new(actor: S) -> Self {
        // A fresh entry has nothing pending yet.
        let state = State {
            queue: VecDeque::new(),
        };

        Self { actor, state }
    }
}

impl<S> AnyActorEntry for ActorEntry<S>
where
    S: Actor,
{
    /// Reports the type name of the wrapped actor.
    fn debug_name(&self) -> &'static str {
        type_name::<S>()
    }

    /// Moves the message out of `slot` and appends it to the actor's queue.
    ///
    /// `slot` must be an `Option<S::Message>` that currently holds a message;
    /// it is left as `None` afterwards.
    ///
    /// # Errors
    ///
    /// Fails when `slot` has a different type or contains no message. Nothing
    /// is queued in either case.
    fn enqueue(&mut self, slot: &mut dyn Any) -> Result<(), Error> {
        // Recover the concrete slot type, then move the message out of it.
        let message = slot
            .downcast_mut::<Option<S::Message>>()
            .context("incorrect message type")?
            .take()
            .context("message not in slot")?;

        self.state.queue.push_back(message);
        Ok(())
    }

    /// Lets the actor handle its pending messages.
    ///
    /// Emits a warning event when messages are still queued after the actor
    /// returns, and an error event when the actor reports a failure. The
    /// failure is not propagated to the caller.
    fn process(&mut self, ctx: &mut Context) {
        let outcome = self.actor.process(ctx, &mut self.state);

        // Leftover messages are reported before the actor's own result.
        let has_leftovers = !self.state.queue.is_empty();
        if has_leftovers {
            event!(Level::WARN, "actor did not process all pending messages");
        }

        // TODO: What to do with this?
        if let Err(error) = outcome {
            event!(Level::ERROR, ?error, "actor failed while processing");
        }
    }
}