1use crate::{
2 AccountEvent, AccountEventKind, AccountSnapshot, InstrumentAccountSnapshot,
3 UnindexedAccountEvent, UnindexedAccountSnapshot,
4 balance::AssetBalance,
5 error::{
6 ApiError, ClientError, KeyError, OrderError, UnindexedApiError, UnindexedClientError,
7 UnindexedOrderError,
8 },
9 map::ExecutionInstrumentMap,
10 order::{
11 Order, OrderEvent, OrderKey, OrderSnapshot, UnindexedOrderKey, UnindexedOrderSnapshot,
12 request::OrderResponseCancel,
13 state::{InactiveOrderState, OrderState, UnindexedOrderState},
14 },
15 trade::{AssetFees, Trade},
16};
17use derive_more::Constructor;
18use rustrade_instrument::{
19 asset::{AssetIndex, name::AssetNameExchange},
20 exchange::{ExchangeId, ExchangeIndex},
21 index::error::IndexError,
22 instrument::{InstrumentIndex, name::InstrumentNameExchange},
23};
24use rustrade_integration::{
25 collection::snapshot::Snapshot,
26 stream::ext::indexed::{IndexedStream, Indexer},
27};
28use std::sync::Arc;
29
30pub type IndexedAccountStream<St> = IndexedStream<St, AccountEventIndexer>;
31
32#[derive(Debug, Clone, Constructor)]
33pub struct AccountEventIndexer {
34 pub map: Arc<ExecutionInstrumentMap>,
35}
36
37impl Indexer for AccountEventIndexer {
38 type Unindexed = UnindexedAccountEvent;
39 type Indexed = AccountEvent;
40
41 fn index(&self, item: Self::Unindexed) -> Result<Self::Indexed, IndexError> {
42 self.account_event(item)
43 }
44}
45
46impl AccountEventIndexer {
47 pub fn account_event(&self, event: UnindexedAccountEvent) -> Result<AccountEvent, IndexError> {
48 let UnindexedAccountEvent { exchange, kind } = event;
49
50 let exchange = self.map.find_exchange_index(exchange)?;
51
52 let kind = match kind {
53 AccountEventKind::Snapshot(snapshot) => {
54 AccountEventKind::Snapshot(self.snapshot(snapshot)?)
55 }
56 AccountEventKind::BalanceSnapshot(snapshot) => {
57 AccountEventKind::BalanceSnapshot(self.asset_balance(snapshot.0).map(Snapshot)?)
58 }
59 AccountEventKind::OrderSnapshot(snapshot) => {
60 AccountEventKind::OrderSnapshot(self.order_snapshot(snapshot.0).map(Snapshot)?)
61 }
62 AccountEventKind::OrderCancelled(response) => {
63 AccountEventKind::OrderCancelled(self.order_response_cancel(response)?)
64 }
65 AccountEventKind::Trade(trade) => AccountEventKind::Trade(self.trade(trade)?),
66 AccountEventKind::StreamError(msg) => AccountEventKind::StreamError(msg),
67 };
68
69 Ok(AccountEvent { exchange, kind })
70 }
71
72 pub fn snapshot(
73 &self,
74 snapshot: UnindexedAccountSnapshot,
75 ) -> Result<AccountSnapshot, IndexError> {
76 let UnindexedAccountSnapshot {
77 exchange,
78 balances,
79 instruments,
80 } = snapshot;
81
82 let exchange = self.map.find_exchange_index(exchange)?;
83
84 let balances = balances
85 .into_iter()
86 .map(|balance| self.asset_balance(balance))
87 .collect::<Result<Vec<_>, _>>()?;
88
89 let instruments = instruments
90 .into_iter()
91 .map(|snapshot| {
92 let InstrumentAccountSnapshot {
93 instrument,
94 orders,
95 position,
96 } = snapshot;
97
98 let instrument = self.map.find_instrument_index(&instrument)?;
99
100 let orders = orders
101 .into_iter()
102 .map(|order| self.order_snapshot(order))
103 .collect::<Result<Vec<_>, _>>()?;
104
105 Ok(InstrumentAccountSnapshot {
106 instrument,
107 orders,
108 position,
109 })
110 })
111 .collect::<Result<Vec<_>, _>>()?;
112
113 Ok(AccountSnapshot {
114 exchange,
115 balances,
116 instruments,
117 })
118 }
119
120 pub fn asset_balance(
121 &self,
122 balance: AssetBalance<AssetNameExchange>,
123 ) -> Result<AssetBalance<AssetIndex>, IndexError> {
124 let AssetBalance {
125 asset,
126 balance,
127 time_exchange,
128 } = balance;
129 let asset = self.map.find_asset_index(&asset)?;
130
131 Ok(AssetBalance {
132 asset,
133 balance,
134 time_exchange,
135 })
136 }
137
138 pub fn order_snapshot(
139 &self,
140 order: UnindexedOrderSnapshot,
141 ) -> Result<OrderSnapshot, IndexError> {
142 let Order {
143 key,
144 side,
145 price,
146 quantity,
147 kind,
148 time_in_force,
149 state,
150 } = order;
151
152 let key = self.order_key(key)?;
153 let state = self.order_state(state)?;
154
155 Ok(Order {
156 key,
157 side,
158 price,
159 quantity,
160 kind,
161 time_in_force,
162 state,
163 })
164 }
165
166 pub fn order_response_cancel(
167 &self,
168 response: OrderResponseCancel<ExchangeId, AssetNameExchange, InstrumentNameExchange>,
169 ) -> Result<OrderResponseCancel, IndexError> {
170 let OrderResponseCancel { key, state } = response;
171
172 Ok(OrderResponseCancel {
173 key: self.order_key(key)?,
174 state: match state {
175 Ok(cancelled) => Ok(cancelled),
176 Err(error) => Err(self.order_error(error)?),
177 },
178 })
179 }
180
181 pub fn order_key(&self, key: UnindexedOrderKey) -> Result<OrderKey, IndexError> {
182 let UnindexedOrderKey {
183 exchange,
184 instrument,
185 strategy,
186 cid,
187 } = key;
188
189 Ok(OrderKey {
190 exchange: self.map.find_exchange_index(exchange)?,
191 instrument: self.map.find_instrument_index(&instrument)?,
192 strategy,
193 cid,
194 })
195 }
196
197 pub fn order_state(&self, state: UnindexedOrderState) -> Result<OrderState, IndexError> {
201 Ok(match state {
202 UnindexedOrderState::Active(active) => OrderState::Active(active),
203 UnindexedOrderState::Inactive(inactive) => match inactive {
204 InactiveOrderState::OpenFailed(failed) => match failed {
205 OrderError::Rejected(rejected) => {
206 OrderState::inactive(OrderError::Rejected(self.api_error(rejected)?))
207 }
208 OrderError::Connectivity(error) => {
209 OrderState::inactive(OrderError::Connectivity(error))
210 }
211 },
212 InactiveOrderState::Cancelled(cancelled) => OrderState::inactive(cancelled),
213 InactiveOrderState::FullyFilled(filled) => OrderState::fully_filled(filled),
214 InactiveOrderState::Expired(expired) => OrderState::expired(expired),
215 },
216 })
217 }
218
219 pub fn api_error(&self, error: UnindexedApiError) -> Result<ApiError, IndexError> {
220 Ok(match error {
221 UnindexedApiError::RateLimit => ApiError::RateLimit,
222 UnindexedApiError::Unauthenticated(msg) => ApiError::Unauthenticated(msg),
223 UnindexedApiError::AssetInvalid(asset, value) => {
224 ApiError::AssetInvalid(self.map.find_asset_index(&asset)?, value)
225 }
226 UnindexedApiError::InstrumentInvalid(instrument, value) => {
227 ApiError::InstrumentInvalid(self.map.find_instrument_index(&instrument)?, value)
228 }
229 UnindexedApiError::BalanceInsufficient(asset, value) => {
230 ApiError::BalanceInsufficient(self.map.find_asset_index(&asset)?, value)
231 }
232 UnindexedApiError::OrderRejected(reason) => ApiError::OrderRejected(reason),
233 UnindexedApiError::OrderAlreadyCancelled => ApiError::OrderAlreadyCancelled,
234 UnindexedApiError::OrderAlreadyFullyFilled => ApiError::OrderAlreadyFullyFilled,
235 })
236 }
237
238 pub fn order_request<Kind>(
239 &self,
240 order: &OrderEvent<Kind, ExchangeIndex, InstrumentIndex>,
241 ) -> Result<OrderEvent<Kind, ExchangeId, &InstrumentNameExchange>, KeyError>
242 where
243 Kind: Clone,
244 {
245 let OrderEvent {
246 key:
247 OrderKey {
248 exchange,
249 instrument,
250 strategy,
251 cid,
252 },
253 state,
254 } = order;
255
256 let exchange = self.map.find_exchange_id(*exchange)?;
257 let instrument = self.map.find_instrument_name_exchange(*instrument)?;
258
259 Ok(OrderEvent {
260 key: OrderKey {
261 exchange,
262 instrument,
263 strategy: strategy.clone(),
264 cid: cid.clone(),
265 },
266 state: state.clone(),
267 })
268 }
269
270 pub fn order_error(&self, error: UnindexedOrderError) -> Result<OrderError, IndexError> {
271 Ok(match error {
272 UnindexedOrderError::Connectivity(error) => OrderError::Connectivity(error),
273 UnindexedOrderError::Rejected(error) => OrderError::Rejected(self.api_error(error)?),
274 })
275 }
276
277 pub fn client_error(&self, error: UnindexedClientError) -> Result<ClientError, IndexError> {
278 Ok(match error {
279 UnindexedClientError::Connectivity(error) => ClientError::Connectivity(error),
280 UnindexedClientError::Api(error) => ClientError::Api(self.api_error(error)?),
281 UnindexedClientError::TaskFailed(value) => ClientError::TaskFailed(value),
282 UnindexedClientError::Internal(value) => ClientError::Internal(value),
283 UnindexedClientError::Truncated { limit } => ClientError::Truncated { limit },
284 UnindexedClientError::TruncatedSnapshot { limit } => {
285 ClientError::TruncatedSnapshot { limit }
286 }
287 })
288 }
289
290 pub fn trade(
302 &self,
303 trade: Trade<AssetNameExchange, InstrumentNameExchange>,
304 ) -> Result<Trade<AssetIndex, InstrumentIndex>, IndexError> {
305 let Trade {
306 id,
307 order_id,
308 instrument,
309 strategy,
310 time_exchange,
311 side,
312 price: trade_price,
313 quantity,
314 fees,
315 } = trade;
316
317 let instrument_index = self.map.find_instrument_index(&instrument)?;
318 let fee_asset_index = self.map.find_asset_index(&fees.asset)?;
319
320 let fees_quote = self
322 .map
323 .instruments
324 .get_index(instrument_index.index())
325 .and_then(|instr| {
326 if fee_asset_index == instr.underlying.quote {
327 Some(fees.fees)
329 } else if fee_asset_index == instr.underlying.base {
330 Some(fees.fees * trade_price)
332 } else {
333 None
335 }
336 });
337
338 Ok(Trade {
339 id,
340 order_id,
341 instrument: instrument_index,
342 strategy,
343 time_exchange,
344 side,
345 price: trade_price,
346 quantity,
347 fees: AssetFees {
348 asset: fee_asset_index,
349 fees: fees.fees,
350 fees_quote,
351 },
352 })
353 }
354}