Skip to main content

barter_execution/client/mock/
mod.rs

1use crate::{
2    UnindexedAccountEvent, UnindexedAccountSnapshot,
3    balance::AssetBalance,
4    client::ExecutionClient,
5    error::{ConnectivityError, UnindexedClientError, UnindexedOrderError},
6    exchange::mock::request::MockExchangeRequest,
7    order::{
8        Order, OrderEvent, OrderKey,
9        request::{OrderRequestCancel, OrderRequestOpen, UnindexedOrderResponseCancel},
10        state::Open,
11    },
12    trade::Trade,
13};
14use barter_instrument::{
15    asset::{QuoteAsset, name::AssetNameExchange},
16    exchange::ExchangeId,
17    instrument::name::InstrumentNameExchange,
18};
19use chrono::{DateTime, Utc};
20use derive_more::Constructor;
21use futures::stream::BoxStream;
22use rust_decimal::Decimal;
23use serde::{Deserialize, Serialize};
24use tokio::sync::{broadcast, mpsc, oneshot};
25use tokio_stream::{StreamExt, wrappers::BroadcastStream};
26use tracing::error;
27
28#[derive(
29    Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize, Constructor,
30)]
31pub struct MockExecutionConfig {
32    pub mocked_exchange: ExchangeId,
33    pub initial_state: UnindexedAccountSnapshot,
34    pub latency_ms: u64,
35    pub fees_percent: Decimal,
36}
37
38#[derive(Debug, Constructor)]
39pub struct MockExecutionClientConfig<FnTime> {
40    pub mocked_exchange: ExchangeId,
41    pub clock: FnTime,
42    pub request_tx: mpsc::UnboundedSender<MockExchangeRequest>,
43    pub event_rx: broadcast::Receiver<UnindexedAccountEvent>,
44}
45
46impl<FnTime> Clone for MockExecutionClientConfig<FnTime>
47where
48    FnTime: Clone,
49{
50    fn clone(&self) -> Self {
51        Self {
52            mocked_exchange: self.mocked_exchange,
53            clock: self.clock.clone(),
54            request_tx: self.request_tx.clone(),
55            event_rx: self.event_rx.resubscribe(),
56        }
57    }
58}
59
60#[derive(Debug, Constructor)]
61pub struct MockExecution<FnTime> {
62    pub mocked_exchange: ExchangeId,
63    pub clock: FnTime,
64    pub request_tx: mpsc::UnboundedSender<MockExchangeRequest>,
65    pub event_rx: broadcast::Receiver<UnindexedAccountEvent>,
66}
67
68impl<FnTime> Clone for MockExecution<FnTime>
69where
70    FnTime: Clone,
71{
72    fn clone(&self) -> Self {
73        Self {
74            mocked_exchange: self.mocked_exchange,
75            clock: self.clock.clone(),
76            request_tx: self.request_tx.clone(),
77            event_rx: self.event_rx.resubscribe(),
78        }
79    }
80}
81
82impl<FnTime> MockExecution<FnTime>
83where
84    FnTime: Fn() -> DateTime<Utc>,
85{
86    pub fn time_request(&self) -> DateTime<Utc> {
87        (self.clock)()
88    }
89}
90
91impl<FnTime> ExecutionClient for MockExecution<FnTime>
92where
93    FnTime: Fn() -> DateTime<Utc> + Clone + Sync,
94{
95    const EXCHANGE: ExchangeId = ExchangeId::Mock;
96    type Config = MockExecutionClientConfig<FnTime>;
97    type AccountStream = BoxStream<'static, UnindexedAccountEvent>;
98
99    fn new(config: Self::Config) -> Self {
100        Self {
101            mocked_exchange: config.mocked_exchange,
102            clock: config.clock,
103            request_tx: config.request_tx,
104            event_rx: config.event_rx,
105        }
106    }
107
108    async fn account_snapshot(
109        &self,
110        _: &[AssetNameExchange],
111        _: &[InstrumentNameExchange],
112    ) -> Result<UnindexedAccountSnapshot, UnindexedClientError> {
113        let (response_tx, response_rx) = oneshot::channel();
114
115        self.request_tx
116            .send(MockExchangeRequest::fetch_account_snapshot(
117                self.time_request(),
118                response_tx,
119            ))
120            .map_err(|_| {
121                UnindexedClientError::Connectivity(ConnectivityError::ExchangeOffline(
122                    self.mocked_exchange,
123                ))
124            })?;
125
126        response_rx.await.map_err(|_| {
127            UnindexedClientError::Connectivity(ConnectivityError::ExchangeOffline(
128                self.mocked_exchange,
129            ))
130        })
131    }
132
133    async fn account_stream(
134        &self,
135        _: &[AssetNameExchange],
136        _: &[InstrumentNameExchange],
137    ) -> Result<Self::AccountStream, UnindexedClientError> {
138        Ok(futures::StreamExt::boxed(
139            BroadcastStream::new(self.event_rx.resubscribe()).map_while(|result| match result {
140                Ok(event) => Some(event),
141                Err(error) => {
142                    error!(
143                        ?error,
144                        "MockExchange Broadcast AccountStream lagged - terminating"
145                    );
146                    None
147                }
148            }),
149        ))
150    }
151
152    async fn cancel_order(
153        &self,
154        request: OrderRequestCancel<ExchangeId, &InstrumentNameExchange>,
155    ) -> Option<UnindexedOrderResponseCancel> {
156        let (response_tx, response_rx) = oneshot::channel();
157
158        let key = OrderKey {
159            exchange: request.key.exchange,
160            instrument: request.key.instrument.clone(),
161            strategy: request.key.strategy.clone(),
162            cid: request.key.cid.clone(),
163        };
164
165        if self
166            .request_tx
167            .send(MockExchangeRequest::cancel_order(
168                self.time_request(),
169                response_tx,
170                into_owned_request(request),
171            ))
172            .is_err()
173        {
174            return Some(UnindexedOrderResponseCancel {
175                key,
176                state: Err(UnindexedOrderError::Connectivity(
177                    ConnectivityError::ExchangeOffline(self.mocked_exchange),
178                )),
179            });
180        }
181
182        Some(match response_rx.await {
183            Ok(response) => response,
184            Err(_) => UnindexedOrderResponseCancel {
185                key,
186                state: Err(UnindexedOrderError::Connectivity(
187                    ConnectivityError::ExchangeOffline(self.mocked_exchange),
188                )),
189            },
190        })
191    }
192
193    async fn open_order(
194        &self,
195        request: OrderRequestOpen<ExchangeId, &InstrumentNameExchange>,
196    ) -> Option<Order<ExchangeId, InstrumentNameExchange, Result<Open, UnindexedOrderError>>> {
197        let (response_tx, response_rx) = oneshot::channel();
198
199        let request = into_owned_request(request);
200
201        if self
202            .request_tx
203            .send(MockExchangeRequest::open_order(
204                self.time_request(),
205                response_tx,
206                request.clone(),
207            ))
208            .is_err()
209        {
210            return Some(Order {
211                key: request.key,
212                side: request.state.side,
213                price: request.state.price,
214                quantity: request.state.quantity,
215                kind: request.state.kind,
216                time_in_force: request.state.time_in_force,
217                state: Err(UnindexedOrderError::Connectivity(
218                    ConnectivityError::ExchangeOffline(self.mocked_exchange),
219                )),
220            });
221        }
222
223        Some(match response_rx.await {
224            Ok(response) => response,
225            Err(_) => Order {
226                key: request.key,
227                side: request.state.side,
228                price: request.state.price,
229                quantity: request.state.quantity,
230                kind: request.state.kind,
231                time_in_force: request.state.time_in_force,
232                state: Err(UnindexedOrderError::Connectivity(
233                    ConnectivityError::ExchangeOffline(self.mocked_exchange),
234                )),
235            },
236        })
237    }
238
239    async fn fetch_balances(
240        &self,
241        assets: &[AssetNameExchange],
242    ) -> Result<Vec<AssetBalance<AssetNameExchange>>, UnindexedClientError> {
243        let (response_tx, response_rx) = oneshot::channel();
244
245        self.request_tx
246            .send(MockExchangeRequest::fetch_balances(
247                self.time_request(),
248                assets.to_vec(),
249                response_tx,
250            ))
251            .map_err(|_| {
252                UnindexedClientError::Connectivity(ConnectivityError::ExchangeOffline(
253                    self.mocked_exchange,
254                ))
255            })?;
256
257        response_rx.await.map_err(|_| {
258            UnindexedClientError::Connectivity(ConnectivityError::ExchangeOffline(
259                self.mocked_exchange,
260            ))
261        })
262    }
263
264    async fn fetch_open_orders(
265        &self,
266        instruments: &[InstrumentNameExchange],
267    ) -> Result<Vec<Order<ExchangeId, InstrumentNameExchange, Open>>, UnindexedClientError> {
268        let (response_tx, response_rx) = oneshot::channel();
269
270        self.request_tx
271            .send(MockExchangeRequest::fetch_orders_open(
272                self.time_request(),
273                instruments.to_vec(),
274                response_tx,
275            ))
276            .map_err(|_| {
277                UnindexedClientError::Connectivity(ConnectivityError::ExchangeOffline(
278                    self.mocked_exchange,
279                ))
280            })?;
281
282        response_rx.await.map_err(|_| {
283            UnindexedClientError::Connectivity(ConnectivityError::ExchangeOffline(
284                self.mocked_exchange,
285            ))
286        })
287    }
288
289    async fn fetch_trades(
290        &self,
291        time_since: DateTime<Utc>,
292    ) -> Result<Vec<Trade<QuoteAsset, InstrumentNameExchange>>, UnindexedClientError> {
293        let (response_tx, response_rx) = oneshot::channel();
294
295        self.request_tx
296            .send(MockExchangeRequest::fetch_trades(
297                self.time_request(),
298                response_tx,
299                time_since,
300            ))
301            .map_err(|_| {
302                UnindexedClientError::Connectivity(ConnectivityError::ExchangeOffline(
303                    self.mocked_exchange,
304                ))
305            })?;
306
307        response_rx.await.map_err(|_| {
308            UnindexedClientError::Connectivity(ConnectivityError::ExchangeOffline(
309                self.mocked_exchange,
310            ))
311        })
312    }
313}
314
315fn into_owned_request<Kind>(
316    request: OrderEvent<Kind, ExchangeId, &InstrumentNameExchange>,
317) -> OrderEvent<Kind, ExchangeId, InstrumentNameExchange> {
318    let OrderEvent {
319        key:
320            OrderKey {
321                exchange,
322                instrument,
323                strategy,
324                cid,
325            },
326        state,
327    } = request;
328
329    OrderEvent {
330        key: OrderKey {
331            exchange,
332            instrument: instrument.clone(),
333            strategy,
334            cid,
335        },
336        state,
337    }
338}