elfo_core/context.rs
1use std::{
2 future::poll_fn,
3 marker::PhantomData,
4 pin::Pin,
5 sync::{Arc, LazyLock},
6 task::Poll,
7};
8
9use futures::{pin_mut, Stream};
10use idr_ebr::EbrGuard;
11use tracing::{info, trace};
12
13use elfo_utils::unlikely;
14
15use crate::{
16 actor::{Actor, ActorStartInfo},
17 actor_status::ActorStatus,
18 addr::Addr,
19 address_book::AddressBook,
20 config::AnyConfig,
21 coop,
22 demux::Demux,
23 dumping::{Direction, Dump, Dumper, INTERNAL_CLASS},
24 envelope::{Envelope, MessageKind},
25 errors::{RequestError, SendError, TryRecvError, TrySendError},
26 mailbox::RecvResult,
27 message::{Message, Request},
28 messages, msg,
29 object::{BorrowedObject, Object, OwnedObject},
30 request_table::ResponseToken,
31 restarting::RestartPolicy,
32 routers::Singleton,
33 scope,
34 source::{SourceHandle, Sources, UnattachedSource},
35 ActorStatusKind,
36};
37
38use self::stats::Stats;
39
40mod stats;
41
// Dumper shared by the whole module, created lazily on first use with
// `INTERNAL_CLASS`. The send and receive paths below acquire permits from it
// to record outgoing (`Direction::Out`) and incoming (`Direction::In`) dumps.
42static DUMPER: LazyLock<Dumper> = LazyLock::new(|| Dumper::new(INTERNAL_CLASS));
43
44/// An actor execution context.
///
/// `C` is the type of the user config returned by `config()` and `K` is the
/// type of the key returned by `key()`.
45pub struct Context<C = (), K = Singleton> {
// Used to look up objects by address when sending and responding.
46 book: AddressBook,
47 actor: Option<OwnedObject>, // `None` for group's and pruned context.
48 actor_addr: Addr,
49 actor_start_info: Option<ActorStartInfo>, // `None` for group's context,
50 group_addr: Addr,
// Selects recipients for messages sent without an explicit address.
51 demux: Demux,
52 config: Arc<C>,
53 key: K,
// Sources attached via `attach()`; polled by `recv()` and `try_recv()`.
54 sources: Sources,
// Receive lifecycle; checked in `pre_recv()`, closed by `on_input_closed()`.
55 stage: Stage,
56 stats: Stats,
57}
58
// Receive lifecycle of a context, driven by `pre_recv()` and
// `on_input_closed()`.
59#[derive(Clone, Copy, PartialEq)]
60enum Stage {
// Initial stage set by `Context::new()`; left by `pre_recv()` once an actor
// is available.
61 PreRecv,
// Set by `pre_recv()` when leaving `PreRecv`.
62 Working,
// Set by `on_input_closed()`; `pre_recv()` panics in this stage.
63 Closed,
64}
65
// Asserts that the default `Context` implements `Send`
// (presumably a compile-time check from `static_assertions` — confirm).
66assert_impl_all!(Context: Send);
67// TODO: !Sync?
68
69impl<C, K> Context<C, K> {
70 /// Returns the actor's address.
71 #[inline]
72 pub fn addr(&self) -> Addr {
73 self.actor_addr
74 }
75
76 /// Returns the current group's address.
77 #[inline]
78 pub fn group(&self) -> Addr {
79 self.group_addr
80 }
81
82 /// Returns the actual config.
83 #[inline]
84 pub fn config(&self) -> &C {
85 &self.config
86 }
87
88 /// Returns the actor's key.
89 #[inline]
90 pub fn key(&self) -> &K {
91 &self.key
92 }
93
94 /// Attaches the provided source to the context.
95 ///
96 /// Messages produced by the source will be available via
97 /// [`Context::recv()`] and [`Context::try_recv()`] methods.
98 pub fn attach<S1: SourceHandle>(&mut self, source: UnattachedSource<S1>) -> S1 {
99 source.attach_to(&mut self.sources)
100 }
101
102 /// Updates the actor's status.
103 ///
104 /// # Example
105 /// ```
106 /// # use elfo_core as elfo;
107 /// # fn exec(ctx: elfo::Context) {
108 /// ctx.set_status(elfo::ActorStatus::ALARMING.with_details("something wrong"));
109 /// # }
110 /// ```
111 pub fn set_status(&self, status: ActorStatus) {
112 ward!(self.actor.as_ref().and_then(|o| o.as_actor())).set_status(status);
113 }
114
115 /// Gets the actor's status kind.
116 ///
117 /// # Example
118 /// ```
119 /// # use elfo_core as elfo;
120 /// # fn exec(ctx: elfo::Context) {
121 /// // if actor is terminating.
122 /// assert!(ctx.status_kind().is_terminating());
123 /// // if actor is alarming.
124 /// assert!(ctx.status_kind().is_alarming());
125 /// // and so on...
126 /// # }
127 /// ```
128 /// # Panics
129 ///
130 /// Panics when called on pruned context.
131 pub fn status_kind(&self) -> ActorStatusKind {
132 self.actor
133 .as_ref()
134 .expect("called `status_kind()` on pruned context")
135 .as_actor()
136 .expect("invariant")
137 .status_kind()
138 }
139
140 /// Overrides the group's default mailbox capacity, which set in the config.
141 ///
142 /// Note: after restart the actor will be created from scratch, so this
143 /// override will be also reset to the group's default mailbox capacity.
144 ///
145 /// # Example
146 /// ```
147 /// # use elfo_core as elfo;
148 /// # fn exec(ctx: elfo::Context) {
149 /// // Override the group's default mailbox capacity.
150 /// ctx.set_mailbox_capacity(42);
151 ///
152 /// // Set the group's default mailbox capacity.
153 /// ctx.set_mailbox_capacity(None);
154 /// # }
155 /// ```
156 pub fn set_mailbox_capacity(&self, capacity: impl Into<Option<usize>>) {
157 ward!(self.actor.as_ref().and_then(|o| o.as_actor()))
158 .set_mailbox_capacity_override(capacity.into());
159 }
160
161 /// Overrides the group's default restart policy, which set in the config.
162 ///
163 /// Note: after restart the actor will be created from scratch, so this
164 /// override will be also reset to the group's default restart policy.
165 ///
166 /// # Example
167 /// ```
168 /// # use elfo_core as elfo;
169 /// # fn exec(ctx: elfo::Context) {
170 /// // Override the group's default restart policy.
171 /// ctx.set_restart_policy(elfo::RestartPolicy::never());
172 ///
173 /// // Set the group's default restart policy.
174 /// ctx.set_restart_policy(None);
175 /// # }
176 /// ```
177 pub fn set_restart_policy(&self, policy: impl Into<Option<RestartPolicy>>) {
178 ward!(self.actor.as_ref().and_then(|o| o.as_actor())).set_restart_policy(policy.into());
179 }
180
181 /// Closes the mailbox, that leads to returning `None` from `recv()` and
182 /// `try_recv()` after handling all available messages in the mailbox.
183 ///
184 /// Returns `true` if the mailbox has just been closed.
185 pub fn close(&self) -> bool {
186 ward!(self.actor.as_ref().and_then(|o| o.as_actor()), return false).close()
187 }
188
189 /// Sends a message using the [inter-group routing] system.
190 ///
191 /// It's possible to send requests if the response is not needed.
192 ///
193 /// Returns `Err` if the message hasn't reached any mailboxes.
194 ///
195 /// # Cancel safety
196 ///
197 /// If cancelled, recipients with full mailboxes wont't receive the message.
198 ///
199 /// # Example
200 /// ```
201 /// # use elfo_core as elfo;
202 /// # async fn exec(mut ctx: elfo::Context) {
203 /// # use elfo::{message, msg};
204 /// #[message]
205 /// struct SomethingHappened;
206 ///
207 /// // Fire and forget.
208 /// let _ = ctx.send(SomethingHappened).await;
209 ///
210 /// // Fire or log.
211 /// if let Err(error) = ctx.send(SomethingHappened).await {
212 /// tracing::warn!(%error, "...");
213 /// }
214 /// # }
215 /// ```
216 ///
217 /// [inter-group routing]: https://actoromicon.rs/ch03-01-routing.html
218 pub async fn send<M: Message>(&self, message: M) -> Result<(), SendError<M>> {
219 let kind = MessageKind::regular(self.actor_addr);
220 self.do_send_async(message, kind).await
221 }
222
223 /// Tries to send a message using the [inter-group routing] system.
224 ///
225 /// It's possible to send requests if the response is not needed.
226 ///
227 /// Returns
228 /// * `Ok(())` if the message has been added to any mailbox.
229 /// * `Err(Full(_))` if some mailboxes are full.
230 /// * `Err(Closed(_))` otherwise.
231 ///
232 /// # Example
233 /// ```
234 /// # use elfo_core as elfo;
235 /// # async fn exec(mut ctx: elfo::Context) {
236 /// # use elfo::{message, msg};
237 /// #[message]
238 /// struct SomethingHappened;
239 ///
240 /// // Fire and forget.
241 /// let _ = ctx.try_send(SomethingHappened);
242 ///
243 /// // Fire or log.
244 /// if let Err(error) = ctx.try_send(SomethingHappened) {
245 /// tracing::warn!(%error, "...");
246 /// }
247 /// # }
248 /// ```
249 ///
250 /// [inter-group routing]: https://actoromicon.rs/ch03-01-routing.html
251 pub fn try_send<M: Message>(&self, message: M) -> Result<(), TrySendError<M>> {
252 // XXX: avoid duplication with `unbounded_send()` and `send()`.
253
254 let kind = MessageKind::regular(self.actor_addr);
255
256 self.stats.on_sent_message(&message); // TODO: only if successful?
257
258 trace!("> {:?}", message);
259 if let Some(permit) = DUMPER.acquire_m(&message) {
260 permit.record(Dump::message(&message, &kind, Direction::Out));
261 }
262
263 let envelope = Envelope::new(message, kind);
264 let addrs = self.demux.filter(&envelope);
265
266 if addrs.is_empty() {
267 return Err(TrySendError::Closed(e2m(envelope)));
268 }
269
270 let guard = EbrGuard::new();
271
272 if addrs.len() == 1 {
273 return match self.book.get(addrs[0], &guard) {
274 Some(object) => object
275 .try_send(Addr::NULL, envelope)
276 .map_err(|err| err.map(e2m)),
277 None => Err(TrySendError::Closed(e2m(envelope))),
278 };
279 }
280
281 let mut unused = None;
282 let mut has_full = false;
283 let mut success = false;
284
285 for (addr, envelope) in addrs_with_envelope(envelope, &addrs) {
286 match self.book.get(addr, &guard) {
287 Some(object) => match object.try_send(Addr::NULL, envelope) {
288 Ok(()) => success = true,
289 Err(err) => {
290 has_full |= err.is_full();
291 unused = Some(err.into_inner());
292 }
293 },
294 None => unused = Some(envelope),
295 };
296 }
297
298 if success {
299 Ok(())
300 } else if has_full {
301 Err(TrySendError::Full(e2m(unused.unwrap())))
302 } else {
303 Err(TrySendError::Closed(e2m(unused.unwrap())))
304 }
305 }
306
307 /// Sends a message using the [inter-group routing] system.
308 /// Affects other senders.
309 ///
310 /// Usually this method shouldn't be used because it can lead to high memory
311 /// usage and even OOM if the recipient works too slowly.
312 /// Prefer [`Context::try_send()`] or [`Context::send()`] instead.
313 ///
314 /// It's possible to send requests if the response is not needed.
315 ///
316 /// Returns `Err` if the message hasn't reached mailboxes.
317 ///
318 /// # Example
319 /// ```
320 /// # use elfo_core as elfo;
321 /// # async fn exec(mut ctx: elfo::Context) {
322 /// # use elfo::{message, msg};
323 /// #[message]
324 /// struct SomethingHappened;
325 ///
326 /// // Fire and forget.
327 /// let _ = ctx.unbounded_send(SomethingHappened);
328 ///
329 /// // Fire or log.
330 /// if let Err(error) = ctx.unbounded_send(SomethingHappened) {
331 /// tracing::warn!(%error, "...");
332 /// }
333 /// # }
334 /// ```
335 ///
336 /// [inter-group routing]: https://actoromicon.rs/ch03-01-routing.html
337 pub fn unbounded_send<M: Message>(&self, message: M) -> Result<(), SendError<M>> {
338 let kind = MessageKind::regular(self.actor_addr);
339
340 self.stats.on_sent_message(&message); // TODO: only if successful?
341
342 trace!("> {:?}", message);
343 if let Some(permit) = DUMPER.acquire_m(&message) {
344 permit.record(Dump::message(&message, &kind, Direction::Out));
345 }
346
347 let envelope = Envelope::new(message, kind);
348 let addrs = self.demux.filter(&envelope);
349
350 if addrs.is_empty() {
351 return Err(SendError(e2m(envelope)));
352 }
353
354 let guard = EbrGuard::new();
355
356 if addrs.len() == 1 {
357 return match self.book.get(addrs[0], &guard) {
358 Some(object) => object
359 .unbounded_send(Addr::NULL, envelope)
360 .map_err(|err| err.map(e2m)),
361 None => Err(SendError(e2m(envelope))),
362 };
363 }
364
365 let mut unused = None;
366 let mut success = false;
367
368 for (addr, envelope) in addrs_with_envelope(envelope, &addrs) {
369 match self.book.get(addr, &guard) {
370 Some(object) => match object.unbounded_send(Addr::NULL, envelope) {
371 Ok(()) => success = true,
372 Err(err) => unused = Some(err.into_inner()),
373 },
374 None => unused = Some(envelope),
375 };
376 }
377
378 if success {
379 Ok(())
380 } else {
381 Err(SendError(e2m(unused.unwrap())))
382 }
383 }
384
385 /// Returns a request builder to send a request (on `resolve()`) using
386 /// the [inter-group routing] system.
387 ///
388 /// # Example
389 /// ```ignore
390 /// // Request and wait for a response.
391 /// let response = ctx.request(SomeCommand).resolve().await?;
392 ///
393 /// // Request and wait for all responses.
394 /// for result in ctx.request(SomeCommand).all().resolve().await {
395 /// // ...
396 /// }
397 /// ```
398 ///
399 /// [inter-group routing]: https://actoromicon.rs/ch03-01-routing.html
400 #[inline]
401 pub fn request<R: Request>(&self, request: R) -> RequestBuilder<'_, C, K, R, Any> {
402 RequestBuilder::new(self, request)
403 }
404
405 /// Returns a request builder to send a request to the specified recipient.
406 ///
407 /// # Example
408 /// ```ignore
409 /// // Request and wait for a response.
410 /// let response = ctx.request_to(addr, SomeCommand).resolve().await?;
411 ///
412 /// // Request and wait for all responses.
413 /// for result in ctx.request_to(addr, SomeCommand).all().resolve().await {
414 /// // ...
415 /// }
416 /// ```
417 #[inline]
418 pub fn request_to<R: Request>(
419 &self,
420 recipient: Addr,
421 request: R,
422 ) -> RequestBuilder<'_, C, K, R, Any> {
423 RequestBuilder::new(self, request).to(recipient)
424 }
425
426 async fn do_send_async<M: Message>(
427 &self,
428 message: M,
429 kind: MessageKind,
430 ) -> Result<(), SendError<M>> {
431 self.stats.on_sent_message(&message); // TODO: only if successful?
432
433 trace!("> {:?}", message);
434 if let Some(permit) = DUMPER.acquire_m(&message) {
435 permit.record(Dump::message(&message, &kind, Direction::Out));
436 }
437
438 let envelope = Envelope::new(message, kind);
439 let addrs = self.demux.filter(&envelope);
440
441 if addrs.is_empty() {
442 return Err(SendError(e2m(envelope)));
443 }
444
445 if addrs.len() == 1 {
446 let recipient = addrs[0];
447 return {
448 let guard = EbrGuard::new();
449 let entry = self.book.get(recipient, &guard);
450 let object = ward!(entry, return Err(SendError(e2m(envelope))));
451 Object::send(object, Addr::NULL, envelope)
452 }
453 .await
454 .map_err(|err| err.map(e2m));
455 }
456
457 let mut unused = None;
458 let mut success = false;
459
460 // TODO: send concurrently.
461 for (recipient, envelope) in addrs_with_envelope(envelope, &addrs) {
462 let returned_envelope = {
463 let guard = EbrGuard::new();
464 let entry = self.book.get(recipient, &guard);
465 let object = ward!(entry, {
466 unused = Some(envelope);
467 continue;
468 });
469 Object::send(object, Addr::NULL, envelope)
470 }
471 .await
472 .err()
473 .map(|err| err.into_inner());
474
475 unused = returned_envelope;
476 if unused.is_none() {
477 success = true;
478 }
479 }
480
481 if success {
482 Ok(())
483 } else {
484 Err(SendError(e2m(unused.unwrap())))
485 }
486 }
487
488 /// Sends a message to the specified recipient.
489 /// Waits if the recipient's mailbox is full.
490 ///
491 /// It's possible to send requests if the response is not needed.
492 ///
493 /// Returns `Err` if the message hasn't reached any mailboxes.
494 ///
495 /// # Cancel safety
496 ///
497 /// If cancelled, recipients with full mailboxes wont't receive the message.
498 ///
499 /// # Example
500 /// ```
501 /// # use elfo_core as elfo;
502 /// # async fn exec(mut ctx: elfo::Context, addr: elfo::Addr) {
503 /// # use elfo::{message, msg};
504 /// #[message]
505 /// struct SomethingHappened;
506 ///
507 /// let _ = ctx.send_to(addr, SomethingHappened).await;
508 ///
509 /// // Fire or log.
510 /// if let Err(error) = ctx.send_to(addr, SomethingHappened).await {
511 /// tracing::warn!(%error, "...");
512 /// }
513 /// # }
514 /// ```
515 pub async fn send_to<M: Message>(
516 &self,
517 recipient: Addr,
518 message: M,
519 ) -> Result<(), SendError<M>> {
520 let kind = MessageKind::regular(self.actor_addr);
521 self.do_send_to(recipient, message, kind, |object, envelope| {
522 Object::send(object, recipient, envelope)
523 })?
524 .await
525 .map_err(|err| err.map(e2m))
526 }
527
528 /// Tries to send a message to the specified recipient.
529 /// Returns an error if the recipient's mailbox is full.
530 ///
531 /// It's possible to send requests if the response is not needed.
532 ///
533 /// Returns
534 /// * `Ok(())` if the message has been added to any mailbox.
535 /// * `Err(Full(_))` if some mailboxes are full.
536 /// * `Err(Closed(_))` otherwise.
537 ///
538 /// # Example
539 /// ```
540 /// # use elfo_core as elfo;
541 /// # async fn exec(mut ctx: elfo::Context, addr: elfo::Addr) {
542 /// # use elfo::{message, msg};
543 /// #[message]
544 /// struct SomethingHappened;
545 ///
546 /// // Fire and forget.
547 /// let _ = ctx.try_send_to(addr, SomethingHappened);
548 ///
549 /// // Fire or log.
550 /// if let Err(error) = ctx.try_send_to(addr, SomethingHappened) {
551 /// tracing::warn!(%error, "...");
552 /// }
553 /// # }
554 /// ```
555 pub fn try_send_to<M: Message>(
556 &self,
557 recipient: Addr,
558 message: M,
559 ) -> Result<(), TrySendError<M>> {
560 let kind = MessageKind::regular(self.actor_addr);
561 self.do_send_to(recipient, message, kind, |object, envelope| {
562 object
563 .try_send(recipient, envelope)
564 .map_err(|err| err.map(e2m))
565 })?
566 }
567
568 /// Sends a message to the specified recipient.
569 /// Affects other senders.
570 ///
571 /// Usually this method shouldn't be used because it can lead to high memory
572 /// usage and even OOM if the recipient works too slowly.
573 /// Prefer [`Context::try_send_to()`] or [`Context::send_to()`] instead.
574 ///
575 /// It's possible to send requests if the response is not needed.
576 ///
577 /// Returns `Err` if the message hasn't reached mailboxes.
578 ///
579 /// # Example
580 /// ```
581 /// # use elfo_core as elfo;
582 /// # async fn exec(mut ctx: elfo::Context, addr: elfo::Addr) {
583 /// # use elfo::{message, msg};
584 /// #[message]
585 /// struct SomethingHappened;
586 ///
587 /// // Fire and forget.
588 /// let _ = ctx.unbounded_send_to(addr, SomethingHappened);
589 ///
590 /// // Fire or log.
591 /// if let Err(error) = ctx.unbounded_send_to(addr, SomethingHappened) {
592 /// tracing::warn!(%error, "...");
593 /// }
594 /// # }
595 /// ```
596 pub fn unbounded_send_to<M: Message>(
597 &self,
598 recipient: Addr,
599 message: M,
600 ) -> Result<(), SendError<M>> {
601 let kind = MessageKind::regular(self.actor_addr);
602 self.do_send_to(recipient, message, kind, |object, envelope| {
603 object
604 .unbounded_send(recipient, envelope)
605 .map_err(|err| err.map(e2m))
606 })?
607 }
608
609 #[inline(always)]
610 fn do_send_to<M: Message, R>(
611 &self,
612 recipient: Addr,
613 message: M,
614 kind: MessageKind,
615 f: impl FnOnce(BorrowedObject<'_>, Envelope) -> R,
616 ) -> Result<R, SendError<M>> {
617 self.stats.on_sent_message(&message); // TODO: only if successful?
618
619 trace!(to = %recipient, "> {:?}", message);
620 if let Some(permit) = DUMPER.acquire_m(&message) {
621 permit.record(Dump::message(&message, &kind, Direction::Out));
622 }
623
624 let guard = EbrGuard::new();
625 let entry = self.book.get(recipient, &guard);
626 let object = ward!(entry, return Err(SendError(message)));
627 let envelope = Envelope::new(message, kind);
628
629 Ok(f(object, envelope))
630 }
631
632 /// Responds to the requester with the provided response.
633 ///
634 /// The token can be used only once.
635 ///
636 /// ```ignore
637 /// msg!(match envelope {
638 /// (SomeRequest, token) => {
639 /// ctx.respond(token, SomeResponse);
640 /// }
641 /// })
642 /// ```
643 pub fn respond<R: Request>(&self, token: ResponseToken<R>, message: R::Response) {
644 if token.is_forgotten() {
645 return;
646 }
647
648 let token = token.into_untyped();
649 let recipient = token.sender();
650 let message = R::Wrapper::from(message);
651 self.stats.on_sent_message(&message); // TODO: only if successful?
652
653 let kind = MessageKind::Response {
654 sender: self.addr(),
655 request_id: token.request_id(),
656 };
657
658 trace!(to = %recipient, "> {:?}", message);
659 if let Some(permit) = DUMPER.acquire_m(&message) {
660 permit.record(Dump::message(&message, &kind, Direction::Out));
661 }
662
663 let envelope = Envelope::new(message, kind);
664 let guard = EbrGuard::new();
665 let object = ward!(self.book.get(recipient, &guard));
666 object.respond(token, Ok(envelope));
667 }
668
669 /// Receives the next envelope from the mailbox or sources.
670 /// If the envelope isn't available, the method waits for the next one.
671 /// If the mailbox is closed, `None` is returned.
672 ///
673 /// # Budget
674 ///
675 /// The method returns the execution back to the runtime once the actor's
676 /// budget has been exhausted. It prevents the actor from blocking the
677 /// runtime for too long. See [`coop`] for details.
678 ///
679 /// # Cancel safety
680 ///
681 /// This method is cancel safe. However, using `select!` requires handling
682 /// tracing on your own, so avoid it if possible (use sources instead).
683 ///
684 /// # Panics
685 ///
686 /// If the method is called again after `None` is returned.
687 ///
688 /// # Example
689 /// ```
690 /// # use elfo_core as elfo;
691 /// # async fn exec(mut ctx: elfo::Context) {
692 /// # use elfo::{message, msg};
693 /// # #[message]
694 /// # struct SomethingHappened;
695 /// while let Some(envelope) = ctx.recv().await {
696 /// msg!(match envelope {
697 /// SomethingHappened => { /* ... */ },
698 /// });
699 /// }
700 /// # }
701 /// ```
702 pub async fn recv(&mut self) -> Option<Envelope>
703 where
704 C: 'static,
705 {
706 'outer: loop {
707 self.pre_recv().await;
708
709 let envelope = 'received: {
710 let mailbox_fut = self.actor.as_ref()?.as_actor()?.recv();
711 pin_mut!(mailbox_fut);
712
713 tokio::select! {
714 result = mailbox_fut => match result {
715 RecvResult::Data(envelope) => {
716 break 'received envelope;
717 },
718 RecvResult::Closed(trace_id) => {
719 scope::set_trace_id(trace_id);
720 let actor = self.actor.as_ref()?.as_actor()?;
721 on_input_closed(&mut self.stage, actor);
722 return None;
723 }
724 },
725 option = self.sources.next(), if !self.sources.is_empty() => {
726 let envelope = ward!(option, continue 'outer);
727 break 'received envelope;
728 },
729 }
730 };
731
732 if let Some(envelope) = self.post_recv(envelope) {
733 return Some(envelope);
734 }
735 }
736 }
737
738 /// Receives the next envelope from the mailbox or sources without waiting.
739 /// If the envelope isn't available, `Err(TryRecvError::Empty)` is returned.
740 /// If the mailbox is closed, `Err(TryRecvError::Closed)` is returned.
741 /// Useful to batch message processing.
742 ///
743 /// The method is async due to the following reasons:
744 /// 1. To poll sources, not only the mailbox.
745 /// 2. To respect the actor budget (see below).
746 ///
747 /// # Budget
748 ///
749 /// The method returns the execution back to the runtime once the actor's
750 /// budget has been exhausted. It prevents the actor from blocking the
751 /// runtime for too long. See [`coop`] for details.
752 ///
753 /// # Cancel safety
754 ///
755 /// This method is cancel safe. However, using `select!` requires handling
756 /// tracing on your own, so avoid it if possible (use sources instead).
757 ///
758 /// # Panics
759 ///
760 /// If the method is called again after `Err(TryRecvError::Closed)`.
761 ///
762 /// # Example
763 ///
764 /// Handle all available messages:
765 /// ```
766 /// # use elfo_core as elfo;
767 /// # async fn exec(mut ctx: elfo::Context) {
768 /// # fn handle_batch(_batch: impl Iterator<Item = elfo::Envelope>) {}
769 /// let mut batch = Vec::new();
770 ///
771 /// loop {
772 /// match ctx.try_recv().await {
773 /// Ok(envelope) => batch.push(envelope),
774 /// Err(err) => {
775 /// handle_batch(batch.drain(..));
776 ///
777 /// if err.is_empty() {
778 /// // Wait for the next batch.
779 /// if let Some(envelope) = ctx.recv().await {
780 /// batch.push(envelope);
781 /// continue;
782 /// }
783 /// }
784 ///
785 /// break;
786 /// },
787 /// }
788 /// }
789 /// # }
790 /// ```
791 pub async fn try_recv(&mut self) -> Result<Envelope, TryRecvError>
792 where
793 C: 'static,
794 {
795 #[allow(clippy::never_loop)] // false positive
796 loop {
797 self.pre_recv().await;
798
799 let envelope = 'received: {
800 let actor = ward!(
801 self.actor.as_ref().and_then(|o| o.as_actor()),
802 return Err(TryRecvError::Closed)
803 );
804
805 // TODO: poll mailbox and sources fairly.
806 match actor.try_recv() {
807 Some(RecvResult::Data(envelope)) => {
808 break 'received envelope;
809 }
810 Some(RecvResult::Closed(trace_id)) => {
811 scope::set_trace_id(trace_id);
812 on_input_closed(&mut self.stage, actor);
813 return Err(TryRecvError::Closed);
814 }
815 None => {}
816 }
817
818 if !self.sources.is_empty() {
819 let envelope = poll_fn(|cx| match Pin::new(&mut self.sources).poll_next(cx) {
820 Poll::Ready(Some(envelope)) => Poll::Ready(Some(envelope)),
821 _ => Poll::Ready(None),
822 })
823 .await;
824
825 if let Some(envelope) = envelope {
826 break 'received envelope;
827 }
828 }
829
830 self.stats.on_empty_mailbox();
831 return Err(TryRecvError::Empty);
832 };
833
834 if let Some(envelope) = self.post_recv(envelope) {
835 return Ok(envelope);
836 }
837 }
838 }
839
840 /// Retrieves information related to the start of the actor.
841 ///
842 /// # Panics
843 ///
844 /// This method will panic if the context is pruned, indicating that the
845 /// required information is no longer available.
846 ///
847 /// # Example
848 /// ```
849 /// # use elfo_core as elfo;
850 /// # use elfo_core::{ActorStartCause, ActorStartInfo};
851 /// # async fn exec(mut ctx: elfo::Context) {
852 /// match ctx.start_info().cause {
853 /// ActorStartCause::GroupMounted => {
854 /// // The actor started because its group was mounted.
855 /// }
856 /// ActorStartCause::OnMessage => {
857 /// // The actor started in response to a message.
858 /// }
859 /// ActorStartCause::Restarted => {
860 /// // The actor started due to the restart policy.
861 /// }
862 /// _ => {}
863 /// }
864 /// # }
865 /// ```
866 pub fn start_info(&self) -> &ActorStartInfo {
867 self.actor_start_info
868 .as_ref()
869 .expect("start_info is not available for a group context")
870 }
871
872 async fn pre_recv(&mut self) {
873 self.stats.on_recv();
874
875 coop::consume_budget().await;
876
877 if unlikely(self.stage == Stage::Closed) {
878 panic!("calling `recv()` or `try_recv()` after `None` is returned, an infinite loop?");
879 }
880
881 if unlikely(self.stage == Stage::PreRecv) {
882 let actor = ward!(self.actor.as_ref().and_then(|o| o.as_actor()));
883 if actor.status_kind().is_initializing() {
884 actor.set_status(ActorStatus::NORMAL);
885 }
886 self.stage = Stage::Working;
887 }
888 }
889
890 fn post_recv(&mut self, envelope: Envelope) -> Option<Envelope>
891 where
892 C: 'static,
893 {
894 scope::set_trace_id(envelope.trace_id());
895
896 let envelope = msg!(match envelope {
897 (messages::UpdateConfig { config }, token) => {
898 self.config = config.get_user::<C>().clone();
899 info!("config updated");
900 let message = messages::ConfigUpdated {};
901 let kind = MessageKind::regular(self.actor_addr);
902 let envelope = Envelope::new(message, kind);
903 self.respond(token, Ok(()));
904 envelope
905 }
906 envelope => envelope,
907 });
908
909 let message = envelope.message();
910 trace!("< {:?}", message);
911 if let Some(permit) = DUMPER.acquire_m(&*message) {
912 let kind = envelope.message_kind();
913 permit.record(Dump::message(&*message, kind, Direction::In));
914 }
915
916 // We should change the status after dumping the original message
917 // in order to see `ActorStatusReport` after that message.
918 if envelope.is::<messages::Terminate>() {
919 self.set_status(ActorStatus::TERMINATING);
920 }
921
922 self.stats.on_received_envelope(&envelope);
923
924 msg!(match envelope {
925 (messages::Ping, token) => {
926 self.respond(token, ());
927 None
928 }
929 envelope => Some(envelope),
930 })
931 }
932
933 /// This is a part of private API for now.
934 /// We should provide a way to handle it asynchronous.
935 #[doc(hidden)]
936 pub async fn finished(&self, addr: Addr) {
937 ward!(self.book.get_owned(addr)).finished().await;
938 }
939
940 /// Used to get the typed config from `ValidateConfig`.
941 /// ```ignore
942 /// msg!(match envelope {
943 /// (ValidateConfig { config, .. }, token) => {
944 /// let new_config = ctx.unpack_config(&config);
945 /// ctx.respond(token, Err("oops".into()));
946 /// }
947 /// })
948 /// ```
949 pub fn unpack_config<'c>(&self, config: &'c AnyConfig) -> &'c C
950 where
951 C: for<'de> serde::Deserialize<'de> + 'static,
952 {
953 config.get_user()
954 }
955
956 /// Produces a new context that can be used for sending messages only.
957 ///
958 /// Pruned contexts are likely to be removed in favor of `Output`.
959 pub fn pruned(&self) -> Context {
960 Context {
961 book: self.book.clone(),
962 actor: None,
963 actor_addr: self.actor_addr,
964 actor_start_info: self.actor_start_info.clone(),
965 group_addr: self.group_addr,
966 demux: self.demux.clone(),
967 config: Arc::new(()),
968 key: Singleton,
969 sources: Sources::new(),
970 stage: self.stage,
971 stats: Stats::empty(),
972 }
973 }
974
975 #[doc(hidden)]
976 #[instability::unstable]
977 pub fn book(&self) -> &AddressBook {
978 &self.book
979 }
980
981 pub(crate) fn with_config<C1>(self, config: Arc<C1>) -> Context<C1, K> {
982 Context {
983 book: self.book,
984 actor: self.actor,
985 actor_addr: self.actor_addr,
986 actor_start_info: self.actor_start_info,
987 group_addr: self.group_addr,
988 demux: self.demux,
989 config,
990 key: self.key,
991 sources: self.sources,
992 stage: self.stage,
993 stats: self.stats,
994 }
995 }
996
997 pub(crate) fn with_addr(mut self, addr: Addr) -> Self {
998 self.actor = self.book.get_owned(addr);
999 assert!(self.actor.is_some());
1000 self.actor_addr = addr;
1001 self.stats = Stats::startup();
1002 self
1003 }
1004
1005 pub(crate) fn with_group(mut self, group: Addr) -> Self {
1006 self.group_addr = group;
1007 self
1008 }
1009
1010 pub(crate) fn with_start_info(mut self, actor_start_info: ActorStartInfo) -> Self {
1011 self.actor_start_info = Some(actor_start_info);
1012 self
1013 }
1014
1015 pub(crate) fn with_key<K1>(self, key: K1) -> Context<C, K1> {
1016 Context {
1017 book: self.book,
1018 actor: self.actor,
1019 actor_addr: self.actor_addr,
1020 actor_start_info: self.actor_start_info,
1021 group_addr: self.group_addr,
1022 demux: self.demux,
1023 config: self.config,
1024 key,
1025 sources: self.sources,
1026 stage: self.stage,
1027 stats: self.stats,
1028 }
1029 }
1030}
1031
1032#[cold]
1033fn e2m<M: Message>(envelope: Envelope) -> M {
1034 envelope.unpack().expect("invalid message").0
1035}
1036
1037#[cold]
1038fn on_input_closed(stage: &mut Stage, actor: &Actor) {
1039 if !actor.status_kind().is_terminating() {
1040 actor.set_status(ActorStatus::TERMINATING);
1041 }
1042 *stage = Stage::Closed;
1043 trace!("input closed");
1044}
1045
1046fn addrs_with_envelope(
1047 envelope: Envelope,
1048 addrs: &[Addr],
1049) -> impl Iterator<Item = (Addr, Envelope)> + '_ {
1050 let mut envelope = Some(envelope);
1051
1052 // TODO: use the visitor pattern in order to avoid extra cloning,
1053 // but think about response tokens first.
1054 addrs.iter().enumerate().map(move |(i, addr)| {
1055 (
1056 *addr,
1057 if i + 1 == addrs.len() {
1058 envelope.take().unwrap()
1059 } else {
1060 envelope.as_ref().unwrap().duplicate()
1061 },
1062 )
1063 })
1064}
1065
1066impl Context {
1067 pub(crate) fn new(book: AddressBook, demux: Demux) -> Self {
1068 Self {
1069 book,
1070 actor: None,
1071 actor_addr: Addr::NULL,
1072 group_addr: Addr::NULL,
1073 actor_start_info: None,
1074 demux,
1075 config: Arc::new(()),
1076 key: Singleton,
1077 sources: Sources::new(),
1078 stage: Stage::PreRecv,
1079 stats: Stats::empty(),
1080 }
1081 }
1082}
1083
1084// TODO(v0.2): remove this instance.
1085impl<C, K: Clone> Clone for Context<C, K> {
1086 fn clone(&self) -> Self {
1087 Self {
1088 book: self.book.clone(),
1089 actor: self.book.get_owned(self.actor_addr),
1090 actor_addr: self.actor_addr,
1091 actor_start_info: self.actor_start_info.clone(),
1092 group_addr: self.group_addr,
1093 demux: self.demux.clone(),
1094 config: self.config.clone(),
1095 key: self.key.clone(),
1096 sources: Sources::new(),
1097 stage: self.stage,
1098 stats: Stats::empty(),
1099 }
1100 }
1101}
1102
/// A builder of a request, created by `Context::request()` and
/// `Context::request_to()`.
///
/// `M` is a marker (`Any` or `All`) that selects which `resolve()` is
/// available.
1103#[must_use]
1104pub struct RequestBuilder<'c, C, K, R, M> {
1105 context: &'c Context<C, K>,
1106 request: R,
// Explicit recipient; when `None`, the request is sent via `do_send_async()`.
1107 to: Option<Addr>,
1108 marker: PhantomData<M>,
1109}
1110
/// Marker of request builders whose `resolve()` returns a single response.
1111pub struct Any;
/// Marker of request builders whose `resolve()` returns all responses.
1112pub struct All;
1113
1114impl<'c, C, K, R> RequestBuilder<'c, C, K, R, Any> {
1115 fn new(context: &'c Context<C, K>, request: R) -> Self {
1116 Self {
1117 context,
1118 request,
1119 to: None,
1120 marker: PhantomData,
1121 }
1122 }
1123
1124 #[inline]
1125 pub fn all(self) -> RequestBuilder<'c, C, K, R, All> {
1126 RequestBuilder {
1127 context: self.context,
1128 request: self.request,
1129 to: self.to,
1130 marker: PhantomData,
1131 }
1132 }
1133}
1134
1135impl<C, K, R: Request, M> RequestBuilder<'_, C, K, R, M> {
1136 /// Specified the recipient of the request.
1137 #[inline]
1138 fn to(mut self, addr: Addr) -> Self {
1139 self.to = Some(addr);
1140 self
1141 }
1142
1143 async fn do_send(self, kind: MessageKind) -> bool {
1144 if let Some(recipient) = self.to {
1145 let res = self
1146 .context
1147 .do_send_to(recipient, self.request, kind, |o, e| {
1148 Object::send(o, recipient, e)
1149 });
1150
1151 match res {
1152 Ok(fut) => fut.await.is_ok(),
1153 Err(_) => false,
1154 }
1155 } else {
1156 self.context.do_send_async(self.request, kind).await.is_ok()
1157 }
1158 }
1159}
1160
1161// TODO: add `pub async fn id() { ... }`
1162impl<C: 'static, K, R: Request> RequestBuilder<'_, C, K, R, Any> {
1163 /// Waits for the response.
1164 pub async fn resolve(self) -> Result<R::Response, RequestError> {
1165 // TODO: use `context.actor` after removing pruned contexts.
1166 let this = self.context.actor_addr;
1167 let object = self.context.book.get_owned(this).expect("invalid addr");
1168 let actor = object.as_actor().expect("can be called only on actors");
1169 let token =
1170 actor
1171 .request_table()
1172 .new_request(self.context.book.clone(), scope::trace_id(), false);
1173 let request_id = token.request_id();
1174 let kind = MessageKind::RequestAny(token);
1175
1176 if !self.do_send(kind).await {
1177 actor.request_table().cancel_request(request_id);
1178 return Err(RequestError::Failed);
1179 }
1180
1181 let mut responses = actor.request_table().wait(request_id).await;
1182 debug_assert_eq!(responses.len(), 1);
1183 prepare_response::<R>(responses.pop().expect("missing response"))
1184 }
1185}
1186
1187impl<C: 'static, K, R: Request> RequestBuilder<'_, C, K, R, All> {
1188 /// Waits for the responses.
1189 pub async fn resolve(self) -> Vec<Result<R::Response, RequestError>> {
1190 // TODO: use `context.actor` after removing pruned contexts.
1191 let this = self.context.actor_addr;
1192 let object = self.context.book.get_owned(this).expect("invalid addr");
1193 let actor = object.as_actor().expect("can be called only on actors");
1194 let token =
1195 actor
1196 .request_table()
1197 .new_request(self.context.book.clone(), scope::trace_id(), true);
1198 let request_id = token.request_id();
1199 let kind = MessageKind::RequestAll(token);
1200
1201 if !self.do_send(kind).await {
1202 actor.request_table().cancel_request(request_id);
1203 return vec![Err(RequestError::Failed)];
1204 }
1205
1206 actor
1207 .request_table()
1208 .wait(request_id)
1209 .await
1210 .into_iter()
1211 .map(prepare_response::<R>)
1212 .collect()
1213 }
1214}
1215
1216fn prepare_response<R: Request>(
1217 response: Result<Envelope, RequestError>,
1218) -> Result<R::Response, RequestError> {
1219 let envelope = response?;
1220 let (message, kind) = envelope.unpack::<R::Wrapper>().expect("invalid response");
1221
1222 // TODO: increase a counter.
1223 trace!("< {:?}", message);
1224 if let Some(permit) = DUMPER.acquire_m(&message) {
1225 permit.record(Dump::message(&message, &kind, Direction::In));
1226 }
1227
1228 Ok(message.into())
1229}