Skip to main content

rustrade_execution/client/mock/
mod.rs

1use crate::{
2    UnindexedAccountEvent, UnindexedAccountSnapshot,
3    balance::AssetBalance,
4    client::ExecutionClient,
5    error::{ConnectivityError, OrderError, UnindexedClientError, UnindexedOrderError},
6    exchange::mock::request::{MarketPrices, MockExchangeRequest},
7    fee::FeeModelConfig,
8    fill::SimFillConfig,
9    order::{
10        Order, OrderEvent, OrderKey,
11        request::{OrderRequestCancel, OrderRequestOpen, UnindexedOrderResponseCancel},
12        state::{Open, OrderState, UnindexedOrderState},
13    },
14    trade::Trade,
15};
16use chrono::{DateTime, Utc};
17use derive_more::Constructor;
18use futures::stream::BoxStream;
19use rustrade_instrument::{
20    asset::name::AssetNameExchange, exchange::ExchangeId, instrument::name::InstrumentNameExchange,
21};
22use serde::{Deserialize, Serialize};
23use tokio::sync::{broadcast, mpsc, oneshot};
24use tokio_stream::{StreamExt, wrappers::BroadcastStream};
25use tracing::error;
26
27#[derive(
28    Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize, Constructor,
29)]
30pub struct MockExecutionConfig {
31    pub mocked_exchange: ExchangeId,
32    pub initial_state: UnindexedAccountSnapshot,
33    pub latency_ms: u64,
34    /// Fee model used by the mock exchange to compute trading fees.
35    ///
36    /// Defaults to [`FeeModelConfig::Zero`]. Use [`FeeModelConfig::Percentage`]
37    /// for spot/futures simulation (e.g. 0.1% taker fee).
38    #[serde(default)]
39    pub fee_model: FeeModelConfig,
40    /// Fill model used by the mock exchange to compute execution prices.
41    ///
42    /// Defaults to [`SimFillConfig::LastPrice`], which fills at the
43    /// order price (identical to pre-FillModel behaviour). Switch to
44    /// [`SimFillConfig::BidAsk`] or [`SimFillConfig::Midpoint`] for
45    /// more realistic spread-cost simulation when market prices are injected
46    /// alongside orders.
47    #[serde(default)]
48    pub fill_model: SimFillConfig,
49}
50
51#[derive(Debug, Constructor)]
52pub struct MockExecutionClientConfig<FnTime> {
53    pub mocked_exchange: ExchangeId,
54    pub clock: FnTime,
55    pub request_tx: mpsc::UnboundedSender<MockExchangeRequest>,
56    pub event_rx: broadcast::Receiver<UnindexedAccountEvent>,
57}
58
59impl<FnTime> Clone for MockExecutionClientConfig<FnTime>
60where
61    FnTime: Clone,
62{
63    fn clone(&self) -> Self {
64        Self {
65            mocked_exchange: self.mocked_exchange,
66            clock: self.clock.clone(),
67            request_tx: self.request_tx.clone(),
68            event_rx: self.event_rx.resubscribe(),
69        }
70    }
71}
72
73#[derive(Debug, Constructor)]
74pub struct MockExecution<FnTime> {
75    pub mocked_exchange: ExchangeId,
76    pub clock: FnTime,
77    pub request_tx: mpsc::UnboundedSender<MockExchangeRequest>,
78    pub event_rx: broadcast::Receiver<UnindexedAccountEvent>,
79}
80
81impl<FnTime> Clone for MockExecution<FnTime>
82where
83    FnTime: Clone,
84{
85    fn clone(&self) -> Self {
86        Self {
87            mocked_exchange: self.mocked_exchange,
88            clock: self.clock.clone(),
89            request_tx: self.request_tx.clone(),
90            event_rx: self.event_rx.resubscribe(),
91        }
92    }
93}
94
95impl<FnTime> MockExecution<FnTime>
96where
97    FnTime: Fn() -> DateTime<Utc>,
98{
99    pub fn time_request(&self) -> DateTime<Utc> {
100        (self.clock)()
101    }
102}
103
104impl<FnTime> ExecutionClient for MockExecution<FnTime>
105where
106    FnTime: Fn() -> DateTime<Utc> + Clone + Send + Sync,
107{
108    const EXCHANGE: ExchangeId = ExchangeId::Mock;
109    type Config = MockExecutionClientConfig<FnTime>;
110    type AccountStream = BoxStream<'static, UnindexedAccountEvent>;
111
112    fn new(config: Self::Config) -> Self {
113        Self {
114            mocked_exchange: config.mocked_exchange,
115            clock: config.clock,
116            request_tx: config.request_tx,
117            event_rx: config.event_rx,
118        }
119    }
120
121    async fn account_snapshot(
122        &self,
123        _: &[AssetNameExchange],
124        _: &[InstrumentNameExchange],
125    ) -> Result<UnindexedAccountSnapshot, UnindexedClientError> {
126        let (response_tx, response_rx) = oneshot::channel();
127
128        self.request_tx
129            .send(MockExchangeRequest::fetch_account_snapshot(
130                self.time_request(),
131                response_tx,
132            ))
133            .map_err(|_| {
134                UnindexedClientError::Connectivity(ConnectivityError::ExchangeOffline(
135                    self.mocked_exchange,
136                ))
137            })?;
138
139        response_rx.await.map_err(|_| {
140            UnindexedClientError::Connectivity(ConnectivityError::ExchangeOffline(
141                self.mocked_exchange,
142            ))
143        })
144    }
145
146    async fn account_stream(
147        &self,
148        _: &[AssetNameExchange],
149        _: &[InstrumentNameExchange],
150    ) -> Result<Self::AccountStream, UnindexedClientError> {
151        Ok(futures::StreamExt::boxed(
152            BroadcastStream::new(self.event_rx.resubscribe()).map_while(|result| match result {
153                Ok(event) => Some(event),
154                Err(error) => {
155                    error!(
156                        ?error,
157                        "MockExchange Broadcast AccountStream lagged - terminating"
158                    );
159                    None
160                }
161            }),
162        ))
163    }
164
165    async fn cancel_order(
166        &self,
167        request: OrderRequestCancel<ExchangeId, &InstrumentNameExchange>,
168    ) -> Option<UnindexedOrderResponseCancel> {
169        let (response_tx, response_rx) = oneshot::channel();
170
171        let key = OrderKey {
172            exchange: request.key.exchange,
173            instrument: request.key.instrument.clone(),
174            strategy: request.key.strategy.clone(),
175            cid: request.key.cid.clone(),
176        };
177
178        if self
179            .request_tx
180            .send(MockExchangeRequest::cancel_order(
181                self.time_request(),
182                response_tx,
183                into_owned_request(request),
184            ))
185            .is_err()
186        {
187            return Some(UnindexedOrderResponseCancel {
188                key,
189                state: Err(UnindexedOrderError::Connectivity(
190                    ConnectivityError::ExchangeOffline(self.mocked_exchange),
191                )),
192            });
193        }
194
195        Some(match response_rx.await {
196            Ok(response) => response,
197            Err(_) => UnindexedOrderResponseCancel {
198                key,
199                state: Err(UnindexedOrderError::Connectivity(
200                    ConnectivityError::ExchangeOffline(self.mocked_exchange),
201                )),
202            },
203        })
204    }
205
206    async fn open_order(
207        &self,
208        request: OrderRequestOpen<ExchangeId, &InstrumentNameExchange>,
209    ) -> Option<Order<ExchangeId, InstrumentNameExchange, UnindexedOrderState>> {
210        let (response_tx, response_rx) = oneshot::channel();
211
212        let request = into_owned_request(request);
213
214        if self
215            .request_tx
216            .send(MockExchangeRequest::open_order(
217                self.time_request(),
218                response_tx,
219                request.clone(),
220                MarketPrices::default(), // no market-data subscription; FillModel uses last_price=Some(request.state.price) as fallback, so fill equals request price
221            ))
222            .is_err()
223        {
224            return Some(Order {
225                key: request.key,
226                side: request.state.side,
227                price: request.state.price,
228                quantity: request.state.quantity,
229                kind: request.state.kind,
230                time_in_force: request.state.time_in_force,
231                state: OrderState::inactive(OrderError::Connectivity(
232                    ConnectivityError::ExchangeOffline(self.mocked_exchange),
233                )),
234            });
235        }
236
237        Some(match response_rx.await {
238            Ok(response) => response,
239            Err(_) => Order {
240                key: request.key,
241                side: request.state.side,
242                price: request.state.price,
243                quantity: request.state.quantity,
244                kind: request.state.kind,
245                time_in_force: request.state.time_in_force,
246                state: OrderState::inactive(OrderError::Connectivity(
247                    ConnectivityError::ExchangeOffline(self.mocked_exchange),
248                )),
249            },
250        })
251    }
252
253    async fn fetch_balances(
254        &self,
255        assets: &[AssetNameExchange],
256    ) -> Result<Vec<AssetBalance<AssetNameExchange>>, UnindexedClientError> {
257        let (response_tx, response_rx) = oneshot::channel();
258
259        self.request_tx
260            .send(MockExchangeRequest::fetch_balances(
261                self.time_request(),
262                assets.to_vec(),
263                response_tx,
264            ))
265            .map_err(|_| {
266                UnindexedClientError::Connectivity(ConnectivityError::ExchangeOffline(
267                    self.mocked_exchange,
268                ))
269            })?;
270
271        response_rx.await.map_err(|_| {
272            UnindexedClientError::Connectivity(ConnectivityError::ExchangeOffline(
273                self.mocked_exchange,
274            ))
275        })
276    }
277
278    async fn fetch_open_orders(
279        &self,
280        instruments: &[InstrumentNameExchange],
281    ) -> Result<Vec<Order<ExchangeId, InstrumentNameExchange, Open>>, UnindexedClientError> {
282        let (response_tx, response_rx) = oneshot::channel();
283
284        self.request_tx
285            .send(MockExchangeRequest::fetch_orders_open(
286                self.time_request(),
287                instruments.to_vec(),
288                response_tx,
289            ))
290            .map_err(|_| {
291                UnindexedClientError::Connectivity(ConnectivityError::ExchangeOffline(
292                    self.mocked_exchange,
293                ))
294            })?;
295
296        response_rx.await.map_err(|_| {
297            UnindexedClientError::Connectivity(ConnectivityError::ExchangeOffline(
298                self.mocked_exchange,
299            ))
300        })
301    }
302
303    async fn fetch_trades(
304        &self,
305        time_since: DateTime<Utc>,
306        // MockExchange fetch_trades doesn't filter by instrument
307        _instruments: &[InstrumentNameExchange],
308    ) -> Result<Vec<Trade<AssetNameExchange, InstrumentNameExchange>>, UnindexedClientError> {
309        let (response_tx, response_rx) = oneshot::channel();
310
311        self.request_tx
312            .send(MockExchangeRequest::fetch_trades(
313                self.time_request(),
314                response_tx,
315                time_since,
316            ))
317            .map_err(|_| {
318                UnindexedClientError::Connectivity(ConnectivityError::ExchangeOffline(
319                    self.mocked_exchange,
320                ))
321            })?;
322
323        response_rx.await.map_err(|_| {
324            UnindexedClientError::Connectivity(ConnectivityError::ExchangeOffline(
325                self.mocked_exchange,
326            ))
327        })
328    }
329}
330
331fn into_owned_request<Kind>(
332    request: OrderEvent<Kind, ExchangeId, &InstrumentNameExchange>,
333) -> OrderEvent<Kind, ExchangeId, InstrumentNameExchange> {
334    let OrderEvent {
335        key:
336            OrderKey {
337                exchange,
338                instrument,
339                strategy,
340                cid,
341            },
342        state,
343    } = request;
344
345    OrderEvent {
346        key: OrderKey {
347            exchange,
348            instrument: instrument.clone(),
349            strategy,
350            cid,
351        },
352        state,
353    }
354}