1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
use crate::stream::{CloneableStreamable, StreamMessage};
use actix::prelude::*;
use std::future::Future;
use std::marker::PhantomData;
/// Stream stage backing the `eval_tap` operation.
///
/// Holds a side-effect factory (`effect_fn`) together with the recipient that
/// elements are forwarded to. The intended contract is: for every incoming element
/// the future produced by `effect_fn` is driven to completion, and only afterwards
/// is the original element handed to `downstream_recipient`; effects are meant to
/// run one at a time, in arrival order.
///
/// This type only stores the state. The ordering is determined by the
/// `Handler<StreamMessage<In>>` implementation for this actor.
pub(crate) struct EvalTapActor<In, Fut, F>
where
    In: CloneableStreamable,
    // The effect future is not required to be `Send` (it is driven on the actor's
    // own context). `Unpin` is required so that the actor type itself is `Unpin`.
    Fut: Future<Output = ()> + 'static + Unpin,
    // `FnMut`, so the effect may keep mutable state between elements.
    // NOTE(review): `Clone` is presumably needed by the code that wires this stage
    // up; that code is not visible here.
    F: FnMut(In) -> Fut + Clone + Unpin + 'static,
{
    /// Where elements (and the final `End` marker) are sent.
    downstream_recipient: Recipient<StreamMessage<In>>,
    /// Produces the side-effecting future for one element.
    effect_fn: F,
    /// Ties the otherwise unused `In` parameter to the type.
    _phantom_in: PhantomData<In>,
    /// Ties the otherwise unused `Fut` parameter to the type.
    _phantom_fut: PhantomData<Fut>,
}
impl<In, Fut, F> EvalTapActor<In, Fut, F>
where
In: CloneableStreamable,
Fut: Future<Output = ()> + 'static + Unpin,
F: FnMut(In) -> Fut + Clone + Unpin + 'static,
{
pub fn new(downstream_recipient: Recipient<StreamMessage<In>>, effect_fn: F) -> Self {
Self {
downstream_recipient,
effect_fn,
_phantom_in: PhantomData,
_phantom_fut: PhantomData,
}
}
}
/// Actor lifecycle for [`EvalTapActor`].
///
/// None of the hooks do any work; they exist as attachment points for trace
/// logging.
impl<In, Fut, F> Actor for EvalTapActor<In, Fut, F>
where
    // `'static` because the element type travels inside actor messages.
    In: CloneableStreamable + 'static,
    Fut: Future<Output = ()> + 'static + Unpin,
    F: FnMut(In) -> Fut + Clone + Unpin + 'static,
{
    type Context = Context<Self>;

    /// Called once the actor is running. Intentionally a no-op.
    fn started(&mut self, _ctx: &mut Self::Context) {}

    /// Called when a stop has been requested. Always agrees to stop.
    fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
        Running::Stop
    }

    /// Called after the actor has stopped. Intentionally a no-op.
    fn stopped(&mut self, _ctx: &mut Self::Context) {}
}
impl<In, Fut, F> Handler<StreamMessage<In>> for EvalTapActor<In, Fut, F>
where
In: CloneableStreamable + 'static,
Fut: Future<Output = ()> + 'static + Unpin,
F: FnMut(In) -> Fut + Clone + Unpin + 'static,
{
type Result = ResponseActFuture<Self, ()>;
fn handle(&mut self, msg: StreamMessage<In>, ctx: &mut Context<Self>) -> Self::Result {
match msg {
StreamMessage::Element(item) => {
// Call FnMut: self.effect_fn is called before the async block.
// The resulting future is then awaited within the async block.
let effect_future = (self.effect_fn)(item.clone()); // Item is cloned for the effect function
let recipient = self.downstream_recipient.clone();
// The original 'item' (from the msg) is moved into the main async block
// to be sent after the effect_future completes.
async move {
effect_future.await; // Await the side-effecting future
// Try to send the original item downstream.
// If downstream is closed, try_send will return an error.
if recipient.try_send(StreamMessage::Element(item)).is_err() {
// log::debug!("[EvalTapActor] Downstream send failed.");
return Err(()); // Indicate failure to the .map block below
}
Ok(())
}
.into_actor(self) // Attaches the future to this actor's lifecycle and context
.map(|result, _actor, actor_ctx| {
// This closure runs after the above async block completes.
if result.is_err() {
// log::debug!("[EvalTapActor] Downstream recipient closed or send failed. Stopping actor.");
actor_ctx.stop(); // Stop this EvalTapActor if sending failed
}
})
.boxed_local() // Required for ResponseActFuture as the future is not Send
}
StreamMessage::End => {
// log::trace!("[EvalTapActor] Received End. Propagating and stopping.");
self.downstream_recipient.do_send(StreamMessage::End);
ctx.stop();
// Return an empty future, required for ResponseActFuture
async {}.into_actor(self).boxed_local()
}
}
}
}