use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use async_trait::async_trait;
use serde_json::{json, Value};
use crate::core::{
HttpClient, Credentials,
ExchangeId, ExchangeType, AccountType, Symbol,
ExchangeError, ExchangeResult,
Price, Kline, Ticker, OrderBook,
Order, OrderSide, OrderType, Balance, AccountInfo,
Position, FundingRate,
OrderRequest, CancelRequest, CancelScope,
BalanceQuery, PositionQuery, PositionModification,
OrderHistoryFilter, PlaceOrderResponse, FeeInfo,
AmendRequest, CancelAllResponse, OrderResult,
MarketDataCapabilities, TradingCapabilities, AccountCapabilities,
};
use crate::core::types::{
WithdrawRequest, WithdrawResponse, DepositAddress,
FundsHistoryFilter, FundsRecord, FundsRecordType,
SubAccountOperation, SubAccountResult,
UserTrade, UserTradeFilter,
FundingPayment, FundingFilter, LedgerEntry, LedgerFilter,
};
use crate::core::types::SymbolInfo;
use crate::core::traits::{
ExchangeIdentity, MarketData, Trading, Account, Positions,
CancelAll, AmendOrder, BatchOrders, CustodialFunds, SubAccounts,
FundingHistory, AccountLedger,
};
use crate::core::types::ConnectorStats;
use crate::core::utils::{RuntimeLimiter, RateLimitMonitor, RateLimitPressure};
use crate::core::types::{RateLimitCapabilities, LimitModel, RestLimitPool, WsLimits, EndpointWeight, DecayingLimitConfig, OrderbookCapabilities, WsBookChannel, ChecksumInfo, ChecksumAlgorithm};
use crate::core::utils::precision::PrecisionCache;
use super::endpoints::{KrakenUrls, KrakenEndpoint, format_symbol, map_ohlc_interval};
use super::auth::KrakenAuth;
use super::parser::KrakenParser;
/// Static rate-limit description for Kraken.
///
/// It seeds the connector's `RuntimeLimiter` and is returned unchanged from
/// `rate_limit_capabilities`. The model is a decaying counter: no fixed REST pools
/// and no per-endpoint weights are declared, so every request uses `default_cost`.
static KRAKEN_RATE_CAPS: RateLimitCapabilities = RateLimitCapabilities {
    model: LimitModel::Decaying,
    decaying: Some(DecayingLimitConfig {
        default_cost: 1.0,
        max_counter: 15.0,
        decay_rate_per_sec: 0.33,
    }),
    // Intentionally empty: the decaying counter above is the only REST limit declared.
    rest_pools: &[] as &[RestLimitPool],
    endpoint_weights: &[] as &[EndpointWeight],
    ws: WsLimits {
        max_connections: Some(150),
        max_msg_per_sec: None,
        max_subs_per_conn: None,
        max_streams_per_conn: None,
    },
};
/// REST connector for Kraken.
///
/// Bundles the HTTP client, optional request signer, endpoint URLs and the
/// rate-limit bookkeeping shared by every request helper on this type.
pub struct KrakenConnector {
/// HTTP client used for all REST calls.
http: HttpClient,
/// Request signer; `None` when the connector was built without credentials.
auth: Option<KrakenAuth>,
/// Base URLs, chosen between `KrakenUrls::TESTNET` and `KrakenUrls::MAINNET` at construction.
urls: KrakenUrls,
/// Whether the connector was created for the test network.
testnet: bool,
/// Runtime rate limiter built from `KRAKEN_RATE_CAPS`.
limiter: Arc<Mutex<RuntimeLimiter>>,
/// Monitor consulted before each request to obtain the current rate-limit pressure.
monitor: Arc<Mutex<RateLimitMonitor>>,
/// Per-symbol precision cache used to format prices and quantities.
precision: PrecisionCache,
}
impl KrakenConnector {
/// Builds a connector for mainnet or testnet.
///
/// When `credentials` is `Some`, a `KrakenAuth` signer is created from it; otherwise
/// the connector can only call endpoints that do not require authentication.
///
/// # Errors
/// Propagates failures from `HttpClient::new` and `KrakenAuth::new`.
pub async fn new(credentials: Option<Credentials>, testnet: bool) -> ExchangeResult<Self> {
    let urls = match testnet {
        true => KrakenUrls::TESTNET,
        false => KrakenUrls::MAINNET,
    };
    let http = HttpClient::new(30_000)?;
    let auth = match credentials.as_ref() {
        Some(creds) => Some(KrakenAuth::new(creds)?),
        None => None,
    };
    let connector = Self {
        http,
        auth,
        urls,
        testnet,
        limiter: Arc::new(Mutex::new(RuntimeLimiter::from_caps(&KRAKEN_RATE_CAPS))),
        monitor: Arc::new(Mutex::new(RateLimitMonitor::new("Kraken"))),
        precision: PrecisionCache::new(),
    };
    Ok(connector)
}
pub async fn public(testnet: bool) -> ExchangeResult<Self> {
Self::new(None, testnet).await
}
async fn rate_limit_wait(&self, essential: bool) -> bool {
loop {
let wait_time = {
let mut limiter = self.limiter.lock().expect("limiter poisoned");
let pressure = self.monitor.lock().expect("monitor poisoned").check(&mut limiter);
if pressure >= RateLimitPressure::Cutoff && !essential {
return false;
}
if limiter.try_acquire("default", 1) {
return true;
}
limiter.time_until_ready("default", 1)
};
if wait_time > Duration::ZERO {
tokio::time::sleep(wait_time).await;
}
}
}
/// Sends a GET request to `endpoint` on the REST host selected by `account_type`.
///
/// The request is treated as non-essential: it is rejected with
/// `ExchangeError::RateLimitExceeded` when `rate_limit_wait` declines it.
/// `params` are appended as a `key=value` query string exactly as given
/// (no escaping is applied here).
async fn get(
    &self,
    endpoint: KrakenEndpoint,
    params: HashMap<String, String>,
    account_type: AccountType,
) -> ExchangeResult<Value> {
    let admitted = self.rate_limit_wait(false).await;
    if !admitted {
        return Err(ExchangeError::RateLimitExceeded {
            retry_after: None,
            message: "Rate limit budget >= 90% used; market data request dropped".to_string(),
        });
    }

    let mut url = format!("{}{}", self.urls.rest_url(account_type), endpoint.path());
    // First pair is introduced by '?', every later one by '&'.
    let mut separator = '?';
    for (key, value) in &params {
        url.push(separator);
        url.push_str(key);
        url.push('=');
        url.push_str(value);
        separator = '&';
    }

    let body = self.http.get(&url, &HashMap::new()).await?;
    Ok(body)
}
/// Sends a POST request to `endpoint` on the REST host selected by `account_type`.
///
/// POST requests are treated as essential, so they always wait for rate-limit
/// budget instead of being dropped. When the endpoint requires authentication the
/// request is signed with the configured credentials.
///
/// # Errors
/// Returns `ExchangeError::Auth` when the endpoint requires authentication but the
/// connector was built without credentials; otherwise propagates HTTP client errors.
async fn post(
    &self,
    endpoint: KrakenEndpoint,
    params: HashMap<String, String>,
    account_type: AccountType,
) -> ExchangeResult<Value> {
    // `essential = true` means the limiter never declines; the flag carries no information.
    self.rate_limit_wait(true).await;

    let path = endpoint.path();
    let url = format!("{}{}", self.urls.rest_url(account_type), path);

    let headers = if endpoint.requires_auth() {
        let auth = self.auth.as_ref()
            .ok_or_else(|| ExchangeError::Auth("Authentication required".to_string()))?;
        // The signer also returns the encoded body; only the headers are needed here.
        let (signed_headers, _body_str) = auth.sign_request(path, &params);
        signed_headers
    } else {
        HashMap::new()
    };

    self.http.post_with_params(&url, &params, &json!({}), &headers).await
}
/// Fetches the raw spot asset-pairs response without parsing it.
pub async fn get_asset_pairs(&self) -> ExchangeResult<Value> {
    let no_params = HashMap::new();
    self.get(KrakenEndpoint::SpotAssetPairs, no_params, AccountType::Spot).await
}
/// Requests a WebSocket token and returns it as a string.
///
/// # Errors
/// Returns `ExchangeError::Parse` when the result has no string `token` field, and
/// propagates request or result-extraction errors.
pub async fn get_ws_token(&self) -> ExchangeResult<String> {
    let response = self
        .post(KrakenEndpoint::SpotWebSocketToken, HashMap::new(), AccountType::Spot)
        .await?;
    let result = KrakenParser::extract_result(&response)?;
    match result.get("token").and_then(|t| t.as_str()) {
        Some(token) => Ok(String::from(token)),
        None => Err(ExchangeError::Parse("Missing WebSocket token".to_string())),
    }
}
/// Fetches one raw page of the account's trade history.
///
/// Every argument is optional and is only sent when present: `trade_type` as `type`,
/// `start` and `end` as given, and `offset` as `ofs`.
pub async fn get_trades_history(
    &self,
    trade_type: Option<&str>,
    start: Option<i64>,
    end: Option<i64>,
    offset: Option<u32>,
) -> ExchangeResult<Value> {
    let mut params: HashMap<String, String> = HashMap::new();
    let mut put = |key: &str, value: String| {
        params.insert(key.to_string(), value);
    };
    if let Some(kind) = trade_type {
        put("type", kind.to_string());
    }
    if let Some(from) = start {
        put("start", from.to_string());
    }
    if let Some(until) = end {
        put("end", until.to_string());
    }
    if let Some(skip) = offset {
        put("ofs", skip.to_string());
    }
    self.post(KrakenEndpoint::TradesHistory, params, AccountType::Spot).await
}
}
/// Identity, metrics and static capability descriptions for the Kraken connector.
impl ExchangeIdentity for KrakenConnector {
    /// Always `ExchangeId::Kraken`.
    fn exchange_id(&self) -> ExchangeId {
        ExchangeId::Kraken
    }

