use crate::core::error::{Error, Result};
use crate::dataframe::DataFrame;
use crossbeam_channel::{bounded, unbounded, Receiver, Sender};
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
/// A pull-based, asynchronous source of batched stream items.
///
/// Implementors yield data via [`next_batch`](Self::next_batch) until it
/// returns `Ok(None)`; `has_more` reports whether another call may yield data.
pub trait StreamingDataSource: Send + Sync {
/// Element type produced by this source.
type Item: Send + Sync + Clone;
/// Error type for fallible source operations.
type Error: std::error::Error + Send + Sync + 'static;
/// Returns the next batch of items, or `Ok(None)` once the source is exhausted.
fn next_batch(
&mut self,
) -> Pin<
Box<
dyn Future<Output = std::result::Result<Option<Vec<Self::Item>>, Self::Error>>
+ Send
+ '_,
>,
>;
/// Whether another `next_batch` call may still yield data.
fn has_more(&self) -> bool;
/// Descriptive metadata for this source (id, type, schema, size estimate).
fn metadata(&self) -> StreamMetadata;
/// Requests a preferred batch size for subsequent reads.
fn set_batch_size(&mut self, size: usize);
/// Rewinds the source to its beginning, where the implementation supports it.
fn reset(&mut self) -> std::result::Result<(), Self::Error>;
/// Total item count, when known up front.
fn estimated_size(&self) -> Option<usize>;
}
/// An asynchronous sink that consumes batched stream items.
pub trait StreamingDataSink: Send + Sync {
/// Element type accepted by this sink.
type Item: Send + Sync + Clone;
/// Error type for fallible sink operations.
type Error: std::error::Error + Send + Sync + 'static;
/// Writes one batch of items to the sink.
fn write_batch(
&mut self,
batch: Vec<Self::Item>,
) -> Pin<Box<dyn Future<Output = std::result::Result<(), Self::Error>> + Send + '_>>;
/// Pushes any buffered data through to the underlying destination.
fn flush(
&mut self,
) -> Pin<Box<dyn Future<Output = std::result::Result<(), Self::Error>> + Send + '_>>;
/// Finalizes the sink and releases any underlying resources.
fn close(
&mut self,
) -> Pin<Box<dyn Future<Output = std::result::Result<(), Self::Error>> + Send + '_>>;
/// Descriptive metadata for this sink (id, type, schema, properties).
fn metadata(&self) -> SinkMetadata;
/// Hint for the largest batch this sink should accept per write.
fn set_max_batch_size(&mut self, size: usize);
}
/// Descriptive metadata attached to a [`StreamingDataSource`].
#[derive(Debug, Clone)]
pub struct StreamMetadata {
/// Stable identifier for the stream.
pub id: String,
/// Category of the underlying transport/storage.
pub stream_type: StreamType,
/// Field schema, if known.
pub schema: Option<StreamSchema>,
/// Total item count, if known up front.
pub estimated_size: Option<usize>,
/// Creation time of this metadata record.
pub created_at: Instant,
/// Free-form key/value properties.
pub properties: HashMap<String, String>,
}
/// Descriptive metadata attached to a [`StreamingDataSink`].
#[derive(Debug, Clone)]
pub struct SinkMetadata {
/// Stable identifier for the sink.
pub id: String,
/// Category of the underlying transport/storage.
pub sink_type: SinkType,
/// Field schema, if known.
pub schema: Option<StreamSchema>,
/// Creation time of this metadata record.
pub created_at: Instant,
/// Free-form key/value properties.
pub properties: HashMap<String, String>,
}
/// Ordered collection of typed fields describing stream records.
#[derive(Debug, Clone)]
pub struct StreamSchema {
/// Fields in record order.
pub fields: Vec<StreamField>,
/// Free-form schema-level properties.
pub metadata: HashMap<String, String>,
}
/// A single named, typed field within a [`StreamSchema`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamField {
/// Field name.
pub name: String,
/// Logical data type of the field.
pub data_type: StreamDataType,
/// Whether the field may hold nulls.
pub nullable: bool,
/// Free-form field-level properties.
pub metadata: HashMap<String, String>,
}
/// Logical data types a stream field can carry.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamDataType {
Boolean,
Int8,
Int16,
Int32,
Int64,
UInt8,
UInt16,
UInt32,
UInt64,
Float32,
Float64,
String,
Binary,
Date,
DateTime,
Timestamp,
/// Fixed-point decimal: `precision` total digits, `scale` fractional digits.
Decimal { precision: u8, scale: i8 },
/// Variable-length list of a single inner type.
List(Box<StreamDataType>),
/// Nested record with named, typed fields.
Struct(Vec<StreamField>),
}
/// Category of the transport/storage behind a source stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamType {
File,
Network,
Database,
Queue,
Memory,
Kafka,
Custom,
}
/// Category of the transport/storage behind a sink.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SinkType {
File,
Network,
Database,
Queue,
Memory,
Kafka,
Custom,
}
/// Transforms batches of `Input` items into batches of `Output` items.
pub trait StreamProcessor: Send + Sync {
/// Input element type.
type Input: Send + Sync + Clone;
/// Output element type.
type Output: Send + Sync + Clone;
/// Error type for fallible processing operations.
type Error: std::error::Error + Send + Sync + 'static;
/// Processes one input batch, producing an output batch.
fn process_batch(
&mut self,
batch: Vec<Self::Input>,
) -> Pin<
Box<dyn Future<Output = std::result::Result<Vec<Self::Output>, Self::Error>> + Send + '_>,
>;
/// Descriptive metadata for this processor.
fn metadata(&self) -> ProcessorMetadata;
/// Applies runtime configuration to the processor.
fn configure(&mut self, config: ProcessorConfig) -> std::result::Result<(), Self::Error>;
/// Returns accumulated processing statistics.
fn stats(&self) -> ProcessorStats;
}
/// Descriptive metadata for a [`StreamProcessor`].
#[derive(Debug, Clone)]
pub struct ProcessorMetadata {
/// Human-readable processor name.
pub name: String,
/// Kind of transformation performed.
pub processor_type: ProcessorType,
/// Expected input schema, if known.
pub input_schema: Option<StreamSchema>,
/// Produced output schema, if known.
pub output_schema: Option<StreamSchema>,
/// Processor implementation version string.
pub version: String,
}
/// Kind of transformation a [`StreamProcessor`] performs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProcessorType {
Filter,
Map,
Reduce,
Aggregate,
Join,
Window,
Custom,
}
/// Runtime configuration passed to [`StreamProcessor::configure`].
#[derive(Debug, Clone)]
pub struct ProcessorConfig {
/// Free-form processor-specific parameters.
pub parameters: HashMap<String, String>,
/// Desired degree of parallelism.
pub parallelism: usize,
/// Internal buffer size, in items.
pub buffer_size: usize,
/// Optional per-batch processing timeout.
pub timeout: Option<Duration>,
}
/// Accumulated runtime statistics reported by a [`StreamProcessor`].
#[derive(Debug, Clone)]
pub struct ProcessorStats {
/// Total items processed so far.
pub items_processed: u64,
/// Total wall-clock time spent processing.
pub total_processing_time: Duration,
/// Mean processing time per batch or item (implementation-defined).
pub avg_processing_time: Duration,
/// Number of processing errors observed.
pub error_count: u64,
/// Time of the most recent successful processing, if any.
pub last_processed: Option<Instant>,
}
/// A source → (stages) → sink streaming pipeline over items of type `T`.
pub struct StreamingPipeline<T> {
// Ordered processing stages. NOTE(review): `execute` does not currently
// run these between source and sink — confirm whether that is intended.
stages: Vec<Box<dyn PipelineStage<T>>>,
// Buffering/timeout/error-strategy settings for a run.
config: PipelineConfig,
// Shared with the spawned reader/writer tasks.
stats: Arc<Mutex<PipelineStats>>,
// Optional error hook. NOTE(review): not yet consulted by `execute`.
error_handler: Option<Box<dyn ErrorHandler>>,
}
impl<T> StreamingPipeline<T>
where
T: Send + Sync + Clone + 'static,
{
pub fn new(config: PipelineConfig) -> Self {
Self {
stages: Vec::new(),
config,
stats: Arc::new(Mutex::new(PipelineStats::new())),
error_handler: None,
}
}
pub fn add_stage<S: PipelineStage<T> + 'static>(&mut self, stage: S) {
self.stages.push(Box::new(stage));
}
pub fn set_error_handler<H: ErrorHandler + 'static>(&mut self, handler: H) {
self.error_handler = Some(Box::new(handler));
}
pub async fn execute<S, K>(&mut self, source: S, sink: K) -> Result<()>
where
S: StreamingDataSource<Item = T> + 'static,
K: StreamingDataSink<Item = T> + 'static,
{
let (tx, rx) = bounded(self.config.buffer_size);
let source_handle = self.spawn_source_reader(source, tx).await?;
let processor_handle = self.spawn_pipeline_processor(rx, sink).await?;
let (source_result, processor_result) = tokio::join!(source_handle, processor_handle);
source_result
.map_err(|e| Error::InvalidOperation(format!("Source task failed: {}", e)))??;
processor_result
.map_err(|e| Error::InvalidOperation(format!("Processor task failed: {}", e)))??;
Ok(())
}
async fn spawn_source_reader<S>(
&self,
mut source: S,
tx: Sender<Vec<T>>,
) -> Result<tokio::task::JoinHandle<Result<()>>>
where
S: StreamingDataSource<Item = T> + 'static,
{
let stats = Arc::clone(&self.stats);
let handle = tokio::spawn(async move {
while source.has_more() {
match source.next_batch().await {
Ok(Some(batch)) => {
if let Ok(mut pipeline_stats) = stats.lock() {
pipeline_stats.record_batch_read(batch.len());
}
if tx.send(batch).is_err() {
break; }
}
Ok(None) => break, Err(_e) => {
break;
}
}
}
Ok(())
});
Ok(handle)
}
async fn spawn_pipeline_processor<K>(
&self,
rx: Receiver<Vec<T>>,
mut sink: K,
) -> Result<tokio::task::JoinHandle<Result<()>>>
where
K: StreamingDataSink<Item = T> + 'static,
{
let stats = Arc::clone(&self.stats);
let handle = tokio::spawn(async move {
while let Ok(batch) = rx.recv() {
let current_batch = batch;
if let Err(_e) = sink.write_batch(current_batch.clone()).await {
break;
}
if let Ok(mut pipeline_stats) = stats.lock() {
pipeline_stats.record_batch_processed(current_batch.len());
}
}
let _ = sink.flush().await;
let _ = sink.close().await;
Ok(())
});
Ok(handle)
}
pub fn stats(&self) -> Result<PipelineStats> {
self.stats
.lock()
.map(|stats| stats.clone())
.map_err(|_| Error::InvalidOperation("Failed to acquire stats lock".to_string()))
}
}
/// A single transformation step inside a [`StreamingPipeline`].
pub trait PipelineStage<T>: Send + Sync {
/// Transforms one batch of items, returning the transformed batch.
fn process(
&mut self,
data: Vec<T>,
) -> Pin<Box<dyn Future<Output = Result<Vec<T>>> + Send + '_>>;
/// Human-readable stage name for diagnostics.
fn name(&self) -> &str;
/// Returns accumulated per-stage statistics.
fn stats(&self) -> StageStats;
}
/// Tunable settings for a [`StreamingPipeline`] run.
#[derive(Debug, Clone)]
pub struct PipelineConfig {
/// Channel capacity (in batches) between the reader and writer tasks.
pub buffer_size: usize,
/// Maximum parallelism hint for stages.
pub max_parallelism: usize,
/// Overall processing timeout hint.
pub timeout: Duration,
/// How errors encountered during a run should be treated.
pub error_strategy: ErrorStrategy,
/// Optional interval between checkpoints; `None` disables checkpointing.
pub checkpoint_interval: Option<Duration>,
}
impl Default for PipelineConfig {
/// Defaults: 1000-batch buffer, parallelism = logical CPU count,
/// 30 s timeout, fail-fast error strategy, 60 s checkpoint interval.
fn default() -> Self {
Self {
buffer_size: 1000,
max_parallelism: num_cpus::get(),
timeout: Duration::from_secs(30),
error_strategy: ErrorStrategy::FailFast,
checkpoint_interval: Some(Duration::from_secs(60)),
}
}
}
/// Policy for reacting to errors during pipeline execution.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorStrategy {
/// Abort the run on the first error.
FailFast,
/// Skip failing items/batches and continue.
SkipErrors,
/// Retry failed operations with backoff.
RetryWithBackoff,
}
/// Aggregate counters describing one pipeline execution.
///
/// `start_time` is stamped on the first recorded read and `end_time` is
/// refreshed after every processed batch, so [`duration`](Self::duration)
/// spans first-read to last-processed.
#[derive(Debug, Clone)]
pub struct PipelineStats {
    /// Batches pulled from the source.
    pub batches_read: u64,
    /// Batches successfully written to the sink.
    pub batches_processed: u64,
    /// Total items pulled from the source.
    pub items_read: u64,
    /// Total items successfully written to the sink.
    pub items_processed: u64,
    /// Time of the first recorded read, if any.
    pub start_time: Option<Instant>,
    /// Time of the most recent processed batch, if any.
    pub end_time: Option<Instant>,
    /// Number of recorded errors.
    pub error_count: u64,
}
impl PipelineStats {
    /// Creates a zeroed statistics record.
    pub fn new() -> Self {
        Self {
            batches_read: 0,
            batches_processed: 0,
            items_read: 0,
            items_processed: 0,
            start_time: None,
            end_time: None,
            error_count: 0,
        }
    }
    /// Records a batch of `items` read from the source; the first call
    /// stamps `start_time`.
    pub fn record_batch_read(&mut self, items: usize) {
        if self.start_time.is_none() {
            self.start_time = Some(Instant::now());
        }
        self.batches_read += 1;
        self.items_read += items as u64;
    }
    /// Records a batch of `items` written to the sink and refreshes `end_time`.
    pub fn record_batch_processed(&mut self, items: usize) {
        self.batches_processed += 1;
        self.items_processed += items as u64;
        self.end_time = Some(Instant::now());
    }
    /// Increments the error counter.
    pub fn record_error(&mut self) {
        self.error_count += 1;
    }
    /// Wall-clock span from first read to last processed batch, if both
    /// timestamps have been recorded.
    pub fn duration(&self) -> Option<Duration> {
        Some(self.end_time?.duration_since(self.start_time?))
    }
    /// Items processed per second over [`duration`](Self::duration), or
    /// `Some(0.0)` when the measured span is zero.
    pub fn throughput(&self) -> Option<f64> {
        self.duration().map(|d| {
            if d.as_secs_f64() > 0.0 {
                self.items_processed as f64 / d.as_secs_f64()
            } else {
                0.0
            }
        })
    }
}
// `new()` takes no arguments, so provide the idiomatic `Default` as well
// (clippy: new_without_default).
impl Default for PipelineStats {
    fn default() -> Self {
        Self::new()
    }
}
/// Accumulated statistics for a single [`PipelineStage`].
#[derive(Debug, Clone)]
pub struct StageStats {
/// Items processed by this stage.
pub items_processed: u64,
/// Total time spent in this stage.
pub processing_time: Duration,
/// Errors observed in this stage.
pub error_count: u64,
}
/// Decides how the pipeline should react to an individual error.
pub trait ErrorHandler: Send + Sync {
/// Inspects `error` and returns the action the pipeline should take.
fn handle_error(&self, error: &dyn std::error::Error) -> ErrorAction;
/// Returns accumulated error statistics.
fn error_stats(&self) -> ErrorStats;
}
/// Action an [`ErrorHandler`] can request after observing an error.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorAction {
/// Ignore the error and keep processing.
Continue,
/// Retry the failed operation.
Retry,
/// Abort the pipeline run.
Abort,
}
/// Aggregated error counters kept by an [`ErrorHandler`].
#[derive(Debug, Clone)]
pub struct ErrorStats {
/// Total errors observed.
pub total_errors: u64,
/// Error counts keyed by error-type name.
pub errors_by_type: HashMap<String, u64>,
/// Time of the most recent error, if any.
pub last_error_time: Option<Instant>,
}
/// An in-memory [`StreamingDataSource`] that serves pre-chunked batches.
pub struct MemoryStreamSource<T> {
// Data pre-split into batches at construction time.
data: Vec<Vec<T>>,
// Index of the next batch to serve.
current_index: usize,
// Requested batch size (chunking granularity).
batch_size: usize,
// Static metadata built at construction.
metadata: StreamMetadata,
}
impl<T> MemoryStreamSource<T>
where
    T: Clone + Send + Sync,
{
    /// Builds a source over `data`, pre-split into batches of `batch_size`
    /// items (the final batch may be shorter).
    ///
    /// Panics if `batch_size` is zero (slice `chunks` requirement).
    pub fn new(data: Vec<T>, batch_size: usize) -> Self {
        let item_count = data.len();
        // Chunk eagerly so next_batch only has to index and clone.
        let mut batches = Vec::new();
        for chunk in data.chunks(batch_size) {
            batches.push(chunk.to_vec());
        }
        Self {
            data: batches,
            current_index: 0,
            batch_size,
            metadata: StreamMetadata {
                id: "memory_stream".to_string(),
                stream_type: StreamType::Memory,
                schema: None,
                estimated_size: Some(item_count),
                created_at: Instant::now(),
                properties: HashMap::new(),
            },
        }
    }
}
impl<T> StreamingDataSource for MemoryStreamSource<T>
where
    T: Clone + Send + Sync + 'static,
{
    type Item = T;
    type Error = Error;
    /// Returns a clone of the next pre-chunked batch, or `Ok(None)` when all
    /// batches have been served.
    fn next_batch(
        &mut self,
    ) -> Pin<
        Box<
            dyn Future<Output = std::result::Result<Option<Vec<Self::Item>>, Self::Error>>
                + Send
                + '_,
        >,
    > {
        Box::pin(async move {
            match self.data.get(self.current_index) {
                Some(batch) => {
                    let batch = batch.clone();
                    self.current_index += 1;
                    Ok(Some(batch))
                }
                None => Ok(None),
            }
        })
    }
    fn has_more(&self) -> bool {
        self.current_index < self.data.len()
    }
    fn metadata(&self) -> StreamMetadata {
        self.metadata.clone()
    }
    /// Changes the batch size for batches not yet served.
    ///
    /// Fix: the previous implementation only stored the new size — the data
    /// was pre-chunked in `new`, so the setting had no observable effect.
    /// Remaining (unserved) batches are now re-chunked to the new size.
    fn set_batch_size(&mut self, size: usize) {
        // A zero chunk size would panic in `chunks`; ignore such requests,
        // and skip the re-chunk work when the size is unchanged.
        if size == 0 || size == self.batch_size {
            return;
        }
        // Flatten the unserved tail and re-chunk it at the new granularity.
        let remaining: Vec<T> = self
            .data
            .split_off(self.current_index)
            .into_iter()
            .flatten()
            .collect();
        self.data
            .extend(remaining.chunks(size).map(|chunk| chunk.to_vec()));
        self.batch_size = size;
    }
    /// Rewinds to the first batch. NOTE: batches already re-chunked keep
    /// their current granularity.
    fn reset(&mut self) -> std::result::Result<(), Self::Error> {
        self.current_index = 0;
        Ok(())
    }
    fn estimated_size(&self) -> Option<usize> {
        self.metadata.estimated_size
    }
}
/// An in-memory [`StreamingDataSink`] that appends batches to a shared `Vec`.
pub struct MemoryStreamSink<T> {
// Shared buffer holding everything written so far.
data: Arc<Mutex<Vec<T>>>,
// Advisory maximum batch size. NOTE(review): not enforced by write_batch.
max_batch_size: usize,
// Static metadata built at construction.
metadata: SinkMetadata,
}
impl<T> MemoryStreamSink<T>
where
    T: Clone + Send + Sync,
{
    /// Creates an empty in-memory sink with a default advisory batch size
    /// of 1000 items.
    pub fn new() -> Self {
        let metadata = SinkMetadata {
            id: "memory_sink".to_string(),
            sink_type: SinkType::Memory,
            schema: None,
            created_at: Instant::now(),
            properties: HashMap::new(),
        };
        Self {
            data: Arc::new(Mutex::new(Vec::new())),
            max_batch_size: 1000,
            metadata,
        }
    }
    /// Returns a copy of everything written so far.
    ///
    /// Fails only if the internal mutex has been poisoned.
    pub fn get_data(&self) -> Result<Vec<T>> {
        self.data
            .lock()
            .map(|data| data.clone())
            .map_err(|_| Error::InvalidOperation("Failed to acquire data lock".to_string()))
    }
}
// `new()` takes no arguments, so provide the idiomatic `Default` as well
// (clippy: new_without_default).
impl<T> Default for MemoryStreamSink<T>
where
    T: Clone + Send + Sync,
{
    fn default() -> Self {
        Self::new()
    }
}
impl<T> StreamingDataSink for MemoryStreamSink<T>
where
T: Clone + Send + Sync + 'static,
{
type Item = T;
type Error = Error;
fn write_batch(
&mut self,
batch: Vec<Self::Item>,
) -> Pin<Box<dyn Future<Output = std::result::Result<(), Self::Error>> + Send + '_>> {
Box::pin(async move {
if let Ok(mut data) = self.data.lock() {
data.extend(batch);
Ok(())
} else {
Err(Error::InvalidOperation(
"Failed to acquire data lock".to_string(),
))
}
})
}
fn flush(
&mut self,
) -> Pin<Box<dyn Future<Output = std::result::Result<(), Self::Error>> + Send + '_>> {
Box::pin(async move {
Ok(())
})
}
fn close(
&mut self,
) -> Pin<Box<dyn Future<Output = std::result::Result<(), Self::Error>> + Send + '_>> {
Box::pin(async move {
Ok(())
})
}
fn metadata(&self) -> SinkMetadata {
self.metadata.clone()
}
fn set_max_batch_size(&mut self, size: usize) {
self.max_batch_size = size;
}
}
/// Bridges [`DataFrame`] values to and from the streaming abstractions.
pub trait DataFrameStreaming {
/// Wraps this value as a source yielding `DataFrame` batches of `batch_size`.
fn to_streaming_source(
&self,
batch_size: usize,
) -> Result<Box<dyn StreamingDataSource<Item = DataFrame, Error = Error>>>;
/// Builds a new value by consuming an entire `DataFrame` source.
fn from_streaming_source<S>(source: S) -> Pin<Box<dyn Future<Output = Result<Self>> + Send>>
where
S: StreamingDataSource<Item = DataFrame, Error = Error> + 'static,
Self: Sized;
/// Streams this value through `processor` in batches of `batch_size`.
fn process_streaming<F>(
&self,
batch_size: usize,
processor: F,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>>
where
F: Fn(DataFrame) -> Pin<Box<dyn Future<Output = Result<DataFrame>> + Send + 'static>>
+ Send
+ Sync
+ 'static;
}
/// A time-based window over a stream of timestamped items.
///
/// Supports tumbling, sliding, and session semantics (see [`WindowType`]).
pub struct StreamWindow<T> {
    /// Windowing semantics applied on `is_ready`/`emit`.
    window_type: WindowType,
    /// Maximum age of items retained in the window.
    window_size: Duration,
    /// Emit cadence for sliding windows; inactivity gap for session windows.
    slide_interval: Duration,
    /// Items currently in the window, paired with their timestamps.
    current_window: Vec<(Instant, T)>,
    /// Counters updated by `add` and `emit`.
    stats: WindowStats,
}
impl<T> StreamWindow<T>
where
    T: Clone + Send + Sync,
{
    /// Creates an empty window with the given semantics and timing parameters.
    pub fn new(window_type: WindowType, window_size: Duration, slide_interval: Duration) -> Self {
        Self {
            window_type,
            window_size,
            slide_interval,
            current_window: Vec::new(),
            stats: WindowStats::new(),
        }
    }
    /// Inserts `data` at `timestamp` and evicts items older than the window.
    pub fn add(&mut self, timestamp: Instant, data: T) {
        self.current_window.push((timestamp, data));
        self.evict_expired(timestamp);
        self.stats.items_added += 1;
    }
    /// Borrows the items currently held by the window, in insertion order.
    pub fn current_data(&self) -> Vec<&T> {
        self.current_window.iter().map(|(_, data)| data).collect()
    }
    /// Whether the window should be emitted at `current_time`:
    /// - Tumbling: the oldest item is at least `window_size` old.
    /// - Sliding: `slide_interval` has elapsed since the last emit (or the
    ///   window is non-empty and nothing has been emitted yet).
    /// - Session: the newest item is at least `slide_interval` old.
    pub fn is_ready(&self, current_time: Instant) -> bool {
        match self.window_type {
            WindowType::Tumbling => {
                if let Some((first_time, _)) = self.current_window.first() {
                    current_time.duration_since(*first_time) >= self.window_size
                } else {
                    false
                }
            }
            WindowType::Sliding => {
                if let Some(last_emit) = self.stats.last_emit_time {
                    current_time.duration_since(last_emit) >= self.slide_interval
                } else {
                    !self.current_window.is_empty()
                }
            }
            WindowType::Session => {
                if let Some((last_time, _)) = self.current_window.last() {
                    current_time.duration_since(*last_time) >= self.slide_interval
                } else {
                    false
                }
            }
        }
    }
    /// Returns a clone of the window contents and advances the window state:
    /// tumbling/session windows are cleared, sliding windows only evict
    /// items older than `window_size`.
    pub fn emit(&mut self, current_time: Instant) -> Vec<T> {
        let data = self
            .current_window
            .iter()
            .map(|(_, data)| data.clone())
            .collect();
        match self.window_type {
            WindowType::Tumbling => {
                self.current_window.clear();
            }
            WindowType::Sliding => {
                self.evict_expired(current_time);
            }
            WindowType::Session => {
                self.current_window.clear();
            }
        }
        self.stats.windows_emitted += 1;
        self.stats.last_emit_time = Some(current_time);
        data
    }
    /// Drops items whose timestamp is older than `current_time - window_size`.
    fn evict_expired(&mut self, current_time: Instant) {
        // Fix: `Instant - Duration` panics on underflow (possible when the
        // window size exceeds the process uptime on some platforms).
        // `checked_sub` keeps everything when no valid cutoff exists yet.
        if let Some(cutoff_time) = current_time.checked_sub(self.window_size) {
            self.current_window
                .retain(|(timestamp, _)| *timestamp >= cutoff_time);
        }
    }
    /// Borrows the window's counters.
    pub fn stats(&self) -> &WindowStats {
        &self.stats
    }
}
/// Windowing semantics for [`StreamWindow`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WindowType {
    /// Fixed, non-overlapping windows cleared on emit.
    Tumbling,
    /// Overlapping windows emitted every `slide_interval`.
    Sliding,
    /// Windows closed after an inactivity gap of `slide_interval`.
    Session,
}
/// Counters maintained by a [`StreamWindow`].
#[derive(Debug, Clone)]
pub struct WindowStats {
    /// Total items added to the window.
    pub items_added: u64,
    /// Total windows emitted so far.
    pub windows_emitted: u64,
    /// Time of the most recent emit, if any.
    pub last_emit_time: Option<Instant>,
}
impl WindowStats {
    /// Creates a zeroed counter record.
    pub fn new() -> Self {
        Self {
            items_added: 0,
            windows_emitted: 0,
            last_emit_time: None,
        }
    }
}
// `new()` takes no arguments, so provide the idiomatic `Default` as well.
impl Default for WindowStats {
    fn default() -> Self {
        Self::new()
    }
}
#[cfg(test)]
mod tests {
use super::*;
use tokio;
// Drains a MemoryStreamSource of 10 items in batches of 3 and checks
// batch contents, exhaustion, and has_more transitions.
#[tokio::test]
async fn test_memory_stream_source() {
let data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let mut source = MemoryStreamSource::new(data, 3);
assert!(source.has_more());
assert_eq!(source.estimated_size(), Some(10));
let batch1 = source
.next_batch()
.await
.expect("operation should succeed")
.expect("operation should succeed");
assert_eq!(batch1, vec![1, 2, 3]);
let batch2 = source
.next_batch()
.await
.expect("operation should succeed")
.expect("operation should succeed");
assert_eq!(batch2, vec![4, 5, 6]);
let batch3 = source
.next_batch()
.await
.expect("operation should succeed")
.expect("operation should succeed");
assert_eq!(batch3, vec![7, 8, 9]);
// Final partial batch.
let batch4 = source
.next_batch()
.await
.expect("operation should succeed")
.expect("operation should succeed");
assert_eq!(batch4, vec![10]);
// Exhausted source yields Ok(None).
let batch5 = source.next_batch().await.expect("operation should succeed");
assert!(batch5.is_none());
assert!(!source.has_more());
}
// Writes two batches into a MemoryStreamSink and verifies the
// accumulated contents after flush.
#[tokio::test]
async fn test_memory_stream_sink() {
let mut sink = MemoryStreamSink::new();
sink.write_batch(vec![1, 2, 3])
.await
.expect("operation should succeed");
sink.write_batch(vec![4, 5, 6])
.await
.expect("operation should succeed");
sink.flush().await.expect("operation should succeed");
let data = sink.get_data().expect("operation should succeed");
assert_eq!(data, vec![1, 2, 3, 4, 5, 6]);
sink.close().await.expect("operation should succeed");
}
// End-to-end: runs a pipeline from a memory source to a memory sink and
// checks that read/processed counters were recorded.
#[tokio::test]
async fn test_streaming_pipeline() {
let config = PipelineConfig::default();
let mut pipeline = StreamingPipeline::new(config);
let source_data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let source = MemoryStreamSource::new(source_data, 3);
let sink = MemoryStreamSink::new();
pipeline
.execute(source, sink)
.await
.expect("operation should succeed");
let stats = pipeline.stats().expect("operation should succeed");
assert!(stats.items_read > 0);
assert!(stats.items_processed > 0);
}
// Tumbling window: readiness flips once the oldest item exceeds the
// window size, and emit drains the window.
#[test]
fn test_stream_window() {
let mut window = StreamWindow::new(
WindowType::Tumbling,
Duration::from_secs(5),
Duration::from_secs(1),
);
let base_time = Instant::now();
window.add(base_time, 1);
window.add(base_time + Duration::from_secs(1), 2);
window.add(base_time + Duration::from_secs(2), 3);
assert_eq!(window.current_data(), vec![&1, &2, &3]);
assert!(!window.is_ready(base_time + Duration::from_secs(3)));
assert!(window.is_ready(base_time + Duration::from_secs(6)));
let emitted = window.emit(base_time + Duration::from_secs(6));
assert_eq!(emitted, vec![1, 2, 3]);
assert!(window.current_data().is_empty());
}
// Schema construction: field names, types, nullability, and nested
// List element type.
#[test]
fn test_stream_schema() {
let schema = StreamSchema {
fields: vec![
StreamField {
name: "id".to_string(),
data_type: StreamDataType::Int64,
nullable: false,
metadata: HashMap::new(),
},
StreamField {
name: "name".to_string(),
data_type: StreamDataType::String,
nullable: true,
metadata: HashMap::new(),
},
StreamField {
name: "scores".to_string(),
data_type: StreamDataType::List(Box::new(StreamDataType::Float64)),
nullable: false,
metadata: HashMap::new(),
},
],
metadata: HashMap::new(),
};
assert_eq!(schema.fields.len(), 3);
assert_eq!(schema.fields[0].name, "id");
assert_eq!(schema.fields[0].data_type, StreamDataType::Int64);
assert!(!schema.fields[0].nullable);
assert_eq!(schema.fields[1].name, "name");
assert_eq!(schema.fields[1].data_type, StreamDataType::String);
assert!(schema.fields[1].nullable);
if let StreamDataType::List(inner_type) = &schema.fields[2].data_type {
assert_eq!(**inner_type, StreamDataType::Float64);
} else {
panic!("Expected List type");
}
}
// Counter arithmetic and timestamp stamping in PipelineStats.
#[test]
fn test_pipeline_stats() {
let mut stats = PipelineStats::new();
stats.record_batch_read(100);
stats.record_batch_read(150);
stats.record_batch_processed(90);
stats.record_batch_processed(140);
assert_eq!(stats.batches_read, 2);
assert_eq!(stats.batches_processed, 2);
assert_eq!(stats.items_read, 250);
assert_eq!(stats.items_processed, 230);
assert!(stats.start_time.is_some());
assert!(stats.end_time.is_some());
assert!(stats.duration().is_some());
}
}