actify/handles/handle.rs
1use std::any::Any;
2use std::any::type_name;
3use std::fmt::{self, Debug};
4use tokio::sync::{broadcast, mpsc, oneshot};
5
6use super::read_handle::ReadHandle;
7use crate::actor::{Actor, ActorMethod, BroadcastFn, Job, serve};
8use crate::throttle::Throttle;
9use crate::{Cache, Frequency, Throttled};
10
/// Buffer size used for both the actor's job channel and its broadcast channel.
const CHANNEL_SIZE: usize = 100;
/// Panic message used when a type-erased argument or result cannot be downcast
/// back to its concrete type.
const DOWNCAST_FAIL: &str =
    "Actify Macro error: failed to downcast arguments to their concrete type";
14
/// Describes how an actor value of type `Self` is turned into the value `V`
/// that is sent to subscribers.
///
/// Every [`Clone`] type gets a blanket implementation that broadcasts a clone
/// of itself. Implement this trait manually when the broadcast type `V` should
/// differ from the actor type `T`, which enables:
///
/// - Non-Clone types to participate in broadcasting
/// - Clone types to broadcast a lightweight summary instead of the full value
///
/// # Examples
///
/// ```
/// use actify::BroadcastAs;
///
/// struct HeavyState {
///     data: Vec<u8>,
///     summary: String,
/// }
///
/// #[derive(Clone, Debug)]
/// struct Summary(String);
///
/// impl BroadcastAs<Summary> for HeavyState {
///     fn to_broadcast(&self) -> Summary {
///         Summary(self.summary.clone())
///     }
/// }
/// ```
pub trait BroadcastAs<V> {
    /// Produces the value to broadcast from a shared reference to the actor value.
    fn to_broadcast(&self) -> V;
}

