use pandrs::error::Result;
use pandrs::{
    AggregationType, DataStream, MetricType, RealTimeAnalytics, StreamAggregator, StreamConfig,
    StreamConnector, StreamProcessor, StreamRecord,
};
use std::collections::HashMap;
use std::thread;
use std::time::Duration;
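
// Exercises the basic DataStream flow: records pushed through a StreamConnector
// are consumed in fixed-size batches via `process`.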
#[test]
#[allow(clippy::result_large_err)]
fn test_data_stream_basics() -> Result<()> {
    let headers = [
        "id".to_string(),
        "value".to_string(),
        "category".to_string(),
    ];
    let config = StreamConfig {
        buffer_size: 100,
        window_size: Some(10),
        window_duration: None,
        processing_interval: Duration::from_millis(10),
        batch_size: 10,
    };
    let (connector, mut stream) = StreamConnector::new(headers.to_vec(), Some(config));

    // Send 50 records, then drop the connector so the stream sees end-of-input.
    for i in 0..50 {
        let fields = HashMap::from([
            ("id".to_string(), format!("{i}")),
            ("value".to_string(), format!("{}", i as f64 / 10.0)),
            ("category".to_string(), format!("cat{}", i % 5)),
        ]);
        connector.send_fields(fields)?;
    }
    drop(connector);

    // Each batch maps to its length, so the lengths must sum to 50.
    let results = stream.process(|batch| Ok(batch.len()), None)?;

    assert!(!results.is_empty());
    assert_eq!(results.iter().sum::<usize>(), 50);
    Ok(())
}
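
// Verifies that a StreamAggregator accepts multiple aggregations, including
// two different aggregations on the same "value" column.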
#[test]
#[allow(clippy::result_large_err)]
fn test_stream_aggregator() -> Result<()> {
    let headers = vec![
        "id".to_string(),
        "value".to_string(),
        "quantity".to_string(),
    ];
    let stream = DataStream::new(headers.clone(), None);
    let mut aggregator = StreamAggregator::new(stream);

    aggregator.add_aggregator("value", AggregationType::Average)?;
    aggregator.add_aggregator("value", AggregationType::Max)?;
    aggregator.add_aggregator("quantity", AggregationType::Sum)?;

    println!("StreamAggregator API successfully created");
    Ok(())
}
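
// Verifies that a StreamProcessor accepts a per-column transformer and a
// record-level filter without error.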
#[test]
#[allow(clippy::result_large_err)]
fn test_stream_processor() -> Result<()> {
let headers = ["id".to_string(), "text".to_string(), "category".to_string()];
let (_connector, stream) = StreamConnector::new(headers.to_vec(), None);
let mut processor = StreamProcessor::new(stream);
processor.add_transformer("text", |value| {
Ok(value.to_uppercase())
})?;
processor.set_filter(|record| {
if let Some(category) = record.fields.get("category") {
category == "A" || category == "B"
} else {
false
}
});
println!("StreamProcessor API successfully created");
Ok(())
}
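
// Computes a moving average over fixed-size windows and checks that the
// averages grow as the input values increase.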
#[test]
#[allow(clippy::result_large_err)]
fn test_window_operation() -> Result<()> {
let headers = ["timestamp".to_string(), "value".to_string()];
let config = StreamConfig {
buffer_size: 100,
window_size: Some(10),
window_duration: None,
processing_interval: Duration::from_millis(10),
batch_size: 10,
};
let (connector, mut stream) = StreamConnector::new(headers.to_vec(), Some(config));
for i in 0..50 {
let fields = HashMap::from([
("timestamp".to_string(), format!("{i}")),
("value".to_string(), format!("{}", i as f64)),
]);
connector.send_fields(fields)?;
}
drop(connector);
let results = stream.window_operation(|window| {
let sum: f64 = window
.iter()
.filter_map(|record| {
record
.fields
.get("value")
.and_then(|s| s.parse::<f64>().ok())
})
.sum();
let count = window.len();
let avg = if count > 0 { sum / count as f64 } else { 0.0 };
Ok(avg)
})?;
assert!(!results.is_empty());
assert!(results.last().unwrap() > &results[0]);
Ok(())
}
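
// Runs RealTimeAnalytics on a background thread, feeds it records, and checks
// that the windowed metrics have been updated.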
#[test]
#[allow(clippy::result_large_err)]
fn test_real_time_analytics() -> Result<()> {
let headers = vec!["timestamp".to_string(), "value".to_string()];
let stream = DataStream::new(headers.clone(), None);
let mut analytics = RealTimeAnalytics::new(
stream,
10, Duration::from_millis(10), );
analytics.add_metric("avg", "value", MetricType::WindowAverage)?;
analytics.add_metric("change", "value", MetricType::RateOfChange)?;
let metrics = analytics.start_background_processing()?;
let sender = analytics.stream.get_sender().unwrap();
for i in 0..20 {
let fields = HashMap::from([
("timestamp".to_string(), format!("{i}")),
("value".to_string(), format!("{}", i as f64)),
]);
let record = StreamRecord::new(fields);
sender.send(record).unwrap();
thread::sleep(Duration::from_millis(5));
}
thread::sleep(Duration::from_millis(100));
let current_metrics = metrics.lock().unwrap().clone();
assert!(!current_metrics.is_empty());
assert!(current_metrics.get("avg_value").unwrap_or(&0.0) > &0.0);
assert!(current_metrics.get("change_value").unwrap_or(&0.0) > &0.0);
drop(sender);
analytics.stop().expect("Failed to stop analytics");
Ok(())
}
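
// Builds a batch of StreamRecords; the conversion to a DataFrame itself is only
// smoke-tested here.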
#[test]
#[allow(clippy::result_large_err)]
fn test_batch_to_dataframe() -> Result<()> {
let headers = vec!["id".to_string(), "value".to_string()];
let _stream = DataStream::new(headers.clone(), None);
let mut records = Vec::new();
for i in 0..10 {
let fields = HashMap::from([
("id".to_string(), format!("{i}")),
("value".to_string(), format!("{}", i as f64)),
]);
records.push(StreamRecord::new(fields));
}
println!("Batch to DataFrame API successfully created");
Ok(())
}