pyth_lazer_protocol/
router.rs

1//! WebSocket JSON protocol types for the API the router provides to consumers and publishers.
2
3use {
4    crate::{
5        payload::AggregatedPriceFeedData,
6        time::{DurationUs, TimestampUs},
7    },
8    anyhow::{bail, Context},
9    derive_more::derive::{From, Into},
10    itertools::Itertools,
11    protobuf::well_known_types::duration::Duration as ProtobufDuration,
12    rust_decimal::{prelude::FromPrimitive, Decimal},
13    serde::{de::Error, Deserialize, Serialize},
14    std::{
15        cmp::Ordering,
16        fmt::Display,
17        num::NonZeroI64,
18        ops::{Add, Deref, DerefMut, Div, Sub},
19    },
20};
21
22#[derive(
23    Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, From, Into,
24)]
25pub struct PublisherId(pub u16);
26
27#[derive(
28    Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, From, Into,
29)]
30pub struct PriceFeedId(pub u32);
31
32#[derive(
33    Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, From, Into,
34)]
35pub struct ChannelId(pub u8);
36
37#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
38#[repr(transparent)]
39pub struct Rate(pub i64);
40
41impl Rate {
42    pub fn parse_str(value: &str, exponent: u32) -> anyhow::Result<Self> {
43        let value: Decimal = value.parse()?;
44        let coef = 10i64.checked_pow(exponent).context("overflow")?;
45        let coef = Decimal::from_i64(coef).context("overflow")?;
46        let value = value.checked_mul(coef).context("overflow")?;
47        if !value.is_integer() {
48            bail!("price value is more precise than available exponent");
49        }
50        let value: i64 = value.try_into().context("overflow")?;
51        Ok(Self(value))
52    }
53
54    pub fn from_f64(value: f64, exponent: u32) -> anyhow::Result<Self> {
55        let value = Decimal::from_f64(value).context("overflow")?;
56        let coef = 10i64.checked_pow(exponent).context("overflow")?;
57        let coef = Decimal::from_i64(coef).context("overflow")?;
58        let value = value.checked_mul(coef).context("overflow")?;
59        let value: i64 = value.try_into().context("overflow")?;
60        Ok(Self(value))
61    }
62
63    pub fn from_integer(value: i64, exponent: u32) -> anyhow::Result<Self> {
64        let coef = 10i64.checked_pow(exponent).context("overflow")?;
65        let value = value.checked_mul(coef).context("overflow")?;
66        Ok(Self(value))
67    }
68}
69
70#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
71#[repr(transparent)]
72pub struct Price(pub NonZeroI64);
73
74impl Price {
75    pub fn from_integer(value: i64, exponent: u32) -> anyhow::Result<Price> {
76        let coef = 10i64.checked_pow(exponent).context("overflow")?;
77        let value = value.checked_mul(coef).context("overflow")?;
78        let value = NonZeroI64::new(value).context("zero price is unsupported")?;
79        Ok(Self(value))
80    }
81
82    pub fn parse_str(value: &str, exponent: u32) -> anyhow::Result<Price> {
83        let value: Decimal = value.parse()?;
84        let coef = 10i64.checked_pow(exponent).context("overflow")?;
85        let coef = Decimal::from_i64(coef).context("overflow")?;
86        let value = value.checked_mul(coef).context("overflow")?;
87        if !value.is_integer() {
88            bail!("price value is more precise than available exponent");
89        }
90        let value: i64 = value.try_into().context("overflow")?;
91        let value = NonZeroI64::new(value).context("zero price is unsupported")?;
92        Ok(Self(value))
93    }
94
95    pub fn new(value: i64) -> anyhow::Result<Self> {
96        let value = NonZeroI64::new(value).context("zero price is unsupported")?;
97        Ok(Self(value))
98    }
99
100    pub fn into_inner(self) -> NonZeroI64 {
101        self.0
102    }
103
104    pub fn to_f64(self, exponent: u32) -> anyhow::Result<f64> {
105        Ok(self.0.get() as f64 / 10i64.checked_pow(exponent).context("overflow")? as f64)
106    }
107
108    pub fn from_f64(value: f64, exponent: u32) -> anyhow::Result<Self> {
109        let value = (value * 10f64.powi(exponent as i32)) as i64;
110        let value = NonZeroI64::new(value).context("zero price is unsupported")?;
111        Ok(Self(value))
112    }
113
114    pub fn mul(self, rhs: Price, rhs_exponent: u32) -> anyhow::Result<Price> {
115        let left_value = i128::from(self.0.get());
116        let right_value = i128::from(rhs.0.get());
117
118        let value = left_value * right_value / 10i128.pow(rhs_exponent);
119        let value = value.try_into()?;
120        NonZeroI64::new(value)
121            .context("zero price is unsupported")
122            .map(Self)
123    }
124}
125
126impl Sub<i64> for Price {
127    type Output = Option<Price>;
128
129    fn sub(self, rhs: i64) -> Self::Output {
130        let value = self.0.get().saturating_sub(rhs);
131        NonZeroI64::new(value).map(Self)
132    }
133}
134
135impl Add<i64> for Price {
136    type Output = Option<Price>;
137
138    fn add(self, rhs: i64) -> Self::Output {
139        let value = self.0.get().saturating_add(rhs);
140        NonZeroI64::new(value).map(Self)
141    }
142}
143
144impl Add<Price> for Price {
145    type Output = Option<Price>;
146    fn add(self, rhs: Price) -> Self::Output {
147        let value = self.0.get().saturating_add(rhs.0.get());
148        NonZeroI64::new(value).map(Self)
149    }
150}
151
152impl Sub<Price> for Price {
153    type Output = Option<Price>;
154    fn sub(self, rhs: Price) -> Self::Output {
155        let value = self.0.get().saturating_sub(rhs.0.get());
156        NonZeroI64::new(value).map(Self)
157    }
158}
159
160impl Div<i64> for Price {
161    type Output = Option<Price>;
162    fn div(self, rhs: i64) -> Self::Output {
163        let value = self.0.get().saturating_div(rhs);
164        NonZeroI64::new(value).map(Self)
165    }
166}
167
168#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
169#[serde(rename_all = "camelCase")]
170pub enum PriceFeedProperty {
171    Price,
172    BestBidPrice,
173    BestAskPrice,
174    PublisherCount,
175    Exponent,
176    Confidence,
177    FundingRate,
178    FundingTimestamp,
179    FundingRateInterval,
180    // More fields may be added later.
181}
182
183#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
184#[serde(rename_all = "camelCase")]
185pub enum DeliveryFormat {
186    /// Deliver stream updates as JSON text messages.
187    #[default]
188    Json,
189    /// Deliver stream updates as binary messages.
190    Binary,
191}
192
193#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
194#[serde(rename_all = "camelCase")]
195pub enum Format {
196    Evm,
197    Solana,
198    LeEcdsa,
199    LeUnsigned,
200}
201
202#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
203#[serde(rename_all = "camelCase")]
204pub enum JsonBinaryEncoding {
205    #[default]
206    Base64,
207    Hex,
208}
209
210#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, From)]
211pub enum Channel {
212    FixedRate(FixedRate),
213    RealTime,
214}
215
216impl PartialOrd for Channel {
217    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
218        let rate_left = match self {
219            Channel::FixedRate(rate) => rate.duration().as_micros(),
220            Channel::RealTime => FixedRate::MIN.duration().as_micros(),
221        };
222        let rate_right = match other {
223            Channel::FixedRate(rate) => rate.duration().as_micros(),
224            Channel::RealTime => FixedRate::MIN.duration().as_micros(),
225        };
226        Some(rate_left.cmp(&rate_right))
227    }
228}
229
230impl Serialize for Channel {
231    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
232    where
233        S: serde::Serializer,
234    {
235        match self {
236            Channel::FixedRate(fixed_rate) => serializer.serialize_str(&format!(
237                "fixed_rate@{}ms",
238                fixed_rate.duration().as_millis()
239            )),
240            Channel::RealTime => serializer.serialize_str("real_time"),
241        }
242    }
243}
244
245pub mod channel_ids {
246    use super::ChannelId;
247
248    pub const REAL_TIME: ChannelId = ChannelId(1);
249    pub const FIXED_RATE_50: ChannelId = ChannelId(2);
250    pub const FIXED_RATE_200: ChannelId = ChannelId(3);
251}
252
253impl Display for Channel {
254    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
255        match self {
256            Channel::FixedRate(fixed_rate) => {
257                write!(f, "fixed_rate@{}ms", fixed_rate.duration().as_millis())
258            }
259            Channel::RealTime => write!(f, "real_time"),
260        }
261    }
262}
263
264impl Channel {
265    pub fn id(&self) -> ChannelId {
266        match self {
267            Channel::FixedRate(fixed_rate) => match fixed_rate.duration().as_millis() {
268                50 => channel_ids::FIXED_RATE_50,
269                200 => channel_ids::FIXED_RATE_200,
270                _ => panic!("unknown channel: {self:?}"),
271            },
272            Channel::RealTime => channel_ids::REAL_TIME,
273        }
274    }
275}
276
277#[test]
278fn id_supports_all_fixed_rates() {
279    for rate in FixedRate::ALL {
280        Channel::FixedRate(rate).id();
281    }
282}
283
284fn parse_channel(value: &str) -> Option<Channel> {
285    if value == "real_time" {
286        Some(Channel::RealTime)
287    } else if let Some(rest) = value.strip_prefix("fixed_rate@") {
288        let ms_value = rest.strip_suffix("ms")?;
289        Some(Channel::FixedRate(FixedRate::from_millis(
290            ms_value.parse().ok()?,
291        )?))
292    } else {
293        None
294    }
295}
296
297impl<'de> Deserialize<'de> for Channel {
298    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
299    where
300        D: serde::Deserializer<'de>,
301    {
302        let value = <String>::deserialize(deserializer)?;
303        parse_channel(&value).ok_or_else(|| Error::custom("unknown channel"))
304    }
305}
306
307#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
308pub struct FixedRate {
309    rate: DurationUs,
310}
311
312impl FixedRate {
313    pub const RATE_50_MS: Self = Self {
314        rate: DurationUs::from_millis_u32(50),
315    };
316    pub const RATE_200_MS: Self = Self {
317        rate: DurationUs::from_millis_u32(200),
318    };
319
320    // Assumptions (tested below):
321    // - Values are sorted.
322    // - 1 second contains a whole number of each interval.
323    // - all intervals are divisable by the smallest interval.
324    pub const ALL: [Self; 2] = [Self::RATE_50_MS, Self::RATE_200_MS];
325    pub const MIN: Self = Self::ALL[0];
326
327    pub fn from_millis(millis: u32) -> Option<Self> {
328        Self::ALL
329            .into_iter()
330            .find(|v| v.rate.as_millis() == u64::from(millis))
331    }
332
333    pub fn duration(self) -> DurationUs {
334        self.rate
335    }
336}
337
338impl TryFrom<DurationUs> for FixedRate {
339    type Error = anyhow::Error;
340
341    fn try_from(value: DurationUs) -> Result<Self, Self::Error> {
342        Self::ALL
343            .into_iter()
344            .find(|v| v.rate == value)
345            .with_context(|| format!("unsupported rate: {value:?}"))
346    }
347}
348
349impl TryFrom<&ProtobufDuration> for FixedRate {
350    type Error = anyhow::Error;
351
352    fn try_from(value: &ProtobufDuration) -> Result<Self, Self::Error> {
353        let duration = DurationUs::try_from(value)?;
354        Self::try_from(duration)
355    }
356}
357
358impl TryFrom<ProtobufDuration> for FixedRate {
359    type Error = anyhow::Error;
360
361    fn try_from(duration: ProtobufDuration) -> anyhow::Result<Self> {
362        TryFrom::<&ProtobufDuration>::try_from(&duration)
363    }
364}
365
366impl From<FixedRate> for DurationUs {
367    fn from(value: FixedRate) -> Self {
368        value.rate
369    }
370}
371
372impl From<FixedRate> for ProtobufDuration {
373    fn from(value: FixedRate) -> Self {
374        value.rate.into()
375    }
376}
377
378#[test]
379fn fixed_rate_values() {
380    assert!(
381        FixedRate::ALL.windows(2).all(|w| w[0] < w[1]),
382        "values must be unique and sorted"
383    );
384    for value in FixedRate::ALL {
385        assert_eq!(
386            1_000_000 % value.duration().as_micros(),
387            0,
388            "1 s must contain whole number of intervals"
389        );
390        assert_eq!(
391            value.duration().as_micros() % FixedRate::MIN.duration().as_micros(),
392            0,
393            "the interval's borders must be a subset of the minimal interval's borders"
394        );
395    }
396}
397
398#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
399#[serde(rename_all = "camelCase")]
400pub struct SubscriptionParamsRepr {
401    pub price_feed_ids: Vec<PriceFeedId>,
402    pub properties: Vec<PriceFeedProperty>,
403    // "chains" was renamed to "formats". "chains" is still supported for compatibility.
404    #[serde(alias = "chains")]
405    pub formats: Vec<Format>,
406    #[serde(default)]
407    pub delivery_format: DeliveryFormat,
408    #[serde(default)]
409    pub json_binary_encoding: JsonBinaryEncoding,
410    /// If `true`, the stream update will contain a `parsed` JSON field containing
411    /// all data of the update.
412    #[serde(default = "default_parsed")]
413    pub parsed: bool,
414    pub channel: Channel,
415    #[serde(default)]
416    pub ignore_invalid_feed_ids: bool,
417}
418
419#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
420#[serde(rename_all = "camelCase")]
421pub struct SubscriptionParams(SubscriptionParamsRepr);
422
423impl<'de> Deserialize<'de> for SubscriptionParams {
424    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
425    where
426        D: serde::Deserializer<'de>,
427    {
428        let value = SubscriptionParamsRepr::deserialize(deserializer)?;
429        Self::new(value).map_err(Error::custom)
430    }
431}
432
433impl SubscriptionParams {
434    pub fn new(value: SubscriptionParamsRepr) -> Result<Self, &'static str> {
435        if value.price_feed_ids.is_empty() {
436            return Err("no price feed ids specified");
437        }
438        if !value.price_feed_ids.iter().all_unique() {
439            return Err("duplicate price feed ids specified");
440        }
441        if !value.formats.iter().all_unique() {
442            return Err("duplicate formats or chains specified");
443        }
444        if value.properties.is_empty() {
445            return Err("no properties specified");
446        }
447        if !value.properties.iter().all_unique() {
448            return Err("duplicate properties specified");
449        }
450        Ok(Self(value))
451    }
452}
453
454impl Deref for SubscriptionParams {
455    type Target = SubscriptionParamsRepr;
456
457    fn deref(&self) -> &Self::Target {
458        &self.0
459    }
460}
461impl DerefMut for SubscriptionParams {
462    fn deref_mut(&mut self) -> &mut Self::Target {
463        &mut self.0
464    }
465}
466
467pub fn default_parsed() -> bool {
468    true
469}
470
471#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
472#[serde(rename_all = "camelCase")]
473pub struct JsonBinaryData {
474    pub encoding: JsonBinaryEncoding,
475    pub data: String,
476}
477
478#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
479#[serde(rename_all = "camelCase")]
480pub struct JsonUpdate {
481    /// Present unless `parsed = false` is specified in subscription params.
482    #[serde(skip_serializing_if = "Option::is_none")]
483    pub parsed: Option<ParsedPayload>,
484    /// Only present if `Evm` is present in `formats` in subscription params.
485    #[serde(skip_serializing_if = "Option::is_none")]
486    pub evm: Option<JsonBinaryData>,
487    /// Only present if `Solana` is present in `formats` in subscription params.
488    #[serde(skip_serializing_if = "Option::is_none")]
489    pub solana: Option<JsonBinaryData>,
490    /// Only present if `LeEcdsa` is present in `formats` in subscription params.
491    #[serde(skip_serializing_if = "Option::is_none")]
492    pub le_ecdsa: Option<JsonBinaryData>,
493    /// Only present if `LeUnsigned` is present in `formats` in subscription params.
494    #[serde(skip_serializing_if = "Option::is_none")]
495    pub le_unsigned: Option<JsonBinaryData>,
496}
497
498#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
499#[serde(rename_all = "camelCase")]
500pub struct ParsedPayload {
501    #[serde(with = "crate::serde_str::timestamp")]
502    pub timestamp_us: TimestampUs,
503    pub price_feeds: Vec<ParsedFeedPayload>,
504}
505
506#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
507#[serde(rename_all = "camelCase")]
508pub struct ParsedFeedPayload {
509    pub price_feed_id: PriceFeedId,
510    #[serde(skip_serializing_if = "Option::is_none")]
511    #[serde(with = "crate::serde_str::option_price")]
512    #[serde(default)]
513    pub price: Option<Price>,
514    #[serde(skip_serializing_if = "Option::is_none")]
515    #[serde(with = "crate::serde_str::option_price")]
516    #[serde(default)]
517    pub best_bid_price: Option<Price>,
518    #[serde(skip_serializing_if = "Option::is_none")]
519    #[serde(with = "crate::serde_str::option_price")]
520    #[serde(default)]
521    pub best_ask_price: Option<Price>,
522    #[serde(skip_serializing_if = "Option::is_none")]
523    #[serde(default)]
524    pub publisher_count: Option<u16>,
525    #[serde(skip_serializing_if = "Option::is_none")]
526    #[serde(default)]
527    pub exponent: Option<i16>,
528    #[serde(skip_serializing_if = "Option::is_none")]
529    #[serde(default)]
530    pub confidence: Option<Price>,
531    #[serde(skip_serializing_if = "Option::is_none")]
532    #[serde(default)]
533    pub funding_rate: Option<Rate>,
534    #[serde(skip_serializing_if = "Option::is_none")]
535    #[serde(default)]
536    pub funding_timestamp: Option<TimestampUs>,
537    // More fields may be added later.
538    #[serde(skip_serializing_if = "Option::is_none")]
539    #[serde(default)]
540    pub funding_rate_interval: Option<DurationUs>,
541}
542
543impl ParsedFeedPayload {
544    pub fn new(
545        price_feed_id: PriceFeedId,
546        exponent: Option<i16>,
547        data: &AggregatedPriceFeedData,
548        properties: &[PriceFeedProperty],
549    ) -> Self {
550        let mut output = Self {
551            price_feed_id,
552            price: None,
553            best_bid_price: None,
554            best_ask_price: None,
555            publisher_count: None,
556            exponent: None,
557            confidence: None,
558            funding_rate: None,
559            funding_timestamp: None,
560            funding_rate_interval: None,
561        };
562        for &property in properties {
563            match property {
564                PriceFeedProperty::Price => {
565                    output.price = data.price;
566                }
567                PriceFeedProperty::BestBidPrice => {
568                    output.best_bid_price = data.best_bid_price;
569                }
570                PriceFeedProperty::BestAskPrice => {
571                    output.best_ask_price = data.best_ask_price;
572                }
573                PriceFeedProperty::PublisherCount => {
574                    output.publisher_count = Some(data.publisher_count);
575                }
576                PriceFeedProperty::Exponent => {
577                    output.exponent = exponent;
578                }
579                PriceFeedProperty::Confidence => {
580                    output.confidence = data.confidence;
581                }
582                PriceFeedProperty::FundingRate => {
583                    output.funding_rate = data.funding_rate;
584                }
585                PriceFeedProperty::FundingTimestamp => {
586                    output.funding_timestamp = data.funding_timestamp;
587                }
588                PriceFeedProperty::FundingRateInterval => {
589                    output.funding_rate_interval = data.funding_rate_interval;
590                }
591            }
592        }
593        output
594    }
595
596    pub fn new_full(
597        price_feed_id: PriceFeedId,
598        exponent: Option<i16>,
599        data: &AggregatedPriceFeedData,
600    ) -> Self {
601        Self {
602            price_feed_id,
603            price: data.price,
604            best_bid_price: data.best_bid_price,
605            best_ask_price: data.best_ask_price,
606            publisher_count: Some(data.publisher_count),
607            exponent,
608            confidence: data.confidence,
609            funding_rate: data.funding_rate,
610            funding_timestamp: data.funding_timestamp,
611            funding_rate_interval: data.funding_rate_interval,
612        }
613    }
614}