pyth_hermes_rs/
lib.rs

1//! Rust library for querying deployments of the Pyth Hermes API
2
3pub mod types;
4
5use {
6    futures_util::StreamExt,
7    reqwest::{Client, Error},
8    reqwest_eventsource::{Error as EventSourceError, Event, EventSource},
9    std::sync::Arc,
10    tokio::task::JoinHandle,
11    types::*,
12};
13
14pub struct HermesClient {
15    http: reqwest::Client,
16    base_url: Arc<str>,
17}
18
19impl HermesClient {
20    pub fn new(base_url: impl Into<String>) -> Self {
21        Self {
22            http: Client::new(),
23            base_url: Arc::from(base_url.into()),
24        }
25    }
26
27    /// Get the latest price updates by price feed id.
28    pub async fn get_latest_price_feeds(&self, ids: &[&str]) -> Result<Vec<RpcPriceFeed>, Error> {
29        let url = format!("{}/v2/updates/price/latest", self.base_url);
30        let mut req = self.http.get(&url);
31        for id in ids {
32            req = req.query(&[("ids[]", *id)]);
33        }
34        let resp = req.send().await?.error_for_status()?;
35        let feeds = resp.json::<PriceUpdate>().await?;
36        Ok(feeds.parsed.unwrap_or_default())
37    }
38
39    /// This endpoint fetches all price feeds from the Pyth network. It can be filtered by asset type and query string.
40    ///
41    /// # Arguments
42    ///
43    /// * `query` - If provided results will be filtered for price feeds whose symbol contains the query string
44    /// * `asset_type` - If provides filter by asset type. Values are crypto, equity, fx, metal, rates
45    pub async fn get_price_feeds_metadata(
46        &self,
47        query: Option<&str>,
48        asset_type: Option<&str>,
49    ) -> Result<Vec<PriceFeedMetadata>, Error> {
50        let url = format!("{}/v2/price_feeds", self.base_url);
51        let req = self
52            .http
53            .get(&url)
54            .query(&[("query", query), ("asset_type", asset_type)]);
55        let resp = req.send().await?.error_for_status()?;
56        resp.json::<Vec<PriceFeedMetadata>>().await
57    }
58
59    /// Get the latest price updates by price feed id, with a publish time greater than `publish_time`
60    ///
61    /// # Arguments
62    ///
63    /// * `publish_time` - Only return price feed updates that are greater than or equal to this timestamp
64    pub async fn get_price_updates_by_time(
65        &self,
66        publish_time: i64,
67        ids: &[&str],
68    ) -> Result<PriceUpdate, Error> {
69        let url = format!("{}/v2/updates/price/{}", self.base_url, publish_time);
70        let mut req = self.http.get(&url);
71        for id in ids {
72            req = req.query(&[("ids[]", *id)]);
73        }
74        let resp = req.send().await?.error_for_status()?;
75        resp.json::<PriceUpdate>().await
76    }
77
78    /// Get the latest TWAP by price feed id with a custom time window.
79    ///
80    /// # Arguments
81    /// * `window_seconds` - Time period in seconds used to calculate the TWAP, ending at current time
82    pub async fn get_latest_twaps(
83        &self,
84        window_seconds: u64,
85        ids: &[&str],
86    ) -> Result<TwapsResponse, Error> {
87        let url = format!(
88            "{}/v2/updates/twap/{}/latest",
89            self.base_url, window_seconds
90        );
91        let mut req = self.http.get(&url);
92        for id in ids {
93            req = req.query(&[("ids[]", *id)]);
94        }
95        let resp = req.send().await?.error_for_status()?;
96        resp.json::<TwapsResponse>().await
97    }
98
99    /// Gets the most recent publisher stake caps update data
100    pub async fn get_latest_publisher_stake_caps(
101        &self,
102    ) -> Result<LatestPublisherStakeCapsUpdateDataResponse, Error> {
103        let url = format!("{}/v2/updates/publisher_stake_caps/latest", self.base_url);
104        let resp = self.http.get(&url).send().await?.error_for_status()?;
105        resp.json::<LatestPublisherStakeCapsUpdateDataResponse>()
106            .await
107    }
108    /// Spawns a task which streams price updates from the hermes api
109    ///
110    /// # Returns
111    ///
112    /// [`JoinHandle`] which can be used to abort the spawned task
113    pub async fn stream_price_updates<F>(
114        &self,
115        ids: Vec<String>,
116        mut on_event: F,
117    ) -> Result<JoinHandle<()>, Error>
118    where
119        F: FnMut(ParsedPriceUpdate) + Send + 'static,
120    {
121        let base_url = self.base_url.clone();
122        let client = self.http.clone();
123
124        let handler = tokio::spawn(async move {
125            loop {
126                let url = format!("{}/v2/updates/price/stream", base_url);
127                let mut req = client.get(&url);
128                for id in &ids {
129                    req = req.query(&[("ids[]", id)]);
130                }
131
132                let mut es = match EventSource::new(req) {
133                    Ok(stream) => stream,
134                    Err(err) => {
135                        log::error!("failed to connect SSE {err:#?}");
136                        tokio::time::sleep(std::time::Duration::from_secs(2)).await;
137                        continue;
138                    }
139                };
140
141                while let Some(event) = es.next().await {
142                    match event {
143                        Ok(Event::Message(msg)) => {
144                            if let Ok(update) = serde_json::from_str::<PriceUpdate>(&msg.data) {
145                                if let Some(parsed) = update.parsed {
146                                    for item in parsed {
147                                        if let Some(metadata) = item.metadata.clone() {
148                                            let parsed_update = ParsedPriceUpdate {
149                                                id: item.id,
150                                                price: item.price,
151                                                ema_price: item.ema_price,
152                                                metadata,
153                                            };
154                                            on_event(parsed_update);
155                                        }
156                                    }
157                                }
158                            }
159                        }
160                        Ok(Event::Open) => {
161                            // Connection established
162                        }
163                        Err(EventSourceError::StreamEnded) => {
164                            log::error!("stream ended, reconnecting");
165                            break;
166                        }
167                        Err(err) => {
168                            log::error!("sse error {err:#?}");
169                            break;
170                        }
171                    }
172                }
173            }
174        });
175
176        Ok(handler)
177    }
178}
179
/// Live integration tests; they require network access to the public Hermes deployment.
#[cfg(test)]
mod test {
    use super::{types::PUBLIC_BASE_URL, *};

