crb_agent/
address.rs

1use crate::agent::Agent;
2use crate::context::Context;
3use anyhow::{Error, Result};
4use async_trait::async_trait;
5use crb_core::{mpsc, watch};
6use crb_runtime::Stopper;
7use crb_send::{Recipient, Sender};
8use derive_more::{Deref, DerefMut};
9
10pub struct AddressJoint<A: Agent> {
11    msg_rx: mpsc::UnboundedReceiver<Envelope<A>>,
12    status_tx: watch::Sender<AgentStatus>,
13}
14
15impl<A: Agent> AddressJoint<A> {
16    pub fn new_pair(stopper: Stopper) -> (Address<A>, AddressJoint<A>) {
17        let (msg_tx, msg_rx) = mpsc::unbounded_channel();
18        let (status_tx, status_rx) = watch::channel(AgentStatus::Active);
19        let address = Address {
20            msg_tx,
21            status_rx,
22            stopper,
23        };
24        let joint = AddressJoint { msg_rx, status_tx };
25        (address, joint)
26    }
27
28    pub fn report(&mut self, interrupted: bool) -> Result<()> {
29        let status = if interrupted {
30            AgentStatus::Interrupted
31        } else {
32            AgentStatus::Done
33        };
34        self.status_tx.send(status)?;
35        Ok(())
36    }
37
38    pub async fn next_envelope(&mut self) -> Option<Envelope<A>> {
39        self.msg_rx.recv().await
40    }
41
42    pub fn close(&mut self) {
43        self.msg_rx.close();
44    }
45}
46
47pub struct Address<A: Agent> {
48    msg_tx: mpsc::UnboundedSender<Envelope<A>>,
49    status_rx: watch::Receiver<AgentStatus>,
50    stopper: Stopper,
51}
52
53impl<A: Agent> Address<A> {
54    pub fn send(&self, msg: impl MessageFor<A>) -> Result<()> {
55        self.msg_tx
56            .send(Box::new(msg))
57            .map_err(|_| Error::msg("Can't send the message to the actor"))
58    }
59
60    /// Important! `join` must use a reference to allow using it under `DerefMut` trait
61    pub async fn join(&mut self) -> Result<AgentStatus> {
62        let status = self.status_rx.wait_for(AgentStatus::is_finished).await?;
63        Ok(status.clone())
64    }
65
66    pub(crate) fn stopper(&self) -> &Stopper {
67        &self.stopper
68    }
69}
70
71impl<A: Agent> Clone for Address<A> {
72    fn clone(&self) -> Self {
73        Self {
74            msg_tx: self.msg_tx.clone(),
75            status_rx: self.status_rx.clone(),
76            stopper: self.stopper.clone(),
77        }
78    }
79}
80
81impl<A, M> Sender<M> for Address<A>
82where
83    A: Agent,
84    M: MessageFor<A>,
85{
86    fn send(&self, input: M) -> Result<()> {
87        Address::send(self, input)
88    }
89}
90
91impl<A: Agent> Address<A> {
92    pub fn sender<M>(&self) -> Recipient<M>
93    where
94        M: MessageFor<A>,
95    {
96        Recipient::new(self.clone())
97    }
98}
99
100#[derive(PartialEq, Eq, Clone)]
101pub enum AgentStatus {
102    Active,
103    Interrupted,
104    Done,
105}
106
107impl AgentStatus {
108    pub fn is_finished(&self) -> bool {
109        matches!(self, Self::Interrupted | Self::Done)
110    }
111}
112
113pub type Envelope<A> = Box<dyn MessageFor<A>>;
114
115#[async_trait]
116pub trait MessageFor<A: Agent>: Send + 'static {
117    async fn handle(self: Box<Self>, actor: &mut A, ctx: &mut Context<A>) -> Result<()>;
118}
119
120#[derive(Deref, DerefMut)]
121pub struct Link<A: Agent> {
122    inner: A::Link,
123}
124
125impl<A: Agent> From<Address<A>> for Link<A> {
126    fn from(address: Address<A>) -> Self {
127        Self {
128            inner: A::Link::from(address),
129        }
130    }
131}
132
133impl<A: Agent> Link<A> {
134    pub fn into_inner(self) -> A::Link {
135        self.inner
136    }
137}