1use crate::broker::{
11 Account, BrokerClient, BrokerError, HealthStatus, OrderFilter, Position, PositionSide,
12};
13use crate::{OrderRequest, OrderResponse, OrderStatus, Symbol};
14use async_trait::async_trait;
15use chrono::{DateTime, Utc};
16use hmac::{Hmac, Mac};
17use reqwest::{Client, Method};
18use rust_decimal::Decimal;
19use serde::{Deserialize, Serialize};
20use serde_json::Value;
21use sha2::Sha256;
22use std::collections::HashMap;
23use std::str::FromStr;
24use std::sync::Arc;
25use std::time::Duration;
26use tokio::sync::RwLock;
27use tracing::{debug, error, info};
28use uuid::Uuid;
29
30type HmacSha256 = Hmac<Sha256>;
31
32#[derive(Debug, Clone)]
34pub struct CCXTConfig {
35 pub exchange: String,
37 pub api_key: String,
39 pub secret: String,
41 pub password: Option<String>,
43 pub sandbox: bool,
45 pub timeout: Duration,
47}
48
49pub struct CCXTBroker {
51 client: Client,
52 config: CCXTConfig,
53 exchange_config: ExchangeInfo,
54 balances: Arc<RwLock<HashMap<String, Decimal>>>,
55 positions: Arc<RwLock<Vec<Position>>>,
56}
57
58impl CCXTBroker {
59 pub fn new(config: CCXTConfig) -> Result<Self, BrokerError> {
61 let client = Client::builder()
62 .timeout(config.timeout)
63 .build()
64 .expect("Failed to create HTTP client");
65
66 let exchange_config = Self::get_exchange_config(&config.exchange)?;
67
68 Ok(Self {
69 client,
70 config,
71 exchange_config,
72 balances: Arc::new(RwLock::new(HashMap::new())),
73 positions: Arc::new(RwLock::new(Vec::new())),
74 })
75 }
76
77 fn get_exchange_config(exchange: &str) -> Result<ExchangeInfo, BrokerError> {
79 match exchange.to_lowercase().as_str() {
80 "binance" => Ok(ExchangeInfo {
81 name: "binance".to_string(),
82 base_url: "https://api.binance.com".to_string(),
83 testnet_url: Some("https://testnet.binance.vision".to_string()),
84 has_futures: true,
85 has_margin: true,
86 rate_limit: Duration::from_millis(100),
87 }),
88 "coinbase" => Ok(ExchangeInfo {
89 name: "coinbase".to_string(),
90 base_url: "https://api.exchange.coinbase.com".to_string(),
91 testnet_url: Some("https://api-public.sandbox.exchange.coinbase.com".to_string()),
92 has_futures: false,
93 has_margin: true,
94 rate_limit: Duration::from_millis(100),
95 }),
96 "kraken" => Ok(ExchangeInfo {
97 name: "kraken".to_string(),
98 base_url: "https://api.kraken.com".to_string(),
99 testnet_url: None,
100 has_futures: true,
101 has_margin: true,
102 rate_limit: Duration::from_millis(100),
103 }),
104 _ => Err(BrokerError::InvalidOrder(format!(
105 "Unsupported exchange: {}",
106 exchange
107 ))),
108 }
109 }
110
111 fn base_url(&self) -> &str {
113 if self.config.sandbox {
114 self.exchange_config
115 .testnet_url
116 .as_ref()
117 .unwrap_or(&self.exchange_config.base_url)
118 } else {
119 &self.exchange_config.base_url
120 }
121 }
122
123 fn sign_request(&self, message: &str) -> String {
125 let mut mac = HmacSha256::new_from_slice(self.config.secret.as_bytes())
126 .expect("HMAC can take key of any size");
127 mac.update(message.as_bytes());
128 let result = mac.finalize();
129 hex::encode(result.into_bytes())
130 }
131
132 async fn request<T: serde::de::DeserializeOwned>(
134 &self,
135 method: Method,
136 endpoint: &str,
137 params: Option<HashMap<String, String>>,
138 ) -> Result<T, BrokerError> {
139 let url = format!("{}{}", self.base_url(), endpoint);
140 let timestamp = Utc::now().timestamp_millis().to_string();
141
142 let mut req = self.client.request(method.clone(), &url);
143
144 match self.exchange_config.name.as_str() {
146 "binance" => {
147 let mut query_params = params.unwrap_or_default();
148 query_params.insert("timestamp".to_string(), timestamp.clone());
149 query_params.insert("recvWindow".to_string(), "5000".to_string());
150
151 let query_string = serde_urlencoded::to_string(&query_params)
152 .map_err(|e| BrokerError::Parse(e.to_string()))?;
153 let signature = self.sign_request(&query_string);
154 query_params.insert("signature".to_string(), signature);
155
156 req = req
157 .query(&query_params)
158 .header("X-MBX-APIKEY", &self.config.api_key);
159 }
160 "coinbase" => {
161 let timestamp = Utc::now().timestamp();
162 let message = format!("{}{}{}", timestamp, method.as_str(), endpoint);
163 let signature = self.sign_request(&message);
164 let b64_signature = base64::encode(signature);
165
166 req = req
167 .header("CB-ACCESS-KEY", &self.config.api_key)
168 .header("CB-ACCESS-SIGN", b64_signature)
169 .header("CB-ACCESS-TIMESTAMP", timestamp.to_string())
170 .header("CB-ACCESS-PASSPHRASE", self.config.password.as_ref().unwrap_or(&String::new()));
171 }
172 "kraken" => {
173 let nonce = Utc::now().timestamp_millis().to_string();
175 let mut post_data = params.unwrap_or_default();
176 post_data.insert("nonce".to_string(), nonce.clone());
177
178 let post_string = serde_urlencoded::to_string(&post_data)
179 .map_err(|e| BrokerError::Parse(e.to_string()))?;
180 let message = format!("{}{}{}", nonce, endpoint, post_string);
181 let signature = self.sign_request(&message);
182
183 req = req
184 .header("API-Key", &self.config.api_key)
185 .header("API-Sign", signature)
186 .body(post_string);
187 }
188 _ => {}
189 }
190
191 debug!("CCXT API request: {} {}", method, url);
192
193 let response = req.send().await?;
194
195 if response.status().is_success() {
196 let result = response.json().await?;
197 Ok(result)
198 } else {
199 let error_text = response.text().await.unwrap_or_default();
200 error!("CCXT API error: {}", error_text);
201 Err(BrokerError::Other(anyhow::anyhow!("API error: {}", error_text)))
202 }
203 }
204
205 async fn fetch_balances(&self) -> Result<HashMap<String, Decimal>, BrokerError> {
207 let endpoint = match self.exchange_config.name.as_str() {
208 "binance" => "/api/v3/account",
209 "coinbase" => "/accounts",
210 "kraken" => "/0/private/Balance",
211 _ => return Err(BrokerError::InvalidOrder("Unsupported exchange".to_string())),
212 };
213
214 let response: Value = self.request(Method::GET, endpoint, None).await?;
215
216 let mut balances = HashMap::new();
217
218 match self.exchange_config.name.as_str() {
220 "binance" => {
221 if let Some(balance_array) = response.get("balances").and_then(|v| v.as_array()) {
222 for balance in balance_array {
223 if let (Some(asset), Some(free)) = (
224 balance.get("asset").and_then(|v| v.as_str()),
225 balance.get("free").and_then(|v| v.as_str()),
226 ) {
227 if let Ok(amount) = Decimal::from_str(free) {
228 if amount > Decimal::ZERO {
229 balances.insert(asset.to_string(), amount);
230 }
231 }
232 }
233 }
234 }
235 }
236 "coinbase" => {
237 if let Some(accounts) = response.as_array() {
238 for account in accounts {
239 if let (Some(currency), Some(balance)) = (
240 account.get("currency").and_then(|v| v.as_str()),
241 account.get("balance").and_then(|v| v.as_str()),
242 ) {
243 if let Ok(amount) = Decimal::from_str(balance) {
244 if amount > Decimal::ZERO {
245 balances.insert(currency.to_string(), amount);
246 }
247 }
248 }
249 }
250 }
251 }
252 "kraken" => {
253 if let Some(result) = response.get("result").and_then(|v| v.as_object()) {
254 for (asset, amount) in result {
255 if let Some(amount_str) = amount.as_str() {
256 if let Ok(amount_dec) = Decimal::from_str(amount_str) {
257 if amount_dec > Decimal::ZERO {
258 balances.insert(asset.clone(), amount_dec);
259 }
260 }
261 }
262 }
263 }
264 }
265 _ => {}
266 }
267
268 *self.balances.write().await = balances.clone();
269 Ok(balances)
270 }
271}
272
273#[async_trait]
274impl BrokerClient for CCXTBroker {
275 async fn get_account(&self) -> Result<Account, BrokerError> {
276 let balances = self.fetch_balances().await?;
277
278 let total_value = balances
279 .values()
280 .fold(Decimal::ZERO, |acc, balance| acc + *balance);
281
282 Ok(Account {
283 account_id: self.config.exchange.clone(),
284 cash: total_value,
285 portfolio_value: total_value,
286 buying_power: total_value,
287 equity: total_value,
288 last_equity: total_value,
289 multiplier: "1".to_string(),
290 currency: "USD".to_string(),
291 shorting_enabled: self.exchange_config.has_margin,
292 long_market_value: total_value,
293 short_market_value: Decimal::ZERO,
294 initial_margin: Decimal::ZERO,
295 maintenance_margin: Decimal::ZERO,
296 day_trading_buying_power: total_value,
297 daytrade_count: 0,
298 })
299 }
300
301 async fn get_positions(&self) -> Result<Vec<Position>, BrokerError> {
302 let balances = self.fetch_balances().await?;
304
305 let positions: Vec<Position> = balances
306 .into_iter()
307 .filter(|(asset, _)| asset != "USD" && asset != "USDT" && asset != "USDC")
308 .map(|(asset, qty)| Position {
309 symbol: Symbol::new(&asset).expect("Invalid symbol from CCXT"),
310 qty: qty.to_string().parse().unwrap_or(0),
311 side: PositionSide::Long,
312 avg_entry_price: Decimal::ONE, market_value: qty,
314 cost_basis: qty,
315 unrealized_pl: Decimal::ZERO,
316 unrealized_plpc: Decimal::ZERO,
317 current_price: Decimal::ONE,
318 lastday_price: Decimal::ONE,
319 change_today: Decimal::ZERO,
320 })
321 .collect();
322
323 *self.positions.write().await = positions.clone();
324 Ok(positions)
325 }
326
327 async fn place_order(&self, order: OrderRequest) -> Result<OrderResponse, BrokerError> {
328 let endpoint = match self.exchange_config.name.as_str() {
329 "binance" => "/api/v3/order",
330 "coinbase" => "/orders",
331 "kraken" => "/0/private/AddOrder",
332 _ => return Err(BrokerError::InvalidOrder("Unsupported exchange".to_string())),
333 };
334
335 let mut params = HashMap::new();
336 params.insert("symbol".to_string(), order.symbol.to_string());
337 params.insert("side".to_string(), order.side.to_string().to_uppercase());
338 params.insert("type".to_string(), order.order_type.to_string().to_uppercase());
339 params.insert("quantity".to_string(), order.quantity.to_string());
340
341 if let Some(price) = order.limit_price {
342 params.insert("price".to_string(), price.to_string());
343 }
344
345 let response: Value = self.request(Method::POST, endpoint, Some(params)).await?;
346
347 let order_id = response
349 .get("orderId")
350 .or_else(|| response.get("id"))
351 .and_then(|v| v.as_str())
352 .unwrap_or_default()
353 .to_string();
354
355 Ok(OrderResponse {
356 order_id,
357 client_order_id: Uuid::new_v4().to_string(),
358 status: OrderStatus::Accepted,
359 filled_qty: 0,
360 filled_avg_price: None,
361 submitted_at: Utc::now(),
362 filled_at: None,
363 })
364 }
365
366 async fn cancel_order(&self, order_id: &str) -> Result<(), BrokerError> {
367 let endpoint = match self.exchange_config.name.as_str() {
368 "binance" => "/api/v3/order",
369 "coinbase" => &format!("/orders/{}", order_id),
370 "kraken" => "/0/private/CancelOrder",
371 _ => return Err(BrokerError::InvalidOrder("Unsupported exchange".to_string())),
372 };
373
374 let mut params = HashMap::new();
375 params.insert("orderId".to_string(), order_id.to_string());
376
377 let _: Value = self.request(Method::DELETE, endpoint, Some(params)).await?;
378 Ok(())
379 }
380
381 async fn get_order(&self, order_id: &str) -> Result<OrderResponse, BrokerError> {
382 Err(BrokerError::Other(anyhow::anyhow!("Not implemented")))
383 }
384
385 async fn list_orders(&self, _filter: OrderFilter) -> Result<Vec<OrderResponse>, BrokerError> {
386 Ok(Vec::new())
387 }
388
389 async fn health_check(&self) -> Result<HealthStatus, BrokerError> {
390 match self.fetch_balances().await {
391 Ok(_) => Ok(HealthStatus::Healthy),
392 Err(_) => Ok(HealthStatus::Unhealthy),
393 }
394 }
395}
396
397#[derive(Debug, Clone)]
398struct ExchangeInfo {
399 name: String,
400 base_url: String,
401 testnet_url: Option<String>,
402 has_futures: bool,
403 has_margin: bool,
404 rate_limit: Duration,
405}
406
407#[cfg(test)]
408mod tests {
409 use super::*;
410
411 #[test]
412 fn test_exchange_config() {
413 let info = CCXTBroker::get_exchange_config("binance").unwrap();
414 assert_eq!(info.name, "binance");
415 assert!(info.has_futures);
416 }
417}