1use crate::application::auth::WebsocketInfo;
7use crate::application::interfaces::account::AccountService;
8use crate::application::interfaces::market::MarketService;
9use crate::application::interfaces::order::OrderService;
10use crate::error::AppError;
11use crate::model::http::HttpClient;
12use crate::model::requests::RecentPricesRequest;
13use crate::model::requests::{
14 ClosePositionRequest, CreateOrderRequest, CreateWorkingOrderRequest, UpdatePositionRequest,
15};
16use crate::model::responses::{
17 ClosePositionResponse, CreateOrderResponse, CreateWorkingOrderResponse, UpdatePositionResponse,
18};
19use crate::model::responses::{
20 DBEntryResponse, HistoricalPricesResponse, MarketNavigationResponse, MarketSearchResponse,
21 MultipleMarketDetailsResponse,
22};
23use crate::model::streaming::{
24 StreamingAccountDataField, StreamingChartField, StreamingMarketField, StreamingPriceField,
25 get_streaming_account_data_fields, get_streaming_chart_fields, get_streaming_market_fields,
26 get_streaming_price_fields,
27};
28use crate::prelude::{
29 AccountActivityResponse, AccountFields, AccountsResponse, ChartData, ChartScale,
30 OrderConfirmationResponse, PositionsResponse, TradeFields, TransactionHistoryResponse,
31 WorkingOrdersResponse,
32};
33use crate::presentation::market::{MarketData, MarketDetails};
34use crate::presentation::price::PriceData;
35use async_trait::async_trait;
36use lightstreamer_rs::client::{LightstreamerClient, Transport};
37use lightstreamer_rs::subscription::{
38 ChannelSubscriptionListener, Snapshot, Subscription, SubscriptionMode,
39};
40use lightstreamer_rs::utils::setup_signal_hook;
41use serde_json::Value;
42use std::collections::HashSet;
43use std::sync::Arc;
44use std::time::Duration;
45use tokio::sync::{Mutex, Notify, RwLock, mpsc};
46use tokio::time::sleep;
47use tracing::{debug, error, info, warn};
48
/// Maximum number of connection attempts `StreamerClient::connect_client` makes
/// for a single streamer before it reports a `WebSocketError`.
const MAX_CONNECTION_ATTEMPTS: u64 = 3;
50
/// Client on which this file implements `MarketService`, `AccountService` and
/// `OrderService`; every request is delegated to the shared [`HttpClient`].
pub struct Client {
    /// HTTP layer used for all requests; built from `HttpClient::default()` in `Client::new`.
    http_client: Arc<HttpClient>,
}
58
59impl Client {
60 pub fn new() -> Self {
65 let http_client = Arc::new(HttpClient::default());
66 Self { http_client }
67 }
68
69 pub async fn get_ws_info(&self) -> WebsocketInfo {
74 self.http_client.get_ws_info().await
75 }
76}
77
78impl Default for Client {
79 fn default() -> Self {
80 Self::new()
81 }
82}
83
84#[async_trait]
85impl MarketService for Client {
86 async fn search_markets(&self, search_term: &str) -> Result<MarketSearchResponse, AppError> {
87 let path = format!("markets?searchTerm={}", search_term);
88 info!("Searching markets with term: {}", search_term);
89 let result: MarketSearchResponse = self.http_client.get(&path, Some(1)).await?;
90 debug!("{} markets found", result.markets.len());
91 Ok(result)
92 }
93
94 async fn get_market_details(&self, epic: &str) -> Result<MarketDetails, AppError> {
95 let path = format!("markets/{epic}");
96 info!("Getting market details: {}", epic);
97 let market_value: Value = self.http_client.get(&path, Some(3)).await?;
98 let market_details: MarketDetails = serde_json::from_value(market_value)?;
99 debug!("Market details obtained for: {}", epic);
100 Ok(market_details)
101 }
102
103 async fn get_multiple_market_details(
104 &self,
105 epics: &[String],
106 ) -> Result<MultipleMarketDetailsResponse, AppError> {
107 if epics.is_empty() {
108 return Ok(MultipleMarketDetailsResponse::default());
109 } else if epics.len() > 50 {
110 return Err(AppError::InvalidInput(
111 "The maximum number of EPICs is 50".to_string(),
112 ));
113 }
114
115 let epics_str = epics.join(",");
116 let path = format!("markets?epics={}", epics_str);
117 debug!(
118 "Getting market details for {} EPICs in a batch",
119 epics.len()
120 );
121
122 let response: MultipleMarketDetailsResponse = self.http_client.get(&path, Some(2)).await?;
123
124 Ok(response)
125 }
126
127 async fn get_historical_prices(
128 &self,
129 epic: &str,
130 resolution: &str,
131 from: &str,
132 to: &str,
133 ) -> Result<HistoricalPricesResponse, AppError> {
134 let path = format!(
135 "prices/{}?resolution={}&from={}&to={}",
136 epic, resolution, from, to
137 );
138 info!("Getting historical prices for: {}", epic);
139 let result: HistoricalPricesResponse = self.http_client.get(&path, Some(3)).await?;
140 debug!("Historical prices obtained for: {}", epic);
141 Ok(result)
142 }
143
144 async fn get_historical_prices_by_date_range(
145 &self,
146 epic: &str,
147 resolution: &str,
148 start_date: &str,
149 end_date: &str,
150 ) -> Result<HistoricalPricesResponse, AppError> {
151 let path = format!("prices/{}/{}/{}/{}", epic, resolution, start_date, end_date);
152 info!(
153 "Getting historical prices for epic: {}, resolution: {}, from: {} to: {}",
154 epic, resolution, start_date, end_date
155 );
156 let result: HistoricalPricesResponse = self.http_client.get(&path, Some(2)).await?;
157 debug!(
158 "Historical prices obtained for epic: {}, {} data points",
159 epic,
160 result.prices.len()
161 );
162 Ok(result)
163 }
164
165 async fn get_recent_prices(
166 &self,
167 params: &RecentPricesRequest<'_>,
168 ) -> Result<HistoricalPricesResponse, AppError> {
169 let mut query_params = Vec::new();
170
171 if let Some(res) = params.resolution {
172 query_params.push(format!("resolution={}", res));
173 }
174 if let Some(f) = params.from {
175 query_params.push(format!("from={}", f));
176 }
177 if let Some(t) = params.to {
178 query_params.push(format!("to={}", t));
179 }
180 if let Some(max) = params.max_points {
181 query_params.push(format!("max={}", max));
182 }
183 if let Some(size) = params.page_size {
184 query_params.push(format!("pageSize={}", size));
185 }
186 if let Some(num) = params.page_number {
187 query_params.push(format!("pageNumber={}", num));
188 }
189
190 let query_string = if query_params.is_empty() {
191 String::new()
192 } else {
193 format!("?{}", query_params.join("&"))
194 };
195
196 let path = format!("prices/{}{}", params.epic, query_string);
197 info!("Getting recent prices for epic: {}", params.epic);
198 let result: HistoricalPricesResponse = self.http_client.get(&path, Some(3)).await?;
199 debug!(
200 "Recent prices obtained for epic: {}, {} data points",
201 params.epic,
202 result.prices.len()
203 );
204 Ok(result)
205 }
206
207 async fn get_historical_prices_by_count_v1(
208 &self,
209 epic: &str,
210 resolution: &str,
211 num_points: i32,
212 ) -> Result<HistoricalPricesResponse, AppError> {
213 let path = format!("prices/{}/{}/{}", epic, resolution, num_points);
214 info!(
215 "Getting historical prices (v1) for epic: {}, resolution: {}, points: {}",
216 epic, resolution, num_points
217 );
218 let result: HistoricalPricesResponse = self.http_client.get(&path, Some(1)).await?;
219 debug!(
220 "Historical prices (v1) obtained for epic: {}, {} data points",
221 epic,
222 result.prices.len()
223 );
224 Ok(result)
225 }
226
227 async fn get_historical_prices_by_count_v2(
228 &self,
229 epic: &str,
230 resolution: &str,
231 num_points: i32,
232 ) -> Result<HistoricalPricesResponse, AppError> {
233 let path = format!("prices/{}/{}/{}", epic, resolution, num_points);
234 info!(
235 "Getting historical prices (v2) for epic: {}, resolution: {}, points: {}",
236 epic, resolution, num_points
237 );
238 let result: HistoricalPricesResponse = self.http_client.get(&path, Some(2)).await?;
239 debug!(
240 "Historical prices (v2) obtained for epic: {}, {} data points",
241 epic,
242 result.prices.len()
243 );
244 Ok(result)
245 }
246
247 async fn get_market_navigation(&self) -> Result<MarketNavigationResponse, AppError> {
248 let path = "marketnavigation";
249 info!("Getting top-level market navigation nodes");
250 let result: MarketNavigationResponse = self.http_client.get(path, Some(1)).await?;
251 debug!("{} navigation nodes found", result.nodes.len());
252 debug!("{} markets found at root level", result.markets.len());
253 Ok(result)
254 }
255
256 async fn get_market_navigation_node(
257 &self,
258 node_id: &str,
259 ) -> Result<MarketNavigationResponse, AppError> {
260 let path = format!("marketnavigation/{}", node_id);
261 info!("Getting market navigation node: {}", node_id);
262 let result: MarketNavigationResponse = self.http_client.get(&path, Some(1)).await?;
263 debug!("{} child nodes found", result.nodes.len());
264 debug!("{} markets found in node {}", result.markets.len(), node_id);
265 Ok(result)
266 }
267
268 async fn get_all_markets(&self) -> Result<Vec<MarketData>, AppError> {
269 let max_depth = 6;
270 info!(
271 "Starting comprehensive market hierarchy traversal (max {} levels)",
272 max_depth
273 );
274
275 let root_response = self.get_market_navigation().await?;
276 info!(
277 "Root navigation: {} nodes, {} markets at top level",
278 root_response.nodes.len(),
279 root_response.markets.len()
280 );
281
282 let mut all_markets = root_response.markets.clone();
283 let mut nodes_to_process = root_response.nodes.clone();
284 let mut processed_levels = 0;
285
286 while !nodes_to_process.is_empty() && processed_levels < max_depth {
287 let mut next_level_nodes = Vec::new();
288 let mut level_market_count = 0;
289
290 info!(
291 "Processing level {} with {} nodes",
292 processed_levels,
293 nodes_to_process.len()
294 );
295
296 for node in &nodes_to_process {
297 match self.get_market_navigation_node(&node.id).await {
298 Ok(node_response) => {
299 let node_markets = node_response.markets.len();
300 let node_children = node_response.nodes.len();
301
302 if node_markets > 0 || node_children > 0 {
303 debug!(
304 "Node '{}' (level {}): {} markets, {} child nodes",
305 node.name, processed_levels, node_markets, node_children
306 );
307 }
308
309 all_markets.extend(node_response.markets);
310 level_market_count += node_markets;
311 next_level_nodes.extend(node_response.nodes);
312 }
313 Err(e) => {
314 tracing::error!(
315 "Failed to get markets for node '{}' at level {}: {:?}",
316 node.name,
317 processed_levels,
318 e
319 );
320 }
321 }
322 }
323
324 info!(
325 "Level {} completed: {} markets found, {} nodes for next level",
326 processed_levels,
327 level_market_count,
328 next_level_nodes.len()
329 );
330
331 nodes_to_process = next_level_nodes;
332 processed_levels += 1;
333 }
334
335 info!(
336 "Market hierarchy traversal completed: {} total markets found across {} levels",
337 all_markets.len(),
338 processed_levels
339 );
340
341 Ok(all_markets)
342 }
343
344 async fn get_vec_db_entries(&self) -> Result<Vec<DBEntryResponse>, AppError> {
345 info!("Getting all markets from hierarchy for DB entries");
346
347 let all_markets = self.get_all_markets().await?;
348 info!("Collected {} markets from hierarchy", all_markets.len());
349
350 let mut vec_db_entries: Vec<DBEntryResponse> = all_markets
351 .iter()
352 .map(DBEntryResponse::from)
353 .filter(|entry| !entry.epic.is_empty())
354 .collect();
355
356 info!("Created {} DB entries from markets", vec_db_entries.len());
357
358 let unique_symbols: std::collections::HashSet<String> = vec_db_entries
360 .iter()
361 .map(|entry| entry.symbol.clone())
362 .filter(|symbol| !symbol.is_empty())
363 .collect();
364
365 info!(
366 "Found {} unique symbols to fetch expiry dates for",
367 unique_symbols.len()
368 );
369
370 let mut symbol_expiry_map: std::collections::HashMap<String, String> =
371 std::collections::HashMap::new();
372
373 for symbol in unique_symbols {
374 if let Some(entry) = vec_db_entries
375 .iter()
376 .find(|e| e.symbol == symbol && !e.epic.is_empty())
377 {
378 match self.get_market_details(&entry.epic).await {
379 Ok(market_details) => {
380 let expiry_date = market_details
381 .instrument
382 .expiry_details
383 .as_ref()
384 .map(|details| details.last_dealing_date.clone())
385 .unwrap_or_else(|| market_details.instrument.expiry.clone());
386
387 symbol_expiry_map.insert(symbol.clone(), expiry_date);
388 info!(
389 "Fetched expiry date for symbol {}: {}",
390 symbol,
391 symbol_expiry_map.get(&symbol).unwrap()
392 );
393 }
394 Err(e) => {
395 tracing::error!(
396 "Failed to get market details for epic {} (symbol {}): {:?}",
397 entry.epic,
398 symbol,
399 e
400 );
401 symbol_expiry_map.insert(symbol.clone(), entry.expiry.clone());
402 }
403 }
404 }
405 }
406
407 for entry in &mut vec_db_entries {
408 if let Some(expiry_date) = symbol_expiry_map.get(&entry.symbol) {
409 entry.expiry = expiry_date.clone();
410 }
411 }
412
413 info!("Updated expiry dates for {} entries", vec_db_entries.len());
414 Ok(vec_db_entries)
415 }
416}
417
418#[async_trait]
419impl AccountService for Client {
420 async fn get_accounts(&self) -> Result<AccountsResponse, AppError> {
421 info!("Getting account information");
422 let result: AccountsResponse = self.http_client.get("accounts", Some(1)).await?;
423 debug!(
424 "Account information obtained: {} accounts",
425 result.accounts.len()
426 );
427 Ok(result)
428 }
429
430 async fn get_positions(&self) -> Result<PositionsResponse, AppError> {
431 debug!("Getting open positions");
432 let result: PositionsResponse = self.http_client.get("positions", Some(2)).await?;
433 debug!("Positions obtained: {} positions", result.positions.len());
434 Ok(result)
435 }
436
437 async fn get_positions_w_filter(&self, filter: &str) -> Result<PositionsResponse, AppError> {
438 debug!("Getting open positions with filter: {}", filter);
439 let mut positions = self.get_positions().await?;
440
441 positions
442 .positions
443 .retain(|position| position.market.epic.contains(filter));
444
445 debug!(
446 "Positions obtained after filtering: {} positions",
447 positions.positions.len()
448 );
449 Ok(positions)
450 }
451
452 async fn get_working_orders(&self) -> Result<WorkingOrdersResponse, AppError> {
453 info!("Getting working orders");
454 let result: WorkingOrdersResponse = self.http_client.get("workingorders", Some(2)).await?;
455 debug!(
456 "Working orders obtained: {} orders",
457 result.working_orders.len()
458 );
459 Ok(result)
460 }
461
462 async fn get_activity(
463 &self,
464 from: &str,
465 to: &str,
466 ) -> Result<AccountActivityResponse, AppError> {
467 let path = format!("history/activity?from={}&to={}&pageSize=500", from, to);
468 info!("Getting account activity");
469 let result: AccountActivityResponse = self.http_client.get(&path, Some(3)).await?;
470 debug!(
471 "Account activity obtained: {} activities",
472 result.activities.len()
473 );
474 Ok(result)
475 }
476
477 async fn get_activity_with_details(
478 &self,
479 from: &str,
480 to: &str,
481 ) -> Result<AccountActivityResponse, AppError> {
482 let path = format!(
483 "history/activity?from={}&to={}&detailed=true&pageSize=500",
484 from, to
485 );
486 info!("Getting detailed account activity");
487 let result: AccountActivityResponse = self.http_client.get(&path, Some(3)).await?;
488 debug!(
489 "Detailed account activity obtained: {} activities",
490 result.activities.len()
491 );
492 Ok(result)
493 }
494
495 async fn get_transactions(
496 &self,
497 from: &str,
498 to: &str,
499 ) -> Result<TransactionHistoryResponse, AppError> {
500 const PAGE_SIZE: u32 = 200;
501 let mut all_transactions = Vec::new();
502 let mut current_page = 1;
503 #[allow(unused_assignments)]
504 let mut last_metadata = None;
505
506 loop {
507 let path = format!(
508 "history/transactions?from={}&to={}&pageSize={}&pageNumber={}",
509 from, to, PAGE_SIZE, current_page
510 );
511 info!("Getting transaction history page {}", current_page);
512
513 let result: TransactionHistoryResponse = self.http_client.get(&path, Some(2)).await?;
514
515 let total_pages = result.metadata.page_data.total_pages as u32;
516 last_metadata = Some(result.metadata);
517 all_transactions.extend(result.transactions);
518
519 if current_page >= total_pages {
520 break;
521 }
522 current_page += 1;
523 }
524
525 debug!(
526 "Total transaction history obtained: {} transactions",
527 all_transactions.len()
528 );
529
530 Ok(TransactionHistoryResponse {
531 transactions: all_transactions,
532 metadata: last_metadata
533 .ok_or_else(|| AppError::InvalidInput("Could not retrieve metadata".to_string()))?,
534 })
535 }
536}
537
538#[async_trait]
539impl OrderService for Client {
540 async fn create_order(
541 &self,
542 order: &CreateOrderRequest,
543 ) -> Result<CreateOrderResponse, AppError> {
544 info!("Creating order for: {}", order.epic);
545 let result: CreateOrderResponse = self
546 .http_client
547 .post("positions/otc", order, Some(2))
548 .await?;
549 debug!("Order created with reference: {}", result.deal_reference);
550 Ok(result)
551 }
552
553 async fn get_order_confirmation(
554 &self,
555 deal_reference: &str,
556 ) -> Result<OrderConfirmationResponse, AppError> {
557 let path = format!("confirms/{}", deal_reference);
558 info!("Getting confirmation for order: {}", deal_reference);
559 let result: OrderConfirmationResponse = self.http_client.get(&path, Some(1)).await?;
560 debug!("Confirmation obtained for order: {}", deal_reference);
561 Ok(result)
562 }
563
564 async fn get_order_confirmation_w_retry(
565 &self,
566 deal_reference: &str,
567 retries: u64,
568 delay_ms: u64,
569 ) -> Result<OrderConfirmationResponse, AppError> {
570 let mut attempts = 0;
571 loop {
572 match self.get_order_confirmation(deal_reference).await {
573 Ok(response) => return Ok(response),
574 Err(e) => {
575 attempts += 1;
576 if attempts > retries {
577 return Err(e);
578 }
579 warn!(
580 "Failed to get order confirmation (attempt {}/{}): {}. Retrying in {} ms...",
581 attempts, retries, e, delay_ms
582 );
583 sleep(Duration::from_millis(delay_ms)).await;
584 }
585 }
586 }
587 }
588
589 async fn update_position(
590 &self,
591 deal_id: &str,
592 update: &UpdatePositionRequest,
593 ) -> Result<UpdatePositionResponse, AppError> {
594 let path = format!("positions/otc/{}", deal_id);
595 info!("Updating position: {}", deal_id);
596 let result: UpdatePositionResponse = self.http_client.put(&path, update, Some(2)).await?;
597 debug!(
598 "Position updated: {} with deal reference: {}",
599 deal_id, result.deal_reference
600 );
601 Ok(result)
602 }
603
604 async fn close_position(
605 &self,
606 close_request: &ClosePositionRequest,
607 ) -> Result<ClosePositionResponse, AppError> {
608 info!("Closing position");
609
610 let result: ClosePositionResponse = self
613 .http_client
614 .post_with_delete_method("positions/otc", close_request, Some(1))
615 .await?;
616
617 debug!("Position closed with reference: {}", result.deal_reference);
618 Ok(result)
619 }
620
621 async fn create_working_order(
622 &self,
623 order: &CreateWorkingOrderRequest,
624 ) -> Result<CreateWorkingOrderResponse, AppError> {
625 info!("Creating working order for: {}", order.epic);
626 let result: CreateWorkingOrderResponse = self
627 .http_client
628 .post("workingorders/otc", order, Some(2))
629 .await?;
630 debug!(
631 "Working order created with reference: {}",
632 result.deal_reference
633 );
634 Ok(result)
635 }
636
637 async fn delete_working_order(&self, deal_id: &str) -> Result<(), AppError> {
638 let path = format!("workingorders/otc/{}", deal_id);
639 let result: CreateWorkingOrderResponse =
640 self.http_client.delete(path.as_str(), Some(2)).await?;
641 debug!(
642 "Working order created with reference: {}",
643 result.deal_reference
644 );
645 Ok(())
646 }
647}
648
/// Streaming client that owns two Lightstreamer connections: one used for
/// market, trade, account and chart subscriptions, and one used for price
/// subscriptions.
pub struct StreamerClient {
    /// Account id taken from `WebsocketInfo` in `new`; used to build the
    /// `TRADE:`, `ACCOUNT:` and `PRICE:` item names.
    account_id: String,
    /// Connection used by `market_subscribe`, `trade_subscribe`,
    /// `account_subscribe` and `chart_subscribe`. Always `Some` after `new`.
    market_streamer_client: Option<Arc<Mutex<LightstreamerClient>>>,
    /// Connection used by `price_subscribe`. Always `Some` after `new`.
    price_streamer_client: Option<Arc<Mutex<LightstreamerClient>>>,
    /// Set once any subscription is added on the market connection; `connect`
    /// skips that connection while this is `false`.
    has_market_stream_subs: bool,
    /// Set once a price subscription is added; `connect` skips the price
    /// connection while this is `false`.
    has_price_stream_subs: bool,
}
666
667impl StreamerClient {
668 pub async fn new() -> Result<Self, AppError> {
677 let http_client_raw = Arc::new(RwLock::new(Client::new()));
678 let http_client = http_client_raw.read().await;
679 let ws_info = http_client.get_ws_info().await;
680 let password = ws_info.get_ws_password();
681
682 let market_streamer_client = Arc::new(Mutex::new(LightstreamerClient::new(
684 Some(ws_info.server.as_str()),
685 None,
686 Some(&ws_info.account_id),
687 Some(&password),
688 )?));
689
690 let price_streamer_client = Arc::new(Mutex::new(LightstreamerClient::new(
691 Some(ws_info.server.as_str()),
692 None,
693 Some(&ws_info.account_id),
694 Some(&password),
695 )?));
696
697 {
699 let mut client = market_streamer_client.lock().await;
700 client
701 .connection_options
702 .set_forced_transport(Some(Transport::WsStreaming));
703 }
704 {
705 let mut client = price_streamer_client.lock().await;
706 client
707 .connection_options
708 .set_forced_transport(Some(Transport::WsStreaming));
709 }
710
711 Ok(Self {
712 account_id: ws_info.account_id.clone(),
713 market_streamer_client: Some(market_streamer_client),
714 price_streamer_client: Some(price_streamer_client),
715 has_market_stream_subs: false,
716 has_price_stream_subs: false,
717 })
718 }
719
720 pub async fn default() -> Result<Self, AppError> {
722 Self::new().await
723 }
724
725 pub async fn market_subscribe(
755 &mut self,
756 epics: Vec<String>,
757 fields: HashSet<StreamingMarketField>,
758 ) -> Result<mpsc::UnboundedReceiver<PriceData>, AppError> {
759 self.has_market_stream_subs = true;
761
762 let fields = get_streaming_market_fields(&fields);
763 let market_epics: Vec<String> = epics
764 .iter()
765 .map(|epic| "MARKET:".to_string() + epic)
766 .collect();
767 let mut subscription =
768 Subscription::new(SubscriptionMode::Merge, Some(market_epics), Some(fields))?;
769
770 subscription.set_data_adapter(None)?;
771 subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
772
773 let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
775 subscription.add_listener(Box::new(listener));
776
777 let client = self.market_streamer_client.as_ref().ok_or_else(|| {
779 AppError::WebSocketError("market streamer client not initialized".to_string())
780 })?;
781
782 {
783 let mut client = client.lock().await;
784 client
785 .connection_options
786 .set_forced_transport(Some(Transport::WsStreaming));
787 LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription).await;
788 }
789
790 let (price_tx, price_rx) = mpsc::unbounded_channel();
792 tokio::spawn(async move {
793 let mut receiver = item_receiver;
794 while let Some(item_update) = receiver.recv().await {
795 let price_data = PriceData::from(&item_update);
796 let _ = price_tx.send(price_data);
797 }
798 });
799
800 info!(
801 "Market subscription created for {} instruments",
802 epics.len()
803 );
804 Ok(price_rx)
805 }
806
807 pub async fn trade_subscribe(
830 &mut self,
831 ) -> Result<mpsc::UnboundedReceiver<TradeFields>, AppError> {
832 self.has_market_stream_subs = true;
834
835 let account_id = self.account_id.clone();
836 let fields = Some(vec![
837 "CONFIRMS".to_string(),
838 "OPU".to_string(),
839 "WOU".to_string(),
840 ]);
841 let trade_items = vec![format!("TRADE:{account_id}")];
842
843 let mut subscription =
844 Subscription::new(SubscriptionMode::Distinct, Some(trade_items), fields)?;
845
846 subscription.set_data_adapter(None)?;
847 subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
848
849 let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
851 subscription.add_listener(Box::new(listener));
852
853 let client = self.market_streamer_client.as_ref().ok_or_else(|| {
855 AppError::WebSocketError("market streamer client not initialized".to_string())
856 })?;
857
858 {
859 let mut client = client.lock().await;
860 client
861 .connection_options
862 .set_forced_transport(Some(Transport::WsStreaming));
863 LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription).await;
864 }
865
866 let (trade_tx, trade_rx) = mpsc::unbounded_channel();
868 tokio::spawn(async move {
869 let mut receiver = item_receiver;
870 while let Some(item_update) = receiver.recv().await {
871 let trade_data = crate::presentation::trade::TradeData::from(&item_update);
872 let _ = trade_tx.send(trade_data.fields);
873 }
874 });
875
876 info!("Trade subscription created for account: {}", account_id);
877 Ok(trade_rx)
878 }
879
880 pub async fn account_subscribe(
907 &mut self,
908 fields: HashSet<StreamingAccountDataField>,
909 ) -> Result<mpsc::UnboundedReceiver<AccountFields>, AppError> {
910 self.has_market_stream_subs = true;
912
913 let fields = get_streaming_account_data_fields(&fields);
914 let account_id = self.account_id.clone();
915 let account_items = vec![format!("ACCOUNT:{account_id}")];
916
917 let mut subscription =
918 Subscription::new(SubscriptionMode::Merge, Some(account_items), Some(fields))?;
919
920 subscription.set_data_adapter(None)?;
921 subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
922
923 let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
925 subscription.add_listener(Box::new(listener));
926
927 let client = self.market_streamer_client.as_ref().ok_or_else(|| {
929 AppError::WebSocketError("market streamer client not initialized".to_string())
930 })?;
931
932 {
933 let mut client = client.lock().await;
934 client
935 .connection_options
936 .set_forced_transport(Some(Transport::WsStreaming));
937 LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription).await;
938 }
939
940 let (account_tx, account_rx) = mpsc::unbounded_channel();
942 tokio::spawn(async move {
943 let mut receiver = item_receiver;
944 while let Some(item_update) = receiver.recv().await {
945 let account_data = crate::presentation::account::AccountData::from(&item_update);
946 let _ = account_tx.send(account_data.fields);
947 }
948 });
949
950 info!("Account subscription created for account: {}", account_id);
951 Ok(account_rx)
952 }
953
954 pub async fn price_subscribe(
985 &mut self,
986 epics: Vec<String>,
987 fields: HashSet<StreamingPriceField>,
988 ) -> Result<mpsc::UnboundedReceiver<PriceData>, AppError> {
989 self.has_price_stream_subs = true;
991
992 let fields = get_streaming_price_fields(&fields);
993 let account_id = self.account_id.clone();
994 let price_epics: Vec<String> = epics
995 .iter()
996 .map(|epic| format!("PRICE:{account_id}:{epic}"))
997 .collect();
998
999 tracing::debug!("Pricing subscribe items: {:?}", price_epics);
1001 tracing::debug!("Pricing subscribe fields: {:?}", fields);
1002
1003 let mut subscription =
1004 Subscription::new(SubscriptionMode::Merge, Some(price_epics), Some(fields))?;
1005
1006 let pricing_adapter =
1008 std::env::var("IG_PRICING_ADAPTER").unwrap_or_else(|_| "Pricing".to_string());
1009 tracing::debug!("Using Pricing data adapter: {}", pricing_adapter);
1010 subscription.set_data_adapter(Some(pricing_adapter))?;
1011 subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
1012
1013 let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
1015 subscription.add_listener(Box::new(listener));
1016
1017 let client = self.price_streamer_client.as_ref().ok_or_else(|| {
1019 AppError::WebSocketError("price streamer client not initialized".to_string())
1020 })?;
1021
1022 {
1023 let mut client = client.lock().await;
1024 client
1025 .connection_options
1026 .set_forced_transport(Some(Transport::WsStreaming));
1027 LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription).await;
1028 }
1029
1030 let (price_tx, price_rx) = mpsc::unbounded_channel();
1032 tokio::spawn(async move {
1033 let mut receiver = item_receiver;
1034 while let Some(item_update) = receiver.recv().await {
1035 let price_data = PriceData::from(&item_update);
1036 let _ = price_tx.send(price_data);
1037 }
1038 });
1039
1040 info!(
1041 "Price subscription created for {} instruments (account: {})",
1042 epics.len(),
1043 account_id
1044 );
1045 Ok(price_rx)
1046 }
1047
1048 pub async fn chart_subscribe(
1081 &mut self,
1082 epics: Vec<String>,
1083 scale: ChartScale,
1084 fields: HashSet<StreamingChartField>,
1085 ) -> Result<mpsc::UnboundedReceiver<ChartData>, AppError> {
1086 self.has_market_stream_subs = true;
1088
1089 let fields = get_streaming_chart_fields(&fields);
1090
1091 let chart_items: Vec<String> = epics
1092 .iter()
1093 .map(|epic| format!("CHART:{epic}:{scale}",))
1094 .collect();
1095
1096 let mode = if matches!(scale, ChartScale::Tick) {
1098 SubscriptionMode::Distinct
1099 } else {
1100 SubscriptionMode::Merge
1101 };
1102
1103 let mut subscription = Subscription::new(mode, Some(chart_items), Some(fields))?;
1104
1105 subscription.set_data_adapter(None)?;
1106 subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
1107
1108 let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
1110 subscription.add_listener(Box::new(listener));
1111
1112 let client = self.market_streamer_client.as_ref().ok_or_else(|| {
1114 AppError::WebSocketError("market streamer client not initialized".to_string())
1115 })?;
1116
1117 {
1118 let mut client = client.lock().await;
1119 client
1120 .connection_options
1121 .set_forced_transport(Some(Transport::WsStreaming));
1122 LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription).await;
1123 }
1124
1125 let (chart_tx, chart_rx) = mpsc::unbounded_channel();
1127 tokio::spawn(async move {
1128 let mut receiver = item_receiver;
1129 while let Some(item_update) = receiver.recv().await {
1130 let chart_data = ChartData::from(&item_update);
1131 let _ = chart_tx.send(chart_data);
1132 }
1133 });
1134
1135 info!(
1136 "Chart subscription created for {} instruments (scale: {})",
1137 epics.len(),
1138 scale
1139 );
1140
1141 Ok(chart_rx)
1142 }
1143
1144 pub async fn connect(&mut self, shutdown_signal: Option<Arc<Notify>>) -> Result<(), AppError> {
1161 let signal = if let Some(sig) = shutdown_signal {
1163 sig
1164 } else {
1165 let sig = Arc::new(Notify::new());
1166 setup_signal_hook(Arc::clone(&sig)).await;
1167 sig
1168 };
1169
1170 let mut tasks = Vec::new();
1171
1172 if self.has_market_stream_subs {
1174 if let Some(client) = self.market_streamer_client.as_ref() {
1175 let client = Arc::clone(client);
1176 let signal = Arc::clone(&signal);
1177 let task =
1178 tokio::spawn(
1179 async move { Self::connect_client(client, signal, "Market").await },
1180 );
1181 tasks.push(task);
1182 }
1183 } else {
1184 info!("Skipping Market streamer connection: no active subscriptions");
1185 }
1186
1187 if self.has_price_stream_subs {
1189 if let Some(client) = self.price_streamer_client.as_ref() {
1190 let client = Arc::clone(client);
1191 let signal = Arc::clone(&signal);
1192 let task =
1193 tokio::spawn(
1194 async move { Self::connect_client(client, signal, "Price").await },
1195 );
1196 tasks.push(task);
1197 }
1198 } else {
1199 info!("Skipping Price streamer connection: no active subscriptions");
1200 }
1201
1202 if tasks.is_empty() {
1203 warn!("No streaming clients selected for connection (no active subscriptions)");
1204 return Ok(());
1205 }
1206
1207 info!("Connecting {} streaming client(s)...", tasks.len());
1208
1209 let results = futures::future::join_all(tasks).await;
1211
1212 let mut has_error = false;
1214 for (idx, result) in results.iter().enumerate() {
1215 match result {
1216 Ok(Ok(_)) => {
1217 debug!("Streaming client {} completed successfully", idx);
1218 }
1219 Ok(Err(e)) => {
1220 error!("Streaming client {} failed: {:?}", idx, e);
1221 has_error = true;
1222 }
1223 Err(e) => {
1224 error!("Streaming client {} task panicked: {:?}", idx, e);
1225 has_error = true;
1226 }
1227 }
1228 }
1229
1230 if has_error {
1231 return Err(AppError::WebSocketError(
1232 "one or more streaming connections failed".to_string(),
1233 ));
1234 }
1235
1236 info!("All streaming connections closed gracefully");
1237 Ok(())
1238 }
1239
1240 async fn connect_client(
1242 client: Arc<Mutex<LightstreamerClient>>,
1243 signal: Arc<Notify>,
1244 client_type: &str,
1245 ) -> Result<(), AppError> {
1246 let mut retry_interval_millis: u64 = 0;
1247 let mut retry_counter: u64 = 0;
1248
1249 while retry_counter < MAX_CONNECTION_ATTEMPTS {
1250 let connect_result = {
1251 let mut client = client.lock().await;
1252 client.connect_direct(Arc::clone(&signal)).await
1253 };
1254
1255 let result_with_string_error = connect_result.map_err(|e| format!("{:?}", e));
1257
1258 match result_with_string_error {
1259 Ok(_) => {
1260 info!("{} streamer connected successfully", client_type);
1261 break;
1262 }
1263 Err(error_msg) => {
1264 if error_msg.contains("No more requests to fulfill") {
1266 info!(
1267 "{} streamer closed gracefully: no active subscriptions (server reason: No more requests to fulfill)",
1268 client_type
1269 );
1270 return Ok(());
1271 }
1272
1273 error!("{} streamer connection failed: {}", client_type, error_msg);
1274
1275 if retry_counter < MAX_CONNECTION_ATTEMPTS - 1 {
1276 tokio::time::sleep(std::time::Duration::from_millis(retry_interval_millis))
1277 .await;
1278 retry_interval_millis =
1279 (retry_interval_millis + (200 * retry_counter)).min(5000);
1280 retry_counter += 1;
1281 warn!(
1282 "{} streamer retrying (attempt {}/{}) in {:.2} seconds...",
1283 client_type,
1284 retry_counter + 1,
1285 MAX_CONNECTION_ATTEMPTS,
1286 retry_interval_millis as f64 / 1000.0
1287 );
1288 } else {
1289 retry_counter += 1;
1290 }
1291 }
1292 }
1293 }
1294
1295 if retry_counter >= MAX_CONNECTION_ATTEMPTS {
1296 error!(
1297 "{} streamer failed after {} attempts",
1298 client_type, MAX_CONNECTION_ATTEMPTS
1299 );
1300 return Err(AppError::WebSocketError(format!(
1301 "{} streamer: maximum connection attempts ({}) exceeded",
1302 client_type, MAX_CONNECTION_ATTEMPTS
1303 )));
1304 }
1305
1306 info!("{} streamer connection closed gracefully", client_type);
1307 Ok(())
1308 }
1309
1310 pub async fn disconnect(&mut self) -> Result<(), AppError> {
1318 let mut disconnected = 0;
1319
1320 if let Some(client) = self.market_streamer_client.as_ref() {
1321 let mut client = client.lock().await;
1322 client.disconnect().await;
1323 info!("Market streamer disconnected");
1324 disconnected += 1;
1325 }
1326
1327 if let Some(client) = self.price_streamer_client.as_ref() {
1328 let mut client = client.lock().await;
1329 client.disconnect().await;
1330 info!("Price streamer disconnected");
1331 disconnected += 1;
1332 }
1333
1334 info!("Disconnected {} streaming client(s)", disconnected);
1335 Ok(())
1336 }
1337}