1use std::{marker::PhantomData, sync::Arc};
2
3use futures::{future::poll_fn, pin_mut};
4use once_cell::sync::Lazy;
5use tracing::{error, info, trace};
6
7use crate::{self as elfo};
8use elfo_macros::msg_raw as msg;
9
10use crate::{
11 actor::{Actor, ActorStatus},
12 addr::Addr,
13 address_book::AddressBook,
14 config::AnyConfig,
15 demux::Demux,
16 dumping::{self, Direction, Dump, Dumper, INTERNAL_CLASS},
17 envelope::{Envelope, MessageKind},
18 errors::{RequestError, SendError, TryRecvError, TrySendError},
19 mailbox::RecvResult,
20 message::{Message, Request},
21 messages,
22 request_table::ResponseToken,
23 routers::Singleton,
24 scope,
25 source::{Combined, Source},
26};
27
28use self::{budget::Budget, stats::Stats};
29
30mod budget;
31mod stats;
32
// Module-wide dumper, lazily constructed with `INTERNAL_CLASS` on first access.
// It records dumps of messages sent and received through a `Context`.
static DUMPER: Lazy<Dumper> = Lazy::new(|| Dumper::new(INTERNAL_CLASS));
34
/// An actor's handle for sending, requesting, responding and receiving messages.
///
/// * `C` is the type of the user config exposed by `config()`.
/// * `K` is the type of the key exposed by `key()`.
/// * `S` is the type of the attached source polled by `recv()`.
pub struct Context<C = (), K = Singleton, S = ()> {
    // Resolves addresses into objects.
    book: AddressBook,
    // Address of the actor owning this context.
    addr: Addr,
    // Address of the group, as returned by `group()`.
    group: Addr,
    // Selects recipient addresses for an envelope (`demux.filter(..)`).
    demux: Demux,
    // Current user config; replaced when `UpdateConfig` is received.
    config: Arc<C>,
    // Key returned by `key()`.
    key: K,
    // Additional source of envelopes polled alongside the mailbox in `recv()`.
    source: S,
    // Receiving lifecycle stage, see `Stage`.
    stage: Stage,
    // Counters updated on sends and receives.
    stats: Stats,
    // Acquired before each `recv()` iteration and decremented per received envelope.
    budget: Budget,
}
48
/// Receiving lifecycle of a `Context`.
#[derive(Clone, Copy, PartialEq)]
enum Stage {
    // Neither `recv()` nor `try_recv()` has run `on_first_recv` yet.
    PreRecv,
    // Set by `on_first_recv` on the first receive attempt.
    Working,
    // Set by `on_input_closed` once the mailbox reports closure.
    Closed,
}
55
// Compile-time assertion that the default `Context` implements `Send`.
assert_impl_all!(Context: Send);
57impl<C, K, S> Context<C, K, S> {
60 #[inline]
62 pub fn addr(&self) -> Addr {
63 self.addr
64 }
65
66 #[inline]
68 pub fn group(&self) -> Addr {
69 self.group
70 }
71
72 #[deprecated]
73 #[doc(hidden)]
74 #[cfg(feature = "test-util")]
75 pub fn set_addr(&mut self, addr: Addr) {
76 self.addr = addr;
77 }
78
79 #[inline]
81 pub fn config(&self) -> &C {
82 &self.config
83 }
84
85 #[inline]
87 pub fn key(&self) -> &K {
88 &self.key
89 }
90
91 pub fn with<S1>(self, source: S1) -> Context<C, K, Combined<S, S1>> {
93 Context {
94 book: self.book,
95 addr: self.addr,
96 group: self.group,
97 demux: self.demux,
98 config: self.config,
99 key: self.key,
100 source: Combined::new(self.source, source),
101 stage: Stage::PreRecv,
102 stats: self.stats,
103 budget: self.budget,
104 }
105 }
106
107 pub fn set_status(&self, status: ActorStatus) {
113 let object = ward!(self.book.get_owned(self.addr));
114 let actor = ward!(object.as_actor());
115 actor.set_status(status);
116 }
117
118 pub fn close(&self) -> bool {
123 let object = ward!(self.book.get_owned(self.addr), return false);
124 ward!(object.as_actor(), return false).close()
125 }
126
127 pub async fn send<M: Message>(&self, message: M) -> Result<(), SendError<M>> {
145 let kind = MessageKind::Regular { sender: self.addr };
146 self.do_send(message, kind).await
147 }
148
    /// Tries to send a regular message to the recipients selected by the demultiplexer
    /// without waiting.
    ///
    /// # Errors
    /// * `TrySendError::Closed` if no recipient is selected or none accepted the
    ///   message for a reason other than a full mailbox.
    /// * `TrySendError::Full` if nobody accepted the message and at least one
    ///   recipient reported a full mailbox.
    ///
    /// With several recipients the call succeeds if at least one of them accepted
    /// the message.
    pub fn try_send<M: Message>(&self, message: M) -> Result<(), TrySendError<M>> {
        let kind = MessageKind::Regular { sender: self.addr };

        self.stats.on_sent_message::<M>();

        trace!("> {:?}", message);
        if let Some(permit) = DUMPER.acquire_m::<M>() {
            permit.record(Dump::message(message.clone(), &kind, Direction::Out));
        }

        let envelope = Envelope::new(message, kind).upcast();
        let addrs = self.demux.filter(&envelope);

        if addrs.is_empty() {
            return Err(TrySendError::Closed(e2m(envelope)));
        }

        // Fast path: a single recipient gets the original envelope, no duplication.
        if addrs.len() == 1 {
            return match self.book.get(addrs[0]) {
                Some(object) => object.try_send(envelope).map_err(|err| err.map(e2m)),
                None => Err(TrySendError::Closed(e2m(envelope))),
            };
        }

        // A duplicate that was rejected by the previous recipient, kept for reuse.
        let mut unused = None;
        let mut has_full = false;
        let mut success = false;

        for addr in addrs {
            // Reuse the rejected duplicate if there is one, otherwise make a new one.
            let envelope = unused.take().or_else(|| envelope.duplicate(&self.book));
            // Stop delivering if the envelope cannot be duplicated.
            let envelope = ward!(envelope, break);

            match self.book.get(addr) {
                Some(object) => match object.try_send(envelope) {
                    Ok(()) => success = true,
                    Err(err) => {
                        has_full |= err.is_full();
                        unused = Some(err.into_inner());
                    }
                },
                None => unused = Some(envelope),
            };
        }

        // The original envelope is never sent on this path, so it is returned on failure.
        if success {
            Ok(())
        } else if has_full {
            Err(TrySendError::Full(e2m(envelope)))
        } else {
            Err(TrySendError::Closed(e2m(envelope)))
        }
    }
223
224 #[inline]
237 pub fn request<R: Request>(&self, request: R) -> RequestBuilder<'_, C, K, S, R, Any> {
238 RequestBuilder::new(self, request)
239 }
240
241 #[inline]
254 pub fn request_to<R: Request>(
255 &self,
256 recipient: Addr,
257 request: R,
258 ) -> RequestBuilder<'_, C, K, S, R, Any> {
259 RequestBuilder::new(self, request).to(recipient)
260 }
261
    /// Sends a message of the given kind to the recipients selected by the
    /// demultiplexer, awaiting each delivery.
    ///
    /// With several recipients the call succeeds if at least one of them accepted
    /// the message.
    ///
    /// # Errors
    /// Returns `SendError` carrying the message if no recipient is selected or
    /// nobody accepted it.
    async fn do_send<M: Message>(&self, message: M, kind: MessageKind) -> Result<(), SendError<M>> {
        self.stats.on_sent_message::<M>();

        trace!("> {:?}", message);
        if let Some(permit) = DUMPER.acquire_m::<M>() {
            permit.record(Dump::message(message.clone(), &kind, Direction::Out));
        }

        let envelope = Envelope::new(message, kind).upcast();
        let addrs = self.demux.filter(&envelope);

        if addrs.is_empty() {
            return Err(SendError(e2m(envelope)));
        }

        // Fast path: a single recipient gets the original envelope, no duplication.
        if addrs.len() == 1 {
            return match self.book.get_owned(addrs[0]) {
                Some(object) => object
                    .send(self, envelope)
                    .await
                    .map_err(|err| SendError(e2m(err.0))),
                None => Err(SendError(e2m(envelope))),
            };
        }

        // A duplicate that was rejected by the previous recipient, kept for reuse.
        let mut unused = None;
        let mut success = false;

        for addr in addrs {
            // Reuse the rejected duplicate if there is one, otherwise make a new one.
            let envelope = unused.take().or_else(|| envelope.duplicate(&self.book));
            // Stop delivering if the envelope cannot be duplicated.
            let envelope = ward!(envelope, break);

            match self.book.get_owned(addr) {
                Some(object) => {
                    // On failure the envelope is handed back and reused for the next recipient.
                    unused = object.send(self, envelope).await.err().map(|err| err.0);
                    if unused.is_none() {
                        success = true;
                    }
                }
                None => unused = Some(envelope),
            };
        }

        // The original envelope is never sent on this path, so it is returned on failure.
        if success {
            Ok(())
        } else {
            Err(SendError(e2m(envelope)))
        }
    }
