1use {
4 crate::{
5 payload::AggregatedPriceFeedData,
6 time::{DurationUs, TimestampUs},
7 },
8 anyhow::{bail, Context},
9 derive_more::derive::{From, Into},
10 itertools::Itertools,
11 protobuf::well_known_types::duration::Duration as ProtobufDuration,
12 rust_decimal::{prelude::FromPrimitive, Decimal},
13 serde::{de::Error, Deserialize, Serialize},
14 std::{
15 fmt::Display,
16 num::NonZeroI64,
17 ops::{Add, Deref, DerefMut, Div, Sub},
18 },
19};
20
21#[derive(
22 Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, From, Into,
23)]
24pub struct PublisherId(pub u16);
25
26#[derive(
27 Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, From, Into,
28)]
29pub struct PriceFeedId(pub u32);
30
31#[derive(
32 Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, From, Into,
33)]
34pub struct ChannelId(pub u8);
35
36#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
37#[repr(transparent)]
38pub struct Rate(pub i64);
39
40impl Rate {
41 pub fn parse_str(value: &str, exponent: u32) -> anyhow::Result<Self> {
42 let value: Decimal = value.parse()?;
43 let coef = 10i64.checked_pow(exponent).context("overflow")?;
44 let coef = Decimal::from_i64(coef).context("overflow")?;
45 let value = value.checked_mul(coef).context("overflow")?;
46 if !value.is_integer() {
47 bail!("price value is more precise than available exponent");
48 }
49 let value: i64 = value.try_into().context("overflow")?;
50 Ok(Self(value))
51 }
52
53 pub fn from_f64(value: f64, exponent: u32) -> anyhow::Result<Self> {
54 let value = Decimal::from_f64(value).context("overflow")?;
55 let coef = 10i64.checked_pow(exponent).context("overflow")?;
56 let coef = Decimal::from_i64(coef).context("overflow")?;
57 let value = value.checked_mul(coef).context("overflow")?;
58 let value: i64 = value.try_into().context("overflow")?;
59 Ok(Self(value))
60 }
61
62 pub fn from_integer(value: i64, exponent: u32) -> anyhow::Result<Self> {
63 let coef = 10i64.checked_pow(exponent).context("overflow")?;
64 let value = value.checked_mul(coef).context("overflow")?;
65 Ok(Self(value))
66 }
67}
68
69#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
70#[repr(transparent)]
71pub struct Price(pub NonZeroI64);
72
73impl Price {
74 pub fn from_integer(value: i64, exponent: u32) -> anyhow::Result<Price> {
75 let coef = 10i64.checked_pow(exponent).context("overflow")?;
76 let value = value.checked_mul(coef).context("overflow")?;
77 let value = NonZeroI64::new(value).context("zero price is unsupported")?;
78 Ok(Self(value))
79 }
80
81 pub fn parse_str(value: &str, exponent: u32) -> anyhow::Result<Price> {
82 let value: Decimal = value.parse()?;
83 let coef = 10i64.checked_pow(exponent).context("overflow")?;
84 let coef = Decimal::from_i64(coef).context("overflow")?;
85 let value = value.checked_mul(coef).context("overflow")?;
86 if !value.is_integer() {
87 bail!("price value is more precise than available exponent");
88 }
89 let value: i64 = value.try_into().context("overflow")?;
90 let value = NonZeroI64::new(value).context("zero price is unsupported")?;
91 Ok(Self(value))
92 }
93
94 pub fn new(value: i64) -> anyhow::Result<Self> {
95 let value = NonZeroI64::new(value).context("zero price is unsupported")?;
96 Ok(Self(value))
97 }
98
99 pub fn into_inner(self) -> NonZeroI64 {
100 self.0
101 }
102
103 pub fn to_f64(self, exponent: u32) -> anyhow::Result<f64> {
104 Ok(self.0.get() as f64 / 10i64.checked_pow(exponent).context("overflow")? as f64)
105 }
106
107 pub fn from_f64(value: f64, exponent: u32) -> anyhow::Result<Self> {
108 let value = (value * 10f64.powi(exponent as i32)) as i64;
109 let value = NonZeroI64::new(value).context("zero price is unsupported")?;
110 Ok(Self(value))
111 }
112
113 pub fn mul(self, rhs: Price, rhs_exponent: u32) -> anyhow::Result<Price> {
114 let left_value = i128::from(self.0.get());
115 let right_value = i128::from(rhs.0.get());
116
117 let value = left_value * right_value / 10i128.pow(rhs_exponent);
118 let value = value.try_into()?;
119 NonZeroI64::new(value)
120 .context("zero price is unsupported")
121 .map(Self)
122 }
123}
124
125impl Sub<i64> for Price {
126 type Output = Option<Price>;
127
128 fn sub(self, rhs: i64) -> Self::Output {
129 let value = self.0.get().saturating_sub(rhs);
130 NonZeroI64::new(value).map(Self)
131 }
132}
133
134impl Add<i64> for Price {
135 type Output = Option<Price>;
136
137 fn add(self, rhs: i64) -> Self::Output {
138 let value = self.0.get().saturating_add(rhs);
139 NonZeroI64::new(value).map(Self)
140 }
141}
142
143impl Add<Price> for Price {
144 type Output = Option<Price>;
145 fn add(self, rhs: Price) -> Self::Output {
146 let value = self.0.get().saturating_add(rhs.0.get());
147 NonZeroI64::new(value).map(Self)
148 }
149}
150
151impl Sub<Price> for Price {
152 type Output = Option<Price>;
153 fn sub(self, rhs: Price) -> Self::Output {
154 let value = self.0.get().saturating_sub(rhs.0.get());
155 NonZeroI64::new(value).map(Self)
156 }
157}
158
159impl Div<i64> for Price {
160 type Output = Option<Price>;
161 fn div(self, rhs: i64) -> Self::Output {
162 let value = self.0.get().saturating_div(rhs);
163 NonZeroI64::new(value).map(Self)
164 }
165}
166
167#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
168#[serde(rename_all = "camelCase")]
169pub enum PriceFeedProperty {
170 Price,
171 BestBidPrice,
172 BestAskPrice,
173 PublisherCount,
174 Exponent,
175 Confidence,
176 FundingRate,
177 FundingTimestamp,
178 }
180
181#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
182#[serde(rename_all = "camelCase")]
183pub enum DeliveryFormat {
184 #[default]
186 Json,
187 Binary,
189}
190
191#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
192#[serde(rename_all = "camelCase")]
193pub enum Format {
194 Evm,
195 Solana,
196 LeEcdsa,
197 LeUnsigned,
198}
199
200#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
201#[serde(rename_all = "camelCase")]
202pub enum JsonBinaryEncoding {
203 #[default]
204 Base64,
205 Hex,
206}
207
208#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, From)]
209pub enum Channel {
210 FixedRate(FixedRate),
211}
212
213impl Serialize for Channel {
214 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
215 where
216 S: serde::Serializer,
217 {
218 match self {
219 Channel::FixedRate(fixed_rate) => {
220 if *fixed_rate == FixedRate::MIN {
221 return serializer.serialize_str("real_time");
222 }
223 serializer.serialize_str(&format!(
224 "fixed_rate@{}ms",
225 fixed_rate.duration().as_millis()
226 ))
227 }
228 }
229 }
230}
231
232pub mod channel_ids {
233 use super::ChannelId;
234
235 pub const FIXED_RATE_1: ChannelId = ChannelId(1);
236 pub const FIXED_RATE_50: ChannelId = ChannelId(2);
237 pub const FIXED_RATE_200: ChannelId = ChannelId(3);
238}
239
240impl Display for Channel {
241 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
242 match self {
243 Channel::FixedRate(fixed_rate) => match *fixed_rate {
244 FixedRate::MIN => write!(f, "real_time"),
245 rate => write!(f, "fixed_rate@{}ms", rate.duration().as_millis()),
246 },
247 }
248 }
249}
250
251impl Channel {
252 pub fn id(&self) -> ChannelId {
253 match self {
254 Channel::FixedRate(fixed_rate) => match fixed_rate.duration().as_millis() {
255 1 => channel_ids::FIXED_RATE_1,
256 50 => channel_ids::FIXED_RATE_50,
257 200 => channel_ids::FIXED_RATE_200,
258 _ => panic!("unknown channel: {self:?}"),
259 },
260 }
261 }
262}
263
#[test]
fn id_supports_all_fixed_rates() {
    // `Channel::id` panics on an unmapped rate, so calling it for every rate is the check.
    FixedRate::ALL.into_iter().for_each(|rate| {
        let _id = Channel::FixedRate(rate).id();
    });
}
270
271fn parse_channel(value: &str) -> Option<Channel> {
272 if value == "real_time" {
273 Some(Channel::FixedRate(FixedRate::MIN))
274 } else if let Some(rest) = value.strip_prefix("fixed_rate@") {
275 let ms_value = rest.strip_suffix("ms")?;
276 Some(Channel::FixedRate(FixedRate::from_millis(
277 ms_value.parse().ok()?,
278 )?))
279 } else {
280 None
281 }
282}
283
284impl<'de> Deserialize<'de> for Channel {
285 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
286 where
287 D: serde::Deserializer<'de>,
288 {
289 let value = <String>::deserialize(deserializer)?;
290 parse_channel(&value).ok_or_else(|| Error::custom("unknown channel"))
291 }
292}
293
294#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
295pub struct FixedRate {
296 rate: DurationUs,
297}
298
299impl FixedRate {
300 pub const RATE_1_MS: Self = Self {
301 rate: DurationUs::from_millis_u32(1),
302 };
303 pub const RATE_50_MS: Self = Self {
304 rate: DurationUs::from_millis_u32(50),
305 };
306 pub const RATE_200_MS: Self = Self {
307 rate: DurationUs::from_millis_u32(200),
308 };
309
310 pub const ALL: [Self; 3] = [Self::RATE_1_MS, Self::RATE_50_MS, Self::RATE_200_MS];
315 pub const MIN: Self = Self::ALL[0];
316
317 pub fn from_millis(millis: u32) -> Option<Self> {
318 Self::ALL
319 .into_iter()
320 .find(|v| v.rate.as_millis() == u64::from(millis))
321 }
322
323 pub fn duration(self) -> DurationUs {
324 self.rate
325 }
326}
327
328impl TryFrom<DurationUs> for FixedRate {
329 type Error = anyhow::Error;
330
331 fn try_from(value: DurationUs) -> Result<Self, Self::Error> {
332 Self::ALL
333 .into_iter()
334 .find(|v| v.rate == value)
335 .with_context(|| format!("unsupported rate: {value:?}"))
336 }
337}
338
339impl TryFrom<&ProtobufDuration> for FixedRate {
340 type Error = anyhow::Error;
341
342 fn try_from(value: &ProtobufDuration) -> Result<Self, Self::Error> {
343 let duration = DurationUs::try_from(value)?;
344 Self::try_from(duration)
345 }
346}
347
348impl TryFrom<ProtobufDuration> for FixedRate {
349 type Error = anyhow::Error;
350
351 fn try_from(duration: ProtobufDuration) -> anyhow::Result<Self> {
352 TryFrom::<&ProtobufDuration>::try_from(&duration)
353 }
354}
355
356impl From<FixedRate> for DurationUs {
357 fn from(value: FixedRate) -> Self {
358 value.rate
359 }
360}
361
362impl From<FixedRate> for ProtobufDuration {
363 fn from(value: FixedRate) -> Self {
364 value.rate.into()
365 }
366}
367
#[test]
fn fixed_rate_values() {
    // Strictly increasing neighbours imply both ordering and uniqueness.
    let strictly_increasing = FixedRate::ALL
        .windows(2)
        .all(|pair| matches!(pair, [earlier, later] if earlier < later));
    assert!(strictly_increasing, "values must be unique and sorted");

    let min_micros = FixedRate::MIN.duration().as_micros();
    for rate in FixedRate::ALL {
        let micros = rate.duration().as_micros();
        assert_eq!(
            1_000_000 % micros,
            0,
            "1 s must contain whole number of intervals"
        );
        assert_eq!(
            micros % min_micros,
            0,
            "the interval's borders must be a subset of the minimal interval's borders"
        );
    }
}
387
388#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
389#[serde(rename_all = "camelCase")]
390pub struct SubscriptionParamsRepr {
391 pub price_feed_ids: Vec<PriceFeedId>,
392 pub properties: Vec<PriceFeedProperty>,
393 #[serde(alias = "chains")]
395 pub formats: Vec<Format>,
396 #[serde(default)]
397 pub delivery_format: DeliveryFormat,
398 #[serde(default)]
399 pub json_binary_encoding: JsonBinaryEncoding,
400 #[serde(default = "default_parsed")]
403 pub parsed: bool,
404 pub channel: Channel,
405 #[serde(default)]
406 pub ignore_invalid_feed_ids: bool,
407}
408
409#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
410#[serde(rename_all = "camelCase")]
411pub struct SubscriptionParams(SubscriptionParamsRepr);
412
413impl<'de> Deserialize<'de> for SubscriptionParams {
414 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
415 where
416 D: serde::Deserializer<'de>,
417 {
418 let value = SubscriptionParamsRepr::deserialize(deserializer)?;
419 Self::new(value).map_err(Error::custom)
420 }
421}
422
423impl SubscriptionParams {
424 pub fn new(value: SubscriptionParamsRepr) -> Result<Self, &'static str> {
425 if value.price_feed_ids.is_empty() {
426 return Err("no price feed ids specified");
427 }
428 if !value.price_feed_ids.iter().all_unique() {
429 return Err("duplicate price feed ids specified");
430 }
431 if !value.formats.iter().all_unique() {
432 return Err("duplicate formats or chains specified");
433 }
434 if value.properties.is_empty() {
435 return Err("no properties specified");
436 }
437 if !value.properties.iter().all_unique() {
438 return Err("duplicate properties specified");
439 }
440 Ok(Self(value))
441 }
442}
443
444impl Deref for SubscriptionParams {
445 type Target = SubscriptionParamsRepr;
446
447 fn deref(&self) -> &Self::Target {
448 &self.0
449 }
450}
451impl DerefMut for SubscriptionParams {
452 fn deref_mut(&mut self) -> &mut Self::Target {
453 &mut self.0
454 }
455}
456
/// Serde default for `SubscriptionParamsRepr::parsed`: always `true`.
pub fn default_parsed() -> bool {
    const DEFAULT_PARSED: bool = true;
    DEFAULT_PARSED
}
460
461#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
462#[serde(rename_all = "camelCase")]
463pub struct JsonBinaryData {
464 pub encoding: JsonBinaryEncoding,
465 pub data: String,
466}
467
468#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
469#[serde(rename_all = "camelCase")]
470pub struct JsonUpdate {
471 #[serde(skip_serializing_if = "Option::is_none")]
473 pub parsed: Option<ParsedPayload>,
474 #[serde(skip_serializing_if = "Option::is_none")]
476 pub evm: Option<JsonBinaryData>,
477 #[serde(skip_serializing_if = "Option::is_none")]
479 pub solana: Option<JsonBinaryData>,
480 #[serde(skip_serializing_if = "Option::is_none")]
482 pub le_ecdsa: Option<JsonBinaryData>,
483 #[serde(skip_serializing_if = "Option::is_none")]
485 pub le_unsigned: Option<JsonBinaryData>,
486}
487
488#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
489#[serde(rename_all = "camelCase")]
490pub struct ParsedPayload {
491 #[serde(with = "crate::serde_str::timestamp")]
492 pub timestamp_us: TimestampUs,
493 pub price_feeds: Vec<ParsedFeedPayload>,
494}
495
496#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
497#[serde(rename_all = "camelCase")]
498pub struct ParsedFeedPayload {
499 pub price_feed_id: PriceFeedId,
500 #[serde(skip_serializing_if = "Option::is_none")]
501 #[serde(with = "crate::serde_str::option_price")]
502 #[serde(default)]
503 pub price: Option<Price>,
504 #[serde(skip_serializing_if = "Option::is_none")]
505 #[serde(with = "crate::serde_str::option_price")]
506 #[serde(default)]
507 pub best_bid_price: Option<Price>,
508 #[serde(skip_serializing_if = "Option::is_none")]
509 #[serde(with = "crate::serde_str::option_price")]
510 #[serde(default)]
511 pub best_ask_price: Option<Price>,
512 #[serde(skip_serializing_if = "Option::is_none")]
513 #[serde(default)]
514 pub publisher_count: Option<u16>,
515 #[serde(skip_serializing_if = "Option::is_none")]
516 #[serde(default)]
517 pub exponent: Option<i16>,
518 #[serde(skip_serializing_if = "Option::is_none")]
519 #[serde(default)]
520 pub confidence: Option<Price>,
521 #[serde(skip_serializing_if = "Option::is_none")]
522 #[serde(default)]
523 pub funding_rate: Option<Rate>,
524 #[serde(skip_serializing_if = "Option::is_none")]
525 #[serde(default)]
526 pub funding_timestamp: Option<TimestampUs>,
527 }
529
530impl ParsedFeedPayload {
531 pub fn new(
532 price_feed_id: PriceFeedId,
533 exponent: Option<i16>,
534 data: &AggregatedPriceFeedData,
535 properties: &[PriceFeedProperty],
536 ) -> Self {
537 let mut output = Self {
538 price_feed_id,
539 price: None,
540 best_bid_price: None,
541 best_ask_price: None,
542 publisher_count: None,
543 exponent: None,
544 confidence: None,
545 funding_rate: None,
546 funding_timestamp: None,
547 };
548 for &property in properties {
549 match property {
550 PriceFeedProperty::Price => {
551 output.price = data.price;
552 }
553 PriceFeedProperty::BestBidPrice => {
554 output.best_bid_price = data.best_bid_price;
555 }
556 PriceFeedProperty::BestAskPrice => {
557 output.best_ask_price = data.best_ask_price;
558 }
559 PriceFeedProperty::PublisherCount => {
560 output.publisher_count = Some(data.publisher_count);
561 }
562 PriceFeedProperty::Exponent => {
563 output.exponent = exponent;
564 }
565 PriceFeedProperty::Confidence => {
566 output.confidence = data.confidence;
567 }
568 PriceFeedProperty::FundingRate => {
569 output.funding_rate = data.funding_rate;
570 }
571 PriceFeedProperty::FundingTimestamp => {
572 output.funding_timestamp = data.funding_timestamp;
573 }
574 }
575 }
576 output
577 }
578
579 pub fn new_full(
580 price_feed_id: PriceFeedId,
581 exponent: Option<i16>,
582 data: &AggregatedPriceFeedData,
583 ) -> Self {
584 Self {
585 price_feed_id,
586 price: data.price,
587 best_bid_price: data.best_bid_price,
588 best_ask_price: data.best_ask_price,
589 publisher_count: Some(data.publisher_count),
590 exponent,
591 confidence: data.confidence,
592 funding_rate: data.funding_rate,
593 funding_timestamp: data.funding_timestamp,
594 }
595 }
596}