nt_execution/
ccxt_broker.rs

1// CCXT cryptocurrency exchange integration
2//
3// Unified API for 100+ cryptocurrency exchanges
4// Features:
5// - Unified order placement across exchanges
6// - Real-time orderbook and ticker data
7// - Balance and position management
8// - Cross-exchange arbitrage support
9
10use crate::broker::{
11    Account, BrokerClient, BrokerError, HealthStatus, OrderFilter, Position, PositionSide,
12};
13use crate::{OrderRequest, OrderResponse, OrderStatus, Symbol};
14use async_trait::async_trait;
15use chrono::{DateTime, Utc};
16use hmac::{Hmac, Mac};
17use reqwest::{Client, Method};
18use rust_decimal::Decimal;
19use serde::{Deserialize, Serialize};
20use serde_json::Value;
21use sha2::Sha256;
22use std::collections::HashMap;
23use std::str::FromStr;
24use std::sync::Arc;
25use std::time::Duration;
26use tokio::sync::RwLock;
27use tracing::{debug, error, info};
28use uuid::Uuid;
29
30type HmacSha256 = Hmac<Sha256>;
31
32/// CCXT exchange configuration
33#[derive(Debug, Clone)]
34pub struct CCXTConfig {
35    /// Exchange name (binance, coinbase, kraken, etc.)
36    pub exchange: String,
37    /// API key
38    pub api_key: String,
39    /// API secret
40    pub secret: String,
41    /// Additional password (for some exchanges)
42    pub password: Option<String>,
43    /// Sandbox/testnet mode
44    pub sandbox: bool,
45    /// Request timeout
46    pub timeout: Duration,
47}
48
49/// CCXT broker client supporting multiple exchanges
50pub struct CCXTBroker {
51    client: Client,
52    config: CCXTConfig,
53    exchange_config: ExchangeInfo,
54    balances: Arc<RwLock<HashMap<String, Decimal>>>,
55    positions: Arc<RwLock<Vec<Position>>>,
56}
57
58impl CCXTBroker {
59    /// Create a new CCXT broker for a specific exchange
60    pub fn new(config: CCXTConfig) -> Result<Self, BrokerError> {
61        let client = Client::builder()
62            .timeout(config.timeout)
63            .build()
64            .expect("Failed to create HTTP client");
65
66        let exchange_config = Self::get_exchange_config(&config.exchange)?;
67
68        Ok(Self {
69            client,
70            config,
71            exchange_config,
72            balances: Arc::new(RwLock::new(HashMap::new())),
73            positions: Arc::new(RwLock::new(Vec::new())),
74        })
75    }
76
77    /// Get exchange-specific configuration
78    fn get_exchange_config(exchange: &str) -> Result<ExchangeInfo, BrokerError> {
79        match exchange.to_lowercase().as_str() {
80            "binance" => Ok(ExchangeInfo {
81                name: "binance".to_string(),
82                base_url: "https://api.binance.com".to_string(),
83                testnet_url: Some("https://testnet.binance.vision".to_string()),
84                has_futures: true,
85                has_margin: true,
86                rate_limit: Duration::from_millis(100),
87            }),
88            "coinbase" => Ok(ExchangeInfo {
89                name: "coinbase".to_string(),
90                base_url: "https://api.exchange.coinbase.com".to_string(),
91                testnet_url: Some("https://api-public.sandbox.exchange.coinbase.com".to_string()),
92                has_futures: false,
93                has_margin: true,
94                rate_limit: Duration::from_millis(100),
95            }),
96            "kraken" => Ok(ExchangeInfo {
97                name: "kraken".to_string(),
98                base_url: "https://api.kraken.com".to_string(),
99                testnet_url: None,
100                has_futures: true,
101                has_margin: true,
102                rate_limit: Duration::from_millis(100),
103            }),
104            _ => Err(BrokerError::InvalidOrder(format!(
105                "Unsupported exchange: {}",
106                exchange
107            ))),
108        }
109    }
110
111    /// Get base URL for requests
112    fn base_url(&self) -> &str {
113        if self.config.sandbox {
114            self.exchange_config
115                .testnet_url
116                .as_ref()
117                .unwrap_or(&self.exchange_config.base_url)
118        } else {
119            &self.exchange_config.base_url
120        }
121    }
122
123    /// Sign request with HMAC-SHA256
124    fn sign_request(&self, message: &str) -> String {
125        let mut mac = HmacSha256::new_from_slice(self.config.secret.as_bytes())
126            .expect("HMAC can take key of any size");
127        mac.update(message.as_bytes());
128        let result = mac.finalize();
129        hex::encode(result.into_bytes())
130    }
131
132    /// Make authenticated request to exchange
133    async fn request<T: serde::de::DeserializeOwned>(
134        &self,
135        method: Method,
136        endpoint: &str,
137        params: Option<HashMap<String, String>>,
138    ) -> Result<T, BrokerError> {
139        let url = format!("{}{}", self.base_url(), endpoint);
140        let timestamp = Utc::now().timestamp_millis().to_string();
141
142        let mut req = self.client.request(method.clone(), &url);
143
144        // Add exchange-specific headers and signing
145        match self.exchange_config.name.as_str() {
146            "binance" => {
147                let mut query_params = params.unwrap_or_default();
148                query_params.insert("timestamp".to_string(), timestamp.clone());
149                query_params.insert("recvWindow".to_string(), "5000".to_string());
150
151                let query_string = serde_urlencoded::to_string(&query_params)
152                    .map_err(|e| BrokerError::Parse(e.to_string()))?;
153                let signature = self.sign_request(&query_string);
154                query_params.insert("signature".to_string(), signature);
155
156                req = req
157                    .query(&query_params)
158                    .header("X-MBX-APIKEY", &self.config.api_key);
159            }
160            "coinbase" => {
161                let timestamp = Utc::now().timestamp();
162                let message = format!("{}{}{}", timestamp, method.as_str(), endpoint);
163                let signature = self.sign_request(&message);
164                let b64_signature = base64::encode(signature);
165
166                req = req
167                    .header("CB-ACCESS-KEY", &self.config.api_key)
168                    .header("CB-ACCESS-SIGN", b64_signature)
169                    .header("CB-ACCESS-TIMESTAMP", timestamp.to_string())
170                    .header("CB-ACCESS-PASSPHRASE", self.config.password.as_ref().unwrap_or(&String::new()));
171            }
172            "kraken" => {
173                // Kraken uses nonce and API-Sign
174                let nonce = Utc::now().timestamp_millis().to_string();
175                let mut post_data = params.unwrap_or_default();
176                post_data.insert("nonce".to_string(), nonce.clone());
177
178                let post_string = serde_urlencoded::to_string(&post_data)
179                    .map_err(|e| BrokerError::Parse(e.to_string()))?;
180                let message = format!("{}{}{}", nonce, endpoint, post_string);
181                let signature = self.sign_request(&message);
182
183                req = req
184                    .header("API-Key", &self.config.api_key)
185                    .header("API-Sign", signature)
186                    .body(post_string);
187            }
188            _ => {}
189        }
190
191        debug!("CCXT API request: {} {}", method, url);
192
193        let response = req.send().await?;
194
195        if response.status().is_success() {
196            let result = response.json().await?;
197            Ok(result)
198        } else {
199            let error_text = response.text().await.unwrap_or_default();
200            error!("CCXT API error: {}", error_text);
201            Err(BrokerError::Other(anyhow::anyhow!("API error: {}", error_text)))
202        }
203    }
204
205    /// Fetch balances from exchange
206    async fn fetch_balances(&self) -> Result<HashMap<String, Decimal>, BrokerError> {
207        let endpoint = match self.exchange_config.name.as_str() {
208            "binance" => "/api/v3/account",
209            "coinbase" => "/accounts",
210            "kraken" => "/0/private/Balance",
211            _ => return Err(BrokerError::InvalidOrder("Unsupported exchange".to_string())),
212        };
213
214        let response: Value = self.request(Method::GET, endpoint, None).await?;
215
216        let mut balances = HashMap::new();
217
218        // Parse exchange-specific response format
219        match self.exchange_config.name.as_str() {
220            "binance" => {
221                if let Some(balance_array) = response.get("balances").and_then(|v| v.as_array()) {
222                    for balance in balance_array {
223                        if let (Some(asset), Some(free)) = (
224                            balance.get("asset").and_then(|v| v.as_str()),
225                            balance.get("free").and_then(|v| v.as_str()),
226                        ) {
227                            if let Ok(amount) = Decimal::from_str(free) {
228                                if amount > Decimal::ZERO {
229                                    balances.insert(asset.to_string(), amount);
230                                }
231                            }
232                        }
233                    }
234                }
235            }
236            "coinbase" => {
237                if let Some(accounts) = response.as_array() {
238                    for account in accounts {
239                        if let (Some(currency), Some(balance)) = (
240                            account.get("currency").and_then(|v| v.as_str()),
241                            account.get("balance").and_then(|v| v.as_str()),
242                        ) {
243                            if let Ok(amount) = Decimal::from_str(balance) {
244                                if amount > Decimal::ZERO {
245                                    balances.insert(currency.to_string(), amount);
246                                }
247                            }
248                        }
249                    }
250                }
251            }
252            "kraken" => {
253                if let Some(result) = response.get("result").and_then(|v| v.as_object()) {
254                    for (asset, amount) in result {
255                        if let Some(amount_str) = amount.as_str() {
256                            if let Ok(amount_dec) = Decimal::from_str(amount_str) {
257                                if amount_dec > Decimal::ZERO {
258                                    balances.insert(asset.clone(), amount_dec);
259                                }
260                            }
261                        }
262                    }
263                }
264            }
265            _ => {}
266        }
267
268        *self.balances.write().await = balances.clone();
269        Ok(balances)
270    }
271}
272
273#[async_trait]
274impl BrokerClient for CCXTBroker {
275    async fn get_account(&self) -> Result<Account, BrokerError> {
276        let balances = self.fetch_balances().await?;
277
278        let total_value = balances
279            .values()
280            .fold(Decimal::ZERO, |acc, balance| acc + *balance);
281
282        Ok(Account {
283            account_id: self.config.exchange.clone(),
284            cash: total_value,
285            portfolio_value: total_value,
286            buying_power: total_value,
287            equity: total_value,
288            last_equity: total_value,
289            multiplier: "1".to_string(),
290            currency: "USD".to_string(),
291            shorting_enabled: self.exchange_config.has_margin,
292            long_market_value: total_value,
293            short_market_value: Decimal::ZERO,
294            initial_margin: Decimal::ZERO,
295            maintenance_margin: Decimal::ZERO,
296            day_trading_buying_power: total_value,
297            daytrade_count: 0,
298        })
299    }
300
301    async fn get_positions(&self) -> Result<Vec<Position>, BrokerError> {
302        // For spot trading, positions are based on non-zero balances
303        let balances = self.fetch_balances().await?;
304
305        let positions: Vec<Position> = balances
306            .into_iter()
307            .filter(|(asset, _)| asset != "USD" && asset != "USDT" && asset != "USDC")
308            .map(|(asset, qty)| Position {
309                symbol: Symbol::new(&asset).expect("Invalid symbol from CCXT"),
310                qty: qty.to_string().parse().unwrap_or(0),
311                side: PositionSide::Long,
312                avg_entry_price: Decimal::ONE, // Would need trade history
313                market_value: qty,
314                cost_basis: qty,
315                unrealized_pl: Decimal::ZERO,
316                unrealized_plpc: Decimal::ZERO,
317                current_price: Decimal::ONE,
318                lastday_price: Decimal::ONE,
319                change_today: Decimal::ZERO,
320            })
321            .collect();
322
323        *self.positions.write().await = positions.clone();
324        Ok(positions)
325    }
326
327    async fn place_order(&self, order: OrderRequest) -> Result<OrderResponse, BrokerError> {
328        let endpoint = match self.exchange_config.name.as_str() {
329            "binance" => "/api/v3/order",
330            "coinbase" => "/orders",
331            "kraken" => "/0/private/AddOrder",
332            _ => return Err(BrokerError::InvalidOrder("Unsupported exchange".to_string())),
333        };
334
335        let mut params = HashMap::new();
336        params.insert("symbol".to_string(), order.symbol.to_string());
337        params.insert("side".to_string(), order.side.to_string().to_uppercase());
338        params.insert("type".to_string(), order.order_type.to_string().to_uppercase());
339        params.insert("quantity".to_string(), order.quantity.to_string());
340
341        if let Some(price) = order.limit_price {
342            params.insert("price".to_string(), price.to_string());
343        }
344
345        let response: Value = self.request(Method::POST, endpoint, Some(params)).await?;
346
347        // Parse order ID from response
348        let order_id = response
349            .get("orderId")
350            .or_else(|| response.get("id"))
351            .and_then(|v| v.as_str())
352            .unwrap_or_default()
353            .to_string();
354
355        Ok(OrderResponse {
356            order_id,
357            client_order_id: Uuid::new_v4().to_string(),
358            status: OrderStatus::Accepted,
359            filled_qty: 0,
360            filled_avg_price: None,
361            submitted_at: Utc::now(),
362            filled_at: None,
363        })
364    }
365
366    async fn cancel_order(&self, order_id: &str) -> Result<(), BrokerError> {
367        let endpoint = match self.exchange_config.name.as_str() {
368            "binance" => "/api/v3/order",
369            "coinbase" => &format!("/orders/{}", order_id),
370            "kraken" => "/0/private/CancelOrder",
371            _ => return Err(BrokerError::InvalidOrder("Unsupported exchange".to_string())),
372        };
373
374        let mut params = HashMap::new();
375        params.insert("orderId".to_string(), order_id.to_string());
376
377        let _: Value = self.request(Method::DELETE, endpoint, Some(params)).await?;
378        Ok(())
379    }
380
381    async fn get_order(&self, order_id: &str) -> Result<OrderResponse, BrokerError> {
382        Err(BrokerError::Other(anyhow::anyhow!("Not implemented")))
383    }
384
385    async fn list_orders(&self, _filter: OrderFilter) -> Result<Vec<OrderResponse>, BrokerError> {
386        Ok(Vec::new())
387    }
388
389    async fn health_check(&self) -> Result<HealthStatus, BrokerError> {
390        match self.fetch_balances().await {
391            Ok(_) => Ok(HealthStatus::Healthy),
392            Err(_) => Ok(HealthStatus::Unhealthy),
393        }
394    }
395}
396
397#[derive(Debug, Clone)]
398struct ExchangeInfo {
399    name: String,
400    base_url: String,
401    testnet_url: Option<String>,
402    has_futures: bool,
403    has_margin: bool,
404    rate_limit: Duration,
405}
406
407#[cfg(test)]
408mod tests {
409    use super::*;
410
411    #[test]
412    fn test_exchange_config() {
413        let info = CCXTBroker::get_exchange_config("binance").unwrap();
414        assert_eq!(info.name, "binance");
415        assert!(info.has_futures);
416    }
417}