1use futures::Stream;
2use rangebar_core::processor::ExportRangeBarProcessor;
11use rangebar_core::{AggTrade, RangeBar};
12use std::pin::Pin;
13use std::sync::Arc;
14use std::sync::atomic::{AtomicU64, Ordering};
15use std::task::{Context, Poll};
16use tokio::sync::mpsc;
17use tokio::time::{Duration, Instant};
18
/// Tunable settings for a `StreamingProcessor`.
///
/// `Default` supplies the stock values; individual fields can be overridden
/// with struct-update syntax.
#[derive(Debug, Clone)]
pub struct StreamingProcessorConfig {
    /// Capacity of the bounded channel that carries incoming trades.
    pub trade_channel_capacity: usize,
    /// Capacity of the bounded channel that carries completed bars.
    pub bar_channel_capacity: usize,
    /// Byte limit that `check_memory_usage` compares the memory gauge against.
    pub memory_threshold_bytes: usize,
    /// Longest time a single receive attempt on the trade channel may wait
    /// before the processing loop starts its next iteration.
    pub backpressure_timeout: Duration,
    /// Failure-rate fraction handed to the circuit breaker as its trip threshold.
    pub circuit_breaker_threshold: f64,
    /// Time the circuit breaker stays open before it lets a probe through.
    pub circuit_breaker_timeout: Duration,
}
35
36impl StreamingProcessorConfig {
37 fn get_bar_channel_capacity() -> usize {
40 std::env::var("RANGEBAR_MAX_PENDING_BARS")
41 .ok()
42 .and_then(|v| v.parse::<usize>().ok())
43 .unwrap_or(10_000)
44 }
45}
46
47impl Default for StreamingProcessorConfig {
48 fn default() -> Self {
49 Self {
50 trade_channel_capacity: 5_000, bar_channel_capacity: StreamingProcessorConfig::get_bar_channel_capacity(), memory_threshold_bytes: 100_000_000, backpressure_timeout: Duration::from_millis(100),
54 circuit_breaker_threshold: 0.5, circuit_breaker_timeout: Duration::from_secs(30),
56 }
57 }
58}
59
60pub struct StreamingProcessor {
62 processor: ExportRangeBarProcessor,
64
65 #[allow(dead_code)]
67 threshold_decimal_bps: u32,
68
69 trade_sender: Option<mpsc::Sender<AggTrade>>,
71 trade_receiver: mpsc::Receiver<AggTrade>,
72
73 bar_sender: mpsc::Sender<RangeBar>,
75 bar_receiver: Option<mpsc::Receiver<RangeBar>>,
76
77 config: StreamingProcessorConfig,
79
80 metrics: Arc<StreamingMetrics>,
82
83 circuit_breaker: CircuitBreaker,
85}
86
87#[derive(Debug)]
89struct CircuitBreaker {
90 state: CircuitBreakerState,
91 failure_count: u64,
92 success_count: u64,
93 last_failure_time: Option<Instant>,
94 threshold: f64,
95 timeout: Duration,
96}
97
/// Operating mode of the circuit breaker.
///
/// The enum is a plain tag, so it is `Copy` and `Eq` in addition to being
/// comparable and printable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CircuitBreakerState {
    /// Normal operation: processing is always allowed.
    Closed,
    /// Processing is refused until the configured timeout has passed since the
    /// last recorded failure.
    Open,
    /// Probing: processing is allowed, and the next success closes the breaker.
    HalfOpen,
}
104
/// Lock-free counters describing what a `StreamingProcessor` has done.
///
/// Every field starts at zero via `Default`.
#[derive(Debug, Default)]
pub struct StreamingMetrics {
    /// Number of trades handed to the wrapped processor.
    pub trades_processed: AtomicU64,
    /// Number of completed bars returned for publishing.
    pub bars_generated: AtomicU64,
    /// Number of trade-processing errors.
    pub errors_total: AtomicU64,
    /// Incremented when the bar channel is found full, and by the number of
    /// surplus bars dropped after a single trade.
    pub backpressure_events: AtomicU64,
    /// Circuit-breaker trip counter.
    pub circuit_breaker_trips: AtomicU64,
    /// Memory gauge in bytes; read by `check_memory_usage`.
    pub memory_usage_bytes: AtomicU64,
    /// Queue-depth high-water mark. NOTE(review): nothing in this file writes
    /// to it — confirm whether an external component does.
    pub max_queue_depth: AtomicU64,
    /// Accumulated blocking time in milliseconds. NOTE(review): nothing in
    /// this file writes to it — confirm whether an external component does.
    pub total_block_time_ms: AtomicU64,
}
118
119impl StreamingProcessor {
120 pub fn new(
122 threshold_decimal_bps: u32,
123 ) -> Result<Self, rangebar_core::processor::ProcessingError> {
124 Self::with_config(threshold_decimal_bps, StreamingProcessorConfig::default())
125 }
126
127 pub fn with_config(
129 threshold_decimal_bps: u32,
130 config: StreamingProcessorConfig,
131 ) -> Result<Self, rangebar_core::processor::ProcessingError> {
132 let (trade_sender, trade_receiver) = mpsc::channel(config.trade_channel_capacity);
133 let (bar_sender, bar_receiver) = mpsc::channel(config.bar_channel_capacity);
134
135 let circuit_breaker_threshold = config.circuit_breaker_threshold;
136 let circuit_breaker_timeout = config.circuit_breaker_timeout;
137
138 Ok(Self {
139 processor: ExportRangeBarProcessor::new(threshold_decimal_bps)?,
140 threshold_decimal_bps,
141 trade_sender: Some(trade_sender),
142 trade_receiver,
143 bar_sender,
144 bar_receiver: Some(bar_receiver),
145 config,
146 metrics: Arc::new(StreamingMetrics::default()),
147 circuit_breaker: CircuitBreaker::new(
148 circuit_breaker_threshold,
149 circuit_breaker_timeout,
150 ),
151 })
152 }
153
154 pub fn trade_sender(&mut self) -> Option<mpsc::Sender<AggTrade>> {
156 self.trade_sender.take()
157 }
158
159 pub fn bar_receiver(&mut self) -> Option<mpsc::Receiver<RangeBar>> {
161 self.bar_receiver.take()
162 }
163
164 pub async fn start_processing(&mut self) -> Result<(), StreamingError> {
166 loop {
167 if !self.circuit_breaker.can_process() {
169 tokio::time::sleep(Duration::from_millis(100)).await;
170 continue;
171 }
172
173 let trade = match tokio::time::timeout(
175 self.config.backpressure_timeout,
176 self.trade_receiver.recv(),
177 )
178 .await
179 {
180 Ok(Some(trade)) => trade,
181 Ok(None) => {
182 if let Some(final_bar) = self.processor.get_incomplete_bar()
184 && let Err(e) = self.send_bar_with_backpressure(final_bar).await
185 {
186 println!("Failed to send final incomplete bar: {:?}", e);
187 }
188 break;
189 }
190 Err(_) => continue, };
192
193 match self.process_single_trade(&trade).await {
195 Ok(bar_opt) => {
196 self.circuit_breaker.record_success();
197
198 if let Some(bar) = bar_opt
200 && let Err(e) = self.send_bar_with_backpressure(bar).await
201 {
202 println!("Failed to send bar: {:?}", e);
203 self.circuit_breaker.record_failure();
204 }
205 }
206 Err(e) => {
207 println!("Trade processing error: {:?}", e);
208 self.circuit_breaker.record_failure();
209 self.metrics.errors_total.fetch_add(1, Ordering::Relaxed);
210 }
211 }
212 }
213
214 Ok(())
215 }
216
217 async fn process_single_trade(
220 &mut self,
221 trade: &AggTrade,
222 ) -> Result<Option<RangeBar>, StreamingError> {
223 self.metrics
225 .trades_processed
226 .fetch_add(1, Ordering::Relaxed);
227
228 self.processor.process_trades_continuously(&[trade.clone()]);
232
233 let mut completed_bars = self.processor.get_all_completed_bars();
235
236 if !completed_bars.is_empty() {
237 let completed_bar = completed_bars.remove(0);
240
241 if !completed_bars.is_empty() {
243 println!(
244 "Warning: {} additional bars completed, dropping for bounded memory",
245 completed_bars.len()
246 );
247 self.metrics
248 .backpressure_events
249 .fetch_add(completed_bars.len() as u64, Ordering::Relaxed);
250 }
251
252 self.metrics.bars_generated.fetch_add(1, Ordering::Relaxed);
253 Ok(Some(completed_bar))
254 } else {
255 Ok(None)
256 }
257 }
258
259 async fn send_bar_with_backpressure(&self, bar: RangeBar) -> Result<(), StreamingError> {
261 match self.bar_sender.try_send(bar.clone()) {
263 Ok(()) => Ok(()),
264 Err(mpsc::error::TrySendError::Full(_)) => {
265 println!("Bar channel full, applying backpressure");
267 self.metrics
268 .backpressure_events
269 .fetch_add(1, Ordering::Relaxed);
270
271 self.bar_sender
273 .send(bar)
274 .await
275 .map_err(|_| StreamingError::ChannelClosed)
276 }
277 Err(mpsc::error::TrySendError::Closed(_)) => Err(StreamingError::ChannelClosed),
278 }
279 }
280
281 pub fn metrics(&self) -> &StreamingMetrics {
283 &self.metrics
284 }
285
286 pub fn get_final_incomplete_bar(&mut self) -> Option<RangeBar> {
288 self.processor.get_incomplete_bar()
289 }
290
291 pub fn check_memory_usage(&self) -> bool {
293 let current_usage = self.metrics.memory_usage_bytes.load(Ordering::Relaxed);
294 current_usage < self.config.memory_threshold_bytes as u64
295 }
296}
297
298impl CircuitBreaker {
299 fn new(threshold: f64, timeout: Duration) -> Self {
300 Self {
301 state: CircuitBreakerState::Closed,
302 failure_count: 0,
303 success_count: 0,
304 last_failure_time: None,
305 threshold,
306 timeout,
307 }
308 }
309
310 fn can_process(&mut self) -> bool {
311 match self.state {
312 CircuitBreakerState::Closed => true,
313 CircuitBreakerState::Open => {
314 if let Some(last_failure) = self.last_failure_time {
315 if last_failure.elapsed() > self.timeout {
316 self.state = CircuitBreakerState::HalfOpen;
317 true
318 } else {
319 false
320 }
321 } else {
322 true
323 }
324 }
325 CircuitBreakerState::HalfOpen => true,
326 }
327 }
328
329 fn record_success(&mut self) {
330 self.success_count += 1;
331
332 if self.state == CircuitBreakerState::HalfOpen {
333 self.state = CircuitBreakerState::Closed;
335 self.failure_count = 0;
336 }
337 }
338
339 fn record_failure(&mut self) {
340 self.failure_count += 1;
341 self.last_failure_time = Some(Instant::now());
342
343 let total_requests = self.failure_count + self.success_count;
344 if total_requests >= 10 {
345 let failure_rate = self.failure_count as f64 / total_requests as f64;
347
348 if failure_rate >= self.threshold {
349 self.state = CircuitBreakerState::Open;
350 }
351 }
352 }
353}
354
355pub struct RangeBarStream {
357 receiver: mpsc::Receiver<RangeBar>,
358}
359
360impl RangeBarStream {
361 pub fn new(receiver: mpsc::Receiver<RangeBar>) -> Self {
362 Self { receiver }
363 }
364}
365
366impl Stream for RangeBarStream {
367 type Item = Result<RangeBar, StreamingError>;
368
369 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
370 match self.receiver.poll_recv(cx) {
371 Poll::Ready(Some(bar)) => Poll::Ready(Some(Ok(bar))),
372 Poll::Ready(None) => Poll::Ready(None),
373 Poll::Pending => Poll::Pending,
374 }
375 }
376}
377
378#[derive(Debug, thiserror::Error)]
380pub enum StreamingError {
381 #[error("Channel closed")]
382 ChannelClosed,
383
384 #[error("Backpressure timeout")]
385 BackpressureTimeout,
386
387 #[error("Circuit breaker open")]
388 CircuitBreakerOpen,
389
390 #[error("Memory threshold exceeded")]
391 MemoryThresholdExceeded,
392
393 #[error("Processing error: {0}")]
394 ProcessingError(String),
395}
396
397impl StreamingMetrics {
398 pub fn summary(&self) -> MetricsSummary {
400 MetricsSummary {
401 trades_processed: self.trades_processed.load(Ordering::Relaxed),
402 bars_generated: self.bars_generated.load(Ordering::Relaxed),
403 errors_total: self.errors_total.load(Ordering::Relaxed),
404 backpressure_events: self.backpressure_events.load(Ordering::Relaxed),
405 circuit_breaker_trips: self.circuit_breaker_trips.load(Ordering::Relaxed),
406 memory_usage_bytes: self.memory_usage_bytes.load(Ordering::Relaxed),
407 }
408 }
409}
410
/// Plain-value copy of the counters held in `StreamingMetrics`.
#[derive(Debug, Clone)]
pub struct MetricsSummary {
    /// Number of trades handed to the wrapped processor.
    pub trades_processed: u64,
    /// Number of completed bars returned for publishing.
    pub bars_generated: u64,
    /// Number of trade-processing errors.
    pub errors_total: u64,
    /// Number of backpressure events.
    pub backpressure_events: u64,
    /// Circuit-breaker trip counter.
    pub circuit_breaker_trips: u64,
    /// Memory gauge in bytes.
    pub memory_usage_bytes: u64,
}
421
422impl MetricsSummary {
423 pub fn bars_per_aggtrade(&self) -> f64 {
425 if self.trades_processed > 0 {
426 self.bars_generated as f64 / self.trades_processed as f64
427 } else {
428 0.0
429 }
430 }
431
432 pub fn error_rate(&self) -> f64 {
434 if self.trades_processed > 0 {
435 self.errors_total as f64 / self.trades_processed as f64
436 } else {
437 0.0
438 }
439 }
440
441 pub fn memory_usage_mb(&self) -> f64 {
443 self.memory_usage_bytes as f64 / 1_000_000.0
444 }
445}
446
#[cfg(test)]
mod tests {
    use super::*;
    use rangebar_core::FixedPoint;

