1use crate::{
2 price::Price,
3 rate::Rate,
4 time::{DurationUs, TimestampUs},
5 ChannelId, PriceFeedId, PriceFeedProperty,
6};
7use {
8 anyhow::bail,
9 byteorder::{ByteOrder, ReadBytesExt, WriteBytesExt, BE, LE},
10 serde::{Deserialize, Serialize},
11 std::{
12 io::{Cursor, Read, Write},
13 num::NonZeroI64,
14 },
15};
16
17#[derive(Debug, Clone, PartialEq, Eq, Hash)]
19pub struct PayloadData {
20 pub timestamp_us: TimestampUs,
21 pub channel_id: ChannelId,
22 pub feeds: Vec<PayloadFeedData>,
24}
25
26#[derive(Debug, Clone, PartialEq, Eq, Hash)]
27pub struct PayloadFeedData {
28 pub feed_id: PriceFeedId,
29 pub properties: Vec<PayloadPropertyValue>,
31}
32
33#[derive(Debug, Clone, PartialEq, Eq, Hash)]
34pub enum PayloadPropertyValue {
35 Price(Option<Price>),
36 BestBidPrice(Option<Price>),
37 BestAskPrice(Option<Price>),
38 PublisherCount(u16),
39 Exponent(i16),
40 Confidence(Option<Price>),
41 FundingRate(Option<Rate>),
42 FundingTimestamp(Option<TimestampUs>),
43 FundingRateInterval(Option<DurationUs>),
44}
45
46#[derive(Debug, Clone, Default, PartialEq, Eq, Hash, Serialize, Deserialize)]
47pub struct AggregatedPriceFeedData {
48 pub price: Option<Price>,
49 pub best_bid_price: Option<Price>,
50 pub best_ask_price: Option<Price>,
51 pub publisher_count: u16,
52 pub exponent: i16,
53 pub confidence: Option<Price>,
54 pub funding_rate: Option<Rate>,
55 pub funding_timestamp: Option<TimestampUs>,
56 pub funding_rate_interval: Option<DurationUs>,
57}
58
/// Magic number that opens every serialized payload: `PayloadData::serialize`
/// writes it first and `PayloadData::deserialize` rejects input whose leading
/// `u32` differs from it.
pub const PAYLOAD_FORMAT_MAGIC: u32 = 2_479_346_549;
62
63impl PayloadData {
64 pub fn new(
65 timestamp_us: TimestampUs,
66 channel_id: ChannelId,
67 feeds: &[(PriceFeedId, AggregatedPriceFeedData)],
68 requested_properties: &[PriceFeedProperty],
69 ) -> Self {
70 Self {
71 timestamp_us,
72 channel_id,
73 feeds: feeds
74 .iter()
75 .map(|(feed_id, feed)| PayloadFeedData {
76 feed_id: *feed_id,
77 properties: requested_properties
78 .iter()
79 .map(|property| match property {
80 PriceFeedProperty::Price => PayloadPropertyValue::Price(feed.price),
81 PriceFeedProperty::BestBidPrice => {
82 PayloadPropertyValue::BestBidPrice(feed.best_bid_price)
83 }
84 PriceFeedProperty::BestAskPrice => {
85 PayloadPropertyValue::BestAskPrice(feed.best_ask_price)
86 }
87 PriceFeedProperty::PublisherCount => {
88 PayloadPropertyValue::PublisherCount(feed.publisher_count)
89 }
90 PriceFeedProperty::Exponent => {
91 PayloadPropertyValue::Exponent(feed.exponent)
92 }
93 PriceFeedProperty::Confidence => {
94 PayloadPropertyValue::Confidence(feed.confidence)
95 }
96 PriceFeedProperty::FundingRate => {
97 PayloadPropertyValue::FundingRate(feed.funding_rate)
98 }
99 PriceFeedProperty::FundingTimestamp => {
100 PayloadPropertyValue::FundingTimestamp(feed.funding_timestamp)
101 }
102 PriceFeedProperty::FundingRateInterval => {
103 PayloadPropertyValue::FundingRateInterval(
104 feed.funding_rate_interval,
105 )
106 }
107 })
108 .collect(),
109 })
110 .collect(),
111 }
112 }
113
114 pub fn serialize<BO: ByteOrder>(&self, mut writer: impl Write) -> anyhow::Result<()> {
115 writer.write_u32::<BO>(PAYLOAD_FORMAT_MAGIC)?;
116 writer.write_u64::<BO>(self.timestamp_us.as_micros())?;
117 writer.write_u8(self.channel_id.0)?;
118 writer.write_u8(self.feeds.len().try_into()?)?;
119 for feed in &self.feeds {
120 writer.write_u32::<BO>(feed.feed_id.0)?;
121 writer.write_u8(feed.properties.len().try_into()?)?;
122 for property in &feed.properties {
123 match property {
124 PayloadPropertyValue::Price(price) => {
125 writer.write_u8(PriceFeedProperty::Price as u8)?;
126 write_option_price::<BO>(&mut writer, *price)?;
127 }
128 PayloadPropertyValue::BestBidPrice(price) => {
129 writer.write_u8(PriceFeedProperty::BestBidPrice as u8)?;
130 write_option_price::<BO>(&mut writer, *price)?;
131 }
132 PayloadPropertyValue::BestAskPrice(price) => {
133 writer.write_u8(PriceFeedProperty::BestAskPrice as u8)?;
134 write_option_price::<BO>(&mut writer, *price)?;
135 }
136 PayloadPropertyValue::PublisherCount(count) => {
137 writer.write_u8(PriceFeedProperty::PublisherCount as u8)?;
138 writer.write_u16::<BO>(*count)?;
139 }
140 PayloadPropertyValue::Exponent(exponent) => {
141 writer.write_u8(PriceFeedProperty::Exponent as u8)?;
142 writer.write_i16::<BO>(*exponent)?;
143 }
144 PayloadPropertyValue::Confidence(confidence) => {
145 writer.write_u8(PriceFeedProperty::Confidence as u8)?;
146 write_option_price::<BO>(&mut writer, *confidence)?;
147 }
148 PayloadPropertyValue::FundingRate(rate) => {
149 writer.write_u8(PriceFeedProperty::FundingRate as u8)?;
150 write_option_rate::<BO>(&mut writer, *rate)?;
151 }
152 PayloadPropertyValue::FundingTimestamp(timestamp) => {
153 writer.write_u8(PriceFeedProperty::FundingTimestamp as u8)?;
154 write_option_timestamp::<BO>(&mut writer, *timestamp)?;
155 }
156 &PayloadPropertyValue::FundingRateInterval(interval) => {
157 writer.write_u8(PriceFeedProperty::FundingRateInterval as u8)?;
158 write_option_duration::<BO>(&mut writer, interval)?;
159 }
160 }
161 }
162 }
163 Ok(())
164 }
165
166 pub fn deserialize_slice_le(data: &[u8]) -> anyhow::Result<Self> {
167 Self::deserialize::<LE>(Cursor::new(data))
168 }
169
170 pub fn deserialize_slice_be(data: &[u8]) -> anyhow::Result<Self> {
171 Self::deserialize::<BE>(Cursor::new(data))
172 }
173
174 pub fn deserialize<BO: ByteOrder>(mut reader: impl Read) -> anyhow::Result<Self> {
175 let magic = reader.read_u32::<BO>()?;
176 if magic != PAYLOAD_FORMAT_MAGIC {
177 bail!("magic mismatch");
178 }
179 let timestamp_us = TimestampUs::from_micros(reader.read_u64::<BO>()?);
180 let channel_id = ChannelId(reader.read_u8()?);
181 let num_feeds = reader.read_u8()?;
182 let mut feeds = Vec::with_capacity(num_feeds.into());
183 for _ in 0..num_feeds {
184 let feed_id = PriceFeedId(reader.read_u32::<BO>()?);
185 let num_properties = reader.read_u8()?;
186 let mut feed = PayloadFeedData {
187 feed_id,
188 properties: Vec::with_capacity(num_properties.into()),
189 };
190 for _ in 0..num_properties {
191 let property = reader.read_u8()?;
192 let value = if property == PriceFeedProperty::Price as u8 {
193 PayloadPropertyValue::Price(read_option_price::<BO>(&mut reader)?)
194 } else if property == PriceFeedProperty::BestBidPrice as u8 {
195 PayloadPropertyValue::BestBidPrice(read_option_price::<BO>(&mut reader)?)
196 } else if property == PriceFeedProperty::BestAskPrice as u8 {
197 PayloadPropertyValue::BestAskPrice(read_option_price::<BO>(&mut reader)?)
198 } else if property == PriceFeedProperty::PublisherCount as u8 {
199 PayloadPropertyValue::PublisherCount(reader.read_u16::<BO>()?)
200 } else if property == PriceFeedProperty::Exponent as u8 {
201 PayloadPropertyValue::Exponent(reader.read_i16::<BO>()?)
202 } else if property == PriceFeedProperty::Confidence as u8 {
203 PayloadPropertyValue::Confidence(read_option_price::<BO>(&mut reader)?)
204 } else if property == PriceFeedProperty::FundingRate as u8 {
205 PayloadPropertyValue::FundingRate(read_option_rate::<BO>(&mut reader)?)
206 } else if property == PriceFeedProperty::FundingTimestamp as u8 {
207 PayloadPropertyValue::FundingTimestamp(read_option_timestamp::<BO>(
208 &mut reader,
209 )?)
210 } else if property == PriceFeedProperty::FundingRateInterval as u8 {
211 PayloadPropertyValue::FundingRateInterval(read_option_interval::<BO>(
212 &mut reader,
213 )?)
214 } else {
215 bail!("unknown property");
216 };
217 feed.properties.push(value);
218 }
219 feeds.push(feed);
220 }
221 Ok(Self {
222 timestamp_us,
223 channel_id,
224 feeds,
225 })
226 }
227}
228
229fn write_option_price<BO: ByteOrder>(
230 mut writer: impl Write,
231 value: Option<Price>,
232) -> std::io::Result<()> {
233 writer.write_i64::<BO>(value.map_or(0, |v| v.mantissa_i64()))
234}
235
236fn read_option_price<BO: ByteOrder>(mut reader: impl Read) -> std::io::Result<Option<Price>> {
237 let value = NonZeroI64::new(reader.read_i64::<BO>()?);
238 Ok(value.map(Price::from_nonzero_mantissa))
239}
240
241fn write_option_rate<BO: ByteOrder>(
242 mut writer: impl Write,
243 value: Option<Rate>,
244) -> std::io::Result<()> {
245 match value {
246 Some(value) => {
247 writer.write_u8(1)?;
248 writer.write_i64::<BO>(value.mantissa())
249 }
250 None => {
251 writer.write_u8(0)?;
252 Ok(())
253 }
254 }
255}
256
257fn read_option_rate<BO: ByteOrder>(mut reader: impl Read) -> std::io::Result<Option<Rate>> {
258 let present = reader.read_u8()? != 0;
259 if present {
260 Ok(Some(Rate::from_mantissa(reader.read_i64::<BO>()?)))
261 } else {
262 Ok(None)
263 }
264}
265
266fn write_option_timestamp<BO: ByteOrder>(
267 mut writer: impl Write,
268 value: Option<TimestampUs>,
269) -> std::io::Result<()> {
270 match value {
271 Some(value) => {
272 writer.write_u8(1)?;
273 writer.write_u64::<BO>(value.as_micros())
274 }
275 None => {
276 writer.write_u8(0)?;
277 Ok(())
278 }
279 }
280}
281
282fn read_option_timestamp<BO: ByteOrder>(
283 mut reader: impl Read,
284) -> std::io::Result<Option<TimestampUs>> {
285 let present = reader.read_u8()? != 0;
286 if present {
287 Ok(Some(TimestampUs::from_micros(reader.read_u64::<BO>()?)))
288 } else {
289 Ok(None)
290 }
291}
292
293fn write_option_duration<BO: ByteOrder>(
294 mut writer: impl Write,
295 value: Option<DurationUs>,
296) -> std::io::Result<()> {
297 match value {
298 Some(value) => {
299 writer.write_u8(1)?;
300 writer.write_u64::<BO>(value.as_micros())
301 }
302 None => {
303 writer.write_u8(0)?;
304 Ok(())
305 }
306 }
307}
308
309fn read_option_interval<BO: ByteOrder>(
310 mut reader: impl Read,
311) -> std::io::Result<Option<DurationUs>> {
312 let present = reader.read_u8()? != 0;
313 if present {
314 Ok(Some(DurationUs::from_micros(reader.read_u64::<BO>()?)))
315 } else {
316 Ok(None)
317 }
318}