ig_client/application/
client.rs

1/******************************************************************************
2   Author: Joaquín Béjar García
3   Email: jb@taunais.com
4   Date: 19/10/25
5******************************************************************************/
6use crate::application::auth::WebsocketInfo;
7use crate::application::interfaces::account::AccountService;
8use crate::application::interfaces::market::MarketService;
9use crate::application::interfaces::order::OrderService;
10use crate::error::AppError;
11use crate::model::http::HttpClient;
12use crate::model::requests::RecentPricesRequest;
13use crate::model::requests::{
14    ClosePositionRequest, CreateOrderRequest, CreateWorkingOrderRequest, UpdatePositionRequest,
15};
16use crate::model::responses::{
17    ClosePositionResponse, CreateOrderResponse, CreateWorkingOrderResponse, UpdatePositionResponse,
18};
19use crate::model::responses::{
20    DBEntryResponse, HistoricalPricesResponse, MarketNavigationResponse, MarketSearchResponse,
21    MultipleMarketDetailsResponse,
22};
23use crate::model::streaming::{
24    StreamingAccountDataField, StreamingMarketField, StreamingPriceField,
25    get_streaming_account_data_fields, get_streaming_market_fields, get_streaming_price_fields,
26};
27use crate::prelude::{
28    AccountActivityResponse, AccountFields, AccountsResponse, OrderConfirmationResponse,
29    PositionsResponse, TradeFields, TransactionHistoryResponse, WorkingOrdersResponse,
30};
31use crate::presentation::market::{MarketData, MarketDetails};
32use crate::presentation::price::PriceData;
33use async_trait::async_trait;
34use lightstreamer_rs::client::{LightstreamerClient, Transport};
35use lightstreamer_rs::subscription::{
36    ChannelSubscriptionListener, Snapshot, Subscription, SubscriptionMode,
37};
38use lightstreamer_rs::utils::setup_signal_hook;
39use serde_json::Value;
40use std::collections::HashSet;
41use std::sync::Arc;
42use tokio::sync::{Mutex, Notify, RwLock, mpsc};
43use tracing::{debug, error, info, warn};
44
45const MAX_CONNECTION_ATTEMPTS: u64 = 3;
46
47/// Main client for interacting with IG Markets API
48///
49/// This client provides a unified interface for all IG Markets API operations,
50/// including market data, account management, and order execution.
51pub struct Client {
52    http_client: Arc<HttpClient>,
53}
54
55impl Client {
56    /// Creates a new client instance
57    ///
58    /// # Returns
59    /// A new Client with default configuration
60    pub fn new() -> Self {
61        let http_client = Arc::new(HttpClient::default());
62        Self { http_client }
63    }
64
65    /// Gets WebSocket connection information for Lightstreamer
66    ///
67    /// # Returns
68    /// * `WebsocketInfo` containing server endpoint, authentication tokens, and account ID
69    pub async fn get_ws_info(&self) -> WebsocketInfo {
70        self.http_client.get_ws_info().await
71    }
72}
73
74impl Default for Client {
75    fn default() -> Self {
76        Self::new()
77    }
78}
79
80#[async_trait]
81impl MarketService for Client {
82    async fn search_markets(&self, search_term: &str) -> Result<MarketSearchResponse, AppError> {
83        let path = format!("markets?searchTerm={}", search_term);
84        info!("Searching markets with term: {}", search_term);
85        let result: MarketSearchResponse = self.http_client.get(&path, Some(1)).await?;
86        debug!("{} markets found", result.markets.len());
87        Ok(result)
88    }
89
90    async fn get_market_details(&self, epic: &str) -> Result<MarketDetails, AppError> {
91        let path = format!("markets/{epic}");
92        info!("Getting market details: {}", epic);
93        let market_value: Value = self.http_client.get(&path, Some(3)).await?;
94        let market_details: MarketDetails = serde_json::from_value(market_value)?;
95        debug!("Market details obtained for: {}", epic);
96        Ok(market_details)
97    }
98
99    async fn get_multiple_market_details(
100        &self,
101        epics: &[String],
102    ) -> Result<MultipleMarketDetailsResponse, AppError> {
103        if epics.is_empty() {
104            return Ok(MultipleMarketDetailsResponse::default());
105        } else if epics.len() > 50 {
106            return Err(AppError::InvalidInput(
107                "The maximum number of EPICs is 50".to_string(),
108            ));
109        }
110
111        let epics_str = epics.join(",");
112        let path = format!("markets?epics={}", epics_str);
113        debug!(
114            "Getting market details for {} EPICs in a batch",
115            epics.len()
116        );
117
118        let response: MultipleMarketDetailsResponse = self.http_client.get(&path, Some(2)).await?;
119
120        Ok(response)
121    }
122
123    async fn get_historical_prices(
124        &self,
125        epic: &str,
126        resolution: &str,
127        from: &str,
128        to: &str,
129    ) -> Result<HistoricalPricesResponse, AppError> {
130        let path = format!(
131            "prices/{}?resolution={}&from={}&to={}",
132            epic, resolution, from, to
133        );
134        info!("Getting historical prices for: {}", epic);
135        let result: HistoricalPricesResponse = self.http_client.get(&path, Some(3)).await?;
136        debug!("Historical prices obtained for: {}", epic);
137        Ok(result)
138    }
139
140    async fn get_historical_prices_by_date_range(
141        &self,
142        epic: &str,
143        resolution: &str,
144        start_date: &str,
145        end_date: &str,
146    ) -> Result<HistoricalPricesResponse, AppError> {
147        let path = format!("prices/{}/{}/{}/{}", epic, resolution, start_date, end_date);
148        info!(
149            "Getting historical prices for epic: {}, resolution: {}, from: {} to: {}",
150            epic, resolution, start_date, end_date
151        );
152        let result: HistoricalPricesResponse = self.http_client.get(&path, Some(2)).await?;
153        debug!(
154            "Historical prices obtained for epic: {}, {} data points",
155            epic,
156            result.prices.len()
157        );
158        Ok(result)
159    }
160
161    async fn get_recent_prices(
162        &self,
163        params: &RecentPricesRequest<'_>,
164    ) -> Result<HistoricalPricesResponse, AppError> {
165        let mut query_params = Vec::new();
166
167        if let Some(res) = params.resolution {
168            query_params.push(format!("resolution={}", res));
169        }
170        if let Some(f) = params.from {
171            query_params.push(format!("from={}", f));
172        }
173        if let Some(t) = params.to {
174            query_params.push(format!("to={}", t));
175        }
176        if let Some(max) = params.max_points {
177            query_params.push(format!("max={}", max));
178        }
179        if let Some(size) = params.page_size {
180            query_params.push(format!("pageSize={}", size));
181        }
182        if let Some(num) = params.page_number {
183            query_params.push(format!("pageNumber={}", num));
184        }
185
186        let query_string = if query_params.is_empty() {
187            String::new()
188        } else {
189            format!("?{}", query_params.join("&"))
190        };
191
192        let path = format!("prices/{}{}", params.epic, query_string);
193        info!("Getting recent prices for epic: {}", params.epic);
194        let result: HistoricalPricesResponse = self.http_client.get(&path, Some(3)).await?;
195        debug!(
196            "Recent prices obtained for epic: {}, {} data points",
197            params.epic,
198            result.prices.len()
199        );
200        Ok(result)
201    }
202
203    async fn get_historical_prices_by_count_v1(
204        &self,
205        epic: &str,
206        resolution: &str,
207        num_points: i32,
208    ) -> Result<HistoricalPricesResponse, AppError> {
209        let path = format!("prices/{}/{}/{}", epic, resolution, num_points);
210        info!(
211            "Getting historical prices (v1) for epic: {}, resolution: {}, points: {}",
212            epic, resolution, num_points
213        );
214        let result: HistoricalPricesResponse = self.http_client.get(&path, Some(1)).await?;
215        debug!(
216            "Historical prices (v1) obtained for epic: {}, {} data points",
217            epic,
218            result.prices.len()
219        );
220        Ok(result)
221    }
222
223    async fn get_historical_prices_by_count_v2(
224        &self,
225        epic: &str,
226        resolution: &str,
227        num_points: i32,
228    ) -> Result<HistoricalPricesResponse, AppError> {
229        let path = format!("prices/{}/{}/{}", epic, resolution, num_points);
230        info!(
231            "Getting historical prices (v2) for epic: {}, resolution: {}, points: {}",
232            epic, resolution, num_points
233        );
234        let result: HistoricalPricesResponse = self.http_client.get(&path, Some(2)).await?;
235        debug!(
236            "Historical prices (v2) obtained for epic: {}, {} data points",
237            epic,
238            result.prices.len()
239        );
240        Ok(result)
241    }
242
243    async fn get_market_navigation(&self) -> Result<MarketNavigationResponse, AppError> {
244        let path = "marketnavigation";
245        info!("Getting top-level market navigation nodes");
246        let result: MarketNavigationResponse = self.http_client.get(path, Some(1)).await?;
247        debug!("{} navigation nodes found", result.nodes.len());
248        debug!("{} markets found at root level", result.markets.len());
249        Ok(result)
250    }
251
252    async fn get_market_navigation_node(
253        &self,
254        node_id: &str,
255    ) -> Result<MarketNavigationResponse, AppError> {
256        let path = format!("marketnavigation/{}", node_id);
257        info!("Getting market navigation node: {}", node_id);
258        let result: MarketNavigationResponse = self.http_client.get(&path, Some(1)).await?;
259        debug!("{} child nodes found", result.nodes.len());
260        debug!("{} markets found in node {}", result.markets.len(), node_id);
261        Ok(result)
262    }
263
264    async fn get_all_markets(&self) -> Result<Vec<MarketData>, AppError> {
265        let max_depth = 6;
266        info!(
267            "Starting comprehensive market hierarchy traversal (max {} levels)",
268            max_depth
269        );
270
271        let root_response = self.get_market_navigation().await?;
272        info!(
273            "Root navigation: {} nodes, {} markets at top level",
274            root_response.nodes.len(),
275            root_response.markets.len()
276        );
277
278        let mut all_markets = root_response.markets.clone();
279        let mut nodes_to_process = root_response.nodes.clone();
280        let mut processed_levels = 0;
281
282        while !nodes_to_process.is_empty() && processed_levels < max_depth {
283            let mut next_level_nodes = Vec::new();
284            let mut level_market_count = 0;
285
286            info!(
287                "Processing level {} with {} nodes",
288                processed_levels,
289                nodes_to_process.len()
290            );
291
292            for node in &nodes_to_process {
293                match self.get_market_navigation_node(&node.id).await {
294                    Ok(node_response) => {
295                        let node_markets = node_response.markets.len();
296                        let node_children = node_response.nodes.len();
297
298                        if node_markets > 0 || node_children > 0 {
299                            debug!(
300                                "Node '{}' (level {}): {} markets, {} child nodes",
301                                node.name, processed_levels, node_markets, node_children
302                            );
303                        }
304
305                        all_markets.extend(node_response.markets);
306                        level_market_count += node_markets;
307                        next_level_nodes.extend(node_response.nodes);
308                    }
309                    Err(e) => {
310                        tracing::error!(
311                            "Failed to get markets for node '{}' at level {}: {:?}",
312                            node.name,
313                            processed_levels,
314                            e
315                        );
316                    }
317                }
318            }
319
320            info!(
321                "Level {} completed: {} markets found, {} nodes for next level",
322                processed_levels,
323                level_market_count,
324                next_level_nodes.len()
325            );
326
327            nodes_to_process = next_level_nodes;
328            processed_levels += 1;
329        }
330
331        info!(
332            "Market hierarchy traversal completed: {} total markets found across {} levels",
333            all_markets.len(),
334            processed_levels
335        );
336
337        Ok(all_markets)
338    }
339
340    async fn get_vec_db_entries(&self) -> Result<Vec<DBEntryResponse>, AppError> {
341        info!("Getting all markets from hierarchy for DB entries");
342
343        let all_markets = self.get_all_markets().await?;
344        info!("Collected {} markets from hierarchy", all_markets.len());
345
346        let mut vec_db_entries: Vec<DBEntryResponse> = all_markets
347            .iter()
348            .map(DBEntryResponse::from)
349            .filter(|entry| !entry.epic.is_empty())
350            .collect();
351
352        info!("Created {} DB entries from markets", vec_db_entries.len());
353
354        // Collect unique symbols
355        let unique_symbols: std::collections::HashSet<String> = vec_db_entries
356            .iter()
357            .map(|entry| entry.symbol.clone())
358            .filter(|symbol| !symbol.is_empty())
359            .collect();
360
361        info!(
362            "Found {} unique symbols to fetch expiry dates for",
363            unique_symbols.len()
364        );
365
366        let mut symbol_expiry_map: std::collections::HashMap<String, String> =
367            std::collections::HashMap::new();
368
369        for symbol in unique_symbols {
370            if let Some(entry) = vec_db_entries
371                .iter()
372                .find(|e| e.symbol == symbol && !e.epic.is_empty())
373            {
374                match self.get_market_details(&entry.epic).await {
375                    Ok(market_details) => {
376                        let expiry_date = market_details
377                            .instrument
378                            .expiry_details
379                            .as_ref()
380                            .map(|details| details.last_dealing_date.clone())
381                            .unwrap_or_else(|| market_details.instrument.expiry.clone());
382
383                        symbol_expiry_map.insert(symbol.clone(), expiry_date);
384                        info!(
385                            "Fetched expiry date for symbol {}: {}",
386                            symbol,
387                            symbol_expiry_map.get(&symbol).unwrap()
388                        );
389                    }
390                    Err(e) => {
391                        tracing::error!(
392                            "Failed to get market details for epic {} (symbol {}): {:?}",
393                            entry.epic,
394                            symbol,
395                            e
396                        );
397                        symbol_expiry_map.insert(symbol.clone(), entry.expiry.clone());
398                    }
399                }
400            }
401        }
402
403        for entry in &mut vec_db_entries {
404            if let Some(expiry_date) = symbol_expiry_map.get(&entry.symbol) {
405                entry.expiry = expiry_date.clone();
406            }
407        }
408
409        info!("Updated expiry dates for {} entries", vec_db_entries.len());
410        Ok(vec_db_entries)
411    }
412}
413
414#[async_trait]
415impl AccountService for Client {
416    async fn get_accounts(&self) -> Result<AccountsResponse, AppError> {
417        info!("Getting account information");
418        let result: AccountsResponse = self.http_client.get("accounts", Some(1)).await?;
419        debug!(
420            "Account information obtained: {} accounts",
421            result.accounts.len()
422        );
423        Ok(result)
424    }
425
426    async fn get_positions(&self) -> Result<PositionsResponse, AppError> {
427        debug!("Getting open positions");
428        let result: PositionsResponse = self.http_client.get("positions", Some(2)).await?;
429        debug!("Positions obtained: {} positions", result.positions.len());
430        Ok(result)
431    }
432
433    async fn get_positions_w_filter(&self, filter: &str) -> Result<PositionsResponse, AppError> {
434        debug!("Getting open positions with filter: {}", filter);
435        let mut positions = self.get_positions().await?;
436
437        positions
438            .positions
439            .retain(|position| position.market.epic.contains(filter));
440
441        debug!(
442            "Positions obtained after filtering: {} positions",
443            positions.positions.len()
444        );
445        Ok(positions)
446    }
447
448    async fn get_working_orders(&self) -> Result<WorkingOrdersResponse, AppError> {
449        info!("Getting working orders");
450        let result: WorkingOrdersResponse = self.http_client.get("workingorders", Some(2)).await?;
451        debug!(
452            "Working orders obtained: {} orders",
453            result.working_orders.len()
454        );
455        Ok(result)
456    }
457
458    async fn get_activity(
459        &self,
460        from: &str,
461        to: &str,
462    ) -> Result<AccountActivityResponse, AppError> {
463        let path = format!("history/activity?from={}&to={}&pageSize=500", from, to);
464        info!("Getting account activity");
465        let result: AccountActivityResponse = self.http_client.get(&path, Some(3)).await?;
466        debug!(
467            "Account activity obtained: {} activities",
468            result.activities.len()
469        );
470        Ok(result)
471    }
472
473    async fn get_activity_with_details(
474        &self,
475        from: &str,
476        to: &str,
477    ) -> Result<AccountActivityResponse, AppError> {
478        let path = format!(
479            "history/activity?from={}&to={}&detailed=true&pageSize=500",
480            from, to
481        );
482        info!("Getting detailed account activity");
483        let result: AccountActivityResponse = self.http_client.get(&path, Some(3)).await?;
484        debug!(
485            "Detailed account activity obtained: {} activities",
486            result.activities.len()
487        );
488        Ok(result)
489    }
490
491    async fn get_transactions(
492        &self,
493        from: &str,
494        to: &str,
495    ) -> Result<TransactionHistoryResponse, AppError> {
496        const PAGE_SIZE: u32 = 200;
497        let mut all_transactions = Vec::new();
498        let mut current_page = 1;
499        #[allow(unused_assignments)]
500        let mut last_metadata = None;
501
502        loop {
503            let path = format!(
504                "history/transactions?from={}&to={}&pageSize={}&pageNumber={}",
505                from, to, PAGE_SIZE, current_page
506            );
507            info!("Getting transaction history page {}", current_page);
508
509            let result: TransactionHistoryResponse = self.http_client.get(&path, Some(2)).await?;
510
511            let total_pages = result.metadata.page_data.total_pages as u32;
512            last_metadata = Some(result.metadata);
513            all_transactions.extend(result.transactions);
514
515            if current_page >= total_pages {
516                break;
517            }
518            current_page += 1;
519        }
520
521        debug!(
522            "Total transaction history obtained: {} transactions",
523            all_transactions.len()
524        );
525
526        Ok(TransactionHistoryResponse {
527            transactions: all_transactions,
528            metadata: last_metadata
529                .ok_or_else(|| AppError::InvalidInput("Could not retrieve metadata".to_string()))?,
530        })
531    }
532}
533
534#[async_trait]
535impl OrderService for Client {
536    async fn create_order(
537        &self,
538        order: &CreateOrderRequest,
539    ) -> Result<CreateOrderResponse, AppError> {
540        info!("Creating order for: {}", order.epic);
541        let result: CreateOrderResponse = self
542            .http_client
543            .post("positions/otc", order, Some(2))
544            .await?;
545        debug!("Order created with reference: {}", result.deal_reference);
546        Ok(result)
547    }
548
549    async fn get_order_confirmation(
550        &self,
551        deal_reference: &str,
552    ) -> Result<OrderConfirmationResponse, AppError> {
553        let path = format!("confirms/{}", deal_reference);
554        info!("Getting confirmation for order: {}", deal_reference);
555        let result: OrderConfirmationResponse = self.http_client.get(&path, Some(1)).await?;
556        debug!("Confirmation obtained for order: {}", deal_reference);
557        Ok(result)
558    }
559
560    async fn update_position(
561        &self,
562        deal_id: &str,
563        update: &UpdatePositionRequest,
564    ) -> Result<UpdatePositionResponse, AppError> {
565        let path = format!("positions/otc/{}", deal_id);
566        info!("Updating position: {}", deal_id);
567        let result: UpdatePositionResponse = self.http_client.put(&path, update, Some(2)).await?;
568        debug!(
569            "Position updated: {} with deal reference: {}",
570            deal_id, result.deal_reference
571        );
572        Ok(result)
573    }
574
575    async fn close_position(
576        &self,
577        close_request: &ClosePositionRequest,
578    ) -> Result<ClosePositionResponse, AppError> {
579        info!("Closing position");
580
581        // IG API requires POST with _method: DELETE header for closing positions
582        // This is a workaround for HTTP client limitations with DELETE + body
583        let result: ClosePositionResponse = self
584            .http_client
585            .post_with_delete_method("positions/otc", close_request, Some(1))
586            .await?;
587
588        debug!("Position closed with reference: {}", result.deal_reference);
589        Ok(result)
590    }
591
592    async fn create_working_order(
593        &self,
594        order: &CreateWorkingOrderRequest,
595    ) -> Result<CreateWorkingOrderResponse, AppError> {
596        info!("Creating working order for: {}", order.epic);
597        let result: CreateWorkingOrderResponse = self
598            .http_client
599            .post("workingorders/otc", order, Some(2))
600            .await?;
601        debug!(
602            "Working order created with reference: {}",
603            result.deal_reference
604        );
605        Ok(result)
606    }
607
608    async fn delete_working_order(&self, deal_id: &str) -> Result<(), AppError> {
609        let path = format!("workingorders/otc/{}", deal_id);
610        let result: CreateWorkingOrderResponse =
611            self.http_client.delete(path.as_str(), Some(2)).await?;
612        debug!(
613            "Working order created with reference: {}",
614            result.deal_reference
615        );
616        Ok(())
617    }
618}
619
620/// Streaming client for IG Markets real-time data.
621///
622/// This client manages two Lightstreamer connections for different data types:
623/// - **Market streamer**: Handles market data (prices, market state), trade updates (CONFIRMS, OPU, WOU),
624///   and account updates (positions, orders, balance). Uses the default adapter.
625/// - **Price streamer**: Handles detailed price data (bid/ask levels, sizes, multiple currencies).
626///   Uses the "Pricing" adapter.
627///
628/// Each connection type can be managed independently and runs in parallel.
629pub struct StreamerClient {
630    account_id: String,
631    market_streamer_client: Option<Arc<Mutex<LightstreamerClient>>>,
632    price_streamer_client: Option<Arc<Mutex<LightstreamerClient>>>,
633    // Flags indicating whether there is at least one active subscription for each client
634    has_market_stream_subs: bool,
635    has_price_stream_subs: bool,
636}
637
638impl StreamerClient {
639    /// Creates a new streaming client instance.
640    ///
641    /// This initializes both streaming clients (market and price) but does not
642    /// establish connections yet. Connections are established when `connect()` is called.
643    ///
644    /// # Returns
645    ///
646    /// Returns a new `StreamerClient` instance or an error if initialization fails.
647    pub async fn new() -> Result<Self, AppError> {
648        let http_client_raw = Arc::new(RwLock::new(Client::new()));
649        let http_client = http_client_raw.read().await;
650        let ws_info = http_client.get_ws_info().await;
651        let password = ws_info.get_ws_password();
652
653        // Market data client (no adapter specified - uses default)
654        let market_streamer_client = Arc::new(Mutex::new(LightstreamerClient::new(
655            Some(ws_info.server.as_str()),
656            None,
657            Some(&ws_info.account_id),
658            Some(&password),
659        )?));
660
661        let price_streamer_client = Arc::new(Mutex::new(LightstreamerClient::new(
662            Some(ws_info.server.as_str()),
663            None,
664            Some(&ws_info.account_id),
665            Some(&password),
666        )?));
667
668        // Force WebSocket streaming transport on both clients to satisfy IG requirements
669        {
670            let mut client = market_streamer_client.lock().await;
671            client
672                .connection_options
673                .set_forced_transport(Some(Transport::WsStreaming));
674        }
675        {
676            let mut client = price_streamer_client.lock().await;
677            client
678                .connection_options
679                .set_forced_transport(Some(Transport::WsStreaming));
680        }
681
682        Ok(Self {
683            account_id: ws_info.account_id.clone(),
684            market_streamer_client: Some(market_streamer_client),
685            price_streamer_client: Some(price_streamer_client),
686            has_market_stream_subs: false,
687            has_price_stream_subs: false,
688        })
689    }
690
691    /// Creates a default streaming client instance.
692    pub async fn default() -> Result<Self, AppError> {
693        Self::new().await
694    }
695
696    /// Subscribes to market data updates for the specified instruments.
697    ///
698    /// This method creates a subscription to receive real-time market data updates
699    /// for the given EPICs and returns a channel receiver for consuming the updates.
700    ///
701    /// # Arguments
702    ///
703    /// * `epics` - List of instrument EPICs to subscribe to
704    /// * `fields` - Set of market data fields to receive (e.g., BID, OFFER, etc.)
705    ///
706    /// # Returns
707    ///
708    /// Returns a receiver channel for `PriceData` updates, or an error if
709    /// the subscription setup failed.
710    ///
711    /// # Examples
712    ///
713    /// ```ignore
714    /// let mut receiver = client.market_subscribe(
715    ///     vec!["IX.D.DAX.DAILY.IP".to_string()],
716    ///     fields
717    /// ).await?;
718    ///
719    /// tokio::spawn(async move {
720    ///     while let Some(price_data) = receiver.recv().await {
721    ///         println!("Price update: {:?}", price_data);
722    ///     }
723    /// });
724    /// ```
725    pub async fn market_subscribe(
726        &mut self,
727        epics: Vec<String>,
728        fields: HashSet<StreamingMarketField>,
729    ) -> Result<mpsc::UnboundedReceiver<PriceData>, AppError> {
730        // Mark that we have at least one subscription on the market streamer
731        self.has_market_stream_subs = true;
732
733        let fields = get_streaming_market_fields(&fields);
734        let market_epics: Vec<String> = epics
735            .iter()
736            .map(|epic| "MARKET:".to_string() + epic)
737            .collect();
738        let mut subscription =
739            Subscription::new(SubscriptionMode::Merge, Some(market_epics), Some(fields))?;
740
741        subscription.set_data_adapter(None)?;
742        subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
743
744        // Create channel listener that converts ItemUpdate to PriceData
745        let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
746        subscription.add_listener(Box::new(listener));
747
748        // Configure client and add subscription
749        let client = self.market_streamer_client.as_ref().ok_or_else(|| {
750            AppError::WebSocketError("market streamer client not initialized".to_string())
751        })?;
752
753        {
754            let mut client = client.lock().await;
755            client
756                .connection_options
757                .set_forced_transport(Some(Transport::WsStreaming));
758            LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription).await;
759        }
760
761        // Create a channel for PriceData and spawn a task to convert ItemUpdate to PriceData
762        let (price_tx, price_rx) = mpsc::unbounded_channel();
763        tokio::spawn(async move {
764            let mut receiver = item_receiver;
765            while let Some(item_update) = receiver.recv().await {
766                let price_data = PriceData::from(&item_update);
767                let _ = price_tx.send(price_data);
768            }
769        });
770
771        info!(
772            "Market subscription created for {} instruments",
773            epics.len()
774        );
775        Ok(price_rx)
776    }
777
778    /// Subscribes to trade updates for the account.
779    ///
780    /// This method creates a subscription to receive real-time trade confirmations,
781    /// order updates (OPU), and working order updates (WOU) for the account,
782    /// and returns a channel receiver for consuming the updates.
783    ///
784    /// # Returns
785    ///
786    /// Returns a receiver channel for `TradeFields` updates, or an error if
787    /// the subscription setup failed.
788    ///
789    /// # Examples
790    ///
791    /// ```ignore
792    /// let mut receiver = client.trade_subscribe().await?;
793    ///
794    /// tokio::spawn(async move {
795    ///     while let Some(trade_fields) = receiver.recv().await {
796    ///         println!("Trade update: {:?}", trade_fields);
797    ///     }
798    /// });
799    /// ```
800    pub async fn trade_subscribe(
801        &mut self,
802    ) -> Result<mpsc::UnboundedReceiver<TradeFields>, AppError> {
803        // Mark that we have at least one subscription on the market streamer
804        self.has_market_stream_subs = true;
805
806        let account_id = self.account_id.clone();
807        let fields = Some(vec![
808            "CONFIRMS".to_string(),
809            "OPU".to_string(),
810            "WOU".to_string(),
811        ]);
812        let trade_items = vec![format!("TRADE:{account_id}")];
813
814        let mut subscription =
815            Subscription::new(SubscriptionMode::Distinct, Some(trade_items), fields)?;
816
817        subscription.set_data_adapter(None)?;
818        subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
819
820        // Create channel listener
821        let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
822        subscription.add_listener(Box::new(listener));
823
824        // Configure client and add subscription (reusing market_streamer_client)
825        let client = self.market_streamer_client.as_ref().ok_or_else(|| {
826            AppError::WebSocketError("market streamer client not initialized".to_string())
827        })?;
828
829        {
830            let mut client = client.lock().await;
831            client
832                .connection_options
833                .set_forced_transport(Some(Transport::WsStreaming));
834            LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription).await;
835        }
836
837        // Create a channel for TradeFields and spawn a task to convert ItemUpdate to TradeFields
838        let (trade_tx, trade_rx) = mpsc::unbounded_channel();
839        tokio::spawn(async move {
840            let mut receiver = item_receiver;
841            while let Some(item_update) = receiver.recv().await {
842                let trade_data = crate::presentation::trade::TradeData::from(&item_update);
843                let _ = trade_tx.send(trade_data.fields);
844            }
845        });
846
847        info!("Trade subscription created for account: {}", account_id);
848        Ok(trade_rx)
849    }
850
851    /// Subscribes to account data updates.
852    ///
853    /// This method creates a subscription to receive real-time account updates including
854    /// profit/loss, margin, equity, available funds, and other account metrics,
855    /// and returns a channel receiver for consuming the updates.
856    ///
857    /// # Arguments
858    ///
859    /// * `fields` - Set of account data fields to receive (e.g., PNL, MARGIN, EQUITY, etc.)
860    ///
861    /// # Returns
862    ///
863    /// Returns a receiver channel for `AccountFields` updates, or an error if
864    /// the subscription setup failed.
865    ///
866    /// # Examples
867    ///
868    /// ```ignore
869    /// let mut receiver = client.account_subscribe(fields).await?;
870    ///
871    /// tokio::spawn(async move {
872    ///     while let Some(account_fields) = receiver.recv().await {
873    ///         println!("Account update: {:?}", account_fields);
874    ///     }
875    /// });
876    /// ```
877    pub async fn account_subscribe(
878        &mut self,
879        fields: HashSet<StreamingAccountDataField>,
880    ) -> Result<mpsc::UnboundedReceiver<AccountFields>, AppError> {
881        // Mark that we have at least one subscription on the market streamer
882        self.has_market_stream_subs = true;
883
884        let fields = get_streaming_account_data_fields(&fields);
885        let account_id = self.account_id.clone();
886        let account_items = vec![format!("ACCOUNT:{account_id}")];
887
888        let mut subscription =
889            Subscription::new(SubscriptionMode::Merge, Some(account_items), Some(fields))?;
890
891        subscription.set_data_adapter(None)?;
892        subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
893
894        // Create channel listener
895        let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
896        subscription.add_listener(Box::new(listener));
897
898        // Configure client and add subscription (reusing market_streamer_client)
899        let client = self.market_streamer_client.as_ref().ok_or_else(|| {
900            AppError::WebSocketError("market streamer client not initialized".to_string())
901        })?;
902
903        {
904            let mut client = client.lock().await;
905            client
906                .connection_options
907                .set_forced_transport(Some(Transport::WsStreaming));
908            LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription).await;
909        }
910
911        // Create a channel for AccountFields and spawn a task to convert ItemUpdate to AccountFields
912        let (account_tx, account_rx) = mpsc::unbounded_channel();
913        tokio::spawn(async move {
914            let mut receiver = item_receiver;
915            while let Some(item_update) = receiver.recv().await {
916                let account_data = crate::presentation::account::AccountData::from(&item_update);
917                let _ = account_tx.send(account_data.fields);
918            }
919        });
920
921        info!("Account subscription created for account: {}", account_id);
922        Ok(account_rx)
923    }
924
925    /// Subscribes to price data updates for the specified instruments.
926    ///
927    /// This method creates a subscription to receive real-time price updates including
928    /// bid/ask prices, sizes, and multiple currency levels for the given EPICs,
929    /// and returns a channel receiver for consuming the updates.
930    ///
931    /// # Arguments
932    ///
933    /// * `epics` - List of instrument EPICs to subscribe to
934    /// * `fields` - Set of price data fields to receive (e.g., BID_PRICE1, ASK_PRICE1, etc.)
935    ///
936    /// # Returns
937    ///
938    /// Returns a receiver channel for `PriceData` updates, or an error if
939    /// the subscription setup failed.
940    ///
941    /// # Examples
942    ///
943    /// ```ignore
944    /// let mut receiver = client.price_subscribe(
945    ///     vec!["IX.D.DAX.DAILY.IP".to_string()],
946    ///     fields
947    /// ).await?;
948    ///
949    /// tokio::spawn(async move {
950    ///     while let Some(price_data) = receiver.recv().await {
951    ///         println!("Price update: {:?}", price_data);
952    ///     }
953    /// });
954    /// ```
955    pub async fn price_subscribe(
956        &mut self,
957        epics: Vec<String>,
958        fields: HashSet<StreamingPriceField>,
959    ) -> Result<mpsc::UnboundedReceiver<PriceData>, AppError> {
960        // Mark that we have at least one subscription on the price streamer
961        self.has_price_stream_subs = true;
962
963        let fields = get_streaming_price_fields(&fields);
964        let account_id = self.account_id.clone();
965        let price_epics: Vec<String> = epics
966            .iter()
967            .map(|epic| format!("PRICE:{account_id}:{epic}"))
968            .collect();
969
970        // Debug what we are about to subscribe to (items and fields)
971        tracing::debug!("Pricing subscribe items: {:?}", price_epics);
972        tracing::debug!("Pricing subscribe fields: {:?}", fields);
973
974        let mut subscription =
975            Subscription::new(SubscriptionMode::Merge, Some(price_epics), Some(fields))?;
976
977        // Allow overriding the Pricing adapter name via env var to match server config
978        let pricing_adapter =
979            std::env::var("IG_PRICING_ADAPTER").unwrap_or_else(|_| "Pricing".to_string());
980        tracing::debug!("Using Pricing data adapter: {}", pricing_adapter);
981        subscription.set_data_adapter(Some(pricing_adapter))?;
982        subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
983
984        // Create channel listener
985        let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
986        subscription.add_listener(Box::new(listener));
987
988        // Configure client and add subscription
989        let client = self.price_streamer_client.as_ref().ok_or_else(|| {
990            AppError::WebSocketError("price streamer client not initialized".to_string())
991        })?;
992
993        {
994            let mut client = client.lock().await;
995            client
996                .connection_options
997                .set_forced_transport(Some(Transport::WsStreaming));
998            LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription).await;
999        }
1000
1001        // Create a channel for PriceData and spawn a task to convert ItemUpdate to PriceData
1002        let (price_tx, price_rx) = mpsc::unbounded_channel();
1003        tokio::spawn(async move {
1004            let mut receiver = item_receiver;
1005            while let Some(item_update) = receiver.recv().await {
1006                let price_data = PriceData::from(&item_update);
1007                let _ = price_tx.send(price_data);
1008            }
1009        });
1010
1011        info!(
1012            "Price subscription created for {} instruments (account: {})",
1013            epics.len(),
1014            account_id
1015        );
1016        Ok(price_rx)
1017    }
1018
1019    /// Connects all active Lightstreamer clients and maintains the connections.
1020    ///
1021    /// This method establishes connections for all streaming clients that have active
1022    /// subscriptions (market and price). Each client runs in its own task and
1023    /// all connections are maintained until a shutdown signal is received.
1024    ///
1025    /// # Arguments
1026    ///
1027    /// * `shutdown_signal` - Optional signal to gracefully shutdown all connections.
1028    ///   If None, a default signal handler for SIGINT/SIGTERM will be created.
1029    ///
1030    /// # Returns
1031    ///
1032    /// Returns `Ok(())` when all connections are closed gracefully, or an error if
1033    /// any connection fails after maximum retry attempts.
1034    ///
1035    pub async fn connect(&mut self, shutdown_signal: Option<Arc<Notify>>) -> Result<(), AppError> {
1036        // Use provided signal or create a new one with signal hooks
1037        let signal = if let Some(sig) = shutdown_signal {
1038            sig
1039        } else {
1040            let sig = Arc::new(Notify::new());
1041            setup_signal_hook(Arc::clone(&sig)).await;
1042            sig
1043        };
1044
1045        let mut tasks = Vec::new();
1046
1047        // Connect market streamer only if there are active subscriptions
1048        if self.has_market_stream_subs {
1049            if let Some(client) = self.market_streamer_client.as_ref() {
1050                let client = Arc::clone(client);
1051                let signal = Arc::clone(&signal);
1052                let task =
1053                    tokio::spawn(
1054                        async move { Self::connect_client(client, signal, "Market").await },
1055                    );
1056                tasks.push(task);
1057            }
1058        } else {
1059            info!("Skipping Market streamer connection: no active subscriptions");
1060        }
1061
1062        // Connect price streamer only if there are active subscriptions
1063        if self.has_price_stream_subs {
1064            if let Some(client) = self.price_streamer_client.as_ref() {
1065                let client = Arc::clone(client);
1066                let signal = Arc::clone(&signal);
1067                let task =
1068                    tokio::spawn(
1069                        async move { Self::connect_client(client, signal, "Price").await },
1070                    );
1071                tasks.push(task);
1072            }
1073        } else {
1074            info!("Skipping Price streamer connection: no active subscriptions");
1075        }
1076
1077        if tasks.is_empty() {
1078            warn!("No streaming clients selected for connection (no active subscriptions)");
1079            return Ok(());
1080        }
1081
1082        info!("Connecting {} streaming client(s)...", tasks.len());
1083
1084        // Wait for all tasks to complete
1085        let results = futures::future::join_all(tasks).await;
1086
1087        // Check if any task failed
1088        let mut has_error = false;
1089        for (idx, result) in results.iter().enumerate() {
1090            match result {
1091                Ok(Ok(_)) => {
1092                    debug!("Streaming client {} completed successfully", idx);
1093                }
1094                Ok(Err(e)) => {
1095                    error!("Streaming client {} failed: {:?}", idx, e);
1096                    has_error = true;
1097                }
1098                Err(e) => {
1099                    error!("Streaming client {} task panicked: {:?}", idx, e);
1100                    has_error = true;
1101                }
1102            }
1103        }
1104
1105        if has_error {
1106            return Err(AppError::WebSocketError(
1107                "one or more streaming connections failed".to_string(),
1108            ));
1109        }
1110
1111        info!("All streaming connections closed gracefully");
1112        Ok(())
1113    }
1114
1115    /// Internal helper to connect a single Lightstreamer client with retry logic.
1116    async fn connect_client(
1117        client: Arc<Mutex<LightstreamerClient>>,
1118        signal: Arc<Notify>,
1119        client_type: &str,
1120    ) -> Result<(), AppError> {
1121        let mut retry_interval_millis: u64 = 0;
1122        let mut retry_counter: u64 = 0;
1123
1124        while retry_counter < MAX_CONNECTION_ATTEMPTS {
1125            let connect_result = {
1126                let mut client = client.lock().await;
1127                client.connect_direct(Arc::clone(&signal)).await
1128            };
1129
1130            // Convert error to String immediately to avoid Send issues
1131            let result_with_string_error = connect_result.map_err(|e| format!("{:?}", e));
1132
1133            match result_with_string_error {
1134                Ok(_) => {
1135                    info!("{} streamer connected successfully", client_type);
1136                    break;
1137                }
1138                Err(error_msg) => {
1139                    // If server closed because there are no active subscriptions, treat as graceful
1140                    if error_msg.contains("No more requests to fulfill") {
1141                        info!(
1142                            "{} streamer closed gracefully: no active subscriptions (server reason: No more requests to fulfill)",
1143                            client_type
1144                        );
1145                        return Ok(());
1146                    }
1147
1148                    error!("{} streamer connection failed: {}", client_type, error_msg);
1149
1150                    if retry_counter < MAX_CONNECTION_ATTEMPTS - 1 {
1151                        tokio::time::sleep(std::time::Duration::from_millis(retry_interval_millis))
1152                            .await;
1153                        retry_interval_millis =
1154                            (retry_interval_millis + (200 * retry_counter)).min(5000);
1155                        retry_counter += 1;
1156                        warn!(
1157                            "{} streamer retrying (attempt {}/{}) in {:.2} seconds...",
1158                            client_type,
1159                            retry_counter + 1,
1160                            MAX_CONNECTION_ATTEMPTS,
1161                            retry_interval_millis as f64 / 1000.0
1162                        );
1163                    } else {
1164                        retry_counter += 1;
1165                    }
1166                }
1167            }
1168        }
1169
1170        if retry_counter >= MAX_CONNECTION_ATTEMPTS {
1171            error!(
1172                "{} streamer failed after {} attempts",
1173                client_type, MAX_CONNECTION_ATTEMPTS
1174            );
1175            return Err(AppError::WebSocketError(format!(
1176                "{} streamer: maximum connection attempts ({}) exceeded",
1177                client_type, MAX_CONNECTION_ATTEMPTS
1178            )));
1179        }
1180
1181        info!("{} streamer connection closed gracefully", client_type);
1182        Ok(())
1183    }
1184
1185    /// Disconnects all active Lightstreamer clients.
1186    ///
1187    /// This method gracefully closes all streaming connections (market and price).
1188    ///
1189    /// # Returns
1190    ///
1191    /// Returns `Ok(())` if all disconnections were successful.
1192    pub async fn disconnect(&mut self) -> Result<(), AppError> {
1193        let mut disconnected = 0;
1194
1195        if let Some(client) = self.market_streamer_client.as_ref() {
1196            let mut client = client.lock().await;
1197            client.disconnect().await;
1198            info!("Market streamer disconnected");
1199            disconnected += 1;
1200        }
1201
1202        if let Some(client) = self.price_streamer_client.as_ref() {
1203            let mut client = client.lock().await;
1204            client.disconnect().await;
1205            info!("Price streamer disconnected");
1206            disconnected += 1;
1207        }
1208
1209        info!("Disconnected {} streaming client(s)", disconnected);
1210        Ok(())
1211    }
1212}