    /// Builds a unit-volume trade whose three id fields all equal `id`.
    fn create_test_trade(id: u64, price: f64, timestamp: u64) -> AggTrade {
        let price_str = format!("{:.8}", price);
        AggTrade {
            agg_trade_id: id as i64,
            price: FixedPoint::from_str(&price_str).unwrap(),
            volume: FixedPoint::from_str("1.0").unwrap(),
            first_trade_id: id as i64,
            last_trade_id: id as i64,
            timestamp: timestamp as i64,
            is_buyer_maker: false,
            is_best_match: None,
        }
    }

    /// Builds a summary from the fields the derived-metric helpers read; the
    /// remaining counters are zero.
    fn summary_of(
        trades_processed: u64,
        bars_generated: u64,
        errors_total: u64,
        memory_usage_bytes: u64,
    ) -> MetricsSummary {
        MetricsSummary {
            trades_processed,
            bars_generated,
            errors_total,
            backpressure_events: 0,
            circuit_breaker_trips: 0,
            memory_usage_bytes,
        }
    }

    /// Returns a breaker (threshold 0.5) that has just opened after ten
    /// consecutive failures.
    fn tripped_breaker(timeout: Duration) -> CircuitBreaker {
        let mut cb = CircuitBreaker::new(0.5, timeout);
        for _ in 0..10 {
            cb.record_failure();
        }
        assert_eq!(cb.state, CircuitBreakerState::Open);
        cb
    }

