kiteticker_async_manager/manager/
connection_manager.rs1use crate::manager::{
16 ChannelId, ConnectionStats, HealthMonitor, HealthSummary, KiteManagerConfig,
17 ManagedConnection, ManagerStats, MessageProcessor, ProcessorStats,
18};
19use crate::models::{Mode, TickerMessage};
20use std::collections::HashMap;
21use std::sync::Arc;
22use std::time::Instant;
23use tokio::sync::{broadcast, mpsc, RwLock};
24
25#[derive(Debug)]
31pub struct KiteTickerManager {
32 config: KiteManagerConfig,
34
35 api_key: String,
37 access_token: String,
38
39 connections: Vec<ManagedConnection>,
41
42 processors: Vec<MessageProcessor>,
44
45 output_channels: Vec<broadcast::Receiver<TickerMessage>>,
47
48 symbol_mapping: HashMap<u32, ChannelId>,
50
51 health_monitor: Option<HealthMonitor>,
53
54 next_connection_index: usize,
56
57 #[allow(dead_code)]
59 start_time: Instant,
60 raw_only: bool,
62}
63
64#[derive(Debug, Clone)]
66pub struct KiteTickerManagerBuilder {
67 api_key: String,
68 access_token: String,
69 config: KiteManagerConfig,
70 raw_only: bool,
71}
72
73impl KiteTickerManagerBuilder {
74 pub fn new(
76 api_key: impl Into<String>,
77 access_token: impl Into<String>,
78 ) -> Self {
79 Self {
80 api_key: api_key.into(),
81 access_token: access_token.into(),
82 config: KiteManagerConfig::default(),
83 raw_only: false,
84 }
85 }
86
87 pub fn max_connections(mut self, n: usize) -> Self {
88 self.config.max_connections = n;
89 self
90 }
91 pub fn max_symbols_per_connection(mut self, n: usize) -> Self {
92 self.config.max_symbols_per_connection = n;
93 self
94 }
95 pub fn connection_timeout(mut self, d: std::time::Duration) -> Self {
96 self.config.connection_timeout = d;
97 self
98 }
99 pub fn health_check_interval(mut self, d: std::time::Duration) -> Self {
100 self.config.health_check_interval = d;
101 self
102 }
103 pub fn reconnect_attempts(mut self, attempts: usize) -> Self {
104 self.config.max_reconnect_attempts = attempts;
105 self
106 }
107 pub fn reconnect_delay(mut self, d: std::time::Duration) -> Self {
108 self.config.reconnect_delay = d;
109 self
110 }
111 pub fn enable_dedicated_parsers(mut self, enable: bool) -> Self {
112 self.config.enable_dedicated_parsers = enable;
113 self
114 }
115 pub fn default_mode(mut self, mode: Mode) -> Self {
116 self.config.default_mode = mode;
117 self
118 }
119 pub fn connection_buffer_size(mut self, sz: usize) -> Self {
120 self.config.connection_buffer_size = sz;
121 self
122 }
123 pub fn parser_buffer_size(mut self, sz: usize) -> Self {
124 self.config.parser_buffer_size = sz;
125 self
126 }
127 pub fn raw_only(mut self, raw: bool) -> Self {
128 self.raw_only = raw;
129 self
130 }
131
132 pub fn config(mut self, config: KiteManagerConfig) -> Self {
134 self.config = config;
135 self
136 }
137
138 pub fn build(self) -> KiteTickerManager {
140 KiteTickerManager::new(self.api_key, self.access_token, self.config)
141 .with_raw_only(self.raw_only)
142 }
143}
144
145impl KiteTickerManager {
146 pub fn new(
177 api_key: String,
178 access_token: String,
179 config: KiteManagerConfig,
180 ) -> Self {
181 Self {
182 config,
183 api_key,
184 access_token,
185 connections: Vec::new(),
186 processors: Vec::new(),
187 output_channels: Vec::new(),
188 symbol_mapping: HashMap::new(),
189 health_monitor: None,
190 next_connection_index: 0,
191 start_time: Instant::now(),
192 raw_only: false,
193 }
194 }
195
196 pub fn with_raw_only(mut self, raw: bool) -> Self {
198 self.raw_only = raw;
199 self
200 }
201
202 pub async fn start(&mut self) -> Result<(), String> {
204 log::info!(
205 "Starting KiteTickerManager with {} connections",
206 self.config.max_connections
207 );
208
209 for i in 0..self.config.max_connections {
211 let channel_id = ChannelId::from_index(i)
212 .ok_or_else(|| format!("Invalid connection index: {}", i))?;
213
214 let (connection_sender, processor_receiver) = mpsc::unbounded_channel();
216
217 let mut connection =
219 ManagedConnection::new(channel_id, connection_sender);
220
221 if self.raw_only {
223 connection
224 .connect_with_raw(
225 &self.api_key,
226 &self.access_token,
227 &self.config,
228 true,
229 )
230 .await
231 .map_err(|e| format!("Failed to connect WebSocket {}: {}", i, e))?;
232 } else {
233 connection
234 .connect(&self.api_key, &self.access_token, &self.config)
235 .await
236 .map_err(|e| format!("Failed to connect WebSocket {}: {}", i, e))?;
237 }
238
239 let (mut processor, output_receiver) = MessageProcessor::new(
241 channel_id,
242 processor_receiver,
243 self.config.parser_buffer_size,
244 );
245
246 if self.config.enable_dedicated_parsers {
248 processor.start();
249 log::info!("Started dedicated parser for connection {}", i);
250 }
251
252 self.connections.push(connection);
253 self.processors.push(processor);
254 self.output_channels.push(output_receiver);
255 }
256
257 if self.config.health_check_interval.as_secs() > 0 {
259 let connection_stats: Vec<Arc<RwLock<ConnectionStats>>> = self
260 .connections
261 .iter()
262 .map(|c| Arc::clone(&c.stats))
263 .collect();
264
265 let mut health_monitor =
266 HealthMonitor::new(connection_stats, self.config.health_check_interval);
267 health_monitor.start();
268 self.health_monitor = Some(health_monitor);
269
270 log::info!("Started health monitor");
271 }
272
273 log::info!(
274 "KiteTickerManager started successfully with {} connections",
275 self.connections.len()
276 );
277
278 Ok(())
279 }
280
281 pub async fn subscribe_symbols(
283 &mut self,
284 symbols: &[u32],
285 mode: Option<Mode>,
286 ) -> Result<(), String> {
287 let mode = mode.unwrap_or(self.config.default_mode);
288
289 log::info!(
290 "Subscribing to {} symbols with mode: {:?}",
291 symbols.len(),
292 mode
293 );
294
295 let mut connection_symbols: HashMap<ChannelId, Vec<u32>> = HashMap::new();
297
298 for &symbol in symbols {
299 if self.symbol_mapping.contains_key(&symbol) {
301 log::debug!("Symbol {} already subscribed", symbol);
302 continue;
303 }
304
305 let connection_id = self.find_available_connection()?;
307
308 self.symbol_mapping.insert(symbol, connection_id);
310 connection_symbols
311 .entry(connection_id)
312 .or_default()
313 .push(symbol);
314 }
315
316 for (connection_id, symbols) in connection_symbols {
318 let connection = &mut self.connections[connection_id.to_index()];
319 let mode_clone = mode; if !symbols.is_empty() {
322 if connection.subscribed_symbols.is_empty() {
324 connection
325 .subscribe_symbols(&symbols, mode_clone)
326 .await
327 .map_err(|e| {
328 format!(
329 "Failed to subscribe on connection {:?}: {}",
330 connection_id, e
331 )
332 })?;
333 } else {
334 connection
335 .add_symbols(&symbols, mode_clone)
336 .await
337 .map_err(|e| {
338 format!(
339 "Failed to add symbols on connection {:?}: {}",
340 connection_id, e
341 )
342 })?;
343 }
344
345 log::info!(
346 "Subscribed {} symbols on connection {:?}",
347 symbols.len(),
348 connection_id
349 );
350 }
351 }
352
353 log::info!("Successfully subscribed to {} new symbols", symbols.len());
354 Ok(())
355 }
356
357 fn find_available_connection(&mut self) -> Result<ChannelId, String> {
359 let _start_index = self.next_connection_index;
360
361 for _ in 0..self.config.max_connections {
363 let connection = &self.connections[self.next_connection_index];
364
365 if connection
366 .can_accept_symbols(1, self.config.max_symbols_per_connection)
367 {
368 let channel_id = connection.id;
369 self.next_connection_index =
370 (self.next_connection_index + 1) % self.config.max_connections;
371 return Ok(channel_id);
372 }
373
374 self.next_connection_index =
375 (self.next_connection_index + 1) % self.config.max_connections;
376 }
377
378 Err("All connections are at capacity".to_string())
379 }
380
381 pub fn get_channel(
383 &mut self,
384 channel_id: ChannelId,
385 ) -> Option<broadcast::Receiver<TickerMessage>> {
386 if channel_id.to_index() < self.output_channels.len() {
387 Some(self.output_channels[channel_id.to_index()].resubscribe())
388 } else {
389 None
390 }
391 }
392
393 pub fn get_all_channels(
395 &mut self,
396 ) -> Vec<(ChannelId, broadcast::Receiver<TickerMessage>)> {
397 let mut channels = Vec::new();
398
399 for (i, channel) in self.output_channels.iter().enumerate() {
400 if let Some(channel_id) = ChannelId::from_index(i) {
401 channels.push((channel_id, channel.resubscribe()));
402 }
403 }
404
405 channels
406 }
407
408 pub async fn get_stats(&self) -> Result<ManagerStats, String> {
410 if let Some(health_monitor) = &self.health_monitor {
411 Ok(health_monitor.get_manager_stats().await)
412 } else {
413 Err("Health monitor not available".to_string())
414 }
415 }
416
417 pub async fn get_health(&self) -> Result<HealthSummary, String> {
419 if let Some(health_monitor) = &self.health_monitor {
420 Ok(health_monitor.get_health_summary().await)
421 } else {
422 Err("Health monitor not available".to_string())
423 }
424 }
425
426 pub async fn get_processor_stats(&self) -> Vec<(ChannelId, ProcessorStats)> {
428 let mut stats = Vec::new();
429
430 for processor in &self.processors {
431 let processor_stats = processor.get_stats().await;
432 stats.push((processor.channel_id, processor_stats));
433 }
434
435 stats
436 }
437
438 pub fn get_symbol_distribution(&self) -> HashMap<ChannelId, Vec<u32>> {
440 let mut distribution: HashMap<ChannelId, Vec<u32>> = HashMap::new();
441
442 for (&symbol, &channel_id) in &self.symbol_mapping {
443 distribution.entry(channel_id).or_default().push(symbol);
444 }
445
446 distribution
447 }
448
449 pub async fn unsubscribe_symbols(
451 &mut self,
452 symbols: &[u32],
453 ) -> Result<(), String> {
454 log::info!("Unsubscribing from {} symbols", symbols.len());
455
456 let mut connection_symbols: HashMap<ChannelId, Vec<u32>> = HashMap::new();
458
459 for &symbol in symbols {
460 if let Some(&channel_id) = self.symbol_mapping.get(&symbol) {
461 connection_symbols
462 .entry(channel_id)
463 .or_default()
464 .push(symbol);
465 self.symbol_mapping.remove(&symbol);
466 } else {
467 log::debug!("Symbol {} not found in subscriptions", symbol);
468 }
469 }
470
471 for (channel_id, symbols) in connection_symbols {
473 let connection = &mut self.connections[channel_id.to_index()];
474
475 if !symbols.is_empty() {
476 connection.remove_symbols(&symbols).await.map_err(|e| {
477 format!(
478 "Failed to unsubscribe from connection {:?}: {}",
479 channel_id, e
480 )
481 })?;
482
483 log::info!(
484 "Unsubscribed {} symbols from connection {:?}",
485 symbols.len(),
486 channel_id
487 );
488 }
489 }
490
491 log::info!("Successfully unsubscribed from {} symbols", symbols.len());
492 Ok(())
493 }
494
495 pub async fn change_mode(
497 &mut self,
498 symbols: &[u32],
499 mode: Mode,
500 ) -> Result<(), String> {
501 log::info!("Changing mode for {} symbols to {:?}", symbols.len(), mode);
502
503 let mut connection_symbols: HashMap<ChannelId, Vec<u32>> = HashMap::new();
505
506 for &symbol in symbols {
507 if let Some(&channel_id) = self.symbol_mapping.get(&symbol) {
508 connection_symbols
509 .entry(channel_id)
510 .or_default()
511 .push(symbol);
512 } else {
513 log::debug!("Symbol {} not found in subscriptions", symbol);
514 }
515 }
516
517 for (channel_id, symbols) in connection_symbols {
519 let connection = &mut self.connections[channel_id.to_index()];
520 if symbols.is_empty() {
521 continue;
522 }
523 if let Some(ref cmd) = connection.cmd_tx {
525 let mode_req = crate::models::Request::mode(mode, &symbols).to_string();
526 let _ =
527 cmd.send(tokio_tungstenite::tungstenite::Message::Text(mode_req));
528 for &s in &symbols {
529 connection.subscribed_symbols.insert(s, mode);
530 }
531 log::info!(
532 "Changed mode for {} symbols on connection {:?}",
533 symbols.len(),
534 channel_id
535 );
536 } else if let Some(subscriber) = &mut connection.subscriber {
537 subscriber.set_mode(&symbols, mode).await.map_err(|e| {
539 format!(
540 "Failed to change mode on connection {:?}: {}",
541 channel_id, e
542 )
543 })?;
544 for &s in &symbols {
545 connection.subscribed_symbols.insert(s, mode);
546 }
547 }
548 }
549
550 log::info!("Successfully changed mode for {} symbols", symbols.len());
551 Ok(())
552 }
553
554 pub async fn stop(&mut self) -> Result<(), String> {
556 log::info!("Stopping KiteTickerManager");
557
558 if let Some(health_monitor) = &mut self.health_monitor {
560 health_monitor.stop().await;
561 }
562
563 for processor in &mut self.processors {
565 processor.stop().await;
566 }
567
568 for connection in &mut self.connections {
570 if let Some(handle) = connection.task_handle.take() {
571 handle.abort();
572 let _ = handle.await;
573 }
574 }
575
576 log::info!("KiteTickerManager stopped");
577 Ok(())
578 }
579}