// kiteticker_async_manager/manager/connection_manager.rs
use crate::manager::{
16 ChannelId, ConnectionStats, HealthMonitor, HealthSummary, KiteManagerConfig,
17 ManagedConnection, ManagerStats, MessageProcessor, ProcessorStats,
18};
19use crate::models::{Mode, TickerMessage};
20use std::collections::HashMap;
21use std::sync::Arc;
22use std::time::Instant;
23use tokio::sync::{broadcast, mpsc, RwLock};
24
25#[derive(Debug)]
31pub struct KiteTickerManager {
32 config: KiteManagerConfig,
34
35 api_key: String,
37 access_token: String,
38
39 connections: Vec<ManagedConnection>,
41
42 processors: Vec<MessageProcessor>,
44
45 output_channels: Vec<broadcast::Receiver<TickerMessage>>,
47
48 symbol_mapping: HashMap<u32, ChannelId>,
50
51 health_monitor: Option<HealthMonitor>,
53
54 next_connection_index: usize,
56
57 #[allow(dead_code)]
59 start_time: Instant,
60 raw_only: bool,
62}
63
64#[derive(Debug, Clone)]
66pub struct KiteTickerManagerBuilder {
67 api_key: String,
68 access_token: String,
69 config: KiteManagerConfig,
70 raw_only: bool,
71}
72
73impl KiteTickerManagerBuilder {
74 pub fn new(
76 api_key: impl Into<String>,
77 access_token: impl Into<String>,
78 ) -> Self {
79 Self {
80 api_key: api_key.into(),
81 access_token: access_token.into(),
82 config: KiteManagerConfig::default(),
83 raw_only: false,
84 }
85 }
86
87 pub fn max_connections(mut self, n: usize) -> Self {
88 self.config.max_connections = n;
89 self
90 }
91 pub fn max_symbols_per_connection(mut self, n: usize) -> Self {
92 self.config.max_symbols_per_connection = n;
93 self
94 }
95 pub fn connection_timeout(mut self, d: std::time::Duration) -> Self {
96 self.config.connection_timeout = d;
97 self
98 }
99 pub fn health_check_interval(mut self, d: std::time::Duration) -> Self {
100 self.config.health_check_interval = d;
101 self
102 }
103 pub fn reconnect_attempts(mut self, attempts: usize) -> Self {
104 self.config.max_reconnect_attempts = attempts;
105 self
106 }
107 pub fn reconnect_delay(mut self, d: std::time::Duration) -> Self {
108 self.config.reconnect_delay = d;
109 self
110 }
111 pub fn enable_dedicated_parsers(mut self, enable: bool) -> Self {
112 self.config.enable_dedicated_parsers = enable;
113 self
114 }
115 pub fn default_mode(mut self, mode: Mode) -> Self {
116 self.config.default_mode = mode;
117 self
118 }
119 pub fn connection_buffer_size(mut self, sz: usize) -> Self {
120 self.config.connection_buffer_size = sz;
121 self
122 }
123 pub fn parser_buffer_size(mut self, sz: usize) -> Self {
124 self.config.parser_buffer_size = sz;
125 self
126 }
127 pub fn raw_only(mut self, raw: bool) -> Self {
128 self.raw_only = raw;
129 self
130 }
131
132 pub fn config(mut self, config: KiteManagerConfig) -> Self {
134 self.config = config;
135 self
136 }
137
138 pub fn build(self) -> KiteTickerManager {
140 KiteTickerManager::new(self.api_key, self.access_token, self.config)
141 .with_raw_only(self.raw_only)
142 }
143}
144
145impl KiteTickerManager {
146 pub fn new(
177 api_key: String,
178 access_token: String,
179 config: KiteManagerConfig,
180 ) -> Self {
181 Self {
182 config,
183 api_key,
184 access_token,
185 connections: Vec::new(),
186 processors: Vec::new(),
187 output_channels: Vec::new(),
188 symbol_mapping: HashMap::new(),
189 health_monitor: None,
190 next_connection_index: 0,
191 start_time: Instant::now(),
192 raw_only: false,
193 }
194 }
195
196 pub fn with_raw_only(mut self, raw: bool) -> Self {
198 self.raw_only = raw;
199 self
200 }
201
202 pub async fn start(&mut self) -> Result<(), String> {
204 log::info!(
205 "Starting KiteTickerManager with {} connections",
206 self.config.max_connections
207 );
208
209 for i in 0..self.config.max_connections {
211 let channel_id = ChannelId::from_index(i)
212 .ok_or_else(|| format!("Invalid connection index: {}", i))?;
213
214 let (connection_sender, processor_receiver) = mpsc::unbounded_channel();
216
217 let mut connection =
219 ManagedConnection::new(channel_id, connection_sender);
220
221 if self.raw_only {
223 connection
224 .connect_with_raw(
225 &self.api_key,
226 &self.access_token,
227 &self.config,
228 true,
229 )
230 .await
231 .map_err(|e| format!("Failed to connect WebSocket {}: {}", i, e))?;
232 } else {
233 connection
234 .connect(&self.api_key, &self.access_token, &self.config)
235 .await
236 .map_err(|e| format!("Failed to connect WebSocket {}: {}", i, e))?;
237 }
238
239 let (mut processor, output_receiver) = MessageProcessor::new(
241 channel_id,
242 processor_receiver,
243 self.config.parser_buffer_size,
244 );
245
246 if self.config.enable_dedicated_parsers {
248 processor.start();
249 log::info!("Started dedicated parser for connection {}", i);
250 }
251
252 self.connections.push(connection);
253 self.processors.push(processor);
254 self.output_channels.push(output_receiver);
255 }
256
257 if self.config.health_check_interval.as_secs() > 0 {
259 let connection_stats: Vec<Arc<RwLock<ConnectionStats>>> = self
260 .connections
261 .iter()
262 .map(|c| Arc::clone(&c.stats))
263 .collect();
264
265 let mut health_monitor =
266 HealthMonitor::new(connection_stats, self.config.health_check_interval);
267 health_monitor.start();
268 self.health_monitor = Some(health_monitor);
269
270 log::info!("Started health monitor");
271 }
272
273 log::info!(
274 "KiteTickerManager started successfully with {} connections",
275 self.connections.len()
276 );
277
278 Ok(())
279 }
280
281 pub async fn subscribe_symbols(
283 &mut self,
284 symbols: &[u32],
285 mode: Option<Mode>,
286 ) -> Result<(), String> {
287 let mode = mode.unwrap_or(self.config.default_mode);
288
289 log::info!(
290 "Subscribing to {} symbols with mode: {:?}",
291 symbols.len(),
292 mode
293 );
294
295 let mut connection_symbols: HashMap<ChannelId, Vec<u32>> = HashMap::new();
297
298 for &symbol in symbols {
299 if self.symbol_mapping.contains_key(&symbol) {
301 log::debug!("Symbol {} already subscribed", symbol);
302 continue;
303 }
304
305 let connection_id = self.find_available_connection()?;
307
308 self.symbol_mapping.insert(symbol, connection_id);
310 connection_symbols
311 .entry(connection_id)
312 .or_default()
313 .push(symbol);
314 }
315
316 for (connection_id, symbols) in connection_symbols {
318 let connection = &mut self.connections[connection_id.to_index()];
319 let mode_clone = mode; if !symbols.is_empty() {
322 if connection.subscribed_symbols.is_empty() {
324 connection
325 .subscribe_symbols(&symbols, mode_clone)
326 .await
327 .map_err(|e| {
328 format!(
329 "Failed to subscribe on connection {:?}: {}",
330 connection_id, e
331 )
332 })?;
333
334 connection.start_message_processing().await.map_err(|e| {
336 format!(
337 "Failed to start message processing on connection {:?}: {}",
338 connection_id, e
339 )
340 })?;
341 } else {
342 connection
343 .add_symbols(&symbols, mode_clone)
344 .await
345 .map_err(|e| {
346 format!(
347 "Failed to add symbols on connection {:?}: {}",
348 connection_id, e
349 )
350 })?;
351 }
352
353 log::info!(
354 "Subscribed {} symbols on connection {:?}",
355 symbols.len(),
356 connection_id
357 );
358 }
359 }
360
361 log::info!("Successfully subscribed to {} new symbols", symbols.len());
362 Ok(())
363 }
364
365 fn find_available_connection(&mut self) -> Result<ChannelId, String> {
367 let _start_index = self.next_connection_index;
368
369 for _ in 0..self.config.max_connections {
371 let connection = &self.connections[self.next_connection_index];
372
373 if connection
374 .can_accept_symbols(1, self.config.max_symbols_per_connection)
375 {
376 let channel_id = connection.id;
377 self.next_connection_index =
378 (self.next_connection_index + 1) % self.config.max_connections;
379 return Ok(channel_id);
380 }
381
382 self.next_connection_index =
383 (self.next_connection_index + 1) % self.config.max_connections;
384 }
385
386 Err("All connections are at capacity".to_string())
387 }
388
389 pub fn get_channel(
391 &mut self,
392 channel_id: ChannelId,
393 ) -> Option<broadcast::Receiver<TickerMessage>> {
394 if channel_id.to_index() < self.output_channels.len() {
395 Some(self.output_channels[channel_id.to_index()].resubscribe())
396 } else {
397 None
398 }
399 }
400
401 pub fn get_all_channels(
403 &mut self,
404 ) -> Vec<(ChannelId, broadcast::Receiver<TickerMessage>)> {
405 let mut channels = Vec::new();
406
407 for (i, channel) in self.output_channels.iter().enumerate() {
408 if let Some(channel_id) = ChannelId::from_index(i) {
409 channels.push((channel_id, channel.resubscribe()));
410 }
411 }
412
413 channels
414 }
415
416 pub async fn get_stats(&self) -> Result<ManagerStats, String> {
418 if let Some(health_monitor) = &self.health_monitor {
419 Ok(health_monitor.get_manager_stats().await)
420 } else {
421 Err("Health monitor not available".to_string())
422 }
423 }
424
425 pub async fn get_health(&self) -> Result<HealthSummary, String> {
427 if let Some(health_monitor) = &self.health_monitor {
428 Ok(health_monitor.get_health_summary().await)
429 } else {
430 Err("Health monitor not available".to_string())
431 }
432 }
433
434 pub async fn get_processor_stats(&self) -> Vec<(ChannelId, ProcessorStats)> {
436 let mut stats = Vec::new();
437
438 for processor in &self.processors {
439 let processor_stats = processor.get_stats().await;
440 stats.push((processor.channel_id, processor_stats));
441 }
442
443 stats
444 }
445
446 pub fn get_symbol_distribution(&self) -> HashMap<ChannelId, Vec<u32>> {
448 let mut distribution: HashMap<ChannelId, Vec<u32>> = HashMap::new();
449
450 for (&symbol, &channel_id) in &self.symbol_mapping {
451 distribution.entry(channel_id).or_default().push(symbol);
452 }
453
454 distribution
455 }
456
457 pub async fn unsubscribe_symbols(
459 &mut self,
460 symbols: &[u32],
461 ) -> Result<(), String> {
462 log::info!("Unsubscribing from {} symbols", symbols.len());
463
464 let mut connection_symbols: HashMap<ChannelId, Vec<u32>> = HashMap::new();
466
467 for &symbol in symbols {
468 if let Some(&channel_id) = self.symbol_mapping.get(&symbol) {
469 connection_symbols
470 .entry(channel_id)
471 .or_default()
472 .push(symbol);
473 self.symbol_mapping.remove(&symbol);
474 } else {
475 log::debug!("Symbol {} not found in subscriptions", symbol);
476 }
477 }
478
479 for (channel_id, symbols) in connection_symbols {
481 let connection = &mut self.connections[channel_id.to_index()];
482
483 if !symbols.is_empty() {
484 connection.remove_symbols(&symbols).await.map_err(|e| {
485 format!(
486 "Failed to unsubscribe from connection {:?}: {}",
487 channel_id, e
488 )
489 })?;
490
491 log::info!(
492 "Unsubscribed {} symbols from connection {:?}",
493 symbols.len(),
494 channel_id
495 );
496 }
497 }
498
499 log::info!("Successfully unsubscribed from {} symbols", symbols.len());
500 Ok(())
501 }
502
503 pub async fn change_mode(
505 &mut self,
506 symbols: &[u32],
507 mode: Mode,
508 ) -> Result<(), String> {
509 log::info!("Changing mode for {} symbols to {:?}", symbols.len(), mode);
510
511 let mut connection_symbols: HashMap<ChannelId, Vec<u32>> = HashMap::new();
513
514 for &symbol in symbols {
515 if let Some(&channel_id) = self.symbol_mapping.get(&symbol) {
516 connection_symbols
517 .entry(channel_id)
518 .or_default()
519 .push(symbol);
520 } else {
521 log::debug!("Symbol {} not found in subscriptions", symbol);
522 }
523 }
524
525 for (channel_id, symbols) in connection_symbols {
527 let connection = &mut self.connections[channel_id.to_index()];
528 if symbols.is_empty() {
529 continue;
530 }
531 if let Some(ref cmd) = connection.cmd_tx {
533 let mode_req = crate::models::Request::mode(mode, &symbols).to_string();
534 let _ =
535 cmd.send(tokio_tungstenite::tungstenite::Message::Text(mode_req));
536 for &s in &symbols {
537 connection.subscribed_symbols.insert(s, mode);
538 }
539 log::info!(
540 "Changed mode for {} symbols on connection {:?}",
541 symbols.len(),
542 channel_id
543 );
544 } else if let Some(subscriber) = &mut connection.subscriber {
545 subscriber.set_mode(&symbols, mode).await.map_err(|e| {
547 format!(
548 "Failed to change mode on connection {:?}: {}",
549 channel_id, e
550 )
551 })?;
552 for &s in &symbols {
553 connection.subscribed_symbols.insert(s, mode);
554 }
555 }
556 }
557
558 log::info!("Successfully changed mode for {} symbols", symbols.len());
559 Ok(())
560 }
561
562 pub async fn stop(&mut self) -> Result<(), String> {
564 log::info!("Stopping KiteTickerManager");
565
566 if let Some(health_monitor) = &mut self.health_monitor {
568 health_monitor.stop().await;
569 }
570
571 for processor in &mut self.processors {
573 processor.stop().await;
574 }
575
576 for connection in &mut self.connections {
578 if let Some(handle) = connection.task_handle.take() {
579 handle.abort();
580 let _ = handle.await;
581 }
582 }
583
584 log::info!("KiteTickerManager stopped");
585 Ok(())
586 }
587}