polyfill_rs/
ws_hot_path.rs1use crate::book::OrderBookManager;
11use crate::errors::{PolyfillError, Result};
12use crate::types::{decimal_to_price, decimal_to_qty, Side};
13use rust_decimal::Decimal;
14use simd_json::prelude::*;
15use std::str::FromStr;
16
17#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
19pub struct WsBookApplyStats {
20 pub book_messages: usize,
21 pub book_levels_applied: usize,
22}
23
24pub struct WsBookUpdateProcessor {
29 buffers: simd_json::Buffers,
30 tape: Option<simd_json::Tape<'static>>,
31}
32
33impl WsBookUpdateProcessor {
34 pub fn new(input_len_hint: usize) -> Self {
38 Self {
39 buffers: simd_json::Buffers::new(input_len_hint),
40 tape: Some(simd_json::Tape::null().reset()),
42 }
43 }
44
45 pub fn process_bytes(
47 &mut self,
48 bytes: &mut [u8],
49 books: &OrderBookManager,
50 ) -> Result<WsBookApplyStats> {
51 let mut tape = self
52 .tape
53 .take()
54 .expect("WsBookUpdateProcessor tape must be present")
55 .reset();
56
57 simd_json::fill_tape(bytes, &mut self.buffers, &mut tape).map_err(|e| {
58 PolyfillError::parse("Failed to parse WebSocket JSON", Some(Box::new(e)))
59 })?;
60
61 let root = tape.as_value();
62 let stats = process_root_value(root, books)?;
63
64 self.tape = Some(tape.reset());
66 Ok(stats)
67 }
68
69 pub fn process_text(
71 &mut self,
72 text: String,
73 books: &OrderBookManager,
74 ) -> Result<WsBookApplyStats> {
75 let mut bytes = text.into_bytes();
76 self.process_bytes(bytes.as_mut_slice(), books)
77 }
78}
79
80fn process_root_value<'tape, 'input>(
81 value: simd_json::tape::Value<'tape, 'input>,
82 books: &OrderBookManager,
83) -> Result<WsBookApplyStats> {
84 if let Some(obj) = value.as_object() {
85 return process_stream_object(obj, books);
86 }
87
88 let Some(arr) = value.as_array() else {
89 return Ok(WsBookApplyStats::default());
90 };
91
92 let mut total = WsBookApplyStats::default();
93 for elem in arr.iter() {
94 let Some(obj) = elem.as_object() else {
95 continue;
96 };
97 let stats = process_stream_object(obj, books)?;
98 total.book_messages += stats.book_messages;
99 total.book_levels_applied += stats.book_levels_applied;
100 }
101
102 Ok(total)
103}
104
105fn process_stream_object<'tape, 'input>(
106 obj: simd_json::tape::Object<'tape, 'input>,
107 books: &OrderBookManager,
108) -> Result<WsBookApplyStats> {
109 let Some(event_type) = obj.get("event_type").and_then(|v| v.into_string()) else {
110 return Ok(WsBookApplyStats::default());
111 };
112
113 if event_type != "book" {
114 return Ok(WsBookApplyStats::default());
115 }
116
117 let asset_id = obj
118 .get("asset_id")
119 .and_then(|v| v.into_string())
120 .ok_or_else(|| PolyfillError::parse("Missing asset_id", None))?;
121
122 let timestamp_value = obj
123 .get("timestamp")
124 .ok_or_else(|| PolyfillError::parse("Missing timestamp", None))?;
125 let timestamp = parse_u64(timestamp_value)
126 .ok_or_else(|| PolyfillError::parse("Invalid timestamp", None))?;
127
128 let bids = obj.get("bids").and_then(|v| v.as_array());
129 let asks = obj.get("asks").and_then(|v| v.as_array());
130
131 let levels_applied = books.with_book_mut(asset_id, |book| {
132 if !book.begin_ws_book_update(asset_id, timestamp)? {
133 return Ok(0);
134 }
135
136 let mut applied = 0usize;
137 if let Some(bids) = bids {
138 applied += apply_levels(book, Side::BUY, bids)?;
139 }
140 if let Some(asks) = asks {
141 applied += apply_levels(book, Side::SELL, asks)?;
142 }
143
144 book.finish_ws_book_update();
145 Ok(applied)
146 })?;
147
148 Ok(WsBookApplyStats {
149 book_messages: 1,
150 book_levels_applied: levels_applied,
151 })
152}
153
154fn parse_u64<'tape, 'input>(value: simd_json::tape::Value<'tape, 'input>) -> Option<u64> {
155 value
156 .as_u64()
157 .or_else(|| value.into_string().and_then(|s| s.parse::<u64>().ok()))
158}
159
160fn apply_levels<'tape, 'input>(
161 book: &mut crate::book::OrderBook,
162 side: Side,
163 levels: simd_json::tape::Array<'tape, 'input>,
164) -> Result<usize> {
165 let mut applied = 0usize;
166 for level in levels.iter() {
167 let Some(obj) = level.as_object() else {
168 continue;
169 };
170
171 let price_str = obj
172 .get("price")
173 .and_then(|v| v.into_string())
174 .ok_or_else(|| PolyfillError::parse("Missing price", None))?;
175 let size_str = obj
176 .get("size")
177 .and_then(|v| v.into_string())
178 .ok_or_else(|| PolyfillError::parse("Missing size", None))?;
179
180 let price_decimal =
181 Decimal::from_str(price_str).map_err(|_| PolyfillError::validation("Invalid price"))?;
182 let size_decimal =
183 Decimal::from_str(size_str).map_err(|_| PolyfillError::validation("Invalid size"))?;
184
185 let price_ticks = decimal_to_price(price_decimal)
186 .map_err(|_| PolyfillError::validation("Invalid price"))?;
187 let size_units =
188 decimal_to_qty(size_decimal).map_err(|_| PolyfillError::validation("Invalid size"))?;
189
190 book.apply_ws_book_level_fast(side, price_ticks, size_units)?;
191 applied += 1;
192 }
193
194 Ok(applied)
195}