1use crate::messaging::{AsyncSendError, CanSend, MessageWithCallback};
2use futures::FutureExt;
3use near_o11y::{WithSpanContext, WithSpanContextExt};
4
5impl<M, A> CanSend<M> for actix::Addr<A>
7where
8 M: actix::Message + Send + 'static,
9 M::Result: Send,
10 A: actix::Actor + actix::Handler<M>,
11 A::Context: actix::dev::ToEnvelope<A, M>,
12{
13 fn send(&self, message: M) {
14 match self.try_send(message) {
15 Ok(_) => {}
16 Err(err) => match err {
17 actix::dev::SendError::Full(message) => {
18 self.do_send(message);
19 }
20 actix::dev::SendError::Closed(_) => {
21 near_o11y::tracing::warn!(
22 "Tried to send {} message to closed actor",
23 std::any::type_name::<M>()
24 );
25 }
26 },
27 }
28 }
29}
30
31pub type ActixResult<T> = <T as actix::Message>::Result;
32
33impl<M, A> CanSend<MessageWithCallback<M, M::Result>> for actix::Addr<A>
34where
35 M: actix::Message + Send + 'static,
36 M::Result: Send,
37 A: actix::Actor + actix::Handler<M>,
38 A::Context: actix::dev::ToEnvelope<A, M>,
39{
40 fn send(&self, message: MessageWithCallback<M, M::Result>) {
41 let MessageWithCallback { message, callback: responder } = message;
42 let future = self.send(message);
43
44 let transformed_future = async move {
45 match future.await {
46 Ok(result) => Ok(result),
47 Err(actix::MailboxError::Closed) => Err(AsyncSendError::Closed),
48 Err(actix::MailboxError::Timeout) => Err(AsyncSendError::Timeout),
49 }
50 };
51 responder(transformed_future.boxed());
52 }
53}
54
55pub struct AddrWithAutoSpanContext<T: actix::Actor> {
71 inner: actix::Addr<T>,
72}
73
74impl<T: actix::Actor> Clone for AddrWithAutoSpanContext<T> {
75 fn clone(&self) -> Self {
76 Self { inner: self.inner.clone() }
77 }
78}
79
80pub trait AddrWithAutoSpanContextExt<T: actix::Actor> {
82 fn with_auto_span_context(self) -> AddrWithAutoSpanContext<T>;
83}
84
85impl<T: actix::Actor> AddrWithAutoSpanContextExt<T> for actix::Addr<T> {
86 fn with_auto_span_context(self) -> AddrWithAutoSpanContext<T> {
87 AddrWithAutoSpanContext { inner: self }
88 }
89}
90
91impl<M, S> CanSend<M> for AddrWithAutoSpanContext<S>
92where
93 M: actix::Message + 'static,
94 S: actix::Actor,
95 actix::Addr<S>: CanSend<WithSpanContext<M>>,
96{
97 fn send(&self, message: M) {
98 CanSend::send(&self.inner, message.with_span_context());
99 }
100}
101
102impl<M, S> CanSend<MessageWithCallback<M, M::Result>> for AddrWithAutoSpanContext<S>
103where
104 M: actix::Message + Send + 'static,
105 M::Result: Send,
106 S: actix::Actor,
107 actix::Addr<S>: CanSend<MessageWithCallback<WithSpanContext<M>, M::Result>>,
108{
109 fn send(&self, message: MessageWithCallback<M, M::Result>) {
110 let MessageWithCallback { message, callback: responder } = message;
111 CanSend::send(
112 &self.inner,
113 MessageWithCallback { message: message.with_span_context(), callback: responder },
114 );
115 }
116}