1use crate::application::auth::WebsocketInfo;
7use crate::application::interfaces::account::AccountService;
8use crate::application::interfaces::market::MarketService;
9use crate::application::interfaces::order::OrderService;
10use crate::error::AppError;
11use crate::model::http::HttpClient;
12use crate::model::requests::RecentPricesRequest;
13use crate::model::requests::{
14 ClosePositionRequest, CreateOrderRequest, CreateWorkingOrderRequest, UpdatePositionRequest,
15};
16use crate::model::responses::{
17 CategoriesResponse, CategoryInstrumentsResponse, DBEntryResponse, HistoricalPricesResponse,
18 MarketNavigationResponse, MarketSearchResponse, MultipleMarketDetailsResponse,
19};
20use crate::model::responses::{
21 ClosePositionResponse, CreateOrderResponse, CreateWorkingOrderResponse, UpdatePositionResponse,
22};
23use crate::model::streaming::{
24 StreamingAccountDataField, StreamingChartField, StreamingMarketField, StreamingPriceField,
25 get_streaming_account_data_fields, get_streaming_chart_fields, get_streaming_market_fields,
26 get_streaming_price_fields,
27};
28use crate::prelude::{
29 AccountActivityResponse, AccountFields, AccountsResponse, ChartData, ChartScale,
30 OrderConfirmationResponse, PositionsResponse, TradeFields, TransactionHistoryResponse,
31 WorkingOrdersResponse,
32};
33use crate::presentation::market::{MarketData, MarketDetails};
34use crate::presentation::price::PriceData;
35use async_trait::async_trait;
36use lightstreamer_rs::client::{LightstreamerClient, Transport};
37use lightstreamer_rs::subscription::{
38 ChannelSubscriptionListener, Snapshot, Subscription, SubscriptionMode,
39};
40use lightstreamer_rs::utils::setup_signal_hook;
41use serde_json::Value;
42use std::collections::HashSet;
43use std::sync::Arc;
44use std::time::Duration;
45use tokio::sync::{Mutex, Notify, RwLock, mpsc};
46use tokio::time::sleep;
47use tracing::{debug, error, info, warn};
48
/// Upper bound for the retry counter in `StreamerClient::connect_client`,
/// whose loop runs while the counter is below this value.
const MAX_CONNECTION_ATTEMPTS: u64 = 3;
50
51pub struct Client {
56 http_client: Arc<HttpClient>,
57}
58
59impl Client {
60 pub fn new() -> Self {
65 let http_client = Arc::new(HttpClient::default());
66 Self { http_client }
67 }
68
69 pub async fn get_ws_info(&self) -> WebsocketInfo {
74 self.http_client.get_ws_info().await
75 }
76}
77
78impl Default for Client {
79 fn default() -> Self {
80 Self::new()
81 }
82}
83
84#[async_trait]
85impl MarketService for Client {
86 async fn search_markets(&self, search_term: &str) -> Result<MarketSearchResponse, AppError> {
87 let path = format!("markets?searchTerm={}", search_term);
88 info!("Searching markets with term: {}", search_term);
89 let result: MarketSearchResponse = self.http_client.get(&path, Some(1)).await?;
90 debug!("{} markets found", result.markets.len());
91 Ok(result)
92 }
93
94 async fn get_market_details(&self, epic: &str) -> Result<MarketDetails, AppError> {
95 let path = format!("markets/{epic}");
96 info!("Getting market details: {}", epic);
97 let market_value: Value = self.http_client.get(&path, Some(3)).await?;
98 let market_details: MarketDetails = serde_json::from_value(market_value)?;
99 debug!("Market details obtained for: {}", epic);
100 Ok(market_details)
101 }
102
103 async fn get_multiple_market_details(
104 &self,
105 epics: &[String],
106 ) -> Result<MultipleMarketDetailsResponse, AppError> {
107 if epics.is_empty() {
108 return Ok(MultipleMarketDetailsResponse::default());
109 } else if epics.len() > 50 {
110 return Err(AppError::InvalidInput(
111 "The maximum number of EPICs is 50".to_string(),
112 ));
113 }
114
115 let epics_str = epics.join(",");
116 let path = format!("markets?epics={}", epics_str);
117 debug!(
118 "Getting market details for {} EPICs in a batch",
119 epics.len()
120 );
121
122 let response: MultipleMarketDetailsResponse = self.http_client.get(&path, Some(2)).await?;
123
124 Ok(response)
125 }
126
127 async fn get_historical_prices(
128 &self,
129 epic: &str,
130 resolution: &str,
131 from: &str,
132 to: &str,
133 ) -> Result<HistoricalPricesResponse, AppError> {
134 let path = format!(
135 "prices/{}?resolution={}&from={}&to={}",
136 epic, resolution, from, to
137 );
138 info!("Getting historical prices for: {}", epic);
139 let result: HistoricalPricesResponse = self.http_client.get(&path, Some(3)).await?;
140 debug!("Historical prices obtained for: {}", epic);
141 Ok(result)
142 }
143
144 async fn get_historical_prices_by_date_range(
145 &self,
146 epic: &str,
147 resolution: &str,
148 start_date: &str,
149 end_date: &str,
150 ) -> Result<HistoricalPricesResponse, AppError> {
151 let path = format!("prices/{}/{}/{}/{}", epic, resolution, start_date, end_date);
152 info!(
153 "Getting historical prices for epic: {}, resolution: {}, from: {} to: {}",
154 epic, resolution, start_date, end_date
155 );
156 let result: HistoricalPricesResponse = self.http_client.get(&path, Some(2)).await?;
157 debug!(
158 "Historical prices obtained for epic: {}, {} data points",
159 epic,
160 result.prices.len()
161 );
162 Ok(result)
163 }
164
165 async fn get_recent_prices(
166 &self,
167 params: &RecentPricesRequest<'_>,
168 ) -> Result<HistoricalPricesResponse, AppError> {
169 let mut query_params = Vec::new();
170
171 if let Some(res) = params.resolution {
172 query_params.push(format!("resolution={}", res));
173 }
174 if let Some(f) = params.from {
175 query_params.push(format!("from={}", f));
176 }
177 if let Some(t) = params.to {
178 query_params.push(format!("to={}", t));
179 }
180 if let Some(max) = params.max_points {
181 query_params.push(format!("max={}", max));
182 }
183 if let Some(size) = params.page_size {
184 query_params.push(format!("pageSize={}", size));
185 }
186 if let Some(num) = params.page_number {
187 query_params.push(format!("pageNumber={}", num));
188 }
189
190 let query_string = if query_params.is_empty() {
191 String::new()
192 } else {
193 format!("?{}", query_params.join("&"))
194 };
195
196 let path = format!("prices/{}{}", params.epic, query_string);
197 info!("Getting recent prices for epic: {}", params.epic);
198 let result: HistoricalPricesResponse = self.http_client.get(&path, Some(3)).await?;
199 debug!(
200 "Recent prices obtained for epic: {}, {} data points",
201 params.epic,
202 result.prices.len()
203 );
204 Ok(result)
205 }
206
207 async fn get_historical_prices_by_count_v1(
208 &self,
209 epic: &str,
210 resolution: &str,
211 num_points: i32,
212 ) -> Result<HistoricalPricesResponse, AppError> {
213 let path = format!("prices/{}/{}/{}", epic, resolution, num_points);
214 info!(
215 "Getting historical prices (v1) for epic: {}, resolution: {}, points: {}",
216 epic, resolution, num_points
217 );
218 let result: HistoricalPricesResponse = self.http_client.get(&path, Some(1)).await?;
219 debug!(
220 "Historical prices (v1) obtained for epic: {}, {} data points",
221 epic,
222 result.prices.len()
223 );
224 Ok(result)
225 }
226
227 async fn get_historical_prices_by_count_v2(
228 &self,
229 epic: &str,
230 resolution: &str,
231 num_points: i32,
232 ) -> Result<HistoricalPricesResponse, AppError> {
233 let path = format!("prices/{}/{}/{}", epic, resolution, num_points);
234 info!(
235 "Getting historical prices (v2) for epic: {}, resolution: {}, points: {}",
236 epic, resolution, num_points
237 );
238 let result: HistoricalPricesResponse = self.http_client.get(&path, Some(2)).await?;
239 debug!(
240 "Historical prices (v2) obtained for epic: {}, {} data points",
241 epic,
242 result.prices.len()
243 );
244 Ok(result)
245 }
246
247 async fn get_market_navigation(&self) -> Result<MarketNavigationResponse, AppError> {
248 let path = "marketnavigation";
249 info!("Getting top-level market navigation nodes");
250 let result: MarketNavigationResponse = self.http_client.get(path, Some(1)).await?;
251 debug!("{} navigation nodes found", result.nodes.len());
252 debug!("{} markets found at root level", result.markets.len());
253 Ok(result)
254 }
255
256 async fn get_market_navigation_node(
257 &self,
258 node_id: &str,
259 ) -> Result<MarketNavigationResponse, AppError> {
260 let path = format!("marketnavigation/{}", node_id);
261 info!("Getting market navigation node: {}", node_id);
262 let result: MarketNavigationResponse = self.http_client.get(&path, Some(1)).await?;
263 debug!("{} child nodes found", result.nodes.len());
264 debug!("{} markets found in node {}", result.markets.len(), node_id);
265 Ok(result)
266 }
267
268 async fn get_all_markets(&self) -> Result<Vec<MarketData>, AppError> {
269 let max_depth = 6;
270 info!(
271 "Starting comprehensive market hierarchy traversal (max {} levels)",
272 max_depth
273 );
274
275 let root_response = self.get_market_navigation().await?;
276 info!(
277 "Root navigation: {} nodes, {} markets at top level",
278 root_response.nodes.len(),
279 root_response.markets.len()
280 );
281
282 let mut all_markets = root_response.markets.clone();
283 let mut nodes_to_process = root_response.nodes.clone();
284 let mut processed_levels = 0;
285
286 while !nodes_to_process.is_empty() && processed_levels < max_depth {
287 let mut next_level_nodes = Vec::new();
288 let mut level_market_count = 0;
289
290 info!(
291 "Processing level {} with {} nodes",
292 processed_levels,
293 nodes_to_process.len()
294 );
295
296 for node in &nodes_to_process {
297 match self.get_market_navigation_node(&node.id).await {
298 Ok(node_response) => {
299 let node_markets = node_response.markets.len();
300 let node_children = node_response.nodes.len();
301
302 if node_markets > 0 || node_children > 0 {
303 debug!(
304 "Node '{}' (level {}): {} markets, {} child nodes",
305 node.name, processed_levels, node_markets, node_children
306 );
307 }
308
309 all_markets.extend(node_response.markets);
310 level_market_count += node_markets;
311 next_level_nodes.extend(node_response.nodes);
312 }
313 Err(e) => {
314 tracing::error!(
315 "Failed to get markets for node '{}' at level {}: {:?}",
316 node.name,
317 processed_levels,
318 e
319 );
320 }
321 }
322 }
323
324 info!(
325 "Level {} completed: {} markets found, {} nodes for next level",
326 processed_levels,
327 level_market_count,
328 next_level_nodes.len()
329 );
330
331 nodes_to_process = next_level_nodes;
332 processed_levels += 1;
333 }
334
335 info!(
336 "Market hierarchy traversal completed: {} total markets found across {} levels",
337 all_markets.len(),
338 processed_levels
339 );
340
341 Ok(all_markets)
342 }
343
344 async fn get_vec_db_entries(&self) -> Result<Vec<DBEntryResponse>, AppError> {
345 info!("Getting all markets from hierarchy for DB entries");
346
347 let all_markets = self.get_all_markets().await?;
348 info!("Collected {} markets from hierarchy", all_markets.len());
349
350 let mut vec_db_entries: Vec<DBEntryResponse> = all_markets
351 .iter()
352 .map(DBEntryResponse::from)
353 .filter(|entry| !entry.epic.is_empty())
354 .collect();
355
356 info!("Created {} DB entries from markets", vec_db_entries.len());
357
358 let unique_symbols: std::collections::HashSet<String> = vec_db_entries
360 .iter()
361 .map(|entry| entry.symbol.clone())
362 .filter(|symbol| !symbol.is_empty())
363 .collect();
364
365 info!(
366 "Found {} unique symbols to fetch expiry dates for",
367 unique_symbols.len()
368 );
369
370 let mut symbol_expiry_map: std::collections::HashMap<String, String> =
371 std::collections::HashMap::new();
372
373 for symbol in unique_symbols {
374 if let Some(entry) = vec_db_entries
375 .iter()
376 .find(|e| e.symbol == symbol && !e.epic.is_empty())
377 {
378 match self.get_market_details(&entry.epic).await {
379 Ok(market_details) => {
380 let expiry_date = market_details
381 .instrument
382 .expiry_details
383 .as_ref()
384 .map(|details| details.last_dealing_date.clone())
385 .unwrap_or_else(|| market_details.instrument.expiry.clone());
386
387 symbol_expiry_map.insert(symbol.clone(), expiry_date);
388 info!(
389 "Fetched expiry date for symbol {}: {}",
390 symbol,
391 symbol_expiry_map.get(&symbol).unwrap()
392 );
393 }
394 Err(e) => {
395 tracing::error!(
396 "Failed to get market details for epic {} (symbol {}): {:?}",
397 entry.epic,
398 symbol,
399 e
400 );
401 symbol_expiry_map.insert(symbol.clone(), entry.expiry.clone());
402 }
403 }
404 }
405 }
406
407 for entry in &mut vec_db_entries {
408 if let Some(expiry_date) = symbol_expiry_map.get(&entry.symbol) {
409 entry.expiry = expiry_date.clone();
410 }
411 }
412
413 info!("Updated expiry dates for {} entries", vec_db_entries.len());
414 Ok(vec_db_entries)
415 }
416
417 async fn get_categories(&self) -> Result<CategoriesResponse, AppError> {
418 info!("Getting all categories of instruments");
419 let result: CategoriesResponse = self.http_client.get("categories", Some(1)).await?;
420 debug!("{} categories found", result.categories.len());
421 Ok(result)
422 }
423
424 async fn get_category_instruments(
425 &self,
426 category_id: &str,
427 page_number: Option<i32>,
428 page_size: Option<i32>,
429 ) -> Result<CategoryInstrumentsResponse, AppError> {
430 let mut path = format!("categories/{}/instruments", category_id);
431
432 let mut query_params = Vec::new();
433 if let Some(page) = page_number {
434 query_params.push(format!("pageNumber={}", page));
435 }
436 if let Some(size) = page_size {
437 if size > 1000 {
438 return Err(AppError::InvalidInput(
439 "pageSize cannot exceed 1000".to_string(),
440 ));
441 }
442 query_params.push(format!("pageSize={}", size));
443 }
444
445 if !query_params.is_empty() {
446 path = format!("{}?{}", path, query_params.join("&"));
447 }
448
449 info!(
450 "Getting instruments for category: {} (page: {:?}, size: {:?})",
451 category_id, page_number, page_size
452 );
453 let result: CategoryInstrumentsResponse = self.http_client.get(&path, Some(1)).await?;
454 debug!(
455 "{} instruments found in category {}",
456 result.instruments.len(),
457 category_id
458 );
459 Ok(result)
460 }
461}
462
463#[async_trait]
464impl AccountService for Client {
465 async fn get_accounts(&self) -> Result<AccountsResponse, AppError> {
466 info!("Getting account information");
467 let result: AccountsResponse = self.http_client.get("accounts", Some(1)).await?;
468 debug!(
469 "Account information obtained: {} accounts",
470 result.accounts.len()
471 );
472 Ok(result)
473 }
474
475 async fn get_positions(&self) -> Result<PositionsResponse, AppError> {
476 debug!("Getting open positions");
477 let result: PositionsResponse = self.http_client.get("positions", Some(2)).await?;
478 debug!("Positions obtained: {} positions", result.positions.len());
479 Ok(result)
480 }
481
482 async fn get_positions_w_filter(&self, filter: &str) -> Result<PositionsResponse, AppError> {
483 debug!("Getting open positions with filter: {}", filter);
484 let mut positions = self.get_positions().await?;
485
486 positions
487 .positions
488 .retain(|position| position.market.epic.contains(filter));
489
490 debug!(
491 "Positions obtained after filtering: {} positions",
492 positions.positions.len()
493 );
494 Ok(positions)
495 }
496
497 async fn get_working_orders(&self) -> Result<WorkingOrdersResponse, AppError> {
498 info!("Getting working orders");
499 let result: WorkingOrdersResponse = self.http_client.get("workingorders", Some(2)).await?;
500 debug!(
501 "Working orders obtained: {} orders",
502 result.working_orders.len()
503 );
504 Ok(result)
505 }
506
507 async fn get_activity(
508 &self,
509 from: &str,
510 to: &str,
511 ) -> Result<AccountActivityResponse, AppError> {
512 let path = format!("history/activity?from={}&to={}&pageSize=500", from, to);
513 info!("Getting account activity");
514 let result: AccountActivityResponse = self.http_client.get(&path, Some(3)).await?;
515 debug!(
516 "Account activity obtained: {} activities",
517 result.activities.len()
518 );
519 Ok(result)
520 }
521
522 async fn get_activity_with_details(
523 &self,
524 from: &str,
525 to: &str,
526 ) -> Result<AccountActivityResponse, AppError> {
527 let path = format!(
528 "history/activity?from={}&to={}&detailed=true&pageSize=500",
529 from, to
530 );
531 info!("Getting detailed account activity");
532 let result: AccountActivityResponse = self.http_client.get(&path, Some(3)).await?;
533 debug!(
534 "Detailed account activity obtained: {} activities",
535 result.activities.len()
536 );
537 Ok(result)
538 }
539
540 async fn get_transactions(
541 &self,
542 from: &str,
543 to: &str,
544 ) -> Result<TransactionHistoryResponse, AppError> {
545 const PAGE_SIZE: u32 = 200;
546 let mut all_transactions = Vec::new();
547 let mut current_page = 1;
548 #[allow(unused_assignments)]
549 let mut last_metadata = None;
550
551 loop {
552 let path = format!(
553 "history/transactions?from={}&to={}&pageSize={}&pageNumber={}",
554 from, to, PAGE_SIZE, current_page
555 );
556 info!("Getting transaction history page {}", current_page);
557
558 let result: TransactionHistoryResponse = self.http_client.get(&path, Some(2)).await?;
559
560 let total_pages = result.metadata.page_data.total_pages as u32;
561 last_metadata = Some(result.metadata);
562 all_transactions.extend(result.transactions);
563
564 if current_page >= total_pages {
565 break;
566 }
567 current_page += 1;
568 }
569
570 debug!(
571 "Total transaction history obtained: {} transactions",
572 all_transactions.len()
573 );
574
575 Ok(TransactionHistoryResponse {
576 transactions: all_transactions,
577 metadata: last_metadata
578 .ok_or_else(|| AppError::InvalidInput("Could not retrieve metadata".to_string()))?,
579 })
580 }
581}
582
583#[async_trait]
584impl OrderService for Client {
585 async fn create_order(
586 &self,
587 order: &CreateOrderRequest,
588 ) -> Result<CreateOrderResponse, AppError> {
589 info!("Creating order for: {}", order.epic);
590 let result: CreateOrderResponse = self
591 .http_client
592 .post("positions/otc", order, Some(2))
593 .await?;
594 debug!("Order created with reference: {}", result.deal_reference);
595 Ok(result)
596 }
597
598 async fn get_order_confirmation(
599 &self,
600 deal_reference: &str,
601 ) -> Result<OrderConfirmationResponse, AppError> {
602 let path = format!("confirms/{}", deal_reference);
603 info!("Getting confirmation for order: {}", deal_reference);
604 let result: OrderConfirmationResponse = self.http_client.get(&path, Some(1)).await?;
605 debug!("Confirmation obtained for order: {}", deal_reference);
606 Ok(result)
607 }
608
609 async fn get_order_confirmation_w_retry(
610 &self,
611 deal_reference: &str,
612 retries: u64,
613 delay_ms: u64,
614 ) -> Result<OrderConfirmationResponse, AppError> {
615 let mut attempts = 0;
616 loop {
617 match self.get_order_confirmation(deal_reference).await {
618 Ok(response) => return Ok(response),
619 Err(e) => {
620 attempts += 1;
621 if attempts > retries {
622 return Err(e);
623 }
624 warn!(
625 "Failed to get order confirmation (attempt {}/{}): {}. Retrying in {} ms...",
626 attempts, retries, e, delay_ms
627 );
628 sleep(Duration::from_millis(delay_ms)).await;
629 }
630 }
631 }
632 }
633
634 async fn update_position(
635 &self,
636 deal_id: &str,
637 update: &UpdatePositionRequest,
638 ) -> Result<UpdatePositionResponse, AppError> {
639 let path = format!("positions/otc/{}", deal_id);
640 info!("Updating position: {}", deal_id);
641 let result: UpdatePositionResponse = self.http_client.put(&path, update, Some(2)).await?;
642 debug!(
643 "Position updated: {} with deal reference: {}",
644 deal_id, result.deal_reference
645 );
646 Ok(result)
647 }
648
649 async fn update_level_in_position(
650 &self,
651 deal_id: &str,
652 limit_level: Option<f64>,
653 ) -> Result<UpdatePositionResponse, AppError> {
654 let path = format!("positions/otc/{}", deal_id);
655 info!("Updating position: {}", deal_id);
656 let limit_level = limit_level.unwrap_or(0.0);
657
658 let update: UpdatePositionRequest = UpdatePositionRequest {
659 guaranteed_stop: None,
660 limit_level: Some(limit_level),
661 stop_level: None,
662 trailing_stop: None,
663 trailing_stop_distance: None,
664 trailing_stop_increment: None,
665 };
666 let result: UpdatePositionResponse = self.http_client.put(&path, update, Some(2)).await?;
667 debug!(
668 "Position updated: {} with deal reference: {}",
669 deal_id, result.deal_reference
670 );
671 Ok(result)
672 }
673
674 async fn close_position(
675 &self,
676 close_request: &ClosePositionRequest,
677 ) -> Result<ClosePositionResponse, AppError> {
678 info!("Closing position");
679
680 let result: ClosePositionResponse = self
683 .http_client
684 .post_with_delete_method("positions/otc", close_request, Some(1))
685 .await?;
686
687 debug!("Position closed with reference: {}", result.deal_reference);
688 Ok(result)
689 }
690
691 async fn create_working_order(
692 &self,
693 order: &CreateWorkingOrderRequest,
694 ) -> Result<CreateWorkingOrderResponse, AppError> {
695 info!("Creating working order for: {}", order.epic);
696 let result: CreateWorkingOrderResponse = self
697 .http_client
698 .post("workingorders/otc", order, Some(2))
699 .await?;
700 debug!(
701 "Working order created with reference: {}",
702 result.deal_reference
703 );
704 Ok(result)
705 }
706
707 async fn delete_working_order(&self, deal_id: &str) -> Result<(), AppError> {
708 let path = format!("workingorders/otc/{}", deal_id);
709 let result: CreateWorkingOrderResponse =
710 self.http_client.delete(path.as_str(), Some(2)).await?;
711 debug!(
712 "Working order created with reference: {}",
713 result.deal_reference
714 );
715 Ok(())
716 }
717}
718
719pub struct StreamerClient {
729 account_id: String,
730 market_streamer_client: Option<Arc<Mutex<LightstreamerClient>>>,
731 price_streamer_client: Option<Arc<Mutex<LightstreamerClient>>>,
732 has_market_stream_subs: bool,
734 has_price_stream_subs: bool,
735}
736
737impl StreamerClient {
738 pub async fn new() -> Result<Self, AppError> {
747 let http_client_raw = Arc::new(RwLock::new(Client::new()));
748 let http_client = http_client_raw.read().await;
749 let ws_info = http_client.get_ws_info().await;
750 let password = ws_info.get_ws_password();
751
752 let market_streamer_client = Arc::new(Mutex::new(LightstreamerClient::new(
754 Some(ws_info.server.as_str()),
755 None,
756 Some(&ws_info.account_id),
757 Some(&password),
758 )?));
759
760 let price_streamer_client = Arc::new(Mutex::new(LightstreamerClient::new(
761 Some(ws_info.server.as_str()),
762 None,
763 Some(&ws_info.account_id),
764 Some(&password),
765 )?));
766
767 {
769 let mut client = market_streamer_client.lock().await;
770 client
771 .connection_options
772 .set_forced_transport(Some(Transport::WsStreaming));
773 }
774 {
775 let mut client = price_streamer_client.lock().await;
776 client
777 .connection_options
778 .set_forced_transport(Some(Transport::WsStreaming));
779 }
780
781 Ok(Self {
782 account_id: ws_info.account_id.clone(),
783 market_streamer_client: Some(market_streamer_client),
784 price_streamer_client: Some(price_streamer_client),
785 has_market_stream_subs: false,
786 has_price_stream_subs: false,
787 })
788 }
789
790 pub async fn default() -> Result<Self, AppError> {
792 Self::new().await
793 }
794
795 pub async fn market_subscribe(
825 &mut self,
826 epics: Vec<String>,
827 fields: HashSet<StreamingMarketField>,
828 ) -> Result<mpsc::UnboundedReceiver<PriceData>, AppError> {
829 self.has_market_stream_subs = true;
831
832 let fields = get_streaming_market_fields(&fields);
833 let market_epics: Vec<String> = epics
834 .iter()
835 .map(|epic| "MARKET:".to_string() + epic)
836 .collect();
837 let mut subscription =
838 Subscription::new(SubscriptionMode::Merge, Some(market_epics), Some(fields))?;
839
840 subscription.set_data_adapter(None)?;
841 subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
842
843 let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
845 subscription.add_listener(Box::new(listener));
846
847 let client = self.market_streamer_client.as_ref().ok_or_else(|| {
849 AppError::WebSocketError("market streamer client not initialized".to_string())
850 })?;
851
852 {
853 let mut client = client.lock().await;
854 client
855 .connection_options
856 .set_forced_transport(Some(Transport::WsStreaming));
857 LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription).await;
858 }
859
860 let (price_tx, price_rx) = mpsc::unbounded_channel();
862 tokio::spawn(async move {
863 let mut receiver = item_receiver;
864 while let Some(item_update) = receiver.recv().await {
865 let price_data = PriceData::from(&item_update);
866 let _ = price_tx.send(price_data);
867 }
868 });
869
870 info!(
871 "Market subscription created for {} instruments",
872 epics.len()
873 );
874 Ok(price_rx)
875 }
876
877 pub async fn trade_subscribe(
900 &mut self,
901 ) -> Result<mpsc::UnboundedReceiver<TradeFields>, AppError> {
902 self.has_market_stream_subs = true;
904
905 let account_id = self.account_id.clone();
906 let fields = Some(vec![
907 "CONFIRMS".to_string(),
908 "OPU".to_string(),
909 "WOU".to_string(),
910 ]);
911 let trade_items = vec![format!("TRADE:{account_id}")];
912
913 let mut subscription =
914 Subscription::new(SubscriptionMode::Distinct, Some(trade_items), fields)?;
915
916 subscription.set_data_adapter(None)?;
917 subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
918
919 let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
921 subscription.add_listener(Box::new(listener));
922
923 let client = self.market_streamer_client.as_ref().ok_or_else(|| {
925 AppError::WebSocketError("market streamer client not initialized".to_string())
926 })?;
927
928 {
929 let mut client = client.lock().await;
930 client
931 .connection_options
932 .set_forced_transport(Some(Transport::WsStreaming));
933 LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription).await;
934 }
935
936 let (trade_tx, trade_rx) = mpsc::unbounded_channel();
938 tokio::spawn(async move {
939 let mut receiver = item_receiver;
940 while let Some(item_update) = receiver.recv().await {
941 let trade_data = crate::presentation::trade::TradeData::from(&item_update);
942 let _ = trade_tx.send(trade_data.fields);
943 }
944 });
945
946 info!("Trade subscription created for account: {}", account_id);
947 Ok(trade_rx)
948 }
949
950 pub async fn account_subscribe(
977 &mut self,
978 fields: HashSet<StreamingAccountDataField>,
979 ) -> Result<mpsc::UnboundedReceiver<AccountFields>, AppError> {
980 self.has_market_stream_subs = true;
982
983 let fields = get_streaming_account_data_fields(&fields);
984 let account_id = self.account_id.clone();
985 let account_items = vec![format!("ACCOUNT:{account_id}")];
986
987 let mut subscription =
988 Subscription::new(SubscriptionMode::Merge, Some(account_items), Some(fields))?;
989
990 subscription.set_data_adapter(None)?;
991 subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
992
993 let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
995 subscription.add_listener(Box::new(listener));
996
997 let client = self.market_streamer_client.as_ref().ok_or_else(|| {
999 AppError::WebSocketError("market streamer client not initialized".to_string())
1000 })?;
1001
1002 {
1003 let mut client = client.lock().await;
1004 client
1005 .connection_options
1006 .set_forced_transport(Some(Transport::WsStreaming));
1007 LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription).await;
1008 }
1009
1010 let (account_tx, account_rx) = mpsc::unbounded_channel();
1012 tokio::spawn(async move {
1013 let mut receiver = item_receiver;
1014 while let Some(item_update) = receiver.recv().await {
1015 let account_data = crate::presentation::account::AccountData::from(&item_update);
1016 let _ = account_tx.send(account_data.fields);
1017 }
1018 });
1019
1020 info!("Account subscription created for account: {}", account_id);
1021 Ok(account_rx)
1022 }
1023
1024 pub async fn price_subscribe(
1055 &mut self,
1056 epics: Vec<String>,
1057 fields: HashSet<StreamingPriceField>,
1058 ) -> Result<mpsc::UnboundedReceiver<PriceData>, AppError> {
1059 self.has_price_stream_subs = true;
1061
1062 let fields = get_streaming_price_fields(&fields);
1063 let account_id = self.account_id.clone();
1064 let price_epics: Vec<String> = epics
1065 .iter()
1066 .map(|epic| format!("PRICE:{account_id}:{epic}"))
1067 .collect();
1068
1069 tracing::debug!("Pricing subscribe items: {:?}", price_epics);
1071 tracing::debug!("Pricing subscribe fields: {:?}", fields);
1072
1073 let mut subscription =
1074 Subscription::new(SubscriptionMode::Merge, Some(price_epics), Some(fields))?;
1075
1076 let pricing_adapter =
1078 std::env::var("IG_PRICING_ADAPTER").unwrap_or_else(|_| "Pricing".to_string());
1079 tracing::debug!("Using Pricing data adapter: {}", pricing_adapter);
1080 subscription.set_data_adapter(Some(pricing_adapter))?;
1081 subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
1082
1083 let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
1085 subscription.add_listener(Box::new(listener));
1086
1087 let client = self.price_streamer_client.as_ref().ok_or_else(|| {
1089 AppError::WebSocketError("price streamer client not initialized".to_string())
1090 })?;
1091
1092 {
1093 let mut client = client.lock().await;
1094 client
1095 .connection_options
1096 .set_forced_transport(Some(Transport::WsStreaming));
1097 LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription).await;
1098 }
1099
1100 let (price_tx, price_rx) = mpsc::unbounded_channel();
1102 tokio::spawn(async move {
1103 let mut receiver = item_receiver;
1104 while let Some(item_update) = receiver.recv().await {
1105 let price_data = PriceData::from(&item_update);
1106 let _ = price_tx.send(price_data);
1107 }
1108 });
1109
1110 info!(
1111 "Price subscription created for {} instruments (account: {})",
1112 epics.len(),
1113 account_id
1114 );
1115 Ok(price_rx)
1116 }
1117
1118 pub async fn chart_subscribe(
1151 &mut self,
1152 epics: Vec<String>,
1153 scale: ChartScale,
1154 fields: HashSet<StreamingChartField>,
1155 ) -> Result<mpsc::UnboundedReceiver<ChartData>, AppError> {
1156 self.has_market_stream_subs = true;
1158
1159 let fields = get_streaming_chart_fields(&fields);
1160
1161 let chart_items: Vec<String> = epics
1162 .iter()
1163 .map(|epic| format!("CHART:{epic}:{scale}",))
1164 .collect();
1165
1166 let mode = if matches!(scale, ChartScale::Tick) {
1168 SubscriptionMode::Distinct
1169 } else {
1170 SubscriptionMode::Merge
1171 };
1172
1173 let mut subscription = Subscription::new(mode, Some(chart_items), Some(fields))?;
1174
1175 subscription.set_data_adapter(None)?;
1176 subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
1177
1178 let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
1180 subscription.add_listener(Box::new(listener));
1181
1182 let client = self.market_streamer_client.as_ref().ok_or_else(|| {
1184 AppError::WebSocketError("market streamer client not initialized".to_string())
1185 })?;
1186
1187 {
1188 let mut client = client.lock().await;
1189 client
1190 .connection_options
1191 .set_forced_transport(Some(Transport::WsStreaming));
1192 LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription).await;
1193 }
1194
1195 let (chart_tx, chart_rx) = mpsc::unbounded_channel();
1197 tokio::spawn(async move {
1198 let mut receiver = item_receiver;
1199 while let Some(item_update) = receiver.recv().await {
1200 let chart_data = ChartData::from(&item_update);
1201 let _ = chart_tx.send(chart_data);
1202 }
1203 });
1204
1205 info!(
1206 "Chart subscription created for {} instruments (scale: {})",
1207 epics.len(),
1208 scale
1209 );
1210
1211 Ok(chart_rx)
1212 }
1213
1214 pub async fn connect(&mut self, shutdown_signal: Option<Arc<Notify>>) -> Result<(), AppError> {
1231 let signal = if let Some(sig) = shutdown_signal {
1233 sig
1234 } else {
1235 let sig = Arc::new(Notify::new());
1236 setup_signal_hook(Arc::clone(&sig)).await;
1237 sig
1238 };
1239
1240 let mut tasks = Vec::new();
1241
1242 if self.has_market_stream_subs {
1244 if let Some(client) = self.market_streamer_client.as_ref() {
1245 let client = Arc::clone(client);
1246 let signal = Arc::clone(&signal);
1247 let task =
1248 tokio::spawn(
1249 async move { Self::connect_client(client, signal, "Market").await },
1250 );
1251 tasks.push(task);
1252 }
1253 } else {
1254 info!("Skipping Market streamer connection: no active subscriptions");
1255 }
1256
1257 if self.has_price_stream_subs {
1259 if let Some(client) = self.price_streamer_client.as_ref() {
1260 let client = Arc::clone(client);
1261 let signal = Arc::clone(&signal);
1262 let task =
1263 tokio::spawn(
1264 async move { Self::connect_client(client, signal, "Price").await },
1265 );
1266 tasks.push(task);
1267 }
1268 } else {
1269 info!("Skipping Price streamer connection: no active subscriptions");
1270 }
1271
1272 if tasks.is_empty() {
1273 warn!("No streaming clients selected for connection (no active subscriptions)");
1274 return Ok(());
1275 }
1276
1277 info!("Connecting {} streaming client(s)...", tasks.len());
1278
1279 let results = futures::future::join_all(tasks).await;
1281
1282 let mut has_error = false;
1284 for (idx, result) in results.iter().enumerate() {
1285 match result {
1286 Ok(Ok(_)) => {
1287 debug!("Streaming client {} completed successfully", idx);
1288 }
1289 Ok(Err(e)) => {
1290 error!("Streaming client {} failed: {:?}", idx, e);
1291 has_error = true;
1292 }
1293 Err(e) => {
1294 error!("Streaming client {} task panicked: {:?}", idx, e);
1295 has_error = true;
1296 }
1297 }
1298 }
1299
1300 if has_error {
1301 return Err(AppError::WebSocketError(
1302 "one or more streaming connections failed".to_string(),
1303 ));
1304 }
1305
1306 info!("All streaming connections closed gracefully");
1307 Ok(())
1308 }
1309
1310 async fn connect_client(
1312 client: Arc<Mutex<LightstreamerClient>>,
1313 signal: Arc<Notify>,
1314 client_type: &str,
1315 ) -> Result<(), AppError> {
1316 let mut retry_interval_millis: u64 = 0;
1317 let mut retry_counter: u64 = 0;
1318
1319 while retry_counter < MAX_CONNECTION_ATTEMPTS {
1320 let connect_result = {
1321 let mut client = client.lock().await;
1322 client.connect_direct(Arc::clone(&signal)).await
1323 };
1324
1325 let result_with_string_error = connect_result.map_err(|e| format!("{:?}", e));
1327
1328 match result_with_string_error {
1329 Ok(_) => {
1330 info!("{} streamer connected successfully", client_type);
1331 break;
1332 }
1333 Err(error_msg) => {
1334 if error_msg.contains("No more requests to fulfill") {
1336 info!(
1337 "{} streamer closed gracefully: no active subscriptions (server reason: No more requests to fulfill)",
1338 client_type
1339 );
1340 return Ok(());
1341 }
1342
1343 error!("{} streamer connection failed: {}", client_type, error_msg);
1344
1345 if retry_counter < MAX_CONNECTION_ATTEMPTS - 1 {
1346 tokio::time::sleep(std::time::Duration::from_millis(retry_interval_millis))
1347 .await;
1348 retry_interval_millis =
1349 (retry_interval_millis + (200 * retry_counter)).min(5000);
1350 retry_counter += 1;
1351 warn!(
1352 "{} streamer retrying (attempt {}/{}) in {:.2} seconds...",
1353 client_type,
1354 retry_counter + 1,
1355 MAX_CONNECTION_ATTEMPTS,
1356 retry_interval_millis as f64 / 1000.0
1357 );
1358 } else {
1359 retry_counter += 1;
1360 }
1361 }
1362 }
1363 }
1364
1365 if retry_counter >= MAX_CONNECTION_ATTEMPTS {
1366 error!(
1367 "{} streamer failed after {} attempts",
1368 client_type, MAX_CONNECTION_ATTEMPTS
1369 );
1370 return Err(AppError::WebSocketError(format!(
1371 "{} streamer: maximum connection attempts ({}) exceeded",
1372 client_type, MAX_CONNECTION_ATTEMPTS
1373 )));
1374 }
1375
1376 info!("{} streamer connection closed gracefully", client_type);
1377 Ok(())
1378 }
1379
1380 pub async fn disconnect(&mut self) -> Result<(), AppError> {
1388 let mut disconnected = 0;
1389
1390 if let Some(client) = self.market_streamer_client.as_ref() {
1391 let mut client = client.lock().await;
1392 client.disconnect().await;
1393 info!("Market streamer disconnected");
1394 disconnected += 1;
1395 }
1396
1397 if let Some(client) = self.price_streamer_client.as_ref() {
1398 let mut client = client.lock().await;
1399 client.disconnect().await;
1400 info!("Price streamer disconnected");
1401 disconnected += 1;
1402 }
1403
1404 info!("Disconnected {} streaming client(s)", disconnected);
1405 Ok(())
1406 }
1407}