Skip to main content

rustrade_execution/
indexer.rs

1use crate::{
2    AccountEvent, AccountEventKind, AccountSnapshot, InstrumentAccountSnapshot,
3    UnindexedAccountEvent, UnindexedAccountSnapshot,
4    balance::AssetBalance,
5    error::{
6        ApiError, ClientError, KeyError, OrderError, UnindexedApiError, UnindexedClientError,
7        UnindexedOrderError,
8    },
9    map::ExecutionInstrumentMap,
10    order::{
11        Order, OrderEvent, OrderKey, OrderSnapshot, UnindexedOrderKey, UnindexedOrderSnapshot,
12        request::OrderResponseCancel,
13        state::{InactiveOrderState, OrderState, UnindexedOrderState},
14    },
15    trade::{AssetFees, Trade},
16};
17use derive_more::Constructor;
18use rustrade_instrument::{
19    asset::{AssetIndex, name::AssetNameExchange},
20    exchange::{ExchangeId, ExchangeIndex},
21    index::error::IndexError,
22    instrument::{InstrumentIndex, name::InstrumentNameExchange},
23};
24use rustrade_integration::{
25    collection::snapshot::Snapshot,
26    stream::ext::indexed::{IndexedStream, Indexer},
27};
28use std::sync::Arc;
29
30pub type IndexedAccountStream<St> = IndexedStream<St, AccountEventIndexer>;
31
32#[derive(Debug, Clone, Constructor)]
33pub struct AccountEventIndexer {
34    pub map: Arc<ExecutionInstrumentMap>,
35}
36
37impl Indexer for AccountEventIndexer {
38    type Unindexed = UnindexedAccountEvent;
39    type Indexed = AccountEvent;
40
41    fn index(&self, item: Self::Unindexed) -> Result<Self::Indexed, IndexError> {
42        self.account_event(item)
43    }
44}
45
46impl AccountEventIndexer {
47    pub fn account_event(&self, event: UnindexedAccountEvent) -> Result<AccountEvent, IndexError> {
48        let UnindexedAccountEvent { exchange, kind } = event;
49
50        let exchange = self.map.find_exchange_index(exchange)?;
51
52        let kind = match kind {
53            AccountEventKind::Snapshot(snapshot) => {
54                AccountEventKind::Snapshot(self.snapshot(snapshot)?)
55            }
56            AccountEventKind::BalanceSnapshot(snapshot) => {
57                AccountEventKind::BalanceSnapshot(self.asset_balance(snapshot.0).map(Snapshot)?)
58            }
59            AccountEventKind::OrderSnapshot(snapshot) => {
60                AccountEventKind::OrderSnapshot(self.order_snapshot(snapshot.0).map(Snapshot)?)
61            }
62            AccountEventKind::OrderCancelled(response) => {
63                AccountEventKind::OrderCancelled(self.order_response_cancel(response)?)
64            }
65            AccountEventKind::Trade(trade) => AccountEventKind::Trade(self.trade(trade)?),
66            AccountEventKind::StreamError(msg) => AccountEventKind::StreamError(msg),
67        };
68
69        Ok(AccountEvent { exchange, kind })
70    }
71
72    pub fn snapshot(
73        &self,
74        snapshot: UnindexedAccountSnapshot,
75    ) -> Result<AccountSnapshot, IndexError> {
76        let UnindexedAccountSnapshot {
77            exchange,
78            balances,
79            instruments,
80        } = snapshot;
81
82        let exchange = self.map.find_exchange_index(exchange)?;
83
84        let balances = balances
85            .into_iter()
86            .map(|balance| self.asset_balance(balance))
87            .collect::<Result<Vec<_>, _>>()?;
88
89        let instruments = instruments
90            .into_iter()
91            .map(|snapshot| {
92                let InstrumentAccountSnapshot {
93                    instrument,
94                    orders,
95                    position,
96                } = snapshot;
97
98                let instrument = self.map.find_instrument_index(&instrument)?;
99
100                let orders = orders
101                    .into_iter()
102                    .map(|order| self.order_snapshot(order))
103                    .collect::<Result<Vec<_>, _>>()?;
104
105                Ok(InstrumentAccountSnapshot {
106                    instrument,
107                    orders,
108                    position,
109                })
110            })
111            .collect::<Result<Vec<_>, _>>()?;
112
113        Ok(AccountSnapshot {
114            exchange,
115            balances,
116            instruments,
117        })
118    }
119
120    pub fn asset_balance(
121        &self,
122        balance: AssetBalance<AssetNameExchange>,
123    ) -> Result<AssetBalance<AssetIndex>, IndexError> {
124        let AssetBalance {
125            asset,
126            balance,
127            time_exchange,
128        } = balance;
129        let asset = self.map.find_asset_index(&asset)?;
130
131        Ok(AssetBalance {
132            asset,
133            balance,
134            time_exchange,
135        })
136    }
137
138    pub fn order_snapshot(
139        &self,
140        order: UnindexedOrderSnapshot,
141    ) -> Result<OrderSnapshot, IndexError> {
142        let Order {
143            key,
144            side,
145            price,
146            quantity,
147            kind,
148            time_in_force,
149            state,
150        } = order;
151
152        let key = self.order_key(key)?;
153        let state = self.order_state(state)?;
154
155        Ok(Order {
156            key,
157            side,
158            price,
159            quantity,
160            kind,
161            time_in_force,
162            state,
163        })
164    }
165
166    pub fn order_response_cancel(
167        &self,
168        response: OrderResponseCancel<ExchangeId, AssetNameExchange, InstrumentNameExchange>,
169    ) -> Result<OrderResponseCancel, IndexError> {
170        let OrderResponseCancel { key, state } = response;
171
172        Ok(OrderResponseCancel {
173            key: self.order_key(key)?,
174            state: match state {
175                Ok(cancelled) => Ok(cancelled),
176                Err(error) => Err(self.order_error(error)?),
177            },
178        })
179    }
180
181    pub fn order_key(&self, key: UnindexedOrderKey) -> Result<OrderKey, IndexError> {
182        let UnindexedOrderKey {
183            exchange,
184            instrument,
185            strategy,
186            cid,
187        } = key;
188
189        Ok(OrderKey {
190            exchange: self.map.find_exchange_index(exchange)?,
191            instrument: self.map.find_instrument_index(&instrument)?,
192            strategy,
193            cid,
194        })
195    }
196
197    /// Index an [`UnindexedOrderState`] to an [`OrderState`].
198    ///
199    /// Used by `ExecutionManager` to index `open_order` responses.
200    pub fn order_state(&self, state: UnindexedOrderState) -> Result<OrderState, IndexError> {
201        Ok(match state {
202            UnindexedOrderState::Active(active) => OrderState::Active(active),
203            UnindexedOrderState::Inactive(inactive) => match inactive {
204                InactiveOrderState::OpenFailed(failed) => match failed {
205                    OrderError::Rejected(rejected) => {
206                        OrderState::inactive(OrderError::Rejected(self.api_error(rejected)?))
207                    }
208                    OrderError::Connectivity(error) => {
209                        OrderState::inactive(OrderError::Connectivity(error))
210                    }
211                    OrderError::UnsupportedOrderType(msg) => {
212                        OrderState::inactive(OrderError::UnsupportedOrderType(msg))
213                    }
214                },
215                InactiveOrderState::Cancelled(cancelled) => OrderState::inactive(cancelled),
216                InactiveOrderState::FullyFilled(filled) => OrderState::fully_filled(filled),
217                InactiveOrderState::Expired(expired) => OrderState::expired(expired),
218            },
219        })
220    }
221
222    pub fn api_error(&self, error: UnindexedApiError) -> Result<ApiError, IndexError> {
223        Ok(match error {
224            UnindexedApiError::RateLimit => ApiError::RateLimit,
225            UnindexedApiError::Unauthenticated(msg) => ApiError::Unauthenticated(msg),
226            UnindexedApiError::AssetInvalid(asset, value) => {
227                ApiError::AssetInvalid(self.map.find_asset_index(&asset)?, value)
228            }
229            UnindexedApiError::InstrumentInvalid(instrument, value) => {
230                ApiError::InstrumentInvalid(self.map.find_instrument_index(&instrument)?, value)
231            }
232            UnindexedApiError::BalanceInsufficient(asset, value) => {
233                ApiError::BalanceInsufficient(self.map.find_asset_index(&asset)?, value)
234            }
235            UnindexedApiError::OrderRejected(reason) => ApiError::OrderRejected(reason),
236            UnindexedApiError::OrderAlreadyCancelled => ApiError::OrderAlreadyCancelled,
237            UnindexedApiError::OrderAlreadyFullyFilled => ApiError::OrderAlreadyFullyFilled,
238        })
239    }
240
241    pub fn order_request<Kind>(
242        &self,
243        order: &OrderEvent<Kind, ExchangeIndex, InstrumentIndex>,
244    ) -> Result<OrderEvent<Kind, ExchangeId, &InstrumentNameExchange>, KeyError>
245    where
246        Kind: Clone,
247    {
248        let OrderEvent {
249            key:
250                OrderKey {
251                    exchange,
252                    instrument,
253                    strategy,
254                    cid,
255                },
256            state,
257        } = order;
258
259        let exchange = self.map.find_exchange_id(*exchange)?;
260        let instrument = self.map.find_instrument_name_exchange(*instrument)?;
261
262        Ok(OrderEvent {
263            key: OrderKey {
264                exchange,
265                instrument,
266                strategy: strategy.clone(),
267                cid: cid.clone(),
268            },
269            state: state.clone(),
270        })
271    }
272
273    pub fn order_error(&self, error: UnindexedOrderError) -> Result<OrderError, IndexError> {
274        Ok(match error {
275            UnindexedOrderError::Connectivity(error) => OrderError::Connectivity(error),
276            UnindexedOrderError::Rejected(error) => OrderError::Rejected(self.api_error(error)?),
277            UnindexedOrderError::UnsupportedOrderType(msg) => OrderError::UnsupportedOrderType(msg),
278        })
279    }
280
281    pub fn client_error(&self, error: UnindexedClientError) -> Result<ClientError, IndexError> {
282        Ok(match error {
283            UnindexedClientError::Connectivity(error) => ClientError::Connectivity(error),
284            UnindexedClientError::Api(error) => ClientError::Api(self.api_error(error)?),
285            UnindexedClientError::TaskFailed(value) => ClientError::TaskFailed(value),
286            UnindexedClientError::Internal(value) => ClientError::Internal(value),
287            UnindexedClientError::Truncated { limit } => ClientError::Truncated { limit },
288            UnindexedClientError::TruncatedSnapshot { limit } => {
289                ClientError::TruncatedSnapshot { limit }
290            }
291        })
292    }
293
294    /// Index a trade, converting fee asset and computing `fees_quote`.
295    ///
296    /// Computes `fees_quote` based on fee asset relationship to instrument:
297    /// - Fee in quote asset: `fees_quote = Some(fees)`
298    /// - Fee in base asset: `fees_quote = Some(fees * price)`
299    /// - Fee in third-party asset (e.g., BNB): `fees_quote = None`
300    ///
301    /// # Errors
302    /// Returns `IndexError` if fee asset is not in the map. Some integrations use
303    /// "UNKNOWN" as a placeholder when fee data is unavailable (e.g., IBKR `fetch_trades`,
304    /// Binance when API omits `commission_asset`). These trades will fail indexing.
305    pub fn trade(
306        &self,
307        trade: Trade<AssetNameExchange, InstrumentNameExchange>,
308    ) -> Result<Trade<AssetIndex, InstrumentIndex>, IndexError> {
309        let Trade {
310            id,
311            order_id,
312            instrument,
313            strategy,
314            time_exchange,
315            side,
316            price: trade_price,
317            quantity,
318            fees,
319        } = trade;
320
321        let instrument_index = self.map.find_instrument_index(&instrument)?;
322        let fee_asset_index = self.map.find_asset_index(&fees.asset)?;
323
324        // Compute fees_quote based on fee asset relationship to instrument
325        let fees_quote = self
326            .map
327            .instruments
328            .get_index(instrument_index.index())
329            .and_then(|instr| {
330                if fee_asset_index == instr.underlying.quote {
331                    // Fee is in quote asset — no conversion needed
332                    Some(fees.fees)
333                } else if fee_asset_index == instr.underlying.base {
334                    // Fee is in base asset — convert using trade price
335                    Some(fees.fees * trade_price)
336                } else {
337                    // Fee is in third-party asset (e.g., BNB) — needs external price
338                    None
339                }
340            });
341
342        Ok(Trade {
343            id,
344            order_id,
345            instrument: instrument_index,
346            strategy,
347            time_exchange,
348            side,
349            price: trade_price,
350            quantity,
351            fees: AssetFees {
352                asset: fee_asset_index,
353                fees: fees.fees,
354                fees_quote,
355            },
356        })
357    }
358}