use async_trait::async_trait;
use reqwest::Client;
use std::collections::HashMap;
use std::sync::Mutex;
use crate::core::types::*;
use crate::core::traits::*;
use super::endpoints::*;
use super::auth::*;
use super::parser::*;
/// Connector for the J-Quants REST API.
///
/// Holds everything a request needs: the HTTP client, the token state and
/// the base URLs that endpoint paths are appended to.
pub struct JQuantsConnector {
    /// HTTP client used for every outgoing request.
    client: Client,
    /// Refresh token and cached ID token, guarded by a mutex so the cache can
    /// be updated through a shared `&self`.
    auth: Mutex<JQuantsAuth>,
    /// Base URLs; `rest_base` is prefixed to every endpoint path.
    urls: JQuantsUrls,
}
impl JQuantsConnector {
pub fn new(refresh_token: impl Into<String>) -> Self {
Self {
client: Client::new(),
auth: Mutex::new(JQuantsAuth::new(refresh_token)),
urls: JQuantsUrls::default(),
}
}
pub fn from_env() -> Self {
Self {
client: Client::new(),
auth: Mutex::new(JQuantsAuth::from_env()),
urls: JQuantsUrls::default(),
}
}
async fn ensure_id_token(&self) -> ExchangeResult<String> {
if let Some(token) = self.auth.lock().expect("Mutex poisoned").get_cached_id_token() {
return Ok(token.to_string());
}
let refresh_token = self.auth.lock().expect("Mutex poisoned").refresh_token().to_string();
if refresh_token.is_empty() {
return Err(ExchangeError::Auth(
"Missing refresh token. Set JQUANTS_REFRESH_TOKEN env var.".to_string()
));
}
let url = format!(
"{}{}?refreshtoken={}",
self.urls.rest_base,
JQuantsEndpoint::AuthRefresh.path(),
refresh_token
);
let response = self.client
.post(&url)
.send()
.await
.map_err(|e| ExchangeError::Network(format!("Failed to get ID token: {}", e)))?;
if !response.status().is_success() {
let status = response.status();
let text = response.text().await.unwrap_or_default();
return Err(ExchangeError::Auth(format!(
"Failed to get ID token: HTTP {} - {}",
status, text
)));
}
let json: serde_json::Value = response.json().await
.map_err(|e| ExchangeError::Parse(format!("Failed to parse ID token response: {}", e)))?;
let id_token = JQuantsParser::parse_id_token(&json)?;
self.auth.lock().expect("Mutex poisoned").cache_id_token(id_token.clone());
Ok(id_token)
}
async fn get(
&self,
endpoint: JQuantsEndpoint,
params: HashMap<String, String>,
) -> ExchangeResult<serde_json::Value> {
let id_token = self.ensure_id_token().await?;
let url = format!("{}{}", self.urls.rest_base, endpoint.path());
let mut request = self.client.get(&url);
request = request.header("Authorization", format!("Bearer {}", id_token));
request = request.header("Content-Type", "application/json");
if !params.is_empty() {
request = request.query(¶ms);
}
let response = request.send().await
.map_err(|e| ExchangeError::Network(format!("Request failed: {}", e)))?;
let status = response.status();
if status.as_u16() == 401 {
self.auth.lock().expect("Mutex poisoned").clear_id_token();
return Err(ExchangeError::Auth("ID token expired, retry request".to_string()));
}
if !status.is_success() {
let text = response.text().await.unwrap_or_default();
return Err(ExchangeError::Api {
code: status.as_u16() as i32,
message: text,
});
}
response.json().await
.map_err(|e| ExchangeError::Parse(format!("JSON parse error: {}", e)))
}
}
/// Static identity information reported by the J-Quants connector.
impl ExchangeIdentity for JQuantsConnector {
    /// Always reports `ExchangeId::JQuants`.
    fn exchange_id(&self) -> ExchangeId {
        ExchangeId::JQuants
    }

    /// This connector never reports itself as a testnet connection.
    fn is_testnet(&self) -> bool {
        false
    }

