1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
use crate::stream::{CloneableStreamable, StreamMessage};
use actix::prelude::*;
use actix::Arbiter; // Added for Arbiter::spawn
use std::future::Future;
/// Actor responsible for the `on_finalize` stream operation.
///
/// It forwards all elements from an upstream source to a downstream recipient.
/// When the upstream stream completes (either successfully with `End` or with an `Error`),
/// or when this actor itself is stopped for any reason (e.g., downstream subscriber disappears,
/// or an upstream error causes a stop), it executes a provided side-effecting future (`effect_fn`).
///
/// The effect is guaranteed to be triggered at most once.
/// After triggering the effect, the original terminal message (if any from upstream)
/// is propagated to the downstream recipient.
pub(crate) struct OnFinalizeActor<O, Fut, F>
where
O: CloneableStreamable,
Fut: Future<Output = ()> + Send + 'static,
F: FnOnce() -> Fut + Send + 'static + Unpin,
{
downstream_recipient: Recipient<StreamMessage<O>>,
effect_fn: Option<F>,
effect_triggered: bool,
/// True if a terminal message (`End`) has been received from upstream.
/// This helps prevent processing further messages after termination and in `stopping` logic.
upstream_has_terminated: bool,
}
impl<O, Fut, F> OnFinalizeActor<O, Fut, F>
where
O: CloneableStreamable,
Fut: Future<Output = ()> + Send + 'static,
F: FnOnce() -> Fut + Send + 'static + Unpin,
{
pub fn new(downstream_recipient: Recipient<StreamMessage<O>>, effect_fn: F) -> Self {
Self {
downstream_recipient,
effect_fn: Some(effect_fn),
effect_triggered: false,
upstream_has_terminated: false,
}
}
/// Triggers the effect if it hasn't been triggered yet.
/// The effect future is spawned onto the current Actix Arbiter.
/// This means the actor does not wait for the effect to complete, and the effect's
/// execution is not tied to this actor's context lifecycle after spawning.
fn trigger_effect_if_needed(&mut self, _ctx: &mut Context<Self>) { // ctx no longer strictly needed here
if !self.effect_triggered {
if let Some(f) = self.effect_fn.take() {
let effect_future = f();
// Spawn the future on the current arbiter's thread pool.
// It runs independently of this actor's lifecycle once spawned.
Arbiter::current().spawn(effect_future);
// log::trace!("[OnFinalizeActor] Effect triggered via Arbiter::spawn.");
}
self.effect_triggered = true;
}
}
/// Sends a terminal message to the downstream recipient and stops the actor.
/// This should be called after ensuring the effect is triggered (usually via `stopping`).
fn send_terminal_to_downstream_and_stop(&mut self, msg: StreamMessage<O>, ctx: &mut Context<Self>) {
self.downstream_recipient.do_send(msg.clone());
// log::trace!("[OnFinalizeActor] Propagated terminal message: {:?}, stopping actor.", msg);
ctx.stop();
}
}
impl<O, Fut, F> Actor for OnFinalizeActor<O, Fut, F>
where
O: CloneableStreamable,
Fut: Future<Output = ()> + Send + 'static,
F: FnOnce() -> Fut + Send + 'static + Unpin,
{
type Context = Context<Self>;
fn started(&mut self, _ctx: &mut Context<Self>) {
// log::trace!("[OnFinalizeActor] Started.");
// This actor is now passive; its recipient for StreamMessage<O>
// will be provided to the upstream stream's setup_fn.
}
fn stopping(&mut self, ctx: &mut Context<Self>) -> Running {
// log::trace!("[OnFinalizeActor] Stopping. Ensuring effect is triggered.");
// This is a crucial point: `stopping` is called when the actor is about to stop for ANY reason,
// including the downstream recipient disappearing, an explicit ctx.stop(), or supervisor stopping it.
// This ensures the effect is run even if the stream is prematurely terminated or an error occurs.
self.trigger_effect_if_needed(ctx); // Pass ctx, though it's marked unused in the method for now
Running::Stop
}
fn stopped(&mut self, _ctx: &mut Context<Self>) {
// log::trace!("[OnFinalizeActor] Stopped.");
// Effect is ensured by the `stopping` handler.
}
}
// Handler for messages from the UPSTREAM source (StreamMessage<O>)
impl<O, Fut, F> Handler<StreamMessage<O>> for OnFinalizeActor<O, Fut, F>
where
O: CloneableStreamable,
Fut: Future<Output = ()> + Send + 'static,
F: FnOnce() -> Fut + Send + 'static + Unpin,
{
type Result = ();
fn handle(&mut self, msg: StreamMessage<O>, ctx: &mut Context<Self>) {
// log::trace!("[OnFinalizeActor] Received from upstream: {:?}", msg);
if self.upstream_has_terminated {
// If upstream already sent a terminal message, ignore further messages.
// log::warn!("[OnFinalizeActor] Received message after upstream termination: {:?}", msg);
return;
}
match msg {
StreamMessage::Element(o) => {
// Forward the element. If sending fails, it means the downstream recipient
// is no longer available (e.g., a 'take' operator has completed and stopped listening).
// In such cases, we should trigger the finalization effect and stop this actor.
if self.downstream_recipient.try_send(StreamMessage::Element(o.clone())).is_err() {
// log::debug!("[OnFinalizeActor] Downstream recipient is gone (e.g., 'take' completed or downstream error). Triggering effect and stopping.");
// Trigger effect: stopping() will be called by ctx.stop(), which handles it.
// We call trigger_effect_if_needed here to ensure it's initiated if stopping() pathway is delayed or complex.
// The effect_triggered flag prevents double execution.
self.trigger_effect_if_needed(ctx);
ctx.stop(); // This will invoke self.stopping()
}
}
StreamMessage::End => {
self.upstream_has_terminated = true;
// The effect is triggered when the actor stops.
// self.stopping() will be called as part of ctx.stop().
self.send_terminal_to_downstream_and_stop(StreamMessage::End, ctx);
}
// Note: StreamMessage does not have an Error variant.
// Errors from an upstream SetupFn or unexpected upstream actor termination
// should lead to this actor being stopped by its supervisor or the Actix runtime,
// which in turn calls self.stopping(), triggering the finalize effect.
}
}
}