1use crate::types::{
18 AssetCtx, FundingSnapshot, L2Book, Liquidation, MetaResponse, SpotMetaResponse, Trade,
19};
20use anyhow::{anyhow, Result};
21use drasi_core::models::{
22 Element, ElementMetadata, ElementPropertyMap, ElementReference, ElementValue, SourceChange,
23};
24use ordered_float::OrderedFloat;
25use std::collections::{HashMap, HashSet};
26use std::sync::Arc;
27
28const COIN_LABEL: &str = "Coin";
29const TRADE_LABEL: &str = "Trade";
30const MIDPRICE_LABEL: &str = "MidPrice";
31const ORDERBOOK_LABEL: &str = "OrderBook";
32const LIQUIDATION_LABEL: &str = "Liquidation";
33const FUNDING_LABEL: &str = "FundingRate";
34const SPOTPAIR_LABEL: &str = "SpotPair";
35
36const REL_TRADED_ON: &str = "TRADED_ON";
37const REL_PRICE_OF: &str = "PRICE_OF";
38const REL_BOOK_OF: &str = "BOOK_OF";
39const REL_LIQUIDATED_ON: &str = "LIQUIDATED_ON";
40const REL_FUNDING_OF: &str = "FUNDING_OF";
41
42#[derive(Debug, Default)]
43pub struct InitializedEntities {
44 mid_prices: HashSet<String>,
45 order_books: HashSet<String>,
46 funding_rates: HashSet<String>,
47}
48
49impl InitializedEntities {
50 pub fn new() -> Self {
51 Self::default()
52 }
53
54 pub fn mark_mid_price(&mut self, coin: &str) -> bool {
55 self.mid_prices.insert(coin.to_string())
56 }
57
58 pub fn mark_order_book(&mut self, coin: &str) -> bool {
59 self.order_books.insert(coin.to_string())
60 }
61
62 pub fn mark_funding_rate(&mut self, coin: &str) -> bool {
63 self.funding_rates.insert(coin.to_string())
64 }
65}
66
67pub fn build_coin_id(coin: &str) -> String {
68 format!("coin:{coin}")
69}
70
71fn build_trade_id(coin: &str, tid: u64) -> String {
72 format!("trade:{coin}:{tid}")
73}
74
75fn build_midprice_id(coin: &str) -> String {
76 format!("midprice:{coin}")
77}
78
79fn build_orderbook_id(coin: &str) -> String {
80 format!("orderbook:{coin}")
81}
82
83fn build_liquidation_id(coin: &str, timestamp: i64, hash: &Option<String>) -> String {
84 if let Some(hash) = hash {
85 format!("liquidation:{coin}:{hash}")
86 } else {
87 format!("liquidation:{coin}:{timestamp}")
88 }
89}
90
91fn build_funding_id(coin: &str) -> String {
92 format!("funding:{coin}")
93}
94
95fn build_spot_pair_id(name: &str) -> String {
96 format!("spotpair:{name}")
97}
98
99fn build_relation_id(label: &str, source_element_id: &str) -> String {
100 format!("{label}:{source_element_id}")
101}
102
103fn labels(label: &str) -> Arc<[Arc<str>]> {
104 vec![Arc::from(label)].into()
105}
106
107fn element_metadata(
108 source_id: &str,
109 element_id: &str,
110 label: &str,
111 effective_from: u64,
112) -> ElementMetadata {
113 ElementMetadata {
114 reference: ElementReference::new(source_id, element_id),
115 labels: labels(label),
116 effective_from,
117 }
118}
119
120fn relation_element(
121 source_id: &str,
122 relation_id: &str,
123 label: &str,
124 from_id: &str,
125 to_id: &str,
126 effective_from: u64,
127) -> Element {
128 Element::Relation {
129 metadata: element_metadata(source_id, relation_id, label, effective_from),
130 properties: ElementPropertyMap::new(),
131 in_node: ElementReference::new(source_id, from_id),
132 out_node: ElementReference::new(source_id, to_id),
133 }
134}
135
136fn node_element(
137 source_id: &str,
138 element_id: &str,
139 label: &str,
140 effective_from: u64,
141 properties: ElementPropertyMap,
142) -> Element {
143 Element::Node {
144 metadata: element_metadata(source_id, element_id, label, effective_from),
145 properties,
146 }
147}
148
149fn insert_string(props: &mut ElementPropertyMap, key: &str, value: &str) {
150 props.insert(key, ElementValue::String(Arc::from(value)));
151}
152
153fn insert_float(props: &mut ElementPropertyMap, key: &str, value: f64) {
154 props.insert(key, ElementValue::Float(OrderedFloat(value)));
155}
156
157fn insert_integer(props: &mut ElementPropertyMap, key: &str, value: i64) {
158 props.insert(key, ElementValue::Integer(value));
159}
160
161fn parse_f64(value: &str, field: &str) -> Result<f64> {
162 value
163 .parse::<f64>()
164 .map_err(|e| anyhow!("Failed to parse {field} value '{value}': {e}"))
165}
166
167fn effective_from(timestamp: i64) -> u64 {
168 if timestamp <= 0 {
169 0
170 } else {
171 timestamp as u64
172 }
173}
174
175pub fn map_meta_to_coin_changes(source_id: &str, meta: &MetaResponse) -> Result<Vec<SourceChange>> {
176 let mut changes = Vec::new();
177
178 for asset in &meta.universe {
179 let mut props = ElementPropertyMap::new();
180 insert_string(&mut props, "name", &asset.name);
181 insert_integer(&mut props, "sz_decimals", asset.sz_decimals as i64);
182 insert_integer(&mut props, "max_leverage", asset.max_leverage as i64);
183 insert_string(&mut props, "market_type", "perp");
184
185 let element = node_element(source_id, &build_coin_id(&asset.name), COIN_LABEL, 0, props);
186 changes.push(SourceChange::Insert { element });
187 }
188
189 Ok(changes)
190}
191
192pub fn map_spot_meta_to_nodes(
193 source_id: &str,
194 spot_meta: &SpotMetaResponse,
195 existing_coins: &mut HashSet<String>,
196) -> Result<Vec<SourceChange>> {
197 let mut changes = Vec::new();
198
199 let token_lookup: HashMap<u32, String> = spot_meta
200 .tokens
201 .iter()
202 .map(|token| (token.index, token.name.clone()))
203 .collect();
204
205 for token in &spot_meta.tokens {
206 if existing_coins.insert(token.name.clone()) {
207 let mut props = ElementPropertyMap::new();
208 insert_string(&mut props, "name", &token.name);
209 insert_integer(&mut props, "sz_decimals", token.sz_decimals as i64);
210 insert_integer(&mut props, "max_leverage", 0);
211 insert_string(&mut props, "market_type", "spot");
212
213 let element =
214 node_element(source_id, &build_coin_id(&token.name), COIN_LABEL, 0, props);
215 changes.push(SourceChange::Insert { element });
216 }
217 }
218
219 for pair in &spot_meta.universe {
220 let mut props = ElementPropertyMap::new();
221 insert_string(&mut props, "name", &pair.name);
222
223 let token_names: Vec<ElementValue> = pair
224 .tokens
225 .iter()
226 .filter_map(|idx| token_lookup.get(idx))
227 .map(|name| ElementValue::String(Arc::from(name.as_str())))
228 .collect();
229 props.insert("tokens", ElementValue::List(token_names));
230
231 let element = node_element(
232 source_id,
233 &build_spot_pair_id(&pair.name),
234 SPOTPAIR_LABEL,
235 0,
236 props,
237 );
238 changes.push(SourceChange::Insert { element });
239 }
240
241 Ok(changes)
242}
243
244pub fn map_trade_to_changes(source_id: &str, trade: &Trade) -> Result<Vec<SourceChange>> {
245 let price = parse_f64(&trade.px, "trade.px")?;
246 let size = parse_f64(&trade.sz, "trade.sz")?;
247
248 let mut props = ElementPropertyMap::new();
249 insert_string(&mut props, "coin", &trade.coin);
250 insert_string(&mut props, "side", &trade.side);
251 insert_float(&mut props, "price", price);
252 insert_float(&mut props, "size", size);
253 insert_integer(&mut props, "timestamp", trade.time);
254 insert_integer(&mut props, "tid", trade.tid as i64);
255 if let Some(hash) = &trade.hash {
256 insert_string(&mut props, "hash", hash);
257 }
258
259 let trade_id = build_trade_id(&trade.coin, trade.tid);
260 let trade_element = node_element(
261 source_id,
262 &trade_id,
263 TRADE_LABEL,
264 effective_from(trade.time),
265 props,
266 );
267
268 let relation_id = build_relation_id("traded_on", &trade_id);
269 let relation_element = relation_element(
270 source_id,
271 &relation_id,
272 REL_TRADED_ON,
273 &trade_id,
274 &build_coin_id(&trade.coin),
275 effective_from(trade.time),
276 );
277
278 Ok(vec![
279 SourceChange::Insert {
280 element: trade_element,
281 },
282 SourceChange::Insert {
283 element: relation_element,
284 },
285 ])
286}
287
288pub fn map_liquidation_to_changes(
289 source_id: &str,
290 liquidation: &Liquidation,
291) -> Result<Vec<SourceChange>> {
292 let price = parse_f64(&liquidation.px, "liquidation.px")?;
293 let size = parse_f64(&liquidation.sz, "liquidation.sz")?;
294
295 let mut props = ElementPropertyMap::new();
296 insert_string(&mut props, "coin", &liquidation.coin);
297 insert_string(&mut props, "side", &liquidation.side);
298 insert_float(&mut props, "price", price);
299 insert_float(&mut props, "size", size);
300 insert_integer(&mut props, "timestamp", liquidation.time);
301 if let Some(hash) = &liquidation.hash {
302 insert_string(&mut props, "hash", hash);
303 }
304
305 let liquidation_id =
306 build_liquidation_id(&liquidation.coin, liquidation.time, &liquidation.hash);
307 let liquidation_element = node_element(
308 source_id,
309 &liquidation_id,
310 LIQUIDATION_LABEL,
311 effective_from(liquidation.time),
312 props,
313 );
314
315 let relation_id = build_relation_id("liquidated_on", &liquidation_id);
316 let relation_element = relation_element(
317 source_id,
318 &relation_id,
319 REL_LIQUIDATED_ON,
320 &liquidation_id,
321 &build_coin_id(&liquidation.coin),
322 effective_from(liquidation.time),
323 );
324
325 Ok(vec![
326 SourceChange::Insert {
327 element: liquidation_element,
328 },
329 SourceChange::Insert {
330 element: relation_element,
331 },
332 ])
333}
334
335pub fn map_mid_prices_to_changes(
336 source_id: &str,
337 mids: &HashMap<String, String>,
338 initialized: &mut InitializedEntities,
339 timestamp: i64,
340) -> Result<Vec<SourceChange>> {
341 let mut changes = Vec::new();
342
343 for (coin, price_str) in mids {
344 let price = parse_f64(price_str, "midprice")?;
345 let mut props = ElementPropertyMap::new();
346 insert_string(&mut props, "coin", coin);
347 insert_float(&mut props, "price", price);
348 insert_integer(&mut props, "timestamp", timestamp);
349
350 let mid_id = build_midprice_id(coin);
351 let element = node_element(
352 source_id,
353 &mid_id,
354 MIDPRICE_LABEL,
355 effective_from(timestamp),
356 props,
357 );
358
359 if initialized.mark_mid_price(coin) {
360 changes.push(SourceChange::Insert { element });
361
362 let relation_id = build_relation_id("price_of", &mid_id);
363 let relation_element = relation_element(
364 source_id,
365 &relation_id,
366 REL_PRICE_OF,
367 &mid_id,
368 &build_coin_id(coin),
369 effective_from(timestamp),
370 );
371 changes.push(SourceChange::Insert {
372 element: relation_element,
373 });
374 } else {
375 changes.push(SourceChange::Update { element });
376 }
377 }
378
379 Ok(changes)
380}
381
382pub fn map_order_book_to_changes(
383 source_id: &str,
384 book: &L2Book,
385 initialized: &mut InitializedEntities,
386) -> Result<Vec<SourceChange>> {
387 let bid_levels = match book.levels.first() {
388 Some(levels) if !levels.is_empty() => levels,
389 _ => return Ok(Vec::new()),
390 };
391 let ask_levels = match book.levels.get(1) {
392 Some(levels) if !levels.is_empty() => levels,
393 _ => return Ok(Vec::new()),
394 };
395
396 let best_bid = bid_levels
397 .first()
398 .ok_or_else(|| anyhow!("Order book missing best bid"))?;
399 let best_ask = ask_levels
400 .first()
401 .ok_or_else(|| anyhow!("Order book missing best ask"))?;
402
403 let mut props = ElementPropertyMap::new();
404 insert_string(&mut props, "coin", &book.coin);
405 insert_float(
406 &mut props,
407 "best_bid_price",
408 parse_f64(&best_bid.px, "best_bid_price")?,
409 );
410 insert_float(
411 &mut props,
412 "best_bid_size",
413 parse_f64(&best_bid.sz, "best_bid_size")?,
414 );
415 insert_float(
416 &mut props,
417 "best_ask_price",
418 parse_f64(&best_ask.px, "best_ask_price")?,
419 );
420 insert_float(
421 &mut props,
422 "best_ask_size",
423 parse_f64(&best_ask.sz, "best_ask_size")?,
424 );
425 insert_integer(&mut props, "bid_depth", bid_levels.len() as i64);
426 insert_integer(&mut props, "ask_depth", ask_levels.len() as i64);
427 insert_integer(&mut props, "timestamp", book.time);
428
429 let orderbook_id = build_orderbook_id(&book.coin);
430 let element = node_element(
431 source_id,
432 &orderbook_id,
433 ORDERBOOK_LABEL,
434 effective_from(book.time),
435 props,
436 );
437
438 let mut changes = Vec::new();
439 if initialized.mark_order_book(&book.coin) {
440 changes.push(SourceChange::Insert { element });
441 let relation_id = build_relation_id("book_of", &orderbook_id);
442 let relation_element = relation_element(
443 source_id,
444 &relation_id,
445 REL_BOOK_OF,
446 &orderbook_id,
447 &build_coin_id(&book.coin),
448 effective_from(book.time),
449 );
450 changes.push(SourceChange::Insert {
451 element: relation_element,
452 });
453 } else {
454 changes.push(SourceChange::Update { element });
455 }
456
457 Ok(changes)
458}
459
460pub fn map_funding_rate_to_changes(
461 source_id: &str,
462 coin: &str,
463 ctx: &AssetCtx,
464 initialized: &mut InitializedEntities,
465 timestamp: i64,
466) -> Result<(Vec<SourceChange>, FundingSnapshot)> {
467 let rate = parse_f64(&ctx.funding, "funding")?;
468 let premium = ctx
469 .premium
470 .as_deref()
471 .map(|s| parse_f64(s, "premium"))
472 .transpose()?
473 .unwrap_or(0.0);
474 let mark_price = parse_f64(&ctx.mark_px, "mark_px")?;
475 let open_interest = parse_f64(&ctx.open_interest, "open_interest")?;
476 let volume_24h = parse_f64(&ctx.day_ntl_vlm, "day_ntl_vlm")?;
477
478 let mut props = ElementPropertyMap::new();
479 insert_string(&mut props, "coin", coin);
480 insert_float(&mut props, "rate", rate);
481 insert_float(&mut props, "premium", premium);
482 insert_float(&mut props, "mark_price", mark_price);
483 insert_float(&mut props, "open_interest", open_interest);
484 insert_float(&mut props, "volume_24h", volume_24h);
485 insert_integer(&mut props, "timestamp", timestamp);
486
487 let funding_id = build_funding_id(coin);
488 let element = node_element(
489 source_id,
490 &funding_id,
491 FUNDING_LABEL,
492 effective_from(timestamp),
493 props,
494 );
495
496 let mut changes = Vec::new();
497 if initialized.mark_funding_rate(coin) {
498 changes.push(SourceChange::Insert { element });
499 let relation_id = build_relation_id("funding_of", &funding_id);
500 let relation_element = relation_element(
501 source_id,
502 &relation_id,
503 REL_FUNDING_OF,
504 &funding_id,
505 &build_coin_id(coin),
506 effective_from(timestamp),
507 );
508 changes.push(SourceChange::Insert {
509 element: relation_element,
510 });
511 } else {
512 changes.push(SourceChange::Update { element });
513 }
514
515 let snapshot = FundingSnapshot {
516 rate,
517 premium,
518 mark_price,
519 open_interest,
520 volume_24h,
521 timestamp,
522 };
523
524 Ok((changes, snapshot))
525}