pyth_lazer_protocol/
router.rs

1//! WebSocket JSON protocol types for the API the router provides to consumers and publishers.
2
3use {
4    crate::{
5        payload::AggregatedPriceFeedData,
6        time::{DurationUs, TimestampUs},
7    },
8    anyhow::{bail, Context},
9    derive_more::derive::From,
10    itertools::Itertools,
11    protobuf::well_known_types::duration::Duration as ProtobufDuration,
12    rust_decimal::{prelude::FromPrimitive, Decimal},
13    serde::{de::Error, Deserialize, Serialize},
14    std::{
15        fmt::Display,
16        num::NonZeroI64,
17        ops::{Add, Deref, DerefMut, Div, Sub},
18    },
19};
20
21#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
22pub struct PublisherId(pub u16);
23
24#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
25pub struct PriceFeedId(pub u32);
26
27#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
28pub struct ChannelId(pub u8);
29
30#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
31#[repr(transparent)]
32pub struct Rate(pub i64);
33
34impl Rate {
35    pub fn parse_str(value: &str, exponent: u32) -> anyhow::Result<Self> {
36        let value: Decimal = value.parse()?;
37        let coef = 10i64.checked_pow(exponent).context("overflow")?;
38        let coef = Decimal::from_i64(coef).context("overflow")?;
39        let value = value.checked_mul(coef).context("overflow")?;
40        if !value.is_integer() {
41            bail!("price value is more precise than available exponent");
42        }
43        let value: i64 = value.try_into().context("overflow")?;
44        Ok(Self(value))
45    }
46
47    pub fn from_f64(value: f64, exponent: u32) -> anyhow::Result<Self> {
48        let value = Decimal::from_f64(value).context("overflow")?;
49        let coef = 10i64.checked_pow(exponent).context("overflow")?;
50        let coef = Decimal::from_i64(coef).context("overflow")?;
51        let value = value.checked_mul(coef).context("overflow")?;
52        let value: i64 = value.try_into().context("overflow")?;
53        Ok(Self(value))
54    }
55
56    pub fn from_integer(value: i64, exponent: u32) -> anyhow::Result<Self> {
57        let coef = 10i64.checked_pow(exponent).context("overflow")?;
58        let value = value.checked_mul(coef).context("overflow")?;
59        Ok(Self(value))
60    }
61}
62
63#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
64#[repr(transparent)]
65pub struct Price(pub NonZeroI64);
66
67impl Price {
68    pub fn from_integer(value: i64, exponent: u32) -> anyhow::Result<Price> {
69        let coef = 10i64.checked_pow(exponent).context("overflow")?;
70        let value = value.checked_mul(coef).context("overflow")?;
71        let value = NonZeroI64::new(value).context("zero price is unsupported")?;
72        Ok(Self(value))
73    }
74
75    pub fn parse_str(value: &str, exponent: u32) -> anyhow::Result<Price> {
76        let value: Decimal = value.parse()?;
77        let coef = 10i64.checked_pow(exponent).context("overflow")?;
78        let coef = Decimal::from_i64(coef).context("overflow")?;
79        let value = value.checked_mul(coef).context("overflow")?;
80        if !value.is_integer() {
81            bail!("price value is more precise than available exponent");
82        }
83        let value: i64 = value.try_into().context("overflow")?;
84        let value = NonZeroI64::new(value).context("zero price is unsupported")?;
85        Ok(Self(value))
86    }
87
88    pub fn new(value: i64) -> anyhow::Result<Self> {
89        let value = NonZeroI64::new(value).context("zero price is unsupported")?;
90        Ok(Self(value))
91    }
92
93    pub fn into_inner(self) -> NonZeroI64 {
94        self.0
95    }
96
97    pub fn to_f64(self, exponent: u32) -> anyhow::Result<f64> {
98        Ok(self.0.get() as f64 / 10i64.checked_pow(exponent).context("overflow")? as f64)
99    }
100
101    pub fn from_f64(value: f64, exponent: u32) -> anyhow::Result<Self> {
102        let value = (value * 10f64.powi(exponent as i32)) as i64;
103        let value = NonZeroI64::new(value).context("zero price is unsupported")?;
104        Ok(Self(value))
105    }
106
107    pub fn mul(self, rhs: Price, rhs_exponent: u32) -> anyhow::Result<Price> {
108        let left_value = i128::from(self.0.get());
109        let right_value = i128::from(rhs.0.get());
110
111        let value = left_value * right_value / 10i128.pow(rhs_exponent);
112        let value = value.try_into()?;
113        NonZeroI64::new(value)
114            .context("zero price is unsupported")
115            .map(Self)
116    }
117}
118
119impl Sub<i64> for Price {
120    type Output = Option<Price>;
121
122    fn sub(self, rhs: i64) -> Self::Output {
123        let value = self.0.get().saturating_sub(rhs);
124        NonZeroI64::new(value).map(Self)
125    }
126}
127
128impl Add<i64> for Price {
129    type Output = Option<Price>;
130
131    fn add(self, rhs: i64) -> Self::Output {
132        let value = self.0.get().saturating_add(rhs);
133        NonZeroI64::new(value).map(Self)
134    }
135}
136
137impl Add<Price> for Price {
138    type Output = Option<Price>;
139    fn add(self, rhs: Price) -> Self::Output {
140        let value = self.0.get().saturating_add(rhs.0.get());
141        NonZeroI64::new(value).map(Self)
142    }
143}
144
145impl Sub<Price> for Price {
146    type Output = Option<Price>;
147    fn sub(self, rhs: Price) -> Self::Output {
148        let value = self.0.get().saturating_sub(rhs.0.get());
149        NonZeroI64::new(value).map(Self)
150    }
151}
152
153impl Div<i64> for Price {
154    type Output = Option<Price>;
155    fn div(self, rhs: i64) -> Self::Output {
156        let value = self.0.get().saturating_div(rhs);
157        NonZeroI64::new(value).map(Self)
158    }
159}
160
161#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
162#[serde(rename_all = "camelCase")]
163pub enum PriceFeedProperty {
164    Price,
165    BestBidPrice,
166    BestAskPrice,
167    PublisherCount,
168    Exponent,
169    Confidence,
170    FundingRate,
171    FundingTimestamp,
172    // More fields may be added later.
173}
174
175#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
176#[serde(rename_all = "camelCase")]
177pub enum DeliveryFormat {
178    /// Deliver stream updates as JSON text messages.
179    #[default]
180    Json,
181    /// Deliver stream updates as binary messages.
182    Binary,
183}
184
185#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
186#[serde(rename_all = "camelCase")]
187pub enum Format {
188    Evm,
189    Solana,
190    LeEcdsa,
191    LeUnsigned,
192}
193
194#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
195#[serde(rename_all = "camelCase")]
196pub enum JsonBinaryEncoding {
197    #[default]
198    Base64,
199    Hex,
200}
201
202#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, From)]
203pub enum Channel {
204    FixedRate(FixedRate),
205}
206
207impl Serialize for Channel {
208    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
209    where
210        S: serde::Serializer,
211    {
212        match self {
213            Channel::FixedRate(fixed_rate) => {
214                if *fixed_rate == FixedRate::MIN {
215                    return serializer.serialize_str("real_time");
216                }
217                serializer.serialize_str(&format!(
218                    "fixed_rate@{}ms",
219                    fixed_rate.duration().as_millis()
220                ))
221            }
222        }
223    }
224}
225
226pub mod channel_ids {
227    use super::ChannelId;
228
229    pub const FIXED_RATE_1: ChannelId = ChannelId(1);
230    pub const FIXED_RATE_50: ChannelId = ChannelId(2);
231    pub const FIXED_RATE_200: ChannelId = ChannelId(3);
232}
233
234impl Display for Channel {
235    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
236        match self {
237            Channel::FixedRate(fixed_rate) => match *fixed_rate {
238                FixedRate::MIN => write!(f, "real_time"),
239                rate => write!(f, "fixed_rate@{}ms", rate.duration().as_millis()),
240            },
241        }
242    }
243}
244
245impl Channel {
246    pub fn id(&self) -> ChannelId {
247        match self {
248            Channel::FixedRate(fixed_rate) => match fixed_rate.duration().as_millis() {
249                1 => channel_ids::FIXED_RATE_1,
250                50 => channel_ids::FIXED_RATE_50,
251                200 => channel_ids::FIXED_RATE_200,
252                _ => panic!("unknown channel: {self:?}"),
253            },
254        }
255    }
256}
257
/// Every rate listed in `FixedRate::ALL` must map to a channel id.
#[test]
fn id_supports_all_fixed_rates() {
    // `Channel::id` panics on an unmapped rate, so calling it is the whole assertion.
    FixedRate::ALL.iter().for_each(|&rate| {
        let _ = Channel::FixedRate(rate).id();
    });
}
264
265fn parse_channel(value: &str) -> Option<Channel> {
266    if value == "real_time" {
267        Some(Channel::FixedRate(FixedRate::MIN))
268    } else if let Some(rest) = value.strip_prefix("fixed_rate@") {
269        let ms_value = rest.strip_suffix("ms")?;
270        Some(Channel::FixedRate(FixedRate::from_millis(
271            ms_value.parse().ok()?,
272        )?))
273    } else {
274        None
275    }
276}
277
278impl<'de> Deserialize<'de> for Channel {
279    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
280    where
281        D: serde::Deserializer<'de>,
282    {
283        let value = <String>::deserialize(deserializer)?;
284        parse_channel(&value).ok_or_else(|| Error::custom("unknown channel"))
285    }
286}
287
288#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
289pub struct FixedRate {
290    rate: DurationUs,
291}
292
293impl FixedRate {
294    pub const RATE_1_MS: Self = Self {
295        rate: DurationUs::from_millis_u32(1),
296    };
297    pub const RATE_50_MS: Self = Self {
298        rate: DurationUs::from_millis_u32(50),
299    };
300    pub const RATE_200_MS: Self = Self {
301        rate: DurationUs::from_millis_u32(200),
302    };
303
304    // Assumptions (tested below):
305    // - Values are sorted.
306    // - 1 second contains a whole number of each interval.
307    // - all intervals are divisable by the smallest interval.
308    pub const ALL: [Self; 3] = [Self::RATE_1_MS, Self::RATE_50_MS, Self::RATE_200_MS];
309    pub const MIN: Self = Self::ALL[0];
310
311    pub fn from_millis(millis: u32) -> Option<Self> {
312        Self::ALL
313            .into_iter()
314            .find(|v| v.rate.as_millis() == u64::from(millis))
315    }
316
317    pub fn duration(self) -> DurationUs {
318        self.rate
319    }
320}
321
322impl TryFrom<DurationUs> for FixedRate {
323    type Error = anyhow::Error;
324
325    fn try_from(value: DurationUs) -> Result<Self, Self::Error> {
326        Self::ALL
327            .into_iter()
328            .find(|v| v.rate == value)
329            .with_context(|| format!("unsupported rate: {value:?}"))
330    }
331}
332
333impl TryFrom<&ProtobufDuration> for FixedRate {
334    type Error = anyhow::Error;
335
336    fn try_from(value: &ProtobufDuration) -> Result<Self, Self::Error> {
337        let duration = DurationUs::try_from(value)?;
338        Self::try_from(duration)
339    }
340}
341
342impl TryFrom<ProtobufDuration> for FixedRate {
343    type Error = anyhow::Error;
344
345    fn try_from(duration: ProtobufDuration) -> anyhow::Result<Self> {
346        TryFrom::<&ProtobufDuration>::try_from(&duration)
347    }
348}
349
350impl From<FixedRate> for DurationUs {
351    fn from(value: FixedRate) -> Self {
352        value.rate
353    }
354}
355
356impl From<FixedRate> for ProtobufDuration {
357    fn from(value: FixedRate) -> Self {
358        value.rate.into()
359    }
360}
361
/// Checks the assumptions documented on `FixedRate::ALL`.
#[test]
fn fixed_rate_values() {
    let strictly_increasing = FixedRate::ALL.windows(2).all(|pair| pair[0] < pair[1]);
    assert!(strictly_increasing, "values must be unique and sorted");

    for value in FixedRate::ALL {
        let interval_us = value.duration().as_micros();
        let smallest_us = FixedRate::MIN.duration().as_micros();
        assert_eq!(
            1_000_000 % interval_us,
            0,
            "1 s must contain whole number of intervals"
        );
        assert_eq!(
            interval_us % smallest_us,
            0,
            "the interval's borders must be a subset of the minimal interval's borders"
        );
    }
}
381
382#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
383#[serde(rename_all = "camelCase")]
384pub struct SubscriptionParamsRepr {
385    pub price_feed_ids: Vec<PriceFeedId>,
386    pub properties: Vec<PriceFeedProperty>,
387    // "chains" was renamed to "formats". "chains" is still supported for compatibility.
388    #[serde(alias = "chains")]
389    pub formats: Vec<Format>,
390    #[serde(default)]
391    pub delivery_format: DeliveryFormat,
392    #[serde(default)]
393    pub json_binary_encoding: JsonBinaryEncoding,
394    /// If `true`, the stream update will contain a `parsed` JSON field containing
395    /// all data of the update.
396    #[serde(default = "default_parsed")]
397    pub parsed: bool,
398    pub channel: Channel,
399    #[serde(default)]
400    pub ignore_invalid_feed_ids: bool,
401}
402
403#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
404#[serde(rename_all = "camelCase")]
405pub struct SubscriptionParams(SubscriptionParamsRepr);
406
407impl<'de> Deserialize<'de> for SubscriptionParams {
408    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
409    where
410        D: serde::Deserializer<'de>,
411    {
412        let value = SubscriptionParamsRepr::deserialize(deserializer)?;
413        Self::new(value).map_err(Error::custom)
414    }
415}
416
417impl SubscriptionParams {
418    pub fn new(value: SubscriptionParamsRepr) -> Result<Self, &'static str> {
419        if value.price_feed_ids.is_empty() {
420            return Err("no price feed ids specified");
421        }
422        if !value.price_feed_ids.iter().all_unique() {
423            return Err("duplicate price feed ids specified");
424        }
425        if !value.formats.iter().all_unique() {
426            return Err("duplicate formats or chains specified");
427        }
428        if value.properties.is_empty() {
429            return Err("no properties specified");
430        }
431        if !value.properties.iter().all_unique() {
432            return Err("duplicate properties specified");
433        }
434        Ok(Self(value))
435    }
436}
437
438impl Deref for SubscriptionParams {
439    type Target = SubscriptionParamsRepr;
440
441    fn deref(&self) -> &Self::Target {
442        &self.0
443    }
444}
445impl DerefMut for SubscriptionParams {
446    fn deref_mut(&mut self) -> &mut Self::Target {
447        &mut self.0
448    }
449}
450
/// Serde default for `SubscriptionParamsRepr::parsed`: the parsed payload is included
/// unless the subscriber opts out.
pub fn default_parsed() -> bool {
    true
}
454
455#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
456#[serde(rename_all = "camelCase")]
457pub struct JsonBinaryData {
458    pub encoding: JsonBinaryEncoding,
459    pub data: String,
460}
461
462#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
463#[serde(rename_all = "camelCase")]
464pub struct JsonUpdate {
465    /// Present unless `parsed = false` is specified in subscription params.
466    #[serde(skip_serializing_if = "Option::is_none")]
467    pub parsed: Option<ParsedPayload>,
468    /// Only present if `Evm` is present in `formats` in subscription params.
469    #[serde(skip_serializing_if = "Option::is_none")]
470    pub evm: Option<JsonBinaryData>,
471    /// Only present if `Solana` is present in `formats` in subscription params.
472    #[serde(skip_serializing_if = "Option::is_none")]
473    pub solana: Option<JsonBinaryData>,
474    /// Only present if `LeEcdsa` is present in `formats` in subscription params.
475    #[serde(skip_serializing_if = "Option::is_none")]
476    pub le_ecdsa: Option<JsonBinaryData>,
477    /// Only present if `LeUnsigned` is present in `formats` in subscription params.
478    #[serde(skip_serializing_if = "Option::is_none")]
479    pub le_unsigned: Option<JsonBinaryData>,
480}
481
482#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
483#[serde(rename_all = "camelCase")]
484pub struct ParsedPayload {
485    #[serde(with = "crate::serde_str::timestamp")]
486    pub timestamp_us: TimestampUs,
487    pub price_feeds: Vec<ParsedFeedPayload>,
488}
489
490#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
491#[serde(rename_all = "camelCase")]
492pub struct ParsedFeedPayload {
493    pub price_feed_id: PriceFeedId,
494    #[serde(skip_serializing_if = "Option::is_none")]
495    #[serde(with = "crate::serde_str::option_price")]
496    #[serde(default)]
497    pub price: Option<Price>,
498    #[serde(skip_serializing_if = "Option::is_none")]
499    #[serde(with = "crate::serde_str::option_price")]
500    #[serde(default)]
501    pub best_bid_price: Option<Price>,
502    #[serde(skip_serializing_if = "Option::is_none")]
503    #[serde(with = "crate::serde_str::option_price")]
504    #[serde(default)]
505    pub best_ask_price: Option<Price>,
506    #[serde(skip_serializing_if = "Option::is_none")]
507    #[serde(default)]
508    pub publisher_count: Option<u16>,
509    #[serde(skip_serializing_if = "Option::is_none")]
510    #[serde(default)]
511    pub exponent: Option<i16>,
512    #[serde(skip_serializing_if = "Option::is_none")]
513    #[serde(default)]
514    pub confidence: Option<Price>,
515    #[serde(skip_serializing_if = "Option::is_none")]
516    #[serde(default)]
517    pub funding_rate: Option<Rate>,
518    #[serde(skip_serializing_if = "Option::is_none")]
519    #[serde(default)]
520    pub funding_timestamp: Option<TimestampUs>,
521    // More fields may be added later.
522}
523
524impl ParsedFeedPayload {
525    pub fn new(
526        price_feed_id: PriceFeedId,
527        exponent: Option<i16>,
528        data: &AggregatedPriceFeedData,
529        properties: &[PriceFeedProperty],
530    ) -> Self {
531        let mut output = Self {
532            price_feed_id,
533            price: None,
534            best_bid_price: None,
535            best_ask_price: None,
536            publisher_count: None,
537            exponent: None,
538            confidence: None,
539            funding_rate: None,
540            funding_timestamp: None,
541        };
542        for &property in properties {
543            match property {
544                PriceFeedProperty::Price => {
545                    output.price = data.price;
546                }
547                PriceFeedProperty::BestBidPrice => {
548                    output.best_bid_price = data.best_bid_price;
549                }
550                PriceFeedProperty::BestAskPrice => {
551                    output.best_ask_price = data.best_ask_price;
552                }
553                PriceFeedProperty::PublisherCount => {
554                    output.publisher_count = Some(data.publisher_count);
555                }
556                PriceFeedProperty::Exponent => {
557                    output.exponent = exponent;
558                }
559                PriceFeedProperty::Confidence => {
560                    output.confidence = data.confidence;
561                }
562                PriceFeedProperty::FundingRate => {
563                    output.funding_rate = data.funding_rate;
564                }
565                PriceFeedProperty::FundingTimestamp => {
566                    output.funding_timestamp = data.funding_timestamp;
567                }
568            }
569        }
570        output
571    }
572
573    pub fn new_full(
574        price_feed_id: PriceFeedId,
575        exponent: Option<i16>,
576        data: &AggregatedPriceFeedData,
577    ) -> Self {
578        Self {
579            price_feed_id,
580            price: data.price,
581            best_bid_price: data.best_bid_price,
582            best_ask_price: data.best_ask_price,
583            publisher_count: Some(data.publisher_count),
584            exponent,
585            confidence: data.confidence,
586            funding_rate: data.funding_rate,
587            funding_timestamp: data.funding_timestamp,
588        }
589    }
590}