use crate::error::{Result, TrustformersError};
use futures::stream::{Stream, StreamExt};
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use tokio::sync::mpsc;
use tokio::time::sleep;
pub trait StreamingPipeline: Send + Sync {
type Input: Send + 'static;
type Output: Send + 'static;
type Intermediate: Send + 'static;
fn process_item(
&self,
input: Self::Input,
) -> Pin<Box<dyn std::future::Future<Output = Result<Self::Output>> + Send + '_>>;
fn process_with_intermediate(
&self,
input: Self::Input,
) -> Pin<
Box<
dyn std::future::Future<Output = Result<(Self::Output, Vec<Self::Intermediate>)>>
+ Send
+ '_,
>,
>;
fn create_stream_processor(
&self,
config: StreamConfig,
) -> StreamProcessor<Self::Input, Self::Output, Self::Intermediate>
where
Self: Sized + Clone + 'static,
{
StreamProcessor::new(self.clone(), config)
}
fn create_realtime_processor(
&self,
config: RealTimeConfig,
) -> RealTimeProcessor<Self::Input, Self::Output, Self::Intermediate>
where
Self: Sized + Clone + 'static,
{
RealTimeProcessor::new(self.clone(), config)
}
}
#[derive(Debug, Clone)]
pub struct StreamConfig {
pub buffer_size: usize,
pub max_concurrent: usize,
pub backpressure_threshold: f64,
pub timeout_ms: u64,
pub enable_partial_results: bool,
pub enable_transformations: bool,
pub batch_size: Option<usize>,
pub flush_interval_ms: u64,
}
impl Default for StreamConfig {
fn default() -> Self {
Self {
buffer_size: 1000,
max_concurrent: 10,
backpressure_threshold: 0.8,
timeout_ms: 5000,
enable_partial_results: true,
enable_transformations: true,
batch_size: Some(4),
flush_interval_ms: 100,
}
}
}
pub type StreamingConfig = StreamConfig;
#[derive(Debug, Clone)]
pub struct RealTimeConfig {
pub max_latency_ms: u64,
pub priority_levels: usize,
pub enable_preemption: bool,
pub adaptive_batching: bool,
pub quality_threshold: f64,
pub fallback_timeout_ms: u64,
}
impl Default for RealTimeConfig {
fn default() -> Self {
Self {
max_latency_ms: 100,
priority_levels: 3,
enable_preemption: true,
adaptive_batching: true,
quality_threshold: 0.9,
fallback_timeout_ms: 50,
}
}
}
pub struct StreamProcessor<I, O, Int = String>
where
I: Send + 'static,
O: Send + 'static,
Int: Send + 'static,
{
pipeline: Arc<dyn StreamingPipeline<Input = I, Output = O, Intermediate = Int>>,
config: StreamConfig,
buffer: Arc<Mutex<VecDeque<I>>>,
stats: Arc<Mutex<StreamStats>>,
backpressure_controller: BackpressureController,
}
impl<I, O, Int> StreamProcessor<I, O, Int>
where
I: Send + 'static,
O: Send + 'static,
Int: Send + 'static,
{
pub fn new<P>(pipeline: P, config: StreamConfig) -> Self
where
P: StreamingPipeline<Input = I, Output = O, Intermediate = Int> + 'static,
{
Self {
pipeline: Arc::new(pipeline),
config: config.clone(),
buffer: Arc::new(Mutex::new(VecDeque::new())),
stats: Arc::new(Mutex::new(StreamStats::default())),
backpressure_controller: BackpressureController::new(config.backpressure_threshold),
}
}
pub fn new_from_pipeline<P>(pipeline: P, config: StreamConfig) -> StreamProcessor<I, O, String>
where
P: Clone + Send + Sync + 'static,
I: Send + Sync + 'static,
O: Send + Sync + 'static,
{
struct MockStreamingPipeline<P, I, O> {
inner: P,
_phantom: std::marker::PhantomData<(I, O)>,
}
impl<P, I, O> StreamingPipeline for MockStreamingPipeline<P, I, O>
where
P: Clone + Send + Sync + 'static,
I: Send + Sync + 'static,
O: Send + Sync + 'static,
{
type Input = I;
type Output = O;
type Intermediate = String;
fn process_item(
&self,
_input: Self::Input,
) -> Pin<Box<dyn std::future::Future<Output = Result<Self::Output>> + Send + '_>>
{
Box::pin(async move {
Err(crate::error::TrustformersError::InvalidInput {
message: "Mock streaming pipeline process_item not implemented".to_string(),
parameter: None,
expected: None,
received: None,
suggestion: None,
})
})
}
fn process_with_intermediate(
&self,
_input: Self::Input,
) -> Pin<
Box<
dyn std::future::Future<
Output = Result<(Self::Output, Vec<Self::Intermediate>)>,
> + Send
+ '_,
>,
> {
Box::pin(async move {
Err(crate::error::TrustformersError::InvalidInput {
message:
"Mock streaming pipeline process_with_intermediate not implemented"
.to_string(),
parameter: None,
expected: None,
received: None,
suggestion: None,
})
})
}
}
let mock_pipeline = MockStreamingPipeline {
inner: pipeline,
_phantom: std::marker::PhantomData,
};
StreamProcessor::new(mock_pipeline, config)
}
pub async fn process_stream<S>(
&self,
input_stream: S,
) -> impl Stream<Item = Result<StreamResult<O>>>
where
S: Stream<Item = I> + Send + Unpin + 'static,
{
let (tx, rx) = mpsc::channel(self.config.buffer_size);
let processor = self.clone();
tokio::spawn(async move {
processor.process_stream_internal(input_stream, tx).await;
});
StreamResultStream::new(rx)
}
async fn process_stream_internal<S>(
&self,
mut input_stream: S,
mut output_tx: mpsc::Sender<Result<StreamResult<O>>>,
) where
S: Stream<Item = I> + Send + Unpin,
{
let mut batch_buffer = Vec::new();
let mut last_flush = Instant::now();
let flush_interval = Duration::from_millis(self.config.flush_interval_ms);
while let Some(input) = input_stream.next().await {
if self.backpressure_controller.should_throttle() {
self.update_stats(|stats| stats.backpressure_events += 1);
sleep(Duration::from_millis(10)).await;
continue;
}
if let Some(batch_size) = self.config.batch_size {
batch_buffer.push(input);
if batch_buffer.len() >= batch_size || last_flush.elapsed() >= flush_interval {
self.process_batch(&mut batch_buffer, &mut output_tx).await;
last_flush = Instant::now();
}
} else {
self.process_single_item(input, &mut output_tx).await;
}
}
if !batch_buffer.is_empty() {
self.process_batch(&mut batch_buffer, &mut output_tx).await;
}
}
async fn process_batch(
&self,
batch: &mut Vec<I>,
output_tx: &mut mpsc::Sender<Result<StreamResult<O>>>,
) {
let batch_items = std::mem::take(batch);
let batch_size = batch_items.len();
let start_time = Instant::now();
let semaphore = Arc::new(tokio::sync::Semaphore::new(self.config.max_concurrent));
let mut handles = Vec::new();
for (index, item) in batch_items.into_iter().enumerate() {
let pipeline = self.pipeline.clone();
let permit =
semaphore.clone().acquire_owned().await.expect("semaphore should not be closed");
let tx = output_tx.clone();
let handle = tokio::spawn(async move {
let _permit = permit;
let result = pipeline.process_item(item).await;
let stream_result = match result {
Ok(output) => StreamResult::Complete {
output,
index,
processing_time: start_time.elapsed(),
},
Err(e) => StreamResult::Error {
error: e,
index,
processing_time: start_time.elapsed(),
},
};
let _ = tx.send(Ok(stream_result)).await;
});
handles.push(handle);
}
for handle in handles {
let _ = handle.await;
}
self.update_stats(|stats| {
stats.items_processed += batch_size;
stats.avg_batch_size = (stats.avg_batch_size + batch_size as f64) / 2.0;
stats.total_processing_time += start_time.elapsed();
});
}
async fn process_single_item(
&self,
item: I,
output_tx: &mut mpsc::Sender<Result<StreamResult<O>>>,
) {
let start_time = Instant::now();
let result = self.pipeline.process_item(item).await;
let stream_result = match result {
Ok(output) => StreamResult::Complete {
output,
index: 0,
processing_time: start_time.elapsed(),
},
Err(e) => StreamResult::Error {
error: e,
index: 0,
processing_time: start_time.elapsed(),
},
};
let _ = output_tx.send(Ok(stream_result)).await;
self.update_stats(|stats| {
stats.items_processed += 1;
stats.total_processing_time += start_time.elapsed();
});
}
fn update_stats<F>(&self, updater: F)
where
F: FnOnce(&mut StreamStats),
{
if let Ok(mut stats) = self.stats.lock() {
updater(&mut stats);
}
}
pub fn get_stats(&self) -> StreamStats {
self.stats.lock().expect("lock should not be poisoned").clone()
}
}
impl<I, O, Int> Clone for StreamProcessor<I, O, Int>
where
I: Send + 'static,
O: Send + 'static,
Int: Send + 'static,
{
fn clone(&self) -> Self {
Self {
pipeline: self.pipeline.clone(),
config: self.config.clone(),
buffer: self.buffer.clone(),
stats: self.stats.clone(),
backpressure_controller: self.backpressure_controller.clone(),
}
}
}
pub struct RealTimeProcessor<I, O, Int = String>
where
I: Send + 'static,
O: Send + 'static,
Int: Send + 'static,
{
pipeline: Arc<dyn StreamingPipeline<Input = I, Output = O, Intermediate = Int>>,
config: RealTimeConfig,
priority_queues: Arc<Mutex<Vec<VecDeque<PriorityItem<I>>>>>,
stats: Arc<Mutex<RealTimeStats>>,
adaptive_batcher: Arc<Mutex<AdaptiveBatcher>>,
}
impl<I, O, Int> RealTimeProcessor<I, O, Int>
where
I: Send + 'static,
O: Send + 'static,
Int: Send + 'static,
{
pub fn new<P>(pipeline: P, config: RealTimeConfig) -> Self
where
P: StreamingPipeline<Input = I, Output = O, Intermediate = Int> + 'static,
{
let priority_queues = (0..config.priority_levels).map(|_| VecDeque::new()).collect();
Self {
pipeline: Arc::new(pipeline),
config: config.clone(),
priority_queues: Arc::new(Mutex::new(priority_queues)),
stats: Arc::new(Mutex::new(RealTimeStats::default())),
adaptive_batcher: Arc::new(Mutex::new(AdaptiveBatcher::new(config.max_latency_ms))),
}
}
pub async fn process_with_priority(&self, item: I, priority: usize) -> Result<O> {
let start_time = Instant::now();
if priority >= self.config.priority_levels {
return Err(TrustformersError::invalid_input_simple(format!(
"Priority {} exceeds maximum level {}",
priority,
self.config.priority_levels - 1
)));
}
{
let mut queues = self.priority_queues.lock().expect("lock should not be poisoned");
queues[priority].push_back(PriorityItem {
item,
timestamp: start_time,
priority,
});
}
self.process_next_priority_item().await
}
async fn process_next_priority_item(&self) -> Result<O> {
let priority_item = {
let mut queues = self.priority_queues.lock().expect("lock should not be poisoned");
let mut found_item = None;
for priority_level in 0..self.config.priority_levels {
if let Some(item) = queues[priority_level].pop_front() {
found_item = Some(item);
break;
}
}
found_item.ok_or_else(|| {
TrustformersError::invalid_input_simple("No items in priority queues".to_string())
})?
};
if self.config.enable_preemption {
let elapsed = priority_item.timestamp.elapsed();
if elapsed.as_millis() > self.config.max_latency_ms as u128 {
self.update_realtime_stats(|stats| stats.preemption_events += 1);
}
}
if self.config.adaptive_batching {
if let Ok(mut batcher) = self.adaptive_batcher.lock() {
batcher.add_sample(priority_item.timestamp.elapsed());
}
}
let result = self.pipeline.process_item(priority_item.item).await;
self.update_realtime_stats(|stats| {
stats.items_processed += 1;
stats.total_latency += priority_item.timestamp.elapsed();
stats.priority_distribution[priority_item.priority] += 1;
});
result
}
fn update_realtime_stats<F>(&self, updater: F)
where
F: FnOnce(&mut RealTimeStats),
{
if let Ok(mut stats) = self.stats.lock() {
updater(&mut stats);
}
}
pub fn get_stats(&self) -> RealTimeStats {
self.stats.lock().expect("lock should not be poisoned").clone()
}
}
pub struct StreamTransformer;
impl StreamTransformer {
pub fn filter<I, F>(predicate: F) -> impl Fn(I) -> Option<I>
where
F: Fn(&I) -> bool + Send + Sync,
I: Send,
{
move |item| if predicate(&item) { Some(item) } else { None }
}
pub fn map<I, O, F>(mapper: F) -> impl Fn(I) -> O
where
F: Fn(I) -> O + Send + Sync,
I: Send,
O: Send,
{
mapper
}
pub fn reduce<I, A, F>(mut accumulator: A, reducer: F) -> impl FnMut(I) -> A
where
F: Fn(A, I) -> A + Send + Sync,
I: Send,
A: Send + Clone,
{
move |item| {
accumulator = reducer(accumulator.clone(), item);
accumulator.clone()
}
}
pub fn window<I>(window_size: usize) -> impl FnMut(I) -> Option<Vec<I>>
where
I: Send + Clone,
{
let mut window = VecDeque::with_capacity(window_size);
move |item| {
window.push_back(item);
if window.len() > window_size {
window.pop_front();
}
if window.len() == window_size {
Some(window.iter().cloned().collect())
} else {
None
}
}
}
}
pub struct PartialResultAggregator<T> {
results: VecDeque<PartialResult<T>>,
config: AggregatorConfig,
}
impl<T> PartialResultAggregator<T>
where
T: Send + Clone,
{
pub fn new(config: AggregatorConfig) -> Self {
Self {
results: VecDeque::new(),
config,
}
}
pub fn add_partial(&mut self, result: PartialResult<T>) {
self.results.push_back(result);
while self.results.len() > self.config.max_partial_results {
self.results.pop_front();
}
}
pub fn try_aggregate(&self) -> Option<T> {
if self.results.len() >= self.config.min_results_for_aggregation {
self.results
.iter()
.rev()
.find(|r| r.confidence >= self.config.confidence_threshold)
.map(|r| r.data.clone())
} else {
None
}
}
pub fn force_aggregate(&self) -> Option<T> {
self.results.back().map(|r| r.data.clone())
}
}
#[derive(Clone)]
pub struct BackpressureController {
threshold: f64,
current_load: Arc<Mutex<f64>>,
measurement_window: VecDeque<Instant>,
}
impl BackpressureController {
pub fn new(threshold: f64) -> Self {
Self {
threshold,
current_load: Arc::new(Mutex::new(0.0)),
measurement_window: VecDeque::new(),
}
}
pub fn should_throttle(&self) -> bool {
let load = *self.current_load.lock().expect("lock should not be poisoned");
load > self.threshold
}
pub fn update_load(&mut self, new_measurement: f64) {
let mut load = self.current_load.lock().expect("lock should not be poisoned");
*load = (*load * 0.9) + (new_measurement * 0.1); }
}
pub struct AdaptiveBatcher {
target_latency_ms: u64,
current_batch_size: usize,
latency_samples: VecDeque<Duration>,
last_adjustment: Instant,
}
impl AdaptiveBatcher {
pub fn new(target_latency_ms: u64) -> Self {
Self {
target_latency_ms,
current_batch_size: 1,
latency_samples: VecDeque::new(),
last_adjustment: Instant::now(),
}
}
pub fn add_sample(&mut self, latency: Duration) {
self.latency_samples.push_back(latency);
while self.latency_samples.len() > 10 {
self.latency_samples.pop_front();
}
if self.last_adjustment.elapsed() > Duration::from_secs(5) {
self.adjust_batch_size();
self.last_adjustment = Instant::now();
}
}
fn adjust_batch_size(&mut self) {
if self.latency_samples.is_empty() {
return;
}
let avg_latency = self.latency_samples.iter().map(|d| d.as_millis() as f64).sum::<f64>()
/ self.latency_samples.len() as f64;
let target = self.target_latency_ms as f64;
if avg_latency > target * 1.2 {
self.current_batch_size = std::cmp::max(1, self.current_batch_size - 1);
} else if avg_latency < target * 0.8 {
self.current_batch_size = std::cmp::min(32, self.current_batch_size + 1);
}
}
pub fn get_current_batch_size(&self) -> usize {
self.current_batch_size
}
}
#[derive(Debug, Clone)]
pub struct AggregatorConfig {
pub max_partial_results: usize,
pub min_results_for_aggregation: usize,
pub confidence_threshold: f64,
pub timeout_ms: u64,
}
impl Default for AggregatorConfig {
fn default() -> Self {
Self {
max_partial_results: 100,
min_results_for_aggregation: 3,
confidence_threshold: 0.8,
timeout_ms: 1000,
}
}
}
#[derive(Debug, Clone)]
pub struct PartialResult<T> {
pub data: T,
pub confidence: f64,
pub timestamp: Instant,
pub processing_stage: String,
}
#[derive(Debug)]
pub struct PriorityItem<T> {
pub item: T,
pub timestamp: Instant,
pub priority: usize,
}
#[derive(Debug)]
pub enum StreamResult<T> {
Complete {
output: T,
index: usize,
processing_time: Duration,
},
Partial {
partial_output: T,
confidence: f64,
index: usize,
processing_time: Duration,
},
Error {
error: TrustformersError,
index: usize,
processing_time: Duration,
},
}
#[derive(Debug, Clone)]
pub struct StreamStats {
pub items_processed: usize,
pub total_processing_time: Duration,
pub avg_batch_size: f64,
pub backpressure_events: usize,
pub throughput_rps: f64,
pub latency_p95_ms: f64,
}
impl Default for StreamStats {
fn default() -> Self {
Self {
items_processed: 0,
total_processing_time: Duration::new(0, 0),
avg_batch_size: 1.0,
backpressure_events: 0,
throughput_rps: 0.0,
latency_p95_ms: 0.0,
}
}
}
#[derive(Debug, Clone)]
pub struct RealTimeStats {
pub items_processed: usize,
pub total_latency: Duration,
pub preemption_events: usize,
pub priority_distribution: Vec<usize>,
pub avg_latency_ms: f64,
pub quality_score: f64,
}
impl Default for RealTimeStats {
fn default() -> Self {
Self {
items_processed: 0,
total_latency: Duration::new(0, 0),
preemption_events: 0,
priority_distribution: vec![0; 3], avg_latency_ms: 0.0,
quality_score: 1.0,
}
}
}
pub struct StreamResultStream<T> {
receiver: mpsc::Receiver<Result<StreamResult<T>>>,
}
impl<T> StreamResultStream<T> {
pub fn new(receiver: mpsc::Receiver<Result<StreamResult<T>>>) -> Self {
Self { receiver }
}
}
impl<T> Stream for StreamResultStream<T> {
type Item = Result<StreamResult<T>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.receiver.poll_recv(cx) {
Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures::stream::iter;
#[derive(Clone)]
struct TestPipeline;
impl StreamingPipeline for TestPipeline {
type Input = String;
type Output = String;
type Intermediate = String;
fn process_item(
&self,
input: Self::Input,
) -> Pin<Box<dyn std::future::Future<Output = Result<Self::Output>> + Send + '_>> {
Box::pin(async move {
tokio::time::sleep(Duration::from_millis(10)).await;
Ok(format!("processed: {}", input))
})
}
fn process_with_intermediate(
&self,
input: Self::Input,
) -> Pin<
Box<
dyn std::future::Future<Output = Result<(Self::Output, Vec<Self::Intermediate>)>>
+ Send
+ '_,
>,
> {
Box::pin(async move {
let output = self.process_item(input.clone()).await?;
let intermediate = vec![format!("intermediate: {}", input)];
Ok((output, intermediate))
})
}
}
#[derive(Clone)]
struct ErrorPipeline;
impl StreamingPipeline for ErrorPipeline {
type Input = String;
type Output = String;
type Intermediate = String;
fn process_item(
&self,
_input: Self::Input,
) -> Pin<Box<dyn std::future::Future<Output = Result<Self::Output>> + Send + '_>> {
Box::pin(async move {
Err(crate::error::TrustformersError::invalid_input_simple(
"simulated error".to_string(),
))
})
}
fn process_with_intermediate(
&self,
input: Self::Input,
) -> Pin<
Box<
dyn std::future::Future<Output = Result<(Self::Output, Vec<Self::Intermediate>)>>
+ Send
+ '_,
>,
> {
Box::pin(async move {
Err(crate::error::TrustformersError::invalid_input_simple(
"simulated error".to_string(),
))
})
}
}
#[test]
fn test_stream_config_default_buffer_size() {
let config = StreamConfig::default();
assert!(config.buffer_size > 0);
}
#[test]
fn test_stream_config_backpressure_threshold_in_range() {
let config = StreamConfig::default();
assert!(config.backpressure_threshold > 0.0 && config.backpressure_threshold <= 1.0);
}
#[test]
fn test_stream_config_alias() {
let _config: StreamingConfig = StreamConfig::default();
}
#[test]
fn test_realtime_config_priority_levels_positive() {
let config = RealTimeConfig::default();
assert!(config.priority_levels > 0);
}
#[test]
fn test_realtime_config_quality_threshold_in_range() {
let config = RealTimeConfig::default();
assert!(config.quality_threshold > 0.0 && config.quality_threshold <= 1.0);
}
#[tokio::test]
async fn test_stream_processor() {
let pipeline = TestPipeline;
let config = StreamConfig::default();
let processor = pipeline.create_stream_processor(config);
let inputs = vec!["test1", "test2", "test3"];
let input_stream = iter(inputs.into_iter().map(|s| s.to_string()));
let mut results = processor.process_stream(input_stream).await;
let mut count = 0;
while let Some(result) = results.next().await {
match result.expect("operation failed in test") {
StreamResult::Complete { output, .. } => {
assert!(output.starts_with("processed:"));
count += 1;
},
_ => panic!("Unexpected result type"),
}
}
assert_eq!(count, 3);
}
#[tokio::test]
async fn test_stream_processor_empty_stream() {
let pipeline = TestPipeline;
let config = StreamConfig::default();
let processor = pipeline.create_stream_processor(config);
let input_stream = iter(std::iter::empty::<String>());
let mut results = processor.process_stream(input_stream).await;
let mut count = 0;
while results.next().await.is_some() {
count += 1;
}
assert_eq!(count, 0, "empty stream should produce zero results");
}
#[tokio::test]
async fn test_stream_processor_error_results_in_error_variant() {
let pipeline = ErrorPipeline;
let config = StreamConfig {
batch_size: None,
..StreamConfig::default()
};
let processor = pipeline.create_stream_processor(config);
let input_stream = iter(vec!["item".to_string()]);
let mut results = processor.process_stream(input_stream).await;
if let Some(Ok(StreamResult::Error { .. })) = results.next().await {
} else {
}
}
#[tokio::test]
async fn test_process_with_intermediate_returns_intermediate() {
let pipeline = TestPipeline;
let (output, intermediates) = pipeline
.process_with_intermediate("hello".to_string())
.await
.expect("process_with_intermediate should succeed");
assert!(output.starts_with("processed:"));
assert!(
!intermediates.is_empty(),
"should have at least one intermediate result"
);
}
#[tokio::test]
async fn test_stream_stats_updated_after_processing() {
let pipeline = TestPipeline;
let config = StreamConfig {
batch_size: None,
..StreamConfig::default()
};
let processor = pipeline.create_stream_processor(config);
let input_stream = iter(vec!["a".to_string(), "b".to_string()]);
let mut results = processor.process_stream(input_stream).await;
while results.next().await.is_some() {}
tokio::time::sleep(Duration::from_millis(50)).await;
let stats = processor.get_stats();
let _ = stats; }
#[test]
fn test_backpressure_controller_no_throttle_initially() {
let controller = BackpressureController::new(0.8);
assert!(
!controller.should_throttle(),
"fresh controller should not throttle"
);
}
#[test]
fn test_backpressure_controller_throttles_after_high_load() {
let mut controller = BackpressureController::new(0.1); for _ in 0..20 {
controller.update_load(1.0);
}
assert!(
controller.should_throttle(),
"should throttle when load exceeds threshold"
);
}
#[test]
fn test_backpressure_controller_ema_update() {
let mut controller = BackpressureController::new(0.8);
controller.update_load(0.9);
assert!(
!controller.should_throttle(),
"single high-load update should not immediately throttle due to EMA"
);
}
#[test]
fn test_backpressure_recovers_after_low_load() {
let mut controller = BackpressureController::new(0.05); for _ in 0..30 {
controller.update_load(1.0); }
assert!(
controller.should_throttle(),
"should be throttling after sustained high load"
);
for _ in 0..50 {
controller.update_load(0.0); }
assert!(
!controller.should_throttle(),
"should stop throttling after sustained low load"
);
}
#[tokio::test]
async fn test_realtime_processor() {
let pipeline = TestPipeline;
let config = RealTimeConfig::default();
let processor = pipeline.create_realtime_processor(config);
let result = processor.process_with_priority("test".to_string(), 0).await;
assert!(result.is_ok());
assert_eq!(result.expect("operation failed in test"), "processed: test");
}
#[tokio::test]
async fn test_realtime_processor_invalid_priority_errors() {
let pipeline = TestPipeline;
let config = RealTimeConfig {
priority_levels: 3,
..Default::default()
};
let processor = pipeline.create_realtime_processor(config);
let result = processor.process_with_priority("test".to_string(), 99).await;
assert!(
result.is_err(),
"priority >= priority_levels should be rejected"
);
}
#[test]
fn test_partial_result_aggregator() {
let config = AggregatorConfig::default();
let mut aggregator = PartialResultAggregator::new(config);
aggregator.add_partial(PartialResult {
data: "result1".to_string(),
confidence: 0.9,
timestamp: Instant::now(),
processing_stage: "stage1".to_string(),
});
assert!(
aggregator.try_aggregate().is_none(),
"single result below min threshold"
);
for i in 2..=3 {
aggregator.add_partial(PartialResult {
data: format!("result{}", i),
confidence: 0.9,
timestamp: Instant::now(),
processing_stage: format!("stage{}", i),
});
}
assert!(
aggregator.try_aggregate().is_some(),
"should aggregate with 3 results"
);
}
#[test]
fn test_partial_result_aggregator_low_confidence_not_returned() {
let config = AggregatorConfig {
confidence_threshold: 0.95,
min_results_for_aggregation: 2,
..Default::default()
};
let mut aggregator = PartialResultAggregator::new(config);
for i in 0..3 {
aggregator.add_partial(PartialResult {
data: format!("low{}", i),
confidence: 0.5, timestamp: Instant::now(),
processing_stage: "s".to_string(),
});
}
assert!(
aggregator.try_aggregate().is_none(),
"should not aggregate low-confidence results"
);
}
#[test]
fn test_force_aggregate_returns_last() {
let config = AggregatorConfig::default();
let mut aggregator = PartialResultAggregator::new(config);
aggregator.add_partial(PartialResult {
data: "only_one".to_string(),
confidence: 0.3,
timestamp: Instant::now(),
processing_stage: "s".to_string(),
});
let result = aggregator.force_aggregate();
assert_eq!(result, Some("only_one".to_string()));
}
#[test]
fn test_adaptive_batcher() {
let mut batcher = AdaptiveBatcher::new(100);
assert_eq!(batcher.get_current_batch_size(), 1);
for _ in 0..5 {
batcher.add_sample(Duration::from_millis(200));
}
std::thread::sleep(Duration::from_millis(100));
batcher.last_adjustment = Instant::now() - Duration::from_secs(6);
batcher.add_sample(Duration::from_millis(200));
assert_eq!(
batcher.get_current_batch_size(),
1,
"slow samples should not increase batch size"
);
}
#[test]
fn test_adaptive_batcher_fast_samples_increase_batch_size() {
let mut batcher = AdaptiveBatcher::new(100);
for _ in 0..10 {
batcher.add_sample(Duration::from_millis(50)); }
batcher.last_adjustment = Instant::now() - Duration::from_secs(6);
batcher.add_sample(Duration::from_millis(50));
assert!(
batcher.get_current_batch_size() >= 1,
"batch size must remain at least 1"
);
}
#[test]
fn test_stream_transformer_filter() {
let filter_fn = StreamTransformer::filter(|x: &i32| *x > 2);
assert_eq!(filter_fn(1), None);
assert_eq!(filter_fn(3), Some(3));
}
#[test]
fn test_stream_transformer_map() {
let map_fn = StreamTransformer::map(|x: i32| x * 10);
assert_eq!(map_fn(5), 50);
}
#[test]
fn test_stream_transformer_window_incomplete() {
let mut window_fn = StreamTransformer::window::<i32>(3);
assert_eq!(window_fn(1), None, "window not full yet");
assert_eq!(window_fn(2), None, "window not full yet");
}
#[test]
fn test_stream_transformer_window_complete() {
let mut window_fn = StreamTransformer::window::<i32>(2);
let _ = window_fn(1);
let result = window_fn(2);
assert_eq!(result, Some(vec![1, 2]), "full window should emit slice");
}
}