tesser_data/
merger.rs

1use std::{
2    collections::HashSet,
3    path::{Path, PathBuf},
4};
5
6use anyhow::{anyhow, Result};
7use chrono::{DateTime, Utc};
8use futures::{stream, Stream};
9use tesser_core::{DepthUpdate, OrderBook, Symbol, Tick};
10
11use crate::{
12    analytics::collect_parquet_files,
13    parquet::{DepthCursor, OrderBookCursor, TickCursor},
14};
15
/// Identifies which cursor owns the pending event selected by the merge.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Source {
    /// The tick cursor (emitted as `UnifiedEventKind::Trade`).
    Tick,
    /// The order-book cursor (emitted as `UnifiedEventKind::OrderBook`).
    Book,
    /// The depth-update cursor (emitted as `UnifiedEventKind::Depth`).
    Depth,
}
22
23/// Unified event emitted by the merged parquet cursors.
24#[derive(Debug)]
25pub struct UnifiedEvent {
26    pub timestamp: DateTime<Utc>,
27    pub kind: UnifiedEventKind,
28}
29
30/// Enum describing the concrete payload contained within a [`UnifiedEvent`].
31#[derive(Debug)]
32pub enum UnifiedEventKind {
33    OrderBook(OrderBook),
34    Depth(DepthUpdate),
35    Trade(Tick),
36}
37
38/// Builder that merges heterogeneous parquet cursors into a single chronological stream.
39pub struct UnifiedEventStream {
40    symbols: HashSet<Symbol>,
41    ticks: Option<TickCursor>,
42    tick_peek: Option<Tick>,
43    books: Option<OrderBookCursor>,
44    book_peek: Option<OrderBook>,
45    depth: Option<DepthCursor>,
46    depth_peek: Option<DepthUpdate>,
47}
48
49impl UnifiedEventStream {
50    /// Construct a stream backed by parquet files located under a flight-recorder root.
51    pub fn from_flight_recorder(root: impl AsRef<Path>, symbols: &[Symbol]) -> Result<Self> {
52        let root = root.as_ref();
53        let tick_paths = collect_parquet_files(&root.join("ticks"))?;
54        let book_paths = collect_first_existing(root, &["order_books", "books"])?;
55        let depth_paths = collect_first_existing(root, &["depth", "depth_updates"])?;
56        Self::from_paths(symbols, tick_paths, book_paths, depth_paths)
57    }
58
59    /// Construct a stream from explicit parquet path lists.
60    pub fn from_paths(
61        symbols: &[Symbol],
62        tick_paths: Vec<PathBuf>,
63        order_book_paths: Vec<PathBuf>,
64        depth_paths: Vec<PathBuf>,
65    ) -> Result<Self> {
66        if tick_paths.is_empty() && order_book_paths.is_empty() && depth_paths.is_empty() {
67            return Err(anyhow!("at least one parquet data source must be provided"));
68        }
69        Ok(Self {
70            symbols: symbols.iter().cloned().collect(),
71            ticks: (!tick_paths.is_empty()).then(|| TickCursor::new(tick_paths)),
72            tick_peek: None,
73            books: (!order_book_paths.is_empty()).then(|| OrderBookCursor::new(order_book_paths)),
74            book_peek: None,
75            depth: (!depth_paths.is_empty()).then(|| DepthCursor::new(depth_paths)),
76            depth_peek: None,
77        })
78    }
79
80    /// Convert this stream into a [`futures::Stream`] implementation.
81    pub fn into_stream(self) -> impl Stream<Item = Result<UnifiedEvent>> {
82        stream::unfold(self, |mut state| async move {
83            match state.next_event().await {
84                Ok(Some(event)) => Some((Ok(event), state)),
85                Ok(None) => None,
86                Err(err) => Some((Err(err), state)),
87            }
88        })
89    }
90
91    async fn next_event(&mut self) -> Result<Option<UnifiedEvent>> {
92        self.ensure_tick().await?;
93        self.ensure_book().await?;
94        self.ensure_depth().await?;
95
96        let mut candidate: Option<(DateTime<Utc>, Source)> = None;
97
98        if let Some(tick) = self.tick_peek.as_ref() {
99            candidate = pick_candidate(candidate, tick.exchange_timestamp, Source::Tick);
100        }
101        if let Some(book) = self.book_peek.as_ref() {
102            candidate = pick_candidate(candidate, book.timestamp, Source::Book);
103        }
104        if let Some(update) = self.depth_peek.as_ref() {
105            candidate = pick_candidate(candidate, update.timestamp, Source::Depth);
106        }
107
108        let Some((_, source)) = candidate else {
109            return Ok(None);
110        };
111
112        let event = match source {
113            Source::Tick => {
114                let tick = self
115                    .tick_peek
116                    .take()
117                    .expect("tick candidate must be populated");
118                UnifiedEvent {
119                    timestamp: tick.exchange_timestamp,
120                    kind: UnifiedEventKind::Trade(tick),
121                }
122            }
123            Source::Book => {
124                let book = self
125                    .book_peek
126                    .take()
127                    .expect("order book candidate must be populated");
128                UnifiedEvent {
129                    timestamp: book.timestamp,
130                    kind: UnifiedEventKind::OrderBook(book),
131                }
132            }
133            Source::Depth => {
134                let update = self
135                    .depth_peek
136                    .take()
137                    .expect("depth candidate must be populated");
138                UnifiedEvent {
139                    timestamp: update.timestamp,
140                    kind: UnifiedEventKind::Depth(update),
141                }
142            }
143        };
144        Ok(Some(event))
145    }
146
147    async fn ensure_tick(&mut self) -> Result<()> {
148        if self.tick_peek.is_some() {
149            return Ok(());
150        }
151        let filter = self.symbols.clone();
152        let allow_all = filter.is_empty();
153        let Some(cursor) = self.ticks.as_mut() else {
154            return Ok(());
155        };
156        while let Some(tick) = cursor.next().await? {
157            if allow_all || filter.contains(&tick.symbol) {
158                self.tick_peek = Some(tick);
159                break;
160            }
161        }
162        Ok(())
163    }
164
165    async fn ensure_book(&mut self) -> Result<()> {
166        if self.book_peek.is_some() {
167            return Ok(());
168        }
169        let filter = self.symbols.clone();
170        let allow_all = filter.is_empty();
171        let Some(cursor) = self.books.as_mut() else {
172            return Ok(());
173        };
174        while let Some(book) = cursor.next().await? {
175            if allow_all || filter.contains(&book.symbol) {
176                self.book_peek = Some(book);
177                break;
178            }
179        }
180        Ok(())
181    }
182
183    async fn ensure_depth(&mut self) -> Result<()> {
184        if self.depth_peek.is_some() {
185            return Ok(());
186        }
187        let filter = self.symbols.clone();
188        let allow_all = filter.is_empty();
189        let Some(cursor) = self.depth.as_mut() else {
190            return Ok(());
191        };
192        while let Some(update) = cursor.next().await? {
193            if allow_all || filter.contains(&update.symbol) {
194                self.depth_peek = Some(update);
195                break;
196            }
197        }
198        Ok(())
199    }
200}
201
202fn pick_candidate(
203    current: Option<(DateTime<Utc>, Source)>,
204    ts: DateTime<Utc>,
205    source: Source,
206) -> Option<(DateTime<Utc>, Source)> {
207    match current {
208        Some((existing_ts, existing_source)) => {
209            if ts < existing_ts {
210                Some((ts, source))
211            } else {
212                Some((existing_ts, existing_source))
213            }
214        }
215        None => Some((ts, source)),
216    }
217}
218
219fn collect_first_existing(root: &Path, names: &[&str]) -> Result<Vec<PathBuf>> {
220    for name in names {
221        let path = root.join(name);
222        if path.exists() {
223            return collect_parquet_files(&path);
224        }
225    }
226    Ok(Vec::new())
227}