1use crate::application::auth::WebsocketInfo;
7use crate::application::interfaces::account::AccountService;
8use crate::application::interfaces::market::MarketService;
9use crate::application::interfaces::order::OrderService;
10use crate::error::AppError;
11use crate::model::http::HttpClient;
12use crate::model::requests::RecentPricesRequest;
13use crate::model::requests::{
14 ClosePositionRequest, CreateOrderRequest, CreateWorkingOrderRequest, UpdatePositionRequest,
15};
16use crate::model::responses::{
17 CategoriesResponse, CategoryInstrumentsResponse, DBEntryResponse, HistoricalPricesResponse,
18 MarketNavigationResponse, MarketSearchResponse, MultipleMarketDetailsResponse,
19};
20use crate::model::responses::{
21 ClosePositionResponse, CreateOrderResponse, CreateWorkingOrderResponse, UpdatePositionResponse,
22};
23use crate::model::streaming::{
24 StreamingAccountDataField, StreamingChartField, StreamingMarketField, StreamingPriceField,
25 get_streaming_account_data_fields, get_streaming_chart_fields, get_streaming_market_fields,
26 get_streaming_price_fields,
27};
28use crate::prelude::{
29 AccountActivityResponse, AccountFields, AccountsResponse, ChartData, ChartScale,
30 OrderConfirmationResponse, PositionsResponse, TradeFields, TransactionHistoryResponse,
31 WorkingOrdersResponse,
32};
33use crate::presentation::market::{MarketData, MarketDetails};
34use crate::presentation::price::PriceData;
35use async_trait::async_trait;
36use lightstreamer_rs::client::{LightstreamerClient, LogType, Transport};
37use lightstreamer_rs::subscription::{
38 ChannelSubscriptionListener, Snapshot, Subscription, SubscriptionMode,
39};
40use lightstreamer_rs::utils::setup_signal_hook;
41use serde_json::Value;
42use std::collections::HashSet;
43use std::sync::Arc;
44use std::time::Duration;
45use tokio::sync::{Mutex, Notify, RwLock, mpsc};
46use tokio::time::sleep;
47use tracing::{debug, error, info, warn};
48
/// Upper bound on connection attempts.
///
/// NOTE(review): the consumer of this constant is outside the visible part of the file —
/// presumably the streaming connect/retry logic; confirm before changing the value.
const MAX_CONNECTION_ATTEMPTS: u64 = 3;
50
/// REST client on which the market, account and order service traits in this file are
/// implemented; every request is delegated to the shared [`HttpClient`].
pub struct Client {
    /// Shared HTTP transport used for every request issued by this client.
    http_client: Arc<HttpClient>,
}
58
59impl Client {
60 pub fn new() -> Self {
65 let http_client = Arc::new(HttpClient::default());
66 Self { http_client }
67 }
68
69 pub async fn get_ws_info(&self) -> WebsocketInfo {
74 self.http_client.get_ws_info().await
75 }
76}
77
78impl Default for Client {
79 fn default() -> Self {
80 Self::new()
81 }
82}
83
84#[async_trait]
85impl MarketService for Client {
86 async fn search_markets(&self, search_term: &str) -> Result<MarketSearchResponse, AppError> {
87 let path = format!("markets?searchTerm={}", search_term);
88 info!("Searching markets with term: {}", search_term);
89 let result: MarketSearchResponse = self.http_client.get(&path, Some(1)).await?;
90 debug!("{} markets found", result.markets.len());
91 Ok(result)
92 }
93
94 async fn get_market_details(&self, epic: &str) -> Result<MarketDetails, AppError> {
95 let path = format!("markets/{epic}");
96 info!("Getting market details: {}", epic);
97 let market_value: Value = self.http_client.get(&path, Some(3)).await?;
98 let market_details: MarketDetails = serde_json::from_value(market_value)?;
99 debug!("Market details obtained for: {}", epic);
100 Ok(market_details)
101 }
102
103 async fn get_multiple_market_details(
104 &self,
105 epics: &[String],
106 ) -> Result<MultipleMarketDetailsResponse, AppError> {
107 if epics.is_empty() {
108 return Ok(MultipleMarketDetailsResponse::default());
109 } else if epics.len() > 50 {
110 return Err(AppError::InvalidInput(
111 "The maximum number of EPICs is 50".to_string(),
112 ));
113 }
114
115 let epics_str = epics.join(",");
116 let path = format!("markets?epics={}", epics_str);
117 debug!(
118 "Getting market details for {} EPICs in a batch",
119 epics.len()
120 );
121
122 let response: MultipleMarketDetailsResponse = self.http_client.get(&path, Some(2)).await?;
123
124 Ok(response)
125 }
126
127 async fn get_historical_prices(
128 &self,
129 epic: &str,
130 resolution: &str,
131 from: &str,
132 to: &str,
133 ) -> Result<HistoricalPricesResponse, AppError> {
134 let path = format!(
135 "prices/{}?resolution={}&from={}&to={}",
136 epic, resolution, from, to
137 );
138 info!("Getting historical prices for: {}", epic);
139 let result: HistoricalPricesResponse = self.http_client.get(&path, Some(3)).await?;
140 debug!("Historical prices obtained for: {}", epic);
141 Ok(result)
142 }
143
144 async fn get_historical_prices_by_date_range(
145 &self,
146 epic: &str,
147 resolution: &str,
148 start_date: &str,
149 end_date: &str,
150 ) -> Result<HistoricalPricesResponse, AppError> {
151 let path = format!("prices/{}/{}/{}/{}", epic, resolution, start_date, end_date);
152 info!(
153 "Getting historical prices for epic: {}, resolution: {}, from: {} to: {}",
154 epic, resolution, start_date, end_date
155 );
156 let result: HistoricalPricesResponse = self.http_client.get(&path, Some(2)).await?;
157 debug!(
158 "Historical prices obtained for epic: {}, {} data points",
159 epic,
160 result.prices.len()
161 );
162 Ok(result)
163 }
164
165 async fn get_recent_prices(
166 &self,
167 params: &RecentPricesRequest<'_>,
168 ) -> Result<HistoricalPricesResponse, AppError> {
169 let mut query_params = Vec::new();
170
171 if let Some(res) = params.resolution {
172 query_params.push(format!("resolution={}", res));
173 }
174 if let Some(f) = params.from {
175 query_params.push(format!("from={}", f));
176 }
177 if let Some(t) = params.to {
178 query_params.push(format!("to={}", t));
179 }
180 if let Some(max) = params.max_points {
181 query_params.push(format!("max={}", max));
182 }
183 if let Some(size) = params.page_size {
184 query_params.push(format!("pageSize={}", size));
185 }
186 if let Some(num) = params.page_number {
187 query_params.push(format!("pageNumber={}", num));
188 }
189
190 let query_string = if query_params.is_empty() {
191 String::new()
192 } else {
193 format!("?{}", query_params.join("&"))
194 };
195
196 let path = format!("prices/{}{}", params.epic, query_string);
197 info!("Getting recent prices for epic: {}", params.epic);
198 let result: HistoricalPricesResponse = self.http_client.get(&path, Some(3)).await?;
199 debug!(
200 "Recent prices obtained for epic: {}, {} data points",
201 params.epic,
202 result.prices.len()
203 );
204 Ok(result)
205 }
206
207 async fn get_historical_prices_by_count_v1(
208 &self,
209 epic: &str,
210 resolution: &str,
211 num_points: i32,
212 ) -> Result<HistoricalPricesResponse, AppError> {
213 let path = format!("prices/{}/{}/{}", epic, resolution, num_points);
214 info!(
215 "Getting historical prices (v1) for epic: {}, resolution: {}, points: {}",
216 epic, resolution, num_points
217 );
218 let result: HistoricalPricesResponse = self.http_client.get(&path, Some(1)).await?;
219 debug!(
220 "Historical prices (v1) obtained for epic: {}, {} data points",
221 epic,
222 result.prices.len()
223 );
224 Ok(result)
225 }
226
227 async fn get_historical_prices_by_count_v2(
228 &self,
229 epic: &str,
230 resolution: &str,
231 num_points: i32,
232 ) -> Result<HistoricalPricesResponse, AppError> {
233 let path = format!("prices/{}/{}/{}", epic, resolution, num_points);
234 info!(
235 "Getting historical prices (v2) for epic: {}, resolution: {}, points: {}",
236 epic, resolution, num_points
237 );
238 let result: HistoricalPricesResponse = self.http_client.get(&path, Some(2)).await?;
239 debug!(
240 "Historical prices (v2) obtained for epic: {}, {} data points",
241 epic,
242 result.prices.len()
243 );
244 Ok(result)
245 }
246
247 async fn get_market_navigation(&self) -> Result<MarketNavigationResponse, AppError> {
248 let path = "marketnavigation";
249 info!("Getting top-level market navigation nodes");
250 let result: MarketNavigationResponse = self.http_client.get(path, Some(1)).await?;
251 debug!("{} navigation nodes found", result.nodes.len());
252 debug!("{} markets found at root level", result.markets.len());
253 Ok(result)
254 }
255
256 async fn get_market_navigation_node(
257 &self,
258 node_id: &str,
259 ) -> Result<MarketNavigationResponse, AppError> {
260 let path = format!("marketnavigation/{}", node_id);
261 info!("Getting market navigation node: {}", node_id);
262 let result: MarketNavigationResponse = self.http_client.get(&path, Some(1)).await?;
263 debug!("{} child nodes found", result.nodes.len());
264 debug!("{} markets found in node {}", result.markets.len(), node_id);
265 Ok(result)
266 }
267
268 async fn get_all_markets(&self) -> Result<Vec<MarketData>, AppError> {
269 let max_depth = 6;
270 info!(
271 "Starting comprehensive market hierarchy traversal (max {} levels)",
272 max_depth
273 );
274
275 let root_response = self.get_market_navigation().await?;
276 info!(
277 "Root navigation: {} nodes, {} markets at top level",
278 root_response.nodes.len(),
279 root_response.markets.len()
280 );
281
282 let mut all_markets = root_response.markets.clone();
283 let mut nodes_to_process = root_response.nodes.clone();
284 let mut processed_levels = 0;
285
286 while !nodes_to_process.is_empty() && processed_levels < max_depth {
287 let mut next_level_nodes = Vec::new();
288 let mut level_market_count = 0;
289
290 info!(
291 "Processing level {} with {} nodes",
292 processed_levels,
293 nodes_to_process.len()
294 );
295
296 for node in &nodes_to_process {
297 match self.get_market_navigation_node(&node.id).await {
298 Ok(node_response) => {
299 let node_markets = node_response.markets.len();
300 let node_children = node_response.nodes.len();
301
302 if node_markets > 0 || node_children > 0 {
303 debug!(
304 "Node '{}' (level {}): {} markets, {} child nodes",
305 node.name, processed_levels, node_markets, node_children
306 );
307 }
308
309 all_markets.extend(node_response.markets);
310 level_market_count += node_markets;
311 next_level_nodes.extend(node_response.nodes);
312 }
313 Err(e) => {
314 tracing::error!(
315 "Failed to get markets for node '{}' at level {}: {:?}",
316 node.name,
317 processed_levels,
318 e
319 );
320 }
321 }
322 }
323
324 info!(
325 "Level {} completed: {} markets found, {} nodes for next level",
326 processed_levels,
327 level_market_count,
328 next_level_nodes.len()
329 );
330
331 nodes_to_process = next_level_nodes;
332 processed_levels += 1;
333 }
334
335 info!(
336 "Market hierarchy traversal completed: {} total markets found across {} levels",
337 all_markets.len(),
338 processed_levels
339 );
340
341 Ok(all_markets)
342 }
343
344 async fn get_vec_db_entries(&self) -> Result<Vec<DBEntryResponse>, AppError> {
345 info!("Getting all markets from hierarchy for DB entries");
346
347 let all_markets = self.get_all_markets().await?;
348 info!("Collected {} markets from hierarchy", all_markets.len());
349
350 let mut vec_db_entries: Vec<DBEntryResponse> = all_markets
351 .iter()
352 .map(DBEntryResponse::from)
353 .filter(|entry| !entry.epic.is_empty())
354 .collect();
355
356 info!("Created {} DB entries from markets", vec_db_entries.len());
357
358 let unique_symbols: std::collections::HashSet<String> = vec_db_entries
360 .iter()
361 .map(|entry| entry.symbol.clone())
362 .filter(|symbol| !symbol.is_empty())
363 .collect();
364
365 info!(
366 "Found {} unique symbols to fetch expiry dates for",
367 unique_symbols.len()
368 );
369
370 let mut symbol_expiry_map: std::collections::HashMap<String, String> =
371 std::collections::HashMap::new();
372
373 for symbol in unique_symbols {
374 if let Some(entry) = vec_db_entries
375 .iter()
376 .find(|e| e.symbol == symbol && !e.epic.is_empty())
377 {
378 match self.get_market_details(&entry.epic).await {
379 Ok(market_details) => {
380 let expiry_date = market_details
381 .instrument
382 .expiry_details
383 .as_ref()
384 .map(|details| details.last_dealing_date.clone())
385 .unwrap_or_else(|| market_details.instrument.expiry.clone());
386
387 symbol_expiry_map.insert(symbol.clone(), expiry_date);
388 if let Some(expiry) = symbol_expiry_map.get(&symbol) {
389 info!("Fetched expiry date for symbol {}: {}", symbol, expiry);
390 }
391 }
392 Err(e) => {
393 tracing::error!(
394 "Failed to get market details for epic {} (symbol {}): {:?}",
395 entry.epic,
396 symbol,
397 e
398 );
399 symbol_expiry_map.insert(symbol.clone(), entry.expiry.clone());
400 }
401 }
402 }
403 }
404
405 for entry in &mut vec_db_entries {
406 if let Some(expiry_date) = symbol_expiry_map.get(&entry.symbol) {
407 entry.expiry = expiry_date.clone();
408 }
409 }
410
411 info!("Updated expiry dates for {} entries", vec_db_entries.len());
412 Ok(vec_db_entries)
413 }
414
415 async fn get_categories(&self) -> Result<CategoriesResponse, AppError> {
416 info!("Getting all categories of instruments");
417 let result: CategoriesResponse = self.http_client.get("categories", Some(1)).await?;
418 debug!("{} categories found", result.categories.len());
419 Ok(result)
420 }
421
422 async fn get_category_instruments(
423 &self,
424 category_id: &str,
425 page_number: Option<i32>,
426 page_size: Option<i32>,
427 ) -> Result<CategoryInstrumentsResponse, AppError> {
428 let mut path = format!("categories/{}/instruments", category_id);
429
430 let mut query_params = Vec::new();
431 if let Some(page) = page_number {
432 query_params.push(format!("pageNumber={}", page));
433 }
434 if let Some(size) = page_size {
435 if size > 1000 {
436 return Err(AppError::InvalidInput(
437 "pageSize cannot exceed 1000".to_string(),
438 ));
439 }
440 query_params.push(format!("pageSize={}", size));
441 }
442
443 if !query_params.is_empty() {
444 path = format!("{}?{}", path, query_params.join("&"));
445 }
446
447 info!(
448 "Getting instruments for category: {} (page: {:?}, size: {:?})",
449 category_id, page_number, page_size
450 );
451 let result: CategoryInstrumentsResponse = self.http_client.get(&path, Some(1)).await?;
452 debug!(
453 "{} instruments found in category {}",
454 result.instruments.len(),
455 category_id
456 );
457 Ok(result)
458 }
459}
460
461#[async_trait]
462impl AccountService for Client {
463 async fn get_accounts(&self) -> Result<AccountsResponse, AppError> {
464 info!("Getting account information");
465 let result: AccountsResponse = self.http_client.get("accounts", Some(1)).await?;
466 debug!(
467 "Account information obtained: {} accounts",
468 result.accounts.len()
469 );
470 Ok(result)
471 }
472
473 async fn get_positions(&self) -> Result<PositionsResponse, AppError> {
474 debug!("Getting open positions");
475 let result: PositionsResponse = self.http_client.get("positions", Some(2)).await?;
476 debug!("Positions obtained: {} positions", result.positions.len());
477 Ok(result)
478 }
479
480 async fn get_positions_w_filter(&self, filter: &str) -> Result<PositionsResponse, AppError> {
481 debug!("Getting open positions with filter: {}", filter);
482 let mut positions = self.get_positions().await?;
483
484 positions
485 .positions
486 .retain(|position| position.market.epic.contains(filter));
487
488 debug!(
489 "Positions obtained after filtering: {} positions",
490 positions.positions.len()
491 );
492 Ok(positions)
493 }
494
495 async fn get_working_orders(&self) -> Result<WorkingOrdersResponse, AppError> {
496 info!("Getting working orders");
497 let result: WorkingOrdersResponse = self.http_client.get("workingorders", Some(2)).await?;
498 debug!(
499 "Working orders obtained: {} orders",
500 result.working_orders.len()
501 );
502 Ok(result)
503 }
504
505 async fn get_activity(
506 &self,
507 from: &str,
508 to: &str,
509 ) -> Result<AccountActivityResponse, AppError> {
510 let path = format!("history/activity?from={}&to={}&pageSize=500", from, to);
511 info!("Getting account activity");
512 let result: AccountActivityResponse = self.http_client.get(&path, Some(3)).await?;
513 debug!(
514 "Account activity obtained: {} activities",
515 result.activities.len()
516 );
517 Ok(result)
518 }
519
520 async fn get_activity_with_details(
521 &self,
522 from: &str,
523 to: &str,
524 ) -> Result<AccountActivityResponse, AppError> {
525 let path = format!(
526 "history/activity?from={}&to={}&detailed=true&pageSize=500",
527 from, to
528 );
529 info!("Getting detailed account activity");
530 let result: AccountActivityResponse = self.http_client.get(&path, Some(3)).await?;
531 debug!(
532 "Detailed account activity obtained: {} activities",
533 result.activities.len()
534 );
535 Ok(result)
536 }
537
538 async fn get_transactions(
539 &self,
540 from: &str,
541 to: &str,
542 ) -> Result<TransactionHistoryResponse, AppError> {
543 const PAGE_SIZE: u32 = 200;
544 let mut all_transactions = Vec::new();
545 let mut current_page = 1;
546 #[allow(unused_assignments)]
547 let mut last_metadata = None;
548
549 loop {
550 let path = format!(
551 "history/transactions?from={}&to={}&pageSize={}&pageNumber={}",
552 from, to, PAGE_SIZE, current_page
553 );
554 info!("Getting transaction history page {}", current_page);
555
556 let result: TransactionHistoryResponse = self.http_client.get(&path, Some(2)).await?;
557
558 let total_pages = result.metadata.page_data.total_pages as u32;
559 last_metadata = Some(result.metadata);
560 all_transactions.extend(result.transactions);
561
562 if current_page >= total_pages {
563 break;
564 }
565 current_page += 1;
566 }
567
568 debug!(
569 "Total transaction history obtained: {} transactions",
570 all_transactions.len()
571 );
572
573 Ok(TransactionHistoryResponse {
574 transactions: all_transactions,
575 metadata: last_metadata
576 .ok_or_else(|| AppError::InvalidInput("Could not retrieve metadata".to_string()))?,
577 })
578 }
579}
580
581#[async_trait]
582impl OrderService for Client {
583 async fn create_order(
584 &self,
585 order: &CreateOrderRequest,
586 ) -> Result<CreateOrderResponse, AppError> {
587 info!("Creating order for: {}", order.epic);
588 let result: CreateOrderResponse = self
589 .http_client
590 .post("positions/otc", order, Some(2))
591 .await?;
592 debug!("Order created with reference: {}", result.deal_reference);
593 Ok(result)
594 }
595
596 async fn get_order_confirmation(
597 &self,
598 deal_reference: &str,
599 ) -> Result<OrderConfirmationResponse, AppError> {
600 let path = format!("confirms/{}", deal_reference);
601 info!("Getting confirmation for order: {}", deal_reference);
602 let result: OrderConfirmationResponse = self.http_client.get(&path, Some(1)).await?;
603 debug!("Confirmation obtained for order: {}", deal_reference);
604 Ok(result)
605 }
606
607 async fn get_order_confirmation_w_retry(
608 &self,
609 deal_reference: &str,
610 retries: u64,
611 delay_ms: u64,
612 ) -> Result<OrderConfirmationResponse, AppError> {
613 let mut attempts = 0;
614 loop {
615 match self.get_order_confirmation(deal_reference).await {
616 Ok(response) => return Ok(response),
617 Err(e) => {
618 attempts += 1;
619 if attempts > retries {
620 return Err(e);
621 }
622 warn!(
623 "Failed to get order confirmation (attempt {}/{}): {}. Retrying in {} ms...",
624 attempts, retries, e, delay_ms
625 );
626 sleep(Duration::from_millis(delay_ms)).await;
627 }
628 }
629 }
630 }
631
632 async fn update_position(
633 &self,
634 deal_id: &str,
635 update: &UpdatePositionRequest,
636 ) -> Result<UpdatePositionResponse, AppError> {
637 let path = format!("positions/otc/{}", deal_id);
638 info!("Updating position: {}", deal_id);
639 let result: UpdatePositionResponse = self.http_client.put(&path, update, Some(2)).await?;
640 debug!(
641 "Position updated: {} with deal reference: {}",
642 deal_id, result.deal_reference
643 );
644 Ok(result)
645 }
646
647 async fn update_level_in_position(
648 &self,
649 deal_id: &str,
650 limit_level: Option<f64>,
651 ) -> Result<UpdatePositionResponse, AppError> {
652 let path = format!("positions/otc/{}", deal_id);
653 info!("Updating position: {}", deal_id);
654 let limit_level = limit_level.unwrap_or(0.0);
655
656 let update: UpdatePositionRequest = UpdatePositionRequest {
657 guaranteed_stop: None,
658 limit_level: Some(limit_level),
659 stop_level: None,
660 trailing_stop: None,
661 trailing_stop_distance: None,
662 trailing_stop_increment: None,
663 };
664 let result: UpdatePositionResponse = self.http_client.put(&path, update, Some(2)).await?;
665 debug!(
666 "Position updated: {} with deal reference: {}",
667 deal_id, result.deal_reference
668 );
669 Ok(result)
670 }
671
672 async fn close_position(
673 &self,
674 close_request: &ClosePositionRequest,
675 ) -> Result<ClosePositionResponse, AppError> {
676 info!("Closing position");
677
678 let result: ClosePositionResponse = self
681 .http_client
682 .post_with_delete_method("positions/otc", close_request, Some(1))
683 .await?;
684
685 debug!("Position closed with reference: {}", result.deal_reference);
686 Ok(result)
687 }
688
689 async fn create_working_order(
690 &self,
691 order: &CreateWorkingOrderRequest,
692 ) -> Result<CreateWorkingOrderResponse, AppError> {
693 info!("Creating working order for: {}", order.epic);
694 let result: CreateWorkingOrderResponse = self
695 .http_client
696 .post("workingorders/otc", order, Some(2))
697 .await?;
698 debug!(
699 "Working order created with reference: {}",
700 result.deal_reference
701 );
702 Ok(result)
703 }
704
705 async fn delete_working_order(&self, deal_id: &str) -> Result<(), AppError> {
706 let path = format!("workingorders/otc/{}", deal_id);
707 let result: CreateWorkingOrderResponse =
708 self.http_client.delete(path.as_str(), Some(2)).await?;
709 debug!(
710 "Working order created with reference: {}",
711 result.deal_reference
712 );
713 Ok(())
714 }
715}
716
/// Streaming client that owns two Lightstreamer connections and tracks which of them has
/// subscriptions registered.
pub struct StreamerClient {
    /// Account id taken from the websocket info; used to build `TRADE:`, `ACCOUNT:` and
    /// `PRICE:` item names.
    account_id: String,
    /// Client used by the market, trade, account and chart subscriptions.
    market_streamer_client: Option<Arc<Mutex<LightstreamerClient>>>,
    /// Client used by the price subscriptions.
    price_streamer_client: Option<Arc<Mutex<LightstreamerClient>>>,
    /// Set once any subscription targeting `market_streamer_client` has been requested;
    /// `connect` skips that client while this is `false`.
    has_market_stream_subs: bool,
    /// Set once a price subscription has been requested; `connect` skips
    /// `price_streamer_client` while this is `false`.
    has_price_stream_subs: bool,
}
734
735impl StreamerClient {
736 pub async fn new() -> Result<Self, AppError> {
745 let http_client_raw = Arc::new(RwLock::new(Client::new()));
746 let http_client = http_client_raw.read().await;
747 let ws_info = http_client.get_ws_info().await;
748 let password = ws_info.get_ws_password();
749
750 let market_streamer_client = Arc::new(Mutex::new(LightstreamerClient::new(
752 Some(ws_info.server.as_str()),
753 None,
754 Some(&ws_info.account_id),
755 Some(&password),
756 )?));
757
758 let price_streamer_client = Arc::new(Mutex::new(LightstreamerClient::new(
759 Some(ws_info.server.as_str()),
760 None,
761 Some(&ws_info.account_id),
762 Some(&password),
763 )?));
764
765 {
768 let mut client = market_streamer_client.lock().await;
769 client
770 .connection_options
771 .set_forced_transport(Some(Transport::WsStreaming));
772 client.set_logging_type(LogType::TracingLogs);
773 }
774 {
775 let mut client = price_streamer_client.lock().await;
776 client
777 .connection_options
778 .set_forced_transport(Some(Transport::WsStreaming));
779 client.set_logging_type(LogType::TracingLogs);
780 }
781
782 Ok(Self {
783 account_id: ws_info.account_id.clone(),
784 market_streamer_client: Some(market_streamer_client),
785 price_streamer_client: Some(price_streamer_client),
786 has_market_stream_subs: false,
787 has_price_stream_subs: false,
788 })
789 }
790
791 pub async fn default() -> Result<Self, AppError> {
793 Self::new().await
794 }
795
796 pub async fn market_subscribe(
826 &mut self,
827 epics: Vec<String>,
828 fields: HashSet<StreamingMarketField>,
829 ) -> Result<mpsc::UnboundedReceiver<PriceData>, AppError> {
830 self.has_market_stream_subs = true;
832
833 let fields = get_streaming_market_fields(&fields);
834 let market_epics: Vec<String> = epics
835 .iter()
836 .map(|epic| "MARKET:".to_string() + epic)
837 .collect();
838 let mut subscription =
839 Subscription::new(SubscriptionMode::Merge, Some(market_epics), Some(fields))?;
840
841 subscription.set_data_adapter(None)?;
842 subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
843
844 let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
846 subscription.add_listener(Box::new(listener));
847
848 let client = self.market_streamer_client.as_ref().ok_or_else(|| {
850 AppError::WebSocketError("market streamer client not initialized".to_string())
851 })?;
852
853 {
854 let mut client = client.lock().await;
855 client
856 .connection_options
857 .set_forced_transport(Some(Transport::WsStreaming));
858 LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription).await;
859 }
860
861 let (price_tx, price_rx) = mpsc::unbounded_channel();
863 tokio::spawn(async move {
864 let mut receiver = item_receiver;
865 while let Some(item_update) = receiver.recv().await {
866 let price_data = PriceData::from(&item_update);
867 let _ = price_tx.send(price_data);
868 }
869 });
870
871 info!(
872 "Market subscription created for {} instruments",
873 epics.len()
874 );
875 Ok(price_rx)
876 }
877
878 pub async fn trade_subscribe(
901 &mut self,
902 ) -> Result<mpsc::UnboundedReceiver<TradeFields>, AppError> {
903 self.has_market_stream_subs = true;
905
906 let account_id = self.account_id.clone();
907 let fields = Some(vec![
908 "CONFIRMS".to_string(),
909 "OPU".to_string(),
910 "WOU".to_string(),
911 ]);
912 let trade_items = vec![format!("TRADE:{account_id}")];
913
914 let mut subscription =
915 Subscription::new(SubscriptionMode::Distinct, Some(trade_items), fields)?;
916
917 subscription.set_data_adapter(None)?;
918 subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
919
920 let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
922 subscription.add_listener(Box::new(listener));
923
924 let client = self.market_streamer_client.as_ref().ok_or_else(|| {
926 AppError::WebSocketError("market streamer client not initialized".to_string())
927 })?;
928
929 {
930 let mut client = client.lock().await;
931 client
932 .connection_options
933 .set_forced_transport(Some(Transport::WsStreaming));
934 LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription).await;
935 }
936
937 let (trade_tx, trade_rx) = mpsc::unbounded_channel();
939 tokio::spawn(async move {
940 let mut receiver = item_receiver;
941 while let Some(item_update) = receiver.recv().await {
942 let trade_data = crate::presentation::trade::TradeData::from(&item_update);
943 let _ = trade_tx.send(trade_data.fields);
944 }
945 });
946
947 info!("Trade subscription created for account: {}", account_id);
948 Ok(trade_rx)
949 }
950
951 pub async fn account_subscribe(
978 &mut self,
979 fields: HashSet<StreamingAccountDataField>,
980 ) -> Result<mpsc::UnboundedReceiver<AccountFields>, AppError> {
981 self.has_market_stream_subs = true;
983
984 let fields = get_streaming_account_data_fields(&fields);
985 let account_id = self.account_id.clone();
986 let account_items = vec![format!("ACCOUNT:{account_id}")];
987
988 let mut subscription =
989 Subscription::new(SubscriptionMode::Merge, Some(account_items), Some(fields))?;
990
991 subscription.set_data_adapter(None)?;
992 subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
993
994 let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
996 subscription.add_listener(Box::new(listener));
997
998 let client = self.market_streamer_client.as_ref().ok_or_else(|| {
1000 AppError::WebSocketError("market streamer client not initialized".to_string())
1001 })?;
1002
1003 {
1004 let mut client = client.lock().await;
1005 client
1006 .connection_options
1007 .set_forced_transport(Some(Transport::WsStreaming));
1008 LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription).await;
1009 }
1010
1011 let (account_tx, account_rx) = mpsc::unbounded_channel();
1013 tokio::spawn(async move {
1014 let mut receiver = item_receiver;
1015 while let Some(item_update) = receiver.recv().await {
1016 let account_data = crate::presentation::account::AccountData::from(&item_update);
1017 let _ = account_tx.send(account_data.fields);
1018 }
1019 });
1020
1021 info!("Account subscription created for account: {}", account_id);
1022 Ok(account_rx)
1023 }
1024
1025 pub async fn price_subscribe(
1056 &mut self,
1057 epics: Vec<String>,
1058 fields: HashSet<StreamingPriceField>,
1059 ) -> Result<mpsc::UnboundedReceiver<PriceData>, AppError> {
1060 self.has_price_stream_subs = true;
1062
1063 let fields = get_streaming_price_fields(&fields);
1064 let account_id = self.account_id.clone();
1065 let price_epics: Vec<String> = epics
1066 .iter()
1067 .map(|epic| format!("PRICE:{account_id}:{epic}"))
1068 .collect();
1069
1070 tracing::debug!("Pricing subscribe items: {:?}", price_epics);
1072 tracing::debug!("Pricing subscribe fields: {:?}", fields);
1073
1074 let mut subscription =
1075 Subscription::new(SubscriptionMode::Merge, Some(price_epics), Some(fields))?;
1076
1077 let pricing_adapter =
1079 std::env::var("IG_PRICING_ADAPTER").unwrap_or_else(|_| "Pricing".to_string());
1080 tracing::debug!("Using Pricing data adapter: {}", pricing_adapter);
1081 subscription.set_data_adapter(Some(pricing_adapter))?;
1082 subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
1083
1084 let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
1086 subscription.add_listener(Box::new(listener));
1087
1088 let client = self.price_streamer_client.as_ref().ok_or_else(|| {
1090 AppError::WebSocketError("price streamer client not initialized".to_string())
1091 })?;
1092
1093 {
1094 let mut client = client.lock().await;
1095 client
1096 .connection_options
1097 .set_forced_transport(Some(Transport::WsStreaming));
1098 LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription).await;
1099 }
1100
1101 let (price_tx, price_rx) = mpsc::unbounded_channel();
1103 tokio::spawn(async move {
1104 let mut receiver = item_receiver;
1105 while let Some(item_update) = receiver.recv().await {
1106 let price_data = PriceData::from(&item_update);
1107 let _ = price_tx.send(price_data);
1108 }
1109 });
1110
1111 info!(
1112 "Price subscription created for {} instruments (account: {})",
1113 epics.len(),
1114 account_id
1115 );
1116 Ok(price_rx)
1117 }
1118
1119 pub async fn chart_subscribe(
1152 &mut self,
1153 epics: Vec<String>,
1154 scale: ChartScale,
1155 fields: HashSet<StreamingChartField>,
1156 ) -> Result<mpsc::UnboundedReceiver<ChartData>, AppError> {
1157 self.has_market_stream_subs = true;
1159
1160 let fields = get_streaming_chart_fields(&fields);
1161
1162 let chart_items: Vec<String> = epics
1163 .iter()
1164 .map(|epic| format!("CHART:{epic}:{scale}",))
1165 .collect();
1166
1167 let mode = if matches!(scale, ChartScale::Tick) {
1169 SubscriptionMode::Distinct
1170 } else {
1171 SubscriptionMode::Merge
1172 };
1173
1174 let mut subscription = Subscription::new(mode, Some(chart_items), Some(fields))?;
1175
1176 subscription.set_data_adapter(None)?;
1177 subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
1178
1179 let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
1181 subscription.add_listener(Box::new(listener));
1182
1183 let client = self.market_streamer_client.as_ref().ok_or_else(|| {
1185 AppError::WebSocketError("market streamer client not initialized".to_string())
1186 })?;
1187
1188 {
1189 let mut client = client.lock().await;
1190 client
1191 .connection_options
1192 .set_forced_transport(Some(Transport::WsStreaming));
1193 LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription).await;
1194 }
1195
1196 let (chart_tx, chart_rx) = mpsc::unbounded_channel();
1198 tokio::spawn(async move {
1199 let mut receiver = item_receiver;
1200 while let Some(item_update) = receiver.recv().await {
1201 let chart_data = ChartData::from(&item_update);
1202 let _ = chart_tx.send(chart_data);
1203 }
1204 });
1205
1206 info!(
1207 "Chart subscription created for {} instruments (scale: {})",
1208 epics.len(),
1209 scale
1210 );
1211
1212 Ok(chart_rx)
1213 }
1214
1215 pub async fn connect(&mut self, shutdown_signal: Option<Arc<Notify>>) -> Result<(), AppError> {
1232 let signal = if let Some(sig) = shutdown_signal {
1234 sig
1235 } else {
1236 let sig = Arc::new(Notify::new());
1237 setup_signal_hook(Arc::clone(&sig)).await;
1238 sig
1239 };
1240
1241 let mut tasks = Vec::new();
1242
1243 if self.has_market_stream_subs {
1245 if let Some(client) = self.market_streamer_client.as_ref() {
1246 let client = Arc::clone(client);
1247 let signal = Arc::clone(&signal);
1248 let task =
1249 tokio::spawn(
1250 async move { Self::connect_client(client, signal, "Market").await },
1251 );
1252 tasks.push(task);
1253 }
1254 } else {
1255 info!("Skipping Market streamer connection: no active subscriptions");
1256 }
1257
1258 if self.has_price_stream_subs {
1260 if let Some(client) = self.price_streamer_client.as_ref() {
1261 let client = Arc::clone(client);
1262 let signal = Arc::clone(&signal);
1263 let task =
1264 tokio::spawn(
1265 async move { Self::connect_client(client, signal, "Price").await },
1266 );
1267 tasks.push(task);
1268 }
1269 } else {
1270 info!("Skipping Price streamer connection: no active subscriptions");
1271 }
1272
1273 if tasks.is_empty() {
1274 warn!("No streaming clients selected for connection (no active subscriptions)");
1275 return Ok(());
1276 }
1277
1278 info!("Connecting {} streaming client(s)...", tasks.len());
1279
1280 let results = futures::future::join_all(tasks).await;
1282
1283 let mut has_error = false;
1285 for (idx, result) in results.iter().enumerate() {
1286 match result {
1287 Ok(Ok(_)) => {
1288 debug!("Streaming client {} completed successfully", idx);
1289 }
1290 Ok(Err(e)) => {
1291 error!("Streaming client {} failed: {:?}", idx, e);
1292 has_error = true;
1293 }
1294 Err(e) => {
1295 error!("Streaming client {} task panicked: {:?}", idx, e);
1296 has_error = true;
1297 }
1298 }
1299 }
1300
1301 if has_error {
1302 return Err(AppError::WebSocketError(
1303 "one or more streaming connections failed".to_string(),
1304 ));
1305 }
1306
1307 info!("All streaming connections closed gracefully");
1308 Ok(())
1309 }
1310
1311 async fn connect_client(
1313 client: Arc<Mutex<LightstreamerClient>>,
1314 signal: Arc<Notify>,
1315 client_type: &str,
1316 ) -> Result<(), AppError> {
1317 let mut retry_interval_millis: u64 = 0;
1318 let mut retry_counter: u64 = 0;
1319
1320 while retry_counter < MAX_CONNECTION_ATTEMPTS {
1321 let connect_result = {
1322 let mut client = client.lock().await;
1323 client.connect_direct(Arc::clone(&signal)).await
1324 };
1325
1326 let result_with_string_error = connect_result.map_err(|e| format!("{:?}", e));
1328
1329 match result_with_string_error {
1330 Ok(_) => {
1331 info!("{} streamer connected successfully", client_type);
1332 break;
1333 }
1334 Err(error_msg) => {
1335 if error_msg.contains("No more requests to fulfill") {
1337 info!(
1338 "{} streamer closed gracefully: no active subscriptions (server reason: No more requests to fulfill)",
1339 client_type
1340 );
1341 return Ok(());
1342 }
1343
1344 error!("{} streamer connection failed: {}", client_type, error_msg);
1345
1346 if retry_counter < MAX_CONNECTION_ATTEMPTS - 1 {
1347 tokio::time::sleep(std::time::Duration::from_millis(retry_interval_millis))
1348 .await;
1349 retry_interval_millis =
1350 (retry_interval_millis + (200 * retry_counter)).min(5000);
1351 retry_counter += 1;
1352 warn!(
1353 "{} streamer retrying (attempt {}/{}) in {:.2} seconds...",
1354 client_type,
1355 retry_counter + 1,
1356 MAX_CONNECTION_ATTEMPTS,
1357 retry_interval_millis as f64 / 1000.0
1358 );
1359 } else {
1360 retry_counter += 1;
1361 }
1362 }
1363 }
1364 }
1365
1366 if retry_counter >= MAX_CONNECTION_ATTEMPTS {
1367 error!(
1368 "{} streamer failed after {} attempts",
1369 client_type, MAX_CONNECTION_ATTEMPTS
1370 );
1371 return Err(AppError::WebSocketError(format!(
1372 "{} streamer: maximum connection attempts ({}) exceeded",
1373 client_type, MAX_CONNECTION_ATTEMPTS
1374 )));
1375 }
1376
1377 info!("{} streamer connection closed gracefully", client_type);
1378 Ok(())
1379 }
1380
1381 pub async fn disconnect(&mut self) -> Result<(), AppError> {
1389 let mut disconnected = 0;
1390
1391 if let Some(client) = self.market_streamer_client.as_ref() {
1392 let mut client = client.lock().await;
1393 client.disconnect().await;
1394 info!("Market streamer disconnected");
1395 disconnected += 1;
1396 }
1397
1398 if let Some(client) = self.price_streamer_client.as_ref() {
1399 let mut client = client.lock().await;
1400 client.disconnect().await;
1401 info!("Price streamer disconnected");
1402 disconnected += 1;
1403 }
1404
1405 info!("Disconnected {} streaming client(s)", disconnected);
1406 Ok(())
1407 }
1408}