1use crate::application::models::market::{MarketNavigationResponse, MarketNode};
2use crate::application::services::MarketService;
3use crate::error::AppError;
4use crate::presentation::serialization::{string_as_bool_opt, string_as_float_opt};
5use crate::session::interface::IgSession;
6use lightstreamer_rs::subscription::ItemUpdate;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::fmt;
10use std::pin::Pin;
11use tracing::{debug, error, info};
12
13#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default)]
15#[serde(rename_all = "UPPERCASE")]
16pub enum MarketState {
17 Closed,
19 #[default]
21 Offline,
22 Tradeable,
24 Edit,
26 Auction,
28 AuctionNoEdit,
30 Suspended,
32}
33
/// A single market update converted from a Lightstreamer `ItemUpdate`.
///
/// `Default` yields an empty name, position `0`, empty field sets and
/// `is_snapshot == false`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct MarketData {
    /// Item name copied from the update; empty when the update carried none.
    item_name: String,
    /// Item position copied from the update.
    item_pos: i32,
    /// Values parsed from the update's complete `fields` map.
    fields: MarketFields,
    /// Values parsed from the update's `changed_fields` map only.
    changed_fields: MarketFields,
    /// Snapshot flag copied from the update.
    is_snapshot: bool,
}
48
49impl MarketData {
50 pub fn from_item_update(item_update: &ItemUpdate) -> Result<Self, String> {
58 let item_name = item_update.item_name.clone().unwrap_or_default();
60
61 let item_pos = item_update.item_pos as i32;
63
64 let is_snapshot = item_update.is_snapshot;
66
67 let fields = Self::create_market_fields(&item_update.fields)?;
69
70 let mut changed_fields_map: HashMap<String, Option<String>> = HashMap::new();
72 for (key, value) in &item_update.changed_fields {
73 changed_fields_map.insert(key.clone(), Some(value.clone()));
74 }
75 let changed_fields = Self::create_market_fields(&changed_fields_map)?;
76
77 Ok(MarketData {
78 item_name,
79 item_pos,
80 fields,
81 changed_fields,
82 is_snapshot,
83 })
84 }
85
86 fn create_market_fields(
94 fields_map: &HashMap<String, Option<String>>,
95 ) -> Result<MarketFields, String> {
96 let get_field = |key: &str| -> Option<String> { fields_map.get(key).cloned().flatten() };
98
99 let market_state = match get_field("MARKET_STATE").as_deref() {
101 Some("closed") => Some(MarketState::Closed),
102 Some("offline") => Some(MarketState::Offline),
103 Some("tradeable") => Some(MarketState::Tradeable),
104 Some("edit") => Some(MarketState::Edit),
105 Some("auction") => Some(MarketState::Auction),
106 Some("auction_no_edit") => Some(MarketState::AuctionNoEdit),
107 Some("suspended") => Some(MarketState::Suspended),
108 Some(unknown) => return Err(format!("Unknown market state: {}", unknown)),
109 None => None,
110 };
111
112 let market_delay = match get_field("MARKET_DELAY").as_deref() {
114 Some("0") => Some(false),
115 Some("1") => Some(true),
116 Some(val) => return Err(format!("Invalid MARKET_DELAY value: {}", val)),
117 None => None,
118 };
119
120 let parse_float = |key: &str| -> Result<Option<f64>, String> {
122 match get_field(key) {
123 Some(val) if !val.is_empty() => val
124 .parse::<f64>()
125 .map(Some)
126 .map_err(|_| format!("Failed to parse {} as float: {}", key, val)),
127 _ => Ok(None),
128 }
129 };
130
131 Ok(MarketFields {
132 mid_open: parse_float("MID_OPEN")?,
133 high: parse_float("HIGH")?,
134 offer: parse_float("OFFER")?,
135 change: parse_float("CHANGE")?,
136 market_delay,
137 low: parse_float("LOW")?,
138 bid: parse_float("BID")?,
139 change_pct: parse_float("CHANGE_PCT")?,
140 market_state,
141 update_time: get_field("UPDATE_TIME"),
142 })
143 }
144}
145
146impl fmt::Display for MarketData {
147 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
148 let json = serde_json::to_string(self).map_err(|_| fmt::Error)?;
149 write!(f, "{}", json)
150 }
151}
152
153impl From<&ItemUpdate> for MarketData {
154 fn from(item_update: &ItemUpdate) -> Self {
155 Self::from_item_update(item_update).unwrap_or_else(|_| MarketData {
156 item_name: String::new(),
157 item_pos: 0,
158 fields: MarketFields::default(),
159 changed_fields: MarketFields::default(),
160 is_snapshot: false,
161 })
162 }
163}
164
165#[derive(Debug, Clone, Serialize, Deserialize, Default)]
167pub struct MarketFields {
168 #[serde(rename = "MID_OPEN")]
169 #[serde(with = "string_as_float_opt")]
170 #[serde(default)]
171 mid_open: Option<f64>,
172
173 #[serde(rename = "HIGH")]
174 #[serde(with = "string_as_float_opt")]
175 #[serde(default)]
176 high: Option<f64>,
177
178 #[serde(rename = "OFFER")]
179 #[serde(with = "string_as_float_opt")]
180 #[serde(default)]
181 offer: Option<f64>,
182
183 #[serde(rename = "CHANGE")]
184 #[serde(with = "string_as_float_opt")]
185 #[serde(default)]
186 change: Option<f64>,
187
188 #[serde(rename = "MARKET_DELAY")]
189 #[serde(with = "string_as_bool_opt")]
190 #[serde(default)]
191 market_delay: Option<bool>,
192
193 #[serde(rename = "LOW")]
194 #[serde(with = "string_as_float_opt")]
195 #[serde(default)]
196 low: Option<f64>,
197
198 #[serde(rename = "BID")]
199 #[serde(with = "string_as_float_opt")]
200 #[serde(default)]
201 bid: Option<f64>,
202
203 #[serde(rename = "CHANGE_PCT")]
204 #[serde(with = "string_as_float_opt")]
205 #[serde(default)]
206 change_pct: Option<f64>,
207
208 #[serde(rename = "MARKET_STATE")]
209 #[serde(default)]
210 market_state: Option<MarketState>,
211
212 #[serde(rename = "UPDATE_TIME")]
213 #[serde(default)]
214 update_time: Option<String>,
215}
216
217use once_cell::sync::Lazy;
218use std::sync::Arc;
219use tokio::sync::Semaphore;
220
/// Lazily initialised, process-wide semaphore with a single permit.
///
/// `build_market_hierarchy` holds the permit while it performs a navigation
/// request, so those requests run one at a time.
static API_SEMAPHORE: Lazy<Arc<Semaphore>> = Lazy::new(|| Arc::new(Semaphore::new(1)));
224
225pub fn build_market_hierarchy<'a>(
231 market_service: &'a impl MarketService,
232 session: &'a IgSession,
233 node_id: Option<&'a str>,
234 depth: usize,
235) -> Pin<Box<dyn Future<Output = Result<Vec<MarketNode>, AppError>> + 'a>> {
236 Box::pin(async move {
237 if depth > 7 {
239 debug!("Reached maximum depth of 5, stopping recursion");
240 return Ok(Vec::new());
241 }
242
243 let _permit = API_SEMAPHORE.clone().acquire_owned().await.unwrap();
246
247 let navigation: MarketNavigationResponse = match node_id {
252 Some(id) => {
253 debug!("Getting navigation node: {}", id);
254 match market_service.get_market_navigation_node(session, id).await {
255 Ok(response) => {
256 debug!(
257 "Response received for node {}: {} nodes, {} markets",
258 id,
259 response.nodes.len(),
260 response.markets.len()
261 );
262 response
263 }
264 Err(e) => {
265 error!("Error getting node {}: {:?}", id, e);
266 if matches!(e, AppError::RateLimitExceeded | AppError::Unexpected(_)) {
268 info!("Rate limit or API error encountered, returning partial results");
269 return Ok(Vec::new());
270 }
271 return Err(e);
272 }
273 }
274 }
275 None => {
276 debug!("Getting top-level navigation nodes");
277 match market_service.get_market_navigation(session).await {
278 Ok(response) => {
279 debug!(
280 "Response received for top-level nodes: {} nodes, {} markets",
281 response.nodes.len(),
282 response.markets.len()
283 );
284 response
285 }
286 Err(e) => {
287 error!("Error getting top-level nodes: {:?}", e);
288 return Err(e);
289 }
290 }
291 }
292 };
293
294 let mut nodes = Vec::new();
295
296 let nodes_to_process = navigation.nodes;
298
299 drop(_permit);
303
304 for node in nodes_to_process.into_iter() {
309 match build_market_hierarchy(market_service, session, Some(&node.id), depth + 1).await {
311 Ok(children) => {
312 info!("Adding node {} with {} children", node.name, children.len());
313 nodes.push(MarketNode {
314 id: node.id.clone(),
315 name: node.name.clone(),
316 children,
317 markets: Vec::new(),
318 });
319 }
320 Err(e) => {
321 error!("Error building hierarchy for node {}: {:?}", node.id, e);
322 if depth < 7 {
324 nodes.push(MarketNode {
325 id: node.id.clone(),
326 name: format!("{} (error: {})", node.name, e),
327 children: Vec::new(),
328 markets: Vec::new(),
329 });
330 }
331 }
332 }
333 }
334
335 let markets_to_process = navigation.markets;
337 for market in markets_to_process {
338 debug!("Adding market: {}", market.instrument_name);
339 nodes.push(MarketNode {
340 id: market.epic.clone(),
341 name: market.instrument_name.clone(),
342 children: Vec::new(),
343 markets: vec![market],
344 });
345 }
346
347 Ok(nodes)
348 })
349}
350
351pub fn extract_markets_from_hierarchy(
353 nodes: &[MarketNode],
354) -> Vec<crate::application::models::market::MarketData> {
355 let mut all_markets = Vec::new();
356
357 for node in nodes {
358 all_markets.extend(node.markets.clone());
360
361 if !node.children.is_empty() {
363 all_markets.extend(extract_markets_from_hierarchy(&node.children));
364 }
365 }
366
367 all_markets
368}