1use crate::{
2 AccountEvent, AccountEventKind, AccountSnapshot, InstrumentAccountSnapshot,
3 UnindexedAccountEvent, UnindexedAccountSnapshot,
4 balance::AssetBalance,
5 error::{
6 ApiError, ClientError, KeyError, OrderError, UnindexedApiError, UnindexedClientError,
7 UnindexedOrderError,
8 },
9 map::ExecutionInstrumentMap,
10 order::{
11 Order, OrderEvent, OrderKey, OrderSnapshot, UnindexedOrderKey, UnindexedOrderSnapshot,
12 request::OrderResponseCancel,
13 state::{InactiveOrderState, OrderState, UnindexedOrderState},
14 },
15 trade::Trade,
16};
17use barter_instrument::{
18 asset::{AssetIndex, QuoteAsset, name::AssetNameExchange},
19 exchange::{ExchangeId, ExchangeIndex},
20 index::error::IndexError,
21 instrument::{InstrumentIndex, name::InstrumentNameExchange},
22};
23use barter_integration::{
24 snapshot::Snapshot,
25 stream::indexed::{IndexedStream, Indexer},
26};
27use derive_more::Constructor;
28use std::sync::Arc;
29
30pub type IndexedAccountStream<St> = IndexedStream<AccountEventIndexer, St>;
31
32#[derive(Debug, Clone, Constructor)]
33pub struct AccountEventIndexer {
34 pub map: Arc<ExecutionInstrumentMap>,
35}
36
37impl Indexer for AccountEventIndexer {
38 type Unindexed = UnindexedAccountEvent;
39 type Indexed = AccountEvent;
40
41 fn index(&self, item: Self::Unindexed) -> Result<Self::Indexed, IndexError> {
42 self.account_event(item)
43 }
44}
45
46impl AccountEventIndexer {
47 pub fn account_event(&self, event: UnindexedAccountEvent) -> Result<AccountEvent, IndexError> {
48 let UnindexedAccountEvent { exchange, kind } = event;
49
50 let exchange = self.map.find_exchange_index(exchange)?;
51
52 let kind = match kind {
53 AccountEventKind::Snapshot(snapshot) => {
54 AccountEventKind::Snapshot(self.snapshot(snapshot)?)
55 }
56 AccountEventKind::BalanceSnapshot(snapshot) => {
57 AccountEventKind::BalanceSnapshot(self.asset_balance(snapshot.0).map(Snapshot)?)
58 }
59 AccountEventKind::OrderSnapshot(snapshot) => {
60 AccountEventKind::OrderSnapshot(self.order_snapshot(snapshot.0).map(Snapshot)?)
61 }
62 AccountEventKind::OrderCancelled(response) => {
63 AccountEventKind::OrderCancelled(self.order_response_cancel(response)?)
64 }
65 AccountEventKind::Trade(trade) => AccountEventKind::Trade(self.trade(trade)?),
66 };
67
68 Ok(AccountEvent { exchange, kind })
69 }
70
71 pub fn snapshot(
72 &self,
73 snapshot: UnindexedAccountSnapshot,
74 ) -> Result<AccountSnapshot, IndexError> {
75 let UnindexedAccountSnapshot {
76 exchange,
77 balances,
78 instruments,
79 } = snapshot;
80
81 let exchange = self.map.find_exchange_index(exchange)?;
82
83 let balances = balances
84 .into_iter()
85 .map(|balance| self.asset_balance(balance))
86 .collect::<Result<Vec<_>, _>>()?;
87
88 let instruments = instruments
89 .into_iter()
90 .map(|snapshot| {
91 let InstrumentAccountSnapshot { instrument, orders } = snapshot;
92
93 let instrument = self.map.find_instrument_index(&instrument)?;
94
95 let orders = orders
96 .into_iter()
97 .map(|order| self.order_snapshot(order))
98 .collect::<Result<Vec<_>, _>>()?;
99
100 Ok(InstrumentAccountSnapshot { instrument, orders })
101 })
102 .collect::<Result<Vec<_>, _>>()?;
103
104 Ok(AccountSnapshot {
105 exchange,
106 balances,
107 instruments,
108 })
109 }
110
111 pub fn asset_balance(
112 &self,
113 balance: AssetBalance<AssetNameExchange>,
114 ) -> Result<AssetBalance<AssetIndex>, IndexError> {
115 let AssetBalance {
116 asset,
117 balance,
118 time_exchange,
119 } = balance;
120 let asset = self.map.find_asset_index(&asset)?;
121
122 Ok(AssetBalance {
123 asset,
124 balance,
125 time_exchange,
126 })
127 }
128
129 pub fn order_snapshot(
130 &self,
131 order: UnindexedOrderSnapshot,
132 ) -> Result<OrderSnapshot, IndexError> {
133 let Order {
134 key,
135 side,
136 price,
137 quantity,
138 kind,
139 time_in_force,
140 state,
141 } = order;
142
143 let key = self.order_key(key)?;
144
145 let state = match state {
146 UnindexedOrderState::Active(active) => OrderState::Active(active),
147 UnindexedOrderState::Inactive(inactive) => match inactive {
148 InactiveOrderState::OpenFailed(failed) => match failed {
149 OrderError::Rejected(rejected) => {
150 OrderState::inactive(OrderError::Rejected(self.api_error(rejected)?))
151 }
152 OrderError::Connectivity(error) => {
153 OrderState::inactive(OrderError::Connectivity(error))
154 }
155 },
156 InactiveOrderState::Cancelled(cancelled) => OrderState::inactive(cancelled),
157 InactiveOrderState::FullyFilled => OrderState::fully_filled(),
158 InactiveOrderState::Expired => OrderState::expired(),
159 },
160 };
161
162 Ok(Order {
163 key,
164 side,
165 price,
166 quantity,
167 kind,
168 time_in_force,
169 state,
170 })
171 }
172
173 pub fn order_response_cancel(
174 &self,
175 response: OrderResponseCancel<ExchangeId, AssetNameExchange, InstrumentNameExchange>,
176 ) -> Result<OrderResponseCancel, IndexError> {
177 let OrderResponseCancel { key, state } = response;
178
179 Ok(OrderResponseCancel {
180 key: self.order_key(key)?,
181 state: match state {
182 Ok(cancelled) => Ok(cancelled),
183 Err(error) => Err(self.order_error(error)?),
184 },
185 })
186 }
187
188 pub fn order_key(&self, key: UnindexedOrderKey) -> Result<OrderKey, IndexError> {
189 let UnindexedOrderKey {
190 exchange,
191 instrument,
192 strategy,
193 cid,
194 } = key;
195
196 Ok(OrderKey {
197 exchange: self.map.find_exchange_index(exchange)?,
198 instrument: self.map.find_instrument_index(&instrument)?,
199 strategy,
200 cid,
201 })
202 }
203
204 pub fn api_error(&self, error: UnindexedApiError) -> Result<ApiError, IndexError> {
205 Ok(match error {
206 UnindexedApiError::RateLimit => ApiError::RateLimit,
207 UnindexedApiError::AssetInvalid(asset, value) => {
208 ApiError::AssetInvalid(self.map.find_asset_index(&asset)?, value)
209 }
210 UnindexedApiError::InstrumentInvalid(instrument, value) => {
211 ApiError::InstrumentInvalid(self.map.find_instrument_index(&instrument)?, value)
212 }
213 UnindexedApiError::BalanceInsufficient(asset, value) => {
214 ApiError::BalanceInsufficient(self.map.find_asset_index(&asset)?, value)
215 }
216 UnindexedApiError::OrderRejected(reason) => ApiError::OrderRejected(reason),
217 UnindexedApiError::OrderAlreadyCancelled => ApiError::OrderAlreadyCancelled,
218 UnindexedApiError::OrderAlreadyFullyFilled => ApiError::OrderAlreadyFullyFilled,
219 })
220 }
221
222 pub fn order_request<Kind>(
223 &self,
224 order: &OrderEvent<Kind, ExchangeIndex, InstrumentIndex>,
225 ) -> Result<OrderEvent<Kind, ExchangeId, &InstrumentNameExchange>, KeyError>
226 where
227 Kind: Clone,
228 {
229 let OrderEvent {
230 key:
231 OrderKey {
232 exchange,
233 instrument,
234 strategy,
235 cid,
236 },
237 state,
238 } = order;
239
240 let exchange = self.map.find_exchange_id(*exchange)?;
241 let instrument = self.map.find_instrument_name_exchange(*instrument)?;
242
243 Ok(OrderEvent {
244 key: OrderKey {
245 exchange,
246 instrument,
247 strategy: strategy.clone(),
248 cid: cid.clone(),
249 },
250 state: state.clone(),
251 })
252 }
253
254 pub fn order_error(&self, error: UnindexedOrderError) -> Result<OrderError, IndexError> {
255 Ok(match error {
256 UnindexedOrderError::Connectivity(error) => OrderError::Connectivity(error),
257 UnindexedOrderError::Rejected(error) => OrderError::Rejected(self.api_error(error)?),
258 })
259 }
260
261 pub fn client_error(&self, error: UnindexedClientError) -> Result<ClientError, IndexError> {
262 Ok(match error {
263 UnindexedClientError::Connectivity(error) => ClientError::Connectivity(error),
264 UnindexedClientError::Api(error) => ClientError::Api(self.api_error(error)?),
265 UnindexedClientError::AccountSnapshot(value) => ClientError::AccountSnapshot(value),
266 UnindexedClientError::AccountStream(value) => ClientError::AccountStream(value),
267 })
268 }
269
270 pub fn trade(
271 &self,
272 trade: Trade<QuoteAsset, InstrumentNameExchange>,
273 ) -> Result<Trade<QuoteAsset, InstrumentIndex>, IndexError> {
274 let Trade {
275 id,
276 order_id,
277 instrument,
278 strategy,
279 time_exchange,
280 side,
281 price,
282 quantity,
283 fees,
284 } = trade;
285
286 let instrument_index = self.map.find_instrument_index(&instrument)?;
287
288 Ok(Trade {
289 id,
290 order_id,
291 instrument: instrument_index,
292 strategy,
293 time_exchange,
294 side,
295 price,
296 quantity,
297 fees,
298 })
299 }
300}