// barter_execution/exchange/mock/mod.rs

1use crate::{
2    AccountEventKind, InstrumentAccountSnapshot, UnindexedAccountEvent, UnindexedAccountSnapshot,
3    balance::AssetBalance,
4    client::mock::MockExecutionConfig,
5    error::{ApiError, UnindexedApiError, UnindexedOrderError},
6    exchange::mock::{
7        account::AccountState,
8        request::{MockExchangeRequest, MockExchangeRequestKind},
9    },
10    order::{
11        Order, OrderKind, UnindexedOrder,
12        id::OrderId,
13        request::{OrderRequestCancel, OrderRequestOpen},
14        state::{Cancelled, Open},
15    },
16    trade::{AssetFees, Trade, TradeId},
17};
18use barter_instrument::{
19    Side,
20    asset::{QuoteAsset, name::AssetNameExchange},
21    exchange::ExchangeId,
22    instrument::{Instrument, name::InstrumentNameExchange},
23};
24use barter_integration::collection::snapshot::Snapshot;
25use chrono::{DateTime, TimeDelta, Utc};
26use fnv::FnvHashMap;
27use futures::stream::BoxStream;
28use itertools::Itertools;
29use rust_decimal::Decimal;
30use smol_str::ToSmolStr;
31use std::fmt::Debug;
32use tokio::sync::{broadcast, mpsc, oneshot};
33use tokio_stream::{StreamExt, wrappers::BroadcastStream};
34use tracing::{error, info};
35
36pub mod account;
37pub mod request;
38
39#[derive(Debug)]
40pub struct MockExchange {
41    pub exchange: ExchangeId,
42    pub latency_ms: u64,
43    pub fees_percent: Decimal,
44    pub request_rx: mpsc::UnboundedReceiver<MockExchangeRequest>,
45    pub event_tx: broadcast::Sender<UnindexedAccountEvent>,
46    pub instruments: FnvHashMap<InstrumentNameExchange, Instrument<ExchangeId, AssetNameExchange>>,
47    pub account: AccountState,
48    pub order_sequence: u64,
49    pub time_exchange_latest: DateTime<Utc>,
50}
51
52impl MockExchange {
53    pub fn new(
54        config: MockExecutionConfig,
55        request_rx: mpsc::UnboundedReceiver<MockExchangeRequest>,
56        event_tx: broadcast::Sender<UnindexedAccountEvent>,
57        instruments: FnvHashMap<InstrumentNameExchange, Instrument<ExchangeId, AssetNameExchange>>,
58    ) -> Self {
59        Self {
60            exchange: config.mocked_exchange,
61            latency_ms: config.latency_ms,
62            fees_percent: config.fees_percent,
63            request_rx,
64            event_tx,
65            instruments,
66            account: AccountState::from(config.initial_state),
67            order_sequence: 0,
68            time_exchange_latest: Default::default(),
69        }
70    }
71
72    pub async fn run(mut self) {
73        while let Some(request) = self.request_rx.recv().await {
74            self.update_time_exchange(request.time_request);
75
76            match request.kind {
77                MockExchangeRequestKind::FetchAccountSnapshot { response_tx } => {
78                    let snapshot = self.account_snapshot();
79                    self.respond_with_latency(response_tx, snapshot);
80                }
81                MockExchangeRequestKind::FetchBalances {
82                    response_tx,
83                    assets,
84                } => {
85                    let balances = self
86                        .account
87                        .balances()
88                        .filter(|balance| assets.contains(&balance.asset))
89                        .cloned()
90                        .collect();
91                    self.respond_with_latency(response_tx, balances);
92                }
93                MockExchangeRequestKind::FetchOrdersOpen {
94                    response_tx,
95                    instruments,
96                } => {
97                    let orders_open = self
98                        .account
99                        .orders_open()
100                        .filter(|order| instruments.contains(&order.key.instrument))
101                        .cloned()
102                        .collect();
103                    self.respond_with_latency(response_tx, orders_open);
104                }
105                MockExchangeRequestKind::FetchTrades {
106                    response_tx,
107                    time_since,
108                } => {
109                    let trades = self.account.trades(time_since).cloned().collect();
110                    self.respond_with_latency(response_tx, trades);
111                }
112                MockExchangeRequestKind::CancelOrder {
113                    response_tx: _,
114                    request,
115                } => {
116                    error!(
117                        exchange = %self.exchange,
118                        ?request,
119                        "MockExchange received cancel request but only Market orders are supported"
120                    );
121                }
122                MockExchangeRequestKind::OpenOrder {
123                    response_tx,
124                    request,
125                } => {
126                    let (response, notifications) = self.open_order(request);
127                    self.respond_with_latency(response_tx, response);
128
129                    if let Some(notifications) = notifications {
130                        self.account.ack_trade(notifications.trade.clone());
131                        self.send_notifications_with_latency(notifications);
132                    }
133                }
134            }
135        }
136
137        info!(exchange = %self.exchange, "MockExchange shutting down");
138    }
139
140    fn update_time_exchange(&mut self, time_request: DateTime<Utc>) {
141        let client_to_exchange_latency = self.latency_ms / 2;
142
143        self.time_exchange_latest = time_request
144            .checked_add_signed(TimeDelta::milliseconds(client_to_exchange_latency as i64))
145            .unwrap_or(time_request);
146
147        self.account.update_time_exchange(self.time_exchange_latest)
148    }
149
150    pub fn time_exchange(&self) -> DateTime<Utc> {
151        self.time_exchange_latest
152    }
153
154    pub fn account_snapshot(&self) -> UnindexedAccountSnapshot {
155        let balances = self.account.balances().cloned().collect();
156
157        let orders_open = self
158            .account
159            .orders_open()
160            .cloned()
161            .map(UnindexedOrder::from);
162
163        let orders_cancelled = self
164            .account
165            .orders_cancelled()
166            .cloned()
167            .map(UnindexedOrder::from);
168
169        let orders_all = orders_open.chain(orders_cancelled);
170        let orders_all = orders_all.sorted_unstable_by_key(|order| order.key.instrument.clone());
171        let orders_by_instrument = orders_all.chunk_by(|order| order.key.instrument.clone());
172
173        let instruments = orders_by_instrument
174            .into_iter()
175            .map(|(instrument, orders)| InstrumentAccountSnapshot {
176                instrument,
177                orders: orders.into_iter().collect(),
178            })
179            .collect();
180
181        UnindexedAccountSnapshot {
182            exchange: self.exchange,
183            balances,
184            instruments,
185        }
186    }
187
188    /// Sends the provided `Response` via the [`oneshot::Sender`] after waiting for the latency
189    /// [`Duration`].
190    ///
191    /// Used to simulate network latency between the exchange and client.
192    fn respond_with_latency<Response>(
193        &self,
194        response_tx: oneshot::Sender<Response>,
195        response: Response,
196    ) where
197        Response: Send + 'static,
198    {
199        let exchange = self.exchange;
200        let latency = std::time::Duration::from_millis(self.latency_ms);
201
202        tokio::spawn(async move {
203            tokio::time::sleep(latency).await;
204            if response_tx.send(response).is_err() {
205                error!(
206                    %exchange,
207                    kind = std::any::type_name::<Response>(),
208                    "MockExchange failed to send oneshot response to client"
209                );
210            }
211        });
212    }
213
214    /// Sends the provided `OpenOrderNotifications` via the `MockExchanges`
215    /// `broadcast::Sender<UnindexedAccountEvent>` after waiting for the latency
216    /// [`Duration`].
217    ///
218    /// Used to simulate network latency between the exchange and client.
219    fn send_notifications_with_latency(&self, notifications: OpenOrderNotifications) {
220        let balance = self.build_account_event(notifications.balance);
221        let trade = self.build_account_event(notifications.trade);
222
223        let exchange = self.exchange;
224        let latency = std::time::Duration::from_millis(self.latency_ms);
225        let tx = self.event_tx.clone();
226        tokio::spawn(async move {
227            tokio::time::sleep(latency).await;
228
229            if tx.send(balance).is_err() {
230                error!(
231                    %exchange,
232                    kind = "Snapshot<AssetBalance<AssetNameExchange>",
233                    "MockExchange failed to send AccountEvent notification to client"
234                );
235            }
236
237            if tx.send(trade).is_err() {
238                error!(
239                    %exchange,
240                    kind = "Trade<QuoteAsset, InstrumentNameExchange>",
241                    "MockExchange failed to send AccountEvent notification to client"
242                );
243            }
244        });
245    }
246
247    pub fn account_stream(&self) -> BoxStream<'static, UnindexedAccountEvent> {
248        futures::StreamExt::boxed(BroadcastStream::new(self.event_tx.subscribe()).map_while(
249            |result| match result {
250                Ok(event) => Some(event),
251                Err(error) => {
252                    error!(
253                        ?error,
254                        "MockExchange Broadcast AccountStream lagged - terminating"
255                    );
256                    None
257                }
258            },
259        ))
260    }
261
262    pub fn cancel_order(
263        &mut self,
264        _: OrderRequestCancel<ExchangeId, InstrumentNameExchange>,
265    ) -> Order<ExchangeId, InstrumentNameExchange, Result<Cancelled, UnindexedOrderError>> {
266        unimplemented!()
267    }
268
269    pub fn open_order(
270        &mut self,
271        request: OrderRequestOpen<ExchangeId, InstrumentNameExchange>,
272    ) -> (
273        Order<ExchangeId, InstrumentNameExchange, Result<Open, UnindexedOrderError>>,
274        Option<OpenOrderNotifications>,
275    ) {
276        if let Err(error) = self.validate_order_kind_supported(request.state.kind) {
277            return (build_open_order_err_response(request, error), None);
278        }
279
280        let underlying = match self.find_instrument_data(&request.key.instrument) {
281            Ok(instrument) => instrument.underlying.clone(),
282            Err(error) => return (build_open_order_err_response(request, error), None),
283        };
284
285        let time_exchange = self.time_exchange();
286
287        let balance_change_result = match request.state.side {
288            Side::Buy => {
289                // Buying Instrument requires sufficient QuoteAsset Balance
290                let current = self
291                    .account
292                    .balance_mut(&underlying.quote)
293                    .expect("MockExchange has Balance for all configured Instrument assets");
294
295                // Currently we only supported MarketKind orders, so they should be identical
296                assert_eq!(current.balance.total, current.balance.free);
297
298                let order_value_quote = request.state.price * request.state.quantity.abs();
299                let order_fees_quote = order_value_quote * self.fees_percent;
300                let quote_required = order_value_quote + order_fees_quote;
301
302                let maybe_new_balance = current.balance.free - quote_required;
303
304                if maybe_new_balance >= Decimal::ZERO {
305                    current.balance.free = maybe_new_balance;
306                    current.balance.total = maybe_new_balance;
307                    current.time_exchange = time_exchange;
308
309                    Ok((current.clone(), AssetFees::quote_fees(order_fees_quote)))
310                } else {
311                    Err(ApiError::BalanceInsufficient(
312                        underlying.quote,
313                        format!(
314                            "Available Balance: {}, Required Balance inc. fees: {}",
315                            current.balance.free, quote_required
316                        ),
317                    ))
318                }
319            }
320            Side::Sell => {
321                // Selling Instrument requires sufficient BaseAsset Balance
322                let current = self
323                    .account
324                    .balance_mut(&underlying.quote)
325                    .expect("MockExchange has Balance for all configured Instrument assets");
326
327                // Currently we only supported MarketKind orders, so they should be identical
328                assert_eq!(current.balance.total, current.balance.free);
329
330                let order_value_base = request.state.quantity.abs();
331                let order_fees_base = order_value_base * self.fees_percent;
332                let base_required = order_value_base + order_fees_base;
333
334                let maybe_new_balance = current.balance.free - base_required;
335
336                if maybe_new_balance >= Decimal::ZERO {
337                    current.balance.free = maybe_new_balance;
338                    current.balance.total = maybe_new_balance;
339                    current.time_exchange = time_exchange;
340
341                    let fees_quote = order_fees_base * request.state.price;
342
343                    Ok((current.clone(), AssetFees::quote_fees(fees_quote)))
344                } else {
345                    Err(ApiError::BalanceInsufficient(
346                        underlying.quote,
347                        format!(
348                            "Available Balance: {}, Required Balance inc. fees: {}",
349                            current.balance.free, base_required
350                        ),
351                    ))
352                }
353            }
354        };
355
356        let (balance_snapshot, fees) = match balance_change_result {
357            Ok((balance_snapshot, fees)) => (Snapshot(balance_snapshot), fees),
358            Err(error) => return (build_open_order_err_response(request, error), None),
359        };
360
361        let order_id = self.order_id_sequence_fetch_add();
362        let trade_id = TradeId(order_id.0.clone());
363
364        let order_response = Order {
365            key: request.key.clone(),
366            side: request.state.side,
367            price: request.state.price,
368            quantity: request.state.quantity,
369            kind: request.state.kind,
370            time_in_force: request.state.time_in_force,
371            state: Ok(Open {
372                id: order_id.clone(),
373                time_exchange: self.time_exchange(),
374                filled_quantity: request.state.quantity,
375            }),
376        };
377
378        let notifications = OpenOrderNotifications {
379            balance: balance_snapshot,
380            trade: Trade {
381                id: trade_id,
382                order_id: order_id.clone(),
383                instrument: request.key.instrument,
384                strategy: request.key.strategy,
385                time_exchange: self.time_exchange(),
386                side: request.state.side,
387                price: request.state.price,
388                quantity: request.state.quantity,
389                fees,
390            },
391        };
392
393        (order_response, Some(notifications))
394    }
395
396    pub fn validate_order_kind_supported(
397        &self,
398        order_kind: OrderKind,
399    ) -> Result<(), UnindexedOrderError> {
400        if order_kind == OrderKind::Market {
401            Ok(())
402        } else {
403            Err(UnindexedOrderError::Rejected(ApiError::OrderRejected(
404                format!("MockExchange does not supported OrderKind: {order_kind}"),
405            )))
406        }
407    }
408
409    pub fn find_instrument_data(
410        &self,
411        instrument: &InstrumentNameExchange,
412    ) -> Result<&Instrument<ExchangeId, AssetNameExchange>, UnindexedApiError> {
413        self.instruments.get(instrument).ok_or_else(|| {
414            ApiError::InstrumentInvalid(
415                instrument.clone(),
416                format!("MockExchange is not set-up for managing: {instrument}"),
417            )
418        })
419    }
420
421    fn order_id_sequence_fetch_add(&mut self) -> OrderId {
422        let sequence = self.order_sequence;
423        self.order_sequence += 1;
424        OrderId::new(sequence.to_smolstr())
425    }
426
427    fn build_account_event<Kind>(&self, kind: Kind) -> UnindexedAccountEvent
428    where
429        Kind: Into<AccountEventKind<ExchangeId, AssetNameExchange, InstrumentNameExchange>>,
430    {
431        UnindexedAccountEvent {
432            exchange: self.exchange,
433            kind: kind.into(),
434        }
435    }
436}
437
438fn build_open_order_err_response<E>(
439    request: OrderRequestOpen<ExchangeId, InstrumentNameExchange>,
440    error: E,
441) -> Order<ExchangeId, InstrumentNameExchange, Result<Open, UnindexedOrderError>>
442where
443    E: Into<UnindexedOrderError>,
444{
445    Order {
446        key: request.key,
447        side: request.state.side,
448        price: request.state.price,
449        quantity: request.state.quantity,
450        kind: request.state.kind,
451        time_in_force: request.state.time_in_force,
452        state: Err(error.into()),
453    }
454}
455
456#[derive(Debug)]
457pub struct OpenOrderNotifications {
458    pub balance: Snapshot<AssetBalance<AssetNameExchange>>,
459    pub trade: Trade<QuoteAsset, InstrumentNameExchange>,
460}