1use {
4 crate::payload::AggregatedPriceFeedData,
5 anyhow::{bail, Context},
6 itertools::Itertools,
7 protobuf::well_known_types::timestamp::Timestamp,
8 rust_decimal::{prelude::FromPrimitive, Decimal},
9 serde::{de::Error, Deserialize, Serialize},
10 std::{
11 fmt::Display,
12 num::NonZeroI64,
13 ops::{Add, Deref, DerefMut, Div, Sub},
14 time::{SystemTime, UNIX_EPOCH},
15 },
16};
17
18#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
19pub struct PublisherId(pub u16);
20
21#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
22pub struct PriceFeedId(pub u32);
23
24#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
25pub struct ChannelId(pub u8);
26
27#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
28pub struct TimestampUs(pub u64);
29
30impl TryFrom<&Timestamp> for TimestampUs {
31 type Error = anyhow::Error;
32
33 fn try_from(timestamp: &Timestamp) -> anyhow::Result<Self> {
34 let seconds_in_micros: u64 = (timestamp.seconds * 1_000_000).try_into()?;
35 let nanos_in_micros: u64 = (timestamp.nanos / 1_000).try_into()?;
36 Ok(TimestampUs(seconds_in_micros + nanos_in_micros))
37 }
38}
39
40impl TimestampUs {
41 pub fn now() -> Self {
42 let value = SystemTime::now()
43 .duration_since(UNIX_EPOCH)
44 .expect("invalid system time")
45 .as_micros()
46 .try_into()
47 .expect("invalid system time");
48 Self(value)
49 }
50
51 pub fn saturating_us_since(self, other: Self) -> u64 {
52 self.0.saturating_sub(other.0)
53 }
54}
55
56#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
57#[repr(transparent)]
58pub struct Rate(pub i64);
59
60impl Rate {
61 pub fn parse_str(value: &str, exponent: u32) -> anyhow::Result<Self> {
62 let value: Decimal = value.parse()?;
63 let coef = 10i64.checked_pow(exponent).context("overflow")?;
64 let coef = Decimal::from_i64(coef).context("overflow")?;
65 let value = value.checked_mul(coef).context("overflow")?;
66 if !value.is_integer() {
67 bail!("price value is more precise than available exponent");
68 }
69 let value: i64 = value.try_into().context("overflow")?;
70 Ok(Self(value))
71 }
72
73 pub fn from_f64(value: f64, exponent: u32) -> anyhow::Result<Self> {
74 let value = Decimal::from_f64(value).context("overflow")?;
75 let coef = 10i64.checked_pow(exponent).context("overflow")?;
76 let coef = Decimal::from_i64(coef).context("overflow")?;
77 let value = value.checked_mul(coef).context("overflow")?;
78 let value: i64 = value.try_into().context("overflow")?;
79 Ok(Self(value))
80 }
81
82 pub fn from_integer(value: i64, exponent: u32) -> anyhow::Result<Self> {
83 let coef = 10i64.checked_pow(exponent).context("overflow")?;
84 let value = value.checked_mul(coef).context("overflow")?;
85 Ok(Self(value))
86 }
87}
88
89#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
90#[repr(transparent)]
91pub struct Price(pub NonZeroI64);
92
93impl Price {
94 pub fn from_integer(value: i64, exponent: u32) -> anyhow::Result<Price> {
95 let coef = 10i64.checked_pow(exponent).context("overflow")?;
96 let value = value.checked_mul(coef).context("overflow")?;
97 let value = NonZeroI64::new(value).context("zero price is unsupported")?;
98 Ok(Self(value))
99 }
100
101 pub fn parse_str(value: &str, exponent: u32) -> anyhow::Result<Price> {
102 let value: Decimal = value.parse()?;
103 let coef = 10i64.checked_pow(exponent).context("overflow")?;
104 let coef = Decimal::from_i64(coef).context("overflow")?;
105 let value = value.checked_mul(coef).context("overflow")?;
106 if !value.is_integer() {
107 bail!("price value is more precise than available exponent");
108 }
109 let value: i64 = value.try_into().context("overflow")?;
110 let value = NonZeroI64::new(value).context("zero price is unsupported")?;
111 Ok(Self(value))
112 }
113
114 pub fn new(value: i64) -> anyhow::Result<Self> {
115 let value = NonZeroI64::new(value).context("zero price is unsupported")?;
116 Ok(Self(value))
117 }
118
119 pub fn into_inner(self) -> NonZeroI64 {
120 self.0
121 }
122
123 pub fn to_f64(self, exponent: u32) -> anyhow::Result<f64> {
124 Ok(self.0.get() as f64 / 10i64.checked_pow(exponent).context("overflow")? as f64)
125 }
126
127 pub fn from_f64(value: f64, exponent: u32) -> anyhow::Result<Self> {
128 let value = (value * 10f64.powi(exponent as i32)) as i64;
129 let value = NonZeroI64::new(value).context("zero price is unsupported")?;
130 Ok(Self(value))
131 }
132
133 pub fn mul(self, rhs: Price, rhs_exponent: u32) -> anyhow::Result<Price> {
134 let left_value = i128::from(self.0.get());
135 let right_value = i128::from(rhs.0.get());
136
137 let value = left_value * right_value / 10i128.pow(rhs_exponent);
138 let value = value.try_into()?;
139 NonZeroI64::new(value)
140 .context("zero price is unsupported")
141 .map(Self)
142 }
143}
144
145impl Sub<i64> for Price {
146 type Output = Option<Price>;
147
148 fn sub(self, rhs: i64) -> Self::Output {
149 let value = self.0.get().saturating_sub(rhs);
150 NonZeroI64::new(value).map(Self)
151 }
152}
153
154impl Add<i64> for Price {
155 type Output = Option<Price>;
156
157 fn add(self, rhs: i64) -> Self::Output {
158 let value = self.0.get().saturating_add(rhs);
159 NonZeroI64::new(value).map(Self)
160 }
161}
162
163impl Add<Price> for Price {
164 type Output = Option<Price>;
165 fn add(self, rhs: Price) -> Self::Output {
166 let value = self.0.get().saturating_add(rhs.0.get());
167 NonZeroI64::new(value).map(Self)
168 }
169}
170
171impl Sub<Price> for Price {
172 type Output = Option<Price>;
173 fn sub(self, rhs: Price) -> Self::Output {
174 let value = self.0.get().saturating_sub(rhs.0.get());
175 NonZeroI64::new(value).map(Self)
176 }
177}
178
179impl Div<i64> for Price {
180 type Output = Option<Price>;
181 fn div(self, rhs: i64) -> Self::Output {
182 let value = self.0.get().saturating_div(rhs);
183 NonZeroI64::new(value).map(Self)
184 }
185}
186
187#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
188#[serde(rename_all = "camelCase")]
189pub enum PriceFeedProperty {
190 Price,
191 BestBidPrice,
192 BestAskPrice,
193 PublisherCount,
194 Exponent,
195 Confidence,
196 FundingRate,
197 FundingTimestamp,
198 }
200
201#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
202#[serde(rename_all = "camelCase")]
203pub enum DeliveryFormat {
204 #[default]
206 Json,
207 Binary,
209}
210
211#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
212#[serde(rename_all = "camelCase")]
213pub enum Format {
214 Evm,
215 Solana,
216 LeEcdsa,
217 LeUnsigned,
218}
219
220#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
221#[serde(rename_all = "camelCase")]
222pub enum JsonBinaryEncoding {
223 #[default]
224 Base64,
225 Hex,
226}
227
228#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
229pub enum Channel {
230 FixedRate(FixedRate),
231}
232
233impl Serialize for Channel {
234 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
235 where
236 S: serde::Serializer,
237 {
238 match self {
239 Channel::FixedRate(fixed_rate) => {
240 if *fixed_rate == FixedRate::MIN {
241 return serializer.serialize_str("real_time");
242 }
243 serializer.serialize_str(&format!("fixed_rate@{}ms", fixed_rate.value_ms()))
244 }
245 }
246 }
247}
248
249pub mod channel_ids {
250 use super::ChannelId;
251
252 pub const FIXED_RATE_1: ChannelId = ChannelId(1);
253 pub const FIXED_RATE_50: ChannelId = ChannelId(2);
254 pub const FIXED_RATE_200: ChannelId = ChannelId(3);
255}
256
257impl Display for Channel {
258 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
259 match self {
260 Channel::FixedRate(fixed_rate) => match *fixed_rate {
261 FixedRate::MIN => write!(f, "real_time"),
262 rate => write!(f, "fixed_rate@{}ms", rate.value_ms()),
263 },
264 }
265 }
266}
267
268impl Channel {
269 pub fn id(&self) -> ChannelId {
270 match self {
271 Channel::FixedRate(fixed_rate) => match fixed_rate.value_ms() {
272 1 => channel_ids::FIXED_RATE_1,
273 50 => channel_ids::FIXED_RATE_50,
274 200 => channel_ids::FIXED_RATE_200,
275 _ => panic!("unknown channel: {self:?}"),
276 },
277 }
278 }
279}
280
281#[test]
282fn id_supports_all_fixed_rates() {
283 for rate in FixedRate::ALL {
284 Channel::FixedRate(rate).id();
285 }
286}
287
288fn parse_channel(value: &str) -> Option<Channel> {
289 if value == "real_time" {
290 Some(Channel::FixedRate(FixedRate::MIN))
291 } else if let Some(rest) = value.strip_prefix("fixed_rate@") {
292 let ms_value = rest.strip_suffix("ms")?;
293 Some(Channel::FixedRate(FixedRate::from_ms(
294 ms_value.parse().ok()?,
295 )?))
296 } else {
297 None
298 }
299}
300
301impl<'de> Deserialize<'de> for Channel {
302 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
303 where
304 D: serde::Deserializer<'de>,
305 {
306 let value = <String>::deserialize(deserializer)?;
307 parse_channel(&value).ok_or_else(|| D::Error::custom("unknown channel"))
308 }
309}
310
311#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
312pub struct FixedRate {
313 ms: u32,
314}
315
316impl FixedRate {
317 pub const ALL: [Self; 3] = [Self { ms: 1 }, Self { ms: 50 }, Self { ms: 200 }];
322 pub const MIN: Self = Self::ALL[0];
323
324 pub fn from_ms(value: u32) -> Option<Self> {
325 Self::ALL.into_iter().find(|v| v.ms == value)
326 }
327
328 pub fn value_ms(self) -> u32 {
329 self.ms
330 }
331
332 pub fn value_us(self) -> u64 {
333 (self.ms * 1000).into()
334 }
335}
336
337#[test]
338fn fixed_rate_values() {
339 assert!(
340 FixedRate::ALL.windows(2).all(|w| w[0] < w[1]),
341 "values must be unique and sorted"
342 );
343 for value in FixedRate::ALL {
344 assert!(
345 1000 % value.ms == 0,
346 "1 s must contain whole number of intervals"
347 );
348 assert!(
349 value.value_us() % FixedRate::MIN.value_us() == 0,
350 "the interval's borders must be a subset of the minimal interval's borders"
351 );
352 }
353}
354
355#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
356#[serde(rename_all = "camelCase")]
357pub struct SubscriptionParamsRepr {
358 pub price_feed_ids: Vec<PriceFeedId>,
359 pub properties: Vec<PriceFeedProperty>,
360 #[serde(alias = "chains")]
362 pub formats: Vec<Format>,
363 #[serde(default)]
364 pub delivery_format: DeliveryFormat,
365 #[serde(default)]
366 pub json_binary_encoding: JsonBinaryEncoding,
367 #[serde(default = "default_parsed")]
370 pub parsed: bool,
371 pub channel: Channel,
372 #[serde(default)]
373 pub ignore_invalid_feed_ids: bool,
374}
375
376#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
377#[serde(rename_all = "camelCase")]
378pub struct SubscriptionParams(SubscriptionParamsRepr);
379
380impl<'de> Deserialize<'de> for SubscriptionParams {
381 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
382 where
383 D: serde::Deserializer<'de>,
384 {
385 let value = SubscriptionParamsRepr::deserialize(deserializer)?;
386 Self::new(value).map_err(D::Error::custom)
387 }
388}
389
390impl SubscriptionParams {
391 pub fn new(value: SubscriptionParamsRepr) -> Result<Self, &'static str> {
392 if value.price_feed_ids.is_empty() {
393 return Err("no price feed ids specified");
394 }
395 if !value.price_feed_ids.iter().all_unique() {
396 return Err("duplicate price feed ids specified");
397 }
398 if !value.formats.iter().all_unique() {
399 return Err("duplicate formats or chains specified");
400 }
401 if value.properties.is_empty() {
402 return Err("no properties specified");
403 }
404 if !value.properties.iter().all_unique() {
405 return Err("duplicate properties specified");
406 }
407 Ok(Self(value))
408 }
409}
410
411impl Deref for SubscriptionParams {
412 type Target = SubscriptionParamsRepr;
413
414 fn deref(&self) -> &Self::Target {
415 &self.0
416 }
417}
418impl DerefMut for SubscriptionParams {
419 fn deref_mut(&mut self) -> &mut Self::Target {
420 &mut self.0
421 }
422}
423
424pub fn default_parsed() -> bool {
425 true
426}
427
428#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
429#[serde(rename_all = "camelCase")]
430pub struct JsonBinaryData {
431 pub encoding: JsonBinaryEncoding,
432 pub data: String,
433}
434
435#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
436#[serde(rename_all = "camelCase")]
437pub struct JsonUpdate {
438 #[serde(skip_serializing_if = "Option::is_none")]
440 pub parsed: Option<ParsedPayload>,
441 #[serde(skip_serializing_if = "Option::is_none")]
443 pub evm: Option<JsonBinaryData>,
444 #[serde(skip_serializing_if = "Option::is_none")]
446 pub solana: Option<JsonBinaryData>,
447 #[serde(skip_serializing_if = "Option::is_none")]
449 pub le_ecdsa: Option<JsonBinaryData>,
450 #[serde(skip_serializing_if = "Option::is_none")]
452 pub le_unsigned: Option<JsonBinaryData>,
453}
454
455#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
456#[serde(rename_all = "camelCase")]
457pub struct ParsedPayload {
458 #[serde(with = "crate::serde_str::timestamp")]
459 pub timestamp_us: TimestampUs,
460 pub price_feeds: Vec<ParsedFeedPayload>,
461}
462
463#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
464#[serde(rename_all = "camelCase")]
465pub struct ParsedFeedPayload {
466 pub price_feed_id: PriceFeedId,
467 #[serde(skip_serializing_if = "Option::is_none")]
468 #[serde(with = "crate::serde_str::option_price")]
469 #[serde(default)]
470 pub price: Option<Price>,
471 #[serde(skip_serializing_if = "Option::is_none")]
472 #[serde(with = "crate::serde_str::option_price")]
473 #[serde(default)]
474 pub best_bid_price: Option<Price>,
475 #[serde(skip_serializing_if = "Option::is_none")]
476 #[serde(with = "crate::serde_str::option_price")]
477 #[serde(default)]
478 pub best_ask_price: Option<Price>,
479 #[serde(skip_serializing_if = "Option::is_none")]
480 #[serde(default)]
481 pub publisher_count: Option<u16>,
482 #[serde(skip_serializing_if = "Option::is_none")]
483 #[serde(default)]
484 pub exponent: Option<i16>,
485 #[serde(skip_serializing_if = "Option::is_none")]
486 #[serde(default)]
487 pub confidence: Option<Price>,
488 #[serde(skip_serializing_if = "Option::is_none")]
489 #[serde(default)]
490 pub funding_rate: Option<Rate>,
491 #[serde(skip_serializing_if = "Option::is_none")]
492 #[serde(default)]
493 pub funding_timestamp: Option<TimestampUs>,
494 }
496
497impl ParsedFeedPayload {
498 pub fn new(
499 price_feed_id: PriceFeedId,
500 exponent: Option<i16>,
501 data: &AggregatedPriceFeedData,
502 properties: &[PriceFeedProperty],
503 ) -> Self {
504 let mut output = Self {
505 price_feed_id,
506 price: None,
507 best_bid_price: None,
508 best_ask_price: None,
509 publisher_count: None,
510 exponent: None,
511 confidence: None,
512 funding_rate: None,
513 funding_timestamp: None,
514 };
515 for &property in properties {
516 match property {
517 PriceFeedProperty::Price => {
518 output.price = data.price;
519 }
520 PriceFeedProperty::BestBidPrice => {
521 output.best_bid_price = data.best_bid_price;
522 }
523 PriceFeedProperty::BestAskPrice => {
524 output.best_ask_price = data.best_ask_price;
525 }
526 PriceFeedProperty::PublisherCount => {
527 output.publisher_count = Some(data.publisher_count);
528 }
529 PriceFeedProperty::Exponent => {
530 output.exponent = exponent;
531 }
532 PriceFeedProperty::Confidence => {
533 output.confidence = data.confidence;
534 }
535 PriceFeedProperty::FundingRate => {
536 output.funding_rate = data.funding_rate;
537 }
538 PriceFeedProperty::FundingTimestamp => {
539 output.funding_timestamp = data.funding_timestamp;
540 }
541 }
542 }
543 output
544 }
545
546 pub fn new_full(
547 price_feed_id: PriceFeedId,
548 exponent: Option<i16>,
549 data: &AggregatedPriceFeedData,
550 ) -> Self {
551 Self {
552 price_feed_id,
553 price: data.price,
554 best_bid_price: data.best_bid_price,
555 best_ask_price: data.best_ask_price,
556 publisher_count: Some(data.publisher_count),
557 exponent,
558 confidence: data.confidence,
559 funding_rate: data.funding_rate,
560 funding_timestamp: data.funding_timestamp,
561 }
562 }
563}