use async_trait::async_trait;
use std::{
collections::{HashMap, VecDeque},
sync::Arc,
time::{Duration, Instant},
};
use tokio::sync::RwLock;
use crate::{
error::Result,
streaming::{PipelineConfig, RDFEvent},
};
type AggregationFn = Box<dyn Fn(&[RDFEvent]) -> Option<RDFEvent> + Send + Sync>;
#[async_trait]
pub trait PipelineStage: Send + Sync {
async fn process(&self, event: RDFEvent) -> Result<Vec<RDFEvent>>;
fn name(&self) -> &str;
}
#[derive(Debug, Clone)]
pub enum WindowType {
Fixed(Duration),
Sliding { size: Duration, slide: Duration },
Session(Duration),
}
#[derive(Debug, Clone)]
pub struct TimeWindow {
pub start: Instant,
pub end: Instant,
pub events: Vec<RDFEvent>,
}
impl TimeWindow {
pub fn new(start: Instant, duration: Duration) -> Self {
Self {
start,
end: start + duration,
events: Vec::new(),
}
}
pub fn contains(&self, timestamp: Instant) -> bool {
timestamp >= self.start && timestamp < self.end
}
pub fn add_event(&mut self, event: RDFEvent) {
self.events.push(event);
}
}
pub struct WindowAggregator {
window_type: WindowType,
aggregation_fn: AggregationFn,
windows: Arc<RwLock<VecDeque<TimeWindow>>>,
watermark: Arc<RwLock<Instant>>,
}
impl WindowAggregator {
pub fn new<F>(window_type: WindowType, aggregation_fn: F) -> Self
where
F: Fn(&[RDFEvent]) -> Option<RDFEvent> + Send + Sync + 'static,
{
Self {
window_type,
aggregation_fn: Box::new(aggregation_fn),
windows: Arc::new(RwLock::new(VecDeque::new())),
watermark: Arc::new(RwLock::new(Instant::now())),
}
}
async fn update_watermark(&self, event_time: Instant) -> Vec<RDFEvent> {
let mut watermark = self.watermark.write().await;
*watermark = (*watermark).max(event_time);
let mut results = Vec::new();
let mut windows = self.windows.write().await;
while let Some(window) = windows.front() {
if window.end <= *watermark {
let completed = windows
.pop_front()
.expect("window should exist after front() check");
if let Some(result) = (self.aggregation_fn)(&completed.events) {
results.push(result);
}
} else {
break;
}
}
results
}
async fn get_window(&self, timestamp: Instant) -> Result<()> {
let mut windows = self.windows.write().await;
match &self.window_type {
WindowType::Fixed(duration) => {
let window_start = timestamp
- Duration::from_millis(
timestamp.duration_since(Instant::now()).as_millis() as u64
% duration.as_millis() as u64,
);
let exists = windows.iter().any(|w| w.contains(timestamp));
if !exists {
windows.push_back(TimeWindow::new(window_start, *duration));
}
}
WindowType::Sliding { size, slide } => {
let mut window_start = timestamp - *size;
while window_start <= timestamp {
let exists = windows
.iter()
.any(|w| w.start == window_start && w.contains(timestamp));
if !exists {
windows.push_back(TimeWindow::new(window_start, *size));
}
window_start += *slide;
}
}
WindowType::Session(_gap) => {
}
}
Ok(())
}
}
#[async_trait]
impl PipelineStage for WindowAggregator {
async fn process(&self, event: RDFEvent) -> Result<Vec<RDFEvent>> {
let event_time = Instant::now();
self.get_window(event_time).await?;
let mut windows = self.windows.write().await;
for window in windows.iter_mut() {
if window.contains(event_time) {
window.add_event(event.clone());
}
}
drop(windows);
Ok(self.update_watermark(event_time).await)
}
fn name(&self) -> &str {
"WindowAggregator"
}
}
pub struct FilterStage {
name: String,
predicate: Box<dyn Fn(&RDFEvent) -> bool + Send + Sync>,
}
impl FilterStage {
pub fn new<F>(name: String, predicate: F) -> Self
where
F: Fn(&RDFEvent) -> bool + Send + Sync + 'static,
{
Self {
name,
predicate: Box::new(predicate),
}
}
}
#[async_trait]
impl PipelineStage for FilterStage {
async fn process(&self, event: RDFEvent) -> Result<Vec<RDFEvent>> {
if (self.predicate)(&event) {
Ok(vec![event])
} else {
Ok(vec![])
}
}
fn name(&self) -> &str {
&self.name
}
}
pub struct TransformStage {
name: String,
transform_fn: Box<dyn Fn(RDFEvent) -> Result<RDFEvent> + Send + Sync>,
}
impl TransformStage {
pub fn new<F>(name: String, transform_fn: F) -> Self
where
F: Fn(RDFEvent) -> Result<RDFEvent> + Send + Sync + 'static,
{
Self {
name,
transform_fn: Box::new(transform_fn),
}
}
}
#[async_trait]
impl PipelineStage for TransformStage {
async fn process(&self, event: RDFEvent) -> Result<Vec<RDFEvent>> {
Ok(vec![(self.transform_fn)(event)?])
}
fn name(&self) -> &str {
&self.name
}
}
pub struct StreamPipeline {
name: String,
stages: Vec<Box<dyn PipelineStage>>,
#[allow(dead_code)]
config: PipelineConfig,
metrics: Arc<RwLock<PipelineMetrics>>,
}
#[derive(Debug, Default, Clone)]
pub struct PipelineMetrics {
pub events_processed: u64,
pub events_dropped: u64,
pub processing_time: Duration,
pub stage_metrics: HashMap<String, StageMetrics>,
}
#[derive(Debug, Default, Clone)]
pub struct StageMetrics {
pub events_in: u64,
pub events_out: u64,
pub processing_time: Duration,
pub errors: u64,
}
impl StreamPipeline {
pub fn new(name: String, config: PipelineConfig) -> Self {
Self {
name,
stages: Vec::new(),
config,
metrics: Arc::new(RwLock::new(PipelineMetrics::default())),
}
}
pub fn add_stage(&mut self, stage: Box<dyn PipelineStage>) {
self.stages.push(stage);
}
pub async fn process(&self, event: RDFEvent) -> Result<Vec<RDFEvent>> {
let start = Instant::now();
let mut current_events = vec![event];
let mut metrics = self.metrics.write().await;
for stage in &self.stages {
let stage_start = Instant::now();
let mut next_events = Vec::new();
let stage_name = stage.name().to_string();
let stage_metrics = metrics
.stage_metrics
.entry(stage_name.clone())
.or_insert_with(StageMetrics::default);
stage_metrics.events_in += current_events.len() as u64;
for event in current_events {
match stage.process(event).await {
Ok(outputs) => {
next_events.extend(outputs);
}
Err(e) => {
tracing::error!("Pipeline stage {} error: {}", stage.name(), e);
stage_metrics.errors += 1;
}
}
}
stage_metrics.events_out += next_events.len() as u64;
stage_metrics.processing_time += stage_start.elapsed();
current_events = next_events;
if current_events.is_empty() {
break;
}
}
metrics.events_processed += 1;
if current_events.is_empty() {
metrics.events_dropped += 1;
}
metrics.processing_time += start.elapsed();
Ok(current_events)
}
pub async fn get_metrics(&self) -> PipelineMetrics {
(*self.metrics.read().await).clone()
}
}
pub struct PipelineBuilder {
name: String,
config: PipelineConfig,
stages: Vec<Box<dyn PipelineStage>>,
}
impl PipelineBuilder {
pub fn new(name: String) -> Self {
Self {
name,
config: PipelineConfig::default(),
stages: Vec::new(),
}
}
pub fn with_config(mut self, config: PipelineConfig) -> Self {
self.config = config;
self
}
pub fn filter<F>(mut self, name: &str, predicate: F) -> Self
where
F: Fn(&RDFEvent) -> bool + Send + Sync + 'static,
{
self.stages
.push(Box::new(FilterStage::new(name.to_string(), predicate)));
self
}
pub fn transform<F>(mut self, name: &str, transform_fn: F) -> Self
where
F: Fn(RDFEvent) -> Result<RDFEvent> + Send + Sync + 'static,
{
self.stages.push(Box::new(TransformStage::new(
name.to_string(),
transform_fn,
)));
self
}
pub fn window<F>(mut self, window_type: WindowType, aggregation_fn: F) -> Self
where
F: Fn(&[RDFEvent]) -> Option<RDFEvent> + Send + Sync + 'static,
{
self.stages
.push(Box::new(WindowAggregator::new(window_type, aggregation_fn)));
self
}
pub fn build(self) -> StreamPipeline {
let mut pipeline = StreamPipeline::new(self.name, self.config);
for stage in self.stages {
pipeline.add_stage(stage);
}
pipeline
}
}
#[cfg(test)]
mod tests {
use super::*;
use oxirs_core::{Literal, NamedNode};
#[tokio::test]
async fn test_filter_stage() {
let stage = FilterStage::new("test_filter".to_string(), |event| {
matches!(event, RDFEvent::GraphCleared { .. })
});
let event1 = RDFEvent::GraphCleared {
graph: "test".to_string(),
timestamp: 12345,
};
let event2 = RDFEvent::TripleAdded {
triple: oxirs_core::Triple::new(
NamedNode::new("http://example.org/subject").unwrap(),
NamedNode::new("http://example.org/predicate").unwrap(),
Literal::new("object"),
),
graph: None,
timestamp: 12345,
};
let result1 = stage.process(event1).await.unwrap();
assert_eq!(result1.len(), 1);
let result2 = stage.process(event2).await.unwrap();
assert_eq!(result2.len(), 0);
}
#[test]
fn test_pipeline_builder() {
let pipeline = PipelineBuilder::new("test_pipeline".to_string())
.filter("graph_events", |event| {
matches!(event, RDFEvent::GraphCleared { .. })
})
.transform("add_metadata", |event| {
Ok(event)
})
.build();
assert_eq!(pipeline.name, "test_pipeline");
assert_eq!(pipeline.stages.len(), 2);
}
}