1#![cfg_attr(all(doc, not(doctest)), feature(doc_auto_cfg))]
2
3use std::collections::HashMap;
8
9use serde::{Deserialize, Serialize};
10
// Optional submodule, compiled only when the `stream` Cargo feature is enabled.
// NOTE(review): its contents are not visible from this file; presumably it
// provides the event-stream support behind `Error::EventStream` and
// `Error::EventData` (both gated on the same feature) — confirm.
#[cfg(feature = "stream")]
mod stream;
13
#[derive(thiserror::Error, Debug)]
/// Errors returned by [`PythClient`] requests.
///
/// The enum is `#[non_exhaustive]`, so downstream `match` expressions need a
/// wildcard arm. The four `reqwest`-backed variants embed the inner error in
/// their message using its `Debug` representation (`{0:?}`).
#[non_exhaustive]
pub enum Error {
    /// Turning the request builder (URL plus query parameters) into a request failed.
    #[error("Building request payload: {0:?}")]
    RequestBuilder(reqwest::Error),

    /// Sending the request or receiving the response failed.
    #[error("Executing request to server: {0:?}")]
    Execute(reqwest::Error),

    /// The server answered, but `error_for_status` rejected the response status.
    #[error("Unsuccessful response status: {0:?}")]
    ResponseStatus(reqwest::Error),

    /// The response body could not be read or deserialized as the expected JSON type.
    #[error("Deserializing response body: {0:?}")]
    Deserialize(reqwest::Error),

