kiteticker_async_manager/manager/
connection_manager.rs

1//! # Connection Manager Module
2//! 
3//! This module contains the main `KiteTickerManager` which provides high-performance
4//! multi-connection WebSocket management for the Kite Connect ticker API.
5//!
6//! ## Features
7//!
8//! - **Multi-Connection Support**: Utilizes up to 3 WebSocket connections
9//! - **Dynamic Load Balancing**: Automatic symbol distribution across connections
10//! - **High-Performance Processing**: Dedicated parser tasks per connection
11//! - **Dynamic Subscriptions**: Runtime symbol addition/removal without reconnection
12//! - **Health Monitoring**: Real-time connection health tracking
13//! - **Error Resilience**: Comprehensive error handling and recovery
14
15use crate::models::{Mode, TickerMessage};
16use crate::manager::{
17    KiteManagerConfig, ChannelId, ManagedConnection, MessageProcessor, HealthMonitor,
18    ManagerStats, HealthSummary, ConnectionStats, ProcessorStats
19};
20use std::collections::HashMap;
21use std::sync::Arc;
22use std::time::Instant;
23use tokio::sync::{mpsc, broadcast, RwLock};
24
25/// High-performance multi-connection WebSocket manager for Kite ticker data
26/// 
27/// This manager creates 3 independent WebSocket connections and distributes symbols
28/// across them using round-robin allocation. Each connection has its own dedicated
29/// parser task for maximum performance.
30#[derive(Debug)]
31pub struct KiteTickerManager {
32    /// Manager configuration
33    config: KiteManagerConfig,
34    
35    /// API credentials
36    api_key: String,
37    access_token: String,
38    
39    /// WebSocket connections (up to 3)
40    connections: Vec<ManagedConnection>,
41    
42    /// Message processors (one per connection)
43    processors: Vec<MessageProcessor>,
44    
45    /// Output channels (one per connection)
46    output_channels: Vec<broadcast::Receiver<TickerMessage>>,
47    
48    /// Symbol to connection mapping
49    symbol_mapping: HashMap<u32, ChannelId>,
50    
51    /// Health monitor
52    health_monitor: Option<HealthMonitor>,
53    
54    /// Next connection index for round-robin distribution
55    next_connection_index: usize,
56    
57    /// Manager start time for uptime tracking
58    #[allow(dead_code)]
59    start_time: Instant,
60}
61
62impl KiteTickerManager {
63    /// Creates a new KiteTickerManager instance with the specified configuration
64    /// 
65    /// This initializes the manager with the provided API credentials and configuration,
66    /// but does not start any connections. Call [`start()`](Self::start) to begin operation.
67    /// 
68    /// # Arguments
69    /// 
70    /// * `api_key` - Your Kite Connect API key
71    /// * `access_token` - Valid access token from Kite Connect
72    /// * `config` - Manager configuration settings
73    /// 
74    /// # Example
75    /// 
76    /// ```rust,no_run
77    /// use kiteticker_async_manager::{KiteTickerManager, KiteManagerConfig, Mode};
78    /// 
79    /// let config = KiteManagerConfig {
80    ///     max_connections: 3,
81    ///     max_symbols_per_connection: 3000,
82    ///     enable_dedicated_parsers: true,
83    ///     default_mode: Mode::LTP,
84    ///     ..Default::default()
85    /// };
86    /// 
87    /// let manager = KiteTickerManager::new(
88    ///     "your_api_key".to_string(),
89    ///     "your_access_token".to_string(),
90    ///     config,
91    /// );
92    /// ```
93    pub fn new(
94        api_key: String,
95        access_token: String,
96        config: KiteManagerConfig,
97    ) -> Self {
98        Self {
99            config,
100            api_key,
101            access_token,
102            connections: Vec::new(),
103            processors: Vec::new(),
104            output_channels: Vec::new(),
105            symbol_mapping: HashMap::new(),
106            health_monitor: None,
107            next_connection_index: 0,
108            start_time: Instant::now(),
109        }
110    }
111    
112    /// Initialize all connections and start the manager
113    pub async fn start(&mut self) -> Result<(), String> {
114        log::info!("Starting KiteTickerManager with {} connections", self.config.max_connections);
115        
116        // Create all connections and processors
117        for i in 0..self.config.max_connections {
118            let channel_id = ChannelId::from_index(i)
119                .ok_or_else(|| format!("Invalid connection index: {}", i))?;
120            
121            // Create message channel between connection and processor
122            let (connection_sender, processor_receiver) = mpsc::unbounded_channel();
123            
124            // Create managed connection
125            let mut connection = ManagedConnection::new(channel_id, connection_sender);
126            
127            // Connect to WebSocket
128            connection.connect(&self.api_key, &self.access_token, &self.config).await
129                .map_err(|e| format!("Failed to connect WebSocket {}: {}", i, e))?;
130            
131            // Create message processor
132            let (mut processor, output_receiver) = MessageProcessor::new(
133                channel_id,
134                processor_receiver,
135                self.config.parser_buffer_size,
136            );
137            
138            // Start processor if enabled
139            if self.config.enable_dedicated_parsers {
140                processor.start();
141                log::info!("Started dedicated parser for connection {}", i);
142            }
143            
144            self.connections.push(connection);
145            self.processors.push(processor);
146            self.output_channels.push(output_receiver);
147        }
148        
149        // Start health monitoring
150        if self.config.health_check_interval.as_secs() > 0 {
151            let connection_stats: Vec<Arc<RwLock<ConnectionStats>>> = 
152                self.connections.iter().map(|c| Arc::clone(&c.stats)).collect();
153            
154            let mut health_monitor = HealthMonitor::new(
155                connection_stats,
156                self.config.health_check_interval,
157            );
158            health_monitor.start();
159            self.health_monitor = Some(health_monitor);
160            
161            log::info!("Started health monitor");
162        }
163        
164        log::info!("KiteTickerManager started successfully with {} connections", 
165                  self.connections.len());
166        
167        Ok(())
168    }
169    
170    /// Subscribe to symbols using round-robin distribution
171    pub async fn subscribe_symbols(
172        &mut self,
173        symbols: &[u32],
174        mode: Option<Mode>,
175    ) -> Result<(), String> {
176        let mode = mode.unwrap_or_else(|| self.config.default_mode.clone());
177        
178        log::info!("Subscribing to {} symbols with mode: {:?}", symbols.len(), mode);
179        
180        // Group symbols by connection using round-robin
181        let mut connection_symbols: HashMap<ChannelId, Vec<u32>> = HashMap::new();
182        
183        for &symbol in symbols {
184            // Skip if already subscribed
185            if self.symbol_mapping.contains_key(&symbol) {
186                log::debug!("Symbol {} already subscribed", symbol);
187                continue;
188            }
189            
190            // Find connection with available capacity
191            let connection_id = self.find_available_connection()?;
192            
193            // Add to mapping
194            self.symbol_mapping.insert(symbol, connection_id);
195            connection_symbols.entry(connection_id).or_default().push(symbol);
196        }
197        
198        // Subscribe symbols on each connection
199        for (connection_id, symbols) in connection_symbols {
200            let connection = &mut self.connections[connection_id.to_index()];
201            let mode_clone = mode.clone(); // Clone for each connection
202            
203            if !symbols.is_empty() {
204                // Use dynamic subscription if already has symbols, otherwise initial setup
205                if connection.subscribed_symbols.is_empty() {
206                    connection.subscribe_symbols(&symbols, mode_clone).await
207                        .map_err(|e| format!("Failed to subscribe on connection {:?}: {}", connection_id, e))?;
208                    
209                    // Start message processing after initial subscription
210                    connection.start_message_processing().await
211                        .map_err(|e| format!("Failed to start message processing on connection {:?}: {}", connection_id, e))?;
212                } else {
213                    connection.add_symbols(&symbols, mode_clone).await
214                        .map_err(|e| format!("Failed to add symbols on connection {:?}: {}", connection_id, e))?;
215                }
216                
217                log::info!("Subscribed {} symbols on connection {:?}", symbols.len(), connection_id);
218            }
219        }
220        
221        log::info!("Successfully subscribed to {} new symbols", symbols.len());
222        Ok(())
223    }
224    
225    /// Find connection with available capacity using round-robin
226    fn find_available_connection(&mut self) -> Result<ChannelId, String> {
227        let _start_index = self.next_connection_index;
228        
229        // Try round-robin allocation
230        for _ in 0..self.config.max_connections {
231            let connection = &self.connections[self.next_connection_index];
232            
233            if connection.can_accept_symbols(1, self.config.max_symbols_per_connection) {
234                let channel_id = connection.id;
235                self.next_connection_index = (self.next_connection_index + 1) % self.config.max_connections;
236                return Ok(channel_id);
237            }
238            
239            self.next_connection_index = (self.next_connection_index + 1) % self.config.max_connections;
240        }
241        
242        Err("All connections are at capacity".to_string())
243    }
244    
245    /// Get output channel for a specific connection
246    pub fn get_channel(&mut self, channel_id: ChannelId) -> Option<broadcast::Receiver<TickerMessage>> {
247        if channel_id.to_index() < self.output_channels.len() {
248            Some(self.output_channels[channel_id.to_index()].resubscribe())
249        } else {
250            None
251        }
252    }
253    
254    /// Get all output channels
255    pub fn get_all_channels(&mut self) -> Vec<(ChannelId, broadcast::Receiver<TickerMessage>)> {
256        let mut channels = Vec::new();
257        
258        for (i, channel) in self.output_channels.iter().enumerate() {
259            if let Some(channel_id) = ChannelId::from_index(i) {
260                channels.push((channel_id, channel.resubscribe()));
261            }
262        }
263        
264        channels
265    }
266    
267    /// Get manager statistics
268    pub async fn get_stats(&self) -> Result<ManagerStats, String> {
269        if let Some(health_monitor) = &self.health_monitor {
270            Ok(health_monitor.get_manager_stats().await)
271        } else {
272            Err("Health monitor not available".to_string())
273        }
274    }
275    
276    /// Get health summary
277    pub async fn get_health(&self) -> Result<HealthSummary, String> {
278        if let Some(health_monitor) = &self.health_monitor {
279            Ok(health_monitor.get_health_summary().await)
280        } else {
281            Err("Health monitor not available".to_string())
282        }
283    }
284    
285    /// Get processor statistics for all channels
286    pub async fn get_processor_stats(&self) -> Vec<(ChannelId, ProcessorStats)> {
287        let mut stats = Vec::new();
288        
289        for processor in &self.processors {
290            let processor_stats = processor.get_stats().await;
291            stats.push((processor.channel_id, processor_stats));
292        }
293        
294        stats
295    }
296    
297    /// Get symbol distribution across connections
298    pub fn get_symbol_distribution(&self) -> HashMap<ChannelId, Vec<u32>> {
299        let mut distribution: HashMap<ChannelId, Vec<u32>> = HashMap::new();
300        
301        for (&symbol, &channel_id) in &self.symbol_mapping {
302            distribution.entry(channel_id).or_default().push(symbol);
303        }
304        
305        distribution
306    }
307    
308    /// Unsubscribe from symbols
309    pub async fn unsubscribe_symbols(&mut self, symbols: &[u32]) -> Result<(), String> {
310        log::info!("Unsubscribing from {} symbols", symbols.len());
311        
312        // Group symbols by connection
313        let mut connection_symbols: HashMap<ChannelId, Vec<u32>> = HashMap::new();
314        
315        for &symbol in symbols {
316            if let Some(&channel_id) = self.symbol_mapping.get(&symbol) {
317                connection_symbols.entry(channel_id).or_default().push(symbol);
318                self.symbol_mapping.remove(&symbol);
319            } else {
320                log::debug!("Symbol {} not found in subscriptions", symbol);
321            }
322        }
323        
324        // Unsubscribe from each connection
325        for (channel_id, symbols) in connection_symbols {
326            let connection = &mut self.connections[channel_id.to_index()];
327            
328            if !symbols.is_empty() {
329                connection.remove_symbols(&symbols).await
330                    .map_err(|e| format!("Failed to unsubscribe from connection {:?}: {}", channel_id, e))?;
331                
332                log::info!("Unsubscribed {} symbols from connection {:?}", symbols.len(), channel_id);
333            }
334        }
335        
336        log::info!("Successfully unsubscribed from {} symbols", symbols.len());
337        Ok(())
338    }
339
340    /// Dynamically change subscription mode for existing symbols
341    pub async fn change_mode(&mut self, symbols: &[u32], mode: Mode) -> Result<(), String> {
342        log::info!("Changing mode for {} symbols to {:?}", symbols.len(), mode);
343        
344        // Group symbols by connection
345        let mut connection_symbols: HashMap<ChannelId, Vec<u32>> = HashMap::new();
346        
347        for &symbol in symbols {
348            if let Some(&channel_id) = self.symbol_mapping.get(&symbol) {
349                connection_symbols.entry(channel_id).or_default().push(symbol);
350            } else {
351                log::debug!("Symbol {} not found in subscriptions", symbol);
352            }
353        }
354        
355        // Change mode on each connection
356        for (channel_id, symbols) in connection_symbols {
357            let connection = &mut self.connections[channel_id.to_index()];
358            
359            if !symbols.is_empty() {
360                if let Some(subscriber) = &mut connection.subscriber {
361                    subscriber.set_mode(&symbols, mode.clone()).await
362                        .map_err(|e| format!("Failed to change mode on connection {:?}: {}", channel_id, e))?;
363                    
364                    // Update our tracking
365                    for &symbol in &symbols {
366                        connection.subscribed_symbols.insert(symbol, mode.clone());
367                    }
368                    
369                    log::info!("Changed mode for {} symbols on connection {:?}", symbols.len(), channel_id);
370                }
371            }
372        }
373        
374        log::info!("Successfully changed mode for {} symbols", symbols.len());
375        Ok(())
376    }
377    
378    /// Stop the manager and all connections
379    pub async fn stop(&mut self) -> Result<(), String> {
380        log::info!("Stopping KiteTickerManager");
381        
382        // Stop health monitor
383        if let Some(health_monitor) = &mut self.health_monitor {
384            health_monitor.stop().await;
385        }
386        
387        // Stop all processors
388        for processor in &mut self.processors {
389            processor.stop().await;
390        }
391        
392        // Stop all connections
393        for connection in &mut self.connections {
394            if let Some(handle) = connection.task_handle.take() {
395                handle.abort();
396                let _ = handle.await;
397            }
398        }
399        
400        log::info!("KiteTickerManager stopped");
401        Ok(())
402    }
403}