Skip to main content

drasi_source_hyperliquid/
lib.rs

1#![allow(unexpected_cfgs)]
2// Copyright 2025 The Drasi Authors.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Hyperliquid market data source plugin for Drasi.
17//!
18//! This source connects to Hyperliquid's public REST + WebSocket APIs to stream
19//! real-time DeFi market data into Drasi as graph nodes and relationships.
20
21pub mod config;
22pub mod descriptor;
23pub mod mapping;
24pub mod rest;
25pub mod stream;
26pub mod types;
27
28#[cfg(test)]
29mod tests;
30
31pub use config::{CoinSelection, HyperliquidNetwork, HyperliquidSourceConfig, InitialCursor};
32
33use anyhow::Result;
34use async_trait::async_trait;
35use drasi_core::models::{Element, SourceChange};
36use drasi_lib::bootstrap::{
37    BootstrapContext, BootstrapProvider, BootstrapRequest, BootstrapResult,
38};
39use drasi_lib::channels::{
40    BootstrapEvent, BootstrapEventSender, ComponentStatus, DispatchMode, SubscriptionResponse,
41};
42use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
43use drasi_lib::state_store::{MemoryStateStoreProvider, StateStoreProvider};
44use drasi_lib::Source;
45use log::{info, warn};
46use std::collections::{HashMap, HashSet};
47use std::sync::Arc;
48use tokio::sync::{watch, RwLock};
49use tracing::Instrument;
50
51use crate::mapping::{
52    map_funding_rate_to_changes, map_meta_to_coin_changes, map_mid_prices_to_changes,
53    map_order_book_to_changes, map_spot_meta_to_nodes, InitializedEntities,
54};
55use crate::rest::HyperliquidRestClient;
56use crate::stream::{
57    load_funding_state, load_trade_dedupe_state, run_funding_poll, run_ws_stream,
58    FundingPollParams, StreamState, WsStreamParams,
59};
60
61/// Hyperliquid market data source.
62pub struct HyperliquidSource {
63    base: SourceBase,
64    config: HyperliquidSourceConfig,
65    state_store: Arc<RwLock<Option<Arc<dyn StateStoreProvider>>>>,
66    stream_state: StreamState,
67    shutdown_tx: watch::Sender<bool>,
68    shutdown_rx: watch::Receiver<bool>,
69    task_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
70}
71
72impl HyperliquidSource {
73    /// Create a builder for HyperliquidSource.
74    pub fn builder(id: impl Into<String>) -> HyperliquidSourceBuilder {
75        HyperliquidSourceBuilder::new(id)
76    }
77}
78
79#[async_trait]
80impl Source for HyperliquidSource {
81    fn id(&self) -> &str {
82        &self.base.id
83    }
84
85    fn type_name(&self) -> &str {
86        "hyperliquid"
87    }
88
89    fn properties(&self) -> HashMap<String, serde_json::Value> {
90        let mut props = HashMap::new();
91        props.insert(
92            "network".to_string(),
93            serde_json::Value::String(format!("{:?}", self.config.network)),
94        );
95
96        let coins_value = match &self.config.coins {
97            CoinSelection::Specific { coins } => serde_json::Value::Array(
98                coins
99                    .iter()
100                    .cloned()
101                    .map(serde_json::Value::String)
102                    .collect(),
103            ),
104            CoinSelection::All => serde_json::Value::String("all".to_string()),
105        };
106        props.insert("coins".to_string(), coins_value);
107
108        props.insert(
109            "enable_trades".to_string(),
110            serde_json::Value::Bool(self.config.enable_trades),
111        );
112        props.insert(
113            "enable_order_book".to_string(),
114            serde_json::Value::Bool(self.config.enable_order_book),
115        );
116        props.insert(
117            "enable_mid_prices".to_string(),
118            serde_json::Value::Bool(self.config.enable_mid_prices),
119        );
120        props.insert(
121            "enable_funding_rates".to_string(),
122            serde_json::Value::Bool(self.config.enable_funding_rates),
123        );
124        props.insert(
125            "enable_liquidations".to_string(),
126            serde_json::Value::Bool(self.config.enable_liquidations),
127        );
128        props.insert(
129            "funding_poll_interval_secs".to_string(),
130            serde_json::Value::Number(self.config.funding_poll_interval_secs.into()),
131        );
132        props.insert(
133            "initial_cursor".to_string(),
134            serde_json::to_value(&self.config.initial_cursor).unwrap_or_default(),
135        );
136
137        props
138    }
139
140    fn auto_start(&self) -> bool {
141        self.base.get_auto_start()
142    }
143
144    async fn start(&self) -> Result<()> {
145        if self.base.get_status().await == ComponentStatus::Running {
146            return Ok(());
147        }
148
149        self.base.set_status(ComponentStatus::Starting, None).await;
150        info!("Starting Hyperliquid source '{}'", self.base.id);
151
152        let config = self.config.clone();
153        config.validate()?;
154
155        let rest_client = HyperliquidRestClient::new(config.network.rest_url());
156        let coins = match &config.coins {
157            CoinSelection::Specific { coins } => coins.clone(),
158            CoinSelection::All => rest_client.resolve_all_coins().await?,
159        };
160
161        if coins.is_empty() {
162            warn!("Hyperliquid source '{}': resolved coin list is empty — source will produce no data", self.base.id);
163        }
164
165        let state_store = self.state_store.read().await.clone();
166        let trade_dedupe = load_trade_dedupe_state(&self.base.id, &state_store, &coins).await;
167        let funding_state = load_funding_state(&self.base.id, &state_store, &coins).await;
168
169        *self.stream_state.trade_dedupe.write().await = trade_dedupe;
170        *self.stream_state.funding_state.write().await = funding_state;
171
172        let source_id = self.base.id.clone();
173        let dispatchers = self.base.dispatchers.clone();
174        let stream_state = self.stream_state.clone();
175        let shutdown_rx = self.shutdown_rx.clone();
176        let start_timestamp = config.initial_cursor.start_timestamp();
177        let ws_url = config.network.ws_url();
178
179        let instance_id = self
180            .base
181            .context()
182            .await
183            .map(|c| c.instance_id)
184            .unwrap_or_default();
185
186        let span_source_id = source_id.clone();
187        let stream_task = tokio::spawn(
188            async move {
189                let ws_handle = tokio::spawn(run_ws_stream(WsStreamParams {
190                    source_id: source_id.clone(),
191                    ws_url,
192                    config: config.clone(),
193                    coins,
194                    dispatchers: dispatchers.clone(),
195                    state_store: state_store.clone(),
196                    stream_state: stream_state.clone(),
197                    shutdown_rx: shutdown_rx.clone(),
198                    start_timestamp,
199                }));
200
201                let funding_handle = if config.enable_funding_rates {
202                    let rest_client = HyperliquidRestClient::new(config.network.rest_url());
203                    Some(tokio::spawn(run_funding_poll(FundingPollParams {
204                        source_id: source_id.clone(),
205                        rest_client,
206                        config: config.clone(),
207                        dispatchers: dispatchers.clone(),
208                        state_store: state_store.clone(),
209                        stream_state: stream_state.clone(),
210                        shutdown_rx: shutdown_rx.clone(),
211                        start_timestamp,
212                    })))
213                } else {
214                    None
215                };
216
217                match funding_handle {
218                    Some(funding_h) => {
219                        // Pin both handles so select! can poll without consuming.
220                        tokio::pin!(ws_handle);
221                        tokio::pin!(funding_h);
222
223                        tokio::select! {
224                            result = &mut ws_handle => {
225                                match result {
226                                    Ok(Ok(_)) => {}
227                                    Ok(Err(e)) => warn!("WebSocket stream error: {e}"),
228                                    Err(e) => warn!("WebSocket stream task panicked: {e}"),
229                                }
230                                funding_h.abort();
231                            }
232                            result = &mut funding_h => {
233                                match result {
234                                    Ok(Ok(_)) => {}
235                                    Ok(Err(e)) => warn!("Funding poll error: {e}"),
236                                    Err(e) => warn!("Funding poll task panicked: {e}"),
237                                }
238                                ws_handle.abort();
239                            }
240                        }
241                    }
242                    None => match ws_handle.await {
243                        Ok(Ok(_)) => {}
244                        Ok(Err(e)) => warn!("WebSocket stream error: {e}"),
245                        Err(e) => warn!("WebSocket stream task panicked: {e}"),
246                    },
247                }
248            }
249            .instrument(tracing::info_span!(
250                "hyperliquid_source",
251                instance_id = %instance_id,
252                component_id = %span_source_id,
253                component_type = "source"
254            )),
255        );
256
257        *self.task_handle.write().await = Some(stream_task);
258        self.base.set_status(ComponentStatus::Running, None).await;
259
260        Ok(())
261    }
262
263    async fn stop(&self) -> Result<()> {
264        info!("Stopping Hyperliquid source '{}'", self.base.id);
265
266        if let Err(e) = self.shutdown_tx.send(true) {
267            warn!("Failed to send shutdown signal: {e}");
268        }
269
270        if let Some(handle) = self.task_handle.write().await.take() {
271            match tokio::time::timeout(std::time::Duration::from_secs(5), handle).await {
272                Ok(Ok(())) => {}
273                Ok(Err(e)) => warn!("Hyperliquid source task panicked: {e}"),
274                Err(_) => warn!("Hyperliquid source task did not stop within timeout"),
275            }
276        }
277
278        self.base.set_status(ComponentStatus::Stopped, None).await;
279        Ok(())
280    }
281
282    async fn status(&self) -> ComponentStatus {
283        self.base.get_status().await
284    }
285
286    async fn subscribe(
287        &self,
288        settings: drasi_lib::config::SourceSubscriptionSettings,
289    ) -> Result<SubscriptionResponse> {
290        self.base
291            .subscribe_with_bootstrap(&settings, "Hyperliquid")
292            .await
293    }
294
295    fn as_any(&self) -> &dyn std::any::Any {
296        self
297    }
298
299    async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
300        self.base.initialize(context.clone()).await;
301
302        if let Some(state_store) = context.state_store {
303            *self.state_store.write().await = Some(state_store);
304        }
305    }
306
307    async fn set_bootstrap_provider(&self, provider: Box<dyn BootstrapProvider + 'static>) {
308        self.base.set_bootstrap_provider(provider).await;
309    }
310}
311
312/// Builder for HyperliquidSource.
313pub struct HyperliquidSourceBuilder {
314    id: String,
315    config: HyperliquidSourceConfig,
316    dispatch_mode: Option<DispatchMode>,
317    dispatch_buffer_capacity: Option<usize>,
318    bootstrap_provider: Option<Box<dyn BootstrapProvider + 'static>>,
319    auto_start: bool,
320    state_store: Option<Arc<dyn StateStoreProvider>>,
321}
322
323impl HyperliquidSourceBuilder {
324    pub fn new(id: impl Into<String>) -> Self {
325        Self {
326            id: id.into(),
327            config: HyperliquidSourceConfig::default(),
328            dispatch_mode: None,
329            dispatch_buffer_capacity: None,
330            bootstrap_provider: None,
331            auto_start: true,
332            state_store: None,
333        }
334    }
335
336    pub fn with_network(mut self, network: HyperliquidNetwork) -> Self {
337        self.config.network = network;
338        self
339    }
340
341    pub fn with_coins(mut self, coins: Vec<impl Into<String>>) -> Self {
342        self.config.coins = CoinSelection::Specific {
343            coins: coins.into_iter().map(Into::into).collect(),
344        };
345        self
346    }
347
348    pub fn with_all_coins(mut self) -> Self {
349        self.config.coins = CoinSelection::All;
350        self
351    }
352
353    pub fn with_trades(mut self, enabled: bool) -> Self {
354        self.config.enable_trades = enabled;
355        self
356    }
357
358    pub fn with_order_book(mut self, enabled: bool) -> Self {
359        self.config.enable_order_book = enabled;
360        self
361    }
362
363    pub fn with_mid_prices(mut self, enabled: bool) -> Self {
364        self.config.enable_mid_prices = enabled;
365        self
366    }
367
368    pub fn with_funding_rates(mut self, enabled: bool) -> Self {
369        self.config.enable_funding_rates = enabled;
370        self
371    }
372
373    pub fn with_liquidations(mut self, enabled: bool) -> Self {
374        self.config.enable_liquidations = enabled;
375        self
376    }
377
378    pub fn with_funding_poll_interval_secs(mut self, interval_secs: u64) -> Self {
379        self.config.funding_poll_interval_secs = interval_secs;
380        self
381    }
382
383    pub fn start_from_beginning(mut self) -> Self {
384        self.config.initial_cursor = InitialCursor::StartFromBeginning;
385        self
386    }
387
388    pub fn start_from_now(mut self) -> Self {
389        self.config.initial_cursor = InitialCursor::StartFromNow;
390        self
391    }
392
393    pub fn start_from_timestamp(mut self, timestamp: i64) -> Self {
394        self.config.initial_cursor = InitialCursor::StartFromTimestamp { timestamp };
395        self
396    }
397
398    pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
399        self.dispatch_mode = Some(mode);
400        self
401    }
402
403    pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
404        self.dispatch_buffer_capacity = Some(capacity);
405        self
406    }
407
408    pub fn with_bootstrap_provider(mut self, provider: impl BootstrapProvider + 'static) -> Self {
409        self.bootstrap_provider = Some(Box::new(provider));
410        self
411    }
412
413    pub fn with_auto_start(mut self, auto_start: bool) -> Self {
414        self.auto_start = auto_start;
415        self
416    }
417
418    pub fn with_state_store(mut self, state_store: Arc<dyn StateStoreProvider>) -> Self {
419        self.state_store = Some(state_store);
420        self
421    }
422
423    pub fn build(self) -> Result<HyperliquidSource> {
424        self.config.validate()?;
425
426        let state_store = self
427            .state_store
428            .unwrap_or_else(|| Arc::new(MemoryStateStoreProvider::new()));
429
430        let stream_state = StreamState::default();
431
432        let mut params = SourceBaseParams::new(&self.id).with_auto_start(self.auto_start);
433        if let Some(mode) = self.dispatch_mode {
434            params = params.with_dispatch_mode(mode);
435        }
436        if let Some(capacity) = self.dispatch_buffer_capacity {
437            params = params.with_dispatch_buffer_capacity(capacity);
438        }
439        if let Some(provider) = self.bootstrap_provider {
440            params = params.with_bootstrap_provider(provider);
441        } else {
442            // Default bootstrap provider shares initialized-entity state with
443            // the streaming layer to avoid duplicate Insert events.
444            params =
445                params.with_bootstrap_provider(HyperliquidBootstrapProvider::with_stream_state(
446                    self.config.clone(),
447                    stream_state.initialized.clone(),
448                ));
449        }
450        params = params.with_state_store(state_store.clone());
451
452        let (shutdown_tx, shutdown_rx) = watch::channel(false);
453
454        Ok(HyperliquidSource {
455            base: SourceBase::new(params)?,
456            config: self.config,
457            state_store: Arc::new(RwLock::new(Some(state_store))),
458            stream_state,
459            shutdown_tx,
460            shutdown_rx,
461            task_handle: Arc::new(RwLock::new(None)),
462        })
463    }
464}
465
466/// Bootstrap provider for Hyperliquid market data.
467pub struct HyperliquidBootstrapProvider {
468    config: HyperliquidSourceConfig,
469    stream_initialized: Option<Arc<RwLock<InitializedEntities>>>,
470}
471
472impl HyperliquidBootstrapProvider {
473    pub fn new(config: HyperliquidSourceConfig) -> Self {
474        Self {
475            config,
476            stream_initialized: None,
477        }
478    }
479
480    /// Create a bootstrap provider that shares initialized-entity tracking with
481    /// the streaming layer so that singletons bootstrapped as Inserts are
482    /// subsequently emitted as Updates (not duplicate Inserts) by the stream.
483    pub fn with_stream_state(
484        config: HyperliquidSourceConfig,
485        stream_initialized: Arc<RwLock<InitializedEntities>>,
486    ) -> Self {
487        Self {
488            config,
489            stream_initialized: Some(stream_initialized),
490        }
491    }
492}
493
494#[async_trait]
495impl BootstrapProvider for HyperliquidBootstrapProvider {
496    async fn bootstrap(
497        &self,
498        request: BootstrapRequest,
499        context: &BootstrapContext,
500        event_tx: BootstrapEventSender,
501        _settings: Option<&drasi_lib::config::SourceSubscriptionSettings>,
502    ) -> Result<BootstrapResult> {
503        let rest_client = HyperliquidRestClient::new(self.config.network.rest_url());
504        let mut node_changes: Vec<SourceChange> = Vec::new();
505        let mut relation_changes: Vec<SourceChange> = Vec::new();
506        let mut existing_coins: HashSet<String> = HashSet::new();
507        let mut initialized = InitializedEntities::new();
508
509        let meta = rest_client.fetch_meta().await?;
510        for asset in &meta.universe {
511            existing_coins.insert(asset.name.clone());
512        }
513        node_changes.extend(map_meta_to_coin_changes(&context.source_id, &meta)?);
514
515        let spot_meta = rest_client.fetch_spot_meta().await?;
516        node_changes.extend(map_spot_meta_to_nodes(
517            &context.source_id,
518            &spot_meta,
519            &mut existing_coins,
520        )?);
521
522        let coin_filter: Option<HashSet<String>> = match &self.config.coins {
523            CoinSelection::Specific { coins } => Some(coins.iter().cloned().collect()),
524            CoinSelection::All => None,
525        };
526
527        let timestamp = chrono::Utc::now().timestamp_millis();
528
529        if self.config.enable_mid_prices {
530            let all_mids = rest_client.fetch_all_mids().await?;
531            let filtered_mids: HashMap<String, String> = if let Some(filter) = &coin_filter {
532                all_mids
533                    .into_iter()
534                    .filter(|(coin, _)| filter.contains(coin))
535                    .collect()
536            } else {
537                all_mids
538            };
539
540            split_changes(
541                map_mid_prices_to_changes(
542                    &context.source_id,
543                    &filtered_mids,
544                    &mut initialized,
545                    timestamp,
546                )?,
547                &mut node_changes,
548                &mut relation_changes,
549            );
550        }
551
552        if self.config.enable_funding_rates {
553            let (meta_from_ctx, asset_ctxs) = rest_client.fetch_meta_and_asset_ctxs().await?;
554            for (asset, ctx) in meta_from_ctx.universe.iter().zip(asset_ctxs.iter()) {
555                if let Some(filter) = &coin_filter {
556                    if !filter.contains(&asset.name) {
557                        continue;
558                    }
559                }
560
561                let (changes, _snapshot) = map_funding_rate_to_changes(
562                    &context.source_id,
563                    &asset.name,
564                    ctx,
565                    &mut initialized,
566                    timestamp,
567                )?;
568                split_changes(changes, &mut node_changes, &mut relation_changes);
569            }
570        }
571
572        if self.config.enable_order_book {
573            let l2_coins: Vec<String> = match &self.config.coins {
574                CoinSelection::Specific { coins } => coins.clone(),
575                CoinSelection::All => meta
576                    .universe
577                    .iter()
578                    .map(|asset| asset.name.clone())
579                    .collect(),
580            };
581
582            for coin in &l2_coins {
583                let book = rest_client.fetch_l2_book(coin).await?;
584                let changes =
585                    map_order_book_to_changes(&context.source_id, &book, &mut initialized)?;
586                split_changes(changes, &mut node_changes, &mut relation_changes);
587            }
588        }
589
590        // Propagate bootstrap-initialized state to the streaming layer so that
591        // singleton entities (MidPrice, OrderBook, FundingRate) that were
592        // bootstrapped as Inserts will be emitted as Updates by the stream.
593        if let Some(shared) = &self.stream_initialized {
594            *shared.write().await = initialized;
595        }
596
597        let node_labels = &request.node_labels;
598        let relation_labels = &request.relation_labels;
599
600        let mut sequence = 0u64;
601        for change in node_changes
602            .into_iter()
603            .filter(|change| matches_labels(change, node_labels, relation_labels))
604        {
605            sequence += 1;
606            event_tx
607                .send(BootstrapEvent {
608                    source_id: context.source_id.clone(),
609                    change,
610                    timestamp: chrono::Utc::now(),
611                    sequence,
612                })
613                .await
614                .map_err(|e| anyhow::anyhow!("Failed to send bootstrap event: {e}"))?;
615        }
616
617        for change in relation_changes
618            .into_iter()
619            .filter(|change| matches_labels(change, node_labels, relation_labels))
620        {
621            sequence += 1;
622            event_tx
623                .send(BootstrapEvent {
624                    source_id: context.source_id.clone(),
625                    change,
626                    timestamp: chrono::Utc::now(),
627                    sequence,
628                })
629                .await
630                .map_err(|e| anyhow::anyhow!("Failed to send bootstrap event: {e}"))?;
631        }
632
633        Ok(BootstrapResult {
634            event_count: sequence as usize,
635            ..BootstrapResult::default()
636        })
637    }
638}
639
640fn split_changes(
641    changes: Vec<SourceChange>,
642    nodes: &mut Vec<SourceChange>,
643    relations: &mut Vec<SourceChange>,
644) {
645    for change in changes {
646        match &change {
647            SourceChange::Insert { element } | SourceChange::Update { element } => match element {
648                Element::Node { .. } => nodes.push(change),
649                Element::Relation { .. } => relations.push(change),
650            },
651            _ => nodes.push(change),
652        }
653    }
654}
655
656fn matches_labels(
657    change: &SourceChange,
658    node_labels: &[String],
659    relation_labels: &[String],
660) -> bool {
661    match change {
662        SourceChange::Insert { element } | SourceChange::Update { element } => match element {
663            Element::Node { metadata, .. } => labels_match(&metadata.labels, node_labels),
664            Element::Relation { metadata, .. } => labels_match(&metadata.labels, relation_labels),
665        },
666        _ => true,
667    }
668}
669
/// Returns `true` when `requested` is empty, or when at least one requested
/// label equals one of `labels`.
fn labels_match(labels: &Arc<[Arc<str>]>, requested: &[String]) -> bool {
    // An empty request acts as a wildcard.
    requested.is_empty()
        || requested
            .iter()
            .any(|wanted| labels.iter().any(|label| &**label == wanted.as_str()))
}
679
/// Dynamic plugin entry point.
// Compiled only when the `dynamic-plugin` feature is enabled. The plugin is
// exported under the id "hyperliquid-source" with `HyperliquidSourceDescriptor`
// as its only source descriptor and no reaction or bootstrap descriptors.
// All three version fields are taken from this crate's CARGO_PKG_VERSION at
// compile time.
#[cfg(feature = "dynamic-plugin")]
drasi_plugin_sdk::export_plugin!(
    plugin_id = "hyperliquid-source",
    core_version = env!("CARGO_PKG_VERSION"),
    lib_version = env!("CARGO_PKG_VERSION"),
    plugin_version = env!("CARGO_PKG_VERSION"),
    source_descriptors = [descriptor::HyperliquidSourceDescriptor],
    reaction_descriptors = [],
    bootstrap_descriptors = [],
);