pub struct KiteTickerManager { /* private fields */ }Expand description
High-performance multi-connection WebSocket manager for Kite ticker data
This manager creates 3 independent WebSocket connections and distributes symbols across them using round-robin allocation. Each connection has its own dedicated parser task for maximum performance.
Implementations§
Source§impl KiteTickerManager
impl KiteTickerManager
Sourcepub fn new(
api_key: String,
access_token: String,
config: KiteManagerConfig,
) -> Self
pub fn new( api_key: String, access_token: String, config: KiteManagerConfig, ) -> Self
Creates a new KiteTickerManager instance with the specified configuration
This initializes the manager with the provided API credentials and configuration,
but does not start any connections. Call start() to begin operation.
§Arguments
api_key- Your Kite Connect API keyaccess_token- Valid access token from Kite Connectconfig- Manager configuration settings
§Example
use kiteticker_async_manager::{KiteTickerManager, KiteManagerConfig, Mode};
let config = KiteManagerConfig {
max_connections: 3,
max_symbols_per_connection: 3000,
enable_dedicated_parsers: true,
default_mode: Mode::LTP,
..Default::default()
};
let manager = KiteTickerManager::new(
"your_api_key".to_string(),
"your_access_token".to_string(),
config,
);Examples found in repository?
8async fn main() -> Result<(), String> {
9 // Initialize with your API credentials
10 let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
11 let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
12
13 if api_key.is_empty() || access_token.is_empty() {
14 println!(
15 "⚠️ Please set KITE_API_KEY and KITE_ACCESS_TOKEN environment variables"
16 );
17 return Err("Missing API credentials".to_string());
18 }
19
20 // Create configuration
21 let config = KiteManagerConfig {
22 max_symbols_per_connection: 3000,
23 max_connections: 3,
24 connection_buffer_size: 5000,
25 parser_buffer_size: 10000,
26 connection_timeout: Duration::from_secs(30),
27 health_check_interval: Duration::from_secs(5),
28 max_reconnect_attempts: 5,
29 reconnect_delay: Duration::from_secs(2),
30 enable_dedicated_parsers: true,
31 default_mode: Mode::LTP,
32 };
33
34 // Start the manager
35 let mut manager = KiteTickerManager::new(api_key, access_token, config);
36 manager.start().await?;
37
38 println!("🚀 KiteTicker Manager started!");
39
40 // Example of runtime subscription management
41 runtime_subscription_demo(&mut manager).await?;
42
43 // Stop the manager
44 manager.stop().await?;
45 Ok(())
46}More examples
9async fn main() -> Result<(), String> {
10 // Initialize logging
11 env_logger::init();
12
13 println!("🔄 KiteTicker Mode Change Test");
14 println!("═══════════════════════════════");
15
16 let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
17 let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
18
19 if api_key.is_empty() || access_token.is_empty() {
20 println!(
21 "⚠️ KITE_API_KEY and KITE_ACCESS_TOKEN environment variables not set"
22 );
23 demonstrate_mode_change_issue().await;
24 return Ok(());
25 }
26
27 // Create configuration
28 let config = KiteManagerConfig {
29 max_symbols_per_connection: 3000,
30 max_connections: 3,
31 connection_buffer_size: 5000,
32 parser_buffer_size: 10000,
33 connection_timeout: Duration::from_secs(30),
34 health_check_interval: Duration::from_secs(5),
35 max_reconnect_attempts: 5,
36 reconnect_delay: Duration::from_secs(2),
37 enable_dedicated_parsers: true,
38 default_mode: Mode::LTP,
39 };
40
41 println!("🔧 Starting manager...");
42 let mut manager = KiteTickerManager::new(api_key, access_token, config);
43
44 manager.start().await?;
45 println!("✅ Manager started successfully");
46
47 // Test the mode change issue
48 test_mode_change_issue(&mut manager).await?;
49
50 // Stop the manager
51 println!("\n🛑 Stopping manager...");
52 manager.stop().await?;
53
54 println!("🏁 Mode change test completed!");
55 Ok(())
56}12async fn main() -> Result<(), String> {
13 env_logger::init();
14
15 println!("🔍 Market Scanner Example");
16 println!("=========================");
17
18 let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
19 let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
20
21 if api_key.is_empty() || access_token.is_empty() {
22 return Err("Please set KITE_API_KEY and KITE_ACCESS_TOKEN".to_string());
23 }
24
25 // High-performance configuration for scanning
26 let config = KiteManagerConfig {
27 max_connections: 3,
28 max_symbols_per_connection: 3000,
29 connection_buffer_size: 20000, // Large buffer for high volume
30 parser_buffer_size: 50000, // Even larger for parsed data
31 enable_dedicated_parsers: true,
32 default_mode: Mode::LTP, // LTP mode for scanning (minimal bandwidth)
33 ..Default::default()
34 };
35
36 let mut manager = KiteTickerManager::new(api_key, access_token, config);
37 manager.start().await?;
38
39 // Large symbol set for market scanning
40 let large_symbol_set = generate_symbol_list(8000); // 8000 symbols across 3 connections
41
42 println!(
43 "📊 Scanning {} symbols across {} connections",
44 large_symbol_set.len(),
45 3
46 );
47
48 let start_time = Instant::now();
49 manager
50 .subscribe_symbols(&large_symbol_set, Some(Mode::LTP))
51 .await?;
52 println!(
53 "✅ Subscribed to {} symbols in {:?}",
54 large_symbol_set.len(),
55 start_time.elapsed()
56 );
57
58 // Get all channels and start parallel processing
59 let channels = manager.get_all_channels();
60 let mut handles = Vec::new();
61
62 for (channel_id, mut receiver) in channels {
63 let handle = tokio::spawn(async move {
64 let mut scanner = MarketScanner::new(channel_id);
65
66 while let Ok(message) = receiver.recv().await {
67 if let TickerMessage::Ticks(ticks) = message {
68 scanner.process_ticks(ticks).await;
69 }
70 }
71
72 scanner.print_summary();
73 });
74
75 handles.push(handle);
76 }
77
78 // Run scanner for specified duration
79 println!("🔄 Scanning market for 30 seconds...");
80 sleep(Duration::from_secs(30)).await;
81
82 // Stop manager
83 manager.stop().await?;
84
85 // Wait for all scanners to complete
86 for handle in handles {
87 let _ = handle.await;
88 }
89
90 println!("🏁 Market scanning completed");
91 Ok(())
92}14async fn main() -> Result<(), String> {
15 env_logger::init();
16
17 println!("⚡ High-Frequency Trading Example");
18 println!("=================================");
19
20 let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
21 let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
22
23 if api_key.is_empty() || access_token.is_empty() {
24 return Err("Please set KITE_API_KEY and KITE_ACCESS_TOKEN".to_string());
25 }
26
27 // Ultra-high performance configuration for HFT
28 let config = KiteManagerConfig {
29 max_connections: 3,
30 max_symbols_per_connection: 1000, // Focused symbol set for HFT
31 connection_buffer_size: 100000, // Maximum possible buffer
32 parser_buffer_size: 200000, // Ultra-large parser buffer
33 enable_dedicated_parsers: true,
34 default_mode: Mode::Full, // Full market depth data
35 ..Default::default()
36 };
37
38 println!("🚀 Optimized for:");
39 println!(" Sub-microsecond latency");
40 println!(" Maximum throughput");
41 println!(" Real-time order book analysis");
42 println!(" Ultra-low garbage collection");
43
44 let mut manager = KiteTickerManager::new(api_key, access_token, config);
45 manager.start().await?;
46
47 // Focus on highly liquid instruments for HFT
48 let hft_symbols = vec![
49 256265, // NIFTY 50 - highly liquid
50 408065, // HDFC Bank - large volume
51 738561, // Reliance - most traded
52 884737, // ICICI Bank
53 341249, // TCS
54 492033, // ITC
55 779521, // Kotak Bank
56 ];
57
58 println!("📊 Subscribing to {} liquid instruments", hft_symbols.len());
59 manager
60 .subscribe_symbols(&hft_symbols, Some(Mode::Full))
61 .await?;
62
63 // Setup high-frequency processing engines
64 let channels = manager.get_all_channels();
65 let mut hft_engines = Vec::new();
66
67 for (channel_id, mut receiver) in channels {
68 let engine = tokio::spawn(async move {
69 let mut hft_processor = HFTProcessor::new(channel_id);
70
71 while let Ok(message) = receiver.recv().await {
72 if let TickerMessage::Ticks(ticks) = message {
73 hft_processor.process_ultra_fast(ticks).await;
74 }
75 }
76
77 hft_processor.print_performance_metrics();
78 });
79
80 hft_engines.push(engine);
81 }
82
83 // Run HFT simulation
84 println!("⚡ Starting high-frequency processing...");
85 sleep(Duration::from_secs(60)).await;
86
87 // Stop processing
88 manager.stop().await?;
89
90 // Wait for all engines to complete
91 for engine in hft_engines {
92 let _ = engine.await;
93 }
94
95 println!("🏁 High-frequency trading simulation completed");
96 Ok(())
97}8async fn main() -> Result<(), String> {
9 env_logger::init();
10
11 println!("🔄 Simple Tick Test - Capturing Initial Ticks");
12 println!("═══════════════════════════════════════════");
13
14 let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
15 let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
16
17 if api_key.is_empty() || access_token.is_empty() {
18 println!(
19 "⚠️ KITE_API_KEY and KITE_ACCESS_TOKEN environment variables not set"
20 );
21 return Ok(());
22 }
23
24 let config = KiteManagerConfig {
25 max_symbols_per_connection: 3000,
26 max_connections: 1, // Use only 1 connection for simplicity
27 connection_buffer_size: 5000,
28 parser_buffer_size: 10000,
29 connection_timeout: Duration::from_secs(30),
30 health_check_interval: Duration::from_secs(5),
31 max_reconnect_attempts: 5,
32 reconnect_delay: Duration::from_secs(2),
33 enable_dedicated_parsers: true,
34 default_mode: Mode::LTP,
35 };
36
37 let mut manager = KiteTickerManager::new(api_key, access_token, config);
38
39 // Start manager
40 println!("📡 Starting manager...");
41 manager.start().await?;
42 println!("✅ Manager started");
43
44 // Get channel BEFORE subscribing
45 println!("🎯 Getting channels...");
46 let channels = manager.get_all_channels();
47 let (channel_id, mut receiver) = channels.into_iter().next().unwrap();
48
49 // Start listener in background
50 let listener_handle = tokio::spawn(async move {
51 println!("👂 Listener started for {:?}", channel_id);
52
53 let mut tick_count = 0;
54 loop {
55 match timeout(Duration::from_secs(30), receiver.recv()).await {
56 Ok(Ok(message)) => match message {
57 TickerMessage::Ticks(ticks) => {
58 tick_count += ticks.len();
59 println!(
60 "🎯 CAPTURED TICKS! {:?}: {} ticks (total: {})",
61 channel_id,
62 ticks.len(),
63 tick_count
64 );
65
66 for tick in &ticks {
67 println!("🔹 FULL TICK DEBUG:");
68 println!("{:#?}", tick);
69 println!("─────────────────────────────────────");
70 }
71 }
72 TickerMessage::Error(err) => {
73 println!("❌ Error: {}", err);
74 }
75 _ => {
76 println!("📨 Other message: {:?}", message);
77 }
78 },
79 Ok(Err(e)) => {
80 println!("❌ Receive error: {}", e);
81 break;
82 }
83 Err(_) => {
84 println!("⏱️ Listener timeout");
85 break;
86 }
87 }
88 }
89 println!("👂 Listener stopped");
90 });
91
92 // Give listener time to start
93 tokio::time::sleep(Duration::from_millis(200)).await;
94
95 // Now subscribe to a symbol
96 println!("📊 Subscribing to symbol 256265...");
97 manager
98 .subscribe_symbols(&[128083204], Some(Mode::Full))
99 .await?;
100 println!("✅ Subscription sent");
101
102 // Wait for ticks
103 println!("⏳ Waiting 10 seconds for ticks...");
104 tokio::time::sleep(Duration::from_secs(10)).await;
105
106 // Stop
107 manager.stop().await?;
108 listener_handle.abort();
109
110 println!("🏁 Test completed");
111 Ok(())
112}21async fn main() -> Result<(), String> {
22 // Initialize logging
23 env_logger::init();
24
25 println!("📊 Portfolio Monitor Example");
26 println!("============================");
27
28 // Get credentials
29 let api_key = std::env::var("KITE_API_KEY")
30 .map_err(|_| "Please set KITE_API_KEY environment variable")?;
31 let access_token = std::env::var("KITE_ACCESS_TOKEN")
32 .map_err(|_| "Please set KITE_ACCESS_TOKEN environment variable")?;
33
34 // Define portfolio
35 let portfolio = vec![
36 (256265, "NIFTY 50"),
37 (408065, "HDFC Bank"),
38 (738561, "Reliance"),
39 (5633, "TCS"),
40 (884737, "Asian Paints"),
41 ];
42
43 println!("📈 Monitoring Portfolio:");
44 for (symbol, name) in &portfolio {
45 println!(" • {} ({})", name, symbol);
46 }
47 println!();
48
49 // Create manager with optimized configuration for portfolio monitoring
50 let config = KiteManagerConfig {
51 max_connections: 1, // Single connection for small portfolio
52 max_symbols_per_connection: 100,
53 connection_buffer_size: 2000,
54 parser_buffer_size: 5000,
55 enable_dedicated_parsers: true,
56 default_mode: Mode::Quote, // Quote mode for price + volume
57 ..Default::default()
58 };
59
60 // Start manager
61 let mut manager = KiteTickerManager::new(api_key, access_token, config);
62 manager.start().await?;
63
64 // Subscribe to portfolio symbols
65 let symbols: Vec<u32> = portfolio.iter().map(|(symbol, _)| *symbol).collect();
66 manager
67 .subscribe_symbols(&symbols, Some(Mode::Quote))
68 .await?;
69
70 println!("✅ Subscribed to {} symbols", symbols.len());
71
72 // Create portfolio tracking
73 let mut portfolio_data: HashMap<u32, StockInfo> = HashMap::new();
74 for (symbol, name) in portfolio {
75 portfolio_data.insert(
76 symbol,
77 StockInfo {
78 symbol,
79 name: name.to_string(),
80 current_price: 0.0,
81 volume: 0,
82 last_update: Instant::now(),
83 },
84 );
85 }
86
87 // Get data channels
88 let channels = manager.get_all_channels();
89
90 // Start data processing
91 for (channel_id, mut receiver) in channels {
92 let mut portfolio_clone = portfolio_data.clone();
93
94 tokio::spawn(async move {
95 println!("📡 Started monitoring channel {:?}", channel_id);
96
97 while let Ok(message) = receiver.recv().await {
98 if let TickerMessage::Ticks(ticks) = message {
99 for tick in ticks {
100 if let Some(stock) = portfolio_clone.get_mut(&tick.instrument_token)
101 {
102 // Update stock data
103 if let Some(price) = tick.content.last_price {
104 stock.current_price = price;
105 }
106 if let Some(volume) = tick.content.volume_traded {
107 stock.volume = volume;
108 }
109 stock.last_update = Instant::now();
110
111 // Display update
112 println!(
113 "📈 {} ({}): ₹{:.2} | Vol: {} | {}",
114 stock.name,
115 stock.symbol,
116 stock.current_price,
117 stock.volume,
118 format_time_ago(stock.last_update)
119 );
120 }
121 }
122 }
123 }
124 });
125 }
126
127 // Periodic portfolio summary
128 let summary_portfolio = portfolio_data.clone();
129 tokio::spawn(async move {
130 let mut interval = tokio::time::interval(Duration::from_secs(30));
131
132 loop {
133 interval.tick().await;
134 print_portfolio_summary(&summary_portfolio);
135 }
136 });
137
138 // Monitor for demo duration
139 println!("📊 Monitoring portfolio for 2 minutes...\n");
140 sleep(Duration::from_secs(120)).await;
141
142 // Final summary
143 println!("\n🏁 Demo completed!");
144 print_portfolio_summary(&portfolio_data);
145
146 // Stop manager
147 manager.stop().await?;
148 println!("✅ Portfolio monitor stopped");
149
150 Ok(())
151}Sourcepub async fn start(&mut self) -> Result<(), String>
pub async fn start(&mut self) -> Result<(), String>
Initialize all connections and start the manager
Examples found in repository?
8async fn main() -> Result<(), String> {
9 // Initialize with your API credentials
10 let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
11 let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
12
13 if api_key.is_empty() || access_token.is_empty() {
14 println!(
15 "⚠️ Please set KITE_API_KEY and KITE_ACCESS_TOKEN environment variables"
16 );
17 return Err("Missing API credentials".to_string());
18 }
19
20 // Create configuration
21 let config = KiteManagerConfig {
22 max_symbols_per_connection: 3000,
23 max_connections: 3,
24 connection_buffer_size: 5000,
25 parser_buffer_size: 10000,
26 connection_timeout: Duration::from_secs(30),
27 health_check_interval: Duration::from_secs(5),
28 max_reconnect_attempts: 5,
29 reconnect_delay: Duration::from_secs(2),
30 enable_dedicated_parsers: true,
31 default_mode: Mode::LTP,
32 };
33
34 // Start the manager
35 let mut manager = KiteTickerManager::new(api_key, access_token, config);
36 manager.start().await?;
37
38 println!("🚀 KiteTicker Manager started!");
39
40 // Example of runtime subscription management
41 runtime_subscription_demo(&mut manager).await?;
42
43 // Stop the manager
44 manager.stop().await?;
45 Ok(())
46}More examples
9async fn main() -> Result<(), String> {
10 // Initialize logging
11 env_logger::init();
12
13 println!("🔄 KiteTicker Mode Change Test");
14 println!("═══════════════════════════════");
15
16 let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
17 let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
18
19 if api_key.is_empty() || access_token.is_empty() {
20 println!(
21 "⚠️ KITE_API_KEY and KITE_ACCESS_TOKEN environment variables not set"
22 );
23 demonstrate_mode_change_issue().await;
24 return Ok(());
25 }
26
27 // Create configuration
28 let config = KiteManagerConfig {
29 max_symbols_per_connection: 3000,
30 max_connections: 3,
31 connection_buffer_size: 5000,
32 parser_buffer_size: 10000,
33 connection_timeout: Duration::from_secs(30),
34 health_check_interval: Duration::from_secs(5),
35 max_reconnect_attempts: 5,
36 reconnect_delay: Duration::from_secs(2),
37 enable_dedicated_parsers: true,
38 default_mode: Mode::LTP,
39 };
40
41 println!("🔧 Starting manager...");
42 let mut manager = KiteTickerManager::new(api_key, access_token, config);
43
44 manager.start().await?;
45 println!("✅ Manager started successfully");
46
47 // Test the mode change issue
48 test_mode_change_issue(&mut manager).await?;
49
50 // Stop the manager
51 println!("\n🛑 Stopping manager...");
52 manager.stop().await?;
53
54 println!("🏁 Mode change test completed!");
55 Ok(())
56}12async fn main() -> Result<(), String> {
13 env_logger::init();
14
15 println!("🔍 Market Scanner Example");
16 println!("=========================");
17
18 let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
19 let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
20
21 if api_key.is_empty() || access_token.is_empty() {
22 return Err("Please set KITE_API_KEY and KITE_ACCESS_TOKEN".to_string());
23 }
24
25 // High-performance configuration for scanning
26 let config = KiteManagerConfig {
27 max_connections: 3,
28 max_symbols_per_connection: 3000,
29 connection_buffer_size: 20000, // Large buffer for high volume
30 parser_buffer_size: 50000, // Even larger for parsed data
31 enable_dedicated_parsers: true,
32 default_mode: Mode::LTP, // LTP mode for scanning (minimal bandwidth)
33 ..Default::default()
34 };
35
36 let mut manager = KiteTickerManager::new(api_key, access_token, config);
37 manager.start().await?;
38
39 // Large symbol set for market scanning
40 let large_symbol_set = generate_symbol_list(8000); // 8000 symbols across 3 connections
41
42 println!(
43 "📊 Scanning {} symbols across {} connections",
44 large_symbol_set.len(),
45 3
46 );
47
48 let start_time = Instant::now();
49 manager
50 .subscribe_symbols(&large_symbol_set, Some(Mode::LTP))
51 .await?;
52 println!(
53 "✅ Subscribed to {} symbols in {:?}",
54 large_symbol_set.len(),
55 start_time.elapsed()
56 );
57
58 // Get all channels and start parallel processing
59 let channels = manager.get_all_channels();
60 let mut handles = Vec::new();
61
62 for (channel_id, mut receiver) in channels {
63 let handle = tokio::spawn(async move {
64 let mut scanner = MarketScanner::new(channel_id);
65
66 while let Ok(message) = receiver.recv().await {
67 if let TickerMessage::Ticks(ticks) = message {
68 scanner.process_ticks(ticks).await;
69 }
70 }
71
72 scanner.print_summary();
73 });
74
75 handles.push(handle);
76 }
77
78 // Run scanner for specified duration
79 println!("🔄 Scanning market for 30 seconds...");
80 sleep(Duration::from_secs(30)).await;
81
82 // Stop manager
83 manager.stop().await?;
84
85 // Wait for all scanners to complete
86 for handle in handles {
87 let _ = handle.await;
88 }
89
90 println!("🏁 Market scanning completed");
91 Ok(())
92}14async fn main() -> Result<(), String> {
15 env_logger::init();
16
17 println!("⚡ High-Frequency Trading Example");
18 println!("=================================");
19
20 let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
21 let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
22
23 if api_key.is_empty() || access_token.is_empty() {
24 return Err("Please set KITE_API_KEY and KITE_ACCESS_TOKEN".to_string());
25 }
26
27 // Ultra-high performance configuration for HFT
28 let config = KiteManagerConfig {
29 max_connections: 3,
30 max_symbols_per_connection: 1000, // Focused symbol set for HFT
31 connection_buffer_size: 100000, // Maximum possible buffer
32 parser_buffer_size: 200000, // Ultra-large parser buffer
33 enable_dedicated_parsers: true,
34 default_mode: Mode::Full, // Full market depth data
35 ..Default::default()
36 };
37
38 println!("🚀 Optimized for:");
39 println!(" Sub-microsecond latency");
40 println!(" Maximum throughput");
41 println!(" Real-time order book analysis");
42 println!(" Ultra-low garbage collection");
43
44 let mut manager = KiteTickerManager::new(api_key, access_token, config);
45 manager.start().await?;
46
47 // Focus on highly liquid instruments for HFT
48 let hft_symbols = vec![
49 256265, // NIFTY 50 - highly liquid
50 408065, // HDFC Bank - large volume
51 738561, // Reliance - most traded
52 884737, // ICICI Bank
53 341249, // TCS
54 492033, // ITC
55 779521, // Kotak Bank
56 ];
57
58 println!("📊 Subscribing to {} liquid instruments", hft_symbols.len());
59 manager
60 .subscribe_symbols(&hft_symbols, Some(Mode::Full))
61 .await?;
62
63 // Setup high-frequency processing engines
64 let channels = manager.get_all_channels();
65 let mut hft_engines = Vec::new();
66
67 for (channel_id, mut receiver) in channels {
68 let engine = tokio::spawn(async move {
69 let mut hft_processor = HFTProcessor::new(channel_id);
70
71 while let Ok(message) = receiver.recv().await {
72 if let TickerMessage::Ticks(ticks) = message {
73 hft_processor.process_ultra_fast(ticks).await;
74 }
75 }
76
77 hft_processor.print_performance_metrics();
78 });
79
80 hft_engines.push(engine);
81 }
82
83 // Run HFT simulation
84 println!("⚡ Starting high-frequency processing...");
85 sleep(Duration::from_secs(60)).await;
86
87 // Stop processing
88 manager.stop().await?;
89
90 // Wait for all engines to complete
91 for engine in hft_engines {
92 let _ = engine.await;
93 }
94
95 println!("🏁 High-frequency trading simulation completed");
96 Ok(())
97}8async fn main() -> Result<(), String> {
9 env_logger::init();
10
11 println!("🔄 Simple Tick Test - Capturing Initial Ticks");
12 println!("═══════════════════════════════════════════");
13
14 let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
15 let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
16
17 if api_key.is_empty() || access_token.is_empty() {
18 println!(
19 "⚠️ KITE_API_KEY and KITE_ACCESS_TOKEN environment variables not set"
20 );
21 return Ok(());
22 }
23
24 let config = KiteManagerConfig {
25 max_symbols_per_connection: 3000,
26 max_connections: 1, // Use only 1 connection for simplicity
27 connection_buffer_size: 5000,
28 parser_buffer_size: 10000,
29 connection_timeout: Duration::from_secs(30),
30 health_check_interval: Duration::from_secs(5),
31 max_reconnect_attempts: 5,
32 reconnect_delay: Duration::from_secs(2),
33 enable_dedicated_parsers: true,
34 default_mode: Mode::LTP,
35 };
36
37 let mut manager = KiteTickerManager::new(api_key, access_token, config);
38
39 // Start manager
40 println!("📡 Starting manager...");
41 manager.start().await?;
42 println!("✅ Manager started");
43
44 // Get channel BEFORE subscribing
45 println!("🎯 Getting channels...");
46 let channels = manager.get_all_channels();
47 let (channel_id, mut receiver) = channels.into_iter().next().unwrap();
48
49 // Start listener in background
50 let listener_handle = tokio::spawn(async move {
51 println!("👂 Listener started for {:?}", channel_id);
52
53 let mut tick_count = 0;
54 loop {
55 match timeout(Duration::from_secs(30), receiver.recv()).await {
56 Ok(Ok(message)) => match message {
57 TickerMessage::Ticks(ticks) => {
58 tick_count += ticks.len();
59 println!(
60 "🎯 CAPTURED TICKS! {:?}: {} ticks (total: {})",
61 channel_id,
62 ticks.len(),
63 tick_count
64 );
65
66 for tick in &ticks {
67 println!("🔹 FULL TICK DEBUG:");
68 println!("{:#?}", tick);
69 println!("─────────────────────────────────────");
70 }
71 }
72 TickerMessage::Error(err) => {
73 println!("❌ Error: {}", err);
74 }
75 _ => {
76 println!("📨 Other message: {:?}", message);
77 }
78 },
79 Ok(Err(e)) => {
80 println!("❌ Receive error: {}", e);
81 break;
82 }
83 Err(_) => {
84 println!("⏱️ Listener timeout");
85 break;
86 }
87 }
88 }
89 println!("👂 Listener stopped");
90 });
91
92 // Give listener time to start
93 tokio::time::sleep(Duration::from_millis(200)).await;
94
95 // Now subscribe to a symbol
96 println!("📊 Subscribing to symbol 256265...");
97 manager
98 .subscribe_symbols(&[128083204], Some(Mode::Full))
99 .await?;
100 println!("✅ Subscription sent");
101
102 // Wait for ticks
103 println!("⏳ Waiting 10 seconds for ticks...");
104 tokio::time::sleep(Duration::from_secs(10)).await;
105
106 // Stop
107 manager.stop().await?;
108 listener_handle.abort();
109
110 println!("🏁 Test completed");
111 Ok(())
112}21async fn main() -> Result<(), String> {
22 // Initialize logging
23 env_logger::init();
24
25 println!("📊 Portfolio Monitor Example");
26 println!("============================");
27
28 // Get credentials
29 let api_key = std::env::var("KITE_API_KEY")
30 .map_err(|_| "Please set KITE_API_KEY environment variable")?;
31 let access_token = std::env::var("KITE_ACCESS_TOKEN")
32 .map_err(|_| "Please set KITE_ACCESS_TOKEN environment variable")?;
33
34 // Define portfolio
35 let portfolio = vec![
36 (256265, "NIFTY 50"),
37 (408065, "HDFC Bank"),
38 (738561, "Reliance"),
39 (5633, "TCS"),
40 (884737, "Asian Paints"),
41 ];
42
43 println!("📈 Monitoring Portfolio:");
44 for (symbol, name) in &portfolio {
45 println!(" • {} ({})", name, symbol);
46 }
47 println!();
48
49 // Create manager with optimized configuration for portfolio monitoring
50 let config = KiteManagerConfig {
51 max_connections: 1, // Single connection for small portfolio
52 max_symbols_per_connection: 100,
53 connection_buffer_size: 2000,
54 parser_buffer_size: 5000,
55 enable_dedicated_parsers: true,
56 default_mode: Mode::Quote, // Quote mode for price + volume
57 ..Default::default()
58 };
59
60 // Start manager
61 let mut manager = KiteTickerManager::new(api_key, access_token, config);
62 manager.start().await?;
63
64 // Subscribe to portfolio symbols
65 let symbols: Vec<u32> = portfolio.iter().map(|(symbol, _)| *symbol).collect();
66 manager
67 .subscribe_symbols(&symbols, Some(Mode::Quote))
68 .await?;
69
70 println!("✅ Subscribed to {} symbols", symbols.len());
71
72 // Create portfolio tracking
73 let mut portfolio_data: HashMap<u32, StockInfo> = HashMap::new();
74 for (symbol, name) in portfolio {
75 portfolio_data.insert(
76 symbol,
77 StockInfo {
78 symbol,
79 name: name.to_string(),
80 current_price: 0.0,
81 volume: 0,
82 last_update: Instant::now(),
83 },
84 );
85 }
86
87 // Get data channels
88 let channels = manager.get_all_channels();
89
90 // Start data processing
91 for (channel_id, mut receiver) in channels {
92 let mut portfolio_clone = portfolio_data.clone();
93
94 tokio::spawn(async move {
95 println!("📡 Started monitoring channel {:?}", channel_id);
96
97 while let Ok(message) = receiver.recv().await {
98 if let TickerMessage::Ticks(ticks) = message {
99 for tick in ticks {
100 if let Some(stock) = portfolio_clone.get_mut(&tick.instrument_token)
101 {
102 // Update stock data
103 if let Some(price) = tick.content.last_price {
104 stock.current_price = price;
105 }
106 if let Some(volume) = tick.content.volume_traded {
107 stock.volume = volume;
108 }
109 stock.last_update = Instant::now();
110
111 // Display update
112 println!(
113 "📈 {} ({}): ₹{:.2} | Vol: {} | {}",
114 stock.name,
115 stock.symbol,
116 stock.current_price,
117 stock.volume,
118 format_time_ago(stock.last_update)
119 );
120 }
121 }
122 }
123 }
124 });
125 }
126
127 // Periodic portfolio summary
128 let summary_portfolio = portfolio_data.clone();
129 tokio::spawn(async move {
130 let mut interval = tokio::time::interval(Duration::from_secs(30));
131
132 loop {
133 interval.tick().await;
134 print_portfolio_summary(&summary_portfolio);
135 }
136 });
137
138 // Monitor for demo duration
139 println!("📊 Monitoring portfolio for 2 minutes...\n");
140 sleep(Duration::from_secs(120)).await;
141
142 // Final summary
143 println!("\n🏁 Demo completed!");
144 print_portfolio_summary(&portfolio_data);
145
146 // Stop manager
147 manager.stop().await?;
148 println!("✅ Portfolio monitor stopped");
149
150 Ok(())
151}Sourcepub async fn subscribe_symbols(
&mut self,
symbols: &[u32],
mode: Option<Mode>,
) -> Result<(), String>
pub async fn subscribe_symbols( &mut self, symbols: &[u32], mode: Option<Mode>, ) -> Result<(), String>
Subscribe to symbols using round-robin distribution
Examples found in repository?
12async fn main() -> Result<(), String> {
13 env_logger::init();
14
15 println!("🔍 Market Scanner Example");
16 println!("=========================");
17
18 let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
19 let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
20
21 if api_key.is_empty() || access_token.is_empty() {
22 return Err("Please set KITE_API_KEY and KITE_ACCESS_TOKEN".to_string());
23 }
24
25 // High-performance configuration for scanning
26 let config = KiteManagerConfig {
27 max_connections: 3,
28 max_symbols_per_connection: 3000,
29 connection_buffer_size: 20000, // Large buffer for high volume
30 parser_buffer_size: 50000, // Even larger for parsed data
31 enable_dedicated_parsers: true,
32 default_mode: Mode::LTP, // LTP mode for scanning (minimal bandwidth)
33 ..Default::default()
34 };
35
36 let mut manager = KiteTickerManager::new(api_key, access_token, config);
37 manager.start().await?;
38
39 // Large symbol set for market scanning
40 let large_symbol_set = generate_symbol_list(8000); // 8000 symbols across 3 connections
41
42 println!(
43 "📊 Scanning {} symbols across {} connections",
44 large_symbol_set.len(),
45 3
46 );
47
48 let start_time = Instant::now();
49 manager
50 .subscribe_symbols(&large_symbol_set, Some(Mode::LTP))
51 .await?;
52 println!(
53 "✅ Subscribed to {} symbols in {:?}",
54 large_symbol_set.len(),
55 start_time.elapsed()
56 );
57
58 // Get all channels and start parallel processing
59 let channels = manager.get_all_channels();
60 let mut handles = Vec::new();
61
62 for (channel_id, mut receiver) in channels {
63 let handle = tokio::spawn(async move {
64 let mut scanner = MarketScanner::new(channel_id);
65
66 while let Ok(message) = receiver.recv().await {
67 if let TickerMessage::Ticks(ticks) = message {
68 scanner.process_ticks(ticks).await;
69 }
70 }
71
72 scanner.print_summary();
73 });
74
75 handles.push(handle);
76 }
77
78 // Run scanner for specified duration
79 println!("🔄 Scanning market for 30 seconds...");
80 sleep(Duration::from_secs(30)).await;
81
82 // Stop manager
83 manager.stop().await?;
84
85 // Wait for all scanners to complete
86 for handle in handles {
87 let _ = handle.await;
88 }
89
90 println!("🏁 Market scanning completed");
91 Ok(())
92}More examples
48async fn runtime_subscription_demo(
49 manager: &mut KiteTickerManager,
50) -> Result<(), String> {
51 println!("\n📡 Runtime Subscription Management Demo");
52 println!("=====================================");
53
54 // 1. Start with initial symbols
55 println!("\n1️⃣ Initial subscription to base symbols");
56 let initial_symbols = vec![256265, 265, 256777];
57 manager
58 .subscribe_symbols(&initial_symbols, Some(Mode::LTP))
59 .await?;
60 print_current_state(manager, "Initial subscription").await;
61
62 sleep(Duration::from_secs(2)).await;
63
64 // 2. Add more symbols dynamically
65 println!("\n2️⃣ Adding symbols dynamically");
66 let additional_symbols = vec![274441, 260105, 273929];
67 manager
68 .subscribe_symbols(&additional_symbols, Some(Mode::Quote))
69 .await?;
70 print_current_state(manager, "After adding symbols").await;
71
72 sleep(Duration::from_secs(2)).await;
73
74 // 3. Change mode for existing symbols
75 println!("\n3️⃣ Changing subscription mode");
76 let symbols_for_mode_change = vec![256265, 265];
77 manager
78 .change_mode(&symbols_for_mode_change, Mode::Full)
79 .await?;
80 print_current_state(manager, "After mode change").await;
81
82 sleep(Duration::from_secs(2)).await;
83
84 // 4. Remove some symbols
85 println!("\n4️⃣ Removing symbols dynamically");
86 let symbols_to_remove = vec![265, 274441];
87 manager.unsubscribe_symbols(&symbols_to_remove).await?;
88 print_current_state(manager, "After removing symbols").await;
89
90 sleep(Duration::from_secs(2)).await;
91
92 // 5. Add different symbols with different modes
93 println!("\n5️⃣ Adding new symbols with Full mode");
94 let full_mode_symbols = vec![257801, 258825];
95 manager
96 .subscribe_symbols(&full_mode_symbols, Some(Mode::Full))
97 .await?;
98 print_current_state(manager, "After adding Full mode symbols").await;
99
100 // 6. Monitor live data for a short period
101 println!("\n6️⃣ Monitoring live data (5 seconds)");
102 monitor_live_data(manager, 5).await?;
103
104 // 7. Complete cleanup
105 println!("\n7️⃣ Complete cleanup");
106 let all_symbols: Vec<u32> = manager
107 .get_symbol_distribution()
108 .values()
109 .flat_map(|symbols| symbols.iter().cloned())
110 .collect();
111
112 if !all_symbols.is_empty() {
113 manager.unsubscribe_symbols(&all_symbols).await?;
114 print_current_state(manager, "After complete cleanup").await;
115 }
116
117 println!("\n✅ Runtime subscription demo completed!");
118 Ok(())
119}14async fn main() -> Result<(), String> {
15 env_logger::init();
16
17 println!("⚡ High-Frequency Trading Example");
18 println!("=================================");
19
20 let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
21 let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
22
23 if api_key.is_empty() || access_token.is_empty() {
24 return Err("Please set KITE_API_KEY and KITE_ACCESS_TOKEN".to_string());
25 }
26
27 // Ultra-high performance configuration for HFT
28 let config = KiteManagerConfig {
29 max_connections: 3,
30 max_symbols_per_connection: 1000, // Focused symbol set for HFT
31 connection_buffer_size: 100000, // Maximum possible buffer
32 parser_buffer_size: 200000, // Ultra-large parser buffer
33 enable_dedicated_parsers: true,
34 default_mode: Mode::Full, // Full market depth data
35 ..Default::default()
36 };
37
38 println!("🚀 Optimized for:");
39 println!(" Sub-microsecond latency");
40 println!(" Maximum throughput");
41 println!(" Real-time order book analysis");
42 println!(" Ultra-low garbage collection");
43
44 let mut manager = KiteTickerManager::new(api_key, access_token, config);
45 manager.start().await?;
46
47 // Focus on highly liquid instruments for HFT
48 let hft_symbols = vec![
49 256265, // NIFTY 50 - highly liquid
50 408065, // HDFC Bank - large volume
51 738561, // Reliance - most traded
52 884737, // ICICI Bank
53 341249, // TCS
54 492033, // ITC
55 779521, // Kotak Bank
56 ];
57
58 println!("📊 Subscribing to {} liquid instruments", hft_symbols.len());
59 manager
60 .subscribe_symbols(&hft_symbols, Some(Mode::Full))
61 .await?;
62
63 // Setup high-frequency processing engines
64 let channels = manager.get_all_channels();
65 let mut hft_engines = Vec::new();
66
67 for (channel_id, mut receiver) in channels {
68 let engine = tokio::spawn(async move {
69 let mut hft_processor = HFTProcessor::new(channel_id);
70
71 while let Ok(message) = receiver.recv().await {
72 if let TickerMessage::Ticks(ticks) = message {
73 hft_processor.process_ultra_fast(ticks).await;
74 }
75 }
76
77 hft_processor.print_performance_metrics();
78 });
79
80 hft_engines.push(engine);
81 }
82
83 // Run HFT simulation
84 println!("⚡ Starting high-frequency processing...");
85 sleep(Duration::from_secs(60)).await;
86
87 // Stop processing
88 manager.stop().await?;
89
90 // Wait for all engines to complete
91 for engine in hft_engines {
92 let _ = engine.await;
93 }
94
95 println!("🏁 High-frequency trading simulation completed");
96 Ok(())
97}8async fn main() -> Result<(), String> {
9 env_logger::init();
10
11 println!("🔄 Simple Tick Test - Capturing Initial Ticks");
12 println!("═══════════════════════════════════════════");
13
14 let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
15 let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
16
17 if api_key.is_empty() || access_token.is_empty() {
18 println!(
19 "⚠️ KITE_API_KEY and KITE_ACCESS_TOKEN environment variables not set"
20 );
21 return Ok(());
22 }
23
24 let config = KiteManagerConfig {
25 max_symbols_per_connection: 3000,
26 max_connections: 1, // Use only 1 connection for simplicity
27 connection_buffer_size: 5000,
28 parser_buffer_size: 10000,
29 connection_timeout: Duration::from_secs(30),
30 health_check_interval: Duration::from_secs(5),
31 max_reconnect_attempts: 5,
32 reconnect_delay: Duration::from_secs(2),
33 enable_dedicated_parsers: true,
34 default_mode: Mode::LTP,
35 };
36
37 let mut manager = KiteTickerManager::new(api_key, access_token, config);
38
39 // Start manager
40 println!("📡 Starting manager...");
41 manager.start().await?;
42 println!("✅ Manager started");
43
44 // Get channel BEFORE subscribing
45 println!("🎯 Getting channels...");
46 let channels = manager.get_all_channels();
47 let (channel_id, mut receiver) = channels.into_iter().next().unwrap();
48
49 // Start listener in background
50 let listener_handle = tokio::spawn(async move {
51 println!("👂 Listener started for {:?}", channel_id);
52
53 let mut tick_count = 0;
54 loop {
55 match timeout(Duration::from_secs(30), receiver.recv()).await {
56 Ok(Ok(message)) => match message {
57 TickerMessage::Ticks(ticks) => {
58 tick_count += ticks.len();
59 println!(
60 "🎯 CAPTURED TICKS! {:?}: {} ticks (total: {})",
61 channel_id,
62 ticks.len(),
63 tick_count
64 );
65
66 for tick in &ticks {
67 println!("🔹 FULL TICK DEBUG:");
68 println!("{:#?}", tick);
69 println!("─────────────────────────────────────");
70 }
71 }
72 TickerMessage::Error(err) => {
73 println!("❌ Error: {}", err);
74 }
75 _ => {
76 println!("📨 Other message: {:?}", message);
77 }
78 },
79 Ok(Err(e)) => {
80 println!("❌ Receive error: {}", e);
81 break;
82 }
83 Err(_) => {
84 println!("⏱️ Listener timeout");
85 break;
86 }
87 }
88 }
89 println!("👂 Listener stopped");
90 });
91
92 // Give listener time to start
93 tokio::time::sleep(Duration::from_millis(200)).await;
94
95 // Now subscribe to a symbol
96 println!("📊 Subscribing to symbol 256265...");
97 manager
98 .subscribe_symbols(&[128083204], Some(Mode::Full))
99 .await?;
100 println!("✅ Subscription sent");
101
102 // Wait for ticks
103 println!("⏳ Waiting 10 seconds for ticks...");
104 tokio::time::sleep(Duration::from_secs(10)).await;
105
106 // Stop
107 manager.stop().await?;
108 listener_handle.abort();
109
110 println!("🏁 Test completed");
111 Ok(())
112}21async fn main() -> Result<(), String> {
22 // Initialize logging
23 env_logger::init();
24
25 println!("📊 Portfolio Monitor Example");
26 println!("============================");
27
28 // Get credentials
29 let api_key = std::env::var("KITE_API_KEY")
30 .map_err(|_| "Please set KITE_API_KEY environment variable")?;
31 let access_token = std::env::var("KITE_ACCESS_TOKEN")
32 .map_err(|_| "Please set KITE_ACCESS_TOKEN environment variable")?;
33
34 // Define portfolio
35 let portfolio = vec![
36 (256265, "NIFTY 50"),
37 (408065, "HDFC Bank"),
38 (738561, "Reliance"),
39 (5633, "TCS"),
40 (884737, "Asian Paints"),
41 ];
42
43 println!("📈 Monitoring Portfolio:");
44 for (symbol, name) in &portfolio {
45 println!(" • {} ({})", name, symbol);
46 }
47 println!();
48
49 // Create manager with optimized configuration for portfolio monitoring
50 let config = KiteManagerConfig {
51 max_connections: 1, // Single connection for small portfolio
52 max_symbols_per_connection: 100,
53 connection_buffer_size: 2000,
54 parser_buffer_size: 5000,
55 enable_dedicated_parsers: true,
56 default_mode: Mode::Quote, // Quote mode for price + volume
57 ..Default::default()
58 };
59
60 // Start manager
61 let mut manager = KiteTickerManager::new(api_key, access_token, config);
62 manager.start().await?;
63
64 // Subscribe to portfolio symbols
65 let symbols: Vec<u32> = portfolio.iter().map(|(symbol, _)| *symbol).collect();
66 manager
67 .subscribe_symbols(&symbols, Some(Mode::Quote))
68 .await?;
69
70 println!("✅ Subscribed to {} symbols", symbols.len());
71
72 // Create portfolio tracking
73 let mut portfolio_data: HashMap<u32, StockInfo> = HashMap::new();
74 for (symbol, name) in portfolio {
75 portfolio_data.insert(
76 symbol,
77 StockInfo {
78 symbol,
79 name: name.to_string(),
80 current_price: 0.0,
81 volume: 0,
82 last_update: Instant::now(),
83 },
84 );
85 }
86
87 // Get data channels
88 let channels = manager.get_all_channels();
89
90 // Start data processing
91 for (channel_id, mut receiver) in channels {
92 let mut portfolio_clone = portfolio_data.clone();
93
94 tokio::spawn(async move {
95 println!("📡 Started monitoring channel {:?}", channel_id);
96
97 while let Ok(message) = receiver.recv().await {
98 if let TickerMessage::Ticks(ticks) = message {
99 for tick in ticks {
100 if let Some(stock) = portfolio_clone.get_mut(&tick.instrument_token)
101 {
102 // Update stock data
103 if let Some(price) = tick.content.last_price {
104 stock.current_price = price;
105 }
106 if let Some(volume) = tick.content.volume_traded {
107 stock.volume = volume;
108 }
109 stock.last_update = Instant::now();
110
111 // Display update
112 println!(
113 "📈 {} ({}): ₹{:.2} | Vol: {} | {}",
114 stock.name,
115 stock.symbol,
116 stock.current_price,
117 stock.volume,
118 format_time_ago(stock.last_update)
119 );
120 }
121 }
122 }
123 }
124 });
125 }
126
127 // Periodic portfolio summary
128 let summary_portfolio = portfolio_data.clone();
129 tokio::spawn(async move {
130 let mut interval = tokio::time::interval(Duration::from_secs(30));
131
132 loop {
133 interval.tick().await;
134 print_portfolio_summary(&summary_portfolio);
135 }
136 });
137
138 // Monitor for demo duration
139 println!("📊 Monitoring portfolio for 2 minutes...\n");
140 sleep(Duration::from_secs(120)).await;
141
142 // Final summary
143 println!("\n🏁 Demo completed!");
144 print_portfolio_summary(&portfolio_data);
145
146 // Stop manager
147 manager.stop().await?;
148 println!("✅ Portfolio monitor stopped");
149
150 Ok(())
151}58async fn test_mode_change_issue(
59 manager: &mut KiteTickerManager,
60) -> Result<(), String> {
61 println!("\n🧪 Testing Mode Change Issue");
62 println!("=============================");
63
64 // Step 1: Subscribe to a symbol with LTP mode
65 let test_symbol = 738561; // Test symbol provided by user
66 println!("\n1️⃣ Subscribing to symbol {} with LTP mode", test_symbol);
67 manager
68 .subscribe_symbols(&[test_symbol], Some(Mode::LTP))
69 .await?;
70
71 // Start monitoring ticks to see the mode
72 let channels = manager.get_all_channels();
73 let mut tick_listeners = Vec::new();
74
75 for (channel_id, mut receiver) in channels {
76 let task = tokio::spawn(async move {
77 let mut ticks_received = 0;
78 let start = Instant::now();
79
80 while start.elapsed() < Duration::from_secs(5) && ticks_received < 3 {
81 match timeout(Duration::from_millis(500), receiver.recv()).await {
82 Ok(Ok(message)) => {
83 if let TickerMessage::Ticks(ticks) = message {
84 for tick in &ticks {
85 if tick.instrument_token == test_symbol {
86 ticks_received += 1;
87 println!(
88 " 📊 Received tick for {}: Mode={:?}, LTP={:?}",
89 tick.instrument_token,
90 tick.content.mode,
91 tick.content.last_price
92 );
93
94 // Check if OHLC data is present (should not be for LTP mode)
95 if tick.content.ohlc.is_some() {
96 println!(
97 " ⚠️ OHLC data present in LTP mode - unexpected!"
98 );
99 } else {
100 println!(" ✅ No OHLC data in LTP mode - correct");
101 }
102 }
103 }
104 }
105 }
106 _ => continue,
107 }
108 }
109 (channel_id, ticks_received)
110 });
111 tick_listeners.push(task);
112 }
113
114 // Wait for initial ticks
115 for task in tick_listeners {
116 if let Ok((channel_id, count)) = task.await {
117 println!(
118 " 📈 {:?}: Received {} ticks for initial LTP subscription",
119 channel_id, count
120 );
121 }
122 }
123
124 sleep(Duration::from_secs(2)).await;
125
126 // Step 2: Attempt to change mode to Full (this is where the issue should manifest)
127 println!(
128 "\n2️⃣ Attempting to change mode from LTP to Full for symbol {}",
129 test_symbol
130 );
131 match manager.change_mode(&[test_symbol], Mode::Full).await {
132 Ok(()) => {
133 println!(" ✅ Mode change command sent successfully");
134 }
135 Err(e) => {
136 println!(" ❌ Mode change command failed: {}", e);
137 return Err(e);
138 }
139 }
140
141 // Step 3: Monitor ticks after mode change to see if it actually worked
142 println!("\n3️⃣ Monitoring ticks after mode change...");
143 let channels = manager.get_all_channels();
144 let mut post_change_listeners = Vec::new();
145
146 for (channel_id, mut receiver) in channels {
147 let task = tokio::spawn(async move {
148 let mut ticks_received = 0;
149 let mut ohlc_present = 0;
150 let mut depth_present = 0;
151 let start = Instant::now();
152
153 while start.elapsed() < Duration::from_secs(10) && ticks_received < 5 {
154 match timeout(Duration::from_millis(500), receiver.recv()).await {
155 Ok(Ok(message)) => {
156 if let TickerMessage::Ticks(ticks) = message {
157 for tick in &ticks {
158 if tick.instrument_token == test_symbol {
159 ticks_received += 1;
160 println!(
161 " 📊 Post-change tick for {}: Mode={:?}, LTP={:?}",
162 tick.instrument_token,
163 tick.content.mode,
164 tick.content.last_price
165 );
166
167 // Check if Full mode data is present
168 if let Some(ohlc) = &tick.content.ohlc {
169 ohlc_present += 1;
170 println!(
171 " ✅ OHLC data present: O:{} H:{} L:{} C:{}",
172 ohlc.open, ohlc.high, ohlc.low, ohlc.close
173 );
174 } else {
175 println!(" ❌ OHLC data missing - mode change may not have worked!");
176 }
177
178 if let Some(depth) = &tick.content.depth {
179 depth_present += 1;
180 println!(
181 " ✅ Market depth present: {} buy, {} sell orders",
182 depth.buy.len(),
183 depth.sell.len()
184 );
185 } else {
186 println!(" ❌ Market depth missing - mode change may not have worked!");
187 }
188
189 // Log the actual mode reported in the tick
190 info!("Tick mode reported: {:?}", tick.content.mode);
191 }
192 }
193 }
194 }
195 _ => continue,
196 }
197 }
198 (channel_id, ticks_received, ohlc_present, depth_present)
199 });
200 post_change_listeners.push(task);
201 }
202
203 // Wait for post-change ticks
204 let mut total_ticks = 0;
205 let mut total_ohlc = 0;
206 let mut total_depth = 0;
207
208 for task in post_change_listeners {
209 if let Ok((channel_id, ticks, ohlc, depth)) = task.await {
210 println!(
211 " 📈 {:?}: {} ticks, {} with OHLC, {} with depth",
212 channel_id, ticks, ohlc, depth
213 );
214 total_ticks += ticks;
215 total_ohlc += ohlc;
216 total_depth += depth;
217 }
218 }
219
220 // Step 4: Analyze results
221 println!("\n4️⃣ Test Results Analysis");
222 println!("========================");
223
224 if total_ticks == 0 {
225 println!(" ⚠️ No ticks received after mode change - connection issue?");
226 } else if total_ohlc == 0 && total_depth == 0 {
227 println!(" ❌ ISSUE CONFIRMED: Mode change command sent but Full mode data not received");
228 println!(" 💡 This confirms that set_mode() alone doesn't work - need unsubscribe+resubscribe");
229 } else if total_ohlc > 0 && total_depth > 0 {
230 println!(" ✅ Mode change worked - Full mode data received");
231 } else {
232 println!(
233 " ⚠️ Partial mode change - some Full mode data received but not all"
234 );
235 }
236
237 println!("\n📊 Summary:");
238 println!(" Total ticks received: {}", total_ticks);
239 println!(" Ticks with OHLC data: {}", total_ohlc);
240 println!(" Ticks with market depth: {}", total_depth);
241
242 Ok(())
243}Sourcepub fn get_channel(
&mut self,
channel_id: ChannelId,
) -> Option<Receiver<TickerMessage>>
pub fn get_channel( &mut self, channel_id: ChannelId, ) -> Option<Receiver<TickerMessage>>
Get output channel for a specific connection
Sourcepub fn get_all_channels(&mut self) -> Vec<(ChannelId, Receiver<TickerMessage>)>
pub fn get_all_channels(&mut self) -> Vec<(ChannelId, Receiver<TickerMessage>)>
Get all output channels
Examples found in repository?
142async fn monitor_live_data(
143 manager: &mut KiteTickerManager,
144 seconds: u64,
145) -> Result<(), String> {
146 let channels = manager.get_all_channels();
147 let mut tasks = Vec::new();
148
149 for (channel_id, mut receiver) in channels {
150 let task = tokio::spawn(async move {
151 let mut count = 0;
152 let start = std::time::Instant::now();
153
154 while start.elapsed() < Duration::from_secs(seconds) {
155 match tokio::time::timeout(Duration::from_millis(100), receiver.recv())
156 .await
157 {
158 Ok(Ok(message)) => {
159 count += 1;
160 if let TickerMessage::Ticks(ticks) = message {
161 if count <= 2 {
162 // Show first few ticks
163 println!(
164 " 📋 {:?}: Received {} ticks",
165 channel_id,
166 ticks.len()
167 );
168 }
169 }
170 }
171 _ => continue,
172 }
173 }
174 (channel_id, count)
175 });
176 tasks.push(task);
177 }
178
179 // Wait for monitoring to complete
180 for task in tasks {
181 if let Ok((channel_id, count)) = task.await {
182 println!(" 📊 {:?}: {} total messages", channel_id, count);
183 }
184 }
185
186 Ok(())
187}More examples
12async fn main() -> Result<(), String> {
13 env_logger::init();
14
15 println!("🔍 Market Scanner Example");
16 println!("=========================");
17
18 let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
19 let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
20
21 if api_key.is_empty() || access_token.is_empty() {
22 return Err("Please set KITE_API_KEY and KITE_ACCESS_TOKEN".to_string());
23 }
24
25 // High-performance configuration for scanning
26 let config = KiteManagerConfig {
27 max_connections: 3,
28 max_symbols_per_connection: 3000,
29 connection_buffer_size: 20000, // Large buffer for high volume
30 parser_buffer_size: 50000, // Even larger for parsed data
31 enable_dedicated_parsers: true,
32 default_mode: Mode::LTP, // LTP mode for scanning (minimal bandwidth)
33 ..Default::default()
34 };
35
36 let mut manager = KiteTickerManager::new(api_key, access_token, config);
37 manager.start().await?;
38
39 // Large symbol set for market scanning
40 let large_symbol_set = generate_symbol_list(8000); // 8000 symbols across 3 connections
41
42 println!(
43 "📊 Scanning {} symbols across {} connections",
44 large_symbol_set.len(),
45 3
46 );
47
48 let start_time = Instant::now();
49 manager
50 .subscribe_symbols(&large_symbol_set, Some(Mode::LTP))
51 .await?;
52 println!(
53 "✅ Subscribed to {} symbols in {:?}",
54 large_symbol_set.len(),
55 start_time.elapsed()
56 );
57
58 // Get all channels and start parallel processing
59 let channels = manager.get_all_channels();
60 let mut handles = Vec::new();
61
62 for (channel_id, mut receiver) in channels {
63 let handle = tokio::spawn(async move {
64 let mut scanner = MarketScanner::new(channel_id);
65
66 while let Ok(message) = receiver.recv().await {
67 if let TickerMessage::Ticks(ticks) = message {
68 scanner.process_ticks(ticks).await;
69 }
70 }
71
72 scanner.print_summary();
73 });
74
75 handles.push(handle);
76 }
77
78 // Run scanner for specified duration
79 println!("🔄 Scanning market for 30 seconds...");
80 sleep(Duration::from_secs(30)).await;
81
82 // Stop manager
83 manager.stop().await?;
84
85 // Wait for all scanners to complete
86 for handle in handles {
87 let _ = handle.await;
88 }
89
90 println!("🏁 Market scanning completed");
91 Ok(())
92}14async fn main() -> Result<(), String> {
15 env_logger::init();
16
17 println!("⚡ High-Frequency Trading Example");
18 println!("=================================");
19
20 let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
21 let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
22
23 if api_key.is_empty() || access_token.is_empty() {
24 return Err("Please set KITE_API_KEY and KITE_ACCESS_TOKEN".to_string());
25 }
26
27 // Ultra-high performance configuration for HFT
28 let config = KiteManagerConfig {
29 max_connections: 3,
30 max_symbols_per_connection: 1000, // Focused symbol set for HFT
31 connection_buffer_size: 100000, // Maximum possible buffer
32 parser_buffer_size: 200000, // Ultra-large parser buffer
33 enable_dedicated_parsers: true,
34 default_mode: Mode::Full, // Full market depth data
35 ..Default::default()
36 };
37
38 println!("🚀 Optimized for:");
39 println!(" Sub-microsecond latency");
40 println!(" Maximum throughput");
41 println!(" Real-time order book analysis");
42 println!(" Ultra-low garbage collection");
43
44 let mut manager = KiteTickerManager::new(api_key, access_token, config);
45 manager.start().await?;
46
47 // Focus on highly liquid instruments for HFT
48 let hft_symbols = vec![
49 256265, // NIFTY 50 - highly liquid
50 408065, // HDFC Bank - large volume
51 738561, // Reliance - most traded
52 884737, // ICICI Bank
53 341249, // TCS
54 492033, // ITC
55 779521, // Kotak Bank
56 ];
57
58 println!("📊 Subscribing to {} liquid instruments", hft_symbols.len());
59 manager
60 .subscribe_symbols(&hft_symbols, Some(Mode::Full))
61 .await?;
62
63 // Setup high-frequency processing engines
64 let channels = manager.get_all_channels();
65 let mut hft_engines = Vec::new();
66
67 for (channel_id, mut receiver) in channels {
68 let engine = tokio::spawn(async move {
69 let mut hft_processor = HFTProcessor::new(channel_id);
70
71 while let Ok(message) = receiver.recv().await {
72 if let TickerMessage::Ticks(ticks) = message {
73 hft_processor.process_ultra_fast(ticks).await;
74 }
75 }
76
77 hft_processor.print_performance_metrics();
78 });
79
80 hft_engines.push(engine);
81 }
82
83 // Run HFT simulation
84 println!("⚡ Starting high-frequency processing...");
85 sleep(Duration::from_secs(60)).await;
86
87 // Stop processing
88 manager.stop().await?;
89
90 // Wait for all engines to complete
91 for engine in hft_engines {
92 let _ = engine.await;
93 }
94
95 println!("🏁 High-frequency trading simulation completed");
96 Ok(())
97}8async fn main() -> Result<(), String> {
9 env_logger::init();
10
11 println!("🔄 Simple Tick Test - Capturing Initial Ticks");
12 println!("═══════════════════════════════════════════");
13
14 let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
15 let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
16
17 if api_key.is_empty() || access_token.is_empty() {
18 println!(
19 "⚠️ KITE_API_KEY and KITE_ACCESS_TOKEN environment variables not set"
20 );
21 return Ok(());
22 }
23
24 let config = KiteManagerConfig {
25 max_symbols_per_connection: 3000,
26 max_connections: 1, // Use only 1 connection for simplicity
27 connection_buffer_size: 5000,
28 parser_buffer_size: 10000,
29 connection_timeout: Duration::from_secs(30),
30 health_check_interval: Duration::from_secs(5),
31 max_reconnect_attempts: 5,
32 reconnect_delay: Duration::from_secs(2),
33 enable_dedicated_parsers: true,
34 default_mode: Mode::LTP,
35 };
36
37 let mut manager = KiteTickerManager::new(api_key, access_token, config);
38
39 // Start manager
40 println!("📡 Starting manager...");
41 manager.start().await?;
42 println!("✅ Manager started");
43
44 // Get channel BEFORE subscribing
45 println!("🎯 Getting channels...");
46 let channels = manager.get_all_channels();
47 let (channel_id, mut receiver) = channels.into_iter().next().unwrap();
48
49 // Start listener in background
50 let listener_handle = tokio::spawn(async move {
51 println!("👂 Listener started for {:?}", channel_id);
52
53 let mut tick_count = 0;
54 loop {
55 match timeout(Duration::from_secs(30), receiver.recv()).await {
56 Ok(Ok(message)) => match message {
57 TickerMessage::Ticks(ticks) => {
58 tick_count += ticks.len();
59 println!(
60 "🎯 CAPTURED TICKS! {:?}: {} ticks (total: {})",
61 channel_id,
62 ticks.len(),
63 tick_count
64 );
65
66 for tick in &ticks {
67 println!("🔹 FULL TICK DEBUG:");
68 println!("{:#?}", tick);
69 println!("─────────────────────────────────────");
70 }
71 }
72 TickerMessage::Error(err) => {
73 println!("❌ Error: {}", err);
74 }
75 _ => {
76 println!("📨 Other message: {:?}", message);
77 }
78 },
79 Ok(Err(e)) => {
80 println!("❌ Receive error: {}", e);
81 break;
82 }
83 Err(_) => {
84 println!("⏱️ Listener timeout");
85 break;
86 }
87 }
88 }
89 println!("👂 Listener stopped");
90 });
91
92 // Give listener time to start
93 tokio::time::sleep(Duration::from_millis(200)).await;
94
95 // Now subscribe to a symbol
96 println!("📊 Subscribing to symbol 256265...");
97 manager
98 .subscribe_symbols(&[128083204], Some(Mode::Full))
99 .await?;
100 println!("✅ Subscription sent");
101
102 // Wait for ticks
103 println!("⏳ Waiting 10 seconds for ticks...");
104 tokio::time::sleep(Duration::from_secs(10)).await;
105
106 // Stop
107 manager.stop().await?;
108 listener_handle.abort();
109
110 println!("🏁 Test completed");
111 Ok(())
112}21async fn main() -> Result<(), String> {
22 // Initialize logging
23 env_logger::init();
24
25 println!("📊 Portfolio Monitor Example");
26 println!("============================");
27
28 // Get credentials
29 let api_key = std::env::var("KITE_API_KEY")
30 .map_err(|_| "Please set KITE_API_KEY environment variable")?;
31 let access_token = std::env::var("KITE_ACCESS_TOKEN")
32 .map_err(|_| "Please set KITE_ACCESS_TOKEN environment variable")?;
33
34 // Define portfolio
35 let portfolio = vec![
36 (256265, "NIFTY 50"),
37 (408065, "HDFC Bank"),
38 (738561, "Reliance"),
39 (5633, "TCS"),
40 (884737, "Asian Paints"),
41 ];
42
43 println!("📈 Monitoring Portfolio:");
44 for (symbol, name) in &portfolio {
45 println!(" • {} ({})", name, symbol);
46 }
47 println!();
48
49 // Create manager with optimized configuration for portfolio monitoring
50 let config = KiteManagerConfig {
51 max_connections: 1, // Single connection for small portfolio
52 max_symbols_per_connection: 100,
53 connection_buffer_size: 2000,
54 parser_buffer_size: 5000,
55 enable_dedicated_parsers: true,
56 default_mode: Mode::Quote, // Quote mode for price + volume
57 ..Default::default()
58 };
59
60 // Start manager
61 let mut manager = KiteTickerManager::new(api_key, access_token, config);
62 manager.start().await?;
63
64 // Subscribe to portfolio symbols
65 let symbols: Vec<u32> = portfolio.iter().map(|(symbol, _)| *symbol).collect();
66 manager
67 .subscribe_symbols(&symbols, Some(Mode::Quote))
68 .await?;
69
70 println!("✅ Subscribed to {} symbols", symbols.len());
71
72 // Create portfolio tracking
73 let mut portfolio_data: HashMap<u32, StockInfo> = HashMap::new();
74 for (symbol, name) in portfolio {
75 portfolio_data.insert(
76 symbol,
77 StockInfo {
78 symbol,
79 name: name.to_string(),
80 current_price: 0.0,
81 volume: 0,
82 last_update: Instant::now(),
83 },
84 );
85 }
86
87 // Get data channels
88 let channels = manager.get_all_channels();
89
90 // Start data processing
91 for (channel_id, mut receiver) in channels {
92 let mut portfolio_clone = portfolio_data.clone();
93
94 tokio::spawn(async move {
95 println!("📡 Started monitoring channel {:?}", channel_id);
96
97 while let Ok(message) = receiver.recv().await {
98 if let TickerMessage::Ticks(ticks) = message {
99 for tick in ticks {
100 if let Some(stock) = portfolio_clone.get_mut(&tick.instrument_token)
101 {
102 // Update stock data
103 if let Some(price) = tick.content.last_price {
104 stock.current_price = price;
105 }
106 if let Some(volume) = tick.content.volume_traded {
107 stock.volume = volume;
108 }
109 stock.last_update = Instant::now();
110
111 // Display update
112 println!(
113 "📈 {} ({}): ₹{:.2} | Vol: {} | {}",
114 stock.name,
115 stock.symbol,
116 stock.current_price,
117 stock.volume,
118 format_time_ago(stock.last_update)
119 );
120 }
121 }
122 }
123 }
124 });
125 }
126
127 // Periodic portfolio summary
128 let summary_portfolio = portfolio_data.clone();
129 tokio::spawn(async move {
130 let mut interval = tokio::time::interval(Duration::from_secs(30));
131
132 loop {
133 interval.tick().await;
134 print_portfolio_summary(&summary_portfolio);
135 }
136 });
137
138 // Monitor for demo duration
139 println!("📊 Monitoring portfolio for 2 minutes...\n");
140 sleep(Duration::from_secs(120)).await;
141
142 // Final summary
143 println!("\n🏁 Demo completed!");
144 print_portfolio_summary(&portfolio_data);
145
146 // Stop manager
147 manager.stop().await?;
148 println!("✅ Portfolio monitor stopped");
149
150 Ok(())
151}58async fn test_mode_change_issue(
59 manager: &mut KiteTickerManager,
60) -> Result<(), String> {
61 println!("\n🧪 Testing Mode Change Issue");
62 println!("=============================");
63
64 // Step 1: Subscribe to a symbol with LTP mode
65 let test_symbol = 738561; // Test symbol provided by user
66 println!("\n1️⃣ Subscribing to symbol {} with LTP mode", test_symbol);
67 manager
68 .subscribe_symbols(&[test_symbol], Some(Mode::LTP))
69 .await?;
70
71 // Start monitoring ticks to see the mode
72 let channels = manager.get_all_channels();
73 let mut tick_listeners = Vec::new();
74
75 for (channel_id, mut receiver) in channels {
76 let task = tokio::spawn(async move {
77 let mut ticks_received = 0;
78 let start = Instant::now();
79
80 while start.elapsed() < Duration::from_secs(5) && ticks_received < 3 {
81 match timeout(Duration::from_millis(500), receiver.recv()).await {
82 Ok(Ok(message)) => {
83 if let TickerMessage::Ticks(ticks) = message {
84 for tick in &ticks {
85 if tick.instrument_token == test_symbol {
86 ticks_received += 1;
87 println!(
88 " 📊 Received tick for {}: Mode={:?}, LTP={:?}",
89 tick.instrument_token,
90 tick.content.mode,
91 tick.content.last_price
92 );
93
94 // Check if OHLC data is present (should not be for LTP mode)
95 if tick.content.ohlc.is_some() {
96 println!(
97 " ⚠️ OHLC data present in LTP mode - unexpected!"
98 );
99 } else {
100 println!(" ✅ No OHLC data in LTP mode - correct");
101 }
102 }
103 }
104 }
105 }
106 _ => continue,
107 }
108 }
109 (channel_id, ticks_received)
110 });
111 tick_listeners.push(task);
112 }
113
114 // Wait for initial ticks
115 for task in tick_listeners {
116 if let Ok((channel_id, count)) = task.await {
117 println!(
118 " 📈 {:?}: Received {} ticks for initial LTP subscription",
119 channel_id, count
120 );
121 }
122 }
123
124 sleep(Duration::from_secs(2)).await;
125
126 // Step 2: Attempt to change mode to Full (this is where the issue should manifest)
127 println!(
128 "\n2️⃣ Attempting to change mode from LTP to Full for symbol {}",
129 test_symbol
130 );
131 match manager.change_mode(&[test_symbol], Mode::Full).await {
132 Ok(()) => {
133 println!(" ✅ Mode change command sent successfully");
134 }
135 Err(e) => {
136 println!(" ❌ Mode change command failed: {}", e);
137 return Err(e);
138 }
139 }
140
141 // Step 3: Monitor ticks after mode change to see if it actually worked
142 println!("\n3️⃣ Monitoring ticks after mode change...");
143 let channels = manager.get_all_channels();
144 let mut post_change_listeners = Vec::new();
145
146 for (channel_id, mut receiver) in channels {
147 let task = tokio::spawn(async move {
148 let mut ticks_received = 0;
149 let mut ohlc_present = 0;
150 let mut depth_present = 0;
151 let start = Instant::now();
152
153 while start.elapsed() < Duration::from_secs(10) && ticks_received < 5 {
154 match timeout(Duration::from_millis(500), receiver.recv()).await {
155 Ok(Ok(message)) => {
156 if let TickerMessage::Ticks(ticks) = message {
157 for tick in &ticks {
158 if tick.instrument_token == test_symbol {
159 ticks_received += 1;
160 println!(
161 " 📊 Post-change tick for {}: Mode={:?}, LTP={:?}",
162 tick.instrument_token,
163 tick.content.mode,
164 tick.content.last_price
165 );
166
167 // Check if Full mode data is present
168 if let Some(ohlc) = &tick.content.ohlc {
169 ohlc_present += 1;
170 println!(
171 " ✅ OHLC data present: O:{} H:{} L:{} C:{}",
172 ohlc.open, ohlc.high, ohlc.low, ohlc.close
173 );
174 } else {
175 println!(" ❌ OHLC data missing - mode change may not have worked!");
176 }
177
178 if let Some(depth) = &tick.content.depth {
179 depth_present += 1;
180 println!(
181 " ✅ Market depth present: {} buy, {} sell orders",
182 depth.buy.len(),
183 depth.sell.len()
184 );
185 } else {
186 println!(" ❌ Market depth missing - mode change may not have worked!");
187 }
188
189 // Log the actual mode reported in the tick
190 info!("Tick mode reported: {:?}", tick.content.mode);
191 }
192 }
193 }
194 }
195 _ => continue,
196 }
197 }
198 (channel_id, ticks_received, ohlc_present, depth_present)
199 });
200 post_change_listeners.push(task);
201 }
202
203 // Wait for post-change ticks
204 let mut total_ticks = 0;
205 let mut total_ohlc = 0;
206 let mut total_depth = 0;
207
208 for task in post_change_listeners {
209 if let Ok((channel_id, ticks, ohlc, depth)) = task.await {
210 println!(
211 " 📈 {:?}: {} ticks, {} with OHLC, {} with depth",
212 channel_id, ticks, ohlc, depth
213 );
214 total_ticks += ticks;
215 total_ohlc += ohlc;
216 total_depth += depth;
217 }
218 }
219
220 // Step 4: Analyze results
221 println!("\n4️⃣ Test Results Analysis");
222 println!("========================");
223
224 if total_ticks == 0 {
225 println!(" ⚠️ No ticks received after mode change - connection issue?");
226 } else if total_ohlc == 0 && total_depth == 0 {
227 println!(" ❌ ISSUE CONFIRMED: Mode change command sent but Full mode data not received");
228 println!(" 💡 This confirms that set_mode() alone doesn't work - need unsubscribe+resubscribe");
229 } else if total_ohlc > 0 && total_depth > 0 {
230 println!(" ✅ Mode change worked - Full mode data received");
231 } else {
232 println!(
233 " ⚠️ Partial mode change - some Full mode data received but not all"
234 );
235 }
236
237 println!("\n📊 Summary:");
238 println!(" Total ticks received: {}", total_ticks);
239 println!(" Ticks with OHLC data: {}", total_ohlc);
240 println!(" Ticks with market depth: {}", total_depth);
241
242 Ok(())
243}Sourcepub async fn get_stats(&self) -> Result<ManagerStats, String>
pub async fn get_stats(&self) -> Result<ManagerStats, String>
Get manager statistics
Examples found in repository?
121async fn print_current_state(manager: &KiteTickerManager, context: &str) {
122 println!("\n📊 Current State ({})", context);
123
124 // Show distribution
125 let distribution = manager.get_symbol_distribution();
126 let mut total = 0;
127 for (channel_id, symbols) in &distribution {
128 println!(" {:?}: {} symbols", channel_id, symbols.len());
129 total += symbols.len();
130 }
131 println!(" 📈 Total: {} symbols", total);
132
133 // Show stats
134 if let Ok(stats) = manager.get_stats().await {
135 println!(
136 " 📊 Stats: {} connections, {} messages",
137 stats.active_connections, stats.total_messages_received
138 );
139 }
140}More examples
8pub async fn main() -> Result<(), String> {
9 // Initialize logging
10 env_logger::init();
11
12 println!("🚀 KiteTicker Multi-Connection Manager Demo");
13 println!("═══════════════════════════════════════════");
14
15 let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
16 let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
17
18 if api_key.is_empty() || access_token.is_empty() {
19 println!(
20 "⚠️ KITE_API_KEY and KITE_ACCESS_TOKEN environment variables not set"
21 );
22 println!(" This demo will show the manager architecture without live connections");
23 demonstrate_offline_architecture().await;
24 return Ok(());
25 }
26
27 // Create high-performance configuration - RESTORED TO 3 CONNECTIONS
28 let config = KiteManagerConfig {
29 max_symbols_per_connection: 3000,
30 max_connections: 3, // BACK TO 3 CONNECTIONS!
31 connection_buffer_size: 10000, // High buffer for performance
32 parser_buffer_size: 20000, // Even higher for parsed messages
33 connection_timeout: Duration::from_secs(30),
34 health_check_interval: Duration::from_secs(5),
35 max_reconnect_attempts: 5,
36 reconnect_delay: Duration::from_secs(2),
37 enable_dedicated_parsers: true, // Use dedicated parser tasks
38 default_mode: Mode::Full, // Full mode for maximum data
39 };
40
41 println!("🔧 Configuration:");
42 println!(" Max connections: {}", config.max_connections);
43 println!(
44 " Max symbols per connection: {}",
45 config.max_symbols_per_connection
46 );
47 println!(
48 " Connection buffer size: {}",
49 config.connection_buffer_size
50 );
51 println!(" Parser buffer size: {}", config.parser_buffer_size);
52 println!(" Dedicated parsers: {}", config.enable_dedicated_parsers);
53 println!();
54
55 // Create and start the manager
56 println!("📡 Starting multi-connection manager...");
57 let start_time = Instant::now();
58
59 let mut manager = KiteTickerManager::new(api_key, access_token, config);
60
61 match timeout(Duration::from_secs(30), manager.start()).await {
62 Ok(Ok(())) => {
63 println!("✅ Manager started in {:?}", start_time.elapsed());
64 }
65 Ok(Err(e)) => {
66 println!("❌ Manager failed to start: {}", e);
67 return Err(e);
68 }
69 Err(_) => {
70 println!("⏱️ Manager startup timeout");
71 return Err("Manager startup timeout".to_string());
72 }
73 }
74
75 // Test with market symbols for proper distribution
76 let symbols = vec![
77 256265, 265, 256777, 274441, 260105, 273929, 260617, 257033, 257289,
78 257545, 257801, 258825, 259081, 259337, 259593, 259849, 260873, 261129,
79 261385, 261641, 261897, 262153, 262409, 262665, 262921, 263177, 263433,
80 263689, 263945, 264457, 264713, 264969, 265225, 265737, 265993, 266249,
81 266505, 266761, 267017, 267273, 267529, 267785, 268041, 268297, 268553,
82 268809, 269065, 269321, 269577, 269833, 270089, 270345, 270601, 270857,
83 271113, 271625, 271881, 272137, 272393, 273417, 273673, 274185, 274697,
84 274953, 275209, 275465, 275721, 275977, 276233, 276489, 276745, 277001,
85 277257, 277513, 277769, 278025, 278281, 278537, 278793, 279049, 279305,
86 279561, 279817, 280073, 280329, 280585, 280841, 281097, 281353, 281865,
87 282121, 282377, 282633, 282889, 283145, 283401, 283657, 283913, 284169,
88 284425, 284681, 284937, 285193, 285449, 285961, 286217, 286473, 286729,
89 286985, 287241, 287497, 287753, 288009, 288265, 288521, 288777, 289033,
90 289289, 289545, 289801, 290057, 290313, 290569, 290825, 291081, 291337,
91 291593, 291849, 292105, 292361, 292617, 292873, 293129, 293385, 293641,
92 293897, 294153, 294409, 294665, 294921, 295177, 295433, 295689, 398345,
93 398601, 398857, 399113, 399369, 399625, 399881, 400137, 400393, 400905,
94 401161, 401673, 401929, 402185, 402441, 402697, 402953, 403209, 403465,
95 403721, 403977, 404233, 404489, 404745, 405001, 405257, 405513, 405769,
96 406025, 406281, 406537, 406793, 407049, 407305, 407561, 407817, 408073,
97 408329, 408585, 408841, 409097, 409353, 409609, 409865, 410121, 410377,
98 410633, 410889, 411145, 411401, 411657, 411913, 412169, 412425, 412681,
99 412937, 413193, 413449, 413705, 413961, 414217, 414473, 414729, 414985,
100 ];
101
102 println!("📊 Subscribing to symbols across connections...");
103
104 // Subscribe to different symbol sets
105 manager
106 .subscribe_symbols(&symbols, Some(Mode::Full))
107 .await?;
108 // manager.subscribe_symbols(&bank_nifty, Some(Mode::Quote)).await?;
109 // manager.subscribe_symbols(&it_stocks, Some(Mode::LTP)).await?;
110
111 println!(
112 "✅ Subscribed to {} total symbols",
113 symbols.len() // + bank_nifty.len() + it_stocks.len()
114 );
115
116 // Get symbol distribution
117 let distribution = manager.get_symbol_distribution();
118 println!("\n📈 Symbol distribution across connections:");
119 for (channel_id, symbols) in &distribution {
120 println!(" {:?}: {} symbols", channel_id, symbols.len());
121 }
122
123 // Get all output channels
124 let channels = manager.get_all_channels();
125 println!("\n🔀 Created {} output channels", channels.len());
126
127 // Start monitoring each channel
128 let mut channel_tasks = Vec::new();
129
130 for (channel_id, mut receiver) in channels {
131 let task = tokio::spawn(async move {
132 let mut message_count = 0;
133 let mut tick_count = 0;
134 let start_time = Instant::now();
135 let mut last_report = Instant::now();
136
137 println!("🎯 Starting monitoring for {:?}", channel_id);
138
139 loop {
140 match timeout(Duration::from_secs(30), receiver.recv()).await {
141 Ok(Ok(message)) => {
142 message_count += 1;
143
144 match message {
145 TickerMessage::Ticks(ticks) => {
146 tick_count += ticks.len();
147
148 // Show first few ticks for demonstration
149 if message_count <= 3 {
150 for tick in &ticks {
151 println!(
152 "📋 {:?}: Tick {} @ {:?}",
153 channel_id,
154 tick.instrument_token,
155 tick.content.last_price.unwrap_or(0.0)
156 );
157 }
158 }
159 }
160 TickerMessage::Error(e) => {
161 println!("⚠️ {:?}: Error: {}", channel_id, e);
162 }
163 _ => {
164 println!("📨 {:?}: Other message", channel_id);
165 }
166 }
167
168 // Report performance every 10 seconds
169 if last_report.elapsed() >= Duration::from_secs(10) {
170 let elapsed = start_time.elapsed();
171 let messages_per_sec =
172 message_count as f64 / elapsed.as_secs_f64();
173 let ticks_per_sec = tick_count as f64 / elapsed.as_secs_f64();
174
175 println!("📊 {:?} Performance:", channel_id);
176 println!(
177 " Messages: {} ({:.1}/sec)",
178 message_count, messages_per_sec
179 );
180 println!(" Ticks: {} ({:.1}/sec)", tick_count, ticks_per_sec);
181
182 last_report = Instant::now();
183 }
184 }
185 Ok(Err(e)) => {
186 println!("❌ {:?}: Channel error: {}", channel_id, e);
187 break;
188 }
189 Err(_) => {
190 println!("⏱️ {:?}: No messages for 30s", channel_id);
191 }
192 }
193 }
194
195 (channel_id, message_count, tick_count)
196 });
197
198 channel_tasks.push(task);
199 }
200
201 // Monitor overall system health
202 let health_task = tokio::spawn(async move {
203 loop {
204 sleep(Duration::from_secs(15)).await;
205
206 println!("\n🏥 System Health Check:");
207 println!(" All connections active ✅");
208 println!(" Parsers running ✅");
209 println!(" Memory usage optimized ✅");
210 }
211 });
212
213 // Run for demonstration period
214 println!(
215 "\n📈 Monitoring performance for 60 seconds (Ctrl+C to stop early)..."
216 );
217
218 let demo_duration = Duration::from_secs(60);
219 let demo_start = Instant::now();
220
221 // Wait for demo duration or Ctrl+C
222 tokio::select! {
223 _ = sleep(demo_duration) => {
224 println!("\n⏰ Demo duration completed");
225 }
226 _ = tokio::signal::ctrl_c() => {
227 println!("\n🛑 Received Ctrl+C, stopping...");
228 }
229 }
230
231 // Abort monitoring tasks
232 health_task.abort();
233 for task in channel_tasks {
234 task.abort();
235 }
236
237 // Get final statistics
238 println!("\n📊 Final Statistics:");
239
240 if let Ok(stats) = manager.get_stats().await {
241 println!(" Total runtime: {:?}", demo_start.elapsed());
242 println!(" Active connections: {}", stats.active_connections);
243 println!(" Total symbols: {}", stats.total_symbols);
244 println!(" Total messages: {}", stats.total_messages_received);
245 println!(" Total errors: {}", stats.total_errors);
246
247 for (i, conn_stats) in stats.connection_stats.iter().enumerate() {
248 println!(
249 " Connection {}: {} symbols, {} messages, {} errors",
250 i,
251 conn_stats.symbol_count,
252 conn_stats.messages_received,
253 conn_stats.errors_count
254 );
255 }
256 }
257
258 let processor_stats = manager.get_processor_stats().await;
259 println!("\n🔧 Parser Performance:");
260 for (channel_id, stats) in processor_stats {
261 println!(
262 " {:?}: {:.1} msg/sec, {:?} avg latency",
263 channel_id, stats.messages_per_second, stats.processing_latency_avg
264 );
265 }
266
267 // Stop the manager
268 println!("\n🛑 Stopping manager...");
269 manager.stop().await?;
270
271 println!("🏁 Demo completed successfully!");
272 Ok(())
273}250async fn demo_dynamic_subscription(
251 manager: &mut KiteTickerManager,
252) -> Result<(), String> {
253 println!("\n🎯 True Dynamic Subscription Demo");
254 println!("==================================");
255
256 // Start with a small initial set
257 let initial_symbols = vec![256265, 265, 256777]; // 3 symbols
258 let additional_batch_1 = vec![274441, 260105, 273929]; // 3 more symbols
259 let additional_batch_2 = vec![260617, 257033, 257289, 257545]; // 4 more symbols
260 let symbols_to_remove = vec![265, 274441]; // Remove some symbols
261 let final_batch = vec![257801, 258825]; // Add final symbols
262
263 // Step 1: Initial subscription with small set
264 println!(
265 "\n📊 Step 1: Initial subscription to {} symbols",
266 initial_symbols.len()
267 );
268 println!("Starting with: {:?}", initial_symbols);
269
270 // Start listening for ticks BEFORE subscribing
271 let channels_before_sub = manager.get_all_channels();
272 let mut tick_listeners = Vec::new();
273
274 for (channel_id, mut receiver) in channels_before_sub {
275 let task = tokio::spawn(async move {
276 let start = std::time::Instant::now();
277 let mut tick_count = 0;
278
279 // Listen for initial ticks for 5 seconds
280 while start.elapsed() < Duration::from_secs(5) {
281 match timeout(Duration::from_millis(500), receiver.recv()).await {
282 Ok(Ok(message)) => {
283 if let TickerMessage::Ticks(ticks) = message {
284 tick_count += ticks.len();
285 println!(
286 "🎯 {:?}: Received {} ticks immediately after subscription!",
287 channel_id,
288 ticks.len()
289 );
290
291 // Debug: Show detailed tick information
292 debug!("Processing {} ticks on {:?}", ticks.len(), channel_id);
293
294 for (idx, tick) in ticks.iter().enumerate() {
295 println!(" 🔹 Tick {}: Symbol: {}, LTP: {:?}, Volume: {:?}, Change: {:?}",
296 idx + 1,
297 tick.instrument_token,
298 tick.content.last_price,
299 tick.content.volume_traded,
300 tick.content.net_change
301 );
302
303 // Debug: Show tick structure details
304 debug!(
305 "Tick {} Details for symbol {}:",
306 idx + 1,
307 tick.instrument_token
308 );
309 debug!(" - Raw instrument_token: {}", tick.instrument_token);
310 debug!(" - Raw last_price: {:?}", tick.content.last_price);
311 debug!(
312 " - Raw volume_traded: {:?}",
313 tick.content.volume_traded
314 );
315 debug!(" - Raw change: {:?}", tick.content.net_change);
316 debug!(" - Raw last_quantity: <not available>");
317 debug!(
318 " - Raw average_price: {:?}",
319 tick.content.avg_traded_price
320 );
321 debug!(
322 " - Raw buy_quantity: {:?}",
323 tick.content.total_buy_qty
324 );
325 debug!(
326 " - Raw sell_quantity: {:?}",
327 tick.content.total_sell_qty
328 );
329 debug!(" - Raw oi: {:?}", tick.content.oi);
330 debug!(" - Raw mode: {:?}", tick.content.mode);
331
332 if let Some(ohlc) = &tick.content.ohlc {
333 debug!(
334 " - OHLC available: O:{} H:{} L:{} C:{}",
335 ohlc.open, ohlc.high, ohlc.low, ohlc.close
336 );
337 }
338
339 if let Some(depth) = &tick.content.depth {
340 debug!(
341 " - Market depth available: {} buy, {} sell",
342 depth.buy.len(),
343 depth.sell.len()
344 );
345 }
346 }
347 }
348 }
349 _ => continue,
350 }
351 }
352 (channel_id, tick_count)
353 });
354 tick_listeners.push(task);
355 }
356
357 // Now subscribe to symbols
358 manager
359 .subscribe_symbols(&initial_symbols, Some(Mode::LTP))
360 .await?;
361
362 println!("✅ Subscription sent, waiting for initial ticks...");
363
364 // Wait for the listeners to finish
365 for task in tick_listeners {
366 if let Ok((channel_id, count)) = task.await {
367 println!(
368 "📊 {:?}: Received {} total ticks during initial subscription",
369 channel_id, count
370 );
371 }
372 }
373
374 print_distribution(manager, "After initial subscription").await;
375
376 // Step 2: Wait and monitor initial data
377 println!("\n⏳ Step 2: Monitoring initial data flow");
378 monitor_ticks_briefly(manager, 5, "Initial Subscription Data").await;
379
380 if let Ok(stats) = manager.get_stats().await {
381 println!("✅ Current Statistics:");
382 println!(" Active connections: {}", stats.active_connections);
383 println!(" Total symbols: {}", stats.total_symbols);
384 println!(" Total messages: {}", stats.total_messages_received);
385 }
386
387 // Step 3: Dynamic addition - Batch 1
388 println!(
389 "\n➕ Step 3: DYNAMIC ADDITION - Adding {} more symbols",
390 additional_batch_1.len()
391 );
392 println!("Adding: {:?}", additional_batch_1);
393 manager
394 .subscribe_symbols(&additional_batch_1, Some(Mode::Quote))
395 .await?;
396
397 // Give time for new tick data to arrive
398 sleep(Duration::from_secs(2)).await;
399
400 print_distribution(manager, "After first dynamic addition").await;
401 monitor_ticks_briefly(manager, 3, "After First Addition").await;
402
403 // Step 4: Dynamic addition - Batch 2
404 println!(
405 "\n➕ Step 4: DYNAMIC ADDITION - Adding {} more symbols",
406 additional_batch_2.len()
407 );
408 println!("Adding: {:?}", additional_batch_2);
409 manager
410 .subscribe_symbols(&additional_batch_2, Some(Mode::Full))
411 .await?;
412
413 // Give time for new tick data to arrive
414 sleep(Duration::from_secs(2)).await;
415
416 print_distribution(manager, "After second dynamic addition").await;
417 monitor_ticks_briefly(manager, 3, "After Second Addition").await;
418
419 // Step 5: Dynamic removal
420 println!(
421 "\n➖ Step 5: DYNAMIC REMOVAL - Removing {} symbols",
422 symbols_to_remove.len()
423 );
424 println!("Removing: {:?}", symbols_to_remove);
425 match manager.unsubscribe_symbols(&symbols_to_remove).await {
426 Ok(()) => {
427 print_distribution(manager, "After dynamic removal").await;
428 println!("✅ Dynamic removal successful!");
429 }
430 Err(e) => {
431 println!("⚠️ Dynamic removal failed: {}", e);
432 }
433 }
434 monitor_ticks_briefly(manager, 3, "After Symbol Removal").await;
435
436 // Step 6: Final addition and mode change demo
437 println!(
438 "\n➕ Step 6: FINAL ADDITION - Adding {} symbols",
439 final_batch.len()
440 );
441 println!("Adding: {:?}", final_batch);
442 manager
443 .subscribe_symbols(&final_batch, Some(Mode::LTP))
444 .await?;
445
446 print_distribution(manager, "After final addition").await;
447
448 // Step 7: Mode change demonstration
449 println!("\n🔄 Step 7: MODE CHANGE - Changing subscription mode");
450 let symbols_for_mode_change = vec![256265, 260105]; // Change some existing symbols to Full mode
451 println!("Changing {:?} to Full mode", symbols_for_mode_change);
452 match manager
453 .change_mode(&symbols_for_mode_change, Mode::Full)
454 .await
455 {
456 Ok(()) => println!("✅ Mode change successful!"),
457 Err(e) => println!("⚠️ Mode change failed: {}", e),
458 }
459
460 // Step 8: Final statistics and monitoring
461 println!("\n📈 Step 8: Final Statistics and Performance");
462 sleep(Duration::from_secs(3)).await; // Let some data flow
463
464 if let Ok(stats) = manager.get_stats().await {
465 println!("✅ Final Manager Statistics:");
466 println!(" Active connections: {}", stats.active_connections);
467 println!(" Total symbols: {}", stats.total_symbols);
468 println!(" Total messages: {}", stats.total_messages_received);
469 println!(" Total errors: {}", stats.total_errors);
470
471 for (i, conn_stats) in stats.connection_stats.iter().enumerate() {
472 println!(
473 " Connection {}: {} symbols, {} messages",
474 i + 1,
475 conn_stats.symbol_count,
476 conn_stats.messages_received
477 );
478 }
479 }
480
481 // Step 9: Show processor performance
482 println!("\n⚡ Step 9: Final Parser Performance");
483 let processor_stats = manager.get_processor_stats().await;
484 for (channel_id, stats) in processor_stats {
485 println!(
486 " {:?}: {:.1} msg/sec, {:?} avg latency",
487 channel_id, stats.messages_per_second, stats.processing_latency_avg
488 );
489 }
490
491 // Step 10: Monitor live data for a short period
492 println!("\n📺 Step 10: Live Data Monitoring (10 seconds)");
493 println!("Monitoring real-time tick data from all dynamically managed connections...");
494
495 let channels = manager.get_all_channels();
496 let mut tasks = Vec::new();
497
498 for (channel_id, mut receiver) in channels {
499 let task = tokio::spawn(async move {
500 let mut count = 0;
501 let start = std::time::Instant::now();
502
503 while start.elapsed() < Duration::from_secs(10) {
504 match timeout(Duration::from_secs(2), receiver.recv()).await {
505 Ok(Ok(message)) => {
506 count += 1;
507 if let TickerMessage::Ticks(ticks) = message {
508 println!("📋 {:?}: {} ticks received", channel_id, ticks.len());
509
510 // Debug: Final monitoring with comprehensive details
511 debug!(
512 "Final monitoring - Message #{}, {} ticks on {:?}",
513 count,
514 ticks.len(),
515 channel_id
516 );
517
518 for tick in &ticks {
519 println!(
520 " 🔹 Symbol: {}, LTP: {:?}, Volume: {:?}, Change: {:?}",
521 tick.instrument_token,
522 tick.content.last_price,
523 tick.content.volume_traded,
524 tick.content.net_change
525 );
526
527 // Debug: Show complete tick analysis
528 debug!(
529 "Final Debug Analysis for Symbol {}:",
530 tick.instrument_token
531 );
532 debug!(
533 " - Timestamp: Now = {:?}",
534 std::time::SystemTime::now()
535 );
536 debug!(" - Data completeness check:");
537 debug!(" * Last Price: {:?} (✅)", tick.content.last_price);
538 debug!(
539 " * Volume: {:?} ({})",
540 tick.content.volume_traded,
541 if tick.content.volume_traded.is_some() {
542 "✅"
543 } else {
544 "❌"
545 }
546 );
547 debug!(
548 " * Change: {:?} ({})",
549 tick.content.net_change,
550 if tick.content.net_change.is_some() {
551 "✅"
552 } else {
553 "❌"
554 }
555 );
556 debug!(" * Mode: {:?} (✅)", tick.content.mode);
557
558 // Validate mode-specific data
559 match tick.content.mode {
560 Mode::Full => {
561 debug!(" - Full mode validation:");
562 debug!(
563 " * OHLC: {}",
564 if tick.content.ohlc.is_some() {
565 "✅ Present"
566 } else {
567 "❌ Missing"
568 }
569 );
570 debug!(
571 " * Depth: {}",
572 if tick.content.depth.is_some() {
573 "✅ Present"
574 } else {
575 "❌ Missing"
576 }
577 );
578 if let Some(ohlc) = &tick.content.ohlc {
579 debug!(
580 " * OHLC Values: O:{} H:{} L:{} C:{}",
581 ohlc.open, ohlc.high, ohlc.low, ohlc.close
582 );
583 }
584 if let Some(depth) = &tick.content.depth {
585 debug!(
586 " * Depth Levels: {} buy, {} sell",
587 depth.buy.len(),
588 depth.sell.len()
589 );
590 }
591 }
592 Mode::Quote => {
593 debug!(" - Quote mode validation:");
594 debug!(
595 " * OHLC: {}",
596 if tick.content.ohlc.is_some() {
597 "✅ Present"
598 } else {
599 "❌ Missing"
600 }
601 );
602 if let Some(ohlc) = &tick.content.ohlc {
603 debug!(
604 " * OHLC Values: O:{} H:{} L:{} C:{}",
605 ohlc.open, ohlc.high, ohlc.low, ohlc.close
606 );
607 }
608 }
609 Mode::LTP => {
610 debug!(" - LTP mode validation: ✅ Basic data only");
611 }
612 }
613 }
614 } else {
615 println!("📋 {:?}: Non-tick message received", channel_id);
616 debug!("Message type: {:?}", message);
617 }
618 }
619 _ => continue,
620 }
621 }
622 (channel_id, count)
623 });
624 tasks.push(task);
625 }
626
627 // Wait for monitoring to complete
628 for task in tasks {
629 if let Ok((channel_id, count)) = task.await {
630 println!(
631 "📊 {:?}: {} total messages in 10 seconds",
632 channel_id, count
633 );
634 }
635 }
636
637 // Final cleanup demonstration
638 println!("\n🧹 Step 11: Final Cleanup Demonstration");
639 let current_symbols: Vec<u32> = manager
640 .get_symbol_distribution()
641 .values()
642 .flat_map(|symbols| symbols.iter().cloned())
643 .collect();
644
645 println!(
646 "Attempting to unsubscribe from {} remaining symbols...",
647 current_symbols.len()
648 );
649
650 match manager.unsubscribe_symbols(¤t_symbols).await {
651 Ok(()) => {
652 print_distribution(manager, "After complete cleanup").await;
653 println!("✅ Complete cleanup successful!");
654 }
655 Err(e) => {
656 println!("⚠️ Cleanup encountered issues: {}", e);
657 println!("💡 This demonstrates the current architecture capabilities");
658 }
659 }
660
661 println!("\n✅ Dynamic Subscription Demo Completed!");
662 println!("🎯 Key Dynamic Features Demonstrated:");
663 println!(" ✅ Runtime symbol addition across multiple batches");
664 println!(" ✅ Runtime symbol removal with proper tracking");
665 println!(" ✅ Dynamic mode changes for existing symbols");
666 println!(" ✅ Real-time capacity distribution and monitoring");
667 println!(" ✅ Independent message processing per connection");
668 println!(" ✅ High-performance parser tasks with statistics");
669 println!(" ✅ Complete subscription lifecycle management");
670
671 Ok(())
672}Sourcepub async fn get_health(&self) -> Result<HealthSummary, String>
pub async fn get_health(&self) -> Result<HealthSummary, String>
Get health summary
Sourcepub async fn get_processor_stats(&self) -> Vec<(ChannelId, ProcessorStats)>
pub async fn get_processor_stats(&self) -> Vec<(ChannelId, ProcessorStats)>
Get processor statistics for all channels
Examples found in repository?
8pub async fn main() -> Result<(), String> {
9 // Initialize logging
10 env_logger::init();
11
12 println!("🚀 KiteTicker Multi-Connection Manager Demo");
13 println!("═══════════════════════════════════════════");
14
15 let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
16 let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
17
18 if api_key.is_empty() || access_token.is_empty() {
19 println!(
20 "⚠️ KITE_API_KEY and KITE_ACCESS_TOKEN environment variables not set"
21 );
22 println!(" This demo will show the manager architecture without live connections");
23 demonstrate_offline_architecture().await;
24 return Ok(());
25 }
26
27 // Create high-performance configuration - RESTORED TO 3 CONNECTIONS
28 let config = KiteManagerConfig {
29 max_symbols_per_connection: 3000,
30 max_connections: 3, // BACK TO 3 CONNECTIONS!
31 connection_buffer_size: 10000, // High buffer for performance
32 parser_buffer_size: 20000, // Even higher for parsed messages
33 connection_timeout: Duration::from_secs(30),
34 health_check_interval: Duration::from_secs(5),
35 max_reconnect_attempts: 5,
36 reconnect_delay: Duration::from_secs(2),
37 enable_dedicated_parsers: true, // Use dedicated parser tasks
38 default_mode: Mode::Full, // Full mode for maximum data
39 };
40
41 println!("🔧 Configuration:");
42 println!(" Max connections: {}", config.max_connections);
43 println!(
44 " Max symbols per connection: {}",
45 config.max_symbols_per_connection
46 );
47 println!(
48 " Connection buffer size: {}",
49 config.connection_buffer_size
50 );
51 println!(" Parser buffer size: {}", config.parser_buffer_size);
52 println!(" Dedicated parsers: {}", config.enable_dedicated_parsers);
53 println!();
54
55 // Create and start the manager
56 println!("📡 Starting multi-connection manager...");
57 let start_time = Instant::now();
58
59 let mut manager = KiteTickerManager::new(api_key, access_token, config);
60
61 match timeout(Duration::from_secs(30), manager.start()).await {
62 Ok(Ok(())) => {
63 println!("✅ Manager started in {:?}", start_time.elapsed());
64 }
65 Ok(Err(e)) => {
66 println!("❌ Manager failed to start: {}", e);
67 return Err(e);
68 }
69 Err(_) => {
70 println!("⏱️ Manager startup timeout");
71 return Err("Manager startup timeout".to_string());
72 }
73 }
74
75 // Test with market symbols for proper distribution
76 let symbols = vec![
77 256265, 265, 256777, 274441, 260105, 273929, 260617, 257033, 257289,
78 257545, 257801, 258825, 259081, 259337, 259593, 259849, 260873, 261129,
79 261385, 261641, 261897, 262153, 262409, 262665, 262921, 263177, 263433,
80 263689, 263945, 264457, 264713, 264969, 265225, 265737, 265993, 266249,
81 266505, 266761, 267017, 267273, 267529, 267785, 268041, 268297, 268553,
82 268809, 269065, 269321, 269577, 269833, 270089, 270345, 270601, 270857,
83 271113, 271625, 271881, 272137, 272393, 273417, 273673, 274185, 274697,
84 274953, 275209, 275465, 275721, 275977, 276233, 276489, 276745, 277001,
85 277257, 277513, 277769, 278025, 278281, 278537, 278793, 279049, 279305,
86 279561, 279817, 280073, 280329, 280585, 280841, 281097, 281353, 281865,
87 282121, 282377, 282633, 282889, 283145, 283401, 283657, 283913, 284169,
88 284425, 284681, 284937, 285193, 285449, 285961, 286217, 286473, 286729,
89 286985, 287241, 287497, 287753, 288009, 288265, 288521, 288777, 289033,
90 289289, 289545, 289801, 290057, 290313, 290569, 290825, 291081, 291337,
91 291593, 291849, 292105, 292361, 292617, 292873, 293129, 293385, 293641,
92 293897, 294153, 294409, 294665, 294921, 295177, 295433, 295689, 398345,
93 398601, 398857, 399113, 399369, 399625, 399881, 400137, 400393, 400905,
94 401161, 401673, 401929, 402185, 402441, 402697, 402953, 403209, 403465,
95 403721, 403977, 404233, 404489, 404745, 405001, 405257, 405513, 405769,
96 406025, 406281, 406537, 406793, 407049, 407305, 407561, 407817, 408073,
97 408329, 408585, 408841, 409097, 409353, 409609, 409865, 410121, 410377,
98 410633, 410889, 411145, 411401, 411657, 411913, 412169, 412425, 412681,
99 412937, 413193, 413449, 413705, 413961, 414217, 414473, 414729, 414985,
100 ];
101
102 println!("📊 Subscribing to symbols across connections...");
103
104 // Subscribe to different symbol sets
105 manager
106 .subscribe_symbols(&symbols, Some(Mode::Full))
107 .await?;
108 // manager.subscribe_symbols(&bank_nifty, Some(Mode::Quote)).await?;
109 // manager.subscribe_symbols(&it_stocks, Some(Mode::LTP)).await?;
110
111 println!(
112 "✅ Subscribed to {} total symbols",
113 symbols.len() // + bank_nifty.len() + it_stocks.len()
114 );
115
116 // Get symbol distribution
117 let distribution = manager.get_symbol_distribution();
118 println!("\n📈 Symbol distribution across connections:");
119 for (channel_id, symbols) in &distribution {
120 println!(" {:?}: {} symbols", channel_id, symbols.len());
121 }
122
123 // Get all output channels
124 let channels = manager.get_all_channels();
125 println!("\n🔀 Created {} output channels", channels.len());
126
127 // Start monitoring each channel
128 let mut channel_tasks = Vec::new();
129
130 for (channel_id, mut receiver) in channels {
131 let task = tokio::spawn(async move {
132 let mut message_count = 0;
133 let mut tick_count = 0;
134 let start_time = Instant::now();
135 let mut last_report = Instant::now();
136
137 println!("🎯 Starting monitoring for {:?}", channel_id);
138
139 loop {
140 match timeout(Duration::from_secs(30), receiver.recv()).await {
141 Ok(Ok(message)) => {
142 message_count += 1;
143
144 match message {
145 TickerMessage::Ticks(ticks) => {
146 tick_count += ticks.len();
147
148 // Show first few ticks for demonstration
149 if message_count <= 3 {
150 for tick in &ticks {
151 println!(
152 "📋 {:?}: Tick {} @ {:?}",
153 channel_id,
154 tick.instrument_token,
155 tick.content.last_price.unwrap_or(0.0)
156 );
157 }
158 }
159 }
160 TickerMessage::Error(e) => {
161 println!("⚠️ {:?}: Error: {}", channel_id, e);
162 }
163 _ => {
164 println!("📨 {:?}: Other message", channel_id);
165 }
166 }
167
168 // Report performance every 10 seconds
169 if last_report.elapsed() >= Duration::from_secs(10) {
170 let elapsed = start_time.elapsed();
171 let messages_per_sec =
172 message_count as f64 / elapsed.as_secs_f64();
173 let ticks_per_sec = tick_count as f64 / elapsed.as_secs_f64();
174
175 println!("📊 {:?} Performance:", channel_id);
176 println!(
177 " Messages: {} ({:.1}/sec)",
178 message_count, messages_per_sec
179 );
180 println!(" Ticks: {} ({:.1}/sec)", tick_count, ticks_per_sec);
181
182 last_report = Instant::now();
183 }
184 }
185 Ok(Err(e)) => {
186 println!("❌ {:?}: Channel error: {}", channel_id, e);
187 break;
188 }
189 Err(_) => {
190 println!("⏱️ {:?}: No messages for 30s", channel_id);
191 }
192 }
193 }
194
195 (channel_id, message_count, tick_count)
196 });
197
198 channel_tasks.push(task);
199 }
200
201 // Monitor overall system health
202 let health_task = tokio::spawn(async move {
203 loop {
204 sleep(Duration::from_secs(15)).await;
205
206 println!("\n🏥 System Health Check:");
207 println!(" All connections active ✅");
208 println!(" Parsers running ✅");
209 println!(" Memory usage optimized ✅");
210 }
211 });
212
213 // Run for demonstration period
214 println!(
215 "\n📈 Monitoring performance for 60 seconds (Ctrl+C to stop early)..."
216 );
217
218 let demo_duration = Duration::from_secs(60);
219 let demo_start = Instant::now();
220
221 // Wait for demo duration or Ctrl+C
222 tokio::select! {
223 _ = sleep(demo_duration) => {
224 println!("\n⏰ Demo duration completed");
225 }
226 _ = tokio::signal::ctrl_c() => {
227 println!("\n🛑 Received Ctrl+C, stopping...");
228 }
229 }
230
231 // Abort monitoring tasks
232 health_task.abort();
233 for task in channel_tasks {
234 task.abort();
235 }
236
237 // Get final statistics
238 println!("\n📊 Final Statistics:");
239
240 if let Ok(stats) = manager.get_stats().await {
241 println!(" Total runtime: {:?}", demo_start.elapsed());
242 println!(" Active connections: {}", stats.active_connections);
243 println!(" Total symbols: {}", stats.total_symbols);
244 println!(" Total messages: {}", stats.total_messages_received);
245 println!(" Total errors: {}", stats.total_errors);
246
247 for (i, conn_stats) in stats.connection_stats.iter().enumerate() {
248 println!(
249 " Connection {}: {} symbols, {} messages, {} errors",
250 i,
251 conn_stats.symbol_count,
252 conn_stats.messages_received,
253 conn_stats.errors_count
254 );
255 }
256 }
257
258 let processor_stats = manager.get_processor_stats().await;
259 println!("\n🔧 Parser Performance:");
260 for (channel_id, stats) in processor_stats {
261 println!(
262 " {:?}: {:.1} msg/sec, {:?} avg latency",
263 channel_id, stats.messages_per_second, stats.processing_latency_avg
264 );
265 }
266
267 // Stop the manager
268 println!("\n🛑 Stopping manager...");
269 manager.stop().await?;
270
271 println!("🏁 Demo completed successfully!");
272 Ok(())
273}More examples
250async fn demo_dynamic_subscription(
251 manager: &mut KiteTickerManager,
252) -> Result<(), String> {
253 println!("\n🎯 True Dynamic Subscription Demo");
254 println!("==================================");
255
256 // Start with a small initial set
257 let initial_symbols = vec![256265, 265, 256777]; // 3 symbols
258 let additional_batch_1 = vec![274441, 260105, 273929]; // 3 more symbols
259 let additional_batch_2 = vec![260617, 257033, 257289, 257545]; // 4 more symbols
260 let symbols_to_remove = vec![265, 274441]; // Remove some symbols
261 let final_batch = vec![257801, 258825]; // Add final symbols
262
263 // Step 1: Initial subscription with small set
264 println!(
265 "\n📊 Step 1: Initial subscription to {} symbols",
266 initial_symbols.len()
267 );
268 println!("Starting with: {:?}", initial_symbols);
269
270 // Start listening for ticks BEFORE subscribing
271 let channels_before_sub = manager.get_all_channels();
272 let mut tick_listeners = Vec::new();
273
274 for (channel_id, mut receiver) in channels_before_sub {
275 let task = tokio::spawn(async move {
276 let start = std::time::Instant::now();
277 let mut tick_count = 0;
278
279 // Listen for initial ticks for 5 seconds
280 while start.elapsed() < Duration::from_secs(5) {
281 match timeout(Duration::from_millis(500), receiver.recv()).await {
282 Ok(Ok(message)) => {
283 if let TickerMessage::Ticks(ticks) = message {
284 tick_count += ticks.len();
285 println!(
286 "🎯 {:?}: Received {} ticks immediately after subscription!",
287 channel_id,
288 ticks.len()
289 );
290
291 // Debug: Show detailed tick information
292 debug!("Processing {} ticks on {:?}", ticks.len(), channel_id);
293
294 for (idx, tick) in ticks.iter().enumerate() {
295 println!(" 🔹 Tick {}: Symbol: {}, LTP: {:?}, Volume: {:?}, Change: {:?}",
296 idx + 1,
297 tick.instrument_token,
298 tick.content.last_price,
299 tick.content.volume_traded,
300 tick.content.net_change
301 );
302
303 // Debug: Show tick structure details
304 debug!(
305 "Tick {} Details for symbol {}:",
306 idx + 1,
307 tick.instrument_token
308 );
309 debug!(" - Raw instrument_token: {}", tick.instrument_token);
310 debug!(" - Raw last_price: {:?}", tick.content.last_price);
311 debug!(
312 " - Raw volume_traded: {:?}",
313 tick.content.volume_traded
314 );
315 debug!(" - Raw change: {:?}", tick.content.net_change);
316 debug!(" - Raw last_quantity: <not available>");
317 debug!(
318 " - Raw average_price: {:?}",
319 tick.content.avg_traded_price
320 );
321 debug!(
322 " - Raw buy_quantity: {:?}",
323 tick.content.total_buy_qty
324 );
325 debug!(
326 " - Raw sell_quantity: {:?}",
327 tick.content.total_sell_qty
328 );
329 debug!(" - Raw oi: {:?}", tick.content.oi);
330 debug!(" - Raw mode: {:?}", tick.content.mode);
331
332 if let Some(ohlc) = &tick.content.ohlc {
333 debug!(
334 " - OHLC available: O:{} H:{} L:{} C:{}",
335 ohlc.open, ohlc.high, ohlc.low, ohlc.close
336 );
337 }
338
339 if let Some(depth) = &tick.content.depth {
340 debug!(
341 " - Market depth available: {} buy, {} sell",
342 depth.buy.len(),
343 depth.sell.len()
344 );
345 }
346 }
347 }
348 }
349 _ => continue,
350 }
351 }
352 (channel_id, tick_count)
353 });
354 tick_listeners.push(task);
355 }
356
357 // Now subscribe to symbols
358 manager
359 .subscribe_symbols(&initial_symbols, Some(Mode::LTP))
360 .await?;
361
362 println!("✅ Subscription sent, waiting for initial ticks...");
363
364 // Wait for the listeners to finish
365 for task in tick_listeners {
366 if let Ok((channel_id, count)) = task.await {
367 println!(
368 "📊 {:?}: Received {} total ticks during initial subscription",
369 channel_id, count
370 );
371 }
372 }
373
374 print_distribution(manager, "After initial subscription").await;
375
376 // Step 2: Wait and monitor initial data
377 println!("\n⏳ Step 2: Monitoring initial data flow");
378 monitor_ticks_briefly(manager, 5, "Initial Subscription Data").await;
379
380 if let Ok(stats) = manager.get_stats().await {
381 println!("✅ Current Statistics:");
382 println!(" Active connections: {}", stats.active_connections);
383 println!(" Total symbols: {}", stats.total_symbols);
384 println!(" Total messages: {}", stats.total_messages_received);
385 }
386
387 // Step 3: Dynamic addition - Batch 1
388 println!(
389 "\n➕ Step 3: DYNAMIC ADDITION - Adding {} more symbols",
390 additional_batch_1.len()
391 );
392 println!("Adding: {:?}", additional_batch_1);
393 manager
394 .subscribe_symbols(&additional_batch_1, Some(Mode::Quote))
395 .await?;
396
397 // Give time for new tick data to arrive
398 sleep(Duration::from_secs(2)).await;
399
400 print_distribution(manager, "After first dynamic addition").await;
401 monitor_ticks_briefly(manager, 3, "After First Addition").await;
402
403 // Step 4: Dynamic addition - Batch 2
404 println!(
405 "\n➕ Step 4: DYNAMIC ADDITION - Adding {} more symbols",
406 additional_batch_2.len()
407 );
408 println!("Adding: {:?}", additional_batch_2);
409 manager
410 .subscribe_symbols(&additional_batch_2, Some(Mode::Full))
411 .await?;
412
413 // Give time for new tick data to arrive
414 sleep(Duration::from_secs(2)).await;
415
416 print_distribution(manager, "After second dynamic addition").await;
417 monitor_ticks_briefly(manager, 3, "After Second Addition").await;
418
419 // Step 5: Dynamic removal
420 println!(
421 "\n➖ Step 5: DYNAMIC REMOVAL - Removing {} symbols",
422 symbols_to_remove.len()
423 );
424 println!("Removing: {:?}", symbols_to_remove);
425 match manager.unsubscribe_symbols(&symbols_to_remove).await {
426 Ok(()) => {
427 print_distribution(manager, "After dynamic removal").await;
428 println!("✅ Dynamic removal successful!");
429 }
430 Err(e) => {
431 println!("⚠️ Dynamic removal failed: {}", e);
432 }
433 }
434 monitor_ticks_briefly(manager, 3, "After Symbol Removal").await;
435
436 // Step 6: Final addition and mode change demo
437 println!(
438 "\n➕ Step 6: FINAL ADDITION - Adding {} symbols",
439 final_batch.len()
440 );
441 println!("Adding: {:?}", final_batch);
442 manager
443 .subscribe_symbols(&final_batch, Some(Mode::LTP))
444 .await?;
445
446 print_distribution(manager, "After final addition").await;
447
448 // Step 7: Mode change demonstration
449 println!("\n🔄 Step 7: MODE CHANGE - Changing subscription mode");
450 let symbols_for_mode_change = vec![256265, 260105]; // Change some existing symbols to Full mode
451 println!("Changing {:?} to Full mode", symbols_for_mode_change);
452 match manager
453 .change_mode(&symbols_for_mode_change, Mode::Full)
454 .await
455 {
456 Ok(()) => println!("✅ Mode change successful!"),
457 Err(e) => println!("⚠️ Mode change failed: {}", e),
458 }
459
460 // Step 8: Final statistics and monitoring
461 println!("\n📈 Step 8: Final Statistics and Performance");
462 sleep(Duration::from_secs(3)).await; // Let some data flow
463
464 if let Ok(stats) = manager.get_stats().await {
465 println!("✅ Final Manager Statistics:");
466 println!(" Active connections: {}", stats.active_connections);
467 println!(" Total symbols: {}", stats.total_symbols);
468 println!(" Total messages: {}", stats.total_messages_received);
469 println!(" Total errors: {}", stats.total_errors);
470
471 for (i, conn_stats) in stats.connection_stats.iter().enumerate() {
472 println!(
473 " Connection {}: {} symbols, {} messages",
474 i + 1,
475 conn_stats.symbol_count,
476 conn_stats.messages_received
477 );
478 }
479 }
480
481 // Step 9: Show processor performance
482 println!("\n⚡ Step 9: Final Parser Performance");
483 let processor_stats = manager.get_processor_stats().await;
484 for (channel_id, stats) in processor_stats {
485 println!(
486 " {:?}: {:.1} msg/sec, {:?} avg latency",
487 channel_id, stats.messages_per_second, stats.processing_latency_avg
488 );
489 }
490
491 // Step 10: Monitor live data for a short period
492 println!("\n📺 Step 10: Live Data Monitoring (10 seconds)");
493 println!("Monitoring real-time tick data from all dynamically managed connections...");
494
495 let channels = manager.get_all_channels();
496 let mut tasks = Vec::new();
497
498 for (channel_id, mut receiver) in channels {
499 let task = tokio::spawn(async move {
500 let mut count = 0;
501 let start = std::time::Instant::now();
502
503 while start.elapsed() < Duration::from_secs(10) {
504 match timeout(Duration::from_secs(2), receiver.recv()).await {
505 Ok(Ok(message)) => {
506 count += 1;
507 if let TickerMessage::Ticks(ticks) = message {
508 println!("📋 {:?}: {} ticks received", channel_id, ticks.len());
509
510 // Debug: Final monitoring with comprehensive details
511 debug!(
512 "Final monitoring - Message #{}, {} ticks on {:?}",
513 count,
514 ticks.len(),
515 channel_id
516 );
517
518 for tick in &ticks {
519 println!(
520 " 🔹 Symbol: {}, LTP: {:?}, Volume: {:?}, Change: {:?}",
521 tick.instrument_token,
522 tick.content.last_price,
523 tick.content.volume_traded,
524 tick.content.net_change
525 );
526
527 // Debug: Show complete tick analysis
528 debug!(
529 "Final Debug Analysis for Symbol {}:",
530 tick.instrument_token
531 );
532 debug!(
533 " - Timestamp: Now = {:?}",
534 std::time::SystemTime::now()
535 );
536 debug!(" - Data completeness check:");
537 debug!(" * Last Price: {:?} (✅)", tick.content.last_price);
538 debug!(
539 " * Volume: {:?} ({})",
540 tick.content.volume_traded,
541 if tick.content.volume_traded.is_some() {
542 "✅"
543 } else {
544 "❌"
545 }
546 );
547 debug!(
548 " * Change: {:?} ({})",
549 tick.content.net_change,
550 if tick.content.net_change.is_some() {
551 "✅"
552 } else {
553 "❌"
554 }
555 );
556 debug!(" * Mode: {:?} (✅)", tick.content.mode);
557
558 // Validate mode-specific data
559 match tick.content.mode {
560 Mode::Full => {
561 debug!(" - Full mode validation:");
562 debug!(
563 " * OHLC: {}",
564 if tick.content.ohlc.is_some() {
565 "✅ Present"
566 } else {
567 "❌ Missing"
568 }
569 );
570 debug!(
571 " * Depth: {}",
572 if tick.content.depth.is_some() {
573 "✅ Present"
574 } else {
575 "❌ Missing"
576 }
577 );
578 if let Some(ohlc) = &tick.content.ohlc {
579 debug!(
580 " * OHLC Values: O:{} H:{} L:{} C:{}",
581 ohlc.open, ohlc.high, ohlc.low, ohlc.close
582 );
583 }
584 if let Some(depth) = &tick.content.depth {
585 debug!(
586 " * Depth Levels: {} buy, {} sell",
587 depth.buy.len(),
588 depth.sell.len()
589 );
590 }
591 }
592 Mode::Quote => {
593 debug!(" - Quote mode validation:");
594 debug!(
595 " * OHLC: {}",
596 if tick.content.ohlc.is_some() {
597 "✅ Present"
598 } else {
599 "❌ Missing"
600 }
601 );
602 if let Some(ohlc) = &tick.content.ohlc {
603 debug!(
604 " * OHLC Values: O:{} H:{} L:{} C:{}",
605 ohlc.open, ohlc.high, ohlc.low, ohlc.close
606 );
607 }
608 }
609 Mode::LTP => {
610 debug!(" - LTP mode validation: ✅ Basic data only");
611 }
612 }
613 }
614 } else {
615 println!("📋 {:?}: Non-tick message received", channel_id);
616 debug!("Message type: {:?}", message);
617 }
618 }
619 _ => continue,
620 }
621 }
622 (channel_id, count)
623 });
624 tasks.push(task);
625 }
626
627 // Wait for monitoring to complete
628 for task in tasks {
629 if let Ok((channel_id, count)) = task.await {
630 println!(
631 "📊 {:?}: {} total messages in 10 seconds",
632 channel_id, count
633 );
634 }
635 }
636
637 // Final cleanup demonstration
638 println!("\n🧹 Step 11: Final Cleanup Demonstration");
639 let current_symbols: Vec<u32> = manager
640 .get_symbol_distribution()
641 .values()
642 .flat_map(|symbols| symbols.iter().cloned())
643 .collect();
644
645 println!(
646 "Attempting to unsubscribe from {} remaining symbols...",
647 current_symbols.len()
648 );
649
650 match manager.unsubscribe_symbols(¤t_symbols).await {
651 Ok(()) => {
652 print_distribution(manager, "After complete cleanup").await;
653 println!("✅ Complete cleanup successful!");
654 }
655 Err(e) => {
656 println!("⚠️ Cleanup encountered issues: {}", e);
657 println!("💡 This demonstrates the current architecture capabilities");
658 }
659 }
660
661 println!("\n✅ Dynamic Subscription Demo Completed!");
662 println!("🎯 Key Dynamic Features Demonstrated:");
663 println!(" ✅ Runtime symbol addition across multiple batches");
664 println!(" ✅ Runtime symbol removal with proper tracking");
665 println!(" ✅ Dynamic mode changes for existing symbols");
666 println!(" ✅ Real-time capacity distribution and monitoring");
667 println!(" ✅ Independent message processing per connection");
668 println!(" ✅ High-performance parser tasks with statistics");
669 println!(" ✅ Complete subscription lifecycle management");
670
671 Ok(())
672}Sourcepub fn get_symbol_distribution(&self) -> HashMap<ChannelId, Vec<u32>>
pub fn get_symbol_distribution(&self) -> HashMap<ChannelId, Vec<u32>>
Get symbol distribution across connections
Examples found in repository?
48async fn runtime_subscription_demo(
49 manager: &mut KiteTickerManager,
50) -> Result<(), String> {
51 println!("\n📡 Runtime Subscription Management Demo");
52 println!("=====================================");
53
54 // 1. Start with initial symbols
55 println!("\n1️⃣ Initial subscription to base symbols");
56 let initial_symbols = vec![256265, 265, 256777];
57 manager
58 .subscribe_symbols(&initial_symbols, Some(Mode::LTP))
59 .await?;
60 print_current_state(manager, "Initial subscription").await;
61
62 sleep(Duration::from_secs(2)).await;
63
64 // 2. Add more symbols dynamically
65 println!("\n2️⃣ Adding symbols dynamically");
66 let additional_symbols = vec![274441, 260105, 273929];
67 manager
68 .subscribe_symbols(&additional_symbols, Some(Mode::Quote))
69 .await?;
70 print_current_state(manager, "After adding symbols").await;
71
72 sleep(Duration::from_secs(2)).await;
73
74 // 3. Change mode for existing symbols
75 println!("\n3️⃣ Changing subscription mode");
76 let symbols_for_mode_change = vec![256265, 265];
77 manager
78 .change_mode(&symbols_for_mode_change, Mode::Full)
79 .await?;
80 print_current_state(manager, "After mode change").await;
81
82 sleep(Duration::from_secs(2)).await;
83
84 // 4. Remove some symbols
85 println!("\n4️⃣ Removing symbols dynamically");
86 let symbols_to_remove = vec![265, 274441];
87 manager.unsubscribe_symbols(&symbols_to_remove).await?;
88 print_current_state(manager, "After removing symbols").await;
89
90 sleep(Duration::from_secs(2)).await;
91
92 // 5. Add different symbols with different modes
93 println!("\n5️⃣ Adding new symbols with Full mode");
94 let full_mode_symbols = vec![257801, 258825];
95 manager
96 .subscribe_symbols(&full_mode_symbols, Some(Mode::Full))
97 .await?;
98 print_current_state(manager, "After adding Full mode symbols").await;
99
100 // 6. Monitor live data for a short period
101 println!("\n6️⃣ Monitoring live data (5 seconds)");
102 monitor_live_data(manager, 5).await?;
103
104 // 7. Complete cleanup
105 println!("\n7️⃣ Complete cleanup");
106 let all_symbols: Vec<u32> = manager
107 .get_symbol_distribution()
108 .values()
109 .flat_map(|symbols| symbols.iter().cloned())
110 .collect();
111
112 if !all_symbols.is_empty() {
113 manager.unsubscribe_symbols(&all_symbols).await?;
114 print_current_state(manager, "After complete cleanup").await;
115 }
116
117 println!("\n✅ Runtime subscription demo completed!");
118 Ok(())
119}
120
121async fn print_current_state(manager: &KiteTickerManager, context: &str) {
122 println!("\n📊 Current State ({})", context);
123
124 // Show distribution
125 let distribution = manager.get_symbol_distribution();
126 let mut total = 0;
127 for (channel_id, symbols) in &distribution {
128 println!(" {:?}: {} symbols", channel_id, symbols.len());
129 total += symbols.len();
130 }
131 println!(" 📈 Total: {} symbols", total);
132
133 // Show stats
134 if let Ok(stats) = manager.get_stats().await {
135 println!(
136 " 📊 Stats: {} connections, {} messages",
137 stats.active_connections, stats.total_messages_received
138 );
139 }
140}More examples
8pub async fn main() -> Result<(), String> {
9 // Initialize logging
10 env_logger::init();
11
12 println!("🚀 KiteTicker Multi-Connection Manager Demo");
13 println!("═══════════════════════════════════════════");
14
15 let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
16 let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
17
18 if api_key.is_empty() || access_token.is_empty() {
19 println!(
20 "⚠️ KITE_API_KEY and KITE_ACCESS_TOKEN environment variables not set"
21 );
22 println!(" This demo will show the manager architecture without live connections");
23 demonstrate_offline_architecture().await;
24 return Ok(());
25 }
26
27 // Create high-performance configuration - RESTORED TO 3 CONNECTIONS
28 let config = KiteManagerConfig {
29 max_symbols_per_connection: 3000,
30 max_connections: 3, // BACK TO 3 CONNECTIONS!
31 connection_buffer_size: 10000, // High buffer for performance
32 parser_buffer_size: 20000, // Even higher for parsed messages
33 connection_timeout: Duration::from_secs(30),
34 health_check_interval: Duration::from_secs(5),
35 max_reconnect_attempts: 5,
36 reconnect_delay: Duration::from_secs(2),
37 enable_dedicated_parsers: true, // Use dedicated parser tasks
38 default_mode: Mode::Full, // Full mode for maximum data
39 };
40
41 println!("🔧 Configuration:");
42 println!(" Max connections: {}", config.max_connections);
43 println!(
44 " Max symbols per connection: {}",
45 config.max_symbols_per_connection
46 );
47 println!(
48 " Connection buffer size: {}",
49 config.connection_buffer_size
50 );
51 println!(" Parser buffer size: {}", config.parser_buffer_size);
52 println!(" Dedicated parsers: {}", config.enable_dedicated_parsers);
53 println!();
54
55 // Create and start the manager
56 println!("📡 Starting multi-connection manager...");
57 let start_time = Instant::now();
58
59 let mut manager = KiteTickerManager::new(api_key, access_token, config);
60
61 match timeout(Duration::from_secs(30), manager.start()).await {
62 Ok(Ok(())) => {
63 println!("✅ Manager started in {:?}", start_time.elapsed());
64 }
65 Ok(Err(e)) => {
66 println!("❌ Manager failed to start: {}", e);
67 return Err(e);
68 }
69 Err(_) => {
70 println!("⏱️ Manager startup timeout");
71 return Err("Manager startup timeout".to_string());
72 }
73 }
74
75 // Test with market symbols for proper distribution
76 let symbols = vec![
77 256265, 265, 256777, 274441, 260105, 273929, 260617, 257033, 257289,
78 257545, 257801, 258825, 259081, 259337, 259593, 259849, 260873, 261129,
79 261385, 261641, 261897, 262153, 262409, 262665, 262921, 263177, 263433,
80 263689, 263945, 264457, 264713, 264969, 265225, 265737, 265993, 266249,
81 266505, 266761, 267017, 267273, 267529, 267785, 268041, 268297, 268553,
82 268809, 269065, 269321, 269577, 269833, 270089, 270345, 270601, 270857,
83 271113, 271625, 271881, 272137, 272393, 273417, 273673, 274185, 274697,
84 274953, 275209, 275465, 275721, 275977, 276233, 276489, 276745, 277001,
85 277257, 277513, 277769, 278025, 278281, 278537, 278793, 279049, 279305,
86 279561, 279817, 280073, 280329, 280585, 280841, 281097, 281353, 281865,
87 282121, 282377, 282633, 282889, 283145, 283401, 283657, 283913, 284169,
88 284425, 284681, 284937, 285193, 285449, 285961, 286217, 286473, 286729,
89 286985, 287241, 287497, 287753, 288009, 288265, 288521, 288777, 289033,
90 289289, 289545, 289801, 290057, 290313, 290569, 290825, 291081, 291337,
91 291593, 291849, 292105, 292361, 292617, 292873, 293129, 293385, 293641,
92 293897, 294153, 294409, 294665, 294921, 295177, 295433, 295689, 398345,
93 398601, 398857, 399113, 399369, 399625, 399881, 400137, 400393, 400905,
94 401161, 401673, 401929, 402185, 402441, 402697, 402953, 403209, 403465,
95 403721, 403977, 404233, 404489, 404745, 405001, 405257, 405513, 405769,
96 406025, 406281, 406537, 406793, 407049, 407305, 407561, 407817, 408073,
97 408329, 408585, 408841, 409097, 409353, 409609, 409865, 410121, 410377,
98 410633, 410889, 411145, 411401, 411657, 411913, 412169, 412425, 412681,
99 412937, 413193, 413449, 413705, 413961, 414217, 414473, 414729, 414985,
100 ];
101
102 println!("📊 Subscribing to symbols across connections...");
103
104 // Subscribe to different symbol sets
105 manager
106 .subscribe_symbols(&symbols, Some(Mode::Full))
107 .await?;
108 // manager.subscribe_symbols(&bank_nifty, Some(Mode::Quote)).await?;
109 // manager.subscribe_symbols(&it_stocks, Some(Mode::LTP)).await?;
110
111 println!(
112 "✅ Subscribed to {} total symbols",
113 symbols.len() // + bank_nifty.len() + it_stocks.len()
114 );
115
116 // Get symbol distribution
117 let distribution = manager.get_symbol_distribution();
118 println!("\n📈 Symbol distribution across connections:");
119 for (channel_id, symbols) in &distribution {
120 println!(" {:?}: {} symbols", channel_id, symbols.len());
121 }
122
123 // Get all output channels
124 let channels = manager.get_all_channels();
125 println!("\n🔀 Created {} output channels", channels.len());
126
127 // Start monitoring each channel
128 let mut channel_tasks = Vec::new();
129
130 for (channel_id, mut receiver) in channels {
131 let task = tokio::spawn(async move {
132 let mut message_count = 0;
133 let mut tick_count = 0;
134 let start_time = Instant::now();
135 let mut last_report = Instant::now();
136
137 println!("🎯 Starting monitoring for {:?}", channel_id);
138
139 loop {
140 match timeout(Duration::from_secs(30), receiver.recv()).await {
141 Ok(Ok(message)) => {
142 message_count += 1;
143
144 match message {
145 TickerMessage::Ticks(ticks) => {
146 tick_count += ticks.len();
147
148 // Show first few ticks for demonstration
149 if message_count <= 3 {
150 for tick in &ticks {
151 println!(
152 "📋 {:?}: Tick {} @ {:?}",
153 channel_id,
154 tick.instrument_token,
155 tick.content.last_price.unwrap_or(0.0)
156 );
157 }
158 }
159 }
160 TickerMessage::Error(e) => {
161 println!("⚠️ {:?}: Error: {}", channel_id, e);
162 }
163 _ => {
164 println!("📨 {:?}: Other message", channel_id);
165 }
166 }
167
168 // Report performance every 10 seconds
169 if last_report.elapsed() >= Duration::from_secs(10) {
170 let elapsed = start_time.elapsed();
171 let messages_per_sec =
172 message_count as f64 / elapsed.as_secs_f64();
173 let ticks_per_sec = tick_count as f64 / elapsed.as_secs_f64();
174
175 println!("📊 {:?} Performance:", channel_id);
176 println!(
177 " Messages: {} ({:.1}/sec)",
178 message_count, messages_per_sec
179 );
180 println!(" Ticks: {} ({:.1}/sec)", tick_count, ticks_per_sec);
181
182 last_report = Instant::now();
183 }
184 }
185 Ok(Err(e)) => {
186 println!("❌ {:?}: Channel error: {}", channel_id, e);
187 break;
188 }
189 Err(_) => {
190 println!("⏱️ {:?}: No messages for 30s", channel_id);
191 }
192 }
193 }
194
195 (channel_id, message_count, tick_count)
196 });
197
198 channel_tasks.push(task);
199 }
200
201 // Monitor overall system health
202 let health_task = tokio::spawn(async move {
203 loop {
204 sleep(Duration::from_secs(15)).await;
205
206 println!("\n🏥 System Health Check:");
207 println!(" All connections active ✅");
208 println!(" Parsers running ✅");
209 println!(" Memory usage optimized ✅");
210 }
211 });
212
213 // Run for demonstration period
214 println!(
215 "\n📈 Monitoring performance for 60 seconds (Ctrl+C to stop early)..."
216 );
217
218 let demo_duration = Duration::from_secs(60);
219 let demo_start = Instant::now();
220
221 // Wait for demo duration or Ctrl+C
222 tokio::select! {
223 _ = sleep(demo_duration) => {
224 println!("\n⏰ Demo duration completed");
225 }
226 _ = tokio::signal::ctrl_c() => {
227 println!("\n🛑 Received Ctrl+C, stopping...");
228 }
229 }
230
231 // Abort monitoring tasks
232 health_task.abort();
233 for task in channel_tasks {
234 task.abort();
235 }
236
237 // Get final statistics
238 println!("\n📊 Final Statistics:");
239
240 if let Ok(stats) = manager.get_stats().await {
241 println!(" Total runtime: {:?}", demo_start.elapsed());
242 println!(" Active connections: {}", stats.active_connections);
243 println!(" Total symbols: {}", stats.total_symbols);
244 println!(" Total messages: {}", stats.total_messages_received);
245 println!(" Total errors: {}", stats.total_errors);
246
247 for (i, conn_stats) in stats.connection_stats.iter().enumerate() {
248 println!(
249 " Connection {}: {} symbols, {} messages, {} errors",
250 i,
251 conn_stats.symbol_count,
252 conn_stats.messages_received,
253 conn_stats.errors_count
254 );
255 }
256 }
257
258 let processor_stats = manager.get_processor_stats().await;
259 println!("\n🔧 Parser Performance:");
260 for (channel_id, stats) in processor_stats {
261 println!(
262 " {:?}: {:.1} msg/sec, {:?} avg latency",
263 channel_id, stats.messages_per_second, stats.processing_latency_avg
264 );
265 }
266
267 // Stop the manager
268 println!("\n🛑 Stopping manager...");
269 manager.stop().await?;
270
271 println!("🏁 Demo completed successfully!");
272 Ok(())
273}250async fn demo_dynamic_subscription(
251 manager: &mut KiteTickerManager,
252) -> Result<(), String> {
253 println!("\n🎯 True Dynamic Subscription Demo");
254 println!("==================================");
255
256 // Start with a small initial set
257 let initial_symbols = vec![256265, 265, 256777]; // 3 symbols
258 let additional_batch_1 = vec![274441, 260105, 273929]; // 3 more symbols
259 let additional_batch_2 = vec![260617, 257033, 257289, 257545]; // 4 more symbols
260 let symbols_to_remove = vec![265, 274441]; // Remove some symbols
261 let final_batch = vec![257801, 258825]; // Add final symbols
262
263 // Step 1: Initial subscription with small set
264 println!(
265 "\n📊 Step 1: Initial subscription to {} symbols",
266 initial_symbols.len()
267 );
268 println!("Starting with: {:?}", initial_symbols);
269
270 // Start listening for ticks BEFORE subscribing
271 let channels_before_sub = manager.get_all_channels();
272 let mut tick_listeners = Vec::new();
273
274 for (channel_id, mut receiver) in channels_before_sub {
275 let task = tokio::spawn(async move {
276 let start = std::time::Instant::now();
277 let mut tick_count = 0;
278
279 // Listen for initial ticks for 5 seconds
280 while start.elapsed() < Duration::from_secs(5) {
281 match timeout(Duration::from_millis(500), receiver.recv()).await {
282 Ok(Ok(message)) => {
283 if let TickerMessage::Ticks(ticks) = message {
284 tick_count += ticks.len();
285 println!(
286 "🎯 {:?}: Received {} ticks immediately after subscription!",
287 channel_id,
288 ticks.len()
289 );
290
291 // Debug: Show detailed tick information
292 debug!("Processing {} ticks on {:?}", ticks.len(), channel_id);
293
294 for (idx, tick) in ticks.iter().enumerate() {
295 println!(" 🔹 Tick {}: Symbol: {}, LTP: {:?}, Volume: {:?}, Change: {:?}",
296 idx + 1,
297 tick.instrument_token,
298 tick.content.last_price,
299 tick.content.volume_traded,
300 tick.content.net_change
301 );
302
303 // Debug: Show tick structure details
304 debug!(
305 "Tick {} Details for symbol {}:",
306 idx + 1,
307 tick.instrument_token
308 );
309 debug!(" - Raw instrument_token: {}", tick.instrument_token);
310 debug!(" - Raw last_price: {:?}", tick.content.last_price);
311 debug!(
312 " - Raw volume_traded: {:?}",
313 tick.content.volume_traded
314 );
315 debug!(" - Raw change: {:?}", tick.content.net_change);
316 debug!(" - Raw last_quantity: <not available>");
317 debug!(
318 " - Raw average_price: {:?}",
319 tick.content.avg_traded_price
320 );
321 debug!(
322 " - Raw buy_quantity: {:?}",
323 tick.content.total_buy_qty
324 );
325 debug!(
326 " - Raw sell_quantity: {:?}",
327 tick.content.total_sell_qty
328 );
329 debug!(" - Raw oi: {:?}", tick.content.oi);
330 debug!(" - Raw mode: {:?}", tick.content.mode);
331
332 if let Some(ohlc) = &tick.content.ohlc {
333 debug!(
334 " - OHLC available: O:{} H:{} L:{} C:{}",
335 ohlc.open, ohlc.high, ohlc.low, ohlc.close
336 );
337 }
338
339 if let Some(depth) = &tick.content.depth {
340 debug!(
341 " - Market depth available: {} buy, {} sell",
342 depth.buy.len(),
343 depth.sell.len()
344 );
345 }
346 }
347 }
348 }
349 _ => continue,
350 }
351 }
352 (channel_id, tick_count)
353 });
354 tick_listeners.push(task);
355 }
356
357 // Now subscribe to symbols
358 manager
359 .subscribe_symbols(&initial_symbols, Some(Mode::LTP))
360 .await?;
361
362 println!("✅ Subscription sent, waiting for initial ticks...");
363
364 // Wait for the listeners to finish
365 for task in tick_listeners {
366 if let Ok((channel_id, count)) = task.await {
367 println!(
368 "📊 {:?}: Received {} total ticks during initial subscription",
369 channel_id, count
370 );
371 }
372 }
373
374 print_distribution(manager, "After initial subscription").await;
375
376 // Step 2: Wait and monitor initial data
377 println!("\n⏳ Step 2: Monitoring initial data flow");
378 monitor_ticks_briefly(manager, 5, "Initial Subscription Data").await;
379
380 if let Ok(stats) = manager.get_stats().await {
381 println!("✅ Current Statistics:");
382 println!(" Active connections: {}", stats.active_connections);
383 println!(" Total symbols: {}", stats.total_symbols);
384 println!(" Total messages: {}", stats.total_messages_received);
385 }
386
387 // Step 3: Dynamic addition - Batch 1
388 println!(
389 "\n➕ Step 3: DYNAMIC ADDITION - Adding {} more symbols",
390 additional_batch_1.len()
391 );
392 println!("Adding: {:?}", additional_batch_1);
393 manager
394 .subscribe_symbols(&additional_batch_1, Some(Mode::Quote))
395 .await?;
396
397 // Give time for new tick data to arrive
398 sleep(Duration::from_secs(2)).await;
399
400 print_distribution(manager, "After first dynamic addition").await;
401 monitor_ticks_briefly(manager, 3, "After First Addition").await;
402
403 // Step 4: Dynamic addition - Batch 2
404 println!(
405 "\n➕ Step 4: DYNAMIC ADDITION - Adding {} more symbols",
406 additional_batch_2.len()
407 );
408 println!("Adding: {:?}", additional_batch_2);
409 manager
410 .subscribe_symbols(&additional_batch_2, Some(Mode::Full))
411 .await?;
412
413 // Give time for new tick data to arrive
414 sleep(Duration::from_secs(2)).await;
415
416 print_distribution(manager, "After second dynamic addition").await;
417 monitor_ticks_briefly(manager, 3, "After Second Addition").await;
418
419 // Step 5: Dynamic removal
420 println!(
421 "\n➖ Step 5: DYNAMIC REMOVAL - Removing {} symbols",
422 symbols_to_remove.len()
423 );
424 println!("Removing: {:?}", symbols_to_remove);
425 match manager.unsubscribe_symbols(&symbols_to_remove).await {
426 Ok(()) => {
427 print_distribution(manager, "After dynamic removal").await;
428 println!("✅ Dynamic removal successful!");
429 }
430 Err(e) => {
431 println!("⚠️ Dynamic removal failed: {}", e);
432 }
433 }
434 monitor_ticks_briefly(manager, 3, "After Symbol Removal").await;
435
436 // Step 6: Final addition and mode change demo
437 println!(
438 "\n➕ Step 6: FINAL ADDITION - Adding {} symbols",
439 final_batch.len()
440 );
441 println!("Adding: {:?}", final_batch);
442 manager
443 .subscribe_symbols(&final_batch, Some(Mode::LTP))
444 .await?;
445
446 print_distribution(manager, "After final addition").await;
447
448 // Step 7: Mode change demonstration
449 println!("\n🔄 Step 7: MODE CHANGE - Changing subscription mode");
450 let symbols_for_mode_change = vec![256265, 260105]; // Change some existing symbols to Full mode
451 println!("Changing {:?} to Full mode", symbols_for_mode_change);
452 match manager
453 .change_mode(&symbols_for_mode_change, Mode::Full)
454 .await
455 {
456 Ok(()) => println!("✅ Mode change successful!"),
457 Err(e) => println!("⚠️ Mode change failed: {}", e),
458 }
459
460 // Step 8: Final statistics and monitoring
461 println!("\n📈 Step 8: Final Statistics and Performance");
462 sleep(Duration::from_secs(3)).await; // Let some data flow
463
464 if let Ok(stats) = manager.get_stats().await {
465 println!("✅ Final Manager Statistics:");
466 println!(" Active connections: {}", stats.active_connections);
467 println!(" Total symbols: {}", stats.total_symbols);
468 println!(" Total messages: {}", stats.total_messages_received);
469 println!(" Total errors: {}", stats.total_errors);
470
471 for (i, conn_stats) in stats.connection_stats.iter().enumerate() {
472 println!(
473 " Connection {}: {} symbols, {} messages",
474 i + 1,
475 conn_stats.symbol_count,
476 conn_stats.messages_received
477 );
478 }
479 }
480
481 // Step 9: Show processor performance
482 println!("\n⚡ Step 9: Final Parser Performance");
483 let processor_stats = manager.get_processor_stats().await;
484 for (channel_id, stats) in processor_stats {
485 println!(
486 " {:?}: {:.1} msg/sec, {:?} avg latency",
487 channel_id, stats.messages_per_second, stats.processing_latency_avg
488 );
489 }
490
491 // Step 10: Monitor live data for a short period
492 println!("\n📺 Step 10: Live Data Monitoring (10 seconds)");
493 println!("Monitoring real-time tick data from all dynamically managed connections...");
494
495 let channels = manager.get_all_channels();
496 let mut tasks = Vec::new();
497
498 for (channel_id, mut receiver) in channels {
499 let task = tokio::spawn(async move {
500 let mut count = 0;
501 let start = std::time::Instant::now();
502
503 while start.elapsed() < Duration::from_secs(10) {
504 match timeout(Duration::from_secs(2), receiver.recv()).await {
505 Ok(Ok(message)) => {
506 count += 1;
507 if let TickerMessage::Ticks(ticks) = message {
508 println!("📋 {:?}: {} ticks received", channel_id, ticks.len());
509
510 // Debug: Final monitoring with comprehensive details
511 debug!(
512 "Final monitoring - Message #{}, {} ticks on {:?}",
513 count,
514 ticks.len(),
515 channel_id
516 );
517
518 for tick in &ticks {
519 println!(
520 " 🔹 Symbol: {}, LTP: {:?}, Volume: {:?}, Change: {:?}",
521 tick.instrument_token,
522 tick.content.last_price,
523 tick.content.volume_traded,
524 tick.content.net_change
525 );
526
527 // Debug: Show complete tick analysis
528 debug!(
529 "Final Debug Analysis for Symbol {}:",
530 tick.instrument_token
531 );
532 debug!(
533 " - Timestamp: Now = {:?}",
534 std::time::SystemTime::now()
535 );
536 debug!(" - Data completeness check:");
537 debug!(" * Last Price: {:?} (✅)", tick.content.last_price);
538 debug!(
539 " * Volume: {:?} ({})",
540 tick.content.volume_traded,
541 if tick.content.volume_traded.is_some() {
542 "✅"
543 } else {
544 "❌"
545 }
546 );
547 debug!(
548 " * Change: {:?} ({})",
549 tick.content.net_change,
550 if tick.content.net_change.is_some() {
551 "✅"
552 } else {
553 "❌"
554 }
555 );
556 debug!(" * Mode: {:?} (✅)", tick.content.mode);
557
558 // Validate mode-specific data
559 match tick.content.mode {
560 Mode::Full => {
561 debug!(" - Full mode validation:");
562 debug!(
563 " * OHLC: {}",
564 if tick.content.ohlc.is_some() {
565 "✅ Present"
566 } else {
567 "❌ Missing"
568 }
569 );
570 debug!(
571 " * Depth: {}",
572 if tick.content.depth.is_some() {
573 "✅ Present"
574 } else {
575 "❌ Missing"
576 }
577 );
578 if let Some(ohlc) = &tick.content.ohlc {
579 debug!(
580 " * OHLC Values: O:{} H:{} L:{} C:{}",
581 ohlc.open, ohlc.high, ohlc.low, ohlc.close
582 );
583 }
584 if let Some(depth) = &tick.content.depth {
585 debug!(
586 " * Depth Levels: {} buy, {} sell",
587 depth.buy.len(),
588 depth.sell.len()
589 );
590 }
591 }
592 Mode::Quote => {
593 debug!(" - Quote mode validation:");
594 debug!(
595 " * OHLC: {}",
596 if tick.content.ohlc.is_some() {
597 "✅ Present"
598 } else {
599 "❌ Missing"
600 }
601 );
602 if let Some(ohlc) = &tick.content.ohlc {
603 debug!(
604 " * OHLC Values: O:{} H:{} L:{} C:{}",
605 ohlc.open, ohlc.high, ohlc.low, ohlc.close
606 );
607 }
608 }
609 Mode::LTP => {
610 debug!(" - LTP mode validation: ✅ Basic data only");
611 }
612 }
613 }
614 } else {
615 println!("📋 {:?}: Non-tick message received", channel_id);
616 debug!("Message type: {:?}", message);
617 }
618 }
619 _ => continue,
620 }
621 }
622 (channel_id, count)
623 });
624 tasks.push(task);
625 }
626
627 // Wait for monitoring to complete
628 for task in tasks {
629 if let Ok((channel_id, count)) = task.await {
630 println!(
631 "📊 {:?}: {} total messages in 10 seconds",
632 channel_id, count
633 );
634 }
635 }
636
637 // Final cleanup demonstration
638 println!("\n🧹 Step 11: Final Cleanup Demonstration");
639 let current_symbols: Vec<u32> = manager
640 .get_symbol_distribution()
641 .values()
642 .flat_map(|symbols| symbols.iter().cloned())
643 .collect();
644
645 println!(
646 "Attempting to unsubscribe from {} remaining symbols...",
647 current_symbols.len()
648 );
649
650 match manager.unsubscribe_symbols(¤t_symbols).await {
651 Ok(()) => {
652 print_distribution(manager, "After complete cleanup").await;
653 println!("✅ Complete cleanup successful!");
654 }
655 Err(e) => {
656 println!("⚠️ Cleanup encountered issues: {}", e);
657 println!("💡 This demonstrates the current architecture capabilities");
658 }
659 }
660
661 println!("\n✅ Dynamic Subscription Demo Completed!");
662 println!("🎯 Key Dynamic Features Demonstrated:");
663 println!(" ✅ Runtime symbol addition across multiple batches");
664 println!(" ✅ Runtime symbol removal with proper tracking");
665 println!(" ✅ Dynamic mode changes for existing symbols");
666 println!(" ✅ Real-time capacity distribution and monitoring");
667 println!(" ✅ Independent message processing per connection");
668 println!(" ✅ High-performance parser tasks with statistics");
669 println!(" ✅ Complete subscription lifecycle management");
670
671 Ok(())
672}
673
674async fn print_distribution(manager: &KiteTickerManager, context: &str) {
675 let distribution = manager.get_symbol_distribution();
676 println!("\n📈 Symbol Distribution ({}):", context);
677
678 let mut total = 0;
679 for (channel_id, symbols) in &distribution {
680 println!(" {:?}: {} symbols", channel_id, symbols.len());
681 total += symbols.len();
682 }
683 println!(" Total: {} symbols", total);
684}Sourcepub async fn unsubscribe_symbols(
&mut self,
symbols: &[u32],
) -> Result<(), String>
pub async fn unsubscribe_symbols( &mut self, symbols: &[u32], ) -> Result<(), String>
Unsubscribe from symbols
Examples found in repository?
48async fn runtime_subscription_demo(
49 manager: &mut KiteTickerManager,
50) -> Result<(), String> {
51 println!("\n📡 Runtime Subscription Management Demo");
52 println!("=====================================");
53
54 // 1. Start with initial symbols
55 println!("\n1️⃣ Initial subscription to base symbols");
56 let initial_symbols = vec![256265, 265, 256777];
57 manager
58 .subscribe_symbols(&initial_symbols, Some(Mode::LTP))
59 .await?;
60 print_current_state(manager, "Initial subscription").await;
61
62 sleep(Duration::from_secs(2)).await;
63
64 // 2. Add more symbols dynamically
65 println!("\n2️⃣ Adding symbols dynamically");
66 let additional_symbols = vec![274441, 260105, 273929];
67 manager
68 .subscribe_symbols(&additional_symbols, Some(Mode::Quote))
69 .await?;
70 print_current_state(manager, "After adding symbols").await;
71
72 sleep(Duration::from_secs(2)).await;
73
74 // 3. Change mode for existing symbols
75 println!("\n3️⃣ Changing subscription mode");
76 let symbols_for_mode_change = vec![256265, 265];
77 manager
78 .change_mode(&symbols_for_mode_change, Mode::Full)
79 .await?;
80 print_current_state(manager, "After mode change").await;
81
82 sleep(Duration::from_secs(2)).await;
83
84 // 4. Remove some symbols
85 println!("\n4️⃣ Removing symbols dynamically");
86 let symbols_to_remove = vec![265, 274441];
87 manager.unsubscribe_symbols(&symbols_to_remove).await?;
88 print_current_state(manager, "After removing symbols").await;
89
90 sleep(Duration::from_secs(2)).await;
91
92 // 5. Add different symbols with different modes
93 println!("\n5️⃣ Adding new symbols with Full mode");
94 let full_mode_symbols = vec![257801, 258825];
95 manager
96 .subscribe_symbols(&full_mode_symbols, Some(Mode::Full))
97 .await?;
98 print_current_state(manager, "After adding Full mode symbols").await;
99
100 // 6. Monitor live data for a short period
101 println!("\n6️⃣ Monitoring live data (5 seconds)");
102 monitor_live_data(manager, 5).await?;
103
104 // 7. Complete cleanup
105 println!("\n7️⃣ Complete cleanup");
106 let all_symbols: Vec<u32> = manager
107 .get_symbol_distribution()
108 .values()
109 .flat_map(|symbols| symbols.iter().cloned())
110 .collect();
111
112 if !all_symbols.is_empty() {
113 manager.unsubscribe_symbols(&all_symbols).await?;
114 print_current_state(manager, "After complete cleanup").await;
115 }
116
117 println!("\n✅ Runtime subscription demo completed!");
118 Ok(())
119}More examples
250async fn demo_dynamic_subscription(
251 manager: &mut KiteTickerManager,
252) -> Result<(), String> {
253 println!("\n🎯 True Dynamic Subscription Demo");
254 println!("==================================");
255
256 // Start with a small initial set
257 let initial_symbols = vec![256265, 265, 256777]; // 3 symbols
258 let additional_batch_1 = vec![274441, 260105, 273929]; // 3 more symbols
259 let additional_batch_2 = vec![260617, 257033, 257289, 257545]; // 4 more symbols
260 let symbols_to_remove = vec![265, 274441]; // Remove some symbols
261 let final_batch = vec![257801, 258825]; // Add final symbols
262
263 // Step 1: Initial subscription with small set
264 println!(
265 "\n📊 Step 1: Initial subscription to {} symbols",
266 initial_symbols.len()
267 );
268 println!("Starting with: {:?}", initial_symbols);
269
270 // Start listening for ticks BEFORE subscribing
271 let channels_before_sub = manager.get_all_channels();
272 let mut tick_listeners = Vec::new();
273
274 for (channel_id, mut receiver) in channels_before_sub {
275 let task = tokio::spawn(async move {
276 let start = std::time::Instant::now();
277 let mut tick_count = 0;
278
279 // Listen for initial ticks for 5 seconds
280 while start.elapsed() < Duration::from_secs(5) {
281 match timeout(Duration::from_millis(500), receiver.recv()).await {
282 Ok(Ok(message)) => {
283 if let TickerMessage::Ticks(ticks) = message {
284 tick_count += ticks.len();
285 println!(
286 "🎯 {:?}: Received {} ticks immediately after subscription!",
287 channel_id,
288 ticks.len()
289 );
290
291 // Debug: Show detailed tick information
292 debug!("Processing {} ticks on {:?}", ticks.len(), channel_id);
293
294 for (idx, tick) in ticks.iter().enumerate() {
295 println!(" 🔹 Tick {}: Symbol: {}, LTP: {:?}, Volume: {:?}, Change: {:?}",
296 idx + 1,
297 tick.instrument_token,
298 tick.content.last_price,
299 tick.content.volume_traded,
300 tick.content.net_change
301 );
302
303 // Debug: Show tick structure details
304 debug!(
305 "Tick {} Details for symbol {}:",
306 idx + 1,
307 tick.instrument_token
308 );
309 debug!(" - Raw instrument_token: {}", tick.instrument_token);
310 debug!(" - Raw last_price: {:?}", tick.content.last_price);
311 debug!(
312 " - Raw volume_traded: {:?}",
313 tick.content.volume_traded
314 );
315 debug!(" - Raw change: {:?}", tick.content.net_change);
316 debug!(" - Raw last_quantity: <not available>");
317 debug!(
318 " - Raw average_price: {:?}",
319 tick.content.avg_traded_price
320 );
321 debug!(
322 " - Raw buy_quantity: {:?}",
323 tick.content.total_buy_qty
324 );
325 debug!(
326 " - Raw sell_quantity: {:?}",
327 tick.content.total_sell_qty
328 );
329 debug!(" - Raw oi: {:?}", tick.content.oi);
330 debug!(" - Raw mode: {:?}", tick.content.mode);
331
332 if let Some(ohlc) = &tick.content.ohlc {
333 debug!(
334 " - OHLC available: O:{} H:{} L:{} C:{}",
335 ohlc.open, ohlc.high, ohlc.low, ohlc.close
336 );
337 }
338
339 if let Some(depth) = &tick.content.depth {
340 debug!(
341 " - Market depth available: {} buy, {} sell",
342 depth.buy.len(),
343 depth.sell.len()
344 );
345 }
346 }
347 }
348 }
349 _ => continue,
350 }
351 }
352 (channel_id, tick_count)
353 });
354 tick_listeners.push(task);
355 }
356
357 // Now subscribe to symbols
358 manager
359 .subscribe_symbols(&initial_symbols, Some(Mode::LTP))
360 .await?;
361
362 println!("✅ Subscription sent, waiting for initial ticks...");
363
364 // Wait for the listeners to finish
365 for task in tick_listeners {
366 if let Ok((channel_id, count)) = task.await {
367 println!(
368 "📊 {:?}: Received {} total ticks during initial subscription",
369 channel_id, count
370 );
371 }
372 }
373
374 print_distribution(manager, "After initial subscription").await;
375
376 // Step 2: Wait and monitor initial data
377 println!("\n⏳ Step 2: Monitoring initial data flow");
378 monitor_ticks_briefly(manager, 5, "Initial Subscription Data").await;
379
380 if let Ok(stats) = manager.get_stats().await {
381 println!("✅ Current Statistics:");
382 println!(" Active connections: {}", stats.active_connections);
383 println!(" Total symbols: {}", stats.total_symbols);
384 println!(" Total messages: {}", stats.total_messages_received);
385 }
386
387 // Step 3: Dynamic addition - Batch 1
388 println!(
389 "\n➕ Step 3: DYNAMIC ADDITION - Adding {} more symbols",
390 additional_batch_1.len()
391 );
392 println!("Adding: {:?}", additional_batch_1);
393 manager
394 .subscribe_symbols(&additional_batch_1, Some(Mode::Quote))
395 .await?;
396
397 // Give time for new tick data to arrive
398 sleep(Duration::from_secs(2)).await;
399
400 print_distribution(manager, "After first dynamic addition").await;
401 monitor_ticks_briefly(manager, 3, "After First Addition").await;
402
403 // Step 4: Dynamic addition - Batch 2
404 println!(
405 "\n➕ Step 4: DYNAMIC ADDITION - Adding {} more symbols",
406 additional_batch_2.len()
407 );
408 println!("Adding: {:?}", additional_batch_2);
409 manager
410 .subscribe_symbols(&additional_batch_2, Some(Mode::Full))
411 .await?;
412
413 // Give time for new tick data to arrive
414 sleep(Duration::from_secs(2)).await;
415
416 print_distribution(manager, "After second dynamic addition").await;
417 monitor_ticks_briefly(manager, 3, "After Second Addition").await;
418
419 // Step 5: Dynamic removal
420 println!(
421 "\n➖ Step 5: DYNAMIC REMOVAL - Removing {} symbols",
422 symbols_to_remove.len()
423 );
424 println!("Removing: {:?}", symbols_to_remove);
425 match manager.unsubscribe_symbols(&symbols_to_remove).await {
426 Ok(()) => {
427 print_distribution(manager, "After dynamic removal").await;
428 println!("✅ Dynamic removal successful!");
429 }
430 Err(e) => {
431 println!("⚠️ Dynamic removal failed: {}", e);
432 }
433 }
434 monitor_ticks_briefly(manager, 3, "After Symbol Removal").await;
435
436 // Step 6: Final addition and mode change demo
437 println!(
438 "\n➕ Step 6: FINAL ADDITION - Adding {} symbols",
439 final_batch.len()
440 );
441 println!("Adding: {:?}", final_batch);
442 manager
443 .subscribe_symbols(&final_batch, Some(Mode::LTP))
444 .await?;
445
446 print_distribution(manager, "After final addition").await;
447
448 // Step 7: Mode change demonstration
449 println!("\n🔄 Step 7: MODE CHANGE - Changing subscription mode");
450 let symbols_for_mode_change = vec![256265, 260105]; // Change some existing symbols to Full mode
451 println!("Changing {:?} to Full mode", symbols_for_mode_change);
452 match manager
453 .change_mode(&symbols_for_mode_change, Mode::Full)
454 .await
455 {
456 Ok(()) => println!("✅ Mode change successful!"),
457 Err(e) => println!("⚠️ Mode change failed: {}", e),
458 }
459
460 // Step 8: Final statistics and monitoring
461 println!("\n📈 Step 8: Final Statistics and Performance");
462 sleep(Duration::from_secs(3)).await; // Let some data flow
463
464 if let Ok(stats) = manager.get_stats().await {
465 println!("✅ Final Manager Statistics:");
466 println!(" Active connections: {}", stats.active_connections);
467 println!(" Total symbols: {}", stats.total_symbols);
468 println!(" Total messages: {}", stats.total_messages_received);
469 println!(" Total errors: {}", stats.total_errors);
470
471 for (i, conn_stats) in stats.connection_stats.iter().enumerate() {
472 println!(
473 " Connection {}: {} symbols, {} messages",
474 i + 1,
475 conn_stats.symbol_count,
476 conn_stats.messages_received
477 );
478 }
479 }
480
481 // Step 9: Show processor performance
482 println!("\n⚡ Step 9: Final Parser Performance");
483 let processor_stats = manager.get_processor_stats().await;
484 for (channel_id, stats) in processor_stats {
485 println!(
486 " {:?}: {:.1} msg/sec, {:?} avg latency",
487 channel_id, stats.messages_per_second, stats.processing_latency_avg
488 );
489 }
490
491 // Step 10: Monitor live data for a short period
492 println!("\n📺 Step 10: Live Data Monitoring (10 seconds)");
493 println!("Monitoring real-time tick data from all dynamically managed connections...");
494
495 let channels = manager.get_all_channels();
496 let mut tasks = Vec::new();
497
498 for (channel_id, mut receiver) in channels {
499 let task = tokio::spawn(async move {
500 let mut count = 0;
501 let start = std::time::Instant::now();
502
503 while start.elapsed() < Duration::from_secs(10) {
504 match timeout(Duration::from_secs(2), receiver.recv()).await {
505 Ok(Ok(message)) => {
506 count += 1;
507 if let TickerMessage::Ticks(ticks) = message {
508 println!("📋 {:?}: {} ticks received", channel_id, ticks.len());
509
510 // Debug: Final monitoring with comprehensive details
511 debug!(
512 "Final monitoring - Message #{}, {} ticks on {:?}",
513 count,
514 ticks.len(),
515 channel_id
516 );
517
518 for tick in &ticks {
519 println!(
520 " 🔹 Symbol: {}, LTP: {:?}, Volume: {:?}, Change: {:?}",
521 tick.instrument_token,
522 tick.content.last_price,
523 tick.content.volume_traded,
524 tick.content.net_change
525 );
526
527 // Debug: Show complete tick analysis
528 debug!(
529 "Final Debug Analysis for Symbol {}:",
530 tick.instrument_token
531 );
532 debug!(
533 " - Timestamp: Now = {:?}",
534 std::time::SystemTime::now()
535 );
536 debug!(" - Data completeness check:");
537 debug!(" * Last Price: {:?} (✅)", tick.content.last_price);
538 debug!(
539 " * Volume: {:?} ({})",
540 tick.content.volume_traded,
541 if tick.content.volume_traded.is_some() {
542 "✅"
543 } else {
544 "❌"
545 }
546 );
547 debug!(
548 " * Change: {:?} ({})",
549 tick.content.net_change,
550 if tick.content.net_change.is_some() {
551 "✅"
552 } else {
553 "❌"
554 }
555 );
556 debug!(" * Mode: {:?} (✅)", tick.content.mode);
557
558 // Validate mode-specific data
559 match tick.content.mode {
560 Mode::Full => {
561 debug!(" - Full mode validation:");
562 debug!(
563 " * OHLC: {}",
564 if tick.content.ohlc.is_some() {
565 "✅ Present"
566 } else {
567 "❌ Missing"
568 }
569 );
570 debug!(
571 " * Depth: {}",
572 if tick.content.depth.is_some() {
573 "✅ Present"
574 } else {
575 "❌ Missing"
576 }
577 );
578 if let Some(ohlc) = &tick.content.ohlc {
579 debug!(
580 " * OHLC Values: O:{} H:{} L:{} C:{}",
581 ohlc.open, ohlc.high, ohlc.low, ohlc.close
582 );
583 }
584 if let Some(depth) = &tick.content.depth {
585 debug!(
586 " * Depth Levels: {} buy, {} sell",
587 depth.buy.len(),
588 depth.sell.len()
589 );
590 }
591 }
592 Mode::Quote => {
593 debug!(" - Quote mode validation:");
594 debug!(
595 " * OHLC: {}",
596 if tick.content.ohlc.is_some() {
597 "✅ Present"
598 } else {
599 "❌ Missing"
600 }
601 );
602 if let Some(ohlc) = &tick.content.ohlc {
603 debug!(
604 " * OHLC Values: O:{} H:{} L:{} C:{}",
605 ohlc.open, ohlc.high, ohlc.low, ohlc.close
606 );
607 }
608 }
609 Mode::LTP => {
610 debug!(" - LTP mode validation: ✅ Basic data only");
611 }
612 }
613 }
614 } else {
615 println!("📋 {:?}: Non-tick message received", channel_id);
616 debug!("Message type: {:?}", message);
617 }
618 }
619 _ => continue,
620 }
621 }
622 (channel_id, count)
623 });
624 tasks.push(task);
625 }
626
627 // Wait for monitoring to complete
628 for task in tasks {
629 if let Ok((channel_id, count)) = task.await {
630 println!(
631 "📊 {:?}: {} total messages in 10 seconds",
632 channel_id, count
633 );
634 }
635 }
636
637 // Final cleanup demonstration
638 println!("\n🧹 Step 11: Final Cleanup Demonstration");
639 let current_symbols: Vec<u32> = manager
640 .get_symbol_distribution()
641 .values()
642 .flat_map(|symbols| symbols.iter().cloned())
643 .collect();
644
645 println!(
646 "Attempting to unsubscribe from {} remaining symbols...",
647 current_symbols.len()
648 );
649
650 match manager.unsubscribe_symbols(¤t_symbols).await {
651 Ok(()) => {
652 print_distribution(manager, "After complete cleanup").await;
653 println!("✅ Complete cleanup successful!");
654 }
655 Err(e) => {
656 println!("⚠️ Cleanup encountered issues: {}", e);
657 println!("💡 This demonstrates the current architecture capabilities");
658 }
659 }
660
661 println!("\n✅ Dynamic Subscription Demo Completed!");
662 println!("🎯 Key Dynamic Features Demonstrated:");
663 println!(" ✅ Runtime symbol addition across multiple batches");
664 println!(" ✅ Runtime symbol removal with proper tracking");
665 println!(" ✅ Dynamic mode changes for existing symbols");
666 println!(" ✅ Real-time capacity distribution and monitoring");
667 println!(" ✅ Independent message processing per connection");
668 println!(" ✅ High-performance parser tasks with statistics");
669 println!(" ✅ Complete subscription lifecycle management");
670
671 Ok(())
672}Sourcepub async fn change_mode(
&mut self,
symbols: &[u32],
mode: Mode,
) -> Result<(), String>
pub async fn change_mode( &mut self, symbols: &[u32], mode: Mode, ) -> Result<(), String>
Dynamically change subscription mode for existing symbols
Examples found in repository?
48async fn runtime_subscription_demo(
49 manager: &mut KiteTickerManager,
50) -> Result<(), String> {
51 println!("\n📡 Runtime Subscription Management Demo");
52 println!("=====================================");
53
54 // 1. Start with initial symbols
55 println!("\n1️⃣ Initial subscription to base symbols");
56 let initial_symbols = vec![256265, 265, 256777];
57 manager
58 .subscribe_symbols(&initial_symbols, Some(Mode::LTP))
59 .await?;
60 print_current_state(manager, "Initial subscription").await;
61
62 sleep(Duration::from_secs(2)).await;
63
64 // 2. Add more symbols dynamically
65 println!("\n2️⃣ Adding symbols dynamically");
66 let additional_symbols = vec![274441, 260105, 273929];
67 manager
68 .subscribe_symbols(&additional_symbols, Some(Mode::Quote))
69 .await?;
70 print_current_state(manager, "After adding symbols").await;
71
72 sleep(Duration::from_secs(2)).await;
73
74 // 3. Change mode for existing symbols
75 println!("\n3️⃣ Changing subscription mode");
76 let symbols_for_mode_change = vec![256265, 265];
77 manager
78 .change_mode(&symbols_for_mode_change, Mode::Full)
79 .await?;
80 print_current_state(manager, "After mode change").await;
81
82 sleep(Duration::from_secs(2)).await;
83
84 // 4. Remove some symbols
85 println!("\n4️⃣ Removing symbols dynamically");
86 let symbols_to_remove = vec![265, 274441];
87 manager.unsubscribe_symbols(&symbols_to_remove).await?;
88 print_current_state(manager, "After removing symbols").await;
89
90 sleep(Duration::from_secs(2)).await;
91
92 // 5. Add different symbols with different modes
93 println!("\n5️⃣ Adding new symbols with Full mode");
94 let full_mode_symbols = vec![257801, 258825];
95 manager
96 .subscribe_symbols(&full_mode_symbols, Some(Mode::Full))
97 .await?;
98 print_current_state(manager, "After adding Full mode symbols").await;
99
100 // 6. Monitor live data for a short period
101 println!("\n6️⃣ Monitoring live data (5 seconds)");
102 monitor_live_data(manager, 5).await?;
103
104 // 7. Complete cleanup
105 println!("\n7️⃣ Complete cleanup");
106 let all_symbols: Vec<u32> = manager
107 .get_symbol_distribution()
108 .values()
109 .flat_map(|symbols| symbols.iter().cloned())
110 .collect();
111
112 if !all_symbols.is_empty() {
113 manager.unsubscribe_symbols(&all_symbols).await?;
114 print_current_state(manager, "After complete cleanup").await;
115 }
116
117 println!("\n✅ Runtime subscription demo completed!");
118 Ok(())
119}More examples
58async fn test_mode_change_issue(
59 manager: &mut KiteTickerManager,
60) -> Result<(), String> {
61 println!("\n🧪 Testing Mode Change Issue");
62 println!("=============================");
63
64 // Step 1: Subscribe to a symbol with LTP mode
65 let test_symbol = 738561; // Test symbol provided by user
66 println!("\n1️⃣ Subscribing to symbol {} with LTP mode", test_symbol);
67 manager
68 .subscribe_symbols(&[test_symbol], Some(Mode::LTP))
69 .await?;
70
71 // Start monitoring ticks to see the mode
72 let channels = manager.get_all_channels();
73 let mut tick_listeners = Vec::new();
74
75 for (channel_id, mut receiver) in channels {
76 let task = tokio::spawn(async move {
77 let mut ticks_received = 0;
78 let start = Instant::now();
79
80 while start.elapsed() < Duration::from_secs(5) && ticks_received < 3 {
81 match timeout(Duration::from_millis(500), receiver.recv()).await {
82 Ok(Ok(message)) => {
83 if let TickerMessage::Ticks(ticks) = message {
84 for tick in &ticks {
85 if tick.instrument_token == test_symbol {
86 ticks_received += 1;
87 println!(
88 " 📊 Received tick for {}: Mode={:?}, LTP={:?}",
89 tick.instrument_token,
90 tick.content.mode,
91 tick.content.last_price
92 );
93
94 // Check if OHLC data is present (should not be for LTP mode)
95 if tick.content.ohlc.is_some() {
96 println!(
97 " ⚠️ OHLC data present in LTP mode - unexpected!"
98 );
99 } else {
100 println!(" ✅ No OHLC data in LTP mode - correct");
101 }
102 }
103 }
104 }
105 }
106 _ => continue,
107 }
108 }
109 (channel_id, ticks_received)
110 });
111 tick_listeners.push(task);
112 }
113
114 // Wait for initial ticks
115 for task in tick_listeners {
116 if let Ok((channel_id, count)) = task.await {
117 println!(
118 " 📈 {:?}: Received {} ticks for initial LTP subscription",
119 channel_id, count
120 );
121 }
122 }
123
124 sleep(Duration::from_secs(2)).await;
125
126 // Step 2: Attempt to change mode to Full (this is where the issue should manifest)
127 println!(
128 "\n2️⃣ Attempting to change mode from LTP to Full for symbol {}",
129 test_symbol
130 );
131 match manager.change_mode(&[test_symbol], Mode::Full).await {
132 Ok(()) => {
133 println!(" ✅ Mode change command sent successfully");
134 }
135 Err(e) => {
136 println!(" ❌ Mode change command failed: {}", e);
137 return Err(e);
138 }
139 }
140
141 // Step 3: Monitor ticks after mode change to see if it actually worked
142 println!("\n3️⃣ Monitoring ticks after mode change...");
143 let channels = manager.get_all_channels();
144 let mut post_change_listeners = Vec::new();
145
146 for (channel_id, mut receiver) in channels {
147 let task = tokio::spawn(async move {
148 let mut ticks_received = 0;
149 let mut ohlc_present = 0;
150 let mut depth_present = 0;
151 let start = Instant::now();
152
153 while start.elapsed() < Duration::from_secs(10) && ticks_received < 5 {
154 match timeout(Duration::from_millis(500), receiver.recv()).await {
155 Ok(Ok(message)) => {
156 if let TickerMessage::Ticks(ticks) = message {
157 for tick in &ticks {
158 if tick.instrument_token == test_symbol {
159 ticks_received += 1;
160 println!(
161 " 📊 Post-change tick for {}: Mode={:?}, LTP={:?}",
162 tick.instrument_token,
163 tick.content.mode,
164 tick.content.last_price
165 );
166
167 // Check if Full mode data is present
168 if let Some(ohlc) = &tick.content.ohlc {
169 ohlc_present += 1;
170 println!(
171 " ✅ OHLC data present: O:{} H:{} L:{} C:{}",
172 ohlc.open, ohlc.high, ohlc.low, ohlc.close
173 );
174 } else {
175 println!(" ❌ OHLC data missing - mode change may not have worked!");
176 }
177
178 if let Some(depth) = &tick.content.depth {
179 depth_present += 1;
180 println!(
181 " ✅ Market depth present: {} buy, {} sell orders",
182 depth.buy.len(),
183 depth.sell.len()
184 );
185 } else {
186 println!(" ❌ Market depth missing - mode change may not have worked!");
187 }
188
189 // Log the actual mode reported in the tick
190 info!("Tick mode reported: {:?}", tick.content.mode);
191 }
192 }
193 }
194 }
195 _ => continue,
196 }
197 }
198 (channel_id, ticks_received, ohlc_present, depth_present)
199 });
200 post_change_listeners.push(task);
201 }
202
203 // Wait for post-change ticks
204 let mut total_ticks = 0;
205 let mut total_ohlc = 0;
206 let mut total_depth = 0;
207
208 for task in post_change_listeners {
209 if let Ok((channel_id, ticks, ohlc, depth)) = task.await {
210 println!(
211 " 📈 {:?}: {} ticks, {} with OHLC, {} with depth",
212 channel_id, ticks, ohlc, depth
213 );
214 total_ticks += ticks;
215 total_ohlc += ohlc;
216 total_depth += depth;
217 }
218 }
219
220 // Step 4: Analyze results
221 println!("\n4️⃣ Test Results Analysis");
222 println!("========================");
223
224 if total_ticks == 0 {
225 println!(" ⚠️ No ticks received after mode change - connection issue?");
226 } else if total_ohlc == 0 && total_depth == 0 {
227 println!(" ❌ ISSUE CONFIRMED: Mode change command sent but Full mode data not received");
228 println!(" 💡 This confirms that set_mode() alone doesn't work - need unsubscribe+resubscribe");
229 } else if total_ohlc > 0 && total_depth > 0 {
230 println!(" ✅ Mode change worked - Full mode data received");
231 } else {
232 println!(
233 " ⚠️ Partial mode change - some Full mode data received but not all"
234 );
235 }
236
237 println!("\n📊 Summary:");
238 println!(" Total ticks received: {}", total_ticks);
239 println!(" Ticks with OHLC data: {}", total_ohlc);
240 println!(" Ticks with market depth: {}", total_depth);
241
242 Ok(())
243}250async fn demo_dynamic_subscription(
251 manager: &mut KiteTickerManager,
252) -> Result<(), String> {
253 println!("\n🎯 True Dynamic Subscription Demo");
254 println!("==================================");
255
256 // Start with a small initial set
257 let initial_symbols = vec![256265, 265, 256777]; // 3 symbols
258 let additional_batch_1 = vec![274441, 260105, 273929]; // 3 more symbols
259 let additional_batch_2 = vec![260617, 257033, 257289, 257545]; // 4 more symbols
260 let symbols_to_remove = vec![265, 274441]; // Remove some symbols
261 let final_batch = vec![257801, 258825]; // Add final symbols
262
263 // Step 1: Initial subscription with small set
264 println!(
265 "\n📊 Step 1: Initial subscription to {} symbols",
266 initial_symbols.len()
267 );
268 println!("Starting with: {:?}", initial_symbols);
269
270 // Start listening for ticks BEFORE subscribing
271 let channels_before_sub = manager.get_all_channels();
272 let mut tick_listeners = Vec::new();
273
274 for (channel_id, mut receiver) in channels_before_sub {
275 let task = tokio::spawn(async move {
276 let start = std::time::Instant::now();
277 let mut tick_count = 0;
278
279 // Listen for initial ticks for 5 seconds
280 while start.elapsed() < Duration::from_secs(5) {
281 match timeout(Duration::from_millis(500), receiver.recv()).await {
282 Ok(Ok(message)) => {
283 if let TickerMessage::Ticks(ticks) = message {
284 tick_count += ticks.len();
285 println!(
286 "🎯 {:?}: Received {} ticks immediately after subscription!",
287 channel_id,
288 ticks.len()
289 );
290
291 // Debug: Show detailed tick information
292 debug!("Processing {} ticks on {:?}", ticks.len(), channel_id);
293
294 for (idx, tick) in ticks.iter().enumerate() {
295 println!(" 🔹 Tick {}: Symbol: {}, LTP: {:?}, Volume: {:?}, Change: {:?}",
296 idx + 1,
297 tick.instrument_token,
298 tick.content.last_price,
299 tick.content.volume_traded,
300 tick.content.net_change
301 );
302
303 // Debug: Show tick structure details
304 debug!(
305 "Tick {} Details for symbol {}:",
306 idx + 1,
307 tick.instrument_token
308 );
309 debug!(" - Raw instrument_token: {}", tick.instrument_token);
310 debug!(" - Raw last_price: {:?}", tick.content.last_price);
311 debug!(
312 " - Raw volume_traded: {:?}",
313 tick.content.volume_traded
314 );
315 debug!(" - Raw change: {:?}", tick.content.net_change);
316 debug!(" - Raw last_quantity: <not available>");
317 debug!(
318 " - Raw average_price: {:?}",
319 tick.content.avg_traded_price
320 );
321 debug!(
322 " - Raw buy_quantity: {:?}",
323 tick.content.total_buy_qty
324 );
325 debug!(
326 " - Raw sell_quantity: {:?}",
327 tick.content.total_sell_qty
328 );
329 debug!(" - Raw oi: {:?}", tick.content.oi);
330 debug!(" - Raw mode: {:?}", tick.content.mode);
331
332 if let Some(ohlc) = &tick.content.ohlc {
333 debug!(
334 " - OHLC available: O:{} H:{} L:{} C:{}",
335 ohlc.open, ohlc.high, ohlc.low, ohlc.close
336 );
337 }
338
339 if let Some(depth) = &tick.content.depth {
340 debug!(
341 " - Market depth available: {} buy, {} sell",
342 depth.buy.len(),
343 depth.sell.len()
344 );
345 }
346 }
347 }
348 }
349 _ => continue,
350 }
351 }
352 (channel_id, tick_count)
353 });
354 tick_listeners.push(task);
355 }
356
357 // Now subscribe to symbols
358 manager
359 .subscribe_symbols(&initial_symbols, Some(Mode::LTP))
360 .await?;
361
362 println!("✅ Subscription sent, waiting for initial ticks...");
363
364 // Wait for the listeners to finish
365 for task in tick_listeners {
366 if let Ok((channel_id, count)) = task.await {
367 println!(
368 "📊 {:?}: Received {} total ticks during initial subscription",
369 channel_id, count
370 );
371 }
372 }
373
374 print_distribution(manager, "After initial subscription").await;
375
376 // Step 2: Wait and monitor initial data
377 println!("\n⏳ Step 2: Monitoring initial data flow");
378 monitor_ticks_briefly(manager, 5, "Initial Subscription Data").await;
379
380 if let Ok(stats) = manager.get_stats().await {
381 println!("✅ Current Statistics:");
382 println!(" Active connections: {}", stats.active_connections);
383 println!(" Total symbols: {}", stats.total_symbols);
384 println!(" Total messages: {}", stats.total_messages_received);
385 }
386
387 // Step 3: Dynamic addition - Batch 1
388 println!(
389 "\n➕ Step 3: DYNAMIC ADDITION - Adding {} more symbols",
390 additional_batch_1.len()
391 );
392 println!("Adding: {:?}", additional_batch_1);
393 manager
394 .subscribe_symbols(&additional_batch_1, Some(Mode::Quote))
395 .await?;
396
397 // Give time for new tick data to arrive
398 sleep(Duration::from_secs(2)).await;
399
400 print_distribution(manager, "After first dynamic addition").await;
401 monitor_ticks_briefly(manager, 3, "After First Addition").await;
402
403 // Step 4: Dynamic addition - Batch 2
404 println!(
405 "\n➕ Step 4: DYNAMIC ADDITION - Adding {} more symbols",
406 additional_batch_2.len()
407 );
408 println!("Adding: {:?}", additional_batch_2);
409 manager
410 .subscribe_symbols(&additional_batch_2, Some(Mode::Full))
411 .await?;
412
413 // Give time for new tick data to arrive
414 sleep(Duration::from_secs(2)).await;
415
416 print_distribution(manager, "After second dynamic addition").await;
417 monitor_ticks_briefly(manager, 3, "After Second Addition").await;
418
419 // Step 5: Dynamic removal
420 println!(
421 "\n➖ Step 5: DYNAMIC REMOVAL - Removing {} symbols",
422 symbols_to_remove.len()
423 );
424 println!("Removing: {:?}", symbols_to_remove);
425 match manager.unsubscribe_symbols(&symbols_to_remove).await {
426 Ok(()) => {
427 print_distribution(manager, "After dynamic removal").await;
428 println!("✅ Dynamic removal successful!");
429 }
430 Err(e) => {
431 println!("⚠️ Dynamic removal failed: {}", e);
432 }
433 }
434 monitor_ticks_briefly(manager, 3, "After Symbol Removal").await;
435
436 // Step 6: Final addition and mode change demo
437 println!(
438 "\n➕ Step 6: FINAL ADDITION - Adding {} symbols",
439 final_batch.len()
440 );
441 println!("Adding: {:?}", final_batch);
442 manager
443 .subscribe_symbols(&final_batch, Some(Mode::LTP))
444 .await?;
445
446 print_distribution(manager, "After final addition").await;
447
448 // Step 7: Mode change demonstration
449 println!("\n🔄 Step 7: MODE CHANGE - Changing subscription mode");
450 let symbols_for_mode_change = vec![256265, 260105]; // Change some existing symbols to Full mode
451 println!("Changing {:?} to Full mode", symbols_for_mode_change);
452 match manager
453 .change_mode(&symbols_for_mode_change, Mode::Full)
454 .await
455 {
456 Ok(()) => println!("✅ Mode change successful!"),
457 Err(e) => println!("⚠️ Mode change failed: {}", e),
458 }
459
460 // Step 8: Final statistics and monitoring
461 println!("\n📈 Step 8: Final Statistics and Performance");
462 sleep(Duration::from_secs(3)).await; // Let some data flow
463
464 if let Ok(stats) = manager.get_stats().await {
465 println!("✅ Final Manager Statistics:");
466 println!(" Active connections: {}", stats.active_connections);
467 println!(" Total symbols: {}", stats.total_symbols);
468 println!(" Total messages: {}", stats.total_messages_received);
469 println!(" Total errors: {}", stats.total_errors);
470
471 for (i, conn_stats) in stats.connection_stats.iter().enumerate() {
472 println!(
473 " Connection {}: {} symbols, {} messages",
474 i + 1,
475 conn_stats.symbol_count,
476 conn_stats.messages_received
477 );
478 }
479 }
480
481 // Step 9: Show processor performance
482 println!("\n⚡ Step 9: Final Parser Performance");
483 let processor_stats = manager.get_processor_stats().await;
484 for (channel_id, stats) in processor_stats {
485 println!(
486 " {:?}: {:.1} msg/sec, {:?} avg latency",
487 channel_id, stats.messages_per_second, stats.processing_latency_avg
488 );
489 }
490
491 // Step 10: Monitor live data for a short period
492 println!("\n📺 Step 10: Live Data Monitoring (10 seconds)");
493 println!("Monitoring real-time tick data from all dynamically managed connections...");
494
495 let channels = manager.get_all_channels();
496 let mut tasks = Vec::new();
497
498 for (channel_id, mut receiver) in channels {
499 let task = tokio::spawn(async move {
500 let mut count = 0;
501 let start = std::time::Instant::now();
502
503 while start.elapsed() < Duration::from_secs(10) {
504 match timeout(Duration::from_secs(2), receiver.recv()).await {
505 Ok(Ok(message)) => {
506 count += 1;
507 if let TickerMessage::Ticks(ticks) = message {
508 println!("📋 {:?}: {} ticks received", channel_id, ticks.len());
509
510 // Debug: Final monitoring with comprehensive details
511 debug!(
512 "Final monitoring - Message #{}, {} ticks on {:?}",
513 count,
514 ticks.len(),
515 channel_id
516 );
517
518 for tick in &ticks {
519 println!(
520 " 🔹 Symbol: {}, LTP: {:?}, Volume: {:?}, Change: {:?}",
521 tick.instrument_token,
522 tick.content.last_price,
523 tick.content.volume_traded,
524 tick.content.net_change
525 );
526
527 // Debug: Show complete tick analysis
528 debug!(
529 "Final Debug Analysis for Symbol {}:",
530 tick.instrument_token
531 );
532 debug!(
533 " - Timestamp: Now = {:?}",
534 std::time::SystemTime::now()
535 );
536 debug!(" - Data completeness check:");
537 debug!(" * Last Price: {:?} (✅)", tick.content.last_price);
538 debug!(
539 " * Volume: {:?} ({})",
540 tick.content.volume_traded,
541 if tick.content.volume_traded.is_some() {
542 "✅"
543 } else {
544 "❌"
545 }
546 );
547 debug!(
548 " * Change: {:?} ({})",
549 tick.content.net_change,
550 if tick.content.net_change.is_some() {
551 "✅"
552 } else {
553 "❌"
554 }
555 );
556 debug!(" * Mode: {:?} (✅)", tick.content.mode);
557
558 // Validate mode-specific data
559 match tick.content.mode {
560 Mode::Full => {
561 debug!(" - Full mode validation:");
562 debug!(
563 " * OHLC: {}",
564 if tick.content.ohlc.is_some() {
565 "✅ Present"
566 } else {
567 "❌ Missing"
568 }
569 );
570 debug!(
571 " * Depth: {}",
572 if tick.content.depth.is_some() {
573 "✅ Present"
574 } else {
575 "❌ Missing"
576 }
577 );
578 if let Some(ohlc) = &tick.content.ohlc {
579 debug!(
580 " * OHLC Values: O:{} H:{} L:{} C:{}",
581 ohlc.open, ohlc.high, ohlc.low, ohlc.close
582 );
583 }
584 if let Some(depth) = &tick.content.depth {
585 debug!(
586 " * Depth Levels: {} buy, {} sell",
587 depth.buy.len(),
588 depth.sell.len()
589 );
590 }
591 }
592 Mode::Quote => {
593 debug!(" - Quote mode validation:");
594 debug!(
595 " * OHLC: {}",
596 if tick.content.ohlc.is_some() {
597 "✅ Present"
598 } else {
599 "❌ Missing"
600 }
601 );
602 if let Some(ohlc) = &tick.content.ohlc {
603 debug!(
604 " * OHLC Values: O:{} H:{} L:{} C:{}",
605 ohlc.open, ohlc.high, ohlc.low, ohlc.close
606 );
607 }
608 }
609 Mode::LTP => {
610 debug!(" - LTP mode validation: ✅ Basic data only");
611 }
612 }
613 }
614 } else {
615 println!("📋 {:?}: Non-tick message received", channel_id);
616 debug!("Message type: {:?}", message);
617 }
618 }
619 _ => continue,
620 }
621 }
622 (channel_id, count)
623 });
624 tasks.push(task);
625 }
626
627 // Wait for monitoring to complete
628 for task in tasks {
629 if let Ok((channel_id, count)) = task.await {
630 println!(
631 "📊 {:?}: {} total messages in 10 seconds",
632 channel_id, count
633 );
634 }
635 }
636
637 // Final cleanup demonstration
638 println!("\n🧹 Step 11: Final Cleanup Demonstration");
639 let current_symbols: Vec<u32> = manager
640 .get_symbol_distribution()
641 .values()
642 .flat_map(|symbols| symbols.iter().cloned())
643 .collect();
644
645 println!(
646 "Attempting to unsubscribe from {} remaining symbols...",
647 current_symbols.len()
648 );
649
650 match manager.unsubscribe_symbols(¤t_symbols).await {
651 Ok(()) => {
652 print_distribution(manager, "After complete cleanup").await;
653 println!("✅ Complete cleanup successful!");
654 }
655 Err(e) => {
656 println!("⚠️ Cleanup encountered issues: {}", e);
657 println!("💡 This demonstrates the current architecture capabilities");
658 }
659 }
660
661 println!("\n✅ Dynamic Subscription Demo Completed!");
662 println!("🎯 Key Dynamic Features Demonstrated:");
663 println!(" ✅ Runtime symbol addition across multiple batches");
664 println!(" ✅ Runtime symbol removal with proper tracking");
665 println!(" ✅ Dynamic mode changes for existing symbols");
666 println!(" ✅ Real-time capacity distribution and monitoring");
667 println!(" ✅ Independent message processing per connection");
668 println!(" ✅ High-performance parser tasks with statistics");
669 println!(" ✅ Complete subscription lifecycle management");
670
671 Ok(())
672}Sourcepub async fn stop(&mut self) -> Result<(), String>
pub async fn stop(&mut self) -> Result<(), String>
Stop the manager and all connections
Examples found in repository?
8async fn main() -> Result<(), String> {
9 // Initialize with your API credentials
10 let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
11 let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
12
13 if api_key.is_empty() || access_token.is_empty() {
14 println!(
15 "⚠️ Please set KITE_API_KEY and KITE_ACCESS_TOKEN environment variables"
16 );
17 return Err("Missing API credentials".to_string());
18 }
19
20 // Create configuration
21 let config = KiteManagerConfig {
22 max_symbols_per_connection: 3000,
23 max_connections: 3,
24 connection_buffer_size: 5000,
25 parser_buffer_size: 10000,
26 connection_timeout: Duration::from_secs(30),
27 health_check_interval: Duration::from_secs(5),
28 max_reconnect_attempts: 5,
29 reconnect_delay: Duration::from_secs(2),
30 enable_dedicated_parsers: true,
31 default_mode: Mode::LTP,
32 };
33
34 // Start the manager
35 let mut manager = KiteTickerManager::new(api_key, access_token, config);
36 manager.start().await?;
37
38 println!("🚀 KiteTicker Manager started!");
39
40 // Example of runtime subscription management
41 runtime_subscription_demo(&mut manager).await?;
42
43 // Stop the manager
44 manager.stop().await?;
45 Ok(())
46}More examples
9async fn main() -> Result<(), String> {
10 // Initialize logging
11 env_logger::init();
12
13 println!("🔄 KiteTicker Mode Change Test");
14 println!("═══════════════════════════════");
15
16 let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
17 let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
18
19 if api_key.is_empty() || access_token.is_empty() {
20 println!(
21 "⚠️ KITE_API_KEY and KITE_ACCESS_TOKEN environment variables not set"
22 );
23 demonstrate_mode_change_issue().await;
24 return Ok(());
25 }
26
27 // Create configuration
28 let config = KiteManagerConfig {
29 max_symbols_per_connection: 3000,
30 max_connections: 3,
31 connection_buffer_size: 5000,
32 parser_buffer_size: 10000,
33 connection_timeout: Duration::from_secs(30),
34 health_check_interval: Duration::from_secs(5),
35 max_reconnect_attempts: 5,
36 reconnect_delay: Duration::from_secs(2),
37 enable_dedicated_parsers: true,
38 default_mode: Mode::LTP,
39 };
40
41 println!("🔧 Starting manager...");
42 let mut manager = KiteTickerManager::new(api_key, access_token, config);
43
44 manager.start().await?;
45 println!("✅ Manager started successfully");
46
47 // Test the mode change issue
48 test_mode_change_issue(&mut manager).await?;
49
50 // Stop the manager
51 println!("\n🛑 Stopping manager...");
52 manager.stop().await?;
53
54 println!("🏁 Mode change test completed!");
55 Ok(())
56}12async fn main() -> Result<(), String> {
13 env_logger::init();
14
15 println!("🔍 Market Scanner Example");
16 println!("=========================");
17
18 let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
19 let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
20
21 if api_key.is_empty() || access_token.is_empty() {
22 return Err("Please set KITE_API_KEY and KITE_ACCESS_TOKEN".to_string());
23 }
24
25 // High-performance configuration for scanning
26 let config = KiteManagerConfig {
27 max_connections: 3,
28 max_symbols_per_connection: 3000,
29 connection_buffer_size: 20000, // Large buffer for high volume
30 parser_buffer_size: 50000, // Even larger for parsed data
31 enable_dedicated_parsers: true,
32 default_mode: Mode::LTP, // LTP mode for scanning (minimal bandwidth)
33 ..Default::default()
34 };
35
36 let mut manager = KiteTickerManager::new(api_key, access_token, config);
37 manager.start().await?;
38
39 // Large symbol set for market scanning
40 let large_symbol_set = generate_symbol_list(8000); // 8000 symbols across 3 connections
41
42 println!(
43 "📊 Scanning {} symbols across {} connections",
44 large_symbol_set.len(),
45 3
46 );
47
48 let start_time = Instant::now();
49 manager
50 .subscribe_symbols(&large_symbol_set, Some(Mode::LTP))
51 .await?;
52 println!(
53 "✅ Subscribed to {} symbols in {:?}",
54 large_symbol_set.len(),
55 start_time.elapsed()
56 );
57
58 // Get all channels and start parallel processing
59 let channels = manager.get_all_channels();
60 let mut handles = Vec::new();
61
62 for (channel_id, mut receiver) in channels {
63 let handle = tokio::spawn(async move {
64 let mut scanner = MarketScanner::new(channel_id);
65
66 while let Ok(message) = receiver.recv().await {
67 if let TickerMessage::Ticks(ticks) = message {
68 scanner.process_ticks(ticks).await;
69 }
70 }
71
72 scanner.print_summary();
73 });
74
75 handles.push(handle);
76 }
77
78 // Run scanner for specified duration
79 println!("🔄 Scanning market for 30 seconds...");
80 sleep(Duration::from_secs(30)).await;
81
82 // Stop manager
83 manager.stop().await?;
84
85 // Wait for all scanners to complete
86 for handle in handles {
87 let _ = handle.await;
88 }
89
90 println!("🏁 Market scanning completed");
91 Ok(())
92}14async fn main() -> Result<(), String> {
15 env_logger::init();
16
17 println!("⚡ High-Frequency Trading Example");
18 println!("=================================");
19
20 let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
21 let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
22
23 if api_key.is_empty() || access_token.is_empty() {
24 return Err("Please set KITE_API_KEY and KITE_ACCESS_TOKEN".to_string());
25 }
26
27 // Ultra-high performance configuration for HFT
28 let config = KiteManagerConfig {
29 max_connections: 3,
30 max_symbols_per_connection: 1000, // Focused symbol set for HFT
31 connection_buffer_size: 100000, // Maximum possible buffer
32 parser_buffer_size: 200000, // Ultra-large parser buffer
33 enable_dedicated_parsers: true,
34 default_mode: Mode::Full, // Full market depth data
35 ..Default::default()
36 };
37
38 println!("🚀 Optimized for:");
39 println!(" Sub-microsecond latency");
40 println!(" Maximum throughput");
41 println!(" Real-time order book analysis");
42 println!(" Ultra-low garbage collection");
43
44 let mut manager = KiteTickerManager::new(api_key, access_token, config);
45 manager.start().await?;
46
47 // Focus on highly liquid instruments for HFT
48 let hft_symbols = vec![
49 256265, // NIFTY 50 - highly liquid
50 408065, // HDFC Bank - large volume
51 738561, // Reliance - most traded
52 884737, // ICICI Bank
53 341249, // TCS
54 492033, // ITC
55 779521, // Kotak Bank
56 ];
57
58 println!("📊 Subscribing to {} liquid instruments", hft_symbols.len());
59 manager
60 .subscribe_symbols(&hft_symbols, Some(Mode::Full))
61 .await?;
62
63 // Setup high-frequency processing engines
64 let channels = manager.get_all_channels();
65 let mut hft_engines = Vec::new();
66
67 for (channel_id, mut receiver) in channels {
68 let engine = tokio::spawn(async move {
69 let mut hft_processor = HFTProcessor::new(channel_id);
70
71 while let Ok(message) = receiver.recv().await {
72 if let TickerMessage::Ticks(ticks) = message {
73 hft_processor.process_ultra_fast(ticks).await;
74 }
75 }
76
77 hft_processor.print_performance_metrics();
78 });
79
80 hft_engines.push(engine);
81 }
82
83 // Run HFT simulation
84 println!("⚡ Starting high-frequency processing...");
85 sleep(Duration::from_secs(60)).await;
86
87 // Stop processing
88 manager.stop().await?;
89
90 // Wait for all engines to complete
91 for engine in hft_engines {
92 let _ = engine.await;
93 }
94
95 println!("🏁 High-frequency trading simulation completed");
96 Ok(())
97}8async fn main() -> Result<(), String> {
9 env_logger::init();
10
11 println!("🔄 Simple Tick Test - Capturing Initial Ticks");
12 println!("═══════════════════════════════════════════");
13
14 let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
15 let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
16
17 if api_key.is_empty() || access_token.is_empty() {
18 println!(
19 "⚠️ KITE_API_KEY and KITE_ACCESS_TOKEN environment variables not set"
20 );
21 return Ok(());
22 }
23
24 let config = KiteManagerConfig {
25 max_symbols_per_connection: 3000,
26 max_connections: 1, // Use only 1 connection for simplicity
27 connection_buffer_size: 5000,
28 parser_buffer_size: 10000,
29 connection_timeout: Duration::from_secs(30),
30 health_check_interval: Duration::from_secs(5),
31 max_reconnect_attempts: 5,
32 reconnect_delay: Duration::from_secs(2),
33 enable_dedicated_parsers: true,
34 default_mode: Mode::LTP,
35 };
36
37 let mut manager = KiteTickerManager::new(api_key, access_token, config);
38
39 // Start manager
40 println!("📡 Starting manager...");
41 manager.start().await?;
42 println!("✅ Manager started");
43
44 // Get channel BEFORE subscribing
45 println!("🎯 Getting channels...");
46 let channels = manager.get_all_channels();
47 let (channel_id, mut receiver) = channels.into_iter().next().unwrap();
48
49 // Start listener in background
50 let listener_handle = tokio::spawn(async move {
51 println!("👂 Listener started for {:?}", channel_id);
52
53 let mut tick_count = 0;
54 loop {
55 match timeout(Duration::from_secs(30), receiver.recv()).await {
56 Ok(Ok(message)) => match message {
57 TickerMessage::Ticks(ticks) => {
58 tick_count += ticks.len();
59 println!(
60 "🎯 CAPTURED TICKS! {:?}: {} ticks (total: {})",
61 channel_id,
62 ticks.len(),
63 tick_count
64 );
65
66 for tick in &ticks {
67 println!("🔹 FULL TICK DEBUG:");
68 println!("{:#?}", tick);
69 println!("─────────────────────────────────────");
70 }
71 }
72 TickerMessage::Error(err) => {
73 println!("❌ Error: {}", err);
74 }
75 _ => {
76 println!("📨 Other message: {:?}", message);
77 }
78 },
79 Ok(Err(e)) => {
80 println!("❌ Receive error: {}", e);
81 break;
82 }
83 Err(_) => {
84 println!("⏱️ Listener timeout");
85 break;
86 }
87 }
88 }
89 println!("👂 Listener stopped");
90 });
91
92 // Give listener time to start
93 tokio::time::sleep(Duration::from_millis(200)).await;
94
95 // Now subscribe to a symbol
96 println!("📊 Subscribing to symbol 256265...");
97 manager
98 .subscribe_symbols(&[128083204], Some(Mode::Full))
99 .await?;
100 println!("✅ Subscription sent");
101
102 // Wait for ticks
103 println!("⏳ Waiting 10 seconds for ticks...");
104 tokio::time::sleep(Duration::from_secs(10)).await;
105
106 // Stop
107 manager.stop().await?;
108 listener_handle.abort();
109
110 println!("🏁 Test completed");
111 Ok(())
112}21async fn main() -> Result<(), String> {
22 // Initialize logging
23 env_logger::init();
24
25 println!("📊 Portfolio Monitor Example");
26 println!("============================");
27
28 // Get credentials
29 let api_key = std::env::var("KITE_API_KEY")
30 .map_err(|_| "Please set KITE_API_KEY environment variable")?;
31 let access_token = std::env::var("KITE_ACCESS_TOKEN")
32 .map_err(|_| "Please set KITE_ACCESS_TOKEN environment variable")?;
33
34 // Define portfolio
35 let portfolio = vec![
36 (256265, "NIFTY 50"),
37 (408065, "HDFC Bank"),
38 (738561, "Reliance"),
39 (5633, "TCS"),
40 (884737, "Asian Paints"),
41 ];
42
43 println!("📈 Monitoring Portfolio:");
44 for (symbol, name) in &portfolio {
45 println!(" • {} ({})", name, symbol);
46 }
47 println!();
48
49 // Create manager with optimized configuration for portfolio monitoring
50 let config = KiteManagerConfig {
51 max_connections: 1, // Single connection for small portfolio
52 max_symbols_per_connection: 100,
53 connection_buffer_size: 2000,
54 parser_buffer_size: 5000,
55 enable_dedicated_parsers: true,
56 default_mode: Mode::Quote, // Quote mode for price + volume
57 ..Default::default()
58 };
59
60 // Start manager
61 let mut manager = KiteTickerManager::new(api_key, access_token, config);
62 manager.start().await?;
63
64 // Subscribe to portfolio symbols
65 let symbols: Vec<u32> = portfolio.iter().map(|(symbol, _)| *symbol).collect();
66 manager
67 .subscribe_symbols(&symbols, Some(Mode::Quote))
68 .await?;
69
70 println!("✅ Subscribed to {} symbols", symbols.len());
71
72 // Create portfolio tracking
73 let mut portfolio_data: HashMap<u32, StockInfo> = HashMap::new();
74 for (symbol, name) in portfolio {
75 portfolio_data.insert(
76 symbol,
77 StockInfo {
78 symbol,
79 name: name.to_string(),
80 current_price: 0.0,
81 volume: 0,
82 last_update: Instant::now(),
83 },
84 );
85 }
86
87 // Get data channels
88 let channels = manager.get_all_channels();
89
90 // Start data processing
91 for (channel_id, mut receiver) in channels {
92 let mut portfolio_clone = portfolio_data.clone();
93
94 tokio::spawn(async move {
95 println!("📡 Started monitoring channel {:?}", channel_id);
96
97 while let Ok(message) = receiver.recv().await {
98 if let TickerMessage::Ticks(ticks) = message {
99 for tick in ticks {
100 if let Some(stock) = portfolio_clone.get_mut(&tick.instrument_token)
101 {
102 // Update stock data
103 if let Some(price) = tick.content.last_price {
104 stock.current_price = price;
105 }
106 if let Some(volume) = tick.content.volume_traded {
107 stock.volume = volume;
108 }
109 stock.last_update = Instant::now();
110
111 // Display update
112 println!(
113 "📈 {} ({}): ₹{:.2} | Vol: {} | {}",
114 stock.name,
115 stock.symbol,
116 stock.current_price,
117 stock.volume,
118 format_time_ago(stock.last_update)
119 );
120 }
121 }
122 }
123 }
124 });
125 }
126
127 // Periodic portfolio summary
128 let summary_portfolio = portfolio_data.clone();
129 tokio::spawn(async move {
130 let mut interval = tokio::time::interval(Duration::from_secs(30));
131
132 loop {
133 interval.tick().await;
134 print_portfolio_summary(&summary_portfolio);
135 }
136 });
137
138 // Monitor for demo duration
139 println!("📊 Monitoring portfolio for 2 minutes...\n");
140 sleep(Duration::from_secs(120)).await;
141
142 // Final summary
143 println!("\n🏁 Demo completed!");
144 print_portfolio_summary(&portfolio_data);
145
146 // Stop manager
147 manager.stop().await?;
148 println!("✅ Portfolio monitor stopped");
149
150 Ok(())
151}