barter_execution/
indexer.rs

1use crate::{
2    AccountEvent, AccountEventKind, AccountSnapshot, InstrumentAccountSnapshot,
3    UnindexedAccountEvent, UnindexedAccountSnapshot,
4    balance::AssetBalance,
5    error::{
6        ApiError, ClientError, KeyError, OrderError, UnindexedApiError, UnindexedClientError,
7        UnindexedOrderError,
8    },
9    map::ExecutionInstrumentMap,
10    order::{
11        Order, OrderEvent, OrderKey, OrderSnapshot, UnindexedOrderKey, UnindexedOrderSnapshot,
12        request::OrderResponseCancel,
13        state::{InactiveOrderState, OrderState, UnindexedOrderState},
14    },
15    trade::Trade,
16};
17use barter_instrument::{
18    asset::{AssetIndex, QuoteAsset, name::AssetNameExchange},
19    exchange::{ExchangeId, ExchangeIndex},
20    index::error::IndexError,
21    instrument::{InstrumentIndex, name::InstrumentNameExchange},
22};
23use barter_integration::{
24    snapshot::Snapshot,
25    stream::indexed::{IndexedStream, Indexer},
26};
27use derive_more::Constructor;
28use std::sync::Arc;
29
30pub type IndexedAccountStream<St> = IndexedStream<AccountEventIndexer, St>;
31
32#[derive(Debug, Clone, Constructor)]
33pub struct AccountEventIndexer {
34    pub map: Arc<ExecutionInstrumentMap>,
35}
36
37impl Indexer for AccountEventIndexer {
38    type Unindexed = UnindexedAccountEvent;
39    type Indexed = AccountEvent;
40
41    fn index(&self, item: Self::Unindexed) -> Result<Self::Indexed, IndexError> {
42        self.account_event(item)
43    }
44}
45
46impl AccountEventIndexer {
47    pub fn account_event(&self, event: UnindexedAccountEvent) -> Result<AccountEvent, IndexError> {
48        let UnindexedAccountEvent { exchange, kind } = event;
49
50        let exchange = self.map.find_exchange_index(exchange)?;
51
52        let kind = match kind {
53            AccountEventKind::Snapshot(snapshot) => {
54                AccountEventKind::Snapshot(self.snapshot(snapshot)?)
55            }
56            AccountEventKind::BalanceSnapshot(snapshot) => {
57                AccountEventKind::BalanceSnapshot(self.asset_balance(snapshot.0).map(Snapshot)?)
58            }
59            AccountEventKind::OrderSnapshot(snapshot) => {
60                AccountEventKind::OrderSnapshot(self.order_snapshot(snapshot.0).map(Snapshot)?)
61            }
62            AccountEventKind::OrderCancelled(response) => {
63                AccountEventKind::OrderCancelled(self.order_response_cancel(response)?)
64            }
65            AccountEventKind::Trade(trade) => AccountEventKind::Trade(self.trade(trade)?),
66        };
67
68        Ok(AccountEvent { exchange, kind })
69    }
70
71    pub fn snapshot(
72        &self,
73        snapshot: UnindexedAccountSnapshot,
74    ) -> Result<AccountSnapshot, IndexError> {
75        let UnindexedAccountSnapshot {
76            exchange,
77            balances,
78            instruments,
79        } = snapshot;
80
81        let exchange = self.map.find_exchange_index(exchange)?;
82
83        let balances = balances
84            .into_iter()
85            .map(|balance| self.asset_balance(balance))
86            .collect::<Result<Vec<_>, _>>()?;
87
88        let instruments = instruments
89            .into_iter()
90            .map(|snapshot| {
91                let InstrumentAccountSnapshot { instrument, orders } = snapshot;
92
93                let instrument = self.map.find_instrument_index(&instrument)?;
94
95                let orders = orders
96                    .into_iter()
97                    .map(|order| self.order_snapshot(order))
98                    .collect::<Result<Vec<_>, _>>()?;
99
100                Ok(InstrumentAccountSnapshot { instrument, orders })
101            })
102            .collect::<Result<Vec<_>, _>>()?;
103
104        Ok(AccountSnapshot {
105            exchange,
106            balances,
107            instruments,
108        })
109    }
110
111    pub fn asset_balance(
112        &self,
113        balance: AssetBalance<AssetNameExchange>,
114    ) -> Result<AssetBalance<AssetIndex>, IndexError> {
115        let AssetBalance {
116            asset,
117            balance,
118            time_exchange,
119        } = balance;
120        let asset = self.map.find_asset_index(&asset)?;
121
122        Ok(AssetBalance {
123            asset,
124            balance,
125            time_exchange,
126        })
127    }
128
129    pub fn order_snapshot(
130        &self,
131        order: UnindexedOrderSnapshot,
132    ) -> Result<OrderSnapshot, IndexError> {
133        let Order {
134            key,
135            side,
136            price,
137            quantity,
138            kind,
139            time_in_force,
140            state,
141        } = order;
142
143        let key = self.order_key(key)?;
144
145        let state = match state {
146            UnindexedOrderState::Active(active) => OrderState::Active(active),
147            UnindexedOrderState::Inactive(inactive) => match inactive {
148                InactiveOrderState::OpenFailed(failed) => match failed {
149                    OrderError::Rejected(rejected) => {
150                        OrderState::inactive(OrderError::Rejected(self.api_error(rejected)?))
151                    }
152                    OrderError::Connectivity(error) => {
153                        OrderState::inactive(OrderError::Connectivity(error))
154                    }
155                },
156                InactiveOrderState::Cancelled(cancelled) => OrderState::inactive(cancelled),
157                InactiveOrderState::FullyFilled => OrderState::fully_filled(),
158                InactiveOrderState::Expired => OrderState::expired(),
159            },
160        };
161
162        Ok(Order {
163            key,
164            side,
165            price,
166            quantity,
167            kind,
168            time_in_force,
169            state,
170        })
171    }
172
173    pub fn order_response_cancel(
174        &self,
175        response: OrderResponseCancel<ExchangeId, AssetNameExchange, InstrumentNameExchange>,
176    ) -> Result<OrderResponseCancel, IndexError> {
177        let OrderResponseCancel { key, state } = response;
178
179        Ok(OrderResponseCancel {
180            key: self.order_key(key)?,
181            state: match state {
182                Ok(cancelled) => Ok(cancelled),
183                Err(error) => Err(self.order_error(error)?),
184            },
185        })
186    }
187
188    pub fn order_key(&self, key: UnindexedOrderKey) -> Result<OrderKey, IndexError> {
189        let UnindexedOrderKey {
190            exchange,
191            instrument,
192            strategy,
193            cid,
194        } = key;
195
196        Ok(OrderKey {
197            exchange: self.map.find_exchange_index(exchange)?,
198            instrument: self.map.find_instrument_index(&instrument)?,
199            strategy,
200            cid,
201        })
202    }
203
204    pub fn api_error(&self, error: UnindexedApiError) -> Result<ApiError, IndexError> {
205        Ok(match error {
206            UnindexedApiError::RateLimit => ApiError::RateLimit,
207            UnindexedApiError::AssetInvalid(asset, value) => {
208                ApiError::AssetInvalid(self.map.find_asset_index(&asset)?, value)
209            }
210            UnindexedApiError::InstrumentInvalid(instrument, value) => {
211                ApiError::InstrumentInvalid(self.map.find_instrument_index(&instrument)?, value)
212            }
213            UnindexedApiError::BalanceInsufficient(asset, value) => {
214                ApiError::BalanceInsufficient(self.map.find_asset_index(&asset)?, value)
215            }
216            UnindexedApiError::OrderRejected(reason) => ApiError::OrderRejected(reason),
217            UnindexedApiError::OrderAlreadyCancelled => ApiError::OrderAlreadyCancelled,
218            UnindexedApiError::OrderAlreadyFullyFilled => ApiError::OrderAlreadyFullyFilled,
219        })
220    }
221
222    pub fn order_request<Kind>(
223        &self,
224        order: &OrderEvent<Kind, ExchangeIndex, InstrumentIndex>,
225    ) -> Result<OrderEvent<Kind, ExchangeId, &InstrumentNameExchange>, KeyError>
226    where
227        Kind: Clone,
228    {
229        let OrderEvent {
230            key:
231                OrderKey {
232                    exchange,
233                    instrument,
234                    strategy,
235                    cid,
236                },
237            state,
238        } = order;
239
240        let exchange = self.map.find_exchange_id(*exchange)?;
241        let instrument = self.map.find_instrument_name_exchange(*instrument)?;
242
243        Ok(OrderEvent {
244            key: OrderKey {
245                exchange,
246                instrument,
247                strategy: strategy.clone(),
248                cid: cid.clone(),
249            },
250            state: state.clone(),
251        })
252    }
253
254    pub fn order_error(&self, error: UnindexedOrderError) -> Result<OrderError, IndexError> {
255        Ok(match error {
256            UnindexedOrderError::Connectivity(error) => OrderError::Connectivity(error),
257            UnindexedOrderError::Rejected(error) => OrderError::Rejected(self.api_error(error)?),
258        })
259    }
260
261    pub fn client_error(&self, error: UnindexedClientError) -> Result<ClientError, IndexError> {
262        Ok(match error {
263            UnindexedClientError::Connectivity(error) => ClientError::Connectivity(error),
264            UnindexedClientError::Api(error) => ClientError::Api(self.api_error(error)?),
265            UnindexedClientError::AccountSnapshot(value) => ClientError::AccountSnapshot(value),
266            UnindexedClientError::AccountStream(value) => ClientError::AccountStream(value),
267        })
268    }
269
270    pub fn trade(
271        &self,
272        trade: Trade<QuoteAsset, InstrumentNameExchange>,
273    ) -> Result<Trade<QuoteAsset, InstrumentIndex>, IndexError> {
274        let Trade {
275            id,
276            order_id,
277            instrument,
278            strategy,
279            time_exchange,
280            side,
281            price,
282            quantity,
283            fees,
284        } = trade;
285
286        let instrument_index = self.map.find_instrument_index(&instrument)?;
287
288        Ok(Trade {
289            id,
290            order_id,
291            instrument: instrument_index,
292            strategy,
293            time_exchange,
294            side,
295            price,
296            quantity,
297            fees,
298        })
299    }
300}