1use futures::Stream;
2use rangebar_core::processor::ExportRangeBarProcessor;
10use rangebar_core::{AggTrade, RangeBar};
11use std::pin::Pin;
12use std::sync::Arc;
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::task::{Context, Poll};
15use tokio::sync::mpsc;
16use tokio::time::{Duration, Instant};
17
18#[derive(Debug, Clone)]
20pub struct StreamingProcessorConfig {
21 pub trade_channel_capacity: usize,
23 pub bar_channel_capacity: usize,
25 pub memory_threshold_bytes: usize,
27 pub backpressure_timeout: Duration,
29 pub circuit_breaker_threshold: f64,
31 pub circuit_breaker_timeout: Duration,
33}
34
35impl Default for StreamingProcessorConfig {
36 fn default() -> Self {
37 Self {
38 trade_channel_capacity: 5_000, bar_channel_capacity: 100, memory_threshold_bytes: 100_000_000, backpressure_timeout: Duration::from_millis(100),
42 circuit_breaker_threshold: 0.5, circuit_breaker_timeout: Duration::from_secs(30),
44 }
45 }
46}
47
48pub struct StreamingProcessor {
50 processor: ExportRangeBarProcessor,
52
53 #[allow(dead_code)]
55 threshold_decimal_bps: u32,
56
57 trade_sender: Option<mpsc::Sender<AggTrade>>,
59 trade_receiver: mpsc::Receiver<AggTrade>,
60
61 bar_sender: mpsc::Sender<RangeBar>,
63 bar_receiver: Option<mpsc::Receiver<RangeBar>>,
64
65 config: StreamingProcessorConfig,
67
68 metrics: Arc<StreamingMetrics>,
70
71 circuit_breaker: CircuitBreaker,
73}
74
75#[derive(Debug)]
77struct CircuitBreaker {
78 state: CircuitBreakerState,
79 failure_count: u64,
80 success_count: u64,
81 last_failure_time: Option<Instant>,
82 threshold: f64,
83 timeout: Duration,
84}
85
86#[derive(Debug, PartialEq)]
87enum CircuitBreakerState {
88 Closed,
89 Open,
90 HalfOpen,
91}
92
93#[derive(Debug, Default)]
95pub struct StreamingMetrics {
96 pub trades_processed: AtomicU64,
97 pub bars_generated: AtomicU64,
98 pub errors_total: AtomicU64,
99 pub backpressure_events: AtomicU64,
100 pub circuit_breaker_trips: AtomicU64,
101 pub memory_usage_bytes: AtomicU64,
102}
103
104impl StreamingProcessor {
105 pub fn new(
107 threshold_decimal_bps: u32,
108 ) -> Result<Self, rangebar_core::processor::ProcessingError> {
109 Self::with_config(threshold_decimal_bps, StreamingProcessorConfig::default())
110 }
111
112 pub fn with_config(
114 threshold_decimal_bps: u32,
115 config: StreamingProcessorConfig,
116 ) -> Result<Self, rangebar_core::processor::ProcessingError> {
117 let (trade_sender, trade_receiver) = mpsc::channel(config.trade_channel_capacity);
118 let (bar_sender, bar_receiver) = mpsc::channel(config.bar_channel_capacity);
119
120 let circuit_breaker_threshold = config.circuit_breaker_threshold;
121 let circuit_breaker_timeout = config.circuit_breaker_timeout;
122
123 Ok(Self {
124 processor: ExportRangeBarProcessor::new(threshold_decimal_bps)?,
125 threshold_decimal_bps,
126 trade_sender: Some(trade_sender),
127 trade_receiver,
128 bar_sender,
129 bar_receiver: Some(bar_receiver),
130 config,
131 metrics: Arc::new(StreamingMetrics::default()),
132 circuit_breaker: CircuitBreaker::new(
133 circuit_breaker_threshold,
134 circuit_breaker_timeout,
135 ),
136 })
137 }
138
139 pub fn trade_sender(&mut self) -> Option<mpsc::Sender<AggTrade>> {
141 self.trade_sender.take()
142 }
143
144 pub fn bar_receiver(&mut self) -> Option<mpsc::Receiver<RangeBar>> {
146 self.bar_receiver.take()
147 }
148
149 pub async fn start_processing(&mut self) -> Result<(), StreamingError> {
151 loop {
152 if !self.circuit_breaker.can_process() {
154 tokio::time::sleep(Duration::from_millis(100)).await;
155 continue;
156 }
157
158 let trade = match tokio::time::timeout(
160 self.config.backpressure_timeout,
161 self.trade_receiver.recv(),
162 )
163 .await
164 {
165 Ok(Some(trade)) => trade,
166 Ok(None) => {
167 if let Some(final_bar) = self.processor.get_incomplete_bar()
169 && let Err(e) = self.send_bar_with_backpressure(final_bar).await
170 {
171 println!("Failed to send final incomplete bar: {:?}", e);
172 }
173 break;
174 }
175 Err(_) => continue, };
177
178 match self.process_single_trade(trade).await {
180 Ok(bar_opt) => {
181 self.circuit_breaker.record_success();
182
183 if let Some(bar) = bar_opt
185 && let Err(e) = self.send_bar_with_backpressure(bar).await
186 {
187 println!("Failed to send bar: {:?}", e);
188 self.circuit_breaker.record_failure();
189 }
190 }
191 Err(e) => {
192 println!("Trade processing error: {:?}", e);
193 self.circuit_breaker.record_failure();
194 self.metrics.errors_total.fetch_add(1, Ordering::Relaxed);
195 }
196 }
197 }
198
199 Ok(())
200 }
201
202 async fn process_single_trade(
204 &mut self,
205 trade: AggTrade,
206 ) -> Result<Option<RangeBar>, StreamingError> {
207 self.metrics
209 .trades_processed
210 .fetch_add(1, Ordering::Relaxed);
211
212 self.processor.process_trades_continuously(&[trade]);
214
215 let mut completed_bars = self.processor.get_all_completed_bars();
217
218 if !completed_bars.is_empty() {
219 let completed_bar = completed_bars.remove(0);
222
223 if !completed_bars.is_empty() {
225 println!(
226 "Warning: {} additional bars completed, dropping for bounded memory",
227 completed_bars.len()
228 );
229 self.metrics
230 .backpressure_events
231 .fetch_add(completed_bars.len() as u64, Ordering::Relaxed);
232 }
233
234 self.metrics.bars_generated.fetch_add(1, Ordering::Relaxed);
235 Ok(Some(completed_bar))
236 } else {
237 Ok(None)
238 }
239 }
240
241 async fn send_bar_with_backpressure(&self, bar: RangeBar) -> Result<(), StreamingError> {
243 match self.bar_sender.try_send(bar.clone()) {
245 Ok(()) => Ok(()),
246 Err(mpsc::error::TrySendError::Full(_)) => {
247 println!("Bar channel full, applying backpressure");
249 self.metrics
250 .backpressure_events
251 .fetch_add(1, Ordering::Relaxed);
252
253 self.bar_sender
255 .send(bar)
256 .await
257 .map_err(|_| StreamingError::ChannelClosed)
258 }
259 Err(mpsc::error::TrySendError::Closed(_)) => Err(StreamingError::ChannelClosed),
260 }
261 }
262
263 pub fn metrics(&self) -> &StreamingMetrics {
265 &self.metrics
266 }
267
268 pub fn get_final_incomplete_bar(&mut self) -> Option<RangeBar> {
270 self.processor.get_incomplete_bar()
271 }
272
273 pub fn check_memory_usage(&self) -> bool {
275 let current_usage = self.metrics.memory_usage_bytes.load(Ordering::Relaxed);
276 current_usage < self.config.memory_threshold_bytes as u64
277 }
278}
279
280impl CircuitBreaker {
281 fn new(threshold: f64, timeout: Duration) -> Self {
282 Self {
283 state: CircuitBreakerState::Closed,
284 failure_count: 0,
285 success_count: 0,
286 last_failure_time: None,
287 threshold,
288 timeout,
289 }
290 }
291
292 fn can_process(&mut self) -> bool {
293 match self.state {
294 CircuitBreakerState::Closed => true,
295 CircuitBreakerState::Open => {
296 if let Some(last_failure) = self.last_failure_time {
297 if last_failure.elapsed() > self.timeout {
298 self.state = CircuitBreakerState::HalfOpen;
299 true
300 } else {
301 false
302 }
303 } else {
304 true
305 }
306 }
307 CircuitBreakerState::HalfOpen => true,
308 }
309 }
310
311 fn record_success(&mut self) {
312 self.success_count += 1;
313
314 if self.state == CircuitBreakerState::HalfOpen {
315 self.state = CircuitBreakerState::Closed;
317 self.failure_count = 0;
318 }
319 }
320
321 fn record_failure(&mut self) {
322 self.failure_count += 1;
323 self.last_failure_time = Some(Instant::now());
324
325 let total_requests = self.failure_count + self.success_count;
326 if total_requests >= 10 {
327 let failure_rate = self.failure_count as f64 / total_requests as f64;
329
330 if failure_rate >= self.threshold {
331 self.state = CircuitBreakerState::Open;
332 }
333 }
334 }
335}
336
337pub struct RangeBarStream {
339 receiver: mpsc::Receiver<RangeBar>,
340}
341
342impl RangeBarStream {
343 pub fn new(receiver: mpsc::Receiver<RangeBar>) -> Self {
344 Self { receiver }
345 }
346}
347
348impl Stream for RangeBarStream {
349 type Item = Result<RangeBar, StreamingError>;
350
351 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
352 match self.receiver.poll_recv(cx) {
353 Poll::Ready(Some(bar)) => Poll::Ready(Some(Ok(bar))),
354 Poll::Ready(None) => Poll::Ready(None),
355 Poll::Pending => Poll::Pending,
356 }
357 }
358}
359
360#[derive(Debug, thiserror::Error)]
362pub enum StreamingError {
363 #[error("Channel closed")]
364 ChannelClosed,
365
366 #[error("Backpressure timeout")]
367 BackpressureTimeout,
368
369 #[error("Circuit breaker open")]
370 CircuitBreakerOpen,
371
372 #[error("Memory threshold exceeded")]
373 MemoryThresholdExceeded,
374
375 #[error("Processing error: {0}")]
376 ProcessingError(String),
377}
378
379impl StreamingMetrics {
380 pub fn summary(&self) -> MetricsSummary {
382 MetricsSummary {
383 trades_processed: self.trades_processed.load(Ordering::Relaxed),
384 bars_generated: self.bars_generated.load(Ordering::Relaxed),
385 errors_total: self.errors_total.load(Ordering::Relaxed),
386 backpressure_events: self.backpressure_events.load(Ordering::Relaxed),
387 circuit_breaker_trips: self.circuit_breaker_trips.load(Ordering::Relaxed),
388 memory_usage_bytes: self.memory_usage_bytes.load(Ordering::Relaxed),
389 }
390 }
391}
392
393#[derive(Debug, Clone)]
395pub struct MetricsSummary {
396 pub trades_processed: u64,
397 pub bars_generated: u64,
398 pub errors_total: u64,
399 pub backpressure_events: u64,
400 pub circuit_breaker_trips: u64,
401 pub memory_usage_bytes: u64,
402}
403
404impl MetricsSummary {
405 pub fn bars_per_aggtrade(&self) -> f64 {
407 if self.trades_processed > 0 {
408 self.bars_generated as f64 / self.trades_processed as f64
409 } else {
410 0.0
411 }
412 }
413
414 pub fn error_rate(&self) -> f64 {
416 if self.trades_processed > 0 {
417 self.errors_total as f64 / self.trades_processed as f64
418 } else {
419 0.0
420 }
421 }
422
423 pub fn memory_usage_mb(&self) -> f64 {
425 self.memory_usage_bytes as f64 / 1_000_000.0
426 }
427}
428
429#[cfg(test)]
430mod tests {
431 use super::*;
432 use rangebar_core::FixedPoint;
433
434 fn create_test_trade(id: u64, price: f64, timestamp: u64) -> AggTrade {
435 let price_str = format!("{:.8}", price);
436 AggTrade {
437 agg_trade_id: id as i64,
438 price: FixedPoint::from_str(&price_str).unwrap(),
439 volume: FixedPoint::from_str("1.0").unwrap(),
440 first_trade_id: id as i64,
441 last_trade_id: id as i64,
442 timestamp: timestamp as i64,
443 is_buyer_maker: false,
444 is_best_match: None,
445 }
446 }
447
448 #[tokio::test]
449 async fn test_bounded_memory_streaming() {
450 let mut processor = StreamingProcessor::new(25).unwrap(); let initial_metrics = processor.metrics().summary();
454
455 for i in 0..1000 {
457 let trade = create_test_trade(i, 23000.0 + (i as f64), 1659312000000 + i);
458 if let Ok(bar_opt) = processor.process_single_trade(trade).await {
459 assert!(bar_opt.is_none() || bar_opt.is_some());
461 }
462 }
463
464 let final_metrics = processor.metrics().summary();
465 assert!(final_metrics.trades_processed >= initial_metrics.trades_processed);
466 assert!(final_metrics.trades_processed <= 1000);
467 }
468
469 #[tokio::test]
470 async fn test_circuit_breaker() {
471 let mut circuit_breaker = CircuitBreaker::new(0.5, Duration::from_millis(100));
472
473 assert!(circuit_breaker.can_process());
475
476 for _ in 0..20 {
478 circuit_breaker.record_failure();
479 }
480
481 assert_eq!(circuit_breaker.state, CircuitBreakerState::Open);
483 assert!(!circuit_breaker.can_process());
484
485 tokio::time::sleep(Duration::from_millis(150)).await;
487
488 assert!(circuit_breaker.can_process());
490
491 circuit_breaker.record_success();
493
494 assert_eq!(circuit_breaker.state, CircuitBreakerState::Closed);
496 }
497
498 #[test]
499 fn test_metrics_calculations() {
500 let metrics = MetricsSummary {
501 trades_processed: 1000,
502 bars_generated: 50,
503 errors_total: 5,
504 backpressure_events: 2,
505 circuit_breaker_trips: 1,
506 memory_usage_bytes: 50_000_000,
507 };
508
509 assert_eq!(metrics.bars_per_aggtrade(), 0.05);
510 assert_eq!(metrics.error_rate(), 0.005);
511 assert_eq!(metrics.memory_usage_mb(), 50.0);
512 }
513}