use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use async_trait::async_trait;
use serde_json::{json, Value};
use crate::core::{
HttpClient, Credentials,
ExchangeId, AccountType, Symbol,
ExchangeError, ExchangeResult,
Price, Kline, Ticker, OrderBook,
Order, OrderSide, OrderType, Balance, AccountInfo,
Position, FundingRate,
OrderRequest, CancelRequest, CancelScope,
BalanceQuery, PositionQuery, PositionModification,
OrderHistoryFilter, PlaceOrderResponse, FeeInfo,
CancelAllResponse, CancelAll, CustodialFunds,
DepositAddress, WithdrawResponse, FundsRecord,
};
use crate::core::types::SymbolInfo;
use crate::core::traits::{
ExchangeIdentity, MarketData, Trading, Account, Positions,
};
use crate::core::{MarketDataCapabilities, TradingCapabilities, AccountCapabilities};
use crate::core::types::ConnectorStats;
use crate::core::types::{WithdrawRequest, FundsHistoryFilter, FundsRecordType};
use crate::core::types::{UserTrade, UserTradeFilter};
use crate::core::utils::{RuntimeLimiter, RateLimitMonitor, RateLimitPressure};
use crate::core::types::{RateLimitCapabilities, LimitModel, RestLimitPool, WsLimits, EndpointWeight, OrderbookCapabilities, WsBookChannel};
use crate::core::utils::PrecisionCache;
use super::endpoints::{GeminiUrls, GeminiEndpoint, format_symbol, normalize_symbol, map_kline_interval};
use super::auth::GeminiAuth;
use super::parser::GeminiParser;
static GEMINI_POOLS: &[RestLimitPool] = &[
RestLimitPool {
name: "public",
max_budget: 120,
window_seconds: 60,
is_weight: false,
has_server_headers: false,
server_header: None,
header_reports_used: false,
},
RestLimitPool {
name: "private",
max_budget: 600,
window_seconds: 60,
is_weight: false,
has_server_headers: false,
server_header: None,
header_reports_used: false,
},
];
static GEMINI_RATE_CAPS: RateLimitCapabilities = RateLimitCapabilities {
model: LimitModel::Group,
rest_pools: GEMINI_POOLS,
decaying: None,
endpoint_weights: &[] as &[EndpointWeight],
ws: WsLimits {
max_connections: None,
max_subs_per_conn: None,
max_msg_per_sec: None,
max_streams_per_conn: None,
},
};
pub struct GeminiConnector {
http: HttpClient,
auth: Option<GeminiAuth>,
urls: GeminiUrls,
testnet: bool,
limiter: Arc<Mutex<RuntimeLimiter>>,
monitor: Arc<Mutex<RateLimitMonitor>>,
precision: PrecisionCache,
}
impl GeminiConnector {
pub async fn new(credentials: Option<Credentials>, testnet: bool) -> ExchangeResult<Self> {
let urls = if testnet {
GeminiUrls::TESTNET
} else {
GeminiUrls::MAINNET
};
let http = HttpClient::new(30_000)?;
let auth = credentials
.as_ref()
.map(GeminiAuth::new)
.transpose()?;
let limiter = Arc::new(Mutex::new(RuntimeLimiter::from_caps(&GEMINI_RATE_CAPS)));
let monitor = Arc::new(Mutex::new(RateLimitMonitor::new("Gemini")));
Ok(Self {
http,
auth,
urls,
testnet,
limiter,
monitor,
precision: PrecisionCache::new(),
})
}
pub async fn public(testnet: bool) -> ExchangeResult<Self> {
Self::new(None, testnet).await
}
async fn rate_limit_wait(&self, is_private: bool) -> bool {
let group = if is_private { "private" } else { "public" };
let essential = is_private;
loop {
let wait_time = {
let mut limiter = self.limiter.lock().expect("limiter poisoned");
let pressure = self.monitor.lock().expect("monitor poisoned").check(&mut limiter);
if pressure >= RateLimitPressure::Cutoff && !essential {
return false;
}
if limiter.try_acquire(group, 1) {
return true;
}
limiter.time_until_ready(group, 1)
};
if wait_time > Duration::ZERO {
tokio::time::sleep(wait_time).await;
}
}
}
async fn get(
&self,
endpoint: GeminiEndpoint,
path_params: &[(&str, &str)],
) -> ExchangeResult<Value> {
if !self.rate_limit_wait(endpoint.requires_auth()).await {
return Err(ExchangeError::RateLimitExceeded {
retry_after: None,
message: "Rate limit budget >= 90% used; market data request dropped".to_string(),
});
}
let base_url = self.urls.rest_url(AccountType::Spot);
let mut path = endpoint.path().to_string();
for (key, value) in path_params {
path = path.replace(&format!("{{{}}}", key), value);
}
let url = format!("{}{}", base_url, path);
let response = self.http.get(&url, &HashMap::new()).await?;
GeminiParser::check_error(&response)?;
Ok(response)
}
async fn post(
&self,
endpoint: GeminiEndpoint,
params: HashMap<String, Value>,
path_params: &[(&str, &str)],
) -> ExchangeResult<Value> {
self.rate_limit_wait(true).await;
let base_url = self.urls.rest_url(AccountType::Spot);
let mut path = endpoint.path().to_string();
for (key, value) in path_params {
path = path.replace(&format!("{{{}}}", key), value);
}
let url = format!("{}{}", base_url, path);
let auth = self.auth.as_ref()
.ok_or_else(|| ExchangeError::Auth("Authentication required".to_string()))?;
let headers = auth.sign_request(&path, params)?;
let response = self.http.post(&url, &json!({}), &headers).await?;
GeminiParser::check_error(&response)?;
Ok(response)
}
}
impl ExchangeIdentity for GeminiConnector {
fn exchange_id(&self) -> ExchangeId {
ExchangeId::Gemini
}
fn metrics(&self) -> ConnectorStats {
let (http_requests, http_errors, last_latency_ms) = self.http.stats();
let (rate_used, rate_max, rate_groups) = if let Ok(mut limiter) = self.limiter.lock() {
let (used, max) = limiter.primary_stats();
let groups = limiter.group_stats();
(used, max, groups)
} else {
(0, 0, Vec::new())
};
ConnectorStats {
http_requests,
http_errors,
last_latency_ms,
rate_used,
rate_max,
rate_groups,
ws_ping_rtt_ms: 0,
}
}
fn rate_limit_capabilities(&self) -> RateLimitCapabilities {
GEMINI_RATE_CAPS
}
fn is_testnet(&self) -> bool {
self.testnet
}
fn supported_account_types(&self) -> Vec<AccountType> {
vec![
AccountType::Spot,
AccountType::FuturesCross,
]
}
fn orderbook_capabilities(&self, _account_type: AccountType) -> OrderbookCapabilities {
static GEMINI_CHANNELS: &[WsBookChannel] = &[
WsBookChannel::snapshot("depth5", 5, 1000),
WsBookChannel::snapshot("depth10", 10, 1000),
WsBookChannel::snapshot("depth20", 20, 1000),
WsBookChannel::snapshot("depth5@100ms", 5, 100),
WsBookChannel::snapshot("depth10@100ms", 10, 100),
WsBookChannel::snapshot("depth20@100ms", 20, 100),
WsBookChannel::delta("depth", None, Some(1000)),
WsBookChannel::delta("depth@100ms", None, Some(100)),
];
OrderbookCapabilities {
ws_depths: &[5, 10, 20],
ws_default_depth: Some(20),
rest_max_depth: None,
rest_depth_values: &[],
supports_snapshot: true,
supports_delta: true,
update_speeds_ms: &[100, 1000],
default_speed_ms: Some(1000),
ws_channels: GEMINI_CHANNELS,
checksum: None,
has_sequence: true,
has_prev_sequence: false,
supports_aggregation: false,
aggregation_levels: &[],
}
}
}
#[async_trait]
impl MarketData for GeminiConnector {
async fn get_price(&self, symbol: Symbol, account_type: AccountType) -> ExchangeResult<Price> {
let symbol_str = normalize_symbol(&format_symbol(&symbol.base, &symbol.quote, account_type));
let response = self.get(
GeminiEndpoint::Ticker,
&[("symbol", &symbol_str)],
).await?;
let ticker = GeminiParser::parse_ticker(&response, &symbol_str)?;
Ok(ticker.last_price)
}
async fn get_ticker(&self, symbol: Symbol, account_type: AccountType) -> ExchangeResult<Ticker> {
let symbol_str = normalize_symbol(&format_symbol(&symbol.base, &symbol.quote, account_type));
let response = self.get(
GeminiEndpoint::TickerV2,
&[("symbol", &symbol_str)],
).await?;
GeminiParser::parse_ticker(&response, &symbol_str)
}
async fn get_orderbook(
&self,
symbol: Symbol,
_depth: Option<u16>,
account_type: AccountType,
) -> ExchangeResult<OrderBook> {
let symbol_str = normalize_symbol(&format_symbol(&symbol.base, &symbol.quote, account_type));
let response = self.get(
GeminiEndpoint::OrderBook,
&[("symbol", &symbol_str)],
).await?;
GeminiParser::parse_orderbook(&response)
}
async fn get_klines(
&self,
symbol: Symbol,
interval: &str,
_limit: Option<u16>,
account_type: AccountType,
_end_time: Option<i64>,
) -> ExchangeResult<Vec<Kline>> {
let symbol_str = normalize_symbol(&format_symbol(&symbol.base, &symbol.quote, account_type));
let time_frame = map_kline_interval(interval);
let endpoint = if matches!(account_type, AccountType::FuturesCross | AccountType::FuturesIsolated) {
GeminiEndpoint::DerivativeCandles
} else {
GeminiEndpoint::Candles
};
let response = self.get(
endpoint,
&[("symbol", &symbol_str), ("time_frame", time_frame)],
).await?;
GeminiParser::parse_candles(&response)
}
async fn ping(&self) -> ExchangeResult<()> {
self.get(GeminiEndpoint::Symbols, &[]).await?;
Ok(())
}
async fn get_exchange_info(&self, account_type: AccountType) -> ExchangeResult<Vec<SymbolInfo>> {
let symbols_response = self.get(GeminiEndpoint::Symbols, &[]).await?;
let symbols = GeminiParser::parse_symbols(&symbols_response)?;
let mut result = Vec::with_capacity(symbols.len());
for symbol_lower in &symbols {
if !symbol_lower.chars().all(|c| c.is_alphabetic()) {
continue;
}
match self.get(GeminiEndpoint::SymbolDetails, &[("symbol", symbol_lower)]).await {
Ok(details) => {
if let Some(info) = GeminiParser::parse_symbol_details(&details, symbol_lower, account_type) {
result.push(info);
}
}
Err(_) => continue, }
}
self.precision.load_from_symbols(&result);
Ok(result)
}
fn market_data_capabilities(&self, account_type: AccountType) -> MarketDataCapabilities {
let is_futures = !matches!(account_type, AccountType::Spot | AccountType::Margin);
MarketDataCapabilities {
has_ping: true,
has_price: true,
has_ticker: true,
has_orderbook: true,
has_klines: true,
has_exchange_info: !is_futures,
has_recent_trades: false,
supported_intervals: &["1m", "5m", "15m", "30m", "1h", "6h", "1d"],
max_kline_limit: None,
has_ws_ticker: true,
has_ws_trades: true,
has_ws_orderbook: true,
has_ws_klines: true,
}
}
}
#[async_trait]
impl Trading for GeminiConnector {
async fn place_order(&self, req: OrderRequest) -> ExchangeResult<PlaceOrderResponse> {
let symbol = req.symbol.clone();
let side = req.side;
let quantity = req.quantity;
let account_type = req.account_type;
let symbol_str = normalize_symbol(&format_symbol(&symbol.base, &symbol.quote, account_type));
match req.order_type {
OrderType::Market => {
let mut params = HashMap::new();
params.insert("symbol".to_string(), json!(symbol_str));
params.insert("amount".to_string(), json!(self.precision.qty(&symbol_str, quantity)));
params.insert("side".to_string(), json!(match side {
OrderSide::Buy => "buy",
OrderSide::Sell => "sell",
}));
params.insert("type".to_string(), json!("exchange market"));
let response = self.post(GeminiEndpoint::NewOrder, params, &[]).await?;
GeminiParser::parse_order(&response).map(PlaceOrderResponse::Simple)
}
OrderType::Limit { price } => {
let mut params = HashMap::new();
params.insert("symbol".to_string(), json!(symbol_str));
params.insert("amount".to_string(), json!(self.precision.qty(&symbol_str, quantity)));
params.insert("price".to_string(), json!(self.precision.price(&symbol_str, price)));
params.insert("side".to_string(), json!(match side {
OrderSide::Buy => "buy",
OrderSide::Sell => "sell",
}));
params.insert("type".to_string(), json!("exchange limit"));
let response = self.post(GeminiEndpoint::NewOrder, params, &[]).await?;
GeminiParser::parse_order(&response).map(PlaceOrderResponse::Simple)
}
OrderType::StopLimit { stop_price, limit_price } => {
let mut params = HashMap::new();
params.insert("symbol".to_string(), json!(symbol_str));
params.insert("amount".to_string(), json!(self.precision.qty(&symbol_str, quantity)));
params.insert("price".to_string(), json!(self.precision.price(&symbol_str, limit_price)));
params.insert("stop_price".to_string(), json!(self.precision.price(&symbol_str, stop_price)));
params.insert("side".to_string(), json!(match side {
OrderSide::Buy => "buy",
OrderSide::Sell => "sell",
}));
params.insert("type".to_string(), json!("exchange stop limit"));
let response = self.post(GeminiEndpoint::NewOrder, params, &[]).await?;
GeminiParser::parse_order(&response).map(PlaceOrderResponse::Simple)
}
OrderType::PostOnly { price } => {
let mut params = HashMap::new();
params.insert("symbol".to_string(), json!(symbol_str));
params.insert("amount".to_string(), json!(self.precision.qty(&symbol_str, quantity)));
params.insert("price".to_string(), json!(self.precision.price(&symbol_str, price)));
params.insert("side".to_string(), json!(match side {
OrderSide::Buy => "buy",
OrderSide::Sell => "sell",
}));
params.insert("type".to_string(), json!("exchange limit"));
params.insert("options".to_string(), json!(["maker-or-cancel"]));
let response = self.post(GeminiEndpoint::NewOrder, params, &[]).await?;
GeminiParser::parse_order(&response).map(PlaceOrderResponse::Simple)
}
OrderType::Ioc { price } => {
let limit_price = price.unwrap_or(0.0);
let mut params = HashMap::new();
params.insert("symbol".to_string(), json!(symbol_str));
params.insert("amount".to_string(), json!(self.precision.qty(&symbol_str, quantity)));
params.insert("price".to_string(), json!(self.precision.price(&symbol_str, limit_price)));
params.insert("side".to_string(), json!(match side {
OrderSide::Buy => "buy",
OrderSide::Sell => "sell",
}));
params.insert("type".to_string(), json!("exchange limit"));
params.insert("options".to_string(), json!(["immediate-or-cancel"]));
let response = self.post(GeminiEndpoint::NewOrder, params, &[]).await?;
GeminiParser::parse_order(&response).map(PlaceOrderResponse::Simple)
}
OrderType::Fok { price } => {
let mut params = HashMap::new();
params.insert("symbol".to_string(), json!(symbol_str));
params.insert("amount".to_string(), json!(self.precision.qty(&symbol_str, quantity)));
params.insert("price".to_string(), json!(self.precision.price(&symbol_str, price)));
params.insert("side".to_string(), json!(match side {
OrderSide::Buy => "buy",
OrderSide::Sell => "sell",
}));
params.insert("type".to_string(), json!("exchange limit"));
params.insert("options".to_string(), json!(["fill-or-kill"]));
let response = self.post(GeminiEndpoint::NewOrder, params, &[]).await?;
GeminiParser::parse_order(&response).map(PlaceOrderResponse::Simple)
}
_ => Err(ExchangeError::UnsupportedOperation(
format!("{:?} order type not supported on {:?}", req.order_type, self.exchange_id())
)),
}
}
async fn get_order_history(
&self,
filter: OrderHistoryFilter,
account_type: AccountType,
) -> ExchangeResult<Vec<Order>> {
let mut params = HashMap::new();
if let Some(ref symbol) = filter.symbol {
let symbol_str = normalize_symbol(&format_symbol(&symbol.base, &symbol.quote, account_type));
params.insert("symbol".to_string(), json!(symbol_str));
}
let limit = filter.limit.unwrap_or(50).min(500);
params.insert("limit_trades".to_string(), json!(limit));
if let Some(since) = filter.start_time {
params.insert("timestamp".to_string(), json!(since / 1000)); }
let response = self.post(GeminiEndpoint::PastTrades, params, &[]).await?;
GeminiParser::parse_past_trades(&response)
}
async fn cancel_order(&self, req: CancelRequest) -> ExchangeResult<Order> {
match req.scope {
CancelScope::Single { ref order_id } => {
let mut params = HashMap::new();
params.insert("order_id".to_string(), json!(order_id.parse::<i64>().unwrap_or(0)));
let response = self.post(GeminiEndpoint::CancelOrder, params, &[]).await?;
GeminiParser::parse_order(&response)
}
_ => Err(ExchangeError::UnsupportedOperation(
format!("{:?} cancel scope not supported on {:?}", req.scope, self.exchange_id())
)),
}
}
async fn get_order(
&self,
_symbol: &str,
order_id: &str,
_account_type: AccountType,
) -> ExchangeResult<Order> {
let mut params = HashMap::new();
params.insert("order_id".to_string(), json!(order_id.parse::<i64>().unwrap_or(0)));
let response = self.post(GeminiEndpoint::OrderStatus, params, &[]).await?;
GeminiParser::parse_order(&response)
}
async fn get_open_orders(
&self,
_symbol: Option<&str>,
_account_type: AccountType,
) -> ExchangeResult<Vec<Order>> {
let response = self.post(GeminiEndpoint::ActiveOrders, HashMap::new(), &[]).await?;
GeminiParser::parse_orders(&response)
}
async fn get_user_trades(
&self,
filter: UserTradeFilter,
account_type: AccountType,
) -> ExchangeResult<Vec<UserTrade>> {
let mut params = HashMap::new();
if let Some(ref symbol_str) = filter.symbol {
let sym_normalized = if symbol_str.contains('/') {
let parts: Vec<&str> = symbol_str.splitn(2, '/').collect();
normalize_symbol(&format_symbol(parts[0], parts.get(1).unwrap_or(&"USD"), account_type))
} else {
normalize_symbol(symbol_str)
};
params.insert("symbol".to_string(), json!(sym_normalized));
}
let limit = filter.limit.unwrap_or(50).min(500);
params.insert("limit_trades".to_string(), json!(limit));
if let Some(st) = filter.start_time {
params.insert("timestamp".to_string(), json!(st / 1000));
}
let response = self.post(GeminiEndpoint::PastTrades, params, &[]).await?;
GeminiParser::parse_user_trades(&response, filter.end_time)
}
fn trading_capabilities(&self, account_type: AccountType) -> TradingCapabilities {
let is_futures = !matches!(account_type, AccountType::Spot | AccountType::Margin);
TradingCapabilities {
has_market_order: true,
has_limit_order: true,
has_stop_market: false,
has_stop_limit: !is_futures,
has_trailing_stop: false,
has_bracket: false,
has_oco: false,
has_amend: false,
has_batch: false,
max_batch_size: None,
has_cancel_all: true,
has_user_trades: true,
has_order_history: true,
}
}
}
#[async_trait]
impl Account for GeminiConnector {
async fn get_balance(&self, _query: BalanceQuery) -> ExchangeResult<Vec<Balance>> {
let response = self.post(GeminiEndpoint::Balances, HashMap::new(), &[]).await?;
GeminiParser::parse_balances(&response)
}
async fn get_account_info(&self, _account_type: AccountType) -> ExchangeResult<AccountInfo> {
Ok(AccountInfo {
account_type: _account_type,
can_trade: true,
can_withdraw: true,
can_deposit: true,
maker_commission: 0.0,
taker_commission: 0.0,
balances: vec![],
})
}
async fn get_fees(&self, symbol: Option<&str>) -> ExchangeResult<FeeInfo> {
let response = self.post(GeminiEndpoint::NotionalVolume, HashMap::new(), &[]).await?;
GeminiParser::parse_notional_volume_fees(&response, symbol)
}
fn account_capabilities(&self, account_type: AccountType) -> AccountCapabilities {
let is_futures = !matches!(account_type, AccountType::Spot | AccountType::Margin);
AccountCapabilities {
has_balances: true,
has_account_info: false,
has_fees: !is_futures,
has_transfers: false,
has_sub_accounts: false,
has_deposit_withdraw: !is_futures,
has_margin: false,
has_earn_staking: false,
has_funding_history: is_futures,
has_ledger: false,
has_convert: false,
has_positions: is_futures,
}
}
}
#[async_trait]
impl Positions for GeminiConnector {
async fn get_positions(&self, _query: PositionQuery) -> ExchangeResult<Vec<Position>> {
let response = self.post(GeminiEndpoint::Positions, HashMap::new(), &[]).await?;
GeminiParser::parse_positions(&response)
}
async fn get_funding_rate(
&self,
symbol: &str,
_account_type: AccountType,
) -> ExchangeResult<FundingRate> {
let symbol_parts: Vec<&str> = symbol.split('/').collect();
let sym = if symbol_parts.len() == 2 {
crate::core::Symbol::new(symbol_parts[0], symbol_parts[1])
} else {
crate::core::Symbol { base: symbol.to_string(), quote: String::new(), raw: Some(symbol.to_string()) }
};
let symbol_str = normalize_symbol(&format_symbol(&sym.base, &sym.quote, AccountType::FuturesCross));
let response = self.get(
GeminiEndpoint::FundingAmount,
&[("symbol", &symbol_str)],
).await?;
GeminiParser::parse_funding_rate(&response)
}
async fn modify_position(&self, req: PositionModification) -> ExchangeResult<()> {
match req {
PositionModification::SetLeverage { .. } => {
Err(ExchangeError::NotSupported("Set leverage not supported by Gemini".to_string()))
}
_ => Err(ExchangeError::UnsupportedOperation(
format!("{:?} not supported on {:?}", req, self.exchange_id())
)),
}
}
}
#[async_trait]
impl CancelAll for GeminiConnector {
async fn cancel_all_orders(
&self,
_scope: CancelScope,
_account_type: AccountType,
) -> ExchangeResult<CancelAllResponse> {
let response = self.post(GeminiEndpoint::CancelAllOrders, HashMap::new(), &[]).await?;
let cancelled_count = response
.get("details")
.and_then(|d| d.get("cancelledOrders"))
.and_then(|arr| arr.as_array())
.map(|arr| arr.len() as u32)
.unwrap_or(0);
let failed_count = response
.get("details")
.and_then(|d| d.get("cancelRejects"))
.and_then(|arr| arr.as_array())
.map(|arr| arr.len() as u32)
.unwrap_or(0);
Ok(CancelAllResponse {
cancelled_count,
failed_count,
details: vec![],
})
}
}
#[async_trait]
impl CustodialFunds for GeminiConnector {
async fn get_deposit_address(
&self,
asset: &str,
network: Option<&str>,
) -> ExchangeResult<DepositAddress> {
let currency = network.unwrap_or(asset).to_lowercase();
let params = HashMap::new();
let response = self.post(
GeminiEndpoint::NewDepositAddress,
params,
&[("network", ¤cy)],
).await?;
let address = response.get("address")
.and_then(|v| v.as_str())
.ok_or_else(|| ExchangeError::Parse("Missing address in deposit address response".to_string()))?
.to_string();
let created_at = response.get("timestamp")
.and_then(|v| v.as_i64());
Ok(DepositAddress {
address,
tag: None, network: Some(currency),
asset: asset.to_string(),
created_at,
})
}
async fn withdraw(&self, req: WithdrawRequest) -> ExchangeResult<WithdrawResponse> {
let currency = req.asset.to_lowercase();
let mut params = HashMap::new();
params.insert("address".to_string(), json!(req.address));
params.insert("amount".to_string(), json!(req.amount.to_string()));
let response = self.post(
GeminiEndpoint::Withdraw,
params,
&[("currency", ¤cy)],
).await?;
let withdraw_id = response.get("withdrawalId")
.or_else(|| response.get("id"))
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let tx_hash = response.get("txHash")
.and_then(|v| v.as_str())
.filter(|s| !s.is_empty())
.map(|s| s.to_string());
Ok(WithdrawResponse {
withdraw_id,
status: "Pending".to_string(),
tx_hash,
})
}
async fn get_funds_history(
&self,
filter: FundsHistoryFilter,
) -> ExchangeResult<Vec<FundsRecord>> {
let mut params = HashMap::new();
if let Some(limit) = filter.limit {
params.insert("limit_transfers".to_string(), json!(limit.min(50u32)));
}
if let Some(start) = filter.start_time {
params.insert("timestamp".to_string(), json!(start / 1000));
}
let response = self.post(GeminiEndpoint::Transfers, params, &[]).await?;
let records = if let Some(arr) = response.as_array() {
arr.iter().filter_map(|item| {
let obj = item.as_object()?;
let transfer_type = obj.get("type")?.as_str()?;
let currency = obj.get("currency")?.as_str().unwrap_or("").to_uppercase();
if let Some(ref asset_filter) = filter.asset {
if !currency.eq_ignore_ascii_case(asset_filter) {
return None;
}
}
let id = obj.get("eid").and_then(|v| v.as_i64()).map(|v| v.to_string())
.or_else(|| obj.get("eventId").and_then(|v| v.as_str()).map(|s| s.to_string()))
.unwrap_or_default();
let amount_str = obj.get("amount").and_then(|v| v.as_str()).unwrap_or("0");
let amount = amount_str.parse::<f64>().unwrap_or(0.0);
let timestamp = obj.get("timestampms").and_then(|v| v.as_i64()).unwrap_or(0);
let status = obj.get("status").and_then(|v| v.as_str()).unwrap_or("Unknown").to_string();
let tx_hash = obj.get("txHash")
.and_then(|v| v.as_str())
.filter(|s| !s.is_empty())
.map(|s| s.to_string());
match transfer_type {
"Deposit" | "deposit" => {
if matches!(filter.record_type, FundsRecordType::Deposit | FundsRecordType::Both) {
Some(FundsRecord::Deposit {
id,
asset: currency,
amount,
tx_hash,
network: None,
status,
timestamp,
})
} else {
None
}
}
"Withdrawal" | "withdrawal" => {
if matches!(filter.record_type, FundsRecordType::Withdrawal | FundsRecordType::Both) {
let address = obj.get("destination")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let fee_str = obj.get("feeAmount").and_then(|v| v.as_str()).unwrap_or("0");
let fee = fee_str.parse::<f64>().ok().filter(|&f| f > 0.0);
Some(FundsRecord::Withdrawal {
id,
asset: currency,
amount,
fee,
address,
tag: None,
tx_hash,
network: None,
status,
timestamp,
})
} else {
None
}
}
_ => None,
}
}).collect()
} else {
vec![]
};
Ok(records)
}
}
impl GeminiConnector {
pub async fn get_symbols(&self) -> ExchangeResult<Vec<String>> {
let response = self.get(GeminiEndpoint::Symbols, &[]).await?;
GeminiParser::parse_symbols(&response)
}
pub async fn get_notional_volume(&self) -> ExchangeResult<Value> {
self.post(GeminiEndpoint::NotionalVolume, HashMap::new(), &[]).await
}
pub async fn get_funding_payments(
&self,
since: Option<i64>,
to: Option<i64>,
) -> ExchangeResult<Value> {
let mut params = HashMap::new();
if let Some(s) = since {
params.insert("since".to_string(), json!(s));
}
if let Some(t) = to {
params.insert("to".to_string(), json!(t));
}
self.post(GeminiEndpoint::FundingPayments, params, &[]).await
}
pub async fn get_margin_info(&self) -> ExchangeResult<Value> {
self.post(GeminiEndpoint::MarginAccount, HashMap::new(), &[]).await
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_connector_creation() {
let connector = GeminiConnector::public(false).await.unwrap();
assert_eq!(connector.exchange_id(), ExchangeId::Gemini);
assert!(!connector.is_testnet());
}
#[test]
fn test_format_symbol() {
let symbol = format_symbol("BTC", "USD", AccountType::Spot);
assert_eq!(symbol, "btcusd");
let symbol = format_symbol("ETH", "USD", AccountType::FuturesCross);
assert_eq!(symbol, "ethgusdperp");
}
}