Struct KiteTickerManager

Source
pub struct KiteTickerManager { /* private fields */ }
Expand description

High-performance multi-connection WebSocket manager for Kite ticker data

This manager creates 3 independent WebSocket connections and distributes symbols across them using round-robin allocation. Each connection has its own dedicated parser task for maximum performance.

Implementations§

Source§

impl KiteTickerManager

Source

pub fn new( api_key: String, access_token: String, config: KiteManagerConfig, ) -> Self

Creates a new KiteTickerManager instance with the specified configuration

This initializes the manager with the provided API credentials and configuration, but does not start any connections. Call start() to begin operation.

§Arguments
  • api_key - Your Kite Connect API key
  • access_token - Valid access token from Kite Connect
  • config - Manager configuration settings
§Example
use kiteticker_async_manager::{KiteTickerManager, KiteManagerConfig, Mode};

let config = KiteManagerConfig {
    max_connections: 3,
    max_symbols_per_connection: 3000,
    enable_dedicated_parsers: true,
    default_mode: Mode::LTP,
    ..Default::default()
};

let manager = KiteTickerManager::new(
    "your_api_key".to_string(),
    "your_access_token".to_string(),
    config,
);
Examples found in repository?
examples/basic/runtime_subscription_example.rs (line 35)
8async fn main() -> Result<(), String> {
9  // Initialize with your API credentials
10  let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
11  let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
12
13  if api_key.is_empty() || access_token.is_empty() {
14    println!(
15      "⚠️ Please set KITE_API_KEY and KITE_ACCESS_TOKEN environment variables"
16    );
17    return Err("Missing API credentials".to_string());
18  }
19
20  // Create configuration
21  let config = KiteManagerConfig {
22    max_symbols_per_connection: 3000,
23    max_connections: 3,
24    connection_buffer_size: 5000,
25    parser_buffer_size: 10000,
26    connection_timeout: Duration::from_secs(30),
27    health_check_interval: Duration::from_secs(5),
28    max_reconnect_attempts: 5,
29    reconnect_delay: Duration::from_secs(2),
30    enable_dedicated_parsers: true,
31    default_mode: Mode::LTP,
32  };
33
34  // Start the manager
35  let mut manager = KiteTickerManager::new(api_key, access_token, config);
36  manager.start().await?;
37
38  println!("🚀 KiteTicker Manager started!");
39
40  // Example of runtime subscription management
41  runtime_subscription_demo(&mut manager).await?;
42
43  // Stop the manager
44  manager.stop().await?;
45  Ok(())
46}
More examples
Hide additional examples
examples/mode_change_test.rs (line 42)
9async fn main() -> Result<(), String> {
10  // Initialize logging
11  env_logger::init();
12
13  println!("🔄 KiteTicker Mode Change Test");
14  println!("═══════════════════════════════");
15
16  let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
17  let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
18
19  if api_key.is_empty() || access_token.is_empty() {
20    println!(
21      "⚠️  KITE_API_KEY and KITE_ACCESS_TOKEN environment variables not set"
22    );
23    demonstrate_mode_change_issue().await;
24    return Ok(());
25  }
26
27  // Create configuration
28  let config = KiteManagerConfig {
29    max_symbols_per_connection: 3000,
30    max_connections: 3,
31    connection_buffer_size: 5000,
32    parser_buffer_size: 10000,
33    connection_timeout: Duration::from_secs(30),
34    health_check_interval: Duration::from_secs(5),
35    max_reconnect_attempts: 5,
36    reconnect_delay: Duration::from_secs(2),
37    enable_dedicated_parsers: true,
38    default_mode: Mode::LTP,
39  };
40
41  println!("🔧 Starting manager...");
42  let mut manager = KiteTickerManager::new(api_key, access_token, config);
43
44  manager.start().await?;
45  println!("✅ Manager started successfully");
46
47  // Test the mode change issue
48  test_mode_change_issue(&mut manager).await?;
49
50  // Stop the manager
51  println!("\n🛑 Stopping manager...");
52  manager.stop().await?;
53
54  println!("🏁 Mode change test completed!");
55  Ok(())
56}
examples/performance/market_scanner.rs (line 36)
12async fn main() -> Result<(), String> {
13  env_logger::init();
14
15  println!("🔍 Market Scanner Example");
16  println!("=========================");
17
18  let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
19  let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
20
21  if api_key.is_empty() || access_token.is_empty() {
22    return Err("Please set KITE_API_KEY and KITE_ACCESS_TOKEN".to_string());
23  }
24
25  // High-performance configuration for scanning
26  let config = KiteManagerConfig {
27    max_connections: 3,
28    max_symbols_per_connection: 3000,
29    connection_buffer_size: 20000, // Large buffer for high volume
30    parser_buffer_size: 50000,     // Even larger for parsed data
31    enable_dedicated_parsers: true,
32    default_mode: Mode::LTP, // LTP mode for scanning (minimal bandwidth)
33    ..Default::default()
34  };
35
36  let mut manager = KiteTickerManager::new(api_key, access_token, config);
37  manager.start().await?;
38
39  // Large symbol set for market scanning
40  let large_symbol_set = generate_symbol_list(8000); // 8000 symbols across 3 connections
41
42  println!(
43    "📊 Scanning {} symbols across {} connections",
44    large_symbol_set.len(),
45    3
46  );
47
48  let start_time = Instant::now();
49  manager
50    .subscribe_symbols(&large_symbol_set, Some(Mode::LTP))
51    .await?;
52  println!(
53    "✅ Subscribed to {} symbols in {:?}",
54    large_symbol_set.len(),
55    start_time.elapsed()
56  );
57
58  // Get all channels and start parallel processing
59  let channels = manager.get_all_channels();
60  let mut handles = Vec::new();
61
62  for (channel_id, mut receiver) in channels {
63    let handle = tokio::spawn(async move {
64      let mut scanner = MarketScanner::new(channel_id);
65
66      while let Ok(message) = receiver.recv().await {
67        if let TickerMessage::Ticks(ticks) = message {
68          scanner.process_ticks(ticks).await;
69        }
70      }
71
72      scanner.print_summary();
73    });
74
75    handles.push(handle);
76  }
77
78  // Run scanner for specified duration
79  println!("🔄 Scanning market for 30 seconds...");
80  sleep(Duration::from_secs(30)).await;
81
82  // Stop manager
83  manager.stop().await?;
84
85  // Wait for all scanners to complete
86  for handle in handles {
87    let _ = handle.await;
88  }
89
90  println!("🏁 Market scanning completed");
91  Ok(())
92}
examples/performance/high_frequency.rs (line 44)
14async fn main() -> Result<(), String> {
15  env_logger::init();
16
17  println!("⚡ High-Frequency Trading Example");
18  println!("=================================");
19
20  let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
21  let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
22
23  if api_key.is_empty() || access_token.is_empty() {
24    return Err("Please set KITE_API_KEY and KITE_ACCESS_TOKEN".to_string());
25  }
26
27  // Ultra-high performance configuration for HFT
28  let config = KiteManagerConfig {
29    max_connections: 3,
30    max_symbols_per_connection: 1000, // Focused symbol set for HFT
31    connection_buffer_size: 100000,   // Maximum possible buffer
32    parser_buffer_size: 200000,       // Ultra-large parser buffer
33    enable_dedicated_parsers: true,
34    default_mode: Mode::Full, // Full market depth data
35    ..Default::default()
36  };
37
38  println!("🚀 Optimized for:");
39  println!("   Sub-microsecond latency");
40  println!("   Maximum throughput");
41  println!("   Real-time order book analysis");
42  println!("   Ultra-low garbage collection");
43
44  let mut manager = KiteTickerManager::new(api_key, access_token, config);
45  manager.start().await?;
46
47  // Focus on highly liquid instruments for HFT
48  let hft_symbols = vec![
49    256265, // NIFTY 50 - highly liquid
50    408065, // HDFC Bank - large volume
51    738561, // Reliance - most traded
52    884737, // ICICI Bank
53    341249, // TCS
54    492033, // ITC
55    779521, // Kotak Bank
56  ];
57
58  println!("📊 Subscribing to {} liquid instruments", hft_symbols.len());
59  manager
60    .subscribe_symbols(&hft_symbols, Some(Mode::Full))
61    .await?;
62
63  // Setup high-frequency processing engines
64  let channels = manager.get_all_channels();
65  let mut hft_engines = Vec::new();
66
67  for (channel_id, mut receiver) in channels {
68    let engine = tokio::spawn(async move {
69      let mut hft_processor = HFTProcessor::new(channel_id);
70
71      while let Ok(message) = receiver.recv().await {
72        if let TickerMessage::Ticks(ticks) = message {
73          hft_processor.process_ultra_fast(ticks).await;
74        }
75      }
76
77      hft_processor.print_performance_metrics();
78    });
79
80    hft_engines.push(engine);
81  }
82
83  // Run HFT simulation
84  println!("⚡ Starting high-frequency processing...");
85  sleep(Duration::from_secs(60)).await;
86
87  // Stop processing
88  manager.stop().await?;
89
90  // Wait for all engines to complete
91  for engine in hft_engines {
92    let _ = engine.await;
93  }
94
95  println!("🏁 High-frequency trading simulation completed");
96  Ok(())
97}
examples/simple_tick_test.rs (line 37)
8async fn main() -> Result<(), String> {
9  env_logger::init();
10
11  println!("🔄 Simple Tick Test - Capturing Initial Ticks");
12  println!("═══════════════════════════════════════════");
13
14  let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
15  let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
16
17  if api_key.is_empty() || access_token.is_empty() {
18    println!(
19      "⚠️  KITE_API_KEY and KITE_ACCESS_TOKEN environment variables not set"
20    );
21    return Ok(());
22  }
23
24  let config = KiteManagerConfig {
25    max_symbols_per_connection: 3000,
26    max_connections: 1, // Use only 1 connection for simplicity
27    connection_buffer_size: 5000,
28    parser_buffer_size: 10000,
29    connection_timeout: Duration::from_secs(30),
30    health_check_interval: Duration::from_secs(5),
31    max_reconnect_attempts: 5,
32    reconnect_delay: Duration::from_secs(2),
33    enable_dedicated_parsers: true,
34    default_mode: Mode::LTP,
35  };
36
37  let mut manager = KiteTickerManager::new(api_key, access_token, config);
38
39  // Start manager
40  println!("📡 Starting manager...");
41  manager.start().await?;
42  println!("✅ Manager started");
43
44  // Get channel BEFORE subscribing
45  println!("🎯 Getting channels...");
46  let channels = manager.get_all_channels();
47  let (channel_id, mut receiver) = channels.into_iter().next().unwrap();
48
49  // Start listener in background
50  let listener_handle = tokio::spawn(async move {
51    println!("👂 Listener started for {:?}", channel_id);
52
53    let mut tick_count = 0;
54    loop {
55      match timeout(Duration::from_secs(30), receiver.recv()).await {
56        Ok(Ok(message)) => match message {
57          TickerMessage::Ticks(ticks) => {
58            tick_count += ticks.len();
59            println!(
60              "🎯 CAPTURED TICKS! {:?}: {} ticks (total: {})",
61              channel_id,
62              ticks.len(),
63              tick_count
64            );
65
66            for tick in &ticks {
67              println!("🔹 FULL TICK DEBUG:");
68              println!("{:#?}", tick);
69              println!("─────────────────────────────────────");
70            }
71          }
72          TickerMessage::Error(err) => {
73            println!("❌ Error: {}", err);
74          }
75          _ => {
76            println!("📨 Other message: {:?}", message);
77          }
78        },
79        Ok(Err(e)) => {
80          println!("❌ Receive error: {}", e);
81          break;
82        }
83        Err(_) => {
84          println!("⏱️  Listener timeout");
85          break;
86        }
87      }
88    }
89    println!("👂 Listener stopped");
90  });
91
92  // Give listener time to start
93  tokio::time::sleep(Duration::from_millis(200)).await;
94
95  // Now subscribe to a symbol
96  println!("📊 Subscribing to symbol 256265...");
97  manager
98    .subscribe_symbols(&[128083204], Some(Mode::Full))
99    .await?;
100  println!("✅ Subscription sent");
101
102  // Wait for ticks
103  println!("⏳ Waiting 10 seconds for ticks...");
104  tokio::time::sleep(Duration::from_secs(10)).await;
105
106  // Stop
107  manager.stop().await?;
108  listener_handle.abort();
109
110  println!("🏁 Test completed");
111  Ok(())
112}
examples/basic/portfolio_monitor.rs (line 61)
21async fn main() -> Result<(), String> {
22  // Initialize logging
23  env_logger::init();
24
25  println!("📊 Portfolio Monitor Example");
26  println!("============================");
27
28  // Get credentials
29  let api_key = std::env::var("KITE_API_KEY")
30    .map_err(|_| "Please set KITE_API_KEY environment variable")?;
31  let access_token = std::env::var("KITE_ACCESS_TOKEN")
32    .map_err(|_| "Please set KITE_ACCESS_TOKEN environment variable")?;
33
34  // Define portfolio
35  let portfolio = vec![
36    (256265, "NIFTY 50"),
37    (408065, "HDFC Bank"),
38    (738561, "Reliance"),
39    (5633, "TCS"),
40    (884737, "Asian Paints"),
41  ];
42
43  println!("📈 Monitoring Portfolio:");
44  for (symbol, name) in &portfolio {
45    println!("   • {} ({})", name, symbol);
46  }
47  println!();
48
49  // Create manager with optimized configuration for portfolio monitoring
50  let config = KiteManagerConfig {
51    max_connections: 1, // Single connection for small portfolio
52    max_symbols_per_connection: 100,
53    connection_buffer_size: 2000,
54    parser_buffer_size: 5000,
55    enable_dedicated_parsers: true,
56    default_mode: Mode::Quote, // Quote mode for price + volume
57    ..Default::default()
58  };
59
60  // Start manager
61  let mut manager = KiteTickerManager::new(api_key, access_token, config);
62  manager.start().await?;
63
64  // Subscribe to portfolio symbols
65  let symbols: Vec<u32> = portfolio.iter().map(|(symbol, _)| *symbol).collect();
66  manager
67    .subscribe_symbols(&symbols, Some(Mode::Quote))
68    .await?;
69
70  println!("✅ Subscribed to {} symbols", symbols.len());
71
72  // Create portfolio tracking
73  let mut portfolio_data: HashMap<u32, StockInfo> = HashMap::new();
74  for (symbol, name) in portfolio {
75    portfolio_data.insert(
76      symbol,
77      StockInfo {
78        symbol,
79        name: name.to_string(),
80        current_price: 0.0,
81        volume: 0,
82        last_update: Instant::now(),
83      },
84    );
85  }
86
87  // Get data channels
88  let channels = manager.get_all_channels();
89
90  // Start data processing
91  for (channel_id, mut receiver) in channels {
92    let mut portfolio_clone = portfolio_data.clone();
93
94    tokio::spawn(async move {
95      println!("📡 Started monitoring channel {:?}", channel_id);
96
97      while let Ok(message) = receiver.recv().await {
98        if let TickerMessage::Ticks(ticks) = message {
99          for tick in ticks {
100            if let Some(stock) = portfolio_clone.get_mut(&tick.instrument_token)
101            {
102              // Update stock data
103              if let Some(price) = tick.content.last_price {
104                stock.current_price = price;
105              }
106              if let Some(volume) = tick.content.volume_traded {
107                stock.volume = volume;
108              }
109              stock.last_update = Instant::now();
110
111              // Display update
112              println!(
113                "📈 {} ({}): ₹{:.2} | Vol: {} | {}",
114                stock.name,
115                stock.symbol,
116                stock.current_price,
117                stock.volume,
118                format_time_ago(stock.last_update)
119              );
120            }
121          }
122        }
123      }
124    });
125  }
126
127  // Periodic portfolio summary
128  let summary_portfolio = portfolio_data.clone();
129  tokio::spawn(async move {
130    let mut interval = tokio::time::interval(Duration::from_secs(30));
131
132    loop {
133      interval.tick().await;
134      print_portfolio_summary(&summary_portfolio);
135    }
136  });
137
138  // Monitor for demo duration
139  println!("📊 Monitoring portfolio for 2 minutes...\n");
140  sleep(Duration::from_secs(120)).await;
141
142  // Final summary
143  println!("\n🏁 Demo completed!");
144  print_portfolio_summary(&portfolio_data);
145
146  // Stop manager
147  manager.stop().await?;
148  println!("✅ Portfolio monitor stopped");
149
150  Ok(())
151}
Source

pub async fn start(&mut self) -> Result<(), String>

Initialize all connections and start the manager

Examples found in repository?
examples/basic/runtime_subscription_example.rs (line 36)
8async fn main() -> Result<(), String> {
9  // Initialize with your API credentials
10  let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
11  let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
12
13  if api_key.is_empty() || access_token.is_empty() {
14    println!(
15      "⚠️ Please set KITE_API_KEY and KITE_ACCESS_TOKEN environment variables"
16    );
17    return Err("Missing API credentials".to_string());
18  }
19
20  // Create configuration
21  let config = KiteManagerConfig {
22    max_symbols_per_connection: 3000,
23    max_connections: 3,
24    connection_buffer_size: 5000,
25    parser_buffer_size: 10000,
26    connection_timeout: Duration::from_secs(30),
27    health_check_interval: Duration::from_secs(5),
28    max_reconnect_attempts: 5,
29    reconnect_delay: Duration::from_secs(2),
30    enable_dedicated_parsers: true,
31    default_mode: Mode::LTP,
32  };
33
34  // Start the manager
35  let mut manager = KiteTickerManager::new(api_key, access_token, config);
36  manager.start().await?;
37
38  println!("🚀 KiteTicker Manager started!");
39
40  // Example of runtime subscription management
41  runtime_subscription_demo(&mut manager).await?;
42
43  // Stop the manager
44  manager.stop().await?;
45  Ok(())
46}
More examples
Hide additional examples
examples/mode_change_test.rs (line 44)
9async fn main() -> Result<(), String> {
10  // Initialize logging
11  env_logger::init();
12
13  println!("🔄 KiteTicker Mode Change Test");
14  println!("═══════════════════════════════");
15
16  let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
17  let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
18
19  if api_key.is_empty() || access_token.is_empty() {
20    println!(
21      "⚠️  KITE_API_KEY and KITE_ACCESS_TOKEN environment variables not set"
22    );
23    demonstrate_mode_change_issue().await;
24    return Ok(());
25  }
26
27  // Create configuration
28  let config = KiteManagerConfig {
29    max_symbols_per_connection: 3000,
30    max_connections: 3,
31    connection_buffer_size: 5000,
32    parser_buffer_size: 10000,
33    connection_timeout: Duration::from_secs(30),
34    health_check_interval: Duration::from_secs(5),
35    max_reconnect_attempts: 5,
36    reconnect_delay: Duration::from_secs(2),
37    enable_dedicated_parsers: true,
38    default_mode: Mode::LTP,
39  };
40
41  println!("🔧 Starting manager...");
42  let mut manager = KiteTickerManager::new(api_key, access_token, config);
43
44  manager.start().await?;
45  println!("✅ Manager started successfully");
46
47  // Test the mode change issue
48  test_mode_change_issue(&mut manager).await?;
49
50  // Stop the manager
51  println!("\n🛑 Stopping manager...");
52  manager.stop().await?;
53
54  println!("🏁 Mode change test completed!");
55  Ok(())
56}
examples/performance/market_scanner.rs (line 37)
12async fn main() -> Result<(), String> {
13  env_logger::init();
14
15  println!("🔍 Market Scanner Example");
16  println!("=========================");
17
18  let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
19  let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
20
21  if api_key.is_empty() || access_token.is_empty() {
22    return Err("Please set KITE_API_KEY and KITE_ACCESS_TOKEN".to_string());
23  }
24
25  // High-performance configuration for scanning
26  let config = KiteManagerConfig {
27    max_connections: 3,
28    max_symbols_per_connection: 3000,
29    connection_buffer_size: 20000, // Large buffer for high volume
30    parser_buffer_size: 50000,     // Even larger for parsed data
31    enable_dedicated_parsers: true,
32    default_mode: Mode::LTP, // LTP mode for scanning (minimal bandwidth)
33    ..Default::default()
34  };
35
36  let mut manager = KiteTickerManager::new(api_key, access_token, config);
37  manager.start().await?;
38
39  // Large symbol set for market scanning
40  let large_symbol_set = generate_symbol_list(8000); // 8000 symbols across 3 connections
41
42  println!(
43    "📊 Scanning {} symbols across {} connections",
44    large_symbol_set.len(),
45    3
46  );
47
48  let start_time = Instant::now();
49  manager
50    .subscribe_symbols(&large_symbol_set, Some(Mode::LTP))
51    .await?;
52  println!(
53    "✅ Subscribed to {} symbols in {:?}",
54    large_symbol_set.len(),
55    start_time.elapsed()
56  );
57
58  // Get all channels and start parallel processing
59  let channels = manager.get_all_channels();
60  let mut handles = Vec::new();
61
62  for (channel_id, mut receiver) in channels {
63    let handle = tokio::spawn(async move {
64      let mut scanner = MarketScanner::new(channel_id);
65
66      while let Ok(message) = receiver.recv().await {
67        if let TickerMessage::Ticks(ticks) = message {
68          scanner.process_ticks(ticks).await;
69        }
70      }
71
72      scanner.print_summary();
73    });
74
75    handles.push(handle);
76  }
77
78  // Run scanner for specified duration
79  println!("🔄 Scanning market for 30 seconds...");
80  sleep(Duration::from_secs(30)).await;
81
82  // Stop manager
83  manager.stop().await?;
84
85  // Wait for all scanners to complete
86  for handle in handles {
87    let _ = handle.await;
88  }
89
90  println!("🏁 Market scanning completed");
91  Ok(())
92}
examples/performance/high_frequency.rs (line 45)
14async fn main() -> Result<(), String> {
15  env_logger::init();
16
17  println!("⚡ High-Frequency Trading Example");
18  println!("=================================");
19
20  let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
21  let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
22
23  if api_key.is_empty() || access_token.is_empty() {
24    return Err("Please set KITE_API_KEY and KITE_ACCESS_TOKEN".to_string());
25  }
26
27  // Ultra-high performance configuration for HFT
28  let config = KiteManagerConfig {
29    max_connections: 3,
30    max_symbols_per_connection: 1000, // Focused symbol set for HFT
31    connection_buffer_size: 100000,   // Maximum possible buffer
32    parser_buffer_size: 200000,       // Ultra-large parser buffer
33    enable_dedicated_parsers: true,
34    default_mode: Mode::Full, // Full market depth data
35    ..Default::default()
36  };
37
38  println!("🚀 Optimized for:");
39  println!("   Sub-microsecond latency");
40  println!("   Maximum throughput");
41  println!("   Real-time order book analysis");
42  println!("   Ultra-low garbage collection");
43
44  let mut manager = KiteTickerManager::new(api_key, access_token, config);
45  manager.start().await?;
46
47  // Focus on highly liquid instruments for HFT
48  let hft_symbols = vec![
49    256265, // NIFTY 50 - highly liquid
50    408065, // HDFC Bank - large volume
51    738561, // Reliance - most traded
52    884737, // ICICI Bank
53    341249, // TCS
54    492033, // ITC
55    779521, // Kotak Bank
56  ];
57
58  println!("📊 Subscribing to {} liquid instruments", hft_symbols.len());
59  manager
60    .subscribe_symbols(&hft_symbols, Some(Mode::Full))
61    .await?;
62
63  // Setup high-frequency processing engines
64  let channels = manager.get_all_channels();
65  let mut hft_engines = Vec::new();
66
67  for (channel_id, mut receiver) in channels {
68    let engine = tokio::spawn(async move {
69      let mut hft_processor = HFTProcessor::new(channel_id);
70
71      while let Ok(message) = receiver.recv().await {
72        if let TickerMessage::Ticks(ticks) = message {
73          hft_processor.process_ultra_fast(ticks).await;
74        }
75      }
76
77      hft_processor.print_performance_metrics();
78    });
79
80    hft_engines.push(engine);
81  }
82
83  // Run HFT simulation
84  println!("⚡ Starting high-frequency processing...");
85  sleep(Duration::from_secs(60)).await;
86
87  // Stop processing
88  manager.stop().await?;
89
90  // Wait for all engines to complete
91  for engine in hft_engines {
92    let _ = engine.await;
93  }
94
95  println!("🏁 High-frequency trading simulation completed");
96  Ok(())
97}
examples/simple_tick_test.rs (line 41)
8async fn main() -> Result<(), String> {
9  env_logger::init();
10
11  println!("🔄 Simple Tick Test - Capturing Initial Ticks");
12  println!("═══════════════════════════════════════════");
13
14  let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
15  let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
16
17  if api_key.is_empty() || access_token.is_empty() {
18    println!(
19      "⚠️  KITE_API_KEY and KITE_ACCESS_TOKEN environment variables not set"
20    );
21    return Ok(());
22  }
23
24  let config = KiteManagerConfig {
25    max_symbols_per_connection: 3000,
26    max_connections: 1, // Use only 1 connection for simplicity
27    connection_buffer_size: 5000,
28    parser_buffer_size: 10000,
29    connection_timeout: Duration::from_secs(30),
30    health_check_interval: Duration::from_secs(5),
31    max_reconnect_attempts: 5,
32    reconnect_delay: Duration::from_secs(2),
33    enable_dedicated_parsers: true,
34    default_mode: Mode::LTP,
35  };
36
37  let mut manager = KiteTickerManager::new(api_key, access_token, config);
38
39  // Start manager
40  println!("📡 Starting manager...");
41  manager.start().await?;
42  println!("✅ Manager started");
43
44  // Get channel BEFORE subscribing
45  println!("🎯 Getting channels...");
46  let channels = manager.get_all_channels();
47  let (channel_id, mut receiver) = channels.into_iter().next().unwrap();
48
49  // Start listener in background
50  let listener_handle = tokio::spawn(async move {
51    println!("👂 Listener started for {:?}", channel_id);
52
53    let mut tick_count = 0;
54    loop {
55      match timeout(Duration::from_secs(30), receiver.recv()).await {
56        Ok(Ok(message)) => match message {
57          TickerMessage::Ticks(ticks) => {
58            tick_count += ticks.len();
59            println!(
60              "🎯 CAPTURED TICKS! {:?}: {} ticks (total: {})",
61              channel_id,
62              ticks.len(),
63              tick_count
64            );
65
66            for tick in &ticks {
67              println!("🔹 FULL TICK DEBUG:");
68              println!("{:#?}", tick);
69              println!("─────────────────────────────────────");
70            }
71          }
72          TickerMessage::Error(err) => {
73            println!("❌ Error: {}", err);
74          }
75          _ => {
76            println!("📨 Other message: {:?}", message);
77          }
78        },
79        Ok(Err(e)) => {
80          println!("❌ Receive error: {}", e);
81          break;
82        }
83        Err(_) => {
84          println!("⏱️  Listener timeout");
85          break;
86        }
87      }
88    }
89    println!("👂 Listener stopped");
90  });
91
92  // Give listener time to start
93  tokio::time::sleep(Duration::from_millis(200)).await;
94
95  // Now subscribe to a symbol
96  println!("📊 Subscribing to symbol 256265...");
97  manager
98    .subscribe_symbols(&[128083204], Some(Mode::Full))
99    .await?;
100  println!("✅ Subscription sent");
101
102  // Wait for ticks
103  println!("⏳ Waiting 10 seconds for ticks...");
104  tokio::time::sleep(Duration::from_secs(10)).await;
105
106  // Stop
107  manager.stop().await?;
108  listener_handle.abort();
109
110  println!("🏁 Test completed");
111  Ok(())
112}
examples/basic/portfolio_monitor.rs (line 62)
21async fn main() -> Result<(), String> {
22  // Initialize logging
23  env_logger::init();
24
25  println!("📊 Portfolio Monitor Example");
26  println!("============================");
27
28  // Get credentials
29  let api_key = std::env::var("KITE_API_KEY")
30    .map_err(|_| "Please set KITE_API_KEY environment variable")?;
31  let access_token = std::env::var("KITE_ACCESS_TOKEN")
32    .map_err(|_| "Please set KITE_ACCESS_TOKEN environment variable")?;
33
34  // Define portfolio
35  let portfolio = vec![
36    (256265, "NIFTY 50"),
37    (408065, "HDFC Bank"),
38    (738561, "Reliance"),
39    (5633, "TCS"),
40    (884737, "Asian Paints"),
41  ];
42
43  println!("📈 Monitoring Portfolio:");
44  for (symbol, name) in &portfolio {
45    println!("   • {} ({})", name, symbol);
46  }
47  println!();
48
49  // Create manager with optimized configuration for portfolio monitoring
50  let config = KiteManagerConfig {
51    max_connections: 1, // Single connection for small portfolio
52    max_symbols_per_connection: 100,
53    connection_buffer_size: 2000,
54    parser_buffer_size: 5000,
55    enable_dedicated_parsers: true,
56    default_mode: Mode::Quote, // Quote mode for price + volume
57    ..Default::default()
58  };
59
60  // Start manager
61  let mut manager = KiteTickerManager::new(api_key, access_token, config);
62  manager.start().await?;
63
64  // Subscribe to portfolio symbols
65  let symbols: Vec<u32> = portfolio.iter().map(|(symbol, _)| *symbol).collect();
66  manager
67    .subscribe_symbols(&symbols, Some(Mode::Quote))
68    .await?;
69
70  println!("✅ Subscribed to {} symbols", symbols.len());
71
72  // Create portfolio tracking
73  let mut portfolio_data: HashMap<u32, StockInfo> = HashMap::new();
74  for (symbol, name) in portfolio {
75    portfolio_data.insert(
76      symbol,
77      StockInfo {
78        symbol,
79        name: name.to_string(),
80        current_price: 0.0,
81        volume: 0,
82        last_update: Instant::now(),
83      },
84    );
85  }
86
87  // Get data channels
88  let channels = manager.get_all_channels();
89
90  // Start data processing
91  for (channel_id, mut receiver) in channels {
92    let mut portfolio_clone = portfolio_data.clone();
93
94    tokio::spawn(async move {
95      println!("📡 Started monitoring channel {:?}", channel_id);
96
97      while let Ok(message) = receiver.recv().await {
98        if let TickerMessage::Ticks(ticks) = message {
99          for tick in ticks {
100            if let Some(stock) = portfolio_clone.get_mut(&tick.instrument_token)
101            {
102              // Update stock data
103              if let Some(price) = tick.content.last_price {
104                stock.current_price = price;
105              }
106              if let Some(volume) = tick.content.volume_traded {
107                stock.volume = volume;
108              }
109              stock.last_update = Instant::now();
110
111              // Display update
112              println!(
113                "📈 {} ({}): ₹{:.2} | Vol: {} | {}",
114                stock.name,
115                stock.symbol,
116                stock.current_price,
117                stock.volume,
118                format_time_ago(stock.last_update)
119              );
120            }
121          }
122        }
123      }
124    });
125  }
126
127  // Periodic portfolio summary
128  let summary_portfolio = portfolio_data.clone();
129  tokio::spawn(async move {
130    let mut interval = tokio::time::interval(Duration::from_secs(30));
131
132    loop {
133      interval.tick().await;
134      print_portfolio_summary(&summary_portfolio);
135    }
136  });
137
138  // Monitor for demo duration
139  println!("📊 Monitoring portfolio for 2 minutes...\n");
140  sleep(Duration::from_secs(120)).await;
141
142  // Final summary
143  println!("\n🏁 Demo completed!");
144  print_portfolio_summary(&portfolio_data);
145
146  // Stop manager
147  manager.stop().await?;
148  println!("✅ Portfolio monitor stopped");
149
150  Ok(())
151}
Source

