flowsurface_data/
stream.rs1use exchange::{PushFrequency, Ticker, TickerInfo, Timeframe, adapter::StreamKind};
2use serde::{Deserialize, Serialize};
3
4#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
5pub enum PersistStreamKind {
6 Kline {
7 ticker: Ticker,
8 timeframe: Timeframe,
9 },
10 Depth(PersistDepth),
11 Trades {
12 ticker: Ticker,
13 },
14 DepthAndTrades(PersistDepth),
17}
18
19#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
20pub struct PersistDepth {
21 pub ticker: Ticker,
22 #[serde(default = "default_depth_aggr")]
23 pub depth_aggr: exchange::adapter::StreamTicksize,
24 #[serde(default = "default_push_freq")]
25 pub push_freq: PushFrequency,
26}
27
28impl From<StreamKind> for PersistStreamKind {
29 fn from(stream: StreamKind) -> Self {
30 match stream {
31 StreamKind::Kline {
32 ticker_info,
33 timeframe,
34 } => PersistStreamKind::Kline {
35 ticker: ticker_info.ticker,
36 timeframe,
37 },
38 StreamKind::Depth {
39 ticker_info,
40 depth_aggr,
41 push_freq,
42 } => PersistStreamKind::Depth(PersistDepth {
43 ticker: ticker_info.ticker,
44 depth_aggr,
45 push_freq,
46 }),
47 StreamKind::Trades { ticker_info } => PersistStreamKind::Trades {
48 ticker: ticker_info.ticker,
49 },
50 }
51 }
52}
53
54impl PersistStreamKind {
55 pub fn into_stream_kinds<F>(self, mut resolver: F) -> Result<Vec<StreamKind>, String>
58 where
59 F: FnMut(&Ticker) -> Option<TickerInfo>,
60 {
61 match self {
62 PersistStreamKind::Kline { ticker, timeframe } => resolver(&ticker)
63 .map(|ti| {
64 vec![StreamKind::Kline {
65 ticker_info: ti,
66 timeframe,
67 }]
68 })
69 .ok_or_else(|| format!("TickerInfo not found for {}", ticker)),
70 PersistStreamKind::Depth(d) => resolver(&d.ticker)
71 .map(|ti| {
72 vec![StreamKind::Depth {
73 ticker_info: ti,
74 depth_aggr: d.depth_aggr,
75 push_freq: d.push_freq,
76 }]
77 })
78 .ok_or_else(|| format!("TickerInfo not found for {}", d.ticker)),
79 PersistStreamKind::Trades { ticker } => resolver(&ticker)
80 .map(|ti| vec![StreamKind::Trades { ticker_info: ti }])
81 .ok_or_else(|| format!("TickerInfo not found for {}", ticker)),
82 PersistStreamKind::DepthAndTrades(d) => resolver(&d.ticker)
83 .map(|ti| {
84 vec![
85 StreamKind::Depth {
86 ticker_info: ti,
87 depth_aggr: d.depth_aggr,
88 push_freq: d.push_freq,
89 },
90 StreamKind::Trades { ticker_info: ti },
91 ]
92 })
93 .ok_or_else(|| format!("TickerInfo not found for {}", d.ticker)),
94 }
95 }
96}
97
98fn default_depth_aggr() -> exchange::adapter::StreamTicksize {
99 exchange::adapter::StreamTicksize::Client
100}
101
102fn default_push_freq() -> PushFrequency {
103 PushFrequency::ServerDefault
104}