1use crate::config::HyperliquidSourceConfig;
18use crate::mapping::{
19 map_funding_rate_to_changes, map_liquidation_to_changes, map_mid_prices_to_changes,
20 map_order_book_to_changes, map_trade_to_changes, InitializedEntities,
21};
22use crate::rest::HyperliquidRestClient;
23use crate::types::{FundingSnapshot, L2Book, Liquidation, Trade, WsMessage};
24use anyhow::{anyhow, Result};
25use drasi_lib::channels::{ChangeDispatcher, SourceEvent, SourceEventWrapper};
26use drasi_lib::profiling;
27use drasi_lib::sources::base::SourceBase;
28use drasi_lib::state_store::StateStoreProvider;
29use futures_util::{Sink, SinkExt, StreamExt};
30use log::{debug, info, warn};
31use std::collections::{HashMap, HashSet};
32use std::sync::Arc;
33use tokio::sync::{watch, RwLock};
34use tokio_tungstenite::tungstenite::Message;
35
36const MAX_BACKOFF_SECS: u64 = 60;
37
38#[derive(Clone, Default)]
39pub struct StreamState {
40 pub initialized: Arc<RwLock<InitializedEntities>>,
41 pub trade_dedupe: Arc<RwLock<HashMap<String, u64>>>,
42 pub funding_state: Arc<RwLock<HashMap<String, FundingSnapshot>>>,
43}
44
45pub struct WsStreamParams {
46 pub source_id: String,
47 pub ws_url: String,
48 pub config: HyperliquidSourceConfig,
49 pub coins: Vec<String>,
50 pub dispatchers: Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>,
51 pub state_store: Option<Arc<dyn StateStoreProvider>>,
52 pub stream_state: StreamState,
53 pub shutdown_rx: watch::Receiver<bool>,
54 pub start_timestamp: Option<i64>,
55}
56
57pub struct FundingPollParams {
58 pub source_id: String,
59 pub rest_client: HyperliquidRestClient,
60 pub config: HyperliquidSourceConfig,
61 pub dispatchers: Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>,
62 pub state_store: Option<Arc<dyn StateStoreProvider>>,
63 pub stream_state: StreamState,
64 pub shutdown_rx: watch::Receiver<bool>,
65 pub start_timestamp: Option<i64>,
66}
67
68pub async fn load_trade_dedupe_state(
69 source_id: &str,
70 state_store: &Option<Arc<dyn StateStoreProvider>>,
71 coins: &[String],
72) -> HashMap<String, u64> {
73 let mut map = HashMap::new();
74 if let Some(store) = state_store {
75 for coin in coins {
76 let key = format!("last_trade_tid:{coin}");
77 if let Ok(Some(bytes)) = store.get(source_id, &key).await {
78 if bytes.len() == 8 {
79 let tid = u64::from_le_bytes(
80 <[u8; 8]>::try_from(bytes.as_slice()).expect("len checked above"),
81 );
82 map.insert(coin.clone(), tid);
83 }
84 }
85 }
86 }
87 map
88}
89
90pub async fn load_funding_state(
91 source_id: &str,
92 state_store: &Option<Arc<dyn StateStoreProvider>>,
93 coins: &[String],
94) -> HashMap<String, FundingSnapshot> {
95 let mut map = HashMap::new();
96 if let Some(store) = state_store {
97 for coin in coins {
98 let key = format!("funding_state:{coin}");
99 if let Ok(Some(bytes)) = store.get(source_id, &key).await {
100 if let Ok(snapshot) = serde_json::from_slice::<FundingSnapshot>(&bytes) {
101 map.insert(coin.clone(), snapshot);
102 }
103 }
104 }
105 }
106 map
107}
108
109pub async fn run_ws_stream(params: WsStreamParams) -> Result<()> {
110 let WsStreamParams {
111 source_id,
112 ws_url,
113 config,
114 coins,
115 dispatchers,
116 state_store,
117 stream_state,
118 mut shutdown_rx,
119 start_timestamp,
120 } = params;
121 let mut backoff = 1u64;
122 let coin_filter: Option<HashSet<String>> = match &config.coins {
123 crate::config::CoinSelection::Specific { coins } => Some(coins.iter().cloned().collect()),
124 crate::config::CoinSelection::All => None,
125 };
126
127 loop {
128 if *shutdown_rx.borrow() {
129 info!("Hyperliquid WS stream received shutdown");
130 break;
131 }
132
133 info!("Connecting to Hyperliquid WebSocket at {ws_url}");
134 match tokio_tungstenite::connect_async(&ws_url).await {
135 Ok((ws_stream, _)) => {
136 info!("Hyperliquid WebSocket connected");
137 backoff = 1;
138
139 let (mut write, mut read) = ws_stream.split();
140 subscribe_to_channels(&mut write, &config, &coins).await?;
141
142 loop {
143 tokio::select! {
144 _ = shutdown_rx.changed() => {
145 info!("Hyperliquid WS shutdown signal received");
146 let _ = write.send(Message::Close(None)).await;
147 return Ok(());
148 }
149 msg = read.next() => {
150 match msg {
151 Some(Ok(Message::Text(text))) => {
152 if let Err(e) = handle_ws_message(
153 &source_id,
154 &dispatchers,
155 &state_store,
156 &stream_state,
157 &coin_filter,
158 &text,
159 start_timestamp,
160 ).await {
161 warn!("Failed to handle WS message: {e}");
162 }
163 }
164 Some(Ok(Message::Ping(payload))) => {
165 write.send(Message::Pong(payload)).await.ok();
166 }
167 Some(Ok(Message::Pong(_))) => {}
168 Some(Ok(Message::Close(_))) => {
169 warn!("WebSocket closed by server");
170 break;
171 }
172 Some(Ok(_)) => {}
173 Some(Err(e)) => {
174 warn!("WebSocket error: {e}");
175 break;
176 }
177 None => {
178 warn!("WebSocket stream ended");
179 break;
180 }
181 }
182 }
183 }
184 }
185 }
186 Err(e) => {
187 warn!("WebSocket connect failed: {e}");
188 }
189 }
190
191 if *shutdown_rx.borrow() {
192 break;
193 }
194
195 let wait = std::time::Duration::from_secs(backoff);
196 warn!("Reconnecting in {backoff}s...");
197 tokio::select! {
198 _ = tokio::time::sleep(wait) => {},
199 _ = shutdown_rx.changed() => {
200 break;
201 }
202 }
203 backoff = std::cmp::min(backoff * 2, MAX_BACKOFF_SECS);
204 }
205
206 Ok(())
207}
208
209async fn subscribe_to_channels<W>(
210 write: &mut W,
211 config: &HyperliquidSourceConfig,
212 coins: &[String],
213) -> Result<()>
214where
215 W: Sink<Message> + Unpin,
216 <W as Sink<Message>>::Error: std::fmt::Display,
217{
218 if config.enable_trades {
219 for coin in coins {
220 let msg = serde_json::json!({
221 "method": "subscribe",
222 "subscription": { "type": "trades", "coin": coin }
223 });
224 write
225 .send(Message::Text(msg.to_string()))
226 .await
227 .map_err(|e| anyhow!("WebSocket subscribe to trades/{coin} failed: {e}"))?;
228 }
229 }
230
231 if config.enable_order_book {
232 for coin in coins {
233 let msg = serde_json::json!({
234 "method": "subscribe",
235 "subscription": { "type": "l2Book", "coin": coin }
236 });
237 write
238 .send(Message::Text(msg.to_string()))
239 .await
240 .map_err(|e| anyhow!("WebSocket subscribe to l2Book/{coin} failed: {e}"))?;
241 }
242 }
243
244 if config.enable_mid_prices {
245 let msg = serde_json::json!({
246 "method": "subscribe",
247 "subscription": { "type": "allMids" }
248 });
249 write
250 .send(Message::Text(msg.to_string()))
251 .await
252 .map_err(|e| anyhow!("WebSocket subscribe to allMids failed: {e}"))?;
253 }
254
255 if config.enable_liquidations {
256 let msg = serde_json::json!({
257 "method": "subscribe",
258 "subscription": { "type": "liquidations" }
259 });
260 write
261 .send(Message::Text(msg.to_string()))
262 .await
263 .map_err(|e| anyhow!("WebSocket subscribe to liquidations failed: {e}"))?;
264 }
265
266 Ok(())
267}
268
269async fn handle_ws_message(
270 source_id: &str,
271 dispatchers: &Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>,
272 state_store: &Option<Arc<dyn StateStoreProvider>>,
273 stream_state: &StreamState,
274 coin_filter: &Option<HashSet<String>>,
275 text: &str,
276 start_timestamp: Option<i64>,
277) -> Result<()> {
278 let msg: WsMessage = serde_json::from_str(text)?;
279
280 match msg.channel.as_str() {
281 "subscriptionResponse" => {
282 debug!("Subscription confirmed: {text}");
283 }
284 "trades" => {
285 let trades: Vec<Trade> = serde_json::from_value(msg.data)?;
286 for trade in trades {
287 if !passes_start_timestamp(start_timestamp, trade.time) {
288 continue;
289 }
290
291 if let Some(filter) = coin_filter {
292 if !filter.contains(&trade.coin) {
293 continue;
294 }
295 }
296
297 if !should_emit_trade(&trade, stream_state, state_store, source_id).await {
298 continue;
299 }
300
301 let changes = map_trade_to_changes(source_id, &trade)?;
302 dispatch_changes(source_id, dispatchers, changes).await;
303 }
304 }
305 "l2Book" => {
306 let book: L2Book = serde_json::from_value(msg.data)?;
307 if !passes_start_timestamp(start_timestamp, book.time) {
308 return Ok(());
309 }
310 if let Some(filter) = coin_filter {
311 if !filter.contains(&book.coin) {
312 return Ok(());
313 }
314 }
315
316 let changes = {
317 let mut initialized = stream_state.initialized.write().await;
318 map_order_book_to_changes(source_id, &book, &mut initialized)?
319 };
320 dispatch_changes(source_id, dispatchers, changes).await;
321 }
322 "allMids" => {
323 let mids_value = msg
324 .data
325 .get("mids")
326 .cloned()
327 .unwrap_or_else(|| msg.data.clone());
328 let mids: HashMap<String, String> = serde_json::from_value(mids_value)?;
329 let timestamp = chrono::Utc::now().timestamp_millis();
330 if !passes_start_timestamp(start_timestamp, timestamp) {
331 return Ok(());
332 }
333 let filtered = filter_mids(mids, coin_filter);
334 let changes = {
335 let mut initialized = stream_state.initialized.write().await;
336 map_mid_prices_to_changes(source_id, &filtered, &mut initialized, timestamp)?
337 };
338 dispatch_changes(source_id, dispatchers, changes).await;
339 }
340 "liquidations" => {
341 let liquidations: Vec<Liquidation> = serde_json::from_value(msg.data)?;
342 for liquidation in liquidations {
343 if !passes_start_timestamp(start_timestamp, liquidation.time) {
344 continue;
345 }
346 if let Some(filter) = coin_filter {
347 if !filter.contains(&liquidation.coin) {
348 continue;
349 }
350 }
351 let changes = map_liquidation_to_changes(source_id, &liquidation)?;
352 dispatch_changes(source_id, dispatchers, changes).await;
353 }
354 }
355 _ => {
356 debug!("Ignoring channel {}", msg.channel);
357 }
358 }
359
360 Ok(())
361}
362
363fn filter_mids(
364 mids: HashMap<String, String>,
365 coin_filter: &Option<HashSet<String>>,
366) -> HashMap<String, String> {
367 if let Some(filter) = coin_filter {
368 mids.into_iter()
369 .filter(|(coin, _)| filter.contains(coin))
370 .collect()
371 } else {
372 mids
373 }
374}
375
376fn passes_start_timestamp(start_timestamp: Option<i64>, event_time: i64) -> bool {
377 match start_timestamp {
378 Some(start) => event_time >= start,
379 None => true,
380 }
381}
382
383async fn should_emit_trade(
384 trade: &Trade,
385 stream_state: &StreamState,
386 state_store: &Option<Arc<dyn StateStoreProvider>>,
387 source_id: &str,
388) -> bool {
389 let should_emit = {
390 let mut trade_state = stream_state.trade_dedupe.write().await;
391 let last_tid = trade_state.get(&trade.coin).copied().unwrap_or_else(|| {
392 debug!(
393 "No prior trade dedup state for coin '{}' — accepting all trades",
394 trade.coin
395 );
396 0
397 });
398 if trade.tid <= last_tid {
399 return false;
400 }
401 trade_state.insert(trade.coin.clone(), trade.tid);
402 true
403 };
404 if should_emit {
407 if let Some(store) = state_store {
408 let key = format!("last_trade_tid:{}", trade.coin);
409 if let Err(e) = store
410 .set(source_id, &key, trade.tid.to_le_bytes().to_vec())
411 .await
412 {
413 warn!("Failed to persist trade tid: {e}");
414 }
415 }
416 }
417
418 should_emit
419}
420
421async fn dispatch_changes(
422 source_id: &str,
423 dispatchers: &Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>,
424 changes: Vec<drasi_core::models::SourceChange>,
425) {
426 for change in changes {
427 let mut profiling = profiling::ProfilingMetadata::new();
428 profiling.source_send_ns = Some(profiling::timestamp_ns());
429
430 let wrapper = SourceEventWrapper::with_profiling(
431 source_id.to_string(),
432 SourceEvent::Change(change),
433 chrono::Utc::now(),
434 profiling,
435 );
436
437 if let Err(e) =
438 SourceBase::dispatch_from_task(dispatchers.clone(), wrapper, source_id).await
439 {
440 debug!("[{source_id}] Dispatch failed (no subscribers): {e}");
441 }
442 }
443}
444
445pub async fn run_funding_poll(params: FundingPollParams) -> Result<()> {
446 let FundingPollParams {
447 source_id,
448 rest_client,
449 config,
450 dispatchers,
451 state_store,
452 stream_state,
453 mut shutdown_rx,
454 start_timestamp,
455 } = params;
456 let interval_secs = config.funding_poll_interval_secs;
457 let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_secs));
458 let coin_filter: Option<HashSet<String>> = match &config.coins {
459 crate::config::CoinSelection::Specific { coins } => Some(coins.iter().cloned().collect()),
460 crate::config::CoinSelection::All => None,
461 };
462
463 loop {
464 tokio::select! {
465 _ = shutdown_rx.changed() => {
466 info!("Funding poll shutdown signal received");
467 break;
468 }
469 _ = interval.tick() => {
470 let timestamp = chrono::Utc::now().timestamp_millis();
471 if !passes_start_timestamp(start_timestamp, timestamp) {
472 continue;
473 }
474 match rest_client.fetch_meta_and_asset_ctxs().await {
475 Ok((meta, ctxs)) => {
476 let to_dispatch: Vec<(Vec<drasi_core::models::SourceChange>, String, FundingSnapshot)>;
479 {
480 let mut initialized = stream_state.initialized.write().await;
481 let mut funding_state = stream_state.funding_state.write().await;
482 let mut pending = Vec::new();
483
484 for (asset, ctx) in meta.universe.iter().zip(ctxs.iter()) {
485 if let Some(filter) = &coin_filter {
486 if !filter.contains(&asset.name) {
487 continue;
488 }
489 }
490
491 let (changes, snapshot) = map_funding_rate_to_changes(
492 &source_id,
493 &asset.name,
494 ctx,
495 &mut initialized,
496 timestamp,
497 )?;
498
499 if let Some(previous) = funding_state.get(&asset.name) {
500 if previous == &snapshot {
501 continue;
502 }
503 }
504
505 funding_state.insert(asset.name.clone(), snapshot.clone());
506 pending.push((changes, asset.name.clone(), snapshot));
507 }
508 to_dispatch = pending;
509 }
510 for (changes, coin, snapshot) in to_dispatch {
512 persist_funding_snapshot(&state_store, &source_id, &coin, &snapshot).await;
513 dispatch_changes(&source_id, &dispatchers, changes).await;
514 }
515 }
516 Err(e) => {
517 warn!("Funding poll failed: {e}");
518 }
519 }
520 }
521 }
522 }
523
524 Ok(())
525}
526
527async fn persist_funding_snapshot(
528 state_store: &Option<Arc<dyn StateStoreProvider>>,
529 source_id: &str,
530 coin: &str,
531 snapshot: &FundingSnapshot,
532) {
533 if let Some(store) = state_store {
534 let key = format!("funding_state:{coin}");
535 match serde_json::to_vec(snapshot) {
536 Ok(bytes) => {
537 if let Err(e) = store.set(source_id, &key, bytes).await {
538 warn!("Failed to persist funding snapshot: {e}");
539 }
540 }
541 Err(e) => warn!("Failed to serialize funding snapshot: {e}"),
542 }
543 }
544}