market_scanner/
market_scanner.rs

1// Market scanner example - High-volume symbol scanning
2// This example demonstrates scanning large numbers of symbols efficiently
3
4use kiteticker_async_manager::{
5  KiteManagerConfig, KiteTickerManager, Mode, TickerMessage,
6};
7use std::collections::HashMap;
8use std::time::{Duration, Instant};
9use tokio::time::sleep;
10
11#[tokio::main]
12async fn main() -> Result<(), String> {
13  env_logger::init();
14
15  println!("šŸ” Market Scanner Example");
16  println!("=========================");
17
18  let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
19  let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
20
21  if api_key.is_empty() || access_token.is_empty() {
22    return Err("Please set KITE_API_KEY and KITE_ACCESS_TOKEN".to_string());
23  }
24
25  // High-performance configuration for scanning
26  let config = KiteManagerConfig {
27    max_connections: 3,
28    max_symbols_per_connection: 3000,
29    connection_buffer_size: 20000, // Large buffer for high volume
30    parser_buffer_size: 50000,     // Even larger for parsed data
31    enable_dedicated_parsers: true,
32    default_mode: Mode::LTP, // LTP mode for scanning (minimal bandwidth)
33    ..Default::default()
34  };
35
36  let mut manager = KiteTickerManager::new(api_key, access_token, config);
37  manager.start().await?;
38
39  // Large symbol set for market scanning
40  let large_symbol_set = generate_symbol_list(8000); // 8000 symbols across 3 connections
41
42  println!(
43    "šŸ“Š Scanning {} symbols across {} connections",
44    large_symbol_set.len(),
45    3
46  );
47
48  let start_time = Instant::now();
49  manager
50    .subscribe_symbols(&large_symbol_set, Some(Mode::LTP))
51    .await?;
52  println!(
53    "āœ… Subscribed to {} symbols in {:?}",
54    large_symbol_set.len(),
55    start_time.elapsed()
56  );
57
58  // Get all channels and start parallel processing
59  let channels = manager.get_all_channels();
60  let mut handles = Vec::new();
61
62  for (channel_id, mut receiver) in channels {
63    let handle = tokio::spawn(async move {
64      let mut scanner = MarketScanner::new(channel_id);
65
66      while let Ok(message) = receiver.recv().await {
67        if let TickerMessage::Ticks(ticks) = message {
68          scanner.process_ticks(ticks).await;
69        }
70      }
71
72      scanner.print_summary();
73    });
74
75    handles.push(handle);
76  }
77
78  // Run scanner for specified duration
79  println!("šŸ”„ Scanning market for 30 seconds...");
80  sleep(Duration::from_secs(30)).await;
81
82  // Stop manager
83  manager.stop().await?;
84
85  // Wait for all scanners to complete
86  for handle in handles {
87    let _ = handle.await;
88  }
89
90  println!("šŸ Market scanning completed");
91  Ok(())
92}
93
94struct MarketScanner {
95  channel_id: kiteticker_async_manager::ChannelId,
96  tick_count: u64,
97  symbol_prices: HashMap<u32, f64>,
98  price_changes: HashMap<u32, f64>,
99  start_time: Instant,
100  last_update: Instant,
101}
102
103impl MarketScanner {
104  fn new(channel_id: kiteticker_async_manager::ChannelId) -> Self {
105    let now = Instant::now();
106    Self {
107      channel_id,
108      tick_count: 0,
109      symbol_prices: HashMap::new(),
110      price_changes: HashMap::new(),
111      start_time: now,
112      last_update: now,
113    }
114  }
115
116  async fn process_ticks(
117    &mut self,
118    ticks: Vec<kiteticker_async_manager::TickMessage>,
119  ) {
120    for tick in ticks {
121      self.tick_count += 1;
122
123      if let Some(current_price) = tick.content.last_price {
124        // Track price changes
125        if let Some(&previous_price) =
126          self.symbol_prices.get(&tick.instrument_token)
127        {
128          let change_percent =
129            ((current_price - previous_price) / previous_price) * 100.0;
130          self
131            .price_changes
132            .insert(tick.instrument_token, change_percent);
133
134          // Alert on significant price movements
135          if change_percent.abs() > 2.0 {
136            println!(
137              "🚨 Alert: Symbol {} moved {:.2}% to ₹{:.2}",
138              tick.instrument_token, change_percent, current_price
139            );
140          }
141        }
142
143        self
144          .symbol_prices
145          .insert(tick.instrument_token, current_price);
146      }
147    }
148
149    self.last_update = Instant::now();
150
151    // Print progress every 1000 ticks
152    if self.tick_count % 1000 == 0 {
153      self.print_progress();
154    }
155  }
156
157  fn print_progress(&self) {
158    let elapsed = self.start_time.elapsed();
159    let rate = self.tick_count as f64 / elapsed.as_secs_f64();
160
161    println!(
162      "šŸ“ˆ Channel {:?}: {} ticks, {} symbols, {:.0} ticks/sec",
163      self.channel_id,
164      self.tick_count,
165      self.symbol_prices.len(),
166      rate
167    );
168  }
169
170  fn print_summary(&self) {
171    let elapsed = self.start_time.elapsed();
172    let avg_rate = self.tick_count as f64 / elapsed.as_secs_f64();
173
174    // Find top movers
175    let mut movers: Vec<_> = self.price_changes.iter().collect();
176    movers.sort_by(|a, b| b.1.abs().partial_cmp(&a.1.abs()).unwrap());
177
178    println!("\nšŸ“Š Channel {:?} Summary:", self.channel_id);
179    println!("   Total ticks: {}", self.tick_count);
180    println!("   Unique symbols: {}", self.symbol_prices.len());
181    println!("   Average rate: {:.1} ticks/sec", avg_rate);
182    println!("   Duration: {:?}", elapsed);
183
184    if !movers.is_empty() {
185      println!("   Top 5 movers:");
186      for (symbol, change) in movers.iter().take(5) {
187        if let Some(&price) = self.symbol_prices.get(symbol) {
188          println!("     {} -> {:.2}% (₹{:.2})", symbol, change, price);
189        }
190      }
191    }
192  }
193}
194
195fn generate_symbol_list(count: usize) -> Vec<u32> {
196  // Generate a realistic symbol list for testing
197  // In production, you would load this from a file or database
198  let base_symbols = vec![
199    256265, 408065, 738561, 884737, 341249, 492033, // NIFTY 50 stocks
200    779521, 2953217, 1850625, 2815745, 140481, // Additional large caps
201    1076033, 3876609, 5900545, 2672641, 177665, // Mid caps
202  ];
203
204  let mut symbols = Vec::new();
205  let mut symbol_id = 100000u32;
206
207  // Add base symbols
208  symbols.extend_from_slice(&base_symbols);
209
210  // Generate additional symbols to reach desired count
211  while symbols.len() < count {
212    symbols.push(symbol_id);
213    symbol_id += 1;
214  }
215
216  symbols.truncate(count);
217  symbols
218}