nt-execution 1.0.0

Order execution and broker integration for Neural Trader - supports Alpaca, Interactive Brokers, and more
Documentation
// OANDA forex trading integration
//
// Features:
// - Forex trading with 50+ currency pairs
// - CFD trading on indices, commodities, metals
// - Tick-by-tick data streaming
// - Sub-second execution
// - Multiple order types including OCO, trailing stops

use crate::broker::{
    Account, BrokerClient, BrokerError, HealthStatus, OrderFilter, Position, PositionSide,
};
use crate::{OrderRequest, OrderResponse, OrderSide, OrderStatus, OrderType, Symbol, TimeInForce};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
use reqwest::{Client, Method, StatusCode};
use rust_decimal::Decimal;
use serde::{Deserialize, Serialize};
use std::num::NonZeroU32;
use std::time::Duration;
use tracing::{debug, error, info};
use uuid;

/// OANDA configuration
#[derive(Debug, Clone)]
pub struct OANDAConfig {
    /// API access token
    pub access_token: String,
    /// Account ID
    pub account_id: String,
    /// Practice (paper) trading mode
    pub practice: bool,
    /// Request timeout
    pub timeout: Duration,
}

impl Default for OANDAConfig {
    fn default() -> Self {
        Self {
            access_token: String::new(),
            account_id: String::new(),
            practice: true,
            timeout: Duration::from_secs(30),
        }
    }
}

/// OANDA broker client for forex trading
pub struct OANDABroker {
    client: Client,
    config: OANDAConfig,
    base_url: String,
    stream_url: String,
    rate_limiter: DefaultDirectRateLimiter,
}

impl OANDABroker {
    /// Create a new OANDA broker client
    pub fn new(config: OANDAConfig) -> Self {
        let (base_url, stream_url) = if config.practice {
            (
                "https://api-fxpractice.oanda.com".to_string(),
                "https://stream-fxpractice.oanda.com".to_string(),
            )
        } else {
            (
                "https://api-fxtrade.oanda.com".to_string(),
                "https://stream-fxtrade.oanda.com".to_string(),
            )
        };

        let client = Client::builder()
            .timeout(config.timeout)
            .build()
            .expect("Failed to create HTTP client");

        // OANDA rate limit: 120 requests per second
        let quota = Quota::per_second(NonZeroU32::new(100).unwrap());
        let rate_limiter = RateLimiter::direct(quota);

        Self {
            client,
            config,
            base_url,
            stream_url,
            rate_limiter,
        }
    }

    /// Make authenticated request to OANDA API
    async fn request<T: serde::de::DeserializeOwned>(
        &self,
        method: Method,
        path: &str,
        body: Option<impl Serialize>,
    ) -> Result<T, BrokerError> {
        self.rate_limiter.until_ready().await;

        let url = format!("{}{}", self.base_url, path);
        let mut req = self
            .client
            .request(method.clone(), &url)
            .header("Authorization", format!("Bearer {}", self.config.access_token))
            .header("Content-Type", "application/json");

        if let Some(body) = body {
            req = req.json(&body);
        }

        debug!("OANDA API request: {} {}", method, path);

        let response = req.send().await?;

        match response.status() {
            StatusCode::OK | StatusCode::CREATED => {
                let result = response.json().await?;
                Ok(result)
            }
            StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => {
                Err(BrokerError::Auth("Invalid OANDA access token".to_string()))
            }
            StatusCode::TOO_MANY_REQUESTS => Err(BrokerError::RateLimit),
            status => {
                let error_text = response.text().await.unwrap_or_default();
                error!("OANDA API error {}: {}", status, error_text);
                Err(BrokerError::Other(anyhow::anyhow!("HTTP {}: {}", status, error_text)))
            }
        }
    }