pub async fn subscribe_symbols( &mut self, symbols: &[u32], mode: Option<Mode>, ) -> Result<(), String>

Subscribe to symbols using round-robin distribution

Examples found in repository?
examples/performance/market_scanner.rs (line 50)
12async fn main() -> Result<(), String> {
13  env_logger::init();
14
15  println!("🔍 Market Scanner Example");
16  println!("=========================");
17
18  let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
19  let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
20
21  if api_key.is_empty() || access_token.is_empty() {
22    return Err("Please set KITE_API_KEY and KITE_ACCESS_TOKEN".to_string());
23  }
24
25  // High-performance configuration for scanning
26  let config = KiteManagerConfig {
27    max_connections: 3,
28    max_symbols_per_connection: 3000,
29    connection_buffer_size: 20000, // Large buffer for high volume
30    parser_buffer_size: 50000,     // Even larger for parsed data
31    enable_dedicated_parsers: true,
32    default_mode: Mode::LTP, // LTP mode for scanning (minimal bandwidth)
33    ..Default::default()
34  };
35
36  let mut manager = KiteTickerManager::new(api_key, access_token, config);
37  manager.start().await?;
38
39  // Large symbol set for market scanning
40  let large_symbol_set = generate_symbol_list(8000); // 8000 symbols across 3 connections
41
42  println!(
43    "📊 Scanning {} symbols across {} connections",
44    large_symbol_set.len(),
45    3
46  );
47
48  let start_time = Instant::now();
49  manager
50    .subscribe_symbols(&large_symbol_set, Some(Mode::LTP))
51    .await?;
52  println!(
53    "✅ Subscribed to {} symbols in {:?}",
54    large_symbol_set.len(),
55    start_time.elapsed()
56  );
57
58  // Get all channels and start parallel processing
59  let channels = manager.get_all_channels();
60  let mut handles = Vec::new();
61
62  for (channel_id, mut receiver) in channels {
63    let handle = tokio::spawn(async move {
64      let mut scanner = MarketScanner::new(channel_id);
65
66      while let Ok(message) = receiver.recv().await {
67        if let TickerMessage::Ticks(ticks) = message {
68          scanner.process_ticks(ticks).await;
69        }
70      }
71
72      scanner.print_summary();
73    });
74
75    handles.push(handle);
76  }
77
78  // Run scanner for specified duration
79  println!("🔄 Scanning market for 30 seconds...");
80  sleep(Duration::from_secs(30)).await;
81
82  // Stop manager
83  manager.stop().await?;
84
85  // Wait for all scanners to complete
86  for handle in handles {
87    let _ = handle.await;
88  }
89
90  println!("🏁 Market scanning completed");
91  Ok(())
92}
More examples
Hide additional examples
examples/basic/runtime_subscription_example.rs (line 58)
48async fn runtime_subscription_demo(
49  manager: &mut KiteTickerManager,
50) -> Result<(), String> {
51  println!("\n📡 Runtime Subscription Management Demo");
52  println!("=====================================");
53
54  // 1. Start with initial symbols
55  println!("\n1️⃣ Initial subscription to base symbols");
56  let initial_symbols = vec![256265, 265, 256777];
57  manager
58    .subscribe_symbols(&initial_symbols, Some(Mode::LTP))
59    .await?;
60  print_current_state(manager, "Initial subscription").await;
61
62  sleep(Duration::from_secs(2)).await;
63
64  // 2. Add more symbols dynamically
65  println!("\n2️⃣ Adding symbols dynamically");
66  let additional_symbols = vec![274441, 260105, 273929];
67  manager
68    .subscribe_symbols(&additional_symbols, Some(Mode::Quote))
69    .await?;
70  print_current_state(manager, "After adding symbols").await;
71
72  sleep(Duration::from_secs(2)).await;
73
74  // 3. Change mode for existing symbols
75  println!("\n3️⃣ Changing subscription mode");
76  let symbols_for_mode_change = vec![256265, 265];
77  manager
78    .change_mode(&symbols_for_mode_change, Mode::Full)
79    .await?;
80  print_current_state(manager, "After mode change").await;
81
82  sleep(Duration::from_secs(2)).await;
83
84  // 4. Remove some symbols
85  println!("\n4️⃣ Removing symbols dynamically");
86  let symbols_to_remove = vec![265, 274441];
87  manager.unsubscribe_symbols(&symbols_to_remove).await?;
88  print_current_state(manager, "After removing symbols").await;
89
90  sleep(Duration::from_secs(2)).await;
91
92  // 5. Add different symbols with different modes
93  println!("\n5️⃣ Adding new symbols with Full mode");
94  let full_mode_symbols = vec![257801, 258825];
95  manager
96    .subscribe_symbols(&full_mode_symbols, Some(Mode::Full))
97    .await?;
98  print_current_state(manager, "After adding Full mode symbols").await;
99
100  // 6. Monitor live data for a short period
101  println!("\n6️⃣ Monitoring live data (5 seconds)");
102  monitor_live_data(manager, 5).await?;
103
104  // 7. Complete cleanup
105  println!("\n7️⃣ Complete cleanup");
106  let all_symbols: Vec<u32> = manager
107    .get_symbol_distribution()
108    .values()
109    .flat_map(|symbols| symbols.iter().cloned())
110    .collect();
111
112  if !all_symbols.is_empty() {
113    manager.unsubscribe_symbols(&all_symbols).await?;
114    print_current_state(manager, "After complete cleanup").await;
115  }
116
117  println!("\n✅ Runtime subscription demo completed!");
118  Ok(())
119}
examples/performance/high_frequency.rs (line 60)
14async fn main() -> Result<(), String> {
15  env_logger::init();
16
17  println!("⚡ High-Frequency Trading Example");
18  println!("=================================");
19
20  let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
21  let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
22
23  if api_key.is_empty() || access_token.is_empty() {
24    return Err("Please set KITE_API_KEY and KITE_ACCESS_TOKEN".to_string());
25  }
26
27  // Ultra-high performance configuration for HFT
28  let config = KiteManagerConfig {
29    max_connections: 3,
30    max_symbols_per_connection: 1000, // Focused symbol set for HFT
31    connection_buffer_size: 100000,   // Maximum possible buffer
32    parser_buffer_size: 200000,       // Ultra-large parser buffer
33    enable_dedicated_parsers: true,
34    default_mode: Mode::Full, // Full market depth data
35    ..Default::default()
36  };
37
38  println!("🚀 Optimized for:");
39  println!("   Sub-microsecond latency");
40  println!("   Maximum throughput");
41  println!("   Real-time order book analysis");
42  println!("   Ultra-low garbage collection");
43
44  let mut manager = KiteTickerManager::new(api_key, access_token, config);
45  manager.start().await?;
46
47  // Focus on highly liquid instruments for HFT
48  let hft_symbols = vec![
49    256265, // NIFTY 50 - highly liquid
50    408065, // HDFC Bank - large volume
51    738561, // Reliance - most traded
52    884737, // ICICI Bank
53    341249, // TCS
54    492033, // ITC
55    779521, // Kotak Bank
56  ];
57
58  println!("📊 Subscribing to {} liquid instruments", hft_symbols.len());
59  manager
60    .subscribe_symbols(&hft_symbols, Some(Mode::Full))
61    .await?;
62
63  // Setup high-frequency processing engines
64  let channels = manager.get_all_channels();
65  let mut hft_engines = Vec::new();
66
67  for (channel_id, mut receiver) in channels {
68    let engine = tokio::spawn(async move {
69      let mut hft_processor = HFTProcessor::new(channel_id);
70
71      while let Ok(message) = receiver.recv().await {
72        if let TickerMessage::Ticks(ticks) = message {
73          hft_processor.process_ultra_fast(ticks).await;
74        }
75      }
76
77      hft_processor.print_performance_metrics();
78    });
79
80    hft_engines.push(engine);
81  }
82
83  // Run HFT simulation
84  println!("⚡ Starting high-frequency processing...");
85  sleep(Duration::from_secs(60)).await;
86
87  // Stop processing
88  manager.stop().await?;
89
90  // Wait for all engines to complete
91  for engine in hft_engines {
92    let _ = engine.await;
93  }
94
95  println!("🏁 High-frequency trading simulation completed");
96  Ok(())
97}
examples/simple_tick_test.rs (line 98)
8async fn main() -> Result<(), String> {
9  env_logger::init();
10
11  println!("🔄 Simple Tick Test - Capturing Initial Ticks");
12  println!("═══════════════════════════════════════════");
13
14  let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
15  let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
16
17  if api_key.is_empty() || access_token.is_empty() {
18    println!(
19      "⚠️  KITE_API_KEY and KITE_ACCESS_TOKEN environment variables not set"
20    );
21    return Ok(());
22  }
23
24  let config = KiteManagerConfig {
25    max_symbols_per_connection: 3000,
26    max_connections: 1, // Use only 1 connection for simplicity
27    connection_buffer_size: 5000,
28    parser_buffer_size: 10000,
29    connection_timeout: Duration::from_secs(30),
30    health_check_interval: Duration::from_secs(5),
31    max_reconnect_attempts: 5,
32    reconnect_delay: Duration::from_secs(2),
33    enable_dedicated_parsers: true,
34    default_mode: Mode::LTP,
35  };
36
37  let mut manager = KiteTickerManager::new(api_key, access_token, config);
38
39  // Start manager
40  println!("📡 Starting manager...");
41  manager.start().await?;
42  println!("✅ Manager started");
43
44  // Get channel BEFORE subscribing
45  println!("🎯 Getting channels...");
46  let channels = manager.get_all_channels();
47  let (channel_id, mut receiver) = channels.into_iter().next().unwrap();
48
49  // Start listener in background
50  let listener_handle = tokio::spawn(async move {
51    println!("👂 Listener started for {:?}", channel_id);
52
53    let mut tick_count = 0;
54    loop {
55      match timeout(Duration::from_secs(30), receiver.recv()).await {
56        Ok(Ok(message)) => match message {
57          TickerMessage::Ticks(ticks) => {
58            tick_count += ticks.len();
59            println!(
60              "🎯 CAPTURED TICKS! {:?}: {} ticks (total: {})",
61              channel_id,
62              ticks.len(),
63              tick_count
64            );
65
66            for tick in &ticks {
67              println!("🔹 FULL TICK DEBUG:");
68              println!("{:#?}", tick);
69              println!("─────────────────────────────────────");
70            }
71          }
72          TickerMessage::Error(err) => {
73            println!("❌ Error: {}", err);
74          }
75          _ => {
76            println!("📨 Other message: {:?}", message);
77          }
78        },
79        Ok(Err(e)) => {
80          println!("❌ Receive error: {}", e);
81          break;
82        }
83        Err(_) => {
84          println!("⏱️  Listener timeout");
85          break;
86        }
87      }
88    }
89    println!("👂 Listener stopped");
90  });
91
92  // Give listener time to start
93  tokio::time::sleep(Duration::from_millis(200)).await;
94
95  // Now subscribe to a symbol
96  println!("📊 Subscribing to symbol 256265...");
97  manager
98    .subscribe_symbols(&[128083204], Some(Mode::Full))
99    .await?;
100  println!("✅ Subscription sent");
101
102  // Wait for ticks
103  println!("⏳ Waiting 10 seconds for ticks...");
104  tokio::time::sleep(Duration::from_secs(10)).await;
105
106  // Stop
107  manager.stop().await?;
108  listener_handle.abort();
109
110  println!("🏁 Test completed");
111  Ok(())
112}
examples/basic/portfolio_monitor.rs (line 67)
21async fn main() -> Result<(), String> {
22  // Initialize logging
23  env_logger::init();
24
25  println!("📊 Portfolio Monitor Example");
26  println!("============================");
27
28  // Get credentials
29  let api_key = std::env::var("KITE_API_KEY")
30    .map_err(|_| "Please set KITE_API_KEY environment variable")?;
31  let access_token = std::env::var("KITE_ACCESS_TOKEN")
32    .map_err(|_| "Please set KITE_ACCESS_TOKEN environment variable")?;
33
34  // Define portfolio
35  let portfolio = vec![
36    (256265, "NIFTY 50"),
37    (408065, "HDFC Bank"),
38    (738561, "Reliance"),
39    (5633, "TCS"),
40    (884737, "Asian Paints"),
41  ];
42
43  println!("📈 Monitoring Portfolio:");
44  for (symbol, name) in &portfolio {
45    println!("   • {} ({})", name, symbol);
46  }
47  println!();
48
49  // Create manager with optimized configuration for portfolio monitoring
50  let config = KiteManagerConfig {
51    max_connections: 1, // Single connection for small portfolio
52    max_symbols_per_connection: 100,
53    connection_buffer_size: 2000,
54    parser_buffer_size: 5000,
55    enable_dedicated_parsers: true,
56    default_mode: Mode::Quote, // Quote mode for price + volume
57    ..Default::default()
58  };
59
60  // Start manager
61  let mut manager = KiteTickerManager::new(api_key, access_token, config);
62  manager.start().await?;
63
64  // Subscribe to portfolio symbols
65  let symbols: Vec<u32> = portfolio.iter().map(|(symbol, _)| *symbol).collect();
66  manager
67    .subscribe_symbols(&symbols, Some(Mode::Quote))
68    .await?;
69
70  println!("✅ Subscribed to {} symbols", symbols.len());
71
72  // Create portfolio tracking
73  let mut portfolio_data: HashMap<u32, StockInfo> = HashMap::new();
74  for (symbol, name) in portfolio {
75    portfolio_data.insert(
76      symbol,
77      StockInfo {
78        symbol,
79        name: name.to_string(),
80        current_price: 0.0,
81        volume: 0,
82        last_update: Instant::now(),
83      },
84    );
85  }
86
87  // Get data channels
88  let channels = manager.get_all_channels();
89
90  // Start data processing
91  for (channel_id, mut receiver) in channels {
92    let mut portfolio_clone = portfolio_data.clone();
93
94    tokio::spawn(async move {
95      println!("📡 Started monitoring channel {:?}", channel_id);
96
97      while let Ok(message) = receiver.recv().await {
98        if let TickerMessage::Ticks(ticks) = message {
99          for tick in ticks {
100            if let Some(stock) = portfolio_clone.get_mut(&tick.instrument_token)
101            {
102              // Update stock data
103              if let Some(price) = tick.content.last_price {
104                stock.current_price = price;
105              }
106              if let Some(volume) = tick.content.volume_traded {
107                stock.volume = volume;
108              }
109              stock.last_update = Instant::now();
110
111              // Display update
112              println!(
113                "📈 {} ({}): ₹{:.2} | Vol: {} | {}",
114                stock.name,
115                stock.symbol,
116                stock.current_price,
117                stock.volume,
118                format_time_ago(stock.last_update)
119              );
120            }
121          }
122        }
123      }
124    });
125  }
126
127  // Periodic portfolio summary
128  let summary_portfolio = portfolio_data.clone();
129  tokio::spawn(async move {
130    let mut interval = tokio::time::interval(Duration::from_secs(30));
131
132    loop {
133      interval.tick().await;
134      print_portfolio_summary(&summary_portfolio);
135    }
136  });
137
138  // Monitor for demo duration
139  println!("📊 Monitoring portfolio for 2 minutes...\n");
140  sleep(Duration::from_secs(120)).await;
141
142  // Final summary
143  println!("\n🏁 Demo completed!");
144  print_portfolio_summary(&portfolio_data);
145
146  // Stop manager
147  manager.stop().await?;
148  println!("✅ Portfolio monitor stopped");
149
150  Ok(())
151}
examples/mode_change_test.rs (line 68)
58async fn test_mode_change_issue(
59  manager: &mut KiteTickerManager,
60) -> Result<(), String> {
61  println!("\n🧪 Testing Mode Change Issue");
62  println!("=============================");
63
64  // Step 1: Subscribe to a symbol with LTP mode
65  let test_symbol = 738561; // Test symbol provided by user
66  println!("\n1️⃣ Subscribing to symbol {} with LTP mode", test_symbol);
67  manager
68    .subscribe_symbols(&[test_symbol], Some(Mode::LTP))
69    .await?;
70
71  // Start monitoring ticks to see the mode
72  let channels = manager.get_all_channels();
73  let mut tick_listeners = Vec::new();
74
75  for (channel_id, mut receiver) in channels {
76    let task = tokio::spawn(async move {
77      let mut ticks_received = 0;
78      let start = Instant::now();
79
80      while start.elapsed() < Duration::from_secs(5) && ticks_received < 3 {
81        match timeout(Duration::from_millis(500), receiver.recv()).await {
82          Ok(Ok(message)) => {
83            if let TickerMessage::Ticks(ticks) = message {
84              for tick in &ticks {
85                if tick.instrument_token == test_symbol {
86                  ticks_received += 1;
87                  println!(
88                    "   📊 Received tick for {}: Mode={:?}, LTP={:?}",
89                    tick.instrument_token,
90                    tick.content.mode,
91                    tick.content.last_price
92                  );
93
94                  // Check if OHLC data is present (should not be for LTP mode)
95                  if tick.content.ohlc.is_some() {
96                    println!(
97                      "   ⚠️  OHLC data present in LTP mode - unexpected!"
98                    );
99                  } else {
100                    println!("   ✅ No OHLC data in LTP mode - correct");
101                  }
102                }
103              }
104            }
105          }
106          _ => continue,
107        }
108      }
109      (channel_id, ticks_received)
110    });
111    tick_listeners.push(task);
112  }
113
114  // Wait for initial ticks
115  for task in tick_listeners {
116    if let Ok((channel_id, count)) = task.await {
117      println!(
118        "   📈 {:?}: Received {} ticks for initial LTP subscription",
119        channel_id, count
120      );
121    }
122  }
123
124  sleep(Duration::from_secs(2)).await;
125
126  // Step 2: Attempt to change mode to Full (this is where the issue should manifest)
127  println!(
128    "\n2️⃣ Attempting to change mode from LTP to Full for symbol {}",
129    test_symbol
130  );
131  match manager.change_mode(&[test_symbol], Mode::Full).await {
132    Ok(()) => {
133      println!("   ✅ Mode change command sent successfully");
134    }
135    Err(e) => {
136      println!("   ❌ Mode change command failed: {}", e);
137      return Err(e);
138    }
139  }
140
141  // Step 3: Monitor ticks after mode change to see if it actually worked
142  println!("\n3️⃣ Monitoring ticks after mode change...");
143  let channels = manager.get_all_channels();
144  let mut post_change_listeners = Vec::new();
145
146  for (channel_id, mut receiver) in channels {
147    let task = tokio::spawn(async move {
148      let mut ticks_received = 0;
149      let mut ohlc_present = 0;
150      let mut depth_present = 0;
151      let start = Instant::now();
152
153      while start.elapsed() < Duration::from_secs(10) && ticks_received < 5 {
154        match timeout(Duration::from_millis(500), receiver.recv()).await {
155          Ok(Ok(message)) => {
156            if let TickerMessage::Ticks(ticks) = message {
157              for tick in &ticks {
158                if tick.instrument_token == test_symbol {
159                  ticks_received += 1;
160                  println!(
161                    "   📊 Post-change tick for {}: Mode={:?}, LTP={:?}",
162                    tick.instrument_token,
163                    tick.content.mode,
164                    tick.content.last_price
165                  );
166
167                  // Check if Full mode data is present
168                  if let Some(ohlc) = &tick.content.ohlc {
169                    ohlc_present += 1;
170                    println!(
171                      "   ✅ OHLC data present: O:{} H:{} L:{} C:{}",
172                      ohlc.open, ohlc.high, ohlc.low, ohlc.close
173                    );
174                  } else {
175                    println!("   ❌ OHLC data missing - mode change may not have worked!");
176                  }
177
178                  if let Some(depth) = &tick.content.depth {
179                    depth_present += 1;
180                    println!(
181                      "   ✅ Market depth present: {} buy, {} sell orders",
182                      depth.buy.len(),
183                      depth.sell.len()
184                    );
185                  } else {
186                    println!("   ❌ Market depth missing - mode change may not have worked!");
187                  }
188
189                  // Log the actual mode reported in the tick
190                  info!("Tick mode reported: {:?}", tick.content.mode);
191                }
192              }
193            }
194          }
195          _ => continue,
196        }
197      }
198      (channel_id, ticks_received, ohlc_present, depth_present)
199    });
200    post_change_listeners.push(task);
201  }
202
203  // Wait for post-change ticks
204  let mut total_ticks = 0;
205  let mut total_ohlc = 0;
206  let mut total_depth = 0;
207
208  for task in post_change_listeners {
209    if let Ok((channel_id, ticks, ohlc, depth)) = task.await {
210      println!(
211        "   📈 {:?}: {} ticks, {} with OHLC, {} with depth",
212        channel_id, ticks, ohlc, depth
213      );
214      total_ticks += ticks;
215      total_ohlc += ohlc;
216      total_depth += depth;
217    }
218  }
219
220  // Step 4: Analyze results
221  println!("\n4️⃣ Test Results Analysis");
222  println!("========================");
223
224  if total_ticks == 0 {
225    println!("   ⚠️  No ticks received after mode change - connection issue?");
226  } else if total_ohlc == 0 && total_depth == 0 {
227    println!("   ❌ ISSUE CONFIRMED: Mode change command sent but Full mode data not received");
228    println!("   💡 This confirms that set_mode() alone doesn't work - need unsubscribe+resubscribe");
229  } else if total_ohlc > 0 && total_depth > 0 {
230    println!("   ✅ Mode change worked - Full mode data received");
231  } else {
232    println!(
233      "   ⚠️  Partial mode change - some Full mode data received but not all"
234    );
235  }
236
237  println!("\n📊 Summary:");
238  println!("   Total ticks received: {}", total_ticks);
239  println!("   Ticks with OHLC data: {}", total_ohlc);
240  println!("   Ticks with market depth: {}", total_depth);
241
242  Ok(())
243}
Source