    /// Snapshot of HTTP and rate-limit counters.
    ///
    /// Rate figures fall back to `(0, 0)` when the limiter mutex is poisoned.
    fn metrics(&self) -> ConnectorStats {
        let (http_requests, http_errors, last_latency_ms) = self.http.stats();
        let (rate_used, rate_max) = match self.limiter.lock() {
            Ok(mut guard) => guard.primary_stats(),
            Err(_) => (0, 0),
        };
        ConnectorStats {
            http_requests,
            http_errors,
            last_latency_ms,
            rate_used,
            rate_max,
            rate_groups: Vec::new(),
            ws_ping_rtt_ms: 0,
        }
    }

    /// Returns the static `KRAKEN_RATE_CAPS` description.
    fn rate_limit_capabilities(&self) -> RateLimitCapabilities {
        KRAKEN_RATE_CAPS
    }

    /// Whether this connector was created for the test network.
    fn is_testnet(&self) -> bool {
        self.testnet
    }

    /// Account types this connector advertises.
    fn supported_account_types(&self) -> Vec<AccountType> {
        let advertised = [
            AccountType::Spot,
            AccountType::Margin,
            AccountType::FuturesCross,
        ];
        advertised.to_vec()
    }

    /// Always `ExchangeType::Cex`.
    fn exchange_type(&self) -> ExchangeType {
        ExchangeType::Cex
    }

    /// Order-book capabilities: a detailed description for spot, a minimal one otherwise.
    fn orderbook_capabilities(&self, account_type: AccountType) -> OrderbookCapabilities {
        static SPOT_CHANNELS: &[WsBookChannel] = &[
            WsBookChannel::delta("book", None, None),
        ];

        if let AccountType::Spot = account_type {
            return OrderbookCapabilities {
                ws_depths: &[10, 25, 100, 500, 1000],
                ws_default_depth: Some(10),
                rest_max_depth: Some(500),
                rest_depth_values: &[],
                supports_snapshot: true,
                supports_delta: true,
                update_speeds_ms: &[],
                default_speed_ms: None,
                ws_channels: SPOT_CHANNELS,
                checksum: Some(ChecksumInfo {
                    algorithm: ChecksumAlgorithm::Crc32KrakenFormat,
                    levels_per_side: 10,
                    opt_in: false,
                }),
                has_sequence: false,
                has_prev_sequence: false,
                supports_aggregation: false,
                aggregation_levels: &[],
            };
        }

        // Every non-spot account type shares the same minimal description.
        OrderbookCapabilities {
            ws_depths: &[],
            ws_default_depth: None,
            rest_max_depth: None,
            rest_depth_values: &[],
            supports_snapshot: true,
            supports_delta: true,
            update_speeds_ms: &[],
            default_speed_ms: None,
            ws_channels: &[],
            checksum: None,
            has_sequence: true,
            has_prev_sequence: false,
            supports_aggregation: false,
            aggregation_levels: &[],
        }
    }
}
/// Public market-data queries.
///
/// Most parsers are first tried with the pair name produced by `format_symbol` and,
/// if that fails, retried with an alternative "full" pair name against the same response.
#[async_trait]
impl MarketData for KrakenConnector {
    /// Fetches the current price for `symbol` from the ticker endpoint.
    async fn get_price(
        &self,
        symbol: Symbol,
        account_type: AccountType,
    ) -> ExchangeResult<Price> {
        let pair = format_symbol(&symbol.base, &symbol.quote, account_type);
        let mut params = HashMap::new();
        params.insert("pair".to_string(), pair.clone());
        let response = self.get(KrakenEndpoint::SpotTicker, params, account_type).await?;

        match KrakenParser::parse_price(&response, &pair) {
            Ok(price) => Ok(price),
            Err(_) => {
                // Retry with the alternative key: an "X" prefix for XBT/ETH/LTC pairs and
                // a "Z" inserted before a trailing "USD".
                let needs_prefix = ["XBT", "ETH", "LTC"]
                    .iter()
                    .any(|prefix| pair.starts_with(*prefix));
                let prefixed = if needs_prefix {
                    format!("X{}", pair)
                } else {
                    pair.clone()
                };
                let fiat_expanded = prefixed
                    .strip_suffix("USD")
                    .map(|head| format!("{}ZUSD", head));
                let alternative = fiat_expanded.unwrap_or(prefixed);
                KrakenParser::parse_price(&response, &alternative)
            }
        }
    }

    /// Fetches the order book for `symbol`; `depth`, when given, is sent as `count`.
    async fn get_orderbook(
        &self,
        symbol: Symbol,
        depth: Option<u16>,
        account_type: AccountType,
    ) -> ExchangeResult<OrderBook> {
        let pair = format_symbol(&symbol.base, &symbol.quote, account_type);
        let mut params = HashMap::new();
        params.insert("pair".to_string(), pair.clone());
        if let Some(levels) = depth {
            params.insert("count".to_string(), levels.to_string());
        }
        let response = self.get(KrakenEndpoint::SpotOrderbook, params, account_type).await?;

        match KrakenParser::parse_orderbook(&response, &pair) {
            Ok(book) => Ok(book),
            Err(_) => KrakenParser::parse_orderbook(&response, &Self::to_full_format(&pair)),
        }
    }

