1use crate::application::auth::WebsocketInfo;
7use crate::application::interfaces::account::AccountService;
8use crate::application::interfaces::market::MarketService;
9use crate::application::interfaces::order::OrderService;
10use crate::error::AppError;
11use crate::model::http::HttpClient;
12use crate::model::requests::RecentPricesRequest;
13use crate::model::requests::{
14 ClosePositionRequest, CreateOrderRequest, CreateWorkingOrderRequest, UpdatePositionRequest,
15};
16use crate::model::responses::{
17 ClosePositionResponse, CreateOrderResponse, CreateWorkingOrderResponse, UpdatePositionResponse,
18};
19use crate::model::responses::{
20 DBEntryResponse, HistoricalPricesResponse, MarketNavigationResponse, MarketSearchResponse,
21 MultipleMarketDetailsResponse,
22};
23use crate::model::streaming::{
24 StreamingAccountDataField, StreamingMarketField, StreamingPriceField,
25 get_streaming_account_data_fields, get_streaming_market_fields, get_streaming_price_fields,
26};
27use crate::prelude::{
28 AccountActivityResponse, AccountFields, AccountsResponse, OrderConfirmationResponse,
29 PositionsResponse, TradeFields, TransactionHistoryResponse, WorkingOrdersResponse,
30};
31use crate::presentation::market::{MarketData, MarketDetails};
32use crate::presentation::price::PriceData;
33use async_trait::async_trait;
34use lightstreamer_rs::client::{LightstreamerClient, Transport};
35use lightstreamer_rs::subscription::{
36 ChannelSubscriptionListener, Snapshot, Subscription, SubscriptionMode,
37};
38use lightstreamer_rs::utils::setup_signal_hook;
39use serde_json::Value;
40use std::collections::HashSet;
41use std::sync::Arc;
42use std::time::Duration;
43use tokio::sync::{Mutex, Notify, RwLock, mpsc};
44use tokio::time::sleep;
45use tracing::{debug, error, info, warn};
46
/// Maximum number of connection attempts a single streaming client makes
/// before `StreamerClient::connect_client` reports a failure.
const MAX_CONNECTION_ATTEMPTS: u64 = 3;
48
49pub struct Client {
54 http_client: Arc<HttpClient>,
55}
56
57impl Client {
58 pub fn new() -> Self {
63 let http_client = Arc::new(HttpClient::default());
64 Self { http_client }
65 }
66
67 pub async fn get_ws_info(&self) -> WebsocketInfo {
72 self.http_client.get_ws_info().await
73 }
74}
75
76impl Default for Client {
77 fn default() -> Self {
78 Self::new()
79 }
80}
81
82#[async_trait]
83impl MarketService for Client {
84 async fn search_markets(&self, search_term: &str) -> Result<MarketSearchResponse, AppError> {
85 let path = format!("markets?searchTerm={}", search_term);
86 info!("Searching markets with term: {}", search_term);
87 let result: MarketSearchResponse = self.http_client.get(&path, Some(1)).await?;
88 debug!("{} markets found", result.markets.len());
89 Ok(result)
90 }
91
92 async fn get_market_details(&self, epic: &str) -> Result<MarketDetails, AppError> {
93 let path = format!("markets/{epic}");
94 info!("Getting market details: {}", epic);
95 let market_value: Value = self.http_client.get(&path, Some(3)).await?;
96 let market_details: MarketDetails = serde_json::from_value(market_value)?;
97 debug!("Market details obtained for: {}", epic);
98 Ok(market_details)
99 }
100
101 async fn get_multiple_market_details(
102 &self,
103 epics: &[String],
104 ) -> Result<MultipleMarketDetailsResponse, AppError> {
105 if epics.is_empty() {
106 return Ok(MultipleMarketDetailsResponse::default());
107 } else if epics.len() > 50 {
108 return Err(AppError::InvalidInput(
109 "The maximum number of EPICs is 50".to_string(),
110 ));
111 }
112
113 let epics_str = epics.join(",");
114 let path = format!("markets?epics={}", epics_str);
115 debug!(
116 "Getting market details for {} EPICs in a batch",
117 epics.len()
118 );
119
120 let response: MultipleMarketDetailsResponse = self.http_client.get(&path, Some(2)).await?;
121
122 Ok(response)
123 }
124
125 async fn get_historical_prices(
126 &self,
127 epic: &str,
128 resolution: &str,
129 from: &str,
130 to: &str,
131 ) -> Result<HistoricalPricesResponse, AppError> {
132 let path = format!(
133 "prices/{}?resolution={}&from={}&to={}",
134 epic, resolution, from, to
135 );
136 info!("Getting historical prices for: {}", epic);
137 let result: HistoricalPricesResponse = self.http_client.get(&path, Some(3)).await?;
138 debug!("Historical prices obtained for: {}", epic);
139 Ok(result)
140 }
141
142 async fn get_historical_prices_by_date_range(
143 &self,
144 epic: &str,
145 resolution: &str,
146 start_date: &str,
147 end_date: &str,
148 ) -> Result<HistoricalPricesResponse, AppError> {
149 let path = format!("prices/{}/{}/{}/{}", epic, resolution, start_date, end_date);
150 info!(
151 "Getting historical prices for epic: {}, resolution: {}, from: {} to: {}",
152 epic, resolution, start_date, end_date
153 );
154 let result: HistoricalPricesResponse = self.http_client.get(&path, Some(2)).await?;
155 debug!(
156 "Historical prices obtained for epic: {}, {} data points",
157 epic,
158 result.prices.len()
159 );
160 Ok(result)
161 }
162
163 async fn get_recent_prices(
164 &self,
165 params: &RecentPricesRequest<'_>,
166 ) -> Result<HistoricalPricesResponse, AppError> {
167 let mut query_params = Vec::new();
168
169 if let Some(res) = params.resolution {
170 query_params.push(format!("resolution={}", res));
171 }
172 if let Some(f) = params.from {
173 query_params.push(format!("from={}", f));
174 }
175 if let Some(t) = params.to {
176 query_params.push(format!("to={}", t));
177 }
178 if let Some(max) = params.max_points {
179 query_params.push(format!("max={}", max));
180 }
181 if let Some(size) = params.page_size {
182 query_params.push(format!("pageSize={}", size));
183 }
184 if let Some(num) = params.page_number {
185 query_params.push(format!("pageNumber={}", num));
186 }
187
188 let query_string = if query_params.is_empty() {
189 String::new()
190 } else {
191 format!("?{}", query_params.join("&"))
192 };
193
194 let path = format!("prices/{}{}", params.epic, query_string);
195 info!("Getting recent prices for epic: {}", params.epic);
196 let result: HistoricalPricesResponse = self.http_client.get(&path, Some(3)).await?;
197 debug!(
198 "Recent prices obtained for epic: {}, {} data points",
199 params.epic,
200 result.prices.len()
201 );
202 Ok(result)
203 }
204
205 async fn get_historical_prices_by_count_v1(
206 &self,
207 epic: &str,
208 resolution: &str,
209 num_points: i32,
210 ) -> Result<HistoricalPricesResponse, AppError> {
211 let path = format!("prices/{}/{}/{}", epic, resolution, num_points);
212 info!(
213 "Getting historical prices (v1) for epic: {}, resolution: {}, points: {}",
214 epic, resolution, num_points
215 );
216 let result: HistoricalPricesResponse = self.http_client.get(&path, Some(1)).await?;
217 debug!(
218 "Historical prices (v1) obtained for epic: {}, {} data points",
219 epic,
220 result.prices.len()
221 );
222 Ok(result)
223 }
224
225 async fn get_historical_prices_by_count_v2(
226 &self,
227 epic: &str,
228 resolution: &str,
229 num_points: i32,
230 ) -> Result<HistoricalPricesResponse, AppError> {
231 let path = format!("prices/{}/{}/{}", epic, resolution, num_points);
232 info!(
233 "Getting historical prices (v2) for epic: {}, resolution: {}, points: {}",
234 epic, resolution, num_points
235 );
236 let result: HistoricalPricesResponse = self.http_client.get(&path, Some(2)).await?;
237 debug!(
238 "Historical prices (v2) obtained for epic: {}, {} data points",
239 epic,
240 result.prices.len()
241 );
242 Ok(result)
243 }
244
245 async fn get_market_navigation(&self) -> Result<MarketNavigationResponse, AppError> {
246 let path = "marketnavigation";
247 info!("Getting top-level market navigation nodes");
248 let result: MarketNavigationResponse = self.http_client.get(path, Some(1)).await?;
249 debug!("{} navigation nodes found", result.nodes.len());
250 debug!("{} markets found at root level", result.markets.len());
251 Ok(result)
252 }
253
254 async fn get_market_navigation_node(
255 &self,
256 node_id: &str,
257 ) -> Result<MarketNavigationResponse, AppError> {
258 let path = format!("marketnavigation/{}", node_id);
259 info!("Getting market navigation node: {}", node_id);
260 let result: MarketNavigationResponse = self.http_client.get(&path, Some(1)).await?;
261 debug!("{} child nodes found", result.nodes.len());
262 debug!("{} markets found in node {}", result.markets.len(), node_id);
263 Ok(result)
264 }
265
266 async fn get_all_markets(&self) -> Result<Vec<MarketData>, AppError> {
267 let max_depth = 6;
268 info!(
269 "Starting comprehensive market hierarchy traversal (max {} levels)",
270 max_depth
271 );
272
273 let root_response = self.get_market_navigation().await?;
274 info!(
275 "Root navigation: {} nodes, {} markets at top level",
276 root_response.nodes.len(),
277 root_response.markets.len()
278 );
279
280 let mut all_markets = root_response.markets.clone();
281 let mut nodes_to_process = root_response.nodes.clone();
282 let mut processed_levels = 0;
283
284 while !nodes_to_process.is_empty() && processed_levels < max_depth {
285 let mut next_level_nodes = Vec::new();
286 let mut level_market_count = 0;
287
288 info!(
289 "Processing level {} with {} nodes",
290 processed_levels,
291 nodes_to_process.len()
292 );
293
294 for node in &nodes_to_process {
295 match self.get_market_navigation_node(&node.id).await {
296 Ok(node_response) => {
297 let node_markets = node_response.markets.len();
298 let node_children = node_response.nodes.len();
299
300 if node_markets > 0 || node_children > 0 {
301 debug!(
302 "Node '{}' (level {}): {} markets, {} child nodes",
303 node.name, processed_levels, node_markets, node_children
304 );
305 }
306
307 all_markets.extend(node_response.markets);
308 level_market_count += node_markets;
309 next_level_nodes.extend(node_response.nodes);
310 }
311 Err(e) => {
312 tracing::error!(
313 "Failed to get markets for node '{}' at level {}: {:?}",
314 node.name,
315 processed_levels,
316 e
317 );
318 }
319 }
320 }
321
322 info!(
323 "Level {} completed: {} markets found, {} nodes for next level",
324 processed_levels,
325 level_market_count,
326 next_level_nodes.len()
327 );
328
329 nodes_to_process = next_level_nodes;
330 processed_levels += 1;
331 }
332
333 info!(
334 "Market hierarchy traversal completed: {} total markets found across {} levels",
335 all_markets.len(),
336 processed_levels
337 );
338
339 Ok(all_markets)
340 }
341
342 async fn get_vec_db_entries(&self) -> Result<Vec<DBEntryResponse>, AppError> {
343 info!("Getting all markets from hierarchy for DB entries");
344
345 let all_markets = self.get_all_markets().await?;
346 info!("Collected {} markets from hierarchy", all_markets.len());
347
348 let mut vec_db_entries: Vec<DBEntryResponse> = all_markets
349 .iter()
350 .map(DBEntryResponse::from)
351 .filter(|entry| !entry.epic.is_empty())
352 .collect();
353
354 info!("Created {} DB entries from markets", vec_db_entries.len());
355
356 let unique_symbols: std::collections::HashSet<String> = vec_db_entries
358 .iter()
359 .map(|entry| entry.symbol.clone())
360 .filter(|symbol| !symbol.is_empty())
361 .collect();
362
363 info!(
364 "Found {} unique symbols to fetch expiry dates for",
365 unique_symbols.len()
366 );
367
368 let mut symbol_expiry_map: std::collections::HashMap<String, String> =
369 std::collections::HashMap::new();
370
371 for symbol in unique_symbols {
372 if let Some(entry) = vec_db_entries
373 .iter()
374 .find(|e| e.symbol == symbol && !e.epic.is_empty())
375 {
376 match self.get_market_details(&entry.epic).await {
377 Ok(market_details) => {
378 let expiry_date = market_details
379 .instrument
380 .expiry_details
381 .as_ref()
382 .map(|details| details.last_dealing_date.clone())
383 .unwrap_or_else(|| market_details.instrument.expiry.clone());
384
385 symbol_expiry_map.insert(symbol.clone(), expiry_date);
386 info!(
387 "Fetched expiry date for symbol {}: {}",
388 symbol,
389 symbol_expiry_map.get(&symbol).unwrap()
390 );
391 }
392 Err(e) => {
393 tracing::error!(
394 "Failed to get market details for epic {} (symbol {}): {:?}",
395 entry.epic,
396 symbol,
397 e
398 );
399 symbol_expiry_map.insert(symbol.clone(), entry.expiry.clone());
400 }
401 }
402 }
403 }
404
405 for entry in &mut vec_db_entries {
406 if let Some(expiry_date) = symbol_expiry_map.get(&entry.symbol) {
407 entry.expiry = expiry_date.clone();
408 }
409 }
410
411 info!("Updated expiry dates for {} entries", vec_db_entries.len());
412 Ok(vec_db_entries)
413 }
414}
415
416#[async_trait]
417impl AccountService for Client {
418 async fn get_accounts(&self) -> Result<AccountsResponse, AppError> {
419 info!("Getting account information");
420 let result: AccountsResponse = self.http_client.get("accounts", Some(1)).await?;
421 debug!(
422 "Account information obtained: {} accounts",
423 result.accounts.len()
424 );
425 Ok(result)
426 }
427
428 async fn get_positions(&self) -> Result<PositionsResponse, AppError> {
429 debug!("Getting open positions");
430 let result: PositionsResponse = self.http_client.get("positions", Some(2)).await?;
431 debug!("Positions obtained: {} positions", result.positions.len());
432 Ok(result)
433 }
434
435 async fn get_positions_w_filter(&self, filter: &str) -> Result<PositionsResponse, AppError> {
436 debug!("Getting open positions with filter: {}", filter);
437 let mut positions = self.get_positions().await?;
438
439 positions
440 .positions
441 .retain(|position| position.market.epic.contains(filter));
442
443 debug!(
444 "Positions obtained after filtering: {} positions",
445 positions.positions.len()
446 );
447 Ok(positions)
448 }
449
450 async fn get_working_orders(&self) -> Result<WorkingOrdersResponse, AppError> {
451 info!("Getting working orders");
452 let result: WorkingOrdersResponse = self.http_client.get("workingorders", Some(2)).await?;
453 debug!(
454 "Working orders obtained: {} orders",
455 result.working_orders.len()
456 );
457 Ok(result)
458 }
459
460 async fn get_activity(
461 &self,
462 from: &str,
463 to: &str,
464 ) -> Result<AccountActivityResponse, AppError> {
465 let path = format!("history/activity?from={}&to={}&pageSize=500", from, to);
466 info!("Getting account activity");
467 let result: AccountActivityResponse = self.http_client.get(&path, Some(3)).await?;
468 debug!(
469 "Account activity obtained: {} activities",
470 result.activities.len()
471 );
472 Ok(result)
473 }
474
475 async fn get_activity_with_details(
476 &self,
477 from: &str,
478 to: &str,
479 ) -> Result<AccountActivityResponse, AppError> {
480 let path = format!(
481 "history/activity?from={}&to={}&detailed=true&pageSize=500",
482 from, to
483 );
484 info!("Getting detailed account activity");
485 let result: AccountActivityResponse = self.http_client.get(&path, Some(3)).await?;
486 debug!(
487 "Detailed account activity obtained: {} activities",
488 result.activities.len()
489 );
490 Ok(result)
491 }
492
493 async fn get_transactions(
494 &self,
495 from: &str,
496 to: &str,
497 ) -> Result<TransactionHistoryResponse, AppError> {
498 const PAGE_SIZE: u32 = 200;
499 let mut all_transactions = Vec::new();
500 let mut current_page = 1;
501 #[allow(unused_assignments)]
502 let mut last_metadata = None;
503
504 loop {
505 let path = format!(
506 "history/transactions?from={}&to={}&pageSize={}&pageNumber={}",
507 from, to, PAGE_SIZE, current_page
508 );
509 info!("Getting transaction history page {}", current_page);
510
511 let result: TransactionHistoryResponse = self.http_client.get(&path, Some(2)).await?;
512
513 let total_pages = result.metadata.page_data.total_pages as u32;
514 last_metadata = Some(result.metadata);
515 all_transactions.extend(result.transactions);
516
517 if current_page >= total_pages {
518 break;
519 }
520 current_page += 1;
521 }
522
523 debug!(
524 "Total transaction history obtained: {} transactions",
525 all_transactions.len()
526 );
527
528 Ok(TransactionHistoryResponse {
529 transactions: all_transactions,
530 metadata: last_metadata
531 .ok_or_else(|| AppError::InvalidInput("Could not retrieve metadata".to_string()))?,
532 })
533 }
534}
535
536#[async_trait]
537impl OrderService for Client {
538 async fn create_order(
539 &self,
540 order: &CreateOrderRequest,
541 ) -> Result<CreateOrderResponse, AppError> {
542 info!("Creating order for: {}", order.epic);
543 let result: CreateOrderResponse = self
544 .http_client
545 .post("positions/otc", order, Some(2))
546 .await?;
547 debug!("Order created with reference: {}", result.deal_reference);
548 Ok(result)
549 }
550
551 async fn get_order_confirmation(
552 &self,
553 deal_reference: &str,
554 ) -> Result<OrderConfirmationResponse, AppError> {
555 let path = format!("confirms/{}", deal_reference);
556 info!("Getting confirmation for order: {}", deal_reference);
557 let result: OrderConfirmationResponse = self.http_client.get(&path, Some(1)).await?;
558 debug!("Confirmation obtained for order: {}", deal_reference);
559 Ok(result)
560 }
561
562 async fn get_order_confirmation_w_retry(
563 &self,
564 deal_reference: &str,
565 retries: u64,
566 delay_ms: u64,
567 ) -> Result<OrderConfirmationResponse, AppError> {
568 let mut attempts = 0;
569 loop {
570 match self.get_order_confirmation(deal_reference).await {
571 Ok(response) => return Ok(response),
572 Err(e) => {
573 attempts += 1;
574 if attempts > retries {
575 return Err(e);
576 }
577 warn!(
578 "Failed to get order confirmation (attempt {}/{}): {}. Retrying in {} ms...",
579 attempts, retries, e, delay_ms
580 );
581 sleep(Duration::from_millis(delay_ms)).await;
582 }
583 }
584 }
585 }
586
587 async fn update_position(
588 &self,
589 deal_id: &str,
590 update: &UpdatePositionRequest,
591 ) -> Result<UpdatePositionResponse, AppError> {
592 let path = format!("positions/otc/{}", deal_id);
593 info!("Updating position: {}", deal_id);
594 let result: UpdatePositionResponse = self.http_client.put(&path, update, Some(2)).await?;
595 debug!(
596 "Position updated: {} with deal reference: {}",
597 deal_id, result.deal_reference
598 );
599 Ok(result)
600 }
601
602 async fn close_position(
603 &self,
604 close_request: &ClosePositionRequest,
605 ) -> Result<ClosePositionResponse, AppError> {
606 info!("Closing position");
607
608 let result: ClosePositionResponse = self
611 .http_client
612 .post_with_delete_method("positions/otc", close_request, Some(1))
613 .await?;
614
615 debug!("Position closed with reference: {}", result.deal_reference);
616 Ok(result)
617 }
618
619 async fn create_working_order(
620 &self,
621 order: &CreateWorkingOrderRequest,
622 ) -> Result<CreateWorkingOrderResponse, AppError> {
623 info!("Creating working order for: {}", order.epic);
624 let result: CreateWorkingOrderResponse = self
625 .http_client
626 .post("workingorders/otc", order, Some(2))
627 .await?;
628 debug!(
629 "Working order created with reference: {}",
630 result.deal_reference
631 );
632 Ok(result)
633 }
634
635 async fn delete_working_order(&self, deal_id: &str) -> Result<(), AppError> {
636 let path = format!("workingorders/otc/{}", deal_id);
637 let result: CreateWorkingOrderResponse =
638 self.http_client.delete(path.as_str(), Some(2)).await?;
639 debug!(
640 "Working order created with reference: {}",
641 result.deal_reference
642 );
643 Ok(())
644 }
645}
646
647pub struct StreamerClient {
657 account_id: String,
658 market_streamer_client: Option<Arc<Mutex<LightstreamerClient>>>,
659 price_streamer_client: Option<Arc<Mutex<LightstreamerClient>>>,
660 has_market_stream_subs: bool,
662 has_price_stream_subs: bool,
663}
664
665impl StreamerClient {
666 pub async fn new() -> Result<Self, AppError> {
675 let http_client_raw = Arc::new(RwLock::new(Client::new()));
676 let http_client = http_client_raw.read().await;
677 let ws_info = http_client.get_ws_info().await;
678 let password = ws_info.get_ws_password();
679
680 let market_streamer_client = Arc::new(Mutex::new(LightstreamerClient::new(
682 Some(ws_info.server.as_str()),
683 None,
684 Some(&ws_info.account_id),
685 Some(&password),
686 )?));
687
688 let price_streamer_client = Arc::new(Mutex::new(LightstreamerClient::new(
689 Some(ws_info.server.as_str()),
690 None,
691 Some(&ws_info.account_id),
692 Some(&password),
693 )?));
694
695 {
697 let mut client = market_streamer_client.lock().await;
698 client
699 .connection_options
700 .set_forced_transport(Some(Transport::WsStreaming));
701 }
702 {
703 let mut client = price_streamer_client.lock().await;
704 client
705 .connection_options
706 .set_forced_transport(Some(Transport::WsStreaming));
707 }
708
709 Ok(Self {
710 account_id: ws_info.account_id.clone(),
711 market_streamer_client: Some(market_streamer_client),
712 price_streamer_client: Some(price_streamer_client),
713 has_market_stream_subs: false,
714 has_price_stream_subs: false,
715 })
716 }
717
718 pub async fn default() -> Result<Self, AppError> {
720 Self::new().await
721 }
722
723 pub async fn market_subscribe(
753 &mut self,
754 epics: Vec<String>,
755 fields: HashSet<StreamingMarketField>,
756 ) -> Result<mpsc::UnboundedReceiver<PriceData>, AppError> {
757 self.has_market_stream_subs = true;
759
760 let fields = get_streaming_market_fields(&fields);
761 let market_epics: Vec<String> = epics
762 .iter()
763 .map(|epic| "MARKET:".to_string() + epic)
764 .collect();
765 let mut subscription =
766 Subscription::new(SubscriptionMode::Merge, Some(market_epics), Some(fields))?;
767
768 subscription.set_data_adapter(None)?;
769 subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
770
771 let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
773 subscription.add_listener(Box::new(listener));
774
775 let client = self.market_streamer_client.as_ref().ok_or_else(|| {
777 AppError::WebSocketError("market streamer client not initialized".to_string())
778 })?;
779
780 {
781 let mut client = client.lock().await;
782 client
783 .connection_options
784 .set_forced_transport(Some(Transport::WsStreaming));
785 LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription).await;
786 }
787
788 let (price_tx, price_rx) = mpsc::unbounded_channel();
790 tokio::spawn(async move {
791 let mut receiver = item_receiver;
792 while let Some(item_update) = receiver.recv().await {
793 let price_data = PriceData::from(&item_update);
794 let _ = price_tx.send(price_data);
795 }
796 });
797
798 info!(
799 "Market subscription created for {} instruments",
800 epics.len()
801 );
802 Ok(price_rx)
803 }
804
805 pub async fn trade_subscribe(
828 &mut self,
829 ) -> Result<mpsc::UnboundedReceiver<TradeFields>, AppError> {
830 self.has_market_stream_subs = true;
832
833 let account_id = self.account_id.clone();
834 let fields = Some(vec![
835 "CONFIRMS".to_string(),
836 "OPU".to_string(),
837 "WOU".to_string(),
838 ]);
839 let trade_items = vec![format!("TRADE:{account_id}")];
840
841 let mut subscription =
842 Subscription::new(SubscriptionMode::Distinct, Some(trade_items), fields)?;
843
844 subscription.set_data_adapter(None)?;
845 subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
846
847 let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
849 subscription.add_listener(Box::new(listener));
850
851 let client = self.market_streamer_client.as_ref().ok_or_else(|| {
853 AppError::WebSocketError("market streamer client not initialized".to_string())
854 })?;
855
856 {
857 let mut client = client.lock().await;
858 client
859 .connection_options
860 .set_forced_transport(Some(Transport::WsStreaming));
861 LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription).await;
862 }
863
864 let (trade_tx, trade_rx) = mpsc::unbounded_channel();
866 tokio::spawn(async move {
867 let mut receiver = item_receiver;
868 while let Some(item_update) = receiver.recv().await {
869 let trade_data = crate::presentation::trade::TradeData::from(&item_update);
870 let _ = trade_tx.send(trade_data.fields);
871 }
872 });
873
874 info!("Trade subscription created for account: {}", account_id);
875 Ok(trade_rx)
876 }
877
878 pub async fn account_subscribe(
905 &mut self,
906 fields: HashSet<StreamingAccountDataField>,
907 ) -> Result<mpsc::UnboundedReceiver<AccountFields>, AppError> {
908 self.has_market_stream_subs = true;
910
911 let fields = get_streaming_account_data_fields(&fields);
912 let account_id = self.account_id.clone();
913 let account_items = vec![format!("ACCOUNT:{account_id}")];
914
915 let mut subscription =
916 Subscription::new(SubscriptionMode::Merge, Some(account_items), Some(fields))?;
917
918 subscription.set_data_adapter(None)?;
919 subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
920
921 let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
923 subscription.add_listener(Box::new(listener));
924
925 let client = self.market_streamer_client.as_ref().ok_or_else(|| {
927 AppError::WebSocketError("market streamer client not initialized".to_string())
928 })?;
929
930 {
931 let mut client = client.lock().await;
932 client
933 .connection_options
934 .set_forced_transport(Some(Transport::WsStreaming));
935 LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription).await;
936 }
937
938 let (account_tx, account_rx) = mpsc::unbounded_channel();
940 tokio::spawn(async move {
941 let mut receiver = item_receiver;
942 while let Some(item_update) = receiver.recv().await {
943 let account_data = crate::presentation::account::AccountData::from(&item_update);
944 let _ = account_tx.send(account_data.fields);
945 }
946 });
947
948 info!("Account subscription created for account: {}", account_id);
949 Ok(account_rx)
950 }
951
952 pub async fn price_subscribe(
983 &mut self,
984 epics: Vec<String>,
985 fields: HashSet<StreamingPriceField>,
986 ) -> Result<mpsc::UnboundedReceiver<PriceData>, AppError> {
987 self.has_price_stream_subs = true;
989
990 let fields = get_streaming_price_fields(&fields);
991 let account_id = self.account_id.clone();
992 let price_epics: Vec<String> = epics
993 .iter()
994 .map(|epic| format!("PRICE:{account_id}:{epic}"))
995 .collect();
996
997 tracing::debug!("Pricing subscribe items: {:?}", price_epics);
999 tracing::debug!("Pricing subscribe fields: {:?}", fields);
1000
1001 let mut subscription =
1002 Subscription::new(SubscriptionMode::Merge, Some(price_epics), Some(fields))?;
1003
1004 let pricing_adapter =
1006 std::env::var("IG_PRICING_ADAPTER").unwrap_or_else(|_| "Pricing".to_string());
1007 tracing::debug!("Using Pricing data adapter: {}", pricing_adapter);
1008 subscription.set_data_adapter(Some(pricing_adapter))?;
1009 subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
1010
1011 let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
1013 subscription.add_listener(Box::new(listener));
1014
1015 let client = self.price_streamer_client.as_ref().ok_or_else(|| {
1017 AppError::WebSocketError("price streamer client not initialized".to_string())
1018 })?;
1019
1020 {
1021 let mut client = client.lock().await;
1022 client
1023 .connection_options
1024 .set_forced_transport(Some(Transport::WsStreaming));
1025 LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription).await;
1026 }
1027
1028 let (price_tx, price_rx) = mpsc::unbounded_channel();
1030 tokio::spawn(async move {
1031 let mut receiver = item_receiver;
1032 while let Some(item_update) = receiver.recv().await {
1033 let price_data = PriceData::from(&item_update);
1034 let _ = price_tx.send(price_data);
1035 }
1036 });
1037
1038 info!(
1039 "Price subscription created for {} instruments (account: {})",
1040 epics.len(),
1041 account_id
1042 );
1043 Ok(price_rx)
1044 }
1045
1046 pub async fn connect(&mut self, shutdown_signal: Option<Arc<Notify>>) -> Result<(), AppError> {
1063 let signal = if let Some(sig) = shutdown_signal {
1065 sig
1066 } else {
1067 let sig = Arc::new(Notify::new());
1068 setup_signal_hook(Arc::clone(&sig)).await;
1069 sig
1070 };
1071
1072 let mut tasks = Vec::new();
1073
1074 if self.has_market_stream_subs {
1076 if let Some(client) = self.market_streamer_client.as_ref() {
1077 let client = Arc::clone(client);
1078 let signal = Arc::clone(&signal);
1079 let task =
1080 tokio::spawn(
1081 async move { Self::connect_client(client, signal, "Market").await },
1082 );
1083 tasks.push(task);
1084 }
1085 } else {
1086 info!("Skipping Market streamer connection: no active subscriptions");
1087 }
1088
1089 if self.has_price_stream_subs {
1091 if let Some(client) = self.price_streamer_client.as_ref() {
1092 let client = Arc::clone(client);
1093 let signal = Arc::clone(&signal);
1094 let task =
1095 tokio::spawn(
1096 async move { Self::connect_client(client, signal, "Price").await },
1097 );
1098 tasks.push(task);
1099 }
1100 } else {
1101 info!("Skipping Price streamer connection: no active subscriptions");
1102 }
1103
1104 if tasks.is_empty() {
1105 warn!("No streaming clients selected for connection (no active subscriptions)");
1106 return Ok(());
1107 }
1108
1109 info!("Connecting {} streaming client(s)...", tasks.len());
1110
1111 let results = futures::future::join_all(tasks).await;
1113
1114 let mut has_error = false;
1116 for (idx, result) in results.iter().enumerate() {
1117 match result {
1118 Ok(Ok(_)) => {
1119 debug!("Streaming client {} completed successfully", idx);
1120 }
1121 Ok(Err(e)) => {
1122 error!("Streaming client {} failed: {:?}", idx, e);
1123 has_error = true;
1124 }
1125 Err(e) => {
1126 error!("Streaming client {} task panicked: {:?}", idx, e);
1127 has_error = true;
1128 }
1129 }
1130 }
1131
1132 if has_error {
1133 return Err(AppError::WebSocketError(
1134 "one or more streaming connections failed".to_string(),
1135 ));
1136 }
1137
1138 info!("All streaming connections closed gracefully");
1139 Ok(())
1140 }
1141
1142 async fn connect_client(
1144 client: Arc<Mutex<LightstreamerClient>>,
1145 signal: Arc<Notify>,
1146 client_type: &str,
1147 ) -> Result<(), AppError> {
1148 let mut retry_interval_millis: u64 = 0;
1149 let mut retry_counter: u64 = 0;
1150
1151 while retry_counter < MAX_CONNECTION_ATTEMPTS {
1152 let connect_result = {
1153 let mut client = client.lock().await;
1154 client.connect_direct(Arc::clone(&signal)).await
1155 };
1156
1157 let result_with_string_error = connect_result.map_err(|e| format!("{:?}", e));
1159
1160 match result_with_string_error {
1161 Ok(_) => {
1162 info!("{} streamer connected successfully", client_type);
1163 break;
1164 }
1165 Err(error_msg) => {
1166 if error_msg.contains("No more requests to fulfill") {
1168 info!(
1169 "{} streamer closed gracefully: no active subscriptions (server reason: No more requests to fulfill)",
1170 client_type
1171 );
1172 return Ok(());
1173 }
1174
1175 error!("{} streamer connection failed: {}", client_type, error_msg);
1176
1177 if retry_counter < MAX_CONNECTION_ATTEMPTS - 1 {
1178 tokio::time::sleep(std::time::Duration::from_millis(retry_interval_millis))
1179 .await;
1180 retry_interval_millis =
1181 (retry_interval_millis + (200 * retry_counter)).min(5000);
1182 retry_counter += 1;
1183 warn!(
1184 "{} streamer retrying (attempt {}/{}) in {:.2} seconds...",
1185 client_type,
1186 retry_counter + 1,
1187 MAX_CONNECTION_ATTEMPTS,
1188 retry_interval_millis as f64 / 1000.0
1189 );
1190 } else {
1191 retry_counter += 1;
1192 }
1193 }
1194 }
1195 }
1196
1197 if retry_counter >= MAX_CONNECTION_ATTEMPTS {
1198 error!(
1199 "{} streamer failed after {} attempts",
1200 client_type, MAX_CONNECTION_ATTEMPTS
1201 );
1202 return Err(AppError::WebSocketError(format!(
1203 "{} streamer: maximum connection attempts ({}) exceeded",
1204 client_type, MAX_CONNECTION_ATTEMPTS
1205 )));
1206 }
1207
1208 info!("{} streamer connection closed gracefully", client_type);
1209 Ok(())
1210 }
1211
1212 pub async fn disconnect(&mut self) -> Result<(), AppError> {
1220 let mut disconnected = 0;
1221
1222 if let Some(client) = self.market_streamer_client.as_ref() {
1223 let mut client = client.lock().await;
1224 client.disconnect().await;
1225 info!("Market streamer disconnected");
1226 disconnected += 1;
1227 }
1228
1229 if let Some(client) = self.price_streamer_client.as_ref() {
1230 let mut client = client.lock().await;
1231 client.disconnect().await;
1232 info!("Price streamer disconnected");
1233 disconnected += 1;
1234 }
1235
1236 info!("Disconnected {} streaming client(s)", disconnected);
1237 Ok(())
1238 }
1239}