barter_execution/client/mock/
mod.rs1use crate::{
2 UnindexedAccountEvent, UnindexedAccountSnapshot,
3 balance::AssetBalance,
4 client::ExecutionClient,
5 error::{UnindexedClientError, UnindexedOrderError},
6 exchange::mock::request::MockExchangeRequest,
7 order::{
8 Order, OrderEvent, OrderKey,
9 request::{OrderRequestCancel, OrderRequestOpen, UnindexedOrderResponseCancel},
10 state::Open,
11 },
12 trade::Trade,
13};
14use barter_instrument::{
15 asset::{QuoteAsset, name::AssetNameExchange},
16 exchange::ExchangeId,
17 instrument::name::InstrumentNameExchange,
18};
19use chrono::{DateTime, Utc};
20use derive_more::Constructor;
21use futures::stream::BoxStream;
22use rust_decimal::Decimal;
23use serde::{Deserialize, Serialize};
24use tokio::sync::{broadcast, mpsc, oneshot};
25use tokio_stream::{StreamExt, wrappers::BroadcastStream};
26use tracing::error;
27
28#[derive(
29 Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize, Constructor,
30)]
31pub struct MockExecutionConfig {
32 pub mocked_exchange: ExchangeId,
33 pub initial_state: UnindexedAccountSnapshot,
34 pub latency_ms: u64,
35 pub fees_percent: Decimal,
36}
37
38#[derive(Debug, Constructor)]
39pub struct MockExecutionClientConfig {
40 pub mocked_exchange: ExchangeId,
41 pub request_tx: mpsc::UnboundedSender<MockExchangeRequest>,
42 pub event_rx: broadcast::Receiver<UnindexedAccountEvent>,
43}
44
45impl Clone for MockExecutionClientConfig {
46 fn clone(&self) -> Self {
47 Self {
48 mocked_exchange: self.mocked_exchange,
49 request_tx: self.request_tx.clone(),
50 event_rx: self.event_rx.resubscribe(),
51 }
52 }
53}
54
55#[derive(Debug, Constructor)]
56pub struct MockExecution {
57 pub mocked_exchange: ExchangeId,
58 pub request_tx: mpsc::UnboundedSender<MockExchangeRequest>,
59 pub event_rx: broadcast::Receiver<UnindexedAccountEvent>,
60}
61
62impl Clone for MockExecution {
63 fn clone(&self) -> Self {
64 Self {
65 mocked_exchange: self.mocked_exchange,
66 request_tx: self.request_tx.clone(),
67 event_rx: self.event_rx.resubscribe(),
68 }
69 }
70}
71
72impl MockExecution {
73 pub fn time_request(&self) -> DateTime<Utc> {
74 Utc::now()
76 }
77}
78
79impl ExecutionClient for MockExecution {
80 const EXCHANGE: ExchangeId = ExchangeId::Mock;
81 type Config = MockExecutionClientConfig;
82 type AccountStream = BoxStream<'static, UnindexedAccountEvent>;
83
84 fn new(config: Self::Config) -> Self {
85 Self {
86 mocked_exchange: config.mocked_exchange,
87 request_tx: config.request_tx,
88 event_rx: config.event_rx,
89 }
90 }
91
92 async fn account_snapshot(
93 &self,
94 _: &[AssetNameExchange],
95 _: &[InstrumentNameExchange],
96 ) -> Result<UnindexedAccountSnapshot, UnindexedClientError> {
97 let (response_tx, response_rx) = oneshot::channel();
98
99 self.request_tx
100 .send(MockExchangeRequest::fetch_account_snapshot(
101 self.time_request(),
102 response_tx,
103 ))
104 .expect("MockExchange is offline - failed to send request");
105
106 let snapshot = response_rx
107 .await
108 .expect("MockExchange if offline - failed to receive response");
109
110 Ok(snapshot)
111 }
112
113 async fn account_stream(
114 &self,
115 _: &[AssetNameExchange],
116 _: &[InstrumentNameExchange],
117 ) -> Result<Self::AccountStream, UnindexedClientError> {
118 Ok(futures::StreamExt::boxed(
119 BroadcastStream::new(self.event_rx.resubscribe()).map_while(|result| match result {
120 Ok(event) => Some(event),
121 Err(error) => {
122 error!(
123 ?error,
124 "MockExchange Broadcast AccountStream lagged - terminating"
125 );
126 None
127 }
128 }),
129 ))
130 }
131
132 async fn cancel_order(
133 &self,
134 request: OrderRequestCancel<ExchangeId, &InstrumentNameExchange>,
135 ) -> UnindexedOrderResponseCancel {
136 let (response_tx, response_rx) = oneshot::channel();
137
138 self.request_tx
139 .send(MockExchangeRequest::cancel_order(
140 self.time_request(),
141 response_tx,
142 into_owned_request(request),
143 ))
144 .expect("MockExchange is offline - failed to send request");
145
146 response_rx
147 .await
148 .expect("MockExchange if offline - failed to receive response")
149 }
150
151 async fn open_order(
152 &self,
153 request: OrderRequestOpen<ExchangeId, &InstrumentNameExchange>,
154 ) -> Order<ExchangeId, InstrumentNameExchange, Result<Open, UnindexedOrderError>> {
155 let (response_tx, response_rx) = oneshot::channel();
156
157 self.request_tx
158 .send(MockExchangeRequest::open_order(
159 self.time_request(),
160 response_tx,
161 into_owned_request(request),
162 ))
163 .expect("MockExchange is offline - failed to send request");
164
165 response_rx
166 .await
167 .expect("MockExchange if offline - failed to receive response")
168 }
169
170 async fn fetch_balances(
171 &self,
172 ) -> Result<Vec<AssetBalance<AssetNameExchange>>, UnindexedClientError> {
173 let (response_tx, response_rx) = oneshot::channel();
174
175 self.request_tx
176 .send(MockExchangeRequest::fetch_balances(
177 self.time_request(),
178 response_tx,
179 ))
180 .expect("MockExchange is offline - failed to send request");
181
182 let balances = response_rx
183 .await
184 .expect("MockExchange if offline - failed to receive response");
185
186 Ok(balances)
187 }
188
189 async fn fetch_open_orders(
190 &self,
191 ) -> Result<Vec<Order<ExchangeId, InstrumentNameExchange, Open>>, UnindexedClientError> {
192 let (response_tx, response_rx) = oneshot::channel();
193
194 self.request_tx
195 .send(MockExchangeRequest::fetch_orders_open(
196 self.time_request(),
197 response_tx,
198 ))
199 .expect("MockExchange is offline - failed to send request");
200
201 let open_orders = response_rx
202 .await
203 .expect("MockExchange if offline - failed to receive response");
204
205 Ok(open_orders)
206 }
207
208 async fn fetch_trades(
209 &self,
210 time_since: DateTime<Utc>,
211 ) -> Result<Vec<Trade<QuoteAsset, InstrumentNameExchange>>, UnindexedClientError> {
212 let (response_tx, response_rx) = oneshot::channel();
213
214 self.request_tx
215 .send(MockExchangeRequest::fetch_trades(
216 self.time_request(),
217 response_tx,
218 time_since,
219 ))
220 .expect("MockExchange is offline - failed to send request");
221
222 let trades = response_rx
223 .await
224 .expect("MockExchange if offline - failed to receive response");
225
226 Ok(trades)
227 }
228}
229
230fn into_owned_request<Kind>(
231 request: OrderEvent<Kind, ExchangeId, &InstrumentNameExchange>,
232) -> OrderEvent<Kind, ExchangeId, InstrumentNameExchange> {
233 let OrderEvent {
234 key:
235 OrderKey {
236 exchange,
237 instrument,
238 strategy,
239 cid,
240 },
241 state,
242 } = request;
243
244 OrderEvent {
245 key: OrderKey {
246 exchange,
247 instrument: instrument.clone(),
248 strategy,
249 cid,
250 },
251 state,
252 }
253}