    const ETH_USD_FEED_ID: &str =
        "ff61491a931112ddf1bd8147cd1b641375f79f5825126d665480874634fd0ace";
    const SOL_USD_FEED_ID: &str =
        "ef0d8b6fda2ceba41da15d4095d1da392a0d2f8ed0c6c7bc0f4cfac8c280b56d";

    /// Subscribes to two feeds and expects an update for each within 20 seconds.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_stream_price_updates_live() {
        let client = HermesClient::new(PUBLIC_BASE_URL);
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();

        let handler = client
            .stream_price_updates(
                vec![ETH_USD_FEED_ID.to_string(), SOL_USD_FEED_ID.to_string()],
                move |update| {
                    let _ = tx.send(update);
                },
            )
            .await
            .expect("Failed to start SSE stream");

        // Completes once both feeds have been observed at least once.
        let wait_for_both_feeds = async {
            let mut found_eth_feed = false;
            let mut found_sol_feed = false;
            while !(found_eth_feed && found_sol_feed) {
                let update = rx.recv().await.expect("channel closed");
                println!("update {update:#?}");
                found_eth_feed |= update.id.contains(ETH_USD_FEED_ID);
                found_sol_feed |= update.id.contains(SOL_USD_FEED_ID);
            }
        };

        let outcome =
            tokio::time::timeout(std::time::Duration::from_secs(20), wait_for_both_feeds).await;

        // Stop the background task before asserting so it never outlives the test.
        handler.abort();
        assert!(outcome.is_ok(), "failed to find feeds");
    }

    /// The latest price request returns the requested feed.
    #[tokio::test]
    async fn test_get_latest_price_feeds_live() {
        let client = HermesClient::new(PUBLIC_BASE_URL);
        let result = client
            .get_latest_price_feeds(&[ETH_USD_FEED_ID])
            .await
            .unwrap();
        assert!(!result.is_empty());
        assert!(result[0].id.eq_ignore_ascii_case(ETH_USD_FEED_ID));
    }

    /// Without filters the metadata endpoint returns a non-empty list of feeds.
    #[tokio::test]
    async fn test_get_price_feeds_metadata_live_empty() {
        let client = HermesClient::new(PUBLIC_BASE_URL);
        let metadata = client.get_price_feeds_metadata(None, None).await.unwrap();
        assert!(!metadata.is_empty());
    }

    /// A symbol query still yields at least one matching feed.
    #[tokio::test]
    async fn test_get_price_feeds_metadata_live() {
        let client = HermesClient::new(PUBLIC_BASE_URL);
        let metadata = client
            .get_price_feeds_metadata(Some("bitcoin"), None)
            .await
            .unwrap();
        assert!(!metadata.is_empty());
    }

    /// The publisher stake caps response carries binary update data.
    #[tokio::test]
    async fn test_get_latest_publisher_stake_caps_live() {
        let client = HermesClient::new(PUBLIC_BASE_URL);
        let response = client.get_latest_publisher_stake_caps().await.unwrap();
        assert!(!response.binary.data.is_empty());
    }

    /// A historical lookup, when it succeeds, only returns the requested feed.
    ///
    /// Request failures are tolerated because availability of historical data on the
    /// live deployment is outside this crate's control; they are logged, not asserted.
    #[tokio::test]
    async fn test_get_price_updates_by_time_live() {
        let client = HermesClient::new(PUBLIC_BASE_URL);
        match client
            .get_price_updates_by_time(1717632000, &[ETH_USD_FEED_ID])
            .await
        {
            Ok(update) => {
                for feed in update.parsed.unwrap_or_default() {
                    assert!(feed.id.eq_ignore_ascii_case(ETH_USD_FEED_ID));
                }
            }
            Err(err) => eprintln!("historical price request failed, skipping checks: {err:#?}"),
        }
    }

    /// Exercises the TWAP request path end to end.
    ///
    /// Request failures are tolerated for the same reason as the historical lookup;
    /// they are logged so a broken endpoint is at least visible in the test output.
    #[tokio::test]
    async fn test_get_latest_twaps_live() {
        let client = HermesClient::new(PUBLIC_BASE_URL);
        if let Err(err) = client.get_latest_twaps(300, &[ETH_USD_FEED_ID]).await {
            eprintln!("twap request failed, skipping checks: {err:#?}");
        }
    }
}