Skip to main content

fyers_rs/models/
ws.rs

1//! WebSocket command and event protocol models.
2
3use std::collections::HashMap;
4use std::time::Duration;
5
6use prost::Message;
7use serde::{Deserialize, Deserializer, Serialize};
8use serde_json::Value;
9
10/// Market-data socket connection/config options that affect protocol behavior.
11#[derive(Debug, Clone, PartialEq, Eq)]
12pub struct DataSocketConfig {
13    /// Enable lighter LTP-focused updates.
14    pub lite_mode: bool,
15    /// Enable reconnect handling in the future manager layer.
16    pub reconnect: bool,
17    /// Maximum reconnect attempts for the future manager layer.
18    pub reconnect_retry: usize,
19    /// Queue processing interval used by documented SDK configuration.
20    pub queue_process_interval: QueueProcessInterval,
21}
22
23impl Default for DataSocketConfig {
24    fn default() -> Self {
25        Self {
26            lite_mode: false,
27            reconnect: true,
28            reconnect_retry: 50,
29            queue_process_interval: QueueProcessInterval::default(),
30        }
31    }
32}
33
/// Documented market-data queue process interval, constrained to 1ms..=2000ms.
///
/// The wrapped [`Duration`] is private, so outside this module a value can
/// only be obtained through [`QueueProcessInterval::from_millis`] or
/// [`Default`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct QueueProcessInterval(Duration);

impl QueueProcessInterval {
    /// Smallest accepted interval, in milliseconds.
    pub const MIN_MILLIS: u64 = 1;
    /// Largest accepted interval, in milliseconds.
    pub const MAX_MILLIS: u64 = 2000;

    /// Create a queue process interval from milliseconds.
    ///
    /// # Errors
    ///
    /// Returns a descriptive message when `millis` lies outside
    /// `MIN_MILLIS..=MAX_MILLIS`.
    pub fn from_millis(millis: u64) -> Result<Self, String> {
        if !(Self::MIN_MILLIS..=Self::MAX_MILLIS).contains(&millis) {
            // Built from the constants so the text cannot drift from the check.
            return Err(format!(
                "queue process interval must be between {}ms and {}ms",
                Self::MIN_MILLIS,
                Self::MAX_MILLIS
            ));
        }

        Ok(Self(Duration::from_millis(millis)))
    }

    /// Return the duration value.
    pub const fn as_duration(self) -> Duration {
        self.0
    }
}

