pyth_lazer_protocol/
api.rs

1use std::{
2    cmp::Ordering,
3    fmt::Display,
4    ops::{Deref, DerefMut},
5};
6
7use derive_more::derive::From;
8use itertools::Itertools as _;
9use serde::{de::Error, Deserialize, Serialize};
10
11use crate::{
12    payload::AggregatedPriceFeedData,
13    time::{DurationUs, FixedRate, TimestampUs},
14    ChannelId, Price, PriceFeedId, PriceFeedProperty, Rate,
15};
16
17#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
18#[serde(rename_all = "camelCase")]
19pub struct LatestPriceRequestRepr {
20    // Either price feed ids or symbols must be specified.
21    pub price_feed_ids: Option<Vec<PriceFeedId>>,
22    pub symbols: Option<Vec<String>>,
23    pub properties: Vec<PriceFeedProperty>,
24    // "chains" was renamed to "formats". "chains" is still supported for compatibility.
25    #[serde(alias = "chains")]
26    pub formats: Vec<Format>,
27    #[serde(default)]
28    pub json_binary_encoding: JsonBinaryEncoding,
29    /// If `true`, the stream update will contain a JSON object containing
30    /// all data of the update.
31    #[serde(default = "default_parsed")]
32    pub parsed: bool,
33    pub channel: Channel,
34}
35
36#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
37#[serde(rename_all = "camelCase")]
38pub struct LatestPriceRequest(LatestPriceRequestRepr);
39
40impl<'de> Deserialize<'de> for LatestPriceRequest {
41    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
42    where
43        D: serde::Deserializer<'de>,
44    {
45        let value = LatestPriceRequestRepr::deserialize(deserializer)?;
46        Self::new(value).map_err(Error::custom)
47    }
48}
49
50impl LatestPriceRequest {
51    pub fn new(value: LatestPriceRequestRepr) -> Result<Self, &'static str> {
52        validate_price_feed_ids_or_symbols(&value.price_feed_ids, &value.symbols)?;
53        validate_optional_nonempty_vec_has_unique_elements(
54            &value.price_feed_ids,
55            "no price feed ids specified",
56            "duplicate price feed ids specified",
57        )?;
58        validate_optional_nonempty_vec_has_unique_elements(
59            &value.symbols,
60            "no symbols specified",
61            "duplicate symbols specified",
62        )?;
63        validate_formats(&value.formats)?;
64        validate_properties(&value.properties)?;
65        Ok(Self(value))
66    }
67}
68
69impl Deref for LatestPriceRequest {
70    type Target = LatestPriceRequestRepr;
71
72    fn deref(&self) -> &Self::Target {
73        &self.0
74    }
75}
76impl DerefMut for LatestPriceRequest {
77    fn deref_mut(&mut self) -> &mut Self::Target {
78        &mut self.0
79    }
80}
81
82#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
83#[serde(rename_all = "camelCase")]
84pub struct PriceRequestRepr {
85    pub timestamp: TimestampUs,
86    // Either price feed ids or symbols must be specified.
87    pub price_feed_ids: Option<Vec<PriceFeedId>>,
88    pub symbols: Option<Vec<String>>,
89    pub properties: Vec<PriceFeedProperty>,
90    pub formats: Vec<Format>,
91    #[serde(default)]
92    pub json_binary_encoding: JsonBinaryEncoding,
93    /// If `true`, the stream update will contain a JSON object containing
94    /// all data of the update.
95    #[serde(default = "default_parsed")]
96    pub parsed: bool,
97    pub channel: Channel,
98}
99
100#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
101#[serde(rename_all = "camelCase")]
102pub struct PriceRequest(PriceRequestRepr);
103
104impl<'de> Deserialize<'de> for PriceRequest {
105    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
106    where
107        D: serde::Deserializer<'de>,
108    {
109        let value = PriceRequestRepr::deserialize(deserializer)?;
110        Self::new(value).map_err(Error::custom)
111    }
112}
113
114impl PriceRequest {
115    pub fn new(value: PriceRequestRepr) -> Result<Self, &'static str> {
116        validate_price_feed_ids_or_symbols(&value.price_feed_ids, &value.symbols)?;
117        validate_optional_nonempty_vec_has_unique_elements(
118            &value.price_feed_ids,
119            "no price feed ids specified",
120            "duplicate price feed ids specified",
121        )?;
122        validate_optional_nonempty_vec_has_unique_elements(
123            &value.symbols,
124            "no symbols specified",
125            "duplicate symbols specified",
126        )?;
127        validate_formats(&value.formats)?;
128        validate_properties(&value.properties)?;
129        Ok(Self(value))
130    }
131}
132
133impl Deref for PriceRequest {
134    type Target = PriceRequestRepr;
135
136    fn deref(&self) -> &Self::Target {
137        &self.0
138    }
139}
140impl DerefMut for PriceRequest {
141    fn deref_mut(&mut self) -> &mut Self::Target {
142        &mut self.0
143    }
144}
145
146#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
147#[serde(rename_all = "camelCase")]
148pub struct ReducePriceRequest {
149    pub payload: JsonUpdate,
150    pub price_feed_ids: Vec<PriceFeedId>,
151}
152
153pub type LatestPriceResponse = JsonUpdate;
154pub type ReducePriceResponse = JsonUpdate;
155pub type PriceResponse = JsonUpdate;
156
/// Serde default for the `parsed` request fields: parsed output is enabled
/// unless the client turns it off explicitly.
pub fn default_parsed() -> bool {
    true
}
160
161#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
162#[serde(rename_all = "camelCase")]
163pub enum DeliveryFormat {
164    /// Deliver stream updates as JSON text messages.
165    #[default]
166    Json,
167    /// Deliver stream updates as binary messages.
168    Binary,
169}
170
171#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
172#[serde(rename_all = "camelCase")]
173pub enum Format {
174    Evm,
175    Solana,
176    LeEcdsa,
177    LeUnsigned,
178}
179
180#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
181#[serde(rename_all = "camelCase")]
182pub enum JsonBinaryEncoding {
183    #[default]
184    Base64,
185    Hex,
186}
187
188#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, From)]
189pub enum Channel {
190    FixedRate(FixedRate),
191    RealTime,
192}
193
194impl PartialOrd for Channel {
195    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
196        let rate_left = match self {
197            Channel::FixedRate(rate) => rate.duration().as_micros(),
198            Channel::RealTime => FixedRate::MIN.duration().as_micros(),
199        };
200        let rate_right = match other {
201            Channel::FixedRate(rate) => rate.duration().as_micros(),
202            Channel::RealTime => FixedRate::MIN.duration().as_micros(),
203        };
204        Some(rate_left.cmp(&rate_right))
205    }
206}
207
208impl Serialize for Channel {
209    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
210    where
211        S: serde::Serializer,
212    {
213        match self {
214            Channel::FixedRate(fixed_rate) => serializer.serialize_str(&format!(
215                "fixed_rate@{}ms",
216                fixed_rate.duration().as_millis()
217            )),
218            Channel::RealTime => serializer.serialize_str("real_time"),
219        }
220    }
221}
222
223impl Display for Channel {
224    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
225        match self {
226            Channel::FixedRate(fixed_rate) => {
227                write!(f, "fixed_rate@{}ms", fixed_rate.duration().as_millis())
228            }
229            Channel::RealTime => write!(f, "real_time"),
230        }
231    }
232}
233
234impl Channel {
235    pub fn id(&self) -> ChannelId {
236        match self {
237            Channel::FixedRate(fixed_rate) => match fixed_rate.duration().as_millis() {
238                50 => ChannelId::FIXED_RATE_50,
239                200 => ChannelId::FIXED_RATE_200,
240                _ => panic!("unknown channel: {self:?}"),
241            },
242            Channel::RealTime => ChannelId::REAL_TIME,
243        }
244    }
245}
246
/// Guards against `Channel::id` panicking for any rate in `FixedRate::ALL`.
#[test]
fn id_supports_all_fixed_rates() {
    for rate in FixedRate::ALL {
        let channel = Channel::FixedRate(rate);
        // Only the absence of a panic matters; the id itself is not inspected.
        let _ = channel.id();
    }
}
253
254fn parse_channel(value: &str) -> Option<Channel> {
255    if value == "real_time" {
256        Some(Channel::RealTime)
257    } else if let Some(rest) = value.strip_prefix("fixed_rate@") {
258        let ms_value = rest.strip_suffix("ms")?;
259        Some(Channel::FixedRate(FixedRate::from_millis(
260            ms_value.parse().ok()?,
261        )?))
262    } else {
263        None
264    }
265}
266
267impl<'de> Deserialize<'de> for Channel {
268    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
269    where
270        D: serde::Deserializer<'de>,
271    {
272        let value = <String>::deserialize(deserializer)?;
273        parse_channel(&value).ok_or_else(|| Error::custom("unknown channel"))
274    }
275}
276
277#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
278#[serde(rename_all = "camelCase")]
279pub struct SubscriptionParamsRepr {
280    // Either price feed ids or symbols must be specified.
281    pub price_feed_ids: Option<Vec<PriceFeedId>>,
282    pub symbols: Option<Vec<String>>,
283    pub properties: Vec<PriceFeedProperty>,
284    // "chains" was renamed to "formats". "chains" is still supported for compatibility.
285    #[serde(alias = "chains")]
286    pub formats: Vec<Format>,
287    #[serde(default)]
288    pub delivery_format: DeliveryFormat,
289    #[serde(default)]
290    pub json_binary_encoding: JsonBinaryEncoding,
291    /// If `true`, the stream update will contain a `parsed` JSON field containing
292    /// all data of the update.
293    #[serde(default = "default_parsed")]
294    pub parsed: bool,
295    pub channel: Channel,
296    // "ignoreInvalidFeedIds" was renamed to "ignoreInvalidFeeds". "ignoreInvalidFeedIds" is still supported for compatibility.
297    #[serde(default, alias = "ignoreInvalidFeedIds")]
298    pub ignore_invalid_feeds: bool,
299}
300
301#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
302#[serde(rename_all = "camelCase")]
303pub struct SubscriptionParams(SubscriptionParamsRepr);
304
305impl<'de> Deserialize<'de> for SubscriptionParams {
306    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
307    where
308        D: serde::Deserializer<'de>,
309    {
310        let value = SubscriptionParamsRepr::deserialize(deserializer)?;
311        Self::new(value).map_err(Error::custom)
312    }
313}
314
315impl SubscriptionParams {
316    pub fn new(value: SubscriptionParamsRepr) -> Result<Self, &'static str> {
317        validate_price_feed_ids_or_symbols(&value.price_feed_ids, &value.symbols)?;
318        validate_optional_nonempty_vec_has_unique_elements(
319            &value.price_feed_ids,
320            "no price feed ids specified",
321            "duplicate price feed ids specified",
322        )?;
323        validate_optional_nonempty_vec_has_unique_elements(
324            &value.symbols,
325            "no symbols specified",
326            "duplicate symbols specified",
327        )?;
328        validate_formats(&value.formats)?;
329        validate_properties(&value.properties)?;
330        Ok(Self(value))
331    }
332}
333
334impl Deref for SubscriptionParams {
335    type Target = SubscriptionParamsRepr;
336
337    fn deref(&self) -> &Self::Target {
338        &self.0
339    }
340}
341impl DerefMut for SubscriptionParams {
342    fn deref_mut(&mut self) -> &mut Self::Target {
343        &mut self.0
344    }
345}
346
347#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
348#[serde(rename_all = "camelCase")]
349pub struct JsonBinaryData {
350    pub encoding: JsonBinaryEncoding,
351    pub data: String,
352}
353
354#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
355#[serde(rename_all = "camelCase")]
356pub struct JsonUpdate {
357    /// Present unless `parsed = false` is specified in subscription params.
358    #[serde(skip_serializing_if = "Option::is_none")]
359    pub parsed: Option<ParsedPayload>,
360    /// Only present if `Evm` is present in `formats` in subscription params.
361    #[serde(skip_serializing_if = "Option::is_none")]
362    pub evm: Option<JsonBinaryData>,
363    /// Only present if `Solana` is present in `formats` in subscription params.
364    #[serde(skip_serializing_if = "Option::is_none")]
365    pub solana: Option<JsonBinaryData>,
366    /// Only present if `LeEcdsa` is present in `formats` in subscription params.
367    #[serde(skip_serializing_if = "Option::is_none")]
368    pub le_ecdsa: Option<JsonBinaryData>,
369    /// Only present if `LeUnsigned` is present in `formats` in subscription params.
370    #[serde(skip_serializing_if = "Option::is_none")]
371    pub le_unsigned: Option<JsonBinaryData>,
372}
373
374#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
375#[serde(rename_all = "camelCase")]
376pub struct ParsedPayload {
377    #[serde(with = "crate::serde_str::timestamp")]
378    pub timestamp_us: TimestampUs,
379    pub price_feeds: Vec<ParsedFeedPayload>,
380}
381
382#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
383#[serde(rename_all = "camelCase")]
384pub struct ParsedFeedPayload {
385    pub price_feed_id: PriceFeedId,
386    #[serde(skip_serializing_if = "Option::is_none")]
387    #[serde(with = "crate::serde_str::option_price")]
388    #[serde(default)]
389    pub price: Option<Price>,
390    #[serde(skip_serializing_if = "Option::is_none")]
391    #[serde(with = "crate::serde_str::option_price")]
392    #[serde(default)]
393    pub best_bid_price: Option<Price>,
394    #[serde(skip_serializing_if = "Option::is_none")]
395    #[serde(with = "crate::serde_str::option_price")]
396    #[serde(default)]
397    pub best_ask_price: Option<Price>,
398    #[serde(skip_serializing_if = "Option::is_none")]
399    #[serde(default)]
400    pub publisher_count: Option<u16>,
401    #[serde(skip_serializing_if = "Option::is_none")]
402    #[serde(default)]
403    pub exponent: Option<i16>,
404    #[serde(skip_serializing_if = "Option::is_none")]
405    #[serde(default)]
406    pub confidence: Option<Price>,
407    #[serde(skip_serializing_if = "Option::is_none")]
408    #[serde(default)]
409    pub funding_rate: Option<Rate>,
410    #[serde(skip_serializing_if = "Option::is_none")]
411    #[serde(default)]
412    pub funding_timestamp: Option<TimestampUs>,
413    // More fields may be added later.
414    #[serde(skip_serializing_if = "Option::is_none")]
415    #[serde(default)]
416    pub funding_rate_interval: Option<DurationUs>,
417}
418
419impl ParsedFeedPayload {
420    pub fn new(
421        price_feed_id: PriceFeedId,
422        exponent: Option<i16>,
423        data: &AggregatedPriceFeedData,
424        properties: &[PriceFeedProperty],
425    ) -> Self {
426        let mut output = Self {
427            price_feed_id,
428            price: None,
429            best_bid_price: None,
430            best_ask_price: None,
431            publisher_count: None,
432            exponent: None,
433            confidence: None,
434            funding_rate: None,
435            funding_timestamp: None,
436            funding_rate_interval: None,
437        };
438        for &property in properties {
439            match property {
440                PriceFeedProperty::Price => {
441                    output.price = data.price;
442                }
443                PriceFeedProperty::BestBidPrice => {
444                    output.best_bid_price = data.best_bid_price;
445                }
446                PriceFeedProperty::BestAskPrice => {
447                    output.best_ask_price = data.best_ask_price;
448                }
449                PriceFeedProperty::PublisherCount => {
450                    output.publisher_count = Some(data.publisher_count);
451                }
452                PriceFeedProperty::Exponent => {
453                    output.exponent = exponent;
454                }
455                PriceFeedProperty::Confidence => {
456                    output.confidence = data.confidence;
457                }
458                PriceFeedProperty::FundingRate => {
459                    output.funding_rate = data.funding_rate;
460                }
461                PriceFeedProperty::FundingTimestamp => {
462                    output.funding_timestamp = data.funding_timestamp;
463                }
464                PriceFeedProperty::FundingRateInterval => {
465                    output.funding_rate_interval = data.funding_rate_interval;
466                }
467            }
468        }
469        output
470    }
471
472    pub fn new_full(
473        price_feed_id: PriceFeedId,
474        exponent: Option<i16>,
475        data: &AggregatedPriceFeedData,
476    ) -> Self {
477        Self {
478            price_feed_id,
479            price: data.price,
480            best_bid_price: data.best_bid_price,
481            best_ask_price: data.best_ask_price,
482            publisher_count: Some(data.publisher_count),
483            exponent,
484            confidence: data.confidence,
485            funding_rate: data.funding_rate,
486            funding_timestamp: data.funding_timestamp,
487            funding_rate_interval: data.funding_rate_interval,
488        }
489    }
490}
491
492/// A request sent from the client to the server.
493#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
494#[serde(tag = "type")]
495#[serde(rename_all = "camelCase")]
496pub enum WsRequest {
497    Subscribe(SubscribeRequest),
498    Unsubscribe(UnsubscribeRequest),
499}
500
501#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
502pub struct SubscriptionId(pub u64);
503
504#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
505#[serde(rename_all = "camelCase")]
506pub struct SubscribeRequest {
507    pub subscription_id: SubscriptionId,
508    #[serde(flatten)]
509    pub params: SubscriptionParams,
510}
511
512#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
513#[serde(rename_all = "camelCase")]
514pub struct UnsubscribeRequest {
515    pub subscription_id: SubscriptionId,
516}
517
518/// A JSON response sent from the server to the client.
519#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, From)]
520#[serde(tag = "type")]
521#[serde(rename_all = "camelCase")]
522pub enum WsResponse {
523    Error(ErrorResponse),
524    Subscribed(SubscribedResponse),
525    SubscribedWithInvalidFeedIdsIgnored(SubscribedWithInvalidFeedIdsIgnoredResponse),
526    Unsubscribed(UnsubscribedResponse),
527    SubscriptionError(SubscriptionErrorResponse),
528    StreamUpdated(StreamUpdatedResponse),
529}
530
531/// Sent from the server after a successul subscription.
532#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
533#[serde(rename_all = "camelCase")]
534pub struct SubscribedResponse {
535    pub subscription_id: SubscriptionId,
536}
537
538#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
539#[serde(rename_all = "camelCase")]
540pub struct InvalidFeedSubscriptionDetails {
541    pub unknown_ids: Vec<PriceFeedId>,
542    pub unknown_symbols: Vec<String>,
543    pub unsupported_channels: Vec<PriceFeedId>,
544    pub unstable: Vec<PriceFeedId>,
545}
546
547#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
548#[serde(rename_all = "camelCase")]
549pub struct SubscribedWithInvalidFeedIdsIgnoredResponse {
550    pub subscription_id: SubscriptionId,
551    pub subscribed_feed_ids: Vec<PriceFeedId>,
552    pub ignored_invalid_feed_ids: InvalidFeedSubscriptionDetails,
553}
554
555#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
556#[serde(rename_all = "camelCase")]
557pub struct UnsubscribedResponse {
558    pub subscription_id: SubscriptionId,
559}
560
561/// Sent from the server if the requested subscription or unsubscription request
562/// could not be fulfilled.
563#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
564#[serde(rename_all = "camelCase")]
565pub struct SubscriptionErrorResponse {
566    pub subscription_id: SubscriptionId,
567    pub error: String,
568}
569
570/// Sent from the server if an internal error occured while serving data for an existing subscription,
571/// or a client request sent a bad request.
572#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
573#[serde(rename_all = "camelCase")]
574pub struct ErrorResponse {
575    pub error: String,
576}
577
578/// Sent from the server when new data is available for an existing subscription
579/// (only if `delivery_format == Json`).
580#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
581#[serde(rename_all = "camelCase")]
582pub struct StreamUpdatedResponse {
583    pub subscription_id: SubscriptionId,
584    #[serde(flatten)]
585    pub payload: JsonUpdate,
586}
587
588// Common validation functions
589fn validate_price_feed_ids_or_symbols(
590    price_feed_ids: &Option<Vec<PriceFeedId>>,
591    symbols: &Option<Vec<String>>,
592) -> Result<(), &'static str> {
593    if price_feed_ids.is_none() && symbols.is_none() {
594        return Err("either price feed ids or symbols must be specified");
595    }
596    if price_feed_ids.is_some() && symbols.is_some() {
597        return Err("either price feed ids or symbols must be specified, not both");
598    }
599    Ok(())
600}
601
602fn validate_optional_nonempty_vec_has_unique_elements<T>(
603    vec: &Option<Vec<T>>,
604    empty_msg: &'static str,
605    duplicate_msg: &'static str,
606) -> Result<(), &'static str>
607where
608    T: Eq + std::hash::Hash,
609{
610    if let Some(ref items) = vec {
611        if items.is_empty() {
612            return Err(empty_msg);
613        }
614        if !items.iter().all_unique() {
615            return Err(duplicate_msg);
616        }
617    }
618    Ok(())
619}
620
621fn validate_properties(properties: &[PriceFeedProperty]) -> Result<(), &'static str> {
622    if properties.is_empty() {
623        return Err("no properties specified");
624    }
625    if !properties.iter().all_unique() {
626        return Err("duplicate properties specified");
627    }
628    Ok(())
629}
630
631fn validate_formats(formats: &[Format]) -> Result<(), &'static str> {
632    if !formats.iter().all_unique() {
633        return Err("duplicate formats or chains specified");
634    }
635    Ok(())
636}