pub fn get_channel( &mut self, channel_id: ChannelId, ) -> Option<Receiver<TickerMessage>>

Get output channel for a specific connection

Source

pub fn get_all_channels(&mut self) -> Vec<(ChannelId, Receiver<TickerMessage>)>

Get all output channels

Examples found in repository?
examples/basic/runtime_subscription_example.rs (line 146)
142async fn monitor_live_data(
143  manager: &mut KiteTickerManager,
144  seconds: u64,
145) -> Result<(), String> {
146  let channels = manager.get_all_channels();
147  let mut tasks = Vec::new();
148
149  for (channel_id, mut receiver) in channels {
150    let task = tokio::spawn(async move {
151      let mut count = 0;
152      let start = std::time::Instant::now();
153
154      while start.elapsed() < Duration::from_secs(seconds) {
155        match tokio::time::timeout(Duration::from_millis(100), receiver.recv())
156          .await
157        {
158          Ok(Ok(message)) => {
159            count += 1;
160            if let TickerMessage::Ticks(ticks) = message {
161              if count <= 2 {
162                // Show first few ticks
163                println!(
164                  "   📋 {:?}: Received {} ticks",
165                  channel_id,
166                  ticks.len()
167                );
168              }
169            }
170          }
171          _ => continue,
172        }
173      }
174      (channel_id, count)
175    });
176    tasks.push(task);
177  }
178
179  // Wait for monitoring to complete
180  for task in tasks {
181    if let Ok((channel_id, count)) = task.await {
182      println!("   📊 {:?}: {} total messages", channel_id, count);
183    }
184  }
185
186  Ok(())
187}
More examples
Hide additional examples
examples/performance/market_scanner.rs (line 59)
12async fn main() -> Result<(), String> {
13  env_logger::init();
14
15  println!("🔍 Market Scanner Example");
16  println!("=========================");
17
18  let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
19  let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
20
21  if api_key.is_empty() || access_token.is_empty() {
22    return Err("Please set KITE_API_KEY and KITE_ACCESS_TOKEN".to_string());
23  }
24
25  // High-performance configuration for scanning
26  let config = KiteManagerConfig {
27    max_connections: 3,
28    max_symbols_per_connection: 3000,
29    connection_buffer_size: 20000, // Large buffer for high volume
30    parser_buffer_size: 50000,     // Even larger for parsed data
31    enable_dedicated_parsers: true,
32    default_mode: Mode::LTP, // LTP mode for scanning (minimal bandwidth)
33    ..Default::default()
34  };
35
36  let mut manager = KiteTickerManager::new(api_key, access_token, config);
37  manager.start().await?;
38
39  // Large symbol set for market scanning
40  let large_symbol_set = generate_symbol_list(8000); // 8000 symbols across 3 connections
41
42  println!(
43    "📊 Scanning {} symbols across {} connections",
44    large_symbol_set.len(),
45    3
46  );
47
48  let start_time = Instant::now();
49  manager
50    .subscribe_symbols(&large_symbol_set, Some(Mode::LTP))
51    .await?;
52  println!(
53    "✅ Subscribed to {} symbols in {:?}",
54    large_symbol_set.len(),
55    start_time.elapsed()
56  );
57
58  // Get all channels and start parallel processing
59  let channels = manager.get_all_channels();
60  let mut handles = Vec::new();
61
62  for (channel_id, mut receiver) in channels {
63    let handle = tokio::spawn(async move {
64      let mut scanner = MarketScanner::new(channel_id);
65
66      while let Ok(message) = receiver.recv().await {
67        if let TickerMessage::Ticks(ticks) = message {
68          scanner.process_ticks(ticks).await;
69        }
70      }
71
72      scanner.print_summary();
73    });
74
75    handles.push(handle);
76  }
77
78  // Run scanner for specified duration
79  println!("🔄 Scanning market for 30 seconds...");
80  sleep(Duration::from_secs(30)).await;
81
82  // Stop manager
83  manager.stop().await?;
84
85  // Wait for all scanners to complete
86  for handle in handles {
87    let _ = handle.await;
88  }
89
90  println!("🏁 Market scanning completed");
91  Ok(())
92}
examples/performance/high_frequency.rs (line 64)
14async fn main() -> Result<(), String> {
15  env_logger::init();
16
17  println!("⚡ High-Frequency Trading Example");
18  println!("=================================");
19
20  let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
21  let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
22
23  if api_key.is_empty() || access_token.is_empty() {
24    return Err("Please set KITE_API_KEY and KITE_ACCESS_TOKEN".to_string());
25  }
26
27  // Ultra-high performance configuration for HFT
28  let config = KiteManagerConfig {
29    max_connections: 3,
30    max_symbols_per_connection: 1000, // Focused symbol set for HFT
31    connection_buffer_size: 100000,   // Maximum possible buffer
32    parser_buffer_size: 200000,       // Ultra-large parser buffer
33    enable_dedicated_parsers: true,
34    default_mode: Mode::Full, // Full market depth data
35    ..Default::default()
36  };
37
38  println!("🚀 Optimized for:");
39  println!("   Sub-microsecond latency");
40  println!("   Maximum throughput");
41  println!("   Real-time order book analysis");
42  println!("   Ultra-low garbage collection");
43
44  let mut manager = KiteTickerManager::new(api_key, access_token, config);
45  manager.start().await?;
46
47  // Focus on highly liquid instruments for HFT
48  let hft_symbols = vec![
49    256265, // NIFTY 50 - highly liquid
50    408065, // HDFC Bank - large volume
51    738561, // Reliance - most traded
52    884737, // ICICI Bank
53    341249, // TCS
54    492033, // ITC
55    779521, // Kotak Bank
56  ];
57
58  println!("📊 Subscribing to {} liquid instruments", hft_symbols.len());
59  manager
60    .subscribe_symbols(&hft_symbols, Some(Mode::Full))
61    .await?;
62
63  // Setup high-frequency processing engines
64  let channels = manager.get_all_channels();
65  let mut hft_engines = Vec::new();
66
67  for (channel_id, mut receiver) in channels {
68    let engine = tokio::spawn(async move {
69      let mut hft_processor = HFTProcessor::new(channel_id);
70
71      while let Ok(message) = receiver.recv().await {
72        if let TickerMessage::Ticks(ticks) = message {
73          hft_processor.process_ultra_fast(ticks).await;
74        }
75      }
76
77      hft_processor.print_performance_metrics();
78    });
79
80    hft_engines.push(engine);
81  }
82
83  // Run HFT simulation
84  println!("⚡ Starting high-frequency processing...");
85  sleep(Duration::from_secs(60)).await;
86
87  // Stop processing
88  manager.stop().await?;
89
90  // Wait for all engines to complete
91  for engine in hft_engines {
92    let _ = engine.await;
93  }
94
95  println!("🏁 High-frequency trading simulation completed");
96  Ok(())
97}
examples/simple_tick_test.rs (line 46)
8async fn main() -> Result<(), String> {
9  env_logger::init();
10
11  println!("🔄 Simple Tick Test - Capturing Initial Ticks");
12  println!("═══════════════════════════════════════════");
13
14  let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
15  let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
16
17  if api_key.is_empty() || access_token.is_empty() {
18    println!(
19      "⚠️  KITE_API_KEY and KITE_ACCESS_TOKEN environment variables not set"
20    );
21    return Ok(());
22  }
23
24  let config = KiteManagerConfig {
25    max_symbols_per_connection: 3000,
26    max_connections: 1, // Use only 1 connection for simplicity
27    connection_buffer_size: 5000,
28    parser_buffer_size: 10000,
29    connection_timeout: Duration::from_secs(30),
30    health_check_interval: Duration::from_secs(5),
31    max_reconnect_attempts: 5,
32    reconnect_delay: Duration::from_secs(2),
33    enable_dedicated_parsers: true,
34    default_mode: Mode::LTP,
35  };
36
37  let mut manager = KiteTickerManager::new(api_key, access_token, config);
38
39  // Start manager
40  println!("📡 Starting manager...");
41  manager.start().await?;
42  println!("✅ Manager started");
43
44  // Get channel BEFORE subscribing
45  println!("🎯 Getting channels...");
46  let channels = manager.get_all_channels();
47  let (channel_id, mut receiver) = channels.into_iter().next().unwrap();
48
49  // Start listener in background
50  let listener_handle = tokio::spawn(async move {
51    println!("👂 Listener started for {:?}", channel_id);
52
53    let mut tick_count = 0;
54    loop {
55      match timeout(Duration::from_secs(30), receiver.recv()).await {
56        Ok(Ok(message)) => match message {
57          TickerMessage::Ticks(ticks) => {
58            tick_count += ticks.len();
59            println!(
60              "🎯 CAPTURED TICKS! {:?}: {} ticks (total: {})",
61              channel_id,
62              ticks.len(),
63              tick_count
64            );
65
66            for tick in &ticks {
67              println!("🔹 FULL TICK DEBUG:");
68              println!("{:#?}", tick);
69              println!("─────────────────────────────────────");
70            }
71          }
72          TickerMessage::Error(err) => {
73            println!("❌ Error: {}", err);
74          }
75          _ => {
76            println!("📨 Other message: {:?}", message);
77          }
78        },
79        Ok(Err(e)) => {
80          println!("❌ Receive error: {}", e);
81          break;
82        }
83        Err(_) => {
84          println!("⏱️  Listener timeout");
85          break;
86        }
87      }
88    }
89    println!("👂 Listener stopped");
90  });
91
92  // Give listener time to start
93  tokio::time::sleep(Duration::from_millis(200)).await;
94
95  // Now subscribe to a symbol
96  println!("📊 Subscribing to symbol 256265...");
97  manager
98    .subscribe_symbols(&[128083204], Some(Mode::Full))
99    .await?;
100  println!("✅ Subscription sent");
101
102  // Wait for ticks
103  println!("⏳ Waiting 10 seconds for ticks...");
104  tokio::time::sleep(Duration::from_secs(10)).await;
105
106  // Stop
107  manager.stop().await?;
108  listener_handle.abort();
109
110  println!("🏁 Test completed");
111  Ok(())
112}
examples/basic/portfolio_monitor.rs (line 88)
21async fn main() -> Result<(), String> {
22  // Initialize logging
23  env_logger::init();
24
25  println!("📊 Portfolio Monitor Example");
26  println!("============================");
27
28  // Get credentials
29  let api_key = std::env::var("KITE_API_KEY")
30    .map_err(|_| "Please set KITE_API_KEY environment variable")?;
31  let access_token = std::env::var("KITE_ACCESS_TOKEN")
32    .map_err(|_| "Please set KITE_ACCESS_TOKEN environment variable")?;
33
34  // Define portfolio
35  let portfolio = vec![
36    (256265, "NIFTY 50"),
37    (408065, "HDFC Bank"),
38    (738561, "Reliance"),
39    (5633, "TCS"),
40    (884737, "Asian Paints"),
41  ];
42
43  println!("📈 Monitoring Portfolio:");
44  for (symbol, name) in &portfolio {
45    println!("   • {} ({})", name, symbol);
46  }
47  println!();
48
49  // Create manager with optimized configuration for portfolio monitoring
50  let config = KiteManagerConfig {
51    max_connections: 1, // Single connection for small portfolio
52    max_symbols_per_connection: 100,
53    connection_buffer_size: 2000,
54    parser_buffer_size: 5000,
55    enable_dedicated_parsers: true,
56    default_mode: Mode::Quote, // Quote mode for price + volume
57    ..Default::default()
58  };
59
60  // Start manager
61  let mut manager = KiteTickerManager::new(api_key, access_token, config);
62  manager.start().await?;
63
64  // Subscribe to portfolio symbols
65  let symbols: Vec<u32> = portfolio.iter().map(|(symbol, _)| *symbol).collect();
66  manager
67    .subscribe_symbols(&symbols, Some(Mode::Quote))
68    .await?;
69
70  println!("✅ Subscribed to {} symbols", symbols.len());
71
72  // Create portfolio tracking
73  let mut portfolio_data: HashMap<u32, StockInfo> = HashMap::new();
74  for (symbol, name) in portfolio {
75    portfolio_data.insert(
76      symbol,
77      StockInfo {
78        symbol,
79        name: name.to_string(),
80        current_price: 0.0,
81        volume: 0,
82        last_update: Instant::now(),
83      },
84    );
85  }
86
87  // Get data channels
88  let channels = manager.get_all_channels();
89
90  // Start data processing
91  for (channel_id, mut receiver) in channels {
92    let mut portfolio_clone = portfolio_data.clone();
93
94    tokio::spawn(async move {
95      println!("📡 Started monitoring channel {:?}", channel_id);
96
97      while let Ok(message) = receiver.recv().await {
98        if let TickerMessage::Ticks(ticks) = message {
99          for tick in ticks {
100            if let Some(stock) = portfolio_clone.get_mut(&tick.instrument_token)
101            {
102              // Update stock data
103              if let Some(price) = tick.content.last_price {
104                stock.current_price = price;
105              }
106              if let Some(volume) = tick.content.volume_traded {
107                stock.volume = volume;
108              }
109              stock.last_update = Instant::now();
110
111              // Display update
112              println!(
113                "📈 {} ({}): ₹{:.2} | Vol: {} | {}",
114                stock.name,
115                stock.symbol,
116                stock.current_price,
117                stock.volume,
118                format_time_ago(stock.last_update)
119              );
120            }
121          }
122        }
123      }
124    });
125  }
126
127  // Periodic portfolio summary
128  let summary_portfolio = portfolio_data.clone();
129  tokio::spawn(async move {
130    let mut interval = tokio::time::interval(Duration::from_secs(30));
131
132    loop {
133      interval.tick().await;
134      print_portfolio_summary(&summary_portfolio);
135    }
136  });
137
138  // Monitor for demo duration
139  println!("📊 Monitoring portfolio for 2 minutes...\n");
140  sleep(Duration::from_secs(120)).await;
141
142  // Final summary
143  println!("\n🏁 Demo completed!");
144  print_portfolio_summary(&portfolio_data);
145
146  // Stop manager
147  manager.stop().await?;
148  println!("✅ Portfolio monitor stopped");
149
150  Ok(())
151}
examples/mode_change_test.rs (line 72)
58async fn test_mode_change_issue(
59  manager: &mut KiteTickerManager,
60) -> Result<(), String> {
61  println!("\n🧪 Testing Mode Change Issue");
62  println!("=============================");
63
64  // Step 1: Subscribe to a symbol with LTP mode
65  let test_symbol = 738561; // Test symbol provided by user
66  println!("\n1️⃣ Subscribing to symbol {} with LTP mode", test_symbol);
67  manager
68    .subscribe_symbols(&[test_symbol], Some(Mode::LTP))
69    .await?;
70
71  // Start monitoring ticks to see the mode
72  let channels = manager.get_all_channels();
73  let mut tick_listeners = Vec::new();
74
75  for (channel_id, mut receiver) in channels {
76    let task = tokio::spawn(async move {
77      let mut ticks_received = 0;
78      let start = Instant::now();
79
80      while start.elapsed() < Duration::from_secs(5) && ticks_received < 3 {
81        match timeout(Duration::from_millis(500), receiver.recv()).await {
82          Ok(Ok(message)) => {
83            if let TickerMessage::Ticks(ticks) = message {
84              for tick in &ticks {
85                if tick.instrument_token == test_symbol {
86                  ticks_received += 1;
87                  println!(
88                    "   📊 Received tick for {}: Mode={:?}, LTP={:?}",
89                    tick.instrument_token,
90                    tick.content.mode,
91                    tick.content.last_price
92                  );
93
94                  // Check if OHLC data is present (should not be for LTP mode)
95                  if tick.content.ohlc.is_some() {
96                    println!(
97                      "   ⚠️  OHLC data present in LTP mode - unexpected!"
98                    );
99                  } else {
100                    println!("   ✅ No OHLC data in LTP mode - correct");
101                  }
102                }
103              }
104            }
105          }
106          _ => continue,
107        }
108      }
109      (channel_id, ticks_received)
110    });
111    tick_listeners.push(task);
112  }
113
114  // Wait for initial ticks
115  for task in tick_listeners {
116    if let Ok((channel_id, count)) = task.await {
117      println!(
118        "   📈 {:?}: Received {} ticks for initial LTP subscription",
119        channel_id, count
120      );
121    }
122  }
123
124  sleep(Duration::from_secs(2)).await;
125
126  // Step 2: Attempt to change mode to Full (this is where the issue should manifest)
127  println!(
128    "\n2️⃣ Attempting to change mode from LTP to Full for symbol {}",
129    test_symbol
130  );
131  match manager.change_mode(&[test_symbol], Mode::Full).await {
132    Ok(()) => {
133      println!("   ✅ Mode change command sent successfully");
134    }
135    Err(e) => {
136      println!("   ❌ Mode change command failed: {}", e);
137      return Err(e);
138    }
139  }
140
141  // Step 3: Monitor ticks after mode change to see if it actually worked
142  println!("\n3️⃣ Monitoring ticks after mode change...");
143  let channels = manager.get_all_channels();
144  let mut post_change_listeners = Vec::new();
145
146  for (channel_id, mut receiver) in channels {
147    let task = tokio::spawn(async move {
148      let mut ticks_received = 0;
149      let mut ohlc_present = 0;
150      let mut depth_present = 0;
151      let start = Instant::now();
152
153      while start.elapsed() < Duration::from_secs(10) && ticks_received < 5 {
154        match timeout(Duration::from_millis(500), receiver.recv()).await {
155          Ok(Ok(message)) => {
156            if let TickerMessage::Ticks(ticks) = message {
157              for tick in &ticks {
158                if tick.instrument_token == test_symbol {
159                  ticks_received += 1;
160                  println!(
161                    "   📊 Post-change tick for {}: Mode={:?}, LTP={:?}",
162                    tick.instrument_token,
163                    tick.content.mode,
164                    tick.content.last_price
165                  );
166
167                  // Check if Full mode data is present
168                  if let Some(ohlc) = &tick.content.ohlc {
169                    ohlc_present += 1;
170                    println!(
171                      "   ✅ OHLC data present: O:{} H:{} L:{} C:{}",
172                      ohlc.open, ohlc.high, ohlc.low, ohlc.close
173                    );
174                  } else {
175                    println!("   ❌ OHLC data missing - mode change may not have worked!");
176                  }
177
178                  if let Some(depth) = &tick.content.depth {
179                    depth_present += 1;
180                    println!(
181                      "   ✅ Market depth present: {} buy, {} sell orders",
182                      depth.buy.len(),
183                      depth.sell.len()
184                    );
185                  } else {
186                    println!("   ❌ Market depth missing - mode change may not have worked!");
187                  }
188
189                  // Log the actual mode reported in the tick
190                  info!("Tick mode reported: {:?}", tick.content.mode);
191                }
192              }
193            }
194          }
195          _ => continue,
196        }
197      }
198      (channel_id, ticks_received, ohlc_present, depth_present)
199    });
200    post_change_listeners.push(task);
201  }
202
203  // Wait for post-change ticks
204  let mut total_ticks = 0;
205  let mut total_ohlc = 0;
206  let mut total_depth = 0;
207
208  for task in post_change_listeners {
209    if let Ok((channel_id, ticks, ohlc, depth)) = task.await {
210      println!(
211        "   📈 {:?}: {} ticks, {} with OHLC, {} with depth",
212        channel_id, ticks, ohlc, depth
213      );
214      total_ticks += ticks;
215      total_ohlc += ohlc;
216      total_depth += depth;
217    }
218  }
219
220  // Step 4: Analyze results
221  println!("\n4️⃣ Test Results Analysis");
222  println!("========================");
223
224  if total_ticks == 0 {
225    println!("   ⚠️  No ticks received after mode change - connection issue?");
226  } else if total_ohlc == 0 && total_depth == 0 {
227    println!("   ❌ ISSUE CONFIRMED: Mode change command sent but Full mode data not received");
228    println!("   💡 This confirms that set_mode() alone doesn't work - need unsubscribe+resubscribe");
229  } else if total_ohlc > 0 && total_depth > 0 {
230    println!("   ✅ Mode change worked - Full mode data received");
231  } else {
232    println!(
233      "   ⚠️  Partial mode change - some Full mode data received but not all"
234    );
235  }
236
237  println!("\n📊 Summary:");
238  println!("   Total ticks received: {}", total_ticks);
239  println!("   Ticks with OHLC data: {}", total_ohlc);
240  println!("   Ticks with market depth: {}", total_depth);
241
242  Ok(())
243}
Source

pub async fn get_stats(&self) -> Result<ManagerStats, String>

Get manager statistics

