1use std::{
2 collections::HashSet,
3 path::{Path, PathBuf},
4};
5
6use anyhow::{anyhow, Result};
7use chrono::{DateTime, Utc};
8use futures::{stream, Stream};
9use tesser_core::{DepthUpdate, OrderBook, Symbol, Tick};
10
11use crate::{
12 analytics::collect_parquet_files,
13 parquet::{DepthCursor, OrderBookCursor, TickCursor},
14};
15
/// Identifies which underlying cursor a merge candidate came from.
///
/// `Debug`/`PartialEq` are derived so candidate tuples can be inspected and
/// compared when diagnosing ordering issues.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Source {
    /// The tick (trade) cursor.
    Tick,
    /// The order-book cursor.
    Book,
    /// The depth-update cursor.
    Depth,
}
22
/// One event emitted by [`UnifiedEventStream`], together with the timestamp
/// that was used to order it against events from the other sources.
#[derive(Debug)]
pub struct UnifiedEvent {
    /// Ordering timestamp: a tick's `exchange_timestamp` for trades, and the
    /// record's own `timestamp` for order books and depth updates.
    pub timestamp: DateTime<Utc>,
    /// The event payload, tagged by which source produced it.
    pub kind: UnifiedEventKind,
}
29
/// Payload of a [`UnifiedEvent`]; each variant corresponds to one input source.
#[derive(Debug)]
pub enum UnifiedEventKind {
    /// A record read from the order-book parquet source.
    OrderBook(OrderBook),
    /// A record read from the depth-update parquet source.
    Depth(DepthUpdate),
    /// A record read from the tick parquet source, surfaced as a trade.
    Trade(Tick),
}
37
/// Merges tick, order-book and depth parquet sources into a single event sequence.
///
/// Each source keeps at most one buffered record (its `*_peek` slot); every step
/// emits the buffered record with the earliest timestamp and refills that slot.
/// NOTE(review): the merged output is globally time-ordered only if each source
/// is itself sorted by timestamp — assumed, confirm against the recorder.
pub struct UnifiedEventStream {
    /// Symbols to keep; an empty set keeps every symbol.
    symbols: HashSet<Symbol>,
    /// Tick source, or `None` when no tick files were supplied.
    ticks: Option<TickCursor>,
    /// Next tick that passed the symbol filter and has not been emitted yet.
    tick_peek: Option<Tick>,
    /// Order-book source, or `None` when no order-book files were supplied.
    books: Option<OrderBookCursor>,
    /// Next order book that passed the symbol filter and has not been emitted yet.
    book_peek: Option<OrderBook>,
    /// Depth-update source, or `None` when no depth files were supplied.
    depth: Option<DepthCursor>,
    /// Next depth update that passed the symbol filter and has not been emitted yet.
    depth_peek: Option<DepthUpdate>,
}
48
49impl UnifiedEventStream {
50 pub fn from_flight_recorder(root: impl AsRef<Path>, symbols: &[Symbol]) -> Result<Self> {
52 let root = root.as_ref();
53 let tick_paths = collect_parquet_files(&root.join("ticks"))?;
54 let book_paths = collect_first_existing(root, &["order_books", "books"])?;
55 let depth_paths = collect_first_existing(root, &["depth", "depth_updates"])?;
56 Self::from_paths(symbols, tick_paths, book_paths, depth_paths)
57 }
58
59 pub fn from_paths(
61 symbols: &[Symbol],
62 tick_paths: Vec<PathBuf>,
63 order_book_paths: Vec<PathBuf>,
64 depth_paths: Vec<PathBuf>,
65 ) -> Result<Self> {
66 if tick_paths.is_empty() && order_book_paths.is_empty() && depth_paths.is_empty() {
67 return Err(anyhow!("at least one parquet data source must be provided"));
68 }
69 Ok(Self {
70 symbols: symbols.iter().cloned().collect(),
71 ticks: (!tick_paths.is_empty()).then(|| TickCursor::new(tick_paths)),
72 tick_peek: None,
73 books: (!order_book_paths.is_empty()).then(|| OrderBookCursor::new(order_book_paths)),
74 book_peek: None,
75 depth: (!depth_paths.is_empty()).then(|| DepthCursor::new(depth_paths)),
76 depth_peek: None,
77 })
78 }
79
80 pub fn into_stream(self) -> impl Stream<Item = Result<UnifiedEvent>> {
82 stream::unfold(self, |mut state| async move {
83 match state.next_event().await {
84 Ok(Some(event)) => Some((Ok(event), state)),
85 Ok(None) => None,
86 Err(err) => Some((Err(err), state)),
87 }
88 })
89 }
90
91 async fn next_event(&mut self) -> Result<Option<UnifiedEvent>> {
92 self.ensure_tick().await?;
93 self.ensure_book().await?;
94 self.ensure_depth().await?;
95
96 let mut candidate: Option<(DateTime<Utc>, Source)> = None;
97
98 if let Some(tick) = self.tick_peek.as_ref() {
99 candidate = pick_candidate(candidate, tick.exchange_timestamp, Source::Tick);
100 }
101 if let Some(book) = self.book_peek.as_ref() {
102 candidate = pick_candidate(candidate, book.timestamp, Source::Book);
103 }
104 if let Some(update) = self.depth_peek.as_ref() {
105 candidate = pick_candidate(candidate, update.timestamp, Source::Depth);
106 }
107
108 let Some((_, source)) = candidate else {
109 return Ok(None);
110 };
111
112 let event = match source {
113 Source::Tick => {
114 let tick = self
115 .tick_peek
116 .take()
117 .expect("tick candidate must be populated");
118 UnifiedEvent {
119 timestamp: tick.exchange_timestamp,
120 kind: UnifiedEventKind::Trade(tick),
121 }
122 }
123 Source::Book => {
124 let book = self
125 .book_peek
126 .take()
127 .expect("order book candidate must be populated");
128 UnifiedEvent {
129 timestamp: book.timestamp,
130 kind: UnifiedEventKind::OrderBook(book),
131 }
132 }
133 Source::Depth => {
134 let update = self
135 .depth_peek
136 .take()
137 .expect("depth candidate must be populated");
138 UnifiedEvent {
139 timestamp: update.timestamp,
140 kind: UnifiedEventKind::Depth(update),
141 }
142 }
143 };
144 Ok(Some(event))
145 }
146
147 async fn ensure_tick(&mut self) -> Result<()> {
148 if self.tick_peek.is_some() {
149 return Ok(());
150 }
151 let filter = self.symbols.clone();
152 let allow_all = filter.is_empty();
153 let Some(cursor) = self.ticks.as_mut() else {
154 return Ok(());
155 };
156 while let Some(tick) = cursor.next().await? {
157 if allow_all || filter.contains(&tick.symbol) {
158 self.tick_peek = Some(tick);
159 break;
160 }
161 }
162 Ok(())
163 }
164
165 async fn ensure_book(&mut self) -> Result<()> {
166 if self.book_peek.is_some() {
167 return Ok(());
168 }
169 let filter = self.symbols.clone();
170 let allow_all = filter.is_empty();
171 let Some(cursor) = self.books.as_mut() else {
172 return Ok(());
173 };
174 while let Some(book) = cursor.next().await? {
175 if allow_all || filter.contains(&book.symbol) {
176 self.book_peek = Some(book);
177 break;
178 }
179 }
180 Ok(())
181 }
182
183 async fn ensure_depth(&mut self) -> Result<()> {
184 if self.depth_peek.is_some() {
185 return Ok(());
186 }
187 let filter = self.symbols.clone();
188 let allow_all = filter.is_empty();
189 let Some(cursor) = self.depth.as_mut() else {
190 return Ok(());
191 };
192 while let Some(update) = cursor.next().await? {
193 if allow_all || filter.contains(&update.symbol) {
194 self.depth_peek = Some(update);
195 break;
196 }
197 }
198 Ok(())
199 }
200}
201
202fn pick_candidate(
203 current: Option<(DateTime<Utc>, Source)>,
204 ts: DateTime<Utc>,
205 source: Source,
206) -> Option<(DateTime<Utc>, Source)> {
207 match current {
208 Some((existing_ts, existing_source)) => {
209 if ts < existing_ts {
210 Some((ts, source))
211 } else {
212 Some((existing_ts, existing_source))
213 }
214 }
215 None => Some((ts, source)),
216 }
217}
218
219fn collect_first_existing(root: &Path, names: &[&str]) -> Result<Vec<PathBuf>> {
220 for name in names {
221 let path = root.join(name);
222 if path.exists() {
223 return collect_parquet_files(&path);
224 }
225 }
226 Ok(Vec::new())
227}