pyth_lazer_protocol/
router.rs

1//! WebSocket JSON protocol types for API the router provides to consumers and publishers.
2
3use {
4    crate::payload::AggregatedPriceFeedData,
5    anyhow::{bail, Context},
6    itertools::Itertools,
7    protobuf::well_known_types::timestamp::Timestamp,
8    rust_decimal::{prelude::FromPrimitive, Decimal},
9    serde::{de::Error, Deserialize, Serialize},
10    std::{
11        fmt::Display,
12        num::NonZeroI64,
13        ops::{Add, Deref, DerefMut, Div, Sub},
14        time::{SystemTime, UNIX_EPOCH},
15    },
16};
17
18#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
19pub struct PublisherId(pub u16);
20
21#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
22pub struct PriceFeedId(pub u32);
23
24#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
25pub struct ChannelId(pub u8);
26
27#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
28pub struct TimestampUs(pub u64);
29
30impl TryFrom<&Timestamp> for TimestampUs {
31    type Error = anyhow::Error;
32
33    fn try_from(timestamp: &Timestamp) -> anyhow::Result<Self> {
34        let seconds_in_micros: u64 = (timestamp.seconds * 1_000_000).try_into()?;
35        let nanos_in_micros: u64 = (timestamp.nanos / 1_000).try_into()?;
36        Ok(TimestampUs(seconds_in_micros + nanos_in_micros))
37    }
38}
39
40impl TimestampUs {
41    pub fn now() -> Self {
42        let value = SystemTime::now()
43            .duration_since(UNIX_EPOCH)
44            .expect("invalid system time")
45            .as_micros()
46            .try_into()
47            .expect("invalid system time");
48        Self(value)
49    }
50
51    pub fn saturating_us_since(self, other: Self) -> u64 {
52        self.0.saturating_sub(other.0)
53    }
54}
55
56#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
57#[repr(transparent)]
58pub struct Rate(pub i64);
59
60impl Rate {
61    pub fn parse_str(value: &str, exponent: u32) -> anyhow::Result<Self> {
62        let value: Decimal = value.parse()?;
63        let coef = 10i64.checked_pow(exponent).context("overflow")?;
64        let coef = Decimal::from_i64(coef).context("overflow")?;
65        let value = value.checked_mul(coef).context("overflow")?;
66        if !value.is_integer() {
67            bail!("price value is more precise than available exponent");
68        }
69        let value: i64 = value.try_into().context("overflow")?;
70        Ok(Self(value))
71    }
72
73    pub fn from_f64(value: f64, exponent: u32) -> anyhow::Result<Self> {
74        let value = Decimal::from_f64(value).context("overflow")?;
75        let coef = 10i64.checked_pow(exponent).context("overflow")?;
76        let coef = Decimal::from_i64(coef).context("overflow")?;
77        let value = value.checked_mul(coef).context("overflow")?;
78        let value: i64 = value.try_into().context("overflow")?;
79        Ok(Self(value))
80    }
81
82    pub fn from_integer(value: i64, exponent: u32) -> anyhow::Result<Self> {
83        let coef = 10i64.checked_pow(exponent).context("overflow")?;
84        let value = value.checked_mul(coef).context("overflow")?;
85        Ok(Self(value))
86    }
87}
88
89#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
90#[repr(transparent)]
91pub struct Price(pub NonZeroI64);
92
93impl Price {
94    pub fn from_integer(value: i64, exponent: u32) -> anyhow::Result<Price> {
95        let coef = 10i64.checked_pow(exponent).context("overflow")?;
96        let value = value.checked_mul(coef).context("overflow")?;
97        let value = NonZeroI64::new(value).context("zero price is unsupported")?;
98        Ok(Self(value))
99    }
100
101    pub fn parse_str(value: &str, exponent: u32) -> anyhow::Result<Price> {
102        let value: Decimal = value.parse()?;
103        let coef = 10i64.checked_pow(exponent).context("overflow")?;
104        let coef = Decimal::from_i64(coef).context("overflow")?;
105        let value = value.checked_mul(coef).context("overflow")?;
106        if !value.is_integer() {
107            bail!("price value is more precise than available exponent");
108        }
109        let value: i64 = value.try_into().context("overflow")?;
110        let value = NonZeroI64::new(value).context("zero price is unsupported")?;
111        Ok(Self(value))
112    }
113
114    pub fn new(value: i64) -> anyhow::Result<Self> {
115        let value = NonZeroI64::new(value).context("zero price is unsupported")?;
116        Ok(Self(value))
117    }
118
119    pub fn into_inner(self) -> NonZeroI64 {
120        self.0
121    }
122
123    pub fn to_f64(self, exponent: u32) -> anyhow::Result<f64> {
124        Ok(self.0.get() as f64 / 10i64.checked_pow(exponent).context("overflow")? as f64)
125    }
126
127    pub fn from_f64(value: f64, exponent: u32) -> anyhow::Result<Self> {
128        let value = (value * 10f64.powi(exponent as i32)) as i64;
129        let value = NonZeroI64::new(value).context("zero price is unsupported")?;
130        Ok(Self(value))
131    }
132
133    pub fn mul(self, rhs: Price, rhs_exponent: u32) -> anyhow::Result<Price> {
134        let left_value = i128::from(self.0.get());
135        let right_value = i128::from(rhs.0.get());
136
137        let value = left_value * right_value / 10i128.pow(rhs_exponent);
138        let value = value.try_into()?;
139        NonZeroI64::new(value)
140            .context("zero price is unsupported")
141            .map(Self)
142    }
143}
144
145impl Sub<i64> for Price {
146    type Output = Option<Price>;
147
148    fn sub(self, rhs: i64) -> Self::Output {
149        let value = self.0.get().saturating_sub(rhs);
150        NonZeroI64::new(value).map(Self)
151    }
152}
153
154impl Add<i64> for Price {
155    type Output = Option<Price>;
156
157    fn add(self, rhs: i64) -> Self::Output {
158        let value = self.0.get().saturating_add(rhs);
159        NonZeroI64::new(value).map(Self)
160    }
161}
162
163impl Add<Price> for Price {
164    type Output = Option<Price>;
165    fn add(self, rhs: Price) -> Self::Output {
166        let value = self.0.get().saturating_add(rhs.0.get());
167        NonZeroI64::new(value).map(Self)
168    }
169}
170
171impl Sub<Price> for Price {
172    type Output = Option<Price>;
173    fn sub(self, rhs: Price) -> Self::Output {
174        let value = self.0.get().saturating_sub(rhs.0.get());
175        NonZeroI64::new(value).map(Self)
176    }
177}
178
179impl Div<i64> for Price {
180    type Output = Option<Price>;
181    fn div(self, rhs: i64) -> Self::Output {
182        let value = self.0.get().saturating_div(rhs);
183        NonZeroI64::new(value).map(Self)
184    }
185}
186
187#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
188#[serde(rename_all = "camelCase")]
189pub enum PriceFeedProperty {
190    Price,
191    BestBidPrice,
192    BestAskPrice,
193    PublisherCount,
194    Exponent,
195    Confidence,
196    FundingRate,
197    FundingTimestamp,
198    // More fields may be added later.
199}
200
201#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
202#[serde(rename_all = "camelCase")]
203pub enum DeliveryFormat {
204    /// Deliver stream updates as JSON text messages.
205    #[default]
206    Json,
207    /// Deliver stream updates as binary messages.
208    Binary,
209}
210
211#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
212#[serde(rename_all = "camelCase")]
213pub enum Format {
214    Evm,
215    Solana,
216    LeEcdsa,
217    LeUnsigned,
218}
219
220#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
221#[serde(rename_all = "camelCase")]
222pub enum JsonBinaryEncoding {
223    #[default]
224    Base64,
225    Hex,
226}
227
228#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
229pub enum Channel {
230    FixedRate(FixedRate),
231}
232
233impl Serialize for Channel {
234    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
235    where
236        S: serde::Serializer,
237    {
238        match self {
239            Channel::FixedRate(fixed_rate) => {
240                if *fixed_rate == FixedRate::MIN {
241                    return serializer.serialize_str("real_time");
242                }
243                serializer.serialize_str(&format!("fixed_rate@{}ms", fixed_rate.value_ms()))
244            }
245        }
246    }
247}
248
249pub mod channel_ids {
250    use super::ChannelId;
251
252    pub const FIXED_RATE_1: ChannelId = ChannelId(1);
253    pub const FIXED_RATE_50: ChannelId = ChannelId(2);
254    pub const FIXED_RATE_200: ChannelId = ChannelId(3);
255}
256
257impl Display for Channel {
258    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
259        match self {
260            Channel::FixedRate(fixed_rate) => match *fixed_rate {
261                FixedRate::MIN => write!(f, "real_time"),
262                rate => write!(f, "fixed_rate@{}ms", rate.value_ms()),
263            },
264        }
265    }
266}
267
268impl Channel {
269    pub fn id(&self) -> ChannelId {
270        match self {
271            Channel::FixedRate(fixed_rate) => match fixed_rate.value_ms() {
272                1 => channel_ids::FIXED_RATE_1,
273                50 => channel_ids::FIXED_RATE_50,
274                200 => channel_ids::FIXED_RATE_200,
275                _ => panic!("unknown channel: {self:?}"),
276            },
277        }
278    }
279}
280
281#[test]
282fn id_supports_all_fixed_rates() {
283    for rate in FixedRate::ALL {
284        Channel::FixedRate(rate).id();
285    }
286}
287
288fn parse_channel(value: &str) -> Option<Channel> {
289    if value == "real_time" {
290        Some(Channel::FixedRate(FixedRate::MIN))
291    } else if let Some(rest) = value.strip_prefix("fixed_rate@") {
292        let ms_value = rest.strip_suffix("ms")?;
293        Some(Channel::FixedRate(FixedRate::from_ms(
294            ms_value.parse().ok()?,
295        )?))
296    } else {
297        None
298    }
299}
300
301impl<'de> Deserialize<'de> for Channel {
302    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
303    where
304        D: serde::Deserializer<'de>,
305    {
306        let value = <String>::deserialize(deserializer)?;
307        parse_channel(&value).ok_or_else(|| D::Error::custom("unknown channel"))
308    }
309}
310
311#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
312pub struct FixedRate {
313    ms: u32,
314}
315
316impl FixedRate {
317    // Assumptions (tested below):
318    // - Values are sorted.
319    // - 1 second contains a whole number of each interval.
320    // - all intervals are divisable by the smallest interval.
321    pub const ALL: [Self; 3] = [Self { ms: 1 }, Self { ms: 50 }, Self { ms: 200 }];
322    pub const MIN: Self = Self::ALL[0];
323
324    pub fn from_ms(value: u32) -> Option<Self> {
325        Self::ALL.into_iter().find(|v| v.ms == value)
326    }
327
328    pub fn value_ms(self) -> u32 {
329        self.ms
330    }
331
332    pub fn value_us(self) -> u64 {
333        (self.ms * 1000).into()
334    }
335}
336
337#[test]
338fn fixed_rate_values() {
339    assert!(
340        FixedRate::ALL.windows(2).all(|w| w[0] < w[1]),
341        "values must be unique and sorted"
342    );
343    for value in FixedRate::ALL {
344        assert!(
345            1000 % value.ms == 0,
346            "1 s must contain whole number of intervals"
347        );
348        assert!(
349            value.value_us() % FixedRate::MIN.value_us() == 0,
350            "the interval's borders must be a subset of the minimal interval's borders"
351        );
352    }
353}
354
355#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
356#[serde(rename_all = "camelCase")]
357pub struct SubscriptionParamsRepr {
358    pub price_feed_ids: Vec<PriceFeedId>,
359    pub properties: Vec<PriceFeedProperty>,
360    // "chains" was renamed to "formats". "chains" is still supported for compatibility.
361    #[serde(alias = "chains")]
362    pub formats: Vec<Format>,
363    #[serde(default)]
364    pub delivery_format: DeliveryFormat,
365    #[serde(default)]
366    pub json_binary_encoding: JsonBinaryEncoding,
367    /// If `true`, the stream update will contain a `parsed` JSON field containing
368    /// all data of the update.
369    #[serde(default = "default_parsed")]
370    pub parsed: bool,
371    pub channel: Channel,
372    #[serde(default)]
373    pub ignore_invalid_feed_ids: bool,
374}
375
376#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
377#[serde(rename_all = "camelCase")]
378pub struct SubscriptionParams(SubscriptionParamsRepr);
379
380impl<'de> Deserialize<'de> for SubscriptionParams {
381    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
382    where
383        D: serde::Deserializer<'de>,
384    {
385        let value = SubscriptionParamsRepr::deserialize(deserializer)?;
386        Self::new(value).map_err(D::Error::custom)
387    }
388}
389
390impl SubscriptionParams {
391    pub fn new(value: SubscriptionParamsRepr) -> Result<Self, &'static str> {
392        if value.price_feed_ids.is_empty() {
393            return Err("no price feed ids specified");
394        }
395        if !value.price_feed_ids.iter().all_unique() {
396            return Err("duplicate price feed ids specified");
397        }
398        if !value.formats.iter().all_unique() {
399            return Err("duplicate formats or chains specified");
400        }
401        if value.properties.is_empty() {
402            return Err("no properties specified");
403        }
404        if !value.properties.iter().all_unique() {
405            return Err("duplicate properties specified");
406        }
407        Ok(Self(value))
408    }
409}
410
411impl Deref for SubscriptionParams {
412    type Target = SubscriptionParamsRepr;
413
414    fn deref(&self) -> &Self::Target {
415        &self.0
416    }
417}
418impl DerefMut for SubscriptionParams {
419    fn deref_mut(&mut self) -> &mut Self::Target {
420        &mut self.0
421    }
422}
423
424pub fn default_parsed() -> bool {
425    true
426}
427
428#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
429#[serde(rename_all = "camelCase")]
430pub struct JsonBinaryData {
431    pub encoding: JsonBinaryEncoding,
432    pub data: String,
433}
434
435#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
436#[serde(rename_all = "camelCase")]
437pub struct JsonUpdate {
438    /// Present unless `parsed = false` is specified in subscription params.
439    #[serde(skip_serializing_if = "Option::is_none")]
440    pub parsed: Option<ParsedPayload>,
441    /// Only present if `Evm` is present in `formats` in subscription params.
442    #[serde(skip_serializing_if = "Option::is_none")]
443    pub evm: Option<JsonBinaryData>,
444    /// Only present if `Solana` is present in `formats` in subscription params.
445    #[serde(skip_serializing_if = "Option::is_none")]
446    pub solana: Option<JsonBinaryData>,
447    /// Only present if `LeEcdsa` is present in `formats` in subscription params.
448    #[serde(skip_serializing_if = "Option::is_none")]
449    pub le_ecdsa: Option<JsonBinaryData>,
450    /// Only present if `LeUnsigned` is present in `formats` in subscription params.
451    #[serde(skip_serializing_if = "Option::is_none")]
452    pub le_unsigned: Option<JsonBinaryData>,
453}
454
455#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
456#[serde(rename_all = "camelCase")]
457pub struct ParsedPayload {
458    #[serde(with = "crate::serde_str::timestamp")]
459    pub timestamp_us: TimestampUs,
460    pub price_feeds: Vec<ParsedFeedPayload>,
461}
462
463#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
464#[serde(rename_all = "camelCase")]
465pub struct ParsedFeedPayload {
466    pub price_feed_id: PriceFeedId,
467    #[serde(skip_serializing_if = "Option::is_none")]
468    #[serde(with = "crate::serde_str::option_price")]
469    #[serde(default)]
470    pub price: Option<Price>,
471    #[serde(skip_serializing_if = "Option::is_none")]
472    #[serde(with = "crate::serde_str::option_price")]
473    #[serde(default)]
474    pub best_bid_price: Option<Price>,
475    #[serde(skip_serializing_if = "Option::is_none")]
476    #[serde(with = "crate::serde_str::option_price")]
477    #[serde(default)]
478    pub best_ask_price: Option<Price>,
479    #[serde(skip_serializing_if = "Option::is_none")]
480    #[serde(default)]
481    pub publisher_count: Option<u16>,
482    #[serde(skip_serializing_if = "Option::is_none")]
483    #[serde(default)]
484    pub exponent: Option<i16>,
485    #[serde(skip_serializing_if = "Option::is_none")]
486    #[serde(default)]
487    pub confidence: Option<Price>,
488    #[serde(skip_serializing_if = "Option::is_none")]
489    #[serde(default)]
490    pub funding_rate: Option<Rate>,
491    #[serde(skip_serializing_if = "Option::is_none")]
492    #[serde(default)]
493    pub funding_timestamp: Option<TimestampUs>,
494    // More fields may be added later.
495}
496
497impl ParsedFeedPayload {
498    pub fn new(
499        price_feed_id: PriceFeedId,
500        exponent: Option<i16>,
501        data: &AggregatedPriceFeedData,
502        properties: &[PriceFeedProperty],
503    ) -> Self {
504        let mut output = Self {
505            price_feed_id,
506            price: None,
507            best_bid_price: None,
508            best_ask_price: None,
509            publisher_count: None,
510            exponent: None,
511            confidence: None,
512            funding_rate: None,
513            funding_timestamp: None,
514        };
515        for &property in properties {
516            match property {
517                PriceFeedProperty::Price => {
518                    output.price = data.price;
519                }
520                PriceFeedProperty::BestBidPrice => {
521                    output.best_bid_price = data.best_bid_price;
522                }
523                PriceFeedProperty::BestAskPrice => {
524                    output.best_ask_price = data.best_ask_price;
525                }
526                PriceFeedProperty::PublisherCount => {
527                    output.publisher_count = Some(data.publisher_count);
528                }
529                PriceFeedProperty::Exponent => {
530                    output.exponent = exponent;
531                }
532                PriceFeedProperty::Confidence => {
533                    output.confidence = data.confidence;
534                }
535                PriceFeedProperty::FundingRate => {
536                    output.funding_rate = data.funding_rate;
537                }
538                PriceFeedProperty::FundingTimestamp => {
539                    output.funding_timestamp = data.funding_timestamp;
540                }
541            }
542        }
543        output
544    }
545
546    pub fn new_full(
547        price_feed_id: PriceFeedId,
548        exponent: Option<i16>,
549        data: &AggregatedPriceFeedData,
550    ) -> Self {
551        Self {
552            price_feed_id,
553            price: data.price,
554            best_bid_price: data.best_bid_price,
555            best_ask_price: data.best_ask_price,
556            publisher_count: Some(data.publisher_count),
557            exponent,
558            confidence: data.confidence,
559            funding_rate: data.funding_rate,
560            funding_timestamp: data.funding_timestamp,
561        }
562    }
563}