1use crate::{Actor, ActorId, Caller, Context, Error, Handler, Message, Result, Sender};
2use futures::channel::{mpsc, oneshot};
3use futures::future::Shared;
4use futures::Future;
5use std::hash::{Hash, Hasher};
6use std::pin::Pin;
7use std::sync::{Arc, Mutex, Weak};
8
/// Pinned, boxed, `Send` future that produces no value and is valid for `'a`.
type ExecFuture<'a> = Pin<Box<dyn Future<Output = ()> + Send + 'a>>;
10
/// Boxed one-shot closure carried by `ActorEvent::Exec`.
///
/// It receives mutable borrows of the actor and of its context and returns an
/// `ExecFuture` that may hold both borrows for the same lifetime `'a`.
pub(crate) type ExecFn<A> =
    Box<dyn for<'a> FnOnce(&'a mut A, &'a mut Context<A>) -> ExecFuture<'a> + Send + 'static>;
13
/// Events pushed through an actor's unbounded channel by its addresses.
pub(crate) enum ActorEvent<A> {
    /// Closure to run with mutable access to the actor and its context.
    Exec(ExecFn<A>),
    /// Stop request, optionally carrying an error; queued by `Addr::stop`.
    Stop(Option<Error>),
    /// Not produced in this file.
    /// NOTE(review): presumably the `usize` identifies a stream to remove —
    /// confirm against the code that consumes these events.
    RemoveStream(usize),
}
19
/// Strong handle to an actor of type `A`.
///
/// Equality and hashing are based on `actor_id` only.
pub struct Addr<A> {
    /// Identifier of the actor this address refers to.
    pub(crate) actor_id: ActorId,
    /// Shared sending half of the actor's event channel.
    pub(crate) tx: Arc<mpsc::UnboundedSender<ActorEvent<A>>>,
    /// Optional shared receiver awaited by `wait_for_stop`.
    /// NOTE(review): presumably fires when the actor exits — confirm where it is created.
    pub(crate) rx_exit: Option<Shared<oneshot::Receiver<()>>>,
}
29
30impl<A> Clone for Addr<A> {
31 fn clone(&self) -> Self {
32 Self {
33 actor_id: self.actor_id,
34 tx: self.tx.clone(),
35 rx_exit: self.rx_exit.clone(),
36 }
37 }
38}
39
40impl<A> Addr<A> {
41 pub fn downgrade(&self) -> WeakAddr<A> {
42 WeakAddr {
43 actor_id: self.actor_id,
44 tx: Arc::downgrade(&self.tx),
45 rx_exit: self.rx_exit.clone(),
46 }
47 }
48}
49
50impl<A> PartialEq for Addr<A> {
51 fn eq(&self, other: &Self) -> bool {
52 self.actor_id == other.actor_id
53 }
54}
55
56impl<A> Hash for Addr<A> {
57 fn hash<H: Hasher>(&self, state: &mut H) {
58 self.actor_id.hash(state)
59 }
60}
61
62impl<A: Actor> Addr<A> {
63 pub fn actor_id(&self) -> ActorId {
65 self.actor_id
66 }
67
68 pub fn stop(&mut self, err: Option<Error>) -> Result<()> {
70 mpsc::UnboundedSender::clone(&*self.tx).start_send(ActorEvent::Stop(err))?;
71 Ok(())
72 }
73
74 pub async fn call<T: Message>(&self, msg: T) -> Result<T::Result>
76 where
77 A: Handler<T>,
78 {
79 let (tx, rx) = oneshot::channel();
80 mpsc::UnboundedSender::clone(&*self.tx).start_send(ActorEvent::Exec(Box::new(
81 move |actor, ctx| {
82 Box::pin(async move {
83 let res = Handler::handle(actor, ctx, msg).await;
84 let _ = tx.send(res);
85 })
86 },
87 )))?;
88
89 Ok(rx.await?)
90 }
91
92 pub fn send<T: Message<Result = ()>>(&self, msg: T) -> Result<()>
94 where
95 A: Handler<T>,
96 {
97 mpsc::UnboundedSender::clone(&*self.tx).start_send(ActorEvent::Exec(Box::new(
98 move |actor, ctx| {
99 Box::pin(async move {
100 Handler::handle(actor, ctx, msg).await;
101 })
102 },
103 )))?;
104 Ok(())
105 }
106
107 pub fn caller<T: Message>(&self) -> Caller<T>
109 where
110 A: Handler<T>,
111 {
112 let weak_tx = Arc::downgrade(&self.tx);
113
114 Caller {
115 actor_id: self.actor_id.clone(),
116 caller_fn: Mutex::new(Box::new(move |msg| {
117 let weak_tx_option = weak_tx.upgrade();
118 Box::pin(async move {
119 match weak_tx_option {
120 Some(tx) => {
121 let (oneshot_tx, oneshot_rx) = oneshot::channel();
122
123 mpsc::UnboundedSender::clone(&tx).start_send(ActorEvent::Exec(
124 Box::new(move |actor, ctx| {
125 Box::pin(async move {
126 let res = Handler::handle(&mut *actor, ctx, msg).await;
127 let _ = oneshot_tx.send(res);
128 })
129 }),
130 ))?;
131 Ok(oneshot_rx.await?)
132 }
133 None => Err(anyhow::anyhow!("Actor Dropped")),
134 }
135 })
136 })),
137 }
138 }
139
140 pub fn sender<T: Message<Result = ()>>(&self) -> Sender<T>
142 where
143 A: Handler<T>,
144 {
145 let weak_tx = Arc::downgrade(&self.tx);
146 Sender {
147 actor_id: self.actor_id.clone(),
148 sender_fn: Box::new(move |msg| match weak_tx.upgrade() {
149 Some(tx) => {
150 mpsc::UnboundedSender::clone(&tx).start_send(ActorEvent::Exec(Box::new(
151 move |actor, ctx| {
152 Box::pin(async move {
153 Handler::handle(&mut *actor, ctx, msg).await;
154 })
155 },
156 )))?;
157 Ok(())
158 }
159 None => Ok(()),
160 }),
161 }
162 }
163
164 pub async fn wait_for_stop(self) {
166 if let Some(rx_exit) = self.rx_exit {
167 rx_exit.await.ok();
168 } else {
169 futures::future::pending::<()>().await;
170 }
171 }
172}
173
/// Weak handle to an actor of type `A`.
///
/// It holds only a `Weak` reference to the sender, so it must be upgraded to
/// an `Addr` before it can be used to send anything.
pub struct WeakAddr<A> {
    /// Identifier of the actor this address refers to.
    pub(crate) actor_id: ActorId,
    /// Weak reference to the sending half of the actor's event channel.
    pub(crate) tx: Weak<mpsc::UnboundedSender<ActorEvent<A>>>,
    /// Optional shared receiver, copied into the `Addr` returned by `upgrade`.
    pub(crate) rx_exit: Option<Shared<oneshot::Receiver<()>>>,
}
179
180impl<A> PartialEq for WeakAddr<A> {
181 fn eq(&self, other: &Self) -> bool {
182 self.actor_id == other.actor_id
183 }
184}
185
186impl<A> Hash for WeakAddr<A> {
187 fn hash<H: Hasher>(&self, state: &mut H) {
188 self.actor_id.hash(state)
189 }
190}
191
192impl<A> WeakAddr<A> {
193 pub fn upgrade(&self) -> Option<Addr<A>> {
194 match self.tx.upgrade() {
195 Some(tx) => Some(Addr {
196 actor_id: self.actor_id,
197 tx,
198 rx_exit: self.rx_exit.clone(),
199 }),
200 None => None,
201 }
202 }
203}
204
205impl<A> Clone for WeakAddr<A> {
206 fn clone(&self) -> Self {
207 Self {
208 actor_id: self.actor_id,
209 tx: self.tx.clone(),
210 rx_exit: self.rx_exit.clone(),
211 }
212 }
213}
214
215impl<A: Actor> WeakAddr<A> {
216 pub fn actor_id(&self) -> ActorId {
218 self.actor_id
219 }
220}