    #[tokio::test]
    async fn test_bounded_memory_streaming() {
        let mut processor = StreamingProcessor::new(25).unwrap();
        let initial_metrics = processor.metrics().summary();
        assert_eq!(initial_metrics.trades_processed, 0);
        assert_eq!(initial_metrics.bars_generated, 0);

        let mut bars_returned = 0u64;
        for i in 0..1000u64 {
            let trade = create_test_trade(i, 23000.0 + (i as f64), 1659312000000 + i);
            let bar_opt = processor
                .process_single_trade(&trade)
                .await
                .expect("processing a single trade must not fail");
            if bar_opt.is_some() {
                bars_returned += 1;
            }
        }

        // Every call counts exactly one trade, and one bar per returned `Some`.
        let final_metrics = processor.metrics().summary();
        assert_eq!(final_metrics.trades_processed, 1000);
        assert_eq!(final_metrics.bars_generated, bars_returned);
        assert_eq!(final_metrics.errors_total, 0);
    }

    #[tokio::test]
    async fn test_circuit_breaker() {
        let mut circuit_breaker = CircuitBreaker::new(0.5, Duration::from_millis(100));

        assert!(circuit_breaker.can_process());

        for _ in 0..20 {
            circuit_breaker.record_failure();
        }

        assert_eq!(circuit_breaker.state, CircuitBreakerState::Open);
        assert!(!circuit_breaker.can_process());

        // Wait past the 100 ms open timeout so a probe is allowed.
        tokio::time::sleep(Duration::from_millis(150)).await;

        assert!(circuit_breaker.can_process());

        circuit_breaker.record_success();

        assert_eq!(circuit_breaker.state, CircuitBreakerState::Closed);
    }

