pyth_hermes_rs/
lib.rs

1//! Rust library for querying deployments of the Pyth Hermes API
2
3pub mod types;
4
5use {
6    futures_util::StreamExt,
7    reqwest::{Client, Error},
8    reqwest_eventsource::{Error as EventSourceError, Event, EventSource},
9    std::sync::Arc,
10    types::*,
11};
12
13pub struct HermesClient {
14    http: reqwest::Client,
15    base_url: Arc<str>,
16}
17
18impl HermesClient {
19    pub fn new(base_url: impl Into<String>) -> Self {
20        Self {
21            http: Client::new(),
22            base_url: Arc::from(base_url.into()),
23        }
24    }
25
26    /// Get the latest price updates by price feed id.
27    pub async fn get_latest_price_feeds(&self, ids: &[&str]) -> Result<Vec<RpcPriceFeed>, Error> {
28        let url = format!("{}/v2/updates/price/latest", self.base_url);
29        let mut req = self.http.get(&url);
30        for id in ids {
31            req = req.query(&[("ids[]", *id)]);
32        }
33        let resp = req.send().await?.error_for_status()?;
34        let feeds = resp.json::<PriceUpdate>().await?;
35        Ok(feeds.parsed.unwrap_or_default())
36    }
37
38    /// This endpoint fetches all price feeds from the Pyth network. It can be filtered by asset type and query string.
39    ///
40    /// # Arguments
41    ///
42    /// * `query` - If provided results will be filtered for price feeds whose symbol contains the query string
43    /// * `asset_type` - If provides filter by asset type. Values are crypto, equity, fx, metal, rates
44    pub async fn get_price_feeds_metadata(
45        &self,
46        query: Option<&str>,
47        asset_type: Option<&str>,
48    ) -> Result<Vec<PriceFeedMetadata>, Error> {
49        let url = format!("{}/v2/price_feeds", self.base_url);
50        let req = self
51            .http
52            .get(&url)
53            .query(&[("query", query), ("asset_type", asset_type)]);
54        let resp = req.send().await?.error_for_status()?;
55        resp.json::<Vec<PriceFeedMetadata>>().await
56    }
57
58    /// Get the latest price updates by price feed id, with a publish time greater than `publish_time`
59    ///
60    /// # Arguments
61    ///
62    /// * `publish_time` - Only return price feed updates that are greater than or equal to this timestamp
63    pub async fn get_price_updates_by_time(
64        &self,
65        publish_time: i64,
66        ids: &[&str],
67    ) -> Result<PriceUpdate, Error> {
68        let url = format!("{}/v2/updates/price/{}", self.base_url, publish_time);
69        let mut req = self.http.get(&url);
70        for id in ids {
71            req = req.query(&[("ids[]", *id)]);
72        }
73        let resp = req.send().await?.error_for_status()?;
74        resp.json::<PriceUpdate>().await
75    }
76
77    /// Get the latest TWAP by price feed id with a custom time window.
78    ///
79    /// # Arguments
80    /// * `window_seconds` - Time period in seconds used to calculate the TWAP, ending at current time
81    pub async fn get_latest_twaps(
82        &self,
83        window_seconds: u64,
84        ids: &[&str],
85    ) -> Result<TwapsResponse, Error> {
86        let url = format!(
87            "{}/v2/updates/twap/{}/latest",
88            self.base_url, window_seconds
89        );
90        let mut req = self.http.get(&url);
91        for id in ids {
92            req = req.query(&[("ids[]", *id)]);
93        }
94        let resp = req.send().await?.error_for_status()?;
95        resp.json::<TwapsResponse>().await
96    }
97
98    /// Gets the most recent publisher stake caps update data
99    pub async fn get_latest_publisher_stake_caps(
100        &self,
101    ) -> Result<LatestPublisherStakeCapsUpdateDataResponse, Error> {
102        let url = format!("{}/v2/updates/publisher_stake_caps/latest", self.base_url);
103        let resp = self.http.get(&url).send().await?.error_for_status()?;
104        resp.json::<LatestPublisherStakeCapsUpdateDataResponse>()
105            .await
106    }
107    pub async fn stream_price_updates<F>(&self, ids: &[&str], mut on_event: F) -> Result<(), Error>
108    where
109        F: FnMut(ParsedPriceUpdate) + Send + 'static,
110    {
111        let base_url = self.base_url.clone();
112        let client = self.http.clone();
113        let ids: Vec<String> = ids.iter().map(|s| s.to_string()).collect();
114
115        tokio::spawn(async move {
116            loop {
117                let url = format!("{}/v2/updates/price/stream", base_url);
118                let mut req = client.get(&url);
119                for id in &ids {
120                    req = req.query(&[("ids[]", id)]);
121                }
122
123                let mut es = match EventSource::new(req) {
124                    Ok(stream) => stream,
125                    Err(err) => {
126                        log::error!("failed to connect SSE {err:#?}");
127                        tokio::time::sleep(std::time::Duration::from_secs(2)).await;
128                        continue;
129                    }
130                };
131
132                while let Some(event) = es.next().await {
133                    match event {
134                        Ok(Event::Message(msg)) => {
135                            if let Ok(update) = serde_json::from_str::<PriceUpdate>(&msg.data) {
136                                if let Some(parsed) = update.parsed {
137                                    for item in parsed {
138                                        if let Some(metadata) = item.metadata.clone() {
139                                            let parsed_update = ParsedPriceUpdate {
140                                                id: item.id,
141                                                price: item.price,
142                                                ema_price: item.ema_price,
143                                                metadata,
144                                            };
145                                            on_event(parsed_update);
146                                        }
147                                    }
148                                }
149                            }
150                        }
151                        Ok(Event::Open) => {
152                            // Connection established
153                        }
154                        Err(EventSourceError::StreamEnded) => {
155                            log::error!("stream ended, reconnecting");
156                            break;
157                        }
158                        Err(err) => {
159                            log::error!("sse error {err:#?}");
160                            break;
161                        }
162                    }
163                }
164            }
165        });
166
167        Ok(())
168    }
169}
170
171#[cfg(test)]
172mod test {
173    use {
174        super::*,
175        tokio::time::{timeout, Duration},
176    };
177    const BASE_URL: &str = "https://hermes.pyth.network";
178    const FEED_ID: &str = "0xff61491a931112ddf1bd8147cd1b641375f79f5825126d665480874634fd0ace";
179
180    #[tokio::test]
181    async fn test_stream_price_updates_live() {
182        let client = HermesClient::new(BASE_URL);
183        let (tx, mut rx) = tokio::sync::mpsc::channel(2);
184
185        client
186            .stream_price_updates(&[FEED_ID], move |update| {
187                let _ = tx.try_send(update);
188            })
189            .await
190            .expect("Failed to start SSE stream");
191
192        let mut events_received = 0;
193        loop {
194            let result = timeout(Duration::from_secs(20), rx.recv()).await;
195
196            match result {
197                Ok(Some(update)) => {
198                    assert_eq!(
199                        update.id.to_lowercase(),
200                        FEED_ID.trim_start_matches("0x").to_lowercase()
201                    );
202                    events_received += 1;
203                }
204                Ok(None) => panic!("Channel closed before receiving data"),
205                Err(_) => panic!("Timed out waiting for SSE data"),
206            }
207
208            if events_received >= 2 {
209                break;
210            }
211        }
212    }
213
214    #[tokio::test]
215    async fn test_latest_price_feeds() {
216        let hc = HermesClient::new(BASE_URL);
217
218        let _ = hc.get_latest_price_feeds(&[FEED_ID]).await.unwrap();
219    }
220
221    #[tokio::test]
222    async fn test_get_latest_price_feeds_live() {
223        let client = HermesClient::new(BASE_URL);
224        let result = client.get_latest_price_feeds(&[FEED_ID]).await.unwrap();
225        assert!(!result.is_empty());
226        assert_eq!(
227            result[0].id.to_lowercase(),
228            FEED_ID.trim_start_matches("0x").to_lowercase()
229        );
230    }
231
232    #[tokio::test]
233    async fn test_get_price_feeds_metadata_live_empty() {
234        let client = HermesClient::new(BASE_URL);
235        let metadata = client.get_price_feeds_metadata(None, None).await.unwrap();
236        assert!(!metadata.is_empty());
237    }
238
239    #[tokio::test]
240    async fn test_get_price_feeds_metadata_live() {
241        let client = HermesClient::new(BASE_URL);
242        let metadata = client
243            .get_price_feeds_metadata(Some("bitcoin"), None)
244            .await
245            .unwrap();
246        assert!(!metadata.is_empty());
247    }
248
249    #[tokio::test]
250    async fn test_get_latest_publisher_stake_caps_live() {
251        let client = HermesClient::new(BASE_URL);
252        let response = client.get_latest_publisher_stake_caps().await.unwrap();
253        assert!(!response.binary.data.is_empty());
254    }
255
256    #[tokio::test]
257    async fn test_get_price_updates_by_time_live() {
258        let client = HermesClient::new(BASE_URL);
259        let result = client
260            .get_price_updates_by_time(1717632000, &[FEED_ID])
261            .await;
262        assert!(result.is_ok() || matches!(result, Err(reqwest::Error { .. })));
263    }
264
265    #[tokio::test]
266    async fn test_get_latest_twaps_live() {
267        let client = HermesClient::new(BASE_URL);
268        let result = client.get_latest_twaps(300, &[FEED_ID]).await;
269        assert!(result.is_ok() || matches!(result, Err(reqwest::Error { .. })));
270    }
271}