1use crate::errors::{PolyfillError, Result};
7use crate::types::*;
8use alloy_primitives::{Address, U256};
9use chrono::{DateTime, Utc};
10use rust_decimal::Decimal;
11use serde::{Deserialize, Deserializer};
12use serde_json::Value;
13use std::str::FromStr;
14
15pub mod deserializers {
17 use super::*;
18 use std::fmt::Display;
19
20 pub fn number_from_string<'de, T, D>(deserializer: D) -> std::result::Result<T, D::Error>
22 where
23 D: Deserializer<'de>,
24 T: FromStr + serde::Deserialize<'de> + Clone,
25 <T as FromStr>::Err: Display,
26 {
27 let value = serde_json::Value::deserialize(deserializer)?;
28 match value {
29 serde_json::Value::Number(n) => {
30 if let Some(v) = n.as_u64() {
31 T::deserialize(serde_json::Value::Number(serde_json::Number::from(v)))
32 .map_err(|_| serde::de::Error::custom("Failed to deserialize number"))
33 } else if let Some(v) = n.as_f64() {
34 T::deserialize(serde_json::Value::Number(
35 serde_json::Number::from_f64(v).unwrap(),
36 ))
37 .map_err(|_| serde::de::Error::custom("Failed to deserialize number"))
38 } else {
39 Err(serde::de::Error::custom("Invalid number format"))
40 }
41 },
42 serde_json::Value::String(s) => s.parse::<T>().map_err(serde::de::Error::custom),
43 _ => Err(serde::de::Error::custom("Expected number or string")),
44 }
45 }
46
47 pub fn optional_number_from_string<'de, T, D>(
49 deserializer: D,
50 ) -> std::result::Result<Option<T>, D::Error>
51 where
52 D: Deserializer<'de>,
53 T: FromStr + serde::Deserialize<'de> + Clone,
54 <T as FromStr>::Err: Display,
55 {
56 let value = serde_json::Value::deserialize(deserializer)?;
57 match value {
58 serde_json::Value::Null => Ok(None),
59 serde_json::Value::Number(n) => {
60 if let Some(v) = n.as_u64() {
61 T::deserialize(serde_json::Value::Number(serde_json::Number::from(v)))
62 .map(Some)
63 .map_err(|_| serde::de::Error::custom("Failed to deserialize number"))
64 } else if let Some(v) = n.as_f64() {
65 T::deserialize(serde_json::Value::Number(
66 serde_json::Number::from_f64(v).unwrap(),
67 ))
68 .map(Some)
69 .map_err(|_| serde::de::Error::custom("Failed to deserialize number"))
70 } else {
71 Err(serde::de::Error::custom("Invalid number format"))
72 }
73 },
74 serde_json::Value::String(s) => {
75 if s.is_empty() {
76 Ok(None)
77 } else {
78 s.parse::<T>().map(Some).map_err(serde::de::Error::custom)
79 }
80 },
81 _ => Err(serde::de::Error::custom("Expected number, string, or null")),
82 }
83 }
84
85 pub fn datetime_from_timestamp<'de, D>(
87 deserializer: D,
88 ) -> std::result::Result<DateTime<Utc>, D::Error>
89 where
90 D: Deserializer<'de>,
91 {
92 let timestamp = number_from_string::<u64, D>(deserializer)?;
93 DateTime::from_timestamp(timestamp as i64, 0)
94 .ok_or_else(|| serde::de::Error::custom("Invalid timestamp"))
95 }
96
97 pub fn optional_datetime_from_timestamp<'de, D>(
99 deserializer: D,
100 ) -> std::result::Result<Option<DateTime<Utc>>, D::Error>
101 where
102 D: Deserializer<'de>,
103 {
104 match optional_number_from_string::<u64, D>(deserializer)? {
105 Some(timestamp) => DateTime::from_timestamp(timestamp as i64, 0)
106 .map(Some)
107 .ok_or_else(|| serde::de::Error::custom("Invalid timestamp")),
108 None => Ok(None),
109 }
110 }
111
112 pub fn vec_from_null<'de, D, T>(deserializer: D) -> std::result::Result<Vec<T>, D::Error>
114 where
115 D: Deserializer<'de>,
116 T: serde::Deserialize<'de>,
117 {
118 Ok(Option::<Vec<T>>::deserialize(deserializer)?.unwrap_or_default())
119 }
120
121 pub fn optional_decimal_from_string<'de, D>(
127 deserializer: D,
128 ) -> std::result::Result<Option<Decimal>, D::Error>
129 where
130 D: Deserializer<'de>,
131 {
132 let value = serde_json::Value::deserialize(deserializer)?;
133 match value {
134 serde_json::Value::Null => Ok(None),
135 serde_json::Value::String(s) => {
136 let s = s.trim();
137 if s.is_empty() {
138 Ok(None)
139 } else {
140 s.parse::<Decimal>()
141 .map(Some)
142 .map_err(serde::de::Error::custom)
143 }
144 },
145 serde_json::Value::Number(n) => Decimal::from_str(&n.to_string())
146 .map(Some)
147 .map_err(serde::de::Error::custom),
148 other => Err(serde::de::Error::custom(format!(
149 "Expected decimal as string/number/null, got {other}"
150 ))),
151 }
152 }
153
154 pub fn optional_decimal_from_string_default_on_error<'de, D>(
156 deserializer: D,
157 ) -> std::result::Result<Option<Decimal>, D::Error>
158 where
159 D: Deserializer<'de>,
160 {
161 let value = serde_json::Value::deserialize(deserializer)?;
162 match value {
163 serde_json::Value::Null => Ok(None),
164 serde_json::Value::String(s) => {
165 let s = s.trim();
166 if s.is_empty() {
167 Ok(None)
168 } else {
169 Ok(s.parse::<Decimal>().ok())
170 }
171 },
172 serde_json::Value::Number(n) => Ok(Decimal::from_str(&n.to_string()).ok()),
173 _ => Ok(None),
174 }
175 }
176
177 pub fn decimal_from_string<'de, D>(deserializer: D) -> std::result::Result<Decimal, D::Error>
182 where
183 D: Deserializer<'de>,
184 {
185 optional_decimal_from_string(deserializer)?.ok_or_else(|| {
186 serde::de::Error::custom("Expected decimal as string/number, got null/empty string")
187 })
188 }
189}
190
191#[derive(Debug, Deserialize)]
193pub struct RawOrderBookResponse {
194 pub market: String,
195 pub asset_id: String,
196 pub hash: String,
197 #[serde(deserialize_with = "deserializers::number_from_string")]
198 pub timestamp: u64,
199 pub bids: Vec<RawBookLevel>,
200 pub asks: Vec<RawBookLevel>,
201}
202
203#[derive(Debug, Deserialize)]
204pub struct RawBookLevel {
205 #[serde(with = "rust_decimal::serde::str")]
206 pub price: Decimal,
207 #[serde(with = "rust_decimal::serde::str")]
208 pub size: Decimal,
209}
210
211#[derive(Debug, Deserialize)]
212pub struct RawOrderResponse {
213 pub id: String,
214 pub status: String,
215 pub market: String,
216 pub asset_id: String,
217 pub maker_address: String,
218 pub owner: String,
219 pub outcome: String,
220 #[serde(rename = "type")]
221 pub order_type: OrderType,
222 pub side: Side,
223 #[serde(with = "rust_decimal::serde::str")]
224 pub original_size: Decimal,
225 #[serde(with = "rust_decimal::serde::str")]
226 pub price: Decimal,
227 #[serde(with = "rust_decimal::serde::str")]
228 pub size_matched: Decimal,
229 #[serde(deserialize_with = "deserializers::number_from_string")]
230 pub expiration: u64,
231 #[serde(deserialize_with = "deserializers::number_from_string")]
232 pub created_at: u64,
233}
234
235#[derive(Debug, Deserialize)]
236pub struct RawTradeResponse {
237 pub id: String,
238 pub market: String,
239 pub asset_id: String,
240 pub side: Side,
241 #[serde(with = "rust_decimal::serde::str")]
242 pub price: Decimal,
243 #[serde(with = "rust_decimal::serde::str")]
244 pub size: Decimal,
245 pub maker_address: String,
246 pub taker_address: String,
247 #[serde(deserialize_with = "deserializers::number_from_string")]
248 pub timestamp: u64,
249}
250
251#[derive(Debug, Deserialize)]
252pub struct RawMarketResponse {
253 pub condition_id: String,
254 pub tokens: [RawToken; 2],
255 pub active: bool,
256 pub closed: bool,
257 pub question: String,
258 pub description: String,
259 pub category: Option<String>,
260 pub end_date_iso: Option<String>,
261 #[serde(with = "rust_decimal::serde::str")]
262 pub minimum_order_size: Decimal,
263 #[serde(with = "rust_decimal::serde::str")]
264 pub minimum_tick_size: Decimal,
265}
266
267#[derive(Debug, Deserialize)]
268pub struct RawToken {
269 pub token_id: String,
270 pub outcome: String,
271}
272
273pub trait Decoder<T> {
275 fn decode(&self) -> Result<T>;
276}
277
278impl Decoder<OrderBook> for RawOrderBookResponse {
279 fn decode(&self) -> Result<OrderBook> {
280 let timestamp = chrono::DateTime::from_timestamp(self.timestamp as i64, 0)
281 .ok_or_else(|| PolyfillError::parse("Invalid timestamp".to_string(), None))?;
282
283 let bids = self
284 .bids
285 .iter()
286 .map(|level| BookLevel {
287 price: level.price,
288 size: level.size,
289 })
290 .collect();
291
292 let asks = self
293 .asks
294 .iter()
295 .map(|level| BookLevel {
296 price: level.price,
297 size: level.size,
298 })
299 .collect();
300
301 Ok(OrderBook {
302 token_id: self.asset_id.clone(),
303 timestamp,
304 bids,
305 asks,
306 sequence: 0, })
308 }
309}
310
311impl Decoder<Order> for RawOrderResponse {
312 fn decode(&self) -> Result<Order> {
313 let status = match self.status.as_str() {
314 "LIVE" => OrderStatus::Live,
315 "CANCELLED" => OrderStatus::Cancelled,
316 "FILLED" => OrderStatus::Filled,
317 "PARTIAL" => OrderStatus::Partial,
318 "EXPIRED" => OrderStatus::Expired,
319 _ => {
320 return Err(PolyfillError::parse(
321 format!("Unknown order status: {}", self.status),
322 None,
323 ))
324 },
325 };
326
327 let created_at =
328 chrono::DateTime::from_timestamp(self.created_at as i64, 0).ok_or_else(|| {
329 PolyfillError::parse("Invalid created_at timestamp".to_string(), None)
330 })?;
331
332 let expiration = if self.expiration > 0 {
333 Some(
334 chrono::DateTime::from_timestamp(self.expiration as i64, 0).ok_or_else(|| {
335 PolyfillError::parse("Invalid expiration timestamp".to_string(), None)
336 })?,
337 )
338 } else {
339 None
340 };
341
342 Ok(Order {
343 id: self.id.clone(),
344 token_id: self.asset_id.clone(),
345 side: self.side,
346 price: self.price,
347 original_size: self.original_size,
348 filled_size: self.size_matched,
349 remaining_size: self.original_size - self.size_matched,
350 status,
351 order_type: self.order_type,
352 created_at,
353 updated_at: created_at, expiration,
355 client_id: None,
356 })
357 }
358}
359
360impl Decoder<FillEvent> for RawTradeResponse {
361 fn decode(&self) -> Result<FillEvent> {
362 let timestamp = chrono::DateTime::from_timestamp(self.timestamp as i64, 0)
363 .ok_or_else(|| PolyfillError::parse("Invalid trade timestamp".to_string(), None))?;
364
365 let maker_address = Address::from_str(&self.maker_address)
366 .map_err(|e| PolyfillError::parse(format!("Invalid maker address: {}", e), None))?;
367
368 let taker_address = Address::from_str(&self.taker_address)
369 .map_err(|e| PolyfillError::parse(format!("Invalid taker address: {}", e), None))?;
370
371 Ok(FillEvent {
372 id: self.id.clone(),
373 order_id: "".to_string(), token_id: self.asset_id.clone(),
375 side: self.side,
376 price: self.price,
377 size: self.size,
378 timestamp,
379 maker_address,
380 taker_address,
381 fee: Decimal::ZERO, })
383 }
384}
385
386impl Decoder<Market> for RawMarketResponse {
387 fn decode(&self) -> Result<Market> {
388 let tokens = [
389 Token {
390 token_id: self.tokens[0].token_id.clone(),
391 outcome: self.tokens[0].outcome.clone(),
392 price: Decimal::ZERO,
393 winner: false,
394 },
395 Token {
396 token_id: self.tokens[1].token_id.clone(),
397 outcome: self.tokens[1].outcome.clone(),
398 price: Decimal::ZERO,
399 winner: false,
400 },
401 ];
402
403 Ok(Market {
404 condition_id: self.condition_id.clone(),
405 tokens,
406 rewards: crate::types::Rewards {
407 rates: None,
408 min_size: Decimal::ZERO,
409 max_spread: Decimal::ONE,
410 event_start_date: None,
411 event_end_date: None,
412 in_game_multiplier: None,
413 reward_epoch: None,
414 },
415 min_incentive_size: None,
416 max_incentive_spread: None,
417 active: self.active,
418 closed: self.closed,
419 question_id: self.condition_id.clone(), minimum_order_size: self.minimum_order_size,
421 minimum_tick_size: self.minimum_tick_size,
422 description: self.description.clone(),
423 category: self.category.clone(),
424 end_date_iso: self.end_date_iso.clone(),
425 game_start_time: None,
426 question: self.question.clone(),
427 market_slug: format!("market-{}", self.condition_id), seconds_delay: Decimal::ZERO,
429 icon: String::new(),
430 fpmm: String::new(),
431 enable_order_book: false,
433 archived: false,
434 accepting_orders: false,
435 accepting_order_timestamp: None,
436 maker_base_fee: Decimal::ZERO,
437 taker_base_fee: Decimal::ZERO,
438 notifications_enabled: false,
439 neg_risk: false,
440 neg_risk_market_id: String::new(),
441 neg_risk_request_id: String::new(),
442 image: String::new(),
443 is_50_50_outcome: false,
444 })
445 }
446}
447
448pub fn parse_stream_messages(raw: &str) -> Result<Vec<StreamMessage>> {
455 parse_stream_messages_bytes(raw.as_bytes())
456}
457
458pub fn parse_stream_messages_bytes(bytes: &[u8]) -> Result<Vec<StreamMessage>> {
460 let value: Value = serde_json::from_slice(bytes)?;
461
462 match value {
463 Value::Object(map) => {
464 let event_type = map.get("event_type").and_then(Value::as_str);
465 match event_type {
466 None => Ok(vec![]),
467 Some(_) => {
468 let msg: StreamMessage = serde_json::from_value(Value::Object(map))?;
469 match msg {
470 StreamMessage::Unknown => Ok(vec![]),
471 other => Ok(vec![other]),
472 }
473 },
474 }
475 },
476 Value::Array(arr) => Ok(arr
477 .into_iter()
478 .filter_map(|elem| {
479 let Value::Object(map) = elem else {
480 return None;
481 };
482
483 let event_type = map.get("event_type").and_then(Value::as_str)?;
484 match event_type {
486 "book" | "price_change" | "tick_size_change" | "last_trade_price"
487 | "best_bid_ask" | "new_market" | "market_resolved" | "trade" | "order" => {},
488 _ => return None,
489 }
490
491 match serde_json::from_value::<StreamMessage>(Value::Object(map)) {
492 Ok(StreamMessage::Unknown) => None,
493 Ok(msg) => Some(msg),
494 Err(_) => None,
495 }
496 })
497 .collect()),
498 _ => Ok(vec![]),
499 }
500}
501
502pub struct BatchDecoder {
504 buffer: Vec<u8>,
505}
506
507impl BatchDecoder {
508 pub fn new() -> Self {
509 Self {
510 buffer: Vec::with_capacity(8192),
511 }
512 }
513
514 pub fn parse_json_stream<T>(&mut self, data: &[u8]) -> Result<Vec<T>>
516 where
517 T: for<'de> serde::Deserialize<'de>,
518 {
519 self.buffer.extend_from_slice(data);
520 let mut results = Vec::new();
521 let mut start = 0;
522
523 while let Some(end) = self.find_json_boundary(start) {
524 let json_slice = &self.buffer[start..end];
525 if let Ok(obj) = serde_json::from_slice::<T>(json_slice) {
526 results.push(obj);
527 }
528 start = end;
529 }
530
531 if start > 0 {
533 self.buffer.drain(0..start);
534 }
535
536 Ok(results)
537 }
538
539 fn find_json_boundary(&self, start: usize) -> Option<usize> {
541 let mut depth = 0;
542 let mut in_string = false;
543 let mut escaped = false;
544
545 for (i, &byte) in self.buffer[start..].iter().enumerate() {
546 if escaped {
547 escaped = false;
548 continue;
549 }
550
551 match byte {
552 b'\\' if in_string => escaped = true,
553 b'"' => in_string = !in_string,
554 b'{' if !in_string => depth += 1,
555 b'}' if !in_string => {
556 depth -= 1;
557 if depth == 0 {
558 return Some(start + i + 1);
559 }
560 },
561 _ => {},
562 }
563 }
564
565 None
566 }
567}
568
569impl Default for BatchDecoder {
570 fn default() -> Self {
571 Self::new()
572 }
573}
574
575pub mod fast_parse {
577 use super::*;
578
579 #[inline]
581 pub fn parse_decimal(s: &str) -> Result<Decimal> {
582 Decimal::from_str(s)
583 .map_err(|e| PolyfillError::parse(format!("Invalid decimal: {}", e), None))
584 }
585
586 #[inline]
588 pub fn parse_address(s: &str) -> Result<Address> {
589 Address::from_str(s)
590 .map_err(|e| PolyfillError::parse(format!("Invalid address: {}", e), None))
591 }
592
593 #[inline]
597 pub fn parse_json_fast<T>(bytes: &mut [u8]) -> Result<T>
598 where
599 T: for<'de> serde::Deserialize<'de>,
600 {
601 match simd_json::serde::from_slice(bytes) {
603 Ok(val) => Ok(val),
604 Err(_) => {
605 serde_json::from_slice(bytes)
607 .map_err(|e| PolyfillError::parse(format!("JSON parse error: {}", e), None))
608 },
609 }
610 }
611
612 #[inline]
614 pub fn parse_json_fast_owned<T>(bytes: &[u8]) -> Result<T>
615 where
616 T: for<'de> serde::Deserialize<'de>,
617 {
618 let mut data = bytes.to_vec();
620 parse_json_fast(&mut data)
621 }
622
623 #[inline]
625 pub fn parse_u256(s: &str) -> Result<U256> {
626 U256::from_str_radix(s, 10)
627 .map_err(|e| PolyfillError::parse(format!("Invalid U256: {}", e), None))
628 }
629
630 #[inline]
632 pub fn parse_side(s: &str) -> Result<Side> {
633 match s.to_uppercase().as_str() {
634 "BUY" => Ok(Side::BUY),
635 "SELL" => Ok(Side::SELL),
636 _ => Err(PolyfillError::parse(format!("Invalid side: {}", s), None)),
637 }
638 }
639}
640
641#[cfg(test)]
642mod tests {
643 use super::*;
644
645 #[test]
646 fn test_parse_decimal() {
647 let result = fast_parse::parse_decimal("123.456").unwrap();
648 assert_eq!(result, Decimal::from_str("123.456").unwrap());
649 }
650
651 #[test]
652 fn test_parse_side() {
653 assert_eq!(fast_parse::parse_side("BUY").unwrap(), Side::BUY);
654 assert_eq!(fast_parse::parse_side("sell").unwrap(), Side::SELL);
655 assert!(fast_parse::parse_side("invalid").is_err());
656 }
657
658 #[test]
659 fn test_batch_decoder() {
660 let mut decoder = BatchDecoder::new();
661 let data = r#"{"test":1}{"test":2}"#.as_bytes();
662
663 let results: Vec<serde_json::Value> = decoder.parse_json_stream(data).unwrap();
664 assert_eq!(results.len(), 2);
665 }
666}