    /// An error surfaced by the underlying event stream (only with the `stream` feature).
    ///
    /// Converts automatically from the `eventsource_stream` error via `#[from]`.
    #[cfg(feature = "stream")]
    #[error("From event stream: {0}")]
    EventStream(#[from] eventsource_stream::EventStreamError<reqwest::Error>),

    /// An event's data failed JSON deserialization (only with the `stream` feature).
    // NOTE(review): this variant is not constructed in this file; presumably the
    // `stream` module creates it — confirm.
    #[cfg(feature = "stream")]
    #[error("Deserializing event data: {0}")]
    EventData(serde_json::Error),
}
37
#[derive(Debug, Clone)]
/// HTTP client for the `/v2/...` price endpoints of a server identified by a base URL.
///
/// Each request clones the base URL and replaces its path with the endpoint
/// path, so any path component present on the base URL is not preserved.
pub struct PythClient {
    /// HTTP client used to build and execute every request.
    client: reqwest::Client,
    /// Base URL of the server; its path is overwritten per endpoint.
    url: url::Url,
}
46
47impl PythClient {
48 pub fn new(url: url::Url) -> Self {
49 Self::new_with_client(Default::default(), url)
50 }
51
52 pub fn new_with_client(client: reqwest::Client, url: url::Url) -> Self {
53 Self { client, url }
54 }
55
56 pub async fn price_feeds(
68 &self,
69 query: String,
70 asset_type: Option<AssetType>,
71 ) -> Result<Vec<PriceFeedMetadata>, Error> {
72 #[derive(Serialize)]
73 struct Query {
74 query: String,
75 asset_type: Option<String>,
76 }
77
78 let mut url = self.url.clone();
79 url.set_path("/v2/price_feeds");
80 let request = self
81 .client
82 .get(url)
83 .query(&Query {
84 query,
85 asset_type: asset_type.map(|a| a.to_string()),
86 })
87 .build()
88 .map_err(Error::RequestBuilder)?;
89
90 let result = self
91 .client
92 .execute(request)
93 .await
94 .map_err(Error::Execute)?
95 .error_for_status()
96 .map_err(Error::ResponseStatus)?
97 .json()
98 .await
99 .map_err(Error::Deserialize)?;
100 Ok(result)
101 }
102
103 pub async fn latest_price_update(
116 &self,
117 ids: Vec<PriceIdInput>,
118 encoding: Option<EncodingType>,
119 parsed: Option<bool>,
120 ) -> Result<PriceUpdate, Error> {
121 #[derive(Serialize)]
122 struct Options {
123 encoding: Option<EncodingType>,
124 parsed: Option<bool>,
125 }
126
127 let mut url = self.url.clone();
128 url.set_path("/v2/updates/price/latest");
129
130 let mut builder = self.client.get(url);
131 for id in ids {
132 builder = builder.query(&[("ids[]", id)]);
133 }
134 let request = builder
135 .query(&Options {
136 encoding,
137 parsed: parsed.or(Some(false)),
138 })
139 .build()
140 .map_err(Error::RequestBuilder)?;
141
142 let result = self
143 .client
144 .execute(request)
145 .await
146 .map_err(Error::Execute)?
147 .error_for_status()
148 .map_err(Error::ResponseStatus)?
149 .json()
150 .await
151 .map_err(Error::Deserialize)?;
152 Ok(result)
153 }
154
155 pub async fn price_update(
170 &self,
171 publish_time: u64,
172 ids: Vec<PriceIdInput>,
173 encoding: Option<EncodingType>,
174 parsed: Option<bool>,
175 ) -> Result<PriceUpdate, Error> {
176 #[derive(Serialize)]
177 struct Options {
178 encoding: Option<EncodingType>,
179 parsed: Option<bool>,
180 }
181
182 let mut url = self.url.clone();
183 url.set_path(&format!("/v2/updates/price/{publish_time}"));
184
185 let mut builder = self.client.get(url);
186 for id in ids {
187 builder = builder.query(&[("ids[]", id)]);
188 }
189 let request = builder
190 .query(&Options {
191 encoding,
192 parsed: parsed.or(Some(false)),
193 })
194 .build()
195 .map_err(Error::RequestBuilder)?;
196
197 let result = self
198 .client
199 .execute(request)
200 .await
201 .map_err(Error::Execute)?
202 .error_for_status()
203 .map_err(Error::ResponseStatus)?
204 .json()
205 .await
206 .map_err(Error::Deserialize)?;
207 Ok(result)
208 }
209}
210
/// Price feed identifier as supplied by callers; each one is sent as an
/// `ids[]` query parameter by the price-update methods of [`PythClient`].
pub type PriceIdInput = String;
226
#[derive(Clone, Copy, Debug, strum::Display, strum::EnumString)]
/// Asset class filter accepted by [`PythClient::price_feeds`].
///
/// With `serialize_all = "lowercase"`, `Display` renders the variants as
/// `crypto`, `equity`, `fx`, `metal` and `rates`, and `FromStr` (derived via
/// `strum::EnumString`) parses the same spellings.
#[strum(serialize_all = "lowercase")]
pub enum AssetType {
    /// Rendered as `crypto`.
    Crypto,
    /// Rendered as `equity`.
    Equity,
    /// Rendered as `fx`.
    Fx,
    /// Rendered as `metal`.
    Metal,
    /// Rendered as `rates`.
    Rates,
}
237
#[derive(Clone, Debug, Deserialize, Serialize)]
/// One element of the list returned by [`PythClient::price_feeds`].
pub struct PriceFeedMetadata {
    /// Identifier of the price feed, kept as the string sent by the server.
    pub id: RpcPriceIdentifier,
    /// String key/value attributes attached to the feed, as sent by the server.
    pub attributes: HashMap<String, String>,
}
244
#[derive(Clone, Debug, Deserialize, Serialize)]
/// Response body of [`PythClient::latest_price_update`] and
/// [`PythClient::price_update`].
pub struct PriceUpdate {
    /// Encoded update payloads together with the encoding they use.
    pub binary: BinaryPriceUpdate,
    /// Parsed form of the update, if the response carried one.
    // NOTE(review): presumably only present when the request asked for
    // `parsed=true` — confirm against the server API.
    pub parsed: Option<Vec<ParsedPriceUpdate>>,
}
251
#[derive(Clone, Debug, Deserialize, Serialize)]
/// Encoded price update payloads; use [`BinaryPriceUpdate::decode`] to obtain raw bytes.
pub struct BinaryPriceUpdate {
    /// Payloads as text, each encoded according to `encoding`.
    pub data: Vec<String>,
    /// Text encoding shared by every entry of `data`.
    pub encoding: EncodingType,
}
258
259impl BinaryPriceUpdate {
260 pub fn decode(&self) -> Result<Vec<Vec<u8>>, BinaryPriceUpdateError> {
262 use base64::Engine as _;
263 use base64::engine::general_purpose::STANDARD as BASE64;
264
265 let bytes_vec = match self.encoding {
266 EncodingType::Hex => self
267 .data
268 .iter()
269 .map(hex::decode)
270 .collect::<Result<_, hex::FromHexError>>()?,
271 EncodingType::Base64 => self
272 .data
273 .iter()
274 .map(|d| BASE64.decode(d))
275 .collect::<Result<_, base64::DecodeError>>()?,
276 };
277 Ok(bytes_vec)
278 }
279}
280
/// Text encoding of binary price update payloads.
///
/// Serialized and deserialized by serde in lowercase (`hex`, `base64`), and
/// parseable from the same lowercase strings through `strum::EnumString`.
/// Used both as the `encoding` request parameter and as
/// [`BinaryPriceUpdate::encoding`] in responses.
#[derive(Clone, Debug, Deserialize, Serialize, strum::EnumString)]
#[serde(rename_all = "lowercase")]
#[strum(serialize_all = "lowercase")]
pub enum EncodingType {
    /// Hexadecimal text; decoded with `hex::decode`.
    Hex,
    /// Base64 text; decoded with the standard base64 engine.
    Base64,
}
288
#[derive(Clone, Debug, Deserialize, Serialize)]
/// Parsed representation of a single price update.
///
/// Convertible into `pyth_sdk::PriceFeed` via `TryFrom`; that conversion uses
/// `id`, `price` and `ema_price` and drops `metadata`.
pub struct ParsedPriceUpdate {
    /// Feed identifier as a string; parsed as hex when converting to `pyth_sdk::PriceFeed`.
    pub id: RpcPriceIdentifier,
    /// Price value of the update.
    pub price: RpcPrice,
    /// Second price value, reported under the `ema_price` key.
    // NOTE(review): presumably an exponential moving average price — confirm.
    pub ema_price: RpcPrice,
    /// Additional optional metadata about the update.
    pub metadata: RpcPriceFeedMetadataV2,
}
299
300impl TryFrom<ParsedPriceUpdate> for pyth_sdk::PriceFeed {
301 type Error = hex::FromHexError;
302
303 fn try_from(value: ParsedPriceUpdate) -> Result<Self, Self::Error> {
304 let ParsedPriceUpdate {
305 id,
306 price,
307 ema_price,
308 ..
309 } = value;
310 Ok(Self::new(
311 pyth_sdk::PriceIdentifier::from_hex(id)?,
312 price,
313 ema_price,
314 ))
315 }
316}
317
/// Price feed identifier as it appears in server responses (a plain `String`).
pub type RpcPriceIdentifier = String;
319
/// Price value in server responses; an alias for `pyth_sdk::Price`.
pub type RpcPrice = pyth_sdk::Price;
321
/// Optional metadata attached to a [`ParsedPriceUpdate`]; every field may be absent.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct RpcPriceFeedMetadataV2 {
    /// Publish time of the previous update, if provided.
    // NOTE(review): presumably a Unix timestamp in seconds — confirm.
    pub prev_publish_time: Option<i64>,
    /// Time at which the proof became available, if provided.
    // NOTE(review): presumably a Unix timestamp in seconds — confirm.
    pub proof_available_time: Option<i64>,
    /// Slot number associated with the update, if provided.
    pub slot: Option<i64>,
}
328
#[derive(thiserror::Error, Debug)]
/// Failure modes of [`BinaryPriceUpdate::decode`].
///
/// Both variants convert automatically from their codec error via `#[from]`.
pub enum BinaryPriceUpdateError {
    /// A payload declared as hex could not be decoded.
    #[error("Decoding hex payload: {0}")]
    HexDecode(#[from] hex::FromHexError),
    /// A payload declared as base64 could not be decoded.
    #[error("Decoding base64 payload: {0}")]
    Base64Decode(#[from] base64::DecodeError),
}
337
#[cfg(test)]
mod tests {
    use std::path::{Path, PathBuf};
    use std::sync::LazyLock;

    use color_eyre::Result;
    use color_eyre::eyre::OptionExt as _;

    use super::*;

    /// Root of the fixture tree: `<crate manifest dir>/tests/data`.
    static TEST_DATA: LazyLock<PathBuf> = LazyLock::new(|| {
        let manifest_dir = Path::new(env!("CARGO_MANIFEST_DIR"));
        manifest_dir.join("tests").join("data")
    });

    /// Every fixture under `latest_price` must deserialize into a `PriceUpdate`
    /// that carries parsed updates, each convertible into a `pyth_sdk::PriceFeed`.
    #[test]
    fn price_update_deser() -> Result<()> {
        let fixtures = std::fs::read_dir(TEST_DATA.join("latest_price"))?;
        for entry in fixtures {
            let raw = std::fs::read(entry?.path())?;
            let update: PriceUpdate = serde_json::from_slice(&raw)?;

            let parsed_updates = update.parsed.ok_or_eyre("Missing parsed price update")?;
            for parsed in parsed_updates {
                let _feed = pyth_sdk::PriceFeed::try_from(parsed)?;
            }
        }
        Ok(())
    }
}