Skip to main content

ig_client/application/
client.rs

1/******************************************************************************
2   Author: Joaquín Béjar García
3   Email: jb@taunais.com
4   Date: 19/10/25
5******************************************************************************/
6use crate::application::auth::WebsocketInfo;
7use crate::application::interfaces::account::AccountService;
8use crate::application::interfaces::market::MarketService;
9use crate::application::interfaces::order::OrderService;
10use crate::error::AppError;
11use crate::model::http::HttpClient;
12use crate::model::requests::RecentPricesRequest;
13use crate::model::requests::{
14    ClosePositionRequest, CreateOrderRequest, CreateWorkingOrderRequest, UpdatePositionRequest,
15};
16use crate::model::responses::{
17    CategoriesResponse, CategoryInstrumentsResponse, DBEntryResponse, HistoricalPricesResponse,
18    MarketNavigationResponse, MarketSearchResponse, MultipleMarketDetailsResponse,
19};
20use crate::model::responses::{
21    ClosePositionResponse, CreateOrderResponse, CreateWorkingOrderResponse, UpdatePositionResponse,
22};
23use crate::model::streaming::{
24    StreamingAccountDataField, StreamingChartField, StreamingMarketField, StreamingPriceField,
25    get_streaming_account_data_fields, get_streaming_chart_fields, get_streaming_market_fields,
26    get_streaming_price_fields,
27};
28use crate::prelude::{
29    AccountActivityResponse, AccountFields, AccountsResponse, ChartData, ChartScale,
30    OrderConfirmationResponse, PositionsResponse, TradeFields, TransactionHistoryResponse,
31    WorkingOrdersResponse,
32};
33use crate::presentation::market::{MarketData, MarketDetails};
34use crate::presentation::price::PriceData;
35use async_trait::async_trait;
36use lightstreamer_rs::client::{LightstreamerClient, LogType, Transport};
37use lightstreamer_rs::subscription::{
38    ChannelSubscriptionListener, Snapshot, Subscription, SubscriptionMode,
39};
40use lightstreamer_rs::utils::setup_signal_hook;
41use serde_json::Value;
42use std::collections::HashSet;
43use std::sync::Arc;
44use std::time::Duration;
45use tokio::sync::{Mutex, Notify, RwLock, mpsc};
46use tokio::time::sleep;
47use tracing::{debug, error, info, warn};
48
/// Upper bound on connection attempts.
///
/// NOTE(review): the code that reads this constant is outside the visible part
/// of the file — presumably the streamer connect/retry logic; confirm.
const MAX_CONNECTION_ATTEMPTS: u64 = 3;
50
/// Main client for interacting with IG Markets API
///
/// This client provides a unified interface for all IG Markets API operations,
/// including market data, account management, and order execution.
///
/// All service trait implementations on this type delegate their requests to
/// the shared [`HttpClient`] stored in `http_client`.
pub struct Client {
    /// Reference-counted HTTP client used by every request this type issues.
    http_client: Arc<HttpClient>,
}
58
59impl Client {
60    /// Creates a new client instance
61    ///
62    /// # Returns
63    /// A new Client with default configuration
64    pub fn new() -> Self {
65        let http_client = Arc::new(HttpClient::default());
66        Self { http_client }
67    }
68
69    /// Gets WebSocket connection information for Lightstreamer
70    ///
71    /// # Returns
72    /// * `WebsocketInfo` containing server endpoint, authentication tokens, and account ID
73    pub async fn get_ws_info(&self) -> WebsocketInfo {
74        self.http_client.get_ws_info().await
75    }
76}
77
78impl Default for Client {
79    fn default() -> Self {
80        Self::new()
81    }
82}
83
84#[async_trait]
85impl MarketService for Client {
86    async fn search_markets(&self, search_term: &str) -> Result<MarketSearchResponse, AppError> {
87        let path = format!("markets?searchTerm={}", search_term);
88        info!("Searching markets with term: {}", search_term);
89        let result: MarketSearchResponse = self.http_client.get(&path, Some(1)).await?;
90        debug!("{} markets found", result.markets.len());
91        Ok(result)
92    }
93
94    async fn get_market_details(&self, epic: &str) -> Result<MarketDetails, AppError> {
95        let path = format!("markets/{epic}");
96        info!("Getting market details: {}", epic);
97        let market_value: Value = self.http_client.get(&path, Some(3)).await?;
98        let market_details: MarketDetails = serde_json::from_value(market_value)?;
99        debug!("Market details obtained for: {}", epic);
100        Ok(market_details)
101    }
102
103    async fn get_multiple_market_details(
104        &self,
105        epics: &[String],
106    ) -> Result<MultipleMarketDetailsResponse, AppError> {
107        if epics.is_empty() {
108            return Ok(MultipleMarketDetailsResponse::default());
109        } else if epics.len() > 50 {
110            return Err(AppError::InvalidInput(
111                "The maximum number of EPICs is 50".to_string(),
112            ));
113        }
114
115        let epics_str = epics.join(",");
116        let path = format!("markets?epics={}", epics_str);
117        debug!(
118            "Getting market details for {} EPICs in a batch",
119            epics.len()
120        );
121
122        let response: MultipleMarketDetailsResponse = self.http_client.get(&path, Some(2)).await?;
123
124        Ok(response)
125    }
126
127    async fn get_historical_prices(
128        &self,
129        epic: &str,
130        resolution: &str,
131        from: &str,
132        to: &str,
133    ) -> Result<HistoricalPricesResponse, AppError> {
134        let path = format!(
135            "prices/{}?resolution={}&from={}&to={}",
136            epic, resolution, from, to
137        );
138        info!("Getting historical prices for: {}", epic);
139        let result: HistoricalPricesResponse = self.http_client.get(&path, Some(3)).await?;
140        debug!("Historical prices obtained for: {}", epic);
141        Ok(result)
142    }
143
144    async fn get_historical_prices_by_date_range(
145        &self,
146        epic: &str,
147        resolution: &str,
148        start_date: &str,
149        end_date: &str,
150    ) -> Result<HistoricalPricesResponse, AppError> {
151        let path = format!("prices/{}/{}/{}/{}", epic, resolution, start_date, end_date);
152        info!(
153            "Getting historical prices for epic: {}, resolution: {}, from: {} to: {}",
154            epic, resolution, start_date, end_date
155        );
156        let result: HistoricalPricesResponse = self.http_client.get(&path, Some(2)).await?;
157        debug!(
158            "Historical prices obtained for epic: {}, {} data points",
159            epic,
160            result.prices.len()
161        );
162        Ok(result)
163    }
164
165    async fn get_recent_prices(
166        &self,
167        params: &RecentPricesRequest<'_>,
168    ) -> Result<HistoricalPricesResponse, AppError> {
169        let mut query_params = Vec::new();
170
171        if let Some(res) = params.resolution {
172            query_params.push(format!("resolution={}", res));
173        }
174        if let Some(f) = params.from {
175            query_params.push(format!("from={}", f));
176        }
177        if let Some(t) = params.to {
178            query_params.push(format!("to={}", t));
179        }
180        if let Some(max) = params.max_points {
181            query_params.push(format!("max={}", max));
182        }
183        if let Some(size) = params.page_size {
184            query_params.push(format!("pageSize={}", size));
185        }
186        if let Some(num) = params.page_number {
187            query_params.push(format!("pageNumber={}", num));
188        }
189
190        let query_string = if query_params.is_empty() {
191            String::new()
192        } else {
193            format!("?{}", query_params.join("&"))
194        };
195
196        let path = format!("prices/{}{}", params.epic, query_string);
197        info!("Getting recent prices for epic: {}", params.epic);
198        let result: HistoricalPricesResponse = self.http_client.get(&path, Some(3)).await?;
199        debug!(
200            "Recent prices obtained for epic: {}, {} data points",
201            params.epic,
202            result.prices.len()
203        );
204        Ok(result)
205    }
206
207    async fn get_historical_prices_by_count_v1(
208        &self,
209        epic: &str,
210        resolution: &str,
211        num_points: i32,
212    ) -> Result<HistoricalPricesResponse, AppError> {
213        let path = format!("prices/{}/{}/{}", epic, resolution, num_points);
214        info!(
215            "Getting historical prices (v1) for epic: {}, resolution: {}, points: {}",
216            epic, resolution, num_points
217        );
218        let result: HistoricalPricesResponse = self.http_client.get(&path, Some(1)).await?;
219        debug!(
220            "Historical prices (v1) obtained for epic: {}, {} data points",
221            epic,
222            result.prices.len()
223        );
224        Ok(result)
225    }
226
227    async fn get_historical_prices_by_count_v2(
228        &self,
229        epic: &str,
230        resolution: &str,
231        num_points: i32,
232    ) -> Result<HistoricalPricesResponse, AppError> {
233        let path = format!("prices/{}/{}/{}", epic, resolution, num_points);
234        info!(
235            "Getting historical prices (v2) for epic: {}, resolution: {}, points: {}",
236            epic, resolution, num_points
237        );
238        let result: HistoricalPricesResponse = self.http_client.get(&path, Some(2)).await?;
239        debug!(
240            "Historical prices (v2) obtained for epic: {}, {} data points",
241            epic,
242            result.prices.len()
243        );
244        Ok(result)
245    }
246
247    async fn get_market_navigation(&self) -> Result<MarketNavigationResponse, AppError> {
248        let path = "marketnavigation";
249        info!("Getting top-level market navigation nodes");
250        let result: MarketNavigationResponse = self.http_client.get(path, Some(1)).await?;
251        debug!("{} navigation nodes found", result.nodes.len());
252        debug!("{} markets found at root level", result.markets.len());
253        Ok(result)
254    }
255
256    async fn get_market_navigation_node(
257        &self,
258        node_id: &str,
259    ) -> Result<MarketNavigationResponse, AppError> {
260        let path = format!("marketnavigation/{}", node_id);
261        info!("Getting market navigation node: {}", node_id);
262        let result: MarketNavigationResponse = self.http_client.get(&path, Some(1)).await?;
263        debug!("{} child nodes found", result.nodes.len());
264        debug!("{} markets found in node {}", result.markets.len(), node_id);
265        Ok(result)
266    }
267
268    async fn get_all_markets(&self) -> Result<Vec<MarketData>, AppError> {
269        let max_depth = 6;
270        info!(
271            "Starting comprehensive market hierarchy traversal (max {} levels)",
272            max_depth
273        );
274
275        let root_response = self.get_market_navigation().await?;
276        info!(
277            "Root navigation: {} nodes, {} markets at top level",
278            root_response.nodes.len(),
279            root_response.markets.len()
280        );
281
282        let mut all_markets = root_response.markets.clone();
283        let mut nodes_to_process = root_response.nodes.clone();
284        let mut processed_levels = 0;
285
286        while !nodes_to_process.is_empty() && processed_levels < max_depth {
287            let mut next_level_nodes = Vec::new();
288            let mut level_market_count = 0;
289
290            info!(
291                "Processing level {} with {} nodes",
292                processed_levels,
293                nodes_to_process.len()
294            );
295
296            for node in &nodes_to_process {
297                match self.get_market_navigation_node(&node.id).await {
298                    Ok(node_response) => {
299                        let node_markets = node_response.markets.len();
300                        let node_children = node_response.nodes.len();
301
302                        if node_markets > 0 || node_children > 0 {
303                            debug!(
304                                "Node '{}' (level {}): {} markets, {} child nodes",
305                                node.name, processed_levels, node_markets, node_children
306                            );
307                        }
308
309                        all_markets.extend(node_response.markets);
310                        level_market_count += node_markets;
311                        next_level_nodes.extend(node_response.nodes);
312                    }
313                    Err(e) => {
314                        tracing::error!(
315                            "Failed to get markets for node '{}' at level {}: {:?}",
316                            node.name,
317                            processed_levels,
318                            e
319                        );
320                    }
321                }
322            }
323
324            info!(
325                "Level {} completed: {} markets found, {} nodes for next level",
326                processed_levels,
327                level_market_count,
328                next_level_nodes.len()
329            );
330
331            nodes_to_process = next_level_nodes;
332            processed_levels += 1;
333        }
334
335        info!(
336            "Market hierarchy traversal completed: {} total markets found across {} levels",
337            all_markets.len(),
338            processed_levels
339        );
340
341        Ok(all_markets)
342    }
343
344    async fn get_vec_db_entries(&self) -> Result<Vec<DBEntryResponse>, AppError> {
345        info!("Getting all markets from hierarchy for DB entries");
346
347        let all_markets = self.get_all_markets().await?;
348        info!("Collected {} markets from hierarchy", all_markets.len());
349
350        let mut vec_db_entries: Vec<DBEntryResponse> = all_markets
351            .iter()
352            .map(DBEntryResponse::from)
353            .filter(|entry| !entry.epic.is_empty())
354            .collect();
355
356        info!("Created {} DB entries from markets", vec_db_entries.len());
357
358        // Collect unique symbols
359        let unique_symbols: std::collections::HashSet<String> = vec_db_entries
360            .iter()
361            .map(|entry| entry.symbol.clone())
362            .filter(|symbol| !symbol.is_empty())
363            .collect();
364
365        info!(
366            "Found {} unique symbols to fetch expiry dates for",
367            unique_symbols.len()
368        );
369
370        let mut symbol_expiry_map: std::collections::HashMap<String, String> =
371            std::collections::HashMap::new();
372
373        for symbol in unique_symbols {
374            if let Some(entry) = vec_db_entries
375                .iter()
376                .find(|e| e.symbol == symbol && !e.epic.is_empty())
377            {
378                match self.get_market_details(&entry.epic).await {
379                    Ok(market_details) => {
380                        let expiry_date = market_details
381                            .instrument
382                            .expiry_details
383                            .as_ref()
384                            .map(|details| details.last_dealing_date.clone())
385                            .unwrap_or_else(|| market_details.instrument.expiry.clone());
386
387                        symbol_expiry_map.insert(symbol.clone(), expiry_date);
388                        if let Some(expiry) = symbol_expiry_map.get(&symbol) {
389                            info!("Fetched expiry date for symbol {}: {}", symbol, expiry);
390                        }
391                    }
392                    Err(e) => {
393                        tracing::error!(
394                            "Failed to get market details for epic {} (symbol {}): {:?}",
395                            entry.epic,
396                            symbol,
397                            e
398                        );
399                        symbol_expiry_map.insert(symbol.clone(), entry.expiry.clone());
400                    }
401                }
402            }
403        }
404
405        for entry in &mut vec_db_entries {
406            if let Some(expiry_date) = symbol_expiry_map.get(&entry.symbol) {
407                entry.expiry = expiry_date.clone();
408            }
409        }
410
411        info!("Updated expiry dates for {} entries", vec_db_entries.len());
412        Ok(vec_db_entries)
413    }
414
415    async fn get_categories(&self) -> Result<CategoriesResponse, AppError> {
416        info!("Getting all categories of instruments");
417        let result: CategoriesResponse = self.http_client.get("categories", Some(1)).await?;
418        debug!("{} categories found", result.categories.len());
419        Ok(result)
420    }
421
422    async fn get_category_instruments(
423        &self,
424        category_id: &str,
425        page_number: Option<i32>,
426        page_size: Option<i32>,
427    ) -> Result<CategoryInstrumentsResponse, AppError> {
428        let mut path = format!("categories/{}/instruments", category_id);
429
430        let mut query_params = Vec::new();
431        if let Some(page) = page_number {
432            query_params.push(format!("pageNumber={}", page));
433        }
434        if let Some(size) = page_size {
435            if size > 1000 {
436                return Err(AppError::InvalidInput(
437                    "pageSize cannot exceed 1000".to_string(),
438                ));
439            }
440            query_params.push(format!("pageSize={}", size));
441        }
442
443        if !query_params.is_empty() {
444            path = format!("{}?{}", path, query_params.join("&"));
445        }
446
447        info!(
448            "Getting instruments for category: {} (page: {:?}, size: {:?})",
449            category_id, page_number, page_size
450        );
451        let result: CategoryInstrumentsResponse = self.http_client.get(&path, Some(1)).await?;
452        debug!(
453            "{} instruments found in category {}",
454            result.instruments.len(),
455            category_id
456        );
457        Ok(result)
458    }
459}
460
461#[async_trait]
462impl AccountService for Client {
463    async fn get_accounts(&self) -> Result<AccountsResponse, AppError> {
464        info!("Getting account information");
465        let result: AccountsResponse = self.http_client.get("accounts", Some(1)).await?;
466        debug!(
467            "Account information obtained: {} accounts",
468            result.accounts.len()
469        );
470        Ok(result)
471    }
472
473    async fn get_positions(&self) -> Result<PositionsResponse, AppError> {
474        debug!("Getting open positions");
475        let result: PositionsResponse = self.http_client.get("positions", Some(2)).await?;
476        debug!("Positions obtained: {} positions", result.positions.len());
477        Ok(result)
478    }
479
480    async fn get_positions_w_filter(&self, filter: &str) -> Result<PositionsResponse, AppError> {
481        debug!("Getting open positions with filter: {}", filter);
482        let mut positions = self.get_positions().await?;
483
484        positions
485            .positions
486            .retain(|position| position.market.epic.contains(filter));
487
488        debug!(
489            "Positions obtained after filtering: {} positions",
490            positions.positions.len()
491        );
492        Ok(positions)
493    }
494
495    async fn get_working_orders(&self) -> Result<WorkingOrdersResponse, AppError> {
496        info!("Getting working orders");
497        let result: WorkingOrdersResponse = self.http_client.get("workingorders", Some(2)).await?;
498        debug!(
499            "Working orders obtained: {} orders",
500            result.working_orders.len()
501        );
502        Ok(result)
503    }
504
505    async fn get_activity(
506        &self,
507        from: &str,
508        to: &str,
509    ) -> Result<AccountActivityResponse, AppError> {
510        let path = format!("history/activity?from={}&to={}&pageSize=500", from, to);
511        info!("Getting account activity");
512        let result: AccountActivityResponse = self.http_client.get(&path, Some(3)).await?;
513        debug!(
514            "Account activity obtained: {} activities",
515            result.activities.len()
516        );
517        Ok(result)
518    }
519
520    async fn get_activity_with_details(
521        &self,
522        from: &str,
523        to: &str,
524    ) -> Result<AccountActivityResponse, AppError> {
525        let path = format!(
526            "history/activity?from={}&to={}&detailed=true&pageSize=500",
527            from, to
528        );
529        info!("Getting detailed account activity");
530        let result: AccountActivityResponse = self.http_client.get(&path, Some(3)).await?;
531        debug!(
532            "Detailed account activity obtained: {} activities",
533            result.activities.len()
534        );
535        Ok(result)
536    }
537
538    async fn get_transactions(
539        &self,
540        from: &str,
541        to: &str,
542    ) -> Result<TransactionHistoryResponse, AppError> {
543        const PAGE_SIZE: u32 = 200;
544        let mut all_transactions = Vec::new();
545        let mut current_page = 1;
546        #[allow(unused_assignments)]
547        let mut last_metadata = None;
548
549        loop {
550            let path = format!(
551                "history/transactions?from={}&to={}&pageSize={}&pageNumber={}",
552                from, to, PAGE_SIZE, current_page
553            );
554            info!("Getting transaction history page {}", current_page);
555
556            let result: TransactionHistoryResponse = self.http_client.get(&path, Some(2)).await?;
557
558            let total_pages = result.metadata.page_data.total_pages as u32;
559            last_metadata = Some(result.metadata);
560            all_transactions.extend(result.transactions);
561
562            if current_page >= total_pages {
563                break;
564            }
565            current_page += 1;
566        }
567
568        debug!(
569            "Total transaction history obtained: {} transactions",
570            all_transactions.len()
571        );
572
573        Ok(TransactionHistoryResponse {
574            transactions: all_transactions,
575            metadata: last_metadata
576                .ok_or_else(|| AppError::InvalidInput("Could not retrieve metadata".to_string()))?,
577        })
578    }
579}
580
581#[async_trait]
582impl OrderService for Client {
583    async fn create_order(
584        &self,
585        order: &CreateOrderRequest,
586    ) -> Result<CreateOrderResponse, AppError> {
587        info!("Creating order for: {}", order.epic);
588        let result: CreateOrderResponse = self
589            .http_client
590            .post("positions/otc", order, Some(2))
591            .await?;
592        debug!("Order created with reference: {}", result.deal_reference);
593        Ok(result)
594    }
595
596    async fn get_order_confirmation(
597        &self,
598        deal_reference: &str,
599    ) -> Result<OrderConfirmationResponse, AppError> {
600        let path = format!("confirms/{}", deal_reference);
601        info!("Getting confirmation for order: {}", deal_reference);
602        let result: OrderConfirmationResponse = self.http_client.get(&path, Some(1)).await?;
603        debug!("Confirmation obtained for order: {}", deal_reference);
604        Ok(result)
605    }
606
607    async fn get_order_confirmation_w_retry(
608        &self,
609        deal_reference: &str,
610        retries: u64,
611        delay_ms: u64,
612    ) -> Result<OrderConfirmationResponse, AppError> {
613        let mut attempts = 0;
614        loop {
615            match self.get_order_confirmation(deal_reference).await {
616                Ok(response) => return Ok(response),
617                Err(e) => {
618                    attempts += 1;
619                    if attempts > retries {
620                        return Err(e);
621                    }
622                    warn!(
623                        "Failed to get order confirmation (attempt {}/{}): {}. Retrying in {} ms...",
624                        attempts, retries, e, delay_ms
625                    );
626                    sleep(Duration::from_millis(delay_ms)).await;
627                }
628            }
629        }
630    }
631
632    async fn update_position(
633        &self,
634        deal_id: &str,
635        update: &UpdatePositionRequest,
636    ) -> Result<UpdatePositionResponse, AppError> {
637        let path = format!("positions/otc/{}", deal_id);
638        info!("Updating position: {}", deal_id);
639        let result: UpdatePositionResponse = self.http_client.put(&path, update, Some(2)).await?;
640        debug!(
641            "Position updated: {} with deal reference: {}",
642            deal_id, result.deal_reference
643        );
644        Ok(result)
645    }
646
647    async fn update_level_in_position(
648        &self,
649        deal_id: &str,
650        limit_level: Option<f64>,
651    ) -> Result<UpdatePositionResponse, AppError> {
652        let path = format!("positions/otc/{}", deal_id);
653        info!("Updating position: {}", deal_id);
654        let limit_level = limit_level.unwrap_or(0.0);
655
656        let update: UpdatePositionRequest = UpdatePositionRequest {
657            guaranteed_stop: None,
658            limit_level: Some(limit_level),
659            stop_level: None,
660            trailing_stop: None,
661            trailing_stop_distance: None,
662            trailing_stop_increment: None,
663        };
664        let result: UpdatePositionResponse = self.http_client.put(&path, update, Some(2)).await?;
665        debug!(
666            "Position updated: {} with deal reference: {}",
667            deal_id, result.deal_reference
668        );
669        Ok(result)
670    }
671
672    async fn close_position(
673        &self,
674        close_request: &ClosePositionRequest,
675    ) -> Result<ClosePositionResponse, AppError> {
676        info!("Closing position");
677
678        // IG API requires POST with _method: DELETE header for closing positions
679        // This is a workaround for HTTP client limitations with DELETE + body
680        let result: ClosePositionResponse = self
681            .http_client
682            .post_with_delete_method("positions/otc", close_request, Some(1))
683            .await?;
684
685        debug!("Position closed with reference: {}", result.deal_reference);
686        Ok(result)
687    }
688
689    async fn create_working_order(
690        &self,
691        order: &CreateWorkingOrderRequest,
692    ) -> Result<CreateWorkingOrderResponse, AppError> {
693        info!("Creating working order for: {}", order.epic);
694        let result: CreateWorkingOrderResponse = self
695            .http_client
696            .post("workingorders/otc", order, Some(2))
697            .await?;
698        debug!(
699            "Working order created with reference: {}",
700            result.deal_reference
701        );
702        Ok(result)
703    }
704
705    async fn delete_working_order(&self, deal_id: &str) -> Result<(), AppError> {
706        let path = format!("workingorders/otc/{}", deal_id);
707        let result: CreateWorkingOrderResponse =
708            self.http_client.delete(path.as_str(), Some(2)).await?;
709        debug!(
710            "Working order created with reference: {}",
711            result.deal_reference
712        );
713        Ok(())
714    }
715}
716
/// Streaming client for IG Markets real-time data.
///
/// This client manages two Lightstreamer connections for different data types:
/// - **Market streamer**: Handles market data (prices, market state), trade updates (CONFIRMS, OPU, WOU),
///   and account updates (positions, orders, balance). Uses the default adapter.
/// - **Price streamer**: Handles detailed price data (bid/ask levels, sizes, multiple currencies).
///   Uses the "Pricing" adapter.
///
/// Each connection type can be managed independently and runs in parallel.
///
/// NOTE(review): `StreamerClient::new` constructs both Lightstreamer clients with
/// identical arguments; where the "Pricing" adapter is selected is not visible in
/// that constructor — confirm it is set per subscription.
pub struct StreamerClient {
    /// Account identifier copied from the `WebsocketInfo` obtained in `new`.
    account_id: String,
    /// Lightstreamer client for the market streamer; `None` means not initialized.
    market_streamer_client: Option<Arc<Mutex<LightstreamerClient>>>,
    /// Lightstreamer client for the price streamer; `None` means not initialized.
    price_streamer_client: Option<Arc<Mutex<LightstreamerClient>>>,
    // Flags indicating whether there is at least one active subscription for each client
    /// Set once a subscription has been requested on the market streamer.
    has_market_stream_subs: bool,
    /// Counterpart of `has_market_stream_subs` for the price streamer.
    has_price_stream_subs: bool,
}
734
735impl StreamerClient {
736    /// Creates a new streaming client instance.
737    ///
738    /// This initializes both streaming clients (market and price) but does not
739    /// establish connections yet. Connections are established when `connect()` is called.
740    ///
741    /// # Returns
742    ///
743    /// Returns a new `StreamerClient` instance or an error if initialization fails.
744    pub async fn new() -> Result<Self, AppError> {
745        let http_client_raw = Arc::new(RwLock::new(Client::new()));
746        let http_client = http_client_raw.read().await;
747        let ws_info = http_client.get_ws_info().await;
748        let password = ws_info.get_ws_password();
749
750        // Market data client (no adapter specified - uses default)
751        let market_streamer_client = Arc::new(Mutex::new(LightstreamerClient::new(
752            Some(ws_info.server.as_str()),
753            None,
754            Some(&ws_info.account_id),
755            Some(&password),
756        )?));
757
758        let price_streamer_client = Arc::new(Mutex::new(LightstreamerClient::new(
759            Some(ws_info.server.as_str()),
760            None,
761            Some(&ws_info.account_id),
762            Some(&password),
763        )?));
764
765        // Force WebSocket streaming transport on both clients to satisfy IG requirements
766        // and configure logging to use tracing levels for proper log propagation
767        {
768            let mut client = market_streamer_client.lock().await;
769            client
770                .connection_options
771                .set_forced_transport(Some(Transport::WsStreaming));
772            client.set_logging_type(LogType::TracingLogs);
773        }
774        {
775            let mut client = price_streamer_client.lock().await;
776            client
777                .connection_options
778                .set_forced_transport(Some(Transport::WsStreaming));
779            client.set_logging_type(LogType::TracingLogs);
780        }
781
782        Ok(Self {
783            account_id: ws_info.account_id.clone(),
784            market_streamer_client: Some(market_streamer_client),
785            price_streamer_client: Some(price_streamer_client),
786            has_market_stream_subs: false,
787            has_price_stream_subs: false,
788        })
789    }
790
791    /// Creates a default streaming client instance.
792    pub async fn default() -> Result<Self, AppError> {
793        Self::new().await
794    }
795
796    /// Subscribes to market data updates for the specified instruments.
797    ///
798    /// This method creates a subscription to receive real-time market data updates
799    /// for the given EPICs and returns a channel receiver for consuming the updates.
800    ///
801    /// # Arguments
802    ///
803    /// * `epics` - List of instrument EPICs to subscribe to
804    /// * `fields` - Set of market data fields to receive (e.g., BID, OFFER, etc.)
805    ///
806    /// # Returns
807    ///
808    /// Returns a receiver channel for `PriceData` updates, or an error if
809    /// the subscription setup failed.
810    ///
811    /// # Examples
812    ///
813    /// ```ignore
814    /// let mut receiver = client.market_subscribe(
815    ///     vec!["IX.D.DAX.DAILY.IP".to_string()],
816    ///     fields
817    /// ).await?;
818    ///
819    /// tokio::spawn(async move {
820    ///     while let Some(price_data) = receiver.recv().await {
821    ///         println!("Price update: {:?}", price_data);
822    ///     }
823    /// });
824    /// ```
825    pub async fn market_subscribe(
826        &mut self,
827        epics: Vec<String>,
828        fields: HashSet<StreamingMarketField>,
829    ) -> Result<mpsc::UnboundedReceiver<PriceData>, AppError> {
830        // Mark that we have at least one subscription on the market streamer
831        self.has_market_stream_subs = true;
832
833        let fields = get_streaming_market_fields(&fields);
834        let market_epics: Vec<String> = epics
835            .iter()
836            .map(|epic| "MARKET:".to_string() + epic)
837            .collect();
838        let mut subscription =
839            Subscription::new(SubscriptionMode::Merge, Some(market_epics), Some(fields))?;
840
841        subscription.set_data_adapter(None)?;
842        subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
843
844        // Create channel listener that converts ItemUpdate to PriceData
845        let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
846        subscription.add_listener(Box::new(listener));
847
848        // Configure client and add subscription
849        let client = self.market_streamer_client.as_ref().ok_or_else(|| {
850            AppError::WebSocketError("market streamer client not initialized".to_string())
851        })?;
852
853        {
854            let mut client = client.lock().await;
855            client
856                .connection_options
857                .set_forced_transport(Some(Transport::WsStreaming));
858            LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription).await;
859        }
860
861        // Create a channel for PriceData and spawn a task to convert ItemUpdate to PriceData
862        let (price_tx, price_rx) = mpsc::unbounded_channel();
863        tokio::spawn(async move {
864            let mut receiver = item_receiver;
865            while let Some(item_update) = receiver.recv().await {
866                let price_data = PriceData::from(&item_update);
867                let _ = price_tx.send(price_data);
868            }
869        });
870
871        info!(
872            "Market subscription created for {} instruments",
873            epics.len()
874        );
875        Ok(price_rx)
876    }
877
878    /// Subscribes to trade updates for the account.
879    ///
880    /// This method creates a subscription to receive real-time trade confirmations,
881    /// order updates (OPU), and working order updates (WOU) for the account,
882    /// and returns a channel receiver for consuming the updates.
883    ///
884    /// # Returns
885    ///
886    /// Returns a receiver channel for `TradeFields` updates, or an error if
887    /// the subscription setup failed.
888    ///
889    /// # Examples
890    ///
891    /// ```ignore
892    /// let mut receiver = client.trade_subscribe().await?;
893    ///
894    /// tokio::spawn(async move {
895    ///     while let Some(trade_fields) = receiver.recv().await {
896    ///         println!("Trade update: {:?}", trade_fields);
897    ///     }
898    /// });
899    /// ```
900    pub async fn trade_subscribe(
901        &mut self,
902    ) -> Result<mpsc::UnboundedReceiver<TradeFields>, AppError> {
903        // Mark that we have at least one subscription on the market streamer
904        self.has_market_stream_subs = true;
905
906        let account_id = self.account_id.clone();
907        let fields = Some(vec![
908            "CONFIRMS".to_string(),
909            "OPU".to_string(),
910            "WOU".to_string(),
911        ]);
912        let trade_items = vec![format!("TRADE:{account_id}")];
913
914        let mut subscription =
915            Subscription::new(SubscriptionMode::Distinct, Some(trade_items), fields)?;
916
917        subscription.set_data_adapter(None)?;
918        subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
919
920        // Create channel listener
921        let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
922        subscription.add_listener(Box::new(listener));
923
924        // Configure client and add subscription (reusing market_streamer_client)
925        let client = self.market_streamer_client.as_ref().ok_or_else(|| {
926            AppError::WebSocketError("market streamer client not initialized".to_string())
927        })?;
928
929        {
930            let mut client = client.lock().await;
931            client
932                .connection_options
933                .set_forced_transport(Some(Transport::WsStreaming));
934            LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription).await;
935        }
936
937        // Create a channel for TradeFields and spawn a task to convert ItemUpdate to TradeFields
938        let (trade_tx, trade_rx) = mpsc::unbounded_channel();
939        tokio::spawn(async move {
940            let mut receiver = item_receiver;
941            while let Some(item_update) = receiver.recv().await {
942                let trade_data = crate::presentation::trade::TradeData::from(&item_update);
943                let _ = trade_tx.send(trade_data.fields);
944            }
945        });
946
947        info!("Trade subscription created for account: {}", account_id);
948        Ok(trade_rx)
949    }
950
951    /// Subscribes to account data updates.
952    ///
953    /// This method creates a subscription to receive real-time account updates including
954    /// profit/loss, margin, equity, available funds, and other account metrics,
955    /// and returns a channel receiver for consuming the updates.
956    ///
957    /// # Arguments
958    ///
959    /// * `fields` - Set of account data fields to receive (e.g., PNL, MARGIN, EQUITY, etc.)
960    ///
961    /// # Returns
962    ///
963    /// Returns a receiver channel for `AccountFields` updates, or an error if
964    /// the subscription setup failed.
965    ///
966    /// # Examples
967    ///
968    /// ```ignore
969    /// let mut receiver = client.account_subscribe(fields).await?;
970    ///
971    /// tokio::spawn(async move {
972    ///     while let Some(account_fields) = receiver.recv().await {
973    ///         println!("Account update: {:?}", account_fields);
974    ///     }
975    /// });
976    /// ```
977    pub async fn account_subscribe(
978        &mut self,
979        fields: HashSet<StreamingAccountDataField>,
980    ) -> Result<mpsc::UnboundedReceiver<AccountFields>, AppError> {
981        // Mark that we have at least one subscription on the market streamer
982        self.has_market_stream_subs = true;
983
984        let fields = get_streaming_account_data_fields(&fields);
985        let account_id = self.account_id.clone();
986        let account_items = vec![format!("ACCOUNT:{account_id}")];
987
988        let mut subscription =
989            Subscription::new(SubscriptionMode::Merge, Some(account_items), Some(fields))?;
990
991        subscription.set_data_adapter(None)?;
992        subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
993
994        // Create channel listener
995        let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
996        subscription.add_listener(Box::new(listener));
997
998        // Configure client and add subscription (reusing market_streamer_client)
999        let client = self.market_streamer_client.as_ref().ok_or_else(|| {
1000            AppError::WebSocketError("market streamer client not initialized".to_string())
1001        })?;
1002
1003        {
1004            let mut client = client.lock().await;
1005            client
1006                .connection_options
1007                .set_forced_transport(Some(Transport::WsStreaming));
1008            LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription).await;
1009        }
1010
1011        // Create a channel for AccountFields and spawn a task to convert ItemUpdate to AccountFields
1012        let (account_tx, account_rx) = mpsc::unbounded_channel();
1013        tokio::spawn(async move {
1014            let mut receiver = item_receiver;
1015            while let Some(item_update) = receiver.recv().await {
1016                let account_data = crate::presentation::account::AccountData::from(&item_update);
1017                let _ = account_tx.send(account_data.fields);
1018            }
1019        });
1020
1021        info!("Account subscription created for account: {}", account_id);
1022        Ok(account_rx)
1023    }
1024
1025    /// Subscribes to price data updates for the specified instruments.
1026    ///
1027    /// This method creates a subscription to receive real-time price updates including
1028    /// bid/ask prices, sizes, and multiple currency levels for the given EPICs,
1029    /// and returns a channel receiver for consuming the updates.
1030    ///
1031    /// # Arguments
1032    ///
1033    /// * `epics` - List of instrument EPICs to subscribe to
1034    /// * `fields` - Set of price data fields to receive (e.g., BID_PRICE1, ASK_PRICE1, etc.)
1035    ///
1036    /// # Returns
1037    ///
1038    /// Returns a receiver channel for `PriceData` updates, or an error if
1039    /// the subscription setup failed.
1040    ///
1041    /// # Examples
1042    ///
1043    /// ```ignore
1044    /// let mut receiver = client.price_subscribe(
1045    ///     vec!["IX.D.DAX.DAILY.IP".to_string()],
1046    ///     fields
1047    /// ).await?;
1048    ///
1049    /// tokio::spawn(async move {
1050    ///     while let Some(price_data) = receiver.recv().await {
1051    ///         println!("Price update: {:?}", price_data);
1052    ///     }
1053    /// });
1054    /// ```
1055    pub async fn price_subscribe(
1056        &mut self,
1057        epics: Vec<String>,
1058        fields: HashSet<StreamingPriceField>,
1059    ) -> Result<mpsc::UnboundedReceiver<PriceData>, AppError> {
1060        // Mark that we have at least one subscription on the price streamer
1061        self.has_price_stream_subs = true;
1062
1063        let fields = get_streaming_price_fields(&fields);
1064        let account_id = self.account_id.clone();
1065        let price_epics: Vec<String> = epics
1066            .iter()
1067            .map(|epic| format!("PRICE:{account_id}:{epic}"))
1068            .collect();
1069
1070        // Debug what we are about to subscribe to (items and fields)
1071        tracing::debug!("Pricing subscribe items: {:?}", price_epics);
1072        tracing::debug!("Pricing subscribe fields: {:?}", fields);
1073
1074        let mut subscription =
1075            Subscription::new(SubscriptionMode::Merge, Some(price_epics), Some(fields))?;
1076
1077        // Allow overriding the Pricing adapter name via env var to match server config
1078        let pricing_adapter =
1079            std::env::var("IG_PRICING_ADAPTER").unwrap_or_else(|_| "Pricing".to_string());
1080        tracing::debug!("Using Pricing data adapter: {}", pricing_adapter);
1081        subscription.set_data_adapter(Some(pricing_adapter))?;
1082        subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
1083
1084        // Create channel listener
1085        let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
1086        subscription.add_listener(Box::new(listener));
1087
1088        // Configure client and add subscription
1089        let client = self.price_streamer_client.as_ref().ok_or_else(|| {
1090            AppError::WebSocketError("price streamer client not initialized".to_string())
1091        })?;
1092
1093        {
1094            let mut client = client.lock().await;
1095            client
1096                .connection_options
1097                .set_forced_transport(Some(Transport::WsStreaming));
1098            LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription).await;
1099        }
1100
1101        // Create a channel for PriceData and spawn a task to convert ItemUpdate to PriceData
1102        let (price_tx, price_rx) = mpsc::unbounded_channel();
1103        tokio::spawn(async move {
1104            let mut receiver = item_receiver;
1105            while let Some(item_update) = receiver.recv().await {
1106                let price_data = PriceData::from(&item_update);
1107                let _ = price_tx.send(price_data);
1108            }
1109        });
1110
1111        info!(
1112            "Price subscription created for {} instruments (account: {})",
1113            epics.len(),
1114            account_id
1115        );
1116        Ok(price_rx)
1117    }
1118
1119    /// Subscribes to chart data updates for the specified instruments and scale.
1120    ///
1121    /// This method creates a subscription to receive real-time chart updates including
1122    /// OHLC data, volume, and other chart metrics for the given EPICs and chart scale,
1123    /// and returns a channel receiver for consuming the updates.
1124    ///
1125    /// # Arguments
1126    ///
1127    /// * `epics` - List of instrument EPICs to subscribe to.
1128    /// * `scale` - Chart scale (e.g., Tick, 1Min, 5Min, etc.).
1129    /// * `fields` - Set of chart data fields to receive (e.g., OPEN, HIGH, LOW, CLOSE, VOLUME).
1130    ///
1131    /// # Returns
1132    ///
1133    /// Returns a receiver channel for `ChartData` updates, or an error if
1134    /// the subscription setup failed.
1135    ///
1136    /// # Examples
1137    ///
1138    /// ```ignore
1139    /// let mut receiver = client.chart_subscribe(
1140    ///     vec!["IX.D.DAX.DAILY.IP".to_string()],
1141    ///     ChartScale::OneMin,
1142    ///     fields
1143    /// ).await?;
1144    ///
1145    /// tokio::spawn(async move {
1146    ///     while let Some(chart_data) = receiver.recv().await {
1147    ///         println!("Chart update: {:?}", chart_data);
1148    ///     }
1149    /// });
1150    /// ```
1151    pub async fn chart_subscribe(
1152        &mut self,
1153        epics: Vec<String>,
1154        scale: ChartScale,
1155        fields: HashSet<StreamingChartField>,
1156    ) -> Result<mpsc::UnboundedReceiver<ChartData>, AppError> {
1157        // Mark that we have at least one subscription on the market streamer
1158        self.has_market_stream_subs = true;
1159
1160        let fields = get_streaming_chart_fields(&fields);
1161
1162        let chart_items: Vec<String> = epics
1163            .iter()
1164            .map(|epic| format!("CHART:{epic}:{scale}",))
1165            .collect();
1166
1167        // Candle data uses MERGE mode, tick data uses DISTINCT
1168        let mode = if matches!(scale, ChartScale::Tick) {
1169            SubscriptionMode::Distinct
1170        } else {
1171            SubscriptionMode::Merge
1172        };
1173
1174        let mut subscription = Subscription::new(mode, Some(chart_items), Some(fields))?;
1175
1176        subscription.set_data_adapter(None)?;
1177        subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
1178
1179        // Create channel listener
1180        let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
1181        subscription.add_listener(Box::new(listener));
1182
1183        // Configure client and add subscription (reusing market_streamer_client)
1184        let client = self.market_streamer_client.as_ref().ok_or_else(|| {
1185            AppError::WebSocketError("market streamer client not initialized".to_string())
1186        })?;
1187
1188        {
1189            let mut client = client.lock().await;
1190            client
1191                .connection_options
1192                .set_forced_transport(Some(Transport::WsStreaming));
1193            LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription).await;
1194        }
1195
1196        // Create a channel for ChartData and spawn a task to convert ItemUpdate to ChartData
1197        let (chart_tx, chart_rx) = mpsc::unbounded_channel();
1198        tokio::spawn(async move {
1199            let mut receiver = item_receiver;
1200            while let Some(item_update) = receiver.recv().await {
1201                let chart_data = ChartData::from(&item_update);
1202                let _ = chart_tx.send(chart_data);
1203            }
1204        });
1205
1206        info!(
1207            "Chart subscription created for {} instruments (scale: {})",
1208            epics.len(),
1209            scale
1210        );
1211
1212        Ok(chart_rx)
1213    }
1214
1215    /// Connects all active Lightstreamer clients and maintains the connections.
1216    ///
1217    /// This method establishes connections for all streaming clients that have active
1218    /// subscriptions (market and price). Each client runs in its own task and
1219    /// all connections are maintained until a shutdown signal is received.
1220    ///
1221    /// # Arguments
1222    ///
1223    /// * `shutdown_signal` - Optional signal to gracefully shutdown all connections.
1224    ///   If None, a default signal handler for SIGINT/SIGTERM will be created.
1225    ///
1226    /// # Returns
1227    ///
1228    /// Returns `Ok(())` when all connections are closed gracefully, or an error if
1229    /// any connection fails after maximum retry attempts.
1230    ///
1231    pub async fn connect(&mut self, shutdown_signal: Option<Arc<Notify>>) -> Result<(), AppError> {
1232        // Use provided signal or create a new one with signal hooks
1233        let signal = if let Some(sig) = shutdown_signal {
1234            sig
1235        } else {
1236            let sig = Arc::new(Notify::new());
1237            setup_signal_hook(Arc::clone(&sig)).await;
1238            sig
1239        };
1240
1241        let mut tasks = Vec::new();
1242
1243        // Connect market streamer only if there are active subscriptions
1244        if self.has_market_stream_subs {
1245            if let Some(client) = self.market_streamer_client.as_ref() {
1246                let client = Arc::clone(client);
1247                let signal = Arc::clone(&signal);
1248                let task =
1249                    tokio::spawn(
1250                        async move { Self::connect_client(client, signal, "Market").await },
1251                    );
1252                tasks.push(task);
1253            }
1254        } else {
1255            info!("Skipping Market streamer connection: no active subscriptions");
1256        }
1257
1258        // Connect price streamer only if there are active subscriptions
1259        if self.has_price_stream_subs {
1260            if let Some(client) = self.price_streamer_client.as_ref() {
1261                let client = Arc::clone(client);
1262                let signal = Arc::clone(&signal);
1263                let task =
1264                    tokio::spawn(
1265                        async move { Self::connect_client(client, signal, "Price").await },
1266                    );
1267                tasks.push(task);
1268            }
1269        } else {
1270            info!("Skipping Price streamer connection: no active subscriptions");
1271        }
1272
1273        if tasks.is_empty() {
1274            warn!("No streaming clients selected for connection (no active subscriptions)");
1275            return Ok(());
1276        }
1277
1278        info!("Connecting {} streaming client(s)...", tasks.len());
1279
1280        // Wait for all tasks to complete
1281        let results = futures::future::join_all(tasks).await;
1282
1283        // Check if any task failed
1284        let mut has_error = false;
1285        for (idx, result) in results.iter().enumerate() {
1286            match result {
1287                Ok(Ok(_)) => {
1288                    debug!("Streaming client {} completed successfully", idx);
1289                }
1290                Ok(Err(e)) => {
1291                    error!("Streaming client {} failed: {:?}", idx, e);
1292                    has_error = true;
1293                }
1294                Err(e) => {
1295                    error!("Streaming client {} task panicked: {:?}", idx, e);
1296                    has_error = true;
1297                }
1298            }
1299        }
1300
1301        if has_error {
1302            return Err(AppError::WebSocketError(
1303                "one or more streaming connections failed".to_string(),
1304            ));
1305        }
1306
1307        info!("All streaming connections closed gracefully");
1308        Ok(())
1309    }
1310
1311    /// Internal helper to connect a single Lightstreamer client with retry logic.
1312    async fn connect_client(
1313        client: Arc<Mutex<LightstreamerClient>>,
1314        signal: Arc<Notify>,
1315        client_type: &str,
1316    ) -> Result<(), AppError> {
1317        let mut retry_interval_millis: u64 = 0;
1318        let mut retry_counter: u64 = 0;
1319
1320        while retry_counter < MAX_CONNECTION_ATTEMPTS {
1321            let connect_result = {
1322                let mut client = client.lock().await;
1323                client.connect_direct(Arc::clone(&signal)).await
1324            };
1325
1326            // Convert error to String immediately to avoid Send issues
1327            let result_with_string_error = connect_result.map_err(|e| format!("{:?}", e));
1328
1329            match result_with_string_error {
1330                Ok(_) => {
1331                    info!("{} streamer connected successfully", client_type);
1332                    break;
1333                }
1334                Err(error_msg) => {
1335                    // If server closed because there are no active subscriptions, treat as graceful
1336                    if error_msg.contains("No more requests to fulfill") {
1337                        info!(
1338                            "{} streamer closed gracefully: no active subscriptions (server reason: No more requests to fulfill)",
1339                            client_type
1340                        );
1341                        return Ok(());
1342                    }
1343
1344                    error!("{} streamer connection failed: {}", client_type, error_msg);
1345
1346                    if retry_counter < MAX_CONNECTION_ATTEMPTS - 1 {
1347                        tokio::time::sleep(std::time::Duration::from_millis(retry_interval_millis))
1348                            .await;
1349                        retry_interval_millis =
1350                            (retry_interval_millis + (200 * retry_counter)).min(5000);
1351                        retry_counter += 1;
1352                        warn!(
1353                            "{} streamer retrying (attempt {}/{}) in {:.2} seconds...",
1354                            client_type,
1355                            retry_counter + 1,
1356                            MAX_CONNECTION_ATTEMPTS,
1357                            retry_interval_millis as f64 / 1000.0
1358                        );
1359                    } else {
1360                        retry_counter += 1;
1361                    }
1362                }
1363            }
1364        }
1365
1366        if retry_counter >= MAX_CONNECTION_ATTEMPTS {
1367            error!(
1368                "{} streamer failed after {} attempts",
1369                client_type, MAX_CONNECTION_ATTEMPTS
1370            );
1371            return Err(AppError::WebSocketError(format!(
1372                "{} streamer: maximum connection attempts ({}) exceeded",
1373                client_type, MAX_CONNECTION_ATTEMPTS
1374            )));
1375        }
1376
1377        info!("{} streamer connection closed gracefully", client_type);
1378        Ok(())
1379    }
1380
1381    /// Disconnects all active Lightstreamer clients.
1382    ///
1383    /// This method gracefully closes all streaming connections (market and price).
1384    ///
1385    /// # Returns
1386    ///
1387    /// Returns `Ok(())` if all disconnections were successful.
1388    pub async fn disconnect(&mut self) -> Result<(), AppError> {
1389        let mut disconnected = 0;
1390
1391        if let Some(client) = self.market_streamer_client.as_ref() {
1392            let mut client = client.lock().await;
1393            client.disconnect().await;
1394            info!("Market streamer disconnected");
1395            disconnected += 1;
1396        }
1397
1398        if let Some(client) = self.price_streamer_client.as_ref() {
1399            let mut client = client.lock().await;
1400            client.disconnect().await;
1401            info!("Price streamer disconnected");
1402            disconnected += 1;
1403        }
1404
1405        info!("Disconnected {} streaming client(s)", disconnected);
1406        Ok(())
1407    }
1408}