1use rangebar_core::AggTrade;
7use std::collections::VecDeque;
8use std::sync::{Arc, Mutex};
9use std::time::{Duration, Instant};
10use tokio::time::sleep;
11use tokio_stream::Stream;
12
13#[derive(Debug, Clone)]
15pub struct ReplayBuffer {
16 inner: Arc<Mutex<ReplayBufferInner>>,
17}
18
19#[derive(Debug)]
20struct ReplayBufferInner {
21 capacity: Duration,
22 trades: VecDeque<AggTrade>,
23 start_time: Option<Instant>,
24}
25
26impl ReplayBuffer {
27 pub fn new(capacity: Duration) -> Self {
29 Self {
30 inner: Arc::new(Mutex::new(ReplayBufferInner {
31 capacity,
32 trades: VecDeque::new(),
33 start_time: None,
34 })),
35 }
36 }
37
38 pub fn push(&self, trade: AggTrade) {
40 let mut inner = self
41 .inner
42 .lock()
43 .unwrap_or_else(|poisoned| poisoned.into_inner());
44
45 if inner.start_time.is_none() {
47 inner.start_time = Some(Instant::now());
48 }
49
50 let cutoff_timestamp = trade.timestamp - (inner.capacity.as_micros() as i64);
52
53 while let Some(front_trade) = inner.trades.front() {
54 if front_trade.timestamp < cutoff_timestamp {
55 inner.trades.pop_front();
56 } else {
57 break;
58 }
59 }
60
61 inner.trades.push_back(trade);
62 }
63
64 pub fn len(&self) -> usize {
66 self.inner
67 .lock()
68 .unwrap_or_else(|poisoned| poisoned.into_inner())
69 .trades
70 .len()
71 }
72
73 pub fn is_empty(&self) -> bool {
75 self.inner
76 .lock()
77 .unwrap_or_else(|poisoned| poisoned.into_inner())
78 .trades
79 .is_empty()
80 }
81
82 pub fn time_span(&self) -> Option<Duration> {
84 let inner = self
85 .inner
86 .lock()
87 .unwrap_or_else(|poisoned| poisoned.into_inner());
88 if let (Some(first), Some(last)) = (inner.trades.front(), inner.trades.back()) {
89 let span_microseconds = last.timestamp - first.timestamp;
90 if span_microseconds > 0 {
91 Some(Duration::from_micros(span_microseconds as u64))
92 } else {
93 None
94 }
95 } else {
96 None
97 }
98 }
99
100 pub fn get_trades_from(&self, minutes_ago: u32) -> Vec<AggTrade> {
102 let inner = self
103 .inner
104 .lock()
105 .unwrap_or_else(|poisoned| poisoned.into_inner());
106
107 let latest_timestamp = match inner.trades.back() {
109 Some(trade) => trade.timestamp,
110 None => return Vec::new(),
111 };
112 let cutoff_timestamp = latest_timestamp - (minutes_ago as i64 * 60 * 1000);
113
114 inner
115 .trades
116 .iter()
117 .filter(|trade| trade.timestamp >= cutoff_timestamp)
118 .cloned()
119 .collect()
120 }
121
122 pub fn replay_from(&self, minutes_ago: u32, speed_multiplier: f32) -> ReplayStream {
124 let trades = self.get_trades_from(minutes_ago);
125 ReplayStream::new(trades, speed_multiplier)
126 }
127
128 pub fn stats(&self) -> ReplayBufferStats {
130 let inner = self
131 .inner
132 .lock()
133 .unwrap_or_else(|poisoned| poisoned.into_inner());
134
135 let (first_timestamp, last_timestamp) =
136 if let (Some(first), Some(last)) = (inner.trades.front(), inner.trades.back()) {
137 (Some(first.timestamp), Some(last.timestamp))
138 } else {
139 (None, None)
140 };
141
142 ReplayBufferStats {
143 capacity: inner.capacity,
144 trade_count: inner.trades.len(),
145 first_timestamp,
146 last_timestamp,
147 memory_usage_bytes: inner.trades.len() * std::mem::size_of::<AggTrade>(),
148 }
149 }
150}
151
152#[derive(Debug, Clone)]
154pub struct ReplayBufferStats {
155 pub capacity: Duration,
156 pub trade_count: usize,
157 pub first_timestamp: Option<i64>,
158 pub last_timestamp: Option<i64>,
159 pub memory_usage_bytes: usize,
160}
161
162pub struct ReplayStream {
164 trades: Vec<AggTrade>,
165 current_index: usize,
166 speed_multiplier: f32,
167 base_timestamp: Option<i64>,
168 start_time: Option<Instant>,
169}
170
171impl ReplayStream {
172 pub fn new(trades: Vec<AggTrade>, speed_multiplier: f32) -> Self {
174 let base_timestamp = trades.first().map(|t| t.timestamp);
175
176 Self {
177 trades,
178 current_index: 0,
179 speed_multiplier: speed_multiplier.max(0.1), base_timestamp,
181 start_time: None,
182 }
183 }
184
185 pub fn set_speed(&mut self, speed_multiplier: f32) {
187 self.speed_multiplier = speed_multiplier.max(0.1);
188 }
189
190 pub fn speed(&self) -> f32 {
192 self.speed_multiplier
193 }
194
195 pub fn remaining(&self) -> usize {
197 self.trades.len().saturating_sub(self.current_index)
198 }
199
200 pub fn total(&self) -> usize {
202 self.trades.len()
203 }
204
205 pub fn progress(&self) -> f32 {
207 if self.trades.is_empty() {
208 1.0
209 } else {
210 self.current_index as f32 / self.trades.len() as f32
211 }
212 }
213}
214
215impl Stream for ReplayStream {
216 type Item = AggTrade;
217
218 fn poll_next(
219 mut self: std::pin::Pin<&mut Self>,
220 cx: &mut std::task::Context<'_>,
221 ) -> std::task::Poll<Option<Self::Item>> {
222 if self.current_index >= self.trades.len() {
224 return std::task::Poll::Ready(None);
225 }
226
227 if self.start_time.is_none() {
229 let current_trade = self.trades[self.current_index].clone();
230 self.start_time = Some(Instant::now());
231 self.current_index += 1;
232 return std::task::Poll::Ready(Some(current_trade));
233 }
234
235 let current_trade = &self.trades[self.current_index];
236
237 if let (Some(base_timestamp), Some(start_time)) = (self.base_timestamp, self.start_time) {
239 let time_diff_microseconds = current_trade.timestamp - base_timestamp;
240 let real_time_diff = Duration::from_micros(time_diff_microseconds as u64);
241 let scaled_time_diff = Duration::from_micros(
242 (real_time_diff.as_micros() as f64 / self.speed_multiplier as f64) as u64,
243 );
244
245 let target_time = start_time + scaled_time_diff;
246 let now = Instant::now();
247
248 if now >= target_time {
249 let trade = current_trade.clone();
250 self.current_index += 1;
251 std::task::Poll::Ready(Some(trade))
252 } else {
253 let waker = cx.waker().clone();
255 let sleep_duration = target_time - now;
256
257 tokio::spawn(async move {
258 sleep(sleep_duration).await;
259 waker.wake();
260 });
261
262 std::task::Poll::Pending
263 }
264 } else {
265 let trade = current_trade.clone();
267 self.current_index += 1;
268 std::task::Poll::Ready(Some(trade))
269 }
270 }
271}
272
273#[cfg(test)]
274mod tests {
275 use super::*;
276 use futures::StreamExt;
277 use rangebar_core::FixedPoint;
278
279 fn create_test_trade(id: i64, timestamp: i64, price: f64) -> AggTrade {
280 AggTrade {
281 agg_trade_id: id,
282 price: FixedPoint::from_str(&price.to_string()).unwrap(),
283 volume: FixedPoint::from_str("1.0").unwrap(),
284 first_trade_id: id,
285 last_trade_id: id,
286 timestamp,
287 is_buyer_maker: false,
288 is_best_match: None,
289 }
290 }
291
292 #[test]
293 fn test_replay_buffer_capacity() {
294 let buffer = ReplayBuffer::new(Duration::from_secs(60)); let base_time = 1_704_067_200_000_000_i64; for i in 0..120 {
300 let trade = create_test_trade(i, base_time + (i * 1_000_000), 50000.0); buffer.push(trade);
302 }
303
304 let stats = buffer.stats();
306 assert!(
307 stats.trade_count <= 120,
308 "Expected <= 120 trades, got {}",
309 stats.trade_count
310 );
311
312 let trades = buffer.get_trades_from(1); assert!(!trades.is_empty());
314 }
315
316 #[test]
317 fn test_replay_buffer_time_span() {
318 let buffer = ReplayBuffer::new(Duration::from_secs(300)); let base_time = 1_704_067_200_000_000_i64; buffer.push(create_test_trade(1, base_time, 50000.0));
325 buffer.push(create_test_trade(2, base_time + 30_000_000, 50100.0)); buffer.push(create_test_trade(3, base_time + 60_000_000, 50200.0)); let span = buffer.time_span().unwrap();
329 assert_eq!(span.as_secs(), 60);
330 }
331
332 #[tokio::test]
333 async fn test_replay_stream() {
334 let base_time = 1_704_067_200_000_000_i64; let trades = vec![
337 create_test_trade(1, base_time, 50000.0),
338 create_test_trade(2, base_time + 1_000_000, 50100.0), create_test_trade(3, base_time + 2_000_000, 50200.0), ];
341
342 let mut stream = ReplayStream::new(trades, 10.0); assert_eq!(stream.total(), 3);
344 assert_eq!(stream.remaining(), 3);
345 assert_eq!(stream.progress(), 0.0);
346
347 let first = StreamExt::next(&mut stream).await;
349 assert!(first.is_some());
350 assert_eq!(first.unwrap().agg_trade_id, 1);
351 }
352
353 #[test]
354 fn test_get_trades_from_empty_buffer() {
355 let buffer = ReplayBuffer::new(Duration::from_secs(60));
356 let trades = buffer.get_trades_from(1);
357 assert_eq!(trades.len(), 0); }
359}