impl Default for QueueProcessInterval {
    /// The default is the smallest accepted interval (`MIN_MILLIS`).
    fn default() -> Self {
        Self(Duration::from_millis(Self::MIN_MILLIS))
    }
}
59
60/// Data socket subscription kind.
61#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
62pub enum DataSubscriptionKind {
63    #[serde(rename = "SymbolUpdate")]
64    SymbolUpdate,
65    #[serde(rename = "DepthUpdate")]
66    DepthUpdate,
67}
68
69/// Data socket subscribe/unsubscribe command.
70#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
71pub struct DataSubscribeRequest {
72    pub symbols: Vec<String>,
73    pub data_type: DataSubscriptionKind,
74}
75
76/// Data socket unsubscribe command.
77pub type DataUnsubscribeRequest = DataSubscribeRequest;
78
79/// Parsed market-data socket event.
80#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
81#[serde(tag = "event", content = "payload")]
82pub enum DataSocketEvent {
83    Connected(DataControlEvent),
84    Subscribed(DataControlEvent),
85    Unsubscribed(DataControlEvent),
86    Mode(DataControlEvent),
87    Error(DataControlEvent),
88    SymbolUpdate(SymbolUpdate),
89    IndexUpdate(IndexUpdate),
90    DepthUpdate(DepthUpdate),
91}
92
93/// Common data-socket control event.
94#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
95pub struct DataControlEvent {
96    #[serde(rename = "type")]
97    pub event_type: String,
98    pub code: i64,
99    pub message: String,
100    pub s: String,
101}
102
103/// Symbol-update event.
104///
105/// On the binary wire (live data socket) this is emitted for every per-scrip
106/// payload whose topic name starts with `sf|...`, with `event_type` set to
107/// either `"sf"` (full mode snapshot/update) or `"lit"` (lite mode). In the
108/// legacy JSON event shape parsed by [`parse_data_event`] the `type` field
109/// of the source JSON object equals `"sf"`.
110#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
111pub struct SymbolUpdate {
112    #[serde(rename = "type")]
113    pub event_type: String,
114    pub symbol: String,
115    #[serde(deserialize_with = "deserialize_f64_from_value")]
116    pub ltp: f64,
117    #[serde(default, deserialize_with = "deserialize_optional_f64_from_value")]
118    pub prev_close_price: Option<f64>,
119    #[serde(default, deserialize_with = "deserialize_optional_f64_from_value")]
120    pub high_price: Option<f64>,
121    #[serde(default, deserialize_with = "deserialize_optional_f64_from_value")]
122    pub low_price: Option<f64>,
123    #[serde(default, deserialize_with = "deserialize_optional_f64_from_value")]
124    pub open_price: Option<f64>,
125    #[serde(default, deserialize_with = "deserialize_optional_f64_from_value")]
126    pub ch: Option<f64>,
127    #[serde(default, deserialize_with = "deserialize_optional_f64_from_value")]
128    pub chp: Option<f64>,
129    #[serde(default, deserialize_with = "deserialize_optional_i64_from_value")]
130    pub vol_traded_today: Option<i64>,
131    #[serde(default, deserialize_with = "deserialize_optional_i64_from_value")]
132    pub last_traded_time: Option<i64>,
133    #[serde(default, deserialize_with = "deserialize_optional_i64_from_value")]
134    pub exch_feed_time: Option<i64>,
135    #[serde(default, deserialize_with = "deserialize_optional_i64_from_value")]
136    pub bid_size: Option<i64>,
137    #[serde(default, deserialize_with = "deserialize_optional_i64_from_value")]
138    pub ask_size: Option<i64>,
139    #[serde(default, deserialize_with = "deserialize_optional_f64_from_value")]
140    pub bid_price: Option<f64>,
141    #[serde(default, deserialize_with = "deserialize_optional_f64_from_value")]
142    pub ask_price: Option<f64>,
143    #[serde(default, deserialize_with = "deserialize_optional_i64_from_value")]
144    pub last_traded_qty: Option<i64>,
145    #[serde(default, deserialize_with = "deserialize_optional_i64_from_value")]
146    pub tot_buy_qty: Option<i64>,
147    #[serde(default, deserialize_with = "deserialize_optional_i64_from_value")]
148    pub tot_sell_qty: Option<i64>,
149    #[serde(default, deserialize_with = "deserialize_optional_f64_from_value")]
150    pub avg_trade_price: Option<f64>,
151}
152
153/// Index-update event.
154///
155/// On the binary wire this is emitted for per-scrip payloads whose topic
156/// name starts with `if|...`. In the legacy JSON shape the source
157/// object's `type` field equals `"if"`.
158#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
159pub struct IndexUpdate {
160    #[serde(rename = "type")]
161    pub event_type: String,
162    pub symbol: String,
163    #[serde(deserialize_with = "deserialize_f64_from_value")]
164    pub ltp: f64,
165    #[serde(deserialize_with = "deserialize_f64_from_value")]
166    pub prev_close_price: f64,
167    #[serde(deserialize_with = "deserialize_f64_from_value")]
168    pub high_price: f64,
169    #[serde(deserialize_with = "deserialize_f64_from_value")]
170    pub low_price: f64,
171    #[serde(deserialize_with = "deserialize_f64_from_value")]
172    pub open_price: f64,
173    #[serde(deserialize_with = "deserialize_f64_from_value")]
174    pub ch: f64,
175    #[serde(deserialize_with = "deserialize_f64_from_value")]
176    pub chp: f64,
177    #[serde(default, deserialize_with = "deserialize_optional_i64_from_value")]
178    pub exch_feed_time: Option<i64>,
179}
180
181/// Depth-update event (5-level L2 order book).
182///
183/// On the binary wire this is emitted for per-scrip payloads whose topic
184/// name starts with `dp|...`. In the legacy JSON shape the source
185/// object's `type` field equals `"dp"`. Depth feeds require depth
186/// subscriptions ([`DataSubscriptionKind::DepthUpdate`]) and aren't
187/// emitted for index symbols.
188#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
189pub struct DepthUpdate {
190    #[serde(rename = "type")]
191    pub event_type: String,
192    pub symbol: String,
193    #[serde(deserialize_with = "deserialize_f64_from_value")]
194    pub bid_price1: f64,
195    #[serde(deserialize_with = "deserialize_f64_from_value")]
196    pub bid_price2: f64,
197    #[serde(deserialize_with = "deserialize_f64_from_value")]
198    pub bid_price3: f64,
199    #[serde(deserialize_with = "deserialize_f64_from_value")]
200    pub bid_price4: f64,
201    #[serde(deserialize_with = "deserialize_f64_from_value")]
202    pub bid_price5: f64,
203    #[serde(deserialize_with = "deserialize_f64_from_value")]
204    pub ask_price1: f64,
205    #[serde(deserialize_with = "deserialize_f64_from_value")]
206    pub ask_price2: f64,
207    #[serde(deserialize_with = "deserialize_f64_from_value")]
208    pub ask_price3: f64,
209    #[serde(deserialize_with = "deserialize_f64_from_value")]
210    pub ask_price4: f64,
211    #[serde(deserialize_with = "deserialize_f64_from_value")]
212    pub ask_price5: f64,
213    #[serde(deserialize_with = "deserialize_i64_from_value")]
214    pub bid_size1: i64,
215    #[serde(deserialize_with = "deserialize_i64_from_value")]
216    pub bid_size2: i64,
217    #[serde(deserialize_with = "deserialize_i64_from_value")]
218    pub bid_size3: i64,
219    #[serde(deserialize_with = "deserialize_i64_from_value")]
220    pub bid_size4: i64,
221    #[serde(deserialize_with = "deserialize_i64_from_value")]
222    pub bid_size5: i64,
223    #[serde(deserialize_with = "deserialize_i64_from_value")]
224    pub ask_size1: i64,
225    #[serde(deserialize_with = "deserialize_i64_from_value")]
226    pub ask_size2: i64,
227    #[serde(deserialize_with = "deserialize_i64_from_value")]
228    pub ask_size3: i64,
229    #[serde(deserialize_with = "deserialize_i64_from_value")]
230    pub ask_size4: i64,
231    #[serde(deserialize_with = "deserialize_i64_from_value")]
232    pub ask_size5: i64,
233    #[serde(deserialize_with = "deserialize_i64_from_value")]
234    pub bid_order1: i64,
235    #[serde(deserialize_with = "deserialize_i64_from_value")]
236    pub bid_order2: i64,
237    #[serde(deserialize_with = "deserialize_i64_from_value")]
238    pub bid_order3: i64,
239    #[serde(deserialize_with = "deserialize_i64_from_value")]
240    pub bid_order4: i64,
241    #[serde(deserialize_with = "deserialize_i64_from_value")]
242    pub bid_order5: i64,
243    #[serde(deserialize_with = "deserialize_i64_from_value")]
244    pub ask_order1: i64,
245    #[serde(deserialize_with = "deserialize_i64_from_value")]
246    pub ask_order2: i64,
247    #[serde(deserialize_with = "deserialize_i64_from_value")]
248    pub ask_order3: i64,
249    #[serde(deserialize_with = "deserialize_i64_from_value")]
250    pub ask_order4: i64,
251    #[serde(deserialize_with = "deserialize_i64_from_value")]
252    pub ask_order5: i64,
253}
254
255/// Parse a JSON-shaped data-socket event.
256///
257/// **Note:** the live Fyers V3 data socket at `wss://socket.fyers.in/hsm/v1-5/prod`
258/// does not use JSON on the wire — it uses a length-prefixed binary
259/// envelope decoded by [`crate::ws::data_protocol::parse_envelope`] and
260/// [`crate::ws::data_protocol::parse_datafeed`]. This function exists for
261/// the legacy JSON shape documented in older Fyers materials, and is kept
262/// for compatibility with any tooling that emits or stores events in that
263/// format. The runtime data-socket path in [`crate::ws::DataSocketConnection`]
264/// does **not** call this function.
265pub fn parse_data_event(input: &str) -> Result<DataSocketEvent, String> {
266    let value: serde_json::Value =
267        serde_json::from_str(input).map_err(|err| format!("invalid data event JSON: {err}"))?;
268    let event_type = value
269        .get("type")
270        .and_then(serde_json::Value::as_str)
271        .ok_or_else(|| "data event is missing string type".to_owned())?;
272
273    match event_type {
274        "cn" => from_value(value, DataSocketEvent::Connected),
275        "sub" => from_value(value, DataSocketEvent::Subscribed),
276        "unsub" => from_value(value, DataSocketEvent::Unsubscribed),
277        "lit" | "ful" => from_value(value, DataSocketEvent::Mode),
278        "error" => from_value(value, DataSocketEvent::Error),
279        "sf" => from_value(value, DataSocketEvent::SymbolUpdate),
280        "if" => from_value(value, DataSocketEvent::IndexUpdate),
281        "dp" => from_value(value, DataSocketEvent::DepthUpdate),
282        other => Err(format!("unknown data event type: {other}")),
283    }
284}
285
/// Options for the order socket that influence protocol behavior.
///
/// [`Default`] enables reconnects with up to 50 attempts and a 10-second
/// ping interval.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OrderSocketConfig {
    /// Whether reconnects are enabled.
    pub reconnect: bool,
    /// Upper bound on reconnect attempts.
    pub reconnect_retry: usize,
    /// Interval between pings.
    pub ping_interval: Duration,
}

