1#![warn(clippy::clone_on_ref_ptr, clippy::todo)]
2
3use flume::{Receiver, RecvError, Selector, Sender, select::SelectError};
63use log::*;
64use parking_lot::{Mutex, RwLock};
65use std::{
66 any::{TypeId, type_name},
67 collections::HashMap,
68 fmt,
69 ops::Deref,
70 sync::Arc,
71 thread,
72 time::{Duration, Instant},
73};
74
75#[cfg(feature = "async")]
76pub mod r#async;
77pub mod timed;
78
79#[cfg(feature = "async")]
81pub use r#async::AsyncActor;
82
/// Capacity of the bounded channel that every [`Addr`] creates for [`Control`] messages.
const CONTROL_CHANNEL_CAPACITY: usize = 5;
85
/// Errors reported by the actor system itself (as opposed to message delivery errors).
#[derive(Debug)]
pub enum ActorError {
    /// The system is not running, so the named actor can not be started.
    SystemStopped { actor_name: &'static str },
    /// A thread for the named actor could not be spawned.
    SpawnFailed { actor_name: &'static str },
    /// An actor thread panicked.
    ActorPanic,
}

impl fmt::Display for ActorError {
    /// Writes a human-readable, single-line description of the error.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // The `actor_name` fields are `&'static str` (`Copy`), so matching on `*self` is fine.
        match *self {
            Self::SystemStopped { actor_name } => {
                write!(f, "the system is not running; the actor {actor_name} can not be started")
            },
            Self::SpawnFailed { actor_name } => {
                write!(f, "failed to spawn a thread for the actor {actor_name}")
            },
            Self::ActorPanic => {
                f.write_str("panic inside an actor thread; see above for more verbose logs")
            },
        }
    }
}

impl std::error::Error for ActorError {}
113
/// Failure to deliver a message to an actor, as returned by [`Recipient::send`].
#[derive(Debug, Clone, Copy)]
pub struct SendError {
    /// Name of the actor the message was addressed to.
    pub recipient_name: &'static str,
    /// Priority of the channel the message was routed to.
    pub priority: Priority,
    /// Why the message could not be delivered.
    pub reason: SendErrorReason,
}
124
125impl fmt::Display for SendError {
126 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
127 let recipient_name = self.recipient_name;
128 let priority = self.priority;
129 match self.reason {
130 SendErrorReason::Full => {
131 write!(
132 f,
133 "the capacity of {recipient_name}'s {priority:?}-priority channel is full"
134 )
135 },
136 SendErrorReason::Disconnected => DisconnectedError { recipient_name, priority }.fmt(f),
137 }
138 }
139}
140
141impl std::error::Error for SendError {}
142
143#[derive(Debug)]
145pub struct PublishError(pub Vec<SendError>);
146
147impl fmt::Display for PublishError {
148 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
149 let error_strings: Vec<String> = self.0.iter().map(ToString::to_string).collect();
150 write!(
151 f,
152 "failed to deliver an event to {} subscribers: {}",
153 self.0.len(),
154 error_strings.join(", ")
155 )
156 }
157}
158
159impl std::error::Error for PublishError {}
160
161#[derive(Debug, Clone, Copy)]
163pub struct DisconnectedError {
164 pub recipient_name: &'static str,
166 pub priority: Priority,
168}
169
170impl fmt::Display for DisconnectedError {
171 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
172 write!(f, "the recipient of the message ({}) no longer exists", self.recipient_name)
173 }
174}
175
176impl std::error::Error for DisconnectedError {}
177
178#[derive(Debug, Clone, Copy)]
180pub enum SendErrorReason {
181 Full,
183 Disconnected,
185}
186
187impl<M> From<flume::TrySendError<M>> for SendErrorReason {
188 fn from(orig: flume::TrySendError<M>) -> Self {
189 match orig {
190 flume::TrySendError::Full(_) => Self::Full,
191 flume::TrySendError::Disconnected(_) => Self::Disconnected,
192 }
193 }
194}
195
/// Owner of an actor system.
///
/// Dereferences to its [`SystemHandle`], and shuts the system down when dropped.
#[derive(Default)]
pub struct System {
    /// Shared state of the system; clones of this handle are given to actor contexts.
    handle: SystemHandle,
}
205
/// Callback invoked by the system around shutdown; see [`SystemCallbacks`].
type SystemCallback = Box<dyn Fn() -> Result<(), ActorError> + Send + Sync>;
/// Type-erased subscriber callback: receives a published event as `&dyn Any` and forwards it to
/// one recipient.
type EventCallback = Box<dyn Fn(&dyn std::any::Any) -> Result<(), SendError> + Send + Sync>;
208
/// Optional hooks run by [`SystemHandle::shutdown`]. A failing callback is only logged.
#[derive(Default)]
pub struct SystemCallbacks {
    /// Called after the system entered the shutting-down state, before actors are asked to stop.
    pub preshutdown: Option<SystemCallback>,
    /// Called after actor threads were joined, before the system is marked as stopped.
    pub postshutdown: Option<SystemCallback>,
}
214
/// Lifecycle state of an actor system.
#[derive(Debug, Default, PartialEq)]
enum SystemState {
    /// The initial state; actors can be started.
    #[default]
    Running,

    /// `shutdown()` has begun but has not finished yet.
    ShuttingDown,

    /// `shutdown()` has finished.
    Stopped,
}

