ig_client/application/
client.rs

1/******************************************************************************
2   Author: Joaquín Béjar García
3   Email: jb@taunais.com
4   Date: 19/10/25
5******************************************************************************/
6use crate::application::auth::WebsocketInfo;
7use crate::application::interfaces::account::AccountService;
8use crate::application::interfaces::market::MarketService;
9use crate::application::interfaces::order::OrderService;
10use crate::error::AppError;
11use crate::model::http::HttpClient;
12use crate::model::requests::RecentPricesRequest;
13use crate::model::requests::{
14    ClosePositionRequest, CreateOrderRequest, CreateWorkingOrderRequest, UpdatePositionRequest,
15};
16use crate::model::responses::{
17    ClosePositionResponse, CreateOrderResponse, CreateWorkingOrderResponse, UpdatePositionResponse,
18};
19use crate::model::responses::{
20    DBEntryResponse, HistoricalPricesResponse, MarketNavigationResponse, MarketSearchResponse,
21    MultipleMarketDetailsResponse,
22};
23use crate::model::streaming::{
24    StreamingAccountDataField, StreamingMarketField, StreamingPriceField,
25    get_streaming_account_data_fields, get_streaming_market_fields, get_streaming_price_fields,
26};
27use crate::prelude::{
28    AccountActivityResponse, AccountFields, AccountsResponse, OrderConfirmationResponse,
29    PositionsResponse, TradeFields, TransactionHistoryResponse, WorkingOrdersResponse,
30};
31use crate::presentation::market::{MarketData, MarketDetails};
32use crate::presentation::price::PriceData;
33use async_trait::async_trait;
34use lightstreamer_rs::client::{LightstreamerClient, Transport};
35use lightstreamer_rs::subscription::{
36    ChannelSubscriptionListener, Snapshot, Subscription, SubscriptionMode,
37};
38use lightstreamer_rs::utils::setup_signal_hook;
39use serde_json::Value;
40use std::collections::HashSet;
41use std::sync::Arc;
42use std::time::Duration;
43use tokio::sync::{Mutex, Notify, RwLock, mpsc};
44use tokio::time::sleep;
45use tracing::{debug, error, info, warn};
46
47const MAX_CONNECTION_ATTEMPTS: u64 = 3;
48
49/// Main client for interacting with IG Markets API
50///
51/// This client provides a unified interface for all IG Markets API operations,
52/// including market data, account management, and order execution.
53pub struct Client {
54    http_client: Arc<HttpClient>,
55}
56
57impl Client {
58    /// Creates a new client instance
59    ///
60    /// # Returns
61    /// A new Client with default configuration
62    pub fn new() -> Self {
63        let http_client = Arc::new(HttpClient::default());
64        Self { http_client }
65    }
66
67    /// Gets WebSocket connection information for Lightstreamer
68    ///
69    /// # Returns
70    /// * `WebsocketInfo` containing server endpoint, authentication tokens, and account ID
71    pub async fn get_ws_info(&self) -> WebsocketInfo {
72        self.http_client.get_ws_info().await
73    }
74}
75
76impl Default for Client {
77    fn default() -> Self {
78        Self::new()
79    }
80}
81
82#[async_trait]
83impl MarketService for Client {
84    async fn search_markets(&self, search_term: &str) -> Result<MarketSearchResponse, AppError> {
85        let path = format!("markets?searchTerm={}", search_term);
86        info!("Searching markets with term: {}", search_term);
87        let result: MarketSearchResponse = self.http_client.get(&path, Some(1)).await?;
88        debug!("{} markets found", result.markets.len());
89        Ok(result)
90    }
91
92    async fn get_market_details(&self, epic: &str) -> Result<MarketDetails, AppError> {
93        let path = format!("markets/{epic}");
94        info!("Getting market details: {}", epic);
95        let market_value: Value = self.http_client.get(&path, Some(3)).await?;
96        let market_details: MarketDetails = serde_json::from_value(market_value)?;
97        debug!("Market details obtained for: {}", epic);
98        Ok(market_details)
99    }
100
101    async fn get_multiple_market_details(
102        &self,
103        epics: &[String],
104    ) -> Result<MultipleMarketDetailsResponse, AppError> {
105        if epics.is_empty() {
106            return Ok(MultipleMarketDetailsResponse::default());
107        } else if epics.len() > 50 {
108            return Err(AppError::InvalidInput(
109                "The maximum number of EPICs is 50".to_string(),
110            ));
111        }
112
113        let epics_str = epics.join(",");
114        let path = format!("markets?epics={}", epics_str);
115        debug!(
116            "Getting market details for {} EPICs in a batch",
117            epics.len()
118        );
119
120        let response: MultipleMarketDetailsResponse = self.http_client.get(&path, Some(2)).await?;
121
122        Ok(response)
123    }
124
125    async fn get_historical_prices(
126        &self,
127        epic: &str,
128        resolution: &str,
129        from: &str,
130        to: &str,
131    ) -> Result<HistoricalPricesResponse, AppError> {
132        let path = format!(
133            "prices/{}?resolution={}&from={}&to={}",
134            epic, resolution, from, to
135        );
136        info!("Getting historical prices for: {}", epic);
137        let result: HistoricalPricesResponse = self.http_client.get(&path, Some(3)).await?;
138        debug!("Historical prices obtained for: {}", epic);
139        Ok(result)
140    }
141
142    async fn get_historical_prices_by_date_range(
143        &self,
144        epic: &str,
145        resolution: &str,
146        start_date: &str,
147        end_date: &str,
148    ) -> Result<HistoricalPricesResponse, AppError> {
149        let path = format!("prices/{}/{}/{}/{}", epic, resolution, start_date, end_date);
150        info!(
151            "Getting historical prices for epic: {}, resolution: {}, from: {} to: {}",
152            epic, resolution, start_date, end_date
153        );
154        let result: HistoricalPricesResponse = self.http_client.get(&path, Some(2)).await?;
155        debug!(
156            "Historical prices obtained for epic: {}, {} data points",
157            epic,
158            result.prices.len()
159        );
160        Ok(result)
161    }
162
163    async fn get_recent_prices(
164        &self,
165        params: &RecentPricesRequest<'_>,
166    ) -> Result<HistoricalPricesResponse, AppError> {
167        let mut query_params = Vec::new();
168
169        if let Some(res) = params.resolution {
170            query_params.push(format!("resolution={}", res));
171        }
172        if let Some(f) = params.from {
173            query_params.push(format!("from={}", f));
174        }
175        if let Some(t) = params.to {
176            query_params.push(format!("to={}", t));
177        }
178        if let Some(max) = params.max_points {
179            query_params.push(format!("max={}", max));
180        }
181        if let Some(size) = params.page_size {
182            query_params.push(format!("pageSize={}", size));
183        }
184        if let Some(num) = params.page_number {
185            query_params.push(format!("pageNumber={}", num));
186        }
187
188        let query_string = if query_params.is_empty() {
189            String::new()
190        } else {
191            format!("?{}", query_params.join("&"))
192        };
193
194        let path = format!("prices/{}{}", params.epic, query_string);
195        info!("Getting recent prices for epic: {}", params.epic);
196        let result: HistoricalPricesResponse = self.http_client.get(&path, Some(3)).await?;
197        debug!(
198            "Recent prices obtained for epic: {}, {} data points",
199            params.epic,
200            result.prices.len()
201        );
202        Ok(result)
203    }
204
205    async fn get_historical_prices_by_count_v1(
206        &self,
207        epic: &str,
208        resolution: &str,
209        num_points: i32,
210    ) -> Result<HistoricalPricesResponse, AppError> {
211        let path = format!("prices/{}/{}/{}", epic, resolution, num_points);
212        info!(
213            "Getting historical prices (v1) for epic: {}, resolution: {}, points: {}",
214            epic, resolution, num_points
215        );
216        let result: HistoricalPricesResponse = self.http_client.get(&path, Some(1)).await?;
217        debug!(
218            "Historical prices (v1) obtained for epic: {}, {} data points",
219            epic,
220            result.prices.len()
221        );
222        Ok(result)
223    }
224
225    async fn get_historical_prices_by_count_v2(
226        &self,
227        epic: &str,
228        resolution: &str,
229        num_points: i32,
230    ) -> Result<HistoricalPricesResponse, AppError> {
231        let path = format!("prices/{}/{}/{}", epic, resolution, num_points);
232        info!(
233            "Getting historical prices (v2) for epic: {}, resolution: {}, points: {}",
234            epic, resolution, num_points
235        );
236        let result: HistoricalPricesResponse = self.http_client.get(&path, Some(2)).await?;
237        debug!(
238            "Historical prices (v2) obtained for epic: {}, {} data points",
239            epic,
240            result.prices.len()
241        );
242        Ok(result)
243    }
244
245    async fn get_market_navigation(&self) -> Result<MarketNavigationResponse, AppError> {
246        let path = "marketnavigation";
247        info!("Getting top-level market navigation nodes");
248        let result: MarketNavigationResponse = self.http_client.get(path, Some(1)).await?;
249        debug!("{} navigation nodes found", result.nodes.len());
250        debug!("{} markets found at root level", result.markets.len());
251        Ok(result)
252    }
253
254    async fn get_market_navigation_node(
255        &self,
256        node_id: &str,
257    ) -> Result<MarketNavigationResponse, AppError> {
258        let path = format!("marketnavigation/{}", node_id);
259        info!("Getting market navigation node: {}", node_id);
260        let result: MarketNavigationResponse = self.http_client.get(&path, Some(1)).await?;
261        debug!("{} child nodes found", result.nodes.len());
262        debug!("{} markets found in node {}", result.markets.len(), node_id);
263        Ok(result)
264    }
265
266    async fn get_all_markets(&self) -> Result<Vec<MarketData>, AppError> {
267        let max_depth = 6;
268        info!(
269            "Starting comprehensive market hierarchy traversal (max {} levels)",
270            max_depth
271        );
272
273        let root_response = self.get_market_navigation().await?;
274        info!(
275            "Root navigation: {} nodes, {} markets at top level",
276            root_response.nodes.len(),
277            root_response.markets.len()
278        );
279
280        let mut all_markets = root_response.markets.clone();
281        let mut nodes_to_process = root_response.nodes.clone();
282        let mut processed_levels = 0;
283
284        while !nodes_to_process.is_empty() && processed_levels < max_depth {
285            let mut next_level_nodes = Vec::new();
286            let mut level_market_count = 0;
287
288            info!(
289                "Processing level {} with {} nodes",
290                processed_levels,
291                nodes_to_process.len()
292            );
293
294            for node in &nodes_to_process {
295                match self.get_market_navigation_node(&node.id).await {
296                    Ok(node_response) => {
297                        let node_markets = node_response.markets.len();
298                        let node_children = node_response.nodes.len();
299
300                        if node_markets > 0 || node_children > 0 {
301                            debug!(
302                                "Node '{}' (level {}): {} markets, {} child nodes",
303                                node.name, processed_levels, node_markets, node_children
304                            );
305                        }
306
307                        all_markets.extend(node_response.markets);
308                        level_market_count += node_markets;
309                        next_level_nodes.extend(node_response.nodes);
310                    }
311                    Err(e) => {
312                        tracing::error!(
313                            "Failed to get markets for node '{}' at level {}: {:?}",
314                            node.name,
315                            processed_levels,
316                            e
317                        );
318                    }
319                }
320            }
321
322            info!(
323                "Level {} completed: {} markets found, {} nodes for next level",
324                processed_levels,
325                level_market_count,
326                next_level_nodes.len()
327            );
328
329            nodes_to_process = next_level_nodes;
330            processed_levels += 1;
331        }
332
333        info!(
334            "Market hierarchy traversal completed: {} total markets found across {} levels",
335            all_markets.len(),
336            processed_levels
337        );
338
339        Ok(all_markets)
340    }
341
342    async fn get_vec_db_entries(&self) -> Result<Vec<DBEntryResponse>, AppError> {
343        info!("Getting all markets from hierarchy for DB entries");
344
345        let all_markets = self.get_all_markets().await?;
346        info!("Collected {} markets from hierarchy", all_markets.len());
347
348        let mut vec_db_entries: Vec<DBEntryResponse> = all_markets
349            .iter()
350            .map(DBEntryResponse::from)
351            .filter(|entry| !entry.epic.is_empty())
352            .collect();
353
354        info!("Created {} DB entries from markets", vec_db_entries.len());
355
356        // Collect unique symbols
357        let unique_symbols: std::collections::HashSet<String> = vec_db_entries
358            .iter()
359            .map(|entry| entry.symbol.clone())
360            .filter(|symbol| !symbol.is_empty())
361            .collect();
362
363        info!(
364            "Found {} unique symbols to fetch expiry dates for",
365            unique_symbols.len()
366        );
367
368        let mut symbol_expiry_map: std::collections::HashMap<String, String> =
369            std::collections::HashMap::new();
370
371        for symbol in unique_symbols {
372            if let Some(entry) = vec_db_entries
373                .iter()
374                .find(|e| e.symbol == symbol && !e.epic.is_empty())
375            {
376                match self.get_market_details(&entry.epic).await {
377                    Ok(market_details) => {
378                        let expiry_date = market_details
379                            .instrument
380                            .expiry_details
381                            .as_ref()
382                            .map(|details| details.last_dealing_date.clone())
383                            .unwrap_or_else(|| market_details.instrument.expiry.clone());
384
385                        symbol_expiry_map.insert(symbol.clone(), expiry_date);
386                        info!(
387                            "Fetched expiry date for symbol {}: {}",
388                            symbol,
389                            symbol_expiry_map.get(&symbol).unwrap()
390                        );
391                    }
392                    Err(e) => {
393                        tracing::error!(
394                            "Failed to get market details for epic {} (symbol {}): {:?}",
395                            entry.epic,
396                            symbol,
397                            e
398                        );
399                        symbol_expiry_map.insert(symbol.clone(), entry.expiry.clone());
400                    }
401                }
402            }
403        }
404
405        for entry in &mut vec_db_entries {
406            if let Some(expiry_date) = symbol_expiry_map.get(&entry.symbol) {
407                entry.expiry = expiry_date.clone();
408            }
409        }
410
411        info!("Updated expiry dates for {} entries", vec_db_entries.len());
412        Ok(vec_db_entries)
413    }
414}
415
416#[async_trait]
417impl AccountService for Client {
418    async fn get_accounts(&self) -> Result<AccountsResponse, AppError> {
419        info!("Getting account information");
420        let result: AccountsResponse = self.http_client.get("accounts", Some(1)).await?;
421        debug!(
422            "Account information obtained: {} accounts",
423            result.accounts.len()
424        );
425        Ok(result)
426    }
427
428    async fn get_positions(&self) -> Result<PositionsResponse, AppError> {
429        debug!("Getting open positions");
430        let result: PositionsResponse = self.http_client.get("positions", Some(2)).await?;
431        debug!("Positions obtained: {} positions", result.positions.len());
432        Ok(result)
433    }
434
435    async fn get_positions_w_filter(&self, filter: &str) -> Result<PositionsResponse, AppError> {
436        debug!("Getting open positions with filter: {}", filter);
437        let mut positions = self.get_positions().await?;
438
439        positions
440            .positions
441            .retain(|position| position.market.epic.contains(filter));
442
443        debug!(
444            "Positions obtained after filtering: {} positions",
445            positions.positions.len()
446        );
447        Ok(positions)
448    }
449
450    async fn get_working_orders(&self) -> Result<WorkingOrdersResponse, AppError> {
451        info!("Getting working orders");
452        let result: WorkingOrdersResponse = self.http_client.get("workingorders", Some(2)).await?;
453        debug!(
454            "Working orders obtained: {} orders",
455            result.working_orders.len()
456        );
457        Ok(result)
458    }
459
460    async fn get_activity(
461        &self,
462        from: &str,
463        to: &str,
464    ) -> Result<AccountActivityResponse, AppError> {
465        let path = format!("history/activity?from={}&to={}&pageSize=500", from, to);
466        info!("Getting account activity");
467        let result: AccountActivityResponse = self.http_client.get(&path, Some(3)).await?;
468        debug!(
469            "Account activity obtained: {} activities",
470            result.activities.len()
471        );
472        Ok(result)
473    }
474
475    async fn get_activity_with_details(
476        &self,
477        from: &str,
478        to: &str,
479    ) -> Result<AccountActivityResponse, AppError> {
480        let path = format!(
481            "history/activity?from={}&to={}&detailed=true&pageSize=500",
482            from, to
483        );
484        info!("Getting detailed account activity");
485        let result: AccountActivityResponse = self.http_client.get(&path, Some(3)).await?;
486        debug!(
487            "Detailed account activity obtained: {} activities",
488            result.activities.len()
489        );
490        Ok(result)
491    }
492
493    async fn get_transactions(
494        &self,
495        from: &str,
496        to: &str,
497    ) -> Result<TransactionHistoryResponse, AppError> {
498        const PAGE_SIZE: u32 = 200;
499        let mut all_transactions = Vec::new();
500        let mut current_page = 1;
501        #[allow(unused_assignments)]
502        let mut last_metadata = None;
503
504        loop {
505            let path = format!(
506                "history/transactions?from={}&to={}&pageSize={}&pageNumber={}",
507                from, to, PAGE_SIZE, current_page
508            );
509            info!("Getting transaction history page {}", current_page);
510
511            let result: TransactionHistoryResponse = self.http_client.get(&path, Some(2)).await?;
512
513            let total_pages = result.metadata.page_data.total_pages as u32;
514            last_metadata = Some(result.metadata);
515            all_transactions.extend(result.transactions);
516
517            if current_page >= total_pages {
518                break;
519            }
520            current_page += 1;
521        }
522
523        debug!(
524            "Total transaction history obtained: {} transactions",
525            all_transactions.len()
526        );
527
528        Ok(TransactionHistoryResponse {
529            transactions: all_transactions,
530            metadata: last_metadata
531                .ok_or_else(|| AppError::InvalidInput("Could not retrieve metadata".to_string()))?,
532        })
533    }
534}
535
536#[async_trait]
537impl OrderService for Client {
538    async fn create_order(
539        &self,
540        order: &CreateOrderRequest,
541    ) -> Result<CreateOrderResponse, AppError> {
542        info!("Creating order for: {}", order.epic);
543        let result: CreateOrderResponse = self
544            .http_client
545            .post("positions/otc", order, Some(2))
546            .await?;
547        debug!("Order created with reference: {}", result.deal_reference);
548        Ok(result)
549    }
550
551    async fn get_order_confirmation(
552        &self,
553        deal_reference: &str,
554    ) -> Result<OrderConfirmationResponse, AppError> {
555        let path = format!("confirms/{}", deal_reference);
556        info!("Getting confirmation for order: {}", deal_reference);
557        let result: OrderConfirmationResponse = self.http_client.get(&path, Some(1)).await?;
558        debug!("Confirmation obtained for order: {}", deal_reference);
559        Ok(result)
560    }
561
562    async fn get_order_confirmation_w_retry(
563        &self,
564        deal_reference: &str,
565        retries: u64,
566        delay_ms: u64,
567    ) -> Result<OrderConfirmationResponse, AppError> {
568        let mut attempts = 0;
569        loop {
570            match self.get_order_confirmation(deal_reference).await {
571                Ok(response) => return Ok(response),
572                Err(e) => {
573                    attempts += 1;
574                    if attempts > retries {
575                        return Err(e);
576                    }
577                    warn!(
578                        "Failed to get order confirmation (attempt {}/{}): {}. Retrying in {} ms...",
579                        attempts, retries, e, delay_ms
580                    );
581                    sleep(Duration::from_millis(delay_ms)).await;
582                }
583            }
584        }
585    }
586
587    async fn update_position(
588        &self,
589        deal_id: &str,
590        update: &UpdatePositionRequest,
591    ) -> Result<UpdatePositionResponse, AppError> {
592        let path = format!("positions/otc/{}", deal_id);
593        info!("Updating position: {}", deal_id);
594        let result: UpdatePositionResponse = self.http_client.put(&path, update, Some(2)).await?;
595        debug!(
596            "Position updated: {} with deal reference: {}",
597            deal_id, result.deal_reference
598        );
599        Ok(result)
600    }
601
602    async fn close_position(
603        &self,
604        close_request: &ClosePositionRequest,
605    ) -> Result<ClosePositionResponse, AppError> {
606        info!("Closing position");
607
608        // IG API requires POST with _method: DELETE header for closing positions
609        // This is a workaround for HTTP client limitations with DELETE + body
610        let result: ClosePositionResponse = self
611            .http_client
612            .post_with_delete_method("positions/otc", close_request, Some(1))
613            .await?;
614
615        debug!("Position closed with reference: {}", result.deal_reference);
616        Ok(result)
617    }
618
619    async fn create_working_order(
620        &self,
621        order: &CreateWorkingOrderRequest,
622    ) -> Result<CreateWorkingOrderResponse, AppError> {
623        info!("Creating working order for: {}", order.epic);
624        let result: CreateWorkingOrderResponse = self
625            .http_client
626            .post("workingorders/otc", order, Some(2))
627            .await?;
628        debug!(
629            "Working order created with reference: {}",
630            result.deal_reference
631        );
632        Ok(result)
633    }
634
635    async fn delete_working_order(&self, deal_id: &str) -> Result<(), AppError> {
636        let path = format!("workingorders/otc/{}", deal_id);
637        let result: CreateWorkingOrderResponse =
638            self.http_client.delete(path.as_str(), Some(2)).await?;
639        debug!(
640            "Working order created with reference: {}",
641            result.deal_reference
642        );
643        Ok(())
644    }
645}
646
647/// Streaming client for IG Markets real-time data.
648///
649/// This client manages two Lightstreamer connections for different data types:
650/// - **Market streamer**: Handles market data (prices, market state), trade updates (CONFIRMS, OPU, WOU),
651///   and account updates (positions, orders, balance). Uses the default adapter.
652/// - **Price streamer**: Handles detailed price data (bid/ask levels, sizes, multiple currencies).
653///   Uses the "Pricing" adapter.
654///
655/// Each connection type can be managed independently and runs in parallel.
656pub struct StreamerClient {
657    account_id: String,
658    market_streamer_client: Option<Arc<Mutex<LightstreamerClient>>>,
659    price_streamer_client: Option<Arc<Mutex<LightstreamerClient>>>,
660    // Flags indicating whether there is at least one active subscription for each client
661    has_market_stream_subs: bool,
662    has_price_stream_subs: bool,
663}
664
665impl StreamerClient {
666    /// Creates a new streaming client instance.
667    ///
668    /// This initializes both streaming clients (market and price) but does not
669    /// establish connections yet. Connections are established when `connect()` is called.
670    ///
671    /// # Returns
672    ///
673    /// Returns a new `StreamerClient` instance or an error if initialization fails.
674    pub async fn new() -> Result<Self, AppError> {
675        let http_client_raw = Arc::new(RwLock::new(Client::new()));
676        let http_client = http_client_raw.read().await;
677        let ws_info = http_client.get_ws_info().await;
678        let password = ws_info.get_ws_password();
679
680        // Market data client (no adapter specified - uses default)
681        let market_streamer_client = Arc::new(Mutex::new(LightstreamerClient::new(
682            Some(ws_info.server.as_str()),
683            None,
684            Some(&ws_info.account_id),
685            Some(&password),
686        )?));
687
688        let price_streamer_client = Arc::new(Mutex::new(LightstreamerClient::new(
689            Some(ws_info.server.as_str()),
690            None,
691            Some(&ws_info.account_id),
692            Some(&password),
693        )?));
694
695        // Force WebSocket streaming transport on both clients to satisfy IG requirements
696        {
697            let mut client = market_streamer_client.lock().await;
698            client
699                .connection_options
700                .set_forced_transport(Some(Transport::WsStreaming));
701        }
702        {
703            let mut client = price_streamer_client.lock().await;
704            client
705                .connection_options
706                .set_forced_transport(Some(Transport::WsStreaming));
707        }
708
709        Ok(Self {
710            account_id: ws_info.account_id.clone(),
711            market_streamer_client: Some(market_streamer_client),
712            price_streamer_client: Some(price_streamer_client),
713            has_market_stream_subs: false,
714            has_price_stream_subs: false,
715        })
716    }
717
718    /// Creates a default streaming client instance.
719    pub async fn default() -> Result<Self, AppError> {
720        Self::new().await
721    }
722
723    /// Subscribes to market data updates for the specified instruments.
724    ///
725    /// This method creates a subscription to receive real-time market data updates
726    /// for the given EPICs and returns a channel receiver for consuming the updates.
727    ///
728    /// # Arguments
729    ///
730    /// * `epics` - List of instrument EPICs to subscribe to
731    /// * `fields` - Set of market data fields to receive (e.g., BID, OFFER, etc.)
732    ///
733    /// # Returns
734    ///
735    /// Returns a receiver channel for `PriceData` updates, or an error if
736    /// the subscription setup failed.
737    ///
738    /// # Examples
739    ///
740    /// ```ignore
741    /// let mut receiver = client.market_subscribe(
742    ///     vec!["IX.D.DAX.DAILY.IP".to_string()],
743    ///     fields
744    /// ).await?;
745    ///
746    /// tokio::spawn(async move {
747    ///     while let Some(price_data) = receiver.recv().await {
748    ///         println!("Price update: {:?}", price_data);
749    ///     }
750    /// });
751    /// ```
752    pub async fn market_subscribe(
753        &mut self,
754        epics: Vec<String>,
755        fields: HashSet<StreamingMarketField>,
756    ) -> Result<mpsc::UnboundedReceiver<PriceData>, AppError> {
757        // Mark that we have at least one subscription on the market streamer
758        self.has_market_stream_subs = true;
759
760        let fields = get_streaming_market_fields(&fields);
761        let market_epics: Vec<String> = epics
762            .iter()
763            .map(|epic| "MARKET:".to_string() + epic)
764            .collect();
765        let mut subscription =
766            Subscription::new(SubscriptionMode::Merge, Some(market_epics), Some(fields))?;
767
768        subscription.set_data_adapter(None)?;
769        subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
770
771        // Create channel listener that converts ItemUpdate to PriceData
772        let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
773        subscription.add_listener(Box::new(listener));
774
775        // Configure client and add subscription
776        let client = self.market_streamer_client.as_ref().ok_or_else(|| {
777            AppError::WebSocketError("market streamer client not initialized".to_string())
778        })?;
779
780        {
781            let mut client = client.lock().await;
782            client
783                .connection_options
784                .set_forced_transport(Some(Transport::WsStreaming));
785            LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription).await;
786        }
787
788        // Create a channel for PriceData and spawn a task to convert ItemUpdate to PriceData
789        let (price_tx, price_rx) = mpsc::unbounded_channel();
790        tokio::spawn(async move {
791            let mut receiver = item_receiver;
792            while let Some(item_update) = receiver.recv().await {
793                let price_data = PriceData::from(&item_update);
794                let _ = price_tx.send(price_data);
795            }
796        });
797
798        info!(
799            "Market subscription created for {} instruments",
800            epics.len()
801        );
802        Ok(price_rx)
803    }
804
805    /// Subscribes to trade updates for the account.
806    ///
807    /// This method creates a subscription to receive real-time trade confirmations,
808    /// order updates (OPU), and working order updates (WOU) for the account,
809    /// and returns a channel receiver for consuming the updates.
810    ///
811    /// # Returns
812    ///
813    /// Returns a receiver channel for `TradeFields` updates, or an error if
814    /// the subscription setup failed.
815    ///
816    /// # Examples
817    ///
818    /// ```ignore
819    /// let mut receiver = client.trade_subscribe().await?;
820    ///
821    /// tokio::spawn(async move {
822    ///     while let Some(trade_fields) = receiver.recv().await {
823    ///         println!("Trade update: {:?}", trade_fields);
824    ///     }
825    /// });
826    /// ```
827    pub async fn trade_subscribe(
828        &mut self,
829    ) -> Result<mpsc::UnboundedReceiver<TradeFields>, AppError> {
830        // Mark that we have at least one subscription on the market streamer
831        self.has_market_stream_subs = true;
832
833        let account_id = self.account_id.clone();
834        let fields = Some(vec![
835            "CONFIRMS".to_string(),
836            "OPU".to_string(),
837            "WOU".to_string(),
838        ]);
839        let trade_items = vec![format!("TRADE:{account_id}")];
840
841        let mut subscription =
842            Subscription::new(SubscriptionMode::Distinct, Some(trade_items), fields)?;
843
844        subscription.set_data_adapter(None)?;
845        subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
846
847        // Create channel listener
848        let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
849        subscription.add_listener(Box::new(listener));
850
851        // Configure client and add subscription (reusing market_streamer_client)
852        let client = self.market_streamer_client.as_ref().ok_or_else(|| {
853            AppError::WebSocketError("market streamer client not initialized".to_string())
854        })?;
855
856        {
857            let mut client = client.lock().await;
858            client
859                .connection_options
860                .set_forced_transport(Some(Transport::WsStreaming));
861            LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription).await;
862        }
863
864        // Create a channel for TradeFields and spawn a task to convert ItemUpdate to TradeFields
865        let (trade_tx, trade_rx) = mpsc::unbounded_channel();
866        tokio::spawn(async move {
867            let mut receiver = item_receiver;
868            while let Some(item_update) = receiver.recv().await {
869                let trade_data = crate::presentation::trade::TradeData::from(&item_update);
870                let _ = trade_tx.send(trade_data.fields);
871            }
872        });
873
874        info!("Trade subscription created for account: {}", account_id);
875        Ok(trade_rx)
876    }
877
878    /// Subscribes to account data updates.
879    ///
880    /// This method creates a subscription to receive real-time account updates including
881    /// profit/loss, margin, equity, available funds, and other account metrics,
882    /// and returns a channel receiver for consuming the updates.
883    ///
884    /// # Arguments
885    ///
886    /// * `fields` - Set of account data fields to receive (e.g., PNL, MARGIN, EQUITY, etc.)
887    ///
888    /// # Returns
889    ///
890    /// Returns a receiver channel for `AccountFields` updates, or an error if
891    /// the subscription setup failed.
892    ///
893    /// # Examples
894    ///
895    /// ```ignore
896    /// let mut receiver = client.account_subscribe(fields).await?;
897    ///
898    /// tokio::spawn(async move {
899    ///     while let Some(account_fields) = receiver.recv().await {
900    ///         println!("Account update: {:?}", account_fields);
901    ///     }
902    /// });
903    /// ```
904    pub async fn account_subscribe(
905        &mut self,
906        fields: HashSet<StreamingAccountDataField>,
907    ) -> Result<mpsc::UnboundedReceiver<AccountFields>, AppError> {
908        // Mark that we have at least one subscription on the market streamer
909        self.has_market_stream_subs = true;
910
911        let fields = get_streaming_account_data_fields(&fields);
912        let account_id = self.account_id.clone();
913        let account_items = vec![format!("ACCOUNT:{account_id}")];
914
915        let mut subscription =
916            Subscription::new(SubscriptionMode::Merge, Some(account_items), Some(fields))?;
917
918        subscription.set_data_adapter(None)?;
919        subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
920
921        // Create channel listener
922        let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
923        subscription.add_listener(Box::new(listener));
924
925        // Configure client and add subscription (reusing market_streamer_client)
926        let client = self.market_streamer_client.as_ref().ok_or_else(|| {
927            AppError::WebSocketError("market streamer client not initialized".to_string())
928        })?;
929
930        {
931            let mut client = client.lock().await;
932            client
933                .connection_options
934                .set_forced_transport(Some(Transport::WsStreaming));
935            LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription).await;
936        }
937
938        // Create a channel for AccountFields and spawn a task to convert ItemUpdate to AccountFields
939        let (account_tx, account_rx) = mpsc::unbounded_channel();
940        tokio::spawn(async move {
941            let mut receiver = item_receiver;
942            while let Some(item_update) = receiver.recv().await {
943                let account_data = crate::presentation::account::AccountData::from(&item_update);
944                let _ = account_tx.send(account_data.fields);
945            }
946        });
947
948        info!("Account subscription created for account: {}", account_id);
949        Ok(account_rx)
950    }
951
952    /// Subscribes to price data updates for the specified instruments.
953    ///
954    /// This method creates a subscription to receive real-time price updates including
955    /// bid/ask prices, sizes, and multiple currency levels for the given EPICs,
956    /// and returns a channel receiver for consuming the updates.
957    ///
958    /// # Arguments
959    ///
960    /// * `epics` - List of instrument EPICs to subscribe to
961    /// * `fields` - Set of price data fields to receive (e.g., BID_PRICE1, ASK_PRICE1, etc.)
962    ///
963    /// # Returns
964    ///
965    /// Returns a receiver channel for `PriceData` updates, or an error if
966    /// the subscription setup failed.
967    ///
968    /// # Examples
969    ///
970    /// ```ignore
971    /// let mut receiver = client.price_subscribe(
972    ///     vec!["IX.D.DAX.DAILY.IP".to_string()],
973    ///     fields
974    /// ).await?;
975    ///
976    /// tokio::spawn(async move {
977    ///     while let Some(price_data) = receiver.recv().await {
978    ///         println!("Price update: {:?}", price_data);
979    ///     }
980    /// });
981    /// ```
982    pub async fn price_subscribe(
983        &mut self,
984        epics: Vec<String>,
985        fields: HashSet<StreamingPriceField>,
986    ) -> Result<mpsc::UnboundedReceiver<PriceData>, AppError> {
987        // Mark that we have at least one subscription on the price streamer
988        self.has_price_stream_subs = true;
989
990        let fields = get_streaming_price_fields(&fields);
991        let account_id = self.account_id.clone();
992        let price_epics: Vec<String> = epics
993            .iter()
994            .map(|epic| format!("PRICE:{account_id}:{epic}"))
995            .collect();
996
997        // Debug what we are about to subscribe to (items and fields)
998        tracing::debug!("Pricing subscribe items: {:?}", price_epics);
999        tracing::debug!("Pricing subscribe fields: {:?}", fields);
1000
1001        let mut subscription =
1002            Subscription::new(SubscriptionMode::Merge, Some(price_epics), Some(fields))?;
1003
1004        // Allow overriding the Pricing adapter name via env var to match server config
1005        let pricing_adapter =
1006            std::env::var("IG_PRICING_ADAPTER").unwrap_or_else(|_| "Pricing".to_string());
1007        tracing::debug!("Using Pricing data adapter: {}", pricing_adapter);
1008        subscription.set_data_adapter(Some(pricing_adapter))?;
1009        subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
1010
1011        // Create channel listener
1012        let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
1013        subscription.add_listener(Box::new(listener));
1014
1015        // Configure client and add subscription
1016        let client = self.price_streamer_client.as_ref().ok_or_else(|| {
1017            AppError::WebSocketError("price streamer client not initialized".to_string())
1018        })?;
1019
1020        {
1021            let mut client = client.lock().await;
1022            client
1023                .connection_options
1024                .set_forced_transport(Some(Transport::WsStreaming));
1025            LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription).await;
1026        }
1027
1028        // Create a channel for PriceData and spawn a task to convert ItemUpdate to PriceData
1029        let (price_tx, price_rx) = mpsc::unbounded_channel();
1030        tokio::spawn(async move {
1031            let mut receiver = item_receiver;
1032            while let Some(item_update) = receiver.recv().await {
1033                let price_data = PriceData::from(&item_update);
1034                let _ = price_tx.send(price_data);
1035            }
1036        });
1037
1038        info!(
1039            "Price subscription created for {} instruments (account: {})",
1040            epics.len(),
1041            account_id
1042        );
1043        Ok(price_rx)
1044    }
1045
1046    /// Connects all active Lightstreamer clients and maintains the connections.
1047    ///
1048    /// This method establishes connections for all streaming clients that have active
1049    /// subscriptions (market and price). Each client runs in its own task and
1050    /// all connections are maintained until a shutdown signal is received.
1051    ///
1052    /// # Arguments
1053    ///
1054    /// * `shutdown_signal` - Optional signal to gracefully shutdown all connections.
1055    ///   If None, a default signal handler for SIGINT/SIGTERM will be created.
1056    ///
1057    /// # Returns
1058    ///
1059    /// Returns `Ok(())` when all connections are closed gracefully, or an error if
1060    /// any connection fails after maximum retry attempts.
1061    ///
1062    pub async fn connect(&mut self, shutdown_signal: Option<Arc<Notify>>) -> Result<(), AppError> {
1063        // Use provided signal or create a new one with signal hooks
1064        let signal = if let Some(sig) = shutdown_signal {
1065            sig
1066        } else {
1067            let sig = Arc::new(Notify::new());
1068            setup_signal_hook(Arc::clone(&sig)).await;
1069            sig
1070        };
1071
1072        let mut tasks = Vec::new();
1073
1074        // Connect market streamer only if there are active subscriptions
1075        if self.has_market_stream_subs {
1076            if let Some(client) = self.market_streamer_client.as_ref() {
1077                let client = Arc::clone(client);
1078                let signal = Arc::clone(&signal);
1079                let task =
1080                    tokio::spawn(
1081                        async move { Self::connect_client(client, signal, "Market").await },
1082                    );
1083                tasks.push(task);
1084            }
1085        } else {
1086            info!("Skipping Market streamer connection: no active subscriptions");
1087        }
1088
1089        // Connect price streamer only if there are active subscriptions
1090        if self.has_price_stream_subs {
1091            if let Some(client) = self.price_streamer_client.as_ref() {
1092                let client = Arc::clone(client);
1093                let signal = Arc::clone(&signal);
1094                let task =
1095                    tokio::spawn(
1096                        async move { Self::connect_client(client, signal, "Price").await },
1097                    );
1098                tasks.push(task);
1099            }
1100        } else {
1101            info!("Skipping Price streamer connection: no active subscriptions");
1102        }
1103
1104        if tasks.is_empty() {
1105            warn!("No streaming clients selected for connection (no active subscriptions)");
1106            return Ok(());
1107        }
1108
1109        info!("Connecting {} streaming client(s)...", tasks.len());
1110
1111        // Wait for all tasks to complete
1112        let results = futures::future::join_all(tasks).await;
1113
1114        // Check if any task failed
1115        let mut has_error = false;
1116        for (idx, result) in results.iter().enumerate() {
1117            match result {
1118                Ok(Ok(_)) => {
1119                    debug!("Streaming client {} completed successfully", idx);
1120                }
1121                Ok(Err(e)) => {
1122                    error!("Streaming client {} failed: {:?}", idx, e);
1123                    has_error = true;
1124                }
1125                Err(e) => {
1126                    error!("Streaming client {} task panicked: {:?}", idx, e);
1127                    has_error = true;
1128                }
1129            }
1130        }
1131
1132        if has_error {
1133            return Err(AppError::WebSocketError(
1134                "one or more streaming connections failed".to_string(),
1135            ));
1136        }
1137
1138        info!("All streaming connections closed gracefully");
1139        Ok(())
1140    }
1141
1142    /// Internal helper to connect a single Lightstreamer client with retry logic.
1143    async fn connect_client(
1144        client: Arc<Mutex<LightstreamerClient>>,
1145        signal: Arc<Notify>,
1146        client_type: &str,
1147    ) -> Result<(), AppError> {
1148        let mut retry_interval_millis: u64 = 0;
1149        let mut retry_counter: u64 = 0;
1150
1151        while retry_counter < MAX_CONNECTION_ATTEMPTS {
1152            let connect_result = {
1153                let mut client = client.lock().await;
1154                client.connect_direct(Arc::clone(&signal)).await
1155            };
1156
1157            // Convert error to String immediately to avoid Send issues
1158            let result_with_string_error = connect_result.map_err(|e| format!("{:?}", e));
1159
1160            match result_with_string_error {
1161                Ok(_) => {
1162                    info!("{} streamer connected successfully", client_type);
1163                    break;
1164                }
1165                Err(error_msg) => {
1166                    // If server closed because there are no active subscriptions, treat as graceful
1167                    if error_msg.contains("No more requests to fulfill") {
1168                        info!(
1169                            "{} streamer closed gracefully: no active subscriptions (server reason: No more requests to fulfill)",
1170                            client_type
1171                        );
1172                        return Ok(());
1173                    }
1174
1175                    error!("{} streamer connection failed: {}", client_type, error_msg);
1176
1177                    if retry_counter < MAX_CONNECTION_ATTEMPTS - 1 {
1178                        tokio::time::sleep(std::time::Duration::from_millis(retry_interval_millis))
1179                            .await;
1180                        retry_interval_millis =
1181                            (retry_interval_millis + (200 * retry_counter)).min(5000);
1182                        retry_counter += 1;
1183                        warn!(
1184                            "{} streamer retrying (attempt {}/{}) in {:.2} seconds...",
1185                            client_type,
1186                            retry_counter + 1,
1187                            MAX_CONNECTION_ATTEMPTS,
1188                            retry_interval_millis as f64 / 1000.0
1189                        );
1190                    } else {
1191                        retry_counter += 1;
1192                    }
1193                }
1194            }
1195        }
1196
1197        if retry_counter >= MAX_CONNECTION_ATTEMPTS {
1198            error!(
1199                "{} streamer failed after {} attempts",
1200                client_type, MAX_CONNECTION_ATTEMPTS
1201            );
1202            return Err(AppError::WebSocketError(format!(
1203                "{} streamer: maximum connection attempts ({}) exceeded",
1204                client_type, MAX_CONNECTION_ATTEMPTS
1205            )));
1206        }
1207
1208        info!("{} streamer connection closed gracefully", client_type);
1209        Ok(())
1210    }
1211
1212    /// Disconnects all active Lightstreamer clients.
1213    ///
1214    /// This method gracefully closes all streaming connections (market and price).
1215    ///
1216    /// # Returns
1217    ///
1218    /// Returns `Ok(())` if all disconnections were successful.
1219    pub async fn disconnect(&mut self) -> Result<(), AppError> {
1220        let mut disconnected = 0;
1221
1222        if let Some(client) = self.market_streamer_client.as_ref() {
1223            let mut client = client.lock().await;
1224            client.disconnect().await;
1225            info!("Market streamer disconnected");
1226            disconnected += 1;
1227        }
1228
1229        if let Some(client) = self.price_streamer_client.as_ref() {
1230            let mut client = client.lock().await;
1231            client.disconnect().await;
1232            info!("Price streamer disconnected");
1233            disconnected += 1;
1234        }
1235
1236        info!("Disconnected {} streaming client(s)", disconnected);
1237        Ok(())
1238    }
1239}