    #[test]
    fn test_circuit_breaker_stays_closed_below_threshold() {
        let mut cb = CircuitBreaker::new(0.5, Duration::from_secs(10));

        for _ in 0..8 {
            cb.record_success();
        }
        for _ in 0..2 {
            cb.record_failure();
        }

        // 2 failures out of 10 outcomes is below the 0.5 threshold.
        assert_eq!(cb.state, CircuitBreakerState::Closed);
        assert!(cb.can_process());
    }

    #[test]
    fn test_circuit_breaker_minimum_sample_size() {
        let mut cb = CircuitBreaker::new(0.5, Duration::from_secs(10));

        for _ in 0..9 {
            cb.record_failure();
        }

        // Nine outcomes are not enough for the failure rate to be evaluated.
        assert_eq!(cb.state, CircuitBreakerState::Closed);
        assert!(cb.can_process());

        cb.record_failure();
        assert_eq!(cb.state, CircuitBreakerState::Open);
    }

    #[test]
    fn test_circuit_breaker_halfopen_failure_reopens() {
        let mut cb = tripped_breaker(Duration::from_secs(0));

        assert!(cb.can_process());
        assert_eq!(cb.state, CircuitBreakerState::HalfOpen);

        cb.record_failure();
        assert_eq!(cb.state, CircuitBreakerState::Open);
    }

