near_async/
actix.rs

1use crate::messaging::{AsyncSendError, CanSend, MessageWithCallback};
2use futures::FutureExt;
3use near_o11y::{WithSpanContext, WithSpanContextExt};
4
5/// An actix Addr implements CanSend for any message type that the actor handles.
6impl<M, A> CanSend<M> for actix::Addr<A>
7where
8    M: actix::Message + Send + 'static,
9    M::Result: Send,
10    A: actix::Actor + actix::Handler<M>,
11    A::Context: actix::dev::ToEnvelope<A, M>,
12{
13    fn send(&self, message: M) {
14        match self.try_send(message) {
15            Ok(_) => {}
16            Err(err) => match err {
17                actix::dev::SendError::Full(message) => {
18                    self.do_send(message);
19                }
20                actix::dev::SendError::Closed(_) => {
21                    near_o11y::tracing::warn!(
22                        "Tried to send {} message to closed actor",
23                        std::any::type_name::<M>()
24                    );
25                }
26            },
27        }
28    }
29}
30
31pub type ActixResult<T> = <T as actix::Message>::Result;
32
33impl<M, A> CanSend<MessageWithCallback<M, M::Result>> for actix::Addr<A>
34where
35    M: actix::Message + Send + 'static,
36    M::Result: Send,
37    A: actix::Actor + actix::Handler<M>,
38    A::Context: actix::dev::ToEnvelope<A, M>,
39{
40    fn send(&self, message: MessageWithCallback<M, M::Result>) {
41        let MessageWithCallback { message, callback: responder } = message;
42        let future = self.send(message);
43
44        let transformed_future = async move {
45            match future.await {
46                Ok(result) => Ok(result),
47                Err(actix::MailboxError::Closed) => Err(AsyncSendError::Closed),
48                Err(actix::MailboxError::Timeout) => Err(AsyncSendError::Timeout),
49            }
50        };
51        responder(transformed_future.boxed());
52    }
53}
54
55/// Allows an actix Addr<WithSpanContext<T>> to act as if it were an Addr<T>,
56/// by automatically wrapping any message sent with .with_span_context().
57///
58/// This way, the sender side does not need to be concerned about attaching span contexts, e.g.
59///
60///   impl SomeOtherComponent {
61///     pub fn new(sender: Sender<Message>) -> Self {...}
62///   }
63///
64///   impl actix::Handler<WithSpanContext<Message>> for SomeActor {...}
65///
66///   let addr = SomeActor::spawn(...);
67///   let other = SomeOtherComponent::new(
68///       addr.with_auto_span_context().into_sender()  // or .clone() on the addr if needed
69///   );
70pub struct AddrWithAutoSpanContext<T: actix::Actor> {
71    inner: actix::Addr<T>,
72}
73
74impl<T: actix::Actor> Clone for AddrWithAutoSpanContext<T> {
75    fn clone(&self) -> Self {
76        Self { inner: self.inner.clone() }
77    }
78}
79
80/// Extension function to convert an Addr<WithSpanContext<T>> to an AddrWithAutoSpanContext<T>.
81pub trait AddrWithAutoSpanContextExt<T: actix::Actor> {
82    fn with_auto_span_context(self) -> AddrWithAutoSpanContext<T>;
83}
84
85impl<T: actix::Actor> AddrWithAutoSpanContextExt<T> for actix::Addr<T> {
86    fn with_auto_span_context(self) -> AddrWithAutoSpanContext<T> {
87        AddrWithAutoSpanContext { inner: self }
88    }
89}
90
91impl<M, S> CanSend<M> for AddrWithAutoSpanContext<S>
92where
93    M: actix::Message + 'static,
94    S: actix::Actor,
95    actix::Addr<S>: CanSend<WithSpanContext<M>>,
96{
97    fn send(&self, message: M) {
98        CanSend::send(&self.inner, message.with_span_context());
99    }
100}
101
102impl<M, S> CanSend<MessageWithCallback<M, M::Result>> for AddrWithAutoSpanContext<S>
103where
104    M: actix::Message + Send + 'static,
105    M::Result: Send,
106    S: actix::Actor,
107    actix::Addr<S>: CanSend<MessageWithCallback<WithSpanContext<M>, M::Result>>,
108{
109    fn send(&self, message: MessageWithCallback<M, M::Result>) {
110        let MessageWithCallback { message, callback: responder } = message;
111        CanSend::send(
112            &self.inner,
113            MessageWithCallback { message: message.with_span_context(), callback: responder },
114        );
115    }
116}