1use futures::Stream;
2use rangebar_core::processor::ExportRangeBarProcessor;
10use rangebar_core::{AggTrade, RangeBar};
11use std::pin::Pin;
12use std::sync::Arc;
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::task::{Context, Poll};
15use tokio::sync::mpsc;
16use tokio::time::{Duration, Instant};
17
/// Tunable limits for a [`StreamingProcessor`].
///
/// Every field is public, so a caller can start from `Default::default()`
/// and override individual values with struct-update syntax.
#[derive(Debug, Clone)]
pub struct StreamingProcessorConfig {
    /// Capacity of the bounded channel that carries incoming trades.
    pub trade_channel_capacity: usize,

    /// Capacity of the bounded channel that carries completed bars.
    pub bar_channel_capacity: usize,

    /// Byte limit that `StreamingProcessor::check_memory_usage` compares the
    /// `memory_usage_bytes` metric against.
    pub memory_threshold_bytes: usize,

    /// Longest time one receive attempt on the trade channel may wait before
    /// the processing loop starts its next iteration.
    pub backpressure_timeout: Duration,

    /// Failure rate (failures divided by total recorded requests) at or above
    /// which the circuit breaker opens.
    pub circuit_breaker_threshold: f64,

    /// How long the circuit breaker stays open after the most recent failure
    /// before it lets a trial request through.
    pub circuit_breaker_timeout: Duration,
}
34
35impl Default for StreamingProcessorConfig {
36 fn default() -> Self {
37 Self {
38 trade_channel_capacity: 5_000, bar_channel_capacity: 100, memory_threshold_bytes: 100_000_000, backpressure_timeout: Duration::from_millis(100),
42 circuit_breaker_threshold: 0.5, circuit_breaker_timeout: Duration::from_secs(30),
44 }
45 }
46}
47
48pub struct StreamingProcessor {
50 processor: ExportRangeBarProcessor,
52
53 #[allow(dead_code)]
55 threshold_bps: u32,
56
57 trade_sender: Option<mpsc::Sender<AggTrade>>,
59 trade_receiver: mpsc::Receiver<AggTrade>,
60
61 bar_sender: mpsc::Sender<RangeBar>,
63 bar_receiver: Option<mpsc::Receiver<RangeBar>>,
64
65 config: StreamingProcessorConfig,
67
68 metrics: Arc<StreamingMetrics>,
70
71 circuit_breaker: CircuitBreaker,
73}
74
75#[derive(Debug)]
77struct CircuitBreaker {
78 state: CircuitBreakerState,
79 failure_count: u64,
80 success_count: u64,
81 last_failure_time: Option<Instant>,
82 threshold: f64,
83 timeout: Duration,
84}
85
/// The three positions of the circuit breaker.
#[derive(Debug, PartialEq)]
enum CircuitBreakerState {
    /// Requests flow normally.
    Closed,
    /// Requests are refused until the configured timeout has elapsed.
    Open,
    /// Trial mode entered after the timeout; a recorded success closes the
    /// breaker again.
    HalfOpen,
}
92
/// Lock-free counters describing what a [`StreamingProcessor`] has done.
///
/// All fields are atomics, so the struct can be updated through a shared
/// reference. `Default` starts every counter at zero.
#[derive(Debug, Default)]
pub struct StreamingMetrics {
    /// Trades handed to the range-bar processor.
    pub trades_processed: AtomicU64,
    /// Completed bars returned by per-trade processing.
    pub bars_generated: AtomicU64,
    /// Trade-processing calls that returned an error.
    pub errors_total: AtomicU64,
    /// Incremented once each time the bar channel is found full, and by the
    /// number of surplus bars dropped when one trade completes several bars.
    pub backpressure_events: AtomicU64,
    /// Counter for circuit-breaker trips.
    pub circuit_breaker_trips: AtomicU64,
    /// Memory usage gauge in bytes, read by `check_memory_usage`.
    // NOTE(review): no code visible in this file writes this gauge — confirm who updates it.
    pub memory_usage_bytes: AtomicU64,
}
103
104impl StreamingProcessor {
105 pub fn new(threshold_bps: u32) -> Result<Self, rangebar_core::processor::ProcessingError> {
107 Self::with_config(threshold_bps, StreamingProcessorConfig::default())
108 }
109
110 pub fn with_config(
112 threshold_bps: u32,
113 config: StreamingProcessorConfig,
114 ) -> Result<Self, rangebar_core::processor::ProcessingError> {
115 let (trade_sender, trade_receiver) = mpsc::channel(config.trade_channel_capacity);
116 let (bar_sender, bar_receiver) = mpsc::channel(config.bar_channel_capacity);
117
118 let circuit_breaker_threshold = config.circuit_breaker_threshold;
119 let circuit_breaker_timeout = config.circuit_breaker_timeout;
120
121 Ok(Self {
122 processor: ExportRangeBarProcessor::new(threshold_bps)?,
123 threshold_bps,
124 trade_sender: Some(trade_sender),
125 trade_receiver,
126 bar_sender,
127 bar_receiver: Some(bar_receiver),
128 config,
129 metrics: Arc::new(StreamingMetrics::default()),
130 circuit_breaker: CircuitBreaker::new(
131 circuit_breaker_threshold,
132 circuit_breaker_timeout,
133 ),
134 })
135 }
136
137 pub fn trade_sender(&mut self) -> Option<mpsc::Sender<AggTrade>> {
139 self.trade_sender.take()
140 }
141
142 pub fn bar_receiver(&mut self) -> Option<mpsc::Receiver<RangeBar>> {
144 self.bar_receiver.take()
145 }
146
147 pub async fn start_processing(&mut self) -> Result<(), StreamingError> {
149 loop {
150 if !self.circuit_breaker.can_process() {
152 tokio::time::sleep(Duration::from_millis(100)).await;
153 continue;
154 }
155
156 let trade = match tokio::time::timeout(
158 self.config.backpressure_timeout,
159 self.trade_receiver.recv(),
160 )
161 .await
162 {
163 Ok(Some(trade)) => trade,
164 Ok(None) => {
165 if let Some(final_bar) = self.processor.get_incomplete_bar()
167 && let Err(e) = self.send_bar_with_backpressure(final_bar).await
168 {
169 println!("Failed to send final incomplete bar: {:?}", e);
170 }
171 break;
172 }
173 Err(_) => continue, };
175
176 match self.process_single_trade(trade).await {
178 Ok(bar_opt) => {
179 self.circuit_breaker.record_success();
180
181 if let Some(bar) = bar_opt
183 && let Err(e) = self.send_bar_with_backpressure(bar).await
184 {
185 println!("Failed to send bar: {:?}", e);
186 self.circuit_breaker.record_failure();
187 }
188 }
189 Err(e) => {
190 println!("Trade processing error: {:?}", e);
191 self.circuit_breaker.record_failure();
192 self.metrics.errors_total.fetch_add(1, Ordering::Relaxed);
193 }
194 }
195 }
196
197 Ok(())
198 }
199
200 async fn process_single_trade(
202 &mut self,
203 trade: AggTrade,
204 ) -> Result<Option<RangeBar>, StreamingError> {
205 self.metrics
207 .trades_processed
208 .fetch_add(1, Ordering::Relaxed);
209
210 self.processor.process_trades_continuously(&[trade]);
212
213 let mut completed_bars = self.processor.get_all_completed_bars();
215
216 if !completed_bars.is_empty() {
217 let completed_bar = completed_bars.remove(0);
220
221 if !completed_bars.is_empty() {
223 println!(
224 "Warning: {} additional bars completed, dropping for bounded memory",
225 completed_bars.len()
226 );
227 self.metrics
228 .backpressure_events
229 .fetch_add(completed_bars.len() as u64, Ordering::Relaxed);
230 }
231
232 self.metrics.bars_generated.fetch_add(1, Ordering::Relaxed);
233 Ok(Some(completed_bar))
234 } else {
235 Ok(None)
236 }
237 }
238
239 async fn send_bar_with_backpressure(&self, bar: RangeBar) -> Result<(), StreamingError> {
241 match self.bar_sender.try_send(bar.clone()) {
243 Ok(()) => Ok(()),
244 Err(mpsc::error::TrySendError::Full(_)) => {
245 println!("Bar channel full, applying backpressure");
247 self.metrics
248 .backpressure_events
249 .fetch_add(1, Ordering::Relaxed);
250
251 self.bar_sender
253 .send(bar)
254 .await
255 .map_err(|_| StreamingError::ChannelClosed)
256 }
257 Err(mpsc::error::TrySendError::Closed(_)) => Err(StreamingError::ChannelClosed),
258 }
259 }
260
261 pub fn metrics(&self) -> &StreamingMetrics {
263 &self.metrics
264 }
265
266 pub fn get_final_incomplete_bar(&mut self) -> Option<RangeBar> {
268 self.processor.get_incomplete_bar()
269 }
270
271 pub fn check_memory_usage(&self) -> bool {
273 let current_usage = self.metrics.memory_usage_bytes.load(Ordering::Relaxed);
274 current_usage < self.config.memory_threshold_bytes as u64
275 }
276}
277
278impl CircuitBreaker {
279 fn new(threshold: f64, timeout: Duration) -> Self {
280 Self {
281 state: CircuitBreakerState::Closed,
282 failure_count: 0,
283 success_count: 0,
284 last_failure_time: None,
285 threshold,
286 timeout,
287 }
288 }
289
290 fn can_process(&mut self) -> bool {
291 match self.state {
292 CircuitBreakerState::Closed => true,
293 CircuitBreakerState::Open => {
294 if let Some(last_failure) = self.last_failure_time {
295 if last_failure.elapsed() > self.timeout {
296 self.state = CircuitBreakerState::HalfOpen;
297 true
298 } else {
299 false
300 }
301 } else {
302 true
303 }
304 }
305 CircuitBreakerState::HalfOpen => true,
306 }
307 }
308
309 fn record_success(&mut self) {
310 self.success_count += 1;
311
312 if self.state == CircuitBreakerState::HalfOpen {
313 self.state = CircuitBreakerState::Closed;
315 self.failure_count = 0;
316 }
317 }
318
319 fn record_failure(&mut self) {
320 self.failure_count += 1;
321 self.last_failure_time = Some(Instant::now());
322
323 let total_requests = self.failure_count + self.success_count;
324 if total_requests >= 10 {
325 let failure_rate = self.failure_count as f64 / total_requests as f64;
327
328 if failure_rate >= self.threshold {
329 self.state = CircuitBreakerState::Open;
330 }
331 }
332 }
333}
334
335pub struct RangeBarStream {
337 receiver: mpsc::Receiver<RangeBar>,
338}
339
340impl RangeBarStream {
341 pub fn new(receiver: mpsc::Receiver<RangeBar>) -> Self {
342 Self { receiver }
343 }
344}
345
346impl Stream for RangeBarStream {
347 type Item = Result<RangeBar, StreamingError>;
348
349 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
350 match self.receiver.poll_recv(cx) {
351 Poll::Ready(Some(bar)) => Poll::Ready(Some(Ok(bar))),
352 Poll::Ready(None) => Poll::Ready(None),
353 Poll::Pending => Poll::Pending,
354 }
355 }
356}
357
358#[derive(Debug, thiserror::Error)]
360pub enum StreamingError {
361 #[error("Channel closed")]
362 ChannelClosed,
363
364 #[error("Backpressure timeout")]
365 BackpressureTimeout,
366
367 #[error("Circuit breaker open")]
368 CircuitBreakerOpen,
369
370 #[error("Memory threshold exceeded")]
371 MemoryThresholdExceeded,
372
373 #[error("Processing error: {0}")]
374 ProcessingError(String),
375}
376
377impl StreamingMetrics {
378 pub fn summary(&self) -> MetricsSummary {
380 MetricsSummary {
381 trades_processed: self.trades_processed.load(Ordering::Relaxed),
382 bars_generated: self.bars_generated.load(Ordering::Relaxed),
383 errors_total: self.errors_total.load(Ordering::Relaxed),
384 backpressure_events: self.backpressure_events.load(Ordering::Relaxed),
385 circuit_breaker_trips: self.circuit_breaker_trips.load(Ordering::Relaxed),
386 memory_usage_bytes: self.memory_usage_bytes.load(Ordering::Relaxed),
387 }
388 }
389}
390
/// Plain-value copy of the streaming counters, as produced by
/// `StreamingMetrics::summary`.
#[derive(Debug, Clone)]
pub struct MetricsSummary {
    /// Value of the `trades_processed` counter when the summary was taken.
    pub trades_processed: u64,
    /// Value of the `bars_generated` counter when the summary was taken.
    pub bars_generated: u64,
    /// Value of the `errors_total` counter when the summary was taken.
    pub errors_total: u64,
    /// Value of the `backpressure_events` counter when the summary was taken.
    pub backpressure_events: u64,
    /// Value of the `circuit_breaker_trips` counter when the summary was taken.
    pub circuit_breaker_trips: u64,
    /// Value of the `memory_usage_bytes` gauge when the summary was taken.
    pub memory_usage_bytes: u64,
}
401
402impl MetricsSummary {
403 pub fn bars_per_aggtrade(&self) -> f64 {
405 if self.trades_processed > 0 {
406 self.bars_generated as f64 / self.trades_processed as f64
407 } else {
408 0.0
409 }
410 }
411
412 pub fn error_rate(&self) -> f64 {
414 if self.trades_processed > 0 {
415 self.errors_total as f64 / self.trades_processed as f64
416 } else {
417 0.0
418 }
419 }
420
421 pub fn memory_usage_mb(&self) -> f64 {
423 self.memory_usage_bytes as f64 / 1_000_000.0
424 }
425}
426
#[cfg(test)]
mod tests {
    use super::*;
    use rangebar_core::FixedPoint;