313
314 pub async fn send_to<M: Message>(
332 &self,
333 recipient: Addr,
334 message: M,
335 ) -> Result<(), SendError<M>> {
336 let kind = MessageKind::Regular { sender: self.addr };
337 self.do_send_to(recipient, message, kind).await
338 }
339
340 async fn do_send_to<M: Message>(
341 &self,
342 recipient: Addr,
343 message: M,
344 kind: MessageKind,
345 ) -> Result<(), SendError<M>> {
346 self.stats.on_sent_message::<M>();
347
348 trace!(to = %recipient, "> {:?}", message);
349 if let Some(permit) = DUMPER.acquire_m::<M>() {
350 permit.record(Dump::message(message.clone(), &kind, Direction::Out));
351 }
352
353 let entry = self.book.get_owned(recipient);
354 let object = ward!(entry, return Err(SendError(message)));
355 let envelope = Envelope::new(message, kind);
356 let fut = object.send(self, envelope.upcast());
357 let result = fut.await;
358 result.map_err(|err| SendError(e2m(err.0)))
359 }
360
361 pub fn try_send_to<M: Message>(
379 &self,
380 recipient: Addr,
381 message: M,
382 ) -> Result<(), TrySendError<M>> {
383 self.stats.on_sent_message::<M>();
384
385 let kind = MessageKind::Regular { sender: self.addr };
386
387 trace!(to = %recipient, "> {:?}", message);
388 if let Some(permit) = DUMPER.acquire_m::<M>() {
389 permit.record(Dump::message(message.clone(), &kind, Direction::Out));
390 }
391
392 let entry = self.book.get(recipient);
393 let object = ward!(entry, return Err(TrySendError::Closed(message)));
394 let envelope = Envelope::new(message, kind);
395
396 object
397 .try_send(envelope.upcast())
398 .map_err(|err| err.map(e2m))
399 }
400
401 pub fn respond<R: Request>(&self, token: ResponseToken<R>, message: R::Response) {
413 if token.is_forgotten() {
414 return;
415 }
416
417 self.stats.on_sent_message::<R::Wrapper>();
418
419 let sender = token.sender;
420 let message = R::Wrapper::from(message);
421 let kind = MessageKind::Response {
422 sender: self.addr(),
423 request_id: token.request_id,
424 };
425
426 trace!(to = %sender, "> {:?}", message);
427 if let Some(permit) = DUMPER.acquire_m::<R>() {
428 permit.record(Dump::message(message.clone(), &kind, Direction::Out));
429 }
430
431 let envelope = Envelope::new(message, kind).upcast();
432 let object = ward!(self.book.get(token.sender));
433 let actor = ward!(object.as_actor());
434 actor
435 .request_table()
436 .respond(token.into_untyped(), envelope);
437 }
438
    /// Waits for the next envelope from either the actor's mailbox or the attached source.
    ///
    /// Envelopes fully handled by `post_recv()` are not returned; the loop simply
    /// waits for the next one.
    ///
    /// Returns `None` when the mailbox reports closure, or when the actor owning
    /// this context cannot be resolved in the address book.
    ///
    /// # Panics
    /// * If called after the input has been reported as closed.
    /// * If the attached source yields `None`.
    pub async fn recv(&mut self) -> Option<Envelope>
    where
        C: 'static,
        S: Source,
    {
        loop {
            self.stats.on_recv();

            if self.stage == Stage::Closed {
                on_recv_after_close();
            }

            self.budget.acquire().await;

            let object = self.book.get_owned(self.addr)?;
            let actor = object.as_actor()?;

            // First receive attempt: runs the first-recv hook, which also moves the stage on.
            if self.stage == Stage::PreRecv {
                on_first_recv(&mut self.stage, actor);
            }

            let mailbox_fut = actor.recv();
            pin_mut!(mailbox_fut);

            let source_fut = poll_fn(|cx| self.source.poll_recv(cx));
            pin_mut!(source_fut);

            // Wait for whichever input produces a value first.
            tokio::select! {
                result = mailbox_fut => match result {
                    RecvResult::Data(envelope) => {
                        if let Some(envelope) = self.post_recv(envelope) {
                            return Some(envelope);
                        }
                    },
                    RecvResult::Closed(trace_id) => {
                        scope::set_trace_id(trace_id);
                        on_input_closed(&mut self.stage, actor);
                        return None;
                    }
                },
                option = source_fut => {
                    let envelope = option.expect("source cannot return None");

                    if let Some(envelope) = self.post_recv(envelope) {
                        return Some(envelope);
                    }
                },
            }
        }
    }
