use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use async_trait::async_trait;
use reqwest::header::HeaderMap;
use serde_json::{json, Value};
use crate::core::{
HttpClient, Credentials,
ExchangeId, ExchangeType, AccountType, Symbol,
ExchangeError, ExchangeResult,
Price, Kline, Ticker, OrderBook,
Order, OrderSide, OrderType, OrderStatus, Balance, AccountInfo,
Position, FundingRate, TimeInForce,
OrderRequest, CancelRequest, CancelScope,
BalanceQuery, PositionQuery, PositionModification,
OrderHistoryFilter, PlaceOrderResponse, FeeInfo,
UserTrade, UserTradeFilter,
};
use crate::core::{AmendRequest, CancelAllResponse, OrderResult};
use crate::core::types::AlgoOrderResponse;
use crate::core::traits::{
ExchangeIdentity, MarketData, Trading, Account, Positions,
CancelAll, AmendOrder, BatchOrders, FundingHistory,
};
use crate::core::utils::GroupRateLimiter;
use crate::core::utils::precision::PrecisionCache;
use crate::core::types::{ConnectorStats, SymbolInfo, FundingPayment, FundingFilter};
use super::endpoints::{ParadexUrls, ParadexEndpoint, format_symbol, map_kline_resolution};
use super::auth::ParadexAuth;
use super::parser::ParadexParser;
#[cfg(feature = "onchain-starknet")]
use crate::core::chain::StarkNetProvider;
/// REST connector for the Paradex exchange.
///
/// Bundles the HTTP client, request signer, endpoint URLs, rate limiter and
/// precision cache that the trait implementations in this file operate on.
pub struct ParadexConnector {
/// HTTP client used for every REST call.
http: HttpClient,
/// Request signer, shared behind an `Arc`.
auth: Arc<ParadexAuth>,
/// URL set chosen in `new` (testnet or mainnet).
urls: ParadexUrls,
/// Whether the connector was created for testnet; reported by `is_testnet`.
testnet: bool,
/// Rate limiter holding the "public", "orders" and "private_gets" groups registered in `new`.
rate_limiter: Arc<Mutex<GroupRateLimiter>>,
/// Cache used to format prices and quantities; loaded by `get_exchange_info`.
precision: PrecisionCache,
/// Optional StarkNet provider; `None` until `with_starknet_provider` is called.
#[cfg(feature = "onchain-starknet")]
starknet_provider: Option<Arc<StarkNetProvider>>,
}
impl ParadexConnector {
/// Builds a connector for the selected environment.
///
/// Creates the HTTP client (constructed with `30_000`) and the request signer,
/// then makes a best-effort attempt to read `server_time` from `/system/time`
/// and pass it to the signer via `sync_time`; a failed or malformed response
/// is ignored. Finally registers the "public", "orders" and "private_gets"
/// rate-limit groups.
///
/// # Errors
/// Propagates failures from `HttpClient::new` and `ParadexAuth::new`.
pub async fn new(credentials: Credentials, testnet: bool) -> ExchangeResult<Self> {
    let urls = match testnet {
        true => ParadexUrls::TESTNET,
        false => ParadexUrls::MAINNET,
    };
    let http = HttpClient::new(30_000)?;
    let auth = Arc::new(ParadexAuth::new(&credentials)?);

    // Clock sync is optional: any error on the way simply skips it.
    let time_url = format!("{}/system/time", urls.rest_url());
    let server_time = http
        .get(&time_url, &HashMap::new())
        .await
        .ok()
        .and_then(|payload| payload.get("server_time").and_then(|t| t.as_i64()));
    if let Some(remote_time) = server_time {
        auth.sync_time(remote_time).await;
    }

    let window = Duration::from_secs(60);
    let mut groups = GroupRateLimiter::new();
    groups.add_group("public", 1500, window);
    groups.add_group("orders", 17250, window);
    groups.add_group("private_gets", 600, window);

    Ok(Self {
        http,
        auth,
        urls,
        testnet,
        rate_limiter: Arc::new(Mutex::new(groups)),
        precision: PrecisionCache::new(),
        #[cfg(feature = "onchain-starknet")]
        starknet_provider: None,
    })
}
/// Creates a connector from empty credentials by delegating to `new`.
pub async fn public(testnet: bool) -> ExchangeResult<Self> {
    Self::new(Credentials::new("", ""), testnet).await
}
/// Builder-style setter: stores `provider` on the connector and returns it.
#[cfg(feature = "onchain-starknet")]
pub fn with_starknet_provider(self, provider: Arc<StarkNetProvider>) -> Self {
    let mut connector = self;
    connector.starknet_provider = Some(provider);
    connector
}
/// Returns the configured StarkNet provider, or `None` when none was set.
#[cfg(feature = "onchain-starknet")]
pub fn starknet_provider(&self) -> Option<&Arc<StarkNetProvider>> {
    Option::as_ref(&self.starknet_provider)
}
/// Waits until `weight` units can be acquired from rate-limit group `group`.
///
/// Each iteration tries to acquire; on failure it sleeps for the duration the
/// limiter reports via `time_until_ready` and retries.
///
/// # Panics
/// Panics if the rate-limiter mutex is poisoned.
async fn rate_limit_wait(&self, group: &str, weight: u32) {
    loop {
        let wait_time = {
            // The guard lives only inside this block, so the std mutex is
            // never held across an `.await`.
            let mut limiter = self.rate_limiter.lock().expect("Mutex poisoned");
            if limiter.try_acquire(group, weight) {
                return;
            }
            limiter.time_until_ready(group, weight)
        };
        if wait_time > Duration::ZERO {
            tokio::time::sleep(wait_time).await;
        } else {
            // The limiter refused the acquire but reported no wait. Without an
            // await point this loop would spin and starve the executor, so
            // hand control back to the scheduler before retrying.
            tokio::task::yield_now().await;
        }
    }
}
/// Feeds server-reported usage for `group` back into the rate limiter.
///
/// Reads `x-ratelimit-remaining` and `x-ratelimit-limit` as `u32`; when both
/// are present and parse, reports `limit - remaining` (saturating) through
/// `update_from_server`. Missing headers, unparsable values and a poisoned
/// mutex are all ignored.
fn update_rate_from_headers(&self, headers: &HeaderMap, group: &str) {
    let read_u32 = |name: &str| -> Option<u32> {
        headers.get(name)?.to_str().ok()?.parse::<u32>().ok()
    };
    let remaining = read_u32("x-ratelimit-remaining");
    let limit = read_u32("x-ratelimit-limit");

    match (remaining, limit) {
        (Some(remaining), Some(limit)) => {
            let used = limit.saturating_sub(remaining);
            if let Ok(mut limiter) = self.rate_limiter.lock() {
                limiter.update_from_server(group, used);
            }
        }
        _ => {}
    }
}
async fn get(
&self,
endpoint: ParadexEndpoint,
params: HashMap<String, String>,
) -> ExchangeResult<Value> {
let group = if endpoint.requires_auth() { "private_gets" } else { "public" };
self.rate_limit_wait(group, 1).await;
let base_url = self.urls.rest_url();
let mut path = endpoint.path().to_string();
for (key, value) in ¶ms {
if path.contains(&format!("{{{}}}", key)) {
path = path.replace(&format!("{{{}}}", key), value);
}
}
let query_params: HashMap<_, _> = params.iter()
.filter(|(k, _)| !path.contains(&format!("{{{}}}", k)))
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
let query = if query_params.is_empty() {
String::new()
} else {
let qs: Vec<String> = query_params.iter()
.map(|(k, v)| format!("{}={}", k, v))
.collect();
format!("?{}", qs.join("&"))
};
let url = format!("{}{}{}", base_url, path, query);
let full_path = format!("{}{}", path, query);
let headers = if endpoint.requires_auth() {
self.auth.sign_request("GET", &full_path, "").await?
} else {
HashMap::new()
};
let (response, resp_headers) = self.http.get_with_response_headers(&url, &HashMap::new(), &headers).await?;
self.update_rate_from_headers(&resp_headers, group);
self.check_response(&response)?;
Ok(response)
}
/// Sends a signed POST with JSON `body` to `endpoint`.
///
/// Uses the "orders" rate-limit group, signs over the endpoint path and the
/// serialized body, updates the limiter from the response headers and
/// returns the response once `check_response` accepts it.
async fn post(
    &self,
    endpoint: ParadexEndpoint,
    body: Value,
) -> ExchangeResult<Value> {
    const GROUP: &str = "orders";
    self.rate_limit_wait(GROUP, 1).await;

    let path = endpoint.path();
    let url = format!("{}{}", self.urls.rest_url(), path);
    let serialized = body.to_string();
    let auth_headers = self.auth.sign_request("POST", path, &serialized).await?;

    let (payload, response_headers) = self
        .http
        .post_with_response_headers(&url, &body, &auth_headers)
        .await?;
    self.update_rate_from_headers(&response_headers, GROUP);
    self.check_response(&payload)?;
    Ok(payload)
}
/// Sends a signed DELETE to `endpoint`.
///
/// Each `(key, value)` in `path_params` replaces the `{key}` placeholder in
/// the endpoint path. The request uses the "orders" rate-limit group and is
/// signed over the resolved path with an empty body.
async fn delete(
    &self,
    endpoint: ParadexEndpoint,
    path_params: &[(&str, &str)],
) -> ExchangeResult<Value> {
    const GROUP: &str = "orders";
    self.rate_limit_wait(GROUP, 1).await;

    let path = path_params
        .iter()
        .fold(endpoint.path().to_string(), |resolved, (key, value)| {
            resolved.replace(&format!("{{{}}}", key), value)
        });
    let url = format!("{}{}", self.urls.rest_url(), path);
    let auth_headers = self.auth.sign_request("DELETE", &path, "").await?;

    let (payload, response_headers) = self
        .http
        .delete_with_response_headers(&url, &HashMap::new(), &auth_headers)
        .await?;
    self.update_rate_from_headers(&response_headers, GROUP);
    self.check_response(&payload)?;
    Ok(payload)
}
/// Sends a signed PUT with JSON `body` to `endpoint`.
///
/// `{key}` placeholders in the endpoint path are filled from `path_params`.
/// Uses the "orders" rate-limit group. Unlike `post`/`delete`, this helper
/// does not read rate-limit headers from the response.
async fn _put(
    &self,
    endpoint: ParadexEndpoint,
    path_params: &[(&str, &str)],
    body: Value,
) -> ExchangeResult<Value> {
    self.rate_limit_wait("orders", 1).await;

    let path = path_params
        .iter()
        .fold(endpoint.path().to_string(), |resolved, (key, value)| {
            resolved.replace(&format!("{{{}}}", key), value)
        });
    let url = format!("{}{}", self.urls.rest_url(), path);
    let serialized = body.to_string();
    let auth_headers = self.auth.sign_request("PUT", &path, &serialized).await?;

    let payload = self.http.put(&url, &body, &auth_headers).await?;
    self.check_response(&payload)?;
    Ok(payload)
}
fn check_response(&self, response: &Value) -> ExchangeResult<()> {
if let Some(error) = response.get("error") {
let code = error.as_str().unwrap_or("UNKNOWN");
let message = response.get("message")
.and_then(|m| m.as_str())
.unwrap_or("Unknown error");
return Err(ExchangeError::Api {
code: -1,
message: format!("{}: {}", code, message),
});
}
Ok(())
}
/// Fetches the markets list and returns the symbols parsed from it.
pub async fn get_symbols(&self) -> ExchangeResult<Vec<String>> {
    let markets = self.get(ParadexEndpoint::Markets, HashMap::new()).await?;
    ParadexParser::parse_symbols(&markets)
}
/// Fetches market summaries and parses each entry into a `Ticker`.
///
/// `market` selects a single market; `None` requests `"ALL"`. Entries that
/// fail to parse are silently skipped.
///
/// # Errors
/// Fails when the request fails or when `results` is not an array.
pub async fn get_markets_summary(&self, market: Option<String>) -> ExchangeResult<Vec<Ticker>> {
    let market_filter = market.unwrap_or_else(|| "ALL".to_string());
    let mut params = HashMap::new();
    params.insert("market".to_string(), market_filter);

    let response = self.get(ParadexEndpoint::MarketsSummary, params).await?;
    let results = ParadexParser::extract_results(&response)?;
    let entries = results
        .as_array()
        .ok_or_else(|| ExchangeError::Parse("'results' is not an array".to_string()))?;

    let mut tickers = Vec::new();
    for entry in entries {
        // `parse_ticker` is handed the entry re-wrapped as a one-element
        // `results` array.
        let wrapped = json!({"results": [entry]});
        if let Ok(ticker) = ParadexParser::parse_ticker(&wrapped) {
            tickers.push(ticker);
        }
    }
    Ok(tickers)
}
/// Returns the raw JSON response of the account endpoint.
pub async fn get_account_summary(&self) -> ExchangeResult<Value> {
    let no_params = HashMap::new();
    self.get(ParadexEndpoint::Account, no_params).await
}
/// Cancels open orders, optionally restricted to one market.
///
/// With `symbol == None` every open order is cancelled. With `Some(symbol)`
/// the request carries a `market` query parameter so that only that market's
/// orders are cancelled. Previously the market filter was built but never
/// sent, so a symbol-scoped call cancelled orders on all markets.
///
/// Returns the raw JSON response.
pub async fn cancel_all_orders(&self, symbol: Option<Symbol>) -> ExchangeResult<Value> {
    match symbol {
        None => self.delete(ParadexEndpoint::CancelAllOrders, &[]).await,
        Some(s) => {
            let market = format_symbol(&s.base, &s.quote, AccountType::FuturesCross);
            self.rate_limit_wait("orders", 1).await;

            // The query string is part of the signed path, so it is built
            // here rather than passed as a separate parameter map.
            let path = format!(
                "{}?market={}",
                ParadexEndpoint::CancelAllOrders.path(),
                market
            );
            let url = format!("{}{}", self.urls.rest_url(), path);
            let headers = self.auth.sign_request("DELETE", &path, "").await?;
            let (response, resp_headers) = self
                .http
                .delete_with_response_headers(&url, &HashMap::new(), &headers)
                .await?;
            self.update_rate_from_headers(&resp_headers, "orders");
            self.check_response(&response)?;
            Ok(response)
        }
    }
}
}
/// Identity and runtime statistics for the Paradex connector.
impl ExchangeIdentity for ParadexConnector {
    /// Always `ExchangeId::Paradex`.
    fn exchange_id(&self) -> ExchangeId {
        ExchangeId::Paradex
    }

