Crate kiteticker_async_manager

Crate kiteticker_async_manager 

Source
Expand description

§KiteTicker Async Manager

High-performance async WebSocket client for the Kite Connect API with multi-connection support and dynamic subscription management.

§Features

  • 🚀 Multi-Connection Support - Utilize all 3 allowed WebSocket connections (9,000 symbol capacity)
  • ⚡ High Performance - Dedicated parser tasks, optimized buffers, sub-microsecond latency
  • 🔄 Dynamic Subscriptions - Add/remove symbols at runtime without reconnection
  • 📊 Load Balancing - Automatic symbol distribution across connections
  • 💪 Production Ready - Comprehensive error handling, health monitoring, reconnection
  • 🔧 Async-First Design - Built with Tokio, follows Rust async best practices

§Quick Start

Using the builder:

use kiteticker_async_manager::{KiteTickerManagerBuilder, Mode};
#[tokio::main]
async fn main() -> Result<(), String> {
  let api_key = std::env::var("KITE_API_KEY").unwrap();
  let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap();
  let mut manager = KiteTickerManagerBuilder::new(api_key, access_token)
      .max_connections(3)
      .max_symbols_per_connection(3000)
      .raw_only(true) // receive only raw frames if desired
      .default_mode(Mode::Quote)
      .enable_dedicated_parsers(true)
      .build();
  manager.start().await?;
  Ok(())
}
use kiteticker_async_manager::{KiteTickerManager, KiteManagerConfig, Mode, TickerMessage};

#[tokio::main]
async fn main() -> Result<(), String> {
    // Setup credentials
    let api_key = std::env::var("KITE_API_KEY").unwrap();
    let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap();
     
    // Create high-performance manager
    let config = KiteManagerConfig {
        max_connections: 3,
        max_symbols_per_connection: 3000,
        enable_dedicated_parsers: true,
        default_mode: Mode::LTP,
        ..Default::default()
    };
     
    // Start manager
    let mut manager = KiteTickerManager::new(api_key, access_token, config);
    manager.start().await?;
     
    // Subscribe to symbols (automatically distributed)
    let symbols = vec![256265, 408065, 738561]; // NIFTY 50, HDFC Bank, Reliance
    manager.subscribe_symbols(&symbols, Some(Mode::Quote)).await?;
     
    // Process data from independent channels
    let channels = manager.get_all_channels();
    for (channel_id, mut receiver) in channels {
        tokio::spawn(async move {
            while let Ok(message) = receiver.recv().await {
                if let TickerMessage::Ticks(ticks) = message {
                    for tick in ticks {
                        println!("Channel {:?}: {} @ ₹{:.2}",
                            channel_id,
                            tick.instrument_token,
                            tick.content.last_price.unwrap_or(0.0));
                    }
                }
            }
        });
    }
     
    // Dynamic operations
    manager.subscribe_symbols(&[5633, 884737], Some(Mode::Full)).await?;  // Add
    manager.unsubscribe_symbols(&[408065]).await?;                        // Remove
    manager.change_mode(&[256265], Mode::Full).await?;                    // Change mode
     
    Ok(())
}

§Single Connection Usage

use kiteticker_async_manager::{KiteTickerAsync, Mode, TickerMessage};

#[tokio::main]
async fn main() -> Result<(), String> {
    let api_key = std::env::var("KITE_API_KEY").unwrap();
    let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap();
     
    // Connect to WebSocket
    let mut ticker = KiteTickerAsync::connect(&api_key, &access_token).await?;
     
    // Subscribe to symbols
    let symbols = vec![256265, 408065]; // NIFTY 50, HDFC Bank
    let mut subscriber = ticker.subscribe(&symbols, Some(Mode::LTP)).await?;
     
    // Receive data
    while let Ok(Some(message)) = subscriber.next_message().await {
        if let TickerMessage::Ticks(ticks) = message {
            for tick in ticks {
                println!("Symbol {}: ₹{:.2}",
                    tick.instrument_token,
                    tick.content.last_price.unwrap_or(0.0));
            }
        }
    }
     
    Ok(())
}

§Performance Comparison

FeatureSingle ConnectionMulti-Connection ManagerImprovement
Max Symbols3,0009,0003x capacity
ThroughputLimited by 1 connection3 parallel connections3x throughput
Latency~5-10µs~1-2µs5x faster
ResilienceSingle point of failure3 independent connectionsHigh availability
Dynamic OpsManual reconnectionRuntime add/removeZero downtime

§Architecture

The library provides two main components:

§1. KiteTickerAsync - Single WebSocket Connection

  • Direct WebSocket client for simple use cases
  • Up to 3,000 symbols per connection
  • Manual connection management
  • Manages up to 3 WebSocket connections automatically
  • Supports up to 9,000 symbols total
  • Dynamic subscription management
  • Load balancing and health monitoring
  • High-performance optimizations

§Subscription Modes

The library supports three subscription modes:

  • Mode::LTP - Last traded price only (minimal bandwidth)
  • Mode::Quote - Price + volume + OHLC (standard data)
  • Mode::Full - Complete market depth (maximum data)

§Zero-copy raw access (advanced)

For maximum throughput and minimal allocations, you can work directly with the raw WebSocket frame bytes and view individual packets using zero-copy, endian-safe types. This is fully safe and avoids undefined behavior by using zerocopy::Ref and big-endian field wrappers.

