1use {
4 crate::{
5 payload::AggregatedPriceFeedData,
6 time::{DurationUs, TimestampUs},
7 },
8 anyhow::{bail, Context},
9 derive_more::derive::{From, Into},
10 itertools::Itertools,
11 protobuf::well_known_types::duration::Duration as ProtobufDuration,
12 rust_decimal::{prelude::FromPrimitive, Decimal},
13 serde::{de::Error, Deserialize, Serialize},
14 std::{
15 fmt::Display,
16 num::NonZeroI64,
17 ops::{Add, Deref, DerefMut, Div, Sub},
18 },
19};
20
21#[derive(
22 Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, From, Into,
23)]
24pub struct PublisherId(pub u16);
25
26#[derive(
27 Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, From, Into,
28)]
29pub struct PriceFeedId(pub u32);
30
31#[derive(
32 Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, From, Into,
33)]
34pub struct ChannelId(pub u8);
35
36#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
37#[repr(transparent)]
38pub struct Rate(pub i64);
39
40impl Rate {
41 pub fn parse_str(value: &str, exponent: u32) -> anyhow::Result<Self> {
42 let value: Decimal = value.parse()?;
43 let coef = 10i64.checked_pow(exponent).context("overflow")?;
44 let coef = Decimal::from_i64(coef).context("overflow")?;
45 let value = value.checked_mul(coef).context("overflow")?;
46 if !value.is_integer() {
47 bail!("price value is more precise than available exponent");
48 }
49 let value: i64 = value.try_into().context("overflow")?;
50 Ok(Self(value))
51 }
52
53 pub fn from_f64(value: f64, exponent: u32) -> anyhow::Result<Self> {
54 let value = Decimal::from_f64(value).context("overflow")?;
55 let coef = 10i64.checked_pow(exponent).context("overflow")?;
56 let coef = Decimal::from_i64(coef).context("overflow")?;
57 let value = value.checked_mul(coef).context("overflow")?;
58 let value: i64 = value.try_into().context("overflow")?;
59 Ok(Self(value))
60 }
61
62 pub fn from_integer(value: i64, exponent: u32) -> anyhow::Result<Self> {
63 let coef = 10i64.checked_pow(exponent).context("overflow")?;
64 let value = value.checked_mul(coef).context("overflow")?;
65 Ok(Self(value))
66 }
67}
68
69#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
70#[repr(transparent)]
71pub struct Price(pub NonZeroI64);
72
73impl Price {
74 pub fn from_integer(value: i64, exponent: u32) -> anyhow::Result<Price> {
75 let coef = 10i64.checked_pow(exponent).context("overflow")?;
76 let value = value.checked_mul(coef).context("overflow")?;
77 let value = NonZeroI64::new(value).context("zero price is unsupported")?;
78 Ok(Self(value))
79 }
80
81 pub fn parse_str(value: &str, exponent: u32) -> anyhow::Result<Price> {
82 let value: Decimal = value.parse()?;
83 let coef = 10i64.checked_pow(exponent).context("overflow")?;
84 let coef = Decimal::from_i64(coef).context("overflow")?;
85 let value = value.checked_mul(coef).context("overflow")?;
86 if !value.is_integer() {
87 bail!("price value is more precise than available exponent");
88 }
89 let value: i64 = value.try_into().context("overflow")?;
90 let value = NonZeroI64::new(value).context("zero price is unsupported")?;
91 Ok(Self(value))
92 }
93
94 pub fn new(value: i64) -> anyhow::Result<Self> {
95 let value = NonZeroI64::new(value).context("zero price is unsupported")?;
96 Ok(Self(value))
97 }
98
99 pub fn into_inner(self) -> NonZeroI64 {
100 self.0
101 }
102
103 pub fn to_f64(self, exponent: u32) -> anyhow::Result<f64> {
104 Ok(self.0.get() as f64 / 10i64.checked_pow(exponent).context("overflow")? as f64)
105 }
106
107 pub fn from_f64(value: f64, exponent: u32) -> anyhow::Result<Self> {
108 let value = (value * 10f64.powi(exponent as i32)) as i64;
109 let value = NonZeroI64::new(value).context("zero price is unsupported")?;
110 Ok(Self(value))
111 }
112
113 pub fn mul(self, rhs: Price, rhs_exponent: u32) -> anyhow::Result<Price> {
114 let left_value = i128::from(self.0.get());
115 let right_value = i128::from(rhs.0.get());
116
117 let value = left_value * right_value / 10i128.pow(rhs_exponent);
118 let value = value.try_into()?;
119 NonZeroI64::new(value)
120 .context("zero price is unsupported")
121 .map(Self)
122 }
123}
124
125impl Sub<i64> for Price {
126 type Output = Option<Price>;
127
128 fn sub(self, rhs: i64) -> Self::Output {
129 let value = self.0.get().saturating_sub(rhs);
130 NonZeroI64::new(value).map(Self)
131 }
132}
133
134impl Add<i64> for Price {
135 type Output = Option<Price>;
136
137 fn add(self, rhs: i64) -> Self::Output {
138 let value = self.0.get().saturating_add(rhs);
139 NonZeroI64::new(value).map(Self)
140 }
141}
142
143impl Add<Price> for Price {
144 type Output = Option<Price>;
145 fn add(self, rhs: Price) -> Self::Output {
146 let value = self.0.get().saturating_add(rhs.0.get());
147 NonZeroI64::new(value).map(Self)
148 }
149}
150
151impl Sub<Price> for Price {
152 type Output = Option<Price>;
153 fn sub(self, rhs: Price) -> Self::Output {
154 let value = self.0.get().saturating_sub(rhs.0.get());
155 NonZeroI64::new(value).map(Self)
156 }
157}
158
159impl Div<i64> for Price {
160 type Output = Option<Price>;
161 fn div(self, rhs: i64) -> Self::Output {
162 let value = self.0.get().saturating_div(rhs);
163 NonZeroI64::new(value).map(Self)
164 }
165}
166
167#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
168#[serde(rename_all = "camelCase")]
169pub enum PriceFeedProperty {
170 Price,
171 BestBidPrice,
172 BestAskPrice,
173 PublisherCount,
174 Exponent,
175 Confidence,
176 FundingRate,
177 FundingTimestamp,
178 FundingRateInterval,
179 }
181
182#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
183#[serde(rename_all = "camelCase")]
184pub enum DeliveryFormat {
185 #[default]
187 Json,
188 Binary,
190}
191
192#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
193#[serde(rename_all = "camelCase")]
194pub enum Format {
195 Evm,
196 Solana,
197 LeEcdsa,
198 LeUnsigned,
199}
200
201#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
202#[serde(rename_all = "camelCase")]
203pub enum JsonBinaryEncoding {
204 #[default]
205 Base64,
206 Hex,
207}
208
209#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, From)]
210pub enum Channel {
211 FixedRate(FixedRate),
212}
213
214impl Serialize for Channel {
215 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
216 where
217 S: serde::Serializer,
218 {
219 match self {
220 Channel::FixedRate(fixed_rate) => {
221 if *fixed_rate == FixedRate::MIN {
222 return serializer.serialize_str("real_time");
223 }
224 serializer.serialize_str(&format!(
225 "fixed_rate@{}ms",
226 fixed_rate.duration().as_millis()
227 ))
228 }
229 }
230 }
231}
232
233pub mod channel_ids {
234 use super::ChannelId;
235
236 pub const FIXED_RATE_1: ChannelId = ChannelId(1);
237 pub const FIXED_RATE_50: ChannelId = ChannelId(2);
238 pub const FIXED_RATE_200: ChannelId = ChannelId(3);
239}
240
241impl Display for Channel {
242 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
243 match self {
244 Channel::FixedRate(fixed_rate) => match *fixed_rate {
245 FixedRate::MIN => write!(f, "real_time"),
246 rate => write!(f, "fixed_rate@{}ms", rate.duration().as_millis()),
247 },
248 }
249 }
250}
251
252impl Channel {
253 pub fn id(&self) -> ChannelId {
254 match self {
255 Channel::FixedRate(fixed_rate) => match fixed_rate.duration().as_millis() {
256 1 => channel_ids::FIXED_RATE_1,
257 50 => channel_ids::FIXED_RATE_50,
258 200 => channel_ids::FIXED_RATE_200,
259 _ => panic!("unknown channel: {self:?}"),
260 },
261 }
262 }
263}
264
265#[test]
266fn id_supports_all_fixed_rates() {
267 for rate in FixedRate::ALL {
268 Channel::FixedRate(rate).id();
269 }
270}
271
272fn parse_channel(value: &str) -> Option<Channel> {
273 if value == "real_time" {
274 Some(Channel::FixedRate(FixedRate::MIN))
275 } else if let Some(rest) = value.strip_prefix("fixed_rate@") {
276 let ms_value = rest.strip_suffix("ms")?;
277 Some(Channel::FixedRate(FixedRate::from_millis(
278 ms_value.parse().ok()?,
279 )?))
280 } else {
281 None
282 }
283}
284
285impl<'de> Deserialize<'de> for Channel {
286 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
287 where
288 D: serde::Deserializer<'de>,
289 {
290 let value = <String>::deserialize(deserializer)?;
291 parse_channel(&value).ok_or_else(|| Error::custom("unknown channel"))
292 }
293}
294
295#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
296pub struct FixedRate {
297 rate: DurationUs,
298}
299
300impl FixedRate {
301 pub const RATE_1_MS: Self = Self {
302 rate: DurationUs::from_millis_u32(1),
303 };
304 pub const RATE_50_MS: Self = Self {
305 rate: DurationUs::from_millis_u32(50),
306 };
307 pub const RATE_200_MS: Self = Self {
308 rate: DurationUs::from_millis_u32(200),
309 };
310
311 pub const ALL: [Self; 3] = [Self::RATE_1_MS, Self::RATE_50_MS, Self::RATE_200_MS];
316 pub const MIN: Self = Self::ALL[0];
317
318 pub fn from_millis(millis: u32) -> Option<Self> {
319 Self::ALL
320 .into_iter()
321 .find(|v| v.rate.as_millis() == u64::from(millis))
322 }
323
324 pub fn duration(self) -> DurationUs {
325 self.rate
326 }
327}
328
329impl TryFrom<DurationUs> for FixedRate {
330 type Error = anyhow::Error;
331
332 fn try_from(value: DurationUs) -> Result<Self, Self::Error> {
333 Self::ALL
334 .into_iter()
335 .find(|v| v.rate == value)
336 .with_context(|| format!("unsupported rate: {value:?}"))
337 }
338}
339
340impl TryFrom<&ProtobufDuration> for FixedRate {
341 type Error = anyhow::Error;
342
343 fn try_from(value: &ProtobufDuration) -> Result<Self, Self::Error> {
344 let duration = DurationUs::try_from(value)?;
345 Self::try_from(duration)
346 }
347}
348
349impl TryFrom<ProtobufDuration> for FixedRate {
350 type Error = anyhow::Error;
351
352 fn try_from(duration: ProtobufDuration) -> anyhow::Result<Self> {
353 TryFrom::<&ProtobufDuration>::try_from(&duration)
354 }
355}
356
357impl From<FixedRate> for DurationUs {
358 fn from(value: FixedRate) -> Self {
359 value.rate
360 }
361}
362
363impl From<FixedRate> for ProtobufDuration {
364 fn from(value: FixedRate) -> Self {
365 value.rate.into()
366 }
367}
368
369#[test]
370fn fixed_rate_values() {
371 assert!(
372 FixedRate::ALL.windows(2).all(|w| w[0] < w[1]),
373 "values must be unique and sorted"
374 );
375 for value in FixedRate::ALL {
376 assert_eq!(
377 1_000_000 % value.duration().as_micros(),
378 0,
379 "1 s must contain whole number of intervals"
380 );
381 assert_eq!(
382 value.duration().as_micros() % FixedRate::MIN.duration().as_micros(),
383 0,
384 "the interval's borders must be a subset of the minimal interval's borders"
385 );
386 }
387}
388
389#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
390#[serde(rename_all = "camelCase")]
391pub struct SubscriptionParamsRepr {
392 pub price_feed_ids: Vec<PriceFeedId>,
393 pub properties: Vec<PriceFeedProperty>,
394 #[serde(alias = "chains")]
396 pub formats: Vec<Format>,
397 #[serde(default)]
398 pub delivery_format: DeliveryFormat,
399 #[serde(default)]
400 pub json_binary_encoding: JsonBinaryEncoding,
401 #[serde(default = "default_parsed")]
404 pub parsed: bool,
405 pub channel: Channel,
406 #[serde(default)]
407 pub ignore_invalid_feed_ids: bool,
408}
409
410#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
411#[serde(rename_all = "camelCase")]
412pub struct SubscriptionParams(SubscriptionParamsRepr);
413
414impl<'de> Deserialize<'de> for SubscriptionParams {
415 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
416 where
417 D: serde::Deserializer<'de>,
418 {
419 let value = SubscriptionParamsRepr::deserialize(deserializer)?;
420 Self::new(value).map_err(Error::custom)
421 }
422}
423
424impl SubscriptionParams {
425 pub fn new(value: SubscriptionParamsRepr) -> Result<Self, &'static str> {
426 if value.price_feed_ids.is_empty() {
427 return Err("no price feed ids specified");
428 }
429 if !value.price_feed_ids.iter().all_unique() {
430 return Err("duplicate price feed ids specified");
431 }
432 if !value.formats.iter().all_unique() {
433 return Err("duplicate formats or chains specified");
434 }
435 if value.properties.is_empty() {
436 return Err("no properties specified");
437 }
438 if !value.properties.iter().all_unique() {
439 return Err("duplicate properties specified");
440 }
441 Ok(Self(value))
442 }
443}
444
445impl Deref for SubscriptionParams {
446 type Target = SubscriptionParamsRepr;
447
448 fn deref(&self) -> &Self::Target {
449 &self.0
450 }
451}
452impl DerefMut for SubscriptionParams {
453 fn deref_mut(&mut self) -> &mut Self::Target {
454 &mut self.0
455 }
456}
457
458pub fn default_parsed() -> bool {
459 true
460}
461
462#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
463#[serde(rename_all = "camelCase")]
464pub struct JsonBinaryData {
465 pub encoding: JsonBinaryEncoding,
466 pub data: String,
467}
468
469#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
470#[serde(rename_all = "camelCase")]
471pub struct JsonUpdate {
472 #[serde(skip_serializing_if = "Option::is_none")]
474 pub parsed: Option<ParsedPayload>,
475 #[serde(skip_serializing_if = "Option::is_none")]
477 pub evm: Option<JsonBinaryData>,
478 #[serde(skip_serializing_if = "Option::is_none")]
480 pub solana: Option<JsonBinaryData>,
481 #[serde(skip_serializing_if = "Option::is_none")]
483 pub le_ecdsa: Option<JsonBinaryData>,
484 #[serde(skip_serializing_if = "Option::is_none")]
486 pub le_unsigned: Option<JsonBinaryData>,
487}
488
489#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
490#[serde(rename_all = "camelCase")]
491pub struct ParsedPayload {
492 #[serde(with = "crate::serde_str::timestamp")]
493 pub timestamp_us: TimestampUs,
494 pub price_feeds: Vec<ParsedFeedPayload>,
495}
496
497#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
498#[serde(rename_all = "camelCase")]
499pub struct ParsedFeedPayload {
500 pub price_feed_id: PriceFeedId,
501 #[serde(skip_serializing_if = "Option::is_none")]
502 #[serde(with = "crate::serde_str::option_price")]
503 #[serde(default)]
504 pub price: Option<Price>,
505 #[serde(skip_serializing_if = "Option::is_none")]
506 #[serde(with = "crate::serde_str::option_price")]
507 #[serde(default)]
508 pub best_bid_price: Option<Price>,
509 #[serde(skip_serializing_if = "Option::is_none")]
510 #[serde(with = "crate::serde_str::option_price")]
511 #[serde(default)]
512 pub best_ask_price: Option<Price>,
513 #[serde(skip_serializing_if = "Option::is_none")]
514 #[serde(default)]
515 pub publisher_count: Option<u16>,
516 #[serde(skip_serializing_if = "Option::is_none")]
517 #[serde(default)]
518 pub exponent: Option<i16>,
519 #[serde(skip_serializing_if = "Option::is_none")]
520 #[serde(default)]
521 pub confidence: Option<Price>,
522 #[serde(skip_serializing_if = "Option::is_none")]
523 #[serde(default)]
524 pub funding_rate: Option<Rate>,
525 #[serde(skip_serializing_if = "Option::is_none")]
526 #[serde(default)]
527 pub funding_timestamp: Option<TimestampUs>,
528 #[serde(skip_serializing_if = "Option::is_none")]
530 #[serde(default)]
531 pub funding_rate_interval: Option<DurationUs>,
532}
533
534impl ParsedFeedPayload {
535 pub fn new(
536 price_feed_id: PriceFeedId,
537 exponent: Option<i16>,
538 data: &AggregatedPriceFeedData,
539 properties: &[PriceFeedProperty],
540 ) -> Self {
541 let mut output = Self {
542 price_feed_id,
543 price: None,
544 best_bid_price: None,
545 best_ask_price: None,
546 publisher_count: None,
547 exponent: None,
548 confidence: None,
549 funding_rate: None,
550 funding_timestamp: None,
551 funding_rate_interval: None,
552 };
553 for &property in properties {
554 match property {
555 PriceFeedProperty::Price => {
556 output.price = data.price;
557 }
558 PriceFeedProperty::BestBidPrice => {
559 output.best_bid_price = data.best_bid_price;
560 }
561 PriceFeedProperty::BestAskPrice => {
562 output.best_ask_price = data.best_ask_price;
563 }
564 PriceFeedProperty::PublisherCount => {
565 output.publisher_count = Some(data.publisher_count);
566 }
567 PriceFeedProperty::Exponent => {
568 output.exponent = exponent;
569 }
570 PriceFeedProperty::Confidence => {
571 output.confidence = data.confidence;
572 }
573 PriceFeedProperty::FundingRate => {
574 output.funding_rate = data.funding_rate;
575 }
576 PriceFeedProperty::FundingTimestamp => {
577 output.funding_timestamp = data.funding_timestamp;
578 }
579 PriceFeedProperty::FundingRateInterval => {
580 output.funding_rate_interval = data.funding_rate_interval;
581 }
582 }
583 }
584 output
585 }
586
587 pub fn new_full(
588 price_feed_id: PriceFeedId,
589 exponent: Option<i16>,
590 data: &AggregatedPriceFeedData,
591 ) -> Self {
592 Self {
593 price_feed_id,
594 price: data.price,
595 best_bid_price: data.best_bid_price,
596 best_ask_price: data.best_ask_price,
597 publisher_count: Some(data.publisher_count),
598 exponent,
599 confidence: data.confidence,
600 funding_rate: data.funding_rate,
601 funding_timestamp: data.funding_timestamp,
602 funding_rate_interval: data.funding_rate_interval,
603 }
604 }
605}