Skip to main content

hyperliquid_sdk/
evm_stream.rs

1//! EVM WebSocket streaming client for HyperEVM.
2//!
3//! Stream EVM events via WebSocket on the /nanoreth namespace:
4//! - newHeads: New block headers
5//! - logs: Contract event logs
6//! - newPendingTransactions: Pending transaction hashes
7
8use futures_util::{SinkExt, StreamExt};
9use parking_lot::RwLock;
10use serde_json::{json, Value};
11use std::collections::HashMap;
12use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
13use std::sync::Arc;
14use std::time::Duration;
15use tokio::time::sleep;
16use tokio_tungstenite::{connect_async, tungstenite::Message};
17
18use crate::error::Result;
19
20// ══════════════════════════════════════════════════════════════════════════════
21// EVM Subscription Types
22// ══════════════════════════════════════════════════════════════════════════════
23
/// The kinds of feed that can be requested with `eth_subscribe`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EVMSubscriptionType {
    /// Headers of newly produced blocks.
    NewHeads,
    /// Event logs emitted by contracts.
    Logs,
    /// Hashes of transactions entering the pending set.
    NewPendingTransactions,
}

impl EVMSubscriptionType {
    /// Returns the name used as the first `eth_subscribe` parameter for this
    /// subscription kind.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::NewHeads => "newHeads",
            Self::Logs => "logs",
            Self::NewPendingTransactions => "newPendingTransactions",
        }
    }
}
45
46// ══════════════════════════════════════════════════════════════════════════════
47// Connection State
48// ══════════════════════════════════════════════════════════════════════════════
49
/// Lifecycle phase of the EVM WebSocket connection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EVMConnectionState {
    /// No connection is open (initial state, and the state after shutdown).
    Disconnected,
    /// A connection attempt is in progress.
    Connecting,
    /// The WebSocket handshake succeeded.
    Connected,
    /// The connection was lost or failed and a retry is scheduled.
    Reconnecting,
}
58
59// ══════════════════════════════════════════════════════════════════════════════
60// EVM Subscription
61// ══════════════════════════════════════════════════════════════════════════════
62
63/// An EVM subscription handle
64#[derive(Debug, Clone)]
65pub struct EVMSubscription {
66    pub id: u32,
67    pub sub_type: EVMSubscriptionType,
68}
69
70// ══════════════════════════════════════════════════════════════════════════════
71// EVM Stream Configuration
72// ══════════════════════════════════════════════════════════════════════════════
73
/// Tunable options for an EVM stream.
#[derive(Clone)]
pub struct EVMStreamConfig {
    /// Base endpoint URL the WebSocket URL is derived from; `None` selects the
    /// built-in default URL.
    pub endpoint: Option<String>,
    /// Whether to reconnect after a failed or dropped connection.
    pub reconnect: bool,
    /// Upper bound on consecutive reconnect attempts; `None` means no bound.
    pub max_reconnect_attempts: Option<u32>,
    /// Initial delay before a reconnect; the delay doubles after every
    /// attempt, capped at 30 seconds.
    pub reconnect_delay: Duration,
    /// Intended interval between keep-alive pings.
    ///
    /// NOTE(review): not read anywhere in the visible part of this file.
    pub ping_interval: Duration,
    /// How long the read loop waits for an inbound frame before it sends a
    /// ping.
    pub ping_timeout: Duration,
}

