crb_agent/
address.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
use crate::agent::Agent;
use anyhow::{Error, Result};
use async_trait::async_trait;
use crb_core::{mpsc, watch};

pub struct AddressJoint<A: Agent> {
    msg_rx: mpsc::UnboundedReceiver<Envelope<A>>,
    status_tx: watch::Sender<AgentStatus<A>>,
}

impl<A: Agent> AddressJoint<A> {
    pub fn new_pair() -> (Address<A>, AddressJoint<A>) {
        let (msg_tx, msg_rx) = mpsc::unbounded_channel();
        let (status_tx, status_rx) = watch::channel(AgentStatus::Active);
        let address = Address { msg_tx, status_rx };
        let joint = AddressJoint { msg_rx, status_tx };
        (address, joint)
    }

    pub async fn next_envelope(&mut self) -> Option<Envelope<A>> {
        self.msg_rx.recv().await
    }

    pub fn report(&mut self, output: A::Output) -> Result<()> {
        let status = AgentStatus::Done(output);
        self.status_tx.send(status).map_err(Error::from)
    }

    pub fn close(&mut self) {
        self.msg_rx.close();
    }
}

pub struct Address<A: Agent> {
    msg_tx: mpsc::UnboundedSender<Envelope<A>>,
    status_rx: watch::Receiver<AgentStatus<A>>,
}

impl<A: Agent> Address<A> {
    pub fn send(&self, msg: impl MessageFor<A>) -> Result<()> {
        self.msg_tx
            .send(Box::new(msg))
            .map_err(|_| Error::msg("Can't send the message to the actor"))
    }

    pub async fn join(&mut self) -> Result<A::Output, Error> {
        let status = self.status_rx.wait_for(AgentStatus::is_done).await?;
        status
            .take()
            .ok_or_else(|| Error::msg("Can't extract the output from the agent"))
    }
}

impl<A: Agent> Clone for Address<A> {
    fn clone(&self) -> Self {
        Self {
            msg_tx: self.msg_tx.clone(),
            status_rx: self.status_rx.clone(),
        }
    }
}

#[derive(PartialEq, Eq)]
pub enum AgentStatus<T: Agent> {
    Active,
    Done(T::Output),
}

impl<T: Agent> AgentStatus<T> {
    pub fn is_done(&self) -> bool {
        matches!(self, Self::Done(_))
    }

    pub fn take(&self) -> Option<T::Output> {
        match self {
            Self::Active => None,
            Self::Done(value) => Some(value.clone()),
        }
    }
}

pub type Envelope<A> = Box<dyn MessageFor<A>>;

#[async_trait]
pub trait MessageFor<A: Agent + ?Sized>: Send + 'static {
    async fn handle(self: Box<Self>, actor: &mut A, ctx: &mut A::Context) -> Result<()>;
}