crb_agent/
address.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
use crate::agent::Agent;
use crate::context::Context;
use anyhow::{Error, Result};
use async_trait::async_trait;
use crb_core::{mpsc, watch};
use crb_send::{Recipient, Sender};

pub struct AddressJoint<A: Agent + ?Sized> {
    msg_rx: mpsc::UnboundedReceiver<Envelope<A>>,
    status_tx: watch::Sender<AgentStatus>,
}

impl<A: Agent> AddressJoint<A> {
    pub fn new_pair() -> (Address<A>, AddressJoint<A>) {
        let (msg_tx, msg_rx) = mpsc::unbounded_channel();
        let (status_tx, status_rx) = watch::channel(AgentStatus::Active);
        let address = Address { msg_tx, status_rx };
        let joint = AddressJoint { msg_rx, status_tx };
        (address, joint)
    }

    pub fn report(&mut self, interrupted: bool) -> Result<()> {
        let status = if interrupted {
            AgentStatus::Interrupted
        } else {
            AgentStatus::Done
        };
        self.status_tx.send(status)?;
        Ok(())
    }

    pub async fn next_envelope(&mut self) -> Option<Envelope<A>> {
        self.msg_rx.recv().await
    }

    pub fn close(&mut self) {
        self.msg_rx.close();
    }
}

pub struct Address<A: Agent + ?Sized> {
    msg_tx: mpsc::UnboundedSender<Envelope<A>>,
    status_rx: watch::Receiver<AgentStatus>,
}

impl<A: Agent> Address<A> {
    pub fn send(&self, msg: impl MessageFor<A>) -> Result<()> {
        self.msg_tx
            .send(Box::new(msg))
            .map_err(|_| Error::msg("Can't send the message to the actor"))
    }

    /// Important! `join` must use a reference to allow using it under `DerefMut` trait
    pub async fn join(&mut self) -> Result<AgentStatus> {
        let status = self.status_rx.wait_for(AgentStatus::is_finished).await?;
        Ok(status.clone())
    }
}

impl<A: Agent> Clone for Address<A> {
    fn clone(&self) -> Self {
        Self {
            msg_tx: self.msg_tx.clone(),
            status_rx: self.status_rx.clone(),
        }
    }
}

impl<A, M> Sender<M> for Address<A>
where
    A: Agent,
    M: MessageFor<A>,
{
    fn send(&self, input: M) -> Result<(), Error> {
        Address::send(self, input)
    }
}

impl<A: Agent> Address<A> {
    pub fn sender<M>(&self) -> Recipient<M>
    where
        M: MessageFor<A>,
    {
        Recipient::new(self.clone())
    }
}

#[derive(PartialEq, Eq, Clone)]
pub enum AgentStatus {
    Active,
    Interrupted,
    Done,
}

impl AgentStatus {
    pub fn is_finished(&self) -> bool {
        matches!(self, Self::Interrupted | Self::Done)
    }
}

pub type Envelope<A> = Box<dyn MessageFor<A>>;

#[async_trait]
pub trait MessageFor<A: Agent>: Send + 'static {
    async fn handle(self: Box<Self>, actor: &mut A, ctx: &mut Context<A>) -> Result<()>;
}