Skip to main content

opendeviationbar_streaming/
processor.rs

1use futures::Stream;
2/// Production-ready streaming architecture with bounded memory and backpressure
3/// # FILE-SIZE-OK
4///
5/// This module implements true infinite streaming capabilities addressing critical failures:
6/// - Eliminates Vec<OpenDeviationBar> accumulation (unbounded memory growth)
7/// - Implements proper backpressure with bounded channels
8/// - Provides circuit breaker resilience patterns
9/// - Maintains temporal integrity for financial data
10use opendeviationbar_core::processor::ExportOpenDeviationBarProcessor;
11use opendeviationbar_core::{AggTrade, OpenDeviationBar};
12use std::pin::Pin;
13use std::sync::Arc;
14use std::sync::atomic::{AtomicU64, Ordering};
15use std::task::{Context, Poll};
16use tokio::sync::mpsc;
17use tokio::time::{Duration, Instant};
18
/// Tunable limits for the production streaming pipeline.
///
/// All fields are public, so a caller can start from `Default::default()`
/// and override individual values.
#[derive(Debug, Clone)]
pub struct StreamingProcessorConfig {
    /// Capacity of the bounded channel carrying incoming trades.
    pub trade_channel_capacity: usize,
    /// Capacity of the bounded channel carrying completed bars.
    pub bar_channel_capacity: usize,
    /// Memory usage ceiling, in bytes.
    pub memory_threshold_bytes: usize,
    /// Backpressure timeout.
    pub backpressure_timeout: Duration,
    /// Error-rate threshold at which the circuit breaker opens (0.0-1.0).
    pub circuit_breaker_threshold: f64,
    /// How long the circuit breaker stays open before a retry is allowed.
    pub circuit_breaker_timeout: Duration,
}
35
36impl StreamingProcessorConfig {
37    /// Get bar channel capacity from environment or use default (10K)
38    /// Issue #96 Task #6: OPENDEVIATIONBAR_MAX_PENDING_BARS env var support
39    fn get_bar_channel_capacity() -> usize {
40        std::env::var("OPENDEVIATIONBAR_MAX_PENDING_BARS")
41            .ok()
42            .and_then(|v| v.parse::<usize>().ok())
43            .unwrap_or(10_000)
44    }
45}
46
47impl Default for StreamingProcessorConfig {
48    fn default() -> Self {
49        Self {
50            trade_channel_capacity: 5_000, // Based on consensus analysis
51            bar_channel_capacity: StreamingProcessorConfig::get_bar_channel_capacity(), // Issue #96: 10K backpressure bound
52            memory_threshold_bytes: 100_000_000, // 100MB limit
53            backpressure_timeout: Duration::from_millis(100),
54            circuit_breaker_threshold: 0.5, // 50% error rate
55            circuit_breaker_timeout: Duration::from_secs(30),
56        }
57    }
58}
59
60/// Production streaming processor with bounded memory
61pub struct StreamingProcessor {
62    /// Open deviation bar processor (single instance, no accumulation)
63    processor: ExportOpenDeviationBarProcessor,
64
65    /// Threshold in decimal basis points for recreating processor
66    _threshold_decimal_bps: u32,
67
68    /// Bounded channel for incoming trades
69    trade_sender: Option<mpsc::Sender<AggTrade>>,
70    trade_receiver: mpsc::Receiver<AggTrade>,
71
72    /// Bounded channel for outgoing bars
73    bar_sender: mpsc::Sender<OpenDeviationBar>,
74    bar_receiver: Option<mpsc::Receiver<OpenDeviationBar>>,
75
76    /// Configuration
77    config: StreamingProcessorConfig,
78
79    /// Metrics
80    metrics: Arc<StreamingMetrics>,
81
82    /// Circuit breaker state
83    circuit_breaker: CircuitBreaker,
84}
85
86/// Circuit breaker implementation
87#[derive(Debug)]
88struct CircuitBreaker {
89    state: CircuitBreakerState,
90    failure_count: u64,
91    success_count: u64,
92    last_failure_time: Option<Instant>,
93    threshold: f64,
94    timeout: Duration,
95}
96
/// The three states of the circuit breaker state machine.
#[derive(Debug, PartialEq)]
enum CircuitBreakerState {
    /// Processing is allowed.
    Closed,
    /// Processing is blocked until the breaker's timeout has elapsed.
    Open,
    /// Processing is allowed again on trial after the timeout.
    HalfOpen,
}
103
/// Atomic counters exposed for observability.
/// Issue #96 Task #6: Extended with queue depth and block time tracking
#[derive(Debug, Default)]
pub struct StreamingMetrics {
    /// Trades processed.
    pub trades_processed: AtomicU64,
    /// Bars generated.
    pub bars_generated: AtomicU64,
    /// Total errors.
    pub errors_total: AtomicU64,
    /// Backpressure events.
    pub backpressure_events: AtomicU64,
    /// Circuit breaker trips.
    pub circuit_breaker_trips: AtomicU64,
    /// Memory usage in bytes.
    pub memory_usage_bytes: AtomicU64,
    /// Issue #96 Task #6: Max observed queue depth
    pub max_queue_depth: AtomicU64,
    /// Issue #96 Task #6: Accumulated block time (milliseconds)
    pub total_block_time_ms: AtomicU64,
}
117
118impl StreamingProcessor {
119    /// Create new production streaming processor
120    pub fn new(
121        threshold_decimal_bps: u32,
122    ) -> Result<Self, opendeviationbar_core::processor::ProcessingError> {
123        Self::with_config(threshold_decimal_bps, StreamingProcessorConfig::default())
124    }
125
126    /// Create with custom configuration
127    pub fn with_config(
128        threshold_decimal_bps: u32,
129        config: StreamingProcessorConfig,
130    ) -> Result<Self, opendeviationbar_core::processor::ProcessingError> {
131        let (trade_sender, trade_receiver) = mpsc::channel(config.trade_channel_capacity);
132        let (bar_sender, bar_receiver) = mpsc::channel(config.bar_channel_capacity);
133
134        let circuit_breaker_threshold = config.circuit_breaker_threshold;
135        let circuit_breaker_timeout = config.circuit_breaker_timeout;
136
137        Ok(Self {
138            processor: ExportOpenDeviationBarProcessor::new(threshold_decimal_bps)?,
139            _threshold_decimal_bps: threshold_decimal_bps,
140            trade_sender: Some(trade_sender),
141            trade_receiver,
142            bar_sender,
143            bar_receiver: Some(bar_receiver),
144            config,
145            metrics: Arc::new(StreamingMetrics::default()),
146            circuit_breaker: CircuitBreaker::new(
147                circuit_breaker_threshold,
148                circuit_breaker_timeout,
149            ),
150        })
151    }
152
153    /// Get trade sender for external components
154    pub fn trade_sender(&mut self) -> Option<mpsc::Sender<AggTrade>> {
155        self.trade_sender.take()
156    }
157
158    /// Get bar receiver for external components
159    pub fn bar_receiver(&mut self) -> Option<mpsc::Receiver<OpenDeviationBar>> {
160        self.bar_receiver.take()
161    }
162
163    /// Start processing loop (bounded memory, infinite capability)
164    pub async fn start_processing(&mut self) -> Result<(), StreamingError> {
165        loop {
166            // Check circuit breaker state
167            if !self.circuit_breaker.can_process() {
168                tokio::time::sleep(Duration::from_millis(100)).await;
169                continue;
170            }
171
172            // Receive trade with timeout (prevents blocking forever)
173            let trade = match tokio::time::timeout(
174                self.config.backpressure_timeout,
175                self.trade_receiver.recv(),
176            )
177            .await
178            {
179                Ok(Some(trade)) => trade,
180                Ok(None) => {
181                    // Channel closed - send final incomplete bar if exists
182                    if let Some(final_bar) = self.processor.get_incomplete_bar()
183                        && let Err(e) = self.send_bar_with_backpressure(final_bar).await
184                    {
185                        println!("Failed to send final incomplete bar: {:?}", e);
186                    }
187                    break;
188                }
189                Err(_) => continue, // Timeout, check circuit breaker again
190            };
191
192            // Process single trade (use borrowed reference per Issue #96 Task #78)
193            match self.process_single_trade(&trade).await {
194                Ok(bar_opt) => {
195                    self.circuit_breaker.record_success();
196
197                    // If bar completed, send with backpressure handling
198                    if let Some(bar) = bar_opt
199                        && let Err(e) = self.send_bar_with_backpressure(bar).await
200                    {
201                        println!("Failed to send bar: {:?}", e);
202                        self.circuit_breaker.record_failure();
203                    }
204                }
205                Err(e) => {
206                    println!("Trade processing error: {:?}", e);
207                    self.circuit_breaker.record_failure();
208                    self.metrics.errors_total.fetch_add(1, Ordering::Relaxed);
209                }
210            }
211        }
212
213        Ok(())
214    }
215
216    /// Process single trade - extracts completed bars without accumulation
217    // Issue #96 Task #78: Accept borrowed AggTrade reference
218    async fn process_single_trade(
219        &mut self,
220        trade: &AggTrade,
221    ) -> Result<Option<OpenDeviationBar>, StreamingError> {
222        // Update metrics
223        self.metrics
224            .trades_processed
225            .fetch_add(1, Ordering::Relaxed);
226
227        // Process trade using existing algorithm (single trade at a time)
228        self.processor
229            .process_trades_continuously(std::slice::from_ref(trade));
230
231        // Extract completed bars immediately (prevents accumulation)
232        let mut completed_bars = self.processor.get_all_completed_bars();
233
234        if !completed_bars.is_empty() {
235            // Bounded memory: only return first completed bar
236            // Additional bars would be rare edge cases but must be handled
237            let completed_bar = completed_bars.remove(0);
238
239            // Handle rare case of multiple completions
240            if !completed_bars.is_empty() {
241                println!(
242                    "Warning: {} additional bars completed, dropping for bounded memory",
243                    completed_bars.len()
244                );
245                self.metrics
246                    .backpressure_events
247                    .fetch_add(completed_bars.len() as u64, Ordering::Relaxed);
248            }
249
250            self.metrics.bars_generated.fetch_add(1, Ordering::Relaxed);
251            Ok(Some(completed_bar))
252        } else {
253            Ok(None)
254        }
255    }
256
257    /// Send bar with backpressure handling
258    async fn send_bar_with_backpressure(
259        &self,
260        bar: OpenDeviationBar,
261    ) -> Result<(), StreamingError> {
262        // Use try_send for immediate check, then send for blocking
263        match self.bar_sender.try_send(bar.clone()) {
264            Ok(()) => Ok(()),
265            Err(mpsc::error::TrySendError::Full(_)) => {
266                // Apply backpressure - channel is full
267                println!("Bar channel full, applying backpressure");
268                self.metrics
269                    .backpressure_events
270                    .fetch_add(1, Ordering::Relaxed);
271
272                // Wait for capacity with blocking send
273                self.bar_sender
274                    .send(bar)
275                    .await
276                    .map_err(|_| StreamingError::ChannelClosed)
277            }
278            Err(mpsc::error::TrySendError::Closed(_)) => Err(StreamingError::ChannelClosed),
279        }
280    }
281
282    /// Get current metrics
283    pub fn metrics(&self) -> &StreamingMetrics {
284        &self.metrics
285    }
286
287    /// Extract final incomplete bar when stream ends (for algorithmic consistency)
288    pub fn get_final_incomplete_bar(&mut self) -> Option<OpenDeviationBar> {
289        self.processor.get_incomplete_bar()
290    }
291
292    /// Check memory usage against threshold
293    pub fn check_memory_usage(&self) -> bool {
294        let current_usage = self.metrics.memory_usage_bytes.load(Ordering::Relaxed);
295        current_usage < self.config.memory_threshold_bytes as u64
296    }
297}
298
299impl CircuitBreaker {
300    fn new(threshold: f64, timeout: Duration) -> Self {
301        Self {
302            state: CircuitBreakerState::Closed,
303            failure_count: 0,
304            success_count: 0,
305            last_failure_time: None,
306            threshold,
307            timeout,
308        }
309    }
310
311    fn can_process(&mut self) -> bool {
312        match self.state {
313            CircuitBreakerState::Closed => true,
314            CircuitBreakerState::Open => {
315                if let Some(last_failure) = self.last_failure_time {
316                    if last_failure.elapsed() >= self.timeout {
317                        self.state = CircuitBreakerState::HalfOpen;
318                        true
319                    } else {
320                        false
321                    }
322                } else {
323                    true
324                }
325            }
326            CircuitBreakerState::HalfOpen => true,
327        }
328    }
329
330    fn record_success(&mut self) {
331        self.success_count += 1;
332
333        if self.state == CircuitBreakerState::HalfOpen {
334            // Successful request in half-open, close circuit
335            self.state = CircuitBreakerState::Closed;
336            self.failure_count = 0;
337        }
338    }
339
340    fn record_failure(&mut self) {
341        self.failure_count += 1;
342        self.last_failure_time = Some(Instant::now());
343
344        let total_requests = self.failure_count + self.success_count;
345        if total_requests >= 10 {
346            // Minimum sample size
347            let failure_rate = self.failure_count as f64 / total_requests as f64;
348
349            if failure_rate >= self.threshold {
350                self.state = CircuitBreakerState::Open;
351            }
352        }
353    }
354}
355
356/// Stream implementation for open deviation bars (true streaming)
357pub struct OpenDeviationBarStream {
358    receiver: mpsc::Receiver<OpenDeviationBar>,
359}
360
361impl OpenDeviationBarStream {
362    pub fn new(receiver: mpsc::Receiver<OpenDeviationBar>) -> Self {
363        Self { receiver }
364    }
365}
366
367impl Stream for OpenDeviationBarStream {
368    type Item = Result<OpenDeviationBar, StreamingError>;
369
370    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
371        match self.receiver.poll_recv(cx) {
372            Poll::Ready(Some(bar)) => Poll::Ready(Some(Ok(bar))),
373            Poll::Ready(None) => Poll::Ready(None),
374            Poll::Pending => Poll::Pending,
375        }
376    }
377}
378
379/// Streaming errors
380#[derive(Debug, thiserror::Error)]
381pub enum StreamingError {
382    #[error("Channel closed")]
383    ChannelClosed,
384
385    #[error("Backpressure timeout")]
386    BackpressureTimeout,
387
388    #[error("Circuit breaker open")]
389    CircuitBreakerOpen,
390
391    #[error("Memory threshold exceeded")]
392    MemoryThresholdExceeded,
393
394    #[error("Processing error: {0}")]
395    ProcessingError(String),
396}
397
398impl StreamingMetrics {
399    /// Get metrics summary
400    pub fn summary(&self) -> MetricsSummary {
401        MetricsSummary {
402            trades_processed: self.trades_processed.load(Ordering::Relaxed),
403            bars_generated: self.bars_generated.load(Ordering::Relaxed),
404            errors_total: self.errors_total.load(Ordering::Relaxed),
405            backpressure_events: self.backpressure_events.load(Ordering::Relaxed),
406            circuit_breaker_trips: self.circuit_breaker_trips.load(Ordering::Relaxed),
407            memory_usage_bytes: self.memory_usage_bytes.load(Ordering::Relaxed),
408        }
409    }
410}
411
/// Plain-value snapshot of the streaming counters.
#[derive(Debug, Clone)]
pub struct MetricsSummary {
    /// Trades processed.
    pub trades_processed: u64,
    /// Bars generated.
    pub bars_generated: u64,
    /// Total errors.
    pub errors_total: u64,
    /// Backpressure events.
    pub backpressure_events: u64,
    /// Circuit breaker trips.
    pub circuit_breaker_trips: u64,
    /// Memory usage in bytes.
    pub memory_usage_bytes: u64,
}
422
423impl MetricsSummary {
424    /// Calculate bars per aggTrade ratio
425    pub fn bars_per_aggtrade(&self) -> f64 {
426        if self.trades_processed > 0 {
427            self.bars_generated as f64 / self.trades_processed as f64
428        } else {
429            0.0
430        }
431    }
432
433    /// Calculate error rate
434    pub fn error_rate(&self) -> f64 {
435        if self.trades_processed > 0 {
436            self.errors_total as f64 / self.trades_processed as f64
437        } else {
438            0.0
439        }
440    }
441
442    /// Format memory usage
443    pub fn memory_usage_mb(&self) -> f64 {
444        self.memory_usage_bytes as f64 / 1_000_000.0
445    }
446}
447
#[cfg(test)]
mod tests {
    use super::*;
    use opendeviationbar_core::FixedPoint;