507
    /// Tries to take the next envelope from the actor's mailbox without waiting.
    ///
    /// The attached source is not polled here. Envelopes fully handled by
    /// `post_recv()` are not returned; the loop tries the next one.
    ///
    /// # Errors
    /// * `TryRecvError::Empty` if the mailbox has nothing to offer right now.
    /// * `TryRecvError::Closed` if the mailbox reports closure, or the actor owning
    ///   this context cannot be resolved in the address book.
    ///
    /// # Panics
    /// If called after the input has been reported as closed.
    pub fn try_recv(&mut self) -> Result<Envelope, TryRecvError>
    where
        C: 'static,
    {
        loop {
            self.stats.on_recv();

            if self.stage == Stage::Closed {
                on_recv_after_close();
            }

            let object = self.book.get(self.addr).ok_or(TryRecvError::Closed)?;
            let actor = object.as_actor().ok_or(TryRecvError::Closed)?;

            // First receive attempt: runs the first-recv hook, which also moves the stage on.
            if self.stage == Stage::PreRecv {
                on_first_recv(&mut self.stage, actor);
            }

            match actor.try_recv() {
                Some(RecvResult::Data(envelope)) => {
                    // Release the address book entry before processing the envelope.
                    drop(object);

                    if let Some(envelope) = self.post_recv(envelope) {
                        return Ok(envelope);
                    }
                }
                Some(RecvResult::Closed(trace_id)) => {
                    scope::set_trace_id(trace_id);
                    on_input_closed(&mut self.stage, actor);
                    return Err(TryRecvError::Closed);
                }
                None => {
                    self.stats.on_empty_mailbox();
                    return Err(TryRecvError::Empty);
                }
            }
        }
    }
