1use crate::{
2 AccountEventKind, InstrumentAccountSnapshot, UnindexedAccountEvent, UnindexedAccountSnapshot,
3 balance::AssetBalance,
4 client::mock::MockExecutionConfig,
5 error::{ApiError, UnindexedApiError, UnindexedOrderError},
6 exchange::mock::{
7 account::AccountState,
8 request::{MockExchangeRequest, MockExchangeRequestKind},
9 },
10 order::{
11 Order, OrderKind, UnindexedOrder,
12 id::OrderId,
13 request::{OrderRequestCancel, OrderRequestOpen},
14 state::{Cancelled, Open},
15 },
16 trade::{AssetFees, Trade, TradeId},
17};
18use barter_instrument::{
19 Side,
20 asset::{QuoteAsset, name::AssetNameExchange},
21 exchange::ExchangeId,
22 instrument::{Instrument, name::InstrumentNameExchange},
23};
24use barter_integration::collection::snapshot::Snapshot;
25use chrono::{DateTime, TimeDelta, Utc};
26use fnv::FnvHashMap;
27use futures::stream::BoxStream;
28use itertools::Itertools;
29use rust_decimal::Decimal;
30use smol_str::ToSmolStr;
31use std::fmt::Debug;
32use tokio::sync::{broadcast, mpsc, oneshot};
33use tokio_stream::{StreamExt, wrappers::BroadcastStream};
34use tracing::{error, info};
35
36pub mod account;
37pub mod request;
38
39#[derive(Debug)]
40pub struct MockExchange {
41 pub exchange: ExchangeId,
42 pub latency_ms: u64,
43 pub fees_percent: Decimal,
44 pub request_rx: mpsc::UnboundedReceiver<MockExchangeRequest>,
45 pub event_tx: broadcast::Sender<UnindexedAccountEvent>,
46 pub instruments: FnvHashMap<InstrumentNameExchange, Instrument<ExchangeId, AssetNameExchange>>,
47 pub account: AccountState,
48 pub order_sequence: u64,
49 pub time_exchange_latest: DateTime<Utc>,
50}
51
52impl MockExchange {
53 pub fn new(
54 config: MockExecutionConfig,
55 request_rx: mpsc::UnboundedReceiver<MockExchangeRequest>,
56 event_tx: broadcast::Sender<UnindexedAccountEvent>,
57 instruments: FnvHashMap<InstrumentNameExchange, Instrument<ExchangeId, AssetNameExchange>>,
58 ) -> Self {
59 Self {
60 exchange: config.mocked_exchange,
61 latency_ms: config.latency_ms,
62 fees_percent: config.fees_percent,
63 request_rx,
64 event_tx,
65 instruments,
66 account: AccountState::from(config.initial_state),
67 order_sequence: 0,
68 time_exchange_latest: Default::default(),
69 }
70 }
71
72 pub async fn run(mut self) {
73 while let Some(request) = self.request_rx.recv().await {
74 self.update_time_exchange(request.time_request);
75
76 match request.kind {
77 MockExchangeRequestKind::FetchAccountSnapshot { response_tx } => {
78 let snapshot = self.account_snapshot();
79 self.respond_with_latency(response_tx, snapshot);
80 }
81 MockExchangeRequestKind::FetchBalances {
82 response_tx,
83 assets,
84 } => {
85 let balances = self
86 .account
87 .balances()
88 .filter(|balance| assets.contains(&balance.asset))
89 .cloned()
90 .collect();
91 self.respond_with_latency(response_tx, balances);
92 }
93 MockExchangeRequestKind::FetchOrdersOpen {
94 response_tx,
95 instruments,
96 } => {
97 let orders_open = self
98 .account
99 .orders_open()
100 .filter(|order| instruments.contains(&order.key.instrument))
101 .cloned()
102 .collect();
103 self.respond_with_latency(response_tx, orders_open);
104 }
105 MockExchangeRequestKind::FetchTrades {
106 response_tx,
107 time_since,
108 } => {
109 let trades = self.account.trades(time_since).cloned().collect();
110 self.respond_with_latency(response_tx, trades);
111 }
112 MockExchangeRequestKind::CancelOrder {
113 response_tx: _,
114 request,
115 } => {
116 error!(
117 exchange = %self.exchange,
118 ?request,
119 "MockExchange received cancel request but only Market orders are supported"
120 );
121 }
122 MockExchangeRequestKind::OpenOrder {
123 response_tx,
124 request,
125 } => {
126 let (response, notifications) = self.open_order(request);
127 self.respond_with_latency(response_tx, response);
128
129 if let Some(notifications) = notifications {
130 self.account.ack_trade(notifications.trade.clone());
131 self.send_notifications_with_latency(notifications);
132 }
133 }
134 }
135 }
136
137 info!(exchange = %self.exchange, "MockExchange shutting down");
138 }
139
140 fn update_time_exchange(&mut self, time_request: DateTime<Utc>) {
141 let client_to_exchange_latency = self.latency_ms / 2;
142
143 self.time_exchange_latest = time_request
144 .checked_add_signed(TimeDelta::milliseconds(client_to_exchange_latency as i64))
145 .unwrap_or(time_request);
146
147 self.account.update_time_exchange(self.time_exchange_latest)
148 }
149
150 pub fn time_exchange(&self) -> DateTime<Utc> {
151 self.time_exchange_latest
152 }
153
154 pub fn account_snapshot(&self) -> UnindexedAccountSnapshot {
155 let balances = self.account.balances().cloned().collect();
156
157 let orders_open = self
158 .account
159 .orders_open()
160 .cloned()
161 .map(UnindexedOrder::from);
162
163 let orders_cancelled = self
164 .account
165 .orders_cancelled()
166 .cloned()
167 .map(UnindexedOrder::from);
168
169 let orders_all = orders_open.chain(orders_cancelled);
170 let orders_all = orders_all.sorted_unstable_by_key(|order| order.key.instrument.clone());
171 let orders_by_instrument = orders_all.chunk_by(|order| order.key.instrument.clone());
172
173 let instruments = orders_by_instrument
174 .into_iter()
175 .map(|(instrument, orders)| InstrumentAccountSnapshot {
176 instrument,
177 orders: orders.into_iter().collect(),
178 })
179 .collect();
180
181 UnindexedAccountSnapshot {
182 exchange: self.exchange,
183 balances,
184 instruments,
185 }
186 }
187
188 fn respond_with_latency<Response>(
193 &self,
194 response_tx: oneshot::Sender<Response>,
195 response: Response,
196 ) where
197 Response: Send + 'static,
198 {
199 let exchange = self.exchange;
200 let latency = std::time::Duration::from_millis(self.latency_ms);
201
202 tokio::spawn(async move {
203 tokio::time::sleep(latency).await;
204 if response_tx.send(response).is_err() {
205 error!(
206 %exchange,
207 kind = std::any::type_name::<Response>(),
208 "MockExchange failed to send oneshot response to client"
209 );
210 }
211 });
212 }
213
214 fn send_notifications_with_latency(&self, notifications: OpenOrderNotifications) {
220 let balance = self.build_account_event(notifications.balance);
221 let trade = self.build_account_event(notifications.trade);
222
223 let exchange = self.exchange;
224 let latency = std::time::Duration::from_millis(self.latency_ms);
225 let tx = self.event_tx.clone();
226 tokio::spawn(async move {
227 tokio::time::sleep(latency).await;
228
229 if tx.send(balance).is_err() {
230 error!(
231 %exchange,
232 kind = "Snapshot<AssetBalance<AssetNameExchange>",
233 "MockExchange failed to send AccountEvent notification to client"
234 );
235 }
236
237 if tx.send(trade).is_err() {
238 error!(
239 %exchange,
240 kind = "Trade<QuoteAsset, InstrumentNameExchange>",
241 "MockExchange failed to send AccountEvent notification to client"
242 );
243 }
244 });
245 }
246
247 pub fn account_stream(&self) -> BoxStream<'static, UnindexedAccountEvent> {
248 futures::StreamExt::boxed(BroadcastStream::new(self.event_tx.subscribe()).map_while(
249 |result| match result {
250 Ok(event) => Some(event),
251 Err(error) => {
252 error!(
253 ?error,
254 "MockExchange Broadcast AccountStream lagged - terminating"
255 );
256 None
257 }
258 },
259 ))
260 }
261
262 pub fn cancel_order(
263 &mut self,
264 _: OrderRequestCancel<ExchangeId, InstrumentNameExchange>,
265 ) -> Order<ExchangeId, InstrumentNameExchange, Result<Cancelled, UnindexedOrderError>> {
266 unimplemented!()
267 }
268
269 pub fn open_order(
270 &mut self,
271 request: OrderRequestOpen<ExchangeId, InstrumentNameExchange>,
272 ) -> (
273 Order<ExchangeId, InstrumentNameExchange, Result<Open, UnindexedOrderError>>,
274 Option<OpenOrderNotifications>,
275 ) {
276 if let Err(error) = self.validate_order_kind_supported(request.state.kind) {
277 return (build_open_order_err_response(request, error), None);
278 }
279
280 let underlying = match self.find_instrument_data(&request.key.instrument) {
281 Ok(instrument) => instrument.underlying.clone(),
282 Err(error) => return (build_open_order_err_response(request, error), None),
283 };
284
285 let time_exchange = self.time_exchange();
286
287 let balance_change_result = match request.state.side {
288 Side::Buy => {
289 let current = self
291 .account
292 .balance_mut(&underlying.quote)
293 .expect("MockExchange has Balance for all configured Instrument assets");
294
295 assert_eq!(current.balance.total, current.balance.free);
297
298 let order_value_quote = request.state.price * request.state.quantity.abs();
299 let order_fees_quote = order_value_quote * self.fees_percent;
300 let quote_required = order_value_quote + order_fees_quote;
301
302 let maybe_new_balance = current.balance.free - quote_required;
303
304 if maybe_new_balance >= Decimal::ZERO {
305 current.balance.free = maybe_new_balance;
306 current.balance.total = maybe_new_balance;
307 current.time_exchange = time_exchange;
308
309 Ok((current.clone(), AssetFees::quote_fees(order_fees_quote)))
310 } else {
311 Err(ApiError::BalanceInsufficient(
312 underlying.quote,
313 format!(
314 "Available Balance: {}, Required Balance inc. fees: {}",
315 current.balance.free, quote_required
316 ),
317 ))
318 }
319 }
320 Side::Sell => {
321 let current = self
323 .account
324 .balance_mut(&underlying.quote)
325 .expect("MockExchange has Balance for all configured Instrument assets");
326
327 assert_eq!(current.balance.total, current.balance.free);
329
330 let order_value_base = request.state.quantity.abs();
331 let order_fees_base = order_value_base * self.fees_percent;
332 let base_required = order_value_base + order_fees_base;
333
334 let maybe_new_balance = current.balance.free - base_required;
335
336 if maybe_new_balance >= Decimal::ZERO {
337 current.balance.free = maybe_new_balance;
338 current.balance.total = maybe_new_balance;
339 current.time_exchange = time_exchange;
340
341 let fees_quote = order_fees_base * request.state.price;
342
343 Ok((current.clone(), AssetFees::quote_fees(fees_quote)))
344 } else {
345 Err(ApiError::BalanceInsufficient(
346 underlying.quote,
347 format!(
348 "Available Balance: {}, Required Balance inc. fees: {}",
349 current.balance.free, base_required
350 ),
351 ))
352 }
353 }
354 };
355
356 let (balance_snapshot, fees) = match balance_change_result {
357 Ok((balance_snapshot, fees)) => (Snapshot(balance_snapshot), fees),
358 Err(error) => return (build_open_order_err_response(request, error), None),
359 };
360
361 let order_id = self.order_id_sequence_fetch_add();
362 let trade_id = TradeId(order_id.0.clone());
363
364 let order_response = Order {
365 key: request.key.clone(),
366 side: request.state.side,
367 price: request.state.price,
368 quantity: request.state.quantity,
369 kind: request.state.kind,
370 time_in_force: request.state.time_in_force,
371 state: Ok(Open {
372 id: order_id.clone(),
373 time_exchange: self.time_exchange(),
374 filled_quantity: request.state.quantity,
375 }),
376 };
377
378 let notifications = OpenOrderNotifications {
379 balance: balance_snapshot,
380 trade: Trade {
381 id: trade_id,
382 order_id: order_id.clone(),
383 instrument: request.key.instrument,
384 strategy: request.key.strategy,
385 time_exchange: self.time_exchange(),
386 side: request.state.side,
387 price: request.state.price,
388 quantity: request.state.quantity,
389 fees,
390 },
391 };
392
393 (order_response, Some(notifications))
394 }
395
396 pub fn validate_order_kind_supported(
397 &self,
398 order_kind: OrderKind,
399 ) -> Result<(), UnindexedOrderError> {
400 if order_kind == OrderKind::Market {
401 Ok(())
402 } else {
403 Err(UnindexedOrderError::Rejected(ApiError::OrderRejected(
404 format!("MockExchange does not supported OrderKind: {order_kind}"),
405 )))
406 }
407 }
408
409 pub fn find_instrument_data(
410 &self,
411 instrument: &InstrumentNameExchange,
412 ) -> Result<&Instrument<ExchangeId, AssetNameExchange>, UnindexedApiError> {
413 self.instruments.get(instrument).ok_or_else(|| {
414 ApiError::InstrumentInvalid(
415 instrument.clone(),
416 format!("MockExchange is not set-up for managing: {instrument}"),
417 )
418 })
419 }
420
421 fn order_id_sequence_fetch_add(&mut self) -> OrderId {
422 let sequence = self.order_sequence;
423 self.order_sequence += 1;
424 OrderId::new(sequence.to_smolstr())
425 }
426
427 fn build_account_event<Kind>(&self, kind: Kind) -> UnindexedAccountEvent
428 where
429 Kind: Into<AccountEventKind<ExchangeId, AssetNameExchange, InstrumentNameExchange>>,
430 {
431 UnindexedAccountEvent {
432 exchange: self.exchange,
433 kind: kind.into(),
434 }
435 }
436}
437
438fn build_open_order_err_response<E>(
439 request: OrderRequestOpen<ExchangeId, InstrumentNameExchange>,
440 error: E,
441) -> Order<ExchangeId, InstrumentNameExchange, Result<Open, UnindexedOrderError>>
442where
443 E: Into<UnindexedOrderError>,
444{
445 Order {
446 key: request.key,
447 side: request.state.side,
448 price: request.state.price,
449 quantity: request.state.quantity,
450 kind: request.state.kind,
451 time_in_force: request.state.time_in_force,
452 state: Err(error.into()),
453 }
454}
455
456#[derive(Debug)]
457pub struct OpenOrderNotifications {
458 pub balance: Snapshot<AssetBalance<AssetNameExchange>>,
459 pub trade: Trade<QuoteAsset, InstrumentNameExchange>,
460}