1pub mod types;
4
5use {
6 futures_util::StreamExt,
7 reqwest::{Client, Error},
8 reqwest_eventsource::{Error as EventSourceError, Event, EventSource},
9 std::sync::Arc,
10 types::*,
11};
12
13pub struct HermesClient {
14 http: reqwest::Client,
15 base_url: Arc<str>,
16}
17
18impl HermesClient {
19 pub fn new(base_url: impl Into<String>) -> Self {
20 Self {
21 http: Client::new(),
22 base_url: Arc::from(base_url.into()),
23 }
24 }
25
26 pub async fn get_latest_price_feeds(&self, ids: &[&str]) -> Result<Vec<RpcPriceFeed>, Error> {
28 let url = format!("{}/v2/updates/price/latest", self.base_url);
29 let mut req = self.http.get(&url);
30 for id in ids {
31 req = req.query(&[("ids[]", *id)]);
32 }
33 let resp = req.send().await?.error_for_status()?;
34 let feeds = resp.json::<PriceUpdate>().await?;
35 Ok(feeds.parsed.unwrap_or_default())
36 }
37
38 pub async fn get_price_feeds_metadata(
45 &self,
46 query: Option<&str>,
47 asset_type: Option<&str>,
48 ) -> Result<Vec<PriceFeedMetadata>, Error> {
49 let url = format!("{}/v2/price_feeds", self.base_url);
50 let req = self
51 .http
52 .get(&url)
53 .query(&[("query", query), ("asset_type", asset_type)]);
54 let resp = req.send().await?.error_for_status()?;
55 resp.json::<Vec<PriceFeedMetadata>>().await
56 }
57
58 pub async fn get_price_updates_by_time(
64 &self,
65 publish_time: i64,
66 ids: &[&str],
67 ) -> Result<PriceUpdate, Error> {
68 let url = format!("{}/v2/updates/price/{}", self.base_url, publish_time);
69 let mut req = self.http.get(&url);
70 for id in ids {
71 req = req.query(&[("ids[]", *id)]);
72 }
73 let resp = req.send().await?.error_for_status()?;
74 resp.json::<PriceUpdate>().await
75 }
76
77 pub async fn get_latest_twaps(
82 &self,
83 window_seconds: u64,
84 ids: &[&str],
85 ) -> Result<TwapsResponse, Error> {
86 let url = format!(
87 "{}/v2/updates/twap/{}/latest",
88 self.base_url, window_seconds
89 );
90 let mut req = self.http.get(&url);
91 for id in ids {
92 req = req.query(&[("ids[]", *id)]);
93 }
94 let resp = req.send().await?.error_for_status()?;
95 resp.json::<TwapsResponse>().await
96 }
97
98 pub async fn get_latest_publisher_stake_caps(
100 &self,
101 ) -> Result<LatestPublisherStakeCapsUpdateDataResponse, Error> {
102 let url = format!("{}/v2/updates/publisher_stake_caps/latest", self.base_url);
103 let resp = self.http.get(&url).send().await?.error_for_status()?;
104 resp.json::<LatestPublisherStakeCapsUpdateDataResponse>()
105 .await
106 }
107 pub async fn stream_price_updates<F>(&self, ids: &[&str], mut on_event: F) -> Result<(), Error>
108 where
109 F: FnMut(ParsedPriceUpdate) + Send + 'static,
110 {
111 let base_url = self.base_url.clone();
112 let client = self.http.clone();
113 let ids: Vec<String> = ids.iter().map(|s| s.to_string()).collect();
114
115 tokio::spawn(async move {
116 loop {
117 let url = format!("{}/v2/updates/price/stream", base_url);
118 let mut req = client.get(&url);
119 for id in &ids {
120 req = req.query(&[("ids[]", id)]);
121 }
122
123 let mut es = match EventSource::new(req) {
124 Ok(stream) => stream,
125 Err(err) => {
126 log::error!("failed to connect SSE {err:#?}");
127 tokio::time::sleep(std::time::Duration::from_secs(2)).await;
128 continue;
129 }
130 };
131
132 while let Some(event) = es.next().await {
133 match event {
134 Ok(Event::Message(msg)) => {
135 if let Ok(update) = serde_json::from_str::<PriceUpdate>(&msg.data) {
136 if let Some(parsed) = update.parsed {
137 for item in parsed {
138 if let Some(metadata) = item.metadata.clone() {
139 let parsed_update = ParsedPriceUpdate {
140 id: item.id,
141 price: item.price,
142 ema_price: item.ema_price,
143 metadata,
144 };
145 on_event(parsed_update);
146 }
147 }
148 }
149 }
150 }
151 Ok(Event::Open) => {
152 }
154 Err(EventSourceError::StreamEnded) => {
155 log::error!("stream ended, reconnecting");
156 break;
157 }
158 Err(err) => {
159 log::error!("sse error {err:#?}");
160 break;
161 }
162 }
163 }
164 }
165 });
166
167 Ok(())
168 }
169}
170
171#[cfg(test)]
172mod test {
173 use {
174 super::*,
175 tokio::time::{timeout, Duration},
176 };
177 const BASE_URL: &str = "https://hermes.pyth.network";
178 const FEED_ID: &str = "0xff61491a931112ddf1bd8147cd1b641375f79f5825126d665480874634fd0ace";
179
180 #[tokio::test]
181 async fn test_stream_price_updates_live() {
182 let client = HermesClient::new(BASE_URL);
183 let (tx, mut rx) = tokio::sync::mpsc::channel(2);
184
185 client
186 .stream_price_updates(&[FEED_ID], move |update| {
187 let _ = tx.try_send(update);
188 })
189 .await
190 .expect("Failed to start SSE stream");
191
192 let mut events_received = 0;
193 loop {
194 let result = timeout(Duration::from_secs(20), rx.recv()).await;
195
196 match result {
197 Ok(Some(update)) => {
198 assert_eq!(
199 update.id.to_lowercase(),
200 FEED_ID.trim_start_matches("0x").to_lowercase()
201 );
202 events_received += 1;
203 }
204 Ok(None) => panic!("Channel closed before receiving data"),
205 Err(_) => panic!("Timed out waiting for SSE data"),
206 }
207
208 if events_received >= 2 {
209 break;
210 }
211 }
212 }
213
214 #[tokio::test]
215 async fn test_latest_price_feeds() {
216 let hc = HermesClient::new(BASE_URL);
217
218 let _ = hc.get_latest_price_feeds(&[FEED_ID]).await.unwrap();
219 }
220
221 #[tokio::test]
222 async fn test_get_latest_price_feeds_live() {
223 let client = HermesClient::new(BASE_URL);
224 let result = client.get_latest_price_feeds(&[FEED_ID]).await.unwrap();
225 assert!(!result.is_empty());
226 assert_eq!(
227 result[0].id.to_lowercase(),
228 FEED_ID.trim_start_matches("0x").to_lowercase()
229 );
230 }
231
232 #[tokio::test]
233 async fn test_get_price_feeds_metadata_live_empty() {
234 let client = HermesClient::new(BASE_URL);
235 let metadata = client.get_price_feeds_metadata(None, None).await.unwrap();
236 assert!(!metadata.is_empty());
237 }
238
239 #[tokio::test]
240 async fn test_get_price_feeds_metadata_live() {
241 let client = HermesClient::new(BASE_URL);
242 let metadata = client
243 .get_price_feeds_metadata(Some("bitcoin"), None)
244 .await
245 .unwrap();
246 assert!(!metadata.is_empty());
247 }
248
249 #[tokio::test]
250 async fn test_get_latest_publisher_stake_caps_live() {
251 let client = HermesClient::new(BASE_URL);
252 let response = client.get_latest_publisher_stake_caps().await.unwrap();
253 assert!(!response.binary.data.is_empty());
254 }
255
256 #[tokio::test]
257 async fn test_get_price_updates_by_time_live() {
258 let client = HermesClient::new(BASE_URL);
259 let result = client
260 .get_price_updates_by_time(1717632000, &[FEED_ID])
261 .await;
262 assert!(result.is_ok() || matches!(result, Err(reqwest::Error { .. })));
263 }
264
265 #[tokio::test]
266 async fn test_get_latest_twaps_live() {
267 let client = HermesClient::new(BASE_URL);
268 let result = client.get_latest_twaps(300, &[FEED_ID]).await;
269 assert!(result.is_ok() || matches!(result, Err(reqwest::Error { .. })));
270 }
271}