Skip to main content

elfo_core/
context.rs

1use std::{
2    future::poll_fn,
3    marker::PhantomData,
4    pin::Pin,
5    sync::{Arc, LazyLock},
6    task::Poll,
7};
8
9use futures::{pin_mut, Stream};
10use idr_ebr::EbrGuard;
11use tracing::{info, trace};
12
13use elfo_utils::unlikely;
14
15use crate::{
16    actor::{Actor, ActorStartInfo},
17    actor_status::ActorStatus,
18    addr::Addr,
19    address_book::AddressBook,
20    config::AnyConfig,
21    coop,
22    demux::Demux,
23    dumping::{Direction, Dump, Dumper, INTERNAL_CLASS},
24    envelope::{Envelope, MessageKind},
25    errors::{RequestError, SendError, TryRecvError, TrySendError},
26    mailbox::RecvResult,
27    message::{Message, Request},
28    messages, msg,
29    object::{BorrowedObject, Object, OwnedObject},
30    request_table::ResponseToken,
31    restarting::RestartPolicy,
32    routers::Singleton,
33    scope,
34    source::{SourceHandle, Sources, UnattachedSource},
35    ActorStatusKind,
36};
37
38use self::stats::Stats;
39
40mod stats;
41
// Dumper used by the send/receive paths of `Context` in this file.
// It is built lazily, on first access, via `Dumper::new(INTERNAL_CLASS)`.
42static DUMPER: LazyLock<Dumper> = LazyLock::new(|| Dumper::new(INTERNAL_CLASS));
43
44/// An actor execution context.
///
/// `C` is the type of the user config returned by [`Context::config()`] and
/// `K` is the type of the key returned by [`Context::key()`].
45pub struct Context<C = (), K = Singleton> {
    // Used to look up recipients by address when sending and responding.
46    book: AddressBook,
47    actor: Option<OwnedObject>, // `None` for group's and pruned context.
    // Returned by `addr()` and used to build the `MessageKind` of outgoing messages.
48    actor_addr: Addr,
49    actor_start_info: Option<ActorStartInfo>, // `None` for group's context.
    // Returned by `group()`.
50    group_addr: Addr,
    // Selects recipient addresses for routed sends (`send()`, `try_send()`, ...).
51    demux: Demux,
    // Replaced in `post_recv()` when `UpdateConfig` is handled.
52    config: Arc<C>,
53    key: K,
    // Sources attached via `attach()`; polled by `recv()` and `try_recv()`.
54    sources: Sources,
    // Receiving stage, checked and advanced by `pre_recv()`.
55    stage: Stage,
56    stats: Stats,
57}
58
// Receiving stage of a context, checked by `pre_recv()` before every receive attempt.
59#[derive(Clone, Copy, PartialEq)]
60enum Stage {
    // `pre_recv()` has not completed its first-call handling yet.
61    PreRecv,
    // Set by `pre_recv()` once the first-call handling is done.
62    Working,
    // `pre_recv()` panics in this stage.
    // NOTE(review): presumably set by `on_input_closed()` when the mailbox is
    // closed — confirm against its definition.
63    Closed,
64}
65
// Checks that `Context` (with its default type parameters) is `Send`.
// NOTE(review): assumes `assert_impl_all!` is a compile-time assertion in the
// style of the `static_assertions` crate — confirm.
66assert_impl_all!(Context: Send);
67// TODO: !Sync?
68
69impl<C, K> Context<C, K> {
70    /// Returns the actor's address.
71    #[inline]
72    pub fn addr(&self) -> Addr {
73        self.actor_addr
74    }
75
76    /// Returns the current group's address.
77    #[inline]
78    pub fn group(&self) -> Addr {
79        self.group_addr
80    }
81
82    /// Returns the actual config.
83    #[inline]
84    pub fn config(&self) -> &C {
85        &self.config
86    }
87
88    /// Returns the actor's key.
89    #[inline]
90    pub fn key(&self) -> &K {
91        &self.key
92    }
93
94    /// Attaches the provided source to the context.
95    ///
96    /// Messages produced by the source will be available via
97    /// [`Context::recv()`] and [`Context::try_recv()`] methods.
98    pub fn attach<S1: SourceHandle>(&mut self, source: UnattachedSource<S1>) -> S1 {
99        source.attach_to(&mut self.sources)
100    }
101
102    /// Updates the actor's status.
103    ///
104    /// # Example
105    /// ```
106    /// # use elfo_core as elfo;
107    /// # fn exec(ctx: elfo::Context) {
108    /// ctx.set_status(elfo::ActorStatus::ALARMING.with_details("something wrong"));
109    /// # }
110    /// ```
111    pub fn set_status(&self, status: ActorStatus) {
112        ward!(self.actor.as_ref().and_then(|o| o.as_actor())).set_status(status);
113    }
114
115    /// Gets the actor's status kind.
116    ///
117    /// # Example
118    /// ```
119    /// # use elfo_core as elfo;
120    /// # fn exec(ctx: elfo::Context) {
121    /// // if actor is terminating.
122    /// assert!(ctx.status_kind().is_terminating());
123    /// // if actor is alarming.
124    /// assert!(ctx.status_kind().is_alarming());
125    /// // and so on...
126    /// # }
127    /// ```
128    /// # Panics
129    ///
130    /// Panics when called on pruned context.
131    pub fn status_kind(&self) -> ActorStatusKind {
132        self.actor
133            .as_ref()
134            .expect("called `status_kind()` on pruned context")
135            .as_actor()
136            .expect("invariant")
137            .status_kind()
138    }
139
140    /// Overrides the group's default mailbox capacity, which set in the config.
141    ///
142    /// Note: after restart the actor will be created from scratch, so this
143    /// override will be also reset to the group's default mailbox capacity.
144    ///
145    /// # Example
146    /// ```
147    /// # use elfo_core as elfo;
148    /// # fn exec(ctx: elfo::Context) {
149    /// // Override the group's default mailbox capacity.
150    /// ctx.set_mailbox_capacity(42);
151    ///
152    /// // Set the group's default mailbox capacity.
153    /// ctx.set_mailbox_capacity(None);
154    /// # }
155    /// ```
156    pub fn set_mailbox_capacity(&self, capacity: impl Into<Option<usize>>) {
157        ward!(self.actor.as_ref().and_then(|o| o.as_actor()))
158            .set_mailbox_capacity_override(capacity.into());
159    }
160
161    /// Overrides the group's default restart policy, which set in the config.
162    ///
163    /// Note: after restart the actor will be created from scratch, so this
164    /// override will be also reset to the group's default restart policy.
165    ///
166    /// # Example
167    /// ```
168    /// # use elfo_core as elfo;
169    /// # fn exec(ctx: elfo::Context) {
170    /// // Override the group's default restart policy.
171    /// ctx.set_restart_policy(elfo::RestartPolicy::never());
172    ///
173    /// // Set the group's default restart policy.
174    /// ctx.set_restart_policy(None);
175    /// # }
176    /// ```
177    pub fn set_restart_policy(&self, policy: impl Into<Option<RestartPolicy>>) {
178        ward!(self.actor.as_ref().and_then(|o| o.as_actor())).set_restart_policy(policy.into());
179    }
180
181    /// Closes the mailbox, that leads to returning `None` from `recv()` and
182    /// `try_recv()` after handling all available messages in the mailbox.
183    ///
184    /// Returns `true` if the mailbox has just been closed.
185    pub fn close(&self) -> bool {
186        ward!(self.actor.as_ref().and_then(|o| o.as_actor()), return false).close()
187    }
188
189    /// Sends a message using the [inter-group routing] system.
190    ///
191    /// It's possible to send requests if the response is not needed.
192    ///
193    /// Returns `Err` if the message hasn't reached any mailboxes.
194    ///
195    /// # Cancel safety
196    ///
197    /// If cancelled, recipients with full mailboxes wont't receive the message.
198    ///
199    /// # Example
200    /// ```
201    /// # use elfo_core as elfo;
202    /// # async fn exec(mut ctx: elfo::Context) {
203    /// # use elfo::{message, msg};
204    /// #[message]
205    /// struct SomethingHappened;
206    ///
207    /// // Fire and forget.
208    /// let _ = ctx.send(SomethingHappened).await;
209    ///
210    /// // Fire or log.
211    /// if let Err(error) = ctx.send(SomethingHappened).await {
212    ///     tracing::warn!(%error, "...");
213    /// }
214    /// # }
215    /// ```
216    ///
217    /// [inter-group routing]: https://actoromicon.rs/ch03-01-routing.html
218    pub async fn send<M: Message>(&self, message: M) -> Result<(), SendError<M>> {
219        let kind = MessageKind::regular(self.actor_addr);
220        self.do_send_async(message, kind).await
221    }
222
223    /// Tries to send a message using the [inter-group routing] system.
224    ///
225    /// It's possible to send requests if the response is not needed.
226    ///
227    /// Returns
228    /// * `Ok(())` if the message has been added to any mailbox.
229    /// * `Err(Full(_))` if some mailboxes are full.
230    /// * `Err(Closed(_))` otherwise.
231    ///
232    /// # Example
233    /// ```
234    /// # use elfo_core as elfo;
235    /// # async fn exec(mut ctx: elfo::Context) {
236    /// # use elfo::{message, msg};
237    /// #[message]
238    /// struct SomethingHappened;
239    ///
240    /// // Fire and forget.
241    /// let _ = ctx.try_send(SomethingHappened);
242    ///
243    /// // Fire or log.
244    /// if let Err(error) = ctx.try_send(SomethingHappened) {
245    ///     tracing::warn!(%error, "...");
246    /// }
247    /// # }
248    /// ```
249    ///
250    /// [inter-group routing]: https://actoromicon.rs/ch03-01-routing.html
251    pub fn try_send<M: Message>(&self, message: M) -> Result<(), TrySendError<M>> {
252        // XXX: avoid duplication with `unbounded_send()` and `send()`.
253
254        let kind = MessageKind::regular(self.actor_addr);
255
256        self.stats.on_sent_message(&message); // TODO: only if successful?
257
258        trace!("> {:?}", message);
259        if let Some(permit) = DUMPER.acquire_m(&message) {
260            permit.record(Dump::message(&message, &kind, Direction::Out));
261        }
262
263        let envelope = Envelope::new(message, kind);
264        let addrs = self.demux.filter(&envelope);
265
266        if addrs.is_empty() {
267            return Err(TrySendError::Closed(e2m(envelope)));
268        }
269
270        let guard = EbrGuard::new();
271
272        if addrs.len() == 1 {
273            return match self.book.get(addrs[0], &guard) {
274                Some(object) => object
275                    .try_send(Addr::NULL, envelope)
276                    .map_err(|err| err.map(e2m)),
277                None => Err(TrySendError::Closed(e2m(envelope))),
278            };
279        }
280
281        let mut unused = None;
282        let mut has_full = false;
283        let mut success = false;
284
285        for (addr, envelope) in addrs_with_envelope(envelope, &addrs) {
286            match self.book.get(addr, &guard) {
287                Some(object) => match object.try_send(Addr::NULL, envelope) {
288                    Ok(()) => success = true,
289                    Err(err) => {
290                        has_full |= err.is_full();
291                        unused = Some(err.into_inner());
292                    }
293                },
294                None => unused = Some(envelope),
295            };
296        }
297
298        if success {
299            Ok(())
300        } else if has_full {
301            Err(TrySendError::Full(e2m(unused.unwrap())))
302        } else {
303            Err(TrySendError::Closed(e2m(unused.unwrap())))
304        }
305    }
306
307    /// Sends a message using the [inter-group routing] system.
308    /// Affects other senders.
309    ///
310    /// Usually this method shouldn't be used because it can lead to high memory
311    /// usage and even OOM if the recipient works too slowly.
312    /// Prefer [`Context::try_send()`] or [`Context::send()`] instead.
313    ///
314    /// It's possible to send requests if the response is not needed.
315    ///
316    /// Returns `Err` if the message hasn't reached mailboxes.
317    ///
318    /// # Example
319    /// ```
320    /// # use elfo_core as elfo;
321    /// # async fn exec(mut ctx: elfo::Context) {
322    /// # use elfo::{message, msg};
323    /// #[message]
324    /// struct SomethingHappened;
325    ///
326    /// // Fire and forget.
327    /// let _ = ctx.unbounded_send(SomethingHappened);
328    ///
329    /// // Fire or log.
330    /// if let Err(error) = ctx.unbounded_send(SomethingHappened) {
331    ///     tracing::warn!(%error, "...");
332    /// }
333    /// # }
334    /// ```
335    ///
336    /// [inter-group routing]: https://actoromicon.rs/ch03-01-routing.html
337    pub fn unbounded_send<M: Message>(&self, message: M) -> Result<(), SendError<M>> {
338        let kind = MessageKind::regular(self.actor_addr);
339
340        self.stats.on_sent_message(&message); // TODO: only if successful?
341
342        trace!("> {:?}", message);
343        if let Some(permit) = DUMPER.acquire_m(&message) {
344            permit.record(Dump::message(&message, &kind, Direction::Out));
345        }
346
347        let envelope = Envelope::new(message, kind);
348        let addrs = self.demux.filter(&envelope);
349
350        if addrs.is_empty() {
351            return Err(SendError(e2m(envelope)));
352        }
353
354        let guard = EbrGuard::new();
355
356        if addrs.len() == 1 {
357            return match self.book.get(addrs[0], &guard) {
358                Some(object) => object
359                    .unbounded_send(Addr::NULL, envelope)
360                    .map_err(|err| err.map(e2m)),
361                None => Err(SendError(e2m(envelope))),
362            };
363        }
364
365        let mut unused = None;
366        let mut success = false;
367
368        for (addr, envelope) in addrs_with_envelope(envelope, &addrs) {
369            match self.book.get(addr, &guard) {
370                Some(object) => match object.unbounded_send(Addr::NULL, envelope) {
371                    Ok(()) => success = true,
372                    Err(err) => unused = Some(err.into_inner()),
373                },
374                None => unused = Some(envelope),
375            };
376        }
377
378        if success {
379            Ok(())
380        } else {
381            Err(SendError(e2m(unused.unwrap())))
382        }
383    }
384
385    /// Returns a request builder to send a request (on `resolve()`) using
386    /// the [inter-group routing] system.
387    ///
388    /// # Example
389    /// ```ignore
390    /// // Request and wait for a response.
391    /// let response = ctx.request(SomeCommand).resolve().await?;
392    ///
393    /// // Request and wait for all responses.
394    /// for result in ctx.request(SomeCommand).all().resolve().await {
395    ///     // ...
396    /// }
397    /// ```
398    ///
399    /// [inter-group routing]: https://actoromicon.rs/ch03-01-routing.html
400    #[inline]
401    pub fn request<R: Request>(&self, request: R) -> RequestBuilder<'_, C, K, R, Any> {
402        RequestBuilder::new(self, request)
403    }
404
405    /// Returns a request builder to send a request to the specified recipient.
406    ///
407    /// # Example
408    /// ```ignore
409    /// // Request and wait for a response.
410    /// let response = ctx.request_to(addr, SomeCommand).resolve().await?;
411    ///
412    /// // Request and wait for all responses.
413    /// for result in ctx.request_to(addr, SomeCommand).all().resolve().await {
414    ///     // ...
415    /// }
416    /// ```
417    #[inline]
418    pub fn request_to<R: Request>(
419        &self,
420        recipient: Addr,
421        request: R,
422    ) -> RequestBuilder<'_, C, K, R, Any> {
423        RequestBuilder::new(self, request).to(recipient)
424    }
425
426    async fn do_send_async<M: Message>(
427        &self,
428        message: M,
429        kind: MessageKind,
430    ) -> Result<(), SendError<M>> {
431        self.stats.on_sent_message(&message); // TODO: only if successful?
432
433        trace!("> {:?}", message);
434        if let Some(permit) = DUMPER.acquire_m(&message) {
435            permit.record(Dump::message(&message, &kind, Direction::Out));
436        }
437
438        let envelope = Envelope::new(message, kind);
439        let addrs = self.demux.filter(&envelope);
440
441        if addrs.is_empty() {
442            return Err(SendError(e2m(envelope)));
443        }
444
445        if addrs.len() == 1 {
446            let recipient = addrs[0];
447            return {
448                let guard = EbrGuard::new();
449                let entry = self.book.get(recipient, &guard);
450                let object = ward!(entry, return Err(SendError(e2m(envelope))));
451                Object::send(object, Addr::NULL, envelope)
452            }
453            .await
454            .map_err(|err| err.map(e2m));
455        }
456
457        let mut unused = None;
458        let mut success = false;
459
460        // TODO: send concurrently.
461        for (recipient, envelope) in addrs_with_envelope(envelope, &addrs) {
462            let returned_envelope = {
463                let guard = EbrGuard::new();
464                let entry = self.book.get(recipient, &guard);
465                let object = ward!(entry, {
466                    unused = Some(envelope);
467                    continue;
468                });
469                Object::send(object, Addr::NULL, envelope)
470            }
471            .await
472            .err()
473            .map(|err| err.into_inner());
474
475            unused = returned_envelope;
476            if unused.is_none() {
477                success = true;
478            }
479        }
480
481        if success {
482            Ok(())
483        } else {
484            Err(SendError(e2m(unused.unwrap())))
485        }
486    }
487
488    /// Sends a message to the specified recipient.
489    /// Waits if the recipient's mailbox is full.
490    ///
491    /// It's possible to send requests if the response is not needed.
492    ///
493    /// Returns `Err` if the message hasn't reached any mailboxes.
494    ///
495    /// # Cancel safety
496    ///
497    /// If cancelled, recipients with full mailboxes wont't receive the message.
498    ///
499    /// # Example
500    /// ```
501    /// # use elfo_core as elfo;
502    /// # async fn exec(mut ctx: elfo::Context, addr: elfo::Addr) {
503    /// # use elfo::{message, msg};
504    /// #[message]
505    /// struct SomethingHappened;
506    ///
507    /// let _ = ctx.send_to(addr, SomethingHappened).await;
508    ///
509    /// // Fire or log.
510    /// if let Err(error) = ctx.send_to(addr, SomethingHappened).await {
511    ///     tracing::warn!(%error, "...");
512    /// }
513    /// # }
514    /// ```
515    pub async fn send_to<M: Message>(
516        &self,
517        recipient: Addr,
518        message: M,
519    ) -> Result<(), SendError<M>> {
520        let kind = MessageKind::regular(self.actor_addr);
521        self.do_send_to(recipient, message, kind, |object, envelope| {
522            Object::send(object, recipient, envelope)
523        })?
524        .await
525        .map_err(|err| err.map(e2m))
526    }
527
528    /// Tries to send a message to the specified recipient.
529    /// Returns an error if the recipient's mailbox is full.
530    ///
531    /// It's possible to send requests if the response is not needed.
532    ///
533    /// Returns
534    /// * `Ok(())` if the message has been added to any mailbox.
535    /// * `Err(Full(_))` if some mailboxes are full.
536    /// * `Err(Closed(_))` otherwise.
537    ///
538    /// # Example
539    /// ```
540    /// # use elfo_core as elfo;
541    /// # async fn exec(mut ctx: elfo::Context, addr: elfo::Addr) {
542    /// # use elfo::{message, msg};
543    /// #[message]
544    /// struct SomethingHappened;
545    ///
546    /// // Fire and forget.
547    /// let _ = ctx.try_send_to(addr, SomethingHappened);
548    ///
549    /// // Fire or log.
550    /// if let Err(error) = ctx.try_send_to(addr, SomethingHappened) {
551    ///     tracing::warn!(%error, "...");
552    /// }
553    /// # }
554    /// ```
555    pub fn try_send_to<M: Message>(
556        &self,
557        recipient: Addr,
558        message: M,
559    ) -> Result<(), TrySendError<M>> {
560        let kind = MessageKind::regular(self.actor_addr);
561        self.do_send_to(recipient, message, kind, |object, envelope| {
562            object
563                .try_send(recipient, envelope)
564                .map_err(|err| err.map(e2m))
565        })?
566    }
567
568    /// Sends a message to the specified recipient.
569    /// Affects other senders.
570    ///
571    /// Usually this method shouldn't be used because it can lead to high memory
572    /// usage and even OOM if the recipient works too slowly.
573    /// Prefer [`Context::try_send_to()`] or [`Context::send_to()`] instead.
574    ///
575    /// It's possible to send requests if the response is not needed.
576    ///
577    /// Returns `Err` if the message hasn't reached mailboxes.
578    ///
579    /// # Example
580    /// ```
581    /// # use elfo_core as elfo;
582    /// # async fn exec(mut ctx: elfo::Context, addr: elfo::Addr) {
583    /// # use elfo::{message, msg};
584    /// #[message]
585    /// struct SomethingHappened;
586    ///
587    /// // Fire and forget.
588    /// let _ = ctx.unbounded_send_to(addr, SomethingHappened);
589    ///
590    /// // Fire or log.
591    /// if let Err(error) = ctx.unbounded_send_to(addr, SomethingHappened) {
592    ///     tracing::warn!(%error, "...");
593    /// }
594    /// # }
595    /// ```
596    pub fn unbounded_send_to<M: Message>(
597        &self,
598        recipient: Addr,
599        message: M,
600    ) -> Result<(), SendError<M>> {
601        let kind = MessageKind::regular(self.actor_addr);
602        self.do_send_to(recipient, message, kind, |object, envelope| {
603            object
604                .unbounded_send(recipient, envelope)
605                .map_err(|err| err.map(e2m))
606        })?
607    }
608
609    #[inline(always)]
610    fn do_send_to<M: Message, R>(
611        &self,
612        recipient: Addr,
613        message: M,
614        kind: MessageKind,
615        f: impl FnOnce(BorrowedObject<'_>, Envelope) -> R,
616    ) -> Result<R, SendError<M>> {
617        self.stats.on_sent_message(&message); // TODO: only if successful?
618
619        trace!(to = %recipient, "> {:?}", message);
620        if let Some(permit) = DUMPER.acquire_m(&message) {
621            permit.record(Dump::message(&message, &kind, Direction::Out));
622        }
623
624        let guard = EbrGuard::new();
625        let entry = self.book.get(recipient, &guard);
626        let object = ward!(entry, return Err(SendError(message)));
627        let envelope = Envelope::new(message, kind);
628
629        Ok(f(object, envelope))
630    }
631
632    /// Responds to the requester with the provided response.
633    ///
634    /// The token can be used only once.
635    ///
636    /// ```ignore
637    /// msg!(match envelope {
638    ///     (SomeRequest, token) => {
639    ///         ctx.respond(token, SomeResponse);
640    ///     }
641    /// })
642    /// ```
643    pub fn respond<R: Request>(&self, token: ResponseToken<R>, message: R::Response) {
644        if token.is_forgotten() {
645            return;
646        }
647
648        let token = token.into_untyped();
649        let recipient = token.sender();
650        let message = R::Wrapper::from(message);
651        self.stats.on_sent_message(&message); // TODO: only if successful?
652
653        let kind = MessageKind::Response {
654            sender: self.addr(),
655            request_id: token.request_id(),
656        };
657
658        trace!(to = %recipient, "> {:?}", message);
659        if let Some(permit) = DUMPER.acquire_m(&message) {
660            permit.record(Dump::message(&message, &kind, Direction::Out));
661        }
662
663        let envelope = Envelope::new(message, kind);
664        let guard = EbrGuard::new();
665        let object = ward!(self.book.get(recipient, &guard));
666        object.respond(token, Ok(envelope));
667    }
668
669    /// Receives the next envelope from the mailbox or sources.
670    /// If the envelope isn't available, the method waits for the next one.
671    /// If the mailbox is closed, `None` is returned.
672    ///
673    /// # Budget
674    ///
675    /// The method returns the execution back to the runtime once the actor's
676    /// budget has been exhausted. It prevents the actor from blocking the
677    /// runtime for too long. See [`coop`] for details.
678    ///
679    /// # Cancel safety
680    ///
681    /// This method is cancel safe. However, using `select!` requires handling
682    /// tracing on your own, so avoid it if possible (use sources instead).
683    ///
684    /// # Panics
685    ///
686    /// If the method is called again after `None` is returned.
687    ///
688    /// # Example
689    /// ```
690    /// # use elfo_core as elfo;
691    /// # async fn exec(mut ctx: elfo::Context) {
692    /// # use elfo::{message, msg};
693    /// # #[message]
694    /// # struct SomethingHappened;
695    /// while let Some(envelope) = ctx.recv().await {
696    ///     msg!(match envelope {
697    ///         SomethingHappened => { /* ... */ },
698    ///     });
699    /// }
700    /// # }
701    /// ```
702    pub async fn recv(&mut self) -> Option<Envelope>
703    where
704        C: 'static,
705    {
706        'outer: loop {
707            self.pre_recv().await;
708
709            let envelope = 'received: {
710                let mailbox_fut = self.actor.as_ref()?.as_actor()?.recv();
711                pin_mut!(mailbox_fut);
712
713                tokio::select! {
714                    result = mailbox_fut => match result {
715                        RecvResult::Data(envelope) => {
716                            break 'received envelope;
717                        },
718                        RecvResult::Closed(trace_id) => {
719                            scope::set_trace_id(trace_id);
720                            let actor = self.actor.as_ref()?.as_actor()?;
721                            on_input_closed(&mut self.stage, actor);
722                            return None;
723                        }
724                    },
725                    option = self.sources.next(), if !self.sources.is_empty() => {
726                        let envelope = ward!(option, continue 'outer);
727                        break 'received envelope;
728                    },
729                }
730            };
731
732            if let Some(envelope) = self.post_recv(envelope) {
733                return Some(envelope);
734            }
735        }
736    }
737
738    /// Receives the next envelope from the mailbox or sources without waiting.
739    /// If the envelope isn't available, `Err(TryRecvError::Empty)` is returned.
740    /// If the mailbox is closed, `Err(TryRecvError::Closed)` is returned.
741    /// Useful to batch message processing.
742    ///
743    /// The method is async due to the following reasons:
744    /// 1. To poll sources, not only the mailbox.
745    /// 2. To respect the actor budget (see below).
746    ///
747    /// # Budget
748    ///
749    /// The method returns the execution back to the runtime once the actor's
750    /// budget has been exhausted. It prevents the actor from blocking the
751    /// runtime for too long. See [`coop`] for details.
752    ///
753    /// # Cancel safety
754    ///
755    /// This method is cancel safe. However, using `select!` requires handling
756    /// tracing on your own, so avoid it if possible (use sources instead).
757    ///
758    /// # Panics
759    ///
760    /// If the method is called again after `Err(TryRecvError::Closed)`.
761    ///
762    /// # Example
763    ///
764    /// Handle all available messages:
765    /// ```
766    /// # use elfo_core as elfo;
767    /// # async fn exec(mut ctx: elfo::Context) {
768    /// # fn handle_batch(_batch: impl Iterator<Item = elfo::Envelope>) {}
769    /// let mut batch = Vec::new();
770    ///
771    /// loop {
772    ///     match ctx.try_recv().await {
773    ///         Ok(envelope) => batch.push(envelope),
774    ///         Err(err) => {
775    ///             handle_batch(batch.drain(..));
776    ///
777    ///             if err.is_empty() {
778    ///                 // Wait for the next batch.
779    ///                 if let Some(envelope) = ctx.recv().await {
780    ///                     batch.push(envelope);
781    ///                     continue;
782    ///                 }
783    ///             }
784    ///
785    ///             break;
786    ///         },
787    ///     }
788    /// }
789    /// # }
790    /// ```
791    pub async fn try_recv(&mut self) -> Result<Envelope, TryRecvError>
792    where
793        C: 'static,
794    {
795        #[allow(clippy::never_loop)] // false positive
796        loop {
797            self.pre_recv().await;
798
799            let envelope = 'received: {
800                let actor = ward!(
801                    self.actor.as_ref().and_then(|o| o.as_actor()),
802                    return Err(TryRecvError::Closed)
803                );
804
805                // TODO: poll mailbox and sources fairly.
806                match actor.try_recv() {
807                    Some(RecvResult::Data(envelope)) => {
808                        break 'received envelope;
809                    }
810                    Some(RecvResult::Closed(trace_id)) => {
811                        scope::set_trace_id(trace_id);
812                        on_input_closed(&mut self.stage, actor);
813                        return Err(TryRecvError::Closed);
814                    }
815                    None => {}
816                }
817
818                if !self.sources.is_empty() {
819                    let envelope = poll_fn(|cx| match Pin::new(&mut self.sources).poll_next(cx) {
820                        Poll::Ready(Some(envelope)) => Poll::Ready(Some(envelope)),
821                        _ => Poll::Ready(None),
822                    })
823                    .await;
824
825                    if let Some(envelope) = envelope {
826                        break 'received envelope;
827                    }
828                }
829
830                self.stats.on_empty_mailbox();
831                return Err(TryRecvError::Empty);
832            };
833
834            if let Some(envelope) = self.post_recv(envelope) {
835                return Ok(envelope);
836            }
837        }
838    }
839
840    /// Retrieves information related to the start of the actor.
841    ///
842    /// # Panics
843    ///
844    /// This method will panic if the context is pruned, indicating that the
845    /// required information is no longer available.
846    ///
847    /// # Example
848    /// ```
849    /// # use elfo_core as elfo;
850    /// # use elfo_core::{ActorStartCause, ActorStartInfo};
851    /// # async fn exec(mut ctx: elfo::Context) {
852    /// match ctx.start_info().cause {
853    ///     ActorStartCause::GroupMounted => {
854    ///         // The actor started because its group was mounted.
855    ///     }
856    ///     ActorStartCause::OnMessage => {
857    ///         // The actor started in response to a message.
858    ///     }
859    ///     ActorStartCause::Restarted => {
860    ///         // The actor started due to the restart policy.
861    ///     }
862    ///     _ => {}
863    /// }
864    /// # }
865    /// ```
866    pub fn start_info(&self) -> &ActorStartInfo {
867        self.actor_start_info
868            .as_ref()
869            .expect("start_info is not available for a group context")
870    }
871
872    async fn pre_recv(&mut self) {
873        self.stats.on_recv();
874
875        coop::consume_budget().await;
876
877        if unlikely(self.stage == Stage::Closed) {
878            panic!("calling `recv()` or `try_recv()` after `None` is returned, an infinite loop?");
879        }
880
881        if unlikely(self.stage == Stage::PreRecv) {
882            let actor = ward!(self.actor.as_ref().and_then(|o| o.as_actor()));
883            if actor.status_kind().is_initializing() {
884                actor.set_status(ActorStatus::NORMAL);
885            }
886            self.stage = Stage::Working;
887        }
888    }
889
    /// Post-processes a freshly received envelope before handing it to the caller.
    ///
    /// In order, this:
    /// 1. makes the envelope's trace id the current one for the scope;
    /// 2. intercepts `UpdateConfig`: stores the new config, responds `Ok(())` and
    ///    substitutes a `ConfigUpdated` envelope for the original one;
    /// 3. traces the message and dumps it if the dumper grants a permit;
    /// 4. switches the status to `TERMINATING` when the message is `Terminate`;
    /// 5. lets the stats observe the envelope;
    /// 6. answers `Ping` in place.
    ///
    /// Returns `None` when the envelope was a `Ping` (it is fully handled here),
    /// otherwise `Some` with the envelope the caller should process.
    fn post_recv(&mut self, envelope: Envelope) -> Option<Envelope>
    where
        C: 'static,
    {
        scope::set_trace_id(envelope.trace_id());

        // A config update is consumed here; the caller sees `ConfigUpdated` instead.
        let envelope = msg!(match envelope {
            (messages::UpdateConfig { config }, token) => {
                self.config = config.get_user::<C>().clone();
                info!("config updated");
                let message = messages::ConfigUpdated {};
                let kind = MessageKind::regular(self.actor_addr);
                let envelope = Envelope::new(message, kind);
                self.respond(token, Ok(()));
                envelope
            }
            envelope => envelope,
        });

        let message = envelope.message();
        trace!("< {:?}", message);
        // Dumping is best-effort: nothing is recorded when no permit is granted.
        if let Some(permit) = DUMPER.acquire_m(&*message) {
            let kind = envelope.message_kind();
            permit.record(Dump::message(&*message, kind, Direction::In));
        }

        // We should change the status after dumping the original message
        // in order to see `ActorStatusReport` after that message.
        if envelope.is::<messages::Terminate>() {
            self.set_status(ActorStatus::TERMINATING);
        }

        self.stats.on_received_envelope(&envelope);

        // `Ping` is answered right here and never reaches the caller.
        msg!(match envelope {
            (messages::Ping, token) => {
                self.respond(token, ());
                None
            }
            envelope => Some(envelope),
        })
    }
