elfo_core/
context.rs

1use std::{marker::PhantomData, sync::Arc};
2
3use futures::{future::poll_fn, pin_mut};
4use once_cell::sync::Lazy;
5use tracing::{error, info, trace};
6
7use crate::{self as elfo};
8use elfo_macros::msg_raw as msg;
9
10use crate::{
11    actor::{Actor, ActorStatus},
12    addr::Addr,
13    address_book::AddressBook,
14    config::AnyConfig,
15    demux::Demux,
16    dumping::{self, Direction, Dump, Dumper, INTERNAL_CLASS},
17    envelope::{Envelope, MessageKind},
18    errors::{RequestError, SendError, TryRecvError, TrySendError},
19    mailbox::RecvResult,
20    message::{Message, Request},
21    messages,
22    request_table::ResponseToken,
23    routers::Singleton,
24    scope,
25    source::{Combined, Source},
26};
27
28use self::{budget::Budget, stats::Stats};
29
30mod budget;
31mod stats;
32
33static DUMPER: Lazy<Dumper> = Lazy::new(|| Dumper::new(INTERNAL_CLASS));
34
35/// An actor execution context.
36pub struct Context<C = (), K = Singleton, S = ()> {
37    book: AddressBook,
38    addr: Addr,
39    group: Addr,
40    demux: Demux,
41    config: Arc<C>,
42    key: K,
43    source: S,
44    stage: Stage,
45    stats: Stats,
46    budget: Budget,
47}
48
49#[derive(Clone, Copy, PartialEq)]
50enum Stage {
51    PreRecv,
52    Working,
53    Closed,
54}
55
56assert_impl_all!(Context: Send);
57// TODO: !Sync?
58
59impl<C, K, S> Context<C, K, S> {
60    /// Returns the actor's address.
61    #[inline]
62    pub fn addr(&self) -> Addr {
63        self.addr
64    }
65
66    /// Returns the current group's address.
67    #[inline]
68    pub fn group(&self) -> Addr {
69        self.group
70    }
71
72    #[deprecated]
73    #[doc(hidden)]
74    #[cfg(feature = "test-util")]
75    pub fn set_addr(&mut self, addr: Addr) {
76        self.addr = addr;
77    }
78
79    /// Returns the actual config.
80    #[inline]
81    pub fn config(&self) -> &C {
82        &self.config
83    }
84
85    /// Returns the actor's key.
86    #[inline]
87    pub fn key(&self) -> &K {
88        &self.key
89    }
90
91    /// Transforms the context to another one with the provided source.
92    pub fn with<S1>(self, source: S1) -> Context<C, K, Combined<S, S1>> {
93        Context {
94            book: self.book,
95            addr: self.addr,
96            group: self.group,
97            demux: self.demux,
98            config: self.config,
99            key: self.key,
100            source: Combined::new(self.source, source),
101            stage: Stage::PreRecv,
102            stats: self.stats,
103            budget: self.budget,
104        }
105    }
106
107    /// Updates the actor's status.
108    ///
109    /// ```ignore
110    /// ctx.set_status(ActorStatus::ALARMING.with_details("something wrong"));
111    /// ```
112    pub fn set_status(&self, status: ActorStatus) {
113        let object = ward!(self.book.get_owned(self.addr));
114        let actor = ward!(object.as_actor());
115        actor.set_status(status);
116    }
117
118    /// Closes the mailbox, that leads to returning `None` from `recv()` and
119    /// `try_recv()` after handling all available messages in the mailbox.
120    ///
121    /// Returns `true` if the mailbox has just been closed.
122    pub fn close(&self) -> bool {
123        let object = ward!(self.book.get_owned(self.addr), return false);
124        ward!(object.as_actor(), return false).close()
125    }
126
127    /// Sends a message using the routing system.
128    ///
129    /// Returns `Err` if the message hasn't reached any mailboxes.
130    ///
131    /// # Example
132    /// ```ignore
133    /// // Fire and forget.
134    /// let _ = ctx.send(SomethingHappened).await;
135    ///
136    /// // Fire or fail.
137    /// ctx.send(SomethingHappened).await?;
138    ///
139    /// // Fire or log.
140    /// if let Ok(err) = ctx.send(SomethingHappened).await {
141    ///     warn!("...", error = err);
142    /// }
143    /// ```
144    pub async fn send<M: Message>(&self, message: M) -> Result<(), SendError<M>> {
145        let kind = MessageKind::Regular { sender: self.addr };
146        self.do_send(message, kind).await
147    }
148
149    /// Tries to send a message using the routing system.
150    ///
151    /// Returns
152    /// * `Ok(())` if the message has been added to any mailbox.
153    /// * `Err(Full(_))` if some mailboxes are full.
154    /// * `Err(Closed(_))` otherwise.
155    ///
156    /// # Example
157    /// ```ignore
158    /// // Fire and forget.
159    /// let _ = ctx.try_send(SomethingHappened);
160    ///
161    /// // Fire or fail.
162    /// ctx.try_send(SomethingHappened)?;
163    ///
164    /// // Fire or log.
165    /// if let Err(err) = ctx.try_send(SomethingHappened) {
166    ///     warn!("...", error = err);
167    /// }
168    /// ```
169    pub fn try_send<M: Message>(&self, message: M) -> Result<(), TrySendError<M>> {
170        let kind = MessageKind::Regular { sender: self.addr };
171
172        // XXX: unify with `do_send`.
173        self.stats.on_sent_message::<M>();
174
175        trace!("> {:?}", message);
176        if let Some(permit) = DUMPER.acquire_m::<M>() {
177            permit.record(Dump::message(message.clone(), &kind, Direction::Out));
178        }
179
180        let envelope = Envelope::new(message, kind).upcast();
181        let addrs = self.demux.filter(&envelope);
182
183        if addrs.is_empty() {
184            return Err(TrySendError::Closed(e2m(envelope)));
185        }
186
187        if addrs.len() == 1 {
188            return match self.book.get(addrs[0]) {
189                Some(object) => object.try_send(envelope).map_err(|err| err.map(e2m)),
190                None => Err(TrySendError::Closed(e2m(envelope))),
191            };
192        }
193
194        let mut unused = None;
195        let mut has_full = false;
196        let mut success = false;
197
198        // TODO: use the visitor pattern in order to avoid extra cloning.
199        for addr in addrs {
200            let envelope = unused.take().or_else(|| envelope.duplicate(&self.book));
201            let envelope = ward!(envelope, break);
202
203            match self.book.get(addr) {
204                Some(object) => match object.try_send(envelope) {
205                    Ok(()) => success = true,
206                    Err(err) => {
207                        has_full |= err.is_full();
208                        unused = Some(err.into_inner());
209                    }
210                },
211                None => unused = Some(envelope),
212            };
213        }
214
215        if success {
216            Ok(())
217        } else if has_full {
218            Err(TrySendError::Full(e2m(envelope)))
219        } else {
220            Err(TrySendError::Closed(e2m(envelope)))
221        }
222    }
223
224    /// Returns a request builder.
225    ///
226    /// # Example
227    /// ```ignore
228    /// // Request and wait for a response.
229    /// let response = ctx.request(SomeCommand).resolve().await?;
230    ///
231    /// // Request and wait for all responses.
232    /// for result in ctx.request(SomeCommand).all().resolve().await {
233    ///     // ...
234    /// }
235    /// ```
236    #[inline]
237    pub fn request<R: Request>(&self, request: R) -> RequestBuilder<'_, C, K, S, R, Any> {
238        RequestBuilder::new(self, request)
239    }
240
241    /// Returns a request builder to the specified recipient.
242    ///
243    /// # Example
244    /// ```ignore
245    /// // Request and wait for a response.
246    /// let response = ctx.request_to(addr, SomeCommand).resolve().await?;
247    ///
248    /// // Request and wait for all responses.
249    /// for result in ctx.request_to(addr, SomeCommand).all().resolve().await {
250    ///     // ...
251    /// }
252    /// ```
253    #[inline]
254    pub fn request_to<R: Request>(
255        &self,
256        recipient: Addr,
257        request: R,
258    ) -> RequestBuilder<'_, C, K, S, R, Any> {
259        RequestBuilder::new(self, request).to(recipient)
260    }
261
262    async fn do_send<M: Message>(&self, message: M, kind: MessageKind) -> Result<(), SendError<M>> {
263        self.stats.on_sent_message::<M>();
264
265        trace!("> {:?}", message);
266        if let Some(permit) = DUMPER.acquire_m::<M>() {
267            permit.record(Dump::message(message.clone(), &kind, Direction::Out));
268        }
269
270        let envelope = Envelope::new(message, kind).upcast();
271        let addrs = self.demux.filter(&envelope);
272
273        if addrs.is_empty() {
274            return Err(SendError(e2m(envelope)));
275        }
276
277        if addrs.len() == 1 {
278            return match self.book.get_owned(addrs[0]) {
279                Some(object) => object
280                    .send(self, envelope)
281                    .await
282                    .map_err(|err| SendError(e2m(err.0))),
283                None => Err(SendError(e2m(envelope))),
284            };
285        }
286
287        let mut unused = None;
288        let mut success = false;
289
290        // TODO: use the visitor pattern in order to avoid extra cloning.
291        // TODO: send concurrently.
292        for addr in addrs {
293            let envelope = unused.take().or_else(|| envelope.duplicate(&self.book));
294            let envelope = ward!(envelope, break);
295
296            match self.book.get_owned(addr) {
297                Some(object) => {
298                    unused = object.send(self, envelope).await.err().map(|err| err.0);
299                    if unused.is_none() {
300                        success = true;
301                    }
302                }
303                None => unused = Some(envelope),
304            };
305        }
306
307        if success {
308            Ok(())
309        } else {
310            Err(SendError(e2m(envelope)))
311        }
312    }
313
314    /// Sends a message to the specified recipient.
315    ///
316    /// Returns `Err` if the message hasn't reached any mailboxes.
317    ///
318    /// # Example
319    /// ```ignore
320    /// // Fire and forget.
321    /// let _ = ctx.send_to(addr, SomethingHappened).await;
322    ///
323    /// // Fire or fail.
324    /// ctx.send_to(addr, SomethingHappened).await?;
325    ///
326    /// // Fire or log.
327    /// if let Some(err) = ctx.send_to(addr, SomethingHappened).await {
328    ///     warn!("...", error = err);
329    /// }
330    /// ```
331    pub async fn send_to<M: Message>(
332        &self,
333        recipient: Addr,
334        message: M,
335    ) -> Result<(), SendError<M>> {
336        let kind = MessageKind::Regular { sender: self.addr };
337        self.do_send_to(recipient, message, kind).await
338    }
339
340    async fn do_send_to<M: Message>(
341        &self,
342        recipient: Addr,
343        message: M,
344        kind: MessageKind,
345    ) -> Result<(), SendError<M>> {
346        self.stats.on_sent_message::<M>();
347
348        trace!(to = %recipient, "> {:?}", message);
349        if let Some(permit) = DUMPER.acquire_m::<M>() {
350            permit.record(Dump::message(message.clone(), &kind, Direction::Out));
351        }
352
353        let entry = self.book.get_owned(recipient);
354        let object = ward!(entry, return Err(SendError(message)));
355        let envelope = Envelope::new(message, kind);
356        let fut = object.send(self, envelope.upcast());
357        let result = fut.await;
358        result.map_err(|err| SendError(e2m(err.0)))
359    }
360
361    /// Tries to send a message to the specified recipient.
362    ///
363    /// Returns `Err` if the message hasn't reached mailboxes or they are full.
364    ///
365    /// # Example
366    /// ```ignore
367    /// // Fire and forget.
368    /// let _ = ctx.send(SomethingHappened).await;
369    ///
370    /// // Fire or fail.
371    /// ctx.send(SomethingHappened).await?;
372    ///
373    /// // Fire or log.
374    /// if let Some(err) = ctx.send(SomethingHappened).await {
375    ///     warn!("...", error = err);
376    /// }
377    /// ```
378    pub fn try_send_to<M: Message>(
379        &self,
380        recipient: Addr,
381        message: M,
382    ) -> Result<(), TrySendError<M>> {
383        self.stats.on_sent_message::<M>();
384
385        let kind = MessageKind::Regular { sender: self.addr };
386
387        trace!(to = %recipient, "> {:?}", message);
388        if let Some(permit) = DUMPER.acquire_m::<M>() {
389            permit.record(Dump::message(message.clone(), &kind, Direction::Out));
390        }
391
392        let entry = self.book.get(recipient);
393        let object = ward!(entry, return Err(TrySendError::Closed(message)));
394        let envelope = Envelope::new(message, kind);
395
396        object
397            .try_send(envelope.upcast())
398            .map_err(|err| err.map(e2m))
399    }
400
401    /// Responds to the requester with the provided response.
402    ///
403    /// The token can be used only once.
404    ///
405    /// ```ignore
406    /// msg!(match envelope {
407    ///     (SomeRequest, token) => {
408    ///         ctx.respond(token, SomeResponse);
409    ///     }
410    /// })
411    /// ```
412    pub fn respond<R: Request>(&self, token: ResponseToken<R>, message: R::Response) {
413        if token.is_forgotten() {
414            return;
415        }
416
417        self.stats.on_sent_message::<R::Wrapper>();
418
419        let sender = token.sender;
420        let message = R::Wrapper::from(message);
421        let kind = MessageKind::Response {
422            sender: self.addr(),
423            request_id: token.request_id,
424        };
425
426        trace!(to = %sender, "> {:?}", message);
427        if let Some(permit) = DUMPER.acquire_m::<R>() {
428            permit.record(Dump::message(message.clone(), &kind, Direction::Out));
429        }
430
431        let envelope = Envelope::new(message, kind).upcast();
432        let object = ward!(self.book.get(token.sender));
433        let actor = ward!(object.as_actor());
434        actor
435            .request_table()
436            .respond(token.into_untyped(), envelope);
437    }
438
439    /// Receives the next envelope.
440    ///
441    /// # Panics
442    /// If the method is called again after returning `None`.
443    ///
444    /// # Examples
445    /// ```ignore
446    /// while let Some(envelope) = ctx.recv().await {
447    ///     msg!(match envelope {
448    ///         SomethingHappened => /* ... */,
449    ///     })
450    /// }
451    /// ```
452    pub async fn recv(&mut self) -> Option<Envelope>
453    where
454        C: 'static,
455        S: Source,
456    {
457        loop {
458            self.stats.on_recv();
459
460            if self.stage == Stage::Closed {
461                on_recv_after_close();
462            }
463
464            // TODO: reset if the mailbox is empty.
465            self.budget.acquire().await;
466
467            // TODO: cache `OwnedEntry`?
468            let object = self.book.get_owned(self.addr)?;
469            let actor = object.as_actor()?;
470
471            if self.stage == Stage::PreRecv {
472                on_first_recv(&mut self.stage, actor);
473            }
474
475            // TODO: reset `trace_id` to `None`?
476
477            let mailbox_fut = actor.recv();
478            pin_mut!(mailbox_fut);
479
480            let source_fut = poll_fn(|cx| self.source.poll_recv(cx));
481            pin_mut!(source_fut);
482
483            tokio::select! {
484                result = mailbox_fut => match result {
485                    RecvResult::Data(envelope) => {
486                        if let Some(envelope) = self.post_recv(envelope) {
487                            return Some(envelope);
488                        }
489                    },
490                    RecvResult::Closed(trace_id) => {
491                        scope::set_trace_id(trace_id);
492                        on_input_closed(&mut self.stage, actor);
493                        return None;
494                    }
495                },
496                option = source_fut => {
497                    // Sources cannot return `None` for now.
498                    let envelope = option.expect("source cannot return None");
499
500                    if let Some(envelope) = self.post_recv(envelope) {
501                        return Some(envelope);
502                    }
503                },
504            }
505        }
506    }
507
508    /// Receives the next envelope without waiting.
509    ///
510    /// # Panics
511    /// If the method is called again after returning
512    /// `Err(TryRecvError::Closed)`.
513    ///
514    /// # Examples
515    /// ```ignore
516    /// // Iterate over all available messages.
517    /// while let Ok(envelope) = ctx.try_recv() {
518    ///     msg!(match envelope {
519    ///         SomethingHappened => /* ... */,
520    ///     })
521    /// }
522    /// ```
523    pub fn try_recv(&mut self) -> Result<Envelope, TryRecvError>
524    where
525        C: 'static,
526    {
527        loop {
528            self.stats.on_recv();
529
530            if self.stage == Stage::Closed {
531                on_recv_after_close();
532            }
533
534            let object = self.book.get(self.addr).ok_or(TryRecvError::Closed)?;
535            let actor = object.as_actor().ok_or(TryRecvError::Closed)?;
536
537            if self.stage == Stage::PreRecv {
538                on_first_recv(&mut self.stage, actor);
539            }
540
541            // TODO: poll the sources.
542            match actor.try_recv() {
543                Some(RecvResult::Data(envelope)) => {
544                    drop(object);
545
546                    if let Some(envelope) = self.post_recv(envelope) {
547                        return Ok(envelope);
548                    }
549                }
550                Some(RecvResult::Closed(trace_id)) => {
551                    scope::set_trace_id(trace_id);
552                    on_input_closed(&mut self.stage, actor);
553                    return Err(TryRecvError::Closed);
554                }
555                None => {
556                    self.stats.on_empty_mailbox();
557                    return Err(TryRecvError::Empty);
558                }
559            }
560        }
561    }
562
563    fn post_recv(&mut self, envelope: Envelope) -> Option<Envelope>
564    where
565        C: 'static,
566    {
567        self.budget.decrement();
568
569        scope::set_trace_id(envelope.trace_id());
570
571        let envelope = msg!(match envelope {
572            (messages::UpdateConfig { config }, token) => {
573                self.config = config.get_user::<C>().clone();
574                info!("config updated");
575                let message = messages::ConfigUpdated {};
576                let kind = MessageKind::Regular { sender: self.addr };
577                let envelope = Envelope::new(message, kind).upcast();
578                self.respond(token, Ok(()));
579                envelope
580            }
581            envelope => envelope,
582        });
583
584        let message = envelope.message();
585        trace!("< {:?}", message);
586
587        if message.dumping_allowed() {
588            if let Some(permit) = DUMPER.acquire() {
589                // TODO: reuse `Dump::message`, it requires `AnyMessage: Message`.
590                let dump = Dump::builder()
591                    .direction(Direction::In)
592                    .message_name(message.name())
593                    .message_protocol(message.protocol())
594                    .message_kind(dumping::MessageKind::from_message_kind(
595                        envelope.message_kind(),
596                    ))
597                    .do_finish(message.erase());
598
599                permit.record(dump);
600            }
601        }
602
603        // We should change the status after dumping the original message
604        // in order to see `ActorStatusReport` after that message.
605        if envelope.is::<messages::Terminate>() {
606            self.set_status(ActorStatus::TERMINATING);
607        }
608
609        self.stats.on_received_envelope(&envelope);
610
611        msg!(match envelope {
612            (messages::Ping, token) => {
613                self.respond(token, ());
614                None
615            }
616            envelope => Some(envelope),
617        })
618    }
619
620    /// This is a part of private API for now.
621    /// We should provide a way to handle it asynchronous.
622    #[doc(hidden)]
623    pub async fn finished(&self, addr: Addr) {
624        ward!(self.book.get_owned(addr)).finished().await;
625    }
626
627    /// Used to get the typed config from `ValidateConfig`.
628    /// ```ignore
629    /// msg!(match envelope {
630    ///     (ValidateConfig { config, .. }, token) => {
631    ///         let new_config = ctx.unpack_config(&config);
632    ///         ctx.respond(token, Err("oops".into()));
633    ///     }
634    /// })
635    /// ```
636    pub fn unpack_config<'c>(&self, config: &'c AnyConfig) -> &'c C
637    where
638        C: for<'de> serde::Deserialize<'de> + 'static,
639    {
640        config.get_user()
641    }
642
643    /// Produces a new context that can be used for sending messages only.
644    ///
645    /// Pruned contexts are likely to be removed in favor of `Output`.
646    pub fn pruned(&self) -> Context {
647        Context {
648            book: self.book.clone(),
649            addr: self.addr,
650            group: self.group,
651            demux: self.demux.clone(),
652            config: Arc::new(()),
653            key: Singleton,
654            source: (),
655            stage: self.stage,
656            stats: Stats::empty(),
657            budget: self.budget.clone(),
658        }
659    }
660
661    pub(crate) fn book(&self) -> &AddressBook {
662        &self.book
663    }
664
665    pub(crate) fn with_config<C1>(self, config: Arc<C1>) -> Context<C1, K, S> {
666        Context {
667            book: self.book,
668            addr: self.addr,
669            group: self.group,
670            demux: self.demux,
671            config,
672            key: self.key,
673            source: self.source,
674            stage: self.stage,
675            stats: self.stats,
676            budget: self.budget,
677        }
678    }
679
680    pub(crate) fn with_addr(mut self, addr: Addr) -> Self {
681        self.addr = addr;
682        self.stats = Stats::startup();
683        self
684    }
685
686    pub(crate) fn with_group(mut self, group: Addr) -> Self {
687        self.group = group;
688        self
689    }
690
691    pub(crate) fn with_key<K1>(self, key: K1) -> Context<C, K1, S> {
692        Context {
693            book: self.book,
694            addr: self.addr,
695            group: self.group,
696            demux: self.demux,
697            config: self.config,
698            key,
699            source: self.source,
700            stage: self.stage,
701            stats: self.stats,
702            budget: self.budget,
703        }
704    }
705}
706
707fn e2m<M: Message>(envelope: Envelope) -> M {
708    envelope.do_downcast::<M>().into_message()
709}
710
711#[cold]
712fn on_first_recv(stage: &mut Stage, actor: &Actor) {
713    if actor.is_initializing() {
714        actor.set_status(ActorStatus::NORMAL);
715    }
716    *stage = Stage::Working;
717}
718
719#[cold]
720fn on_input_closed(stage: &mut Stage, actor: &Actor) {
721    if !actor.is_terminating() {
722        actor.set_status(ActorStatus::TERMINATING);
723    }
724    *stage = Stage::Closed;
725    trace!("input closed");
726}
727
728#[cold]
729fn on_recv_after_close() {
730    error!("calling `recv()` or `try_recv()` after `None` is returned, an infinite loop?");
731    panic!("suicide");
732}
733
734impl Context {
735    pub(crate) fn new(book: AddressBook, demux: Demux) -> Self {
736        Self {
737            book,
738            addr: Addr::NULL,
739            group: Addr::NULL,
740            demux,
741            config: Arc::new(()),
742            key: Singleton,
743            source: (),
744            stage: Stage::PreRecv,
745            stats: Stats::empty(),
746            budget: Budget::default(),
747        }
748    }
749}
750
751// TODO(v0.2): remove this instance.
752impl<C, K: Clone> Clone for Context<C, K> {
753    fn clone(&self) -> Self {
754        Self {
755            book: self.book.clone(),
756            addr: self.addr,
757            group: self.group,
758            demux: self.demux.clone(),
759            config: self.config.clone(),
760            key: self.key.clone(),
761            source: (),
762            stage: self.stage,
763            stats: Stats::empty(),
764            budget: self.budget.clone(),
765        }
766    }
767}
768
769#[must_use]
770pub struct RequestBuilder<'c, C, K, S, R, M> {
771    context: &'c Context<C, K, S>,
772    request: R,
773    to: Option<Addr>,
774    marker: PhantomData<M>,
775}
776
777pub struct Any;
778pub struct All;
779pub(crate) struct Forgotten;
780
781impl<'c, C, K, S, R> RequestBuilder<'c, C, K, S, R, Any> {
782    fn new(context: &'c Context<C, K, S>, request: R) -> Self {
783        Self {
784            context,
785            request,
786            to: None,
787            marker: PhantomData,
788        }
789    }
790
791    #[inline]
792    pub fn all(self) -> RequestBuilder<'c, C, K, S, R, All> {
793        RequestBuilder {
794            context: self.context,
795            request: self.request,
796            to: self.to,
797            marker: PhantomData,
798        }
799    }
800
801    // TODO
802    #[allow(unused)]
803    pub(crate) fn forgotten(self) -> RequestBuilder<'c, C, K, S, R, Forgotten> {
804        RequestBuilder {
805            context: self.context,
806            request: self.request,
807            to: self.to,
808            marker: PhantomData,
809        }
810    }
811}
812
813impl<'c, C, K, S, R, M> RequestBuilder<'c, C, K, S, R, M> {
814    #[deprecated(note = "use `Context::request_to()` instead")]
815    #[doc(hidden)]
816    #[inline]
817    pub fn from(self, addr: Addr) -> Self {
818        self.to(addr)
819    }
820
821    /// Specified the recipient of the request.
822    #[inline]
823    fn to(mut self, addr: Addr) -> Self {
824        self.to = Some(addr);
825        self
826    }
827}
828
829// TODO: add `pub async fn id() { ... }`
830impl<'c, C: 'static, K, S, R: Request> RequestBuilder<'c, C, S, K, R, Any> {
831    /// Waits for the response.
832    pub async fn resolve(self) -> Result<R::Response, RequestError<R>> {
833        // TODO: cache `OwnedEntry`?
834        let this = self.context.addr;
835        let object = self.context.book.get_owned(this).expect("invalid addr");
836        let actor = object.as_actor().expect("can be called only on actors");
837        let token = actor
838            .request_table()
839            .new_request(self.context.book.clone(), false);
840        let request_id = token.request_id;
841        let kind = MessageKind::RequestAny(token);
842
843        let res = if let Some(recipient) = self.to {
844            self.context.do_send_to(recipient, self.request, kind).await
845        } else {
846            self.context.do_send(self.request, kind).await
847        };
848
849        if let Err(err) = res {
850            return Err(RequestError::Closed(err.0));
851        }
852
853        let mut data = actor.request_table().wait(request_id).await;
854        if let Some(Some(envelope)) = data.pop() {
855            let envelope = envelope.do_downcast::<R::Wrapper>();
856
857            // TODO: increase a counter.
858            trace!("< {:?}", envelope.message());
859            if let Some(permit) = DUMPER.acquire_m::<R>() {
860                permit.record(Dump::message(
861                    envelope.message().clone(),
862                    envelope.message_kind(),
863                    Direction::In,
864                ));
865            }
866            Ok(envelope.into_message().into())
867        } else {
868            // TODO: should we dump it and increase a counter?
869            Err(RequestError::Ignored)
870        }
871    }
872}
873
874impl<'c, C: 'static, K, S, R: Request> RequestBuilder<'c, C, K, S, R, All> {
875    /// Waits for the responses.
876    pub async fn resolve(self) -> Vec<Result<R::Response, RequestError<R>>> {
877        // TODO: cache `OwnedEntry`?
878        let this = self.context.addr;
879        let object = self.context.book.get_owned(this).expect("invalid addr");
880        let actor = object.as_actor().expect("can be called only on actors");
881        let token = actor
882            .request_table()
883            .new_request(self.context.book.clone(), true);
884        let request_id = token.request_id;
885        let kind = MessageKind::RequestAll(token);
886
887        let res = if let Some(recipient) = self.to {
888            self.context.do_send_to(recipient, self.request, kind).await
889        } else {
890            self.context.do_send(self.request, kind).await
891        };
892
893        if let Err(err) = res {
894            return vec![Err(RequestError::Closed(err.0))];
895        }
896
897        actor
898            .request_table()
899            .wait(request_id)
900            .await
901            .into_iter()
902            .map(|opt| match opt {
903                Some(envelope) => Ok(envelope.do_downcast::<R::Wrapper>()),
904                None => Err(RequestError::Ignored),
905            })
906            .map(|res| {
907                let envelope = res?;
908
909                // TODO: increase a counter.
910                trace!("< {:?}", envelope.message());
911
912                // TODO: `acquire_many` or even unconditionally?
913                if let Some(permit) = DUMPER.acquire_m::<R>() {
914                    permit.record(Dump::message(
915                        envelope.message().clone(),
916                        envelope.message_kind(),
917                        Direction::In,
918                    ));
919                }
920                Ok(envelope.into_message().into())
921            })
922            .collect()
923    }
924}
925
926impl<'c, C: 'static, K, S, R: Request> RequestBuilder<'c, C, S, K, R, Forgotten> {
927    pub async fn resolve(self) -> Result<R::Response, RequestError<R>> {
928        let token = ResponseToken::forgotten(self.context.book.clone());
929        let kind = MessageKind::RequestAny(token);
930
931        let res = if let Some(recipient) = self.to {
932            self.context.do_send_to(recipient, self.request, kind).await
933        } else {
934            self.context.do_send(self.request, kind).await
935        };
936
937        if let Err(err) = res {
938            return Err(RequestError::Closed(err.0));
939        }
940
941        Err(RequestError::Ignored)
942    }
943}