kiteticker_async_manager/manager/
connection_manager.rs

1//! # Connection Manager Module
2//!
3//! This module contains the main `KiteTickerManager` which provides high-performance
4//! multi-connection WebSocket management for the Kite Connect ticker API.
5//!
6//! ## Features
7//!
8//! - **Multi-Connection Support**: Utilizes up to 3 WebSocket connections
9//! - **Dynamic Load Balancing**: Automatic symbol distribution across connections
10//! - **High-Performance Processing**: Dedicated parser tasks per connection
11//! - **Dynamic Subscriptions**: Runtime symbol addition/removal without reconnection
12//! - **Health Monitoring**: Real-time connection health tracking
13//! - **Error Resilience**: Comprehensive error handling and recovery
14
15use crate::manager::{
16  ChannelId, ConnectionStats, HealthMonitor, HealthSummary, KiteManagerConfig,
17  ManagedConnection, ManagerStats, MessageProcessor, ProcessorStats,
18};
19use crate::models::{Mode, TickerMessage};
20use std::collections::HashMap;
21use std::sync::Arc;
22use std::time::Instant;
23use tokio::sync::{broadcast, mpsc, RwLock};
24
25/// High-performance multi-connection WebSocket manager for Kite ticker data
26///
27/// This manager creates 3 independent WebSocket connections and distributes symbols
28/// across them using round-robin allocation. Each connection has its own dedicated
29/// parser task for maximum performance.
30#[derive(Debug)]
31pub struct KiteTickerManager {
32  /// Manager configuration
33  config: KiteManagerConfig,
34
35  /// API credentials
36  api_key: String,
37  access_token: String,
38
39  /// WebSocket connections (up to 3)
40  connections: Vec<ManagedConnection>,
41
42  /// Message processors (one per connection)
43  processors: Vec<MessageProcessor>,
44
45  /// Output channels (one per connection)
46  output_channels: Vec<broadcast::Receiver<TickerMessage>>,
47
48  /// Symbol to connection mapping
49  symbol_mapping: HashMap<u32, ChannelId>,
50
51  /// Health monitor
52  health_monitor: Option<HealthMonitor>,
53
54  /// Next connection index for round-robin distribution
55  next_connection_index: usize,
56
57  /// Manager start time for uptime tracking
58  #[allow(dead_code)]
59  start_time: Instant,
60  /// If true, underlying connections operate in raw-only mode (no tick parsing)
61  raw_only: bool,
62}
63
64/// Builder for `KiteTickerManager` providing a fluent API for configuration.
65#[derive(Debug, Clone)]
66pub struct KiteTickerManagerBuilder {
67  api_key: String,
68  access_token: String,
69  config: KiteManagerConfig,
70  raw_only: bool,
71}
72
73impl KiteTickerManagerBuilder {
74  /// Create a new builder with mandatory credentials and default config
75  pub fn new(
76    api_key: impl Into<String>,
77    access_token: impl Into<String>,
78  ) -> Self {
79    Self {
80      api_key: api_key.into(),
81      access_token: access_token.into(),
82      config: KiteManagerConfig::default(),
83      raw_only: false,
84    }
85  }
86
87  pub fn max_connections(mut self, n: usize) -> Self {
88    self.config.max_connections = n;
89    self
90  }
91  pub fn max_symbols_per_connection(mut self, n: usize) -> Self {
92    self.config.max_symbols_per_connection = n;
93    self
94  }
95  pub fn connection_timeout(mut self, d: std::time::Duration) -> Self {
96    self.config.connection_timeout = d;
97    self
98  }
99  pub fn health_check_interval(mut self, d: std::time::Duration) -> Self {
100    self.config.health_check_interval = d;
101    self
102  }
103  pub fn reconnect_attempts(mut self, attempts: usize) -> Self {
104    self.config.max_reconnect_attempts = attempts;
105    self
106  }
107  pub fn reconnect_delay(mut self, d: std::time::Duration) -> Self {
108    self.config.reconnect_delay = d;
109    self
110  }
111  pub fn enable_dedicated_parsers(mut self, enable: bool) -> Self {
112    self.config.enable_dedicated_parsers = enable;
113    self
114  }
115  pub fn default_mode(mut self, mode: Mode) -> Self {
116    self.config.default_mode = mode;
117    self
118  }
119  pub fn connection_buffer_size(mut self, sz: usize) -> Self {
120    self.config.connection_buffer_size = sz;
121    self
122  }
123  pub fn parser_buffer_size(mut self, sz: usize) -> Self {
124    self.config.parser_buffer_size = sz;
125    self
126  }
127  pub fn raw_only(mut self, raw: bool) -> Self {
128    self.raw_only = raw;
129    self
130  }
131
132  /// Override entire config (advanced)
133  pub fn config(mut self, config: KiteManagerConfig) -> Self {
134    self.config = config;
135    self
136  }
137
138  /// Build the manager (not started yet)
139  pub fn build(self) -> KiteTickerManager {
140    KiteTickerManager::new(self.api_key, self.access_token, self.config)
141      .with_raw_only(self.raw_only)
142  }
143}
144
145impl KiteTickerManager {
146  /// Creates a new KiteTickerManager instance with the specified configuration
147  ///
148  /// This initializes the manager with the provided API credentials and configuration,
149  /// but does not start any connections. Call [`start()`](Self::start) to begin operation.
150  ///
151  /// # Arguments
152  ///
153  /// * `api_key` - Your Kite Connect API key
154  /// * `access_token` - Valid access token from Kite Connect
155  /// * `config` - Manager configuration settings
156  ///
157  /// # Example
158  ///
159  /// ```rust,no_run
160  /// use kiteticker_async_manager::{KiteTickerManager, KiteManagerConfig, Mode};
161  ///
162  /// let config = KiteManagerConfig {
163  ///     max_connections: 3,
164  ///     max_symbols_per_connection: 3000,
165  ///     enable_dedicated_parsers: true,
166  ///     default_mode: Mode::LTP,
167  ///     ..Default::default()
168  /// };
169  ///
170  /// let manager = KiteTickerManager::new(
171  ///     "your_api_key".to_string(),
172  ///     "your_access_token".to_string(),
173  ///     config,
174  /// );
175  /// ```
176  pub fn new(
177    api_key: String,
178    access_token: String,
179    config: KiteManagerConfig,
180  ) -> Self {
181    Self {
182      config,
183      api_key,
184      access_token,
185      connections: Vec::new(),
186      processors: Vec::new(),
187      output_channels: Vec::new(),
188      symbol_mapping: HashMap::new(),
189      health_monitor: None,
190      next_connection_index: 0,
191      start_time: Instant::now(),
192      raw_only: false,
193    }
194  }
195
196  /// Set raw-only mode (builder uses this)
197  pub fn with_raw_only(mut self, raw: bool) -> Self {
198    self.raw_only = raw;
199    self
200  }
201
202  /// Initialize all connections and start the manager
203  pub async fn start(&mut self) -> Result<(), String> {
204    log::info!(
205      "Starting KiteTickerManager with {} connections",
206      self.config.max_connections
207    );
208
209    // Create all connections and processors
210    for i in 0..self.config.max_connections {
211      let channel_id = ChannelId::from_index(i)
212        .ok_or_else(|| format!("Invalid connection index: {}", i))?;
213
214      // Create message channel between connection and processor
215      let (connection_sender, processor_receiver) = mpsc::unbounded_channel();
216
217      // Create managed connection
218      let mut connection =
219        ManagedConnection::new(channel_id, connection_sender);
220
221      // Connect to WebSocket
222      if self.raw_only {
223        connection
224          .connect_with_raw(
225            &self.api_key,
226            &self.access_token,
227            &self.config,
228            true,
229          )
230          .await
231          .map_err(|e| format!("Failed to connect WebSocket {}: {}", i, e))?;
232      } else {
233        connection
234          .connect(&self.api_key, &self.access_token, &self.config)
235          .await
236          .map_err(|e| format!("Failed to connect WebSocket {}: {}", i, e))?;
237      }
238
239      // Create message processor
240      let (mut processor, output_receiver) = MessageProcessor::new(
241        channel_id,
242        processor_receiver,
243        self.config.parser_buffer_size,
244      );
245
246      // Start processor if enabled
247      if self.config.enable_dedicated_parsers {
248        processor.start();
249        log::info!("Started dedicated parser for connection {}", i);
250      }
251
252      self.connections.push(connection);
253      self.processors.push(processor);
254      self.output_channels.push(output_receiver);
255    }
256
257    // Start health monitoring
258    if self.config.health_check_interval.as_secs() > 0 {
259      let connection_stats: Vec<Arc<RwLock<ConnectionStats>>> = self
260        .connections
261        .iter()
262        .map(|c| Arc::clone(&c.stats))
263        .collect();
264
265      let mut health_monitor =
266        HealthMonitor::new(connection_stats, self.config.health_check_interval);
267      health_monitor.start();
268      self.health_monitor = Some(health_monitor);
269
270      log::info!("Started health monitor");
271    }
272
273    log::info!(
274      "KiteTickerManager started successfully with {} connections",
275      self.connections.len()
276    );
277
278    Ok(())
279  }
280
281  /// Subscribe to symbols using round-robin distribution
282  pub async fn subscribe_symbols(
283    &mut self,
284    symbols: &[u32],
285    mode: Option<Mode>,
286  ) -> Result<(), String> {
287    let mode = mode.unwrap_or(self.config.default_mode);
288
289    log::info!(
290      "Subscribing to {} symbols with mode: {:?}",
291      symbols.len(),
292      mode
293    );
294
295    // Group symbols by connection using round-robin
296    let mut connection_symbols: HashMap<ChannelId, Vec<u32>> = HashMap::new();
297
298    for &symbol in symbols {
299      // Skip if already subscribed
300      if self.symbol_mapping.contains_key(&symbol) {
301        log::debug!("Symbol {} already subscribed", symbol);
302        continue;
303      }
304
305      // Find connection with available capacity
306      let connection_id = self.find_available_connection()?;
307
308      // Add to mapping
309      self.symbol_mapping.insert(symbol, connection_id);
310      connection_symbols
311        .entry(connection_id)
312        .or_default()
313        .push(symbol);
314    }
315
316    // Subscribe symbols on each connection
317    for (connection_id, symbols) in connection_symbols {
318      let connection = &mut self.connections[connection_id.to_index()];
319      let mode_clone = mode; // Clone for each connection
320
321      if !symbols.is_empty() {
322        // Use dynamic subscription if already has symbols, otherwise initial setup
323        if connection.subscribed_symbols.is_empty() {
324          connection
325            .subscribe_symbols(&symbols, mode_clone)
326            .await
327            .map_err(|e| {
328              format!(
329                "Failed to subscribe on connection {:?}: {}",
330                connection_id, e
331              )
332            })?;
333
334          // Start message processing after initial subscription
335          connection.start_message_processing().await.map_err(|e| {
336            format!(
337              "Failed to start message processing on connection {:?}: {}",
338              connection_id, e
339            )
340          })?;
341        } else {
342          connection
343            .add_symbols(&symbols, mode_clone)
344            .await
345            .map_err(|e| {
346              format!(
347                "Failed to add symbols on connection {:?}: {}",
348                connection_id, e
349              )
350            })?;
351        }
352
353        log::info!(
354          "Subscribed {} symbols on connection {:?}",
355          symbols.len(),
356          connection_id
357        );
358      }
359    }
360
361    log::info!("Successfully subscribed to {} new symbols", symbols.len());
362    Ok(())
363  }
364
365  /// Find connection with available capacity using round-robin
366  fn find_available_connection(&mut self) -> Result<ChannelId, String> {
367    let _start_index = self.next_connection_index;
368
369    // Try round-robin allocation
370    for _ in 0..self.config.max_connections {
371      let connection = &self.connections[self.next_connection_index];
372
373      if connection
374        .can_accept_symbols(1, self.config.max_symbols_per_connection)
375      {
376        let channel_id = connection.id;
377        self.next_connection_index =
378          (self.next_connection_index + 1) % self.config.max_connections;
379        return Ok(channel_id);
380      }
381
382      self.next_connection_index =
383        (self.next_connection_index + 1) % self.config.max_connections;
384    }
385
386    Err("All connections are at capacity".to_string())
387  }
388
389  /// Get output channel for a specific connection
390  pub fn get_channel(
391    &mut self,
392    channel_id: ChannelId,
393  ) -> Option<broadcast::Receiver<TickerMessage>> {
394    if channel_id.to_index() < self.output_channels.len() {
395      Some(self.output_channels[channel_id.to_index()].resubscribe())
396    } else {
397      None
398    }
399  }
400
401  /// Get all output channels
402  pub fn get_all_channels(
403    &mut self,
404  ) -> Vec<(ChannelId, broadcast::Receiver<TickerMessage>)> {
405    let mut channels = Vec::new();
406
407    for (i, channel) in self.output_channels.iter().enumerate() {
408      if let Some(channel_id) = ChannelId::from_index(i) {
409        channels.push((channel_id, channel.resubscribe()));
410      }
411    }
412
413    channels
414  }
415
416  /// Get manager statistics
417  pub async fn get_stats(&self) -> Result<ManagerStats, String> {
418    if let Some(health_monitor) = &self.health_monitor {
419      Ok(health_monitor.get_manager_stats().await)
420    } else {
421      Err("Health monitor not available".to_string())
422    }
423  }
424
425  /// Get health summary
426  pub async fn get_health(&self) -> Result<HealthSummary, String> {
427    if let Some(health_monitor) = &self.health_monitor {
428      Ok(health_monitor.get_health_summary().await)
429    } else {
430      Err("Health monitor not available".to_string())
431    }
432  }
433
434  /// Get processor statistics for all channels
435  pub async fn get_processor_stats(&self) -> Vec<(ChannelId, ProcessorStats)> {
436    let mut stats = Vec::new();
437
438    for processor in &self.processors {
439      let processor_stats = processor.get_stats().await;
440      stats.push((processor.channel_id, processor_stats));
441    }
442
443    stats
444  }
445
446  /// Get symbol distribution across connections
447  pub fn get_symbol_distribution(&self) -> HashMap<ChannelId, Vec<u32>> {
448    let mut distribution: HashMap<ChannelId, Vec<u32>> = HashMap::new();
449
450    for (&symbol, &channel_id) in &self.symbol_mapping {
451      distribution.entry(channel_id).or_default().push(symbol);
452    }
453
454    distribution
455  }
456
457  /// Unsubscribe from symbols
458  pub async fn unsubscribe_symbols(
459    &mut self,
460    symbols: &[u32],
461  ) -> Result<(), String> {
462    log::info!("Unsubscribing from {} symbols", symbols.len());
463
464    // Group symbols by connection
465    let mut connection_symbols: HashMap<ChannelId, Vec<u32>> = HashMap::new();
466
467    for &symbol in symbols {
468      if let Some(&channel_id) = self.symbol_mapping.get(&symbol) {
469        connection_symbols
470          .entry(channel_id)
471          .or_default()
472          .push(symbol);
473        self.symbol_mapping.remove(&symbol);
474      } else {
475        log::debug!("Symbol {} not found in subscriptions", symbol);
476      }
477    }
478
479    // Unsubscribe from each connection
480    for (channel_id, symbols) in connection_symbols {
481      let connection = &mut self.connections[channel_id.to_index()];
482
483      if !symbols.is_empty() {
484        connection.remove_symbols(&symbols).await.map_err(|e| {
485          format!(
486            "Failed to unsubscribe from connection {:?}: {}",
487            channel_id, e
488          )
489        })?;
490
491        log::info!(
492          "Unsubscribed {} symbols from connection {:?}",
493          symbols.len(),
494          channel_id
495        );
496      }
497    }
498
499    log::info!("Successfully unsubscribed from {} symbols", symbols.len());
500    Ok(())
501  }
502
503  /// Dynamically change subscription mode for existing symbols
504  pub async fn change_mode(
505    &mut self,
506    symbols: &[u32],
507    mode: Mode,
508  ) -> Result<(), String> {
509    log::info!("Changing mode for {} symbols to {:?}", symbols.len(), mode);
510
511    // Group symbols by connection
512    let mut connection_symbols: HashMap<ChannelId, Vec<u32>> = HashMap::new();
513
514    for &symbol in symbols {
515      if let Some(&channel_id) = self.symbol_mapping.get(&symbol) {
516        connection_symbols
517          .entry(channel_id)
518          .or_default()
519          .push(symbol);
520      } else {
521        log::debug!("Symbol {} not found in subscriptions", symbol);
522      }
523    }
524
525    // Change mode on each connection
526    for (channel_id, symbols) in connection_symbols {
527      let connection = &mut self.connections[channel_id.to_index()];
528      if symbols.is_empty() {
529        continue;
530      }
531      // Send mode request directly via command sender if available
532      if let Some(ref cmd) = connection.cmd_tx {
533        let mode_req = crate::models::Request::mode(mode, &symbols).to_string();
534        let _ =
535          cmd.send(tokio_tungstenite::tungstenite::Message::Text(mode_req));
536        for &s in &symbols {
537          connection.subscribed_symbols.insert(s, mode);
538        }
539        log::info!(
540          "Changed mode for {} symbols on connection {:?}",
541          symbols.len(),
542          channel_id
543        );
544      } else if let Some(subscriber) = &mut connection.subscriber {
545        // fallback (should normally have command sender)
546        subscriber.set_mode(&symbols, mode).await.map_err(|e| {
547          format!(
548            "Failed to change mode on connection {:?}: {}",
549            channel_id, e
550          )
551        })?;
552        for &s in &symbols {
553          connection.subscribed_symbols.insert(s, mode);
554        }
555      }
556    }
557
558    log::info!("Successfully changed mode for {} symbols", symbols.len());
559    Ok(())
560  }
561
562  /// Stop the manager and all connections
563  pub async fn stop(&mut self) -> Result<(), String> {
564    log::info!("Stopping KiteTickerManager");
565
566    // Stop health monitor
567    if let Some(health_monitor) = &mut self.health_monitor {
568      health_monitor.stop().await;
569    }
570
571    // Stop all processors
572    for processor in &mut self.processors {
573      processor.stop().await;
574    }
575
576    // Stop all connections
577    for connection in &mut self.connections {
578      if let Some(handle) = connection.task_handle.take() {
579        handle.abort();
580        let _ = handle.await;
581      }
582    }
583
584    log::info!("KiteTickerManager stopped");
585    Ok(())
586  }
587}