tesser_data/
lib.rs

1//! Data utilities including streaming and historical downloads.
2
3pub mod analytics;
4pub mod download;
5pub mod encoding;
6pub mod etl;
7pub mod io;
8pub mod merger;
9pub mod parquet;
10pub mod recorder;
11pub mod schema;
12pub mod transform;
13
// Data distribution utilities that consume `tesser_broker::MarketStream`.
15use anyhow::Context;
16use async_trait::async_trait;
17#[cfg(test)]
18use rust_decimal::Decimal;
19use tokio::time::{sleep, Duration};
20use tracing::instrument;
21
22use tesser_broker::{BrokerResult, MarketStream};
23use tesser_core::{Candle, OrderBook, Tick};
24
25/// Abstract handler for tick events.
26#[async_trait]
27pub trait TickHandler: Send {
28    /// Called for each incoming tick.
29    async fn on_tick(&mut self, tick: Tick) -> anyhow::Result<()>;
30}
31
32/// Abstract handler for candle events.
33#[async_trait]
34pub trait CandleHandler: Send {
35    /// Called for each incoming candle.
36    async fn on_candle(&mut self, candle: Candle) -> anyhow::Result<()>;
37}
38
39/// Abstract handler for order book events.
40#[async_trait]
41pub trait OrderBookHandler: Send {
42    /// Called for each incoming order book snapshot.
43    async fn on_order_book(&mut self, book: OrderBook) -> anyhow::Result<()>;
44}
45
/// Pumps events out of a [`MarketStream`] and hands them to the registered handlers.
///
/// `S` is the concrete stream type being polled.
pub struct DataDistributor<S> {
    /// Upstream source polled for ticks, candles and order books.
    stream: S,
    /// Pause inserted after a polling pass that produced no event at all.
    backoff: Duration,
}
51
52impl<S> DataDistributor<S>
53where
54    S: MarketStream,
55{
56    /// Build a new distributor with the provided stream.
57    pub fn new(stream: S) -> Self {
58        Self {
59            stream,
60            backoff: Duration::from_millis(200),
61        }
62    }
63
64    /// Run the event loop until the stream ends.
65    #[instrument(skip_all)]
66    pub async fn run(
67        &mut self,
68        tick_handler: &mut dyn TickHandler,
69        candle_handler: &mut dyn CandleHandler,
70        order_book_handler: &mut dyn OrderBookHandler,
71    ) -> BrokerResult<()> {
72        loop {
73            let mut progressed = false;
74
75            if let Some(tick) = self.stream.next_tick().await? {
76                tick_handler
77                    .on_tick(tick)
78                    .await
79                    .context("tick handler failed")
80                    .map_err(|err| tesser_broker::BrokerError::Other(err.to_string()))?;
81                progressed = true;
82            }
83
84            if let Some(candle) = self.stream.next_candle().await? {
85                candle_handler
86                    .on_candle(candle)
87                    .await
88                    .context("candle handler failed")
89                    .map_err(|err| tesser_broker::BrokerError::Other(err.to_string()))?;
90                progressed = true;
91            }
92
93            if let Some(book) = self.stream.next_order_book().await? {
94                order_book_handler
95                    .on_order_book(book)
96                    .await
97                    .context("order book handler failed")
98                    .map_err(|err| tesser_broker::BrokerError::Other(err.to_string()))?;
99                progressed = true;
100            }
101
102            if !progressed {
103                // Avoid busy looping when the upstream stream is momentarily idle.
104                sleep(self.backoff).await;
105            }
106        }
107    }
108}
109
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use std::sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    };
    use tesser_broker::BrokerInfo;
    use tesser_core::{Interval, Side, Symbol};

    /// Stream stub that hands out pre-loaded ticks and candles and never
    /// produces an order book.
    struct TestStream {
        ticks: Vec<Tick>,
        candles: Vec<Candle>,
    }

    #[async_trait]
    impl MarketStream for TestStream {
        type Subscription = ();

        fn name(&self) -> &str {
            "test"
        }

        fn info(&self) -> Option<&BrokerInfo> {
            None
        }

        async fn subscribe(&mut self, _subscription: Self::Subscription) -> BrokerResult<()> {
            Ok(())
        }

        async fn next_tick(&mut self) -> BrokerResult<Option<Tick>> {
            let next = self.ticks.pop();
            Ok(next)
        }

        async fn next_candle(&mut self) -> BrokerResult<Option<Candle>> {
            let next = self.candles.pop();
            Ok(next)
        }

        async fn next_order_book(&mut self) -> BrokerResult<Option<OrderBook>> {
            Ok(None)
        }
    }

    /// Handler that only counts the ticks and candles it receives.
    struct CountHandler {
        ticks: Arc<AtomicUsize>,
        candles: Arc<AtomicUsize>,
    }

    impl CountHandler {
        /// Builds a handler that bumps the given shared counters.
        fn sharing(ticks: &Arc<AtomicUsize>, candles: &Arc<AtomicUsize>) -> Self {
            Self {
                ticks: Arc::clone(ticks),
                candles: Arc::clone(candles),
            }
        }
    }

    #[async_trait]
    impl TickHandler for CountHandler {
        async fn on_tick(&mut self, _tick: Tick) -> anyhow::Result<()> {
            self.ticks.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }
    }

    #[async_trait]
    impl CandleHandler for CountHandler {
        async fn on_candle(&mut self, _candle: Candle) -> anyhow::Result<()> {
            self.candles.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }
    }

    #[async_trait]
    impl OrderBookHandler for CountHandler {
        async fn on_order_book(&mut self, _book: OrderBook) -> anyhow::Result<()> {
            Ok(())
        }
    }

    /// A single BTCUSDT buy tick with unit price and size.
    fn sample_tick() -> Tick {
        Tick {
            symbol: Symbol::from("BTCUSDT"),
            price: Decimal::ONE,
            size: Decimal::ONE,
            side: Side::Buy,
            exchange_timestamp: chrono::Utc::now(),
            received_at: chrono::Utc::now(),
        }
    }

    /// A single one-minute BTCUSDT candle whose OHLCV values are all one.
    fn sample_candle() -> Candle {
        Candle {
            symbol: Symbol::from("BTCUSDT"),
            interval: Interval::OneMinute,
            open: Decimal::ONE,
            high: Decimal::ONE,
            low: Decimal::ONE,
            close: Decimal::ONE,
            volume: Decimal::ONE,
            timestamp: chrono::Utc::now(),
        }
    }

    #[tokio::test]
    async fn distributor_pumps_events() {
        let stream = TestStream {
            ticks: vec![sample_tick()],
            candles: vec![sample_candle()],
        };
        let mut distributor = DataDistributor::new(stream);

        let seen_ticks = Arc::new(AtomicUsize::new(0));
        let seen_candles = Arc::new(AtomicUsize::new(0));
        let mut on_ticks = CountHandler::sharing(&seen_ticks, &seen_candles);
        let mut on_candles = CountHandler::sharing(&seen_ticks, &seen_candles);
        let mut on_books = CountHandler::sharing(&seen_ticks, &seen_candles);

        // `run` never finishes by itself, so the timeout is what ends the
        // test; its result is deliberately ignored.
        let pump = distributor.run(&mut on_ticks, &mut on_candles, &mut on_books);
        let _ = tokio::time::timeout(Duration::from_millis(50), pump).await;

        assert!(seen_ticks.load(Ordering::SeqCst) >= 1);
        assert!(seen_candles.load(Ordering::SeqCst) >= 1);
    }
}