barter_execution/client/mock/
mod.rs

1use crate::{
2    UnindexedAccountEvent, UnindexedAccountSnapshot,
3    balance::AssetBalance,
4    client::ExecutionClient,
5    error::{UnindexedClientError, UnindexedOrderError},
6    exchange::mock::request::MockExchangeRequest,
7    order::{
8        Order, OrderEvent, OrderKey,
9        request::{OrderRequestCancel, OrderRequestOpen, UnindexedOrderResponseCancel},
10        state::Open,
11    },
12    trade::Trade,
13};
14use barter_instrument::{
15    asset::{QuoteAsset, name::AssetNameExchange},
16    exchange::ExchangeId,
17    instrument::name::InstrumentNameExchange,
18};
19use chrono::{DateTime, Utc};
20use derive_more::Constructor;
21use futures::stream::BoxStream;
22use rust_decimal::Decimal;
23use serde::{Deserialize, Serialize};
24use tokio::sync::{broadcast, mpsc, oneshot};
25use tokio_stream::{StreamExt, wrappers::BroadcastStream};
26use tracing::error;
27
28#[derive(
29    Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize, Constructor,
30)]
31pub struct MockExecutionConfig {
32    pub mocked_exchange: ExchangeId,
33    pub initial_state: UnindexedAccountSnapshot,
34    pub latency_ms: u64,
35    pub fees_percent: Decimal,
36}
37
38#[derive(Debug, Constructor)]
39pub struct MockExecutionClientConfig {
40    pub mocked_exchange: ExchangeId,
41    pub request_tx: mpsc::UnboundedSender<MockExchangeRequest>,
42    pub event_rx: broadcast::Receiver<UnindexedAccountEvent>,
43}
44
45impl Clone for MockExecutionClientConfig {
46    fn clone(&self) -> Self {
47        Self {
48            mocked_exchange: self.mocked_exchange,
49            request_tx: self.request_tx.clone(),
50            event_rx: self.event_rx.resubscribe(),
51        }
52    }
53}
54
55#[derive(Debug, Constructor)]
56pub struct MockExecution {
57    pub mocked_exchange: ExchangeId,
58    pub request_tx: mpsc::UnboundedSender<MockExchangeRequest>,
59    pub event_rx: broadcast::Receiver<UnindexedAccountEvent>,
60}
61
62impl Clone for MockExecution {
63    fn clone(&self) -> Self {
64        Self {
65            mocked_exchange: self.mocked_exchange,
66            request_tx: self.request_tx.clone(),
67            event_rx: self.event_rx.resubscribe(),
68        }
69    }
70}
71
72impl MockExecution {
73    pub fn time_request(&self) -> DateTime<Utc> {
74        // Todo: use input time_engine from requests once this is added
75        Utc::now()
76    }
77}
78
79impl ExecutionClient for MockExecution {
80    const EXCHANGE: ExchangeId = ExchangeId::Mock;
81    type Config = MockExecutionClientConfig;
82    type AccountStream = BoxStream<'static, UnindexedAccountEvent>;
83
84    fn new(config: Self::Config) -> Self {
85        Self {
86            mocked_exchange: config.mocked_exchange,
87            request_tx: config.request_tx,
88            event_rx: config.event_rx,
89        }
90    }
91
92    async fn account_snapshot(
93        &self,
94        _: &[AssetNameExchange],
95        _: &[InstrumentNameExchange],
96    ) -> Result<UnindexedAccountSnapshot, UnindexedClientError> {
97        let (response_tx, response_rx) = oneshot::channel();
98
99        self.request_tx
100            .send(MockExchangeRequest::fetch_account_snapshot(
101                self.time_request(),
102                response_tx,
103            ))
104            .expect("MockExchange is offline - failed to send request");
105
106        let snapshot = response_rx
107            .await
108            .expect("MockExchange if offline - failed to receive response");
109
110        Ok(snapshot)
111    }
112
113    async fn account_stream(
114        &self,
115        _: &[AssetNameExchange],
116        _: &[InstrumentNameExchange],
117    ) -> Result<Self::AccountStream, UnindexedClientError> {
118        Ok(futures::StreamExt::boxed(
119            BroadcastStream::new(self.event_rx.resubscribe()).map_while(|result| match result {
120                Ok(event) => Some(event),
121                Err(error) => {
122                    error!(
123                        ?error,
124                        "MockExchange Broadcast AccountStream lagged - terminating"
125                    );
126                    None
127                }
128            }),
129        ))
130    }
131
132    async fn cancel_order(
133        &self,
134        request: OrderRequestCancel<ExchangeId, &InstrumentNameExchange>,
135    ) -> UnindexedOrderResponseCancel {
136        let (response_tx, response_rx) = oneshot::channel();
137
138        self.request_tx
139            .send(MockExchangeRequest::cancel_order(
140                self.time_request(),
141                response_tx,
142                into_owned_request(request),
143            ))
144            .expect("MockExchange is offline - failed to send request");
145
146        response_rx
147            .await
148            .expect("MockExchange if offline - failed to receive response")
149    }
150
151    async fn open_order(
152        &self,
153        request: OrderRequestOpen<ExchangeId, &InstrumentNameExchange>,
154    ) -> Order<ExchangeId, InstrumentNameExchange, Result<Open, UnindexedOrderError>> {
155        let (response_tx, response_rx) = oneshot::channel();
156
157        self.request_tx
158            .send(MockExchangeRequest::open_order(
159                self.time_request(),
160                response_tx,
161                into_owned_request(request),
162            ))
163            .expect("MockExchange is offline - failed to send request");
164
165        response_rx
166            .await
167            .expect("MockExchange if offline - failed to receive response")
168    }
169
170    async fn fetch_balances(
171        &self,
172    ) -> Result<Vec<AssetBalance<AssetNameExchange>>, UnindexedClientError> {
173        let (response_tx, response_rx) = oneshot::channel();
174
175        self.request_tx
176            .send(MockExchangeRequest::fetch_balances(
177                self.time_request(),
178                response_tx,
179            ))
180            .expect("MockExchange is offline - failed to send request");
181
182        let balances = response_rx
183            .await
184            .expect("MockExchange if offline - failed to receive response");
185
186        Ok(balances)
187    }
188
189    async fn fetch_open_orders(
190        &self,
191    ) -> Result<Vec<Order<ExchangeId, InstrumentNameExchange, Open>>, UnindexedClientError> {
192        let (response_tx, response_rx) = oneshot::channel();
193
194        self.request_tx
195            .send(MockExchangeRequest::fetch_orders_open(
196                self.time_request(),
197                response_tx,
198            ))
199            .expect("MockExchange is offline - failed to send request");
200
201        let open_orders = response_rx
202            .await
203            .expect("MockExchange if offline - failed to receive response");
204
205        Ok(open_orders)
206    }
207
208    async fn fetch_trades(
209        &self,
210        time_since: DateTime<Utc>,
211    ) -> Result<Vec<Trade<QuoteAsset, InstrumentNameExchange>>, UnindexedClientError> {
212        let (response_tx, response_rx) = oneshot::channel();
213
214        self.request_tx
215            .send(MockExchangeRequest::fetch_trades(
216                self.time_request(),
217                response_tx,
218                time_since,
219            ))
220            .expect("MockExchange is offline - failed to send request");
221
222        let trades = response_rx
223            .await
224            .expect("MockExchange if offline - failed to receive response");
225
226        Ok(trades)
227    }
228}
229
230fn into_owned_request<Kind>(
231    request: OrderEvent<Kind, ExchangeId, &InstrumentNameExchange>,
232) -> OrderEvent<Kind, ExchangeId, InstrumentNameExchange> {
233    let OrderEvent {
234        key:
235            OrderKey {
236                exchange,
237                instrument,
238                strategy,
239                cid,
240            },
241        state,
242    } = request;
243
244    OrderEvent {
245        key: OrderKey {
246            exchange,
247            instrument: instrument.clone(),
248            strategy,
249            cid,
250        },
251        state,
252    }
253}