1use {
2 crate::{
3 api::MarketSession,
4 price::Price,
5 rate::Rate,
6 time::{DurationUs, TimestampUs},
7 ChannelId, PriceFeedId, PriceFeedProperty,
8 },
9 anyhow::Context,
10};
11use {
12 anyhow::bail,
13 byteorder::{ByteOrder, ReadBytesExt, WriteBytesExt, BE, LE},
14 serde::{Deserialize, Serialize},
15 std::{
16 io::{Cursor, Read, Write},
17 num::NonZeroI64,
18 },
19};
20
21#[derive(Debug, Clone, PartialEq, Eq, Hash)]
23pub struct PayloadData {
24 pub timestamp_us: TimestampUs,
25 pub channel_id: ChannelId,
26 pub feeds: Vec<PayloadFeedData>,
28}
29
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
31pub struct PayloadFeedData {
32 pub feed_id: PriceFeedId,
33 pub properties: Vec<PayloadPropertyValue>,
35}
36
37#[derive(Debug, Clone, PartialEq, Eq, Hash)]
38pub enum PayloadPropertyValue {
39 Price(Option<Price>),
40 BestBidPrice(Option<Price>),
41 BestAskPrice(Option<Price>),
42 PublisherCount(u16),
43 Exponent(i16),
44 Confidence(Option<Price>),
45 FundingRate(Option<Rate>),
46 FundingTimestamp(Option<TimestampUs>),
47 FundingRateInterval(Option<DurationUs>),
48 MarketSession(MarketSession),
49 EmaPrice(Option<Price>),
50 EmaConfidence(Option<Price>),
51 FeedUpdateTimestamp(Option<TimestampUs>),
52}
53
54#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
55pub struct AggregatedPriceFeedData {
56 pub price: Option<Price>,
57 pub best_bid_price: Option<Price>,
58 pub best_ask_price: Option<Price>,
59 pub publisher_count: u16,
60 pub exponent: i16,
61 pub confidence: Option<Price>,
62 pub funding_rate: Option<Rate>,
63 pub funding_timestamp: Option<TimestampUs>,
64 pub funding_rate_interval: Option<DurationUs>,
65 pub market_session: MarketSession,
66 pub ema_price: Option<Price>,
67 pub ema_confidence: Option<Price>,
68 pub feed_update_timestamp: Option<TimestampUs>,
69}
70
71impl AggregatedPriceFeedData {
72 pub fn empty(exponent: i16, market_session: MarketSession, now: TimestampUs) -> Self {
73 Self {
74 price: None,
75 best_bid_price: None,
76 best_ask_price: None,
77 publisher_count: 0,
78 exponent,
79 confidence: None,
80 funding_rate: None,
81 funding_timestamp: None,
82 funding_rate_interval: None,
83 market_session,
84 ema_price: None,
85 ema_confidence: None,
86 feed_update_timestamp: Some(now),
87 }
88 }
89}
90
91pub const PAYLOAD_FORMAT_MAGIC: u32 = 2479346549;
94
95impl PayloadData {
96 pub fn new(
97 timestamp_us: TimestampUs,
98 channel_id: ChannelId,
99 feeds: &[(PriceFeedId, AggregatedPriceFeedData)],
100 requested_properties: &[PriceFeedProperty],
101 ) -> Self {
102 Self {
103 timestamp_us,
104 channel_id,
105 feeds: feeds
106 .iter()
107 .map(|(feed_id, feed)| PayloadFeedData {
108 feed_id: *feed_id,
109 properties: requested_properties
110 .iter()
111 .map(|property| match property {
112 PriceFeedProperty::Price => PayloadPropertyValue::Price(feed.price),
113 PriceFeedProperty::BestBidPrice => {
114 PayloadPropertyValue::BestBidPrice(feed.best_bid_price)
115 }
116 PriceFeedProperty::BestAskPrice => {
117 PayloadPropertyValue::BestAskPrice(feed.best_ask_price)
118 }
119 PriceFeedProperty::PublisherCount => {
120 PayloadPropertyValue::PublisherCount(feed.publisher_count)
121 }
122 PriceFeedProperty::Exponent => {
123 PayloadPropertyValue::Exponent(feed.exponent)
124 }
125 PriceFeedProperty::Confidence => {
126 PayloadPropertyValue::Confidence(feed.confidence)
127 }
128 PriceFeedProperty::FundingRate => {
129 PayloadPropertyValue::FundingRate(feed.funding_rate)
130 }
131 PriceFeedProperty::FundingTimestamp => {
132 PayloadPropertyValue::FundingTimestamp(feed.funding_timestamp)
133 }
134 PriceFeedProperty::FundingRateInterval => {
135 PayloadPropertyValue::FundingRateInterval(
136 feed.funding_rate_interval,
137 )
138 }
139 PriceFeedProperty::MarketSession => {
140 PayloadPropertyValue::MarketSession(feed.market_session)
141 }
142 PriceFeedProperty::EmaPrice => {
143 PayloadPropertyValue::EmaPrice(feed.ema_price)
144 }
145 PriceFeedProperty::EmaConfidence => {
146 PayloadPropertyValue::EmaConfidence(feed.ema_confidence)
147 }
148 PriceFeedProperty::FeedUpdateTimestamp => {
149 PayloadPropertyValue::FeedUpdateTimestamp(
150 feed.feed_update_timestamp,
151 )
152 }
153 })
154 .collect(),
155 })
156 .collect(),
157 }
158 }
159
160 pub fn serialize<BO: ByteOrder>(&self, mut writer: impl Write) -> anyhow::Result<()> {
161 writer.write_u32::<BO>(PAYLOAD_FORMAT_MAGIC)?;
162 writer.write_u64::<BO>(self.timestamp_us.as_micros())?;
163 writer.write_u8(self.channel_id.0)?;
164 writer.write_u8(self.feeds.len().try_into()?)?;
165 for feed in &self.feeds {
166 writer.write_u32::<BO>(feed.feed_id.0)?;
167 writer.write_u8(feed.properties.len().try_into()?)?;
168 for property in &feed.properties {
169 match property {
170 PayloadPropertyValue::Price(price) => {
171 writer.write_u8(PriceFeedProperty::Price as u8)?;
172 write_option_price::<BO>(&mut writer, *price)?;
173 }
174 PayloadPropertyValue::BestBidPrice(price) => {
175 writer.write_u8(PriceFeedProperty::BestBidPrice as u8)?;
176 write_option_price::<BO>(&mut writer, *price)?;
177 }
178 PayloadPropertyValue::BestAskPrice(price) => {
179 writer.write_u8(PriceFeedProperty::BestAskPrice as u8)?;
180 write_option_price::<BO>(&mut writer, *price)?;
181 }
182 PayloadPropertyValue::PublisherCount(count) => {
183 writer.write_u8(PriceFeedProperty::PublisherCount as u8)?;
184 writer.write_u16::<BO>(*count)?;
185 }
186 PayloadPropertyValue::Exponent(exponent) => {
187 writer.write_u8(PriceFeedProperty::Exponent as u8)?;
188 writer.write_i16::<BO>(*exponent)?;
189 }
190 PayloadPropertyValue::Confidence(confidence) => {
191 writer.write_u8(PriceFeedProperty::Confidence as u8)?;
192 write_option_price::<BO>(&mut writer, *confidence)?;
193 }
194 PayloadPropertyValue::FundingRate(rate) => {
195 writer.write_u8(PriceFeedProperty::FundingRate as u8)?;
196 write_option_rate::<BO>(&mut writer, *rate)?;
197 }
198 PayloadPropertyValue::FundingTimestamp(timestamp) => {
199 writer.write_u8(PriceFeedProperty::FundingTimestamp as u8)?;
200 write_option_timestamp::<BO>(&mut writer, *timestamp)?;
201 }
202 PayloadPropertyValue::FundingRateInterval(interval) => {
203 writer.write_u8(PriceFeedProperty::FundingRateInterval as u8)?;
204 write_option_duration::<BO>(&mut writer, *interval)?;
205 }
206 PayloadPropertyValue::MarketSession(market_session) => {
207 writer.write_u8(PriceFeedProperty::MarketSession as u8)?;
208 writer.write_i16::<BO>((*market_session).into())?;
209 }
210 PayloadPropertyValue::EmaPrice(ema_price) => {
211 writer.write_u8(PriceFeedProperty::EmaPrice as u8)?;
212 write_option_price::<BO>(&mut writer, *ema_price)?;
213 }
214 PayloadPropertyValue::EmaConfidence(ema_confidence) => {
215 writer.write_u8(PriceFeedProperty::EmaConfidence as u8)?;
216 write_option_price::<BO>(&mut writer, *ema_confidence)?;
217 }
218 PayloadPropertyValue::FeedUpdateTimestamp(feed_update_timestamp) => {
219 writer.write_u8(PriceFeedProperty::FeedUpdateTimestamp as u8)?;
220 write_option_timestamp::<BO>(&mut writer, *feed_update_timestamp)?;
221 }
222 }
223 }
224 }
225 Ok(())
226 }
227
228 pub fn deserialize_slice_le(data: &[u8]) -> anyhow::Result<Self> {
229 Self::deserialize::<LE>(Cursor::new(data))
230 }
231
232 pub fn deserialize_slice_be(data: &[u8]) -> anyhow::Result<Self> {
233 Self::deserialize::<BE>(Cursor::new(data))
234 }
235
236 pub fn deserialize<BO: ByteOrder>(mut reader: impl Read) -> anyhow::Result<Self> {
237 let magic = reader.read_u32::<BO>()?;
238 if magic != PAYLOAD_FORMAT_MAGIC {
239 bail!("magic mismatch");
240 }
241 let timestamp_us = TimestampUs::from_micros(reader.read_u64::<BO>()?);
242 let channel_id = ChannelId(reader.read_u8()?);
243 let num_feeds = reader.read_u8()?;
244 let mut feeds = Vec::with_capacity(num_feeds.into());
245 for _ in 0..num_feeds {
246 let feed_id = PriceFeedId(reader.read_u32::<BO>()?);
247 let num_properties = reader.read_u8()?;
248 let mut feed = PayloadFeedData {
249 feed_id,
250 properties: Vec::with_capacity(num_properties.into()),
251 };
252 for _ in 0..num_properties {
253 let property = reader.read_u8()?;
254 let property =
255 PriceFeedProperty::from_repr(property).context("unknown property")?;
256 let value = match property {
257 PriceFeedProperty::Price => {
258 PayloadPropertyValue::Price(read_option_price::<BO>(&mut reader)?)
259 }
260 PriceFeedProperty::BestBidPrice => {
261 PayloadPropertyValue::BestBidPrice(read_option_price::<BO>(&mut reader)?)
262 }
263 PriceFeedProperty::BestAskPrice => {
264 PayloadPropertyValue::BestAskPrice(read_option_price::<BO>(&mut reader)?)
265 }
266 PriceFeedProperty::PublisherCount => {
267 PayloadPropertyValue::PublisherCount(reader.read_u16::<BO>()?)
268 }
269 PriceFeedProperty::Exponent => {
270 PayloadPropertyValue::Exponent(reader.read_i16::<BO>()?)
271 }
272 PriceFeedProperty::Confidence => {
273 PayloadPropertyValue::Confidence(read_option_price::<BO>(&mut reader)?)
274 }
275 PriceFeedProperty::FundingRate => {
276 PayloadPropertyValue::FundingRate(read_option_rate::<BO>(&mut reader)?)
277 }
278 PriceFeedProperty::FundingTimestamp => PayloadPropertyValue::FundingTimestamp(
279 read_option_timestamp::<BO>(&mut reader)?,
280 ),
281 PriceFeedProperty::FundingRateInterval => {
282 PayloadPropertyValue::FundingRateInterval(read_option_interval::<BO>(
283 &mut reader,
284 )?)
285 }
286 PriceFeedProperty::MarketSession => {
287 PayloadPropertyValue::MarketSession(reader.read_i16::<BO>()?.try_into()?)
288 }
289 PriceFeedProperty::EmaPrice => {
290 PayloadPropertyValue::EmaPrice(read_option_price::<BO>(&mut reader)?)
291 }
292 PriceFeedProperty::EmaConfidence => {
293 PayloadPropertyValue::EmaConfidence(read_option_price::<BO>(&mut reader)?)
294 }
295 PriceFeedProperty::FeedUpdateTimestamp => {
296 PayloadPropertyValue::FeedUpdateTimestamp(read_option_timestamp::<BO>(
297 &mut reader,
298 )?)
299 }
300 };
301 feed.properties.push(value);
302 }
303 feeds.push(feed);
304 }
305 Ok(Self {
306 timestamp_us,
307 channel_id,
308 feeds,
309 })
310 }
311}
312
313fn write_option_price<BO: ByteOrder>(
314 mut writer: impl Write,
315 value: Option<Price>,
316) -> std::io::Result<()> {
317 writer.write_i64::<BO>(value.map_or(0, |v| v.mantissa_i64()))
318}
319
320fn read_option_price<BO: ByteOrder>(mut reader: impl Read) -> std::io::Result<Option<Price>> {
321 let value = NonZeroI64::new(reader.read_i64::<BO>()?);
322 Ok(value.map(Price::from_nonzero_mantissa))
323}
324
325fn write_option_rate<BO: ByteOrder>(
326 mut writer: impl Write,
327 value: Option<Rate>,
328) -> std::io::Result<()> {
329 match value {
330 Some(value) => {
331 writer.write_u8(1)?;
332 writer.write_i64::<BO>(value.mantissa())
333 }
334 None => {
335 writer.write_u8(0)?;
336 Ok(())
337 }
338 }
339}
340
341fn read_option_rate<BO: ByteOrder>(mut reader: impl Read) -> std::io::Result<Option<Rate>> {
342 let present = reader.read_u8()? != 0;
343 if present {
344 Ok(Some(Rate::from_mantissa(reader.read_i64::<BO>()?)))
345 } else {
346 Ok(None)
347 }
348}
349
350fn write_option_timestamp<BO: ByteOrder>(
351 mut writer: impl Write,
352 value: Option<TimestampUs>,
353) -> std::io::Result<()> {
354 match value {
355 Some(value) => {
356 writer.write_u8(1)?;
357 writer.write_u64::<BO>(value.as_micros())
358 }
359 None => {
360 writer.write_u8(0)?;
361 Ok(())
362 }
363 }
364}
365
366fn read_option_timestamp<BO: ByteOrder>(
367 mut reader: impl Read,
368) -> std::io::Result<Option<TimestampUs>> {
369 let present = reader.read_u8()? != 0;
370 if present {
371 Ok(Some(TimestampUs::from_micros(reader.read_u64::<BO>()?)))
372 } else {
373 Ok(None)
374 }
375}
376
377fn write_option_duration<BO: ByteOrder>(
378 mut writer: impl Write,
379 value: Option<DurationUs>,
380) -> std::io::Result<()> {
381 match value {
382 Some(value) => {
383 writer.write_u8(1)?;
384 writer.write_u64::<BO>(value.as_micros())
385 }
386 None => {
387 writer.write_u8(0)?;
388 Ok(())
389 }
390 }
391}
392
393fn read_option_interval<BO: ByteOrder>(
394 mut reader: impl Read,
395) -> std::io::Result<Option<DurationUs>> {
396 let present = reader.read_u8()? != 0;
397 if present {
398 Ok(Some(DurationUs::from_micros(reader.read_u64::<BO>()?)))
399 } else {
400 Ok(None)
401 }
402}
403
404#[cfg(test)]
405mod tests {
406 use crate::{
407 api::MarketSession,
408 message::SolanaMessage,
409 payload::{PayloadData, PayloadPropertyValue},
410 time::TimestampUs,
411 ChannelId, Price, PriceFeedId,
412 };
413
414 #[test]
415 fn parse_payload() {
416 let payload =
417 "b9011a82c7887f3aaa5845b20d6bf5ca6609953b57650fa4579a4b4d34a4980ba608a9f76a825a446\
418 f3d6c1fd9daca1c5e3fc46980f14ef89c1a886c6e9e5c510872d30f80efc1f480c5615af3fb673d422\
419 87e993da9fbc3506b6e41dfa32950820c2e6c620075d3c7934077d115064b06000301010000000d009\
420 032fc171b060000014e3a0cff1a06000002bcda8a211b06000003120004f8ff05fc0b7159000000000\
421 600070008000900000aa06616362e0600000b804f93312e0600000c014077d115064b0600";
422 let message = SolanaMessage::deserialize_slice(&hex::decode(payload).unwrap()).unwrap();
423 let payload = PayloadData::deserialize_slice_le(&message.payload).unwrap();
424 assert_eq!(
425 payload.timestamp_us,
426 TimestampUs::from_micros(1771339368200000)
427 );
428 assert_eq!(payload.channel_id, ChannelId::FIXED_RATE_200);
429 assert_eq!(payload.feeds.len(), 1);
430 let feed = &payload.feeds[0];
431 assert_eq!(feed.feed_id, PriceFeedId(1));
432 assert_eq!(feed.properties.len(), 13);
433 assert_eq!(
434 feed.properties[0],
435 PayloadPropertyValue::Price(Some(Price::from_mantissa(6713436287632).unwrap()))
436 );
437 assert_eq!(
438 feed.properties[1],
439 PayloadPropertyValue::BestBidPrice(Some(Price::from_mantissa(6713017907790).unwrap()))
440 );
441 assert_eq!(
442 feed.properties[2],
443 PayloadPropertyValue::BestAskPrice(Some(Price::from_mantissa(6713596631740).unwrap()))
444 );
445 assert_eq!(feed.properties[3], PayloadPropertyValue::PublisherCount(18));
446 assert_eq!(feed.properties[4], PayloadPropertyValue::Exponent(-8));
447 assert_eq!(
448 feed.properties[5],
449 PayloadPropertyValue::Confidence(Some(Price::from_mantissa(1500580860).unwrap()))
450 );
451 assert_eq!(feed.properties[6], PayloadPropertyValue::FundingRate(None));
452 assert_eq!(
453 feed.properties[7],
454 PayloadPropertyValue::FundingTimestamp(None)
455 );
456 assert_eq!(
457 feed.properties[8],
458 PayloadPropertyValue::FundingRateInterval(None)
459 );
460 assert_eq!(
461 feed.properties[9],
462 PayloadPropertyValue::MarketSession(MarketSession::Regular)
463 );
464 assert_eq!(
465 feed.properties[10],
466 PayloadPropertyValue::EmaPrice(Some(Price::from_mantissa(6795545700000).unwrap()))
467 );
468 assert_eq!(
469 feed.properties[11],
470 PayloadPropertyValue::EmaConfidence(Some(Price::from_mantissa(6795470000000).unwrap()))
471 );
472 assert_eq!(
473 feed.properties[12],
474 PayloadPropertyValue::FeedUpdateTimestamp(Some(TimestampUs::from_micros(
475 1771339368200000
476 )))
477 );
478 }
479}