1use crate::time::DurationUs;
4use {
5 super::router::{PriceFeedId, PriceFeedProperty},
6 crate::{
7 router::{ChannelId, Price, Rate},
8 time::TimestampUs,
9 },
10 anyhow::bail,
11 byteorder::{ByteOrder, ReadBytesExt, WriteBytesExt, BE, LE},
12 serde::{Deserialize, Serialize},
13 std::{
14 io::{Cursor, Read, Write},
15 num::NonZeroI64,
16 },
17};
18
19#[derive(Debug, Clone, PartialEq, Eq, Hash)]
21pub struct PayloadData {
22 pub timestamp_us: TimestampUs,
23 pub channel_id: ChannelId,
24 pub feeds: Vec<PayloadFeedData>,
26}
27
28#[derive(Debug, Clone, PartialEq, Eq, Hash)]
29pub struct PayloadFeedData {
30 pub feed_id: PriceFeedId,
31 pub properties: Vec<PayloadPropertyValue>,
33}
34
35#[derive(Debug, Clone, PartialEq, Eq, Hash)]
36pub enum PayloadPropertyValue {
37 Price(Option<Price>),
38 BestBidPrice(Option<Price>),
39 BestAskPrice(Option<Price>),
40 PublisherCount(u16),
41 Exponent(i16),
42 Confidence(Option<Price>),
43 FundingRate(Option<Rate>),
44 FundingTimestamp(Option<TimestampUs>),
45 FundingRateInterval(Option<DurationUs>),
46}
47
48#[derive(Debug, Clone, Default, PartialEq, Eq, Hash, Serialize, Deserialize)]
49pub struct AggregatedPriceFeedData {
50 pub price: Option<Price>,
51 pub best_bid_price: Option<Price>,
52 pub best_ask_price: Option<Price>,
53 pub publisher_count: u16,
54 pub confidence: Option<Price>,
55 pub funding_rate: Option<Rate>,
56 pub funding_timestamp: Option<TimestampUs>,
57 pub funding_rate_interval: Option<DurationUs>,
58}
59
/// Magic number that opens every serialized payload: `PayloadData::serialize`
/// writes it as the leading `u32`, and `PayloadData::deserialize` rejects any
/// input whose leading `u32` differs. Decimal value: 2479346549.
pub const PAYLOAD_FORMAT_MAGIC: u32 = 0x93C7_D375;
63
64impl PayloadData {
65 pub fn new(
66 timestamp_us: TimestampUs,
67 channel_id: ChannelId,
68 feeds: &[(PriceFeedId, i16, AggregatedPriceFeedData)],
69 requested_properties: &[PriceFeedProperty],
70 ) -> Self {
71 Self {
72 timestamp_us,
73 channel_id,
74 feeds: feeds
75 .iter()
76 .map(|(feed_id, exponent, feed)| PayloadFeedData {
77 feed_id: *feed_id,
78 properties: requested_properties
79 .iter()
80 .map(|property| match property {
81 PriceFeedProperty::Price => PayloadPropertyValue::Price(feed.price),
82 PriceFeedProperty::BestBidPrice => {
83 PayloadPropertyValue::BestBidPrice(feed.best_bid_price)
84 }
85 PriceFeedProperty::BestAskPrice => {
86 PayloadPropertyValue::BestAskPrice(feed.best_ask_price)
87 }
88 PriceFeedProperty::PublisherCount => {
89 PayloadPropertyValue::PublisherCount(feed.publisher_count)
90 }
91 PriceFeedProperty::Exponent => {
92 PayloadPropertyValue::Exponent(*exponent)
93 }
94 PriceFeedProperty::Confidence => {
95 PayloadPropertyValue::Confidence(feed.confidence)
96 }
97 PriceFeedProperty::FundingRate => {
98 PayloadPropertyValue::FundingRate(feed.funding_rate)
99 }
100 PriceFeedProperty::FundingTimestamp => {
101 PayloadPropertyValue::FundingTimestamp(feed.funding_timestamp)
102 }
103 PriceFeedProperty::FundingRateInterval => {
104 PayloadPropertyValue::FundingRateInterval(
105 feed.funding_rate_interval,
106 )
107 }
108 })
109 .collect(),
110 })
111 .collect(),
112 }
113 }
114
115 pub fn serialize<BO: ByteOrder>(&self, mut writer: impl Write) -> anyhow::Result<()> {
116 writer.write_u32::<BO>(PAYLOAD_FORMAT_MAGIC)?;
117 writer.write_u64::<BO>(self.timestamp_us.as_micros())?;
118 writer.write_u8(self.channel_id.0)?;
119 writer.write_u8(self.feeds.len().try_into()?)?;
120 for feed in &self.feeds {
121 writer.write_u32::<BO>(feed.feed_id.0)?;
122 writer.write_u8(feed.properties.len().try_into()?)?;
123 for property in &feed.properties {
124 match property {
125 PayloadPropertyValue::Price(price) => {
126 writer.write_u8(PriceFeedProperty::Price as u8)?;
127 write_option_price::<BO>(&mut writer, *price)?;
128 }
129 PayloadPropertyValue::BestBidPrice(price) => {
130 writer.write_u8(PriceFeedProperty::BestBidPrice as u8)?;
131 write_option_price::<BO>(&mut writer, *price)?;
132 }
133 PayloadPropertyValue::BestAskPrice(price) => {
134 writer.write_u8(PriceFeedProperty::BestAskPrice as u8)?;
135 write_option_price::<BO>(&mut writer, *price)?;
136 }
137 PayloadPropertyValue::PublisherCount(count) => {
138 writer.write_u8(PriceFeedProperty::PublisherCount as u8)?;
139 writer.write_u16::<BO>(*count)?;
140 }
141 PayloadPropertyValue::Exponent(exponent) => {
142 writer.write_u8(PriceFeedProperty::Exponent as u8)?;
143 writer.write_i16::<BO>(*exponent)?;
144 }
145 PayloadPropertyValue::Confidence(confidence) => {
146 writer.write_u8(PriceFeedProperty::Confidence as u8)?;
147 write_option_price::<BO>(&mut writer, *confidence)?;
148 }
149 PayloadPropertyValue::FundingRate(rate) => {
150 writer.write_u8(PriceFeedProperty::FundingRate as u8)?;
151 write_option_rate::<BO>(&mut writer, *rate)?;
152 }
153 PayloadPropertyValue::FundingTimestamp(timestamp) => {
154 writer.write_u8(PriceFeedProperty::FundingTimestamp as u8)?;
155 write_option_timestamp::<BO>(&mut writer, *timestamp)?;
156 }
157 &PayloadPropertyValue::FundingRateInterval(interval) => {
158 writer.write_u8(PriceFeedProperty::FundingRateInterval as u8)?;
159 write_option_duration::<BO>(&mut writer, interval)?;
160 }
161 }
162 }
163 }
164 Ok(())
165 }
166
167 pub fn deserialize_slice_le(data: &[u8]) -> anyhow::Result<Self> {
168 Self::deserialize::<LE>(Cursor::new(data))
169 }
170
171 pub fn deserialize_slice_be(data: &[u8]) -> anyhow::Result<Self> {
172 Self::deserialize::<BE>(Cursor::new(data))
173 }
174
175 pub fn deserialize<BO: ByteOrder>(mut reader: impl Read) -> anyhow::Result<Self> {
176 let magic = reader.read_u32::<BO>()?;
177 if magic != PAYLOAD_FORMAT_MAGIC {
178 bail!("magic mismatch");
179 }
180 let timestamp_us = TimestampUs::from_micros(reader.read_u64::<BO>()?);
181 let channel_id = ChannelId(reader.read_u8()?);
182 let num_feeds = reader.read_u8()?;
183 let mut feeds = Vec::with_capacity(num_feeds.into());
184 for _ in 0..num_feeds {
185 let feed_id = PriceFeedId(reader.read_u32::<BO>()?);
186 let num_properties = reader.read_u8()?;
187 let mut feed = PayloadFeedData {
188 feed_id,
189 properties: Vec::with_capacity(num_properties.into()),
190 };
191 for _ in 0..num_properties {
192 let property = reader.read_u8()?;
193 let value = if property == PriceFeedProperty::Price as u8 {
194 PayloadPropertyValue::Price(read_option_price::<BO>(&mut reader)?)
195 } else if property == PriceFeedProperty::BestBidPrice as u8 {
196 PayloadPropertyValue::BestBidPrice(read_option_price::<BO>(&mut reader)?)
197 } else if property == PriceFeedProperty::BestAskPrice as u8 {
198 PayloadPropertyValue::BestAskPrice(read_option_price::<BO>(&mut reader)?)
199 } else if property == PriceFeedProperty::PublisherCount as u8 {
200 PayloadPropertyValue::PublisherCount(reader.read_u16::<BO>()?)
201 } else if property == PriceFeedProperty::Exponent as u8 {
202 PayloadPropertyValue::Exponent(reader.read_i16::<BO>()?)
203 } else if property == PriceFeedProperty::Confidence as u8 {
204 PayloadPropertyValue::Confidence(read_option_price::<BO>(&mut reader)?)
205 } else if property == PriceFeedProperty::FundingRate as u8 {
206 PayloadPropertyValue::FundingRate(read_option_rate::<BO>(&mut reader)?)
207 } else if property == PriceFeedProperty::FundingTimestamp as u8 {
208 PayloadPropertyValue::FundingTimestamp(read_option_timestamp::<BO>(
209 &mut reader,
210 )?)
211 } else if property == PriceFeedProperty::FundingRateInterval as u8 {
212 PayloadPropertyValue::FundingRateInterval(read_option_interval::<BO>(
213 &mut reader,
214 )?)
215 } else {
216 bail!("unknown property");
217 };
218 feed.properties.push(value);
219 }
220 feeds.push(feed);
221 }
222 Ok(Self {
223 timestamp_us,
224 channel_id,
225 feeds,
226 })
227 }
228}
229
230fn write_option_price<BO: ByteOrder>(
231 mut writer: impl Write,
232 value: Option<Price>,
233) -> std::io::Result<()> {
234 writer.write_i64::<BO>(value.map_or(0, |v| v.0.get()))
235}
236
237fn read_option_price<BO: ByteOrder>(mut reader: impl Read) -> std::io::Result<Option<Price>> {
238 let value = NonZeroI64::new(reader.read_i64::<BO>()?);
239 Ok(value.map(Price))
240}
241
242fn write_option_rate<BO: ByteOrder>(
243 mut writer: impl Write,
244 value: Option<Rate>,
245) -> std::io::Result<()> {
246 match value {
247 Some(value) => {
248 writer.write_u8(1)?;
249 writer.write_i64::<BO>(value.0)
250 }
251 None => {
252 writer.write_u8(0)?;
253 Ok(())
254 }
255 }
256}
257
258fn read_option_rate<BO: ByteOrder>(mut reader: impl Read) -> std::io::Result<Option<Rate>> {
259 let present = reader.read_u8()? != 0;
260 if present {
261 Ok(Some(Rate(reader.read_i64::<BO>()?)))
262 } else {
263 Ok(None)
264 }
265}
266
267fn write_option_timestamp<BO: ByteOrder>(
268 mut writer: impl Write,
269 value: Option<TimestampUs>,
270) -> std::io::Result<()> {
271 match value {
272 Some(value) => {
273 writer.write_u8(1)?;
274 writer.write_u64::<BO>(value.as_micros())
275 }
276 None => {
277 writer.write_u8(0)?;
278 Ok(())
279 }
280 }
281}
282
283fn read_option_timestamp<BO: ByteOrder>(
284 mut reader: impl Read,
285) -> std::io::Result<Option<TimestampUs>> {
286 let present = reader.read_u8()? != 0;
287 if present {
288 Ok(Some(TimestampUs::from_micros(reader.read_u64::<BO>()?)))
289 } else {
290 Ok(None)
291 }
292}
293
294fn write_option_duration<BO: ByteOrder>(
295 mut writer: impl Write,
296 value: Option<DurationUs>,
297) -> std::io::Result<()> {
298 match value {
299 Some(value) => {
300 writer.write_u8(1)?;
301 writer.write_u64::<BO>(value.as_micros())
302 }
303 None => {
304 writer.write_u8(0)?;
305 Ok(())
306 }
307 }
308}
309
310fn read_option_interval<BO: ByteOrder>(
311 mut reader: impl Read,
312) -> std::io::Result<Option<DurationUs>> {
313 let present = reader.read_u8()? != 0;
314 if present {
315 Ok(Some(DurationUs::from_micros(reader.read_u64::<BO>()?)))
316 } else {
317 Ok(None)
318 }
319}