pub struct KiteTickerAsync { /* private fields */ }
Expand description
The WebSocket client for connecting to Kite Connect’s streaming quotes service.
Implementations §
Source § impl KiteTickerAsync
impl KiteTickerAsync
Source
pub async fn connect(api_key: &str, access_token: &str) -> Result<Self, String>
pub async fn connect(api_key: &str, access_token: &str) -> Result<Self, String>
Establish a connection with the Kite WebSocket server
Examples found in repository?
examples/sample.rs (line 7)
4pub async fn main() -> Result<(), String> {
5 let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
6 let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
7 let ticker = KiteTickerAsync::connect(&api_key, &access_token).await?;
8
9 let token = 408065;
10 // subscribe to an instrument
11 let mut subscriber = ticker.subscribe(&[token], Some(Mode::Full)).await?;
12
13 // await quotes
14 loop {
15 if let Some(msg) = subscriber.next_message().await? {
16 match msg {
17 TickerMessage::Ticks(ticks) => {
18 let tick = ticks.first().unwrap();
19 println!(
20 "Received tick for instrument_token {}, {:?}",
21 tick.instrument_token, tick
22 );
23 break;
24 }
25 _ => {
26 println!("Received message from broker {:?}", msg);
27 continue;
28 }
29 }
30 }
31 }
32
33 Ok(())
34}
More examples
examples/basic/single_connection.rs (line 23)
7async fn main() -> Result<(), String> {
8 // Initialize logging
9 env_logger::init();
10
11 println!("π Single Connection Example");
12 println!("============================");
13
14 // Get credentials from environment
15 let api_key = std::env::var("KITE_API_KEY")
16 .map_err(|_| "Please set KITE_API_KEY environment variable")?;
17 let access_token = std::env::var("KITE_ACCESS_TOKEN")
18 .map_err(|_| "Please set KITE_ACCESS_TOKEN environment variable")?;
19
20 println!("📡 Connecting to Kite WebSocket...");
21
22 // Establish WebSocket connection
23 let ticker = KiteTickerAsync::connect(&api_key, &access_token)
24 .await
25 .map_err(|e| format!("Failed to connect: {}", e))?;
26
27 println!("✅ Connected successfully!");
28
29 // Subscribe to a few symbols
30 let symbols = vec![
31 256265, // NIFTY 50
32 408065, // HDFC Bank
33 738561, // Reliance
34 ];
35
36 println!("π Subscribing to {} symbols with LTP mode", symbols.len());
37 println!("Symbols: {:?}", symbols);
38
39 let mut subscriber = ticker
40 .subscribe(&symbols, Some(Mode::LTP))
41 .await
42 .map_err(|e| format!("Failed to subscribe: {}", e))?;
43
44 println!("✅ Subscription successful!");
45 println!("π Receiving live market data...\n");
46
47 // Receive and process messages
48 let mut message_count = 0;
49 let start_time = std::time::Instant::now();
50
51 loop {
52 match subscriber.next_message().await {
53 Ok(Some(message)) => {
54 match message {
55 TickerMessage::Ticks(ticks) => {
56 message_count += 1;
57
58 for tick in ticks {
59 if let Some(price) = tick.content.last_price {
60 println!("π Symbol {}: ₹{:.2}", tick.instrument_token, price);
61 }
62 }
63
64 // Show statistics every 10 messages
65 if message_count % 10 == 0 {
66 let elapsed = start_time.elapsed();
67 let rate = message_count as f64 / elapsed.as_secs_f64();
68 println!(
69 "π Stats: {} messages in {:?} ({:.1} msg/sec)\n",
70 message_count, elapsed, rate
71 );
72 }
73
74 // Exit after 50 messages for demo
75 if message_count >= 50 {
76 println!(
77 "π Demo completed! Received {} messages",
78 message_count
79 );
80 break;
81 }
82 }
83 TickerMessage::Message(message) => {
84 println!("π Broker message: {}", message);
85 }
86 TickerMessage::Error(error) => {
87 println!("β Error: {}", error);
88 }
89 TickerMessage::OrderPostback(order_result) => match order_result {
90 Ok(order) => println!("π Order update: {:?}", order),
91 Err(err) => println!("β Order error: {}", err),
92 },
93 TickerMessage::ClosingMessage(close_msg) => {
94 println!("π Connection closing: {}", close_msg);
95 }
96 }
97 }
98 Ok(None) => {
99 println!("π Connection closed by server");
100 break;
101 }
102 Err(e) => {
103 println!("β Error receiving message: {}", e);
104 break;
105 }
106 }
107 }
108
109 // Close the connection
110 println!("π Closing connection...");
111 subscriber
112 .close()
113 .await
114 .map_err(|e| format!("Failed to close: {}", e))?;
115 println!("✅ Connection closed successfully");
116
117 Ok(())
118}
examples/performance/performance_demo.rs (line 28)
6pub async fn main() -> Result<(), String> {
7 println!("π KiteTicker WebSocket Performance Demo");
8
9 // This example demonstrates the performance improvements made to the WebSocket client
10
11 let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
12 let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
13
14 if api_key.is_empty() || access_token.is_empty() {
15 println!(
16 "⚠️ KITE_API_KEY and KITE_ACCESS_TOKEN environment variables not set"
17 );
18 println!(" This demo will show the architectural improvements without a live connection");
19 demonstrate_offline_improvements().await;
20 return Ok(());
21 }
22
23 println!("📡 Connecting to Kite WebSocket...");
24 let start_time = Instant::now();
25
26 let ticker = match timeout(
27 Duration::from_secs(10),
28 KiteTickerAsync::connect(&api_key, &access_token),
29 )
30 .await
31 {
32 Ok(Ok(ticker)) => {
33 println!("✅ Connected in {:?}", start_time.elapsed());
34 ticker
35 }
36 Ok(Err(e)) => {
37 println!("β Connection failed: {}", e);
38 return Err(e);
39 }
40 Err(_) => {
41 println!("⏱️ Connection timeout");
42 return Err("Connection timeout".to_string());
43 }
44 };
45
46 // Test multiple instruments for high-frequency data
47 let instruments = vec![408065, 5633, 738561, 81153]; // Example NSE instruments
48 println!(
49 "π Subscribing to {} instruments in Full mode",
50 instruments.len()
51 );
52
53 let mut subscriber = ticker.subscribe(&instruments, Some(Mode::Full)).await?;
54
55 // Performance metrics
56 let mut message_count = 0;
57 let mut tick_count = 0;
58 let start_time = Instant::now();
59 let mut last_report = Instant::now();
60
61 println!("π Monitoring performance (Ctrl+C to stop)...");
62
63 loop {
64 match timeout(Duration::from_secs(30), subscriber.next_message()).await {
65 Ok(Ok(Some(msg))) => {
66 message_count += 1;
67
68 match msg {
69 TickerMessage::Ticks(ticks) => {
70 tick_count += ticks.len();
71
72 // Show first few ticks for demonstration
73 if message_count <= 5 {
74 for tick in &ticks {
75 println!(
76 "π Tick: {} @ {:?}",
77 tick.instrument_token,
78 tick.content.last_price.unwrap_or(0.0)
79 );
80 }
81 }
82 }
83 TickerMessage::Error(e) => {
84 println!("⚠️ Error: {}", e);
85 }
86 _ => {
87 println!("📨 Other message: {:?}", msg);
88 }
89 }
90
91 // Report performance every 10 seconds
92 if last_report.elapsed() >= Duration::from_secs(10) {
93 let elapsed = start_time.elapsed();
94 let messages_per_sec = message_count as f64 / elapsed.as_secs_f64();
95 let ticks_per_sec = tick_count as f64 / elapsed.as_secs_f64();
96
97 println!("π Performance Report:");
98 println!(
99 " Messages: {} ({:.1}/sec)",
100 message_count, messages_per_sec
101 );
102 println!(" Ticks: {} ({:.1}/sec)", tick_count, ticks_per_sec);
103 println!(" Memory efficient processing ✅");
104 println!(" Bounds checking enabled ✅");
105 println!(" Error resilience ✅");
106
107 last_report = Instant::now();
108 }
109 }
110 Ok(Ok(None)) => {
111 println!("π Connection closed");
112 break;
113 }
114 Ok(Err(e)) => {
115 println!("β Message error: {}", e);
116 break;
117 }
118 Err(_) => {
119 println!("⏱️ No messages received in 30 seconds");
120 println!("π Connection monitoring (health check not accessible in subscriber)");
121 }
122 }
123 }
124
125 println!("π Demo completed. Final stats:");
126 let elapsed = start_time.elapsed();
127 println!(" Total runtime: {:?}", elapsed);
128 println!(" Messages processed: {}", message_count);
129 println!(" Ticks processed: {}", tick_count);
130
131 Ok(())
132}
Source
pub async fn subscribe(
self,
instrument_tokens: &[u32],
mode: Option<Mode>,
) -> Result<KiteTickerSubscriber, String>
pub async fn subscribe( self, instrument_tokens: &[u32], mode: Option<Mode>, ) -> Result<KiteTickerSubscriber, String>
Subscribes the client to a list of instruments
Examples found in repository?
examples/sample.rs (line 11)
4pub async fn main() -> Result<(), String> {
5 let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
6 let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
7 let ticker = KiteTickerAsync::connect(&api_key, &access_token).await?;
8
9 let token = 408065;
10 // subscribe to an instrument
11 let mut subscriber = ticker.subscribe(&[token], Some(Mode::Full)).await?;
12
13 // await quotes
14 loop {
15 if let Some(msg) = subscriber.next_message().await? {
16 match msg {
17 TickerMessage::Ticks(ticks) => {
18 let tick = ticks.first().unwrap();
19 println!(
20 "Received tick for instrument_token {}, {:?}",
21 tick.instrument_token, tick
22 );
23 break;
24 }
25 _ => {
26 println!("Received message from broker {:?}", msg);
27 continue;
28 }
29 }
30 }
31 }
32
33 Ok(())
34}
More examples
examples/basic/single_connection.rs (line 40)
7async fn main() -> Result<(), String> {
8 // Initialize logging
9 env_logger::init();
10
11 println!("π Single Connection Example");
12 println!("============================");
13
14 // Get credentials from environment
15 let api_key = std::env::var("KITE_API_KEY")
16 .map_err(|_| "Please set KITE_API_KEY environment variable")?;
17 let access_token = std::env::var("KITE_ACCESS_TOKEN")
18 .map_err(|_| "Please set KITE_ACCESS_TOKEN environment variable")?;
19
20 println!("📡 Connecting to Kite WebSocket...");
21
22 // Establish WebSocket connection
23 let ticker = KiteTickerAsync::connect(&api_key, &access_token)
24 .await
25 .map_err(|e| format!("Failed to connect: {}", e))?;
26
27 println!("✅ Connected successfully!");
28
29 // Subscribe to a few symbols
30 let symbols = vec![
31 256265, // NIFTY 50
32 408065, // HDFC Bank
33 738561, // Reliance
34 ];
35
36 println!("π Subscribing to {} symbols with LTP mode", symbols.len());
37 println!("Symbols: {:?}", symbols);
38
39 let mut subscriber = ticker
40 .subscribe(&symbols, Some(Mode::LTP))
41 .await
42 .map_err(|e| format!("Failed to subscribe: {}", e))?;
43
44 println!("✅ Subscription successful!");
45 println!("π Receiving live market data...\n");
46
47 // Receive and process messages
48 let mut message_count = 0;
49 let start_time = std::time::Instant::now();
50
51 loop {
52 match subscriber.next_message().await {
53 Ok(Some(message)) => {
54 match message {
55 TickerMessage::Ticks(ticks) => {
56 message_count += 1;
57
58 for tick in ticks {
59 if let Some(price) = tick.content.last_price {
60 println!("π Symbol {}: ₹{:.2}", tick.instrument_token, price);
61 }
62 }
63
64 // Show statistics every 10 messages
65 if message_count % 10 == 0 {
66 let elapsed = start_time.elapsed();
67 let rate = message_count as f64 / elapsed.as_secs_f64();
68 println!(
69 "π Stats: {} messages in {:?} ({:.1} msg/sec)\n",
70 message_count, elapsed, rate
71 );
72 }
73
74 // Exit after 50 messages for demo
75 if message_count >= 50 {
76 println!(
77 "π Demo completed! Received {} messages",
78 message_count
79 );
80 break;
81 }
82 }
83 TickerMessage::Message(message) => {
84 println!("π Broker message: {}", message);
85 }
86 TickerMessage::Error(error) => {
87 println!("β Error: {}", error);
88 }
89 TickerMessage::OrderPostback(order_result) => match order_result {
90 Ok(order) => println!("π Order update: {:?}", order),
91 Err(err) => println!("β Order error: {}", err),
92 },
93 TickerMessage::ClosingMessage(close_msg) => {
94 println!("π Connection closing: {}", close_msg);
95 }
96 }
97 }
98 Ok(None) => {
99 println!("π Connection closed by server");
100 break;
101 }
102 Err(e) => {
103 println!("β Error receiving message: {}", e);
104 break;
105 }
106 }
107 }
108
109 // Close the connection
110 println!("π Closing connection...");
111 subscriber
112 .close()
113 .await
114 .map_err(|e| format!("Failed to close: {}", e))?;
115 println!("✅ Connection closed successfully");
116
117 Ok(())
118}
examples/performance/performance_demo.rs (line 53)
6pub async fn main() -> Result<(), String> {
7 println!("π KiteTicker WebSocket Performance Demo");
8
9 // This example demonstrates the performance improvements made to the WebSocket client
10
11 let api_key = std::env::var("KITE_API_KEY").unwrap_or_default();
12 let access_token = std::env::var("KITE_ACCESS_TOKEN").unwrap_or_default();
13
14 if api_key.is_empty() || access_token.is_empty() {
15 println!(
16 "⚠️ KITE_API_KEY and KITE_ACCESS_TOKEN environment variables not set"
17 );
18 println!(" This demo will show the architectural improvements without a live connection");
19 demonstrate_offline_improvements().await;
20 return Ok(());
21 }
22
23 println!("📡 Connecting to Kite WebSocket...");
24 let start_time = Instant::now();
25
26 let ticker = match timeout(
27 Duration::from_secs(10),
28 KiteTickerAsync::connect(&api_key, &access_token),
29 )
30 .await
31 {
32 Ok(Ok(ticker)) => {
33 println!("✅ Connected in {:?}", start_time.elapsed());
34 ticker
35 }
36 Ok(Err(e)) => {
37 println!("β Connection failed: {}", e);
38 return Err(e);
39 }
40 Err(_) => {
41 println!("⏱️ Connection timeout");
42 return Err("Connection timeout".to_string());
43 }
44 };
45
46 // Test multiple instruments for high-frequency data
47 let instruments = vec![408065, 5633, 738561, 81153]; // Example NSE instruments
48 println!(
49 "π Subscribing to {} instruments in Full mode",
50 instruments.len()
51 );
52
53 let mut subscriber = ticker.subscribe(&instruments, Some(Mode::Full)).await?;
54
55 // Performance metrics
56 let mut message_count = 0;
57 let mut tick_count = 0;
58 let start_time = Instant::now();
59 let mut last_report = Instant::now();
60
61 println!("π Monitoring performance (Ctrl+C to stop)...");
62
63 loop {
64 match timeout(Duration::from_secs(30), subscriber.next_message()).await {
65 Ok(Ok(Some(msg))) => {
66 message_count += 1;
67
68 match msg {
69 TickerMessage::Ticks(ticks) => {
70 tick_count += ticks.len();
71
72 // Show first few ticks for demonstration
73 if message_count <= 5 {
74 for tick in &ticks {
75 println!(
76 "π Tick: {} @ {:?}",
77 tick.instrument_token,
78 tick.content.last_price.unwrap_or(0.0)
79 );
80 }
81 }
82 }
83 TickerMessage::Error(e) => {
84 println!("⚠️ Error: {}", e);
85 }
86 _ => {
87 println!("📨 Other message: {:?}", msg);
88 }
89 }
90
91 // Report performance every 10 seconds
92 if last_report.elapsed() >= Duration::from_secs(10) {
93 let elapsed = start_time.elapsed();
94 let messages_per_sec = message_count as f64 / elapsed.as_secs_f64();
95 let ticks_per_sec = tick_count as f64 / elapsed.as_secs_f64();
96
97 println!("π Performance Report:");
98 println!(
99 " Messages: {} ({:.1}/sec)",
100 message_count, messages_per_sec
101 );
102 println!(" Ticks: {} ({:.1}/sec)", tick_count, ticks_per_sec);
103 println!(" Memory efficient processing ✅");
104 println!(" Bounds checking enabled ✅");
105 println!(" Error resilience ✅");
106
107 last_report = Instant::now();
108 }
109 }
110 Ok(Ok(None)) => {
111 println!("π Connection closed");
112 break;
113 }
114 Ok(Err(e)) => {
115 println!("β Message error: {}", e);
116 break;
117 }
118 Err(_) => {
119 println!("⏱️ No messages received in 30 seconds");
120 println!("π Connection monitoring (health check not accessible in subscriber)");
121 }
122 }
123 }
124
125 println!("π Demo completed. Final stats:");
126 let elapsed = start_time.elapsed();
127 println!(" Total runtime: {:?}", elapsed);
128 println!(" Messages processed: {}", message_count);
129 println!(" Ticks processed: {}", tick_count);
130
131 Ok(())
132}
Source
pub fn is_connected(&self) -> bool
pub fn is_connected(&self) -> bool
Check if the connection is still alive
Source
pub fn receiver_count(&self) -> usize
pub fn receiver_count(&self) -> usize
Get the current broadcast channel receiver count
Source
pub fn channel_capacity(&self) -> usize
pub fn channel_capacity(&self) -> usize
Get the current broadcast channel capacity
Trait Implementations §
Auto Trait Implementations §
impl Freeze for KiteTickerAsync
impl !RefUnwindSafe for KiteTickerAsync
impl Send for KiteTickerAsync
impl Sync for KiteTickerAsync
impl Unpin for KiteTickerAsync
impl !UnwindSafe for KiteTickerAsync
Blanket Implementations §
Source § impl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
Source § fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more