1use {
4 crate::payload::AggregatedPriceFeedData,
5 anyhow::{bail, Context},
6 itertools::Itertools,
7 rust_decimal::{prelude::FromPrimitive, Decimal},
8 serde::{de::Error, Deserialize, Serialize},
9 std::{
10 num::NonZeroI64,
11 ops::{Add, Deref, DerefMut, Div, Sub},
12 time::{SystemTime, UNIX_EPOCH},
13 },
14};
15
16#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
17pub struct PublisherId(pub u16);
18
19#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
20pub struct PriceFeedId(pub u32);
21
22#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
23pub struct ChannelId(pub u8);
24
25#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
26pub struct TimestampUs(pub u64);
27
28impl TimestampUs {
29 pub fn now() -> Self {
30 let value = SystemTime::now()
31 .duration_since(UNIX_EPOCH)
32 .expect("invalid system time")
33 .as_micros()
34 .try_into()
35 .expect("invalid system time");
36 Self(value)
37 }
38
39 pub fn saturating_us_since(self, other: Self) -> u64 {
40 self.0.saturating_sub(other.0)
41 }
42}
43
44#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
45#[repr(transparent)]
46pub struct Price(pub NonZeroI64);
47
48impl Price {
49 pub fn from_integer(value: i64, exponent: u32) -> anyhow::Result<Price> {
50 let coef = 10i64.checked_pow(exponent).context("overflow")?;
51 let value = value.checked_mul(coef).context("overflow")?;
52 let value = NonZeroI64::new(value).context("zero price is unsupported")?;
53 Ok(Self(value))
54 }
55
56 pub fn parse_str(value: &str, exponent: u32) -> anyhow::Result<Price> {
57 let value: Decimal = value.parse()?;
58 let coef = 10i64.checked_pow(exponent).context("overflow")?;
59 let coef = Decimal::from_i64(coef).context("overflow")?;
60 let value = value.checked_mul(coef).context("overflow")?;
61 if !value.is_integer() {
62 bail!("price value is more precise than available exponent");
63 }
64 let value: i64 = value.try_into().context("overflow")?;
65 let value = NonZeroI64::new(value).context("zero price is unsupported")?;
66 Ok(Self(value))
67 }
68
69 pub fn new(value: i64) -> anyhow::Result<Self> {
70 let value = NonZeroI64::new(value).context("zero price is unsupported")?;
71 Ok(Self(value))
72 }
73
74 pub fn into_inner(self) -> NonZeroI64 {
75 self.0
76 }
77
78 pub fn to_f64(self, exponent: u32) -> anyhow::Result<f64> {
79 Ok(self.0.get() as f64 / 10i64.checked_pow(exponent).context("overflow")? as f64)
80 }
81
82 pub fn mul(self, rhs: Price, rhs_exponent: u32) -> anyhow::Result<Price> {
83 let left_value = i128::from(self.0.get());
84 let right_value = i128::from(rhs.0.get());
85
86 let value = left_value * right_value / 10i128.pow(rhs_exponent);
87 let value = value.try_into()?;
88 NonZeroI64::new(value)
89 .context("zero price is unsupported")
90 .map(Self)
91 }
92}
93
94impl Sub<i64> for Price {
95 type Output = Option<Price>;
96
97 fn sub(self, rhs: i64) -> Self::Output {
98 let value = self.0.get().saturating_sub(rhs);
99 NonZeroI64::new(value).map(Self)
100 }
101}
102
103impl Add<i64> for Price {
104 type Output = Option<Price>;
105
106 fn add(self, rhs: i64) -> Self::Output {
107 let value = self.0.get().saturating_add(rhs);
108 NonZeroI64::new(value).map(Self)
109 }
110}
111
112impl Add<Price> for Price {
113 type Output = Option<Price>;
114 fn add(self, rhs: Price) -> Self::Output {
115 let value = self.0.get().saturating_add(rhs.0.get());
116 NonZeroI64::new(value).map(Self)
117 }
118}
119
120impl Sub<Price> for Price {
121 type Output = Option<Price>;
122 fn sub(self, rhs: Price) -> Self::Output {
123 let value = self.0.get().saturating_sub(rhs.0.get());
124 NonZeroI64::new(value).map(Self)
125 }
126}
127
128impl Div<i64> for Price {
129 type Output = Option<Price>;
130 fn div(self, rhs: i64) -> Self::Output {
131 let value = self.0.get().saturating_div(rhs);
132 NonZeroI64::new(value).map(Self)
133 }
134}
135
136#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
137#[serde(rename_all = "camelCase")]
138pub enum PriceFeedProperty {
139 Price,
140 BestBidPrice,
141 BestAskPrice,
142 PublisherCount,
143 Exponent,
144 Confidence,
145 }
147
148#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
149#[serde(rename_all = "camelCase")]
150pub enum DeliveryFormat {
151 #[default]
153 Json,
154 Binary,
156}
157
158#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
159#[serde(rename_all = "camelCase")]
160pub enum Chain {
161 Evm,
162 Solana,
163}
164
165#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
166#[serde(rename_all = "camelCase")]
167pub enum JsonBinaryEncoding {
168 #[default]
169 Base64,
170 Hex,
171}
172
173#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
174pub enum Channel {
175 FixedRate(FixedRate),
176}
177
178impl Serialize for Channel {
179 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
180 where
181 S: serde::Serializer,
182 {
183 match self {
184 Channel::FixedRate(fixed_rate) => {
185 if *fixed_rate == FixedRate::MIN {
186 return serializer.serialize_str("real_time");
187 }
188 serializer.serialize_str(&format!("fixed_rate@{}ms", fixed_rate.value_ms()))
189 }
190 }
191 }
192}
193
194mod channel_ids {
195 use super::ChannelId;
196
197 pub const FIXED_RATE_1: ChannelId = ChannelId(1);
198 pub const FIXED_RATE_50: ChannelId = ChannelId(2);
199 pub const FIXED_RATE_200: ChannelId = ChannelId(3);
200}
201
202impl Channel {
203 pub fn id(&self) -> ChannelId {
204 match self {
205 Channel::FixedRate(fixed_rate) => match fixed_rate.value_ms() {
206 1 => channel_ids::FIXED_RATE_1,
207 50 => channel_ids::FIXED_RATE_50,
208 200 => channel_ids::FIXED_RATE_200,
209 _ => panic!("unknown channel: {self:?}"),
210 },
211 }
212 }
213}
214
215#[test]
216fn id_supports_all_fixed_rates() {
217 for rate in FixedRate::ALL {
218 Channel::FixedRate(rate).id();
219 }
220}
221
222fn parse_channel(value: &str) -> Option<Channel> {
223 if value == "real_time" {
224 Some(Channel::FixedRate(FixedRate::MIN))
225 } else if let Some(rest) = value.strip_prefix("fixed_rate@") {
226 let ms_value = rest.strip_suffix("ms")?;
227 Some(Channel::FixedRate(FixedRate::from_ms(
228 ms_value.parse().ok()?,
229 )?))
230 } else {
231 None
232 }
233}
234
235impl<'de> Deserialize<'de> for Channel {
236 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
237 where
238 D: serde::Deserializer<'de>,
239 {
240 let value = <String>::deserialize(deserializer)?;
241 parse_channel(&value).ok_or_else(|| D::Error::custom("unknown channel"))
242 }
243}
244
245#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
246pub struct FixedRate {
247 ms: u32,
248}
249
250impl FixedRate {
251 pub const ALL: [Self; 3] = [Self { ms: 1 }, Self { ms: 50 }, Self { ms: 200 }];
256 pub const MIN: Self = Self::ALL[0];
257
258 pub fn from_ms(value: u32) -> Option<Self> {
259 Self::ALL.into_iter().find(|v| v.ms == value)
260 }
261
262 pub fn value_ms(self) -> u32 {
263 self.ms
264 }
265
266 pub fn value_us(self) -> u64 {
267 (self.ms * 1000).into()
268 }
269}
270
271#[test]
272fn fixed_rate_values() {
273 assert!(
274 FixedRate::ALL.windows(2).all(|w| w[0] < w[1]),
275 "values must be unique and sorted"
276 );
277 for value in FixedRate::ALL {
278 assert!(
279 1000 % value.ms == 0,
280 "1 s must contain whole number of intervals"
281 );
282 assert!(
283 value.value_us() % FixedRate::MIN.value_us() == 0,
284 "the interval's borders must be a subset of the minimal interval's borders"
285 );
286 }
287}
288
289#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
290#[serde(rename_all = "camelCase")]
291pub struct SubscriptionParamsRepr {
292 pub price_feed_ids: Vec<PriceFeedId>,
293 pub properties: Vec<PriceFeedProperty>,
294 pub chains: Vec<Chain>,
295 #[serde(default)]
296 pub delivery_format: DeliveryFormat,
297 #[serde(default)]
298 pub json_binary_encoding: JsonBinaryEncoding,
299 #[serde(default = "default_parsed")]
302 pub parsed: bool,
303 pub channel: Channel,
304}
305
306#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
307#[serde(rename_all = "camelCase")]
308pub struct SubscriptionParams(SubscriptionParamsRepr);
309
310impl<'de> Deserialize<'de> for SubscriptionParams {
311 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
312 where
313 D: serde::Deserializer<'de>,
314 {
315 let value = SubscriptionParamsRepr::deserialize(deserializer)?;
316 Self::new(value).map_err(D::Error::custom)
317 }
318}
319
320impl SubscriptionParams {
321 pub fn new(value: SubscriptionParamsRepr) -> Result<Self, &'static str> {
322 if value.price_feed_ids.is_empty() {
323 return Err("no price feed ids specified");
324 }
325 if !value.price_feed_ids.iter().all_unique() {
326 return Err("duplicate price feed ids specified");
327 }
328 if !value.chains.iter().all_unique() {
329 return Err("duplicate chains specified");
330 }
331 if value.properties.is_empty() {
332 return Err("no properties specified");
333 }
334 if !value.properties.iter().all_unique() {
335 return Err("duplicate properties specified");
336 }
337 Ok(Self(value))
338 }
339}
340
341impl Deref for SubscriptionParams {
342 type Target = SubscriptionParamsRepr;
343
344 fn deref(&self) -> &Self::Target {
345 &self.0
346 }
347}
348impl DerefMut for SubscriptionParams {
349 fn deref_mut(&mut self) -> &mut Self::Target {
350 &mut self.0
351 }
352}
353
354pub fn default_parsed() -> bool {
355 true
356}
357
358#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
359#[serde(rename_all = "camelCase")]
360pub struct JsonBinaryData {
361 pub encoding: JsonBinaryEncoding,
362 pub data: String,
363}
364
365#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
366#[serde(rename_all = "camelCase")]
367pub struct JsonUpdate {
368 #[serde(skip_serializing_if = "Option::is_none")]
369 pub parsed: Option<ParsedPayload>,
370 #[serde(skip_serializing_if = "Option::is_none")]
371 pub evm: Option<JsonBinaryData>,
372 #[serde(skip_serializing_if = "Option::is_none")]
373 pub solana: Option<JsonBinaryData>,
374}
375
376#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
377#[serde(rename_all = "camelCase")]
378pub struct ParsedPayload {
379 #[serde(with = "crate::serde_str::timestamp")]
380 pub timestamp_us: TimestampUs,
381 pub price_feeds: Vec<ParsedFeedPayload>,
382}
383
384#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
385#[serde(rename_all = "camelCase")]
386pub struct NatsPayload {
387 pub payload: ParsedPayload,
388 pub channel: Channel,
389}
390
391#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
392#[serde(rename_all = "camelCase")]
393pub struct ParsedFeedPayload {
394 pub price_feed_id: PriceFeedId,
395 #[serde(skip_serializing_if = "Option::is_none")]
396 #[serde(with = "crate::serde_str::option_price")]
397 #[serde(default)]
398 pub price: Option<Price>,
399 #[serde(skip_serializing_if = "Option::is_none")]
400 #[serde(with = "crate::serde_str::option_price")]
401 #[serde(default)]
402 pub best_bid_price: Option<Price>,
403 #[serde(skip_serializing_if = "Option::is_none")]
404 #[serde(with = "crate::serde_str::option_price")]
405 #[serde(default)]
406 pub best_ask_price: Option<Price>,
407 #[serde(skip_serializing_if = "Option::is_none")]
408 #[serde(default)]
409 pub publisher_count: Option<u16>,
410 #[serde(skip_serializing_if = "Option::is_none")]
411 #[serde(default)]
412 pub exponent: Option<i16>,
413 #[serde(skip_serializing_if = "Option::is_none")]
414 #[serde(default)]
415 pub confidence: Option<Price>,
416 }
418
419impl ParsedFeedPayload {
420 pub fn new(
421 price_feed_id: PriceFeedId,
422 exponent: Option<i16>,
423 data: &AggregatedPriceFeedData,
424 properties: &[PriceFeedProperty],
425 ) -> Self {
426 let mut output = Self {
427 price_feed_id,
428 price: None,
429 best_bid_price: None,
430 best_ask_price: None,
431 publisher_count: None,
432 exponent: None,
433 confidence: None,
434 };
435 for &property in properties {
436 match property {
437 PriceFeedProperty::Price => {
438 output.price = data.price;
439 }
440 PriceFeedProperty::BestBidPrice => {
441 output.best_bid_price = data.best_bid_price;
442 }
443 PriceFeedProperty::BestAskPrice => {
444 output.best_ask_price = data.best_ask_price;
445 }
446 PriceFeedProperty::PublisherCount => {
447 output.publisher_count = data.publisher_count;
448 }
449 PriceFeedProperty::Exponent => {
450 output.exponent = exponent;
451 }
452 PriceFeedProperty::Confidence => {
453 output.confidence = data.confidence;
454 }
455 }
456 }
457 output
458 }
459
460 pub fn new_full(
461 price_feed_id: PriceFeedId,
462 exponent: Option<i16>,
463 data: &AggregatedPriceFeedData,
464 ) -> Self {
465 Self {
466 price_feed_id,
467 price: data.price,
468 best_bid_price: data.best_bid_price,
469 best_ask_price: data.best_ask_price,
470 publisher_count: data.publisher_count,
471 exponent,
472 confidence: data.confidence,
473 }
474 }
475}