impl SystemState {
    /// Returns `true` only in the [`SystemState::Running`] state.
    fn is_running(&self) -> bool {
        matches!(self, SystemState::Running)
    }
}
240
/// Marker trait for types that can be published with [`SystemHandle::publish`] and subscribed to
/// by actors.
pub trait Event: Clone + std::any::Any + Send + Sync {}
244
245#[derive(Default)]
246struct EventSubscribers {
247 events: HashMap<TypeId, Vec<EventCallback>>,
248 last_value_cache: dashmap::DashMap<TypeId, Box<dyn std::any::Any + Send + Sync>>,
253}
254
255impl EventSubscribers {
256 fn subscribe_recipient<M: 'static, E: Event + Into<M>>(&mut self, recipient: Recipient<M>) {
257 let subs = self.events.entry(TypeId::of::<E>()).or_default();
258 subs.push(Box::new(move |e| {
259 if let Some(event) = e.downcast_ref::<E>() {
260 let msg = event.clone();
261 recipient.send(msg.into())?;
262 }
263 Ok(())
264 }));
265 }
266}
267
/// Cloneable handle to an actor system.
///
/// Apart from the name, all fields are reference-counted, so clones share the same registry,
/// state, callbacks and event subscriptions.
#[derive(Default, Clone)]
pub struct SystemHandle {
    /// Name of the system, used as a prefix in log messages.
    name: String,
    /// Actors started in this system, in the order they were registered.
    registry: Arc<Mutex<Vec<RegistryEntry>>>,
    /// Current lifecycle state of the system.
    system_state: Arc<RwLock<SystemState>>,
    /// Callbacks invoked by [`SystemHandle::shutdown`].
    callbacks: Arc<SystemCallbacks>,

    /// Event subscriptions and the cache of last published events.
    event_subscribers: Arc<RwLock<EventSubscribers>>,
}
279
280pub struct Context<M> {
287 bare: BareContext<M>,
288 receive_deadline: Option<Instant>,
289}
290
291impl<M> Context<M> {
292 fn new(system_handle: SystemHandle, myself: Recipient<M>) -> Self {
293 Self { bare: BareContext { system_handle, myself }, receive_deadline: None }
294 }
295
296 pub fn deadline(&self) -> &Option<Instant> {
299 &self.receive_deadline
300 }
301
302 pub fn set_deadline(&mut self, deadline: Option<Instant>) {
306 self.receive_deadline = deadline;
307 }
308
309 pub fn set_timeout(&mut self, timeout: Option<Duration>) {
312 self.set_deadline(timeout.map(|t| Instant::now() + t));
313 }
314}
315
316impl<M> Deref for Context<M> {
317 type Target = BareContext<M>;
318
319 fn deref(&self) -> &BareContext<M> {
320 &self.bare
321 }
322}
323
324pub struct BareContext<M> {
328 pub system_handle: SystemHandle,
329 pub myself: Recipient<M>,
330}
331
332impl<M: 'static> BareContext<M> {
333 pub fn subscribe<E: Event + Into<M>>(&self) {
339 self.system_handle.subscribe_recipient::<M, E>(self.myself.clone());
340 }
341
342 pub fn subscribe_and_receive_latest<E: Event + Into<M>>(&self) -> Result<(), SendError> {
352 self.system_handle.subscribe_and_receive_latest::<M, E>(self.myself.clone())
353 }
354}
355
356impl<M> Clone for BareContext<M> {
359 fn clone(&self) -> Self {
360 Self { system_handle: self.system_handle.clone(), myself: self.myself.clone() }
361 }
362}
363
/// Capacities of an actor's two message channels.
#[derive(Clone, Copy, Debug, Default)]
pub struct Capacity {
    /// Capacity of the normal-priority channel.
    pub normal: usize,
    /// Capacity of the high-priority channel.
    pub high: usize,
}

