pyth_lazer_protocol/
router.rs

1//! WebSocket JSON protocol types for API the router provides to consumers and publishers.
2
3use {
4    crate::payload::AggregatedPriceFeedData,
5    anyhow::{bail, Context},
6    itertools::Itertools,
7    rust_decimal::{prelude::FromPrimitive, Decimal},
8    serde::{de::Error, Deserialize, Serialize},
9    std::{
10        fmt::Display,
11        num::NonZeroI64,
12        ops::{Add, Deref, DerefMut, Div, Sub},
13        time::{SystemTime, UNIX_EPOCH},
14    },
15};
16
17#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
18pub struct PublisherId(pub u16);
19
20#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
21pub struct PriceFeedId(pub u32);
22
23#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
24pub struct ChannelId(pub u8);
25
26#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
27pub struct TimestampUs(pub u64);
28
29impl TimestampUs {
30    pub fn now() -> Self {
31        let value = SystemTime::now()
32            .duration_since(UNIX_EPOCH)
33            .expect("invalid system time")
34            .as_micros()
35            .try_into()
36            .expect("invalid system time");
37        Self(value)
38    }
39
40    pub fn saturating_us_since(self, other: Self) -> u64 {
41        self.0.saturating_sub(other.0)
42    }
43}
44
45#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
46#[repr(transparent)]
47pub struct Rate(pub i64);
48
49impl Rate {
50    pub fn parse_str(value: &str, exponent: u32) -> anyhow::Result<Self> {
51        let value: Decimal = value.parse()?;
52        let coef = 10i64.checked_pow(exponent).context("overflow")?;
53        let coef = Decimal::from_i64(coef).context("overflow")?;
54        let value = value.checked_mul(coef).context("overflow")?;
55        if !value.is_integer() {
56            bail!("price value is more precise than available exponent");
57        }
58        let value: i64 = value.try_into().context("overflow")?;
59        Ok(Self(value))
60    }
61
62    pub fn from_f64(value: f64, exponent: u32) -> anyhow::Result<Self> {
63        let value = Decimal::from_f64(value).context("overflow")?;
64        let coef = 10i64.checked_pow(exponent).context("overflow")?;
65        let coef = Decimal::from_i64(coef).context("overflow")?;
66        let value = value.checked_mul(coef).context("overflow")?;
67        let value: i64 = value.try_into().context("overflow")?;
68        Ok(Self(value))
69    }
70
71    pub fn from_integer(value: i64, exponent: u32) -> anyhow::Result<Self> {
72        let coef = 10i64.checked_pow(exponent).context("overflow")?;
73        let value = value.checked_mul(coef).context("overflow")?;
74        Ok(Self(value))
75    }
76}
77
78#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
79#[repr(transparent)]
80pub struct Price(pub NonZeroI64);
81
82impl Price {
83    pub fn from_integer(value: i64, exponent: u32) -> anyhow::Result<Price> {
84        let coef = 10i64.checked_pow(exponent).context("overflow")?;
85        let value = value.checked_mul(coef).context("overflow")?;
86        let value = NonZeroI64::new(value).context("zero price is unsupported")?;
87        Ok(Self(value))
88    }
89
90    pub fn parse_str(value: &str, exponent: u32) -> anyhow::Result<Price> {
91        let value: Decimal = value.parse()?;
92        let coef = 10i64.checked_pow(exponent).context("overflow")?;
93        let coef = Decimal::from_i64(coef).context("overflow")?;
94        let value = value.checked_mul(coef).context("overflow")?;
95        if !value.is_integer() {
96            bail!("price value is more precise than available exponent");
97        }
98        let value: i64 = value.try_into().context("overflow")?;
99        let value = NonZeroI64::new(value).context("zero price is unsupported")?;
100        Ok(Self(value))
101    }
102
103    pub fn new(value: i64) -> anyhow::Result<Self> {
104        let value = NonZeroI64::new(value).context("zero price is unsupported")?;
105        Ok(Self(value))
106    }
107
108    pub fn into_inner(self) -> NonZeroI64 {
109        self.0
110    }
111
112    pub fn to_f64(self, exponent: u32) -> anyhow::Result<f64> {
113        Ok(self.0.get() as f64 / 10i64.checked_pow(exponent).context("overflow")? as f64)
114    }
115
116    pub fn from_f64(value: f64, exponent: u32) -> anyhow::Result<Self> {
117        let value = (value * 10f64.powi(exponent as i32)) as i64;
118        let value = NonZeroI64::new(value).context("zero price is unsupported")?;
119        Ok(Self(value))
120    }
121
122    pub fn mul(self, rhs: Price, rhs_exponent: u32) -> anyhow::Result<Price> {
123        let left_value = i128::from(self.0.get());
124        let right_value = i128::from(rhs.0.get());
125
126        let value = left_value * right_value / 10i128.pow(rhs_exponent);
127        let value = value.try_into()?;
128        NonZeroI64::new(value)
129            .context("zero price is unsupported")
130            .map(Self)
131    }
132}
133
134impl Sub<i64> for Price {
135    type Output = Option<Price>;
136
137    fn sub(self, rhs: i64) -> Self::Output {
138        let value = self.0.get().saturating_sub(rhs);
139        NonZeroI64::new(value).map(Self)
140    }
141}
142
143impl Add<i64> for Price {
144    type Output = Option<Price>;
145
146    fn add(self, rhs: i64) -> Self::Output {
147        let value = self.0.get().saturating_add(rhs);
148        NonZeroI64::new(value).map(Self)
149    }
150}
151
152impl Add<Price> for Price {
153    type Output = Option<Price>;
154    fn add(self, rhs: Price) -> Self::Output {
155        let value = self.0.get().saturating_add(rhs.0.get());
156        NonZeroI64::new(value).map(Self)
157    }
158}
159
160impl Sub<Price> for Price {
161    type Output = Option<Price>;
162    fn sub(self, rhs: Price) -> Self::Output {
163        let value = self.0.get().saturating_sub(rhs.0.get());
164        NonZeroI64::new(value).map(Self)
165    }
166}
167
168impl Div<i64> for Price {
169    type Output = Option<Price>;
170    fn div(self, rhs: i64) -> Self::Output {
171        let value = self.0.get().saturating_div(rhs);
172        NonZeroI64::new(value).map(Self)
173    }
174}
175
176#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
177#[serde(rename_all = "camelCase")]
178pub enum PriceFeedProperty {
179    Price,
180    BestBidPrice,
181    BestAskPrice,
182    PublisherCount,
183    Exponent,
184    Confidence,
185    FundingRate,
186    FundingTimestamp,
187    // More fields may be added later.
188}
189
190#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
191#[serde(rename_all = "camelCase")]
192pub enum DeliveryFormat {
193    /// Deliver stream updates as JSON text messages.
194    #[default]
195    Json,
196    /// Deliver stream updates as binary messages.
197    Binary,
198}
199
200#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
201#[serde(rename_all = "camelCase")]
202pub enum Format {
203    Evm,
204    Solana,
205    LeEcdsa,
206    LeUnsigned,
207}
208
209#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
210#[serde(rename_all = "camelCase")]
211pub enum JsonBinaryEncoding {
212    #[default]
213    Base64,
214    Hex,
215}
216
217#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
218pub enum Channel {
219    FixedRate(FixedRate),
220}
221
222impl Serialize for Channel {
223    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
224    where
225        S: serde::Serializer,
226    {
227        match self {
228            Channel::FixedRate(fixed_rate) => {
229                if *fixed_rate == FixedRate::MIN {
230                    return serializer.serialize_str("real_time");
231                }
232                serializer.serialize_str(&format!("fixed_rate@{}ms", fixed_rate.value_ms()))
233            }
234        }
235    }
236}
237
238mod channel_ids {
239    use super::ChannelId;
240
241    pub const FIXED_RATE_1: ChannelId = ChannelId(1);
242    pub const FIXED_RATE_50: ChannelId = ChannelId(2);
243    pub const FIXED_RATE_200: ChannelId = ChannelId(3);
244}
245
246impl Display for Channel {
247    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
248        match self {
249            Channel::FixedRate(fixed_rate) => match *fixed_rate {
250                FixedRate::MIN => write!(f, "real_time"),
251                rate => write!(f, "fixed_rate@{}ms", rate.value_ms()),
252            },
253        }
254    }
255}
256
257impl Channel {
258    pub fn id(&self) -> ChannelId {
259        match self {
260            Channel::FixedRate(fixed_rate) => match fixed_rate.value_ms() {
261                1 => channel_ids::FIXED_RATE_1,
262                50 => channel_ids::FIXED_RATE_50,
263                200 => channel_ids::FIXED_RATE_200,
264                _ => panic!("unknown channel: {self:?}"),
265            },
266        }
267    }
268}
269
270#[test]
271fn id_supports_all_fixed_rates() {
272    for rate in FixedRate::ALL {
273        Channel::FixedRate(rate).id();
274    }
275}
276
277fn parse_channel(value: &str) -> Option<Channel> {
278    if value == "real_time" {
279        Some(Channel::FixedRate(FixedRate::MIN))
280    } else if let Some(rest) = value.strip_prefix("fixed_rate@") {
281        let ms_value = rest.strip_suffix("ms")?;
282        Some(Channel::FixedRate(FixedRate::from_ms(
283            ms_value.parse().ok()?,
284        )?))
285    } else {
286        None
287    }
288}
289
290impl<'de> Deserialize<'de> for Channel {
291    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
292    where
293        D: serde::Deserializer<'de>,
294    {
295        let value = <String>::deserialize(deserializer)?;
296        parse_channel(&value).ok_or_else(|| D::Error::custom("unknown channel"))
297    }
298}
299
300#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
301pub struct FixedRate {
302    ms: u32,
303}
304
305impl FixedRate {
306    // Assumptions (tested below):
307    // - Values are sorted.
308    // - 1 second contains a whole number of each interval.
309    // - all intervals are divisable by the smallest interval.
310    pub const ALL: [Self; 3] = [Self { ms: 1 }, Self { ms: 50 }, Self { ms: 200 }];
311    pub const MIN: Self = Self::ALL[0];
312
313    pub fn from_ms(value: u32) -> Option<Self> {
314        Self::ALL.into_iter().find(|v| v.ms == value)
315    }
316
317    pub fn value_ms(self) -> u32 {
318        self.ms
319    }
320
321    pub fn value_us(self) -> u64 {
322        (self.ms * 1000).into()
323    }
324}
325
326#[test]
327fn fixed_rate_values() {
328    assert!(
329        FixedRate::ALL.windows(2).all(|w| w[0] < w[1]),
330        "values must be unique and sorted"
331    );
332    for value in FixedRate::ALL {
333        assert!(
334            1000 % value.ms == 0,
335            "1 s must contain whole number of intervals"
336        );
337        assert!(
338            value.value_us() % FixedRate::MIN.value_us() == 0,
339            "the interval's borders must be a subset of the minimal interval's borders"
340        );
341    }
342}
343
344#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
345#[serde(rename_all = "camelCase")]
346pub struct SubscriptionParamsRepr {
347    pub price_feed_ids: Vec<PriceFeedId>,
348    pub properties: Vec<PriceFeedProperty>,
349    // "chains" was renamed to "formats". "chains" is still supported for compatibility.
350    #[serde(alias = "chains")]
351    pub formats: Vec<Format>,
352    #[serde(default)]
353    pub delivery_format: DeliveryFormat,
354    #[serde(default)]
355    pub json_binary_encoding: JsonBinaryEncoding,
356    /// If `true`, the stream update will contain a `parsed` JSON field containing
357    /// all data of the update.
358    #[serde(default = "default_parsed")]
359    pub parsed: bool,
360    pub channel: Channel,
361}
362
363#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
364#[serde(rename_all = "camelCase")]
365pub struct SubscriptionParams(SubscriptionParamsRepr);
366
367impl<'de> Deserialize<'de> for SubscriptionParams {
368    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
369    where
370        D: serde::Deserializer<'de>,
371    {
372        let value = SubscriptionParamsRepr::deserialize(deserializer)?;
373        Self::new(value).map_err(D::Error::custom)
374    }
375}
376
377impl SubscriptionParams {
378    pub fn new(value: SubscriptionParamsRepr) -> Result<Self, &'static str> {
379        if value.price_feed_ids.is_empty() {
380            return Err("no price feed ids specified");
381        }
382        if !value.price_feed_ids.iter().all_unique() {
383            return Err("duplicate price feed ids specified");
384        }
385        if !value.formats.iter().all_unique() {
386            return Err("duplicate formats or chains specified");
387        }
388        if value.properties.is_empty() {
389            return Err("no properties specified");
390        }
391        if !value.properties.iter().all_unique() {
392            return Err("duplicate properties specified");
393        }
394        Ok(Self(value))
395    }
396}
397
398impl Deref for SubscriptionParams {
399    type Target = SubscriptionParamsRepr;
400
401    fn deref(&self) -> &Self::Target {
402        &self.0
403    }
404}
405impl DerefMut for SubscriptionParams {
406    fn deref_mut(&mut self) -> &mut Self::Target {
407        &mut self.0
408    }
409}
410
411pub fn default_parsed() -> bool {
412    true
413}
414
415#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
416#[serde(rename_all = "camelCase")]
417pub struct JsonBinaryData {
418    pub encoding: JsonBinaryEncoding,
419    pub data: String,
420}
421
422#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
423#[serde(rename_all = "camelCase")]
424pub struct JsonUpdate {
425    /// Present unless `parsed = false` is specified in subscription params.
426    #[serde(skip_serializing_if = "Option::is_none")]
427    pub parsed: Option<ParsedPayload>,
428    /// Only present if `Evm` is present in `formats` in subscription params.
429    #[serde(skip_serializing_if = "Option::is_none")]
430    pub evm: Option<JsonBinaryData>,
431    /// Only present if `Solana` is present in `formats` in subscription params.
432    #[serde(skip_serializing_if = "Option::is_none")]
433    pub solana: Option<JsonBinaryData>,
434    /// Only present if `LeEcdsa` is present in `formats` in subscription params.
435    #[serde(skip_serializing_if = "Option::is_none")]
436    pub le_ecdsa: Option<JsonBinaryData>,
437    /// Only present if `LeUnsigned` is present in `formats` in subscription params.
438    #[serde(skip_serializing_if = "Option::is_none")]
439    pub le_unsigned: Option<JsonBinaryData>,
440}
441
442#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
443#[serde(rename_all = "camelCase")]
444pub struct ParsedPayload {
445    #[serde(with = "crate::serde_str::timestamp")]
446    pub timestamp_us: TimestampUs,
447    pub price_feeds: Vec<ParsedFeedPayload>,
448}
449
450#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
451#[serde(rename_all = "camelCase")]
452pub struct ParsedFeedPayload {
453    pub price_feed_id: PriceFeedId,
454    #[serde(skip_serializing_if = "Option::is_none")]
455    #[serde(with = "crate::serde_str::option_price")]
456    #[serde(default)]
457    pub price: Option<Price>,
458    #[serde(skip_serializing_if = "Option::is_none")]
459    #[serde(with = "crate::serde_str::option_price")]
460    #[serde(default)]
461    pub best_bid_price: Option<Price>,
462    #[serde(skip_serializing_if = "Option::is_none")]
463    #[serde(with = "crate::serde_str::option_price")]
464    #[serde(default)]
465    pub best_ask_price: Option<Price>,
466    #[serde(skip_serializing_if = "Option::is_none")]
467    #[serde(default)]
468    pub publisher_count: Option<u16>,
469    #[serde(skip_serializing_if = "Option::is_none")]
470    #[serde(default)]
471    pub exponent: Option<i16>,
472    #[serde(skip_serializing_if = "Option::is_none")]
473    #[serde(default)]
474    pub confidence: Option<Price>,
475    #[serde(skip_serializing_if = "Option::is_none")]
476    #[serde(default)]
477    pub funding_rate: Option<Rate>,
478    #[serde(skip_serializing_if = "Option::is_none")]
479    #[serde(default)]
480    pub funding_timestamp: Option<TimestampUs>,
481    // More fields may be added later.
482}
483
484impl ParsedFeedPayload {
485    pub fn new(
486        price_feed_id: PriceFeedId,
487        exponent: Option<i16>,
488        data: &AggregatedPriceFeedData,
489        properties: &[PriceFeedProperty],
490    ) -> Self {
491        let mut output = Self {
492            price_feed_id,
493            price: None,
494            best_bid_price: None,
495            best_ask_price: None,
496            publisher_count: None,
497            exponent: None,
498            confidence: None,
499            funding_rate: None,
500            funding_timestamp: None,
501        };
502        for &property in properties {
503            match property {
504                PriceFeedProperty::Price => {
505                    output.price = data.price;
506                }
507                PriceFeedProperty::BestBidPrice => {
508                    output.best_bid_price = data.best_bid_price;
509                }
510                PriceFeedProperty::BestAskPrice => {
511                    output.best_ask_price = data.best_ask_price;
512                }
513                PriceFeedProperty::PublisherCount => {
514                    output.publisher_count = Some(data.publisher_count);
515                }
516                PriceFeedProperty::Exponent => {
517                    output.exponent = exponent;
518                }
519                PriceFeedProperty::Confidence => {
520                    output.confidence = data.confidence;
521                }
522                PriceFeedProperty::FundingRate => {
523                    output.funding_rate = data.funding_rate;
524                }
525                PriceFeedProperty::FundingTimestamp => {
526                    output.funding_timestamp = data.funding_timestamp;
527                }
528            }
529        }
530        output
531    }
532
533    pub fn new_full(
534        price_feed_id: PriceFeedId,
535        exponent: Option<i16>,
536        data: &AggregatedPriceFeedData,
537    ) -> Self {
538        Self {
539            price_feed_id,
540            price: data.price,
541            best_bid_price: data.best_bid_price,
542            best_ask_price: data.best_ask_price,
543            publisher_count: Some(data.publisher_count),
544            exponent,
545            confidence: data.confidence,
546            funding_rate: data.funding_rate,
547            funding_timestamp: data.funding_timestamp,
548        }
549    }
550}