1use crate::{
2 api::MarketSession,
3 price::Price,
4 rate::Rate,
5 time::{DurationUs, TimestampUs},
6 ChannelId, PriceFeedId, PriceFeedProperty,
7};
8use {
9 anyhow::bail,
10 byteorder::{ByteOrder, ReadBytesExt, WriteBytesExt, BE, LE},
11 serde::{Deserialize, Serialize},
12 std::{
13 io::{Cursor, Read, Write},
14 num::NonZeroI64,
15 },
16};
17
18#[derive(Debug, Clone, PartialEq, Eq, Hash)]
20pub struct PayloadData {
21 pub timestamp_us: TimestampUs,
22 pub channel_id: ChannelId,
23 pub feeds: Vec<PayloadFeedData>,
25}
26
27#[derive(Debug, Clone, PartialEq, Eq, Hash)]
28pub struct PayloadFeedData {
29 pub feed_id: PriceFeedId,
30 pub properties: Vec<PayloadPropertyValue>,
32}
33
34#[derive(Debug, Clone, PartialEq, Eq, Hash)]
35pub enum PayloadPropertyValue {
36 Price(Option<Price>),
37 BestBidPrice(Option<Price>),
38 BestAskPrice(Option<Price>),
39 PublisherCount(u16),
40 Exponent(i16),
41 Confidence(Option<Price>),
42 FundingRate(Option<Rate>),
43 FundingTimestamp(Option<TimestampUs>),
44 FundingRateInterval(Option<DurationUs>),
45 MarketSession(MarketSession),
46}
47
48#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
49pub struct AggregatedPriceFeedData {
50 pub price: Option<Price>,
51 pub best_bid_price: Option<Price>,
52 pub best_ask_price: Option<Price>,
53 pub publisher_count: u16,
54 pub exponent: i16,
55 pub confidence: Option<Price>,
56 pub funding_rate: Option<Rate>,
57 pub funding_timestamp: Option<TimestampUs>,
58 pub funding_rate_interval: Option<DurationUs>,
59 pub market_session: MarketSession,
60}
61
62impl AggregatedPriceFeedData {
63 pub fn empty(exponent: i16, market_session: MarketSession) -> Self {
64 Self {
65 price: None,
66 best_bid_price: None,
67 best_ask_price: None,
68 publisher_count: 0,
69 exponent,
70 confidence: None,
71 funding_rate: None,
72 funding_timestamp: None,
73 funding_rate_interval: None,
74 market_session,
75 }
76 }
77}
78
79pub const PAYLOAD_FORMAT_MAGIC: u32 = 2479346549;
82
83impl PayloadData {
84 pub fn new(
85 timestamp_us: TimestampUs,
86 channel_id: ChannelId,
87 feeds: &[(PriceFeedId, AggregatedPriceFeedData)],
88 requested_properties: &[PriceFeedProperty],
89 ) -> Self {
90 Self {
91 timestamp_us,
92 channel_id,
93 feeds: feeds
94 .iter()
95 .map(|(feed_id, feed)| PayloadFeedData {
96 feed_id: *feed_id,
97 properties: requested_properties
98 .iter()
99 .map(|property| match property {
100 PriceFeedProperty::Price => PayloadPropertyValue::Price(feed.price),
101 PriceFeedProperty::BestBidPrice => {
102 PayloadPropertyValue::BestBidPrice(feed.best_bid_price)
103 }
104 PriceFeedProperty::BestAskPrice => {
105 PayloadPropertyValue::BestAskPrice(feed.best_ask_price)
106 }
107 PriceFeedProperty::PublisherCount => {
108 PayloadPropertyValue::PublisherCount(feed.publisher_count)
109 }
110 PriceFeedProperty::Exponent => {
111 PayloadPropertyValue::Exponent(feed.exponent)
112 }
113 PriceFeedProperty::Confidence => {
114 PayloadPropertyValue::Confidence(feed.confidence)
115 }
116 PriceFeedProperty::FundingRate => {
117 PayloadPropertyValue::FundingRate(feed.funding_rate)
118 }
119 PriceFeedProperty::FundingTimestamp => {
120 PayloadPropertyValue::FundingTimestamp(feed.funding_timestamp)
121 }
122 PriceFeedProperty::FundingRateInterval => {
123 PayloadPropertyValue::FundingRateInterval(
124 feed.funding_rate_interval,
125 )
126 }
127 PriceFeedProperty::MarketSession => {
128 PayloadPropertyValue::MarketSession(feed.market_session)
129 }
130 })
131 .collect(),
132 })
133 .collect(),
134 }
135 }
136
137 pub fn serialize<BO: ByteOrder>(&self, mut writer: impl Write) -> anyhow::Result<()> {
138 writer.write_u32::<BO>(PAYLOAD_FORMAT_MAGIC)?;
139 writer.write_u64::<BO>(self.timestamp_us.as_micros())?;
140 writer.write_u8(self.channel_id.0)?;
141 writer.write_u8(self.feeds.len().try_into()?)?;
142 for feed in &self.feeds {
143 writer.write_u32::<BO>(feed.feed_id.0)?;
144 writer.write_u8(feed.properties.len().try_into()?)?;
145 for property in &feed.properties {
146 match property {
147 PayloadPropertyValue::Price(price) => {
148 writer.write_u8(PriceFeedProperty::Price as u8)?;
149 write_option_price::<BO>(&mut writer, *price)?;
150 }
151 PayloadPropertyValue::BestBidPrice(price) => {
152 writer.write_u8(PriceFeedProperty::BestBidPrice as u8)?;
153 write_option_price::<BO>(&mut writer, *price)?;
154 }
155 PayloadPropertyValue::BestAskPrice(price) => {
156 writer.write_u8(PriceFeedProperty::BestAskPrice as u8)?;
157 write_option_price::<BO>(&mut writer, *price)?;
158 }
159 PayloadPropertyValue::PublisherCount(count) => {
160 writer.write_u8(PriceFeedProperty::PublisherCount as u8)?;
161 writer.write_u16::<BO>(*count)?;
162 }
163 PayloadPropertyValue::Exponent(exponent) => {
164 writer.write_u8(PriceFeedProperty::Exponent as u8)?;
165 writer.write_i16::<BO>(*exponent)?;
166 }
167 PayloadPropertyValue::Confidence(confidence) => {
168 writer.write_u8(PriceFeedProperty::Confidence as u8)?;
169 write_option_price::<BO>(&mut writer, *confidence)?;
170 }
171 PayloadPropertyValue::FundingRate(rate) => {
172 writer.write_u8(PriceFeedProperty::FundingRate as u8)?;
173 write_option_rate::<BO>(&mut writer, *rate)?;
174 }
175 PayloadPropertyValue::FundingTimestamp(timestamp) => {
176 writer.write_u8(PriceFeedProperty::FundingTimestamp as u8)?;
177 write_option_timestamp::<BO>(&mut writer, *timestamp)?;
178 }
179 &PayloadPropertyValue::FundingRateInterval(interval) => {
180 writer.write_u8(PriceFeedProperty::FundingRateInterval as u8)?;
181 write_option_duration::<BO>(&mut writer, interval)?;
182 }
183 PayloadPropertyValue::MarketSession(market_session) => {
184 writer.write_u8(PriceFeedProperty::MarketSession as u8)?;
185 writer.write_i16::<BO>((*market_session).into())?;
186 }
187 }
188 }
189 }
190 Ok(())
191 }
192
193 pub fn deserialize_slice_le(data: &[u8]) -> anyhow::Result<Self> {
194 Self::deserialize::<LE>(Cursor::new(data))
195 }
196
197 pub fn deserialize_slice_be(data: &[u8]) -> anyhow::Result<Self> {
198 Self::deserialize::<BE>(Cursor::new(data))
199 }
200
201 pub fn deserialize<BO: ByteOrder>(mut reader: impl Read) -> anyhow::Result<Self> {
202 let magic = reader.read_u32::<BO>()?;
203 if magic != PAYLOAD_FORMAT_MAGIC {
204 bail!("magic mismatch");
205 }
206 let timestamp_us = TimestampUs::from_micros(reader.read_u64::<BO>()?);
207 let channel_id = ChannelId(reader.read_u8()?);
208 let num_feeds = reader.read_u8()?;
209 let mut feeds = Vec::with_capacity(num_feeds.into());
210 for _ in 0..num_feeds {
211 let feed_id = PriceFeedId(reader.read_u32::<BO>()?);
212 let num_properties = reader.read_u8()?;
213 let mut feed = PayloadFeedData {
214 feed_id,
215 properties: Vec::with_capacity(num_properties.into()),
216 };
217 for _ in 0..num_properties {
218 let property = reader.read_u8()?;
219 let value = if property == PriceFeedProperty::Price as u8 {
220 PayloadPropertyValue::Price(read_option_price::<BO>(&mut reader)?)
221 } else if property == PriceFeedProperty::BestBidPrice as u8 {
222 PayloadPropertyValue::BestBidPrice(read_option_price::<BO>(&mut reader)?)
223 } else if property == PriceFeedProperty::BestAskPrice as u8 {
224 PayloadPropertyValue::BestAskPrice(read_option_price::<BO>(&mut reader)?)
225 } else if property == PriceFeedProperty::PublisherCount as u8 {
226 PayloadPropertyValue::PublisherCount(reader.read_u16::<BO>()?)
227 } else if property == PriceFeedProperty::Exponent as u8 {
228 PayloadPropertyValue::Exponent(reader.read_i16::<BO>()?)
229 } else if property == PriceFeedProperty::Confidence as u8 {
230 PayloadPropertyValue::Confidence(read_option_price::<BO>(&mut reader)?)
231 } else if property == PriceFeedProperty::FundingRate as u8 {
232 PayloadPropertyValue::FundingRate(read_option_rate::<BO>(&mut reader)?)
233 } else if property == PriceFeedProperty::FundingTimestamp as u8 {
234 PayloadPropertyValue::FundingTimestamp(read_option_timestamp::<BO>(
235 &mut reader,
236 )?)
237 } else if property == PriceFeedProperty::FundingRateInterval as u8 {
238 PayloadPropertyValue::FundingRateInterval(read_option_interval::<BO>(
239 &mut reader,
240 )?)
241 } else if property == PriceFeedProperty::MarketSession as u8 {
242 PayloadPropertyValue::MarketSession(reader.read_i16::<BO>()?.try_into()?)
243 } else {
244 bail!("unknown property");
245 };
246 feed.properties.push(value);
247 }
248 feeds.push(feed);
249 }
250 Ok(Self {
251 timestamp_us,
252 channel_id,
253 feeds,
254 })
255 }
256}
257
258fn write_option_price<BO: ByteOrder>(
259 mut writer: impl Write,
260 value: Option<Price>,
261) -> std::io::Result<()> {
262 writer.write_i64::<BO>(value.map_or(0, |v| v.mantissa_i64()))
263}
264
265fn read_option_price<BO: ByteOrder>(mut reader: impl Read) -> std::io::Result<Option<Price>> {
266 let value = NonZeroI64::new(reader.read_i64::<BO>()?);
267 Ok(value.map(Price::from_nonzero_mantissa))
268}
269
270fn write_option_rate<BO: ByteOrder>(
271 mut writer: impl Write,
272 value: Option<Rate>,
273) -> std::io::Result<()> {
274 match value {
275 Some(value) => {
276 writer.write_u8(1)?;
277 writer.write_i64::<BO>(value.mantissa())
278 }
279 None => {
280 writer.write_u8(0)?;
281 Ok(())
282 }
283 }
284}
285
286fn read_option_rate<BO: ByteOrder>(mut reader: impl Read) -> std::io::Result<Option<Rate>> {
287 let present = reader.read_u8()? != 0;
288 if present {
289 Ok(Some(Rate::from_mantissa(reader.read_i64::<BO>()?)))
290 } else {
291 Ok(None)
292 }
293}
294
295fn write_option_timestamp<BO: ByteOrder>(
296 mut writer: impl Write,
297 value: Option<TimestampUs>,
298) -> std::io::Result<()> {
299 match value {
300 Some(value) => {
301 writer.write_u8(1)?;
302 writer.write_u64::<BO>(value.as_micros())
303 }
304 None => {
305 writer.write_u8(0)?;
306 Ok(())
307 }
308 }
309}
310
311fn read_option_timestamp<BO: ByteOrder>(
312 mut reader: impl Read,
313) -> std::io::Result<Option<TimestampUs>> {
314 let present = reader.read_u8()? != 0;
315 if present {
316 Ok(Some(TimestampUs::from_micros(reader.read_u64::<BO>()?)))
317 } else {
318 Ok(None)
319 }
320}
321
322fn write_option_duration<BO: ByteOrder>(
323 mut writer: impl Write,
324 value: Option<DurationUs>,
325) -> std::io::Result<()> {
326 match value {
327 Some(value) => {
328 writer.write_u8(1)?;
329 writer.write_u64::<BO>(value.as_micros())
330 }
331 None => {
332 writer.write_u8(0)?;
333 Ok(())
334 }
335 }
336}
337
338fn read_option_interval<BO: ByteOrder>(
339 mut reader: impl Read,
340) -> std::io::Result<Option<DurationUs>> {
341 let present = reader.read_u8()? != 0;
342 if present {
343 Ok(Some(DurationUs::from_micros(reader.read_u64::<BO>()?)))
344 } else {
345 Ok(None)
346 }
347}