    /// Build a unit-volume aggregate trade whose three id fields all equal `id`.
    fn create_test_trade(id: u64, price: f64, timestamp: u64) -> AggTrade {
        let price_str = format!("{:.8}", price);
        AggTrade {
            agg_trade_id: id as i64,
            price: FixedPoint::from_str(&price_str).unwrap(),
            volume: FixedPoint::from_str("1.0").unwrap(),
            first_trade_id: id as i64,
            last_trade_id: id as i64,
            timestamp: timestamp as i64,
            is_buyer_maker: false,
            is_best_match: None,
        }
    }

    /// A summary with every counter at zero, used as a base for struct-update syntax.
    fn zeroed_summary() -> MetricsSummary {
        MetricsSummary {
            trades_processed: 0,
            bars_generated: 0,
            errors_total: 0,
            backpressure_events: 0,
            circuit_breaker_trips: 0,
            memory_usage_bytes: 0,
        }
    }

    #[tokio::test]
    async fn test_bounded_memory_streaming() {
        let mut processor = StreamingProcessor::new(25).unwrap(); // 0.25% threshold

        let initial_metrics = processor.metrics().summary();
        assert_eq!(initial_metrics.trades_processed, 0);

        // Send 1000 trades, counting how many of them hand back a completed bar.
        let mut bars_returned = 0u64;
        for i in 0..1000 {
            let trade = create_test_trade(i, 23000.0 + (i as f64), 1659312000000 + i);
            let bar_opt = processor
                .process_single_trade(&trade)
                .await
                .expect("processing a single trade should not fail");
            if bar_opt.is_some() {
                bars_returned += 1;
            }
        }

        // Every trade is counted exactly once, and the bar counter matches
        // the number of bars actually returned (at most one per aggTrade).
        let final_metrics = processor.metrics().summary();
        assert_eq!(final_metrics.trades_processed, 1000);
        assert_eq!(final_metrics.bars_generated, bars_returned);
    }

