act_zero/
addr.rs

1use std::any::Any;
2use std::cmp::Ordering;
3use std::fmt::{self, Debug};
4use std::future::Future;
5use std::hash::{Hash, Hasher};
6use std::sync::{Arc, Weak};
7use std::{mem, ptr};
8
9use futures::channel::{mpsc, oneshot};
10use futures::future::{self, BoxFuture, FutureExt};
11use futures::select_biased;
12use futures::stream::{FuturesUnordered, StreamExt};
13use futures::task::{Spawn, SpawnError, SpawnExt};
14
15use crate::{send, Actor, Produces, Termination};
16
17type MutItem<T> = Box<dyn for<'a> FnOnce(&'a mut T) -> BoxFuture<'a, bool> + Send>;
18type FutItem = BoxFuture<'static, ()>;
19
20async fn mutex_task<T>(
21    value: T,
22    mut mut_channel: mpsc::UnboundedReceiver<MutItem<T>>,
23    mut fut_channel: mpsc::UnboundedReceiver<FutItem>,
24) {
25    let mut futs = FuturesUnordered::new();
26    // Re-bind 'value' so that it is dropped before futs.
27    // That will ensure .termination() completes only once the value's drop has finished.
28    let mut value = value;
29    loop {
30        // Obtain an item
31        let current_item = loop {
32            if select_biased! {
33                _ = futs.select_next_some() => false,
34                item = mut_channel.next() => if let Some(item) = item {
35                    break item
36                } else {
37                    true
38                },
39                item = fut_channel.select_next_some() => {
40                    futs.push(item);
41                    false
42                },
43                complete => true,
44            } {
45                return;
46            }
47        };
48
49        // Wait for the current item to run
50        let mut current_future = current_item(&mut value).fuse();
51        loop {
52            select_biased! {
53                done = current_future => if done {
54                    return;
55                } else {
56                    break
57                },
58                _ = futs.select_next_some() => {},
59                item = fut_channel.select_next_some() => futs.push(item),
60            }
61        }
62    }
63}
64
65struct AddrInner<T> {
66    mut_channel: mpsc::UnboundedSender<MutItem<T>>,
67    fut_channel: mpsc::UnboundedSender<FutItem>,
68}
69
70impl<T: 'static> AddrInner<T> {
71    fn send_mut(this: &Arc<dyn Any + Send + Sync>, item: MutItem<T>) {
72        this.downcast_ref::<Self>()
73            .unwrap()
74            .mut_channel
75            .unbounded_send(item)
76            .ok();
77    }
78    fn send_fut(this: &Arc<dyn Any + Send + Sync>, item: FutItem) {
79        this.downcast_ref::<Self>()
80            .unwrap()
81            .fut_channel
82            .unbounded_send(item)
83            .ok();
84    }
85
86    // Must only be called if we have previously encountered a witness value of type `F`.
87    fn send_mut_upcasted<U: ?Sized + 'static, F: Fn(&mut T) -> &mut U + Copy + Send>(
88        this: &Arc<dyn Any + Send + Sync>,
89        item: MutItem<U>,
90    ) {
91        assert_eq!(mem::size_of::<F>(), 0);
92
93        this.downcast_ref::<Self>()
94            .unwrap()
95            .mut_channel
96            .unbounded_send(Box::new(move |x| {
97                let f: F = unsafe { mem::zeroed() };
98                item(f(x))
99            }))
100            .ok();
101    }
102}
103
104fn send_unreachable<T>(_: &Arc<dyn Any + Send + Sync>, _: T) {
105    unreachable!()
106}
107
108/// Trait provides methods for spawning futures onto an actor. Implemented by
109/// `Addr` and `WeakAddr` alike.
110pub trait AddrLike: Send + Sync + Clone + Debug + 'static + AsAddr<Addr = Self> {
111    /// Type of the actor reference by this address.
112    type Actor: Actor + ?Sized;
113
114    #[doc(hidden)]
115    fn send_mut(&self, item: MutItem<Self::Actor>);
116
117    /// Spawn a future onto the actor which does not return a value.
118    fn send_fut(&self, fut: impl Future<Output = ()> + Send + 'static);
119
120    /// Spawn a future onto the actor and provide the means to get back
121    /// the result. The future will be cancelled if the receiver is
122    /// dropped before it has completed.
123    fn call_fut<R: Send + 'static>(
124        &self,
125        fut: impl Future<Output = Produces<R>> + Send + 'static,
126    ) -> Produces<R> {
127        let (mut tx, rx) = oneshot::channel();
128        self.send_fut(async move {
129            select_biased! {
130                _ = tx.cancellation().fuse() => {}
131                res = fut.fuse() => {
132                    let _ = tx.send(res);
133                }
134            };
135        });
136        Produces::Deferred(rx)
137    }
138
139    /// Equivalent to `send_fut` but provides access to the actor's address.
140    fn send_fut_with<F: Future<Output = ()> + Send + 'static>(&self, f: impl FnOnce(Self) -> F) {
141        self.send_fut(f(self.clone()));
142    }
143
144    /// Equivalent to `call_fut` but provides access to the actor's address.
145    fn call_fut_with<R: Send + 'static, F: Future<Output = Produces<R>> + Send + 'static>(
146        &self,
147        f: impl FnOnce(Self) -> F,
148    ) -> Produces<R> {
149        self.call_fut(f(self.clone()))
150    }
151
152    /// Returns a future which resolves when the actor terminates. If the
153    /// actor has already terminated, or if this address is detached, the
154    /// future will resolve immediately.
155    fn termination(&self) -> Termination {
156        Termination(self.call_fut(future::pending()))
157    }
158}
159
160/// Implemented by addresses and references to addresses
161pub trait AsAddr {
162    /// The inner address type
163    type Addr: AddrLike;
164
165    /// Obtain a direct reference to the address
166    fn as_addr(&self) -> &Self::Addr;
167}
168
169impl<T: AsAddr + ?Sized> AsAddr for &T {
170    type Addr = T::Addr;
171    fn as_addr(&self) -> &Self::Addr {
172        (**self).as_addr()
173    }
174}
175impl<T: Actor + ?Sized> AsAddr for crate::Addr<T> {
176    type Addr = Self;
177    fn as_addr(&self) -> &Self::Addr {
178        self
179    }
180}
181impl<T: Actor + ?Sized> AsAddr for crate::WeakAddr<T> {
182    type Addr = Self;
183    fn as_addr(&self) -> &Self::Addr {
184        self
185    }
186}
187
188/// A strong reference to a spawned actor. Actors can be spawned using `Addr::new`.
189///
190/// Methods can be called on the actor after it has been spawned using the
191/// `send!(...)` and `call!(...)` macros.
192///
193/// Can be converted to the address of a trait-object using the `upcast!(...)`
194/// macro.
195pub struct Addr<T: ?Sized + 'static> {
196    inner: Option<Arc<dyn Any + Send + Sync>>,
197    send_mut: &'static (dyn Fn(&Arc<dyn Any + Send + Sync>, MutItem<T>) + Send + Sync),
198    send_fut: &'static (dyn Fn(&Arc<dyn Any + Send + Sync>, FutItem) + Send + Sync),
199}
200
201impl<T: ?Sized> Debug for Addr<T> {
202    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
203        write!(
204            f,
205            "{} {{ detached: {} }}",
206            std::any::type_name::<Self>(),
207            self.inner.is_none()
208        )
209    }
210}
211
212impl<T: ?Sized> Clone for Addr<T> {
213    fn clone(&self) -> Self {
214        Self {
215            inner: self.inner.clone(),
216            send_mut: self.send_mut,
217            send_fut: self.send_fut,
218        }
219    }
220}
221
222impl<T: ?Sized> Default for Addr<T> {
223    fn default() -> Self {
224        Self::detached()
225    }
226}
227
228impl<T: ?Sized, U: ?Sized> PartialEq<Addr<U>> for Addr<T> {
229    fn eq(&self, rhs: &Addr<U>) -> bool {
230        self.ptr() == rhs.ptr()
231    }
232}
233
234impl<T: ?Sized, U: ?Sized> PartialEq<WeakAddr<U>> for Addr<T> {
235    fn eq(&self, rhs: &WeakAddr<U>) -> bool {
236        self.ptr() == rhs.ptr()
237    }
238}
239
240impl<T: ?Sized> Eq for Addr<T> {}
241impl<T: ?Sized> Hash for Addr<T> {
242    fn hash<H: Hasher>(&self, state: &mut H) {
243        self.ptr().hash(state)
244    }
245}
246
247impl<T: ?Sized, U: ?Sized> PartialOrd<Addr<U>> for Addr<T> {
248    fn partial_cmp(&self, rhs: &Addr<U>) -> Option<Ordering> {
249        self.ptr().partial_cmp(&rhs.ptr())
250    }
251}
252
253impl<T: ?Sized, U: ?Sized> PartialOrd<WeakAddr<U>> for Addr<T> {
254    fn partial_cmp(&self, rhs: &WeakAddr<U>) -> Option<Ordering> {
255        self.ptr().partial_cmp(&rhs.ptr())
256    }
257}
258impl<T: ?Sized> Ord for Addr<T> {
259    fn cmp(&self, rhs: &Addr<T>) -> Ordering {
260        self.ptr().cmp(&rhs.ptr())
261    }
262}
263
264impl<T: Actor + ?Sized> AddrLike for Addr<T> {
265    type Actor = T;
266
267    #[doc(hidden)]
268    fn send_mut(&self, item: MutItem<Self::Actor>) {
269        if let Some(inner) = &self.inner {
270            (self.send_mut)(inner, item);
271        }
272    }
273
274    fn send_fut(&self, fut: impl Future<Output = ()> + Send + 'static) {
275        if let Some(inner) = &self.inner {
276            (self.send_fut)(inner, FutureExt::boxed(fut));
277        }
278    }
279}
280
281impl<T: Actor> Addr<T> {
282    /// Spawn an actor using the given spawner. If successful returns the address of the actor.
283    pub fn new<S: Spawn + ?Sized>(spawner: &S, value: T) -> Result<Self, SpawnError> {
284        let (mtx, mrx) = mpsc::unbounded();
285        let (ftx, frx) = mpsc::unbounded();
286        spawner.spawn(mutex_task(value, mrx, frx))?;
287        let addr = Self {
288            inner: Some(Arc::new(AddrInner {
289                mut_channel: mtx,
290                fut_channel: ftx,
291            })),
292            send_mut: &AddrInner::<T>::send_mut,
293            send_fut: &AddrInner::<T>::send_fut,
294        };
295
296        // Tell the actor its own address
297        send!(addr.started(addr.clone()));
298
299        Ok(addr)
300    }
301    #[doc(hidden)]
302    pub fn upcast<U: ?Sized + Send + 'static, F: Fn(&mut T) -> &mut U + Copy + Send + 'static>(
303        self,
304        _f: F,
305    ) -> Addr<U> {
306        Addr {
307            inner: self.inner,
308            send_mut: &AddrInner::<T>::send_mut_upcasted::<U, F>,
309            send_fut: self.send_fut,
310        }
311    }
312}
313impl<T: ?Sized> Addr<T> {
314    /// Create an address which does not refer to any actor.
315    pub fn detached() -> Self {
316        Self {
317            inner: None,
318            send_mut: &send_unreachable,
319            send_fut: &send_unreachable,
320        }
321    }
322    fn ptr(&self) -> *const () {
323        if let Some(inner) = &self.inner {
324            Arc::as_ptr(inner) as *const ()
325        } else {
326            ptr::null()
327        }
328    }
329}
330impl<T: ?Sized + Send + 'static> Addr<T> {
331    /// Downgrade to a weak reference, which does not try to keep the actor alive.
332    pub fn downgrade(&self) -> WeakAddr<T> {
333        WeakAddr {
334            inner: self.inner.as_ref().map(Arc::downgrade),
335            send_mut: self.send_mut,
336            send_fut: self.send_fut,
337        }
338    }
339    /// Attempt to downcast the address of a "trait-object actor" to a concrete type.
340    ///
341    /// This function may succeed even when the cast would normally be
342    /// unsuccessful if the address has become detached.
343    pub fn downcast<U: Send + 'static>(self) -> Result<Addr<U>, Addr<T>> {
344        if let Some(inner) = &self.inner {
345            if inner.is::<AddrInner<U>>() {
346                Ok(Addr {
347                    inner: self.inner,
348                    send_mut: &AddrInner::<U>::send_mut,
349                    send_fut: self.send_fut,
350                })
351            } else {
352                Err(self)
353            }
354        } else {
355            Ok(Addr::detached())
356        }
357    }
358}
359
360/// A weak reference to a spawned actor.
361///
362/// Methods can be called on the actor after it has been spawned using the
363/// `send!(...)` and `call!(...)` macros.
364///
365/// Can be converted to the address of a trait-object using the `upcast!(...)`
366/// macro.
367pub struct WeakAddr<T: ?Sized + 'static> {
368    inner: Option<Weak<dyn Any + Send + Sync>>,
369    send_mut: &'static (dyn Fn(&Arc<dyn Any + Send + Sync>, MutItem<T>) + Send + Sync),
370    send_fut: &'static (dyn Fn(&Arc<dyn Any + Send + Sync>, FutItem) + Send + Sync),
371}
372
373impl<T: ?Sized> Clone for WeakAddr<T> {
374    fn clone(&self) -> Self {
375        Self {
376            inner: self.inner.clone(),
377            send_mut: self.send_mut,
378            send_fut: self.send_fut,
379        }
380    }
381}
382
383impl<T: ?Sized> Debug for WeakAddr<T> {
384    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
385        write!(f, "{} {{..}}", std::any::type_name::<Self>())
386    }
387}
388
389impl<T: ?Sized> Default for WeakAddr<T> {
390    fn default() -> Self {
391        Self::detached()
392    }
393}
394
395impl<T: ?Sized, U: ?Sized> PartialEq<Addr<U>> for WeakAddr<T> {
396    fn eq(&self, rhs: &Addr<U>) -> bool {
397        self.ptr() == rhs.ptr()
398    }
399}
400
401impl<T: ?Sized, U: ?Sized> PartialEq<WeakAddr<U>> for WeakAddr<T> {
402    fn eq(&self, rhs: &WeakAddr<U>) -> bool {
403        self.ptr() == rhs.ptr()
404    }
405}
406
407impl<T: ?Sized> Eq for WeakAddr<T> {}
408impl<T: ?Sized> Hash for WeakAddr<T> {
409    fn hash<H: Hasher>(&self, state: &mut H) {
410        self.ptr().hash(state)
411    }
412}
413
414impl<T: ?Sized, U: ?Sized> PartialOrd<Addr<U>> for WeakAddr<T> {
415    fn partial_cmp(&self, rhs: &Addr<U>) -> Option<Ordering> {
416        self.ptr().partial_cmp(&rhs.ptr())
417    }
418}
419
420impl<T: ?Sized, U: ?Sized> PartialOrd<WeakAddr<U>> for WeakAddr<T> {
421    fn partial_cmp(&self, rhs: &WeakAddr<U>) -> Option<Ordering> {
422        self.ptr().partial_cmp(&rhs.ptr())
423    }
424}
425impl<T: ?Sized> Ord for WeakAddr<T> {
426    fn cmp(&self, rhs: &WeakAddr<T>) -> Ordering {
427        self.ptr().cmp(&rhs.ptr())
428    }
429}
430
431fn upgrade_weak<T: ?Sized>(maybe_weak: &Option<Weak<T>>) -> Option<Arc<T>> {
432    maybe_weak.as_ref().and_then(Weak::upgrade)
433}
434
435impl<T: Actor + ?Sized> AddrLike for WeakAddr<T> {
436    type Actor = T;
437
438    #[doc(hidden)]
439    fn send_mut(&self, item: MutItem<Self::Actor>) {
440        if let Some(inner) = upgrade_weak(&self.inner) {
441            (self.send_mut)(&inner, item);
442        }
443    }
444
445    fn send_fut(&self, fut: impl Future<Output = ()> + Send + 'static) {
446        if let Some(inner) = upgrade_weak(&self.inner) {
447            (self.send_fut)(&inner, FutureExt::boxed(fut));
448        }
449    }
450}
451
452impl<T: ?Sized> WeakAddr<T> {
453    /// Create an address which does not refer to any actor.
454    pub fn detached() -> Self {
455        Self {
456            inner: None,
457            send_mut: &send_unreachable,
458            send_fut: &send_unreachable,
459        }
460    }
461    // TODO: Replace this with an implementation using `Weak::as_ptr` once support for
462    // unsized values hits stable.
463    fn ptr(&self) -> *const () {
464        if let Some(inner) = upgrade_weak(&self.inner) {
465            Arc::as_ptr(&inner) as *const ()
466        } else {
467            ptr::null()
468        }
469    }
470}
471impl<T: Send + 'static> WeakAddr<T> {
472    #[doc(hidden)]
473    pub fn upcast<U: ?Sized + Send + 'static, F: Fn(&mut T) -> &mut U + Copy + Send + 'static>(
474        self,
475        _f: F,
476    ) -> WeakAddr<U> {
477        WeakAddr {
478            inner: self.inner,
479            send_mut: &AddrInner::<T>::send_mut_upcasted::<U, F>,
480            send_fut: self.send_fut,
481        }
482    }
483}
484impl<T: ?Sized + Send + 'static> WeakAddr<T> {
485    /// Upgrade this to a strong reference. If the actor has already stopped the returned
486    /// address will be detached.
487    pub fn upgrade(&self) -> Addr<T> {
488        if let Some(inner) = upgrade_weak(&self.inner) {
489            Addr {
490                inner: Some(inner),
491                send_mut: self.send_mut,
492                send_fut: self.send_fut,
493            }
494        } else {
495            Addr::detached()
496        }
497    }
498}