1use crate::application::auth::WebsocketInfo;
7use crate::application::interfaces::account::AccountService;
8use crate::application::interfaces::market::MarketService;
9use crate::application::interfaces::order::OrderService;
10use crate::error::AppError;
11use crate::model::http::HttpClient;
12use crate::model::requests::RecentPricesRequest;
13use crate::model::requests::{
14 ClosePositionRequest, CreateOrderRequest, CreateWorkingOrderRequest, UpdatePositionRequest,
15};
16use crate::model::responses::{
17 ClosePositionResponse, CreateOrderResponse, CreateWorkingOrderResponse, UpdatePositionResponse,
18};
19use crate::model::responses::{
20 DBEntryResponse, HistoricalPricesResponse, MarketNavigationResponse, MarketSearchResponse,
21 MultipleMarketDetailsResponse,
22};
23use crate::prelude::{
24 AccountActivityResponse, AccountsResponse, OrderConfirmationResponse, PositionsResponse,
25 TransactionHistoryResponse, WorkingOrdersResponse,
26};
27use crate::presentation::market::{MarketData, MarketDetails};
28use async_trait::async_trait;
29use serde_json::Value;
30use std::sync::Arc;
31use tracing::{debug, info};
32
33pub struct Client {
38 http_client: Arc<HttpClient>,
39}
40
41impl Client {
42 pub fn new() -> Self {
47 let http_client = Arc::new(HttpClient::default());
48 Self { http_client }
49 }
50
51 pub async fn get_ws_info(&self) -> WebsocketInfo {
56 self.http_client.get_ws_info().await
57 }
58}
59
60impl Default for Client {
61 fn default() -> Self {
62 Self::new()
63 }
64}
65
66#[async_trait]
67impl MarketService for Client {
68 async fn search_markets(&self, search_term: &str) -> Result<MarketSearchResponse, AppError> {
69 let path = format!("markets?searchTerm={}", search_term);
70 info!("Searching markets with term: {}", search_term);
71 let result: MarketSearchResponse = self.http_client.get(&path, Some(1)).await?;
72 debug!("{} markets found", result.markets.len());
73 Ok(result)
74 }
75
76 async fn get_market_details(&self, epic: &str) -> Result<MarketDetails, AppError> {
77 let path = format!("markets/{epic}");
78 info!("Getting market details: {}", epic);
79 let market_value: Value = self.http_client.get(&path, Some(3)).await?;
80 let market_details: MarketDetails = serde_json::from_value(market_value)?;
81 debug!("Market details obtained for: {}", epic);
82 Ok(market_details)
83 }
84
85 async fn get_multiple_market_details(
86 &self,
87 epics: &[String],
88 ) -> Result<MultipleMarketDetailsResponse, AppError> {
89 if epics.is_empty() {
90 return Ok(MultipleMarketDetailsResponse::default());
91 } else if epics.len() > 50 {
92 return Err(AppError::InvalidInput(
93 "The maximum number of EPICs is 50".to_string(),
94 ));
95 }
96
97 let epics_str = epics.join(",");
98 let path = format!("markets?epics={}", epics_str);
99 debug!(
100 "Getting market details for {} EPICs in a batch",
101 epics.len()
102 );
103
104 let response: MultipleMarketDetailsResponse = self.http_client.get(&path, Some(2)).await?;
105
106 Ok(response)
107 }
108
109 async fn get_historical_prices(
110 &self,
111 epic: &str,
112 resolution: &str,
113 from: &str,
114 to: &str,
115 ) -> Result<HistoricalPricesResponse, AppError> {
116 let path = format!(
117 "prices/{}?resolution={}&from={}&to={}",
118 epic, resolution, from, to
119 );
120 info!("Getting historical prices for: {}", epic);
121 let result: HistoricalPricesResponse = self.http_client.get(&path, Some(3)).await?;
122 debug!("Historical prices obtained for: {}", epic);
123 Ok(result)
124 }
125
126 async fn get_historical_prices_by_date_range(
127 &self,
128 epic: &str,
129 resolution: &str,
130 start_date: &str,
131 end_date: &str,
132 ) -> Result<HistoricalPricesResponse, AppError> {
133 let path = format!("prices/{}/{}/{}/{}", epic, resolution, start_date, end_date);
134 info!(
135 "Getting historical prices for epic: {}, resolution: {}, from: {} to: {}",
136 epic, resolution, start_date, end_date
137 );
138 let result: HistoricalPricesResponse = self.http_client.get(&path, Some(2)).await?;
139 debug!(
140 "Historical prices obtained for epic: {}, {} data points",
141 epic,
142 result.prices.len()
143 );
144 Ok(result)
145 }
146
147 async fn get_recent_prices(
148 &self,
149 params: &RecentPricesRequest<'_>,
150 ) -> Result<HistoricalPricesResponse, AppError> {
151 let mut query_params = Vec::new();
152
153 if let Some(res) = params.resolution {
154 query_params.push(format!("resolution={}", res));
155 }
156 if let Some(f) = params.from {
157 query_params.push(format!("from={}", f));
158 }
159 if let Some(t) = params.to {
160 query_params.push(format!("to={}", t));
161 }
162 if let Some(max) = params.max_points {
163 query_params.push(format!("max={}", max));
164 }
165 if let Some(size) = params.page_size {
166 query_params.push(format!("pageSize={}", size));
167 }
168 if let Some(num) = params.page_number {
169 query_params.push(format!("pageNumber={}", num));
170 }
171
172 let query_string = if query_params.is_empty() {
173 String::new()
174 } else {
175 format!("?{}", query_params.join("&"))
176 };
177
178 let path = format!("prices/{}{}", params.epic, query_string);
179 info!("Getting recent prices for epic: {}", params.epic);
180 let result: HistoricalPricesResponse = self.http_client.get(&path, Some(3)).await?;
181 debug!(
182 "Recent prices obtained for epic: {}, {} data points",
183 params.epic,
184 result.prices.len()
185 );
186 Ok(result)
187 }
188
189 async fn get_historical_prices_by_count_v1(
190 &self,
191 epic: &str,
192 resolution: &str,
193 num_points: i32,
194 ) -> Result<HistoricalPricesResponse, AppError> {
195 let path = format!("prices/{}/{}/{}", epic, resolution, num_points);
196 info!(
197 "Getting historical prices (v1) for epic: {}, resolution: {}, points: {}",
198 epic, resolution, num_points
199 );
200 let result: HistoricalPricesResponse = self.http_client.get(&path, Some(1)).await?;
201 debug!(
202 "Historical prices (v1) obtained for epic: {}, {} data points",
203 epic,
204 result.prices.len()
205 );
206 Ok(result)
207 }
208
209 async fn get_historical_prices_by_count_v2(
210 &self,
211 epic: &str,
212 resolution: &str,
213 num_points: i32,
214 ) -> Result<HistoricalPricesResponse, AppError> {
215 let path = format!("prices/{}/{}/{}", epic, resolution, num_points);
216 info!(
217 "Getting historical prices (v2) for epic: {}, resolution: {}, points: {}",
218 epic, resolution, num_points
219 );
220 let result: HistoricalPricesResponse = self.http_client.get(&path, Some(2)).await?;
221 debug!(
222 "Historical prices (v2) obtained for epic: {}, {} data points",
223 epic,
224 result.prices.len()
225 );
226 Ok(result)
227 }
228
229 async fn get_market_navigation(&self) -> Result<MarketNavigationResponse, AppError> {
230 let path = "marketnavigation";
231 info!("Getting top-level market navigation nodes");
232 let result: MarketNavigationResponse = self.http_client.get(path, Some(1)).await?;
233 debug!("{} navigation nodes found", result.nodes.len());
234 debug!("{} markets found at root level", result.markets.len());
235 Ok(result)
236 }
237
238 async fn get_market_navigation_node(
239 &self,
240 node_id: &str,
241 ) -> Result<MarketNavigationResponse, AppError> {
242 let path = format!("marketnavigation/{}", node_id);
243 info!("Getting market navigation node: {}", node_id);
244 let result: MarketNavigationResponse = self.http_client.get(&path, Some(1)).await?;
245 debug!("{} child nodes found", result.nodes.len());
246 debug!("{} markets found in node {}", result.markets.len(), node_id);
247 Ok(result)
248 }
249
250 async fn get_all_markets(&self) -> Result<Vec<MarketData>, AppError> {
251 let max_depth = 6;
252 info!(
253 "Starting comprehensive market hierarchy traversal (max {} levels)",
254 max_depth
255 );
256
257 let root_response = self.get_market_navigation().await?;
258 info!(
259 "Root navigation: {} nodes, {} markets at top level",
260 root_response.nodes.len(),
261 root_response.markets.len()
262 );
263
264 let mut all_markets = root_response.markets.clone();
265 let mut nodes_to_process = root_response.nodes.clone();
266 let mut processed_levels = 0;
267
268 while !nodes_to_process.is_empty() && processed_levels < max_depth {
269 let mut next_level_nodes = Vec::new();
270 let mut level_market_count = 0;
271
272 info!(
273 "Processing level {} with {} nodes",
274 processed_levels,
275 nodes_to_process.len()
276 );
277
278 for node in &nodes_to_process {
279 match self.get_market_navigation_node(&node.id).await {
280 Ok(node_response) => {
281 let node_markets = node_response.markets.len();
282 let node_children = node_response.nodes.len();
283
284 if node_markets > 0 || node_children > 0 {
285 debug!(
286 "Node '{}' (level {}): {} markets, {} child nodes",
287 node.name, processed_levels, node_markets, node_children
288 );
289 }
290
291 all_markets.extend(node_response.markets);
292 level_market_count += node_markets;
293 next_level_nodes.extend(node_response.nodes);
294 }
295 Err(e) => {
296 tracing::error!(
297 "Failed to get markets for node '{}' at level {}: {:?}",
298 node.name,
299 processed_levels,
300 e
301 );
302 }
303 }
304 }
305
306 info!(
307 "Level {} completed: {} markets found, {} nodes for next level",
308 processed_levels,
309 level_market_count,
310 next_level_nodes.len()
311 );
312
313 nodes_to_process = next_level_nodes;
314 processed_levels += 1;
315 }
316
317 info!(
318 "Market hierarchy traversal completed: {} total markets found across {} levels",
319 all_markets.len(),
320 processed_levels
321 );
322
323 Ok(all_markets)
324 }
325
326 async fn get_vec_db_entries(&self) -> Result<Vec<DBEntryResponse>, AppError> {
327 info!("Getting all markets from hierarchy for DB entries");
328
329 let all_markets = self.get_all_markets().await?;
330 info!("Collected {} markets from hierarchy", all_markets.len());
331
332 let mut vec_db_entries: Vec<DBEntryResponse> = all_markets
333 .iter()
334 .map(DBEntryResponse::from)
335 .filter(|entry| !entry.epic.is_empty())
336 .collect();
337
338 info!("Created {} DB entries from markets", vec_db_entries.len());
339
340 let unique_symbols: std::collections::HashSet<String> = vec_db_entries
342 .iter()
343 .map(|entry| entry.symbol.clone())
344 .filter(|symbol| !symbol.is_empty())
345 .collect();
346
347 info!(
348 "Found {} unique symbols to fetch expiry dates for",
349 unique_symbols.len()
350 );
351
352 let mut symbol_expiry_map: std::collections::HashMap<String, String> =
353 std::collections::HashMap::new();
354
355 for symbol in unique_symbols {
356 if let Some(entry) = vec_db_entries
357 .iter()
358 .find(|e| e.symbol == symbol && !e.epic.is_empty())
359 {
360 match self.get_market_details(&entry.epic).await {
361 Ok(market_details) => {
362 let expiry_date = market_details
363 .instrument
364 .expiry_details
365 .as_ref()
366 .map(|details| details.last_dealing_date.clone())
367 .unwrap_or_else(|| market_details.instrument.expiry.clone());
368
369 symbol_expiry_map.insert(symbol.clone(), expiry_date);
370 info!(
371 "Fetched expiry date for symbol {}: {}",
372 symbol,
373 symbol_expiry_map.get(&symbol).unwrap()
374 );
375 }
376 Err(e) => {
377 tracing::error!(
378 "Failed to get market details for epic {} (symbol {}): {:?}",
379 entry.epic,
380 symbol,
381 e
382 );
383 symbol_expiry_map.insert(symbol.clone(), entry.expiry.clone());
384 }
385 }
386 }
387 }
388
389 for entry in &mut vec_db_entries {
390 if let Some(expiry_date) = symbol_expiry_map.get(&entry.symbol) {
391 entry.expiry = expiry_date.clone();
392 }
393 }
394
395 info!("Updated expiry dates for {} entries", vec_db_entries.len());
396 Ok(vec_db_entries)
397 }
398}
399
400#[async_trait]
401impl AccountService for Client {
402 async fn get_accounts(&self) -> Result<AccountsResponse, AppError> {
403 info!("Getting account information");
404 let result: AccountsResponse = self.http_client.get("accounts", Some(1)).await?;
405 debug!(
406 "Account information obtained: {} accounts",
407 result.accounts.len()
408 );
409 Ok(result)
410 }
411
412 async fn get_positions(&self) -> Result<PositionsResponse, AppError> {
413 debug!("Getting open positions");
414 let result: PositionsResponse = self.http_client.get("positions", Some(2)).await?;
415 debug!("Positions obtained: {} positions", result.positions.len());
416 Ok(result)
417 }
418
419 async fn get_positions_w_filter(&self, filter: &str) -> Result<PositionsResponse, AppError> {
420 debug!("Getting open positions with filter: {}", filter);
421 let mut positions = self.get_positions().await?;
422
423 positions
424 .positions
425 .retain(|position| position.market.epic.contains(filter));
426
427 debug!(
428 "Positions obtained after filtering: {} positions",
429 positions.positions.len()
430 );
431 Ok(positions)
432 }
433
434 async fn get_working_orders(&self) -> Result<WorkingOrdersResponse, AppError> {
435 info!("Getting working orders");
436 let result: WorkingOrdersResponse = self.http_client.get("workingorders", Some(2)).await?;
437 debug!(
438 "Working orders obtained: {} orders",
439 result.working_orders.len()
440 );
441 Ok(result)
442 }
443
444 async fn get_activity(
445 &self,
446 from: &str,
447 to: &str,
448 ) -> Result<AccountActivityResponse, AppError> {
449 let path = format!("history/activity?from={}&to={}&pageSize=500", from, to);
450 info!("Getting account activity");
451 let result: AccountActivityResponse = self.http_client.get(&path, Some(3)).await?;
452 debug!(
453 "Account activity obtained: {} activities",
454 result.activities.len()
455 );
456 Ok(result)
457 }
458
459 async fn get_activity_with_details(
460 &self,
461 from: &str,
462 to: &str,
463 ) -> Result<AccountActivityResponse, AppError> {
464 let path = format!(
465 "history/activity?from={}&to={}&detailed=true&pageSize=500",
466 from, to
467 );
468 info!("Getting detailed account activity");
469 let result: AccountActivityResponse = self.http_client.get(&path, Some(3)).await?;
470 debug!(
471 "Detailed account activity obtained: {} activities",
472 result.activities.len()
473 );
474 Ok(result)
475 }
476
477 async fn get_transactions(
478 &self,
479 from: &str,
480 to: &str,
481 ) -> Result<TransactionHistoryResponse, AppError> {
482 const PAGE_SIZE: u32 = 200;
483 let mut all_transactions = Vec::new();
484 let mut current_page = 1;
485 #[allow(unused_assignments)]
486 let mut last_metadata = None;
487
488 loop {
489 let path = format!(
490 "history/transactions?from={}&to={}&pageSize={}&pageNumber={}",
491 from, to, PAGE_SIZE, current_page
492 );
493 info!("Getting transaction history page {}", current_page);
494
495 let result: TransactionHistoryResponse = self.http_client.get(&path, Some(2)).await?;
496
497 let total_pages = result.metadata.page_data.total_pages as u32;
498 last_metadata = Some(result.metadata);
499 all_transactions.extend(result.transactions);
500
501 if current_page >= total_pages {
502 break;
503 }
504 current_page += 1;
505 }
506
507 debug!(
508 "Total transaction history obtained: {} transactions",
509 all_transactions.len()
510 );
511
512 Ok(TransactionHistoryResponse {
513 transactions: all_transactions,
514 metadata: last_metadata
515 .ok_or_else(|| AppError::InvalidInput("Could not retrieve metadata".to_string()))?,
516 })
517 }
518}
519
520#[async_trait]
521impl OrderService for Client {
522 async fn create_order(
523 &self,
524 order: &CreateOrderRequest,
525 ) -> Result<CreateOrderResponse, AppError> {
526 info!("Creating order for: {}", order.epic);
527 let result: CreateOrderResponse = self
528 .http_client
529 .post("positions/otc", order, Some(2))
530 .await?;
531 debug!("Order created with reference: {}", result.deal_reference);
532 Ok(result)
533 }
534
535 async fn get_order_confirmation(
536 &self,
537 deal_reference: &str,
538 ) -> Result<OrderConfirmationResponse, AppError> {
539 let path = format!("confirms/{}", deal_reference);
540 info!("Getting confirmation for order: {}", deal_reference);
541 let result: OrderConfirmationResponse = self.http_client.get(&path, Some(1)).await?;
542 debug!("Confirmation obtained for order: {}", deal_reference);
543 Ok(result)
544 }
545
546 async fn update_position(
547 &self,
548 deal_id: &str,
549 update: &UpdatePositionRequest,
550 ) -> Result<UpdatePositionResponse, AppError> {
551 let path = format!("positions/otc/{}", deal_id);
552 info!("Updating position: {}", deal_id);
553 let result: UpdatePositionResponse = self.http_client.put(&path, update, Some(2)).await?;
554 debug!(
555 "Position updated: {} with deal reference: {}",
556 deal_id, result.deal_reference
557 );
558 Ok(result)
559 }
560
561 async fn close_position(
562 &self,
563 close_request: &ClosePositionRequest,
564 ) -> Result<ClosePositionResponse, AppError> {
565 info!("Closing position");
566
567 let result: ClosePositionResponse = self
570 .http_client
571 .post_with_delete_method("positions/otc", close_request, Some(1))
572 .await?;
573
574 debug!("Position closed with reference: {}", result.deal_reference);
575 Ok(result)
576 }
577
578 async fn create_working_order(
579 &self,
580 order: &CreateWorkingOrderRequest,
581 ) -> Result<CreateWorkingOrderResponse, AppError> {
582 info!("Creating working order for: {}", order.epic);
583 let result: CreateWorkingOrderResponse = self
584 .http_client
585 .post("workingorders/otc", order, Some(2))
586 .await?;
587 debug!(
588 "Working order created with reference: {}",
589 result.deal_reference
590 );
591 Ok(result)
592 }
593}