pyth_hermes_rs/
lib.rs

1//! Rust library for querying deployments of the Pyth Hermes API
2
3pub mod types;
4
5use {
6    futures_util::StreamExt,
7    reqwest::{Client, Error},
8    reqwest_eventsource::{Error as EventSourceError, Event, EventSource},
9    std::sync::Arc,
10    types::*,
11};
12
13pub struct HermesClient {
14    http: reqwest::Client,
15    base_url: Arc<str>,
16}
17
18impl HermesClient {
19    pub fn new(base_url: impl Into<String>) -> Self {
20        Self {
21            http: Client::new(),
22            base_url: Arc::from(base_url.into()),
23        }
24    }
25
26    /// Get the latest price updates by price feed id.
27    pub async fn get_latest_price_feeds(&self, ids: &[&str]) -> Result<Vec<RpcPriceFeed>, Error> {
28        let url = format!("{}/v2/updates/price/latest", self.base_url);
29        let mut req = self.http.get(&url);
30        for id in ids {
31            req = req.query(&[("ids[]", *id)]);
32        }
33        let resp = req.send().await?.error_for_status()?;
34        let feeds = resp.json::<PriceUpdate>().await?;
35        Ok(feeds.parsed.unwrap_or_default())
36    }
37
38    /// This endpoint fetches all price feeds from the Pyth network. It can be filtered by asset type and query string.
39    ///
40    /// # Arguments
41    ///
42    /// * `query` - If provided results will be filtered for price feeds whose symbol contains the query string
43    /// * `asset_type` - If provides filter by asset type. Values are crypto, equity, fx, metal, rates
44    pub async fn get_price_feeds_metadata(
45        &self,
46        query: Option<&str>,
47        asset_type: Option<&str>,
48    ) -> Result<Vec<PriceFeedMetadata>, Error> {
49        let url = format!("{}/v2/price_feeds", self.base_url);
50        let req = self
51            .http
52            .get(&url)
53            .query(&[("query", query), ("asset_type", asset_type)]);
54        let resp = req.send().await?.error_for_status()?;
55        resp.json::<Vec<PriceFeedMetadata>>().await
56    }
57
58    /// Get the latest price updates by price feed id, with a publish time greater than `publish_time`
59    ///
60    /// # Arguments
61    ///
62    /// * `publish_time` - Only return price feed updates that are greater than or equal to this timestamp
63    pub async fn get_price_updates_by_time(
64        &self,
65        publish_time: i64,
66        ids: &[&str],
67    ) -> Result<PriceUpdate, Error> {
68        let url = format!("{}/v2/updates/price/{}", self.base_url, publish_time);
69        let mut req = self.http.get(&url);
70        for id in ids {
71            req = req.query(&[("ids[]", *id)]);
72        }
73        let resp = req.send().await?.error_for_status()?;
74        resp.json::<PriceUpdate>().await
75    }
76
77    /// Get the latest TWAP by price feed id with a custom time window.
78    ///
79    /// # Arguments
80    /// * `window_seconds` - Time period in seconds used to calculate the TWAP, ending at current time
81    pub async fn get_latest_twaps(
82        &self,
83        window_seconds: u64,
84        ids: &[&str],
85    ) -> Result<TwapsResponse, Error> {
86        let url = format!(
87            "{}/v2/updates/twap/{}/latest",
88            self.base_url, window_seconds
89        );
90        let mut req = self.http.get(&url);
91        for id in ids {
92            req = req.query(&[("ids[]", *id)]);
93        }
94        let resp = req.send().await?.error_for_status()?;
95        resp.json::<TwapsResponse>().await
96    }
97
98    /// Gets the most recent publisher stake caps update data
99    pub async fn get_latest_publisher_stake_caps(
100        &self,
101    ) -> Result<LatestPublisherStakeCapsUpdateDataResponse, Error> {
102        let url = format!("{}/v2/updates/publisher_stake_caps/latest", self.base_url);
103        let resp = self.http.get(&url).send().await?.error_for_status()?;
104        resp.json::<LatestPublisherStakeCapsUpdateDataResponse>()
105            .await
106    }
107    pub async fn stream_price_updates<F>(
108        &self,
109        ids: Vec<String>,
110        mut on_event: F,
111    ) -> Result<(), Error>
112    where
113        F: FnMut(ParsedPriceUpdate) + Send + 'static,
114    {
115        let base_url = self.base_url.clone();
116        let client = self.http.clone();
117
118        tokio::spawn(async move {
119            loop {
120                let url = format!("{}/v2/updates/price/stream", base_url);
121                let mut req = client.get(&url);
122                for id in &ids {
123                    req = req.query(&[("ids[]", id)]);
124                }
125
126                let mut es = match EventSource::new(req) {
127                    Ok(stream) => stream,
128                    Err(err) => {
129                        log::error!("failed to connect SSE {err:#?}");
130                        tokio::time::sleep(std::time::Duration::from_secs(2)).await;
131                        continue;
132                    }
133                };
134
135                while let Some(event) = es.next().await {
136                    match event {
137                        Ok(Event::Message(msg)) => {
138                            if let Ok(update) = serde_json::from_str::<PriceUpdate>(&msg.data) {
139                                if let Some(parsed) = update.parsed {
140                                    for item in parsed {
141                                        if let Some(metadata) = item.metadata.clone() {
142                                            let parsed_update = ParsedPriceUpdate {
143                                                id: item.id,
144                                                price: item.price,
145                                                ema_price: item.ema_price,
146                                                metadata,
147                                            };
148                                            on_event(parsed_update);
149                                        }
150                                    }
151                                }
152                            }
153                        }
154                        Ok(Event::Open) => {
155                            // Connection established
156                        }
157                        Err(EventSourceError::StreamEnded) => {
158                            log::error!("stream ended, reconnecting");
159                            break;
160                        }
161                        Err(err) => {
162                            log::error!("sse error {err:#?}");
163                            break;
164                        }
165                    }
166                }
167            }
168        });
169
170        Ok(())
171    }
172}
173
174#[cfg(test)]
175mod test {
176    use super::{types::PUBLIC_BASE_URL, *};
177
178    const ETH_USD_FEED_ID: &str =
179        "ff61491a931112ddf1bd8147cd1b641375f79f5825126d665480874634fd0ace";
180    const SOL_USD_FEED_ID: &str =
181        "ef0d8b6fda2ceba41da15d4095d1da392a0d2f8ed0c6c7bc0f4cfac8c280b56d";
182
183    #[tokio::test(flavor = "multi_thread")]
184    async fn test_stream_price_updates_live() {
185        let client = HermesClient::new(PUBLIC_BASE_URL);
186        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
187
188        let handler = tokio::task::spawn(async move {
189            client
190                .stream_price_updates(
191                    vec![ETH_USD_FEED_ID.to_string(), SOL_USD_FEED_ID.to_string()],
192                    move |update| {
193                        let _ = tx.send(update);
194                    },
195                )
196                .await
197                .expect("Failed to start SSE stream");
198        });
199        let mut found_eth_feed = false;
200        let mut found_sol_feed = false;
201        let mut timer = tokio::time::interval(std::time::Duration::from_secs(20));
202        timer.tick().await;
203        loop {
204            tokio::select! {
205                result = rx.recv() => {
206                    if let Some(update) = result {
207                        println!("update {update:#?}");
208                        if update.id.contains(ETH_USD_FEED_ID) {
209                            found_eth_feed = true;
210                        }
211                        if update.id.contains(SOL_USD_FEED_ID) {
212                            found_sol_feed = true;
213                        }
214                        if found_eth_feed && found_sol_feed {
215                            break;
216                        }
217                    } else {
218                        panic!("channel closed");
219                    }
220                }
221                _ = timer.tick() => {
222                    break;
223                }
224            }
225        }
226        handler.abort();
227        if !found_eth_feed || !found_sol_feed {
228            panic!("failed to find feeds");
229        }
230    }
231
232    #[tokio::test]
233    async fn test_latest_price_feeds() {
234        let hc = HermesClient::new(PUBLIC_BASE_URL);
235
236        let _ = hc.get_latest_price_feeds(&[ETH_USD_FEED_ID]).await.unwrap();
237    }
238
239    #[tokio::test]
240    async fn test_get_latest_price_feeds_live() {
241        let client = HermesClient::new(PUBLIC_BASE_URL);
242        let result = client
243            .get_latest_price_feeds(&[ETH_USD_FEED_ID])
244            .await
245            .unwrap();
246        assert!(!result.is_empty());
247        assert_eq!(
248            result[0].id.to_lowercase(),
249            ETH_USD_FEED_ID.trim_start_matches("0x").to_lowercase()
250        );
251    }
252
253    #[tokio::test]
254    async fn test_get_price_feeds_metadata_live_empty() {
255        let client = HermesClient::new(PUBLIC_BASE_URL);
256        let metadata = client.get_price_feeds_metadata(None, None).await.unwrap();
257        assert!(!metadata.is_empty());
258    }
259
260    #[tokio::test]
261    async fn test_get_price_feeds_metadata_live() {
262        let client = HermesClient::new(PUBLIC_BASE_URL);
263        let metadata = client
264            .get_price_feeds_metadata(Some("bitcoin"), None)
265            .await
266            .unwrap();
267        assert!(!metadata.is_empty());
268    }
269
270    #[tokio::test]
271    async fn test_get_latest_publisher_stake_caps_live() {
272        let client = HermesClient::new(PUBLIC_BASE_URL);
273        let response = client.get_latest_publisher_stake_caps().await.unwrap();
274        assert!(!response.binary.data.is_empty());
275    }
276
277    #[tokio::test]
278    async fn test_get_price_updates_by_time_live() {
279        let client = HermesClient::new(PUBLIC_BASE_URL);
280        let result = client
281            .get_price_updates_by_time(1717632000, &[ETH_USD_FEED_ID])
282            .await;
283        assert!(result.is_ok() || matches!(result, Err(reqwest::Error { .. })));
284    }
285
286    #[tokio::test]
287    async fn test_get_latest_twaps_live() {
288        let client = HermesClient::new(PUBLIC_BASE_URL);
289        let result = client.get_latest_twaps(300, &[ETH_USD_FEED_ID]).await;
290        assert!(result.is_ok() || matches!(result, Err(reqwest::Error { .. })));
291    }
292}