kiteticker_async_manager/manager/
connection_manager.rs

1//! # Connection Manager Module
2//!
3//! This module contains the main `KiteTickerManager` which provides high-performance
4//! multi-connection WebSocket management for the Kite Connect ticker API.
5//!
6//! ## Features
7//!
8//! - **Multi-Connection Support**: Utilizes up to 3 WebSocket connections
9//! - **Dynamic Load Balancing**: Automatic symbol distribution across connections
10//! - **High-Performance Processing**: Dedicated parser tasks per connection
11//! - **Dynamic Subscriptions**: Runtime symbol addition/removal without reconnection
12//! - **Health Monitoring**: Real-time connection health tracking
13//! - **Error Resilience**: Comprehensive error handling and recovery
14
15use crate::manager::{
16  ChannelId, ConnectionStats, HealthMonitor, HealthSummary, KiteManagerConfig,
17  ManagedConnection, ManagerStats, MessageProcessor, ProcessorStats,
18};
19use crate::models::{Mode, TickerMessage};
20use std::collections::HashMap;
21use std::sync::Arc;
22use std::time::Instant;
23use tokio::sync::{broadcast, mpsc, RwLock};
24
25/// High-performance multi-connection WebSocket manager for Kite ticker data
26///
27/// This manager creates 3 independent WebSocket connections and distributes symbols
28/// across them using round-robin allocation. Each connection has its own dedicated
29/// parser task for maximum performance.
30#[derive(Debug)]
31pub struct KiteTickerManager {
32  /// Manager configuration
33  config: KiteManagerConfig,
34
35  /// API credentials
36  api_key: String,
37  access_token: String,
38
39  /// WebSocket connections (up to 3)
40  connections: Vec<ManagedConnection>,
41
42  /// Message processors (one per connection)
43  processors: Vec<MessageProcessor>,
44
45  /// Output channels (one per connection)
46  output_channels: Vec<broadcast::Receiver<TickerMessage>>,
47
48  /// Symbol to connection mapping
49  symbol_mapping: HashMap<u32, ChannelId>,
50
51  /// Health monitor
52  health_monitor: Option<HealthMonitor>,
53
54  /// Next connection index for round-robin distribution
55  next_connection_index: usize,
56
57  /// Manager start time for uptime tracking
58  #[allow(dead_code)]
59  start_time: Instant,
60  /// If true, underlying connections operate in raw-only mode (no tick parsing)
61  raw_only: bool,
62}
63
64/// Builder for `KiteTickerManager` providing a fluent API for configuration.
65#[derive(Debug, Clone)]
66pub struct KiteTickerManagerBuilder {
67  api_key: String,
68  access_token: String,
69  config: KiteManagerConfig,
70  raw_only: bool,
71}
72
73impl KiteTickerManagerBuilder {
74  /// Create a new builder with mandatory credentials and default config
75  pub fn new(
76    api_key: impl Into<String>,
77    access_token: impl Into<String>,
78  ) -> Self {
79    Self {
80      api_key: api_key.into(),
81      access_token: access_token.into(),
82      config: KiteManagerConfig::default(),
83      raw_only: false,
84    }
85  }
86
87  pub fn max_connections(mut self, n: usize) -> Self {
88    self.config.max_connections = n;
89    self
90  }
91  pub fn max_symbols_per_connection(mut self, n: usize) -> Self {
92    self.config.max_symbols_per_connection = n;
93    self
94  }
95  pub fn connection_timeout(mut self, d: std::time::Duration) -> Self {
96    self.config.connection_timeout = d;
97    self
98  }
99  pub fn health_check_interval(mut self, d: std::time::Duration) -> Self {
100    self.config.health_check_interval = d;
101    self
102  }
103  pub fn reconnect_attempts(mut self, attempts: usize) -> Self {
104    self.config.max_reconnect_attempts = attempts;
105    self
106  }
107  pub fn reconnect_delay(mut self, d: std::time::Duration) -> Self {
108    self.config.reconnect_delay = d;
109    self
110  }
111  pub fn enable_dedicated_parsers(mut self, enable: bool) -> Self {
112    self.config.enable_dedicated_parsers = enable;
113    self
114  }
115  pub fn default_mode(mut self, mode: Mode) -> Self {
116    self.config.default_mode = mode;
117    self
118  }
119  pub fn connection_buffer_size(mut self, sz: usize) -> Self {
120    self.config.connection_buffer_size = sz;
121    self
122  }
123  pub fn parser_buffer_size(mut self, sz: usize) -> Self {
124    self.config.parser_buffer_size = sz;
125    self
126  }
127  pub fn raw_only(mut self, raw: bool) -> Self {
128    self.raw_only = raw;
129    self
130  }
131
132  /// Override entire config (advanced)
133  pub fn config(mut self, config: KiteManagerConfig) -> Self {
134    self.config = config;
135    self
136  }
137
138  /// Build the manager (not started yet)
139  pub fn build(self) -> KiteTickerManager {
140    KiteTickerManager::new(self.api_key, self.access_token, self.config)
141      .with_raw_only(self.raw_only)
142  }
143}
144
145impl KiteTickerManager {
146  /// Creates a new KiteTickerManager instance with the specified configuration
147  ///
148  /// This initializes the manager with the provided API credentials and configuration,
149  /// but does not start any connections. Call [`start()`](Self::start) to begin operation.
150  ///
151  /// # Arguments
152  ///
153  /// * `api_key` - Your Kite Connect API key
154  /// * `access_token` - Valid access token from Kite Connect
155  /// * `config` - Manager configuration settings
156  ///
157  /// # Example
158  ///
159  /// ```rust,no_run
160  /// use kiteticker_async_manager::{KiteTickerManager, KiteManagerConfig, Mode};
161  ///
162  /// let config = KiteManagerConfig {
163  ///     max_connections: 3,
164  ///     max_symbols_per_connection: 3000,
165  ///     enable_dedicated_parsers: true,
166  ///     default_mode: Mode::LTP,
167  ///     ..Default::default()
168  /// };
169  ///
170  /// let manager = KiteTickerManager::new(
171  ///     "your_api_key".to_string(),
172  ///     "your_access_token".to_string(),
173  ///     config,
174  /// );
175  /// ```
176  pub fn new(
177    api_key: String,
178    access_token: String,
179    config: KiteManagerConfig,
180  ) -> Self {
181    Self {
182      config,
183      api_key,
184      access_token,
185      connections: Vec::new(),
186      processors: Vec::new(),
187      output_channels: Vec::new(),
188      symbol_mapping: HashMap::new(),
189      health_monitor: None,
190      next_connection_index: 0,
191      start_time: Instant::now(),
192      raw_only: false,
193    }
194  }
195
196  /// Set raw-only mode (builder uses this)
197  pub fn with_raw_only(mut self, raw: bool) -> Self {
198    self.raw_only = raw;
199    self
200  }
201
202  /// Initialize all connections and start the manager
203  pub async fn start(&mut self) -> Result<(), String> {
204    log::info!(
205      "Starting KiteTickerManager with {} connections",
206      self.config.max_connections
207    );
208
209    // Create all connections and processors
210    for i in 0..self.config.max_connections {
211      let channel_id = ChannelId::from_index(i)
212        .ok_or_else(|| format!("Invalid connection index: {}", i))?;
213
214      // Create message channel between connection and processor
215      let (connection_sender, processor_receiver) = mpsc::unbounded_channel();
216
217      // Create managed connection
218      let mut connection =
219        ManagedConnection::new(channel_id, connection_sender);
220
221      // Connect to WebSocket
222      if self.raw_only {
223        connection
224          .connect_with_raw(
225            &self.api_key,
226            &self.access_token,
227            &self.config,
228            true,
229          )
230          .await
231          .map_err(|e| format!("Failed to connect WebSocket {}: {}", i, e))?;
232      } else {
233        connection
234          .connect(&self.api_key, &self.access_token, &self.config)
235          .await
236          .map_err(|e| format!("Failed to connect WebSocket {}: {}", i, e))?;
237      }
238
239      // Create message processor
240      let (mut processor, output_receiver) = MessageProcessor::new(
241        channel_id,
242        processor_receiver,
243        self.config.parser_buffer_size,
244      );
245
246      // Start processor if enabled
247      if self.config.enable_dedicated_parsers {
248        processor.start();
249        log::info!("Started dedicated parser for connection {}", i);
250      }
251
252      self.connections.push(connection);
253      self.processors.push(processor);
254      self.output_channels.push(output_receiver);
255    }
256
257    // Start health monitoring
258    if self.config.health_check_interval.as_secs() > 0 {
259      let connection_stats: Vec<Arc<RwLock<ConnectionStats>>> = self
260        .connections
261        .iter()
262        .map(|c| Arc::clone(&c.stats))
263        .collect();
264
265      let mut health_monitor =
266        HealthMonitor::new(connection_stats, self.config.health_check_interval);
267      health_monitor.start();
268      self.health_monitor = Some(health_monitor);
269
270      log::info!("Started health monitor");
271    }
272
273    log::info!(
274      "KiteTickerManager started successfully with {} connections",
275      self.connections.len()
276    );
277
278    Ok(())
279  }
280
281  /// Subscribe to symbols using round-robin distribution
282  pub async fn subscribe_symbols(
283    &mut self,
284    symbols: &[u32],
285    mode: Option<Mode>,
286  ) -> Result<(), String> {
287    let mode = mode.unwrap_or(self.config.default_mode);
288
289    log::info!(
290      "Subscribing to {} symbols with mode: {:?}",
291      symbols.len(),
292      mode
293    );
294
295    // Group symbols by connection using round-robin
296    let mut connection_symbols: HashMap<ChannelId, Vec<u32>> = HashMap::new();
297
298    for &symbol in symbols {
299      // Skip if already subscribed
300      if self.symbol_mapping.contains_key(&symbol) {
301        log::debug!("Symbol {} already subscribed", symbol);
302        continue;
303      }
304
305      // Find connection with available capacity
306      let connection_id = self.find_available_connection()?;
307
308      // Add to mapping
309      self.symbol_mapping.insert(symbol, connection_id);
310      connection_symbols
311        .entry(connection_id)
312        .or_default()
313        .push(symbol);
314    }
315
316    // Subscribe symbols on each connection
317    for (connection_id, symbols) in connection_symbols {
318      let connection = &mut self.connections[connection_id.to_index()];
319      let mode_clone = mode; // Clone for each connection
320
321      if !symbols.is_empty() {
322        // Use dynamic subscription if already has symbols, otherwise initial setup
323        if connection.subscribed_symbols.is_empty() {
324          // First-time subscription on this connection: create subscriber
325          connection
326            .subscribe_symbols(&symbols, mode_clone)
327            .await
328            .map_err(|e| {
329              format!(
330                "Failed to subscribe on connection {:?}: {}",
331                connection_id, e
332              )
333            })?;
334
335          // IMPORTANT: Start forwarding messages from the subscriber to the processor
336          connection.start_message_processing().await.map_err(|e| {
337            format!(
338              "Failed to start message processing on connection {:?}: {}",
339              connection_id, e
340            )
341          })?;
342        } else {
343          connection
344            .add_symbols(&symbols, mode_clone)
345            .await
346            .map_err(|e| {
347              format!(
348                "Failed to add symbols on connection {:?}: {}",
349                connection_id, e
350              )
351            })?;
352        }
353
354        log::info!(
355          "Subscribed {} symbols on connection {:?}",
356          symbols.len(),
357          connection_id
358        );
359      }
360    }
361
362    log::info!("Successfully subscribed to {} new symbols", symbols.len());
363    Ok(())
364  }
365
366  /// Find connection with available capacity using round-robin
367  fn find_available_connection(&mut self) -> Result<ChannelId, String> {
368    let _start_index = self.next_connection_index;
369
370    // Try round-robin allocation
371    for _ in 0..self.config.max_connections {
372      let connection = &self.connections[self.next_connection_index];
373
374      if connection
375        .can_accept_symbols(1, self.config.max_symbols_per_connection)
376      {
377        let channel_id = connection.id;
378        self.next_connection_index =
379          (self.next_connection_index + 1) % self.config.max_connections;
380        return Ok(channel_id);
381      }
382
383      self.next_connection_index =
384        (self.next_connection_index + 1) % self.config.max_connections;
385    }
386
387    Err("All connections are at capacity".to_string())
388  }
389
390  /// Get output channel for a specific connection
391  pub fn get_channel(
392    &mut self,
393    channel_id: ChannelId,
394  ) -> Option<broadcast::Receiver<TickerMessage>> {
395    if channel_id.to_index() < self.output_channels.len() {
396      Some(self.output_channels[channel_id.to_index()].resubscribe())
397    } else {
398      None
399    }
400  }
401
402  /// Get all output channels
403  pub fn get_all_channels(
404    &mut self,
405  ) -> Vec<(ChannelId, broadcast::Receiver<TickerMessage>)> {
406    let mut channels = Vec::new();
407
408    for (i, channel) in self.output_channels.iter().enumerate() {
409      if let Some(channel_id) = ChannelId::from_index(i) {
410        channels.push((channel_id, channel.resubscribe()));
411      }
412    }
413
414    channels
415  }
416
417  /// Get a raw frame receiver (bytes::Bytes per websocket frame) for a connection.
418  /// Returns None if the connection is not initialized.
419  pub fn get_raw_frame_channel(
420    &self,
421    channel_id: ChannelId,
422  ) -> Option<tokio::sync::broadcast::Receiver<bytes::Bytes>> {
423    self
424      .connections
425      .get(channel_id.to_index())
426      .and_then(|mc| mc.ticker.as_ref())
427      .map(|t| t.subscribe_raw_frames())
428  }
429
430  /// Convenience: create a 184-byte Full-tick raw subscriber for a connection.
431  /// Useful when operating in raw_only mode to consume only depth packets.
432  pub fn get_full_raw_subscriber(
433    &self,
434    channel_id: ChannelId,
435  ) -> Option<crate::KiteTickerRawSubscriber184> {
436    self
437      .connections
438      .get(channel_id.to_index())
439      .and_then(|mc| mc.ticker.as_ref())
440      .map(|t| t.subscribe_full_raw())
441  }
442
443  /// Convenience: get raw frame receivers for all initialized connections.
444  /// Each item is `(ChannelId, broadcast::Receiver<bytes::Bytes>)`.
445  pub fn get_all_raw_frame_channels(
446    &self,
447  ) -> Vec<(ChannelId, tokio::sync::broadcast::Receiver<bytes::Bytes>)> {
448    let mut out = Vec::with_capacity(self.connections.len());
449    for (i, mc) in self.connections.iter().enumerate() {
450      if let Some(ch) = ChannelId::from_index(i) {
451        if let Some(t) = mc.ticker.as_ref() {
452          out.push((ch, t.subscribe_raw_frames()));
453        }
454      }
455    }
456    out
457  }
458
459  /// Get manager statistics
460  pub async fn get_stats(&self) -> Result<ManagerStats, String> {
461    if let Some(health_monitor) = &self.health_monitor {
462      Ok(health_monitor.get_manager_stats().await)
463    } else {
464      Err("Health monitor not available".to_string())
465    }
466  }
467
468  /// Get health summary
469  pub async fn get_health(&self) -> Result<HealthSummary, String> {
470    if let Some(health_monitor) = &self.health_monitor {
471      Ok(health_monitor.get_health_summary().await)
472    } else {
473      Err("Health monitor not available".to_string())
474    }
475  }
476
477  /// Get processor statistics for all channels
478  pub async fn get_processor_stats(&self) -> Vec<(ChannelId, ProcessorStats)> {
479    let mut stats = Vec::new();
480
481    for processor in &self.processors {
482      let processor_stats = processor.get_stats().await;
483      stats.push((processor.channel_id, processor_stats));
484    }
485
486    stats
487  }
488
489  /// Get symbol distribution across connections
490  pub fn get_symbol_distribution(&self) -> HashMap<ChannelId, Vec<u32>> {
491    let mut distribution: HashMap<ChannelId, Vec<u32>> = HashMap::new();
492
493    for (&symbol, &channel_id) in &self.symbol_mapping {
494      distribution.entry(channel_id).or_default().push(symbol);
495    }
496
497    distribution
498  }
499
500  /// Unsubscribe from symbols
501  pub async fn unsubscribe_symbols(
502    &mut self,
503    symbols: &[u32],
504  ) -> Result<(), String> {
505    log::info!("Unsubscribing from {} symbols", symbols.len());
506
507    // Group symbols by connection
508    let mut connection_symbols: HashMap<ChannelId, Vec<u32>> = HashMap::new();
509
510    for &symbol in symbols {
511      if let Some(&channel_id) = self.symbol_mapping.get(&symbol) {
512        connection_symbols
513          .entry(channel_id)
514          .or_default()
515          .push(symbol);
516        self.symbol_mapping.remove(&symbol);
517      } else {
518        log::debug!("Symbol {} not found in subscriptions", symbol);
519      }
520    }
521
522    // Unsubscribe from each connection
523    for (channel_id, symbols) in connection_symbols {
524      let connection = &mut self.connections[channel_id.to_index()];
525
526      if !symbols.is_empty() {
527        connection.remove_symbols(&symbols).await.map_err(|e| {
528          format!(
529            "Failed to unsubscribe from connection {:?}: {}",
530            channel_id, e
531          )
532        })?;
533
534        log::info!(
535          "Unsubscribed {} symbols from connection {:?}",
536          symbols.len(),
537          channel_id
538        );
539      }
540    }
541
542    log::info!("Successfully unsubscribed from {} symbols", symbols.len());
543    Ok(())
544  }
545
546  /// Dynamically change subscription mode for existing symbols
547  pub async fn change_mode(
548    &mut self,
549    symbols: &[u32],
550    mode: Mode,
551  ) -> Result<(), String> {
552    log::info!("Changing mode for {} symbols to {:?}", symbols.len(), mode);
553
554    // Group symbols by connection
555    let mut connection_symbols: HashMap<ChannelId, Vec<u32>> = HashMap::new();
556
557    for &symbol in symbols {
558      if let Some(&channel_id) = self.symbol_mapping.get(&symbol) {
559        connection_symbols
560          .entry(channel_id)
561          .or_default()
562          .push(symbol);
563      } else {
564        log::debug!("Symbol {} not found in subscriptions", symbol);
565      }
566    }
567
568    // Change mode on each connection
569    for (channel_id, symbols) in connection_symbols {
570      let connection = &mut self.connections[channel_id.to_index()];
571      if symbols.is_empty() {
572        continue;
573      }
574      // Send mode request directly via command sender if available
575      if let Some(ref cmd) = connection.cmd_tx {
576        let mode_req = crate::models::Request::mode(mode, &symbols).to_string();
577        let _ = cmd.send(tokio_tungstenite::tungstenite::Message::Text(
578          mode_req.into(),
579        ));
580        for &s in &symbols {
581          connection.subscribed_symbols.insert(s, mode);
582        }
583        log::info!(
584          "Changed mode for {} symbols on connection {:?}",
585          symbols.len(),
586          channel_id
587        );
588      } else if let Some(subscriber) = &mut connection.subscriber {
589        // fallback (should normally have command sender)
590        subscriber.set_mode(&symbols, mode).await.map_err(|e| {
591          format!(
592            "Failed to change mode on connection {:?}: {}",
593            channel_id, e
594          )
595        })?;
596        for &s in &symbols {
597          connection.subscribed_symbols.insert(s, mode);
598        }
599      }
600    }
601
602    log::info!("Successfully changed mode for {} symbols", symbols.len());
603    Ok(())
604  }
605
606  /// Stop the manager and all connections
607  pub async fn stop(&mut self) -> Result<(), String> {
608    log::info!("Stopping KiteTickerManager");
609
610    // Stop health monitor
611    if let Some(health_monitor) = &mut self.health_monitor {
612      health_monitor.stop().await;
613    }
614
615    // Stop all processors
616    for processor in &mut self.processors {
617      processor.stop().await;
618    }
619
620    // Stop all connections
621    for connection in &mut self.connections {
622      if let Some(handle) = connection.task_handle.take() {
623        handle.abort();
624        let _ = handle.await;
625      }
626    }
627
628    log::info!("KiteTickerManager stopped");
629    Ok(())
630  }
631}