rustrade_data/transformer/
stateless.rs1use super::ExchangeTransformer;
2use crate::{
3 Identifier,
4 error::DataError,
5 event::{MarketEvent, MarketIter},
6 exchange::Connector,
7 subscription::{Map, SubscriptionKind},
8};
9use rustrade_instrument::exchange::ExchangeId;
10use rustrade_integration::{
11 Transformer, protocol::websocket::WsMessage, subscription::SubscriptionId,
12};
13use serde::Deserialize;
14use std::marker::PhantomData;
15use tokio::sync::mpsc;
16
17#[derive(Clone, Eq, PartialEq, Debug)]
22pub struct StatelessTransformer<Exchange, InstrumentKey, Kind, Input> {
23 instrument_map: Map<InstrumentKey>,
24 phantom: PhantomData<(Exchange, Kind, Input)>,
25}
26
27impl<Exchange, InstrumentKey, Kind, Input> ExchangeTransformer<Exchange, InstrumentKey, Kind>
28 for StatelessTransformer<Exchange, InstrumentKey, Kind, Input>
29where
30 Exchange: Connector + Send,
31 InstrumentKey: Clone + Send + Sync,
32 Kind: SubscriptionKind + Send,
33 Kind::Event: Sync,
34 Input: Identifier<Option<SubscriptionId>> + for<'de> Deserialize<'de>,
35 MarketIter<InstrumentKey, Kind::Event>: From<(ExchangeId, InstrumentKey, Input)>,
36{
37 async fn init(
38 instrument_map: Map<InstrumentKey>,
39 _: &[MarketEvent<InstrumentKey, Kind::Event>],
40 _: mpsc::UnboundedSender<WsMessage>,
41 ) -> Result<Self, DataError> {
42 Ok(Self {
43 instrument_map,
44 phantom: PhantomData,
45 })
46 }
47}
48
49impl<Exchange, InstrumentKey, Kind, Input> Transformer
50 for StatelessTransformer<Exchange, InstrumentKey, Kind, Input>
51where
52 Exchange: Connector,
53 InstrumentKey: Clone,
54 Kind: SubscriptionKind,
55 Input: Identifier<Option<SubscriptionId>> + for<'de> Deserialize<'de>,
56 MarketIter<InstrumentKey, Kind::Event>: From<(ExchangeId, InstrumentKey, Input)>,
57{
58 type Error = DataError;
59 type Input = Input;
60 type Output = MarketEvent<InstrumentKey, Kind::Event>;
61 type OutputIter = Vec<Result<Self::Output, Self::Error>>;
62
63 fn transform(&mut self, input: Self::Input) -> Self::OutputIter {
64 let subscription_id = match input.id() {
66 Some(subscription_id) => subscription_id,
67 None => return vec![],
68 };
69
70 match self.instrument_map.find(&subscription_id) {
72 Ok(instrument) => {
73 MarketIter::<InstrumentKey, Kind::Event>::from((
74 Exchange::ID,
75 instrument.clone(),
76 input,
77 ))
78 .0
79 }
80 Err(unidentifiable) => vec![Err(DataError::from(unidentifiable))],
81 }
82 }
83}