impl Default for OrderSocketConfig {
    /// Build the configuration described on the type-level documentation.
    fn default() -> Self {
        let ping_interval = Duration::from_secs(10);
        OrderSocketConfig {
            ping_interval,
            reconnect_retry: 50,
            reconnect: true,
        }
    }
}
303
304/// Order-socket subscribe/unsubscribe command.
305#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
306pub struct OrderSubscribeRequest {
307    #[serde(rename = "T")]
308    pub command_type: String,
309    #[serde(rename = "SLIST")]
310    pub actions: Vec<String>,
311    #[serde(rename = "SUB_T")]
312    pub subscribe_type: i64,
313}
314
315impl OrderSubscribeRequest {
316    pub fn subscribe(actions: Vec<String>) -> Self {
317        Self {
318            command_type: "SUB_ORD".to_owned(),
319            actions,
320            subscribe_type: 1,
321        }
322    }
323
324    pub fn unsubscribe(actions: Vec<String>) -> Self {
325        Self {
326            command_type: "SUB_ORD".to_owned(),
327            actions,
328            subscribe_type: -1,
329        }
330    }
331}
332
333/// Parsed order socket event.
334#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
335#[serde(tag = "event", content = "payload")]
336pub enum OrderSocketEvent {
337    Order(OrderUpdate),
338    Trade(TradeUpdate),
339    Position(PositionUpdate),
340    General(GeneralUpdate),
341    Edis(EdisUpdate),
342    PriceAlert(PriceAlertUpdate),
343    Subscribed(OrderControlEvent),
344    Unsubscribed(OrderControlEvent),
345    Error(OrderControlEvent),
346    Closed(OrderControlEvent),
347}
348
349#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
350pub struct OrderUpdate {
351    pub s: String,
352    pub orders: OrderSocketOrder,
353}
354
355#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
356pub struct TradeUpdate {
357    pub s: String,
358    pub trades: OrderSocketTrade,
359}
360
361#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
362pub struct PositionUpdate {
363    pub s: String,
364    pub positions: OrderSocketPosition,
365}
366
367/// Documented order WebSocket order update payload.
368#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
369pub struct OrderSocketOrder {
370    #[serde(default, rename = "clientId", alias = "client_id")]
371    pub client_id: Option<String>,
372    #[serde(default)]
373    pub id: Option<String>,
374    #[serde(default, rename = "exchOrdId", alias = "id_exchange")]
375    pub exch_ord_id: Option<String>,
376    #[serde(default)]
377    pub symbol: Option<String>,
378    #[serde(default)]
379    pub qty: Option<i64>,
380    #[serde(default, rename = "remainingQuantity", alias = "qty_remaining")]
381    pub remaining_quantity: Option<i64>,
382    #[serde(default, rename = "filledQty", alias = "qty_filled")]
383    pub filled_qty: Option<i64>,
384    #[serde(default, alias = "org_ord_status")]
385    pub status: Option<i64>,
386    #[serde(default, alias = "oms_msg", alias = "status_msg")]
387    pub message: Option<String>,
388    #[serde(default)]
389    pub segment: Option<i64>,
390    #[serde(default, rename = "limitPrice", alias = "price_limit")]
391    pub limit_price: Option<f64>,
392    #[serde(default, rename = "stopPrice", alias = "price_stop")]
393    pub stop_price: Option<f64>,
394    #[serde(default, rename = "productType", alias = "product_type")]
395    pub product_type: Option<String>,
396    #[serde(default, rename = "type", alias = "ord_type")]
397    pub order_type: Option<i64>,
398    #[serde(default, alias = "tran_side")]
399    pub side: Option<i64>,
400    #[serde(default, rename = "orderValidity", alias = "validity")]
401    pub order_validity: Option<String>,
402    #[serde(default, rename = "orderDateTime", alias = "time_oms")]
403    pub order_date_time: Option<String>,
404    #[serde(default, rename = "parentId", alias = "id_parent")]
405    pub parent_id: Option<String>,
406    #[serde(default, rename = "tradedPrice", alias = "price_traded")]
407    pub traded_price: Option<f64>,
408    #[serde(default, alias = "ord_source")]
409    pub source: Option<String>,
410    #[serde(default, rename = "fyToken", alias = "fy_token", alias = "fytoken")]
411    pub fy_token: Option<String>,
412    #[serde(default, rename = "offlineOrder", alias = "offline_flag")]
413    pub offline_order: Option<bool>,
414    #[serde(default)]
415    pub pan: Option<String>,
416    #[serde(default)]
417    pub exchange: Option<i64>,
418    #[serde(default)]
419    pub instrument: Option<i64>,
420    #[serde(default)]
421    pub id_fyers: Option<String>,
422    #[serde(default, alias = "symbol_exch")]
423    pub ex_sym: Option<String>,
424    #[serde(default, alias = "symbol_desc")]
425    pub description: Option<String>,
426    #[serde(default, rename = "orderNumStatus")]
427    pub order_num_status: Option<String>,
428}
429
430/// Documented order WebSocket trade update payload.
431#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
432pub struct OrderSocketTrade {
433    #[serde(default)]
434    pub symbol: Option<String>,
435    #[serde(default)]
436    pub id: Option<String>,
437    #[serde(default, rename = "orderDateTime", alias = "fill_time")]
438    pub order_date_time: Option<String>,
439    #[serde(default, rename = "orderNumber")]
440    pub order_number: Option<String>,
441    #[serde(default, rename = "tradeNumber", alias = "id_fill")]
442    pub trade_number: Option<String>,
443    #[serde(default, rename = "tradePrice", alias = "price_traded")]
444    pub trade_price: Option<f64>,
445    #[serde(default, rename = "tradeValue", alias = "traded_val")]
446    pub trade_value: Option<f64>,
447    #[serde(default, rename = "tradedQty", alias = "qty_traded")]
448    pub traded_qty: Option<i64>,
449    #[serde(default, alias = "tran_side")]
450    pub side: Option<i64>,
451    #[serde(default, rename = "productType", alias = "product_type")]
452    pub product_type: Option<String>,
453    #[serde(default, rename = "exchangeOrderNo", alias = "id_exchange")]
454    pub exchange_order_no: Option<String>,
455    #[serde(default)]
456    pub segment: Option<i64>,
457    #[serde(default)]
458    pub exchange: Option<i64>,
459    #[serde(default, rename = "fyToken", alias = "fy_token", alias = "fytoken")]
460    pub fy_token: Option<String>,
461    #[serde(default)]
462    pub id_fyers: Option<String>,
463    #[serde(default, rename = "orderType", alias = "ord_type")]
464    pub order_type: Option<i64>,
465    #[serde(default, rename = "clientId", alias = "client_id")]
466    pub client_id: Option<String>,
467}
468
469/// Documented order WebSocket position update payload.
470#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
471pub struct OrderSocketPosition {
472    #[serde(default)]
473    pub symbol: Option<String>,
474    #[serde(default)]
475    pub id: Option<String>,
476    #[serde(default, rename = "buyAvg", alias = "buy_avg")]
477    pub buy_avg: Option<f64>,
478    #[serde(default, rename = "buyQty", alias = "buy_qty")]
479    pub buy_qty: Option<i64>,
480    #[serde(default, rename = "buyVal", alias = "buy_val")]
481    pub buy_val: Option<f64>,
482    #[serde(default, rename = "sellAvg", alias = "sell_avg")]
483    pub sell_avg: Option<f64>,
484    #[serde(default, rename = "sellQty", alias = "sell_qty")]
485    pub sell_qty: Option<i64>,
486    #[serde(default, rename = "sellVal", alias = "sell_val")]
487    pub sell_val: Option<f64>,
488    #[serde(default, rename = "netAvg", alias = "net_avg")]
489    pub net_avg: Option<f64>,
490    #[serde(default, rename = "netQty", alias = "net_qty")]
491    pub net_qty: Option<i64>,
492    #[serde(default, alias = "tran_side")]
493    pub side: Option<i64>,
494    #[serde(default)]
495    pub qty: Option<i64>,
496    #[serde(default, rename = "productType", alias = "product_type")]
497    pub product_type: Option<String>,
498    #[serde(default, alias = "pl_realized")]
499    pub realized_profit: Option<f64>,
500    #[serde(default, rename = "crossCurrency", alias = "cross_curr_flag")]
501    pub cross_currency: Option<String>,
502    #[serde(default, rename = "rbiRefRate", alias = "rbirefrate")]
503    pub rbi_ref_rate: Option<f64>,
504    #[serde(default, rename = "qtyMulti_com", alias = "qty_multiplier")]
505    pub qty_multi_com: Option<f64>,
506    #[serde(default)]
507    pub segment: Option<i64>,
508    #[serde(default)]
509    pub exchange: Option<i64>,
510    #[serde(default, rename = "slNo")]
511    pub sl_no: Option<i64>,
512    #[serde(default, rename = "fyToken", alias = "fy_token", alias = "fytoken")]
513    pub fy_token: Option<String>,
514    #[serde(default, rename = "cfBuyQty", alias = "cf_buy_qty")]
515    pub cf_buy_qty: Option<i64>,
516    #[serde(default, rename = "cfSellQty", alias = "cf_sell_qty")]
517    pub cf_sell_qty: Option<i64>,
518    #[serde(default, rename = "dayBuyQty", alias = "day_buy_qty")]
519    pub day_buy_qty: Option<i64>,
520    #[serde(default, rename = "daySellQty", alias = "day_sell_qty")]
521    pub day_sell_qty: Option<i64>,
522    #[serde(default, alias = "pl_total")]
523    pub pl: Option<f64>,
524    #[serde(default, alias = "pl_unrealized")]
525    pub unrealized_profit: Option<f64>,
526}
527
528#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
529pub struct GeneralUpdate {
530    pub s: String,
531    #[serde(flatten)]
532    pub data: serde_json::Map<String, serde_json::Value>,
533}
534
535#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
536pub struct EdisUpdate {
537    pub s: String,
538    pub edis: serde_json::Value,
539}
540
541#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
542pub struct PriceAlertUpdate {
543    pub s: String,
544    pub pricealerts: serde_json::Value,
545}
546
547#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
548pub struct OrderControlEvent {
549    pub s: String,
550    pub code: i64,
551    pub message: String,
552}
553
554/// Parse an order-socket text frame into a typed event.
555pub fn parse_order_event(input: &str) -> Result<OrderSocketEvent, String> {
556    let value: serde_json::Value =
557        serde_json::from_str(input).map_err(|err| format!("invalid order event JSON: {err}"))?;
558
559    if value.get("orders").is_some() {
560        return from_value(value, OrderSocketEvent::Order);
561    }
562    if value.get("trades").is_some() {
563        return from_value(value, OrderSocketEvent::Trade);
564    }
565    if value.get("positions").is_some() {
566        return from_value(value, OrderSocketEvent::Position);
567    }
568    if value.get("edis").is_some() {
569        return from_value(value, OrderSocketEvent::Edis);
570    }
571    if value.get("pricealerts").is_some() {
572        return from_value(value, OrderSocketEvent::PriceAlert);
573    }
574
575    let code = value
576        .get("code")
577        .and_then(serde_json::Value::as_i64)
578        .unwrap_or_default();
579    let message = value
580        .get("message")
581        .and_then(serde_json::Value::as_str)
582        .unwrap_or_default()
583        .to_ascii_lowercase();
584
585    if code == 1606 || message.contains("unsubscribed") {
586        return from_value(value, OrderSocketEvent::Unsubscribed);
587    }
588    if code == 1605 || message.contains("subscribed") {
589        return from_value(value, OrderSocketEvent::Subscribed);
590    }
591    if value.get("code").is_some() && message.contains("error") {
592        return from_value(value, OrderSocketEvent::Error);
593    }
594    if message.contains("closed") {
595        return from_value(value, OrderSocketEvent::Closed);
596    }
597
598    from_value(value, OrderSocketEvent::General)
599}
600
/// Options for the TBT/depth socket that influence protocol behavior.
///
/// [`Default`] enables reconnects with up to 50 attempts, leaves `diff_only`
/// off, and uses a 30-second ping interval.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TbtSocketConfig {
    /// Whether reconnects are enabled.
    pub reconnect: bool,
    /// Upper bound on reconnect attempts.
    pub reconnect_retry: usize,
    /// Diff-only flag.
    pub diff_only: bool,
    /// Interval between pings.
    pub ping_interval: Duration,
}

