1use crate::agent::Agent;
2use crate::context::Context;
3use anyhow::{Error, Result};
4use async_trait::async_trait;
5use crb_core::{mpsc, watch};
6use crb_runtime::Stopper;
7use crb_send::{Recipient, Sender};
8use derive_more::{Deref, DerefMut};
9
10pub struct AddressJoint<A: Agent> {
11 msg_rx: mpsc::UnboundedReceiver<Envelope<A>>,
12 status_tx: watch::Sender<AgentStatus>,
13}
14
15impl<A: Agent> AddressJoint<A> {
16 pub fn new_pair(stopper: Stopper) -> (Address<A>, AddressJoint<A>) {
17 let (msg_tx, msg_rx) = mpsc::unbounded_channel();
18 let (status_tx, status_rx) = watch::channel(AgentStatus::Active);
19 let address = Address {
20 msg_tx,
21 status_rx,
22 stopper,
23 };
24 let joint = AddressJoint { msg_rx, status_tx };
25 (address, joint)
26 }
27
28 pub fn report(&mut self, interrupted: bool) -> Result<()> {
29 let status = if interrupted {
30 AgentStatus::Interrupted
31 } else {
32 AgentStatus::Done
33 };
34 self.status_tx.send(status)?;
35 Ok(())
36 }
37
38 pub async fn next_envelope(&mut self) -> Option<Envelope<A>> {
39 self.msg_rx.recv().await
40 }
41
42 pub fn close(&mut self) {
43 self.msg_rx.close();
44 }
45}
46
47pub struct Address<A: Agent> {
48 msg_tx: mpsc::UnboundedSender<Envelope<A>>,
49 status_rx: watch::Receiver<AgentStatus>,
50 stopper: Stopper,
51}
52
53impl<A: Agent> Address<A> {
54 pub fn send(&self, msg: impl MessageFor<A>) -> Result<()> {
55 self.msg_tx
56 .send(Box::new(msg))
57 .map_err(|_| Error::msg("Can't send the message to the actor"))
58 }
59
60 pub async fn join(&mut self) -> Result<AgentStatus> {
62 let status = self.status_rx.wait_for(AgentStatus::is_finished).await?;
63 Ok(status.clone())
64 }
65
66 pub(crate) fn stopper(&self) -> &Stopper {
67 &self.stopper
68 }
69}
70
71impl<A: Agent> Clone for Address<A> {
72 fn clone(&self) -> Self {
73 Self {
74 msg_tx: self.msg_tx.clone(),
75 status_rx: self.status_rx.clone(),
76 stopper: self.stopper.clone(),
77 }
78 }
79}
80
81impl<A, M> Sender<M> for Address<A>
82where
83 A: Agent,
84 M: MessageFor<A>,
85{
86 fn send(&self, input: M) -> Result<()> {
87 Address::send(self, input)
88 }
89}
90
91impl<A: Agent> Address<A> {
92 pub fn sender<M>(&self) -> Recipient<M>
93 where
94 M: MessageFor<A>,
95 {
96 Recipient::new(self.clone())
97 }
98}
99
/// Lifecycle status of an agent as published through the status channel.
///
/// `Debug` is derived so the status can be logged, used in `assert_eq!`, and
/// embedded in other types that derive `Debug`.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum AgentStatus {
    /// Initial value of the status channel created by `AddressJoint::new_pair`.
    Active,
    /// Reported by `AddressJoint::report` when called with `interrupted == true`.
    Interrupted,
    /// Reported by `AddressJoint::report` when called with `interrupted == false`.
    Done,
}
106
107impl AgentStatus {
108 pub fn is_finished(&self) -> bool {
109 matches!(self, Self::Interrupted | Self::Done)
110 }
111}
112
113pub type Envelope<A> = Box<dyn MessageFor<A>>;
114
115#[async_trait]
116pub trait MessageFor<A: Agent>: Send + 'static {
117 async fn handle(self: Box<Self>, actor: &mut A, ctx: &mut Context<A>) -> Result<()>;
118}
119
120#[derive(Deref, DerefMut)]
121pub struct Link<A: Agent> {
122 inner: A::Link,
123}
124
125impl<A: Agent> From<Address<A>> for Link<A> {
126 fn from(address: Address<A>) -> Self {
127 Self {
128 inner: A::Link::from(address),
129 }
130 }
131}
132
133impl<A: Agent> Link<A> {
134 pub fn into_inner(self) -> A::Link {
135 self.inner
136 }
137}