nt_execution/
oanda_broker.rs

1// OANDA forex trading integration
2//
3// Features:
4// - Forex trading with 50+ currency pairs
5// - CFD trading on indices, commodities, metals
6// - Tick-by-tick data streaming
7// - Sub-second execution
8// - Multiple order types including OCO, trailing stops
9
10use crate::broker::{
11    Account, BrokerClient, BrokerError, HealthStatus, OrderFilter, Position, PositionSide,
12};
13use crate::{OrderRequest, OrderResponse, OrderSide, OrderStatus, OrderType, Symbol, TimeInForce};
14use async_trait::async_trait;
15use chrono::{DateTime, Utc};
16use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
17use reqwest::{Client, Method, StatusCode};
18use rust_decimal::Decimal;
19use serde::{Deserialize, Serialize};
20use std::num::NonZeroU32;
21use std::time::Duration;
22use tracing::{debug, error, info};
23use uuid;
24
/// OANDA configuration
///
/// `Debug` is implemented by hand (instead of derived) so that the access
/// token is never written to logs or panic messages when the configuration
/// is formatted with `{:?}`.
#[derive(Clone)]
pub struct OANDAConfig {
    /// API access token
    pub access_token: String,
    /// Account ID
    pub account_id: String,
    /// Practice (paper) trading mode
    pub practice: bool,
    /// Request timeout
    pub timeout: Duration,
}