562
    /// Processes a freshly received envelope before it is handed to the caller.
    ///
    /// * Decrements the budget and sets the trace id of the current scope.
    /// * On `UpdateConfig`: replaces the config, responds with `Ok(())` and
    ///   substitutes the envelope with a `ConfigUpdated` message.
    /// * Traces and (if allowed) dumps the message.
    /// * On `Terminate`: switches the actor's status to `TERMINATING`.
    /// * On `Ping`: responds and returns `None`, so the caller never sees it.
    ///
    /// Returns `Some(envelope)` for everything that must be delivered to the caller.
    fn post_recv(&mut self, envelope: Envelope) -> Option<Envelope>
    where
        C: 'static,
    {
        self.budget.decrement();

        scope::set_trace_id(envelope.trace_id());

        let envelope = msg!(match envelope {
            (messages::UpdateConfig { config }, token) => {
                self.config = config.get_user::<C>().clone();
                info!("config updated");
                // The caller receives `ConfigUpdated` instead of the original request.
                let message = messages::ConfigUpdated {};
                let kind = MessageKind::Regular { sender: self.addr };
                let envelope = Envelope::new(message, kind).upcast();
                self.respond(token, Ok(()));
                envelope
            }
            envelope => envelope,
        });

        let message = envelope.message();
        trace!("< {:?}", message);

        if message.dumping_allowed() {
            if let Some(permit) = DUMPER.acquire() {
                let dump = Dump::builder()
                    .direction(Direction::In)
                    .message_name(message.name())
                    .message_protocol(message.protocol())
                    .message_kind(dumping::MessageKind::from_message_kind(
                        envelope.message_kind(),
                    ))
                    .do_finish(message.erase());

                permit.record(dump);
            }
        }

        // The status is changed only after the incoming message has been dumped.
        if envelope.is::<messages::Terminate>() {
            self.set_status(ActorStatus::TERMINATING);
        }

        self.stats.on_received_envelope(&envelope);

        msg!(match envelope {
            (messages::Ping, token) => {
                self.respond(token, ());
                None
            }
            envelope => Some(envelope),
        })
    }