Examples found in repository?
examples/basic/runtime_subscription_example.rs (line 134)
121async fn print_current_state(manager: &KiteTickerManager, context: &str) {
122  println!("\n📊 Current State ({})", context);
123
124  // Show distribution
125  let distribution = manager.get_symbol_distribution();
126  let mut total = 0;
127  for (channel_id, symbols) in &distribution {
128    println!("   {:?}: {} symbols", channel_id, symbols.len());
129    total += symbols.len();
130  }
131  println!("   📈 Total: {} symbols", total);
132
133  // Show stats
134  if let Ok(stats) = manager.get_stats().await {
135    println!(
136      "   📊 Stats: {} connections, {} messages",
137      stats.active_connections, stats.total_messages_received
138    );
139  }
140}
More examples
Hide additional examples
examples/advanced/manager_demo.rs (line 240)
8pub async fn main() -> Result<(), String> {
9  // Initialize logging
10  env_logger::init();
11
12  println!("🚀 KiteTicker Multi-Connection Manager Demo");
13  println!("═══════════════════════════════════════════");
14
15  let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
16  let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
17
18  if api_key.is_empty() || access_token.is_empty() {
19    println!(
20      "⚠️  KITE_API_KEY and KITE_ACCESS_TOKEN environment variables not set"
21    );
22    println!("   This demo will show the manager architecture without live connections");
23    demonstrate_offline_architecture().await;
24    return Ok(());
25  }
26
27  // Create high-performance configuration - RESTORED TO 3 CONNECTIONS
28  let config = KiteManagerConfig {
29    max_symbols_per_connection: 3000,
30    max_connections: 3,            // BACK TO 3 CONNECTIONS!
31    connection_buffer_size: 10000, // High buffer for performance
32    parser_buffer_size: 20000,     // Even higher for parsed messages
33    connection_timeout: Duration::from_secs(30),
34    health_check_interval: Duration::from_secs(5),
35    max_reconnect_attempts: 5,
36    reconnect_delay: Duration::from_secs(2),
37    enable_dedicated_parsers: true, // Use dedicated parser tasks
38    default_mode: Mode::Full,       // Full mode for maximum data
39  };
40
41  println!("🔧 Configuration:");
42  println!("   Max connections: {}", config.max_connections);
43  println!(
44    "   Max symbols per connection: {}",
45    config.max_symbols_per_connection
46  );
47  println!(
48    "   Connection buffer size: {}",
49    config.connection_buffer_size
50  );
51  println!("   Parser buffer size: {}", config.parser_buffer_size);
52  println!("   Dedicated parsers: {}", config.enable_dedicated_parsers);
53  println!();
54
55  // Create and start the manager
56  println!("📡 Starting multi-connection manager...");
57  let start_time = Instant::now();
58
59  let mut manager = KiteTickerManager::new(api_key, access_token, config);
60
61  match timeout(Duration::from_secs(30), manager.start()).await {
62    Ok(Ok(())) => {
63      println!("✅ Manager started in {:?}", start_time.elapsed());
64    }
65    Ok(Err(e)) => {
66      println!("❌ Manager failed to start: {}", e);
67      return Err(e);
68    }
69    Err(_) => {
70      println!("⏱️  Manager startup timeout");
71      return Err("Manager startup timeout".to_string());
72    }
73  }
74
75  // Test with market symbols for proper distribution
76  let symbols = vec![
77    256265, 265, 256777, 274441, 260105, 273929, 260617, 257033, 257289,
78    257545, 257801, 258825, 259081, 259337, 259593, 259849, 260873, 261129,
79    261385, 261641, 261897, 262153, 262409, 262665, 262921, 263177, 263433,
80    263689, 263945, 264457, 264713, 264969, 265225, 265737, 265993, 266249,
81    266505, 266761, 267017, 267273, 267529, 267785, 268041, 268297, 268553,
82    268809, 269065, 269321, 269577, 269833, 270089, 270345, 270601, 270857,
83    271113, 271625, 271881, 272137, 272393, 273417, 273673, 274185, 274697,
84    274953, 275209, 275465, 275721, 275977, 276233, 276489, 276745, 277001,
85    277257, 277513, 277769, 278025, 278281, 278537, 278793, 279049, 279305,
86    279561, 279817, 280073, 280329, 280585, 280841, 281097, 281353, 281865,
87    282121, 282377, 282633, 282889, 283145, 283401, 283657, 283913, 284169,
88    284425, 284681, 284937, 285193, 285449, 285961, 286217, 286473, 286729,
89    286985, 287241, 287497, 287753, 288009, 288265, 288521, 288777, 289033,
90    289289, 289545, 289801, 290057, 290313, 290569, 290825, 291081, 291337,
91    291593, 291849, 292105, 292361, 292617, 292873, 293129, 293385, 293641,
92    293897, 294153, 294409, 294665, 294921, 295177, 295433, 295689, 398345,
93    398601, 398857, 399113, 399369, 399625, 399881, 400137, 400393, 400905,
94    401161, 401673, 401929, 402185, 402441, 402697, 402953, 403209, 403465,
95    403721, 403977, 404233, 404489, 404745, 405001, 405257, 405513, 405769,
96    406025, 406281, 406537, 406793, 407049, 407305, 407561, 407817, 408073,
97    408329, 408585, 408841, 409097, 409353, 409609, 409865, 410121, 410377,
98    410633, 410889, 411145, 411401, 411657, 411913, 412169, 412425, 412681,
99    412937, 413193, 413449, 413705, 413961, 414217, 414473, 414729, 414985,
100  ];
101
102  println!("📊 Subscribing to symbols across connections...");
103
104  // Subscribe to different symbol sets
105  manager
106    .subscribe_symbols(&symbols, Some(Mode::Full))
107    .await?;
108  // manager.subscribe_symbols(&bank_nifty, Some(Mode::Quote)).await?;
109  // manager.subscribe_symbols(&it_stocks, Some(Mode::LTP)).await?;
110
111  println!(
112    "✅ Subscribed to {} total symbols",
113    symbols.len() //  + bank_nifty.len() + it_stocks.len()
114  );
115
116  // Get symbol distribution
117  let distribution = manager.get_symbol_distribution();
118  println!("\n📈 Symbol distribution across connections:");
119  for (channel_id, symbols) in &distribution {
120    println!("   {:?}: {} symbols", channel_id, symbols.len());
121  }
122
123  // Get all output channels
124  let channels = manager.get_all_channels();
125  println!("\n🔀 Created {} output channels", channels.len());
126
127  // Start monitoring each channel
128  let mut channel_tasks = Vec::new();
129
130  for (channel_id, mut receiver) in channels {
131    let task = tokio::spawn(async move {
132      let mut message_count = 0;
133      let mut tick_count = 0;
134      let start_time = Instant::now();
135      let mut last_report = Instant::now();
136
137      println!("🎯 Starting monitoring for {:?}", channel_id);
138
139      loop {
140        match timeout(Duration::from_secs(30), receiver.recv()).await {
141          Ok(Ok(message)) => {
142            message_count += 1;
143
144            match message {
145              TickerMessage::Ticks(ticks) => {
146                tick_count += ticks.len();
147
148                // Show first few ticks for demonstration
149                if message_count <= 3 {
150                  for tick in &ticks {
151                    println!(
152                      "📋 {:?}: Tick {} @ {:?}",
153                      channel_id,
154                      tick.instrument_token,
155                      tick.content.last_price.unwrap_or(0.0)
156                    );
157                  }
158                }
159              }
160              TickerMessage::Error(e) => {
161                println!("⚠️  {:?}: Error: {}", channel_id, e);
162              }
163              _ => {
164                println!("📨 {:?}: Other message", channel_id);
165              }
166            }
167
168            // Report performance every 10 seconds
169            if last_report.elapsed() >= Duration::from_secs(10) {
170              let elapsed = start_time.elapsed();
171              let messages_per_sec =
172                message_count as f64 / elapsed.as_secs_f64();
173              let ticks_per_sec = tick_count as f64 / elapsed.as_secs_f64();
174
175              println!("📊 {:?} Performance:", channel_id);
176              println!(
177                "   Messages: {} ({:.1}/sec)",
178                message_count, messages_per_sec
179              );
180              println!("   Ticks: {} ({:.1}/sec)", tick_count, ticks_per_sec);
181
182              last_report = Instant::now();
183            }
184          }
185          Ok(Err(e)) => {
186            println!("❌ {:?}: Channel error: {}", channel_id, e);
187            break;
188          }
189          Err(_) => {
190            println!("⏱️  {:?}: No messages for 30s", channel_id);
191          }
192        }
193      }
194
195      (channel_id, message_count, tick_count)
196    });
197
198    channel_tasks.push(task);
199  }
200
201  // Monitor overall system health
202  let health_task = tokio::spawn(async move {
203    loop {
204      sleep(Duration::from_secs(15)).await;
205
206      println!("\n🏥 System Health Check:");
207      println!("   All connections active ✅");
208      println!("   Parsers running ✅");
209      println!("   Memory usage optimized ✅");
210    }
211  });
212
213  // Run for demonstration period
214  println!(
215    "\n📈 Monitoring performance for 60 seconds (Ctrl+C to stop early)..."
216  );
217
218  let demo_duration = Duration::from_secs(60);
219  let demo_start = Instant::now();
220
221  // Wait for demo duration or Ctrl+C
222  tokio::select! {
223      _ = sleep(demo_duration) => {
224          println!("\n⏰ Demo duration completed");
225      }
226      _ = tokio::signal::ctrl_c() => {
227          println!("\n🛑 Received Ctrl+C, stopping...");
228      }
229  }
230
231  // Abort monitoring tasks
232  health_task.abort();
233  for task in channel_tasks {
234    task.abort();
235  }
236
237  // Get final statistics
238  println!("\n📊 Final Statistics:");
239
240  if let Ok(stats) = manager.get_stats().await {
241    println!("   Total runtime: {:?}", demo_start.elapsed());
242    println!("   Active connections: {}", stats.active_connections);
243    println!("   Total symbols: {}", stats.total_symbols);
244    println!("   Total messages: {}", stats.total_messages_received);
245    println!("   Total errors: {}", stats.total_errors);
246
247    for (i, conn_stats) in stats.connection_stats.iter().enumerate() {
248      println!(
249        "   Connection {}: {} symbols, {} messages, {} errors",
250        i,
251        conn_stats.symbol_count,
252        conn_stats.messages_received,
253        conn_stats.errors_count
254      );
255    }
256  }
257
258  let processor_stats = manager.get_processor_stats().await;
259  println!("\n🔧 Parser Performance:");
260  for (channel_id, stats) in processor_stats {
261    println!(
262      "   {:?}: {:.1} msg/sec, {:?} avg latency",
263      channel_id, stats.messages_per_second, stats.processing_latency_avg
264    );
265  }
266
267  // Stop the manager
268  println!("\n🛑 Stopping manager...");
269  manager.stop().await?;
270
271  println!("🏁 Demo completed successfully!");
272  Ok(())
273}
examples/dynamic_subscription_demo.rs (line 380)
250async fn demo_dynamic_subscription(
251  manager: &mut KiteTickerManager,
252) -> Result<(), String> {
253  println!("\n🎯 True Dynamic Subscription Demo");
254  println!("==================================");
255
256  // Start with a small initial set
257  let initial_symbols = vec![256265, 265, 256777]; // 3 symbols
258  let additional_batch_1 = vec![274441, 260105, 273929]; // 3 more symbols
259  let additional_batch_2 = vec![260617, 257033, 257289, 257545]; // 4 more symbols
260  let symbols_to_remove = vec![265, 274441]; // Remove some symbols
261  let final_batch = vec![257801, 258825]; // Add final symbols
262
263  // Step 1: Initial subscription with small set
264  println!(
265    "\n📊 Step 1: Initial subscription to {} symbols",
266    initial_symbols.len()
267  );
268  println!("Starting with: {:?}", initial_symbols);
269
270  // Start listening for ticks BEFORE subscribing
271  let channels_before_sub = manager.get_all_channels();
272  let mut tick_listeners = Vec::new();
273
274  for (channel_id, mut receiver) in channels_before_sub {
275    let task = tokio::spawn(async move {
276      let start = std::time::Instant::now();
277      let mut tick_count = 0;
278
279      // Listen for initial ticks for 5 seconds
280      while start.elapsed() < Duration::from_secs(5) {
281        match timeout(Duration::from_millis(500), receiver.recv()).await {
282          Ok(Ok(message)) => {
283            if let TickerMessage::Ticks(ticks) = message {
284              tick_count += ticks.len();
285              println!(
286                "🎯 {:?}: Received {} ticks immediately after subscription!",
287                channel_id,
288                ticks.len()
289              );
290
291              // Debug: Show detailed tick information
292              debug!("Processing {} ticks on {:?}", ticks.len(), channel_id);
293
294              for (idx, tick) in ticks.iter().enumerate() {
295                println!("  🔹 Tick {}: Symbol: {}, LTP: {:?}, Volume: {:?}, Change: {:?}",
296                                    idx + 1,
297                                    tick.instrument_token,
298                                    tick.content.last_price,
299                                    tick.content.volume_traded,
300                                    tick.content.net_change
301                                );
302
303                // Debug: Show tick structure details
304                debug!(
305                  "Tick {} Details for symbol {}:",
306                  idx + 1,
307                  tick.instrument_token
308                );
309                debug!("  - Raw instrument_token: {}", tick.instrument_token);
310                debug!("  - Raw last_price: {:?}", tick.content.last_price);
311                debug!(
312                  "  - Raw volume_traded: {:?}",
313                  tick.content.volume_traded
314                );
315                debug!("  - Raw change: {:?}", tick.content.net_change);
316                debug!("  - Raw last_quantity: <not available>");
317                debug!(
318                  "  - Raw average_price: {:?}",
319                  tick.content.avg_traded_price
320                );
321                debug!(
322                  "  - Raw buy_quantity: {:?}",
323                  tick.content.total_buy_qty
324                );
325                debug!(
326                  "  - Raw sell_quantity: {:?}",
327                  tick.content.total_sell_qty
328                );
329                debug!("  - Raw oi: {:?}", tick.content.oi);
330                debug!("  - Raw mode: {:?}", tick.content.mode);
331
332                if let Some(ohlc) = &tick.content.ohlc {
333                  debug!(
334                    "  - OHLC available: O:{} H:{} L:{} C:{}",
335                    ohlc.open, ohlc.high, ohlc.low, ohlc.close
336                  );
337                }
338
339                if let Some(depth) = &tick.content.depth {
340                  debug!(
341                    "  - Market depth available: {} buy, {} sell",
342                    depth.buy.len(),
343                    depth.sell.len()
344                  );
345                }
346              }
347            }
348          }
349          _ => continue,
350        }
351      }
352      (channel_id, tick_count)
353    });
354    tick_listeners.push(task);
355  }
356
357  // Now subscribe to symbols
358  manager
359    .subscribe_symbols(&initial_symbols, Some(Mode::LTP))
360    .await?;
361
362  println!("✅ Subscription sent, waiting for initial ticks...");
363
364  // Wait for the listeners to finish
365  for task in tick_listeners {
366    if let Ok((channel_id, count)) = task.await {
367      println!(
368        "📊 {:?}: Received {} total ticks during initial subscription",
369        channel_id, count
370      );
371    }
372  }
373
374  print_distribution(manager, "After initial subscription").await;
375
376  // Step 2: Wait and monitor initial data
377  println!("\n⏳ Step 2: Monitoring initial data flow");
378  monitor_ticks_briefly(manager, 5, "Initial Subscription Data").await;
379
380  if let Ok(stats) = manager.get_stats().await {
381    println!("✅ Current Statistics:");
382    println!("   Active connections: {}", stats.active_connections);
383    println!("   Total symbols: {}", stats.total_symbols);
384    println!("   Total messages: {}", stats.total_messages_received);
385  }
386
387  // Step 3: Dynamic addition - Batch 1
388  println!(
389    "\n➕ Step 3: DYNAMIC ADDITION - Adding {} more symbols",
390    additional_batch_1.len()
391  );
392  println!("Adding: {:?}", additional_batch_1);
393  manager
394    .subscribe_symbols(&additional_batch_1, Some(Mode::Quote))
395    .await?;
396
397  // Give time for new tick data to arrive
398  sleep(Duration::from_secs(2)).await;
399
400  print_distribution(manager, "After first dynamic addition").await;
401  monitor_ticks_briefly(manager, 3, "After First Addition").await;
402
403  // Step 4: Dynamic addition - Batch 2
404  println!(
405    "\n➕ Step 4: DYNAMIC ADDITION - Adding {} more symbols",
406    additional_batch_2.len()
407  );
408  println!("Adding: {:?}", additional_batch_2);
409  manager
410    .subscribe_symbols(&additional_batch_2, Some(Mode::Full))
411    .await?;
412
413  // Give time for new tick data to arrive
414  sleep(Duration::from_secs(2)).await;
415
416  print_distribution(manager, "After second dynamic addition").await;
417  monitor_ticks_briefly(manager, 3, "After Second Addition").await;
418
419  // Step 5: Dynamic removal
420  println!(
421    "\n➖ Step 5: DYNAMIC REMOVAL - Removing {} symbols",
422    symbols_to_remove.len()
423  );
424  println!("Removing: {:?}", symbols_to_remove);
425  match manager.unsubscribe_symbols(&symbols_to_remove).await {
426    Ok(()) => {
427      print_distribution(manager, "After dynamic removal").await;
428      println!("✅ Dynamic removal successful!");
429    }
430    Err(e) => {
431      println!("⚠️  Dynamic removal failed: {}", e);
432    }
433  }
434  monitor_ticks_briefly(manager, 3, "After Symbol Removal").await;
435
436  // Step 6: Final addition and mode change demo
437  println!(
438    "\n➕ Step 6: FINAL ADDITION - Adding {} symbols",
439    final_batch.len()
440  );
441  println!("Adding: {:?}", final_batch);
442  manager
443    .subscribe_symbols(&final_batch, Some(Mode::LTP))
444    .await?;
445
446  print_distribution(manager, "After final addition").await;
447
448  // Step 7: Mode change demonstration
449  println!("\n🔄 Step 7: MODE CHANGE - Changing subscription mode");
450  let symbols_for_mode_change = vec![256265, 260105]; // Change some existing symbols to Full mode
451  println!("Changing {:?} to Full mode", symbols_for_mode_change);
452  match manager
453    .change_mode(&symbols_for_mode_change, Mode::Full)
454    .await
455  {
456    Ok(()) => println!("✅ Mode change successful!"),
457    Err(e) => println!("⚠️  Mode change failed: {}", e),
458  }
459
460  // Step 8: Final statistics and monitoring
461  println!("\n📈 Step 8: Final Statistics and Performance");
462  sleep(Duration::from_secs(3)).await; // Let some data flow
463
464  if let Ok(stats) = manager.get_stats().await {
465    println!("✅ Final Manager Statistics:");
466    println!("   Active connections: {}", stats.active_connections);
467    println!("   Total symbols: {}", stats.total_symbols);
468    println!("   Total messages: {}", stats.total_messages_received);
469    println!("   Total errors: {}", stats.total_errors);
470
471    for (i, conn_stats) in stats.connection_stats.iter().enumerate() {
472      println!(
473        "   Connection {}: {} symbols, {} messages",
474        i + 1,
475        conn_stats.symbol_count,
476        conn_stats.messages_received
477      );
478    }
479  }
480
481  // Step 9: Show processor performance
482  println!("\n⚡ Step 9: Final Parser Performance");
483  let processor_stats = manager.get_processor_stats().await;
484  for (channel_id, stats) in processor_stats {
485    println!(
486      "   {:?}: {:.1} msg/sec, {:?} avg latency",
487      channel_id, stats.messages_per_second, stats.processing_latency_avg
488    );
489  }
490
491  // Step 10: Monitor live data for a short period
492  println!("\n📺 Step 10: Live Data Monitoring (10 seconds)");
493  println!("Monitoring real-time tick data from all dynamically managed connections...");
494
495  let channels = manager.get_all_channels();
496  let mut tasks = Vec::new();
497
498  for (channel_id, mut receiver) in channels {
499    let task = tokio::spawn(async move {
500      let mut count = 0;
501      let start = std::time::Instant::now();
502
503      while start.elapsed() < Duration::from_secs(10) {
504        match timeout(Duration::from_secs(2), receiver.recv()).await {
505          Ok(Ok(message)) => {
506            count += 1;
507            if let TickerMessage::Ticks(ticks) = message {
508              println!("📋 {:?}: {} ticks received", channel_id, ticks.len());
509
510              // Debug: Final monitoring with comprehensive details
511              debug!(
512                "Final monitoring - Message #{}, {} ticks on {:?}",
513                count,
514                ticks.len(),
515                channel_id
516              );
517
518              for tick in &ticks {
519                println!(
520                  "  🔹 Symbol: {}, LTP: {:?}, Volume: {:?}, Change: {:?}",
521                  tick.instrument_token,
522                  tick.content.last_price,
523                  tick.content.volume_traded,
524                  tick.content.net_change
525                );
526
527                // Debug: Show complete tick analysis
528                debug!(
529                  "Final Debug Analysis for Symbol {}:",
530                  tick.instrument_token
531                );
532                debug!(
533                  "  - Timestamp: Now = {:?}",
534                  std::time::SystemTime::now()
535                );
536                debug!("  - Data completeness check:");
537                debug!("    * Last Price: {:?} (✅)", tick.content.last_price);
538                debug!(
539                  "    * Volume: {:?} ({})",
540                  tick.content.volume_traded,
541                  if tick.content.volume_traded.is_some() {
542                    "✅"
543                  } else {
544                    "❌"
545                  }
546                );
547                debug!(
548                  "    * Change: {:?} ({})",
549                  tick.content.net_change,
550                  if tick.content.net_change.is_some() {
551                    "✅"
552                  } else {
553                    "❌"
554                  }
555                );
556                debug!("    * Mode: {:?} (✅)", tick.content.mode);
557
558                // Validate mode-specific data
559                match tick.content.mode {
560                  Mode::Full => {
561                    debug!("  - Full mode validation:");
562                    debug!(
563                      "    * OHLC: {}",
564                      if tick.content.ohlc.is_some() {
565                        "✅ Present"
566                      } else {
567                        "❌ Missing"
568                      }
569                    );
570                    debug!(
571                      "    * Depth: {}",
572                      if tick.content.depth.is_some() {
573                        "✅ Present"
574                      } else {
575                        "❌ Missing"
576                      }
577                    );
578                    if let Some(ohlc) = &tick.content.ohlc {
579                      debug!(
580                        "    * OHLC Values: O:{} H:{} L:{} C:{}",
581                        ohlc.open, ohlc.high, ohlc.low, ohlc.close
582                      );
583                    }
584                    if let Some(depth) = &tick.content.depth {
585                      debug!(
586                        "    * Depth Levels: {} buy, {} sell",
587                        depth.buy.len(),
588                        depth.sell.len()
589                      );
590                    }
591                  }
592                  Mode::Quote => {
593                    debug!("  - Quote mode validation:");
594                    debug!(
595                      "    * OHLC: {}",
596                      if tick.content.ohlc.is_some() {
597                        "✅ Present"
598                      } else {
599                        "❌ Missing"
600                      }
601                    );
602                    if let Some(ohlc) = &tick.content.ohlc {
603                      debug!(
604                        "    * OHLC Values: O:{} H:{} L:{} C:{}",
605                        ohlc.open, ohlc.high, ohlc.low, ohlc.close
606                      );
607                    }
608                  }
609                  Mode::LTP => {
610                    debug!("  - LTP mode validation: ✅ Basic data only");
611                  }
612                }
613              }
614            } else {
615              println!("📋 {:?}: Non-tick message received", channel_id);
616              debug!("Message type: {:?}", message);
617            }
618          }
619          _ => continue,
620        }
621      }
622      (channel_id, count)
623    });
624    tasks.push(task);
625  }
626
627  // Wait for monitoring to complete
628  for task in tasks {
629    if let Ok((channel_id, count)) = task.await {
630      println!(
631        "📊 {:?}: {} total messages in 10 seconds",
632        channel_id, count
633      );
634    }
635  }
636
637  // Final cleanup demonstration
638  println!("\n🧹 Step 11: Final Cleanup Demonstration");
639  let current_symbols: Vec<u32> = manager
640    .get_symbol_distribution()
641    .values()
642    .flat_map(|symbols| symbols.iter().cloned())
643    .collect();
644
645  println!(
646    "Attempting to unsubscribe from {} remaining symbols...",
647    current_symbols.len()
648  );
649
650  match manager.unsubscribe_symbols(&current_symbols).await {
651    Ok(()) => {
652      print_distribution(manager, "After complete cleanup").await;
653      println!("✅ Complete cleanup successful!");
654    }
655    Err(e) => {
656      println!("⚠️  Cleanup encountered issues: {}", e);
657      println!("💡 This demonstrates the current architecture capabilities");
658    }
659  }
660
661  println!("\n✅ Dynamic Subscription Demo Completed!");
662  println!("🎯 Key Dynamic Features Demonstrated:");
663  println!("   ✅ Runtime symbol addition across multiple batches");
664  println!("   ✅ Runtime symbol removal with proper tracking");
665  println!("   ✅ Dynamic mode changes for existing symbols");
666  println!("   ✅ Real-time capacity distribution and monitoring");
667  println!("   ✅ Independent message processing per connection");
668  println!("   ✅ High-performance parser tasks with statistics");
669  println!("   ✅ Complete subscription lifecycle management");
670
671  Ok(())
672}
Source

pub async fn get_health(&self) -> Result<HealthSummary, String>

Get health summary

Source

pub async fn get_processor_stats(&self) -> Vec<(ChannelId, ProcessorStats)>

Get processor statistics for all channels