impl From<usize> for Capacity {
    /// Uses the same `capacity` for both channels.
    fn from(capacity: usize) -> Self {
        let (normal, high) = (capacity, capacity);
        Self { normal, high }
    }
}
377
378#[must_use = "You must call .with_addr(), .with_capacity(), or .with_default_capacity() to \
382 configure this builder"]
383pub struct SpawnBuilderWithoutAddress<'a, A: Actor, F: FnOnce() -> A> {
384 system: &'a mut System,
385 factory: F,
386}
387
388impl<'a, A: Actor<Context = Context<<A as Actor>::Message>>, F: FnOnce() -> A>
389 SpawnBuilderWithoutAddress<'a, A, F>
390{
391 pub fn with_addr(self, addr: Addr<A::Message>) -> SpawnBuilderWithAddress<'a, A, F> {
393 SpawnBuilderWithAddress { spawn_builder: self, addr }
394 }
395
396 pub fn with_capacity(self, capacity: impl Into<Capacity>) -> SpawnBuilderWithAddress<'a, A, F> {
398 let addr = A::addr_with_capacity(capacity);
399 SpawnBuilderWithAddress { spawn_builder: self, addr }
400 }
401
402 pub fn with_default_capacity(self) -> SpawnBuilderWithAddress<'a, A, F> {
404 let addr = A::addr();
405 SpawnBuilderWithAddress { spawn_builder: self, addr }
406 }
407}
408
409#[must_use = "You must call .spawn() or .run_and_block() to run the actor"]
413pub struct SpawnBuilderWithAddress<'a, A: Actor, F: FnOnce() -> A> {
414 spawn_builder: SpawnBuilderWithoutAddress<'a, A, F>,
415 addr: Addr<A::Message>,
416}
417
418impl<A: Actor<Context = Context<<A as Actor>::Message>>, F: FnOnce() -> A>
419 SpawnBuilderWithAddress<'_, A, F>
420{
421 pub fn run_and_block(self) -> Result<(), ActorError> {
425 let factory = self.spawn_builder.factory;
426 self.spawn_builder.system.block_on(factory(), self.addr)
427 }
428}
429
430impl<A: Actor<Context = Context<<A as Actor>::Message>>, F: FnOnce() -> A + Send + 'static>
431 SpawnBuilderWithAddress<'_, A, F>
432{
433 pub fn spawn(self) -> Result<Addr<A::Message>, ActorError> {
435 let builder = self.spawn_builder;
436 builder.system.spawn_fn_with_addr(builder.factory, self.addr.clone())?;
437 Ok(self.addr)
438 }
439}
440
441impl System {
442 pub fn new(name: &str) -> Self {
444 System::with_callbacks(name, Default::default())
445 }
446
447 pub fn with_callbacks(name: &str, callbacks: SystemCallbacks) -> Self {
448 Self {
449 handle: SystemHandle {
450 name: name.to_owned(),
451 callbacks: Arc::new(callbacks),
452 ..SystemHandle::default()
453 },
454 }
455 }
456
457 pub fn prepare<A>(
460 &mut self,
461 actor: A,
462 ) -> SpawnBuilderWithoutAddress<'_, A, impl FnOnce() -> A + use<A>>
463 where
464 A: Actor,
465 {
466 SpawnBuilderWithoutAddress { system: self, factory: move || actor }
467 }
468
469 pub fn prepare_fn<A, F>(&mut self, factory: F) -> SpawnBuilderWithoutAddress<'_, A, F>
475 where
476 A: Actor,
477 F: FnOnce() -> A + Send,
478 {
479 SpawnBuilderWithoutAddress { system: self, factory }
480 }
481
482 pub fn spawn<A>(&mut self, actor: A) -> Result<Addr<A::Message>, ActorError>
486 where
487 A: Actor<Context = Context<<A as Actor>::Message>> + Send + 'static,
488 {
489 self.prepare(actor).with_default_capacity().spawn()
490 }
491
492 fn spawn_fn_with_addr<F, A>(
497 &mut self,
498 factory: F,
499 addr: Addr<A::Message>,
500 ) -> Result<(), ActorError>
501 where
502 F: FnOnce() -> A + Send + 'static,
503 A: Actor<Context = Context<<A as Actor>::Message>>,
504 {
505 let system_state_lock = self.handle.system_state.read();
508 if !system_state_lock.is_running() {
509 return Err(ActorError::SystemStopped { actor_name: A::name() });
510 }
511
512 let system_handle = self.handle.clone();
513 let mut context = Context::new(system_handle.clone(), addr.recipient.clone());
514 let control_addr = addr.control_tx.clone();
515
516 let thread_handle = thread::Builder::new()
517 .name(A::name().into())
518 .spawn(move || {
519 let mut actor = factory();
520
521 if let Err(error) = actor.started(&mut context) {
522 Self::report_error_shutdown(&system_handle, A::name(), "started()", error);
523 return;
524 }
525 debug!("[{}] started actor: {}", system_handle.name, A::name());
526
527 Self::run_actor_select_loop(actor, addr, &mut context, &system_handle);
528 })
529 .map_err(|_| ActorError::SpawnFailed { actor_name: A::name() })?;
530
531 self.handle
532 .registry
533 .lock()
534 .push(RegistryEntry::BackgroundThread(control_addr, thread_handle));
535
536 Ok(())
537 }
538
539 pub fn run(&mut self) -> Result<(), ActorError> {
541 while *self.system_state.read() != SystemState::Stopped {
542 thread::sleep(Duration::from_millis(10));
543 }
544
545 Ok(())
546 }
547
548 fn block_on<A>(&mut self, mut actor: A, addr: Addr<A::Message>) -> Result<(), ActorError>
551 where
552 A: Actor<Context = Context<<A as Actor>::Message>>,
553 {
554 if !self.is_running() {
556 return Err(ActorError::SystemStopped { actor_name: A::name() });
557 }
558
559 let system_handle = &self.handle;
560 let mut context = Context::new(system_handle.clone(), addr.recipient.clone());
561
562 self.handle
563 .registry
564 .lock()
565 .push(RegistryEntry::InPlace(addr.control_tx.clone(), thread::current()));
566
567 match actor.started(&mut context) {
568 Ok(()) => {
569 debug!("[{}] started actor: {}", system_handle.name, A::name());
570 Self::run_actor_select_loop(actor, addr, &mut context, system_handle);
571 },
572 Err(error) => Self::report_error_shutdown(system_handle, A::name(), "started()", error),
573 }
574
575 while *self.system_state.read() != SystemState::Stopped {
580 thread::sleep(Duration::from_millis(10));
581 }
582
583 Ok(())
584 }
585
    /// Message loop of a started actor.
    ///
    /// Repeatedly waits for a control message, a message from either message channel, or the
    /// context's receive deadline, and dispatches to the matching actor callback. Returns when
    /// `Control::Stop` is received or when `handle()` / `deadline_passed()` return an error.
    fn run_actor_select_loop<A>(
        mut actor: A,
        addr: Addr<A::Message>,
        context: &mut Context<A::Message>,
        system_handle: &SystemHandle,
    ) where
        A: Actor<Context = Context<<A as Actor>::Message>>,
    {
        /// What a single wait on the selector produced.
        enum Received<M> {
            Control(Control),
            Message(M),
            Timeout,
        }

        loop {
            // Receivers are registered in the order: control, high priority, normal priority.
            // NOTE(review): presumably the selector favours earlier-registered receivers when
            // several are ready — confirm against the flume documentation.
            let selector = Selector::new()
                .recv(&addr.control_rx, |msg| match msg {
                    Ok(control) => Received::Control(control),
                    Err(RecvError::Disconnected) => {
                        panic!("We keep control_tx alive through addr, should not happen.")
                    },
                })
                .recv(&addr.priority_rx, |msg| match msg {
                    Ok(msg) => Received::Message(msg),
                    Err(RecvError::Disconnected) => {
                        panic!("We keep priority_tx alive through addr, should not happen.")
                    },
                })
                .recv(&addr.message_rx, |msg| match msg {
                    Ok(msg) => Received::Message(msg),
                    Err(RecvError::Disconnected) => {
                        panic!("We keep message_tx alive through addr, should not happen.")
                    },
                });

            // Wait with a deadline only if the actor has configured one.
            let received = if let Some(deadline) = context.receive_deadline {
                match selector.wait_deadline(deadline) {
                    Ok(received) => received,
                    Err(SelectError::Timeout) => Received::Timeout,
                }
            } else {
                selector.wait()
            };

            match received {
                Received::Control(Control::Stop) => {
                    // The loop ends regardless of whether `stopped()` succeeds.
                    if let Err(error) = actor.stopped(context) {
                        Self::report_error_shutdown(system_handle, A::name(), "stopped()", error);
                    }
                    debug!("[{}] stopped actor: {}", system_handle.name, A::name());
                    return;
                },
                Received::Message(msg) => {
                    trace!("[{}] message received by {}", system_handle.name, A::name());
                    if let Err(error) = actor.handle(context, msg) {
                        Self::report_error_shutdown(system_handle, A::name(), "handle()", error);
                        return;
                    }
                },
                Received::Timeout => {
                    // The deadline is cleared before the callback runs; the actor has to set a
                    // new one if it wants another timeout.
                    let deadline = context.receive_deadline.take().expect("implied by timeout");
                    if let Err(error) = actor.deadline_passed(context, deadline) {
                        Self::report_error_shutdown(
                            system_handle,
                            A::name(),
                            "deadline_passed()",
                            error,
                        );
                        return;
                    }
                },
            }
        }
    }
668
669 fn report_error_shutdown(
670 system_handle: &SystemHandle,
671 actor_name: &str,
672 action: &str,
673 error: impl std::fmt::Display,
674 ) {
675 let system_name = &system_handle.name;
676
677 if system_handle.system_state.read().is_running() {
680 error!(
681 "[{system_name}] {actor_name} {action} error: {error:#}. Shutting down the actor \
682 system."
683 );
684 let _ = system_handle.shutdown();
685 } else {
686 warn!(
687 "[{system_name}] {actor_name} {action} error while shutting down: {error:#}. \
688 Ignoring."
689 );
690 }
691 }
692}
693
694impl Drop for System {
695 fn drop(&mut self) {
696 self.shutdown().unwrap();
697 }
698}
699
700impl Deref for System {
701 type Target = SystemHandle;
702
703 fn deref(&self) -> &Self::Target {
704 &self.handle
705 }
706}
707
708impl SystemHandle {
    /// Stops all actors registered in the system and marks the system as stopped.
    ///
    /// Only the first call on a running system does the work; calls made while the system is
    /// shutting down or stopped return `Ok(())` immediately.
    ///
    /// # Errors
    ///
    /// Returns [`ActorError::ActorPanic`] if joining at least one actor thread failed because
    /// the thread had panicked.
    pub fn shutdown(&self) -> Result<(), ActorError> {
        let shutdown_start = Instant::now();

        let current_thread = thread::current();
        let current_thread_name = current_thread.name().unwrap_or("Unknown thread id");

        {
            // The write lock is only held for this block: decide whether this call performs
            // the shutdown and flip the state.
            let mut system_state_lock = self.system_state.write();

            if system_state_lock.is_running() {
                info!(
                    "[{}] thread {} shutting down the actor system.",
                    self.name, current_thread_name,
                );
                *system_state_lock = SystemState::ShuttingDown;
            } else {
                trace!(
                    "[{}] thread {} called system.shutdown() but the system is already shutting \
                     down or stopped.",
                    self.name, current_thread_name,
                );
                return Ok(());
            }
        }

        if let Some(callback) = self.callbacks.preshutdown.as_ref() {
            info!("[{}] calling pre-shutdown callback.", self.name);
            if let Err(err) = callback() {
                warn!("[{}] pre-shutdown callback failed, reason: {}", self.name, err);
            }
        }

        let err_count = {
            // The registry stays locked while actors are stopped and joined.
            let mut registry = self.registry.lock();
            debug!("[{}] joining {} actor threads.", self.name, registry.len());

            // First ask every actor to stop, most recently registered first.
            for entry in registry.iter_mut().rev() {
                let actor_name = entry.name();

                if let Err(e) = entry.control_addr().send(Control::Stop) {
                    warn!(
                        "Couldn't send Control::Stop to {actor_name} to shut it down: {e:#}. \
                         Ignoring and proceeding."
                    );
                }
            }

            // Then join the background threads in the same reverse order, counting the ones
            // that panicked.
            registry
                .drain(..)
                .enumerate()
                .rev()
                .filter_map(|(i, entry)| {
                    let actor_name = entry.name();

                    match entry {
                        RegistryEntry::InPlace(_, _) => {
                            debug!(
                                "[{}] [{i}] skipping join of an actor running in-place: \
                                 {actor_name}",
                                self.name
                            );
                            None
                        },
                        RegistryEntry::BackgroundThread(_control_addr, thread_handle) => {
                            // A thread can not join itself.
                            if thread_handle.thread().id() == current_thread.id() {
                                debug!(
                                    "[{}] [{i}] skipping join of the actor thread currently \
                                     executing SystemHandle::shutdown(): {actor_name}",
                                    self.name,
                                );
                                return None;
                            }

                            debug!("[{}] [{}] joining actor thread: {}", self.name, i, actor_name);

                            let join_result = thread_handle.join().map_err(|e| {
                                error!("a panic inside actor thread {actor_name}: {e:?}")
                            });

                            debug!("[{}] [{}] joined actor thread: {}", self.name, i, actor_name);
                            // `Some(())` for a panicked thread, so `count()` counts the panics.
                            join_result.err()
                        },
                    }
                })
                .count()
        };

        info!("[{}] system finished shutting down in {:?}.", self.name, shutdown_start.elapsed());

        if let Some(callback) = self.callbacks.postshutdown.as_ref() {
            info!("[{}] calling post-shutdown callback.", self.name);
            if let Err(err) = callback() {
                warn!("[{}] post-shutdown callback failed, reason: {}", self.name, err);
            }
        }

        *self.system_state.write() = SystemState::Stopped;

        if err_count > 0 { Err(ActorError::ActorPanic) } else { Ok(()) }
    }
815
816 pub fn subscribe_recipient<M: 'static, E: Event + Into<M>>(&self, recipient: Recipient<M>) {
818 let mut event_subscribers = self.event_subscribers.write();
819 event_subscribers.subscribe_recipient::<M, E>(recipient);
820 }
821
822 pub fn subscribe_and_receive_latest<M: 'static, E: Event + Into<M>>(
825 &self,
826 recipient: Recipient<M>,
827 ) -> Result<(), SendError> {
828 let mut event_subscribers = self.event_subscribers.write();
829
830 if let Some(last_cached_value) = event_subscribers.last_value_cache.get(&TypeId::of::<E>())
834 && let Some(msg) = last_cached_value.downcast_ref::<E>()
835 {
836 recipient.send(msg.clone().into())?;
837 }
838
839 event_subscribers.subscribe_recipient::<M, E>(recipient);
840 Ok(())
841 }
842
843 pub fn publish<E: Event>(&self, event: E) -> Result<(), PublishError> {
851 let event_subscribers = self.event_subscribers.read();
852 let type_id = TypeId::of::<E>();
853
854 event_subscribers.last_value_cache.insert(type_id, Box::new(event.clone()));
858
859 if let Some(subs) = event_subscribers.events.get(&type_id) {
860 let errors: Vec<SendError> = subs
861 .iter()
862 .filter_map(|subscriber_callback| subscriber_callback(&event).err())
863 .collect();
864 if !errors.is_empty() {
865 return Err(PublishError(errors));
866 }
867 }
868
869 Ok(())
870 }
871
872 pub fn name(&self) -> &str {
873 &self.name
874 }
875
876 pub fn is_running(&self) -> bool {
878 self.system_state.read().is_running()
879 }
880}
881
882enum RegistryEntry {
883 InPlace(Sender<Control>, thread::Thread),
884 BackgroundThread(Sender<Control>, thread::JoinHandle<()>),
885}
886
887impl RegistryEntry {
888 fn name(&self) -> String {
889 match self {
890 RegistryEntry::InPlace(_, thread_handle) => {
891 thread_handle.name().unwrap_or("unnamed").to_owned()
892 },
893 RegistryEntry::BackgroundThread(_, join_handle) => {
894 join_handle.thread().name().unwrap_or("unnamed").to_owned()
895 },
896 }
897 }
898
899 fn control_addr(&mut self) -> &mut Sender<Control> {
900 match self {
901 RegistryEntry::InPlace(control_addr, _) => control_addr,
902 RegistryEntry::BackgroundThread(control_addr, _) => control_addr,
903 }
904 }
905}
906
/// Messages sent to an actor's message loop over its control channel.
pub enum Control {
    /// Ask the actor to stop: its `stopped()` callback is run and its message loop exits.
    Stop,
}
912
913pub trait Actor {
915 type Message: Send + 'static;
918 type Error: std::fmt::Display;
920 type Context;
922
923 const DEFAULT_CAPACITY_NORMAL: usize = 5;
925 const DEFAULT_CAPACITY_HIGH: usize = 5;
927
928 fn name() -> &'static str {
931 type_name::<Self>()
932 }
933
934 fn priority(_message: &Self::Message) -> Priority {
937 Priority::Normal
938 }
939
940 fn started(&mut self, _context: &mut Self::Context) -> Result<(), Self::Error> {
942 Ok(())
943 }
944
945 fn handle(
947 &mut self,
948 context: &mut Self::Context,
949 message: Self::Message,
950 ) -> Result<(), Self::Error>;
951
952 fn stopped(&mut self, _context: &mut Self::Context) -> Result<(), Self::Error> {
954 Ok(())
955 }
956
957 fn deadline_passed(
991 &mut self,
992 _context: &mut Self::Context,
993 _deadline: Instant,
994 ) -> Result<(), Self::Error> {
995 Ok(())
996 }
997
998 fn addr() -> Addr<Self::Message> {
1000 let capacity =
1001 Capacity { normal: Self::DEFAULT_CAPACITY_NORMAL, high: Self::DEFAULT_CAPACITY_HIGH };
1002 Self::addr_with_capacity(capacity)
1003 }
1004
1005 fn addr_with_capacity(capacity: impl Into<Capacity>) -> Addr<Self::Message> {
1007 Addr::new(capacity, Self::name(), Self::priority)
1008 }
1009}
1010
1011pub struct Addr<M> {
1012 recipient: Recipient<M>,
1013 priority_rx: Receiver<M>,
1014 message_rx: Receiver<M>,
1015 control_rx: Receiver<Control>,
1016}
1017
1018impl<M> Clone for Addr<M> {
1019 fn clone(&self) -> Self {
1020 Self {
1021 recipient: self.recipient.clone(),
1022 priority_rx: self.priority_rx.clone(),
1023 message_rx: self.message_rx.clone(),
1024 control_rx: self.control_rx.clone(),
1025 }
1026 }
1027}
1028
1029impl<M> Deref for Addr<M> {
1030 type Target = Recipient<M>;
1031
1032 fn deref(&self) -> &Self::Target {
1033 &self.recipient
1034 }
1035}
1036
1037impl<M: Send + 'static> Addr<M> {
1038 fn new(
1039 capacity: impl Into<Capacity>,
1040 name: &'static str,
1041 get_priority: fn(&M) -> Priority,
1042 ) -> Self {
1043 let capacity: Capacity = capacity.into();
1044
1045 let (priority_tx, priority_rx) = flume::bounded::<M>(capacity.high);
1046 let (message_tx, message_rx) = flume::bounded::<M>(capacity.normal);
1047 let (control_tx, control_rx) = flume::bounded(CONTROL_CHANNEL_CAPACITY);
1048
1049 let message_tx =
1050 Arc::new(MessageSender { high: priority_tx, normal: message_tx, get_priority, name });
1051 Self {
1052 recipient: Recipient { message_tx, control_tx },
1053 priority_rx,
1054 message_rx,
1055 control_rx,
1056 }
1057 }
1058}
1059
/// Priority of a message; it selects which of the recipient's two channels the message uses.
#[derive(Clone, Copy, Debug)]
pub enum Priority {
    /// Sent through the normal-priority channel.
    Normal,
    /// Sent through the high-priority channel.
    High,
}
1066
1067pub struct Recipient<M> {
1070 message_tx: Arc<dyn SenderTrait<M>>,
1071 control_tx: Sender<Control>,
1072}
1073
1074impl<M> Clone for Recipient<M> {
1077 fn clone(&self) -> Self {
1078 Self { message_tx: Arc::clone(&self.message_tx), control_tx: self.control_tx.clone() }
1079 }
1080}
1081
1082impl<M> Recipient<M> {
1083 pub fn send(&self, message: M) -> Result<(), SendError> {
1086 self.message_tx.try_send(message)
1087 }
1088}
1089
1090impl<M: 'static> Recipient<M> {
1091 pub fn recipient<N: Into<M>>(&self) -> Recipient<N> {
1094 Recipient {
1095 message_tx: Arc::new(Arc::clone(&self.message_tx)),
1097 control_tx: self.control_tx.clone(),
1098 }
1099 }
1100}
1101
1102pub trait SendResultExt {
1103 fn on_full<F: FnOnce(&'static str, Priority)>(self, func: F) -> Result<(), DisconnectedError>;
1106
1107 fn ignore_on_full(self) -> Result<(), DisconnectedError>;
1109}
1110
1111impl SendResultExt for Result<(), SendError> {
1112 fn on_full<F: FnOnce(&'static str, Priority)>(
1113 self,
1114 callback: F,
1115 ) -> Result<(), DisconnectedError> {
1116 self.or_else(|e| match e {
1117 SendError { recipient_name, priority, reason: SendErrorReason::Full } => {
1118 callback(recipient_name, priority);
1119 Ok(())
1120 },
1121 SendError { recipient_name, priority, reason: SendErrorReason::Disconnected } => {
1122 Err(DisconnectedError { recipient_name, priority })
1123 },
1124 })
1125 }
1126
1127 fn ignore_on_full(self) -> Result<(), DisconnectedError> {
1128 self.on_full(|_, _| ())
1129 }
1130}
1131
1132struct MessageSender<M> {
1134 high: Sender<M>,
1135 normal: Sender<M>,
1136 get_priority: fn(&M) -> Priority,
1137 name: &'static str,
1139}
1140
1141trait SenderTrait<M>: Send + Sync {
1143 fn try_send(&self, message: M) -> Result<(), SendError>;
1144}
1145
1146impl<M: Send> SenderTrait<M> for MessageSender<M> {
1148 fn try_send(&self, message: M) -> Result<(), SendError> {
1149 let priority = (self.get_priority)(&message);
1150 let sender = match priority {
1151 Priority::Normal => &self.normal,
1152 Priority::High => &self.high,
1153 };
1154 sender.try_send(message).map_err(|e| SendError {
1155 reason: e.into(),
1156 recipient_name: self.name,
1157 priority,
1158 })
1159 }
1160}
1161
1162impl<M: Into<N>, N> SenderTrait<M> for Arc<dyn SenderTrait<N>> {
1164 fn try_send(&self, message: M) -> Result<(), SendError> {
1165 self.deref().try_send(message.into())
1166 }
1167}
1168
1169#[cfg(test)]
1170mod tests {
1171 use std::{
1172 rc::Rc,
1173 sync::atomic::{AtomicU32, Ordering},
1174 time::Duration,
1175 };
1176
1177 use super::*;
1178
    /// Minimal actor that prints its lifecycle events and every `usize` message it receives.
    struct TestActor;
    impl Actor for TestActor {
        type Context = Context<Self::Message>;
        type Error = String;
        type Message = usize;

        // A fixed name, so the error messages asserted in `errors()` do not depend on the
        // module path.
        fn name() -> &'static str {
            "TestActor"
        }

        fn handle(&mut self, _: &mut Self::Context, message: usize) -> Result<(), String> {
            println!("message: {message}");
            Ok(())
        }

        fn started(&mut self, _: &mut Self::Context) -> Result<(), String> {
            println!("started");
            Ok(())
        }

        fn stopped(&mut self, _: &mut Self::Context) -> Result<(), String> {
            println!("stopped");
            Ok(())
        }
    }
1207
    /// Smoke test: spawns five actors, sends three messages to the first one and shuts down.
    #[test]
    fn it_works() {
        let mut system = System::new("hi");
        let address = system.spawn(TestActor).unwrap();
        let _ = system.spawn(TestActor).unwrap();
        let _ = system.spawn(TestActor).unwrap();
        let _ = system.spawn(TestActor).unwrap();
        let _ = system.spawn(TestActor).unwrap();
        address.send(1337usize).unwrap();
        address.send(666usize).unwrap();
        address.send(1usize).unwrap();
        // Give the actor thread time to process the messages.
        thread::sleep(Duration::from_millis(100));

        system.shutdown().unwrap();
        thread::sleep(Duration::from_millis(100));
    }
1224
    /// With a channel capacity of one, a send to a full channel fails, and `ignore_on_full()`
    /// turns that failure into success.
    #[test]
    fn test_ignore_on_full() {
        let mut system = System::new("hi");
        let address = system.prepare(TestActor).with_capacity(1).spawn().unwrap();
        address.send(1337usize).unwrap();
        assert!(address.send(666usize).is_err());
        address.send(666usize).ignore_on_full().unwrap();

        thread::sleep(Duration::from_millis(100));

        system.shutdown().unwrap();
        thread::sleep(Duration::from_millis(100));
    }
1238
    /// An actor that is neither `Send` nor `Sync` can still be run, either by creating it on
    /// its own thread with `prepare_fn()` or by running it in place with `run_and_block()`.
    #[test]
    fn send_constraints() {
        #[derive(Default)]
        struct LocalActor {
            // `Rc` is neither `Send` nor `Sync`, and so neither is `LocalActor`.
            _ensure_not_send_not_sync: Rc<()>,
        }
        impl Actor for LocalActor {
            type Context = Context<Self::Message>;
            type Error = String;
            type Message = ();

            fn handle(&mut self, _: &mut Self::Context, _: ()) -> Result<(), String> {
                Ok(())
            }

            // Shut the system down right away so that the test terminates.
            fn started(&mut self, ctx: &mut Self::Context) -> Result<(), String> {
                ctx.system_handle.shutdown().map_err(|e| e.to_string())
            }
        }

        {
            // Only the factory is moved to the new thread; the actor is created there.
            let mut system = System::new("send_constraints prepare_fn");
            let _ = system.prepare_fn(LocalActor::default).with_default_capacity().spawn().unwrap();
        }

        {
            // The actor is created here and never leaves the current thread.
            let mut system = System::new("send_constraints run_and_block");
            system.prepare(LocalActor::default()).with_default_capacity().run_and_block().unwrap();
        }
    }
1272
    /// Exercises the receive deadline: `deadline_passed()` fires once per configured deadline.
    ///
    /// The test is timing-based; the sleeps give the actor thread time to react.
    #[test]
    fn timeouts() {
        struct TimeoutActor {
            handle_count: Arc<AtomicU32>,
            timeout_count: Arc<AtomicU32>,
        }

        impl Actor for TimeoutActor {
            type Context = Context<Self::Message>;
            type Error = String;
            type Message = Option<Instant>;

            // Counts the message; a `Some` message becomes the new deadline, while `None`
            // leaves the current deadline unchanged.
            fn handle(
                &mut self,
                ctx: &mut Self::Context,
                msg: Self::Message,
            ) -> Result<(), String> {
                self.handle_count.fetch_add(1, Ordering::SeqCst);
                if msg.is_some() {
                    ctx.receive_deadline = msg;
                }
                Ok(())
            }

            fn deadline_passed(&mut self, _: &mut Self::Context, _: Instant) -> Result<(), String> {
                self.timeout_count.fetch_add(1, Ordering::SeqCst);
                Ok(())
            }
        }

        let mut system = System::new("timeouts");
        let (handle_count, timeout_count) = (Default::default(), Default::default());
        let actor = TimeoutActor {
            handle_count: Arc::clone(&handle_count),
            timeout_count: Arc::clone(&timeout_count),
        };
        let addr = system.spawn(actor).unwrap();

        // A deadline in the past is expected to fire right after the message is handled.
        addr.send(Some(Instant::now().checked_sub(Duration::from_secs(1)).unwrap())).unwrap();
        thread::sleep(Duration::from_millis(10));
        assert_eq!(handle_count.load(Ordering::SeqCst), 1);
        assert_eq!(timeout_count.load(Ordering::SeqCst), 1);

        // A deadline 20 ms ahead must not have fired after 10 ms, but must have after 30 ms.
        addr.send(Some(Instant::now() + Duration::from_millis(20))).unwrap();
        thread::sleep(Duration::from_millis(10));
        assert_eq!(handle_count.load(Ordering::SeqCst), 2);
        assert_eq!(timeout_count.load(Ordering::SeqCst), 1);
        thread::sleep(Duration::from_millis(20));
        assert_eq!(handle_count.load(Ordering::SeqCst), 2);
        assert_eq!(timeout_count.load(Ordering::SeqCst), 2);

        // Handling another message in between does not cancel a pending deadline.
        addr.send(Some(Instant::now() + Duration::from_millis(40))).unwrap();
        thread::sleep(Duration::from_millis(20));
        assert_eq!(handle_count.load(Ordering::SeqCst), 3);
        assert_eq!(timeout_count.load(Ordering::SeqCst), 2);
        addr.send(None).unwrap();
        thread::sleep(Duration::from_millis(30));
        assert_eq!(handle_count.load(Ordering::SeqCst), 4);
        assert_eq!(timeout_count.load(Ordering::SeqCst), 3);

        system.shutdown().unwrap();
    }
1338
    /// Checks the `Display` and `Debug` output of `SendError` for a full channel and for a
    /// stopped actor.
    #[test]
    fn errors() {
        let mut system = System::new("hi");
        // This address is never spawned, so nothing drains its channel.
        let low_capacity_actor = TestActor::addr_with_capacity(1);
        let stopped_actor = system.spawn(TestActor).unwrap().recipient();

        low_capacity_actor.send(9).expect("one message should fit");
        let error = low_capacity_actor.send(123).unwrap_err();
        assert_eq!(
            error.to_string(),
            "the capacity of TestActor's Normal-priority channel is full"
        );
        assert_eq!(
            format!("{error:?}"),
            r#"SendError { recipient_name: "TestActor", priority: Normal, reason: Full }"#
        );

        system.shutdown().unwrap();

        // After the shutdown the spawned actor is gone, so sending to it must fail.
        let error = stopped_actor.send(456usize).unwrap_err();
        assert_eq!(error.to_string(), "the recipient of the message (TestActor) no longer exists");
        assert_eq!(
            format!("{error:?}"),
            r#"SendError { recipient_name: "TestActor", priority: Normal, reason: Disconnected }"#
        );
    }
1366
    /// Messages queued before the actor starts are handled with the high-priority ones first,
    /// each priority keeping its sending order.
    #[test]
    fn message_priorities() {
        // Ignore the error if a logger has already been initialised.
        env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("trace"))
            .try_init()
            .ok();

        struct PriorityActor {
            received: Arc<Mutex<Vec<usize>>>,
        }

        impl Actor for PriorityActor {
            type Context = Context<Self::Message>;
            type Error = String;
            type Message = usize;

            // Records the message and shuts the system down once all 20 have arrived.
            fn handle(
                &mut self,
                context: &mut Self::Context,
                message: Self::Message,
            ) -> Result<(), Self::Error> {
                let mut received = self.received.lock();
                received.push(message);
                if received.len() >= 20 {
                    context.system_handle.shutdown().unwrap();
                }
                Ok(())
            }

            // Messages 10 and above are high priority.
            fn priority(message: &Self::Message) -> Priority {
                if *message >= 10 { Priority::High } else { Priority::Normal }
            }
        }

        let addr = PriorityActor::addr_with_capacity(10);
        let received = Arc::new(Mutex::new(Vec::<usize>::new()));

        // Fill both channels (ten messages each) before the actor is started.
        for message in 0..20usize {
            addr.send(message).unwrap();
        }

        let mut system = System::new("priorities");
        system
            .prepare(PriorityActor { received: Arc::clone(&received) })
            .with_addr(addr)
            .run_and_block()
            .unwrap();

        assert_eq!(
            *received.lock(),
            [10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
        );
    }
1421
    // The unit type serves as the event in `last_cached_event()`.
    impl Event for () {}
1423
    /// An event published before anybody subscribed is still delivered to an actor that calls
    /// `subscribe_and_receive_latest()`.
    #[test]
    fn last_cached_event() {
        struct Subscriber;
        impl Actor for Subscriber {
            type Context = Context<Self::Message>;
            type Error = String;
            type Message = ();

            fn started(&mut self, context: &mut Self::Context) -> Result<(), String> {
                context.subscribe_and_receive_latest::<Self::Message>().map_err(|e| e.to_string())
            }

            // Receiving the cached event ends the test by shutting the system down.
            fn handle(
                &mut self,
                context: &mut Self::Context,
                _: Self::Message,
            ) -> Result<(), Self::Error> {
                println!("Event received!");
                context.system_handle.shutdown().unwrap();
                Ok(())
            }
        }

        let mut system = System::new("last cached event");
        // Published while there are no subscribers; only the cache keeps the value.
        system.publish(()).expect("can publish event");

        system
            .prepare(Subscriber)
            .with_capacity(1)
            .run_and_block()
            .expect("actor finishes successfully");
    }
1457}