barter_data/transformer/
stateless.rs

1use super::ExchangeTransformer;
2use crate::{
3    Identifier,
4    error::DataError,
5    event::{MarketEvent, MarketIter},
6    exchange::Connector,
7    subscription::{Map, SubscriptionKind},
8};
9use async_trait::async_trait;
10use barter_instrument::exchange::ExchangeId;
11use barter_integration::{
12    Transformer, protocol::websocket::WsMessage, subscription::SubscriptionId,
13};
14use serde::Deserialize;
15use std::marker::PhantomData;
16use tokio::sync::mpsc;
17
18/// Standard generic stateless [`ExchangeTransformer`] to translate execution specific types into
19/// normalised Barter types. Often used with
20/// [`PublicTrades`](crate::subscription::trade::PublicTrades) or
21/// [`OrderBooksL1`](crate::subscription::book::OrderBooksL1) streams.
22#[derive(Clone, Eq, PartialEq, Debug)]
23pub struct StatelessTransformer<Exchange, InstrumentKey, Kind, Input> {
24    instrument_map: Map<InstrumentKey>,
25    phantom: PhantomData<(Exchange, Kind, Input)>,
26}
27
28#[async_trait]
29impl<Exchange, InstrumentKey, Kind, Input> ExchangeTransformer<Exchange, InstrumentKey, Kind>
30    for StatelessTransformer<Exchange, InstrumentKey, Kind, Input>
31where
32    Exchange: Connector + Send,
33    InstrumentKey: Clone + Send,
34    Kind: SubscriptionKind + Send,
35    Input: Identifier<Option<SubscriptionId>> + for<'de> Deserialize<'de>,
36    MarketIter<InstrumentKey, Kind::Event>: From<(ExchangeId, InstrumentKey, Input)>,
37{
38    async fn init(
39        instrument_map: Map<InstrumentKey>,
40        _: &[MarketEvent<InstrumentKey, Kind::Event>],
41        _: mpsc::UnboundedSender<WsMessage>,
42    ) -> Result<Self, DataError> {
43        Ok(Self {
44            instrument_map,
45            phantom: PhantomData,
46        })
47    }
48}
49
50impl<Exchange, InstrumentKey, Kind, Input> Transformer
51    for StatelessTransformer<Exchange, InstrumentKey, Kind, Input>
52where
53    Exchange: Connector,
54    InstrumentKey: Clone,
55    Kind: SubscriptionKind,
56    Input: Identifier<Option<SubscriptionId>> + for<'de> Deserialize<'de>,
57    MarketIter<InstrumentKey, Kind::Event>: From<(ExchangeId, InstrumentKey, Input)>,
58{
59    type Error = DataError;
60    type Input = Input;
61    type Output = MarketEvent<InstrumentKey, Kind::Event>;
62    type OutputIter = Vec<Result<Self::Output, Self::Error>>;
63
64    fn transform(&mut self, input: Self::Input) -> Self::OutputIter {
65        // Determine if the message has an identifiable SubscriptionId
66        let subscription_id = match input.id() {
67            Some(subscription_id) => subscription_id,
68            None => return vec![],
69        };
70
71        // Find Instrument associated with Input and transform
72        match self.instrument_map.find(&subscription_id) {
73            Ok(instrument) => {
74                MarketIter::<InstrumentKey, Kind::Event>::from((
75                    Exchange::ID,
76                    instrument.clone(),
77                    input,
78                ))
79                .0
80            }
81            Err(unidentifiable) => vec![Err(DataError::from(unidentifiable))],
82        }
83    }
84}