use kiteticker_async_manager::{
DistributionStrategy, Mode, MultiApiKiteTickerManager, TickerMessage,
};
use std::time::Duration;
/// Entry point of the multi-API ticker manager demo.
///
/// Loads credentials from the environment, builds and starts a
/// `MultiApiKiteTickerManager`, subscribes two groups of instrument tokens
/// (one auto-distributed, one pinned to the first API key), prints messages
/// from the unified channel for roughly 60 seconds, then changes the mode of
/// one group, unsubscribes the other and stops the manager.
///
/// # Errors
///
/// Returns `Err(String)` when no credentials were loaded, or when any manager
/// call followed by `?` fails (the error is propagated to the caller).
#[tokio::main]
async fn main() -> Result<(), String> {
// Initialise logging from the environment, using "info" as the default filter.
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info"))
.init();
println!("đ Multi-API KiteTicker Manager Demo\n");
// Each credential is an (id, api_key, access_token) triple; bail out early
// when none could be loaded.
let credentials = load_credentials()?;
if credentials.is_empty() {
return Err("No API credentials found. Please set KITE_API_KEY_N and KITE_ACCESS_TOKEN_N environment variables.".to_string());
}
println!("đ Loaded {} API key(s):\n", credentials.len());
// Only the ids are printed; the key and token values are never echoed.
for (id, _, _) in &credentials {
println!(" â {}", id);
}
println!();
// Configure the manager: 3 connections per API key, 3000 symbols per
// connection, round-robin distribution, Quote as the default mode and
// health monitoring enabled.
let mut builder = MultiApiKiteTickerManager::builder()
.max_connections_per_api(3)
.max_symbols_per_connection(3000)
.distribution_strategy(DistributionStrategy::RoundRobin)
.default_mode(Mode::Quote)
.enable_health_monitoring(true);
// Register every loaded credential triple with the builder (consumes the list).
for (id, api_key, access_token) in credentials {
builder = builder.add_api_key(id, api_key, access_token);
}
let mut manager = builder.build();
println!("đ Starting multi-API manager...");
manager.start().await?;
println!("â
Manager started successfully\n");
// Instrument tokens used by the examples. The variable names suggest index
// vs. stock instruments — NOTE(review): not verifiable from this file.
let nifty_symbols = vec![
256265, 260105, ];
let stock_symbols = vec![
408065, 738561, 779521, 492033, ];
// Example 1: subscribe without naming an API key, so placement is left to
// the manager.
println!("đ Example 1: Auto-distributing symbols across API keys");
manager
.subscribe_symbols(&nifty_symbols, Some(Mode::Quote))
.await?;
println!(" â Subscribed to {} index symbols\n", nifty_symbols.len());
// Print the resulting placement: API key -> connection index -> symbols.
let distribution = manager.get_symbol_distribution();
println!("đ Symbol Distribution:");
for (api_key_id, conn_map) in &distribution {
println!(" API Key: {}", api_key_id.0);
for (conn_idx, symbols) in conn_map {
println!(" Connection {}: {:?}", conn_idx, symbols);
}
}
println!();
// Example 2: pin the second group explicitly to the first API key, in LTP mode.
let api_keys = manager.get_api_keys();
if !api_keys.is_empty() {
let first_api = &api_keys[0];
println!("đ Example 2: Manually assigning symbols to {}", first_api.0);
manager
.subscribe_symbols_to_api(first_api.clone(), &stock_symbols, Some(Mode::LTP))
.await?;
println!(" â Subscribed to {} stock symbols\n", stock_symbols.len());
}
// Example 3: consume the unified channel, which yields (api_key_id, message) pairs.
println!("đ¨ Example 3: Receiving unified message stream");
println!(" (Press Ctrl+C to stop)\n");
let mut unified_channel = manager.get_unified_channel();
// Background task that prints a heartbeat every 10 seconds.
// NOTE(review): despite its name, `manager_clone` holds the value returned by
// `get_api_keys()` captured before the spawn, not a manager handle, so the
// "stats check" only prints that snapshot of API keys.
let stats_handle = tokio::spawn({
let manager_clone = manager.get_api_keys();
async move {
loop {
tokio::time::sleep(Duration::from_secs(10)).await;
println!("\nđ Periodic Stats Check");
println!(" API Keys: {:?}", manager_clone);
}
}
});
// Running totals: all messages received, and ticks received per API key id.
let mut message_count = 0;
let mut tick_count_by_api = std::collections::HashMap::new();
// Overall demo budget of 60 seconds. The deadline is only checked between
// receives, so the loop can overrun by up to the 5 second receive timeout.
let timeout_duration = Duration::from_secs(60); let start_time = std::time::Instant::now();
while start_time.elapsed() < timeout_duration {
// Wait at most 5 seconds for the next message.
match tokio::time::timeout(Duration::from_secs(5), unified_channel.recv()).await
{
// A message arrived within the timeout.
Ok(Ok((api_key_id, message))) => {
message_count += 1;
match message {
TickerMessage::Ticks(ticks) => {
let count = tick_count_by_api.entry(api_key_id.0.clone()).or_insert(0);
*count += ticks.len();
// Print a summary only when the message counter is a multiple of 10.
// The counter also includes non-tick messages, so this is not exactly
// every tenth tick batch.
if message_count % 10 == 0 {
println!(
" [{:>15}] Received {} ticks (Total messages: {})",
api_key_id.0,
ticks.len(),
message_count
);
// Show the first tick of the batch as a sample.
if let Some(tick) = ticks.first() {
println!(
" ââ Token: {}, Mode: {:?}, LTP: {:?}",
tick.instrument_token,
tick.content.mode,
tick.content.last_price
);
}
}
}
TickerMessage::Error(err) => {
println!(" [{:>15}] Error: {}", api_key_id.0, err);
}
// Any other message variant is printed via its Debug representation.
other => {
println!(" [{:>15}] Other message: {:?}", api_key_id.0, other);
}
}
}
// `recv()` itself returned an error within the timeout.
// NOTE(review): the error is only logged and the loop retries immediately;
// if the error is persistent (e.g. the channel is closed) this would spin
// until the 60 second deadline — confirm the channel's error semantics.
Ok(Err(e)) => {
println!(" â ī¸ Channel error: {}", e);
}
// The 5 second timeout elapsed without a message: print current statistics.
Err(_) => {
println!(" âąī¸ No messages for 5 seconds, checking stats...");
let stats = manager.get_stats().await;
println!("\nđ Current Statistics:");
println!(" Total API Keys: {}", stats.total_api_keys);
println!(" Total Connections: {}", stats.total_connections);
println!(" Total Symbols: {}", stats.total_symbols);
println!(" Total Messages: {}", stats.total_messages_received);
println!(" Uptime: {:?}", stats.uptime);
println!("\n Per-API Stats:");
for api_stat in &stats.per_api_stats {
println!(" {}:", api_stat.api_key_id);
println!(" Active Connections: {}", api_stat.active_connections);
println!(" Symbols: {}", api_stat.total_symbols);
println!(" Messages: {}", api_stat.total_messages_received);
}
println!();
}
}
}
println!("\nâšī¸ Demo completed after 60 seconds\n");
// Report the locally accumulated tick totals (consumes the map).
println!("đ Final Tick Count by API Key:");
for (api_key, count) in tick_count_by_api {
println!(" {}: {} ticks", api_key, count);
}
println!();
// Report the manager's own counters.
let final_stats = manager.get_stats().await;
println!("đ Final Statistics:");
println!(" Total Messages Received: {}", final_stats.total_messages_received);
println!(" Total Messages Parsed: {}", final_stats.total_messages_parsed);
println!(" Total Errors: {}", final_stats.total_errors);
println!(" Uptime: {:?}", final_stats.uptime);
// Example 4: change subscriptions while the manager is running.
println!("\nđ Example 4: Dynamic subscription management");
if !nifty_symbols.is_empty() {
println!(" Changing mode for index symbols to Full...");
manager
.change_mode(&nifty_symbols, Mode::Full)
.await?;
println!(" â Mode changed");
}
// Pause for 2 seconds between the mode change and the unsubscribe.
tokio::time::sleep(Duration::from_secs(2)).await;
if !stock_symbols.is_empty() {
println!(" Unsubscribing from stock symbols...");
manager.unsubscribe_symbols(&stock_symbols).await?;
println!(" â Unsubscribed from {} symbols", stock_symbols.len());
}
// Print the distribution again after the changes above.
println!("\nđ Updated Symbol Distribution:");
let final_distribution = manager.get_symbol_distribution();
for (api_key_id, conn_map) in &final_distribution {
println!(" API Key: {}", api_key_id.0);
for (conn_idx, symbols) in conn_map {
println!(" Connection {}: {:?}", conn_idx, symbols);
}
}
println!();
// Cancel the background heartbeat task and wait for it to finish; its join
// result is deliberately ignored.
stats_handle.abort();
let _ = stats_handle.await;
println!("đ Stopping manager...");
manager.stop().await?;
println!("â
Manager stopped successfully\n");
println!("đ Multi-API demo completed!");
Ok(())
}
/// Collects API credentials from numbered environment variables.
///
/// For every slot `N` in `1..=5` the variables `KITE_API_KEY_N` and
/// `KITE_ACCESS_TOKEN_N` are read. When both can be read, the tuple
/// `("account_N", api_key, access_token)` is added to the result, in
/// ascending slot order. A slot where either variable cannot be read is
/// skipped without an error.
///
/// # Errors
///
/// The current implementation always returns `Ok`; an empty vector means no
/// complete pair was found.
fn load_credentials() -> Result<Vec<(String, String, String)>, String> {
    let found: Vec<(String, String, String)> = (1..=5)
        .filter_map(|slot| {
            // `.ok()?` drops the slot as soon as one half of the pair is unreadable.
            let api_key = std::env::var(format!("KITE_API_KEY_{}", slot)).ok()?;
            let access_token = std::env::var(format!("KITE_ACCESS_TOKEN_{}", slot)).ok()?;
            Some((format!("account_{}", slot), api_key, access_token))
        })
        .collect();
    Ok(found)
}