Examples found in repository?
examples/advanced/manager_demo.rs (line 258)
8pub async fn main() -> Result<(), String> {
9  // Initialize logging
10  env_logger::init();
11
12  println!("🚀 KiteTicker Multi-Connection Manager Demo");
13  println!("═══════════════════════════════════════════");
14
15  let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
16  let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
17
18  if api_key.is_empty() || access_token.is_empty() {
19    println!(
20      "⚠️  KITE_API_KEY and KITE_ACCESS_TOKEN environment variables not set"
21    );
22    println!("   This demo will show the manager architecture without live connections");
23    demonstrate_offline_architecture().await;
24    return Ok(());
25  }
26
27  // Create high-performance configuration - RESTORED TO 3 CONNECTIONS
28  let config = KiteManagerConfig {
29    max_symbols_per_connection: 3000,
30    max_connections: 3,            // BACK TO 3 CONNECTIONS!
31    connection_buffer_size: 10000, // High buffer for performance
32    parser_buffer_size: 20000,     // Even higher for parsed messages
33    connection_timeout: Duration::from_secs(30),
34    health_check_interval: Duration::from_secs(5),
35    max_reconnect_attempts: 5,
36    reconnect_delay: Duration::from_secs(2),
37    enable_dedicated_parsers: true, // Use dedicated parser tasks
38    default_mode: Mode::Full,       // Full mode for maximum data
39  };
40
41  println!("🔧 Configuration:");
42  println!("   Max connections: {}", config.max_connections);
43  println!(
44    "   Max symbols per connection: {}",
45    config.max_symbols_per_connection
46  );
47  println!(
48    "   Connection buffer size: {}",
49    config.connection_buffer_size
50  );
51  println!("   Parser buffer size: {}", config.parser_buffer_size);
52  println!("   Dedicated parsers: {}", config.enable_dedicated_parsers);
53  println!();
54
55  // Create and start the manager
56  println!("📡 Starting multi-connection manager...");
57  let start_time = Instant::now();
58
59  let mut manager = KiteTickerManager::new(api_key, access_token, config);
60
61  match timeout(Duration::from_secs(30), manager.start()).await {
62    Ok(Ok(())) => {
63      println!("✅ Manager started in {:?}", start_time.elapsed());
64    }
65    Ok(Err(e)) => {
66      println!("❌ Manager failed to start: {}", e);
67      return Err(e);
68    }
69    Err(_) => {
70      println!("⏱️  Manager startup timeout");
71      return Err("Manager startup timeout".to_string());
72    }
73  }
74
75  // Test with market symbols for proper distribution
76  let symbols = vec![
77    256265, 265, 256777, 274441, 260105, 273929, 260617, 257033, 257289,
78    257545, 257801, 258825, 259081, 259337, 259593, 259849, 260873, 261129,
79    261385, 261641, 261897, 262153, 262409, 262665, 262921, 263177, 263433,
80    263689, 263945, 264457, 264713, 264969, 265225, 265737, 265993, 266249,
81    266505, 266761, 267017, 267273, 267529, 267785, 268041, 268297, 268553,
82    268809, 269065, 269321, 269577, 269833, 270089, 270345, 270601, 270857,
83    271113, 271625, 271881, 272137, 272393, 273417, 273673, 274185, 274697,
84    274953, 275209, 275465, 275721, 275977, 276233, 276489, 276745, 277001,
85    277257, 277513, 277769, 278025, 278281, 278537, 278793, 279049, 279305,
86    279561, 279817, 280073, 280329, 280585, 280841, 281097, 281353, 281865,
87    282121, 282377, 282633, 282889, 283145, 283401, 283657, 283913, 284169,
88    284425, 284681, 284937, 285193, 285449, 285961, 286217, 286473, 286729,
89    286985, 287241, 287497, 287753, 288009, 288265, 288521, 288777, 289033,
90    289289, 289545, 289801, 290057, 290313, 290569, 290825, 291081, 291337,
91    291593, 291849, 292105, 292361, 292617, 292873, 293129, 293385, 293641,
92    293897, 294153, 294409, 294665, 294921, 295177, 295433, 295689, 398345,
93    398601, 398857, 399113, 399369, 399625, 399881, 400137, 400393, 400905,
94    401161, 401673, 401929, 402185, 402441, 402697, 402953, 403209, 403465,
95    403721, 403977, 404233, 404489, 404745, 405001, 405257, 405513, 405769,
96    406025, 406281, 406537, 406793, 407049, 407305, 407561, 407817, 408073,
97    408329, 408585, 408841, 409097, 409353, 409609, 409865, 410121, 410377,
98    410633, 410889, 411145, 411401, 411657, 411913, 412169, 412425, 412681,
99    412937, 413193, 413449, 413705, 413961, 414217, 414473, 414729, 414985,
100  ];
101
102  println!("📊 Subscribing to symbols across connections...");
103
104  // Subscribe to different symbol sets
105  manager
106    .subscribe_symbols(&symbols, Some(Mode::Full))
107    .await?;
108  // manager.subscribe_symbols(&bank_nifty, Some(Mode::Quote)).await?;
109  // manager.subscribe_symbols(&it_stocks, Some(Mode::LTP)).await?;
110
111  println!(
112    "✅ Subscribed to {} total symbols",
113    symbols.len() //  + bank_nifty.len() + it_stocks.len()
114  );
115
116  // Get symbol distribution
117  let distribution = manager.get_symbol_distribution();
118  println!("\n📈 Symbol distribution across connections:");
119  for (channel_id, symbols) in &distribution {
120    println!("   {:?}: {} symbols", channel_id, symbols.len());
121  }
122
123  // Get all output channels
124  let channels = manager.get_all_channels();
125  println!("\n🔀 Created {} output channels", channels.len());
126
127  // Start monitoring each channel
128  let mut channel_tasks = Vec::new();
129
130  for (channel_id, mut receiver) in channels {
131    let task = tokio::spawn(async move {
132      let mut message_count = 0;
133      let mut tick_count = 0;
134      let start_time = Instant::now();
135      let mut last_report = Instant::now();
136
137      println!("🎯 Starting monitoring for {:?}", channel_id);
138
139      loop {
140        match timeout(Duration::from_secs(30), receiver.recv()).await {
141          Ok(Ok(message)) => {
142            message_count += 1;
143
144            match message {
145              TickerMessage::Ticks(ticks) => {
146                tick_count += ticks.len();
147
148                // Show first few ticks for demonstration
149                if message_count <= 3 {
150                  for tick in &ticks {
151                    println!(
152                      "📋 {:?}: Tick {} @ {:?}",
153                      channel_id,
154                      tick.instrument_token,
155                      tick.content.last_price.unwrap_or(0.0)
156                    );
157                  }
158                }
159              }
160              TickerMessage::Error(e) => {
161                println!("⚠️  {:?}: Error: {}", channel_id, e);
162              }
163              _ => {
164                println!("📨 {:?}: Other message", channel_id);
165              }
166            }
167
168            // Report performance every 10 seconds
169            if last_report.elapsed() >= Duration::from_secs(10) {
170              let elapsed = start_time.elapsed();
171              let messages_per_sec =
172                message_count as f64 / elapsed.as_secs_f64();
173              let ticks_per_sec = tick_count as f64 / elapsed.as_secs_f64();
174
175              println!("📊 {:?} Performance:", channel_id);
176              println!(
177                "   Messages: {} ({:.1}/sec)",
178                message_count, messages_per_sec
179              );
180              println!("   Ticks: {} ({:.1}/sec)", tick_count, ticks_per_sec);
181
182              last_report = Instant::now();
183            }
184          }
185          Ok(Err(e)) => {
186            println!("❌ {:?}: Channel error: {}", channel_id, e);
187            break;
188          }
189          Err(_) => {
190            println!("⏱️  {:?}: No messages for 30s", channel_id);
191          }
192        }
193      }
194
195      (channel_id, message_count, tick_count)
196    });
197
198    channel_tasks.push(task);
199  }
200
201  // Monitor overall system health
202  let health_task = tokio::spawn(async move {
203    loop {
204      sleep(Duration::from_secs(15)).await;
205
206      println!("\n🏥 System Health Check:");
207      println!("   All connections active ✅");
208      println!("   Parsers running ✅");
209      println!("   Memory usage optimized ✅");
210    }
211  });
212
213  // Run for demonstration period
214  println!(
215    "\n📈 Monitoring performance for 60 seconds (Ctrl+C to stop early)..."
216  );
217
218  let demo_duration = Duration::from_secs(60);
219  let demo_start = Instant::now();
220
221  // Wait for demo duration or Ctrl+C
222  tokio::select! {
223      _ = sleep(demo_duration) => {
224          println!("\n⏰ Demo duration completed");
225      }
226      _ = tokio::signal::ctrl_c() => {
227          println!("\n🛑 Received Ctrl+C, stopping...");
228      }
229  }
230
231  // Abort monitoring tasks
232  health_task.abort();
233  for task in channel_tasks {
234    task.abort();
235  }
236
237  // Get final statistics
238  println!("\n📊 Final Statistics:");
239
240  if let Ok(stats) = manager.get_stats().await {
241    println!("   Total runtime: {:?}", demo_start.elapsed());
242    println!("   Active connections: {}", stats.active_connections);
243    println!("   Total symbols: {}", stats.total_symbols);
244    println!("   Total messages: {}", stats.total_messages_received);
245    println!("   Total errors: {}", stats.total_errors);
246
247    for (i, conn_stats) in stats.connection_stats.iter().enumerate() {
248      println!(
249        "   Connection {}: {} symbols, {} messages, {} errors",
250        i,
251        conn_stats.symbol_count,
252        conn_stats.messages_received,
253        conn_stats.errors_count
254      );
255    }
256  }
257
258  let processor_stats = manager.get_processor_stats().await;
259  println!("\n🔧 Parser Performance:");
260  for (channel_id, stats) in processor_stats {
261    println!(
262      "   {:?}: {:.1} msg/sec, {:?} avg latency",
263      channel_id, stats.messages_per_second, stats.processing_latency_avg
264    );
265  }
266
267  // Stop the manager
268  println!("\n🛑 Stopping manager...");
269  manager.stop().await?;
270
271  println!("🏁 Demo completed successfully!");
272  Ok(())
273}
More examples
Hide additional examples
examples/dynamic_subscription_demo.rs (line 483)
250async fn demo_dynamic_subscription(
251  manager: &mut KiteTickerManager,
252) -> Result<(), String> {
253  println!("\n🎯 True Dynamic Subscription Demo");
254  println!("==================================");
255
256  // Start with a small initial set
257  let initial_symbols = vec![256265, 265, 256777]; // 3 symbols
258  let additional_batch_1 = vec![274441, 260105, 273929]; // 3 more symbols
259  let additional_batch_2 = vec![260617, 257033, 257289, 257545]; // 4 more symbols
260  let symbols_to_remove = vec![265, 274441]; // Remove some symbols
261  let final_batch = vec![257801, 258825]; // Add final symbols
262
263  // Step 1: Initial subscription with small set
264  println!(
265    "\n📊 Step 1: Initial subscription to {} symbols",
266    initial_symbols.len()
267  );
268  println!("Starting with: {:?}", initial_symbols);
269
270  // Start listening for ticks BEFORE subscribing
271  let channels_before_sub = manager.get_all_channels();
272  let mut tick_listeners = Vec::new();
273
274  for (channel_id, mut receiver) in channels_before_sub {
275    let task = tokio::spawn(async move {
276      let start = std::time::Instant::now();
277      let mut tick_count = 0;
278
279      // Listen for initial ticks for 5 seconds
280      while start.elapsed() < Duration::from_secs(5) {
281        match timeout(Duration::from_millis(500), receiver.recv()).await {
282          Ok(Ok(message)) => {
283            if let TickerMessage::Ticks(ticks) = message {
284              tick_count += ticks.len();
285              println!(
286                "🎯 {:?}: Received {} ticks immediately after subscription!",
287                channel_id,
288                ticks.len()
289              );
290
291              // Debug: Show detailed tick information
292              debug!("Processing {} ticks on {:?}", ticks.len(), channel_id);
293
294              for (idx, tick) in ticks.iter().enumerate() {
295                println!("  🔹 Tick {}: Symbol: {}, LTP: {:?}, Volume: {:?}, Change: {:?}",
296                                    idx + 1,
297                                    tick.instrument_token,
298                                    tick.content.last_price,
299                                    tick.content.volume_traded,
300                                    tick.content.net_change
301                                );
302
303                // Debug: Show tick structure details
304                debug!(
305                  "Tick {} Details for symbol {}:",
306                  idx + 1,
307                  tick.instrument_token
308                );
309                debug!("  - Raw instrument_token: {}", tick.instrument_token);
310                debug!("  - Raw last_price: {:?}", tick.content.last_price);
311                debug!(
312                  "  - Raw volume_traded: {:?}",
313                  tick.content.volume_traded
314                );
315                debug!("  - Raw change: {:?}", tick.content.net_change);
316                debug!("  - Raw last_quantity: <not available>");
317                debug!(
318                  "  - Raw average_price: {:?}",
319                  tick.content.avg_traded_price
320                );
321                debug!(
322                  "  - Raw buy_quantity: {:?}",
323                  tick.content.total_buy_qty
324                );
325                debug!(
326                  "  - Raw sell_quantity: {:?}",
327                  tick.content.total_sell_qty
328                );
329                debug!("  - Raw oi: {:?}", tick.content.oi);
330                debug!("  - Raw mode: {:?}", tick.content.mode);
331
332                if let Some(ohlc) = &tick.content.ohlc {
333                  debug!(
334                    "  - OHLC available: O:{} H:{} L:{} C:{}",
335                    ohlc.open, ohlc.high, ohlc.low, ohlc.close
336                  );
337                }
338
339                if let Some(depth) = &tick.content.depth {
340                  debug!(
341                    "  - Market depth available: {} buy, {} sell",
342                    depth.buy.len(),
343                    depth.sell.len()
344                  );
345                }
346              }
347            }
348          }
349          _ => continue,
350        }
351      }
352      (channel_id, tick_count)
353    });
354    tick_listeners.push(task);
355  }
356
357  // Now subscribe to symbols
358  manager
359    .subscribe_symbols(&initial_symbols, Some(Mode::LTP))
360    .await?;
361
362  println!("✅ Subscription sent, waiting for initial ticks...");
363
364  // Wait for the listeners to finish
365  for task in tick_listeners {
366    if let Ok((channel_id, count)) = task.await {
367      println!(
368        "📊 {:?}: Received {} total ticks during initial subscription",
369        channel_id, count
370      );
371    }
372  }
373
374  print_distribution(manager, "After initial subscription").await;
375
376  // Step 2: Wait and monitor initial data
377  println!("\n⏳ Step 2: Monitoring initial data flow");
378  monitor_ticks_briefly(manager, 5, "Initial Subscription Data").await;
379
380  if let Ok(stats) = manager.get_stats().await {
381    println!("✅ Current Statistics:");
382    println!("   Active connections: {}", stats.active_connections);
383    println!("   Total symbols: {}", stats.total_symbols);
384    println!("   Total messages: {}", stats.total_messages_received);
385  }
386
387  // Step 3: Dynamic addition - Batch 1
388  println!(
389    "\n➕ Step 3: DYNAMIC ADDITION - Adding {} more symbols",
390    additional_batch_1.len()
391  );
392  println!("Adding: {:?}", additional_batch_1);
393  manager
394    .subscribe_symbols(&additional_batch_1, Some(Mode::Quote))
395    .await?;
396
397  // Give time for new tick data to arrive
398  sleep(Duration::from_secs(2)).await;
399
400  print_distribution(manager, "After first dynamic addition").await;
401  monitor_ticks_briefly(manager, 3, "After First Addition").await;
402
403  // Step 4: Dynamic addition - Batch 2
404  println!(
405    "\n➕ Step 4: DYNAMIC ADDITION - Adding {} more symbols",
406    additional_batch_2.len()
407  );
408  println!("Adding: {:?}", additional_batch_2);
409  manager
410    .subscribe_symbols(&additional_batch_2, Some(Mode::Full))
411    .await?;
412
413  // Give time for new tick data to arrive
414  sleep(Duration::from_secs(2)).await;
415
416  print_distribution(manager, "After second dynamic addition").await;
417  monitor_ticks_briefly(manager, 3, "After Second Addition").await;
418
419  // Step 5: Dynamic removal
420  println!(
421    "\n➖ Step 5: DYNAMIC REMOVAL - Removing {} symbols",
422    symbols_to_remove.len()
423  );
424  println!("Removing: {:?}", symbols_to_remove);
425  match manager.unsubscribe_symbols(&symbols_to_remove).await {
426    Ok(()) => {
427      print_distribution(manager, "After dynamic removal").await;
428      println!("✅ Dynamic removal successful!");
429    }
430    Err(e) => {
431      println!("⚠️  Dynamic removal failed: {}", e);
432    }
433  }
434  monitor_ticks_briefly(manager, 3, "After Symbol Removal").await;
435
436  // Step 6: Final addition and mode change demo
437  println!(
438    "\n➕ Step 6: FINAL ADDITION - Adding {} symbols",
439    final_batch.len()
440  );
441  println!("Adding: {:?}", final_batch);
442  manager
443    .subscribe_symbols(&final_batch, Some(Mode::LTP))
444    .await?;
445
446  print_distribution(manager, "After final addition").await;
447
448  // Step 7: Mode change demonstration
449  println!("\n🔄 Step 7: MODE CHANGE - Changing subscription mode");
450  let symbols_for_mode_change = vec![256265, 260105]; // Change some existing symbols to Full mode
451  println!("Changing {:?} to Full mode", symbols_for_mode_change);
452  match manager
453    .change_mode(&symbols_for_mode_change, Mode::Full)
454    .await
455  {
456    Ok(()) => println!("✅ Mode change successful!"),
457    Err(e) => println!("⚠️  Mode change failed: {}", e),
458  }
459
460  // Step 8: Final statistics and monitoring
461  println!("\n📈 Step 8: Final Statistics and Performance");
462  sleep(Duration::from_secs(3)).await; // Let some data flow
463
464  if let Ok(stats) = manager.get_stats().await {
465    println!("✅ Final Manager Statistics:");
466    println!("   Active connections: {}", stats.active_connections);
467    println!("   Total symbols: {}", stats.total_symbols);
468    println!("   Total messages: {}", stats.total_messages_received);
469    println!("   Total errors: {}", stats.total_errors);
470
471    for (i, conn_stats) in stats.connection_stats.iter().enumerate() {
472      println!(
473        "   Connection {}: {} symbols, {} messages",
474        i + 1,
475        conn_stats.symbol_count,
476        conn_stats.messages_received
477      );
478    }
479  }
480
481  // Step 9: Show processor performance
482  println!("\n⚡ Step 9: Final Parser Performance");
483  let processor_stats = manager.get_processor_stats().await;
484  for (channel_id, stats) in processor_stats {
485    println!(
486      "   {:?}: {:.1} msg/sec, {:?} avg latency",
487      channel_id, stats.messages_per_second, stats.processing_latency_avg
488    );
489  }
490
491  // Step 10: Monitor live data for a short period
492  println!("\n📺 Step 10: Live Data Monitoring (10 seconds)");
493  println!("Monitoring real-time tick data from all dynamically managed connections...");
494
495  let channels = manager.get_all_channels();
496  let mut tasks = Vec::new();
497
498  for (channel_id, mut receiver) in channels {
499    let task = tokio::spawn(async move {
500      let mut count = 0;
501      let start = std::time::Instant::now();
502
503      while start.elapsed() < Duration::from_secs(10) {
504        match timeout(Duration::from_secs(2), receiver.recv()).await {
505          Ok(Ok(message)) => {
506            count += 1;
507            if let TickerMessage::Ticks(ticks) = message {
508              println!("📋 {:?}: {} ticks received", channel_id, ticks.len());
509
510              // Debug: Final monitoring with comprehensive details
511              debug!(
512                "Final monitoring - Message #{}, {} ticks on {:?}",
513                count,
514                ticks.len(),
515                channel_id
516              );
517
518              for tick in &ticks {
519                println!(
520                  "  🔹 Symbol: {}, LTP: {:?}, Volume: {:?}, Change: {:?}",
521                  tick.instrument_token,
522                  tick.content.last_price,
523                  tick.content.volume_traded,
524                  tick.content.net_change
525                );
526
527                // Debug: Show complete tick analysis
528                debug!(
529                  "Final Debug Analysis for Symbol {}:",
530                  tick.instrument_token
531                );
532                debug!(
533                  "  - Timestamp: Now = {:?}",
534                  std::time::SystemTime::now()
535                );
536                debug!("  - Data completeness check:");
537                debug!("    * Last Price: {:?} (✅)", tick.content.last_price);
538                debug!(
539                  "    * Volume: {:?} ({})",
540                  tick.content.volume_traded,
541                  if tick.content.volume_traded.is_some() {
542                    "✅"
543                  } else {
544                    "❌"
545                  }
546                );
547                debug!(
548                  "    * Change: {:?} ({})",
549                  tick.content.net_change,
550                  if tick.content.net_change.is_some() {
551                    "✅"
552                  } else {
553                    "❌"
554                  }
555                );
556                debug!("    * Mode: {:?} (✅)", tick.content.mode);
557
558                // Validate mode-specific data
559                match tick.content.mode {
560                  Mode::Full => {
561                    debug!("  - Full mode validation:");
562                    debug!(
563                      "    * OHLC: {}",
564                      if tick.content.ohlc.is_some() {
565                        "✅ Present"
566                      } else {
567                        "❌ Missing"
568                      }
569                    );
570                    debug!(
571                      "    * Depth: {}",
572                      if tick.content.depth.is_some() {
573                        "✅ Present"
574                      } else {
575                        "❌ Missing"
576                      }
577                    );
578                    if let Some(ohlc) = &tick.content.ohlc {
579                      debug!(
580                        "    * OHLC Values: O:{} H:{} L:{} C:{}",
581                        ohlc.open, ohlc.high, ohlc.low, ohlc.close
582                      );
583                    }
584                    if let Some(depth) = &tick.content.depth {
585                      debug!(
586                        "    * Depth Levels: {} buy, {} sell",
587                        depth.buy.len(),
588                        depth.sell.len()
589                      );
590                    }
591                  }
592                  Mode::Quote => {
593                    debug!("  - Quote mode validation:");
594                    debug!(
595                      "    * OHLC: {}",
596                      if tick.content.ohlc.is_some() {
597                        "✅ Present"
598                      } else {
599                        "❌ Missing"
600                      }
601                    );
602                    if let Some(ohlc) = &tick.content.ohlc {
603                      debug!(
604                        "    * OHLC Values: O:{} H:{} L:{} C:{}",
605                        ohlc.open, ohlc.high, ohlc.low, ohlc.close
606                      );
607                    }
608                  }
609                  Mode::LTP => {
610                    debug!("  - LTP mode validation: ✅ Basic data only");
611                  }
612                }
613              }
614            } else {
615              println!("📋 {:?}: Non-tick message received", channel_id);
616              debug!("Message type: {:?}", message);
617            }
618          }
619          _ => continue,
620        }
621      }
622      (channel_id, count)
623    });
624    tasks.push(task);
625  }
626
627  // Wait for monitoring to complete
628  for task in tasks {
629    if let Ok((channel_id, count)) = task.await {
630      println!(
631        "📊 {:?}: {} total messages in 10 seconds",
632        channel_id, count
633      );
634    }
635  }
636
637  // Final cleanup demonstration
638  println!("\n🧹 Step 11: Final Cleanup Demonstration");
639  let current_symbols: Vec<u32> = manager
640    .get_symbol_distribution()
641    .values()
642    .flat_map(|symbols| symbols.iter().cloned())
643    .collect();
644
645  println!(
646    "Attempting to unsubscribe from {} remaining symbols...",
647    current_symbols.len()
648  );
649
650  match manager.unsubscribe_symbols(&current_symbols).await {
651    Ok(()) => {
652      print_distribution(manager, "After complete cleanup").await;
653      println!("✅ Complete cleanup successful!");
654    }
655    Err(e) => {
656      println!("⚠️  Cleanup encountered issues: {}", e);
657      println!("💡 This demonstrates the current architecture capabilities");
658    }
659  }
660
661  println!("\n✅ Dynamic Subscription Demo Completed!");
662  println!("🎯 Key Dynamic Features Demonstrated:");
663  println!("   ✅ Runtime symbol addition across multiple batches");
664  println!("   ✅ Runtime symbol removal with proper tracking");
665  println!("   ✅ Dynamic mode changes for existing symbols");
666  println!("   ✅ Real-time capacity distribution and monitoring");
667  println!("   ✅ Independent message processing per connection");
668  println!("   ✅ High-performance parser tasks with statistics");
669  println!("   ✅ Complete subscription lifecycle management");
670
671  Ok(())
672}
Source

pub fn get_symbol_distribution(&self) -> HashMap<ChannelId, Vec<u32>>

Get symbol distribution across connections