Key points:

  • Subscribe to raw frames via KiteTickerAsync::subscribe_raw_frames() or KiteTickerManager::get_raw_frame_channel(ChannelId) / get_all_raw_frame_channels(), which yield bytes::Bytes frames.
  • Extract packet bodies (length-prefixed) from a frame and select the size you need.
  • Use helpers like as_tick_raw, as_index_quote_32, and as_inst_header_64 to obtain zerocopy::Ref<&[u8], T> that dereferences to a typed view.
  • The Ref is valid as long as the backing bytes live; examples store Bytes to keep it alive.

Example (snippets):

use kiteticker_async_manager::{KiteTickerAsync, Mode, as_tick_raw};
use bytes::Bytes;

let api_key = std::env::var("KITE_API_KEY").unwrap();
let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap();
let mut ticker = KiteTickerAsync::connect_with_options(&api_key, &access_token, true).await?;
let _sub = ticker.subscribe(&[256265], Some(Mode::Full)).await?;
let mut frames = ticker.subscribe_raw_frames();

// Receive a frame and pull out a 184-byte Full packet body
let frame: Bytes = frames.recv().await.unwrap();
let num = u16::from_be_bytes([frame[0], frame[1]]) as usize;
let mut off = 2usize;
for _ in 0..num {
  let len = u16::from_be_bytes([frame[off], frame[off+1]]) as usize;
  let body = frame.slice(off+2..off+2+len);
  if len == 184 {
    if let Some(view_ref) = as_tick_raw(&body) {
      let tick = &*view_ref; // &TickRaw
      let token = tick.header.instrument_token.get();
      let ltp_scaled = tick.header.last_price.get();
      // ... use fields ...
    }
  }
  off += 2 + len;
}

Manager-level example (per-connection frames):

use kiteticker_async_manager::{KiteTickerManagerBuilder, Mode, as_tick_raw};
let api_key = std::env::var("KITE_API_KEY").unwrap();
let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap();
let mut mgr = KiteTickerManagerBuilder::new(api_key, access_token)
  .raw_only(true)
  .build();
mgr.start().await?;
mgr.subscribe_symbols(&[256265], Some(Mode::Full)).await?;
for (id, mut rx) in mgr.get_all_raw_frame_channels() {
  tokio::spawn(async move {
    while let Ok(frame) = rx.recv().await {
      if frame.len() < 2 { continue; }
      let mut off = 2usize;
      let num = u16::from_be_bytes([frame[0], frame[1]]) as usize;
      for _ in 0..num {
        if off + 2 > frame.len() { break; }
        let len = u16::from_be_bytes([frame[off], frame[off+1]]) as usize;
        let body = frame.slice(off+2..off+2+len);
        if len == 184 {
          if let Some(view) = as_tick_raw(&body) {
            let token = view.header.instrument_token.get();
            let _ = (id, token);
          }
        }
        off += 2 + len;
      }
    }
  });
}

Safety model: all raw structs derive Unaligned and use big_endian wrappers for integer fields. as_* helpers return Option<zerocopy::Ref<&[u8], T>> which validates size and alignment. No unsafe is required.

§Examples

See the examples directory for:

  • Basic Examples - Simple usage patterns
  • Advanced Examples - Complex multi-connection scenarios
  • Performance Examples - Optimization and benchmarking

§Documentation

Re-exports§

pub use manager::ChannelId;
pub use manager::HealthSummary;
pub use manager::KiteManagerConfig;
pub use manager::KiteTickerManager;
pub use manager::KiteTickerManagerBuilder;
pub use manager::ManagerStats;
pub use ticker::KiteTickerAsync;
pub use ticker::KiteTickerSubscriber;
pub use ticker::KiteTickerRawSubscriber184;

Modules§

manager
ticker

Structs§

Depth
Market depth packet structure
DepthItem
Structure for each market depth entry
DepthItemRaw
A single depth entry: qty(u32), price_be([u8; 4] i32), orders(u16), pad(u16)
DepthRaw
5 buy + 5 sell entries = 120 bytes
IndexQuoteRaw32
32-byte Index Quote packet (token + LTP + HLOC + price_change + exch ts)
InstHeaderRaw64
64-byte instrument header (equity/derivative) without depth
OHLC
OHLC packet structure
Order
ParseTickError
Errors that can occur while parsing tick data
Request
Websocket request structure
TextMessage
Postback and non-binary message structure
Tick
Quote packet structure
TickHeaderRaw
First 64 bytes of Full payload contain header/meta before market depth.
TickMessage
Parsed quote packet
TickRaw
Complete 184-byte Full packet body

Enums§

Exchange
Exchange options
Mode
Modes in which packets are streamed
OrderStatus
OrderTransactionType
OrderValidity
TickerMessage
Parsed message from websocket

Constants§

INDEX_QUOTE_SIZE
Size of index quote packet body (common snapshot when market closed)
INST_HEADER_SIZE
Size of instrument header (non-index) without depth
TICK_FULL_SIZE
Size of a full quote packet body used by parser Tick (not including the 2-byte length prefix). Our raw view targets the 184-byte payload region per packet for Mode::Full on equities.

Functions§

as_index_quote_32
Try view as IndexQuoteRaw32 from a 32-byte slice. Returns None if the length is not 32 bytes.
as_inst_header_64
Try view as InstHeaderRaw64 from a 64-byte slice. Returns None if the length is not 64 bytes.
as_tick_raw
Try view as TickRaw from a slice (zero-copy, unaligned-safe).
tick_as_184
Try get a fixed array reference of 184 bytes from a slice (for APIs that prefer arrays)