runtime_subscription_example/
runtime_subscription_example.rs1use kiteticker_async_manager::{
2 KiteManagerConfig, KiteTickerManager, Mode, TickerMessage,
3};
4use std::time::Duration;
5use tokio::time::sleep;
6
7#[tokio::main]
8async fn main() -> Result<(), String> {
9 let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
11 let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
12
13 if api_key.is_empty() || access_token.is_empty() {
14 println!(
15 "ā ļø Please set KITE_API_KEY and KITE_ACCESS_TOKEN environment variables"
16 );
17 return Err("Missing API credentials".to_string());
18 }
19
20 let config = KiteManagerConfig {
22 max_symbols_per_connection: 3000,
23 max_connections: 3,
24 connection_buffer_size: 5000,
25 parser_buffer_size: 10000,
26 connection_timeout: Duration::from_secs(30),
27 health_check_interval: Duration::from_secs(5),
28 max_reconnect_attempts: 5,
29 reconnect_delay: Duration::from_secs(2),
30 enable_dedicated_parsers: true,
31 default_mode: Mode::LTP,
32 };
33
34 let mut manager = KiteTickerManager::new(api_key, access_token, config);
36 manager.start().await?;
37
38 println!("š KiteTicker Manager started!");
39
40 runtime_subscription_demo(&mut manager).await?;
42
43 manager.stop().await?;
45 Ok(())
46}
47
48async fn runtime_subscription_demo(
49 manager: &mut KiteTickerManager,
50) -> Result<(), String> {
51 println!("\nš” Runtime Subscription Management Demo");
52 println!("=====================================");
53
54 println!("\n1ļøā£ Initial subscription to base symbols");
56 let initial_symbols = vec![256265, 265, 256777];
57 manager
58 .subscribe_symbols(&initial_symbols, Some(Mode::LTP))
59 .await?;
60 print_current_state(manager, "Initial subscription").await;
61
62 sleep(Duration::from_secs(2)).await;
63
64 println!("\n2ļøā£ Adding symbols dynamically");
66 let additional_symbols = vec![274441, 260105, 273929];
67 manager
68 .subscribe_symbols(&additional_symbols, Some(Mode::Quote))
69 .await?;
70 print_current_state(manager, "After adding symbols").await;
71
72 sleep(Duration::from_secs(2)).await;
73
74 println!("\n3ļøā£ Changing subscription mode");
76 let symbols_for_mode_change = vec![256265, 265];
77 manager
78 .change_mode(&symbols_for_mode_change, Mode::Full)
79 .await?;
80 print_current_state(manager, "After mode change").await;
81
82 sleep(Duration::from_secs(2)).await;
83
84 println!("\n4ļøā£ Removing symbols dynamically");
86 let symbols_to_remove = vec![265, 274441];
87 manager.unsubscribe_symbols(&symbols_to_remove).await?;
88 print_current_state(manager, "After removing symbols").await;
89
90 sleep(Duration::from_secs(2)).await;
91
92 println!("\n5ļøā£ Adding new symbols with Full mode");
94 let full_mode_symbols = vec![257801, 258825];
95 manager
96 .subscribe_symbols(&full_mode_symbols, Some(Mode::Full))
97 .await?;
98 print_current_state(manager, "After adding Full mode symbols").await;
99
100 println!("\n6ļøā£ Monitoring live data (5 seconds)");
102 monitor_live_data(manager, 5).await?;
103
104 println!("\n7ļøā£ Complete cleanup");
106 let all_symbols: Vec<u32> = manager
107 .get_symbol_distribution()
108 .values()
109 .flat_map(|symbols| symbols.iter().cloned())
110 .collect();
111
112 if !all_symbols.is_empty() {
113 manager.unsubscribe_symbols(&all_symbols).await?;
114 print_current_state(manager, "After complete cleanup").await;
115 }
116
117 println!("\nā
Runtime subscription demo completed!");
118 Ok(())
119}
120
121async fn print_current_state(manager: &KiteTickerManager, context: &str) {
122 println!("\nš Current State ({})", context);
123
124 let distribution = manager.get_symbol_distribution();
126 let mut total = 0;
127 for (channel_id, symbols) in &distribution {
128 println!(" {:?}: {} symbols", channel_id, symbols.len());
129 total += symbols.len();
130 }
131 println!(" š Total: {} symbols", total);
132
133 if let Ok(stats) = manager.get_stats().await {
135 println!(
136 " š Stats: {} connections, {} messages",
137 stats.active_connections, stats.total_messages_received
138 );
139 }
140}
141
142async fn monitor_live_data(
143 manager: &mut KiteTickerManager,
144 seconds: u64,
145) -> Result<(), String> {
146 let channels = manager.get_all_channels();
147 let mut tasks = Vec::new();
148
149 for (channel_id, mut receiver) in channels {
150 let task = tokio::spawn(async move {
151 let mut count = 0;
152 let start = std::time::Instant::now();
153
154 while start.elapsed() < Duration::from_secs(seconds) {
155 match tokio::time::timeout(Duration::from_millis(100), receiver.recv())
156 .await
157 {
158 Ok(Ok(message)) => {
159 count += 1;
160 if let TickerMessage::Ticks(ticks) = message {
161 if count <= 2 {
162 println!(
164 " š {:?}: Received {} ticks",
165 channel_id,
166 ticks.len()
167 );
168 }
169 }
170 }
171 _ => continue,
172 }
173 }
174 (channel_id, count)
175 });
176 tasks.push(task);
177 }
178
179 for task in tasks {
181 if let Ok((channel_id, count)) = task.await {
182 println!(" š {:?}: {} total messages", channel_id, count);
183 }
184 }
185
186 Ok(())
187}