Examples found in repository?
examples/basic/runtime_subscription_example.rs (line 107)
48async fn runtime_subscription_demo(
49  manager: &mut KiteTickerManager,
50) -> Result<(), String> {
51  println!("\n📡 Runtime Subscription Management Demo");
52  println!("=====================================");
53
54  // 1. Start with initial symbols
55  println!("\n1️⃣ Initial subscription to base symbols");
56  let initial_symbols = vec![256265, 265, 256777];
57  manager
58    .subscribe_symbols(&initial_symbols, Some(Mode::LTP))
59    .await?;
60  print_current_state(manager, "Initial subscription").await;
61
62  sleep(Duration::from_secs(2)).await;
63
64  // 2. Add more symbols dynamically
65  println!("\n2️⃣ Adding symbols dynamically");
66  let additional_symbols = vec![274441, 260105, 273929];
67  manager
68    .subscribe_symbols(&additional_symbols, Some(Mode::Quote))
69    .await?;
70  print_current_state(manager, "After adding symbols").await;
71
72  sleep(Duration::from_secs(2)).await;
73
74  // 3. Change mode for existing symbols
75  println!("\n3️⃣ Changing subscription mode");
76  let symbols_for_mode_change = vec![256265, 265];
77  manager
78    .change_mode(&symbols_for_mode_change, Mode::Full)
79    .await?;
80  print_current_state(manager, "After mode change").await;
81
82  sleep(Duration::from_secs(2)).await;
83
84  // 4. Remove some symbols
85  println!("\n4️⃣ Removing symbols dynamically");
86  let symbols_to_remove = vec![265, 274441];
87  manager.unsubscribe_symbols(&symbols_to_remove).await?;
88  print_current_state(manager, "After removing symbols").await;
89
90  sleep(Duration::from_secs(2)).await;
91
92  // 5. Add different symbols with different modes
93  println!("\n5️⃣ Adding new symbols with Full mode");
94  let full_mode_symbols = vec![257801, 258825];
95  manager
96    .subscribe_symbols(&full_mode_symbols, Some(Mode::Full))
97    .await?;
98  print_current_state(manager, "After adding Full mode symbols").await;
99
100  // 6. Monitor live data for a short period
101  println!("\n6️⃣ Monitoring live data (5 seconds)");
102  monitor_live_data(manager, 5).await?;
103
104  // 7. Complete cleanup
105  println!("\n7️⃣ Complete cleanup");
106  let all_symbols: Vec<u32> = manager
107    .get_symbol_distribution()
108    .values()
109    .flat_map(|symbols| symbols.iter().cloned())
110    .collect();
111
112  if !all_symbols.is_empty() {
113    manager.unsubscribe_symbols(&all_symbols).await?;
114    print_current_state(manager, "After complete cleanup").await;
115  }
116
117  println!("\n✅ Runtime subscription demo completed!");
118  Ok(())
119}
120
121async fn print_current_state(manager: &KiteTickerManager, context: &str) {
122  println!("\n📊 Current State ({})", context);
123
124  // Show distribution
125  let distribution = manager.get_symbol_distribution();
126  let mut total = 0;
127  for (channel_id, symbols) in &distribution {
128    println!("   {:?}: {} symbols", channel_id, symbols.len());
129    total += symbols.len();
130  }
131  println!("   📈 Total: {} symbols", total);
132
133  // Show stats
134  if let Ok(stats) = manager.get_stats().await {
135    println!(
136      "   📊 Stats: {} connections, {} messages",
137      stats.active_connections, stats.total_messages_received
138    );
139  }
140}
More examples
Hide additional examples
examples/advanced/manager_demo.rs (line 117)
8pub async fn main() -> Result<(), String> {
9  // Initialize logging
10  env_logger::init();
11
12  println!("🚀 KiteTicker Multi-Connection Manager Demo");
13  println!("═══════════════════════════════════════════");
14
15  let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
16  let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
17
18  if api_key.is_empty() || access_token.is_empty() {
19    println!(
20      "⚠️  KITE_API_KEY and KITE_ACCESS_TOKEN environment variables not set"
21    );
22    println!("   This demo will show the manager architecture without live connections");
23    demonstrate_offline_architecture().await;
24    return Ok(());
25  }
26
27  // Create high-performance configuration - RESTORED TO 3 CONNECTIONS
28  let config = KiteManagerConfig {
29    max_symbols_per_connection: 3000,
30    max_connections: 3,            // BACK TO 3 CONNECTIONS!
31    connection_buffer_size: 10000, // High buffer for performance
32    parser_buffer_size: 20000,     // Even higher for parsed messages
33    connection_timeout: Duration::from_secs(30),
34    health_check_interval: Duration::from_secs(5),
35    max_reconnect_attempts: 5,
36    reconnect_delay: Duration::from_secs(2),
37    enable_dedicated_parsers: true, // Use dedicated parser tasks
38    default_mode: Mode::Full,       // Full mode for maximum data
39  };
40
41  println!("🔧 Configuration:");
42  println!("   Max connections: {}", config.max_connections);
43  println!(
44    "   Max symbols per connection: {}",
45    config.max_symbols_per_connection
46  );
47  println!(
48    "   Connection buffer size: {}",
49    config.connection_buffer_size
50  );
51  println!("   Parser buffer size: {}", config.parser_buffer_size);
52  println!("   Dedicated parsers: {}", config.enable_dedicated_parsers);
53  println!();
54
55  // Create and start the manager
56  println!("📡 Starting multi-connection manager...");
57  let start_time = Instant::now();
58
59  let mut manager = KiteTickerManager::new(api_key, access_token, config);
60
61  match timeout(Duration::from_secs(30), manager.start()).await {
62    Ok(Ok(())) => {
63      println!("✅ Manager started in {:?}", start_time.elapsed());
64    }
65    Ok(Err(e)) => {
66      println!("❌ Manager failed to start: {}", e);
67      return Err(e);
68    }
69    Err(_) => {
70      println!("⏱️  Manager startup timeout");
71      return Err("Manager startup timeout".to_string());
72    }
73  }
74
75  // Test with market symbols for proper distribution
76  let symbols = vec![
77    256265, 265, 256777, 274441, 260105, 273929, 260617, 257033, 257289,
78    257545, 257801, 258825, 259081, 259337, 259593, 259849, 260873, 261129,
79    261385, 261641, 261897, 262153, 262409, 262665, 262921, 263177, 263433,
80    263689, 263945, 264457, 264713, 264969, 265225, 265737, 265993, 266249,
81    266505, 266761, 267017, 267273, 267529, 267785, 268041, 268297, 268553,
82    268809, 269065, 269321, 269577, 269833, 270089, 270345, 270601, 270857,
83    271113, 271625, 271881, 272137, 272393, 273417, 273673, 274185, 274697,
84    274953, 275209, 275465, 275721, 275977, 276233, 276489, 276745, 277001,
85    277257, 277513, 277769, 278025, 278281, 278537, 278793, 279049, 279305,
86    279561, 279817, 280073, 280329, 280585, 280841, 281097, 281353, 281865,
87    282121, 282377, 282633, 282889, 283145, 283401, 283657, 283913, 284169,
88    284425, 284681, 284937, 285193, 285449, 285961, 286217, 286473, 286729,
89    286985, 287241, 287497, 287753, 288009, 288265, 288521, 288777, 289033,
90    289289, 289545, 289801, 290057, 290313, 290569, 290825, 291081, 291337,
91    291593, 291849, 292105, 292361, 292617, 292873, 293129, 293385, 293641,
92    293897, 294153, 294409, 294665, 294921, 295177, 295433, 295689, 398345,
93    398601, 398857, 399113, 399369, 399625, 399881, 400137, 400393, 400905,
94    401161, 401673, 401929, 402185, 402441, 402697, 402953, 403209, 403465,
95    403721, 403977, 404233, 404489, 404745, 405001, 405257, 405513, 405769,
96    406025, 406281, 406537, 406793, 407049, 407305, 407561, 407817, 408073,
97    408329, 408585, 408841, 409097, 409353, 409609, 409865, 410121, 410377,
98    410633, 410889, 411145, 411401, 411657, 411913, 412169, 412425, 412681,
99    412937, 413193, 413449, 413705, 413961, 414217, 414473, 414729, 414985,
100  ];
101
102  println!("📊 Subscribing to symbols across connections...");
103
104  // Subscribe to different symbol sets
105  manager
106    .subscribe_symbols(&symbols, Some(Mode::Full))
107    .await?;
108  // manager.subscribe_symbols(&bank_nifty, Some(Mode::Quote)).await?;
109  // manager.subscribe_symbols(&it_stocks, Some(Mode::LTP)).await?;
110
111  println!(
112    "✅ Subscribed to {} total symbols",
113    symbols.len() //  + bank_nifty.len() + it_stocks.len()
114  );
115
116  // Get symbol distribution
117  let distribution = manager.get_symbol_distribution();
118  println!("\n📈 Symbol distribution across connections:");
119  for (channel_id, symbols) in &distribution {
120    println!("   {:?}: {} symbols", channel_id, symbols.len());
121  }
122
123  // Get all output channels
124  let channels = manager.get_all_channels();
125  println!("\n🔀 Created {} output channels", channels.len());
126
127  // Start monitoring each channel
128  let mut channel_tasks = Vec::new();
129
130  for (channel_id, mut receiver) in channels {
131    let task = tokio::spawn(async move {
132      let mut message_count = 0;
133      let mut tick_count = 0;
134      let start_time = Instant::now();
135      let mut last_report = Instant::now();
136
137      println!("🎯 Starting monitoring for {:?}", channel_id);
138
139      loop {
140        match timeout(Duration::from_secs(30), receiver.recv()).await {
141          Ok(Ok(message)) => {
142            message_count += 1;
143
144            match message {
145              TickerMessage::Ticks(ticks) => {
146                tick_count += ticks.len();
147
148                // Show first few ticks for demonstration
149                if message_count <= 3 {
150                  for tick in &ticks {
151                    println!(
152                      "📋 {:?}: Tick {} @ {:?}",
153                      channel_id,
154                      tick.instrument_token,
155                      tick.content.last_price.unwrap_or(0.0)
156                    );
157                  }
158                }
159              }
160              TickerMessage::Error(e) => {
161                println!("⚠️  {:?}: Error: {}", channel_id, e);
162              }
163              _ => {
164                println!("📨 {:?}: Other message", channel_id);
165              }
166            }
167
168            // Report performance every 10 seconds
169            if last_report.elapsed() >= Duration::from_secs(10) {
170              let elapsed = start_time.elapsed();
171              let messages_per_sec =
172                message_count as f64 / elapsed.as_secs_f64();
173              let ticks_per_sec = tick_count as f64 / elapsed.as_secs_f64();
174
175              println!("📊 {:?} Performance:", channel_id);
176              println!(
177                "   Messages: {} ({:.1}/sec)",
178                message_count, messages_per_sec
179              );
180              println!("   Ticks: {} ({:.1}/sec)", tick_count, ticks_per_sec);
181
182              last_report = Instant::now();
183            }
184          }
185          Ok(Err(e)) => {
186            println!("❌ {:?}: Channel error: {}", channel_id, e);
187            break;
188          }
189          Err(_) => {
190            println!("⏱️  {:?}: No messages for 30s", channel_id);
191          }
192        }
193      }
194
195      (channel_id, message_count, tick_count)
196    });
197
198    channel_tasks.push(task);
199  }
200
201  // Monitor overall system health
202  let health_task = tokio::spawn(async move {
203    loop {
204      sleep(Duration::from_secs(15)).await;
205
206      println!("\n🏥 System Health Check:");
207      println!("   All connections active ✅");
208      println!("   Parsers running ✅");
209      println!("   Memory usage optimized ✅");
210    }
211  });
212
213  // Run for demonstration period
214  println!(
215    "\n📈 Monitoring performance for 60 seconds (Ctrl+C to stop early)..."
216  );
217
218  let demo_duration = Duration::from_secs(60);
219  let demo_start = Instant::now();
220
221  // Wait for demo duration or Ctrl+C
222  tokio::select! {
223      _ = sleep(demo_duration) => {
224          println!("\n⏰ Demo duration completed");
225      }
226      _ = tokio::signal::ctrl_c() => {
227          println!("\n🛑 Received Ctrl+C, stopping...");
228      }
229  }
230
231  // Abort monitoring tasks
232  health_task.abort();
233  for task in channel_tasks {
234    task.abort();
235  }
236
237  // Get final statistics
238  println!("\n📊 Final Statistics:");
239
240  if let Ok(stats) = manager.get_stats().await {
241    println!("   Total runtime: {:?}", demo_start.elapsed());
242    println!("   Active connections: {}", stats.active_connections);
243    println!("   Total symbols: {}", stats.total_symbols);
244    println!("   Total messages: {}", stats.total_messages_received);
245    println!("   Total errors: {}", stats.total_errors);
246
247    for (i, conn_stats) in stats.connection_stats.iter().enumerate() {
248      println!(
249        "   Connection {}: {} symbols, {} messages, {} errors",
250        i,
251        conn_stats.symbol_count,
252        conn_stats.messages_received,
253        conn_stats.errors_count
254      );
255    }
256  }
257
258  let processor_stats = manager.get_processor_stats().await;
259  println!("\n🔧 Parser Performance:");
260  for (channel_id, stats) in processor_stats {
261    println!(
262      "   {:?}: {:.1} msg/sec, {:?} avg latency",
263      channel_id, stats.messages_per_second, stats.processing_latency_avg
264    );
265  }
266
267  // Stop the manager
268  println!("\n🛑 Stopping manager...");
269  manager.stop().await?;
270
271  println!("🏁 Demo completed successfully!");
272  Ok(())
273}
examples/dynamic_subscription_demo.rs (line 640)
250async fn demo_dynamic_subscription(
251  manager: &mut KiteTickerManager,
252) -> Result<(), String> {
253  println!("\n🎯 True Dynamic Subscription Demo");
254  println!("==================================");
255
256  // Start with a small initial set
257  let initial_symbols = vec![256265, 265, 256777]; // 3 symbols
258  let additional_batch_1 = vec![274441, 260105, 273929]; // 3 more symbols
259  let additional_batch_2 = vec![260617, 257033, 257289, 257545]; // 4 more symbols
260  let symbols_to_remove = vec![265, 274441]; // Remove some symbols
261  let final_batch = vec![257801, 258825]; // Add final symbols
262
263  // Step 1: Initial subscription with small set
264  println!(
265    "\n📊 Step 1: Initial subscription to {} symbols",
266    initial_symbols.len()
267  );
268  println!("Starting with: {:?}", initial_symbols);
269
270  // Start listening for ticks BEFORE subscribing
271  let channels_before_sub = manager.get_all_channels();
272  let mut tick_listeners = Vec::new();
273
274  for (channel_id, mut receiver) in channels_before_sub {
275    let task = tokio::spawn(async move {
276      let start = std::time::Instant::now();
277      let mut tick_count = 0;
278
279      // Listen for initial ticks for 5 seconds
280      while start.elapsed() < Duration::from_secs(5) {
281        match timeout(Duration::from_millis(500), receiver.recv()).await {
282          Ok(Ok(message)) => {
283            if let TickerMessage::Ticks(ticks) = message {
284              tick_count += ticks.len();
285              println!(
286                "🎯 {:?}: Received {} ticks immediately after subscription!",
287                channel_id,
288                ticks.len()
289              );
290
291              // Debug: Show detailed tick information
292              debug!("Processing {} ticks on {:?}", ticks.len(), channel_id);
293
294              for (idx, tick) in ticks.iter().enumerate() {
295                println!("  🔹 Tick {}: Symbol: {}, LTP: {:?}, Volume: {:?}, Change: {:?}",
296                                    idx + 1,
297                                    tick.instrument_token,
298                                    tick.content.last_price,
299                                    tick.content.volume_traded,
300                                    tick.content.net_change
301                                );
302
303                // Debug: Show tick structure details
304                debug!(
305                  "Tick {} Details for symbol {}:",
306                  idx + 1,
307                  tick.instrument_token
308                );
309                debug!("  - Raw instrument_token: {}", tick.instrument_token);
310                debug!("  - Raw last_price: {:?}", tick.content.last_price);
311                debug!(
312                  "  - Raw volume_traded: {:?}",
313                  tick.content.volume_traded
314                );
315                debug!("  - Raw change: {:?}", tick.content.net_change);
316                debug!("  - Raw last_quantity: <not available>");
317                debug!(
318                  "  - Raw average_price: {:?}",
319                  tick.content.avg_traded_price
320                );
321                debug!(
322                  "  - Raw buy_quantity: {:?}",
323                  tick.content.total_buy_qty
324                );
325                debug!(
326                  "  - Raw sell_quantity: {:?}",
327                  tick.content.total_sell_qty
328                );
329                debug!("  - Raw oi: {:?}", tick.content.oi);
330                debug!("  - Raw mode: {:?}", tick.content.mode);
331
332                if let Some(ohlc) = &tick.content.ohlc {
333                  debug!(
334                    "  - OHLC available: O:{} H:{} L:{} C:{}",
335                    ohlc.open, ohlc.high, ohlc.low, ohlc.close
336                  );
337                }
338
339                if let Some(depth) = &tick.content.depth {
340                  debug!(
341                    "  - Market depth available: {} buy, {} sell",
342                    depth.buy.len(),
343                    depth.sell.len()
344                  );
345                }
346              }
347            }
348          }
349          _ => continue,
350        }
351      }
352      (channel_id, tick_count)
353    });
354    tick_listeners.push(task);
355  }
356
357  // Now subscribe to symbols
358  manager
359    .subscribe_symbols(&initial_symbols, Some(Mode::LTP))
360    .await?;
361
362  println!("✅ Subscription sent, waiting for initial ticks...");
363
364  // Wait for the listeners to finish
365  for task in tick_listeners {
366    if let Ok((channel_id, count)) = task.await {
367      println!(
368        "📊 {:?}: Received {} total ticks during initial subscription",
369        channel_id, count
370      );
371    }
372  }
373
374  print_distribution(manager, "After initial subscription").await;
375
376  // Step 2: Wait and monitor initial data
377  println!("\n⏳ Step 2: Monitoring initial data flow");
378  monitor_ticks_briefly(manager, 5, "Initial Subscription Data").await;
379
380  if let Ok(stats) = manager.get_stats().await {
381    println!("✅ Current Statistics:");
382    println!("   Active connections: {}", stats.active_connections);
383    println!("   Total symbols: {}", stats.total_symbols);
384    println!("   Total messages: {}", stats.total_messages_received);
385  }
386
387  // Step 3: Dynamic addition - Batch 1
388  println!(
389    "\n➕ Step 3: DYNAMIC ADDITION - Adding {} more symbols",
390    additional_batch_1.len()
391  );
392  println!("Adding: {:?}", additional_batch_1);
393  manager
394    .subscribe_symbols(&additional_batch_1, Some(Mode::Quote))
395    .await?;
396
397  // Give time for new tick data to arrive
398  sleep(Duration::from_secs(2)).await;
399
400  print_distribution(manager, "After first dynamic addition").await;
401  monitor_ticks_briefly(manager, 3, "After First Addition").await;
402
403  // Step 4: Dynamic addition - Batch 2
404  println!(
405    "\n➕ Step 4: DYNAMIC ADDITION - Adding {} more symbols",
406    additional_batch_2.len()
407  );
408  println!("Adding: {:?}", additional_batch_2);
409  manager
410    .subscribe_symbols(&additional_batch_2, Some(Mode::Full))
411    .await?;
412
413  // Give time for new tick data to arrive
414  sleep(Duration::from_secs(2)).await;
415
416  print_distribution(manager, "After second dynamic addition").await;
417  monitor_ticks_briefly(manager, 3, "After Second Addition").await;
418
419  // Step 5: Dynamic removal
420  println!(
421    "\n➖ Step 5: DYNAMIC REMOVAL - Removing {} symbols",
422    symbols_to_remove.len()
423  );
424  println!("Removing: {:?}", symbols_to_remove);
425  match manager.unsubscribe_symbols(&symbols_to_remove).await {
426    Ok(()) => {
427      print_distribution(manager, "After dynamic removal").await;
428      println!("✅ Dynamic removal successful!");
429    }
430    Err(e) => {
431      println!("⚠️  Dynamic removal failed: {}", e);
432    }
433  }
434  monitor_ticks_briefly(manager, 3, "After Symbol Removal").await;
435
436  // Step 6: Final addition and mode change demo
437  println!(
438    "\n➕ Step 6: FINAL ADDITION - Adding {} symbols",
439    final_batch.len()
440  );
441  println!("Adding: {:?}", final_batch);
442  manager
443    .subscribe_symbols(&final_batch, Some(Mode::LTP))
444    .await?;
445
446  print_distribution(manager, "After final addition").await;
447
448  // Step 7: Mode change demonstration
449  println!("\n🔄 Step 7: MODE CHANGE - Changing subscription mode");
450  let symbols_for_mode_change = vec![256265, 260105]; // Change some existing symbols to Full mode
451  println!("Changing {:?} to Full mode", symbols_for_mode_change);
452  match manager
453    .change_mode(&symbols_for_mode_change, Mode::Full)
454    .await
455  {
456    Ok(()) => println!("✅ Mode change successful!"),
457    Err(e) => println!("⚠️  Mode change failed: {}", e),
458  }
459
460  // Step 8: Final statistics and monitoring
461  println!("\n📈 Step 8: Final Statistics and Performance");
462  sleep(Duration::from_secs(3)).await; // Let some data flow
463
464  if let Ok(stats) = manager.get_stats().await {
465    println!("✅ Final Manager Statistics:");
466    println!("   Active connections: {}", stats.active_connections);
467    println!("   Total symbols: {}", stats.total_symbols);
468    println!("   Total messages: {}", stats.total_messages_received);
469    println!("   Total errors: {}", stats.total_errors);
470
471    for (i, conn_stats) in stats.connection_stats.iter().enumerate() {
472      println!(
473        "   Connection {}: {} symbols, {} messages",
474        i + 1,
475        conn_stats.symbol_count,
476        conn_stats.messages_received
477      );
478    }
479  }
480
481  // Step 9: Show processor performance
482  println!("\n⚡ Step 9: Final Parser Performance");
483  let processor_stats = manager.get_processor_stats().await;
484  for (channel_id, stats) in processor_stats {
485    println!(
486      "   {:?}: {:.1} msg/sec, {:?} avg latency",
487      channel_id, stats.messages_per_second, stats.processing_latency_avg
488    );
489  }
490
491  // Step 10: Monitor live data for a short period
492  println!("\n📺 Step 10: Live Data Monitoring (10 seconds)");
493  println!("Monitoring real-time tick data from all dynamically managed connections...");
494
495  let channels = manager.get_all_channels();
496  let mut tasks = Vec::new();
497
498  for (channel_id, mut receiver) in channels {
499    let task = tokio::spawn(async move {
500      let mut count = 0;
501      let start = std::time::Instant::now();
502
503      while start.elapsed() < Duration::from_secs(10) {
504        match timeout(Duration::from_secs(2), receiver.recv()).await {
505          Ok(Ok(message)) => {
506            count += 1;
507            if let TickerMessage::Ticks(ticks) = message {
508              println!("📋 {:?}: {} ticks received", channel_id, ticks.len());
509
510              // Debug: Final monitoring with comprehensive details
511              debug!(
512                "Final monitoring - Message #{}, {} ticks on {:?}",
513                count,
514                ticks.len(),
515                channel_id
516              );
517
518              for tick in &ticks {
519                println!(
520                  "  🔹 Symbol: {}, LTP: {:?}, Volume: {:?}, Change: {:?}",
521                  tick.instrument_token,
522                  tick.content.last_price,
523                  tick.content.volume_traded,
524                  tick.content.net_change
525                );
526
527                // Debug: Show complete tick analysis
528                debug!(
529                  "Final Debug Analysis for Symbol {}:",
530                  tick.instrument_token
531                );
532                debug!(
533                  "  - Timestamp: Now = {:?}",
534                  std::time::SystemTime::now()
535                );
536                debug!("  - Data completeness check:");
537                debug!("    * Last Price: {:?} (✅)", tick.content.last_price);
538                debug!(
539                  "    * Volume: {:?} ({})",
540                  tick.content.volume_traded,
541                  if tick.content.volume_traded.is_some() {
542                    "✅"
543                  } else {
544                    "❌"
545                  }
546                );
547                debug!(
548                  "    * Change: {:?} ({})",
549                  tick.content.net_change,
550                  if tick.content.net_change.is_some() {
551                    "✅"
552                  } else {
553                    "❌"
554                  }
555                );
556                debug!("    * Mode: {:?} (✅)", tick.content.mode);
557
558                // Validate mode-specific data
559                match tick.content.mode {
560                  Mode::Full => {
561                    debug!("  - Full mode validation:");
562                    debug!(
563                      "    * OHLC: {}",
564                      if tick.content.ohlc.is_some() {
565                        "✅ Present"
566                      } else {
567                        "❌ Missing"
568                      }
569                    );
570                    debug!(
571                      "    * Depth: {}",
572                      if tick.content.depth.is_some() {
573                        "✅ Present"
574                      } else {
575                        "❌ Missing"
576                      }
577                    );
578                    if let Some(ohlc) = &tick.content.ohlc {
579                      debug!(
580                        "    * OHLC Values: O:{} H:{} L:{} C:{}",
581                        ohlc.open, ohlc.high, ohlc.low, ohlc.close
582                      );
583                    }
584                    if let Some(depth) = &tick.content.depth {
585                      debug!(
586                        "    * Depth Levels: {} buy, {} sell",
587                        depth.buy.len(),
588                        depth.sell.len()
589                      );
590                    }
591                  }
592                  Mode::Quote => {
593                    debug!("  - Quote mode validation:");
594                    debug!(
595                      "    * OHLC: {}",
596                      if tick.content.ohlc.is_some() {
597                        "✅ Present"
598                      } else {
599                        "❌ Missing"
600                      }
601                    );
602                    if let Some(ohlc) = &tick.content.ohlc {
603                      debug!(
604                        "    * OHLC Values: O:{} H:{} L:{} C:{}",
605                        ohlc.open, ohlc.high, ohlc.low, ohlc.close
606                      );
607                    }
608                  }
609                  Mode::LTP => {
610                    debug!("  - LTP mode validation: ✅ Basic data only");
611                  }
612                }
613              }
614            } else {
615              println!("📋 {:?}: Non-tick message received", channel_id);
616              debug!("Message type: {:?}", message);
617            }
618          }
619          _ => continue,
620        }
621      }
622      (channel_id, count)
623    });
624    tasks.push(task);
625  }
626
627  // Wait for monitoring to complete
628  for task in tasks {
629    if let Ok((channel_id, count)) = task.await {
630      println!(
631        "📊 {:?}: {} total messages in 10 seconds",
632        channel_id, count
633      );
634    }
635  }
636
637  // Final cleanup demonstration
638  println!("\n🧹 Step 11: Final Cleanup Demonstration");
639  let current_symbols: Vec<u32> = manager
640    .get_symbol_distribution()
641    .values()
642    .flat_map(|symbols| symbols.iter().cloned())
643    .collect();
644
645  println!(
646    "Attempting to unsubscribe from {} remaining symbols...",
647    current_symbols.len()
648  );
649
650  match manager.unsubscribe_symbols(&current_symbols).await {
651    Ok(()) => {
652      print_distribution(manager, "After complete cleanup").await;
653      println!("✅ Complete cleanup successful!");
654    }
655    Err(e) => {
656      println!("⚠️  Cleanup encountered issues: {}", e);
657      println!("💡 This demonstrates the current architecture capabilities");
658    }
659  }
660
661  println!("\n✅ Dynamic Subscription Demo Completed!");
662  println!("🎯 Key Dynamic Features Demonstrated:");
663  println!("   ✅ Runtime symbol addition across multiple batches");
664  println!("   ✅ Runtime symbol removal with proper tracking");
665  println!("   ✅ Dynamic mode changes for existing symbols");
666  println!("   ✅ Real-time capacity distribution and monitoring");
667  println!("   ✅ Independent message processing per connection");
668  println!("   ✅ High-performance parser tasks with statistics");
669  println!("   ✅ Complete subscription lifecycle management");
670
671  Ok(())
672}
673
674async fn print_distribution(manager: &KiteTickerManager, context: &str) {
675  let distribution = manager.get_symbol_distribution();
676  println!("\n📈 Symbol Distribution ({}):", context);
677
678  let mut total = 0;
679  for (channel_id, symbols) in &distribution {
680    println!("   {:?}: {} symbols", channel_id, symbols.len());
681    total += symbols.len();
682  }
683  println!("   Total: {} symbols", total);
684}
Source

pub async fn unsubscribe_symbols( &mut self, symbols: &[u32], ) -> Result<(), String>

Unsubscribe from symbols

