1use protobuf::MessageField;
4use {
5 crate::payload::AggregatedPriceFeedData,
6 anyhow::{bail, Context},
7 itertools::Itertools,
8 protobuf::well_known_types::timestamp::Timestamp,
9 rust_decimal::{prelude::FromPrimitive, Decimal},
10 serde::{de::Error, Deserialize, Serialize},
11 std::{
12 fmt::Display,
13 num::NonZeroI64,
14 ops::{Add, Deref, DerefMut, Div, Sub},
15 time::{SystemTime, UNIX_EPOCH},
16 },
17};
18
19#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
20pub struct PublisherId(pub u16);
21
22#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
23pub struct PriceFeedId(pub u32);
24
25#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
26pub struct ChannelId(pub u8);
27
28#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
29pub struct TimestampUs(pub u64);
30
31impl TryFrom<&Timestamp> for TimestampUs {
32 type Error = anyhow::Error;
33
34 fn try_from(timestamp: &Timestamp) -> anyhow::Result<Self> {
35 let seconds_in_micros: u64 = (timestamp.seconds * 1_000_000).try_into()?;
36 let nanos_in_micros: u64 = (timestamp.nanos / 1_000).try_into()?;
37 Ok(TimestampUs(seconds_in_micros + nanos_in_micros))
38 }
39}
40
41impl From<TimestampUs> for Timestamp {
42 fn from(value: TimestampUs) -> Self {
43 Timestamp {
44 #[allow(
45 clippy::cast_possible_wrap,
46 reason = "u64 to i64 after this division can never overflow because the value cannot be too big"
47 )]
48 seconds: (value.0 / 1_000_000) as i64,
49 nanos: (value.0 % 1_000_000) as i32 * 1000,
50 special_fields: Default::default(),
51 }
52 }
53}
54
55impl From<TimestampUs> for MessageField<Timestamp> {
56 fn from(value: TimestampUs) -> Self {
57 MessageField::some(value.into())
58 }
59}
60
61impl TimestampUs {
62 pub fn now() -> Self {
63 let value = SystemTime::now()
64 .duration_since(UNIX_EPOCH)
65 .expect("invalid system time")
66 .as_micros()
67 .try_into()
68 .expect("invalid system time");
69 Self(value)
70 }
71
72 pub fn saturating_us_since(self, other: Self) -> u64 {
73 self.0.saturating_sub(other.0)
74 }
75}
76
77#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
78#[repr(transparent)]
79pub struct Rate(pub i64);
80
81impl Rate {
82 pub fn parse_str(value: &str, exponent: u32) -> anyhow::Result<Self> {
83 let value: Decimal = value.parse()?;
84 let coef = 10i64.checked_pow(exponent).context("overflow")?;
85 let coef = Decimal::from_i64(coef).context("overflow")?;
86 let value = value.checked_mul(coef).context("overflow")?;
87 if !value.is_integer() {
88 bail!("price value is more precise than available exponent");
89 }
90 let value: i64 = value.try_into().context("overflow")?;
91 Ok(Self(value))
92 }
93
94 pub fn from_f64(value: f64, exponent: u32) -> anyhow::Result<Self> {
95 let value = Decimal::from_f64(value).context("overflow")?;
96 let coef = 10i64.checked_pow(exponent).context("overflow")?;
97 let coef = Decimal::from_i64(coef).context("overflow")?;
98 let value = value.checked_mul(coef).context("overflow")?;
99 let value: i64 = value.try_into().context("overflow")?;
100 Ok(Self(value))
101 }
102
103 pub fn from_integer(value: i64, exponent: u32) -> anyhow::Result<Self> {
104 let coef = 10i64.checked_pow(exponent).context("overflow")?;
105 let value = value.checked_mul(coef).context("overflow")?;
106 Ok(Self(value))
107 }
108}
109
110#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
111#[repr(transparent)]
112pub struct Price(pub NonZeroI64);
113
114impl Price {
115 pub fn from_integer(value: i64, exponent: u32) -> anyhow::Result<Price> {
116 let coef = 10i64.checked_pow(exponent).context("overflow")?;
117 let value = value.checked_mul(coef).context("overflow")?;
118 let value = NonZeroI64::new(value).context("zero price is unsupported")?;
119 Ok(Self(value))
120 }
121
122 pub fn parse_str(value: &str, exponent: u32) -> anyhow::Result<Price> {
123 let value: Decimal = value.parse()?;
124 let coef = 10i64.checked_pow(exponent).context("overflow")?;
125 let coef = Decimal::from_i64(coef).context("overflow")?;
126 let value = value.checked_mul(coef).context("overflow")?;
127 if !value.is_integer() {
128 bail!("price value is more precise than available exponent");
129 }
130 let value: i64 = value.try_into().context("overflow")?;
131 let value = NonZeroI64::new(value).context("zero price is unsupported")?;
132 Ok(Self(value))
133 }
134
135 pub fn new(value: i64) -> anyhow::Result<Self> {
136 let value = NonZeroI64::new(value).context("zero price is unsupported")?;
137 Ok(Self(value))
138 }
139
140 pub fn into_inner(self) -> NonZeroI64 {
141 self.0
142 }
143
144 pub fn to_f64(self, exponent: u32) -> anyhow::Result<f64> {
145 Ok(self.0.get() as f64 / 10i64.checked_pow(exponent).context("overflow")? as f64)
146 }
147
148 pub fn from_f64(value: f64, exponent: u32) -> anyhow::Result<Self> {
149 let value = (value * 10f64.powi(exponent as i32)) as i64;
150 let value = NonZeroI64::new(value).context("zero price is unsupported")?;
151 Ok(Self(value))
152 }
153
154 pub fn mul(self, rhs: Price, rhs_exponent: u32) -> anyhow::Result<Price> {
155 let left_value = i128::from(self.0.get());
156 let right_value = i128::from(rhs.0.get());
157
158 let value = left_value * right_value / 10i128.pow(rhs_exponent);
159 let value = value.try_into()?;
160 NonZeroI64::new(value)
161 .context("zero price is unsupported")
162 .map(Self)
163 }
164}
165
166impl Sub<i64> for Price {
167 type Output = Option<Price>;
168
169 fn sub(self, rhs: i64) -> Self::Output {
170 let value = self.0.get().saturating_sub(rhs);
171 NonZeroI64::new(value).map(Self)
172 }
173}
174
175impl Add<i64> for Price {
176 type Output = Option<Price>;
177
178 fn add(self, rhs: i64) -> Self::Output {
179 let value = self.0.get().saturating_add(rhs);
180 NonZeroI64::new(value).map(Self)
181 }
182}
183
184impl Add<Price> for Price {
185 type Output = Option<Price>;
186 fn add(self, rhs: Price) -> Self::Output {
187 let value = self.0.get().saturating_add(rhs.0.get());
188 NonZeroI64::new(value).map(Self)
189 }
190}
191
192impl Sub<Price> for Price {
193 type Output = Option<Price>;
194 fn sub(self, rhs: Price) -> Self::Output {
195 let value = self.0.get().saturating_sub(rhs.0.get());
196 NonZeroI64::new(value).map(Self)
197 }
198}
199
200impl Div<i64> for Price {
201 type Output = Option<Price>;
202 fn div(self, rhs: i64) -> Self::Output {
203 let value = self.0.get().saturating_div(rhs);
204 NonZeroI64::new(value).map(Self)
205 }
206}
207
208#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
209#[serde(rename_all = "camelCase")]
210pub enum PriceFeedProperty {
211 Price,
212 BestBidPrice,
213 BestAskPrice,
214 PublisherCount,
215 Exponent,
216 Confidence,
217 FundingRate,
218 FundingTimestamp,
219 }
221
222#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
223#[serde(rename_all = "camelCase")]
224pub enum DeliveryFormat {
225 #[default]
227 Json,
228 Binary,
230}
231
232#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
233#[serde(rename_all = "camelCase")]
234pub enum Format {
235 Evm,
236 Solana,
237 LeEcdsa,
238 LeUnsigned,
239}
240
241#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
242#[serde(rename_all = "camelCase")]
243pub enum JsonBinaryEncoding {
244 #[default]
245 Base64,
246 Hex,
247}
248
249#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
250pub enum Channel {
251 FixedRate(FixedRate),
252}
253
254impl Serialize for Channel {
255 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
256 where
257 S: serde::Serializer,
258 {
259 match self {
260 Channel::FixedRate(fixed_rate) => {
261 if *fixed_rate == FixedRate::MIN {
262 return serializer.serialize_str("real_time");
263 }
264 serializer.serialize_str(&format!("fixed_rate@{}ms", fixed_rate.value_ms()))
265 }
266 }
267 }
268}
269
270pub mod channel_ids {
271 use super::ChannelId;
272
273 pub const FIXED_RATE_1: ChannelId = ChannelId(1);
274 pub const FIXED_RATE_50: ChannelId = ChannelId(2);
275 pub const FIXED_RATE_200: ChannelId = ChannelId(3);
276}
277
278impl Display for Channel {
279 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
280 match self {
281 Channel::FixedRate(fixed_rate) => match *fixed_rate {
282 FixedRate::MIN => write!(f, "real_time"),
283 rate => write!(f, "fixed_rate@{}ms", rate.value_ms()),
284 },
285 }
286 }
287}
288
289impl Channel {
290 pub fn id(&self) -> ChannelId {
291 match self {
292 Channel::FixedRate(fixed_rate) => match fixed_rate.value_ms() {
293 1 => channel_ids::FIXED_RATE_1,
294 50 => channel_ids::FIXED_RATE_50,
295 200 => channel_ids::FIXED_RATE_200,
296 _ => panic!("unknown channel: {self:?}"),
297 },
298 }
299 }
300}
301
302#[test]
303fn id_supports_all_fixed_rates() {
304 for rate in FixedRate::ALL {
305 Channel::FixedRate(rate).id();
306 }
307}
308
309fn parse_channel(value: &str) -> Option<Channel> {
310 if value == "real_time" {
311 Some(Channel::FixedRate(FixedRate::MIN))
312 } else if let Some(rest) = value.strip_prefix("fixed_rate@") {
313 let ms_value = rest.strip_suffix("ms")?;
314 Some(Channel::FixedRate(FixedRate::from_ms(
315 ms_value.parse().ok()?,
316 )?))
317 } else {
318 None
319 }
320}
321
322impl<'de> Deserialize<'de> for Channel {
323 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
324 where
325 D: serde::Deserializer<'de>,
326 {
327 let value = <String>::deserialize(deserializer)?;
328 parse_channel(&value).ok_or_else(|| Error::custom("unknown channel"))
329 }
330}
331
332#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
333pub struct FixedRate {
334 ms: u32,
335}
336
337impl FixedRate {
338 pub const ALL: [Self; 3] = [Self { ms: 1 }, Self { ms: 50 }, Self { ms: 200 }];
343 pub const MIN: Self = Self::ALL[0];
344
345 pub fn from_ms(value: u32) -> Option<Self> {
346 Self::ALL.into_iter().find(|v| v.ms == value)
347 }
348
349 pub fn value_ms(self) -> u32 {
350 self.ms
351 }
352
353 pub fn value_us(self) -> u64 {
354 (self.ms * 1000).into()
355 }
356}
357
358#[test]
359fn fixed_rate_values() {
360 assert!(
361 FixedRate::ALL.windows(2).all(|w| w[0] < w[1]),
362 "values must be unique and sorted"
363 );
364 for value in FixedRate::ALL {
365 assert_eq!(
366 1000 % value.ms,
367 0,
368 "1 s must contain whole number of intervals"
369 );
370 assert_eq!(
371 value.value_us() % FixedRate::MIN.value_us(),
372 0,
373 "the interval's borders must be a subset of the minimal interval's borders"
374 );
375 }
376}
377
378#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
379#[serde(rename_all = "camelCase")]
380pub struct SubscriptionParamsRepr {
381 pub price_feed_ids: Vec<PriceFeedId>,
382 pub properties: Vec<PriceFeedProperty>,
383 #[serde(alias = "chains")]
385 pub formats: Vec<Format>,
386 #[serde(default)]
387 pub delivery_format: DeliveryFormat,
388 #[serde(default)]
389 pub json_binary_encoding: JsonBinaryEncoding,
390 #[serde(default = "default_parsed")]
393 pub parsed: bool,
394 pub channel: Channel,
395 #[serde(default)]
396 pub ignore_invalid_feed_ids: bool,
397}
398
399#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
400#[serde(rename_all = "camelCase")]
401pub struct SubscriptionParams(SubscriptionParamsRepr);
402
403impl<'de> Deserialize<'de> for SubscriptionParams {
404 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
405 where
406 D: serde::Deserializer<'de>,
407 {
408 let value = SubscriptionParamsRepr::deserialize(deserializer)?;
409 Self::new(value).map_err(Error::custom)
410 }
411}
412
413impl SubscriptionParams {
414 pub fn new(value: SubscriptionParamsRepr) -> Result<Self, &'static str> {
415 if value.price_feed_ids.is_empty() {
416 return Err("no price feed ids specified");
417 }
418 if !value.price_feed_ids.iter().all_unique() {
419 return Err("duplicate price feed ids specified");
420 }
421 if !value.formats.iter().all_unique() {
422 return Err("duplicate formats or chains specified");
423 }
424 if value.properties.is_empty() {
425 return Err("no properties specified");
426 }
427 if !value.properties.iter().all_unique() {
428 return Err("duplicate properties specified");
429 }
430 Ok(Self(value))
431 }
432}
433
434impl Deref for SubscriptionParams {
435 type Target = SubscriptionParamsRepr;
436
437 fn deref(&self) -> &Self::Target {
438 &self.0
439 }
440}
441impl DerefMut for SubscriptionParams {
442 fn deref_mut(&mut self) -> &mut Self::Target {
443 &mut self.0
444 }
445}
446
447pub fn default_parsed() -> bool {
448 true
449}
450
451#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
452#[serde(rename_all = "camelCase")]
453pub struct JsonBinaryData {
454 pub encoding: JsonBinaryEncoding,
455 pub data: String,
456}
457
458#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
459#[serde(rename_all = "camelCase")]
460pub struct JsonUpdate {
461 #[serde(skip_serializing_if = "Option::is_none")]
463 pub parsed: Option<ParsedPayload>,
464 #[serde(skip_serializing_if = "Option::is_none")]
466 pub evm: Option<JsonBinaryData>,
467 #[serde(skip_serializing_if = "Option::is_none")]
469 pub solana: Option<JsonBinaryData>,
470 #[serde(skip_serializing_if = "Option::is_none")]
472 pub le_ecdsa: Option<JsonBinaryData>,
473 #[serde(skip_serializing_if = "Option::is_none")]
475 pub le_unsigned: Option<JsonBinaryData>,
476}
477
478#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
479#[serde(rename_all = "camelCase")]
480pub struct ParsedPayload {
481 #[serde(with = "crate::serde_str::timestamp")]
482 pub timestamp_us: TimestampUs,
483 pub price_feeds: Vec<ParsedFeedPayload>,
484}
485
486#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
487#[serde(rename_all = "camelCase")]
488pub struct ParsedFeedPayload {
489 pub price_feed_id: PriceFeedId,
490 #[serde(skip_serializing_if = "Option::is_none")]
491 #[serde(with = "crate::serde_str::option_price")]
492 #[serde(default)]
493 pub price: Option<Price>,
494 #[serde(skip_serializing_if = "Option::is_none")]
495 #[serde(with = "crate::serde_str::option_price")]
496 #[serde(default)]
497 pub best_bid_price: Option<Price>,
498 #[serde(skip_serializing_if = "Option::is_none")]
499 #[serde(with = "crate::serde_str::option_price")]
500 #[serde(default)]
501 pub best_ask_price: Option<Price>,
502 #[serde(skip_serializing_if = "Option::is_none")]
503 #[serde(default)]
504 pub publisher_count: Option<u16>,
505 #[serde(skip_serializing_if = "Option::is_none")]
506 #[serde(default)]
507 pub exponent: Option<i16>,
508 #[serde(skip_serializing_if = "Option::is_none")]
509 #[serde(default)]
510 pub confidence: Option<Price>,
511 #[serde(skip_serializing_if = "Option::is_none")]
512 #[serde(default)]
513 pub funding_rate: Option<Rate>,
514 #[serde(skip_serializing_if = "Option::is_none")]
515 #[serde(default)]
516 pub funding_timestamp: Option<TimestampUs>,
517 }
519
520impl ParsedFeedPayload {
521 pub fn new(
522 price_feed_id: PriceFeedId,
523 exponent: Option<i16>,
524 data: &AggregatedPriceFeedData,
525 properties: &[PriceFeedProperty],
526 ) -> Self {
527 let mut output = Self {
528 price_feed_id,
529 price: None,
530 best_bid_price: None,
531 best_ask_price: None,
532 publisher_count: None,
533 exponent: None,
534 confidence: None,
535 funding_rate: None,
536 funding_timestamp: None,
537 };
538 for &property in properties {
539 match property {
540 PriceFeedProperty::Price => {
541 output.price = data.price;
542 }
543 PriceFeedProperty::BestBidPrice => {
544 output.best_bid_price = data.best_bid_price;
545 }
546 PriceFeedProperty::BestAskPrice => {
547 output.best_ask_price = data.best_ask_price;
548 }
549 PriceFeedProperty::PublisherCount => {
550 output.publisher_count = Some(data.publisher_count);
551 }
552 PriceFeedProperty::Exponent => {
553 output.exponent = exponent;
554 }
555 PriceFeedProperty::Confidence => {
556 output.confidence = data.confidence;
557 }
558 PriceFeedProperty::FundingRate => {
559 output.funding_rate = data.funding_rate;
560 }
561 PriceFeedProperty::FundingTimestamp => {
562 output.funding_timestamp = data.funding_timestamp;
563 }
564 }
565 }
566 output
567 }
568
569 pub fn new_full(
570 price_feed_id: PriceFeedId,
571 exponent: Option<i16>,
572 data: &AggregatedPriceFeedData,
573 ) -> Self {
574 Self {
575 price_feed_id,
576 price: data.price,
577 best_bid_price: data.best_bid_price,
578 best_ask_price: data.best_ask_price,
579 publisher_count: Some(data.publisher_count),
580 exponent,
581 confidence: data.confidence,
582 funding_rate: data.funding_rate,
583 funding_timestamp: data.funding_timestamp,
584 }
585 }
586}