Skip to main content

rangebar_streaming/
replay_buffer.rs

1//! Replay buffer for storing and replaying recent trade data
2//!
3//! This module provides a circular buffer that stores recent trades and allows
4//! replaying them at different speeds for testing and analysis.
5
6use rangebar_core::AggTrade;
7use std::collections::VecDeque;
8use std::sync::{Arc, Mutex};
9use std::time::{Duration, Instant};
10use tokio::time::sleep;
11use tokio_stream::Stream;
12
/// A circular buffer that stores recent trades and provides replay functionality.
///
/// The handle is cheap to clone: every clone points at the same
/// `Arc<Mutex<ReplayBufferInner>>`, so a trade pushed through one clone is
/// visible through all the others.
#[derive(Debug, Clone)]
pub struct ReplayBuffer {
    /// Shared, mutex-protected buffer state.
    inner: Arc<Mutex<ReplayBufferInner>>,
}
18
/// Mutable state behind the `ReplayBuffer` handle; always accessed through the mutex.
#[derive(Debug)]
struct ReplayBufferInner {
    /// Retention window: `push` evicts trades whose timestamp is older than the
    /// incoming trade's timestamp minus this span.
    capacity: Duration,
    /// Buffered trades; new trades are appended at the back and evicted from the front.
    trades: VecDeque<AggTrade>,
    /// Wall-clock instant of the first `push`; `None` until a trade has been pushed.
    /// NOTE(review): written by `push` but not read anywhere in this file — confirm
    /// it is still needed.
    start_time: Option<Instant>,
}
25
26impl ReplayBuffer {
27    /// Create a new replay buffer with the specified capacity
28    pub fn new(capacity: Duration) -> Self {
29        Self {
30            inner: Arc::new(Mutex::new(ReplayBufferInner {
31                capacity,
32                trades: VecDeque::new(),
33                start_time: None,
34            })),
35        }
36    }
37
38    /// Add a new trade to the buffer
39    pub fn push(&self, trade: AggTrade) {
40        let mut inner = self
41            .inner
42            .lock()
43            .unwrap_or_else(|poisoned| poisoned.into_inner());
44
45        // Set start time on first trade
46        if inner.start_time.is_none() {
47            inner.start_time = Some(Instant::now());
48        }
49
50        // Remove old trades beyond capacity (using microseconds)
51        let cutoff_timestamp = trade.timestamp - (inner.capacity.as_micros() as i64);
52
53        while let Some(front_trade) = inner.trades.front() {
54            if front_trade.timestamp < cutoff_timestamp {
55                inner.trades.pop_front();
56            } else {
57                break;
58            }
59        }
60
61        inner.trades.push_back(trade);
62    }
63
64    /// Get the number of trades currently in the buffer
65    pub fn len(&self) -> usize {
66        self.inner
67            .lock()
68            .unwrap_or_else(|poisoned| poisoned.into_inner())
69            .trades
70            .len()
71    }
72
73    /// Check if the buffer is empty
74    pub fn is_empty(&self) -> bool {
75        self.inner
76            .lock()
77            .unwrap_or_else(|poisoned| poisoned.into_inner())
78            .trades
79            .is_empty()
80    }
81
82    /// Get the time span of trades in the buffer
83    pub fn time_span(&self) -> Option<Duration> {
84        let inner = self
85            .inner
86            .lock()
87            .unwrap_or_else(|poisoned| poisoned.into_inner());
88        if let (Some(first), Some(last)) = (inner.trades.front(), inner.trades.back()) {
89            let span_microseconds = last.timestamp - first.timestamp;
90            if span_microseconds > 0 {
91                Some(Duration::from_micros(span_microseconds as u64))
92            } else {
93                None
94            }
95        } else {
96            None
97        }
98    }
99
100    /// Get trades from the buffer starting from N minutes ago
101    /// Issue #96 Task #73: Binary search for cutoff position + collect-only approach (3-8% speedup)
102    pub fn get_trades_from(&self, minutes_ago: u32) -> Vec<AggTrade> {
103        let inner = self
104            .inner
105            .lock()
106            .unwrap_or_else(|poisoned| poisoned.into_inner());
107
108        // Fast-path: empty buffer (1-2% speedup on empty/single-trade case)
109        if inner.trades.is_empty() {
110            return Vec::new();
111        }
112
113        // Calculate cutoff timestamp
114        let latest_timestamp = inner.trades.back().unwrap().timestamp;
115        let cutoff_timestamp = latest_timestamp - (minutes_ago as i64 * 60 * 1000);
116
117        // Issue #96 Task #73: Find first trade >= cutoff using early-exit linear scan
118        // (VecDeque doesn't have binary_search, so linear scan with early break is optimal)
119        let mut start_idx = 0;
120        for (idx, trade) in inner.trades.iter().enumerate() {
121            if trade.timestamp >= cutoff_timestamp {
122                start_idx = idx;
123                break;
124            }
125        }
126
127        // Collect trades from cutoff position forward, avoiding filter overhead
128        let mut result = Vec::new();
129        result.extend(inner.trades.iter().skip(start_idx).cloned());
130        result
131    }
132
133    /// Create a replay stream that emits trades at the specified speed
134    pub fn replay_from(&self, minutes_ago: u32, speed_multiplier: f32) -> ReplayStream {
135        let trades = self.get_trades_from(minutes_ago);
136        ReplayStream::new(trades, speed_multiplier)
137    }
138
139    /// Get statistics about the buffer
140    pub fn stats(&self) -> ReplayBufferStats {
141        let inner = self
142            .inner
143            .lock()
144            .unwrap_or_else(|poisoned| poisoned.into_inner());
145
146        let (first_timestamp, last_timestamp) =
147            if let (Some(first), Some(last)) = (inner.trades.front(), inner.trades.back()) {
148                (Some(first.timestamp), Some(last.timestamp))
149            } else {
150                (None, None)
151            };
152
153        ReplayBufferStats {
154            capacity: inner.capacity,
155            trade_count: inner.trades.len(),
156            first_timestamp,
157            last_timestamp,
158            memory_usage_bytes: inner.trades.len() * std::mem::size_of::<AggTrade>(),
159        }
160    }
161}
162
/// Statistics about the replay buffer, as a point-in-time snapshot.
#[derive(Debug, Clone)]
pub struct ReplayBufferStats {
    /// Retention window the buffer was created with.
    pub capacity: Duration,
    /// Number of trades held when the snapshot was taken.
    pub trade_count: usize,
    /// Timestamp of the trade at the front of the buffer; `None` if the buffer was empty.
    pub first_timestamp: Option<i64>,
    /// Timestamp of the trade at the back of the buffer; `None` if the buffer was empty.
    pub last_timestamp: Option<i64>,
    /// Estimate computed as `trade_count * size_of::<AggTrade>()`.
    pub memory_usage_bytes: usize,
}
172
173/// A stream that replays trades at a specified speed
174pub struct ReplayStream {
175    trades: Vec<AggTrade>,
176    current_index: usize,
177    speed_multiplier: f32,
178    base_timestamp: Option<i64>,
179    start_time: Option<Instant>,
180}
181
182impl ReplayStream {
183    /// Create a new replay stream
184    pub fn new(trades: Vec<AggTrade>, speed_multiplier: f32) -> Self {
185        let base_timestamp = trades.first().map(|t| t.timestamp);
186
187        Self {
188            trades,
189            current_index: 0,
190            speed_multiplier: speed_multiplier.max(0.1), // Minimum 0.1x speed
191            base_timestamp,
192            start_time: None,
193        }
194    }
195
196    /// Set the replay speed
197    pub fn set_speed(&mut self, speed_multiplier: f32) {
198        self.speed_multiplier = speed_multiplier.max(0.1);
199    }
200
201    /// Get the current replay speed
202    pub fn speed(&self) -> f32 {
203        self.speed_multiplier
204    }
205
206    /// Get the number of remaining trades
207    pub fn remaining(&self) -> usize {
208        self.trades.len().saturating_sub(self.current_index)
209    }
210
211    /// Get the total number of trades
212    pub fn total(&self) -> usize {
213        self.trades.len()
214    }
215
216    /// Get the progress as a percentage (0.0 to 1.0)
217    pub fn progress(&self) -> f32 {
218        if self.trades.is_empty() {
219            1.0
220        } else {
221            self.current_index as f32 / self.trades.len() as f32
222        }
223    }
224}
225
226impl Stream for ReplayStream {
227    type Item = AggTrade;
228
229    fn poll_next(
230        mut self: std::pin::Pin<&mut Self>,
231        cx: &mut std::task::Context<'_>,
232    ) -> std::task::Poll<Option<Self::Item>> {
233        // Check if we have more trades
234        if self.current_index >= self.trades.len() {
235            return std::task::Poll::Ready(None);
236        }
237
238        // Initialize start time if this is the first trade
239        if self.start_time.is_none() {
240            let current_trade = self.trades[self.current_index].clone();
241            self.start_time = Some(Instant::now());
242            self.current_index += 1;
243            return std::task::Poll::Ready(Some(current_trade));
244        }
245
246        let current_trade = &self.trades[self.current_index];
247
248        // Calculate when this trade should be emitted based on timestamp differences
249        if let (Some(base_timestamp), Some(start_time)) = (self.base_timestamp, self.start_time) {
250            let time_diff_microseconds = current_trade.timestamp - base_timestamp;
251            let real_time_diff = Duration::from_micros(time_diff_microseconds as u64);
252            let scaled_time_diff = Duration::from_micros(
253                (real_time_diff.as_micros() as f64 / self.speed_multiplier as f64) as u64,
254            );
255
256            let target_time = start_time + scaled_time_diff;
257            let now = Instant::now();
258
259            if now >= target_time {
260                let trade = current_trade.clone();
261                self.current_index += 1;
262                std::task::Poll::Ready(Some(trade))
263            } else {
264                // Set up a timer to wake us when it's time
265                let waker = cx.waker().clone();
266                let sleep_duration = target_time - now;
267
268                tokio::spawn(async move {
269                    sleep(sleep_duration).await;
270                    waker.wake();
271                });
272
273                std::task::Poll::Pending
274            }
275        } else {
276            // Fallback: emit immediately
277            let trade = current_trade.clone();
278            self.current_index += 1;
279            std::task::Poll::Ready(Some(trade))
280        }
281    }
282}
283
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    use rangebar_core::FixedPoint;

    /// 2024-01-01 00:00:00 UTC expressed in microseconds.
    const BASE_TIME: i64 = 1_704_067_200_000_000;

    /// Build a unit-volume trade with the given id, timestamp (microseconds) and price.
    fn create_test_trade(id: i64, timestamp: i64, price: f64) -> AggTrade {
        AggTrade {
            agg_trade_id: id,
            price: FixedPoint::from_str(&price.to_string()).unwrap(),
            volume: FixedPoint::from_str("1.0").unwrap(),
            first_trade_id: id,
            last_trade_id: id,
            timestamp,
            is_buyer_maker: false,
            is_best_match: None,
        }
    }

    #[test]
    fn test_replay_buffer_capacity() {
        let buffer = ReplayBuffer::new(Duration::from_secs(60)); // 1 minute capacity

        // Add trades spanning 2 minutes (1 trade per second).
        for i in 0..120 {
            buffer.push(create_test_trade(i, BASE_TIME + (i * 1_000_000), 50000.0));
        }

        // Only the last 60 seconds may remain. The window is inclusive at both
        // ends, so that is 61 one-second trades; the old `<= 120` bound could
        // never fail because exactly 120 trades were pushed.
        let stats = buffer.stats();
        assert!(
            (60..=61).contains(&stats.trade_count),
            "Expected 60..=61 trades after eviction, got {}",
            stats.trade_count
        );

        let trades = buffer.get_trades_from(1); // Last 1 minute
        assert!(!trades.is_empty());
    }

    #[test]
    fn test_replay_buffer_time_span() {
        let buffer = ReplayBuffer::new(Duration::from_secs(300)); // 5 minutes

        // Three trades spread over one minute.
        buffer.push(create_test_trade(1, BASE_TIME, 50000.0));
        buffer.push(create_test_trade(2, BASE_TIME + 30_000_000, 50100.0)); // 30s later
        buffer.push(create_test_trade(3, BASE_TIME + 60_000_000, 50200.0)); // 60s later

        let span = buffer.time_span().unwrap();
        assert_eq!(span.as_secs(), 60);
    }

    #[tokio::test]
    async fn test_replay_stream() {
        let trades = vec![
            create_test_trade(1, BASE_TIME, 50000.0),
            create_test_trade(2, BASE_TIME + 1_000_000, 50100.0), // 1s later
            create_test_trade(3, BASE_TIME + 2_000_000, 50200.0), // 2s later
        ];

        let mut stream = ReplayStream::new(trades, 10.0); // 10x speed
        assert_eq!(stream.total(), 3);
        assert_eq!(stream.remaining(), 3);
        assert_eq!(stream.progress(), 0.0);

        // First trade should be immediate
        let first = StreamExt::next(&mut stream).await;
        assert!(first.is_some());
        assert_eq!(first.unwrap().agg_trade_id, 1);

        // Emitting one trade must advance the bookkeeping.
        assert_eq!(stream.remaining(), 2);
        assert_eq!(stream.total(), 3);
    }

    #[test]
    fn test_get_trades_from_empty_buffer() {
        let buffer = ReplayBuffer::new(Duration::from_secs(60));
        let trades = buffer.get_trades_from(1);
        assert_eq!(trades.len(), 0); // Should not panic
    }

    // === Issue #96: Expanded replay buffer + replay stream coverage ===

    #[test]
    fn test_push_len_is_empty() {
        let buffer = ReplayBuffer::new(Duration::from_secs(300));
        assert!(buffer.is_empty());
        assert_eq!(buffer.len(), 0);

        buffer.push(create_test_trade(1, BASE_TIME, 50000.0));
        assert!(!buffer.is_empty());
        assert_eq!(buffer.len(), 1);

        buffer.push(create_test_trade(2, BASE_TIME + 1_000_000, 50100.0));
        assert_eq!(buffer.len(), 2);
    }

    #[test]
    fn test_push_eviction_by_capacity() {
        // 10 second capacity
        let buffer = ReplayBuffer::new(Duration::from_secs(10));

        // Add 20 trades 1 second apart (20 sec span)
        for i in 0..20 {
            buffer.push(create_test_trade(i, BASE_TIME + (i * 1_000_000), 50000.0));
        }

        // Capacity is 10s, so ~last 10 trades should remain
        let len = buffer.len();
        assert!(len <= 12, "Expected <=12 trades after eviction, got {len}");
        assert!(len >= 10, "Expected >=10 trades retained, got {len}");
    }

    #[test]
    fn test_replay_stream_set_speed() {
        let trades = vec![
            create_test_trade(1, BASE_TIME, 50000.0),
            create_test_trade(2, BASE_TIME + 1_000_000, 50100.0),
        ];
        let mut stream = ReplayStream::new(trades, 1.0);
        assert!((stream.speed() - 1.0).abs() < f32::EPSILON);

        stream.set_speed(5.0);
        assert!((stream.speed() - 5.0).abs() < f32::EPSILON);

        // Minimum speed clamp
        stream.set_speed(0.01);
        assert!((stream.speed() - 0.1).abs() < f32::EPSILON);
    }

    #[test]
    fn test_replay_stream_remaining_total_progress() {
        let trades = vec![
            create_test_trade(1, BASE_TIME, 50000.0),
            create_test_trade(2, BASE_TIME + 1_000_000, 50100.0),
            create_test_trade(3, BASE_TIME + 2_000_000, 50200.0),
            create_test_trade(4, BASE_TIME + 3_000_000, 50300.0),
        ];
        let stream = ReplayStream::new(trades, 10.0);

        assert_eq!(stream.total(), 4);
        assert_eq!(stream.remaining(), 4);
        assert!((stream.progress() - 0.0).abs() < f32::EPSILON);
    }

    #[test]
    fn test_replay_stream_empty_progress() {
        // An empty stream is considered fully replayed.
        let stream = ReplayStream::new(vec![], 1.0);
        assert_eq!(stream.total(), 0);
        assert_eq!(stream.remaining(), 0);
        assert!((stream.progress() - 1.0).abs() < f32::EPSILON);
    }

    #[test]
    fn test_buffer_stats() {
        let buffer = ReplayBuffer::new(Duration::from_secs(60));

        let stats = buffer.stats();
        assert_eq!(stats.trade_count, 0);
        assert!(stats.first_timestamp.is_none());
        assert!(stats.last_timestamp.is_none());

        buffer.push(create_test_trade(1, BASE_TIME, 50000.0));
        buffer.push(create_test_trade(2, BASE_TIME + 5_000_000, 50100.0));

        let stats = buffer.stats();
        assert_eq!(stats.trade_count, 2);
        assert_eq!(stats.first_timestamp, Some(BASE_TIME));
        assert_eq!(stats.last_timestamp, Some(BASE_TIME + 5_000_000));
        assert!(stats.memory_usage_bytes > 0);
    }
}