kiteticker_async_manager/
ticker.rs

1use crate::models::{
2  Mode, Request, TextMessage, Tick, TickMessage, TickerMessage,
3};
4use crate::parser::packet_length;
5use futures_util::{SinkExt, StreamExt};
6use serde_json::json;
7use smallvec::SmallVec;
8use std::collections::HashMap;
9use std::sync::Arc;
10use tokio::sync::{broadcast, mpsc};
11use tokio::task::JoinHandle;
12use tokio_tungstenite::{connect_async, tungstenite::Message};
13
#[derive(Debug)]
///
/// The WebSocket client for connecting to Kite Connect's streaming quotes service.
///
/// An instance owns the command channel into the socket writer, the broadcast
/// channels on which parsed messages and raw frames are published, and the
/// join handles of the three background tasks (writer, reader, parser) that
/// are spawned when the connection is established.
///
pub struct KiteTickerAsync {
  // Credential the connection was opened with; stored but not read by this
  // type, hence the lint allowance.
  #[allow(dead_code)]
  api_key: String,
  // Credential the connection was opened with; stored but not read by this
  // type, hence the lint allowance.
  #[allow(dead_code)]
  access_token: String,
  // Sender side of the queue drained by the writer task; `None` once `close`
  // has taken it.
  cmd_tx: Option<mpsc::UnboundedSender<Message>>,
  // Fan-out channel for processed messages; every subscriber gets a receiver.
  msg_tx: broadcast::Sender<TickerMessage>,
  raw_tx: broadcast::Sender<Arc<[u8]>>, // raw binary frames
  // Stored copy of the option passed at connect time; the parser task captures
  // its own copy, so this field is not read afterwards.
  #[allow(dead_code)]
  raw_only: bool, // if true, skip parsing and emit raw frames as TickerMessage::Raw
  // Task forwarding queued commands to the socket sink.
  writer_handle: Option<JoinHandle<()>>,
  // Task pulling messages off the socket and handing them to the parser.
  reader_handle: Option<JoinHandle<()>>,
  // Task converting socket messages into `TickerMessage`s and broadcasting them.
  parser_handle: Option<JoinHandle<()>>,
}
32
33impl KiteTickerAsync {
34  /// Establish a connection with the Kite WebSocket server
35  pub async fn connect(
36    api_key: &str,
37    access_token: &str,
38  ) -> Result<Self, String> {
39    Self::connect_with_options(api_key, access_token, false).await
40  }
41
42  /// Connect with options
43  pub async fn connect_with_options(
44    api_key: &str,
45    access_token: &str,
46    raw_only: bool,
47  ) -> Result<Self, String> {
48    let socket_url = format!(
49      "wss://{}?api_key={}&access_token={}",
50      "ws.kite.trade", api_key, access_token
51    );
52    let url = url::Url::parse(socket_url.as_str()).unwrap();
53
54    let (ws_stream, _) = connect_async(url).await.map_err(|e| e.to_string())?;
55
56    let (write_half, mut read_half) = ws_stream.split();
57
58    let (cmd_tx, mut cmd_rx) = mpsc::unbounded_channel::<Message>();
59    // Increase buffer size for high-frequency tick data
60    let (msg_tx, _) = broadcast::channel(1000);
61    let (raw_tx, _) = broadcast::channel(1000);
62    let mut write = write_half;
63    let writer_handle = tokio::spawn(async move {
64      while let Some(msg) = cmd_rx.recv().await {
65        if write.send(msg).await.is_err() {
66          break;
67        }
68      }
69    });
70
71    // Channel to decouple read and parse so the websocket stream isn't blocked by parsing
72    let (parse_tx, mut parse_rx) = mpsc::unbounded_channel::<Message>();
73
74    // Reader: only forward messages into parse channel, avoid heavy work here
75    let msg_sender_for_reader = msg_tx.clone();
76    let reader_handle = tokio::spawn(async move {
77      while let Some(message) = read_half.next().await {
78        match message {
79          Ok(msg) => {
80            // Forward to parser; if parser is gone, exit
81            if parse_tx.send(msg).is_err() {
82              break;
83            }
84          }
85          Err(e) => {
86            // Send error and continue trying to read
87            let error_msg =
88              TickerMessage::Error(format!("WebSocket error: {}", e));
89            let _ = msg_sender_for_reader.send(error_msg);
90            if matches!(
91              e,
92              tokio_tungstenite::tungstenite::Error::ConnectionClosed
93                | tokio_tungstenite::tungstenite::Error::AlreadyClosed
94            ) {
95              break;
96            }
97          }
98        }
99      }
100    });
101
102    // Parser: processes messages from the channel and publishes results
103    let msg_sender = msg_tx.clone();
104    let raw_sender = raw_tx.clone();
105    let parser_handle = tokio::spawn(async move {
106      let raw_only_mode = raw_only; // capture
107      while let Some(msg) = parse_rx.recv().await {
108        if let Some(processed) =
109          process_message(msg, &raw_sender, raw_only_mode)
110        {
111          let _ = msg_sender.send(processed);
112        }
113      }
114    });
115
116    Ok(KiteTickerAsync {
117      api_key: api_key.to_string(),
118      access_token: access_token.to_string(),
119      cmd_tx: Some(cmd_tx),
120      msg_tx,
121      raw_tx,
122      raw_only,
123      writer_handle: Some(writer_handle),
124      reader_handle: Some(reader_handle),
125      parser_handle: Some(parser_handle),
126    })
127  }
128
129  /// Subscribes the client to a list of instruments
130  pub async fn subscribe(
131    &mut self,
132    instrument_tokens: &[u32],
133    mode: Option<Mode>,
134  ) -> Result<KiteTickerSubscriber, String> {
135    self.subscribe_cmd(instrument_tokens, mode.as_ref()).await?;
136    let default_mode = mode.unwrap_or_default();
137    let st = instrument_tokens
138      .iter()
139      .map(|&t| (t, default_mode))
140      .collect();
141
142    let rx = self.msg_tx.subscribe();
143    Ok(KiteTickerSubscriber {
144      subscribed_tokens: st,
145      rx,
146      cmd_tx: self.cmd_tx.clone().map(Arc::new),
147    })
148  }
149
150  /// Close the websocket connection
151  pub async fn close(&mut self) -> Result<(), String> {
152    if let Some(tx) = self.cmd_tx.take() {
153      let _ = tx.send(Message::Close(None));
154    }
155    if let Some(handle) = self.writer_handle.take() {
156      handle.await.map_err(|e| e.to_string())?;
157    }
158    if let Some(handle) = self.reader_handle.take() {
159      handle.await.map_err(|e| e.to_string())?;
160    }
161    if let Some(handle) = self.parser_handle.take() {
162      handle.await.map_err(|e| e.to_string())?;
163    }
164    Ok(())
165  }
166
167  async fn subscribe_cmd(
168    &mut self,
169    instrument_tokens: &[u32],
170    mode: Option<&Mode>,
171  ) -> Result<(), String> {
172    let mode_value = mode.cloned().unwrap_or_default();
173    let msgs = vec![
174      Message::Text(Request::subscribe(instrument_tokens).to_string()),
175      Message::Text(Request::mode(mode_value, instrument_tokens).to_string()),
176    ];
177
178    for msg in msgs {
179      if let Some(tx) = &self.cmd_tx {
180        tx.send(msg).map_err(|e| e.to_string())?;
181      }
182    }
183
184    Ok(())
185  }
186
187  // internal helpers removed after refactor; operations now issued via subscriber command handle
188
189  /// Check if the connection is still alive
190  pub fn is_connected(&self) -> bool {
191    self.cmd_tx.is_some()
192      && self
193        .writer_handle
194        .as_ref()
195        .is_some_and(|h| !h.is_finished())
196      && self
197        .reader_handle
198        .as_ref()
199        .is_some_and(|h| !h.is_finished())
200  }
201
202  /// Send a ping to keep the connection alive
203  pub async fn ping(&mut self) -> Result<(), String> {
204    if let Some(tx) = &self.cmd_tx {
205      tx.send(Message::Ping(vec![])).map_err(|e| e.to_string())?;
206      Ok(())
207    } else {
208      Err("Connection is closed".to_string())
209    }
210  }
211
212  /// Get the current broadcast channel receiver count
213  pub fn receiver_count(&self) -> usize {
214    self.msg_tx.receiver_count()
215  }
216
217  /// Get the current broadcast channel capacity
218  pub fn channel_capacity(&self) -> usize {
219    // The broadcast channel doesn't expose capacity directly,
220    // but we can estimate based on our configuration
221    1000 // This matches our increased buffer size
222  }
223
224  /// Subscribe to raw binary frames (zero-copy). Each item is an Arc<[u8]> of the full tungstenite binary frame.
225  pub fn subscribe_raw(&self) -> broadcast::Receiver<Arc<[u8]>> {
226    self.raw_tx.subscribe()
227  }
228
229  /// Get a clone of the internal command sender for incremental ops
230  pub fn command_sender(&self) -> Option<mpsc::UnboundedSender<Message>> {
231    self.cmd_tx.clone()
232  }
233}
234
#[derive(Debug)]
///
/// Pub/sub handle obtained by subscribing to a list of instruments.
///
/// It tracks the tokens subscribed through it, receives the messages
/// broadcast by the ticker, and can queue further subscribe / mode /
/// unsubscribe commands.
///
pub struct KiteTickerSubscriber {
  // Now independent of owning the ticker; commands go through channel retained in KiteTickerAsync
  // Tokens subscribed through this handle, each with the mode recorded for it.
  subscribed_tokens: HashMap<u32, Mode>,
  // Receiving end of the ticker's processed-message broadcast channel.
  rx: broadcast::Receiver<TickerMessage>,
  // Shared clone of the ticker's command sender; `None` when the ticker had
  // no sender at the time this subscriber was created.
  cmd_tx: Option<Arc<mpsc::UnboundedSender<Message>>>,
}
245
246impl KiteTickerSubscriber {
247  /// Get the list of subscribed instruments
248  pub fn get_subscribed(&self) -> Vec<u32> {
249    self
250      .subscribed_tokens
251      .clone()
252      .into_keys()
253      .collect::<Vec<_>>()
254  }
255
256  /// get all tokens common between subscribed tokens and input tokens
257  /// and if the input is empty then all subscribed tokens will be unsubscribed
258  fn get_subscribed_or(&self, tokens: &[u32]) -> Vec<u32> {
259    if tokens.is_empty() {
260      self.get_subscribed()
261    } else {
262      tokens
263        .iter()
264        .filter(|t| self.subscribed_tokens.contains_key(t))
265        .copied()
266        .collect::<Vec<_>>()
267    }
268  }
269
270  /// Subscribe to new tokens
271  pub async fn subscribe(
272    &mut self,
273    tokens: &[u32],
274    mode: Option<Mode>,
275  ) -> Result<(), String> {
276    // Only send incremental subscribe for new tokens
277    let default_mode = mode.unwrap_or_default();
278    let mut new_tokens: Vec<u32> = Vec::new();
279    for &t in tokens {
280      if let std::collections::hash_map::Entry::Vacant(e) =
281        self.subscribed_tokens.entry(t)
282      {
283        e.insert(default_mode);
284        new_tokens.push(t);
285      }
286    }
287    if new_tokens.is_empty() {
288      return Ok(());
289    }
290    if let Some(tx) = &self.cmd_tx {
291      // send subscribe
292      let _ =
293        tx.send(Message::Text(Request::subscribe(&new_tokens).to_string()));
294      if mode.is_some() {
295        let _ = tx.send(Message::Text(
296          Request::mode(default_mode, &new_tokens).to_string(),
297        ));
298      }
299    }
300    Ok(())
301  }
302
303  /// Change the mode of the subscribed instrument tokens
304  pub async fn set_mode(
305    &mut self,
306    instrument_tokens: &[u32],
307    mode: Mode,
308  ) -> Result<(), String> {
309    let tokens = self.get_subscribed_or(instrument_tokens);
310    if tokens.is_empty() {
311      return Ok(());
312    }
313    if let Some(tx) = &self.cmd_tx {
314      let _ = tx.send(Message::Text(Request::mode(mode, &tokens).to_string()));
315    }
316    Ok(())
317  }
318
319  /// Unsubscribe provided subscribed tokens, if input is empty then all subscribed tokens will unsubscribed
320  ///
321  /// Tokens in the input which are not part of the subscribed tokens will be ignored.
322  pub async fn unsubscribe(
323    &mut self,
324    instrument_tokens: &[u32],
325  ) -> Result<(), String> {
326    let tokens = self.get_subscribed_or(instrument_tokens);
327    if tokens.is_empty() {
328      return Ok(());
329    }
330    if let Some(tx) = &self.cmd_tx {
331      let _ = tx.send(Message::Text(Request::unsubscribe(&tokens).to_string()));
332    }
333    self.subscribed_tokens.retain(|k, _| !tokens.contains(k));
334    Ok(())
335  }
336
337  /// Get the next message from the server, waiting if necessary.
338  /// If the result is None then server is terminated
339  pub async fn next_message(
340    &mut self,
341  ) -> Result<Option<TickerMessage>, String> {
342    match self.rx.recv().await {
343      Ok(msg) => Ok(Some(msg)),
344      Err(broadcast::error::RecvError::Closed) => Ok(None),
345      Err(e) => Err(e.to_string()),
346    }
347  }
348
349  pub async fn close(&mut self) -> Result<(), String> {
350    Ok(())
351  }
352}
353
354fn process_message(
355  message: Message,
356  raw_sender: &broadcast::Sender<Arc<[u8]>>,
357  raw_only: bool,
358) -> Option<TickerMessage> {
359  match message {
360    Message::Text(text_message) => process_text_message(text_message),
361    Message::Binary(binary_message) => {
362      // Convert once to Arc<[u8]> to avoid cloning the Vec for raw subscribers
363      let arc = Arc::<[u8]>::from(binary_message.into_boxed_slice());
364      let slice: &[u8] = &arc;
365      // publish raw first (cheap arc clone)
366      let _ = raw_sender.send(arc.clone());
367      if raw_only {
368        return Some(TickerMessage::Raw(arc.to_vec()));
369      }
370      if slice.len() < 2 {
371        Some(TickerMessage::Ticks(vec![]))
372      } else {
373        process_binary(slice)
374      }
375    }
376    Message::Close(closing_message) => closing_message.map(|c| {
377      TickerMessage::ClosingMessage(json!({
378        "code": c.code.to_string(),
379        "reason": c.reason.to_string()
380      }))
381    }),
382    Message::Ping(_) => None,
383    Message::Pong(_) => None,
384    Message::Frame(_) => None,
385  }
386}
387
388fn process_binary(binary_message: &[u8]) -> Option<TickerMessage> {
389  if binary_message.len() < 2 {
390    return None;
391  }
392  let num_packets =
393    u16::from_be_bytes([binary_message[0], binary_message[1]]) as usize;
394  if num_packets > 0 {
395    let mut start = 2;
396    // Inline small optimization: most frames contain modest number of ticks
397    let mut ticks: SmallVec<[TickMessage; 32]> =
398      SmallVec::with_capacity(num_packets.min(32));
399    let mut had_error = false;
400    for _ in 0..num_packets {
401      if start + 2 > binary_message.len() {
402        had_error = true;
403        break;
404      }
405      let packet_len = packet_length(&binary_message[start..start + 2]);
406      let next_start = start + 2 + packet_len;
407      if next_start > binary_message.len() {
408        had_error = true;
409        break;
410      }
411      match Tick::try_from(&binary_message[start + 2..next_start]) {
412        Ok(tick) => ticks.push(TickMessage::new(tick.instrument_token, tick)),
413        Err(_e) => {
414          // Skip this packet, continue with others
415          had_error = true;
416        }
417      }
418      start = next_start;
419    }
420    if !ticks.is_empty() {
421      Some(TickerMessage::Ticks(ticks.into_vec()))
422    } else if had_error {
423      Some(TickerMessage::Error(
424        "Failed to parse tick(s) in frame".to_string(),
425      ))
426    } else {
427      None
428    }
429  } else {
430    None
431  }
432}
433
434fn process_text_message(text_message: String) -> Option<TickerMessage> {
435  serde_json::from_str::<TextMessage>(&text_message)
436    .map(|x| x.into())
437    .ok()
438}