1use crate::{
2 AccountEvent, AccountEventKind, AccountSnapshot, InstrumentAccountSnapshot,
3 UnindexedAccountEvent, UnindexedAccountSnapshot,
4 balance::AssetBalance,
5 error::{
6 ApiError, ClientError, KeyError, OrderError, UnindexedApiError, UnindexedClientError,
7 UnindexedOrderError,
8 },
9 map::ExecutionInstrumentMap,
10 order::{
11 Order, OrderEvent, OrderKey, OrderSnapshot, UnindexedOrderKey, UnindexedOrderSnapshot,
12 request::OrderResponseCancel,
13 state::{InactiveOrderState, OrderState, UnindexedOrderState},
14 },
15 trade::{AssetFees, Trade},
16};
17use derive_more::Constructor;
18use rustrade_instrument::{
19 asset::{AssetIndex, name::AssetNameExchange},
20 exchange::{ExchangeId, ExchangeIndex},
21 index::error::IndexError,
22 instrument::{InstrumentIndex, name::InstrumentNameExchange},
23};
24use rustrade_integration::{
25 collection::snapshot::Snapshot,
26 stream::ext::indexed::{IndexedStream, Indexer},
27};
28use std::sync::Arc;
29
/// An [`IndexedStream`] over `St` that uses an [`AccountEventIndexer`] as its indexer.
pub type IndexedAccountStream<St> = IndexedStream<St, AccountEventIndexer>;
31
/// Translates account-related data keyed by exchange-provided names into data keyed by
/// indices, using the lookups offered by a shared [`ExecutionInstrumentMap`].
#[derive(Debug, Clone, Constructor)]
pub struct AccountEventIndexer {
    /// Shared map consulted for every exchange, asset and instrument key translation.
    pub map: Arc<ExecutionInstrumentMap>,
}
36
37impl Indexer for AccountEventIndexer {
38 type Unindexed = UnindexedAccountEvent;
39 type Indexed = AccountEvent;
40
41 fn index(&self, item: Self::Unindexed) -> Result<Self::Indexed, IndexError> {
42 self.account_event(item)
43 }
44}
45
46impl AccountEventIndexer {
47 pub fn account_event(&self, event: UnindexedAccountEvent) -> Result<AccountEvent, IndexError> {
48 let UnindexedAccountEvent { exchange, kind } = event;
49
50 let exchange = self.map.find_exchange_index(exchange)?;
51
52 let kind = match kind {
53 AccountEventKind::Snapshot(snapshot) => {
54 AccountEventKind::Snapshot(self.snapshot(snapshot)?)
55 }
56 AccountEventKind::BalanceSnapshot(snapshot) => {
57 AccountEventKind::BalanceSnapshot(self.asset_balance(snapshot.0).map(Snapshot)?)
58 }
59 AccountEventKind::OrderSnapshot(snapshot) => {
60 AccountEventKind::OrderSnapshot(self.order_snapshot(snapshot.0).map(Snapshot)?)
61 }
62 AccountEventKind::OrderCancelled(response) => {
63 AccountEventKind::OrderCancelled(self.order_response_cancel(response)?)
64 }
65 AccountEventKind::Trade(trade) => AccountEventKind::Trade(self.trade(trade)?),
66 AccountEventKind::StreamError(msg) => AccountEventKind::StreamError(msg),
67 };
68
69 Ok(AccountEvent { exchange, kind })
70 }
71
72 pub fn snapshot(
73 &self,
74 snapshot: UnindexedAccountSnapshot,
75 ) -> Result<AccountSnapshot, IndexError> {
76 let UnindexedAccountSnapshot {
77 exchange,
78 balances,
79 instruments,
80 } = snapshot;
81
82 let exchange = self.map.find_exchange_index(exchange)?;
83
84 let balances = balances
85 .into_iter()
86 .map(|balance| self.asset_balance(balance))
87 .collect::<Result<Vec<_>, _>>()?;
88
89 let instruments = instruments
90 .into_iter()
91 .map(|snapshot| {
92 let InstrumentAccountSnapshot {
93 instrument,
94 orders,
95 position,
96 } = snapshot;
97
98 let instrument = self.map.find_instrument_index(&instrument)?;
99
100 let orders = orders
101 .into_iter()
102 .map(|order| self.order_snapshot(order))
103 .collect::<Result<Vec<_>, _>>()?;
104
105 Ok(InstrumentAccountSnapshot {
106 instrument,
107 orders,
108 position,
109 })
110 })
111 .collect::<Result<Vec<_>, _>>()?;
112
113 Ok(AccountSnapshot {
114 exchange,
115 balances,
116 instruments,
117 })
118 }
119
120 pub fn asset_balance(
121 &self,
122 balance: AssetBalance<AssetNameExchange>,
123 ) -> Result<AssetBalance<AssetIndex>, IndexError> {
124 let AssetBalance {
125 asset,
126 balance,
127 time_exchange,
128 } = balance;
129 let asset = self.map.find_asset_index(&asset)?;
130
131 Ok(AssetBalance {
132 asset,
133 balance,
134 time_exchange,
135 })
136 }
137
138 pub fn order_snapshot(
139 &self,
140 order: UnindexedOrderSnapshot,
141 ) -> Result<OrderSnapshot, IndexError> {
142 let Order {
143 key,
144 side,
145 price,
146 quantity,
147 kind,
148 time_in_force,
149 state,
150 } = order;
151
152 let key = self.order_key(key)?;
153 let state = self.order_state(state)?;
154
155 Ok(Order {
156 key,
157 side,
158 price,
159 quantity,
160 kind,
161 time_in_force,
162 state,
163 })
164 }
165
166 pub fn order_response_cancel(
167 &self,
168 response: OrderResponseCancel<ExchangeId, AssetNameExchange, InstrumentNameExchange>,
169 ) -> Result<OrderResponseCancel, IndexError> {
170 let OrderResponseCancel { key, state } = response;
171
172 Ok(OrderResponseCancel {
173 key: self.order_key(key)?,
174 state: match state {
175 Ok(cancelled) => Ok(cancelled),
176 Err(error) => Err(self.order_error(error)?),
177 },
178 })
179 }
180
181 pub fn order_key(&self, key: UnindexedOrderKey) -> Result<OrderKey, IndexError> {
182 let UnindexedOrderKey {
183 exchange,
184 instrument,
185 strategy,
186 cid,
187 } = key;
188
189 Ok(OrderKey {
190 exchange: self.map.find_exchange_index(exchange)?,
191 instrument: self.map.find_instrument_index(&instrument)?,
192 strategy,
193 cid,
194 })
195 }
196
197 pub fn order_state(&self, state: UnindexedOrderState) -> Result<OrderState, IndexError> {
201 Ok(match state {
202 UnindexedOrderState::Active(active) => OrderState::Active(active),
203 UnindexedOrderState::Inactive(inactive) => match inactive {
204 InactiveOrderState::OpenFailed(failed) => match failed {
205 OrderError::Rejected(rejected) => {
206 OrderState::inactive(OrderError::Rejected(self.api_error(rejected)?))
207 }
208 OrderError::Connectivity(error) => {
209 OrderState::inactive(OrderError::Connectivity(error))
210 }
211 OrderError::UnsupportedOrderType(msg) => {
212 OrderState::inactive(OrderError::UnsupportedOrderType(msg))
213 }
214 },
215 InactiveOrderState::Cancelled(cancelled) => OrderState::inactive(cancelled),
216 InactiveOrderState::FullyFilled(filled) => OrderState::fully_filled(filled),
217 InactiveOrderState::Expired(expired) => OrderState::expired(expired),
218 },
219 })
220 }
221
222 pub fn api_error(&self, error: UnindexedApiError) -> Result<ApiError, IndexError> {
223 Ok(match error {
224 UnindexedApiError::RateLimit => ApiError::RateLimit,
225 UnindexedApiError::Unauthenticated(msg) => ApiError::Unauthenticated(msg),
226 UnindexedApiError::AssetInvalid(asset, value) => {
227 ApiError::AssetInvalid(self.map.find_asset_index(&asset)?, value)
228 }
229 UnindexedApiError::InstrumentInvalid(instrument, value) => {
230 ApiError::InstrumentInvalid(self.map.find_instrument_index(&instrument)?, value)
231 }
232 UnindexedApiError::BalanceInsufficient(asset, value) => {
233 ApiError::BalanceInsufficient(self.map.find_asset_index(&asset)?, value)
234 }
235 UnindexedApiError::OrderRejected(reason) => ApiError::OrderRejected(reason),
236 UnindexedApiError::OrderAlreadyCancelled => ApiError::OrderAlreadyCancelled,
237 UnindexedApiError::OrderAlreadyFullyFilled => ApiError::OrderAlreadyFullyFilled,
238 })
239 }
240
241 pub fn order_request<Kind>(
242 &self,
243 order: &OrderEvent<Kind, ExchangeIndex, InstrumentIndex>,
244 ) -> Result<OrderEvent<Kind, ExchangeId, &InstrumentNameExchange>, KeyError>
245 where
246 Kind: Clone,
247 {
248 let OrderEvent {
249 key:
250 OrderKey {
251 exchange,
252 instrument,
253 strategy,
254 cid,
255 },
256 state,
257 } = order;
258
259 let exchange = self.map.find_exchange_id(*exchange)?;
260 let instrument = self.map.find_instrument_name_exchange(*instrument)?;
261
262 Ok(OrderEvent {
263 key: OrderKey {
264 exchange,
265 instrument,
266 strategy: strategy.clone(),
267 cid: cid.clone(),
268 },
269 state: state.clone(),
270 })
271 }
272
273 pub fn order_error(&self, error: UnindexedOrderError) -> Result<OrderError, IndexError> {
274 Ok(match error {
275 UnindexedOrderError::Connectivity(error) => OrderError::Connectivity(error),
276 UnindexedOrderError::Rejected(error) => OrderError::Rejected(self.api_error(error)?),
277 UnindexedOrderError::UnsupportedOrderType(msg) => OrderError::UnsupportedOrderType(msg),
278 })
279 }
280
281 pub fn client_error(&self, error: UnindexedClientError) -> Result<ClientError, IndexError> {
282 Ok(match error {
283 UnindexedClientError::Connectivity(error) => ClientError::Connectivity(error),
284 UnindexedClientError::Api(error) => ClientError::Api(self.api_error(error)?),
285 UnindexedClientError::TaskFailed(value) => ClientError::TaskFailed(value),
286 UnindexedClientError::Internal(value) => ClientError::Internal(value),
287 UnindexedClientError::Truncated { limit } => ClientError::Truncated { limit },
288 UnindexedClientError::TruncatedSnapshot { limit } => {
289 ClientError::TruncatedSnapshot { limit }
290 }
291 })
292 }
293
294 pub fn trade(
306 &self,
307 trade: Trade<AssetNameExchange, InstrumentNameExchange>,
308 ) -> Result<Trade<AssetIndex, InstrumentIndex>, IndexError> {
309 let Trade {
310 id,
311 order_id,
312 instrument,
313 strategy,
314 time_exchange,
315 side,
316 price: trade_price,
317 quantity,
318 fees,
319 } = trade;
320
321 let instrument_index = self.map.find_instrument_index(&instrument)?;
322 let fee_asset_index = self.map.find_asset_index(&fees.asset)?;
323
324 let fees_quote = self
326 .map
327 .instruments
328 .get_index(instrument_index.index())
329 .and_then(|instr| {
330 if fee_asset_index == instr.underlying.quote {
331 Some(fees.fees)
333 } else if fee_asset_index == instr.underlying.base {
334 Some(fees.fees * trade_price)
336 } else {
337 None
339 }
340 });
341
342 Ok(Trade {
343 id,
344 order_id,
345 instrument: instrument_index,
346 strategy,
347 time_exchange,
348 side,
349 price: trade_price,
350 quantity,
351 fees: AssetFees {
352 asset: fee_asset_index,
353 fees: fees.fees,
354 fees_quote,
355 },
356 })
357 }
358}