1use crate::application::auth::WebsocketInfo;
7use crate::application::interfaces::account::AccountService;
8use crate::application::interfaces::market::MarketService;
9use crate::application::interfaces::order::OrderService;
10use crate::error::AppError;
11use crate::model::http::HttpClient;
12use crate::model::requests::RecentPricesRequest;
13use crate::model::requests::{
14 ClosePositionRequest, CreateOrderRequest, CreateWorkingOrderRequest, UpdatePositionRequest,
15};
16use crate::model::responses::{
17 CategoriesResponse, CategoryInstrumentsResponse, DBEntryResponse, HistoricalPricesResponse,
18 MarketNavigationResponse, MarketSearchResponse, MultipleMarketDetailsResponse,
19};
20use crate::model::responses::{
21 ClosePositionResponse, CreateOrderResponse, CreateWorkingOrderResponse, UpdatePositionResponse,
22};
23use crate::model::streaming::{
24 StreamingAccountDataField, StreamingChartField, StreamingMarketField, StreamingPriceField,
25 get_streaming_account_data_fields, get_streaming_chart_fields, get_streaming_market_fields,
26 get_streaming_price_fields,
27};
28use crate::prelude::{
29 AccountActivityResponse, AccountFields, AccountsResponse, ChartData, ChartScale,
30 OrderConfirmationResponse, PositionsResponse, TradeFields, TransactionHistoryResponse,
31 WorkingOrdersResponse,
32};
33use crate::presentation::market::{MarketData, MarketDetails};
34use crate::presentation::price::PriceData;
35use async_trait::async_trait;
36use lightstreamer_rs::client::{LightstreamerClient, Transport};
37use lightstreamer_rs::subscription::{
38 ChannelSubscriptionListener, Snapshot, Subscription, SubscriptionMode,
39};
40use lightstreamer_rs::utils::setup_signal_hook;
41use serde_json::Value;
42use std::collections::HashSet;
43use std::sync::Arc;
44use std::time::Duration;
45use tokio::sync::{Mutex, Notify, RwLock, mpsc};
46use tokio::time::sleep;
47use tracing::{debug, error, info, warn};
48
49const MAX_CONNECTION_ATTEMPTS: u64 = 3;
50
51pub struct Client {
56 http_client: Arc<HttpClient>,
57}
58
59impl Client {
60 pub fn new() -> Self {
65 let http_client = Arc::new(HttpClient::default());
66 Self { http_client }
67 }
68
69 pub async fn get_ws_info(&self) -> WebsocketInfo {
74 self.http_client.get_ws_info().await
75 }
76}
77
78impl Default for Client {
79 fn default() -> Self {
80 Self::new()
81 }
82}
83
84#[async_trait]
85impl MarketService for Client {
86 async fn search_markets(&self, search_term: &str) -> Result<MarketSearchResponse, AppError> {
87 let path = format!("markets?searchTerm={}", search_term);
88 info!("Searching markets with term: {}", search_term);
89 let result: MarketSearchResponse = self.http_client.get(&path, Some(1)).await?;
90 debug!("{} markets found", result.markets.len());
91 Ok(result)
92 }
93
94 async fn get_market_details(&self, epic: &str) -> Result<MarketDetails, AppError> {
95 let path = format!("markets/{epic}");
96 info!("Getting market details: {}", epic);
97 let market_value: Value = self.http_client.get(&path, Some(3)).await?;
98 let market_details: MarketDetails = serde_json::from_value(market_value)?;
99 debug!("Market details obtained for: {}", epic);
100 Ok(market_details)
101 }
102
103 async fn get_multiple_market_details(
104 &self,
105 epics: &[String],
106 ) -> Result<MultipleMarketDetailsResponse, AppError> {
107 if epics.is_empty() {
108 return Ok(MultipleMarketDetailsResponse::default());
109 } else if epics.len() > 50 {
110 return Err(AppError::InvalidInput(
111 "The maximum number of EPICs is 50".to_string(),
112 ));
113 }
114
115 let epics_str = epics.join(",");
116 let path = format!("markets?epics={}", epics_str);
117 debug!(
118 "Getting market details for {} EPICs in a batch",
119 epics.len()
120 );
121
122 let response: MultipleMarketDetailsResponse = self.http_client.get(&path, Some(2)).await?;
123
124 Ok(response)
125 }
126
127 async fn get_historical_prices(
128 &self,
129 epic: &str,
130 resolution: &str,
131 from: &str,
132 to: &str,
133 ) -> Result<HistoricalPricesResponse, AppError> {
134 let path = format!(
135 "prices/{}?resolution={}&from={}&to={}",
136 epic, resolution, from, to
137 );
138 info!("Getting historical prices for: {}", epic);
139 let result: HistoricalPricesResponse = self.http_client.get(&path, Some(3)).await?;
140 debug!("Historical prices obtained for: {}", epic);
141 Ok(result)
142 }
143
144 async fn get_historical_prices_by_date_range(
145 &self,
146 epic: &str,
147 resolution: &str,
148 start_date: &str,
149 end_date: &str,
150 ) -> Result<HistoricalPricesResponse, AppError> {
151 let path = format!("prices/{}/{}/{}/{}", epic, resolution, start_date, end_date);
152 info!(
153 "Getting historical prices for epic: {}, resolution: {}, from: {} to: {}",
154 epic, resolution, start_date, end_date
155 );
156 let result: HistoricalPricesResponse = self.http_client.get(&path, Some(2)).await?;
157 debug!(
158 "Historical prices obtained for epic: {}, {} data points",
159 epic,
160 result.prices.len()
161 );
162 Ok(result)
163 }
164
165 async fn get_recent_prices(
166 &self,
167 params: &RecentPricesRequest<'_>,
168 ) -> Result<HistoricalPricesResponse, AppError> {
169 let mut query_params = Vec::new();
170
171 if let Some(res) = params.resolution {
172 query_params.push(format!("resolution={}", res));
173 }
174 if let Some(f) = params.from {
175 query_params.push(format!("from={}", f));
176 }
177 if let Some(t) = params.to {
178 query_params.push(format!("to={}", t));
179 }
180 if let Some(max) = params.max_points {
181 query_params.push(format!("max={}", max));
182 }
183 if let Some(size) = params.page_size {
184 query_params.push(format!("pageSize={}", size));
185 }
186 if let Some(num) = params.page_number {
187 query_params.push(format!("pageNumber={}", num));
188 }
189
190 let query_string = if query_params.is_empty() {
191 String::new()
192 } else {
193 format!("?{}", query_params.join("&"))
194 };
195
196 let path = format!("prices/{}{}", params.epic, query_string);
197 info!("Getting recent prices for epic: {}", params.epic);
198 let result: HistoricalPricesResponse = self.http_client.get(&path, Some(3)).await?;
199 debug!(
200 "Recent prices obtained for epic: {}, {} data points",
201 params.epic,
202 result.prices.len()
203 );
204 Ok(result)
205 }
206
207 async fn get_historical_prices_by_count_v1(
208 &self,
209 epic: &str,
210 resolution: &str,
211 num_points: i32,
212 ) -> Result<HistoricalPricesResponse, AppError> {
213 let path = format!("prices/{}/{}/{}", epic, resolution, num_points);
214 info!(
215 "Getting historical prices (v1) for epic: {}, resolution: {}, points: {}",
216 epic, resolution, num_points
217 );
218 let result: HistoricalPricesResponse = self.http_client.get(&path, Some(1)).await?;
219 debug!(
220 "Historical prices (v1) obtained for epic: {}, {} data points",
221 epic,
222 result.prices.len()
223 );
224 Ok(result)
225 }
226
227 async fn get_historical_prices_by_count_v2(
228 &self,
229 epic: &str,
230 resolution: &str,
231 num_points: i32,
232 ) -> Result<HistoricalPricesResponse, AppError> {
233 let path = format!("prices/{}/{}/{}", epic, resolution, num_points);
234 info!(
235 "Getting historical prices (v2) for epic: {}, resolution: {}, points: {}",
236 epic, resolution, num_points
237 );
238 let result: HistoricalPricesResponse = self.http_client.get(&path, Some(2)).await?;
239 debug!(
240 "Historical prices (v2) obtained for epic: {}, {} data points",
241 epic,
242 result.prices.len()
243 );
244 Ok(result)
245 }
246
247 async fn get_market_navigation(&self) -> Result<MarketNavigationResponse, AppError> {
248 let path = "marketnavigation";
249 info!("Getting top-level market navigation nodes");
250 let result: MarketNavigationResponse = self.http_client.get(path, Some(1)).await?;
251 debug!("{} navigation nodes found", result.nodes.len());
252 debug!("{} markets found at root level", result.markets.len());
253 Ok(result)
254 }
255
256 async fn get_market_navigation_node(
257 &self,
258 node_id: &str,
259 ) -> Result<MarketNavigationResponse, AppError> {
260 let path = format!("marketnavigation/{}", node_id);
261 info!("Getting market navigation node: {}", node_id);
262 let result: MarketNavigationResponse = self.http_client.get(&path, Some(1)).await?;
263 debug!("{} child nodes found", result.nodes.len());
264 debug!("{} markets found in node {}", result.markets.len(), node_id);
265 Ok(result)
266 }
267
268 async fn get_all_markets(&self) -> Result<Vec<MarketData>, AppError> {
269 let max_depth = 6;
270 info!(
271 "Starting comprehensive market hierarchy traversal (max {} levels)",
272 max_depth
273 );
274
275 let root_response = self.get_market_navigation().await?;
276 info!(
277 "Root navigation: {} nodes, {} markets at top level",
278 root_response.nodes.len(),
279 root_response.markets.len()
280 );
281
282 let mut all_markets = root_response.markets.clone();
283 let mut nodes_to_process = root_response.nodes.clone();
284 let mut processed_levels = 0;
285
286 while !nodes_to_process.is_empty() && processed_levels < max_depth {
287 let mut next_level_nodes = Vec::new();
288 let mut level_market_count = 0;
289
290 info!(
291 "Processing level {} with {} nodes",
292 processed_levels,
293 nodes_to_process.len()
294 );
295
296 for node in &nodes_to_process {
297 match self.get_market_navigation_node(&node.id).await {
298 Ok(node_response) => {
299 let node_markets = node_response.markets.len();
300 let node_children = node_response.nodes.len();
301
302 if node_markets > 0 || node_children > 0 {
303 debug!(
304 "Node '{}' (level {}): {} markets, {} child nodes",
305 node.name, processed_levels, node_markets, node_children
306 );
307 }
308
309 all_markets.extend(node_response.markets);
310 level_market_count += node_markets;
311 next_level_nodes.extend(node_response.nodes);
312 }
313 Err(e) => {
314 tracing::error!(
315 "Failed to get markets for node '{}' at level {}: {:?}",
316 node.name,
317 processed_levels,
318 e
319 );
320 }
321 }
322 }
323
324 info!(
325 "Level {} completed: {} markets found, {} nodes for next level",
326 processed_levels,
327 level_market_count,
328 next_level_nodes.len()
329 );
330
331 nodes_to_process = next_level_nodes;
332 processed_levels += 1;
333 }
334
335 info!(
336 "Market hierarchy traversal completed: {} total markets found across {} levels",
337 all_markets.len(),
338 processed_levels
339 );
340
341 Ok(all_markets)
342 }
343
344 async fn get_vec_db_entries(&self) -> Result<Vec<DBEntryResponse>, AppError> {
345 info!("Getting all markets from hierarchy for DB entries");
346
347 let all_markets = self.get_all_markets().await?;
348 info!("Collected {} markets from hierarchy", all_markets.len());
349
350 let mut vec_db_entries: Vec<DBEntryResponse> = all_markets
351 .iter()
352 .map(DBEntryResponse::from)
353 .filter(|entry| !entry.epic.is_empty())
354 .collect();
355
356 info!("Created {} DB entries from markets", vec_db_entries.len());
357
358 let unique_symbols: std::collections::HashSet<String> = vec_db_entries
360 .iter()
361 .map(|entry| entry.symbol.clone())
362 .filter(|symbol| !symbol.is_empty())
363 .collect();
364
365 info!(
366 "Found {} unique symbols to fetch expiry dates for",
367 unique_symbols.len()
368 );
369
370 let mut symbol_expiry_map: std::collections::HashMap<String, String> =
371 std::collections::HashMap::new();
372
373 for symbol in unique_symbols {
374 if let Some(entry) = vec_db_entries
375 .iter()
376 .find(|e| e.symbol == symbol && !e.epic.is_empty())
377 {
378 match self.get_market_details(&entry.epic).await {
379 Ok(market_details) => {
380 let expiry_date = market_details
381 .instrument
382 .expiry_details
383 .as_ref()
384 .map(|details| details.last_dealing_date.clone())
385 .unwrap_or_else(|| market_details.instrument.expiry.clone());
386
387 symbol_expiry_map.insert(symbol.clone(), expiry_date);
388 if let Some(expiry) = symbol_expiry_map.get(&symbol) {
389 info!("Fetched expiry date for symbol {}: {}", symbol, expiry);
390 }
391 }
392 Err(e) => {
393 tracing::error!(
394 "Failed to get market details for epic {} (symbol {}): {:?}",
395 entry.epic,
396 symbol,
397 e
398 );
399 symbol_expiry_map.insert(symbol.clone(), entry.expiry.clone());
400 }
401 }
402 }
403 }
404
405 for entry in &mut vec_db_entries {
406 if let Some(expiry_date) = symbol_expiry_map.get(&entry.symbol) {
407 entry.expiry = expiry_date.clone();
408 }
409 }
410
411 info!("Updated expiry dates for {} entries", vec_db_entries.len());
412 Ok(vec_db_entries)
413 }
414
415 async fn get_categories(&self) -> Result<CategoriesResponse, AppError> {
416 info!("Getting all categories of instruments");
417 let result: CategoriesResponse = self.http_client.get("categories", Some(1)).await?;
418 debug!("{} categories found", result.categories.len());
419 Ok(result)
420 }
421
422 async fn get_category_instruments(
423 &self,
424 category_id: &str,
425 page_number: Option<i32>,
426 page_size: Option<i32>,
427 ) -> Result<CategoryInstrumentsResponse, AppError> {
428 let mut path = format!("categories/{}/instruments", category_id);
429
430 let mut query_params = Vec::new();
431 if let Some(page) = page_number {
432 query_params.push(format!("pageNumber={}", page));
433 }
434 if let Some(size) = page_size {
435 if size > 1000 {
436 return Err(AppError::InvalidInput(
437 "pageSize cannot exceed 1000".to_string(),
438 ));
439 }
440 query_params.push(format!("pageSize={}", size));
441 }
442
443 if !query_params.is_empty() {
444 path = format!("{}?{}", path, query_params.join("&"));
445 }
446
447 info!(
448 "Getting instruments for category: {} (page: {:?}, size: {:?})",
449 category_id, page_number, page_size
450 );
451 let result: CategoryInstrumentsResponse = self.http_client.get(&path, Some(1)).await?;
452 debug!(
453 "{} instruments found in category {}",
454 result.instruments.len(),
455 category_id
456 );
457 Ok(result)
458 }
459}
460
461#[async_trait]
462impl AccountService for Client {
463 async fn get_accounts(&self) -> Result<AccountsResponse, AppError> {
464 info!("Getting account information");
465 let result: AccountsResponse = self.http_client.get("accounts", Some(1)).await?;
466 debug!(
467 "Account information obtained: {} accounts",
468 result.accounts.len()
469 );
470 Ok(result)
471 }
472
473 async fn get_positions(&self) -> Result<PositionsResponse, AppError> {
474 debug!("Getting open positions");
475 let result: PositionsResponse = self.http_client.get("positions", Some(2)).await?;
476 debug!("Positions obtained: {} positions", result.positions.len());
477 Ok(result)
478 }
479
480 async fn get_positions_w_filter(&self, filter: &str) -> Result<PositionsResponse, AppError> {
481 debug!("Getting open positions with filter: {}", filter);
482 let mut positions = self.get_positions().await?;
483
484 positions
485 .positions
486 .retain(|position| position.market.epic.contains(filter));
487
488 debug!(
489 "Positions obtained after filtering: {} positions",
490 positions.positions.len()
491 );
492 Ok(positions)
493 }
494
495 async fn get_working_orders(&self) -> Result<WorkingOrdersResponse, AppError> {
496 info!("Getting working orders");
497 let result: WorkingOrdersResponse = self.http_client.get("workingorders", Some(2)).await?;
498 debug!(
499 "Working orders obtained: {} orders",
500 result.working_orders.len()
501 );
502 Ok(result)
503 }
504
505 async fn get_activity(
506 &self,
507 from: &str,
508 to: &str,
509 ) -> Result<AccountActivityResponse, AppError> {
510 let path = format!("history/activity?from={}&to={}&pageSize=500", from, to);
511 info!("Getting account activity");
512 let result: AccountActivityResponse = self.http_client.get(&path, Some(3)).await?;
513 debug!(
514 "Account activity obtained: {} activities",
515 result.activities.len()
516 );
517 Ok(result)
518 }
519
520 async fn get_activity_with_details(
521 &self,
522 from: &str,
523 to: &str,
524 ) -> Result<AccountActivityResponse, AppError> {
525 let path = format!(
526 "history/activity?from={}&to={}&detailed=true&pageSize=500",
527 from, to
528 );
529 info!("Getting detailed account activity");
530 let result: AccountActivityResponse = self.http_client.get(&path, Some(3)).await?;
531 debug!(
532 "Detailed account activity obtained: {} activities",
533 result.activities.len()
534 );
535 Ok(result)
536 }
537
538 async fn get_transactions(
539 &self,
540 from: &str,
541 to: &str,
542 ) -> Result<TransactionHistoryResponse, AppError> {
543 const PAGE_SIZE: u32 = 200;
544 let mut all_transactions = Vec::new();
545 let mut current_page = 1;
546 #[allow(unused_assignments)]
547 let mut last_metadata = None;
548
549 loop {
550 let path = format!(
551 "history/transactions?from={}&to={}&pageSize={}&pageNumber={}",
552 from, to, PAGE_SIZE, current_page
553 );
554 info!("Getting transaction history page {}", current_page);
555
556 let result: TransactionHistoryResponse = self.http_client.get(&path, Some(2)).await?;
557
558 let total_pages = result.metadata.page_data.total_pages as u32;
559 last_metadata = Some(result.metadata);
560 all_transactions.extend(result.transactions);
561
562 if current_page >= total_pages {
563 break;
564 }
565 current_page += 1;
566 }
567
568 debug!(
569 "Total transaction history obtained: {} transactions",
570 all_transactions.len()
571 );
572
573 Ok(TransactionHistoryResponse {
574 transactions: all_transactions,
575 metadata: last_metadata
576 .ok_or_else(|| AppError::InvalidInput("Could not retrieve metadata".to_string()))?,
577 })
578 }
579}
580
581#[async_trait]
582impl OrderService for Client {
583 async fn create_order(
584 &self,
585 order: &CreateOrderRequest,
586 ) -> Result<CreateOrderResponse, AppError> {
587 info!("Creating order for: {}", order.epic);
588 let result: CreateOrderResponse = self
589 .http_client
590 .post("positions/otc", order, Some(2))
591 .await?;
592 debug!("Order created with reference: {}", result.deal_reference);
593 Ok(result)
594 }
595
596 async fn get_order_confirmation(
597 &self,
598 deal_reference: &str,
599 ) -> Result<OrderConfirmationResponse, AppError> {
600 let path = format!("confirms/{}", deal_reference);
601 info!("Getting confirmation for order: {}", deal_reference);
602 let result: OrderConfirmationResponse = self.http_client.get(&path, Some(1)).await?;
603 debug!("Confirmation obtained for order: {}", deal_reference);
604 Ok(result)
605 }
606
607 async fn get_order_confirmation_w_retry(
608 &self,
609 deal_reference: &str,
610 retries: u64,
611 delay_ms: u64,
612 ) -> Result<OrderConfirmationResponse, AppError> {
613 let mut attempts = 0;
614 loop {
615 match self.get_order_confirmation(deal_reference).await {
616 Ok(response) => return Ok(response),
617 Err(e) => {
618 attempts += 1;
619 if attempts > retries {
620 return Err(e);
621 }
622 warn!(
623 "Failed to get order confirmation (attempt {}/{}): {}. Retrying in {} ms...",
624 attempts, retries, e, delay_ms
625 );
626 sleep(Duration::from_millis(delay_ms)).await;
627 }
628 }
629 }
630 }
631
632 async fn update_position(
633 &self,
634 deal_id: &str,
635 update: &UpdatePositionRequest,
636 ) -> Result<UpdatePositionResponse, AppError> {
637 let path = format!("positions/otc/{}", deal_id);
638 info!("Updating position: {}", deal_id);
639 let result: UpdatePositionResponse = self.http_client.put(&path, update, Some(2)).await?;
640 debug!(
641 "Position updated: {} with deal reference: {}",
642 deal_id, result.deal_reference
643 );
644 Ok(result)
645 }
646
647 async fn update_level_in_position(
648 &self,
649 deal_id: &str,
650 limit_level: Option<f64>,
651 ) -> Result<UpdatePositionResponse, AppError> {
652 let path = format!("positions/otc/{}", deal_id);
653 info!("Updating position: {}", deal_id);
654 let limit_level = limit_level.unwrap_or(0.0);
655
656 let update: UpdatePositionRequest = UpdatePositionRequest {
657 guaranteed_stop: None,
658 limit_level: Some(limit_level),
659 stop_level: None,
660 trailing_stop: None,
661 trailing_stop_distance: None,
662 trailing_stop_increment: None,
663 };
664 let result: UpdatePositionResponse = self.http_client.put(&path, update, Some(2)).await?;
665 debug!(
666 "Position updated: {} with deal reference: {}",
667 deal_id, result.deal_reference
668 );
669 Ok(result)
670 }
671
672 async fn close_position(
673 &self,
674 close_request: &ClosePositionRequest,
675 ) -> Result<ClosePositionResponse, AppError> {
676 info!("Closing position");
677
678 let result: ClosePositionResponse = self
681 .http_client
682 .post_with_delete_method("positions/otc", close_request, Some(1))
683 .await?;
684
685 debug!("Position closed with reference: {}", result.deal_reference);
686 Ok(result)
687 }
688
689 async fn create_working_order(
690 &self,
691 order: &CreateWorkingOrderRequest,
692 ) -> Result<CreateWorkingOrderResponse, AppError> {
693 info!("Creating working order for: {}", order.epic);
694 let result: CreateWorkingOrderResponse = self
695 .http_client
696 .post("workingorders/otc", order, Some(2))
697 .await?;
698 debug!(
699 "Working order created with reference: {}",
700 result.deal_reference
701 );
702 Ok(result)
703 }
704
705 async fn delete_working_order(&self, deal_id: &str) -> Result<(), AppError> {
706 let path = format!("workingorders/otc/{}", deal_id);
707 let result: CreateWorkingOrderResponse =
708 self.http_client.delete(path.as_str(), Some(2)).await?;
709 debug!(
710 "Working order created with reference: {}",
711 result.deal_reference
712 );
713 Ok(())
714 }
715}
716
717pub struct StreamerClient {
727 account_id: String,
728 market_streamer_client: Option<Arc<Mutex<LightstreamerClient>>>,
729 price_streamer_client: Option<Arc<Mutex<LightstreamerClient>>>,
730 has_market_stream_subs: bool,
732 has_price_stream_subs: bool,
733}
734
735impl StreamerClient {
736 pub async fn new() -> Result<Self, AppError> {
745 let http_client_raw = Arc::new(RwLock::new(Client::new()));
746 let http_client = http_client_raw.read().await;
747 let ws_info = http_client.get_ws_info().await;
748 let password = ws_info.get_ws_password();
749
750 let market_streamer_client = Arc::new(Mutex::new(LightstreamerClient::new(
752 Some(ws_info.server.as_str()),
753 None,
754 Some(&ws_info.account_id),
755 Some(&password),
756 )?));
757
758 let price_streamer_client = Arc::new(Mutex::new(LightstreamerClient::new(
759 Some(ws_info.server.as_str()),
760 None,
761 Some(&ws_info.account_id),
762 Some(&password),
763 )?));
764
765 {
767 let mut client = market_streamer_client.lock().await;
768 client
769 .connection_options
770 .set_forced_transport(Some(Transport::WsStreaming));
771 }
772 {
773 let mut client = price_streamer_client.lock().await;
774 client
775 .connection_options
776 .set_forced_transport(Some(Transport::WsStreaming));
777 }
778
779 Ok(Self {
780 account_id: ws_info.account_id.clone(),
781 market_streamer_client: Some(market_streamer_client),
782 price_streamer_client: Some(price_streamer_client),
783 has_market_stream_subs: false,
784 has_price_stream_subs: false,
785 })
786 }
787
788 pub async fn default() -> Result<Self, AppError> {
790 Self::new().await
791 }
792
793 pub async fn market_subscribe(
823 &mut self,
824 epics: Vec<String>,
825 fields: HashSet<StreamingMarketField>,
826 ) -> Result<mpsc::UnboundedReceiver<PriceData>, AppError> {
827 self.has_market_stream_subs = true;
829
830 let fields = get_streaming_market_fields(&fields);
831 let market_epics: Vec<String> = epics
832 .iter()
833 .map(|epic| "MARKET:".to_string() + epic)
834 .collect();
835 let mut subscription =
836 Subscription::new(SubscriptionMode::Merge, Some(market_epics), Some(fields))?;
837
838 subscription.set_data_adapter(None)?;
839 subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
840
841 let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
843 subscription.add_listener(Box::new(listener));
844
845 let client = self.market_streamer_client.as_ref().ok_or_else(|| {
847 AppError::WebSocketError("market streamer client not initialized".to_string())
848 })?;
849
850 {
851 let mut client = client.lock().await;
852 client
853 .connection_options
854 .set_forced_transport(Some(Transport::WsStreaming));
855 LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription).await;
856 }
857
858 let (price_tx, price_rx) = mpsc::unbounded_channel();
860 tokio::spawn(async move {
861 let mut receiver = item_receiver;
862 while let Some(item_update) = receiver.recv().await {
863 let price_data = PriceData::from(&item_update);
864 let _ = price_tx.send(price_data);
865 }
866 });
867
868 info!(
869 "Market subscription created for {} instruments",
870 epics.len()
871 );
872 Ok(price_rx)
873 }
874
875 pub async fn trade_subscribe(
898 &mut self,
899 ) -> Result<mpsc::UnboundedReceiver<TradeFields>, AppError> {
900 self.has_market_stream_subs = true;
902
903 let account_id = self.account_id.clone();
904 let fields = Some(vec![
905 "CONFIRMS".to_string(),
906 "OPU".to_string(),
907 "WOU".to_string(),
908 ]);
909 let trade_items = vec![format!("TRADE:{account_id}")];
910
911 let mut subscription =
912 Subscription::new(SubscriptionMode::Distinct, Some(trade_items), fields)?;
913
914 subscription.set_data_adapter(None)?;
915 subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
916
917 let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
919 subscription.add_listener(Box::new(listener));
920
921 let client = self.market_streamer_client.as_ref().ok_or_else(|| {
923 AppError::WebSocketError("market streamer client not initialized".to_string())
924 })?;
925
926 {
927 let mut client = client.lock().await;
928 client
929 .connection_options
930 .set_forced_transport(Some(Transport::WsStreaming));
931 LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription).await;
932 }
933
934 let (trade_tx, trade_rx) = mpsc::unbounded_channel();
936 tokio::spawn(async move {
937 let mut receiver = item_receiver;
938 while let Some(item_update) = receiver.recv().await {
939 let trade_data = crate::presentation::trade::TradeData::from(&item_update);
940 let _ = trade_tx.send(trade_data.fields);
941 }
942 });
943
944 info!("Trade subscription created for account: {}", account_id);
945 Ok(trade_rx)
946 }
947
948 pub async fn account_subscribe(
975 &mut self,
976 fields: HashSet<StreamingAccountDataField>,
977 ) -> Result<mpsc::UnboundedReceiver<AccountFields>, AppError> {
978 self.has_market_stream_subs = true;
980
981 let fields = get_streaming_account_data_fields(&fields);
982 let account_id = self.account_id.clone();
983 let account_items = vec![format!("ACCOUNT:{account_id}")];
984
985 let mut subscription =
986 Subscription::new(SubscriptionMode::Merge, Some(account_items), Some(fields))?;
987
988 subscription.set_data_adapter(None)?;
989 subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
990
991 let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
993 subscription.add_listener(Box::new(listener));
994
995 let client = self.market_streamer_client.as_ref().ok_or_else(|| {
997 AppError::WebSocketError("market streamer client not initialized".to_string())
998 })?;
999
1000 {
1001 let mut client = client.lock().await;
1002 client
1003 .connection_options
1004 .set_forced_transport(Some(Transport::WsStreaming));
1005 LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription).await;
1006 }
1007
1008 let (account_tx, account_rx) = mpsc::unbounded_channel();
1010 tokio::spawn(async move {
1011 let mut receiver = item_receiver;
1012 while let Some(item_update) = receiver.recv().await {
1013 let account_data = crate::presentation::account::AccountData::from(&item_update);
1014 let _ = account_tx.send(account_data.fields);
1015 }
1016 });
1017
1018 info!("Account subscription created for account: {}", account_id);
1019 Ok(account_rx)
1020 }
1021
1022 pub async fn price_subscribe(
1053 &mut self,
1054 epics: Vec<String>,
1055 fields: HashSet<StreamingPriceField>,
1056 ) -> Result<mpsc::UnboundedReceiver<PriceData>, AppError> {
1057 self.has_price_stream_subs = true;
1059
1060 let fields = get_streaming_price_fields(&fields);
1061 let account_id = self.account_id.clone();
1062 let price_epics: Vec<String> = epics
1063 .iter()
1064 .map(|epic| format!("PRICE:{account_id}:{epic}"))
1065 .collect();
1066
1067 tracing::debug!("Pricing subscribe items: {:?}", price_epics);
1069 tracing::debug!("Pricing subscribe fields: {:?}", fields);
1070
1071 let mut subscription =
1072 Subscription::new(SubscriptionMode::Merge, Some(price_epics), Some(fields))?;
1073
1074 let pricing_adapter =
1076 std::env::var("IG_PRICING_ADAPTER").unwrap_or_else(|_| "Pricing".to_string());
1077 tracing::debug!("Using Pricing data adapter: {}", pricing_adapter);
1078 subscription.set_data_adapter(Some(pricing_adapter))?;
1079 subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
1080
1081 let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
1083 subscription.add_listener(Box::new(listener));
1084
1085 let client = self.price_streamer_client.as_ref().ok_or_else(|| {
1087 AppError::WebSocketError("price streamer client not initialized".to_string())
1088 })?;
1089
1090 {
1091 let mut client = client.lock().await;
1092 client
1093 .connection_options
1094 .set_forced_transport(Some(Transport::WsStreaming));
1095 LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription).await;
1096 }
1097
1098 let (price_tx, price_rx) = mpsc::unbounded_channel();
1100 tokio::spawn(async move {
1101 let mut receiver = item_receiver;
1102 while let Some(item_update) = receiver.recv().await {
1103 let price_data = PriceData::from(&item_update);
1104 let _ = price_tx.send(price_data);
1105 }
1106 });
1107
1108 info!(
1109 "Price subscription created for {} instruments (account: {})",
1110 epics.len(),
1111 account_id
1112 );
1113 Ok(price_rx)
1114 }
1115
1116 pub async fn chart_subscribe(
1149 &mut self,
1150 epics: Vec<String>,
1151 scale: ChartScale,
1152 fields: HashSet<StreamingChartField>,
1153 ) -> Result<mpsc::UnboundedReceiver<ChartData>, AppError> {
1154 self.has_market_stream_subs = true;
1156
1157 let fields = get_streaming_chart_fields(&fields);
1158
1159 let chart_items: Vec<String> = epics
1160 .iter()
1161 .map(|epic| format!("CHART:{epic}:{scale}",))
1162 .collect();
1163
1164 let mode = if matches!(scale, ChartScale::Tick) {
1166 SubscriptionMode::Distinct
1167 } else {
1168 SubscriptionMode::Merge
1169 };
1170
1171 let mut subscription = Subscription::new(mode, Some(chart_items), Some(fields))?;
1172
1173 subscription.set_data_adapter(None)?;
1174 subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
1175
1176 let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
1178 subscription.add_listener(Box::new(listener));
1179
1180 let client = self.market_streamer_client.as_ref().ok_or_else(|| {
1182 AppError::WebSocketError("market streamer client not initialized".to_string())
1183 })?;
1184
1185 {
1186 let mut client = client.lock().await;
1187 client
1188 .connection_options
1189 .set_forced_transport(Some(Transport::WsStreaming));
1190 LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription).await;
1191 }
1192
1193 let (chart_tx, chart_rx) = mpsc::unbounded_channel();
1195 tokio::spawn(async move {
1196 let mut receiver = item_receiver;
1197 while let Some(item_update) = receiver.recv().await {
1198 let chart_data = ChartData::from(&item_update);
1199 let _ = chart_tx.send(chart_data);
1200 }
1201 });
1202
1203 info!(
1204 "Chart subscription created for {} instruments (scale: {})",
1205 epics.len(),
1206 scale
1207 );
1208
1209 Ok(chart_rx)
1210 }
1211
1212 pub async fn connect(&mut self, shutdown_signal: Option<Arc<Notify>>) -> Result<(), AppError> {
1229 let signal = if let Some(sig) = shutdown_signal {
1231 sig
1232 } else {
1233 let sig = Arc::new(Notify::new());
1234 setup_signal_hook(Arc::clone(&sig)).await;
1235 sig
1236 };
1237
1238 let mut tasks = Vec::new();
1239
1240 if self.has_market_stream_subs {
1242 if let Some(client) = self.market_streamer_client.as_ref() {
1243 let client = Arc::clone(client);
1244 let signal = Arc::clone(&signal);
1245 let task =
1246 tokio::spawn(
1247 async move { Self::connect_client(client, signal, "Market").await },
1248 );
1249 tasks.push(task);
1250 }
1251 } else {
1252 info!("Skipping Market streamer connection: no active subscriptions");
1253 }
1254
1255 if self.has_price_stream_subs {
1257 if let Some(client) = self.price_streamer_client.as_ref() {
1258 let client = Arc::clone(client);
1259 let signal = Arc::clone(&signal);
1260 let task =
1261 tokio::spawn(
1262 async move { Self::connect_client(client, signal, "Price").await },
1263 );
1264 tasks.push(task);
1265 }
1266 } else {
1267 info!("Skipping Price streamer connection: no active subscriptions");
1268 }
1269
1270 if tasks.is_empty() {
1271 warn!("No streaming clients selected for connection (no active subscriptions)");
1272 return Ok(());
1273 }
1274
1275 info!("Connecting {} streaming client(s)...", tasks.len());
1276
1277 let results = futures::future::join_all(tasks).await;
1279
1280 let mut has_error = false;
1282 for (idx, result) in results.iter().enumerate() {
1283 match result {
1284 Ok(Ok(_)) => {
1285 debug!("Streaming client {} completed successfully", idx);
1286 }
1287 Ok(Err(e)) => {
1288 error!("Streaming client {} failed: {:?}", idx, e);
1289 has_error = true;
1290 }
1291 Err(e) => {
1292 error!("Streaming client {} task panicked: {:?}", idx, e);
1293 has_error = true;
1294 }
1295 }
1296 }
1297
1298 if has_error {
1299 return Err(AppError::WebSocketError(
1300 "one or more streaming connections failed".to_string(),
1301 ));
1302 }
1303
1304 info!("All streaming connections closed gracefully");
1305 Ok(())
1306 }
1307
1308 async fn connect_client(
1310 client: Arc<Mutex<LightstreamerClient>>,
1311 signal: Arc<Notify>,
1312 client_type: &str,
1313 ) -> Result<(), AppError> {
1314 let mut retry_interval_millis: u64 = 0;
1315 let mut retry_counter: u64 = 0;
1316
1317 while retry_counter < MAX_CONNECTION_ATTEMPTS {
1318 let connect_result = {
1319 let mut client = client.lock().await;
1320 client.connect_direct(Arc::clone(&signal)).await
1321 };
1322
1323 let result_with_string_error = connect_result.map_err(|e| format!("{:?}", e));
1325
1326 match result_with_string_error {
1327 Ok(_) => {
1328 info!("{} streamer connected successfully", client_type);
1329 break;
1330 }
1331 Err(error_msg) => {
1332 if error_msg.contains("No more requests to fulfill") {
1334 info!(
1335 "{} streamer closed gracefully: no active subscriptions (server reason: No more requests to fulfill)",
1336 client_type
1337 );
1338 return Ok(());
1339 }
1340
1341 error!("{} streamer connection failed: {}", client_type, error_msg);
1342
1343 if retry_counter < MAX_CONNECTION_ATTEMPTS - 1 {
1344 tokio::time::sleep(std::time::Duration::from_millis(retry_interval_millis))
1345 .await;
1346 retry_interval_millis =
1347 (retry_interval_millis + (200 * retry_counter)).min(5000);
1348 retry_counter += 1;
1349 warn!(
1350 "{} streamer retrying (attempt {}/{}) in {:.2} seconds...",
1351 client_type,
1352 retry_counter + 1,
1353 MAX_CONNECTION_ATTEMPTS,
1354 retry_interval_millis as f64 / 1000.0
1355 );
1356 } else {
1357 retry_counter += 1;
1358 }
1359 }
1360 }
1361 }
1362
1363 if retry_counter >= MAX_CONNECTION_ATTEMPTS {
1364 error!(
1365 "{} streamer failed after {} attempts",
1366 client_type, MAX_CONNECTION_ATTEMPTS
1367 );
1368 return Err(AppError::WebSocketError(format!(
1369 "{} streamer: maximum connection attempts ({}) exceeded",
1370 client_type, MAX_CONNECTION_ATTEMPTS
1371 )));
1372 }
1373
1374 info!("{} streamer connection closed gracefully", client_type);
1375 Ok(())
1376 }
1377
1378 pub async fn disconnect(&mut self) -> Result<(), AppError> {
1386 let mut disconnected = 0;
1387
1388 if let Some(client) = self.market_streamer_client.as_ref() {
1389 let mut client = client.lock().await;
1390 client.disconnect().await;
1391 info!("Market streamer disconnected");
1392 disconnected += 1;
1393 }
1394
1395 if let Some(client) = self.price_streamer_client.as_ref() {
1396 let mut client = client.lock().await;
1397 client.disconnect().await;
1398 info!("Price streamer disconnected");
1399 disconnected += 1;
1400 }
1401
1402 info!("Disconnected {} streaming client(s)", disconnected);
1403 Ok(())
1404 }
1405}