kiteticker_async_manager/manager/
connection_manager.rs1use crate::manager::{
16 ChannelId, ConnectionStats, HealthMonitor, HealthSummary, KiteManagerConfig,
17 ManagedConnection, ManagerStats, MessageProcessor, ProcessorStats,
18};
19use crate::models::{Mode, TickerMessage};
20use std::collections::HashMap;
21use std::sync::Arc;
22use std::time::Instant;
23use tokio::sync::{broadcast, mpsc, RwLock};
24
25#[derive(Debug)]
31pub struct KiteTickerManager {
32 config: KiteManagerConfig,
34
35 api_key: String,
37 access_token: String,
38
39 connections: Vec<ManagedConnection>,
41
42 processors: Vec<MessageProcessor>,
44
45 output_channels: Vec<broadcast::Receiver<TickerMessage>>,
47
48 symbol_mapping: HashMap<u32, ChannelId>,
50
51 health_monitor: Option<HealthMonitor>,
53
54 next_connection_index: usize,
56
57 #[allow(dead_code)]
59 start_time: Instant,
60 raw_only: bool,
62}
63
64#[derive(Debug, Clone)]
66pub struct KiteTickerManagerBuilder {
67 api_key: String,
68 access_token: String,
69 config: KiteManagerConfig,
70 raw_only: bool,
71}
72
73impl KiteTickerManagerBuilder {
74 pub fn new(
76 api_key: impl Into<String>,
77 access_token: impl Into<String>,
78 ) -> Self {
79 Self {
80 api_key: api_key.into(),
81 access_token: access_token.into(),
82 config: KiteManagerConfig::default(),
83 raw_only: false,
84 }
85 }
86
87 pub fn max_connections(mut self, n: usize) -> Self {
88 self.config.max_connections = n;
89 self
90 }
91 pub fn max_symbols_per_connection(mut self, n: usize) -> Self {
92 self.config.max_symbols_per_connection = n;
93 self
94 }
95 pub fn connection_timeout(mut self, d: std::time::Duration) -> Self {
96 self.config.connection_timeout = d;
97 self
98 }
99 pub fn health_check_interval(mut self, d: std::time::Duration) -> Self {
100 self.config.health_check_interval = d;
101 self
102 }
103 pub fn reconnect_attempts(mut self, attempts: usize) -> Self {
104 self.config.max_reconnect_attempts = attempts;
105 self
106 }
107 pub fn reconnect_delay(mut self, d: std::time::Duration) -> Self {
108 self.config.reconnect_delay = d;
109 self
110 }
111 pub fn enable_dedicated_parsers(mut self, enable: bool) -> Self {
112 self.config.enable_dedicated_parsers = enable;
113 self
114 }
115 pub fn default_mode(mut self, mode: Mode) -> Self {
116 self.config.default_mode = mode;
117 self
118 }
119 pub fn connection_buffer_size(mut self, sz: usize) -> Self {
120 self.config.connection_buffer_size = sz;
121 self
122 }
123 pub fn parser_buffer_size(mut self, sz: usize) -> Self {
124 self.config.parser_buffer_size = sz;
125 self
126 }
127 pub fn raw_only(mut self, raw: bool) -> Self {
128 self.raw_only = raw;
129 self
130 }
131
132 pub fn config(mut self, config: KiteManagerConfig) -> Self {
134 self.config = config;
135 self
136 }
137
138 pub fn build(self) -> KiteTickerManager {
140 KiteTickerManager::new(self.api_key, self.access_token, self.config)
141 .with_raw_only(self.raw_only)
142 }
143}
144
145impl KiteTickerManager {
146 pub fn new(
177 api_key: String,
178 access_token: String,
179 config: KiteManagerConfig,
180 ) -> Self {
181 Self {
182 config,
183 api_key,
184 access_token,
185 connections: Vec::new(),
186 processors: Vec::new(),
187 output_channels: Vec::new(),
188 symbol_mapping: HashMap::new(),
189 health_monitor: None,
190 next_connection_index: 0,
191 start_time: Instant::now(),
192 raw_only: false,
193 }
194 }
195
196 pub fn with_raw_only(mut self, raw: bool) -> Self {
198 self.raw_only = raw;
199 self
200 }
201
202 pub async fn start(&mut self) -> Result<(), String> {
204 log::info!(
205 "Starting KiteTickerManager with {} connections",
206 self.config.max_connections
207 );
208
209 for i in 0..self.config.max_connections {
211 let channel_id = ChannelId::from_index(i)
212 .ok_or_else(|| format!("Invalid connection index: {}", i))?;
213
214 let (connection_sender, processor_receiver) = mpsc::unbounded_channel();
216
217 let mut connection =
219 ManagedConnection::new(channel_id, connection_sender);
220
221 if self.raw_only {
223 connection
224 .connect_with_raw(
225 &self.api_key,
226 &self.access_token,
227 &self.config,
228 true,
229 )
230 .await
231 .map_err(|e| format!("Failed to connect WebSocket {}: {}", i, e))?;
232 } else {
233 connection
234 .connect(&self.api_key, &self.access_token, &self.config)
235 .await
236 .map_err(|e| format!("Failed to connect WebSocket {}: {}", i, e))?;
237 }
238
239 let (mut processor, output_receiver) = MessageProcessor::new(
241 channel_id,
242 processor_receiver,
243 self.config.parser_buffer_size,
244 );
245
246 if self.config.enable_dedicated_parsers {
248 processor.start();
249 log::info!("Started dedicated parser for connection {}", i);
250 }
251
252 self.connections.push(connection);
253 self.processors.push(processor);
254 self.output_channels.push(output_receiver);
255 }
256
257 if self.config.health_check_interval.as_secs() > 0 {
259 let connection_stats: Vec<Arc<RwLock<ConnectionStats>>> = self
260 .connections
261 .iter()
262 .map(|c| Arc::clone(&c.stats))
263 .collect();
264
265 let mut health_monitor =
266 HealthMonitor::new(connection_stats, self.config.health_check_interval);
267 health_monitor.start();
268 self.health_monitor = Some(health_monitor);
269
270 log::info!("Started health monitor");
271 }
272
273 log::info!(
274 "KiteTickerManager started successfully with {} connections",
275 self.connections.len()
276 );
277
278 Ok(())
279 }
280
281 pub async fn subscribe_symbols(
283 &mut self,
284 symbols: &[u32],
285 mode: Option<Mode>,
286 ) -> Result<(), String> {
287 let mode = mode.unwrap_or(self.config.default_mode);
288
289 log::info!(
290 "Subscribing to {} symbols with mode: {:?}",
291 symbols.len(),
292 mode
293 );
294
295 let mut connection_symbols: HashMap<ChannelId, Vec<u32>> = HashMap::new();
297
298 for &symbol in symbols {
299 if self.symbol_mapping.contains_key(&symbol) {
301 log::debug!("Symbol {} already subscribed", symbol);
302 continue;
303 }
304
305 let connection_id = self.find_available_connection()?;
307
308 self.symbol_mapping.insert(symbol, connection_id);
310 connection_symbols
311 .entry(connection_id)
312 .or_default()
313 .push(symbol);
314 }
315
316 for (connection_id, symbols) in connection_symbols {
318 let connection = &mut self.connections[connection_id.to_index()];
319 let mode_clone = mode; if !symbols.is_empty() {
322 if connection.subscribed_symbols.is_empty() {
324 connection
326 .subscribe_symbols(&symbols, mode_clone)
327 .await
328 .map_err(|e| {
329 format!(
330 "Failed to subscribe on connection {:?}: {}",
331 connection_id, e
332 )
333 })?;
334
335 connection.start_message_processing().await.map_err(|e| {
337 format!(
338 "Failed to start message processing on connection {:?}: {}",
339 connection_id, e
340 )
341 })?;
342 } else {
343 connection
344 .add_symbols(&symbols, mode_clone)
345 .await
346 .map_err(|e| {
347 format!(
348 "Failed to add symbols on connection {:?}: {}",
349 connection_id, e
350 )
351 })?;
352 }
353
354 log::info!(
355 "Subscribed {} symbols on connection {:?}",
356 symbols.len(),
357 connection_id
358 );
359 }
360 }
361
362 log::info!("Successfully subscribed to {} new symbols", symbols.len());
363 Ok(())
364 }
365
366 fn find_available_connection(&mut self) -> Result<ChannelId, String> {
368 let _start_index = self.next_connection_index;
369
370 for _ in 0..self.config.max_connections {
372 let connection = &self.connections[self.next_connection_index];
373
374 if connection
375 .can_accept_symbols(1, self.config.max_symbols_per_connection)
376 {
377 let channel_id = connection.id;
378 self.next_connection_index =
379 (self.next_connection_index + 1) % self.config.max_connections;
380 return Ok(channel_id);
381 }
382
383 self.next_connection_index =
384 (self.next_connection_index + 1) % self.config.max_connections;
385 }
386
387 Err("All connections are at capacity".to_string())
388 }
389
390 pub fn get_channel(
392 &mut self,
393 channel_id: ChannelId,
394 ) -> Option<broadcast::Receiver<TickerMessage>> {
395 if channel_id.to_index() < self.output_channels.len() {
396 Some(self.output_channels[channel_id.to_index()].resubscribe())
397 } else {
398 None
399 }
400 }
401
402 pub fn get_all_channels(
404 &mut self,
405 ) -> Vec<(ChannelId, broadcast::Receiver<TickerMessage>)> {
406 let mut channels = Vec::new();
407
408 for (i, channel) in self.output_channels.iter().enumerate() {
409 if let Some(channel_id) = ChannelId::from_index(i) {
410 channels.push((channel_id, channel.resubscribe()));
411 }
412 }
413
414 channels
415 }
416
417 pub fn get_raw_frame_channel(
420 &self,
421 channel_id: ChannelId,
422 ) -> Option<tokio::sync::broadcast::Receiver<bytes::Bytes>> {
423 self
424 .connections
425 .get(channel_id.to_index())
426 .and_then(|mc| mc.ticker.as_ref())
427 .map(|t| t.subscribe_raw_frames())
428 }
429
430 pub fn get_full_raw_subscriber(
433 &self,
434 channel_id: ChannelId,
435 ) -> Option<crate::KiteTickerRawSubscriber184> {
436 self
437 .connections
438 .get(channel_id.to_index())
439 .and_then(|mc| mc.ticker.as_ref())
440 .map(|t| t.subscribe_full_raw())
441 }
442
443 pub fn get_all_raw_frame_channels(
446 &self,
447 ) -> Vec<(ChannelId, tokio::sync::broadcast::Receiver<bytes::Bytes>)> {
448 let mut out = Vec::with_capacity(self.connections.len());
449 for (i, mc) in self.connections.iter().enumerate() {
450 if let Some(ch) = ChannelId::from_index(i) {
451 if let Some(t) = mc.ticker.as_ref() {
452 out.push((ch, t.subscribe_raw_frames()));
453 }
454 }
455 }
456 out
457 }
458
459 pub async fn get_stats(&self) -> Result<ManagerStats, String> {
461 if let Some(health_monitor) = &self.health_monitor {
462 Ok(health_monitor.get_manager_stats().await)
463 } else {
464 Err("Health monitor not available".to_string())
465 }
466 }
467
468 pub async fn get_health(&self) -> Result<HealthSummary, String> {
470 if let Some(health_monitor) = &self.health_monitor {
471 Ok(health_monitor.get_health_summary().await)
472 } else {
473 Err("Health monitor not available".to_string())
474 }
475 }
476
477 pub async fn get_processor_stats(&self) -> Vec<(ChannelId, ProcessorStats)> {
479 let mut stats = Vec::new();
480
481 for processor in &self.processors {
482 let processor_stats = processor.get_stats().await;
483 stats.push((processor.channel_id, processor_stats));
484 }
485
486 stats
487 }
488
489 pub fn get_symbol_distribution(&self) -> HashMap<ChannelId, Vec<u32>> {
491 let mut distribution: HashMap<ChannelId, Vec<u32>> = HashMap::new();
492
493 for (&symbol, &channel_id) in &self.symbol_mapping {
494 distribution.entry(channel_id).or_default().push(symbol);
495 }
496
497 distribution
498 }
499
500 pub async fn unsubscribe_symbols(
502 &mut self,
503 symbols: &[u32],
504 ) -> Result<(), String> {
505 log::info!("Unsubscribing from {} symbols", symbols.len());
506
507 let mut connection_symbols: HashMap<ChannelId, Vec<u32>> = HashMap::new();
509
510 for &symbol in symbols {
511 if let Some(&channel_id) = self.symbol_mapping.get(&symbol) {
512 connection_symbols
513 .entry(channel_id)
514 .or_default()
515 .push(symbol);
516 self.symbol_mapping.remove(&symbol);
517 } else {
518 log::debug!("Symbol {} not found in subscriptions", symbol);
519 }
520 }
521
522 for (channel_id, symbols) in connection_symbols {
524 let connection = &mut self.connections[channel_id.to_index()];
525
526 if !symbols.is_empty() {
527 connection.remove_symbols(&symbols).await.map_err(|e| {
528 format!(
529 "Failed to unsubscribe from connection {:?}: {}",
530 channel_id, e
531 )
532 })?;
533
534 log::info!(
535 "Unsubscribed {} symbols from connection {:?}",
536 symbols.len(),
537 channel_id
538 );
539 }
540 }
541
542 log::info!("Successfully unsubscribed from {} symbols", symbols.len());
543 Ok(())
544 }
545
546 pub async fn change_mode(
548 &mut self,
549 symbols: &[u32],
550 mode: Mode,
551 ) -> Result<(), String> {
552 log::info!("Changing mode for {} symbols to {:?}", symbols.len(), mode);
553
554 let mut connection_symbols: HashMap<ChannelId, Vec<u32>> = HashMap::new();
556
557 for &symbol in symbols {
558 if let Some(&channel_id) = self.symbol_mapping.get(&symbol) {
559 connection_symbols
560 .entry(channel_id)
561 .or_default()
562 .push(symbol);
563 } else {
564 log::debug!("Symbol {} not found in subscriptions", symbol);
565 }
566 }
567
568 for (channel_id, symbols) in connection_symbols {
570 let connection = &mut self.connections[channel_id.to_index()];
571 if symbols.is_empty() {
572 continue;
573 }
574 if let Some(ref cmd) = connection.cmd_tx {
576 let mode_req = crate::models::Request::mode(mode, &symbols).to_string();
577 let _ = cmd.send(tokio_tungstenite::tungstenite::Message::Text(
578 mode_req.into(),
579 ));
580 for &s in &symbols {
581 connection.subscribed_symbols.insert(s, mode);
582 }
583 log::info!(
584 "Changed mode for {} symbols on connection {:?}",
585 symbols.len(),
586 channel_id
587 );
588 } else if let Some(subscriber) = &mut connection.subscriber {
589 subscriber.set_mode(&symbols, mode).await.map_err(|e| {
591 format!(
592 "Failed to change mode on connection {:?}: {}",
593 channel_id, e
594 )
595 })?;
596 for &s in &symbols {
597 connection.subscribed_symbols.insert(s, mode);
598 }
599 }
600 }
601
602 log::info!("Successfully changed mode for {} symbols", symbols.len());
603 Ok(())
604 }
605
606 pub async fn stop(&mut self) -> Result<(), String> {
608 log::info!("Stopping KiteTickerManager");
609
610 if let Some(health_monitor) = &mut self.health_monitor {
612 health_monitor.stop().await;
613 }
614
615 for processor in &mut self.processors {
617 processor.stop().await;
618 }
619
620 for connection in &mut self.connections {
622 if let Some(handle) = connection.task_handle.take() {
623 handle.abort();
624 let _ = handle.await;
625 }
626 }
627
628 log::info!("KiteTickerManager stopped");
629 Ok(())
630 }
631}