1pub mod types;
4
5use {
6 futures_util::StreamExt,
7 reqwest::{Client, Error},
8 reqwest_eventsource::{Error as EventSourceError, Event, EventSource},
9 std::sync::Arc,
10 types::*,
11};
12
13pub struct HermesClient {
14 http: reqwest::Client,
15 base_url: Arc<str>,
16}
17
18impl HermesClient {
19 pub fn new(base_url: impl Into<String>) -> Self {
20 Self {
21 http: Client::new(),
22 base_url: Arc::from(base_url.into()),
23 }
24 }
25
26 pub async fn get_latest_price_feeds(&self, ids: &[&str]) -> Result<Vec<RpcPriceFeed>, Error> {
28 let url = format!("{}/v2/updates/price/latest", self.base_url);
29 let mut req = self.http.get(&url);
30 for id in ids {
31 req = req.query(&[("ids[]", *id)]);
32 }
33 let resp = req.send().await?.error_for_status()?;
34 let feeds = resp.json::<PriceUpdate>().await?;
35 Ok(feeds.parsed.unwrap_or_default())
36 }
37
38 pub async fn get_price_feeds_metadata(
45 &self,
46 query: Option<&str>,
47 asset_type: Option<&str>,
48 ) -> Result<Vec<PriceFeedMetadata>, Error> {
49 let url = format!("{}/v2/price_feeds", self.base_url);
50 let req = self
51 .http
52 .get(&url)
53 .query(&[("query", query), ("asset_type", asset_type)]);
54 let resp = req.send().await?.error_for_status()?;
55 resp.json::<Vec<PriceFeedMetadata>>().await
56 }
57
58 pub async fn get_price_updates_by_time(
64 &self,
65 publish_time: i64,
66 ids: &[&str],
67 ) -> Result<PriceUpdate, Error> {
68 let url = format!("{}/v2/updates/price/{}", self.base_url, publish_time);
69 let mut req = self.http.get(&url);
70 for id in ids {
71 req = req.query(&[("ids[]", *id)]);
72 }
73 let resp = req.send().await?.error_for_status()?;
74 resp.json::<PriceUpdate>().await
75 }
76
77 pub async fn get_latest_twaps(
82 &self,
83 window_seconds: u64,
84 ids: &[&str],
85 ) -> Result<TwapsResponse, Error> {
86 let url = format!(
87 "{}/v2/updates/twap/{}/latest",
88 self.base_url, window_seconds
89 );
90 let mut req = self.http.get(&url);
91 for id in ids {
92 req = req.query(&[("ids[]", *id)]);
93 }
94 let resp = req.send().await?.error_for_status()?;
95 resp.json::<TwapsResponse>().await
96 }
97
98 pub async fn get_latest_publisher_stake_caps(
100 &self,
101 ) -> Result<LatestPublisherStakeCapsUpdateDataResponse, Error> {
102 let url = format!("{}/v2/updates/publisher_stake_caps/latest", self.base_url);
103 let resp = self.http.get(&url).send().await?.error_for_status()?;
104 resp.json::<LatestPublisherStakeCapsUpdateDataResponse>()
105 .await
106 }
107 pub async fn stream_price_updates<F>(
108 &self,
109 ids: Vec<String>,
110 mut on_event: F,
111 ) -> Result<(), Error>
112 where
113 F: FnMut(ParsedPriceUpdate) + Send + 'static,
114 {
115 let base_url = self.base_url.clone();
116 let client = self.http.clone();
117
118 tokio::spawn(async move {
119 loop {
120 let url = format!("{}/v2/updates/price/stream", base_url);
121 let mut req = client.get(&url);
122 for id in &ids {
123 req = req.query(&[("ids[]", id)]);
124 }
125
126 let mut es = match EventSource::new(req) {
127 Ok(stream) => stream,
128 Err(err) => {
129 log::error!("failed to connect SSE {err:#?}");
130 tokio::time::sleep(std::time::Duration::from_secs(2)).await;
131 continue;
132 }
133 };
134
135 while let Some(event) = es.next().await {
136 match event {
137 Ok(Event::Message(msg)) => {
138 if let Ok(update) = serde_json::from_str::<PriceUpdate>(&msg.data) {
139 if let Some(parsed) = update.parsed {
140 for item in parsed {
141 if let Some(metadata) = item.metadata.clone() {
142 let parsed_update = ParsedPriceUpdate {
143 id: item.id,
144 price: item.price,
145 ema_price: item.ema_price,
146 metadata,
147 };
148 on_event(parsed_update);
149 }
150 }
151 }
152 }
153 }
154 Ok(Event::Open) => {
155 }
157 Err(EventSourceError::StreamEnded) => {
158 log::error!("stream ended, reconnecting");
159 break;
160 }
161 Err(err) => {
162 log::error!("sse error {err:#?}");
163 break;
164 }
165 }
166 }
167 }
168 });
169
170 Ok(())
171 }
172}
173
#[cfg(test)]
mod test {
    //! Live tests: every test here talks to the service at `PUBLIC_BASE_URL`
    //! and therefore needs network access.