Examples found in repository?
examples/basic/runtime_subscription_example.rs (line 87)
48async fn runtime_subscription_demo(
49  manager: &mut KiteTickerManager,
50) -> Result<(), String> {
51  println!("\n📡 Runtime Subscription Management Demo");
52  println!("=====================================");
53
54  // 1. Start with initial symbols
55  println!("\n1️⃣ Initial subscription to base symbols");
56  let initial_symbols = vec![256265, 265, 256777];
57  manager
58    .subscribe_symbols(&initial_symbols, Some(Mode::LTP))
59    .await?;
60  print_current_state(manager, "Initial subscription").await;
61
62  sleep(Duration::from_secs(2)).await;
63
64  // 2. Add more symbols dynamically
65  println!("\n2️⃣ Adding symbols dynamically");
66  let additional_symbols = vec![274441, 260105, 273929];
67  manager
68    .subscribe_symbols(&additional_symbols, Some(Mode::Quote))
69    .await?;
70  print_current_state(manager, "After adding symbols").await;
71
72  sleep(Duration::from_secs(2)).await;
73
74  // 3. Change mode for existing symbols
75  println!("\n3️⃣ Changing subscription mode");
76  let symbols_for_mode_change = vec![256265, 265];
77  manager
78    .change_mode(&symbols_for_mode_change, Mode::Full)
79    .await?;
80  print_current_state(manager, "After mode change").await;
81
82  sleep(Duration::from_secs(2)).await;
83
84  // 4. Remove some symbols
85  println!("\n4️⃣ Removing symbols dynamically");
86  let symbols_to_remove = vec![265, 274441];
87  manager.unsubscribe_symbols(&symbols_to_remove).await?;
88  print_current_state(manager, "After removing symbols").await;
89
90  sleep(Duration::from_secs(2)).await;
91
92  // 5. Add different symbols with different modes
93  println!("\n5️⃣ Adding new symbols with Full mode");
94  let full_mode_symbols = vec![257801, 258825];
95  manager
96    .subscribe_symbols(&full_mode_symbols, Some(Mode::Full))
97    .await?;
98  print_current_state(manager, "After adding Full mode symbols").await;
99
100  // 6. Monitor live data for a short period
101  println!("\n6️⃣ Monitoring live data (5 seconds)");
102  monitor_live_data(manager, 5).await?;
103
104  // 7. Complete cleanup
105  println!("\n7️⃣ Complete cleanup");
106  let all_symbols: Vec<u32> = manager
107    .get_symbol_distribution()
108    .values()
109    .flat_map(|symbols| symbols.iter().cloned())
110    .collect();
111
112  if !all_symbols.is_empty() {
113    manager.unsubscribe_symbols(&all_symbols).await?;
114    print_current_state(manager, "After complete cleanup").await;
115  }
116
117  println!("\n✅ Runtime subscription demo completed!");
118  Ok(())
119}
More examples
Hide additional examples
examples/dynamic_subscription_demo.rs (line 425)
250async fn demo_dynamic_subscription(
251  manager: &mut KiteTickerManager,
252) -> Result<(), String> {
253  println!("\n🎯 True Dynamic Subscription Demo");
254  println!("==================================");
255
256  // Start with a small initial set
257  let initial_symbols = vec![256265, 265, 256777]; // 3 symbols
258  let additional_batch_1 = vec![274441, 260105, 273929]; // 3 more symbols
259  let additional_batch_2 = vec![260617, 257033, 257289, 257545]; // 4 more symbols
260  let symbols_to_remove = vec![265, 274441]; // Remove some symbols
261  let final_batch = vec![257801, 258825]; // Add final symbols
262
263  // Step 1: Initial subscription with small set
264  println!(
265    "\n📊 Step 1: Initial subscription to {} symbols",
266    initial_symbols.len()
267  );
268  println!("Starting with: {:?}", initial_symbols);
269
270  // Start listening for ticks BEFORE subscribing
271  let channels_before_sub = manager.get_all_channels();
272  let mut tick_listeners = Vec::new();
273
274  for (channel_id, mut receiver) in channels_before_sub {
275    let task = tokio::spawn(async move {
276      let start = std::time::Instant::now();
277      let mut tick_count = 0;
278
279      // Listen for initial ticks for 5 seconds
280      while start.elapsed() < Duration::from_secs(5) {
281        match timeout(Duration::from_millis(500), receiver.recv()).await {
282          Ok(Ok(message)) => {
283            if let TickerMessage::Ticks(ticks) = message {
284              tick_count += ticks.len();
285              println!(
286                "🎯 {:?}: Received {} ticks immediately after subscription!",
287                channel_id,
288                ticks.len()
289              );
290
291              // Debug: Show detailed tick information
292              debug!("Processing {} ticks on {:?}", ticks.len(), channel_id);
293
294              for (idx, tick) in ticks.iter().enumerate() {
295                println!("  🔹 Tick {}: Symbol: {}, LTP: {:?}, Volume: {:?}, Change: {:?}",
296                                    idx + 1,
297                                    tick.instrument_token,
298                                    tick.content.last_price,
299                                    tick.content.volume_traded,
300                                    tick.content.net_change
301                                );
302
303                // Debug: Show tick structure details
304                debug!(
305                  "Tick {} Details for symbol {}:",
306                  idx + 1,
307                  tick.instrument_token
308                );
309                debug!("  - Raw instrument_token: {}", tick.instrument_token);
310                debug!("  - Raw last_price: {:?}", tick.content.last_price);
311                debug!(
312                  "  - Raw volume_traded: {:?}",
313                  tick.content.volume_traded
314                );
315                debug!("  - Raw change: {:?}", tick.content.net_change);
316                debug!("  - Raw last_quantity: <not available>");
317                debug!(
318                  "  - Raw average_price: {:?}",
319                  tick.content.avg_traded_price
320                );
321                debug!(
322                  "  - Raw buy_quantity: {:?}",
323                  tick.content.total_buy_qty
324                );
325                debug!(
326                  "  - Raw sell_quantity: {:?}",
327                  tick.content.total_sell_qty
328                );
329                debug!("  - Raw oi: {:?}", tick.content.oi);
330                debug!("  - Raw mode: {:?}", tick.content.mode);
331
332                if let Some(ohlc) = &tick.content.ohlc {
333                  debug!(
334                    "  - OHLC available: O:{} H:{} L:{} C:{}",
335                    ohlc.open, ohlc.high, ohlc.low, ohlc.close
336                  );
337                }
338
339                if let Some(depth) = &tick.content.depth {
340                  debug!(
341                    "  - Market depth available: {} buy, {} sell",
342                    depth.buy.len(),
343                    depth.sell.len()
344                  );
345                }
346              }
347            }
348          }
349          _ => continue,
350        }
351      }
352      (channel_id, tick_count)
353    });
354    tick_listeners.push(task);
355  }
356
357  // Now subscribe to symbols
358  manager
359    .subscribe_symbols(&initial_symbols, Some(Mode::LTP))
360    .await?;
361
362  println!("✅ Subscription sent, waiting for initial ticks...");
363
364  // Wait for the listeners to finish
365  for task in tick_listeners {
366    if let Ok((channel_id, count)) = task.await {
367      println!(
368        "📊 {:?}: Received {} total ticks during initial subscription",
369        channel_id, count
370      );
371    }
372  }
373
374  print_distribution(manager, "After initial subscription").await;
375
376  // Step 2: Wait and monitor initial data
377  println!("\n⏳ Step 2: Monitoring initial data flow");
378  monitor_ticks_briefly(manager, 5, "Initial Subscription Data").await;
379
380  if let Ok(stats) = manager.get_stats().await {
381    println!("✅ Current Statistics:");
382    println!("   Active connections: {}", stats.active_connections);
383    println!("   Total symbols: {}", stats.total_symbols);
384    println!("   Total messages: {}", stats.total_messages_received);
385  }
386
387  // Step 3: Dynamic addition - Batch 1
388  println!(
389    "\n➕ Step 3: DYNAMIC ADDITION - Adding {} more symbols",
390    additional_batch_1.len()
391  );
392  println!("Adding: {:?}", additional_batch_1);
393  manager
394    .subscribe_symbols(&additional_batch_1, Some(Mode::Quote))
395    .await?;
396
397  // Give time for new tick data to arrive
398  sleep(Duration::from_secs(2)).await;
399
400  print_distribution(manager, "After first dynamic addition").await;
401  monitor_ticks_briefly(manager, 3, "After First Addition").await;
402
403  // Step 4: Dynamic addition - Batch 2
404  println!(
405    "\n➕ Step 4: DYNAMIC ADDITION - Adding {} more symbols",
406    additional_batch_2.len()
407  );
408  println!("Adding: {:?}", additional_batch_2);
409  manager
410    .subscribe_symbols(&additional_batch_2, Some(Mode::Full))
411    .await?;
412
413  // Give time for new tick data to arrive
414  sleep(Duration::from_secs(2)).await;
415
416  print_distribution(manager, "After second dynamic addition").await;
417  monitor_ticks_briefly(manager, 3, "After Second Addition").await;
418
419  // Step 5: Dynamic removal
420  println!(
421    "\n➖ Step 5: DYNAMIC REMOVAL - Removing {} symbols",
422    symbols_to_remove.len()
423  );
424  println!("Removing: {:?}", symbols_to_remove);
425  match manager.unsubscribe_symbols(&symbols_to_remove).await {
426    Ok(()) => {
427      print_distribution(manager, "After dynamic removal").await;
428      println!("✅ Dynamic removal successful!");
429    }
430    Err(e) => {
431      println!("⚠️  Dynamic removal failed: {}", e);
432    }
433  }
434  monitor_ticks_briefly(manager, 3, "After Symbol Removal").await;
435
436  // Step 6: Final addition and mode change demo
437  println!(
438    "\n➕ Step 6: FINAL ADDITION - Adding {} symbols",
439    final_batch.len()
440  );
441  println!("Adding: {:?}", final_batch);
442  manager
443    .subscribe_symbols(&final_batch, Some(Mode::LTP))
444    .await?;
445
446  print_distribution(manager, "After final addition").await;
447
448  // Step 7: Mode change demonstration
449  println!("\n🔄 Step 7: MODE CHANGE - Changing subscription mode");
450  let symbols_for_mode_change = vec![256265, 260105]; // Change some existing symbols to Full mode
451  println!("Changing {:?} to Full mode", symbols_for_mode_change);
452  match manager
453    .change_mode(&symbols_for_mode_change, Mode::Full)
454    .await
455  {
456    Ok(()) => println!("✅ Mode change successful!"),
457    Err(e) => println!("⚠️  Mode change failed: {}", e),
458  }
459
460  // Step 8: Final statistics and monitoring
461  println!("\n📈 Step 8: Final Statistics and Performance");
462  sleep(Duration::from_secs(3)).await; // Let some data flow
463
464  if let Ok(stats) = manager.get_stats().await {
465    println!("✅ Final Manager Statistics:");
466    println!("   Active connections: {}", stats.active_connections);
467    println!("   Total symbols: {}", stats.total_symbols);
468    println!("   Total messages: {}", stats.total_messages_received);
469    println!("   Total errors: {}", stats.total_errors);
470
471    for (i, conn_stats) in stats.connection_stats.iter().enumerate() {
472      println!(
473        "   Connection {}: {} symbols, {} messages",
474        i + 1,
475        conn_stats.symbol_count,
476        conn_stats.messages_received
477      );
478    }
479  }
480
481  // Step 9: Show processor performance
482  println!("\n⚡ Step 9: Final Parser Performance");
483  let processor_stats = manager.get_processor_stats().await;
484  for (channel_id, stats) in processor_stats {
485    println!(
486      "   {:?}: {:.1} msg/sec, {:?} avg latency",
487      channel_id, stats.messages_per_second, stats.processing_latency_avg
488    );
489  }
490
491  // Step 10: Monitor live data for a short period
492  println!("\n📺 Step 10: Live Data Monitoring (10 seconds)");
493  println!("Monitoring real-time tick data from all dynamically managed connections...");
494
495  let channels = manager.get_all_channels();
496  let mut tasks = Vec::new();
497
498  for (channel_id, mut receiver) in channels {
499    let task = tokio::spawn(async move {
500      let mut count = 0;
501      let start = std::time::Instant::now();
502
503      while start.elapsed() < Duration::from_secs(10) {
504        match timeout(Duration::from_secs(2), receiver.recv()).await {
505          Ok(Ok(message)) => {
506            count += 1;
507            if let TickerMessage::Ticks(ticks) = message {
508              println!("📋 {:?}: {} ticks received", channel_id, ticks.len());
509
510              // Debug: Final monitoring with comprehensive details
511              debug!(
512                "Final monitoring - Message #{}, {} ticks on {:?}",
513                count,
514                ticks.len(),
515                channel_id
516              );
517
518              for tick in &ticks {
519                println!(
520                  "  🔹 Symbol: {}, LTP: {:?}, Volume: {:?}, Change: {:?}",
521                  tick.instrument_token,
522                  tick.content.last_price,
523                  tick.content.volume_traded,
524                  tick.content.net_change
525                );
526
527                // Debug: Show complete tick analysis
528                debug!(
529                  "Final Debug Analysis for Symbol {}:",
530                  tick.instrument_token
531                );
532                debug!(
533                  "  - Timestamp: Now = {:?}",
534                  std::time::SystemTime::now()
535                );
536                debug!("  - Data completeness check:");
537                debug!("    * Last Price: {:?} (✅)", tick.content.last_price);
538                debug!(
539                  "    * Volume: {:?} ({})",
540                  tick.content.volume_traded,
541                  if tick.content.volume_traded.is_some() {
542                    "✅"
543                  } else {
544                    "❌"
545                  }
546                );
547                debug!(
548                  "    * Change: {:?} ({})",
549                  tick.content.net_change,
550                  if tick.content.net_change.is_some() {
551                    "✅"
552                  } else {
553                    "❌"
554                  }
555                );
556                debug!("    * Mode: {:?} (✅)", tick.content.mode);
557
558                // Validate mode-specific data
559                match tick.content.mode {
560                  Mode::Full => {
561                    debug!("  - Full mode validation:");
562                    debug!(
563                      "    * OHLC: {}",
564                      if tick.content.ohlc.is_some() {
565                        "✅ Present"
566                      } else {
567                        "❌ Missing"
568                      }
569                    );
570                    debug!(
571                      "    * Depth: {}",
572                      if tick.content.depth.is_some() {
573                        "✅ Present"
574                      } else {
575                        "❌ Missing"
576                      }
577                    );
578                    if let Some(ohlc) = &tick.content.ohlc {
579                      debug!(
580                        "    * OHLC Values: O:{} H:{} L:{} C:{}",
581                        ohlc.open, ohlc.high, ohlc.low, ohlc.close
582                      );
583                    }
584                    if let Some(depth) = &tick.content.depth {
585                      debug!(
586                        "    * Depth Levels: {} buy, {} sell",
587                        depth.buy.len(),
588                        depth.sell.len()
589                      );
590                    }
591                  }
592                  Mode::Quote => {
593                    debug!("  - Quote mode validation:");
594                    debug!(
595                      "    * OHLC: {}",
596                      if tick.content.ohlc.is_some() {
597                        "✅ Present"
598                      } else {
599                        "❌ Missing"
600                      }
601                    );
602                    if let Some(ohlc) = &tick.content.ohlc {
603                      debug!(
604                        "    * OHLC Values: O:{} H:{} L:{} C:{}",
605                        ohlc.open, ohlc.high, ohlc.low, ohlc.close
606                      );
607                    }
608                  }
609                  Mode::LTP => {
610                    debug!("  - LTP mode validation: ✅ Basic data only");
611                  }
612                }
613              }
614            } else {
615              println!("📋 {:?}: Non-tick message received", channel_id);
616              debug!("Message type: {:?}", message);
617            }
618          }
619          _ => continue,
620        }
621      }
622      (channel_id, count)
623    });
624    tasks.push(task);
625  }
626
627  // Wait for monitoring to complete
628  for task in tasks {
629    if let Ok((channel_id, count)) = task.await {
630      println!(
631        "📊 {:?}: {} total messages in 10 seconds",
632        channel_id, count
633      );
634    }
635  }
636
637  // Final cleanup demonstration
638  println!("\n🧹 Step 11: Final Cleanup Demonstration");
639  let current_symbols: Vec<u32> = manager
640    .get_symbol_distribution()
641    .values()
642    .flat_map(|symbols| symbols.iter().cloned())
643    .collect();
644
645  println!(
646    "Attempting to unsubscribe from {} remaining symbols...",
647    current_symbols.len()
648  );
649
650  match manager.unsubscribe_symbols(&current_symbols).await {
651    Ok(()) => {
652      print_distribution(manager, "After complete cleanup").await;
653      println!("✅ Complete cleanup successful!");
654    }
655    Err(e) => {
656      println!("⚠️  Cleanup encountered issues: {}", e);
657      println!("💡 This demonstrates the current architecture capabilities");
658    }
659  }
660
661  println!("\n✅ Dynamic Subscription Demo Completed!");
662  println!("🎯 Key Dynamic Features Demonstrated:");
663  println!("   ✅ Runtime symbol addition across multiple batches");
664  println!("   ✅ Runtime symbol removal with proper tracking");
665  println!("   ✅ Dynamic mode changes for existing symbols");
666  println!("   ✅ Real-time capacity distribution and monitoring");
667  println!("   ✅ Independent message processing per connection");
668  println!("   ✅ High-performance parser tasks with statistics");
669  println!("   ✅ Complete subscription lifecycle management");
670
671  Ok(())
672}
Source

pub async fn change_mode( &mut self, symbols: &[u32], mode: Mode, ) -> Result<(), String>

Dynamically change subscription mode for existing symbols

Examples found in repository?
examples/basic/runtime_subscription_example.rs (line 78)
48async fn runtime_subscription_demo(
49  manager: &mut KiteTickerManager,
50) -> Result<(), String> {
51  println!("\n📡 Runtime Subscription Management Demo");
52  println!("=====================================");
53
54  // 1. Start with initial symbols
55  println!("\n1️⃣ Initial subscription to base symbols");
56  let initial_symbols = vec![256265, 265, 256777];
57  manager
58    .subscribe_symbols(&initial_symbols, Some(Mode::LTP))
59    .await?;
60  print_current_state(manager, "Initial subscription").await;
61
62  sleep(Duration::from_secs(2)).await;
63
64  // 2. Add more symbols dynamically
65  println!("\n2️⃣ Adding symbols dynamically");
66  let additional_symbols = vec![274441, 260105, 273929];
67  manager
68    .subscribe_symbols(&additional_symbols, Some(Mode::Quote))
69    .await?;
70  print_current_state(manager, "After adding symbols").await;
71
72  sleep(Duration::from_secs(2)).await;
73
74  // 3. Change mode for existing symbols
75  println!("\n3️⃣ Changing subscription mode");
76  let symbols_for_mode_change = vec![256265, 265];
77  manager
78    .change_mode(&symbols_for_mode_change, Mode::Full)
79    .await?;
80  print_current_state(manager, "After mode change").await;
81
82  sleep(Duration::from_secs(2)).await;
83
84  // 4. Remove some symbols
85  println!("\n4️⃣ Removing symbols dynamically");
86  let symbols_to_remove = vec![265, 274441];
87  manager.unsubscribe_symbols(&symbols_to_remove).await?;
88  print_current_state(manager, "After removing symbols").await;
89
90  sleep(Duration::from_secs(2)).await;
91
92  // 5. Add different symbols with different modes
93  println!("\n5️⃣ Adding new symbols with Full mode");
94  let full_mode_symbols = vec![257801, 258825];
95  manager
96    .subscribe_symbols(&full_mode_symbols, Some(Mode::Full))
97    .await?;
98  print_current_state(manager, "After adding Full mode symbols").await;
99
100  // 6. Monitor live data for a short period
101  println!("\n6️⃣ Monitoring live data (5 seconds)");
102  monitor_live_data(manager, 5).await?;
103
104  // 7. Complete cleanup
105  println!("\n7️⃣ Complete cleanup");
106  let all_symbols: Vec<u32> = manager
107    .get_symbol_distribution()
108    .values()
109    .flat_map(|symbols| symbols.iter().cloned())
110    .collect();
111
112  if !all_symbols.is_empty() {
113    manager.unsubscribe_symbols(&all_symbols).await?;
114    print_current_state(manager, "After complete cleanup").await;
115  }
116
117  println!("\n✅ Runtime subscription demo completed!");
118  Ok(())
119}
More examples
Hide additional examples
examples/mode_change_test.rs (line 131)
58async fn test_mode_change_issue(
59  manager: &mut KiteTickerManager,
60) -> Result<(), String> {
61  println!("\n🧪 Testing Mode Change Issue");
62  println!("=============================");
63
64  // Step 1: Subscribe to a symbol with LTP mode
65  let test_symbol = 738561; // Test symbol provided by user
66  println!("\n1️⃣ Subscribing to symbol {} with LTP mode", test_symbol);
67  manager
68    .subscribe_symbols(&[test_symbol], Some(Mode::LTP))
69    .await?;
70
71  // Start monitoring ticks to see the mode
72  let channels = manager.get_all_channels();
73  let mut tick_listeners = Vec::new();
74
75  for (channel_id, mut receiver) in channels {
76    let task = tokio::spawn(async move {
77      let mut ticks_received = 0;
78      let start = Instant::now();
79
80      while start.elapsed() < Duration::from_secs(5) && ticks_received < 3 {
81        match timeout(Duration::from_millis(500), receiver.recv()).await {
82          Ok(Ok(message)) => {
83            if let TickerMessage::Ticks(ticks) = message {
84              for tick in &ticks {
85                if tick.instrument_token == test_symbol {
86                  ticks_received += 1;
87                  println!(
88                    "   📊 Received tick for {}: Mode={:?}, LTP={:?}",
89                    tick.instrument_token,
90                    tick.content.mode,
91                    tick.content.last_price
92                  );
93
94                  // Check if OHLC data is present (should not be for LTP mode)
95                  if tick.content.ohlc.is_some() {
96                    println!(
97                      "   ⚠️  OHLC data present in LTP mode - unexpected!"
98                    );
99                  } else {
100                    println!("   ✅ No OHLC data in LTP mode - correct");
101                  }
102                }
103              }
104            }
105          }
106          _ => continue,
107        }
108      }
109      (channel_id, ticks_received)
110    });
111    tick_listeners.push(task);
112  }
113
114  // Wait for initial ticks
115  for task in tick_listeners {
116    if let Ok((channel_id, count)) = task.await {
117      println!(
118        "   📈 {:?}: Received {} ticks for initial LTP subscription",
119        channel_id, count
120      );
121    }
122  }
123
124  sleep(Duration::from_secs(2)).await;
125
126  // Step 2: Attempt to change mode to Full (this is where the issue should manifest)
127  println!(
128    "\n2️⃣ Attempting to change mode from LTP to Full for symbol {}",
129    test_symbol
130  );
131  match manager.change_mode(&[test_symbol], Mode::Full).await {
132    Ok(()) => {
133      println!("   ✅ Mode change command sent successfully");
134    }
135    Err(e) => {
136      println!("   ❌ Mode change command failed: {}", e);
137      return Err(e);
138    }
139  }
140
141  // Step 3: Monitor ticks after mode change to see if it actually worked
142  println!("\n3️⃣ Monitoring ticks after mode change...");
143  let channels = manager.get_all_channels();
144  let mut post_change_listeners = Vec::new();
145
146  for (channel_id, mut receiver) in channels {
147    let task = tokio::spawn(async move {
148      let mut ticks_received = 0;
149      let mut ohlc_present = 0;
150      let mut depth_present = 0;
151      let start = Instant::now();
152
153      while start.elapsed() < Duration::from_secs(10) && ticks_received < 5 {
154        match timeout(Duration::from_millis(500), receiver.recv()).await {
155          Ok(Ok(message)) => {
156            if let TickerMessage::Ticks(ticks) = message {
157              for tick in &ticks {
158                if tick.instrument_token == test_symbol {
159                  ticks_received += 1;
160                  println!(
161                    "   📊 Post-change tick for {}: Mode={:?}, LTP={:?}",
162                    tick.instrument_token,
163                    tick.content.mode,
164                    tick.content.last_price
165                  );
166
167                  // Check if Full mode data is present
168                  if let Some(ohlc) = &tick.content.ohlc {
169                    ohlc_present += 1;
170                    println!(
171                      "   ✅ OHLC data present: O:{} H:{} L:{} C:{}",
172                      ohlc.open, ohlc.high, ohlc.low, ohlc.close
173                    );
174                  } else {
175                    println!("   ❌ OHLC data missing - mode change may not have worked!");
176                  }
177
178                  if let Some(depth) = &tick.content.depth {
179                    depth_present += 1;
180                    println!(
181                      "   ✅ Market depth present: {} buy, {} sell orders",
182                      depth.buy.len(),
183                      depth.sell.len()
184                    );
185                  } else {
186                    println!("   ❌ Market depth missing - mode change may not have worked!");
187                  }
188
189                  // Log the actual mode reported in the tick
190                  info!("Tick mode reported: {:?}", tick.content.mode);
191                }
192              }
193            }
194          }
195          _ => continue,
196        }
197      }
198      (channel_id, ticks_received, ohlc_present, depth_present)
199    });
200    post_change_listeners.push(task);
201  }
202
203  // Wait for post-change ticks
204  let mut total_ticks = 0;
205  let mut total_ohlc = 0;
206  let mut total_depth = 0;
207
208  for task in post_change_listeners {
209    if let Ok((channel_id, ticks, ohlc, depth)) = task.await {
210      println!(
211        "   📈 {:?}: {} ticks, {} with OHLC, {} with depth",
212        channel_id, ticks, ohlc, depth
213      );
214      total_ticks += ticks;
215      total_ohlc += ohlc;
216      total_depth += depth;
217    }
218  }
219
220  // Step 4: Analyze results
221  println!("\n4️⃣ Test Results Analysis");
222  println!("========================");
223
224  if total_ticks == 0 {
225    println!("   ⚠️  No ticks received after mode change - connection issue?");
226  } else if total_ohlc == 0 && total_depth == 0 {
227    println!("   ❌ ISSUE CONFIRMED: Mode change command sent but Full mode data not received");
228    println!("   💡 This confirms that set_mode() alone doesn't work - need unsubscribe+resubscribe");
229  } else if total_ohlc > 0 && total_depth > 0 {
230    println!("   ✅ Mode change worked - Full mode data received");
231  } else {
232    println!(
233      "   ⚠️  Partial mode change - some Full mode data received but not all"
234    );
235  }
236
237  println!("\n📊 Summary:");
238  println!("   Total ticks received: {}", total_ticks);
239  println!("   Ticks with OHLC data: {}", total_ohlc);
240  println!("   Ticks with market depth: {}", total_depth);
241
242  Ok(())
243}
examples/dynamic_subscription_demo.rs (line 453)
250async fn demo_dynamic_subscription(
251  manager: &mut KiteTickerManager,
252) -> Result<(), String> {
253  println!("\n🎯 True Dynamic Subscription Demo");
254  println!("==================================");
255
256  // Start with a small initial set
257  let initial_symbols = vec![256265, 265, 256777]; // 3 symbols
258  let additional_batch_1 = vec![274441, 260105, 273929]; // 3 more symbols
259  let additional_batch_2 = vec![260617, 257033, 257289, 257545]; // 4 more symbols
260  let symbols_to_remove = vec![265, 274441]; // Remove some symbols
261  let final_batch = vec![257801, 258825]; // Add final symbols
262
263  // Step 1: Initial subscription with small set
264  println!(
265    "\n📊 Step 1: Initial subscription to {} symbols",
266    initial_symbols.len()
267  );
268  println!("Starting with: {:?}", initial_symbols);
269
270  // Start listening for ticks BEFORE subscribing
271  let channels_before_sub = manager.get_all_channels();
272  let mut tick_listeners = Vec::new();
273
274  for (channel_id, mut receiver) in channels_before_sub {
275    let task = tokio::spawn(async move {
276      let start = std::time::Instant::now();
277      let mut tick_count = 0;
278
279      // Listen for initial ticks for 5 seconds
280      while start.elapsed() < Duration::from_secs(5) {
281        match timeout(Duration::from_millis(500), receiver.recv()).await {
282          Ok(Ok(message)) => {
283            if let TickerMessage::Ticks(ticks) = message {
284              tick_count += ticks.len();
285              println!(
286                "🎯 {:?}: Received {} ticks immediately after subscription!",
287                channel_id,
288                ticks.len()
289              );
290
291              // Debug: Show detailed tick information
292              debug!("Processing {} ticks on {:?}", ticks.len(), channel_id);
293
294              for (idx, tick) in ticks.iter().enumerate() {
295                println!("  🔹 Tick {}: Symbol: {}, LTP: {:?}, Volume: {:?}, Change: {:?}",
296                                    idx + 1,
297                                    tick.instrument_token,
298                                    tick.content.last_price,
299                                    tick.content.volume_traded,
300                                    tick.content.net_change
301                                );
302
303                // Debug: Show tick structure details
304                debug!(
305                  "Tick {} Details for symbol {}:",
306                  idx + 1,
307                  tick.instrument_token
308                );
309                debug!("  - Raw instrument_token: {}", tick.instrument_token);
310                debug!("  - Raw last_price: {:?}", tick.content.last_price);
311                debug!(
312                  "  - Raw volume_traded: {:?}",
313                  tick.content.volume_traded
314                );
315                debug!("  - Raw change: {:?}", tick.content.net_change);
316                debug!("  - Raw last_quantity: <not available>");
317                debug!(
318                  "  - Raw average_price: {:?}",
319                  tick.content.avg_traded_price
320                );
321                debug!(
322                  "  - Raw buy_quantity: {:?}",
323                  tick.content.total_buy_qty
324                );
325                debug!(
326                  "  - Raw sell_quantity: {:?}",
327                  tick.content.total_sell_qty
328                );
329                debug!("  - Raw oi: {:?}", tick.content.oi);
330                debug!("  - Raw mode: {:?}", tick.content.mode);
331
332                if let Some(ohlc) = &tick.content.ohlc {
333                  debug!(
334                    "  - OHLC available: O:{} H:{} L:{} C:{}",
335                    ohlc.open, ohlc.high, ohlc.low, ohlc.close
336                  );
337                }
338
339                if let Some(depth) = &tick.content.depth {
340                  debug!(
341                    "  - Market depth available: {} buy, {} sell",
342                    depth.buy.len(),
343                    depth.sell.len()
344                  );
345                }
346              }
347            }
348          }
349          _ => continue,
350        }
351      }
352      (channel_id, tick_count)
353    });
354    tick_listeners.push(task);
355  }
356
357  // Now subscribe to symbols
358  manager
359    .subscribe_symbols(&initial_symbols, Some(Mode::LTP))
360    .await?;
361
362  println!("✅ Subscription sent, waiting for initial ticks...");
363
364  // Wait for the listeners to finish
365  for task in tick_listeners {
366    if let Ok((channel_id, count)) = task.await {
367      println!(
368        "📊 {:?}: Received {} total ticks during initial subscription",
369        channel_id, count
370      );
371    }
372  }
373
374  print_distribution(manager, "After initial subscription").await;
375
376  // Step 2: Wait and monitor initial data
377  println!("\n⏳ Step 2: Monitoring initial data flow");
378  monitor_ticks_briefly(manager, 5, "Initial Subscription Data").await;
379
380  if let Ok(stats) = manager.get_stats().await {
381    println!("✅ Current Statistics:");
382    println!("   Active connections: {}", stats.active_connections);
383    println!("   Total symbols: {}", stats.total_symbols);
384    println!("   Total messages: {}", stats.total_messages_received);
385  }
386
387  // Step 3: Dynamic addition - Batch 1
388  println!(
389    "\n➕ Step 3: DYNAMIC ADDITION - Adding {} more symbols",
390    additional_batch_1.len()
391  );
392  println!("Adding: {:?}", additional_batch_1);
393  manager
394    .subscribe_symbols(&additional_batch_1, Some(Mode::Quote))
395    .await?;
396
397  // Give time for new tick data to arrive
398  sleep(Duration::from_secs(2)).await;
399
400  print_distribution(manager, "After first dynamic addition").await;
401  monitor_ticks_briefly(manager, 3, "After First Addition").await;
402
403  // Step 4: Dynamic addition - Batch 2
404  println!(
405    "\n➕ Step 4: DYNAMIC ADDITION - Adding {} more symbols",
406    additional_batch_2.len()
407  );
408  println!("Adding: {:?}", additional_batch_2);
409  manager
410    .subscribe_symbols(&additional_batch_2, Some(Mode::Full))
411    .await?;
412
413  // Give time for new tick data to arrive
414  sleep(Duration::from_secs(2)).await;
415
416  print_distribution(manager, "After second dynamic addition").await;
417  monitor_ticks_briefly(manager, 3, "After Second Addition").await;
418
419  // Step 5: Dynamic removal
420  println!(
421    "\n➖ Step 5: DYNAMIC REMOVAL - Removing {} symbols",
422    symbols_to_remove.len()
423  );
424  println!("Removing: {:?}", symbols_to_remove);
425  match manager.unsubscribe_symbols(&symbols_to_remove).await {
426    Ok(()) => {
427      print_distribution(manager, "After dynamic removal").await;
428      println!("✅ Dynamic removal successful!");
429    }
430    Err(e) => {
431      println!("⚠️  Dynamic removal failed: {}", e);
432    }
433  }
434  monitor_ticks_briefly(manager, 3, "After Symbol Removal").await;
435
436  // Step 6: Final addition and mode change demo
437  println!(
438    "\n➕ Step 6: FINAL ADDITION - Adding {} symbols",
439    final_batch.len()
440  );
441  println!("Adding: {:?}", final_batch);
442  manager
443    .subscribe_symbols(&final_batch, Some(Mode::LTP))
444    .await?;
445
446  print_distribution(manager, "After final addition").await;
447
448  // Step 7: Mode change demonstration
449  println!("\n🔄 Step 7: MODE CHANGE - Changing subscription mode");
450  let symbols_for_mode_change = vec![256265, 260105]; // Change some existing symbols to Full mode
451  println!("Changing {:?} to Full mode", symbols_for_mode_change);
452  match manager
453    .change_mode(&symbols_for_mode_change, Mode::Full)
454    .await
455  {
456    Ok(()) => println!("✅ Mode change successful!"),
457    Err(e) => println!("⚠️  Mode change failed: {}", e),
458  }
459
460  // Step 8: Final statistics and monitoring
461  println!("\n📈 Step 8: Final Statistics and Performance");
462  sleep(Duration::from_secs(3)).await; // Let some data flow
463
464  if let Ok(stats) = manager.get_stats().await {
465    println!("✅ Final Manager Statistics:");
466    println!("   Active connections: {}", stats.active_connections);
467    println!("   Total symbols: {}", stats.total_symbols);
468    println!("   Total messages: {}", stats.total_messages_received);
469    println!("   Total errors: {}", stats.total_errors);
470
471    for (i, conn_stats) in stats.connection_stats.iter().enumerate() {
472      println!(
473        "   Connection {}: {} symbols, {} messages",
474        i + 1,
475        conn_stats.symbol_count,
476        conn_stats.messages_received
477      );
478    }
479  }
480
481  // Step 9: Show processor performance
482  println!("\n⚡ Step 9: Final Parser Performance");
483  let processor_stats = manager.get_processor_stats().await;
484  for (channel_id, stats) in processor_stats {
485    println!(
486      "   {:?}: {:.1} msg/sec, {:?} avg latency",
487      channel_id, stats.messages_per_second, stats.processing_latency_avg
488    );
489  }
490
491  // Step 10: Monitor live data for a short period
492  println!("\n📺 Step 10: Live Data Monitoring (10 seconds)");
493  println!("Monitoring real-time tick data from all dynamically managed connections...");
494
495  let channels = manager.get_all_channels();
496  let mut tasks = Vec::new();
497
498  for (channel_id, mut receiver) in channels {
499    let task = tokio::spawn(async move {
500      let mut count = 0;
501      let start = std::time::Instant::now();
502
503      while start.elapsed() < Duration::from_secs(10) {
504        match timeout(Duration::from_secs(2), receiver.recv()).await {
505          Ok(Ok(message)) => {
506            count += 1;
507            if let TickerMessage::Ticks(ticks) = message {
508              println!("📋 {:?}: {} ticks received", channel_id, ticks.len());
509
510              // Debug: Final monitoring with comprehensive details
511              debug!(
512                "Final monitoring - Message #{}, {} ticks on {:?}",
513                count,
514                ticks.len(),
515                channel_id
516              );
517
518              for tick in &ticks {
519                println!(
520                  "  🔹 Symbol: {}, LTP: {:?}, Volume: {:?}, Change: {:?}",
521                  tick.instrument_token,
522                  tick.content.last_price,
523                  tick.content.volume_traded,
524                  tick.content.net_change
525                );
526
527                // Debug: Show complete tick analysis
528                debug!(
529                  "Final Debug Analysis for Symbol {}:",
530                  tick.instrument_token
531                );
532                debug!(
533                  "  - Timestamp: Now = {:?}",
534                  std::time::SystemTime::now()
535                );
536                debug!("  - Data completeness check:");
537                debug!("    * Last Price: {:?} (✅)", tick.content.last_price);
538                debug!(
539                  "    * Volume: {:?} ({})",
540                  tick.content.volume_traded,
541                  if tick.content.volume_traded.is_some() {
542                    "✅"
543                  } else {
544                    "❌"
545                  }
546                );
547                debug!(
548                  "    * Change: {:?} ({})",
549                  tick.content.net_change,
550                  if tick.content.net_change.is_some() {
551                    "✅"
552                  } else {
553                    "❌"
554                  }
555                );
556                debug!("    * Mode: {:?} (✅)", tick.content.mode);
557
558                // Validate mode-specific data
559                match tick.content.mode {
560                  Mode::Full => {
561                    debug!("  - Full mode validation:");
562                    debug!(
563                      "    * OHLC: {}",
564                      if tick.content.ohlc.is_some() {
565                        "✅ Present"
566                      } else {
567                        "❌ Missing"
568                      }
569                    );
570                    debug!(
571                      "    * Depth: {}",
572                      if tick.content.depth.is_some() {
573                        "✅ Present"
574                      } else {
575                        "❌ Missing"
576                      }
577                    );
578                    if let Some(ohlc) = &tick.content.ohlc {
579                      debug!(
580                        "    * OHLC Values: O:{} H:{} L:{} C:{}",
581                        ohlc.open, ohlc.high, ohlc.low, ohlc.close
582                      );
583                    }
584                    if let Some(depth) = &tick.content.depth {
585                      debug!(
586                        "    * Depth Levels: {} buy, {} sell",
587                        depth.buy.len(),
588                        depth.sell.len()
589                      );
590                    }
591                  }
592                  Mode::Quote => {
593                    debug!("  - Quote mode validation:");
594                    debug!(
595                      "    * OHLC: {}",
596                      if tick.content.ohlc.is_some() {
597                        "✅ Present"
598                      } else {
599                        "❌ Missing"
600                      }
601                    );
602                    if let Some(ohlc) = &tick.content.ohlc {
603                      debug!(
604                        "    * OHLC Values: O:{} H:{} L:{} C:{}",
605                        ohlc.open, ohlc.high, ohlc.low, ohlc.close
606                      );
607                    }
608                  }
609                  Mode::LTP => {
610                    debug!("  - LTP mode validation: ✅ Basic data only");
611                  }
612                }
613              }
614            } else {
615              println!("📋 {:?}: Non-tick message received", channel_id);
616              debug!("Message type: {:?}", message);
617            }
618          }
619          _ => continue,
620        }
621      }
622      (channel_id, count)
623    });
624    tasks.push(task);
625  }
626
627  // Wait for monitoring to complete
628  for task in tasks {
629    if let Ok((channel_id, count)) = task.await {
630      println!(
631        "📊 {:?}: {} total messages in 10 seconds",
632        channel_id, count
633      );
634    }
635  }
636
637  // Final cleanup demonstration
638  println!("\n🧹 Step 11: Final Cleanup Demonstration");
639  let current_symbols: Vec<u32> = manager
640    .get_symbol_distribution()
641    .values()
642    .flat_map(|symbols| symbols.iter().cloned())
643    .collect();
644
645  println!(
646    "Attempting to unsubscribe from {} remaining symbols...",
647    current_symbols.len()
648  );
649
650  match manager.unsubscribe_symbols(&current_symbols).await {
651    Ok(()) => {
652      print_distribution(manager, "After complete cleanup").await;
653      println!("✅ Complete cleanup successful!");
654    }
655    Err(e) => {
656      println!("⚠️  Cleanup encountered issues: {}", e);
657      println!("💡 This demonstrates the current architecture capabilities");
658    }
659  }
660
661  println!("\n✅ Dynamic Subscription Demo Completed!");
662  println!("🎯 Key Dynamic Features Demonstrated:");
663  println!("   ✅ Runtime symbol addition across multiple batches");
664  println!("   ✅ Runtime symbol removal with proper tracking");
665  println!("   ✅ Dynamic mode changes for existing symbols");
666  println!("   ✅ Real-time capacity distribution and monitoring");
667  println!("   ✅ Independent message processing per connection");
668  println!("   ✅ High-performance parser tasks with statistics");
669  println!("   ✅ Complete subscription lifecycle management");
670
671  Ok(())
672}
Source

