Struct KiteTickerSubscriber

Source
pub struct KiteTickerSubscriber { /* private fields */ }
Expand description

The WebSocket client that enters pub/sub mode once it has subscribed to a list of instruments.

Implementations§

Source§

impl KiteTickerSubscriber

Source

pub fn get_subscribed(&self) -> Vec<u32>

Get the list of subscribed instruments

Source

pub async fn subscribe( &mut self, tokens: &[u32], mode: Option<Mode>, ) -> Result<(), String>

Subscribe to new tokens

Source

pub async fn set_mode( &mut self, instrument_tokens: &[u32], mode: Mode, ) -> Result<(), String>

Change the mode of the subscribed instrument tokens

Source

pub async fn unsubscribe( &mut self, instrument_tokens: &[u32], ) -> Result<(), String>

Unsubscribe the provided subscribed tokens; if the input is empty, all subscribed tokens will be unsubscribed.

Tokens in the input which are not part of the subscribed tokens will be ignored.

Source

pub async fn next_message(&mut self) -> Result<Option<TickerMessage>, String>

Get the next message from the server, waiting if necessary. If the result is None, the server connection has terminated.

Examples found in repository?
examples/sample.rs (line 15)
4pub async fn main() -> Result<(), String> {
5  let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
6  let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
7  let ticker = KiteTickerAsync::connect(&api_key, &access_token).await?;
8
9  let token = 408065;
10  // subscribe to an instrument
11  let mut subscriber = ticker.subscribe(&[token], Some(Mode::Full)).await?;
12
13  // await quotes
14  loop {
15    if let Some(msg) = subscriber.next_message().await? {
16      match msg {
17        TickerMessage::Ticks(ticks) => {
18          let tick = ticks.first().unwrap();
19          println!(
20            "Received tick for instrument_token {}, {:?}",
21            tick.instrument_token, tick
22          );
23          break;
24        }
25        _ => {
26          println!("Received message from broker {:?}", msg);
27          continue;
28        }
29      }
30    }
31  }
32
33  Ok(())
34}
More examples
Hide additional examples
examples/basic/single_connection.rs (line 52)
7async fn main() -> Result<(), String> {
8  // Initialize logging
9  env_logger::init();
10
11  println!("🔌 Single Connection Example");
12  println!("============================");
13
14  // Get credentials from environment
15  let api_key = std::env::var("KITE_API_KEY")
16    .map_err(|_| "Please set KITE_API_KEY environment variable")?;
17  let access_token = std::env::var("KITE_ACCESS_TOKEN")
18    .map_err(|_| "Please set KITE_ACCESS_TOKEN environment variable")?;
19
20  println!("📡 Connecting to Kite WebSocket...");
21
22  // Establish WebSocket connection
23  let ticker = KiteTickerAsync::connect(&api_key, &access_token)
24    .await
25    .map_err(|e| format!("Failed to connect: {}", e))?;
26
27  println!("✅ Connected successfully!");
28
29  // Subscribe to a few symbols
30  let symbols = vec![
31    256265, // NIFTY 50
32    408065, // HDFC Bank
33    738561, // Reliance
34  ];
35
36  println!("📊 Subscribing to {} symbols with LTP mode", symbols.len());
37  println!("Symbols: {:?}", symbols);
38
39  let mut subscriber = ticker
40    .subscribe(&symbols, Some(Mode::LTP))
41    .await
42    .map_err(|e| format!("Failed to subscribe: {}", e))?;
43
44  println!("✅ Subscription successful!");
45  println!("📈 Receiving live market data...\n");
46
47  // Receive and process messages
48  let mut message_count = 0;
49  let start_time = std::time::Instant::now();
50
51  loop {
52    match subscriber.next_message().await {
53      Ok(Some(message)) => {
54        match message {
55          TickerMessage::Ticks(ticks) => {
56            message_count += 1;
57
58            for tick in ticks {
59              if let Some(price) = tick.content.last_price {
60                println!("📈 Symbol {}: ₹{:.2}", tick.instrument_token, price);
61              }
62            }
63
64            // Show statistics every 10 messages
65            if message_count % 10 == 0 {
66              let elapsed = start_time.elapsed();
67              let rate = message_count as f64 / elapsed.as_secs_f64();
68              println!(
69                "📊 Stats: {} messages in {:?} ({:.1} msg/sec)\n",
70                message_count, elapsed, rate
71              );
72            }
73
74            // Exit after 50 messages for demo
75            if message_count >= 50 {
76              println!(
77                "🏁 Demo completed! Received {} messages",
78                message_count
79              );
80              break;
81            }
82          }
83          TickerMessage::Message(message) => {
84            println!("📜 Broker message: {}", message);
85          }
86          TickerMessage::Error(error) => {
87            println!("❌ Error: {}", error);
88          }
89          TickerMessage::OrderPostback(order_result) => match order_result {
90            Ok(order) => println!("📋 Order update: {:?}", order),
91            Err(err) => println!("❌ Order error: {}", err),
92          },
93          TickerMessage::ClosingMessage(close_msg) => {
94            println!("🔌 Connection closing: {}", close_msg);
95          }
96        }
97      }
98      Ok(None) => {
99        println!("🔌 Connection closed by server");
100        break;
101      }
102      Err(e) => {
103        println!("❌ Error receiving message: {}", e);
104        break;
105      }
106    }
107  }
108
109  // Close the connection
110  println!("🛑 Closing connection...");
111  subscriber
112    .close()
113    .await
114    .map_err(|e| format!("Failed to close: {}", e))?;
115  println!("✅ Connection closed successfully");
116
117  Ok(())
118}
examples/performance/performance_demo.rs (line 64)
6pub async fn main() -> Result<(), String> {
7  println!("🚀 KiteTicker WebSocket Performance Demo");
8
9  // This example demonstrates the performance improvements made to the WebSocket client
10
11  let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
12  let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
13
14  if api_key.is_empty() || access_token.is_empty() {
15    println!(
16      "⚠️  KITE_API_KEY and KITE_ACCESS_TOKEN environment variables not set"
17    );
18    println!("   This demo will show the architectural improvements without a live connection");
19    demonstrate_offline_improvements().await;
20    return Ok(());
21  }
22
23  println!("📡 Connecting to Kite WebSocket...");
24  let start_time = Instant::now();
25
26  let ticker = match timeout(
27    Duration::from_secs(10),
28    KiteTickerAsync::connect(&api_key, &access_token),
29  )
30  .await
31  {
32    Ok(Ok(ticker)) => {
33      println!("✅ Connected in {:?}", start_time.elapsed());
34      ticker
35    }
36    Ok(Err(e)) => {
37      println!("❌ Connection failed: {}", e);
38      return Err(e);
39    }
40    Err(_) => {
41      println!("⏱️  Connection timeout");
42      return Err("Connection timeout".to_string());
43    }
44  };
45
46  // Test multiple instruments for high-frequency data
47  let instruments = vec![408065, 5633, 738561, 81153]; // Example NSE instruments
48  println!(
49    "📊 Subscribing to {} instruments in Full mode",
50    instruments.len()
51  );
52
53  let mut subscriber = ticker.subscribe(&instruments, Some(Mode::Full)).await?;
54
55  // Performance metrics
56  let mut message_count = 0;
57  let mut tick_count = 0;
58  let start_time = Instant::now();
59  let mut last_report = Instant::now();
60
61  println!("📈 Monitoring performance (Ctrl+C to stop)...");
62
63  loop {
64    match timeout(Duration::from_secs(30), subscriber.next_message()).await {
65      Ok(Ok(Some(msg))) => {
66        message_count += 1;
67
68        match msg {
69          TickerMessage::Ticks(ticks) => {
70            tick_count += ticks.len();
71
72            // Show first few ticks for demonstration
73            if message_count <= 5 {
74              for tick in &ticks {
75                println!(
76                  "📋 Tick: {} @ {:?}",
77                  tick.instrument_token,
78                  tick.content.last_price.unwrap_or(0.0)
79                );
80              }
81            }
82          }
83          TickerMessage::Error(e) => {
84            println!("⚠️  Error: {}", e);
85          }
86          _ => {
87            println!("📨 Other message: {:?}", msg);
88          }
89        }
90
91        // Report performance every 10 seconds
92        if last_report.elapsed() >= Duration::from_secs(10) {
93          let elapsed = start_time.elapsed();
94          let messages_per_sec = message_count as f64 / elapsed.as_secs_f64();
95          let ticks_per_sec = tick_count as f64 / elapsed.as_secs_f64();
96
97          println!("📊 Performance Report:");
98          println!(
99            "   Messages: {} ({:.1}/sec)",
100            message_count, messages_per_sec
101          );
102          println!("   Ticks: {} ({:.1}/sec)", tick_count, ticks_per_sec);
103          println!("   Memory efficient processing ✅");
104          println!("   Bounds checking enabled ✅");
105          println!("   Error resilience ✅");
106
107          last_report = Instant::now();
108        }
109      }
110      Ok(Ok(None)) => {
111        println!("🔌 Connection closed");
112        break;
113      }
114      Ok(Err(e)) => {
115        println!("❌ Message error: {}", e);
116        break;
117      }
118      Err(_) => {
119        println!("⏱️  No messages received in 30 seconds");
120        println!("💓 Connection monitoring (health check not accessible in subscriber)");
121      }
122    }
123  }
124
125  println!("🏁 Demo completed. Final stats:");
126  let elapsed = start_time.elapsed();
127  println!("   Total runtime: {:?}", elapsed);
128  println!("   Messages processed: {}", message_count);
129  println!("   Ticks processed: {}", tick_count);
130
131  Ok(())
132}
Source

