1use crate::{
2 AccountEventKind, InstrumentAccountSnapshot, UnindexedAccountEvent, UnindexedAccountSnapshot,
3 balance::AssetBalance,
4 client::mock::MockExecutionConfig,
5 error::{ApiError, UnindexedApiError, UnindexedOrderError},
6 exchange::mock::{
7 account::AccountState,
8 request::{MockExchangeRequest, MockExchangeRequestKind},
9 },
10 order::{
11 Order, OrderKind, UnindexedOrder,
12 id::OrderId,
13 request::{OrderRequestCancel, OrderRequestOpen},
14 state::{Cancelled, Open},
15 },
16 trade::{AssetFees, Trade, TradeId},
17};
18use barter_instrument::{
19 Side,
20 asset::{QuoteAsset, name::AssetNameExchange},
21 exchange::ExchangeId,
22 instrument::{Instrument, name::InstrumentNameExchange},
23};
24use barter_integration::snapshot::Snapshot;
25use chrono::{DateTime, TimeDelta, Utc};
26use fnv::FnvHashMap;
27use futures::stream::BoxStream;
28use itertools::Itertools;
29use rust_decimal::Decimal;
30use smol_str::ToSmolStr;
31use std::fmt::Debug;
32use tokio::sync::{broadcast, mpsc, oneshot};
33use tokio_stream::{StreamExt, wrappers::BroadcastStream};
34use tracing::{error, info};
35
36pub mod account;
37pub mod request;
38
39#[derive(Debug)]
40pub struct MockExchange {
41 pub exchange: ExchangeId,
42 pub latency_ms: u64,
43 pub fees_percent: Decimal,
44 pub request_rx: mpsc::UnboundedReceiver<MockExchangeRequest>,
45 pub event_tx: broadcast::Sender<UnindexedAccountEvent>,
46 pub instruments: FnvHashMap<InstrumentNameExchange, Instrument<ExchangeId, AssetNameExchange>>,
47 pub account: AccountState,
48 pub order_sequence: u64,
49 pub time_exchange_latest: DateTime<Utc>,
50}
51
52impl MockExchange {
53 pub fn new(
54 config: MockExecutionConfig,
55 request_rx: mpsc::UnboundedReceiver<MockExchangeRequest>,
56 event_tx: broadcast::Sender<UnindexedAccountEvent>,
57 instruments: FnvHashMap<InstrumentNameExchange, Instrument<ExchangeId, AssetNameExchange>>,
58 ) -> Self {
59 Self {
60 exchange: config.mocked_exchange,
61 latency_ms: config.latency_ms,
62 fees_percent: config.fees_percent,
63 request_rx,
64 event_tx,
65 instruments,
66 account: AccountState::from(config.initial_state),
67 order_sequence: 0,
68 time_exchange_latest: Default::default(),
69 }
70 }
71
72 pub async fn run(mut self) {
73 while let Some(request) = self.request_rx.recv().await {
74 self.update_time_exchange(request.time_request);
75
76 match request.kind {
77 MockExchangeRequestKind::FetchAccountSnapshot { response_tx } => {
78 let snapshot = self.account_snapshot();
79 self.respond_with_latency(response_tx, snapshot);
80 }
81 MockExchangeRequestKind::FetchBalances { response_tx } => {
82 let balances = self.account.balances().cloned().collect();
83 self.respond_with_latency(response_tx, balances);
84 }
85 MockExchangeRequestKind::FetchOrdersOpen { response_tx } => {
86 let orders_open = self.account.orders_open().cloned().collect();
87 self.respond_with_latency(response_tx, orders_open);
88 }
89 MockExchangeRequestKind::FetchTrades {
90 response_tx,
91 time_since,
92 } => {
93 let trades = self.account.trades(time_since).cloned().collect();
94 self.respond_with_latency(response_tx, trades);
95 }
96 MockExchangeRequestKind::CancelOrder {
97 response_tx: _,
98 request,
99 } => {
100 error!(
101 exchange = %self.exchange,
102 ?request,
103 "MockExchange received cancel request but only Market orders are supported"
104 );
105 }
106 MockExchangeRequestKind::OpenOrder {
107 response_tx,
108 request,
109 } => {
110 let (response, notifications) = self.open_order(request);
111 self.respond_with_latency(response_tx, response);
112
113 if let Some(notifications) = notifications {
114 self.account.ack_trade(notifications.trade.clone());
115 self.send_notifications_with_latency(notifications);
116 }
117 }
118 }
119 }
120
121 info!(exchange = %self.exchange, "MockExchange shutting down");
122 }
123
124 fn update_time_exchange(&mut self, time_request: DateTime<Utc>) {
125 let client_to_exchange_latency = self.latency_ms / 2;
126
127 self.time_exchange_latest = time_request
128 .checked_add_signed(TimeDelta::milliseconds(client_to_exchange_latency as i64))
129 .unwrap_or(time_request);
130
131 self.account.update_time_exchange(self.time_exchange_latest)
132 }
133
134 pub fn time_exchange(&self) -> DateTime<Utc> {
135 self.time_exchange_latest
136 }
137
138 pub fn account_snapshot(&self) -> UnindexedAccountSnapshot {
139 let balances = self.account.balances().cloned().collect();
140
141 let orders_open = self
142 .account
143 .orders_open()
144 .cloned()
145 .map(UnindexedOrder::from);
146
147 let orders_cancelled = self
148 .account
149 .orders_cancelled()
150 .cloned()
151 .map(UnindexedOrder::from);
152
153 let orders_all = orders_open.chain(orders_cancelled);
154 let orders_all = orders_all.sorted_unstable_by_key(|order| order.key.instrument.clone());
155 let orders_by_instrument = orders_all.chunk_by(|order| order.key.instrument.clone());
156
157 let instruments = orders_by_instrument
158 .into_iter()
159 .map(|(instrument, orders)| InstrumentAccountSnapshot {
160 instrument,
161 orders: orders.into_iter().collect(),
162 })
163 .collect();
164
165 UnindexedAccountSnapshot {
166 exchange: self.exchange,
167 balances,
168 instruments,
169 }
170 }
171
172 fn respond_with_latency<Response>(
177 &self,
178 response_tx: oneshot::Sender<Response>,
179 response: Response,
180 ) where
181 Response: Send + 'static,
182 {
183 let exchange = self.exchange;
184 let latency = std::time::Duration::from_millis(self.latency_ms);
185
186 tokio::spawn(async move {
187 tokio::time::sleep(latency).await;
188 if response_tx.send(response).is_err() {
189 error!(
190 %exchange,
191 kind = std::any::type_name::<Response>(),
192 "MockExchange failed to send oneshot response to client"
193 );
194 }
195 });
196 }
197
198 fn send_notifications_with_latency(&self, notifications: OpenOrderNotifications) {
204 let balance = self.build_account_event(notifications.balance);
205 let trade = self.build_account_event(notifications.trade);
206
207 let exchange = self.exchange;
208 let latency = std::time::Duration::from_millis(self.latency_ms);
209 let tx = self.event_tx.clone();
210 tokio::spawn(async move {
211 tokio::time::sleep(latency).await;
212
213 if tx.send(balance).is_err() {
214 error!(
215 %exchange,
216 kind = "Snapshot<AssetBalance<AssetNameExchange>",
217 "MockExchange failed to send AccountEvent notification to client"
218 );
219 }
220
221 if tx.send(trade).is_err() {
222 error!(
223 %exchange,
224 kind = "Trade<QuoteAsset, InstrumentNameExchange>",
225 "MockExchange failed to send AccountEvent notification to client"
226 );
227 }
228 });
229 }
230
231 pub fn account_stream(&self) -> BoxStream<'static, UnindexedAccountEvent> {
232 futures::StreamExt::boxed(BroadcastStream::new(self.event_tx.subscribe()).map_while(
233 |result| match result {
234 Ok(event) => Some(event),
235 Err(error) => {
236 error!(
237 ?error,
238 "MockExchange Broadcast AccountStream lagged - terminating"
239 );
240 None
241 }
242 },
243 ))
244 }
245
246 pub fn cancel_order(
247 &mut self,
248 _: OrderRequestCancel<ExchangeId, InstrumentNameExchange>,
249 ) -> Order<ExchangeId, InstrumentNameExchange, Result<Cancelled, UnindexedOrderError>> {
250 unimplemented!()
251 }
252
253 pub fn open_order(
254 &mut self,
255 request: OrderRequestOpen<ExchangeId, InstrumentNameExchange>,
256 ) -> (
257 Order<ExchangeId, InstrumentNameExchange, Result<Open, UnindexedOrderError>>,
258 Option<OpenOrderNotifications>,
259 ) {
260 if let Err(error) = self.validate_order_kind_supported(request.state.kind) {
261 return (build_open_order_err_response(request, error), None);
262 }
263
264 let underlying = match self.find_instrument_data(&request.key.instrument) {
265 Ok(instrument) => instrument.underlying.clone(),
266 Err(error) => return (build_open_order_err_response(request, error), None),
267 };
268
269 let time_exchange = self.time_exchange();
270
271 let balance_change_result = match request.state.side {
272 Side::Buy => {
273 let current = self
275 .account
276 .balance_mut(&underlying.quote)
277 .expect("MockExchange has Balance for all configured Instrument assets");
278
279 assert_eq!(current.balance.total, current.balance.free);
281
282 let order_value_quote = request.state.price * request.state.quantity.abs();
283 let order_fees_quote = order_value_quote * self.fees_percent;
284 let quote_required = order_value_quote + order_fees_quote;
285
286 let maybe_new_balance = current.balance.free - quote_required;
287
288 if maybe_new_balance >= Decimal::ZERO {
289 current.balance.free = maybe_new_balance;
290 current.balance.total = maybe_new_balance;
291 current.time_exchange = time_exchange;
292
293 Ok((current.clone(), AssetFees::quote_fees(order_fees_quote)))
294 } else {
295 Err(ApiError::BalanceInsufficient(
296 underlying.quote,
297 format!(
298 "Available Balance: {}, Required Balance inc. fees: {}",
299 current.balance.free, quote_required
300 ),
301 ))
302 }
303 }
304 Side::Sell => {
305 let current = self
307 .account
308 .balance_mut(&underlying.quote)
309 .expect("MockExchange has Balance for all configured Instrument assets");
310
311 assert_eq!(current.balance.total, current.balance.free);
313
314 let order_value_base = request.state.quantity.abs();
315 let order_fees_base = order_value_base * self.fees_percent;
316 let base_required = order_value_base + order_fees_base;
317
318 let maybe_new_balance = current.balance.free - base_required;
319
320 if maybe_new_balance >= Decimal::ZERO {
321 current.balance.free = maybe_new_balance;
322 current.balance.total = maybe_new_balance;
323 current.time_exchange = time_exchange;
324
325 let fees_quote = order_fees_base * request.state.price;
326
327 Ok((current.clone(), AssetFees::quote_fees(fees_quote)))
328 } else {
329 Err(ApiError::BalanceInsufficient(
330 underlying.quote,
331 format!(
332 "Available Balance: {}, Required Balance inc. fees: {}",
333 current.balance.free, base_required
334 ),
335 ))
336 }
337 }
338 };
339
340 let (balance_snapshot, fees) = match balance_change_result {
341 Ok((balance_snapshot, fees)) => (Snapshot(balance_snapshot), fees),
342 Err(error) => return (build_open_order_err_response(request, error), None),
343 };
344
345 let order_id = self.order_id_sequence_fetch_add();
346 let trade_id = TradeId(order_id.0.clone());
347
348 let order_response = Order {
349 key: request.key.clone(),
350 side: request.state.side,
351 price: request.state.price,
352 quantity: request.state.quantity,
353 kind: request.state.kind,
354 time_in_force: request.state.time_in_force,
355 state: Ok(Open {
356 id: order_id.clone(),
357 time_exchange: self.time_exchange(),
358 filled_quantity: request.state.quantity,
359 }),
360 };
361
362 let notifications = OpenOrderNotifications {
363 balance: balance_snapshot,
364 trade: Trade {
365 id: trade_id,
366 order_id: order_id.clone(),
367 instrument: request.key.instrument,
368 strategy: request.key.strategy,
369 time_exchange: self.time_exchange(),
370 side: request.state.side,
371 price: request.state.price,
372 quantity: request.state.quantity,
373 fees,
374 },
375 };
376
377 (order_response, Some(notifications))
378 }
379
380 pub fn validate_order_kind_supported(
381 &self,
382 order_kind: OrderKind,
383 ) -> Result<(), UnindexedOrderError> {
384 if order_kind == OrderKind::Market {
385 Ok(())
386 } else {
387 Err(UnindexedOrderError::Rejected(ApiError::OrderRejected(
388 format!("MockExchange does not supported OrderKind: {order_kind}"),
389 )))
390 }
391 }
392
393 pub fn find_instrument_data(
394 &self,
395 instrument: &InstrumentNameExchange,
396 ) -> Result<&Instrument<ExchangeId, AssetNameExchange>, UnindexedApiError> {
397 self.instruments.get(instrument).ok_or_else(|| {
398 ApiError::InstrumentInvalid(
399 instrument.clone(),
400 format!("MockExchange is not set-up for managing: {}", instrument),
401 )
402 })
403 }
404
405 fn order_id_sequence_fetch_add(&mut self) -> OrderId {
406 let sequence = self.order_sequence;
407 self.order_sequence += 1;
408 OrderId::new(sequence.to_smolstr())
409 }
410
411 fn build_account_event<Kind>(&self, kind: Kind) -> UnindexedAccountEvent
412 where
413 Kind: Into<AccountEventKind<ExchangeId, AssetNameExchange, InstrumentNameExchange>>,
414 {
415 UnindexedAccountEvent {
416 exchange: self.exchange,
417 kind: kind.into(),
418 }
419 }
420}
421
422fn build_open_order_err_response<E>(
423 request: OrderRequestOpen<ExchangeId, InstrumentNameExchange>,
424 error: E,
425) -> Order<ExchangeId, InstrumentNameExchange, Result<Open, UnindexedOrderError>>
426where
427 E: Into<UnindexedOrderError>,
428{
429 Order {
430 key: request.key,
431 side: request.state.side,
432 price: request.state.price,
433 quantity: request.state.quantity,
434 kind: request.state.kind,
435 time_in_force: request.state.time_in_force,
436 state: Err(error.into()),
437 }
438}
439
440#[derive(Debug)]
441pub struct OpenOrderNotifications {
442 pub balance: Snapshot<AssetBalance<AssetNameExchange>>,
443 pub trade: Trade<QuoteAsset, InstrumentNameExchange>,
444}