kiteticker-async-manager 0.4.0

High-performance async WebSocket client for Kite Connect API with multi-connection support, dynamic subscription management, and optimized data processing.
Documentation
use kiteticker_async_manager::{KiteTickerAsync, Mode, TickerMessage};
use std::time::{Duration, Instant};
use tokio::time::timeout;

#[tokio::main]
pub async fn main() -> Result<(), String> {
  println!("🚀 KiteTicker WebSocket Performance Demo");

  // This example demonstrates the performance improvements made to the WebSocket client

  let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
  let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();

  if api_key.is_empty() || access_token.is_empty() {
    println!(
      "⚠️  KITE_API_KEY and KITE_ACCESS_TOKEN environment variables not set"
    );
    println!("   This demo will show the architectural improvements without a live connection");
    demonstrate_offline_improvements().await;
    return Ok(());
  }

  println!("📡 Connecting to Kite WebSocket...");
  let start_time = Instant::now();

  let mut ticker = match timeout(
    Duration::from_secs(10),
    KiteTickerAsync::connect(&api_key, &access_token),
  )
  .await
  {
    Ok(Ok(ticker)) => {
      println!("✅ Connected in {:?}", start_time.elapsed());
      ticker
    }
    Ok(Err(e)) => {
      println!("❌ Connection failed: {}", e);
      return Err(e);
    }
    Err(_) => {
      println!("⏱️  Connection timeout");
      return Err("Connection timeout".to_string());
    }
  };

  // Test multiple instruments for high-frequency data
  let instruments = vec![408065, 5633, 738561, 81153]; // Example NSE instruments
  println!(
    "📊 Subscribing to {} instruments in Full mode",
    instruments.len()
  );

  let mut subscriber = ticker.subscribe(&instruments, Some(Mode::Full)).await?;

  // Performance metrics
  let mut message_count = 0;
  let mut tick_count = 0;
  let start_time = Instant::now();
  let mut last_report = Instant::now();

  println!("📈 Monitoring performance (Ctrl+C to stop)...");

  loop {
    match timeout(Duration::from_secs(30), subscriber.next_message()).await {
      Ok(Ok(Some(msg))) => {
        message_count += 1;

        match msg {
          TickerMessage::Ticks(ticks) => {
            tick_count += ticks.len();

            // Show first few ticks for demonstration
            if message_count <= 5 {
              for tick in &ticks {
                println!(
                  "📋 Tick: {} @ {:?}",
                  tick.instrument_token,
                  tick.content.last_price.unwrap_or(0.0)
                );
              }
            }
          }
          TickerMessage::Error(e) => {
            println!("⚠️  Error: {}", e);
          }
          _ => {
            println!("📨 Other message: {:?}", msg);
          }
        }

        // Report performance every 10 seconds
        if last_report.elapsed() >= Duration::from_secs(10) {
          let elapsed = start_time.elapsed();
          let messages_per_sec = message_count as f64 / elapsed.as_secs_f64();
          let ticks_per_sec = tick_count as f64 / elapsed.as_secs_f64();

          println!("📊 Performance Report:");
          println!(
            "   Messages: {} ({:.1}/sec)",
            message_count, messages_per_sec
          );
          println!("   Ticks: {} ({:.1}/sec)", tick_count, ticks_per_sec);
          println!("   Memory efficient processing ✅");
          println!("   Bounds checking enabled ✅");
          println!("   Error resilience ✅");

          last_report = Instant::now();
        }
      }
      Ok(Ok(None)) => {
        println!("🔌 Connection closed");
        break;
      }
      Ok(Err(e)) => {
        println!("❌ Message error: {}", e);
        break;
      }
      Err(_) => {
        println!("⏱️  No messages received in 30 seconds");
        println!("💓 Connection monitoring (health check not accessible in subscriber)");
      }
    }
  }

  println!("🏁 Demo completed. Final stats:");
  let elapsed = start_time.elapsed();
  println!("   Total runtime: {:?}", elapsed);
  println!("   Messages processed: {}", message_count);
  println!("   Ticks processed: {}", tick_count);

  Ok(())
}

async fn demonstrate_offline_improvements() {
  println!("\n🔧 Demonstrating WebSocket Client Improvements:");

  println!("\n1. Memory Efficiency:");
  println!("   ✅ Pre-allocated vectors for binary message processing");
  println!("   ✅ Reduced string cloning and allocations");
  println!("   ✅ Efficient HashMap operations for subscriptions");

  println!("\n2. Error Handling:");
  println!("   ✅ Bounds checking for binary packet parsing");
  println!("   ✅ Graceful error recovery without breaking the connection");
  println!("   ✅ Connection health monitoring");

  println!("\n3. Performance Optimizations:");
  println!("   ✅ Increased broadcast channel buffer (100 → 1000)");
  println!("   ✅ Improved task management and resource cleanup");
  println!("   ✅ Better ping/pong handling");

  println!("\n4. Architecture Improvements:");
  println!("   ✅ Separation of concerns between reader/writer tasks");
  println!("   ✅ Non-blocking message processing");
  println!("   ✅ Configurable connection parameters");

  // Simulate some processing time
  tokio::time::sleep(Duration::from_millis(500)).await;

  println!("\n🎯 Key Benefits:");
  println!("   📈 Higher throughput for tick data processing");
  println!("   🛡️  Better resilience to network issues");
  println!("   ⚡ Lower latency message delivery");
  println!("   🔧 Easier debugging and monitoring");

  println!("\n💡 To test with real data, set KITE_API_KEY and KITE_ACCESS_TOKEN environment variables");
}