1use futures::Stream;
2use opendeviationbar_core::processor::ExportOpenDeviationBarProcessor;
11use opendeviationbar_core::{AggTrade, OpenDeviationBar};
12use std::pin::Pin;
13use std::sync::Arc;
14use std::sync::atomic::{AtomicU64, Ordering};
15use std::task::{Context, Poll};
16use tokio::sync::mpsc;
17use tokio::time::{Duration, Instant};
18
19#[derive(Debug, Clone)]
21pub struct StreamingProcessorConfig {
22 pub trade_channel_capacity: usize,
24 pub bar_channel_capacity: usize,
26 pub memory_threshold_bytes: usize,
28 pub backpressure_timeout: Duration,
30 pub circuit_breaker_threshold: f64,
32 pub circuit_breaker_timeout: Duration,
34}
35
36impl StreamingProcessorConfig {
37 fn get_bar_channel_capacity() -> usize {
40 std::env::var("OPENDEVIATIONBAR_MAX_PENDING_BARS")
41 .ok()
42 .and_then(|v| v.parse::<usize>().ok())
43 .unwrap_or(10_000)
44 }
45}
46
47impl Default for StreamingProcessorConfig {
48 fn default() -> Self {
49 Self {
50 trade_channel_capacity: 5_000, bar_channel_capacity: StreamingProcessorConfig::get_bar_channel_capacity(), memory_threshold_bytes: 100_000_000, backpressure_timeout: Duration::from_millis(100),
54 circuit_breaker_threshold: 0.5, circuit_breaker_timeout: Duration::from_secs(30),
56 }
57 }
58}
59
60pub struct StreamingProcessor {
62 processor: ExportOpenDeviationBarProcessor,
64
65 _threshold_decimal_bps: u32,
67
68 trade_sender: Option<mpsc::Sender<AggTrade>>,
70 trade_receiver: mpsc::Receiver<AggTrade>,
71
72 bar_sender: mpsc::Sender<OpenDeviationBar>,
74 bar_receiver: Option<mpsc::Receiver<OpenDeviationBar>>,
75
76 config: StreamingProcessorConfig,
78
79 metrics: Arc<StreamingMetrics>,
81
82 circuit_breaker: CircuitBreaker,
84}
85
86#[derive(Debug)]
88struct CircuitBreaker {
89 state: CircuitBreakerState,
90 failure_count: u64,
91 success_count: u64,
92 last_failure_time: Option<Instant>,
93 threshold: f64,
94 timeout: Duration,
95}
96
97#[derive(Debug, PartialEq)]
98enum CircuitBreakerState {
99 Closed,
100 Open,
101 HalfOpen,
102}
103
104#[derive(Debug, Default)]
107pub struct StreamingMetrics {
108 pub trades_processed: AtomicU64,
109 pub bars_generated: AtomicU64,
110 pub errors_total: AtomicU64,
111 pub backpressure_events: AtomicU64,
112 pub circuit_breaker_trips: AtomicU64,
113 pub memory_usage_bytes: AtomicU64,
114 pub max_queue_depth: AtomicU64, pub total_block_time_ms: AtomicU64, }
117
118impl StreamingProcessor {
119 pub fn new(
121 threshold_decimal_bps: u32,
122 ) -> Result<Self, opendeviationbar_core::processor::ProcessingError> {
123 Self::with_config(threshold_decimal_bps, StreamingProcessorConfig::default())
124 }
125
126 pub fn with_config(
128 threshold_decimal_bps: u32,
129 config: StreamingProcessorConfig,
130 ) -> Result<Self, opendeviationbar_core::processor::ProcessingError> {
131 let (trade_sender, trade_receiver) = mpsc::channel(config.trade_channel_capacity);
132 let (bar_sender, bar_receiver) = mpsc::channel(config.bar_channel_capacity);
133
134 let circuit_breaker_threshold = config.circuit_breaker_threshold;
135 let circuit_breaker_timeout = config.circuit_breaker_timeout;
136
137 Ok(Self {
138 processor: ExportOpenDeviationBarProcessor::new(threshold_decimal_bps)?,
139 _threshold_decimal_bps: threshold_decimal_bps,
140 trade_sender: Some(trade_sender),
141 trade_receiver,
142 bar_sender,
143 bar_receiver: Some(bar_receiver),
144 config,
145 metrics: Arc::new(StreamingMetrics::default()),
146 circuit_breaker: CircuitBreaker::new(
147 circuit_breaker_threshold,
148 circuit_breaker_timeout,
149 ),
150 })
151 }
152
153 pub fn trade_sender(&mut self) -> Option<mpsc::Sender<AggTrade>> {
155 self.trade_sender.take()
156 }
157
158 pub fn bar_receiver(&mut self) -> Option<mpsc::Receiver<OpenDeviationBar>> {
160 self.bar_receiver.take()
161 }
162
163 pub async fn start_processing(&mut self) -> Result<(), StreamingError> {
165 loop {
166 if !self.circuit_breaker.can_process() {
168 tokio::time::sleep(Duration::from_millis(100)).await;
169 continue;
170 }
171
172 let trade = match tokio::time::timeout(
174 self.config.backpressure_timeout,
175 self.trade_receiver.recv(),
176 )
177 .await
178 {
179 Ok(Some(trade)) => trade,
180 Ok(None) => {
181 if let Some(final_bar) = self.processor.get_incomplete_bar()
183 && let Err(e) = self.send_bar_with_backpressure(final_bar).await
184 {
185 println!("Failed to send final incomplete bar: {:?}", e);
186 }
187 break;
188 }
189 Err(_) => continue, };
191
192 match self.process_single_trade(&trade).await {
194 Ok(bar_opt) => {
195 self.circuit_breaker.record_success();
196
197 if let Some(bar) = bar_opt
199 && let Err(e) = self.send_bar_with_backpressure(bar).await
200 {
201 println!("Failed to send bar: {:?}", e);
202 self.circuit_breaker.record_failure();
203 }
204 }
205 Err(e) => {
206 println!("Trade processing error: {:?}", e);
207 self.circuit_breaker.record_failure();
208 self.metrics.errors_total.fetch_add(1, Ordering::Relaxed);
209 }
210 }
211 }
212
213 Ok(())
214 }
215
216 async fn process_single_trade(
219 &mut self,
220 trade: &AggTrade,
221 ) -> Result<Option<OpenDeviationBar>, StreamingError> {
222 self.metrics
224 .trades_processed
225 .fetch_add(1, Ordering::Relaxed);
226
227 self.processor
229 .process_trades_continuously(std::slice::from_ref(trade));
230
231 let mut completed_bars = self.processor.get_all_completed_bars();
233
234 if !completed_bars.is_empty() {
235 let completed_bar = completed_bars.remove(0);
238
239 if !completed_bars.is_empty() {
241 println!(
242 "Warning: {} additional bars completed, dropping for bounded memory",
243 completed_bars.len()
244 );
245 self.metrics
246 .backpressure_events
247 .fetch_add(completed_bars.len() as u64, Ordering::Relaxed);
248 }
249
250 self.metrics.bars_generated.fetch_add(1, Ordering::Relaxed);
251 Ok(Some(completed_bar))
252 } else {
253 Ok(None)
254 }
255 }
256
257 async fn send_bar_with_backpressure(
259 &self,
260 bar: OpenDeviationBar,
261 ) -> Result<(), StreamingError> {
262 match self.bar_sender.try_send(bar.clone()) {
264 Ok(()) => Ok(()),
265 Err(mpsc::error::TrySendError::Full(_)) => {
266 println!("Bar channel full, applying backpressure");
268 self.metrics
269 .backpressure_events
270 .fetch_add(1, Ordering::Relaxed);
271
272 self.bar_sender
274 .send(bar)
275 .await
276 .map_err(|_| StreamingError::ChannelClosed)
277 }
278 Err(mpsc::error::TrySendError::Closed(_)) => Err(StreamingError::ChannelClosed),
279 }
280 }
281
282 pub fn metrics(&self) -> &StreamingMetrics {
284 &self.metrics
285 }
286
287 pub fn get_final_incomplete_bar(&mut self) -> Option<OpenDeviationBar> {
289 self.processor.get_incomplete_bar()
290 }
291
292 pub fn check_memory_usage(&self) -> bool {
294 let current_usage = self.metrics.memory_usage_bytes.load(Ordering::Relaxed);
295 current_usage < self.config.memory_threshold_bytes as u64
296 }
297}
298
299impl CircuitBreaker {
300 fn new(threshold: f64, timeout: Duration) -> Self {
301 Self {
302 state: CircuitBreakerState::Closed,
303 failure_count: 0,
304 success_count: 0,
305 last_failure_time: None,
306 threshold,
307 timeout,
308 }
309 }
310
311 fn can_process(&mut self) -> bool {
312 match self.state {
313 CircuitBreakerState::Closed => true,
314 CircuitBreakerState::Open => {
315 if let Some(last_failure) = self.last_failure_time {
316 if last_failure.elapsed() >= self.timeout {
317 self.state = CircuitBreakerState::HalfOpen;
318 true
319 } else {
320 false
321 }
322 } else {
323 true
324 }
325 }
326 CircuitBreakerState::HalfOpen => true,
327 }
328 }
329
330 fn record_success(&mut self) {
331 self.success_count += 1;
332
333 if self.state == CircuitBreakerState::HalfOpen {
334 self.state = CircuitBreakerState::Closed;
336 self.failure_count = 0;
337 }
338 }
339
340 fn record_failure(&mut self) {
341 self.failure_count += 1;
342 self.last_failure_time = Some(Instant::now());
343
344 let total_requests = self.failure_count + self.success_count;
345 if total_requests >= 10 {
346 let failure_rate = self.failure_count as f64 / total_requests as f64;
348
349 if failure_rate >= self.threshold {
350 self.state = CircuitBreakerState::Open;
351 }
352 }
353 }
354}
355
356pub struct OpenDeviationBarStream {
358 receiver: mpsc::Receiver<OpenDeviationBar>,
359}
360
361impl OpenDeviationBarStream {
362 pub fn new(receiver: mpsc::Receiver<OpenDeviationBar>) -> Self {
363 Self { receiver }
364 }
365}
366
367impl Stream for OpenDeviationBarStream {
368 type Item = Result<OpenDeviationBar, StreamingError>;
369
370 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
371 match self.receiver.poll_recv(cx) {
372 Poll::Ready(Some(bar)) => Poll::Ready(Some(Ok(bar))),
373 Poll::Ready(None) => Poll::Ready(None),
374 Poll::Pending => Poll::Pending,
375 }
376 }
377}
378
379#[derive(Debug, thiserror::Error)]
381pub enum StreamingError {
382 #[error("Channel closed")]
383 ChannelClosed,
384
385 #[error("Backpressure timeout")]
386 BackpressureTimeout,
387
388 #[error("Circuit breaker open")]
389 CircuitBreakerOpen,
390
391 #[error("Memory threshold exceeded")]
392 MemoryThresholdExceeded,
393
394 #[error("Processing error: {0}")]
395 ProcessingError(String),
396}
397
398impl StreamingMetrics {
399 pub fn summary(&self) -> MetricsSummary {
401 MetricsSummary {
402 trades_processed: self.trades_processed.load(Ordering::Relaxed),
403 bars_generated: self.bars_generated.load(Ordering::Relaxed),
404 errors_total: self.errors_total.load(Ordering::Relaxed),
405 backpressure_events: self.backpressure_events.load(Ordering::Relaxed),
406 circuit_breaker_trips: self.circuit_breaker_trips.load(Ordering::Relaxed),
407 memory_usage_bytes: self.memory_usage_bytes.load(Ordering::Relaxed),
408 }
409 }
410}
411
412#[derive(Debug, Clone)]
414pub struct MetricsSummary {
415 pub trades_processed: u64,
416 pub bars_generated: u64,
417 pub errors_total: u64,
418 pub backpressure_events: u64,
419 pub circuit_breaker_trips: u64,
420 pub memory_usage_bytes: u64,
421}
422
423impl MetricsSummary {
424 pub fn bars_per_aggtrade(&self) -> f64 {
426 if self.trades_processed > 0 {
427 self.bars_generated as f64 / self.trades_processed as f64
428 } else {
429 0.0
430 }
431 }
432
433 pub fn error_rate(&self) -> f64 {
435 if self.trades_processed > 0 {
436 self.errors_total as f64 / self.trades_processed as f64
437 } else {
438 0.0
439 }
440 }
441
442 pub fn memory_usage_mb(&self) -> f64 {
444 self.memory_usage_bytes as f64 / 1_000_000.0
445 }
446}
447
448#[cfg(test)]
449mod tests {
450 use super::*;
451 use opendeviationbar_core::FixedPoint;
452
453 fn create_test_trade(id: u64, price: f64, timestamp: u64) -> AggTrade {
454 let price_str = format!("{:.8}", price);
455 AggTrade {
456 agg_trade_id: id as i64,
457 price: FixedPoint::from_str(&price_str).unwrap(),
458 volume: FixedPoint::from_str("1.0").unwrap(),
459 first_trade_id: id as i64,
460 last_trade_id: id as i64,
461 timestamp: timestamp as i64,
462 is_buyer_maker: false,
463 is_best_match: None,
464 }
465 }
466
467 #[tokio::test]
468 async fn test_bounded_memory_streaming() {
469 let mut processor = StreamingProcessor::new(25).unwrap(); let initial_metrics = processor.metrics().summary();
473
474 for i in 0..1000 {
476 let trade = create_test_trade(i, 23000.0 + (i as f64), 1659312000000 + i);
477 if let Ok(bar_opt) = processor.process_single_trade(&trade).await {
478 assert!(bar_opt.is_none() || bar_opt.is_some());
480 }
481 }
482
483 let final_metrics = processor.metrics().summary();
484 assert!(final_metrics.trades_processed >= initial_metrics.trades_processed);
485 assert!(final_metrics.trades_processed <= 1000);
486 }
487
488 #[tokio::test]
489 async fn test_circuit_breaker() {
490 let mut circuit_breaker = CircuitBreaker::new(0.5, Duration::from_millis(100));
491
492 assert!(circuit_breaker.can_process());
494
495 for _ in 0..20 {
497 circuit_breaker.record_failure();
498 }
499
500 assert_eq!(circuit_breaker.state, CircuitBreakerState::Open);
502 assert!(!circuit_breaker.can_process());
503
504 tokio::time::sleep(Duration::from_millis(150)).await;
506
507 assert!(circuit_breaker.can_process());
509
510 circuit_breaker.record_success();
512
513 assert_eq!(circuit_breaker.state, CircuitBreakerState::Closed);
515 }
516
517 #[test]
520 fn test_circuit_breaker_stays_closed_below_threshold() {
521 let mut cb = CircuitBreaker::new(0.5, Duration::from_secs(10));
522
523 for _ in 0..8 {
525 cb.record_success();
526 }
527 for _ in 0..2 {
528 cb.record_failure();
529 }
530
531 assert_eq!(cb.state, CircuitBreakerState::Closed);
533 assert!(cb.can_process());
534 }
535
536 #[test]
537 fn test_circuit_breaker_minimum_sample_size() {
538 let mut cb = CircuitBreaker::new(0.5, Duration::from_secs(10));
539
540 for _ in 0..9 {
542 cb.record_failure();
543 }
544
545 assert_eq!(cb.state, CircuitBreakerState::Closed);
547 assert!(cb.can_process());
548
549 cb.record_failure();
551 assert_eq!(cb.state, CircuitBreakerState::Open);
552 }
553
554 #[test]
555 fn test_circuit_breaker_halfopen_failure_reopens() {
556 let mut cb = CircuitBreaker::new(0.5, Duration::from_secs(0));
557
558 for _ in 0..10 {
560 cb.record_failure();
561 }
562 assert_eq!(cb.state, CircuitBreakerState::Open);
563
564 assert!(cb.can_process());
566 assert_eq!(cb.state, CircuitBreakerState::HalfOpen);
567
568 cb.record_failure();
571 assert_eq!(cb.state, CircuitBreakerState::Open);
572 }
573
574 #[test]
575 fn test_circuit_breaker_closed_resets_failure_count() {
576 let mut cb = CircuitBreaker::new(0.5, Duration::from_secs(0));
577
578 for _ in 0..10 {
580 cb.record_failure();
581 }
582 assert_eq!(cb.state, CircuitBreakerState::Open);
583
584 assert!(cb.can_process());
586 assert_eq!(cb.state, CircuitBreakerState::HalfOpen);
587
588 cb.record_success();
590 assert_eq!(cb.state, CircuitBreakerState::Closed);
591 assert_eq!(cb.failure_count, 0);
592 }
593
594 #[test]
595 fn test_circuit_breaker_open_blocks_until_timeout() {
596 let mut cb = CircuitBreaker::new(0.5, Duration::from_secs(3600)); for _ in 0..10 {
600 cb.record_failure();
601 }
602
603 assert!(!cb.can_process());
605 assert_eq!(cb.state, CircuitBreakerState::Open);
606 }
607
608 #[test]
609 fn test_metrics_zero_trades() {
610 let metrics = MetricsSummary {
611 trades_processed: 0,
612 bars_generated: 0,
613 errors_total: 0,
614 backpressure_events: 0,
615 circuit_breaker_trips: 0,
616 memory_usage_bytes: 0,
617 };
618
619 assert_eq!(metrics.bars_per_aggtrade(), 0.0);
621 assert_eq!(metrics.error_rate(), 0.0);
622 assert_eq!(metrics.memory_usage_mb(), 0.0);
623 }
624
625 #[test]
626 fn test_metrics_calculations() {
627 let metrics = MetricsSummary {
628 trades_processed: 1000,
629 bars_generated: 50,
630 errors_total: 5,
631 backpressure_events: 2,
632 circuit_breaker_trips: 1,
633 memory_usage_bytes: 50_000_000,
634 };
635
636 assert_eq!(metrics.bars_per_aggtrade(), 0.05);
637 assert_eq!(metrics.error_rate(), 0.005);
638 assert_eq!(metrics.memory_usage_mb(), 50.0);
639 }
640
641 #[test]
644 fn test_streaming_metrics_summary_snapshot() {
645 let metrics = StreamingMetrics::default();
646 metrics.trades_processed.store(500, Ordering::Relaxed);
647 metrics.bars_generated.store(25, Ordering::Relaxed);
648 metrics.errors_total.store(3, Ordering::Relaxed);
649 metrics.backpressure_events.store(1, Ordering::Relaxed);
650 metrics.circuit_breaker_trips.store(0, Ordering::Relaxed);
651 metrics
652 .memory_usage_bytes
653 .store(42_000_000, Ordering::Relaxed);
654
655 let summary = metrics.summary();
656
657 assert_eq!(summary.trades_processed, 500);
658 assert_eq!(summary.bars_generated, 25);
659 assert_eq!(summary.errors_total, 3);
660 assert_eq!(summary.backpressure_events, 1);
661 assert_eq!(summary.circuit_breaker_trips, 0);
662 assert_eq!(summary.memory_usage_bytes, 42_000_000);
663 }
664
665 #[test]
666 fn test_memory_usage_mb_conversion() {
667 let m1 = MetricsSummary {
669 trades_processed: 0,
670 bars_generated: 0,
671 errors_total: 0,
672 backpressure_events: 0,
673 circuit_breaker_trips: 0,
674 memory_usage_bytes: 1_000_000,
675 };
676 assert_eq!(m1.memory_usage_mb(), 1.0);
677
678 let m2 = MetricsSummary {
680 trades_processed: 0,
681 bars_generated: 0,
682 errors_total: 0,
683 backpressure_events: 0,
684 circuit_breaker_trips: 0,
685 memory_usage_bytes: 1_500_000,
686 };
687 assert_eq!(m2.memory_usage_mb(), 1.5);
688
689 let m3 = MetricsSummary {
691 trades_processed: 0,
692 bars_generated: 0,
693 errors_total: 0,
694 backpressure_events: 0,
695 circuit_breaker_trips: 0,
696 memory_usage_bytes: 4_000_000_000,
697 };
698 assert_eq!(m3.memory_usage_mb(), 4000.0);
699 }
700
701 #[test]
702 fn test_trade_sender_take_once() {
703 let mut processor = StreamingProcessor::new(25).unwrap();
704
705 let sender = processor.trade_sender();
707 assert!(
708 sender.is_some(),
709 "First trade_sender() call must return Some"
710 );
711
712 let sender2 = processor.trade_sender();
714 assert!(
715 sender2.is_none(),
716 "Second trade_sender() call must return None"
717 );
718 }
719
720 #[test]
721 fn test_bar_receiver_take_once() {
722 let mut processor = StreamingProcessor::new(25).unwrap();
723
724 let receiver = processor.bar_receiver();
726 assert!(
727 receiver.is_some(),
728 "First bar_receiver() call must return Some"
729 );
730
731 let receiver2 = processor.bar_receiver();
733 assert!(
734 receiver2.is_none(),
735 "Second bar_receiver() call must return None"
736 );
737 }
738
739 #[test]
740 fn test_check_memory_usage_below_threshold() {
741 let processor = StreamingProcessor::new(25).unwrap();
742
743 assert!(
745 processor.check_memory_usage(),
746 "Zero memory usage should be within threshold"
747 );
748 }
749
750 #[test]
751 fn test_check_memory_usage_above_threshold() {
752 let processor = StreamingProcessor::new(25).unwrap();
753
754 processor
756 .metrics
757 .memory_usage_bytes
758 .store(200_000_000, Ordering::Relaxed);
759 assert!(
760 !processor.check_memory_usage(),
761 "200MB should exceed 100MB threshold"
762 );
763 }
764
765 #[test]
766 fn test_get_final_incomplete_bar_empty() {
767 let mut processor = StreamingProcessor::new(25).unwrap();
768
769 let bar = processor.get_final_incomplete_bar();
771 assert!(bar.is_none(), "No incomplete bar before any trades");
772 }
773
774 #[test]
775 fn test_bars_per_aggtrade_ratio() {
776 let metrics = MetricsSummary {
777 trades_processed: 200,
778 bars_generated: 10,
779 errors_total: 0,
780 backpressure_events: 0,
781 circuit_breaker_trips: 0,
782 memory_usage_bytes: 0,
783 };
784
785 assert_eq!(metrics.bars_per_aggtrade(), 0.05);
786 assert_eq!(metrics.error_rate(), 0.0);
787 }
788}