    #[test]
    fn test_circuit_breaker_closed_resets_failure_count() {
        let mut cb = tripped_breaker(Duration::from_secs(0));

        assert!(cb.can_process());
        assert_eq!(cb.state, CircuitBreakerState::HalfOpen);

        cb.record_success();
        assert_eq!(cb.state, CircuitBreakerState::Closed);
        assert_eq!(cb.failure_count, 0);
    }

    #[test]
    fn test_circuit_breaker_open_blocks_until_timeout() {
        let mut cb = tripped_breaker(Duration::from_secs(3600));

        assert!(!cb.can_process());
        assert_eq!(cb.state, CircuitBreakerState::Open);
    }

    #[test]
    fn test_metrics_zero_trades() {
        let metrics = summary_of(0, 0, 0, 0);

        assert_eq!(metrics.bars_per_aggtrade(), 0.0);
        assert_eq!(metrics.error_rate(), 0.0);
        assert_eq!(metrics.memory_usage_mb(), 0.0);
    }

    #[test]
    fn test_metrics_calculations() {
        let metrics = MetricsSummary {
            trades_processed: 1000,
            bars_generated: 50,
            errors_total: 5,
            backpressure_events: 2,
            circuit_breaker_trips: 1,
            memory_usage_bytes: 50_000_000,
        };

        assert_eq!(metrics.bars_per_aggtrade(), 0.05);
        assert_eq!(metrics.error_rate(), 0.005);
        assert_eq!(metrics.memory_usage_mb(), 50.0);
    }

