1use {
2 crate::{
3 api::{MarketSession, ParsedFeedPayload},
4 price::Price,
5 rate::Rate,
6 time::{DurationUs, TimestampUs},
7 ChannelId, PriceFeedId, PriceFeedProperty, PublisherDatapoint, PublisherId,
8 },
9 anyhow::Context,
10};
11use {
12 anyhow::bail,
13 byteorder::{ByteOrder, ReadBytesExt, WriteBytesExt, BE, LE},
14 serde::{Deserialize, Serialize},
15 std::{
16 io::{Cursor, Read, Write},
17 num::NonZeroI64,
18 },
19};
20
21#[derive(Debug, Clone, PartialEq, Eq, Hash)]
23pub struct PayloadData {
24 pub timestamp_us: TimestampUs,
25 pub channel_id: ChannelId,
26 pub feeds: Vec<PayloadFeedData>,
28}
29
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
31pub struct PayloadFeedData {
32 pub feed_id: PriceFeedId,
33 pub properties: Vec<PayloadPropertyValue>,
35}
36
37#[derive(Debug, Clone, PartialEq, Eq, Hash)]
38pub enum PayloadPropertyValue {
39 Price(Option<Price>),
40 BestBidPrice(Option<Price>),
41 BestAskPrice(Option<Price>),
42 PublisherCount(u16),
43 Exponent(i16),
44 Confidence(Option<Price>),
45 FundingRate(Option<Rate>),
46 FundingTimestamp(Option<TimestampUs>),
47 FundingRateInterval(Option<DurationUs>),
48 MarketSession(MarketSession),
49 EmaPrice(Option<Price>),
50 EmaConfidence(Option<Price>),
51 FeedUpdateTimestamp(Option<TimestampUs>),
52}
53
54impl TryFrom<(PriceFeedProperty, &ParsedFeedPayload)> for PayloadPropertyValue {
55 type Error = anyhow::Error;
56
57 fn try_from((property, feed): (PriceFeedProperty, &ParsedFeedPayload)) -> anyhow::Result<Self> {
58 let missing =
59 || anyhow::anyhow!("ParsedFeedPayload is missing required property: {property:?}");
60 Ok(match property {
61 PriceFeedProperty::Price => Self::Price(feed.price),
62 PriceFeedProperty::BestBidPrice => Self::BestBidPrice(feed.best_bid_price),
63 PriceFeedProperty::BestAskPrice => Self::BestAskPrice(feed.best_ask_price),
64 PriceFeedProperty::PublisherCount => {
65 Self::PublisherCount(feed.publisher_count.ok_or_else(missing)?)
66 }
67 PriceFeedProperty::Exponent => Self::Exponent(feed.exponent.ok_or_else(missing)?),
68 PriceFeedProperty::Confidence => Self::Confidence(feed.confidence),
69 PriceFeedProperty::FundingRate => Self::FundingRate(feed.funding_rate),
70 PriceFeedProperty::FundingTimestamp => Self::FundingTimestamp(feed.funding_timestamp),
71 PriceFeedProperty::FundingRateInterval => {
72 Self::FundingRateInterval(feed.funding_rate_interval)
73 }
74 PriceFeedProperty::MarketSession => {
75 Self::MarketSession(feed.market_session.ok_or_else(missing)?)
76 }
77 PriceFeedProperty::EmaPrice => Self::EmaPrice(feed.ema_price),
78 PriceFeedProperty::EmaConfidence => Self::EmaConfidence(feed.ema_confidence),
79 PriceFeedProperty::FeedUpdateTimestamp => {
80 Self::FeedUpdateTimestamp(feed.feed_update_timestamp)
81 }
82 })
83 }
84}
85
86#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
87pub struct AggregatedPriceFeedData {
88 pub price: Option<Price>,
89 pub best_bid_price: Option<Price>,
90 pub best_ask_price: Option<Price>,
91 pub publisher_count: u16,
92 pub exponent: i16,
93 pub confidence: Option<Price>,
94 pub funding_rate: Option<Rate>,
95 pub funding_timestamp: Option<TimestampUs>,
96 pub funding_rate_interval: Option<DurationUs>,
97 pub market_session: MarketSession,
98 pub ema_price: Option<Price>,
99 pub ema_confidence: Option<Price>,
100 pub feed_update_timestamp: Option<TimestampUs>,
101 pub publisher_ids: Vec<PublisherId>,
102 pub publisher_data: Vec<PublisherDatapoint>,
103}
104
105impl AggregatedPriceFeedData {
106 pub fn empty(exponent: i16, market_session: MarketSession, now: TimestampUs) -> Self {
107 Self {
108 price: None,
109 best_bid_price: None,
110 best_ask_price: None,
111 publisher_count: 0,
112 exponent,
113 confidence: None,
114 funding_rate: None,
115 funding_timestamp: None,
116 funding_rate_interval: None,
117 market_session,
118 ema_price: None,
119 ema_confidence: None,
120 feed_update_timestamp: Some(now),
121 publisher_ids: Vec::new(),
122 publisher_data: Vec::new(),
123 }
124 }
125}
126
127pub const PAYLOAD_FORMAT_MAGIC: u32 = 2479346549;
130
131impl PayloadData {
132 pub fn new(
133 timestamp_us: TimestampUs,
134 channel_id: ChannelId,
135 feeds: &[(PriceFeedId, AggregatedPriceFeedData)],
136 requested_properties: &[PriceFeedProperty],
137 ) -> Self {
138 Self {
139 timestamp_us,
140 channel_id,
141 feeds: feeds
142 .iter()
143 .map(|(feed_id, feed)| PayloadFeedData {
144 feed_id: *feed_id,
145 properties: requested_properties
146 .iter()
147 .map(|property| match property {
148 PriceFeedProperty::Price => PayloadPropertyValue::Price(feed.price),
149 PriceFeedProperty::BestBidPrice => {
150 PayloadPropertyValue::BestBidPrice(feed.best_bid_price)
151 }
152 PriceFeedProperty::BestAskPrice => {
153 PayloadPropertyValue::BestAskPrice(feed.best_ask_price)
154 }
155 PriceFeedProperty::PublisherCount => {
156 PayloadPropertyValue::PublisherCount(feed.publisher_count)
157 }
158 PriceFeedProperty::Exponent => {
159 PayloadPropertyValue::Exponent(feed.exponent)
160 }
161 PriceFeedProperty::Confidence => {
162 PayloadPropertyValue::Confidence(feed.confidence)
163 }
164 PriceFeedProperty::FundingRate => {
165 PayloadPropertyValue::FundingRate(feed.funding_rate)
166 }
167 PriceFeedProperty::FundingTimestamp => {
168 PayloadPropertyValue::FundingTimestamp(feed.funding_timestamp)
169 }
170 PriceFeedProperty::FundingRateInterval => {
171 PayloadPropertyValue::FundingRateInterval(
172 feed.funding_rate_interval,
173 )
174 }
175 PriceFeedProperty::MarketSession => {
176 PayloadPropertyValue::MarketSession(feed.market_session)
177 }
178 PriceFeedProperty::EmaPrice => {
179 PayloadPropertyValue::EmaPrice(feed.ema_price)
180 }
181 PriceFeedProperty::EmaConfidence => {
182 PayloadPropertyValue::EmaConfidence(feed.ema_confidence)
183 }
184 PriceFeedProperty::FeedUpdateTimestamp => {
185 PayloadPropertyValue::FeedUpdateTimestamp(
186 feed.feed_update_timestamp,
187 )
188 }
189 })
190 .collect(),
191 })
192 .collect(),
193 }
194 }
195
196 pub fn serialize<BO: ByteOrder>(&self, mut writer: impl Write) -> anyhow::Result<()> {
197 writer.write_u32::<BO>(PAYLOAD_FORMAT_MAGIC)?;
198 writer.write_u64::<BO>(self.timestamp_us.as_micros())?;
199 writer.write_u8(self.channel_id.0)?;
200 writer.write_u8(self.feeds.len().try_into()?)?;
201 for feed in &self.feeds {
202 writer.write_u32::<BO>(feed.feed_id.0)?;
203 writer.write_u8(feed.properties.len().try_into()?)?;
204 for property in &feed.properties {
205 match property {
206 PayloadPropertyValue::Price(price) => {
207 writer.write_u8(PriceFeedProperty::Price as u8)?;
208 write_option_price::<BO>(&mut writer, *price)?;
209 }
210 PayloadPropertyValue::BestBidPrice(price) => {
211 writer.write_u8(PriceFeedProperty::BestBidPrice as u8)?;
212 write_option_price::<BO>(&mut writer, *price)?;
213 }
214 PayloadPropertyValue::BestAskPrice(price) => {
215 writer.write_u8(PriceFeedProperty::BestAskPrice as u8)?;
216 write_option_price::<BO>(&mut writer, *price)?;
217 }
218 PayloadPropertyValue::PublisherCount(count) => {
219 writer.write_u8(PriceFeedProperty::PublisherCount as u8)?;
220 writer.write_u16::<BO>(*count)?;
221 }
222 PayloadPropertyValue::Exponent(exponent) => {
223 writer.write_u8(PriceFeedProperty::Exponent as u8)?;
224 writer.write_i16::<BO>(*exponent)?;
225 }
226 PayloadPropertyValue::Confidence(confidence) => {
227 writer.write_u8(PriceFeedProperty::Confidence as u8)?;
228 write_option_price::<BO>(&mut writer, *confidence)?;
229 }
230 PayloadPropertyValue::FundingRate(rate) => {
231 writer.write_u8(PriceFeedProperty::FundingRate as u8)?;
232 write_option_rate::<BO>(&mut writer, *rate)?;
233 }
234 PayloadPropertyValue::FundingTimestamp(timestamp) => {
235 writer.write_u8(PriceFeedProperty::FundingTimestamp as u8)?;
236 write_option_timestamp::<BO>(&mut writer, *timestamp)?;
237 }
238 PayloadPropertyValue::FundingRateInterval(interval) => {
239 writer.write_u8(PriceFeedProperty::FundingRateInterval as u8)?;
240 write_option_duration::<BO>(&mut writer, *interval)?;
241 }
242 PayloadPropertyValue::MarketSession(market_session) => {
243 writer.write_u8(PriceFeedProperty::MarketSession as u8)?;
244 writer.write_i16::<BO>((*market_session).into())?;
245 }
246 PayloadPropertyValue::EmaPrice(ema_price) => {
247 writer.write_u8(PriceFeedProperty::EmaPrice as u8)?;
248 write_option_price::<BO>(&mut writer, *ema_price)?;
249 }
250 PayloadPropertyValue::EmaConfidence(ema_confidence) => {
251 writer.write_u8(PriceFeedProperty::EmaConfidence as u8)?;
252 write_option_price::<BO>(&mut writer, *ema_confidence)?;
253 }
254 PayloadPropertyValue::FeedUpdateTimestamp(feed_update_timestamp) => {
255 writer.write_u8(PriceFeedProperty::FeedUpdateTimestamp as u8)?;
256 write_option_timestamp::<BO>(&mut writer, *feed_update_timestamp)?;
257 }
258 }
259 }
260 }
261 Ok(())
262 }
263
264 pub fn deserialize_slice_le(data: &[u8]) -> anyhow::Result<Self> {
265 Self::deserialize::<LE>(Cursor::new(data))
266 }
267
268 pub fn deserialize_slice_be(data: &[u8]) -> anyhow::Result<Self> {
269 Self::deserialize::<BE>(Cursor::new(data))
270 }
271
272 pub fn deserialize<BO: ByteOrder>(mut reader: impl Read) -> anyhow::Result<Self> {
273 let magic = reader.read_u32::<BO>()?;
274 if magic != PAYLOAD_FORMAT_MAGIC {
275 bail!("magic mismatch");
276 }
277 let timestamp_us = TimestampUs::from_micros(reader.read_u64::<BO>()?);
278 let channel_id = ChannelId(reader.read_u8()?);
279 let num_feeds = reader.read_u8()?;
280 let mut feeds = Vec::with_capacity(num_feeds.into());
281 for _ in 0..num_feeds {
282 let feed_id = PriceFeedId(reader.read_u32::<BO>()?);
283 let num_properties = reader.read_u8()?;
284 let mut feed = PayloadFeedData {
285 feed_id,
286 properties: Vec::with_capacity(num_properties.into()),
287 };
288 for _ in 0..num_properties {
289 let property = reader.read_u8()?;
290 let property =
291 PriceFeedProperty::from_repr(property).context("unknown property")?;
292 let value = match property {
293 PriceFeedProperty::Price => {
294 PayloadPropertyValue::Price(read_option_price::<BO>(&mut reader)?)
295 }
296 PriceFeedProperty::BestBidPrice => {
297 PayloadPropertyValue::BestBidPrice(read_option_price::<BO>(&mut reader)?)
298 }
299 PriceFeedProperty::BestAskPrice => {
300 PayloadPropertyValue::BestAskPrice(read_option_price::<BO>(&mut reader)?)
301 }
302 PriceFeedProperty::PublisherCount => {
303 PayloadPropertyValue::PublisherCount(reader.read_u16::<BO>()?)
304 }
305 PriceFeedProperty::Exponent => {
306 PayloadPropertyValue::Exponent(reader.read_i16::<BO>()?)
307 }
308 PriceFeedProperty::Confidence => {
309 PayloadPropertyValue::Confidence(read_option_price::<BO>(&mut reader)?)
310 }
311 PriceFeedProperty::FundingRate => {
312 PayloadPropertyValue::FundingRate(read_option_rate::<BO>(&mut reader)?)
313 }
314 PriceFeedProperty::FundingTimestamp => PayloadPropertyValue::FundingTimestamp(
315 read_option_timestamp::<BO>(&mut reader)?,
316 ),
317 PriceFeedProperty::FundingRateInterval => {
318 PayloadPropertyValue::FundingRateInterval(read_option_interval::<BO>(
319 &mut reader,
320 )?)
321 }
322 PriceFeedProperty::MarketSession => {
323 PayloadPropertyValue::MarketSession(reader.read_i16::<BO>()?.try_into()?)
324 }
325 PriceFeedProperty::EmaPrice => {
326 PayloadPropertyValue::EmaPrice(read_option_price::<BO>(&mut reader)?)
327 }
328 PriceFeedProperty::EmaConfidence => {
329 PayloadPropertyValue::EmaConfidence(read_option_price::<BO>(&mut reader)?)
330 }
331 PriceFeedProperty::FeedUpdateTimestamp => {
332 PayloadPropertyValue::FeedUpdateTimestamp(read_option_timestamp::<BO>(
333 &mut reader,
334 )?)
335 }
336 };
337 feed.properties.push(value);
338 }
339 feeds.push(feed);
340 }
341 Ok(Self {
342 timestamp_us,
343 channel_id,
344 feeds,
345 })
346 }
347}
348
349fn write_option_price<BO: ByteOrder>(
350 mut writer: impl Write,
351 value: Option<Price>,
352) -> std::io::Result<()> {
353 writer.write_i64::<BO>(value.map_or(0, |v| v.mantissa_i64()))
354}
355
356fn read_option_price<BO: ByteOrder>(mut reader: impl Read) -> std::io::Result<Option<Price>> {
357 let value = NonZeroI64::new(reader.read_i64::<BO>()?);
358 Ok(value.map(Price::from_nonzero_mantissa))
359}
360
361fn write_option_rate<BO: ByteOrder>(
362 mut writer: impl Write,
363 value: Option<Rate>,
364) -> std::io::Result<()> {
365 match value {
366 Some(value) => {
367 writer.write_u8(1)?;
368 writer.write_i64::<BO>(value.mantissa())
369 }
370 None => {
371 writer.write_u8(0)?;
372 Ok(())
373 }
374 }
375}
376
377fn read_option_rate<BO: ByteOrder>(mut reader: impl Read) -> std::io::Result<Option<Rate>> {
378 let present = reader.read_u8()? != 0;
379 if present {
380 Ok(Some(Rate::from_mantissa(reader.read_i64::<BO>()?)))
381 } else {
382 Ok(None)
383 }
384}
385
386fn write_option_timestamp<BO: ByteOrder>(
387 mut writer: impl Write,
388 value: Option<TimestampUs>,
389) -> std::io::Result<()> {
390 match value {
391 Some(value) => {
392 writer.write_u8(1)?;
393 writer.write_u64::<BO>(value.as_micros())
394 }
395 None => {
396 writer.write_u8(0)?;
397 Ok(())
398 }
399 }
400}
401
402fn read_option_timestamp<BO: ByteOrder>(
403 mut reader: impl Read,
404) -> std::io::Result<Option<TimestampUs>> {
405 let present = reader.read_u8()? != 0;
406 if present {
407 Ok(Some(TimestampUs::from_micros(reader.read_u64::<BO>()?)))
408 } else {
409 Ok(None)
410 }
411}
412
413fn write_option_duration<BO: ByteOrder>(
414 mut writer: impl Write,
415 value: Option<DurationUs>,
416) -> std::io::Result<()> {
417 match value {
418 Some(value) => {
419 writer.write_u8(1)?;
420 writer.write_u64::<BO>(value.as_micros())
421 }
422 None => {
423 writer.write_u8(0)?;
424 Ok(())
425 }
426 }
427}
428
429fn read_option_interval<BO: ByteOrder>(
430 mut reader: impl Read,
431) -> std::io::Result<Option<DurationUs>> {
432 let present = reader.read_u8()? != 0;
433 if present {
434 Ok(Some(DurationUs::from_micros(reader.read_u64::<BO>()?)))
435 } else {
436 Ok(None)
437 }
438}
439
440#[cfg(test)]
441mod tests {
442 use crate::{
443 api::MarketSession,
444 message::SolanaMessage,
445 payload::{PayloadData, PayloadPropertyValue},
446 time::TimestampUs,
447 ChannelId, Price, PriceFeedId,
448 };
449
450 #[test]
451 fn parse_payload() {
452 let payload =
453 "b9011a82c7887f3aaa5845b20d6bf5ca6609953b57650fa4579a4b4d34a4980ba608a9f76a825a446\
454 f3d6c1fd9daca1c5e3fc46980f14ef89c1a886c6e9e5c510872d30f80efc1f480c5615af3fb673d422\
455 87e993da9fbc3506b6e41dfa32950820c2e6c620075d3c7934077d115064b06000301010000000d009\
456 032fc171b060000014e3a0cff1a06000002bcda8a211b06000003120004f8ff05fc0b7159000000000\
457 600070008000900000aa06616362e0600000b804f93312e0600000c014077d115064b0600";
458 let message = SolanaMessage::deserialize_slice(&hex::decode(payload).unwrap()).unwrap();
459 let payload = PayloadData::deserialize_slice_le(&message.payload).unwrap();
460 assert_eq!(
461 payload.timestamp_us,
462 TimestampUs::from_micros(1771339368200000)
463 );
464 assert_eq!(payload.channel_id, ChannelId::FIXED_RATE_200);
465 assert_eq!(payload.feeds.len(), 1);
466 let feed = &payload.feeds[0];
467 assert_eq!(feed.feed_id, PriceFeedId(1));
468 assert_eq!(feed.properties.len(), 13);
469 assert_eq!(
470 feed.properties[0],
471 PayloadPropertyValue::Price(Some(Price::from_mantissa(6713436287632).unwrap()))
472 );
473 assert_eq!(
474 feed.properties[1],
475 PayloadPropertyValue::BestBidPrice(Some(Price::from_mantissa(6713017907790).unwrap()))
476 );
477 assert_eq!(
478 feed.properties[2],
479 PayloadPropertyValue::BestAskPrice(Some(Price::from_mantissa(6713596631740).unwrap()))
480 );
481 assert_eq!(feed.properties[3], PayloadPropertyValue::PublisherCount(18));
482 assert_eq!(feed.properties[4], PayloadPropertyValue::Exponent(-8));
483 assert_eq!(
484 feed.properties[5],
485 PayloadPropertyValue::Confidence(Some(Price::from_mantissa(1500580860).unwrap()))
486 );
487 assert_eq!(feed.properties[6], PayloadPropertyValue::FundingRate(None));
488 assert_eq!(
489 feed.properties[7],
490 PayloadPropertyValue::FundingTimestamp(None)
491 );
492 assert_eq!(
493 feed.properties[8],
494 PayloadPropertyValue::FundingRateInterval(None)
495 );
496 assert_eq!(
497 feed.properties[9],
498 PayloadPropertyValue::MarketSession(MarketSession::Regular)
499 );
500 assert_eq!(
501 feed.properties[10],
502 PayloadPropertyValue::EmaPrice(Some(Price::from_mantissa(6795545700000).unwrap()))
503 );
504 assert_eq!(
505 feed.properties[11],
506 PayloadPropertyValue::EmaConfidence(Some(Price::from_mantissa(6795470000000).unwrap()))
507 );
508 assert_eq!(
509 feed.properties[12],
510 PayloadPropertyValue::FeedUpdateTimestamp(Some(TimestampUs::from_micros(
511 1771339368200000
512 )))
513 );
514 }
515}