// Blanket implementation: a `Clone` type broadcasts a clone of itself.
impl<T> BroadcastAs<T> for T
where
    T: Clone,
{
    fn to_broadcast(&self) -> T {
        Clone::clone(self)
    }
}
52
53/// Creates the broadcast function that the [`Actor`] calls after each `&mut self` method.
54/// Converts the actor value to `V` via [`BroadcastAs`] and sends it to all subscribers.
55fn make_broadcast_fn<T, V>(sender: broadcast::Sender<V>) -> BroadcastFn<T>
56where
57 T: BroadcastAs<V>,
58 V: Clone + Send + Sync + 'static,
59{
60 Box::new(move |inner: &T, method: &str| {
61 if sender.receiver_count() > 0 {
62 if sender.send(inner.to_broadcast()).is_err() {
63 log::trace!("Broadcast failed because there are no active on {method:?}");
64 } else {
65 log::trace!("Broadcasted new value on {method:?}");
66 }
67 } else {
68 log::trace!("Skipping broadcast because there are no active receivers on {method:?}");
69 }
70 })
71}
72
/// A clonable handle that can be used to remotely execute a closure on the corresponding [`Actor`].
///
/// Handles are the primary way to interact with actors. Clone them freely to share
/// access across tasks. For read-only access, see [`ReadHandle`]. For local
/// synchronization, see [`Cache`]. For rate-limited updates, see [`Throttle`].
///
/// The second type parameter `V` is the broadcast type. By default `V = T`,
/// meaning the actor broadcasts clones of itself. To broadcast a different
/// type, implement [`BroadcastAs<V>`] and specify `V` explicitly
/// (e.g. `Handle::<MyType, Summary>::new(val)`).
pub struct Handle<T, V = T> {
    /// Sending half of the job channel; every job is executed by the actor task.
    pub(super) tx: mpsc::Sender<Job<T>>,
    /// Sender of the broadcast channel; used to hand out new subscriptions.
    pub(super) broadcast_sender: broadcast::Sender<V>,
}
87
88impl<T, V> Clone for Handle<T, V> {
89 fn clone(&self) -> Self {
90 Handle {
91 tx: self.tx.clone(),
92 broadcast_sender: self.broadcast_sender.clone(),
93 }
94 }
95}
96
97impl<T, V> Debug for Handle<T, V> {
98 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
99 write!(f, "Handle<{}>", type_name::<T>())
100 }
101}
102
103impl<T: Default + Clone + Send + Sync + 'static> Default for Handle<T> {
104 fn default() -> Self {
105 Handle::new(T::default())
106 }
107}
108
109impl<T, V> Handle<T, V>
110where
111 T: BroadcastAs<V> + Send + Sync + 'static,
112 V: Clone + Send + Sync + 'static,
113{
114 /// Creates a new [`Handle`] and spawns the corresponding [`Actor`].
115 ///
116 /// For `Clone` types, `V` defaults to `T` — the actor broadcasts clones of
117 /// itself and you can simply write `Handle::new(val)`.
118 ///
119 /// For non-Clone types (or to broadcast a lightweight summary), implement
120 /// [`BroadcastAs<V>`] and specify `V` explicitly:
121 ///
122 /// ```
123 /// # use actify::{Handle, BroadcastAs};
124 /// # #[tokio::main]
125 /// # async fn main() {
126 /// #[derive(Clone, Debug, PartialEq)]
127 /// struct Size(usize);
128 ///
129 /// impl BroadcastAs<Size> for Vec<u8> {
130 /// fn to_broadcast(&self) -> Size { Size(self.len()) }
131 /// }
132 ///
133 /// let handle: Handle<Vec<u8>, Size> = Handle::new(vec![1, 2, 3]);
134 /// let mut rx = handle.subscribe();
135 /// # }
136 /// ```
137 pub fn new(val: T) -> Handle<T, V> {
138 let (tx, rx) = mpsc::channel(CHANNEL_SIZE);
139 let (broadcast_tx, _) = broadcast::channel::<V>(CHANNEL_SIZE);
140 tokio::spawn(serve(
141 rx,
142 Actor::new(make_broadcast_fn(broadcast_tx.clone()), val),
143 ));
144 Handle {
145 tx,
146 broadcast_sender: broadcast_tx,
147 }
148 }
149
150 /// Creates a new [`Handle`] and initializes a corresponding [`Throttle`].
151 /// The throttle fires given a specified [`Frequency`].
152 /// See [`Handle::spawn_throttle`] for an example.
153 pub fn new_throttled<C, F>(val: T, client: C, call: fn(&C, F), freq: Frequency) -> Handle<T, V>
154 where
155 C: Send + Sync + 'static,
156 V: Throttled<F>,
157 F: Clone + Send + Sync + 'static,
158 {
159 let init = val.to_broadcast();
160 let handle = Self::new(val);
161 let receiver = handle.subscribe();
162 Throttle::spawn_from_receiver(client, call, freq, receiver, Some(init));
163 handle
164 }
165}
166
167impl<T, V> Handle<T, V> {
168 /// Returns a [`tokio::sync::broadcast::Receiver`] that receives all broadcasted values.
169 /// Note that the inner value might not actually have changed.
170 /// It broadcasts on any method that has a mutable reference to the actor.
171 ///
172 /// # Examples
173 ///
174 /// ```
175 /// # use actify::Handle;
176 /// # #[tokio::main]
177 /// # async fn main() {
178 /// let handle = Handle::new(None);
179 /// let mut rx = handle.subscribe();
180 /// handle.set(Some("testing!")).await;
181 /// assert_eq!(rx.recv().await.unwrap(), Some("testing!"));
182 /// # }
183 /// ```
184 pub fn subscribe(&self) -> broadcast::Receiver<V> {
185 self.broadcast_sender.subscribe()
186 }
187
188 /// Returns a [`ReadHandle`] that provides read-only access to this actor.
189 pub fn get_read_handle(&self) -> ReadHandle<T, V> {
190 ReadHandle::new(self.clone())
191 }
192}
193
194impl<T: Send + Sync + 'static, V> Handle<T, V> {
195 /// Returns the current capacity of the channel.
196 pub fn capacity(&self) -> usize {
197 self.tx.capacity()
198 }
199
200 #[doc(hidden)]
201 pub async fn send_job(
202 &self,
203 call: ActorMethod<T>,
204 args: Box<dyn Any + Send>,
205 ) -> Box<dyn Any + Send> {
206 let (respond_to, get_result) = oneshot::channel();
207 let job = Job {
208 call,
209 args,
210 respond_to,
211 };
212 self.tx
213 .send(job)
214 .await
215 .expect("A panic occurred in the Actor");
216 get_result.await.expect("A panic occurred in the Actor")
217 }
218
219 /// Sends a closure to the actor, handling all boxing/unboxing internally.
220 async fn run<F, A, R>(&self, args: A, f: F) -> R
221 where
222 F: FnOnce(&mut Actor<T>, A) -> R + Send + 'static,
223 A: Send + 'static,
224 R: Send + 'static,
225 {
226 // ActorMethod requires FnMut because Box<dyn FnOnce> can't be called.
227 // We wrap f in Option so we can .take() it out of the FnMut closure.
228 // The unwrap is safe: send_job sends exactly one job, and serve()
229 // calls it exactly once, but the compiler just can't prove that so we need a work-around.
230 let mut f = Some(f);
231 let res = self
232 .send_job(
233 Box::new(move |s: &mut Actor<T>, boxed_args: Box<dyn Any + Send>| {
234 let f = f.take().unwrap();
235 Box::pin(async move {
236 let args = *boxed_args.downcast::<A>().expect(DOWNCAST_FAIL);
237 Box::new(f(s, args)) as Box<dyn Any + Send>
238 })
239 }),
240 Box::new(args),
241 )
242 .await;
243 *res.downcast::<R>().expect(DOWNCAST_FAIL)
244 }
245
246 /// Overwrites the inner value of the actor with the new value.
247 /// Broadcasts the new value to all subscribers.
248 ///
249 /// # Examples
250 ///
251 /// ```
252 /// # use actify::Handle;
253 /// # #[tokio::main]
254 /// # async fn main() {
255 /// let handle = Handle::new(None);
256 /// handle.set(Some(1)).await;
257 /// assert_eq!(handle.get().await, Some(1));
258 /// # }
259 /// ```
260 pub async fn set(&self, val: T) {
261 self.run(val, |s, val| {
262 s.inner = val;
263 s.broadcast(&format!("{}::set", type_name::<T>()));
264 })
265 .await
266 }
267
268 /// Overwrites the inner value, but only broadcasts if it actually changed.
269 ///
270 /// # Examples
271 ///
272 /// ```
273 /// # use actify::Handle;
274 /// # #[tokio::main]
275 /// # async fn main() {
276 /// let handle = Handle::new(1);
277 /// let mut rx = handle.subscribe();
278 /// handle.set_if_changed(1).await; // Same value, no broadcast
279 /// handle.set_if_changed(2).await; // Different value, broadcasts
280 /// assert_eq!(rx.recv().await.unwrap(), 2);
281 /// # }
282 /// ```
283 pub async fn set_if_changed(&self, val: T)
284 where
285 T: PartialEq,
286 {
287 self.run(val, |s, val| {
288 if s.inner != val {
289 s.inner = val;
290 s.broadcast(&format!("{}::set", type_name::<T>()));
291 }
292 })
293 .await
294 }
295
296 /// Runs a read-only closure on the actor's value and returns the result.
297 /// Does not broadcast.
298 ///
299 /// This is useful for reading parts of the actor state without cloning
300 /// the entire value, and works with non-Clone types.
301 ///
302 /// # Examples
303 ///
304 /// ```
305 /// # use actify::Handle;
306 /// # #[tokio::main]
307 /// # async fn main() {
308 /// let handle = Handle::new(vec![1, 2, 3]);
309 ///
310 /// // Extract just what you need, without cloning the whole Vec
311 /// let len = handle.with(|v| v.len()).await;
312 /// assert_eq!(len, 3);
313 ///
314 /// let first = handle.with(|v| v.first().copied()).await;
315 /// assert_eq!(first, Some(1));
316 /// # }
317 /// ```
318 pub async fn with<R, F>(&self, f: F) -> R
319 where
320 F: FnOnce(&T) -> R + Send + 'static,
321 R: Send + 'static,
322 {
323 self.run(f, |s, f| f(&s.inner)).await
324 }
325
326 /// Runs a closure on the actor's value mutably and returns the result.
327 ///
328 /// This is useful for atomic read-modify-return operations without
329 /// defining a dedicated `#[actify]` method.
330 ///
331 /// **Note:** This always broadcasts after the closure returns, even if
332 /// the closure did not actually mutate anything. Use [`Handle::with`]
333 /// for read-only access that does not broadcast.
334 ///
335 /// # Examples
336 ///
337 /// ```
338 /// # use actify::Handle;
339 /// # #[tokio::main]
340 /// # async fn main() {
341 /// let handle = Handle::new(vec![1, 2, 3]);
342 /// let mut rx = handle.subscribe();
343 ///
344 /// // Mutate and return a result in one atomic operation
345 /// let popped = handle.with_mut(|v| v.pop()).await;
346 /// assert_eq!(popped, Some(3));
347 /// assert_eq!(handle.get().await, vec![1, 2]);
348 ///
349 /// // The mutation triggered a broadcast
350 /// assert!(rx.try_recv().is_ok());
351 /// # }
352 /// ```
353 pub async fn with_mut<R, F>(&self, f: F) -> R
354 where
355 F: FnOnce(&mut T) -> R + Send + 'static,
356 R: Send + 'static,
357 {
358 self.run(f, |s, f| {
359 let result = f(&mut s.inner);
360 s.broadcast(&format!("{}::with_mut", type_name::<T>()));
361 result
362 })
363 .await
364 }
365}
366
367impl<T: Clone + Send + Sync + 'static, V> Handle<T, V> {
368 /// Receives a clone of the current value of the actor.
369 /// Does not broadcast.
370 ///
371 /// # Examples
372 ///
373 /// ```
374 /// # use actify::Handle;
375 /// # #[tokio::main]
376 /// # async fn main() {
377 /// let handle = Handle::new(1);
378 /// let result = handle.get().await;
379 /// assert_eq!(result, 1);
380 /// # }
381 /// ```
382 pub async fn get(&self) -> T {
383 self.run((), |s, _| s.inner.clone()).await
384 }
385}
386
387impl<T, V: Clone + Send + Sync + 'static> Handle<T, V> {
388 /// Creates a [`Cache`] initialized with the given value that locally synchronizes
389 /// with broadcasted updates from the actor.
390 /// As it is not initialized with the current value, any updates before construction are missed.
391 ///
392 /// See also [`Handle::create_cache`] for a cache initialized with the current actor value,
393 /// or [`Handle::create_cache_from_default`] to start from `V::default()`.
394 ///
395 /// # Examples
396 ///
397 /// ```
398 /// # use actify::Handle;
399 /// # #[tokio::main]
400 /// # async fn main() {
401 /// let handle = Handle::new(10);
402 /// let mut cache = handle.create_cache_from(42);
403 /// assert_eq!(cache.get_current(), &42);
404 ///
405 /// handle.set(99).await;
406 /// assert_eq!(cache.get_newest(), &99);
407 /// # }
408 /// ```
409 pub fn create_cache_from(&self, initial_value: V) -> Cache<V> {
410 Cache::new(self.subscribe(), initial_value)
411 }
412}
413
414impl<T, V: Default + Clone + Send + Sync + 'static> Handle<T, V> {
415 /// Creates a [`Cache`] initialized with `V::default()` that locally synchronizes
416 /// with broadcasted updates from the actor.
417 /// As it is not initialized with the current value, any updates before construction are missed.
418 ///
419 /// See also [`Handle::create_cache`] for a cache initialized with the current actor value,
420 /// or [`Handle::create_cache_from`] to start from a custom value.
421 pub fn create_cache_from_default(&self) -> Cache<V> {
422 self.create_cache_from(V::default())
423 }
424}
425
426impl<T, V> Handle<T, V>
427where
428 T: Clone + BroadcastAs<V> + Send + Sync + 'static,
429 V: Clone + Send + Sync + 'static,
430{
431 /// Creates an initialized [`Cache`] that locally synchronizes with the remote actor.
432 /// As it is initialized with the current value, any updates before construction are included.
433 ///
434 /// See also [`Handle::create_cache_from_default`] for a cache that starts from `V::default()`.
435 pub async fn create_cache(&self) -> Cache<V> {
436 let init = self.get().await;
437 Cache::new(self.subscribe(), init.to_broadcast())
438 }
439
440 /// Spawns a [`Throttle`] that fires given a specified [`Frequency`].
441 ///
442 /// The broadcast type must implement [`Throttled<F>`](crate::Throttled) to
443 /// convert the value into the callback argument.
444 ///
445 /// # Examples
446 ///
447 /// ```
448 /// # use actify::{Handle, Frequency};
449 /// # use std::sync::{Arc, Mutex};
450 /// # #[tokio::main]
451 /// # async fn main() {
452 /// struct Logger(Arc<Mutex<Vec<i32>>>);
453 /// impl Logger {
454 /// fn log(&self, val: i32) { self.0.lock().unwrap().push(val); }
455 /// }
456 ///
457 /// let handle = Handle::new(1);
458 /// let values = Arc::new(Mutex::new(Vec::new()));
459 /// handle.spawn_throttle(Logger(values.clone()), Logger::log, Frequency::OnEvent).await;
460 ///
461 /// handle.set(2).await;
462 /// tokio::time::sleep(std::time::Duration::from_millis(10)).await;
463 /// // Fires once with the current value on creation, then on each broadcast
464 /// assert_eq!(*values.lock().unwrap(), vec![1, 2]);
465 /// # }
466 /// ```
467 pub async fn spawn_throttle<C, F>(&self, client: C, call: fn(&C, F), freq: Frequency)
468 where
469 C: Send + Sync + 'static,
470 V: Throttled<F>,
471 F: Clone + Send + Sync + 'static,
472 {
473 let current = self.get().await;
474 let receiver = self.subscribe();
475 Throttle::spawn_from_receiver(client, call, freq, receiver, Some(current.to_broadcast()));
476 }
477}
478
#[cfg(test)]
mod tests {
    use super::*;
    // The `actify` macro expands to paths rooted at `actify`, so alias the crate.
    use crate as actify;