    /// Builds an aggregate trade with unit volume whose three id fields all
    /// equal `id`.
    fn create_test_trade(id: u64, price: f64, timestamp: u64) -> AggTrade {
        let price_str = format!("{:.8}", price);
        AggTrade {
            agg_trade_id: id as i64,
            price: FixedPoint::from_str(&price_str).unwrap(),
            volume: FixedPoint::from_str("1.0").unwrap(),
            first_trade_id: id as i64,
            last_trade_id: id as i64,
            timestamp: timestamp as i64,
            is_buyer_maker: false,
            is_best_match: None,
        }
    }

    /// Every trade must be counted exactly once, and `bars_generated` must
    /// match the number of bars actually returned.
    #[tokio::test]
    async fn test_bounded_memory_streaming() {
        const TRADE_COUNT: u64 = 1000;

        let mut processor = StreamingProcessor::new(25).unwrap();
        let initial_metrics = processor.metrics().summary();

        let mut bars_returned = 0u64;
        for i in 0..TRADE_COUNT {
            let trade = create_test_trade(i, 23000.0 + (i as f64), 1659312000000 + i);
            let bar_opt = processor
                .process_single_trade(trade)
                .await
                .expect("processing a well-formed trade should not fail");
            if bar_opt.is_some() {
                bars_returned += 1;
            }
        }

        let final_metrics = processor.metrics().summary();
        assert_eq!(
            final_metrics.trades_processed - initial_metrics.trades_processed,
            TRADE_COUNT
        );
        assert_eq!(
            final_metrics.bars_generated - initial_metrics.bars_generated,
            bars_returned
        );
    }

