ig_client/application/services/
market_service.rs1use crate::application::services::MarketService;
2use crate::application::services::types::DBEntry;
3use crate::{
4 application::models::market::{
5 HistoricalPricesResponse, MarketDetails, MarketNavigationResponse, MarketSearchResult,
6 },
7 config::Config,
8 error::AppError,
9 session::interface::IgSession,
10 transport::http_client::IgHttpClient,
11};
12use async_trait::async_trait;
13use reqwest::Method;
14use std::collections::{HashMap, HashSet};
15use std::sync::Arc;
16use tracing::{debug, error, info};
17
18pub struct MarketServiceImpl<T: IgHttpClient> {
20 config: Arc<Config>,
21 client: Arc<T>,
22}
23
24impl<T: IgHttpClient> MarketServiceImpl<T> {
25 pub fn new(config: Arc<Config>, client: Arc<T>) -> Self {
27 Self { config, client }
28 }
29
30 pub fn get_config(&self) -> &Config {
35 &self.config
36 }
37
38 pub fn set_config(&mut self, config: Arc<Config>) {
43 self.config = config;
44 }
45}
46
47#[async_trait]
48impl<T: IgHttpClient + 'static> MarketService for MarketServiceImpl<T> {
49 async fn get_all_markets(
63 &self,
64 session: &IgSession,
65 ) -> Result<Vec<crate::application::models::market::MarketData>, AppError> {
66 let max_depth = 6;
67 info!(
68 "Starting comprehensive market hierarchy traversal (max {} levels)",
69 max_depth
70 );
71
72 let root_response = self.get_market_navigation(session).await?;
74 info!(
75 "Root navigation: {} nodes, {} markets at top level",
76 root_response.nodes.len(),
77 root_response.markets.len()
78 );
79
80 let mut all_markets = root_response.markets.clone();
82
83 let mut nodes_to_process = root_response.nodes.clone();
85 let mut processed_levels = 0;
86
87 while !nodes_to_process.is_empty() && processed_levels < max_depth {
88 let mut next_level_nodes = Vec::new();
89 let mut level_market_count = 0;
90
91 info!(
92 "Processing level {} with {} nodes",
93 processed_levels,
94 nodes_to_process.len()
95 );
96
97 for node in &nodes_to_process {
98 match self.get_market_navigation_node(session, &node.id).await {
99 Ok(node_response) => {
100 let node_markets = node_response.markets.len();
101 let node_children = node_response.nodes.len();
102
103 if node_markets > 0 || node_children > 0 {
104 debug!(
105 "Node '{}' (level {}): {} markets, {} child nodes",
106 node.name, processed_levels, node_markets, node_children
107 );
108 }
109
110 all_markets.extend(node_response.markets);
112 level_market_count += node_markets;
113
114 next_level_nodes.extend(node_response.nodes);
116 }
117 Err(e) => {
118 error!(
119 "Failed to get markets for node '{}' at level {}: {:?}",
120 node.name, processed_levels, e
121 );
122 }
123 }
124 }
125
126 info!(
127 "Level {} completed: {} markets found, {} nodes for next level",
128 processed_levels,
129 level_market_count,
130 next_level_nodes.len()
131 );
132
133 nodes_to_process = next_level_nodes;
134 processed_levels += 1;
135 }
136
137 info!(
138 "Market hierarchy traversal completed: {} total markets found across {} levels",
139 all_markets.len(),
140 processed_levels
141 );
142
143 Ok(all_markets)
144 }
145
146 async fn search_markets(
147 &self,
148 session: &IgSession,
149 search_term: &str,
150 ) -> Result<MarketSearchResult, AppError> {
151 let path = format!("markets?searchTerm={search_term}");
152 info!("Searching markets with term: {}", search_term);
153
154 let result = self
155 .client
156 .request::<(), MarketSearchResult>(Method::GET, &path, session, None, "1")
157 .await?;
158
159 debug!("{} markets found", result.markets.len());
160 Ok(result)
161 }
162
163 async fn get_market_details(
164 &self,
165 session: &IgSession,
166 epic: &str,
167 ) -> Result<MarketDetails, AppError> {
168 let path = format!("markets/{epic}");
169 info!("Getting market details: {}", epic);
170
171 let result = self
172 .client
173 .request::<(), MarketDetails>(Method::GET, &path, session, None, "3")
174 .await?;
175
176 debug!("Market details obtained for: {}", epic);
177 Ok(result)
178 }
179
180 async fn get_multiple_market_details(
181 &self,
182 session: &IgSession,
183 epics: &[String],
184 ) -> Result<Vec<MarketDetails>, AppError> {
185 if epics.is_empty() {
186 return Ok(Vec::new());
187 } else if epics.len() > 50 {
188 return Err(AppError::InvalidInput(
189 "The maximum number of EPICs is 50".to_string(),
190 ));
191 }
192
193 let epics_str = epics.join(",");
195 let path = format!("markets?epics={epics_str}");
196
197 debug!(
198 "Getting market details for {} EPICs in a batch: {}",
199 epics.len(),
200 epics_str
201 );
202
203 #[derive(serde::Deserialize)]
205 struct MarketDetailsResponse {
206 #[serde(rename = "marketDetails")]
207 market_details: Vec<MarketDetails>,
208 }
209
210 let response = self
211 .client
212 .request::<(), MarketDetailsResponse>(Method::GET, &path, session, None, "2")
213 .await?;
214
215 debug!(
216 "Market details obtained for {} EPICs",
217 response.market_details.len()
218 );
219 Ok(response.market_details)
220 }
221
222 async fn get_historical_prices(
223 &self,
224 session: &IgSession,
225 epic: &str,
226 resolution: &str,
227 from: &str,
228 to: &str,
229 ) -> Result<HistoricalPricesResponse, AppError> {
230 let path = format!("prices/{epic}?resolution={resolution}&from={from}&to={to}");
231 info!("Getting historical prices for: {}", epic);
232
233 let result = self
234 .client
235 .request::<(), HistoricalPricesResponse>(Method::GET, &path, session, None, "3")
236 .await?;
237
238 debug!("Historical prices obtained for: {}", epic);
239 Ok(result)
240 }
241
242 async fn get_market_navigation(
243 &self,
244 session: &IgSession,
245 ) -> Result<MarketNavigationResponse, AppError> {
246 let path = "marketnavigation";
247 info!("Getting top-level market navigation nodes");
248
249 let result = self
250 .client
251 .request::<(), MarketNavigationResponse>(Method::GET, path, session, None, "1")
252 .await?;
253
254 debug!("{} navigation nodes found", result.nodes.len());
255 debug!("{} markets found at root level", result.markets.len());
256 Ok(result)
257 }
258
259 async fn get_market_navigation_node(
260 &self,
261 session: &IgSession,
262 node_id: &str,
263 ) -> Result<MarketNavigationResponse, AppError> {
264 let path = format!("marketnavigation/{node_id}");
265 info!("Getting market navigation node: {}", node_id);
266
267 let result = self
268 .client
269 .request::<(), MarketNavigationResponse>(Method::GET, &path, session, None, "1")
270 .await?;
271
272 debug!("{} child nodes found", result.nodes.len());
273 debug!("{} markets found in node {}", result.markets.len(), node_id);
274 Ok(result)
275 }
276
277 async fn get_vec_db_entries(&self, session: &IgSession) -> Result<Vec<DBEntry>, AppError> {
278 info!("Getting all markets from hierarchy for DB entries");
279
280 let all_markets = self.get_all_markets(session).await?;
282
283 info!("Collected {} markets from hierarchy", all_markets.len());
284
285 let mut vec_db_entries: Vec<DBEntry> = all_markets
287 .iter()
288 .map(DBEntry::from)
289 .filter(|entry| !entry.epic.is_empty()) .collect();
291
292 info!("Created {} DB entries from markets", vec_db_entries.len());
293
294 let mut symbol_expiry_map: HashMap<String, String> = HashMap::new();
301
302 let unique_symbols: HashSet<String> = vec_db_entries
304 .iter()
305 .map(|entry| entry.symbol.clone())
306 .filter(|symbol| !symbol.is_empty())
307 .collect();
308
309 info!(
310 "Found {} unique symbols to fetch expiry dates for",
311 unique_symbols.len()
312 );
313
314 for symbol in unique_symbols {
316 if let Some(entry) = vec_db_entries
318 .iter()
319 .find(|e| e.symbol == symbol && !e.epic.is_empty())
320 {
321 match self.get_market_details(session, &entry.epic).await {
322 Ok(market_details) => {
323 let expiry_date = market_details
325 .instrument
326 .expiry_details
327 .as_ref()
328 .map(|details| details.last_dealing_date.clone())
329 .unwrap_or_else(|| {
330 market_details.instrument.expiry.clone()
332 });
333
334 symbol_expiry_map.insert(symbol.clone(), expiry_date);
335 info!(
336 "Fetched expiry date for symbol {}: {}",
337 symbol,
338 symbol_expiry_map.get(&symbol).unwrap()
339 );
340 }
341 Err(e) => {
342 error!(
343 "Failed to get market details for epic {} (symbol {}): {:?}",
344 entry.epic, symbol, e
345 );
346 symbol_expiry_map.insert(symbol.clone(), entry.expiry.clone());
348 }
349 }
350 }
351 }
352
353 for entry in &mut vec_db_entries {
355 if let Some(expiry_date) = symbol_expiry_map.get(&entry.symbol) {
356 entry.expiry = expiry_date.clone();
357 }
358 }
359
360 info!("Updated expiry dates for {} entries", vec_db_entries.len());
361 Ok(vec_db_entries)
362 }
363}
364
365#[cfg(test)]
366mod tests {
367 use super::*;
368 use crate::config::Config;
369 use crate::transport::http_client::IgHttpClientImpl;
370 use crate::utils::rate_limiter::RateLimitType;
371 use std::sync::Arc;
372
373 #[test]
374 fn test_get_and_set_config() {
375 let config = Arc::new(Config::with_rate_limit_type(
376 RateLimitType::NonTradingAccount,
377 0.7,
378 ));
379 let client = Arc::new(IgHttpClientImpl::new(config.clone()));
380 let mut service = MarketServiceImpl::new(config.clone(), client.clone());
381 assert!(std::ptr::eq(service.get_config(), &*config));
382 let new_cfg = Arc::new(Config::default());
383 service.set_config(new_cfg.clone());
384 assert!(std::ptr::eq(service.get_config(), &*new_cfg));
385 }
386}