market_scanner/
market_scanner.rs1use kiteticker_async_manager::{
5 KiteManagerConfig, KiteTickerManager, Mode, TickerMessage,
6};
7use std::collections::HashMap;
8use std::time::{Duration, Instant};
9use tokio::time::sleep;
10
11#[tokio::main]
12async fn main() -> Result<(), String> {
13 env_logger::init();
14
15 println!("š Market Scanner Example");
16 println!("=========================");
17
18 let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
19 let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
20
21 if api_key.is_empty() || access_token.is_empty() {
22 return Err("Please set KITE_API_KEY and KITE_ACCESS_TOKEN".to_string());
23 }
24
25 let config = KiteManagerConfig {
27 max_connections: 3,
28 max_symbols_per_connection: 3000,
29 connection_buffer_size: 20000, parser_buffer_size: 50000, enable_dedicated_parsers: true,
32 default_mode: Mode::LTP, ..Default::default()
34 };
35
36 let mut manager = KiteTickerManager::new(api_key, access_token, config);
37 manager.start().await?;
38
39 let large_symbol_set = generate_symbol_list(8000); println!(
43 "š Scanning {} symbols across {} connections",
44 large_symbol_set.len(),
45 3
46 );
47
48 let start_time = Instant::now();
49 manager
50 .subscribe_symbols(&large_symbol_set, Some(Mode::LTP))
51 .await?;
52 println!(
53 "ā
Subscribed to {} symbols in {:?}",
54 large_symbol_set.len(),
55 start_time.elapsed()
56 );
57
58 let channels = manager.get_all_channels();
60 let mut handles = Vec::new();
61
62 for (channel_id, mut receiver) in channels {
63 let handle = tokio::spawn(async move {
64 let mut scanner = MarketScanner::new(channel_id);
65
66 while let Ok(message) = receiver.recv().await {
67 if let TickerMessage::Ticks(ticks) = message {
68 scanner.process_ticks(ticks).await;
69 }
70 }
71
72 scanner.print_summary();
73 });
74
75 handles.push(handle);
76 }
77
78 println!("š Scanning market for 30 seconds...");
80 sleep(Duration::from_secs(30)).await;
81
82 manager.stop().await?;
84
85 for handle in handles {
87 let _ = handle.await;
88 }
89
90 println!("š Market scanning completed");
91 Ok(())
92}
93
94struct MarketScanner {
95 channel_id: kiteticker_async_manager::ChannelId,
96 tick_count: u64,
97 symbol_prices: HashMap<u32, f64>,
98 price_changes: HashMap<u32, f64>,
99 start_time: Instant,
100 last_update: Instant,
101}
102
103impl MarketScanner {
104 fn new(channel_id: kiteticker_async_manager::ChannelId) -> Self {
105 let now = Instant::now();
106 Self {
107 channel_id,
108 tick_count: 0,
109 symbol_prices: HashMap::new(),
110 price_changes: HashMap::new(),
111 start_time: now,
112 last_update: now,
113 }
114 }
115
116 async fn process_ticks(
117 &mut self,
118 ticks: Vec<kiteticker_async_manager::TickMessage>,
119 ) {
120 for tick in ticks {
121 self.tick_count += 1;
122
123 if let Some(current_price) = tick.content.last_price {
124 if let Some(&previous_price) =
126 self.symbol_prices.get(&tick.instrument_token)
127 {
128 let change_percent =
129 ((current_price - previous_price) / previous_price) * 100.0;
130 self
131 .price_changes
132 .insert(tick.instrument_token, change_percent);
133
134 if change_percent.abs() > 2.0 {
136 println!(
137 "šØ Alert: Symbol {} moved {:.2}% to ā¹{:.2}",
138 tick.instrument_token, change_percent, current_price
139 );
140 }
141 }
142
143 self
144 .symbol_prices
145 .insert(tick.instrument_token, current_price);
146 }
147 }
148
149 self.last_update = Instant::now();
150
151 if self.tick_count % 1000 == 0 {
153 self.print_progress();
154 }
155 }
156
157 fn print_progress(&self) {
158 let elapsed = self.start_time.elapsed();
159 let rate = self.tick_count as f64 / elapsed.as_secs_f64();
160
161 println!(
162 "š Channel {:?}: {} ticks, {} symbols, {:.0} ticks/sec",
163 self.channel_id,
164 self.tick_count,
165 self.symbol_prices.len(),
166 rate
167 );
168 }
169
170 fn print_summary(&self) {
171 let elapsed = self.start_time.elapsed();
172 let avg_rate = self.tick_count as f64 / elapsed.as_secs_f64();
173
174 let mut movers: Vec<_> = self.price_changes.iter().collect();
176 movers.sort_by(|a, b| b.1.abs().partial_cmp(&a.1.abs()).unwrap());
177
178 println!("\nš Channel {:?} Summary:", self.channel_id);
179 println!(" Total ticks: {}", self.tick_count);
180 println!(" Unique symbols: {}", self.symbol_prices.len());
181 println!(" Average rate: {:.1} ticks/sec", avg_rate);
182 println!(" Duration: {:?}", elapsed);
183
184 if !movers.is_empty() {
185 println!(" Top 5 movers:");
186 for (symbol, change) in movers.iter().take(5) {
187 if let Some(&price) = self.symbol_prices.get(symbol) {
188 println!(" {} -> {:.2}% (ā¹{:.2})", symbol, change, price);
189 }
190 }
191 }
192 }
193}
194
195fn generate_symbol_list(count: usize) -> Vec<u32> {
196 let base_symbols = vec![
199 256265, 408065, 738561, 884737, 341249, 492033, 779521, 2953217, 1850625, 2815745, 140481, 1076033, 3876609, 5900545, 2672641, 177665, ];
203
204 let mut symbols = Vec::new();
205 let mut symbol_id = 100000u32;
206
207 symbols.extend_from_slice(&base_symbols);
209
210 while symbols.len() < count {
212 symbols.push(symbol_id);
213 symbol_id += 1;
214 }
215
216 symbols.truncate(count);
217 symbols
218}