1use {
4 super::router::{PriceFeedId, PriceFeedProperty},
5 crate::{
6 router::{ChannelId, Price, Rate},
7 time::TimestampUs,
8 },
9 anyhow::bail,
10 byteorder::{ByteOrder, ReadBytesExt, WriteBytesExt, BE, LE},
11 serde::{Deserialize, Serialize},
12 std::{
13 io::{Cursor, Read, Write},
14 num::NonZeroI64,
15 },
16};
17
18#[derive(Debug, Clone, PartialEq, Eq, Hash)]
20pub struct PayloadData {
21 pub timestamp_us: TimestampUs,
22 pub channel_id: ChannelId,
23 pub feeds: Vec<PayloadFeedData>,
25}
26
27#[derive(Debug, Clone, PartialEq, Eq, Hash)]
28pub struct PayloadFeedData {
29 pub feed_id: PriceFeedId,
30 pub properties: Vec<PayloadPropertyValue>,
32}
33
34#[derive(Debug, Clone, PartialEq, Eq, Hash)]
35pub enum PayloadPropertyValue {
36 Price(Option<Price>),
37 BestBidPrice(Option<Price>),
38 BestAskPrice(Option<Price>),
39 PublisherCount(u16),
40 Exponent(i16),
41 Confidence(Option<Price>),
42 FundingRate(Option<Rate>),
43 FundingTimestamp(Option<TimestampUs>),
44}
45
46#[derive(Debug, Clone, Default, PartialEq, Eq, Hash, Serialize, Deserialize)]
47pub struct AggregatedPriceFeedData {
48 pub price: Option<Price>,
49 pub best_bid_price: Option<Price>,
50 pub best_ask_price: Option<Price>,
51 pub publisher_count: u16,
52 pub confidence: Option<Price>,
53 pub funding_rate: Option<Rate>,
54 pub funding_timestamp: Option<TimestampUs>,
55}
56
57pub const PAYLOAD_FORMAT_MAGIC: u32 = 2479346549;
60
61impl PayloadData {
62 pub fn new(
63 timestamp_us: TimestampUs,
64 channel_id: ChannelId,
65 feeds: &[(PriceFeedId, i16, AggregatedPriceFeedData)],
66 requested_properties: &[PriceFeedProperty],
67 ) -> Self {
68 Self {
69 timestamp_us,
70 channel_id,
71 feeds: feeds
72 .iter()
73 .map(|(feed_id, exponent, feed)| PayloadFeedData {
74 feed_id: *feed_id,
75 properties: requested_properties
76 .iter()
77 .map(|property| match property {
78 PriceFeedProperty::Price => PayloadPropertyValue::Price(feed.price),
79 PriceFeedProperty::BestBidPrice => {
80 PayloadPropertyValue::BestBidPrice(feed.best_bid_price)
81 }
82 PriceFeedProperty::BestAskPrice => {
83 PayloadPropertyValue::BestAskPrice(feed.best_ask_price)
84 }
85 PriceFeedProperty::PublisherCount => {
86 PayloadPropertyValue::PublisherCount(feed.publisher_count)
87 }
88 PriceFeedProperty::Exponent => {
89 PayloadPropertyValue::Exponent(*exponent)
90 }
91 PriceFeedProperty::Confidence => {
92 PayloadPropertyValue::Confidence(feed.confidence)
93 }
94 PriceFeedProperty::FundingRate => {
95 PayloadPropertyValue::FundingRate(feed.funding_rate)
96 }
97 PriceFeedProperty::FundingTimestamp => {
98 PayloadPropertyValue::FundingTimestamp(feed.funding_timestamp)
99 }
100 })
101 .collect(),
102 })
103 .collect(),
104 }
105 }
106
107 pub fn serialize<BO: ByteOrder>(&self, mut writer: impl Write) -> anyhow::Result<()> {
108 writer.write_u32::<BO>(PAYLOAD_FORMAT_MAGIC)?;
109 writer.write_u64::<BO>(self.timestamp_us.as_micros())?;
110 writer.write_u8(self.channel_id.0)?;
111 writer.write_u8(self.feeds.len().try_into()?)?;
112 for feed in &self.feeds {
113 writer.write_u32::<BO>(feed.feed_id.0)?;
114 writer.write_u8(feed.properties.len().try_into()?)?;
115 for property in &feed.properties {
116 match property {
117 PayloadPropertyValue::Price(price) => {
118 writer.write_u8(PriceFeedProperty::Price as u8)?;
119 write_option_price::<BO>(&mut writer, *price)?;
120 }
121 PayloadPropertyValue::BestBidPrice(price) => {
122 writer.write_u8(PriceFeedProperty::BestBidPrice as u8)?;
123 write_option_price::<BO>(&mut writer, *price)?;
124 }
125 PayloadPropertyValue::BestAskPrice(price) => {
126 writer.write_u8(PriceFeedProperty::BestAskPrice as u8)?;
127 write_option_price::<BO>(&mut writer, *price)?;
128 }
129 PayloadPropertyValue::PublisherCount(count) => {
130 writer.write_u8(PriceFeedProperty::PublisherCount as u8)?;
131 writer.write_u16::<BO>(*count)?;
132 }
133 PayloadPropertyValue::Exponent(exponent) => {
134 writer.write_u8(PriceFeedProperty::Exponent as u8)?;
135 writer.write_i16::<BO>(*exponent)?;
136 }
137 PayloadPropertyValue::Confidence(confidence) => {
138 writer.write_u8(PriceFeedProperty::Confidence as u8)?;
139 write_option_price::<BO>(&mut writer, *confidence)?;
140 }
141 PayloadPropertyValue::FundingRate(rate) => {
142 writer.write_u8(PriceFeedProperty::FundingRate as u8)?;
143 write_option_rate::<BO>(&mut writer, *rate)?;
144 }
145 PayloadPropertyValue::FundingTimestamp(timestamp) => {
146 writer.write_u8(PriceFeedProperty::FundingTimestamp as u8)?;
147 write_option_timestamp::<BO>(&mut writer, *timestamp)?;
148 }
149 }
150 }
151 }
152 Ok(())
153 }
154
155 pub fn deserialize_slice_le(data: &[u8]) -> anyhow::Result<Self> {
156 Self::deserialize::<LE>(Cursor::new(data))
157 }
158
159 pub fn deserialize_slice_be(data: &[u8]) -> anyhow::Result<Self> {
160 Self::deserialize::<BE>(Cursor::new(data))
161 }
162
163 pub fn deserialize<BO: ByteOrder>(mut reader: impl Read) -> anyhow::Result<Self> {
164 let magic = reader.read_u32::<BO>()?;
165 if magic != PAYLOAD_FORMAT_MAGIC {
166 bail!("magic mismatch");
167 }
168 let timestamp_us = TimestampUs::from_micros(reader.read_u64::<BO>()?);
169 let channel_id = ChannelId(reader.read_u8()?);
170 let num_feeds = reader.read_u8()?;
171 let mut feeds = Vec::with_capacity(num_feeds.into());
172 for _ in 0..num_feeds {
173 let feed_id = PriceFeedId(reader.read_u32::<BO>()?);
174 let num_properties = reader.read_u8()?;
175 let mut feed = PayloadFeedData {
176 feed_id,
177 properties: Vec::with_capacity(num_properties.into()),
178 };
179 for _ in 0..num_properties {
180 let property = reader.read_u8()?;
181 let value = if property == PriceFeedProperty::Price as u8 {
182 PayloadPropertyValue::Price(read_option_price::<BO>(&mut reader)?)
183 } else if property == PriceFeedProperty::BestBidPrice as u8 {
184 PayloadPropertyValue::BestBidPrice(read_option_price::<BO>(&mut reader)?)
185 } else if property == PriceFeedProperty::BestAskPrice as u8 {
186 PayloadPropertyValue::BestAskPrice(read_option_price::<BO>(&mut reader)?)
187 } else if property == PriceFeedProperty::PublisherCount as u8 {
188 PayloadPropertyValue::PublisherCount(reader.read_u16::<BO>()?)
189 } else if property == PriceFeedProperty::Exponent as u8 {
190 PayloadPropertyValue::Exponent(reader.read_i16::<BO>()?)
191 } else if property == PriceFeedProperty::Confidence as u8 {
192 PayloadPropertyValue::Confidence(read_option_price::<BO>(&mut reader)?)
193 } else if property == PriceFeedProperty::FundingRate as u8 {
194 PayloadPropertyValue::FundingRate(read_option_rate::<BO>(&mut reader)?)
195 } else if property == PriceFeedProperty::FundingTimestamp as u8 {
196 PayloadPropertyValue::FundingTimestamp(read_option_timestamp::<BO>(
197 &mut reader,
198 )?)
199 } else {
200 bail!("unknown property");
201 };
202 feed.properties.push(value);
203 }
204 feeds.push(feed);
205 }
206 Ok(Self {
207 timestamp_us,
208 channel_id,
209 feeds,
210 })
211 }
212}
213
214fn write_option_price<BO: ByteOrder>(
215 mut writer: impl Write,
216 value: Option<Price>,
217) -> std::io::Result<()> {
218 writer.write_i64::<BO>(value.map_or(0, |v| v.0.get()))
219}
220
221fn read_option_price<BO: ByteOrder>(mut reader: impl Read) -> std::io::Result<Option<Price>> {
222 let value = NonZeroI64::new(reader.read_i64::<BO>()?);
223 Ok(value.map(Price))
224}
225
226fn write_option_rate<BO: ByteOrder>(
227 mut writer: impl Write,
228 value: Option<Rate>,
229) -> std::io::Result<()> {
230 match value {
231 Some(value) => {
232 writer.write_u8(1)?;
233 writer.write_i64::<BO>(value.0)
234 }
235 None => {
236 writer.write_u8(0)?;
237 Ok(())
238 }
239 }
240}
241
242fn read_option_rate<BO: ByteOrder>(mut reader: impl Read) -> std::io::Result<Option<Rate>> {
243 let present = reader.read_u8()? != 0;
244 if present {
245 Ok(Some(Rate(reader.read_i64::<BO>()?)))
246 } else {
247 Ok(None)
248 }
249}
250
251fn write_option_timestamp<BO: ByteOrder>(
252 mut writer: impl Write,
253 value: Option<TimestampUs>,
254) -> std::io::Result<()> {
255 match value {
256 Some(value) => {
257 writer.write_u8(1)?;
258 writer.write_u64::<BO>(value.as_micros())
259 }
260 None => {
261 writer.write_u8(0)?;
262 Ok(())
263 }
264 }
265}
266
267fn read_option_timestamp<BO: ByteOrder>(
268 mut reader: impl Read,
269) -> std::io::Result<Option<TimestampUs>> {
270 let present = reader.read_u8()? != 0;
271 if present {
272 Ok(Some(TimestampUs::from_micros(reader.read_u64::<BO>()?)))
273 } else {
274 Ok(None)
275 }
276}