use super::stream_message::StreamMessage;
use crate::actors::actor::Actor;
use crate::actors::messages::Message;
use log::info;
pub struct Flow<F>
where
F: Fn(StreamMessage) -> StreamMessage + Send + 'static,
{
name: String,
transform: F,
downstream: Option<tokio::sync::mpsc::Sender<Message<StreamMessage, StreamMessage>>>,
}
impl<F> Flow<F>
where
F: Fn(StreamMessage) -> StreamMessage + Send + 'static,
{
pub fn new(name: String, transform: F) -> Self {
Flow {
name,
transform,
downstream: None,
}
}
pub fn set_downstream(
&mut self,
sender: tokio::sync::mpsc::Sender<Message<StreamMessage, StreamMessage>>,
) {
self.downstream = Some(sender);
}
pub fn map<G>(self, g: G) -> Flow<impl Fn(StreamMessage) -> StreamMessage + Send + 'static>
where
G: Fn(StreamMessage) -> StreamMessage + Send + 'static,
{
let name = format!("{}_mapped", self.name);
let f = self.transform;
Flow::new(name, move |msg| g(f(msg)))
}
pub fn filter<P>(
self,
predicate: P,
) -> Flow<impl Fn(StreamMessage) -> StreamMessage + Send + 'static>
where
P: Fn(&StreamMessage) -> bool + Send + 'static,
{
let name = format!("{}_filtered", self.name);
let f = self.transform;
Flow::new(name, move |msg| {
let transformed = f(msg);
if predicate(&transformed) {
transformed
} else {
StreamMessage::Error("Filtered out".to_string())
}
})
}
}
impl<F> Actor<StreamMessage, StreamMessage> for Flow<F>
where
F: Fn(StreamMessage) -> StreamMessage + Send + 'static,
{
async fn receive(&mut self, message: Message<StreamMessage, StreamMessage>) {
if let Some(payload) = message.payload {
info!(actor=self.name.clone().as_str(); "Flow '{}' received message", self.name);
if payload.is_terminal() {
info!(actor=self.name.clone().as_str(); "Flow '{}' received terminal message, forwarding downstream", self.name);
if let Some(downstream) = &self.downstream {
let _ = downstream
.send(Message {
payload: Some(payload),
stop: false,
responder: None,
blocking: None,
})
.await;
}
return;
}
let transformed = (self.transform)(payload);
info!(actor=self.name.clone().as_str(); "Flow '{}' transformed data", self.name);
if let Some(downstream) = &self.downstream {
let _ = downstream
.send(Message {
payload: Some(transformed),
stop: false,
responder: None,
blocking: None,
})
.await;
}
}
}
}
pub mod transforms {
use super::StreamMessage;
pub fn to_uppercase(msg: StreamMessage) -> StreamMessage {
match msg {
StreamMessage::Data(bytes) => {
if let Ok(text) = String::from_utf8(bytes) {
StreamMessage::Text(text.to_uppercase())
} else {
StreamMessage::Error("Invalid UTF-8 data".to_string())
}
}
StreamMessage::Text(text) => StreamMessage::Text(text.to_uppercase()),
other => other,
}
}
pub fn filter_empty(msg: StreamMessage) -> StreamMessage {
match &msg {
StreamMessage::Data(bytes) if bytes.is_empty() => {
StreamMessage::Error("Empty data".to_string())
}
StreamMessage::Text(text) if text.is_empty() => {
StreamMessage::Error("Empty text".to_string())
}
_ => msg,
}
}
pub fn identity(msg: StreamMessage) -> StreamMessage {
msg
}
}