// flowsurface_data/stream.rs

use exchange::{PushFrequency, Ticker, TickerInfo, Timeframe, adapter::StreamKind};
use serde::{Deserialize, Serialize};

4#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
5pub enum PersistStreamKind {
6    Kline {
7        ticker: Ticker,
8        timeframe: Timeframe,
9    },
10    Depth(PersistDepth),
11    Trades {
12        ticker: Ticker,
13    },
14    /// Deprecated combined stream, kept for backward compatibility.
15    /// Will be converted to separate Depth and Trades on load.
16    DepthAndTrades(PersistDepth),
17}
18
19#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
20pub struct PersistDepth {
21    pub ticker: Ticker,
22    #[serde(default = "default_depth_aggr")]
23    pub depth_aggr: exchange::adapter::StreamTicksize,
24    #[serde(default = "default_push_freq")]
25    pub push_freq: PushFrequency,
26}
27
28impl From<StreamKind> for PersistStreamKind {
29    fn from(stream: StreamKind) -> Self {
30        match stream {
31            StreamKind::Kline {
32                ticker_info,
33                timeframe,
34            } => PersistStreamKind::Kline {
35                ticker: ticker_info.ticker,
36                timeframe,
37            },
38            StreamKind::Depth {
39                ticker_info,
40                depth_aggr,
41                push_freq,
42            } => PersistStreamKind::Depth(PersistDepth {
43                ticker: ticker_info.ticker,
44                depth_aggr,
45                push_freq,
46            }),
47            StreamKind::Trades { ticker_info } => PersistStreamKind::Trades {
48                ticker: ticker_info.ticker,
49            },
50        }
51    }
52}
53
54impl PersistStreamKind {
55    /// Try to convert into runtime StreamKind list. `resolver` should return Some(TickerInfo) for a ticker,
56    /// otherwise the conversion fails (so caller can trigger a refresh / fetch).
57    pub fn into_stream_kinds<F>(self, mut resolver: F) -> Result<Vec<StreamKind>, String>
58    where
59        F: FnMut(&Ticker) -> Option<TickerInfo>,
60    {
61        match self {
62            PersistStreamKind::Kline { ticker, timeframe } => resolver(&ticker)
63                .map(|ti| {
64                    vec![StreamKind::Kline {
65                        ticker_info: ti,
66                        timeframe,
67                    }]
68                })
69                .ok_or_else(|| format!("TickerInfo not found for {}", ticker)),
70            PersistStreamKind::Depth(d) => resolver(&d.ticker)
71                .map(|ti| {
72                    vec![StreamKind::Depth {
73                        ticker_info: ti,
74                        depth_aggr: d.depth_aggr,
75                        push_freq: d.push_freq,
76                    }]
77                })
78                .ok_or_else(|| format!("TickerInfo not found for {}", d.ticker)),
79            PersistStreamKind::Trades { ticker } => resolver(&ticker)
80                .map(|ti| vec![StreamKind::Trades { ticker_info: ti }])
81                .ok_or_else(|| format!("TickerInfo not found for {}", ticker)),
82            PersistStreamKind::DepthAndTrades(d) => resolver(&d.ticker)
83                .map(|ti| {
84                    vec![
85                        StreamKind::Depth {
86                            ticker_info: ti,
87                            depth_aggr: d.depth_aggr,
88                            push_freq: d.push_freq,
89                        },
90                        StreamKind::Trades { ticker_info: ti },
91                    ]
92                })
93                .ok_or_else(|| format!("TickerInfo not found for {}", d.ticker)),
94        }
95    }
96}
97
98fn default_depth_aggr() -> exchange::adapter::StreamTicksize {
99    exchange::adapter::StreamTicksize::Client
100}
101
102fn default_push_freq() -> PushFrequency {
103    PushFrequency::ServerDefault
104}