pyth_lazer_protocol/
router.rs

1//! WebSocket JSON protocol types for the API the router provides to consumers and publishers.
2
3use {
4    crate::{
5        payload::AggregatedPriceFeedData,
6        time::{DurationUs, TimestampUs},
7    },
8    anyhow::{bail, Context},
9    derive_more::derive::{From, Into},
10    itertools::Itertools,
11    protobuf::well_known_types::duration::Duration as ProtobufDuration,
12    rust_decimal::{prelude::FromPrimitive, Decimal},
13    serde::{de::Error, Deserialize, Serialize},
14    std::{
15        fmt::Display,
16        num::NonZeroI64,
17        ops::{Add, Deref, DerefMut, Div, Sub},
18    },
19};
20
21#[derive(
22    Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, From, Into,
23)]
24pub struct PublisherId(pub u16);
25
26#[derive(
27    Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, From, Into,
28)]
29pub struct PriceFeedId(pub u32);
30
31#[derive(
32    Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, From, Into,
33)]
34pub struct ChannelId(pub u8);
35
36#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
37#[repr(transparent)]
38pub struct Rate(pub i64);
39
40impl Rate {
41    pub fn parse_str(value: &str, exponent: u32) -> anyhow::Result<Self> {
42        let value: Decimal = value.parse()?;
43        let coef = 10i64.checked_pow(exponent).context("overflow")?;
44        let coef = Decimal::from_i64(coef).context("overflow")?;
45        let value = value.checked_mul(coef).context("overflow")?;
46        if !value.is_integer() {
47            bail!("price value is more precise than available exponent");
48        }
49        let value: i64 = value.try_into().context("overflow")?;
50        Ok(Self(value))
51    }
52
53    pub fn from_f64(value: f64, exponent: u32) -> anyhow::Result<Self> {
54        let value = Decimal::from_f64(value).context("overflow")?;
55        let coef = 10i64.checked_pow(exponent).context("overflow")?;
56        let coef = Decimal::from_i64(coef).context("overflow")?;
57        let value = value.checked_mul(coef).context("overflow")?;
58        let value: i64 = value.try_into().context("overflow")?;
59        Ok(Self(value))
60    }
61
62    pub fn from_integer(value: i64, exponent: u32) -> anyhow::Result<Self> {
63        let coef = 10i64.checked_pow(exponent).context("overflow")?;
64        let value = value.checked_mul(coef).context("overflow")?;
65        Ok(Self(value))
66    }
67}
68
69#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
70#[repr(transparent)]
71pub struct Price(pub NonZeroI64);
72
73impl Price {
74    pub fn from_integer(value: i64, exponent: u32) -> anyhow::Result<Price> {
75        let coef = 10i64.checked_pow(exponent).context("overflow")?;
76        let value = value.checked_mul(coef).context("overflow")?;
77        let value = NonZeroI64::new(value).context("zero price is unsupported")?;
78        Ok(Self(value))
79    }
80
81    pub fn parse_str(value: &str, exponent: u32) -> anyhow::Result<Price> {
82        let value: Decimal = value.parse()?;
83        let coef = 10i64.checked_pow(exponent).context("overflow")?;
84        let coef = Decimal::from_i64(coef).context("overflow")?;
85        let value = value.checked_mul(coef).context("overflow")?;
86        if !value.is_integer() {
87            bail!("price value is more precise than available exponent");
88        }
89        let value: i64 = value.try_into().context("overflow")?;
90        let value = NonZeroI64::new(value).context("zero price is unsupported")?;
91        Ok(Self(value))
92    }
93
94    pub fn new(value: i64) -> anyhow::Result<Self> {
95        let value = NonZeroI64::new(value).context("zero price is unsupported")?;
96        Ok(Self(value))
97    }
98
99    pub fn into_inner(self) -> NonZeroI64 {
100        self.0
101    }
102
103    pub fn to_f64(self, exponent: u32) -> anyhow::Result<f64> {
104        Ok(self.0.get() as f64 / 10i64.checked_pow(exponent).context("overflow")? as f64)
105    }
106
107    pub fn from_f64(value: f64, exponent: u32) -> anyhow::Result<Self> {
108        let value = (value * 10f64.powi(exponent as i32)) as i64;
109        let value = NonZeroI64::new(value).context("zero price is unsupported")?;
110        Ok(Self(value))
111    }
112
113    pub fn mul(self, rhs: Price, rhs_exponent: u32) -> anyhow::Result<Price> {
114        let left_value = i128::from(self.0.get());
115        let right_value = i128::from(rhs.0.get());
116
117        let value = left_value * right_value / 10i128.pow(rhs_exponent);
118        let value = value.try_into()?;
119        NonZeroI64::new(value)
120            .context("zero price is unsupported")
121            .map(Self)
122    }
123}
124
125impl Sub<i64> for Price {
126    type Output = Option<Price>;
127
128    fn sub(self, rhs: i64) -> Self::Output {
129        let value = self.0.get().saturating_sub(rhs);
130        NonZeroI64::new(value).map(Self)
131    }
132}
133
134impl Add<i64> for Price {
135    type Output = Option<Price>;
136
137    fn add(self, rhs: i64) -> Self::Output {
138        let value = self.0.get().saturating_add(rhs);
139        NonZeroI64::new(value).map(Self)
140    }
141}
142
143impl Add<Price> for Price {
144    type Output = Option<Price>;
145    fn add(self, rhs: Price) -> Self::Output {
146        let value = self.0.get().saturating_add(rhs.0.get());
147        NonZeroI64::new(value).map(Self)
148    }
149}
150
151impl Sub<Price> for Price {
152    type Output = Option<Price>;
153    fn sub(self, rhs: Price) -> Self::Output {
154        let value = self.0.get().saturating_sub(rhs.0.get());
155        NonZeroI64::new(value).map(Self)
156    }
157}
158
159impl Div<i64> for Price {
160    type Output = Option<Price>;
161    fn div(self, rhs: i64) -> Self::Output {
162        let value = self.0.get().saturating_div(rhs);
163        NonZeroI64::new(value).map(Self)
164    }
165}
166
167#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
168#[serde(rename_all = "camelCase")]
169pub enum PriceFeedProperty {
170    Price,
171    BestBidPrice,
172    BestAskPrice,
173    PublisherCount,
174    Exponent,
175    Confidence,
176    FundingRate,
177    FundingTimestamp,
178    FundingRateInterval,
179    // More fields may be added later.
180}
181
182#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
183#[serde(rename_all = "camelCase")]
184pub enum DeliveryFormat {
185    /// Deliver stream updates as JSON text messages.
186    #[default]
187    Json,
188    /// Deliver stream updates as binary messages.
189    Binary,
190}
191
192#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
193#[serde(rename_all = "camelCase")]
194pub enum Format {
195    Evm,
196    Solana,
197    LeEcdsa,
198    LeUnsigned,
199}
200
201#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
202#[serde(rename_all = "camelCase")]
203pub enum JsonBinaryEncoding {
204    #[default]
205    Base64,
206    Hex,
207}
208
209#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, From)]
210pub enum Channel {
211    FixedRate(FixedRate),
212}
213
214impl Serialize for Channel {
215    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
216    where
217        S: serde::Serializer,
218    {
219        match self {
220            Channel::FixedRate(fixed_rate) => {
221                if *fixed_rate == FixedRate::MIN {
222                    return serializer.serialize_str("real_time");
223                }
224                serializer.serialize_str(&format!(
225                    "fixed_rate@{}ms",
226                    fixed_rate.duration().as_millis()
227                ))
228            }
229        }
230    }
231}
232
233pub mod channel_ids {
234    use super::ChannelId;
235
236    pub const FIXED_RATE_1: ChannelId = ChannelId(1);
237    pub const FIXED_RATE_50: ChannelId = ChannelId(2);
238    pub const FIXED_RATE_200: ChannelId = ChannelId(3);
239}
240
241impl Display for Channel {
242    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
243        match self {
244            Channel::FixedRate(fixed_rate) => match *fixed_rate {
245                FixedRate::MIN => write!(f, "real_time"),
246                rate => write!(f, "fixed_rate@{}ms", rate.duration().as_millis()),
247            },
248        }
249    }
250}
251
252impl Channel {
253    pub fn id(&self) -> ChannelId {
254        match self {
255            Channel::FixedRate(fixed_rate) => match fixed_rate.duration().as_millis() {
256                1 => channel_ids::FIXED_RATE_1,
257                50 => channel_ids::FIXED_RATE_50,
258                200 => channel_ids::FIXED_RATE_200,
259                _ => panic!("unknown channel: {self:?}"),
260            },
261        }
262    }
263}
264
/// `Channel::id` must not panic for any rate listed in `FixedRate::ALL`.
#[test]
fn id_supports_all_fixed_rates() {
    FixedRate::ALL.into_iter().for_each(|rate| {
        Channel::FixedRate(rate).id();
    });
}
271
272fn parse_channel(value: &str) -> Option<Channel> {
273    if value == "real_time" {
274        Some(Channel::FixedRate(FixedRate::MIN))
275    } else if let Some(rest) = value.strip_prefix("fixed_rate@") {
276        let ms_value = rest.strip_suffix("ms")?;
277        Some(Channel::FixedRate(FixedRate::from_millis(
278            ms_value.parse().ok()?,
279        )?))
280    } else {
281        None
282    }
283}
284
285impl<'de> Deserialize<'de> for Channel {
286    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
287    where
288        D: serde::Deserializer<'de>,
289    {
290        let value = <String>::deserialize(deserializer)?;
291        parse_channel(&value).ok_or_else(|| Error::custom("unknown channel"))
292    }
293}
294
295#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
296pub struct FixedRate {
297    rate: DurationUs,
298}
299
300impl FixedRate {
301    pub const RATE_1_MS: Self = Self {
302        rate: DurationUs::from_millis_u32(1),
303    };
304    pub const RATE_50_MS: Self = Self {
305        rate: DurationUs::from_millis_u32(50),
306    };
307    pub const RATE_200_MS: Self = Self {
308        rate: DurationUs::from_millis_u32(200),
309    };
310
311    // Assumptions (tested below):
312    // - Values are sorted.
313    // - 1 second contains a whole number of each interval.
314    // - all intervals are divisable by the smallest interval.
315    pub const ALL: [Self; 3] = [Self::RATE_1_MS, Self::RATE_50_MS, Self::RATE_200_MS];
316    pub const MIN: Self = Self::ALL[0];
317
318    pub fn from_millis(millis: u32) -> Option<Self> {
319        Self::ALL
320            .into_iter()
321            .find(|v| v.rate.as_millis() == u64::from(millis))
322    }
323
324    pub fn duration(self) -> DurationUs {
325        self.rate
326    }
327}
328
329impl TryFrom<DurationUs> for FixedRate {
330    type Error = anyhow::Error;
331
332    fn try_from(value: DurationUs) -> Result<Self, Self::Error> {
333        Self::ALL
334            .into_iter()
335            .find(|v| v.rate == value)
336            .with_context(|| format!("unsupported rate: {value:?}"))
337    }
338}
339
340impl TryFrom<&ProtobufDuration> for FixedRate {
341    type Error = anyhow::Error;
342
343    fn try_from(value: &ProtobufDuration) -> Result<Self, Self::Error> {
344        let duration = DurationUs::try_from(value)?;
345        Self::try_from(duration)
346    }
347}
348
349impl TryFrom<ProtobufDuration> for FixedRate {
350    type Error = anyhow::Error;
351
352    fn try_from(duration: ProtobufDuration) -> anyhow::Result<Self> {
353        TryFrom::<&ProtobufDuration>::try_from(&duration)
354    }
355}
356
357impl From<FixedRate> for DurationUs {
358    fn from(value: FixedRate) -> Self {
359        value.rate
360    }
361}
362
363impl From<FixedRate> for ProtobufDuration {
364    fn from(value: FixedRate) -> Self {
365        value.rate.into()
366    }
367}
368
/// Checks the assumptions documented on `FixedRate::ALL`.
#[test]
fn fixed_rate_values() {
    let strictly_increasing = FixedRate::ALL
        .windows(2)
        .all(|pair| matches!(pair, [earlier, later] if earlier < later));
    assert!(strictly_increasing, "values must be unique and sorted");

    let min_micros = FixedRate::MIN.duration().as_micros();
    for value in FixedRate::ALL {
        let micros = value.duration().as_micros();
        assert_eq!(
            1_000_000 % micros,
            0,
            "1 s must contain whole number of intervals"
        );
        assert_eq!(
            micros % min_micros,
            0,
            "the interval's borders must be a subset of the minimal interval's borders"
        );
    }
}
388
389#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
390#[serde(rename_all = "camelCase")]
391pub struct SubscriptionParamsRepr {
392    pub price_feed_ids: Vec<PriceFeedId>,
393    pub properties: Vec<PriceFeedProperty>,
394    // "chains" was renamed to "formats". "chains" is still supported for compatibility.
395    #[serde(alias = "chains")]
396    pub formats: Vec<Format>,
397    #[serde(default)]
398    pub delivery_format: DeliveryFormat,
399    #[serde(default)]
400    pub json_binary_encoding: JsonBinaryEncoding,
401    /// If `true`, the stream update will contain a `parsed` JSON field containing
402    /// all data of the update.
403    #[serde(default = "default_parsed")]
404    pub parsed: bool,
405    pub channel: Channel,
406    #[serde(default)]
407    pub ignore_invalid_feed_ids: bool,
408}
409
410#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
411#[serde(rename_all = "camelCase")]
412pub struct SubscriptionParams(SubscriptionParamsRepr);
413
414impl<'de> Deserialize<'de> for SubscriptionParams {
415    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
416    where
417        D: serde::Deserializer<'de>,
418    {
419        let value = SubscriptionParamsRepr::deserialize(deserializer)?;
420        Self::new(value).map_err(Error::custom)
421    }
422}
423
424impl SubscriptionParams {
425    pub fn new(value: SubscriptionParamsRepr) -> Result<Self, &'static str> {
426        if value.price_feed_ids.is_empty() {
427            return Err("no price feed ids specified");
428        }
429        if !value.price_feed_ids.iter().all_unique() {
430            return Err("duplicate price feed ids specified");
431        }
432        if !value.formats.iter().all_unique() {
433            return Err("duplicate formats or chains specified");
434        }
435        if value.properties.is_empty() {
436            return Err("no properties specified");
437        }
438        if !value.properties.iter().all_unique() {
439            return Err("duplicate properties specified");
440        }
441        Ok(Self(value))
442    }
443}
444
445impl Deref for SubscriptionParams {
446    type Target = SubscriptionParamsRepr;
447
448    fn deref(&self) -> &Self::Target {
449        &self.0
450    }
451}
452impl DerefMut for SubscriptionParams {
453    fn deref_mut(&mut self) -> &mut Self::Target {
454        &mut self.0
455    }
456}
457
/// Serde default for `SubscriptionParamsRepr::parsed`: always `true`.
pub fn default_parsed() -> bool {
    const PARSED_BY_DEFAULT: bool = true;
    PARSED_BY_DEFAULT
}
461
462#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
463#[serde(rename_all = "camelCase")]
464pub struct JsonBinaryData {
465    pub encoding: JsonBinaryEncoding,
466    pub data: String,
467}
468
469#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
470#[serde(rename_all = "camelCase")]
471pub struct JsonUpdate {
472    /// Present unless `parsed = false` is specified in subscription params.
473    #[serde(skip_serializing_if = "Option::is_none")]
474    pub parsed: Option<ParsedPayload>,
475    /// Only present if `Evm` is present in `formats` in subscription params.
476    #[serde(skip_serializing_if = "Option::is_none")]
477    pub evm: Option<JsonBinaryData>,
478    /// Only present if `Solana` is present in `formats` in subscription params.
479    #[serde(skip_serializing_if = "Option::is_none")]
480    pub solana: Option<JsonBinaryData>,
481    /// Only present if `LeEcdsa` is present in `formats` in subscription params.
482    #[serde(skip_serializing_if = "Option::is_none")]
483    pub le_ecdsa: Option<JsonBinaryData>,
484    /// Only present if `LeUnsigned` is present in `formats` in subscription params.
485    #[serde(skip_serializing_if = "Option::is_none")]
486    pub le_unsigned: Option<JsonBinaryData>,
487}
488
489#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
490#[serde(rename_all = "camelCase")]
491pub struct ParsedPayload {
492    #[serde(with = "crate::serde_str::timestamp")]
493    pub timestamp_us: TimestampUs,
494    pub price_feeds: Vec<ParsedFeedPayload>,
495}
496
497#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
498#[serde(rename_all = "camelCase")]
499pub struct ParsedFeedPayload {
500    pub price_feed_id: PriceFeedId,
501    #[serde(skip_serializing_if = "Option::is_none")]
502    #[serde(with = "crate::serde_str::option_price")]
503    #[serde(default)]
504    pub price: Option<Price>,
505    #[serde(skip_serializing_if = "Option::is_none")]
506    #[serde(with = "crate::serde_str::option_price")]
507    #[serde(default)]
508    pub best_bid_price: Option<Price>,
509    #[serde(skip_serializing_if = "Option::is_none")]
510    #[serde(with = "crate::serde_str::option_price")]
511    #[serde(default)]
512    pub best_ask_price: Option<Price>,
513    #[serde(skip_serializing_if = "Option::is_none")]
514    #[serde(default)]
515    pub publisher_count: Option<u16>,
516    #[serde(skip_serializing_if = "Option::is_none")]
517    #[serde(default)]
518    pub exponent: Option<i16>,
519    #[serde(skip_serializing_if = "Option::is_none")]
520    #[serde(default)]
521    pub confidence: Option<Price>,
522    #[serde(skip_serializing_if = "Option::is_none")]
523    #[serde(default)]
524    pub funding_rate: Option<Rate>,
525    #[serde(skip_serializing_if = "Option::is_none")]
526    #[serde(default)]
527    pub funding_timestamp: Option<TimestampUs>,
528    // More fields may be added later.
529    #[serde(skip_serializing_if = "Option::is_none")]
530    #[serde(default)]
531    pub funding_rate_interval: Option<DurationUs>,
532}
533
534impl ParsedFeedPayload {
535    pub fn new(
536        price_feed_id: PriceFeedId,
537        exponent: Option<i16>,
538        data: &AggregatedPriceFeedData,
539        properties: &[PriceFeedProperty],
540    ) -> Self {
541        let mut output = Self {
542            price_feed_id,
543            price: None,
544            best_bid_price: None,
545            best_ask_price: None,
546            publisher_count: None,
547            exponent: None,
548            confidence: None,
549            funding_rate: None,
550            funding_timestamp: None,
551            funding_rate_interval: None,
552        };
553        for &property in properties {
554            match property {
555                PriceFeedProperty::Price => {
556                    output.price = data.price;
557                }
558                PriceFeedProperty::BestBidPrice => {
559                    output.best_bid_price = data.best_bid_price;
560                }
561                PriceFeedProperty::BestAskPrice => {
562                    output.best_ask_price = data.best_ask_price;
563                }
564                PriceFeedProperty::PublisherCount => {
565                    output.publisher_count = Some(data.publisher_count);
566                }
567                PriceFeedProperty::Exponent => {
568                    output.exponent = exponent;
569                }
570                PriceFeedProperty::Confidence => {
571                    output.confidence = data.confidence;
572                }
573                PriceFeedProperty::FundingRate => {
574                    output.funding_rate = data.funding_rate;
575                }
576                PriceFeedProperty::FundingTimestamp => {
577                    output.funding_timestamp = data.funding_timestamp;
578                }
579                PriceFeedProperty::FundingRateInterval => {
580                    output.funding_rate_interval = data.funding_rate_interval;
581                }
582            }
583        }
584        output
585    }
586
587    pub fn new_full(
588        price_feed_id: PriceFeedId,
589        exponent: Option<i16>,
590        data: &AggregatedPriceFeedData,
591    ) -> Self {
592        Self {
593            price_feed_id,
594            price: data.price,
595            best_bid_price: data.best_bid_price,
596            best_ask_price: data.best_ask_price,
597            publisher_count: Some(data.publisher_count),
598            exponent,
599            confidence: data.confidence,
600            funding_rate: data.funding_rate,
601            funding_timestamp: data.funding_timestamp,
602            funding_rate_interval: data.funding_rate_interval,
603        }
604    }
605}