near_async/
actix_wrapper.rs1use std::ops::{Deref, DerefMut};
2
3use actix::Actor;
4use near_o11y::{WithSpanContext, handler_debug_span};
5
6use crate::futures::DelayedActionRunner;
7use crate::messaging;
8
/// Thin wrapper that owns a value of type `T`.
///
/// Elsewhere in this file the wrapper is given `actix::Actor` and
/// `actix::Handler` implementations that forward to the wrapped value.
pub struct ActixWrapper<T> {
    /// The wrapped value that every call is forwarded to.
    actor: T,
}

impl<T> ActixWrapper<T> {
    /// Takes ownership of `actor` and stores it inside a new wrapper.
    pub fn new(actor: T) -> ActixWrapper<T> {
        ActixWrapper { actor }
    }
}
21
22impl<T> actix::Actor for ActixWrapper<T>
23where
24 T: messaging::Actor + Unpin + 'static,
25{
26 type Context = actix::Context<Self>;
27
28 fn started(&mut self, ctx: &mut Self::Context) {
29 self.actor.start_actor(ctx);
30 }
31}
32
33impl<T> Deref for ActixWrapper<T> {
36 type Target = T;
37 fn deref(&self) -> &Self::Target {
38 &self.actor
39 }
40}
41
42impl<T> DerefMut for ActixWrapper<T> {
43 fn deref_mut(&mut self) -> &mut Self::Target {
44 &mut self.actor
45 }
46}
47
48impl<M, T> actix::Handler<WithSpanContext<M>> for ActixWrapper<T>
49where
50 Self: actix::Actor,
51 Self::Context: DelayedActionRunner<T>,
52 T: messaging::HandlerWithContext<M>,
53 M: actix::Message,
54 <M as actix::Message>::Result:
55 actix::dev::MessageResponse<ActixWrapper<T>, WithSpanContext<M>> + Send,
56{
57 type Result = M::Result;
58 fn handle(&mut self, msg: WithSpanContext<M>, ctx: &mut Self::Context) -> Self::Result {
59 let (_span, msg) = handler_debug_span!(target: "actix_message_handler", msg);
60 self.actor.handle(msg, ctx)
61 }
62}
63
/// Thin wrapper that owns a value of type `T`.
///
/// Elsewhere in this file the wrapper is given an `actix::Actor`
/// implementation on `actix::SyncContext` and an `actix::Handler`
/// implementation that forwards to the wrapped value.
pub struct SyncActixWrapper<T> {
    /// The wrapped value that every call is forwarded to.
    actor: T,
}

impl<T> SyncActixWrapper<T> {
    /// Takes ownership of `actor` and stores it inside a new wrapper.
    pub fn new(actor: T) -> SyncActixWrapper<T> {
        SyncActixWrapper { actor }
    }
}
73
74impl<T> actix::Actor for SyncActixWrapper<T>
75where
76 T: Unpin + 'static,
77{
78 type Context = actix::SyncContext<Self>;
79}
80
81impl<M, T> actix::Handler<WithSpanContext<M>> for SyncActixWrapper<T>
82where
83 Self: actix::Actor,
84 T: messaging::Handler<M>,
85 M: actix::Message,
86 <M as actix::Message>::Result:
87 actix::dev::MessageResponse<SyncActixWrapper<T>, WithSpanContext<M>> + Send,
88{
89 type Result = M::Result;
90 fn handle(&mut self, msg: WithSpanContext<M>, _ctx: &mut Self::Context) -> Self::Result {
91 let (_span, msg) = handler_debug_span!(target: "actix_message_handler", msg);
92 self.actor.handle(msg)
93 }
94}
95
96pub fn spawn_actix_actor<T>(actor: T) -> (actix::Addr<ActixWrapper<T>>, actix::ArbiterHandle)
99where
100 T: messaging::Actor + Unpin + Send + 'static,
101{
102 let actix_wrapper = ActixWrapper::new(actor);
103 let arbiter = actix::Arbiter::new().handle();
104 let addr = ActixWrapper::<T>::start_in_arbiter(&arbiter, |_| actix_wrapper);
105 (addr, arbiter)
106}