1pub mod download;
4
5use anyhow::Context;
7use async_trait::async_trait;
8#[cfg(test)]
9use rust_decimal::Decimal;
10use tokio::time::{sleep, Duration};
11use tracing::instrument;
12
13use tesser_broker::{BrokerResult, MarketStream};
14use tesser_core::{Candle, OrderBook, Tick};
15
16#[async_trait]
18pub trait TickHandler: Send {
19 async fn on_tick(&mut self, tick: Tick) -> anyhow::Result<()>;
21}
22
23#[async_trait]
25pub trait CandleHandler: Send {
26 async fn on_candle(&mut self, candle: Candle) -> anyhow::Result<()>;
28}
29
30#[async_trait]
32pub trait OrderBookHandler: Send {
33 async fn on_order_book(&mut self, book: OrderBook) -> anyhow::Result<()>;
35}
36
/// Pulls events from a market stream `S` and forwards them to handlers.
pub struct DataDistributor<S> {
    /// Source that is polled for ticks, candles and order books.
    stream: S,
    /// How long to sleep after a polling pass that produced no events.
    backoff: Duration,
}
42
43impl<S> DataDistributor<S>
44where
45 S: MarketStream,
46{
47 pub fn new(stream: S) -> Self {
49 Self {
50 stream,
51 backoff: Duration::from_millis(200),
52 }
53 }
54
55 #[instrument(skip_all)]
57 pub async fn run(
58 &mut self,
59 tick_handler: &mut dyn TickHandler,
60 candle_handler: &mut dyn CandleHandler,
61 order_book_handler: &mut dyn OrderBookHandler,
62 ) -> BrokerResult<()> {
63 loop {
64 let mut progressed = false;
65
66 if let Some(tick) = self.stream.next_tick().await? {
67 tick_handler
68 .on_tick(tick)
69 .await
70 .context("tick handler failed")
71 .map_err(|err| tesser_broker::BrokerError::Other(err.to_string()))?;
72 progressed = true;
73 }
74
75 if let Some(candle) = self.stream.next_candle().await? {
76 candle_handler
77 .on_candle(candle)
78 .await
79 .context("candle handler failed")
80 .map_err(|err| tesser_broker::BrokerError::Other(err.to_string()))?;
81 progressed = true;
82 }
83
84 if let Some(book) = self.stream.next_order_book().await? {
85 order_book_handler
86 .on_order_book(book)
87 .await
88 .context("order book handler failed")
89 .map_err(|err| tesser_broker::BrokerError::Other(err.to_string()))?;
90 progressed = true;
91 }
92
93 if !progressed {
94 sleep(self.backoff).await;
96 }
97 }
98 }
99}
100
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use std::sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    };
    use tesser_broker::BrokerInfo;
    use tesser_core::{Interval, Side, Symbol};

    /// In-memory stream that hands out its queued ticks and candles and never
    /// produces an order book.
    struct TestStream {
        ticks: Vec<Tick>,
        candles: Vec<Candle>,
    }

    #[async_trait]
    impl MarketStream for TestStream {
        type Subscription = ();

        fn name(&self) -> &str {
            "test"
        }

        fn info(&self) -> Option<&BrokerInfo> {
            None
        }

        async fn subscribe(&mut self, _subscription: Self::Subscription) -> BrokerResult<()> {
            Ok(())
        }

        async fn next_tick(&mut self) -> BrokerResult<Option<Tick>> {
            Ok(self.ticks.pop())
        }

        async fn next_candle(&mut self) -> BrokerResult<Option<Candle>> {
            Ok(self.candles.pop())
        }

        async fn next_order_book(&mut self) -> BrokerResult<Option<OrderBook>> {
            Ok(None)
        }
    }

    /// Handler that counts the ticks and candles it receives through shared
    /// atomic counters; order books are accepted and ignored.
    struct CountHandler {
        ticks: Arc<AtomicUsize>,
        candles: Arc<AtomicUsize>,
    }

    impl CountHandler {
        /// Builds a handler that reports into the given shared counters.
        fn sharing(ticks: &Arc<AtomicUsize>, candles: &Arc<AtomicUsize>) -> Self {
            Self {
                ticks: Arc::clone(ticks),
                candles: Arc::clone(candles),
            }
        }
    }

    #[async_trait]
    impl TickHandler for CountHandler {
        async fn on_tick(&mut self, _tick: Tick) -> anyhow::Result<()> {
            self.ticks.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }
    }

    #[async_trait]
    impl CandleHandler for CountHandler {
        async fn on_candle(&mut self, _candle: Candle) -> anyhow::Result<()> {
            self.candles.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }
    }

    #[async_trait]
    impl OrderBookHandler for CountHandler {
        async fn on_order_book(&mut self, _book: OrderBook) -> anyhow::Result<()> {
            Ok(())
        }
    }

    /// A single BTCUSDT buy tick with unit price and size.
    fn sample_tick() -> Tick {
        Tick {
            symbol: Symbol::from("BTCUSDT"),
            price: Decimal::ONE,
            size: Decimal::ONE,
            side: Side::Buy,
            exchange_timestamp: chrono::Utc::now(),
            received_at: chrono::Utc::now(),
        }
    }

    /// A single one-minute BTCUSDT candle with all values set to one.
    fn sample_candle() -> Candle {
        Candle {
            symbol: Symbol::from("BTCUSDT"),
            interval: Interval::OneMinute,
            open: Decimal::ONE,
            high: Decimal::ONE,
            low: Decimal::ONE,
            close: Decimal::ONE,
            volume: Decimal::ONE,
            timestamp: chrono::Utc::now(),
        }
    }

    #[tokio::test]
    async fn distributor_pumps_events() {
        let mut distributor = DataDistributor::new(TestStream {
            ticks: vec![sample_tick()],
            candles: vec![sample_candle()],
        });

        let ticks_counter = Arc::new(AtomicUsize::new(0));
        let candles_counter = Arc::new(AtomicUsize::new(0));
        let mut tick_handler = CountHandler::sharing(&ticks_counter, &candles_counter);
        let mut candle_handler = CountHandler::sharing(&ticks_counter, &candles_counter);
        let mut order_book_handler = CountHandler::sharing(&ticks_counter, &candles_counter);

        // `run` never returns on its own, so it is cut off by a timeout that is
        // shorter than the distributor's idle backoff.
        let outcome = tokio::time::timeout(
            Duration::from_millis(50),
            distributor.run(
                &mut tick_handler,
                &mut candle_handler,
                &mut order_book_handler,
            ),
        )
        .await;

        // An `Ok` here would mean `run` bailed out early (e.g. with an error)
        // instead of continuing to poll.
        assert!(
            outcome.is_err(),
            "run should keep polling until the timeout elapses"
        );
        // Exactly one tick and one candle were queued, so exactly one of each
        // must have been delivered.
        assert_eq!(ticks_counter.load(Ordering::SeqCst), 1);
        assert_eq!(candles_counter.load(Ordering::SeqCst), 1);
    }
}