    /// Snapshot of HTTP and rate-limiter counters.
    ///
    /// Rate-limit figures fall back to zeros / an empty list when the limiter
    /// mutex is poisoned. `ws_ping_rtt_ms` is always reported as `0`.
    fn metrics(&self) -> ConnectorStats {
        let (http_requests, http_errors, last_latency_ms) = self.http.stats();

        let (rate_used, rate_max, rate_groups) = match self.rate_limiter.lock() {
            Ok(mut limiter) => {
                let (used, max) = limiter.primary_stats();
                let per_group = limiter
                    .all_stats()
                    .into_iter()
                    .map(|(name, cur, max)| (name.to_string(), cur, max))
                    .collect();
                (used, max, per_group)
            }
            Err(_) => (0, 0, Vec::new()),
        };

        ConnectorStats {
            http_requests,
            http_errors,
            last_latency_ms,
            rate_used,
            rate_max,
            rate_groups,
            ws_ping_rtt_ms: 0,
        }
    }

    /// Whether the connector was created for testnet.
    fn is_testnet(&self) -> bool {
        self.testnet
    }

    /// Only `AccountType::FuturesCross` is reported.
    fn supported_account_types(&self) -> Vec<AccountType> {
        let mut supported = Vec::with_capacity(1);
        supported.push(AccountType::FuturesCross);
        supported
    }