932
933    /// This is a part of private API for now.
934    /// We should provide a way to handle it asynchronous.
935    #[doc(hidden)]
936    pub async fn finished(&self, addr: Addr) {
937        ward!(self.book.get_owned(addr)).finished().await;
938    }
939
940    /// Used to get the typed config from `ValidateConfig`.
941    /// ```ignore
942    /// msg!(match envelope {
943    ///     (ValidateConfig { config, .. }, token) => {
944    ///         let new_config = ctx.unpack_config(&config);
945    ///         ctx.respond(token, Err("oops".into()));
946    ///     }
947    /// })
948    /// ```
949    pub fn unpack_config<'c>(&self, config: &'c AnyConfig) -> &'c C
950    where
951        C: for<'de> serde::Deserialize<'de> + 'static,
952    {
953        config.get_user()
954    }
955
956    /// Produces a new context that can be used for sending messages only.
957    ///
958    /// Pruned contexts are likely to be removed in favor of `Output`.
959    pub fn pruned(&self) -> Context {
960        Context {
961            book: self.book.clone(),
962            actor: None,
963            actor_addr: self.actor_addr,
964            actor_start_info: self.actor_start_info.clone(),
965            group_addr: self.group_addr,
966            demux: self.demux.clone(),
967            config: Arc::new(()),
968            key: Singleton,
969            sources: Sources::new(),
970            stage: self.stage,
971            stats: Stats::empty(),
972        }
973    }
974
975    #[doc(hidden)]
976    #[instability::unstable]
977    pub fn book(&self) -> &AddressBook {
978        &self.book
979    }
980
981    pub(crate) fn with_config<C1>(self, config: Arc<C1>) -> Context<C1, K> {
982        Context {
983            book: self.book,
984            actor: self.actor,
985            actor_addr: self.actor_addr,
986            actor_start_info: self.actor_start_info,
987            group_addr: self.group_addr,
988            demux: self.demux,
989            config,
990            key: self.key,
991            sources: self.sources,
992            stage: self.stage,
993            stats: self.stats,
994        }
995    }
996
997    pub(crate) fn with_addr(mut self, addr: Addr) -> Self {
998        self.actor = self.book.get_owned(addr);
999        assert!(self.actor.is_some());
1000        self.actor_addr = addr;
1001        self.stats = Stats::startup();
1002        self
1003    }
1004
1005    pub(crate) fn with_group(mut self, group: Addr) -> Self {
1006        self.group_addr = group;
1007        self
1008    }
1009
1010    pub(crate) fn with_start_info(mut self, actor_start_info: ActorStartInfo) -> Self {
1011        self.actor_start_info = Some(actor_start_info);
1012        self
1013    }
1014
1015    pub(crate) fn with_key<K1>(self, key: K1) -> Context<C, K1> {
1016        Context {
1017            book: self.book,
1018            actor: self.actor,
1019            actor_addr: self.actor_addr,
1020            actor_start_info: self.actor_start_info,
1021            group_addr: self.group_addr,
1022            demux: self.demux,
1023            config: self.config,
1024            key,
1025            sources: self.sources,
1026            stage: self.stage,
1027            stats: self.stats,
1028        }
1029    }
1030}
1031
1032#[cold]
1033fn e2m<M: Message>(envelope: Envelope) -> M {
1034    envelope.unpack().expect("invalid message").0
1035}
1036
1037#[cold]
1038fn on_input_closed(stage: &mut Stage, actor: &Actor) {
1039    if !actor.status_kind().is_terminating() {
1040        actor.set_status(ActorStatus::TERMINATING);
1041    }
1042    *stage = Stage::Closed;
1043    trace!("input closed");
1044}
1045
1046fn addrs_with_envelope(
1047    envelope: Envelope,
1048    addrs: &[Addr],
1049) -> impl Iterator<Item = (Addr, Envelope)> + '_ {
1050    let mut envelope = Some(envelope);
1051
1052    // TODO: use the visitor pattern in order to avoid extra cloning,
1053    //       but think about response tokens first.
1054    addrs.iter().enumerate().map(move |(i, addr)| {
1055        (
1056            *addr,
1057            if i + 1 == addrs.len() {
1058                envelope.take().unwrap()
1059            } else {
1060                envelope.as_ref().unwrap().duplicate()
1061            },
1062        )
1063    })
1064}
1065
1066impl Context {
1067    pub(crate) fn new(book: AddressBook, demux: Demux) -> Self {
1068        Self {
1069            book,
1070            actor: None,
1071            actor_addr: Addr::NULL,
1072            group_addr: Addr::NULL,
1073            actor_start_info: None,
1074            demux,
1075            config: Arc::new(()),
1076            key: Singleton,
1077            sources: Sources::new(),
1078            stage: Stage::PreRecv,
1079            stats: Stats::empty(),
1080        }
1081    }
1082}
1083
1084// TODO(v0.2): remove this instance.
1085impl<C, K: Clone> Clone for Context<C, K> {
1086    fn clone(&self) -> Self {
1087        Self {
1088            book: self.book.clone(),
1089            actor: self.book.get_owned(self.actor_addr),
1090            actor_addr: self.actor_addr,
1091            actor_start_info: self.actor_start_info.clone(),
1092            group_addr: self.group_addr,
1093            demux: self.demux.clone(),
1094            config: self.config.clone(),
1095            key: self.key.clone(),
1096            sources: Sources::new(),
1097            stage: self.stage,
1098            stats: Stats::empty(),
1099        }
1100    }
1101}
1102
/// A request that is prepared but not yet sent; `resolve()` sends it and
/// waits for the outcome.
///
/// The `M` marker ([`Any`] or [`All`]) selects which `resolve()` is available.
#[must_use]
pub struct RequestBuilder<'c, C, K, R, M> {
    // The context the request is sent through.
    context: &'c Context<C, K>,
    // The request message itself.
    request: R,
    // Explicit recipient; when `None`, sending goes through `Context::do_send_async`.
    to: Option<Addr>,
    // Carries the `Any`/`All` marker at the type level only.
    marker: PhantomData<M>,
}
1110
/// Marker for a [`RequestBuilder`] whose `resolve()` yields a single response.
pub struct Any;
/// Marker for a [`RequestBuilder`] whose `resolve()` yields a `Vec` of responses.
pub struct All;
1113
1114impl<'c, C, K, R> RequestBuilder<'c, C, K, R, Any> {
1115    fn new(context: &'c Context<C, K>, request: R) -> Self {
1116        Self {
1117            context,
1118            request,
1119            to: None,
1120            marker: PhantomData,
1121        }
1122    }
1123
1124    #[inline]
1125    pub fn all(self) -> RequestBuilder<'c, C, K, R, All> {
1126        RequestBuilder {
1127            context: self.context,
1128            request: self.request,
1129            to: self.to,
1130            marker: PhantomData,
1131        }
1132    }
1133}
1134
1135impl<C, K, R: Request, M> RequestBuilder<'_, C, K, R, M> {
1136    /// Specified the recipient of the request.
1137    #[inline]
1138    fn to(mut self, addr: Addr) -> Self {
1139        self.to = Some(addr);
1140        self
1141    }
1142
1143    async fn do_send(self, kind: MessageKind) -> bool {
1144        if let Some(recipient) = self.to {
1145            let res = self
1146                .context
1147                .do_send_to(recipient, self.request, kind, |o, e| {
1148                    Object::send(o, recipient, e)
1149                });
1150
1151            match res {
1152                Ok(fut) => fut.await.is_ok(),
1153                Err(_) => false,
1154            }
1155        } else {
1156            self.context.do_send_async(self.request, kind).await.is_ok()
1157        }
1158    }
1159}
1160
1161// TODO: add `pub async fn id() { ... }`
1162impl<C: 'static, K, R: Request> RequestBuilder<'_, C, K, R, Any> {
1163    /// Waits for the response.
1164    pub async fn resolve(self) -> Result<R::Response, RequestError> {
1165        // TODO: use `context.actor` after removing pruned contexts.
1166        let this = self.context.actor_addr;
1167        let object = self.context.book.get_owned(this).expect("invalid addr");
1168        let actor = object.as_actor().expect("can be called only on actors");
1169        let token =
1170            actor
1171                .request_table()
1172                .new_request(self.context.book.clone(), scope::trace_id(), false);
1173        let request_id = token.request_id();
1174        let kind = MessageKind::RequestAny(token);
1175
1176        if !self.do_send(kind).await {
1177            actor.request_table().cancel_request(request_id);
1178            return Err(RequestError::Failed);
1179        }
1180
1181        let mut responses = actor.request_table().wait(request_id).await;
1182        debug_assert_eq!(responses.len(), 1);
1183        prepare_response::<R>(responses.pop().expect("missing response"))
1184    }
1185}
1186
1187impl<C: 'static, K, R: Request> RequestBuilder<'_, C, K, R, All> {
1188    /// Waits for the responses.
1189    pub async fn resolve(self) -> Vec<Result<R::Response, RequestError>> {
1190        // TODO: use `context.actor` after removing pruned contexts.
1191        let this = self.context.actor_addr;
1192        let object = self.context.book.get_owned(this).expect("invalid addr");
1193        let actor = object.as_actor().expect("can be called only on actors");
1194        let token =
1195            actor
1196                .request_table()
1197                .new_request(self.context.book.clone(), scope::trace_id(), true);
1198        let request_id = token.request_id();
1199        let kind = MessageKind::RequestAll(token);
1200
1201        if !self.do_send(kind).await {
1202            actor.request_table().cancel_request(request_id);
1203            return vec![Err(RequestError::Failed)];
1204        }
1205
1206        actor
1207            .request_table()
1208            .wait(request_id)
1209            .await
1210            .into_iter()
1211            .map(prepare_response::<R>)
1212            .collect()
1213    }
1214}
1215
1216fn prepare_response<R: Request>(
1217    response: Result<Envelope, RequestError>,
1218) -> Result<R::Response, RequestError> {
1219    let envelope = response?;
1220    let (message, kind) = envelope.unpack::<R::Wrapper>().expect("invalid response");
1221
1222    // TODO: increase a counter.
1223    trace!("< {:?}", message);
1224    if let Some(permit) = DUMPER.acquire_m(&message) {
1225        permit.record(Dump::message(&message, &kind, Direction::In));
1226    }
1227
1228    Ok(message.into())
1229}