    #[tokio::test]
    async fn test_circuit_breaker() {
        let mut circuit_breaker = CircuitBreaker::new(0.5, Duration::from_millis(100));

        // Initially closed
        assert!(circuit_breaker.can_process());

        // Record failures
        for _ in 0..20 {
            circuit_breaker.record_failure();
        }

        // Should open after 50% failure rate
        assert_eq!(circuit_breaker.state, CircuitBreakerState::Open);
        assert!(!circuit_breaker.can_process());

        // Wait for timeout
        tokio::time::sleep(Duration::from_millis(150)).await;

        // Should transition to half-open
        assert!(circuit_breaker.can_process());

        // Record success
        circuit_breaker.record_success();

        // Should close
        assert_eq!(circuit_breaker.state, CircuitBreakerState::Closed);
    }

    // === Circuit Breaker State Machine Tests ===

    #[test]
    fn test_circuit_breaker_stays_closed_below_threshold() {
        let mut cb = CircuitBreaker::new(0.5, Duration::from_secs(10));

        // 8 successes, 2 failures = 20% failure rate, below 50% threshold
        for _ in 0..8 {
            cb.record_success();
        }
        for _ in 0..2 {
            cb.record_failure();
        }

        // Should remain closed (20% < 50%)
        assert_eq!(cb.state, CircuitBreakerState::Closed);
        assert!(cb.can_process());
    }

