1use crate::{
2 api::MarketSession,
3 price::Price,
4 rate::Rate,
5 time::{DurationUs, TimestampUs},
6 ChannelId, PriceFeedId, PriceFeedProperty,
7};
8use {
9 anyhow::bail,
10 byteorder::{ByteOrder, ReadBytesExt, WriteBytesExt, BE, LE},
11 serde::{Deserialize, Serialize},
12 std::{
13 io::{Cursor, Read, Write},
14 num::NonZeroI64,
15 },
16};
17
/// A payload: a timestamp and a channel id plus the selected property values
/// of a list of feeds.
///
/// Built by `PayloadData::new` and converted to and from bytes by
/// `PayloadData::serialize` / `PayloadData::deserialize`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PayloadData {
    /// Timestamp of the payload; serialized as a `u64` count of microseconds.
    pub timestamp_us: TimestampUs,
    /// Channel id; serialized as a single byte.
    pub channel_id: ChannelId,
    /// Feeds carried by the payload, in wire order. The count is serialized
    /// as a `u8`, so `serialize` fails when there are more than 255 entries.
    pub feeds: Vec<PayloadFeedData>,
}
26
/// The property values carried for a single feed inside a `PayloadData`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PayloadFeedData {
    /// Id of the feed; serialized as a `u32`.
    pub feed_id: PriceFeedId,
    /// Property values for this feed, in wire order. The count is serialized
    /// as a `u8`, so `serialize` fails when there are more than 255 entries.
    pub properties: Vec<PayloadPropertyValue>,
}
33
/// One property value of a feed.
///
/// Each variant corresponds to the `PriceFeedProperty` of the same name; on
/// the wire the value is preceded by that property's discriminant cast to
/// `u8`.
///
/// `Option<Price>` values are encoded as a bare `i64` mantissa where `0`
/// stands for `None`. The other optional values are encoded as a presence
/// byte followed, when present, by the 8-byte value.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum PayloadPropertyValue {
    Price(Option<Price>),
    BestBidPrice(Option<Price>),
    BestAskPrice(Option<Price>),
    /// Encoded as a `u16`.
    PublisherCount(u16),
    /// Encoded as an `i16`.
    Exponent(i16),
    Confidence(Option<Price>),
    FundingRate(Option<Rate>),
    FundingTimestamp(Option<TimestampUs>),
    FundingRateInterval(Option<DurationUs>),
    /// Encoded as an `i16` obtained through `MarketSession`'s `Into<i16>`.
    MarketSession(MarketSession),
    EmaPrice(Option<Price>),
    EmaConfidence(Option<Price>),
    FeedUpdateTimestamp(Option<TimestampUs>),
}
50
/// The full set of per-feed values from which `PayloadData::new` selects the
/// requested properties.
///
/// Every field maps one-to-one to the `PayloadPropertyValue` variant of the
/// same name. `Option` fields are `None` when no value is available.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct AggregatedPriceFeedData {
    pub price: Option<Price>,
    pub best_bid_price: Option<Price>,
    pub best_ask_price: Option<Price>,
    pub publisher_count: u16,
    // NOTE(review): presumably the decimal exponent applied to the price
    // mantissas — confirm against the `Price` type.
    pub exponent: i16,
    pub confidence: Option<Price>,
    pub funding_rate: Option<Rate>,
    pub funding_timestamp: Option<TimestampUs>,
    pub funding_rate_interval: Option<DurationUs>,
    pub market_session: MarketSession,
    pub ema_price: Option<Price>,
    pub ema_confidence: Option<Price>,
    /// Set to `Some(now)` by `AggregatedPriceFeedData::empty`.
    pub feed_update_timestamp: Option<TimestampUs>,
}
67
68impl AggregatedPriceFeedData {
69 pub fn empty(exponent: i16, market_session: MarketSession, now: TimestampUs) -> Self {
70 Self {
71 price: None,
72 best_bid_price: None,
73 best_ask_price: None,
74 publisher_count: 0,
75 exponent,
76 confidence: None,
77 funding_rate: None,
78 funding_timestamp: None,
79 funding_rate_interval: None,
80 market_session,
81 ema_price: None,
82 ema_confidence: None,
83 feed_update_timestamp: Some(now),
84 }
85 }
86}
87
/// Magic number that opens every serialized payload.
///
/// `PayloadData::serialize` writes it as the first `u32` (in the byte order
/// used for the rest of the payload) and `PayloadData::deserialize` rejects
/// input whose first `u32` differs from it.
pub const PAYLOAD_FORMAT_MAGIC: u32 = 0x93C7_D375;
91
92impl PayloadData {
93 pub fn new(
94 timestamp_us: TimestampUs,
95 channel_id: ChannelId,
96 feeds: &[(PriceFeedId, AggregatedPriceFeedData)],
97 requested_properties: &[PriceFeedProperty],
98 ) -> Self {
99 Self {
100 timestamp_us,
101 channel_id,
102 feeds: feeds
103 .iter()
104 .map(|(feed_id, feed)| PayloadFeedData {
105 feed_id: *feed_id,
106 properties: requested_properties
107 .iter()
108 .map(|property| match property {
109 PriceFeedProperty::Price => PayloadPropertyValue::Price(feed.price),
110 PriceFeedProperty::BestBidPrice => {
111 PayloadPropertyValue::BestBidPrice(feed.best_bid_price)
112 }
113 PriceFeedProperty::BestAskPrice => {
114 PayloadPropertyValue::BestAskPrice(feed.best_ask_price)
115 }
116 PriceFeedProperty::PublisherCount => {
117 PayloadPropertyValue::PublisherCount(feed.publisher_count)
118 }
119 PriceFeedProperty::Exponent => {
120 PayloadPropertyValue::Exponent(feed.exponent)
121 }
122 PriceFeedProperty::Confidence => {
123 PayloadPropertyValue::Confidence(feed.confidence)
124 }
125 PriceFeedProperty::FundingRate => {
126 PayloadPropertyValue::FundingRate(feed.funding_rate)
127 }
128 PriceFeedProperty::FundingTimestamp => {
129 PayloadPropertyValue::FundingTimestamp(feed.funding_timestamp)
130 }
131 PriceFeedProperty::FundingRateInterval => {
132 PayloadPropertyValue::FundingRateInterval(
133 feed.funding_rate_interval,
134 )
135 }
136 PriceFeedProperty::MarketSession => {
137 PayloadPropertyValue::MarketSession(feed.market_session)
138 }
139 PriceFeedProperty::EmaPrice => {
140 PayloadPropertyValue::EmaPrice(feed.ema_price)
141 }
142 PriceFeedProperty::EmaConfidence => {
143 PayloadPropertyValue::EmaConfidence(feed.ema_confidence)
144 }
145 PriceFeedProperty::FeedUpdateTimestamp => {
146 PayloadPropertyValue::FeedUpdateTimestamp(
147 feed.feed_update_timestamp,
148 )
149 }
150 })
151 .collect(),
152 })
153 .collect(),
154 }
155 }
156
157 pub fn serialize<BO: ByteOrder>(&self, mut writer: impl Write) -> anyhow::Result<()> {
158 writer.write_u32::<BO>(PAYLOAD_FORMAT_MAGIC)?;
159 writer.write_u64::<BO>(self.timestamp_us.as_micros())?;
160 writer.write_u8(self.channel_id.0)?;
161 writer.write_u8(self.feeds.len().try_into()?)?;
162 for feed in &self.feeds {
163 writer.write_u32::<BO>(feed.feed_id.0)?;
164 writer.write_u8(feed.properties.len().try_into()?)?;
165 for property in &feed.properties {
166 match property {
167 PayloadPropertyValue::Price(price) => {
168 writer.write_u8(PriceFeedProperty::Price as u8)?;
169 write_option_price::<BO>(&mut writer, *price)?;
170 }
171 PayloadPropertyValue::BestBidPrice(price) => {
172 writer.write_u8(PriceFeedProperty::BestBidPrice as u8)?;
173 write_option_price::<BO>(&mut writer, *price)?;
174 }
175 PayloadPropertyValue::BestAskPrice(price) => {
176 writer.write_u8(PriceFeedProperty::BestAskPrice as u8)?;
177 write_option_price::<BO>(&mut writer, *price)?;
178 }
179 PayloadPropertyValue::PublisherCount(count) => {
180 writer.write_u8(PriceFeedProperty::PublisherCount as u8)?;
181 writer.write_u16::<BO>(*count)?;
182 }
183 PayloadPropertyValue::Exponent(exponent) => {
184 writer.write_u8(PriceFeedProperty::Exponent as u8)?;
185 writer.write_i16::<BO>(*exponent)?;
186 }
187 PayloadPropertyValue::Confidence(confidence) => {
188 writer.write_u8(PriceFeedProperty::Confidence as u8)?;
189 write_option_price::<BO>(&mut writer, *confidence)?;
190 }
191 PayloadPropertyValue::FundingRate(rate) => {
192 writer.write_u8(PriceFeedProperty::FundingRate as u8)?;
193 write_option_rate::<BO>(&mut writer, *rate)?;
194 }
195 PayloadPropertyValue::FundingTimestamp(timestamp) => {
196 writer.write_u8(PriceFeedProperty::FundingTimestamp as u8)?;
197 write_option_timestamp::<BO>(&mut writer, *timestamp)?;
198 }
199 PayloadPropertyValue::FundingRateInterval(interval) => {
200 writer.write_u8(PriceFeedProperty::FundingRateInterval as u8)?;
201 write_option_duration::<BO>(&mut writer, *interval)?;
202 }
203 PayloadPropertyValue::MarketSession(market_session) => {
204 writer.write_u8(PriceFeedProperty::MarketSession as u8)?;
205 writer.write_i16::<BO>((*market_session).into())?;
206 }
207 PayloadPropertyValue::EmaPrice(ema_price) => {
208 writer.write_u8(PriceFeedProperty::EmaPrice as u8)?;
209 write_option_price::<BO>(&mut writer, *ema_price)?;
210 }
211 PayloadPropertyValue::EmaConfidence(ema_confidence) => {
212 writer.write_u8(PriceFeedProperty::EmaConfidence as u8)?;
213 write_option_price::<BO>(&mut writer, *ema_confidence)?;
214 }
215 PayloadPropertyValue::FeedUpdateTimestamp(feed_update_timestamp) => {
216 writer.write_u8(PriceFeedProperty::FeedUpdateTimestamp as u8)?;
217 write_option_timestamp::<BO>(&mut writer, *feed_update_timestamp)?;
218 }
219 }
220 }
221 }
222 Ok(())
223 }
224
225 pub fn deserialize_slice_le(data: &[u8]) -> anyhow::Result<Self> {
226 Self::deserialize::<LE>(Cursor::new(data))
227 }
228
229 pub fn deserialize_slice_be(data: &[u8]) -> anyhow::Result<Self> {
230 Self::deserialize::<BE>(Cursor::new(data))
231 }
232
233 pub fn deserialize<BO: ByteOrder>(mut reader: impl Read) -> anyhow::Result<Self> {
234 let magic = reader.read_u32::<BO>()?;
235 if magic != PAYLOAD_FORMAT_MAGIC {
236 bail!("magic mismatch");
237 }
238 let timestamp_us = TimestampUs::from_micros(reader.read_u64::<BO>()?);
239 let channel_id = ChannelId(reader.read_u8()?);
240 let num_feeds = reader.read_u8()?;
241 let mut feeds = Vec::with_capacity(num_feeds.into());
242 for _ in 0..num_feeds {
243 let feed_id = PriceFeedId(reader.read_u32::<BO>()?);
244 let num_properties = reader.read_u8()?;
245 let mut feed = PayloadFeedData {
246 feed_id,
247 properties: Vec::with_capacity(num_properties.into()),
248 };
249 for _ in 0..num_properties {
250 let property = reader.read_u8()?;
251 let value = if property == PriceFeedProperty::Price as u8 {
252 PayloadPropertyValue::Price(read_option_price::<BO>(&mut reader)?)
253 } else if property == PriceFeedProperty::BestBidPrice as u8 {
254 PayloadPropertyValue::BestBidPrice(read_option_price::<BO>(&mut reader)?)
255 } else if property == PriceFeedProperty::BestAskPrice as u8 {
256 PayloadPropertyValue::BestAskPrice(read_option_price::<BO>(&mut reader)?)
257 } else if property == PriceFeedProperty::PublisherCount as u8 {
258 PayloadPropertyValue::PublisherCount(reader.read_u16::<BO>()?)
259 } else if property == PriceFeedProperty::Exponent as u8 {
260 PayloadPropertyValue::Exponent(reader.read_i16::<BO>()?)
261 } else if property == PriceFeedProperty::Confidence as u8 {
262 PayloadPropertyValue::Confidence(read_option_price::<BO>(&mut reader)?)
263 } else if property == PriceFeedProperty::FundingRate as u8 {
264 PayloadPropertyValue::FundingRate(read_option_rate::<BO>(&mut reader)?)
265 } else if property == PriceFeedProperty::FundingTimestamp as u8 {
266 PayloadPropertyValue::FundingTimestamp(read_option_timestamp::<BO>(
267 &mut reader,
268 )?)
269 } else if property == PriceFeedProperty::FundingRateInterval as u8 {
270 PayloadPropertyValue::FundingRateInterval(read_option_interval::<BO>(
271 &mut reader,
272 )?)
273 } else if property == PriceFeedProperty::MarketSession as u8 {
274 PayloadPropertyValue::MarketSession(reader.read_i16::<BO>()?.try_into()?)
275 } else {
276 bail!("unknown property");
277 };
278 feed.properties.push(value);
279 }
280 feeds.push(feed);
281 }
282 Ok(Self {
283 timestamp_us,
284 channel_id,
285 feeds,
286 })
287 }
288}
289
290fn write_option_price<BO: ByteOrder>(
291 mut writer: impl Write,
292 value: Option<Price>,
293) -> std::io::Result<()> {
294 writer.write_i64::<BO>(value.map_or(0, |v| v.mantissa_i64()))
295}
296
297fn read_option_price<BO: ByteOrder>(mut reader: impl Read) -> std::io::Result<Option<Price>> {
298 let value = NonZeroI64::new(reader.read_i64::<BO>()?);
299 Ok(value.map(Price::from_nonzero_mantissa))
300}
301
302fn write_option_rate<BO: ByteOrder>(
303 mut writer: impl Write,
304 value: Option<Rate>,
305) -> std::io::Result<()> {
306 match value {
307 Some(value) => {
308 writer.write_u8(1)?;
309 writer.write_i64::<BO>(value.mantissa())
310 }
311 None => {
312 writer.write_u8(0)?;
313 Ok(())
314 }
315 }
316}
317
318fn read_option_rate<BO: ByteOrder>(mut reader: impl Read) -> std::io::Result<Option<Rate>> {
319 let present = reader.read_u8()? != 0;
320 if present {
321 Ok(Some(Rate::from_mantissa(reader.read_i64::<BO>()?)))
322 } else {
323 Ok(None)
324 }
325}
326
327fn write_option_timestamp<BO: ByteOrder>(
328 mut writer: impl Write,
329 value: Option<TimestampUs>,
330) -> std::io::Result<()> {
331 match value {
332 Some(value) => {
333 writer.write_u8(1)?;
334 writer.write_u64::<BO>(value.as_micros())
335 }
336 None => {
337 writer.write_u8(0)?;
338 Ok(())
339 }
340 }
341}
342
343fn read_option_timestamp<BO: ByteOrder>(
344 mut reader: impl Read,
345) -> std::io::Result<Option<TimestampUs>> {
346 let present = reader.read_u8()? != 0;
347 if present {
348 Ok(Some(TimestampUs::from_micros(reader.read_u64::<BO>()?)))
349 } else {
350 Ok(None)
351 }
352}
353
354fn write_option_duration<BO: ByteOrder>(
355 mut writer: impl Write,
356 value: Option<DurationUs>,
357) -> std::io::Result<()> {
358 match value {
359 Some(value) => {
360 writer.write_u8(1)?;
361 writer.write_u64::<BO>(value.as_micros())
362 }
363 None => {
364 writer.write_u8(0)?;
365 Ok(())
366 }
367 }
368}
369
370fn read_option_interval<BO: ByteOrder>(
371 mut reader: impl Read,
372) -> std::io::Result<Option<DurationUs>> {
373 let present = reader.read_u8()? != 0;
374 if present {
375 Ok(Some(DurationUs::from_micros(reader.read_u64::<BO>()?)))
376 } else {
377 Ok(None)
378 }
379}