1use {
4 super::router::{PriceFeedId, PriceFeedProperty, TimestampUs},
5 crate::router::{ChannelId, Price, Rate},
6 anyhow::bail,
7 byteorder::{ByteOrder, ReadBytesExt, WriteBytesExt, BE, LE},
8 serde::{Deserialize, Serialize},
9 std::{
10 io::{Cursor, Read, Write},
11 num::NonZeroI64,
12 },
13};
14
15#[derive(Debug, Clone, PartialEq, Eq, Hash)]
17pub struct PayloadData {
18 pub timestamp_us: TimestampUs,
19 pub channel_id: ChannelId,
20 pub feeds: Vec<PayloadFeedData>,
22}
23
24#[derive(Debug, Clone, PartialEq, Eq, Hash)]
25pub struct PayloadFeedData {
26 pub feed_id: PriceFeedId,
27 pub properties: Vec<PayloadPropertyValue>,
29}
30
31#[derive(Debug, Clone, PartialEq, Eq, Hash)]
32pub enum PayloadPropertyValue {
33 Price(Option<Price>),
34 BestBidPrice(Option<Price>),
35 BestAskPrice(Option<Price>),
36 PublisherCount(u16),
37 Exponent(i16),
38 Confidence(Option<Price>),
39 FundingRate(Option<Rate>),
40 FundingTimestamp(Option<TimestampUs>),
41}
42
43#[derive(Debug, Clone, Default, PartialEq, Eq, Hash, Serialize, Deserialize)]
44pub struct AggregatedPriceFeedData {
45 pub price: Option<Price>,
46 pub best_bid_price: Option<Price>,
47 pub best_ask_price: Option<Price>,
48 pub publisher_count: u16,
49 pub confidence: Option<Price>,
50 pub funding_rate: Option<Rate>,
51 pub funding_timestamp: Option<TimestampUs>,
52}
53
/// Magic number written as the first `u32` of every serialized payload and
/// checked by `PayloadData::deserialize` (decimal 2479346549).
pub const PAYLOAD_FORMAT_MAGIC: u32 = 0x93C7_D375;
57
58impl PayloadData {
59 pub fn new(
60 timestamp_us: TimestampUs,
61 channel_id: ChannelId,
62 feeds: &[(PriceFeedId, i16, AggregatedPriceFeedData)],
63 requested_properties: &[PriceFeedProperty],
64 ) -> Self {
65 Self {
66 timestamp_us,
67 channel_id,
68 feeds: feeds
69 .iter()
70 .map(|(feed_id, exponent, feed)| PayloadFeedData {
71 feed_id: *feed_id,
72 properties: requested_properties
73 .iter()
74 .map(|property| match property {
75 PriceFeedProperty::Price => PayloadPropertyValue::Price(feed.price),
76 PriceFeedProperty::BestBidPrice => {
77 PayloadPropertyValue::BestBidPrice(feed.best_bid_price)
78 }
79 PriceFeedProperty::BestAskPrice => {
80 PayloadPropertyValue::BestAskPrice(feed.best_ask_price)
81 }
82 PriceFeedProperty::PublisherCount => {
83 PayloadPropertyValue::PublisherCount(feed.publisher_count)
84 }
85 PriceFeedProperty::Exponent => {
86 PayloadPropertyValue::Exponent(*exponent)
87 }
88 PriceFeedProperty::Confidence => {
89 PayloadPropertyValue::Confidence(feed.confidence)
90 }
91 PriceFeedProperty::FundingRate => {
92 PayloadPropertyValue::FundingRate(feed.funding_rate)
93 }
94 PriceFeedProperty::FundingTimestamp => {
95 PayloadPropertyValue::FundingTimestamp(feed.funding_timestamp)
96 }
97 })
98 .collect(),
99 })
100 .collect(),
101 }
102 }
103
104 pub fn serialize<BO: ByteOrder>(&self, mut writer: impl Write) -> anyhow::Result<()> {
105 writer.write_u32::<BO>(PAYLOAD_FORMAT_MAGIC)?;
106 writer.write_u64::<BO>(self.timestamp_us.0)?;
107 writer.write_u8(self.channel_id.0)?;
108 writer.write_u8(self.feeds.len().try_into()?)?;
109 for feed in &self.feeds {
110 writer.write_u32::<BO>(feed.feed_id.0)?;
111 writer.write_u8(feed.properties.len().try_into()?)?;
112 for property in &feed.properties {
113 match property {
114 PayloadPropertyValue::Price(price) => {
115 writer.write_u8(PriceFeedProperty::Price as u8)?;
116 write_option_price::<BO>(&mut writer, *price)?;
117 }
118 PayloadPropertyValue::BestBidPrice(price) => {
119 writer.write_u8(PriceFeedProperty::BestBidPrice as u8)?;
120 write_option_price::<BO>(&mut writer, *price)?;
121 }
122 PayloadPropertyValue::BestAskPrice(price) => {
123 writer.write_u8(PriceFeedProperty::BestAskPrice as u8)?;
124 write_option_price::<BO>(&mut writer, *price)?;
125 }
126 PayloadPropertyValue::PublisherCount(count) => {
127 writer.write_u8(PriceFeedProperty::PublisherCount as u8)?;
128 writer.write_u16::<BO>(*count)?;
129 }
130 PayloadPropertyValue::Exponent(exponent) => {
131 writer.write_u8(PriceFeedProperty::Exponent as u8)?;
132 writer.write_i16::<BO>(*exponent)?;
133 }
134 PayloadPropertyValue::Confidence(confidence) => {
135 writer.write_u8(PriceFeedProperty::Confidence as u8)?;
136 write_option_price::<BO>(&mut writer, *confidence)?;
137 }
138 PayloadPropertyValue::FundingRate(rate) => {
139 writer.write_u8(PriceFeedProperty::FundingRate as u8)?;
140 write_option_rate::<BO>(&mut writer, *rate)?;
141 }
142 PayloadPropertyValue::FundingTimestamp(timestamp) => {
143 writer.write_u8(PriceFeedProperty::FundingTimestamp as u8)?;
144 write_option_timestamp::<BO>(&mut writer, *timestamp)?;
145 }
146 }
147 }
148 }
149 Ok(())
150 }
151
152 pub fn deserialize_slice_le(data: &[u8]) -> anyhow::Result<Self> {
153 Self::deserialize::<LE>(Cursor::new(data))
154 }
155
156 pub fn deserialize_slice_be(data: &[u8]) -> anyhow::Result<Self> {
157 Self::deserialize::<BE>(Cursor::new(data))
158 }
159
160 pub fn deserialize<BO: ByteOrder>(mut reader: impl Read) -> anyhow::Result<Self> {
161 let magic = reader.read_u32::<BO>()?;
162 if magic != PAYLOAD_FORMAT_MAGIC {
163 bail!("magic mismatch");
164 }
165 let timestamp_us = TimestampUs(reader.read_u64::<BO>()?);
166 let channel_id = ChannelId(reader.read_u8()?);
167 let num_feeds = reader.read_u8()?;
168 let mut feeds = Vec::with_capacity(num_feeds.into());
169 for _ in 0..num_feeds {
170 let feed_id = PriceFeedId(reader.read_u32::<BO>()?);
171 let num_properties = reader.read_u8()?;
172 let mut feed = PayloadFeedData {
173 feed_id,
174 properties: Vec::with_capacity(num_properties.into()),
175 };
176 for _ in 0..num_properties {
177 let property = reader.read_u8()?;
178 let value = if property == PriceFeedProperty::Price as u8 {
179 PayloadPropertyValue::Price(read_option_price::<BO>(&mut reader)?)
180 } else if property == PriceFeedProperty::BestBidPrice as u8 {
181 PayloadPropertyValue::BestBidPrice(read_option_price::<BO>(&mut reader)?)
182 } else if property == PriceFeedProperty::BestAskPrice as u8 {
183 PayloadPropertyValue::BestAskPrice(read_option_price::<BO>(&mut reader)?)
184 } else if property == PriceFeedProperty::PublisherCount as u8 {
185 PayloadPropertyValue::PublisherCount(reader.read_u16::<BO>()?)
186 } else if property == PriceFeedProperty::Exponent as u8 {
187 PayloadPropertyValue::Exponent(reader.read_i16::<BO>()?)
188 } else if property == PriceFeedProperty::Confidence as u8 {
189 PayloadPropertyValue::Confidence(read_option_price::<BO>(&mut reader)?)
190 } else if property == PriceFeedProperty::FundingRate as u8 {
191 PayloadPropertyValue::FundingRate(read_option_rate::<BO>(&mut reader)?)
192 } else if property == PriceFeedProperty::FundingTimestamp as u8 {
193 PayloadPropertyValue::FundingTimestamp(read_option_timestamp::<BO>(
194 &mut reader,
195 )?)
196 } else {
197 bail!("unknown property");
198 };
199 feed.properties.push(value);
200 }
201 feeds.push(feed);
202 }
203 Ok(Self {
204 timestamp_us,
205 channel_id,
206 feeds,
207 })
208 }
209}
210
211fn write_option_price<BO: ByteOrder>(
212 mut writer: impl Write,
213 value: Option<Price>,
214) -> std::io::Result<()> {
215 writer.write_i64::<BO>(value.map_or(0, |v| v.0.get()))
216}
217
218fn read_option_price<BO: ByteOrder>(mut reader: impl Read) -> std::io::Result<Option<Price>> {
219 let value = NonZeroI64::new(reader.read_i64::<BO>()?);
220 Ok(value.map(Price))
221}
222
223fn write_option_rate<BO: ByteOrder>(
224 mut writer: impl Write,
225 value: Option<Rate>,
226) -> std::io::Result<()> {
227 match value {
228 Some(value) => {
229 writer.write_u8(1)?;
230 writer.write_i64::<BO>(value.0)
231 }
232 None => {
233 writer.write_u8(0)?;
234 Ok(())
235 }
236 }
237}
238
239fn read_option_rate<BO: ByteOrder>(mut reader: impl Read) -> std::io::Result<Option<Rate>> {
240 let present = reader.read_u8()? != 0;
241 if present {
242 Ok(Some(Rate(reader.read_i64::<BO>()?)))
243 } else {
244 Ok(None)
245 }
246}
247
248fn write_option_timestamp<BO: ByteOrder>(
249 mut writer: impl Write,
250 value: Option<TimestampUs>,
251) -> std::io::Result<()> {
252 match value {
253 Some(value) => {
254 writer.write_u8(1)?;
255 writer.write_u64::<BO>(value.0)
256 }
257 None => {
258 writer.write_u8(0)?;
259 Ok(())
260 }
261 }
262}
263
264fn read_option_timestamp<BO: ByteOrder>(
265 mut reader: impl Read,
266) -> std::io::Result<Option<TimestampUs>> {
267 let present = reader.read_u8()? != 0;
268 if present {
269 Ok(Some(TimestampUs(reader.read_u64::<BO>()?)))
270 } else {
271 Ok(None)
272 }
273}