Skip to main content

rustrade_execution/
indexer.rs

1use crate::{
2    AccountEvent, AccountEventKind, AccountSnapshot, InstrumentAccountSnapshot,
3    UnindexedAccountEvent, UnindexedAccountSnapshot,
4    balance::AssetBalance,
5    error::{
6        ApiError, ClientError, KeyError, OrderError, UnindexedApiError, UnindexedClientError,
7        UnindexedOrderError,
8    },
9    map::ExecutionInstrumentMap,
10    order::{
11        Order, OrderEvent, OrderKey, OrderSnapshot, UnindexedOrderKey, UnindexedOrderSnapshot,
12        request::OrderResponseCancel,
13        state::{InactiveOrderState, OrderState, UnindexedOrderState},
14    },
15    trade::{AssetFees, Trade},
16};
17use derive_more::Constructor;
18use rustrade_instrument::{
19    asset::{AssetIndex, name::AssetNameExchange},
20    exchange::{ExchangeId, ExchangeIndex},
21    index::error::IndexError,
22    instrument::{InstrumentIndex, name::InstrumentNameExchange},
23};
24use rustrade_integration::{
25    collection::snapshot::Snapshot,
26    stream::ext::indexed::{IndexedStream, Indexer},
27};
28use std::sync::Arc;
29
30pub type IndexedAccountStream<St> = IndexedStream<St, AccountEventIndexer>;
31
32#[derive(Debug, Clone, Constructor)]
33pub struct AccountEventIndexer {
34    pub map: Arc<ExecutionInstrumentMap>,
35}
36
37impl Indexer for AccountEventIndexer {
38    type Unindexed = UnindexedAccountEvent;
39    type Indexed = AccountEvent;
40
41    fn index(&self, item: Self::Unindexed) -> Result<Self::Indexed, IndexError> {
42        self.account_event(item)
43    }
44}
45
46impl AccountEventIndexer {
47    pub fn account_event(&self, event: UnindexedAccountEvent) -> Result<AccountEvent, IndexError> {
48        let UnindexedAccountEvent { exchange, kind } = event;
49
50        let exchange = self.map.find_exchange_index(exchange)?;
51
52        let kind = match kind {
53            AccountEventKind::Snapshot(snapshot) => {
54                AccountEventKind::Snapshot(self.snapshot(snapshot)?)
55            }
56            AccountEventKind::BalanceSnapshot(snapshot) => {
57                AccountEventKind::BalanceSnapshot(self.asset_balance(snapshot.0).map(Snapshot)?)
58            }
59            AccountEventKind::OrderSnapshot(snapshot) => {
60                AccountEventKind::OrderSnapshot(self.order_snapshot(snapshot.0).map(Snapshot)?)
61            }
62            AccountEventKind::OrderCancelled(response) => {
63                AccountEventKind::OrderCancelled(self.order_response_cancel(response)?)
64            }
65            AccountEventKind::Trade(trade) => AccountEventKind::Trade(self.trade(trade)?),
66            AccountEventKind::StreamError(msg) => AccountEventKind::StreamError(msg),
67        };
68
69        Ok(AccountEvent { exchange, kind })
70    }
71
72    pub fn snapshot(
73        &self,
74        snapshot: UnindexedAccountSnapshot,
75    ) -> Result<AccountSnapshot, IndexError> {
76        let UnindexedAccountSnapshot {
77            exchange,
78            balances,
79            instruments,
80        } = snapshot;
81
82        let exchange = self.map.find_exchange_index(exchange)?;
83
84        let balances = balances
85            .into_iter()
86            .map(|balance| self.asset_balance(balance))
87            .collect::<Result<Vec<_>, _>>()?;
88
89        let instruments = instruments
90            .into_iter()
91            .map(|snapshot| {
92                let InstrumentAccountSnapshot {
93                    instrument,
94                    orders,
95                    position,
96                } = snapshot;
97
98                let instrument = self.map.find_instrument_index(&instrument)?;
99
100                let orders = orders
101                    .into_iter()
102                    .map(|order| self.order_snapshot(order))
103                    .collect::<Result<Vec<_>, _>>()?;
104
105                Ok(InstrumentAccountSnapshot {
106                    instrument,
107                    orders,
108                    position,
109                })
110            })
111            .collect::<Result<Vec<_>, _>>()?;
112
113        Ok(AccountSnapshot {
114            exchange,
115            balances,
116            instruments,
117        })
118    }
119
120    pub fn asset_balance(
121        &self,
122        balance: AssetBalance<AssetNameExchange>,
123    ) -> Result<AssetBalance<AssetIndex>, IndexError> {
124        let AssetBalance {
125            asset,
126            balance,
127            time_exchange,
128        } = balance;
129        let asset = self.map.find_asset_index(&asset)?;
130
131        Ok(AssetBalance {
132            asset,
133            balance,
134            time_exchange,
135        })
136    }
137
138    pub fn order_snapshot(
139        &self,
140        order: UnindexedOrderSnapshot,
141    ) -> Result<OrderSnapshot, IndexError> {
142        let Order {
143            key,
144            side,
145            price,
146            quantity,
147            kind,
148            time_in_force,
149            state,
150        } = order;
151
152        let key = self.order_key(key)?;
153        let state = self.order_state(state)?;
154
155        Ok(Order {
156            key,
157            side,
158            price,
159            quantity,
160            kind,
161            time_in_force,
162            state,
163        })
164    }
165
166    pub fn order_response_cancel(
167        &self,
168        response: OrderResponseCancel<ExchangeId, AssetNameExchange, InstrumentNameExchange>,
169    ) -> Result<OrderResponseCancel, IndexError> {
170        let OrderResponseCancel { key, state } = response;
171
172        Ok(OrderResponseCancel {
173            key: self.order_key(key)?,
174            state: match state {
175                Ok(cancelled) => Ok(cancelled),
176                Err(error) => Err(self.order_error(error)?),
177            },
178        })
179    }
180
181    pub fn order_key(&self, key: UnindexedOrderKey) -> Result<OrderKey, IndexError> {
182        let UnindexedOrderKey {
183            exchange,
184            instrument,
185            strategy,
186            cid,
187        } = key;
188
189        Ok(OrderKey {
190            exchange: self.map.find_exchange_index(exchange)?,
191            instrument: self.map.find_instrument_index(&instrument)?,
192            strategy,
193            cid,
194        })
195    }
196
197    /// Index an [`UnindexedOrderState`] to an [`OrderState`].
198    ///
199    /// Used by `ExecutionManager` to index `open_order` responses.
200    pub fn order_state(&self, state: UnindexedOrderState) -> Result<OrderState, IndexError> {
201        Ok(match state {
202            UnindexedOrderState::Active(active) => OrderState::Active(active),
203            UnindexedOrderState::Inactive(inactive) => match inactive {
204                InactiveOrderState::OpenFailed(failed) => match failed {
205                    OrderError::Rejected(rejected) => {
206                        OrderState::inactive(OrderError::Rejected(self.api_error(rejected)?))
207                    }
208                    OrderError::Connectivity(error) => {
209                        OrderState::inactive(OrderError::Connectivity(error))
210                    }
211                },
212                InactiveOrderState::Cancelled(cancelled) => OrderState::inactive(cancelled),
213                InactiveOrderState::FullyFilled(filled) => OrderState::fully_filled(filled),
214                InactiveOrderState::Expired(expired) => OrderState::expired(expired),
215            },
216        })
217    }
218
219    pub fn api_error(&self, error: UnindexedApiError) -> Result<ApiError, IndexError> {
220        Ok(match error {
221            UnindexedApiError::RateLimit => ApiError::RateLimit,
222            UnindexedApiError::Unauthenticated(msg) => ApiError::Unauthenticated(msg),
223            UnindexedApiError::AssetInvalid(asset, value) => {
224                ApiError::AssetInvalid(self.map.find_asset_index(&asset)?, value)
225            }
226            UnindexedApiError::InstrumentInvalid(instrument, value) => {
227                ApiError::InstrumentInvalid(self.map.find_instrument_index(&instrument)?, value)
228            }
229            UnindexedApiError::BalanceInsufficient(asset, value) => {
230                ApiError::BalanceInsufficient(self.map.find_asset_index(&asset)?, value)
231            }
232            UnindexedApiError::OrderRejected(reason) => ApiError::OrderRejected(reason),
233            UnindexedApiError::OrderAlreadyCancelled => ApiError::OrderAlreadyCancelled,
234            UnindexedApiError::OrderAlreadyFullyFilled => ApiError::OrderAlreadyFullyFilled,
235        })
236    }
237
238    pub fn order_request<Kind>(
239        &self,
240        order: &OrderEvent<Kind, ExchangeIndex, InstrumentIndex>,
241    ) -> Result<OrderEvent<Kind, ExchangeId, &InstrumentNameExchange>, KeyError>
242    where
243        Kind: Clone,
244    {
245        let OrderEvent {
246            key:
247                OrderKey {
248                    exchange,
249                    instrument,
250                    strategy,
251                    cid,
252                },
253            state,
254        } = order;
255
256        let exchange = self.map.find_exchange_id(*exchange)?;
257        let instrument = self.map.find_instrument_name_exchange(*instrument)?;
258
259        Ok(OrderEvent {
260            key: OrderKey {
261                exchange,
262                instrument,
263                strategy: strategy.clone(),
264                cid: cid.clone(),
265            },
266            state: state.clone(),
267        })
268    }
269
270    pub fn order_error(&self, error: UnindexedOrderError) -> Result<OrderError, IndexError> {
271        Ok(match error {
272            UnindexedOrderError::Connectivity(error) => OrderError::Connectivity(error),
273            UnindexedOrderError::Rejected(error) => OrderError::Rejected(self.api_error(error)?),
274        })
275    }
276
277    pub fn client_error(&self, error: UnindexedClientError) -> Result<ClientError, IndexError> {
278        Ok(match error {
279            UnindexedClientError::Connectivity(error) => ClientError::Connectivity(error),
280            UnindexedClientError::Api(error) => ClientError::Api(self.api_error(error)?),
281            UnindexedClientError::TaskFailed(value) => ClientError::TaskFailed(value),
282            UnindexedClientError::Internal(value) => ClientError::Internal(value),
283            UnindexedClientError::Truncated { limit } => ClientError::Truncated { limit },
284            UnindexedClientError::TruncatedSnapshot { limit } => {
285                ClientError::TruncatedSnapshot { limit }
286            }
287        })
288    }
289
290    /// Index a trade, converting fee asset and computing `fees_quote`.
291    ///
292    /// Computes `fees_quote` based on fee asset relationship to instrument:
293    /// - Fee in quote asset: `fees_quote = Some(fees)`
294    /// - Fee in base asset: `fees_quote = Some(fees * price)`
295    /// - Fee in third-party asset (e.g., BNB): `fees_quote = None`
296    ///
297    /// # Errors
298    /// Returns `IndexError` if fee asset is not in the map. Some integrations use
299    /// "UNKNOWN" as a placeholder when fee data is unavailable (e.g., IBKR `fetch_trades`,
300    /// Binance when API omits `commission_asset`). These trades will fail indexing.
301    pub fn trade(
302        &self,
303        trade: Trade<AssetNameExchange, InstrumentNameExchange>,
304    ) -> Result<Trade<AssetIndex, InstrumentIndex>, IndexError> {
305        let Trade {
306            id,
307            order_id,
308            instrument,
309            strategy,
310            time_exchange,
311            side,
312            price: trade_price,
313            quantity,
314            fees,
315        } = trade;
316
317        let instrument_index = self.map.find_instrument_index(&instrument)?;
318        let fee_asset_index = self.map.find_asset_index(&fees.asset)?;
319
320        // Compute fees_quote based on fee asset relationship to instrument
321        let fees_quote = self
322            .map
323            .instruments
324            .get_index(instrument_index.index())
325            .and_then(|instr| {
326                if fee_asset_index == instr.underlying.quote {
327                    // Fee is in quote asset — no conversion needed
328                    Some(fees.fees)
329                } else if fee_asset_index == instr.underlying.base {
330                    // Fee is in base asset — convert using trade price
331                    Some(fees.fees * trade_price)
332                } else {
333                    // Fee is in third-party asset (e.g., BNB) — needs external price
334                    None
335                }
336            });
337
338        Ok(Trade {
339            id,
340            order_id,
341            instrument: instrument_index,
342            strategy,
343            time_exchange,
344            side,
345            price: trade_price,
346            quantity,
347            fees: AssetFees {
348                asset: fee_asset_index,
349                fees: fees.fees,
350                fees_quote,
351            },
352        })
353    }
354}