    #[test]
    fn test_circuit_breaker_minimum_sample_size() {
        let mut cb = CircuitBreaker::new(0.5, Duration::from_secs(10));

        // 9 failures, 0 successes = 100% failure rate, but only 9 requests (< 10 minimum)
        for _ in 0..9 {
            cb.record_failure();
        }

        // Should remain closed (minimum sample size not met)
        assert_eq!(cb.state, CircuitBreakerState::Closed);
        assert!(cb.can_process());

        // 10th failure triggers open
        cb.record_failure();
        assert_eq!(cb.state, CircuitBreakerState::Open);
    }

    #[test]
    fn test_circuit_breaker_halfopen_failure_reopens() {
        let mut cb = CircuitBreaker::new(0.5, Duration::from_secs(0));

        // Trip the breaker: 10 failures opens it
        for _ in 0..10 {
            cb.record_failure();
        }
        assert_eq!(cb.state, CircuitBreakerState::Open);

        // Zero-second timeout → immediately transitions to HalfOpen on can_process
        assert!(cb.can_process());
        assert_eq!(cb.state, CircuitBreakerState::HalfOpen);

        // Record failure in HalfOpen → should re-open
        // (failure_count accumulates, total >= 10, rate >= threshold)
        cb.record_failure();
        assert_eq!(cb.state, CircuitBreakerState::Open);
    }

