1use crate::error::{Result, ScopeError};
8use crate::market::descriptor::{EndpointDescriptor, HttpMethod, ResponseMapping, VenueDescriptor};
9use crate::market::orderbook::{
10 OrderBook, OrderBookClient, OrderBookLevel, Ticker, TickerClient, Trade, TradeHistoryClient,
11 TradeSide,
12};
13use async_trait::async_trait;
14use reqwest::Client;
15use serde_json::Value;
16use std::time::Duration;
17
/// Exchange client whose requests and response parsing are driven by a
/// [`VenueDescriptor`] instead of venue-specific code.
#[derive(Debug, Clone)]
pub struct ConfigurableExchangeClient {
    /// Venue configuration (base URL, headers, symbol format, capabilities)
    /// consulted by every request made through this client.
    descriptor: VenueDescriptor,
    /// HTTP client used to issue the requests.
    http: Client,
}
26
27impl ConfigurableExchangeClient {
28 pub fn new(descriptor: VenueDescriptor) -> Self {
30 let timeout = descriptor.timeout_secs.unwrap_or(15);
31 let http = Client::builder()
32 .timeout(Duration::from_secs(timeout))
33 .build()
34 .expect("reqwest client build");
35 Self { descriptor, http }
36 }
37
38 pub fn descriptor(&self) -> &VenueDescriptor {
40 &self.descriptor
41 }
42
43 pub fn format_pair(&self, base: &str, quote: Option<&str>) -> String {
45 self.descriptor.format_pair(base, quote)
46 }
47
48 async fn fetch_endpoint(
54 &self,
55 endpoint: &EndpointDescriptor,
56 pair: &str,
57 limit: Option<u32>,
58 ) -> Result<Value> {
59 let url = format!(
60 "{}{}",
61 self.descriptor.base_url,
62 self.interpolate_path(&endpoint.path, pair)
63 );
64
65 let limit_str = limit.unwrap_or(100).to_string();
66
67 match endpoint.method {
68 HttpMethod::GET => {
69 let mut req = self.http.get(&url);
70 for (k, v) in &self.descriptor.headers {
72 req = req.header(k, v);
73 }
74 let params: Vec<(String, String)> = endpoint
76 .params
77 .iter()
78 .map(|(k, v)| (k.clone(), self.interpolate_value(v, pair, &limit_str)))
79 .collect();
80 if !params.is_empty() {
81 req = req.query(¶ms);
82 }
83
84 let resp = req.send().await?;
85 if !resp.status().is_success() {
86 return Err(ScopeError::Chain(format!(
87 "{} API error: HTTP {}",
88 self.descriptor.name,
89 resp.status()
90 )));
91 }
92 resp.json::<Value>().await.map_err(|e| {
93 ScopeError::Chain(format!("{} JSON parse error: {}", self.descriptor.name, e))
94 })
95 }
96 HttpMethod::POST => {
97 let mut req = self.http.post(&url);
98 for (k, v) in &self.descriptor.headers {
99 req = req.header(k, v);
100 }
101 if let Some(body_template) = &endpoint.request_body {
103 let body = self.interpolate_json(body_template, pair, &limit_str);
104 req = req.json(&body);
105 }
106
107 let resp = req.send().await?;
108 if !resp.status().is_success() {
109 return Err(ScopeError::Chain(format!(
110 "{} API error: HTTP {}",
111 self.descriptor.name,
112 resp.status()
113 )));
114 }
115 resp.json::<Value>().await.map_err(|e| {
116 ScopeError::Chain(format!("{} JSON parse error: {}", self.descriptor.name, e))
117 })
118 }
119 }
120 }
121
122 fn interpolate_path(&self, path: &str, pair: &str) -> String {
124 path.replace("{pair}", pair)
125 }
126
127 fn interpolate_value(&self, template: &str, pair: &str, limit: &str) -> String {
129 template.replace("{pair}", pair).replace("{limit}", limit)
130 }
131
132 fn interpolate_json(&self, value: &Value, pair: &str, limit: &str) -> Value {
134 match value {
135 Value::String(s) => Value::String(self.interpolate_value(s, pair, limit)),
136 Value::Object(map) => {
137 let mut new_map = serde_json::Map::new();
138 for (k, v) in map {
139 new_map.insert(k.clone(), self.interpolate_json(v, pair, limit));
140 }
141 Value::Object(new_map)
142 }
143 Value::Array(arr) => Value::Array(
144 arr.iter()
145 .map(|v| self.interpolate_json(v, pair, limit))
146 .collect(),
147 ),
148 other => other.clone(),
149 }
150 }
151
152 fn navigate_root<'a>(&self, root: &'a Value, path: Option<&str>) -> Result<&'a Value> {
165 let path = match path {
166 Some(p) if !p.is_empty() => p,
167 _ => return Ok(root),
168 };
169
170 let mut current = root;
171 for segment in path.split('.') {
172 if segment == "*" {
173 current = match current {
175 Value::Object(map) => map.values().next().ok_or_else(|| {
176 ScopeError::Chain(format!(
177 "{}: empty object at wildcard '*'",
178 self.descriptor.name
179 ))
180 })?,
181 _ => {
182 return Err(ScopeError::Chain(format!(
183 "{}: expected object at wildcard '*', got {:?}",
184 self.descriptor.name,
185 current_type(current)
186 )));
187 }
188 };
189 } else if let Ok(idx) = segment.parse::<usize>() {
190 current = current.get(idx).ok_or_else(|| {
192 ScopeError::Chain(format!(
193 "{}: index {} out of bounds",
194 self.descriptor.name, idx
195 ))
196 })?;
197 } else {
198 current = current.get(segment).ok_or_else(|| {
200 ScopeError::Chain(format!(
201 "{}: missing key '{}' in response",
202 self.descriptor.name, segment
203 ))
204 })?;
205 }
206 }
207 Ok(current)
208 }
209
210 fn extract_f64(&self, data: &Value, field_path: &str) -> Option<f64> {
214 let val = self.navigate_field(data, field_path)?;
215 value_to_f64(val)
216 }
217
218 fn navigate_field<'a>(&self, data: &'a Value, path: &str) -> Option<&'a Value> {
220 let mut current = data;
221 for segment in path.split('.') {
222 if let Ok(idx) = segment.parse::<usize>() {
223 current = current.get(idx)?;
224 } else {
225 current = current.get(segment)?;
226 }
227 }
228 Some(current)
229 }
230
231 fn extract_string(&self, data: &Value, field_path: &str) -> Option<String> {
233 let val = self.navigate_field(data, field_path)?;
234 match val {
235 Value::String(s) => Some(s.clone()),
236 Value::Number(n) => Some(n.to_string()),
237 _ => None,
238 }
239 }
240
241 fn parse_levels(&self, arr: &Value, mapping: &ResponseMapping) -> Result<Vec<OrderBookLevel>> {
247 let items = arr.as_array().ok_or_else(|| {
248 ScopeError::Chain(format!(
249 "{}: expected array for levels",
250 self.descriptor.name
251 ))
252 })?;
253
254 let level_format = mapping.level_format.as_deref().unwrap_or("positional");
255 let mut levels = Vec::with_capacity(items.len());
256
257 for item in items {
258 let (price, quantity) = match level_format {
259 "object" => {
260 let price_field = mapping.level_price_field.as_deref().unwrap_or("price");
261 let size_field = mapping.level_size_field.as_deref().unwrap_or("size");
262 let p = self
263 .navigate_field(item, price_field)
264 .and_then(value_to_f64);
265 let q = self.navigate_field(item, size_field).and_then(value_to_f64);
266 (p, q)
267 }
268 _ => {
269 let p = item.get(0).and_then(value_to_f64);
271 let q = item.get(1).and_then(value_to_f64);
272 (p, q)
273 }
274 };
275
276 if let (Some(price), Some(quantity)) = (price, quantity) {
277 if price > 0.0 && quantity > 0.0 {
278 levels.push(OrderBookLevel { price, quantity });
279 }
280 }
281 }
282
283 Ok(levels)
284 }
285
286 fn parse_side(&self, data: &Value, mapping: &ResponseMapping) -> TradeSide {
292 if let Some(side_mapping) = &mapping.side {
293 if let Some(val) = self.navigate_field(data, &side_mapping.field) {
294 let val_str = match val {
295 Value::String(s) => s.clone(),
296 Value::Bool(b) => b.to_string(),
297 Value::Number(n) => n.to_string(),
298 _ => return TradeSide::Buy,
299 };
300 if let Some(canonical) = side_mapping.mapping.get(&val_str) {
301 return match canonical.as_str() {
302 "sell" => TradeSide::Sell,
303 _ => TradeSide::Buy,
304 };
305 }
306 }
307 }
308 TradeSide::Buy
309 }
310}
311
312#[async_trait]
317impl OrderBookClient for ConfigurableExchangeClient {
318 async fn fetch_order_book(&self, pair_symbol: &str) -> Result<OrderBook> {
319 let endpoint = self
320 .descriptor
321 .capabilities
322 .order_book
323 .as_ref()
324 .ok_or_else(|| {
325 ScopeError::Chain(format!(
326 "{} does not support order book",
327 self.descriptor.name
328 ))
329 })?;
330
331 let json = self.fetch_endpoint(endpoint, pair_symbol, None).await?;
332 let data = self.navigate_root(&json, endpoint.response_root.as_deref())?;
333
334 let asks_key = endpoint.response.asks_key.as_deref().unwrap_or("asks");
335 let bids_key = endpoint.response.bids_key.as_deref().unwrap_or("bids");
336
337 let asks_arr = data.get(asks_key).ok_or_else(|| {
338 ScopeError::Chain(format!(
339 "{}: missing '{}' in order book response",
340 self.descriptor.name, asks_key
341 ))
342 })?;
343 let bids_arr = data.get(bids_key).ok_or_else(|| {
344 ScopeError::Chain(format!(
345 "{}: missing '{}' in order book response",
346 self.descriptor.name, bids_key
347 ))
348 })?;
349
350 let mut asks = self.parse_levels(asks_arr, &endpoint.response)?;
351 let mut bids = self.parse_levels(bids_arr, &endpoint.response)?;
352
353 asks.sort_by(|a, b| {
355 a.price
356 .partial_cmp(&b.price)
357 .unwrap_or(std::cmp::Ordering::Equal)
358 });
359 bids.sort_by(|a, b| {
360 b.price
361 .partial_cmp(&a.price)
362 .unwrap_or(std::cmp::Ordering::Equal)
363 });
364
365 let pair = format_display_pair(pair_symbol, &self.descriptor.symbol.template);
367
368 Ok(OrderBook { pair, bids, asks })
369 }
370}
371
372#[async_trait]
373impl TickerClient for ConfigurableExchangeClient {
374 async fn fetch_ticker(&self, pair_symbol: &str) -> Result<Ticker> {
375 let endpoint = self
376 .descriptor
377 .capabilities
378 .ticker
379 .as_ref()
380 .ok_or_else(|| {
381 ScopeError::Chain(format!("{} does not support ticker", self.descriptor.name))
382 })?;
383
384 let json = self.fetch_endpoint(endpoint, pair_symbol, None).await?;
385
386 let data = if let Some(filter) = &endpoint.response.filter {
388 let root = self.navigate_root(&json, endpoint.response_root.as_deref())?;
389 let items_key = endpoint.response.items_key.as_deref().unwrap_or("");
390 let items = if items_key.is_empty() {
391 root
392 } else {
393 root.get(items_key).unwrap_or(root)
394 };
395 let arr = items.as_array().ok_or_else(|| {
396 ScopeError::Chain(format!(
397 "{}: expected array for ticker filter",
398 self.descriptor.name
399 ))
400 })?;
401 let filter_value = filter.value.replace("{pair}", pair_symbol);
402 arr.iter()
403 .find(|item| {
404 item.get(&filter.field)
405 .and_then(|v| v.as_str())
406 .is_some_and(|s| s == filter_value)
407 })
408 .ok_or_else(|| {
409 ScopeError::Chain(format!(
410 "{}: no ticker found for pair {}",
411 self.descriptor.name, pair_symbol
412 ))
413 })?
414 .clone()
415 } else {
416 self.navigate_root(&json, endpoint.response_root.as_deref())?
417 .clone()
418 };
419
420 let r = &endpoint.response;
421 let pair = format_display_pair(pair_symbol, &self.descriptor.symbol.template);
422
423 Ok(Ticker {
424 pair,
425 last_price: r
426 .last_price
427 .as_ref()
428 .and_then(|f| self.extract_f64(&data, f)),
429 high_24h: r.high_24h.as_ref().and_then(|f| self.extract_f64(&data, f)),
430 low_24h: r.low_24h.as_ref().and_then(|f| self.extract_f64(&data, f)),
431 volume_24h: r
432 .volume_24h
433 .as_ref()
434 .and_then(|f| self.extract_f64(&data, f)),
435 quote_volume_24h: r
436 .quote_volume_24h
437 .as_ref()
438 .and_then(|f| self.extract_f64(&data, f)),
439 best_bid: r.best_bid.as_ref().and_then(|f| self.extract_f64(&data, f)),
440 best_ask: r.best_ask.as_ref().and_then(|f| self.extract_f64(&data, f)),
441 })
442 }
443}
444
445#[async_trait]
446impl TradeHistoryClient for ConfigurableExchangeClient {
447 async fn fetch_recent_trades(&self, pair_symbol: &str, limit: u32) -> Result<Vec<Trade>> {
448 let endpoint = self
449 .descriptor
450 .capabilities
451 .trades
452 .as_ref()
453 .ok_or_else(|| {
454 ScopeError::Chain(format!("{} does not support trades", self.descriptor.name))
455 })?;
456
457 let json = self
458 .fetch_endpoint(endpoint, pair_symbol, Some(limit))
459 .await?;
460 let data = self.navigate_root(&json, endpoint.response_root.as_deref())?;
461
462 let items_key = endpoint.response.items_key.as_deref().unwrap_or("");
464 let arr = if items_key.is_empty() {
465 data
466 } else {
467 data.get(items_key).unwrap_or(data)
468 };
469
470 let items = arr.as_array().ok_or_else(|| {
471 ScopeError::Chain(format!(
472 "{}: expected array for trades",
473 self.descriptor.name
474 ))
475 })?;
476
477 let r = &endpoint.response;
478 let mut trades = Vec::with_capacity(items.len());
479
480 for item in items {
481 let price = r.price.as_ref().and_then(|f| self.extract_f64(item, f));
482 let quantity = r.quantity.as_ref().and_then(|f| self.extract_f64(item, f));
483
484 if let (Some(price), Some(quantity)) = (price, quantity) {
485 let quote_quantity = r
486 .quote_quantity
487 .as_ref()
488 .and_then(|f| self.extract_f64(item, f));
489 let timestamp_ms = r
490 .timestamp_ms
491 .as_ref()
492 .and_then(|f| self.extract_f64(item, f))
493 .map(|v| v as u64)
494 .unwrap_or(0);
495 let id = r.id.as_ref().and_then(|f| self.extract_string(item, f));
496 let side = self.parse_side(item, r);
497
498 trades.push(Trade {
499 price,
500 quantity,
501 quote_quantity,
502 timestamp_ms,
503 side,
504 id,
505 });
506 }
507 }
508
509 Ok(trades)
510 }
511}
512
513fn value_to_f64(val: &Value) -> Option<f64> {
519 match val {
520 Value::Number(n) => n.as_f64(),
521 Value::String(s) => s.parse::<f64>().ok(),
522 _ => None,
523 }
524}
525
526fn current_type(val: &Value) -> &'static str {
528 match val {
529 Value::Null => "null",
530 Value::Bool(_) => "bool",
531 Value::Number(_) => "number",
532 Value::String(_) => "string",
533 Value::Array(_) => "array",
534 Value::Object(_) => "object",
535 }
536}
537
/// Converts a venue-specific pair symbol into the display form `BASE/QUOTE`.
///
/// * If `template` contains `_` (checked first) or `-`, every occurrence of
///   that separator in `raw` is replaced by `/`.
/// * Otherwise, if `raw` already contains `/`, it is returned unchanged.
/// * Otherwise `raw` is split in front of the first known quote currency it
///   ends with (ASCII case-insensitive), provided a non-empty base remains.
/// * If none of the above applies, `raw` is returned unchanged.
fn format_display_pair(raw: &str, template: &str) -> String {
    // Checked in this order; the first quote that `raw` ends with wins.
    const KNOWN_QUOTES: [&str; 7] = ["USDT", "USD", "USDC", "BTC", "ETH", "EUR", "GBP"];

    if template.contains('_') {
        return raw.replace('_', "/");
    }
    if template.contains('-') {
        return raw.replace('-', "/");
    }
    // Already in display form (e.g. a `{base}/{quote}` template); splitting
    // again would produce `BTC//USDT`.
    if raw.contains('/') {
        return raw.to_string();
    }

    for quote in KNOWN_QUOTES.iter() {
        let Some(split) = raw.len().checked_sub(quote.len()) else {
            continue;
        };
        // Require a non-empty base and never cut through a multi-byte char.
        if split == 0 || !raw.is_char_boundary(split) {
            continue;
        }
        let (base, tail) = raw.split_at(split);
        // Compare the actual bytes of `raw`: an uppercased copy can differ in
        // length from `raw` and would put the split in the wrong place.
        if tail.eq_ignore_ascii_case(quote) {
            return format!("{}/{}", base, tail);
        }
    }
    raw.to_string()
}
565
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds a minimal descriptor: concatenated upper-case symbols, no
    /// headers and no capabilities.
    fn make_test_descriptor() -> VenueDescriptor {
        use crate::market::descriptor::*;
        VenueDescriptor {
            id: "test".to_string(),
            name: "Test".to_string(),
            base_url: "https://example.com".to_string(),
            timeout_secs: Some(5),
            rate_limit_per_sec: None,
            symbol: SymbolConfig {
                template: "{base}{quote}".to_string(),
                default_quote: "USDT".to_string(),
                case: SymbolCase::Upper,
            },
            headers: std::collections::HashMap::new(),
            capabilities: CapabilitySet::default(),
        }
    }

    /// Client over the minimal test descriptor; no test performs network I/O.
    fn test_client() -> ConfigurableExchangeClient {
        ConfigurableExchangeClient::new(make_test_descriptor())
    }

    // --- format_display_pair -------------------------------------------

    #[test]
    fn test_format_display_pair_underscore() {
        let shown = format_display_pair("BTC_USDT", "{base}_{quote}");
        assert_eq!(shown, "BTC/USDT");
    }

    #[test]
    fn test_format_display_pair_dash() {
        let shown = format_display_pair("BTC-USDT", "{base}-{quote}");
        assert_eq!(shown, "BTC/USDT");
    }

    #[test]
    fn test_format_display_pair_concatenated() {
        let template = "{base}{quote}";
        assert_eq!(format_display_pair("BTCUSDT", template), "BTC/USDT");
        assert_eq!(format_display_pair("ETHUSD", template), "ETH/USD");
    }

    // --- value_to_f64 ---------------------------------------------------

    #[test]
    fn test_value_to_f64_number() {
        assert_eq!(value_to_f64(&json!(42.5)), Some(42.5));
    }

    #[test]
    fn test_value_to_f64_string() {
        assert_eq!(value_to_f64(&json!("42.5")), Some(42.5));
    }

    #[test]
    fn test_value_to_f64_invalid() {
        assert_eq!(value_to_f64(&json!(null)), None);
    }

    // --- navigate_root --------------------------------------------------

    #[test]
    fn test_navigate_root_empty() {
        let doc = json!({"price": 42});
        let reached = test_client().navigate_root(&doc, None).unwrap();
        assert_eq!(reached, &doc);
    }

    #[test]
    fn test_navigate_root_single_key() {
        let doc = json!({"result": {"price": 42}});
        let reached = test_client().navigate_root(&doc, Some("result")).unwrap();
        assert_eq!(reached, &json!({"price": 42}));
    }

    #[test]
    fn test_navigate_root_nested_with_index() {
        let doc = json!({"data": [{"price": 42}, {"price": 43}]});
        let reached = test_client().navigate_root(&doc, Some("data.0")).unwrap();
        assert_eq!(reached, &json!({"price": 42}));
    }

    #[test]
    fn test_navigate_root_wildcard() {
        let doc = json!({"result": {"XXBTZUSD": {"a": ["42000.0"]}}});
        let reached = test_client()
            .navigate_root(&doc, Some("result.*"))
            .unwrap();
        assert_eq!(reached, &json!({"a": ["42000.0"]}));
    }

    // --- field extraction -------------------------------------------------

    #[test]
    fn test_extract_f64_nested() {
        let client = test_client();
        let doc = json!({"c": ["42000.5", "1.5"]});
        assert_eq!(client.extract_f64(&doc, "c.0"), Some(42000.5));
        assert_eq!(client.extract_f64(&doc, "c.1"), Some(1.5));
    }

    // --- parse_levels -----------------------------------------------------

    #[test]
    fn test_parse_positional_levels() {
        let raw = json!([["42000.0", "1.5"], ["42001.0", "2.0"]]);
        let mapping = ResponseMapping {
            level_format: Some("positional".to_string()),
            ..Default::default()
        };

        let levels = test_client().parse_levels(&raw, &mapping).unwrap();

        assert_eq!(levels.len(), 2);
        assert_eq!(levels[0].price, 42000.0);
        assert_eq!(levels[0].quantity, 1.5);
    }

    #[test]
    fn test_parse_object_levels() {
        let raw = json!([
            {"price": "42000.0", "size": "1.5"},
            {"price": "42001.0", "size": "2.0"}
        ]);
        let mapping = ResponseMapping {
            level_format: Some("object".to_string()),
            level_price_field: Some("price".to_string()),
            level_size_field: Some("size".to_string()),
            ..Default::default()
        };

        let levels = test_client().parse_levels(&raw, &mapping).unwrap();

        assert_eq!(levels.len(), 2);
        assert_eq!(levels[0].price, 42000.0);
        assert_eq!(levels[0].quantity, 1.5);
    }

    // --- parse_side -------------------------------------------------------

    #[test]
    fn test_parse_side_mapping() {
        let trade = json!({"isBuyerMaker": true});
        let side = crate::market::descriptor::SideMapping {
            field: "isBuyerMaker".to_string(),
            mapping: [
                ("true".to_string(), "sell".to_string()),
                ("false".to_string(), "buy".to_string()),
            ]
            .into_iter()
            .collect(),
        };
        let mapping = ResponseMapping {
            side: Some(side),
            ..Default::default()
        };

        assert_eq!(test_client().parse_side(&trade, &mapping), TradeSide::Sell);
    }

    // --- interpolate_json -------------------------------------------------

    #[test]
    fn test_interpolate_json() {
        let template = json!({
            "method": "get-book",
            "params": {"instrument": "{pair}", "depth": "{limit}"}
        });
        let expected = json!({
            "method": "get-book",
            "params": {"instrument": "BTC_USDT", "depth": "100"}
        });

        let rendered = test_client().interpolate_json(&template, "BTC_USDT", "100");

        assert_eq!(rendered, expected);
    }
}