1use crate::application::auth::WebsocketInfo;
7use crate::application::interfaces::account::AccountService;
8use crate::application::interfaces::costs::CostsService;
9use crate::application::interfaces::market::MarketService;
10use crate::application::interfaces::operations::OperationsService;
11use crate::application::interfaces::order::OrderService;
12use crate::application::interfaces::sentiment::SentimentService;
13use crate::application::interfaces::watchlist::WatchlistService;
14use crate::error::AppError;
15use crate::model::http::HttpClient;
16use crate::model::requests::RecentPricesRequest;
17use crate::model::requests::{
18 AddToWatchlistRequest, CloseCostsRequest, CreateWatchlistRequest, EditCostsRequest,
19 OpenCostsRequest, UpdateWorkingOrderRequest,
20};
21use crate::model::requests::{
22 ClosePositionRequest, CreateOrderRequest, CreateWorkingOrderRequest, UpdatePositionRequest,
23};
24use crate::model::responses::{
25 AccountPreferencesResponse, ApplicationDetailsResponse, CategoriesResponse,
26 CategoryInstrumentsResponse, ClientSentimentResponse, CostsHistoryResponse,
27 CreateWatchlistResponse, DBEntryResponse, DurableMediumResponse, HistoricalPricesResponse,
28 IndicativeCostsResponse, MarketNavigationResponse, MarketSearchResponse, MarketSentiment,
29 MultipleMarketDetailsResponse, SinglePositionResponse, StatusResponse,
30 WatchlistMarketsResponse, WatchlistsResponse,
31};
32use crate::model::responses::{
33 ClosePositionResponse, CreateOrderResponse, CreateWorkingOrderResponse, UpdatePositionResponse,
34};
35use crate::model::streaming::{
36 StreamingAccountDataField, StreamingChartField, StreamingMarketField, StreamingPriceField,
37 get_streaming_account_data_fields, get_streaming_chart_fields, get_streaming_market_fields,
38 get_streaming_price_fields,
39};
40use crate::prelude::{
41 AccountActivityResponse, AccountFields, AccountsResponse, ChartData, ChartScale,
42 OrderConfirmationResponse, PositionsResponse, TradeFields, TransactionHistoryResponse,
43 WorkingOrdersResponse,
44};
45use crate::presentation::market::{MarketData, MarketDetails};
46use crate::presentation::price::PriceData;
47use async_trait::async_trait;
48use lightstreamer_rs::client::{LightstreamerClient, LogType, Transport};
49use lightstreamer_rs::subscription::{
50 ChannelSubscriptionListener, Snapshot, Subscription, SubscriptionMode,
51};
52use lightstreamer_rs::utils::setup_signal_hook;
53use serde_json::Value;
54use std::collections::HashSet;
55use std::sync::Arc;
56use std::time::Duration;
57use tokio::sync::{Mutex, Notify, RwLock, mpsc};
58use tokio::time::sleep;
59use tracing::{debug, error, info, warn};
60
61const MAX_CONNECTION_ATTEMPTS: u64 = 3;
62
63pub struct Client {
68 http_client: Arc<HttpClient>,
69}
70
71impl Client {
72 pub fn new() -> Self {
77 let http_client = Arc::new(HttpClient::default());
78 Self { http_client }
79 }
80
81 pub async fn get_ws_info(&self) -> WebsocketInfo {
86 self.http_client.get_ws_info().await
87 }
88}
89
90impl Default for Client {
91 fn default() -> Self {
92 Self::new()
93 }
94}
95
96#[async_trait]
97impl MarketService for Client {
98 async fn search_markets(&self, search_term: &str) -> Result<MarketSearchResponse, AppError> {
99 let path = format!("markets?searchTerm={}", search_term);
100 info!("Searching markets with term: {}", search_term);
101 let result: MarketSearchResponse = self.http_client.get(&path, Some(1)).await?;
102 debug!("{} markets found", result.markets.len());
103 Ok(result)
104 }
105
106 async fn get_market_details(&self, epic: &str) -> Result<MarketDetails, AppError> {
107 let path = format!("markets/{epic}");
108 info!("Getting market details: {}", epic);
109 let market_value: Value = self.http_client.get(&path, Some(3)).await?;
110 let market_details: MarketDetails = serde_json::from_value(market_value)?;
111 debug!("Market details obtained for: {}", epic);
112 Ok(market_details)
113 }
114
115 async fn get_multiple_market_details(
116 &self,
117 epics: &[String],
118 ) -> Result<MultipleMarketDetailsResponse, AppError> {
119 if epics.is_empty() {
120 return Ok(MultipleMarketDetailsResponse::default());
121 } else if epics.len() > 50 {
122 return Err(AppError::InvalidInput(
123 "The maximum number of EPICs is 50".to_string(),
124 ));
125 }
126
127 let epics_str = epics.join(",");
128 let path = format!("markets?epics={}", epics_str);
129 debug!(
130 "Getting market details for {} EPICs in a batch",
131 epics.len()
132 );
133
134 let response: MultipleMarketDetailsResponse = self.http_client.get(&path, Some(2)).await?;
135
136 Ok(response)
137 }
138
139 async fn get_historical_prices(
140 &self,
141 epic: &str,
142 resolution: &str,
143 from: &str,
144 to: &str,
145 ) -> Result<HistoricalPricesResponse, AppError> {
146 let path = format!(
147 "prices/{}?resolution={}&from={}&to={}",
148 epic, resolution, from, to
149 );
150 info!("Getting historical prices for: {}", epic);
151 let result: HistoricalPricesResponse = self.http_client.get(&path, Some(3)).await?;
152 debug!("Historical prices obtained for: {}", epic);
153 Ok(result)
154 }
155
156 async fn get_historical_prices_by_date_range(
157 &self,
158 epic: &str,
159 resolution: &str,
160 start_date: &str,
161 end_date: &str,
162 ) -> Result<HistoricalPricesResponse, AppError> {
163 let path = format!("prices/{}/{}/{}/{}", epic, resolution, start_date, end_date);
164 info!(
165 "Getting historical prices for epic: {}, resolution: {}, from: {} to: {}",
166 epic, resolution, start_date, end_date
167 );
168 let result: HistoricalPricesResponse = self.http_client.get(&path, Some(2)).await?;
169 debug!(
170 "Historical prices obtained for epic: {}, {} data points",
171 epic,
172 result.prices.len()
173 );
174 Ok(result)
175 }
176
177 async fn get_recent_prices(
178 &self,
179 params: &RecentPricesRequest<'_>,
180 ) -> Result<HistoricalPricesResponse, AppError> {
181 let mut query_params = Vec::new();
182
183 if let Some(res) = params.resolution {
184 query_params.push(format!("resolution={}", res));
185 }
186 if let Some(f) = params.from {
187 query_params.push(format!("from={}", f));
188 }
189 if let Some(t) = params.to {
190 query_params.push(format!("to={}", t));
191 }
192 if let Some(max) = params.max_points {
193 query_params.push(format!("max={}", max));
194 }
195 if let Some(size) = params.page_size {
196 query_params.push(format!("pageSize={}", size));
197 }
198 if let Some(num) = params.page_number {
199 query_params.push(format!("pageNumber={}", num));
200 }
201
202 let query_string = if query_params.is_empty() {
203 String::new()
204 } else {
205 format!("?{}", query_params.join("&"))
206 };
207
208 let path = format!("prices/{}{}", params.epic, query_string);
209 info!("Getting recent prices for epic: {}", params.epic);
210 let result: HistoricalPricesResponse = self.http_client.get(&path, Some(3)).await?;
211 debug!(
212 "Recent prices obtained for epic: {}, {} data points",
213 params.epic,
214 result.prices.len()
215 );
216 Ok(result)
217 }
218
219 async fn get_historical_prices_by_count_v1(
220 &self,
221 epic: &str,
222 resolution: &str,
223 num_points: i32,
224 ) -> Result<HistoricalPricesResponse, AppError> {
225 let path = format!("prices/{}/{}/{}", epic, resolution, num_points);
226 info!(
227 "Getting historical prices (v1) for epic: {}, resolution: {}, points: {}",
228 epic, resolution, num_points
229 );
230 let result: HistoricalPricesResponse = self.http_client.get(&path, Some(1)).await?;
231 debug!(
232 "Historical prices (v1) obtained for epic: {}, {} data points",
233 epic,
234 result.prices.len()
235 );
236 Ok(result)
237 }
238
239 async fn get_historical_prices_by_count_v2(
240 &self,
241 epic: &str,
242 resolution: &str,
243 num_points: i32,
244 ) -> Result<HistoricalPricesResponse, AppError> {
245 let path = format!("prices/{}/{}/{}", epic, resolution, num_points);
246 info!(
247 "Getting historical prices (v2) for epic: {}, resolution: {}, points: {}",
248 epic, resolution, num_points
249 );
250 let result: HistoricalPricesResponse = self.http_client.get(&path, Some(2)).await?;
251 debug!(
252 "Historical prices (v2) obtained for epic: {}, {} data points",
253 epic,
254 result.prices.len()
255 );
256 Ok(result)
257 }
258
259 async fn get_market_navigation(&self) -> Result<MarketNavigationResponse, AppError> {
260 let path = "marketnavigation";
261 info!("Getting top-level market navigation nodes");
262 let result: MarketNavigationResponse = self.http_client.get(path, Some(1)).await?;
263 debug!("{} navigation nodes found", result.nodes.len());
264 debug!("{} markets found at root level", result.markets.len());
265 Ok(result)
266 }
267
268 async fn get_market_navigation_node(
269 &self,
270 node_id: &str,
271 ) -> Result<MarketNavigationResponse, AppError> {
272 let path = format!("marketnavigation/{}", node_id);
273 info!("Getting market navigation node: {}", node_id);
274 let result: MarketNavigationResponse = self.http_client.get(&path, Some(1)).await?;
275 debug!("{} child nodes found", result.nodes.len());
276 debug!("{} markets found in node {}", result.markets.len(), node_id);
277 Ok(result)
278 }
279
280 async fn get_all_markets(&self) -> Result<Vec<MarketData>, AppError> {
281 let max_depth = 6;
282 info!(
283 "Starting comprehensive market hierarchy traversal (max {} levels)",
284 max_depth
285 );
286
287 let root_response = self.get_market_navigation().await?;
288 info!(
289 "Root navigation: {} nodes, {} markets at top level",
290 root_response.nodes.len(),
291 root_response.markets.len()
292 );
293
294 let mut all_markets = root_response.markets.clone();
295 let mut nodes_to_process = root_response.nodes.clone();
296 let mut processed_levels = 0;
297
298 while !nodes_to_process.is_empty() && processed_levels < max_depth {
299 let mut next_level_nodes = Vec::new();
300 let mut level_market_count = 0;
301
302 info!(
303 "Processing level {} with {} nodes",
304 processed_levels,
305 nodes_to_process.len()
306 );
307
308 for node in &nodes_to_process {
309 match self.get_market_navigation_node(&node.id).await {
310 Ok(node_response) => {
311 let node_markets = node_response.markets.len();
312 let node_children = node_response.nodes.len();
313
314 if node_markets > 0 || node_children > 0 {
315 debug!(
316 "Node '{}' (level {}): {} markets, {} child nodes",
317 node.name, processed_levels, node_markets, node_children
318 );
319 }
320
321 all_markets.extend(node_response.markets);
322 level_market_count += node_markets;
323 next_level_nodes.extend(node_response.nodes);
324 }
325 Err(e) => {
326 tracing::error!(
327 "Failed to get markets for node '{}' at level {}: {:?}",
328 node.name,
329 processed_levels,
330 e
331 );
332 }
333 }
334 }
335
336 info!(
337 "Level {} completed: {} markets found, {} nodes for next level",
338 processed_levels,
339 level_market_count,
340 next_level_nodes.len()
341 );
342
343 nodes_to_process = next_level_nodes;
344 processed_levels += 1;
345 }
346
347 info!(
348 "Market hierarchy traversal completed: {} total markets found across {} levels",
349 all_markets.len(),
350 processed_levels
351 );
352
353 Ok(all_markets)
354 }
355
356 async fn get_vec_db_entries(&self) -> Result<Vec<DBEntryResponse>, AppError> {
357 info!("Getting all markets from hierarchy for DB entries");
358
359 let all_markets = self.get_all_markets().await?;
360 info!("Collected {} markets from hierarchy", all_markets.len());
361
362 let mut vec_db_entries: Vec<DBEntryResponse> = all_markets
363 .iter()
364 .map(DBEntryResponse::from)
365 .filter(|entry| !entry.epic.is_empty())
366 .collect();
367
368 info!("Created {} DB entries from markets", vec_db_entries.len());
369
370 let unique_symbols: std::collections::HashSet<String> = vec_db_entries
372 .iter()
373 .map(|entry| entry.symbol.clone())
374 .filter(|symbol| !symbol.is_empty())
375 .collect();
376
377 info!(
378 "Found {} unique symbols to fetch expiry dates for",
379 unique_symbols.len()
380 );
381
382 let mut symbol_expiry_map: std::collections::HashMap<String, String> =
383 std::collections::HashMap::new();
384
385 for symbol in unique_symbols {
386 if let Some(entry) = vec_db_entries
387 .iter()
388 .find(|e| e.symbol == symbol && !e.epic.is_empty())
389 {
390 match self.get_market_details(&entry.epic).await {
391 Ok(market_details) => {
392 let expiry_date = market_details
393 .instrument
394 .expiry_details
395 .as_ref()
396 .map(|details| details.last_dealing_date.clone())
397 .unwrap_or_else(|| market_details.instrument.expiry.clone());
398
399 symbol_expiry_map.insert(symbol.clone(), expiry_date);
400 if let Some(expiry) = symbol_expiry_map.get(&symbol) {
401 info!("Fetched expiry date for symbol {}: {}", symbol, expiry);
402 }
403 }
404 Err(e) => {
405 tracing::error!(
406 "Failed to get market details for epic {} (symbol {}): {:?}",
407 entry.epic,
408 symbol,
409 e
410 );
411 symbol_expiry_map.insert(symbol.clone(), entry.expiry.clone());
412 }
413 }
414 }
415 }
416
417 for entry in &mut vec_db_entries {
418 if let Some(expiry_date) = symbol_expiry_map.get(&entry.symbol) {
419 entry.expiry = expiry_date.clone();
420 }
421 }
422
423 info!("Updated expiry dates for {} entries", vec_db_entries.len());
424 Ok(vec_db_entries)
425 }
426
427 async fn get_categories(&self) -> Result<CategoriesResponse, AppError> {
428 info!("Getting all categories of instruments");
429 let result: CategoriesResponse = self.http_client.get("categories", Some(1)).await?;
430 debug!("{} categories found", result.categories.len());
431 Ok(result)
432 }
433
434 async fn get_category_instruments(
435 &self,
436 category_id: &str,
437 page_number: Option<i32>,
438 page_size: Option<i32>,
439 ) -> Result<CategoryInstrumentsResponse, AppError> {
440 let mut path = format!("categories/{}/instruments", category_id);
441
442 let mut query_params = Vec::new();
443 if let Some(page) = page_number {
444 query_params.push(format!("pageNumber={}", page));
445 }
446 if let Some(size) = page_size {
447 if size > 1000 {
448 return Err(AppError::InvalidInput(
449 "pageSize cannot exceed 1000".to_string(),
450 ));
451 }
452 query_params.push(format!("pageSize={}", size));
453 }
454
455 if !query_params.is_empty() {
456 path = format!("{}?{}", path, query_params.join("&"));
457 }
458
459 info!(
460 "Getting instruments for category: {} (page: {:?}, size: {:?})",
461 category_id, page_number, page_size
462 );
463 let result: CategoryInstrumentsResponse = self.http_client.get(&path, Some(1)).await?;
464 debug!(
465 "{} instruments found in category {}",
466 result.instruments.len(),
467 category_id
468 );
469 Ok(result)
470 }
471}
472
473#[async_trait]
474impl AccountService for Client {
475 async fn get_accounts(&self) -> Result<AccountsResponse, AppError> {
476 info!("Getting account information");
477 let result: AccountsResponse = self.http_client.get("accounts", Some(1)).await?;
478 debug!(
479 "Account information obtained: {} accounts",
480 result.accounts.len()
481 );
482 Ok(result)
483 }
484
485 async fn get_positions(&self) -> Result<PositionsResponse, AppError> {
486 debug!("Getting open positions");
487 let result: PositionsResponse = self.http_client.get("positions", Some(2)).await?;
488 debug!("Positions obtained: {} positions", result.positions.len());
489 Ok(result)
490 }
491
492 async fn get_positions_w_filter(&self, filter: &str) -> Result<PositionsResponse, AppError> {
493 debug!("Getting open positions with filter: {}", filter);
494 let mut positions = self.get_positions().await?;
495
496 positions
497 .positions
498 .retain(|position| position.market.epic.contains(filter));
499
500 debug!(
501 "Positions obtained after filtering: {} positions",
502 positions.positions.len()
503 );
504 Ok(positions)
505 }
506
507 async fn get_working_orders(&self) -> Result<WorkingOrdersResponse, AppError> {
508 info!("Getting working orders");
509 let result: WorkingOrdersResponse = self.http_client.get("workingorders", Some(2)).await?;
510 debug!(
511 "Working orders obtained: {} orders",
512 result.working_orders.len()
513 );
514 Ok(result)
515 }
516
517 async fn get_activity(
518 &self,
519 from: &str,
520 to: &str,
521 ) -> Result<AccountActivityResponse, AppError> {
522 let path = format!("history/activity?from={}&to={}&pageSize=500", from, to);
523 info!("Getting account activity");
524 let result: AccountActivityResponse = self.http_client.get(&path, Some(3)).await?;
525 debug!(
526 "Account activity obtained: {} activities",
527 result.activities.len()
528 );
529 Ok(result)
530 }
531
532 async fn get_activity_with_details(
533 &self,
534 from: &str,
535 to: &str,
536 ) -> Result<AccountActivityResponse, AppError> {
537 let path = format!(
538 "history/activity?from={}&to={}&detailed=true&pageSize=500",
539 from, to
540 );
541 info!("Getting detailed account activity");
542 let result: AccountActivityResponse = self.http_client.get(&path, Some(3)).await?;
543 debug!(
544 "Detailed account activity obtained: {} activities",
545 result.activities.len()
546 );
547 Ok(result)
548 }
549
550 async fn get_transactions(
551 &self,
552 from: &str,
553 to: &str,
554 ) -> Result<TransactionHistoryResponse, AppError> {
555 const PAGE_SIZE: u32 = 200;
556 let mut all_transactions = Vec::new();
557 let mut current_page = 1;
558 #[allow(unused_assignments)]
559 let mut last_metadata = None;
560
561 loop {
562 let path = format!(
563 "history/transactions?from={}&to={}&pageSize={}&pageNumber={}",
564 from, to, PAGE_SIZE, current_page
565 );
566 info!("Getting transaction history page {}", current_page);
567
568 let result: TransactionHistoryResponse = self.http_client.get(&path, Some(2)).await?;
569
570 let total_pages = result.metadata.page_data.total_pages as u32;
571 last_metadata = Some(result.metadata);
572 all_transactions.extend(result.transactions);
573
574 if current_page >= total_pages {
575 break;
576 }
577 current_page += 1;
578 }
579
580 debug!(
581 "Total transaction history obtained: {} transactions",
582 all_transactions.len()
583 );
584
585 Ok(TransactionHistoryResponse {
586 transactions: all_transactions,
587 metadata: last_metadata
588 .ok_or_else(|| AppError::InvalidInput("Could not retrieve metadata".to_string()))?,
589 })
590 }
591
592 async fn get_preferences(&self) -> Result<AccountPreferencesResponse, AppError> {
593 info!("Getting account preferences");
594 let result: AccountPreferencesResponse = self
595 .http_client
596 .get("accounts/preferences", Some(1))
597 .await?;
598 debug!(
599 "Account preferences obtained: trailing_stops_enabled={}",
600 result.trailing_stops_enabled
601 );
602 Ok(result)
603 }
604
605 async fn update_preferences(&self, trailing_stops_enabled: bool) -> Result<(), AppError> {
606 info!(
607 "Updating account preferences: trailing_stops_enabled={}",
608 trailing_stops_enabled
609 );
610 let request = serde_json::json!({
611 "trailingStopsEnabled": trailing_stops_enabled
612 });
613 let _: serde_json::Value = self
614 .http_client
615 .put("accounts/preferences", &request, Some(1))
616 .await?;
617 debug!("Account preferences updated");
618 Ok(())
619 }
620
621 async fn get_activity_by_period(
622 &self,
623 period_ms: u64,
624 ) -> Result<AccountActivityResponse, AppError> {
625 let path = format!("history/activity/{}", period_ms);
626 info!("Getting account activity for period: {} ms", period_ms);
627 let result: AccountActivityResponse = self.http_client.get(&path, Some(1)).await?;
628 debug!(
629 "Account activity obtained: {} activities",
630 result.activities.len()
631 );
632 Ok(result)
633 }
634}
635
636#[async_trait]
637impl OrderService for Client {
638 async fn create_order(
639 &self,
640 order: &CreateOrderRequest,
641 ) -> Result<CreateOrderResponse, AppError> {
642 info!("Creating order for: {}", order.epic);
643 let result: CreateOrderResponse = self
644 .http_client
645 .post("positions/otc", order, Some(2))
646 .await?;
647 debug!("Order created with reference: {}", result.deal_reference);
648 Ok(result)
649 }
650
651 async fn get_order_confirmation(
652 &self,
653 deal_reference: &str,
654 ) -> Result<OrderConfirmationResponse, AppError> {
655 let path = format!("confirms/{}", deal_reference);
656 info!("Getting confirmation for order: {}", deal_reference);
657 let result: OrderConfirmationResponse = self.http_client.get(&path, Some(1)).await?;
658 debug!("Confirmation obtained for order: {}", deal_reference);
659 Ok(result)
660 }
661
662 async fn get_order_confirmation_w_retry(
663 &self,
664 deal_reference: &str,
665 retries: u64,
666 delay_ms: u64,
667 ) -> Result<OrderConfirmationResponse, AppError> {
668 let mut attempts = 0;
669 loop {
670 match self.get_order_confirmation(deal_reference).await {
671 Ok(response) => return Ok(response),
672 Err(e) => {
673 attempts += 1;
674 if attempts > retries {
675 return Err(e);
676 }
677 warn!(
678 "Failed to get order confirmation (attempt {}/{}): {}. Retrying in {} ms...",
679 attempts, retries, e, delay_ms
680 );
681 sleep(Duration::from_millis(delay_ms)).await;
682 }
683 }
684 }
685 }
686
687 async fn update_position(
688 &self,
689 deal_id: &str,
690 update: &UpdatePositionRequest,
691 ) -> Result<UpdatePositionResponse, AppError> {
692 let path = format!("positions/otc/{}", deal_id);
693 info!("Updating position: {}", deal_id);
694 let result: UpdatePositionResponse = self.http_client.put(&path, update, Some(2)).await?;
695 debug!(
696 "Position updated: {} with deal reference: {}",
697 deal_id, result.deal_reference
698 );
699 Ok(result)
700 }
701
702 async fn update_level_in_position(
703 &self,
704 deal_id: &str,
705 limit_level: Option<f64>,
706 ) -> Result<UpdatePositionResponse, AppError> {
707 let path = format!("positions/otc/{}", deal_id);
708 info!("Updating position: {}", deal_id);
709 let limit_level = limit_level.unwrap_or(0.0);
710
711 let update: UpdatePositionRequest = UpdatePositionRequest {
712 guaranteed_stop: None,
713 limit_level: Some(limit_level),
714 stop_level: None,
715 trailing_stop: None,
716 trailing_stop_distance: None,
717 trailing_stop_increment: None,
718 };
719 let result: UpdatePositionResponse = self.http_client.put(&path, update, Some(2)).await?;
720 debug!(
721 "Position updated: {} with deal reference: {}",
722 deal_id, result.deal_reference
723 );
724 Ok(result)
725 }
726
727 async fn close_position(
728 &self,
729 close_request: &ClosePositionRequest,
730 ) -> Result<ClosePositionResponse, AppError> {
731 info!("Closing position");
732
733 let result: ClosePositionResponse = self
736 .http_client
737 .post_with_delete_method("positions/otc", close_request, Some(1))
738 .await?;
739
740 debug!("Position closed with reference: {}", result.deal_reference);
741 Ok(result)
742 }
743
744 async fn create_working_order(
745 &self,
746 order: &CreateWorkingOrderRequest,
747 ) -> Result<CreateWorkingOrderResponse, AppError> {
748 info!("Creating working order for: {}", order.epic);
749 let result: CreateWorkingOrderResponse = self
750 .http_client
751 .post("workingorders/otc", order, Some(2))
752 .await?;
753 debug!(
754 "Working order created with reference: {}",
755 result.deal_reference
756 );
757 Ok(result)
758 }
759
760 async fn delete_working_order(&self, deal_id: &str) -> Result<(), AppError> {
761 let path = format!("workingorders/otc/{}", deal_id);
762 let result: CreateWorkingOrderResponse =
763 self.http_client.delete(path.as_str(), Some(2)).await?;
764 debug!(
765 "Working order created with reference: {}",
766 result.deal_reference
767 );
768 Ok(())
769 }
770
771 async fn get_position(&self, deal_id: &str) -> Result<SinglePositionResponse, AppError> {
772 let path = format!("positions/{}", deal_id);
773 info!("Getting position: {}", deal_id);
774 let result: SinglePositionResponse = self.http_client.get(&path, Some(2)).await?;
775 debug!("Position obtained for deal: {}", deal_id);
776 Ok(result)
777 }
778
779 async fn update_working_order(
780 &self,
781 deal_id: &str,
782 update: &UpdateWorkingOrderRequest,
783 ) -> Result<CreateWorkingOrderResponse, AppError> {
784 let path = format!("workingorders/otc/{}", deal_id);
785 info!("Updating working order: {}", deal_id);
786 let result: CreateWorkingOrderResponse =
787 self.http_client.put(&path, update, Some(2)).await?;
788 debug!(
789 "Working order updated: {} with reference: {}",
790 deal_id, result.deal_reference
791 );
792 Ok(result)
793 }
794}
795
796#[async_trait]
801impl WatchlistService for Client {
802 async fn get_watchlists(&self) -> Result<WatchlistsResponse, AppError> {
803 info!("Getting all watchlists");
804 let result: WatchlistsResponse = self.http_client.get("watchlists", Some(1)).await?;
805 debug!(
806 "Watchlists obtained: {} watchlists",
807 result.watchlists.len()
808 );
809 Ok(result)
810 }
811
812 async fn create_watchlist(
813 &self,
814 name: &str,
815 epics: Option<&[String]>,
816 ) -> Result<CreateWatchlistResponse, AppError> {
817 info!("Creating watchlist: {}", name);
818 let request = CreateWatchlistRequest {
819 name: name.to_string(),
820 epics: epics.map(|e| e.to_vec()),
821 };
822 let result: CreateWatchlistResponse = self
823 .http_client
824 .post("watchlists", &request, Some(1))
825 .await?;
826 debug!(
827 "Watchlist created: {} with ID: {}",
828 name, result.watchlist_id
829 );
830 Ok(result)
831 }
832
833 async fn get_watchlist(
834 &self,
835 watchlist_id: &str,
836 ) -> Result<WatchlistMarketsResponse, AppError> {
837 let path = format!("watchlists/{}", watchlist_id);
838 info!("Getting watchlist: {}", watchlist_id);
839 let result: WatchlistMarketsResponse = self.http_client.get(&path, Some(1)).await?;
840 debug!(
841 "Watchlist obtained: {} with {} markets",
842 watchlist_id,
843 result.markets.len()
844 );
845 Ok(result)
846 }
847
848 async fn delete_watchlist(&self, watchlist_id: &str) -> Result<StatusResponse, AppError> {
849 let path = format!("watchlists/{}", watchlist_id);
850 info!("Deleting watchlist: {}", watchlist_id);
851 let result: StatusResponse = self.http_client.delete(&path, Some(1)).await?;
852 debug!("Watchlist deleted: {}", watchlist_id);
853 Ok(result)
854 }
855
856 async fn add_to_watchlist(
857 &self,
858 watchlist_id: &str,
859 epic: &str,
860 ) -> Result<StatusResponse, AppError> {
861 let path = format!("watchlists/{}", watchlist_id);
862 info!("Adding {} to watchlist: {}", epic, watchlist_id);
863 let request = AddToWatchlistRequest {
864 epic: epic.to_string(),
865 };
866 let result: StatusResponse = self.http_client.put(&path, &request, Some(1)).await?;
867 debug!("Added {} to watchlist: {}", epic, watchlist_id);
868 Ok(result)
869 }
870
871 async fn remove_from_watchlist(
872 &self,
873 watchlist_id: &str,
874 epic: &str,
875 ) -> Result<StatusResponse, AppError> {
876 let path = format!("watchlists/{}/{}", watchlist_id, epic);
877 info!("Removing {} from watchlist: {}", epic, watchlist_id);
878 let result: StatusResponse = self.http_client.delete(&path, Some(1)).await?;
879 debug!("Removed {} from watchlist: {}", epic, watchlist_id);
880 Ok(result)
881 }
882}
883
884#[async_trait]
889impl SentimentService for Client {
890 async fn get_client_sentiment(
891 &self,
892 market_ids: &[String],
893 ) -> Result<ClientSentimentResponse, AppError> {
894 let market_ids_str = market_ids.join(",");
895 let path = format!("clientsentiment?marketIds={}", market_ids_str);
896 info!("Getting client sentiment for {} markets", market_ids.len());
897 let result: ClientSentimentResponse = self.http_client.get(&path, Some(1)).await?;
898 debug!(
899 "Client sentiment obtained for {} markets",
900 result.client_sentiments.len()
901 );
902 Ok(result)
903 }
904
905 async fn get_client_sentiment_by_market(
906 &self,
907 market_id: &str,
908 ) -> Result<MarketSentiment, AppError> {
909 let path = format!("clientsentiment/{}", market_id);
910 info!("Getting client sentiment for market: {}", market_id);
911 let result: MarketSentiment = self.http_client.get(&path, Some(1)).await?;
912 debug!(
913 "Client sentiment for {}: {}% long, {}% short",
914 market_id, result.long_position_percentage, result.short_position_percentage
915 );
916 Ok(result)
917 }
918
919 async fn get_related_sentiment(
920 &self,
921 market_id: &str,
922 ) -> Result<ClientSentimentResponse, AppError> {
923 let path = format!("clientsentiment/related/{}", market_id);
924 info!("Getting related sentiment for market: {}", market_id);
925 let result: ClientSentimentResponse = self.http_client.get(&path, Some(1)).await?;
926 debug!(
927 "Related sentiment obtained: {} markets",
928 result.client_sentiments.len()
929 );
930 Ok(result)
931 }
932}
933
934#[async_trait]
939impl CostsService for Client {
940 async fn get_indicative_costs_open(
941 &self,
942 request: &OpenCostsRequest,
943 ) -> Result<IndicativeCostsResponse, AppError> {
944 info!(
945 "Getting indicative costs for opening position on: {}",
946 request.epic
947 );
948 let result: IndicativeCostsResponse = self
949 .http_client
950 .post("indicativecostsandcharges/open", request, Some(1))
951 .await?;
952 debug!(
953 "Indicative costs obtained, reference: {}",
954 result.indicative_quote_reference
955 );
956 Ok(result)
957 }
958
959 async fn get_indicative_costs_close(
960 &self,
961 request: &CloseCostsRequest,
962 ) -> Result<IndicativeCostsResponse, AppError> {
963 info!(
964 "Getting indicative costs for closing position: {}",
965 request.deal_id
966 );
967 let result: IndicativeCostsResponse = self
968 .http_client
969 .post("indicativecostsandcharges/close", request, Some(1))
970 .await?;
971 debug!(
972 "Indicative costs obtained, reference: {}",
973 result.indicative_quote_reference
974 );
975 Ok(result)
976 }
977
978 async fn get_indicative_costs_edit(
979 &self,
980 request: &EditCostsRequest,
981 ) -> Result<IndicativeCostsResponse, AppError> {
982 info!(
983 "Getting indicative costs for editing position: {}",
984 request.deal_id
985 );
986 let result: IndicativeCostsResponse = self
987 .http_client
988 .post("indicativecostsandcharges/edit", request, Some(1))
989 .await?;
990 debug!(
991 "Indicative costs obtained, reference: {}",
992 result.indicative_quote_reference
993 );
994 Ok(result)
995 }
996
997 async fn get_costs_history(
998 &self,
999 from: &str,
1000 to: &str,
1001 ) -> Result<CostsHistoryResponse, AppError> {
1002 let path = format!("indicativecostsandcharges/history/from/{}/to/{}", from, to);
1003 info!("Getting costs history from {} to {}", from, to);
1004 let result: CostsHistoryResponse = self.http_client.get(&path, Some(1)).await?;
1005 debug!("Costs history obtained: {} entries", result.costs.len());
1006 Ok(result)
1007 }
1008
1009 async fn get_durable_medium(
1010 &self,
1011 quote_reference: &str,
1012 ) -> Result<DurableMediumResponse, AppError> {
1013 let path = format!(
1014 "indicativecostsandcharges/durablemedium/{}",
1015 quote_reference
1016 );
1017 info!("Getting durable medium for reference: {}", quote_reference);
1018 let result: DurableMediumResponse = self.http_client.get(&path, Some(1)).await?;
1019 debug!("Durable medium obtained for reference: {}", quote_reference);
1020 Ok(result)
1021 }
1022}
1023
1024#[async_trait]
1029impl OperationsService for Client {
1030 async fn get_client_apps(&self) -> Result<ApplicationDetailsResponse, AppError> {
1031 info!("Getting client applications");
1032 let result: ApplicationDetailsResponse = self
1033 .http_client
1034 .get("operations/application", Some(1))
1035 .await?;
1036 debug!("Client application obtained: {}", result.api_key);
1037 Ok(result)
1038 }
1039
1040 async fn disable_client_app(&self) -> Result<StatusResponse, AppError> {
1041 info!("Disabling current client application");
1042 let result: StatusResponse = self
1043 .http_client
1044 .put(
1045 "operations/application/disable",
1046 &serde_json::json!({}),
1047 Some(1),
1048 )
1049 .await?;
1050 debug!("Client application disabled");
1051 Ok(result)
1052 }
1053}
1054
1055pub struct StreamerClient {
1065 account_id: String,
1066 market_streamer_client: Option<Arc<Mutex<LightstreamerClient>>>,
1067 price_streamer_client: Option<Arc<Mutex<LightstreamerClient>>>,
1068 has_market_stream_subs: bool,
1070 has_price_stream_subs: bool,
1071}
1072
1073impl StreamerClient {
1074 pub async fn new() -> Result<Self, AppError> {
1083 let http_client_raw = Arc::new(RwLock::new(Client::new()));
1084 let http_client = http_client_raw.read().await;
1085 let ws_info = http_client.get_ws_info().await;
1086 let password = ws_info.get_ws_password();
1087
1088 let market_streamer_client = Arc::new(Mutex::new(LightstreamerClient::new(
1090 Some(ws_info.server.as_str()),
1091 None,
1092 Some(&ws_info.account_id),
1093 Some(&password),
1094 )?));
1095
1096 let price_streamer_client = Arc::new(Mutex::new(LightstreamerClient::new(
1097 Some(ws_info.server.as_str()),
1098 None,
1099 Some(&ws_info.account_id),
1100 Some(&password),
1101 )?));
1102
1103 {
1106 let mut client = market_streamer_client.lock().await;
1107 client
1108 .connection_options
1109 .set_forced_transport(Some(Transport::WsStreaming));
1110 client.set_logging_type(LogType::TracingLogs);
1111 }
1112 {
1113 let mut client = price_streamer_client.lock().await;
1114 client
1115 .connection_options
1116 .set_forced_transport(Some(Transport::WsStreaming));
1117 client.set_logging_type(LogType::TracingLogs);
1118 }
1119
1120 Ok(Self {
1121 account_id: ws_info.account_id.clone(),
1122 market_streamer_client: Some(market_streamer_client),
1123 price_streamer_client: Some(price_streamer_client),
1124 has_market_stream_subs: false,
1125 has_price_stream_subs: false,
1126 })
1127 }
1128
1129 pub async fn default() -> Result<Self, AppError> {
1131 Self::new().await
1132 }
1133
1134 pub async fn market_subscribe(
1164 &mut self,
1165 epics: Vec<String>,
1166 fields: HashSet<StreamingMarketField>,
1167 ) -> Result<mpsc::UnboundedReceiver<PriceData>, AppError> {
1168 self.has_market_stream_subs = true;
1170
1171 let fields = get_streaming_market_fields(&fields);
1172 let market_epics: Vec<String> = epics
1173 .iter()
1174 .map(|epic| "MARKET:".to_string() + epic)
1175 .collect();
1176 let mut subscription =
1177 Subscription::new(SubscriptionMode::Merge, Some(market_epics), Some(fields))?;
1178
1179 subscription.set_data_adapter(None)?;
1180 subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
1181
1182 let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
1184 subscription.add_listener(Box::new(listener));
1185
1186 let client = self.market_streamer_client.as_ref().ok_or_else(|| {
1188 AppError::WebSocketError("market streamer client not initialized".to_string())
1189 })?;
1190
1191 {
1192 let mut client = client.lock().await;
1193 client
1194 .connection_options
1195 .set_forced_transport(Some(Transport::WsStreaming));
1196 LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription)
1197 .await?;
1198 }
1199
1200 let (price_tx, price_rx) = mpsc::unbounded_channel();
1202 tokio::spawn(async move {
1203 let mut receiver = item_receiver;
1204 while let Some(item_update) = receiver.recv().await {
1205 let price_data = PriceData::from(&item_update);
1206 if price_tx.send(price_data).is_err() {
1207 tracing::debug!("Price channel receiver dropped");
1208 break;
1209 }
1210 }
1211 });
1212
1213 info!(
1214 "Market subscription created for {} instruments",
1215 epics.len()
1216 );
1217 Ok(price_rx)
1218 }
1219
1220 pub async fn trade_subscribe(
1243 &mut self,
1244 ) -> Result<mpsc::UnboundedReceiver<TradeFields>, AppError> {
1245 self.has_market_stream_subs = true;
1247
1248 let account_id = self.account_id.clone();
1249 let fields = Some(vec![
1250 "CONFIRMS".to_string(),
1251 "OPU".to_string(),
1252 "WOU".to_string(),
1253 ]);
1254 let trade_items = vec![format!("TRADE:{account_id}")];
1255
1256 let mut subscription =
1257 Subscription::new(SubscriptionMode::Distinct, Some(trade_items), fields)?;
1258
1259 subscription.set_data_adapter(None)?;
1260 subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
1261
1262 let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
1264 subscription.add_listener(Box::new(listener));
1265
1266 let client = self.market_streamer_client.as_ref().ok_or_else(|| {
1268 AppError::WebSocketError("market streamer client not initialized".to_string())
1269 })?;
1270
1271 {
1272 let mut client = client.lock().await;
1273 client
1274 .connection_options
1275 .set_forced_transport(Some(Transport::WsStreaming));
1276 LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription)
1277 .await?;
1278 }
1279
1280 let (trade_tx, trade_rx) = mpsc::unbounded_channel();
1282 tokio::spawn(async move {
1283 let mut receiver = item_receiver;
1284 while let Some(item_update) = receiver.recv().await {
1285 let trade_data = crate::presentation::trade::TradeData::from(&item_update);
1286 if trade_tx.send(trade_data.fields).is_err() {
1287 tracing::debug!("Trade channel receiver dropped");
1288 break;
1289 }
1290 }
1291 });
1292
1293 info!("Trade subscription created for account: {}", account_id);
1294 Ok(trade_rx)
1295 }
1296
1297 pub async fn account_subscribe(
1324 &mut self,
1325 fields: HashSet<StreamingAccountDataField>,
1326 ) -> Result<mpsc::UnboundedReceiver<AccountFields>, AppError> {
1327 self.has_market_stream_subs = true;
1329
1330 let fields = get_streaming_account_data_fields(&fields);
1331 let account_id = self.account_id.clone();
1332 let account_items = vec![format!("ACCOUNT:{account_id}")];
1333
1334 let mut subscription =
1335 Subscription::new(SubscriptionMode::Merge, Some(account_items), Some(fields))?;
1336
1337 subscription.set_data_adapter(None)?;
1338 subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
1339
1340 let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
1342 subscription.add_listener(Box::new(listener));
1343
1344 let client = self.market_streamer_client.as_ref().ok_or_else(|| {
1346 AppError::WebSocketError("market streamer client not initialized".to_string())
1347 })?;
1348
1349 {
1350 let mut client = client.lock().await;
1351 client
1352 .connection_options
1353 .set_forced_transport(Some(Transport::WsStreaming));
1354 LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription)
1355 .await?;
1356 }
1357
1358 let (account_tx, account_rx) = mpsc::unbounded_channel();
1360 tokio::spawn(async move {
1361 let mut receiver = item_receiver;
1362 while let Some(item_update) = receiver.recv().await {
1363 let account_data = crate::presentation::account::AccountData::from(&item_update);
1364 if account_tx.send(account_data.fields).is_err() {
1365 tracing::debug!("Account channel receiver dropped");
1366 break;
1367 }
1368 }
1369 });
1370
1371 info!("Account subscription created for account: {}", account_id);
1372 Ok(account_rx)
1373 }
1374
1375 pub async fn price_subscribe(
1406 &mut self,
1407 epics: Vec<String>,
1408 fields: HashSet<StreamingPriceField>,
1409 ) -> Result<mpsc::UnboundedReceiver<PriceData>, AppError> {
1410 self.has_price_stream_subs = true;
1412
1413 let fields = get_streaming_price_fields(&fields);
1414 let account_id = self.account_id.clone();
1415 let price_epics: Vec<String> = epics
1416 .iter()
1417 .map(|epic| format!("PRICE:{account_id}:{epic}"))
1418 .collect();
1419
1420 tracing::debug!("Pricing subscribe items: {:?}", price_epics);
1422 tracing::debug!("Pricing subscribe fields: {:?}", fields);
1423
1424 let mut subscription =
1425 Subscription::new(SubscriptionMode::Merge, Some(price_epics), Some(fields))?;
1426
1427 let pricing_adapter =
1429 std::env::var("IG_PRICING_ADAPTER").unwrap_or_else(|_| "Pricing".to_string());
1430 tracing::debug!("Using Pricing data adapter: {}", pricing_adapter);
1431 subscription.set_data_adapter(Some(pricing_adapter))?;
1432 subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
1433
1434 let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
1436 subscription.add_listener(Box::new(listener));
1437
1438 let client = self.price_streamer_client.as_ref().ok_or_else(|| {
1440 AppError::WebSocketError("price streamer client not initialized".to_string())
1441 })?;
1442
1443 {
1444 let mut client = client.lock().await;
1445 client
1446 .connection_options
1447 .set_forced_transport(Some(Transport::WsStreaming));
1448 LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription)
1449 .await?;
1450 }
1451
1452 let (price_tx, price_rx) = mpsc::unbounded_channel();
1454 tokio::spawn(async move {
1455 let mut receiver = item_receiver;
1456 while let Some(item_update) = receiver.recv().await {
1457 let price_data = PriceData::from(&item_update);
1458 if price_tx.send(price_data).is_err() {
1459 tracing::debug!("Price channel receiver dropped");
1460 break;
1461 }
1462 }
1463 });
1464
1465 info!(
1466 "Price subscription created for {} instruments (account: {})",
1467 epics.len(),
1468 account_id
1469 );
1470 Ok(price_rx)
1471 }
1472
1473 pub async fn chart_subscribe(
1506 &mut self,
1507 epics: Vec<String>,
1508 scale: ChartScale,
1509 fields: HashSet<StreamingChartField>,
1510 ) -> Result<mpsc::UnboundedReceiver<ChartData>, AppError> {
1511 self.has_market_stream_subs = true;
1513
1514 let fields = get_streaming_chart_fields(&fields);
1515
1516 let chart_items: Vec<String> = epics
1517 .iter()
1518 .map(|epic| format!("CHART:{epic}:{scale}",))
1519 .collect();
1520
1521 let mode = if matches!(scale, ChartScale::Tick) {
1523 SubscriptionMode::Distinct
1524 } else {
1525 SubscriptionMode::Merge
1526 };
1527
1528 let mut subscription = Subscription::new(mode, Some(chart_items), Some(fields))?;
1529
1530 subscription.set_data_adapter(None)?;
1531 subscription.set_requested_snapshot(Some(Snapshot::Yes))?;
1532
1533 let (listener, item_receiver) = ChannelSubscriptionListener::create_channel();
1535 subscription.add_listener(Box::new(listener));
1536
1537 let client = self.market_streamer_client.as_ref().ok_or_else(|| {
1539 AppError::WebSocketError("market streamer client not initialized".to_string())
1540 })?;
1541
1542 {
1543 let mut client = client.lock().await;
1544 client
1545 .connection_options
1546 .set_forced_transport(Some(Transport::WsStreaming));
1547 LightstreamerClient::subscribe(client.subscription_sender.clone(), subscription)
1548 .await?;
1549 }
1550
1551 let (chart_tx, chart_rx) = mpsc::unbounded_channel();
1553 tokio::spawn(async move {
1554 let mut receiver = item_receiver;
1555 while let Some(item_update) = receiver.recv().await {
1556 let chart_data = ChartData::from(&item_update);
1557 if chart_tx.send(chart_data).is_err() {
1558 tracing::debug!("Chart channel receiver dropped");
1559 break;
1560 }
1561 }
1562 });
1563
1564 info!(
1565 "Chart subscription created for {} instruments (scale: {})",
1566 epics.len(),
1567 scale
1568 );
1569
1570 Ok(chart_rx)
1571 }
1572
1573 pub async fn connect(&mut self, shutdown_signal: Option<Arc<Notify>>) -> Result<(), AppError> {
1590 let signal = if let Some(sig) = shutdown_signal {
1592 sig
1593 } else {
1594 let sig = Arc::new(Notify::new());
1595 setup_signal_hook(Arc::clone(&sig)).await;
1596 sig
1597 };
1598
1599 let mut tasks = Vec::new();
1600
1601 if self.has_market_stream_subs {
1603 if let Some(client) = self.market_streamer_client.as_ref() {
1604 let client = Arc::clone(client);
1605 let signal = Arc::clone(&signal);
1606 let task =
1607 tokio::spawn(
1608 async move { Self::connect_client(client, signal, "Market").await },
1609 );
1610 tasks.push(task);
1611 }
1612 } else {
1613 info!("Skipping Market streamer connection: no active subscriptions");
1614 }
1615
1616 if self.has_price_stream_subs {
1618 if let Some(client) = self.price_streamer_client.as_ref() {
1619 let client = Arc::clone(client);
1620 let signal = Arc::clone(&signal);
1621 let task =
1622 tokio::spawn(
1623 async move { Self::connect_client(client, signal, "Price").await },
1624 );
1625 tasks.push(task);
1626 }
1627 } else {
1628 info!("Skipping Price streamer connection: no active subscriptions");
1629 }
1630
1631 if tasks.is_empty() {
1632 warn!("No streaming clients selected for connection (no active subscriptions)");
1633 return Ok(());
1634 }
1635
1636 info!("Connecting {} streaming client(s)...", tasks.len());
1637
1638 let results = futures::future::join_all(tasks).await;
1640
1641 let mut has_error = false;
1643 for (idx, result) in results.iter().enumerate() {
1644 match result {
1645 Ok(Ok(_)) => {
1646 debug!("Streaming client {} completed successfully", idx);
1647 }
1648 Ok(Err(e)) => {
1649 error!("Streaming client {} failed: {:?}", idx, e);
1650 has_error = true;
1651 }
1652 Err(e) => {
1653 error!("Streaming client {} task panicked: {:?}", idx, e);
1654 has_error = true;
1655 }
1656 }
1657 }
1658
1659 if has_error {
1660 return Err(AppError::WebSocketError(
1661 "one or more streaming connections failed".to_string(),
1662 ));
1663 }
1664
1665 info!("All streaming connections closed gracefully");
1666 Ok(())
1667 }
1668
1669 async fn connect_client(
1671 client: Arc<Mutex<LightstreamerClient>>,
1672 signal: Arc<Notify>,
1673 client_type: &str,
1674 ) -> Result<(), AppError> {
1675 let mut retry_interval_millis: u64 = 0;
1676 let mut retry_counter: u64 = 0;
1677
1678 while retry_counter < MAX_CONNECTION_ATTEMPTS {
1679 let connect_result = {
1680 let mut client = client.lock().await;
1681 client.connect_direct(Arc::clone(&signal)).await
1682 };
1683
1684 let result_with_string_error = connect_result.map_err(|e| format!("{:?}", e));
1686
1687 match result_with_string_error {
1688 Ok(_) => {
1689 info!("{} streamer connected successfully", client_type);
1690 break;
1691 }
1692 Err(error_msg) => {
1693 if error_msg.contains("No more requests to fulfill") {
1695 info!(
1696 "{} streamer closed gracefully: no active subscriptions (server reason: No more requests to fulfill)",
1697 client_type
1698 );
1699 return Ok(());
1700 }
1701
1702 error!("{} streamer connection failed: {}", client_type, error_msg);
1703
1704 if retry_counter < MAX_CONNECTION_ATTEMPTS - 1 {
1705 tokio::time::sleep(std::time::Duration::from_millis(retry_interval_millis))
1706 .await;
1707 retry_interval_millis =
1708 (retry_interval_millis + (200 * retry_counter)).min(5000);
1709 retry_counter += 1;
1710 warn!(
1711 "{} streamer retrying (attempt {}/{}) in {:.2} seconds...",
1712 client_type,
1713 retry_counter + 1,
1714 MAX_CONNECTION_ATTEMPTS,
1715 retry_interval_millis as f64 / 1000.0
1716 );
1717 } else {
1718 retry_counter += 1;
1719 }
1720 }
1721 }
1722 }
1723
1724 if retry_counter >= MAX_CONNECTION_ATTEMPTS {
1725 error!(
1726 "{} streamer failed after {} attempts",
1727 client_type, MAX_CONNECTION_ATTEMPTS
1728 );
1729 return Err(AppError::WebSocketError(format!(
1730 "{} streamer: maximum connection attempts ({}) exceeded",
1731 client_type, MAX_CONNECTION_ATTEMPTS
1732 )));
1733 }
1734
1735 info!("{} streamer connection closed gracefully", client_type);
1736 Ok(())
1737 }
1738
1739 pub async fn disconnect(&mut self) -> Result<(), AppError> {
1747 let mut disconnected = 0;
1748
1749 if let Some(client) = self.market_streamer_client.as_ref() {
1750 let mut client = client.lock().await;
1751 client.disconnect().await;
1752 info!("Market streamer disconnected");
1753 disconnected += 1;
1754 }
1755
1756 if let Some(client) = self.price_streamer_client.as_ref() {
1757 let mut client = client.lock().await;
1758 client.disconnect().await;
1759 info!("Price streamer disconnected");
1760 disconnected += 1;
1761 }
1762
1763 info!("Disconnected {} streaming client(s)", disconnected);
1764 Ok(())
1765 }
1766}