barter_execution/exchange/mock/
mod.rs

1use crate::{
2    AccountEventKind, InstrumentAccountSnapshot, UnindexedAccountEvent, UnindexedAccountSnapshot,
3    balance::AssetBalance,
4    client::mock::MockExecutionConfig,
5    error::{ApiError, UnindexedApiError, UnindexedOrderError},
6    exchange::mock::{
7        account::AccountState,
8        request::{MockExchangeRequest, MockExchangeRequestKind},
9    },
10    order::{
11        Order, OrderKind, UnindexedOrder,
12        id::OrderId,
13        request::{OrderRequestCancel, OrderRequestOpen},
14        state::{Cancelled, Open},
15    },
16    trade::{AssetFees, Trade, TradeId},
17};
18use barter_instrument::{
19    Side,
20    asset::{QuoteAsset, name::AssetNameExchange},
21    exchange::ExchangeId,
22    instrument::{Instrument, name::InstrumentNameExchange},
23};
24use barter_integration::snapshot::Snapshot;
25use chrono::{DateTime, TimeDelta, Utc};
26use fnv::FnvHashMap;
27use futures::stream::BoxStream;
28use itertools::Itertools;
29use rust_decimal::Decimal;
30use smol_str::ToSmolStr;
31use std::fmt::Debug;
32use tokio::sync::{broadcast, mpsc, oneshot};
33use tokio_stream::{StreamExt, wrappers::BroadcastStream};
34use tracing::{error, info};
35
36pub mod account;
37pub mod request;
38
/// In-process mock of an exchange.
///
/// Receives [`MockExchangeRequest`]s over `request_rx`, answers them from the simulated
/// [`AccountState`], and broadcasts the resulting [`UnindexedAccountEvent`]s over `event_tx`.
#[derive(Debug)]
pub struct MockExchange {
    /// Identifier of the exchange being mocked; stamped on snapshots, events and log lines.
    pub exchange: ExchangeId,
    /// Simulated latency in milliseconds. Responses and notifications are delayed by this
    /// amount, and half of it is added to each request time to derive the exchange time.
    pub latency_ms: u64,
    /// Fee rate that order values are multiplied by when computing fees.
    // NOTE(review): it is used as a direct multiplier, so presumably a fraction rather than a
    // whole-number percentage - confirm against the configuration documentation.
    pub fees_percent: Decimal,
    /// Receiving end of the client request channel; `run` exits once it is closed.
    pub request_rx: mpsc::UnboundedReceiver<MockExchangeRequest>,
    /// Broadcast sender used to publish account events to subscribers.
    pub event_tx: broadcast::Sender<UnindexedAccountEvent>,
    /// Instruments the exchange is configured for, keyed by their exchange name.
    pub instruments: FnvHashMap<InstrumentNameExchange, Instrument<ExchangeId, AssetNameExchange>>,
    /// Simulated account state (balances, orders and trades).
    pub account: AccountState,
    /// Next value used to generate an [`OrderId`]; incremented on every allocation.
    pub order_sequence: u64,
    /// Exchange time derived from the most recently received request.
    pub time_exchange_latest: DateTime<Utc>,
}
51
52impl MockExchange {
53    pub fn new(
54        config: MockExecutionConfig,
55        request_rx: mpsc::UnboundedReceiver<MockExchangeRequest>,
56        event_tx: broadcast::Sender<UnindexedAccountEvent>,
57        instruments: FnvHashMap<InstrumentNameExchange, Instrument<ExchangeId, AssetNameExchange>>,
58    ) -> Self {
59        Self {
60            exchange: config.mocked_exchange,
61            latency_ms: config.latency_ms,
62            fees_percent: config.fees_percent,
63            request_rx,
64            event_tx,
65            instruments,
66            account: AccountState::from(config.initial_state),
67            order_sequence: 0,
68            time_exchange_latest: Default::default(),
69        }
70    }
71
72    pub async fn run(mut self) {
73        while let Some(request) = self.request_rx.recv().await {
74            self.update_time_exchange(request.time_request);
75
76            match request.kind {
77                MockExchangeRequestKind::FetchAccountSnapshot { response_tx } => {
78                    let snapshot = self.account_snapshot();
79                    self.respond_with_latency(response_tx, snapshot);
80                }
81                MockExchangeRequestKind::FetchBalances { response_tx } => {
82                    let balances = self.account.balances().cloned().collect();
83                    self.respond_with_latency(response_tx, balances);
84                }
85                MockExchangeRequestKind::FetchOrdersOpen { response_tx } => {
86                    let orders_open = self.account.orders_open().cloned().collect();
87                    self.respond_with_latency(response_tx, orders_open);
88                }
89                MockExchangeRequestKind::FetchTrades {
90                    response_tx,
91                    time_since,
92                } => {
93                    let trades = self.account.trades(time_since).cloned().collect();
94                    self.respond_with_latency(response_tx, trades);
95                }
96                MockExchangeRequestKind::CancelOrder {
97                    response_tx: _,
98                    request,
99                } => {
100                    error!(
101                        exchange = %self.exchange,
102                        ?request,
103                        "MockExchange received cancel request but only Market orders are supported"
104                    );
105                }
106                MockExchangeRequestKind::OpenOrder {
107                    response_tx,
108                    request,
109                } => {
110                    let (response, notifications) = self.open_order(request);
111                    self.respond_with_latency(response_tx, response);
112
113                    if let Some(notifications) = notifications {
114                        self.account.ack_trade(notifications.trade.clone());
115                        self.send_notifications_with_latency(notifications);
116                    }
117                }
118            }
119        }
120
121        info!(exchange = %self.exchange, "MockExchange shutting down");
122    }
123
124    fn update_time_exchange(&mut self, time_request: DateTime<Utc>) {
125        let client_to_exchange_latency = self.latency_ms / 2;
126
127        self.time_exchange_latest = time_request
128            .checked_add_signed(TimeDelta::milliseconds(client_to_exchange_latency as i64))
129            .unwrap_or(time_request);
130
131        self.account.update_time_exchange(self.time_exchange_latest)
132    }
133
    /// Returns the exchange time derived from the most recently processed request.
    pub fn time_exchange(&self) -> DateTime<Utc> {
        self.time_exchange_latest
    }
137
138    pub fn account_snapshot(&self) -> UnindexedAccountSnapshot {
139        let balances = self.account.balances().cloned().collect();
140
141        let orders_open = self
142            .account
143            .orders_open()
144            .cloned()
145            .map(UnindexedOrder::from);
146
147        let orders_cancelled = self
148            .account
149            .orders_cancelled()
150            .cloned()
151            .map(UnindexedOrder::from);
152
153        let orders_all = orders_open.chain(orders_cancelled);
154        let orders_all = orders_all.sorted_unstable_by_key(|order| order.key.instrument.clone());
155        let orders_by_instrument = orders_all.chunk_by(|order| order.key.instrument.clone());
156
157        let instruments = orders_by_instrument
158            .into_iter()
159            .map(|(instrument, orders)| InstrumentAccountSnapshot {
160                instrument,
161                orders: orders.into_iter().collect(),
162            })
163            .collect();
164
165        UnindexedAccountSnapshot {
166            exchange: self.exchange,
167            balances,
168            instruments,
169        }
170    }
171
172    /// Sends the provided `Response` via the [`oneshot::Sender`] after waiting for the latency
173    /// [`Duration`].
174    ///
175    /// Used to simulate network latency between the exchange and client.
176    fn respond_with_latency<Response>(
177        &self,
178        response_tx: oneshot::Sender<Response>,
179        response: Response,
180    ) where
181        Response: Send + 'static,
182    {
183        let exchange = self.exchange;
184        let latency = std::time::Duration::from_millis(self.latency_ms);
185
186        tokio::spawn(async move {
187            tokio::time::sleep(latency).await;
188            if response_tx.send(response).is_err() {
189                error!(
190                    %exchange,
191                    kind = std::any::type_name::<Response>(),
192                    "MockExchange failed to send oneshot response to client"
193                );
194            }
195        });
196    }
197
198    /// Sends the provided `OpenOrderNotifications` via the `MockExchanges`
199    /// `broadcast::Sender<UnindexedAccountEvent>` after waiting for the latency
200    /// [`Duration`].
201    ///
202    /// Used to simulate network latency between the exchange and client.
203    fn send_notifications_with_latency(&self, notifications: OpenOrderNotifications) {
204        let balance = self.build_account_event(notifications.balance);
205        let trade = self.build_account_event(notifications.trade);
206
207        let exchange = self.exchange;
208        let latency = std::time::Duration::from_millis(self.latency_ms);
209        let tx = self.event_tx.clone();
210        tokio::spawn(async move {
211            tokio::time::sleep(latency).await;
212
213            if tx.send(balance).is_err() {
214                error!(
215                    %exchange,
216                    kind = "Snapshot<AssetBalance<AssetNameExchange>",
217                    "MockExchange failed to send AccountEvent notification to client"
218                );
219            }
220
221            if tx.send(trade).is_err() {
222                error!(
223                    %exchange,
224                    kind = "Trade<QuoteAsset, InstrumentNameExchange>",
225                    "MockExchange failed to send AccountEvent notification to client"
226                );
227            }
228        });
229    }
230
231    pub fn account_stream(&self) -> BoxStream<'static, UnindexedAccountEvent> {
232        futures::StreamExt::boxed(BroadcastStream::new(self.event_tx.subscribe()).map_while(
233            |result| match result {
234                Ok(event) => Some(event),
235                Err(error) => {
236                    error!(
237                        ?error,
238                        "MockExchange Broadcast AccountStream lagged - terminating"
239                    );
240                    None
241                }
242            },
243        ))
244    }
245
    /// Order cancellation is not implemented by the `MockExchange`.
    ///
    /// # Panics
    /// Always panics via `unimplemented!()`.
    pub fn cancel_order(
        &mut self,
        _: OrderRequestCancel<ExchangeId, InstrumentNameExchange>,
    ) -> Order<ExchangeId, InstrumentNameExchange, Result<Cancelled, UnindexedOrderError>> {
        unimplemented!()
    }