    /// `AccountType::Spot` is the only account type reported.
    fn supported_account_types(&self) -> Vec<AccountType> {
        Vec::from([AccountType::Spot])
    }
}
#[async_trait]
impl MarketData for JQuantsConnector {
async fn get_price(
&self,
symbol: Symbol,
_account_type: AccountType,
) -> ExchangeResult<Price> {
let code = format_symbol(&symbol);
let mut params = HashMap::new();
params.insert("code".to_string(), code);
let response = self.get(JQuantsEndpoint::DailyQuotes, params).await?;
JQuantsParser::parse_current_price(&response)
}
async fn get_orderbook(
&self,
_symbol: Symbol,
_depth: Option<u16>,
_account_type: AccountType,
) -> ExchangeResult<OrderBook> {
Err(ExchangeError::UnsupportedOperation(
"JQuants does not provide orderbook data - it is a data-only provider with delayed data".to_string()
))
}
async fn get_klines(
&self,
symbol: Symbol,
_interval: &str, limit: Option<u16>,
_account_type: AccountType,
end_time: Option<i64>,
) -> ExchangeResult<Vec<Kline>> {
use chrono::{DateTime, Duration, NaiveDate, Utc};
let code = format_symbol(&symbol);
let mut params = HashMap::new();
params.insert("code".to_string(), code);
let end_date: NaiveDate = if let Some(end_ms) = end_time {
let dt = DateTime::from_timestamp_millis(end_ms)
.ok_or_else(|| ExchangeError::InvalidRequest(
"Invalid end_time timestamp".to_string()
))?;
dt.date_naive()
} else {
Utc::now().date_naive()
};
let days_back = limit.unwrap_or(100) as i64;
let start_date = end_date - Duration::days(days_back - 1);
let fmt = |d: NaiveDate| d.format("%Y%m%d").to_string();
params.insert("from".to_string(), fmt(start_date));
params.insert("to".to_string(), fmt(end_date));
let response = self.get(JQuantsEndpoint::DailyQuotes, params).await?;
let mut klines = JQuantsParser::parse_daily_quotes(&response)?;
if let Some(lim) = limit {
let len = klines.len();
if len > lim as usize {
klines = klines[len - lim as usize..].to_vec();
}
}
Ok(klines)
}
async fn get_ticker(
&self,
symbol: Symbol,
_account_type: AccountType,
) -> ExchangeResult<Ticker> {
let code = format_symbol(&symbol);
let mut params = HashMap::new();
params.insert("code".to_string(), code.clone());
let response = self.get(JQuantsEndpoint::DailyQuotes, params).await?;
JQuantsParser::parse_ticker(&response, &code)
}
async fn ping(&self) -> ExchangeResult<()> {
Ok(())
}
async fn get_exchange_info(&self, account_type: AccountType) -> ExchangeResult<Vec<SymbolInfo>> {
let params = HashMap::new();
let response = self.get(JQuantsEndpoint::ListedInfo, params).await?;
let symbols = JQuantsParser::parse_symbols(&response)?;
let infos = symbols.into_iter().map(|code| SymbolInfo {
symbol: code.clone(),
base_asset: code,
quote_asset: "JPY".to_string(),
status: "TRADING".to_string(),
price_precision: 0,
quantity_precision: 0,
min_quantity: Some(1.0),
max_quantity: None,
tick_size: None,
step_size: Some(1.0),
min_notional: None,
account_type,
}).collect();
Ok(infos)
}
}
/// Builds the error every `Trading` method of the J-Quants connector returns.
fn trading_unsupported<T>() -> ExchangeResult<T> {
    Err(ExchangeError::UnsupportedOperation(String::from(
        "JQuants is a data provider - trading not supported",
    )))
}

/// Every method here fails with `ExchangeError::UnsupportedOperation`;
/// the arguments are ignored.
#[async_trait]
impl Trading for JQuantsConnector {
    /// Not supported; always returns an error.
    async fn place_order(&self, _req: OrderRequest) -> ExchangeResult<PlaceOrderResponse> {
        trading_unsupported()
    }

    /// Not supported; always returns an error.
    async fn cancel_order(&self, _req: CancelRequest) -> ExchangeResult<Order> {
        trading_unsupported()
    }

    /// Not supported; always returns an error.
    async fn get_order(
        &self,
        _symbol: &str,
        _order_id: &str,
        _account_type: AccountType,
    ) -> ExchangeResult<Order> {
        trading_unsupported()
    }

    /// Not supported; always returns an error.
    async fn get_open_orders(
        &self,
        _symbol: Option<&str>,
        _account_type: AccountType,
    ) -> ExchangeResult<Vec<Order>> {
        trading_unsupported()
    }

    /// Not supported; always returns an error.
    async fn get_order_history(
        &self,
        _filter: OrderHistoryFilter,
        _account_type: AccountType,
    ) -> ExchangeResult<Vec<Order>> {
        trading_unsupported()
    }
}
/// Builds the error every `Account` method of the J-Quants connector returns.
fn account_unsupported<T>() -> ExchangeResult<T> {
    Err(ExchangeError::UnsupportedOperation(String::from(
        "JQuants is a data provider - account operations not supported",
    )))
}

/// Every method here fails with `ExchangeError::UnsupportedOperation`;
/// the arguments are ignored.
#[async_trait]
impl Account for JQuantsConnector {
    /// Not supported; always returns an error.
    async fn get_balance(&self, _query: BalanceQuery) -> ExchangeResult<Vec<Balance>> {
        account_unsupported()
    }

    /// Not supported; always returns an error.
    async fn get_account_info(&self, _account_type: AccountType) -> ExchangeResult<AccountInfo> {
        account_unsupported()
    }

    /// Not supported; always returns an error.
    async fn get_fees(&self, _symbol: Option<&str>) -> ExchangeResult<FeeInfo> {
        account_unsupported()
    }
}
/// Builds the error every `Positions` method of the J-Quants connector returns.
fn positions_unsupported<T>() -> ExchangeResult<T> {
    Err(ExchangeError::UnsupportedOperation(String::from(
        "JQuants is a data provider - position tracking not supported",
    )))
}

/// Every method here fails with `ExchangeError::UnsupportedOperation`;
/// the arguments are ignored.
#[async_trait]
impl Positions for JQuantsConnector {
    /// Not supported; always returns an error.
    async fn get_positions(&self, _query: PositionQuery) -> ExchangeResult<Vec<Position>> {
        positions_unsupported()
    }

    /// Not supported; always returns an error.
    async fn get_funding_rate(
        &self,
        _symbol: &str,
        _account_type: AccountType,
    ) -> ExchangeResult<FundingRate> {
        positions_unsupported()
    }

    /// Not supported; always returns an error.
    async fn modify_position(&self, _req: PositionModification) -> ExchangeResult<()> {
        positions_unsupported()
    }
}
impl JQuantsConnector {
    /// Fetches the listed-info endpoint (no query parameters) and returns the
    /// symbols extracted by `JQuantsParser::parse_symbols`.
    ///
    /// Request and parser errors are propagated unchanged.
    pub async fn get_symbols(&self) -> ExchangeResult<Vec<String>> {
        let listed = self
            .get(JQuantsEndpoint::ListedInfo, HashMap::new())
            .await?;
        JQuantsParser::parse_symbols(&listed)
    }
}