    #[test]
    fn test_streaming_metrics_summary_snapshot() {
        let metrics = StreamingMetrics::default();
        metrics.trades_processed.store(500, Ordering::Relaxed);
        metrics.bars_generated.store(25, Ordering::Relaxed);
        metrics.errors_total.store(3, Ordering::Relaxed);
        metrics.backpressure_events.store(1, Ordering::Relaxed);
        metrics.circuit_breaker_trips.store(0, Ordering::Relaxed);
        metrics.memory_usage_bytes.store(42_000_000, Ordering::Relaxed);

        let summary = metrics.summary();

        assert_eq!(summary.trades_processed, 500);
        assert_eq!(summary.bars_generated, 25);
        assert_eq!(summary.errors_total, 3);
        assert_eq!(summary.backpressure_events, 1);
        assert_eq!(summary.circuit_breaker_trips, 0);
        assert_eq!(summary.memory_usage_bytes, 42_000_000);
    }

    #[test]
    fn test_memory_usage_mb_conversion() {
        assert_eq!(summary_of(0, 0, 0, 1_000_000).memory_usage_mb(), 1.0);
        assert_eq!(summary_of(0, 0, 0, 1_500_000).memory_usage_mb(), 1.5);
        assert_eq!(summary_of(0, 0, 0, 4_000_000_000).memory_usage_mb(), 4000.0);
    }

    #[test]
    fn test_trade_sender_take_once() {
        let mut processor = StreamingProcessor::new(25).unwrap();

        let sender = processor.trade_sender();
        assert!(sender.is_some(), "First trade_sender() call must return Some");

        let sender2 = processor.trade_sender();
        assert!(sender2.is_none(), "Second trade_sender() call must return None");
    }

    #[test]
    fn test_bar_receiver_take_once() {
        let mut processor = StreamingProcessor::new(25).unwrap();

        let receiver = processor.bar_receiver();
        assert!(receiver.is_some(), "First bar_receiver() call must return Some");

        let receiver2 = processor.bar_receiver();
        assert!(receiver2.is_none(), "Second bar_receiver() call must return None");
    }

    #[test]
    fn test_check_memory_usage_below_threshold() {
        let processor = StreamingProcessor::new(25).unwrap();

        assert!(
            processor.check_memory_usage(),
            "Zero memory usage should be within threshold"
        );
    }

    #[test]
    fn test_check_memory_usage_above_threshold() {
        let processor = StreamingProcessor::new(25).unwrap();

        processor
            .metrics
            .memory_usage_bytes
            .store(200_000_000, Ordering::Relaxed);
        assert!(
            !processor.check_memory_usage(),
            "200MB should exceed 100MB threshold"
        );
    }

    #[test]
    fn test_get_final_incomplete_bar_empty() {
        let mut processor = StreamingProcessor::new(25).unwrap();

        let bar = processor.get_final_incomplete_bar();
        assert!(bar.is_none(), "No incomplete bar before any trades");
    }

    #[test]
    fn test_bars_per_aggtrade_ratio() {
        let metrics = summary_of(200, 10, 0, 0);

        assert_eq!(metrics.bars_per_aggtrade(), 0.05);
        assert_eq!(metrics.error_rate(), 0.0);
    }
}