Skip to main content

calimero_utils_actix/
adapters.rs

1use core::future::Future;
2use core::marker::PhantomData;
3
4use actix::dev::{MessageResponse, ToEnvelope};
5use actix::fut::wrap_stream;
6use actix::{
7    Actor, ActorFutureExt, ActorStreamExt, Addr, AsyncContext, Handler, MailboxError, Message,
8    WrapFuture,
9};
10use futures_util::stream::repeat;
11use futures_util::{FutureExt, Stream, StreamExt, TryFutureExt};
12use tokio::sync::oneshot;
13
14pub trait AddrExt<A: Actor> {
15    fn send_stream<S, M, F>(
16        &self,
17        stream: S,
18        handler: Option<F>,
19    ) -> impl Future<Output = Result<(), MailboxError>> + Send
20    where
21        A: Handler<StreamMessage<S, M, F>>,
22        A::Context: ToEnvelope<A, StreamMessage<S, M, F>>,
23        S: Send + 'static,
24        M: Message + Send + 'static,
25        F: Fn(M::Result) + Send + Clone + 'static;
26}
27
28#[derive(Debug, Message)]
29#[rtype("()")]
30pub struct StreamMessage<S, M, F>
31where
32    M: Message,
33    F: Fn(M::Result) + Send + Clone,
34{
35    stream: S,
36    handler: Option<F>,
37    _marker: PhantomData<M>,
38}
39
40impl<A, S, M, F> MessageResponse<A, Self> for StreamMessage<S, M, F>
41where
42    A: Actor + Handler<M>,
43    A::Context: AsyncContext<A>,
44    S: Stream<Item = M> + 'static,
45    M: Message,
46    F: Fn(M::Result) + Send + Clone + 'static,
47{
48    fn handle(self, ctx: &mut A::Context, tx: Option<oneshot::Sender<()>>) {
49        let stream = self.stream.zip(repeat(self.handler));
50
51        let fut = wrap_stream::<_, A>(stream).map(|(item, handler), act, ctx| {
52            let tx = handler.map(|handler| {
53                let (tx, rx) = oneshot::channel();
54
55                let _ignored = ctx.spawn(rx.map_ok(handler).map(|_| ()).into_actor(act));
56
57                tx
58            });
59
60            act.handle(item, ctx).handle(ctx, tx);
61        });
62
63        let _ignored = ctx.spawn(fut.finish().map(|(), _, _| {
64            if let Some(tx) = tx {
65                let _ignored = tx.send(());
66            }
67        }));
68    }
69}
70
/// Implements `Handler<StreamMessage<S, M, F>>` for each of the listed actor types.
///
/// The generated handler returns the incoming message unchanged as its result, so the
/// message's own `MessageResponse` implementation is what processes the stream. The
/// impl applies for every `M` the actor already handles.
///
/// Takes a comma-separated list of type paths.
#[macro_export]
macro_rules! impl_stream_sender {
    ($($ty:path),*) => {
        const _: () = {
            use $crate::adapters::StreamMessage;
            use $crate::macros::__private::{Handler, Message, Stream};

            $(
                impl<S, M, F> Handler<StreamMessage<S, M, F>> for $ty
                where
                    Self: Handler<M>,
                    S: Stream<Item = M> + 'static,
                    M: Message,
                    F: Fn(M::Result) + Send + Clone + 'static,
                {
                    type Result = StreamMessage<S, M, F>;

                    fn handle(
                        &mut self,
                        stream_msg: StreamMessage<S, M, F>,
                        _ctx: &mut Self::Context,
                    ) -> Self::Result {
                        // Hand the message straight back; it is its own response.
                        stream_msg
                    }
                }
            )*
        };
    };
}
96
97impl<A: Actor> AddrExt<A> for Addr<A> {
98    async fn send_stream<S, M, F>(&self, stream: S, handler: Option<F>) -> Result<(), MailboxError>
99    where
100        A: Handler<StreamMessage<S, M, F>>,
101        A::Context: ToEnvelope<A, StreamMessage<S, M, F>>,
102        S: Send + 'static,
103        M: Message + Send + 'static,
104        F: Fn(M::Result) + Send + Clone + 'static,
105    {
106        self.send(StreamMessage {
107            stream,
108            handler,
109            _marker: PhantomData,
110        })
111        .await
112    }
113}
114
115pub trait ActorExt: Actor {
116    fn forward_handler<M>(
117        &mut self,
118        ctx: &mut Self::Context,
119        msg: M,
120        receiver: oneshot::Sender<M::Result>,
121    ) where
122        Self: Handler<M>,
123        M: Message;
124}
125
126impl<A: Actor> ActorExt for A {
127    fn forward_handler<M>(
128        &mut self,
129        ctx: &mut Self::Context,
130        msg: M,
131        receiver: oneshot::Sender<M::Result>,
132    ) where
133        Self: Handler<M>,
134        M: Message,
135    {
136        self.handle(msg, ctx).handle(ctx, Some(receiver));
137    }
138}