1use std::collections::HashMap;
6
7use serde::{Deserialize, Serialize};
8
9#[cfg(feature = "stream")]
10mod stream;
11
12#[derive(thiserror::Error, Debug)]
13#[non_exhaustive]
14pub enum Error {
15 #[error("Building request payload: {0:?}")]
16 RequestBuilder(reqwest::Error),
17
18 #[error("Executing request to server: {0:?}")]
19 Execute(reqwest::Error),
20
21 #[error("Unsuccessful response status: {0:?}")]
22 ResponseStatus(reqwest::Error),
23
24 #[error("Deserializing response body: {0:?}")]
25 Deserialize(reqwest::Error),
26
27 #[cfg(feature = "stream")]
28 #[error("From event stream: {0}")]
29 EventStream(#[from] eventsource_stream::EventStreamError<reqwest::Error>),
30
31 #[cfg(feature = "stream")]
32 #[error("Deserializing event data: {0}")]
33 EventData(serde_json::Error),
34}
35
36#[derive(Debug, Clone)]
40pub struct PythClient {
41 client: reqwest::Client,
42 url: url::Url,
43}
44
45impl PythClient {
46 pub fn new(url: url::Url) -> Self {
47 Self::new_with_client(Default::default(), url)
48 }
49
50 pub fn new_with_client(client: reqwest::Client, url: url::Url) -> Self {
51 Self { client, url }
52 }
53
54 pub async fn price_feeds(
66 &self,
67 query: String,
68 asset_type: Option<AssetType>,
69 ) -> Result<Vec<PriceFeedMetadata>, Error> {
70 #[derive(Serialize)]
71 struct Query {
72 query: String,
73 asset_type: Option<String>,
74 }
75
76 let mut url = self.url.clone();
77 url.set_path("/v2/price_feeds");
78 let request = self
79 .client
80 .get(url)
81 .query(&Query {
82 query,
83 asset_type: asset_type.map(|a| a.to_string()),
84 })
85 .build()
86 .map_err(Error::RequestBuilder)?;
87
88 let result = self
89 .client
90 .execute(request)
91 .await
92 .map_err(Error::Execute)?
93 .error_for_status()
94 .map_err(Error::ResponseStatus)?
95 .json()
96 .await
97 .map_err(Error::Deserialize)?;
98 Ok(result)
99 }
100
101 pub async fn latest_price_update(
114 &self,
115 ids: Vec<PriceIdInput>,
116 encoding: Option<EncodingType>,
117 parsed: Option<bool>,
118 ) -> Result<PriceUpdate, Error> {
119 #[derive(Serialize)]
120 struct Options {
121 encoding: Option<EncodingType>,
122 parsed: Option<bool>,
123 }
124
125 let mut url = self.url.clone();
126 url.set_path("/v2/updates/price/latest");
127
128 let mut builder = self.client.get(url);
129 for id in ids {
130 builder = builder.query(&[("ids[]", id)]);
131 }
132 let request = builder
133 .query(&Options {
134 encoding,
135 parsed: parsed.or(Some(false)),
136 })
137 .build()
138 .map_err(Error::RequestBuilder)?;
139
140 let result = self
141 .client
142 .execute(request)
143 .await
144 .map_err(Error::Execute)?
145 .error_for_status()
146 .map_err(Error::ResponseStatus)?
147 .json()
148 .await
149 .map_err(Error::Deserialize)?;
150 Ok(result)
151 }
152
153 pub async fn price_update(
168 &self,
169 publish_time: u64,
170 ids: Vec<PriceIdInput>,
171 encoding: Option<EncodingType>,
172 parsed: Option<bool>,
173 ) -> Result<PriceUpdate, Error> {
174 #[derive(Serialize)]
175 struct Options {
176 encoding: Option<EncodingType>,
177 parsed: Option<bool>,
178 }
179
180 let mut url = self.url.clone();
181 url.set_path(&format!("/v2/updates/price/{publish_time}"));
182
183 let mut builder = self.client.get(url);
184 for id in ids {
185 builder = builder.query(&[("ids[]", id)]);
186 }
187 let request = builder
188 .query(&Options {
189 encoding,
190 parsed: parsed.or(Some(false)),
191 })
192 .build()
193 .map_err(Error::RequestBuilder)?;
194
195 let result = self
196 .client
197 .execute(request)
198 .await
199 .map_err(Error::Execute)?
200 .error_for_status()
201 .map_err(Error::ResponseStatus)?
202 .json()
203 .await
204 .map_err(Error::Deserialize)?;
205 Ok(result)
206 }
207}
208
209pub type PriceIdInput = String;
224
225#[derive(Clone, Copy, Debug, strum::Display, strum::EnumString)]
227#[strum(serialize_all = "lowercase")]
228pub enum AssetType {
229 Crypto,
230 Equity,
231 Fx,
232 Metal,
233 Rates,
234}
235
236#[derive(Clone, Debug, Deserialize, Serialize)]
238pub struct PriceFeedMetadata {
239 pub id: RpcPriceIdentifier,
240 pub attributes: HashMap<String, String>,
241}
242
243#[derive(Clone, Debug, Deserialize, Serialize)]
245pub struct PriceUpdate {
246 pub binary: BinaryPriceUpdate,
247 pub parsed: Option<Vec<ParsedPriceUpdate>>,
248}
249
250#[derive(Clone, Debug, Deserialize, Serialize)]
252pub struct BinaryPriceUpdate {
253 pub data: Vec<String>,
254 pub encoding: EncodingType,
255}
256
257impl BinaryPriceUpdate {
258 pub fn decode(&self) -> Result<Vec<Vec<u8>>, BinaryPriceUpdateError> {
260 use base64::Engine as _;
261 use base64::engine::general_purpose::STANDARD as BASE64;
262
263 let bytes_vec = match self.encoding {
264 EncodingType::Hex => self
265 .data
266 .iter()
267 .map(hex::decode)
268 .collect::<Result<_, hex::FromHexError>>()?,
269 EncodingType::Base64 => self
270 .data
271 .iter()
272 .map(|d| BASE64.decode(d))
273 .collect::<Result<_, base64::DecodeError>>()?,
274 };
275 Ok(bytes_vec)
276 }
277}
278
279#[derive(Clone, Debug, Deserialize, Serialize, strum::EnumString)]
280#[serde(rename_all = "lowercase")]
281#[strum(serialize_all = "lowercase")]
282pub enum EncodingType {
283 Hex,
284 Base64,
285}
286
287#[derive(Clone, Debug, Deserialize, Serialize)]
291pub struct ParsedPriceUpdate {
292 pub id: RpcPriceIdentifier,
293 pub price: RpcPrice,
294 pub ema_price: RpcPrice,
295 pub metadata: RpcPriceFeedMetadataV2,
296}
297
298impl TryFrom<ParsedPriceUpdate> for pyth_sdk::PriceFeed {
299 type Error = hex::FromHexError;
300
301 fn try_from(value: ParsedPriceUpdate) -> Result<Self, Self::Error> {
302 let ParsedPriceUpdate {
303 id,
304 price,
305 ema_price,
306 ..
307 } = value;
308 Ok(Self::new(
309 pyth_sdk::PriceIdentifier::from_hex(id)?,
310 price,
311 ema_price,
312 ))
313 }
314}
315
316pub type RpcPriceIdentifier = String;
317
318pub type RpcPrice = pyth_sdk::Price;
319
320#[derive(Clone, Debug, Deserialize, Serialize)]
321pub struct RpcPriceFeedMetadataV2 {
322 pub prev_publish_time: Option<i64>,
323 pub proof_available_time: Option<i64>,
324 pub slot: Option<i64>,
325}
326
327#[derive(thiserror::Error, Debug)]
329pub enum BinaryPriceUpdateError {
330 #[error("Decoding hex payload: {0}")]
331 HexDecode(#[from] hex::FromHexError),
332 #[error("Decoding base64 payload: {0}")]
333 Base64Decode(#[from] base64::DecodeError),
334}
335
336#[cfg(test)]
337mod tests {
338 use std::path::{Path, PathBuf};
339 use std::sync::LazyLock;
340
341 use color_eyre::Result;
342 use color_eyre::eyre::OptionExt as _;
343
344 use super::*;
345
346 static TEST_DATA: LazyLock<PathBuf> = LazyLock::new(|| {
347 Path::new(env!("CARGO_MANIFEST_DIR"))
348 .join("tests")
349 .join("data")
350 });
351
352 #[test]
353 fn price_update_deser() -> Result<()> {
354 for file in std::fs::read_dir(TEST_DATA.join("latest_price"))? {
355 let path = file?.path();
356 let update: PriceUpdate = serde_json::from_slice(&std::fs::read(path)?)?;
357
358 for parsed in update.parsed.ok_or_eyre("Missing parsed price update")? {
359 let _: pyth_sdk::PriceFeed = parsed.try_into()?;
360 }
361 }
362 Ok(())
363 }
364}