ig_client/presentation/
market.rs

1use crate::application::models::market::{MarketNavigationResponse, MarketNode};
2use crate::application::services::MarketService;
3use crate::error::AppError;
4use crate::presentation::serialization::{string_as_bool_opt, string_as_float_opt};
5use crate::session::interface::IgSession;
6use lightstreamer_rs::subscription::ItemUpdate;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::fmt;
10use std::pin::Pin;
11use tracing::{debug, error, info};
12
13/// Represents the current state of a market
14#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default)]
15#[serde(rename_all = "UPPERCASE")]
16pub enum MarketState {
17    /// Market is closed for trading
18    Closed,
19    /// Market is offline and not available
20    #[default]
21    Offline,
22    /// Market is open and available for trading
23    Tradeable,
24    /// Market is in edit mode
25    Edit,
26    /// Market is in auction phase
27    Auction,
28    /// Market is in auction phase but editing is not allowed
29    AuctionNoEdit,
30    /// Market is temporarily suspended
31    Suspended,
32}
33
34/// Representation of market data received from the IG Markets streaming API
35#[derive(Debug, Clone, Serialize, Deserialize, Default)]
36pub struct MarketData {
37    /// Name of the item this data belongs to
38    item_name: String,
39    /// Position of the item in the subscription
40    item_pos: i32,
41    /// All market fields
42    fields: MarketFields,
43    /// Fields that have changed in this update
44    changed_fields: MarketFields,
45    /// Whether this is a snapshot or an update
46    is_snapshot: bool,
47}
48
49impl MarketData {
50    /// Converts an ItemUpdate from the Lightstreamer API to a MarketData object
51    ///
52    /// # Arguments
53    /// * `item_update` - The ItemUpdate received from the Lightstreamer API
54    ///
55    /// # Returns
56    /// * `Result<Self, String>` - The converted MarketData or an error message
57    pub fn from_item_update(item_update: &ItemUpdate) -> Result<Self, String> {
58        // Extract the item_name, defaulting to an empty string if None
59        let item_name = item_update.item_name.clone().unwrap_or_default();
60
61        // Convert item_pos from usize to i32
62        let item_pos = item_update.item_pos as i32;
63
64        // Extract is_snapshot
65        let is_snapshot = item_update.is_snapshot;
66
67        // Convert fields
68        let fields = Self::create_market_fields(&item_update.fields)?;
69
70        // Convert changed_fields by first creating a HashMap<String, Option<String>>
71        let mut changed_fields_map: HashMap<String, Option<String>> = HashMap::new();
72        for (key, value) in &item_update.changed_fields {
73            changed_fields_map.insert(key.clone(), Some(value.clone()));
74        }
75        let changed_fields = Self::create_market_fields(&changed_fields_map)?;
76
77        Ok(MarketData {
78            item_name,
79            item_pos,
80            fields,
81            changed_fields,
82            is_snapshot,
83        })
84    }
85
86    /// Helper method to create MarketFields from a HashMap of field values
87    ///
88    /// # Arguments
89    /// * `fields_map` - HashMap containing field names and their string values
90    ///
91    /// # Returns
92    /// * `Result<MarketFields, String>` - The parsed MarketFields or an error message
93    fn create_market_fields(
94        fields_map: &HashMap<String, Option<String>>,
95    ) -> Result<MarketFields, String> {
96        // Helper function to safely get a field value
97        let get_field = |key: &str| -> Option<String> { fields_map.get(key).cloned().flatten() };
98
99        // Parse market state
100        let market_state = match get_field("MARKET_STATE").as_deref() {
101            Some("closed") => Some(MarketState::Closed),
102            Some("offline") => Some(MarketState::Offline),
103            Some("tradeable") => Some(MarketState::Tradeable),
104            Some("edit") => Some(MarketState::Edit),
105            Some("auction") => Some(MarketState::Auction),
106            Some("auction_no_edit") => Some(MarketState::AuctionNoEdit),
107            Some("suspended") => Some(MarketState::Suspended),
108            Some(unknown) => return Err(format!("Unknown market state: {}", unknown)),
109            None => None,
110        };
111
112        // Parse boolean field
113        let market_delay = match get_field("MARKET_DELAY").as_deref() {
114            Some("0") => Some(false),
115            Some("1") => Some(true),
116            Some(val) => return Err(format!("Invalid MARKET_DELAY value: {}", val)),
117            None => None,
118        };
119
120        // Helper function to parse float values
121        let parse_float = |key: &str| -> Result<Option<f64>, String> {
122            match get_field(key) {
123                Some(val) if !val.is_empty() => val
124                    .parse::<f64>()
125                    .map(Some)
126                    .map_err(|_| format!("Failed to parse {} as float: {}", key, val)),
127                _ => Ok(None),
128            }
129        };
130
131        Ok(MarketFields {
132            mid_open: parse_float("MID_OPEN")?,
133            high: parse_float("HIGH")?,
134            offer: parse_float("OFFER")?,
135            change: parse_float("CHANGE")?,
136            market_delay,
137            low: parse_float("LOW")?,
138            bid: parse_float("BID")?,
139            change_pct: parse_float("CHANGE_PCT")?,
140            market_state,
141            update_time: get_field("UPDATE_TIME"),
142        })
143    }
144}
145
146impl fmt::Display for MarketData {
147    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
148        let json = serde_json::to_string(self).map_err(|_| fmt::Error)?;
149        write!(f, "{}", json)
150    }
151}
152
153impl From<&ItemUpdate> for MarketData {
154    fn from(item_update: &ItemUpdate) -> Self {
155        Self::from_item_update(item_update).unwrap_or_else(|_| MarketData {
156            item_name: String::new(),
157            item_pos: 0,
158            fields: MarketFields::default(),
159            changed_fields: MarketFields::default(),
160            is_snapshot: false,
161        })
162    }
163}
164
165/// Fields containing market price and status information
166#[derive(Debug, Clone, Serialize, Deserialize, Default)]
167pub struct MarketFields {
168    #[serde(rename = "MID_OPEN")]
169    #[serde(with = "string_as_float_opt")]
170    #[serde(default)]
171    mid_open: Option<f64>,
172
173    #[serde(rename = "HIGH")]
174    #[serde(with = "string_as_float_opt")]
175    #[serde(default)]
176    high: Option<f64>,
177
178    #[serde(rename = "OFFER")]
179    #[serde(with = "string_as_float_opt")]
180    #[serde(default)]
181    offer: Option<f64>,
182
183    #[serde(rename = "CHANGE")]
184    #[serde(with = "string_as_float_opt")]
185    #[serde(default)]
186    change: Option<f64>,
187
188    #[serde(rename = "MARKET_DELAY")]
189    #[serde(with = "string_as_bool_opt")]
190    #[serde(default)]
191    market_delay: Option<bool>,
192
193    #[serde(rename = "LOW")]
194    #[serde(with = "string_as_float_opt")]
195    #[serde(default)]
196    low: Option<f64>,
197
198    #[serde(rename = "BID")]
199    #[serde(with = "string_as_float_opt")]
200    #[serde(default)]
201    bid: Option<f64>,
202
203    #[serde(rename = "CHANGE_PCT")]
204    #[serde(with = "string_as_float_opt")]
205    #[serde(default)]
206    change_pct: Option<f64>,
207
208    #[serde(rename = "MARKET_STATE")]
209    #[serde(default)]
210    market_state: Option<MarketState>,
211
212    #[serde(rename = "UPDATE_TIME")]
213    #[serde(default)]
214    update_time: Option<String>,
215}
216
217use once_cell::sync::Lazy;
218use std::sync::Arc;
219use tokio::sync::Semaphore;
220
221// Global semaphore to limit concurrency in API requests
222// This ensures that rate limits are not exceeded
223static API_SEMAPHORE: Lazy<Arc<Semaphore>> = Lazy::new(|| Arc::new(Semaphore::new(1)));
224
225/// Function to recursively build the market hierarchy with rate limiting
226///
227/// This function builds the market hierarchy recursively, respecting
228/// the API rate limits. It uses a semaphore to ensure that only
229/// one request is made at a time, thus avoiding exceeding rate limits.
230pub fn build_market_hierarchy<'a>(
231    market_service: &'a impl MarketService,
232    session: &'a IgSession,
233    node_id: Option<&'a str>,
234    depth: usize,
235) -> Pin<Box<dyn Future<Output = Result<Vec<MarketNode>, AppError>> + 'a>> {
236    Box::pin(async move {
237        // Limit the depth to avoid infinite loops
238        if depth > 7 {
239            debug!("Reached maximum depth of 5, stopping recursion");
240            return Ok(Vec::new());
241        }
242
243        // Acquire the semaphore to limit concurrency
244        // This ensures that only one API request is made at a time
245        let _permit = API_SEMAPHORE.clone().acquire_owned().await.unwrap();
246
247        // The rate limiter will handle any necessary delays between requests
248        // No explicit sleep calls are needed here
249
250        // Get the nodes and markets at the current level
251        let navigation: MarketNavigationResponse = match node_id {
252            Some(id) => {
253                debug!("Getting navigation node: {}", id);
254                match market_service.get_market_navigation_node(session, id).await {
255                    Ok(response) => {
256                        debug!(
257                            "Response received for node {}: {} nodes, {} markets",
258                            id,
259                            response.nodes.len(),
260                            response.markets.len()
261                        );
262                        response
263                    }
264                    Err(e) => {
265                        error!("Error getting node {}: {:?}", id, e);
266                        // If we hit a rate limit, return empty results instead of failing
267                        if matches!(e, AppError::RateLimitExceeded | AppError::Unexpected(_)) {
268                            info!("Rate limit or API error encountered, returning partial results");
269                            return Ok(Vec::new());
270                        }
271                        return Err(e);
272                    }
273                }
274            }
275            None => {
276                debug!("Getting top-level navigation nodes");
277                match market_service.get_market_navigation(session).await {
278                    Ok(response) => {
279                        debug!(
280                            "Response received for top-level nodes: {} nodes, {} markets",
281                            response.nodes.len(),
282                            response.markets.len()
283                        );
284                        response
285                    }
286                    Err(e) => {
287                        error!("Error getting top-level nodes: {:?}", e);
288                        return Err(e);
289                    }
290                }
291            }
292        };
293
294        let mut nodes = Vec::new();
295
296        // Process all nodes at this level
297        let nodes_to_process = navigation.nodes;
298
299        // Release the semaphore before processing child nodes
300        // This allows other requests to be processed while we wait
301        // for recursive requests to complete
302        drop(_permit);
303
304        // Process nodes sequentially with rate limiting
305        // This is important to respect the API rate limits
306        // By processing nodes sequentially, we allow the rate limiter
307        // to properly control the flow of requests
308        for node in nodes_to_process.into_iter() {
309            // Recursively get the children of this node
310            match build_market_hierarchy(market_service, session, Some(&node.id), depth + 1).await {
311                Ok(children) => {
312                    info!("Adding node {} with {} children", node.name, children.len());
313                    nodes.push(MarketNode {
314                        id: node.id.clone(),
315                        name: node.name.clone(),
316                        children,
317                        markets: Vec::new(),
318                    });
319                }
320                Err(e) => {
321                    error!("Error building hierarchy for node {}: {:?}", node.id, e);
322                    // Continuar con otros nodos incluso si uno falla
323                    if depth < 7 {
324                        nodes.push(MarketNode {
325                            id: node.id.clone(),
326                            name: format!("{} (error: {})", node.name, e),
327                            children: Vec::new(),
328                            markets: Vec::new(),
329                        });
330                    }
331                }
332            }
333        }
334
335        // Process all markets in this node
336        let markets_to_process = navigation.markets;
337        for market in markets_to_process {
338            debug!("Adding market: {}", market.instrument_name);
339            nodes.push(MarketNode {
340                id: market.epic.clone(),
341                name: market.instrument_name.clone(),
342                children: Vec::new(),
343                markets: vec![market],
344            });
345        }
346
347        Ok(nodes)
348    })
349}
350
351/// Recursively extract all markets from the hierarchy into a flat list
352pub fn extract_markets_from_hierarchy(
353    nodes: &[MarketNode],
354) -> Vec<crate::application::models::market::MarketData> {
355    let mut all_markets = Vec::new();
356
357    for node in nodes {
358        // Add markets from this node
359        all_markets.extend(node.markets.clone());
360
361        // Recursively add markets from child nodes
362        if !node.children.is_empty() {
363            all_markets.extend(extract_markets_from_hierarchy(&node.children));
364        }
365    }
366
367    all_markets
368}