1use core::future::Future;
2use core::marker::PhantomData;
3
4use actix::dev::{MessageResponse, ToEnvelope};
5use actix::fut::wrap_stream;
6use actix::{
7 Actor, ActorFutureExt, ActorStreamExt, Addr, AsyncContext, Handler, MailboxError, Message,
8 WrapFuture,
9};
10use futures_util::stream::repeat;
11use futures_util::{FutureExt, Stream, StreamExt, TryFutureExt};
12use tokio::sync::oneshot;
13
14pub trait AddrExt<A: Actor> {
15 fn send_stream<S, M, F>(
16 &self,
17 stream: S,
18 handler: Option<F>,
19 ) -> impl Future<Output = Result<(), MailboxError>> + Send
20 where
21 A: Handler<StreamMessage<S, M, F>>,
22 A::Context: ToEnvelope<A, StreamMessage<S, M, F>>,
23 S: Send + 'static,
24 M: Message + Send + 'static,
25 F: Fn(M::Result) + Send + Clone + 'static;
26}
27
28#[derive(Debug, Message)]
29#[rtype("()")]
30pub struct StreamMessage<S, M, F>
31where
32 M: Message,
33 F: Fn(M::Result) + Send + Clone,
34{
35 stream: S,
36 handler: Option<F>,
37 _marker: PhantomData<M>,
38}
39
40impl<A, S, M, F> MessageResponse<A, Self> for StreamMessage<S, M, F>
41where
42 A: Actor + Handler<M>,
43 A::Context: AsyncContext<A>,
44 S: Stream<Item = M> + 'static,
45 M: Message,
46 F: Fn(M::Result) + Send + Clone + 'static,
47{
48 fn handle(self, ctx: &mut A::Context, tx: Option<oneshot::Sender<()>>) {
49 let stream = self.stream.zip(repeat(self.handler));
50
51 let fut = wrap_stream::<_, A>(stream).map(|(item, handler), act, ctx| {
52 let tx = handler.map(|handler| {
53 let (tx, rx) = oneshot::channel();
54
55 let _ignored = ctx.spawn(rx.map_ok(handler).map(|_| ()).into_actor(act));
56
57 tx
58 });
59
60 act.handle(item, ctx).handle(ctx, tx);
61 });
62
63 let _ignored = ctx.spawn(fut.finish().map(|(), _, _| {
64 if let Some(tx) = tx {
65 let _ignored = tx.send(());
66 }
67 }));
68 }
69}
70
71#[macro_export]
72macro_rules! impl_stream_sender {
73 ($($ty:path),*) => {
74 const _: () = {
75 use $crate::macros::__private::{Handler, Stream, Message};
76 use $crate::adapters::StreamMessage;
77
78 $(
79 impl<S, M, F> Handler<StreamMessage<S, M, F>> for $ty
80 where
81 S: Stream<Item = M> + 'static,
82 M: Message,
83 F: Fn(M::Result) + Send + Clone + 'static,
84 Self: Handler<M>,
85 {
86 type Result = StreamMessage<S, M, F>;
87
88 fn handle(&mut self, msg: StreamMessage<S, M, F>, _ctx: &mut Self::Context) -> Self::Result {
89 msg
90 }
91 }
92 )*
93 };
94 };
95}
96
97impl<A: Actor> AddrExt<A> for Addr<A> {
98 async fn send_stream<S, M, F>(&self, stream: S, handler: Option<F>) -> Result<(), MailboxError>
99 where
100 A: Handler<StreamMessage<S, M, F>>,
101 A::Context: ToEnvelope<A, StreamMessage<S, M, F>>,
102 S: Send + 'static,
103 M: Message + Send + 'static,
104 F: Fn(M::Result) + Send + Clone + 'static,
105 {
106 self.send(StreamMessage {
107 stream,
108 handler,
109 _marker: PhantomData,
110 })
111 .await
112 }
113}
114
115pub trait ActorExt: Actor {
116 fn forward_handler<M>(
117 &mut self,
118 ctx: &mut Self::Context,
119 msg: M,
120 receiver: oneshot::Sender<M::Result>,
121 ) where
122 Self: Handler<M>,
123 M: Message;
124}
125
126impl<A: Actor> ActorExt for A {
127 fn forward_handler<M>(
128 &mut self,
129 ctx: &mut Self::Context,
130 msg: M,
131 receiver: oneshot::Sender<M::Result>,
132 ) where
133 Self: Handler<M>,
134 M: Message,
135 {
136 self.handle(msg, ctx).handle(ctx, Some(receiver));
137 }
138}