use crate::broker::{
Account, BrokerClient, BrokerError, HealthStatus, OrderFilter, Position, PositionSide,
};
use crate::{OrderRequest, OrderResponse, OrderSide, OrderStatus, OrderType, Symbol, TimeInForce};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
use reqwest::{Client, Method, StatusCode};
use rust_decimal::Decimal;
use serde::{Deserialize, Serialize};
use std::num::NonZeroU32;
use std::time::Duration;
use tracing::{debug, error, info};
use uuid;
#[derive(Debug, Clone)]
pub struct OANDAConfig {
pub access_token: String,
pub account_id: String,
pub practice: bool,
pub timeout: Duration,
}
impl Default for OANDAConfig {
fn default() -> Self {
Self {
access_token: String::new(),
account_id: String::new(),
practice: true,
timeout: Duration::from_secs(30),
}
}
}
pub struct OANDABroker {
client: Client,
config: OANDAConfig,
base_url: String,
stream_url: String,
rate_limiter: DefaultDirectRateLimiter,
}
impl OANDABroker {
pub fn new(config: OANDAConfig) -> Self {
let (base_url, stream_url) = if config.practice {
(
"https://api-fxpractice.oanda.com".to_string(),
"https://stream-fxpractice.oanda.com".to_string(),
)
} else {
(
"https://api-fxtrade.oanda.com".to_string(),
"https://stream-fxtrade.oanda.com".to_string(),
)
};
let client = Client::builder()
.timeout(config.timeout)
.build()
.expect("Failed to create HTTP client");
let quota = Quota::per_second(NonZeroU32::new(100).unwrap());
let rate_limiter = RateLimiter::direct(quota);
Self {
client,
config,
base_url,
stream_url,
rate_limiter,
}
}
async fn request<T: serde::de::DeserializeOwned>(
&self,
method: Method,
path: &str,
body: Option<impl Serialize>,
) -> Result<T, BrokerError> {
self.rate_limiter.until_ready().await;
let url = format!("{}{}", self.base_url, path);
let mut req = self
.client
.request(method.clone(), &url)
.header("Authorization", format!("Bearer {}", self.config.access_token))
.header("Content-Type", "application/json");
if let Some(body) = body {
req = req.json(&body);
}
debug!("OANDA API request: {} {}", method, path);
let response = req.send().await?;
match response.status() {
StatusCode::OK | StatusCode::CREATED => {
let result = response.json().await?;
Ok(result)
}
StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => {
Err(BrokerError::Auth("Invalid OANDA access token".to_string()))
}
StatusCode::TOO_MANY_REQUESTS => Err(BrokerError::RateLimit),
status => {
let error_text = response.text().await.unwrap_or_default();
error!("OANDA API error {}: {}", status, error_text);
Err(BrokerError::Other(anyhow::anyhow!("HTTP {}: {}", status, error_text)))
}
}
}
fn convert_order(&self, order: &OrderRequest) -> OANDAOrderRequest {
let order_type = match order.order_type {
OrderType::Market => "MARKET",
OrderType::Limit => "LIMIT",
OrderType::StopLoss => "STOP",
OrderType::StopLimit => "STOP",
};
let units = if order.side == OrderSide::Buy {
order.quantity.to_string()
} else {
format!("-{}", order.quantity)
};
OANDAOrderRequest {
order: OANDAOrderDetails {
units,
instrument: order.symbol.to_string(),
order_type: order_type.to_string(),
time_in_force: match order.time_in_force {
TimeInForce::GTC => "GTC".to_string(),
TimeInForce::Day => "DAY".to_string(),
TimeInForce::FOK => "FOK".to_string(),
TimeInForce::IOC => "IOC".to_string(),
_ => "GTC".to_string(),
},
price: order.limit_price.map(|p| p.to_string()),
price_bound: None,
trigger_condition: Some("DEFAULT".to_string()),
},
}
}
}
#[async_trait]
impl BrokerClient for OANDABroker {
async fn get_account(&self) -> Result<Account, BrokerError> {
#[derive(Deserialize)]
struct AccountResponse {
account: OANDAAccount,
}
#[derive(Deserialize)]
struct OANDAAccount {
id: String,
currency: String,
balance: String,
#[serde(rename = "NAV")]
nav: String,
#[serde(rename = "marginAvailable")]
margin_available: String,
#[serde(rename = "marginUsed")]
margin_used: String,
#[serde(rename = "unrealizedPL")]
unrealized_pl: String,
}
let response: AccountResponse = self
.request(
Method::GET,
&format!("/v3/accounts/{}", self.config.account_id),
None::<()>,
)
.await?;
let balance = Decimal::from_str_exact(&response.account.balance)
.unwrap_or_default();
let nav = Decimal::from_str_exact(&response.account.nav)
.unwrap_or_default();
let margin_available = Decimal::from_str_exact(&response.account.margin_available)
.unwrap_or_default();
Ok(Account {
account_id: response.account.id,
cash: balance,
portfolio_value: nav,
buying_power: margin_available,
equity: nav,
last_equity: nav,
multiplier: "1".to_string(),
currency: response.account.currency,
shorting_enabled: true,
long_market_value: nav,
short_market_value: Decimal::ZERO,
initial_margin: Decimal::ZERO,
maintenance_margin: Decimal::from_str_exact(&response.account.margin_used)
.unwrap_or_default(),
day_trading_buying_power: margin_available,
daytrade_count: 0,
})
}
async fn get_positions(&self) -> Result<Vec<Position>, BrokerError> {
#[derive(Deserialize)]
struct PositionsResponse {
positions: Vec<OANDAPosition>,
}
#[derive(Deserialize)]
struct OANDAPosition {
instrument: String,
long: OANDAPositionSide,
short: OANDAPositionSide,
}
#[derive(Deserialize)]
struct OANDAPositionSide {
units: String,
#[serde(rename = "averagePrice")]
average_price: String,
#[serde(rename = "unrealizedPL")]
unrealized_pl: String,
}
let response: PositionsResponse = self
.request(
Method::GET,
&format!("/v3/accounts/{}/positions", self.config.account_id),
None::<()>,
)
.await?;
let mut positions = Vec::new();
for pos in response.positions {
let long_units = pos.long.units.parse::<i64>().unwrap_or(0);
let short_units = pos.short.units.parse::<i64>().unwrap_or(0);
if long_units != 0 {
let avg_price = Decimal::from_str_exact(&pos.long.average_price)
.unwrap_or_default();
let unrealized_pl = Decimal::from_str_exact(&pos.long.unrealized_pl)
.unwrap_or_default();
positions.push(Position {
symbol: Symbol::new(pos.instrument.as_str()).expect("Invalid symbol from OANDA"),
qty: long_units,
side: PositionSide::Long,
avg_entry_price: avg_price,
market_value: avg_price * Decimal::from(long_units),
cost_basis: avg_price * Decimal::from(long_units),
unrealized_pl,
unrealized_plpc: if avg_price != Decimal::ZERO {
(unrealized_pl / (avg_price * Decimal::from(long_units))) * Decimal::from(100)
} else {
Decimal::ZERO
},
current_price: avg_price,
lastday_price: avg_price,
change_today: Decimal::ZERO,
});
}
if short_units != 0 {
let avg_price = Decimal::from_str_exact(&pos.short.average_price)
.unwrap_or_default();
let unrealized_pl = Decimal::from_str_exact(&pos.short.unrealized_pl)
.unwrap_or_default();
positions.push(Position {
symbol: Symbol::new(pos.instrument.as_str()).expect("Invalid symbol from OANDA"),
qty: short_units.abs(),
side: PositionSide::Short,
avg_entry_price: avg_price,
market_value: avg_price * Decimal::from(short_units.abs()),
cost_basis: avg_price * Decimal::from(short_units.abs()),
unrealized_pl,
unrealized_plpc: if avg_price != Decimal::ZERO {
(unrealized_pl / (avg_price * Decimal::from(short_units.abs()))) * Decimal::from(100)
} else {
Decimal::ZERO
},
current_price: avg_price,
lastday_price: avg_price,
change_today: Decimal::ZERO,
});
}
}
Ok(positions)
}
async fn place_order(&self, order: OrderRequest) -> Result<OrderResponse, BrokerError> {
let oanda_order = self.convert_order(&order);
#[derive(Deserialize)]
struct OrderCreatedResponse {
#[serde(rename = "orderCreateTransaction")]
order_create_transaction: OrderTransaction,
}
#[derive(Deserialize)]
struct OrderTransaction {
id: String,
time: String,
}
let response: OrderCreatedResponse = self
.request(
Method::POST,
&format!("/v3/accounts/{}/orders", self.config.account_id),
Some(oanda_order),
)
.await?;
Ok(OrderResponse {
order_id: response.order_create_transaction.id,
client_order_id: uuid::Uuid::new_v4().to_string(),
status: OrderStatus::Accepted,
filled_qty: 0,
filled_avg_price: None,
submitted_at: Utc::now(),
filled_at: None,
})
}
async fn cancel_order(&self, order_id: &str) -> Result<(), BrokerError> {
let _: serde_json::Value = self
.request(
Method::PUT,
&format!("/v3/accounts/{}/orders/{}/cancel", self.config.account_id, order_id),
None::<()>,
)
.await?;
Ok(())
}
async fn get_order(&self, order_id: &str) -> Result<OrderResponse, BrokerError> {
Err(BrokerError::Other(anyhow::anyhow!("Not implemented")))
}
async fn list_orders(&self, _filter: OrderFilter) -> Result<Vec<OrderResponse>, BrokerError> {
Ok(Vec::new())
}
async fn health_check(&self) -> Result<HealthStatus, BrokerError> {
match self.get_account().await {
Ok(_) => Ok(HealthStatus::Healthy),
Err(_) => Ok(HealthStatus::Unhealthy),
}
}
}
#[derive(Debug, Serialize)]
struct OANDAOrderRequest {
order: OANDAOrderDetails,
}
#[derive(Debug, Serialize)]
struct OANDAOrderDetails {
units: String,
instrument: String,
#[serde(rename = "type")]
order_type: String,
#[serde(rename = "timeInForce")]
time_in_force: String,
#[serde(skip_serializing_if = "Option::is_none")]
price: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(rename = "priceBound")]
price_bound: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(rename = "triggerCondition")]
trigger_condition: Option<String>,
}