use crate::broker::{
Account, BrokerClient, BrokerError, HealthStatus, OrderFilter, Position, PositionSide,
};
use crate::{OrderRequest, OrderResponse, OrderStatus, Symbol};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use hmac::{Hmac, Mac};
use reqwest::{Client, Method};
use rust_decimal::Decimal;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use sha2::Sha256;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tracing::{debug, error, info};
use uuid::Uuid;
type HmacSha256 = Hmac<Sha256>;
#[derive(Debug, Clone)]
pub struct CCXTConfig {
pub exchange: String,
pub api_key: String,
pub secret: String,
pub password: Option<String>,
pub sandbox: bool,
pub timeout: Duration,
}
pub struct CCXTBroker {
client: Client,
config: CCXTConfig,
exchange_config: ExchangeInfo,
balances: Arc<RwLock<HashMap<String, Decimal>>>,
positions: Arc<RwLock<Vec<Position>>>,
}
impl CCXTBroker {
pub fn new(config: CCXTConfig) -> Result<Self, BrokerError> {
let client = Client::builder()
.timeout(config.timeout)
.build()
.expect("Failed to create HTTP client");
let exchange_config = Self::get_exchange_config(&config.exchange)?;
Ok(Self {
client,
config,
exchange_config,
balances: Arc::new(RwLock::new(HashMap::new())),
positions: Arc::new(RwLock::new(Vec::new())),
})
}
fn get_exchange_config(exchange: &str) -> Result<ExchangeInfo, BrokerError> {
match exchange.to_lowercase().as_str() {
"binance" => Ok(ExchangeInfo {
name: "binance".to_string(),
base_url: "https://api.binance.com".to_string(),
testnet_url: Some("https://testnet.binance.vision".to_string()),
has_futures: true,
has_margin: true,
rate_limit: Duration::from_millis(100),
}),
"coinbase" => Ok(ExchangeInfo {
name: "coinbase".to_string(),
base_url: "https://api.exchange.coinbase.com".to_string(),
testnet_url: Some("https://api-public.sandbox.exchange.coinbase.com".to_string()),
has_futures: false,
has_margin: true,
rate_limit: Duration::from_millis(100),
}),
"kraken" => Ok(ExchangeInfo {
name: "kraken".to_string(),
base_url: "https://api.kraken.com".to_string(),
testnet_url: None,
has_futures: true,
has_margin: true,
rate_limit: Duration::from_millis(100),
}),
_ => Err(BrokerError::InvalidOrder(format!(
"Unsupported exchange: {}",
exchange
))),
}
}
fn base_url(&self) -> &str {
if self.config.sandbox {
self.exchange_config
.testnet_url
.as_ref()
.unwrap_or(&self.exchange_config.base_url)
} else {
&self.exchange_config.base_url
}
}
fn sign_request(&self, message: &str) -> String {
let mut mac = HmacSha256::new_from_slice(self.config.secret.as_bytes())
.expect("HMAC can take key of any size");
mac.update(message.as_bytes());
let result = mac.finalize();
hex::encode(result.into_bytes())
}
async fn request<T: serde::de::DeserializeOwned>(
&self,
method: Method,
endpoint: &str,
params: Option<HashMap<String, String>>,
) -> Result<T, BrokerError> {
let url = format!("{}{}", self.base_url(), endpoint);
let timestamp = Utc::now().timestamp_millis().to_string();
let mut req = self.client.request(method.clone(), &url);
match self.exchange_config.name.as_str() {
"binance" => {
let mut query_params = params.unwrap_or_default();
query_params.insert("timestamp".to_string(), timestamp.clone());
query_params.insert("recvWindow".to_string(), "5000".to_string());
let query_string = serde_urlencoded::to_string(&query_params)
.map_err(|e| BrokerError::Parse(e.to_string()))?;
let signature = self.sign_request(&query_string);
query_params.insert("signature".to_string(), signature);
req = req
.query(&query_params)
.header("X-MBX-APIKEY", &self.config.api_key);
}
"coinbase" => {
let timestamp = Utc::now().timestamp();
let message = format!("{}{}{}", timestamp, method.as_str(), endpoint);
let signature = self.sign_request(&message);
let b64_signature = base64::encode(signature);
req = req
.header("CB-ACCESS-KEY", &self.config.api_key)
.header("CB-ACCESS-SIGN", b64_signature)
.header("CB-ACCESS-TIMESTAMP", timestamp.to_string())
.header("CB-ACCESS-PASSPHRASE", self.config.password.as_ref().unwrap_or(&String::new()));
}
"kraken" => {
let nonce = Utc::now().timestamp_millis().to_string();
let mut post_data = params.unwrap_or_default();
post_data.insert("nonce".to_string(), nonce.clone());
let post_string = serde_urlencoded::to_string(&post_data)
.map_err(|e| BrokerError::Parse(e.to_string()))?;
let message = format!("{}{}{}", nonce, endpoint, post_string);
let signature = self.sign_request(&message);
req = req
.header("API-Key", &self.config.api_key)
.header("API-Sign", signature)
.body(post_string);
}
_ => {}
}
debug!("CCXT API request: {} {}", method, url);
let response = req.send().await?;
if response.status().is_success() {
let result = response.json().await?;
Ok(result)
} else {
let error_text = response.text().await.unwrap_or_default();
error!("CCXT API error: {}", error_text);
Err(BrokerError::Other(anyhow::anyhow!("API error: {}", error_text)))
}
}
async fn fetch_balances(&self) -> Result<HashMap<String, Decimal>, BrokerError> {
let endpoint = match self.exchange_config.name.as_str() {
"binance" => "/api/v3/account",
"coinbase" => "/accounts",
"kraken" => "/0/private/Balance",
_ => return Err(BrokerError::InvalidOrder("Unsupported exchange".to_string())),
};
let response: Value = self.request(Method::GET, endpoint, None).await?;
let mut balances = HashMap::new();
match self.exchange_config.name.as_str() {
"binance" => {
if let Some(balance_array) = response.get("balances").and_then(|v| v.as_array()) {
for balance in balance_array {
if let (Some(asset), Some(free)) = (
balance.get("asset").and_then(|v| v.as_str()),
balance.get("free").and_then(|v| v.as_str()),
) {
if let Ok(amount) = Decimal::from_str(free) {
if amount > Decimal::ZERO {
balances.insert(asset.to_string(), amount);
}
}
}
}
}
}
"coinbase" => {
if let Some(accounts) = response.as_array() {
for account in accounts {
if let (Some(currency), Some(balance)) = (
account.get("currency").and_then(|v| v.as_str()),
account.get("balance").and_then(|v| v.as_str()),
) {
if let Ok(amount) = Decimal::from_str(balance) {
if amount > Decimal::ZERO {
balances.insert(currency.to_string(), amount);
}
}
}
}
}
}
"kraken" => {
if let Some(result) = response.get("result").and_then(|v| v.as_object()) {
for (asset, amount) in result {
if let Some(amount_str) = amount.as_str() {
if let Ok(amount_dec) = Decimal::from_str(amount_str) {
if amount_dec > Decimal::ZERO {
balances.insert(asset.clone(), amount_dec);
}
}
}
}
}
}
_ => {}
}
*self.balances.write().await = balances.clone();
Ok(balances)
}
}
#[async_trait]
impl BrokerClient for CCXTBroker {
async fn get_account(&self) -> Result<Account, BrokerError> {
let balances = self.fetch_balances().await?;
let total_value = balances
.values()
.fold(Decimal::ZERO, |acc, balance| acc + *balance);
Ok(Account {
account_id: self.config.exchange.clone(),
cash: total_value,
portfolio_value: total_value,
buying_power: total_value,
equity: total_value,
last_equity: total_value,
multiplier: "1".to_string(),
currency: "USD".to_string(),
shorting_enabled: self.exchange_config.has_margin,
long_market_value: total_value,
short_market_value: Decimal::ZERO,
initial_margin: Decimal::ZERO,
maintenance_margin: Decimal::ZERO,
day_trading_buying_power: total_value,
daytrade_count: 0,
})
}
async fn get_positions(&self) -> Result<Vec<Position>, BrokerError> {
let balances = self.fetch_balances().await?;
let positions: Vec<Position> = balances
.into_iter()
.filter(|(asset, _)| asset != "USD" && asset != "USDT" && asset != "USDC")
.map(|(asset, qty)| Position {
symbol: Symbol::new(&asset).expect("Invalid symbol from CCXT"),
qty: qty.to_string().parse().unwrap_or(0),
side: PositionSide::Long,
avg_entry_price: Decimal::ONE, market_value: qty,
cost_basis: qty,
unrealized_pl: Decimal::ZERO,
unrealized_plpc: Decimal::ZERO,
current_price: Decimal::ONE,
lastday_price: Decimal::ONE,
change_today: Decimal::ZERO,
})
.collect();
*self.positions.write().await = positions.clone();
Ok(positions)
}
async fn place_order(&self, order: OrderRequest) -> Result<OrderResponse, BrokerError> {
let endpoint = match self.exchange_config.name.as_str() {
"binance" => "/api/v3/order",
"coinbase" => "/orders",
"kraken" => "/0/private/AddOrder",
_ => return Err(BrokerError::InvalidOrder("Unsupported exchange".to_string())),
};
let mut params = HashMap::new();
params.insert("symbol".to_string(), order.symbol.to_string());
params.insert("side".to_string(), order.side.to_string().to_uppercase());
params.insert("type".to_string(), order.order_type.to_string().to_uppercase());
params.insert("quantity".to_string(), order.quantity.to_string());
if let Some(price) = order.limit_price {
params.insert("price".to_string(), price.to_string());
}
let response: Value = self.request(Method::POST, endpoint, Some(params)).await?;
let order_id = response
.get("orderId")
.or_else(|| response.get("id"))
.and_then(|v| v.as_str())
.unwrap_or_default()
.to_string();
Ok(OrderResponse {
order_id,
client_order_id: Uuid::new_v4().to_string(),
status: OrderStatus::Accepted,
filled_qty: 0,
filled_avg_price: None,
submitted_at: Utc::now(),
filled_at: None,
})
}
async fn cancel_order(&self, order_id: &str) -> Result<(), BrokerError> {
let endpoint = match self.exchange_config.name.as_str() {
"binance" => "/api/v3/order",
"coinbase" => &format!("/orders/{}", order_id),
"kraken" => "/0/private/CancelOrder",
_ => return Err(BrokerError::InvalidOrder("Unsupported exchange".to_string())),
};
let mut params = HashMap::new();
params.insert("orderId".to_string(), order_id.to_string());
let _: Value = self.request(Method::DELETE, endpoint, Some(params)).await?;
Ok(())
}
async fn get_order(&self, order_id: &str) -> Result<OrderResponse, BrokerError> {
Err(BrokerError::Other(anyhow::anyhow!("Not implemented")))
}
async fn list_orders(&self, _filter: OrderFilter) -> Result<Vec<OrderResponse>, BrokerError> {
Ok(Vec::new())
}
async fn health_check(&self) -> Result<HealthStatus, BrokerError> {
match self.fetch_balances().await {
Ok(_) => Ok(HealthStatus::Healthy),
Err(_) => Ok(HealthStatus::Unhealthy),
}
}
}
#[derive(Debug, Clone)]
struct ExchangeInfo {
name: String,
base_url: String,
testnet_url: Option<String>,
has_futures: bool,
has_margin: bool,
rate_limit: Duration,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_exchange_config() {
let info = CCXTBroker::get_exchange_config("binance").unwrap();
assert_eq!(info.name, "binance");
assert!(info.has_futures);
}
}