kiteticker_async_manager/
ticker.rs

1use crate::models::{
2  Mode, Request, TextMessage, Tick, TickMessage, TickerMessage,
3};
4use crate::parser::packet_length;
5use bytes::Bytes;
6use futures_util::{SinkExt, StreamExt};
7use serde_json::json;
8use smallvec::SmallVec;
9use std::collections::HashMap;
10use std::sync::Arc;
11use tokio::sync::{broadcast, mpsc};
12use tokio::task::JoinHandle;
13use tokio_tungstenite::{connect_async, tungstenite::Message};
14
/// Capacity of the bounded reader -> parser channel; keeping it bounded avoids
/// unbounded memory growth when the parser falls behind the socket.
const PARSE_CHANNEL_CAP: usize = 4_096;
17
18#[derive(Debug)]
19///
20/// The WebSocket client for connecting to Kite Connect's streaming quotes service.
21///
22pub struct KiteTickerAsync {
23  #[allow(dead_code)]
24  api_key: String,
25  #[allow(dead_code)]
26  access_token: String,
27  cmd_tx: Option<mpsc::UnboundedSender<Message>>,
28  msg_tx: broadcast::Sender<TickerMessage>,
29  raw_tx: broadcast::Sender<Bytes>, // raw binary frames
30  #[allow(dead_code)]
31  raw_only: bool, // if true, skip parsing and emit raw frames as TickerMessage::Raw
32  writer_handle: Option<JoinHandle<()>>,
33  reader_handle: Option<JoinHandle<()>>,
34  parser_handle: Option<JoinHandle<()>>,
35}
36
37impl KiteTickerAsync {
38  /// Establish a connection with the Kite WebSocket server
39  pub async fn connect(
40    api_key: &str,
41    access_token: &str,
42  ) -> Result<Self, String> {
43    Self::connect_with_options(api_key, access_token, false).await
44  }
45
46  /// Connect with options
47  pub async fn connect_with_options(
48    api_key: &str,
49    access_token: &str,
50    raw_only: bool,
51  ) -> Result<Self, String> {
52    // Build URL with proper percent-encoding of query params
53    let mut url = url::Url::parse("wss://ws.kite.trade")
54      .map_err(|e| format!("Invalid base URL: {}", e))?;
55    {
56      let mut qp = url.query_pairs_mut();
57      qp.append_pair("api_key", api_key);
58      qp.append_pair("access_token", access_token);
59    }
60    // tokio-tungstenite >=0.27 accepts types implementing IntoClientRequest (Url is fine)
61    let (ws_stream, _resp) =
62      connect_async(url.as_str()).await.map_err(|e| match e {
63        tokio_tungstenite::tungstenite::Error::Http(response) => {
64          // Provide clearer context for HTTP handshake failures
65          let status = response.status();
66          let reason = status.canonical_reason().unwrap_or("");
67          format!(
68            "HTTP error during WebSocket handshake: {} {}",
69            status, reason
70          )
71        }
72        other => other.to_string(),
73      })?;
74
75    let (write_half, mut read_half) = ws_stream.split();
76
77    let (cmd_tx, mut cmd_rx) = mpsc::unbounded_channel::<Message>();
78    // Increase buffer size for high-frequency tick data
79    let (msg_tx, _) = broadcast::channel(1000);
80    let (raw_tx, _) = broadcast::channel(1000);
81    let mut write = write_half;
82    let writer_handle = tokio::spawn(async move {
83      while let Some(msg) = cmd_rx.recv().await {
84        if write.send(msg).await.is_err() {
85          break;
86        }
87      }
88    });
89
90    // Channel to decouple read and parse so the websocket stream isn't blocked by parsing.
91    // Use a bounded channel with try_send to provide lightweight backpressure under bursts.
92    let (parse_tx, mut parse_rx) = mpsc::channel::<Message>(PARSE_CHANNEL_CAP);
93
94    // Reader: only forward messages into parse channel, avoid heavy work here
95    let msg_sender_for_reader = msg_tx.clone();
96    let reader_handle = tokio::spawn(async move {
97      while let Some(message) = read_half.next().await {
98        match message {
99          Ok(msg) => {
100            // Forward to parser using non-blocking try_send; if channel is full, drop frame
101            match parse_tx.try_send(msg) {
102              Ok(_) => {}
103              Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
104                log::warn!(
105                  "Reader: parse channel full, dropping incoming frame"
106                );
107                // Drop and continue to keep read loop unblocked
108              }
109              Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
110                // Parser task gone; exit reader
111                break;
112              }
113            }
114          }
115          Err(e) => {
116            // Send error and continue trying to read
117            let error_msg =
118              TickerMessage::Error(format!("WebSocket error: {}", e));
119            let _ = msg_sender_for_reader.send(error_msg);
120            if matches!(
121              e,
122              tokio_tungstenite::tungstenite::Error::ConnectionClosed
123                | tokio_tungstenite::tungstenite::Error::AlreadyClosed
124            ) {
125              break;
126            }
127          }
128        }
129      }
130    });
131
132    // Parser: processes messages from the channel and publishes results
133    let msg_sender = msg_tx.clone();
134    let raw_sender = raw_tx.clone();
135    let parser_handle = tokio::spawn(async move {
136      let raw_only_mode = raw_only; // capture
137      while let Some(msg) = parse_rx.recv().await {
138        if let Some(processed) =
139          process_message(msg, &raw_sender, raw_only_mode)
140        {
141          let _ = msg_sender.send(processed);
142        }
143      }
144    });
145
146    Ok(KiteTickerAsync {
147      api_key: api_key.to_string(),
148      access_token: access_token.to_string(),
149      cmd_tx: Some(cmd_tx),
150      msg_tx,
151      raw_tx,
152      raw_only,
153      writer_handle: Some(writer_handle),
154      reader_handle: Some(reader_handle),
155      parser_handle: Some(parser_handle),
156    })
157  }
158
159  /// Subscribes the client to a list of instruments
160  pub async fn subscribe(
161    &mut self,
162    instrument_tokens: &[u32],
163    mode: Option<Mode>,
164  ) -> Result<KiteTickerSubscriber, String> {
165    self.subscribe_cmd(instrument_tokens, mode.as_ref()).await?;
166    let default_mode = mode.unwrap_or_default();
167    let st = instrument_tokens
168      .iter()
169      .map(|&t| (t, default_mode))
170      .collect();
171
172    let rx = self.msg_tx.subscribe();
173    Ok(KiteTickerSubscriber {
174      subscribed_tokens: st,
175      rx,
176      cmd_tx: self.cmd_tx.clone().map(Arc::new),
177    })
178  }
179
180  /// Close the websocket connection
181  pub async fn close(&mut self) -> Result<(), String> {
182    if let Some(tx) = self.cmd_tx.take() {
183      let _ = tx.send(Message::Close(None));
184    }
185    if let Some(handle) = self.writer_handle.take() {
186      handle.await.map_err(|e| e.to_string())?;
187    }
188    if let Some(handle) = self.reader_handle.take() {
189      handle.await.map_err(|e| e.to_string())?;
190    }
191    if let Some(handle) = self.parser_handle.take() {
192      handle.await.map_err(|e| e.to_string())?;
193    }
194    Ok(())
195  }
196
197  async fn subscribe_cmd(
198    &mut self,
199    instrument_tokens: &[u32],
200    mode: Option<&Mode>,
201  ) -> Result<(), String> {
202    let mode_value = mode.cloned().unwrap_or_default();
203    let msgs = vec![
204      Message::Text(Request::subscribe(instrument_tokens).to_string().into()),
205      Message::Text(
206        Request::mode(mode_value, instrument_tokens)
207          .to_string()
208          .into(),
209      ),
210    ];
211
212    for msg in msgs {
213      if let Some(tx) = &self.cmd_tx {
214        tx.send(msg).map_err(|e| e.to_string())?;
215      }
216    }
217
218    Ok(())
219  }
220
221  // internal helpers removed after refactor; operations now issued via subscriber command handle
222
223  /// Check if the connection is still alive
224  pub fn is_connected(&self) -> bool {
225    self.cmd_tx.is_some()
226      && self
227        .writer_handle
228        .as_ref()
229        .is_some_and(|h| !h.is_finished())
230      && self
231        .reader_handle
232        .as_ref()
233        .is_some_and(|h| !h.is_finished())
234  }
235
236  /// Send a ping to keep the connection alive
237  pub async fn ping(&mut self) -> Result<(), String> {
238    if let Some(tx) = &self.cmd_tx {
239      tx.send(Message::Ping(bytes::Bytes::new()))
240        .map_err(|e| e.to_string())?;
241      Ok(())
242    } else {
243      Err("Connection is closed".to_string())
244    }
245  }
246
247  /// Get the current broadcast channel receiver count
248  pub fn receiver_count(&self) -> usize {
249    self.msg_tx.receiver_count()
250  }
251
252  /// Get the current broadcast channel capacity
253  pub fn channel_capacity(&self) -> usize {
254    // The broadcast channel doesn't expose capacity directly,
255    // but we can estimate based on our configuration
256    1000 // This matches our increased buffer size
257  }
258
259  /// Subscribe to raw binary frames (zero-copy). Each item is the full websocket frame bytes.
260  ///
261  /// Use this to implement custom parsing or zero-copy peeking on packet bodies.
262  /// Each emitted item is a `bytes::Bytes` that shares the underlying frame buffer (clone is cheap).
263  ///
264  /// See the crate-level docs for an end-to-end example of slicing packet bodies from a frame.
265  pub fn subscribe_raw_frames(&self) -> broadcast::Receiver<Bytes> {
266    self.raw_tx.subscribe()
267  }
268
269  /// Backward-compatible alias for subscribe_raw_frames.
270  #[deprecated(
271    note = "use subscribe_raw_frames() instead; now returns bytes::Bytes"
272  )]
273  pub fn subscribe_raw(&self) -> broadcast::Receiver<Bytes> {
274    self.subscribe_raw_frames()
275  }
276
277  /// Create a subscriber that yields only 184-byte Full tick payloads sliced from frames.
278  ///
279  /// The returned subscriber exposes convenience methods to receive raw `Bytes`, a fixed `[u8;184]`
280  /// reference, or a `zerocopy::Ref<&[u8], TickRaw>` view via `recv_raw_tickraw`.
281  ///
282  /// Note: the typed `Ref` returned by `recv_raw_tickraw` is valid until the next method call that
283  /// overwrites the internal buffer. If you need to hold onto the data longer, clone the `Bytes` and
284  /// re-create the view as needed using `as_tick_raw`.
285  pub fn subscribe_full_raw(&self) -> KiteTickerRawSubscriber184 {
286    KiteTickerRawSubscriber184 {
287      rx: self.raw_tx.subscribe(),
288      last_payload: None,
289    }
290  }
291
292  /// Get a clone of the internal command sender for incremental ops
293  pub fn command_sender(&self) -> Option<mpsc::UnboundedSender<Message>> {
294    self.cmd_tx.clone()
295  }
296}
297
298#[derive(Debug)]
299///
300/// The Websocket client that entered in a pub/sub mode once the client subscribed to a list of instruments
301///
302pub struct KiteTickerSubscriber {
303  // Now independent of owning the ticker; commands go through channel retained in KiteTickerAsync
304  subscribed_tokens: HashMap<u32, Mode>,
305  rx: broadcast::Receiver<TickerMessage>,
306  cmd_tx: Option<Arc<mpsc::UnboundedSender<Message>>>,
307}
308
309impl KiteTickerSubscriber {
310  /// Get the list of subscribed instruments
311  pub fn get_subscribed(&self) -> Vec<u32> {
312    self
313      .subscribed_tokens
314      .clone()
315      .into_keys()
316      .collect::<Vec<_>>()
317  }
318
319  /// get all tokens common between subscribed tokens and input tokens
320  /// and if the input is empty then all subscribed tokens will be unsubscribed
321  fn get_subscribed_or(&self, tokens: &[u32]) -> Vec<u32> {
322    if tokens.is_empty() {
323      self.get_subscribed()
324    } else {
325      tokens
326        .iter()
327        .filter(|t| self.subscribed_tokens.contains_key(t))
328        .copied()
329        .collect::<Vec<_>>()
330    }
331  }
332
333  /// Subscribe to new tokens
334  pub async fn subscribe(
335    &mut self,
336    tokens: &[u32],
337    mode: Option<Mode>,
338  ) -> Result<(), String> {
339    // Only send incremental subscribe for new tokens
340    let default_mode = mode.unwrap_or_default();
341    let mut new_tokens: Vec<u32> = Vec::new();
342    for &t in tokens {
343      if let std::collections::hash_map::Entry::Vacant(e) =
344        self.subscribed_tokens.entry(t)
345      {
346        e.insert(default_mode);
347        new_tokens.push(t);
348      }
349    }
350    if new_tokens.is_empty() {
351      return Ok(());
352    }
353    if let Some(tx) = &self.cmd_tx {
354      // send subscribe
355      let _ = tx.send(Message::Text(
356        Request::subscribe(&new_tokens).to_string().into(),
357      ));
358      if mode.is_some() {
359        let _ = tx.send(Message::Text(
360          Request::mode(default_mode, &new_tokens).to_string().into(),
361        ));
362      }
363    }
364    Ok(())
365  }
366
367  /// Change the mode of the subscribed instrument tokens
368  pub async fn set_mode(
369    &mut self,
370    instrument_tokens: &[u32],
371    mode: Mode,
372  ) -> Result<(), String> {
373    let tokens = self.get_subscribed_or(instrument_tokens);
374    if tokens.is_empty() {
375      return Ok(());
376    }
377    if let Some(tx) = &self.cmd_tx {
378      let _ = tx.send(Message::Text(
379        Request::mode(mode, &tokens).to_string().into(),
380      ));
381    }
382    Ok(())
383  }
384
385  /// Unsubscribe provided subscribed tokens, if input is empty then all subscribed tokens will unsubscribed
386  ///
387  /// Tokens in the input which are not part of the subscribed tokens will be ignored.
388  pub async fn unsubscribe(
389    &mut self,
390    instrument_tokens: &[u32],
391  ) -> Result<(), String> {
392    let tokens = self.get_subscribed_or(instrument_tokens);
393    if tokens.is_empty() {
394      return Ok(());
395    }
396    if let Some(tx) = &self.cmd_tx {
397      let _ = tx.send(Message::Text(
398        Request::unsubscribe(&tokens).to_string().into(),
399      ));
400    }
401    self.subscribed_tokens.retain(|k, _| !tokens.contains(k));
402    Ok(())
403  }
404
405  /// Get the next message from the server, waiting if necessary.
406  /// If the result is None then server is terminated
407  pub async fn next_message(
408    &mut self,
409  ) -> Result<Option<TickerMessage>, String> {
410    match self.rx.recv().await {
411      Ok(msg) => Ok(Some(msg)),
412      Err(broadcast::error::RecvError::Closed) => Ok(None),
413      Err(e) => Err(e.to_string()),
414    }
415  }
416
417  pub async fn close(&mut self) -> Result<(), String> {
418    Ok(())
419  }
420}
421
422fn process_message(
423  message: Message,
424  raw_sender: &broadcast::Sender<Bytes>,
425  raw_only: bool,
426) -> Option<TickerMessage> {
427  match message {
428    Message::Text(text_message) => {
429      process_text_message(text_message.to_string())
430    }
431    Message::Binary(binary_message) => {
432      // Convert once to Bytes to avoid cloning the Vec for raw subscribers
433      let bytes = binary_message;
434      let slice: &[u8] = &bytes;
435      // publish raw first (cheap clone)
436      let _ = raw_sender.send(bytes.clone());
437      if raw_only {
438        // In raw-only mode, rely solely on raw_tx broadcast to deliver zero-copy frames.
439        // Do not emit a TickerMessage to avoid extra allocations or duplicates.
440        return None;
441      }
442      // Drop 1-byte heartbeat frames per protocol (no downstream churn)
443      if slice.len() < 2 {
444        None
445      } else {
446        process_binary(slice)
447      }
448    }
449    Message::Close(closing_message) => closing_message.map(|c| {
450      TickerMessage::ClosingMessage(json!({
451        "code": c.code.to_string(),
452        "reason": c.reason.to_string()
453      }))
454    }),
455    Message::Ping(_) => None,
456    Message::Pong(_) => None,
457    Message::Frame(_) => None,
458  }
459}
460
461#[derive(Debug)]
462/// Subscriber that yields raw 184-byte payloads (Mode::Full) extracted from incoming frames.
463pub struct KiteTickerRawSubscriber184 {
464  rx: broadcast::Receiver<Bytes>,
465  // Keep last payload alive for reference-returning APIs
466  last_payload: Option<Bytes>,
467}
468
469impl KiteTickerRawSubscriber184 {
470  /// Receive the next 184-byte payload, if any frame contains it. Skips non-Full packets.
471  /// Returns Bytes that points to the underlying frame memory (zero-copy); slice is cloned out.
472  pub async fn recv_raw(&mut self) -> Result<Option<Bytes>, String> {
473    loop {
474      match self.rx.recv().await {
475        Ok(frame) => {
476          if let Some(bytes) = extract_first_full_payload(&frame) {
477            self.last_payload = Some(bytes.clone());
478            return Ok(Some(bytes));
479          }
480          // else keep looping for next frame
481        }
482        Err(broadcast::error::RecvError::Closed) => return Ok(None),
483        Err(e) => return Err(e.to_string()),
484      }
485    }
486  }
487
488  /// Receive next payload and return a reference to a fixed 184-byte array.
489  /// The reference remains valid until the next call that overwrites internal buffer.
490  pub async fn recv_raw_ref(&mut self) -> Result<Option<&[u8; 184]>, String> {
491    use crate::tick_as_184 as as_184;
492    match self.recv_raw().await? {
493      Some(bytes) => {
494        // Store to keep alive, then take a ref from stored bytes
495        self.last_payload = Some(bytes);
496        if let Some(ref b) = self.last_payload {
497          Ok(as_184(b))
498        } else {
499          Ok(None)
500        }
501      }
502      None => Ok(None),
503    }
504  }
505
506  /// Receive next payload and return a zero-copy typed view `TickRaw`.
507  ///
508  /// Returns `Some(zerocopy::Ref<&[u8], TickRaw>)` for a Full packet body (184 bytes), otherwise waits.
509  /// The `Ref` dereferences to `&TickRaw` and stays valid until another method call that replaces
510  /// the internal `Bytes` buffer.
511  pub async fn recv_raw_tickraw(
512    &mut self,
513  ) -> Result<Option<zerocopy::Ref<&[u8], crate::TickRaw>>, String> {
514    use crate::as_tick_raw;
515    match self.recv_raw().await? {
516      Some(bytes) => {
517        self.last_payload = Some(bytes.clone());
518        if let Some(ref b) = self.last_payload {
519          Ok(as_tick_raw(b))
520        } else {
521          Ok(None)
522        }
523      }
524      None => Ok(None),
525    }
526  }
527
528  /// Receive up to `max` 184-byte payloads from the next frame(s). This avoids per-packet awaits.
529  pub async fn recv_batch_raw(
530    &mut self,
531    max: usize,
532  ) -> Result<Vec<Bytes>, String> {
533    let mut out = Vec::with_capacity(max.max(1));
534    while out.len() < max {
535      match self.rx.recv().await {
536        Ok(frame) => {
537          extract_all_full_payloads(&frame, max - out.len(), &mut out);
538          if out.len() >= max {
539            break;
540          }
541          // continue to next frame if more needed
542        }
543        Err(broadcast::error::RecvError::Closed) => break,
544        Err(e) => return Err(e.to_string()),
545      }
546    }
547    Ok(out)
548  }
549}
550
551#[inline]
552fn extract_first_full_payload(frame: &Bytes) -> Option<Bytes> {
553  if frame.len() < 2 {
554    return None;
555  }
556  let mut start = 2usize;
557  let num_packets = u16::from_be_bytes([frame[0], frame[1]]) as usize;
558  for _ in 0..num_packets {
559    if start + 2 > frame.len() {
560      return None;
561    }
562    let packet_len = packet_length(&frame[start..start + 2]);
563    let body_start = start + 2;
564    let next_start = body_start + packet_len;
565    if next_start > frame.len() {
566      return None;
567    }
568    if packet_len == 184 {
569      // slice reference into Bytes
570      return Some(frame.slice(body_start..next_start));
571    }
572    start = next_start;
573  }
574  None
575}
576
577#[inline]
578fn extract_all_full_payloads(
579  frame: &Bytes,
580  limit: usize,
581  out: &mut Vec<Bytes>,
582) {
583  if frame.len() < 2 || limit == 0 {
584    return;
585  }
586  let mut start = 2usize;
587  let num_packets = u16::from_be_bytes([frame[0], frame[1]]) as usize;
588  let mut cnt = 0usize;
589  for _ in 0..num_packets {
590    if cnt >= limit {
591      break;
592    }
593    if start + 2 > frame.len() {
594      break;
595    }
596    let packet_len = packet_length(&frame[start..start + 2]);
597    let body_start = start + 2;
598    let next_start = body_start + packet_len;
599    if next_start > frame.len() {
600      break;
601    }
602    if packet_len == 184 {
603      out.push(frame.slice(body_start..next_start));
604      cnt += 1;
605      if cnt >= limit {
606        break;
607      }
608    }
609    start = next_start;
610  }
611}
612
613fn process_binary(binary_message: &[u8]) -> Option<TickerMessage> {
614  if binary_message.len() < 2 {
615    return None;
616  }
617  let num_packets =
618    u16::from_be_bytes([binary_message[0], binary_message[1]]) as usize;
619  if num_packets > 0 {
620    let mut start = 2;
621    // Inline small optimization: most frames contain modest number of ticks
622    let mut ticks: SmallVec<[TickMessage; 32]> =
623      SmallVec::with_capacity(num_packets.min(32));
624    let mut had_error = false;
625    for _ in 0..num_packets {
626      if start + 2 > binary_message.len() {
627        had_error = true;
628        break;
629      }
630      let packet_len = packet_length(&binary_message[start..start + 2]);
631      let next_start = start + 2 + packet_len;
632      if next_start > binary_message.len() {
633        had_error = true;
634        break;
635      }
636      match Tick::try_from(&binary_message[start + 2..next_start]) {
637        Ok(tick) => ticks.push(TickMessage::new(tick.instrument_token, tick)),
638        Err(_e) => {
639          // Skip this packet, continue with others
640          had_error = true;
641        }
642      }
643      start = next_start;
644    }
645    if !ticks.is_empty() {
646      Some(TickerMessage::Ticks(ticks.into_vec()))
647    } else if had_error {
648      Some(TickerMessage::Error(
649        "Failed to parse tick(s) in frame".to_string(),
650      ))
651    } else {
652      None
653    }
654  } else {
655    None
656  }
657}
658
659fn process_text_message(text_message: String) -> Option<TickerMessage> {
660  serde_json::from_str::<TextMessage>(&text_message)
661    .map(|x| x.into())
662    .ok()
663}