1use crate::broker::{
11 Account, BrokerClient, BrokerError, HealthStatus, OrderFilter, Position, PositionSide,
12};
13use crate::{OrderRequest, OrderResponse, OrderSide, OrderStatus, OrderType, Symbol, TimeInForce};
14use async_trait::async_trait;
15use chrono::{DateTime, Utc};
16use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
17use reqwest::{Client, Method, StatusCode};
18use rust_decimal::Decimal;
19use serde::{Deserialize, Serialize};
20use std::num::NonZeroU32;
21use std::time::Duration;
22use tracing::{debug, error, info};
23use uuid;
24
25#[derive(Debug, Clone)]
27pub struct OANDAConfig {
28 pub access_token: String,
30 pub account_id: String,
32 pub practice: bool,
34 pub timeout: Duration,
36}
37
38impl Default for OANDAConfig {
39 fn default() -> Self {
40 Self {
41 access_token: String::new(),
42 account_id: String::new(),
43 practice: true,
44 timeout: Duration::from_secs(30),
45 }
46 }
47}
48
49pub struct OANDABroker {
51 client: Client,
52 config: OANDAConfig,
53 base_url: String,
54 stream_url: String,
55 rate_limiter: DefaultDirectRateLimiter,
56}
57
58impl OANDABroker {
59 pub fn new(config: OANDAConfig) -> Self {
61 let (base_url, stream_url) = if config.practice {
62 (
63 "https://api-fxpractice.oanda.com".to_string(),
64 "https://stream-fxpractice.oanda.com".to_string(),
65 )
66 } else {
67 (
68 "https://api-fxtrade.oanda.com".to_string(),
69 "https://stream-fxtrade.oanda.com".to_string(),
70 )
71 };
72
73 let client = Client::builder()
74 .timeout(config.timeout)
75 .build()
76 .expect("Failed to create HTTP client");
77
78 let quota = Quota::per_second(NonZeroU32::new(100).unwrap());
80 let rate_limiter = RateLimiter::direct(quota);
81
82 Self {
83 client,
84 config,
85 base_url,
86 stream_url,
87 rate_limiter,
88 }
89 }
90
91 async fn request<T: serde::de::DeserializeOwned>(
93 &self,
94 method: Method,
95 path: &str,
96 body: Option<impl Serialize>,
97 ) -> Result<T, BrokerError> {
98 self.rate_limiter.until_ready().await;
99
100 let url = format!("{}{}", self.base_url, path);
101 let mut req = self
102 .client
103 .request(method.clone(), &url)
104 .header("Authorization", format!("Bearer {}", self.config.access_token))
105 .header("Content-Type", "application/json");
106
107 if let Some(body) = body {
108 req = req.json(&body);
109 }
110
111 debug!("OANDA API request: {} {}", method, path);
112
113 let response = req.send().await?;
114
115 match response.status() {
116 StatusCode::OK | StatusCode::CREATED => {
117 let result = response.json().await?;
118 Ok(result)
119 }
120 StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => {
121 Err(BrokerError::Auth("Invalid OANDA access token".to_string()))
122 }
123 StatusCode::TOO_MANY_REQUESTS => Err(BrokerError::RateLimit),
124 status => {
125 let error_text = response.text().await.unwrap_or_default();
126 error!("OANDA API error {}: {}", status, error_text);
127 Err(BrokerError::Other(anyhow::anyhow!("HTTP {}: {}", status, error_text)))
128 }
129 }
130 }
131
132 fn convert_order(&self, order: &OrderRequest) -> OANDAOrderRequest {
134 let order_type = match order.order_type {
135 OrderType::Market => "MARKET",
136 OrderType::Limit => "LIMIT",
137 OrderType::StopLoss => "STOP",
138 OrderType::StopLimit => "STOP",
139 };
140
141 let units = if order.side == OrderSide::Buy {
142 order.quantity.to_string()
143 } else {
144 format!("-{}", order.quantity)
145 };
146
147 OANDAOrderRequest {
148 order: OANDAOrderDetails {
149 units,
150 instrument: order.symbol.to_string(),
151 order_type: order_type.to_string(),
152 time_in_force: match order.time_in_force {
153 TimeInForce::GTC => "GTC".to_string(),
154 TimeInForce::Day => "DAY".to_string(),
155 TimeInForce::FOK => "FOK".to_string(),
156 TimeInForce::IOC => "IOC".to_string(),
157 _ => "GTC".to_string(),
158 },
159 price: order.limit_price.map(|p| p.to_string()),
160 price_bound: None,
161 trigger_condition: Some("DEFAULT".to_string()),
162 },
163 }
164 }
165}
166
167#[async_trait]
168impl BrokerClient for OANDABroker {
169 async fn get_account(&self) -> Result<Account, BrokerError> {
170 #[derive(Deserialize)]
171 struct AccountResponse {
172 account: OANDAAccount,
173 }
174
175 #[derive(Deserialize)]
176 struct OANDAAccount {
177 id: String,
178 currency: String,
179 balance: String,
180 #[serde(rename = "NAV")]
181 nav: String,
182 #[serde(rename = "marginAvailable")]
183 margin_available: String,
184 #[serde(rename = "marginUsed")]
185 margin_used: String,
186 #[serde(rename = "unrealizedPL")]
187 unrealized_pl: String,
188 }
189
190 let response: AccountResponse = self
191 .request(
192 Method::GET,
193 &format!("/v3/accounts/{}", self.config.account_id),
194 None::<()>,
195 )
196 .await?;
197
198 let balance = Decimal::from_str_exact(&response.account.balance)
199 .unwrap_or_default();
200 let nav = Decimal::from_str_exact(&response.account.nav)
201 .unwrap_or_default();
202 let margin_available = Decimal::from_str_exact(&response.account.margin_available)
203 .unwrap_or_default();
204
205 Ok(Account {
206 account_id: response.account.id,
207 cash: balance,
208 portfolio_value: nav,
209 buying_power: margin_available,
210 equity: nav,
211 last_equity: nav,
212 multiplier: "1".to_string(),
213 currency: response.account.currency,
214 shorting_enabled: true,
215 long_market_value: nav,
216 short_market_value: Decimal::ZERO,
217 initial_margin: Decimal::ZERO,
218 maintenance_margin: Decimal::from_str_exact(&response.account.margin_used)
219 .unwrap_or_default(),
220 day_trading_buying_power: margin_available,
221 daytrade_count: 0,
222 })
223 }
224
225 async fn get_positions(&self) -> Result<Vec<Position>, BrokerError> {
226 #[derive(Deserialize)]
227 struct PositionsResponse {
228 positions: Vec<OANDAPosition>,
229 }
230
231 #[derive(Deserialize)]
232 struct OANDAPosition {
233 instrument: String,
234 long: OANDAPositionSide,
235 short: OANDAPositionSide,
236 }
237
238 #[derive(Deserialize)]
239 struct OANDAPositionSide {
240 units: String,
241 #[serde(rename = "averagePrice")]
242 average_price: String,
243 #[serde(rename = "unrealizedPL")]
244 unrealized_pl: String,
245 }
246
247 let response: PositionsResponse = self
248 .request(
249 Method::GET,
250 &format!("/v3/accounts/{}/positions", self.config.account_id),
251 None::<()>,
252 )
253 .await?;
254
255 let mut positions = Vec::new();
256
257 for pos in response.positions {
258 let long_units = pos.long.units.parse::<i64>().unwrap_or(0);
259 let short_units = pos.short.units.parse::<i64>().unwrap_or(0);
260
261 if long_units != 0 {
262 let avg_price = Decimal::from_str_exact(&pos.long.average_price)
263 .unwrap_or_default();
264 let unrealized_pl = Decimal::from_str_exact(&pos.long.unrealized_pl)
265 .unwrap_or_default();
266
267 positions.push(Position {
268 symbol: Symbol::new(pos.instrument.as_str()).expect("Invalid symbol from OANDA"),
269 qty: long_units,
270 side: PositionSide::Long,
271 avg_entry_price: avg_price,
272 market_value: avg_price * Decimal::from(long_units),
273 cost_basis: avg_price * Decimal::from(long_units),
274 unrealized_pl,
275 unrealized_plpc: if avg_price != Decimal::ZERO {
276 (unrealized_pl / (avg_price * Decimal::from(long_units))) * Decimal::from(100)
277 } else {
278 Decimal::ZERO
279 },
280 current_price: avg_price,
281 lastday_price: avg_price,
282 change_today: Decimal::ZERO,
283 });
284 }
285
286 if short_units != 0 {
287 let avg_price = Decimal::from_str_exact(&pos.short.average_price)
288 .unwrap_or_default();
289 let unrealized_pl = Decimal::from_str_exact(&pos.short.unrealized_pl)
290 .unwrap_or_default();
291
292 positions.push(Position {
293 symbol: Symbol::new(pos.instrument.as_str()).expect("Invalid symbol from OANDA"),
294 qty: short_units.abs(),
295 side: PositionSide::Short,
296 avg_entry_price: avg_price,
297 market_value: avg_price * Decimal::from(short_units.abs()),
298 cost_basis: avg_price * Decimal::from(short_units.abs()),
299 unrealized_pl,
300 unrealized_plpc: if avg_price != Decimal::ZERO {
301 (unrealized_pl / (avg_price * Decimal::from(short_units.abs()))) * Decimal::from(100)
302 } else {
303 Decimal::ZERO
304 },
305 current_price: avg_price,
306 lastday_price: avg_price,
307 change_today: Decimal::ZERO,
308 });
309 }
310 }
311
312 Ok(positions)
313 }
314
315 async fn place_order(&self, order: OrderRequest) -> Result<OrderResponse, BrokerError> {
316 let oanda_order = self.convert_order(&order);
317
318 #[derive(Deserialize)]
319 struct OrderCreatedResponse {
320 #[serde(rename = "orderCreateTransaction")]
321 order_create_transaction: OrderTransaction,
322 }
323
324 #[derive(Deserialize)]
325 struct OrderTransaction {
326 id: String,
327 time: String,
328 }
329
330 let response: OrderCreatedResponse = self
331 .request(
332 Method::POST,
333 &format!("/v3/accounts/{}/orders", self.config.account_id),
334 Some(oanda_order),
335 )
336 .await?;
337
338 Ok(OrderResponse {
339 order_id: response.order_create_transaction.id,
340 client_order_id: uuid::Uuid::new_v4().to_string(),
341 status: OrderStatus::Accepted,
342 filled_qty: 0,
343 filled_avg_price: None,
344 submitted_at: Utc::now(),
345 filled_at: None,
346 })
347 }
348
349 async fn cancel_order(&self, order_id: &str) -> Result<(), BrokerError> {
350 let _: serde_json::Value = self
351 .request(
352 Method::PUT,
353 &format!("/v3/accounts/{}/orders/{}/cancel", self.config.account_id, order_id),
354 None::<()>,
355 )
356 .await?;
357
358 Ok(())
359 }
360
361 async fn get_order(&self, order_id: &str) -> Result<OrderResponse, BrokerError> {
362 Err(BrokerError::Other(anyhow::anyhow!("Not implemented")))
363 }
364
365 async fn list_orders(&self, _filter: OrderFilter) -> Result<Vec<OrderResponse>, BrokerError> {
366 Ok(Vec::new())
367 }
368
369 async fn health_check(&self) -> Result<HealthStatus, BrokerError> {
370 match self.get_account().await {
371 Ok(_) => Ok(HealthStatus::Healthy),
372 Err(_) => Ok(HealthStatus::Unhealthy),
373 }
374 }
375}
376
377#[derive(Debug, Serialize)]
378struct OANDAOrderRequest {
379 order: OANDAOrderDetails,
380}
381
382#[derive(Debug, Serialize)]
383struct OANDAOrderDetails {
384 units: String,
385 instrument: String,
386 #[serde(rename = "type")]
387 order_type: String,
388 #[serde(rename = "timeInForce")]
389 time_in_force: String,
390 #[serde(skip_serializing_if = "Option::is_none")]
391 price: Option<String>,
392 #[serde(skip_serializing_if = "Option::is_none")]
393 #[serde(rename = "priceBound")]
394 price_bound: Option<String>,
395 #[serde(skip_serializing_if = "Option::is_none")]
396 #[serde(rename = "triggerCondition")]
397 trigger_condition: Option<String>,
398}