    #[test]
    fn test_circuit_breaker_closed_resets_failure_count() {
        let mut cb = CircuitBreaker::new(0.5, Duration::from_secs(0));

        // Trip the breaker
        for _ in 0..10 {
            cb.record_failure();
        }
        assert_eq!(cb.state, CircuitBreakerState::Open);

        // Transition to HalfOpen
        assert!(cb.can_process());
        assert_eq!(cb.state, CircuitBreakerState::HalfOpen);

        // Record success → closes and resets failure_count
        cb.record_success();
        assert_eq!(cb.state, CircuitBreakerState::Closed);
        assert_eq!(cb.failure_count, 0);
    }

    #[test]
    fn test_circuit_breaker_open_blocks_until_timeout() {
        let mut cb = CircuitBreaker::new(0.5, Duration::from_secs(3600)); // 1 hour timeout

        // Trip the breaker
        for _ in 0..10 {
            cb.record_failure();
        }

        // Should be blocked — timeout hasn't elapsed
        assert!(!cb.can_process());
        assert_eq!(cb.state, CircuitBreakerState::Open);
    }

    #[test]
    fn test_metrics_zero_trades() {
        let metrics = zeroed_summary();

        // Division by zero guarded
        assert_eq!(metrics.bars_per_aggtrade(), 0.0);
        assert_eq!(metrics.error_rate(), 0.0);
        assert_eq!(metrics.memory_usage_mb(), 0.0);
    }

