Skip to main content

ig_client/application/
client.rs

1/******************************************************************************
2   Author: Joaquín Béjar García
3   Email: jb@taunais.com
4   Date: 19/10/25
5******************************************************************************/
6use crate::application::auth::WebsocketInfo;
7use crate::application::interfaces::account::AccountService;
8use crate::application::interfaces::costs::CostsService;
9use crate::application::interfaces::market::MarketService;
10use crate::application::interfaces::operations::OperationsService;
11use crate::application::interfaces::order::OrderService;
12use crate::application::interfaces::sentiment::SentimentService;
13use crate::application::interfaces::watchlist::WatchlistService;
14use crate::error::AppError;
15use crate::model::http::HttpClient;
16use crate::model::requests::RecentPricesRequest;
17use crate::model::requests::{
18    AddToWatchlistRequest, CloseCostsRequest, CreateWatchlistRequest, EditCostsRequest,
19    OpenCostsRequest, UpdateWorkingOrderRequest,
20};
21use crate::model::requests::{
22    ClosePositionRequest, CreateOrderRequest, CreateWorkingOrderRequest, UpdatePositionRequest,
23};
24use crate::model::responses::{
25    AccountPreferencesResponse, ApplicationDetailsResponse, CategoriesResponse,
26    CategoryInstrumentsResponse, ClientSentimentResponse, CostsHistoryResponse,
27    CreateWatchlistResponse, DBEntryResponse, DurableMediumResponse, HistoricalPricesResponse,
28    IndicativeCostsResponse, MarketNavigationResponse, MarketSearchResponse, MarketSentiment,
29    MultipleMarketDetailsResponse, SinglePositionResponse, StatusResponse,
30    WatchlistMarketsResponse, WatchlistsResponse,
31};
32use crate::model::responses::{
33    ClosePositionResponse, CreateOrderResponse, CreateWorkingOrderResponse, UpdatePositionResponse,
34};
35use crate::model::streaming::{
36    StreamingAccountDataField, StreamingChartField, StreamingMarketField, StreamingPriceField,
37    get_streaming_account_data_fields, get_streaming_chart_fields, get_streaming_market_fields,
38    get_streaming_price_fields,
39};
40use crate::prelude::{
41    AccountActivityResponse, AccountFields, AccountsResponse, ChartData, ChartScale,
42    OrderConfirmationResponse, PositionsResponse, TradeFields, TransactionHistoryResponse,
43    WorkingOrdersResponse,
44};
45use crate::presentation::market::{MarketData, MarketDetails};
46use crate::presentation::price::PriceData;
47use async_trait::async_trait;
48use lightstreamer_rs::client::{LightstreamerClient, LogType, Transport};
49use lightstreamer_rs::subscription::{
50    ChannelSubscriptionListener, Snapshot, Subscription, SubscriptionMode,
51};
52use lightstreamer_rs::utils::setup_signal_hook;
53use serde_json::Value;
54use std::collections::HashSet;
55use std::sync::Arc;
56use std::time::Duration;
57use tokio::sync::{Mutex, Notify, RwLock, mpsc};
58use tokio::time::sleep;
59use tracing::{debug, error, info, warn};
60
61const MAX_CONNECTION_ATTEMPTS: u64 = 3;
62
63/// Main client for interacting with IG Markets API
64///
65/// This client provides a unified interface for all IG Markets API operations,
66/// including market data, account management, and order execution.
67pub struct Client {
68    http_client: Arc<HttpClient>,
69}
70
71impl Client {
72    /// Creates a new client instance
73    ///
74    /// # Returns
75    /// A new Client with default configuration
76    pub fn new() -> Self {
77        let http_client = Arc::new(HttpClient::default());
78        Self { http_client }
79    }
80
81    /// Gets WebSocket connection information for Lightstreamer
82    ///
83    /// # Returns
84    /// * `WebsocketInfo` containing server endpoint, authentication tokens, and account ID
85    pub async fn get_ws_info(&self) -> WebsocketInfo {
86        self.http_client.get_ws_info().await
87    }
88}
89
90impl Default for Client {
91    fn default() -> Self {
92        Self::new()
93    }
94}
95
96#[async_trait]
97impl MarketService for Client {
98    async fn search_markets(&self, search_term: &str) -> Result<MarketSearchResponse, AppError> {
99        let path = format!("markets?searchTerm={}", search_term);
100        info!("Searching markets with term: {}", search_term);
101        let result: MarketSearchResponse = self.http_client.get(&path, Some(1)).await?;
102        debug!("{} markets found", result.markets.len());
103        Ok(result)
104    }
105
106    async fn get_market_details(&self, epic: &str) -> Result<MarketDetails, AppError> {
107        let path = format!("markets/{epic}");
108        info!("Getting market details: {}", epic);
109        let market_value: Value = self.http_client.get(&path, Some(3)).await?;
110        let market_details: MarketDetails = serde_json::from_value(market_value)?;
111        debug!("Market details obtained for: {}", epic);
112        Ok(market_details)
113    }
114
115    async fn get_multiple_market_details(
116        &self,
117        epics: &[String],
118    ) -> Result<MultipleMarketDetailsResponse, AppError> {
119        if epics.is_empty() {
120            return Ok(MultipleMarketDetailsResponse::default());
121        } else if epics.len() > 50 {
122            return Err(AppError::InvalidInput(
123                "The maximum number of EPICs is 50".to_string(),
124            ));
125        }
126
127        let epics_str = epics.join(",");
128        let path = format!("markets?epics={}", epics_str);
129        debug!(
130            "Getting market details for {} EPICs in a batch",
131            epics.len()
132        );
133
134        let response: MultipleMarketDetailsResponse = self.http_client.get(&path, Some(2)).await?;
135
136        Ok(response)
137    }
138
139    async fn get_historical_prices(
140        &self,
141        epic: &str,
142        resolution: &str,
143        from: &str,
144        to: &str,
145    ) -> Result<HistoricalPricesResponse, AppError> {
146        let path = format!(
147            "prices/{}?resolution={}&from={}&to={}",
148            epic, resolution, from, to
149        );
150        info!("Getting historical prices for: {}", epic);
151        let result: HistoricalPricesResponse = self.http_client.get(&path, Some(3)).await?;
152        debug!("Historical prices obtained for: {}", epic);
153        Ok(result)
154    }
155
156    async fn get_historical_prices_by_date_range(
157        &self,
158        epic: &str,
159        resolution: &str,
160        start_date: &str,
161        end_date: &str,
162    ) -> Result<HistoricalPricesResponse, AppError> {
163        let path = format!("prices/{}/{}/{}/{}", epic, resolution, start_date, end_date);
164        info!(
165            "Getting historical prices for epic: {}, resolution: {}, from: {} to: {}",
166            epic, resolution, start_date, end_date
167        );
168        let result: HistoricalPricesResponse = self.http_client.get(&path, Some(2)).await?;
169        debug!(
170            "Historical prices obtained for epic: {}, {} data points",
171            epic,
172            result.prices.len()
173        );
174        Ok(result)
175    }
176
177    async fn get_recent_prices(
178        &self,
179        params: &RecentPricesRequest<'_>,
180    ) -> Result<HistoricalPricesResponse, AppError> {
181        let mut query_params = Vec::new();
182
183        if let Some(res) = params.resolution {
184            query_params.push(format!("resolution={}", res));
185        }
186        if let Some(f) = params.from {
187            query_params.push(format!("from={}", f));
188        }
189        if let Some(t) = params.to {
190            query_params.push(format!("to={}", t));
191        }
192        if let Some(max) = params.max_points {
193            query_params.push(format!("max={}", max));
194        }
195        if let Some(size) = params.page_size {
196            query_params.push(format!("pageSize={}", size));
197        }
198        if let Some(num) = params.page_number {
199            query_params.push(format!("pageNumber={}", num));
200        }
201
202        let query_string = if query_params.is_empty() {
203            String::new()
204        } else {
205            format!("?{}", query_params.join("&"))
206        };
207
208        let path = format!("prices/{}{}", params.epic, query_string);
209        info!("Getting recent prices for epic: {}", params.epic);
210        let result: HistoricalPricesResponse = self.http_client.get(&path, Some(3)).await?;
211        debug!(
212            "Recent prices obtained for epic: {}, {} data points",
213            params.epic,
214            result.prices.len()
215        );
216        Ok(result)
217    }
218
219    async fn get_historical_prices_by_count_v1(
220        &self,
221        epic: &str,
222        resolution: &str,
223        num_points: i32,
224    ) -> Result<HistoricalPricesResponse, AppError> {
225        let path = format!("prices/{}/{}/{}", epic, resolution, num_points);
226        info!(
227            "Getting historical prices (v1) for epic: {}, resolution: {}, points: {}",
228            epic, resolution, num_points
229        );
230        let result: HistoricalPricesResponse = self.http_client.get(&path, Some(1)).await?;
231        debug!(
232            "Historical prices (v1) obtained for epic: {}, {} data points",
233            epic,
234            result.prices.len()
235        );
236        Ok(result)
237    }
238
239    async fn get_historical_prices_by_count_v2(
240        &self,
241        epic: &str,
242        resolution: &str,
243        num_points: i32,
244    ) -> Result<HistoricalPricesResponse, AppError> {
245        let path = format!("prices/{}/{}/{}", epic, resolution, num_points);
246        info!(
247            "Getting historical prices (v2) for epic: {}, resolution: {}, points: {}",
248            epic, resolution, num_points
249        );
250        let result: HistoricalPricesResponse = self.http_client.get(&path, Some(2)).await?;
251        debug!(
252            "Historical prices (v2) obtained for epic: {}, {} data points",
253            epic,
254            result.prices.len()
255        );
256        Ok(result)
257    }
258
259    async fn get_market_navigation(&self) -> Result<MarketNavigationResponse, AppError> {
260        let path = "marketnavigation";
261        info!("Getting top-level market navigation nodes");
262        let result: MarketNavigationResponse = self.http_client.get(path, Some(1)).await?;
263        debug!("{} navigation nodes found", result.nodes.len());
264        debug!("{} markets found at root level", result.markets.len());
265        Ok(result)
266    }
267
268    async fn get_market_navigation_node(
269        &self,
270        node_id: &str,
271    ) -> Result<MarketNavigationResponse, AppError> {
272        let path = format!("marketnavigation/{}", node_id);
273        info!("Getting market navigation node: {}", node_id);
274        let result: MarketNavigationResponse = self.http_client.get(&path, Some(1)).await?;
275        debug!("{} child nodes found", result.nodes.len());
276        debug!("{} markets found in node {}", result.markets.len(), node_id);
277        Ok(result)
278    }
279
280    async fn get_all_markets(&self) -> Result<Vec<MarketData>, AppError> {
281        let max_depth = 6;
282        info!(
283            "Starting comprehensive market hierarchy traversal (max {} levels)",
284            max_depth
285        );
286
287        let root_response = self.get_market_navigation().await?;
288        info!(
289            "Root navigation: {} nodes, {} markets at top level",
290            root_response.nodes.len(),
291            root_response.markets.len()
292        );
293
294        let mut all_markets = root_response.markets.clone();
295        let mut nodes_to_process = root_response.nodes.clone();
296        let mut processed_levels = 0;
297
298        while !nodes_to_process.is_empty() && processed_levels < max_depth {
299            let mut next_level_nodes = Vec::new();
300            let mut level_market_count = 0;
301
302            info!(
303                "Processing level {} with {} nodes",
304                processed_levels,
305                nodes_to_process.len()
306            );
307
308            for node in &nodes_to_process {
309                match self.get_market_navigation_node(&node.id).await {
310                    Ok(node_response) => {
311                        let node_markets = node_response.markets.len();
312                        let node_children = node_response.nodes.len();
313
314                        if node_markets > 0 || node_children > 0 {
315                            debug!(
316                                "Node '{}' (level {}): {} markets, {} child nodes",
317                                node.name, processed_levels, node_markets, node_children
318                            );
319                        }
320
321                        all_markets.extend(node_response.markets);
322                        level_market_count += node_markets;
323                        next_level_nodes.extend(node_response.nodes);
324                    }
325                    Err(e) => {
326                        tracing::error!(
327                            "Failed to get markets for node '{}' at level {}: {:?}",
328                            node.name,
329                            processed_levels,
330                            e
331                        );
332                    }
333                }
334            }
335
336            info!(
337                "Level {} completed: {} markets found, {} nodes for next level",
338                processed_levels,
339                level_market_count,
340                next_level_nodes.len()
341            );
342
343            nodes_to_process = next_level_nodes;
344            processed_levels += 1;
345        }
346
347        info!(
348            "Market hierarchy traversal completed: {} total markets found across {} levels",
349            all_markets.len(),
350            processed_levels
351        );
352
353        Ok(all_markets)
354    }
355
356    async fn get_vec_db_entries(&self) -> Result<Vec<DBEntryResponse>, AppError> {
357        info!("Getting all markets from hierarchy for DB entries");
358
359        let all_markets = self.get_all_markets().await?;
360        info!("Collected {} markets from hierarchy", all_markets.len());
361
362        let mut vec_db_entries: Vec<DBEntryResponse> = all_markets
363            .iter()
364            .map(DBEntryResponse::from)
365            .filter(|entry| !entry.epic.is_empty())
366            .collect();
367
368        info!("Created {} DB entries from markets", vec_db_entries.len());
369
370        // Collect unique symbols
371        let unique_symbols: std::collections::HashSet<String> = vec_db_entries
372            .iter()
373            .map(|entry| entry.symbol.clone())
374            .filter(|symbol| !symbol.is_empty())
375            .collect();
376
377        info!(
378            "Found {} unique symbols to fetch expiry dates for",
379            unique_symbols.len()
380        );
381
382        let mut symbol_expiry_map: std::collections::HashMap<String, String> =
383            std::collections::HashMap::new();
384
385        for symbol in unique_symbols {
386            if let Some(entry) = vec_db_entries
387                .iter()
388                .find(|e| e.symbol == symbol && !e.epic.is_empty())
389            {
390                match self.get_market_details(&entry.epic).await {
391                    Ok(market_details) => {
392                        let expiry_date = market_details
393                            .instrument
394                            .expiry_details
395                            .as_ref()
396                            .map(|details| details.last_dealing_date.clone())
397                            .unwrap_or_else(|| market_details.instrument.expiry.clone());
398
399                        symbol_expiry_map.insert(symbol.clone(), expiry_date);
400                        if let Some(expiry) = symbol_expiry_map.get(&symbol) {
401                            info!("Fetched expiry date for symbol {}: {}", symbol, expiry);
402                        }
403                    }
404                    Err(e) => {
405                        tracing::error!(
406                            "Failed to get market details for epic {} (symbol {}): {:?}",
407                            entry.epic,
408                            symbol,
409                            e
410                        );
411                        symbol_expiry_map.insert(symbol.clone(), entry.expiry.clone());
412                    }
413                }
414            }
415        }
416
417        for entry in &mut vec_db_entries {
418            if let Some(expiry_date) = symbol_expiry_map.get(&entry.symbol) {
419                entry.expiry = expiry_date.clone();
420            }
421        }
422
423        info!("Updated expiry dates for {} entries", vec_db_entries.len());
424        Ok(vec_db_entries)
425    }
426
427    async fn get_categories(&self) -> Result<CategoriesResponse, AppError> {
428        info!("Getting all categories of instruments");
429        let result: CategoriesResponse = self.http_client.get("categories", Some(1)).await?;
430        debug!("{} categories found", result.categories.len());
431        Ok(result)
432    }
433
434    async fn get_category_instruments(
435        &self,
436        category_id: &str,
437        page_number: Option<i32>,
438        page_size: Option<i32>,
439    ) -> Result<CategoryInstrumentsResponse, AppError> {
440        let mut path = format!("categories/{}/instruments", category_id);
441
442        let mut query_params = Vec::new();
443        if let Some(page) = page_number {
444            query_params.push(format!("pageNumber={}", page));
445        }
446        if let Some(size) = page_size {
447            if size > 1000 {
448                return Err(AppError::InvalidInput(
449                    "pageSize cannot exceed 1000".to_string(),
450                ));
451            }
452            query_params.push(format!("pageSize={}", size));
453        }
454
455        if !query_params.is_empty() {
456            path = format!("{}?{}", path, query_params.join("&"));
457        }
458
459        info!(
460            "Getting instruments for category: {} (page: {:?}, size: {:?})",
461            category_id, page_number, page_size
462        );
463        let result: CategoryInstrumentsResponse = self.http_client.get(&path, Some(1)).await?;
464        debug!(
465            "{} instruments found in category {}",
466            result.instruments.len(),
467            category_id
468        );
469        Ok(result)
470    }
471}
472
473#[async_trait]
474impl AccountService for Client {
475    async fn get_accounts(&self) -> Result<AccountsResponse, AppError> {
476        info!("Getting account information");
477        let result: AccountsResponse = self.http_client.get("accounts", Some(1)).await?;
478        debug!(
479            "Account information obtained: {} accounts",
480            result.accounts.len()
481        );
482        Ok(result)
483    }
484
485    async fn get_positions(&self) -> Result<PositionsResponse, AppError> {
486        debug!("Getting open positions");
487        let result: PositionsResponse = self.http_client.get("positions", Some(2)).await?;
488        debug!("Positions obtained: {} positions", result.positions.len());
489        Ok(result)
490    }
491
492    async fn get_positions_w_filter(&self, filter: &str) -> Result<PositionsResponse, AppError> {
493        debug!("Getting open positions with filter: {}", filter);
494        let mut positions = self.get_positions().await?;
495
496        positions
497            .positions
498            .retain(|position| position.market.epic.contains(filter));
499
500        debug!(
501            "Positions obtained after filtering: {} positions",
502            positions.positions.len()
503        );
504        Ok(positions)
505    }
506
507    async fn get_working_orders(&self) -> Result<WorkingOrdersResponse, AppError> {
508        info!("Getting working orders");
509        let result: WorkingOrdersResponse = self.http_client.get("workingorders", Some(2)).await?;
510        debug!(
511            "Working orders obtained: {} orders",
512            result.working_orders.len()
513        );
514        Ok(result)
515    }
516
517    async fn get_activity(
518        &self,
519        from: &str,
520        to: &str,
521    ) -> Result<AccountActivityResponse, AppError> {
522        let path = format!("history/activity?from={}&to={}&pageSize=500", from, to);
523        info!("Getting account activity");
524        let result: AccountActivityResponse = self.http_client.get(&path, Some(3)).await?;
525        debug!(
526            "Account activity obtained: {} activities",
527            result.activities.len()
528        );
529        Ok(result)
530    }
531
532    async fn get_activity_with_details(
533        &self,
534        from: &str,
535        to: &str,
536    ) -> Result<AccountActivityResponse, AppError> {
537        let path = format!(
538            "history/activity?from={}&to={}&detailed=true&pageSize=500",
539            from, to
540        );
541        info!("Getting detailed account activity");
542        let result: AccountActivityResponse = self.http_client.get(&path, Some(3)).await?;
543        debug!(
544            "Detailed account activity obtained: {} activities",
545            result.activities.len()
546        );
547        Ok(result)
548    }
549
550    async fn get_transactions(
551        &self,
552        from: &str,
553        to: &str,
554    ) -> Result<TransactionHistoryResponse, AppError> {
555        const PAGE_SIZE: u32 = 200;
556        let mut all_transactions = Vec::new();
557        let mut current_page = 1;
558        #[allow(unused_assignments)]
559        let mut last_metadata = None;
560
561        loop {
562            let path = format!(
563                "history/transactions?from={}&to={}&pageSize={}&pageNumber={}",
564                from, to, PAGE_SIZE, current_page
565            );
566            info!("Getting transaction history page {}", current_page);
567
568            let result: TransactionHistoryResponse = self.http_client.get(&path, Some(2)).await?;
569
570            let total_pages = result.metadata.page_data.total_pages as u32;
571            last_metadata = Some(result.metadata);
572            all_transactions.extend(result.transactions);
573
574            if current_page >= total_pages {
575                break;
576            }
577            current_page += 1;
578        }
579
580        debug!(
581            "Total transaction history obtained: {} transactions",
582            all_transactions.len()
583        );
584
585        Ok(TransactionHistoryResponse {
586            transactions: all_transactions,
587            metadata: last_metadata
588                .ok_or_else(|| AppError::InvalidInput("Could not retrieve metadata".to_string()))?,
589        })
590    }
591
592    async fn get_preferences(&self) -> Result<AccountPreferencesResponse, AppError> {
593        info!("Getting account preferences");
594        let result: AccountPreferencesResponse = self
595            .http_client
596            .get("accounts/preferences", Some(1))
597            .await?;
598        debug!(
599            "Account preferences obtained: trailing_stops_enabled={}",
600            result.trailing_stops_enabled
601        );
602        Ok(result)
603    }
604
605    async fn update_preferences(&self, trailing_stops_enabled: bool) -> Result<(), AppError> {
606        info!(
607            "Updating account preferences: trailing_stops_enabled={}",
608            trailing_stops_enabled
609        );
610        let request = serde_json::json!({
611            "trailingStopsEnabled": trailing_stops_enabled
612        });
613        let _: serde_json::Value = self
614            .http_client
615            .put("accounts/preferences", &request, Some(1))
616            .await?;
617        debug!("Account preferences updated");
618        Ok(())
619    }
620
621    async fn get_activity_by_period(
622        &self,
623        period_ms: u64,
624    ) -> Result<AccountActivityResponse, AppError> {
625        let path = format!("history/activity/{}", period_ms);
626        info!("Getting account activity for period: {} ms", period_ms);
627        let result: AccountActivityResponse = self.http_client.get(&path, Some(1)).await?;
628        debug!(
629            "Account activity obtained: {} activities",
630            result.activities.len()
631        );
632        Ok(result)
633    }
634}
635
636#[async_trait]
637impl OrderService for Client {
638    async fn create_order(
639        &self,
640        order: &CreateOrderRequest,
641    ) -> Result<CreateOrderResponse, AppError> {
642        info!("Creating order for: {}", order.epic);
643        let result: CreateOrderResponse = self
644            .http_client
645            .post("positions/otc", order, Some(2))
646            .await?;
647        debug!("Order created with reference: {}", result.deal_reference);
648        Ok(result)
649    }
650
651    async fn get_order_confirmation(
652        &self,
653        deal_reference: &str,
654    ) -> Result<OrderConfirmationResponse, AppError> {
655        let path = format!("confirms/{}", deal_reference);
656        info!("Getting confirmation for order: {}", deal_reference);
657        let result: OrderConfirmationResponse = self.http_client.get(&path, Some(1)).await?;
658        debug!("Confirmation obtained for order: {}", deal_reference);
659        Ok(result)
660    }
661
662    async fn get_order_confirmation_w_retry(
663        &self,
664        deal_reference: &str,
665        retries: u64,
666        delay_ms: u64,
667    ) -> Result<OrderConfirmationResponse, AppError> {
668        let mut attempts = 0;
669        loop {
670            match self.get_order_confirmation(deal_reference).await {
671                Ok(response) => return Ok(response),
672                Err(e) => {
673                    attempts += 1;
674                    if attempts > retries {
675                        return Err(e);
676                    }
677                    warn!(
678                        "Failed to get order confirmation (attempt {}/{}): {}. Retrying in {} ms...",
679                        attempts, retries, e, delay_ms
680                    );
681                    sleep(Duration::from_millis(delay_ms)).await;
682                }
683            }
684        }
685    }
686
687    async fn update_position(
688        &self,
689        deal_id: &str,
690        update: &UpdatePositionRequest,
691    ) -> Result<UpdatePositionResponse, AppError> {
692        let path = format!("positions/otc/{}", deal_id);
693        info!("Updating position: {}", deal_id);
694        let result: UpdatePositionResponse = self.http_client.put(&path, update, Some(2)).await?;
695        debug!(
696            "Position updated: {} with deal reference: {}",
697            deal_id, result.deal_reference
698        );
699        Ok(result)
700    }
701
702    async fn update_level_in_position(
703        &self,
704        deal_id: &str,
705        limit_level: Option<f64>,
706    ) -> Result<UpdatePositionResponse, AppError> {
707        let path = format!("positions/otc/{}", deal_id);
708        info!("Updating position: {}", deal_id);
709        let limit_level = limit_level.unwrap_or(0.0);
710
711        let update: UpdatePositionRequest = UpdatePositionRequest {
712            guaranteed_stop: None,
713            limit_level: Some(limit_level),
714            stop_level: None,
715            trailing_stop: None,
716            trailing_stop_distance: None,
717            trailing_stop_increment: None,
718        };
719        let result: UpdatePositionResponse = self.http_client.put(&path, update, Some(2)).await?;
720        debug!(
721            "Position updated: {} with deal reference: {}",
722            deal_id, result.deal_reference
723        );
724        Ok(result)
725    }
726
727    async fn close_position(
728        &self,
729        close_request: &ClosePositionRequest,
730    ) -> Result<ClosePositionResponse, AppError> {
731        info!("Closing position");
732
733        // IG API requires POST with _method: DELETE header for closing positions
734        // This is a workaround for HTTP client limitations with DELETE + body
735        let result: ClosePositionResponse = self
736            .http_client
737            .post_with_delete_method("positions/otc", close_request, Some(1))
738            .await?;
739
740        debug!("Position closed with reference: {}", result.deal_reference);
741        Ok(result)
742    }
743
744    async fn create_working_order(
745        &self,
746        order: &CreateWorkingOrderRequest,
747    ) -> Result<CreateWorkingOrderResponse, AppError> {
748        info!("Creating working order for: {}", order.epic);
749        let result: CreateWorkingOrderResponse = self
750            .http_client
751            .post("workingorders/otc", order, Some(2))
752            .await?;
753        debug!(
754            "Working order created with reference: {}",
755            result.deal_reference
756        );
757        Ok(result)
758    }
759
760    async fn delete_working_order(&self, deal_id: &str) -> Result<(), AppError> {
761        let path = format!("workingorders/otc/{}", deal_id);
762        let result: CreateWorkingOrderResponse =
763            self.http_client.delete(path.as_str(), Some(2)).await?;
764        debug!(
765            "Working order created with reference: {}",
766            result.deal_reference
767        );
768        Ok(())
769    }
770
771    async fn get_position(&self, deal_id: &str) -> Result<SinglePositionResponse, AppError> {
772        let path = format!("positions/{}", deal_id);
773        info!("Getting position: {}", deal_id);
774        let result: SinglePositionResponse = self.http_client.get(&path, Some(2)).await?;
775        debug!("Position obtained for deal: {}", deal_id);
776        Ok(result)
777    }
778
779    async fn update_working_order(
780        &self,
781        deal_id: &str,
782        update: &UpdateWorkingOrderRequest,
783    ) -> Result<CreateWorkingOrderResponse, AppError> {
784        let path = format!("workingorders/otc/{}", deal_id);
785        info!("Updating working order: {}", deal_id);
786        let result: CreateWorkingOrderResponse =
787            self.http_client.put(&path, update, Some(2)).await?;
788        debug!(
789            "Working order updated: {} with reference: {}",
790            deal_id, result.deal_reference
791        );
792        Ok(result)
793    }
794}
795
796// ============================================================================
797// WATCHLIST SERVICE IMPLEMENTATION
798// ============================================================================
799
800#[async_trait]
801impl WatchlistService for Client {
802    async fn get_watchlists(&self) -> Result<WatchlistsResponse, AppError> {
803        info!("Getting all watchlists");
804        let result: WatchlistsResponse = self.http_client.get("watchlists", Some(1)).await?;
805        debug!(
806            "Watchlists obtained: {} watchlists",
807            result.watchlists.len()
808        );
809        Ok(result)
810    }
811
812    async fn create_watchlist(
813        &self,
814        name: &str,
815        epics: Option<&[String]>,
816    ) -> Result<CreateWatchlistResponse, AppError> {
817        info!("Creating watchlist: {}", name);
818        let request = CreateWatchlistRequest {
819            name: name.to_string(),
820            epics: epics.map(|e| e.to_vec()),
821        };
822        let result: CreateWatchlistResponse = self
823            .http_client
824            .post("watchlists", &request, Some(1))
825            .await?;
826        debug!(
827            "Watchlist created: {} with ID: {}",
828            name, result.watchlist_id
829        );
830        Ok(result)
831    }
832
833    async fn get_watchlist(
834        &self,
835        watchlist_id: &str,
836    ) -> Result<WatchlistMarketsResponse, AppError> {
837        let path = format!("watchlists/{}", watchlist_id);
838        info!("Getting watchlist: {}", watchlist_id);
839        let result: WatchlistMarketsResponse = self.http_client.get(&path, Some(1)).await?;
840        debug!(
841            "Watchlist obtained: {} with {} markets",
842            watchlist_id,
843            result.markets.len()
844        );
845        Ok(result)
846    }
847
848    async fn delete_watchlist(&self, watchlist_id: &str) -> Result<StatusResponse, AppError> {
849        let path = format!("watchlists/{}", watchlist_id);
850        info!("Deleting watchlist: {}", watchlist_id);
851        let result: StatusResponse = self.http_client.delete(&path, Some(1)).await?;
852        debug!("Watchlist deleted: {}", watchlist_id);
853        Ok(result)
854    }
855
856    async fn add_to_watchlist(
857        &self,
858        watchlist_id: &str,
859        epic: &str,
860    ) -> Result<StatusResponse, AppError> {
861        let path = format!("watchlists/{}", watchlist_id);
862        info!("Adding {} to watchlist: {}", epic, watchlist_id);
863        let request = AddToWatchlistRequest {
864            epic: epic.to_string(),
865        };
866        let result: StatusResponse = self.http_client.put(&path, &request, Some(1)).await?;
867        debug!("Added {} to watchlist: {}", epic, watchlist_id);
868        Ok(result)
869    }
870
871    async fn remove_from_watchlist(
872        &self,
873        watchlist_id: &str,
874        epic: &str,
875    ) -> Result<StatusResponse, AppError> {
876        let path = format!("watchlists/{}/{}", watchlist_id, epic);
877        info!("Removing {} from watchlist: {}", epic, watchlist_id);
878        let result: StatusResponse = self.http_client.delete(&path, Some(1)).await?;
879        debug!("Removed {} from watchlist: {}", epic, watchlist_id);
880        Ok(result)
881    }
882}
883
884// ============================================================================
885// SENTIMENT SERVICE IMPLEMENTATION
886// ============================================================================
887
888#[async_trait]
889impl SentimentService for Client {
890    async fn get_client_sentiment(
891        &self,
892        market_ids: &[String],
893    ) -> Result<ClientSentimentResponse, AppError> {
894        let market_ids_str = market_ids.join(",");
895        let path = format!("clientsentiment?marketIds={}", market_ids_str);
896        info!("Getting client sentiment for {} markets", market_ids.len());
897        let result: ClientSentimentResponse = self.http_client.get(&path, Some(1)).await?;
898        debug!(
899            "Client sentiment obtained for {} markets",
900            result.client_sentiments.len()
901        );
902        Ok(result)
903    }
904
905    async fn get_client_sentiment_by_market(
906        &self,
907        market_id: &str,
908    ) -> Result<MarketSentiment, AppError> {
909        let path = format!("clientsentiment/{}", market_id);
910        info!("Getting client sentiment for market: {}", market_id);
911        let result: MarketSentiment = self.http_client.get(&path, Some(1)).await?;
912        debug!(
913            "Client sentiment for {}: {}% long, {}% short",
914            market_id, result.long_position_percentage, result.short_position_percentage
915        );
916        Ok(result)
917    }
918
919    async fn get_related_sentiment(
920        &self,
921        market_id: &str,
922    ) -> Result<ClientSentimentResponse, AppError> {
923        let path = format!("clientsentiment/related/{}", market_id);
924        info!("Getting related sentiment for market: {}", market_id);
925        let result: ClientSentimentResponse = self.http_client.get(&path, Some(1)).await?;
926        debug!(
927            "Related sentiment obtained: {} markets",
928            result.client_sentiments.len()
929        );
930        Ok(result)
931    }
932}
933
934// ============================================================================
935// COSTS SERVICE IMPLEMENTATION
936// ============================================================================
937
938#[async_trait]
939impl CostsService for Client {
940    async fn get_indicative_costs_open(
941        &self,
942        request: &OpenCostsRequest,
943    ) -> Result<IndicativeCostsResponse, AppError> {
944        info!(
945            "Getting indicative costs for opening position on: {}",
946            request.epic
947        );
948        let result: IndicativeCostsResponse = self
949            .http_client
950            .post("indicativecostsandcharges/open", request, Some(1))
951            .await?;
952        debug!(
953            "Indicative costs obtained, reference: {}",
954            result.indicative_quote_reference
955        );
956        Ok(result)
957    }
958
959    async fn get_indicative_costs_close(
960        &self,
961        request: &CloseCostsRequest,
962    ) -> Result<IndicativeCostsResponse, AppError> {
963        info!(
964            "Getting indicative costs for closing position: {}",
965            request.deal_id
966        );
967        let result: IndicativeCostsResponse = self
968            .http_client
969            .post("indicativecostsandcharges/close", request, Some(1))
970            .await?;
971        debug!(
972            "Indicative costs obtained, reference: {}",
973            result.indicative_quote_reference
974        );
975        Ok(result)
976    }
977
978    async fn get_indicative_costs_edit(
979        &self,
980        request: &EditCostsRequest,
981    ) -> Result<IndicativeCostsResponse, AppError> {
982        info!(
983            "Getting indicative costs for editing position: {}",
984            request.deal_id
985        );
986        let result: IndicativeCostsResponse = self
987            .http_client
988            .post("indicativecostsandcharges/edit", request, Some(1))
989            .await?;
990        debug!(
991            "Indicative costs obtained, reference: {}",
992            result.indicative_quote_reference
993        );
994        Ok(result)
995    }
996
997    async fn get_costs_history(
998        &self,
999        from: &str,
1000        to: &str,
1001    ) -> Result<CostsHistoryResponse, AppError> {
1002        let path = format!("indicativecostsandcharges/history/from/{}/to/{}", from, to);
1003        info!("Getting costs history from {} to {}", from, to);
1004        let result: CostsHistoryResponse = self.http_client.get(&path, Some(1)).await?;
1005        debug!("Costs history obtained: {} entries", result.costs.len());
1006        Ok(result)
1007    }
1008
1009    async fn get_durable_medium(
1010        &self,
1011        quote_reference: &str,
1012    ) -> Result<DurableMediumResponse, AppError> {
1013        let path = format!(
1014            "indicativecostsandcharges/durablemedium/{}",
1015            quote_reference
1016        );
1017        info!("Getting durable medium for reference: {}", quote_reference);
1018        let result: DurableMediumResponse = self.http_client.get(&path, Some(1)).await?;
1019        debug!("Durable medium obtained for reference: {}", quote_reference);
1020        Ok(result)
1021    }
1022}
1023
1024// ============================================================================
1025// OPERATIONS SERVICE IMPLEMENTATION
1026// ============================================================================
1027
1028#[async_trait]
1029impl OperationsService for Client {
1030    async fn get_client_apps(&self) -> Result<ApplicationDetailsResponse, AppError> {
1031        info!("Getting client applications");
1032        let result: ApplicationDetailsResponse = self
1033            .http_client
1034            .get("operations/application", Some(1))
1035            .await?;
1036        debug!("Client application obtained: {}", result.api_key);
1037        Ok(result)
1038    }
1039
1040    async fn disable_client_app(&self) -> Result<StatusResponse, AppError> {
1041        info!("Disabling current client application");
1042        let result: StatusResponse = self
1043            .http_client
1044            .put(
1045                "operations/application/disable",
1046                &serde_json::json!({}),
1047                Some(1),
1048            )
1049            .await?;
1050        debug!("Client application disabled");
1051        Ok(result)
1052    }
1053}
1054
1055/// Streaming client for IG Markets real-time data.
1056///
1057/// This client manages two Lightstreamer connections for different data types:
1058/// - **Market streamer**: Handles market data (prices, market state), trade updates (CONFIRMS, OPU, WOU),
1059///   and account updates (positions, orders, balance). Uses the default adapter.
1060/// - **Price streamer**: Handles detailed price data (bid/ask levels, sizes, multiple currencies).
1061///   Uses the "Pricing" adapter.
1062///
1063/// Each connection type can be managed independently and runs in parallel.
1064pub struct StreamerClient {
1065    account_id: String,
1066    market_streamer_client: Option<Arc<Mutex<LightstreamerClient>>>,
1067    price_streamer_client: Option<Arc<Mutex<LightstreamerClient>>>,
1068    // Flags indicating whether there is at least one active subscription for each client
1069    has_market_stream_subs: bool,
1070    has_price_stream_subs: bool,
1071}
1072
1073impl StreamerClient {
1074    /// Creates a new streaming client instance.
1075    ///
1076    /// This initializes both streaming clients (market and price) but does not
1077    /// establish connections yet. Connections are established when `connect()` is called.
1078    ///
1079    /// # Returns
1080    ///
1081    /// Returns a new `StreamerClient` instance or an error if initialization fails.
1082    pub async fn new() -> Result<Self, AppError> {
1083        let http_client_raw = Arc::new(RwLock::new(Client::new()));
1084        let http_client = http_client_raw.read().await;
1085        let ws_info = http_client.get_ws_info().await;
1086        let password = ws_info.get_ws_password();
1087
1088        // Market data client (no adapter specified - uses default)
1089        let market_streamer_client = Arc::new(Mutex::new(LightstreamerClient::new(
1090            Some(ws_info.server.as_str()),
1091            None,
1092            Some(&ws_info.account_id),
1093            Some(&password),
1094        )?));
1095
1096        let price_streamer_client = Arc::new(Mutex::new(LightstreamerClient::new(
1097            Some(ws_info.server.as_str()),
1098            None,
1099            Some(&ws_info.account_id),
1100            Some(&password),
1101        )?));
1102
1103        // Force WebSocket streaming transport on both clients to satisfy IG requirements
1104        // and configure logging to use tracing levels for proper log propagation
1105        {
1106            let mut client = market_streamer_client.lock().await;
1107            client
1108                .connection_options
1109                .set_forced_transport(Some(Transport::WsStreaming));
1110            client.set_logging_type(LogType::TracingLogs);
1111        }
1112        {
1113            let mut client = price_streamer_client.lock().await;
1114            client
1115                .connection_options
1116                .set_forced_transport(Some(Transport::WsStreaming));
1117            client.set_logging_type(LogType::TracingLogs);
1118        }
1119
1120        Ok(Self {
1121            account_id: ws_info.account_id.clone(),
1122            market_streamer_client: Some(market_streamer_client),
1123            price_streamer_client: Some(price_streamer_client),
1124            has_market_stream_subs: false,
1125            has_price_stream_subs: false,
1126        })
1127    }
1128
1129    /// Creates a default streaming client instance.
1130    pub async fn default() -> Result<Self, AppError> {
1131        Self::new().await
1132    }
1133
1134    /// Subscribes to market data updates for the specified instruments.
1135    ///
1136    /// This method creates a subscription to receive real-time market data updates
1137    /// for the given EPICs and returns a channel receiver for consuming the updates.
1138    ///
1139    /// # Arguments
1140    ///
1141    /// * `epics` - List of instrument EPICs to subscribe to
1142    /// * `fields` - Set of market data fields to receive (e.g., BID, OFFER, etc.)
1143    ///
1144    /// # Returns
1145    ///
1146    /// Returns a receiver channel for `PriceData` updates, or an error if
1147    /// the subscription setup failed.
1148    ///
1149    /// # Examples
1150    ///
1151    /// ```ignore
1152    /// let mut receiver = client.market_subscribe(
1153    ///     vec!["IX.D.DAX.DAILY.IP".to_string()],
1154    ///     fields
1155    /// ).await?;
1156    ///
1157    /// tokio::spawn(async move {
1158    ///     while let Some(price_data) = receiver.recv().await {
1159    ///         println!("Price update: {:?}", price_data);
1160    ///     }
1161    /// });
1162    /// ```
1163    pub async fn market_subscribe(
1164        &mut self,
1165        epics: Vec<String>,
1166        fields: HashSet<StreamingMarketField>,
1167    ) -> Result<mpsc::UnboundedReceiver<PriceData>, AppError> {
1168        // Mark that we have at least one subscription on the market streamer
1169        self.has_market_stream_subs = true;
1170
1171        let fields = get_streaming_market_fields(&fields);
1172        let market_epics: Vec<String> = epics
1173            .iter()
1174            .map(|epic| "MARKET:".to_string() + epic)
1175            .collect();
1176        let mut subscription =
1177            Subscription::new(SubscriptionMode::Merge, Some(market_epics), Some(fields))?;
1178
1179        subscription.set_data_adapter(None)?;
1180        subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
1181
1182        // Create channel listener that converts ItemUpdate to PriceData
1183        let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
1184        subscription.add_listener(Box::new(listener));
1185
1186        // Configure client and add subscription
1187        let client = self.market_streamer_client.as_ref().ok_or_else(|| {
1188            AppError::WebSocketError("market streamer client not initialized".to_string())
1189        })?;
1190
1191        {
1192            let mut client = client.lock().await;
1193            client
1194                .connection_options
1195                .set_forced_transport(Some(Transport::WsStreaming));
1196            LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription)
1197                .await?;
1198        }
1199
1200        // Create a channel for PriceData and spawn a task to convert ItemUpdate to PriceData
1201        let (price_tx, price_rx) = mpsc::unbounded_channel();
1202        tokio::spawn(async move {
1203            let mut receiver = item_receiver;
1204            while let Some(item_update) = receiver.recv().await {
1205                let price_data = PriceData::from(&item_update);
1206                if price_tx.send(price_data).is_err() {
1207                    tracing::debug!("Price channel receiver dropped");
1208                    break;
1209                }
1210            }
1211        });
1212
1213        info!(
1214            "Market subscription created for {} instruments",
1215            epics.len()
1216        );
1217        Ok(price_rx)
1218    }
1219
1220    /// Subscribes to trade updates for the account.
1221    ///
1222    /// This method creates a subscription to receive real-time trade confirmations,
1223    /// order updates (OPU), and working order updates (WOU) for the account,
1224    /// and returns a channel receiver for consuming the updates.
1225    ///
1226    /// # Returns
1227    ///
1228    /// Returns a receiver channel for `TradeFields` updates, or an error if
1229    /// the subscription setup failed.
1230    ///
1231    /// # Examples
1232    ///
1233    /// ```ignore
1234    /// let mut receiver = client.trade_subscribe().await?;
1235    ///
1236    /// tokio::spawn(async move {
1237    ///     while let Some(trade_fields) = receiver.recv().await {
1238    ///         println!("Trade update: {:?}", trade_fields);
1239    ///     }
1240    /// });
1241    /// ```
1242    pub async fn trade_subscribe(
1243        &mut self,
1244    ) -> Result<mpsc::UnboundedReceiver<TradeFields>, AppError> {
1245        // Mark that we have at least one subscription on the market streamer
1246        self.has_market_stream_subs = true;
1247
1248        let account_id = self.account_id.clone();
1249        let fields = Some(vec![
1250            "CONFIRMS".to_string(),
1251            "OPU".to_string(),
1252            "WOU".to_string(),
1253        ]);
1254        let trade_items = vec![format!("TRADE:{account_id}")];
1255
1256        let mut subscription =
1257            Subscription::new(SubscriptionMode::Distinct, Some(trade_items), fields)?;
1258
1259        subscription.set_data_adapter(None)?;
1260        subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
1261
1262        // Create channel listener
1263        let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
1264        subscription.add_listener(Box::new(listener));
1265
1266        // Configure client and add subscription (reusing market_streamer_client)
1267        let client = self.market_streamer_client.as_ref().ok_or_else(|| {
1268            AppError::WebSocketError("market streamer client not initialized".to_string())
1269        })?;
1270
1271        {
1272            let mut client = client.lock().await;
1273            client
1274                .connection_options
1275                .set_forced_transport(Some(Transport::WsStreaming));
1276            LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription)
1277                .await?;
1278        }
1279
1280        // Create a channel for TradeFields and spawn a task to convert ItemUpdate to TradeFields
1281        let (trade_tx, trade_rx) = mpsc::unbounded_channel();
1282        tokio::spawn(async move {
1283            let mut receiver = item_receiver;
1284            while let Some(item_update) = receiver.recv().await {
1285                let trade_data = crate::presentation::trade::TradeData::from(&item_update);
1286                if trade_tx.send(trade_data.fields).is_err() {
1287                    tracing::debug!("Trade channel receiver dropped");
1288                    break;
1289                }
1290            }
1291        });
1292
1293        info!("Trade subscription created for account: {}", account_id);
1294        Ok(trade_rx)
1295    }
1296
1297    /// Subscribes to account data updates.
1298    ///
1299    /// This method creates a subscription to receive real-time account updates including
1300    /// profit/loss, margin, equity, available funds, and other account metrics,
1301    /// and returns a channel receiver for consuming the updates.
1302    ///
1303    /// # Arguments
1304    ///
1305    /// * `fields` - Set of account data fields to receive (e.g., PNL, MARGIN, EQUITY, etc.)
1306    ///
1307    /// # Returns
1308    ///
1309    /// Returns a receiver channel for `AccountFields` updates, or an error if
1310    /// the subscription setup failed.
1311    ///
1312    /// # Examples
1313    ///
1314    /// ```ignore
1315    /// let mut receiver = client.account_subscribe(fields).await?;
1316    ///
1317    /// tokio::spawn(async move {
1318    ///     while let Some(account_fields) = receiver.recv().await {
1319    ///         println!("Account update: {:?}", account_fields);
1320    ///     }
1321    /// });
1322    /// ```
1323    pub async fn account_subscribe(
1324        &mut self,
1325        fields: HashSet<StreamingAccountDataField>,
1326    ) -> Result<mpsc::UnboundedReceiver<AccountFields>, AppError> {
1327        // Mark that we have at least one subscription on the market streamer
1328        self.has_market_stream_subs = true;
1329
1330        let fields = get_streaming_account_data_fields(&fields);
1331        let account_id = self.account_id.clone();
1332        let account_items = vec![format!("ACCOUNT:{account_id}")];
1333
1334        let mut subscription =
1335            Subscription::new(SubscriptionMode::Merge, Some(account_items), Some(fields))?;
1336
1337        subscription.set_data_adapter(None)?;
1338        subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
1339
1340        // Create channel listener
1341        let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
1342        subscription.add_listener(Box::new(listener));
1343
1344        // Configure client and add subscription (reusing market_streamer_client)
1345        let client = self.market_streamer_client.as_ref().ok_or_else(|| {
1346            AppError::WebSocketError("market streamer client not initialized".to_string())
1347        })?;
1348
1349        {
1350            let mut client = client.lock().await;
1351            client
1352                .connection_options
1353                .set_forced_transport(Some(Transport::WsStreaming));
1354            LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription)
1355                .await?;
1356        }
1357
1358        // Create a channel for AccountFields and spawn a task to convert ItemUpdate to AccountFields
1359        let (account_tx, account_rx) = mpsc::unbounded_channel();
1360        tokio::spawn(async move {
1361            let mut receiver = item_receiver;
1362            while let Some(item_update) = receiver.recv().await {
1363                let account_data = crate::presentation::account::AccountData::from(&item_update);
1364                if account_tx.send(account_data.fields).is_err() {
1365                    tracing::debug!("Account channel receiver dropped");
1366                    break;
1367                }
1368            }
1369        });
1370
1371        info!("Account subscription created for account: {}", account_id);
1372        Ok(account_rx)
1373    }
1374
1375    /// Subscribes to price data updates for the specified instruments.
1376    ///
1377    /// This method creates a subscription to receive real-time price updates including
1378    /// bid/ask prices, sizes, and multiple currency levels for the given EPICs,
1379    /// and returns a channel receiver for consuming the updates.
1380    ///
1381    /// # Arguments
1382    ///
1383    /// * `epics` - List of instrument EPICs to subscribe to
1384    /// * `fields` - Set of price data fields to receive (e.g., BID_PRICE1, ASK_PRICE1, etc.)
1385    ///
1386    /// # Returns
1387    ///
1388    /// Returns a receiver channel for `PriceData` updates, or an error if
1389    /// the subscription setup failed.
1390    ///
1391    /// # Examples
1392    ///
1393    /// ```ignore
1394    /// let mut receiver = client.price_subscribe(
1395    ///     vec!["IX.D.DAX.DAILY.IP".to_string()],
1396    ///     fields
1397    /// ).await?;
1398    ///
1399    /// tokio::spawn(async move {
1400    ///     while let Some(price_data) = receiver.recv().await {
1401    ///         println!("Price update: {:?}", price_data);
1402    ///     }
1403    /// });
1404    /// ```
1405    pub async fn price_subscribe(
1406        &mut self,
1407        epics: Vec<String>,
1408        fields: HashSet<StreamingPriceField>,
1409    ) -> Result<mpsc::UnboundedReceiver<PriceData>, AppError> {
1410        // Mark that we have at least one subscription on the price streamer
1411        self.has_price_stream_subs = true;
1412
1413        let fields = get_streaming_price_fields(&fields);
1414        let account_id = self.account_id.clone();
1415        let price_epics: Vec<String> = epics
1416            .iter()
1417            .map(|epic| format!("PRICE:{account_id}:{epic}"))
1418            .collect();
1419
1420        // Debug what we are about to subscribe to (items and fields)
1421        tracing::debug!("Pricing subscribe items: {:?}", price_epics);
1422        tracing::debug!("Pricing subscribe fields: {:?}", fields);
1423
1424        let mut subscription =
1425            Subscription::new(SubscriptionMode::Merge, Some(price_epics), Some(fields))?;
1426
1427        // Allow overriding the Pricing adapter name via env var to match server config
1428        let pricing_adapter =
1429            std::env::var("IG_PRICING_ADAPTER").unwrap_or_else(|_| "Pricing".to_string());
1430        tracing::debug!("Using Pricing data adapter: {}", pricing_adapter);
1431        subscription.set_data_adapter(Some(pricing_adapter))?;
1432        subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
1433
1434        // Create channel listener
1435        let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
1436        subscription.add_listener(Box::new(listener));
1437
1438        // Configure client and add subscription
1439        let client = self.price_streamer_client.as_ref().ok_or_else(|| {
1440            AppError::WebSocketError("price streamer client not initialized".to_string())
1441        })?;
1442
1443        {
1444            let mut client = client.lock().await;
1445            client
1446                .connection_options
1447                .set_forced_transport(Some(Transport::WsStreaming));
1448            LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription)
1449                .await?;
1450        }
1451
1452        // Create a channel for PriceData and spawn a task to convert ItemUpdate to PriceData
1453        let (price_tx, price_rx) = mpsc::unbounded_channel();
1454        tokio::spawn(async move {
1455            let mut receiver = item_receiver;
1456            while let Some(item_update) = receiver.recv().await {
1457                let price_data = PriceData::from(&item_update);
1458                if price_tx.send(price_data).is_err() {
1459                    tracing::debug!("Price channel receiver dropped");
1460                    break;
1461                }
1462            }
1463        });
1464
1465        info!(
1466            "Price subscription created for {} instruments (account: {})",
1467            epics.len(),
1468            account_id
1469        );
1470        Ok(price_rx)
1471    }
1472
1473    /// Subscribes to chart data updates for the specified instruments and scale.
1474    ///
1475    /// This method creates a subscription to receive real-time chart updates including
1476    /// OHLC data, volume, and other chart metrics for the given EPICs and chart scale,
1477    /// and returns a channel receiver for consuming the updates.
1478    ///
1479    /// # Arguments
1480    ///
1481    /// * `epics` - List of instrument EPICs to subscribe to.
1482    /// * `scale` - Chart scale (e.g., Tick, 1Min, 5Min, etc.).
1483    /// * `fields` - Set of chart data fields to receive (e.g., OPEN, HIGH, LOW, CLOSE, VOLUME).
1484    ///
1485    /// # Returns
1486    ///
1487    /// Returns a receiver channel for `ChartData` updates, or an error if
1488    /// the subscription setup failed.
1489    ///
1490    /// # Examples
1491    ///
1492    /// ```ignore
1493    /// let mut receiver = client.chart_subscribe(
1494    ///     vec!["IX.D.DAX.DAILY.IP".to_string()],
1495    ///     ChartScale::OneMin,
1496    ///     fields
1497    /// ).await?;
1498    ///
1499    /// tokio::spawn(async move {
1500    ///     while let Some(chart_data) = receiver.recv().await {
1501    ///         println!("Chart update: {:?}", chart_data);
1502    ///     }
1503    /// });
1504    /// ```
1505    pub async fn chart_subscribe(
1506        &mut self,
1507        epics: Vec<String>,
1508        scale: ChartScale,
1509        fields: HashSet<StreamingChartField>,
1510    ) -> Result<mpsc::UnboundedReceiver<ChartData>, AppError> {
1511        // Mark that we have at least one subscription on the market streamer
1512        self.has_market_stream_subs = true;
1513
1514        let fields = get_streaming_chart_fields(&fields);
1515
1516        let chart_items: Vec<String> = epics
1517            .iter()
1518            .map(|epic| format!("CHART:{epic}:{scale}",))
1519            .collect();
1520
1521        // Candle data uses MERGE mode, tick data uses DISTINCT
1522        let mode = if matches!(scale, ChartScale::Tick) {
1523            SubscriptionMode::Distinct
1524        } else {
1525            SubscriptionMode::Merge
1526        };
1527
1528        let mut subscription = Subscription::new(mode, Some(chart_items), Some(fields))?;
1529
1530        subscription.set_data_adapter(None)?;
1531        subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
1532
1533        // Create channel listener
1534        let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
1535        subscription.add_listener(Box::new(listener));
1536
1537        // Configure client and add subscription (reusing market_streamer_client)
1538        let client = self.market_streamer_client.as_ref().ok_or_else(|| {
1539            AppError::WebSocketError("market streamer client not initialized".to_string())
1540        })?;
1541
1542        {
1543            let mut client = client.lock().await;
1544            client
1545                .connection_options
1546                .set_forced_transport(Some(Transport::WsStreaming));
1547            LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription)
1548                .await?;
1549        }
1550
1551        // Create a channel for ChartData and spawn a task to convert ItemUpdate to ChartData
1552        let (chart_tx, chart_rx) = mpsc::unbounded_channel();
1553        tokio::spawn(async move {
1554            let mut receiver = item_receiver;
1555            while let Some(item_update) = receiver.recv().await {
1556                let chart_data = ChartData::from(&item_update);
1557                if chart_tx.send(chart_data).is_err() {
1558                    tracing::debug!("Chart channel receiver dropped");
1559                    break;
1560                }
1561            }
1562        });
1563
1564        info!(
1565            "Chart subscription created for {} instruments (scale: {})",
1566            epics.len(),
1567            scale
1568        );
1569
1570        Ok(chart_rx)
1571    }
1572
1573    /// Connects all active Lightstreamer clients and maintains the connections.
1574    ///
1575    /// This method establishes connections for all streaming clients that have active
1576    /// subscriptions (market and price). Each client runs in its own task and
1577    /// all connections are maintained until a shutdown signal is received.
1578    ///
1579    /// # Arguments
1580    ///
1581    /// * `shutdown_signal` - Optional signal to gracefully shutdown all connections.
1582    ///   If None, a default signal handler for SIGINT/SIGTERM will be created.
1583    ///
1584    /// # Returns
1585    ///
1586    /// Returns `Ok(())` when all connections are closed gracefully, or an error if
1587    /// any connection fails after maximum retry attempts.
1588    ///
1589    pub async fn connect(&mut self, shutdown_signal: Option<Arc<Notify>>) -> Result<(), AppError> {
1590        // Use provided signal or create a new one with signal hooks
1591        let signal = if let Some(sig) = shutdown_signal {
1592            sig
1593        } else {
1594            let sig = Arc::new(Notify::new());
1595            setup_signal_hook(Arc::clone(&sig)).await;
1596            sig
1597        };
1598
1599        let mut tasks = Vec::new();
1600
1601        // Connect market streamer only if there are active subscriptions
1602        if self.has_market_stream_subs {
1603            if let Some(client) = self.market_streamer_client.as_ref() {
1604                let client = Arc::clone(client);
1605                let signal = Arc::clone(&signal);
1606                let task =
1607                    tokio::spawn(
1608                        async move { Self::connect_client(client, signal, "Market").await },
1609                    );
1610                tasks.push(task);
1611            }
1612        } else {
1613            info!("Skipping Market streamer connection: no active subscriptions");
1614        }
1615
1616        // Connect price streamer only if there are active subscriptions
1617        if self.has_price_stream_subs {
1618            if let Some(client) = self.price_streamer_client.as_ref() {
1619                let client = Arc::clone(client);
1620                let signal = Arc::clone(&signal);
1621                let task =
1622                    tokio::spawn(
1623                        async move { Self::connect_client(client, signal, "Price").await },
1624                    );
1625                tasks.push(task);
1626            }
1627        } else {
1628            info!("Skipping Price streamer connection: no active subscriptions");
1629        }
1630
1631        if tasks.is_empty() {
1632            warn!("No streaming clients selected for connection (no active subscriptions)");
1633            return Ok(());
1634        }
1635
1636        info!("Connecting {} streaming client(s)...", tasks.len());
1637
1638        // Wait for all tasks to complete
1639        let results = futures::future::join_all(tasks).await;
1640
1641        // Check if any task failed
1642        let mut has_error = false;
1643        for (idx, result) in results.iter().enumerate() {
1644            match result {
1645                Ok(Ok(_)) => {
1646                    debug!("Streaming client {} completed successfully", idx);
1647                }
1648                Ok(Err(e)) => {
1649                    error!("Streaming client {} failed: {:?}", idx, e);
1650                    has_error = true;
1651                }
1652                Err(e) => {
1653                    error!("Streaming client {} task panicked: {:?}", idx, e);
1654                    has_error = true;
1655                }
1656            }
1657        }
1658
1659        if has_error {
1660            return Err(AppError::WebSocketError(
1661                "one or more streaming connections failed".to_string(),
1662            ));
1663        }
1664
1665        info!("All streaming connections closed gracefully");
1666        Ok(())
1667    }
1668
1669    /// Internal helper to connect a single Lightstreamer client with retry logic.
1670    async fn connect_client(
1671        client: Arc<Mutex<LightstreamerClient>>,
1672        signal: Arc<Notify>,
1673        client_type: &str,
1674    ) -> Result<(), AppError> {
1675        let mut retry_interval_millis: u64 = 0;
1676        let mut retry_counter: u64 = 0;
1677
1678        while retry_counter < MAX_CONNECTION_ATTEMPTS {
1679            let connect_result = {
1680                let mut client = client.lock().await;
1681                client.connect_direct(Arc::clone(&signal)).await
1682            };
1683
1684            // Convert error to String immediately to avoid Send issues
1685            let result_with_string_error = connect_result.map_err(|e| format!("{:?}", e));
1686
1687            match result_with_string_error {
1688                Ok(_) => {
1689                    info!("{} streamer connected successfully", client_type);
1690                    break;
1691                }
1692                Err(error_msg) => {
1693                    // If server closed because there are no active subscriptions, treat as graceful
1694                    if error_msg.contains("No more requests to fulfill") {
1695                        info!(
1696                            "{} streamer closed gracefully: no active subscriptions (server reason: No more requests to fulfill)",
1697                            client_type
1698                        );
1699                        return Ok(());
1700                    }
1701
1702                    error!("{} streamer connection failed: {}", client_type, error_msg);
1703
1704                    if retry_counter < MAX_CONNECTION_ATTEMPTS - 1 {
1705                        tokio::time::sleep(std::time::Duration::from_millis(retry_interval_millis))
1706                            .await;
1707                        retry_interval_millis =
1708                            (retry_interval_millis + (200 * retry_counter)).min(5000);
1709                        retry_counter += 1;
1710                        warn!(
1711                            "{} streamer retrying (attempt {}/{}) in {:.2} seconds...",
1712                            client_type,
1713                            retry_counter + 1,
1714                            MAX_CONNECTION_ATTEMPTS,
1715                            retry_interval_millis as f64 / 1000.0
1716                        );
1717                    } else {
1718                        retry_counter += 1;
1719                    }
1720                }
1721            }
1722        }
1723
1724        if retry_counter >= MAX_CONNECTION_ATTEMPTS {
1725            error!(
1726                "{} streamer failed after {} attempts",
1727                client_type, MAX_CONNECTION_ATTEMPTS
1728            );
1729            return Err(AppError::WebSocketError(format!(
1730                "{} streamer: maximum connection attempts ({}) exceeded",
1731                client_type, MAX_CONNECTION_ATTEMPTS
1732            )));
1733        }
1734
1735        info!("{} streamer connection closed gracefully", client_type);
1736        Ok(())
1737    }
1738
1739    /// Disconnects all active Lightstreamer clients.
1740    ///
1741    /// This method gracefully closes all streaming connections (market and price).
1742    ///
1743    /// # Returns
1744    ///
1745    /// Returns `Ok(())` if all disconnections were successful.
1746    pub async fn disconnect(&mut self) -> Result<(), AppError> {
1747        let mut disconnected = 0;
1748
1749        if let Some(client) = self.market_streamer_client.as_ref() {
1750            let mut client = client.lock().await;
1751            client.disconnect().await;
1752            info!("Market streamer disconnected");
1753            disconnected += 1;
1754        }
1755
1756        if let Some(client) = self.price_streamer_client.as_ref() {
1757            let mut client = client.lock().await;
1758            client.disconnect().await;
1759            info!("Price streamer disconnected");
1760            disconnected += 1;
1761        }
1762
1763        info!("Disconnected {} streaming client(s)", disconnected);
1764        Ok(())
1765    }
1766}