kiteticker_async_manager/
ticker.rs1use crate::models::{
2 Mode, Request, TextMessage, Tick, TickMessage, TickerMessage,
3};
4use crate::parser::packet_length;
5use futures_util::{SinkExt, StreamExt};
6use serde_json::json;
7use smallvec::SmallVec;
8use std::collections::HashMap;
9use std::sync::Arc;
10use tokio::sync::{broadcast, mpsc};
11use tokio::task::JoinHandle;
12use tokio_tungstenite::{connect_async, tungstenite::Message};
13
14#[derive(Debug)]
15pub struct KiteTickerAsync {
19 #[allow(dead_code)]
20 api_key: String,
21 #[allow(dead_code)]
22 access_token: String,
23 cmd_tx: Option<mpsc::UnboundedSender<Message>>,
24 msg_tx: broadcast::Sender<TickerMessage>,
25 raw_tx: broadcast::Sender<Arc<[u8]>>, #[allow(dead_code)]
27 raw_only: bool, writer_handle: Option<JoinHandle<()>>,
29 reader_handle: Option<JoinHandle<()>>,
30 parser_handle: Option<JoinHandle<()>>,
31}
32
33impl KiteTickerAsync {
34 pub async fn connect(
36 api_key: &str,
37 access_token: &str,
38 ) -> Result<Self, String> {
39 Self::connect_with_options(api_key, access_token, false).await
40 }
41
42 pub async fn connect_with_options(
44 api_key: &str,
45 access_token: &str,
46 raw_only: bool,
47 ) -> Result<Self, String> {
48 let socket_url = format!(
49 "wss://{}?api_key={}&access_token={}",
50 "ws.kite.trade", api_key, access_token
51 );
52 let url = url::Url::parse(socket_url.as_str()).unwrap();
53
54 let (ws_stream, _) = connect_async(url).await.map_err(|e| e.to_string())?;
55
56 let (write_half, mut read_half) = ws_stream.split();
57
58 let (cmd_tx, mut cmd_rx) = mpsc::unbounded_channel::<Message>();
59 let (msg_tx, _) = broadcast::channel(1000);
61 let (raw_tx, _) = broadcast::channel(1000);
62 let mut write = write_half;
63 let writer_handle = tokio::spawn(async move {
64 while let Some(msg) = cmd_rx.recv().await {
65 if write.send(msg).await.is_err() {
66 break;
67 }
68 }
69 });
70
71 let (parse_tx, mut parse_rx) = mpsc::unbounded_channel::<Message>();
73
74 let msg_sender_for_reader = msg_tx.clone();
76 let reader_handle = tokio::spawn(async move {
77 while let Some(message) = read_half.next().await {
78 match message {
79 Ok(msg) => {
80 if parse_tx.send(msg).is_err() {
82 break;
83 }
84 }
85 Err(e) => {
86 let error_msg =
88 TickerMessage::Error(format!("WebSocket error: {}", e));
89 let _ = msg_sender_for_reader.send(error_msg);
90 if matches!(
91 e,
92 tokio_tungstenite::tungstenite::Error::ConnectionClosed
93 | tokio_tungstenite::tungstenite::Error::AlreadyClosed
94 ) {
95 break;
96 }
97 }
98 }
99 }
100 });
101
102 let msg_sender = msg_tx.clone();
104 let raw_sender = raw_tx.clone();
105 let parser_handle = tokio::spawn(async move {
106 let raw_only_mode = raw_only; while let Some(msg) = parse_rx.recv().await {
108 if let Some(processed) =
109 process_message(msg, &raw_sender, raw_only_mode)
110 {
111 let _ = msg_sender.send(processed);
112 }
113 }
114 });
115
116 Ok(KiteTickerAsync {
117 api_key: api_key.to_string(),
118 access_token: access_token.to_string(),
119 cmd_tx: Some(cmd_tx),
120 msg_tx,
121 raw_tx,
122 raw_only,
123 writer_handle: Some(writer_handle),
124 reader_handle: Some(reader_handle),
125 parser_handle: Some(parser_handle),
126 })
127 }
128
129 pub async fn subscribe(
131 &mut self,
132 instrument_tokens: &[u32],
133 mode: Option<Mode>,
134 ) -> Result<KiteTickerSubscriber, String> {
135 self.subscribe_cmd(instrument_tokens, mode.as_ref()).await?;
136 let default_mode = mode.unwrap_or_default();
137 let st = instrument_tokens
138 .iter()
139 .map(|&t| (t, default_mode))
140 .collect();
141
142 let rx = self.msg_tx.subscribe();
143 Ok(KiteTickerSubscriber {
144 subscribed_tokens: st,
145 rx,
146 cmd_tx: self.cmd_tx.clone().map(Arc::new),
147 })
148 }
149
150 pub async fn close(&mut self) -> Result<(), String> {
152 if let Some(tx) = self.cmd_tx.take() {
153 let _ = tx.send(Message::Close(None));
154 }
155 if let Some(handle) = self.writer_handle.take() {
156 handle.await.map_err(|e| e.to_string())?;
157 }
158 if let Some(handle) = self.reader_handle.take() {
159 handle.await.map_err(|e| e.to_string())?;
160 }
161 if let Some(handle) = self.parser_handle.take() {
162 handle.await.map_err(|e| e.to_string())?;
163 }
164 Ok(())
165 }
166
167 async fn subscribe_cmd(
168 &mut self,
169 instrument_tokens: &[u32],
170 mode: Option<&Mode>,
171 ) -> Result<(), String> {
172 let mode_value = mode.cloned().unwrap_or_default();
173 let msgs = vec![
174 Message::Text(Request::subscribe(instrument_tokens).to_string()),
175 Message::Text(Request::mode(mode_value, instrument_tokens).to_string()),
176 ];
177
178 for msg in msgs {
179 if let Some(tx) = &self.cmd_tx {
180 tx.send(msg).map_err(|e| e.to_string())?;
181 }
182 }
183
184 Ok(())
185 }
186
187 pub fn is_connected(&self) -> bool {
191 self.cmd_tx.is_some()
192 && self
193 .writer_handle
194 .as_ref()
195 .is_some_and(|h| !h.is_finished())
196 && self
197 .reader_handle
198 .as_ref()
199 .is_some_and(|h| !h.is_finished())
200 }
201
202 pub async fn ping(&mut self) -> Result<(), String> {
204 if let Some(tx) = &self.cmd_tx {
205 tx.send(Message::Ping(vec![])).map_err(|e| e.to_string())?;
206 Ok(())
207 } else {
208 Err("Connection is closed".to_string())
209 }
210 }
211
212 pub fn receiver_count(&self) -> usize {
214 self.msg_tx.receiver_count()
215 }
216
217 pub fn channel_capacity(&self) -> usize {
219 1000 }
223
224 pub fn subscribe_raw(&self) -> broadcast::Receiver<Arc<[u8]>> {
226 self.raw_tx.subscribe()
227 }
228
229 pub fn command_sender(&self) -> Option<mpsc::UnboundedSender<Message>> {
231 self.cmd_tx.clone()
232 }
233}
234
235#[derive(Debug)]
236pub struct KiteTickerSubscriber {
240 subscribed_tokens: HashMap<u32, Mode>,
242 rx: broadcast::Receiver<TickerMessage>,
243 cmd_tx: Option<Arc<mpsc::UnboundedSender<Message>>>,
244}
245
246impl KiteTickerSubscriber {
247 pub fn get_subscribed(&self) -> Vec<u32> {
249 self
250 .subscribed_tokens
251 .clone()
252 .into_keys()
253 .collect::<Vec<_>>()
254 }
255
256 fn get_subscribed_or(&self, tokens: &[u32]) -> Vec<u32> {
259 if tokens.is_empty() {
260 self.get_subscribed()
261 } else {
262 tokens
263 .iter()
264 .filter(|t| self.subscribed_tokens.contains_key(t))
265 .copied()
266 .collect::<Vec<_>>()
267 }
268 }
269
270 pub async fn subscribe(
272 &mut self,
273 tokens: &[u32],
274 mode: Option<Mode>,
275 ) -> Result<(), String> {
276 let default_mode = mode.unwrap_or_default();
278 let mut new_tokens: Vec<u32> = Vec::new();
279 for &t in tokens {
280 if let std::collections::hash_map::Entry::Vacant(e) =
281 self.subscribed_tokens.entry(t)
282 {
283 e.insert(default_mode);
284 new_tokens.push(t);
285 }
286 }
287 if new_tokens.is_empty() {
288 return Ok(());
289 }
290 if let Some(tx) = &self.cmd_tx {
291 let _ =
293 tx.send(Message::Text(Request::subscribe(&new_tokens).to_string()));
294 if mode.is_some() {
295 let _ = tx.send(Message::Text(
296 Request::mode(default_mode, &new_tokens).to_string(),
297 ));
298 }
299 }
300 Ok(())
301 }
302
303 pub async fn set_mode(
305 &mut self,
306 instrument_tokens: &[u32],
307 mode: Mode,
308 ) -> Result<(), String> {
309 let tokens = self.get_subscribed_or(instrument_tokens);
310 if tokens.is_empty() {
311 return Ok(());
312 }
313 if let Some(tx) = &self.cmd_tx {
314 let _ = tx.send(Message::Text(Request::mode(mode, &tokens).to_string()));
315 }
316 Ok(())
317 }
318
319 pub async fn unsubscribe(
323 &mut self,
324 instrument_tokens: &[u32],
325 ) -> Result<(), String> {
326 let tokens = self.get_subscribed_or(instrument_tokens);
327 if tokens.is_empty() {
328 return Ok(());
329 }
330 if let Some(tx) = &self.cmd_tx {
331 let _ = tx.send(Message::Text(Request::unsubscribe(&tokens).to_string()));
332 }
333 self.subscribed_tokens.retain(|k, _| !tokens.contains(k));
334 Ok(())
335 }
336
337 pub async fn next_message(
340 &mut self,
341 ) -> Result<Option<TickerMessage>, String> {
342 match self.rx.recv().await {
343 Ok(msg) => Ok(Some(msg)),
344 Err(broadcast::error::RecvError::Closed) => Ok(None),
345 Err(e) => Err(e.to_string()),
346 }
347 }
348
349 pub async fn close(&mut self) -> Result<(), String> {
350 Ok(())
351 }
352}
353
354fn process_message(
355 message: Message,
356 raw_sender: &broadcast::Sender<Arc<[u8]>>,
357 raw_only: bool,
358) -> Option<TickerMessage> {
359 match message {
360 Message::Text(text_message) => process_text_message(text_message),
361 Message::Binary(binary_message) => {
362 let arc = Arc::<[u8]>::from(binary_message.into_boxed_slice());
364 let slice: &[u8] = &arc;
365 let _ = raw_sender.send(arc.clone());
367 if raw_only {
368 return Some(TickerMessage::Raw(arc.to_vec()));
369 }
370 if slice.len() < 2 {
371 Some(TickerMessage::Ticks(vec![]))
372 } else {
373 process_binary(slice)
374 }
375 }
376 Message::Close(closing_message) => closing_message.map(|c| {
377 TickerMessage::ClosingMessage(json!({
378 "code": c.code.to_string(),
379 "reason": c.reason.to_string()
380 }))
381 }),
382 Message::Ping(_) => None,
383 Message::Pong(_) => None,
384 Message::Frame(_) => None,
385 }
386}
387
388fn process_binary(binary_message: &[u8]) -> Option<TickerMessage> {
389 if binary_message.len() < 2 {
390 return None;
391 }
392 let num_packets =
393 u16::from_be_bytes([binary_message[0], binary_message[1]]) as usize;
394 if num_packets > 0 {
395 let mut start = 2;
396 let mut ticks: SmallVec<[TickMessage; 32]> =
398 SmallVec::with_capacity(num_packets.min(32));
399 let mut had_error = false;
400 for _ in 0..num_packets {
401 if start + 2 > binary_message.len() {
402 had_error = true;
403 break;
404 }
405 let packet_len = packet_length(&binary_message[start..start + 2]);
406 let next_start = start + 2 + packet_len;
407 if next_start > binary_message.len() {
408 had_error = true;
409 break;
410 }
411 match Tick::try_from(&binary_message[start + 2..next_start]) {
412 Ok(tick) => ticks.push(TickMessage::new(tick.instrument_token, tick)),
413 Err(_e) => {
414 had_error = true;
416 }
417 }
418 start = next_start;
419 }
420 if !ticks.is_empty() {
421 Some(TickerMessage::Ticks(ticks.into_vec()))
422 } else if had_error {
423 Some(TickerMessage::Error(
424 "Failed to parse tick(s) in frame".to_string(),
425 ))
426 } else {
427 None
428 }
429 } else {
430 None
431 }
432}
433
434fn process_text_message(text_message: String) -> Option<TickerMessage> {
435 serde_json::from_str::<TextMessage>(&text_message)
436 .map(|x| x.into())
437 .ok()
438}