Skip to main content

rustrade_data/transformer/
stateless.rs

1use super::ExchangeTransformer;
2use crate::{
3    Identifier,
4    error::DataError,
5    event::{MarketEvent, MarketIter},
6    exchange::Connector,
7    subscription::{Map, SubscriptionKind},
8};
9use rustrade_instrument::exchange::ExchangeId;
10use rustrade_integration::{
11    Transformer, protocol::websocket::WsMessage, subscription::SubscriptionId,
12};
13use serde::Deserialize;
14use std::marker::PhantomData;
15use tokio::sync::mpsc;
16
17/// Standard generic stateless [`ExchangeTransformer`] to translate exchange specific types into
18/// normalised Barter types. Often used with
19/// [`PublicTrades`](crate::subscription::trade::PublicTrades) or
20/// [`OrderBooksL1`](crate::subscription::book::OrderBooksL1) streams.
21#[derive(Clone, Eq, PartialEq, Debug)]
22pub struct StatelessTransformer<Exchange, InstrumentKey, Kind, Input> {
23    instrument_map: Map<InstrumentKey>,
24    phantom: PhantomData<(Exchange, Kind, Input)>,
25}
26
27impl<Exchange, InstrumentKey, Kind, Input> ExchangeTransformer<Exchange, InstrumentKey, Kind>
28    for StatelessTransformer<Exchange, InstrumentKey, Kind, Input>
29where
30    Exchange: Connector + Send,
31    InstrumentKey: Clone + Send + Sync,
32    Kind: SubscriptionKind + Send,
33    Kind::Event: Sync,
34    Input: Identifier<Option<SubscriptionId>> + for<'de> Deserialize<'de>,
35    MarketIter<InstrumentKey, Kind::Event>: From<(ExchangeId, InstrumentKey, Input)>,
36{
37    async fn init(
38        instrument_map: Map<InstrumentKey>,
39        _: &[MarketEvent<InstrumentKey, Kind::Event>],
40        _: mpsc::UnboundedSender<WsMessage>,
41    ) -> Result<Self, DataError> {
42        Ok(Self {
43            instrument_map,
44            phantom: PhantomData,
45        })
46    }
47}
48
49impl<Exchange, InstrumentKey, Kind, Input> Transformer
50    for StatelessTransformer<Exchange, InstrumentKey, Kind, Input>
51where
52    Exchange: Connector,
53    InstrumentKey: Clone,
54    Kind: SubscriptionKind,
55    Input: Identifier<Option<SubscriptionId>> + for<'de> Deserialize<'de>,
56    MarketIter<InstrumentKey, Kind::Event>: From<(ExchangeId, InstrumentKey, Input)>,
57{
58    type Error = DataError;
59    type Input = Input;
60    type Output = MarketEvent<InstrumentKey, Kind::Event>;
61    type OutputIter = Vec<Result<Self::Output, Self::Error>>;
62
63    fn transform(&mut self, input: Self::Input) -> Self::OutputIter {
64        // Determine if the message has an identifiable SubscriptionId
65        let subscription_id = match input.id() {
66            Some(subscription_id) => subscription_id,
67            None => return vec![],
68        };
69
70        // Find Instrument associated with Input and transform
71        match self.instrument_map.find(&subscription_id) {
72            Ok(instrument) => {
73                MarketIter::<InstrumentKey, Kind::Event>::from((
74                    Exchange::ID,
75                    instrument.clone(),
76                    input,
77                ))
78                .0
79            }
80            Err(unidentifiable) => vec![Err(DataError::from(unidentifiable))],
81        }
82    }
83}