    /// Fetches OHLC candles for `symbol` at `interval`.
    ///
    /// `_limit` and `_end_time` are accepted for trait compatibility but are not sent.
    async fn get_klines(
        &self,
        symbol: Symbol,
        interval: &str,
        _limit: Option<u16>,
        account_type: AccountType,
        _end_time: Option<i64>,
    ) -> ExchangeResult<Vec<Kline>> {
        let pair = format_symbol(&symbol.base, &symbol.quote, account_type);
        let mut params = HashMap::new();
        params.insert("pair".to_string(), pair.clone());
        params.insert("interval".to_string(), map_ohlc_interval(interval).to_string());
        let response = self.get(KrakenEndpoint::SpotOHLC, params, account_type).await?;

        match KrakenParser::parse_klines(&response, &pair) {
            Ok(candles) => Ok(candles),
            Err(_) => KrakenParser::parse_klines(&response, &Self::to_full_format(&pair)),
        }
    }

    /// Fetches the ticker for `symbol`.
    async fn get_ticker(
        &self,
        symbol: Symbol,
        account_type: AccountType,
    ) -> ExchangeResult<Ticker> {
        let pair = format_symbol(&symbol.base, &symbol.quote, account_type);
        let mut params = HashMap::new();
        params.insert("pair".to_string(), pair.clone());
        let response = self.get(KrakenEndpoint::SpotTicker, params, account_type).await?;

        match KrakenParser::parse_ticker(&response, &pair) {
            Ok(ticker) => Ok(ticker),
            Err(_) => KrakenParser::parse_ticker(&response, &Self::to_full_format(&pair)),
        }
    }

    /// Checks connectivity by requesting the server time and validating the result envelope.
    async fn ping(&self) -> ExchangeResult<()> {
        let response = self
            .get(KrakenEndpoint::ServerTime, HashMap::new(), AccountType::Spot)
            .await?;
        KrakenParser::extract_result(&response).map(|_| ())
    }

    /// Lists tradable symbols and loads them into the connector's precision cache.
    async fn get_exchange_info(&self, account_type: AccountType) -> ExchangeResult<Vec<SymbolInfo>> {
        let raw_pairs = self.get_asset_pairs().await?;
        let symbols = KrakenParser::parse_exchange_info(&raw_pairs, account_type)?;
        self.precision.load_from_symbols(&symbols);
        Ok(symbols)
    }

