runtime_subscription_example/
runtime_subscription_example.rs

1use kiteticker_async_manager::{
2  KiteManagerConfig, KiteTickerManager, Mode, TickerMessage,
3};
4use std::time::Duration;
5use tokio::time::sleep;
6
7#[tokio::main]
8async fn main() -> Result<(), String> {
9  // Initialize with your API credentials
10  let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
11  let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
12
13  if api_key.is_empty() || access_token.is_empty() {
14    println!(
15      "āš ļø Please set KITE_API_KEY and KITE_ACCESS_TOKEN environment variables"
16    );
17    return Err("Missing API credentials".to_string());
18  }
19
20  // Create configuration
21  let config = KiteManagerConfig {
22    max_symbols_per_connection: 3000,
23    max_connections: 3,
24    connection_buffer_size: 5000,
25    parser_buffer_size: 10000,
26    connection_timeout: Duration::from_secs(30),
27    health_check_interval: Duration::from_secs(5),
28    max_reconnect_attempts: 5,
29    reconnect_delay: Duration::from_secs(2),
30    enable_dedicated_parsers: true,
31    default_mode: Mode::LTP,
32  };
33
34  // Start the manager
35  let mut manager = KiteTickerManager::new(api_key, access_token, config);
36  manager.start().await?;
37
38  println!("šŸš€ KiteTicker Manager started!");
39
40  // Example of runtime subscription management
41  runtime_subscription_demo(&mut manager).await?;
42
43  // Stop the manager
44  manager.stop().await?;
45  Ok(())
46}
47
48async fn runtime_subscription_demo(
49  manager: &mut KiteTickerManager,
50) -> Result<(), String> {
51  println!("\nšŸ“” Runtime Subscription Management Demo");
52  println!("=====================================");
53
54  // 1. Start with initial symbols
55  println!("\n1ļøāƒ£ Initial subscription to base symbols");
56  let initial_symbols = vec![256265, 265, 256777];
57  manager
58    .subscribe_symbols(&initial_symbols, Some(Mode::LTP))
59    .await?;
60  print_current_state(manager, "Initial subscription").await;
61
62  sleep(Duration::from_secs(2)).await;
63
64  // 2. Add more symbols dynamically
65  println!("\n2ļøāƒ£ Adding symbols dynamically");
66  let additional_symbols = vec![274441, 260105, 273929];
67  manager
68    .subscribe_symbols(&additional_symbols, Some(Mode::Quote))
69    .await?;
70  print_current_state(manager, "After adding symbols").await;
71
72  sleep(Duration::from_secs(2)).await;
73
74  // 3. Change mode for existing symbols
75  println!("\n3ļøāƒ£ Changing subscription mode");
76  let symbols_for_mode_change = vec![256265, 265];
77  manager
78    .change_mode(&symbols_for_mode_change, Mode::Full)
79    .await?;
80  print_current_state(manager, "After mode change").await;
81
82  sleep(Duration::from_secs(2)).await;
83
84  // 4. Remove some symbols
85  println!("\n4ļøāƒ£ Removing symbols dynamically");
86  let symbols_to_remove = vec![265, 274441];
87  manager.unsubscribe_symbols(&symbols_to_remove).await?;
88  print_current_state(manager, "After removing symbols").await;
89
90  sleep(Duration::from_secs(2)).await;
91
92  // 5. Add different symbols with different modes
93  println!("\n5ļøāƒ£ Adding new symbols with Full mode");
94  let full_mode_symbols = vec![257801, 258825];
95  manager
96    .subscribe_symbols(&full_mode_symbols, Some(Mode::Full))
97    .await?;
98  print_current_state(manager, "After adding Full mode symbols").await;
99
100  // 6. Monitor live data for a short period
101  println!("\n6ļøāƒ£ Monitoring live data (5 seconds)");
102  monitor_live_data(manager, 5).await?;
103
104  // 7. Complete cleanup
105  println!("\n7ļøāƒ£ Complete cleanup");
106  let all_symbols: Vec<u32> = manager
107    .get_symbol_distribution()
108    .values()
109    .flat_map(|symbols| symbols.iter().cloned())
110    .collect();
111
112  if !all_symbols.is_empty() {
113    manager.unsubscribe_symbols(&all_symbols).await?;
114    print_current_state(manager, "After complete cleanup").await;
115  }
116
117  println!("\nāœ… Runtime subscription demo completed!");
118  Ok(())
119}
120
121async fn print_current_state(manager: &KiteTickerManager, context: &str) {
122  println!("\nšŸ“Š Current State ({})", context);
123
124  // Show distribution
125  let distribution = manager.get_symbol_distribution();
126  let mut total = 0;
127  for (channel_id, symbols) in &distribution {
128    println!("   {:?}: {} symbols", channel_id, symbols.len());
129    total += symbols.len();
130  }
131  println!("   šŸ“ˆ Total: {} symbols", total);
132
133  // Show stats
134  if let Ok(stats) = manager.get_stats().await {
135    println!(
136      "   šŸ“Š Stats: {} connections, {} messages",
137      stats.active_connections, stats.total_messages_received
138    );
139  }
140}
141
142async fn monitor_live_data(
143  manager: &mut KiteTickerManager,
144  seconds: u64,
145) -> Result<(), String> {
146  let channels = manager.get_all_channels();
147  let mut tasks = Vec::new();
148
149  for (channel_id, mut receiver) in channels {
150    let task = tokio::spawn(async move {
151      let mut count = 0;
152      let start = std::time::Instant::now();
153
154      while start.elapsed() < Duration::from_secs(seconds) {
155        match tokio::time::timeout(Duration::from_millis(100), receiver.recv())
156          .await
157        {
158          Ok(Ok(message)) => {
159            count += 1;
160            if let TickerMessage::Ticks(ticks) = message {
161              if count <= 2 {
162                // Show first few ticks
163                println!(
164                  "   šŸ“‹ {:?}: Received {} ticks",
165                  channel_id,
166                  ticks.len()
167                );
168              }
169            }
170          }
171          _ => continue,
172        }
173      }
174      (channel_id, count)
175    });
176    tasks.push(task);
177  }
178
179  // Wait for monitoring to complete
180  for task in tasks {
181    if let Ok((channel_id, count)) = task.await {
182      println!("   šŸ“Š {:?}: {} total messages", channel_id, count);
183    }
184  }
185
186  Ok(())
187}