quicknode_hyperliquid_sdk/
evm_stream.rs1use futures_util::{SinkExt, StreamExt};
9use parking_lot::RwLock;
10use serde_json::{json, Value};
11use std::collections::HashMap;
12use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
13use std::sync::Arc;
14use std::time::Duration;
15use tokio::time::sleep;
16use tokio_tungstenite::{connect_async, tungstenite::Message};
17
18use crate::error::Result;
19
/// Kinds of `eth_subscribe` subscription this stream can request.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum EVMSubscriptionType {
    /// Sent on the wire as `"newHeads"`.
    NewHeads,
    /// Sent on the wire as `"logs"`.
    Logs,
    /// Sent on the wire as `"newPendingTransactions"`.
    NewPendingTransactions,
}

impl EVMSubscriptionType {
    /// Returns the string used as the first `eth_subscribe` parameter for
    /// this subscription kind.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::NewHeads => "newHeads",
            Self::Logs => "logs",
            Self::NewPendingTransactions => "newPendingTransactions",
        }
    }
}
45
/// Lifecycle states reported by the stream.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum EVMConnectionState {
    /// No connection is open.
    Disconnected,
    /// A connection attempt is in progress.
    Connecting,
    /// The WebSocket connection is established.
    Connected,
    /// A previous connection ended or failed and another attempt is pending.
    Reconnecting,
}
58
59#[derive(Debug, Clone)]
65pub struct EVMSubscription {
66 pub id: u32,
67 pub sub_type: EVMSubscriptionType,
68}
69
/// Connection settings for the stream.
#[derive(Clone)]
pub struct EVMStreamConfig {
    /// Endpoint the WebSocket URL is derived from. When `None`, the stream
    /// connects to `wss://api.hyperliquid.xyz/nanoreth`.
    pub endpoint: Option<String>,
    /// Whether the run loop tries again after a connection ends or fails.
    pub reconnect: bool,
    /// The run loop gives up once the count of consecutive ended/failed
    /// connections reaches this value (the count resets on every successful
    /// connect). `None` means no limit.
    pub max_reconnect_attempts: Option<u32>,
    /// Initial delay before reconnecting; the run loop doubles it after each
    /// attempt, up to 30 seconds.
    pub reconnect_delay: Duration,
    /// NOTE(review): not read anywhere in the visible part of this file.
    pub ping_interval: Duration,
    /// How long the read loop waits for an incoming frame before it sends a
    /// ping frame.
    pub ping_timeout: Duration,
}