    /// Static description of supported market-data features (identical for every account type).
    fn market_data_capabilities(&self, _account_type: AccountType) -> MarketDataCapabilities {
        const INTERVALS: &[&str] = &["1m", "5m", "15m", "30m", "1h", "4h", "1d", "1w", "15d"];
        MarketDataCapabilities {
            has_ping: true,
            has_price: true,
            has_ticker: true,
            has_orderbook: true,
            has_klines: true,
            has_exchange_info: true,
            has_recent_trades: false,
            supported_intervals: INTERVALS,
            max_kline_limit: Some(720),
            has_ws_klines: true,
            has_ws_trades: true,
            has_ws_orderbook: true,
            has_ws_ticker: true,
        }
    }
}
#[async_trait]
impl Trading for KrakenConnector {
/// Places an order described by `req`.
///
/// The request is turned into a string parameter map (one match arm per
/// `OrderType`), the keys are renamed for futures account types, the optional
/// client order id is attached, and the map is posted to the spot or futures
/// order endpoint. Only the order id is read from the response; the other fields
/// of the returned `Order` are filled from the request, with status `New` and zero
/// filled quantity.
///
/// # Errors
/// Returns `ExchangeError::UnsupportedOperation` for order types that have no
/// mapping here and for `ReduceOnly` on Spot/Margin; propagates request and
/// order-id parsing errors.
async fn place_order(&self, req: OrderRequest) -> ExchangeResult<PlaceOrderResponse> {
let symbol = req.symbol.clone();
let side = req.side;
let quantity = req.quantity;
let account_type = req.account_type;
let formatted = format_symbol(&symbol.base, &symbol.quote, account_type);
let side_str = match side { OrderSide::Buy => "buy", OrderSide::Sell => "sell" };
let sym = &formatted;
// Every account type other than Spot/Margin is routed to the futures order endpoint.
let endpoint = match account_type {
AccountType::Spot | AccountType::Margin => KrakenEndpoint::SpotAddOrder,
_ => KrakenEndpoint::FuturesSendOrder,
};
// Each arm yields: request params, the order type to echo back, the echoed price,
// the echoed stop price, and the echoed time-in-force.
let (mut params, order_type_out, price_out, stop_price_out, tif_out) = match req.order_type {
OrderType::Market => {
let mut p = HashMap::new();
p.insert("pair".to_string(), formatted.clone());
p.insert("type".to_string(), side_str.to_string());
p.insert("ordertype".to_string(), "market".to_string());
p.insert("volume".to_string(), self.precision.qty(sym, quantity));
(p, OrderType::Market, None, None, crate::core::TimeInForce::Gtc)
}
OrderType::Limit { price } => {
let mut p = HashMap::new();
p.insert("pair".to_string(), formatted.clone());
p.insert("type".to_string(), side_str.to_string());
p.insert("ordertype".to_string(), "limit".to_string());
p.insert("price".to_string(), self.precision.price(sym, price));
p.insert("volume".to_string(), self.precision.qty(sym, quantity));
(p, OrderType::Limit { price }, Some(price), None, crate::core::TimeInForce::Gtc)
}
OrderType::PostOnly { price } => {
let mut p = HashMap::new();
p.insert("pair".to_string(), formatted.clone());
p.insert("type".to_string(), side_str.to_string());
p.insert("ordertype".to_string(), "limit".to_string());
p.insert("price".to_string(), self.precision.price(sym, price));
p.insert("volume".to_string(), self.precision.qty(sym, quantity));
p.insert("oflags".to_string(), "post".to_string());
(p, OrderType::PostOnly { price }, Some(price), None, crate::core::TimeInForce::Gtc)
}
OrderType::Ioc { price } => {
// NOTE(review): a missing price becomes 0.0 and is still sent as the limit price — confirm intended.
let px_val = price.unwrap_or(0.0);
let mut p = HashMap::new();
p.insert("pair".to_string(), formatted.clone());
p.insert("type".to_string(), side_str.to_string());
p.insert("ordertype".to_string(), "limit".to_string());
p.insert("price".to_string(), self.precision.price(sym, px_val));
p.insert("volume".to_string(), self.precision.qty(sym, quantity));
p.insert("timeinforce".to_string(), "IOC".to_string());
(p, OrderType::Ioc { price }, price, None, crate::core::TimeInForce::Ioc)
}
OrderType::Fok { price } => {
let mut p = HashMap::new();
p.insert("pair".to_string(), formatted.clone());
p.insert("type".to_string(), side_str.to_string());
p.insert("ordertype".to_string(), "limit".to_string());
p.insert("price".to_string(), self.precision.price(sym, price));
p.insert("volume".to_string(), self.precision.qty(sym, quantity));
// NOTE(review): "IOC" is sent on the wire while the returned Order reports TimeInForce::Fok — confirm.
p.insert("timeinforce".to_string(), "IOC".to_string());
(p, OrderType::Fok { price }, Some(price), None, crate::core::TimeInForce::Fok)
}
OrderType::StopMarket { stop_price } => {
let mut p = HashMap::new();
p.insert("pair".to_string(), formatted.clone());
p.insert("type".to_string(), side_str.to_string());
p.insert("ordertype".to_string(), "stop-loss".to_string());
// The stop trigger is sent under "price" for this order type.
p.insert("price".to_string(), self.precision.price(sym, stop_price));
p.insert("volume".to_string(), self.precision.qty(sym, quantity));
(p, OrderType::StopMarket { stop_price }, None, Some(stop_price), crate::core::TimeInForce::Gtc)
}
OrderType::StopLimit { stop_price, limit_price } => {
let mut p = HashMap::new();
p.insert("pair".to_string(), formatted.clone());
p.insert("type".to_string(), side_str.to_string());
p.insert("ordertype".to_string(), "stop-loss-limit".to_string());
// "price" carries the stop trigger, "price2" the limit price.
p.insert("price".to_string(), self.precision.price(sym, stop_price));
p.insert("price2".to_string(), self.precision.price(sym, limit_price));
p.insert("volume".to_string(), self.precision.qty(sym, quantity));
(p, OrderType::StopLimit { stop_price, limit_price }, Some(limit_price), Some(stop_price), crate::core::TimeInForce::Gtc)
}
OrderType::Gtd { price, expire_time } => {
let mut p = HashMap::new();
p.insert("pair".to_string(), formatted.clone());
p.insert("type".to_string(), side_str.to_string());
p.insert("ordertype".to_string(), "limit".to_string());
p.insert("price".to_string(), self.precision.price(sym, price));
p.insert("volume".to_string(), self.precision.qty(sym, quantity));
p.insert("timeinforce".to_string(), "GTD".to_string());
// presumably converts milliseconds to seconds — TODO confirm the unit of `expire_time`.
p.insert("expiretm".to_string(), (expire_time / 1000).to_string());
(p, OrderType::Gtd { price, expire_time }, Some(price), None, crate::core::TimeInForce::Gtd)
}
OrderType::ReduceOnly { price } => {
match account_type {
AccountType::Spot | AccountType::Margin => {
return Err(ExchangeError::UnsupportedOperation(
"ReduceOnly not supported for spot on Kraken".to_string()
));
}
_ => {}
}
// This arm writes the futures-style keys directly, so the key remapping below finds
// nothing left to rename for it.
let ord_type = if price.is_some() { "lmt" } else { "mkt" };
let mut p = HashMap::new();
p.insert("symbol".to_string(), formatted.clone());
p.insert("side".to_string(), side_str.to_string());
p.insert("orderType".to_string(), ord_type.to_string());
p.insert("size".to_string(), self.precision.qty(sym, quantity));
p.insert("reduceOnly".to_string(), "true".to_string());
if let Some(px) = price {
p.insert("limitPrice".to_string(), self.precision.price(sym, px));
}
(p, OrderType::ReduceOnly { price }, price, None, crate::core::TimeInForce::Gtc)
}
OrderType::Iceberg { price, display_quantity } => {
let mut p = HashMap::new();
p.insert("pair".to_string(), formatted.clone());
p.insert("type".to_string(), side_str.to_string());
p.insert("ordertype".to_string(), "iceberg".to_string());
p.insert("price".to_string(), self.precision.price(sym, price));
p.insert("volume".to_string(), self.precision.qty(sym, quantity));
p.insert("displayvol".to_string(), self.precision.qty(sym, display_quantity));
(p, OrderType::Iceberg { price, display_quantity }, Some(price), None, crate::core::TimeInForce::Gtc)
}
OrderType::TrailingStop { .. } | OrderType::Oco { .. } | OrderType::Bracket { .. }
| OrderType::Twap { .. }
| OrderType::Oto { .. } | OrderType::ConditionalPlan { .. } | OrderType::DcaRecurring { .. } => {
return Err(ExchangeError::UnsupportedOperation(
format!("{:?} order type not supported on {:?}", req.order_type, self.exchange_id())
));
}
};
// Rename spot-style keys to futures-style keys. This runs only for FuturesCross and
// FuturesIsolated, while the endpoint selection above treats every non-Spot/Margin
// account type as futures.
// NOTE(review): "price" is renamed to "limitPrice" even when it holds a stop trigger
// ("stop-loss"); any ordertype other than market/limit/stop-loss (e.g. "stop-loss-limit",
// "iceberg") becomes "lmt"; and "price2", "oflags", "timeinforce", "expiretm" and
// "displayvol" keep their spot-style keys — confirm against the futures API.
if matches!(account_type, AccountType::FuturesCross | AccountType::FuturesIsolated) {
if let Some(pair) = params.remove("pair") {
params.insert("symbol".to_string(), pair);
}
if let Some(t) = params.remove("type") {
params.insert("side".to_string(), t);
}
if let Some(ot) = params.remove("ordertype") {
let futures_type = match ot.as_str() {
"market" => "mkt",
"limit" => "lmt",
"stop-loss" => "stp",
_ => "lmt",
};
params.insert("orderType".to_string(), futures_type.to_string());
}
if let Some(vol) = params.remove("volume") {
params.insert("size".to_string(), vol);
}
if let Some(px) = params.remove("price") {
params.insert("limitPrice".to_string(), px);
}
}
// NOTE(review): the same "cl_ord_id" key is used for spot and futures requests — confirm
// the futures endpoint accepts it.
if let Some(ref cl_id) = req.client_order_id {
params.insert("cl_ord_id".to_string(), cl_id.clone());
}
let response = self.post(endpoint, params, account_type).await?;
let order_id = KrakenParser::parse_order_id(&response)?;
// The order is reported from request data; the response is only consulted for its id.
Ok(PlaceOrderResponse::Simple(Order {
id: order_id,
client_order_id: req.client_order_id,
symbol: symbol.to_string(),
side,
order_type: order_type_out,
status: crate::core::OrderStatus::New,
price: price_out,
stop_price: stop_price_out,
quantity,
filled_quantity: 0.0,
average_price: None,
commission: None,
commission_asset: None,
created_at: crate::core::timestamp_millis() as i64,
updated_at: None,
time_in_force: tif_out,
}))
}
/// Returns historical orders (spot/margin) or fills (other account types).
///
/// For spot/margin, `start_time` and `end_time` are divided by 1000 before being
/// sent as `start`/`end` to the closed-orders endpoint. For other account types only
/// `start_time` is forwarded, unchanged, as `lastFillTime`.
///
/// NOTE(review): the non-spot branch goes through the unsigned `get` helper — confirm
/// that this endpoint does not need authentication.
async fn get_order_history(
    &self,
    filter: OrderHistoryFilter,
    account_type: AccountType,
) -> ExchangeResult<Vec<Order>> {
    let mut params = HashMap::new();

    if matches!(account_type, AccountType::Spot | AccountType::Margin) {
        if let Some(start_ms) = filter.start_time {
            params.insert("start".to_string(), (start_ms / 1000).to_string());
        }
        if let Some(end_ms) = filter.end_time {
            params.insert("end".to_string(), (end_ms / 1000).to_string());
        }
        let closed = self.post(KrakenEndpoint::SpotClosedOrders, params, account_type).await?;
        return KrakenParser::parse_closed_orders(&closed);
    }

    if let Some(since) = filter.start_time {
        params.insert("lastFillTime".to_string(), since.to_string());
    }
    let fills = self.get(KrakenEndpoint::FuturesHistory, params, account_type).await?;
    KrakenParser::parse_futures_fills(&fills)
}
async fn cancel_order(&self, req: CancelRequest) -> ExchangeResult<Order> {
match req.scope {
CancelScope::Single { ref order_id } => {
let symbol = req.symbol.as_ref()
.ok_or_else(|| ExchangeError::InvalidRequest("Symbol required for cancel".into()))?
.clone();
let account_type = req.account_type;
let mut params = HashMap::new();
params.insert("txid".to_string(), order_id.to_string());
let response = self.post(KrakenEndpoint::SpotCancelOrder, params, account_type).await?;
KrakenParser::extract_result(&response)?;
Ok(Order {
id: order_id.to_string(),
client_order_id: None,
symbol: symbol.to_string(),
side: OrderSide::Buy,
order_type: OrderType::Limit { price: 0.0 },
status: crate::core::OrderStatus::Canceled,
price: None,
stop_price: None,
quantity: 0.0,
filled_quantity: 0.0,
average_price: None,
commission: None,
commission_asset: None,
created_at: 0,
updated_at: Some(crate::core::timestamp_millis() as i64),
time_in_force: crate::core::TimeInForce::Gtc,
})
}
CancelScope::All { ref symbol } => {
let account_type = req.account_type;
let mut params = HashMap::new();
if let Some(sym) = symbol {
if matches!(account_type, AccountType::FuturesCross | AccountType::FuturesIsolated) {
params.insert("symbol".to_string(),
format_symbol(&sym.base, &sym.quote, account_type));
}
}
let cancel_all_endpoint = match account_type {
AccountType::Spot | AccountType::Margin => KrakenEndpoint::SpotCancelOrder,
_ => KrakenEndpoint::FuturesCancelOrder,
};
let response = self.post(cancel_all_endpoint, params, account_type).await?;
let _ = response;
let sym_str = symbol.as_ref().map(|s| s.to_string()).unwrap_or_default();
Ok(Order {
id: format!("cancel_all_{}", crate::core::timestamp_millis()),
client_order_id: None,
symbol: sym_str,
side: OrderSide::Buy,
order_type: OrderType::Market,
status: crate::core::OrderStatus::Canceled,
price: None,
stop_price: None,
quantity: 0.0,
filled_quantity: 0.0,
average_price: None,
commission: None,
commission_asset: None,
created_at: 0,
updated_at: Some(crate::core::timestamp_millis() as i64),
time_in_force: crate::core::TimeInForce::Gtc,
})
}
CancelScope::BySymbol { ref symbol } => {
let account_type = req.account_type;
let mut params = HashMap::new();
if matches!(account_type, AccountType::FuturesCross | AccountType::FuturesIsolated) {
params.insert("symbol".to_string(),
format_symbol(&symbol.base, &symbol.quote, account_type));
}
let cancel_all_endpoint = match account_type {
AccountType::Spot | AccountType::Margin => KrakenEndpoint::SpotCancelOrder,
_ => KrakenEndpoint::FuturesCancelOrder,
};
let response = self.post(cancel_all_endpoint, params, account_type).await?;
let _ = response;
Ok(Order {
id: format!("cancel_all_{}", crate::core::timestamp_millis()),
client_order_id: None,
symbol: symbol.to_string(),
side: OrderSide::Buy,
order_type: OrderType::Market,
status: crate::core::OrderStatus::Canceled,
price: None,
stop_price: None,
quantity: 0.0,
filled_quantity: 0.0,
average_price: None,
commission: None,
commission_asset: None,
created_at: 0,
updated_at: Some(crate::core::timestamp_millis() as i64),
time_in_force: crate::core::TimeInForce::Gtc,
})
}
CancelScope::Batch { ref order_ids } => {
let symbol = req.symbol.as_ref()
.ok_or_else(|| ExchangeError::InvalidRequest("Symbol required for batch cancel".into()))?
.clone();
let account_type = req.account_type;
match account_type {
AccountType::Spot | AccountType::Margin => {
return Err(ExchangeError::UnsupportedOperation(
"Kraken Spot does not support batch cancel. Cancel orders individually.".to_string()
));
}
_ => {}
}
let _ = (order_ids, symbol);
Err(ExchangeError::UnsupportedOperation(
"Kraken Futures batch cancel requires individual cancels. Use CancelScope::Single.".to_string()
))
}
CancelScope::ByLabel(_)
| CancelScope::ByCurrencyKind { .. }
| CancelScope::ScheduledAt(_) => Err(ExchangeError::UnsupportedOperation(
"Kraken does not support this cancel scope".to_string()
)),
}
}
/// Looks up a single order by id.
///
/// The order id is sent as `txid` and the response is parsed for that id.
/// `_symbol` is accepted for trait compatibility and is not used by the request.
async fn get_order(
    &self,
    _symbol: &str,
    order_id: &str,
    account_type: AccountType,
) -> ExchangeResult<Order> {
    let mut params = HashMap::new();
    params.insert("txid".to_string(), order_id.to_string());
    let response = self.post(KrakenEndpoint::SpotGetOrder, params, account_type).await?;
    KrakenParser::parse_order(&response, order_id)
}
/// Lists all open orders.
///
/// `_symbol` is accepted for trait compatibility; no symbol filter is sent with the
/// request or applied to the parsed result.
async fn get_open_orders(
    &self,
    _symbol: Option<&str>,
    account_type: AccountType,
) -> ExchangeResult<Vec<Order>> {
    let response = self
        .post(KrakenEndpoint::SpotOpenOrders, HashMap::new(), account_type)
        .await?;
    KrakenParser::parse_open_orders(&response)
}
async fn get_user_trades(
&self,
filter: UserTradeFilter,
account_type: AccountType,
) -> ExchangeResult<Vec<UserTrade>> {
match account_type {
AccountType::FuturesCross | AccountType::FuturesIsolated => {
return Err(ExchangeError::UnsupportedOperation(
"get_user_trades is not supported for Kraken Futures (use get_order_history)".to_string(),
));
}
_ => {}
}
let page_size: u32 = 50;
let max_trades = filter.limit.unwrap_or(page_size);
let start_secs = filter.start_time.map(|ms| ms / 1000);
let end_secs = filter.end_time.map(|ms| ms / 1000);
let mut all_trades: Vec<UserTrade> = Vec::new();
let mut offset: u32 = 0;
loop {
let response = self.get_trades_history(
None,
start_secs.map(|s| s as i64),
end_secs.map(|s| s as i64),
if offset > 0 { Some(offset) } else { None },
).await?;
let mut page = KrakenParser::parse_trades_history(&response)?;
if let Some(ref oid) = filter.order_id {
page.retain(|t| &t.order_id == oid);
}
if let Some(ref sym) = filter.symbol {
let sym_upper = sym.to_uppercase();
page.retain(|t| t.symbol.to_uppercase().contains(&sym_upper));
}
let page_len = page.len() as u32;
all_trades.extend(page);
if all_trades.len() as u32 >= max_trades || page_len < page_size {
break;
}
offset += page_size;
}
all_trades.truncate(max_trades as usize);
Ok(all_trades)
}
/// Static description of trading features for `account_type`.
///
/// Batch support is advertised only for non-Spot/Margin account types, user-trade
/// queries only for Spot/Margin.
fn trading_capabilities(&self, account_type: AccountType) -> TradingCapabilities {
    let spot_like = matches!(account_type, AccountType::Spot | AccountType::Margin);
    let max_batch_size = if spot_like { None } else { Some(10) };
    TradingCapabilities {
        has_market_order: true,
        has_limit_order: true,
        has_stop_market: true,
        has_stop_limit: true,
        has_trailing_stop: false,
        has_bracket: false,
        has_oco: false,
        has_amend: true,
        has_batch: !spot_like,
        max_batch_size,
        has_cancel_all: true,
        has_user_trades: spot_like,
        has_order_history: true,
    }
}
}
#[async_trait]
impl Account for KrakenConnector {
/// Returns all balances reported by the balance endpoint.
///
/// `query.asset` is not applied: the full, unfiltered list is returned.
async fn get_balance(&self, query: BalanceQuery) -> ExchangeResult<Vec<Balance>> {
    let response = self
        .post(KrakenEndpoint::SpotBalance, HashMap::new(), query.account_type)
        .await?;
    KrakenParser::parse_balances(&response)
}
/// Returns account information built from the current balances.
///
/// Only `balances` comes from the exchange; the permission flags and commission
/// figures are fixed values.
/// NOTE(review): the commissions here (0.16 / 0.26) look like percentages while
/// `get_fees` falls back to the fraction 0.0016 — confirm the expected unit.
async fn get_account_info(&self, account_type: AccountType) -> ExchangeResult<AccountInfo> {
    let query = BalanceQuery { asset: None, account_type };
    let balances = self.get_balance(query).await?;
    let info = AccountInfo {
        account_type,
        can_trade: true,
        can_withdraw: true,
        can_deposit: true,
        maker_commission: 0.16,
        taker_commission: 0.26,
        balances,
    };
    Ok(info)
}
/// Returns fee rates, optionally for one symbol.
///
/// A `BASE/QUOTE` symbol is converted with `format_symbol`; any other string is sent
/// as-is under `pair`. The rate is read from the string field `fee` of the result and
/// divided by 100; when the field is missing or unparsable, `0.0016` is used.
/// The taker rate is reported equal to the maker rate.
async fn get_fees(&self, symbol: Option<&str>) -> ExchangeResult<FeeInfo> {
    let account_type = AccountType::Spot;

    let mut params = HashMap::new();
    if let Some(raw) = symbol {
        let pieces: Vec<&str> = raw.split('/').collect();
        let pair = match pieces.as_slice() {
            [base, quote] => format_symbol(base, quote, account_type),
            _ => raw.to_string(),
        };
        params.insert("pair".to_string(), pair);
    }

    let response = self.post(KrakenEndpoint::SpotTradeBalance, params, account_type).await?;
    let result = KrakenParser::extract_result(&response)?;

    let parsed_percent = result
        .get("fee")
        .and_then(|v| v.as_str())
        .and_then(|s| s.parse::<f64>().ok());
    let maker_rate = match parsed_percent {
        Some(percent) => percent / 100.0,
        None => 0.0016,
    };

    Ok(FeeInfo {
        maker_rate,
        taker_rate: maker_rate,
        symbol: symbol.map(String::from),
        tier: None,
    })
}
/// Static description of account features for `account_type`.
///
/// Deposit/withdraw, funding history and ledger are advertised only for Spot/Margin.
fn account_capabilities(&self, account_type: AccountType) -> AccountCapabilities {
    let spot_like = matches!(account_type, AccountType::Spot | AccountType::Margin);
    AccountCapabilities {
        has_balances: true,
        has_account_info: true,
        has_fees: true,
        has_transfers: false,
        has_sub_accounts: true,
        has_deposit_withdraw: spot_like,
        has_margin: false,
        has_earn_staking: false,
        has_funding_history: spot_like,
        has_ledger: spot_like,
        has_convert: false,
        has_positions: false,
    }
}
}
#[async_trait]
impl Positions for KrakenConnector {
/// Returns open futures positions.
///
/// `query.symbol` is not applied: every position in the response is returned.
///
/// NOTE(review): the request goes through the unsigned `get` helper — confirm that
/// this endpoint does not need authentication.
///
/// # Errors
/// `ExchangeError::UnsupportedOperation` for Spot/Margin; request and parse errors
/// are propagated.
async fn get_positions(&self, query: PositionQuery) -> ExchangeResult<Vec<Position>> {
    let account_type = query.account_type;
    if matches!(account_type, AccountType::Spot | AccountType::Margin) {
        return Err(ExchangeError::UnsupportedOperation(
            "Positions not supported for Spot/Margin".to_string()
        ));
    }
    let response = self.get(
        KrakenEndpoint::FuturesOpenPositions,
        HashMap::new(),
        account_type,
    ).await?;
    KrakenParser::parse_futures_positions(&response)
}
/// Returns the funding rate for a futures symbol.
///
/// A `BASE/QUOTE` string is split into base and quote; any other string is used
/// whole as the base with an empty quote. The formatted symbol is sent as `symbol`.
///
/// # Errors
/// `ExchangeError::UnsupportedOperation` for Spot/Margin; request and parse errors
/// are propagated.
async fn get_funding_rate(
    &self,
    symbol: &str,
    account_type: AccountType,
) -> ExchangeResult<FundingRate> {
    if matches!(account_type, AccountType::Spot | AccountType::Margin) {
        return Err(ExchangeError::UnsupportedOperation(
            "Funding rate not supported for Spot/Margin".to_string()
        ));
    }

    let pieces: Vec<&str> = symbol.split('/').collect();
    let parsed = match pieces.as_slice() {
        [base, quote] => crate::core::Symbol::new(base, quote),
        _ => crate::core::Symbol {
            base: symbol.to_string(),
            quote: String::new(),
            raw: Some(symbol.to_string()),
        },
    };

    let contract = format_symbol(&parsed.base, &parsed.quote, account_type);
    let mut params = HashMap::new();
    params.insert("symbol".to_string(), contract.clone());

    let response = self
        .get(KrakenEndpoint::FuturesHistoricalFunding, params, account_type)
        .await?;
    KrakenParser::parse_funding_rate(&response, &contract)
}
/// Applies a position modification; only leverage changes and position closes are handled.
///
/// * `SetLeverage` posts `symbol` and `maxLeverage` to the futures leverage endpoint.
/// * `ClosePosition` posts a reduce-only market order to the futures order endpoint.
///   NOTE(review): that order is always sent with side `buy` and size `0`, regardless of
///   the actual position — confirm this closes positions as intended.
///
/// # Errors
/// `ExchangeError::UnsupportedOperation` for Spot/Margin and for every other
/// modification kind; request and response-validation errors are propagated.
async fn modify_position(&self, req: PositionModification) -> ExchangeResult<()> {
    match req {
        PositionModification::SetLeverage { ref symbol, leverage, account_type } => {
            if matches!(account_type, AccountType::Spot | AccountType::Margin) {
                return Err(ExchangeError::UnsupportedOperation(
                    "Leverage not supported for Spot/Margin".to_string()
                ));
            }
            let contract = format_symbol(&symbol.base, &symbol.quote, account_type);
            let mut params = HashMap::new();
            params.insert("symbol".to_string(), contract);
            params.insert("maxLeverage".to_string(), leverage.to_string());
            let response = self.post(KrakenEndpoint::FuturesSetLeverage, params, account_type).await?;
            KrakenParser::extract_futures_data(&response)?;
            Ok(())
        }
        PositionModification::ClosePosition { ref symbol, account_type } => {
            if matches!(account_type, AccountType::Spot | AccountType::Margin) {
                return Err(ExchangeError::UnsupportedOperation(
                    "ClosePosition only supported for futures on Kraken".to_string()
                ));
            }
            let contract = format_symbol(&symbol.base, &symbol.quote, account_type);
            let mut params = HashMap::new();
            params.insert("symbol".to_string(), contract);
            for (key, value) in [
                ("orderType", "mkt"),
                ("reduceOnly", "true"),
                ("side", "buy"),
                ("size", "0"),
            ].iter() {
                params.insert(key.to_string(), value.to_string());
            }
            let response = self.post(KrakenEndpoint::FuturesSendOrder, params, account_type).await?;
            KrakenParser::extract_futures_data(&response)?;
            Ok(())
        }
        PositionModification::SetMarginMode { .. }
        | PositionModification::AddMargin { .. }
        | PositionModification::RemoveMargin { .. }
        | PositionModification::SetTpSl { .. }
        | PositionModification::SwitchPositionMode { .. }
        | PositionModification::MovePositions { .. } => {
            let message = "This position modification is not supported on Kraken".to_string();
            Err(ExchangeError::UnsupportedOperation(message))
        }
    }
}
}
/// Bulk order cancellation.
#[async_trait]
impl CancelAll for KrakenConnector {
    /// Cancels all open orders for `account_type`.
    ///
    /// Only `CancelScope::All` and `CancelScope::BySymbol` are accepted. On Spot/Margin
    /// the request carries no parameters, so the symbol is not sent; on other account
    /// types the symbol, when present, is forwarded as `symbol`.
    ///
    /// # Errors
    /// `ExchangeError::InvalidRequest` for any other scope; request and parse errors
    /// are propagated.
    async fn cancel_all_orders(
        &self,
        scope: CancelScope,
        account_type: AccountType,
    ) -> ExchangeResult<CancelAllResponse> {
        let symbol_filter = match &scope {
            CancelScope::BySymbol { symbol } => Some(symbol.clone()),
            CancelScope::All { symbol } => symbol.clone(),
            _ => {
                return Err(ExchangeError::InvalidRequest(
                    "cancel_all_orders only accepts All or BySymbol scope".to_string()
                ));
            }
        };

        if matches!(account_type, AccountType::Spot | AccountType::Margin) {
            let response = self
                .post(KrakenEndpoint::SpotCancelAll, HashMap::new(), account_type)
                .await?;
            return KrakenParser::parse_cancel_all_response(&response);
        }

        let mut params = HashMap::new();
        if let Some(sym) = symbol_filter {
            let contract = format_symbol(&sym.base, &sym.quote, account_type);
            params.insert("symbol".to_string(), contract);
        }
        let response = self
            .post(KrakenEndpoint::FuturesCancelOrder, params, account_type)
            .await?;
        KrakenParser::parse_futures_cancel_all_response(&response)
    }
}
#[async_trait]
impl AmendOrder for KrakenConnector {
    /// Amends the price and/or quantity of an existing order.
    ///
    /// At least one of `req.fields.price` / `req.fields.quantity` must be set,
    /// otherwise `ExchangeError::InvalidRequest` is returned before any request
    /// is made. `Spot`/`Margin` accounts use `SpotEditOrder`; every other
    /// account type uses `FuturesEditOrder`. Prices and quantities are formatted
    /// through `self.precision` using the exchange-formatted symbol.
    ///
    /// # Errors
    /// Propagates errors from `self.post` and from the amend-response parsers.
    async fn amend_order(&self, req: AmendRequest) -> ExchangeResult<Order> {
        let new_price = req.fields.price;
        let new_qty = req.fields.quantity;
        if new_price.is_none() && new_qty.is_none() {
            return Err(ExchangeError::InvalidRequest(
                "At least one of price or quantity must be provided for amend".to_string(),
            ));
        }

        let account_type = req.account_type;
        let pair = format_symbol(&req.symbol.base, &req.symbol.quote, account_type);
        let display_symbol = req.symbol.to_string();
        let is_spot = matches!(account_type, AccountType::Spot | AccountType::Margin);

        // The two APIs take the same information under different parameter names.
        let (endpoint, id_key, symbol_key, price_key, qty_key) = if is_spot {
            (KrakenEndpoint::SpotEditOrder, "txid", "pair", "price", "volume")
        } else {
            (KrakenEndpoint::FuturesEditOrder, "orderId", "symbol", "limitPrice", "size")
        };

        let mut params = HashMap::new();
        if let Some(price) = new_price {
            params.insert(price_key.to_string(), self.precision.price(&pair, price));
        }
        if let Some(qty) = new_qty {
            params.insert(qty_key.to_string(), self.precision.qty(&pair, qty));
        }
        params.insert(id_key.to_string(), req.order_id);
        params.insert(symbol_key.to_string(), pair);

        let reply = self.post(endpoint, params, account_type).await?;
        if is_spot {
            KrakenParser::parse_amend_spot_order(&reply, &display_symbol)
        } else {
            KrakenParser::parse_amend_futures_order(&reply, &display_symbol)
        }
    }
}
#[async_trait]
impl BatchOrders for KrakenConnector {
async fn place_orders_batch(
&self,
orders: Vec<OrderRequest>,
) -> ExchangeResult<Vec<OrderResult>> {
if orders.is_empty() {
return Ok(vec![]);
}
let account_type = orders[0].account_type;
match account_type {
AccountType::Spot | AccountType::Margin => {
return Err(ExchangeError::UnsupportedOperation(
"Batch orders not supported on Kraken Spot (futures only)".to_string()
));
}
_ => {}
}
if orders.len() > self.max_batch_place_size() {
return Err(ExchangeError::InvalidRequest(
format!("Batch size {} exceeds Kraken Futures limit of {}", orders.len(), self.max_batch_place_size())
));
}
let batch_json: Vec<serde_json::Value> = orders.iter().map(|req| {
let formatted = format_symbol(&req.symbol.base, &req.symbol.quote, account_type);
let side_str = match req.side { OrderSide::Buy => "buy", OrderSide::Sell => "sell" };
let mut obj = json!({
"order": "send",
"symbol": formatted,
"side": side_str,
"size": req.quantity as i64,
});
match req.order_type {
OrderType::Market => {
obj["orderType"] = json!("mkt");
}
OrderType::Limit { price } => {
obj["orderType"] = json!("lmt");
obj["limitPrice"] = json!(self.precision.price(&formatted, price));
}
_ => {
obj["orderType"] = json!("mkt");
}
}
if req.reduce_only {
obj["reduceOnly"] = json!(true);
}
if let Some(ref cid) = req.client_order_id {
obj["cl_ord_id"] = json!(cid);
}
obj
}).collect();
let mut params = HashMap::new();
let batch_str = serde_json::to_string(&batch_json)
.map_err(|e| ExchangeError::Parse(format!("Failed to serialize batch orders: {}", e)))?;
params.insert("json".to_string(), batch_str);
let response = self.post(KrakenEndpoint::FuturesBatchOrder, params, account_type).await?;
KrakenParser::parse_batch_orders_response(&response)
}
async fn cancel_orders_batch(
&self,
order_ids: Vec<String>,
_symbol: Option<&str>,
account_type: AccountType,
) -> ExchangeResult<Vec<OrderResult>> {
match account_type {
AccountType::Spot | AccountType::Margin => {
return Err(ExchangeError::UnsupportedOperation(
"Batch cancel not supported on Kraken Spot".to_string()
));
}
_ => {}
}
let cancel_json: Vec<serde_json::Value> = order_ids.iter().map(|id| {
json!({
"order": "cancel",
"order_id": id,
})
}).collect();
let mut params = HashMap::new();
let batch_str = serde_json::to_string(&cancel_json)
.map_err(|e| ExchangeError::Parse(format!("Failed to serialize cancel batch: {}", e)))?;
params.insert("json".to_string(), batch_str);
let response = self.post(KrakenEndpoint::FuturesBatchOrder, params, account_type).await?;
KrakenParser::parse_batch_orders_response(&response)
}
fn max_batch_place_size(&self) -> usize {
10 }
fn max_batch_cancel_size(&self) -> usize {
10 }
}
#[async_trait]
impl CustodialFunds for KrakenConnector {
async fn get_deposit_address(
&self,
asset: &str,
network: Option<&str>,
) -> ExchangeResult<DepositAddress> {
let kraken_asset = map_asset_to_kraken(asset);
let mut params = HashMap::new();
params.insert("asset".to_string(), kraken_asset.to_string());
if let Some(method) = network {
params.insert("method".to_string(), method.to_string());
}
params.insert("new".to_string(), "false".to_string());
let response = self.post(
KrakenEndpoint::SpotDepositAddresses,
params,
AccountType::Spot,
).await?;
KrakenParser::parse_deposit_address(&response, asset)
}
async fn withdraw(&self, req: WithdrawRequest) -> ExchangeResult<WithdrawResponse> {
let kraken_asset = map_asset_to_kraken(&req.asset);
let mut params = HashMap::new();
params.insert("asset".to_string(), kraken_asset.to_string());
params.insert("key".to_string(), req.address.clone());
params.insert("amount".to_string(), req.amount.to_string());
let response = self.post(
KrakenEndpoint::SpotWithdraw,
params,
AccountType::Spot,
).await?;
KrakenParser::parse_withdraw_response(&response)
}
async fn get_funds_history(
&self,
filter: FundsHistoryFilter,
) -> ExchangeResult<Vec<FundsRecord>> {
let asset = filter.asset.as_deref().unwrap_or("");
let kraken_asset = if asset.is_empty() {
String::new()
} else {
map_asset_to_kraken(asset)
};
match filter.record_type {
FundsRecordType::Deposit => {
let mut params = HashMap::new();
if !kraken_asset.is_empty() {
params.insert("asset".to_string(), kraken_asset.to_string());
}
let response = self.post(
KrakenEndpoint::SpotDepositStatus,
params,
AccountType::Spot,
).await?;
KrakenParser::parse_deposit_history(&response)
}
FundsRecordType::Withdrawal => {
let mut params = HashMap::new();
if !kraken_asset.is_empty() {
params.insert("asset".to_string(), kraken_asset.to_string());
}
let response = self.post(
KrakenEndpoint::SpotWithdrawStatus,
params,
AccountType::Spot,
).await?;
KrakenParser::parse_withdrawal_history(&response)
}
FundsRecordType::Both => {
let mut deposits_params = HashMap::new();
let mut withdrawals_params = HashMap::new();
if !kraken_asset.is_empty() {
deposits_params.insert("asset".to_string(), kraken_asset.to_string());
withdrawals_params.insert("asset".to_string(), kraken_asset.to_string());
}
let dep_response = self.post(
KrakenEndpoint::SpotDepositStatus,
deposits_params,
AccountType::Spot,
).await?;
let wit_response = self.post(
KrakenEndpoint::SpotWithdrawStatus,
withdrawals_params,
AccountType::Spot,
).await?;
let mut records = KrakenParser::parse_deposit_history(&dep_response)?;
records.extend(KrakenParser::parse_withdrawal_history(&wit_response)?);
Ok(records)
}
}
}
}
#[async_trait]
impl SubAccounts for KrakenConnector {
    /// Executes a sub-account operation.
    ///
    /// `List` and `Transfer` are sent to the exchange; `Create` and
    /// `GetBalance` return `ExchangeError::UnsupportedOperation`. For a
    /// transfer, `to_sub` selects between the "to sub-account" and
    /// "from sub-account" endpoints.
    async fn sub_account_operation(
        &self,
        op: SubAccountOperation,
    ) -> ExchangeResult<SubAccountResult> {
        match op {
            SubAccountOperation::Create { .. } => Err(ExchangeError::UnsupportedOperation(
                "Kraken does not support sub-account creation via standard API".to_string(),
            )),
            SubAccountOperation::GetBalance { .. } => Err(ExchangeError::UnsupportedOperation(
                "Kraken does not support per-sub-account balance queries via standard API"
                    .to_string(),
            )),
            SubAccountOperation::List => {
                let reply = self
                    .post(KrakenEndpoint::SpotListSubaccounts, HashMap::new(), AccountType::Spot)
                    .await?;
                KrakenParser::parse_list_subaccounts(&reply)
            }
            SubAccountOperation::Transfer { sub_account_id, asset, amount, to_sub } => {
                let params = HashMap::from([
                    ("asset".to_string(), map_asset_to_kraken(&asset)),
                    ("amount".to_string(), amount.to_string()),
                    ("subaccount".to_string(), sub_account_id),
                ]);
                let endpoint = match to_sub {
                    true => KrakenEndpoint::SpotTransferToSubaccount,
                    false => KrakenEndpoint::SpotTransferFromSubaccount,
                };
                let reply = self.post(endpoint, params, AccountType::Spot).await?;
                KrakenParser::parse_subaccount_transfer(&reply)
            }
        }
    }
}
/// Translates a common asset ticker into the prefixed code used in Kraken
/// requests (`X…` for the listed crypto assets, `Z…` for the listed fiat ones).
///
/// Matching is case-insensitive. Tickers without a known mapping are returned
/// upper-cased and otherwise unchanged.
fn map_asset_to_kraken(asset: &str) -> String {
    let ticker = asset.to_uppercase();
    let prefixed = match ticker.as_str() {
        "BTC" | "XBT" => "XXBT",
        "ETH" => "XETH",
        "LTC" => "XLTC",
        "XRP" => "XXRP",
        "USD" => "ZUSD",
        "EUR" => "ZEUR",
        "GBP" => "ZGBP",
        "CAD" => "ZCAD",
        "JPY" => "ZJPY",
        // No mapping known: hand back the upper-cased input.
        _ => return ticker,
    };
    prefixed.to_owned()
}
impl KrakenConnector {
    /// Expands a short pair name into its prefixed form.
    ///
    /// * A symbol starting with `XBT`, `ETH` or `LTC` gets an `X` prepended.
    /// * A trailing `USD` / `EUR` that is not already `ZUSD` / `ZEUR` is
    ///   replaced by the `Z`-prefixed form.
    ///
    /// Anything else is returned unchanged, e.g. `"XBTUSD"` becomes
    /// `"XXBTZUSD"` while `"XXBTZUSD"` stays as it is.
    fn to_full_format(symbol: &str) -> String {
        const LEGACY_BASES: [&str; 3] = ["XBT", "ETH", "LTC"];
        const FIAT_QUOTES: [(&str, &str); 2] = [("USD", "ZUSD"), ("EUR", "ZEUR")];

        // A symbol that starts with one of the short bases cannot already start
        // with its `X`-prefixed form, so no extra "already prefixed" check is needed.
        let expanded = if LEGACY_BASES.iter().any(|base| symbol.starts_with(*base)) {
            format!("X{}", symbol)
        } else {
            symbol.to_owned()
        };

        for &(plain, full) in FIAT_QUOTES.iter() {
            if let Some(stem) = expanded.strip_suffix(plain) {
                if !expanded.ends_with(full) {
                    return format!("{}{}", stem, full);
                }
            }
        }
        expanded
    }
}
#[async_trait]
impl FundingHistory for KrakenConnector {
    /// Fetches funding payments by querying `SpotLedgers` with `type=rollover`.
    ///
    /// Only `filter.start_time` and `filter.end_time` are forwarded, each
    /// divided by 1000 (presumably milliseconds to seconds — confirm against
    /// `FundingFilter`). The `_account_type` argument is ignored; the request
    /// is always made for `AccountType::Spot`.
    async fn get_funding_payments(
        &self,
        filter: FundingFilter,
        _account_type: AccountType,
    ) -> ExchangeResult<Vec<FundingPayment>> {
        let mut query = HashMap::from([("type".to_string(), "rollover".to_string())]);
        // `extend` with an `Option` adds the pair only when the bound is present.
        query.extend(
            filter
                .start_time
                .map(|from| ("start".to_string(), (from / 1000).to_string())),
        );
        query.extend(
            filter
                .end_time
                .map(|until| ("end".to_string(), (until / 1000).to_string())),
        );

        let reply = self
            .post(KrakenEndpoint::SpotLedgers, query, AccountType::Spot)
            .await?;
        KrakenParser::parse_funding_payments(&reply)
    }
}
#[async_trait]
impl AccountLedger for KrakenConnector {
async fn get_ledger(
&self,
filter: LedgerFilter,
_account_type: AccountType,
) -> ExchangeResult<Vec<LedgerEntry>> {
let mut params = HashMap::new();
if let Some(asset) = &filter.asset {
params.insert("asset".to_string(), asset.clone());
}
if let Some(start) = filter.start_time {
params.insert("start".to_string(), (start / 1000).to_string());
}
if let Some(end) = filter.end_time {
params.insert("end".to_string(), (end / 1000).to_string());
}
params.insert("ofs".to_string(), "0".to_string());
let response = self
.post(KrakenEndpoint::SpotLedgers, params, AccountType::Spot)
.await?;
KrakenParser::parse_ledger(&response)
}
}