wmjtyd_libstock/data/
kline.rs1use std::io::{BufRead, BufReader};
4
5use crypto_msg_parser::KlineMsg;
6use rust_decimal::prelude::ToPrimitive;
7
8use super::{
9 fields::{
10 ExchangeTimestampRepr, ExchangeTypeRepr, MarketTypeRepr, MessageTypeRepr, PeriodRepr,
11 ReadExt, ReceivedTimestampRepr, StructureError, SymbolPairRepr,
12 },
13 hex::{HexDataError, NumToBytesExt},
14};
15
16const KLINE_INDICATOR_SIZE: [usize; 5] = [5, 5, 5, 5, 10];
20
21fn get_kline_indi_array(kline: &KlineMsg) -> [f64; 5] {
25 [kline.open, kline.high, kline.low, kline.close, kline.volume]
26}
27
28pub fn encode_kline(kline: &KlineMsg) -> KlineResult<Vec<u8>> {
30 let mut bytes = Vec::<u8>::with_capacity(47);
32
33 bytes.extend_from_slice(&ExchangeTimestampRepr(kline.timestamp).to_bytes());
35
36 bytes.extend_from_slice(&ReceivedTimestampRepr::try_new_from_now()?.to_bytes());
38
39 bytes.extend_from_slice(&ExchangeTypeRepr::try_from_str(&kline.exchange)?.to_bytes());
41
42 bytes.extend_from_slice(&MarketTypeRepr(kline.market_type).to_bytes());
44
45 bytes.extend_from_slice(&MessageTypeRepr(kline.msg_type).to_bytes());
47
48 bytes.extend_from_slice(&SymbolPairRepr::from_pair(&kline.pair).to_bytes());
50
51 bytes.extend_from_slice(&PeriodRepr(kline.period.as_str()).try_to_bytes()?);
53
54 for (idx, price) in get_kline_indi_array(kline).iter().enumerate() {
56 macro_rules! create_extend_branch {
58 ($ty:ty) => {
59 bytes.extend_from_slice(&<$ty>::encode_bytes(&price.to_string())?)
60 };
61 }
62
63 let size = KLINE_INDICATOR_SIZE[idx];
64
65 match size {
66 5 => create_extend_branch!(u32),
67 10 => create_extend_branch!(u64),
68 _ => unreachable!(),
69 };
70 }
71
72 Ok(bytes)
73}
74
75pub fn decode_kline(payload: &[u8]) -> KlineResult<KlineMsg> {
77 let mut reader = BufReader::new(payload);
78
79 let exchange_timestamp = ExchangeTimestampRepr::try_from_reader(&mut reader)?.0;
81
82 reader.consume(8);
84 let exchange_type = ExchangeTypeRepr::try_from_reader(&mut reader)?.0;
88
89 let market_type = MarketTypeRepr::try_from_reader(&mut reader)?.0;
91
92 let msg_type = MessageTypeRepr::try_from_reader(&mut reader)?.0;
94
95 let SymbolPairRepr(symbol, pair) = SymbolPairRepr::try_from_reader(&mut reader)?;
97
98 let period = PeriodRepr::try_from_reader(&mut reader)?.0;
100
101 let mut indicators = [0.0f64; 5];
103 for (idx, size) in KLINE_INDICATOR_SIZE.iter().enumerate() {
104 macro_rules! get_indicators_branch {
106 ($ty: ty) => {{
107 let raw = reader.read_exact_array()?;
108 let indicator = <$ty>::decode_bytes(&raw)
109 .to_f64()
110 .ok_or_else(|| KlineError::DecimalConvertF64Failed(raw.to_vec()))?;
111
112 indicator
113 }};
114 }
115
116 indicators[idx] = match *size {
117 5 => get_indicators_branch!(u32),
118 10 => get_indicators_branch!(u64),
119 _ => unreachable!(),
120 };
121 }
122 let [open, high, low, close, volume] = indicators;
123
124 Ok(KlineMsg {
125 exchange: exchange_type.to_string(),
126 market_type,
127 msg_type,
128 pair: pair.to_string(),
129 symbol: symbol.to_string(),
130 timestamp: exchange_timestamp as i64,
131 open,
132 high,
133 low,
134 close,
135 volume,
137 period: period.to_string(),
139 quote_volume: None,
141 json: String::new(),
142 })
143}
144
145#[derive(thiserror::Error, Debug)]
146pub enum KlineError {
147 #[error("data/hex error: {0}")]
148 HexDataError(#[from] HexDataError),
149
150 #[error("structure error: {0}")]
151 StructureError(#[from] StructureError),
152
153 #[error("failed to convert the following bytes to f64: {0:?}")]
154 DecimalConvertF64Failed(Vec<u8>),
155}
156
157pub type KlineResult<T> = Result<T, KlineError>;