ig_client/presentation/
market.rs

1use crate::application::models::market::{MarketNavigationResponse, MarketNode};
2use crate::application::services::MarketService;
3use crate::error::AppError;
4use crate::presentation::serialization::{string_as_bool_opt, string_as_float_opt};
5use crate::session::interface::IgSession;
6use lightstreamer_rs::subscription::ItemUpdate;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::fmt;
10use std::pin::Pin;
11use tracing::{debug, error, info};
12
13/// Represents the current state of a market
14#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default)]
15#[serde(rename_all = "UPPERCASE")]
16pub enum MarketState {
17    /// Market is closed for trading
18    Closed,
19    /// Market is offline and not available
20    #[default]
21    Offline,
22    /// Market is open and available for trading
23    Tradeable,
24    /// Market is in edit mode
25    Edit,
26    /// Market is in auction phase
27    Auction,
28    /// Market is in auction phase but editing is not allowed
29    AuctionNoEdit,
30    /// Market is temporarily suspended
31    Suspended,
32}
33
34/// Representation of market data received from the IG Markets streaming API
35#[derive(Debug, Clone, Serialize, Deserialize, Default)]
36pub struct MarketData {
37    /// Name of the item this data belongs to
38    pub item_name: String,
39    /// Position of the item in the subscription
40    pub item_pos: i32,
41    /// All market fields
42    pub fields: MarketFields,
43    /// Fields that have changed in this update
44    pub changed_fields: MarketFields,
45    /// Whether this is a snapshot or an update
46    pub is_snapshot: bool,
47}
48
49impl MarketData {
50    /// Converts an ItemUpdate from the Lightstreamer API to a MarketData object
51    ///
52    /// # Arguments
53    /// * `item_update` - The ItemUpdate received from the Lightstreamer API
54    ///
55    /// # Returns
56    /// * `Result<Self, String>` - The converted MarketData or an error message
57    pub fn from_item_update(item_update: &ItemUpdate) -> Result<Self, String> {
58        // Extract the item_name, defaulting to an empty string if None
59        let item_name = item_update.item_name.clone().unwrap_or_default();
60
61        // Convert item_pos from usize to i32
62        let item_pos = item_update.item_pos as i32;
63
64        // Extract is_snapshot
65        let is_snapshot = item_update.is_snapshot;
66
67        // Convert fields
68        let fields = Self::create_market_fields(&item_update.fields)?;
69
70        // Convert changed_fields by first creating a HashMap<String, Option<String>>
71        let mut changed_fields_map: HashMap<String, Option<String>> = HashMap::new();
72        for (key, value) in &item_update.changed_fields {
73            changed_fields_map.insert(key.clone(), Some(value.clone()));
74        }
75        let changed_fields = Self::create_market_fields(&changed_fields_map)?;
76
77        Ok(MarketData {
78            item_name,
79            item_pos,
80            fields,
81            changed_fields,
82            is_snapshot,
83        })
84    }
85
86    /// Helper method to create MarketFields from a HashMap of field values
87    ///
88    /// # Arguments
89    /// * `fields_map` - HashMap containing field names and their string values
90    ///
91    /// # Returns
92    /// * `Result<MarketFields, String>` - The parsed MarketFields or an error message
93    fn create_market_fields(
94        fields_map: &HashMap<String, Option<String>>,
95    ) -> Result<MarketFields, String> {
96        // Helper function to safely get a field value
97        let get_field = |key: &str| -> Option<String> { fields_map.get(key).cloned().flatten() };
98
99        // Parse market state
100        let market_state = match get_field("MARKET_STATE").as_deref() {
101            Some("closed") => Some(MarketState::Closed),
102            Some("offline") => Some(MarketState::Offline),
103            Some("tradeable") => Some(MarketState::Tradeable),
104            Some("edit") => Some(MarketState::Edit),
105            Some("auction") => Some(MarketState::Auction),
106            Some("auction_no_edit") => Some(MarketState::AuctionNoEdit),
107            Some("suspended") => Some(MarketState::Suspended),
108            Some(unknown) => return Err(format!("Unknown market state: {unknown}")),
109            None => None,
110        };
111
112        // Parse boolean field
113        let market_delay = match get_field("MARKET_DELAY").as_deref() {
114            Some("0") => Some(false),
115            Some("1") => Some(true),
116            Some(val) => return Err(format!("Invalid MARKET_DELAY value: {val}")),
117            None => None,
118        };
119
120        // Helper function to parse float values
121        let parse_float = |key: &str| -> Result<Option<f64>, String> {
122            match get_field(key) {
123                Some(val) if !val.is_empty() => val
124                    .parse::<f64>()
125                    .map(Some)
126                    .map_err(|_| format!("Failed to parse {key} as float: {val}")),
127                _ => Ok(None),
128            }
129        };
130
131        Ok(MarketFields {
132            mid_open: parse_float("MID_OPEN")?,
133            high: parse_float("HIGH")?,
134            offer: parse_float("OFFER")?,
135            change: parse_float("CHANGE")?,
136            market_delay,
137            low: parse_float("LOW")?,
138            bid: parse_float("BID")?,
139            change_pct: parse_float("CHANGE_PCT")?,
140            market_state,
141            update_time: get_field("UPDATE_TIME"),
142        })
143    }
144}
145
146impl fmt::Display for MarketData {
147    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
148        let json = serde_json::to_string(self).map_err(|_| fmt::Error)?;
149        write!(f, "{json}")
150    }
151}
152
153impl From<&ItemUpdate> for MarketData {
154    fn from(item_update: &ItemUpdate) -> Self {
155        Self::from_item_update(item_update).unwrap_or_else(|_| MarketData {
156            item_name: String::new(),
157            item_pos: 0,
158            fields: MarketFields::default(),
159            changed_fields: MarketFields::default(),
160            is_snapshot: false,
161        })
162    }
163}
164
165/// Fields containing market price and status information
166#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
167pub struct MarketFields {
168    /// The mid-open price of the market
169    #[serde(rename = "MID_OPEN")]
170    #[serde(with = "string_as_float_opt")]
171    #[serde(default)]
172    pub mid_open: Option<f64>,
173
174    /// The highest price reached by the market in the current trading session
175    #[serde(rename = "HIGH")]
176    #[serde(with = "string_as_float_opt")]
177    #[serde(default)]
178    pub high: Option<f64>,
179
180    /// The current offer (ask) price of the market
181    #[serde(rename = "OFFER")]
182    #[serde(with = "string_as_float_opt")]
183    #[serde(default)]
184    pub offer: Option<f64>,
185
186    /// The absolute price change since the previous close
187    #[serde(rename = "CHANGE")]
188    #[serde(with = "string_as_float_opt")]
189    #[serde(default)]
190    pub change: Option<f64>,
191
192    /// Indicates if there is a delay in market data
193    #[serde(rename = "MARKET_DELAY")]
194    #[serde(with = "string_as_bool_opt")]
195    #[serde(default)]
196    pub market_delay: Option<bool>,
197
198    /// The lowest price reached by the market in the current trading session
199    #[serde(rename = "LOW")]
200    #[serde(with = "string_as_float_opt")]
201    #[serde(default)]
202    pub low: Option<f64>,
203
204    /// The current bid price of the market
205    #[serde(rename = "BID")]
206    #[serde(with = "string_as_float_opt")]
207    #[serde(default)]
208    pub bid: Option<f64>,
209
210    /// The percentage price change since the previous close
211    #[serde(rename = "CHANGE_PCT")]
212    #[serde(with = "string_as_float_opt")]
213    #[serde(default)]
214    pub change_pct: Option<f64>,
215
216    /// The current state of the market (e.g., Tradeable, Closed, etc.)
217    #[serde(rename = "MARKET_STATE")]
218    #[serde(default)]
219    pub market_state: Option<MarketState>,
220
221    /// The timestamp of the last market update
222    #[serde(rename = "UPDATE_TIME")]
223    #[serde(default)]
224    pub update_time: Option<String>,
225}
226
227use once_cell::sync::Lazy;
228use std::sync::Arc;
229use tokio::sync::Semaphore;
230
231// Global semaphore to limit concurrency in API requests
232// This ensures that rate limits are not exceeded
233static API_SEMAPHORE: Lazy<Arc<Semaphore>> = Lazy::new(|| Arc::new(Semaphore::new(1)));
234
235/// Function to recursively build the market hierarchy with rate limiting
236///
237/// This function builds the market hierarchy recursively, respecting
238/// the API rate limits. It uses a semaphore to ensure that only
239/// one request is made at a time, thus avoiding exceeding rate limits.
240pub fn build_market_hierarchy<'a>(
241    market_service: &'a impl MarketService,
242    session: &'a IgSession,
243    node_id: Option<&'a str>,
244    depth: usize,
245) -> Pin<Box<dyn Future<Output = Result<Vec<MarketNode>, AppError>> + 'a>> {
246    Box::pin(async move {
247        // Limit the depth to avoid infinite loops
248        if depth > 7 {
249            debug!("Reached maximum depth of 5, stopping recursion");
250            return Ok(Vec::new());
251        }
252
253        // Acquire the semaphore to limit concurrency
254        // This ensures that only one API request is made at a time
255        let _permit = API_SEMAPHORE.clone().acquire_owned().await.unwrap();
256
257        // The rate limiter will handle any necessary delays between requests
258        // No explicit sleep calls are needed here
259
260        // Get the nodes and markets at the current level
261        let navigation: MarketNavigationResponse = match node_id {
262            Some(id) => {
263                debug!("Getting navigation node: {}", id);
264                match market_service.get_market_navigation_node(session, id).await {
265                    Ok(response) => {
266                        debug!(
267                            "Response received for node {}: {} nodes, {} markets",
268                            id,
269                            response.nodes.len(),
270                            response.markets.len()
271                        );
272                        response
273                    }
274                    Err(e) => {
275                        error!("Error getting node {}: {:?}", id, e);
276                        // If we hit a rate limit, return empty results instead of failing
277                        if matches!(e, AppError::RateLimitExceeded | AppError::Unexpected(_)) {
278                            info!("Rate limit or API error encountered, returning partial results");
279                            return Ok(Vec::new());
280                        }
281                        return Err(e);
282                    }
283                }
284            }
285            None => {
286                debug!("Getting top-level navigation nodes");
287                match market_service.get_market_navigation(session).await {
288                    Ok(response) => {
289                        debug!(
290                            "Response received for top-level nodes: {} nodes, {} markets",
291                            response.nodes.len(),
292                            response.markets.len()
293                        );
294                        response
295                    }
296                    Err(e) => {
297                        error!("Error getting top-level nodes: {:?}", e);
298                        return Err(e);
299                    }
300                }
301            }
302        };
303
304        let mut nodes = Vec::new();
305
306        // Process all nodes at this level
307        let nodes_to_process = navigation.nodes;
308
309        // Release the semaphore before processing child nodes
310        // This allows other requests to be processed while we wait
311        // for recursive requests to complete
312        drop(_permit);
313
314        // Process nodes sequentially with rate limiting
315        // This is important to respect the API rate limits
316        // By processing nodes sequentially, we allow the rate limiter
317        // to properly control the flow of requests
318        for node in nodes_to_process.into_iter() {
319            // Recursively get the children of this node
320            match build_market_hierarchy(market_service, session, Some(&node.id), depth + 1).await {
321                Ok(children) => {
322                    info!("Adding node {} with {} children", node.name, children.len());
323                    nodes.push(MarketNode {
324                        id: node.id.clone(),
325                        name: node.name.clone(),
326                        children,
327                        markets: Vec::new(),
328                    });
329                }
330                Err(e) => {
331                    error!("Error building hierarchy for node {}: {:?}", node.id, e);
332                    // Continuar con otros nodos incluso si uno falla
333                    if depth < 7 {
334                        nodes.push(MarketNode {
335                            id: node.id.clone(),
336                            name: format!("{} (error: {})", node.name, e),
337                            children: Vec::new(),
338                            markets: Vec::new(),
339                        });
340                    }
341                }
342            }
343        }
344
345        // Process all markets in this node
346        let markets_to_process = navigation.markets;
347        for market in markets_to_process {
348            debug!("Adding market: {}", market.instrument_name);
349            nodes.push(MarketNode {
350                id: market.epic.clone(),
351                name: market.instrument_name.clone(),
352                children: Vec::new(),
353                markets: vec![market],
354            });
355        }
356
357        Ok(nodes)
358    })
359}
360
361/// Recursively extract all markets from the hierarchy into a flat list
362pub fn extract_markets_from_hierarchy(
363    nodes: &[MarketNode],
364) -> Vec<crate::application::models::market::MarketData> {
365    let mut all_markets = Vec::new();
366
367    for node in nodes {
368        // Add markets from this node
369        all_markets.extend(node.markets.clone());
370
371        // Recursively add markets from child nodes
372        if !node.children.is_empty() {
373            all_markets.extend(extract_markets_from_hierarchy(&node.children));
374        }
375    }
376
377    all_markets
378}