1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
use crate::stream::{StreamMessage, Streamable, CloneableStreamable};
use actix::prelude::*;
use std::marker::PhantomData;
#[derive(Debug)] // Added Debug for the actor struct itself
pub(crate) struct FoldActor<Out, Acc, F>
where
Out: Streamable, // Input item type
Acc: CloneableStreamable, // Accumulator type, must be Cloneable
F: FnMut(Acc, Out) -> Acc + Send + 'static + Unpin, // Fold function
{
accumulator: Option<Acc>, // Holds the current accumulated value
initial_value: Acc, // Initial value for the accumulator
fold_fn: F, // The folding function
downstream: Recipient<StreamMessage<Acc>>, // Where to send the final result
_phantom_out: PhantomData<Out>, // To use the Out type parameter
}
impl<Out, Acc, F> FoldActor<Out, Acc, F>
where
Out: Streamable,
Acc: CloneableStreamable,
F: FnMut(Acc, Out) -> Acc + Send + 'static + Unpin,
{
pub(crate) fn new(initial: Acc, fold_fn: F, downstream: Recipient<StreamMessage<Acc>>) -> Self {
FoldActor {
accumulator: None, // Accumulator starts empty, initialized with initial_value on first item
initial_value: initial,
fold_fn,
downstream,
_phantom_out: PhantomData, // Initialize PhantomData
}
}
}
impl<Out, Acc, F> Actor for FoldActor<Out, Acc, F>
where
Out: Streamable,
Acc: CloneableStreamable,
F: FnMut(Acc, Out) -> Acc + Send + 'static + Unpin,
{
type Context = Context<Self>;
fn started(&mut self, _ctx: &mut Self::Context) {
// log::debug!("FoldActor started with initial value: {:?}", self.initial_value);
}
fn stopped(&mut self, _ctx: &mut Self::Context) {
// log::debug!("FoldActor stopped. Ensuring End is sent to downstream.");
// If the actor is stopped for any reason before sending the final accumulated value
// (e.g., downstream closed, panic), ensure downstream gets an End signal.
// Note: If the stream completes normally (End message handled), this might be redundant
// but ensures graceful termination in other scenarios.
// Consider if the accumulator should be sent here if it exists and hasn't been sent.
// For fold, typically only one value is sent upon completion of the input stream.
// If it stops prematurely, sending a partial fold might be misleading.
// So, just sending End is a safe default.
let _ = self.downstream.try_send(StreamMessage::End);
}
}
impl<Out, Acc, F> Handler<StreamMessage<Out>> for FoldActor<Out, Acc, F>
where
Out: Streamable,
Acc: CloneableStreamable,
F: FnMut(Acc, Out) -> Acc + Send + 'static + Unpin,
{
type Result = ();
fn handle(&mut self, msg: StreamMessage<Out>, ctx: &mut Context<Self>) {
// If actor is already stopping/stopped, ignore new messages.
if ctx.state() == ActorState::Stopping || ctx.state() == ActorState::Stopped {
return;
}
match msg {
StreamMessage::Element(out_item) => {
// log::trace!("FoldActor received element: {:?}. Current accumulator: {:?}", out_item, self.accumulator);
// Take the current accumulator or use initial_value if it's the first element.
let current_acc = self
.accumulator
.take()
.unwrap_or_else(|| self.initial_value.clone());
// Apply the fold function.
self.accumulator = Some((self.fold_fn)(current_acc, out_item));
// log::trace!("FoldActor new accumulator: {:?}", self.accumulator);
}
StreamMessage::End => {
// log::debug!("FoldActor received End from upstream.");
// Upstream ended. Take the final accumulated value (or initial if no elements were received).
let final_value = self
.accumulator
.take()
.unwrap_or_else(|| self.initial_value.clone());
// log::debug!("FoldActor sending final accumulated value: {:?}", final_value);
// Send the final value.
if self.downstream.try_send(StreamMessage::Element(final_value)).is_ok() {
// If final value sent successfully, also send End for this stream.
let _ = self.downstream.try_send(StreamMessage::End);
} else {
// log::warn!("FoldActor: Downstream recipient closed before final value could be sent.");
// Downstream is closed. The stopped() method will attempt to send End.
}
// Stop the actor as its work is done.
if ctx.state() != ActorState::Stopping && ctx.state() != ActorState::Stopped {
ctx.stop();
}
}
}
}
}