1use crate::time::DurationUs;
4use {
5 super::router::{PriceFeedId, PriceFeedProperty},
6 crate::{
7 router::{ChannelId, Price, Rate},
8 time::TimestampUs,
9 },
10 anyhow::bail,
11 byteorder::{ByteOrder, ReadBytesExt, WriteBytesExt, BE, LE},
12 serde::{Deserialize, Serialize},
13 std::{
14 io::{Cursor, Read, Write},
15 num::NonZeroI64,
16 },
17};
18
19#[derive(Debug, Clone, PartialEq, Eq, Hash)]
21pub struct PayloadData {
22 pub timestamp_us: TimestampUs,
23 pub channel_id: ChannelId,
24 pub feeds: Vec<PayloadFeedData>,
26}
27
28#[derive(Debug, Clone, PartialEq, Eq, Hash)]
29pub struct PayloadFeedData {
30 pub feed_id: PriceFeedId,
31 pub properties: Vec<PayloadPropertyValue>,
33}
34
35#[derive(Debug, Clone, PartialEq, Eq, Hash)]
36pub enum PayloadPropertyValue {
37 Price(Option<Price>),
38 BestBidPrice(Option<Price>),
39 BestAskPrice(Option<Price>),
40 PublisherCount(u16),
41 Exponent(i16),
42 Confidence(Option<Price>),
43 FundingRate(Option<Rate>),
44 FundingTimestamp(Option<TimestampUs>),
45}
46
47#[derive(Debug, Clone, Default, PartialEq, Eq, Hash, Serialize, Deserialize)]
48pub struct AggregatedPriceFeedData {
49 pub price: Option<Price>,
50 pub best_bid_price: Option<Price>,
51 pub best_ask_price: Option<Price>,
52 pub publisher_count: u16,
53 pub confidence: Option<Price>,
54 pub funding_rate: Option<Rate>,
55 pub funding_timestamp: Option<TimestampUs>,
56 pub funding_rate_interval: Option<DurationUs>,
57}
58
/// Magic number written as the leading `u32` of every serialized payload.
/// `PayloadData::deserialize` rejects input whose first `u32` differs from it.
///
/// Decimal value: 2479346549.
pub const PAYLOAD_FORMAT_MAGIC: u32 = 0x93C7_D375;
62
63impl PayloadData {
64 pub fn new(
65 timestamp_us: TimestampUs,
66 channel_id: ChannelId,
67 feeds: &[(PriceFeedId, i16, AggregatedPriceFeedData)],
68 requested_properties: &[PriceFeedProperty],
69 ) -> Self {
70 Self {
71 timestamp_us,
72 channel_id,
73 feeds: feeds
74 .iter()
75 .map(|(feed_id, exponent, feed)| PayloadFeedData {
76 feed_id: *feed_id,
77 properties: requested_properties
78 .iter()
79 .map(|property| match property {
80 PriceFeedProperty::Price => PayloadPropertyValue::Price(feed.price),
81 PriceFeedProperty::BestBidPrice => {
82 PayloadPropertyValue::BestBidPrice(feed.best_bid_price)
83 }
84 PriceFeedProperty::BestAskPrice => {
85 PayloadPropertyValue::BestAskPrice(feed.best_ask_price)
86 }
87 PriceFeedProperty::PublisherCount => {
88 PayloadPropertyValue::PublisherCount(feed.publisher_count)
89 }
90 PriceFeedProperty::Exponent => {
91 PayloadPropertyValue::Exponent(*exponent)
92 }
93 PriceFeedProperty::Confidence => {
94 PayloadPropertyValue::Confidence(feed.confidence)
95 }
96 PriceFeedProperty::FundingRate => {
97 PayloadPropertyValue::FundingRate(feed.funding_rate)
98 }
99 PriceFeedProperty::FundingTimestamp => {
100 PayloadPropertyValue::FundingTimestamp(feed.funding_timestamp)
101 }
102 })
103 .collect(),
104 })
105 .collect(),
106 }
107 }
108
109 pub fn serialize<BO: ByteOrder>(&self, mut writer: impl Write) -> anyhow::Result<()> {
110 writer.write_u32::<BO>(PAYLOAD_FORMAT_MAGIC)?;
111 writer.write_u64::<BO>(self.timestamp_us.as_micros())?;
112 writer.write_u8(self.channel_id.0)?;
113 writer.write_u8(self.feeds.len().try_into()?)?;
114 for feed in &self.feeds {
115 writer.write_u32::<BO>(feed.feed_id.0)?;
116 writer.write_u8(feed.properties.len().try_into()?)?;
117 for property in &feed.properties {
118 match property {
119 PayloadPropertyValue::Price(price) => {
120 writer.write_u8(PriceFeedProperty::Price as u8)?;
121 write_option_price::<BO>(&mut writer, *price)?;
122 }
123 PayloadPropertyValue::BestBidPrice(price) => {
124 writer.write_u8(PriceFeedProperty::BestBidPrice as u8)?;
125 write_option_price::<BO>(&mut writer, *price)?;
126 }
127 PayloadPropertyValue::BestAskPrice(price) => {
128 writer.write_u8(PriceFeedProperty::BestAskPrice as u8)?;
129 write_option_price::<BO>(&mut writer, *price)?;
130 }
131 PayloadPropertyValue::PublisherCount(count) => {
132 writer.write_u8(PriceFeedProperty::PublisherCount as u8)?;
133 writer.write_u16::<BO>(*count)?;
134 }
135 PayloadPropertyValue::Exponent(exponent) => {
136 writer.write_u8(PriceFeedProperty::Exponent as u8)?;
137 writer.write_i16::<BO>(*exponent)?;
138 }
139 PayloadPropertyValue::Confidence(confidence) => {
140 writer.write_u8(PriceFeedProperty::Confidence as u8)?;
141 write_option_price::<BO>(&mut writer, *confidence)?;
142 }
143 PayloadPropertyValue::FundingRate(rate) => {
144 writer.write_u8(PriceFeedProperty::FundingRate as u8)?;
145 write_option_rate::<BO>(&mut writer, *rate)?;
146 }
147 PayloadPropertyValue::FundingTimestamp(timestamp) => {
148 writer.write_u8(PriceFeedProperty::FundingTimestamp as u8)?;
149 write_option_timestamp::<BO>(&mut writer, *timestamp)?;
150 }
151 }
152 }
153 }
154 Ok(())
155 }
156
157 pub fn deserialize_slice_le(data: &[u8]) -> anyhow::Result<Self> {
158 Self::deserialize::<LE>(Cursor::new(data))
159 }
160
161 pub fn deserialize_slice_be(data: &[u8]) -> anyhow::Result<Self> {
162 Self::deserialize::<BE>(Cursor::new(data))
163 }
164
165 pub fn deserialize<BO: ByteOrder>(mut reader: impl Read) -> anyhow::Result<Self> {
166 let magic = reader.read_u32::<BO>()?;
167 if magic != PAYLOAD_FORMAT_MAGIC {
168 bail!("magic mismatch");
169 }
170 let timestamp_us = TimestampUs::from_micros(reader.read_u64::<BO>()?);
171 let channel_id = ChannelId(reader.read_u8()?);
172 let num_feeds = reader.read_u8()?;
173 let mut feeds = Vec::with_capacity(num_feeds.into());
174 for _ in 0..num_feeds {
175 let feed_id = PriceFeedId(reader.read_u32::<BO>()?);
176 let num_properties = reader.read_u8()?;
177 let mut feed = PayloadFeedData {
178 feed_id,
179 properties: Vec::with_capacity(num_properties.into()),
180 };
181 for _ in 0..num_properties {
182 let property = reader.read_u8()?;
183 let value = if property == PriceFeedProperty::Price as u8 {
184 PayloadPropertyValue::Price(read_option_price::<BO>(&mut reader)?)
185 } else if property == PriceFeedProperty::BestBidPrice as u8 {
186 PayloadPropertyValue::BestBidPrice(read_option_price::<BO>(&mut reader)?)
187 } else if property == PriceFeedProperty::BestAskPrice as u8 {
188 PayloadPropertyValue::BestAskPrice(read_option_price::<BO>(&mut reader)?)
189 } else if property == PriceFeedProperty::PublisherCount as u8 {
190 PayloadPropertyValue::PublisherCount(reader.read_u16::<BO>()?)
191 } else if property == PriceFeedProperty::Exponent as u8 {
192 PayloadPropertyValue::Exponent(reader.read_i16::<BO>()?)
193 } else if property == PriceFeedProperty::Confidence as u8 {
194 PayloadPropertyValue::Confidence(read_option_price::<BO>(&mut reader)?)
195 } else if property == PriceFeedProperty::FundingRate as u8 {
196 PayloadPropertyValue::FundingRate(read_option_rate::<BO>(&mut reader)?)
197 } else if property == PriceFeedProperty::FundingTimestamp as u8 {
198 PayloadPropertyValue::FundingTimestamp(read_option_timestamp::<BO>(
199 &mut reader,
200 )?)
201 } else {
202 bail!("unknown property");
203 };
204 feed.properties.push(value);
205 }
206 feeds.push(feed);
207 }
208 Ok(Self {
209 timestamp_us,
210 channel_id,
211 feeds,
212 })
213 }
214}
215
216fn write_option_price<BO: ByteOrder>(
217 mut writer: impl Write,
218 value: Option<Price>,
219) -> std::io::Result<()> {
220 writer.write_i64::<BO>(value.map_or(0, |v| v.0.get()))
221}
222
223fn read_option_price<BO: ByteOrder>(mut reader: impl Read) -> std::io::Result<Option<Price>> {
224 let value = NonZeroI64::new(reader.read_i64::<BO>()?);
225 Ok(value.map(Price))
226}
227
228fn write_option_rate<BO: ByteOrder>(
229 mut writer: impl Write,
230 value: Option<Rate>,
231) -> std::io::Result<()> {
232 match value {
233 Some(value) => {
234 writer.write_u8(1)?;
235 writer.write_i64::<BO>(value.0)
236 }
237 None => {
238 writer.write_u8(0)?;
239 Ok(())
240 }
241 }
242}
243
244fn read_option_rate<BO: ByteOrder>(mut reader: impl Read) -> std::io::Result<Option<Rate>> {
245 let present = reader.read_u8()? != 0;
246 if present {
247 Ok(Some(Rate(reader.read_i64::<BO>()?)))
248 } else {
249 Ok(None)
250 }
251}
252
253fn write_option_timestamp<BO: ByteOrder>(
254 mut writer: impl Write,
255 value: Option<TimestampUs>,
256) -> std::io::Result<()> {
257 match value {
258 Some(value) => {
259 writer.write_u8(1)?;
260 writer.write_u64::<BO>(value.as_micros())
261 }
262 None => {
263 writer.write_u8(0)?;
264 Ok(())
265 }
266 }
267}
268
269fn read_option_timestamp<BO: ByteOrder>(
270 mut reader: impl Read,
271) -> std::io::Result<Option<TimestampUs>> {
272 let present = reader.read_u8()? != 0;
273 if present {
274 Ok(Some(TimestampUs::from_micros(reader.read_u64::<BO>()?)))
275 } else {
276 Ok(None)
277 }
278}