    use super::{types::PUBLIC_BASE_URL, *};

    const ETH_USD_FEED_ID: &str =
        "ff61491a931112ddf1bd8147cd1b641375f79f5825126d665480874634fd0ace";
    const SOL_USD_FEED_ID: &str =
        "ef0d8b6fda2ceba41da15d4095d1da392a0d2f8ed0c6c7bc0f4cfac8c280b56d";

    /// Accepts a successful response or an HTTP error status from the server.
    ///
    /// Any other failure (transport error, undecodable body) fails the test.
    /// The previous `is_ok() || matches!(result, Err(reqwest::Error { .. }))`
    /// check was true for every possible value and could never fail.
    fn assert_ok_or_http_status<T>(result: Result<T, reqwest::Error>) {
        if let Err(err) = result {
            assert!(
                err.is_status(),
                "expected success or an HTTP status error, got: {err:?}"
            );
        }
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_stream_price_updates_live() {
        let client = HermesClient::new(PUBLIC_BASE_URL);
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();

        // Forward every streamed update into the channel read below.
        let handler = tokio::task::spawn(async move {
            client
                .stream_price_updates(
                    vec![ETH_USD_FEED_ID.to_string(), SOL_USD_FEED_ID.to_string()],
                    move |update| {
                        let _ = tx.send(update);
                    },
                )
                .await
                .expect("Failed to start SSE stream");
        });
        let mut found_eth_feed = false;
        let mut found_sol_feed = false;
        // The first tick of an interval completes immediately; consume it so
        // the tick inside the loop acts as a 20 second deadline.
        let mut timer = tokio::time::interval(std::time::Duration::from_secs(20));
        timer.tick().await;
        loop {
            tokio::select! {
                result = rx.recv() => {
                    if let Some(update) = result {
                        println!("update {update:#?}");
                        if update.id.contains(ETH_USD_FEED_ID) {
                            found_eth_feed = true;
                        }
                        if update.id.contains(SOL_USD_FEED_ID) {
                            found_sol_feed = true;
                        }
                        if found_eth_feed && found_sol_feed {
                            break;
                        }
                    } else {
                        panic!("channel closed");
                    }
                }
                _ = timer.tick() => {
                    break;
                }
            }
        }
        handler.abort();
        if !found_eth_feed || !found_sol_feed {
            panic!("failed to find feeds");
        }
    }

    #[tokio::test]
    async fn test_latest_price_feeds() {
        let hc = HermesClient::new(PUBLIC_BASE_URL);

        let _ = hc.get_latest_price_feeds(&[ETH_USD_FEED_ID]).await.unwrap();
    }

    #[tokio::test]
    async fn test_get_latest_price_feeds_live() {
        let client = HermesClient::new(PUBLIC_BASE_URL);
        let result = client
            .get_latest_price_feeds(&[ETH_USD_FEED_ID])
            .await
            .unwrap();
        assert!(!result.is_empty());
        assert_eq!(
            result[0].id.to_lowercase(),
            ETH_USD_FEED_ID.trim_start_matches("0x").to_lowercase()
        );
    }

    #[tokio::test]
    async fn test_get_price_feeds_metadata_live_empty() {
        let client = HermesClient::new(PUBLIC_BASE_URL);
        let metadata = client.get_price_feeds_metadata(None, None).await.unwrap();
        assert!(!metadata.is_empty());
    }

    #[tokio::test]
    async fn test_get_price_feeds_metadata_live() {
        let client = HermesClient::new(PUBLIC_BASE_URL);
        let metadata = client
            .get_price_feeds_metadata(Some("bitcoin"), None)
            .await
            .unwrap();
        assert!(!metadata.is_empty());
    }

    #[tokio::test]
    async fn test_get_latest_publisher_stake_caps_live() {
        let client = HermesClient::new(PUBLIC_BASE_URL);
        let response = client.get_latest_publisher_stake_caps().await.unwrap();
        assert!(!response.binary.data.is_empty());
    }

    #[tokio::test]
    async fn test_get_price_updates_by_time_live() {
        let client = HermesClient::new(PUBLIC_BASE_URL);
        let result = client
            .get_price_updates_by_time(1717632000, &[ETH_USD_FEED_ID])
            .await;
        // NOTE(review): presumably the server may reject a timestamp this
        // old, so an HTTP status error is tolerated here - confirm.
        assert_ok_or_http_status(result);
    }

    #[tokio::test]
    async fn test_get_latest_twaps_live() {
        let client = HermesClient::new(PUBLIC_BASE_URL);
        let result = client.get_latest_twaps(300, &[ETH_USD_FEED_ID]).await;
        assert_ok_or_http_status(result);
    }
}