    #[test]
    fn test_metrics_calculations() {
        let metrics = MetricsSummary {
            trades_processed: 1000,
            bars_generated: 50,
            errors_total: 5,
            backpressure_events: 2,
            circuit_breaker_trips: 1,
            memory_usage_bytes: 50_000_000,
        };

        assert_eq!(metrics.bars_per_aggtrade(), 0.05);
        assert_eq!(metrics.error_rate(), 0.005);
        assert_eq!(metrics.memory_usage_mb(), 50.0);
    }

    // === Metrics Snapshot & Take-Once Tests (Issue #96 Task #114) ===

    #[test]
    fn test_streaming_metrics_summary_snapshot() {
        let metrics = StreamingMetrics::default();
        metrics.trades_processed.store(500, Ordering::Relaxed);
        metrics.bars_generated.store(25, Ordering::Relaxed);
        metrics.errors_total.store(3, Ordering::Relaxed);
        metrics.backpressure_events.store(1, Ordering::Relaxed);
        metrics.circuit_breaker_trips.store(0, Ordering::Relaxed);
        metrics
            .memory_usage_bytes
            .store(42_000_000, Ordering::Relaxed);

        let summary = metrics.summary();

        assert_eq!(summary.trades_processed, 500);
        assert_eq!(summary.bars_generated, 25);
        assert_eq!(summary.errors_total, 3);
        assert_eq!(summary.backpressure_events, 1);
        assert_eq!(summary.circuit_breaker_trips, 0);
        assert_eq!(summary.memory_usage_bytes, 42_000_000);
    }

