kiteticker_async_manager/manager/
connection_manager.rs1use crate::models::{Mode, TickerMessage};
16use crate::manager::{
17 KiteManagerConfig, ChannelId, ManagedConnection, MessageProcessor, HealthMonitor,
18 ManagerStats, HealthSummary, ConnectionStats, ProcessorStats
19};
20use std::collections::HashMap;
21use std::sync::Arc;
22use std::time::Instant;
23use tokio::sync::{mpsc, broadcast, RwLock};
24
25#[derive(Debug)]
31pub struct KiteTickerManager {
32 config: KiteManagerConfig,
34
35 api_key: String,
37 access_token: String,
38
39 connections: Vec<ManagedConnection>,
41
42 processors: Vec<MessageProcessor>,
44
45 output_channels: Vec<broadcast::Receiver<TickerMessage>>,
47
48 symbol_mapping: HashMap<u32, ChannelId>,
50
51 health_monitor: Option<HealthMonitor>,
53
54 next_connection_index: usize,
56
57 #[allow(dead_code)]
59 start_time: Instant,
60}
61
62impl KiteTickerManager {
63 pub fn new(
94 api_key: String,
95 access_token: String,
96 config: KiteManagerConfig,
97 ) -> Self {
98 Self {
99 config,
100 api_key,
101 access_token,
102 connections: Vec::new(),
103 processors: Vec::new(),
104 output_channels: Vec::new(),
105 symbol_mapping: HashMap::new(),
106 health_monitor: None,
107 next_connection_index: 0,
108 start_time: Instant::now(),
109 }
110 }
111
112 pub async fn start(&mut self) -> Result<(), String> {
114 log::info!("Starting KiteTickerManager with {} connections", self.config.max_connections);
115
116 for i in 0..self.config.max_connections {
118 let channel_id = ChannelId::from_index(i)
119 .ok_or_else(|| format!("Invalid connection index: {}", i))?;
120
121 let (connection_sender, processor_receiver) = mpsc::unbounded_channel();
123
124 let mut connection = ManagedConnection::new(channel_id, connection_sender);
126
127 connection.connect(&self.api_key, &self.access_token, &self.config).await
129 .map_err(|e| format!("Failed to connect WebSocket {}: {}", i, e))?;
130
131 let (mut processor, output_receiver) = MessageProcessor::new(
133 channel_id,
134 processor_receiver,
135 self.config.parser_buffer_size,
136 );
137
138 if self.config.enable_dedicated_parsers {
140 processor.start();
141 log::info!("Started dedicated parser for connection {}", i);
142 }
143
144 self.connections.push(connection);
145 self.processors.push(processor);
146 self.output_channels.push(output_receiver);
147 }
148
149 if self.config.health_check_interval.as_secs() > 0 {
151 let connection_stats: Vec<Arc<RwLock<ConnectionStats>>> =
152 self.connections.iter().map(|c| Arc::clone(&c.stats)).collect();
153
154 let mut health_monitor = HealthMonitor::new(
155 connection_stats,
156 self.config.health_check_interval,
157 );
158 health_monitor.start();
159 self.health_monitor = Some(health_monitor);
160
161 log::info!("Started health monitor");
162 }
163
164 log::info!("KiteTickerManager started successfully with {} connections",
165 self.connections.len());
166
167 Ok(())
168 }
169
170 pub async fn subscribe_symbols(
172 &mut self,
173 symbols: &[u32],
174 mode: Option<Mode>,
175 ) -> Result<(), String> {
176 let mode = mode.unwrap_or_else(|| self.config.default_mode.clone());
177
178 log::info!("Subscribing to {} symbols with mode: {:?}", symbols.len(), mode);
179
180 let mut connection_symbols: HashMap<ChannelId, Vec<u32>> = HashMap::new();
182
183 for &symbol in symbols {
184 if self.symbol_mapping.contains_key(&symbol) {
186 log::debug!("Symbol {} already subscribed", symbol);
187 continue;
188 }
189
190 let connection_id = self.find_available_connection()?;
192
193 self.symbol_mapping.insert(symbol, connection_id);
195 connection_symbols.entry(connection_id).or_default().push(symbol);
196 }
197
198 for (connection_id, symbols) in connection_symbols {
200 let connection = &mut self.connections[connection_id.to_index()];
201 let mode_clone = mode.clone(); if !symbols.is_empty() {
204 if connection.subscribed_symbols.is_empty() {
206 connection.subscribe_symbols(&symbols, mode_clone).await
207 .map_err(|e| format!("Failed to subscribe on connection {:?}: {}", connection_id, e))?;
208
209 connection.start_message_processing().await
211 .map_err(|e| format!("Failed to start message processing on connection {:?}: {}", connection_id, e))?;
212 } else {
213 connection.add_symbols(&symbols, mode_clone).await
214 .map_err(|e| format!("Failed to add symbols on connection {:?}: {}", connection_id, e))?;
215 }
216
217 log::info!("Subscribed {} symbols on connection {:?}", symbols.len(), connection_id);
218 }
219 }
220
221 log::info!("Successfully subscribed to {} new symbols", symbols.len());
222 Ok(())
223 }
224
225 fn find_available_connection(&mut self) -> Result<ChannelId, String> {
227 let _start_index = self.next_connection_index;
228
229 for _ in 0..self.config.max_connections {
231 let connection = &self.connections[self.next_connection_index];
232
233 if connection.can_accept_symbols(1, self.config.max_symbols_per_connection) {
234 let channel_id = connection.id;
235 self.next_connection_index = (self.next_connection_index + 1) % self.config.max_connections;
236 return Ok(channel_id);
237 }
238
239 self.next_connection_index = (self.next_connection_index + 1) % self.config.max_connections;
240 }
241
242 Err("All connections are at capacity".to_string())
243 }
244
245 pub fn get_channel(&mut self, channel_id: ChannelId) -> Option<broadcast::Receiver<TickerMessage>> {
247 if channel_id.to_index() < self.output_channels.len() {
248 Some(self.output_channels[channel_id.to_index()].resubscribe())
249 } else {
250 None
251 }
252 }
253
254 pub fn get_all_channels(&mut self) -> Vec<(ChannelId, broadcast::Receiver<TickerMessage>)> {
256 let mut channels = Vec::new();
257
258 for (i, channel) in self.output_channels.iter().enumerate() {
259 if let Some(channel_id) = ChannelId::from_index(i) {
260 channels.push((channel_id, channel.resubscribe()));
261 }
262 }
263
264 channels
265 }
266
267 pub async fn get_stats(&self) -> Result<ManagerStats, String> {
269 if let Some(health_monitor) = &self.health_monitor {
270 Ok(health_monitor.get_manager_stats().await)
271 } else {
272 Err("Health monitor not available".to_string())
273 }
274 }
275
276 pub async fn get_health(&self) -> Result<HealthSummary, String> {
278 if let Some(health_monitor) = &self.health_monitor {
279 Ok(health_monitor.get_health_summary().await)
280 } else {
281 Err("Health monitor not available".to_string())
282 }
283 }
284
285 pub async fn get_processor_stats(&self) -> Vec<(ChannelId, ProcessorStats)> {
287 let mut stats = Vec::new();
288
289 for processor in &self.processors {
290 let processor_stats = processor.get_stats().await;
291 stats.push((processor.channel_id, processor_stats));
292 }
293
294 stats
295 }
296
297 pub fn get_symbol_distribution(&self) -> HashMap<ChannelId, Vec<u32>> {
299 let mut distribution: HashMap<ChannelId, Vec<u32>> = HashMap::new();
300
301 for (&symbol, &channel_id) in &self.symbol_mapping {
302 distribution.entry(channel_id).or_default().push(symbol);
303 }
304
305 distribution
306 }
307
308 pub async fn unsubscribe_symbols(&mut self, symbols: &[u32]) -> Result<(), String> {
310 log::info!("Unsubscribing from {} symbols", symbols.len());
311
312 let mut connection_symbols: HashMap<ChannelId, Vec<u32>> = HashMap::new();
314
315 for &symbol in symbols {
316 if let Some(&channel_id) = self.symbol_mapping.get(&symbol) {
317 connection_symbols.entry(channel_id).or_default().push(symbol);
318 self.symbol_mapping.remove(&symbol);
319 } else {
320 log::debug!("Symbol {} not found in subscriptions", symbol);
321 }
322 }
323
324 for (channel_id, symbols) in connection_symbols {
326 let connection = &mut self.connections[channel_id.to_index()];
327
328 if !symbols.is_empty() {
329 connection.remove_symbols(&symbols).await
330 .map_err(|e| format!("Failed to unsubscribe from connection {:?}: {}", channel_id, e))?;
331
332 log::info!("Unsubscribed {} symbols from connection {:?}", symbols.len(), channel_id);
333 }
334 }
335
336 log::info!("Successfully unsubscribed from {} symbols", symbols.len());
337 Ok(())
338 }
339
340 pub async fn change_mode(&mut self, symbols: &[u32], mode: Mode) -> Result<(), String> {
342 log::info!("Changing mode for {} symbols to {:?}", symbols.len(), mode);
343
344 let mut connection_symbols: HashMap<ChannelId, Vec<u32>> = HashMap::new();
346
347 for &symbol in symbols {
348 if let Some(&channel_id) = self.symbol_mapping.get(&symbol) {
349 connection_symbols.entry(channel_id).or_default().push(symbol);
350 } else {
351 log::debug!("Symbol {} not found in subscriptions", symbol);
352 }
353 }
354
355 for (channel_id, symbols) in connection_symbols {
357 let connection = &mut self.connections[channel_id.to_index()];
358
359 if !symbols.is_empty() {
360 if let Some(subscriber) = &mut connection.subscriber {
361 subscriber.set_mode(&symbols, mode.clone()).await
362 .map_err(|e| format!("Failed to change mode on connection {:?}: {}", channel_id, e))?;
363
364 for &symbol in &symbols {
366 connection.subscribed_symbols.insert(symbol, mode.clone());
367 }
368
369 log::info!("Changed mode for {} symbols on connection {:?}", symbols.len(), channel_id);
370 }
371 }
372 }
373
374 log::info!("Successfully changed mode for {} symbols", symbols.len());
375 Ok(())
376 }
377
378 pub async fn stop(&mut self) -> Result<(), String> {
380 log::info!("Stopping KiteTickerManager");
381
382 if let Some(health_monitor) = &mut self.health_monitor {
384 health_monitor.stop().await;
385 }
386
387 for processor in &mut self.processors {
389 processor.stop().await;
390 }
391
392 for connection in &mut self.connections {
394 if let Some(handle) = connection.task_handle.take() {
395 handle.abort();
396 let _ = handle.await;
397 }
398 }
399
400 log::info!("KiteTickerManager stopped");
401 Ok(())
402 }
403}