1use crate::{
2 UnindexedAccountEvent, UnindexedAccountSnapshot,
3 balance::AssetBalance,
4 client::ExecutionClient,
5 error::{ConnectivityError, UnindexedClientError, UnindexedOrderError},
6 exchange::mock::request::MockExchangeRequest,
7 order::{
8 Order, OrderEvent, OrderKey,
9 request::{OrderRequestCancel, OrderRequestOpen, UnindexedOrderResponseCancel},
10 state::Open,
11 },
12 trade::Trade,
13};
14use barter_instrument::{
15 asset::{QuoteAsset, name::AssetNameExchange},
16 exchange::ExchangeId,
17 instrument::name::InstrumentNameExchange,
18};
19use chrono::{DateTime, Utc};
20use derive_more::Constructor;
21use futures::stream::BoxStream;
22use rust_decimal::Decimal;
23use serde::{Deserialize, Serialize};
24use tokio::sync::{broadcast, mpsc, oneshot};
25use tokio_stream::{StreamExt, wrappers::BroadcastStream};
26use tracing::error;
27
28#[derive(
29 Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize, Constructor,
30)]
31pub struct MockExecutionConfig {
32 pub mocked_exchange: ExchangeId,
33 pub initial_state: UnindexedAccountSnapshot,
34 pub latency_ms: u64,
35 pub fees_percent: Decimal,
36}
37
38#[derive(Debug, Constructor)]
39pub struct MockExecutionClientConfig<FnTime> {
40 pub mocked_exchange: ExchangeId,
41 pub clock: FnTime,
42 pub request_tx: mpsc::UnboundedSender<MockExchangeRequest>,
43 pub event_rx: broadcast::Receiver<UnindexedAccountEvent>,
44}
45
46impl<FnTime> Clone for MockExecutionClientConfig<FnTime>
47where
48 FnTime: Clone,
49{
50 fn clone(&self) -> Self {
51 Self {
52 mocked_exchange: self.mocked_exchange,
53 clock: self.clock.clone(),
54 request_tx: self.request_tx.clone(),
55 event_rx: self.event_rx.resubscribe(),
56 }
57 }
58}
59
60#[derive(Debug, Constructor)]
61pub struct MockExecution<FnTime> {
62 pub mocked_exchange: ExchangeId,
63 pub clock: FnTime,
64 pub request_tx: mpsc::UnboundedSender<MockExchangeRequest>,
65 pub event_rx: broadcast::Receiver<UnindexedAccountEvent>,
66}
67
68impl<FnTime> Clone for MockExecution<FnTime>
69where
70 FnTime: Clone,
71{
72 fn clone(&self) -> Self {
73 Self {
74 mocked_exchange: self.mocked_exchange,
75 clock: self.clock.clone(),
76 request_tx: self.request_tx.clone(),
77 event_rx: self.event_rx.resubscribe(),
78 }
79 }
80}
81
82impl<FnTime> MockExecution<FnTime>
83where
84 FnTime: Fn() -> DateTime<Utc>,
85{
86 pub fn time_request(&self) -> DateTime<Utc> {
87 (self.clock)()
88 }
89}
90
91impl<FnTime> ExecutionClient for MockExecution<FnTime>
92where
93 FnTime: Fn() -> DateTime<Utc> + Clone + Sync,
94{
95 const EXCHANGE: ExchangeId = ExchangeId::Mock;
96 type Config = MockExecutionClientConfig<FnTime>;
97 type AccountStream = BoxStream<'static, UnindexedAccountEvent>;
98
99 fn new(config: Self::Config) -> Self {
100 Self {
101 mocked_exchange: config.mocked_exchange,
102 clock: config.clock,
103 request_tx: config.request_tx,
104 event_rx: config.event_rx,
105 }
106 }
107
108 async fn account_snapshot(
109 &self,
110 _: &[AssetNameExchange],
111 _: &[InstrumentNameExchange],
112 ) -> Result<UnindexedAccountSnapshot, UnindexedClientError> {
113 let (response_tx, response_rx) = oneshot::channel();
114
115 self.request_tx
116 .send(MockExchangeRequest::fetch_account_snapshot(
117 self.time_request(),
118 response_tx,
119 ))
120 .map_err(|_| {
121 UnindexedClientError::Connectivity(ConnectivityError::ExchangeOffline(
122 self.mocked_exchange,
123 ))
124 })?;
125
126 response_rx.await.map_err(|_| {
127 UnindexedClientError::Connectivity(ConnectivityError::ExchangeOffline(
128 self.mocked_exchange,
129 ))
130 })
131 }
132
133 async fn account_stream(
134 &self,
135 _: &[AssetNameExchange],
136 _: &[InstrumentNameExchange],
137 ) -> Result<Self::AccountStream, UnindexedClientError> {
138 Ok(futures::StreamExt::boxed(
139 BroadcastStream::new(self.event_rx.resubscribe()).map_while(|result| match result {
140 Ok(event) => Some(event),
141 Err(error) => {
142 error!(
143 ?error,
144 "MockExchange Broadcast AccountStream lagged - terminating"
145 );
146 None
147 }
148 }),
149 ))
150 }
151
152 async fn cancel_order(
153 &self,
154 request: OrderRequestCancel<ExchangeId, &InstrumentNameExchange>,
155 ) -> Option<UnindexedOrderResponseCancel> {
156 let (response_tx, response_rx) = oneshot::channel();
157
158 let key = OrderKey {
159 exchange: request.key.exchange,
160 instrument: request.key.instrument.clone(),
161 strategy: request.key.strategy.clone(),
162 cid: request.key.cid.clone(),
163 };
164
165 if self
166 .request_tx
167 .send(MockExchangeRequest::cancel_order(
168 self.time_request(),
169 response_tx,
170 into_owned_request(request),
171 ))
172 .is_err()
173 {
174 return Some(UnindexedOrderResponseCancel {
175 key,
176 state: Err(UnindexedOrderError::Connectivity(
177 ConnectivityError::ExchangeOffline(self.mocked_exchange),
178 )),
179 });
180 }
181
182 Some(match response_rx.await {
183 Ok(response) => response,
184 Err(_) => UnindexedOrderResponseCancel {
185 key,
186 state: Err(UnindexedOrderError::Connectivity(
187 ConnectivityError::ExchangeOffline(self.mocked_exchange),
188 )),
189 },
190 })
191 }
192
193 async fn open_order(
194 &self,
195 request: OrderRequestOpen<ExchangeId, &InstrumentNameExchange>,
196 ) -> Option<Order<ExchangeId, InstrumentNameExchange, Result<Open, UnindexedOrderError>>> {
197 let (response_tx, response_rx) = oneshot::channel();
198
199 let request = into_owned_request(request);
200
201 if self
202 .request_tx
203 .send(MockExchangeRequest::open_order(
204 self.time_request(),
205 response_tx,
206 request.clone(),
207 ))
208 .is_err()
209 {
210 return Some(Order {
211 key: request.key,
212 side: request.state.side,
213 price: request.state.price,
214 quantity: request.state.quantity,
215 kind: request.state.kind,
216 time_in_force: request.state.time_in_force,
217 state: Err(UnindexedOrderError::Connectivity(
218 ConnectivityError::ExchangeOffline(self.mocked_exchange),
219 )),
220 });
221 }
222
223 Some(match response_rx.await {
224 Ok(response) => response,
225 Err(_) => Order {
226 key: request.key,
227 side: request.state.side,
228 price: request.state.price,
229 quantity: request.state.quantity,
230 kind: request.state.kind,
231 time_in_force: request.state.time_in_force,
232 state: Err(UnindexedOrderError::Connectivity(
233 ConnectivityError::ExchangeOffline(self.mocked_exchange),
234 )),
235 },
236 })
237 }
238
239 async fn fetch_balances(
240 &self,
241 assets: &[AssetNameExchange],
242 ) -> Result<Vec<AssetBalance<AssetNameExchange>>, UnindexedClientError> {
243 let (response_tx, response_rx) = oneshot::channel();
244
245 self.request_tx
246 .send(MockExchangeRequest::fetch_balances(
247 self.time_request(),
248 assets.to_vec(),
249 response_tx,
250 ))
251 .map_err(|_| {
252 UnindexedClientError::Connectivity(ConnectivityError::ExchangeOffline(
253 self.mocked_exchange,
254 ))
255 })?;
256
257 response_rx.await.map_err(|_| {
258 UnindexedClientError::Connectivity(ConnectivityError::ExchangeOffline(
259 self.mocked_exchange,
260 ))
261 })
262 }
263
264 async fn fetch_open_orders(
265 &self,
266 instruments: &[InstrumentNameExchange],
267 ) -> Result<Vec<Order<ExchangeId, InstrumentNameExchange, Open>>, UnindexedClientError> {
268 let (response_tx, response_rx) = oneshot::channel();
269
270 self.request_tx
271 .send(MockExchangeRequest::fetch_orders_open(
272 self.time_request(),
273 instruments.to_vec(),
274 response_tx,
275 ))
276 .map_err(|_| {
277 UnindexedClientError::Connectivity(ConnectivityError::ExchangeOffline(
278 self.mocked_exchange,
279 ))
280 })?;
281
282 response_rx.await.map_err(|_| {
283 UnindexedClientError::Connectivity(ConnectivityError::ExchangeOffline(
284 self.mocked_exchange,
285 ))
286 })
287 }
288
289 async fn fetch_trades(
290 &self,
291 time_since: DateTime<Utc>,
292 ) -> Result<Vec<Trade<QuoteAsset, InstrumentNameExchange>>, UnindexedClientError> {
293 let (response_tx, response_rx) = oneshot::channel();
294
295 self.request_tx
296 .send(MockExchangeRequest::fetch_trades(
297 self.time_request(),
298 response_tx,
299 time_since,
300 ))
301 .map_err(|_| {
302 UnindexedClientError::Connectivity(ConnectivityError::ExchangeOffline(
303 self.mocked_exchange,
304 ))
305 })?;
306
307 response_rx.await.map_err(|_| {
308 UnindexedClientError::Connectivity(ConnectivityError::ExchangeOffline(
309 self.mocked_exchange,
310 ))
311 })
312 }
313}
314
315fn into_owned_request<Kind>(
316 request: OrderEvent<Kind, ExchangeId, &InstrumentNameExchange>,
317) -> OrderEvent<Kind, ExchangeId, InstrumentNameExchange> {
318 let OrderEvent {
319 key:
320 OrderKey {
321 exchange,
322 instrument,
323 strategy,
324 cid,
325 },
326 state,
327 } = request;
328
329 OrderEvent {
330 key: OrderKey {
331 exchange,
332 instrument: instrument.clone(),
333 strategy,
334 cid,
335 },
336 state,
337 }
338}