pub struct KiteTickerSubscriber { /* private fields */ }Expand description
The WebSocket client that enters pub/sub mode once the client has subscribed to a list of instruments.
Implementations§
Source§impl KiteTickerSubscriber
impl KiteTickerSubscriber
Sourcepub fn get_subscribed(&self) -> Vec<u32>
pub fn get_subscribed(&self) -> Vec<u32>
Get the list of subscribed instruments
Sourcepub async fn subscribe(
&mut self,
tokens: &[u32],
mode: Option<Mode>,
) -> Result<(), String>
pub async fn subscribe( &mut self, tokens: &[u32], mode: Option<Mode>, ) -> Result<(), String>
Subscribe to new tokens
Sourcepub async fn set_mode(
&mut self,
instrument_tokens: &[u32],
mode: Mode,
) -> Result<(), String>
pub async fn set_mode( &mut self, instrument_tokens: &[u32], mode: Mode, ) -> Result<(), String>
Change the mode of the subscribed instrument tokens
Sourcepub async fn unsubscribe(
&mut self,
instrument_tokens: &[u32],
) -> Result<(), String>
pub async fn unsubscribe( &mut self, instrument_tokens: &[u32], ) -> Result<(), String>
Unsubscribe the provided subscribed tokens. If the input is empty, all subscribed tokens will be unsubscribed.
Tokens in the input that are not part of the subscribed tokens will be ignored.
Sourcepub async fn next_message(&mut self) -> Result<Option<TickerMessage>, String>
pub async fn next_message(&mut self) -> Result<Option<TickerMessage>, String>
Get the next message from the server, waiting if necessary. If the result is None, the connection has been closed by the server.
Examples found in repository?
examples/sample.rs (line 15)
4pub async fn main() -> Result<(), String> {
5 let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
6 let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
7 let ticker = KiteTickerAsync::connect(&api_key, &access_token).await?;
8
9 let token = 408065;
10 // subscribe to an instrument
11 let mut subscriber = ticker.subscribe(&[token], Some(Mode::Full)).await?;
12
13 // await quotes
14 loop {
15 if let Some(msg) = subscriber.next_message().await? {
16 match msg {
17 TickerMessage::Ticks(ticks) => {
18 let tick = ticks.first().unwrap();
19 println!(
20 "Received tick for instrument_token {}, {:?}",
21 tick.instrument_token, tick
22 );
23 break;
24 }
25 _ => {
26 println!("Received message from broker {:?}", msg);
27 continue;
28 }
29 }
30 }
31 }
32
33 Ok(())
34}More examples
examples/basic/single_connection.rs (line 52)
7async fn main() -> Result<(), String> {
8 // Initialize logging
9 env_logger::init();
10
11 println!("๐ Single Connection Example");
12 println!("============================");
13
14 // Get credentials from environment
15 let api_key = std::env::var("KITE_API_KEY")
16 .map_err(|_| "Please set KITE_API_KEY environment variable")?;
17 let access_token = std::env::var("KITE_ACCESS_TOKEN")
18 .map_err(|_| "Please set KITE_ACCESS_TOKEN environment variable")?;
19
20 println!("๐ก Connecting to Kite WebSocket...");
21
22 // Establish WebSocket connection
23 let ticker = KiteTickerAsync::connect(&api_key, &access_token)
24 .await
25 .map_err(|e| format!("Failed to connect: {}", e))?;
26
27 println!("✅ Connected successfully!");
28
29 // Subscribe to a few symbols
30 let symbols = vec![
31 256265, // NIFTY 50
32 408065, // HDFC Bank
33 738561, // Reliance
34 ];
35
36 println!("๐ Subscribing to {} symbols with LTP mode", symbols.len());
37 println!("Symbols: {:?}", symbols);
38
39 let mut subscriber = ticker
40 .subscribe(&symbols, Some(Mode::LTP))
41 .await
42 .map_err(|e| format!("Failed to subscribe: {}", e))?;
43
44 println!("✅ Subscription successful!");
45 println!("๐ Receiving live market data...\n");
46
47 // Receive and process messages
48 let mut message_count = 0;
49 let start_time = std::time::Instant::now();
50
51 loop {
52 match subscriber.next_message().await {
53 Ok(Some(message)) => {
54 match message {
55 TickerMessage::Ticks(ticks) => {
56 message_count += 1;
57
58 for tick in ticks {
59 if let Some(price) = tick.content.last_price {
60 println!("๐ Symbol {}: ₹{:.2}", tick.instrument_token, price);
61 }
62 }
63
64 // Show statistics every 10 messages
65 if message_count % 10 == 0 {
66 let elapsed = start_time.elapsed();
67 let rate = message_count as f64 / elapsed.as_secs_f64();
68 println!(
69 "๐ Stats: {} messages in {:?} ({:.1} msg/sec)\n",
70 message_count, elapsed, rate
71 );
72 }
73
74 // Exit after 50 messages for demo
75 if message_count >= 50 {
76 println!(
77 "๐ Demo completed! Received {} messages",
78 message_count
79 );
80 break;
81 }
82 }
83 TickerMessage::Message(message) => {
84 println!("๐ Broker message: {}", message);
85 }
86 TickerMessage::Error(error) => {
87 println!("โ Error: {}", error);
88 }
89 TickerMessage::OrderPostback(order_result) => match order_result {
90 Ok(order) => println!("๐ Order update: {:?}", order),
91 Err(err) => println!("โ Order error: {}", err),
92 },
93 TickerMessage::ClosingMessage(close_msg) => {
94 println!("๐ Connection closing: {}", close_msg);
95 }
96 }
97 }
98 Ok(None) => {
99 println!("๐ Connection closed by server");
100 break;
101 }
102 Err(e) => {
103 println!("โ Error receiving message: {}", e);
104 break;
105 }
106 }
107 }
108
109 // Close the connection
110 println!("๐ Closing connection...");
111 subscriber
112 .close()
113 .await
114 .map_err(|e| format!("Failed to close: {}", e))?;
115 println!("✅ Connection closed successfully");
116
117 Ok(())
118}examples/performance/performance_demo.rs (line 64)
6pub async fn main() -> Result<(), String> {
7 println!("๐ KiteTicker WebSocket Performance Demo");
8
9 // This example demonstrates the performance improvements made to the WebSocket client
10
11 let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
12 let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
13
14 if api_key.is_empty() || access_token.is_empty() {
15 println!(
16 "⚠️ KITE_API_KEY and KITE_ACCESS_TOKEN environment variables not set"
17 );
18 println!(" This demo will show the architectural improvements without a live connection");
19 demonstrate_offline_improvements().await;
20 return Ok(());
21 }
22
23 println!("๐ก Connecting to Kite WebSocket...");
24 let start_time = Instant::now();
25
26 let ticker = match timeout(
27 Duration::from_secs(10),
28 KiteTickerAsync::connect(&api_key, &access_token),
29 )
30 .await
31 {
32 Ok(Ok(ticker)) => {
33 println!("✅ Connected in {:?}", start_time.elapsed());
34 ticker
35 }
36 Ok(Err(e)) => {
37 println!("โ Connection failed: {}", e);
38 return Err(e);
39 }
40 Err(_) => {
41 println!("⏱️ Connection timeout");
42 return Err("Connection timeout".to_string());
43 }
44 };
45
46 // Test multiple instruments for high-frequency data
47 let instruments = vec![408065, 5633, 738561, 81153]; // Example NSE instruments
48 println!(
49 "๐ Subscribing to {} instruments in Full mode",
50 instruments.len()
51 );
52
53 let mut subscriber = ticker.subscribe(&instruments, Some(Mode::Full)).await?;
54
55 // Performance metrics
56 let mut message_count = 0;
57 let mut tick_count = 0;
58 let start_time = Instant::now();
59 let mut last_report = Instant::now();
60
61 println!("๐ Monitoring performance (Ctrl+C to stop)...");
62
63 loop {
64 match timeout(Duration::from_secs(30), subscriber.next_message()).await {
65 Ok(Ok(Some(msg))) => {
66 message_count += 1;
67
68 match msg {
69 TickerMessage::Ticks(ticks) => {
70 tick_count += ticks.len();
71
72 // Show first few ticks for demonstration
73 if message_count <= 5 {
74 for tick in &ticks {
75 println!(
76 "๐ Tick: {} @ {:?}",
77 tick.instrument_token,
78 tick.content.last_price.unwrap_or(0.0)
79 );
80 }
81 }
82 }
83 TickerMessage::Error(e) => {
84 println!("⚠️ Error: {}", e);
85 }
86 _ => {
87 println!("๐จ Other message: {:?}", msg);
88 }
89 }
90
91 // Report performance every 10 seconds
92 if last_report.elapsed() >= Duration::from_secs(10) {
93 let elapsed = start_time.elapsed();
94 let messages_per_sec = message_count as f64 / elapsed.as_secs_f64();
95 let ticks_per_sec = tick_count as f64 / elapsed.as_secs_f64();
96
97 println!("๐ Performance Report:");
98 println!(
99 " Messages: {} ({:.1}/sec)",
100 message_count, messages_per_sec
101 );
102 println!(" Ticks: {} ({:.1}/sec)", tick_count, ticks_per_sec);
103 println!(" Memory efficient processing ✅");
104 println!(" Bounds checking enabled ✅");
105 println!(" Error resilience ✅");
106
107 last_report = Instant::now();
108 }
109 }
110 Ok(Ok(None)) => {
111 println!("๐ Connection closed");
112 break;
113 }
114 Ok(Err(e)) => {
115 println!("โ Message error: {}", e);
116 break;
117 }
118 Err(_) => {
119 println!("⏱️ No messages received in 30 seconds");
120 println!("๐ Connection monitoring (health check not accessible in subscriber)");
121 }
122 }
123 }
124
125 println!("๐ Demo completed. Final stats:");
126 let elapsed = start_time.elapsed();
127 println!(" Total runtime: {:?}", elapsed);
128 println!(" Messages processed: {}", message_count);
129 println!(" Ticks processed: {}", tick_count);
130
131 Ok(())
132}Sourcepub async fn close(&mut self) -> Result<(), String>
pub async fn close(&mut self) -> Result<(), String>
Examples found in repository?
examples/basic/single_connection.rs (line 112)
7async fn main() -> Result<(), String> {
8 // Initialize logging
9 env_logger::init();
10
11 println!("๐ Single Connection Example");
12 println!("============================");
13
14 // Get credentials from environment
15 let api_key = std::env::var("KITE_API_KEY")
16 .map_err(|_| "Please set KITE_API_KEY environment variable")?;
17 let access_token = std::env::var("KITE_ACCESS_TOKEN")
18 .map_err(|_| "Please set KITE_ACCESS_TOKEN environment variable")?;
19
20 println!("๐ก Connecting to Kite WebSocket...");
21
22 // Establish WebSocket connection
23 let ticker = KiteTickerAsync::connect(&api_key, &access_token)
24 .await
25 .map_err(|e| format!("Failed to connect: {}", e))?;
26
27 println!("✅ Connected successfully!");
28
29 // Subscribe to a few symbols
30 let symbols = vec![
31 256265, // NIFTY 50
32 408065, // HDFC Bank
33 738561, // Reliance
34 ];
35
36 println!("๐ Subscribing to {} symbols with LTP mode", symbols.len());
37 println!("Symbols: {:?}", symbols);
38
39 let mut subscriber = ticker
40 .subscribe(&symbols, Some(Mode::LTP))
41 .await
42 .map_err(|e| format!("Failed to subscribe: {}", e))?;
43
44 println!("✅ Subscription successful!");
45 println!("๐ Receiving live market data...\n");
46
47 // Receive and process messages
48 let mut message_count = 0;
49 let start_time = std::time::Instant::now();
50
51 loop {
52 match subscriber.next_message().await {
53 Ok(Some(message)) => {
54 match message {
55 TickerMessage::Ticks(ticks) => {
56 message_count += 1;
57
58 for tick in ticks {
59 if let Some(price) = tick.content.last_price {
60 println!("๐ Symbol {}: ₹{:.2}", tick.instrument_token, price);
61 }
62 }
63
64 // Show statistics every 10 messages
65 if message_count % 10 == 0 {
66 let elapsed = start_time.elapsed();
67 let rate = message_count as f64 / elapsed.as_secs_f64();
68 println!(
69 "๐ Stats: {} messages in {:?} ({:.1} msg/sec)\n",
70 message_count, elapsed, rate
71 );
72 }
73
74 // Exit after 50 messages for demo
75 if message_count >= 50 {
76 println!(
77 "๐ Demo completed! Received {} messages",
78 message_count
79 );
80 break;
81 }
82 }
83 TickerMessage::Message(message) => {
84 println!("๐ Broker message: {}", message);
85 }
86 TickerMessage::Error(error) => {
87 println!("โ Error: {}", error);
88 }
89 TickerMessage::OrderPostback(order_result) => match order_result {
90 Ok(order) => println!("๐ Order update: {:?}", order),
91 Err(err) => println!("โ Order error: {}", err),
92 },
93 TickerMessage::ClosingMessage(close_msg) => {
94 println!("๐ Connection closing: {}", close_msg);
95 }
96 }
97 }
98 Ok(None) => {
99 println!("๐ Connection closed by server");
100 break;
101 }
102 Err(e) => {
103 println!("โ Error receiving message: {}", e);
104 break;
105 }
106 }
107 }
108
109 // Close the connection
110 println!("๐ Closing connection...");
111 subscriber
112 .close()
113 .await
114 .map_err(|e| format!("Failed to close: {}", e))?;
115 println!("✅ Connection closed successfully");
116
117 Ok(())
118}
Trait Implementations§
Auto Trait Implementations§
impl Freeze for KiteTickerSubscriber
impl !RefUnwindSafe for KiteTickerSubscriber
impl Send for KiteTickerSubscriber
impl Sync for KiteTickerSubscriber
impl Unpin for KiteTickerSubscriber
impl !UnwindSafe for KiteTickerSubscriber
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more