barter_data/transformer/
stateless.rs1use super::ExchangeTransformer;
2use crate::{
3 Identifier,
4 error::DataError,
5 event::{MarketEvent, MarketIter},
6 exchange::Connector,
7 subscription::{Map, SubscriptionKind},
8};
9use async_trait::async_trait;
10use barter_instrument::exchange::ExchangeId;
11use barter_integration::{
12 Transformer, protocol::websocket::WsMessage, subscription::SubscriptionId,
13};
14use serde::Deserialize;
15use std::marker::PhantomData;
16use tokio::sync::mpsc;
17
18#[derive(Clone, Eq, PartialEq, Debug)]
23pub struct StatelessTransformer<Exchange, InstrumentKey, Kind, Input> {
24 instrument_map: Map<InstrumentKey>,
25 phantom: PhantomData<(Exchange, Kind, Input)>,
26}
27
28#[async_trait]
29impl<Exchange, InstrumentKey, Kind, Input> ExchangeTransformer<Exchange, InstrumentKey, Kind>
30 for StatelessTransformer<Exchange, InstrumentKey, Kind, Input>
31where
32 Exchange: Connector + Send,
33 InstrumentKey: Clone + Send,
34 Kind: SubscriptionKind + Send,
35 Input: Identifier<Option<SubscriptionId>> + for<'de> Deserialize<'de>,
36 MarketIter<InstrumentKey, Kind::Event>: From<(ExchangeId, InstrumentKey, Input)>,
37{
38 async fn init(
39 instrument_map: Map<InstrumentKey>,
40 _: &[MarketEvent<InstrumentKey, Kind::Event>],
41 _: mpsc::UnboundedSender<WsMessage>,
42 ) -> Result<Self, DataError> {
43 Ok(Self {
44 instrument_map,
45 phantom: PhantomData,
46 })
47 }
48}
49
50impl<Exchange, InstrumentKey, Kind, Input> Transformer
51 for StatelessTransformer<Exchange, InstrumentKey, Kind, Input>
52where
53 Exchange: Connector,
54 InstrumentKey: Clone,
55 Kind: SubscriptionKind,
56 Input: Identifier<Option<SubscriptionId>> + for<'de> Deserialize<'de>,
57 MarketIter<InstrumentKey, Kind::Event>: From<(ExchangeId, InstrumentKey, Input)>,
58{
59 type Error = DataError;
60 type Input = Input;
61 type Output = MarketEvent<InstrumentKey, Kind::Event>;
62 type OutputIter = Vec<Result<Self::Output, Self::Error>>;
63
64 fn transform(&mut self, input: Self::Input) -> Self::OutputIter {
65 let subscription_id = match input.id() {
67 Some(subscription_id) => subscription_id,
68 None => return vec![],
69 };
70
71 match self.instrument_map.find(&subscription_id) {
73 Ok(instrument) => {
74 MarketIter::<InstrumentKey, Kind::Event>::from((
75 Exchange::ID,
76 instrument.clone(),
77 input,
78 ))
79 .0
80 }
81 Err(unidentifiable) => vec![Err(DataError::from(unidentifiable))],
82 }
83 }
84}