kiteticker_async_manager/manager/
connection_manager.rs

1//! # Connection Manager Module
2//!
3//! This module contains the main `KiteTickerManager` which provides high-performance
4//! multi-connection WebSocket management for the Kite Connect ticker API.
5//!
6//! ## Features
7//!
8//! - **Multi-Connection Support**: Utilizes up to 3 WebSocket connections
9//! - **Dynamic Load Balancing**: Automatic symbol distribution across connections
10//! - **High-Performance Processing**: Dedicated parser tasks per connection
11//! - **Dynamic Subscriptions**: Runtime symbol addition/removal without reconnection
12//! - **Health Monitoring**: Real-time connection health tracking
13//! - **Error Resilience**: Comprehensive error handling and recovery
14
15use crate::manager::{
16  ChannelId, ConnectionStats, HealthMonitor, HealthSummary, KiteManagerConfig,
17  ManagedConnection, ManagerStats, MessageProcessor, ProcessorStats,
18};
19use crate::models::{Mode, TickerMessage};
20use std::collections::HashMap;
21use std::sync::Arc;
22use std::time::Instant;
23use tokio::sync::{broadcast, mpsc, RwLock};
24
/// High-performance multi-connection WebSocket manager for Kite ticker data
///
/// [`start()`](KiteTickerManager::start) opens `config.max_connections`
/// WebSocket connections and pairs each one with its own `MessageProcessor`
/// and broadcast output channel. New symbols are spread across the
/// connections with round-robin allocation.
#[derive(Debug)]
pub struct KiteTickerManager {
  /// Manager configuration
  config: KiteManagerConfig,

  /// Kite Connect API key used when opening each connection
  api_key: String,
  /// Kite Connect access token used when opening each connection
  access_token: String,

  /// WebSocket connections, filled by `start()` (one per configured
  /// connection, indexed by `ChannelId::to_index()`)
  connections: Vec<ManagedConnection>,

  /// Message processors (one per connection, same order as `connections`)
  processors: Vec<MessageProcessor>,

  /// Output channels (one per connection); callers receive fresh receivers
  /// created with `resubscribe()` rather than these stored ones
  output_channels: Vec<broadcast::Receiver<TickerMessage>>,

  /// Maps every symbol the manager has subscribed to the channel that
  /// carries it
  symbol_mapping: HashMap<u32, ChannelId>,

  /// Health monitor; `None` until `start()` creates one
  health_monitor: Option<HealthMonitor>,

  /// Next connection index for round-robin distribution
  next_connection_index: usize,

