Skip to main content

actify/handles/
handle.rs

1use std::any::Any;
2use std::any::type_name;
3use std::fmt::{self, Debug};
4use tokio::sync::{broadcast, mpsc, oneshot};
5
6use super::read_handle::ReadHandle;
7use crate::actor::{Actor, ActorMethod, BroadcastFn, Job, serve};
8use crate::throttle::Throttle;
9use crate::{Cache, Frequency, Throttled};
10
11const CHANNEL_SIZE: usize = 100;
12const DOWNCAST_FAIL: &str =
13    "Actify Macro error: failed to downcast arguments to their concrete type";
14
15/// Defines how to convert an actor's value to its broadcast type.
16///
17/// A blanket implementation is provided for [`Clone`] types, broadcasting
18/// themselves. Implement this trait to broadcast a different type `V` from
19/// your actor type `T`, enabling:
20///
21/// - Non-Clone types to participate in broadcasting
22/// - Clone types to broadcast a lightweight summary instead of the full value
23///
24/// # Examples
25///
26/// ```
27/// use actify::BroadcastAs;
28///
29/// struct HeavyState {
30///     data: Vec<u8>,
31///     summary: String,
32/// }
33///
34/// #[derive(Clone, Debug)]
35/// struct Summary(String);
36///
37/// impl BroadcastAs<Summary> for HeavyState {
38///     fn to_broadcast(&self) -> Summary {
39///         Summary(self.summary.clone())
40///     }
41/// }
42/// ```
43pub trait BroadcastAs<V> {
44    fn to_broadcast(&self) -> V;
45}
46
47impl<T: Clone> BroadcastAs<T> for T {
48    fn to_broadcast(&self) -> T {
49        self.clone()
50    }
51}
52
53/// Creates the broadcast function that the [`Actor`] calls after each `&mut self` method.
54/// Converts the actor value to `V` via [`BroadcastAs`] and sends it to all subscribers.
55fn make_broadcast_fn<T, V>(sender: broadcast::Sender<V>) -> BroadcastFn<T>
56where
57    T: BroadcastAs<V>,
58    V: Clone + Send + Sync + 'static,
59{
60    Box::new(move |inner: &T, method: &str| {
61        if sender.receiver_count() > 0 {
62            if sender.send(inner.to_broadcast()).is_err() {
63                log::trace!("Broadcast failed because there are no active on {method:?}");
64            } else {
65                log::trace!("Broadcasted new value on {method:?}");
66            }
67        } else {
68            log::trace!("Skipping broadcast because there are no active receivers on {method:?}");
69        }
70    })
71}
72
73/// A clonable handle that can be used to remotely execute a closure on the corresponding [`Actor`].
74///
75/// Handles are the primary way to interact with actors. Clone them freely to share
76/// access across tasks. For read-only access, see [`ReadHandle`]. For local
77/// synchronization, see [`Cache`]. For rate-limited updates, see [`Throttle`].
78///
79/// The second type parameter `V` is the broadcast type. By default `V = T`,
80/// meaning the actor broadcasts clones of itself. To broadcast a different
81/// type, implement [`BroadcastAs<V>`] and specify `V` explicitly
82/// (e.g. `Handle::<MyType, Summary>::new(val)`).
83pub struct Handle<T, V = T> {
84    pub(super) tx: mpsc::Sender<Job<T>>,
85    pub(super) broadcast_sender: broadcast::Sender<V>,
86}
87
88impl<T, V> Clone for Handle<T, V> {
89    fn clone(&self) -> Self {
90        Handle {
91            tx: self.tx.clone(),
92            broadcast_sender: self.broadcast_sender.clone(),
93        }
94    }
95}
96
97impl<T, V> Debug for Handle<T, V> {
98    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
99        write!(f, "Handle<{}>", type_name::<T>())
100    }
101}
102
103impl<T: Default + Clone + Send + Sync + 'static> Default for Handle<T> {
104    fn default() -> Self {
105        Handle::new(T::default())
106    }
107}
108
109impl<T, V> Handle<T, V>
110where
111    T: BroadcastAs<V> + Send + Sync + 'static,
112    V: Clone + Send + Sync + 'static,
113{
114    /// Creates a new [`Handle`] and spawns the corresponding [`Actor`].
115    ///
116    /// For `Clone` types, `V` defaults to `T` — the actor broadcasts clones of
117    /// itself and you can simply write `Handle::new(val)`.
118    ///
119    /// For non-Clone types (or to broadcast a lightweight summary), implement
120    /// [`BroadcastAs<V>`] and specify `V` explicitly:
121    ///
122    /// ```
123    /// # use actify::{Handle, BroadcastAs};
124    /// # #[tokio::main]
125    /// # async fn main() {
126    /// #[derive(Clone, Debug, PartialEq)]
127    /// struct Size(usize);
128    ///
129    /// impl BroadcastAs<Size> for Vec<u8> {
130    ///     fn to_broadcast(&self) -> Size { Size(self.len()) }
131    /// }
132    ///
133    /// let handle: Handle<Vec<u8>, Size> = Handle::new(vec![1, 2, 3]);
134    /// let mut rx = handle.subscribe();
135    /// # }
136    /// ```
137    pub fn new(val: T) -> Handle<T, V> {
138        let (tx, rx) = mpsc::channel(CHANNEL_SIZE);
139        let (broadcast_tx, _) = broadcast::channel::<V>(CHANNEL_SIZE);
140        tokio::spawn(serve(
141            rx,
142            Actor::new(make_broadcast_fn(broadcast_tx.clone()), val),
143        ));
144        Handle {
145            tx,
146            broadcast_sender: broadcast_tx,
147        }
148    }
149
150    /// Creates a new [`Handle`] and initializes a corresponding [`Throttle`].
151    /// The throttle fires given a specified [`Frequency`].
152    /// See [`Handle::spawn_throttle`] for an example.
153    pub fn new_throttled<C, F>(val: T, client: C, call: fn(&C, F), freq: Frequency) -> Handle<T, V>
154    where
155        C: Send + Sync + 'static,
156        V: Throttled<F>,
157        F: Clone + Send + Sync + 'static,
158    {
159        let init = val.to_broadcast();
160        let handle = Self::new(val);
161        let receiver = handle.subscribe();
162        Throttle::spawn_from_receiver(client, call, freq, receiver, Some(init));
163        handle
164    }
165}
166
167impl<T, V> Handle<T, V> {
168    /// Returns a [`tokio::sync::broadcast::Receiver`] that receives all broadcasted values.
169    /// Note that the inner value might not actually have changed.
170    /// It broadcasts on any method that has a mutable reference to the actor.
171    ///
172    /// # Examples
173    ///
174    /// ```
175    /// # use actify::Handle;
176    /// # #[tokio::main]
177    /// # async fn main() {
178    /// let handle = Handle::new(None);
179    /// let mut rx = handle.subscribe();
180    /// handle.set(Some("testing!")).await;
181    /// assert_eq!(rx.recv().await.unwrap(), Some("testing!"));
182    /// # }
183    /// ```
184    pub fn subscribe(&self) -> broadcast::Receiver<V> {
185        self.broadcast_sender.subscribe()
186    }
187
188    /// Returns a [`ReadHandle`] that provides read-only access to this actor.
189    pub fn get_read_handle(&self) -> ReadHandle<T, V> {
190        ReadHandle::new(self.clone())
191    }
192}
193
194impl<T: Send + Sync + 'static, V> Handle<T, V> {
195    /// Returns the current capacity of the channel.
196    pub fn capacity(&self) -> usize {
197        self.tx.capacity()
198    }
199
200    #[doc(hidden)]
201    pub async fn send_job(
202        &self,
203        call: ActorMethod<T>,
204        args: Box<dyn Any + Send>,
205    ) -> Box<dyn Any + Send> {
206        let (respond_to, get_result) = oneshot::channel();
207        let job = Job {
208            call,
209            args,
210            respond_to,
211        };
212        self.tx
213            .send(job)
214            .await
215            .expect("A panic occurred in the Actor");
216        get_result.await.expect("A panic occurred in the Actor")
217    }
218
219    /// Sends a closure to the actor, handling all boxing/unboxing internally.
220    async fn run<F, A, R>(&self, args: A, f: F) -> R
221    where
222        F: FnOnce(&mut Actor<T>, A) -> R + Send + 'static,
223        A: Send + 'static,
224        R: Send + 'static,
225    {
226        // ActorMethod requires FnMut because Box<dyn FnOnce> can't be called.
227        // We wrap f in Option so we can .take() it out of the FnMut closure.
228        // The unwrap is safe: send_job sends exactly one job, and serve()
229        // calls it exactly once, but the compiler just can't prove that so we need a work-around.
230        let mut f = Some(f);
231        let res = self
232            .send_job(
233                Box::new(move |s: &mut Actor<T>, boxed_args: Box<dyn Any + Send>| {
234                    let f = f.take().unwrap();
235                    Box::pin(async move {
236                        let args = *boxed_args.downcast::<A>().expect(DOWNCAST_FAIL);
237                        Box::new(f(s, args)) as Box<dyn Any + Send>
238                    })
239                }),
240                Box::new(args),
241            )
242            .await;
243        *res.downcast::<R>().expect(DOWNCAST_FAIL)
244    }
245
246    /// Overwrites the inner value of the actor with the new value.
247    /// Broadcasts the new value to all subscribers.
248    ///
249    /// # Examples
250    ///
251    /// ```
252    /// # use actify::Handle;
253    /// # #[tokio::main]
254    /// # async fn main() {
255    /// let handle = Handle::new(None);
256    /// handle.set(Some(1)).await;
257    /// assert_eq!(handle.get().await, Some(1));
258    /// # }
259    /// ```
260    pub async fn set(&self, val: T) {
261        self.run(val, |s, val| {
262            s.inner = val;
263            s.broadcast(&format!("{}::set", type_name::<T>()));
264        })
265        .await
266    }
267
268    /// Overwrites the inner value, but only broadcasts if it actually changed.
269    ///
270    /// # Examples
271    ///
272    /// ```
273    /// # use actify::Handle;
274    /// # #[tokio::main]
275    /// # async fn main() {
276    /// let handle = Handle::new(1);
277    /// let mut rx = handle.subscribe();
278    /// handle.set_if_changed(1).await; // Same value, no broadcast
279    /// handle.set_if_changed(2).await; // Different value, broadcasts
280    /// assert_eq!(rx.recv().await.unwrap(), 2);
281    /// # }
282    /// ```
283    pub async fn set_if_changed(&self, val: T)
284    where
285        T: PartialEq,
286    {
287        self.run(val, |s, val| {
288            if s.inner != val {
289                s.inner = val;
290                s.broadcast(&format!("{}::set", type_name::<T>()));
291            }
292        })
293        .await
294    }
295
296    /// Runs a read-only closure on the actor's value and returns the result.
297    /// Does not broadcast.
298    ///
299    /// This is useful for reading parts of the actor state without cloning
300    /// the entire value, and works with non-Clone types.
301    ///
302    /// # Examples
303    ///
304    /// ```
305    /// # use actify::Handle;
306    /// # #[tokio::main]
307    /// # async fn main() {
308    /// let handle = Handle::new(vec![1, 2, 3]);
309    ///
310    /// // Extract just what you need, without cloning the whole Vec
311    /// let len = handle.with(|v| v.len()).await;
312    /// assert_eq!(len, 3);
313    ///
314    /// let first = handle.with(|v| v.first().copied()).await;
315    /// assert_eq!(first, Some(1));
316    /// # }
317    /// ```
318    pub async fn with<R, F>(&self, f: F) -> R
319    where
320        F: FnOnce(&T) -> R + Send + 'static,
321        R: Send + 'static,
322    {
323        self.run(f, |s, f| f(&s.inner)).await
324    }
325
326    /// Runs a closure on the actor's value mutably and returns the result.
327    ///
328    /// This is useful for atomic read-modify-return operations without
329    /// defining a dedicated `#[actify]` method.
330    ///
331    /// **Note:** This always broadcasts after the closure returns, even if
332    /// the closure did not actually mutate anything. Use [`Handle::with`]
333    /// for read-only access that does not broadcast.
334    ///
335    /// # Examples
336    ///
337    /// ```
338    /// # use actify::Handle;
339    /// # #[tokio::main]
340    /// # async fn main() {
341    /// let handle = Handle::new(vec![1, 2, 3]);
342    /// let mut rx = handle.subscribe();
343    ///
344    /// // Mutate and return a result in one atomic operation
345    /// let popped = handle.with_mut(|v| v.pop()).await;
346    /// assert_eq!(popped, Some(3));
347    /// assert_eq!(handle.get().await, vec![1, 2]);
348    ///
349    /// // The mutation triggered a broadcast
350    /// assert!(rx.try_recv().is_ok());
351    /// # }
352    /// ```
353    pub async fn with_mut<R, F>(&self, f: F) -> R
354    where
355        F: FnOnce(&mut T) -> R + Send + 'static,
356        R: Send + 'static,
357    {
358        self.run(f, |s, f| {
359            let result = f(&mut s.inner);
360            s.broadcast(&format!("{}::with_mut", type_name::<T>()));
361            result
362        })
363        .await
364    }
365}
366
367impl<T: Clone + Send + Sync + 'static, V> Handle<T, V> {
368    /// Receives a clone of the current value of the actor.
369    /// Does not broadcast.
370    ///
371    /// # Examples
372    ///
373    /// ```
374    /// # use actify::Handle;
375    /// # #[tokio::main]
376    /// # async fn main() {
377    /// let handle = Handle::new(1);
378    /// let result = handle.get().await;
379    /// assert_eq!(result, 1);
380    /// # }
381    /// ```
382    pub async fn get(&self) -> T {
383        self.run((), |s, _| s.inner.clone()).await
384    }
385}
386
387impl<T, V: Clone + Send + Sync + 'static> Handle<T, V> {
388    /// Creates a [`Cache`] initialized with the given value that locally synchronizes
389    /// with broadcasted updates from the actor.
390    /// As it is not initialized with the current value, any updates before construction are missed.
391    ///
392    /// See also [`Handle::create_cache`] for a cache initialized with the current actor value,
393    /// or [`Handle::create_cache_from_default`] to start from `V::default()`.
394    ///
395    /// # Examples
396    ///
397    /// ```
398    /// # use actify::Handle;
399    /// # #[tokio::main]
400    /// # async fn main() {
401    /// let handle = Handle::new(10);
402    /// let mut cache = handle.create_cache_from(42);
403    /// assert_eq!(cache.get_current(), &42);
404    ///
405    /// handle.set(99).await;
406    /// assert_eq!(cache.get_newest(), &99);
407    /// # }
408    /// ```
409    pub fn create_cache_from(&self, initial_value: V) -> Cache<V> {
410        Cache::new(self.subscribe(), initial_value)
411    }
412}
413
414impl<T, V: Default + Clone + Send + Sync + 'static> Handle<T, V> {
415    /// Creates a [`Cache`] initialized with `V::default()` that locally synchronizes
416    /// with broadcasted updates from the actor.
417    /// As it is not initialized with the current value, any updates before construction are missed.
418    ///
419    /// See also [`Handle::create_cache`] for a cache initialized with the current actor value,
420    /// or [`Handle::create_cache_from`] to start from a custom value.
421    pub fn create_cache_from_default(&self) -> Cache<V> {
422        self.create_cache_from(V::default())
423    }
424}
425
426impl<T, V> Handle<T, V>
427where
428    T: Clone + BroadcastAs<V> + Send + Sync + 'static,
429    V: Clone + Send + Sync + 'static,
430{
431    /// Creates an initialized [`Cache`] that locally synchronizes with the remote actor.
432    /// As it is initialized with the current value, any updates before construction are included.
433    ///
434    /// See also [`Handle::create_cache_from_default`] for a cache that starts from `V::default()`.
435    pub async fn create_cache(&self) -> Cache<V> {
436        let init = self.get().await;
437        Cache::new(self.subscribe(), init.to_broadcast())
438    }
439
440    /// Spawns a [`Throttle`] that fires given a specified [`Frequency`].
441    ///
442    /// The broadcast type must implement [`Throttled<F>`](crate::Throttled) to
443    /// convert the value into the callback argument.
444    ///
445    /// # Examples
446    ///
447    /// ```
448    /// # use actify::{Handle, Frequency};
449    /// # use std::sync::{Arc, Mutex};
450    /// # #[tokio::main]
451    /// # async fn main() {
452    /// struct Logger(Arc<Mutex<Vec<i32>>>);
453    /// impl Logger {
454    ///     fn log(&self, val: i32) { self.0.lock().unwrap().push(val); }
455    /// }
456    ///
457    /// let handle = Handle::new(1);
458    /// let values = Arc::new(Mutex::new(Vec::new()));
459    /// handle.spawn_throttle(Logger(values.clone()), Logger::log, Frequency::OnEvent).await;
460    ///
461    /// handle.set(2).await;
462    /// tokio::time::sleep(std::time::Duration::from_millis(10)).await;
463    /// // Fires once with the current value on creation, then on each broadcast
464    /// assert_eq!(*values.lock().unwrap(), vec![1, 2]);
465    /// # }
466    /// ```
467    pub async fn spawn_throttle<C, F>(&self, client: C, call: fn(&C, F), freq: Frequency)
468    where
469        C: Send + Sync + 'static,
470        V: Throttled<F>,
471        F: Clone + Send + Sync + 'static,
472    {
473        let current = self.get().await;
474        let receiver = self.subscribe();
475        Throttle::spawn_from_receiver(client, call, freq, receiver, Some(current.to_broadcast()));
476    }
477}
478
479#[cfg(test)]
480mod tests {
481    use super::*;
482    use crate as actify;
483
484    #[tokio::test]
485    #[should_panic]
486    async fn test_handle_panic() {
487        let handle = Handle::new(PanicStruct {});
488        handle.panic().await;
489    }
490
491    #[derive(Debug, Clone)]
492    struct PanicStruct {}
493
494    #[actify_macros::actify]
495    impl PanicStruct {
496        fn panic(&self) {
497            panic!()
498        }
499    }
500
501    #[derive(Debug)]
502    struct NonCloneActor {
503        value: i32,
504    }
505
506    #[actify_macros::actify]
507    impl NonCloneActor {
508        fn get_value(&self) -> i32 {
509            self.value
510        }
511
512        fn set_value(&mut self, val: i32) {
513            self.value = val;
514        }
515    }
516
517    impl BroadcastAs<i32> for NonCloneActor {
518        fn to_broadcast(&self) -> i32 {
519            self.value
520        }
521    }
522
523    #[tokio::test]
524    async fn test_non_clone_actor() {
525        let handle: Handle<NonCloneActor, i32> = Handle::new(NonCloneActor { value: 42 });
526        assert_eq!(handle.get_value().await, 42);
527
528        handle.set_value(100).await;
529        assert_eq!(handle.get_value().await, 100);
530
531        let handle2 = handle.clone();
532        assert_eq!(handle2.get_value().await, 100);
533    }
534
535    #[tokio::test]
536    async fn test_non_clone_actor_with_broadcast() {
537        let handle: Handle<NonCloneActor, i32> = Handle::new(NonCloneActor { value: 42 });
538        let mut rx = handle.subscribe();
539
540        handle.set_value(100).await;
541        assert_eq!(rx.recv().await.unwrap(), 100);
542
543        handle.set(NonCloneActor { value: 45 }).await;
544        assert_eq!(rx.recv().await.unwrap(), 45);
545    }
546
547    #[derive(Clone, Debug, PartialEq)]
548    struct BigState {
549        data: Vec<u8>,
550        count: usize,
551    }
552
553    impl BroadcastAs<usize> for BigState {
554        fn to_broadcast(&self) -> usize {
555            self.count
556        }
557    }
558
559    #[tokio::test]
560    async fn test_with_does_not_broadcast() {
561        let handle = Handle::new(vec![1, 2, 3]);
562        let mut rx = handle.subscribe();
563
564        let _len = handle.with(|v| v.len()).await;
565        assert!(rx.try_recv().is_err());
566    }
567
568    #[tokio::test]
569    async fn test_with_mut_broadcasts_even_without_mutation() {
570        let handle = Handle::new(vec![1, 2, 3]);
571        let mut rx = handle.subscribe();
572
573        // Read-only operation through with_mut still broadcasts
574        let _len = handle.with_mut(|v| v.len()).await;
575        assert!(rx.try_recv().is_ok());
576    }
577
578    #[tokio::test]
579    async fn test_clone_actor_with_custom_broadcast() {
580        let handle: Handle<BigState, usize> = Handle::new(BigState {
581            data: vec![1, 2, 3],
582            count: 3,
583        });
584
585        let mut rx = handle.subscribe();
586
587        let val = handle.get().await;
588        assert_eq!(val.count, 3);
589
590        let new_big_state = BigState {
591            data: vec![1, 2, 3, 4],
592            count: 4,
593        };
594        handle.set(new_big_state.clone()).await;
595
596        let broadcast_val: usize = rx.recv().await.unwrap();
597        assert_eq!(broadcast_val, 4);
598
599        let big_state = handle.get().await;
600        assert_eq!(big_state, new_big_state);
601    }
602}