    /// Always `ExchangeType::Dex`.
    fn exchange_type(&self) -> ExchangeType {
        ExchangeType::Dex
    }
}
#[async_trait]
impl MarketData for ParadexConnector {
/// Fetches the market summary for `symbol` and parses a price out of it.
async fn get_price(&self, symbol: Symbol, account_type: AccountType) -> ExchangeResult<Price> {
    let params = HashMap::from([(
        "market".to_string(),
        format_symbol(&symbol.base, &symbol.quote, account_type),
    )]);
    let summary = self.get(ParadexEndpoint::MarketsSummary, params).await?;
    Ok(ParadexParser::parse_price(&summary)?)
}
/// Fetches the order book for `symbol`.
///
/// `depth`, when given, is forwarded as the `depth` parameter.
async fn get_orderbook(
    &self,
    symbol: Symbol,
    depth: Option<u16>,
    account_type: AccountType,
) -> ExchangeResult<OrderBook> {
    let mut params = HashMap::new();
    params.insert(
        "market".to_string(),
        format_symbol(&symbol.base, &symbol.quote, account_type),
    );
    params.extend(depth.map(|levels| ("depth".to_string(), levels.to_string())));

    let book = self.get(ParadexEndpoint::Orderbook, params).await?;
    ParadexParser::parse_orderbook(&book)
}
/// Fetches klines for `symbol`.
///
/// `interval` is translated with `map_kline_resolution`; `limit`, when
/// given, is forwarded as `limit`. `_end_time` is not used.
async fn get_klines(
    &self,
    symbol: Symbol,
    interval: &str,
    limit: Option<u16>,
    account_type: AccountType,
    _end_time: Option<i64>,
) -> ExchangeResult<Vec<Kline>> {
    let mut params = HashMap::from([
        (
            "symbol".to_string(),
            format_symbol(&symbol.base, &symbol.quote, account_type),
        ),
        (
            "resolution".to_string(),
            map_kline_resolution(interval).to_string(),
        ),
    ]);
    params.extend(limit.map(|count| ("limit".to_string(), count.to_string())));

    let candles = self.get(ParadexEndpoint::Klines, params).await?;
    ParadexParser::parse_klines(&candles)
}
/// Fetches the market summary for `symbol` and parses it into a `Ticker`.
async fn get_ticker(&self, symbol: Symbol, account_type: AccountType) -> ExchangeResult<Ticker> {
    let params = HashMap::from([(
        "market".to_string(),
        format_symbol(&symbol.base, &symbol.quote, account_type),
    )]);
    let summary = self.get(ParadexEndpoint::MarketsSummary, params).await?;
    ParadexParser::parse_ticker(&summary)
}
/// Succeeds only when the system-state endpoint reports `"operational"`.
///
/// Any other status, or a missing/non-string `status` field, yields
/// `ExchangeError::Network`.
async fn ping(&self) -> ExchangeResult<()> {
    let state = self.get(ParadexEndpoint::SystemState, HashMap::new()).await?;
    match state.get("status").and_then(|s| s.as_str()) {
        Some("operational") => Ok(()),
        _ => Err(ExchangeError::Network("System not operational".to_string())),
    }
}
async fn get_exchange_info(&self, account_type: AccountType) -> ExchangeResult<Vec<SymbolInfo>> {
let response = self.get(ParadexEndpoint::Markets, HashMap::new()).await?;
let results = response
.get("results")
.and_then(|v| v.as_array())
.ok_or_else(|| ExchangeError::Parse("Missing 'results' array in markets response".to_string()))?;
let infos: Vec<SymbolInfo> = results.iter().filter_map(|item| {
let sym = item.get("symbol").and_then(|v| v.as_str())?.to_string();
let base = item.get("base_currency")
.and_then(|v| v.as_str())
.unwrap_or_else(|| {
sym.split('-').next().unwrap_or(&sym)
})
.to_string();
let quote = item.get("quote_currency")
.and_then(|v| v.as_str())
.unwrap_or_else(|| {
let parts: Vec<&str> = sym.splitn(3, '-').collect();
if parts.len() > 1 { parts[1] } else { "USD" }
})
.to_string();
let status = item.get("status")
.and_then(|v| v.as_str())
.unwrap_or("TRADING")
.to_string();
let tick_size = item.get("price_tick_size")
.and_then(|v| v.as_str())
.and_then(|s| s.parse::<f64>().ok());
let step_size = item.get("order_size_increment")
.and_then(|v| v.as_str())
.and_then(|s| s.parse::<f64>().ok());
let min_notional = item.get("min_notional")
.and_then(|v| v.as_str())
.and_then(|s| s.parse::<f64>().ok());
let max_quantity = item.get("max_order_size")
.and_then(|v| v.as_str())
.and_then(|s| s.parse::<f64>().ok());
Some(SymbolInfo {
symbol: sym,
base_asset: base,
quote_asset: quote,
status,
price_precision: 8,
quantity_precision: 8,
min_quantity: None,
max_quantity,
tick_size,
step_size,
min_notional,
account_type,
})
}).collect();
self.precision.load_from_symbols(&infos);
Ok(infos)
}
}
#[async_trait]
impl Trading for ParadexConnector {
async fn place_order(&self, req: OrderRequest) -> ExchangeResult<PlaceOrderResponse> {
let symbol = req.symbol.clone();
let side = req.side;
let quantity = req.quantity;
let account_type = req.account_type;
let symbol_str = format_symbol(&symbol.base, &symbol.quote, account_type);
let side_str = match side {
OrderSide::Buy => "BUY",
OrderSide::Sell => "SELL",
};
match req.order_type {
OrderType::Market => {
let body = json!({
"market": symbol_str,
"side": side_str,
"type": "MARKET",
"size": self.precision.qty(&symbol_str, quantity),
"instruction": "IOC",
});
let response = self.post(ParadexEndpoint::CreateOrder, body).await?;
ParadexParser::parse_order(&response).map(PlaceOrderResponse::Simple)
}
OrderType::Limit { price } => {
let body = json!({
"market": symbol_str,
"side": side_str,
"type": "LIMIT",
"price": self.precision.price(&symbol_str, price),
"size": self.precision.qty(&symbol_str, quantity),
"instruction": "GTC",
});
let response = self.post(ParadexEndpoint::CreateOrder, body).await?;
ParadexParser::parse_order(&response).map(PlaceOrderResponse::Simple)
}
OrderType::PostOnly { price } => {
let body = json!({
"market": symbol_str,
"side": side_str,
"type": "LIMIT",
"price": self.precision.price(&symbol_str, price),
"size": self.precision.qty(&symbol_str, quantity),
"instruction": "POST_ONLY",
});
let response = self.post(ParadexEndpoint::CreateOrder, body).await?;
ParadexParser::parse_order(&response).map(PlaceOrderResponse::Simple)
}
OrderType::Ioc { price } => {
let mut body = json!({
"market": symbol_str,
"side": side_str,
"type": "MARKET",
"size": self.precision.qty(&symbol_str, quantity),
"instruction": "IOC",
});
if let Some(p) = price {
body["type"] = json!("LIMIT");
body["price"] = json!(self.precision.price(&symbol_str, p));
}
let response = self.post(ParadexEndpoint::CreateOrder, body).await?;
ParadexParser::parse_order(&response).map(PlaceOrderResponse::Simple)
}
OrderType::Fok { price } => {
let body = json!({
"market": symbol_str,
"side": side_str,
"type": "LIMIT",
"price": self.precision.price(&symbol_str, price),
"size": self.precision.qty(&symbol_str, quantity),
"instruction": "FOK",
});
let response = self.post(ParadexEndpoint::CreateOrder, body).await?;
ParadexParser::parse_order(&response).map(PlaceOrderResponse::Simple)
}
OrderType::StopMarket { stop_price } => {
let body = json!({
"market": symbol_str,
"side": side_str,
"type": "STOP_MARKET",
"trigger_price": self.precision.price(&symbol_str, stop_price),
"size": self.precision.qty(&symbol_str, quantity),
"instruction": "IOC",
});
let response = self.post(ParadexEndpoint::CreateOrder, body).await?;
ParadexParser::parse_order(&response).map(PlaceOrderResponse::Simple)
}
OrderType::StopLimit { stop_price, limit_price } => {
let body = json!({
"market": symbol_str,
"side": side_str,
"type": "STOP_LIMIT",
"trigger_price": self.precision.price(&symbol_str, stop_price),
"price": self.precision.price(&symbol_str, limit_price),
"size": self.precision.qty(&symbol_str, quantity),
"instruction": "GTC",
});
let response = self.post(ParadexEndpoint::CreateOrder, body).await?;
ParadexParser::parse_order(&response).map(PlaceOrderResponse::Simple)
}
OrderType::ReduceOnly { price } => {
let (order_type_str, price_val) = match price {
Some(p) => ("LIMIT", p),
None => ("MARKET", 0.0),
};
let mut body = json!({
"market": symbol_str,
"side": side_str,
"type": order_type_str,
"size": self.precision.qty(&symbol_str, quantity),
"instruction": if price.is_some() { "GTC" } else { "IOC" },
"reduce_only": true,
});
if price.is_some() {
body["price"] = json!(self.precision.price(&symbol_str, price_val));
}
let response = self.post(ParadexEndpoint::CreateOrder, body).await?;
ParadexParser::parse_order(&response).map(PlaceOrderResponse::Simple)
}
OrderType::Gtd { price, expire_time } => {
let body = json!({
"market": symbol_str,
"side": side_str,
"type": "LIMIT",
"price": self.precision.price(&symbol_str, price),
"size": self.precision.qty(&symbol_str, quantity),
"instruction": "GTC",
"expiry": expire_time,
});
let response = self.post(ParadexEndpoint::CreateOrder, body).await?;
ParadexParser::parse_order(&response).map(PlaceOrderResponse::Simple)
}
OrderType::Twap { duration_seconds, .. } => {
let body = json!({
"market": symbol_str,
"side": side_str,
"size": self.precision.qty(&symbol_str, quantity),
"algo_type": "TWAP",
"duration": duration_seconds.clamp(30, 86400),
});
let response = self.post(ParadexEndpoint::CreateAlgoOrder, body).await?;
let algo_id = response
.get("id")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let status = response
.get("status")
.and_then(|v| v.as_str())
.unwrap_or("RUNNING")
.to_string();
Ok(PlaceOrderResponse::Algo(AlgoOrderResponse {
algo_id,
status,
executed_count: None,
total_count: None,
}))
}
_ => Err(ExchangeError::UnsupportedOperation(
format!("{:?} order type not supported on Paradex", req.order_type)
)),
}
}
/// Fetches historical orders matching `filter`.
///
/// The symbol is formatted as a `FuturesCross` market; `limit` becomes
/// `page_size`; start/end times are divided by 1000 before being sent as
/// `start_unix_timestamp` / `end_unix_timestamp`. `_account_type` is unused.
async fn get_order_history(
    &self,
    filter: OrderHistoryFilter,
    _account_type: AccountType,
) -> ExchangeResult<Vec<Order>> {
    let mut params: HashMap<String, String> = HashMap::new();
    params.extend(filter.symbol.as_ref().map(|symbol| {
        (
            "market".to_string(),
            format_symbol(&symbol.base, &symbol.quote, AccountType::FuturesCross),
        )
    }));
    params.extend(
        filter
            .limit
            .map(|count| ("page_size".to_string(), count.to_string())),
    );
    params.extend(
        filter
            .start_time
            .map(|start| ("start_unix_timestamp".to_string(), (start / 1000).to_string())),
    );
    params.extend(
        filter
            .end_time
            .map(|end| ("end_unix_timestamp".to_string(), (end / 1000).to_string())),
    );

    let history = self.get(ParadexEndpoint::OrdersHistory, params).await?;
    ParadexParser::parse_orders(&history)
}
async fn cancel_order(&self, req: CancelRequest) -> ExchangeResult<Order> {
match req.scope {
CancelScope::Single { ref order_id } => {
self.delete(ParadexEndpoint::CancelOrder, &[("order_id", order_id)]).await?;
Ok(Order {
id: order_id.to_string(),
client_order_id: None,
symbol: req.symbol
.as_ref()
.map(|s| s.to_string())
.unwrap_or_default(),
side: OrderSide::Buy,
order_type: OrderType::Limit { price: 0.0 },
status: OrderStatus::Canceled,
price: None,
stop_price: None,
quantity: 0.0,
filled_quantity: 0.0,
average_price: None,
commission: None,
commission_asset: None,
created_at: 0,
updated_at: Some(crate::core::timestamp_millis() as i64),
time_in_force: TimeInForce::Gtc,
})
}
CancelScope::All { ref symbol } => {
self.cancel_all_orders(symbol.clone()).await?;
Ok(Order {
id: "cancel-all".to_string(),
client_order_id: None,
symbol: String::new(),
side: OrderSide::Buy,
order_type: OrderType::Market,
status: OrderStatus::Canceled,
price: None,
stop_price: None,
quantity: 0.0,
filled_quantity: 0.0,
average_price: None,
commission: None,
commission_asset: None,
created_at: 0,
updated_at: Some(crate::core::timestamp_millis() as i64),
time_in_force: TimeInForce::Gtc,
})
}
CancelScope::BySymbol { ref symbol } => {
self.cancel_all_orders(Some(symbol.clone())).await?;
Ok(Order {
id: "cancel-by-symbol".to_string(),
client_order_id: None,
symbol: symbol.to_string(),
side: OrderSide::Buy,
order_type: OrderType::Market,
status: OrderStatus::Canceled,
price: None,
stop_price: None,
quantity: 0.0,
filled_quantity: 0.0,
average_price: None,
commission: None,
commission_asset: None,
created_at: 0,
updated_at: Some(crate::core::timestamp_millis() as i64),
time_in_force: TimeInForce::Gtc,
})
}
CancelScope::Batch { .. } => Err(ExchangeError::UnsupportedOperation(
"Batch cancel not supported on Paradex; use CancelAll/BySymbol instead".to_string()
)),
CancelScope::ByLabel(_) | CancelScope::ByCurrencyKind { .. } | CancelScope::ScheduledAt(_) => {
Err(ExchangeError::UnsupportedOperation(
"ByLabel/ByCurrencyKind/ScheduledAt cancel scopes not supported on Paradex".to_string()
))
}
}
}
/// Fetches a single order by id.
///
/// The lookup is keyed by `order_id` only; `_symbol` and `_account_type` are
/// accepted for trait compatibility and are not used. (The previous body
/// split `_symbol` and built a `Symbol` that was never read.)
///
/// # Errors
/// Propagates request errors and failures from `ParadexParser::parse_order`.
async fn get_order(
    &self,
    _symbol: &str,
    order_id: &str,
    _account_type: AccountType,
) -> ExchangeResult<Order> {
    let mut params = HashMap::new();
    params.insert("order_id".to_string(), order_id.to_string());
    let response = self.get(ParadexEndpoint::GetOrder, params).await?;
    ParadexParser::parse_order(&response)
}
/// Fetches open orders, optionally filtered by market.
///
/// A `symbol` made of exactly two `/`-separated parts is run through
/// `format_symbol`; any other string is sent unchanged.
async fn get_open_orders(&self, symbol: Option<&str>, account_type: AccountType) -> ExchangeResult<Vec<Order>> {
    let market = symbol.map(|raw| {
        let parts: Vec<&str> = raw.split('/').collect();
        match parts.as_slice() {
            [base, quote] => format_symbol(base, quote, account_type),
            _ => raw.to_string(),
        }
    });

    let mut params = HashMap::new();
    params.extend(market.map(|m| ("market".to_string(), m)));

    let open = self.get(ParadexEndpoint::OpenOrders, params).await?;
    ParadexParser::parse_orders(&open)
}
/// Fetches the account's fills matching `filter`.
///
/// A symbol containing `/` is rewritten to `BASE-QUOTE-PERP` (upper-cased,
/// split at the first `/`); other symbols are sent unchanged. Times are
/// divided by 1000 before sending, and `limit` is capped at 100.
/// `_account_type` is unused.
async fn get_user_trades(
    &self,
    filter: UserTradeFilter,
    _account_type: AccountType,
) -> ExchangeResult<Vec<UserTrade>> {
    let mut params: HashMap<String, String> = HashMap::new();

    if let Some(sym) = &filter.symbol {
        let market = match sym.split_once('/') {
            Some((base, quote)) => {
                format!("{}-{}-PERP", base.to_uppercase(), quote.to_uppercase())
            }
            None => sym.clone(),
        };
        params.insert("market".to_string(), market);
    }
    params.extend(
        filter
            .order_id
            .as_ref()
            .map(|id| ("order_id".to_string(), id.clone())),
    );
    params.extend(
        filter
            .start_time
            .map(|ms| ("start_unix_timestamp".to_string(), (ms / 1000).to_string())),
    );
    params.extend(
        filter
            .end_time
            .map(|ms| ("end_unix_timestamp".to_string(), (ms / 1000).to_string())),
    );
    params.extend(
        filter
            .limit
            .map(|count| ("page_size".to_string(), count.min(100).to_string())),
    );

    let fills = self.get(ParadexEndpoint::Fills, params).await?;
    ParadexParser::parse_fills(&fills)
}
}
#[async_trait]
impl Account for ParadexConnector {
/// Fetches all balances.
///
/// The `asset` and `account_type` fields of `query` are not forwarded to the
/// endpoint; the full parsed balance list is returned.
async fn get_balance(&self, query: BalanceQuery) -> ExchangeResult<Vec<Balance>> {
    // Filters are read but deliberately not applied.
    let _ = (&query.asset, &query.account_type);
    let balances = self.get(ParadexEndpoint::Balances, HashMap::new()).await?;
    ParadexParser::parse_balances(&balances)
}
/// Fetches the account endpoint and parses it into `AccountInfo`.
/// `_account_type` is unused.
async fn get_account_info(&self, _account_type: AccountType) -> ExchangeResult<AccountInfo> {
    let account = self.get(ParadexEndpoint::Account, HashMap::new()).await?;
    ParadexParser::parse_account_info(&account)
}
/// Reads maker/taker fee rates from the first market in the markets response.
///
/// Rates come from `fee_config.api_fees.{maker,taker}` (string-encoded).
/// Anything missing or unparsable falls back to `0.0` for maker and `0.0003`
/// for taker. `symbol`, when given, is sent as the `market` parameter and
/// echoed in the result.
async fn get_fees(&self, symbol: Option<&str>) -> ExchangeResult<FeeInfo> {
    let mut params = HashMap::new();
    params.extend(symbol.map(|sym| ("market".to_string(), sym.to_string())));
    let response = self.get(ParadexEndpoint::Markets, params).await?;

    let api_fees = response
        .get("results")
        .and_then(|r| r.as_array())
        .and_then(|markets| markets.first())
        .and_then(|market| market.get("fee_config"))
        .and_then(|config| config.get("api_fees"));

    let rate_or = |key: &str, fallback: f64| -> f64 {
        api_fees
            .and_then(|fees| fees.get(key))
            .and_then(|v| v.as_str())
            .and_then(|s| s.parse::<f64>().ok())
            .unwrap_or(fallback)
    };

    Ok(FeeInfo {
        maker_rate: rate_or("maker", 0.0),
        taker_rate: rate_or("taker", 0.0003),
        symbol: symbol.map(|s| s.to_string()),
        tier: None,
    })
}
}
#[async_trait]
impl Positions for ParadexConnector {
/// Fetches positions, optionally filtered to the market in `query.symbol`.
async fn get_positions(&self, query: PositionQuery) -> ExchangeResult<Vec<Position>> {
    let mut params = HashMap::new();
    params.extend(query.symbol.as_ref().map(|symbol| {
        (
            "market".to_string(),
            format_symbol(&symbol.base, &symbol.quote, query.account_type),
        )
    }));

    let positions = self.get(ParadexEndpoint::Positions, params).await?;
    ParadexParser::parse_positions(&positions)
}
/// Reads the funding rate for `symbol` from the market summary.
///
/// `symbol` is sent unchanged as the `market` parameter and written into the
/// returned rate's `symbol` field. `_account_type` is unused.
async fn get_funding_rate(
    &self,
    symbol: &str,
    _account_type: AccountType,
) -> ExchangeResult<FundingRate> {
    let params = HashMap::from([("market".to_string(), symbol.to_string())]);
    let summary = self.get(ParadexEndpoint::MarketsSummary, params).await?;

    let mut funding = ParadexParser::parse_funding_rate(&summary)?;
    funding.symbol = symbol.to_string();
    Ok(funding)
}
/// Applies a position modification.
///
/// * `ClosePosition` posts a `MARKET` order with instruction `REDUCE_ONLY`.
/// * `SetTpSl` posts a reduce-only `TAKE_PROFIT_MARKET` order and/or a
///   reduce-only `STOP_MARKET` order, take-profit first; the first failure
///   aborts the rest.
/// * Every other modification returns `ExchangeError::UnsupportedOperation`.
///
/// NOTE(review): the close and TP/SL bodies carry no `side` or `size` —
/// confirm the API accepts that.
async fn modify_position(&self, req: PositionModification) -> ExchangeResult<()> {
    match req {
        PositionModification::ClosePosition { symbol, account_type } => {
            let market = format_symbol(&symbol.base, &symbol.quote, account_type);
            let close_body = json!({
                "market": market,
                "type": "MARKET",
                "instruction": "REDUCE_ONLY",
            });
            self.post(ParadexEndpoint::CreateOrder, close_body).await?;
            Ok(())
        }
        PositionModification::SetTpSl { symbol, take_profit, stop_loss, account_type } => {
            let market = format_symbol(&symbol.base, &symbol.quote, account_type);
            let legs = [
                (take_profit, "TAKE_PROFIT_MARKET"),
                (stop_loss, "STOP_MARKET"),
            ];
            for (trigger, kind) in legs {
                if let Some(level) = trigger {
                    let leg_body = json!({
                        "market": market,
                        "type": kind,
                        "trigger_price": self.precision.price(&market, level),
                        "instruction": "IOC",
                        "reduce_only": true,
                    });
                    self.post(ParadexEndpoint::CreateOrder, leg_body).await?;
                }
            }
            Ok(())
        }
        PositionModification::SetLeverage { .. } => Err(ExchangeError::UnsupportedOperation(
            "Paradex manages leverage automatically based on margin mode".to_string()
        )),
        PositionModification::SetMarginMode { .. } => Err(ExchangeError::UnsupportedOperation(
            "Paradex uses cross-margin by default; isolated margin is per-market configuration".to_string()
        )),
        PositionModification::AddMargin { .. } | PositionModification::RemoveMargin { .. } => {
            Err(ExchangeError::UnsupportedOperation(
                "Paradex uses auto-margin management; manual margin add/remove not supported".to_string()
            ))
        }
        PositionModification::SwitchPositionMode { .. } | PositionModification::MovePositions { .. } => {
            Err(ExchangeError::UnsupportedOperation(
                "SwitchPositionMode/MovePositions not supported on Paradex".to_string()
            ))
        }
    }
}
}
#[async_trait]
impl CancelAll for ParadexConnector {
    /// Cancels all open orders, or all open orders on one market.
    ///
    /// `CancelScope::All` issues a DELETE on the cancel-all endpoint;
    /// `CancelScope::BySymbol` adds a `market` query parameter formatted as
    /// a `FuturesCross` symbol. The response body is only checked for
    /// errors, so the returned counts are always zero and `details` is
    /// empty. `_account_type` is unused.
    ///
    /// # Errors
    /// Any other scope returns `ExchangeError::UnsupportedOperation` before
    /// a request is made.
    async fn cancel_all_orders(
        &self,
        scope: CancelScope,
        _account_type: AccountType,
    ) -> ExchangeResult<CancelAllResponse> {
        // Resolve the request path first; unsupported scopes bail out here.
        let path = match scope {
            CancelScope::All { .. } => ParadexEndpoint::CancelAllOrders.path().to_string(),
            CancelScope::BySymbol { ref symbol } => {
                let market = format_symbol(&symbol.base, &symbol.quote, AccountType::FuturesCross);
                format!("{}?market={}", ParadexEndpoint::CancelAllOrders.path(), market)
            }
            _ => {
                return Err(ExchangeError::UnsupportedOperation(
                    "CancelAll only accepts CancelScope::All or CancelScope::BySymbol".to_string(),
                ));
            }
        };

        self.rate_limit_wait("orders", 1).await;
        let url = format!("{}{}", self.urls.rest_url(), path);
        let auth_headers = self.auth.sign_request("DELETE", &path, "").await?;
        let (payload, response_headers) = self
            .http
            .delete_with_response_headers(&url, &std::collections::HashMap::new(), &auth_headers)
            .await?;
        self.update_rate_from_headers(&response_headers, "orders");
        self.check_response(&payload)?;

        Ok(CancelAllResponse {
            cancelled_count: 0,
            failed_count: 0,
            details: Vec::new(),
        })
    }
}
#[async_trait]
impl AmendOrder for ParadexConnector {
async fn amend_order(&self, req: AmendRequest) -> ExchangeResult<Order> {
let mut body = json!({});
let amend_symbol_str = format_symbol(&req.symbol.base, &req.symbol.quote, req.account_type);
if let Some(price) = req.fields.price {
body["price"] = json!(self.precision.price(&amend_symbol_str, price));
}
if let Some(qty) = req.fields.quantity {
body["size"] = json!(self.precision.qty(&amend_symbol_str, qty));
}
if let Some(trigger) = req.fields.trigger_price {
body["trigger_price"] = json!(self.precision.price(&amend_symbol_str, trigger));
}
let response = self._put(
ParadexEndpoint::ModifyOrder,
&[("order_id", &req.order_id)],
body,
)
.await?;
ParadexParser::parse_order(&response)
}
}
#[async_trait]
impl BatchOrders for ParadexConnector {
async fn place_orders_batch(
&self,
orders: Vec<OrderRequest>,
) -> ExchangeResult<Vec<OrderResult>> {
let order_jsons: Vec<serde_json::Value> = orders.iter().map(|req| {
let symbol_str = format_symbol(&req.symbol.base, &req.symbol.quote, req.account_type);
let side_str = match req.side {
OrderSide::Buy => "BUY",
OrderSide::Sell => "SELL",
};
match &req.order_type {
OrderType::Market => json!({
"market": symbol_str,
"side": side_str,
"type": "MARKET",
"size": self.precision.qty(&symbol_str, req.quantity),
"instruction": "IOC",
}),
OrderType::Limit { price } => json!({
"market": symbol_str,
"side": side_str,
"type": "LIMIT",
"price": self.precision.price(&symbol_str, *price),
"size": self.precision.qty(&symbol_str, req.quantity),
"instruction": "GTC",
}),
OrderType::PostOnly { price } => json!({
"market": symbol_str,
"side": side_str,
"type": "LIMIT",
"price": self.precision.price(&symbol_str, *price),
"size": self.precision.qty(&symbol_str, req.quantity),
"instruction": "POST_ONLY",
}),
OrderType::Ioc { price } => {
let mut o = json!({
"market": symbol_str,
"side": side_str,
"type": "MARKET",
"size": self.precision.qty(&symbol_str, req.quantity),
"instruction": "IOC",
});
if let Some(p) = price {
o["type"] = json!("LIMIT");
o["price"] = json!(self.precision.price(&symbol_str, *p));
}
o
}
OrderType::Fok { price } => json!({
"market": symbol_str,
"side": side_str,
"type": "LIMIT",
"price": self.precision.price(&symbol_str, *price),
"size": self.precision.qty(&symbol_str, req.quantity),
"instruction": "FOK",
}),
_ => json!({
"market": symbol_str,
"side": side_str,
"type": "MARKET",
"size": self.precision.qty(&symbol_str, req.quantity),
"instruction": "IOC",
}),
}
}).collect();
let body = json!({ "orders": order_jsons });
let response = self.post(ParadexEndpoint::CreateOrderBatch, body).await?;
let results_val = response.get("results")
.and_then(|r| r.as_array())
.ok_or_else(|| ExchangeError::Parse("Missing 'results' in batch place response".to_string()))?;
let results: Vec<OrderResult> = results_val.iter().zip(orders.iter()).map(|(item, req)| {
if let Some(err_msg) = item.get("error").and_then(|e| e.as_str()) {
OrderResult {
order: None,
client_order_id: req.client_order_id.clone(),
success: false,
error: Some(err_msg.to_string()),
error_code: item.get("error_code").and_then(|c| c.as_i64()).map(|c| c as i32),
}
} else {
match ParadexParser::parse_order(item) {
Ok(order) => OrderResult {
order: Some(order),
client_order_id: req.client_order_id.clone(),
success: true,
error: None,
error_code: None,
},
Err(e) => OrderResult {
order: None,
client_order_id: req.client_order_id.clone(),
success: false,
error: Some(e.to_string()),
error_code: None,
},
}
}
}).collect();
Ok(results)
}
/// Cancels several orders in one signed DELETE carrying `{"ids": [...]}`.
///
/// Returns one result per requested id, in input order: a placeholder
/// cancelled `Order` for ids listed under `cancelled`, and a failure for ids
/// listed under `failed` or missing from the response altogether.
/// `_symbol` and `_account_type` are unused.
async fn cancel_orders_batch(
    &self,
    order_ids: Vec<String>,
    _symbol: Option<&str>,
    _account_type: AccountType,
) -> ExchangeResult<Vec<OrderResult>> {
    // Result for an id that was not cancelled.
    fn failure(message: String) -> OrderResult {
        OrderResult {
            order: None,
            client_order_id: None,
            success: false,
            error: Some(message),
            error_code: None,
        }
    }

    self.rate_limit_wait("orders", 1).await;
    let path = ParadexEndpoint::CancelOrderBatch.path();
    let url = format!("{}{}", self.urls.rest_url(), path);
    let body = json!({ "ids": order_ids });
    let serialized = body.to_string();
    let auth_headers = self.auth.sign_request("DELETE", path, &serialized).await?;
    let response = self
        .http
        .delete_with_body(&url, &body, &auth_headers)
        .await?;
    self.check_response(&response)?;

    // Collects the string entries of an array field; absent field -> empty.
    let ids_under = |key: &str| -> Vec<String> {
        response
            .get(key)
            .and_then(|v| v.as_array())
            .map(|arr| {
                arr.iter()
                    .filter_map(|v| v.as_str().map(String::from))
                    .collect()
            })
            .unwrap_or_default()
    };
    let cancelled = ids_under("cancelled");
    let failed = ids_under("failed");

    let mut outcomes: Vec<OrderResult> = Vec::with_capacity(order_ids.len());
    for oid in &order_ids {
        let outcome = if cancelled.contains(oid) {
            OrderResult {
                order: Some(Order {
                    id: oid.clone(),
                    client_order_id: None,
                    symbol: String::new(),
                    side: OrderSide::Buy,
                    order_type: OrderType::Limit { price: 0.0 },
                    status: OrderStatus::Canceled,
                    price: None,
                    stop_price: None,
                    quantity: 0.0,
                    filled_quantity: 0.0,
                    average_price: None,
                    commission: None,
                    commission_asset: None,
                    created_at: 0,
                    updated_at: Some(crate::core::timestamp_millis() as i64),
                    time_in_force: TimeInForce::Gtc,
                }),
                client_order_id: None,
                success: true,
                error: None,
                error_code: None,
            }
        } else if failed.contains(oid) {
            failure(format!("Failed to cancel order {}", oid))
        } else {
            failure(format!("Order {} not in cancel response", oid))
        };
        outcomes.push(outcome);
    }
    Ok(outcomes)
}
/// Maximum number of orders reported for one batch-place request.
fn max_batch_place_size(&self) -> usize {
    const MAX_PLACE: usize = 10;
    MAX_PLACE
}
/// Maximum number of order ids reported for one batch-cancel request.
fn max_batch_cancel_size(&self) -> usize {
    const MAX_CANCEL: usize = 100;
    MAX_CANCEL
}
}
#[async_trait]
impl FundingHistory for ParadexConnector {
    /// Fetches funding payments for the account.
    ///
    /// `filter.symbol` is sent unchanged as `market` and `filter.limit` as
    /// `page_size`. `_account_type` is unused.
    async fn get_funding_payments(
        &self,
        filter: FundingFilter,
        _account_type: AccountType,
    ) -> ExchangeResult<Vec<FundingPayment>> {
        let mut params: HashMap<String, String> = HashMap::new();
        params.extend(
            filter
                .symbol
                .as_ref()
                .map(|sym| ("market".to_string(), sym.clone())),
        );
        params.extend(
            filter
                .limit
                .map(|count| ("page_size".to_string(), count.to_string())),
        );

        let payments = self.get(ParadexEndpoint::FundingPayments, params).await?;
        ParadexParser::parse_funding_payments(&payments)
    }
}