kiteticker_async_manager/
ticker.rs1use crate::models::{
2 Mode, Request, TextMessage, Tick, TickMessage, TickerMessage,
3};
4use crate::parser::packet_length;
5use futures_util::{SinkExt, StreamExt};
6use serde_json::json;
7use smallvec::SmallVec;
8use std::collections::HashMap;
9use std::sync::Arc;
10use tokio::sync::{broadcast, mpsc};
11use tokio::task::JoinHandle;
12use tokio_tungstenite::{connect_async, tungstenite::Message};
13
14const PARSE_CHANNEL_CAP: usize = 4096;
16
17#[derive(Debug)]
18pub struct KiteTickerAsync {
22 #[allow(dead_code)]
23 api_key: String,
24 #[allow(dead_code)]
25 access_token: String,
26 cmd_tx: Option<mpsc::UnboundedSender<Message>>,
27 msg_tx: broadcast::Sender<TickerMessage>,
28 raw_tx: broadcast::Sender<Arc<[u8]>>, #[allow(dead_code)]
30 raw_only: bool, writer_handle: Option<JoinHandle<()>>,
32 reader_handle: Option<JoinHandle<()>>,
33 parser_handle: Option<JoinHandle<()>>,
34}
35
36impl KiteTickerAsync {
37 pub async fn connect(
39 api_key: &str,
40 access_token: &str,
41 ) -> Result<Self, String> {
42 Self::connect_with_options(api_key, access_token, false).await
43 }
44
45 pub async fn connect_with_options(
47 api_key: &str,
48 access_token: &str,
49 raw_only: bool,
50 ) -> Result<Self, String> {
51 let socket_url = format!(
52 "wss://{}?api_key={}&access_token={}",
53 "ws.kite.trade", api_key, access_token
54 );
55 let url = url::Url::parse(socket_url.as_str()).unwrap();
56
57 let (ws_stream, _) = connect_async(url).await.map_err(|e| e.to_string())?;
58
59 let (write_half, mut read_half) = ws_stream.split();
60
61 let (cmd_tx, mut cmd_rx) = mpsc::unbounded_channel::<Message>();
62 let (msg_tx, _) = broadcast::channel(1000);
64 let (raw_tx, _) = broadcast::channel(1000);
65 let mut write = write_half;
66 let writer_handle = tokio::spawn(async move {
67 while let Some(msg) = cmd_rx.recv().await {
68 if write.send(msg).await.is_err() {
69 break;
70 }
71 }
72 });
73
74 let (parse_tx, mut parse_rx) = mpsc::channel::<Message>(PARSE_CHANNEL_CAP);
77
78 let msg_sender_for_reader = msg_tx.clone();
80 let reader_handle = tokio::spawn(async move {
81 while let Some(message) = read_half.next().await {
82 match message {
83 Ok(msg) => {
84 match parse_tx.try_send(msg) {
86 Ok(_) => {}
87 Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
88 log::warn!(
89 "Reader: parse channel full, dropping incoming frame"
90 );
91 }
93 Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
94 break;
96 }
97 }
98 }
99 Err(e) => {
100 let error_msg =
102 TickerMessage::Error(format!("WebSocket error: {}", e));
103 let _ = msg_sender_for_reader.send(error_msg);
104 if matches!(
105 e,
106 tokio_tungstenite::tungstenite::Error::ConnectionClosed
107 | tokio_tungstenite::tungstenite::Error::AlreadyClosed
108 ) {
109 break;
110 }
111 }
112 }
113 }
114 });
115
116 let msg_sender = msg_tx.clone();
118 let raw_sender = raw_tx.clone();
119 let parser_handle = tokio::spawn(async move {
120 let raw_only_mode = raw_only; while let Some(msg) = parse_rx.recv().await {
122 if let Some(processed) =
123 process_message(msg, &raw_sender, raw_only_mode)
124 {
125 let _ = msg_sender.send(processed);
126 }
127 }
128 });
129
130 Ok(KiteTickerAsync {
131 api_key: api_key.to_string(),
132 access_token: access_token.to_string(),
133 cmd_tx: Some(cmd_tx),
134 msg_tx,
135 raw_tx,
136 raw_only,
137 writer_handle: Some(writer_handle),
138 reader_handle: Some(reader_handle),
139 parser_handle: Some(parser_handle),
140 })
141 }
142
143 pub async fn subscribe(
145 &mut self,
146 instrument_tokens: &[u32],
147 mode: Option<Mode>,
148 ) -> Result<KiteTickerSubscriber, String> {
149 self.subscribe_cmd(instrument_tokens, mode.as_ref()).await?;
150 let default_mode = mode.unwrap_or_default();
151 let st = instrument_tokens
152 .iter()
153 .map(|&t| (t, default_mode))
154 .collect();
155
156 let rx = self.msg_tx.subscribe();
157 Ok(KiteTickerSubscriber {
158 subscribed_tokens: st,
159 rx,
160 cmd_tx: self.cmd_tx.clone().map(Arc::new),
161 })
162 }
163
164 pub async fn close(&mut self) -> Result<(), String> {
166 if let Some(tx) = self.cmd_tx.take() {
167 let _ = tx.send(Message::Close(None));
168 }
169 if let Some(handle) = self.writer_handle.take() {
170 handle.await.map_err(|e| e.to_string())?;
171 }
172 if let Some(handle) = self.reader_handle.take() {
173 handle.await.map_err(|e| e.to_string())?;
174 }
175 if let Some(handle) = self.parser_handle.take() {
176 handle.await.map_err(|e| e.to_string())?;
177 }
178 Ok(())
179 }
180
181 async fn subscribe_cmd(
182 &mut self,
183 instrument_tokens: &[u32],
184 mode: Option<&Mode>,
185 ) -> Result<(), String> {
186 let mode_value = mode.cloned().unwrap_or_default();
187 let msgs = vec![
188 Message::Text(Request::subscribe(instrument_tokens).to_string()),
189 Message::Text(Request::mode(mode_value, instrument_tokens).to_string()),
190 ];
191
192 for msg in msgs {
193 if let Some(tx) = &self.cmd_tx {
194 tx.send(msg).map_err(|e| e.to_string())?;
195 }
196 }
197
198 Ok(())
199 }
200
201 pub fn is_connected(&self) -> bool {
205 self.cmd_tx.is_some()
206 && self
207 .writer_handle
208 .as_ref()
209 .is_some_and(|h| !h.is_finished())
210 && self
211 .reader_handle
212 .as_ref()
213 .is_some_and(|h| !h.is_finished())
214 }
215
216 pub async fn ping(&mut self) -> Result<(), String> {
218 if let Some(tx) = &self.cmd_tx {
219 tx.send(Message::Ping(vec![])).map_err(|e| e.to_string())?;
220 Ok(())
221 } else {
222 Err("Connection is closed".to_string())
223 }
224 }
225
226 pub fn receiver_count(&self) -> usize {
228 self.msg_tx.receiver_count()
229 }
230
231 pub fn channel_capacity(&self) -> usize {
233 1000 }
237
238 pub fn subscribe_raw(&self) -> broadcast::Receiver<Arc<[u8]>> {
240 self.raw_tx.subscribe()
241 }
242
243 pub fn command_sender(&self) -> Option<mpsc::UnboundedSender<Message>> {
245 self.cmd_tx.clone()
246 }
247}
248
249#[derive(Debug)]
250pub struct KiteTickerSubscriber {
254 subscribed_tokens: HashMap<u32, Mode>,
256 rx: broadcast::Receiver<TickerMessage>,
257 cmd_tx: Option<Arc<mpsc::UnboundedSender<Message>>>,
258}
259
260impl KiteTickerSubscriber {
261 pub fn get_subscribed(&self) -> Vec<u32> {
263 self
264 .subscribed_tokens
265 .clone()
266 .into_keys()
267 .collect::<Vec<_>>()
268 }
269
270 fn get_subscribed_or(&self, tokens: &[u32]) -> Vec<u32> {
273 if tokens.is_empty() {
274 self.get_subscribed()
275 } else {
276 tokens
277 .iter()
278 .filter(|t| self.subscribed_tokens.contains_key(t))
279 .copied()
280 .collect::<Vec<_>>()
281 }
282 }
283
284 pub async fn subscribe(
286 &mut self,
287 tokens: &[u32],
288 mode: Option<Mode>,
289 ) -> Result<(), String> {
290 let default_mode = mode.unwrap_or_default();
292 let mut new_tokens: Vec<u32> = Vec::new();
293 for &t in tokens {
294 if let std::collections::hash_map::Entry::Vacant(e) =
295 self.subscribed_tokens.entry(t)
296 {
297 e.insert(default_mode);
298 new_tokens.push(t);
299 }
300 }
301 if new_tokens.is_empty() {
302 return Ok(());
303 }
304 if let Some(tx) = &self.cmd_tx {
305 let _ =
307 tx.send(Message::Text(Request::subscribe(&new_tokens).to_string()));
308 if mode.is_some() {
309 let _ = tx.send(Message::Text(
310 Request::mode(default_mode, &new_tokens).to_string(),
311 ));
312 }
313 }
314 Ok(())
315 }
316
317 pub async fn set_mode(
319 &mut self,
320 instrument_tokens: &[u32],
321 mode: Mode,
322 ) -> Result<(), String> {
323 let tokens = self.get_subscribed_or(instrument_tokens);
324 if tokens.is_empty() {
325 return Ok(());
326 }
327 if let Some(tx) = &self.cmd_tx {
328 let _ = tx.send(Message::Text(Request::mode(mode, &tokens).to_string()));
329 }
330 Ok(())
331 }
332
333 pub async fn unsubscribe(
337 &mut self,
338 instrument_tokens: &[u32],
339 ) -> Result<(), String> {
340 let tokens = self.get_subscribed_or(instrument_tokens);
341 if tokens.is_empty() {
342 return Ok(());
343 }
344 if let Some(tx) = &self.cmd_tx {
345 let _ = tx.send(Message::Text(Request::unsubscribe(&tokens).to_string()));
346 }
347 self.subscribed_tokens.retain(|k, _| !tokens.contains(k));
348 Ok(())
349 }
350
351 pub async fn next_message(
354 &mut self,
355 ) -> Result<Option<TickerMessage>, String> {
356 match self.rx.recv().await {
357 Ok(msg) => Ok(Some(msg)),
358 Err(broadcast::error::RecvError::Closed) => Ok(None),
359 Err(e) => Err(e.to_string()),
360 }
361 }
362
363 pub async fn close(&mut self) -> Result<(), String> {
364 Ok(())
365 }
366}
367
368fn process_message(
369 message: Message,
370 raw_sender: &broadcast::Sender<Arc<[u8]>>,
371 raw_only: bool,
372) -> Option<TickerMessage> {
373 match message {
374 Message::Text(text_message) => process_text_message(text_message),
375 Message::Binary(binary_message) => {
376 let arc = Arc::<[u8]>::from(binary_message.into_boxed_slice());
378 let slice: &[u8] = &arc;
379 let _ = raw_sender.send(arc.clone());
381 if raw_only {
382 return None;
385 }
386 if slice.len() < 2 {
388 None
389 } else {
390 process_binary(slice)
391 }
392 }
393 Message::Close(closing_message) => closing_message.map(|c| {
394 TickerMessage::ClosingMessage(json!({
395 "code": c.code.to_string(),
396 "reason": c.reason.to_string()
397 }))
398 }),
399 Message::Ping(_) => None,
400 Message::Pong(_) => None,
401 Message::Frame(_) => None,
402 }
403}
404
405fn process_binary(binary_message: &[u8]) -> Option<TickerMessage> {
406 if binary_message.len() < 2 {
407 return None;
408 }
409 let num_packets =
410 u16::from_be_bytes([binary_message[0], binary_message[1]]) as usize;
411 if num_packets > 0 {
412 let mut start = 2;
413 let mut ticks: SmallVec<[TickMessage; 32]> =
415 SmallVec::with_capacity(num_packets.min(32));
416 let mut had_error = false;
417 for _ in 0..num_packets {
418 if start + 2 > binary_message.len() {
419 had_error = true;
420 break;
421 }
422 let packet_len = packet_length(&binary_message[start..start + 2]);
423 let next_start = start + 2 + packet_len;
424 if next_start > binary_message.len() {
425 had_error = true;
426 break;
427 }
428 match Tick::try_from(&binary_message[start + 2..next_start]) {
429 Ok(tick) => ticks.push(TickMessage::new(tick.instrument_token, tick)),
430 Err(_e) => {
431 had_error = true;
433 }
434 }
435 start = next_start;
436 }
437 if !ticks.is_empty() {
438 Some(TickerMessage::Ticks(ticks.into_vec()))
439 } else if had_error {
440 Some(TickerMessage::Error(
441 "Failed to parse tick(s) in frame".to_string(),
442 ))
443 } else {
444 None
445 }
446 } else {
447 None
448 }
449}
450
451fn process_text_message(text_message: String) -> Option<TickerMessage> {
452 serde_json::from_str::<TextMessage>(&text_message)
453 .map(|x| x.into())
454 .ok()
455}