Skip to main content

kiteticker_async/
models.rs

1use serde::{Deserialize, Serialize};
2use std::{ops::Div, time::Duration};
3
/// Decodes the first four bytes of `input` as a big-endian signed integer
/// and returns it as a `u32`.
///
/// Returns `None` when `input` holds fewer than four bytes or when the
/// decoded integer is negative. Bytes past the fourth are ignored.
fn value(input: &[u8]) -> Option<u32> {
  // `get` keeps a truncated packet from panicking; the caller sees `None`.
  let bytes: [u8; 4] = input.get(..4)?.try_into().ok()?;
  u32::try_from(i32::from_be_bytes(bytes)).ok()
}
8
/// Decodes the first two bytes of `input` as a big-endian signed integer
/// and returns it as a `u16`.
///
/// Returns `None` when `input` holds fewer than two bytes or when the
/// decoded integer is negative. Bytes past the second are ignored.
fn value_short(input: &[u8]) -> Option<u16> {
  // `get` keeps a truncated packet from panicking; the caller sees `None`.
  let bytes: [u8; 2] = input.get(..2)?.try_into().ok()?;
  u16::try_from(i16::from_be_bytes(bytes)).ok()
}
13
14fn price(input: &[u8], exchange: &Exchange) -> Option<f64> {
15  let value = i32::from_be_bytes(input[0..4].try_into().unwrap()) as f64;
16  if exchange.divisor() > 0_f64 {
17    Some(value.div(exchange.divisor()))
18  } else {
19    None
20  }
21}
22
/// Reads the first two bytes of `bs` as a big-endian unsigned length.
///
/// The value is decoded as `u16`, so lengths of `0x8000` and above come out
/// as their real value instead of being sign-extended into a huge `usize`.
///
/// # Panics
///
/// Panics if `bs` holds fewer than two bytes.
pub(crate) fn packet_length(bs: &[u8]) -> usize {
  usize::from(u16::from_be_bytes([bs[0], bs[1]]))
}
26
27#[derive(Debug, Clone, Default)]
28///
29/// Quote packet structure
30///
31pub struct Tick {
32  pub mode: Mode,
33  pub instrument_token: u32,
34  pub exchange: Exchange,
35  pub is_tradable: bool,
36  pub is_index: bool,
37
38  pub last_traded_qty: Option<u32>,
39  pub avg_traded_price: Option<f64>,
40  pub last_price: Option<f64>,
41  pub volume_traded: Option<u32>,
42  pub total_buy_qty: Option<u32>,
43  pub total_sell_qty: Option<u32>,
44  pub ohlc: Option<OHLC>,
45
46  pub last_traded_timestamp: Option<Duration>,
47  pub oi: Option<u32>,
48  pub oi_day_high: Option<u32>,
49  pub oi_day_low: Option<u32>,
50  pub exchange_timestamp: Option<Duration>,
51
52  pub net_change: Option<f64>,
53  pub depth: Option<Depth>,
54}
55
56impl Tick {
57  fn set_instrument_token(&mut self, input: &[u8]) -> &mut Self {
58    self.instrument_token = value(&input[0..=3]).unwrap();
59    self.exchange = ((self.instrument_token & 0xFF) as usize).into();
60    self
61  }
62
63  fn set_change(&mut self) -> &mut Self {
64    self.net_change = self
65      .ohlc
66      .as_ref()
67      .map(|o| o.close)
68      .map(|close_price| {
69        if let Some(last_price) = self.last_price {
70          if close_price == 0_f64 {
71            return None;
72          } else {
73            Some(((last_price - close_price) * 100.0).div(close_price))
74          }
75        } else {
76          None
77        }
78      })
79      .unwrap_or_default();
80    self
81  }
82}
83
84impl From<&[u8]> for Tick {
85  fn from(input: &[u8]) -> Self {
86    let mut tick = Tick::default();
87
88    let parse_ltp = |t: &mut Tick, i: &[u8]| {
89      // 0 - 4 bytes : instrument token
90      t.set_instrument_token(i);
91      // 4 - 8 bytes : ltp
92      if let Some(bs) = i.get(4..8) {
93        t.mode = Mode::LTP;
94        t.last_price = price(bs, &t.exchange);
95      }
96    };
97
98    let parse_quote = |t: &mut Tick, i: &[u8], is_index: bool| {
99      if is_index {
100        if let Some(bs) = i.get(8..28) {
101          t.mode = Mode::Quote;
102          // 8 - 24 bytes : ohlc
103          t.ohlc = OHLC::from(&bs[0..16], &t.exchange);
104          // 24 - 28 bytes : Price change
105          // t.net_change = price(&bs[16..=19], &t.exchange);
106          t.set_change();
107        }
108      } else {
109        if let Some(bs) = i.get(8..44) {
110          t.mode = Mode::Quote;
111          // 8 - 12 bytes : last traded quantity
112          t.last_traded_qty = value(&bs[0..4]);
113          // 12 - 16 bytes : avg traded price
114          t.avg_traded_price = price(&bs[4..8], &t.exchange);
115          // 16 - 20 bytes : volume traded today
116          t.volume_traded = value(&bs[8..12]);
117          // 20 - 24 bytes : total buy quantity
118          t.total_buy_qty = value(&bs[12..16]);
119          // 24 - 28 bytes : total sell quantity
120          t.total_sell_qty = value(&bs[16..20]);
121          // 28 - 44 bytes : ohlc
122          t.ohlc = OHLC::from(&bs[20..36], &t.exchange);
123
124          t.set_change();
125        }
126      }
127    };
128
129    let parse_full = |t: &mut Tick, i: &[u8], is_index: bool| {
130      if is_index {
131        if let Some(bs) = i.get(28..32) {
132          t.mode = Mode::Full;
133          // 28 - 32 bytes : exchange time
134          t.exchange_timestamp =
135            value(bs).map(|x| Duration::from_secs(x.into()));
136        }
137      } else {
138        if let Some(bs) = i.get(44..184) {
139          t.mode = Mode::Full;
140          // 44 - 48 bytes : last traded timestamp
141          t.last_traded_timestamp =
142            value(&bs[0..4]).map(|x| Duration::from_secs(x.into()));
143
144          // 48 - 52 bytes : oi
145          t.oi = value(&bs[4..8]);
146          // 52 - 56 bytes : oi day high
147          t.oi_day_high = value(&bs[8..12]);
148          // 56 - 60 bytes : oi day low
149          t.oi_day_low = value(&bs[12..16]);
150          // 60 - 64 bytes : exchange time
151          t.exchange_timestamp =
152            value(&bs[16..20]).map(|x| Duration::from_secs(x.into()));
153          // 64 - 184 bytes : market depth
154          t.depth = Depth::from(&bs[20..140], &t.exchange);
155        }
156      }
157    };
158
159    parse_ltp(&mut tick, input);
160    if !tick.exchange.is_tradable() {
161      tick.is_index = true;
162      tick.is_tradable = false;
163
164      parse_quote(&mut tick, input, true);
165      parse_full(&mut tick, input, true);
166    } else {
167      tick.is_index = false;
168      tick.is_tradable = true;
169
170      parse_quote(&mut tick, input, false);
171      parse_full(&mut tick, input, false);
172    }
173
174    tick
175  }
176}
177
///
/// OHLC packet structure
///
#[derive(Clone, Debug, Default)]
pub struct OHLC {
  /// Open price
  pub open: f64,
  /// High price
  pub high: f64,
  /// Low price
  pub low: f64,
  /// Close price
  pub close: f64,
}
188
189impl OHLC {
190  fn from(value: &[u8], exchange: &Exchange) -> Option<Self> {
191    if let Some(bs) = value.get(0..16) {
192      Some(OHLC {
193        open: price(&bs[0..=3], exchange).unwrap(),
194        high: price(&bs[4..=7], exchange).unwrap(),
195        low: price(&bs[8..=11], exchange).unwrap(),
196        close: price(&bs[12..=15], exchange).unwrap(),
197      })
198    } else {
199      None
200    }
201  }
202}
203
204#[derive(Debug, Clone, Default)]
205///
206/// Market depth packet structure
207///
208pub struct Depth {
209  pub buy: [DepthItem; 5],
210  pub sell: [DepthItem; 5],
211}
212
213impl Depth {
214  fn from(input: &[u8], exchange: &Exchange) -> Option<Self> {
215    if let Some(bs) = input.get(0..120) {
216      let parse_depth_item = |v: &[u8], start: usize| {
217        v.get(start..start + 10)
218          .and_then(|xs| DepthItem::from(xs, exchange))
219          .unwrap_or_default()
220      };
221      let mut depth = Depth::default();
222      for i in 0..5 {
223        let start = i * 12;
224        depth.buy[i] = parse_depth_item(bs, start)
225      }
226      for i in 0..5 {
227        let start = 60 + i * 12;
228        depth.sell[i] = parse_depth_item(bs, start);
229      }
230
231      Some(depth)
232    } else {
233      None
234    }
235  }
236}
237
///
/// Structure for each market depth entry
///
#[derive(Clone, Debug, Default)]
pub struct DepthItem {
  /// Quantity for this entry
  pub qty: u32,
  /// Price for this entry
  pub price: f64,
  /// Number of orders for this entry
  pub orders: u16,
}
247
248impl DepthItem {
249  pub fn from(input: &[u8], exchange: &Exchange) -> Option<Self> {
250    if let Some(bs) = input.get(0..10) {
251      Some(DepthItem {
252        qty: value(&bs[0..=3]).unwrap(),
253        price: price(&bs[4..=7], exchange).unwrap(),
254        orders: value_short(&bs[8..=9]).unwrap(),
255      })
256    } else {
257      None
258    }
259  }
260}
261
262#[derive(Debug, Clone)]
263///
264/// Parsed message from websocket
265///
266pub enum TickerMessage {
267  /// Quote packets for subscribed tokens
268  Ticks(Vec<TickMessage>),
269  /// Error response
270  Error(String),
271  /// Order postback
272  Order(serde_json::Value),
273  /// Messages and alerts from broker
274  Message(serde_json::Value),
275  /// Websocket closing frame
276  ClosingMessage(serde_json::Value),
277}
278
279impl From<TextMessage> for TickerMessage {
280  fn from(value: TextMessage) -> Self {
281    let message_type: TextMessageType = value.message_type.into();
282    match message_type {
283      TextMessageType::Order => Self::Order(value.data),
284      TextMessageType::Error => Self::Error(value.data.to_string()),
285      TextMessageType::Message => Self::Message(value.data),
286    }
287  }
288}
289
290#[derive(Debug, Clone, Default)]
291///
292/// Parsed quote packet
293///
294pub struct TickMessage {
295  pub instrument_token: u32,
296  pub content: Tick,
297}
298
299impl TickMessage {
300  pub(crate) fn new(instrument_token: u32, content: Tick) -> Self {
301    Self {
302      instrument_token,
303      content,
304    }
305  }
306}
307
///
/// Exchange options
///
/// `NSE` is the default. The enum carries no data, so it is `Copy` and can
/// be compared and hashed directly.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
pub enum Exchange {
  #[default]
  NSE,
  NFO,
  CDS,
  BSE,
  BFO,
  BCD,
  MCX,
  MCXSX,
  INDICES,
}
324
325impl Exchange {
326  fn divisor(&self) -> f64 {
327    match self {
328      Self::CDS => 100_000_0.0,
329      Self::BCD => 100_0.0,
330      _ => 100.0,
331    }
332  }
333
334  fn is_tradable(&self) -> bool {
335    match self {
336      Self::INDICES => false,
337      _ => true,
338    }
339  }
340}
341
342impl From<usize> for Exchange {
343  fn from(value: usize) -> Self {
344    match value {
345      9 => Self::INDICES,
346      8 => Self::MCXSX,
347      7 => Self::MCX,
348      6 => Self::BCD,
349      5 => Self::BFO,
350      4 => Self::BSE,
351      3 => Self::CDS,
352      2 => Self::NFO,
353      1 => Self::NSE,
354      _ => Self::NSE,
355    }
356  }
357}
358
359#[derive(
360  Debug, Clone, Deserialize, Serialize, Default, PartialEq, PartialOrd,
361)]
362#[serde(rename_all = "lowercase")]
363///
364/// Modes in which packets are streamed
365///
366pub enum Mode {
367  Full,
368  #[default]
369  Quote,
370  LTP,
371}
372
373impl TryFrom<usize> for Mode {
374  type Error = String;
375  fn try_from(value: usize) -> Result<Self, Self::Error> {
376    match value {
377      8 => Ok(Self::LTP),
378      44 => Ok(Self::Quote),
379      184 => Ok(Self::Full),
380      _ => Err(format!("Invalid packet size: {}", value)),
381    }
382  }
383}
384
385#[derive(Clone, Debug, Deserialize, Serialize)]
386#[serde(rename_all = "lowercase")]
387///
388/// Websocket request actions
389///
390enum RequestActions {
391  Subscribe,
392  Unsubscribe,
393  Mode,
394}
395
396#[derive(Clone, Debug, Deserialize, Serialize)]
397#[serde(untagged)]
398///
399/// Websocket request data
400///
401enum RequestData {
402  InstrumentTokens(Vec<u32>),
403  InstrumentTokensWithMode(Mode, Vec<u32>),
404}
405
406#[derive(Debug, Clone, Deserialize, Serialize)]
407///
408/// Websocket request structure
409///
410pub struct Request {
411  a: RequestActions,
412  v: RequestData,
413}
414
415impl Request {
416  fn new(action: RequestActions, value: RequestData) -> Request {
417    Request {
418      a: action,
419      v: value,
420    }
421  }
422
423  ///
424  /// Subscribe to a list of instrument tokens
425  ///
426  pub fn subscribe(instrument_tokens: Vec<u32>) -> Request {
427    Request::new(
428      RequestActions::Subscribe,
429      RequestData::InstrumentTokens(instrument_tokens),
430    )
431  }
432
433  ///
434  /// Subscribe to a list of instrument tokens with mode
435  ///
436  pub fn mode(mode: Mode, instrument_tokens: Vec<u32>) -> Request {
437    Request::new(
438      RequestActions::Mode,
439      RequestData::InstrumentTokensWithMode(mode, instrument_tokens),
440    )
441  }
442
443  ///
444  /// Unsubscribe from a list of instrument tokens
445  ///
446  pub fn unsubscribe(instrument_tokens: Vec<u32>) -> Request {
447    Request::new(
448      RequestActions::Unsubscribe,
449      RequestData::InstrumentTokens(instrument_tokens),
450    )
451  }
452}
453
454impl ToString for Request {
455  fn to_string(&self) -> String {
456    serde_json::to_string(self)
457      .expect("failed to serialize TickerInput to JSON")
458  }
459}
460
///
/// Postbacks and non-binary message types
///
#[derive(Clone, Debug)]
enum TextMessageType {
  /// Order postback
  Order,
  /// Error response
  Error,
  /// Messages and alerts from the broker
  Message,
}
473
474impl From<String> for TextMessageType {
475  fn from(value: String) -> Self {
476    match value.as_str() {
477      "order" => Self::Order,
478      "error" => Self::Error,
479      _ => Self::Message,
480    }
481  }
482}
483
484#[derive(Debug, Clone, Deserialize, Serialize)]
485///
486/// Postback and non-binary message structure
487///
488pub struct TextMessage {
489  #[serde(rename = "type")]
490  message_type: String,
491  data: serde_json::Value,
492}