tesser_data/
lib.rs

1//! Data utilities including streaming and historical downloads.
2
3pub mod analytics;
4pub mod download;
5pub mod encoding;
6pub mod parquet;
7pub mod recorder;
8
9/// Data distribution utilities that consume [`tesser_broker::MarketStream`].
10use anyhow::Context;
11use async_trait::async_trait;
12#[cfg(test)]
13use rust_decimal::Decimal;
14use tokio::time::{sleep, Duration};
15use tracing::instrument;
16
17use tesser_broker::{BrokerResult, MarketStream};
18use tesser_core::{Candle, OrderBook, Tick};
19
20/// Abstract handler for tick events.
21#[async_trait]
22pub trait TickHandler: Send {
23    /// Called for each incoming tick.
24    async fn on_tick(&mut self, tick: Tick) -> anyhow::Result<()>;
25}
26
27/// Abstract handler for candle events.
28#[async_trait]
29pub trait CandleHandler: Send {
30    /// Called for each incoming candle.
31    async fn on_candle(&mut self, candle: Candle) -> anyhow::Result<()>;
32}
33
34/// Abstract handler for order book events.
35#[async_trait]
36pub trait OrderBookHandler: Send {
37    /// Called for each incoming order book snapshot.
38    async fn on_order_book(&mut self, book: OrderBook) -> anyhow::Result<()>;
39}
40
/// Event pump that reads from a [`MarketStream`] and hands each event to the
/// matching handler.
pub struct DataDistributor<S> {
    /// Upstream source that is polled for ticks, candles and order books.
    stream: S,
    /// How long to sleep after a polling pass that yielded no event at all.
    backoff: Duration,
}
46
47impl<S> DataDistributor<S>
48where
49    S: MarketStream,
50{
51    /// Build a new distributor with the provided stream.
52    pub fn new(stream: S) -> Self {
53        Self {
54            stream,
55            backoff: Duration::from_millis(200),
56        }
57    }
58
59    /// Run the event loop until the stream ends.
60    #[instrument(skip_all)]
61    pub async fn run(
62        &mut self,
63        tick_handler: &mut dyn TickHandler,
64        candle_handler: &mut dyn CandleHandler,
65        order_book_handler: &mut dyn OrderBookHandler,
66    ) -> BrokerResult<()> {
67        loop {
68            let mut progressed = false;
69
70            if let Some(tick) = self.stream.next_tick().await? {
71                tick_handler
72                    .on_tick(tick)
73                    .await
74                    .context("tick handler failed")
75                    .map_err(|err| tesser_broker::BrokerError::Other(err.to_string()))?;
76                progressed = true;
77            }
78
79            if let Some(candle) = self.stream.next_candle().await? {
80                candle_handler
81                    .on_candle(candle)
82                    .await
83                    .context("candle handler failed")
84                    .map_err(|err| tesser_broker::BrokerError::Other(err.to_string()))?;
85                progressed = true;
86            }
87
88            if let Some(book) = self.stream.next_order_book().await? {
89                order_book_handler
90                    .on_order_book(book)
91                    .await
92                    .context("order book handler failed")
93                    .map_err(|err| tesser_broker::BrokerError::Other(err.to_string()))?;
94                progressed = true;
95            }
96
97            if !progressed {
98                // Avoid busy looping when the upstream stream is momentarily idle.
99                sleep(self.backoff).await;
100            }
101        }
102    }
103}
104
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use std::sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    };
    use tesser_broker::BrokerInfo;
    use tesser_core::{Interval, Side, Symbol};

    /// In-memory stream that hands out its queued events from the back and
    /// never produces an order book.
    struct TestStream {
        ticks: Vec<Tick>,
        candles: Vec<Candle>,
    }

    #[async_trait]
    impl MarketStream for TestStream {
        type Subscription = ();

        fn name(&self) -> &str {
            "test"
        }

        fn info(&self) -> Option<&BrokerInfo> {
            None
        }

        async fn subscribe(&mut self, _subscription: Self::Subscription) -> BrokerResult<()> {
            Ok(())
        }

        async fn next_tick(&mut self) -> BrokerResult<Option<Tick>> {
            let next = self.ticks.pop();
            Ok(next)
        }

        async fn next_candle(&mut self) -> BrokerResult<Option<Candle>> {
            let next = self.candles.pop();
            Ok(next)
        }

        async fn next_order_book(&mut self) -> BrokerResult<Option<OrderBook>> {
            Ok(None)
        }
    }

    /// Handler that only counts the ticks and candles it receives.
    struct CountHandler {
        ticks: Arc<AtomicUsize>,
        candles: Arc<AtomicUsize>,
    }

    impl CountHandler {
        /// Creates a handler that reports into the shared counters.
        fn sharing(ticks: &Arc<AtomicUsize>, candles: &Arc<AtomicUsize>) -> Self {
            Self {
                ticks: Arc::clone(ticks),
                candles: Arc::clone(candles),
            }
        }
    }

    #[async_trait]
    impl TickHandler for CountHandler {
        async fn on_tick(&mut self, _tick: Tick) -> anyhow::Result<()> {
            self.ticks.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }
    }

    #[async_trait]
    impl CandleHandler for CountHandler {
        async fn on_candle(&mut self, _candle: Candle) -> anyhow::Result<()> {
            self.candles.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }
    }

    #[async_trait]
    impl OrderBookHandler for CountHandler {
        async fn on_order_book(&mut self, _book: OrderBook) -> anyhow::Result<()> {
            Ok(())
        }
    }

    /// Single buy tick used as stream input.
    fn sample_tick() -> Tick {
        Tick {
            symbol: Symbol::from("BTCUSDT"),
            price: Decimal::ONE,
            size: Decimal::ONE,
            side: Side::Buy,
            exchange_timestamp: chrono::Utc::now(),
            received_at: chrono::Utc::now(),
        }
    }

    /// Single one-minute candle used as stream input.
    fn sample_candle() -> Candle {
        Candle {
            symbol: Symbol::from("BTCUSDT"),
            interval: Interval::OneMinute,
            open: Decimal::ONE,
            high: Decimal::ONE,
            low: Decimal::ONE,
            close: Decimal::ONE,
            volume: Decimal::ONE,
            timestamp: chrono::Utc::now(),
        }
    }

    #[tokio::test]
    async fn distributor_pumps_events() {
        let stream = TestStream {
            ticks: vec![sample_tick()],
            candles: vec![sample_candle()],
        };
        let mut distributor = DataDistributor::new(stream);

        let seen_ticks = Arc::new(AtomicUsize::new(0));
        let seen_candles = Arc::new(AtomicUsize::new(0));
        let mut on_ticks = CountHandler::sharing(&seen_ticks, &seen_candles);
        let mut on_candles = CountHandler::sharing(&seen_ticks, &seen_candles);
        let mut on_books = CountHandler::sharing(&seen_ticks, &seen_candles);

        // `run` never finishes by itself, so the timeout is what ends it; the
        // elapsed-timeout result is expected and deliberately ignored.
        let pump = distributor.run(&mut on_ticks, &mut on_candles, &mut on_books);
        let _ = tokio::time::timeout(Duration::from_millis(50), pump).await;

        assert!(seen_ticks.load(Ordering::SeqCst) >= 1);
        assert!(seen_candles.load(Ordering::SeqCst) >= 1);
    }
}