tesser_data/
lib.rs

1//! Data utilities including streaming and historical downloads.
2
3pub mod download;
4
5/// Data distribution utilities that consume [`tesser_broker::MarketStream`].
6use anyhow::Context;
7use async_trait::async_trait;
8#[cfg(test)]
9use rust_decimal::Decimal;
10use tokio::time::{sleep, Duration};
11use tracing::instrument;
12
13use tesser_broker::{BrokerResult, MarketStream};
14use tesser_core::{Candle, OrderBook, Tick};
15
16/// Abstract handler for tick events.
17#[async_trait]
18pub trait TickHandler: Send {
19    /// Called for each incoming tick.
20    async fn on_tick(&mut self, tick: Tick) -> anyhow::Result<()>;
21}
22
23/// Abstract handler for candle events.
24#[async_trait]
25pub trait CandleHandler: Send {
26    /// Called for each incoming candle.
27    async fn on_candle(&mut self, candle: Candle) -> anyhow::Result<()>;
28}
29
30/// Abstract handler for order book events.
31#[async_trait]
32pub trait OrderBookHandler: Send {
33    /// Called for each incoming order book snapshot.
34    async fn on_order_book(&mut self, book: OrderBook) -> anyhow::Result<()>;
35}
36
37/// Pulls data from a [`MarketStream`] and forwards it to registered handlers.
38pub struct DataDistributor<S> {
39    stream: S,
40    backoff: Duration,
41}
42
43impl<S> DataDistributor<S>
44where
45    S: MarketStream,
46{
47    /// Build a new distributor with the provided stream.
48    pub fn new(stream: S) -> Self {
49        Self {
50            stream,
51            backoff: Duration::from_millis(200),
52        }
53    }
54
55    /// Run the event loop until the stream ends.
56    #[instrument(skip_all)]
57    pub async fn run(
58        &mut self,
59        tick_handler: &mut dyn TickHandler,
60        candle_handler: &mut dyn CandleHandler,
61        order_book_handler: &mut dyn OrderBookHandler,
62    ) -> BrokerResult<()> {
63        loop {
64            let mut progressed = false;
65
66            if let Some(tick) = self.stream.next_tick().await? {
67                tick_handler
68                    .on_tick(tick)
69                    .await
70                    .context("tick handler failed")
71                    .map_err(|err| tesser_broker::BrokerError::Other(err.to_string()))?;
72                progressed = true;
73            }
74
75            if let Some(candle) = self.stream.next_candle().await? {
76                candle_handler
77                    .on_candle(candle)
78                    .await
79                    .context("candle handler failed")
80                    .map_err(|err| tesser_broker::BrokerError::Other(err.to_string()))?;
81                progressed = true;
82            }
83
84            if let Some(book) = self.stream.next_order_book().await? {
85                order_book_handler
86                    .on_order_book(book)
87                    .await
88                    .context("order book handler failed")
89                    .map_err(|err| tesser_broker::BrokerError::Other(err.to_string()))?;
90                progressed = true;
91            }
92
93            if !progressed {
94                // Avoid busy looping when the upstream stream is momentarily idle.
95                sleep(self.backoff).await;
96            }
97        }
98    }
99}
100
101#[cfg(test)]
102mod tests {
103    use super::*;
104    use async_trait::async_trait;
105    use std::sync::{
106        atomic::{AtomicUsize, Ordering},
107        Arc,
108    };
109    use tesser_broker::BrokerInfo;
110    use tesser_core::{Interval, Side, Symbol};
111
112    struct TestStream {
113        ticks: Vec<Tick>,
114        candles: Vec<Candle>,
115    }
116
117    #[async_trait]
118    impl MarketStream for TestStream {
119        type Subscription = ();
120
121        fn name(&self) -> &str {
122            "test"
123        }
124
125        fn info(&self) -> Option<&BrokerInfo> {
126            None
127        }
128
129        async fn subscribe(&mut self, _subscription: Self::Subscription) -> BrokerResult<()> {
130            Ok(())
131        }
132
133        async fn next_tick(&mut self) -> BrokerResult<Option<Tick>> {
134            Ok(self.ticks.pop())
135        }
136
137        async fn next_candle(&mut self) -> BrokerResult<Option<Candle>> {
138            Ok(self.candles.pop())
139        }
140
141        async fn next_order_book(&mut self) -> BrokerResult<Option<tesser_core::OrderBook>> {
142            Ok(None)
143        }
144    }
145
146    struct CountHandler {
147        ticks: Arc<AtomicUsize>,
148        candles: Arc<AtomicUsize>,
149    }
150
151    #[async_trait]
152    impl TickHandler for CountHandler {
153        async fn on_tick(&mut self, _tick: Tick) -> anyhow::Result<()> {
154            self.ticks.fetch_add(1, Ordering::SeqCst);
155            Ok(())
156        }
157    }
158
159    #[async_trait]
160    impl CandleHandler for CountHandler {
161        async fn on_candle(&mut self, _candle: Candle) -> anyhow::Result<()> {
162            self.candles.fetch_add(1, Ordering::SeqCst);
163            Ok(())
164        }
165    }
166
167    #[async_trait]
168    impl OrderBookHandler for CountHandler {
169        async fn on_order_book(&mut self, _book: OrderBook) -> anyhow::Result<()> {
170            Ok(())
171        }
172    }
173
174    #[tokio::test]
175    async fn distributor_pumps_events() {
176        let ticks = vec![Tick {
177            symbol: Symbol::from("BTCUSDT"),
178            price: Decimal::ONE,
179            size: Decimal::ONE,
180            side: Side::Buy,
181            exchange_timestamp: chrono::Utc::now(),
182            received_at: chrono::Utc::now(),
183        }];
184        let candles = vec![Candle {
185            symbol: Symbol::from("BTCUSDT"),
186            interval: Interval::OneMinute,
187            open: Decimal::ONE,
188            high: Decimal::ONE,
189            low: Decimal::ONE,
190            close: Decimal::ONE,
191            volume: Decimal::ONE,
192            timestamp: chrono::Utc::now(),
193        }];
194        let mut distributor = DataDistributor::new(TestStream { ticks, candles });
195        let ticks_counter = Arc::new(AtomicUsize::new(0));
196        let candles_counter = Arc::new(AtomicUsize::new(0));
197        let mut tick_handler = CountHandler {
198            ticks: ticks_counter.clone(),
199            candles: candles_counter.clone(),
200        };
201        let mut candle_handler = CountHandler {
202            ticks: ticks_counter.clone(),
203            candles: candles_counter.clone(),
204        };
205        let mut order_book_handler = CountHandler {
206            ticks: ticks_counter.clone(),
207            candles: candles_counter.clone(),
208        };
209        let _ = tokio::time::timeout(Duration::from_millis(50), async {
210            distributor
211                .run(
212                    &mut tick_handler,
213                    &mut candle_handler,
214                    &mut order_book_handler,
215                )
216                .await
217        })
218        .await;
219        assert!(ticks_counter.load(Ordering::SeqCst) >= 1);
220        assert!(candles_counter.load(Ordering::SeqCst) >= 1);
221    }
222}