1#![warn(clippy::clone_on_ref_ptr, clippy::todo)]
2
3use flume::{select::SelectError, Receiver, RecvError, Selector, Sender};
57use log::*;
58use parking_lot::{Mutex, RwLock};
59use std::{
60 any::{type_name, TypeId},
61 collections::HashMap,
62 fmt,
63 ops::Deref,
64 sync::Arc,
65 thread,
66 time::{Duration, Instant},
67};
68
69pub mod timed;
70
71const CONTROL_CHANNEL_CAPACITY: usize = 5;
73
74#[derive(Debug)]
75pub enum ActorError {
76 SystemStopped { actor_name: &'static str },
78 SpawnFailed { actor_name: &'static str },
80 ActorPanic,
82}
83
84impl fmt::Display for ActorError {
85 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
86 match self {
87 ActorError::SystemStopped { actor_name } => {
88 write!(f, "the system is not running; the actor {actor_name} can not be started")
89 },
90 ActorError::SpawnFailed { actor_name } => {
91 write!(f, "failed to spawn a thread for the actor {actor_name}")
92 },
93 ActorError::ActorPanic => {
94 write!(f, "panic inside an actor thread; see above for more verbose logs")
95 },
96 }
97 }
98}
99
100impl std::error::Error for ActorError {}
101
102#[derive(Debug, Clone, Copy)]
104pub struct SendError {
105 pub recipient_name: &'static str,
107 pub priority: Priority,
109 pub reason: SendErrorReason,
111}
112
113impl fmt::Display for SendError {
114 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
115 let recipient_name = self.recipient_name;
116 let priority = self.priority;
117 match self.reason {
118 SendErrorReason::Full => {
119 write!(
120 f,
121 "the capacity of {recipient_name}'s {priority:?}-priority channel is full"
122 )
123 },
124 SendErrorReason::Disconnected => DisconnectedError { recipient_name, priority }.fmt(f),
125 }
126 }
127}
128
129impl std::error::Error for SendError {}
130
131#[derive(Debug)]
133pub struct PublishError(pub Vec<SendError>);
134
135impl fmt::Display for PublishError {
136 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
137 let error_strings: Vec<String> = self.0.iter().map(ToString::to_string).collect();
138 write!(
139 f,
140 "failed to deliver an event to {} subscribers: {}",
141 self.0.len(),
142 error_strings.join(", ")
143 )
144 }
145}
146
147impl std::error::Error for PublishError {}
148
149#[derive(Debug, Clone, Copy)]
151pub struct DisconnectedError {
152 pub recipient_name: &'static str,
154 pub priority: Priority,
156}
157
158impl fmt::Display for DisconnectedError {
159 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
160 write!(f, "the recipient of the message ({}) no longer exists", self.recipient_name)
161 }
162}
163
164impl std::error::Error for DisconnectedError {}
165
166#[derive(Debug, Clone, Copy)]
168pub enum SendErrorReason {
169 Full,
171 Disconnected,
173}
174
175impl<M> From<flume::TrySendError<M>> for SendErrorReason {
176 fn from(orig: flume::TrySendError<M>) -> Self {
177 match orig {
178 flume::TrySendError::Full(_) => Self::Full,
179 flume::TrySendError::Disconnected(_) => Self::Disconnected,
180 }
181 }
182}
183
184#[derive(Default)]
190pub struct System {
191 handle: SystemHandle,
192}
193
194type SystemCallback = Box<dyn Fn() -> Result<(), ActorError> + Send + Sync>;
195type EventCallback = Box<dyn Fn(&dyn std::any::Any) -> Result<(), SendError> + Send + Sync>;
196
197#[derive(Default)]
198pub struct SystemCallbacks {
199 pub preshutdown: Option<SystemCallback>,
200 pub postshutdown: Option<SystemCallback>,
201}
202
203#[derive(Debug, Default, PartialEq)]
204enum SystemState {
205 #[default]
207 Running,
208
209 ShuttingDown,
212
213 Stopped,
217}
218
219pub trait Event: Clone + std::any::Any + Send + Sync {}
222
223#[derive(Default)]
224struct EventSubscribers {
225 events: HashMap<TypeId, Vec<EventCallback>>,
226 last_value_cache: dashmap::DashMap<TypeId, Box<dyn std::any::Any + Send + Sync>>,
231}
232
233impl EventSubscribers {
234 fn subscribe_recipient<M: 'static, E: Event + Into<M>>(&mut self, recipient: Recipient<M>) {
235 let subs = self.events.entry(TypeId::of::<E>()).or_default();
236 subs.push(Box::new(move |e| {
237 if let Some(event) = e.downcast_ref::<E>() {
238 let msg = event.clone();
239 recipient.send(msg.into())?;
240 }
241 Ok(())
242 }));
243 }
244}
245
246#[derive(Default, Clone)]
249pub struct SystemHandle {
250 name: String,
251 registry: Arc<Mutex<Vec<RegistryEntry>>>,
252 system_state: Arc<RwLock<SystemState>>,
253 callbacks: Arc<SystemCallbacks>,
254
255 event_subscribers: Arc<RwLock<EventSubscribers>>,
256}
257
258pub struct Context<M> {
263 pub system_handle: SystemHandle,
264 pub myself: Recipient<M>,
265 receive_deadline: Option<Instant>,
266}
267
268impl<M> Context<M> {
269 fn new(system_handle: SystemHandle, myself: Recipient<M>) -> Self {
270 Self { system_handle, myself, receive_deadline: None }
271 }
272
273 pub fn deadline(&self) -> &Option<Instant> {
276 &self.receive_deadline
277 }
278
279 pub fn set_deadline(&mut self, deadline: Option<Instant>) {
283 self.receive_deadline = deadline;
284 }
285
286 pub fn set_timeout(&mut self, timeout: Option<Duration>) {
289 self.set_deadline(timeout.map(|t| Instant::now() + t));
290 }
291
292 pub fn subscribe<E: Event + Into<M>>(&self)
298 where
299 M: 'static,
300 {
301 self.system_handle.subscribe_recipient::<M, E>(self.myself.clone());
302 }
303
304 pub fn subscribe_and_receive_latest<E: Event + Into<M>>(&self) -> Result<(), SendError>
314 where
315 M: 'static,
316 {
317 self.system_handle.subscribe_and_receive_latest::<M, E>(self.myself.clone())
318 }
319}
320
321#[derive(Clone, Copy, Debug, Default)]
324pub struct Capacity {
325 pub normal: Option<usize>,
326 pub high: Option<usize>,
327}
328
329impl Capacity {
330 pub fn of_normal_priority(capacity: usize) -> Self {
332 Self { normal: Some(capacity), ..Default::default() }
333 }
334
335 pub fn of_high_priority(capacity: usize) -> Self {
337 Self { high: Some(capacity), ..Default::default() }
338 }
339}
340
341impl From<usize> for Capacity {
343 fn from(capacity: usize) -> Self {
344 Self { normal: Some(capacity), high: Some(capacity) }
345 }
346}
347
348#[must_use = "You must call .with_addr(), .with_capacity(), or .with_default_capacity() to \
352 configure this builder"]
353pub struct SpawnBuilderWithoutAddress<'a, A: Actor, F: FnOnce() -> A> {
354 system: &'a mut System,
355 factory: F,
356}
357
358impl<'a, A: Actor<Context = Context<<A as Actor>::Message>>, F: FnOnce() -> A>
359 SpawnBuilderWithoutAddress<'a, A, F>
360{
361 pub fn with_addr(self, addr: Addr<A>) -> SpawnBuilderWithAddress<'a, A, F> {
363 SpawnBuilderWithAddress { spawn_builder: self, addr }
364 }
365
366 pub fn with_capacity(self, capacity: impl Into<Capacity>) -> SpawnBuilderWithAddress<'a, A, F> {
368 let addr = Addr::with_capacity(capacity);
369 SpawnBuilderWithAddress { spawn_builder: self, addr }
370 }
371
372 pub fn with_default_capacity(self) -> SpawnBuilderWithAddress<'a, A, F> {
374 let addr = Addr::with_capacity(Capacity::default());
375 SpawnBuilderWithAddress { spawn_builder: self, addr }
376 }
377}
378
379#[must_use = "You must call .spawn() or .run_and_block() to run an actor"]
380pub struct SpawnBuilderWithAddress<'a, A: Actor, F: FnOnce() -> A> {
384 spawn_builder: SpawnBuilderWithoutAddress<'a, A, F>,
385 addr: Addr<A>,
386}
387
388impl<A: Actor<Context = Context<<A as Actor>::Message>>, F: FnOnce() -> A>
389 SpawnBuilderWithAddress<'_, A, F>
390{
391 pub fn run_and_block(self) -> Result<(), ActorError> {
395 let factory = self.spawn_builder.factory;
396 self.spawn_builder.system.block_on(factory(), self.addr)
397 }
398}
399
400impl<
401 A: 'static + Actor<Context = Context<<A as Actor>::Message>>,
402 F: FnOnce() -> A + Send + 'static,
403 > SpawnBuilderWithAddress<'_, A, F>
404{
405 pub fn spawn(self) -> Result<Addr<A>, ActorError> {
407 let builder = self.spawn_builder;
408 builder.system.spawn_fn_with_addr(builder.factory, self.addr.clone())?;
409 Ok(self.addr)
410 }
411}
412
413impl System {
414 pub fn new(name: &str) -> Self {
416 System::with_callbacks(name, Default::default())
417 }
418
419 pub fn with_callbacks(name: &str, callbacks: SystemCallbacks) -> Self {
420 Self {
421 handle: SystemHandle {
422 name: name.to_owned(),
423 callbacks: Arc::new(callbacks),
424 ..SystemHandle::default()
425 },
426 }
427 }
428
429 pub fn prepare<A>(&mut self, actor: A) -> SpawnBuilderWithoutAddress<A, impl FnOnce() -> A>
432 where
433 A: Actor,
434 {
435 SpawnBuilderWithoutAddress { system: self, factory: move || actor }
436 }
437
438 pub fn prepare_fn<A, F>(&mut self, factory: F) -> SpawnBuilderWithoutAddress<A, F>
444 where
445 A: Actor,
446 F: FnOnce() -> A + Send,
447 {
448 SpawnBuilderWithoutAddress { system: self, factory }
449 }
450
451 pub fn spawn<A>(&mut self, actor: A) -> Result<Addr<A>, ActorError>
455 where
456 A: Actor<Context = Context<<A as Actor>::Message>> + Send + 'static,
457 {
458 self.prepare(actor).with_default_capacity().spawn()
459 }
460
461 fn spawn_fn_with_addr<F, A>(&mut self, factory: F, addr: Addr<A>) -> Result<(), ActorError>
466 where
467 F: FnOnce() -> A + Send + 'static,
468 A: Actor<Context = Context<<A as Actor>::Message>> + 'static,
469 {
470 let system_state_lock = self.handle.system_state.read();
473 match *system_state_lock {
474 SystemState::ShuttingDown | SystemState::Stopped => {
475 return Err(ActorError::SystemStopped { actor_name: A::name() });
476 },
477 SystemState::Running => {},
478 }
479
480 let system_handle = self.handle.clone();
481 let mut context = Context::new(system_handle.clone(), addr.recipient.clone());
482 let control_addr = addr.control_tx.clone();
483
484 let thread_handle = thread::Builder::new()
485 .name(A::name().into())
486 .spawn(move || {
487 let mut actor = factory();
488
489 if let Err(error) = actor.started(&mut context) {
490 Self::report_error_shutdown(&system_handle, A::name(), "started", error);
491 return;
492 }
493 debug!("[{}] started actor: {}", system_handle.name, A::name());
494
495 Self::run_actor_select_loop(actor, addr, &mut context, &system_handle);
496 })
497 .map_err(|_| ActorError::SpawnFailed { actor_name: A::name() })?;
498
499 self.handle
500 .registry
501 .lock()
502 .push(RegistryEntry::BackgroundThread(control_addr, thread_handle));
503
504 Ok(())
505 }
506
507 pub fn run(&mut self) -> Result<(), ActorError> {
509 while *self.system_state.read() != SystemState::Stopped {
510 thread::sleep(Duration::from_millis(10));
511 }
512
513 Ok(())
514 }
515
516 fn block_on<A>(&mut self, mut actor: A, addr: Addr<A>) -> Result<(), ActorError>
519 where
520 A: Actor<Context = Context<<A as Actor>::Message>>,
521 {
522 if !self.is_running() {
524 return Err(ActorError::SystemStopped { actor_name: A::name() });
525 }
526
527 let system_handle = &self.handle;
528 let mut context = Context::new(system_handle.clone(), addr.recipient.clone());
529
530 self.handle
531 .registry
532 .lock()
533 .push(RegistryEntry::InPlace(addr.control_tx.clone(), thread::current()));
534
535 match actor.started(&mut context) {
536 Ok(()) => {
537 debug!("[{}] started actor: {}", system_handle.name, A::name());
538 Self::run_actor_select_loop(actor, addr, &mut context, system_handle);
539 },
540 Err(error) => Self::report_error_shutdown(system_handle, A::name(), "started", error),
541 }
542
543 while *self.system_state.read() != SystemState::Stopped {
548 thread::sleep(Duration::from_millis(10));
549 }
550
551 Ok(())
552 }
553
554 fn run_actor_select_loop<A>(
555 mut actor: A,
556 addr: Addr<A>,
557 context: &mut Context<A::Message>,
558 system_handle: &SystemHandle,
559 ) where
560 A: Actor<Context = Context<<A as Actor>::Message>>,
561 {
562 enum Received<M> {
564 Control(Control),
565 Message(M),
566 Timeout,
567 }
568
569 loop {
570 let selector = Selector::new()
574 .recv(&addr.control_rx, |msg| match msg {
575 Ok(control) => Received::Control(control),
576 Err(RecvError::Disconnected) => {
577 panic!("We keep control_tx alive through addr, should not happen.")
578 },
579 })
580 .recv(&addr.priority_rx, |msg| match msg {
581 Ok(msg) => Received::Message(msg),
582 Err(RecvError::Disconnected) => {
583 panic!("We keep priority_tx alive through addr, should not happen.")
584 },
585 })
586 .recv(&addr.message_rx, |msg| match msg {
587 Ok(msg) => Received::Message(msg),
588 Err(RecvError::Disconnected) => {
589 panic!("We keep message_tx alive through addr, should not happen.")
590 },
591 });
592
593 let received = if let Some(deadline) = context.receive_deadline {
595 match selector.wait_deadline(deadline) {
596 Ok(received) => received,
597 Err(SelectError::Timeout) => Received::Timeout,
598 }
599 } else {
600 selector.wait()
601 };
602
603 match received {
605 Received::Control(Control::Stop) => {
606 if let Err(error) = actor.stopped(context) {
607 Self::report_error_shutdown(system_handle, A::name(), "stopped", error);
609 }
610 debug!("[{}] stopped actor: {}", system_handle.name, A::name());
611 return;
612 },
613 Received::Message(msg) => {
614 trace!("[{}] message received by {}", system_handle.name, A::name());
615 if let Err(error) = actor.handle(context, msg) {
616 Self::report_error_shutdown(system_handle, A::name(), "handle", error);
617 return;
618 }
619 },
620 Received::Timeout => {
621 let deadline = context.receive_deadline.take().expect("implied by timeout");
622 if let Err(error) = actor.deadline_passed(context, deadline) {
623 Self::report_error_shutdown(
624 system_handle,
625 A::name(),
626 "deadline_passed",
627 error,
628 );
629 return;
630 }
631 },
632 }
633 }
634 }
635
636 fn report_error_shutdown(
637 system_handle: &SystemHandle,
638 actor_name: &str,
639 method_name: &str,
640 error: impl std::fmt::Display,
641 ) {
642 let is_running = match *system_handle.system_state.read() {
643 SystemState::Running => true,
644 SystemState::ShuttingDown | SystemState::Stopped => false,
645 };
646
647 let system_name = &system_handle.name;
648
649 if is_running {
652 error!(
653 "[{system_name}] {actor_name} {method_name}() error: {error:#}. Shutting down the \
654 actor system."
655 );
656 let _ = system_handle.shutdown();
657 } else {
658 warn!(
659 "[{system_name}] {actor_name} {method_name}() error while shutting down: \
660 {error:#}. Ignoring."
661 );
662 }
663 }
664}
665
666impl Drop for System {
667 fn drop(&mut self) {
668 self.shutdown().unwrap();
669 }
670}
671
672impl Deref for System {
673 type Target = SystemHandle;
674
675 fn deref(&self) -> &Self::Target {
676 &self.handle
677 }
678}
679
680impl SystemHandle {
681 pub fn shutdown(&self) -> Result<(), ActorError> {
683 let shutdown_start = Instant::now();
684
685 let current_thread = thread::current();
686 let current_thread_name = current_thread.name().unwrap_or("Unknown thread id");
687
688 {
690 let mut system_state_lock = self.system_state.write();
691
692 match *system_state_lock {
693 SystemState::ShuttingDown | SystemState::Stopped => {
694 debug!(
695 "[{}] thread {} called system.shutdown() but the system is already \
696 shutting down or stopped.",
697 self.name, current_thread_name,
698 );
699 return Ok(());
700 },
701 SystemState::Running => {
702 info!(
703 "[{}] thread {} shutting down the actor system.",
704 self.name, current_thread_name,
705 );
706 *system_state_lock = SystemState::ShuttingDown;
707 },
708 }
709 }
710
711 if let Some(callback) = self.callbacks.preshutdown.as_ref() {
712 info!("[{}] calling pre-shutdown callback.", self.name);
713 if let Err(err) = callback() {
714 warn!("[{}] pre-shutdown callback failed, reason: {}", self.name, err);
715 }
716 }
717
718 let err_count = {
719 let mut registry = self.registry.lock();
720 debug!("[{}] joining {} actor threads.", self.name, registry.len());
721
722 for entry in registry.iter_mut().rev() {
727 let actor_name = entry.name();
728
729 if let Err(e) = entry.control_addr().send(Control::Stop) {
730 warn!(
731 "Couldn't send Control::Stop to {actor_name} to shut it down: {e:#}. \
732 Ignoring and proceeding."
733 );
734 }
735 }
736
737 registry
738 .drain(..)
739 .enumerate()
740 .rev()
741 .filter_map(|(i, entry)| {
742 let actor_name = entry.name();
743
744 match entry {
745 RegistryEntry::InPlace(_, _) => {
746 debug!(
747 "[{}] [{i}] skipping join of an actor running in-place: \
748 {actor_name}",
749 self.name
750 );
751 None
752 },
753 RegistryEntry::BackgroundThread(_control_addr, thread_handle) => {
754 if thread_handle.thread().id() == current_thread.id() {
755 debug!(
756 "[{}] [{i}] skipping join of the actor thread currently \
757 executing SystemHandle::shutdown(): {actor_name}",
758 self.name,
759 );
760 return None;
761 }
762
763 debug!("[{}] [{}] joining actor thread: {}", self.name, i, actor_name);
764
765 let join_result = thread_handle.join().map_err(|e| {
766 error!("a panic inside actor thread {actor_name}: {e:?}")
767 });
768
769 debug!("[{}] [{}] joined actor thread: {}", self.name, i, actor_name);
770 join_result.err()
771 },
772 }
773 })
774 .count()
775 };
776
777 info!("[{}] system finished shutting down in {:?}.", self.name, shutdown_start.elapsed());
778
779 if let Some(callback) = self.callbacks.postshutdown.as_ref() {
780 info!("[{}] calling post-shutdown callback.", self.name);
781 if let Err(err) = callback() {
782 warn!("[{}] post-shutdown callback failed, reason: {}", self.name, err);
783 }
784 }
785
786 *self.system_state.write() = SystemState::Stopped;
787
788 if err_count > 0 {
789 Err(ActorError::ActorPanic)
790 } else {
791 Ok(())
792 }
793 }
794
795 pub fn subscribe_recipient<M: 'static, E: Event + Into<M>>(&self, recipient: Recipient<M>) {
797 let mut event_subscribers = self.event_subscribers.write();
798 event_subscribers.subscribe_recipient::<M, E>(recipient);
799 }
800
801 pub fn subscribe_and_receive_latest<M: 'static, E: Event + Into<M>>(
804 &self,
805 recipient: Recipient<M>,
806 ) -> Result<(), SendError> {
807 let mut event_subscribers = self.event_subscribers.write();
808
809 if let Some(last_cached_value) = event_subscribers.last_value_cache.get(&TypeId::of::<E>())
813 {
814 if let Some(msg) = last_cached_value.downcast_ref::<E>() {
815 recipient.send(msg.clone().into())?;
816 }
817 }
818
819 event_subscribers.subscribe_recipient::<M, E>(recipient);
820 Ok(())
821 }
822
823 pub fn publish<E: Event>(&self, event: E) -> Result<(), PublishError> {
831 let event_subscribers = self.event_subscribers.read();
832 let type_id = TypeId::of::<E>();
833
834 event_subscribers.last_value_cache.insert(type_id, Box::new(event.clone()));
838
839 if let Some(subs) = event_subscribers.events.get(&type_id) {
840 let errors: Vec<SendError> = subs
841 .iter()
842 .filter_map(|subscriber_callback| subscriber_callback(&event).err())
843 .collect();
844 if !errors.is_empty() {
845 return Err(PublishError(errors));
846 }
847 }
848
849 Ok(())
850 }
851
852 pub fn is_running(&self) -> bool {
853 *self.system_state.read() == SystemState::Running
854 }
855}
856
857enum RegistryEntry {
858 InPlace(Sender<Control>, thread::Thread),
859 BackgroundThread(Sender<Control>, thread::JoinHandle<()>),
860}
861
862impl RegistryEntry {
863 fn name(&self) -> String {
864 match self {
865 RegistryEntry::InPlace(_, thread_handle) => {
866 thread_handle.name().unwrap_or("unnamed").to_owned()
867 },
868 RegistryEntry::BackgroundThread(_, join_handle) => {
869 join_handle.thread().name().unwrap_or("unnamed").to_owned()
870 },
871 }
872 }
873
874 fn control_addr(&mut self) -> &mut Sender<Control> {
875 match self {
876 RegistryEntry::InPlace(control_addr, _) => control_addr,
877 RegistryEntry::BackgroundThread(control_addr, _) => control_addr,
878 }
879 }
880}
881
882pub enum Control {
884 Stop,
886}
887
888pub trait Actor {
890 type Message: Send + 'static;
893 type Error: std::fmt::Display;
895 type Context;
897
898 const DEFAULT_CAPACITY_NORMAL: usize = 5;
900 const DEFAULT_CAPACITY_HIGH: usize = 5;
902
903 fn name() -> &'static str {
906 type_name::<Self>()
907 }
908
909 fn priority(_message: &Self::Message) -> Priority {
912 Priority::Normal
913 }
914
915 fn started(&mut self, _context: &mut Self::Context) -> Result<(), Self::Error> {
917 Ok(())
918 }
919
920 fn handle(
922 &mut self,
923 context: &mut Self::Context,
924 message: Self::Message,
925 ) -> Result<(), Self::Error>;
926
927 fn stopped(&mut self, _context: &mut Self::Context) -> Result<(), Self::Error> {
929 Ok(())
930 }
931
932 fn deadline_passed(
966 &mut self,
967 _context: &mut Self::Context,
968 _deadline: Instant,
969 ) -> Result<(), Self::Error> {
970 Ok(())
971 }
972}
973
974pub struct Addr<A: Actor + ?Sized> {
975 recipient: Recipient<A::Message>,
976 priority_rx: Receiver<A::Message>,
977 message_rx: Receiver<A::Message>,
978 control_rx: Receiver<Control>,
979}
980
981impl<A: Actor> Default for Addr<A> {
982 fn default() -> Self {
983 Self::with_capacity(Capacity::default())
984 }
985}
986
987impl<A: Actor> Clone for Addr<A> {
988 fn clone(&self) -> Self {
989 Self {
990 recipient: self.recipient.clone(),
991 priority_rx: self.priority_rx.clone(),
992 message_rx: self.message_rx.clone(),
993 control_rx: self.control_rx.clone(),
994 }
995 }
996}
997
998impl<A, M> Deref for Addr<A>
999where
1000 A: Actor<Message = M>,
1001{
1002 type Target = Recipient<M>;
1003
1004 fn deref(&self) -> &Self::Target {
1005 &self.recipient
1006 }
1007}
1008
1009impl<A: Actor> Addr<A> {
1010 pub fn with_capacity(capacity: impl Into<Capacity>) -> Self {
1012 let capacity: Capacity = capacity.into();
1013 let prio_capacity = capacity.high.unwrap_or(A::DEFAULT_CAPACITY_HIGH);
1014 let normal_capacity = capacity.normal.unwrap_or(A::DEFAULT_CAPACITY_NORMAL);
1015
1016 let (priority_tx, priority_rx) = flume::bounded::<A::Message>(prio_capacity);
1017 let (message_tx, message_rx) = flume::bounded::<A::Message>(normal_capacity);
1018 let (control_tx, control_rx) = flume::bounded(CONTROL_CHANNEL_CAPACITY);
1019
1020 let name = A::name();
1021 let message_tx = Arc::new(MessageSender {
1022 high: priority_tx,
1023 normal: message_tx,
1024 get_priority: A::priority,
1025 name,
1026 });
1027 Self {
1028 recipient: Recipient { message_tx, control_tx },
1029 priority_rx,
1030 message_rx,
1031 control_rx,
1032 }
1033 }
1034}
1035
1036#[derive(Clone, Copy, Debug)]
1038pub enum Priority {
1039 Normal,
1040 High,
1041}
1042
1043pub struct Recipient<M> {
1046 message_tx: Arc<dyn SenderTrait<M>>,
1047 control_tx: Sender<Control>,
1048}
1049
1050impl<M> Clone for Recipient<M> {
1053 fn clone(&self) -> Self {
1054 Self { message_tx: Arc::clone(&self.message_tx), control_tx: self.control_tx.clone() }
1055 }
1056}
1057
1058impl<M> Recipient<M> {
1059 pub fn send(&self, message: M) -> Result<(), SendError> {
1062 self.message_tx.try_send(message)
1063 }
1064}
1065
1066impl<M: 'static> Recipient<M> {
1067 pub fn recipient<N: Into<M>>(&self) -> Recipient<N> {
1073 Recipient {
1074 message_tx: Arc::new(Arc::clone(&self.message_tx)),
1076 control_tx: self.control_tx.clone(),
1077 }
1078 }
1079}
1080
1081pub trait SendResultExt {
1082 fn on_full<F: FnOnce(&'static str, Priority)>(self, func: F) -> Result<(), DisconnectedError>;
1085
1086 fn ignore_on_full(self) -> Result<(), DisconnectedError>;
1088}
1089
1090impl SendResultExt for Result<(), SendError> {
1091 fn on_full<F: FnOnce(&'static str, Priority)>(
1092 self,
1093 callback: F,
1094 ) -> Result<(), DisconnectedError> {
1095 self.or_else(|e| match e {
1096 SendError { recipient_name, priority, reason: SendErrorReason::Full } => {
1097 callback(recipient_name, priority);
1098 Ok(())
1099 },
1100 SendError { recipient_name, priority, reason: SendErrorReason::Disconnected } => {
1101 Err(DisconnectedError { recipient_name, priority })
1102 },
1103 })
1104 }
1105
1106 fn ignore_on_full(self) -> Result<(), DisconnectedError> {
1107 self.on_full(|_, _| ())
1108 }
1109}
1110
1111struct MessageSender<M> {
1113 high: Sender<M>,
1114 normal: Sender<M>,
1115 get_priority: fn(&M) -> Priority,
1116 name: &'static str,
1118}
1119
1120trait SenderTrait<M>: Send + Sync {
1122 fn try_send(&self, message: M) -> Result<(), SendError>;
1123}
1124
1125impl<M: Send> SenderTrait<M> for MessageSender<M> {
1127 fn try_send(&self, message: M) -> Result<(), SendError> {
1128 let priority = (self.get_priority)(&message);
1129 let sender = match priority {
1130 Priority::Normal => &self.normal,
1131 Priority::High => &self.high,
1132 };
1133 sender.try_send(message).map_err(|e| SendError {
1134 reason: e.into(),
1135 recipient_name: self.name,
1136 priority,
1137 })
1138 }
1139}
1140
1141impl<M: Into<N>, N> SenderTrait<M> for Arc<dyn SenderTrait<N>> {
1143 fn try_send(&self, message: M) -> Result<(), SendError> {
1144 self.deref().try_send(message.into())
1145 }
1146}
1147
1148#[cfg(test)]
1149mod tests {
1150 use std::{
1151 rc::Rc,
1152 sync::atomic::{AtomicU32, Ordering},
1153 time::Duration,
1154 };
1155
1156 use super::*;
1157
1158 struct TestActor;
1159 impl Actor for TestActor {
1160 type Context = Context<Self::Message>;
1161 type Error = String;
1162 type Message = usize;
1163
1164 fn name() -> &'static str {
1165 "TestActor"
1169 }
1170
1171 fn handle(&mut self, _: &mut Self::Context, message: usize) -> Result<(), String> {
1172 println!("message: {message}");
1173 Ok(())
1174 }
1175
1176 fn started(&mut self, _: &mut Self::Context) -> Result<(), String> {
1177 println!("started");
1178 Ok(())
1179 }
1180
1181 fn stopped(&mut self, _: &mut Self::Context) -> Result<(), String> {
1182 println!("stopped");
1183 Ok(())
1184 }
1185 }
1186
1187 #[test]
1188 fn it_works() {
1189 let mut system = System::new("hi");
1190 let address = system.spawn(TestActor).unwrap();
1191 let _ = system.spawn(TestActor).unwrap();
1192 let _ = system.spawn(TestActor).unwrap();
1193 let _ = system.spawn(TestActor).unwrap();
1194 let _ = system.spawn(TestActor).unwrap();
1195 address.send(1337usize).unwrap();
1196 address.send(666usize).unwrap();
1197 address.send(1usize).unwrap();
1198 thread::sleep(Duration::from_millis(100));
1199
1200 system.shutdown().unwrap();
1201 thread::sleep(Duration::from_millis(100));
1202 }
1203
1204 #[test]
1205 fn test_ignore_on_full() {
1206 let mut system = System::new("hi");
1207 let address = system.prepare(TestActor).with_capacity(1).spawn().unwrap();
1208 address.send(1337usize).unwrap();
1209 assert!(address.send(666usize).is_err());
1210 address.send(666usize).ignore_on_full().unwrap();
1211
1212 thread::sleep(Duration::from_millis(100));
1213
1214 system.shutdown().unwrap();
1215 thread::sleep(Duration::from_millis(100));
1216 }
1217
1218 #[test]
1219 fn send_constraints() {
1220 #[derive(Default)]
1221 struct LocalActor {
1222 _ensure_not_send_not_sync: Rc<()>,
1223 }
1224 impl Actor for LocalActor {
1225 type Context = Context<Self::Message>;
1226 type Error = String;
1227 type Message = ();
1228
1229 fn handle(&mut self, _: &mut Self::Context, _: ()) -> Result<(), String> {
1230 Ok(())
1231 }
1232
1233 fn started(&mut self, ctx: &mut Self::Context) -> Result<(), String> {
1235 ctx.system_handle.shutdown().map_err(|e| e.to_string())
1236 }
1237 }
1238
1239 let mut system = System::new("main");
1240
1241 let _ = system.prepare_fn(LocalActor::default).with_default_capacity().spawn().unwrap();
1243
1244 system.prepare(LocalActor::default()).with_default_capacity().run_and_block().unwrap();
1246
1247 system.shutdown().unwrap();
1248 }
1249
1250 #[test]
1251 fn timeouts() {
1252 struct TimeoutActor {
1253 handle_count: Arc<AtomicU32>,
1254 timeout_count: Arc<AtomicU32>,
1255 }
1256
1257 impl Actor for TimeoutActor {
1258 type Context = Context<Self::Message>;
1259 type Error = String;
1260 type Message = Option<Instant>;
1261
1262 fn handle(
1263 &mut self,
1264 ctx: &mut Self::Context,
1265 msg: Self::Message,
1266 ) -> Result<(), String> {
1267 self.handle_count.fetch_add(1, Ordering::SeqCst);
1268 if msg.is_some() {
1269 ctx.receive_deadline = msg;
1270 }
1271 Ok(())
1272 }
1273
1274 fn deadline_passed(&mut self, _: &mut Self::Context, _: Instant) -> Result<(), String> {
1275 self.timeout_count.fetch_add(1, Ordering::SeqCst);
1276 Ok(())
1277 }
1278 }
1279
1280 let mut system = System::new("timeouts");
1281 let (handle_count, timeout_count) = (Default::default(), Default::default());
1282 let actor = TimeoutActor {
1283 handle_count: Arc::clone(&handle_count),
1284 timeout_count: Arc::clone(&timeout_count),
1285 };
1286 let addr = system.spawn(actor).unwrap();
1287
1288 addr.send(Some(Instant::now().checked_sub(Duration::from_secs(1)).unwrap())).unwrap();
1290 thread::sleep(Duration::from_millis(10));
1291 assert_eq!(handle_count.load(Ordering::SeqCst), 1);
1292 assert_eq!(timeout_count.load(Ordering::SeqCst), 1);
1293
1294 addr.send(Some(Instant::now() + Duration::from_millis(20))).unwrap();
1296 thread::sleep(Duration::from_millis(10));
1297 assert_eq!(handle_count.load(Ordering::SeqCst), 2);
1298 assert_eq!(timeout_count.load(Ordering::SeqCst), 1);
1299 thread::sleep(Duration::from_millis(20));
1300 assert_eq!(handle_count.load(Ordering::SeqCst), 2);
1301 assert_eq!(timeout_count.load(Ordering::SeqCst), 2);
1302
1303 addr.send(Some(Instant::now() + Duration::from_millis(40))).unwrap();
1305 thread::sleep(Duration::from_millis(20));
1306 assert_eq!(handle_count.load(Ordering::SeqCst), 3);
1307 assert_eq!(timeout_count.load(Ordering::SeqCst), 2);
1308 addr.send(None).unwrap();
1309 thread::sleep(Duration::from_millis(30));
1310 assert_eq!(handle_count.load(Ordering::SeqCst), 4);
1311 assert_eq!(timeout_count.load(Ordering::SeqCst), 3);
1312
1313 system.shutdown().unwrap();
1314 }
1315
1316 #[test]
1317 fn errors() {
1318 let mut system = System::new("hi");
1319 let low_capacity_actor: Addr<TestActor> = Addr::with_capacity(1);
1320 let stopped_actor = system.spawn(TestActor).unwrap().recipient();
1322
1323 low_capacity_actor.send(9).expect("one message should fit");
1324 let error = low_capacity_actor.send(123).unwrap_err();
1325 assert_eq!(
1326 error.to_string(),
1327 "the capacity of TestActor's Normal-priority channel is full"
1328 );
1329 assert_eq!(
1330 format!("{error:?}"),
1331 r#"SendError { recipient_name: "TestActor", priority: Normal, reason: Full }"#
1332 );
1333
1334 system.shutdown().unwrap();
1335
1336 let error = stopped_actor.send(456usize).unwrap_err();
1337 assert_eq!(error.to_string(), "the recipient of the message (TestActor) no longer exists");
1338 assert_eq!(
1339 format!("{error:?}"),
1340 r#"SendError { recipient_name: "TestActor", priority: Normal, reason: Disconnected }"#
1341 );
1342 }
1343
1344 #[test]
1345 fn message_priorities() {
1346 env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("trace")).init();
1347
1348 struct PriorityActor {
1349 received: Arc<Mutex<Vec<usize>>>,
1350 }
1351
1352 impl Actor for PriorityActor {
1353 type Context = Context<Self::Message>;
1354 type Error = String;
1355 type Message = usize;
1356
1357 fn handle(
1358 &mut self,
1359 context: &mut Self::Context,
1360 message: Self::Message,
1361 ) -> Result<(), Self::Error> {
1362 let mut received = self.received.lock();
1363 received.push(message);
1364 if received.len() >= 20 {
1365 context.system_handle.shutdown().unwrap();
1366 }
1367 Ok(())
1368 }
1369
1370 fn priority(message: &Self::Message) -> Priority {
1371 if *message >= 10 {
1372 Priority::High
1373 } else {
1374 Priority::Normal
1375 }
1376 }
1377 }
1378
1379 let addr = Addr::with_capacity(10);
1380 let received = Arc::new(Mutex::new(Vec::<usize>::new()));
1381
1382 for message in 0..20usize {
1384 addr.send(message).unwrap();
1385 }
1386
1387 let mut system = System::new("priorities");
1388 system
1389 .prepare(PriorityActor { received: Arc::clone(&received) })
1390 .with_addr(addr)
1391 .run_and_block()
1392 .unwrap();
1393
1394 assert_eq!(
1395 *received.lock(),
1396 [10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
1397 );
1398 }
1399
1400 impl Event for () {}
1401
1402 #[test]
1403 fn last_cached_event() {
1404 struct Subscriber;
1405 impl Actor for Subscriber {
1406 type Context = Context<Self::Message>;
1407 type Error = String;
1408 type Message = ();
1409
1410 fn started(&mut self, context: &mut Self::Context) -> Result<(), String> {
1411 context.subscribe_and_receive_latest::<Self::Message>().map_err(|e| e.to_string())
1412 }
1413
1414 fn handle(
1415 &mut self,
1416 context: &mut Self::Context,
1417 _: Self::Message,
1418 ) -> Result<(), Self::Error> {
1419 println!("Event received!");
1420 context.system_handle.shutdown().unwrap();
1421 Ok(())
1422 }
1423 }
1424
1425 let mut system = System::new("last cached event");
1426 system.publish(()).expect("can publish event");
1427
1428 system
1430 .prepare(Subscriber)
1431 .with_addr(Addr::with_capacity(1))
1432 .run_and_block()
1433 .expect("actor finishes successfully");
1434 }
1435}