619
620 #[doc(hidden)]
623 pub async fn finished(&self, addr: Addr) {
624 ward!(self.book.get_owned(addr)).finished().await;
625 }
626
627 pub fn unpack_config<'c>(&self, config: &'c AnyConfig) -> &'c C
637 where
638 C: for<'de> serde::Deserialize<'de> + 'static,
639 {
640 config.get_user()
641 }
642
643 pub fn pruned(&self) -> Context {
647 Context {
648 book: self.book.clone(),
649 addr: self.addr,
650 group: self.group,
651 demux: self.demux.clone(),
652 config: Arc::new(()),
653 key: Singleton,
654 source: (),
655 stage: self.stage,
656 stats: Stats::empty(),
657 budget: self.budget.clone(),
658 }
659 }
660
661 pub(crate) fn book(&self) -> &AddressBook {
662 &self.book
663 }
664
665 pub(crate) fn with_config<C1>(self, config: Arc<C1>) -> Context<C1, K, S> {
666 Context {
667 book: self.book,
668 addr: self.addr,
669 group: self.group,
670 demux: self.demux,
671 config,
672 key: self.key,
673 source: self.source,
674 stage: self.stage,
675 stats: self.stats,
676 budget: self.budget,
677 }
678 }
679
680 pub(crate) fn with_addr(mut self, addr: Addr) -> Self {
681 self.addr = addr;
682 self.stats = Stats::startup();
683 self
684 }
685
686 pub(crate) fn with_group(mut self, group: Addr) -> Self {
687 self.group = group;
688 self
689 }
690
691 pub(crate) fn with_key<K1>(self, key: K1) -> Context<C, K1, S> {
692 Context {
693 book: self.book,
694 addr: self.addr,
695 group: self.group,
696 demux: self.demux,
697 config: self.config,
698 key,
699 source: self.source,
700 stage: self.stage,
701 stats: self.stats,
702 budget: self.budget,
703 }
704 }
705}
706
707fn e2m<M: Message>(envelope: Envelope) -> M {
708 envelope.do_downcast::<M>().into_message()
709}
710
711#[cold]
712fn on_first_recv(stage: &mut Stage, actor: &Actor) {
713 if actor.is_initializing() {
714 actor.set_status(ActorStatus::NORMAL);
715 }
716 *stage = Stage::Working;
717}
718
719#[cold]
720fn on_input_closed(stage: &mut Stage, actor: &Actor) {
721 if !actor.is_terminating() {
722 actor.set_status(ActorStatus::TERMINATING);
723 }
724 *stage = Stage::Closed;
725 trace!("input closed");
726}
727
/// Reports a receive attempt made after the input was closed and aborts the
/// caller by panicking.
///
/// # Panics
/// Always.
#[cold]
fn on_recv_after_close() {
    error!("calling `recv()` or `try_recv()` after `None` is returned, an infinite loop?");
    panic!("suicide");
}
733
734impl Context {
735 pub(crate) fn new(book: AddressBook, demux: Demux) -> Self {
736 Self {
737 book,
738 addr: Addr::NULL,
739 group: Addr::NULL,
740 demux,
741 config: Arc::new(()),
742 key: Singleton,
743 source: (),
744 stage: Stage::PreRecv,
745 stats: Stats::empty(),
746 budget: Budget::default(),
747 }
748 }
749}
750
751impl<C, K: Clone> Clone for Context<C, K> {
753 fn clone(&self) -> Self {
754 Self {
755 book: self.book.clone(),
756 addr: self.addr,
757 group: self.group,
758 demux: self.demux.clone(),
759 config: self.config.clone(),
760 key: self.key.clone(),
761 source: (),
762 stage: self.stage,
763 stats: Stats::empty(),
764 budget: self.budget.clone(),
765 }
766 }
767}
768
/// A pending request created by `Context::request()` or `Context::request_to()`.
///
/// Nothing is sent until one of the `resolve()` methods is awaited, hence `#[must_use]`.
/// The `M` marker (`Any`, `All` or `Forgotten`) selects which `resolve()` is available.
#[must_use]
pub struct RequestBuilder<'c, C, K, S, R, M> {
    // The context the request is sent from.
    context: &'c Context<C, K, S>,
    // The request message itself.
    request: R,
    // Explicit recipient; `None` means the demultiplexer chooses recipients.
    to: Option<Addr>,
    // Carries the resolution mode at the type level only.
    marker: PhantomData<M>,
}
776
/// Marker: `resolve()` sends `MessageKind::RequestAny` and yields a single response.
pub struct Any;
/// Marker: `resolve()` sends `MessageKind::RequestAll` and yields every collected response.
pub struct All;
// Marker: `resolve()` sends the request with a forgotten token and does not wait for a response.
pub(crate) struct Forgotten;
780
781impl<'c, C, K, S, R> RequestBuilder<'c, C, K, S, R, Any> {
782 fn new(context: &'c Context<C, K, S>, request: R) -> Self {
783 Self {
784 context,
785 request,
786 to: None,
787 marker: PhantomData,
788 }
789 }
790
791 #[inline]
792 pub fn all(self) -> RequestBuilder<'c, C, K, S, R, All> {
793 RequestBuilder {
794 context: self.context,
795 request: self.request,
796 to: self.to,
797 marker: PhantomData,
798 }
799 }
800
801 #[allow(unused)]
803 pub(crate) fn forgotten(self) -> RequestBuilder<'c, C, K, S, R, Forgotten> {
804 RequestBuilder {
805 context: self.context,
806 request: self.request,
807 to: self.to,
808 marker: PhantomData,
809 }
810 }
811}
812
813impl<'c, C, K, S, R, M> RequestBuilder<'c, C, K, S, R, M> {
814 #[deprecated(note = "use `Context::request_to()` instead")]
815 #[doc(hidden)]
816 #[inline]
817 pub fn from(self, addr: Addr) -> Self {
818 self.to(addr)
819 }
820
821 #[inline]
823 fn to(mut self, addr: Addr) -> Self {
824 self.to = Some(addr);
825 self
826 }
827}
828
829impl<'c, C: 'static, K, S, R: Request> RequestBuilder<'c, C, S, K, R, Any> {
831 pub async fn resolve(self) -> Result<R::Response, RequestError<R>> {
833 let this = self.context.addr;
835 let object = self.context.book.get_owned(this).expect("invalid addr");
836 let actor = object.as_actor().expect("can be called only on actors");
837 let token = actor
838 .request_table()
839 .new_request(self.context.book.clone(), false);
840 let request_id = token.request_id;
841 let kind = MessageKind::RequestAny(token);
842
843 let res = if let Some(recipient) = self.to {
844 self.context.do_send_to(recipient, self.request, kind).await
845 } else {
846 self.context.do_send(self.request, kind).await
847 };
848
849 if let Err(err) = res {
850 return Err(RequestError::Closed(err.0));
851 }
852
853 let mut data = actor.request_table().wait(request_id).await;
854 if let Some(Some(envelope)) = data.pop() {
855 let envelope = envelope.do_downcast::<R::Wrapper>();
856
857 trace!("< {:?}", envelope.message());
859 if let Some(permit) = DUMPER.acquire_m::<R>() {
860 permit.record(Dump::message(
861 envelope.message().clone(),
862 envelope.message_kind(),
863 Direction::In,
864 ));
865 }
866 Ok(envelope.into_message().into())
867 } else {
868 Err(RequestError::Ignored)
870 }
871 }
872}
873
874impl<'c, C: 'static, K, S, R: Request> RequestBuilder<'c, C, K, S, R, All> {
875 pub async fn resolve(self) -> Vec<Result<R::Response, RequestError<R>>> {
877 let this = self.context.addr;
879 let object = self.context.book.get_owned(this).expect("invalid addr");
880 let actor = object.as_actor().expect("can be called only on actors");
881 let token = actor
882 .request_table()
883 .new_request(self.context.book.clone(), true);
884 let request_id = token.request_id;
885 let kind = MessageKind::RequestAll(token);
886
887 let res = if let Some(recipient) = self.to {
888 self.context.do_send_to(recipient, self.request, kind).await
889 } else {
890 self.context.do_send(self.request, kind).await
891 };
892
893 if let Err(err) = res {
894 return vec![Err(RequestError::Closed(err.0))];
895 }
896
897 actor
898 .request_table()
899 .wait(request_id)
900 .await
901 .into_iter()
902 .map(|opt| match opt {
903 Some(envelope) => Ok(envelope.do_downcast::<R::Wrapper>()),
904 None => Err(RequestError::Ignored),
905 })
906 .map(|res| {
907 let envelope = res?;
908
909 trace!("< {:?}", envelope.message());
911
912 if let Some(permit) = DUMPER.acquire_m::<R>() {
914 permit.record(Dump::message(
915 envelope.message().clone(),
916 envelope.message_kind(),
917 Direction::In,
918 ));
919 }
920 Ok(envelope.into_message().into())
921 })
922 .collect()
923 }
924}
925
926impl<'c, C: 'static, K, S, R: Request> RequestBuilder<'c, C, S, K, R, Forgotten> {
927 pub async fn resolve(self) -> Result<R::Response, RequestError<R>> {
928 let token = ResponseToken::forgotten(self.context.book.clone());
929 let kind = MessageKind::RequestAny(token);
930
931 let res = if let Some(recipient) = self.to {
932 self.context.do_send_to(recipient, self.request, kind).await
933 } else {
934 self.context.do_send(self.request, kind).await
935 };
936
937 if let Err(err) = res {
938 return Err(RequestError::Closed(err.0));
939 }
940
941 Err(RequestError::Ignored)
942 }
943}