kiteticker_async_manager/
ticker.rs

1use crate::models::{
2  Mode, Request, TextMessage, Tick, TickMessage, TickerMessage,
3};
4use crate::parser::packet_length;
5use futures_util::{SinkExt, StreamExt};
6use serde_json::json;
7use smallvec::SmallVec;
8use std::collections::HashMap;
9use std::sync::Arc;
10use tokio::sync::{broadcast, mpsc};
11use tokio::task::JoinHandle;
12use tokio_tungstenite::{connect_async, tungstenite::Message};
13
/// Capacity of the bounded reader -> parser channel.
///
/// Keeping the queue bounded prevents unbounded memory growth when frames
/// arrive faster than they can be parsed.
const PARSE_CHANNEL_CAP: usize = 4_096;
16
17#[derive(Debug)]
18///
19/// The WebSocket client for connecting to Kite Connect's streaming quotes service.
20///
21pub struct KiteTickerAsync {
22  #[allow(dead_code)]
23  api_key: String,
24  #[allow(dead_code)]
25  access_token: String,
26  cmd_tx: Option<mpsc::UnboundedSender<Message>>,
27  msg_tx: broadcast::Sender<TickerMessage>,
28  raw_tx: broadcast::Sender<Arc<[u8]>>, // raw binary frames
29  #[allow(dead_code)]
30  raw_only: bool, // if true, skip parsing and emit raw frames as TickerMessage::Raw
31  writer_handle: Option<JoinHandle<()>>,
32  reader_handle: Option<JoinHandle<()>>,
33  parser_handle: Option<JoinHandle<()>>,
34}
35
36impl KiteTickerAsync {
37  /// Establish a connection with the Kite WebSocket server
38  pub async fn connect(
39    api_key: &str,
40    access_token: &str,
41  ) -> Result<Self, String> {
42    Self::connect_with_options(api_key, access_token, false).await
43  }
44
45  /// Connect with options
46  pub async fn connect_with_options(
47    api_key: &str,
48    access_token: &str,
49    raw_only: bool,
50  ) -> Result<Self, String> {
51    let socket_url = format!(
52      "wss://{}?api_key={}&access_token={}",
53      "ws.kite.trade", api_key, access_token
54    );
55    let url = url::Url::parse(socket_url.as_str()).unwrap();
56
57    let (ws_stream, _) = connect_async(url).await.map_err(|e| e.to_string())?;
58
59    let (write_half, mut read_half) = ws_stream.split();
60
61    let (cmd_tx, mut cmd_rx) = mpsc::unbounded_channel::<Message>();
62    // Increase buffer size for high-frequency tick data
63    let (msg_tx, _) = broadcast::channel(1000);
64    let (raw_tx, _) = broadcast::channel(1000);
65    let mut write = write_half;
66    let writer_handle = tokio::spawn(async move {
67      while let Some(msg) = cmd_rx.recv().await {
68        if write.send(msg).await.is_err() {
69          break;
70        }
71      }
72    });
73
74    // Channel to decouple read and parse so the websocket stream isn't blocked by parsing.
75    // Use a bounded channel with try_send to provide lightweight backpressure under bursts.
76    let (parse_tx, mut parse_rx) = mpsc::channel::<Message>(PARSE_CHANNEL_CAP);
77
78    // Reader: only forward messages into parse channel, avoid heavy work here
79    let msg_sender_for_reader = msg_tx.clone();
80    let reader_handle = tokio::spawn(async move {
81      while let Some(message) = read_half.next().await {
82        match message {
83          Ok(msg) => {
84            // Forward to parser using non-blocking try_send; if channel is full, drop frame
85            match parse_tx.try_send(msg) {
86              Ok(_) => {}
87              Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
88                log::warn!(
89                  "Reader: parse channel full, dropping incoming frame"
90                );
91                // Drop and continue to keep read loop unblocked
92              }
93              Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
94                // Parser task gone; exit reader
95                break;
96              }
97            }
98          }
99          Err(e) => {
100            // Send error and continue trying to read
101            let error_msg =
102              TickerMessage::Error(format!("WebSocket error: {}", e));
103            let _ = msg_sender_for_reader.send(error_msg);
104            if matches!(
105              e,
106              tokio_tungstenite::tungstenite::Error::ConnectionClosed
107                | tokio_tungstenite::tungstenite::Error::AlreadyClosed
108            ) {
109              break;
110            }
111          }
112        }
113      }
114    });
115
116    // Parser: processes messages from the channel and publishes results
117    let msg_sender = msg_tx.clone();
118    let raw_sender = raw_tx.clone();
119    let parser_handle = tokio::spawn(async move {
120      let raw_only_mode = raw_only; // capture
121      while let Some(msg) = parse_rx.recv().await {
122        if let Some(processed) =
123          process_message(msg, &raw_sender, raw_only_mode)
124        {
125          let _ = msg_sender.send(processed);
126        }
127      }
128    });
129
130    Ok(KiteTickerAsync {
131      api_key: api_key.to_string(),
132      access_token: access_token.to_string(),
133      cmd_tx: Some(cmd_tx),
134      msg_tx,
135      raw_tx,
136      raw_only,
137      writer_handle: Some(writer_handle),
138      reader_handle: Some(reader_handle),
139      parser_handle: Some(parser_handle),
140    })
141  }
142
143  /// Subscribes the client to a list of instruments
144  pub async fn subscribe(
145    &mut self,
146    instrument_tokens: &[u32],
147    mode: Option<Mode>,
148  ) -> Result<KiteTickerSubscriber, String> {
149    self.subscribe_cmd(instrument_tokens, mode.as_ref()).await?;
150    let default_mode = mode.unwrap_or_default();
151    let st = instrument_tokens
152      .iter()
153      .map(|&t| (t, default_mode))
154      .collect();
155
156    let rx = self.msg_tx.subscribe();
157    Ok(KiteTickerSubscriber {
158      subscribed_tokens: st,
159      rx,
160      cmd_tx: self.cmd_tx.clone().map(Arc::new),
161    })
162  }
163
164  /// Close the websocket connection
165  pub async fn close(&mut self) -> Result<(), String> {
166    if let Some(tx) = self.cmd_tx.take() {
167      let _ = tx.send(Message::Close(None));
168    }
169    if let Some(handle) = self.writer_handle.take() {
170      handle.await.map_err(|e| e.to_string())?;
171    }
172    if let Some(handle) = self.reader_handle.take() {
173      handle.await.map_err(|e| e.to_string())?;
174    }
175    if let Some(handle) = self.parser_handle.take() {
176      handle.await.map_err(|e| e.to_string())?;
177    }
178    Ok(())
179  }
180
181  async fn subscribe_cmd(
182    &mut self,
183    instrument_tokens: &[u32],
184    mode: Option<&Mode>,
185  ) -> Result<(), String> {
186    let mode_value = mode.cloned().unwrap_or_default();
187    let msgs = vec![
188      Message::Text(Request::subscribe(instrument_tokens).to_string()),
189      Message::Text(Request::mode(mode_value, instrument_tokens).to_string()),
190    ];
191
192    for msg in msgs {
193      if let Some(tx) = &self.cmd_tx {
194        tx.send(msg).map_err(|e| e.to_string())?;
195      }
196    }
197
198    Ok(())
199  }
200
201  // internal helpers removed after refactor; operations now issued via subscriber command handle
202
203  /// Check if the connection is still alive
204  pub fn is_connected(&self) -> bool {
205    self.cmd_tx.is_some()
206      && self
207        .writer_handle
208        .as_ref()
209        .is_some_and(|h| !h.is_finished())
210      && self
211        .reader_handle
212        .as_ref()
213        .is_some_and(|h| !h.is_finished())
214  }
215
216  /// Send a ping to keep the connection alive
217  pub async fn ping(&mut self) -> Result<(), String> {
218    if let Some(tx) = &self.cmd_tx {
219      tx.send(Message::Ping(vec![])).map_err(|e| e.to_string())?;
220      Ok(())
221    } else {
222      Err("Connection is closed".to_string())
223    }
224  }
225
226  /// Get the current broadcast channel receiver count
227  pub fn receiver_count(&self) -> usize {
228    self.msg_tx.receiver_count()
229  }
230
231  /// Get the current broadcast channel capacity
232  pub fn channel_capacity(&self) -> usize {
233    // The broadcast channel doesn't expose capacity directly,
234    // but we can estimate based on our configuration
235    1000 // This matches our increased buffer size
236  }
237
238  /// Subscribe to raw binary frames (zero-copy). Each item is an Arc<[u8]> of the full tungstenite binary frame.
239  pub fn subscribe_raw(&self) -> broadcast::Receiver<Arc<[u8]>> {
240    self.raw_tx.subscribe()
241  }
242
243  /// Get a clone of the internal command sender for incremental ops
244  pub fn command_sender(&self) -> Option<mpsc::UnboundedSender<Message>> {
245    self.cmd_tx.clone()
246  }
247}
248
249#[derive(Debug)]
250///
251/// The Websocket client that entered in a pub/sub mode once the client subscribed to a list of instruments
252///
253pub struct KiteTickerSubscriber {
254  // Now independent of owning the ticker; commands go through channel retained in KiteTickerAsync
255  subscribed_tokens: HashMap<u32, Mode>,
256  rx: broadcast::Receiver<TickerMessage>,
257  cmd_tx: Option<Arc<mpsc::UnboundedSender<Message>>>,
258}
259
260impl KiteTickerSubscriber {
261  /// Get the list of subscribed instruments
262  pub fn get_subscribed(&self) -> Vec<u32> {
263    self
264      .subscribed_tokens
265      .clone()
266      .into_keys()
267      .collect::<Vec<_>>()
268  }
269
270  /// get all tokens common between subscribed tokens and input tokens
271  /// and if the input is empty then all subscribed tokens will be unsubscribed
272  fn get_subscribed_or(&self, tokens: &[u32]) -> Vec<u32> {
273    if tokens.is_empty() {
274      self.get_subscribed()
275    } else {
276      tokens
277        .iter()
278        .filter(|t| self.subscribed_tokens.contains_key(t))
279        .copied()
280        .collect::<Vec<_>>()
281    }
282  }
283
284  /// Subscribe to new tokens
285  pub async fn subscribe(
286    &mut self,
287    tokens: &[u32],
288    mode: Option<Mode>,
289  ) -> Result<(), String> {
290    // Only send incremental subscribe for new tokens
291    let default_mode = mode.unwrap_or_default();
292    let mut new_tokens: Vec<u32> = Vec::new();
293    for &t in tokens {
294      if let std::collections::hash_map::Entry::Vacant(e) =
295        self.subscribed_tokens.entry(t)
296      {
297        e.insert(default_mode);
298        new_tokens.push(t);
299      }
300    }
301    if new_tokens.is_empty() {
302      return Ok(());
303    }
304    if let Some(tx) = &self.cmd_tx {
305      // send subscribe
306      let _ =
307        tx.send(Message::Text(Request::subscribe(&new_tokens).to_string()));
308      if mode.is_some() {
309        let _ = tx.send(Message::Text(
310          Request::mode(default_mode, &new_tokens).to_string(),
311        ));
312      }
313    }
314    Ok(())
315  }
316
317  /// Change the mode of the subscribed instrument tokens
318  pub async fn set_mode(
319    &mut self,
320    instrument_tokens: &[u32],
321    mode: Mode,
322  ) -> Result<(), String> {
323    let tokens = self.get_subscribed_or(instrument_tokens);
324    if tokens.is_empty() {
325      return Ok(());
326    }
327    if let Some(tx) = &self.cmd_tx {
328      let _ = tx.send(Message::Text(Request::mode(mode, &tokens).to_string()));
329    }
330    Ok(())
331  }
332
333  /// Unsubscribe provided subscribed tokens, if input is empty then all subscribed tokens will unsubscribed
334  ///
335  /// Tokens in the input which are not part of the subscribed tokens will be ignored.
336  pub async fn unsubscribe(
337    &mut self,
338    instrument_tokens: &[u32],
339  ) -> Result<(), String> {
340    let tokens = self.get_subscribed_or(instrument_tokens);
341    if tokens.is_empty() {
342      return Ok(());
343    }
344    if let Some(tx) = &self.cmd_tx {
345      let _ = tx.send(Message::Text(Request::unsubscribe(&tokens).to_string()));
346    }
347    self.subscribed_tokens.retain(|k, _| !tokens.contains(k));
348    Ok(())
349  }
350
351  /// Get the next message from the server, waiting if necessary.
352  /// If the result is None then server is terminated
353  pub async fn next_message(
354    &mut self,
355  ) -> Result<Option<TickerMessage>, String> {
356    match self.rx.recv().await {
357      Ok(msg) => Ok(Some(msg)),
358      Err(broadcast::error::RecvError::Closed) => Ok(None),
359      Err(e) => Err(e.to_string()),
360    }
361  }
362
363  pub async fn close(&mut self) -> Result<(), String> {
364    Ok(())
365  }
366}
367
368fn process_message(
369  message: Message,
370  raw_sender: &broadcast::Sender<Arc<[u8]>>,
371  raw_only: bool,
372) -> Option<TickerMessage> {
373  match message {
374    Message::Text(text_message) => process_text_message(text_message),
375    Message::Binary(binary_message) => {
376      // Convert once to Arc<[u8]> to avoid cloning the Vec for raw subscribers
377      let arc = Arc::<[u8]>::from(binary_message.into_boxed_slice());
378      let slice: &[u8] = &arc;
379      // publish raw first (cheap arc clone)
380      let _ = raw_sender.send(arc.clone());
381      if raw_only {
382        // In raw-only mode, rely solely on raw_tx broadcast to deliver zero-copy frames.
383        // Do not emit a TickerMessage to avoid extra allocations or duplicates.
384        return None;
385      }
386      // Drop 1-byte heartbeat frames per protocol (no downstream churn)
387      if slice.len() < 2 {
388        None
389      } else {
390        process_binary(slice)
391      }
392    }
393    Message::Close(closing_message) => closing_message.map(|c| {
394      TickerMessage::ClosingMessage(json!({
395        "code": c.code.to_string(),
396        "reason": c.reason.to_string()
397      }))
398    }),
399    Message::Ping(_) => None,
400    Message::Pong(_) => None,
401    Message::Frame(_) => None,
402  }
403}
404
405fn process_binary(binary_message: &[u8]) -> Option<TickerMessage> {
406  if binary_message.len() < 2 {
407    return None;
408  }
409  let num_packets =
410    u16::from_be_bytes([binary_message[0], binary_message[1]]) as usize;
411  if num_packets > 0 {
412    let mut start = 2;
413    // Inline small optimization: most frames contain modest number of ticks
414    let mut ticks: SmallVec<[TickMessage; 32]> =
415      SmallVec::with_capacity(num_packets.min(32));
416    let mut had_error = false;
417    for _ in 0..num_packets {
418      if start + 2 > binary_message.len() {
419        had_error = true;
420        break;
421      }
422      let packet_len = packet_length(&binary_message[start..start + 2]);
423      let next_start = start + 2 + packet_len;
424      if next_start > binary_message.len() {
425        had_error = true;
426        break;
427      }
428      match Tick::try_from(&binary_message[start + 2..next_start]) {
429        Ok(tick) => ticks.push(TickMessage::new(tick.instrument_token, tick)),
430        Err(_e) => {
431          // Skip this packet, continue with others
432          had_error = true;
433        }
434      }
435      start = next_start;
436    }
437    if !ticks.is_empty() {
438      Some(TickerMessage::Ticks(ticks.into_vec()))
439    } else if had_error {
440      Some(TickerMessage::Error(
441        "Failed to parse tick(s) in frame".to_string(),
442      ))
443    } else {
444      None
445    }
446  } else {
447    None
448  }
449}
450
451fn process_text_message(text_message: String) -> Option<TickerMessage> {
452  serde_json::from_str::<TextMessage>(&text_message)
453    .map(|x| x.into())
454    .ok()
455}