1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
use crate::stream::{CloneableStreamable, StreamMessage, Streamable};
use actix::prelude::*; // Changed to prelude
// TakeWhileActor takes elements from the stream as long as the predicate is true.
// Once the predicate returns false, it stops taking elements and ends the downstream.
#[derive(Debug)]
pub(crate) struct TakeWhileActor<Out, P>
where
Out: Streamable, // Using Streamable for concise and correct bounds
P: FnMut(&Out) -> bool + Send + 'static + Unpin, // Predicate function
{
predicate: P,
downstream: Recipient<StreamMessage<Out>>,
is_taking: bool, // State to track if we are still in the "taking" phase
}
impl<Out, P> TakeWhileActor<Out, P>
where
Out: Streamable,
P: FnMut(&Out) -> bool + Send + 'static + Unpin,
{
pub(crate) fn new(predicate: P, downstream: Recipient<StreamMessage<Out>>) -> Self {
Self {
predicate,
downstream,
is_taking: true, // Start in the taking phase
}
}
}
impl<Out, P> Actor for TakeWhileActor<Out, P>
where
Out: Streamable,
P: FnMut(&Out) -> bool + Send + 'static + Unpin,
{
type Context = Context<Self>;
fn started(&mut self, _ctx: &mut Self::Context) {
// log::debug!("TakeWhileActor started. Initial state: is_taking = {}", self.is_taking);
}
fn stopped(&mut self, _ctx: &mut Self::Context) {
// log::debug!("TakeWhileActor stopped. Ensuring End message is sent if still taking or not properly ended.");
// If the actor stops for any reason (e.g. downstream closed, or manually stopped)
// and we were still in the "taking" phase, or if the End wasn't sent explicitly
// after predicate turned false, ensure downstream gets an End.
// This handles cases where the actor might stop before the predicate turns false
// or before an upstream End is received.
if self.is_taking {
let _ = self.downstream.try_send(StreamMessage::End);
}
// If !self.is_taking, it means either predicate became false (End was sent)
// or upstream End was received (End was sent).
}
}
impl<Out, P> Handler<StreamMessage<Out>> for TakeWhileActor<Out, P>
where
Out: CloneableStreamable, // Clone needed for StreamMessage::Element(item.clone())
P: FnMut(&Out) -> bool + Send + 'static + Unpin,
{
type Result = ();
fn handle(&mut self, msg: StreamMessage<Out>, ctx: &mut Context<Self>) {
if !self.is_taking {
// Already stopped taking, or upstream ended and this actor is shutting down.
// If upstream sends End after we've stopped taking, we still need to stop the actor.
if matches!(msg, StreamMessage::End) {
// log::debug!("TakeWhileActor: Received End after stop_taking. Ensuring actor stops.");
ctx.stop(); // Will trigger `stopped()` which ensures End to downstream if needed.
}
return;
}
match msg {
StreamMessage::Element(item) => {
// log::trace!("TakeWhileActor received element: {:?}, is_taking: {}", item, self.is_taking);
if (self.predicate)(&item) {
// Predicate is true, forward the item.
// log::trace!("TakeWhileActor: Predicate true for {:?}. Forwarding.", item);
if self
.downstream
.try_send(StreamMessage::Element(item.clone()))
.is_err()
{
// log::warn!("TakeWhileActor: Downstream recipient closed. Stopping actor and further takes.");
self.is_taking = false; // No longer taking as downstream is gone
ctx.stop(); // Stop the actor
}
} else {
// Predicate is false, stop taking and end the stream.
// log::debug!("TakeWhileActor: Predicate became false for {:?}. Stopping take and sending End.", item);
self.is_taking = false; // Transition out of taking state
let _ = self.downstream.try_send(StreamMessage::End); // Notify downstream that this stream part is ending
ctx.stop(); // Stop this actor
}
}
StreamMessage::End => {
// log::debug!("TakeWhileActor received End from upstream while still taking.");
// Upstream ended while we were still taking.
self.is_taking = false; // No longer in a taking state because upstream ended.
let _ = self.downstream.try_send(StreamMessage::End); // Propagate End to downstream.
ctx.stop(); // Stop this actor.
}
}
}
}