use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use async_trait::async_trait;
use reqwest::header::HeaderMap;
use serde_json::{json, Value};
use crate::core::{
HttpClient, Credentials,
ExchangeId, AccountType,
ExchangeError, ExchangeResult,
Price, Kline, Ticker, OrderBook,
Order, OrderSide, OrderType, Balance, AccountInfo,
OrderRequest, CancelRequest, CancelScope,
BalanceQuery,
OrderHistoryFilter, PlaceOrderResponse, FeeInfo,
CancelAllResponse,
ExchangeIdentity, MarketData, Trading, Account,
CancelAll, AmendOrder, CustodialFunds,
AmendRequest,
DepositAddress, WithdrawResponse, FundsRecord,
UserTrade, UserTradeFilter,
MarketDataCapabilities, TradingCapabilities, AccountCapabilities,
};
use crate::core::traits::MarketDataPublic;
use crate::core::types::{PublicTrade, TradeSide};
use crate::core::types::SymbolInput;
use crate::core::types::{WithdrawRequest, FundsHistoryFilter, FundsRecordType};
use crate::core::types::StreamEvent;
use crate::core::timestamp_millis;
use crate::core::types::SymbolInfo;
use crate::core::types::ConnectorStats;
use crate::core::utils::{RuntimeLimiter, RateLimitMonitor, RateLimitPressure};
use crate::core::types::{RateLimitCapabilities, LimitModel, RestLimitPool, WsLimits, EndpointWeight, OrderbookCapabilities};
use crate::core::utils::PrecisionCache;
use super::endpoints::{UpbitUrls, UpbitEndpoint, map_kline_interval};
use super::auth::{UpbitAuth, json_to_query_string};
use super::parser::UpbitParser;
use crate::core::utils::symbol_normalizer::SymbolNormalizer;
static UPBIT_POOLS: &[RestLimitPool] = &[
RestLimitPool {
name: "market",
max_budget: 10,
window_seconds: 1,
is_weight: false,
has_server_headers: true,
server_header: Some("Remaining-Req"),
header_reports_used: false,
},
RestLimitPool {
name: "account",
max_budget: 30,
window_seconds: 1,
is_weight: false,
has_server_headers: true,
server_header: Some("Remaining-Req"),
header_reports_used: false,
},
RestLimitPool {
name: "order",
max_budget: 8,
window_seconds: 1,
is_weight: false,
has_server_headers: true,
server_header: Some("Remaining-Req"),
header_reports_used: false,
},
];
static UPBIT_RATE_CAPS: RateLimitCapabilities = RateLimitCapabilities {
model: LimitModel::Group,
rest_pools: UPBIT_POOLS,
decaying: None,
endpoint_weights: &[] as &[EndpointWeight],
ws: WsLimits {
max_connections: None,
max_subs_per_conn: None,
max_msg_per_sec: Some(5),
max_streams_per_conn: None,
},
};
pub struct UpbitConnector {
http: HttpClient,
auth: Option<UpbitAuth>,
urls: UpbitUrls,
limiter: Arc<Mutex<RuntimeLimiter>>,
monitor: Arc<Mutex<RateLimitMonitor>>,
precision: PrecisionCache,
}
impl UpbitConnector {
pub async fn new(credentials: Option<Credentials>, region: &str) -> ExchangeResult<Self> {
let urls = match region {
"kr" | "korea" => UpbitUrls::KOREA,
"sg" | "singapore" => UpbitUrls::SINGAPORE,
"id" => UpbitUrls::INDONESIA,
"th" => UpbitUrls::THAILAND,
_ => UpbitUrls::KOREA, };
let http = HttpClient::new(30_000)?;
let auth = credentials
.as_ref()
.map(UpbitAuth::new)
.transpose()?;
let limiter = Arc::new(Mutex::new(RuntimeLimiter::from_caps(&UPBIT_RATE_CAPS)));
let monitor = Arc::new(Mutex::new(RateLimitMonitor::new("Upbit")));
Ok(Self {
http,
auth,
urls,
limiter,
monitor,
precision: PrecisionCache::new(),
})
}
pub async fn public() -> ExchangeResult<Self> {
Self::new(None, "kr").await
}
fn update_rate_from_headers(&self, headers: &HeaderMap) {
let header_val = match headers
.get("Remaining-Req")
.and_then(|v| v.to_str().ok())
{
Some(s) => s.to_string(),
None => return,
};
let group_name = header_val
.split(';')
.find(|part| part.trim().starts_with("group="))
.and_then(|part| part.trim().strip_prefix("group="))
.map(|s| s.trim().to_string());
let remaining_sec = header_val
.split(';')
.find(|part| part.trim().starts_with("sec="))
.and_then(|part| part.trim().strip_prefix("sec="))
.and_then(|v| v.trim().parse::<u32>().ok());
if let (Some(group), Some(remaining)) = (group_name, remaining_sec) {
let group_max = match group.as_str() {
"market" => 10u32,
"account" => 30u32,
"order" => 8u32,
_ => return,
};
let used = group_max.saturating_sub(remaining);
if let Ok(mut limiter) = self.limiter.lock() {
limiter.update_from_server(&group, used);
}
}
}
async fn rate_limit_wait(&self, group: &str, weight: u32, essential: bool) -> bool {
loop {
let wait_time = {
let mut limiter = self.limiter.lock().expect("limiter poisoned");
let pressure = self.monitor.lock().expect("monitor poisoned").check(&mut limiter);
if pressure >= RateLimitPressure::Cutoff && !essential {
return false;
}
if limiter.try_acquire(group, weight) {
return true;
}
limiter.time_until_ready(group, weight)
};
if wait_time > Duration::ZERO {
tokio::time::sleep(wait_time).await;
}
}
}
async fn get(
&self,
endpoint: UpbitEndpoint,
params: HashMap<String, String>,
account_type: AccountType,
) -> ExchangeResult<Value> {
let group = if endpoint.requires_auth() { "account" } else { "market" };
let essential = endpoint.requires_auth();
if !self.rate_limit_wait(group, 1, essential).await {
return Err(ExchangeError::RateLimitExceeded {
retry_after: None,
message: "Rate limit budget >= 90% used; market data request dropped".to_string(),
});
}
let base_url = self.urls.rest_url(account_type);
let mut path = endpoint.path().to_string();
if endpoint == UpbitEndpoint::CandlesMinutes {
if let Some(unit) = params.get("unit") {
path = format!("{}/{}", path, unit);
}
}
let query_string = if params.is_empty() {
String::new()
} else {
let pairs: Vec<_> = params.iter()
.filter(|(k, _)| *k != "unit") .map(|(k, v)| (k.as_str(), v.as_str()))
.collect();
url::form_urlencoded::Serializer::new(String::new())
.extend_pairs(pairs)
.finish()
};
let url = if query_string.is_empty() {
format!("{}{}", base_url, path)
} else {
format!("{}{}?{}", base_url, path, query_string)
};
let headers = if endpoint.requires_auth() {
let auth = self.auth.as_ref()
.ok_or_else(|| ExchangeError::Auth("Authentication required".to_string()))?;
auth.sign_request("GET", &path, Some(&query_string))?
} else {
HashMap::new()
};
let (response, resp_headers) = self.http.get_with_response_headers(&url, &HashMap::new(), &headers).await?;
self.update_rate_from_headers(&resp_headers);
Ok(response)
}
async fn post(
&self,
endpoint: UpbitEndpoint,
body: Value,
_account_type: AccountType,
) -> ExchangeResult<Value> {
self.rate_limit_wait("order", 1, true).await;
let base_url = self.urls.rest;
let path = endpoint.path();
let url = format!("{}{}", base_url, path);
let auth = self.auth.as_ref()
.ok_or_else(|| ExchangeError::Auth("Authentication required".to_string()))?;
let body_str = body.to_string();
let query_string = json_to_query_string(&body_str)?;
let headers = auth.sign_request("POST", path, Some(&query_string))?;
let (response, resp_headers) = self.http.post_with_response_headers(&url, &body, &headers).await?;
self.update_rate_from_headers(&resp_headers);
Ok(response)
}
async fn delete(
&self,
endpoint: UpbitEndpoint,
params: HashMap<String, String>,
_account_type: AccountType,
) -> ExchangeResult<Value> {
self.rate_limit_wait("order", 1, true).await;
let base_url = self.urls.rest;
let path = endpoint.path();
let query_string = if params.is_empty() {
String::new()
} else {
let pairs: Vec<_> = params.iter()
.map(|(k, v)| (k.as_str(), v.as_str()))
.collect();
url::form_urlencoded::Serializer::new(String::new())
.extend_pairs(pairs)
.finish()
};
let url = if query_string.is_empty() {
format!("{}{}", base_url, path)
} else {
format!("{}{}?{}", base_url, path, query_string)
};
let auth = self.auth.as_ref()
.ok_or_else(|| ExchangeError::Auth("Authentication required".to_string()))?;
let headers = auth.sign_request("DELETE", path, Some(&query_string))?;
let (response, resp_headers) = self.http.delete_with_response_headers(&url, &HashMap::new(), &headers).await?;
self.update_rate_from_headers(&resp_headers);
Ok(response)
}
pub async fn get_trading_pairs(&self) -> ExchangeResult<Vec<String>> {
let response = self.get(UpbitEndpoint::TradingPairs, HashMap::new(), AccountType::Spot).await?;
if let Some(arr) = response.as_array() {
Ok(arr.iter()
.filter_map(|v| v.get("market").and_then(|m| m.as_str()).map(String::from))
.collect())
} else {
Ok(vec![])
}
}
pub async fn get_closed_orders(
&self,
market: Option<&str>,
state: Option<&str>,
start_time: Option<i64>,
end_time: Option<i64>,
limit: Option<u32>,
cursor: Option<&str>,
) -> ExchangeResult<Value> {
let mut params = HashMap::new();
if let Some(m) = market {
params.insert("market".to_string(), m.to_string());
}
params.insert(
"state".to_string(),
state.unwrap_or("done").to_string(),
);
if let Some(st) = start_time {
params.insert("start_time".to_string(), st.to_string());
}
if let Some(et) = end_time {
params.insert("end_time".to_string(), et.to_string());
}
if let Some(l) = limit {
params.insert("limit".to_string(), l.clamp(1, 1000).to_string());
}
if let Some(c) = cursor {
params.insert("cursor".to_string(), c.to_string());
}
self.get(UpbitEndpoint::ClosedOrders, params, AccountType::Spot).await
}
pub async fn get_markets_with_warnings(&self) -> ExchangeResult<Vec<StreamEvent>> {
let mut params = HashMap::new();
params.insert("isDetails".to_string(), "true".to_string());
let response = self.get(UpbitEndpoint::TradingPairs, params, AccountType::Spot).await?;
let markets = response.as_array()
.ok_or_else(|| ExchangeError::Parse("Expected array from /v1/market/all".to_string()))?;
let now_ms = timestamp_millis() as i64;
let mut events = Vec::new();
for market in markets {
let market_code = market.get("market")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
if market_code.is_empty() {
continue;
}
let market_event = match market.get("market_event") {
Some(ev) => ev,
None => continue,
};
let has_warning = market_event.get("warning")
.and_then(|v| v.as_bool())
.unwrap_or(false);
let active_cautions: Vec<&str> = market_event
.get("caution")
.and_then(|c| c.as_object())
.map(|obj| {
obj.iter()
.filter(|(_, v)| v.as_bool().unwrap_or(false))
.map(|(k, _)| k.as_str())
.collect()
})
.unwrap_or_default();
if has_warning {
events.push(StreamEvent::MarketWarning {
symbol: Some(market_code.clone()),
warning_kind: "market_warning".to_string(),
message: "Upbit market warning flag active".to_string(),
timestamp: now_ms,
});
}
for caution in active_cautions {
events.push(StreamEvent::MarketWarning {
symbol: Some(market_code.clone()),
warning_kind: caution.to_lowercase(),
message: format!("Upbit caution: {}", caution),
timestamp: now_ms,
});
}
}
Ok(events)
}
}
impl ExchangeIdentity for UpbitConnector {
fn exchange_id(&self) -> ExchangeId {
ExchangeId::Upbit
}
fn metrics(&self) -> ConnectorStats {
let (http_requests, http_errors, last_latency_ms) = self.http.stats();
let (rate_used, rate_max, rate_groups) = if let Ok(mut limiter) = self.limiter.lock() {
let (used, max) = limiter.primary_stats();
let groups = limiter.group_stats();
(used, max, groups)
} else {
(0, 0, Vec::new())
};
ConnectorStats {
http_requests,
http_errors,
last_latency_ms,
rate_used,
rate_max,
rate_groups,
ws_ping_rtt_ms: 0,
}
}
fn rate_limit_capabilities(&self) -> RateLimitCapabilities {
UPBIT_RATE_CAPS
}
fn is_testnet(&self) -> bool {
false
}
fn supported_account_types(&self) -> Vec<AccountType> {
vec![AccountType::Spot]
}
fn orderbook_capabilities(&self, _account_type: AccountType) -> OrderbookCapabilities {
OrderbookCapabilities {
ws_depths: &[1, 5, 15, 30],
ws_default_depth: Some(30),
rest_max_depth: Some(30),
rest_depth_values: &[],
supports_snapshot: true,
supports_delta: false,
update_speeds_ms: &[],
default_speed_ms: None,
ws_channels: &[],
checksum: None,
has_sequence: false,
has_prev_sequence: false,
supports_aggregation: true,
aggregation_levels: &[],
}
}
}
#[async_trait]
impl MarketData for UpbitConnector {
async fn get_price(&self, symbol: SymbolInput<'_>, account_type: AccountType) -> ExchangeResult<Price> {
let symbol = symbol.resolve(ExchangeId::Upbit, account_type)?;
let mut params = HashMap::new();
params.insert("markets".to_string(), symbol.to_string());
let response = self.get(UpbitEndpoint::Tickers, params, account_type).await?;
UpbitParser::parse_price(&response)
}
async fn get_orderbook(&self, symbol: SymbolInput<'_>, depth: Option<u16>, account_type: AccountType) -> ExchangeResult<OrderBook> {
let symbol = symbol.resolve(ExchangeId::Upbit, account_type)?;
let mut params = HashMap::new();
params.insert("markets".to_string(), symbol.to_string());
if let Some(n) = depth {
let count = n.clamp(1, 30);
params.insert("count".to_string(), count.to_string());
}
let response = self.get(UpbitEndpoint::Orderbook, params, account_type).await?;
UpbitParser::parse_orderbook(&response)
}
async fn get_klines(
&self,
symbol: SymbolInput<'_>,
interval: &str,
limit: Option<u16>,
account_type: AccountType,
end_time: Option<i64>,
) -> ExchangeResult<Vec<Kline>> {
let symbol = symbol.resolve(ExchangeId::Upbit, account_type)?;
let (endpoint, unit) = map_kline_interval(interval);
let mut params = HashMap::new();
params.insert("market".to_string(), symbol.to_string());
if let Some(u) = unit {
params.insert("unit".to_string(), u.to_string());
}
if let Some(l) = limit {
params.insert("count".to_string(), l.min(200).to_string());
}
if let Some(et) = end_time {
if let Some(dt) = chrono::DateTime::from_timestamp_millis(et) {
params.insert("to".to_string(), dt.format("%Y-%m-%dT%H:%M:%SZ").to_string());
}
}
let response = self.get(endpoint, params, account_type).await?;
UpbitParser::parse_klines(&response)
}
async fn get_ticker(&self, symbol: SymbolInput<'_>, account_type: AccountType) -> ExchangeResult<Ticker> {
let symbol = symbol.resolve(ExchangeId::Upbit, account_type)?;
let symbol_str = symbol.to_string();
let mut params = HashMap::new();
params.insert("markets".to_string(), symbol_str.clone());
let ticker_fut = self.get(UpbitEndpoint::Tickers, params.clone(), account_type);
let mut ob_params = HashMap::new();
ob_params.insert("markets".to_string(), symbol_str);
ob_params.insert("count".to_string(), "1".to_string());
let ob_fut = self.get(UpbitEndpoint::Orderbook, ob_params, account_type);
let (ticker_resp, ob_resp) = tokio::join!(ticker_fut, ob_fut);
let mut ticker = UpbitParser::parse_ticker(&ticker_resp?)?;
if let Ok(ob) = ob_resp {
if let Ok(ob_data) = UpbitParser::parse_orderbook(&ob) {
ticker.bid_price = ob_data.bids.first().map(|l| l.price);
ticker.ask_price = ob_data.asks.first().map(|l| l.price);
}
}
Ok(ticker)
}
async fn ping(&self) -> ExchangeResult<()> {
self.get(UpbitEndpoint::TradingPairs, HashMap::new(), AccountType::Spot).await?;
Ok(())
}
async fn get_exchange_info(&self, account_type: AccountType) -> ExchangeResult<Vec<SymbolInfo>> {
let response = self.get(UpbitEndpoint::TradingPairs, HashMap::new(), AccountType::Spot).await?;
let info = UpbitParser::parse_exchange_info(&response, account_type)?;
self.precision.load_from_symbols(&info);
Ok(info)
}
fn market_data_capabilities(&self, _account_type: AccountType) -> MarketDataCapabilities {
MarketDataCapabilities {
has_ping: true,
has_price: true,
has_ticker: true,
has_orderbook: true,
has_klines: true,
has_exchange_info: true,
has_recent_trades: false,
supported_intervals: &["1m", "3m", "5m", "10m", "15m", "30m", "1h", "4h", "1d", "1w", "1M"],
max_kline_limit: Some(200),
has_ws_klines: true,
has_ws_trades: true,
has_ws_orderbook: true,
has_ws_ticker: true,
}
}
}
#[async_trait]
impl Trading for UpbitConnector {
async fn place_order(&self, req: OrderRequest) -> ExchangeResult<PlaceOrderResponse> {
let symbol = req.symbol.clone();
let side = req.side;
let quantity = req.quantity;
let account_type = req.account_type;
match req.order_type {
OrderType::Market => {
let upbit_symbol = if let Some(raw) = symbol.raw() {
raw.to_string()
} else {
SymbolNormalizer::to_exchange(ExchangeId::Upbit, &symbol, account_type)
.unwrap_or_else(|_| format!("{}-{}", symbol.quote.to_uppercase(), symbol.base.to_uppercase()))
};
let (ord_type, side_str) = match side {
OrderSide::Buy => ("price", "bid"),
OrderSide::Sell => ("market", "ask"),
};
let mut body = json!({
"market": upbit_symbol,
"side": side_str,
"ord_type": ord_type,
});
match side {
OrderSide::Buy => {
body["price"] = json!(self.precision.qty(&upbit_symbol, quantity));
},
OrderSide::Sell => {
body["volume"] = json!(self.precision.qty(&upbit_symbol, quantity));
},
}
let response = self.post(UpbitEndpoint::CreateOrder, body, account_type).await?;
UpbitParser::parse_order(&response, &upbit_symbol).map(PlaceOrderResponse::Simple)
}
OrderType::Limit { price } => {
let upbit_symbol = if let Some(raw) = symbol.raw() {
raw.to_string()
} else {
SymbolNormalizer::to_exchange(ExchangeId::Upbit, &symbol, account_type)
.unwrap_or_else(|_| format!("{}-{}", symbol.quote.to_uppercase(), symbol.base.to_uppercase()))
};
let side_str = match side {
OrderSide::Buy => "bid",
OrderSide::Sell => "ask",
};
let body = json!({
"market": upbit_symbol,
"side": side_str,
"ord_type": "limit",
"volume": self.precision.qty(&upbit_symbol, quantity),
"price": self.precision.price(&upbit_symbol, price),
});
let response = self.post(UpbitEndpoint::CreateOrder, body, account_type).await?;
UpbitParser::parse_order(&response, &upbit_symbol).map(PlaceOrderResponse::Simple)
}
_ => Err(ExchangeError::UnsupportedOperation(
format!("{:?} order type not supported on {:?}", req.order_type, self.exchange_id())
)),
}
}
async fn get_order_history(
&self,
filter: OrderHistoryFilter,
account_type: AccountType,
) -> ExchangeResult<Vec<Order>> {
let mut params = HashMap::new();
params.insert("state".to_string(), "done".to_string());
if let Some(ref sym) = filter.symbol {
let upbit_symbol = if let Some(raw) = sym.raw() {
raw.to_string()
} else {
SymbolNormalizer::to_exchange(ExchangeId::Upbit, &sym, account_type)
.unwrap_or_else(|_| format!("{}-{}", sym.quote.to_uppercase(), sym.base.to_uppercase()))
};
params.insert("market".to_string(), upbit_symbol);
}
if let Some(lim) = filter.limit {
params.insert("limit".to_string(), lim.min(100).to_string());
}
let response = self.get(UpbitEndpoint::ListOrders, params, account_type).await?;
UpbitParser::parse_orders(&response)
}
async fn cancel_order(&self, req: CancelRequest) -> ExchangeResult<Order> {
match req.scope {
CancelScope::Single { ref order_id } => {
let symbol = req.symbol.as_ref()
.ok_or_else(|| ExchangeError::InvalidRequest("Symbol required for cancel".into()))?;
let account_type = req.account_type;
let upbit_symbol = if let Some(raw) = symbol.raw() {
raw.to_string()
} else {
SymbolNormalizer::to_exchange(ExchangeId::Upbit, &symbol, account_type)
.unwrap_or_else(|_| format!("{}-{}", symbol.quote.to_uppercase(), symbol.base.to_uppercase()))
};
let mut params = HashMap::new();
params.insert("uuid".to_string(), order_id.to_string());
let response = self.delete(UpbitEndpoint::CancelOrder, params, account_type).await?;
UpbitParser::parse_order(&response, &upbit_symbol)
}
_ => Err(ExchangeError::UnsupportedOperation(
format!("{:?} cancel scope not supported on {:?}", req.scope, self.exchange_id())
)),
}
}
async fn get_order(&self, symbol: &str, order_id: &str, account_type: AccountType) -> ExchangeResult<Order> {
let parts: Vec<&str> = symbol.split('/').collect();
let sym = if parts.len() == 2 {
crate::core::Symbol::new(parts[0], parts[1])
} else {
crate::core::Symbol { base: symbol.to_string(), quote: String::new(), raw: Some(symbol.to_string()) }
};
let upbit_symbol = if let Some(raw) = sym.raw() {
raw.to_string()
} else {
SymbolNormalizer::to_exchange(ExchangeId::Upbit, &sym, account_type)
.unwrap_or_else(|_| format!("{}-{}", sym.quote.to_uppercase(), sym.base.to_uppercase()))
};
let mut params = HashMap::new();
params.insert("uuid".to_string(), order_id.to_string());
let response = self.get(UpbitEndpoint::GetOrder, params, account_type).await?;
UpbitParser::parse_order(&response, &upbit_symbol)
}
async fn get_open_orders(&self, symbol: Option<&str>, account_type: AccountType) -> ExchangeResult<Vec<Order>> {
let mut params = HashMap::new();
params.insert("state".to_string(), "wait".to_string());
if let Some(s) = symbol {
let parts: Vec<&str> = s.split('/').collect();
let sym = if parts.len() == 2 {
crate::core::Symbol::new(parts[0], parts[1])
} else {
crate::core::Symbol { base: s.to_string(), quote: String::new(), raw: Some(s.to_string()) }
};
let upbit_symbol = if let Some(raw) = sym.raw() {
raw.to_string()
} else {
SymbolNormalizer::to_exchange(ExchangeId::Upbit, &sym, account_type)
.unwrap_or_else(|_| format!("{}-{}", sym.quote.to_uppercase(), sym.base.to_uppercase()))
};
params.insert("market".to_string(), upbit_symbol);
}
let response = self.get(UpbitEndpoint::ListOrders, params, account_type).await?;
UpbitParser::parse_orders(&response)
}
async fn get_user_trades(
&self,
filter: UserTradeFilter,
account_type: AccountType,
) -> ExchangeResult<Vec<UserTrade>> {
let order_id = filter.order_id.as_deref().ok_or_else(|| {
ExchangeError::UnsupportedOperation(
"Upbit requires order_id for get_user_trades (no bulk fills endpoint)".to_string(),
)
})?;
let mut params = HashMap::new();
params.insert("uuid".to_string(), order_id.to_string());
let response = self.get(UpbitEndpoint::GetOrder, params, account_type).await?;
UpbitParser::parse_order_trades(&response)
}
fn trading_capabilities(&self, _account_type: AccountType) -> TradingCapabilities {
TradingCapabilities {
has_market_order: true,
has_limit_order: true,
has_stop_market: false,
has_stop_limit: false,
has_trailing_stop: false,
has_bracket: false,
has_oco: false,
has_amend: true,
has_batch: false,
max_batch_size: None,
has_cancel_all: true,
has_user_trades: true,
has_order_history: true,
}
}
}
#[async_trait]
impl Account for UpbitConnector {
async fn get_balance(&self, query: BalanceQuery) -> ExchangeResult<Vec<Balance>> {
let asset = query.asset.clone();
let account_type = query.account_type;
let response = self.get(UpbitEndpoint::Balances, HashMap::new(), account_type).await?;
let balances = UpbitParser::parse_balances(&response)?;
if let Some(asset_name) = asset {
Ok(balances.into_iter()
.filter(|b| b.asset.eq_ignore_ascii_case(&asset_name))
.collect())
} else {
Ok(balances)
}
}
async fn get_account_info(&self, account_type: AccountType) -> ExchangeResult<AccountInfo> {
let balances = self.get_balance(BalanceQuery { asset: None, account_type }).await?;
Ok(AccountInfo {
account_type,
balances,
can_trade: true,
can_withdraw: true,
can_deposit: true,
maker_commission: 0.05, taker_commission: 0.05, })
}
async fn get_fees(&self, _symbol: Option<&str>) -> ExchangeResult<FeeInfo> {
Err(ExchangeError::UnsupportedOperation(
"Upbit does not provide a fee query API endpoint".to_string()
))
}
fn account_capabilities(&self, _account_type: AccountType) -> AccountCapabilities {
AccountCapabilities {
has_balances: true,
has_account_info: true,
has_fees: false,
has_transfers: false,
has_sub_accounts: false,
has_deposit_withdraw: true,
has_margin: false,
has_earn_staking: false,
has_funding_history: false,
has_ledger: false,
has_convert: false,
has_positions: false,
}
}
}
#[async_trait]
impl CancelAll for UpbitConnector {
async fn cancel_all_orders(
&self,
scope: CancelScope,
account_type: AccountType,
) -> ExchangeResult<CancelAllResponse> {
let mut params = HashMap::new();
match &scope {
CancelScope::All { symbol } => {
if let Some(sym) = symbol {
let upbit_symbol = if let Some(raw) = sym.raw() {
raw.to_string()
} else {
SymbolNormalizer::to_exchange(ExchangeId::Upbit, &sym, account_type)
.unwrap_or_else(|_| format!("{}-{}", sym.quote.to_uppercase(), sym.base.to_uppercase()))
};
params.insert("market".to_string(), upbit_symbol);
}
}
CancelScope::BySymbol { symbol } => {
let upbit_symbol = if let Some(raw) = symbol.raw() {
raw.to_string()
} else {
SymbolNormalizer::to_exchange(ExchangeId::Upbit, &symbol, account_type)
.unwrap_or_else(|_| format!("{}-{}", symbol.quote.to_uppercase(), symbol.base.to_uppercase()))
};
params.insert("market".to_string(), upbit_symbol);
}
_ => return Err(ExchangeError::InvalidRequest(
"cancel_all_orders requires CancelScope::All or BySymbol".to_string()
)),
}
let _response = self.delete(UpbitEndpoint::BatchCancelOrders, params, account_type).await?;
Ok(CancelAllResponse {
cancelled_count: 0, failed_count: 0,
details: vec![],
})
}
}
#[async_trait]
impl CustodialFunds for UpbitConnector {
async fn get_deposit_address(
&self,
asset: &str,
network: Option<&str>,
) -> ExchangeResult<DepositAddress> {
let mut params = HashMap::new();
params.insert("currency".to_string(), asset.to_uppercase());
if let Some(net) = network {
params.insert("net_type".to_string(), net.to_string());
}
let existing = self.get(UpbitEndpoint::ListDepositAddresses, params.clone(), AccountType::Spot).await;
if let Ok(ref response) = existing {
let addr_obj = if let Some(arr) = response.as_array() {
arr.first().cloned()
} else if response.is_object() {
Some(response.clone())
} else {
None
};
if let Some(obj) = addr_obj {
if let Some(addr) = obj.get("deposit_address").and_then(|v| v.as_str()) {
if !addr.is_empty() {
return Ok(DepositAddress {
address: addr.to_string(),
tag: obj.get("secondary_address")
.and_then(|v| v.as_str())
.filter(|s| !s.is_empty())
.map(|s| s.to_string()),
network: network.map(|n| n.to_string()),
asset: asset.to_uppercase(),
created_at: None,
});
}
}
}
}
let body = if let Some(net) = network {
serde_json::json!({
"currency": asset.to_uppercase(),
"net_type": net,
})
} else {
serde_json::json!({
"currency": asset.to_uppercase(),
})
};
let response = self.post(UpbitEndpoint::CreateDepositAddress, body, AccountType::Spot).await?;
let address = response.get("deposit_address")
.and_then(|v| v.as_str())
.unwrap_or_default();
if address.is_empty() {
return Err(ExchangeError::InvalidRequest(
"Deposit address is being generated — retry in a few seconds".to_string()
));
}
Ok(DepositAddress {
address: address.to_string(),
tag: response.get("secondary_address")
.and_then(|v| v.as_str())
.filter(|s| !s.is_empty())
.map(|s| s.to_string()),
network: network.map(|n| n.to_string()),
asset: asset.to_uppercase(),
created_at: None,
})
}
async fn withdraw(&self, req: WithdrawRequest) -> ExchangeResult<WithdrawResponse> {
let mut body = serde_json::json!({
"currency": req.asset.to_uppercase(),
"amount": req.amount.to_string(),
"address": req.address,
});
if let Some(ref net) = req.network {
body["net_type"] = serde_json::json!(net);
}
if let Some(ref tag) = req.tag {
body["secondary_address"] = serde_json::json!(tag);
}
let response = self.post(UpbitEndpoint::InitiateWithdrawal, body, AccountType::Spot).await?;
let withdraw_id = response.get("uuid")
.and_then(|v| v.as_str())
.ok_or_else(|| ExchangeError::Parse("Missing uuid in withdrawal response".to_string()))?
.to_string();
let status = response.get("state")
.and_then(|v| v.as_str())
.unwrap_or("submitted")
.to_string();
let tx_hash = response.get("txid")
.and_then(|v| v.as_str())
.filter(|s| !s.is_empty())
.map(|s| s.to_string());
Ok(WithdrawResponse {
withdraw_id,
status,
tx_hash,
})
}
async fn get_funds_history(
&self,
filter: FundsHistoryFilter,
) -> ExchangeResult<Vec<FundsRecord>> {
let mut records = Vec::new();
let fetch_deposits = matches!(
filter.record_type,
FundsRecordType::Deposit | FundsRecordType::Both
);
let fetch_withdrawals = matches!(
filter.record_type,
FundsRecordType::Withdrawal | FundsRecordType::Both
);
if fetch_deposits {
let mut params: HashMap<String, String> = HashMap::new();
if let Some(ref asset) = filter.asset {
params.insert("currency".to_string(), asset.to_uppercase());
}
if let Some(limit) = filter.limit {
params.insert("limit".to_string(), limit.min(100u32).to_string());
}
params.insert("order_by".to_string(), "desc".to_string());
let response = self.get(UpbitEndpoint::ListDeposits, params, AccountType::Spot).await?;
if let Some(arr) = response.as_array() {
for item in arr {
let id = item.get("uuid").and_then(|v| v.as_str()).unwrap_or("").to_string();
let asset_name = item.get("currency").and_then(|v| v.as_str()).unwrap_or("").to_string();
let amount = item.get("amount").and_then(|v| v.as_str())
.and_then(|s| s.parse::<f64>().ok())
.or_else(|| item.get("amount").and_then(|v| v.as_f64()))
.unwrap_or(0.0);
let tx_hash = item.get("txid").and_then(|v| v.as_str())
.filter(|s| !s.is_empty())
.map(|s| s.to_string());
let status = item.get("state").and_then(|v| v.as_str()).unwrap_or("").to_string();
let timestamp = item.get("created_at").and_then(|v| v.as_str())
.and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
.map(|dt| dt.timestamp_millis())
.unwrap_or(0);
let network = item.get("net_type").and_then(|v| v.as_str())
.filter(|s| !s.is_empty())
.map(|s| s.to_string());
records.push(FundsRecord::Deposit {
id,
asset: asset_name,
amount,
tx_hash,
network,
status,
timestamp,
});
}
}
}
if fetch_withdrawals {
let mut params: HashMap<String, String> = HashMap::new();
if let Some(ref asset) = filter.asset {
params.insert("currency".to_string(), asset.to_uppercase());
}
if let Some(limit) = filter.limit {
params.insert("limit".to_string(), limit.min(100u32).to_string());
}
params.insert("order_by".to_string(), "desc".to_string());
let response = self.get(UpbitEndpoint::ListWithdrawals, params, AccountType::Spot).await?;
if let Some(arr) = response.as_array() {
for item in arr {
let id = item.get("uuid").and_then(|v| v.as_str()).unwrap_or("").to_string();
let asset_name = item.get("currency").and_then(|v| v.as_str()).unwrap_or("").to_string();
let amount = item.get("amount").and_then(|v| v.as_str())
.and_then(|s| s.parse::<f64>().ok())
.or_else(|| item.get("amount").and_then(|v| v.as_f64()))
.unwrap_or(0.0);
let fee = item.get("fee").and_then(|v| v.as_str())
.and_then(|s| s.parse::<f64>().ok())
.or_else(|| item.get("fee").and_then(|v| v.as_f64()));
let address = item.get("address").and_then(|v| v.as_str()).unwrap_or("").to_string();
let tag = item.get("secondary_address").and_then(|v| v.as_str())
.filter(|s| !s.is_empty())
.map(|s| s.to_string());
let tx_hash = item.get("txid").and_then(|v| v.as_str())
.filter(|s| !s.is_empty())
.map(|s| s.to_string());
let status = item.get("state").and_then(|v| v.as_str()).unwrap_or("").to_string();
let timestamp = item.get("created_at").and_then(|v| v.as_str())
.and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
.map(|dt| dt.timestamp_millis())
.unwrap_or(0);
let network = item.get("net_type").and_then(|v| v.as_str())
.filter(|s| !s.is_empty())
.map(|s| s.to_string());
records.push(FundsRecord::Withdrawal {
id,
asset: asset_name,
amount,
fee,
address,
tag,
tx_hash,
network,
status,
timestamp,
});
}
}
}
Ok(records)
}
}
#[async_trait]
impl AmendOrder for UpbitConnector {
async fn amend_order(&self, req: AmendRequest) -> ExchangeResult<Order> {
let symbol = &req.symbol;
let account_type = AccountType::Spot;
let upbit_symbol = if let Some(raw) = symbol.raw() {
raw.to_string()
} else {
SymbolNormalizer::to_exchange(ExchangeId::Upbit, &symbol, account_type)
.unwrap_or_else(|_| format!("{}-{}", symbol.quote.to_uppercase(), symbol.base.to_uppercase()))
};
let mut body = serde_json::json!({
"cancel_uuid": req.order_id,
"market": upbit_symbol,
});
if req.fields.price.is_none() && req.fields.quantity.is_none() {
return Err(ExchangeError::InvalidRequest(
"AmendOrder requires at least one of: price, quantity".to_string(),
));
}
if let Some(new_price) = req.fields.price {
body["price"] = serde_json::json!(self.precision.price(&upbit_symbol, new_price));
}
if let Some(new_qty) = req.fields.quantity {
body["volume"] = serde_json::json!(self.precision.qty(&upbit_symbol, new_qty));
}
if body.get("price").is_some() {
body["ord_type"] = serde_json::json!("limit");
}
let response = self.post(UpbitEndpoint::ReplaceOrder, body, account_type).await?;
let new_order = response.get("new_order").unwrap_or(&response);
UpbitParser::parse_order(new_order, &upbit_symbol)
}
}
impl UpbitConnector {
pub async fn get_order_chance(&self, market: &str) -> ExchangeResult<Value> {
let mut params = std::collections::HashMap::new();
params.insert("market".to_string(), market.to_string());
self.get(UpbitEndpoint::OrderChance, params, AccountType::Spot).await
}
pub async fn list_open_orders_paginated(
&self,
market: Option<&str>,
page: Option<u32>,
limit: Option<u32>,
) -> ExchangeResult<Value> {
let mut params = std::collections::HashMap::new();
if let Some(m) = market {
params.insert("market".to_string(), m.to_string());
}
if let Some(p) = page {
params.insert("page".to_string(), p.to_string());
}
if let Some(l) = limit {
params.insert("limit".to_string(), l.to_string());
}
self.get(UpbitEndpoint::OpenOrders, params, AccountType::Spot).await
}
pub async fn get_wallet_status(&self, currency: Option<&str>) -> ExchangeResult<Value> {
let mut params = std::collections::HashMap::new();
if let Some(c) = currency {
params.insert("currency".to_string(), c.to_string());
}
self.get(UpbitEndpoint::WalletStatus, params, AccountType::Spot).await
}
pub async fn withdraw_krw(
&self,
amount: f64,
two_factor_type: &str,
) -> ExchangeResult<Value> {
let body = json!({
"amount": amount.to_string(),
"two_factor_type": two_factor_type,
});
self.post(UpbitEndpoint::WithdrawKrw, body, AccountType::Spot).await
}
pub async fn cancel_withdraw(&self, uuid: &str) -> ExchangeResult<Value> {
let mut params = std::collections::HashMap::new();
params.insert("uuid".to_string(), uuid.to_string());
self.delete(UpbitEndpoint::CancelWithdraw, params, AccountType::Spot).await
}
}
#[async_trait::async_trait]
impl crate::core::traits::Positions for UpbitConnector {
async fn get_positions(
&self,
_query: crate::core::types::PositionQuery,
) -> crate::core::types::ExchangeResult<Vec<crate::core::types::Position>> {
Err(crate::core::types::ExchangeError::UnsupportedOperation(
"Upbit is spot-only: no positions".into(),
))
}
async fn get_funding_rate(
&self,
_symbol: &str,
_account_type: crate::core::types::AccountType,
) -> crate::core::types::ExchangeResult<crate::core::types::FundingRate> {
Err(crate::core::types::ExchangeError::UnsupportedOperation(
"Upbit is spot-only: no funding rate".into(),
))
}
async fn modify_position(
&self,
_req: crate::core::types::PositionModification,
) -> crate::core::types::ExchangeResult<()> {
Err(crate::core::types::ExchangeError::UnsupportedOperation(
"Upbit is spot-only: no position modification".into(),
))
}
}
#[async_trait]
impl MarketDataPublic for UpbitConnector {
async fn get_recent_trades(
&self,
symbol: SymbolInput<'_>,
limit: Option<u32>,
account_type: AccountType,
) -> ExchangeResult<Vec<PublicTrade>> {
let sym = symbol.resolve(ExchangeId::Upbit, account_type)?;
let mut params = HashMap::new();
params.insert("market".to_string(), sym.to_string());
if let Some(l) = limit {
params.insert("count".to_string(), l.to_string());
}
let raw = self.get(UpbitEndpoint::RecentTrades, params, account_type).await?;
let arr = raw.as_array().ok_or_else(|| {
ExchangeError::Parse("get_recent_trades: expected array".into())
})?;
let mut result = Vec::with_capacity(arr.len());
for item in arr {
let parse_f64 = |key: &str| -> f64 {
item.get(key).and_then(|v| v.as_f64()).unwrap_or(0.0)
};
let ask_bid = item.get("ask_bid").and_then(|v| v.as_str()).unwrap_or("BID");
let side = if ask_bid.eq_ignore_ascii_case("ASK") { TradeSide::Sell } else { TradeSide::Buy };
let date_str = item.get("trade_date_utc").and_then(|v| v.as_str()).unwrap_or("1970-01-01");
let time_str = item.get("trade_time_utc").and_then(|v| v.as_str()).unwrap_or("00:00:00");
let dt_str = format!("{} {}", date_str, time_str);
let timestamp = {
use chrono::{NaiveDateTime, TimeZone, Utc};
NaiveDateTime::parse_from_str(&dt_str, "%Y-%m-%d %H:%M:%S")
.ok()
.map(|ndt| Utc.from_utc_datetime(&ndt).timestamp_millis())
.unwrap_or(0)
};
let seq = item.get("sequential_id").and_then(|v| v.as_i64()).map(|id| id.to_string()).unwrap_or_default();
result.push(PublicTrade {
id: seq,
price: parse_f64("trade_price"),
quantity: parse_f64("trade_volume"),
side,
timestamp,
});
}
Ok(result)
}
}
impl crate::core::traits::HasCapabilities for UpbitConnector {
fn capabilities(&self) -> crate::core::types::ConnectorCapabilities {
crate::core::types::ConnectorCapabilities {
has_ticker: true, has_orderbook: true, has_klines: true,
has_recent_trades: true, has_exchange_info: true,
has_liquidation_history: false, has_open_interest_history: false,
has_premium_index: false, has_long_short_ratio_history: false,
has_funding_rate_history: false, has_mark_price_klines: false,
has_index_price_klines: false,
has_market_order: true, has_limit_order: true,
has_open_orders: true, has_order_history: true, has_user_trades: true,
has_positions: false, has_mark_price: false, has_modify_position: false,
has_closed_pnl: false, has_long_short_ratio: false,
has_cancel_all: true, has_amend_order: true,
has_batch_place: false, has_batch_cancel: false,
max_batch_place_size: 0, max_batch_cancel_size: 0,
has_balance: true, has_account_info: true, has_fees: true,
has_transfers: false, has_deposit_withdraw: true, has_sub_accounts: false,
has_funding_payments: false, has_ledger: false,
has_websocket: true, has_ws_klines: false, has_ws_trades: true,
has_ws_orderbook: true, has_ws_ticker: true,
has_ws_mark_price: false, has_ws_funding_rate: false,
validation: self.validation_status(),
}
}
fn validation_status(&self) -> Option<&'static crate::core::types::ValidationStamp> {
crate::core::utils::validation_snapshot::validation_for(crate::core::types::ExchangeId::Upbit)
}
}