// Silences `unexpected_cfgs` for the whole crate.
// NOTE(review): presumably required by the `dynamic-plugin` feature gate and the
// `export_plugin!` expansion at the end of this file — confirm before removing.
#![allow(unexpected_cfgs)]
// Configuration types; `CoinSelection`, `HyperliquidNetwork`, `HyperliquidSourceConfig`
// and `InitialCursor` are re-exported from the crate root.
pub mod config;
// Provides `HyperliquidSourceDescriptor`, which is registered by the plugin export.
pub mod descriptor;
// `map_*` functions that turn Hyperliquid payloads into `SourceChange`s, plus
// `InitializedEntities`.
pub mod mapping;
// `HyperliquidRestClient`, used for coin resolution and bootstrap snapshots.
pub mod rest;
// WebSocket stream and funding-poll tasks, their parameter structs, the persisted-state
// loaders and `StreamState`.
pub mod stream;
// NOTE(review): nothing from this module is referenced directly in this file.
pub mod types;

// Test module, compiled only for test builds.
#[cfg(test)]
mod tests;
30
31pub use config::{CoinSelection, HyperliquidNetwork, HyperliquidSourceConfig, InitialCursor};
32
33use anyhow::Result;
34use async_trait::async_trait;
35use drasi_core::models::{Element, SourceChange};
36use drasi_lib::bootstrap::{
37 BootstrapContext, BootstrapProvider, BootstrapRequest, BootstrapResult,
38};
39use drasi_lib::channels::{
40 BootstrapEvent, BootstrapEventSender, ComponentStatus, DispatchMode, SubscriptionResponse,
41};
42use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
43use drasi_lib::state_store::{MemoryStateStoreProvider, StateStoreProvider};
44use drasi_lib::Source;
45use log::{info, warn};
46use std::collections::{HashMap, HashSet};
47use std::sync::Arc;
48use tokio::sync::{watch, RwLock};
49use tracing::Instrument;
50
51use crate::mapping::{
52 map_funding_rate_to_changes, map_meta_to_coin_changes, map_mid_prices_to_changes,
53 map_order_book_to_changes, map_spot_meta_to_nodes, InitializedEntities,
54};
55use crate::rest::HyperliquidRestClient;
56use crate::stream::{
57 load_funding_state, load_trade_dedupe_state, run_funding_poll, run_ws_stream,
58 FundingPollParams, StreamState, WsStreamParams,
59};
60
/// Drasi [`Source`] implementation backed by the Hyperliquid WebSocket stream and,
/// when funding rates are enabled, a REST funding poll.
///
/// Instances are created through [`HyperliquidSourceBuilder`].
pub struct HyperliquidSource {
    /// Shared source plumbing: id, status, dispatchers, subscriptions and bootstrap provider.
    base: SourceBase,
    /// Configuration captured at build time; cloned into the tasks spawned by `start`.
    config: HyperliquidSourceConfig,
    /// State store passed to the stream tasks and used to load trade-dedupe and funding
    /// state on start. `initialize` replaces it when the runtime context carries a store.
    state_store: Arc<RwLock<Option<Arc<dyn StateStoreProvider>>>>,
    /// State shared with the stream tasks (`trade_dedupe`, `funding_state`, `initialized`).
    stream_state: StreamState,
    /// Sending half of the shutdown signal; `stop` sends `true` on it.
    shutdown_tx: watch::Sender<bool>,
    /// Receiving half of the shutdown signal; cloned into every task spawned by `start`.
    shutdown_rx: watch::Receiver<bool>,
    /// Join handle of the supervising task spawned by `start`; taken and awaited by `stop`.
    task_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
}
71
impl HyperliquidSource {
    /// Returns a [`HyperliquidSourceBuilder`] for a source with the given `id`.
    ///
    /// Equivalent to calling [`HyperliquidSourceBuilder::new`] directly.
    pub fn builder(id: impl Into<String>) -> HyperliquidSourceBuilder {
        HyperliquidSourceBuilder::new(id)
    }
}
78
79#[async_trait]
80impl Source for HyperliquidSource {
81 fn id(&self) -> &str {
82 &self.base.id
83 }
84
85 fn type_name(&self) -> &str {
86 "hyperliquid"
87 }
88
89 fn properties(&self) -> HashMap<String, serde_json::Value> {
90 let mut props = HashMap::new();
91 props.insert(
92 "network".to_string(),
93 serde_json::Value::String(format!("{:?}", self.config.network)),
94 );
95
96 let coins_value = match &self.config.coins {
97 CoinSelection::Specific { coins } => serde_json::Value::Array(
98 coins
99 .iter()
100 .cloned()
101 .map(serde_json::Value::String)
102 .collect(),
103 ),
104 CoinSelection::All => serde_json::Value::String("all".to_string()),
105 };
106 props.insert("coins".to_string(), coins_value);
107
108 props.insert(
109 "enable_trades".to_string(),
110 serde_json::Value::Bool(self.config.enable_trades),
111 );
112 props.insert(
113 "enable_order_book".to_string(),
114 serde_json::Value::Bool(self.config.enable_order_book),
115 );
116 props.insert(
117 "enable_mid_prices".to_string(),
118 serde_json::Value::Bool(self.config.enable_mid_prices),
119 );
120 props.insert(
121 "enable_funding_rates".to_string(),
122 serde_json::Value::Bool(self.config.enable_funding_rates),
123 );
124 props.insert(
125 "enable_liquidations".to_string(),
126 serde_json::Value::Bool(self.config.enable_liquidations),
127 );
128 props.insert(
129 "funding_poll_interval_secs".to_string(),
130 serde_json::Value::Number(self.config.funding_poll_interval_secs.into()),
131 );
132 props.insert(
133 "initial_cursor".to_string(),
134 serde_json::to_value(&self.config.initial_cursor).unwrap_or_default(),
135 );
136
137 props
138 }
139
140 fn auto_start(&self) -> bool {
141 self.base.get_auto_start()
142 }
143
144 async fn start(&self) -> Result<()> {
145 if self.base.get_status().await == ComponentStatus::Running {
146 return Ok(());
147 }
148
149 self.base.set_status(ComponentStatus::Starting, None).await;
150 info!("Starting Hyperliquid source '{}'", self.base.id);
151
152 let config = self.config.clone();
153 config.validate()?;
154
155 let rest_client = HyperliquidRestClient::new(config.network.rest_url());
156 let coins = match &config.coins {
157 CoinSelection::Specific { coins } => coins.clone(),
158 CoinSelection::All => rest_client.resolve_all_coins().await?,
159 };
160
161 if coins.is_empty() {
162 warn!("Hyperliquid source '{}': resolved coin list is empty — source will produce no data", self.base.id);
163 }
164
165 let state_store = self.state_store.read().await.clone();
166 let trade_dedupe = load_trade_dedupe_state(&self.base.id, &state_store, &coins).await;
167 let funding_state = load_funding_state(&self.base.id, &state_store, &coins).await;
168
169 *self.stream_state.trade_dedupe.write().await = trade_dedupe;
170 *self.stream_state.funding_state.write().await = funding_state;
171
172 let source_id = self.base.id.clone();
173 let dispatchers = self.base.dispatchers.clone();
174 let stream_state = self.stream_state.clone();
175 let shutdown_rx = self.shutdown_rx.clone();
176 let start_timestamp = config.initial_cursor.start_timestamp();
177 let ws_url = config.network.ws_url();
178
179 let instance_id = self
180 .base
181 .context()
182 .await
183 .map(|c| c.instance_id)
184 .unwrap_or_default();
185
186 let span_source_id = source_id.clone();
187 let stream_task = tokio::spawn(
188 async move {
189 let ws_handle = tokio::spawn(run_ws_stream(WsStreamParams {
190 source_id: source_id.clone(),
191 ws_url,
192 config: config.clone(),
193 coins,
194 dispatchers: dispatchers.clone(),
195 state_store: state_store.clone(),
196 stream_state: stream_state.clone(),
197 shutdown_rx: shutdown_rx.clone(),
198 start_timestamp,
199 }));
200
201 let funding_handle = if config.enable_funding_rates {
202 let rest_client = HyperliquidRestClient::new(config.network.rest_url());
203 Some(tokio::spawn(run_funding_poll(FundingPollParams {
204 source_id: source_id.clone(),
205 rest_client,
206 config: config.clone(),
207 dispatchers: dispatchers.clone(),
208 state_store: state_store.clone(),
209 stream_state: stream_state.clone(),
210 shutdown_rx: shutdown_rx.clone(),
211 start_timestamp,
212 })))
213 } else {
214 None
215 };
216
217 match funding_handle {
218 Some(funding_h) => {
219 tokio::pin!(ws_handle);
221 tokio::pin!(funding_h);
222
223 tokio::select! {
224 result = &mut ws_handle => {
225 match result {
226 Ok(Ok(_)) => {}
227 Ok(Err(e)) => warn!("WebSocket stream error: {e}"),
228 Err(e) => warn!("WebSocket stream task panicked: {e}"),
229 }
230 funding_h.abort();
231 }
232 result = &mut funding_h => {
233 match result {
234 Ok(Ok(_)) => {}
235 Ok(Err(e)) => warn!("Funding poll error: {e}"),
236 Err(e) => warn!("Funding poll task panicked: {e}"),
237 }
238 ws_handle.abort();
239 }
240 }
241 }
242 None => match ws_handle.await {
243 Ok(Ok(_)) => {}
244 Ok(Err(e)) => warn!("WebSocket stream error: {e}"),
245 Err(e) => warn!("WebSocket stream task panicked: {e}"),
246 },
247 }
248 }
249 .instrument(tracing::info_span!(
250 "hyperliquid_source",
251 instance_id = %instance_id,
252 component_id = %span_source_id,
253 component_type = "source"
254 )),
255 );
256
257 *self.task_handle.write().await = Some(stream_task);
258 self.base.set_status(ComponentStatus::Running, None).await;
259
260 Ok(())
261 }
262
263 async fn stop(&self) -> Result<()> {
264 info!("Stopping Hyperliquid source '{}'", self.base.id);
265
266 if let Err(e) = self.shutdown_tx.send(true) {
267 warn!("Failed to send shutdown signal: {e}");
268 }
269
270 if let Some(handle) = self.task_handle.write().await.take() {
271 match tokio::time::timeout(std::time::Duration::from_secs(5), handle).await {
272 Ok(Ok(())) => {}
273 Ok(Err(e)) => warn!("Hyperliquid source task panicked: {e}"),
274 Err(_) => warn!("Hyperliquid source task did not stop within timeout"),
275 }
276 }
277
278 self.base.set_status(ComponentStatus::Stopped, None).await;
279 Ok(())
280 }
281
282 async fn status(&self) -> ComponentStatus {
283 self.base.get_status().await
284 }
285
286 async fn subscribe(
287 &self,
288 settings: drasi_lib::config::SourceSubscriptionSettings,
289 ) -> Result<SubscriptionResponse> {
290 self.base
291 .subscribe_with_bootstrap(&settings, "Hyperliquid")
292 .await
293 }
294
295 fn as_any(&self) -> &dyn std::any::Any {
296 self
297 }
298
299 async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
300 self.base.initialize(context.clone()).await;
301
302 if let Some(state_store) = context.state_store {
303 *self.state_store.write().await = Some(state_store);
304 }
305 }
306
307 async fn set_bootstrap_provider(&self, provider: Box<dyn BootstrapProvider + 'static>) {
308 self.base.set_bootstrap_provider(provider).await;
309 }
310}
311
/// Builder for [`HyperliquidSource`].
pub struct HyperliquidSourceBuilder {
    /// Id given to the source being built.
    id: String,
    /// Configuration being assembled; starts from `HyperliquidSourceConfig::default()`.
    config: HyperliquidSourceConfig,
    /// Optional dispatch mode forwarded to `SourceBaseParams`.
    dispatch_mode: Option<DispatchMode>,
    /// Optional dispatch buffer capacity forwarded to `SourceBaseParams`.
    dispatch_buffer_capacity: Option<usize>,
    /// Custom bootstrap provider; when absent `build` installs a
    /// [`HyperliquidBootstrapProvider`].
    bootstrap_provider: Option<Box<dyn BootstrapProvider + 'static>>,
    /// Auto-start flag forwarded to `SourceBaseParams`; `new` sets it to `true`.
    auto_start: bool,
    /// Optional state store; when absent `build` falls back to `MemoryStateStoreProvider`.
    state_store: Option<Arc<dyn StateStoreProvider>>,
}
322
323impl HyperliquidSourceBuilder {
324 pub fn new(id: impl Into<String>) -> Self {
325 Self {
326 id: id.into(),
327 config: HyperliquidSourceConfig::default(),
328 dispatch_mode: None,
329 dispatch_buffer_capacity: None,
330 bootstrap_provider: None,
331 auto_start: true,
332 state_store: None,
333 }
334 }
335
336 pub fn with_network(mut self, network: HyperliquidNetwork) -> Self {
337 self.config.network = network;
338 self
339 }
340
341 pub fn with_coins(mut self, coins: Vec<impl Into<String>>) -> Self {
342 self.config.coins = CoinSelection::Specific {
343 coins: coins.into_iter().map(Into::into).collect(),
344 };
345 self
346 }
347
348 pub fn with_all_coins(mut self) -> Self {
349 self.config.coins = CoinSelection::All;
350 self
351 }
352
353 pub fn with_trades(mut self, enabled: bool) -> Self {
354 self.config.enable_trades = enabled;
355 self
356 }
357
358 pub fn with_order_book(mut self, enabled: bool) -> Self {
359 self.config.enable_order_book = enabled;
360 self
361 }
362
363 pub fn with_mid_prices(mut self, enabled: bool) -> Self {
364 self.config.enable_mid_prices = enabled;
365 self
366 }
367
368 pub fn with_funding_rates(mut self, enabled: bool) -> Self {
369 self.config.enable_funding_rates = enabled;
370 self
371 }
372
373 pub fn with_liquidations(mut self, enabled: bool) -> Self {
374 self.config.enable_liquidations = enabled;
375 self
376 }
377
378 pub fn with_funding_poll_interval_secs(mut self, interval_secs: u64) -> Self {
379 self.config.funding_poll_interval_secs = interval_secs;
380 self
381 }
382
383 pub fn start_from_beginning(mut self) -> Self {
384 self.config.initial_cursor = InitialCursor::StartFromBeginning;
385 self
386 }
387
388 pub fn start_from_now(mut self) -> Self {
389 self.config.initial_cursor = InitialCursor::StartFromNow;
390 self
391 }
392
393 pub fn start_from_timestamp(mut self, timestamp: i64) -> Self {
394 self.config.initial_cursor = InitialCursor::StartFromTimestamp { timestamp };
395 self
396 }
397
398 pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
399 self.dispatch_mode = Some(mode);
400 self
401 }
402
403 pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
404 self.dispatch_buffer_capacity = Some(capacity);
405 self
406 }
407
408 pub fn with_bootstrap_provider(mut self, provider: impl BootstrapProvider + 'static) -> Self {
409 self.bootstrap_provider = Some(Box::new(provider));
410 self
411 }
412
413 pub fn with_auto_start(mut self, auto_start: bool) -> Self {
414 self.auto_start = auto_start;
415 self
416 }
417
418 pub fn with_state_store(mut self, state_store: Arc<dyn StateStoreProvider>) -> Self {
419 self.state_store = Some(state_store);
420 self
421 }
422
423 pub fn build(self) -> Result<HyperliquidSource> {
424 self.config.validate()?;
425
426 let state_store = self
427 .state_store
428 .unwrap_or_else(|| Arc::new(MemoryStateStoreProvider::new()));
429
430 let stream_state = StreamState::default();
431
432 let mut params = SourceBaseParams::new(&self.id).with_auto_start(self.auto_start);
433 if let Some(mode) = self.dispatch_mode {
434 params = params.with_dispatch_mode(mode);
435 }
436 if let Some(capacity) = self.dispatch_buffer_capacity {
437 params = params.with_dispatch_buffer_capacity(capacity);
438 }
439 if let Some(provider) = self.bootstrap_provider {
440 params = params.with_bootstrap_provider(provider);
441 } else {
442 params =
445 params.with_bootstrap_provider(HyperliquidBootstrapProvider::with_stream_state(
446 self.config.clone(),
447 stream_state.initialized.clone(),
448 ));
449 }
450 params = params.with_state_store(state_store.clone());
451
452 let (shutdown_tx, shutdown_rx) = watch::channel(false);
453
454 Ok(HyperliquidSource {
455 base: SourceBase::new(params)?,
456 config: self.config,
457 state_store: Arc::new(RwLock::new(Some(state_store))),
458 stream_state,
459 shutdown_tx,
460 shutdown_rx,
461 task_handle: Arc::new(RwLock::new(None)),
462 })
463 }
464}
465
/// Bootstrap provider that snapshots Hyperliquid state through the REST API.
pub struct HyperliquidBootstrapProvider {
    /// Configuration deciding the network, the coin filter and which data sets are fetched.
    config: HyperliquidSourceConfig,
    /// When set, `bootstrap` overwrites this shared value with the entities it initialized.
    stream_initialized: Option<Arc<RwLock<InitializedEntities>>>,
}
471
impl HyperliquidBootstrapProvider {
    /// Creates a provider that is not linked to any stream state: the entities
    /// initialized during bootstrap are not published anywhere.
    pub fn new(config: HyperliquidSourceConfig) -> Self {
        Self {
            config,
            stream_initialized: None,
        }
    }

