Skip to main content

drasi_source_hyperliquid/
mapping.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Mapping from Hyperliquid API responses to Drasi SourceChange events.
16
17use crate::types::{
18    AssetCtx, FundingSnapshot, L2Book, Liquidation, MetaResponse, SpotMetaResponse, Trade,
19};
20use anyhow::{anyhow, Result};
21use drasi_core::models::{
22    Element, ElementMetadata, ElementPropertyMap, ElementReference, ElementValue, SourceChange,
23};
24use ordered_float::OrderedFloat;
25use std::collections::{HashMap, HashSet};
26use std::sync::Arc;
27
// Node labels attached to the elements emitted by the mapping functions below.
/// Label for coin nodes (perp assets and spot tokens).
const COIN_LABEL: &str = "Coin";
/// Label for individual trade nodes.
const TRADE_LABEL: &str = "Trade";
/// Label for the per-coin mid-price node.
const MIDPRICE_LABEL: &str = "MidPrice";
/// Label for the per-coin order-book summary node.
const ORDERBOOK_LABEL: &str = "OrderBook";
/// Label for individual liquidation nodes.
const LIQUIDATION_LABEL: &str = "Liquidation";
/// Label for the per-coin funding-rate node.
const FUNDING_LABEL: &str = "FundingRate";
/// Label for spot trading-pair nodes.
const SPOTPAIR_LABEL: &str = "SpotPair";

// Relation labels; each relation links one of the nodes above to its coin node.
/// Relation label from a trade node to its coin node.
const REL_TRADED_ON: &str = "TRADED_ON";
/// Relation label from a mid-price node to its coin node.
const REL_PRICE_OF: &str = "PRICE_OF";
/// Relation label from an order-book node to its coin node.
const REL_BOOK_OF: &str = "BOOK_OF";
/// Relation label from a liquidation node to its coin node.
const REL_LIQUIDATED_ON: &str = "LIQUIDATED_ON";
/// Relation label from a funding-rate node to its coin node.
const REL_FUNDING_OF: &str = "FUNDING_OF";
41
/// Tracks which per-coin singleton entities have already been emitted.
///
/// Each `mark_*` method records the coin in the matching set and reports
/// whether this was the first time the coin was seen. The mapping functions
/// in this module use that answer to choose between an insert and an update.
#[derive(Debug, Default)]
pub struct InitializedEntities {
    mid_prices: HashSet<String>,
    order_books: HashSet<String>,
    funding_rates: HashSet<String>,
}

impl InitializedEntities {
    /// Creates a tracker with no coins recorded.
    pub fn new() -> Self {
        Self::default()
    }

    /// Records `coin` in `set`, returning `true` only on first sight.
    ///
    /// The membership test borrows `coin`, so an owned `String` is allocated
    /// only for coins that are not tracked yet rather than on every call.
    fn mark(set: &mut HashSet<String>, coin: &str) -> bool {
        if set.contains(coin) {
            false
        } else {
            set.insert(coin.to_owned())
        }
    }

    /// Marks the mid-price entity for `coin`; returns `true` if it was not marked before.
    pub fn mark_mid_price(&mut self, coin: &str) -> bool {
        Self::mark(&mut self.mid_prices, coin)
    }

    /// Marks the order-book entity for `coin`; returns `true` if it was not marked before.
    pub fn mark_order_book(&mut self, coin: &str) -> bool {
        Self::mark(&mut self.order_books, coin)
    }

