1use crate::application::auth::WebsocketInfo;
7use crate::application::interfaces::account::AccountService;
8use crate::application::interfaces::market::MarketService;
9use crate::application::interfaces::order::OrderService;
10use crate::error::AppError;
11use crate::model::http::HttpClient;
12use crate::model::requests::RecentPricesRequest;
13use crate::model::requests::{
14 ClosePositionRequest, CreateOrderRequest, CreateWorkingOrderRequest, UpdatePositionRequest,
15};
16use crate::model::responses::{
17 ClosePositionResponse, CreateOrderResponse, CreateWorkingOrderResponse, UpdatePositionResponse,
18};
19use crate::model::responses::{
20 DBEntryResponse, HistoricalPricesResponse, MarketNavigationResponse, MarketSearchResponse,
21 MultipleMarketDetailsResponse,
22};
23use crate::model::streaming::{
24 StreamingAccountDataField, StreamingChartField, StreamingMarketField, StreamingPriceField,
25 get_streaming_account_data_fields, get_streaming_chart_fields, get_streaming_market_fields,
26 get_streaming_price_fields,
27};
28use crate::prelude::{
29 AccountActivityResponse, AccountFields, AccountsResponse, ChartData, ChartScale,
30 OrderConfirmationResponse, PositionsResponse, TradeFields, TransactionHistoryResponse,
31 WorkingOrdersResponse,
32};
33use crate::presentation::market::{MarketData, MarketDetails};
34use crate::presentation::price::PriceData;
35use async_trait::async_trait;
36use lightstreamer_rs::client::{LightstreamerClient, Transport};
37use lightstreamer_rs::subscription::{
38 ChannelSubscriptionListener, Snapshot, Subscription, SubscriptionMode,
39};
40use lightstreamer_rs::utils::setup_signal_hook;
41use serde_json::Value;
42use std::collections::HashSet;
43use std::sync::Arc;
44use std::time::Duration;
45use tokio::sync::{Mutex, Notify, RwLock, mpsc};
46use tokio::time::sleep;
47use tracing::{debug, error, info, warn};
48
/// Maximum number of connection attempts `StreamerClient::connect_client` makes
/// for one streamer before it gives up and returns an error.
const MAX_CONNECTION_ATTEMPTS: u64 = 3;
50
/// REST client that implements the market, account and order service traits
/// on top of a shared [`HttpClient`].
pub struct Client {
    /// HTTP client used for every request issued by this type.
    http_client: Arc<HttpClient>,
}
58
59impl Client {
60 pub fn new() -> Self {
65 let http_client = Arc::new(HttpClient::default());
66 Self { http_client }
67 }
68
69 pub async fn get_ws_info(&self) -> WebsocketInfo {
74 self.http_client.get_ws_info().await
75 }
76}
77
78impl Default for Client {
79 fn default() -> Self {
80 Self::new()
81 }
82}
83
84#[async_trait]
85impl MarketService for Client {
86 async fn search_markets(&self, search_term: &str) -> Result<MarketSearchResponse, AppError> {
87 let path = format!("markets?searchTerm={}", search_term);
88 info!("Searching markets with term: {}", search_term);
89 let result: MarketSearchResponse = self.http_client.get(&path, Some(1)).await?;
90 debug!("{} markets found", result.markets.len());
91 Ok(result)
92 }
93
94 async fn get_market_details(&self, epic: &str) -> Result<MarketDetails, AppError> {
95 let path = format!("markets/{epic}");
96 info!("Getting market details: {}", epic);
97 let market_value: Value = self.http_client.get(&path, Some(3)).await?;
98 let market_details: MarketDetails = serde_json::from_value(market_value)?;
99 debug!("Market details obtained for: {}", epic);
100 Ok(market_details)
101 }
102
103 async fn get_multiple_market_details(
104 &self,
105 epics: &[String],
106 ) -> Result<MultipleMarketDetailsResponse, AppError> {
107 if epics.is_empty() {
108 return Ok(MultipleMarketDetailsResponse::default());
109 } else if epics.len() > 50 {
110 return Err(AppError::InvalidInput(
111 "The maximum number of EPICs is 50".to_string(),
112 ));
113 }
114
115 let epics_str = epics.join(",");
116 let path = format!("markets?epics={}", epics_str);
117 debug!(
118 "Getting market details for {} EPICs in a batch",
119 epics.len()
120 );
121
122 let response: MultipleMarketDetailsResponse = self.http_client.get(&path, Some(2)).await?;
123
124 Ok(response)
125 }
126
127 async fn get_historical_prices(
128 &self,
129 epic: &str,
130 resolution: &str,
131 from: &str,
132 to: &str,
133 ) -> Result<HistoricalPricesResponse, AppError> {
134 let path = format!(
135 "prices/{}?resolution={}&from={}&to={}",
136 epic, resolution, from, to
137 );
138 info!("Getting historical prices for: {}", epic);
139 let result: HistoricalPricesResponse = self.http_client.get(&path, Some(3)).await?;
140 debug!("Historical prices obtained for: {}", epic);
141 Ok(result)
142 }
143
144 async fn get_historical_prices_by_date_range(
145 &self,
146 epic: &str,
147 resolution: &str,
148 start_date: &str,
149 end_date: &str,
150 ) -> Result<HistoricalPricesResponse, AppError> {
151 let path = format!("prices/{}/{}/{}/{}", epic, resolution, start_date, end_date);
152 info!(
153 "Getting historical prices for epic: {}, resolution: {}, from: {} to: {}",
154 epic, resolution, start_date, end_date
155 );
156 let result: HistoricalPricesResponse = self.http_client.get(&path, Some(2)).await?;
157 debug!(
158 "Historical prices obtained for epic: {}, {} data points",
159 epic,
160 result.prices.len()
161 );
162 Ok(result)
163 }
164
165 async fn get_recent_prices(
166 &self,
167 params: &RecentPricesRequest<'_>,
168 ) -> Result<HistoricalPricesResponse, AppError> {
169 let mut query_params = Vec::new();
170
171 if let Some(res) = params.resolution {
172 query_params.push(format!("resolution={}", res));
173 }
174 if let Some(f) = params.from {
175 query_params.push(format!("from={}", f));
176 }
177 if let Some(t) = params.to {
178 query_params.push(format!("to={}", t));
179 }
180 if let Some(max) = params.max_points {
181 query_params.push(format!("max={}", max));
182 }
183 if let Some(size) = params.page_size {
184 query_params.push(format!("pageSize={}", size));
185 }
186 if let Some(num) = params.page_number {
187 query_params.push(format!("pageNumber={}", num));
188 }
189
190 let query_string = if query_params.is_empty() {
191 String::new()
192 } else {
193 format!("?{}", query_params.join("&"))
194 };
195
196 let path = format!("prices/{}{}", params.epic, query_string);
197 info!("Getting recent prices for epic: {}", params.epic);
198 let result: HistoricalPricesResponse = self.http_client.get(&path, Some(3)).await?;
199 debug!(
200 "Recent prices obtained for epic: {}, {} data points",
201 params.epic,
202 result.prices.len()
203 );
204 Ok(result)
205 }
206
207 async fn get_historical_prices_by_count_v1(
208 &self,
209 epic: &str,
210 resolution: &str,
211 num_points: i32,
212 ) -> Result<HistoricalPricesResponse, AppError> {
213 let path = format!("prices/{}/{}/{}", epic, resolution, num_points);
214 info!(
215 "Getting historical prices (v1) for epic: {}, resolution: {}, points: {}",
216 epic, resolution, num_points
217 );
218 let result: HistoricalPricesResponse = self.http_client.get(&path, Some(1)).await?;
219 debug!(
220 "Historical prices (v1) obtained for epic: {}, {} data points",
221 epic,
222 result.prices.len()
223 );
224 Ok(result)
225 }
226
227 async fn get_historical_prices_by_count_v2(
228 &self,
229 epic: &str,
230 resolution: &str,
231 num_points: i32,
232 ) -> Result<HistoricalPricesResponse, AppError> {
233 let path = format!("prices/{}/{}/{}", epic, resolution, num_points);
234 info!(
235 "Getting historical prices (v2) for epic: {}, resolution: {}, points: {}",
236 epic, resolution, num_points
237 );
238 let result: HistoricalPricesResponse = self.http_client.get(&path, Some(2)).await?;
239 debug!(
240 "Historical prices (v2) obtained for epic: {}, {} data points",
241 epic,
242 result.prices.len()
243 );
244 Ok(result)
245 }
246
247 async fn get_market_navigation(&self) -> Result<MarketNavigationResponse, AppError> {
248 let path = "marketnavigation";
249 info!("Getting top-level market navigation nodes");
250 let result: MarketNavigationResponse = self.http_client.get(path, Some(1)).await?;
251 debug!("{} navigation nodes found", result.nodes.len());
252 debug!("{} markets found at root level", result.markets.len());
253 Ok(result)
254 }
255
256 async fn get_market_navigation_node(
257 &self,
258 node_id: &str,
259 ) -> Result<MarketNavigationResponse, AppError> {
260 let path = format!("marketnavigation/{}", node_id);
261 info!("Getting market navigation node: {}", node_id);
262 let result: MarketNavigationResponse = self.http_client.get(&path, Some(1)).await?;
263 debug!("{} child nodes found", result.nodes.len());
264 debug!("{} markets found in node {}", result.markets.len(), node_id);
265 Ok(result)
266 }
267
268 async fn get_all_markets(&self) -> Result<Vec<MarketData>, AppError> {
269 let max_depth = 6;
270 info!(
271 "Starting comprehensive market hierarchy traversal (max {} levels)",
272 max_depth
273 );
274
275 let root_response = self.get_market_navigation().await?;
276 info!(
277 "Root navigation: {} nodes, {} markets at top level",
278 root_response.nodes.len(),
279 root_response.markets.len()
280 );
281
282 let mut all_markets = root_response.markets.clone();
283 let mut nodes_to_process = root_response.nodes.clone();
284 let mut processed_levels = 0;
285
286 while !nodes_to_process.is_empty() && processed_levels < max_depth {
287 let mut next_level_nodes = Vec::new();
288 let mut level_market_count = 0;
289
290 info!(
291 "Processing level {} with {} nodes",
292 processed_levels,
293 nodes_to_process.len()
294 );
295
296 for node in &nodes_to_process {
297 match self.get_market_navigation_node(&node.id).await {
298 Ok(node_response) => {
299 let node_markets = node_response.markets.len();
300 let node_children = node_response.nodes.len();
301
302 if node_markets > 0 || node_children > 0 {
303 debug!(
304 "Node '{}' (level {}): {} markets, {} child nodes",
305 node.name, processed_levels, node_markets, node_children
306 );
307 }
308
309 all_markets.extend(node_response.markets);
310 level_market_count += node_markets;
311 next_level_nodes.extend(node_response.nodes);
312 }
313 Err(e) => {
314 tracing::error!(
315 "Failed to get markets for node '{}' at level {}: {:?}",
316 node.name,
317 processed_levels,
318 e
319 );
320 }
321 }
322 }
323
324 info!(
325 "Level {} completed: {} markets found, {} nodes for next level",
326 processed_levels,
327 level_market_count,
328 next_level_nodes.len()
329 );
330
331 nodes_to_process = next_level_nodes;
332 processed_levels += 1;
333 }
334
335 info!(
336 "Market hierarchy traversal completed: {} total markets found across {} levels",
337 all_markets.len(),
338 processed_levels
339 );
340
341 Ok(all_markets)
342 }
343
344 async fn get_vec_db_entries(&self) -> Result<Vec<DBEntryResponse>, AppError> {
345 info!("Getting all markets from hierarchy for DB entries");
346
347 let all_markets = self.get_all_markets().await?;
348 info!("Collected {} markets from hierarchy", all_markets.len());
349
350 let mut vec_db_entries: Vec<DBEntryResponse> = all_markets
351 .iter()
352 .map(DBEntryResponse::from)
353 .filter(|entry| !entry.epic.is_empty())
354 .collect();
355
356 info!("Created {} DB entries from markets", vec_db_entries.len());
357
358 let unique_symbols: std::collections::HashSet<String> = vec_db_entries
360 .iter()
361 .map(|entry| entry.symbol.clone())
362 .filter(|symbol| !symbol.is_empty())
363 .collect();
364
365 info!(
366 "Found {} unique symbols to fetch expiry dates for",
367 unique_symbols.len()
368 );
369
370 let mut symbol_expiry_map: std::collections::HashMap<String, String> =
371 std::collections::HashMap::new();
372
373 for symbol in unique_symbols {
374 if let Some(entry) = vec_db_entries
375 .iter()
376 .find(|e| e.symbol == symbol && !e.epic.is_empty())
377 {
378 match self.get_market_details(&entry.epic).await {
379 Ok(market_details) => {
380 let expiry_date = market_details
381 .instrument
382 .expiry_details
383 .as_ref()
384 .map(|details| details.last_dealing_date.clone())
385 .unwrap_or_else(|| market_details.instrument.expiry.clone());
386
387 symbol_expiry_map.insert(symbol.clone(), expiry_date);
388 info!(
389 "Fetched expiry date for symbol {}: {}",
390 symbol,
391 symbol_expiry_map.get(&symbol).unwrap()
392 );
393 }
394 Err(e) => {
395 tracing::error!(
396 "Failed to get market details for epic {} (symbol {}): {:?}",
397 entry.epic,
398 symbol,
399 e
400 );
401 symbol_expiry_map.insert(symbol.clone(), entry.expiry.clone());
402 }
403 }
404 }
405 }
406
407 for entry in &mut vec_db_entries {
408 if let Some(expiry_date) = symbol_expiry_map.get(&entry.symbol) {
409 entry.expiry = expiry_date.clone();
410 }
411 }
412
413 info!("Updated expiry dates for {} entries", vec_db_entries.len());
414 Ok(vec_db_entries)
415 }
416}
417
418#[async_trait]
419impl AccountService for Client {
420 async fn get_accounts(&self) -> Result<AccountsResponse, AppError> {
421 info!("Getting account information");
422 let result: AccountsResponse = self.http_client.get("accounts", Some(1)).await?;
423 debug!(
424 "Account information obtained: {} accounts",
425 result.accounts.len()
426 );
427 Ok(result)
428 }
429
430 async fn get_positions(&self) -> Result<PositionsResponse, AppError> {
431 debug!("Getting open positions");
432 let result: PositionsResponse = self.http_client.get("positions", Some(2)).await?;
433 debug!("Positions obtained: {} positions", result.positions.len());
434 Ok(result)
435 }
436
437 async fn get_positions_w_filter(&self, filter: &str) -> Result<PositionsResponse, AppError> {
438 debug!("Getting open positions with filter: {}", filter);
439 let mut positions = self.get_positions().await?;
440
441 positions
442 .positions
443 .retain(|position| position.market.epic.contains(filter));
444
445 debug!(
446 "Positions obtained after filtering: {} positions",
447 positions.positions.len()
448 );
449 Ok(positions)
450 }
451
452 async fn get_working_orders(&self) -> Result<WorkingOrdersResponse, AppError> {
453 info!("Getting working orders");
454 let result: WorkingOrdersResponse = self.http_client.get("workingorders", Some(2)).await?;
455 debug!(
456 "Working orders obtained: {} orders",
457 result.working_orders.len()
458 );
459 Ok(result)
460 }
461
462 async fn get_activity(
463 &self,
464 from: &str,
465 to: &str,
466 ) -> Result<AccountActivityResponse, AppError> {
467 let path = format!("history/activity?from={}&to={}&pageSize=500", from, to);
468 info!("Getting account activity");
469 let result: AccountActivityResponse = self.http_client.get(&path, Some(3)).await?;
470 debug!(
471 "Account activity obtained: {} activities",
472 result.activities.len()
473 );
474 Ok(result)
475 }
476
477 async fn get_activity_with_details(
478 &self,
479 from: &str,
480 to: &str,
481 ) -> Result<AccountActivityResponse, AppError> {
482 let path = format!(
483 "history/activity?from={}&to={}&detailed=true&pageSize=500",
484 from, to
485 );
486 info!("Getting detailed account activity");
487 let result: AccountActivityResponse = self.http_client.get(&path, Some(3)).await?;
488 debug!(
489 "Detailed account activity obtained: {} activities",
490 result.activities.len()
491 );
492 Ok(result)
493 }
494
495 async fn get_transactions(
496 &self,
497 from: &str,
498 to: &str,
499 ) -> Result<TransactionHistoryResponse, AppError> {
500 const PAGE_SIZE: u32 = 200;
501 let mut all_transactions = Vec::new();
502 let mut current_page = 1;
503 #[allow(unused_assignments)]
504 let mut last_metadata = None;
505
506 loop {
507 let path = format!(
508 "history/transactions?from={}&to={}&pageSize={}&pageNumber={}",
509 from, to, PAGE_SIZE, current_page
510 );
511 info!("Getting transaction history page {}", current_page);
512
513 let result: TransactionHistoryResponse = self.http_client.get(&path, Some(2)).await?;
514
515 let total_pages = result.metadata.page_data.total_pages as u32;
516 last_metadata = Some(result.metadata);
517 all_transactions.extend(result.transactions);
518
519 if current_page >= total_pages {
520 break;
521 }
522 current_page += 1;
523 }
524
525 debug!(
526 "Total transaction history obtained: {} transactions",
527 all_transactions.len()
528 );
529
530 Ok(TransactionHistoryResponse {
531 transactions: all_transactions,
532 metadata: last_metadata
533 .ok_or_else(|| AppError::InvalidInput("Could not retrieve metadata".to_string()))?,
534 })
535 }
536}
537
538#[async_trait]
539impl OrderService for Client {
540 async fn create_order(
541 &self,
542 order: &CreateOrderRequest,
543 ) -> Result<CreateOrderResponse, AppError> {
544 info!("Creating order for: {}", order.epic);
545 let result: CreateOrderResponse = self
546 .http_client
547 .post("positions/otc", order, Some(2))
548 .await?;
549 debug!("Order created with reference: {}", result.deal_reference);
550 Ok(result)
551 }
552
553 async fn get_order_confirmation(
554 &self,
555 deal_reference: &str,
556 ) -> Result<OrderConfirmationResponse, AppError> {
557 let path = format!("confirms/{}", deal_reference);
558 info!("Getting confirmation for order: {}", deal_reference);
559 let result: OrderConfirmationResponse = self.http_client.get(&path, Some(1)).await?;
560 debug!("Confirmation obtained for order: {}", deal_reference);
561 Ok(result)
562 }
563
564 async fn get_order_confirmation_w_retry(
565 &self,
566 deal_reference: &str,
567 retries: u64,
568 delay_ms: u64,
569 ) -> Result<OrderConfirmationResponse, AppError> {
570 let mut attempts = 0;
571 loop {
572 match self.get_order_confirmation(deal_reference).await {
573 Ok(response) => return Ok(response),
574 Err(e) => {
575 attempts += 1;
576 if attempts > retries {
577 return Err(e);
578 }
579 warn!(
580 "Failed to get order confirmation (attempt {}/{}): {}. Retrying in {} ms...",
581 attempts, retries, e, delay_ms
582 );
583 sleep(Duration::from_millis(delay_ms)).await;
584 }
585 }
586 }
587 }
588
589 async fn update_position(
590 &self,
591 deal_id: &str,
592 update: &UpdatePositionRequest,
593 ) -> Result<UpdatePositionResponse, AppError> {
594 let path = format!("positions/otc/{}", deal_id);
595 info!("Updating position: {}", deal_id);
596 let result: UpdatePositionResponse = self.http_client.put(&path, update, Some(2)).await?;
597 debug!(
598 "Position updated: {} with deal reference: {}",
599 deal_id, result.deal_reference
600 );
601 Ok(result)
602 }
603
604 async fn update_level_in_position(
605 &self,
606 deal_id: &str,
607 limit_level: Option<f64>,
608 ) -> Result<UpdatePositionResponse, AppError> {
609 let path = format!("positions/otc/{}", deal_id);
610 info!("Updating position: {}", deal_id);
611 let limit_level = limit_level.unwrap_or(0.0);
612
613 let update: UpdatePositionRequest = UpdatePositionRequest {
614 guaranteed_stop: None,
615 limit_level: Some(limit_level),
616 stop_level: None,
617 trailing_stop: None,
618 trailing_stop_distance: None,
619 trailing_stop_increment: None,
620 };
621 let result: UpdatePositionResponse = self.http_client.put(&path, update, Some(2)).await?;
622 debug!(
623 "Position updated: {} with deal reference: {}",
624 deal_id, result.deal_reference
625 );
626 Ok(result)
627 }
628
629 async fn close_position(
630 &self,
631 close_request: &ClosePositionRequest,
632 ) -> Result<ClosePositionResponse, AppError> {
633 info!("Closing position");
634
635 let result: ClosePositionResponse = self
638 .http_client
639 .post_with_delete_method("positions/otc", close_request, Some(1))
640 .await?;
641
642 debug!("Position closed with reference: {}", result.deal_reference);
643 Ok(result)
644 }
645
646 async fn create_working_order(
647 &self,
648 order: &CreateWorkingOrderRequest,
649 ) -> Result<CreateWorkingOrderResponse, AppError> {
650 info!("Creating working order for: {}", order.epic);
651 let result: CreateWorkingOrderResponse = self
652 .http_client
653 .post("workingorders/otc", order, Some(2))
654 .await?;
655 debug!(
656 "Working order created with reference: {}",
657 result.deal_reference
658 );
659 Ok(result)
660 }
661
662 async fn delete_working_order(&self, deal_id: &str) -> Result<(), AppError> {
663 let path = format!("workingorders/otc/{}", deal_id);
664 let result: CreateWorkingOrderResponse =
665 self.http_client.delete(path.as_str(), Some(2)).await?;
666 debug!(
667 "Working order created with reference: {}",
668 result.deal_reference
669 );
670 Ok(())
671 }
672}
673
/// Streaming client holding two Lightstreamer connections: one shared by the
/// market, trade, account and chart subscriptions, and one for price
/// subscriptions.
pub struct StreamerClient {
    /// Account identifier used to build the `TRADE:`, `ACCOUNT:` and `PRICE:` item names.
    account_id: String,
    /// Connection used by the market, trade, account and chart subscriptions.
    market_streamer_client: Option<Arc<Mutex<LightstreamerClient>>>,
    /// Connection used by the price subscriptions.
    price_streamer_client: Option<Arc<Mutex<LightstreamerClient>>>,
    /// Set by the market, trade, account and chart subscribe methods;
    /// `connect` skips the market connection while this is `false`.
    has_market_stream_subs: bool,
    /// Set by `price_subscribe`; `connect` skips the price connection while this is `false`.
    has_price_stream_subs: bool,
}
691
692impl StreamerClient {
693 pub async fn new() -> Result<Self, AppError> {
702 let http_client_raw = Arc::new(RwLock::new(Client::new()));
703 let http_client = http_client_raw.read().await;
704 let ws_info = http_client.get_ws_info().await;
705 let password = ws_info.get_ws_password();
706
707 let market_streamer_client = Arc::new(Mutex::new(LightstreamerClient::new(
709 Some(ws_info.server.as_str()),
710 None,
711 Some(&ws_info.account_id),
712 Some(&password),
713 )?));
714
715 let price_streamer_client = Arc::new(Mutex::new(LightstreamerClient::new(
716 Some(ws_info.server.as_str()),
717 None,
718 Some(&ws_info.account_id),
719 Some(&password),
720 )?));
721
722 {
724 let mut client = market_streamer_client.lock().await;
725 client
726 .connection_options
727 .set_forced_transport(Some(Transport::WsStreaming));
728 }
729 {
730 let mut client = price_streamer_client.lock().await;
731 client
732 .connection_options
733 .set_forced_transport(Some(Transport::WsStreaming));
734 }
735
736 Ok(Self {
737 account_id: ws_info.account_id.clone(),
738 market_streamer_client: Some(market_streamer_client),
739 price_streamer_client: Some(price_streamer_client),
740 has_market_stream_subs: false,
741 has_price_stream_subs: false,
742 })
743 }
744
745 pub async fn default() -> Result<Self, AppError> {
747 Self::new().await
748 }
749
750 pub async fn market_subscribe(
780 &mut self,
781 epics: Vec<String>,
782 fields: HashSet<StreamingMarketField>,
783 ) -> Result<mpsc::UnboundedReceiver<PriceData>, AppError> {
784 self.has_market_stream_subs = true;
786
787 let fields = get_streaming_market_fields(&fields);
788 let market_epics: Vec<String> = epics
789 .iter()
790 .map(|epic| "MARKET:".to_string() + epic)
791 .collect();
792 let mut subscription =
793 Subscription::new(SubscriptionMode::Merge, Some(market_epics), Some(fields))?;
794
795 subscription.set_data_adapter(None)?;
796 subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
797
798 let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
800 subscription.add_listener(Box::new(listener));
801
802 let client = self.market_streamer_client.as_ref().ok_or_else(|| {
804 AppError::WebSocketError("market streamer client not initialized".to_string())
805 })?;
806
807 {
808 let mut client = client.lock().await;
809 client
810 .connection_options
811 .set_forced_transport(Some(Transport::WsStreaming));
812 LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription).await;
813 }
814
815 let (price_tx, price_rx) = mpsc::unbounded_channel();
817 tokio::spawn(async move {
818 let mut receiver = item_receiver;
819 while let Some(item_update) = receiver.recv().await {
820 let price_data = PriceData::from(&item_update);
821 let _ = price_tx.send(price_data);
822 }
823 });
824
825 info!(
826 "Market subscription created for {} instruments",
827 epics.len()
828 );
829 Ok(price_rx)
830 }
831
832 pub async fn trade_subscribe(
855 &mut self,
856 ) -> Result<mpsc::UnboundedReceiver<TradeFields>, AppError> {
857 self.has_market_stream_subs = true;
859
860 let account_id = self.account_id.clone();
861 let fields = Some(vec![
862 "CONFIRMS".to_string(),
863 "OPU".to_string(),
864 "WOU".to_string(),
865 ]);
866 let trade_items = vec![format!("TRADE:{account_id}")];
867
868 let mut subscription =
869 Subscription::new(SubscriptionMode::Distinct, Some(trade_items), fields)?;
870
871 subscription.set_data_adapter(None)?;
872 subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
873
874 let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
876 subscription.add_listener(Box::new(listener));
877
878 let client = self.market_streamer_client.as_ref().ok_or_else(|| {
880 AppError::WebSocketError("market streamer client not initialized".to_string())
881 })?;
882
883 {
884 let mut client = client.lock().await;
885 client
886 .connection_options
887 .set_forced_transport(Some(Transport::WsStreaming));
888 LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription).await;
889 }
890
891 let (trade_tx, trade_rx) = mpsc::unbounded_channel();
893 tokio::spawn(async move {
894 let mut receiver = item_receiver;
895 while let Some(item_update) = receiver.recv().await {
896 let trade_data = crate::presentation::trade::TradeData::from(&item_update);
897 let _ = trade_tx.send(trade_data.fields);
898 }
899 });
900
901 info!("Trade subscription created for account: {}", account_id);
902 Ok(trade_rx)
903 }
904
905 pub async fn account_subscribe(
932 &mut self,
933 fields: HashSet<StreamingAccountDataField>,
934 ) -> Result<mpsc::UnboundedReceiver<AccountFields>, AppError> {
935 self.has_market_stream_subs = true;
937
938 let fields = get_streaming_account_data_fields(&fields);
939 let account_id = self.account_id.clone();
940 let account_items = vec![format!("ACCOUNT:{account_id}")];
941
942 let mut subscription =
943 Subscription::new(SubscriptionMode::Merge, Some(account_items), Some(fields))?;
944
945 subscription.set_data_adapter(None)?;
946 subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
947
948 let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
950 subscription.add_listener(Box::new(listener));
951
952 let client = self.market_streamer_client.as_ref().ok_or_else(|| {
954 AppError::WebSocketError("market streamer client not initialized".to_string())
955 })?;
956
957 {
958 let mut client = client.lock().await;
959 client
960 .connection_options
961 .set_forced_transport(Some(Transport::WsStreaming));
962 LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription).await;
963 }
964
965 let (account_tx, account_rx) = mpsc::unbounded_channel();
967 tokio::spawn(async move {
968 let mut receiver = item_receiver;
969 while let Some(item_update) = receiver.recv().await {
970 let account_data = crate::presentation::account::AccountData::from(&item_update);
971 let _ = account_tx.send(account_data.fields);
972 }
973 });
974
975 info!("Account subscription created for account: {}", account_id);
976 Ok(account_rx)
977 }
978
979 pub async fn price_subscribe(
1010 &mut self,
1011 epics: Vec<String>,
1012 fields: HashSet<StreamingPriceField>,
1013 ) -> Result<mpsc::UnboundedReceiver<PriceData>, AppError> {
1014 self.has_price_stream_subs = true;
1016
1017 let fields = get_streaming_price_fields(&fields);
1018 let account_id = self.account_id.clone();
1019 let price_epics: Vec<String> = epics
1020 .iter()
1021 .map(|epic| format!("PRICE:{account_id}:{epic}"))
1022 .collect();
1023
1024 tracing::debug!("Pricing subscribe items: {:?}", price_epics);
1026 tracing::debug!("Pricing subscribe fields: {:?}", fields);
1027
1028 let mut subscription =
1029 Subscription::new(SubscriptionMode::Merge, Some(price_epics), Some(fields))?;
1030
1031 let pricing_adapter =
1033 std::env::var("IG_PRICING_ADAPTER").unwrap_or_else(|_| "Pricing".to_string());
1034 tracing::debug!("Using Pricing data adapter: {}", pricing_adapter);
1035 subscription.set_data_adapter(Some(pricing_adapter))?;
1036 subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
1037
1038 let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
1040 subscription.add_listener(Box::new(listener));
1041
1042 let client = self.price_streamer_client.as_ref().ok_or_else(|| {
1044 AppError::WebSocketError("price streamer client not initialized".to_string())
1045 })?;
1046
1047 {
1048 let mut client = client.lock().await;
1049 client
1050 .connection_options
1051 .set_forced_transport(Some(Transport::WsStreaming));
1052 LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription).await;
1053 }
1054
1055 let (price_tx, price_rx) = mpsc::unbounded_channel();
1057 tokio::spawn(async move {
1058 let mut receiver = item_receiver;
1059 while let Some(item_update) = receiver.recv().await {
1060 let price_data = PriceData::from(&item_update);
1061 let _ = price_tx.send(price_data);
1062 }
1063 });
1064
1065 info!(
1066 "Price subscription created for {} instruments (account: {})",
1067 epics.len(),
1068 account_id
1069 );
1070 Ok(price_rx)
1071 }
1072
1073 pub async fn chart_subscribe(
1106 &mut self,
1107 epics: Vec<String>,
1108 scale: ChartScale,
1109 fields: HashSet<StreamingChartField>,
1110 ) -> Result<mpsc::UnboundedReceiver<ChartData>, AppError> {
1111 self.has_market_stream_subs = true;
1113
1114 let fields = get_streaming_chart_fields(&fields);
1115
1116 let chart_items: Vec<String> = epics
1117 .iter()
1118 .map(|epic| format!("CHART:{epic}:{scale}",))
1119 .collect();
1120
1121 let mode = if matches!(scale, ChartScale::Tick) {
1123 SubscriptionMode::Distinct
1124 } else {
1125 SubscriptionMode::Merge
1126 };
1127
1128 let mut subscription = Subscription::new(mode, Some(chart_items), Some(fields))?;
1129
1130 subscription.set_data_adapter(None)?;
1131 subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
1132
1133 let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
1135 subscription.add_listener(Box::new(listener));
1136
1137 let client = self.market_streamer_client.as_ref().ok_or_else(|| {
1139 AppError::WebSocketError("market streamer client not initialized".to_string())
1140 })?;
1141
1142 {
1143 let mut client = client.lock().await;
1144 client
1145 .connection_options
1146 .set_forced_transport(Some(Transport::WsStreaming));
1147 LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription).await;
1148 }
1149
1150 let (chart_tx, chart_rx) = mpsc::unbounded_channel();
1152 tokio::spawn(async move {
1153 let mut receiver = item_receiver;
1154 while let Some(item_update) = receiver.recv().await {
1155 let chart_data = ChartData::from(&item_update);
1156 let _ = chart_tx.send(chart_data);
1157 }
1158 });
1159
1160 info!(
1161 "Chart subscription created for {} instruments (scale: {})",
1162 epics.len(),
1163 scale
1164 );
1165
1166 Ok(chart_rx)
1167 }
1168
1169 pub async fn connect(&mut self, shutdown_signal: Option<Arc<Notify>>) -> Result<(), AppError> {
1186 let signal = if let Some(sig) = shutdown_signal {
1188 sig
1189 } else {
1190 let sig = Arc::new(Notify::new());
1191 setup_signal_hook(Arc::clone(&sig)).await;
1192 sig
1193 };
1194
1195 let mut tasks = Vec::new();
1196
1197 if self.has_market_stream_subs {
1199 if let Some(client) = self.market_streamer_client.as_ref() {
1200 let client = Arc::clone(client);
1201 let signal = Arc::clone(&signal);
1202 let task =
1203 tokio::spawn(
1204 async move { Self::connect_client(client, signal, "Market").await },
1205 );
1206 tasks.push(task);
1207 }
1208 } else {
1209 info!("Skipping Market streamer connection: no active subscriptions");
1210 }
1211
1212 if self.has_price_stream_subs {
1214 if let Some(client) = self.price_streamer_client.as_ref() {
1215 let client = Arc::clone(client);
1216 let signal = Arc::clone(&signal);
1217 let task =
1218 tokio::spawn(
1219 async move { Self::connect_client(client, signal, "Price").await },
1220 );
1221 tasks.push(task);
1222 }
1223 } else {
1224 info!("Skipping Price streamer connection: no active subscriptions");
1225 }
1226
1227 if tasks.is_empty() {
1228 warn!("No streaming clients selected for connection (no active subscriptions)");
1229 return Ok(());
1230 }
1231
1232 info!("Connecting {} streaming client(s)...", tasks.len());
1233
1234 let results = futures::future::join_all(tasks).await;
1236
1237 let mut has_error = false;
1239 for (idx, result) in results.iter().enumerate() {
1240 match result {
1241 Ok(Ok(_)) => {
1242 debug!("Streaming client {} completed successfully", idx);
1243 }
1244 Ok(Err(e)) => {
1245 error!("Streaming client {} failed: {:?}", idx, e);
1246 has_error = true;
1247 }
1248 Err(e) => {
1249 error!("Streaming client {} task panicked: {:?}", idx, e);
1250 has_error = true;
1251 }
1252 }
1253 }
1254
1255 if has_error {
1256 return Err(AppError::WebSocketError(
1257 "one or more streaming connections failed".to_string(),
1258 ));
1259 }
1260
1261 info!("All streaming connections closed gracefully");
1262 Ok(())
1263 }
1264
1265 async fn connect_client(
1267 client: Arc<Mutex<LightstreamerClient>>,
1268 signal: Arc<Notify>,
1269 client_type: &str,
1270 ) -> Result<(), AppError> {
1271 let mut retry_interval_millis: u64 = 0;
1272 let mut retry_counter: u64 = 0;
1273
1274 while retry_counter < MAX_CONNECTION_ATTEMPTS {
1275 let connect_result = {
1276 let mut client = client.lock().await;
1277 client.connect_direct(Arc::clone(&signal)).await
1278 };
1279
1280 let result_with_string_error = connect_result.map_err(|e| format!("{:?}", e));
1282
1283 match result_with_string_error {
1284 Ok(_) => {
1285 info!("{} streamer connected successfully", client_type);
1286 break;
1287 }
1288 Err(error_msg) => {
1289 if error_msg.contains("No more requests to fulfill") {
1291 info!(
1292 "{} streamer closed gracefully: no active subscriptions (server reason: No more requests to fulfill)",
1293 client_type
1294 );
1295 return Ok(());
1296 }
1297
1298 error!("{} streamer connection failed: {}", client_type, error_msg);
1299
1300 if retry_counter < MAX_CONNECTION_ATTEMPTS - 1 {
1301 tokio::time::sleep(std::time::Duration::from_millis(retry_interval_millis))
1302 .await;
1303 retry_interval_millis =
1304 (retry_interval_millis + (200 * retry_counter)).min(5000);
1305 retry_counter += 1;
1306 warn!(
1307 "{} streamer retrying (attempt {}/{}) in {:.2} seconds...",
1308 client_type,
1309 retry_counter + 1,
1310 MAX_CONNECTION_ATTEMPTS,
1311 retry_interval_millis as f64 / 1000.0
1312 );
1313 } else {
1314 retry_counter += 1;
1315 }
1316 }
1317 }
1318 }
1319
1320 if retry_counter >= MAX_CONNECTION_ATTEMPTS {
1321 error!(
1322 "{} streamer failed after {} attempts",
1323 client_type, MAX_CONNECTION_ATTEMPTS
1324 );
1325 return Err(AppError::WebSocketError(format!(
1326 "{} streamer: maximum connection attempts ({}) exceeded",
1327 client_type, MAX_CONNECTION_ATTEMPTS
1328 )));
1329 }
1330
1331 info!("{} streamer connection closed gracefully", client_type);
1332 Ok(())
1333 }
1334
1335 pub async fn disconnect(&mut self) -> Result<(), AppError> {
1343 let mut disconnected = 0;
1344
1345 if let Some(client) = self.market_streamer_client.as_ref() {
1346 let mut client = client.lock().await;
1347 client.disconnect().await;
1348 info!("Market streamer disconnected");
1349 disconnected += 1;
1350 }
1351
1352 if let Some(client) = self.price_streamer_client.as_ref() {
1353 let mut client = client.lock().await;
1354 client.disconnect().await;
1355 info!("Price streamer disconnected");
1356 disconnected += 1;
1357 }
1358
1359 info!("Disconnected {} streaming client(s)", disconnected);
1360 Ok(())
1361 }
1362}