//! Dispatching a received [`Message`] to an actor, modelled as the [`DispatchFuture`] future.
use std::future::Future;
use std::marker::PhantomData;
use std::mem;
use std::ops::ControlFlow;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures_core::future::BoxFuture;
use futures_util::FutureExt;
use crate::chan::ActorMessage;
use crate::envelope::Shutdown;
use crate::instrumentation::Span;
use crate::mailbox::Mailbox;
use crate::Message;
impl<A> Message<A> {
    /// Turns this message into a [`DispatchFuture`] targeting `actor`.
    ///
    /// The returned future keeps the mutable borrow of `actor` for as long as it lives. The
    /// mailbox that accompanies the dispatch is rebuilt from the channel and broadcast mailbox
    /// carried by this message.
    pub fn dispatch_to(self, actor: &mut A) -> DispatchFuture<'_, A> {
        let mailbox = Mailbox::from_parts(self.channel, self.broadcast_mailbox);

        DispatchFuture::new(self.inner, actor, mailbox)
    }
}
/// Represents the dispatch of a message to an actor.
///
/// The future resolves to a `ControlFlow<()>`. Merely constructing it does not start the
/// dispatch: the message is passed on for handling once the future is polled for the first time.
///
/// This future is **not** cancellation-safe. Dropping it will interrupt the execution of
/// [`Handler::handle`](crate::Handler::handle) which may leave the actor in an inconsistent state.
#[must_use = "Futures do nothing unless polled"]
pub struct DispatchFuture<'a, A> {
// Current stage of the dispatch: not started, running, or done.
state: State<'a, A>,
// Span in which the handler future is polled. It is `Span::none()` until the dispatch has been
// started, at which point it is replaced by the span returned alongside the handler future.
span: Span,
}
impl<'a, A> DispatchFuture<'a, A> {
    /// Gives access to the [`Span`] that covers the whole dispatch, i.e. both handing the message
    /// to the actor and the actor handling it.
    ///
    /// Keeping a clone of the span around makes it possible to log into it after the fact, for
    /// example when the dispatch ends up being cancelled because of a timeout.
    ///
    /// The span only exists once the dispatch has been started. If this future has never been
    /// polled, calling this function starts the dispatch so that the span can be created, which
    /// is the reason it takes `&mut self`.
    ///
    /// ```rust
    /// # use std::ops::ControlFlow;
    /// # use std::time::Duration;
    /// # use tokio::time::timeout;
    /// # use xtra::prelude::*;
    /// #
    /// # struct MyActor;
    /// # impl Actor for MyActor { type Stop = (); async fn stopped(self) {} }
    /// #
    /// # let mut actor = MyActor;
    /// # let (addr, mut mailbox) = Mailbox::unbounded();
    /// # drop(addr);
    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
    /// # actor.started(&mut mailbox).await;
    /// #
    /// # loop {
    /// # let msg = mailbox.next().await;
    /// let mut fut = msg.dispatch_to(&mut actor);
    /// let span = fut.span().clone();
    /// match timeout(Duration::from_secs(1), fut).await {
    ///     Ok(ControlFlow::Continue(())) => (),
    ///     Ok(ControlFlow::Break(())) => break actor.stopped().await,
    ///     Err(_elapsed) => {
    ///         let _entered = span.enter();
    ///         span.record("interrupted", &"timed_out");
    ///         tracing::warn!(timeout_seconds = 1, "Handler execution timed out");
    ///     }
    /// }
    /// # } })
    /// ```
    #[cfg(feature = "instrumentation")]
    pub fn span(&mut self) -> &Span {
        // Take both fields out so that `self` can be rebuilt as a whole below.
        let current_span = mem::replace(&mut self.span, Span::none());
        let current_state = mem::replace(&mut self.state, State::Done);

        let rebuilt = match current_state {
            // Not started yet: start now, which is what produces the actual span.
            State::New { msg, act, mailbox } => Self::running(msg, act, mailbox),
            // Already started (or finished): put everything back untouched.
            other => DispatchFuture {
                state: other,
                span: current_span,
            },
        };
        *self = rebuilt;

        &self.span
    }

    /// Starts the dispatch: obtains the handler future and its span for `msg` and wraps both in
    /// a `DispatchFuture` that is in the running state.
    ///
    /// A shutdown message needs neither the actor nor the mailbox; both are simply dropped.
    fn running(msg: ActorMessage<A>, act: &'a mut A, mailbox: Mailbox<A>) -> DispatchFuture<'a, A> {
        let (handler, span) = match msg {
            ActorMessage::ToOneActor(inner) => inner.handle(act, mailbox),
            ActorMessage::ToAllActors(inner) => inner.handle(act, mailbox),
            ActorMessage::Shutdown => Shutdown::<A>::handle(),
        };
        let state = State::Running {
            fut: handler,
            phantom: PhantomData,
        };

        DispatchFuture { state, span }
    }
}
/// Internal stage of a [`DispatchFuture`].
enum State<'a, A> {
/// The dispatch has not been started yet; holds everything needed to start it.
New {
/// The message waiting to be dispatched.
msg: ActorMessage<A>,
/// The actor the message is dispatched to.
act: &'a mut A,
/// Mailbox that is passed along to the message's `handle` call.
mailbox: Mailbox<A>,
},
/// The dispatch has been started and its handler future is being driven.
Running {
/// The boxed future returned by the message's `handle` call.
fut: BoxFuture<'a, ControlFlow<()>>,
/// Zero-sized marker mentioning `&'a A`.
// NOTE(review): presumably kept so that this variant still refers to `A`; confirm it is needed.
phantom: PhantomData<fn(&'a A)>,
},
/// Placeholder left in place whenever the real state is taken out with `mem::replace`. It is
/// also the state that remains once the handler future has completed.
Done,
}
impl<'a, A> DispatchFuture<'a, A> {
    /// Creates a dispatch of `msg` to `act` that has not been started yet.
    ///
    /// Nothing is handled at this point. The span is a placeholder (`Span::none()`) until the
    /// dispatch is started.
    pub fn new(msg: ActorMessage<A>, act: &'a mut A, mailbox: Mailbox<A>) -> Self {
        let state = State::New { msg, act, mailbox };
        let span = Span::none();

        Self { state, span }
    }
}
impl<'a, A> Future for DispatchFuture<'a, A> {
    type Output = ControlFlow<()>;

    /// Drives the dispatch.
    ///
    /// The first poll starts the dispatch; from then on the handler future is polled inside the
    /// dispatch's span.
    ///
    /// # Panics
    ///
    /// Panics when polled again after having returned `Poll::Ready`.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        loop {
            // The state is taken out by value; `Done` stays behind unless we put something back.
            let (mut handler, phantom) = match mem::replace(&mut self.state, State::Done) {
                State::New { msg, act, mailbox } => {
                    // Start the dispatch, then go around again to poll the fresh handler future.
                    *self = DispatchFuture::running(msg, act, mailbox);
                    continue;
                }
                State::Running { fut, phantom } => (fut, phantom),
                State::Done => panic!("Polled after completion"),
            };

            let outcome = self.span.in_scope(|| handler.poll_unpin(cx));
            if outcome.is_pending() {
                // Not finished yet: keep the handler future for the next poll.
                self.state = State::Running {
                    fut: handler,
                    phantom,
                };
            }

            return outcome;
        }
    }
}