  /// Manager start time for uptime tracking (captured in `new()`, not
  /// currently read anywhere in this file)
  #[allow(dead_code)]
  start_time: Instant,
  /// If true, underlying connections operate in raw-only mode (no tick parsing)
  raw_only: bool,
}
63
/// Builder for `KiteTickerManager` providing a fluent API for configuration.
///
/// Collects the credentials, a `KiteManagerConfig` and the raw-only flag;
/// `build()` passes them to `KiteTickerManager::new` and
/// `KiteTickerManager::with_raw_only`.
#[derive(Debug, Clone)]
pub struct KiteTickerManagerBuilder {
  /// API key handed to the manager on `build()`
  api_key: String,
  /// Access token handed to the manager on `build()`
  access_token: String,
  /// Configuration being assembled; starts as `KiteManagerConfig::default()`
  config: KiteManagerConfig,
  /// Whether the built manager should run in raw-only mode
  raw_only: bool,
}
72
73impl KiteTickerManagerBuilder {
74  /// Create a new builder with mandatory credentials and default config
75  pub fn new(
76    api_key: impl Into<String>,
77    access_token: impl Into<String>,
78  ) -> Self {
79    Self {
80      api_key: api_key.into(),
81      access_token: access_token.into(),
82      config: KiteManagerConfig::default(),
83      raw_only: false,
84    }
85  }
86
87  pub fn max_connections(mut self, n: usize) -> Self {
88    self.config.max_connections = n;
89    self
90  }
91  pub fn max_symbols_per_connection(mut self, n: usize) -> Self {
92    self.config.max_symbols_per_connection = n;
93    self
94  }
95  pub fn connection_timeout(mut self, d: std::time::Duration) -> Self {
96    self.config.connection_timeout = d;
97    self
98  }
99  pub fn health_check_interval(mut self, d: std::time::Duration) -> Self {
100    self.config.health_check_interval = d;
101    self
102  }
103  pub fn reconnect_attempts(mut self, attempts: usize) -> Self {
104    self.config.max_reconnect_attempts = attempts;
105    self
106  }
107  pub fn reconnect_delay(mut self, d: std::time::Duration) -> Self {
108    self.config.reconnect_delay = d;
109    self
110  }
111  pub fn enable_dedicated_parsers(mut self, enable: bool) -> Self {
112    self.config.enable_dedicated_parsers = enable;
113    self
114  }
115  pub fn default_mode(mut self, mode: Mode) -> Self {
116    self.config.default_mode = mode;
117    self
118  }
119  pub fn connection_buffer_size(mut self, sz: usize) -> Self {
120    self.config.connection_buffer_size = sz;
121    self
122  }
123  pub fn parser_buffer_size(mut self, sz: usize) -> Self {
124    self.config.parser_buffer_size = sz;
125    self
126  }
127  pub fn raw_only(mut self, raw: bool) -> Self {
128    self.raw_only = raw;
129    self
130  }
131
132  /// Override entire config (advanced)
133  pub fn config(mut self, config: KiteManagerConfig) -> Self {
134    self.config = config;
135    self
136  }
137
138  /// Build the manager (not started yet)
139  pub fn build(self) -> KiteTickerManager {
140    KiteTickerManager::new(self.api_key, self.access_token, self.config)
141      .with_raw_only(self.raw_only)
142  }
143}
144
145impl KiteTickerManager {
146  /// Creates a new KiteTickerManager instance with the specified configuration
147  ///
148  /// This initializes the manager with the provided API credentials and configuration,
149  /// but does not start any connections. Call [`start()`](Self::start) to begin operation.
150  ///
151  /// # Arguments
152  ///
153  /// * `api_key` - Your Kite Connect API key
154  /// * `access_token` - Valid access token from Kite Connect
155  /// * `config` - Manager configuration settings
156  ///
157  /// # Example
158  ///
159  /// ```rust,no_run
160  /// use kiteticker_async_manager::{KiteTickerManager, KiteManagerConfig, Mode};
161  ///
162  /// let config = KiteManagerConfig {
163  ///     max_connections: 3,
164  ///     max_symbols_per_connection: 3000,
165  ///     enable_dedicated_parsers: true,
166  ///     default_mode: Mode::LTP,
167  ///     ..Default::default()
168  /// };
169  ///
170  /// let manager = KiteTickerManager::new(
171  ///     "your_api_key".to_string(),
172  ///     "your_access_token".to_string(),
173  ///     config,
174  /// );
175  /// ```
176  pub fn new(
177    api_key: String,
178    access_token: String,
179    config: KiteManagerConfig,
180  ) -> Self {
181    Self {
182      config,
183      api_key,
184      access_token,
185      connections: Vec::new(),
186      processors: Vec::new(),
187      output_channels: Vec::new(),
188      symbol_mapping: HashMap::new(),
189      health_monitor: None,
190      next_connection_index: 0,
191      start_time: Instant::now(),
192      raw_only: false,
193    }
194  }
195
196  /// Set raw-only mode (builder uses this)
197  pub fn with_raw_only(mut self, raw: bool) -> Self {
198    self.raw_only = raw;
199    self
200  }
201
202  /// Initialize all connections and start the manager
203  pub async fn start(&mut self) -> Result<(), String> {
204    log::info!(
205      "Starting KiteTickerManager with {} connections",
206      self.config.max_connections
207    );
208
209    // Create all connections and processors
210    for i in 0..self.config.max_connections {
211      let channel_id = ChannelId::from_index(i)
212        .ok_or_else(|| format!("Invalid connection index: {}", i))?;
213
214      // Create message channel between connection and processor
215      let (connection_sender, processor_receiver) = mpsc::unbounded_channel();
216
217      // Create managed connection
218      let mut connection =
219        ManagedConnection::new(channel_id, connection_sender);
220
221      // Connect to WebSocket
222      if self.raw_only {
223        connection
224          .connect_with_raw(
225            &self.api_key,
226            &self.access_token,
227            &self.config,
228            true,
229          )
230          .await
231          .map_err(|e| format!("Failed to connect WebSocket {}: {}", i, e))?;
232      } else {
233        connection
234          .connect(&self.api_key, &self.access_token, &self.config)
235          .await
236          .map_err(|e| format!("Failed to connect WebSocket {}: {}", i, e))?;
237      }
238
239      // Create message processor
240      let (mut processor, output_receiver) = MessageProcessor::new(
241        channel_id,
242        processor_receiver,
243        self.config.parser_buffer_size,
244      );
245
246      // Start processor if enabled
247      if self.config.enable_dedicated_parsers {
248        processor.start();
249        log::info!("Started dedicated parser for connection {}", i);
250      }
251
252      self.connections.push(connection);
253      self.processors.push(processor);
254      self.output_channels.push(output_receiver);
255    }
256
257    // Start health monitoring
258    if self.config.health_check_interval.as_secs() > 0 {
259      let connection_stats: Vec<Arc<RwLock<ConnectionStats>>> = self
260        .connections
261        .iter()
262        .map(|c| Arc::clone(&c.stats))
263        .collect();
264
265      let mut health_monitor =
266        HealthMonitor::new(connection_stats, self.config.health_check_interval);
267      health_monitor.start();
268      self.health_monitor = Some(health_monitor);
269
270      log::info!("Started health monitor");
271    }
272
273    log::info!(
274      "KiteTickerManager started successfully with {} connections",
275      self.connections.len()
276    );
277
278    Ok(())
279  }
280
281  /// Subscribe to symbols using round-robin distribution
282  pub async fn subscribe_symbols(
283    &mut self,
284    symbols: &[u32],
285    mode: Option<Mode>,
286  ) -> Result<(), String> {
287    let mode = mode.unwrap_or(self.config.default_mode);
288
289    log::info!(
290      "Subscribing to {} symbols with mode: {:?}",
291      symbols.len(),
292      mode
293    );
294
295    // Group symbols by connection using round-robin
296    let mut connection_symbols: HashMap<ChannelId, Vec<u32>> = HashMap::new();
297
298    for &symbol in symbols {
299      // Skip if already subscribed
300      if self.symbol_mapping.contains_key(&symbol) {
301        log::debug!("Symbol {} already subscribed", symbol);
302        continue;
303      }
304
305      // Find connection with available capacity
306      let connection_id = self.find_available_connection()?;
307
308      // Add to mapping
309      self.symbol_mapping.insert(symbol, connection_id);
310      connection_symbols
311        .entry(connection_id)
312        .or_default()
313        .push(symbol);
314    }
315
316    // Subscribe symbols on each connection
317    for (connection_id, symbols) in connection_symbols {
318      let connection = &mut self.connections[connection_id.to_index()];
319      let mode_clone = mode; // Clone for each connection
320
321      if !symbols.is_empty() {
322        // Use dynamic subscription if already has symbols, otherwise initial setup
323        if connection.subscribed_symbols.is_empty() {
324          // First-time subscription on this connection: create subscriber
325          connection
326            .subscribe_symbols(&symbols, mode_clone)
327            .await
328            .map_err(|e| {
329              format!(
330                "Failed to subscribe on connection {:?}: {}",
331                connection_id, e
332              )
333            })?;
334
335          // IMPORTANT: Start forwarding messages from the subscriber to the processor
336          connection.start_message_processing().await.map_err(|e| {
337            format!(
338              "Failed to start message processing on connection {:?}: {}",
339              connection_id, e
340            )
341          })?;
342        } else {
343          connection
344            .add_symbols(&symbols, mode_clone)
345            .await
346            .map_err(|e| {
347              format!(
348                "Failed to add symbols on connection {:?}: {}",
349                connection_id, e
350              )
351            })?;
352        }
353
354        log::info!(
355          "Subscribed {} symbols on connection {:?}",
356          symbols.len(),
357          connection_id
358        );
359      }
360    }
361
362    log::info!("Successfully subscribed to {} new symbols", symbols.len());
363    Ok(())
364  }
365
366  /// Find connection with available capacity using round-robin
367  fn find_available_connection(&mut self) -> Result<ChannelId, String> {
368    let _start_index = self.next_connection_index;
369
370    // Try round-robin allocation
371    for _ in 0..self.config.max_connections {
372      let connection = &self.connections[self.next_connection_index];
373
374      if connection
375        .can_accept_symbols(1, self.config.max_symbols_per_connection)
376      {
377        let channel_id = connection.id;
378        self.next_connection_index =
379          (self.next_connection_index + 1) % self.config.max_connections;
380        return Ok(channel_id);
381      }
382
383      self.next_connection_index =
384        (self.next_connection_index + 1) % self.config.max_connections;
385    }
386
387    Err("All connections are at capacity".to_string())
388  }
389
390  /// Get output channel for a specific connection
391  pub fn get_channel(
392    &mut self,
393    channel_id: ChannelId,
394  ) -> Option<broadcast::Receiver<TickerMessage>> {
395    if channel_id.to_index() < self.output_channels.len() {
396      Some(self.output_channels[channel_id.to_index()].resubscribe())
397    } else {
398      None
399    }
400  }
401
402  /// Get all output channels
403  pub fn get_all_channels(
404    &mut self,
405  ) -> Vec<(ChannelId, broadcast::Receiver<TickerMessage>)> {
406    let mut channels = Vec::new();
407
408    for (i, channel) in self.output_channels.iter().enumerate() {
409      if let Some(channel_id) = ChannelId::from_index(i) {
410        channels.push((channel_id, channel.resubscribe()));
411      }
412    }
413
414    channels
415  }
416
417  /// Get manager statistics
418  pub async fn get_stats(&self) -> Result<ManagerStats, String> {
419    if let Some(health_monitor) = &self.health_monitor {
420      Ok(health_monitor.get_manager_stats().await)
421    } else {
422      Err("Health monitor not available".to_string())
423    }
424  }
425
426  /// Get health summary
427  pub async fn get_health(&self) -> Result<HealthSummary, String> {
428    if let Some(health_monitor) = &self.health_monitor {
429      Ok(health_monitor.get_health_summary().await)
430    } else {
431      Err("Health monitor not available".to_string())
432    }
433  }
434
435  /// Get processor statistics for all channels
436  pub async fn get_processor_stats(&self) -> Vec<(ChannelId, ProcessorStats)> {
437    let mut stats = Vec::new();
438
439    for processor in &self.processors {
440      let processor_stats = processor.get_stats().await;
441      stats.push((processor.channel_id, processor_stats));
442    }
443
444    stats
445  }
446
447  /// Get symbol distribution across connections
448  pub fn get_symbol_distribution(&self) -> HashMap<ChannelId, Vec<u32>> {
449    let mut distribution: HashMap<ChannelId, Vec<u32>> = HashMap::new();
450
451    for (&symbol, &channel_id) in &self.symbol_mapping {
452      distribution.entry(channel_id).or_default().push(symbol);
453    }
454
455    distribution
456  }
457
458  /// Unsubscribe from symbols
459  pub async fn unsubscribe_symbols(
460    &mut self,
461    symbols: &[u32],
462  ) -> Result<(), String> {
463    log::info!("Unsubscribing from {} symbols", symbols.len());
464
465    // Group symbols by connection
466    let mut connection_symbols: HashMap<ChannelId, Vec<u32>> = HashMap::new();
467
468    for &symbol in symbols {
469      if let Some(&channel_id) = self.symbol_mapping.get(&symbol) {
470        connection_symbols
471          .entry(channel_id)
472          .or_default()
473          .push(symbol);
474        self.symbol_mapping.remove(&symbol);
475      } else {
476        log::debug!("Symbol {} not found in subscriptions", symbol);
477      }
478    }
479
480    // Unsubscribe from each connection
481    for (channel_id, symbols) in connection_symbols {
482      let connection = &mut self.connections[channel_id.to_index()];
483
484      if !symbols.is_empty() {
485        connection.remove_symbols(&symbols).await.map_err(|e| {
486          format!(
487            "Failed to unsubscribe from connection {:?}: {}",
488            channel_id, e
489          )
490        })?;
491
492        log::info!(
493          "Unsubscribed {} symbols from connection {:?}",
494          symbols.len(),
495          channel_id
496        );
497      }
498    }
499
500    log::info!("Successfully unsubscribed from {} symbols", symbols.len());
501    Ok(())
502  }
503
504  /// Dynamically change subscription mode for existing symbols
505  pub async fn change_mode(
506    &mut self,
507    symbols: &[u32],
508    mode: Mode,
509  ) -> Result<(), String> {
510    log::info!("Changing mode for {} symbols to {:?}", symbols.len(), mode);
511
512    // Group symbols by connection
513    let mut connection_symbols: HashMap<ChannelId, Vec<u32>> = HashMap::new();
514
515    for &symbol in symbols {
516      if let Some(&channel_id) = self.symbol_mapping.get(&symbol) {
517        connection_symbols
518          .entry(channel_id)
519          .or_default()
520          .push(symbol);
521      } else {
522        log::debug!("Symbol {} not found in subscriptions", symbol);
523      }
524    }
525
526    // Change mode on each connection
527    for (channel_id, symbols) in connection_symbols {
528      let connection = &mut self.connections[channel_id.to_index()];
529      if symbols.is_empty() {
530        continue;
531      }
532      // Send mode request directly via command sender if available
533      if let Some(ref cmd) = connection.cmd_tx {
534        let mode_req = crate::models::Request::mode(mode, &symbols).to_string();
535        let _ =
536          cmd.send(tokio_tungstenite::tungstenite::Message::Text(mode_req));
537        for &s in &symbols {
538          connection.subscribed_symbols.insert(s, mode);
539        }
540        log::info!(
541          "Changed mode for {} symbols on connection {:?}",
542          symbols.len(),
543          channel_id
544        );
545      } else if let Some(subscriber) = &mut connection.subscriber {
546        // fallback (should normally have command sender)
547        subscriber.set_mode(&symbols, mode).await.map_err(|e| {
548          format!(
549            "Failed to change mode on connection {:?}: {}",
550            channel_id, e
551          )
552        })?;
553        for &s in &symbols {
554          connection.subscribed_symbols.insert(s, mode);
555        }
556      }
557    }
558
559    log::info!("Successfully changed mode for {} symbols", symbols.len());
560    Ok(())
561  }
562
563  /// Stop the manager and all connections
564  pub async fn stop(&mut self) -> Result<(), String> {
565    log::info!("Stopping KiteTickerManager");
566
567    // Stop health monitor
568    if let Some(health_monitor) = &mut self.health_monitor {
569      health_monitor.stop().await;
570    }
571
572    // Stop all processors
573    for processor in &mut self.processors {
574      processor.stop().await;
575    }
576
577    // Stop all connections
578    for connection in &mut self.connections {
579      if let Some(handle) = connection.task_handle.take() {
580        handle.abort();
581        let _ = handle.await;
582      }
583    }
584
585    log::info!("KiteTickerManager stopped");
586    Ok(())
587  }
588}