    /// Walks the breaker through Closed -> Open -> HalfOpen -> Closed.
    #[tokio::test]
    async fn test_circuit_breaker() {
        let mut circuit_breaker = CircuitBreaker::new(0.5, Duration::from_millis(100));

        // A fresh breaker is closed and lets work through.
        assert_eq!(circuit_breaker.state, CircuitBreakerState::Closed);
        assert!(circuit_breaker.can_process());

        // Twenty straight failures push the failure rate to 100 %.
        for _ in 0..20 {
            circuit_breaker.record_failure();
        }

        assert_eq!(circuit_breaker.state, CircuitBreakerState::Open);
        assert!(!circuit_breaker.can_process());

        // Wait past the 100 ms timeout so a trial request is allowed.
        tokio::time::sleep(Duration::from_millis(150)).await;

        assert!(circuit_breaker.can_process());
        assert_eq!(circuit_breaker.state, CircuitBreakerState::HalfOpen);

        // One success while half-open closes the breaker again.
        circuit_breaker.record_success();

        assert_eq!(circuit_breaker.state, CircuitBreakerState::Closed);
    }

    /// Checks the derived ratios on a hand-built summary.
    #[test]
    fn test_metrics_calculations() {
        let metrics = MetricsSummary {
            trades_processed: 1000,
            bars_generated: 50,
            errors_total: 5,
            backpressure_events: 2,
            circuit_breaker_trips: 1,
            memory_usage_bytes: 50_000_000,
        };

        // Compare floats with a tolerance rather than exact equality.
        assert!((metrics.bars_per_aggtrade() - 0.05).abs() < f64::EPSILON);
        assert!((metrics.error_rate() - 0.005).abs() < f64::EPSILON);
        assert!((metrics.memory_usage_mb() - 50.0).abs() < f64::EPSILON);
    }
}