pyth_lazer_protocol/
router.rs

1//! WebSocket JSON protocol types for the API the router provides to consumers and publishers.
2
3use protobuf::MessageField;
4use {
5    crate::payload::AggregatedPriceFeedData,
6    anyhow::{bail, Context},
7    itertools::Itertools,
8    protobuf::well_known_types::timestamp::Timestamp,
9    rust_decimal::{prelude::FromPrimitive, Decimal},
10    serde::{de::Error, Deserialize, Serialize},
11    std::{
12        fmt::Display,
13        num::NonZeroI64,
14        ops::{Add, Deref, DerefMut, Div, Sub},
15        time::{SystemTime, UNIX_EPOCH},
16    },
17};
18
19#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
20pub struct PublisherId(pub u16);
21
22#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
23pub struct PriceFeedId(pub u32);
24
25#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
26pub struct ChannelId(pub u8);
27
28#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
29pub struct TimestampUs(pub u64);
30
31impl TryFrom<&Timestamp> for TimestampUs {
32    type Error = anyhow::Error;
33
34    fn try_from(timestamp: &Timestamp) -> anyhow::Result<Self> {
35        let seconds_in_micros: u64 = (timestamp.seconds * 1_000_000).try_into()?;
36        let nanos_in_micros: u64 = (timestamp.nanos / 1_000).try_into()?;
37        Ok(TimestampUs(seconds_in_micros + nanos_in_micros))
38    }
39}
40
41impl From<TimestampUs> for Timestamp {
42    fn from(value: TimestampUs) -> Self {
43        Timestamp {
44            #[allow(
45                clippy::cast_possible_wrap,
46                reason = "u64 to i64 after this division can never overflow because the value cannot be too big"
47            )]
48            seconds: (value.0 / 1_000_000) as i64,
49            nanos: (value.0 % 1_000_000) as i32 * 1000,
50            special_fields: Default::default(),
51        }
52    }
53}
54
55impl From<TimestampUs> for MessageField<Timestamp> {
56    fn from(value: TimestampUs) -> Self {
57        MessageField::some(value.into())
58    }
59}
60
61impl TimestampUs {
62    pub fn now() -> Self {
63        let value = SystemTime::now()
64            .duration_since(UNIX_EPOCH)
65            .expect("invalid system time")
66            .as_micros()
67            .try_into()
68            .expect("invalid system time");
69        Self(value)
70    }
71
72    pub fn saturating_us_since(self, other: Self) -> u64 {
73        self.0.saturating_sub(other.0)
74    }
75}
76
77#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
78#[repr(transparent)]
79pub struct Rate(pub i64);
80
81impl Rate {
82    pub fn parse_str(value: &str, exponent: u32) -> anyhow::Result<Self> {
83        let value: Decimal = value.parse()?;
84        let coef = 10i64.checked_pow(exponent).context("overflow")?;
85        let coef = Decimal::from_i64(coef).context("overflow")?;
86        let value = value.checked_mul(coef).context("overflow")?;
87        if !value.is_integer() {
88            bail!("price value is more precise than available exponent");
89        }
90        let value: i64 = value.try_into().context("overflow")?;
91        Ok(Self(value))
92    }
93
94    pub fn from_f64(value: f64, exponent: u32) -> anyhow::Result<Self> {
95        let value = Decimal::from_f64(value).context("overflow")?;
96        let coef = 10i64.checked_pow(exponent).context("overflow")?;
97        let coef = Decimal::from_i64(coef).context("overflow")?;
98        let value = value.checked_mul(coef).context("overflow")?;
99        let value: i64 = value.try_into().context("overflow")?;
100        Ok(Self(value))
101    }
102
103    pub fn from_integer(value: i64, exponent: u32) -> anyhow::Result<Self> {
104        let coef = 10i64.checked_pow(exponent).context("overflow")?;
105        let value = value.checked_mul(coef).context("overflow")?;
106        Ok(Self(value))
107    }
108}
109
110#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
111#[repr(transparent)]
112pub struct Price(pub NonZeroI64);
113
114impl Price {
115    pub fn from_integer(value: i64, exponent: u32) -> anyhow::Result<Price> {
116        let coef = 10i64.checked_pow(exponent).context("overflow")?;
117        let value = value.checked_mul(coef).context("overflow")?;
118        let value = NonZeroI64::new(value).context("zero price is unsupported")?;
119        Ok(Self(value))
120    }
121
122    pub fn parse_str(value: &str, exponent: u32) -> anyhow::Result<Price> {
123        let value: Decimal = value.parse()?;
124        let coef = 10i64.checked_pow(exponent).context("overflow")?;
125        let coef = Decimal::from_i64(coef).context("overflow")?;
126        let value = value.checked_mul(coef).context("overflow")?;
127        if !value.is_integer() {
128            bail!("price value is more precise than available exponent");
129        }
130        let value: i64 = value.try_into().context("overflow")?;
131        let value = NonZeroI64::new(value).context("zero price is unsupported")?;
132        Ok(Self(value))
133    }
134
135    pub fn new(value: i64) -> anyhow::Result<Self> {
136        let value = NonZeroI64::new(value).context("zero price is unsupported")?;
137        Ok(Self(value))
138    }
139
140    pub fn into_inner(self) -> NonZeroI64 {
141        self.0
142    }
143
144    pub fn to_f64(self, exponent: u32) -> anyhow::Result<f64> {
145        Ok(self.0.get() as f64 / 10i64.checked_pow(exponent).context("overflow")? as f64)
146    }
147
148    pub fn from_f64(value: f64, exponent: u32) -> anyhow::Result<Self> {
149        let value = (value * 10f64.powi(exponent as i32)) as i64;
150        let value = NonZeroI64::new(value).context("zero price is unsupported")?;
151        Ok(Self(value))
152    }
153
154    pub fn mul(self, rhs: Price, rhs_exponent: u32) -> anyhow::Result<Price> {
155        let left_value = i128::from(self.0.get());
156        let right_value = i128::from(rhs.0.get());
157
158        let value = left_value * right_value / 10i128.pow(rhs_exponent);
159        let value = value.try_into()?;
160        NonZeroI64::new(value)
161            .context("zero price is unsupported")
162            .map(Self)
163    }
164}
165
166impl Sub<i64> for Price {
167    type Output = Option<Price>;
168
169    fn sub(self, rhs: i64) -> Self::Output {
170        let value = self.0.get().saturating_sub(rhs);
171        NonZeroI64::new(value).map(Self)
172    }
173}
174
175impl Add<i64> for Price {
176    type Output = Option<Price>;
177
178    fn add(self, rhs: i64) -> Self::Output {
179        let value = self.0.get().saturating_add(rhs);
180        NonZeroI64::new(value).map(Self)
181    }
182}
183
184impl Add<Price> for Price {
185    type Output = Option<Price>;
186    fn add(self, rhs: Price) -> Self::Output {
187        let value = self.0.get().saturating_add(rhs.0.get());
188        NonZeroI64::new(value).map(Self)
189    }
190}
191
192impl Sub<Price> for Price {
193    type Output = Option<Price>;
194    fn sub(self, rhs: Price) -> Self::Output {
195        let value = self.0.get().saturating_sub(rhs.0.get());
196        NonZeroI64::new(value).map(Self)
197    }
198}
199
200impl Div<i64> for Price {
201    type Output = Option<Price>;
202    fn div(self, rhs: i64) -> Self::Output {
203        let value = self.0.get().saturating_div(rhs);
204        NonZeroI64::new(value).map(Self)
205    }
206}
207
208#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
209#[serde(rename_all = "camelCase")]
210pub enum PriceFeedProperty {
211    Price,
212    BestBidPrice,
213    BestAskPrice,
214    PublisherCount,
215    Exponent,
216    Confidence,
217    FundingRate,
218    FundingTimestamp,
219    // More fields may be added later.
220}
221
222#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
223#[serde(rename_all = "camelCase")]
224pub enum DeliveryFormat {
225    /// Deliver stream updates as JSON text messages.
226    #[default]
227    Json,
228    /// Deliver stream updates as binary messages.
229    Binary,
230}
231
232#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
233#[serde(rename_all = "camelCase")]
234pub enum Format {
235    Evm,
236    Solana,
237    LeEcdsa,
238    LeUnsigned,
239}
240
241#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
242#[serde(rename_all = "camelCase")]
243pub enum JsonBinaryEncoding {
244    #[default]
245    Base64,
246    Hex,
247}
248
249#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
250pub enum Channel {
251    FixedRate(FixedRate),
252}
253
254impl Serialize for Channel {
255    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
256    where
257        S: serde::Serializer,
258    {
259        match self {
260            Channel::FixedRate(fixed_rate) => {
261                if *fixed_rate == FixedRate::MIN {
262                    return serializer.serialize_str("real_time");
263                }
264                serializer.serialize_str(&format!("fixed_rate@{}ms", fixed_rate.value_ms()))
265            }
266        }
267    }
268}
269
270pub mod channel_ids {
271    use super::ChannelId;
272
273    pub const FIXED_RATE_1: ChannelId = ChannelId(1);
274    pub const FIXED_RATE_50: ChannelId = ChannelId(2);
275    pub const FIXED_RATE_200: ChannelId = ChannelId(3);
276}
277
278impl Display for Channel {
279    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
280        match self {
281            Channel::FixedRate(fixed_rate) => match *fixed_rate {
282                FixedRate::MIN => write!(f, "real_time"),
283                rate => write!(f, "fixed_rate@{}ms", rate.value_ms()),
284            },
285        }
286    }
287}
288
289impl Channel {
290    pub fn id(&self) -> ChannelId {
291        match self {
292            Channel::FixedRate(fixed_rate) => match fixed_rate.value_ms() {
293                1 => channel_ids::FIXED_RATE_1,
294                50 => channel_ids::FIXED_RATE_50,
295                200 => channel_ids::FIXED_RATE_200,
296                _ => panic!("unknown channel: {self:?}"),
297            },
298        }
299    }
300}
301
302#[test]
303fn id_supports_all_fixed_rates() {
304    for rate in FixedRate::ALL {
305        Channel::FixedRate(rate).id();
306    }
307}
308
309fn parse_channel(value: &str) -> Option<Channel> {
310    if value == "real_time" {
311        Some(Channel::FixedRate(FixedRate::MIN))
312    } else if let Some(rest) = value.strip_prefix("fixed_rate@") {
313        let ms_value = rest.strip_suffix("ms")?;
314        Some(Channel::FixedRate(FixedRate::from_ms(
315            ms_value.parse().ok()?,
316        )?))
317    } else {
318        None
319    }
320}
321
322impl<'de> Deserialize<'de> for Channel {
323    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
324    where
325        D: serde::Deserializer<'de>,
326    {
327        let value = <String>::deserialize(deserializer)?;
328        parse_channel(&value).ok_or_else(|| Error::custom("unknown channel"))
329    }
330}
331
332#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
333pub struct FixedRate {
334    ms: u32,
335}
336
337impl FixedRate {
338    // Assumptions (tested below):
339    // - Values are sorted.
340    // - 1 second contains a whole number of each interval.
341    // - all intervals are divisable by the smallest interval.
342    pub const ALL: [Self; 3] = [Self { ms: 1 }, Self { ms: 50 }, Self { ms: 200 }];
343    pub const MIN: Self = Self::ALL[0];
344
345    pub fn from_ms(value: u32) -> Option<Self> {
346        Self::ALL.into_iter().find(|v| v.ms == value)
347    }
348
349    pub fn value_ms(self) -> u32 {
350        self.ms
351    }
352
353    pub fn value_us(self) -> u64 {
354        (self.ms * 1000).into()
355    }
356}
357
358#[test]
359fn fixed_rate_values() {
360    assert!(
361        FixedRate::ALL.windows(2).all(|w| w[0] < w[1]),
362        "values must be unique and sorted"
363    );
364    for value in FixedRate::ALL {
365        assert_eq!(
366            1000 % value.ms,
367            0,
368            "1 s must contain whole number of intervals"
369        );
370        assert_eq!(
371            value.value_us() % FixedRate::MIN.value_us(),
372            0,
373            "the interval's borders must be a subset of the minimal interval's borders"
374        );
375    }
376}
377
378#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
379#[serde(rename_all = "camelCase")]
380pub struct SubscriptionParamsRepr {
381    pub price_feed_ids: Vec<PriceFeedId>,
382    pub properties: Vec<PriceFeedProperty>,
383    // "chains" was renamed to "formats". "chains" is still supported for compatibility.
384    #[serde(alias = "chains")]
385    pub formats: Vec<Format>,
386    #[serde(default)]
387    pub delivery_format: DeliveryFormat,
388    #[serde(default)]
389    pub json_binary_encoding: JsonBinaryEncoding,
390    /// If `true`, the stream update will contain a `parsed` JSON field containing
391    /// all data of the update.
392    #[serde(default = "default_parsed")]
393    pub parsed: bool,
394    pub channel: Channel,
395    #[serde(default)]
396    pub ignore_invalid_feed_ids: bool,
397}
398
399#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
400#[serde(rename_all = "camelCase")]
401pub struct SubscriptionParams(SubscriptionParamsRepr);
402
403impl<'de> Deserialize<'de> for SubscriptionParams {
404    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
405    where
406        D: serde::Deserializer<'de>,
407    {
408        let value = SubscriptionParamsRepr::deserialize(deserializer)?;
409        Self::new(value).map_err(Error::custom)
410    }
411}
412
413impl SubscriptionParams {
414    pub fn new(value: SubscriptionParamsRepr) -> Result<Self, &'static str> {
415        if value.price_feed_ids.is_empty() {
416            return Err("no price feed ids specified");
417        }
418        if !value.price_feed_ids.iter().all_unique() {
419            return Err("duplicate price feed ids specified");
420        }
421        if !value.formats.iter().all_unique() {
422            return Err("duplicate formats or chains specified");
423        }
424        if value.properties.is_empty() {
425            return Err("no properties specified");
426        }
427        if !value.properties.iter().all_unique() {
428            return Err("duplicate properties specified");
429        }
430        Ok(Self(value))
431    }
432}
433
434impl Deref for SubscriptionParams {
435    type Target = SubscriptionParamsRepr;
436
437    fn deref(&self) -> &Self::Target {
438        &self.0
439    }
440}
441impl DerefMut for SubscriptionParams {
442    fn deref_mut(&mut self) -> &mut Self::Target {
443        &mut self.0
444    }
445}
446
447pub fn default_parsed() -> bool {
448    true
449}
450
451#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
452#[serde(rename_all = "camelCase")]
453pub struct JsonBinaryData {
454    pub encoding: JsonBinaryEncoding,
455    pub data: String,
456}
457
458#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
459#[serde(rename_all = "camelCase")]
460pub struct JsonUpdate {
461    /// Present unless `parsed = false` is specified in subscription params.
462    #[serde(skip_serializing_if = "Option::is_none")]
463    pub parsed: Option<ParsedPayload>,
464    /// Only present if `Evm` is present in `formats` in subscription params.
465    #[serde(skip_serializing_if = "Option::is_none")]
466    pub evm: Option<JsonBinaryData>,
467    /// Only present if `Solana` is present in `formats` in subscription params.
468    #[serde(skip_serializing_if = "Option::is_none")]
469    pub solana: Option<JsonBinaryData>,
470    /// Only present if `LeEcdsa` is present in `formats` in subscription params.
471    #[serde(skip_serializing_if = "Option::is_none")]
472    pub le_ecdsa: Option<JsonBinaryData>,
473    /// Only present if `LeUnsigned` is present in `formats` in subscription params.
474    #[serde(skip_serializing_if = "Option::is_none")]
475    pub le_unsigned: Option<JsonBinaryData>,
476}
477
478#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
479#[serde(rename_all = "camelCase")]
480pub struct ParsedPayload {
481    #[serde(with = "crate::serde_str::timestamp")]
482    pub timestamp_us: TimestampUs,
483    pub price_feeds: Vec<ParsedFeedPayload>,
484}
485
486#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
487#[serde(rename_all = "camelCase")]
488pub struct ParsedFeedPayload {
489    pub price_feed_id: PriceFeedId,
490    #[serde(skip_serializing_if = "Option::is_none")]
491    #[serde(with = "crate::serde_str::option_price")]
492    #[serde(default)]
493    pub price: Option<Price>,
494    #[serde(skip_serializing_if = "Option::is_none")]
495    #[serde(with = "crate::serde_str::option_price")]
496    #[serde(default)]
497    pub best_bid_price: Option<Price>,
498    #[serde(skip_serializing_if = "Option::is_none")]
499    #[serde(with = "crate::serde_str::option_price")]
500    #[serde(default)]
501    pub best_ask_price: Option<Price>,
502    #[serde(skip_serializing_if = "Option::is_none")]
503    #[serde(default)]
504    pub publisher_count: Option<u16>,
505    #[serde(skip_serializing_if = "Option::is_none")]
506    #[serde(default)]
507    pub exponent: Option<i16>,
508    #[serde(skip_serializing_if = "Option::is_none")]
509    #[serde(default)]
510    pub confidence: Option<Price>,
511    #[serde(skip_serializing_if = "Option::is_none")]
512    #[serde(default)]
513    pub funding_rate: Option<Rate>,
514    #[serde(skip_serializing_if = "Option::is_none")]
515    #[serde(default)]
516    pub funding_timestamp: Option<TimestampUs>,
517    // More fields may be added later.
518}
519
520impl ParsedFeedPayload {
521    pub fn new(
522        price_feed_id: PriceFeedId,
523        exponent: Option<i16>,
524        data: &AggregatedPriceFeedData,
525        properties: &[PriceFeedProperty],
526    ) -> Self {
527        let mut output = Self {
528            price_feed_id,
529            price: None,
530            best_bid_price: None,
531            best_ask_price: None,
532            publisher_count: None,
533            exponent: None,
534            confidence: None,
535            funding_rate: None,
536            funding_timestamp: None,
537        };
538        for &property in properties {
539            match property {
540                PriceFeedProperty::Price => {
541                    output.price = data.price;
542                }
543                PriceFeedProperty::BestBidPrice => {
544                    output.best_bid_price = data.best_bid_price;
545                }
546                PriceFeedProperty::BestAskPrice => {
547                    output.best_ask_price = data.best_ask_price;
548                }
549                PriceFeedProperty::PublisherCount => {
550                    output.publisher_count = Some(data.publisher_count);
551                }
552                PriceFeedProperty::Exponent => {
553                    output.exponent = exponent;
554                }
555                PriceFeedProperty::Confidence => {
556                    output.confidence = data.confidence;
557                }
558                PriceFeedProperty::FundingRate => {
559                    output.funding_rate = data.funding_rate;
560                }
561                PriceFeedProperty::FundingTimestamp => {
562                    output.funding_timestamp = data.funding_timestamp;
563                }
564            }
565        }
566        output
567    }
568
569    pub fn new_full(
570        price_feed_id: PriceFeedId,
571        exponent: Option<i16>,
572        data: &AggregatedPriceFeedData,
573    ) -> Self {
574        Self {
575            price_feed_id,
576            price: data.price,
577            best_bid_price: data.best_bid_price,
578            best_ask_price: data.best_ask_price,
579            publisher_count: Some(data.publisher_count),
580            exponent,
581            confidence: data.confidence,
582            funding_rate: data.funding_rate,
583            funding_timestamp: data.funding_timestamp,
584        }
585    }
586}