kiteticker_async_manager/manager/
connection_manager.rs1use crate::manager::{
16 ChannelId, ConnectionStats, HealthMonitor, HealthSummary, KiteManagerConfig,
17 ManagedConnection, ManagerStats, MessageProcessor, ProcessorStats,
18};
19use crate::models::{Mode, TickerMessage};
20use std::collections::HashMap;
21use std::sync::Arc;
22use std::time::Instant;
23use tokio::sync::{broadcast, mpsc, RwLock};
24
25#[derive(Debug)]
31pub struct KiteTickerManager {
32 config: KiteManagerConfig,
34
35 api_key: String,
37 access_token: String,
38
39 connections: Vec<ManagedConnection>,
41
42 processors: Vec<MessageProcessor>,
44
45 output_channels: Vec<broadcast::Receiver<TickerMessage>>,
47
48 symbol_mapping: HashMap<u32, ChannelId>,
50
51 health_monitor: Option<HealthMonitor>,
53
54 next_connection_index: usize,
56
57 #[allow(dead_code)]
59 start_time: Instant,
60 raw_only: bool,
62}
63
64#[derive(Debug, Clone)]
66pub struct KiteTickerManagerBuilder {
67 api_key: String,
68 access_token: String,
69 config: KiteManagerConfig,
70 raw_only: bool,
71}
72
73impl KiteTickerManagerBuilder {
74 pub fn new(
76 api_key: impl Into<String>,
77 access_token: impl Into<String>,
78 ) -> Self {
79 Self {
80 api_key: api_key.into(),
81 access_token: access_token.into(),
82 config: KiteManagerConfig::default(),
83 raw_only: false,
84 }
85 }
86
87 pub fn max_connections(mut self, n: usize) -> Self {
88 self.config.max_connections = n;
89 self
90 }
91 pub fn max_symbols_per_connection(mut self, n: usize) -> Self {
92 self.config.max_symbols_per_connection = n;
93 self
94 }
95 pub fn connection_timeout(mut self, d: std::time::Duration) -> Self {
96 self.config.connection_timeout = d;
97 self
98 }
99 pub fn health_check_interval(mut self, d: std::time::Duration) -> Self {
100 self.config.health_check_interval = d;
101 self
102 }
103 pub fn reconnect_attempts(mut self, attempts: usize) -> Self {
104 self.config.max_reconnect_attempts = attempts;
105 self
106 }
107 pub fn reconnect_delay(mut self, d: std::time::Duration) -> Self {
108 self.config.reconnect_delay = d;
109 self
110 }
111 pub fn enable_dedicated_parsers(mut self, enable: bool) -> Self {
112 self.config.enable_dedicated_parsers = enable;
113 self
114 }
115 pub fn default_mode(mut self, mode: Mode) -> Self {
116 self.config.default_mode = mode;
117 self
118 }
119 pub fn connection_buffer_size(mut self, sz: usize) -> Self {
120 self.config.connection_buffer_size = sz;
121 self
122 }
123 pub fn parser_buffer_size(mut self, sz: usize) -> Self {
124 self.config.parser_buffer_size = sz;
125 self
126 }
127 pub fn raw_only(mut self, raw: bool) -> Self {
128 self.raw_only = raw;
129 self
130 }
131
132 pub fn config(mut self, config: KiteManagerConfig) -> Self {
134 self.config = config;
135 self
136 }
137
138 pub fn build(self) -> KiteTickerManager {
140 KiteTickerManager::new(self.api_key, self.access_token, self.config)
141 .with_raw_only(self.raw_only)
142 }
143}
144
145impl KiteTickerManager {
146 pub fn new(
177 api_key: String,
178 access_token: String,
179 config: KiteManagerConfig,
180 ) -> Self {
181 Self {
182 config,
183 api_key,
184 access_token,
185 connections: Vec::new(),
186 processors: Vec::new(),
187 output_channels: Vec::new(),
188 symbol_mapping: HashMap::new(),
189 health_monitor: None,
190 next_connection_index: 0,
191 start_time: Instant::now(),
192 raw_only: false,
193 }
194 }
195
196 pub fn with_raw_only(mut self, raw: bool) -> Self {
198 self.raw_only = raw;
199 self
200 }
201
202 pub async fn start(&mut self) -> Result<(), String> {
204 log::info!(
205 "Starting KiteTickerManager with {} connections",
206 self.config.max_connections
207 );
208
209 for i in 0..self.config.max_connections {
211 let channel_id = ChannelId::from_index(i)
212 .ok_or_else(|| format!("Invalid connection index: {}", i))?;
213
214 let (connection_sender, processor_receiver) = mpsc::unbounded_channel();
216
217 let mut connection =
219 ManagedConnection::new(channel_id, connection_sender);
220
221 if self.raw_only {
223 connection
224 .connect_with_raw(
225 &self.api_key,
226 &self.access_token,
227 &self.config,
228 true,
229 )
230 .await
231 .map_err(|e| format!("Failed to connect WebSocket {}: {}", i, e))?;
232 } else {
233 connection
234 .connect(&self.api_key, &self.access_token, &self.config)
235 .await
236 .map_err(|e| format!("Failed to connect WebSocket {}: {}", i, e))?;
237 }
238
239 let (mut processor, output_receiver) = MessageProcessor::new(
241 channel_id,
242 processor_receiver,
243 self.config.parser_buffer_size,
244 );
245
246 if self.config.enable_dedicated_parsers {
248 processor.start();
249 log::info!("Started dedicated parser for connection {}", i);
250 }
251
252 self.connections.push(connection);
253 self.processors.push(processor);
254 self.output_channels.push(output_receiver);
255 }
256
257 if self.config.health_check_interval.as_secs() > 0 {
259 let connection_stats: Vec<Arc<RwLock<ConnectionStats>>> = self
260 .connections
261 .iter()
262 .map(|c| Arc::clone(&c.stats))
263 .collect();
264
265 let mut health_monitor =
266 HealthMonitor::new(connection_stats, self.config.health_check_interval);
267 health_monitor.start();
268 self.health_monitor = Some(health_monitor);
269
270 log::info!("Started health monitor");
271 }
272
273 log::info!(
274 "KiteTickerManager started successfully with {} connections",
275 self.connections.len()
276 );
277
278 Ok(())
279 }
280
281 pub async fn subscribe_symbols(
283 &mut self,
284 symbols: &[u32],
285 mode: Option<Mode>,
286 ) -> Result<(), String> {
287 let mode = mode.unwrap_or(self.config.default_mode);
288
289 log::info!(
290 "Subscribing to {} symbols with mode: {:?}",
291 symbols.len(),
292 mode
293 );
294
295 let mut connection_symbols: HashMap<ChannelId, Vec<u32>> = HashMap::new();
297
298 for &symbol in symbols {
299 if self.symbol_mapping.contains_key(&symbol) {
301 log::debug!("Symbol {} already subscribed", symbol);
302 continue;
303 }
304
305 let connection_id = self.find_available_connection()?;
307
308 self.symbol_mapping.insert(symbol, connection_id);
310 connection_symbols
311 .entry(connection_id)
312 .or_default()
313 .push(symbol);
314 }
315
316 for (connection_id, symbols) in connection_symbols {
318 let connection = &mut self.connections[connection_id.to_index()];
319 let mode_clone = mode; if !symbols.is_empty() {
322 if connection.subscribed_symbols.is_empty() {
324 connection
326 .subscribe_symbols(&symbols, mode_clone)
327 .await
328 .map_err(|e| {
329 format!(
330 "Failed to subscribe on connection {:?}: {}",
331 connection_id, e
332 )
333 })?;
334
335 connection.start_message_processing().await.map_err(|e| {
337 format!(
338 "Failed to start message processing on connection {:?}: {}",
339 connection_id, e
340 )
341 })?;
342 } else {
343 connection
344 .add_symbols(&symbols, mode_clone)
345 .await
346 .map_err(|e| {
347 format!(
348 "Failed to add symbols on connection {:?}: {}",
349 connection_id, e
350 )
351 })?;
352 }
353
354 log::info!(
355 "Subscribed {} symbols on connection {:?}",
356 symbols.len(),
357 connection_id
358 );
359 }
360 }
361
362 log::info!("Successfully subscribed to {} new symbols", symbols.len());
363 Ok(())
364 }
365
366 fn find_available_connection(&mut self) -> Result<ChannelId, String> {
368 let _start_index = self.next_connection_index;
369
370 for _ in 0..self.config.max_connections {
372 let connection = &self.connections[self.next_connection_index];
373
374 if connection
375 .can_accept_symbols(1, self.config.max_symbols_per_connection)
376 {
377 let channel_id = connection.id;
378 self.next_connection_index =
379 (self.next_connection_index + 1) % self.config.max_connections;
380 return Ok(channel_id);
381 }
382
383 self.next_connection_index =
384 (self.next_connection_index + 1) % self.config.max_connections;
385 }
386
387 Err("All connections are at capacity".to_string())
388 }
389
390 pub fn get_channel(
392 &mut self,
393 channel_id: ChannelId,
394 ) -> Option<broadcast::Receiver<TickerMessage>> {
395 if channel_id.to_index() < self.output_channels.len() {
396 Some(self.output_channels[channel_id.to_index()].resubscribe())
397 } else {
398 None
399 }
400 }
401
402 pub fn get_all_channels(
404 &mut self,
405 ) -> Vec<(ChannelId, broadcast::Receiver<TickerMessage>)> {
406 let mut channels = Vec::new();
407
408 for (i, channel) in self.output_channels.iter().enumerate() {
409 if let Some(channel_id) = ChannelId::from_index(i) {
410 channels.push((channel_id, channel.resubscribe()));
411 }
412 }
413
414 channels
415 }
416
417 pub async fn get_stats(&self) -> Result<ManagerStats, String> {
419 if let Some(health_monitor) = &self.health_monitor {
420 Ok(health_monitor.get_manager_stats().await)
421 } else {
422 Err("Health monitor not available".to_string())
423 }
424 }
425
426 pub async fn get_health(&self) -> Result<HealthSummary, String> {
428 if let Some(health_monitor) = &self.health_monitor {
429 Ok(health_monitor.get_health_summary().await)
430 } else {
431 Err("Health monitor not available".to_string())
432 }
433 }
434
435 pub async fn get_processor_stats(&self) -> Vec<(ChannelId, ProcessorStats)> {
437 let mut stats = Vec::new();
438
439 for processor in &self.processors {
440 let processor_stats = processor.get_stats().await;
441 stats.push((processor.channel_id, processor_stats));
442 }
443
444 stats
445 }
446
447 pub fn get_symbol_distribution(&self) -> HashMap<ChannelId, Vec<u32>> {
449 let mut distribution: HashMap<ChannelId, Vec<u32>> = HashMap::new();
450
451 for (&symbol, &channel_id) in &self.symbol_mapping {
452 distribution.entry(channel_id).or_default().push(symbol);
453 }
454
455 distribution
456 }
457
458 pub async fn unsubscribe_symbols(
460 &mut self,
461 symbols: &[u32],
462 ) -> Result<(), String> {
463 log::info!("Unsubscribing from {} symbols", symbols.len());
464
465 let mut connection_symbols: HashMap<ChannelId, Vec<u32>> = HashMap::new();
467
468 for &symbol in symbols {
469 if let Some(&channel_id) = self.symbol_mapping.get(&symbol) {
470 connection_symbols
471 .entry(channel_id)
472 .or_default()
473 .push(symbol);
474 self.symbol_mapping.remove(&symbol);
475 } else {
476 log::debug!("Symbol {} not found in subscriptions", symbol);
477 }
478 }
479
480 for (channel_id, symbols) in connection_symbols {
482 let connection = &mut self.connections[channel_id.to_index()];
483
484 if !symbols.is_empty() {
485 connection.remove_symbols(&symbols).await.map_err(|e| {
486 format!(
487 "Failed to unsubscribe from connection {:?}: {}",
488 channel_id, e
489 )
490 })?;
491
492 log::info!(
493 "Unsubscribed {} symbols from connection {:?}",
494 symbols.len(),
495 channel_id
496 );
497 }
498 }
499
500 log::info!("Successfully unsubscribed from {} symbols", symbols.len());
501 Ok(())
502 }
503
504 pub async fn change_mode(
506 &mut self,
507 symbols: &[u32],
508 mode: Mode,
509 ) -> Result<(), String> {
510 log::info!("Changing mode for {} symbols to {:?}", symbols.len(), mode);
511
512 let mut connection_symbols: HashMap<ChannelId, Vec<u32>> = HashMap::new();
514
515 for &symbol in symbols {
516 if let Some(&channel_id) = self.symbol_mapping.get(&symbol) {
517 connection_symbols
518 .entry(channel_id)
519 .or_default()
520 .push(symbol);
521 } else {
522 log::debug!("Symbol {} not found in subscriptions", symbol);
523 }
524 }
525
526 for (channel_id, symbols) in connection_symbols {
528 let connection = &mut self.connections[channel_id.to_index()];
529 if symbols.is_empty() {
530 continue;
531 }
532 if let Some(ref cmd) = connection.cmd_tx {
534 let mode_req = crate::models::Request::mode(mode, &symbols).to_string();
535 let _ =
536 cmd.send(tokio_tungstenite::tungstenite::Message::Text(mode_req));
537 for &s in &symbols {
538 connection.subscribed_symbols.insert(s, mode);
539 }
540 log::info!(
541 "Changed mode for {} symbols on connection {:?}",
542 symbols.len(),
543 channel_id
544 );
545 } else if let Some(subscriber) = &mut connection.subscriber {
546 subscriber.set_mode(&symbols, mode).await.map_err(|e| {
548 format!(
549 "Failed to change mode on connection {:?}: {}",
550 channel_id, e
551 )
552 })?;
553 for &s in &symbols {
554 connection.subscribed_symbols.insert(s, mode);
555 }
556 }
557 }
558
559 log::info!("Successfully changed mode for {} symbols", symbols.len());
560 Ok(())
561 }
562
563 pub async fn stop(&mut self) -> Result<(), String> {
565 log::info!("Stopping KiteTickerManager");
566
567 if let Some(health_monitor) = &mut self.health_monitor {
569 health_monitor.stop().await;
570 }
571
572 for processor in &mut self.processors {
574 processor.stop().await;
575 }
576
577 for connection in &mut self.connections {
579 if let Some(handle) = connection.task_handle.take() {
580 handle.abort();
581 let _ = handle.await;
582 }
583 }
584
585 log::info!("KiteTickerManager stopped");
586 Ok(())
587 }
588}