    #[test]
    fn test_memory_usage_mb_conversion() {
        // Exact MB boundary
        let m1 = MetricsSummary {
            memory_usage_bytes: 1_000_000,
            ..zeroed_summary()
        };
        assert_eq!(m1.memory_usage_mb(), 1.0);

        // Fractional MB
        let m2 = MetricsSummary {
            memory_usage_bytes: 1_500_000,
            ..zeroed_summary()
        };
        assert_eq!(m2.memory_usage_mb(), 1.5);

        // Large value (4 GB)
        let m3 = MetricsSummary {
            memory_usage_bytes: 4_000_000_000,
            ..zeroed_summary()
        };
        assert_eq!(m3.memory_usage_mb(), 4000.0);
    }

    #[test]
    fn test_trade_sender_take_once() {
        let mut processor = StreamingProcessor::new(25).unwrap();

        // First call returns Some
        let sender = processor.trade_sender();
        assert!(
            sender.is_some(),
            "First trade_sender() call must return Some"
        );

        // Second call returns None (already taken)
        let sender2 = processor.trade_sender();
        assert!(
            sender2.is_none(),
            "Second trade_sender() call must return None"
        );
    }

    #[test]
    fn test_bar_receiver_take_once() {
        let mut processor = StreamingProcessor::new(25).unwrap();

        // First call returns Some
        let receiver = processor.bar_receiver();
        assert!(
            receiver.is_some(),
            "First bar_receiver() call must return Some"
        );

        // Second call returns None (already taken)
        let receiver2 = processor.bar_receiver();
        assert!(
            receiver2.is_none(),
            "Second bar_receiver() call must return None"
        );
    }

    #[test]
    fn test_check_memory_usage_below_threshold() {
        let processor = StreamingProcessor::new(25).unwrap();

        // Default: memory_usage_bytes = 0, threshold = 100MB → within bounds
        assert!(
            processor.check_memory_usage(),
            "Zero memory usage should be within threshold"
        );
    }

    #[test]
    fn test_check_memory_usage_above_threshold() {
        let processor = StreamingProcessor::new(25).unwrap();

        // Simulate exceeding threshold (100MB default)
        processor
            .metrics
            .memory_usage_bytes
            .store(200_000_000, Ordering::Relaxed);
        assert!(
            !processor.check_memory_usage(),
            "200MB should exceed 100MB threshold"
        );
    }

    #[test]
    fn test_get_final_incomplete_bar_empty() {
        let mut processor = StreamingProcessor::new(25).unwrap();

        // No trades processed → no incomplete bar
        let bar = processor.get_final_incomplete_bar();
        assert!(bar.is_none(), "No incomplete bar before any trades");
    }

    #[test]
    fn test_bars_per_aggtrade_ratio() {
        let metrics = MetricsSummary {
            trades_processed: 200,
            bars_generated: 10,
            ..zeroed_summary()
        };

        assert_eq!(metrics.bars_per_aggtrade(), 0.05);
        assert_eq!(metrics.error_rate(), 0.0);
    }
}