Struct KiteTickerAsync

Source
pub struct KiteTickerAsync { /* private fields */ }
Expand description

The WebSocket client for connecting to Kite Connect’s streaming quotes service.

Implementations§

Source§

impl KiteTickerAsync

Source

pub async fn connect(api_key: &str, access_token: &str) -> Result<Self, String>

Establish a connection with the Kite WebSocket server

Examples found in repository?
examples/sample.rs (line 7)
4pub async fn main() -> Result<(), String> {
5  let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
6  let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
7  let ticker = KiteTickerAsync::connect(&api_key, &access_token).await?;
8
9  let token = 408065;
10  // subscribe to an instrument
11  let mut subscriber = ticker.subscribe(&[token], Some(Mode::Full)).await?;
12
13  // await quotes
14  loop {
15    if let Some(msg) = subscriber.next_message().await? {
16      match msg {
17        TickerMessage::Ticks(ticks) => {
18          let tick = ticks.first().unwrap();
19          println!(
20            "Received tick for instrument_token {}, {:?}",
21            tick.instrument_token, tick
22          );
23          break;
24        }
25        _ => {
26          println!("Received message from broker {:?}", msg);
27          continue;
28        }
29      }
30    }
31  }
32
33  Ok(())
34}
More examples
Hide additional examples
examples/basic/single_connection.rs (line 23)
7async fn main() -> Result<(), String> {
8  // Initialize logging
9  env_logger::init();
10
11  println!("🔌 Single Connection Example");
12  println!("============================");
13
14  // Get credentials from environment
15  let api_key = std::env::var("KITE_API_KEY")
16    .map_err(|_| "Please set KITE_API_KEY environment variable")?;
17  let access_token = std::env::var("KITE_ACCESS_TOKEN")
18    .map_err(|_| "Please set KITE_ACCESS_TOKEN environment variable")?;
19
20  println!("📡 Connecting to Kite WebSocket...");
21
22  // Establish WebSocket connection
23  let ticker = KiteTickerAsync::connect(&api_key, &access_token)
24    .await
25    .map_err(|e| format!("Failed to connect: {}", e))?;
26
27  println!("✅ Connected successfully!");
28
29  // Subscribe to a few symbols
30  let symbols = vec![
31    256265, // NIFTY 50
32    408065, // HDFC Bank
33    738561, // Reliance
34  ];
35
36  println!("📊 Subscribing to {} symbols with LTP mode", symbols.len());
37  println!("Symbols: {:?}", symbols);
38
39  let mut subscriber = ticker
40    .subscribe(&symbols, Some(Mode::LTP))
41    .await
42    .map_err(|e| format!("Failed to subscribe: {}", e))?;
43
44  println!("✅ Subscription successful!");
45  println!("📈 Receiving live market data...\n");
46
47  // Receive and process messages
48  let mut message_count = 0;
49  let start_time = std::time::Instant::now();
50
51  loop {
52    match subscriber.next_message().await {
53      Ok(Some(message)) => {
54        match message {
55          TickerMessage::Ticks(ticks) => {
56            message_count += 1;
57
58            for tick in ticks {
59              if let Some(price) = tick.content.last_price {
60                println!("📈 Symbol {}: ₹{:.2}", tick.instrument_token, price);
61              }
62            }
63
64            // Show statistics every 10 messages
65            if message_count % 10 == 0 {
66              let elapsed = start_time.elapsed();
67              let rate = message_count as f64 / elapsed.as_secs_f64();
68              println!(
69                "📊 Stats: {} messages in {:?} ({:.1} msg/sec)\n",
70                message_count, elapsed, rate
71              );
72            }
73
74            // Exit after 50 messages for demo
75            if message_count >= 50 {
76              println!(
77                "🏁 Demo completed! Received {} messages",
78                message_count
79              );
80              break;
81            }
82          }
83          TickerMessage::Message(message) => {
84            println!("📜 Broker message: {}", message);
85          }
86          TickerMessage::Error(error) => {
87            println!("❌ Error: {}", error);
88          }
89          TickerMessage::OrderPostback(order_result) => match order_result {
90            Ok(order) => println!("📋 Order update: {:?}", order),
91            Err(err) => println!("❌ Order error: {}", err),
92          },
93          TickerMessage::ClosingMessage(close_msg) => {
94            println!("🔌 Connection closing: {}", close_msg);
95          }
96        }
97      }
98      Ok(None) => {
99        println!("🔌 Connection closed by server");
100        break;
101      }
102      Err(e) => {
103        println!("❌ Error receiving message: {}", e);
104        break;
105      }
106    }
107  }
108
109  // Close the connection
110  println!("🛑 Closing connection...");
111  subscriber
112    .close()
113    .await
114    .map_err(|e| format!("Failed to close: {}", e))?;
115  println!("✅ Connection closed successfully");
116
117  Ok(())
118}
examples/performance/performance_demo.rs (line 28)
6pub async fn main() -> Result<(), String> {
7  println!("🚀 KiteTicker WebSocket Performance Demo");
8
9  // This example demonstrates the performance improvements made to the WebSocket client
10
11  let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
12  let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
13
14  if api_key.is_empty() || access_token.is_empty() {
15    println!(
16      "⚠️  KITE_API_KEY and KITE_ACCESS_TOKEN environment variables not set"
17    );
18    println!("   This demo will show the architectural improvements without a live connection");
19    demonstrate_offline_improvements().await;
20    return Ok(());
21  }
22
23  println!("📡 Connecting to Kite WebSocket...");
24  let start_time = Instant::now();
25
26  let ticker = match timeout(
27    Duration::from_secs(10),
28    KiteTickerAsync::connect(&api_key, &access_token),
29  )
30  .await
31  {
32    Ok(Ok(ticker)) => {
33      println!("✅ Connected in {:?}", start_time.elapsed());
34      ticker
35    }
36    Ok(Err(e)) => {
37      println!("❌ Connection failed: {}", e);
38      return Err(e);
39    }
40    Err(_) => {
41      println!("⏱️  Connection timeout");
42      return Err("Connection timeout".to_string());
43    }
44  };
45
46  // Test multiple instruments for high-frequency data
47  let instruments = vec![408065, 5633, 738561, 81153]; // Example NSE instruments
48  println!(
49    "📊 Subscribing to {} instruments in Full mode",
50    instruments.len()
51  );
52
53  let mut subscriber = ticker.subscribe(&instruments, Some(Mode::Full)).await?;
54
55  // Performance metrics
56  let mut message_count = 0;
57  let mut tick_count = 0;
58  let start_time = Instant::now();
59  let mut last_report = Instant::now();
60
61  println!("📈 Monitoring performance (Ctrl+C to stop)...");
62
63  loop {
64    match timeout(Duration::from_secs(30), subscriber.next_message()).await {
65      Ok(Ok(Some(msg))) => {
66        message_count += 1;
67
68        match msg {
69          TickerMessage::Ticks(ticks) => {
70            tick_count += ticks.len();
71
72            // Show first few ticks for demonstration
73            if message_count <= 5 {
74              for tick in &ticks {
75                println!(
76                  "📋 Tick: {} @ {:?}",
77                  tick.instrument_token,
78                  tick.content.last_price.unwrap_or(0.0)
79                );
80              }
81            }
82          }
83          TickerMessage::Error(e) => {
84            println!("⚠️  Error: {}", e);
85          }
86          _ => {
87            println!("📨 Other message: {:?}", msg);
88          }
89        }
90
91        // Report performance every 10 seconds
92        if last_report.elapsed() >= Duration::from_secs(10) {
93          let elapsed = start_time.elapsed();
94          let messages_per_sec = message_count as f64 / elapsed.as_secs_f64();
95          let ticks_per_sec = tick_count as f64 / elapsed.as_secs_f64();
96
97          println!("📊 Performance Report:");
98          println!(
99            "   Messages: {} ({:.1}/sec)",
100            message_count, messages_per_sec
101          );
102          println!("   Ticks: {} ({:.1}/sec)", tick_count, ticks_per_sec);
103          println!("   Memory efficient processing ✅");
104          println!("   Bounds checking enabled ✅");
105          println!("   Error resilience ✅");
106
107          last_report = Instant::now();
108        }
109      }
110      Ok(Ok(None)) => {
111        println!("🔌 Connection closed");
112        break;
113      }
114      Ok(Err(e)) => {
115        println!("❌ Message error: {}", e);
116        break;
117      }
118      Err(_) => {
119        println!("⏱️  No messages received in 30 seconds");
120        println!("💓 Connection monitoring (health check not accessible in subscriber)");
121      }
122    }
123  }
124
125  println!("🏁 Demo completed. Final stats:");
126  let elapsed = start_time.elapsed();
127  println!("   Total runtime: {:?}", elapsed);
128  println!("   Messages processed: {}", message_count);
129  println!("   Ticks processed: {}", tick_count);
130
131  Ok(())
132}
Source

