1use crate::application::models::market::{MarketNavigationResponse, MarketNode};
2use crate::application::services::MarketService;
3use crate::error::AppError;
4use crate::presentation::serialization::{string_as_bool_opt, string_as_float_opt};
5use crate::session::interface::IgSession;
6use lightstreamer_rs::subscription::ItemUpdate;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::fmt;
10use std::pin::Pin;
11use tracing::{debug, error, info};
12
13#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default)]
15#[serde(rename_all = "UPPERCASE")]
16pub enum MarketState {
17 Closed,
19 #[default]
21 Offline,
22 Tradeable,
24 Edit,
26 Auction,
28 AuctionNoEdit,
30 Suspended,
32}
33
34#[derive(Debug, Clone, Serialize, Deserialize, Default)]
36pub struct MarketData {
37 pub item_name: String,
39 pub item_pos: i32,
41 pub fields: MarketFields,
43 pub changed_fields: MarketFields,
45 pub is_snapshot: bool,
47}
48
49impl MarketData {
50 pub fn from_item_update(item_update: &ItemUpdate) -> Result<Self, String> {
58 let item_name = item_update.item_name.clone().unwrap_or_default();
60
61 let item_pos = item_update.item_pos as i32;
63
64 let is_snapshot = item_update.is_snapshot;
66
67 let fields = Self::create_market_fields(&item_update.fields)?;
69
70 let mut changed_fields_map: HashMap<String, Option<String>> = HashMap::new();
72 for (key, value) in &item_update.changed_fields {
73 changed_fields_map.insert(key.clone(), Some(value.clone()));
74 }
75 let changed_fields = Self::create_market_fields(&changed_fields_map)?;
76
77 Ok(MarketData {
78 item_name,
79 item_pos,
80 fields,
81 changed_fields,
82 is_snapshot,
83 })
84 }
85
86 fn create_market_fields(
94 fields_map: &HashMap<String, Option<String>>,
95 ) -> Result<MarketFields, String> {
96 let get_field = |key: &str| -> Option<String> { fields_map.get(key).cloned().flatten() };
98
99 let market_state = match get_field("MARKET_STATE").as_deref() {
101 Some("closed") => Some(MarketState::Closed),
102 Some("offline") => Some(MarketState::Offline),
103 Some("tradeable") => Some(MarketState::Tradeable),
104 Some("edit") => Some(MarketState::Edit),
105 Some("auction") => Some(MarketState::Auction),
106 Some("auction_no_edit") => Some(MarketState::AuctionNoEdit),
107 Some("suspended") => Some(MarketState::Suspended),
108 Some(unknown) => return Err(format!("Unknown market state: {unknown}")),
109 None => None,
110 };
111
112 let market_delay = match get_field("MARKET_DELAY").as_deref() {
114 Some("0") => Some(false),
115 Some("1") => Some(true),
116 Some(val) => return Err(format!("Invalid MARKET_DELAY value: {val}")),
117 None => None,
118 };
119
120 let parse_float = |key: &str| -> Result<Option<f64>, String> {
122 match get_field(key) {
123 Some(val) if !val.is_empty() => val
124 .parse::<f64>()
125 .map(Some)
126 .map_err(|_| format!("Failed to parse {key} as float: {val}")),
127 _ => Ok(None),
128 }
129 };
130
131 Ok(MarketFields {
132 mid_open: parse_float("MID_OPEN")?,
133 high: parse_float("HIGH")?,
134 offer: parse_float("OFFER")?,
135 change: parse_float("CHANGE")?,
136 market_delay,
137 low: parse_float("LOW")?,
138 bid: parse_float("BID")?,
139 change_pct: parse_float("CHANGE_PCT")?,
140 market_state,
141 update_time: get_field("UPDATE_TIME"),
142 })
143 }
144}
145
146impl fmt::Display for MarketData {
147 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
148 let json = serde_json::to_string(self).map_err(|_| fmt::Error)?;
149 write!(f, "{json}")
150 }
151}
152
153impl From<&ItemUpdate> for MarketData {
154 fn from(item_update: &ItemUpdate) -> Self {
155 Self::from_item_update(item_update).unwrap_or_else(|_| MarketData {
156 item_name: String::new(),
157 item_pos: 0,
158 fields: MarketFields::default(),
159 changed_fields: MarketFields::default(),
160 is_snapshot: false,
161 })
162 }
163}
164
165#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
167pub struct MarketFields {
168 #[serde(rename = "MID_OPEN")]
170 #[serde(with = "string_as_float_opt")]
171 #[serde(default)]
172 pub mid_open: Option<f64>,
173
174 #[serde(rename = "HIGH")]
176 #[serde(with = "string_as_float_opt")]
177 #[serde(default)]
178 pub high: Option<f64>,
179
180 #[serde(rename = "OFFER")]
182 #[serde(with = "string_as_float_opt")]
183 #[serde(default)]
184 pub offer: Option<f64>,
185
186 #[serde(rename = "CHANGE")]
188 #[serde(with = "string_as_float_opt")]
189 #[serde(default)]
190 pub change: Option<f64>,
191
192 #[serde(rename = "MARKET_DELAY")]
194 #[serde(with = "string_as_bool_opt")]
195 #[serde(default)]
196 pub market_delay: Option<bool>,
197
198 #[serde(rename = "LOW")]
200 #[serde(with = "string_as_float_opt")]
201 #[serde(default)]
202 pub low: Option<f64>,
203
204 #[serde(rename = "BID")]
206 #[serde(with = "string_as_float_opt")]
207 #[serde(default)]
208 pub bid: Option<f64>,
209
210 #[serde(rename = "CHANGE_PCT")]
212 #[serde(with = "string_as_float_opt")]
213 #[serde(default)]
214 pub change_pct: Option<f64>,
215
216 #[serde(rename = "MARKET_STATE")]
218 #[serde(default)]
219 pub market_state: Option<MarketState>,
220
221 #[serde(rename = "UPDATE_TIME")]
223 #[serde(default)]
224 pub update_time: Option<String>,
225}
226
227use once_cell::sync::Lazy;
228use std::sync::Arc;
229use tokio::sync::Semaphore;
230
231static API_SEMAPHORE: Lazy<Arc<Semaphore>> = Lazy::new(|| Arc::new(Semaphore::new(1)));
234
235pub fn build_market_hierarchy<'a>(
241 market_service: &'a impl MarketService,
242 session: &'a IgSession,
243 node_id: Option<&'a str>,
244 depth: usize,
245) -> Pin<Box<dyn Future<Output = Result<Vec<MarketNode>, AppError>> + 'a>> {
246 Box::pin(async move {
247 if depth > 7 {
249 debug!("Reached maximum depth of 5, stopping recursion");
250 return Ok(Vec::new());
251 }
252
253 let _permit = API_SEMAPHORE.clone().acquire_owned().await.unwrap();
256
257 let navigation: MarketNavigationResponse = match node_id {
262 Some(id) => {
263 debug!("Getting navigation node: {}", id);
264 match market_service.get_market_navigation_node(session, id).await {
265 Ok(response) => {
266 debug!(
267 "Response received for node {}: {} nodes, {} markets",
268 id,
269 response.nodes.len(),
270 response.markets.len()
271 );
272 response
273 }
274 Err(e) => {
275 error!("Error getting node {}: {:?}", id, e);
276 if matches!(e, AppError::RateLimitExceeded | AppError::Unexpected(_)) {
278 info!("Rate limit or API error encountered, returning partial results");
279 return Ok(Vec::new());
280 }
281 return Err(e);
282 }
283 }
284 }
285 None => {
286 debug!("Getting top-level navigation nodes");
287 match market_service.get_market_navigation(session).await {
288 Ok(response) => {
289 debug!(
290 "Response received for top-level nodes: {} nodes, {} markets",
291 response.nodes.len(),
292 response.markets.len()
293 );
294 response
295 }
296 Err(e) => {
297 error!("Error getting top-level nodes: {:?}", e);
298 return Err(e);
299 }
300 }
301 }
302 };
303
304 let mut nodes = Vec::new();
305
306 let nodes_to_process = navigation.nodes;
308
309 drop(_permit);
313
314 for node in nodes_to_process.into_iter() {
319 match build_market_hierarchy(market_service, session, Some(&node.id), depth + 1).await {
321 Ok(children) => {
322 info!("Adding node {} with {} children", node.name, children.len());
323 nodes.push(MarketNode {
324 id: node.id.clone(),
325 name: node.name.clone(),
326 children,
327 markets: Vec::new(),
328 });
329 }
330 Err(e) => {
331 error!("Error building hierarchy for node {}: {:?}", node.id, e);
332 if depth < 7 {
334 nodes.push(MarketNode {
335 id: node.id.clone(),
336 name: format!("{} (error: {})", node.name, e),
337 children: Vec::new(),
338 markets: Vec::new(),
339 });
340 }
341 }
342 }
343 }
344
345 let markets_to_process = navigation.markets;
347 for market in markets_to_process {
348 debug!("Adding market: {}", market.instrument_name);
349 nodes.push(MarketNode {
350 id: market.epic.clone(),
351 name: market.instrument_name.clone(),
352 children: Vec::new(),
353 markets: vec![market],
354 });
355 }
356
357 Ok(nodes)
358 })
359}
360
361pub fn extract_markets_from_hierarchy(
363 nodes: &[MarketNode],
364) -> Vec<crate::application::models::market::MarketData> {
365 let mut all_markets = Vec::new();
366
367 for node in nodes {
368 all_markets.extend(node.markets.clone());
370
371 if !node.children.is_empty() {
373 all_markets.extend(extract_markets_from_hierarchy(&node.children));
374 }
375 }
376
377 all_markets
378}