1pub mod types;
4
5use {
6 futures_util::StreamExt,
7 reqwest::{Client, Error},
8 reqwest_eventsource::{Error as EventSourceError, Event, EventSource},
9 std::sync::Arc,
10 tokio::task::JoinHandle,
11 types::*,
12};
13
14pub struct HermesClient {
15 http: reqwest::Client,
16 base_url: Arc<str>,
17}
18
19impl HermesClient {
20 pub fn new(base_url: impl Into<String>) -> Self {
21 Self {
22 http: Client::new(),
23 base_url: Arc::from(base_url.into()),
24 }
25 }
26
27 pub async fn get_latest_price_feeds(&self, ids: &[&str]) -> Result<Vec<RpcPriceFeed>, Error> {
29 let url = format!("{}/v2/updates/price/latest", self.base_url);
30 let mut req = self.http.get(&url);
31 for id in ids {
32 req = req.query(&[("ids[]", *id)]);
33 }
34 let resp = req.send().await?.error_for_status()?;
35 let feeds = resp.json::<PriceUpdate>().await?;
36 Ok(feeds.parsed.unwrap_or_default())
37 }
38
39 pub async fn get_price_feeds_metadata(
46 &self,
47 query: Option<&str>,
48 asset_type: Option<&str>,
49 ) -> Result<Vec<PriceFeedMetadata>, Error> {
50 let url = format!("{}/v2/price_feeds", self.base_url);
51 let req = self
52 .http
53 .get(&url)
54 .query(&[("query", query), ("asset_type", asset_type)]);
55 let resp = req.send().await?.error_for_status()?;
56 resp.json::<Vec<PriceFeedMetadata>>().await
57 }
58
59 pub async fn get_price_updates_by_time(
65 &self,
66 publish_time: i64,
67 ids: &[&str],
68 ) -> Result<PriceUpdate, Error> {
69 let url = format!("{}/v2/updates/price/{}", self.base_url, publish_time);
70 let mut req = self.http.get(&url);
71 for id in ids {
72 req = req.query(&[("ids[]", *id)]);
73 }
74 let resp = req.send().await?.error_for_status()?;
75 resp.json::<PriceUpdate>().await
76 }
77
78 pub async fn get_latest_twaps(
83 &self,
84 window_seconds: u64,
85 ids: &[&str],
86 ) -> Result<TwapsResponse, Error> {
87 let url = format!(
88 "{}/v2/updates/twap/{}/latest",
89 self.base_url, window_seconds
90 );
91 let mut req = self.http.get(&url);
92 for id in ids {
93 req = req.query(&[("ids[]", *id)]);
94 }
95 let resp = req.send().await?.error_for_status()?;
96 resp.json::<TwapsResponse>().await
97 }
98
99 pub async fn get_latest_publisher_stake_caps(
101 &self,
102 ) -> Result<LatestPublisherStakeCapsUpdateDataResponse, Error> {
103 let url = format!("{}/v2/updates/publisher_stake_caps/latest", self.base_url);
104 let resp = self.http.get(&url).send().await?.error_for_status()?;
105 resp.json::<LatestPublisherStakeCapsUpdateDataResponse>()
106 .await
107 }
108 pub async fn stream_price_updates<F>(
114 &self,
115 ids: Vec<String>,
116 mut on_event: F,
117 ) -> Result<JoinHandle<()>, Error>
118 where
119 F: FnMut(ParsedPriceUpdate) + Send + 'static,
120 {
121 let base_url = self.base_url.clone();
122 let client = self.http.clone();
123
124 let handler = tokio::spawn(async move {
125 loop {
126 let url = format!("{}/v2/updates/price/stream", base_url);
127 let mut req = client.get(&url);
128 for id in &ids {
129 req = req.query(&[("ids[]", id)]);
130 }
131
132 let mut es = match EventSource::new(req) {
133 Ok(stream) => stream,
134 Err(err) => {
135 log::error!("failed to connect SSE {err:#?}");
136 tokio::time::sleep(std::time::Duration::from_secs(2)).await;
137 continue;
138 }
139 };
140
141 while let Some(event) = es.next().await {
142 match event {
143 Ok(Event::Message(msg)) => {
144 if let Ok(update) = serde_json::from_str::<PriceUpdate>(&msg.data) {
145 if let Some(parsed) = update.parsed {
146 for item in parsed {
147 if let Some(metadata) = item.metadata.clone() {
148 let parsed_update = ParsedPriceUpdate {
149 id: item.id,
150 price: item.price,
151 ema_price: item.ema_price,
152 metadata,
153 };
154 on_event(parsed_update);
155 }
156 }
157 }
158 }
159 }
160 Ok(Event::Open) => {
161 }
163 Err(EventSourceError::StreamEnded) => {
164 log::error!("stream ended, reconnecting");
165 break;
166 }
167 Err(err) => {
168 log::error!("sse error {err:#?}");
169 break;
170 }
171 }
172 }
173 }
174 });
175
176 Ok(handler)
177 }
178}
179
180#[cfg(test)]
181mod test {
182 use super::{types::PUBLIC_BASE_URL, *};
183
184 const ETH_USD_FEED_ID: &str =
185 "ff61491a931112ddf1bd8147cd1b641375f79f5825126d665480874634fd0ace";
186 const SOL_USD_FEED_ID: &str =
187 "ef0d8b6fda2ceba41da15d4095d1da392a0d2f8ed0c6c7bc0f4cfac8c280b56d";
188
189 #[tokio::test(flavor = "multi_thread")]
190 async fn test_stream_price_updates_live() {
191 let client = HermesClient::new(PUBLIC_BASE_URL);
192 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
193
194 let handler = client
195 .stream_price_updates(
196 vec![ETH_USD_FEED_ID.to_string(), SOL_USD_FEED_ID.to_string()],
197 move |update| {
198 let _ = tx.send(update);
199 },
200 )
201 .await
202 .expect("Failed to start SSE stream");
203 let mut found_eth_feed = false;
204 let mut found_sol_feed = false;
205 let mut timer = tokio::time::interval(std::time::Duration::from_secs(20));
206 timer.tick().await;
207 loop {
208 tokio::select! {
209 result = rx.recv() => {
210 if let Some(update) = result {
211 println!("update {update:#?}");
212 if update.id.contains(ETH_USD_FEED_ID) {
213 found_eth_feed = true;
214 }
215 if update.id.contains(SOL_USD_FEED_ID) {
216 found_sol_feed = true;
217 }
218 if found_eth_feed && found_sol_feed {
219 break;
220 }
221 } else {
222 panic!("channel closed");
223 }
224 }
225 _ = timer.tick() => {
226 break;
227 }
228 }
229 }
230 handler.abort();
231 if !found_eth_feed || !found_sol_feed {
232 panic!("failed to find feeds");
233 }
234 }
235
236 #[tokio::test]
237 async fn test_latest_price_feeds() {
238 let hc = HermesClient::new(PUBLIC_BASE_URL);
239
240 let _ = hc.get_latest_price_feeds(&[ETH_USD_FEED_ID]).await.unwrap();
241 }
242
243 #[tokio::test]
244 async fn test_get_latest_price_feeds_live() {
245 let client = HermesClient::new(PUBLIC_BASE_URL);
246 let result = client
247 .get_latest_price_feeds(&[ETH_USD_FEED_ID])
248 .await
249 .unwrap();
250 assert!(!result.is_empty());
251 assert_eq!(
252 result[0].id.to_lowercase(),
253 ETH_USD_FEED_ID.trim_start_matches("0x").to_lowercase()
254 );
255 }
256
257 #[tokio::test]
258 async fn test_get_price_feeds_metadata_live_empty() {
259 let client = HermesClient::new(PUBLIC_BASE_URL);
260 let metadata = client.get_price_feeds_metadata(None, None).await.unwrap();
261 assert!(!metadata.is_empty());
262 }
263
264 #[tokio::test]
265 async fn test_get_price_feeds_metadata_live() {
266 let client = HermesClient::new(PUBLIC_BASE_URL);
267 let metadata = client
268 .get_price_feeds_metadata(Some("bitcoin"), None)
269 .await
270 .unwrap();
271 assert!(!metadata.is_empty());
272 }
273
274 #[tokio::test]
275 async fn test_get_latest_publisher_stake_caps_live() {
276 let client = HermesClient::new(PUBLIC_BASE_URL);
277 let response = client.get_latest_publisher_stake_caps().await.unwrap();
278 assert!(!response.binary.data.is_empty());
279 }
280
281 #[tokio::test]
282 async fn test_get_price_updates_by_time_live() {
283 let client = HermesClient::new(PUBLIC_BASE_URL);
284 let result = client
285 .get_price_updates_by_time(1717632000, &[ETH_USD_FEED_ID])
286 .await;
287 assert!(result.is_ok() || matches!(result, Err(reqwest::Error { .. })));
288 }
289
290 #[tokio::test]
291 async fn test_get_latest_twaps_live() {
292 let client = HermesClient::new(PUBLIC_BASE_URL);
293 let result = client.get_latest_twaps(300, &[ETH_USD_FEED_ID]).await;
294 assert!(result.is_ok() || matches!(result, Err(reqwest::Error { .. })));
295 }
296}