dynamic_subscription_demo/
dynamic_subscription_demo.rs

1use kiteticker_async_manager::{
2  KiteManagerConfig, KiteTickerManager, Mode, TickerMessage,
3};
4use log::debug;
5use std::time::{Duration, Instant};
6use tokio::time::{sleep, timeout};
7
8#[tokio::main]
9async fn main() -> Result<(), String> {
10  // Initialize logging
11  env_logger::init();
12
13  println!("๐Ÿ”„ KiteTicker Dynamic Subscription Demo");
14  println!("โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•");
15
16  // Debug: Show current logging configuration
17  debug!("Logging initialized");
18  debug!(
19    "Current log level: {}",
20    std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_string())
21  );
22  debug!("For detailed manager logs, set RUST_LOG=debug");
23
24  let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
25  let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
26
27  if api_key.is_empty() || access_token.is_empty() {
28    println!(
29      "โš ๏ธ  KITE_API_KEY and KITE_ACCESS_TOKEN environment variables not set"
30    );
31    demonstrate_offline_dynamic_architecture().await;
32    return Ok(());
33  }
34
35  println!(
36    "๐Ÿ’ก Note: This demo shows enhanced tick printing with detailed market data"
37  );
38  println!("๐Ÿ“… During market hours, you'll see live tick data with OHLC, volume, and market depth");
39  println!("๐Ÿ• Outside market hours, connections work but no tick data flows");
40  println!();
41
42  // Create configuration optimized for dynamic operations
43  let config = KiteManagerConfig {
44    max_symbols_per_connection: 3000,
45    max_connections: 3,
46    connection_buffer_size: 5000,
47    parser_buffer_size: 10000,
48    connection_timeout: Duration::from_secs(30),
49    health_check_interval: Duration::from_secs(5),
50    max_reconnect_attempts: 5,
51    reconnect_delay: Duration::from_secs(2),
52    enable_dedicated_parsers: true,
53    default_mode: Mode::LTP, // Start with LTP for efficiency
54  };
55
56  println!("๐Ÿ”ง Configuration for Dynamic Operations:");
57  println!("   Max connections: {}", config.max_connections);
58  println!(
59    "   Max symbols per connection: {}",
60    config.max_symbols_per_connection
61  );
62  println!("   Default mode: {:?}", config.default_mode);
63  println!();
64
65  // Create and start the manager
66  println!("๐Ÿ“ก Starting multi-connection manager...");
67  let start_time = Instant::now();
68
69  let mut manager = KiteTickerManager::new(api_key, access_token, config);
70
71  match timeout(Duration::from_secs(30), manager.start()).await {
72    Ok(Ok(())) => {
73      println!("โœ… Manager started in {:?}", start_time.elapsed());
74    }
75    Ok(Err(e)) => {
76      println!("โŒ Manager failed to start: {}", e);
77      return Err(e);
78    }
79    Err(_) => {
80      println!("โฑ๏ธ  Manager startup timeout");
81      return Err("Manager startup timeout".to_string());
82    }
83  }
84
85  // Start permanent tick listeners IMMEDIATELY after manager starts
86  println!("๐ŸŽฏ Starting permanent tick listeners...");
87  let channels = manager.get_all_channels();
88  let mut permanent_listeners = Vec::new();
89
90  for (channel_id, mut receiver) in channels {
91    let listener_task = tokio::spawn(async move {
92      let mut total_ticks = 0;
93      loop {
94        match receiver.recv().await {
95          Ok(message) => {
96            if let TickerMessage::Ticks(ticks) = message {
97              total_ticks += ticks.len();
98              println!(
99                "๐ŸŽฏ PERMANENT LISTENER {:?}: {} ticks (total: {})",
100                channel_id,
101                ticks.len(),
102                total_ticks
103              );
104
105              // Debug: Print raw tick data structure
106              debug!("Raw tick count: {}", ticks.len());
107
108              for (i, tick) in ticks.iter().enumerate() {
109                println!("    ๐Ÿ”น Tick {}: Symbol: {}, LTP: {:?}, Volume: {:?}, Change: {:?}, Mode: {:?}",
110                                    i + 1,
111                                    tick.instrument_token,
112                                    tick.content.last_price,
113                                    tick.content.volume_traded,
114                                    tick.content.net_change,
115                                    tick.content.mode
116                                );
117
118                // Debug: Show all available fields
119                debug!(
120                  "DEBUG Full Tick Data for symbol {}:",
121                  tick.instrument_token
122                );
123                debug!("  - Instrument Token: {}", tick.instrument_token);
124                debug!("  - Timestamp: <not available>");
125                debug!("  - Tradable: <not available>");
126                debug!(
127                  "  - Exchange Timestamp: {:?}",
128                  tick.content.exchange_timestamp
129                );
130                debug!("  - Last Traded Time: <not available>");
131                debug!(
132                  "  - Total Buy Quantity: {:?}",
133                  tick.content.total_buy_qty
134                );
135                debug!(
136                  "  - Total Sell Quantity: {:?}",
137                  tick.content.total_sell_qty
138                );
139                debug!(
140                  "  - Average Price: {:?}",
141                  tick.content.avg_traded_price
142                );
143                debug!("  - OI: {:?}", tick.content.oi);
144                debug!("  - OI Day High: {:?}", tick.content.oi_day_high);
145                debug!("  - OI Day Low: {:?}", tick.content.oi_day_low);
146
147                // Show OHLC data if available
148                if let Some(ohlc) = &tick.content.ohlc {
149                  println!(
150                    "      ๐Ÿ“Š OHLC: O:{} H:{} L:{} C:{}",
151                    ohlc.open, ohlc.high, ohlc.low, ohlc.close
152                  );
153                  debug!(
154                    "OHLC data present: O:{} H:{} L:{} C:{}",
155                    ohlc.open, ohlc.high, ohlc.low, ohlc.close
156                  );
157                } else {
158                  debug!(
159                    "OHLC: Not available for symbol {}",
160                    tick.instrument_token
161                  );
162                }
163
164                // Show additional data for full mode
165                if tick.content.mode == Mode::Full {
166                  if let Some(depth) = &tick.content.depth {
167                    println!(
168                      "      ๐Ÿ“ˆ Market Depth: {} buy orders, {} sell orders",
169                      depth.buy.len(),
170                      depth.sell.len()
171                    );
172
173                    // Debug: Show first few depth entries
174                    debug!(
175                      "Market depth for symbol {}: {} buy, {} sell",
176                      tick.instrument_token,
177                      depth.buy.len(),
178                      depth.sell.len()
179                    );
180
181                    if !depth.buy.is_empty() {
182                      debug!(
183                        "Top Buy Orders for symbol {}:",
184                        tick.instrument_token
185                      );
186                      for (idx, buy_order) in
187                        depth.buy.iter().take(3).enumerate()
188                      {
189                        debug!(
190                          "  {}. Price: {}, Qty: {}, Orders: {}",
191                          idx + 1,
192                          buy_order.price,
193                          buy_order.qty,
194                          buy_order.orders
195                        );
196                      }
197                    }
198                    if !depth.sell.is_empty() {
199                      debug!(
200                        "Top Sell Orders for symbol {}:",
201                        tick.instrument_token
202                      );
203                      for (idx, sell_order) in
204                        depth.sell.iter().take(3).enumerate()
205                      {
206                        debug!(
207                          "  {}. Price: {}, Qty: {}, Orders: {}",
208                          idx + 1,
209                          sell_order.price,
210                          sell_order.qty,
211                          sell_order.orders
212                        );
213                      }
214                    }
215                  } else {
216                    debug!(
217                      "Market Depth: Not available for symbol {}",
218                      tick.instrument_token
219                    );
220                  }
221                }
222              }
223            }
224          }
225          Err(e) => {
226            println!("โŒ Listener {:?} error: {}", channel_id, e);
227            break;
228          }
229        }
230      }
231    });
232    permanent_listeners.push(listener_task);
233  }
234
235  // Give listeners a moment to initialize
236  sleep(Duration::from_millis(100)).await;
237  println!("โœ… Permanent listeners started and ready");
238
239  // Demo: Multi-connection distribution workflow
240  demo_dynamic_subscription(&mut manager).await?;
241
242  // Stop the manager
243  println!("\n๐Ÿ›‘ Stopping manager...");
244  manager.stop().await?;
245
246  println!("๐Ÿ Dynamic subscription demo completed successfully!");
247  Ok(())
248}
249
250async fn demo_dynamic_subscription(
251  manager: &mut KiteTickerManager,
252) -> Result<(), String> {
253  println!("\n๐ŸŽฏ True Dynamic Subscription Demo");
254  println!("==================================");
255
256  // Start with a small initial set
257  let initial_symbols = vec![256265, 265, 256777]; // 3 symbols
258  let additional_batch_1 = vec![274441, 260105, 273929]; // 3 more symbols
259  let additional_batch_2 = vec![260617, 257033, 257289, 257545]; // 4 more symbols
260  let symbols_to_remove = vec![265, 274441]; // Remove some symbols
261  let final_batch = vec![257801, 258825]; // Add final symbols
262
263  // Step 1: Initial subscription with small set
264  println!(
265    "\n๐Ÿ“Š Step 1: Initial subscription to {} symbols",
266    initial_symbols.len()
267  );
268  println!("Starting with: {:?}", initial_symbols);
269
270  // Start listening for ticks BEFORE subscribing
271  let channels_before_sub = manager.get_all_channels();
272  let mut tick_listeners = Vec::new();
273
274  for (channel_id, mut receiver) in channels_before_sub {
275    let task = tokio::spawn(async move {
276      let start = std::time::Instant::now();
277      let mut tick_count = 0;
278
279      // Listen for initial ticks for 5 seconds
280      while start.elapsed() < Duration::from_secs(5) {
281        match timeout(Duration::from_millis(500), receiver.recv()).await {
282          Ok(Ok(message)) => {
283            if let TickerMessage::Ticks(ticks) = message {
284              tick_count += ticks.len();
285              println!(
286                "๐ŸŽฏ {:?}: Received {} ticks immediately after subscription!",
287                channel_id,
288                ticks.len()
289              );
290
291              // Debug: Show detailed tick information
292              debug!("Processing {} ticks on {:?}", ticks.len(), channel_id);
293
294              for (idx, tick) in ticks.iter().enumerate() {
295                println!("  ๐Ÿ”น Tick {}: Symbol: {}, LTP: {:?}, Volume: {:?}, Change: {:?}",
296                                    idx + 1,
297                                    tick.instrument_token,
298                                    tick.content.last_price,
299                                    tick.content.volume_traded,
300                                    tick.content.net_change
301                                );
302
303                // Debug: Show tick structure details
304                debug!(
305                  "Tick {} Details for symbol {}:",
306                  idx + 1,
307                  tick.instrument_token
308                );
309                debug!("  - Raw instrument_token: {}", tick.instrument_token);
310                debug!("  - Raw last_price: {:?}", tick.content.last_price);
311                debug!(
312                  "  - Raw volume_traded: {:?}",
313                  tick.content.volume_traded
314                );
315                debug!("  - Raw change: {:?}", tick.content.net_change);
316                debug!("  - Raw last_quantity: <not available>");
317                debug!(
318                  "  - Raw average_price: {:?}",
319                  tick.content.avg_traded_price
320                );
321                debug!(
322                  "  - Raw buy_quantity: {:?}",
323                  tick.content.total_buy_qty
324                );
325                debug!(
326                  "  - Raw sell_quantity: {:?}",
327                  tick.content.total_sell_qty
328                );
329                debug!("  - Raw oi: {:?}", tick.content.oi);
330                debug!("  - Raw mode: {:?}", tick.content.mode);
331
332                if let Some(ohlc) = &tick.content.ohlc {
333                  debug!(
334                    "  - OHLC available: O:{} H:{} L:{} C:{}",
335                    ohlc.open, ohlc.high, ohlc.low, ohlc.close
336                  );
337                }
338
339                if let Some(depth) = &tick.content.depth {
340                  debug!(
341                    "  - Market depth available: {} buy, {} sell",
342                    depth.buy.len(),
343                    depth.sell.len()
344                  );
345                }
346              }
347            }
348          }
349          _ => continue,
350        }
351      }
352      (channel_id, tick_count)
353    });
354    tick_listeners.push(task);
355  }
356
357  // Now subscribe to symbols
358  manager
359    .subscribe_symbols(&initial_symbols, Some(Mode::LTP))
360    .await?;
361
362  println!("โœ… Subscription sent, waiting for initial ticks...");
363
364  // Wait for the listeners to finish
365  for task in tick_listeners {
366    if let Ok((channel_id, count)) = task.await {
367      println!(
368        "๐Ÿ“Š {:?}: Received {} total ticks during initial subscription",
369        channel_id, count
370      );
371    }
372  }
373
374  print_distribution(manager, "After initial subscription").await;
375
376  // Step 2: Wait and monitor initial data
377  println!("\nโณ Step 2: Monitoring initial data flow");
378  monitor_ticks_briefly(manager, 5, "Initial Subscription Data").await;
379
380  if let Ok(stats) = manager.get_stats().await {
381    println!("โœ… Current Statistics:");
382    println!("   Active connections: {}", stats.active_connections);
383    println!("   Total symbols: {}", stats.total_symbols);
384    println!("   Total messages: {}", stats.total_messages_received);
385  }
386
387  // Step 3: Dynamic addition - Batch 1
388  println!(
389    "\nโž• Step 3: DYNAMIC ADDITION - Adding {} more symbols",
390    additional_batch_1.len()
391  );
392  println!("Adding: {:?}", additional_batch_1);
393  manager
394    .subscribe_symbols(&additional_batch_1, Some(Mode::Quote))
395    .await?;
396
397  // Give time for new tick data to arrive
398  sleep(Duration::from_secs(2)).await;
399
400  print_distribution(manager, "After first dynamic addition").await;
401  monitor_ticks_briefly(manager, 3, "After First Addition").await;
402
403  // Step 4: Dynamic addition - Batch 2
404  println!(
405    "\nโž• Step 4: DYNAMIC ADDITION - Adding {} more symbols",
406    additional_batch_2.len()
407  );
408  println!("Adding: {:?}", additional_batch_2);
409  manager
410    .subscribe_symbols(&additional_batch_2, Some(Mode::Full))
411    .await?;
412
413  // Give time for new tick data to arrive
414  sleep(Duration::from_secs(2)).await;
415
416  print_distribution(manager, "After second dynamic addition").await;
417  monitor_ticks_briefly(manager, 3, "After Second Addition").await;
418
419  // Step 5: Dynamic removal
420  println!(
421    "\nโž– Step 5: DYNAMIC REMOVAL - Removing {} symbols",
422    symbols_to_remove.len()
423  );
424  println!("Removing: {:?}", symbols_to_remove);
425  match manager.unsubscribe_symbols(&symbols_to_remove).await {
426    Ok(()) => {
427      print_distribution(manager, "After dynamic removal").await;
428      println!("โœ… Dynamic removal successful!");
429    }
430    Err(e) => {
431      println!("โš ๏ธ  Dynamic removal failed: {}", e);
432    }
433  }
434  monitor_ticks_briefly(manager, 3, "After Symbol Removal").await;
435
436  // Step 6: Final addition and mode change demo
437  println!(
438    "\nโž• Step 6: FINAL ADDITION - Adding {} symbols",
439    final_batch.len()
440  );
441  println!("Adding: {:?}", final_batch);
442  manager
443    .subscribe_symbols(&final_batch, Some(Mode::LTP))
444    .await?;
445
446  print_distribution(manager, "After final addition").await;
447
448  // Step 7: Mode change demonstration
449  println!("\n๐Ÿ”„ Step 7: MODE CHANGE - Changing subscription mode");
450  let symbols_for_mode_change = vec![256265, 260105]; // Change some existing symbols to Full mode
451  println!("Changing {:?} to Full mode", symbols_for_mode_change);
452  match manager
453    .change_mode(&symbols_for_mode_change, Mode::Full)
454    .await
455  {
456    Ok(()) => println!("โœ… Mode change successful!"),
457    Err(e) => println!("โš ๏ธ  Mode change failed: {}", e),
458  }
459
460  // Step 8: Final statistics and monitoring
461  println!("\n๐Ÿ“ˆ Step 8: Final Statistics and Performance");
462  sleep(Duration::from_secs(3)).await; // Let some data flow
463
464  if let Ok(stats) = manager.get_stats().await {
465    println!("โœ… Final Manager Statistics:");
466    println!("   Active connections: {}", stats.active_connections);
467    println!("   Total symbols: {}", stats.total_symbols);
468    println!("   Total messages: {}", stats.total_messages_received);
469    println!("   Total errors: {}", stats.total_errors);
470
471    for (i, conn_stats) in stats.connection_stats.iter().enumerate() {
472      println!(
473        "   Connection {}: {} symbols, {} messages",
474        i + 1,
475        conn_stats.symbol_count,
476        conn_stats.messages_received
477      );
478    }
479  }
480
481  // Step 9: Show processor performance
482  println!("\nโšก Step 9: Final Parser Performance");
483  let processor_stats = manager.get_processor_stats().await;
484  for (channel_id, stats) in processor_stats {
485    println!(
486      "   {:?}: {:.1} msg/sec, {:?} avg latency",
487      channel_id, stats.messages_per_second, stats.processing_latency_avg
488    );
489  }
490
491  // Step 10: Monitor live data for a short period
492  println!("\n๐Ÿ“บ Step 10: Live Data Monitoring (10 seconds)");
493  println!("Monitoring real-time tick data from all dynamically managed connections...");
494
495  let channels = manager.get_all_channels();
496  let mut tasks = Vec::new();
497
498  for (channel_id, mut receiver) in channels {
499    let task = tokio::spawn(async move {
500      let mut count = 0;
501      let start = std::time::Instant::now();
502
503      while start.elapsed() < Duration::from_secs(10) {
504        match timeout(Duration::from_secs(2), receiver.recv()).await {
505          Ok(Ok(message)) => {
506            count += 1;
507            if let TickerMessage::Ticks(ticks) = message {
508              println!("๐Ÿ“‹ {:?}: {} ticks received", channel_id, ticks.len());
509
510              // Debug: Final monitoring with comprehensive details
511              debug!(
512                "Final monitoring - Message #{}, {} ticks on {:?}",
513                count,
514                ticks.len(),
515                channel_id
516              );
517
518              for tick in &ticks {
519                println!(
520                  "  ๐Ÿ”น Symbol: {}, LTP: {:?}, Volume: {:?}, Change: {:?}",
521                  tick.instrument_token,
522                  tick.content.last_price,
523                  tick.content.volume_traded,
524                  tick.content.net_change
525                );
526
527                // Debug: Show complete tick analysis
528                debug!(
529                  "Final Debug Analysis for Symbol {}:",
530                  tick.instrument_token
531                );
532                debug!(
533                  "  - Timestamp: Now = {:?}",
534                  std::time::SystemTime::now()
535                );
536                debug!("  - Data completeness check:");
537                debug!("    * Last Price: {:?} (โœ…)", tick.content.last_price);
538                debug!(
539                  "    * Volume: {:?} ({})",
540                  tick.content.volume_traded,
541                  if tick.content.volume_traded.is_some() {
542                    "โœ…"
543                  } else {
544                    "โŒ"
545                  }
546                );
547                debug!(
548                  "    * Change: {:?} ({})",
549                  tick.content.net_change,
550                  if tick.content.net_change.is_some() {
551                    "โœ…"
552                  } else {
553                    "โŒ"
554                  }
555                );
556                debug!("    * Mode: {:?} (โœ…)", tick.content.mode);
557
558                // Validate mode-specific data
559                match tick.content.mode {
560                  Mode::Full => {
561                    debug!("  - Full mode validation:");
562                    debug!(
563                      "    * OHLC: {}",
564                      if tick.content.ohlc.is_some() {
565                        "โœ… Present"
566                      } else {
567                        "โŒ Missing"
568                      }
569                    );
570                    debug!(
571                      "    * Depth: {}",
572                      if tick.content.depth.is_some() {
573                        "โœ… Present"
574                      } else {
575                        "โŒ Missing"
576                      }
577                    );
578                    if let Some(ohlc) = &tick.content.ohlc {
579                      debug!(
580                        "    * OHLC Values: O:{} H:{} L:{} C:{}",
581                        ohlc.open, ohlc.high, ohlc.low, ohlc.close
582                      );
583                    }
584                    if let Some(depth) = &tick.content.depth {
585                      debug!(
586                        "    * Depth Levels: {} buy, {} sell",
587                        depth.buy.len(),
588                        depth.sell.len()
589                      );
590                    }
591                  }
592                  Mode::Quote => {
593                    debug!("  - Quote mode validation:");
594                    debug!(
595                      "    * OHLC: {}",
596                      if tick.content.ohlc.is_some() {
597                        "โœ… Present"
598                      } else {
599                        "โŒ Missing"
600                      }
601                    );
602                    if let Some(ohlc) = &tick.content.ohlc {
603                      debug!(
604                        "    * OHLC Values: O:{} H:{} L:{} C:{}",
605                        ohlc.open, ohlc.high, ohlc.low, ohlc.close
606                      );
607                    }
608                  }
609                  Mode::LTP => {
610                    debug!("  - LTP mode validation: โœ… Basic data only");
611                  }
612                }
613              }
614            } else {
615              println!("๐Ÿ“‹ {:?}: Non-tick message received", channel_id);
616              debug!("Message type: {:?}", message);
617            }
618          }
619          _ => continue,
620        }
621      }
622      (channel_id, count)
623    });
624    tasks.push(task);
625  }
626
627  // Wait for monitoring to complete
628  for task in tasks {
629    if let Ok((channel_id, count)) = task.await {
630      println!(
631        "๐Ÿ“Š {:?}: {} total messages in 10 seconds",
632        channel_id, count
633      );
634    }
635  }
636
637  // Final cleanup demonstration
638  println!("\n๐Ÿงน Step 11: Final Cleanup Demonstration");
639  let current_symbols: Vec<u32> = manager
640    .get_symbol_distribution()
641    .values()
642    .flat_map(|symbols| symbols.iter().cloned())
643    .collect();
644
645  println!(
646    "Attempting to unsubscribe from {} remaining symbols...",
647    current_symbols.len()
648  );
649
650  match manager.unsubscribe_symbols(&current_symbols).await {
651    Ok(()) => {
652      print_distribution(manager, "After complete cleanup").await;
653      println!("โœ… Complete cleanup successful!");
654    }
655    Err(e) => {
656      println!("โš ๏ธ  Cleanup encountered issues: {}", e);
657      println!("๐Ÿ’ก This demonstrates the current architecture capabilities");
658    }
659  }
660
661  println!("\nโœ… Dynamic Subscription Demo Completed!");
662  println!("๐ŸŽฏ Key Dynamic Features Demonstrated:");
663  println!("   โœ… Runtime symbol addition across multiple batches");
664  println!("   โœ… Runtime symbol removal with proper tracking");
665  println!("   โœ… Dynamic mode changes for existing symbols");
666  println!("   โœ… Real-time capacity distribution and monitoring");
667  println!("   โœ… Independent message processing per connection");
668  println!("   โœ… High-performance parser tasks with statistics");
669  println!("   โœ… Complete subscription lifecycle management");
670
671  Ok(())
672}
673
674async fn print_distribution(manager: &KiteTickerManager, context: &str) {
675  let distribution = manager.get_symbol_distribution();
676  println!("\n๐Ÿ“ˆ Symbol Distribution ({}):", context);
677
678  let mut total = 0;
679  for (channel_id, symbols) in &distribution {
680    println!("   {:?}: {} symbols", channel_id, symbols.len());
681    total += symbols.len();
682  }
683  println!("   Total: {} symbols", total);
684}
685
686async fn monitor_ticks_briefly(
687  manager: &mut KiteTickerManager,
688  duration_secs: u64,
689  context: &str,
690) {
691  println!(
692    "\n๐Ÿ“บ {} - Monitoring ticks for {} seconds...",
693    context, duration_secs
694  );
695
696  let channels = manager.get_all_channels();
697  let mut tasks = Vec::new();
698
699  for (channel_id, mut receiver) in channels {
700    let task = tokio::spawn(async move {
701      let mut count = 0;
702      let start = std::time::Instant::now();
703
704      while start.elapsed() < Duration::from_secs(duration_secs) {
705        match timeout(Duration::from_secs(2), receiver.recv()).await {
706          Ok(Ok(message)) => {
707            count += 1;
708            if let TickerMessage::Ticks(ticks) = message {
709              println!(
710                "๐Ÿ“Š {:?}: {} ticks in batch #{}",
711                channel_id,
712                ticks.len(),
713                count
714              );
715
716              // Debug: Enhanced tick monitoring
717              debug!(
718                "Monitoring tick batch #{} with {} ticks on {:?}",
719                count,
720                ticks.len(),
721                channel_id
722              );
723
724              for (idx, tick) in ticks.iter().enumerate() {
725                println!("  ๐Ÿ”น Symbol: {}, LTP: {:?}, Volume: {:?}, Change: {:?}, OHLC: [{}/{}/{}/{}]",
726                                    tick.instrument_token,
727                                    tick.content.last_price,
728                                    tick.content.volume_traded,
729                                    tick.content.net_change,
730                                    tick.content.ohlc.as_ref().map(|o| o.open).unwrap_or(0.0),
731                                    tick.content.ohlc.as_ref().map(|o| o.high).unwrap_or(0.0),
732                                    tick.content.ohlc.as_ref().map(|o| o.low).unwrap_or(0.0),
733                                    tick.content.ohlc.as_ref().map(|o| o.close).unwrap_or(0.0)
734                                );
735
736                // Debug: Show tick metadata
737                debug!(
738                  "Tick {} metadata for symbol {}:",
739                  idx + 1,
740                  tick.instrument_token
741                );
742                debug!("  - Received at: {:?}", std::time::SystemTime::now());
743                debug!("  - Mode: {:?}", tick.content.mode);
744                debug!("  - Last Qty: <not available>");
745                debug!("  - Avg Price: {:?}", tick.content.avg_traded_price);
746                debug!(
747                  "  - Buy/Sell Qty: {:?}/{:?}",
748                  tick.content.total_buy_qty, tick.content.total_sell_qty
749                );
750
751                if tick.content.mode == Mode::Full
752                  || tick.content.mode == Mode::Quote
753                {
754                  if tick.content.ohlc.is_some() {
755                    debug!(
756                      "  - โœ… OHLC data present for symbol {}",
757                      tick.instrument_token
758                    );
759                  } else {
760                    debug!("  - โŒ OHLC data missing for symbol {} (expected for mode: {:?})",
761                                               tick.instrument_token, tick.content.mode);
762                  }
763                }
764
765                if tick.content.mode == Mode::Full {
766                  if let Some(depth) = &tick.content.depth {
767                    debug!("  - โœ… Market depth present for symbol {}: {} buy, {} sell levels",
768                                            tick.instrument_token, depth.buy.len(), depth.sell.len());
769                  } else {
770                    debug!("  - โŒ Market depth missing for symbol {} (expected for full mode)",
771                                               tick.instrument_token);
772                  }
773                }
774              }
775            } else {
776              debug!(
777                "Non-tick message received on {:?}: {:?}",
778                channel_id, message
779              );
780            }
781          }
782          _ => continue,
783        }
784      }
785      (channel_id, count)
786    });
787    tasks.push(task);
788  }
789
790  // Wait for monitoring to complete
791  for task in tasks {
792    if let Ok((channel_id, count)) = task.await {
793      println!(
794        "๐Ÿ“Š {:?}: {} total messages in {} seconds",
795        channel_id, count, duration_secs
796      );
797    }
798  }
799}
800
/// Prints a static overview of the dynamic subscription architecture.
///
/// Used when no API credentials are configured: it lists the features,
/// capacity figures, example calls, benefits, use cases, and the commands
/// needed to run the demo against real data. It performs no network access.
async fn demonstrate_offline_dynamic_architecture() {
  // Every entry is emitted as one output line, in order.
  const OVERVIEW: &[&str] = &[
    "\n๐Ÿ”„ Dynamic Subscription Architecture:",
    "โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•",
    // Feature summary
    "\n๐ŸŽฏ Key Dynamic Features:",
    "   โœ… Runtime symbol addition/removal per connection",
    "   โœ… Mode changes for existing symbols",
    "   โœ… Intelligent load balancing across 3 connections",
    "   โœ… Real-time capacity monitoring",
    "   โœ… Efficient WebSocket command batching",
    // Capacity figures
    "\n๐Ÿ“Š Capacity Management:",
    "   ๐Ÿ”น Connection 1: 0-3000 symbols",
    "   ๐Ÿ”น Connection 2: 0-3000 symbols",
    "   ๐Ÿ”น Connection 3: 0-3000 symbols",
    "   ๐Ÿ”น Total Capacity: 9000 symbols",
    // Example API usage, shown as a fenced code sample
    "\nโšก Dynamic Operations:",
    "   ```rust",
    "   // Add symbols at runtime",
    "   manager.subscribe_symbols(&[408065, 884737], Some(Mode::Full)).await?;",
    "   ",
    "   // Remove symbols",
    "   manager.unsubscribe_symbols(&[408065]).await?;",
    "   ",
    "   // Change subscription mode",
    "   manager.change_mode(&[884737], Mode::Quote).await?;",
    "   ",
    "   // Check distribution",
    "   let distribution = manager.get_symbol_distribution();",
    "   ```",
    // Benefits
    "\n๐Ÿš€ Performance Benefits:",
    "   โšก No connection restarts needed",
    "   โšก Minimal network overhead",
    "   โšก Automatic load balancing",
    "   โšก Real-time capacity monitoring",
    // Typical applications
    "\n๐Ÿ’ก Use Cases:",
    "   ๐Ÿ“ˆ Algorithmic trading with changing watchlists",
    "   ๐Ÿ“Š Market scanners with dynamic filters",
    "   ๐Ÿ” Event-driven subscription management",
    "   โฐ Time-based symbol rotation",
    // How to run against live data
    "\n๐Ÿ”ง To test with real data:",
    "   export KITE_API_KEY=your_api_key",
    "   export KITE_ACCESS_TOKEN=your_access_token",
    "   export RUST_LOG=debug",
    "   cargo run --example dynamic_subscription_demo",
  ];

  for line in OVERVIEW {
    println!("{}", line);
  }
}