1use {
2 crate::{
3 api::MarketSession,
4 price::Price,
5 rate::Rate,
6 time::{DurationUs, TimestampUs},
7 ChannelId, PriceFeedId, PriceFeedProperty, PublisherDatapoint, PublisherId,
8 },
9 anyhow::Context,
10};
11use {
12 anyhow::bail,
13 byteorder::{ByteOrder, ReadBytesExt, WriteBytesExt, BE, LE},
14 serde::{Deserialize, Serialize},
15 std::{
16 io::{Cursor, Read, Write},
17 num::NonZeroI64,
18 },
19};
20
21#[derive(Debug, Clone, PartialEq, Eq, Hash)]
23pub struct PayloadData {
24 pub timestamp_us: TimestampUs,
25 pub channel_id: ChannelId,
26 pub feeds: Vec<PayloadFeedData>,
28}
29
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
31pub struct PayloadFeedData {
32 pub feed_id: PriceFeedId,
33 pub properties: Vec<PayloadPropertyValue>,
35}
36
37#[derive(Debug, Clone, PartialEq, Eq, Hash)]
38pub enum PayloadPropertyValue {
39 Price(Option<Price>),
40 BestBidPrice(Option<Price>),
41 BestAskPrice(Option<Price>),
42 PublisherCount(u16),
43 Exponent(i16),
44 Confidence(Option<Price>),
45 FundingRate(Option<Rate>),
46 FundingTimestamp(Option<TimestampUs>),
47 FundingRateInterval(Option<DurationUs>),
48 MarketSession(MarketSession),
49 EmaPrice(Option<Price>),
50 EmaConfidence(Option<Price>),
51 FeedUpdateTimestamp(Option<TimestampUs>),
52}
53
54#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
55pub struct AggregatedPriceFeedData {
56 pub price: Option<Price>,
57 pub best_bid_price: Option<Price>,
58 pub best_ask_price: Option<Price>,
59 pub publisher_count: u16,
60 pub exponent: i16,
61 pub confidence: Option<Price>,
62 pub funding_rate: Option<Rate>,
63 pub funding_timestamp: Option<TimestampUs>,
64 pub funding_rate_interval: Option<DurationUs>,
65 pub market_session: MarketSession,
66 pub ema_price: Option<Price>,
67 pub ema_confidence: Option<Price>,
68 pub feed_update_timestamp: Option<TimestampUs>,
69 pub publisher_ids: Vec<PublisherId>,
70 pub publisher_data: Vec<PublisherDatapoint>,
71}
72
73impl AggregatedPriceFeedData {
74 pub fn empty(exponent: i16, market_session: MarketSession, now: TimestampUs) -> Self {
75 Self {
76 price: None,
77 best_bid_price: None,
78 best_ask_price: None,
79 publisher_count: 0,
80 exponent,
81 confidence: None,
82 funding_rate: None,
83 funding_timestamp: None,
84 funding_rate_interval: None,
85 market_session,
86 ema_price: None,
87 ema_confidence: None,
88 feed_update_timestamp: Some(now),
89 publisher_ids: Vec::new(),
90 publisher_data: Vec::new(),
91 }
92 }
93}
94
95pub const PAYLOAD_FORMAT_MAGIC: u32 = 2479346549;
98
99impl PayloadData {
100 pub fn new(
101 timestamp_us: TimestampUs,
102 channel_id: ChannelId,
103 feeds: &[(PriceFeedId, AggregatedPriceFeedData)],
104 requested_properties: &[PriceFeedProperty],
105 ) -> Self {
106 Self {
107 timestamp_us,
108 channel_id,
109 feeds: feeds
110 .iter()
111 .map(|(feed_id, feed)| PayloadFeedData {
112 feed_id: *feed_id,
113 properties: requested_properties
114 .iter()
115 .map(|property| match property {
116 PriceFeedProperty::Price => PayloadPropertyValue::Price(feed.price),
117 PriceFeedProperty::BestBidPrice => {
118 PayloadPropertyValue::BestBidPrice(feed.best_bid_price)
119 }
120 PriceFeedProperty::BestAskPrice => {
121 PayloadPropertyValue::BestAskPrice(feed.best_ask_price)
122 }
123 PriceFeedProperty::PublisherCount => {
124 PayloadPropertyValue::PublisherCount(feed.publisher_count)
125 }
126 PriceFeedProperty::Exponent => {
127 PayloadPropertyValue::Exponent(feed.exponent)
128 }
129 PriceFeedProperty::Confidence => {
130 PayloadPropertyValue::Confidence(feed.confidence)
131 }
132 PriceFeedProperty::FundingRate => {
133 PayloadPropertyValue::FundingRate(feed.funding_rate)
134 }
135 PriceFeedProperty::FundingTimestamp => {
136 PayloadPropertyValue::FundingTimestamp(feed.funding_timestamp)
137 }
138 PriceFeedProperty::FundingRateInterval => {
139 PayloadPropertyValue::FundingRateInterval(
140 feed.funding_rate_interval,
141 )
142 }
143 PriceFeedProperty::MarketSession => {
144 PayloadPropertyValue::MarketSession(feed.market_session)
145 }
146 PriceFeedProperty::EmaPrice => {
147 PayloadPropertyValue::EmaPrice(feed.ema_price)
148 }
149 PriceFeedProperty::EmaConfidence => {
150 PayloadPropertyValue::EmaConfidence(feed.ema_confidence)
151 }
152 PriceFeedProperty::FeedUpdateTimestamp => {
153 PayloadPropertyValue::FeedUpdateTimestamp(
154 feed.feed_update_timestamp,
155 )
156 }
157 })
158 .collect(),
159 })
160 .collect(),
161 }
162 }
163
164 pub fn serialize<BO: ByteOrder>(&self, mut writer: impl Write) -> anyhow::Result<()> {
165 writer.write_u32::<BO>(PAYLOAD_FORMAT_MAGIC)?;
166 writer.write_u64::<BO>(self.timestamp_us.as_micros())?;
167 writer.write_u8(self.channel_id.0)?;
168 writer.write_u8(self.feeds.len().try_into()?)?;
169 for feed in &self.feeds {
170 writer.write_u32::<BO>(feed.feed_id.0)?;
171 writer.write_u8(feed.properties.len().try_into()?)?;
172 for property in &feed.properties {
173 match property {
174 PayloadPropertyValue::Price(price) => {
175 writer.write_u8(PriceFeedProperty::Price as u8)?;
176 write_option_price::<BO>(&mut writer, *price)?;
177 }
178 PayloadPropertyValue::BestBidPrice(price) => {
179 writer.write_u8(PriceFeedProperty::BestBidPrice as u8)?;
180 write_option_price::<BO>(&mut writer, *price)?;
181 }
182 PayloadPropertyValue::BestAskPrice(price) => {
183 writer.write_u8(PriceFeedProperty::BestAskPrice as u8)?;
184 write_option_price::<BO>(&mut writer, *price)?;
185 }
186 PayloadPropertyValue::PublisherCount(count) => {
187 writer.write_u8(PriceFeedProperty::PublisherCount as u8)?;
188 writer.write_u16::<BO>(*count)?;
189 }
190 PayloadPropertyValue::Exponent(exponent) => {
191 writer.write_u8(PriceFeedProperty::Exponent as u8)?;
192 writer.write_i16::<BO>(*exponent)?;
193 }
194 PayloadPropertyValue::Confidence(confidence) => {
195 writer.write_u8(PriceFeedProperty::Confidence as u8)?;
196 write_option_price::<BO>(&mut writer, *confidence)?;
197 }
198 PayloadPropertyValue::FundingRate(rate) => {
199 writer.write_u8(PriceFeedProperty::FundingRate as u8)?;
200 write_option_rate::<BO>(&mut writer, *rate)?;
201 }
202 PayloadPropertyValue::FundingTimestamp(timestamp) => {
203 writer.write_u8(PriceFeedProperty::FundingTimestamp as u8)?;
204 write_option_timestamp::<BO>(&mut writer, *timestamp)?;
205 }
206 PayloadPropertyValue::FundingRateInterval(interval) => {
207 writer.write_u8(PriceFeedProperty::FundingRateInterval as u8)?;
208 write_option_duration::<BO>(&mut writer, *interval)?;
209 }
210 PayloadPropertyValue::MarketSession(market_session) => {
211 writer.write_u8(PriceFeedProperty::MarketSession as u8)?;
212 writer.write_i16::<BO>((*market_session).into())?;
213 }
214 PayloadPropertyValue::EmaPrice(ema_price) => {
215 writer.write_u8(PriceFeedProperty::EmaPrice as u8)?;
216 write_option_price::<BO>(&mut writer, *ema_price)?;
217 }
218 PayloadPropertyValue::EmaConfidence(ema_confidence) => {
219 writer.write_u8(PriceFeedProperty::EmaConfidence as u8)?;
220 write_option_price::<BO>(&mut writer, *ema_confidence)?;
221 }
222 PayloadPropertyValue::FeedUpdateTimestamp(feed_update_timestamp) => {
223 writer.write_u8(PriceFeedProperty::FeedUpdateTimestamp as u8)?;
224 write_option_timestamp::<BO>(&mut writer, *feed_update_timestamp)?;
225 }
226 }
227 }
228 }
229 Ok(())
230 }
231
232 pub fn deserialize_slice_le(data: &[u8]) -> anyhow::Result<Self> {
233 Self::deserialize::<LE>(Cursor::new(data))
234 }
235
236 pub fn deserialize_slice_be(data: &[u8]) -> anyhow::Result<Self> {
237 Self::deserialize::<BE>(Cursor::new(data))
238 }
239
240 pub fn deserialize<BO: ByteOrder>(mut reader: impl Read) -> anyhow::Result<Self> {
241 let magic = reader.read_u32::<BO>()?;
242 if magic != PAYLOAD_FORMAT_MAGIC {
243 bail!("magic mismatch");
244 }
245 let timestamp_us = TimestampUs::from_micros(reader.read_u64::<BO>()?);
246 let channel_id = ChannelId(reader.read_u8()?);
247 let num_feeds = reader.read_u8()?;
248 let mut feeds = Vec::with_capacity(num_feeds.into());
249 for _ in 0..num_feeds {
250 let feed_id = PriceFeedId(reader.read_u32::<BO>()?);
251 let num_properties = reader.read_u8()?;
252 let mut feed = PayloadFeedData {
253 feed_id,
254 properties: Vec::with_capacity(num_properties.into()),
255 };
256 for _ in 0..num_properties {
257 let property = reader.read_u8()?;
258 let property =
259 PriceFeedProperty::from_repr(property).context("unknown property")?;
260 let value = match property {
261 PriceFeedProperty::Price => {
262 PayloadPropertyValue::Price(read_option_price::<BO>(&mut reader)?)
263 }
264 PriceFeedProperty::BestBidPrice => {
265 PayloadPropertyValue::BestBidPrice(read_option_price::<BO>(&mut reader)?)
266 }
267 PriceFeedProperty::BestAskPrice => {
268 PayloadPropertyValue::BestAskPrice(read_option_price::<BO>(&mut reader)?)
269 }
270 PriceFeedProperty::PublisherCount => {
271 PayloadPropertyValue::PublisherCount(reader.read_u16::<BO>()?)
272 }
273 PriceFeedProperty::Exponent => {
274 PayloadPropertyValue::Exponent(reader.read_i16::<BO>()?)
275 }
276 PriceFeedProperty::Confidence => {
277 PayloadPropertyValue::Confidence(read_option_price::<BO>(&mut reader)?)
278 }
279 PriceFeedProperty::FundingRate => {
280 PayloadPropertyValue::FundingRate(read_option_rate::<BO>(&mut reader)?)
281 }
282 PriceFeedProperty::FundingTimestamp => PayloadPropertyValue::FundingTimestamp(
283 read_option_timestamp::<BO>(&mut reader)?,
284 ),
285 PriceFeedProperty::FundingRateInterval => {
286 PayloadPropertyValue::FundingRateInterval(read_option_interval::<BO>(
287 &mut reader,
288 )?)
289 }
290 PriceFeedProperty::MarketSession => {
291 PayloadPropertyValue::MarketSession(reader.read_i16::<BO>()?.try_into()?)
292 }
293 PriceFeedProperty::EmaPrice => {
294 PayloadPropertyValue::EmaPrice(read_option_price::<BO>(&mut reader)?)
295 }
296 PriceFeedProperty::EmaConfidence => {
297 PayloadPropertyValue::EmaConfidence(read_option_price::<BO>(&mut reader)?)
298 }
299 PriceFeedProperty::FeedUpdateTimestamp => {
300 PayloadPropertyValue::FeedUpdateTimestamp(read_option_timestamp::<BO>(
301 &mut reader,
302 )?)
303 }
304 };
305 feed.properties.push(value);
306 }
307 feeds.push(feed);
308 }
309 Ok(Self {
310 timestamp_us,
311 channel_id,
312 feeds,
313 })
314 }
315}
316
317fn write_option_price<BO: ByteOrder>(
318 mut writer: impl Write,
319 value: Option<Price>,
320) -> std::io::Result<()> {
321 writer.write_i64::<BO>(value.map_or(0, |v| v.mantissa_i64()))
322}
323
324fn read_option_price<BO: ByteOrder>(mut reader: impl Read) -> std::io::Result<Option<Price>> {
325 let value = NonZeroI64::new(reader.read_i64::<BO>()?);
326 Ok(value.map(Price::from_nonzero_mantissa))
327}
328
329fn write_option_rate<BO: ByteOrder>(
330 mut writer: impl Write,
331 value: Option<Rate>,
332) -> std::io::Result<()> {
333 match value {
334 Some(value) => {
335 writer.write_u8(1)?;
336 writer.write_i64::<BO>(value.mantissa())
337 }
338 None => {
339 writer.write_u8(0)?;
340 Ok(())
341 }
342 }
343}
344
345fn read_option_rate<BO: ByteOrder>(mut reader: impl Read) -> std::io::Result<Option<Rate>> {
346 let present = reader.read_u8()? != 0;
347 if present {
348 Ok(Some(Rate::from_mantissa(reader.read_i64::<BO>()?)))
349 } else {
350 Ok(None)
351 }
352}
353
354fn write_option_timestamp<BO: ByteOrder>(
355 mut writer: impl Write,
356 value: Option<TimestampUs>,
357) -> std::io::Result<()> {
358 match value {
359 Some(value) => {
360 writer.write_u8(1)?;
361 writer.write_u64::<BO>(value.as_micros())
362 }
363 None => {
364 writer.write_u8(0)?;
365 Ok(())
366 }
367 }
368}
369
370fn read_option_timestamp<BO: ByteOrder>(
371 mut reader: impl Read,
372) -> std::io::Result<Option<TimestampUs>> {
373 let present = reader.read_u8()? != 0;
374 if present {
375 Ok(Some(TimestampUs::from_micros(reader.read_u64::<BO>()?)))
376 } else {
377 Ok(None)
378 }
379}
380
381fn write_option_duration<BO: ByteOrder>(
382 mut writer: impl Write,
383 value: Option<DurationUs>,
384) -> std::io::Result<()> {
385 match value {
386 Some(value) => {
387 writer.write_u8(1)?;
388 writer.write_u64::<BO>(value.as_micros())
389 }
390 None => {
391 writer.write_u8(0)?;
392 Ok(())
393 }
394 }
395}
396
397fn read_option_interval<BO: ByteOrder>(
398 mut reader: impl Read,
399) -> std::io::Result<Option<DurationUs>> {
400 let present = reader.read_u8()? != 0;
401 if present {
402 Ok(Some(DurationUs::from_micros(reader.read_u64::<BO>()?)))
403 } else {
404 Ok(None)
405 }
406}
407
408#[cfg(test)]
409mod tests {
410 use crate::{
411 api::MarketSession,
412 message::SolanaMessage,
413 payload::{PayloadData, PayloadPropertyValue},
414 time::TimestampUs,
415 ChannelId, Price, PriceFeedId,
416 };
417
418 #[test]
419 fn parse_payload() {
420 let payload =
421 "b9011a82c7887f3aaa5845b20d6bf5ca6609953b57650fa4579a4b4d34a4980ba608a9f76a825a446\
422 f3d6c1fd9daca1c5e3fc46980f14ef89c1a886c6e9e5c510872d30f80efc1f480c5615af3fb673d422\
423 87e993da9fbc3506b6e41dfa32950820c2e6c620075d3c7934077d115064b06000301010000000d009\
424 032fc171b060000014e3a0cff1a06000002bcda8a211b06000003120004f8ff05fc0b7159000000000\
425 600070008000900000aa06616362e0600000b804f93312e0600000c014077d115064b0600";
426 let message = SolanaMessage::deserialize_slice(&hex::decode(payload).unwrap()).unwrap();
427 let payload = PayloadData::deserialize_slice_le(&message.payload).unwrap();
428 assert_eq!(
429 payload.timestamp_us,
430 TimestampUs::from_micros(1771339368200000)
431 );
432 assert_eq!(payload.channel_id, ChannelId::FIXED_RATE_200);
433 assert_eq!(payload.feeds.len(), 1);
434 let feed = &payload.feeds[0];
435 assert_eq!(feed.feed_id, PriceFeedId(1));
436 assert_eq!(feed.properties.len(), 13);
437 assert_eq!(
438 feed.properties[0],
439 PayloadPropertyValue::Price(Some(Price::from_mantissa(6713436287632).unwrap()))
440 );
441 assert_eq!(
442 feed.properties[1],
443 PayloadPropertyValue::BestBidPrice(Some(Price::from_mantissa(6713017907790).unwrap()))
444 );
445 assert_eq!(
446 feed.properties[2],
447 PayloadPropertyValue::BestAskPrice(Some(Price::from_mantissa(6713596631740).unwrap()))
448 );
449 assert_eq!(feed.properties[3], PayloadPropertyValue::PublisherCount(18));
450 assert_eq!(feed.properties[4], PayloadPropertyValue::Exponent(-8));
451 assert_eq!(
452 feed.properties[5],
453 PayloadPropertyValue::Confidence(Some(Price::from_mantissa(1500580860).unwrap()))
454 );
455 assert_eq!(feed.properties[6], PayloadPropertyValue::FundingRate(None));
456 assert_eq!(
457 feed.properties[7],
458 PayloadPropertyValue::FundingTimestamp(None)
459 );
460 assert_eq!(
461 feed.properties[8],
462 PayloadPropertyValue::FundingRateInterval(None)
463 );
464 assert_eq!(
465 feed.properties[9],
466 PayloadPropertyValue::MarketSession(MarketSession::Regular)
467 );
468 assert_eq!(
469 feed.properties[10],
470 PayloadPropertyValue::EmaPrice(Some(Price::from_mantissa(6795545700000).unwrap()))
471 );
472 assert_eq!(
473 feed.properties[11],
474 PayloadPropertyValue::EmaConfidence(Some(Price::from_mantissa(6795470000000).unwrap()))
475 );
476 assert_eq!(
477 feed.properties[12],
478 PayloadPropertyValue::FeedUpdateTimestamp(Some(TimestampUs::from_micros(
479 1771339368200000
480 )))
481 );
482 }
483}