    /// Convert internal order to OANDA order format
    fn convert_order(&self, order: &OrderRequest) -> OANDAOrderRequest {
        let order_type = match order.order_type {
            OrderType::Market => "MARKET",
            OrderType::Limit => "LIMIT",
            OrderType::StopLoss => "STOP",
            OrderType::StopLimit => "STOP",
        };

        let units = if order.side == OrderSide::Buy {
            order.quantity.to_string()
        } else {
            format!("-{}", order.quantity)
        };

        OANDAOrderRequest {
            order: OANDAOrderDetails {
                units,
                instrument: order.symbol.to_string(),
                order_type: order_type.to_string(),
                time_in_force: match order.time_in_force {
                    TimeInForce::GTC => "GTC".to_string(),
                    TimeInForce::Day => "DAY".to_string(),
                    TimeInForce::FOK => "FOK".to_string(),
                    TimeInForce::IOC => "IOC".to_string(),
                    _ => "GTC".to_string(),
                },
                price: order.limit_price.map(|p| p.to_string()),
                price_bound: None,
                trigger_condition: Some("DEFAULT".to_string()),
            },
        }
    }
}

#[async_trait]
impl BrokerClient for OANDABroker {
    async fn get_account(&self) -> Result<Account, BrokerError> {
        #[derive(Deserialize)]
        struct AccountResponse {
            account: OANDAAccount,
        }

        #[derive(Deserialize)]
        struct OANDAAccount {
            id: String,
            currency: String,
            balance: String,
            #[serde(rename = "NAV")]
            nav: String,
            #[serde(rename = "marginAvailable")]
            margin_available: String,
            #[serde(rename = "marginUsed")]
            margin_used: String,
            #[serde(rename = "unrealizedPL")]
            unrealized_pl: String,
        }

        let response: AccountResponse = self
            .request(
                Method::GET,
                &format!("/v3/accounts/{}", self.config.account_id),
                None::<()>,
            )
            .await?;

        let balance = Decimal::from_str_exact(&response.account.balance)
            .unwrap_or_default();
        let nav = Decimal::from_str_exact(&response.account.nav)
            .unwrap_or_default();
        let margin_available = Decimal::from_str_exact(&response.account.margin_available)
            .unwrap_or_default();

        Ok(Account {
            account_id: response.account.id,
            cash: balance,
            portfolio_value: nav,
            buying_power: margin_available,
            equity: nav,
            last_equity: nav,
            multiplier: "1".to_string(),
            currency: response.account.currency,
            shorting_enabled: true,
            long_market_value: nav,
            short_market_value: Decimal::ZERO,
            initial_margin: Decimal::ZERO,
            maintenance_margin: Decimal::from_str_exact(&response.account.margin_used)
                .unwrap_or_default(),
            day_trading_buying_power: margin_available,
            daytrade_count: 0,
        })
    }

