pyth_lazer_protocol/
router.rs

//! WebSocket JSON protocol types for the API the router provides to consumers and publishers.
2
3use {
4    crate::payload::AggregatedPriceFeedData,
5    anyhow::{bail, Context},
6    itertools::Itertools,
7    rust_decimal::{prelude::FromPrimitive, Decimal},
8    serde::{de::Error, Deserialize, Serialize},
9    std::{
10        num::NonZeroI64,
11        ops::{Add, Deref, DerefMut, Div, Sub},
12        time::{SystemTime, UNIX_EPOCH},
13    },
14};
15
16#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
17pub struct PublisherId(pub u16);
18
19#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
20pub struct PriceFeedId(pub u32);
21
22#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
23pub struct ChannelId(pub u8);
24
25#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
26pub struct TimestampUs(pub u64);
27
28impl TimestampUs {
29    pub fn now() -> Self {
30        let value = SystemTime::now()
31            .duration_since(UNIX_EPOCH)
32            .expect("invalid system time")
33            .as_micros()
34            .try_into()
35            .expect("invalid system time");
36        Self(value)
37    }
38
39    pub fn saturating_us_since(self, other: Self) -> u64 {
40        self.0.saturating_sub(other.0)
41    }
42}
43
44#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
45#[repr(transparent)]
46pub struct Price(pub NonZeroI64);
47
48impl Price {
49    pub fn from_integer(value: i64, exponent: u32) -> anyhow::Result<Price> {
50        let coef = 10i64.checked_pow(exponent).context("overflow")?;
51        let value = value.checked_mul(coef).context("overflow")?;
52        let value = NonZeroI64::new(value).context("zero price is unsupported")?;
53        Ok(Self(value))
54    }
55
56    pub fn parse_str(value: &str, exponent: u32) -> anyhow::Result<Price> {
57        let value: Decimal = value.parse()?;
58        let coef = 10i64.checked_pow(exponent).context("overflow")?;
59        let coef = Decimal::from_i64(coef).context("overflow")?;
60        let value = value.checked_mul(coef).context("overflow")?;
61        if !value.is_integer() {
62            bail!("price value is more precise than available exponent");
63        }
64        let value: i64 = value.try_into().context("overflow")?;
65        let value = NonZeroI64::new(value).context("zero price is unsupported")?;
66        Ok(Self(value))
67    }
68
69    pub fn new(value: i64) -> anyhow::Result<Self> {
70        let value = NonZeroI64::new(value).context("zero price is unsupported")?;
71        Ok(Self(value))
72    }
73
74    pub fn into_inner(self) -> NonZeroI64 {
75        self.0
76    }
77
78    pub fn to_f64(self, exponent: u32) -> anyhow::Result<f64> {
79        Ok(self.0.get() as f64 / 10i64.checked_pow(exponent).context("overflow")? as f64)
80    }
81
82    pub fn mul(self, rhs: Price, rhs_exponent: u32) -> anyhow::Result<Price> {
83        let left_value = i128::from(self.0.get());
84        let right_value = i128::from(rhs.0.get());
85
86        let value = left_value * right_value / 10i128.pow(rhs_exponent);
87        let value = value.try_into()?;
88        NonZeroI64::new(value)
89            .context("zero price is unsupported")
90            .map(Self)
91    }
92}
93
94impl Sub<i64> for Price {
95    type Output = Option<Price>;
96
97    fn sub(self, rhs: i64) -> Self::Output {
98        let value = self.0.get().saturating_sub(rhs);
99        NonZeroI64::new(value).map(Self)
100    }
101}
102
103impl Add<i64> for Price {
104    type Output = Option<Price>;
105
106    fn add(self, rhs: i64) -> Self::Output {
107        let value = self.0.get().saturating_add(rhs);
108        NonZeroI64::new(value).map(Self)
109    }
110}
111
112impl Add<Price> for Price {
113    type Output = Option<Price>;
114    fn add(self, rhs: Price) -> Self::Output {
115        let value = self.0.get().saturating_add(rhs.0.get());
116        NonZeroI64::new(value).map(Self)
117    }
118}
119
120impl Sub<Price> for Price {
121    type Output = Option<Price>;
122    fn sub(self, rhs: Price) -> Self::Output {
123        let value = self.0.get().saturating_sub(rhs.0.get());
124        NonZeroI64::new(value).map(Self)
125    }
126}
127
128impl Div<i64> for Price {
129    type Output = Option<Price>;
130    fn div(self, rhs: i64) -> Self::Output {
131        let value = self.0.get().saturating_div(rhs);
132        NonZeroI64::new(value).map(Self)
133    }
134}
135
136#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
137#[serde(rename_all = "camelCase")]
138pub enum PriceFeedProperty {
139    Price,
140    BestBidPrice,
141    BestAskPrice,
142    PublisherCount,
143    Exponent,
144    Confidence,
145    // More fields may be added later.
146}
147
148#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
149#[serde(rename_all = "camelCase")]
150pub enum DeliveryFormat {
151    /// Deliver stream updates as JSON text messages.
152    #[default]
153    Json,
154    /// Deliver stream updates as binary messages.
155    Binary,
156}
157
158#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
159#[serde(rename_all = "camelCase")]
160pub enum Chain {
161    Evm,
162    Solana,
163}
164
165#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
166#[serde(rename_all = "camelCase")]
167pub enum JsonBinaryEncoding {
168    #[default]
169    Base64,
170    Hex,
171}
172
173#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
174pub enum Channel {
175    FixedRate(FixedRate),
176}
177
178impl Serialize for Channel {
179    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
180    where
181        S: serde::Serializer,
182    {
183        match self {
184            Channel::FixedRate(fixed_rate) => {
185                if *fixed_rate == FixedRate::MIN {
186                    return serializer.serialize_str("real_time");
187                }
188                serializer.serialize_str(&format!("fixed_rate@{}ms", fixed_rate.value_ms()))
189            }
190        }
191    }
192}
193
194mod channel_ids {
195    use super::ChannelId;
196
197    pub const FIXED_RATE_1: ChannelId = ChannelId(1);
198    pub const FIXED_RATE_50: ChannelId = ChannelId(2);
199    pub const FIXED_RATE_200: ChannelId = ChannelId(3);
200}
201
202impl Channel {
203    pub fn id(&self) -> ChannelId {
204        match self {
205            Channel::FixedRate(fixed_rate) => match fixed_rate.value_ms() {
206                1 => channel_ids::FIXED_RATE_1,
207                50 => channel_ids::FIXED_RATE_50,
208                200 => channel_ids::FIXED_RATE_200,
209                _ => panic!("unknown channel: {self:?}"),
210            },
211        }
212    }
213}
214
/// Ensures `Channel::id` does not panic for any rate listed in `FixedRate::ALL`.
#[test]
fn id_supports_all_fixed_rates() {
    for channel in FixedRate::ALL.iter().copied().map(Channel::FixedRate) {
        channel.id();
    }
}
221
222fn parse_channel(value: &str) -> Option<Channel> {
223    if value == "real_time" {
224        Some(Channel::FixedRate(FixedRate::MIN))
225    } else if let Some(rest) = value.strip_prefix("fixed_rate@") {
226        let ms_value = rest.strip_suffix("ms")?;
227        Some(Channel::FixedRate(FixedRate::from_ms(
228            ms_value.parse().ok()?,
229        )?))
230    } else {
231        None
232    }
233}
234
235impl<'de> Deserialize<'de> for Channel {
236    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
237    where
238        D: serde::Deserializer<'de>,
239    {
240        let value = <String>::deserialize(deserializer)?;
241        parse_channel(&value).ok_or_else(|| D::Error::custom("unknown channel"))
242    }
243}
244
/// Update interval of a fixed-rate channel.
///
/// The field is private; values are obtained from `FixedRate::ALL`,
/// `FixedRate::MIN` or `FixedRate::from_ms`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct FixedRate {
    /// Interval length in milliseconds.
    ms: u32,
}
249
250impl FixedRate {
251    // Assumptions (tested below):
252    // - Values are sorted.
253    // - 1 second contains a whole number of each interval.
254    // - all intervals are divisable by the smallest interval.
255    pub const ALL: [Self; 3] = [Self { ms: 1 }, Self { ms: 50 }, Self { ms: 200 }];
256    pub const MIN: Self = Self::ALL[0];
257
258    pub fn from_ms(value: u32) -> Option<Self> {
259        Self::ALL.into_iter().find(|v| v.ms == value)
260    }
261
262    pub fn value_ms(self) -> u32 {
263        self.ms
264    }
265
266    pub fn value_us(self) -> u64 {
267        (self.ms * 1000).into()
268    }
269}
270
/// Checks the assumptions documented on `FixedRate::ALL`.
#[test]
fn fixed_rate_values() {
    let strictly_increasing = FixedRate::ALL.windows(2).all(|pair| pair[0] < pair[1]);
    assert!(strictly_increasing, "values must be unique and sorted");

    let min_us = FixedRate::MIN.value_us();
    for rate in FixedRate::ALL {
        assert!(
            1000 % rate.ms == 0,
            "1 s must contain whole number of intervals"
        );
        assert!(
            rate.value_us() % min_us == 0,
            "the interval's borders must be a subset of the minimal interval's borders"
        );
    }
}
288
289#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
290#[serde(rename_all = "camelCase")]
291pub struct SubscriptionParamsRepr {
292    pub price_feed_ids: Vec<PriceFeedId>,
293    pub properties: Vec<PriceFeedProperty>,
294    pub chains: Vec<Chain>,
295    #[serde(default)]
296    pub delivery_format: DeliveryFormat,
297    #[serde(default)]
298    pub json_binary_encoding: JsonBinaryEncoding,
299    /// If `true`, the stream update will contain a JSON object containing
300    /// all data of the update.
301    #[serde(default = "default_parsed")]
302    pub parsed: bool,
303    pub channel: Channel,
304}
305
306#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
307#[serde(rename_all = "camelCase")]
308pub struct SubscriptionParams(SubscriptionParamsRepr);
309
310impl<'de> Deserialize<'de> for SubscriptionParams {
311    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
312    where
313        D: serde::Deserializer<'de>,
314    {
315        let value = SubscriptionParamsRepr::deserialize(deserializer)?;
316        Self::new(value).map_err(D::Error::custom)
317    }
318}
319
320impl SubscriptionParams {
321    pub fn new(value: SubscriptionParamsRepr) -> Result<Self, &'static str> {
322        if value.price_feed_ids.is_empty() {
323            return Err("no price feed ids specified");
324        }
325        if !value.price_feed_ids.iter().all_unique() {
326            return Err("duplicate price feed ids specified");
327        }
328        if !value.chains.iter().all_unique() {
329            return Err("duplicate chains specified");
330        }
331        if value.properties.is_empty() {
332            return Err("no properties specified");
333        }
334        if !value.properties.iter().all_unique() {
335            return Err("duplicate properties specified");
336        }
337        Ok(Self(value))
338    }
339}
340
341impl Deref for SubscriptionParams {
342    type Target = SubscriptionParamsRepr;
343
344    fn deref(&self) -> &Self::Target {
345        &self.0
346    }
347}
348impl DerefMut for SubscriptionParams {
349    fn deref_mut(&mut self) -> &mut Self::Target {
350        &mut self.0
351    }
352}
353
/// Serde default for `SubscriptionParamsRepr::parsed`: always `true`.
pub fn default_parsed() -> bool {
    true
}
357
358#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
359#[serde(rename_all = "camelCase")]
360pub struct JsonBinaryData {
361    pub encoding: JsonBinaryEncoding,
362    pub data: String,
363}
364
365#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
366#[serde(rename_all = "camelCase")]
367pub struct JsonUpdate {
368    #[serde(skip_serializing_if = "Option::is_none")]
369    pub parsed: Option<ParsedPayload>,
370    #[serde(skip_serializing_if = "Option::is_none")]
371    pub evm: Option<JsonBinaryData>,
372    #[serde(skip_serializing_if = "Option::is_none")]
373    pub solana: Option<JsonBinaryData>,
374}
375
376#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
377#[serde(rename_all = "camelCase")]
378pub struct ParsedPayload {
379    #[serde(with = "crate::serde_str::timestamp")]
380    pub timestamp_us: TimestampUs,
381    pub price_feeds: Vec<ParsedFeedPayload>,
382}
383
384#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
385#[serde(rename_all = "camelCase")]
386pub struct NatsPayload {
387    pub payload: ParsedPayload,
388    pub channel: Channel,
389}
390
391#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
392#[serde(rename_all = "camelCase")]
393pub struct ParsedFeedPayload {
394    pub price_feed_id: PriceFeedId,
395    #[serde(skip_serializing_if = "Option::is_none")]
396    #[serde(with = "crate::serde_str::option_price")]
397    #[serde(default)]
398    pub price: Option<Price>,
399    #[serde(skip_serializing_if = "Option::is_none")]
400    #[serde(with = "crate::serde_str::option_price")]
401    #[serde(default)]
402    pub best_bid_price: Option<Price>,
403    #[serde(skip_serializing_if = "Option::is_none")]
404    #[serde(with = "crate::serde_str::option_price")]
405    #[serde(default)]
406    pub best_ask_price: Option<Price>,
407    #[serde(skip_serializing_if = "Option::is_none")]
408    #[serde(default)]
409    pub publisher_count: Option<u16>,
410    #[serde(skip_serializing_if = "Option::is_none")]
411    #[serde(default)]
412    pub exponent: Option<i16>,
413    #[serde(skip_serializing_if = "Option::is_none")]
414    #[serde(default)]
415    pub confidence: Option<Price>,
416    // More fields may be added later.
417}
418
419impl ParsedFeedPayload {
420    pub fn new(
421        price_feed_id: PriceFeedId,
422        exponent: Option<i16>,
423        data: &AggregatedPriceFeedData,
424        properties: &[PriceFeedProperty],
425    ) -> Self {
426        let mut output = Self {
427            price_feed_id,
428            price: None,
429            best_bid_price: None,
430            best_ask_price: None,
431            publisher_count: None,
432            exponent: None,
433            confidence: None,
434        };
435        for &property in properties {
436            match property {
437                PriceFeedProperty::Price => {
438                    output.price = data.price;
439                }
440                PriceFeedProperty::BestBidPrice => {
441                    output.best_bid_price = data.best_bid_price;
442                }
443                PriceFeedProperty::BestAskPrice => {
444                    output.best_ask_price = data.best_ask_price;
445                }
446                PriceFeedProperty::PublisherCount => {
447                    output.publisher_count = data.publisher_count;
448                }
449                PriceFeedProperty::Exponent => {
450                    output.exponent = exponent;
451                }
452                PriceFeedProperty::Confidence => {
453                    output.confidence = data.confidence;
454                }
455            }
456        }
457        output
458    }
459
460    pub fn new_full(
461        price_feed_id: PriceFeedId,
462        exponent: Option<i16>,
463        data: &AggregatedPriceFeedData,
464    ) -> Self {
465        Self {
466            price_feed_id,
467            price: data.price,
468            best_bid_price: data.best_bid_price,
469            best_ask_price: data.best_ask_price,
470            publisher_count: data.publisher_count,
471            exponent,
472            confidence: data.confidence,
473        }
474    }
475}