1use {
4 crate::{
5 payload::AggregatedPriceFeedData,
6 time::{DurationUs, TimestampUs},
7 },
8 anyhow::{bail, Context},
9 derive_more::derive::From,
10 itertools::Itertools,
11 protobuf::well_known_types::duration::Duration as ProtobufDuration,
12 rust_decimal::{prelude::FromPrimitive, Decimal},
13 serde::{de::Error, Deserialize, Serialize},
14 std::{
15 fmt::Display,
16 num::NonZeroI64,
17 ops::{Add, Deref, DerefMut, Div, Sub},
18 },
19};
20
21#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
22pub struct PublisherId(pub u16);
23
24#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
25pub struct PriceFeedId(pub u32);
26
27#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
28pub struct ChannelId(pub u8);
29
30#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
31#[repr(transparent)]
32pub struct Rate(pub i64);
33
34impl Rate {
35 pub fn parse_str(value: &str, exponent: u32) -> anyhow::Result<Self> {
36 let value: Decimal = value.parse()?;
37 let coef = 10i64.checked_pow(exponent).context("overflow")?;
38 let coef = Decimal::from_i64(coef).context("overflow")?;
39 let value = value.checked_mul(coef).context("overflow")?;
40 if !value.is_integer() {
41 bail!("price value is more precise than available exponent");
42 }
43 let value: i64 = value.try_into().context("overflow")?;
44 Ok(Self(value))
45 }
46
47 pub fn from_f64(value: f64, exponent: u32) -> anyhow::Result<Self> {
48 let value = Decimal::from_f64(value).context("overflow")?;
49 let coef = 10i64.checked_pow(exponent).context("overflow")?;
50 let coef = Decimal::from_i64(coef).context("overflow")?;
51 let value = value.checked_mul(coef).context("overflow")?;
52 let value: i64 = value.try_into().context("overflow")?;
53 Ok(Self(value))
54 }
55
56 pub fn from_integer(value: i64, exponent: u32) -> anyhow::Result<Self> {
57 let coef = 10i64.checked_pow(exponent).context("overflow")?;
58 let value = value.checked_mul(coef).context("overflow")?;
59 Ok(Self(value))
60 }
61}
62
63#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
64#[repr(transparent)]
65pub struct Price(pub NonZeroI64);
66
67impl Price {
68 pub fn from_integer(value: i64, exponent: u32) -> anyhow::Result<Price> {
69 let coef = 10i64.checked_pow(exponent).context("overflow")?;
70 let value = value.checked_mul(coef).context("overflow")?;
71 let value = NonZeroI64::new(value).context("zero price is unsupported")?;
72 Ok(Self(value))
73 }
74
75 pub fn parse_str(value: &str, exponent: u32) -> anyhow::Result<Price> {
76 let value: Decimal = value.parse()?;
77 let coef = 10i64.checked_pow(exponent).context("overflow")?;
78 let coef = Decimal::from_i64(coef).context("overflow")?;
79 let value = value.checked_mul(coef).context("overflow")?;
80 if !value.is_integer() {
81 bail!("price value is more precise than available exponent");
82 }
83 let value: i64 = value.try_into().context("overflow")?;
84 let value = NonZeroI64::new(value).context("zero price is unsupported")?;
85 Ok(Self(value))
86 }
87
88 pub fn new(value: i64) -> anyhow::Result<Self> {
89 let value = NonZeroI64::new(value).context("zero price is unsupported")?;
90 Ok(Self(value))
91 }
92
93 pub fn into_inner(self) -> NonZeroI64 {
94 self.0
95 }
96
97 pub fn to_f64(self, exponent: u32) -> anyhow::Result<f64> {
98 Ok(self.0.get() as f64 / 10i64.checked_pow(exponent).context("overflow")? as f64)
99 }
100
101 pub fn from_f64(value: f64, exponent: u32) -> anyhow::Result<Self> {
102 let value = (value * 10f64.powi(exponent as i32)) as i64;
103 let value = NonZeroI64::new(value).context("zero price is unsupported")?;
104 Ok(Self(value))
105 }
106
107 pub fn mul(self, rhs: Price, rhs_exponent: u32) -> anyhow::Result<Price> {
108 let left_value = i128::from(self.0.get());
109 let right_value = i128::from(rhs.0.get());
110
111 let value = left_value * right_value / 10i128.pow(rhs_exponent);
112 let value = value.try_into()?;
113 NonZeroI64::new(value)
114 .context("zero price is unsupported")
115 .map(Self)
116 }
117}
118
119impl Sub<i64> for Price {
120 type Output = Option<Price>;
121
122 fn sub(self, rhs: i64) -> Self::Output {
123 let value = self.0.get().saturating_sub(rhs);
124 NonZeroI64::new(value).map(Self)
125 }
126}
127
128impl Add<i64> for Price {
129 type Output = Option<Price>;
130
131 fn add(self, rhs: i64) -> Self::Output {
132 let value = self.0.get().saturating_add(rhs);
133 NonZeroI64::new(value).map(Self)
134 }
135}
136
137impl Add<Price> for Price {
138 type Output = Option<Price>;
139 fn add(self, rhs: Price) -> Self::Output {
140 let value = self.0.get().saturating_add(rhs.0.get());
141 NonZeroI64::new(value).map(Self)
142 }
143}
144
145impl Sub<Price> for Price {
146 type Output = Option<Price>;
147 fn sub(self, rhs: Price) -> Self::Output {
148 let value = self.0.get().saturating_sub(rhs.0.get());
149 NonZeroI64::new(value).map(Self)
150 }
151}
152
153impl Div<i64> for Price {
154 type Output = Option<Price>;
155 fn div(self, rhs: i64) -> Self::Output {
156 let value = self.0.get().saturating_div(rhs);
157 NonZeroI64::new(value).map(Self)
158 }
159}
160
161#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
162#[serde(rename_all = "camelCase")]
163pub enum PriceFeedProperty {
164 Price,
165 BestBidPrice,
166 BestAskPrice,
167 PublisherCount,
168 Exponent,
169 Confidence,
170 FundingRate,
171 FundingTimestamp,
172 }
174
175#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
176#[serde(rename_all = "camelCase")]
177pub enum DeliveryFormat {
178 #[default]
180 Json,
181 Binary,
183}
184
185#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
186#[serde(rename_all = "camelCase")]
187pub enum Format {
188 Evm,
189 Solana,
190 LeEcdsa,
191 LeUnsigned,
192}
193
194#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
195#[serde(rename_all = "camelCase")]
196pub enum JsonBinaryEncoding {
197 #[default]
198 Base64,
199 Hex,
200}
201
202#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, From)]
203pub enum Channel {
204 FixedRate(FixedRate),
205}
206
207impl Serialize for Channel {
208 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
209 where
210 S: serde::Serializer,
211 {
212 match self {
213 Channel::FixedRate(fixed_rate) => {
214 if *fixed_rate == FixedRate::MIN {
215 return serializer.serialize_str("real_time");
216 }
217 serializer.serialize_str(&format!(
218 "fixed_rate@{}ms",
219 fixed_rate.duration().as_millis()
220 ))
221 }
222 }
223 }
224}
225
226pub mod channel_ids {
227 use super::ChannelId;
228
229 pub const FIXED_RATE_1: ChannelId = ChannelId(1);
230 pub const FIXED_RATE_50: ChannelId = ChannelId(2);
231 pub const FIXED_RATE_200: ChannelId = ChannelId(3);
232}
233
234impl Display for Channel {
235 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
236 match self {
237 Channel::FixedRate(fixed_rate) => match *fixed_rate {
238 FixedRate::MIN => write!(f, "real_time"),
239 rate => write!(f, "fixed_rate@{}ms", rate.duration().as_millis()),
240 },
241 }
242 }
243}
244
245impl Channel {
246 pub fn id(&self) -> ChannelId {
247 match self {
248 Channel::FixedRate(fixed_rate) => match fixed_rate.duration().as_millis() {
249 1 => channel_ids::FIXED_RATE_1,
250 50 => channel_ids::FIXED_RATE_50,
251 200 => channel_ids::FIXED_RATE_200,
252 _ => panic!("unknown channel: {self:?}"),
253 },
254 }
255 }
256}
257
/// Checks that `Channel::id` does not panic for any supported rate.
#[test]
fn id_supports_all_fixed_rates() {
    FixedRate::ALL
        .into_iter()
        .map(Channel::FixedRate)
        .for_each(|channel| {
            channel.id();
        });
}
264
265fn parse_channel(value: &str) -> Option<Channel> {
266 if value == "real_time" {
267 Some(Channel::FixedRate(FixedRate::MIN))
268 } else if let Some(rest) = value.strip_prefix("fixed_rate@") {
269 let ms_value = rest.strip_suffix("ms")?;
270 Some(Channel::FixedRate(FixedRate::from_millis(
271 ms_value.parse().ok()?,
272 )?))
273 } else {
274 None
275 }
276}
277
278impl<'de> Deserialize<'de> for Channel {
279 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
280 where
281 D: serde::Deserializer<'de>,
282 {
283 let value = <String>::deserialize(deserializer)?;
284 parse_channel(&value).ok_or_else(|| Error::custom("unknown channel"))
285 }
286}
287
288#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
289pub struct FixedRate {
290 rate: DurationUs,
291}
292
293impl FixedRate {
294 pub const RATE_1_MS: Self = Self {
295 rate: DurationUs::from_millis_u32(1),
296 };
297 pub const RATE_50_MS: Self = Self {
298 rate: DurationUs::from_millis_u32(50),
299 };
300 pub const RATE_200_MS: Self = Self {
301 rate: DurationUs::from_millis_u32(200),
302 };
303
304 pub const ALL: [Self; 3] = [Self::RATE_1_MS, Self::RATE_50_MS, Self::RATE_200_MS];
309 pub const MIN: Self = Self::ALL[0];
310
311 pub fn from_millis(millis: u32) -> Option<Self> {
312 Self::ALL
313 .into_iter()
314 .find(|v| v.rate.as_millis() == u64::from(millis))
315 }
316
317 pub fn duration(self) -> DurationUs {
318 self.rate
319 }
320}
321
322impl TryFrom<DurationUs> for FixedRate {
323 type Error = anyhow::Error;
324
325 fn try_from(value: DurationUs) -> Result<Self, Self::Error> {
326 Self::ALL
327 .into_iter()
328 .find(|v| v.rate == value)
329 .with_context(|| format!("unsupported rate: {value:?}"))
330 }
331}
332
333impl TryFrom<&ProtobufDuration> for FixedRate {
334 type Error = anyhow::Error;
335
336 fn try_from(value: &ProtobufDuration) -> Result<Self, Self::Error> {
337 let duration = DurationUs::try_from(value)?;
338 Self::try_from(duration)
339 }
340}
341
342impl TryFrom<ProtobufDuration> for FixedRate {
343 type Error = anyhow::Error;
344
345 fn try_from(duration: ProtobufDuration) -> anyhow::Result<Self> {
346 TryFrom::<&ProtobufDuration>::try_from(&duration)
347 }
348}
349
350impl From<FixedRate> for DurationUs {
351 fn from(value: FixedRate) -> Self {
352 value.rate
353 }
354}
355
356impl From<FixedRate> for ProtobufDuration {
357 fn from(value: FixedRate) -> Self {
358 value.rate.into()
359 }
360}
361
/// Checks the invariants of `FixedRate::ALL`: strictly ascending order, every
/// interval divides one second, and every interval is a multiple of the
/// minimal one.
#[test]
fn fixed_rate_values() {
    let strictly_ascending = FixedRate::ALL.windows(2).all(|pair| pair[0] < pair[1]);
    assert!(strictly_ascending, "values must be unique and sorted");

    for rate in FixedRate::ALL {
        let micros = rate.duration().as_micros();
        assert_eq!(
            1_000_000 % micros,
            0,
            "1 s must contain whole number of intervals"
        );
        let min_micros = FixedRate::MIN.duration().as_micros();
        assert_eq!(
            micros % min_micros,
            0,
            "the interval's borders must be a subset of the minimal interval's borders"
        );
    }
}
381
382#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
383#[serde(rename_all = "camelCase")]
384pub struct SubscriptionParamsRepr {
385 pub price_feed_ids: Vec<PriceFeedId>,
386 pub properties: Vec<PriceFeedProperty>,
387 #[serde(alias = "chains")]
389 pub formats: Vec<Format>,
390 #[serde(default)]
391 pub delivery_format: DeliveryFormat,
392 #[serde(default)]
393 pub json_binary_encoding: JsonBinaryEncoding,
394 #[serde(default = "default_parsed")]
397 pub parsed: bool,
398 pub channel: Channel,
399 #[serde(default)]
400 pub ignore_invalid_feed_ids: bool,
401}
402
403#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
404#[serde(rename_all = "camelCase")]
405pub struct SubscriptionParams(SubscriptionParamsRepr);
406
407impl<'de> Deserialize<'de> for SubscriptionParams {
408 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
409 where
410 D: serde::Deserializer<'de>,
411 {
412 let value = SubscriptionParamsRepr::deserialize(deserializer)?;
413 Self::new(value).map_err(Error::custom)
414 }
415}
416
417impl SubscriptionParams {
418 pub fn new(value: SubscriptionParamsRepr) -> Result<Self, &'static str> {
419 if value.price_feed_ids.is_empty() {
420 return Err("no price feed ids specified");
421 }
422 if !value.price_feed_ids.iter().all_unique() {
423 return Err("duplicate price feed ids specified");
424 }
425 if !value.formats.iter().all_unique() {
426 return Err("duplicate formats or chains specified");
427 }
428 if value.properties.is_empty() {
429 return Err("no properties specified");
430 }
431 if !value.properties.iter().all_unique() {
432 return Err("duplicate properties specified");
433 }
434 Ok(Self(value))
435 }
436}
437
438impl Deref for SubscriptionParams {
439 type Target = SubscriptionParamsRepr;
440
441 fn deref(&self) -> &Self::Target {
442 &self.0
443 }
444}
445impl DerefMut for SubscriptionParams {
446 fn deref_mut(&mut self) -> &mut Self::Target {
447 &mut self.0
448 }
449}
450
/// Serde default for `SubscriptionParamsRepr::parsed`: always `true`.
pub fn default_parsed() -> bool {
    true
}
454
455#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
456#[serde(rename_all = "camelCase")]
457pub struct JsonBinaryData {
458 pub encoding: JsonBinaryEncoding,
459 pub data: String,
460}
461
462#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
463#[serde(rename_all = "camelCase")]
464pub struct JsonUpdate {
465 #[serde(skip_serializing_if = "Option::is_none")]
467 pub parsed: Option<ParsedPayload>,
468 #[serde(skip_serializing_if = "Option::is_none")]
470 pub evm: Option<JsonBinaryData>,
471 #[serde(skip_serializing_if = "Option::is_none")]
473 pub solana: Option<JsonBinaryData>,
474 #[serde(skip_serializing_if = "Option::is_none")]
476 pub le_ecdsa: Option<JsonBinaryData>,
477 #[serde(skip_serializing_if = "Option::is_none")]
479 pub le_unsigned: Option<JsonBinaryData>,
480}
481
482#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
483#[serde(rename_all = "camelCase")]
484pub struct ParsedPayload {
485 #[serde(with = "crate::serde_str::timestamp")]
486 pub timestamp_us: TimestampUs,
487 pub price_feeds: Vec<ParsedFeedPayload>,
488}
489
490#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
491#[serde(rename_all = "camelCase")]
492pub struct ParsedFeedPayload {
493 pub price_feed_id: PriceFeedId,
494 #[serde(skip_serializing_if = "Option::is_none")]
495 #[serde(with = "crate::serde_str::option_price")]
496 #[serde(default)]
497 pub price: Option<Price>,
498 #[serde(skip_serializing_if = "Option::is_none")]
499 #[serde(with = "crate::serde_str::option_price")]
500 #[serde(default)]
501 pub best_bid_price: Option<Price>,
502 #[serde(skip_serializing_if = "Option::is_none")]
503 #[serde(with = "crate::serde_str::option_price")]
504 #[serde(default)]
505 pub best_ask_price: Option<Price>,
506 #[serde(skip_serializing_if = "Option::is_none")]
507 #[serde(default)]
508 pub publisher_count: Option<u16>,
509 #[serde(skip_serializing_if = "Option::is_none")]
510 #[serde(default)]
511 pub exponent: Option<i16>,
512 #[serde(skip_serializing_if = "Option::is_none")]
513 #[serde(default)]
514 pub confidence: Option<Price>,
515 #[serde(skip_serializing_if = "Option::is_none")]
516 #[serde(default)]
517 pub funding_rate: Option<Rate>,
518 #[serde(skip_serializing_if = "Option::is_none")]
519 #[serde(default)]
520 pub funding_timestamp: Option<TimestampUs>,
521 }
523
524impl ParsedFeedPayload {
525 pub fn new(
526 price_feed_id: PriceFeedId,
527 exponent: Option<i16>,
528 data: &AggregatedPriceFeedData,
529 properties: &[PriceFeedProperty],
530 ) -> Self {
531 let mut output = Self {
532 price_feed_id,
533 price: None,
534 best_bid_price: None,
535 best_ask_price: None,
536 publisher_count: None,
537 exponent: None,
538 confidence: None,
539 funding_rate: None,
540 funding_timestamp: None,
541 };
542 for &property in properties {
543 match property {
544 PriceFeedProperty::Price => {
545 output.price = data.price;
546 }
547 PriceFeedProperty::BestBidPrice => {
548 output.best_bid_price = data.best_bid_price;
549 }
550 PriceFeedProperty::BestAskPrice => {
551 output.best_ask_price = data.best_ask_price;
552 }
553 PriceFeedProperty::PublisherCount => {
554 output.publisher_count = Some(data.publisher_count);
555 }
556 PriceFeedProperty::Exponent => {
557 output.exponent = exponent;
558 }
559 PriceFeedProperty::Confidence => {
560 output.confidence = data.confidence;
561 }
562 PriceFeedProperty::FundingRate => {
563 output.funding_rate = data.funding_rate;
564 }
565 PriceFeedProperty::FundingTimestamp => {
566 output.funding_timestamp = data.funding_timestamp;
567 }
568 }
569 }
570 output
571 }
572
573 pub fn new_full(
574 price_feed_id: PriceFeedId,
575 exponent: Option<i16>,
576 data: &AggregatedPriceFeedData,
577 ) -> Self {
578 Self {
579 price_feed_id,
580 price: data.price,
581 best_bid_price: data.best_bid_price,
582 best_ask_price: data.best_ask_price,
583 publisher_count: Some(data.publisher_count),
584 exponent,
585 confidence: data.confidence,
586 funding_rate: data.funding_rate,
587 funding_timestamp: data.funding_timestamp,
588 }
589 }
590}