252
253    pub fn open_order(
254        &mut self,
255        request: OrderRequestOpen<ExchangeId, InstrumentNameExchange>,
256    ) -> (
257        Order<ExchangeId, InstrumentNameExchange, Result<Open, UnindexedOrderError>>,
258        Option<OpenOrderNotifications>,
259    ) {
260        if let Err(error) = self.validate_order_kind_supported(request.state.kind) {
261            return (build_open_order_err_response(request, error), None);
262        }
263
264        let underlying = match self.find_instrument_data(&request.key.instrument) {
265            Ok(instrument) => instrument.underlying.clone(),
266            Err(error) => return (build_open_order_err_response(request, error), None),
267        };
268
269        let time_exchange = self.time_exchange();
270
271        let balance_change_result = match request.state.side {
272            Side::Buy => {
273                // Buying Instrument requires sufficient QuoteAsset Balance
274                let current = self
275                    .account
276                    .balance_mut(&underlying.quote)
277                    .expect("MockExchange has Balance for all configured Instrument assets");
278
279                // Currently we only supported MarketKind orders, so they should be identical
280                assert_eq!(current.balance.total, current.balance.free);
281
282                let order_value_quote = request.state.price * request.state.quantity.abs();
283                let order_fees_quote = order_value_quote * self.fees_percent;
284                let quote_required = order_value_quote + order_fees_quote;
285
286                let maybe_new_balance = current.balance.free - quote_required;
287
288                if maybe_new_balance >= Decimal::ZERO {
289                    current.balance.free = maybe_new_balance;
290                    current.balance.total = maybe_new_balance;
291                    current.time_exchange = time_exchange;
292
293                    Ok((current.clone(), AssetFees::quote_fees(order_fees_quote)))
294                } else {
295                    Err(ApiError::BalanceInsufficient(
296                        underlying.quote,
297                        format!(
298                            "Available Balance: {}, Required Balance inc. fees: {}",
299                            current.balance.free, quote_required
300                        ),
301                    ))
302                }
303            }
304            Side::Sell => {
305                // Selling Instrument requires sufficient BaseAsset Balance
306                let current = self
307                    .account
308                    .balance_mut(&underlying.quote)
309                    .expect("MockExchange has Balance for all configured Instrument assets");
310
311                // Currently we only supported MarketKind orders, so they should be identical
312                assert_eq!(current.balance.total, current.balance.free);
313
314                let order_value_base = request.state.quantity.abs();
315                let order_fees_base = order_value_base * self.fees_percent;
316                let base_required = order_value_base + order_fees_base;
317
318                let maybe_new_balance = current.balance.free - base_required;
319
320                if maybe_new_balance >= Decimal::ZERO {
321                    current.balance.free = maybe_new_balance;
322                    current.balance.total = maybe_new_balance;
323                    current.time_exchange = time_exchange;
324
325                    let fees_quote = order_fees_base * request.state.price;
326
327                    Ok((current.clone(), AssetFees::quote_fees(fees_quote)))
328                } else {
329                    Err(ApiError::BalanceInsufficient(
330                        underlying.quote,
331                        format!(
332                            "Available Balance: {}, Required Balance inc. fees: {}",
333                            current.balance.free, base_required
334                        ),
335                    ))
336                }
337            }
338        };
339
340        let (balance_snapshot, fees) = match balance_change_result {
341            Ok((balance_snapshot, fees)) => (Snapshot(balance_snapshot), fees),
342            Err(error) => return (build_open_order_err_response(request, error), None),
343        };
344
345        let order_id = self.order_id_sequence_fetch_add();
346        let trade_id = TradeId(order_id.0.clone());
347
348        let order_response = Order {
349            key: request.key.clone(),
350            side: request.state.side,
351            price: request.state.price,
352            quantity: request.state.quantity,
353            kind: request.state.kind,
354            time_in_force: request.state.time_in_force,
355            state: Ok(Open {
356                id: order_id.clone(),
357                time_exchange: self.time_exchange(),
358                filled_quantity: request.state.quantity,
359            }),
360        };
361
362        let notifications = OpenOrderNotifications {
363            balance: balance_snapshot,
364            trade: Trade {
365                id: trade_id,
366                order_id: order_id.clone(),
367                instrument: request.key.instrument,
368                strategy: request.key.strategy,
369                time_exchange: self.time_exchange(),
370                side: request.state.side,
371                price: request.state.price,
372                quantity: request.state.quantity,
373                fees,
374            },
375        };
376
377        (order_response, Some(notifications))
378    }
379
380    pub fn validate_order_kind_supported(
381        &self,
382        order_kind: OrderKind,
383    ) -> Result<(), UnindexedOrderError> {
384        if order_kind == OrderKind::Market {
385            Ok(())
386        } else {
387            Err(UnindexedOrderError::Rejected(ApiError::OrderRejected(
388                format!("MockExchange does not supported OrderKind: {order_kind}"),
389            )))
390        }
391    }
392
393    pub fn find_instrument_data(
394        &self,
395        instrument: &InstrumentNameExchange,
396    ) -> Result<&Instrument<ExchangeId, AssetNameExchange>, UnindexedApiError> {
397        self.instruments.get(instrument).ok_or_else(|| {
398            ApiError::InstrumentInvalid(
399                instrument.clone(),
400                format!("MockExchange is not set-up for managing: {}", instrument),
401            )
402        })
403    }
404
405    fn order_id_sequence_fetch_add(&mut self) -> OrderId {
406        let sequence = self.order_sequence;
407        self.order_sequence += 1;
408        OrderId::new(sequence.to_smolstr())
409    }
410
411    fn build_account_event<Kind>(&self, kind: Kind) -> UnindexedAccountEvent
412    where
413        Kind: Into<AccountEventKind<ExchangeId, AssetNameExchange, InstrumentNameExchange>>,
414    {
415        UnindexedAccountEvent {
416            exchange: self.exchange,
417            kind: kind.into(),
418        }
419    }
420}
421
422fn build_open_order_err_response<E>(
423    request: OrderRequestOpen<ExchangeId, InstrumentNameExchange>,
424    error: E,
425) -> Order<ExchangeId, InstrumentNameExchange, Result<Open, UnindexedOrderError>>
426where
427    E: Into<UnindexedOrderError>,
428{
429    Order {
430        key: request.key,
431        side: request.state.side,
432        price: request.state.price,
433        quantity: request.state.quantity,
434        kind: request.state.kind,
435        time_in_force: request.state.time_in_force,
436        state: Err(error.into()),
437    }
438}
439
/// Account notifications produced by a successfully executed open order request.
#[derive(Debug)]
pub struct OpenOrderNotifications {
    /// Snapshot of the asset balance that was debited to pay for the order.
    pub balance: Snapshot<AssetBalance<AssetNameExchange>>,
    /// Trade generated by filling the order, with fees expressed in the quote asset.
    pub trade: Trade<QuoteAsset, InstrumentNameExchange>,
}