Skip to main content

drasi_source_hyperliquid/
stream.rs

// Copyright 2025 The Drasi Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! WebSocket streaming and REST polling for Hyperliquid.

17use crate::config::HyperliquidSourceConfig;
18use crate::mapping::{
19    map_funding_rate_to_changes, map_liquidation_to_changes, map_mid_prices_to_changes,
20    map_order_book_to_changes, map_trade_to_changes, InitializedEntities,
21};
22use crate::rest::HyperliquidRestClient;
23use crate::types::{FundingSnapshot, L2Book, Liquidation, Trade, WsMessage};
24use anyhow::{anyhow, Result};
25use drasi_lib::channels::{ChangeDispatcher, SourceEvent, SourceEventWrapper};
26use drasi_lib::profiling;
27use drasi_lib::sources::base::SourceBase;
28use drasi_lib::state_store::StateStoreProvider;
29use futures_util::{Sink, SinkExt, StreamExt};
30use log::{debug, info, warn};
31use std::collections::{HashMap, HashSet};
32use std::sync::Arc;
33use tokio::sync::{watch, RwLock};
34use tokio_tungstenite::tungstenite::Message;
35
/// Upper bound, in seconds, on the delay between WebSocket reconnect attempts.
const MAX_BACKOFF_SECS: u64 = 60;
37
38#[derive(Clone, Default)]
39pub struct StreamState {
40    pub initialized: Arc<RwLock<InitializedEntities>>,
41    pub trade_dedupe: Arc<RwLock<HashMap<String, u64>>>,
42    pub funding_state: Arc<RwLock<HashMap<String, FundingSnapshot>>>,
43}
44
45pub struct WsStreamParams {
46    pub source_id: String,
47    pub ws_url: String,
48    pub config: HyperliquidSourceConfig,
49    pub coins: Vec<String>,
50    pub dispatchers: Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>,
51    pub state_store: Option<Arc<dyn StateStoreProvider>>,
52    pub stream_state: StreamState,
53    pub shutdown_rx: watch::Receiver<bool>,
54    pub start_timestamp: Option<i64>,
55}
56
57pub struct FundingPollParams {
58    pub source_id: String,
59    pub rest_client: HyperliquidRestClient,
60    pub config: HyperliquidSourceConfig,
61    pub dispatchers: Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>,
62    pub state_store: Option<Arc<dyn StateStoreProvider>>,
63    pub stream_state: StreamState,
64    pub shutdown_rx: watch::Receiver<bool>,
65    pub start_timestamp: Option<i64>,
66}
67
68pub async fn load_trade_dedupe_state(
69    source_id: &str,
70    state_store: &Option<Arc<dyn StateStoreProvider>>,
71    coins: &[String],
72) -> HashMap<String, u64> {
73    let mut map = HashMap::new();
74    if let Some(store) = state_store {
75        for coin in coins {
76            let key = format!("last_trade_tid:{coin}");
77            if let Ok(Some(bytes)) = store.get(source_id, &key).await {
78                if bytes.len() == 8 {
79                    let tid = u64::from_le_bytes(
80                        <[u8; 8]>::try_from(bytes.as_slice()).expect("len checked above"),
81                    );
82                    map.insert(coin.clone(), tid);
83                }
84            }
85        }
86    }
87    map
88}
89
90pub async fn load_funding_state(
91    source_id: &str,
92    state_store: &Option<Arc<dyn StateStoreProvider>>,
93    coins: &[String],
94) -> HashMap<String, FundingSnapshot> {
95    let mut map = HashMap::new();
96    if let Some(store) = state_store {
97        for coin in coins {
98            let key = format!("funding_state:{coin}");
99            if let Ok(Some(bytes)) = store.get(source_id, &key).await {
100                if let Ok(snapshot) = serde_json::from_slice::<FundingSnapshot>(&bytes) {
101                    map.insert(coin.clone(), snapshot);
102                }
103            }
104        }
105    }
106    map
107}
108
109pub async fn run_ws_stream(params: WsStreamParams) -> Result<()> {
110    let WsStreamParams {
111        source_id,
112        ws_url,
113        config,
114        coins,
115        dispatchers,
116        state_store,
117        stream_state,
118        mut shutdown_rx,
119        start_timestamp,
120    } = params;
121    let mut backoff = 1u64;
122    let coin_filter: Option<HashSet<String>> = match &config.coins {
123        crate::config::CoinSelection::Specific { coins } => Some(coins.iter().cloned().collect()),
124        crate::config::CoinSelection::All => None,
125    };
126
127    loop {
128        if *shutdown_rx.borrow() {
129            info!("Hyperliquid WS stream received shutdown");
130            break;
131        }
132
133        info!("Connecting to Hyperliquid WebSocket at {ws_url}");
134        match tokio_tungstenite::connect_async(&ws_url).await {
135            Ok((ws_stream, _)) => {
136                info!("Hyperliquid WebSocket connected");
137                backoff = 1;
138
139                let (mut write, mut read) = ws_stream.split();
140                subscribe_to_channels(&mut write, &config, &coins).await?;
141
142                loop {
143                    tokio::select! {
144                        _ = shutdown_rx.changed() => {
145                            info!("Hyperliquid WS shutdown signal received");
146                            let _ = write.send(Message::Close(None)).await;
147                            return Ok(());
148                        }
149                        msg = read.next() => {
150                            match msg {
151                                Some(Ok(Message::Text(text))) => {
152                                    if let Err(e) = handle_ws_message(
153                                        &source_id,
154                                        &dispatchers,
155                                        &state_store,
156                                        &stream_state,
157                                        &coin_filter,
158                                        &text,
159                                        start_timestamp,
160                                    ).await {
161                                        warn!("Failed to handle WS message: {e}");
162                                    }
163                                }
164                                Some(Ok(Message::Ping(payload))) => {
165                                    write.send(Message::Pong(payload)).await.ok();
166                                }
167                                Some(Ok(Message::Pong(_))) => {}
168                                Some(Ok(Message::Close(_))) => {
169                                    warn!("WebSocket closed by server");
170                                    break;
171                                }
172                                Some(Ok(_)) => {}
173                                Some(Err(e)) => {
174                                    warn!("WebSocket error: {e}");
175                                    break;
176                                }
177                                None => {
178                                    warn!("WebSocket stream ended");
179                                    break;
180                                }
181                            }
182                        }
183                    }
184                }
185            }
186            Err(e) => {
187                warn!("WebSocket connect failed: {e}");
188            }
189        }
190
191        if *shutdown_rx.borrow() {
192            break;
193        }
194
195        let wait = std::time::Duration::from_secs(backoff);
196        warn!("Reconnecting in {backoff}s...");
197        tokio::select! {
198            _ = tokio::time::sleep(wait) => {},
199            _ = shutdown_rx.changed() => {
200                break;
201            }
202        }
203        backoff = std::cmp::min(backoff * 2, MAX_BACKOFF_SECS);
204    }
205
206    Ok(())
207}
208
209async fn subscribe_to_channels<W>(
210    write: &mut W,
211    config: &HyperliquidSourceConfig,
212    coins: &[String],
213) -> Result<()>
214where
215    W: Sink<Message> + Unpin,
216    <W as Sink<Message>>::Error: std::fmt::Display,
217{
218    if config.enable_trades {
219        for coin in coins {
220            let msg = serde_json::json!({
221                "method": "subscribe",
222                "subscription": { "type": "trades", "coin": coin }
223            });
224            write
225                .send(Message::Text(msg.to_string()))
226                .await
227                .map_err(|e| anyhow!("WebSocket subscribe to trades/{coin} failed: {e}"))?;
228        }
229    }
230
231    if config.enable_order_book {
232        for coin in coins {
233            let msg = serde_json::json!({
234                "method": "subscribe",
235                "subscription": { "type": "l2Book", "coin": coin }
236            });
237            write
238                .send(Message::Text(msg.to_string()))
239                .await
240                .map_err(|e| anyhow!("WebSocket subscribe to l2Book/{coin} failed: {e}"))?;
241        }
242    }
243
244    if config.enable_mid_prices {
245        let msg = serde_json::json!({
246            "method": "subscribe",
247            "subscription": { "type": "allMids" }
248        });
249        write
250            .send(Message::Text(msg.to_string()))
251            .await
252            .map_err(|e| anyhow!("WebSocket subscribe to allMids failed: {e}"))?;
253    }
254
255    if config.enable_liquidations {
256        let msg = serde_json::json!({
257            "method": "subscribe",
258            "subscription": { "type": "liquidations" }
259        });
260        write
261            .send(Message::Text(msg.to_string()))
262            .await
263            .map_err(|e| anyhow!("WebSocket subscribe to liquidations failed: {e}"))?;
264    }
265
266    Ok(())
267}
268
269async fn handle_ws_message(
270    source_id: &str,
271    dispatchers: &Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>,
272    state_store: &Option<Arc<dyn StateStoreProvider>>,
273    stream_state: &StreamState,
274    coin_filter: &Option<HashSet<String>>,
275    text: &str,
276    start_timestamp: Option<i64>,
277) -> Result<()> {
278    let msg: WsMessage = serde_json::from_str(text)?;
279
280    match msg.channel.as_str() {
281        "subscriptionResponse" => {
282            debug!("Subscription confirmed: {text}");
283        }
284        "trades" => {
285            let trades: Vec<Trade> = serde_json::from_value(msg.data)?;
286            for trade in trades {
287                if !passes_start_timestamp(start_timestamp, trade.time) {
288                    continue;
289                }
290
291                if let Some(filter) = coin_filter {
292                    if !filter.contains(&trade.coin) {
293                        continue;
294                    }
295                }
296
297                if !should_emit_trade(&trade, stream_state, state_store, source_id).await {
298                    continue;
299                }
300
301                let changes = map_trade_to_changes(source_id, &trade)?;
302                dispatch_changes(source_id, dispatchers, changes).await;
303            }
304        }
305        "l2Book" => {
306            let book: L2Book = serde_json::from_value(msg.data)?;
307            if !passes_start_timestamp(start_timestamp, book.time) {
308                return Ok(());
309            }
310            if let Some(filter) = coin_filter {
311                if !filter.contains(&book.coin) {
312                    return Ok(());
313                }
314            }
315
316            let changes = {
317                let mut initialized = stream_state.initialized.write().await;
318                map_order_book_to_changes(source_id, &book, &mut initialized)?
319            };
320            dispatch_changes(source_id, dispatchers, changes).await;
321        }
322        "allMids" => {
323            let mids_value = msg
324                .data
325                .get("mids")
326                .cloned()
327                .unwrap_or_else(|| msg.data.clone());
328            let mids: HashMap<String, String> = serde_json::from_value(mids_value)?;
329            let timestamp = chrono::Utc::now().timestamp_millis();
330            if !passes_start_timestamp(start_timestamp, timestamp) {
331                return Ok(());
332            }
333            let filtered = filter_mids(mids, coin_filter);
334            let changes = {
335                let mut initialized = stream_state.initialized.write().await;
336                map_mid_prices_to_changes(source_id, &filtered, &mut initialized, timestamp)?
337            };
338            dispatch_changes(source_id, dispatchers, changes).await;
339        }
340        "liquidations" => {
341            let liquidations: Vec<Liquidation> = serde_json::from_value(msg.data)?;
342            for liquidation in liquidations {
343                if !passes_start_timestamp(start_timestamp, liquidation.time) {
344                    continue;
345                }
346                if let Some(filter) = coin_filter {
347                    if !filter.contains(&liquidation.coin) {
348                        continue;
349                    }
350                }
351                let changes = map_liquidation_to_changes(source_id, &liquidation)?;
352                dispatch_changes(source_id, dispatchers, changes).await;
353            }
354        }
355        _ => {
356            debug!("Ignoring channel {}", msg.channel);
357        }
358    }
359
360    Ok(())
361}
362
/// Restricts a coin-to-mid-price map to the coins named in `coin_filter`.
///
/// With no filter the map is returned unchanged; otherwise every entry whose
/// coin is not in the filter set is removed.
fn filter_mids(
    mids: HashMap<String, String>,
    coin_filter: &Option<HashSet<String>>,
) -> HashMap<String, String> {
    match coin_filter {
        None => mids,
        Some(allowed) => {
            let mut kept = mids;
            kept.retain(|coin, _| allowed.contains(coin));
            kept
        }
    }
}
375
/// Reports whether an event is at or after the optional start timestamp.
///
/// Returns `true` when no start timestamp is configured; the bound itself is
/// inclusive.
fn passes_start_timestamp(start_timestamp: Option<i64>, event_time: i64) -> bool {
    start_timestamp.map_or(true, |earliest| event_time >= earliest)
}
382
383async fn should_emit_trade(
384    trade: &Trade,
385    stream_state: &StreamState,
386    state_store: &Option<Arc<dyn StateStoreProvider>>,
387    source_id: &str,
388) -> bool {
389    let should_emit = {
390        let mut trade_state = stream_state.trade_dedupe.write().await;
391        let last_tid = trade_state.get(&trade.coin).copied().unwrap_or_else(|| {
392            debug!(
393                "No prior trade dedup state for coin '{}' — accepting all trades",
394                trade.coin
395            );
396            0
397        });
398        if trade.tid <= last_tid {
399            return false;
400        }
401        trade_state.insert(trade.coin.clone(), trade.tid);
402        true
403    };
404    // Lock is dropped — safe to .await on state store persistence.
405
406    if should_emit {
407        if let Some(store) = state_store {
408            let key = format!("last_trade_tid:{}", trade.coin);
409            if let Err(e) = store
410                .set(source_id, &key, trade.tid.to_le_bytes().to_vec())
411                .await
412            {
413                warn!("Failed to persist trade tid: {e}");
414            }
415        }
416    }
417
418    should_emit
419}
420
421async fn dispatch_changes(
422    source_id: &str,
423    dispatchers: &Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>,
424    changes: Vec<drasi_core::models::SourceChange>,
425) {
426    for change in changes {
427        let mut profiling = profiling::ProfilingMetadata::new();
428        profiling.source_send_ns = Some(profiling::timestamp_ns());
429
430        let wrapper = SourceEventWrapper::with_profiling(
431            source_id.to_string(),
432            SourceEvent::Change(change),
433            chrono::Utc::now(),
434            profiling,
435        );
436
437        if let Err(e) =
438            SourceBase::dispatch_from_task(dispatchers.clone(), wrapper, source_id).await
439        {
440            debug!("[{source_id}] Dispatch failed (no subscribers): {e}");
441        }
442    }
443}
444
445pub async fn run_funding_poll(params: FundingPollParams) -> Result<()> {
446    let FundingPollParams {
447        source_id,
448        rest_client,
449        config,
450        dispatchers,
451        state_store,
452        stream_state,
453        mut shutdown_rx,
454        start_timestamp,
455    } = params;
456    let interval_secs = config.funding_poll_interval_secs;
457    let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_secs));
458    let coin_filter: Option<HashSet<String>> = match &config.coins {
459        crate::config::CoinSelection::Specific { coins } => Some(coins.iter().cloned().collect()),
460        crate::config::CoinSelection::All => None,
461    };
462
463    loop {
464        tokio::select! {
465            _ = shutdown_rx.changed() => {
466                info!("Funding poll shutdown signal received");
467                break;
468            }
469            _ = interval.tick() => {
470                let timestamp = chrono::Utc::now().timestamp_millis();
471                if !passes_start_timestamp(start_timestamp, timestamp) {
472                    continue;
473                }
474                match rest_client.fetch_meta_and_asset_ctxs().await {
475                    Ok((meta, ctxs)) => {
476                        // Collect changes and persistence tasks while holding locks,
477                        // then release before async I/O.
478                        let to_dispatch: Vec<(Vec<drasi_core::models::SourceChange>, String, FundingSnapshot)>;
479                        {
480                            let mut initialized = stream_state.initialized.write().await;
481                            let mut funding_state = stream_state.funding_state.write().await;
482                            let mut pending = Vec::new();
483
484                            for (asset, ctx) in meta.universe.iter().zip(ctxs.iter()) {
485                                if let Some(filter) = &coin_filter {
486                                    if !filter.contains(&asset.name) {
487                                        continue;
488                                    }
489                                }
490
491                                let (changes, snapshot) = map_funding_rate_to_changes(
492                                    &source_id,
493                                    &asset.name,
494                                    ctx,
495                                    &mut initialized,
496                                    timestamp,
497                                )?;
498
499                                if let Some(previous) = funding_state.get(&asset.name) {
500                                    if previous == &snapshot {
501                                        continue;
502                                    }
503                                }
504
505                                funding_state.insert(asset.name.clone(), snapshot.clone());
506                                pending.push((changes, asset.name.clone(), snapshot));
507                            }
508                            to_dispatch = pending;
509                        }
510                        // Locks dropped — safe to .await on persist and dispatch.
511                        for (changes, coin, snapshot) in to_dispatch {
512                            persist_funding_snapshot(&state_store, &source_id, &coin, &snapshot).await;
513                            dispatch_changes(&source_id, &dispatchers, changes).await;
514                        }
515                    }
516                    Err(e) => {
517                        warn!("Funding poll failed: {e}");
518                    }
519                }
520            }
521        }
522    }
523
524    Ok(())
525}
526
527async fn persist_funding_snapshot(
528    state_store: &Option<Arc<dyn StateStoreProvider>>,
529    source_id: &str,
530    coin: &str,
531    snapshot: &FundingSnapshot,
532) {
533    if let Some(store) = state_store {
534        let key = format!("funding_state:{coin}");
535        match serde_json::to_vec(snapshot) {
536            Ok(bytes) => {
537                if let Err(e) = store.set(source_id, &key, bytes).await {
538                    warn!("Failed to persist funding snapshot: {e}");
539                }
540            }
541            Err(e) => warn!("Failed to serialize funding snapshot: {e}"),
542        }
543    }
544}