Skip to main content

rangebar_streaming/
processor.rs

1use futures::Stream;
2/// Production-ready streaming architecture with bounded memory and backpressure
3/// # FILE-SIZE-OK
4///
5/// This module implements true infinite streaming capabilities addressing critical failures:
6/// - Eliminates Vec<RangeBar> accumulation (unbounded memory growth)
7/// - Implements proper backpressure with bounded channels
8/// - Provides circuit breaker resilience patterns
9/// - Maintains temporal integrity for financial data
10use rangebar_core::processor::ExportRangeBarProcessor;
11use rangebar_core::{AggTrade, RangeBar};
12use std::pin::Pin;
13use std::sync::Arc;
14use std::sync::atomic::{AtomicU64, Ordering};
15use std::task::{Context, Poll};
16use tokio::sync::mpsc;
17use tokio::time::{Duration, Instant};
18
19/// Configuration for production streaming
20#[derive(Debug, Clone)]
21pub struct StreamingProcessorConfig {
22    /// Channel capacity for trade input
23    pub trade_channel_capacity: usize,
24    /// Channel capacity for completed bars
25    pub bar_channel_capacity: usize,
26    /// Memory usage threshold in bytes
27    pub memory_threshold_bytes: usize,
28    /// Backpressure timeout
29    pub backpressure_timeout: Duration,
30    /// Circuit breaker error rate threshold (0.0-1.0)
31    pub circuit_breaker_threshold: f64,
32    /// Circuit breaker timeout before retry
33    pub circuit_breaker_timeout: Duration,
34}
35
36impl StreamingProcessorConfig {
37    /// Get bar channel capacity from environment or use default (10K)
38    /// Issue #96 Task #6: RANGEBAR_MAX_PENDING_BARS env var support
39    fn get_bar_channel_capacity() -> usize {
40        std::env::var("RANGEBAR_MAX_PENDING_BARS")
41            .ok()
42            .and_then(|v| v.parse::<usize>().ok())
43            .unwrap_or(10_000)
44    }
45}
46
47impl Default for StreamingProcessorConfig {
48    fn default() -> Self {
49        Self {
50            trade_channel_capacity: 5_000,                              // Based on consensus analysis
51            bar_channel_capacity: StreamingProcessorConfig::get_bar_channel_capacity(), // Issue #96: 10K backpressure bound
52            memory_threshold_bytes: 100_000_000,                        // 100MB limit
53            backpressure_timeout: Duration::from_millis(100),
54            circuit_breaker_threshold: 0.5, // 50% error rate
55            circuit_breaker_timeout: Duration::from_secs(30),
56        }
57    }
58}
59
60/// Production streaming processor with bounded memory
61pub struct StreamingProcessor {
62    /// Range bar processor (single instance, no accumulation)
63    processor: ExportRangeBarProcessor,
64
65    /// Threshold in decimal basis points for recreating processor
66    #[allow(dead_code)]
67    threshold_decimal_bps: u32,
68
69    /// Bounded channel for incoming trades
70    trade_sender: Option<mpsc::Sender<AggTrade>>,
71    trade_receiver: mpsc::Receiver<AggTrade>,
72
73    /// Bounded channel for outgoing bars
74    bar_sender: mpsc::Sender<RangeBar>,
75    bar_receiver: Option<mpsc::Receiver<RangeBar>>,
76
77    /// Configuration
78    config: StreamingProcessorConfig,
79
80    /// Metrics
81    metrics: Arc<StreamingMetrics>,
82
83    /// Circuit breaker state
84    circuit_breaker: CircuitBreaker,
85}
86
87/// Circuit breaker implementation
88#[derive(Debug)]
89struct CircuitBreaker {
90    state: CircuitBreakerState,
91    failure_count: u64,
92    success_count: u64,
93    last_failure_time: Option<Instant>,
94    threshold: f64,
95    timeout: Duration,
96}
97
98#[derive(Debug, PartialEq)]
99enum CircuitBreakerState {
100    Closed,
101    Open,
102    HalfOpen,
103}
104
105/// Streaming metrics for observability
106/// Issue #96 Task #6: Extended with queue depth and block time tracking
107#[derive(Debug, Default)]
108pub struct StreamingMetrics {
109    pub trades_processed: AtomicU64,
110    pub bars_generated: AtomicU64,
111    pub errors_total: AtomicU64,
112    pub backpressure_events: AtomicU64,
113    pub circuit_breaker_trips: AtomicU64,
114    pub memory_usage_bytes: AtomicU64,
115    pub max_queue_depth: AtomicU64,       // Issue #96 Task #6: Max observed queue depth
116    pub total_block_time_ms: AtomicU64,   // Issue #96 Task #6: Accumulated block time
117}
118
119impl StreamingProcessor {
120    /// Create new production streaming processor
121    pub fn new(
122        threshold_decimal_bps: u32,
123    ) -> Result<Self, rangebar_core::processor::ProcessingError> {
124        Self::with_config(threshold_decimal_bps, StreamingProcessorConfig::default())
125    }
126
127    /// Create with custom configuration
128    pub fn with_config(
129        threshold_decimal_bps: u32,
130        config: StreamingProcessorConfig,
131    ) -> Result<Self, rangebar_core::processor::ProcessingError> {
132        let (trade_sender, trade_receiver) = mpsc::channel(config.trade_channel_capacity);
133        let (bar_sender, bar_receiver) = mpsc::channel(config.bar_channel_capacity);
134
135        let circuit_breaker_threshold = config.circuit_breaker_threshold;
136        let circuit_breaker_timeout = config.circuit_breaker_timeout;
137
138        Ok(Self {
139            processor: ExportRangeBarProcessor::new(threshold_decimal_bps)?,
140            threshold_decimal_bps,
141            trade_sender: Some(trade_sender),
142            trade_receiver,
143            bar_sender,
144            bar_receiver: Some(bar_receiver),
145            config,
146            metrics: Arc::new(StreamingMetrics::default()),
147            circuit_breaker: CircuitBreaker::new(
148                circuit_breaker_threshold,
149                circuit_breaker_timeout,
150            ),
151        })
152    }
153
154    /// Get trade sender for external components
155    pub fn trade_sender(&mut self) -> Option<mpsc::Sender<AggTrade>> {
156        self.trade_sender.take()
157    }
158
159    /// Get bar receiver for external components
160    pub fn bar_receiver(&mut self) -> Option<mpsc::Receiver<RangeBar>> {
161        self.bar_receiver.take()
162    }
163
164    /// Start processing loop (bounded memory, infinite capability)
165    pub async fn start_processing(&mut self) -> Result<(), StreamingError> {
166        loop {
167            // Check circuit breaker state
168            if !self.circuit_breaker.can_process() {
169                tokio::time::sleep(Duration::from_millis(100)).await;
170                continue;
171            }
172
173            // Receive trade with timeout (prevents blocking forever)
174            let trade = match tokio::time::timeout(
175                self.config.backpressure_timeout,
176                self.trade_receiver.recv(),
177            )
178            .await
179            {
180                Ok(Some(trade)) => trade,
181                Ok(None) => {
182                    // Channel closed - send final incomplete bar if exists
183                    if let Some(final_bar) = self.processor.get_incomplete_bar()
184                        && let Err(e) = self.send_bar_with_backpressure(final_bar).await
185                    {
186                        println!("Failed to send final incomplete bar: {:?}", e);
187                    }
188                    break;
189                }
190                Err(_) => continue, // Timeout, check circuit breaker again
191            };
192
193            // Process single trade (use borrowed reference per Issue #96 Task #78)
194            match self.process_single_trade(&trade).await {
195                Ok(bar_opt) => {
196                    self.circuit_breaker.record_success();
197
198                    // If bar completed, send with backpressure handling
199                    if let Some(bar) = bar_opt
200                        && let Err(e) = self.send_bar_with_backpressure(bar).await
201                    {
202                        println!("Failed to send bar: {:?}", e);
203                        self.circuit_breaker.record_failure();
204                    }
205                }
206                Err(e) => {
207                    println!("Trade processing error: {:?}", e);
208                    self.circuit_breaker.record_failure();
209                    self.metrics.errors_total.fetch_add(1, Ordering::Relaxed);
210                }
211            }
212        }
213
214        Ok(())
215    }
216
217    /// Process single trade - extracts completed bars without accumulation
218    // Issue #96 Task #78: Accept borrowed AggTrade reference
219    async fn process_single_trade(
220        &mut self,
221        trade: &AggTrade,
222    ) -> Result<Option<RangeBar>, StreamingError> {
223        // Update metrics
224        self.metrics
225            .trades_processed
226            .fetch_add(1, Ordering::Relaxed);
227
228        // Process trade using existing algorithm (single trade at a time)
229        // Note: Clone is acceptable here since this is the bounded-memory streaming processor
230        // (not the main hot path; LiveBarEngine uses non-cloning path via live_engine.rs)
231        self.processor.process_trades_continuously(&[trade.clone()]);
232
233        // Extract completed bars immediately (prevents accumulation)
234        let mut completed_bars = self.processor.get_all_completed_bars();
235
236        if !completed_bars.is_empty() {
237            // Bounded memory: only return first completed bar
238            // Additional bars would be rare edge cases but must be handled
239            let completed_bar = completed_bars.remove(0);
240
241            // Handle rare case of multiple completions
242            if !completed_bars.is_empty() {
243                println!(
244                    "Warning: {} additional bars completed, dropping for bounded memory",
245                    completed_bars.len()
246                );
247                self.metrics
248                    .backpressure_events
249                    .fetch_add(completed_bars.len() as u64, Ordering::Relaxed);
250            }
251
252            self.metrics.bars_generated.fetch_add(1, Ordering::Relaxed);
253            Ok(Some(completed_bar))
254        } else {
255            Ok(None)
256        }
257    }
258
259    /// Send bar with backpressure handling
260    async fn send_bar_with_backpressure(&self, bar: RangeBar) -> Result<(), StreamingError> {
261        // Use try_send for immediate check, then send for blocking
262        match self.bar_sender.try_send(bar.clone()) {
263            Ok(()) => Ok(()),
264            Err(mpsc::error::TrySendError::Full(_)) => {
265                // Apply backpressure - channel is full
266                println!("Bar channel full, applying backpressure");
267                self.metrics
268                    .backpressure_events
269                    .fetch_add(1, Ordering::Relaxed);
270
271                // Wait for capacity with blocking send
272                self.bar_sender
273                    .send(bar)
274                    .await
275                    .map_err(|_| StreamingError::ChannelClosed)
276            }
277            Err(mpsc::error::TrySendError::Closed(_)) => Err(StreamingError::ChannelClosed),
278        }
279    }
280
281    /// Get current metrics
282    pub fn metrics(&self) -> &StreamingMetrics {
283        &self.metrics
284    }
285
286    /// Extract final incomplete bar when stream ends (for algorithmic consistency)
287    pub fn get_final_incomplete_bar(&mut self) -> Option<RangeBar> {
288        self.processor.get_incomplete_bar()
289    }
290
291    /// Check memory usage against threshold
292    pub fn check_memory_usage(&self) -> bool {
293        let current_usage = self.metrics.memory_usage_bytes.load(Ordering::Relaxed);
294        current_usage < self.config.memory_threshold_bytes as u64
295    }
296}
297
298impl CircuitBreaker {
299    fn new(threshold: f64, timeout: Duration) -> Self {
300        Self {
301            state: CircuitBreakerState::Closed,
302            failure_count: 0,
303            success_count: 0,
304            last_failure_time: None,
305            threshold,
306            timeout,
307        }
308    }
309
310    fn can_process(&mut self) -> bool {
311        match self.state {
312            CircuitBreakerState::Closed => true,
313            CircuitBreakerState::Open => {
314                if let Some(last_failure) = self.last_failure_time {
315                    if last_failure.elapsed() > self.timeout {
316                        self.state = CircuitBreakerState::HalfOpen;
317                        true
318                    } else {
319                        false
320                    }
321                } else {
322                    true
323                }
324            }
325            CircuitBreakerState::HalfOpen => true,
326        }
327    }
328
329    fn record_success(&mut self) {
330        self.success_count += 1;
331
332        if self.state == CircuitBreakerState::HalfOpen {
333            // Successful request in half-open, close circuit
334            self.state = CircuitBreakerState::Closed;
335            self.failure_count = 0;
336        }
337    }
338
339    fn record_failure(&mut self) {
340        self.failure_count += 1;
341        self.last_failure_time = Some(Instant::now());
342
343        let total_requests = self.failure_count + self.success_count;
344        if total_requests >= 10 {
345            // Minimum sample size
346            let failure_rate = self.failure_count as f64 / total_requests as f64;
347
348            if failure_rate >= self.threshold {
349                self.state = CircuitBreakerState::Open;
350            }
351        }
352    }
353}
354
355/// Stream implementation for range bars (true streaming)
356pub struct RangeBarStream {
357    receiver: mpsc::Receiver<RangeBar>,
358}
359
360impl RangeBarStream {
361    pub fn new(receiver: mpsc::Receiver<RangeBar>) -> Self {
362        Self { receiver }
363    }
364}
365
366impl Stream for RangeBarStream {
367    type Item = Result<RangeBar, StreamingError>;
368
369    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
370        match self.receiver.poll_recv(cx) {
371            Poll::Ready(Some(bar)) => Poll::Ready(Some(Ok(bar))),
372            Poll::Ready(None) => Poll::Ready(None),
373            Poll::Pending => Poll::Pending,
374        }
375    }
376}
377
378/// Streaming errors
379#[derive(Debug, thiserror::Error)]
380pub enum StreamingError {
381    #[error("Channel closed")]
382    ChannelClosed,
383
384    #[error("Backpressure timeout")]
385    BackpressureTimeout,
386
387    #[error("Circuit breaker open")]
388    CircuitBreakerOpen,
389
390    #[error("Memory threshold exceeded")]
391    MemoryThresholdExceeded,
392
393    #[error("Processing error: {0}")]
394    ProcessingError(String),
395}
396
397impl StreamingMetrics {
398    /// Get metrics summary
399    pub fn summary(&self) -> MetricsSummary {
400        MetricsSummary {
401            trades_processed: self.trades_processed.load(Ordering::Relaxed),
402            bars_generated: self.bars_generated.load(Ordering::Relaxed),
403            errors_total: self.errors_total.load(Ordering::Relaxed),
404            backpressure_events: self.backpressure_events.load(Ordering::Relaxed),
405            circuit_breaker_trips: self.circuit_breaker_trips.load(Ordering::Relaxed),
406            memory_usage_bytes: self.memory_usage_bytes.load(Ordering::Relaxed),
407        }
408    }
409}
410
411/// Metrics snapshot
412#[derive(Debug, Clone)]
413pub struct MetricsSummary {
414    pub trades_processed: u64,
415    pub bars_generated: u64,
416    pub errors_total: u64,
417    pub backpressure_events: u64,
418    pub circuit_breaker_trips: u64,
419    pub memory_usage_bytes: u64,
420}
421
422impl MetricsSummary {
423    /// Calculate bars per aggTrade ratio
424    pub fn bars_per_aggtrade(&self) -> f64 {
425        if self.trades_processed > 0 {
426            self.bars_generated as f64 / self.trades_processed as f64
427        } else {
428            0.0
429        }
430    }
431
432    /// Calculate error rate
433    pub fn error_rate(&self) -> f64 {
434        if self.trades_processed > 0 {
435            self.errors_total as f64 / self.trades_processed as f64
436        } else {
437            0.0
438        }
439    }
440
441    /// Format memory usage
442    pub fn memory_usage_mb(&self) -> f64 {
443        self.memory_usage_bytes as f64 / 1_000_000.0
444    }
445}
446
447#[cfg(test)]
448mod tests {
449    use super::*;
450    use rangebar_core::FixedPoint;
451
452    fn create_test_trade(id: u64, price: f64, timestamp: u64) -> AggTrade {
453        let price_str = format!("{:.8}", price);
454        AggTrade {
455            agg_trade_id: id as i64,
456            price: FixedPoint::from_str(&price_str).unwrap(),
457            volume: FixedPoint::from_str("1.0").unwrap(),
458            first_trade_id: id as i64,
459            last_trade_id: id as i64,
460            timestamp: timestamp as i64,
461            is_buyer_maker: false,
462            is_best_match: None,
463        }
464    }
465
466    #[tokio::test]
467    async fn test_bounded_memory_streaming() {
468        let mut processor = StreamingProcessor::new(25).unwrap(); // 0.25% threshold
469
470        // Test that memory remains bounded
471        let initial_metrics = processor.metrics().summary();
472
473        // Send 1000 trades
474        for i in 0..1000 {
475            let trade = create_test_trade(i, 23000.0 + (i as f64), 1659312000000 + i);
476            if let Ok(bar_opt) = processor.process_single_trade(&trade).await {
477                // Verify no accumulation - at most one bar per aggTrade
478                assert!(bar_opt.is_none() || bar_opt.is_some());
479            }
480        }
481
482        let final_metrics = processor.metrics().summary();
483        assert!(final_metrics.trades_processed >= initial_metrics.trades_processed);
484        assert!(final_metrics.trades_processed <= 1000);
485    }
486
487    #[tokio::test]
488    async fn test_circuit_breaker() {
489        let mut circuit_breaker = CircuitBreaker::new(0.5, Duration::from_millis(100));
490
491        // Initially closed
492        assert!(circuit_breaker.can_process());
493
494        // Record failures
495        for _ in 0..20 {
496            circuit_breaker.record_failure();
497        }
498
499        // Should open after 50% failure rate
500        assert_eq!(circuit_breaker.state, CircuitBreakerState::Open);
501        assert!(!circuit_breaker.can_process());
502
503        // Wait for timeout
504        tokio::time::sleep(Duration::from_millis(150)).await;
505
506        // Should transition to half-open
507        assert!(circuit_breaker.can_process());
508
509        // Record success
510        circuit_breaker.record_success();
511
512        // Should close
513        assert_eq!(circuit_breaker.state, CircuitBreakerState::Closed);
514    }
515
516    // === Circuit Breaker State Machine Tests ===
517
518    #[test]
519    fn test_circuit_breaker_stays_closed_below_threshold() {
520        let mut cb = CircuitBreaker::new(0.5, Duration::from_secs(10));
521
522        // 8 successes, 2 failures = 20% failure rate, below 50% threshold
523        for _ in 0..8 {
524            cb.record_success();
525        }
526        for _ in 0..2 {
527            cb.record_failure();
528        }
529
530        // Should remain closed (20% < 50%)
531        assert_eq!(cb.state, CircuitBreakerState::Closed);
532        assert!(cb.can_process());
533    }
534
535    #[test]
536    fn test_circuit_breaker_minimum_sample_size() {
537        let mut cb = CircuitBreaker::new(0.5, Duration::from_secs(10));
538
539        // 9 failures, 0 successes = 100% failure rate, but only 9 requests (< 10 minimum)
540        for _ in 0..9 {
541            cb.record_failure();
542        }
543
544        // Should remain closed (minimum sample size not met)
545        assert_eq!(cb.state, CircuitBreakerState::Closed);
546        assert!(cb.can_process());
547
548        // 10th failure triggers open
549        cb.record_failure();
550        assert_eq!(cb.state, CircuitBreakerState::Open);
551    }
552
553    #[test]
554    fn test_circuit_breaker_halfopen_failure_reopens() {
555        let mut cb = CircuitBreaker::new(0.5, Duration::from_secs(0));
556
557        // Trip the breaker: 10 failures opens it
558        for _ in 0..10 {
559            cb.record_failure();
560        }
561        assert_eq!(cb.state, CircuitBreakerState::Open);
562
563        // Zero-second timeout → immediately transitions to HalfOpen on can_process
564        assert!(cb.can_process());
565        assert_eq!(cb.state, CircuitBreakerState::HalfOpen);
566
567        // Record failure in HalfOpen → should re-open
568        // (failure_count accumulates, total >= 10, rate >= threshold)
569        cb.record_failure();
570        assert_eq!(cb.state, CircuitBreakerState::Open);
571    }
572
573    #[test]
574    fn test_circuit_breaker_closed_resets_failure_count() {
575        let mut cb = CircuitBreaker::new(0.5, Duration::from_secs(0));
576
577        // Trip the breaker
578        for _ in 0..10 {
579            cb.record_failure();
580        }
581        assert_eq!(cb.state, CircuitBreakerState::Open);
582
583        // Transition to HalfOpen
584        assert!(cb.can_process());
585        assert_eq!(cb.state, CircuitBreakerState::HalfOpen);
586
587        // Record success → closes and resets failure_count
588        cb.record_success();
589        assert_eq!(cb.state, CircuitBreakerState::Closed);
590        assert_eq!(cb.failure_count, 0);
591    }
592
593    #[test]
594    fn test_circuit_breaker_open_blocks_until_timeout() {
595        let mut cb = CircuitBreaker::new(0.5, Duration::from_secs(3600)); // 1 hour timeout
596
597        // Trip the breaker
598        for _ in 0..10 {
599            cb.record_failure();
600        }
601
602        // Should be blocked — timeout hasn't elapsed
603        assert!(!cb.can_process());
604        assert_eq!(cb.state, CircuitBreakerState::Open);
605    }
606
607    #[test]
608    fn test_metrics_zero_trades() {
609        let metrics = MetricsSummary {
610            trades_processed: 0,
611            bars_generated: 0,
612            errors_total: 0,
613            backpressure_events: 0,
614            circuit_breaker_trips: 0,
615            memory_usage_bytes: 0,
616        };
617
618        // Division by zero guarded
619        assert_eq!(metrics.bars_per_aggtrade(), 0.0);
620        assert_eq!(metrics.error_rate(), 0.0);
621        assert_eq!(metrics.memory_usage_mb(), 0.0);
622    }
623
624    #[test]
625    fn test_metrics_calculations() {
626        let metrics = MetricsSummary {
627            trades_processed: 1000,
628            bars_generated: 50,
629            errors_total: 5,
630            backpressure_events: 2,
631            circuit_breaker_trips: 1,
632            memory_usage_bytes: 50_000_000,
633        };
634
635        assert_eq!(metrics.bars_per_aggtrade(), 0.05);
636        assert_eq!(metrics.error_rate(), 0.005);
637        assert_eq!(metrics.memory_usage_mb(), 50.0);
638    }
639
640    // === Metrics Snapshot & Take-Once Tests (Issue #96 Task #114) ===
641
642    #[test]
643    fn test_streaming_metrics_summary_snapshot() {
644        let metrics = StreamingMetrics::default();
645        metrics.trades_processed.store(500, Ordering::Relaxed);
646        metrics.bars_generated.store(25, Ordering::Relaxed);
647        metrics.errors_total.store(3, Ordering::Relaxed);
648        metrics.backpressure_events.store(1, Ordering::Relaxed);
649        metrics.circuit_breaker_trips.store(0, Ordering::Relaxed);
650        metrics.memory_usage_bytes.store(42_000_000, Ordering::Relaxed);
651
652        let summary = metrics.summary();
653
654        assert_eq!(summary.trades_processed, 500);
655        assert_eq!(summary.bars_generated, 25);
656        assert_eq!(summary.errors_total, 3);
657        assert_eq!(summary.backpressure_events, 1);
658        assert_eq!(summary.circuit_breaker_trips, 0);
659        assert_eq!(summary.memory_usage_bytes, 42_000_000);
660    }
661
662    #[test]
663    fn test_memory_usage_mb_conversion() {
664        // Exact MB boundary
665        let m1 = MetricsSummary {
666            trades_processed: 0, bars_generated: 0, errors_total: 0,
667            backpressure_events: 0, circuit_breaker_trips: 0,
668            memory_usage_bytes: 1_000_000,
669        };
670        assert_eq!(m1.memory_usage_mb(), 1.0);
671
672        // Fractional MB
673        let m2 = MetricsSummary {
674            trades_processed: 0, bars_generated: 0, errors_total: 0,
675            backpressure_events: 0, circuit_breaker_trips: 0,
676            memory_usage_bytes: 1_500_000,
677        };
678        assert_eq!(m2.memory_usage_mb(), 1.5);
679
680        // Large value (4 GB)
681        let m3 = MetricsSummary {
682            trades_processed: 0, bars_generated: 0, errors_total: 0,
683            backpressure_events: 0, circuit_breaker_trips: 0,
684            memory_usage_bytes: 4_000_000_000,
685        };
686        assert_eq!(m3.memory_usage_mb(), 4000.0);
687    }
688
689    #[test]
690    fn test_trade_sender_take_once() {
691        let mut processor = StreamingProcessor::new(25).unwrap();
692
693        // First call returns Some
694        let sender = processor.trade_sender();
695        assert!(sender.is_some(), "First trade_sender() call must return Some");
696
697        // Second call returns None (already taken)
698        let sender2 = processor.trade_sender();
699        assert!(sender2.is_none(), "Second trade_sender() call must return None");
700    }
701
702    #[test]
703    fn test_bar_receiver_take_once() {
704        let mut processor = StreamingProcessor::new(25).unwrap();
705
706        // First call returns Some
707        let receiver = processor.bar_receiver();
708        assert!(receiver.is_some(), "First bar_receiver() call must return Some");
709
710        // Second call returns None (already taken)
711        let receiver2 = processor.bar_receiver();
712        assert!(receiver2.is_none(), "Second bar_receiver() call must return None");
713    }
714
715    #[test]
716    fn test_check_memory_usage_below_threshold() {
717        let processor = StreamingProcessor::new(25).unwrap();
718
719        // Default: memory_usage_bytes = 0, threshold = 100MB → within bounds
720        assert!(processor.check_memory_usage(), "Zero memory usage should be within threshold");
721    }
722
723    #[test]
724    fn test_check_memory_usage_above_threshold() {
725        let processor = StreamingProcessor::new(25).unwrap();
726
727        // Simulate exceeding threshold (100MB default)
728        processor.metrics.memory_usage_bytes.store(200_000_000, Ordering::Relaxed);
729        assert!(!processor.check_memory_usage(), "200MB should exceed 100MB threshold");
730    }
731
732    #[test]
733    fn test_get_final_incomplete_bar_empty() {
734        let mut processor = StreamingProcessor::new(25).unwrap();
735
736        // No trades processed → no incomplete bar
737        let bar = processor.get_final_incomplete_bar();
738        assert!(bar.is_none(), "No incomplete bar before any trades");
739    }
740
741    #[test]
742    fn test_bars_per_aggtrade_ratio() {
743        let metrics = MetricsSummary {
744            trades_processed: 200,
745            bars_generated: 10,
746            errors_total: 0,
747            backpressure_events: 0,
748            circuit_breaker_trips: 0,
749            memory_usage_bytes: 0,
750        };
751
752        assert_eq!(metrics.bars_per_aggtrade(), 0.05);
753        assert_eq!(metrics.error_rate(), 0.0);
754    }
755}