impl std::fmt::Debug for OANDAConfig {
    /// Formats every field except the access token, which is replaced by a
    /// placeholder that only reveals whether a token has been set.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let token = if self.access_token.is_empty() {
            "<unset>"
        } else {
            "<redacted>"
        };

        f.debug_struct("OANDAConfig")
            .field("access_token", &token)
            .field("account_id", &self.account_id)
            .field("practice", &self.practice)
            .field("timeout", &self.timeout)
            .finish()
    }
}
37
38impl Default for OANDAConfig {
39    fn default() -> Self {
40        Self {
41            access_token: String::new(),
42            account_id: String::new(),
43            practice: true,
44            timeout: Duration::from_secs(30),
45        }
46    }
47}
48
49/// OANDA broker client for forex trading
50pub struct OANDABroker {
51    client: Client,
52    config: OANDAConfig,
53    base_url: String,
54    stream_url: String,
55    rate_limiter: DefaultDirectRateLimiter,
56}
57
58impl OANDABroker {
59    /// Create a new OANDA broker client
60    pub fn new(config: OANDAConfig) -> Self {
61        let (base_url, stream_url) = if config.practice {
62            (
63                "https://api-fxpractice.oanda.com".to_string(),
64                "https://stream-fxpractice.oanda.com".to_string(),
65            )
66        } else {
67            (
68                "https://api-fxtrade.oanda.com".to_string(),
69                "https://stream-fxtrade.oanda.com".to_string(),
70            )
71        };
72
73        let client = Client::builder()
74            .timeout(config.timeout)
75            .build()
76            .expect("Failed to create HTTP client");
77
78        // OANDA rate limit: 120 requests per second
79        let quota = Quota::per_second(NonZeroU32::new(100).unwrap());
80        let rate_limiter = RateLimiter::direct(quota);
81
82        Self {
83            client,
84            config,
85            base_url,
86            stream_url,
87            rate_limiter,
88        }
89    }
90
91    /// Make authenticated request to OANDA API
92    async fn request<T: serde::de::DeserializeOwned>(
93        &self,
94        method: Method,
95        path: &str,
96        body: Option<impl Serialize>,
97    ) -> Result<T, BrokerError> {
98        self.rate_limiter.until_ready().await;
99
100        let url = format!("{}{}", self.base_url, path);
101        let mut req = self
102            .client
103            .request(method.clone(), &url)
104            .header("Authorization", format!("Bearer {}", self.config.access_token))
105            .header("Content-Type", "application/json");
106
107        if let Some(body) = body {
108            req = req.json(&body);
109        }
110
111        debug!("OANDA API request: {} {}", method, path);
112
113        let response = req.send().await?;
114
115        match response.status() {
116            StatusCode::OK | StatusCode::CREATED => {
117                let result = response.json().await?;
118                Ok(result)
119            }
120            StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => {
121                Err(BrokerError::Auth("Invalid OANDA access token".to_string()))
122            }
123            StatusCode::TOO_MANY_REQUESTS => Err(BrokerError::RateLimit),
124            status => {
125                let error_text = response.text().await.unwrap_or_default();
126                error!("OANDA API error {}: {}", status, error_text);
127                Err(BrokerError::Other(anyhow::anyhow!("HTTP {}: {}", status, error_text)))
128            }
129        }
130    }
131
132    /// Convert internal order to OANDA order format
133    fn convert_order(&self, order: &OrderRequest) -> OANDAOrderRequest {
134        let order_type = match order.order_type {
135            OrderType::Market => "MARKET",
136            OrderType::Limit => "LIMIT",
137            OrderType::StopLoss => "STOP",
138            OrderType::StopLimit => "STOP",
139        };
140
141        let units = if order.side == OrderSide::Buy {
142            order.quantity.to_string()
143        } else {
144            format!("-{}", order.quantity)
145        };
146
147        OANDAOrderRequest {
148            order: OANDAOrderDetails {
149                units,
150                instrument: order.symbol.to_string(),
151                order_type: order_type.to_string(),
152                time_in_force: match order.time_in_force {
153                    TimeInForce::GTC => "GTC".to_string(),
154                    TimeInForce::Day => "DAY".to_string(),
155                    TimeInForce::FOK => "FOK".to_string(),
156                    TimeInForce::IOC => "IOC".to_string(),
157                    _ => "GTC".to_string(),
158                },
159                price: order.limit_price.map(|p| p.to_string()),
160                price_bound: None,
161                trigger_condition: Some("DEFAULT".to_string()),
162            },
163        }
164    }
165}
166
167#[async_trait]
168impl BrokerClient for OANDABroker {
169    async fn get_account(&self) -> Result<Account, BrokerError> {
170        #[derive(Deserialize)]
171        struct AccountResponse {
172            account: OANDAAccount,
173        }
174
175        #[derive(Deserialize)]
176        struct OANDAAccount {
177            id: String,
178            currency: String,
179            balance: String,
180            #[serde(rename = "NAV")]
181            nav: String,
182            #[serde(rename = "marginAvailable")]
183            margin_available: String,
184            #[serde(rename = "marginUsed")]
185            margin_used: String,
186            #[serde(rename = "unrealizedPL")]
187            unrealized_pl: String,
188        }
189
190        let response: AccountResponse = self
191            .request(
192                Method::GET,
193                &format!("/v3/accounts/{}", self.config.account_id),
194                None::<()>,
195            )
196            .await?;
197
198        let balance = Decimal::from_str_exact(&response.account.balance)
199            .unwrap_or_default();
200        let nav = Decimal::from_str_exact(&response.account.nav)
201            .unwrap_or_default();
202        let margin_available = Decimal::from_str_exact(&response.account.margin_available)
203            .unwrap_or_default();
204
205        Ok(Account {
206            account_id: response.account.id,
207            cash: balance,
208            portfolio_value: nav,
209            buying_power: margin_available,
210            equity: nav,
211            last_equity: nav,
212            multiplier: "1".to_string(),
213            currency: response.account.currency,
214            shorting_enabled: true,
215            long_market_value: nav,
216            short_market_value: Decimal::ZERO,
217            initial_margin: Decimal::ZERO,
218            maintenance_margin: Decimal::from_str_exact(&response.account.margin_used)
219                .unwrap_or_default(),
220            day_trading_buying_power: margin_available,
221            daytrade_count: 0,
222        })
223    }
224
225    async fn get_positions(&self) -> Result<Vec<Position>, BrokerError> {
226        #[derive(Deserialize)]
227        struct PositionsResponse {
228            positions: Vec<OANDAPosition>,
229        }
230
231        #[derive(Deserialize)]
232        struct OANDAPosition {
233            instrument: String,
234            long: OANDAPositionSide,
235            short: OANDAPositionSide,
236        }
237
238        #[derive(Deserialize)]
239        struct OANDAPositionSide {
240            units: String,
241            #[serde(rename = "averagePrice")]
242            average_price: String,
243            #[serde(rename = "unrealizedPL")]
244            unrealized_pl: String,
245        }
246
247        let response: PositionsResponse = self
248            .request(
249                Method::GET,
250                &format!("/v3/accounts/{}/positions", self.config.account_id),
251                None::<()>,
252            )
253            .await?;
254
255        let mut positions = Vec::new();
256
257        for pos in response.positions {
258            let long_units = pos.long.units.parse::<i64>().unwrap_or(0);
259            let short_units = pos.short.units.parse::<i64>().unwrap_or(0);
260
261            if long_units != 0 {
262                let avg_price = Decimal::from_str_exact(&pos.long.average_price)
263                    .unwrap_or_default();
264                let unrealized_pl = Decimal::from_str_exact(&pos.long.unrealized_pl)
265                    .unwrap_or_default();
266
267                positions.push(Position {
268                    symbol: Symbol::new(pos.instrument.as_str()).expect("Invalid symbol from OANDA"),
269                    qty: long_units,
270                    side: PositionSide::Long,
271                    avg_entry_price: avg_price,
272                    market_value: avg_price * Decimal::from(long_units),
273                    cost_basis: avg_price * Decimal::from(long_units),
274                    unrealized_pl,
275                    unrealized_plpc: if avg_price != Decimal::ZERO {
276                        (unrealized_pl / (avg_price * Decimal::from(long_units))) * Decimal::from(100)
277                    } else {
278                        Decimal::ZERO
279                    },
280                    current_price: avg_price,
281                    lastday_price: avg_price,
282                    change_today: Decimal::ZERO,
283                });
284            }
285
286            if short_units != 0 {
287                let avg_price = Decimal::from_str_exact(&pos.short.average_price)
288                    .unwrap_or_default();
289                let unrealized_pl = Decimal::from_str_exact(&pos.short.unrealized_pl)
290                    .unwrap_or_default();
291
292                positions.push(Position {
293                    symbol: Symbol::new(pos.instrument.as_str()).expect("Invalid symbol from OANDA"),
294                    qty: short_units.abs(),
295                    side: PositionSide::Short,
296                    avg_entry_price: avg_price,
297                    market_value: avg_price * Decimal::from(short_units.abs()),
298                    cost_basis: avg_price * Decimal::from(short_units.abs()),
299                    unrealized_pl,
300                    unrealized_plpc: if avg_price != Decimal::ZERO {
301                        (unrealized_pl / (avg_price * Decimal::from(short_units.abs()))) * Decimal::from(100)
302                    } else {
303                        Decimal::ZERO
304                    },
305                    current_price: avg_price,
306                    lastday_price: avg_price,
307                    change_today: Decimal::ZERO,
308                });
309            }
310        }
311
312        Ok(positions)
313    }
314
315    async fn place_order(&self, order: OrderRequest) -> Result<OrderResponse, BrokerError> {
316        let oanda_order = self.convert_order(&order);
317
318        #[derive(Deserialize)]
319        struct OrderCreatedResponse {
320            #[serde(rename = "orderCreateTransaction")]
321            order_create_transaction: OrderTransaction,
322        }
323
324        #[derive(Deserialize)]
325        struct OrderTransaction {
326            id: String,
327            time: String,
328        }
329
330        let response: OrderCreatedResponse = self
331            .request(
332                Method::POST,
333                &format!("/v3/accounts/{}/orders", self.config.account_id),
334                Some(oanda_order),
335            )
336            .await?;
337
338        Ok(OrderResponse {
339            order_id: response.order_create_transaction.id,
340            client_order_id: uuid::Uuid::new_v4().to_string(),
341            status: OrderStatus::Accepted,
342            filled_qty: 0,
343            filled_avg_price: None,
344            submitted_at: Utc::now(),
345            filled_at: None,
346        })
347    }
348
349    async fn cancel_order(&self, order_id: &str) -> Result<(), BrokerError> {
350        let _: serde_json::Value = self
351            .request(
352                Method::PUT,
353                &format!("/v3/accounts/{}/orders/{}/cancel", self.config.account_id, order_id),
354                None::<()>,
355            )
356            .await?;
357
358        Ok(())
359    }
360
361    async fn get_order(&self, order_id: &str) -> Result<OrderResponse, BrokerError> {
362        Err(BrokerError::Other(anyhow::anyhow!("Not implemented")))
363    }
364
365    async fn list_orders(&self, _filter: OrderFilter) -> Result<Vec<OrderResponse>, BrokerError> {
366        Ok(Vec::new())
367    }
368
369    async fn health_check(&self) -> Result<HealthStatus, BrokerError> {
370        match self.get_account().await {
371            Ok(_) => Ok(HealthStatus::Healthy),
372            Err(_) => Ok(HealthStatus::Unhealthy),
373        }
374    }
375}
376
377#[derive(Debug, Serialize)]
378struct OANDAOrderRequest {
379    order: OANDAOrderDetails,
380}
381
382#[derive(Debug, Serialize)]
383struct OANDAOrderDetails {
384    units: String,
385    instrument: String,
386    #[serde(rename = "type")]
387    order_type: String,
388    #[serde(rename = "timeInForce")]
389    time_in_force: String,
390    #[serde(skip_serializing_if = "Option::is_none")]
391    price: Option<String>,
392    #[serde(skip_serializing_if = "Option::is_none")]
393    #[serde(rename = "priceBound")]
394    price_bound: Option<String>,
395    #[serde(skip_serializing_if = "Option::is_none")]
396    #[serde(rename = "triggerCondition")]
397    trigger_condition: Option<String>,
398}