1use crate::application::auth::WebsocketInfo;
7use crate::application::interfaces::account::AccountService;
8use crate::application::interfaces::market::MarketService;
9use crate::application::interfaces::order::OrderService;
10use crate::error::AppError;
11use crate::model::http::HttpClient;
12use crate::model::requests::RecentPricesRequest;
13use crate::model::requests::{
14 ClosePositionRequest, CreateOrderRequest, CreateWorkingOrderRequest, UpdatePositionRequest,
15};
16use crate::model::responses::{
17 ClosePositionResponse, CreateOrderResponse, CreateWorkingOrderResponse, UpdatePositionResponse,
18};
19use crate::model::responses::{
20 DBEntryResponse, HistoricalPricesResponse, MarketNavigationResponse, MarketSearchResponse,
21 MultipleMarketDetailsResponse,
22};
23use crate::model::streaming::{
24 StreamingAccountDataField, StreamingMarketField, StreamingPriceField,
25 get_streaming_account_data_fields, get_streaming_market_fields, get_streaming_price_fields,
26};
27use crate::prelude::{
28 AccountActivityResponse, AccountFields, AccountsResponse, OrderConfirmationResponse,
29 PositionsResponse, TradeFields, TransactionHistoryResponse, WorkingOrdersResponse,
30};
31use crate::presentation::market::{MarketData, MarketDetails};
32use crate::presentation::price::PriceData;
33use async_trait::async_trait;
34use lightstreamer_rs::client::{LightstreamerClient, Transport};
35use lightstreamer_rs::subscription::{
36 ChannelSubscriptionListener, Snapshot, Subscription, SubscriptionMode,
37};
38use lightstreamer_rs::utils::setup_signal_hook;
39use serde_json::Value;
40use std::collections::HashSet;
41use std::sync::Arc;
42use tokio::sync::{Mutex, Notify, RwLock, mpsc};
43use tracing::{debug, error, info, warn};
44
45const MAX_CONNECTION_ATTEMPTS: u64 = 3;
46
47pub struct Client {
52 http_client: Arc<HttpClient>,
53}
54
55impl Client {
56 pub fn new() -> Self {
61 let http_client = Arc::new(HttpClient::default());
62 Self { http_client }
63 }
64
65 pub async fn get_ws_info(&self) -> WebsocketInfo {
70 self.http_client.get_ws_info().await
71 }
72}
73
74impl Default for Client {
75 fn default() -> Self {
76 Self::new()
77 }
78}
79
80#[async_trait]
81impl MarketService for Client {
82 async fn search_markets(&self, search_term: &str) -> Result<MarketSearchResponse, AppError> {
83 let path = format!("markets?searchTerm={}", search_term);
84 info!("Searching markets with term: {}", search_term);
85 let result: MarketSearchResponse = self.http_client.get(&path, Some(1)).await?;
86 debug!("{} markets found", result.markets.len());
87 Ok(result)
88 }
89
90 async fn get_market_details(&self, epic: &str) -> Result<MarketDetails, AppError> {
91 let path = format!("markets/{epic}");
92 info!("Getting market details: {}", epic);
93 let market_value: Value = self.http_client.get(&path, Some(3)).await?;
94 let market_details: MarketDetails = serde_json::from_value(market_value)?;
95 debug!("Market details obtained for: {}", epic);
96 Ok(market_details)
97 }
98
99 async fn get_multiple_market_details(
100 &self,
101 epics: &[String],
102 ) -> Result<MultipleMarketDetailsResponse, AppError> {
103 if epics.is_empty() {
104 return Ok(MultipleMarketDetailsResponse::default());
105 } else if epics.len() > 50 {
106 return Err(AppError::InvalidInput(
107 "The maximum number of EPICs is 50".to_string(),
108 ));
109 }
110
111 let epics_str = epics.join(",");
112 let path = format!("markets?epics={}", epics_str);
113 debug!(
114 "Getting market details for {} EPICs in a batch",
115 epics.len()
116 );
117
118 let response: MultipleMarketDetailsResponse = self.http_client.get(&path, Some(2)).await?;
119
120 Ok(response)
121 }
122
123 async fn get_historical_prices(
124 &self,
125 epic: &str,
126 resolution: &str,
127 from: &str,
128 to: &str,
129 ) -> Result<HistoricalPricesResponse, AppError> {
130 let path = format!(
131 "prices/{}?resolution={}&from={}&to={}",
132 epic, resolution, from, to
133 );
134 info!("Getting historical prices for: {}", epic);
135 let result: HistoricalPricesResponse = self.http_client.get(&path, Some(3)).await?;
136 debug!("Historical prices obtained for: {}", epic);
137 Ok(result)
138 }
139
140 async fn get_historical_prices_by_date_range(
141 &self,
142 epic: &str,
143 resolution: &str,
144 start_date: &str,
145 end_date: &str,
146 ) -> Result<HistoricalPricesResponse, AppError> {
147 let path = format!("prices/{}/{}/{}/{}", epic, resolution, start_date, end_date);
148 info!(
149 "Getting historical prices for epic: {}, resolution: {}, from: {} to: {}",
150 epic, resolution, start_date, end_date
151 );
152 let result: HistoricalPricesResponse = self.http_client.get(&path, Some(2)).await?;
153 debug!(
154 "Historical prices obtained for epic: {}, {} data points",
155 epic,
156 result.prices.len()
157 );
158 Ok(result)
159 }
160
161 async fn get_recent_prices(
162 &self,
163 params: &RecentPricesRequest<'_>,
164 ) -> Result<HistoricalPricesResponse, AppError> {
165 let mut query_params = Vec::new();
166
167 if let Some(res) = params.resolution {
168 query_params.push(format!("resolution={}", res));
169 }
170 if let Some(f) = params.from {
171 query_params.push(format!("from={}", f));
172 }
173 if let Some(t) = params.to {
174 query_params.push(format!("to={}", t));
175 }
176 if let Some(max) = params.max_points {
177 query_params.push(format!("max={}", max));
178 }
179 if let Some(size) = params.page_size {
180 query_params.push(format!("pageSize={}", size));
181 }
182 if let Some(num) = params.page_number {
183 query_params.push(format!("pageNumber={}", num));
184 }
185
186 let query_string = if query_params.is_empty() {
187 String::new()
188 } else {
189 format!("?{}", query_params.join("&"))
190 };
191
192 let path = format!("prices/{}{}", params.epic, query_string);
193 info!("Getting recent prices for epic: {}", params.epic);
194 let result: HistoricalPricesResponse = self.http_client.get(&path, Some(3)).await?;
195 debug!(
196 "Recent prices obtained for epic: {}, {} data points",
197 params.epic,
198 result.prices.len()
199 );
200 Ok(result)
201 }
202
203 async fn get_historical_prices_by_count_v1(
204 &self,
205 epic: &str,
206 resolution: &str,
207 num_points: i32,
208 ) -> Result<HistoricalPricesResponse, AppError> {
209 let path = format!("prices/{}/{}/{}", epic, resolution, num_points);
210 info!(
211 "Getting historical prices (v1) for epic: {}, resolution: {}, points: {}",
212 epic, resolution, num_points
213 );
214 let result: HistoricalPricesResponse = self.http_client.get(&path, Some(1)).await?;
215 debug!(
216 "Historical prices (v1) obtained for epic: {}, {} data points",
217 epic,
218 result.prices.len()
219 );
220 Ok(result)
221 }
222
223 async fn get_historical_prices_by_count_v2(
224 &self,
225 epic: &str,
226 resolution: &str,
227 num_points: i32,
228 ) -> Result<HistoricalPricesResponse, AppError> {
229 let path = format!("prices/{}/{}/{}", epic, resolution, num_points);
230 info!(
231 "Getting historical prices (v2) for epic: {}, resolution: {}, points: {}",
232 epic, resolution, num_points
233 );
234 let result: HistoricalPricesResponse = self.http_client.get(&path, Some(2)).await?;
235 debug!(
236 "Historical prices (v2) obtained for epic: {}, {} data points",
237 epic,
238 result.prices.len()
239 );
240 Ok(result)
241 }
242
243 async fn get_market_navigation(&self) -> Result<MarketNavigationResponse, AppError> {
244 let path = "marketnavigation";
245 info!("Getting top-level market navigation nodes");
246 let result: MarketNavigationResponse = self.http_client.get(path, Some(1)).await?;
247 debug!("{} navigation nodes found", result.nodes.len());
248 debug!("{} markets found at root level", result.markets.len());
249 Ok(result)
250 }
251
252 async fn get_market_navigation_node(
253 &self,
254 node_id: &str,
255 ) -> Result<MarketNavigationResponse, AppError> {
256 let path = format!("marketnavigation/{}", node_id);
257 info!("Getting market navigation node: {}", node_id);
258 let result: MarketNavigationResponse = self.http_client.get(&path, Some(1)).await?;
259 debug!("{} child nodes found", result.nodes.len());
260 debug!("{} markets found in node {}", result.markets.len(), node_id);
261 Ok(result)
262 }
263
264 async fn get_all_markets(&self) -> Result<Vec<MarketData>, AppError> {
265 let max_depth = 6;
266 info!(
267 "Starting comprehensive market hierarchy traversal (max {} levels)",
268 max_depth
269 );
270
271 let root_response = self.get_market_navigation().await?;
272 info!(
273 "Root navigation: {} nodes, {} markets at top level",
274 root_response.nodes.len(),
275 root_response.markets.len()
276 );
277
278 let mut all_markets = root_response.markets.clone();
279 let mut nodes_to_process = root_response.nodes.clone();
280 let mut processed_levels = 0;
281
282 while !nodes_to_process.is_empty() && processed_levels < max_depth {
283 let mut next_level_nodes = Vec::new();
284 let mut level_market_count = 0;
285
286 info!(
287 "Processing level {} with {} nodes",
288 processed_levels,
289 nodes_to_process.len()
290 );
291
292 for node in &nodes_to_process {
293 match self.get_market_navigation_node(&node.id).await {
294 Ok(node_response) => {
295 let node_markets = node_response.markets.len();
296 let node_children = node_response.nodes.len();
297
298 if node_markets > 0 || node_children > 0 {
299 debug!(
300 "Node '{}' (level {}): {} markets, {} child nodes",
301 node.name, processed_levels, node_markets, node_children
302 );
303 }
304
305 all_markets.extend(node_response.markets);
306 level_market_count += node_markets;
307 next_level_nodes.extend(node_response.nodes);
308 }
309 Err(e) => {
310 tracing::error!(
311 "Failed to get markets for node '{}' at level {}: {:?}",
312 node.name,
313 processed_levels,
314 e
315 );
316 }
317 }
318 }
319
320 info!(
321 "Level {} completed: {} markets found, {} nodes for next level",
322 processed_levels,
323 level_market_count,
324 next_level_nodes.len()
325 );
326
327 nodes_to_process = next_level_nodes;
328 processed_levels += 1;
329 }
330
331 info!(
332 "Market hierarchy traversal completed: {} total markets found across {} levels",
333 all_markets.len(),
334 processed_levels
335 );
336
337 Ok(all_markets)
338 }
339
340 async fn get_vec_db_entries(&self) -> Result<Vec<DBEntryResponse>, AppError> {
341 info!("Getting all markets from hierarchy for DB entries");
342
343 let all_markets = self.get_all_markets().await?;
344 info!("Collected {} markets from hierarchy", all_markets.len());
345
346 let mut vec_db_entries: Vec<DBEntryResponse> = all_markets
347 .iter()
348 .map(DBEntryResponse::from)
349 .filter(|entry| !entry.epic.is_empty())
350 .collect();
351
352 info!("Created {} DB entries from markets", vec_db_entries.len());
353
354 let unique_symbols: std::collections::HashSet<String> = vec_db_entries
356 .iter()
357 .map(|entry| entry.symbol.clone())
358 .filter(|symbol| !symbol.is_empty())
359 .collect();
360
361 info!(
362 "Found {} unique symbols to fetch expiry dates for",
363 unique_symbols.len()
364 );
365
366 let mut symbol_expiry_map: std::collections::HashMap<String, String> =
367 std::collections::HashMap::new();
368
369 for symbol in unique_symbols {
370 if let Some(entry) = vec_db_entries
371 .iter()
372 .find(|e| e.symbol == symbol && !e.epic.is_empty())
373 {
374 match self.get_market_details(&entry.epic).await {
375 Ok(market_details) => {
376 let expiry_date = market_details
377 .instrument
378 .expiry_details
379 .as_ref()
380 .map(|details| details.last_dealing_date.clone())
381 .unwrap_or_else(|| market_details.instrument.expiry.clone());
382
383 symbol_expiry_map.insert(symbol.clone(), expiry_date);
384 info!(
385 "Fetched expiry date for symbol {}: {}",
386 symbol,
387 symbol_expiry_map.get(&symbol).unwrap()
388 );
389 }
390 Err(e) => {
391 tracing::error!(
392 "Failed to get market details for epic {} (symbol {}): {:?}",
393 entry.epic,
394 symbol,
395 e
396 );
397 symbol_expiry_map.insert(symbol.clone(), entry.expiry.clone());
398 }
399 }
400 }
401 }
402
403 for entry in &mut vec_db_entries {
404 if let Some(expiry_date) = symbol_expiry_map.get(&entry.symbol) {
405 entry.expiry = expiry_date.clone();
406 }
407 }
408
409 info!("Updated expiry dates for {} entries", vec_db_entries.len());
410 Ok(vec_db_entries)
411 }
412}
413
414#[async_trait]
415impl AccountService for Client {
416 async fn get_accounts(&self) -> Result<AccountsResponse, AppError> {
417 info!("Getting account information");
418 let result: AccountsResponse = self.http_client.get("accounts", Some(1)).await?;
419 debug!(
420 "Account information obtained: {} accounts",
421 result.accounts.len()
422 );
423 Ok(result)
424 }
425
426 async fn get_positions(&self) -> Result<PositionsResponse, AppError> {
427 debug!("Getting open positions");
428 let result: PositionsResponse = self.http_client.get("positions", Some(2)).await?;
429 debug!("Positions obtained: {} positions", result.positions.len());
430 Ok(result)
431 }
432
433 async fn get_positions_w_filter(&self, filter: &str) -> Result<PositionsResponse, AppError> {
434 debug!("Getting open positions with filter: {}", filter);
435 let mut positions = self.get_positions().await?;
436
437 positions
438 .positions
439 .retain(|position| position.market.epic.contains(filter));
440
441 debug!(
442 "Positions obtained after filtering: {} positions",
443 positions.positions.len()
444 );
445 Ok(positions)
446 }
447
448 async fn get_working_orders(&self) -> Result<WorkingOrdersResponse, AppError> {
449 info!("Getting working orders");
450 let result: WorkingOrdersResponse = self.http_client.get("workingorders", Some(2)).await?;
451 debug!(
452 "Working orders obtained: {} orders",
453 result.working_orders.len()
454 );
455 Ok(result)
456 }
457
458 async fn get_activity(
459 &self,
460 from: &str,
461 to: &str,
462 ) -> Result<AccountActivityResponse, AppError> {
463 let path = format!("history/activity?from={}&to={}&pageSize=500", from, to);
464 info!("Getting account activity");
465 let result: AccountActivityResponse = self.http_client.get(&path, Some(3)).await?;
466 debug!(
467 "Account activity obtained: {} activities",
468 result.activities.len()
469 );
470 Ok(result)
471 }
472
473 async fn get_activity_with_details(
474 &self,
475 from: &str,
476 to: &str,
477 ) -> Result<AccountActivityResponse, AppError> {
478 let path = format!(
479 "history/activity?from={}&to={}&detailed=true&pageSize=500",
480 from, to
481 );
482 info!("Getting detailed account activity");
483 let result: AccountActivityResponse = self.http_client.get(&path, Some(3)).await?;
484 debug!(
485 "Detailed account activity obtained: {} activities",
486 result.activities.len()
487 );
488 Ok(result)
489 }
490
491 async fn get_transactions(
492 &self,
493 from: &str,
494 to: &str,
495 ) -> Result<TransactionHistoryResponse, AppError> {
496 const PAGE_SIZE: u32 = 200;
497 let mut all_transactions = Vec::new();
498 let mut current_page = 1;
499 #[allow(unused_assignments)]
500 let mut last_metadata = None;
501
502 loop {
503 let path = format!(
504 "history/transactions?from={}&to={}&pageSize={}&pageNumber={}",
505 from, to, PAGE_SIZE, current_page
506 );
507 info!("Getting transaction history page {}", current_page);
508
509 let result: TransactionHistoryResponse = self.http_client.get(&path, Some(2)).await?;
510
511 let total_pages = result.metadata.page_data.total_pages as u32;
512 last_metadata = Some(result.metadata);
513 all_transactions.extend(result.transactions);
514
515 if current_page >= total_pages {
516 break;
517 }
518 current_page += 1;
519 }
520
521 debug!(
522 "Total transaction history obtained: {} transactions",
523 all_transactions.len()
524 );
525
526 Ok(TransactionHistoryResponse {
527 transactions: all_transactions,
528 metadata: last_metadata
529 .ok_or_else(|| AppError::InvalidInput("Could not retrieve metadata".to_string()))?,
530 })
531 }
532}
533
534#[async_trait]
535impl OrderService for Client {
536 async fn create_order(
537 &self,
538 order: &CreateOrderRequest,
539 ) -> Result<CreateOrderResponse, AppError> {
540 info!("Creating order for: {}", order.epic);
541 let result: CreateOrderResponse = self
542 .http_client
543 .post("positions/otc", order, Some(2))
544 .await?;
545 debug!("Order created with reference: {}", result.deal_reference);
546 Ok(result)
547 }
548
549 async fn get_order_confirmation(
550 &self,
551 deal_reference: &str,
552 ) -> Result<OrderConfirmationResponse, AppError> {
553 let path = format!("confirms/{}", deal_reference);
554 info!("Getting confirmation for order: {}", deal_reference);
555 let result: OrderConfirmationResponse = self.http_client.get(&path, Some(1)).await?;
556 debug!("Confirmation obtained for order: {}", deal_reference);
557 Ok(result)
558 }
559
560 async fn update_position(
561 &self,
562 deal_id: &str,
563 update: &UpdatePositionRequest,
564 ) -> Result<UpdatePositionResponse, AppError> {
565 let path = format!("positions/otc/{}", deal_id);
566 info!("Updating position: {}", deal_id);
567 let result: UpdatePositionResponse = self.http_client.put(&path, update, Some(2)).await?;
568 debug!(
569 "Position updated: {} with deal reference: {}",
570 deal_id, result.deal_reference
571 );
572 Ok(result)
573 }
574
575 async fn close_position(
576 &self,
577 close_request: &ClosePositionRequest,
578 ) -> Result<ClosePositionResponse, AppError> {
579 info!("Closing position");
580
581 let result: ClosePositionResponse = self
584 .http_client
585 .post_with_delete_method("positions/otc", close_request, Some(1))
586 .await?;
587
588 debug!("Position closed with reference: {}", result.deal_reference);
589 Ok(result)
590 }
591
592 async fn create_working_order(
593 &self,
594 order: &CreateWorkingOrderRequest,
595 ) -> Result<CreateWorkingOrderResponse, AppError> {
596 info!("Creating working order for: {}", order.epic);
597 let result: CreateWorkingOrderResponse = self
598 .http_client
599 .post("workingorders/otc", order, Some(2))
600 .await?;
601 debug!(
602 "Working order created with reference: {}",
603 result.deal_reference
604 );
605 Ok(result)
606 }
607
608 async fn delete_working_order(&self, deal_id: &str) -> Result<(), AppError> {
609 let path = format!("workingorders/otc/{}", deal_id);
610 let result: CreateWorkingOrderResponse =
611 self.http_client.delete(path.as_str(), Some(2)).await?;
612 debug!(
613 "Working order created with reference: {}",
614 result.deal_reference
615 );
616 Ok(())
617 }
618}
619
620pub struct StreamerClient {
630 account_id: String,
631 market_streamer_client: Option<Arc<Mutex<LightstreamerClient>>>,
632 price_streamer_client: Option<Arc<Mutex<LightstreamerClient>>>,
633 has_market_stream_subs: bool,
635 has_price_stream_subs: bool,
636}
637
638impl StreamerClient {
639 pub async fn new() -> Result<Self, AppError> {
648 let http_client_raw = Arc::new(RwLock::new(Client::new()));
649 let http_client = http_client_raw.read().await;
650 let ws_info = http_client.get_ws_info().await;
651 let password = ws_info.get_ws_password();
652
653 let market_streamer_client = Arc::new(Mutex::new(LightstreamerClient::new(
655 Some(ws_info.server.as_str()),
656 None,
657 Some(&ws_info.account_id),
658 Some(&password),
659 )?));
660
661 let price_streamer_client = Arc::new(Mutex::new(LightstreamerClient::new(
662 Some(ws_info.server.as_str()),
663 None,
664 Some(&ws_info.account_id),
665 Some(&password),
666 )?));
667
668 {
670 let mut client = market_streamer_client.lock().await;
671 client
672 .connection_options
673 .set_forced_transport(Some(Transport::WsStreaming));
674 }
675 {
676 let mut client = price_streamer_client.lock().await;
677 client
678 .connection_options
679 .set_forced_transport(Some(Transport::WsStreaming));
680 }
681
682 Ok(Self {
683 account_id: ws_info.account_id.clone(),
684 market_streamer_client: Some(market_streamer_client),
685 price_streamer_client: Some(price_streamer_client),
686 has_market_stream_subs: false,
687 has_price_stream_subs: false,
688 })
689 }
690
691 pub async fn default() -> Result<Self, AppError> {
693 Self::new().await
694 }
695
696 pub async fn market_subscribe(
726 &mut self,
727 epics: Vec<String>,
728 fields: HashSet<StreamingMarketField>,
729 ) -> Result<mpsc::UnboundedReceiver<PriceData>, AppError> {
730 self.has_market_stream_subs = true;
732
733 let fields = get_streaming_market_fields(&fields);
734 let market_epics: Vec<String> = epics
735 .iter()
736 .map(|epic| "MARKET:".to_string() + epic)
737 .collect();
738 let mut subscription =
739 Subscription::new(SubscriptionMode::Merge, Some(market_epics), Some(fields))?;
740
741 subscription.set_data_adapter(None)?;
742 subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
743
744 let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
746 subscription.add_listener(Box::new(listener));
747
748 let client = self.market_streamer_client.as_ref().ok_or_else(|| {
750 AppError::WebSocketError("market streamer client not initialized".to_string())
751 })?;
752
753 {
754 let mut client = client.lock().await;
755 client
756 .connection_options
757 .set_forced_transport(Some(Transport::WsStreaming));
758 LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription).await;
759 }
760
761 let (price_tx, price_rx) = mpsc::unbounded_channel();
763 tokio::spawn(async move {
764 let mut receiver = item_receiver;
765 while let Some(item_update) = receiver.recv().await {
766 let price_data = PriceData::from(&item_update);
767 let _ = price_tx.send(price_data);
768 }
769 });
770
771 info!(
772 "Market subscription created for {} instruments",
773 epics.len()
774 );
775 Ok(price_rx)
776 }
777
778 pub async fn trade_subscribe(
801 &mut self,
802 ) -> Result<mpsc::UnboundedReceiver<TradeFields>, AppError> {
803 self.has_market_stream_subs = true;
805
806 let account_id = self.account_id.clone();
807 let fields = Some(vec![
808 "CONFIRMS".to_string(),
809 "OPU".to_string(),
810 "WOU".to_string(),
811 ]);
812 let trade_items = vec![format!("TRADE:{account_id}")];
813
814 let mut subscription =
815 Subscription::new(SubscriptionMode::Distinct, Some(trade_items), fields)?;
816
817 subscription.set_data_adapter(None)?;
818 subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
819
820 let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
822 subscription.add_listener(Box::new(listener));
823
824 let client = self.market_streamer_client.as_ref().ok_or_else(|| {
826 AppError::WebSocketError("market streamer client not initialized".to_string())
827 })?;
828
829 {
830 let mut client = client.lock().await;
831 client
832 .connection_options
833 .set_forced_transport(Some(Transport::WsStreaming));
834 LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription).await;
835 }
836
837 let (trade_tx, trade_rx) = mpsc::unbounded_channel();
839 tokio::spawn(async move {
840 let mut receiver = item_receiver;
841 while let Some(item_update) = receiver.recv().await {
842 let trade_data = crate::presentation::trade::TradeData::from(&item_update);
843 let _ = trade_tx.send(trade_data.fields);
844 }
845 });
846
847 info!("Trade subscription created for account: {}", account_id);
848 Ok(trade_rx)
849 }
850
851 pub async fn account_subscribe(
878 &mut self,
879 fields: HashSet<StreamingAccountDataField>,
880 ) -> Result<mpsc::UnboundedReceiver<AccountFields>, AppError> {
881 self.has_market_stream_subs = true;
883
884 let fields = get_streaming_account_data_fields(&fields);
885 let account_id = self.account_id.clone();
886 let account_items = vec![format!("ACCOUNT:{account_id}")];
887
888 let mut subscription =
889 Subscription::new(SubscriptionMode::Merge, Some(account_items), Some(fields))?;
890
891 subscription.set_data_adapter(None)?;
892 subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
893
894 let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
896 subscription.add_listener(Box::new(listener));
897
898 let client = self.market_streamer_client.as_ref().ok_or_else(|| {
900 AppError::WebSocketError("market streamer client not initialized".to_string())
901 })?;
902
903 {
904 let mut client = client.lock().await;
905 client
906 .connection_options
907 .set_forced_transport(Some(Transport::WsStreaming));
908 LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription).await;
909 }
910
911 let (account_tx, account_rx) = mpsc::unbounded_channel();
913 tokio::spawn(async move {
914 let mut receiver = item_receiver;
915 while let Some(item_update) = receiver.recv().await {
916 let account_data = crate::presentation::account::AccountData::from(&item_update);
917 let _ = account_tx.send(account_data.fields);
918 }
919 });
920
921 info!("Account subscription created for account: {}", account_id);
922 Ok(account_rx)
923 }
924
925 pub async fn price_subscribe(
956 &mut self,
957 epics: Vec<String>,
958 fields: HashSet<StreamingPriceField>,
959 ) -> Result<mpsc::UnboundedReceiver<PriceData>, AppError> {
960 self.has_price_stream_subs = true;
962
963 let fields = get_streaming_price_fields(&fields);
964 let account_id = self.account_id.clone();
965 let price_epics: Vec<String> = epics
966 .iter()
967 .map(|epic| format!("PRICE:{account_id}:{epic}"))
968 .collect();
969
970 tracing::debug!("Pricing subscribe items: {:?}", price_epics);
972 tracing::debug!("Pricing subscribe fields: {:?}", fields);
973
974 let mut subscription =
975 Subscription::new(SubscriptionMode::Merge, Some(price_epics), Some(fields))?;
976
977 let pricing_adapter =
979 std::env::var("IG_PRICING_ADAPTER").unwrap_or_else(|_| "Pricing".to_string());
980 tracing::debug!("Using Pricing data adapter: {}", pricing_adapter);
981 subscription.set_data_adapter(Some(pricing_adapter))?;
982 subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
983
984 let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
986 subscription.add_listener(Box::new(listener));
987
988 let client = self.price_streamer_client.as_ref().ok_or_else(|| {
990 AppError::WebSocketError("price streamer client not initialized".to_string())
991 })?;
992
993 {
994 let mut client = client.lock().await;
995 client
996 .connection_options
997 .set_forced_transport(Some(Transport::WsStreaming));
998 LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription).await;
999 }
1000
1001 let (price_tx, price_rx) = mpsc::unbounded_channel();
1003 tokio::spawn(async move {
1004 let mut receiver = item_receiver;
1005 while let Some(item_update) = receiver.recv().await {
1006 let price_data = PriceData::from(&item_update);
1007 let _ = price_tx.send(price_data);
1008 }
1009 });
1010
1011 info!(
1012 "Price subscription created for {} instruments (account: {})",
1013 epics.len(),
1014 account_id
1015 );
1016 Ok(price_rx)
1017 }
1018
1019 pub async fn connect(&mut self, shutdown_signal: Option<Arc<Notify>>) -> Result<(), AppError> {
1036 let signal = if let Some(sig) = shutdown_signal {
1038 sig
1039 } else {
1040 let sig = Arc::new(Notify::new());
1041 setup_signal_hook(Arc::clone(&sig)).await;
1042 sig
1043 };
1044
1045 let mut tasks = Vec::new();
1046
1047 if self.has_market_stream_subs {
1049 if let Some(client) = self.market_streamer_client.as_ref() {
1050 let client = Arc::clone(client);
1051 let signal = Arc::clone(&signal);
1052 let task =
1053 tokio::spawn(
1054 async move { Self::connect_client(client, signal, "Market").await },
1055 );
1056 tasks.push(task);
1057 }
1058 } else {
1059 info!("Skipping Market streamer connection: no active subscriptions");
1060 }
1061
1062 if self.has_price_stream_subs {
1064 if let Some(client) = self.price_streamer_client.as_ref() {
1065 let client = Arc::clone(client);
1066 let signal = Arc::clone(&signal);
1067 let task =
1068 tokio::spawn(
1069 async move { Self::connect_client(client, signal, "Price").await },
1070 );
1071 tasks.push(task);
1072 }
1073 } else {
1074 info!("Skipping Price streamer connection: no active subscriptions");
1075 }
1076
1077 if tasks.is_empty() {
1078 warn!("No streaming clients selected for connection (no active subscriptions)");
1079 return Ok(());
1080 }
1081
1082 info!("Connecting {} streaming client(s)...", tasks.len());
1083
1084 let results = futures::future::join_all(tasks).await;
1086
1087 let mut has_error = false;
1089 for (idx, result) in results.iter().enumerate() {
1090 match result {
1091 Ok(Ok(_)) => {
1092 debug!("Streaming client {} completed successfully", idx);
1093 }
1094 Ok(Err(e)) => {
1095 error!("Streaming client {} failed: {:?}", idx, e);
1096 has_error = true;
1097 }
1098 Err(e) => {
1099 error!("Streaming client {} task panicked: {:?}", idx, e);
1100 has_error = true;
1101 }
1102 }
1103 }
1104
1105 if has_error {
1106 return Err(AppError::WebSocketError(
1107 "one or more streaming connections failed".to_string(),
1108 ));
1109 }
1110
1111 info!("All streaming connections closed gracefully");
1112 Ok(())
1113 }
1114
1115 async fn connect_client(
1117 client: Arc<Mutex<LightstreamerClient>>,
1118 signal: Arc<Notify>,
1119 client_type: &str,
1120 ) -> Result<(), AppError> {
1121 let mut retry_interval_millis: u64 = 0;
1122 let mut retry_counter: u64 = 0;
1123
1124 while retry_counter < MAX_CONNECTION_ATTEMPTS {
1125 let connect_result = {
1126 let mut client = client.lock().await;
1127 client.connect_direct(Arc::clone(&signal)).await
1128 };
1129
1130 let result_with_string_error = connect_result.map_err(|e| format!("{:?}", e));
1132
1133 match result_with_string_error {
1134 Ok(_) => {
1135 info!("{} streamer connected successfully", client_type);
1136 break;
1137 }
1138 Err(error_msg) => {
1139 if error_msg.contains("No more requests to fulfill") {
1141 info!(
1142 "{} streamer closed gracefully: no active subscriptions (server reason: No more requests to fulfill)",
1143 client_type
1144 );
1145 return Ok(());
1146 }
1147
1148 error!("{} streamer connection failed: {}", client_type, error_msg);
1149
1150 if retry_counter < MAX_CONNECTION_ATTEMPTS - 1 {
1151 tokio::time::sleep(std::time::Duration::from_millis(retry_interval_millis))
1152 .await;
1153 retry_interval_millis =
1154 (retry_interval_millis + (200 * retry_counter)).min(5000);
1155 retry_counter += 1;
1156 warn!(
1157 "{} streamer retrying (attempt {}/{}) in {:.2} seconds...",
1158 client_type,
1159 retry_counter + 1,
1160 MAX_CONNECTION_ATTEMPTS,
1161 retry_interval_millis as f64 / 1000.0
1162 );
1163 } else {
1164 retry_counter += 1;
1165 }
1166 }
1167 }
1168 }
1169
1170 if retry_counter >= MAX_CONNECTION_ATTEMPTS {
1171 error!(
1172 "{} streamer failed after {} attempts",
1173 client_type, MAX_CONNECTION_ATTEMPTS
1174 );
1175 return Err(AppError::WebSocketError(format!(
1176 "{} streamer: maximum connection attempts ({}) exceeded",
1177 client_type, MAX_CONNECTION_ATTEMPTS
1178 )));
1179 }
1180
1181 info!("{} streamer connection closed gracefully", client_type);
1182 Ok(())
1183 }
1184
1185 pub async fn disconnect(&mut self) -> Result<(), AppError> {
1193 let mut disconnected = 0;
1194
1195 if let Some(client) = self.market_streamer_client.as_ref() {
1196 let mut client = client.lock().await;
1197 client.disconnect().await;
1198 info!("Market streamer disconnected");
1199 disconnected += 1;
1200 }
1201
1202 if let Some(client) = self.price_streamer_client.as_ref() {
1203 let mut client = client.lock().await;
1204 client.disconnect().await;
1205 info!("Price streamer disconnected");
1206 disconnected += 1;
1207 }
1208
1209 info!("Disconnected {} streaming client(s)", disconnected);
1210 Ok(())
1211 }
1212}