1pub mod analytics;
4pub mod download;
5pub mod encoding;
6pub mod etl;
7pub mod io;
8pub mod merger;
9pub mod parquet;
10pub mod recorder;
11pub mod schema;
12pub mod transform;
13
14use anyhow::Context;
16use async_trait::async_trait;
17#[cfg(test)]
18use rust_decimal::Decimal;
19use tokio::time::{sleep, Duration};
20use tracing::instrument;
21
22use tesser_broker::{BrokerResult, MarketStream};
23use tesser_core::{Candle, OrderBook, Tick};
24
25#[async_trait]
27pub trait TickHandler: Send {
28 async fn on_tick(&mut self, tick: Tick) -> anyhow::Result<()>;
30}
31
32#[async_trait]
34pub trait CandleHandler: Send {
35 async fn on_candle(&mut self, candle: Candle) -> anyhow::Result<()>;
37}
38
39#[async_trait]
41pub trait OrderBookHandler: Send {
42 async fn on_order_book(&mut self, book: OrderBook) -> anyhow::Result<()>;
44}
45
46pub struct DataDistributor<S> {
48 stream: S,
49 backoff: Duration,
50}
51
52impl<S> DataDistributor<S>
53where
54 S: MarketStream,
55{
56 pub fn new(stream: S) -> Self {
58 Self {
59 stream,
60 backoff: Duration::from_millis(200),
61 }
62 }
63
64 #[instrument(skip_all)]
66 pub async fn run(
67 &mut self,
68 tick_handler: &mut dyn TickHandler,
69 candle_handler: &mut dyn CandleHandler,
70 order_book_handler: &mut dyn OrderBookHandler,
71 ) -> BrokerResult<()> {
72 loop {
73 let mut progressed = false;
74
75 if let Some(tick) = self.stream.next_tick().await? {
76 tick_handler
77 .on_tick(tick)
78 .await
79 .context("tick handler failed")
80 .map_err(|err| tesser_broker::BrokerError::Other(err.to_string()))?;
81 progressed = true;
82 }
83
84 if let Some(candle) = self.stream.next_candle().await? {
85 candle_handler
86 .on_candle(candle)
87 .await
88 .context("candle handler failed")
89 .map_err(|err| tesser_broker::BrokerError::Other(err.to_string()))?;
90 progressed = true;
91 }
92
93 if let Some(book) = self.stream.next_order_book().await? {
94 order_book_handler
95 .on_order_book(book)
96 .await
97 .context("order book handler failed")
98 .map_err(|err| tesser_broker::BrokerError::Other(err.to_string()))?;
99 progressed = true;
100 }
101
102 if !progressed {
103 sleep(self.backoff).await;
105 }
106 }
107 }
108}
109
110#[cfg(test)]
111mod tests {
112 use super::*;
113 use async_trait::async_trait;
114 use std::sync::{
115 atomic::{AtomicUsize, Ordering},
116 Arc,
117 };
118 use tesser_broker::BrokerInfo;
119 use tesser_core::{Interval, Side, Symbol};
120
121 struct TestStream {
122 ticks: Vec<Tick>,
123 candles: Vec<Candle>,
124 }
125
126 #[async_trait]
127 impl MarketStream for TestStream {
128 type Subscription = ();
129
130 fn name(&self) -> &str {
131 "test"
132 }
133
134 fn info(&self) -> Option<&BrokerInfo> {
135 None
136 }
137
138 async fn subscribe(&mut self, _subscription: Self::Subscription) -> BrokerResult<()> {
139 Ok(())
140 }
141
142 async fn next_tick(&mut self) -> BrokerResult<Option<Tick>> {
143 Ok(self.ticks.pop())
144 }
145
146 async fn next_candle(&mut self) -> BrokerResult<Option<Candle>> {
147 Ok(self.candles.pop())
148 }
149
150 async fn next_order_book(&mut self) -> BrokerResult<Option<tesser_core::OrderBook>> {
151 Ok(None)
152 }
153 }
154
155 struct CountHandler {
156 ticks: Arc<AtomicUsize>,
157 candles: Arc<AtomicUsize>,
158 }
159
160 #[async_trait]
161 impl TickHandler for CountHandler {
162 async fn on_tick(&mut self, _tick: Tick) -> anyhow::Result<()> {
163 self.ticks.fetch_add(1, Ordering::SeqCst);
164 Ok(())
165 }
166 }
167
168 #[async_trait]
169 impl CandleHandler for CountHandler {
170 async fn on_candle(&mut self, _candle: Candle) -> anyhow::Result<()> {
171 self.candles.fetch_add(1, Ordering::SeqCst);
172 Ok(())
173 }
174 }
175
176 #[async_trait]
177 impl OrderBookHandler for CountHandler {
178 async fn on_order_book(&mut self, _book: OrderBook) -> anyhow::Result<()> {
179 Ok(())
180 }
181 }
182
183 #[tokio::test]
184 async fn distributor_pumps_events() {
185 let ticks = vec![Tick {
186 symbol: Symbol::from("BTCUSDT"),
187 price: Decimal::ONE,
188 size: Decimal::ONE,
189 side: Side::Buy,
190 exchange_timestamp: chrono::Utc::now(),
191 received_at: chrono::Utc::now(),
192 }];
193 let candles = vec![Candle {
194 symbol: Symbol::from("BTCUSDT"),
195 interval: Interval::OneMinute,
196 open: Decimal::ONE,
197 high: Decimal::ONE,
198 low: Decimal::ONE,
199 close: Decimal::ONE,
200 volume: Decimal::ONE,
201 timestamp: chrono::Utc::now(),
202 }];
203 let mut distributor = DataDistributor::new(TestStream { ticks, candles });
204 let ticks_counter = Arc::new(AtomicUsize::new(0));
205 let candles_counter = Arc::new(AtomicUsize::new(0));
206 let mut tick_handler = CountHandler {
207 ticks: ticks_counter.clone(),
208 candles: candles_counter.clone(),
209 };
210 let mut candle_handler = CountHandler {
211 ticks: ticks_counter.clone(),
212 candles: candles_counter.clone(),
213 };
214 let mut order_book_handler = CountHandler {
215 ticks: ticks_counter.clone(),
216 candles: candles_counter.clone(),
217 };
218 let _ = tokio::time::timeout(Duration::from_millis(50), async {
219 distributor
220 .run(
221 &mut tick_handler,
222 &mut candle_handler,
223 &mut order_book_handler,
224 )
225 .await
226 })
227 .await;
228 assert!(ticks_counter.load(Ordering::SeqCst) >= 1);
229 assert!(candles_counter.load(Ordering::SeqCst) >= 1);
230 }
231}