pub async fn stop(&mut self) -> Result<(), String>

Stop the manager and all connections

Examples found in repository?
examples/basic/runtime_subscription_example.rs (line 44)
8async fn main() -> Result<(), String> {
9  // Initialize with your API credentials
10  let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
11  let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
12
13  if api_key.is_empty() || access_token.is_empty() {
14    println!(
15      "⚠️ Please set KITE_API_KEY and KITE_ACCESS_TOKEN environment variables"
16    );
17    return Err("Missing API credentials".to_string());
18  }
19
20  // Create configuration
21  let config = KiteManagerConfig {
22    max_symbols_per_connection: 3000,
23    max_connections: 3,
24    connection_buffer_size: 5000,
25    parser_buffer_size: 10000,
26    connection_timeout: Duration::from_secs(30),
27    health_check_interval: Duration::from_secs(5),
28    max_reconnect_attempts: 5,
29    reconnect_delay: Duration::from_secs(2),
30    enable_dedicated_parsers: true,
31    default_mode: Mode::LTP,
32  };
33
34  // Start the manager
35  let mut manager = KiteTickerManager::new(api_key, access_token, config);
36  manager.start().await?;
37
38  println!("🚀 KiteTicker Manager started!");
39
40  // Example of runtime subscription management
41  runtime_subscription_demo(&mut manager).await?;
42
43  // Stop the manager
44  manager.stop().await?;
45  Ok(())
46}
More examples
Hide additional examples
examples/mode_change_test.rs (line 52)
9async fn main() -> Result<(), String> {
10  // Initialize logging
11  env_logger::init();
12
13  println!("🔄 KiteTicker Mode Change Test");
14  println!("═══════════════════════════════");
15
16  let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
17  let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
18
19  if api_key.is_empty() || access_token.is_empty() {
20    println!(
21      "⚠️  KITE_API_KEY and KITE_ACCESS_TOKEN environment variables not set"
22    );
23    demonstrate_mode_change_issue().await;
24    return Ok(());
25  }
26
27  // Create configuration
28  let config = KiteManagerConfig {
29    max_symbols_per_connection: 3000,
30    max_connections: 3,
31    connection_buffer_size: 5000,
32    parser_buffer_size: 10000,
33    connection_timeout: Duration::from_secs(30),
34    health_check_interval: Duration::from_secs(5),
35    max_reconnect_attempts: 5,
36    reconnect_delay: Duration::from_secs(2),
37    enable_dedicated_parsers: true,
38    default_mode: Mode::LTP,
39  };
40
41  println!("🔧 Starting manager...");
42  let mut manager = KiteTickerManager::new(api_key, access_token, config);
43
44  manager.start().await?;
45  println!("✅ Manager started successfully");
46
47  // Test the mode change issue
48  test_mode_change_issue(&mut manager).await?;
49
50  // Stop the manager
51  println!("\n🛑 Stopping manager...");
52  manager.stop().await?;
53
54  println!("🏁 Mode change test completed!");
55  Ok(())
56}
examples/performance/market_scanner.rs (line 83)
12async fn main() -> Result<(), String> {
13  env_logger::init();
14
15  println!("🔍 Market Scanner Example");
16  println!("=========================");
17
18  let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
19  let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
20
21  if api_key.is_empty() || access_token.is_empty() {
22    return Err("Please set KITE_API_KEY and KITE_ACCESS_TOKEN".to_string());
23  }
24
25  // High-performance configuration for scanning
26  let config = KiteManagerConfig {
27    max_connections: 3,
28    max_symbols_per_connection: 3000,
29    connection_buffer_size: 20000, // Large buffer for high volume
30    parser_buffer_size: 50000,     // Even larger for parsed data
31    enable_dedicated_parsers: true,
32    default_mode: Mode::LTP, // LTP mode for scanning (minimal bandwidth)
33    ..Default::default()
34  };
35
36  let mut manager = KiteTickerManager::new(api_key, access_token, config);
37  manager.start().await?;
38
39  // Large symbol set for market scanning
40  let large_symbol_set = generate_symbol_list(8000); // 8000 symbols across 3 connections
41
42  println!(
43    "📊 Scanning {} symbols across {} connections",
44    large_symbol_set.len(),
45    3
46  );
47
48  let start_time = Instant::now();
49  manager
50    .subscribe_symbols(&large_symbol_set, Some(Mode::LTP))
51    .await?;
52  println!(
53    "✅ Subscribed to {} symbols in {:?}",
54    large_symbol_set.len(),
55    start_time.elapsed()
56  );
57
58  // Get all channels and start parallel processing
59  let channels = manager.get_all_channels();
60  let mut handles = Vec::new();
61
62  for (channel_id, mut receiver) in channels {
63    let handle = tokio::spawn(async move {
64      let mut scanner = MarketScanner::new(channel_id);
65
66      while let Ok(message) = receiver.recv().await {
67        if let TickerMessage::Ticks(ticks) = message {
68          scanner.process_ticks(ticks).await;
69        }
70      }
71
72      scanner.print_summary();
73    });
74
75    handles.push(handle);
76  }
77
78  // Run scanner for specified duration
79  println!("🔄 Scanning market for 30 seconds...");
80  sleep(Duration::from_secs(30)).await;
81
82  // Stop manager
83  manager.stop().await?;
84
85  // Wait for all scanners to complete
86  for handle in handles {
87    let _ = handle.await;
88  }
89
90  println!("🏁 Market scanning completed");
91  Ok(())
92}
examples/performance/high_frequency.rs (line 88)
14async fn main() -> Result<(), String> {
15  env_logger::init();
16
17  println!("⚡ High-Frequency Trading Example");
18  println!("=================================");
19
20  let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
21  let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
22
23  if api_key.is_empty() || access_token.is_empty() {
24    return Err("Please set KITE_API_KEY and KITE_ACCESS_TOKEN".to_string());
25  }
26
27  // Ultra-high performance configuration for HFT
28  let config = KiteManagerConfig {
29    max_connections: 3,
30    max_symbols_per_connection: 1000, // Focused symbol set for HFT
31    connection_buffer_size: 100000,   // Maximum possible buffer
32    parser_buffer_size: 200000,       // Ultra-large parser buffer
33    enable_dedicated_parsers: true,
34    default_mode: Mode::Full, // Full market depth data
35    ..Default::default()
36  };
37
38  println!("🚀 Optimized for:");
39  println!("   Sub-microsecond latency");
40  println!("   Maximum throughput");
41  println!("   Real-time order book analysis");
42  println!("   Ultra-low garbage collection");
43
44  let mut manager = KiteTickerManager::new(api_key, access_token, config);
45  manager.start().await?;
46
47  // Focus on highly liquid instruments for HFT
48  let hft_symbols = vec![
49    256265, // NIFTY 50 - highly liquid
50    408065, // HDFC Bank - large volume
51    738561, // Reliance - most traded
52    884737, // ICICI Bank
53    341249, // TCS
54    492033, // ITC
55    779521, // Kotak Bank
56  ];
57
58  println!("📊 Subscribing to {} liquid instruments", hft_symbols.len());
59  manager
60    .subscribe_symbols(&hft_symbols, Some(Mode::Full))
61    .await?;
62
63  // Setup high-frequency processing engines
64  let channels = manager.get_all_channels();
65  let mut hft_engines = Vec::new();
66
67  for (channel_id, mut receiver) in channels {
68    let engine = tokio::spawn(async move {
69      let mut hft_processor = HFTProcessor::new(channel_id);
70
71      while let Ok(message) = receiver.recv().await {
72        if let TickerMessage::Ticks(ticks) = message {
73          hft_processor.process_ultra_fast(ticks).await;
74        }
75      }
76
77      hft_processor.print_performance_metrics();
78    });
79
80    hft_engines.push(engine);
81  }
82
83  // Run HFT simulation
84  println!("⚡ Starting high-frequency processing...");
85  sleep(Duration::from_secs(60)).await;
86
87  // Stop processing
88  manager.stop().await?;
89
90  // Wait for all engines to complete
91  for engine in hft_engines {
92    let _ = engine.await;
93  }
94
95  println!("🏁 High-frequency trading simulation completed");
96  Ok(())
97}
examples/simple_tick_test.rs (line 107)
8async fn main() -> Result<(), String> {
9  env_logger::init();
10
11  println!("🔄 Simple Tick Test - Capturing Initial Ticks");
12  println!("═══════════════════════════════════════════");
13
14  let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
15  let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
16
17  if api_key.is_empty() || access_token.is_empty() {
18    println!(
19      "⚠️  KITE_API_KEY and KITE_ACCESS_TOKEN environment variables not set"
20    );
21    return Ok(());
22  }
23
24  let config = KiteManagerConfig {
25    max_symbols_per_connection: 3000,
26    max_connections: 1, // Use only 1 connection for simplicity
27    connection_buffer_size: 5000,
28    parser_buffer_size: 10000,
29    connection_timeout: Duration::from_secs(30),
30    health_check_interval: Duration::from_secs(5),
31    max_reconnect_attempts: 5,
32    reconnect_delay: Duration::from_secs(2),
33    enable_dedicated_parsers: true,
34    default_mode: Mode::LTP,
35  };
36
37  let mut manager = KiteTickerManager::new(api_key, access_token, config);
38
39  // Start manager
40  println!("📡 Starting manager...");
41  manager.start().await?;
42  println!("✅ Manager started");
43
44  // Get channel BEFORE subscribing
45  println!("🎯 Getting channels...");
46  let channels = manager.get_all_channels();
47  let (channel_id, mut receiver) = channels.into_iter().next().unwrap();
48
49  // Start listener in background
50  let listener_handle = tokio::spawn(async move {
51    println!("👂 Listener started for {:?}", channel_id);
52
53    let mut tick_count = 0;
54    loop {
55      match timeout(Duration::from_secs(30), receiver.recv()).await {
56        Ok(Ok(message)) => match message {
57          TickerMessage::Ticks(ticks) => {
58            tick_count += ticks.len();
59            println!(
60              "🎯 CAPTURED TICKS! {:?}: {} ticks (total: {})",
61              channel_id,
62              ticks.len(),
63              tick_count
64            );
65
66            for tick in &ticks {
67              println!("🔹 FULL TICK DEBUG:");
68              println!("{:#?}", tick);
69              println!("─────────────────────────────────────");
70            }
71          }
72          TickerMessage::Error(err) => {
73            println!("❌ Error: {}", err);
74          }
75          _ => {
76            println!("📨 Other message: {:?}", message);
77          }
78        },
79        Ok(Err(e)) => {
80          println!("❌ Receive error: {}", e);
81          break;
82        }
83        Err(_) => {
84          println!("⏱️  Listener timeout");
85          break;
86        }
87      }
88    }
89    println!("👂 Listener stopped");
90  });
91
92  // Give listener time to start
93  tokio::time::sleep(Duration::from_millis(200)).await;
94
95  // Now subscribe to a symbol
96  println!("📊 Subscribing to symbol 256265...");
97  manager
98    .subscribe_symbols(&[128083204], Some(Mode::Full))
99    .await?;
100  println!("✅ Subscription sent");
101
102  // Wait for ticks
103  println!("⏳ Waiting 10 seconds for ticks...");
104  tokio::time::sleep(Duration::from_secs(10)).await;
105
106  // Stop
107  manager.stop().await?;
108  listener_handle.abort();
109
110  println!("🏁 Test completed");
111  Ok(())
112}
examples/basic/portfolio_monitor.rs (line 147)
21async fn main() -> Result<(), String> {
22  // Initialize logging
23  env_logger::init();
24
25  println!("📊 Portfolio Monitor Example");
26  println!("============================");
27
28  // Get credentials
29  let api_key = std::env::var("KITE_API_KEY")
30    .map_err(|_| "Please set KITE_API_KEY environment variable")?;
31  let access_token = std::env::var("KITE_ACCESS_TOKEN")
32    .map_err(|_| "Please set KITE_ACCESS_TOKEN environment variable")?;
33
34  // Define portfolio
35  let portfolio = vec![
36    (256265, "NIFTY 50"),
37    (408065, "HDFC Bank"),
38    (738561, "Reliance"),
39    (5633, "TCS"),
40    (884737, "Asian Paints"),
41  ];
42
43  println!("📈 Monitoring Portfolio:");
44  for (symbol, name) in &portfolio {
45    println!("   • {} ({})", name, symbol);
46  }
47  println!();
48
49  // Create manager with optimized configuration for portfolio monitoring
50  let config = KiteManagerConfig {
51    max_connections: 1, // Single connection for small portfolio
52    max_symbols_per_connection: 100,
53    connection_buffer_size: 2000,
54    parser_buffer_size: 5000,
55    enable_dedicated_parsers: true,
56    default_mode: Mode::Quote, // Quote mode for price + volume
57    ..Default::default()
58  };
59
60  // Start manager
61  let mut manager = KiteTickerManager::new(api_key, access_token, config);
62  manager.start().await?;
63
64  // Subscribe to portfolio symbols
65  let symbols: Vec<u32> = portfolio.iter().map(|(symbol, _)| *symbol).collect();
66  manager
67    .subscribe_symbols(&symbols, Some(Mode::Quote))
68    .await?;
69
70  println!("✅ Subscribed to {} symbols", symbols.len());
71
72  // Create portfolio tracking
73  let mut portfolio_data: HashMap<u32, StockInfo> = HashMap::new();
74  for (symbol, name) in portfolio {
75    portfolio_data.insert(
76      symbol,
77      StockInfo {
78        symbol,
79        name: name.to_string(),
80        current_price: 0.0,
81        volume: 0,
82        last_update: Instant::now(),
83      },
84    );
85  }
86
87  // Get data channels
88  let channels = manager.get_all_channels();
89
90  // Start data processing
91  for (channel_id, mut receiver) in channels {
92    let mut portfolio_clone = portfolio_data.clone();
93
94    tokio::spawn(async move {
95      println!("📡 Started monitoring channel {:?}", channel_id);
96
97      while let Ok(message) = receiver.recv().await {
98        if let TickerMessage::Ticks(ticks) = message {
99          for tick in ticks {
100            if let Some(stock) = portfolio_clone.get_mut(&tick.instrument_token)
101            {
102              // Update stock data
103              if let Some(price) = tick.content.last_price {
104                stock.current_price = price;
105              }
106              if let Some(volume) = tick.content.volume_traded {
107                stock.volume = volume;
108              }
109              stock.last_update = Instant::now();
110
111              // Display update
112              println!(
113                "📈 {} ({}): ₹{:.2} | Vol: {} | {}",
114                stock.name,
115                stock.symbol,
116                stock.current_price,
117                stock.volume,
118                format_time_ago(stock.last_update)
119              );
120            }
121          }
122        }
123      }
124    });
125  }
126
127  // Periodic portfolio summary
128  let summary_portfolio = portfolio_data.clone();
129  tokio::spawn(async move {
130    let mut interval = tokio::time::interval(Duration::from_secs(30));
131
132    loop {
133      interval.tick().await;
134      print_portfolio_summary(&summary_portfolio);
135    }
136  });
137
138  // Monitor for demo duration
139  println!("📊 Monitoring portfolio for 2 minutes...\n");
140  sleep(Duration::from_secs(120)).await;
141
142  // Final summary
143  println!("\n🏁 Demo completed!");
144  print_portfolio_summary(&portfolio_data);
145
146  // Stop manager
147  manager.stop().await?;
148  println!("✅ Portfolio monitor stopped");
149
150  Ok(())
151}

Trait Implementations§

Source§

impl Debug for KiteTickerManager

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> ErasedDestructor for T
where T: 'static,