    async fn get_positions(&self) -> Result<Vec<Position>, BrokerError> {
        #[derive(Deserialize)]
        struct PositionsResponse {
            positions: Vec<OANDAPosition>,
        }

        #[derive(Deserialize)]
        struct OANDAPosition {
            instrument: String,
            long: OANDAPositionSide,
            short: OANDAPositionSide,
        }

        #[derive(Deserialize)]
        struct OANDAPositionSide {
            units: String,
            #[serde(rename = "averagePrice")]
            average_price: String,
            #[serde(rename = "unrealizedPL")]
            unrealized_pl: String,
        }

        let response: PositionsResponse = self
            .request(
                Method::GET,
                &format!("/v3/accounts/{}/positions", self.config.account_id),
                None::<()>,
            )
            .await?;

        let mut positions = Vec::new();

        for pos in response.positions {
            let long_units = pos.long.units.parse::<i64>().unwrap_or(0);
            let short_units = pos.short.units.parse::<i64>().unwrap_or(0);

            if long_units != 0 {
                let avg_price = Decimal::from_str_exact(&pos.long.average_price)
                    .unwrap_or_default();
                let unrealized_pl = Decimal::from_str_exact(&pos.long.unrealized_pl)
                    .unwrap_or_default();

                positions.push(Position {
                    symbol: Symbol::new(pos.instrument.as_str()).expect("Invalid symbol from OANDA"),
                    qty: long_units,
                    side: PositionSide::Long,
                    avg_entry_price: avg_price,
                    market_value: avg_price * Decimal::from(long_units),
                    cost_basis: avg_price * Decimal::from(long_units),
                    unrealized_pl,
                    unrealized_plpc: if avg_price != Decimal::ZERO {
                        (unrealized_pl / (avg_price * Decimal::from(long_units))) * Decimal::from(100)
                    } else {
                        Decimal::ZERO
                    },
                    current_price: avg_price,
                    lastday_price: avg_price,
                    change_today: Decimal::ZERO,
                });
            }

            if short_units != 0 {
                let avg_price = Decimal::from_str_exact(&pos.short.average_price)
                    .unwrap_or_default();
                let unrealized_pl = Decimal::from_str_exact(&pos.short.unrealized_pl)
                    .unwrap_or_default();

                positions.push(Position {
                    symbol: Symbol::new(pos.instrument.as_str()).expect("Invalid symbol from OANDA"),
                    qty: short_units.abs(),
                    side: PositionSide::Short,
                    avg_entry_price: avg_price,
                    market_value: avg_price * Decimal::from(short_units.abs()),
                    cost_basis: avg_price * Decimal::from(short_units.abs()),
                    unrealized_pl,
                    unrealized_plpc: if avg_price != Decimal::ZERO {
                        (unrealized_pl / (avg_price * Decimal::from(short_units.abs()))) * Decimal::from(100)
                    } else {
                        Decimal::ZERO
                    },
                    current_price: avg_price,
                    lastday_price: avg_price,
                    change_today: Decimal::ZERO,
                });
            }
        }

        Ok(positions)
    }

    async fn place_order(&self, order: OrderRequest) -> Result<OrderResponse, BrokerError> {
        let oanda_order = self.convert_order(&order);

        #[derive(Deserialize)]
        struct OrderCreatedResponse {
            #[serde(rename = "orderCreateTransaction")]
            order_create_transaction: OrderTransaction,
        }

        #[derive(Deserialize)]
        struct OrderTransaction {
            id: String,
            time: String,
        }

        let response: OrderCreatedResponse = self
            .request(
                Method::POST,
                &format!("/v3/accounts/{}/orders", self.config.account_id),
                Some(oanda_order),
            )
            .await?;

        Ok(OrderResponse {
            order_id: response.order_create_transaction.id,
            client_order_id: uuid::Uuid::new_v4().to_string(),
            status: OrderStatus::Accepted,
            filled_qty: 0,
            filled_avg_price: None,
            submitted_at: Utc::now(),
            filled_at: None,
        })
    }

    async fn cancel_order(&self, order_id: &str) -> Result<(), BrokerError> {
        let _: serde_json::Value = self
            .request(
                Method::PUT,
                &format!("/v3/accounts/{}/orders/{}/cancel", self.config.account_id, order_id),
                None::<()>,
            )
            .await?;

        Ok(())
    }

    async fn get_order(&self, order_id: &str) -> Result<OrderResponse, BrokerError> {
        Err(BrokerError::Other(anyhow::anyhow!("Not implemented")))
    }

    async fn list_orders(&self, _filter: OrderFilter) -> Result<Vec<OrderResponse>, BrokerError> {
        Ok(Vec::new())
    }

    async fn health_check(&self) -> Result<HealthStatus, BrokerError> {
        match self.get_account().await {
            Ok(_) => Ok(HealthStatus::Healthy),
            Err(_) => Ok(HealthStatus::Unhealthy),
        }
    }
}

#[derive(Debug, Serialize)]
struct OANDAOrderRequest {
    order: OANDAOrderDetails,
}

#[derive(Debug, Serialize)]
struct OANDAOrderDetails {
    units: String,
    instrument: String,
    #[serde(rename = "type")]
    order_type: String,
    #[serde(rename = "timeInForce")]
    time_in_force: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    price: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    #[serde(rename = "priceBound")]
    price_bound: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    #[serde(rename = "triggerCondition")]
    trigger_condition: Option<String>,
}