1use crate::channel::{self, SendAsync};
2use crate::{
3 Actor, ActorError, CHANNEL_SIZE, JobController, JobSpec, LifeCycle, Mailbox, MaybeCodec,
4 RunJobResult,
5};
6use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8
9#[cfg(feature = "multi-node")]
10use crate::inter_node::InterNodeRuntime;
11
12pub enum ActorSystemCmd {
21 Register {
25 actor_type: String,
26 #[cfg(not(feature = "multi-node"))]
27 address: String,
28 #[cfg(feature = "multi-node")]
29 address: crate::inter_node::Address,
30 mailbox: Arc<dyn Mailbox>,
31 restart_tx: channel::Sender<()>,
32 kill_tx: channel::Sender<()>,
33 life_cycle: LifeCycle,
34 result_tx: tokio::sync::oneshot::Sender<Result<(), ActorError>>,
35 is_restarted: bool,
36 },
37 Restart {
40 address_regex: String,
41 },
42 Unregister {
45 address_regex: String,
46 },
47 FilterAddress {
50 address_regex: String,
51 result_tx: tokio::sync::oneshot::Sender<Vec<String>>,
52 },
53 FindActor {
58 actor_type: String,
59 address: String,
60 result_tx: tokio::sync::oneshot::Sender<
61 Option<(Arc<dyn Mailbox>, bool)>, >,
63 },
64 SetLifeCycle {
67 address: String,
68 life_cycle: LifeCycle,
69 },
70 RegisterJob {
74 job_id: String,
75 controller: JobController,
76 },
77 FindJob {
80 job_id: String,
81 result_tx: tokio::sync::oneshot::Sender<Option<JobController>>,
82 },
83}
84
85#[derive(Clone)]
86pub struct ActorSystem {
102 handler_tx: channel::Sender<ActorSystemCmd>,
103 cache: HashMap<String, (String, Arc<dyn Mailbox>)>,
104 channel_size: usize,
105 #[cfg(feature = "multi-node")]
107 node_name: String,
108 #[cfg(feature = "multi-node")]
111 inter_node: Option<InterNodeRuntime>,
112}
113
114#[cfg(not(feature = "multi-node"))]
115impl Default for ActorSystem {
116 fn default() -> Self {
117 let (handler_tx, handler_rx) = channel::channel(CHANNEL_SIZE);
118 let mut me = Self {
119 handler_tx,
120 cache: HashMap::new(),
121 channel_size: CHANNEL_SIZE,
122 };
123 me.run(handler_rx);
124 me
125 }
126}
127
128impl ActorSystem {
129 #[cfg(not(feature = "multi-node"))]
136 pub fn new(channel_size: Option<usize>) -> Self {
137 let (handler_tx, handler_rx) =
138 channel::channel(channel_size.unwrap_or(CHANNEL_SIZE));
139 let mut me = Self {
140 handler_tx,
141 cache: HashMap::new(),
142 channel_size: channel_size.unwrap_or(CHANNEL_SIZE),
143 };
144 me.run(handler_rx);
145 me
146 }
147
148 #[cfg(feature = "multi-node")]
167 pub async fn new(
168 channel_size: Option<usize>,
169 node_name: String,
170 broker_addr: Option<String>,
171 ) -> Result<Self, ActorError> {
172 let (handler_tx, handler_rx) =
173 channel::channel(channel_size.unwrap_or(CHANNEL_SIZE));
174 let inter_node = match broker_addr {
175 Some(addr) => Some(InterNodeRuntime::connect(node_name.clone(), addr).await?),
176 None => None,
177 };
178 let mut me = Self {
179 handler_tx,
180 cache: HashMap::new(),
181 channel_size: channel_size.unwrap_or(CHANNEL_SIZE),
182 node_name,
183 inter_node: inter_node.clone(),
184 };
185 me.run(handler_rx);
186 if let Some(rt) = inter_node {
187 rt.start_consumers(me.clone()).await?;
188 }
189 Ok(me)
190 }
191
192 #[cfg(feature = "multi-node")]
194 pub fn node_name(&self) -> &str {
195 &self.node_name
196 }
197
198 #[cfg(feature = "multi-node")]
209 pub async fn dispatch_local_any(
210 &self,
211 actor_type: String,
212 address: String,
213 payload: Arc<dyn std::any::Any + Send + Sync>,
214 ) -> Result<(), ActorError> {
215 let (tx, rx) = tokio::sync::oneshot::channel();
216 let _ = self
217 .handler_tx
218 .send_async(ActorSystemCmd::FindActor {
219 actor_type,
220 address: address.clone(),
221 result_tx: tx,
222 })
223 .await;
224 if let Ok(Some((mailbox, ready))) = rx.await {
225 if ready {
226 mailbox.send(payload).await
227 } else {
228 Err(ActorError::ActorNotReady(address))
229 }
230 } else {
231 Err(ActorError::AddressNotFound(address))
232 }
233 }
234
235 #[cfg(feature = "multi-node")]
244 pub async fn dispatch_local_any_and_recv(
245 &self,
246 actor_type: String,
247 address: String,
248 payload: Arc<dyn std::any::Any + Send + Sync>,
249 ) -> Result<Box<dyn std::any::Any + Send>, ActorError> {
250 let (tx, rx) = tokio::sync::oneshot::channel();
251 let _ = self
252 .handler_tx
253 .send_async(ActorSystemCmd::FindActor {
254 actor_type,
255 address: address.clone(),
256 result_tx: tx,
257 })
258 .await;
259 if let Ok(Some((mailbox, ready))) = rx.await {
260 if ready {
261 mailbox.send_and_recv(payload).await
262 } else {
263 Err(ActorError::ActorNotReady(address))
264 }
265 } else {
266 Err(ActorError::AddressNotFound(address))
267 }
268 }
269
270 #[cfg(feature = "multi-node")]
274 async fn spawn_remote_job<T>(
275 &self,
276 address: crate::inter_node::Address,
277 subscribe: bool,
278 job: JobSpec,
279 msg: <T as Actor>::Message,
280 job_id: String,
281 rt: crate::inter_node::InterNodeRuntime,
282 ) -> Result<RunJobResult<T>, ActorError>
283 where
284 T: Actor,
285 <T as Actor>::Message: MaybeCodec,
286 <T as Actor>::Result: MaybeCodec,
287 {
288 let payload_bytes = <<T as Actor>::Message as xancode::Codec>::encode(&msg).to_vec();
289 let actor_type = std::any::type_name::<T>().to_string();
290 let channel_size = self.channel_size;
291
292 let (abort_tx, mut abort_rx) = channel::channel(channel_size);
293 let (stop_tx, mut stop_rx) = channel::channel(channel_size);
294 let (resume_tx, mut resume_rx) = channel::channel(channel_size);
295
296 let result_subscriber_rx = if subscribe {
297 let (sub_tx, sub_rx) = channel::channel(channel_size);
298 let rt = rt.clone();
299 let address = address.clone();
300 let actor_type = actor_type.clone();
301 let payload_bytes = payload_bytes.clone();
302 tokio::spawn(async move {
303 let mut i = 0usize;
304 loop {
305 let until_start = job
306 .start_at()
307 .duration_since(std::time::SystemTime::now())
308 .unwrap_or(std::time::Duration::ZERO);
309 if !delay_or_abort(until_start, &mut abort_rx, &mut stop_rx, &mut resume_rx)
310 .await
311 {
312 drop(sub_tx);
313 return;
314 }
315 i += 1;
316 let outcome: Result<<T as Actor>::Result, ActorError> = match rt
317 .call(&address, &actor_type, payload_bytes.clone())
318 .await
319 {
320 Ok(bytes) => <<T as Actor>::Result as xancode::Codec>::decode(
321 &xancode::Bytes::copy_from_slice(&bytes),
322 )
323 .map_err(|_| {
324 ActorError::InterNodeDecode(format!(
325 "decode failed for {}",
326 std::any::type_name::<<T as Actor>::Result>()
327 ))
328 }),
329 Err(e) => Err(e),
330 };
331 let _ = sub_tx.send_async(outcome).await;
332 if let Some(max_iter) = job.max_iter() {
333 if i >= max_iter {
334 drop(sub_tx);
335 return;
336 }
337 }
338 let interval = match job.interval() {
339 Some(d) => d,
340 None => {
341 drop(sub_tx);
342 return;
343 }
344 };
345 if !delay_or_abort(interval, &mut abort_rx, &mut stop_rx, &mut resume_rx).await
346 {
347 drop(sub_tx);
348 return;
349 }
350 }
351 });
352 Some(sub_rx)
353 } else {
354 let rt = rt.clone();
355 tokio::spawn(async move {
356 let mut i = 0usize;
357 loop {
358 let until_start = job
359 .start_at()
360 .duration_since(std::time::SystemTime::now())
361 .unwrap_or(std::time::Duration::ZERO);
362 if !delay_or_abort(until_start, &mut abort_rx, &mut stop_rx, &mut resume_rx)
363 .await
364 {
365 return;
366 }
367 i += 1;
368 let _ = rt
369 .fire(&address, &actor_type, payload_bytes.clone())
370 .await;
371 if let Some(max_iter) = job.max_iter() {
372 if i >= max_iter {
373 return;
374 }
375 }
376 let interval = match job.interval() {
377 Some(d) => d,
378 None => return,
379 };
380 if !delay_or_abort(interval, &mut abort_rx, &mut stop_rx, &mut resume_rx).await
381 {
382 return;
383 }
384 }
385 });
386 None
387 };
388
389 let _ = self
390 .handler_tx
391 .send_async(ActorSystemCmd::RegisterJob {
392 job_id: job_id.clone(),
393 controller: JobController {
394 abort_tx,
395 stop_tx,
396 resume_tx,
397 },
398 })
399 .await;
400
401 Ok(RunJobResult {
402 job_id,
403 result_subscriber_rx,
404 })
405 }
406
407 pub fn handler_tx(&self) -> channel::Sender<ActorSystemCmd> {
416 self.handler_tx.clone()
417 }
418
419 pub async fn filter_address(&self, address_regex: String) -> Vec<String> {
425 let (tx, rx) = tokio::sync::oneshot::channel();
426 let _ = self
427 .handler_tx
428 .send_async(ActorSystemCmd::FilterAddress {
429 address_regex,
430 result_tx: tx,
431 })
432 .await;
433 match rx.await {
434 Ok(addresses) => addresses,
435 Err(e) => {
436 error!("Receive address list failed: {:?}", e);
437 Vec::new()
438 }
439 }
440 }
441
442 pub fn restart(&mut self, address_regex: String) {
454 if let Err(e) =
455 channel::try_send(&self.handler_tx, ActorSystemCmd::Restart { address_regex })
456 {
457 error!("Send restart command failed: {}", e);
458 }
459 }
460
461 pub fn unregister(&mut self, address_regex: String) {
469 if let Err(e) =
470 channel::try_send(&self.handler_tx, ActorSystemCmd::Unregister { address_regex })
471 {
472 error!("Send unregister command failed: {}", e);
473 }
474 }
475
476 pub async fn send<T>(
489 &mut self,
490 #[cfg(not(feature = "multi-node"))] address: String,
491 #[cfg(feature = "multi-node")] address: crate::inter_node::Address,
492 msg: <T as Actor>::Message,
493 ) -> Result<(), ActorError>
494 where
495 T: Actor,
496 <T as Actor>::Message: MaybeCodec,
497 {
498 #[cfg(feature = "multi-node")]
499 if address.node != self.node_name {
500 let rt = self
501 .inter_node
502 .as_ref()
503 .ok_or(ActorError::InterNodeNotConfigured)?
504 .clone();
505 let payload = <<T as Actor>::Message as xancode::Codec>::encode(&msg).to_vec();
506 return rt
507 .fire(&address, std::any::type_name::<T>(), payload)
508 .await;
509 }
510 #[cfg(feature = "multi-node")]
511 let address: String = address.name;
512
513 let mut retry_count = 0;
514 let actor_type = std::any::type_name::<T>();
515 let payload: Arc<dyn std::any::Any + Send + Sync> = Arc::new(msg);
516 loop {
517 let (tx, rx) = tokio::sync::oneshot::channel();
518 match self.cache.entry(address.clone()) {
519 std::collections::hash_map::Entry::Occupied(o) => {
520 let (cached_actor_type, mailbox) = o.get().clone();
521 if actor_type == cached_actor_type {
522 match mailbox.send(payload.clone()).await {
523 Ok(()) => {
524 debug!(
525 "Send message to actor {} through cached_tx succeeded",
526 address
527 );
528 return Ok(());
529 }
530 Err(e) => {
531 warn!(
532 "Send message to actor {} through cached_tx failed: {:?} ... removing from cache",
533 address, e
534 );
535 self.cache.remove(&address);
536 }
537 }
538 } else {
539 warn!(
540 "Send message with cached tx failed: cached tx of address {} and target actor {} is mismatched ... removing from cache",
541 address, actor_type,
542 );
543 self.cache.remove(&address);
544 }
545 }
546 _ => {}
547 }
548 let _ = self
549 .handler_tx
550 .send_async(ActorSystemCmd::FindActor {
551 actor_type: actor_type.to_string(),
552 address: address.clone(),
553 result_tx: tx,
554 })
555 .await;
556 if let Ok(Some((tx, ready))) = rx.await {
557 if ready {
558 debug!("Saving actor {} tx to cache", address);
559 self.cache
560 .insert(address.clone(), (actor_type.to_string(), tx.clone()));
561 let _ = tx.send(payload.clone()).await?;
562 return Ok(());
563 } else {
564 retry_count += 1;
565 debug!(
566 "Actor {} not ready, retrying... ({}/10)",
567 address, retry_count
568 );
569 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
570 if retry_count < 10 {
571 continue;
572 } else {
573 error!("Actor {} not ready after 10 retries, giving up", address);
574 return Err(ActorError::ActorNotReady(address));
575 }
576 }
577 } else {
578 return Err(ActorError::AddressNotFound(address));
579 }
580 }
581 }
582
583 pub async fn send_without_tx_cache<T>(
593 &self,
594 #[cfg(not(feature = "multi-node"))] address: String,
595 #[cfg(feature = "multi-node")] address: crate::inter_node::Address,
596 msg: <T as Actor>::Message,
597 ) -> Result<(), ActorError>
598 where
599 T: Actor,
600 <T as Actor>::Message: MaybeCodec,
601 {
602 #[cfg(feature = "multi-node")]
603 if address.node != self.node_name {
604 let rt = self
605 .inter_node
606 .as_ref()
607 .ok_or(ActorError::InterNodeNotConfigured)?
608 .clone();
609 let payload = <<T as Actor>::Message as xancode::Codec>::encode(&msg).to_vec();
610 return rt
611 .fire(&address, std::any::type_name::<T>(), payload)
612 .await;
613 }
614 #[cfg(feature = "multi-node")]
615 let address: String = address.name;
616
617 let mut retry_count = 0;
618 let actor_type = std::any::type_name::<T>();
619 let payload: Arc<dyn std::any::Any + Send + Sync> = Arc::new(msg);
620 loop {
621 let (tx, rx) = tokio::sync::oneshot::channel();
622 let _ = self
623 .handler_tx
624 .send_async(ActorSystemCmd::FindActor {
625 actor_type: actor_type.to_string(),
626 address: address.clone(),
627 result_tx: tx,
628 })
629 .await;
630 if let Ok(Some((tx, ready))) = rx.await {
631 if ready {
632 let _ = tx.send(payload.clone()).await?;
633 return Ok(());
634 } else {
635 retry_count += 1;
636 debug!(
637 "Actor {} not ready, retrying... ({}/10)",
638 address, retry_count
639 );
640 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
641 if retry_count < 10 {
642 continue;
643 } else {
644 error!("Actor {} not ready after 10 retries, giving up", address);
645 return Err(ActorError::ActorNotReady(address));
646 }
647 }
648 } else {
649 return Err(ActorError::AddressNotFound(address));
650 }
651 }
652 }
653
654 async fn broadcast_local_cached<T>(
656 &mut self,
657 address_regex: String,
658 msg: <T as Actor>::Message,
659 ) -> Vec<Result<(), ActorError>>
660 where
661 T: Actor,
662 {
663 let actor_type = std::any::type_name::<T>();
664 let (tx, rx) = tokio::sync::oneshot::channel();
665 let _ = self
666 .handler_tx
667 .send_async(ActorSystemCmd::FilterAddress {
668 address_regex,
669 result_tx: tx,
670 })
671 .await;
672 let addresses = match rx.await {
673 Ok(addresses) => addresses,
674 Err(e) => {
675 error!("Receive address list failed: {:?}", e);
676 return vec![Err(ActorError::from(e))];
677 }
678 };
679 let payload: Arc<dyn std::any::Any + Send + Sync> = Arc::new(msg);
680 let mut result = Vec::new();
681 for address in addresses.iter() {
682 match self.cache.entry(address.clone()) {
683 std::collections::hash_map::Entry::Occupied(o) => {
684 let (cached_actor_type, tx) = o.get().clone();
685 if cached_actor_type == actor_type {
686 match tx.send(payload.clone()).await {
687 Ok(()) => {
688 result.push(Ok(()));
689 continue;
690 }
691 Err(e) => {
692 warn!(
693 "Send message to actor {} through cached_tx failed: {:?} ... removing from cache",
694 address, e
695 );
696 self.cache.remove(address);
697 }
698 }
699 } else {
700 warn!(
701 "Send message with cached tx failed: cached tx of address {} and target actor {} is mismatched ... removing from cache",
702 address, actor_type,
703 );
704 self.cache.remove(address);
705 }
706 }
707 _ => {}
708 }
709 let mut retry_count = 0;
710 loop {
711 let (tx, rx) = tokio::sync::oneshot::channel();
712 let _ = self
713 .handler_tx
714 .send_async(ActorSystemCmd::FindActor {
715 actor_type: actor_type.to_string(),
716 address: address.clone(),
717 result_tx: tx,
718 })
719 .await;
720 if let Ok(Some((tx, ready))) = rx.await {
721 if ready {
722 self.cache
723 .insert(address.clone(), (actor_type.to_string(), tx.clone()));
724 result.push(tx.send(payload.clone()).await);
725 break;
726 } else {
727 retry_count += 1;
728 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
729 if retry_count < 10 {
730 continue;
731 } else {
732 error!("Actor {} not ready after 10 retries, giving up", address);
733 result.push(Err(ActorError::ActorNotReady(address.clone())));
734 break;
735 }
736 }
737 } else {
738 result.push(Err(ActorError::AddressNotFound(address.clone())));
739 break;
740 }
741 }
742 }
743 result
744 }
745
746 #[cfg(not(feature = "multi-node"))]
749 pub async fn send_broadcast<T>(
750 &mut self,
751 address_regex: String,
752 msg: <T as Actor>::Message,
753 ) -> Vec<Result<(), ActorError>>
754 where
755 T: Actor,
756 <T as Actor>::Message: MaybeCodec,
757 {
758 self.broadcast_local_cached::<T>(address_regex, msg).await
759 }
760
761 #[cfg(feature = "multi-node")]
781 pub async fn send_broadcast<T>(
782 &mut self,
783 address_regex: String,
784 filter: crate::inter_node::NodeFilter,
785 msg: <T as Actor>::Message,
786 ) -> crate::inter_node::BroadcastResult
787 where
788 T: Actor,
789 <T as Actor>::Message: MaybeCodec,
790 {
791 use crate::inter_node::{BroadcastResult, NodeFilter};
792 use std::collections::HashSet;
793
794 let (run_local, remote_nodes): (bool, Vec<String>) = {
795 let raw: Vec<String> = match filter {
796 NodeFilter::SelfOnly => vec![self.node_name.clone()],
797 NodeFilter::Node(n) => vec![n],
798 NodeFilter::Peers(ns) => ns,
799 };
800 let mut seen = HashSet::new();
801 let mut local = false;
802 let mut remote = Vec::new();
803 for node in raw {
804 if !seen.insert(node.clone()) {
805 continue;
806 }
807 if node == self.node_name {
808 local = true;
809 } else {
810 remote.push(node);
811 }
812 }
813 (local, remote)
814 };
815
816 let remote: Vec<Result<(), ActorError>> = if !remote_nodes.is_empty() {
817 let rt = match self.inter_node.as_ref() {
818 Some(rt) => rt.clone(),
819 None => {
820 return BroadcastResult {
821 local: Vec::new(),
822 remote: vec![Err(ActorError::InterNodeNotConfigured)],
823 };
824 }
825 };
826 let bytes = <<T as Actor>::Message as xancode::Codec>::encode(&msg).to_vec();
827 let actor_type = std::any::type_name::<T>();
828 let mut out = Vec::with_capacity(remote_nodes.len());
829 for node in &remote_nodes {
830 out.push(
831 rt.broadcast_fire(node, actor_type, &address_regex, bytes.clone())
832 .await,
833 );
834 }
835 out
836 } else {
837 Vec::new()
838 };
839
840 let local = if run_local {
841 self.broadcast_local_cached::<T>(address_regex, msg).await
842 } else {
843 Vec::new()
844 };
845
846 BroadcastResult { local, remote }
847 }
848
849 async fn broadcast_local_uncached<T>(
851 &self,
852 address_regex: String,
853 msg: <T as Actor>::Message,
854 ) -> Vec<Result<(), ActorError>>
855 where
856 T: Actor,
857 {
858 let actor_type = std::any::type_name::<T>();
859 let (tx, rx) = tokio::sync::oneshot::channel();
860 let _ = self
861 .handler_tx
862 .send_async(ActorSystemCmd::FilterAddress {
863 address_regex,
864 result_tx: tx,
865 })
866 .await;
867 let addresses = match rx.await {
868 Ok(addresses) => addresses,
869 Err(e) => {
870 error!("Receive address list failed: {:?}", e);
871 return vec![Err(ActorError::from(e))];
872 }
873 };
874 let payload: Arc<dyn std::any::Any + Send + Sync> = Arc::new(msg);
875 let mut result = Vec::new();
876 for address in addresses.iter() {
877 let mut retry_count = 0;
878 loop {
879 let (tx, rx) = tokio::sync::oneshot::channel();
880 let _ = self
881 .handler_tx
882 .send_async(ActorSystemCmd::FindActor {
883 actor_type: actor_type.to_string(),
884 address: address.clone(),
885 result_tx: tx,
886 })
887 .await;
888 if let Ok(Some((tx, ready))) = rx.await {
889 if ready {
890 result.push(tx.send(payload.clone()).await);
891 break;
892 } else {
893 retry_count += 1;
894 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
895 if retry_count < 10 {
896 continue;
897 } else {
898 error!("Actor {} not ready after 10 retries, giving up", address);
899 result.push(Err(ActorError::ActorNotReady(address.clone())));
900 break;
901 }
902 }
903 } else {
904 result.push(Err(ActorError::AddressNotFound(address.clone())));
905 break;
906 }
907 }
908 }
909 result
910 }
911
912 #[cfg(not(feature = "multi-node"))]
914 pub async fn send_broadcast_without_tx_cache<T>(
915 &self,
916 address_regex: String,
917 msg: <T as Actor>::Message,
918 ) -> Vec<Result<(), ActorError>>
919 where
920 T: Actor,
921 <T as Actor>::Message: MaybeCodec,
922 {
923 self.broadcast_local_uncached::<T>(address_regex, msg).await
924 }
925
926 #[cfg(feature = "multi-node")]
934 pub async fn send_broadcast_without_tx_cache<T>(
935 &self,
936 address_regex: String,
937 filter: crate::inter_node::NodeFilter,
938 msg: <T as Actor>::Message,
939 ) -> crate::inter_node::BroadcastResult
940 where
941 T: Actor,
942 <T as Actor>::Message: MaybeCodec,
943 {
944 use crate::inter_node::{BroadcastResult, NodeFilter};
945 use std::collections::HashSet;
946
947 let (run_local, remote_nodes): (bool, Vec<String>) = {
948 let raw: Vec<String> = match filter {
949 NodeFilter::SelfOnly => vec![self.node_name.clone()],
950 NodeFilter::Node(n) => vec![n],
951 NodeFilter::Peers(ns) => ns,
952 };
953 let mut seen = HashSet::new();
954 let mut local = false;
955 let mut remote = Vec::new();
956 for node in raw {
957 if !seen.insert(node.clone()) {
958 continue;
959 }
960 if node == self.node_name {
961 local = true;
962 } else {
963 remote.push(node);
964 }
965 }
966 (local, remote)
967 };
968
969 let remote: Vec<Result<(), ActorError>> = if !remote_nodes.is_empty() {
970 let rt = match self.inter_node.as_ref() {
971 Some(rt) => rt.clone(),
972 None => {
973 return BroadcastResult {
974 local: Vec::new(),
975 remote: vec![Err(ActorError::InterNodeNotConfigured)],
976 };
977 }
978 };
979 let bytes = <<T as Actor>::Message as xancode::Codec>::encode(&msg).to_vec();
980 let actor_type = std::any::type_name::<T>();
981 let mut out = Vec::with_capacity(remote_nodes.len());
982 for node in &remote_nodes {
983 out.push(
984 rt.broadcast_fire(node, actor_type, &address_regex, bytes.clone())
985 .await,
986 );
987 }
988 out
989 } else {
990 Vec::new()
991 };
992
993 let local = if run_local {
994 self.broadcast_local_uncached::<T>(address_regex, msg).await
995 } else {
996 Vec::new()
997 };
998
999 BroadcastResult { local, remote }
1000 }
1001
1002 pub async fn send_and_recv<T>(
1022 &mut self,
1023 #[cfg(not(feature = "multi-node"))] address: String,
1024 #[cfg(feature = "multi-node")] address: crate::inter_node::Address,
1025 msg: <T as Actor>::Message,
1026 ) -> Result<<T as Actor>::Result, ActorError>
1027 where
1028 T: Actor,
1029 <T as Actor>::Message: MaybeCodec,
1030 <T as Actor>::Result: MaybeCodec,
1031 {
1032 #[cfg(feature = "multi-node")]
1033 if address.node != self.node_name {
1034 let rt = self
1035 .inter_node
1036 .as_ref()
1037 .ok_or(ActorError::InterNodeNotConfigured)?
1038 .clone();
1039 let payload = <<T as Actor>::Message as xancode::Codec>::encode(&msg).to_vec();
1040 let bytes = rt
1041 .call(&address, std::any::type_name::<T>(), payload)
1042 .await?;
1043 let result = <<T as Actor>::Result as xancode::Codec>::decode(
1044 &xancode::Bytes::copy_from_slice(&bytes),
1045 )
1046 .map_err(|_| {
1047 ActorError::InterNodeDecode(format!(
1048 "decode failed for {}",
1049 std::any::type_name::<<T as Actor>::Result>()
1050 ))
1051 })?;
1052 return Ok(result);
1053 }
1054 #[cfg(feature = "multi-node")]
1055 let address: String = address.name;
1056
1057 let payload: Arc<dyn std::any::Any + Send + Sync> = Arc::new(msg);
1058 let actor_type = std::any::type_name::<T>();
1059 let mut retry_count = 0;
1060 loop {
1061 match self.cache.entry(address.clone()) {
1062 std::collections::hash_map::Entry::Occupied(o) => {
1063 let (cached_actor_type, tx) = o.get().clone();
1064 if cached_actor_type == actor_type {
1065 match tx.send_and_recv(payload.clone()).await {
1066 Ok(result_any) => {
1067 debug!(
1068 "Send message to actor {} through cached_tx succeeded",
1069 address
1070 );
1071 let result = result_any
1072 .downcast::<T::Result>()
1073 .map_err(|_| ActorError::MessageTypeMismatch)?;
1074 return Ok(*result);
1075 }
1076 Err(e) => {
1077 warn!(
1078 "Send message to actor {} through cached_tx failed: {:?} ... removing from cache",
1079 address, e
1080 );
1081 self.cache.remove(&address);
1082 }
1083 }
1084 } else {
1085 warn!(
1086 "Send message with cached tx failed: cached tx of address {} and target actor {} is mismatched ... removing from cache",
1087 address, actor_type,
1088 );
1089 self.cache.remove(&address);
1090 }
1091 }
1092 _ => {}
1093 }
1094 let (tx, rx) = tokio::sync::oneshot::channel();
1095 let _ = self
1096 .handler_tx
1097 .send_async(ActorSystemCmd::FindActor {
1098 actor_type: actor_type.to_string(),
1099 address: address.clone(),
1100 result_tx: tx,
1101 })
1102 .await;
1103 if let Ok(Some((tx, ready))) = rx.await {
1104 if ready {
1105 debug!("Saving actor {} tx to cache", address);
1106 self.cache
1107 .insert(address.clone(), (actor_type.to_string(), tx.clone()));
1108 let result_any = tx.send_and_recv(payload.clone()).await?;
1109 let result = result_any
1110 .downcast::<T::Result>()
1111 .map_err(|_| ActorError::MessageTypeMismatch)?;
1112 return Ok(*result);
1113 } else {
1114 retry_count += 1;
1115 debug!(
1116 "Actor {} not ready, retrying... ({}/10)",
1117 address, retry_count
1118 );
1119 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1120 if retry_count < 10 {
1121 continue;
1122 } else {
1123 error!("Actor {} not ready after 10 retries, giving up", address);
1124 return Err(ActorError::ActorNotReady(address));
1125 }
1126 }
1127 } else {
1128 return Err(ActorError::AddressNotFound(address));
1129 }
1130 }
1131 }
1132
1133 pub async fn send_and_recv_without_tx_cache<T>(
1138 &self,
1139 #[cfg(not(feature = "multi-node"))] address: String,
1140 #[cfg(feature = "multi-node")] address: crate::inter_node::Address,
1141 msg: <T as Actor>::Message,
1142 ) -> Result<<T as Actor>::Result, ActorError>
1143 where
1144 T: Actor,
1145 <T as Actor>::Message: MaybeCodec,
1146 <T as Actor>::Result: MaybeCodec,
1147 {
1148 #[cfg(feature = "multi-node")]
1149 if address.node != self.node_name {
1150 let rt = self
1151 .inter_node
1152 .as_ref()
1153 .ok_or(ActorError::InterNodeNotConfigured)?
1154 .clone();
1155 let payload = <<T as Actor>::Message as xancode::Codec>::encode(&msg).to_vec();
1156 let bytes = rt
1157 .call(&address, std::any::type_name::<T>(), payload)
1158 .await?;
1159 let result = <<T as Actor>::Result as xancode::Codec>::decode(
1160 &xancode::Bytes::copy_from_slice(&bytes),
1161 )
1162 .map_err(|_| {
1163 ActorError::InterNodeDecode(format!(
1164 "decode failed for {}",
1165 std::any::type_name::<<T as Actor>::Result>()
1166 ))
1167 })?;
1168 return Ok(result);
1169 }
1170 #[cfg(feature = "multi-node")]
1171 let address: String = address.name;
1172
1173 let payload: Arc<dyn std::any::Any + Send + Sync> = Arc::new(msg);
1174 let actor_type = std::any::type_name::<T>();
1175 let mut retry_count = 0;
1176 loop {
1177 let (tx, rx) = tokio::sync::oneshot::channel();
1178 let _ = self
1179 .handler_tx
1180 .send_async(ActorSystemCmd::FindActor {
1181 actor_type: actor_type.to_string(),
1182 address: address.clone(),
1183 result_tx: tx,
1184 })
1185 .await;
1186 if let Ok(Some((tx, ready))) = rx.await {
1187 if ready {
1188 let result_any = tx.send_and_recv(payload.clone()).await?;
1189 let result = result_any
1190 .downcast::<T::Result>()
1191 .map_err(|_| ActorError::MessageTypeMismatch)?;
1192 return Ok(*result);
1193 } else {
1194 retry_count += 1;
1195 debug!(
1196 "Actor {} not ready, retrying... ({}/10)",
1197 address, retry_count
1198 );
1199 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1200 if retry_count < 10 {
1201 continue;
1202 } else {
1203 error!("Actor {} not ready after 10 retries, giving up", address);
1204 return Err(ActorError::ActorNotReady(address));
1205 }
1206 }
1207 } else {
1208 return Err(ActorError::AddressNotFound(address));
1209 }
1210 }
1211 }
1212
1213 pub async fn send_and_recv_with_timeout<T>(
1232 &mut self,
1233 #[cfg(not(feature = "multi-node"))] address: String,
1234 #[cfg(feature = "multi-node")] address: crate::inter_node::Address,
1235 msg: <T as Actor>::Message,
1236 timeout: std::time::Duration,
1237 ) -> Result<<T as Actor>::Result, ActorError>
1238 where
1239 T: Actor,
1240 <T as Actor>::Message: MaybeCodec,
1241 <T as Actor>::Result: MaybeCodec,
1242 {
1243 match tokio::time::timeout(timeout, self.send_and_recv::<T>(address, msg)).await {
1244 Ok(result) => result,
1245 Err(_) => Err(ActorError::Timeout(timeout)),
1246 }
1247 }
1248
1249 pub async fn send_and_recv_without_tx_cache_with_timeout<T>(
1254 &self,
1255 #[cfg(not(feature = "multi-node"))] address: String,
1256 #[cfg(feature = "multi-node")] address: crate::inter_node::Address,
1257 msg: <T as Actor>::Message,
1258 timeout: std::time::Duration,
1259 ) -> Result<<T as Actor>::Result, ActorError>
1260 where
1261 T: Actor,
1262 <T as Actor>::Message: MaybeCodec,
1263 <T as Actor>::Result: MaybeCodec,
1264 {
1265 match tokio::time::timeout(
1266 timeout,
1267 self.send_and_recv_without_tx_cache::<T>(address, msg),
1268 )
1269 .await
1270 {
1271 Ok(result) => result,
1272 Err(_) => Err(ActorError::Timeout(timeout)),
1273 }
1274 }
1275
1276 pub async fn run_job<T>(
1302 &mut self,
1303 #[cfg(not(feature = "multi-node"))] address: String,
1304 #[cfg(feature = "multi-node")] address: crate::inter_node::Address,
1305 subscribe: bool,
1306 job: JobSpec,
1307 msg: <T as Actor>::Message,
1308 job_id: Option<String>,
1309 ) -> Result<RunJobResult<T>, ActorError>
1310 where
1311 T: Actor,
1312 <T as Actor>::Message: MaybeCodec,
1313 <T as Actor>::Result: MaybeCodec,
1314 {
1315 let job_id = job_id.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
1316
1317 #[cfg(feature = "multi-node")]
1318 if address.node != self.node_name {
1319 let rt = self
1320 .inter_node
1321 .as_ref()
1322 .ok_or(ActorError::InterNodeNotConfigured)?
1323 .clone();
1324 return self
1325 .spawn_remote_job::<T>(address, subscribe, job, msg, job_id, rt)
1326 .await;
1327 }
1328 #[cfg(feature = "multi-node")]
1329 let address: String = address.name;
1330
1331 let mut retry_count = 0;
1332 let payload: Arc<dyn std::any::Any + Send + Sync> = Arc::new(msg);
1333 let actor_type = std::any::type_name::<T>();
1334 let mailbox = loop {
1335 let (tx, rx) = tokio::sync::oneshot::channel();
1336 let _ = self
1337 .handler_tx
1338 .send_async(ActorSystemCmd::FindActor {
1339 actor_type: actor_type.to_string(),
1340 address: address.clone(),
1341 result_tx: tx,
1342 })
1343 .await;
1344 if let Ok(Some((mailbox, ready))) = rx.await {
1345 if ready {
1346 debug!("Saving actor {} tx to cache", address);
1347 if let None = self.cache.get(&address) {
1348 self.cache
1349 .insert(address.clone(), (actor_type.to_string(), mailbox.clone()));
1350 }
1351 break mailbox;
1352 } else {
1353 retry_count += 1;
1354 debug!(
1355 "Actor {} not ready, retrying... ({}/10)",
1356 address, retry_count
1357 );
1358 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1359 if retry_count < 10 {
1360 continue;
1361 } else {
1362 error!("Actor {} not ready after 10 retries, giving up", address);
1363 return Err(ActorError::ActorNotReady(address));
1364 }
1365 }
1366 } else {
1367 return Err(ActorError::AddressNotFound(address.clone()));
1368 }
1369 };
1370 if subscribe {
1371 let (sub_tx, sub_rx) = channel::channel(self.channel_size);
1372 let (abort_tx, mut abort_rx) = channel::channel(self.channel_size);
1373 let (stop_tx, mut stop_rx) = channel::channel(self.channel_size);
1374 let (resume_tx, mut resume_rx) = channel::channel(self.channel_size);
1375 let payload = payload.clone();
1376 let _ = tokio::spawn(async move {
1377 let mut i = 0usize;
1378 loop {
1379 let until_start = job
1380 .start_at()
1381 .duration_since(std::time::SystemTime::now())
1382 .unwrap_or(std::time::Duration::ZERO);
1383 if !delay_or_abort(until_start, &mut abort_rx, &mut stop_rx, &mut resume_rx)
1384 .await
1385 {
1386 drop(sub_tx);
1387 return;
1388 }
1389 i += 1;
1390 let result = match mailbox.send_and_recv(payload.clone()).await {
1391 Ok(result_any) => result_any
1392 .downcast::<T::Result>()
1393 .map(|x| Ok(*x))
1394 .unwrap_or_else(|_| Err(ActorError::MessageTypeMismatch)),
1395 Err(e) => Err(e),
1396 };
1397 let _ = sub_tx.send_async(result).await;
1398 if let Some(max_iter) = job.max_iter() {
1399 if i >= max_iter {
1400 drop(sub_tx);
1401 return;
1402 }
1403 }
1404 let interval = match job.interval() {
1405 Some(d) => d,
1406 None => {
1407 drop(sub_tx);
1408 return;
1409 }
1410 };
1411 if !delay_or_abort(interval, &mut abort_rx, &mut stop_rx, &mut resume_rx).await
1412 {
1413 drop(sub_tx);
1414 return;
1415 }
1416 }
1417 });
1418 let _ = self
1419 .handler_tx
1420 .send_async(ActorSystemCmd::RegisterJob {
1421 job_id: job_id.clone(),
1422 controller: JobController {
1423 abort_tx,
1424 stop_tx,
1425 resume_tx,
1426 },
1427 })
1428 .await;
1429 return Ok(RunJobResult {
1430 job_id,
1431 result_subscriber_rx: Some(sub_rx),
1432 });
1433 } else {
1434 let (abort_tx, mut abort_rx) = channel::channel(self.channel_size);
1435 let (stop_tx, mut stop_rx) = channel::channel(self.channel_size);
1436 let (resume_tx, mut resume_rx) = channel::channel(self.channel_size);
1437 let _ = tokio::spawn(async move {
1438 let mut i = 0usize;
1439 loop {
1440 let until_start = job
1441 .start_at()
1442 .duration_since(std::time::SystemTime::now())
1443 .unwrap_or(std::time::Duration::ZERO);
1444 if !delay_or_abort(until_start, &mut abort_rx, &mut stop_rx, &mut resume_rx)
1445 .await
1446 {
1447 return;
1448 }
1449 i += 1;
1450 let _ = mailbox.send(payload.clone()).await;
1451 if let Some(max_iter) = job.max_iter() {
1452 if i >= max_iter {
1453 return;
1454 }
1455 }
1456 let interval = match job.interval() {
1457 Some(d) => d,
1458 None => return,
1459 };
1460 if !delay_or_abort(interval, &mut abort_rx, &mut stop_rx, &mut resume_rx).await
1461 {
1462 return;
1463 }
1464 }
1465 });
1466 let _ = self
1467 .handler_tx
1468 .send_async(ActorSystemCmd::RegisterJob {
1469 job_id: job_id.clone(),
1470 controller: JobController {
1471 abort_tx,
1472 stop_tx,
1473 resume_tx,
1474 },
1475 })
1476 .await;
1477 return Ok(RunJobResult {
1478 job_id,
1479 result_subscriber_rx: None,
1480 });
1481 }
1482 }
1483
1484 pub async fn run_job_without_tx_cache<T>(
1490 &self,
1491 #[cfg(not(feature = "multi-node"))] address: String,
1492 #[cfg(feature = "multi-node")] address: crate::inter_node::Address,
1493 subscribe: bool,
1494 job: JobSpec,
1495 msg: <T as Actor>::Message,
1496 job_id: Option<String>,
1497 ) -> Result<RunJobResult<T>, ActorError>
1498 where
1499 T: Actor,
1500 <T as Actor>::Message: MaybeCodec,
1501 <T as Actor>::Result: MaybeCodec,
1502 {
1503 let job_id = job_id.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
1504
1505 #[cfg(feature = "multi-node")]
1506 if address.node != self.node_name {
1507 let rt = self
1508 .inter_node
1509 .as_ref()
1510 .ok_or(ActorError::InterNodeNotConfigured)?
1511 .clone();
1512 return self
1513 .spawn_remote_job::<T>(address, subscribe, job, msg, job_id, rt)
1514 .await;
1515 }
1516 #[cfg(feature = "multi-node")]
1517 let address: String = address.name;
1518
1519 let mut retry_count = 0;
1520 let payload: Arc<dyn std::any::Any + Send + Sync> = Arc::new(msg);
1521 let actor_type = std::any::type_name::<T>();
1522 let mailbox = loop {
1523 let (tx, rx) = tokio::sync::oneshot::channel();
1524 let _ = self
1525 .handler_tx
1526 .send_async(ActorSystemCmd::FindActor {
1527 actor_type: actor_type.to_string(),
1528 address: address.clone(),
1529 result_tx: tx,
1530 })
1531 .await;
1532 if let Ok(Some((mailbox, ready))) = rx.await {
1533 if ready {
1534 break mailbox;
1535 } else {
1536 retry_count += 1;
1537 debug!(
1538 "Actor {} not ready, retrying... ({}/10)",
1539 address, retry_count
1540 );
1541 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1542 if retry_count < 10 {
1543 continue;
1544 } else {
1545 error!("Actor {} not ready after 10 retries, giving up", address);
1546 return Err(ActorError::ActorNotReady(address));
1547 }
1548 }
1549 } else {
1550 return Err(ActorError::AddressNotFound(address.clone()));
1551 }
1552 };
1553 if subscribe {
1554 let (sub_tx, sub_rx) = channel::channel(self.channel_size);
1555 let payload = payload.clone();
1556 let (abort_tx, mut abort_rx) = channel::channel(self.channel_size);
1557 let (stop_tx, mut stop_rx) = channel::channel(self.channel_size);
1558 let (resume_tx, mut resume_rx) = channel::channel(self.channel_size);
1559 let _ = tokio::spawn(async move {
1560 let mut i = 0usize;
1561 loop {
1562 let until_start = job
1563 .start_at()
1564 .duration_since(std::time::SystemTime::now())
1565 .unwrap_or(std::time::Duration::ZERO);
1566 if !delay_or_abort(until_start, &mut abort_rx, &mut stop_rx, &mut resume_rx)
1567 .await
1568 {
1569 drop(sub_tx);
1570 return;
1571 }
1572 i += 1;
1573 let result = match mailbox.send_and_recv(payload.clone()).await {
1574 Ok(result_any) => result_any
1575 .downcast::<T::Result>()
1576 .map(|x| Ok(*x))
1577 .unwrap_or_else(|_| Err(ActorError::MessageTypeMismatch)),
1578 Err(e) => Err(e),
1579 };
1580 let _ = sub_tx.send_async(result).await;
1581 if let Some(max_iter) = job.max_iter() {
1582 if i >= max_iter {
1583 drop(sub_tx);
1584 return;
1585 }
1586 }
1587 let interval = match job.interval() {
1588 Some(d) => d,
1589 None => {
1590 drop(sub_tx);
1591 return;
1592 }
1593 };
1594 if !delay_or_abort(interval, &mut abort_rx, &mut stop_rx, &mut resume_rx).await
1595 {
1596 drop(sub_tx);
1597 return;
1598 }
1599 }
1600 });
1601 let _ = self
1602 .handler_tx
1603 .send_async(ActorSystemCmd::RegisterJob {
1604 job_id: job_id.clone(),
1605 controller: JobController {
1606 abort_tx,
1607 stop_tx,
1608 resume_tx,
1609 },
1610 })
1611 .await;
1612 return Ok(RunJobResult {
1613 job_id,
1614 result_subscriber_rx: Some(sub_rx),
1615 });
1616 } else {
1617 let (abort_tx, mut abort_rx) = channel::channel(self.channel_size);
1618 let (stop_tx, mut stop_rx) = channel::channel(self.channel_size);
1619 let (resume_tx, mut resume_rx) = channel::channel(self.channel_size);
1620 let _ = tokio::spawn(async move {
1621 let mut i = 0usize;
1622 loop {
1623 let until_start = job
1624 .start_at()
1625 .duration_since(std::time::SystemTime::now())
1626 .unwrap_or(std::time::Duration::ZERO);
1627 if !delay_or_abort(until_start, &mut abort_rx, &mut stop_rx, &mut resume_rx)
1628 .await
1629 {
1630 return;
1631 }
1632 i += 1;
1633 let _ = mailbox.send(payload.clone()).await;
1634 if let Some(max_iter) = job.max_iter() {
1635 if i >= max_iter {
1636 return;
1637 }
1638 }
1639 let interval = match job.interval() {
1640 Some(d) => d,
1641 None => return,
1642 };
1643 if !delay_or_abort(interval, &mut abort_rx, &mut stop_rx, &mut resume_rx).await
1644 {
1645 return;
1646 }
1647 }
1648 });
1649 let _ = self
1650 .handler_tx
1651 .send_async(ActorSystemCmd::RegisterJob {
1652 job_id: job_id.clone(),
1653 controller: JobController {
1654 abort_tx,
1655 stop_tx,
1656 resume_tx,
1657 },
1658 })
1659 .await;
1660 return Ok(RunJobResult {
1661 job_id,
1662 result_subscriber_rx: None,
1663 });
1664 }
1665 }
1666
1667 pub async fn abort_job(&self, job_id: String) {
1676 info!("Aborting job {}", job_id);
1677 let (tx, rx) = tokio::sync::oneshot::channel();
1678 let _ = self
1679 .handler_tx
1680 .send_async(ActorSystemCmd::FindJob {
1681 job_id: job_id.clone(),
1682 result_tx: tx,
1683 })
1684 .await;
1685 match rx.await {
1686 Ok(Some(controller)) => {
1687 let _ = controller.abort_tx.send_async(()).await;
1688 }
1689 Ok(None) => {
1690 error!("Job {} not found", job_id);
1691 }
1692 Err(e) => {
1693 error!("Find job {} failed: {}", job_id, e);
1694 }
1695 }
1696 }
1697
1698 pub async fn stop_job(&self, job_id: String) {
1705 info!("Stopping job {}", job_id);
1706 let (tx, rx) = tokio::sync::oneshot::channel();
1707 let _ = self
1708 .handler_tx
1709 .send_async(ActorSystemCmd::FindJob {
1710 job_id: job_id.clone(),
1711 result_tx: tx,
1712 })
1713 .await;
1714 match rx.await {
1715 Ok(Some(controller)) => {
1716 let _ = controller.stop_tx.send_async(()).await;
1717 }
1718 Ok(None) => {
1719 error!("Job {} not found", job_id);
1720 }
1721 Err(e) => {
1722 error!("Find job {} failed: {}", job_id, e);
1723 }
1724 }
1725 }
1726
1727 pub async fn resume_job(&self, job_id: String) {
1733 info!("Resuming job {}", job_id);
1734 let (tx, rx) = tokio::sync::oneshot::channel();
1735 let _ = self
1736 .handler_tx
1737 .send_async(ActorSystemCmd::FindJob {
1738 job_id: job_id.clone(),
1739 result_tx: tx,
1740 })
1741 .await;
1742 match rx.await {
1743 Ok(Some(controller)) => {
1744 let _ = controller.resume_tx.send_async(()).await;
1745 }
1746 Ok(None) => {
1747 error!("Job {} not found", job_id);
1748 }
1749 Err(e) => {
1750 error!("Find job {} failed: {}", job_id, e);
1751 }
1752 }
1753 }
1754
1755 fn run(
1756 &mut self,
1757 handler_rx: channel::Receiver<ActorSystemCmd>,
1758 ) -> tokio::task::JoinHandle<()> {
1759 #[cfg(feature = "multi-node")]
1760 let inter_node = self.inter_node.clone();
1761 let handle = tokio::task::spawn_blocking(move || {
1762 tokio::runtime::Handle::current().block_on(actor_system_loop(
1763 handler_rx,
1764 #[cfg(feature = "multi-node")]
1765 inter_node,
1766 ))
1767 });
1768 handle
1769 }
1770}
1771
1772async fn actor_system_loop(
1774 mut handler_rx: channel::Receiver<ActorSystemCmd>,
1775 #[cfg(feature = "multi-node")] inter_node: Option<InterNodeRuntime>,
1776) {
1777 let mut address_list = HashSet::<String>::new();
1778 let mut actor_map = HashMap::<
1779 String,
1780 (
1781 String,
1782 Arc<dyn Mailbox>,
1783 channel::Sender<()>,
1784 channel::Sender<()>,
1785 LifeCycle,
1786 ),
1787 >::new();
1788 let mut job_controllers = HashMap::new();
1789 while let Some(msg) = handler_rx.recv().await {
1790 match msg {
1791 ActorSystemCmd::Register {
1792 actor_type,
1793 address,
1794 mailbox,
1795 restart_tx,
1796 kill_tx,
1797 life_cycle,
1798 result_tx,
1799 is_restarted,
1800 } => {
1801 #[cfg(not(feature = "multi-node"))]
1802 let name: String = address;
1803 #[cfg(feature = "multi-node")]
1804 let name: String = {
1805 if let Some(rt) = inter_node.as_ref() {
1806 if address.node != rt.node_name() {
1807 let _ = result_tx
1808 .send(Err(ActorError::AddressNotOwned(address.to_string())));
1809 continue;
1810 }
1811 }
1812 address.name
1813 };
1814 debug!(
1815 "Register actor with address {} with type {}",
1816 name, actor_type
1817 );
1818 if actor_map.contains_key(&name) && !is_restarted {
1819 let _ = result_tx.send(Err(ActorError::AddressAlreadyExist(name)));
1820 continue;
1821 }
1822 actor_map.insert(
1823 name.clone(),
1824 (actor_type, mailbox, restart_tx, kill_tx, life_cycle),
1825 );
1826 address_list.insert(name);
1827 let _ = result_tx.send(Ok(()));
1828 }
1829 ActorSystemCmd::Restart { address_regex } => {
1830 debug!("Restart actor with address {}", address_regex);
1831 let addresses = match filter_address(&address_list, &address_regex) {
1832 Ok(addresses) => addresses,
1833 Err(e) => {
1834 error!("Filter address failed: {:?}", e);
1835 continue;
1836 }
1837 };
1838 for address in addresses {
1839 if let Some((_actor_type, _tx, restart_tx, _kill_tx, life_cycle)) =
1840 actor_map.get_mut(&address)
1841 {
1842 *life_cycle = LifeCycle::Restarting;
1843 let _ = restart_tx.send_async(()).await;
1844 }
1845 }
1846 }
1847 ActorSystemCmd::Unregister { address_regex } => {
1848 debug!("Unregister actor with address {}", address_regex);
1849 let addresses = match filter_address(&address_list, &address_regex) {
1850 Ok(addresses) => addresses,
1851 Err(e) => {
1852 error!("Filter address failed: {:?}", e);
1853 continue;
1854 }
1855 };
1856 for address in addresses {
1857 match actor_map.entry(address.to_string()) {
1858 std::collections::hash_map::Entry::Occupied(mut entry) => {
1859 let _ = entry.get_mut().3.send_async(()).await;
1860 entry.remove_entry();
1861 address_list.remove(&address);
1862 }
1863 std::collections::hash_map::Entry::Vacant(_) => {
1864 continue;
1865 }
1866 }
1867 }
1868 }
1869 ActorSystemCmd::FilterAddress {
1870 address_regex,
1871 result_tx,
1872 } => {
1873 debug!("FilterAddress with regex {}", address_regex);
1874 let addresses = match filter_address(&address_list, &address_regex) {
1875 Ok(addresses) => addresses,
1876 Err(e) => {
1877 error!("Filter address failed: {:?}", e);
1878 continue;
1879 }
1880 };
1881 let _ = result_tx.send(addresses);
1882 }
1883 ActorSystemCmd::FindActor {
1884 actor_type,
1885 address,
1886 result_tx,
1887 } => {
1888 debug!(
1889 "FindActor with address {} with type {}",
1890 address, actor_type
1891 );
1892 if let Some((target_actor_type, tx, _restart_tx, _kill_tx, life_cycle)) =
1893 actor_map.get(&address)
1894 {
1895 match life_cycle {
1896 LifeCycle::Receiving => {
1897 if *target_actor_type == actor_type {
1898 let _ = result_tx.send(Some((tx.clone(), true)));
1899 } else {
1900 let _ = result_tx.send(None);
1901 }
1902 }
1903 _ => {
1904 let _ = result_tx.send(Some((tx.clone(), false)));
1905 }
1906 }
1907 } else {
1908 let _ = result_tx.send(None);
1909 }
1910 }
1911 ActorSystemCmd::SetLifeCycle {
1912 address,
1913 life_cycle,
1914 } => {
1915 debug!(
1916 "SetLifecycle with address {} into {:?}",
1917 address, life_cycle
1918 );
1919 if let Some(actor) = actor_map.get_mut(&address) {
1920 actor.4 = life_cycle;
1921 };
1922 }
1923 ActorSystemCmd::RegisterJob { job_id, controller } => {
1924 debug!("RegisterJob with id {}", job_id);
1925 let _ = job_controllers.insert(job_id, controller);
1926 }
1927 ActorSystemCmd::FindJob { job_id, result_tx } => {
1928 debug!("FindJob with id {}", job_id);
1929 let _ = result_tx.send(job_controllers.get(&job_id).cloned());
1930 }
1931 };
1932 }
1933}
1934async fn delay_or_abort(
1951 duration: std::time::Duration,
1952 abort_rx: &mut channel::Receiver<()>,
1953 stop_rx: &mut channel::Receiver<()>,
1954 resume_rx: &mut channel::Receiver<()>,
1955) -> bool {
1956 if duration.is_zero() {
1957 return true;
1958 }
1959 tokio::select! {
1960 _ = tokio::time::sleep(duration) => true,
1961 _ = abort_rx.recv() => false,
1962 _ = stop_rx.recv() => {
1963 tokio::select! {
1964 _ = resume_rx.recv() => true,
1965 _ = abort_rx.recv() => false,
1966 }
1967 }
1968 }
1969}
1970fn filter_address(
1974 address_list: &HashSet<String>,
1975 regex: &str,
1976) -> Result<Vec<String>, regex::Error> {
1977 let regex = regex::Regex::new(&format!("^{}$", regex.replace("*", "(\\S+)"))).map_err(|e| {
1978 error!("Regex error: {:?}", e);
1979 e
1980 })?;
1981 Ok(address_list
1982 .iter()
1983 .filter(|x| regex.is_match(x))
1984 .map(|x| x.to_string())
1985 .collect())
1986}
1987