    /// Marks the funding-rate entity for `coin`; returns `true` if it was not marked before.
    pub fn mark_funding_rate(&mut self, coin: &str) -> bool {
        Self::mark(&mut self.funding_rates, coin)
    }
}
66
/// Returns the element id of the coin node for `coin`, i.e. `coin:<coin>`.
pub fn build_coin_id(coin: &str) -> String {
    ["coin:", coin].concat()
}
70
/// Returns the element id of a trade node, i.e. `trade:<coin>:<tid>`.
fn build_trade_id(coin: &str, tid: u64) -> String {
    let tid_text = tid.to_string();
    ["trade:", coin, ":", tid_text.as_str()].concat()
}
74
/// Returns the element id of the mid-price node for `coin`, i.e. `midprice:<coin>`.
fn build_midprice_id(coin: &str) -> String {
    ["midprice:", coin].concat()
}
78
/// Returns the element id of the order-book node for `coin`, i.e. `orderbook:<coin>`.
fn build_orderbook_id(coin: &str) -> String {
    ["orderbook:", coin].concat()
}
82
/// Returns the element id of a liquidation node.
///
/// The id is `liquidation:<coin>:<hash>` when a hash is present, and
/// `liquidation:<coin>:<timestamp>` otherwise.
fn build_liquidation_id(coin: &str, timestamp: i64, hash: &Option<String>) -> String {
    match hash.as_deref() {
        Some(tx_hash) => ["liquidation:", coin, ":", tx_hash].concat(),
        None => format!("liquidation:{coin}:{timestamp}"),
    }
}
90
/// Returns the element id of the funding-rate node for `coin`, i.e. `funding:<coin>`.
fn build_funding_id(coin: &str) -> String {
    ["funding:", coin].concat()
}
94
/// Returns the element id of a spot-pair node, i.e. `spotpair:<name>`.
fn build_spot_pair_id(name: &str) -> String {
    ["spotpair:", name].concat()
}
98
/// Returns a relation element id of the form `<label>:<source_element_id>`.
fn build_relation_id(label: &str, source_element_id: &str) -> String {
    [label, ":", source_element_id].concat()
}
102
/// Wraps a single label in the shared-slice form used for element labels.
fn labels(label: &str) -> Arc<[Arc<str>]> {
    std::iter::once(Arc::<str>::from(label)).collect()
}
106
107fn element_metadata(
108    source_id: &str,
109    element_id: &str,
110    label: &str,
111    effective_from: u64,
112) -> ElementMetadata {
113    ElementMetadata {
114        reference: ElementReference::new(source_id, element_id),
115        labels: labels(label),
116        effective_from,
117    }
118}
119
120fn relation_element(
121    source_id: &str,
122    relation_id: &str,
123    label: &str,
124    from_id: &str,
125    to_id: &str,
126    effective_from: u64,
127) -> Element {
128    Element::Relation {
129        metadata: element_metadata(source_id, relation_id, label, effective_from),
130        properties: ElementPropertyMap::new(),
131        in_node: ElementReference::new(source_id, from_id),
132        out_node: ElementReference::new(source_id, to_id),
133    }
134}
135
136fn node_element(
137    source_id: &str,
138    element_id: &str,
139    label: &str,
140    effective_from: u64,
141    properties: ElementPropertyMap,
142) -> Element {
143    Element::Node {
144        metadata: element_metadata(source_id, element_id, label, effective_from),
145        properties,
146    }
147}
148
149fn insert_string(props: &mut ElementPropertyMap, key: &str, value: &str) {
150    props.insert(key, ElementValue::String(Arc::from(value)));
151}
152
153fn insert_float(props: &mut ElementPropertyMap, key: &str, value: f64) {
154    props.insert(key, ElementValue::Float(OrderedFloat(value)));
155}
156
157fn insert_integer(props: &mut ElementPropertyMap, key: &str, value: i64) {
158    props.insert(key, ElementValue::Integer(value));
159}
160
161fn parse_f64(value: &str, field: &str) -> Result<f64> {
162    value
163        .parse::<f64>()
164        .map_err(|e| anyhow!("Failed to parse {field} value '{value}': {e}"))
165}
166
/// Converts a signed timestamp to the unsigned `effective_from` value,
/// mapping zero and negative inputs to `0`.
fn effective_from(timestamp: i64) -> u64 {
    // After clamping at zero the value is non-negative, so the cast is lossless.
    timestamp.max(0) as u64
}
174
175pub fn map_meta_to_coin_changes(source_id: &str, meta: &MetaResponse) -> Result<Vec<SourceChange>> {
176    let mut changes = Vec::new();
177
178    for asset in &meta.universe {
179        let mut props = ElementPropertyMap::new();
180        insert_string(&mut props, "name", &asset.name);
181        insert_integer(&mut props, "sz_decimals", asset.sz_decimals as i64);
182        insert_integer(&mut props, "max_leverage", asset.max_leverage as i64);
183        insert_string(&mut props, "market_type", "perp");
184
185        let element = node_element(source_id, &build_coin_id(&asset.name), COIN_LABEL, 0, props);
186        changes.push(SourceChange::Insert { element });
187    }
188
189    Ok(changes)
190}
191
192pub fn map_spot_meta_to_nodes(
193    source_id: &str,
194    spot_meta: &SpotMetaResponse,
195    existing_coins: &mut HashSet<String>,
196) -> Result<Vec<SourceChange>> {
197    let mut changes = Vec::new();
198
199    let token_lookup: HashMap<u32, String> = spot_meta
200        .tokens
201        .iter()
202        .map(|token| (token.index, token.name.clone()))
203        .collect();
204
205    for token in &spot_meta.tokens {
206        if existing_coins.insert(token.name.clone()) {
207            let mut props = ElementPropertyMap::new();
208            insert_string(&mut props, "name", &token.name);
209            insert_integer(&mut props, "sz_decimals", token.sz_decimals as i64);
210            insert_integer(&mut props, "max_leverage", 0);
211            insert_string(&mut props, "market_type", "spot");
212
213            let element =
214                node_element(source_id, &build_coin_id(&token.name), COIN_LABEL, 0, props);
215            changes.push(SourceChange::Insert { element });
216        }
217    }
218
219    for pair in &spot_meta.universe {
220        let mut props = ElementPropertyMap::new();
221        insert_string(&mut props, "name", &pair.name);
222
223        let token_names: Vec<ElementValue> = pair
224            .tokens
225            .iter()
226            .filter_map(|idx| token_lookup.get(idx))
227            .map(|name| ElementValue::String(Arc::from(name.as_str())))
228            .collect();
229        props.insert("tokens", ElementValue::List(token_names));
230
231        let element = node_element(
232            source_id,
233            &build_spot_pair_id(&pair.name),
234            SPOTPAIR_LABEL,
235            0,
236            props,
237        );
238        changes.push(SourceChange::Insert { element });
239    }
240
241    Ok(changes)
242}
243
244pub fn map_trade_to_changes(source_id: &str, trade: &Trade) -> Result<Vec<SourceChange>> {
245    let price = parse_f64(&trade.px, "trade.px")?;
246    let size = parse_f64(&trade.sz, "trade.sz")?;
247
248    let mut props = ElementPropertyMap::new();
249    insert_string(&mut props, "coin", &trade.coin);
250    insert_string(&mut props, "side", &trade.side);
251    insert_float(&mut props, "price", price);
252    insert_float(&mut props, "size", size);
253    insert_integer(&mut props, "timestamp", trade.time);
254    insert_integer(&mut props, "tid", trade.tid as i64);
255    if let Some(hash) = &trade.hash {
256        insert_string(&mut props, "hash", hash);
257    }
258
259    let trade_id = build_trade_id(&trade.coin, trade.tid);
260    let trade_element = node_element(
261        source_id,
262        &trade_id,
263        TRADE_LABEL,
264        effective_from(trade.time),
265        props,
266    );
267
268    let relation_id = build_relation_id("traded_on", &trade_id);
269    let relation_element = relation_element(
270        source_id,
271        &relation_id,
272        REL_TRADED_ON,
273        &trade_id,
274        &build_coin_id(&trade.coin),
275        effective_from(trade.time),
276    );
277
278    Ok(vec![
279        SourceChange::Insert {
280            element: trade_element,
281        },
282        SourceChange::Insert {
283            element: relation_element,
284        },
285    ])
286}
287
288pub fn map_liquidation_to_changes(
289    source_id: &str,
290    liquidation: &Liquidation,
291) -> Result<Vec<SourceChange>> {
292    let price = parse_f64(&liquidation.px, "liquidation.px")?;
293    let size = parse_f64(&liquidation.sz, "liquidation.sz")?;
294
295    let mut props = ElementPropertyMap::new();
296    insert_string(&mut props, "coin", &liquidation.coin);
297    insert_string(&mut props, "side", &liquidation.side);
298    insert_float(&mut props, "price", price);
299    insert_float(&mut props, "size", size);
300    insert_integer(&mut props, "timestamp", liquidation.time);
301    if let Some(hash) = &liquidation.hash {
302        insert_string(&mut props, "hash", hash);
303    }
304
305    let liquidation_id =
306        build_liquidation_id(&liquidation.coin, liquidation.time, &liquidation.hash);
307    let liquidation_element = node_element(
308        source_id,
309        &liquidation_id,
310        LIQUIDATION_LABEL,
311        effective_from(liquidation.time),
312        props,
313    );
314
315    let relation_id = build_relation_id("liquidated_on", &liquidation_id);
316    let relation_element = relation_element(
317        source_id,
318        &relation_id,
319        REL_LIQUIDATED_ON,
320        &liquidation_id,
321        &build_coin_id(&liquidation.coin),
322        effective_from(liquidation.time),
323    );
324
325    Ok(vec![
326        SourceChange::Insert {
327            element: liquidation_element,
328        },
329        SourceChange::Insert {
330            element: relation_element,
331        },
332    ])
333}
334
335pub fn map_mid_prices_to_changes(
336    source_id: &str,
337    mids: &HashMap<String, String>,
338    initialized: &mut InitializedEntities,
339    timestamp: i64,
340) -> Result<Vec<SourceChange>> {
341    let mut changes = Vec::new();
342
343    for (coin, price_str) in mids {
344        let price = parse_f64(price_str, "midprice")?;
345        let mut props = ElementPropertyMap::new();
346        insert_string(&mut props, "coin", coin);
347        insert_float(&mut props, "price", price);
348        insert_integer(&mut props, "timestamp", timestamp);
349
350        let mid_id = build_midprice_id(coin);
351        let element = node_element(
352            source_id,
353            &mid_id,
354            MIDPRICE_LABEL,
355            effective_from(timestamp),
356            props,
357        );
358
359        if initialized.mark_mid_price(coin) {
360            changes.push(SourceChange::Insert { element });
361
362            let relation_id = build_relation_id("price_of", &mid_id);
363            let relation_element = relation_element(
364                source_id,
365                &relation_id,
366                REL_PRICE_OF,
367                &mid_id,
368                &build_coin_id(coin),
369                effective_from(timestamp),
370            );
371            changes.push(SourceChange::Insert {
372                element: relation_element,
373            });
374        } else {
375            changes.push(SourceChange::Update { element });
376        }
377    }
378
379    Ok(changes)
380}
381
382pub fn map_order_book_to_changes(
383    source_id: &str,
384    book: &L2Book,
385    initialized: &mut InitializedEntities,
386) -> Result<Vec<SourceChange>> {
387    let bid_levels = match book.levels.first() {
388        Some(levels) if !levels.is_empty() => levels,
389        _ => return Ok(Vec::new()),
390    };
391    let ask_levels = match book.levels.get(1) {
392        Some(levels) if !levels.is_empty() => levels,
393        _ => return Ok(Vec::new()),
394    };
395
396    let best_bid = bid_levels
397        .first()
398        .ok_or_else(|| anyhow!("Order book missing best bid"))?;
399    let best_ask = ask_levels
400        .first()
401        .ok_or_else(|| anyhow!("Order book missing best ask"))?;
402
403    let mut props = ElementPropertyMap::new();
404    insert_string(&mut props, "coin", &book.coin);
405    insert_float(
406        &mut props,
407        "best_bid_price",
408        parse_f64(&best_bid.px, "best_bid_price")?,
409    );
410    insert_float(
411        &mut props,
412        "best_bid_size",
413        parse_f64(&best_bid.sz, "best_bid_size")?,
414    );
415    insert_float(
416        &mut props,
417        "best_ask_price",
418        parse_f64(&best_ask.px, "best_ask_price")?,
419    );
420    insert_float(
421        &mut props,
422        "best_ask_size",
423        parse_f64(&best_ask.sz, "best_ask_size")?,
424    );
425    insert_integer(&mut props, "bid_depth", bid_levels.len() as i64);
426    insert_integer(&mut props, "ask_depth", ask_levels.len() as i64);
427    insert_integer(&mut props, "timestamp", book.time);
428
429    let orderbook_id = build_orderbook_id(&book.coin);
430    let element = node_element(
431        source_id,
432        &orderbook_id,
433        ORDERBOOK_LABEL,
434        effective_from(book.time),
435        props,
436    );
437
438    let mut changes = Vec::new();
439    if initialized.mark_order_book(&book.coin) {
440        changes.push(SourceChange::Insert { element });
441        let relation_id = build_relation_id("book_of", &orderbook_id);
442        let relation_element = relation_element(
443            source_id,
444            &relation_id,
445            REL_BOOK_OF,
446            &orderbook_id,
447            &build_coin_id(&book.coin),
448            effective_from(book.time),
449        );
450        changes.push(SourceChange::Insert {
451            element: relation_element,
452        });
453    } else {
454        changes.push(SourceChange::Update { element });
455    }
456
457    Ok(changes)
458}
459
460pub fn map_funding_rate_to_changes(
461    source_id: &str,
462    coin: &str,
463    ctx: &AssetCtx,
464    initialized: &mut InitializedEntities,
465    timestamp: i64,
466) -> Result<(Vec<SourceChange>, FundingSnapshot)> {
467    let rate = parse_f64(&ctx.funding, "funding")?;
468    let premium = ctx
469        .premium
470        .as_deref()
471        .map(|s| parse_f64(s, "premium"))
472        .transpose()?
473        .unwrap_or(0.0);
474    let mark_price = parse_f64(&ctx.mark_px, "mark_px")?;
475    let open_interest = parse_f64(&ctx.open_interest, "open_interest")?;
476    let volume_24h = parse_f64(&ctx.day_ntl_vlm, "day_ntl_vlm")?;
477
478    let mut props = ElementPropertyMap::new();
479    insert_string(&mut props, "coin", coin);
480    insert_float(&mut props, "rate", rate);
481    insert_float(&mut props, "premium", premium);
482    insert_float(&mut props, "mark_price", mark_price);
483    insert_float(&mut props, "open_interest", open_interest);
484    insert_float(&mut props, "volume_24h", volume_24h);
485    insert_integer(&mut props, "timestamp", timestamp);
486
487    let funding_id = build_funding_id(coin);
488    let element = node_element(
489        source_id,
490        &funding_id,
491        FUNDING_LABEL,
492        effective_from(timestamp),
493        props,
494    );
495
496    let mut changes = Vec::new();
497    if initialized.mark_funding_rate(coin) {
498        changes.push(SourceChange::Insert { element });
499        let relation_id = build_relation_id("funding_of", &funding_id);
500        let relation_element = relation_element(
501            source_id,
502            &relation_id,
503            REL_FUNDING_OF,
504            &funding_id,
505            &build_coin_id(coin),
506            effective_from(timestamp),
507        );
508        changes.push(SourceChange::Insert {
509            element: relation_element,
510        });
511    } else {
512        changes.push(SourceChange::Update { element });
513    }
514
515    let snapshot = FundingSnapshot {
516        rate,
517        premium,
518        mark_price,
519        open_interest,
520        volume_24h,
521        timestamp,
522    };
523
524    Ok((changes, snapshot))
525}