//! Streaming data processing examples for pandrs: consuming a stream with
//! windowed operations, stream aggregation, real-time analytics, and a
//! custom stream connector. Requires the `streaming` feature flag.

#[cfg(feature = "streaming")]
use pandrs::error::Result;
#[cfg(feature = "streaming")]
use pandrs::streaming::{
    AggregationType, DataStream, MetricType, RealTimeAnalytics, StreamAggregator, StreamConfig,
    StreamConnector, StreamProcessor, StreamRecord,
};
#[cfg(feature = "streaming")]
#[allow(clippy::single_component_path_imports)]
use rand;
#[cfg(feature = "streaming")]
use std::collections::HashMap;
#[cfg(feature = "streaming")]
use std::thread;
#[cfg(feature = "streaming")]
use std::time::{Duration, Instant};
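
/// Fallback entry point when the example is built without the `streaming` feature.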
#[cfg(not(feature = "streaming"))]
fn main() {
println!("This example requires the 'streaming' feature flag to be enabled.");
println!("Please recompile with:");
println!(" cargo run --example streaming_example --features streaming");
}
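
/// Runs the four streaming examples in order: CSV-style streaming, stream
/// aggregation, real-time analytics, and a custom connector.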
#[cfg(feature = "streaming")]
#[allow(clippy::result_large_err)]
fn main() -> Result<()> {
println!("Streaming Data Processing Example");
println!("--------------------------------");
println!("\nExample 1: Reading CSV as a stream");
csv_stream_example()?;
println!("\nExample 2: Stream aggregation");
stream_aggregation_example()?;
println!("\nExample 3: Real-time analytics");
realtime_analytics_example()?;
println!("\nExample 4: Custom stream connector");
custom_connector_example()?;
Ok(())
}
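
/// Example 1: simulate reading CSV rows as a stream by feeding synthetic
/// records through a `StreamConnector`, then compute a per-window average
/// with `window_operation`.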
#[cfg(feature = "streaming")]
#[allow(clippy::result_large_err)]
fn csv_stream_example() -> Result<()> {
    // Fixed-size windows of 100 records, polled every 10ms in batches of 100.
    let config = StreamConfig {
        buffer_size: 1000,
        window_size: Some(100),
        window_duration: None,
        processing_interval: Duration::from_millis(10),
        batch_size: 100,
    };
    let headers = vec![
        "timestamp".to_string(),
        "value".to_string(),
        "category".to_string(),
    ];
    let (connector, mut stream) = StreamConnector::new(headers, Some(config));

    // Producer thread: emit 500 synthetic records, then drop the connector
    // to signal the end of the stream.
    thread::spawn(move || {
        let categories = ["A", "B", "C"];
        for i in 0..500 {
            let fields = HashMap::from([
                ("timestamp".to_string(), format!("{}", i)),
                ("value".to_string(), format!("{}", i as f64 / 10.0)),
                ("category".to_string(), categories[i % 3].to_string()),
            ]);
            let _ = connector.send_fields(fields);
            thread::sleep(Duration::from_millis(1));
        }
        drop(connector);
    });

    // Consumer: average the "value" column for each full window.
    let results = stream.window_operation(|window| {
        let sum: f64 = window
            .iter()
            .filter_map(|record| {
                record
                    .fields
                    .get("value")
                    .and_then(|s| s.parse::<f64>().ok())
            })
            .sum();
        let count = window.len();
        let avg = if count > 0 { sum / count as f64 } else { 0.0 };
        println!("Window size: {}, Average value: {:.2}", count, avg);
        Ok(avg)
    })?;

    println!("Processed {} windows", results.len());
    println!(
        "Final window average: {:.2}",
        results.last().unwrap_or(&0.0)
    );
    Ok(())
}
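
/// Example 2: attach `Average`, `Max`, and `Sum` aggregators to a stream and
/// collect the final aggregate values once the producer finishes.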
#[cfg(feature = "streaming")]
#[allow(clippy::result_large_err)]
fn stream_aggregation_example() -> Result<()> {
    let headers = vec![
        "timestamp".to_string(),
        "value".to_string(),
        "quantity".to_string(),
    ];
    // No windowing here: aggregates run over the whole stream.
    let config = StreamConfig {
        buffer_size: 1000,
        window_size: None,
        window_duration: None,
        processing_interval: Duration::from_millis(10),
        batch_size: 50,
    };
    let stream = DataStream::new(headers, Some(config));
    let mut aggregator = StreamAggregator::new(stream);

    // Register per-column aggregations.
    aggregator.add_aggregator("value", AggregationType::Average)?;
    aggregator.add_aggregator("value", AggregationType::Max)?;
    aggregator.add_aggregator("quantity", AggregationType::Sum)?;

    // Producer thread: feed 300 records through the stream's sender handle,
    // then drop the sender to close the stream.
    let sender = aggregator.stream.get_sender().unwrap();
    thread::spawn(move || {
        for i in 0..300 {
            let fields = HashMap::from([
                ("timestamp".to_string(), format!("{}", i)),
                ("value".to_string(), format!("{}", (i % 100) as f64 / 10.0)),
                ("quantity".to_string(), format!("{}", i % 10 + 1)),
            ]);
            let record = StreamRecord::new(fields);
            let _ = sender.send(record);
            thread::sleep(Duration::from_millis(1));
        }
        drop(sender);
    });

    // Blocks until the stream is closed, then yields the final aggregates.
    let aggregates = aggregator.process()?;
    println!("Aggregation results:");
    for (column, value) in aggregates {
        println!("  {}: {:.2}", column, value);
    }
    Ok(())
}
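
/// Example 3: maintain rolling metrics (window average, rate of change, EMA,
/// standard deviation, 90th percentile) on a background thread while a
/// producer emits noisy synthetic sensor readings.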
#[cfg(feature = "streaming")]
#[allow(clippy::result_large_err)]
fn realtime_analytics_example() -> Result<()> {
    let headers = vec![
        "timestamp".to_string(),
        "temperature".to_string(),
        "humidity".to_string(),
    ];
    let stream = DataStream::new(headers, None);
    // Arguments: window size (20 records) and processing interval (50ms).
    let mut analytics = RealTimeAnalytics::new(stream, 20, Duration::from_millis(50));

    // Register the metrics to maintain over the temperature column.
    analytics.add_metric("avg", "temperature", MetricType::WindowAverage)?;
    analytics.add_metric("change", "temperature", MetricType::RateOfChange)?;
    analytics.add_metric(
        "ema",
        "temperature",
        MetricType::ExponentialMovingAverage(0.3),
    )?;
    analytics.add_metric("std", "temperature", MetricType::StandardDeviation)?;
    analytics.add_metric("p90", "temperature", MetricType::Percentile(0.9))?;

    // Metrics are updated on a background thread; keep the shared handle.
    let metrics = analytics.start_background_processing()?;
    let sender = analytics.stream.get_sender().unwrap();

    // Producer thread: a noisy sine wave standing in for sensor readings.
    thread::spawn(move || {
        for i in 0..100 {
            let base_temp = 20.0 + (i as f64 / 10.0).sin() * 5.0;
            let noise = rand::random::<f64>() * 2.0 - 1.0;
            let temp = base_temp + noise;
            let fields = HashMap::from([
                ("timestamp".to_string(), format!("{}", i)),
                ("temperature".to_string(), format!("{:.1}", temp)),
                ("humidity".to_string(), format!("{:.1}", 60.0 + noise * 5.0)),
            ]);
            let record = StreamRecord::new(fields);
            let _ = sender.send(record);
            thread::sleep(Duration::from_millis(20));
        }
        drop(sender);
    });

    // Poll and print the shared metrics every 500ms for about 2.5 seconds.
    let start = Instant::now();
    while start.elapsed() < Duration::from_millis(2500) {
        let current = metrics.lock().unwrap().clone();
        if !current.is_empty() {
            println!("Real-time metrics:");
            for (name, value) in current {
                println!("  {}: {:.2}", name, value);
            }
            println!();
        }
        thread::sleep(Duration::from_millis(500));
    }
    analytics.stop();
    Ok(())
}
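
/// Example 4: wire a `StreamConnector` into a `StreamProcessor` with a
/// transformer (upper-case the event name) and a filter (keep only
/// "system"-sourced records), then count events in the processed batches.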
#[cfg(feature = "streaming")]
#[allow(clippy::result_large_err)]
fn custom_connector_example() -> Result<()> {
    let headers = vec![
        "timestamp".to_string(),
        "event".to_string(),
        "source".to_string(),
    ];
    let (connector, stream) = StreamConnector::new(headers, None);
    let mut processor = StreamProcessor::new(stream);

    // Transform: upper-case the "event" column.
    processor.add_transformer("event", |value| Ok(value.to_uppercase()))?;

    // Filter: keep only records whose "source" is "system".
    processor.set_filter(|record| {
        if let Some(source) = record.fields.get("source") {
            source == "system"
        } else {
            false
        }
    });

    // Producer thread: cycle through sources and event types.
    thread::spawn(move || {
        let sources = ["user", "system", "application"];
        let events = ["login", "logout", "error", "warning", "info"];
        for i in 0..100 {
            let source = sources[i % sources.len()];
            let event = events[i % events.len()];
            let fields = HashMap::from([
                ("timestamp".to_string(), format!("{}", i)),
                ("event".to_string(), event.to_string()),
                ("source".to_string(), source.to_string()),
            ]);
            let _ = connector.send_fields(fields);
            thread::sleep(Duration::from_millis(10));
        }
        drop(connector);
    });

    // Drain the stream into batches of DataFrames.
    let results = processor.process()?;
    println!("Processed {} batches", results.len());

    // Tally events across all batches; only "system" records survive the filter.
    let mut event_counts = HashMap::new();
    for df in &results {
        for row_idx in 0..df.row_count() {
            if let Ok(event) = df.get_string_value("event", row_idx) {
                *event_counts.entry(event.to_string()).or_insert(0) += 1;
            }
        }
    }
    println!("Event counts (from 'system' source only):");
    for (event, count) in event_counts {
        println!("  {}: {}", event, count);
    }
    Ok(())
}