1use crate::errors::{PolyfillError, Result};
7use crate::types::*;
8use alloy_primitives::{Address, U256};
9use chrono::{DateTime, Utc};
10use rust_decimal::Decimal;
11use serde::{Deserialize, Deserializer};
12use serde_json::Value;
13use std::str::FromStr;
14
15pub mod deserializers {
17 use super::*;
18 use std::fmt::Display;
19
20 pub fn number_from_string<'de, T, D>(deserializer: D) -> std::result::Result<T, D::Error>
22 where
23 D: Deserializer<'de>,
24 T: FromStr + serde::Deserialize<'de> + Clone,
25 <T as FromStr>::Err: Display,
26 {
27 let value = serde_json::Value::deserialize(deserializer)?;
28 match value {
29 serde_json::Value::Number(n) => {
30 if let Some(v) = n.as_u64() {
31 T::deserialize(serde_json::Value::Number(serde_json::Number::from(v)))
32 .map_err(|_| serde::de::Error::custom("Failed to deserialize number"))
33 } else if let Some(v) = n.as_f64() {
34 T::deserialize(serde_json::Value::Number(
35 serde_json::Number::from_f64(v).unwrap(),
36 ))
37 .map_err(|_| serde::de::Error::custom("Failed to deserialize number"))
38 } else {
39 Err(serde::de::Error::custom("Invalid number format"))
40 }
41 },
42 serde_json::Value::String(s) => s.parse::<T>().map_err(serde::de::Error::custom),
43 _ => Err(serde::de::Error::custom("Expected number or string")),
44 }
45 }
46
47 pub fn optional_number_from_string<'de, T, D>(
49 deserializer: D,
50 ) -> std::result::Result<Option<T>, D::Error>
51 where
52 D: Deserializer<'de>,
53 T: FromStr + serde::Deserialize<'de> + Clone,
54 <T as FromStr>::Err: Display,
55 {
56 let value = serde_json::Value::deserialize(deserializer)?;
57 match value {
58 serde_json::Value::Null => Ok(None),
59 serde_json::Value::Number(n) => {
60 if let Some(v) = n.as_u64() {
61 T::deserialize(serde_json::Value::Number(serde_json::Number::from(v)))
62 .map(Some)
63 .map_err(|_| serde::de::Error::custom("Failed to deserialize number"))
64 } else if let Some(v) = n.as_f64() {
65 T::deserialize(serde_json::Value::Number(
66 serde_json::Number::from_f64(v).unwrap(),
67 ))
68 .map(Some)
69 .map_err(|_| serde::de::Error::custom("Failed to deserialize number"))
70 } else {
71 Err(serde::de::Error::custom("Invalid number format"))
72 }
73 },
74 serde_json::Value::String(s) => {
75 if s.is_empty() {
76 Ok(None)
77 } else {
78 s.parse::<T>().map(Some).map_err(serde::de::Error::custom)
79 }
80 },
81 _ => Err(serde::de::Error::custom("Expected number, string, or null")),
82 }
83 }
84
85 pub fn datetime_from_timestamp<'de, D>(
87 deserializer: D,
88 ) -> std::result::Result<DateTime<Utc>, D::Error>
89 where
90 D: Deserializer<'de>,
91 {
92 let timestamp = number_from_string::<u64, D>(deserializer)?;
93 DateTime::from_timestamp(timestamp as i64, 0)
94 .ok_or_else(|| serde::de::Error::custom("Invalid timestamp"))
95 }
96
97 pub fn optional_datetime_from_timestamp<'de, D>(
99 deserializer: D,
100 ) -> std::result::Result<Option<DateTime<Utc>>, D::Error>
101 where
102 D: Deserializer<'de>,
103 {
104 match optional_number_from_string::<u64, D>(deserializer)? {
105 Some(timestamp) => DateTime::from_timestamp(timestamp as i64, 0)
106 .map(Some)
107 .ok_or_else(|| serde::de::Error::custom("Invalid timestamp")),
108 None => Ok(None),
109 }
110 }
111}
112
113#[derive(Debug, Deserialize)]
115pub struct RawOrderBookResponse {
116 pub market: String,
117 pub asset_id: String,
118 pub hash: String,
119 #[serde(deserialize_with = "deserializers::number_from_string")]
120 pub timestamp: u64,
121 pub bids: Vec<RawBookLevel>,
122 pub asks: Vec<RawBookLevel>,
123}
124
125#[derive(Debug, Deserialize)]
126pub struct RawBookLevel {
127 #[serde(with = "rust_decimal::serde::str")]
128 pub price: Decimal,
129 #[serde(with = "rust_decimal::serde::str")]
130 pub size: Decimal,
131}
132
133#[derive(Debug, Deserialize)]
134pub struct RawOrderResponse {
135 pub id: String,
136 pub status: String,
137 pub market: String,
138 pub asset_id: String,
139 pub maker_address: String,
140 pub owner: String,
141 pub outcome: String,
142 #[serde(rename = "type")]
143 pub order_type: OrderType,
144 pub side: Side,
145 #[serde(with = "rust_decimal::serde::str")]
146 pub original_size: Decimal,
147 #[serde(with = "rust_decimal::serde::str")]
148 pub price: Decimal,
149 #[serde(with = "rust_decimal::serde::str")]
150 pub size_matched: Decimal,
151 #[serde(deserialize_with = "deserializers::number_from_string")]
152 pub expiration: u64,
153 #[serde(deserialize_with = "deserializers::number_from_string")]
154 pub created_at: u64,
155}
156
157#[derive(Debug, Deserialize)]
158pub struct RawTradeResponse {
159 pub id: String,
160 pub market: String,
161 pub asset_id: String,
162 pub side: Side,
163 #[serde(with = "rust_decimal::serde::str")]
164 pub price: Decimal,
165 #[serde(with = "rust_decimal::serde::str")]
166 pub size: Decimal,
167 pub maker_address: String,
168 pub taker_address: String,
169 #[serde(deserialize_with = "deserializers::number_from_string")]
170 pub timestamp: u64,
171}
172
173#[derive(Debug, Deserialize)]
174pub struct RawMarketResponse {
175 pub condition_id: String,
176 pub tokens: [RawToken; 2],
177 pub active: bool,
178 pub closed: bool,
179 pub question: String,
180 pub description: String,
181 pub category: Option<String>,
182 pub end_date_iso: Option<String>,
183 #[serde(with = "rust_decimal::serde::str")]
184 pub minimum_order_size: Decimal,
185 #[serde(with = "rust_decimal::serde::str")]
186 pub minimum_tick_size: Decimal,
187}
188
189#[derive(Debug, Deserialize)]
190pub struct RawToken {
191 pub token_id: String,
192 pub outcome: String,
193}
194
195pub trait Decoder<T> {
197 fn decode(&self) -> Result<T>;
198}
199
200impl Decoder<OrderBook> for RawOrderBookResponse {
201 fn decode(&self) -> Result<OrderBook> {
202 let timestamp = chrono::DateTime::from_timestamp(self.timestamp as i64, 0)
203 .ok_or_else(|| PolyfillError::parse("Invalid timestamp".to_string(), None))?;
204
205 let bids = self
206 .bids
207 .iter()
208 .map(|level| BookLevel {
209 price: level.price,
210 size: level.size,
211 })
212 .collect();
213
214 let asks = self
215 .asks
216 .iter()
217 .map(|level| BookLevel {
218 price: level.price,
219 size: level.size,
220 })
221 .collect();
222
223 Ok(OrderBook {
224 token_id: self.asset_id.clone(),
225 timestamp,
226 bids,
227 asks,
228 sequence: 0, })
230 }
231}
232
233impl Decoder<Order> for RawOrderResponse {
234 fn decode(&self) -> Result<Order> {
235 let status = match self.status.as_str() {
236 "LIVE" => OrderStatus::Live,
237 "CANCELLED" => OrderStatus::Cancelled,
238 "FILLED" => OrderStatus::Filled,
239 "PARTIAL" => OrderStatus::Partial,
240 "EXPIRED" => OrderStatus::Expired,
241 _ => {
242 return Err(PolyfillError::parse(
243 format!("Unknown order status: {}", self.status),
244 None,
245 ))
246 },
247 };
248
249 let created_at =
250 chrono::DateTime::from_timestamp(self.created_at as i64, 0).ok_or_else(|| {
251 PolyfillError::parse("Invalid created_at timestamp".to_string(), None)
252 })?;
253
254 let expiration = if self.expiration > 0 {
255 Some(
256 chrono::DateTime::from_timestamp(self.expiration as i64, 0).ok_or_else(|| {
257 PolyfillError::parse("Invalid expiration timestamp".to_string(), None)
258 })?,
259 )
260 } else {
261 None
262 };
263
264 Ok(Order {
265 id: self.id.clone(),
266 token_id: self.asset_id.clone(),
267 side: self.side,
268 price: self.price,
269 original_size: self.original_size,
270 filled_size: self.size_matched,
271 remaining_size: self.original_size - self.size_matched,
272 status,
273 order_type: self.order_type,
274 created_at,
275 updated_at: created_at, expiration,
277 client_id: None,
278 })
279 }
280}
281
282impl Decoder<FillEvent> for RawTradeResponse {
283 fn decode(&self) -> Result<FillEvent> {
284 let timestamp = chrono::DateTime::from_timestamp(self.timestamp as i64, 0)
285 .ok_or_else(|| PolyfillError::parse("Invalid trade timestamp".to_string(), None))?;
286
287 let maker_address = Address::from_str(&self.maker_address)
288 .map_err(|e| PolyfillError::parse(format!("Invalid maker address: {}", e), None))?;
289
290 let taker_address = Address::from_str(&self.taker_address)
291 .map_err(|e| PolyfillError::parse(format!("Invalid taker address: {}", e), None))?;
292
293 Ok(FillEvent {
294 id: self.id.clone(),
295 order_id: "".to_string(), token_id: self.asset_id.clone(),
297 side: self.side,
298 price: self.price,
299 size: self.size,
300 timestamp,
301 maker_address,
302 taker_address,
303 fee: Decimal::ZERO, })
305 }
306}
307
308impl Decoder<Market> for RawMarketResponse {
309 fn decode(&self) -> Result<Market> {
310 let tokens = [
311 Token {
312 token_id: self.tokens[0].token_id.clone(),
313 outcome: self.tokens[0].outcome.clone(),
314 price: Decimal::ZERO,
315 winner: false,
316 },
317 Token {
318 token_id: self.tokens[1].token_id.clone(),
319 outcome: self.tokens[1].outcome.clone(),
320 price: Decimal::ZERO,
321 winner: false,
322 },
323 ];
324
325 Ok(Market {
326 condition_id: self.condition_id.clone(),
327 tokens,
328 rewards: crate::types::Rewards {
329 rates: None,
330 min_size: Decimal::ZERO,
331 max_spread: Decimal::ONE,
332 event_start_date: None,
333 event_end_date: None,
334 in_game_multiplier: None,
335 reward_epoch: None,
336 },
337 min_incentive_size: None,
338 max_incentive_spread: None,
339 active: self.active,
340 closed: self.closed,
341 question_id: self.condition_id.clone(), minimum_order_size: self.minimum_order_size,
343 minimum_tick_size: self.minimum_tick_size,
344 description: self.description.clone(),
345 category: self.category.clone(),
346 end_date_iso: self.end_date_iso.clone(),
347 game_start_time: None,
348 question: self.question.clone(),
349 market_slug: format!("market-{}", self.condition_id), seconds_delay: Decimal::ZERO,
351 icon: String::new(),
352 fpmm: String::new(),
353 enable_order_book: false,
355 archived: false,
356 accepting_orders: false,
357 accepting_order_timestamp: None,
358 maker_base_fee: Decimal::ZERO,
359 taker_base_fee: Decimal::ZERO,
360 notifications_enabled: false,
361 neg_risk: false,
362 neg_risk_market_id: String::new(),
363 neg_risk_request_id: String::new(),
364 image: String::new(),
365 is_50_50_outcome: false,
366 })
367 }
368}
369
370pub fn parse_stream_message(raw: &str) -> Result<StreamMessage> {
372 let value: Value = serde_json::from_str(raw)?;
373
374 let msg_type = value["type"]
375 .as_str()
376 .ok_or_else(|| PolyfillError::parse("Missing message type".to_string(), None))?;
377
378 match msg_type {
379 "book_update" => {
380 let data = value["data"].clone();
381 let delta: OrderDelta = serde_json::from_value(data)?;
382 Ok(StreamMessage::BookUpdate { data: delta })
383 },
384 "trade" => {
385 let data = value["data"].clone();
386 let raw_trade: RawTradeResponse = serde_json::from_value(data)?;
387 let fill = raw_trade.decode()?;
388 Ok(StreamMessage::Trade { data: fill })
389 },
390 "order_update" => {
391 let data = value["data"].clone();
392 let raw_order: RawOrderResponse = serde_json::from_value(data)?;
393 let order = raw_order.decode()?;
394 Ok(StreamMessage::OrderUpdate { data: order })
395 },
396 "heartbeat" => {
397 let timestamp = value["timestamp"]
398 .as_str()
399 .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
400 .map(|dt| dt.with_timezone(&Utc))
401 .unwrap_or_else(Utc::now);
402 Ok(StreamMessage::Heartbeat { timestamp })
403 },
404 _ => Err(PolyfillError::parse(
405 format!("Unknown message type: {}", msg_type),
406 None,
407 )),
408 }
409}
410
411pub struct BatchDecoder {
413 buffer: Vec<u8>,
414}
415
416impl BatchDecoder {
417 pub fn new() -> Self {
418 Self {
419 buffer: Vec::with_capacity(8192),
420 }
421 }
422
423 pub fn parse_json_stream<T>(&mut self, data: &[u8]) -> Result<Vec<T>>
425 where
426 T: for<'de> serde::Deserialize<'de>,
427 {
428 self.buffer.extend_from_slice(data);
429 let mut results = Vec::new();
430 let mut start = 0;
431
432 while let Some(end) = self.find_json_boundary(start) {
433 let json_slice = &self.buffer[start..end];
434 if let Ok(obj) = serde_json::from_slice::<T>(json_slice) {
435 results.push(obj);
436 }
437 start = end;
438 }
439
440 if start > 0 {
442 self.buffer.drain(0..start);
443 }
444
445 Ok(results)
446 }
447
448 fn find_json_boundary(&self, start: usize) -> Option<usize> {
450 let mut depth = 0;
451 let mut in_string = false;
452 let mut escaped = false;
453
454 for (i, &byte) in self.buffer[start..].iter().enumerate() {
455 if escaped {
456 escaped = false;
457 continue;
458 }
459
460 match byte {
461 b'\\' if in_string => escaped = true,
462 b'"' => in_string = !in_string,
463 b'{' if !in_string => depth += 1,
464 b'}' if !in_string => {
465 depth -= 1;
466 if depth == 0 {
467 return Some(start + i + 1);
468 }
469 },
470 _ => {},
471 }
472 }
473
474 None
475 }
476}
477
478impl Default for BatchDecoder {
479 fn default() -> Self {
480 Self::new()
481 }
482}
483
484pub mod fast_parse {
486 use super::*;
487
488 #[inline]
490 pub fn parse_decimal(s: &str) -> Result<Decimal> {
491 Decimal::from_str(s)
492 .map_err(|e| PolyfillError::parse(format!("Invalid decimal: {}", e), None))
493 }
494
495 #[inline]
497 pub fn parse_address(s: &str) -> Result<Address> {
498 Address::from_str(s)
499 .map_err(|e| PolyfillError::parse(format!("Invalid address: {}", e), None))
500 }
501
502 #[inline]
506 pub fn parse_json_fast<T>(bytes: &mut [u8]) -> Result<T>
507 where
508 T: for<'de> serde::Deserialize<'de>,
509 {
510 match simd_json::serde::from_slice(bytes) {
512 Ok(val) => Ok(val),
513 Err(_) => {
514 serde_json::from_slice(bytes)
516 .map_err(|e| PolyfillError::parse(format!("JSON parse error: {}", e), None))
517 }
518 }
519 }
520
521 #[inline]
523 pub fn parse_json_fast_owned<T>(bytes: &[u8]) -> Result<T>
524 where
525 T: for<'de> serde::Deserialize<'de>,
526 {
527 let mut data = bytes.to_vec();
529 parse_json_fast(&mut data)
530 }
531
532 #[inline]
534 pub fn parse_u256(s: &str) -> Result<U256> {
535 U256::from_str_radix(s, 10)
536 .map_err(|e| PolyfillError::parse(format!("Invalid U256: {}", e), None))
537 }
538
539 #[inline]
541 pub fn parse_side(s: &str) -> Result<Side> {
542 match s.to_uppercase().as_str() {
543 "BUY" => Ok(Side::BUY),
544 "SELL" => Ok(Side::SELL),
545 _ => Err(PolyfillError::parse(format!("Invalid side: {}", s), None)),
546 }
547 }
548}
549
550#[cfg(test)]
551mod tests {
552 use super::*;
553
554 #[test]
555 fn test_parse_decimal() {
556 let result = fast_parse::parse_decimal("123.456").unwrap();
557 assert_eq!(result, Decimal::from_str("123.456").unwrap());
558 }
559
560 #[test]
561 fn test_parse_side() {
562 assert_eq!(fast_parse::parse_side("BUY").unwrap(), Side::BUY);
563 assert_eq!(fast_parse::parse_side("sell").unwrap(), Side::SELL);
564 assert!(fast_parse::parse_side("invalid").is_err());
565 }
566
567 #[test]
568 fn test_batch_decoder() {
569 let mut decoder = BatchDecoder::new();
570 let data = r#"{"test":1}{"test":2}"#.as_bytes();
571
572 let results: Vec<serde_json::Value> = decoder.parse_json_stream(data).unwrap();
573 assert_eq!(results.len(), 2);
574 }
575}