pub async fn close(&mut self) -> Result<(), String>

Examples found in repository?
examples/basic/single_connection.rs (line 112)
7async fn main() -> Result<(), String> {
8  // Initialize logging
9  env_logger::init();
10
11  println!("🔌 Single Connection Example");
12  println!("============================");
13
14  // Get credentials from environment
15  let api_key = std::env::var("KITE_API_KEY")
16    .map_err(|_| "Please set KITE_API_KEY environment variable")?;
17  let access_token = std::env::var("KITE_ACCESS_TOKEN")
18    .map_err(|_| "Please set KITE_ACCESS_TOKEN environment variable")?;
19
20  println!("📡 Connecting to Kite WebSocket...");
21
22  // Establish WebSocket connection
23  let ticker = KiteTickerAsync::connect(&api_key, &access_token)
24    .await
25    .map_err(|e| format!("Failed to connect: {}", e))?;
26
27  println!("✅ Connected successfully!");
28
29  // Subscribe to a few symbols
30  let symbols = vec![
31    256265, // NIFTY 50
32    408065, // HDFC Bank
33    738561, // Reliance
34  ];
35
36  println!("📊 Subscribing to {} symbols with LTP mode", symbols.len());
37  println!("Symbols: {:?}", symbols);
38
39  let mut subscriber = ticker
40    .subscribe(&symbols, Some(Mode::LTP))
41    .await
42    .map_err(|e| format!("Failed to subscribe: {}", e))?;
43
44  println!("✅ Subscription successful!");
45  println!("📈 Receiving live market data...\n");
46
47  // Receive and process messages
48  let mut message_count = 0;
49  let start_time = std::time::Instant::now();
50
51  loop {
52    match subscriber.next_message().await {
53      Ok(Some(message)) => {
54        match message {
55          TickerMessage::Ticks(ticks) => {
56            message_count += 1;
57
58            for tick in ticks {
59              if let Some(price) = tick.content.last_price {
60                println!("📈 Symbol {}: ₹{:.2}", tick.instrument_token, price);
61              }
62            }
63
64            // Show statistics every 10 messages
65            if message_count % 10 == 0 {
66              let elapsed = start_time.elapsed();
67              let rate = message_count as f64 / elapsed.as_secs_f64();
68              println!(
69                "📊 Stats: {} messages in {:?} ({:.1} msg/sec)\n",
70                message_count, elapsed, rate
71              );
72            }
73
74            // Exit after 50 messages for demo
75            if message_count >= 50 {
76              println!(
77                "🏁 Demo completed! Received {} messages",
78                message_count
79              );
80              break;
81            }
82          }
83          TickerMessage::Message(message) => {
84            println!("📜 Broker message: {}", message);
85          }
86          TickerMessage::Error(error) => {
87            println!("❌ Error: {}", error);
88          }
89          TickerMessage::OrderPostback(order_result) => match order_result {
90            Ok(order) => println!("📋 Order update: {:?}", order),
91            Err(err) => println!("❌ Order error: {}", err),
92          },
93          TickerMessage::ClosingMessage(close_msg) => {
94            println!("🔌 Connection closing: {}", close_msg);
95          }
96        }
97      }
98      Ok(None) => {
99        println!("🔌 Connection closed by server");
100        break;
101      }
102      Err(e) => {
103        println!("❌ Error receiving message: {}", e);
104        break;
105      }
106    }
107  }
108
109  // Close the connection
110  println!("🛑 Closing connection...");
111  subscriber
112    .close()
113    .await
114    .map_err(|e| format!("Failed to close: {}", e))?;
115  println!("✅ Connection closed successfully");
116
117  Ok(())
118}

Trait Implementations§

Source§

impl Debug for KiteTickerSubscriber

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> ErasedDestructor for T
where T: 'static,