1use std::any::Any;
2use std::cmp::Ordering;
3use std::fmt::{self, Debug};
4use std::future::Future;
5use std::hash::{Hash, Hasher};
6use std::sync::{Arc, Weak};
7use std::{mem, ptr};
8
9use futures::channel::{mpsc, oneshot};
10use futures::future::{self, BoxFuture, FutureExt};
11use futures::select_biased;
12use futures::stream::{FuturesUnordered, StreamExt};
13use futures::task::{Spawn, SpawnError, SpawnExt};
14
15use crate::{send, Actor, Produces, Termination};
16
17type MutItem<T> = Box<dyn for<'a> FnOnce(&'a mut T) -> BoxFuture<'a, bool> + Send>;
18type FutItem = BoxFuture<'static, ()>;
19
20async fn mutex_task<T>(
21 value: T,
22 mut mut_channel: mpsc::UnboundedReceiver<MutItem<T>>,
23 mut fut_channel: mpsc::UnboundedReceiver<FutItem>,
24) {
25 let mut futs = FuturesUnordered::new();
26 let mut value = value;
29 loop {
30 let current_item = loop {
32 if select_biased! {
33 _ = futs.select_next_some() => false,
34 item = mut_channel.next() => if let Some(item) = item {
35 break item
36 } else {
37 true
38 },
39 item = fut_channel.select_next_some() => {
40 futs.push(item);
41 false
42 },
43 complete => true,
44 } {
45 return;
46 }
47 };
48
49 let mut current_future = current_item(&mut value).fuse();
51 loop {
52 select_biased! {
53 done = current_future => if done {
54 return;
55 } else {
56 break
57 },
58 _ = futs.select_next_some() => {},
59 item = fut_channel.select_next_some() => futs.push(item),
60 }
61 }
62 }
63}
64
65struct AddrInner<T> {
66 mut_channel: mpsc::UnboundedSender<MutItem<T>>,
67 fut_channel: mpsc::UnboundedSender<FutItem>,
68}
69
70impl<T: 'static> AddrInner<T> {
71 fn send_mut(this: &Arc<dyn Any + Send + Sync>, item: MutItem<T>) {
72 this.downcast_ref::<Self>()
73 .unwrap()
74 .mut_channel
75 .unbounded_send(item)
76 .ok();
77 }
78 fn send_fut(this: &Arc<dyn Any + Send + Sync>, item: FutItem) {
79 this.downcast_ref::<Self>()
80 .unwrap()
81 .fut_channel
82 .unbounded_send(item)
83 .ok();
84 }
85
86 fn send_mut_upcasted<U: ?Sized + 'static, F: Fn(&mut T) -> &mut U + Copy + Send>(
88 this: &Arc<dyn Any + Send + Sync>,
89 item: MutItem<U>,
90 ) {
91 assert_eq!(mem::size_of::<F>(), 0);
92
93 this.downcast_ref::<Self>()
94 .unwrap()
95 .mut_channel
96 .unbounded_send(Box::new(move |x| {
97 let f: F = unsafe { mem::zeroed() };
98 item(f(x))
99 }))
100 .ok();
101 }
102}
103
/// Placeholder sender stored in detached addresses.
///
/// Detached addresses have no inner handle, and both `AddrLike` impls in this
/// file only invoke a sender when a handle is present, so this is never
/// expected to run.
///
/// # Panics
///
/// Always panics.
fn send_unreachable<T>(_handle: &Arc<dyn Any + Send + Sync>, _item: T) {
    unreachable!()
}
107
108pub trait AddrLike: Send + Sync + Clone + Debug + 'static + AsAddr<Addr = Self> {
111 type Actor: Actor + ?Sized;
113
114 #[doc(hidden)]
115 fn send_mut(&self, item: MutItem<Self::Actor>);
116
117 fn send_fut(&self, fut: impl Future<Output = ()> + Send + 'static);
119
120 fn call_fut<R: Send + 'static>(
124 &self,
125 fut: impl Future<Output = Produces<R>> + Send + 'static,
126 ) -> Produces<R> {
127 let (mut tx, rx) = oneshot::channel();
128 self.send_fut(async move {
129 select_biased! {
130 _ = tx.cancellation().fuse() => {}
131 res = fut.fuse() => {
132 let _ = tx.send(res);
133 }
134 };
135 });
136 Produces::Deferred(rx)
137 }
138
139 fn send_fut_with<F: Future<Output = ()> + Send + 'static>(&self, f: impl FnOnce(Self) -> F) {
141 self.send_fut(f(self.clone()));
142 }
143
144 fn call_fut_with<R: Send + 'static, F: Future<Output = Produces<R>> + Send + 'static>(
146 &self,
147 f: impl FnOnce(Self) -> F,
148 ) -> Produces<R> {
149 self.call_fut(f(self.clone()))
150 }
151
152 fn termination(&self) -> Termination {
156 Termination(self.call_fut(future::pending()))
157 }
158}
159
160pub trait AsAddr {
162 type Addr: AddrLike;
164
165 fn as_addr(&self) -> &Self::Addr;
167}
168
169impl<T: AsAddr + ?Sized> AsAddr for &T {
170 type Addr = T::Addr;
171 fn as_addr(&self) -> &Self::Addr {
172 (**self).as_addr()
173 }
174}
175impl<T: Actor + ?Sized> AsAddr for crate::Addr<T> {
176 type Addr = Self;
177 fn as_addr(&self) -> &Self::Addr {
178 self
179 }
180}
181impl<T: Actor + ?Sized> AsAddr for crate::WeakAddr<T> {
182 type Addr = Self;
183 fn as_addr(&self) -> &Self::Addr {
184 self
185 }
186}
187
188pub struct Addr<T: ?Sized + 'static> {
196 inner: Option<Arc<dyn Any + Send + Sync>>,
197 send_mut: &'static (dyn Fn(&Arc<dyn Any + Send + Sync>, MutItem<T>) + Send + Sync),
198 send_fut: &'static (dyn Fn(&Arc<dyn Any + Send + Sync>, FutItem) + Send + Sync),
199}
200
201impl<T: ?Sized> Debug for Addr<T> {
202 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
203 write!(
204 f,
205 "{} {{ detached: {} }}",
206 std::any::type_name::<Self>(),
207 self.inner.is_none()
208 )
209 }
210}
211
212impl<T: ?Sized> Clone for Addr<T> {
213 fn clone(&self) -> Self {
214 Self {
215 inner: self.inner.clone(),
216 send_mut: self.send_mut,
217 send_fut: self.send_fut,
218 }
219 }
220}
221
222impl<T: ?Sized> Default for Addr<T> {
223 fn default() -> Self {
224 Self::detached()
225 }
226}
227
228impl<T: ?Sized, U: ?Sized> PartialEq<Addr<U>> for Addr<T> {
229 fn eq(&self, rhs: &Addr<U>) -> bool {
230 self.ptr() == rhs.ptr()
231 }
232}
233
234impl<T: ?Sized, U: ?Sized> PartialEq<WeakAddr<U>> for Addr<T> {
235 fn eq(&self, rhs: &WeakAddr<U>) -> bool {
236 self.ptr() == rhs.ptr()
237 }
238}
239
240impl<T: ?Sized> Eq for Addr<T> {}
241impl<T: ?Sized> Hash for Addr<T> {
242 fn hash<H: Hasher>(&self, state: &mut H) {
243 self.ptr().hash(state)
244 }
245}
246
247impl<T: ?Sized, U: ?Sized> PartialOrd<Addr<U>> for Addr<T> {
248 fn partial_cmp(&self, rhs: &Addr<U>) -> Option<Ordering> {
249 self.ptr().partial_cmp(&rhs.ptr())
250 }
251}
252
253impl<T: ?Sized, U: ?Sized> PartialOrd<WeakAddr<U>> for Addr<T> {
254 fn partial_cmp(&self, rhs: &WeakAddr<U>) -> Option<Ordering> {
255 self.ptr().partial_cmp(&rhs.ptr())
256 }
257}
258impl<T: ?Sized> Ord for Addr<T> {
259 fn cmp(&self, rhs: &Addr<T>) -> Ordering {
260 self.ptr().cmp(&rhs.ptr())
261 }
262}
263
264impl<T: Actor + ?Sized> AddrLike for Addr<T> {
265 type Actor = T;
266
267 #[doc(hidden)]
268 fn send_mut(&self, item: MutItem<Self::Actor>) {
269 if let Some(inner) = &self.inner {
270 (self.send_mut)(inner, item);
271 }
272 }
273
274 fn send_fut(&self, fut: impl Future<Output = ()> + Send + 'static) {
275 if let Some(inner) = &self.inner {
276 (self.send_fut)(inner, FutureExt::boxed(fut));
277 }
278 }
279}
280
281impl<T: Actor> Addr<T> {
282 pub fn new<S: Spawn + ?Sized>(spawner: &S, value: T) -> Result<Self, SpawnError> {
284 let (mtx, mrx) = mpsc::unbounded();
285 let (ftx, frx) = mpsc::unbounded();
286 spawner.spawn(mutex_task(value, mrx, frx))?;
287 let addr = Self {
288 inner: Some(Arc::new(AddrInner {
289 mut_channel: mtx,
290 fut_channel: ftx,
291 })),
292 send_mut: &AddrInner::<T>::send_mut,
293 send_fut: &AddrInner::<T>::send_fut,
294 };
295
296 send!(addr.started(addr.clone()));
298
299 Ok(addr)
300 }
301 #[doc(hidden)]
302 pub fn upcast<U: ?Sized + Send + 'static, F: Fn(&mut T) -> &mut U + Copy + Send + 'static>(
303 self,
304 _f: F,
305 ) -> Addr<U> {
306 Addr {
307 inner: self.inner,
308 send_mut: &AddrInner::<T>::send_mut_upcasted::<U, F>,
309 send_fut: self.send_fut,
310 }
311 }
312}
313impl<T: ?Sized> Addr<T> {
314 pub fn detached() -> Self {
316 Self {
317 inner: None,
318 send_mut: &send_unreachable,
319 send_fut: &send_unreachable,
320 }
321 }
322 fn ptr(&self) -> *const () {
323 if let Some(inner) = &self.inner {
324 Arc::as_ptr(inner) as *const ()
325 } else {
326 ptr::null()
327 }
328 }
329}
330impl<T: ?Sized + Send + 'static> Addr<T> {
331 pub fn downgrade(&self) -> WeakAddr<T> {
333 WeakAddr {
334 inner: self.inner.as_ref().map(Arc::downgrade),
335 send_mut: self.send_mut,
336 send_fut: self.send_fut,
337 }
338 }
339 pub fn downcast<U: Send + 'static>(self) -> Result<Addr<U>, Addr<T>> {
344 if let Some(inner) = &self.inner {
345 if inner.is::<AddrInner<U>>() {
346 Ok(Addr {
347 inner: self.inner,
348 send_mut: &AddrInner::<U>::send_mut,
349 send_fut: self.send_fut,
350 })
351 } else {
352 Err(self)
353 }
354 } else {
355 Ok(Addr::detached())
356 }
357 }
358}
359
360pub struct WeakAddr<T: ?Sized + 'static> {
368 inner: Option<Weak<dyn Any + Send + Sync>>,
369 send_mut: &'static (dyn Fn(&Arc<dyn Any + Send + Sync>, MutItem<T>) + Send + Sync),
370 send_fut: &'static (dyn Fn(&Arc<dyn Any + Send + Sync>, FutItem) + Send + Sync),
371}
372
373impl<T: ?Sized> Clone for WeakAddr<T> {
374 fn clone(&self) -> Self {
375 Self {
376 inner: self.inner.clone(),
377 send_mut: self.send_mut,
378 send_fut: self.send_fut,
379 }
380 }
381}
382
383impl<T: ?Sized> Debug for WeakAddr<T> {
384 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
385 write!(f, "{} {{..}}", std::any::type_name::<Self>())
386 }
387}
388
389impl<T: ?Sized> Default for WeakAddr<T> {
390 fn default() -> Self {
391 Self::detached()
392 }
393}
394
395impl<T: ?Sized, U: ?Sized> PartialEq<Addr<U>> for WeakAddr<T> {
396 fn eq(&self, rhs: &Addr<U>) -> bool {
397 self.ptr() == rhs.ptr()
398 }
399}
400
401impl<T: ?Sized, U: ?Sized> PartialEq<WeakAddr<U>> for WeakAddr<T> {
402 fn eq(&self, rhs: &WeakAddr<U>) -> bool {
403 self.ptr() == rhs.ptr()
404 }
405}
406
407impl<T: ?Sized> Eq for WeakAddr<T> {}
408impl<T: ?Sized> Hash for WeakAddr<T> {
409 fn hash<H: Hasher>(&self, state: &mut H) {
410 self.ptr().hash(state)
411 }
412}
413
414impl<T: ?Sized, U: ?Sized> PartialOrd<Addr<U>> for WeakAddr<T> {
415 fn partial_cmp(&self, rhs: &Addr<U>) -> Option<Ordering> {
416 self.ptr().partial_cmp(&rhs.ptr())
417 }
418}
419
420impl<T: ?Sized, U: ?Sized> PartialOrd<WeakAddr<U>> for WeakAddr<T> {
421 fn partial_cmp(&self, rhs: &WeakAddr<U>) -> Option<Ordering> {
422 self.ptr().partial_cmp(&rhs.ptr())
423 }
424}
425impl<T: ?Sized> Ord for WeakAddr<T> {
426 fn cmp(&self, rhs: &WeakAddr<T>) -> Ordering {
427 self.ptr().cmp(&rhs.ptr())
428 }
429}
430
/// Upgrades an optional weak pointer. Returns `None` when no pointer is stored
/// or when the stored pointer can no longer be upgraded.
fn upgrade_weak<T: ?Sized>(maybe_weak: &Option<Weak<T>>) -> Option<Arc<T>> {
    match maybe_weak {
        Some(weak) => weak.upgrade(),
        None => None,
    }
}
434
435impl<T: Actor + ?Sized> AddrLike for WeakAddr<T> {
436 type Actor = T;
437
438 #[doc(hidden)]
439 fn send_mut(&self, item: MutItem<Self::Actor>) {
440 if let Some(inner) = upgrade_weak(&self.inner) {
441 (self.send_mut)(&inner, item);
442 }
443 }
444
445 fn send_fut(&self, fut: impl Future<Output = ()> + Send + 'static) {
446 if let Some(inner) = upgrade_weak(&self.inner) {
447 (self.send_fut)(&inner, FutureExt::boxed(fut));
448 }
449 }
450}
451
452impl<T: ?Sized> WeakAddr<T> {
453 pub fn detached() -> Self {
455 Self {
456 inner: None,
457 send_mut: &send_unreachable,
458 send_fut: &send_unreachable,
459 }
460 }
461 fn ptr(&self) -> *const () {
464 if let Some(inner) = upgrade_weak(&self.inner) {
465 Arc::as_ptr(&inner) as *const ()
466 } else {
467 ptr::null()
468 }
469 }
470}
471impl<T: Send + 'static> WeakAddr<T> {
472 #[doc(hidden)]
473 pub fn upcast<U: ?Sized + Send + 'static, F: Fn(&mut T) -> &mut U + Copy + Send + 'static>(
474 self,
475 _f: F,
476 ) -> WeakAddr<U> {
477 WeakAddr {
478 inner: self.inner,
479 send_mut: &AddrInner::<T>::send_mut_upcasted::<U, F>,
480 send_fut: self.send_fut,
481 }
482 }
483}
484impl<T: ?Sized + Send + 'static> WeakAddr<T> {
485 pub fn upgrade(&self) -> Addr<T> {
488 if let Some(inner) = upgrade_weak(&self.inner) {
489 Addr {
490 inner: Some(inner),
491 send_mut: self.send_mut,
492 send_fut: self.send_fut,
493 }
494 } else {
495 Addr::detached()
496 }
497 }
498}