pub async fn subscribe( self, instrument_tokens: &[u32], mode: Option<Mode>, ) -> Result<KiteTickerSubscriber, String>

Subscribes the client to a list of instruments

Examples found in repository?
examples/sample.rs (line 11)
4pub async fn main() -> Result<(), String> {
5  let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
6  let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
7  let ticker = KiteTickerAsync::connect(&api_key, &access_token).await?;
8
9  let token = 408065;
10  // subscribe to an instrument
11  let mut subscriber = ticker.subscribe(&[token], Some(Mode::Full)).await?;
12
13  // await quotes
14  loop {
15    if let Some(msg) = subscriber.next_message().await? {
16      match msg {
17        TickerMessage::Ticks(ticks) => {
18          let tick = ticks.first().unwrap();
19          println!(
20            "Received tick for instrument_token {}, {:?}",
21            tick.instrument_token, tick
22          );
23          break;
24        }
25        _ => {
26          println!("Received message from broker {:?}", msg);
27          continue;
28        }
29      }
30    }
31  }
32
33  Ok(())
34}
More examples
Hide additional examples
examples/basic/single_connection.rs (line 40)
7async fn main() -> Result<(), String> {
8  // Initialize logging
9  env_logger::init();
10
11  println!("🔌 Single Connection Example");
12  println!("============================");
13
14  // Get credentials from environment
15  let api_key = std::env::var("KITE_API_KEY")
16    .map_err(|_| "Please set KITE_API_KEY environment variable")?;
17  let access_token = std::env::var("KITE_ACCESS_TOKEN")
18    .map_err(|_| "Please set KITE_ACCESS_TOKEN environment variable")?;
19
20  println!("📡 Connecting to Kite WebSocket...");
21
22  // Establish WebSocket connection
23  let ticker = KiteTickerAsync::connect(&api_key, &access_token)
24    .await
25    .map_err(|e| format!("Failed to connect: {}", e))?;
26
27  println!("✅ Connected successfully!");
28
29  // Subscribe to a few symbols
30  let symbols = vec![
31    256265, // NIFTY 50
32    408065, // HDFC Bank
33    738561, // Reliance
34  ];
35
36  println!("📊 Subscribing to {} symbols with LTP mode", symbols.len());
37  println!("Symbols: {:?}", symbols);
38
39  let mut subscriber = ticker
40    .subscribe(&symbols, Some(Mode::LTP))
41    .await
42    .map_err(|e| format!("Failed to subscribe: {}", e))?;
43
44  println!("✅ Subscription successful!");
45  println!("📈 Receiving live market data...\n");
46
47  // Receive and process messages
48  let mut message_count = 0;
49  let start_time = std::time::Instant::now();
50
51  loop {
52    match subscriber.next_message().await {
53      Ok(Some(message)) => {
54        match message {
55          TickerMessage::Ticks(ticks) => {
56            message_count += 1;
57
58            for tick in ticks {
59              if let Some(price) = tick.content.last_price {
60                println!("📈 Symbol {}: ₹{:.2}", tick.instrument_token, price);
61              }
62            }
63
64            // Show statistics every 10 messages
65            if message_count % 10 == 0 {
66              let elapsed = start_time.elapsed();
67              let rate = message_count as f64 / elapsed.as_secs_f64();
68              println!(
69                "📊 Stats: {} messages in {:?} ({:.1} msg/sec)\n",
70                message_count, elapsed, rate
71              );
72            }
73
74            // Exit after 50 messages for demo
75            if message_count >= 50 {
76              println!(
77                "🏁 Demo completed! Received {} messages",
78                message_count
79              );
80              break;
81            }
82          }
83          TickerMessage::Message(message) => {
84            println!("📜 Broker message: {}", message);
85          }
86          TickerMessage::Error(error) => {
87            println!("❌ Error: {}", error);
88          }
89          TickerMessage::OrderPostback(order_result) => match order_result {
90            Ok(order) => println!("📋 Order update: {:?}", order),
91            Err(err) => println!("❌ Order error: {}", err),
92          },
93          TickerMessage::ClosingMessage(close_msg) => {
94            println!("🔌 Connection closing: {}", close_msg);
95          }
96        }
97      }
98      Ok(None) => {
99        println!("🔌 Connection closed by server");
100        break;
101      }
102      Err(e) => {
103        println!("❌ Error receiving message: {}", e);
104        break;
105      }
106    }
107  }
108
109  // Close the connection
110  println!("🛑 Closing connection...");
111  subscriber
112    .close()
113    .await
114    .map_err(|e| format!("Failed to close: {}", e))?;
115  println!("✅ Connection closed successfully");
116
117  Ok(())
118}
examples/performance/performance_demo.rs (line 53)
6pub async fn main() -> Result<(), String> {
7  println!("🚀 KiteTicker WebSocket Performance Demo");
8
9  // This example demonstrates the performance improvements made to the WebSocket client
10
11  let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
12  let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
13
14  if api_key.is_empty() || access_token.is_empty() {
15    println!(
16      "⚠️  KITE_API_KEY and KITE_ACCESS_TOKEN environment variables not set"
17    );
18    println!("   This demo will show the architectural improvements without a live connection");
19    demonstrate_offline_improvements().await;
20    return Ok(());
21  }
22
23  println!("📡 Connecting to Kite WebSocket...");
24  let start_time = Instant::now();
25
26  let ticker = match timeout(
27    Duration::from_secs(10),
28    KiteTickerAsync::connect(&api_key, &access_token),
29  )
30  .await
31  {
32    Ok(Ok(ticker)) => {
33      println!("✅ Connected in {:?}", start_time.elapsed());
34      ticker
35    }
36    Ok(Err(e)) => {
37      println!("❌ Connection failed: {}", e);
38      return Err(e);
39    }
40    Err(_) => {
41      println!("⏱️  Connection timeout");
42      return Err("Connection timeout".to_string());
43    }
44  };
45
46  // Test multiple instruments for high-frequency data
47  let instruments = vec![408065, 5633, 738561, 81153]; // Example NSE instruments
48  println!(
49    "📊 Subscribing to {} instruments in Full mode",
50    instruments.len()
51  );
52
53  let mut subscriber = ticker.subscribe(&instruments, Some(Mode::Full)).await?;
54
55  // Performance metrics
56  let mut message_count = 0;
57  let mut tick_count = 0;
58  let start_time = Instant::now();
59  let mut last_report = Instant::now();
60
61  println!("📈 Monitoring performance (Ctrl+C to stop)...");
62
63  loop {
64    match timeout(Duration::from_secs(30), subscriber.next_message()).await {
65      Ok(Ok(Some(msg))) => {
66        message_count += 1;
67
68        match msg {
69          TickerMessage::Ticks(ticks) => {
70            tick_count += ticks.len();
71
72            // Show first few ticks for demonstration
73            if message_count <= 5 {
74              for tick in &ticks {
75                println!(
76                  "📋 Tick: {} @ {:?}",
77                  tick.instrument_token,
78                  tick.content.last_price.unwrap_or(0.0)
79                );
80              }
81            }
82          }
83          TickerMessage::Error(e) => {
84            println!("⚠️  Error: {}", e);
85          }
86          _ => {
87            println!("📨 Other message: {:?}", msg);
88          }
89        }
90
91        // Report performance every 10 seconds
92        if last_report.elapsed() >= Duration::from_secs(10) {
93          let elapsed = start_time.elapsed();
94          let messages_per_sec = message_count as f64 / elapsed.as_secs_f64();
95          let ticks_per_sec = tick_count as f64 / elapsed.as_secs_f64();
96
97          println!("📊 Performance Report:");
98          println!(
99            "   Messages: {} ({:.1}/sec)",
100            message_count, messages_per_sec
101          );
102          println!("   Ticks: {} ({:.1}/sec)", tick_count, ticks_per_sec);
103          println!("   Memory efficient processing ✅");
104          println!("   Bounds checking enabled ✅");
105          println!("   Error resilience ✅");
106
107          last_report = Instant::now();
108        }
109      }
110      Ok(Ok(None)) => {
111        println!("🔌 Connection closed");
112        break;
113      }
114      Ok(Err(e)) => {
115        println!("❌ Message error: {}", e);
116        break;
117      }
118      Err(_) => {
119        println!("⏱️  No messages received in 30 seconds");
120        println!("💓 Connection monitoring (health check not accessible in subscriber)");
121      }
122    }
123  }
124
125  println!("🏁 Demo completed. Final stats:");
126  let elapsed = start_time.elapsed();
127  println!("   Total runtime: {:?}", elapsed);
128  println!("   Messages processed: {}", message_count);
129  println!("   Ticks processed: {}", tick_count);
130
131  Ok(())
132}
Source

pub async fn close(&mut self) -> Result<(), String>

Close the websocket connection

Source

pub fn is_connected(&self) -> bool

Check if the connection is still alive

Source

pub async fn ping(&mut self) -> Result<(), String>

Send a ping to keep the connection alive

Source

pub fn receiver_count(&self) -> usize

Get the current broadcast channel receiver count

Source

pub fn channel_capacity(&self) -> usize

Get the current broadcast channel capacity

Trait Implementations§

Source§

impl Debug for KiteTickerAsync

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> ErasedDestructor for T
where T: 'static,