impl Default for EVMStreamConfig {
    /// No endpoint, reconnect enabled with at most 10 attempts starting at a
    /// 1 s delay, 30 s ping interval and 10 s ping timeout.
    fn default() -> Self {
        let reconnect_delay = Duration::from_secs(1);
        let ping_interval = Duration::from_secs(30);
        let ping_timeout = Duration::from_secs(10);

        EVMStreamConfig {
            endpoint: None,
            reconnect: true,
            max_reconnect_attempts: Some(10),
            reconnect_delay,
            ping_interval,
            ping_timeout,
        }
    }
}
97
98// ══════════════════════════════════════════════════════════════════════════════
99// EVM Stream
100// ══════════════════════════════════════════════════════════════════════════════
101
102/// EVM WebSocket streaming client for HyperEVM
103///
104/// Stream EVM events via WebSocket on the /nanoreth namespace.
105///
106/// # Example
107///
108/// ```rust,no_run
109/// use hyperliquid_sdk::EVMStream;
110///
111/// #[tokio::main]
112/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
113///     let mut stream = EVMStream::new(Some("https://your-endpoint.quiknode.pro/TOKEN".to_string()));
114///
115///     // Subscribe to new block headers
116///     stream.new_heads(|header| {
117///         println!("New block: {:?}", header);
118///     });
119///
120///     // Subscribe to contract logs
121///     stream.logs(
122///         Some(serde_json::json!({"address": "0x..."})),
123///         |log| println!("Log: {:?}", log),
124///     );
125///
126///     stream.start()?;
127///     Ok(())
128/// }
129/// ```
130pub struct EVMStream {
131    config: EVMStreamConfig,
132    state: Arc<RwLock<EVMConnectionState>>,
133    running: Arc<AtomicBool>,
134    reconnect_count: Arc<AtomicU32>,
135    request_id: Arc<AtomicU32>,
136    pending_subscriptions: Arc<RwLock<Vec<PendingSubscription>>>,
137    active_subscriptions: Arc<RwLock<HashMap<String, SubscriptionInfo>>>,
138    callbacks: Arc<RwLock<HashMap<String, Box<dyn Fn(Value) + Send + Sync>>>>,
139    on_error: Option<Arc<dyn Fn(String) + Send + Sync>>,
140    on_close: Option<Arc<dyn Fn() + Send + Sync>>,
141    on_open: Option<Arc<dyn Fn() + Send + Sync>>,
142    on_state_change: Option<Arc<dyn Fn(EVMConnectionState) + Send + Sync>>,
143}
144
145struct PendingSubscription {
146    sub_type: EVMSubscriptionType,
147    params: Option<Value>,
148    callback: Box<dyn Fn(Value) + Send + Sync>,
149}
150
151struct SubscriptionInfo {
152    #[allow(dead_code)]
153    sub_type: EVMSubscriptionType,
154}
155
156impl EVMStream {
157    /// Create a new EVM stream client
158    pub fn new(endpoint: Option<String>) -> Self {
159        Self {
160            config: EVMStreamConfig {
161                endpoint,
162                ..Default::default()
163            },
164            state: Arc::new(RwLock::new(EVMConnectionState::Disconnected)),
165            running: Arc::new(AtomicBool::new(false)),
166            reconnect_count: Arc::new(AtomicU32::new(0)),
167            request_id: Arc::new(AtomicU32::new(0)),
168            pending_subscriptions: Arc::new(RwLock::new(Vec::new())),
169            active_subscriptions: Arc::new(RwLock::new(HashMap::new())),
170            callbacks: Arc::new(RwLock::new(HashMap::new())),
171            on_error: None,
172            on_close: None,
173            on_open: None,
174            on_state_change: None,
175        }
176    }
177
178    /// Configure stream options
179    pub fn configure(mut self, config: EVMStreamConfig) -> Self {
180        self.config = config;
181        self
182    }
183
184    /// Set error callback
185    pub fn on_error<F>(mut self, f: F) -> Self
186    where
187        F: Fn(String) + Send + Sync + 'static,
188    {
189        self.on_error = Some(Arc::new(f));
190        self
191    }
192
193    /// Set close callback
194    pub fn on_close<F>(mut self, f: F) -> Self
195    where
196        F: Fn() + Send + Sync + 'static,
197    {
198        self.on_close = Some(Arc::new(f));
199        self
200    }
201
202    /// Set open callback
203    pub fn on_open<F>(mut self, f: F) -> Self
204    where
205        F: Fn() + Send + Sync + 'static,
206    {
207        self.on_open = Some(Arc::new(f));
208        self
209    }
210
211    /// Set state change callback
212    pub fn on_state_change<F>(mut self, f: F) -> Self
213    where
214        F: Fn(EVMConnectionState) + Send + Sync + 'static,
215    {
216        self.on_state_change = Some(Arc::new(f));
217        self
218    }
219
220    /// Get current connection state
221    pub fn state(&self) -> EVMConnectionState {
222        *self.state.read()
223    }
224
225    /// Check if connected
226    pub fn connected(&self) -> bool {
227        *self.state.read() == EVMConnectionState::Connected
228    }
229
230    fn set_state(&self, state: EVMConnectionState) {
231        *self.state.write() = state;
232        if let Some(ref cb) = self.on_state_change {
233            cb(state);
234        }
235    }
236
237    fn get_ws_url(&self) -> String {
238        if let Some(ref endpoint) = self.config.endpoint {
239            // QuickNode endpoint - build /nanoreth WebSocket URL
240            let base = endpoint
241                .trim_end_matches('/')
242                .replace("https://", "wss://")
243                .replace("http://", "ws://")
244                .replace("/info", "")
245                .replace("/evm", "")
246                .replace("/hypercore", "");
247
248            // Extract token from path
249            if let Ok(url) = url::Url::parse(&base) {
250                if let Some(host) = url.host_str() {
251                    let path = url.path().trim_matches('/');
252                    let parts: Vec<&str> = path.split('/').collect();
253                    // Find the token (first part that's not a known path)
254                    for part in parts {
255                        if !part.is_empty()
256                            && !["info", "hypercore", "evm", "nanoreth", "ws"].contains(&part)
257                        {
258                            let scheme = if base.starts_with("wss") { "wss" } else { "ws" };
259                            return format!("{}://{}/{}/nanoreth", scheme, host, part);
260                        }
261                    }
262                }
263            }
264            format!("{}/nanoreth", base)
265        } else {
266            // No endpoint available
267            "wss://api.hyperliquid.xyz/nanoreth".to_string()
268        }
269    }
270
271    #[allow(dead_code)]
272    fn next_request_id(&self) -> u32 {
273        self.request_id.fetch_add(1, Ordering::SeqCst)
274    }
275
276    // ──────────────────────────────────────────────────────────────────────────
277    // Subscription Methods
278    // ──────────────────────────────────────────────────────────────────────────
279
280    /// Subscribe to new block headers
281    ///
282    /// Fires a notification each time a new header is appended to the chain,
283    /// including during chain reorganizations.
284    pub fn new_heads<F>(&mut self, callback: F) -> &mut Self
285    where
286        F: Fn(Value) + Send + Sync + 'static,
287    {
288        self.pending_subscriptions.write().push(PendingSubscription {
289            sub_type: EVMSubscriptionType::NewHeads,
290            params: None,
291            callback: Box::new(callback),
292        });
293        self
294    }
295
296    /// Subscribe to contract event logs
297    ///
298    /// Returns logs that are included in new imported blocks and match
299    /// the given filter criteria.
300    ///
301    /// # Arguments
302    ///
303    /// * `filter` - Filter parameters with optional `address` and `topics`
304    /// * `callback` - Function called with each matching log
305    pub fn logs<F>(&mut self, filter: Option<Value>, callback: F) -> &mut Self
306    where
307        F: Fn(Value) + Send + Sync + 'static,
308    {
309        self.pending_subscriptions.write().push(PendingSubscription {
310            sub_type: EVMSubscriptionType::Logs,
311            params: filter,
312            callback: Box::new(callback),
313        });
314        self
315    }
316
317    /// Subscribe to pending transaction hashes
318    ///
319    /// Returns the hash for all transactions that are added to the pending state.
320    pub fn new_pending_transactions<F>(&mut self, callback: F) -> &mut Self
321    where
322        F: Fn(Value) + Send + Sync + 'static,
323    {
324        self.pending_subscriptions
325            .write()
326            .push(PendingSubscription {
327                sub_type: EVMSubscriptionType::NewPendingTransactions,
328                params: None,
329                callback: Box::new(callback),
330            });
331        self
332    }
333
334    /// Get list of active subscription IDs
335    pub fn subscriptions(&self) -> Vec<String> {
336        self.active_subscriptions.read().keys().cloned().collect()
337    }
338
339    // ──────────────────────────────────────────────────────────────────────────
340    // Lifecycle
341    // ──────────────────────────────────────────────────────────────────────────
342
343    /// Start the stream in background (non-blocking)
344    pub fn start(&mut self) -> Result<()> {
345        if self.running.load(Ordering::SeqCst) {
346            return Ok(());
347        }
348
349        self.running.store(true, Ordering::SeqCst);
350
351        let ws_url = self.get_ws_url();
352        let state = self.state.clone();
353        let running = self.running.clone();
354        let reconnect_count = self.reconnect_count.clone();
355        let request_id = self.request_id.clone();
356        let pending_subscriptions = self.pending_subscriptions.clone();
357        let active_subscriptions = self.active_subscriptions.clone();
358        let callbacks = self.callbacks.clone();
359        let config = self.config.clone();
360        let on_error = self.on_error.clone();
361        let on_close = self.on_close.clone();
362        let on_open = self.on_open.clone();
363        let on_state_change = self.on_state_change.clone();
364
365        tokio::spawn(async move {
366            Self::run_loop(
367                ws_url,
368                state,
369                running,
370                reconnect_count,
371                request_id,
372                pending_subscriptions,
373                active_subscriptions,
374                callbacks,
375                config,
376                on_error,
377                on_close,
378                on_open,
379                on_state_change,
380            )
381            .await;
382        });
383
384        Ok(())
385    }
386
387    /// Run the stream (blocking)
388    pub async fn run(&mut self) -> Result<()> {
389        self.start()?;
390
391        while self.running.load(Ordering::SeqCst) {
392            sleep(Duration::from_millis(100)).await;
393        }
394
395        Ok(())
396    }
397
398    /// Stop the stream
399    pub fn stop(&mut self) {
400        self.running.store(false, Ordering::SeqCst);
401        self.set_state(EVMConnectionState::Disconnected);
402
403        if let Some(ref cb) = self.on_close {
404            cb();
405        }
406    }
407
408    async fn run_loop(
409        ws_url: String,
410        state: Arc<RwLock<EVMConnectionState>>,
411        running: Arc<AtomicBool>,
412        reconnect_count: Arc<AtomicU32>,
413        request_id: Arc<AtomicU32>,
414        pending_subscriptions: Arc<RwLock<Vec<PendingSubscription>>>,
415        active_subscriptions: Arc<RwLock<HashMap<String, SubscriptionInfo>>>,
416        callbacks: Arc<RwLock<HashMap<String, Box<dyn Fn(Value) + Send + Sync>>>>,
417        config: EVMStreamConfig,
418        on_error: Option<Arc<dyn Fn(String) + Send + Sync>>,
419        on_close: Option<Arc<dyn Fn() + Send + Sync>>,
420        on_open: Option<Arc<dyn Fn() + Send + Sync>>,
421        on_state_change: Option<Arc<dyn Fn(EVMConnectionState) + Send + Sync>>,
422    ) {
423        let mut backoff = config.reconnect_delay;
424        let max_backoff = Duration::from_secs(30);
425
426        while running.load(Ordering::SeqCst) {
427            // Update state
428            {
429                *state.write() = EVMConnectionState::Connecting;
430            }
431            if let Some(ref cb) = on_state_change {
432                cb(EVMConnectionState::Connecting);
433            }
434
435            // Connect
436            match connect_async(&ws_url).await {
437                Ok((ws_stream, _)) => {
438                    {
439                        *state.write() = EVMConnectionState::Connected;
440                    }
441                    if let Some(ref cb) = on_state_change {
442                        cb(EVMConnectionState::Connected);
443                    }
444                    if let Some(ref cb) = on_open {
445                        cb();
446                    }
447
448                    reconnect_count.store(0, Ordering::SeqCst);
449                    backoff = config.reconnect_delay;
450
451                    let (mut write, mut read) = ws_stream.split();
452
453                    // Send pending subscriptions
454                    let pending: Vec<_> = {
455                        let mut pending = pending_subscriptions.write();
456                        pending.drain(..).collect()
457                    };
458
459                    // Track request IDs for callback mapping
460                    let mut req_to_callback: HashMap<u32, Box<dyn Fn(Value) + Send + Sync>> =
461                        HashMap::new();
462
463                    for sub in pending {
464                        let req_id = request_id.fetch_add(1, Ordering::SeqCst);
465                        let mut params = vec![json!(sub.sub_type.as_str())];
466                        if let Some(p) = sub.params {
467                            params.push(p);
468                        }
469
470                        let msg = json!({
471                            "jsonrpc": "2.0",
472                            "method": "eth_subscribe",
473                            "params": params,
474                            "id": req_id,
475                        });
476
477                        req_to_callback.insert(req_id, sub.callback);
478
479                        if write.send(Message::Text(msg.to_string().into())).await.is_err() {
480                            break;
481                        }
482                    }
483
484                    // Message loop
485                    while running.load(Ordering::SeqCst) {
486                        match tokio::time::timeout(config.ping_timeout, read.next()).await {
487                            Ok(Some(Ok(Message::Text(text)))) => {
488                                if let Ok(data) = serde_json::from_str::<Value>(&text) {
489                                    // Check for subscription confirmation
490                                    if let (Some(id), Some(result)) =
491                                        (data.get("id"), data.get("result"))
492                                    {
493                                        if let Some(id_num) = id.as_u64() {
494                                            if let Some(callback) =
495                                                req_to_callback.remove(&(id_num as u32))
496                                            {
497                                                if let Some(sub_id) = result.as_str() {
498                                                    callbacks
499                                                        .write()
500                                                        .insert(sub_id.to_string(), callback);
501                                                    active_subscriptions.write().insert(
502                                                        sub_id.to_string(),
503                                                        SubscriptionInfo {
504                                                            sub_type: EVMSubscriptionType::NewHeads,
505                                                        },
506                                                    );
507                                                }
508                                            }
509                                        }
510                                    }
511
512                                    // Check for subscription data
513                                    if data.get("method") == Some(&json!("eth_subscription")) {
514                                        if let Some(params) = data.get("params") {
515                                            if let Some(sub_id) =
516                                                params.get("subscription").and_then(|s| s.as_str())
517                                            {
518                                                if let Some(result) = params.get("result") {
519                                                    let callbacks_read = callbacks.read();
520                                                    if let Some(callback) =
521                                                        callbacks_read.get(sub_id)
522                                                    {
523                                                        callback(result.clone());
524                                                    }
525                                                }
526                                            }
527                                        }
528                                    }
529                                }
530                            }
531                            Ok(Some(Ok(Message::Close(_)))) => {
532                                break;
533                            }
534                            Ok(Some(Err(e))) => {
535                                if let Some(ref cb) = on_error {
536                                    cb(e.to_string());
537                                }
538                                break;
539                            }
540                            Ok(None) => {
541                                break;
542                            }
543                            Err(_) => {
544                                // Timeout - send ping
545                                if write.send(Message::Ping(vec![].into())).await.is_err() {
546                                    break;
547                                }
548                            }
549                            _ => {}
550                        }
551                    }
552                }
553                Err(e) => {
554                    if let Some(ref cb) = on_error {
555                        cb(format!("Connection failed: {}", e));
556                    }
557                }
558            }
559
560            // Check if we should reconnect
561            if !running.load(Ordering::SeqCst) {
562                break;
563            }
564
565            if !config.reconnect {
566                break;
567            }
568
569            let attempts = reconnect_count.fetch_add(1, Ordering::SeqCst) + 1;
570            if let Some(max) = config.max_reconnect_attempts {
571                if attempts >= max {
572                    break;
573                }
574            }
575
576            {
577                *state.write() = EVMConnectionState::Reconnecting;
578            }
579            if let Some(ref cb) = on_state_change {
580                cb(EVMConnectionState::Reconnecting);
581            }
582
583            sleep(backoff).await;
584            backoff = (backoff * 2).min(max_backoff);
585        }
586
587        {
588            *state.write() = EVMConnectionState::Disconnected;
589        }
590        if let Some(ref cb) = on_state_change {
591            cb(EVMConnectionState::Disconnected);
592        }
593        if let Some(ref cb) = on_close {
594            cb();
595        }
596    }
597}