    // ---------------------------------------------------------------------
    // Fixtures
    // ---------------------------------------------------------------------

    /// Actor whose only method panics.
    #[derive(Debug, Clone)]
    struct PanicStruct {}

    #[actify_macros::actify]
    impl PanicStruct {
        fn panic(&self) {
            panic!()
        }
    }

    /// Actor that is deliberately not `Clone`; it broadcasts its `value` as an `i32`.
    #[derive(Debug)]
    struct NonCloneActor {
        value: i32,
    }

    #[actify_macros::actify]
    impl NonCloneActor {
        fn get_value(&self) -> i32 {
            self.value
        }

        fn set_value(&mut self, val: i32) {
            self.value = val;
        }
    }

    impl BroadcastAs<i32> for NonCloneActor {
        fn to_broadcast(&self) -> i32 {
            self.value
        }
    }

    /// `Clone` actor that broadcasts only its `count` instead of the whole state.
    #[derive(Clone, Debug, PartialEq)]
    struct BigState {
        data: Vec<u8>,
        count: usize,
    }

    impl BroadcastAs<usize> for BigState {
        fn to_broadcast(&self) -> usize {
            self.count
        }
    }

    // ---------------------------------------------------------------------
    // Tests
    // ---------------------------------------------------------------------

    /// Awaiting a method that panics inside the actor must panic in the caller too.
    #[tokio::test]
    #[should_panic]
    async fn test_handle_panic() {
        let actor = Handle::new(PanicStruct {});
        actor.panic().await;
    }