    /// Creates a provider that writes the entities initialized during bootstrap into
    /// `stream_initialized`, replacing its previous contents.
    pub fn with_stream_state(
        config: HyperliquidSourceConfig,
        stream_initialized: Arc<RwLock<InitializedEntities>>,
    ) -> Self {
        Self {
            config,
            stream_initialized: Some(stream_initialized),
        }
    }
}
493
494#[async_trait]
495impl BootstrapProvider for HyperliquidBootstrapProvider {
496 async fn bootstrap(
497 &self,
498 request: BootstrapRequest,
499 context: &BootstrapContext,
500 event_tx: BootstrapEventSender,
501 _settings: Option<&drasi_lib::config::SourceSubscriptionSettings>,
502 ) -> Result<BootstrapResult> {
503 let rest_client = HyperliquidRestClient::new(self.config.network.rest_url());
504 let mut node_changes: Vec<SourceChange> = Vec::new();
505 let mut relation_changes: Vec<SourceChange> = Vec::new();
506 let mut existing_coins: HashSet<String> = HashSet::new();
507 let mut initialized = InitializedEntities::new();
508
509 let meta = rest_client.fetch_meta().await?;
510 for asset in &meta.universe {
511 existing_coins.insert(asset.name.clone());
512 }
513 node_changes.extend(map_meta_to_coin_changes(&context.source_id, &meta)?);
514
515 let spot_meta = rest_client.fetch_spot_meta().await?;
516 node_changes.extend(map_spot_meta_to_nodes(
517 &context.source_id,
518 &spot_meta,
519 &mut existing_coins,
520 )?);
521
522 let coin_filter: Option<HashSet<String>> = match &self.config.coins {
523 CoinSelection::Specific { coins } => Some(coins.iter().cloned().collect()),
524 CoinSelection::All => None,
525 };
526
527 let timestamp = chrono::Utc::now().timestamp_millis();
528
529 if self.config.enable_mid_prices {
530 let all_mids = rest_client.fetch_all_mids().await?;
531 let filtered_mids: HashMap<String, String> = if let Some(filter) = &coin_filter {
532 all_mids
533 .into_iter()
534 .filter(|(coin, _)| filter.contains(coin))
535 .collect()
536 } else {
537 all_mids
538 };
539
540 split_changes(
541 map_mid_prices_to_changes(
542 &context.source_id,
543 &filtered_mids,
544 &mut initialized,
545 timestamp,
546 )?,
547 &mut node_changes,
548 &mut relation_changes,
549 );
550 }
551
552 if self.config.enable_funding_rates {
553 let (meta_from_ctx, asset_ctxs) = rest_client.fetch_meta_and_asset_ctxs().await?;
554 for (asset, ctx) in meta_from_ctx.universe.iter().zip(asset_ctxs.iter()) {
555 if let Some(filter) = &coin_filter {
556 if !filter.contains(&asset.name) {
557 continue;
558 }
559 }
560
561 let (changes, _snapshot) = map_funding_rate_to_changes(
562 &context.source_id,
563 &asset.name,
564 ctx,
565 &mut initialized,
566 timestamp,
567 )?;
568 split_changes(changes, &mut node_changes, &mut relation_changes);
569 }
570 }
571
572 if self.config.enable_order_book {
573 let l2_coins: Vec<String> = match &self.config.coins {
574 CoinSelection::Specific { coins } => coins.clone(),
575 CoinSelection::All => meta
576 .universe
577 .iter()
578 .map(|asset| asset.name.clone())
579 .collect(),
580 };
581
582 for coin in &l2_coins {
583 let book = rest_client.fetch_l2_book(coin).await?;
584 let changes =
585 map_order_book_to_changes(&context.source_id, &book, &mut initialized)?;
586 split_changes(changes, &mut node_changes, &mut relation_changes);
587 }
588 }
589
590 if let Some(shared) = &self.stream_initialized {
594 *shared.write().await = initialized;
595 }
596
597 let node_labels = &request.node_labels;
598 let relation_labels = &request.relation_labels;
599
600 let mut sequence = 0u64;
601 for change in node_changes
602 .into_iter()
603 .filter(|change| matches_labels(change, node_labels, relation_labels))
604 {
605 sequence += 1;
606 event_tx
607 .send(BootstrapEvent {
608 source_id: context.source_id.clone(),
609 change,
610 timestamp: chrono::Utc::now(),
611 sequence,
612 })
613 .await
614 .map_err(|e| anyhow::anyhow!("Failed to send bootstrap event: {e}"))?;
615 }
616
617 for change in relation_changes
618 .into_iter()
619 .filter(|change| matches_labels(change, node_labels, relation_labels))
620 {
621 sequence += 1;
622 event_tx
623 .send(BootstrapEvent {
624 source_id: context.source_id.clone(),
625 change,
626 timestamp: chrono::Utc::now(),
627 sequence,
628 })
629 .await
630 .map_err(|e| anyhow::anyhow!("Failed to send bootstrap event: {e}"))?;
631 }
632
633 Ok(BootstrapResult {
634 event_count: sequence as usize,
635 ..BootstrapResult::default()
636 })
637 }
638}
639
640fn split_changes(
641 changes: Vec<SourceChange>,
642 nodes: &mut Vec<SourceChange>,
643 relations: &mut Vec<SourceChange>,
644) {
645 for change in changes {
646 match &change {
647 SourceChange::Insert { element } | SourceChange::Update { element } => match element {
648 Element::Node { .. } => nodes.push(change),
649 Element::Relation { .. } => relations.push(change),
650 },
651 _ => nodes.push(change),
652 }
653 }
654}
655
656fn matches_labels(
657 change: &SourceChange,
658 node_labels: &[String],
659 relation_labels: &[String],
660) -> bool {
661 match change {
662 SourceChange::Insert { element } | SourceChange::Update { element } => match element {
663 Element::Node { metadata, .. } => labels_match(&metadata.labels, node_labels),
664 Element::Relation { metadata, .. } => labels_match(&metadata.labels, relation_labels),
665 },
666 _ => true,
667 }
668}
669
/// Returns `true` when `requested` is empty (no filtering) or when at least one entry of
/// `labels` equals one of the `requested` labels.
fn labels_match(labels: &Arc<[Arc<str>]>, requested: &[String]) -> bool {
    requested.is_empty()
        || requested
            .iter()
            .any(|wanted| labels.iter().any(|label| &**label == wanted.as_str()))
}
679
// Plugin export, compiled only when the `dynamic-plugin` feature is enabled. It registers
// the Hyperliquid source descriptor and no reaction or bootstrap descriptors.
// NOTE(review): `core_version`, `lib_version` and `plugin_version` are all taken from this
// crate's own `CARGO_PKG_VERSION` — confirm that is what the plugin SDK expects.
#[cfg(feature = "dynamic-plugin")]
drasi_plugin_sdk::export_plugin!(
    plugin_id = "hyperliquid-source",
    core_version = env!("CARGO_PKG_VERSION"),
    lib_version = env!("CARGO_PKG_VERSION"),
    plugin_version = env!("CARGO_PKG_VERSION"),
    source_descriptors = [descriptor::HyperliquidSourceDescriptor],
    reaction_descriptors = [],
    bootstrap_descriptors = [],
);