1use crate::{
2 price::Price,
3 rate::Rate,
4 time::{DurationUs, TimestampUs},
5 ChannelId, PriceFeedId, PriceFeedProperty,
6};
7use {
8 anyhow::bail,
9 byteorder::{ByteOrder, ReadBytesExt, WriteBytesExt, BE, LE},
10 serde::{Deserialize, Serialize},
11 std::{
12 io::{Cursor, Read, Write},
13 num::NonZeroI64,
14 },
15};
16
17#[derive(Debug, Clone, PartialEq, Eq, Hash)]
19pub struct PayloadData {
20 pub timestamp_us: TimestampUs,
21 pub channel_id: ChannelId,
22 pub feeds: Vec<PayloadFeedData>,
24}
25
26#[derive(Debug, Clone, PartialEq, Eq, Hash)]
27pub struct PayloadFeedData {
28 pub feed_id: PriceFeedId,
29 pub properties: Vec<PayloadPropertyValue>,
31}
32
33#[derive(Debug, Clone, PartialEq, Eq, Hash)]
34pub enum PayloadPropertyValue {
35 Price(Option<Price>),
36 BestBidPrice(Option<Price>),
37 BestAskPrice(Option<Price>),
38 PublisherCount(u16),
39 Exponent(i16),
40 Confidence(Option<Price>),
41 FundingRate(Option<Rate>),
42 FundingTimestamp(Option<TimestampUs>),
43 FundingRateInterval(Option<DurationUs>),
44}
45
46#[derive(Debug, Clone, Default, PartialEq, Eq, Hash, Serialize, Deserialize)]
47pub struct AggregatedPriceFeedData {
48 pub price: Option<Price>,
49 pub best_bid_price: Option<Price>,
50 pub best_ask_price: Option<Price>,
51 pub publisher_count: u16,
52 pub confidence: Option<Price>,
53 pub funding_rate: Option<Rate>,
54 pub funding_timestamp: Option<TimestampUs>,
55 pub funding_rate_interval: Option<DurationUs>,
56}
57
58pub const PAYLOAD_FORMAT_MAGIC: u32 = 2479346549;
61
62impl PayloadData {
63 pub fn new(
64 timestamp_us: TimestampUs,
65 channel_id: ChannelId,
66 feeds: &[(PriceFeedId, i16, AggregatedPriceFeedData)],
67 requested_properties: &[PriceFeedProperty],
68 ) -> Self {
69 Self {
70 timestamp_us,
71 channel_id,
72 feeds: feeds
73 .iter()
74 .map(|(feed_id, exponent, feed)| PayloadFeedData {
75 feed_id: *feed_id,
76 properties: requested_properties
77 .iter()
78 .map(|property| match property {
79 PriceFeedProperty::Price => PayloadPropertyValue::Price(feed.price),
80 PriceFeedProperty::BestBidPrice => {
81 PayloadPropertyValue::BestBidPrice(feed.best_bid_price)
82 }
83 PriceFeedProperty::BestAskPrice => {
84 PayloadPropertyValue::BestAskPrice(feed.best_ask_price)
85 }
86 PriceFeedProperty::PublisherCount => {
87 PayloadPropertyValue::PublisherCount(feed.publisher_count)
88 }
89 PriceFeedProperty::Exponent => {
90 PayloadPropertyValue::Exponent(*exponent)
91 }
92 PriceFeedProperty::Confidence => {
93 PayloadPropertyValue::Confidence(feed.confidence)
94 }
95 PriceFeedProperty::FundingRate => {
96 PayloadPropertyValue::FundingRate(feed.funding_rate)
97 }
98 PriceFeedProperty::FundingTimestamp => {
99 PayloadPropertyValue::FundingTimestamp(feed.funding_timestamp)
100 }
101 PriceFeedProperty::FundingRateInterval => {
102 PayloadPropertyValue::FundingRateInterval(
103 feed.funding_rate_interval,
104 )
105 }
106 })
107 .collect(),
108 })
109 .collect(),
110 }
111 }
112
113 pub fn serialize<BO: ByteOrder>(&self, mut writer: impl Write) -> anyhow::Result<()> {
114 writer.write_u32::<BO>(PAYLOAD_FORMAT_MAGIC)?;
115 writer.write_u64::<BO>(self.timestamp_us.as_micros())?;
116 writer.write_u8(self.channel_id.0)?;
117 writer.write_u8(self.feeds.len().try_into()?)?;
118 for feed in &self.feeds {
119 writer.write_u32::<BO>(feed.feed_id.0)?;
120 writer.write_u8(feed.properties.len().try_into()?)?;
121 for property in &feed.properties {
122 match property {
123 PayloadPropertyValue::Price(price) => {
124 writer.write_u8(PriceFeedProperty::Price as u8)?;
125 write_option_price::<BO>(&mut writer, *price)?;
126 }
127 PayloadPropertyValue::BestBidPrice(price) => {
128 writer.write_u8(PriceFeedProperty::BestBidPrice as u8)?;
129 write_option_price::<BO>(&mut writer, *price)?;
130 }
131 PayloadPropertyValue::BestAskPrice(price) => {
132 writer.write_u8(PriceFeedProperty::BestAskPrice as u8)?;
133 write_option_price::<BO>(&mut writer, *price)?;
134 }
135 PayloadPropertyValue::PublisherCount(count) => {
136 writer.write_u8(PriceFeedProperty::PublisherCount as u8)?;
137 writer.write_u16::<BO>(*count)?;
138 }
139 PayloadPropertyValue::Exponent(exponent) => {
140 writer.write_u8(PriceFeedProperty::Exponent as u8)?;
141 writer.write_i16::<BO>(*exponent)?;
142 }
143 PayloadPropertyValue::Confidence(confidence) => {
144 writer.write_u8(PriceFeedProperty::Confidence as u8)?;
145 write_option_price::<BO>(&mut writer, *confidence)?;
146 }
147 PayloadPropertyValue::FundingRate(rate) => {
148 writer.write_u8(PriceFeedProperty::FundingRate as u8)?;
149 write_option_rate::<BO>(&mut writer, *rate)?;
150 }
151 PayloadPropertyValue::FundingTimestamp(timestamp) => {
152 writer.write_u8(PriceFeedProperty::FundingTimestamp as u8)?;
153 write_option_timestamp::<BO>(&mut writer, *timestamp)?;
154 }
155 &PayloadPropertyValue::FundingRateInterval(interval) => {
156 writer.write_u8(PriceFeedProperty::FundingRateInterval as u8)?;
157 write_option_duration::<BO>(&mut writer, interval)?;
158 }
159 }
160 }
161 }
162 Ok(())
163 }
164
165 pub fn deserialize_slice_le(data: &[u8]) -> anyhow::Result<Self> {
166 Self::deserialize::<LE>(Cursor::new(data))
167 }
168
169 pub fn deserialize_slice_be(data: &[u8]) -> anyhow::Result<Self> {
170 Self::deserialize::<BE>(Cursor::new(data))
171 }
172
173 pub fn deserialize<BO: ByteOrder>(mut reader: impl Read) -> anyhow::Result<Self> {
174 let magic = reader.read_u32::<BO>()?;
175 if magic != PAYLOAD_FORMAT_MAGIC {
176 bail!("magic mismatch");
177 }
178 let timestamp_us = TimestampUs::from_micros(reader.read_u64::<BO>()?);
179 let channel_id = ChannelId(reader.read_u8()?);
180 let num_feeds = reader.read_u8()?;
181 let mut feeds = Vec::with_capacity(num_feeds.into());
182 for _ in 0..num_feeds {
183 let feed_id = PriceFeedId(reader.read_u32::<BO>()?);
184 let num_properties = reader.read_u8()?;
185 let mut feed = PayloadFeedData {
186 feed_id,
187 properties: Vec::with_capacity(num_properties.into()),
188 };
189 for _ in 0..num_properties {
190 let property = reader.read_u8()?;
191 let value = if property == PriceFeedProperty::Price as u8 {
192 PayloadPropertyValue::Price(read_option_price::<BO>(&mut reader)?)
193 } else if property == PriceFeedProperty::BestBidPrice as u8 {
194 PayloadPropertyValue::BestBidPrice(read_option_price::<BO>(&mut reader)?)
195 } else if property == PriceFeedProperty::BestAskPrice as u8 {
196 PayloadPropertyValue::BestAskPrice(read_option_price::<BO>(&mut reader)?)
197 } else if property == PriceFeedProperty::PublisherCount as u8 {
198 PayloadPropertyValue::PublisherCount(reader.read_u16::<BO>()?)
199 } else if property == PriceFeedProperty::Exponent as u8 {
200 PayloadPropertyValue::Exponent(reader.read_i16::<BO>()?)
201 } else if property == PriceFeedProperty::Confidence as u8 {
202 PayloadPropertyValue::Confidence(read_option_price::<BO>(&mut reader)?)
203 } else if property == PriceFeedProperty::FundingRate as u8 {
204 PayloadPropertyValue::FundingRate(read_option_rate::<BO>(&mut reader)?)
205 } else if property == PriceFeedProperty::FundingTimestamp as u8 {
206 PayloadPropertyValue::FundingTimestamp(read_option_timestamp::<BO>(
207 &mut reader,
208 )?)
209 } else if property == PriceFeedProperty::FundingRateInterval as u8 {
210 PayloadPropertyValue::FundingRateInterval(read_option_interval::<BO>(
211 &mut reader,
212 )?)
213 } else {
214 bail!("unknown property");
215 };
216 feed.properties.push(value);
217 }
218 feeds.push(feed);
219 }
220 Ok(Self {
221 timestamp_us,
222 channel_id,
223 feeds,
224 })
225 }
226}
227
228fn write_option_price<BO: ByteOrder>(
229 mut writer: impl Write,
230 value: Option<Price>,
231) -> std::io::Result<()> {
232 writer.write_i64::<BO>(value.map_or(0, |v| v.mantissa_i64()))
233}
234
235fn read_option_price<BO: ByteOrder>(mut reader: impl Read) -> std::io::Result<Option<Price>> {
236 let value = NonZeroI64::new(reader.read_i64::<BO>()?);
237 Ok(value.map(Price::from_nonzero_mantissa))
238}
239
240fn write_option_rate<BO: ByteOrder>(
241 mut writer: impl Write,
242 value: Option<Rate>,
243) -> std::io::Result<()> {
244 match value {
245 Some(value) => {
246 writer.write_u8(1)?;
247 writer.write_i64::<BO>(value.mantissa())
248 }
249 None => {
250 writer.write_u8(0)?;
251 Ok(())
252 }
253 }
254}
255
256fn read_option_rate<BO: ByteOrder>(mut reader: impl Read) -> std::io::Result<Option<Rate>> {
257 let present = reader.read_u8()? != 0;
258 if present {
259 Ok(Some(Rate::from_mantissa(reader.read_i64::<BO>()?)))
260 } else {
261 Ok(None)
262 }
263}
264
265fn write_option_timestamp<BO: ByteOrder>(
266 mut writer: impl Write,
267 value: Option<TimestampUs>,
268) -> std::io::Result<()> {
269 match value {
270 Some(value) => {
271 writer.write_u8(1)?;
272 writer.write_u64::<BO>(value.as_micros())
273 }
274 None => {
275 writer.write_u8(0)?;
276 Ok(())
277 }
278 }
279}
280
281fn read_option_timestamp<BO: ByteOrder>(
282 mut reader: impl Read,
283) -> std::io::Result<Option<TimestampUs>> {
284 let present = reader.read_u8()? != 0;
285 if present {
286 Ok(Some(TimestampUs::from_micros(reader.read_u64::<BO>()?)))
287 } else {
288 Ok(None)
289 }
290}
291
292fn write_option_duration<BO: ByteOrder>(
293 mut writer: impl Write,
294 value: Option<DurationUs>,
295) -> std::io::Result<()> {
296 match value {
297 Some(value) => {
298 writer.write_u8(1)?;
299 writer.write_u64::<BO>(value.as_micros())
300 }
301 None => {
302 writer.write_u8(0)?;
303 Ok(())
304 }
305 }
306}
307
308fn read_option_interval<BO: ByteOrder>(
309 mut reader: impl Read,
310) -> std::io::Result<Option<DurationUs>> {
311 let present = reader.read_u8()? != 0;
312 if present {
313 Ok(Some(DurationUs::from_micros(reader.read_u64::<BO>()?)))
314 } else {
315 Ok(None)
316 }
317}