use crate::core::stream::{StreamElement, StreamMessage};
use crate::error::Result;
use async_trait::async_trait;
use std::sync::Arc;
#[async_trait]
pub trait StreamOperator: Send + Sync {
async fn process(&mut self, message: StreamMessage) -> Result<Vec<StreamMessage>>;
fn name(&self) -> &str;
async fn initialize(&mut self) -> Result<()> {
Ok(())
}
async fn finalize(&mut self) -> Result<()> {
Ok(())
}
}
#[async_trait]
pub trait SourceOperator: Send + Sync {
async fn produce(&mut self) -> Result<Vec<StreamMessage>>;
async fn has_more(&self) -> bool;
fn name(&self) -> &str;
}
#[async_trait]
pub trait SinkOperator: Send + Sync {
async fn consume(&mut self, messages: Vec<StreamMessage>) -> Result<()>;
async fn flush(&mut self) -> Result<()>;
fn name(&self) -> &str;
}
#[async_trait]
pub trait TransformOperator: StreamOperator {
async fn transform(&mut self, element: StreamElement) -> Result<Vec<StreamElement>>;
}
pub struct FilterOperator<F>
where
F: Fn(&StreamElement) -> bool + Send + Sync,
{
predicate: Arc<F>,
name: String,
}
impl<F> FilterOperator<F>
where
F: Fn(&StreamElement) -> bool + Send + Sync,
{
pub fn new(predicate: F, name: String) -> Self {
Self {
predicate: Arc::new(predicate),
name,
}
}
}
#[async_trait]
impl<F> StreamOperator for FilterOperator<F>
where
F: Fn(&StreamElement) -> bool + Send + Sync,
{
async fn process(&mut self, message: StreamMessage) -> Result<Vec<StreamMessage>> {
match message {
StreamMessage::Data(elem) => {
if (self.predicate)(&elem) {
Ok(vec![StreamMessage::Data(elem)])
} else {
Ok(vec![])
}
}
other => Ok(vec![other]),
}
}
fn name(&self) -> &str {
&self.name
}
}
pub struct MapOperator<F>
where
F: Fn(StreamElement) -> StreamElement + Send + Sync,
{
mapper: Arc<F>,
name: String,
}
impl<F> MapOperator<F>
where
F: Fn(StreamElement) -> StreamElement + Send + Sync,
{
pub fn new(mapper: F, name: String) -> Self {
Self {
mapper: Arc::new(mapper),
name,
}
}
}
#[async_trait]
impl<F> StreamOperator for MapOperator<F>
where
F: Fn(StreamElement) -> StreamElement + Send + Sync,
{
async fn process(&mut self, message: StreamMessage) -> Result<Vec<StreamMessage>> {
match message {
StreamMessage::Data(elem) => {
let transformed = (self.mapper)(elem);
Ok(vec![StreamMessage::Data(transformed)])
}
other => Ok(vec![other]),
}
}
fn name(&self) -> &str {
&self.name
}
}
pub struct FlatMapOperator<F>
where
F: Fn(StreamElement) -> Vec<StreamElement> + Send + Sync,
{
mapper: Arc<F>,
name: String,
}
impl<F> FlatMapOperator<F>
where
F: Fn(StreamElement) -> Vec<StreamElement> + Send + Sync,
{
pub fn new(mapper: F, name: String) -> Self {
Self {
mapper: Arc::new(mapper),
name,
}
}
}
#[async_trait]
impl<F> StreamOperator for FlatMapOperator<F>
where
F: Fn(StreamElement) -> Vec<StreamElement> + Send + Sync,
{
async fn process(&mut self, message: StreamMessage) -> Result<Vec<StreamMessage>> {
match message {
StreamMessage::Data(elem) => {
let elements = (self.mapper)(elem);
Ok(elements.into_iter().map(StreamMessage::Data).collect())
}
other => Ok(vec![other]),
}
}
fn name(&self) -> &str {
&self.name
}
}
pub struct LoggingSink {
name: String,
count: u64,
}
impl LoggingSink {
pub fn new(name: String) -> Self {
Self { name, count: 0 }
}
pub fn count(&self) -> u64 {
self.count
}
}
#[async_trait]
impl SinkOperator for LoggingSink {
async fn consume(&mut self, messages: Vec<StreamMessage>) -> Result<()> {
for msg in messages {
match msg {
StreamMessage::Data(elem) => {
tracing::debug!(
sink = %self.name,
event_time = %elem.event_time,
size = elem.size_bytes(),
"Received element"
);
self.count += 1;
}
StreamMessage::Watermark(wm) => {
tracing::debug!(sink = %self.name, watermark = %wm, "Received watermark");
}
StreamMessage::Checkpoint(id) => {
tracing::debug!(sink = %self.name, checkpoint = id, "Received checkpoint");
}
StreamMessage::EndOfStream => {
tracing::info!(sink = %self.name, "Received end of stream");
}
}
}
Ok(())
}
async fn flush(&mut self) -> Result<()> {
tracing::debug!(sink = %self.name, "Flushing sink");
Ok(())
}
fn name(&self) -> &str {
&self.name
}
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::Utc;
#[tokio::test]
async fn test_filter_operator() {
let mut filter = FilterOperator::new(
|elem: &StreamElement| elem.data.len() > 2,
"test_filter".to_string(),
);
let elem1 = StreamElement::new(vec![1, 2, 3], Utc::now());
let elem2 = StreamElement::new(vec![1], Utc::now());
let result1 = filter
.process(StreamMessage::Data(elem1))
.await
.expect("filter should process element");
assert_eq!(result1.len(), 1);
let result2 = filter
.process(StreamMessage::Data(elem2))
.await
.expect("filter should process element");
assert_eq!(result2.len(), 0);
}
#[tokio::test]
async fn test_map_operator() {
let mut mapper = MapOperator::new(
|mut elem: StreamElement| {
elem.data.push(99);
elem
},
"test_map".to_string(),
);
let elem = StreamElement::new(vec![1, 2, 3], Utc::now());
let result = mapper
.process(StreamMessage::Data(elem))
.await
.expect("map should transform element");
assert_eq!(result.len(), 1);
if let StreamMessage::Data(transformed) = &result[0] {
assert_eq!(transformed.data.len(), 4);
assert_eq!(transformed.data[3], 99);
} else {
panic!("Expected data message");
}
}
#[tokio::test]
async fn test_flat_map_operator() {
let mut flat_mapper = FlatMapOperator::new(
|elem: StreamElement| {
vec![
StreamElement::new(elem.data.clone(), elem.event_time),
StreamElement::new(elem.data.clone(), elem.event_time),
]
},
"test_flatmap".to_string(),
);
let elem = StreamElement::new(vec![1, 2, 3], Utc::now());
let result = flat_mapper
.process(StreamMessage::Data(elem))
.await
.expect("flat_map should process element");
assert_eq!(result.len(), 2);
}
#[tokio::test]
async fn test_logging_sink() {
let mut sink = LoggingSink::new("test_sink".to_string());
let elem = StreamElement::new(vec![1, 2, 3], Utc::now());
sink.consume(vec![StreamMessage::Data(elem)])
.await
.expect("sink should consume element");
assert_eq!(sink.count(), 1);
}
}