rangebar_streaming/
replay_buffer.rs

//! Replay buffer for storing and replaying recent trade data.
//!
//! This module provides a time-windowed buffer that stores recent trades (older
//! trades are evicted as newer ones arrive) and allows replaying them at
//! different speeds for testing and analysis.
5
6use rangebar_core::AggTrade;
7use std::collections::VecDeque;
8use std::sync::{Arc, Mutex};
9use std::time::{Duration, Instant};
10use tokio::time::sleep;
11use tokio_stream::Stream;
12
13/// A circular buffer that stores recent trades and provides replay functionality
14#[derive(Debug, Clone)]
15pub struct ReplayBuffer {
16    inner: Arc<Mutex<ReplayBufferInner>>,
17}
18
19#[derive(Debug)]
20struct ReplayBufferInner {
21    capacity: Duration,
22    trades: VecDeque<AggTrade>,
23    start_time: Option<Instant>,
24}
25
26impl ReplayBuffer {
27    /// Create a new replay buffer with the specified capacity
28    pub fn new(capacity: Duration) -> Self {
29        Self {
30            inner: Arc::new(Mutex::new(ReplayBufferInner {
31                capacity,
32                trades: VecDeque::new(),
33                start_time: None,
34            })),
35        }
36    }
37
38    /// Add a new trade to the buffer
39    pub fn push(&self, trade: AggTrade) {
40        let mut inner = self
41            .inner
42            .lock()
43            .unwrap_or_else(|poisoned| poisoned.into_inner());
44
45        // Set start time on first trade
46        if inner.start_time.is_none() {
47            inner.start_time = Some(Instant::now());
48        }
49
50        // Remove old trades beyond capacity (using microseconds)
51        let cutoff_timestamp = trade.timestamp - (inner.capacity.as_micros() as i64);
52
53        while let Some(front_trade) = inner.trades.front() {
54            if front_trade.timestamp < cutoff_timestamp {
55                inner.trades.pop_front();
56            } else {
57                break;
58            }
59        }
60
61        inner.trades.push_back(trade);
62    }
63
64    /// Get the number of trades currently in the buffer
65    pub fn len(&self) -> usize {
66        self.inner
67            .lock()
68            .unwrap_or_else(|poisoned| poisoned.into_inner())
69            .trades
70            .len()
71    }
72
73    /// Check if the buffer is empty
74    pub fn is_empty(&self) -> bool {
75        self.inner
76            .lock()
77            .unwrap_or_else(|poisoned| poisoned.into_inner())
78            .trades
79            .is_empty()
80    }
81
82    /// Get the time span of trades in the buffer
83    pub fn time_span(&self) -> Option<Duration> {
84        let inner = self
85            .inner
86            .lock()
87            .unwrap_or_else(|poisoned| poisoned.into_inner());
88        if let (Some(first), Some(last)) = (inner.trades.front(), inner.trades.back()) {
89            let span_microseconds = last.timestamp - first.timestamp;
90            if span_microseconds > 0 {
91                Some(Duration::from_micros(span_microseconds as u64))
92            } else {
93                None
94            }
95        } else {
96            None
97        }
98    }
99
100    /// Get trades from the buffer starting from N minutes ago
101    pub fn get_trades_from(&self, minutes_ago: u32) -> Vec<AggTrade> {
102        let inner = self
103            .inner
104            .lock()
105            .unwrap_or_else(|poisoned| poisoned.into_inner());
106
107        // Calculate cutoff timestamp
108        let latest_timestamp = match inner.trades.back() {
109            Some(trade) => trade.timestamp,
110            None => return Vec::new(),
111        };
112        let cutoff_timestamp = latest_timestamp - (minutes_ago as i64 * 60 * 1000);
113
114        inner
115            .trades
116            .iter()
117            .filter(|trade| trade.timestamp >= cutoff_timestamp)
118            .cloned()
119            .collect()
120    }
121
122    /// Create a replay stream that emits trades at the specified speed
123    pub fn replay_from(&self, minutes_ago: u32, speed_multiplier: f32) -> ReplayStream {
124        let trades = self.get_trades_from(minutes_ago);
125        ReplayStream::new(trades, speed_multiplier)
126    }
127
128    /// Get statistics about the buffer
129    pub fn stats(&self) -> ReplayBufferStats {
130        let inner = self
131            .inner
132            .lock()
133            .unwrap_or_else(|poisoned| poisoned.into_inner());
134
135        let (first_timestamp, last_timestamp) =
136            if let (Some(first), Some(last)) = (inner.trades.front(), inner.trades.back()) {
137                (Some(first.timestamp), Some(last.timestamp))
138            } else {
139                (None, None)
140            };
141
142        ReplayBufferStats {
143            capacity: inner.capacity,
144            trade_count: inner.trades.len(),
145            first_timestamp,
146            last_timestamp,
147            memory_usage_bytes: inner.trades.len() * std::mem::size_of::<AggTrade>(),
148        }
149    }
150}
151
/// Point-in-time summary of a replay buffer's contents.
#[derive(Clone, Debug)]
pub struct ReplayBufferStats {
    /// Retention window the buffer was created with.
    pub capacity: Duration,
    /// Number of trades held when the snapshot was taken.
    pub trade_count: usize,
    /// Timestamp of the oldest buffered trade, or `None` if the buffer was empty.
    pub first_timestamp: Option<i64>,
    /// Timestamp of the newest buffered trade, or `None` if the buffer was empty.
    pub last_timestamp: Option<i64>,
    /// Estimated size of the buffered trades in bytes.
    pub memory_usage_bytes: usize,
}
161
162/// A stream that replays trades at a specified speed
163pub struct ReplayStream {
164    trades: Vec<AggTrade>,
165    current_index: usize,
166    speed_multiplier: f32,
167    base_timestamp: Option<i64>,
168    start_time: Option<Instant>,
169}
170
171impl ReplayStream {
172    /// Create a new replay stream
173    pub fn new(trades: Vec<AggTrade>, speed_multiplier: f32) -> Self {
174        let base_timestamp = trades.first().map(|t| t.timestamp);
175
176        Self {
177            trades,
178            current_index: 0,
179            speed_multiplier: speed_multiplier.max(0.1), // Minimum 0.1x speed
180            base_timestamp,
181            start_time: None,
182        }
183    }
184
185    /// Set the replay speed
186    pub fn set_speed(&mut self, speed_multiplier: f32) {
187        self.speed_multiplier = speed_multiplier.max(0.1);
188    }
189
190    /// Get the current replay speed
191    pub fn speed(&self) -> f32 {
192        self.speed_multiplier
193    }
194
195    /// Get the number of remaining trades
196    pub fn remaining(&self) -> usize {
197        self.trades.len().saturating_sub(self.current_index)
198    }
199
200    /// Get the total number of trades
201    pub fn total(&self) -> usize {
202        self.trades.len()
203    }
204
205    /// Get the progress as a percentage (0.0 to 1.0)
206    pub fn progress(&self) -> f32 {
207        if self.trades.is_empty() {
208            1.0
209        } else {
210            self.current_index as f32 / self.trades.len() as f32
211        }
212    }
213}
214
215impl Stream for ReplayStream {
216    type Item = AggTrade;
217
218    fn poll_next(
219        mut self: std::pin::Pin<&mut Self>,
220        cx: &mut std::task::Context<'_>,
221    ) -> std::task::Poll<Option<Self::Item>> {
222        // Check if we have more trades
223        if self.current_index >= self.trades.len() {
224            return std::task::Poll::Ready(None);
225        }
226
227        // Initialize start time if this is the first trade
228        if self.start_time.is_none() {
229            let current_trade = self.trades[self.current_index].clone();
230            self.start_time = Some(Instant::now());
231            self.current_index += 1;
232            return std::task::Poll::Ready(Some(current_trade));
233        }
234
235        let current_trade = &self.trades[self.current_index];
236
237        // Calculate when this trade should be emitted based on timestamp differences
238        if let (Some(base_timestamp), Some(start_time)) = (self.base_timestamp, self.start_time) {
239            let time_diff_microseconds = current_trade.timestamp - base_timestamp;
240            let real_time_diff = Duration::from_micros(time_diff_microseconds as u64);
241            let scaled_time_diff = Duration::from_micros(
242                (real_time_diff.as_micros() as f64 / self.speed_multiplier as f64) as u64,
243            );
244
245            let target_time = start_time + scaled_time_diff;
246            let now = Instant::now();
247
248            if now >= target_time {
249                let trade = current_trade.clone();
250                self.current_index += 1;
251                std::task::Poll::Ready(Some(trade))
252            } else {
253                // Set up a timer to wake us when it's time
254                let waker = cx.waker().clone();
255                let sleep_duration = target_time - now;
256
257                tokio::spawn(async move {
258                    sleep(sleep_duration).await;
259                    waker.wake();
260                });
261
262                std::task::Poll::Pending
263            }
264        } else {
265            // Fallback: emit immediately
266            let trade = current_trade.clone();
267            self.current_index += 1;
268            std::task::Poll::Ready(Some(trade))
269        }
270    }
271}
272
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    use rangebar_core::FixedPoint;

    /// 2024-01-01 00:00:00 UTC expressed in microseconds.
    const BASE_TIME: i64 = 1_704_067_200_000_000;

    /// Builds a single-fill trade with the given id, timestamp (microseconds) and price.
    fn create_test_trade(id: i64, timestamp: i64, price: f64) -> AggTrade {
        AggTrade {
            agg_trade_id: id,
            price: FixedPoint::from_str(&price.to_string()).unwrap(),
            volume: FixedPoint::from_str("1.0").unwrap(),
            first_trade_id: id,
            last_trade_id: id,
            timestamp,
            is_buyer_maker: false,
            is_best_match: None,
        }
    }

    #[test]
    fn test_replay_buffer_capacity() {
        let buffer = ReplayBuffer::new(Duration::from_secs(60)); // 1 minute capacity

        // Add trades spanning 2 minutes, one trade per second.
        for i in 0..120 {
            buffer.push(create_test_trade(i, BASE_TIME + (i * 1_000_000), 50000.0));
        }

        // After the last push (t = 119s) the cutoff is t = 59s, so trades
        // 59..=119 remain: 61 trades.
        let stats = buffer.stats();
        assert_eq!(
            stats.trade_count, 61,
            "Expected exactly 61 retained trades, got {}",
            stats.trade_count
        );
        assert_eq!(stats.first_timestamp, Some(BASE_TIME + 59_000_000));
        assert_eq!(stats.last_timestamp, Some(BASE_TIME + 119_000_000));
        assert_eq!(buffer.len(), 61);
        assert!(!buffer.is_empty());

        let trades = buffer.get_trades_from(1); // Last 1 minute
        assert!(!trades.is_empty());
    }

    #[test]
    fn test_replay_buffer_time_span() {
        let buffer = ReplayBuffer::new(Duration::from_secs(300)); // 5 minutes

        // An empty buffer has no span.
        assert!(buffer.time_span().is_none());

        // Add a few trades over 1 minute.
        buffer.push(create_test_trade(1, BASE_TIME, 50000.0));
        buffer.push(create_test_trade(2, BASE_TIME + 30_000_000, 50100.0)); // 30s later
        buffer.push(create_test_trade(3, BASE_TIME + 60_000_000, 50200.0)); // 60s later

        let span = buffer.time_span().unwrap();
        assert_eq!(span.as_secs(), 60);
    }

    #[tokio::test]
    async fn test_replay_stream() {
        let trades = vec![
            create_test_trade(1, BASE_TIME, 50000.0),
            create_test_trade(2, BASE_TIME + 1_000_000, 50100.0), // 1s later
            create_test_trade(3, BASE_TIME + 2_000_000, 50200.0), // 2s later
        ];

        let mut stream = ReplayStream::new(trades, 10.0); // 10x speed
        assert_eq!(stream.total(), 3);
        assert_eq!(stream.remaining(), 3);
        assert_eq!(stream.progress(), 0.0);

        // First trade should be immediate
        let first = StreamExt::next(&mut stream).await;
        assert!(first.is_some());
        assert_eq!(first.unwrap().agg_trade_id, 1);

        // Emitting one trade advances the cursor without changing the total.
        assert_eq!(stream.remaining(), 2);
        assert_eq!(stream.total(), 3);
    }

    #[test]
    fn test_get_trades_from_empty_buffer() {
        let buffer = ReplayBuffer::new(Duration::from_secs(60));
        let trades = buffer.get_trades_from(1);
        assert_eq!(trades.len(), 0); // Should not panic
    }
}