kiteticker_async_manager/
ticker.rs1use crate::models::{
2 Mode, Request, TextMessage, Tick, TickMessage, TickerMessage,
3};
4use crate::parser::packet_length;
5use bytes::Bytes;
6use futures_util::{SinkExt, StreamExt};
7use serde_json::json;
8use smallvec::SmallVec;
9use std::collections::HashMap;
10use std::sync::Arc;
11use tokio::sync::{broadcast, mpsc};
12use tokio::task::JoinHandle;
13use tokio_tungstenite::{connect_async, tungstenite::Message};
14
/// Capacity of the bounded channel that carries frames from the reader task
/// to the parser task.
const PARSE_CHANNEL_CAP: usize = 4 * 1024;
17
18#[derive(Debug)]
19pub struct KiteTickerAsync {
23 #[allow(dead_code)]
24 api_key: String,
25 #[allow(dead_code)]
26 access_token: String,
27 cmd_tx: Option<mpsc::UnboundedSender<Message>>,
28 msg_tx: broadcast::Sender<TickerMessage>,
29 raw_tx: broadcast::Sender<Bytes>, #[allow(dead_code)]
31 raw_only: bool, writer_handle: Option<JoinHandle<()>>,
33 reader_handle: Option<JoinHandle<()>>,
34 parser_handle: Option<JoinHandle<()>>,
35}
36
37impl KiteTickerAsync {
38 pub async fn connect(
40 api_key: &str,
41 access_token: &str,
42 ) -> Result<Self, String> {
43 Self::connect_with_options(api_key, access_token, false).await
44 }
45
46 pub async fn connect_with_options(
48 api_key: &str,
49 access_token: &str,
50 raw_only: bool,
51 ) -> Result<Self, String> {
52 let mut url = url::Url::parse("wss://ws.kite.trade")
54 .map_err(|e| format!("Invalid base URL: {}", e))?;
55 {
56 let mut qp = url.query_pairs_mut();
57 qp.append_pair("api_key", api_key);
58 qp.append_pair("access_token", access_token);
59 }
60 let (ws_stream, _resp) =
62 connect_async(url.as_str()).await.map_err(|e| match e {
63 tokio_tungstenite::tungstenite::Error::Http(response) => {
64 let status = response.status();
66 let reason = status.canonical_reason().unwrap_or("");
67 format!(
68 "HTTP error during WebSocket handshake: {} {}",
69 status, reason
70 )
71 }
72 other => other.to_string(),
73 })?;
74
75 let (write_half, mut read_half) = ws_stream.split();
76
77 let (cmd_tx, mut cmd_rx) = mpsc::unbounded_channel::<Message>();
78 let (msg_tx, _) = broadcast::channel(1000);
80 let (raw_tx, _) = broadcast::channel(1000);
81 let mut write = write_half;
82 let writer_handle = tokio::spawn(async move {
83 while let Some(msg) = cmd_rx.recv().await {
84 if write.send(msg).await.is_err() {
85 break;
86 }
87 }
88 });
89
90 let (parse_tx, mut parse_rx) = mpsc::channel::<Message>(PARSE_CHANNEL_CAP);
93
94 let msg_sender_for_reader = msg_tx.clone();
96 let reader_handle = tokio::spawn(async move {
97 while let Some(message) = read_half.next().await {
98 match message {
99 Ok(msg) => {
100 match parse_tx.try_send(msg) {
102 Ok(_) => {}
103 Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
104 log::warn!(
105 "Reader: parse channel full, dropping incoming frame"
106 );
107 }
109 Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
110 break;
112 }
113 }
114 }
115 Err(e) => {
116 let error_msg =
118 TickerMessage::Error(format!("WebSocket error: {}", e));
119 let _ = msg_sender_for_reader.send(error_msg);
120 if matches!(
121 e,
122 tokio_tungstenite::tungstenite::Error::ConnectionClosed
123 | tokio_tungstenite::tungstenite::Error::AlreadyClosed
124 ) {
125 break;
126 }
127 }
128 }
129 }
130 });
131
132 let msg_sender = msg_tx.clone();
134 let raw_sender = raw_tx.clone();
135 let parser_handle = tokio::spawn(async move {
136 let raw_only_mode = raw_only; while let Some(msg) = parse_rx.recv().await {
138 if let Some(processed) =
139 process_message(msg, &raw_sender, raw_only_mode)
140 {
141 let _ = msg_sender.send(processed);
142 }
143 }
144 });
145
146 Ok(KiteTickerAsync {
147 api_key: api_key.to_string(),
148 access_token: access_token.to_string(),
149 cmd_tx: Some(cmd_tx),
150 msg_tx,
151 raw_tx,
152 raw_only,
153 writer_handle: Some(writer_handle),
154 reader_handle: Some(reader_handle),
155 parser_handle: Some(parser_handle),
156 })
157 }
158
159 pub async fn subscribe(
161 &mut self,
162 instrument_tokens: &[u32],
163 mode: Option<Mode>,
164 ) -> Result<KiteTickerSubscriber, String> {
165 self.subscribe_cmd(instrument_tokens, mode.as_ref()).await?;
166 let default_mode = mode.unwrap_or_default();
167 let st = instrument_tokens
168 .iter()
169 .map(|&t| (t, default_mode))
170 .collect();
171
172 let rx = self.msg_tx.subscribe();
173 Ok(KiteTickerSubscriber {
174 subscribed_tokens: st,
175 rx,
176 cmd_tx: self.cmd_tx.clone().map(Arc::new),
177 })
178 }
179
180 pub async fn close(&mut self) -> Result<(), String> {
182 if let Some(tx) = self.cmd_tx.take() {
183 let _ = tx.send(Message::Close(None));
184 }
185 if let Some(handle) = self.writer_handle.take() {
186 handle.await.map_err(|e| e.to_string())?;
187 }
188 if let Some(handle) = self.reader_handle.take() {
189 handle.await.map_err(|e| e.to_string())?;
190 }
191 if let Some(handle) = self.parser_handle.take() {
192 handle.await.map_err(|e| e.to_string())?;
193 }
194 Ok(())
195 }
196
197 async fn subscribe_cmd(
198 &mut self,
199 instrument_tokens: &[u32],
200 mode: Option<&Mode>,
201 ) -> Result<(), String> {
202 let mode_value = mode.cloned().unwrap_or_default();
203 let msgs = vec![
204 Message::Text(Request::subscribe(instrument_tokens).to_string().into()),
205 Message::Text(
206 Request::mode(mode_value, instrument_tokens)
207 .to_string()
208 .into(),
209 ),
210 ];
211
212 for msg in msgs {
213 if let Some(tx) = &self.cmd_tx {
214 tx.send(msg).map_err(|e| e.to_string())?;
215 }
216 }
217
218 Ok(())
219 }
220
221 pub fn is_connected(&self) -> bool {
225 self.cmd_tx.is_some()
226 && self
227 .writer_handle
228 .as_ref()
229 .is_some_and(|h| !h.is_finished())
230 && self
231 .reader_handle
232 .as_ref()
233 .is_some_and(|h| !h.is_finished())
234 }
235
236 pub async fn ping(&mut self) -> Result<(), String> {
238 if let Some(tx) = &self.cmd_tx {
239 tx.send(Message::Ping(bytes::Bytes::new()))
240 .map_err(|e| e.to_string())?;
241 Ok(())
242 } else {
243 Err("Connection is closed".to_string())
244 }
245 }
246
247 pub fn receiver_count(&self) -> usize {
249 self.msg_tx.receiver_count()
250 }
251
252 pub fn channel_capacity(&self) -> usize {
254 1000 }
258
259 pub fn subscribe_raw_frames(&self) -> broadcast::Receiver<Bytes> {
266 self.raw_tx.subscribe()
267 }
268
269 #[deprecated(
271 note = "use subscribe_raw_frames() instead; now returns bytes::Bytes"
272 )]
273 pub fn subscribe_raw(&self) -> broadcast::Receiver<Bytes> {
274 self.subscribe_raw_frames()
275 }
276
277 pub fn subscribe_full_raw(&self) -> KiteTickerRawSubscriber184 {
286 KiteTickerRawSubscriber184 {
287 rx: self.raw_tx.subscribe(),
288 last_payload: None,
289 }
290 }
291
292 pub fn command_sender(&self) -> Option<mpsc::UnboundedSender<Message>> {
294 self.cmd_tx.clone()
295 }
296}
297
298#[derive(Debug)]
299pub struct KiteTickerSubscriber {
303 subscribed_tokens: HashMap<u32, Mode>,
305 rx: broadcast::Receiver<TickerMessage>,
306 cmd_tx: Option<Arc<mpsc::UnboundedSender<Message>>>,
307}
308
309impl KiteTickerSubscriber {
310 pub fn get_subscribed(&self) -> Vec<u32> {
312 self
313 .subscribed_tokens
314 .clone()
315 .into_keys()
316 .collect::<Vec<_>>()
317 }
318
319 fn get_subscribed_or(&self, tokens: &[u32]) -> Vec<u32> {
322 if tokens.is_empty() {
323 self.get_subscribed()
324 } else {
325 tokens
326 .iter()
327 .filter(|t| self.subscribed_tokens.contains_key(t))
328 .copied()
329 .collect::<Vec<_>>()
330 }
331 }
332
333 pub async fn subscribe(
335 &mut self,
336 tokens: &[u32],
337 mode: Option<Mode>,
338 ) -> Result<(), String> {
339 let default_mode = mode.unwrap_or_default();
341 let mut new_tokens: Vec<u32> = Vec::new();
342 for &t in tokens {
343 if let std::collections::hash_map::Entry::Vacant(e) =
344 self.subscribed_tokens.entry(t)
345 {
346 e.insert(default_mode);
347 new_tokens.push(t);
348 }
349 }
350 if new_tokens.is_empty() {
351 return Ok(());
352 }
353 if let Some(tx) = &self.cmd_tx {
354 let _ = tx.send(Message::Text(
356 Request::subscribe(&new_tokens).to_string().into(),
357 ));
358 if mode.is_some() {
359 let _ = tx.send(Message::Text(
360 Request::mode(default_mode, &new_tokens).to_string().into(),
361 ));
362 }
363 }
364 Ok(())
365 }
366
367 pub async fn set_mode(
369 &mut self,
370 instrument_tokens: &[u32],
371 mode: Mode,
372 ) -> Result<(), String> {
373 let tokens = self.get_subscribed_or(instrument_tokens);
374 if tokens.is_empty() {
375 return Ok(());
376 }
377 if let Some(tx) = &self.cmd_tx {
378 let _ = tx.send(Message::Text(
379 Request::mode(mode, &tokens).to_string().into(),
380 ));
381 }
382 Ok(())
383 }
384
385 pub async fn unsubscribe(
389 &mut self,
390 instrument_tokens: &[u32],
391 ) -> Result<(), String> {
392 let tokens = self.get_subscribed_or(instrument_tokens);
393 if tokens.is_empty() {
394 return Ok(());
395 }
396 if let Some(tx) = &self.cmd_tx {
397 let _ = tx.send(Message::Text(
398 Request::unsubscribe(&tokens).to_string().into(),
399 ));
400 }
401 self.subscribed_tokens.retain(|k, _| !tokens.contains(k));
402 Ok(())
403 }
404
405 pub async fn next_message(
408 &mut self,
409 ) -> Result<Option<TickerMessage>, String> {
410 match self.rx.recv().await {
411 Ok(msg) => Ok(Some(msg)),
412 Err(broadcast::error::RecvError::Closed) => Ok(None),
413 Err(e) => Err(e.to_string()),
414 }
415 }
416
417 pub async fn close(&mut self) -> Result<(), String> {
418 Ok(())
419 }
420}
421
422fn process_message(
423 message: Message,
424 raw_sender: &broadcast::Sender<Bytes>,
425 raw_only: bool,
426) -> Option<TickerMessage> {
427 match message {
428 Message::Text(text_message) => {
429 process_text_message(text_message.to_string())
430 }
431 Message::Binary(binary_message) => {
432 let bytes = binary_message;
434 let slice: &[u8] = &bytes;
435 let _ = raw_sender.send(bytes.clone());
437 if raw_only {
438 return None;
441 }
442 if slice.len() < 2 {
444 None
445 } else {
446 process_binary(slice)
447 }
448 }
449 Message::Close(closing_message) => closing_message.map(|c| {
450 TickerMessage::ClosingMessage(json!({
451 "code": c.code.to_string(),
452 "reason": c.reason.to_string()
453 }))
454 }),
455 Message::Ping(_) => None,
456 Message::Pong(_) => None,
457 Message::Frame(_) => None,
458 }
459}
460
461#[derive(Debug)]
462pub struct KiteTickerRawSubscriber184 {
464 rx: broadcast::Receiver<Bytes>,
465 last_payload: Option<Bytes>,
467}
468
469impl KiteTickerRawSubscriber184 {
470 pub async fn recv_raw(&mut self) -> Result<Option<Bytes>, String> {
473 loop {
474 match self.rx.recv().await {
475 Ok(frame) => {
476 if let Some(bytes) = extract_first_full_payload(&frame) {
477 self.last_payload = Some(bytes.clone());
478 return Ok(Some(bytes));
479 }
480 }
482 Err(broadcast::error::RecvError::Closed) => return Ok(None),
483 Err(e) => return Err(e.to_string()),
484 }
485 }
486 }
487
488 pub async fn recv_raw_ref(&mut self) -> Result<Option<&[u8; 184]>, String> {
491 use crate::tick_as_184 as as_184;
492 match self.recv_raw().await? {
493 Some(bytes) => {
494 self.last_payload = Some(bytes);
496 if let Some(ref b) = self.last_payload {
497 Ok(as_184(b))
498 } else {
499 Ok(None)
500 }
501 }
502 None => Ok(None),
503 }
504 }
505
506 pub async fn recv_raw_tickraw(
512 &mut self,
513 ) -> Result<Option<zerocopy::Ref<&[u8], crate::TickRaw>>, String> {
514 use crate::as_tick_raw;
515 match self.recv_raw().await? {
516 Some(bytes) => {
517 self.last_payload = Some(bytes.clone());
518 if let Some(ref b) = self.last_payload {
519 Ok(as_tick_raw(b))
520 } else {
521 Ok(None)
522 }
523 }
524 None => Ok(None),
525 }
526 }
527
528 pub async fn recv_batch_raw(
530 &mut self,
531 max: usize,
532 ) -> Result<Vec<Bytes>, String> {
533 let mut out = Vec::with_capacity(max.max(1));
534 while out.len() < max {
535 match self.rx.recv().await {
536 Ok(frame) => {
537 extract_all_full_payloads(&frame, max - out.len(), &mut out);
538 if out.len() >= max {
539 break;
540 }
541 }
543 Err(broadcast::error::RecvError::Closed) => break,
544 Err(e) => return Err(e.to_string()),
545 }
546 }
547 Ok(out)
548 }
549}
550
551#[inline]
552fn extract_first_full_payload(frame: &Bytes) -> Option<Bytes> {
553 if frame.len() < 2 {
554 return None;
555 }
556 let mut start = 2usize;
557 let num_packets = u16::from_be_bytes([frame[0], frame[1]]) as usize;
558 for _ in 0..num_packets {
559 if start + 2 > frame.len() {
560 return None;
561 }
562 let packet_len = packet_length(&frame[start..start + 2]);
563 let body_start = start + 2;
564 let next_start = body_start + packet_len;
565 if next_start > frame.len() {
566 return None;
567 }
568 if packet_len == 184 {
569 return Some(frame.slice(body_start..next_start));
571 }
572 start = next_start;
573 }
574 None
575}
576
577#[inline]
578fn extract_all_full_payloads(
579 frame: &Bytes,
580 limit: usize,
581 out: &mut Vec<Bytes>,
582) {
583 if frame.len() < 2 || limit == 0 {
584 return;
585 }
586 let mut start = 2usize;
587 let num_packets = u16::from_be_bytes([frame[0], frame[1]]) as usize;
588 let mut cnt = 0usize;
589 for _ in 0..num_packets {
590 if cnt >= limit {
591 break;
592 }
593 if start + 2 > frame.len() {
594 break;
595 }
596 let packet_len = packet_length(&frame[start..start + 2]);
597 let body_start = start + 2;
598 let next_start = body_start + packet_len;
599 if next_start > frame.len() {
600 break;
601 }
602 if packet_len == 184 {
603 out.push(frame.slice(body_start..next_start));
604 cnt += 1;
605 if cnt >= limit {
606 break;
607 }
608 }
609 start = next_start;
610 }
611}
612
613fn process_binary(binary_message: &[u8]) -> Option<TickerMessage> {
614 if binary_message.len() < 2 {
615 return None;
616 }
617 let num_packets =
618 u16::from_be_bytes([binary_message[0], binary_message[1]]) as usize;
619 if num_packets > 0 {
620 let mut start = 2;
621 let mut ticks: SmallVec<[TickMessage; 32]> =
623 SmallVec::with_capacity(num_packets.min(32));
624 let mut had_error = false;
625 for _ in 0..num_packets {
626 if start + 2 > binary_message.len() {
627 had_error = true;
628 break;
629 }
630 let packet_len = packet_length(&binary_message[start..start + 2]);
631 let next_start = start + 2 + packet_len;
632 if next_start > binary_message.len() {
633 had_error = true;
634 break;
635 }
636 match Tick::try_from(&binary_message[start + 2..next_start]) {
637 Ok(tick) => ticks.push(TickMessage::new(tick.instrument_token, tick)),
638 Err(_e) => {
639 had_error = true;
641 }
642 }
643 start = next_start;
644 }
645 if !ticks.is_empty() {
646 Some(TickerMessage::Ticks(ticks.into_vec()))
647 } else if had_error {
648 Some(TickerMessage::Error(
649 "Failed to parse tick(s) in frame".to_string(),
650 ))
651 } else {
652 None
653 }
654 } else {
655 None
656 }
657}
658
659fn process_text_message(text_message: String) -> Option<TickerMessage> {
660 serde_json::from_str::<TextMessage>(&text_message)
661 .map(|x| x.into())
662 .ok()
663}