1use crate::{
2 UnindexedAccountEvent, UnindexedAccountSnapshot,
3 balance::AssetBalance,
4 client::ExecutionClient,
5 error::{ConnectivityError, OrderError, UnindexedClientError, UnindexedOrderError},
6 exchange::mock::request::{MarketPrices, MockExchangeRequest},
7 fee::FeeModelConfig,
8 fill::SimFillConfig,
9 order::{
10 Order, OrderEvent, OrderKey,
11 request::{OrderRequestCancel, OrderRequestOpen, UnindexedOrderResponseCancel},
12 state::{Open, OrderState, UnindexedOrderState},
13 },
14 trade::Trade,
15};
16use chrono::{DateTime, Utc};
17use derive_more::Constructor;
18use futures::stream::BoxStream;
19use rustrade_instrument::{
20 asset::name::AssetNameExchange, exchange::ExchangeId, instrument::name::InstrumentNameExchange,
21};
22use serde::{Deserialize, Serialize};
23use tokio::sync::{broadcast, mpsc, oneshot};
24use tokio_stream::{StreamExt, wrappers::BroadcastStream};
25use tracing::error;
26
/// Serializable configuration describing a mocked exchange.
///
/// NOTE(review): the code that consumes this config is not visible in this section, so the
/// field descriptions below are inferred from names and types - confirm against the mock
/// exchange implementation.
#[derive(
    Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize, Constructor,
)]
pub struct MockExecutionConfig {
    /// Identifier of the exchange being mocked.
    pub mocked_exchange: ExchangeId,
    /// Account snapshot the mock starts from (presumably its initial account state - confirm).
    pub initial_state: UnindexedAccountSnapshot,
    /// Simulated latency; the `_ms` suffix suggests milliseconds - TODO confirm how it is applied.
    pub latency_ms: u64,
    /// Fee model configuration. Because of `#[serde(default)]`, a missing field deserializes
    /// to `FeeModelConfig::default()`.
    #[serde(default)]
    pub fee_model: FeeModelConfig,
    /// Simulated fill configuration. Because of `#[serde(default)]`, a missing field
    /// deserializes to `SimFillConfig::default()`.
    #[serde(default)]
    pub fill_model: SimFillConfig,
}
50
/// Everything required to construct a [`MockExecution`] client; used as its
/// `ExecutionClient::Config`.
///
/// `FnTime` is the clock type. The [`MockExecution`] impls require it to be a
/// `Fn() -> DateTime<Utc>`.
#[derive(Debug, Constructor)]
pub struct MockExecutionClientConfig<FnTime> {
    /// Identifier of the exchange being mocked.
    pub mocked_exchange: ExchangeId,
    /// Clock handed to the client, which calls it to timestamp outgoing requests.
    pub clock: FnTime,
    /// Sending half of the channel carrying [`MockExchangeRequest`]s to the mock exchange.
    pub request_tx: mpsc::UnboundedSender<MockExchangeRequest>,
    /// Broadcast receiver of account events.
    pub event_rx: broadcast::Receiver<UnindexedAccountEvent>,
}
58
59impl<FnTime> Clone for MockExecutionClientConfig<FnTime>
60where
61 FnTime: Clone,
62{
63 fn clone(&self) -> Self {
64 Self {
65 mocked_exchange: self.mocked_exchange,
66 clock: self.clock.clone(),
67 request_tx: self.request_tx.clone(),
68 event_rx: self.event_rx.resubscribe(),
69 }
70 }
71}
72
/// Execution client that talks to a mock exchange over in-process channels.
///
/// Requests are sent as [`MockExchangeRequest`]s on `request_tx`; account events are read
/// from the `event_rx` broadcast channel.
#[derive(Debug, Constructor)]
pub struct MockExecution<FnTime> {
    /// Identifier of the exchange being mocked; reported in `ExchangeOffline` errors.
    pub mocked_exchange: ExchangeId,
    /// Clock invoked by `time_request` to timestamp each outgoing request.
    pub clock: FnTime,
    /// Sending half of the channel carrying requests to the mock exchange.
    pub request_tx: mpsc::UnboundedSender<MockExchangeRequest>,
    /// Broadcast receiver of account events; resubscribed for every account stream and clone.
    pub event_rx: broadcast::Receiver<UnindexedAccountEvent>,
}
80
81impl<FnTime> Clone for MockExecution<FnTime>
82where
83 FnTime: Clone,
84{
85 fn clone(&self) -> Self {
86 Self {
87 mocked_exchange: self.mocked_exchange,
88 clock: self.clock.clone(),
89 request_tx: self.request_tx.clone(),
90 event_rx: self.event_rx.resubscribe(),
91 }
92 }
93}
94
95impl<FnTime> MockExecution<FnTime>
96where
97 FnTime: Fn() -> DateTime<Utc>,
98{
99 pub fn time_request(&self) -> DateTime<Utc> {
100 (self.clock)()
101 }
102}
103
104impl<FnTime> ExecutionClient for MockExecution<FnTime>
105where
106 FnTime: Fn() -> DateTime<Utc> + Clone + Send + Sync,
107{
108 const EXCHANGE: ExchangeId = ExchangeId::Mock;
109 type Config = MockExecutionClientConfig<FnTime>;
110 type AccountStream = BoxStream<'static, UnindexedAccountEvent>;
111
112 fn new(config: Self::Config) -> Self {
113 Self {
114 mocked_exchange: config.mocked_exchange,
115 clock: config.clock,
116 request_tx: config.request_tx,
117 event_rx: config.event_rx,
118 }
119 }
120
121 async fn account_snapshot(
122 &self,
123 _: &[AssetNameExchange],
124 _: &[InstrumentNameExchange],
125 ) -> Result<UnindexedAccountSnapshot, UnindexedClientError> {
126 let (response_tx, response_rx) = oneshot::channel();
127
128 self.request_tx
129 .send(MockExchangeRequest::fetch_account_snapshot(
130 self.time_request(),
131 response_tx,
132 ))
133 .map_err(|_| {
134 UnindexedClientError::Connectivity(ConnectivityError::ExchangeOffline(
135 self.mocked_exchange,
136 ))
137 })?;
138
139 response_rx.await.map_err(|_| {
140 UnindexedClientError::Connectivity(ConnectivityError::ExchangeOffline(
141 self.mocked_exchange,
142 ))
143 })
144 }
145
146 async fn account_stream(
147 &self,
148 _: &[AssetNameExchange],
149 _: &[InstrumentNameExchange],
150 ) -> Result<Self::AccountStream, UnindexedClientError> {
151 Ok(futures::StreamExt::boxed(
152 BroadcastStream::new(self.event_rx.resubscribe()).map_while(|result| match result {
153 Ok(event) => Some(event),
154 Err(error) => {
155 error!(
156 ?error,
157 "MockExchange Broadcast AccountStream lagged - terminating"
158 );
159 None
160 }
161 }),
162 ))
163 }
164
165 async fn cancel_order(
166 &self,
167 request: OrderRequestCancel<ExchangeId, &InstrumentNameExchange>,
168 ) -> Option<UnindexedOrderResponseCancel> {
169 let (response_tx, response_rx) = oneshot::channel();
170
171 let key = OrderKey {
172 exchange: request.key.exchange,
173 instrument: request.key.instrument.clone(),
174 strategy: request.key.strategy.clone(),
175 cid: request.key.cid.clone(),
176 };
177
178 if self
179 .request_tx
180 .send(MockExchangeRequest::cancel_order(
181 self.time_request(),
182 response_tx,
183 into_owned_request(request),
184 ))
185 .is_err()
186 {
187 return Some(UnindexedOrderResponseCancel {
188 key,
189 state: Err(UnindexedOrderError::Connectivity(
190 ConnectivityError::ExchangeOffline(self.mocked_exchange),
191 )),
192 });
193 }
194
195 Some(match response_rx.await {
196 Ok(response) => response,
197 Err(_) => UnindexedOrderResponseCancel {
198 key,
199 state: Err(UnindexedOrderError::Connectivity(
200 ConnectivityError::ExchangeOffline(self.mocked_exchange),
201 )),
202 },
203 })
204 }
205
206 async fn open_order(
207 &self,
208 request: OrderRequestOpen<ExchangeId, &InstrumentNameExchange>,
209 ) -> Option<Order<ExchangeId, InstrumentNameExchange, UnindexedOrderState>> {
210 let (response_tx, response_rx) = oneshot::channel();
211
212 let request = into_owned_request(request);
213
214 if self
215 .request_tx
216 .send(MockExchangeRequest::open_order(
217 self.time_request(),
218 response_tx,
219 request.clone(),
220 MarketPrices::default(), ))
222 .is_err()
223 {
224 return Some(Order {
225 key: request.key,
226 side: request.state.side,
227 price: request.state.price,
228 quantity: request.state.quantity,
229 kind: request.state.kind,
230 time_in_force: request.state.time_in_force,
231 state: OrderState::inactive(OrderError::Connectivity(
232 ConnectivityError::ExchangeOffline(self.mocked_exchange),
233 )),
234 });
235 }
236
237 Some(match response_rx.await {
238 Ok(response) => response,
239 Err(_) => Order {
240 key: request.key,
241 side: request.state.side,
242 price: request.state.price,
243 quantity: request.state.quantity,
244 kind: request.state.kind,
245 time_in_force: request.state.time_in_force,
246 state: OrderState::inactive(OrderError::Connectivity(
247 ConnectivityError::ExchangeOffline(self.mocked_exchange),
248 )),
249 },
250 })
251 }
252
253 async fn fetch_balances(
254 &self,
255 assets: &[AssetNameExchange],
256 ) -> Result<Vec<AssetBalance<AssetNameExchange>>, UnindexedClientError> {
257 let (response_tx, response_rx) = oneshot::channel();
258
259 self.request_tx
260 .send(MockExchangeRequest::fetch_balances(
261 self.time_request(),
262 assets.to_vec(),
263 response_tx,
264 ))
265 .map_err(|_| {
266 UnindexedClientError::Connectivity(ConnectivityError::ExchangeOffline(
267 self.mocked_exchange,
268 ))
269 })?;
270
271 response_rx.await.map_err(|_| {
272 UnindexedClientError::Connectivity(ConnectivityError::ExchangeOffline(
273 self.mocked_exchange,
274 ))
275 })
276 }
277
278 async fn fetch_open_orders(
279 &self,
280 instruments: &[InstrumentNameExchange],
281 ) -> Result<Vec<Order<ExchangeId, InstrumentNameExchange, Open>>, UnindexedClientError> {
282 let (response_tx, response_rx) = oneshot::channel();
283
284 self.request_tx
285 .send(MockExchangeRequest::fetch_orders_open(
286 self.time_request(),
287 instruments.to_vec(),
288 response_tx,
289 ))
290 .map_err(|_| {
291 UnindexedClientError::Connectivity(ConnectivityError::ExchangeOffline(
292 self.mocked_exchange,
293 ))
294 })?;
295
296 response_rx.await.map_err(|_| {
297 UnindexedClientError::Connectivity(ConnectivityError::ExchangeOffline(
298 self.mocked_exchange,
299 ))
300 })
301 }
302
303 async fn fetch_trades(
304 &self,
305 time_since: DateTime<Utc>,
306 _instruments: &[InstrumentNameExchange],
308 ) -> Result<Vec<Trade<AssetNameExchange, InstrumentNameExchange>>, UnindexedClientError> {
309 let (response_tx, response_rx) = oneshot::channel();
310
311 self.request_tx
312 .send(MockExchangeRequest::fetch_trades(
313 self.time_request(),
314 response_tx,
315 time_since,
316 ))
317 .map_err(|_| {
318 UnindexedClientError::Connectivity(ConnectivityError::ExchangeOffline(
319 self.mocked_exchange,
320 ))
321 })?;
322
323 response_rx.await.map_err(|_| {
324 UnindexedClientError::Connectivity(ConnectivityError::ExchangeOffline(
325 self.mocked_exchange,
326 ))
327 })
328 }
329}
330
331fn into_owned_request<Kind>(
332 request: OrderEvent<Kind, ExchangeId, &InstrumentNameExchange>,
333) -> OrderEvent<Kind, ExchangeId, InstrumentNameExchange> {
334 let OrderEvent {
335 key:
336 OrderKey {
337 exchange,
338 instrument,
339 strategy,
340 cid,
341 },
342 state,
343 } = request;
344
345 OrderEvent {
346 key: OrderKey {
347 exchange,
348 instrument: instrument.clone(),
349 strategy,
350 cid,
351 },
352 state,
353 }
354}