1use {
4 crate::{
5 payload::AggregatedPriceFeedData,
6 time::{DurationUs, TimestampUs},
7 },
8 anyhow::{bail, Context},
9 derive_more::derive::{From, Into},
10 itertools::Itertools,
11 protobuf::well_known_types::duration::Duration as ProtobufDuration,
12 rust_decimal::{prelude::FromPrimitive, Decimal},
13 serde::{de::Error, Deserialize, Serialize},
14 std::{
15 cmp::Ordering,
16 fmt::Display,
17 num::NonZeroI64,
18 ops::{Add, Deref, DerefMut, Div, Sub},
19 },
20};
21
22#[derive(
23 Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, From, Into,
24)]
25pub struct PublisherId(pub u16);
26
27#[derive(
28 Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, From, Into,
29)]
30pub struct PriceFeedId(pub u32);
31
32#[derive(
33 Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, From, Into,
34)]
35pub struct ChannelId(pub u8);
36
37#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
38#[repr(transparent)]
39pub struct Rate(pub i64);
40
41impl Rate {
42 pub fn parse_str(value: &str, exponent: u32) -> anyhow::Result<Self> {
43 let value: Decimal = value.parse()?;
44 let coef = 10i64.checked_pow(exponent).context("overflow")?;
45 let coef = Decimal::from_i64(coef).context("overflow")?;
46 let value = value.checked_mul(coef).context("overflow")?;
47 if !value.is_integer() {
48 bail!("price value is more precise than available exponent");
49 }
50 let value: i64 = value.try_into().context("overflow")?;
51 Ok(Self(value))
52 }
53
54 pub fn from_f64(value: f64, exponent: u32) -> anyhow::Result<Self> {
55 let value = Decimal::from_f64(value).context("overflow")?;
56 let coef = 10i64.checked_pow(exponent).context("overflow")?;
57 let coef = Decimal::from_i64(coef).context("overflow")?;
58 let value = value.checked_mul(coef).context("overflow")?;
59 let value: i64 = value.try_into().context("overflow")?;
60 Ok(Self(value))
61 }
62
63 pub fn from_integer(value: i64, exponent: u32) -> anyhow::Result<Self> {
64 let coef = 10i64.checked_pow(exponent).context("overflow")?;
65 let value = value.checked_mul(coef).context("overflow")?;
66 Ok(Self(value))
67 }
68}
69
70#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
71#[repr(transparent)]
72pub struct Price(pub NonZeroI64);
73
74impl Price {
75 pub fn from_integer(value: i64, exponent: u32) -> anyhow::Result<Price> {
76 let coef = 10i64.checked_pow(exponent).context("overflow")?;
77 let value = value.checked_mul(coef).context("overflow")?;
78 let value = NonZeroI64::new(value).context("zero price is unsupported")?;
79 Ok(Self(value))
80 }
81
82 pub fn parse_str(value: &str, exponent: u32) -> anyhow::Result<Price> {
83 let value: Decimal = value.parse()?;
84 let coef = 10i64.checked_pow(exponent).context("overflow")?;
85 let coef = Decimal::from_i64(coef).context("overflow")?;
86 let value = value.checked_mul(coef).context("overflow")?;
87 if !value.is_integer() {
88 bail!("price value is more precise than available exponent");
89 }
90 let value: i64 = value.try_into().context("overflow")?;
91 let value = NonZeroI64::new(value).context("zero price is unsupported")?;
92 Ok(Self(value))
93 }
94
95 pub fn new(value: i64) -> anyhow::Result<Self> {
96 let value = NonZeroI64::new(value).context("zero price is unsupported")?;
97 Ok(Self(value))
98 }
99
100 pub fn into_inner(self) -> NonZeroI64 {
101 self.0
102 }
103
104 pub fn to_f64(self, exponent: u32) -> anyhow::Result<f64> {
105 Ok(self.0.get() as f64 / 10i64.checked_pow(exponent).context("overflow")? as f64)
106 }
107
108 pub fn from_f64(value: f64, exponent: u32) -> anyhow::Result<Self> {
109 let value = (value * 10f64.powi(exponent as i32)) as i64;
110 let value = NonZeroI64::new(value).context("zero price is unsupported")?;
111 Ok(Self(value))
112 }
113
114 pub fn mul(self, rhs: Price, rhs_exponent: u32) -> anyhow::Result<Price> {
115 let left_value = i128::from(self.0.get());
116 let right_value = i128::from(rhs.0.get());
117
118 let value = left_value * right_value / 10i128.pow(rhs_exponent);
119 let value = value.try_into()?;
120 NonZeroI64::new(value)
121 .context("zero price is unsupported")
122 .map(Self)
123 }
124}
125
126impl Sub<i64> for Price {
127 type Output = Option<Price>;
128
129 fn sub(self, rhs: i64) -> Self::Output {
130 let value = self.0.get().saturating_sub(rhs);
131 NonZeroI64::new(value).map(Self)
132 }
133}
134
135impl Add<i64> for Price {
136 type Output = Option<Price>;
137
138 fn add(self, rhs: i64) -> Self::Output {
139 let value = self.0.get().saturating_add(rhs);
140 NonZeroI64::new(value).map(Self)
141 }
142}
143
144impl Add<Price> for Price {
145 type Output = Option<Price>;
146 fn add(self, rhs: Price) -> Self::Output {
147 let value = self.0.get().saturating_add(rhs.0.get());
148 NonZeroI64::new(value).map(Self)
149 }
150}
151
152impl Sub<Price> for Price {
153 type Output = Option<Price>;
154 fn sub(self, rhs: Price) -> Self::Output {
155 let value = self.0.get().saturating_sub(rhs.0.get());
156 NonZeroI64::new(value).map(Self)
157 }
158}
159
160impl Div<i64> for Price {
161 type Output = Option<Price>;
162 fn div(self, rhs: i64) -> Self::Output {
163 let value = self.0.get().saturating_div(rhs);
164 NonZeroI64::new(value).map(Self)
165 }
166}
167
168#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
169#[serde(rename_all = "camelCase")]
170pub enum PriceFeedProperty {
171 Price,
172 BestBidPrice,
173 BestAskPrice,
174 PublisherCount,
175 Exponent,
176 Confidence,
177 FundingRate,
178 FundingTimestamp,
179 FundingRateInterval,
180 }
182
183#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
184#[serde(rename_all = "camelCase")]
185pub enum DeliveryFormat {
186 #[default]
188 Json,
189 Binary,
191}
192
193#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
194#[serde(rename_all = "camelCase")]
195pub enum Format {
196 Evm,
197 Solana,
198 LeEcdsa,
199 LeUnsigned,
200}
201
202#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
203#[serde(rename_all = "camelCase")]
204pub enum JsonBinaryEncoding {
205 #[default]
206 Base64,
207 Hex,
208}
209
210#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, From)]
211pub enum Channel {
212 FixedRate(FixedRate),
213 RealTime,
214}
215
216impl PartialOrd for Channel {
217 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
218 let rate_left = match self {
219 Channel::FixedRate(rate) => rate.duration().as_micros(),
220 Channel::RealTime => FixedRate::MIN.duration().as_micros(),
221 };
222 let rate_right = match other {
223 Channel::FixedRate(rate) => rate.duration().as_micros(),
224 Channel::RealTime => FixedRate::MIN.duration().as_micros(),
225 };
226 Some(rate_left.cmp(&rate_right))
227 }
228}
229
230impl Serialize for Channel {
231 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
232 where
233 S: serde::Serializer,
234 {
235 match self {
236 Channel::FixedRate(fixed_rate) => serializer.serialize_str(&format!(
237 "fixed_rate@{}ms",
238 fixed_rate.duration().as_millis()
239 )),
240 Channel::RealTime => serializer.serialize_str("real_time"),
241 }
242 }
243}
244
245pub mod channel_ids {
246 use super::ChannelId;
247
248 pub const REAL_TIME: ChannelId = ChannelId(1);
249 pub const FIXED_RATE_50: ChannelId = ChannelId(2);
250 pub const FIXED_RATE_200: ChannelId = ChannelId(3);
251}
252
253impl Display for Channel {
254 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
255 match self {
256 Channel::FixedRate(fixed_rate) => {
257 write!(f, "fixed_rate@{}ms", fixed_rate.duration().as_millis())
258 }
259 Channel::RealTime => write!(f, "real_time"),
260 }
261 }
262}
263
264impl Channel {
265 pub fn id(&self) -> ChannelId {
266 match self {
267 Channel::FixedRate(fixed_rate) => match fixed_rate.duration().as_millis() {
268 50 => channel_ids::FIXED_RATE_50,
269 200 => channel_ids::FIXED_RATE_200,
270 _ => panic!("unknown channel: {self:?}"),
271 },
272 Channel::RealTime => channel_ids::REAL_TIME,
273 }
274 }
275}
276
/// Guards `Channel::id` against panicking for any supported fixed rate.
#[test]
fn id_supports_all_fixed_rates() {
    FixedRate::ALL.into_iter().for_each(|rate| {
        // Only the absence of a panic matters; the id itself is discarded.
        Channel::FixedRate(rate).id();
    });
}
283
284fn parse_channel(value: &str) -> Option<Channel> {
285 if value == "real_time" {
286 Some(Channel::RealTime)
287 } else if let Some(rest) = value.strip_prefix("fixed_rate@") {
288 let ms_value = rest.strip_suffix("ms")?;
289 Some(Channel::FixedRate(FixedRate::from_millis(
290 ms_value.parse().ok()?,
291 )?))
292 } else {
293 None
294 }
295}
296
297impl<'de> Deserialize<'de> for Channel {
298 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
299 where
300 D: serde::Deserializer<'de>,
301 {
302 let value = <String>::deserialize(deserializer)?;
303 parse_channel(&value).ok_or_else(|| Error::custom("unknown channel"))
304 }
305}
306
307#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
308pub struct FixedRate {
309 rate: DurationUs,
310}
311
312impl FixedRate {
313 pub const RATE_50_MS: Self = Self {
314 rate: DurationUs::from_millis_u32(50),
315 };
316 pub const RATE_200_MS: Self = Self {
317 rate: DurationUs::from_millis_u32(200),
318 };
319
320 pub const ALL: [Self; 2] = [Self::RATE_50_MS, Self::RATE_200_MS];
325 pub const MIN: Self = Self::ALL[0];
326
327 pub fn from_millis(millis: u32) -> Option<Self> {
328 Self::ALL
329 .into_iter()
330 .find(|v| v.rate.as_millis() == u64::from(millis))
331 }
332
333 pub fn duration(self) -> DurationUs {
334 self.rate
335 }
336}
337
338impl TryFrom<DurationUs> for FixedRate {
339 type Error = anyhow::Error;
340
341 fn try_from(value: DurationUs) -> Result<Self, Self::Error> {
342 Self::ALL
343 .into_iter()
344 .find(|v| v.rate == value)
345 .with_context(|| format!("unsupported rate: {value:?}"))
346 }
347}
348
349impl TryFrom<&ProtobufDuration> for FixedRate {
350 type Error = anyhow::Error;
351
352 fn try_from(value: &ProtobufDuration) -> Result<Self, Self::Error> {
353 let duration = DurationUs::try_from(value)?;
354 Self::try_from(duration)
355 }
356}
357
358impl TryFrom<ProtobufDuration> for FixedRate {
359 type Error = anyhow::Error;
360
361 fn try_from(duration: ProtobufDuration) -> anyhow::Result<Self> {
362 TryFrom::<&ProtobufDuration>::try_from(&duration)
363 }
364}
365
366impl From<FixedRate> for DurationUs {
367 fn from(value: FixedRate) -> Self {
368 value.rate
369 }
370}
371
372impl From<FixedRate> for ProtobufDuration {
373 fn from(value: FixedRate) -> Self {
374 value.rate.into()
375 }
376}
377
/// Checks the invariants of `FixedRate::ALL`: strictly increasing, each
/// interval divides one second, and each is a multiple of the minimal one.
#[test]
fn fixed_rate_values() {
    let strictly_increasing = FixedRate::ALL.windows(2).all(|pair| pair[0] < pair[1]);
    assert!(strictly_increasing, "values must be unique and sorted");

    let min_micros = FixedRate::MIN.duration().as_micros();
    for value in FixedRate::ALL {
        let micros = value.duration().as_micros();
        assert_eq!(
            1_000_000 % micros,
            0,
            "1 s must contain whole number of intervals"
        );
        assert_eq!(
            micros % min_micros,
            0,
            "the interval's borders must be a subset of the minimal interval's borders"
        );
    }
}
397
398#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
399#[serde(rename_all = "camelCase")]
400pub struct SubscriptionParamsRepr {
401 pub price_feed_ids: Vec<PriceFeedId>,
402 pub properties: Vec<PriceFeedProperty>,
403 #[serde(alias = "chains")]
405 pub formats: Vec<Format>,
406 #[serde(default)]
407 pub delivery_format: DeliveryFormat,
408 #[serde(default)]
409 pub json_binary_encoding: JsonBinaryEncoding,
410 #[serde(default = "default_parsed")]
413 pub parsed: bool,
414 pub channel: Channel,
415 #[serde(default)]
416 pub ignore_invalid_feed_ids: bool,
417}
418
419#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
420#[serde(rename_all = "camelCase")]
421pub struct SubscriptionParams(SubscriptionParamsRepr);
422
423impl<'de> Deserialize<'de> for SubscriptionParams {
424 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
425 where
426 D: serde::Deserializer<'de>,
427 {
428 let value = SubscriptionParamsRepr::deserialize(deserializer)?;
429 Self::new(value).map_err(Error::custom)
430 }
431}
432
433impl SubscriptionParams {
434 pub fn new(value: SubscriptionParamsRepr) -> Result<Self, &'static str> {
435 if value.price_feed_ids.is_empty() {
436 return Err("no price feed ids specified");
437 }
438 if !value.price_feed_ids.iter().all_unique() {
439 return Err("duplicate price feed ids specified");
440 }
441 if !value.formats.iter().all_unique() {
442 return Err("duplicate formats or chains specified");
443 }
444 if value.properties.is_empty() {
445 return Err("no properties specified");
446 }
447 if !value.properties.iter().all_unique() {
448 return Err("duplicate properties specified");
449 }
450 Ok(Self(value))
451 }
452}
453
454impl Deref for SubscriptionParams {
455 type Target = SubscriptionParamsRepr;
456
457 fn deref(&self) -> &Self::Target {
458 &self.0
459 }
460}
461impl DerefMut for SubscriptionParams {
462 fn deref_mut(&mut self) -> &mut Self::Target {
463 &mut self.0
464 }
465}
466
/// Serde default for `SubscriptionParamsRepr::parsed`: always `true`.
pub fn default_parsed() -> bool {
    const PARSED_BY_DEFAULT: bool = true;
    PARSED_BY_DEFAULT
}
470
471#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
472#[serde(rename_all = "camelCase")]
473pub struct JsonBinaryData {
474 pub encoding: JsonBinaryEncoding,
475 pub data: String,
476}
477
478#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
479#[serde(rename_all = "camelCase")]
480pub struct JsonUpdate {
481 #[serde(skip_serializing_if = "Option::is_none")]
483 pub parsed: Option<ParsedPayload>,
484 #[serde(skip_serializing_if = "Option::is_none")]
486 pub evm: Option<JsonBinaryData>,
487 #[serde(skip_serializing_if = "Option::is_none")]
489 pub solana: Option<JsonBinaryData>,
490 #[serde(skip_serializing_if = "Option::is_none")]
492 pub le_ecdsa: Option<JsonBinaryData>,
493 #[serde(skip_serializing_if = "Option::is_none")]
495 pub le_unsigned: Option<JsonBinaryData>,
496}
497
498#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
499#[serde(rename_all = "camelCase")]
500pub struct ParsedPayload {
501 #[serde(with = "crate::serde_str::timestamp")]
502 pub timestamp_us: TimestampUs,
503 pub price_feeds: Vec<ParsedFeedPayload>,
504}
505
506#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
507#[serde(rename_all = "camelCase")]
508pub struct ParsedFeedPayload {
509 pub price_feed_id: PriceFeedId,
510 #[serde(skip_serializing_if = "Option::is_none")]
511 #[serde(with = "crate::serde_str::option_price")]
512 #[serde(default)]
513 pub price: Option<Price>,
514 #[serde(skip_serializing_if = "Option::is_none")]
515 #[serde(with = "crate::serde_str::option_price")]
516 #[serde(default)]
517 pub best_bid_price: Option<Price>,
518 #[serde(skip_serializing_if = "Option::is_none")]
519 #[serde(with = "crate::serde_str::option_price")]
520 #[serde(default)]
521 pub best_ask_price: Option<Price>,
522 #[serde(skip_serializing_if = "Option::is_none")]
523 #[serde(default)]
524 pub publisher_count: Option<u16>,
525 #[serde(skip_serializing_if = "Option::is_none")]
526 #[serde(default)]
527 pub exponent: Option<i16>,
528 #[serde(skip_serializing_if = "Option::is_none")]
529 #[serde(default)]
530 pub confidence: Option<Price>,
531 #[serde(skip_serializing_if = "Option::is_none")]
532 #[serde(default)]
533 pub funding_rate: Option<Rate>,
534 #[serde(skip_serializing_if = "Option::is_none")]
535 #[serde(default)]
536 pub funding_timestamp: Option<TimestampUs>,
537 #[serde(skip_serializing_if = "Option::is_none")]
539 #[serde(default)]
540 pub funding_rate_interval: Option<DurationUs>,
541}
542
543impl ParsedFeedPayload {
544 pub fn new(
545 price_feed_id: PriceFeedId,
546 exponent: Option<i16>,
547 data: &AggregatedPriceFeedData,
548 properties: &[PriceFeedProperty],
549 ) -> Self {
550 let mut output = Self {
551 price_feed_id,
552 price: None,
553 best_bid_price: None,
554 best_ask_price: None,
555 publisher_count: None,
556 exponent: None,
557 confidence: None,
558 funding_rate: None,
559 funding_timestamp: None,
560 funding_rate_interval: None,
561 };
562 for &property in properties {
563 match property {
564 PriceFeedProperty::Price => {
565 output.price = data.price;
566 }
567 PriceFeedProperty::BestBidPrice => {
568 output.best_bid_price = data.best_bid_price;
569 }
570 PriceFeedProperty::BestAskPrice => {
571 output.best_ask_price = data.best_ask_price;
572 }
573 PriceFeedProperty::PublisherCount => {
574 output.publisher_count = Some(data.publisher_count);
575 }
576 PriceFeedProperty::Exponent => {
577 output.exponent = exponent;
578 }
579 PriceFeedProperty::Confidence => {
580 output.confidence = data.confidence;
581 }
582 PriceFeedProperty::FundingRate => {
583 output.funding_rate = data.funding_rate;
584 }
585 PriceFeedProperty::FundingTimestamp => {
586 output.funding_timestamp = data.funding_timestamp;
587 }
588 PriceFeedProperty::FundingRateInterval => {
589 output.funding_rate_interval = data.funding_rate_interval;
590 }
591 }
592 }
593 output
594 }
595
596 pub fn new_full(
597 price_feed_id: PriceFeedId,
598 exponent: Option<i16>,
599 data: &AggregatedPriceFeedData,
600 ) -> Self {
601 Self {
602 price_feed_id,
603 price: data.price,
604 best_bid_price: data.best_bid_price,
605 best_ask_price: data.best_ask_price,
606 publisher_count: Some(data.publisher_count),
607 exponent,
608 confidence: data.confidence,
609 funding_rate: data.funding_rate,
610 funding_timestamp: data.funding_timestamp,
611 funding_rate_interval: data.funding_rate_interval,
612 }
613 }
614}