// near_async/actix_wrapper.rs

1use std::ops::{Deref, DerefMut};
2
3use actix::Actor;
4use near_o11y::{WithSpanContext, handler_debug_span};
5
6use crate::futures::DelayedActionRunner;
7use crate::messaging;
8
/// Wrapper on top of a generic actor to make it implement the `actix::Actor` trait.
///
/// The wrapped actor should implement the Handler trait for all the messages it would like to
/// handle. `ActixWrapper` is then used to create an actix actor that implements the CanSend
/// trait.
pub struct ActixWrapper<T> {
    /// The wrapped actor; every incoming message is forwarded to it.
    actor: T,
}
15
16impl<T> ActixWrapper<T> {
17    pub fn new(actor: T) -> Self {
18        Self { actor }
19    }
20}
21
22impl<T> actix::Actor for ActixWrapper<T>
23where
24    T: messaging::Actor + Unpin + 'static,
25{
26    type Context = actix::Context<Self>;
27
28    fn started(&mut self, ctx: &mut Self::Context) {
29        self.actor.start_actor(ctx);
30    }
31}
32
33// Implementing Deref and DerefMut for the wrapped actor to allow access to the inner struct
34// This is required for implementing DelayedActionRunner<T> for actix::Context<Outer>
35impl<T> Deref for ActixWrapper<T> {
36    type Target = T;
37    fn deref(&self) -> &Self::Target {
38        &self.actor
39    }
40}
41
42impl<T> DerefMut for ActixWrapper<T> {
43    fn deref_mut(&mut self) -> &mut Self::Target {
44        &mut self.actor
45    }
46}
47
48impl<M, T> actix::Handler<WithSpanContext<M>> for ActixWrapper<T>
49where
50    Self: actix::Actor,
51    Self::Context: DelayedActionRunner<T>,
52    T: messaging::HandlerWithContext<M>,
53    M: actix::Message,
54    <M as actix::Message>::Result:
55        actix::dev::MessageResponse<ActixWrapper<T>, WithSpanContext<M>> + Send,
56{
57    type Result = M::Result;
58    fn handle(&mut self, msg: WithSpanContext<M>, ctx: &mut Self::Context) -> Self::Result {
59        let (_span, msg) = handler_debug_span!(target: "actix_message_handler", msg);
60        self.actor.handle(msg, ctx)
61    }
62}
63
/// Wrapper on top of a generic actor to make it an actix actor that runs in an
/// `actix::SyncContext`.
///
/// The wrapped actor should implement `messaging::Handler<M>` for all the messages it would
/// like to handle; its handlers do not receive the actix context.
pub struct SyncActixWrapper<T> {
    /// The wrapped actor; every incoming message is forwarded to it.
    actor: T,
}
67
68impl<T> SyncActixWrapper<T> {
69    pub fn new(actor: T) -> Self {
70        Self { actor }
71    }
72}
73
74impl<T> actix::Actor for SyncActixWrapper<T>
75where
76    T: Unpin + 'static,
77{
78    type Context = actix::SyncContext<Self>;
79}
80
81impl<M, T> actix::Handler<WithSpanContext<M>> for SyncActixWrapper<T>
82where
83    Self: actix::Actor,
84    T: messaging::Handler<M>,
85    M: actix::Message,
86    <M as actix::Message>::Result:
87        actix::dev::MessageResponse<SyncActixWrapper<T>, WithSpanContext<M>> + Send,
88{
89    type Result = M::Result;
90    fn handle(&mut self, msg: WithSpanContext<M>, _ctx: &mut Self::Context) -> Self::Result {
91        let (_span, msg) = handler_debug_span!(target: "actix_message_handler", msg);
92        self.actor.handle(msg)
93    }
94}
95
96/// Spawns an actix actor with the given actor. Returns the address of the actor and the arbiter
97/// Note that the actor should implement the Handler trait for all the messages it would like to handle.
98pub fn spawn_actix_actor<T>(actor: T) -> (actix::Addr<ActixWrapper<T>>, actix::ArbiterHandle)
99where
100    T: messaging::Actor + Unpin + Send + 'static,
101{
102    let actix_wrapper = ActixWrapper::new(actor);
103    let arbiter = actix::Arbiter::new().handle();
104    let addr = ActixWrapper::<T>::start_in_arbiter(&arbiter, |_| actix_wrapper);
105    (addr, arbiter)
106}