1pub mod analytics;
4pub mod download;
5pub mod encoding;
6pub mod parquet;
7pub mod recorder;
8
9use anyhow::Context;
11use async_trait::async_trait;
12#[cfg(test)]
13use rust_decimal::Decimal;
14use tokio::time::{sleep, Duration};
15use tracing::instrument;
16
17use tesser_broker::{BrokerResult, MarketStream};
18use tesser_core::{Candle, OrderBook, Tick};
19
20#[async_trait]
22pub trait TickHandler: Send {
23 async fn on_tick(&mut self, tick: Tick) -> anyhow::Result<()>;
25}
26
27#[async_trait]
29pub trait CandleHandler: Send {
30 async fn on_candle(&mut self, candle: Candle) -> anyhow::Result<()>;
32}
33
34#[async_trait]
36pub trait OrderBookHandler: Send {
37 async fn on_order_book(&mut self, book: OrderBook) -> anyhow::Result<()>;
39}
40
41pub struct DataDistributor<S> {
43 stream: S,
44 backoff: Duration,
45}
46
47impl<S> DataDistributor<S>
48where
49 S: MarketStream,
50{
51 pub fn new(stream: S) -> Self {
53 Self {
54 stream,
55 backoff: Duration::from_millis(200),
56 }
57 }
58
59 #[instrument(skip_all)]
61 pub async fn run(
62 &mut self,
63 tick_handler: &mut dyn TickHandler,
64 candle_handler: &mut dyn CandleHandler,
65 order_book_handler: &mut dyn OrderBookHandler,
66 ) -> BrokerResult<()> {
67 loop {
68 let mut progressed = false;
69
70 if let Some(tick) = self.stream.next_tick().await? {
71 tick_handler
72 .on_tick(tick)
73 .await
74 .context("tick handler failed")
75 .map_err(|err| tesser_broker::BrokerError::Other(err.to_string()))?;
76 progressed = true;
77 }
78
79 if let Some(candle) = self.stream.next_candle().await? {
80 candle_handler
81 .on_candle(candle)
82 .await
83 .context("candle handler failed")
84 .map_err(|err| tesser_broker::BrokerError::Other(err.to_string()))?;
85 progressed = true;
86 }
87
88 if let Some(book) = self.stream.next_order_book().await? {
89 order_book_handler
90 .on_order_book(book)
91 .await
92 .context("order book handler failed")
93 .map_err(|err| tesser_broker::BrokerError::Other(err.to_string()))?;
94 progressed = true;
95 }
96
97 if !progressed {
98 sleep(self.backoff).await;
100 }
101 }
102 }
103}
104
105#[cfg(test)]
106mod tests {
107 use super::*;
108 use async_trait::async_trait;
109 use std::sync::{
110 atomic::{AtomicUsize, Ordering},
111 Arc,
112 };
113 use tesser_broker::BrokerInfo;
114 use tesser_core::{Interval, Side, Symbol};
115
116 struct TestStream {
117 ticks: Vec<Tick>,
118 candles: Vec<Candle>,
119 }
120
121 #[async_trait]
122 impl MarketStream for TestStream {
123 type Subscription = ();
124
125 fn name(&self) -> &str {
126 "test"
127 }
128
129 fn info(&self) -> Option<&BrokerInfo> {
130 None
131 }
132
133 async fn subscribe(&mut self, _subscription: Self::Subscription) -> BrokerResult<()> {
134 Ok(())
135 }
136
137 async fn next_tick(&mut self) -> BrokerResult<Option<Tick>> {
138 Ok(self.ticks.pop())
139 }
140
141 async fn next_candle(&mut self) -> BrokerResult<Option<Candle>> {
142 Ok(self.candles.pop())
143 }
144
145 async fn next_order_book(&mut self) -> BrokerResult<Option<tesser_core::OrderBook>> {
146 Ok(None)
147 }
148 }
149
150 struct CountHandler {
151 ticks: Arc<AtomicUsize>,
152 candles: Arc<AtomicUsize>,
153 }
154
155 #[async_trait]
156 impl TickHandler for CountHandler {
157 async fn on_tick(&mut self, _tick: Tick) -> anyhow::Result<()> {
158 self.ticks.fetch_add(1, Ordering::SeqCst);
159 Ok(())
160 }
161 }
162
163 #[async_trait]
164 impl CandleHandler for CountHandler {
165 async fn on_candle(&mut self, _candle: Candle) -> anyhow::Result<()> {
166 self.candles.fetch_add(1, Ordering::SeqCst);
167 Ok(())
168 }
169 }
170
171 #[async_trait]
172 impl OrderBookHandler for CountHandler {
173 async fn on_order_book(&mut self, _book: OrderBook) -> anyhow::Result<()> {
174 Ok(())
175 }
176 }
177
178 #[tokio::test]
179 async fn distributor_pumps_events() {
180 let ticks = vec![Tick {
181 symbol: Symbol::from("BTCUSDT"),
182 price: Decimal::ONE,
183 size: Decimal::ONE,
184 side: Side::Buy,
185 exchange_timestamp: chrono::Utc::now(),
186 received_at: chrono::Utc::now(),
187 }];
188 let candles = vec![Candle {
189 symbol: Symbol::from("BTCUSDT"),
190 interval: Interval::OneMinute,
191 open: Decimal::ONE,
192 high: Decimal::ONE,
193 low: Decimal::ONE,
194 close: Decimal::ONE,
195 volume: Decimal::ONE,
196 timestamp: chrono::Utc::now(),
197 }];
198 let mut distributor = DataDistributor::new(TestStream { ticks, candles });
199 let ticks_counter = Arc::new(AtomicUsize::new(0));
200 let candles_counter = Arc::new(AtomicUsize::new(0));
201 let mut tick_handler = CountHandler {
202 ticks: ticks_counter.clone(),
203 candles: candles_counter.clone(),
204 };
205 let mut candle_handler = CountHandler {
206 ticks: ticks_counter.clone(),
207 candles: candles_counter.clone(),
208 };
209 let mut order_book_handler = CountHandler {
210 ticks: ticks_counter.clone(),
211 candles: candles_counter.clone(),
212 };
213 let _ = tokio::time::timeout(Duration::from_millis(50), async {
214 distributor
215 .run(
216 &mut tick_handler,
217 &mut candle_handler,
218 &mut order_book_handler,
219 )
220 .await
221 })
222 .await;
223 assert!(ticks_counter.load(Ordering::SeqCst) >= 1);
224 assert!(candles_counter.load(Ordering::SeqCst) >= 1);
225 }
226}