1use kiteticker_async_manager::{
2 KiteManagerConfig, KiteTickerManager, Mode, TickerMessage,
3};
4use log::debug;
5use std::time::{Duration, Instant};
6use tokio::time::{sleep, timeout};
7
8#[tokio::main]
9async fn main() -> Result<(), String> {
10 env_logger::init();
12
13 println!("๐ KiteTicker Dynamic Subscription Demo");
14 println!("โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ");
15
16 debug!("Logging initialized");
18 debug!(
19 "Current log level: {}",
20 std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_string())
21 );
22 debug!("For detailed manager logs, set RUST_LOG=debug");
23
24 let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
25 let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
26
27 if api_key.is_empty() || access_token.is_empty() {
28 println!(
29 "โ ๏ธ KITE_API_KEY and KITE_ACCESS_TOKEN environment variables not set"
30 );
31 demonstrate_offline_dynamic_architecture().await;
32 return Ok(());
33 }
34
35 println!(
36 "๐ก Note: This demo shows enhanced tick printing with detailed market data"
37 );
38 println!("๐
During market hours, you'll see live tick data with OHLC, volume, and market depth");
39 println!("๐ Outside market hours, connections work but no tick data flows");
40 println!();
41
42 let config = KiteManagerConfig {
44 max_symbols_per_connection: 3000,
45 max_connections: 3,
46 connection_buffer_size: 5000,
47 parser_buffer_size: 10000,
48 connection_timeout: Duration::from_secs(30),
49 health_check_interval: Duration::from_secs(5),
50 max_reconnect_attempts: 5,
51 reconnect_delay: Duration::from_secs(2),
52 enable_dedicated_parsers: true,
53 default_mode: Mode::LTP, };
55
56 println!("๐ง Configuration for Dynamic Operations:");
57 println!(" Max connections: {}", config.max_connections);
58 println!(
59 " Max symbols per connection: {}",
60 config.max_symbols_per_connection
61 );
62 println!(" Default mode: {:?}", config.default_mode);
63 println!();
64
65 println!("๐ก Starting multi-connection manager...");
67 let start_time = Instant::now();
68
69 let mut manager = KiteTickerManager::new(api_key, access_token, config);
70
71 match timeout(Duration::from_secs(30), manager.start()).await {
72 Ok(Ok(())) => {
73 println!("โ
Manager started in {:?}", start_time.elapsed());
74 }
75 Ok(Err(e)) => {
76 println!("โ Manager failed to start: {}", e);
77 return Err(e);
78 }
79 Err(_) => {
80 println!("โฑ๏ธ Manager startup timeout");
81 return Err("Manager startup timeout".to_string());
82 }
83 }
84
85 println!("๐ฏ Starting permanent tick listeners...");
87 let channels = manager.get_all_channels();
88 let mut permanent_listeners = Vec::new();
89
90 for (channel_id, mut receiver) in channels {
91 let listener_task = tokio::spawn(async move {
92 let mut total_ticks = 0;
93 loop {
94 match receiver.recv().await {
95 Ok(message) => {
96 if let TickerMessage::Ticks(ticks) = message {
97 total_ticks += ticks.len();
98 println!(
99 "๐ฏ PERMANENT LISTENER {:?}: {} ticks (total: {})",
100 channel_id,
101 ticks.len(),
102 total_ticks
103 );
104
105 debug!("Raw tick count: {}", ticks.len());
107
108 for (i, tick) in ticks.iter().enumerate() {
109 println!(" ๐น Tick {}: Symbol: {}, LTP: {:?}, Volume: {:?}, Change: {:?}, Mode: {:?}",
110 i + 1,
111 tick.instrument_token,
112 tick.content.last_price,
113 tick.content.volume_traded,
114 tick.content.net_change,
115 tick.content.mode
116 );
117
118 debug!(
120 "DEBUG Full Tick Data for symbol {}:",
121 tick.instrument_token
122 );
123 debug!(" - Instrument Token: {}", tick.instrument_token);
124 debug!(" - Timestamp: <not available>");
125 debug!(" - Tradable: <not available>");
126 debug!(
127 " - Exchange Timestamp: {:?}",
128 tick.content.exchange_timestamp
129 );
130 debug!(" - Last Traded Time: <not available>");
131 debug!(
132 " - Total Buy Quantity: {:?}",
133 tick.content.total_buy_qty
134 );
135 debug!(
136 " - Total Sell Quantity: {:?}",
137 tick.content.total_sell_qty
138 );
139 debug!(
140 " - Average Price: {:?}",
141 tick.content.avg_traded_price
142 );
143 debug!(" - OI: {:?}", tick.content.oi);
144 debug!(" - OI Day High: {:?}", tick.content.oi_day_high);
145 debug!(" - OI Day Low: {:?}", tick.content.oi_day_low);
146
147 if let Some(ohlc) = &tick.content.ohlc {
149 println!(
150 " ๐ OHLC: O:{} H:{} L:{} C:{}",
151 ohlc.open, ohlc.high, ohlc.low, ohlc.close
152 );
153 debug!(
154 "OHLC data present: O:{} H:{} L:{} C:{}",
155 ohlc.open, ohlc.high, ohlc.low, ohlc.close
156 );
157 } else {
158 debug!(
159 "OHLC: Not available for symbol {}",
160 tick.instrument_token
161 );
162 }
163
164 if tick.content.mode == Mode::Full {
166 if let Some(depth) = &tick.content.depth {
167 println!(
168 " ๐ Market Depth: {} buy orders, {} sell orders",
169 depth.buy.len(),
170 depth.sell.len()
171 );
172
173 debug!(
175 "Market depth for symbol {}: {} buy, {} sell",
176 tick.instrument_token,
177 depth.buy.len(),
178 depth.sell.len()
179 );
180
181 if !depth.buy.is_empty() {
182 debug!(
183 "Top Buy Orders for symbol {}:",
184 tick.instrument_token
185 );
186 for (idx, buy_order) in
187 depth.buy.iter().take(3).enumerate()
188 {
189 debug!(
190 " {}. Price: {}, Qty: {}, Orders: {}",
191 idx + 1,
192 buy_order.price,
193 buy_order.qty,
194 buy_order.orders
195 );
196 }
197 }
198 if !depth.sell.is_empty() {
199 debug!(
200 "Top Sell Orders for symbol {}:",
201 tick.instrument_token
202 );
203 for (idx, sell_order) in
204 depth.sell.iter().take(3).enumerate()
205 {
206 debug!(
207 " {}. Price: {}, Qty: {}, Orders: {}",
208 idx + 1,
209 sell_order.price,
210 sell_order.qty,
211 sell_order.orders
212 );
213 }
214 }
215 } else {
216 debug!(
217 "Market Depth: Not available for symbol {}",
218 tick.instrument_token
219 );
220 }
221 }
222 }
223 }
224 }
225 Err(e) => {
226 println!("โ Listener {:?} error: {}", channel_id, e);
227 break;
228 }
229 }
230 }
231 });
232 permanent_listeners.push(listener_task);
233 }
234
235 sleep(Duration::from_millis(100)).await;
237 println!("โ
Permanent listeners started and ready");
238
239 demo_dynamic_subscription(&mut manager).await?;
241
242 println!("\n๐ Stopping manager...");
244 manager.stop().await?;
245
246 println!("๐ Dynamic subscription demo completed successfully!");
247 Ok(())
248}
249
250async fn demo_dynamic_subscription(
251 manager: &mut KiteTickerManager,
252) -> Result<(), String> {
253 println!("\n๐ฏ True Dynamic Subscription Demo");
254 println!("==================================");
255
256 let initial_symbols = vec![256265, 265, 256777]; let additional_batch_1 = vec![274441, 260105, 273929]; let additional_batch_2 = vec![260617, 257033, 257289, 257545]; let symbols_to_remove = vec![265, 274441]; let final_batch = vec![257801, 258825]; println!(
265 "\n๐ Step 1: Initial subscription to {} symbols",
266 initial_symbols.len()
267 );
268 println!("Starting with: {:?}", initial_symbols);
269
270 let channels_before_sub = manager.get_all_channels();
272 let mut tick_listeners = Vec::new();
273
274 for (channel_id, mut receiver) in channels_before_sub {
275 let task = tokio::spawn(async move {
276 let start = std::time::Instant::now();
277 let mut tick_count = 0;
278
279 while start.elapsed() < Duration::from_secs(5) {
281 match timeout(Duration::from_millis(500), receiver.recv()).await {
282 Ok(Ok(message)) => {
283 if let TickerMessage::Ticks(ticks) = message {
284 tick_count += ticks.len();
285 println!(
286 "๐ฏ {:?}: Received {} ticks immediately after subscription!",
287 channel_id,
288 ticks.len()
289 );
290
291 debug!("Processing {} ticks on {:?}", ticks.len(), channel_id);
293
294 for (idx, tick) in ticks.iter().enumerate() {
295 println!(" ๐น Tick {}: Symbol: {}, LTP: {:?}, Volume: {:?}, Change: {:?}",
296 idx + 1,
297 tick.instrument_token,
298 tick.content.last_price,
299 tick.content.volume_traded,
300 tick.content.net_change
301 );
302
303 debug!(
305 "Tick {} Details for symbol {}:",
306 idx + 1,
307 tick.instrument_token
308 );
309 debug!(" - Raw instrument_token: {}", tick.instrument_token);
310 debug!(" - Raw last_price: {:?}", tick.content.last_price);
311 debug!(
312 " - Raw volume_traded: {:?}",
313 tick.content.volume_traded
314 );
315 debug!(" - Raw change: {:?}", tick.content.net_change);
316 debug!(" - Raw last_quantity: <not available>");
317 debug!(
318 " - Raw average_price: {:?}",
319 tick.content.avg_traded_price
320 );
321 debug!(
322 " - Raw buy_quantity: {:?}",
323 tick.content.total_buy_qty
324 );
325 debug!(
326 " - Raw sell_quantity: {:?}",
327 tick.content.total_sell_qty
328 );
329 debug!(" - Raw oi: {:?}", tick.content.oi);
330 debug!(" - Raw mode: {:?}", tick.content.mode);
331
332 if let Some(ohlc) = &tick.content.ohlc {
333 debug!(
334 " - OHLC available: O:{} H:{} L:{} C:{}",
335 ohlc.open, ohlc.high, ohlc.low, ohlc.close
336 );
337 }
338
339 if let Some(depth) = &tick.content.depth {
340 debug!(
341 " - Market depth available: {} buy, {} sell",
342 depth.buy.len(),
343 depth.sell.len()
344 );
345 }
346 }
347 }
348 }
349 _ => continue,
350 }
351 }
352 (channel_id, tick_count)
353 });
354 tick_listeners.push(task);
355 }
356
357 manager
359 .subscribe_symbols(&initial_symbols, Some(Mode::LTP))
360 .await?;
361
362 println!("โ
Subscription sent, waiting for initial ticks...");
363
364 for task in tick_listeners {
366 if let Ok((channel_id, count)) = task.await {
367 println!(
368 "๐ {:?}: Received {} total ticks during initial subscription",
369 channel_id, count
370 );
371 }
372 }
373
374 print_distribution(manager, "After initial subscription").await;
375
376 println!("\nโณ Step 2: Monitoring initial data flow");
378 monitor_ticks_briefly(manager, 5, "Initial Subscription Data").await;
379
380 if let Ok(stats) = manager.get_stats().await {
381 println!("โ
Current Statistics:");
382 println!(" Active connections: {}", stats.active_connections);
383 println!(" Total symbols: {}", stats.total_symbols);
384 println!(" Total messages: {}", stats.total_messages_received);
385 }
386
387 println!(
389 "\nโ Step 3: DYNAMIC ADDITION - Adding {} more symbols",
390 additional_batch_1.len()
391 );
392 println!("Adding: {:?}", additional_batch_1);
393 manager
394 .subscribe_symbols(&additional_batch_1, Some(Mode::Quote))
395 .await?;
396
397 sleep(Duration::from_secs(2)).await;
399
400 print_distribution(manager, "After first dynamic addition").await;
401 monitor_ticks_briefly(manager, 3, "After First Addition").await;
402
403 println!(
405 "\nโ Step 4: DYNAMIC ADDITION - Adding {} more symbols",
406 additional_batch_2.len()
407 );
408 println!("Adding: {:?}", additional_batch_2);
409 manager
410 .subscribe_symbols(&additional_batch_2, Some(Mode::Full))
411 .await?;
412
413 sleep(Duration::from_secs(2)).await;
415
416 print_distribution(manager, "After second dynamic addition").await;
417 monitor_ticks_briefly(manager, 3, "After Second Addition").await;
418
419 println!(
421 "\nโ Step 5: DYNAMIC REMOVAL - Removing {} symbols",
422 symbols_to_remove.len()
423 );
424 println!("Removing: {:?}", symbols_to_remove);
425 match manager.unsubscribe_symbols(&symbols_to_remove).await {
426 Ok(()) => {
427 print_distribution(manager, "After dynamic removal").await;
428 println!("โ
Dynamic removal successful!");
429 }
430 Err(e) => {
431 println!("โ ๏ธ Dynamic removal failed: {}", e);
432 }
433 }
434 monitor_ticks_briefly(manager, 3, "After Symbol Removal").await;
435
436 println!(
438 "\nโ Step 6: FINAL ADDITION - Adding {} symbols",
439 final_batch.len()
440 );
441 println!("Adding: {:?}", final_batch);
442 manager
443 .subscribe_symbols(&final_batch, Some(Mode::LTP))
444 .await?;
445
446 print_distribution(manager, "After final addition").await;
447
448 println!("\n๐ Step 7: MODE CHANGE - Changing subscription mode");
450 let symbols_for_mode_change = vec![256265, 260105]; println!("Changing {:?} to Full mode", symbols_for_mode_change);
452 match manager
453 .change_mode(&symbols_for_mode_change, Mode::Full)
454 .await
455 {
456 Ok(()) => println!("โ
Mode change successful!"),
457 Err(e) => println!("โ ๏ธ Mode change failed: {}", e),
458 }
459
460 println!("\n๐ Step 8: Final Statistics and Performance");
462 sleep(Duration::from_secs(3)).await; if let Ok(stats) = manager.get_stats().await {
465 println!("โ
Final Manager Statistics:");
466 println!(" Active connections: {}", stats.active_connections);
467 println!(" Total symbols: {}", stats.total_symbols);
468 println!(" Total messages: {}", stats.total_messages_received);
469 println!(" Total errors: {}", stats.total_errors);
470
471 for (i, conn_stats) in stats.connection_stats.iter().enumerate() {
472 println!(
473 " Connection {}: {} symbols, {} messages",
474 i + 1,
475 conn_stats.symbol_count,
476 conn_stats.messages_received
477 );
478 }
479 }
480
481 println!("\nโก Step 9: Final Parser Performance");
483 let processor_stats = manager.get_processor_stats().await;
484 for (channel_id, stats) in processor_stats {
485 println!(
486 " {:?}: {:.1} msg/sec, {:?} avg latency",
487 channel_id, stats.messages_per_second, stats.processing_latency_avg
488 );
489 }
490
491 println!("\n๐บ Step 10: Live Data Monitoring (10 seconds)");
493 println!("Monitoring real-time tick data from all dynamically managed connections...");
494
495 let channels = manager.get_all_channels();
496 let mut tasks = Vec::new();
497
498 for (channel_id, mut receiver) in channels {
499 let task = tokio::spawn(async move {
500 let mut count = 0;
501 let start = std::time::Instant::now();
502
503 while start.elapsed() < Duration::from_secs(10) {
504 match timeout(Duration::from_secs(2), receiver.recv()).await {
505 Ok(Ok(message)) => {
506 count += 1;
507 if let TickerMessage::Ticks(ticks) = message {
508 println!("๐ {:?}: {} ticks received", channel_id, ticks.len());
509
510 debug!(
512 "Final monitoring - Message #{}, {} ticks on {:?}",
513 count,
514 ticks.len(),
515 channel_id
516 );
517
518 for tick in &ticks {
519 println!(
520 " ๐น Symbol: {}, LTP: {:?}, Volume: {:?}, Change: {:?}",
521 tick.instrument_token,
522 tick.content.last_price,
523 tick.content.volume_traded,
524 tick.content.net_change
525 );
526
527 debug!(
529 "Final Debug Analysis for Symbol {}:",
530 tick.instrument_token
531 );
532 debug!(
533 " - Timestamp: Now = {:?}",
534 std::time::SystemTime::now()
535 );
536 debug!(" - Data completeness check:");
537 debug!(" * Last Price: {:?} (โ
)", tick.content.last_price);
538 debug!(
539 " * Volume: {:?} ({})",
540 tick.content.volume_traded,
541 if tick.content.volume_traded.is_some() {
542 "โ
"
543 } else {
544 "โ"
545 }
546 );
547 debug!(
548 " * Change: {:?} ({})",
549 tick.content.net_change,
550 if tick.content.net_change.is_some() {
551 "โ
"
552 } else {
553 "โ"
554 }
555 );
556 debug!(" * Mode: {:?} (โ
)", tick.content.mode);
557
558 match tick.content.mode {
560 Mode::Full => {
561 debug!(" - Full mode validation:");
562 debug!(
563 " * OHLC: {}",
564 if tick.content.ohlc.is_some() {
565 "โ
Present"
566 } else {
567 "โ Missing"
568 }
569 );
570 debug!(
571 " * Depth: {}",
572 if tick.content.depth.is_some() {
573 "โ
Present"
574 } else {
575 "โ Missing"
576 }
577 );
578 if let Some(ohlc) = &tick.content.ohlc {
579 debug!(
580 " * OHLC Values: O:{} H:{} L:{} C:{}",
581 ohlc.open, ohlc.high, ohlc.low, ohlc.close
582 );
583 }
584 if let Some(depth) = &tick.content.depth {
585 debug!(
586 " * Depth Levels: {} buy, {} sell",
587 depth.buy.len(),
588 depth.sell.len()
589 );
590 }
591 }
592 Mode::Quote => {
593 debug!(" - Quote mode validation:");
594 debug!(
595 " * OHLC: {}",
596 if tick.content.ohlc.is_some() {
597 "โ
Present"
598 } else {
599 "โ Missing"
600 }
601 );
602 if let Some(ohlc) = &tick.content.ohlc {
603 debug!(
604 " * OHLC Values: O:{} H:{} L:{} C:{}",
605 ohlc.open, ohlc.high, ohlc.low, ohlc.close
606 );
607 }
608 }
609 Mode::LTP => {
610 debug!(" - LTP mode validation: โ
Basic data only");
611 }
612 }
613 }
614 } else {
615 println!("๐ {:?}: Non-tick message received", channel_id);
616 debug!("Message type: {:?}", message);
617 }
618 }
619 _ => continue,
620 }
621 }
622 (channel_id, count)
623 });
624 tasks.push(task);
625 }
626
627 for task in tasks {
629 if let Ok((channel_id, count)) = task.await {
630 println!(
631 "๐ {:?}: {} total messages in 10 seconds",
632 channel_id, count
633 );
634 }
635 }
636
637 println!("\n๐งน Step 11: Final Cleanup Demonstration");
639 let current_symbols: Vec<u32> = manager
640 .get_symbol_distribution()
641 .values()
642 .flat_map(|symbols| symbols.iter().cloned())
643 .collect();
644
645 println!(
646 "Attempting to unsubscribe from {} remaining symbols...",
647 current_symbols.len()
648 );
649
650 match manager.unsubscribe_symbols(¤t_symbols).await {
651 Ok(()) => {
652 print_distribution(manager, "After complete cleanup").await;
653 println!("โ
Complete cleanup successful!");
654 }
655 Err(e) => {
656 println!("โ ๏ธ Cleanup encountered issues: {}", e);
657 println!("๐ก This demonstrates the current architecture capabilities");
658 }
659 }
660
661 println!("\nโ
Dynamic Subscription Demo Completed!");
662 println!("๐ฏ Key Dynamic Features Demonstrated:");
663 println!(" โ
Runtime symbol addition across multiple batches");
664 println!(" โ
Runtime symbol removal with proper tracking");
665 println!(" โ
Dynamic mode changes for existing symbols");
666 println!(" โ
Real-time capacity distribution and monitoring");
667 println!(" โ
Independent message processing per connection");
668 println!(" โ
High-performance parser tasks with statistics");
669 println!(" โ
Complete subscription lifecycle management");
670
671 Ok(())
672}
673
674async fn print_distribution(manager: &KiteTickerManager, context: &str) {
675 let distribution = manager.get_symbol_distribution();
676 println!("\n๐ Symbol Distribution ({}):", context);
677
678 let mut total = 0;
679 for (channel_id, symbols) in &distribution {
680 println!(" {:?}: {} symbols", channel_id, symbols.len());
681 total += symbols.len();
682 }
683 println!(" Total: {} symbols", total);
684}
685
686async fn monitor_ticks_briefly(
687 manager: &mut KiteTickerManager,
688 duration_secs: u64,
689 context: &str,
690) {
691 println!(
692 "\n๐บ {} - Monitoring ticks for {} seconds...",
693 context, duration_secs
694 );
695
696 let channels = manager.get_all_channels();
697 let mut tasks = Vec::new();
698
699 for (channel_id, mut receiver) in channels {
700 let task = tokio::spawn(async move {
701 let mut count = 0;
702 let start = std::time::Instant::now();
703
704 while start.elapsed() < Duration::from_secs(duration_secs) {
705 match timeout(Duration::from_secs(2), receiver.recv()).await {
706 Ok(Ok(message)) => {
707 count += 1;
708 if let TickerMessage::Ticks(ticks) = message {
709 println!(
710 "๐ {:?}: {} ticks in batch #{}",
711 channel_id,
712 ticks.len(),
713 count
714 );
715
716 debug!(
718 "Monitoring tick batch #{} with {} ticks on {:?}",
719 count,
720 ticks.len(),
721 channel_id
722 );
723
724 for (idx, tick) in ticks.iter().enumerate() {
725 println!(" ๐น Symbol: {}, LTP: {:?}, Volume: {:?}, Change: {:?}, OHLC: [{}/{}/{}/{}]",
726 tick.instrument_token,
727 tick.content.last_price,
728 tick.content.volume_traded,
729 tick.content.net_change,
730 tick.content.ohlc.as_ref().map(|o| o.open).unwrap_or(0.0),
731 tick.content.ohlc.as_ref().map(|o| o.high).unwrap_or(0.0),
732 tick.content.ohlc.as_ref().map(|o| o.low).unwrap_or(0.0),
733 tick.content.ohlc.as_ref().map(|o| o.close).unwrap_or(0.0)
734 );
735
736 debug!(
738 "Tick {} metadata for symbol {}:",
739 idx + 1,
740 tick.instrument_token
741 );
742 debug!(" - Received at: {:?}", std::time::SystemTime::now());
743 debug!(" - Mode: {:?}", tick.content.mode);
744 debug!(" - Last Qty: <not available>");
745 debug!(" - Avg Price: {:?}", tick.content.avg_traded_price);
746 debug!(
747 " - Buy/Sell Qty: {:?}/{:?}",
748 tick.content.total_buy_qty, tick.content.total_sell_qty
749 );
750
751 if tick.content.mode == Mode::Full
752 || tick.content.mode == Mode::Quote
753 {
754 if tick.content.ohlc.is_some() {
755 debug!(
756 " - โ
OHLC data present for symbol {}",
757 tick.instrument_token
758 );
759 } else {
760 debug!(" - โ OHLC data missing for symbol {} (expected for mode: {:?})",
761 tick.instrument_token, tick.content.mode);
762 }
763 }
764
765 if tick.content.mode == Mode::Full {
766 if let Some(depth) = &tick.content.depth {
767 debug!(" - โ
Market depth present for symbol {}: {} buy, {} sell levels",
768 tick.instrument_token, depth.buy.len(), depth.sell.len());
769 } else {
770 debug!(" - โ Market depth missing for symbol {} (expected for full mode)",
771 tick.instrument_token);
772 }
773 }
774 }
775 } else {
776 debug!(
777 "Non-tick message received on {:?}: {:?}",
778 channel_id, message
779 );
780 }
781 }
782 _ => continue,
783 }
784 }
785 (channel_id, count)
786 });
787 tasks.push(task);
788 }
789
790 for task in tasks {
792 if let Ok((channel_id, count)) = task.await {
793 println!(
794 "๐ {:?}: {} total messages in {} seconds",
795 channel_id, count, duration_secs
796 );
797 }
798 }
799}
800
801async fn demonstrate_offline_dynamic_architecture() {
802 println!("\n๐ Dynamic Subscription Architecture:");
803 println!("โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ");
804
805 println!("\n๐ฏ Key Dynamic Features:");
806 println!(" โ
Runtime symbol addition/removal per connection");
807 println!(" โ
Mode changes for existing symbols");
808 println!(" โ
Intelligent load balancing across 3 connections");
809 println!(" โ
Real-time capacity monitoring");
810 println!(" โ
Efficient WebSocket command batching");
811
812 println!("\n๐ Capacity Management:");
813 println!(" ๐น Connection 1: 0-3000 symbols");
814 println!(" ๐น Connection 2: 0-3000 symbols");
815 println!(" ๐น Connection 3: 0-3000 symbols");
816 println!(" ๐น Total Capacity: 9000 symbols");
817
818 println!("\nโก Dynamic Operations:");
819 println!(" ```rust");
820 println!(" // Add symbols at runtime");
821 println!(
822 " manager.subscribe_symbols(&[408065, 884737], Some(Mode::Full)).await?;"
823 );
824 println!(" ");
825 println!(" // Remove symbols");
826 println!(" manager.unsubscribe_symbols(&[408065]).await?;");
827 println!(" ");
828 println!(" // Change subscription mode");
829 println!(" manager.change_mode(&[884737], Mode::Quote).await?;");
830 println!(" ");
831 println!(" // Check distribution");
832 println!(" let distribution = manager.get_symbol_distribution();");
833 println!(" ```");
834
835 println!("\n๐ Performance Benefits:");
836 println!(" โก No connection restarts needed");
837 println!(" โก Minimal network overhead");
838 println!(" โก Automatic load balancing");
839 println!(" โก Real-time capacity monitoring");
840
841 println!("\n๐ก Use Cases:");
842 println!(" ๐ Algorithmic trading with changing watchlists");
843 println!(" ๐ Market scanners with dynamic filters");
844 println!(" ๐ Event-driven subscription management");
845 println!(" โฐ Time-based symbol rotation");
846
847 println!("\n๐ง To test with real data:");
848 println!(" export KITE_API_KEY=your_api_key");
849 println!(" export KITE_ACCESS_TOKEN=your_access_token");
850 println!(" export RUST_LOG=debug");
851 println!(" cargo run --example dynamic_subscription_demo");
852}