1use rangebar_core::AggTrade;
7use std::collections::VecDeque;
8use std::sync::{Arc, Mutex};
9use std::time::{Duration, Instant};
10use tokio::time::sleep;
11use tokio_stream::Stream;
12
13#[derive(Debug, Clone)]
15pub struct ReplayBuffer {
16 inner: Arc<Mutex<ReplayBufferInner>>,
17}
18
19#[derive(Debug)]
20struct ReplayBufferInner {
21 capacity: Duration,
22 trades: VecDeque<AggTrade>,
23 start_time: Option<Instant>,
24}
25
26impl ReplayBuffer {
27 pub fn new(capacity: Duration) -> Self {
29 Self {
30 inner: Arc::new(Mutex::new(ReplayBufferInner {
31 capacity,
32 trades: VecDeque::new(),
33 start_time: None,
34 })),
35 }
36 }
37
38 pub fn push(&self, trade: AggTrade) {
40 let mut inner = self
41 .inner
42 .lock()
43 .unwrap_or_else(|poisoned| poisoned.into_inner());
44
45 if inner.start_time.is_none() {
47 inner.start_time = Some(Instant::now());
48 }
49
50 let cutoff_timestamp = trade.timestamp - (inner.capacity.as_micros() as i64);
52
53 while let Some(front_trade) = inner.trades.front() {
54 if front_trade.timestamp < cutoff_timestamp {
55 inner.trades.pop_front();
56 } else {
57 break;
58 }
59 }
60
61 inner.trades.push_back(trade);
62 }
63
64 pub fn len(&self) -> usize {
66 self.inner
67 .lock()
68 .unwrap_or_else(|poisoned| poisoned.into_inner())
69 .trades
70 .len()
71 }
72
73 pub fn is_empty(&self) -> bool {
75 self.inner
76 .lock()
77 .unwrap_or_else(|poisoned| poisoned.into_inner())
78 .trades
79 .is_empty()
80 }
81
82 pub fn time_span(&self) -> Option<Duration> {
84 let inner = self
85 .inner
86 .lock()
87 .unwrap_or_else(|poisoned| poisoned.into_inner());
88 if let (Some(first), Some(last)) = (inner.trades.front(), inner.trades.back()) {
89 let span_microseconds = last.timestamp - first.timestamp;
90 if span_microseconds > 0 {
91 Some(Duration::from_micros(span_microseconds as u64))
92 } else {
93 None
94 }
95 } else {
96 None
97 }
98 }
99
100 pub fn get_trades_from(&self, minutes_ago: u32) -> Vec<AggTrade> {
103 let inner = self
104 .inner
105 .lock()
106 .unwrap_or_else(|poisoned| poisoned.into_inner());
107
108 if inner.trades.is_empty() {
110 return Vec::new();
111 }
112
113 let latest_timestamp = inner.trades.back().unwrap().timestamp;
115 let cutoff_timestamp = latest_timestamp - (minutes_ago as i64 * 60 * 1000);
116
117 let mut start_idx = 0;
120 for (idx, trade) in inner.trades.iter().enumerate() {
121 if trade.timestamp >= cutoff_timestamp {
122 start_idx = idx;
123 break;
124 }
125 }
126
127 let mut result = Vec::new();
129 result.extend(inner.trades.iter().skip(start_idx).cloned());
130 result
131 }
132
133 pub fn replay_from(&self, minutes_ago: u32, speed_multiplier: f32) -> ReplayStream {
135 let trades = self.get_trades_from(minutes_ago);
136 ReplayStream::new(trades, speed_multiplier)
137 }
138
139 pub fn stats(&self) -> ReplayBufferStats {
141 let inner = self
142 .inner
143 .lock()
144 .unwrap_or_else(|poisoned| poisoned.into_inner());
145
146 let (first_timestamp, last_timestamp) =
147 if let (Some(first), Some(last)) = (inner.trades.front(), inner.trades.back()) {
148 (Some(first.timestamp), Some(last.timestamp))
149 } else {
150 (None, None)
151 };
152
153 ReplayBufferStats {
154 capacity: inner.capacity,
155 trade_count: inner.trades.len(),
156 first_timestamp,
157 last_timestamp,
158 memory_usage_bytes: inner.trades.len() * std::mem::size_of::<AggTrade>(),
159 }
160 }
161}
162
163#[derive(Debug, Clone)]
165pub struct ReplayBufferStats {
166 pub capacity: Duration,
167 pub trade_count: usize,
168 pub first_timestamp: Option<i64>,
169 pub last_timestamp: Option<i64>,
170 pub memory_usage_bytes: usize,
171}
172
173pub struct ReplayStream {
175 trades: Vec<AggTrade>,
176 current_index: usize,
177 speed_multiplier: f32,
178 base_timestamp: Option<i64>,
179 start_time: Option<Instant>,
180}
181
182impl ReplayStream {
183 pub fn new(trades: Vec<AggTrade>, speed_multiplier: f32) -> Self {
185 let base_timestamp = trades.first().map(|t| t.timestamp);
186
187 Self {
188 trades,
189 current_index: 0,
190 speed_multiplier: speed_multiplier.max(0.1), base_timestamp,
192 start_time: None,
193 }
194 }
195
196 pub fn set_speed(&mut self, speed_multiplier: f32) {
198 self.speed_multiplier = speed_multiplier.max(0.1);
199 }
200
201 pub fn speed(&self) -> f32 {
203 self.speed_multiplier
204 }
205
206 pub fn remaining(&self) -> usize {
208 self.trades.len().saturating_sub(self.current_index)
209 }
210
211 pub fn total(&self) -> usize {
213 self.trades.len()
214 }
215
216 pub fn progress(&self) -> f32 {
218 if self.trades.is_empty() {
219 1.0
220 } else {
221 self.current_index as f32 / self.trades.len() as f32
222 }
223 }
224}
225
226impl Stream for ReplayStream {
227 type Item = AggTrade;
228
229 fn poll_next(
230 mut self: std::pin::Pin<&mut Self>,
231 cx: &mut std::task::Context<'_>,
232 ) -> std::task::Poll<Option<Self::Item>> {
233 if self.current_index >= self.trades.len() {
235 return std::task::Poll::Ready(None);
236 }
237
238 if self.start_time.is_none() {
240 let current_trade = self.trades[self.current_index].clone();
241 self.start_time = Some(Instant::now());
242 self.current_index += 1;
243 return std::task::Poll::Ready(Some(current_trade));
244 }
245
246 let current_trade = &self.trades[self.current_index];
247
248 if let (Some(base_timestamp), Some(start_time)) = (self.base_timestamp, self.start_time) {
250 let time_diff_microseconds = current_trade.timestamp - base_timestamp;
251 let real_time_diff = Duration::from_micros(time_diff_microseconds as u64);
252 let scaled_time_diff = Duration::from_micros(
253 (real_time_diff.as_micros() as f64 / self.speed_multiplier as f64) as u64,
254 );
255
256 let target_time = start_time + scaled_time_diff;
257 let now = Instant::now();
258
259 if now >= target_time {
260 let trade = current_trade.clone();
261 self.current_index += 1;
262 std::task::Poll::Ready(Some(trade))
263 } else {
264 let waker = cx.waker().clone();
266 let sleep_duration = target_time - now;
267
268 tokio::spawn(async move {
269 sleep(sleep_duration).await;
270 waker.wake();
271 });
272
273 std::task::Poll::Pending
274 }
275 } else {
276 let trade = current_trade.clone();
278 self.current_index += 1;
279 std::task::Poll::Ready(Some(trade))
280 }
281 }
282}
283
284#[cfg(test)]
285mod tests {
286 use super::*;
287 use futures::StreamExt;
288 use rangebar_core::FixedPoint;
289
290 fn create_test_trade(id: i64, timestamp: i64, price: f64) -> AggTrade {
291 AggTrade {
292 agg_trade_id: id,
293 price: FixedPoint::from_str(&price.to_string()).unwrap(),
294 volume: FixedPoint::from_str("1.0").unwrap(),
295 first_trade_id: id,
296 last_trade_id: id,
297 timestamp,
298 is_buyer_maker: false,
299 is_best_match: None,
300 }
301 }
302
303 #[test]
304 fn test_replay_buffer_capacity() {
305 let buffer = ReplayBuffer::new(Duration::from_secs(60)); let base_time = 1_704_067_200_000_000_i64; for i in 0..120 {
311 let trade = create_test_trade(i, base_time + (i * 1_000_000), 50000.0); buffer.push(trade);
313 }
314
315 let stats = buffer.stats();
317 assert!(
318 stats.trade_count <= 120,
319 "Expected <= 120 trades, got {}",
320 stats.trade_count
321 );
322
323 let trades = buffer.get_trades_from(1); assert!(!trades.is_empty());
325 }
326
327 #[test]
328 fn test_replay_buffer_time_span() {
329 let buffer = ReplayBuffer::new(Duration::from_secs(300)); let base_time = 1_704_067_200_000_000_i64; buffer.push(create_test_trade(1, base_time, 50000.0));
336 buffer.push(create_test_trade(2, base_time + 30_000_000, 50100.0)); buffer.push(create_test_trade(3, base_time + 60_000_000, 50200.0)); let span = buffer.time_span().unwrap();
340 assert_eq!(span.as_secs(), 60);
341 }
342
343 #[tokio::test]
344 async fn test_replay_stream() {
345 let base_time = 1_704_067_200_000_000_i64; let trades = vec![
348 create_test_trade(1, base_time, 50000.0),
349 create_test_trade(2, base_time + 1_000_000, 50100.0), create_test_trade(3, base_time + 2_000_000, 50200.0), ];
352
353 let mut stream = ReplayStream::new(trades, 10.0); assert_eq!(stream.total(), 3);
355 assert_eq!(stream.remaining(), 3);
356 assert_eq!(stream.progress(), 0.0);
357
358 let first = StreamExt::next(&mut stream).await;
360 assert!(first.is_some());
361 assert_eq!(first.unwrap().agg_trade_id, 1);
362 }
363
364 #[test]
365 fn test_get_trades_from_empty_buffer() {
366 let buffer = ReplayBuffer::new(Duration::from_secs(60));
367 let trades = buffer.get_trades_from(1);
368 assert_eq!(trades.len(), 0); }
370
371 #[test]
374 fn test_push_len_is_empty() {
375 let buffer = ReplayBuffer::new(Duration::from_secs(300));
376 assert!(buffer.is_empty());
377 assert_eq!(buffer.len(), 0);
378
379 let base = 1_704_067_200_000_000_i64;
380 buffer.push(create_test_trade(1, base, 50000.0));
381 assert!(!buffer.is_empty());
382 assert_eq!(buffer.len(), 1);
383
384 buffer.push(create_test_trade(2, base + 1_000_000, 50100.0));
385 assert_eq!(buffer.len(), 2);
386 }
387
388 #[test]
389 fn test_push_eviction_by_capacity() {
390 let buffer = ReplayBuffer::new(Duration::from_secs(10));
392 let base = 1_704_067_200_000_000_i64;
393
394 for i in 0..20 {
396 buffer.push(create_test_trade(i, base + (i * 1_000_000), 50000.0));
397 }
398
399 let len = buffer.len();
401 assert!(len <= 12, "Expected <=12 trades after eviction, got {len}");
402 assert!(len >= 10, "Expected >=10 trades retained, got {len}");
403 }
404
405 #[test]
406 fn test_replay_stream_set_speed() {
407 let trades = vec![
408 create_test_trade(1, 1_704_067_200_000_000, 50000.0),
409 create_test_trade(2, 1_704_067_201_000_000, 50100.0),
410 ];
411 let mut stream = ReplayStream::new(trades, 1.0);
412 assert!((stream.speed() - 1.0).abs() < f32::EPSILON);
413
414 stream.set_speed(5.0);
415 assert!((stream.speed() - 5.0).abs() < f32::EPSILON);
416
417 stream.set_speed(0.01);
419 assert!((stream.speed() - 0.1).abs() < f32::EPSILON);
420 }
421
422 #[test]
423 fn test_replay_stream_remaining_total_progress() {
424 let base = 1_704_067_200_000_000_i64;
425 let trades = vec![
426 create_test_trade(1, base, 50000.0),
427 create_test_trade(2, base + 1_000_000, 50100.0),
428 create_test_trade(3, base + 2_000_000, 50200.0),
429 create_test_trade(4, base + 3_000_000, 50300.0),
430 ];
431 let stream = ReplayStream::new(trades, 10.0);
432
433 assert_eq!(stream.total(), 4);
434 assert_eq!(stream.remaining(), 4);
435 assert!((stream.progress() - 0.0).abs() < f32::EPSILON);
436 }
437
438 #[test]
439 fn test_replay_stream_empty_progress() {
440 let stream = ReplayStream::new(vec![], 1.0);
441 assert_eq!(stream.total(), 0);
442 assert_eq!(stream.remaining(), 0);
443 assert!((stream.progress() - 1.0).abs() < f32::EPSILON);
444 }
445
446 #[test]
447 fn test_buffer_stats() {
448 let buffer = ReplayBuffer::new(Duration::from_secs(60));
449 let base = 1_704_067_200_000_000_i64;
450
451 let stats = buffer.stats();
452 assert_eq!(stats.trade_count, 0);
453 assert!(stats.first_timestamp.is_none());
454 assert!(stats.last_timestamp.is_none());
455
456 buffer.push(create_test_trade(1, base, 50000.0));
457 buffer.push(create_test_trade(2, base + 5_000_000, 50100.0));
458
459 let stats = buffer.stats();
460 assert_eq!(stats.trade_count, 2);
461 assert_eq!(stats.first_timestamp, Some(base));
462 assert_eq!(stats.last_timestamp, Some(base + 5_000_000));
463 assert!(stats.memory_usage_bytes > 0);
464 }
465}