1use crate::{
2 price::Price,
3 rate::Rate,
4 time::{DurationUs, TimestampUs},
5 ChannelId, PriceFeedId, PriceFeedProperty,
6};
7use {
8 anyhow::bail,
9 byteorder::{ByteOrder, ReadBytesExt, WriteBytesExt, BE, LE},
10 serde::{Deserialize, Serialize},
11 std::{
12 io::{Cursor, Read, Write},
13 num::NonZeroI64,
14 },
15};
16
/// In-memory form of one binary payload: a timestamp, a channel and the
/// per-feed property values carried for that update.
///
/// Converted to and from bytes by [`PayloadData::serialize`] and
/// [`PayloadData::deserialize`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PayloadData {
    /// Timestamp of the payload; written to the wire as a `u64` microsecond count.
    pub timestamp_us: TimestampUs,
    /// Channel the payload belongs to; written to the wire as a single byte.
    pub channel_id: ChannelId,
    /// Per-feed data, in wire order. The feed count is encoded as a `u8`, so
    /// serialization fails when this holds more than 255 entries.
    pub feeds: Vec<PayloadFeedData>,
}
25
/// Property values carried for a single price feed inside a [`PayloadData`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PayloadFeedData {
    /// Identifier of the feed; written to the wire as a `u32`.
    pub feed_id: PriceFeedId,
    /// Values for this feed, in wire order. The property count is encoded as a
    /// `u8`, so serialization fails when this holds more than 255 entries.
    pub properties: Vec<PayloadPropertyValue>,
}
32
/// A single property value of a feed, tagged with the property it belongs to.
///
/// Each variant mirrors the [`PriceFeedProperty`] of the same name; on the wire
/// a value is preceded by that property's discriminant as one byte.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum PayloadPropertyValue {
    /// Value of the `Price` property; `None` is encoded as a zero `i64`.
    Price(Option<Price>),
    /// Value of the `BestBidPrice` property; `None` is encoded as a zero `i64`.
    BestBidPrice(Option<Price>),
    /// Value of the `BestAskPrice` property; `None` is encoded as a zero `i64`.
    BestAskPrice(Option<Price>),
    /// Value of the `PublisherCount` property; encoded as a `u16`.
    PublisherCount(u16),
    /// Value of the `Exponent` property; encoded as an `i16`.
    Exponent(i16),
    /// Value of the `Confidence` property; `None` is encoded as a zero `i64`.
    Confidence(Option<Price>),
    /// Value of the `FundingRate` property; encoded as a presence byte followed
    /// by an `i64` mantissa when present.
    FundingRate(Option<Rate>),
    /// Value of the `FundingTimestamp` property; encoded as a presence byte
    /// followed by a `u64` microsecond count when present.
    FundingTimestamp(Option<TimestampUs>),
    /// Value of the `FundingRateInterval` property; encoded as a presence byte
    /// followed by a `u64` microsecond count when present.
    FundingRateInterval(Option<DurationUs>),
}
45
/// Full set of property values known for one feed.
///
/// [`PayloadData::new`] reads from this struct to pick out only the properties
/// that were requested.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct AggregatedPriceFeedData {
    /// Source of `PayloadPropertyValue::Price`.
    pub price: Option<Price>,
    /// Source of `PayloadPropertyValue::BestBidPrice`.
    pub best_bid_price: Option<Price>,
    /// Source of `PayloadPropertyValue::BestAskPrice`.
    pub best_ask_price: Option<Price>,
    /// Source of `PayloadPropertyValue::PublisherCount`.
    pub publisher_count: u16,
    /// Source of `PayloadPropertyValue::Exponent`.
    // NOTE(review): presumably the power-of-ten exponent applied to the price
    // mantissas — confirm against the `Price` type.
    pub exponent: i16,
    /// Source of `PayloadPropertyValue::Confidence`.
    pub confidence: Option<Price>,
    /// Source of `PayloadPropertyValue::FundingRate`.
    pub funding_rate: Option<Rate>,
    /// Source of `PayloadPropertyValue::FundingTimestamp`.
    pub funding_timestamp: Option<TimestampUs>,
    /// Source of `PayloadPropertyValue::FundingRateInterval`.
    pub funding_rate_interval: Option<DurationUs>,
}
58
59impl AggregatedPriceFeedData {
60 pub fn empty(exponent: i16) -> Self {
61 Self {
62 price: None,
63 best_bid_price: None,
64 best_ask_price: None,
65 publisher_count: 0,
66 exponent,
67 confidence: None,
68 funding_rate: None,
69 funding_timestamp: None,
70 funding_rate_interval: None,
71 }
72 }
73}
74
/// Magic number that opens every serialized payload.
///
/// `PayloadData::serialize` writes it as the first `u32`, and
/// `PayloadData::deserialize` rejects input whose first `u32` differs.
/// Decimal value: 2479346549.
pub const PAYLOAD_FORMAT_MAGIC: u32 = 0x93C7_D375;
78
79impl PayloadData {
80 pub fn new(
81 timestamp_us: TimestampUs,
82 channel_id: ChannelId,
83 feeds: &[(PriceFeedId, AggregatedPriceFeedData)],
84 requested_properties: &[PriceFeedProperty],
85 ) -> Self {
86 Self {
87 timestamp_us,
88 channel_id,
89 feeds: feeds
90 .iter()
91 .map(|(feed_id, feed)| PayloadFeedData {
92 feed_id: *feed_id,
93 properties: requested_properties
94 .iter()
95 .map(|property| match property {
96 PriceFeedProperty::Price => PayloadPropertyValue::Price(feed.price),
97 PriceFeedProperty::BestBidPrice => {
98 PayloadPropertyValue::BestBidPrice(feed.best_bid_price)
99 }
100 PriceFeedProperty::BestAskPrice => {
101 PayloadPropertyValue::BestAskPrice(feed.best_ask_price)
102 }
103 PriceFeedProperty::PublisherCount => {
104 PayloadPropertyValue::PublisherCount(feed.publisher_count)
105 }
106 PriceFeedProperty::Exponent => {
107 PayloadPropertyValue::Exponent(feed.exponent)
108 }
109 PriceFeedProperty::Confidence => {
110 PayloadPropertyValue::Confidence(feed.confidence)
111 }
112 PriceFeedProperty::FundingRate => {
113 PayloadPropertyValue::FundingRate(feed.funding_rate)
114 }
115 PriceFeedProperty::FundingTimestamp => {
116 PayloadPropertyValue::FundingTimestamp(feed.funding_timestamp)
117 }
118 PriceFeedProperty::FundingRateInterval => {
119 PayloadPropertyValue::FundingRateInterval(
120 feed.funding_rate_interval,
121 )
122 }
123 })
124 .collect(),
125 })
126 .collect(),
127 }
128 }
129
130 pub fn serialize<BO: ByteOrder>(&self, mut writer: impl Write) -> anyhow::Result<()> {
131 writer.write_u32::<BO>(PAYLOAD_FORMAT_MAGIC)?;
132 writer.write_u64::<BO>(self.timestamp_us.as_micros())?;
133 writer.write_u8(self.channel_id.0)?;
134 writer.write_u8(self.feeds.len().try_into()?)?;
135 for feed in &self.feeds {
136 writer.write_u32::<BO>(feed.feed_id.0)?;
137 writer.write_u8(feed.properties.len().try_into()?)?;
138 for property in &feed.properties {
139 match property {
140 PayloadPropertyValue::Price(price) => {
141 writer.write_u8(PriceFeedProperty::Price as u8)?;
142 write_option_price::<BO>(&mut writer, *price)?;
143 }
144 PayloadPropertyValue::BestBidPrice(price) => {
145 writer.write_u8(PriceFeedProperty::BestBidPrice as u8)?;
146 write_option_price::<BO>(&mut writer, *price)?;
147 }
148 PayloadPropertyValue::BestAskPrice(price) => {
149 writer.write_u8(PriceFeedProperty::BestAskPrice as u8)?;
150 write_option_price::<BO>(&mut writer, *price)?;
151 }
152 PayloadPropertyValue::PublisherCount(count) => {
153 writer.write_u8(PriceFeedProperty::PublisherCount as u8)?;
154 writer.write_u16::<BO>(*count)?;
155 }
156 PayloadPropertyValue::Exponent(exponent) => {
157 writer.write_u8(PriceFeedProperty::Exponent as u8)?;
158 writer.write_i16::<BO>(*exponent)?;
159 }
160 PayloadPropertyValue::Confidence(confidence) => {
161 writer.write_u8(PriceFeedProperty::Confidence as u8)?;
162 write_option_price::<BO>(&mut writer, *confidence)?;
163 }
164 PayloadPropertyValue::FundingRate(rate) => {
165 writer.write_u8(PriceFeedProperty::FundingRate as u8)?;
166 write_option_rate::<BO>(&mut writer, *rate)?;
167 }
168 PayloadPropertyValue::FundingTimestamp(timestamp) => {
169 writer.write_u8(PriceFeedProperty::FundingTimestamp as u8)?;
170 write_option_timestamp::<BO>(&mut writer, *timestamp)?;
171 }
172 &PayloadPropertyValue::FundingRateInterval(interval) => {
173 writer.write_u8(PriceFeedProperty::FundingRateInterval as u8)?;
174 write_option_duration::<BO>(&mut writer, interval)?;
175 }
176 }
177 }
178 }
179 Ok(())
180 }
181
182 pub fn deserialize_slice_le(data: &[u8]) -> anyhow::Result<Self> {
183 Self::deserialize::<LE>(Cursor::new(data))
184 }
185
186 pub fn deserialize_slice_be(data: &[u8]) -> anyhow::Result<Self> {
187 Self::deserialize::<BE>(Cursor::new(data))
188 }
189
190 pub fn deserialize<BO: ByteOrder>(mut reader: impl Read) -> anyhow::Result<Self> {
191 let magic = reader.read_u32::<BO>()?;
192 if magic != PAYLOAD_FORMAT_MAGIC {
193 bail!("magic mismatch");
194 }
195 let timestamp_us = TimestampUs::from_micros(reader.read_u64::<BO>()?);
196 let channel_id = ChannelId(reader.read_u8()?);
197 let num_feeds = reader.read_u8()?;
198 let mut feeds = Vec::with_capacity(num_feeds.into());
199 for _ in 0..num_feeds {
200 let feed_id = PriceFeedId(reader.read_u32::<BO>()?);
201 let num_properties = reader.read_u8()?;
202 let mut feed = PayloadFeedData {
203 feed_id,
204 properties: Vec::with_capacity(num_properties.into()),
205 };
206 for _ in 0..num_properties {
207 let property = reader.read_u8()?;
208 let value = if property == PriceFeedProperty::Price as u8 {
209 PayloadPropertyValue::Price(read_option_price::<BO>(&mut reader)?)
210 } else if property == PriceFeedProperty::BestBidPrice as u8 {
211 PayloadPropertyValue::BestBidPrice(read_option_price::<BO>(&mut reader)?)
212 } else if property == PriceFeedProperty::BestAskPrice as u8 {
213 PayloadPropertyValue::BestAskPrice(read_option_price::<BO>(&mut reader)?)
214 } else if property == PriceFeedProperty::PublisherCount as u8 {
215 PayloadPropertyValue::PublisherCount(reader.read_u16::<BO>()?)
216 } else if property == PriceFeedProperty::Exponent as u8 {
217 PayloadPropertyValue::Exponent(reader.read_i16::<BO>()?)
218 } else if property == PriceFeedProperty::Confidence as u8 {
219 PayloadPropertyValue::Confidence(read_option_price::<BO>(&mut reader)?)
220 } else if property == PriceFeedProperty::FundingRate as u8 {
221 PayloadPropertyValue::FundingRate(read_option_rate::<BO>(&mut reader)?)
222 } else if property == PriceFeedProperty::FundingTimestamp as u8 {
223 PayloadPropertyValue::FundingTimestamp(read_option_timestamp::<BO>(
224 &mut reader,
225 )?)
226 } else if property == PriceFeedProperty::FundingRateInterval as u8 {
227 PayloadPropertyValue::FundingRateInterval(read_option_interval::<BO>(
228 &mut reader,
229 )?)
230 } else {
231 bail!("unknown property");
232 };
233 feed.properties.push(value);
234 }
235 feeds.push(feed);
236 }
237 Ok(Self {
238 timestamp_us,
239 channel_id,
240 feeds,
241 })
242 }
243}
244
245fn write_option_price<BO: ByteOrder>(
246 mut writer: impl Write,
247 value: Option<Price>,
248) -> std::io::Result<()> {
249 writer.write_i64::<BO>(value.map_or(0, |v| v.mantissa_i64()))
250}
251
252fn read_option_price<BO: ByteOrder>(mut reader: impl Read) -> std::io::Result<Option<Price>> {
253 let value = NonZeroI64::new(reader.read_i64::<BO>()?);
254 Ok(value.map(Price::from_nonzero_mantissa))
255}
256
257fn write_option_rate<BO: ByteOrder>(
258 mut writer: impl Write,
259 value: Option<Rate>,
260) -> std::io::Result<()> {
261 match value {
262 Some(value) => {
263 writer.write_u8(1)?;
264 writer.write_i64::<BO>(value.mantissa())
265 }
266 None => {
267 writer.write_u8(0)?;
268 Ok(())
269 }
270 }
271}
272
273fn read_option_rate<BO: ByteOrder>(mut reader: impl Read) -> std::io::Result<Option<Rate>> {
274 let present = reader.read_u8()? != 0;
275 if present {
276 Ok(Some(Rate::from_mantissa(reader.read_i64::<BO>()?)))
277 } else {
278 Ok(None)
279 }
280}
281
282fn write_option_timestamp<BO: ByteOrder>(
283 mut writer: impl Write,
284 value: Option<TimestampUs>,
285) -> std::io::Result<()> {
286 match value {
287 Some(value) => {
288 writer.write_u8(1)?;
289 writer.write_u64::<BO>(value.as_micros())
290 }
291 None => {
292 writer.write_u8(0)?;
293 Ok(())
294 }
295 }
296}
297
298fn read_option_timestamp<BO: ByteOrder>(
299 mut reader: impl Read,
300) -> std::io::Result<Option<TimestampUs>> {
301 let present = reader.read_u8()? != 0;
302 if present {
303 Ok(Some(TimestampUs::from_micros(reader.read_u64::<BO>()?)))
304 } else {
305 Ok(None)
306 }
307}
308
309fn write_option_duration<BO: ByteOrder>(
310 mut writer: impl Write,
311 value: Option<DurationUs>,
312) -> std::io::Result<()> {
313 match value {
314 Some(value) => {
315 writer.write_u8(1)?;
316 writer.write_u64::<BO>(value.as_micros())
317 }
318 None => {
319 writer.write_u8(0)?;
320 Ok(())
321 }
322 }
323}
324
325fn read_option_interval<BO: ByteOrder>(
326 mut reader: impl Read,
327) -> std::io::Result<Option<DurationUs>> {
328 let present = reader.read_u8()? != 0;
329 if present {
330 Ok(Some(DurationUs::from_micros(reader.read_u64::<BO>()?)))
331 } else {
332 Ok(None)
333 }
334}