drasi-source-hyperliquid 0.1.2

Hyperliquid market data source plugin for Drasi
Documentation
use anyhow::Result;
use drasi_lib::channels::ResultDiff;
use drasi_lib::{DrasiLib, Query};
use drasi_reaction_application::{subscription::SubscriptionOptions, ApplicationReaction};
use drasi_source_hyperliquid::{
    HyperliquidBootstrapProvider, HyperliquidNetwork, HyperliquidSource,
};
use std::time::{Duration, Instant};

/// Identifier given to the Hyperliquid source; both queries attach to it via `from_source`.
const SOURCE_ID: &str = "hl-test";
/// Identifier of the query that returns the BTC mid price together with its coin node.
const PRICE_QUERY: &str = "price-query";
/// Identifier of the query that returns BTC trades.
const TRADE_QUERY: &str = "latest-trade";

#[tokio::test]
#[ignore]
async fn test_hyperliquid_source_live() -> Result<()> {
    let bootstrap_config = drasi_source_hyperliquid::HyperliquidSourceConfig::default();
    let bootstrap = HyperliquidBootstrapProvider::new(bootstrap_config.clone());

    let source = HyperliquidSource::builder(SOURCE_ID)
        .with_network(HyperliquidNetwork::Mainnet)
        .with_coins(vec!["BTC"])
        .with_trades(true)
        .with_order_book(true)
        .with_mid_prices(true)
        .with_liquidations(true)
        .with_funding_rates(false)
        .start_from_beginning()
        .with_bootstrap_provider(bootstrap)
        .build()?;

    let price_query = Query::cypher(PRICE_QUERY)
        .query(
            r#"
            MATCH (m:MidPrice)-[:PRICE_OF]->(c:Coin)
            WHERE c.name = 'BTC'
            RETURN c.name AS coin, c.max_leverage AS max_leverage, m.price AS price
            "#,
        )
        .from_source(SOURCE_ID)
        .auto_start(true)
        .enable_bootstrap(true)
        .build();

    let trade_query = Query::cypher(TRADE_QUERY)
        .query(
            r#"
            MATCH (t:Trade)
            WHERE t.coin = 'BTC'
            RETURN t.tid AS tid, t.price AS price
            "#,
        )
        .from_source(SOURCE_ID)
        .auto_start(true)
        .enable_bootstrap(false)
        .build();

    let (reaction, handle) = ApplicationReaction::builder("app-reaction")
        .with_query(PRICE_QUERY)
        .with_query(TRADE_QUERY)
        .build();

    let core = DrasiLib::builder()
        .with_id("hl-integration-test")
        .with_source(source)
        .with_query(price_query)
        .with_query(trade_query)
        .with_reaction(reaction)
        .build()
        .await?;

    core.start().await?;

    let mut subscription = handle
        .subscribe_with_options(SubscriptionOptions::default().with_timeout(Duration::from_secs(2)))
        .await?;

    let mut saw_price_add = false;
    let mut saw_price_update = false;
    let mut trade_event_count = 0usize;
    let mut saw_trade_change = false;

    let start = Instant::now();
    let timeout = Duration::from_secs(60);

    while start.elapsed() < timeout {
        if let Some(result) = subscription.recv().await {
            if result.query_id == PRICE_QUERY {
                for entry in &result.results {
                    match entry {
                        ResultDiff::Add { data, .. } => {
                            if data.get("coin").is_some() {
                                saw_price_add = true;
                            }
                        }
                        ResultDiff::Update { .. } => {
                            if saw_price_add {
                                saw_price_update = true;
                            } else {
                                saw_price_add = true;
                                saw_price_update = true;
                            }
                        }
                        _ => {}
                    }
                }
            }

            if result.query_id == TRADE_QUERY {
                for entry in &result.results {
                    if matches!(
                        entry,
                        ResultDiff::Add { .. }
                            | ResultDiff::Update { .. }
                            | ResultDiff::Aggregation { .. }
                            | ResultDiff::Delete { .. }
                    ) {
                        trade_event_count += 1;
                        if trade_event_count > 1 {
                            saw_trade_change = true;
                        }
                    }
                }
            }
        }

        if saw_price_add && saw_price_update && trade_event_count > 0 && saw_trade_change {
            break;
        }
    }

    if !saw_price_add {
        anyhow::bail!("Did not observe initial MidPrice add event");
    }
    if !saw_price_update {
        anyhow::bail!("Did not observe MidPrice update event");
    }
    if trade_event_count == 0 {
        anyhow::bail!("Did not observe trade events");
    }
    if !saw_trade_change {
        anyhow::bail!("Did not observe trade change events");
    }

    core.stop().await?;
    Ok(())
}