1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
use crate::stream::{StreamMessage, CloneableStreamable};
use actix::prelude::*; // Changed to prelude for ctx.stop()
use futures::channel::oneshot;
use std::mem;
use std::fmt;
// CollectorActor collects all incoming items into a Vec and sends the Vec
// via a oneshot channel when the input stream ends.
#[derive(fmt::Debug)]
pub(crate) struct CollectorActor<T>
where
T: CloneableStreamable,
{
items: Vec<T>,
// The oneshot sender to send the final Vec<T> when the stream ends.
result_sender: Option<oneshot::Sender<Vec<T>>>,
}
impl<T> CollectorActor<T>
where
T: CloneableStreamable,
{
pub(crate) fn new(result_sender: oneshot::Sender<Vec<T>>) -> Self {
CollectorActor {
items: Vec::new(),
result_sender: Some(result_sender),
}
}
}
impl<T> Actor for CollectorActor<T>
where
T: CloneableStreamable,
{
type Context = Context<Self>;
fn started(&mut self, _ctx: &mut Self::Context) {
// log::debug!("CollectorActor started.");
}
fn stopped(&mut self, _ctx: &mut Self::Context) {
// log::debug!("CollectorActor stopped. Sending collected items.");
// Ensure that the collected items are sent when the actor stops.
// This handles cases where the actor might be stopped for reasons other
// than receiving StreamMessage::End (e.g., if the system shuts down).
if let Some(sender) = self.result_sender.take() {
// mem::take is used to move items out without needing CollectorActor.items to be Option<Vec<T>>
let _ = sender.send(mem::take(&mut self.items));
}
// If sender.send fails, it means the receiver was dropped, which is acceptable.
}
}
impl<T> Handler<StreamMessage<T>> for CollectorActor<T>
where
T: CloneableStreamable,
{
type Result = ();
fn handle(&mut self, msg: StreamMessage<T>, ctx: &mut Context<Self>) {
match msg {
StreamMessage::Element(elem) => {
// log::trace!("CollectorActor received element: {:?}", elem);
self.items.push(elem);
}
StreamMessage::End => {
// log::debug!("CollectorActor received End. Stopping actor to send results via stopped().");
// The `stopped` method will handle sending the items.
ctx.stop(); // This should now work with actix::prelude::*
}
}
}
}