1use crate::{
2 api::MarketSession,
3 price::Price,
4 rate::Rate,
5 time::{DurationUs, TimestampUs},
6 ChannelId, PriceFeedId, PriceFeedProperty,
7};
8use {
9 anyhow::bail,
10 byteorder::{ByteOrder, ReadBytesExt, WriteBytesExt, BE, LE},
11 serde::{Deserialize, Serialize},
12 std::{
13 io::{Cursor, Read, Write},
14 num::NonZeroI64,
15 },
16};
17
18#[derive(Debug, Clone, PartialEq, Eq, Hash)]
20pub struct PayloadData {
21 pub timestamp_us: TimestampUs,
22 pub channel_id: ChannelId,
23 pub feeds: Vec<PayloadFeedData>,
25}
26
27#[derive(Debug, Clone, PartialEq, Eq, Hash)]
28pub struct PayloadFeedData {
29 pub feed_id: PriceFeedId,
30 pub properties: Vec<PayloadPropertyValue>,
32}
33
34#[derive(Debug, Clone, PartialEq, Eq, Hash)]
35pub enum PayloadPropertyValue {
36 Price(Option<Price>),
37 BestBidPrice(Option<Price>),
38 BestAskPrice(Option<Price>),
39 PublisherCount(u16),
40 Exponent(i16),
41 Confidence(Option<Price>),
42 FundingRate(Option<Rate>),
43 FundingTimestamp(Option<TimestampUs>),
44 FundingRateInterval(Option<DurationUs>),
45}
46
47#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
48pub struct AggregatedPriceFeedData {
49 pub price: Option<Price>,
50 pub best_bid_price: Option<Price>,
51 pub best_ask_price: Option<Price>,
52 pub publisher_count: u16,
53 pub exponent: i16,
54 pub confidence: Option<Price>,
55 pub funding_rate: Option<Rate>,
56 pub funding_timestamp: Option<TimestampUs>,
57 pub funding_rate_interval: Option<DurationUs>,
58 pub market_session: Option<MarketSession>,
59}
60
61impl AggregatedPriceFeedData {
62 pub fn empty(exponent: i16, market_session: Option<MarketSession>) -> Self {
63 Self {
64 price: None,
65 best_bid_price: None,
66 best_ask_price: None,
67 publisher_count: 0,
68 exponent,
69 confidence: None,
70 funding_rate: None,
71 funding_timestamp: None,
72 funding_rate_interval: None,
73 market_session,
74 }
75 }
76}
77
/// Magic number written as the first `u32` of every serialized payload and
/// checked first when deserializing (decimal 2479346549).
pub const PAYLOAD_FORMAT_MAGIC: u32 = 0x93C7_D375;
81
82impl PayloadData {
83 pub fn new(
84 timestamp_us: TimestampUs,
85 channel_id: ChannelId,
86 feeds: &[(PriceFeedId, AggregatedPriceFeedData)],
87 requested_properties: &[PriceFeedProperty],
88 ) -> Self {
89 Self {
90 timestamp_us,
91 channel_id,
92 feeds: feeds
93 .iter()
94 .map(|(feed_id, feed)| PayloadFeedData {
95 feed_id: *feed_id,
96 properties: requested_properties
97 .iter()
98 .map(|property| match property {
99 PriceFeedProperty::Price => PayloadPropertyValue::Price(feed.price),
100 PriceFeedProperty::BestBidPrice => {
101 PayloadPropertyValue::BestBidPrice(feed.best_bid_price)
102 }
103 PriceFeedProperty::BestAskPrice => {
104 PayloadPropertyValue::BestAskPrice(feed.best_ask_price)
105 }
106 PriceFeedProperty::PublisherCount => {
107 PayloadPropertyValue::PublisherCount(feed.publisher_count)
108 }
109 PriceFeedProperty::Exponent => {
110 PayloadPropertyValue::Exponent(feed.exponent)
111 }
112 PriceFeedProperty::Confidence => {
113 PayloadPropertyValue::Confidence(feed.confidence)
114 }
115 PriceFeedProperty::FundingRate => {
116 PayloadPropertyValue::FundingRate(feed.funding_rate)
117 }
118 PriceFeedProperty::FundingTimestamp => {
119 PayloadPropertyValue::FundingTimestamp(feed.funding_timestamp)
120 }
121 PriceFeedProperty::FundingRateInterval => {
122 PayloadPropertyValue::FundingRateInterval(
123 feed.funding_rate_interval,
124 )
125 }
126 })
127 .collect(),
128 })
129 .collect(),
130 }
131 }
132
133 pub fn serialize<BO: ByteOrder>(&self, mut writer: impl Write) -> anyhow::Result<()> {
134 writer.write_u32::<BO>(PAYLOAD_FORMAT_MAGIC)?;
135 writer.write_u64::<BO>(self.timestamp_us.as_micros())?;
136 writer.write_u8(self.channel_id.0)?;
137 writer.write_u8(self.feeds.len().try_into()?)?;
138 for feed in &self.feeds {
139 writer.write_u32::<BO>(feed.feed_id.0)?;
140 writer.write_u8(feed.properties.len().try_into()?)?;
141 for property in &feed.properties {
142 match property {
143 PayloadPropertyValue::Price(price) => {
144 writer.write_u8(PriceFeedProperty::Price as u8)?;
145 write_option_price::<BO>(&mut writer, *price)?;
146 }
147 PayloadPropertyValue::BestBidPrice(price) => {
148 writer.write_u8(PriceFeedProperty::BestBidPrice as u8)?;
149 write_option_price::<BO>(&mut writer, *price)?;
150 }
151 PayloadPropertyValue::BestAskPrice(price) => {
152 writer.write_u8(PriceFeedProperty::BestAskPrice as u8)?;
153 write_option_price::<BO>(&mut writer, *price)?;
154 }
155 PayloadPropertyValue::PublisherCount(count) => {
156 writer.write_u8(PriceFeedProperty::PublisherCount as u8)?;
157 writer.write_u16::<BO>(*count)?;
158 }
159 PayloadPropertyValue::Exponent(exponent) => {
160 writer.write_u8(PriceFeedProperty::Exponent as u8)?;
161 writer.write_i16::<BO>(*exponent)?;
162 }
163 PayloadPropertyValue::Confidence(confidence) => {
164 writer.write_u8(PriceFeedProperty::Confidence as u8)?;
165 write_option_price::<BO>(&mut writer, *confidence)?;
166 }
167 PayloadPropertyValue::FundingRate(rate) => {
168 writer.write_u8(PriceFeedProperty::FundingRate as u8)?;
169 write_option_rate::<BO>(&mut writer, *rate)?;
170 }
171 PayloadPropertyValue::FundingTimestamp(timestamp) => {
172 writer.write_u8(PriceFeedProperty::FundingTimestamp as u8)?;
173 write_option_timestamp::<BO>(&mut writer, *timestamp)?;
174 }
175 &PayloadPropertyValue::FundingRateInterval(interval) => {
176 writer.write_u8(PriceFeedProperty::FundingRateInterval as u8)?;
177 write_option_duration::<BO>(&mut writer, interval)?;
178 }
179 }
180 }
181 }
182 Ok(())
183 }
184
185 pub fn deserialize_slice_le(data: &[u8]) -> anyhow::Result<Self> {
186 Self::deserialize::<LE>(Cursor::new(data))
187 }
188
189 pub fn deserialize_slice_be(data: &[u8]) -> anyhow::Result<Self> {
190 Self::deserialize::<BE>(Cursor::new(data))
191 }
192
193 pub fn deserialize<BO: ByteOrder>(mut reader: impl Read) -> anyhow::Result<Self> {
194 let magic = reader.read_u32::<BO>()?;
195 if magic != PAYLOAD_FORMAT_MAGIC {
196 bail!("magic mismatch");
197 }
198 let timestamp_us = TimestampUs::from_micros(reader.read_u64::<BO>()?);
199 let channel_id = ChannelId(reader.read_u8()?);
200 let num_feeds = reader.read_u8()?;
201 let mut feeds = Vec::with_capacity(num_feeds.into());
202 for _ in 0..num_feeds {
203 let feed_id = PriceFeedId(reader.read_u32::<BO>()?);
204 let num_properties = reader.read_u8()?;
205 let mut feed = PayloadFeedData {
206 feed_id,
207 properties: Vec::with_capacity(num_properties.into()),
208 };
209 for _ in 0..num_properties {
210 let property = reader.read_u8()?;
211 let value = if property == PriceFeedProperty::Price as u8 {
212 PayloadPropertyValue::Price(read_option_price::<BO>(&mut reader)?)
213 } else if property == PriceFeedProperty::BestBidPrice as u8 {
214 PayloadPropertyValue::BestBidPrice(read_option_price::<BO>(&mut reader)?)
215 } else if property == PriceFeedProperty::BestAskPrice as u8 {
216 PayloadPropertyValue::BestAskPrice(read_option_price::<BO>(&mut reader)?)
217 } else if property == PriceFeedProperty::PublisherCount as u8 {
218 PayloadPropertyValue::PublisherCount(reader.read_u16::<BO>()?)
219 } else if property == PriceFeedProperty::Exponent as u8 {
220 PayloadPropertyValue::Exponent(reader.read_i16::<BO>()?)
221 } else if property == PriceFeedProperty::Confidence as u8 {
222 PayloadPropertyValue::Confidence(read_option_price::<BO>(&mut reader)?)
223 } else if property == PriceFeedProperty::FundingRate as u8 {
224 PayloadPropertyValue::FundingRate(read_option_rate::<BO>(&mut reader)?)
225 } else if property == PriceFeedProperty::FundingTimestamp as u8 {
226 PayloadPropertyValue::FundingTimestamp(read_option_timestamp::<BO>(
227 &mut reader,
228 )?)
229 } else if property == PriceFeedProperty::FundingRateInterval as u8 {
230 PayloadPropertyValue::FundingRateInterval(read_option_interval::<BO>(
231 &mut reader,
232 )?)
233 } else {
234 bail!("unknown property");
235 };
236 feed.properties.push(value);
237 }
238 feeds.push(feed);
239 }
240 Ok(Self {
241 timestamp_us,
242 channel_id,
243 feeds,
244 })
245 }
246}
247
248fn write_option_price<BO: ByteOrder>(
249 mut writer: impl Write,
250 value: Option<Price>,
251) -> std::io::Result<()> {
252 writer.write_i64::<BO>(value.map_or(0, |v| v.mantissa_i64()))
253}
254
255fn read_option_price<BO: ByteOrder>(mut reader: impl Read) -> std::io::Result<Option<Price>> {
256 let value = NonZeroI64::new(reader.read_i64::<BO>()?);
257 Ok(value.map(Price::from_nonzero_mantissa))
258}
259
260fn write_option_rate<BO: ByteOrder>(
261 mut writer: impl Write,
262 value: Option<Rate>,
263) -> std::io::Result<()> {
264 match value {
265 Some(value) => {
266 writer.write_u8(1)?;
267 writer.write_i64::<BO>(value.mantissa())
268 }
269 None => {
270 writer.write_u8(0)?;
271 Ok(())
272 }
273 }
274}
275
276fn read_option_rate<BO: ByteOrder>(mut reader: impl Read) -> std::io::Result<Option<Rate>> {
277 let present = reader.read_u8()? != 0;
278 if present {
279 Ok(Some(Rate::from_mantissa(reader.read_i64::<BO>()?)))
280 } else {
281 Ok(None)
282 }
283}
284
285fn write_option_timestamp<BO: ByteOrder>(
286 mut writer: impl Write,
287 value: Option<TimestampUs>,
288) -> std::io::Result<()> {
289 match value {
290 Some(value) => {
291 writer.write_u8(1)?;
292 writer.write_u64::<BO>(value.as_micros())
293 }
294 None => {
295 writer.write_u8(0)?;
296 Ok(())
297 }
298 }
299}
300
301fn read_option_timestamp<BO: ByteOrder>(
302 mut reader: impl Read,
303) -> std::io::Result<Option<TimestampUs>> {
304 let present = reader.read_u8()? != 0;
305 if present {
306 Ok(Some(TimestampUs::from_micros(reader.read_u64::<BO>()?)))
307 } else {
308 Ok(None)
309 }
310}
311
312fn write_option_duration<BO: ByteOrder>(
313 mut writer: impl Write,
314 value: Option<DurationUs>,
315) -> std::io::Result<()> {
316 match value {
317 Some(value) => {
318 writer.write_u8(1)?;
319 writer.write_u64::<BO>(value.as_micros())
320 }
321 None => {
322 writer.write_u8(0)?;
323 Ok(())
324 }
325 }
326}
327
328fn read_option_interval<BO: ByteOrder>(
329 mut reader: impl Read,
330) -> std::io::Result<Option<DurationUs>> {
331 let present = reader.read_u8()? != 0;
332 if present {
333 Ok(Some(DurationUs::from_micros(reader.read_u64::<BO>()?)))
334 } else {
335 Ok(None)
336 }
337}