    /// A non-Clone actor is usable through a handle and through clones of that handle.
    #[tokio::test]
    async fn test_non_clone_actor() {
        let actor: Handle<NonCloneActor, i32> = Handle::new(NonCloneActor { value: 42 });
        assert_eq!(actor.get_value().await, 42);

        actor.set_value(100).await;
        assert_eq!(actor.get_value().await, 100);

        let second = actor.clone();
        assert_eq!(second.get_value().await, 100);
    }

    /// Both a generated `&mut self` method and `set` broadcast the converted value.
    #[tokio::test]
    async fn test_non_clone_actor_with_broadcast() {
        let actor: Handle<NonCloneActor, i32> = Handle::new(NonCloneActor { value: 42 });
        let mut updates = actor.subscribe();

        actor.set_value(100).await;
        assert_eq!(updates.recv().await.unwrap(), 100);

        actor.set(NonCloneActor { value: 45 }).await;
        assert_eq!(updates.recv().await.unwrap(), 45);
    }

    /// `with` is read-only and therefore must not produce a broadcast.
    #[tokio::test]
    async fn test_with_does_not_broadcast() {
        let actor = Handle::new(vec![1, 2, 3]);
        let mut updates = actor.subscribe();

        let _len = actor.with(|items| items.len()).await;
        assert!(updates.try_recv().is_err());
    }

    /// `with_mut` broadcasts unconditionally, even when the closure only reads.
    #[tokio::test]
    async fn test_with_mut_broadcasts_even_without_mutation() {
        let actor = Handle::new(vec![1, 2, 3]);
        let mut updates = actor.subscribe();

        // Read-only operation through with_mut still broadcasts
        let _len = actor.with_mut(|items| items.len()).await;
        assert!(updates.try_recv().is_ok());
    }

    /// A `Clone` actor can still broadcast a custom, lighter type.
    #[tokio::test]
    async fn test_clone_actor_with_custom_broadcast() {
        let initial = BigState {
            data: vec![1, 2, 3],
            count: 3,
        };
        let actor: Handle<BigState, usize> = Handle::new(initial);

        let mut updates = actor.subscribe();

        let before = actor.get().await;
        assert_eq!(before.count, 3);

        let replacement = BigState {
            data: vec![1, 2, 3, 4],
            count: 4,
        };
        actor.set(replacement.clone()).await;

        // Subscribers see the summary (`count`), not the full state.
        let broadcast_val: usize = updates.recv().await.unwrap();
        assert_eq!(broadcast_val, 4);

        let after = actor.get().await;
        assert_eq!(after, replacement);
    }
}
602}