impl Default for EVMStreamConfig {
    /// Defaults: no endpoint, reconnect enabled with up to 10 attempts,
    /// 1 s initial reconnect delay, 30 s ping interval, 10 s ping timeout.
    fn default() -> Self {
        let reconnect_delay = Duration::from_secs(1);
        let ping_interval = Duration::from_secs(30);
        let ping_timeout = Duration::from_secs(10);

        EVMStreamConfig {
            endpoint: None,
            reconnect: true,
            max_reconnect_attempts: Some(10),
            reconnect_delay,
            ping_interval,
            ping_timeout,
        }
    }
}
97
98pub struct EVMStream {
131 config: EVMStreamConfig,
132 state: Arc<RwLock<EVMConnectionState>>,
133 running: Arc<AtomicBool>,
134 reconnect_count: Arc<AtomicU32>,
135 request_id: Arc<AtomicU32>,
136 pending_subscriptions: Arc<RwLock<Vec<PendingSubscription>>>,
137 active_subscriptions: Arc<RwLock<HashMap<String, SubscriptionInfo>>>,
138 callbacks: Arc<RwLock<HashMap<String, Box<dyn Fn(Value) + Send + Sync>>>>,
139 on_error: Option<Arc<dyn Fn(String) + Send + Sync>>,
140 on_close: Option<Arc<dyn Fn() + Send + Sync>>,
141 on_open: Option<Arc<dyn Fn() + Send + Sync>>,
142 on_state_change: Option<Arc<dyn Fn(EVMConnectionState) + Send + Sync>>,
143}
144
145struct PendingSubscription {
146 sub_type: EVMSubscriptionType,
147 params: Option<Value>,
148 callback: Box<dyn Fn(Value) + Send + Sync>,
149}
150
151struct SubscriptionInfo {
152 #[allow(dead_code)]
153 sub_type: EVMSubscriptionType,
154}
155
156impl EVMStream {
157 pub fn new(endpoint: Option<String>) -> Self {
159 Self {
160 config: EVMStreamConfig {
161 endpoint,
162 ..Default::default()
163 },
164 state: Arc::new(RwLock::new(EVMConnectionState::Disconnected)),
165 running: Arc::new(AtomicBool::new(false)),
166 reconnect_count: Arc::new(AtomicU32::new(0)),
167 request_id: Arc::new(AtomicU32::new(0)),
168 pending_subscriptions: Arc::new(RwLock::new(Vec::new())),
169 active_subscriptions: Arc::new(RwLock::new(HashMap::new())),
170 callbacks: Arc::new(RwLock::new(HashMap::new())),
171 on_error: None,
172 on_close: None,
173 on_open: None,
174 on_state_change: None,
175 }
176 }
177
178 pub fn configure(mut self, config: EVMStreamConfig) -> Self {
180 self.config = config;
181 self
182 }
183
184 pub fn on_error<F>(mut self, f: F) -> Self
186 where
187 F: Fn(String) + Send + Sync + 'static,
188 {
189 self.on_error = Some(Arc::new(f));
190 self
191 }
192
193 pub fn on_close<F>(mut self, f: F) -> Self
195 where
196 F: Fn() + Send + Sync + 'static,
197 {
198 self.on_close = Some(Arc::new(f));
199 self
200 }
201
202 pub fn on_open<F>(mut self, f: F) -> Self
204 where
205 F: Fn() + Send + Sync + 'static,
206 {
207 self.on_open = Some(Arc::new(f));
208 self
209 }
210
211 pub fn on_state_change<F>(mut self, f: F) -> Self
213 where
214 F: Fn(EVMConnectionState) + Send + Sync + 'static,
215 {
216 self.on_state_change = Some(Arc::new(f));
217 self
218 }
219
220 pub fn state(&self) -> EVMConnectionState {
222 *self.state.read()
223 }
224
225 pub fn connected(&self) -> bool {
227 *self.state.read() == EVMConnectionState::Connected
228 }
229
230 fn set_state(&self, state: EVMConnectionState) {
231 *self.state.write() = state;
232 if let Some(ref cb) = self.on_state_change {
233 cb(state);
234 }
235 }
236
237 fn get_ws_url(&self) -> String {
238 if let Some(ref endpoint) = self.config.endpoint {
239 let base = endpoint
241 .trim_end_matches('/')
242 .replace("https://", "wss://")
243 .replace("http://", "ws://")
244 .replace("/info", "")
245 .replace("/evm", "")
246 .replace("/hypercore", "");
247
248 if let Ok(url) = url::Url::parse(&base) {
250 if let Some(host) = url.host_str() {
251 let path = url.path().trim_matches('/');
252 let parts: Vec<&str> = path.split('/').collect();
253 for part in parts {
255 if !part.is_empty()
256 && !["info", "hypercore", "evm", "nanoreth", "ws"].contains(&part)
257 {
258 let scheme = if base.starts_with("wss") { "wss" } else { "ws" };
259 return format!("{}://{}/{}/nanoreth", scheme, host, part);
260 }
261 }
262 }
263 }
264 format!("{}/nanoreth", base)
265 } else {
266 "wss://api.hyperliquid.xyz/nanoreth".to_string()
268 }
269 }
270
271 #[allow(dead_code)]
272 fn next_request_id(&self) -> u32 {
273 self.request_id.fetch_add(1, Ordering::SeqCst)
274 }
275
276 pub fn new_heads<F>(&mut self, callback: F) -> &mut Self
285 where
286 F: Fn(Value) + Send + Sync + 'static,
287 {
288 self.pending_subscriptions.write().push(PendingSubscription {
289 sub_type: EVMSubscriptionType::NewHeads,
290 params: None,
291 callback: Box::new(callback),
292 });
293 self
294 }
295
296 pub fn logs<F>(&mut self, filter: Option<Value>, callback: F) -> &mut Self
306 where
307 F: Fn(Value) + Send + Sync + 'static,
308 {
309 self.pending_subscriptions.write().push(PendingSubscription {
310 sub_type: EVMSubscriptionType::Logs,
311 params: filter,
312 callback: Box::new(callback),
313 });
314 self
315 }
316
317 pub fn new_pending_transactions<F>(&mut self, callback: F) -> &mut Self
321 where
322 F: Fn(Value) + Send + Sync + 'static,
323 {
324 self.pending_subscriptions
325 .write()
326 .push(PendingSubscription {
327 sub_type: EVMSubscriptionType::NewPendingTransactions,
328 params: None,
329 callback: Box::new(callback),
330 });
331 self
332 }
333
334 pub fn subscriptions(&self) -> Vec<String> {
336 self.active_subscriptions.read().keys().cloned().collect()
337 }
338
339 pub fn start(&mut self) -> Result<()> {
345 if self.running.load(Ordering::SeqCst) {
346 return Ok(());
347 }
348
349 self.running.store(true, Ordering::SeqCst);
350
351 let ws_url = self.get_ws_url();
352 let state = self.state.clone();
353 let running = self.running.clone();
354 let reconnect_count = self.reconnect_count.clone();
355 let request_id = self.request_id.clone();
356 let pending_subscriptions = self.pending_subscriptions.clone();
357 let active_subscriptions = self.active_subscriptions.clone();
358 let callbacks = self.callbacks.clone();
359 let config = self.config.clone();
360 let on_error = self.on_error.clone();
361 let on_close = self.on_close.clone();
362 let on_open = self.on_open.clone();
363 let on_state_change = self.on_state_change.clone();
364
365 tokio::spawn(async move {
366 Self::run_loop(
367 ws_url,
368 state,
369 running,
370 reconnect_count,
371 request_id,
372 pending_subscriptions,
373 active_subscriptions,
374 callbacks,
375 config,
376 on_error,
377 on_close,
378 on_open,
379 on_state_change,
380 )
381 .await;
382 });
383
384 Ok(())
385 }
386
387 pub async fn run(&mut self) -> Result<()> {
389 self.start()?;
390
391 while self.running.load(Ordering::SeqCst) {
392 sleep(Duration::from_millis(100)).await;
393 }
394
395 Ok(())
396 }
397
398 pub fn stop(&mut self) {
400 self.running.store(false, Ordering::SeqCst);
401 self.set_state(EVMConnectionState::Disconnected);
402
403 if let Some(ref cb) = self.on_close {
404 cb();
405 }
406 }
407
408 async fn run_loop(
409 ws_url: String,
410 state: Arc<RwLock<EVMConnectionState>>,
411 running: Arc<AtomicBool>,
412 reconnect_count: Arc<AtomicU32>,
413 request_id: Arc<AtomicU32>,
414 pending_subscriptions: Arc<RwLock<Vec<PendingSubscription>>>,
415 active_subscriptions: Arc<RwLock<HashMap<String, SubscriptionInfo>>>,
416 callbacks: Arc<RwLock<HashMap<String, Box<dyn Fn(Value) + Send + Sync>>>>,
417 config: EVMStreamConfig,
418 on_error: Option<Arc<dyn Fn(String) + Send + Sync>>,
419 on_close: Option<Arc<dyn Fn() + Send + Sync>>,
420 on_open: Option<Arc<dyn Fn() + Send + Sync>>,
421 on_state_change: Option<Arc<dyn Fn(EVMConnectionState) + Send + Sync>>,
422 ) {
423 let mut backoff = config.reconnect_delay;
424 let max_backoff = Duration::from_secs(30);
425
426 while running.load(Ordering::SeqCst) {
427 {
429 *state.write() = EVMConnectionState::Connecting;
430 }
431 if let Some(ref cb) = on_state_change {
432 cb(EVMConnectionState::Connecting);
433 }
434
435 match connect_async(&ws_url).await {
437 Ok((ws_stream, _)) => {
438 {
439 *state.write() = EVMConnectionState::Connected;
440 }
441 if let Some(ref cb) = on_state_change {
442 cb(EVMConnectionState::Connected);
443 }
444 if let Some(ref cb) = on_open {
445 cb();
446 }
447
448 reconnect_count.store(0, Ordering::SeqCst);
449 backoff = config.reconnect_delay;
450
451 let (mut write, mut read) = ws_stream.split();
452
453 let pending: Vec<_> = {
455 let mut pending = pending_subscriptions.write();
456 pending.drain(..).collect()
457 };
458
459 let mut req_to_callback: HashMap<u32, Box<dyn Fn(Value) + Send + Sync>> =
461 HashMap::new();
462
463 for sub in pending {
464 let req_id = request_id.fetch_add(1, Ordering::SeqCst);
465 let mut params = vec![json!(sub.sub_type.as_str())];
466 if let Some(p) = sub.params {
467 params.push(p);
468 }
469
470 let msg = json!({
471 "jsonrpc": "2.0",
472 "method": "eth_subscribe",
473 "params": params,
474 "id": req_id,
475 });
476
477 req_to_callback.insert(req_id, sub.callback);
478
479 if write.send(Message::Text(msg.to_string().into())).await.is_err() {
480 break;
481 }
482 }
483
484 while running.load(Ordering::SeqCst) {
486 match tokio::time::timeout(config.ping_timeout, read.next()).await {
487 Ok(Some(Ok(Message::Text(text)))) => {
488 if let Ok(data) = serde_json::from_str::<Value>(&text) {
489 if let (Some(id), Some(result)) =
491 (data.get("id"), data.get("result"))
492 {
493 if let Some(id_num) = id.as_u64() {
494 if let Some(callback) =
495 req_to_callback.remove(&(id_num as u32))
496 {
497 if let Some(sub_id) = result.as_str() {
498 callbacks
499 .write()
500 .insert(sub_id.to_string(), callback);
501 active_subscriptions.write().insert(
502 sub_id.to_string(),
503 SubscriptionInfo {
504 sub_type: EVMSubscriptionType::NewHeads,
505 },
506 );
507 }
508 }
509 }
510 }
511
512 if data.get("method") == Some(&json!("eth_subscription")) {
514 if let Some(params) = data.get("params") {
515 if let Some(sub_id) =
516 params.get("subscription").and_then(|s| s.as_str())
517 {
518 if let Some(result) = params.get("result") {
519 let callbacks_read = callbacks.read();
520 if let Some(callback) =
521 callbacks_read.get(sub_id)
522 {
523 callback(result.clone());
524 }
525 }
526 }
527 }
528 }
529 }
530 }
531 Ok(Some(Ok(Message::Close(_)))) => {
532 break;
533 }
534 Ok(Some(Err(e))) => {
535 if let Some(ref cb) = on_error {
536 cb(e.to_string());
537 }
538 break;
539 }
540 Ok(None) => {
541 break;
542 }
543 Err(_) => {
544 if write.send(Message::Ping(vec![].into())).await.is_err() {
546 break;
547 }
548 }
549 _ => {}
550 }
551 }
552 }
553 Err(e) => {
554 if let Some(ref cb) = on_error {
555 cb(format!("Connection failed: {}", e));
556 }
557 }
558 }
559
560 if !running.load(Ordering::SeqCst) {
562 break;
563 }
564
565 if !config.reconnect {
566 break;
567 }
568
569 let attempts = reconnect_count.fetch_add(1, Ordering::SeqCst) + 1;
570 if let Some(max) = config.max_reconnect_attempts {
571 if attempts >= max {
572 break;
573 }
574 }
575
576 {
577 *state.write() = EVMConnectionState::Reconnecting;
578 }
579 if let Some(ref cb) = on_state_change {
580 cb(EVMConnectionState::Reconnecting);
581 }
582
583 sleep(backoff).await;
584 backoff = (backoff * 2).min(max_backoff);
585 }
586
587 {
588 *state.write() = EVMConnectionState::Disconnected;
589 }
590 if let Some(ref cb) = on_state_change {
591 cb(EVMConnectionState::Disconnected);
592 }
593 if let Some(ref cb) = on_close {
594 cb();
595 }
596 }
597}