kiteticker_async_manager/manager/
connection_manager.rs

1//! # Connection Manager Module
2//!
3//! This module contains the main `KiteTickerManager` which provides high-performance
4//! multi-connection WebSocket management for the Kite Connect ticker API.
5//!
6//! ## Features
7//!
8//! - **Multi-Connection Support**: Utilizes up to 3 WebSocket connections
9//! - **Dynamic Load Balancing**: Automatic symbol distribution across connections
10//! - **High-Performance Processing**: Dedicated parser tasks per connection
11//! - **Dynamic Subscriptions**: Runtime symbol addition/removal without reconnection
12//! - **Health Monitoring**: Real-time connection health tracking
13//! - **Error Resilience**: Comprehensive error handling and recovery
14
15use crate::manager::{
16  ChannelId, ConnectionStats, HealthMonitor, HealthSummary, KiteManagerConfig,
17  ManagedConnection, ManagerStats, MessageProcessor, ProcessorStats,
18};
19use crate::models::{Mode, TickerMessage};
20use std::collections::HashMap;
21use std::sync::Arc;
22use std::time::Instant;
23use tokio::sync::{broadcast, mpsc, RwLock};
24
25/// High-performance multi-connection WebSocket manager for Kite ticker data
26///
27/// This manager creates 3 independent WebSocket connections and distributes symbols
28/// across them using round-robin allocation. Each connection has its own dedicated
29/// parser task for maximum performance.
30#[derive(Debug)]
31pub struct KiteTickerManager {
32  /// Manager configuration
33  config: KiteManagerConfig,
34
35  /// API credentials
36  api_key: String,
37  access_token: String,
38
39  /// WebSocket connections (up to 3)
40  connections: Vec<ManagedConnection>,
41
42  /// Message processors (one per connection)
43  processors: Vec<MessageProcessor>,
44
45  /// Output channels (one per connection)
46  output_channels: Vec<broadcast::Receiver<TickerMessage>>,
47
48  /// Symbol to connection mapping
49  symbol_mapping: HashMap<u32, ChannelId>,
50
51  /// Health monitor
52  health_monitor: Option<HealthMonitor>,
53
54  /// Next connection index for round-robin distribution
55  next_connection_index: usize,
56
57  /// Manager start time for uptime tracking
58  #[allow(dead_code)]
59  start_time: Instant,
60  /// If true, underlying connections operate in raw-only mode (no tick parsing)
61  raw_only: bool,
62}
63
64/// Builder for `KiteTickerManager` providing a fluent API for configuration.
65#[derive(Debug, Clone)]
66pub struct KiteTickerManagerBuilder {
67  api_key: String,
68  access_token: String,
69  config: KiteManagerConfig,
70  raw_only: bool,
71}
72
73impl KiteTickerManagerBuilder {
74  /// Create a new builder with mandatory credentials and default config
75  pub fn new(
76    api_key: impl Into<String>,
77    access_token: impl Into<String>,
78  ) -> Self {
79    Self {
80      api_key: api_key.into(),
81      access_token: access_token.into(),
82      config: KiteManagerConfig::default(),
83      raw_only: false,
84    }
85  }
86
87  pub fn max_connections(mut self, n: usize) -> Self {
88    self.config.max_connections = n;
89    self
90  }
91  pub fn max_symbols_per_connection(mut self, n: usize) -> Self {
92    self.config.max_symbols_per_connection = n;
93    self
94  }
95  pub fn connection_timeout(mut self, d: std::time::Duration) -> Self {
96    self.config.connection_timeout = d;
97    self
98  }
99  pub fn health_check_interval(mut self, d: std::time::Duration) -> Self {
100    self.config.health_check_interval = d;
101    self
102  }
103  pub fn reconnect_attempts(mut self, attempts: usize) -> Self {
104    self.config.max_reconnect_attempts = attempts;
105    self
106  }
107  pub fn reconnect_delay(mut self, d: std::time::Duration) -> Self {
108    self.config.reconnect_delay = d;
109    self
110  }
111  pub fn enable_dedicated_parsers(mut self, enable: bool) -> Self {
112    self.config.enable_dedicated_parsers = enable;
113    self
114  }
115  pub fn default_mode(mut self, mode: Mode) -> Self {
116    self.config.default_mode = mode;
117    self
118  }
119  pub fn connection_buffer_size(mut self, sz: usize) -> Self {
120    self.config.connection_buffer_size = sz;
121    self
122  }
123  pub fn parser_buffer_size(mut self, sz: usize) -> Self {
124    self.config.parser_buffer_size = sz;
125    self
126  }
127  pub fn raw_only(mut self, raw: bool) -> Self {
128    self.raw_only = raw;
129    self
130  }
131
132  /// Override entire config (advanced)
133  pub fn config(mut self, config: KiteManagerConfig) -> Self {
134    self.config = config;
135    self
136  }
137
138  /// Build the manager (not started yet)
139  pub fn build(self) -> KiteTickerManager {
140    KiteTickerManager::new(self.api_key, self.access_token, self.config)
141      .with_raw_only(self.raw_only)
142  }
143}
144
145impl KiteTickerManager {
146  /// Creates a new KiteTickerManager instance with the specified configuration
147  ///
148  /// This initializes the manager with the provided API credentials and configuration,
149  /// but does not start any connections. Call [`start()`](Self::start) to begin operation.
150  ///
151  /// # Arguments
152  ///
153  /// * `api_key` - Your Kite Connect API key
154  /// * `access_token` - Valid access token from Kite Connect
155  /// * `config` - Manager configuration settings
156  ///
157  /// # Example
158  ///
159  /// ```rust,no_run
160  /// use kiteticker_async_manager::{KiteTickerManager, KiteManagerConfig, Mode};
161  ///
162  /// let config = KiteManagerConfig {
163  ///     max_connections: 3,
164  ///     max_symbols_per_connection: 3000,
165  ///     enable_dedicated_parsers: true,
166  ///     default_mode: Mode::LTP,
167  ///     ..Default::default()
168  /// };
169  ///
170  /// let manager = KiteTickerManager::new(
171  ///     "your_api_key".to_string(),
172  ///     "your_access_token".to_string(),
173  ///     config,
174  /// );
175  /// ```
176  pub fn new(
177    api_key: String,
178    access_token: String,
179    config: KiteManagerConfig,
180  ) -> Self {
181    Self {
182      config,
183      api_key,
184      access_token,
185      connections: Vec::new(),
186      processors: Vec::new(),
187      output_channels: Vec::new(),
188      symbol_mapping: HashMap::new(),
189      health_monitor: None,
190      next_connection_index: 0,
191      start_time: Instant::now(),
192      raw_only: false,
193    }
194  }
195
196  /// Set raw-only mode (builder uses this)
197  pub fn with_raw_only(mut self, raw: bool) -> Self {
198    self.raw_only = raw;
199    self
200  }
201
202  /// Initialize all connections and start the manager
203  pub async fn start(&mut self) -> Result<(), String> {
204    log::info!(
205      "Starting KiteTickerManager with {} connections",
206      self.config.max_connections
207    );
208
209    // Create all connections and processors
210    for i in 0..self.config.max_connections {
211      let channel_id = ChannelId::from_index(i)
212        .ok_or_else(|| format!("Invalid connection index: {}", i))?;
213
214      // Create message channel between connection and processor
215      let (connection_sender, processor_receiver) = mpsc::unbounded_channel();
216
217      // Create managed connection
218      let mut connection =
219        ManagedConnection::new(channel_id, connection_sender);
220
221      // Connect to WebSocket
222      if self.raw_only {
223        connection
224          .connect_with_raw(
225            &self.api_key,
226            &self.access_token,
227            &self.config,
228            true,
229          )
230          .await
231          .map_err(|e| format!("Failed to connect WebSocket {}: {}", i, e))?;
232      } else {
233        connection
234          .connect(&self.api_key, &self.access_token, &self.config)
235          .await
236          .map_err(|e| format!("Failed to connect WebSocket {}: {}", i, e))?;
237      }
238
239      // Create message processor
240      let (mut processor, output_receiver) = MessageProcessor::new(
241        channel_id,
242        processor_receiver,
243        self.config.parser_buffer_size,
244      );
245
246      // Start processor if enabled
247      if self.config.enable_dedicated_parsers {
248        processor.start();
249        log::info!("Started dedicated parser for connection {}", i);
250      }
251
252      self.connections.push(connection);
253      self.processors.push(processor);
254      self.output_channels.push(output_receiver);
255    }
256
257    // Start health monitoring
258    if self.config.health_check_interval.as_secs() > 0 {
259      let connection_stats: Vec<Arc<RwLock<ConnectionStats>>> = self
260        .connections
261        .iter()
262        .map(|c| Arc::clone(&c.stats))
263        .collect();
264
265      let mut health_monitor =
266        HealthMonitor::new(connection_stats, self.config.health_check_interval);
267      health_monitor.start();
268      self.health_monitor = Some(health_monitor);
269
270      log::info!("Started health monitor");
271    }
272
273    log::info!(
274      "KiteTickerManager started successfully with {} connections",
275      self.connections.len()
276    );
277
278    Ok(())
279  }
280
281  /// Subscribe to symbols using round-robin distribution
282  pub async fn subscribe_symbols(
283    &mut self,
284    symbols: &[u32],
285    mode: Option<Mode>,
286  ) -> Result<(), String> {
287    let mode = mode.unwrap_or(self.config.default_mode);
288
289    log::info!(
290      "Subscribing to {} symbols with mode: {:?}",
291      symbols.len(),
292      mode
293    );
294
295    // Group symbols by connection using round-robin
296    let mut connection_symbols: HashMap<ChannelId, Vec<u32>> = HashMap::new();
297
298    for &symbol in symbols {
299      // Skip if already subscribed
300      if self.symbol_mapping.contains_key(&symbol) {
301        log::debug!("Symbol {} already subscribed", symbol);
302        continue;
303      }
304
305      // Find connection with available capacity
306      let connection_id = self.find_available_connection()?;
307
308      // Add to mapping
309      self.symbol_mapping.insert(symbol, connection_id);
310      connection_symbols
311        .entry(connection_id)
312        .or_default()
313        .push(symbol);
314    }
315
316    // Subscribe symbols on each connection
317    for (connection_id, symbols) in connection_symbols {
318      let connection = &mut self.connections[connection_id.to_index()];
319      let mode_clone = mode; // Clone for each connection
320
321      if !symbols.is_empty() {
322        // Use dynamic subscription if already has symbols, otherwise initial setup
323        if connection.subscribed_symbols.is_empty() {
324          connection
325            .subscribe_symbols(&symbols, mode_clone)
326            .await
327            .map_err(|e| {
328              format!(
329                "Failed to subscribe on connection {:?}: {}",
330                connection_id, e
331              )
332            })?;
333        } else {
334          connection
335            .add_symbols(&symbols, mode_clone)
336            .await
337            .map_err(|e| {
338              format!(
339                "Failed to add symbols on connection {:?}: {}",
340                connection_id, e
341              )
342            })?;
343        }
344
345        log::info!(
346          "Subscribed {} symbols on connection {:?}",
347          symbols.len(),
348          connection_id
349        );
350      }
351    }
352
353    log::info!("Successfully subscribed to {} new symbols", symbols.len());
354    Ok(())
355  }
356
357  /// Find connection with available capacity using round-robin
358  fn find_available_connection(&mut self) -> Result<ChannelId, String> {
359    let _start_index = self.next_connection_index;
360
361    // Try round-robin allocation
362    for _ in 0..self.config.max_connections {
363      let connection = &self.connections[self.next_connection_index];
364
365      if connection
366        .can_accept_symbols(1, self.config.max_symbols_per_connection)
367      {
368        let channel_id = connection.id;
369        self.next_connection_index =
370          (self.next_connection_index + 1) % self.config.max_connections;
371        return Ok(channel_id);
372      }
373
374      self.next_connection_index =
375        (self.next_connection_index + 1) % self.config.max_connections;
376    }
377
378    Err("All connections are at capacity".to_string())
379  }
380
381  /// Get output channel for a specific connection
382  pub fn get_channel(
383    &mut self,
384    channel_id: ChannelId,
385  ) -> Option<broadcast::Receiver<TickerMessage>> {
386    if channel_id.to_index() < self.output_channels.len() {
387      Some(self.output_channels[channel_id.to_index()].resubscribe())
388    } else {
389      None
390    }
391  }
392
393  /// Get all output channels
394  pub fn get_all_channels(
395    &mut self,
396  ) -> Vec<(ChannelId, broadcast::Receiver<TickerMessage>)> {
397    let mut channels = Vec::new();
398
399    for (i, channel) in self.output_channels.iter().enumerate() {
400      if let Some(channel_id) = ChannelId::from_index(i) {
401        channels.push((channel_id, channel.resubscribe()));
402      }
403    }
404
405    channels
406  }
407
408  /// Get manager statistics
409  pub async fn get_stats(&self) -> Result<ManagerStats, String> {
410    if let Some(health_monitor) = &self.health_monitor {
411      Ok(health_monitor.get_manager_stats().await)
412    } else {
413      Err("Health monitor not available".to_string())
414    }
415  }
416
417  /// Get health summary
418  pub async fn get_health(&self) -> Result<HealthSummary, String> {
419    if let Some(health_monitor) = &self.health_monitor {
420      Ok(health_monitor.get_health_summary().await)
421    } else {
422      Err("Health monitor not available".to_string())
423    }
424  }
425
426  /// Get processor statistics for all channels
427  pub async fn get_processor_stats(&self) -> Vec<(ChannelId, ProcessorStats)> {
428    let mut stats = Vec::new();
429
430    for processor in &self.processors {
431      let processor_stats = processor.get_stats().await;
432      stats.push((processor.channel_id, processor_stats));
433    }
434
435    stats
436  }
437
438  /// Get symbol distribution across connections
439  pub fn get_symbol_distribution(&self) -> HashMap<ChannelId, Vec<u32>> {
440    let mut distribution: HashMap<ChannelId, Vec<u32>> = HashMap::new();
441
442    for (&symbol, &channel_id) in &self.symbol_mapping {
443      distribution.entry(channel_id).or_default().push(symbol);
444    }
445
446    distribution
447  }
448
449  /// Unsubscribe from symbols
450  pub async fn unsubscribe_symbols(
451    &mut self,
452    symbols: &[u32],
453  ) -> Result<(), String> {
454    log::info!("Unsubscribing from {} symbols", symbols.len());
455
456    // Group symbols by connection
457    let mut connection_symbols: HashMap<ChannelId, Vec<u32>> = HashMap::new();
458
459    for &symbol in symbols {
460      if let Some(&channel_id) = self.symbol_mapping.get(&symbol) {
461        connection_symbols
462          .entry(channel_id)
463          .or_default()
464          .push(symbol);
465        self.symbol_mapping.remove(&symbol);
466      } else {
467        log::debug!("Symbol {} not found in subscriptions", symbol);
468      }
469    }
470
471    // Unsubscribe from each connection
472    for (channel_id, symbols) in connection_symbols {
473      let connection = &mut self.connections[channel_id.to_index()];
474
475      if !symbols.is_empty() {
476        connection.remove_symbols(&symbols).await.map_err(|e| {
477          format!(
478            "Failed to unsubscribe from connection {:?}: {}",
479            channel_id, e
480          )
481        })?;
482
483        log::info!(
484          "Unsubscribed {} symbols from connection {:?}",
485          symbols.len(),
486          channel_id
487        );
488      }
489    }
490
491    log::info!("Successfully unsubscribed from {} symbols", symbols.len());
492    Ok(())
493  }
494
495  /// Dynamically change subscription mode for existing symbols
496  pub async fn change_mode(
497    &mut self,
498    symbols: &[u32],
499    mode: Mode,
500  ) -> Result<(), String> {
501    log::info!("Changing mode for {} symbols to {:?}", symbols.len(), mode);
502
503    // Group symbols by connection
504    let mut connection_symbols: HashMap<ChannelId, Vec<u32>> = HashMap::new();
505
506    for &symbol in symbols {
507      if let Some(&channel_id) = self.symbol_mapping.get(&symbol) {
508        connection_symbols
509          .entry(channel_id)
510          .or_default()
511          .push(symbol);
512      } else {
513        log::debug!("Symbol {} not found in subscriptions", symbol);
514      }
515    }
516
517    // Change mode on each connection
518    for (channel_id, symbols) in connection_symbols {
519      let connection = &mut self.connections[channel_id.to_index()];
520      if symbols.is_empty() {
521        continue;
522      }
523      // Send mode request directly via command sender if available
524      if let Some(ref cmd) = connection.cmd_tx {
525        let mode_req = crate::models::Request::mode(mode, &symbols).to_string();
526        let _ =
527          cmd.send(tokio_tungstenite::tungstenite::Message::Text(mode_req));
528        for &s in &symbols {
529          connection.subscribed_symbols.insert(s, mode);
530        }
531        log::info!(
532          "Changed mode for {} symbols on connection {:?}",
533          symbols.len(),
534          channel_id
535        );
536      } else if let Some(subscriber) = &mut connection.subscriber {
537        // fallback (should normally have command sender)
538        subscriber.set_mode(&symbols, mode).await.map_err(|e| {
539          format!(
540            "Failed to change mode on connection {:?}: {}",
541            channel_id, e
542          )
543        })?;
544        for &s in &symbols {
545          connection.subscribed_symbols.insert(s, mode);
546        }
547      }
548    }
549
550    log::info!("Successfully changed mode for {} symbols", symbols.len());
551    Ok(())
552  }
553
554  /// Stop the manager and all connections
555  pub async fn stop(&mut self) -> Result<(), String> {
556    log::info!("Stopping KiteTickerManager");
557
558    // Stop health monitor
559    if let Some(health_monitor) = &mut self.health_monitor {
560      health_monitor.stop().await;
561    }
562
563    // Stop all processors
564    for processor in &mut self.processors {
565      processor.stop().await;
566    }
567
568    // Stop all connections
569    for connection in &mut self.connections {
570      if let Some(handle) = connection.task_handle.take() {
571        handle.abort();
572        let _ = handle.await;
573      }
574    }
575
576    log::info!("KiteTickerManager stopped");
577    Ok(())
578  }
579}