impl Default for TbtSocketConfig {
    /// Build the configuration described on the type-level documentation.
    fn default() -> Self {
        let ping_interval = Duration::from_secs(30);
        TbtSocketConfig {
            ping_interval,
            diff_only: false,
            reconnect_retry: 50,
            reconnect: true,
        }
    }
}
620
621/// TBT subscription command.
622#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
623pub struct TbtSubscribeRequest {
624    #[serde(rename = "type")]
625    pub request_type: i64,
626    pub data: TbtSubscribeData,
627}
628
629/// TBT subscription command body.
630#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
631pub struct TbtSubscribeData {
632    pub subs: i64,
633    pub symbols: Vec<String>,
634    pub mode: String,
635    pub channel: String,
636}
637
638/// TBT switch-channel command.
639#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
640pub struct TbtSwitchChannelRequest {
641    #[serde(rename = "type")]
642    pub request_type: i64,
643    pub data: TbtSwitchChannelData,
644}
645
646/// TBT switch-channel command body.
647#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
648pub struct TbtSwitchChannelData {
649    #[serde(rename = "resumeChannels", alias = "resume_channels")]
650    pub resume_channels: Vec<String>,
651    #[serde(rename = "pauseChannels", alias = "pause_channels")]
652    pub pause_channels: Vec<String>,
653}
654
655/// Parsed TBT binary event.
656#[derive(Debug, Clone, PartialEq)]
657pub enum TbtEvent {
658    SocketMessage(SocketMessage),
659    Error { msg: String },
660}
661
662/// Parse a TBT protobuf frame.
663pub fn parse_tbt_event(input: &[u8]) -> Result<TbtEvent, String> {
664    let message =
665        SocketMessage::decode(input).map_err(|err| format!("invalid TBT protobuf frame: {err}"))?;
666    if message.error {
667        return Ok(TbtEvent::Error { msg: message.msg });
668    }
669
670    Ok(TbtEvent::SocketMessage(message))
671}
672
673/// Protobuf enum from the documented TBT `msg.proto`.
674#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, prost::Enumeration)]
675#[repr(i32)]
676pub enum MessageType {
677    Ping = 0,
678    Quote = 1,
679    ExtendedQuote = 2,
680    DailyQuote = 3,
681    MarketLevel = 4,
682    Ohlcv = 5,
683    Depth = 6,
684    All = 7,
685    Response = 8,
686}
687
688/// Protobuf `MarketLevel`.
689#[derive(Clone, PartialEq, Message)]
690pub struct Int64Value {
691    #[prost(int64, tag = "1")]
692    pub value: i64,
693}
694
695/// Protobuf wrapper for `uint32` fields used by the documented TBT schema.
696#[derive(Clone, PartialEq, Message)]
697pub struct UInt32Value {
698    #[prost(uint32, tag = "1")]
699    pub value: u32,
700}
701
702/// Protobuf wrapper for `uint64` fields used by the documented TBT schema.
703#[derive(Clone, PartialEq, Message)]
704pub struct UInt64Value {
705    #[prost(uint64, tag = "1")]
706    pub value: u64,
707}
708
709/// Protobuf `MarketLevel`.
710#[derive(Clone, PartialEq, Message)]
711pub struct MarketLevel {
712    #[prost(message, optional, tag = "1")]
713    pub price: Option<Int64Value>,
714    #[prost(message, optional, tag = "2")]
715    pub qty: Option<UInt32Value>,
716    #[prost(message, optional, tag = "3")]
717    pub nord: Option<UInt32Value>,
718    #[prost(message, optional, tag = "4")]
719    pub num: Option<UInt32Value>,
720}
721
722/// Protobuf `Depth`.
723#[derive(Clone, PartialEq, Message)]
724pub struct TbtDepth {
725    #[prost(message, optional, tag = "1")]
726    pub tbq: Option<UInt64Value>,
727    #[prost(message, optional, tag = "2")]
728    pub tsq: Option<UInt64Value>,
729    #[prost(message, repeated, tag = "3")]
730    pub asks: Vec<MarketLevel>,
731    #[prost(message, repeated, tag = "4")]
732    pub bids: Vec<MarketLevel>,
733}
734
735/// Protobuf `Quote`.
736#[derive(Clone, PartialEq, Message)]
737pub struct Quote {
738    #[prost(message, optional, tag = "1")]
739    pub ltp: Option<Int64Value>,
740    #[prost(message, optional, tag = "2")]
741    pub ltt: Option<UInt32Value>,
742    #[prost(message, optional, tag = "3")]
743    pub ltq: Option<UInt32Value>,
744    #[prost(message, optional, tag = "4")]
745    pub vtt: Option<UInt64Value>,
746    #[prost(message, optional, tag = "5")]
747    pub vtt_diff: Option<UInt64Value>,
748    #[prost(message, optional, tag = "6")]
749    pub oi: Option<UInt64Value>,
750    #[prost(message, optional, tag = "7")]
751    pub ltpc: Option<Int64Value>,
752}
753
754/// Protobuf `ExtendedQuote`.
755#[derive(Clone, PartialEq, Message)]
756pub struct ExtendedQuote {
757    #[prost(message, optional, tag = "1")]
758    pub atp: Option<Int64Value>,
759    #[prost(message, optional, tag = "2")]
760    pub cp: Option<Int64Value>,
761    #[prost(message, optional, tag = "3")]
762    pub lc: Option<UInt32Value>,
763    #[prost(message, optional, tag = "4")]
764    pub uc: Option<UInt32Value>,
765    #[prost(message, optional, tag = "5")]
766    pub yh: Option<Int64Value>,
767    #[prost(message, optional, tag = "6")]
768    pub yl: Option<Int64Value>,
769    #[prost(message, optional, tag = "7")]
770    pub poi: Option<UInt64Value>,
771    #[prost(message, optional, tag = "8")]
772    pub oich: Option<Int64Value>,
773    #[prost(message, optional, tag = "9")]
774    pub pc: Option<UInt32Value>,
775}
776
777/// Protobuf `DailyQuote`.
778#[derive(Clone, PartialEq, Message)]
779pub struct DailyQuote {
780    #[prost(message, optional, tag = "1")]
781    pub day_open: Option<Int64Value>,
782    #[prost(message, optional, tag = "2")]
783    pub day_high: Option<Int64Value>,
784    #[prost(message, optional, tag = "3")]
785    pub day_low: Option<Int64Value>,
786    #[prost(message, optional, tag = "4")]
787    pub day_close: Option<Int64Value>,
788    #[prost(message, optional, tag = "5")]
789    pub dhoi: Option<UInt64Value>,
790    #[prost(message, optional, tag = "6")]
791    pub dloi: Option<UInt64Value>,
792}
793
794/// Protobuf `OHLCV`.
795#[derive(Clone, PartialEq, Message)]
796pub struct Ohlcv {
797    #[prost(message, optional, tag = "1")]
798    pub open: Option<Int64Value>,
799    #[prost(message, optional, tag = "2")]
800    pub high: Option<Int64Value>,
801    #[prost(message, optional, tag = "3")]
802    pub low: Option<Int64Value>,
803    #[prost(message, optional, tag = "4")]
804    pub close: Option<Int64Value>,
805    #[prost(message, optional, tag = "5")]
806    pub volume: Option<UInt32Value>,
807    #[prost(message, optional, tag = "6")]
808    pub epoch: Option<UInt32Value>,
809}
810
811/// Protobuf `SymDetail`.
812#[derive(Clone, PartialEq, Message)]
813pub struct SymDetail {
814    #[prost(string, tag = "1")]
815    pub ticksize: String,
816}
817
818/// Protobuf `MarketFeed`.
819#[derive(Clone, PartialEq, Message)]
820pub struct MarketFeed {
821    #[prost(message, optional, tag = "1")]
822    pub quote: Option<Quote>,
823    #[prost(message, optional, tag = "2")]
824    pub eq: Option<ExtendedQuote>,
825    #[prost(message, optional, tag = "3")]
826    pub dq: Option<DailyQuote>,
827    #[prost(message, optional, tag = "4")]
828    pub ohlcv: Option<Ohlcv>,
829    #[prost(message, optional, tag = "5")]
830    pub depth: Option<TbtDepth>,
831    #[prost(message, optional, tag = "6")]
832    pub feed_time: Option<UInt64Value>,
833    #[prost(message, optional, tag = "7")]
834    pub send_time: Option<UInt64Value>,
835    #[prost(string, tag = "8")]
836    pub token: String,
837    #[prost(uint64, tag = "9")]
838    pub sequence_no: u64,
839    #[prost(bool, tag = "10")]
840    pub snapshot: bool,
841    #[prost(string, tag = "11")]
842    pub ticker: String,
843    #[prost(message, optional, tag = "12")]
844    pub symdetail: Option<SymDetail>,
845}
846
847/// Protobuf `SocketMessage`.
848#[derive(Clone, PartialEq, Message)]
849pub struct SocketMessage {
850    #[prost(enumeration = "MessageType", tag = "1")]
851    pub message_type: i32,
852    #[prost(map = "string, message", tag = "2")]
853    pub feeds: HashMap<String, MarketFeed>,
854    #[prost(bool, tag = "3")]
855    pub snapshot: bool,
856    #[prost(string, tag = "4")]
857    pub msg: String,
858    #[prost(bool, tag = "5")]
859    pub error: bool,
860}
861
862fn deserialize_f64_from_value<'de, D>(deserializer: D) -> std::result::Result<f64, D::Error>
863where
864    D: Deserializer<'de>,
865{
866    match Value::deserialize(deserializer)? {
867        Value::Number(value) => value
868            .as_f64()
869            .ok_or_else(|| serde::de::Error::custom("number is not representable as f64")),
870        Value::String(value) => value.parse::<f64>().map_err(serde::de::Error::custom),
871        other => Err(serde::de::Error::custom(format!(
872            "expected string or number, got {other}"
873        ))),
874    }
875}
876
877fn deserialize_i64_from_value<'de, D>(deserializer: D) -> std::result::Result<i64, D::Error>
878where
879    D: Deserializer<'de>,
880{
881    match Value::deserialize(deserializer)? {
882        Value::Number(value) => value
883            .as_i64()
884            .or_else(|| value.as_u64().and_then(|value| i64::try_from(value).ok()))
885            .ok_or_else(|| serde::de::Error::custom("number is not representable as i64")),
886        Value::String(value) => value.parse::<i64>().map_err(serde::de::Error::custom),
887        other => Err(serde::de::Error::custom(format!(
888            "expected string or number, got {other}"
889        ))),
890    }
891}
892
893fn deserialize_optional_f64_from_value<'de, D>(
894    deserializer: D,
895) -> std::result::Result<Option<f64>, D::Error>
896where
897    D: Deserializer<'de>,
898{
899    match Option::<Value>::deserialize(deserializer)? {
900        None | Some(Value::Null) => Ok(None),
901        Some(Value::Number(value)) => value
902            .as_f64()
903            .map(Some)
904            .ok_or_else(|| serde::de::Error::custom("number is not representable as f64")),
905        Some(Value::String(value)) => value
906            .parse::<f64>()
907            .map(Some)
908            .map_err(serde::de::Error::custom),
909        Some(other) => Err(serde::de::Error::custom(format!(
910            "expected string, number, or null, got {other}"
911        ))),
912    }
913}
914
915fn deserialize_optional_i64_from_value<'de, D>(
916    deserializer: D,
917) -> std::result::Result<Option<i64>, D::Error>
918where
919    D: Deserializer<'de>,
920{
921    match Option::<Value>::deserialize(deserializer)? {
922        None | Some(Value::Null) => Ok(None),
923        Some(Value::Number(value)) => value
924            .as_i64()
925            .or_else(|| value.as_u64().and_then(|value| i64::try_from(value).ok()))
926            .map(Some)
927            .ok_or_else(|| serde::de::Error::custom("number is not representable as i64")),
928        Some(Value::String(value)) => value
929            .parse::<i64>()
930            .map(Some)
931            .map_err(serde::de::Error::custom),
932        Some(other) => Err(serde::de::Error::custom(format!(
933            "expected string, number, or null, got {other}"
934        ))),
935    }
936}
937
938fn from_value<T, U>(value: serde_json::Value, wrap: impl FnOnce(T) -> U) -> Result<U, String>
939where
940    T: serde::de::DeserializeOwned,
941{
942    serde_json::from_value(value)
943        .map(wrap)
944        .map_err(|err| format!("failed to decode WebSocket event: {err}"))
945}