1use crate::errors::{PolyfillError, Result};
7use crate::types::*;
8use alloy_primitives::{Address, U256};
9use chrono::{DateTime, Utc};
10use rust_decimal::Decimal;
11use serde::{Deserialize, Deserializer};
12use serde_json::Value;
13use std::str::FromStr;
14
15pub mod deserializers {
17 use super::*;
18 use std::fmt::Display;
19
20 pub fn number_from_string<'de, T, D>(deserializer: D) -> std::result::Result<T, D::Error>
22 where
23 D: Deserializer<'de>,
24 T: FromStr + serde::Deserialize<'de> + Clone,
25 <T as FromStr>::Err: Display,
26 {
27 let value = serde_json::Value::deserialize(deserializer)?;
28 match value {
29 serde_json::Value::Number(n) => {
30 if let Some(v) = n.as_u64() {
31 T::deserialize(serde_json::Value::Number(serde_json::Number::from(v)))
32 .map_err(|_| serde::de::Error::custom("Failed to deserialize number"))
33 } else if let Some(v) = n.as_f64() {
34 T::deserialize(serde_json::Value::Number(
35 serde_json::Number::from_f64(v).unwrap(),
36 ))
37 .map_err(|_| serde::de::Error::custom("Failed to deserialize number"))
38 } else {
39 Err(serde::de::Error::custom("Invalid number format"))
40 }
41 },
42 serde_json::Value::String(s) => s.parse::<T>().map_err(serde::de::Error::custom),
43 _ => Err(serde::de::Error::custom("Expected number or string")),
44 }
45 }
46
47 pub fn optional_number_from_string<'de, T, D>(
49 deserializer: D,
50 ) -> std::result::Result<Option<T>, D::Error>
51 where
52 D: Deserializer<'de>,
53 T: FromStr + serde::Deserialize<'de> + Clone,
54 <T as FromStr>::Err: Display,
55 {
56 let value = serde_json::Value::deserialize(deserializer)?;
57 match value {
58 serde_json::Value::Null => Ok(None),
59 serde_json::Value::Number(n) => {
60 if let Some(v) = n.as_u64() {
61 T::deserialize(serde_json::Value::Number(serde_json::Number::from(v)))
62 .map(Some)
63 .map_err(|_| serde::de::Error::custom("Failed to deserialize number"))
64 } else if let Some(v) = n.as_f64() {
65 T::deserialize(serde_json::Value::Number(
66 serde_json::Number::from_f64(v).unwrap(),
67 ))
68 .map(Some)
69 .map_err(|_| serde::de::Error::custom("Failed to deserialize number"))
70 } else {
71 Err(serde::de::Error::custom("Invalid number format"))
72 }
73 },
74 serde_json::Value::String(s) => {
75 if s.is_empty() {
76 Ok(None)
77 } else {
78 s.parse::<T>().map(Some).map_err(serde::de::Error::custom)
79 }
80 },
81 _ => Err(serde::de::Error::custom("Expected number, string, or null")),
82 }
83 }
84
85 pub fn datetime_from_timestamp<'de, D>(
87 deserializer: D,
88 ) -> std::result::Result<DateTime<Utc>, D::Error>
89 where
90 D: Deserializer<'de>,
91 {
92 let timestamp = number_from_string::<u64, D>(deserializer)?;
93 DateTime::from_timestamp(timestamp as i64, 0)
94 .ok_or_else(|| serde::de::Error::custom("Invalid timestamp"))
95 }
96
97 pub fn optional_datetime_from_timestamp<'de, D>(
99 deserializer: D,
100 ) -> std::result::Result<Option<DateTime<Utc>>, D::Error>
101 where
102 D: Deserializer<'de>,
103 {
104 match optional_number_from_string::<u64, D>(deserializer)? {
105 Some(timestamp) => DateTime::from_timestamp(timestamp as i64, 0)
106 .map(Some)
107 .ok_or_else(|| serde::de::Error::custom("Invalid timestamp")),
108 None => Ok(None),
109 }
110 }
111
112 pub fn vec_from_null<'de, D, T>(deserializer: D) -> std::result::Result<Vec<T>, D::Error>
114 where
115 D: Deserializer<'de>,
116 T: serde::Deserialize<'de>,
117 {
118 Ok(Option::<Vec<T>>::deserialize(deserializer)?.unwrap_or_default())
119 }
120
121 pub fn optional_decimal_from_string<'de, D>(
127 deserializer: D,
128 ) -> std::result::Result<Option<Decimal>, D::Error>
129 where
130 D: Deserializer<'de>,
131 {
132 let value = serde_json::Value::deserialize(deserializer)?;
133 match value {
134 serde_json::Value::Null => Ok(None),
135 serde_json::Value::String(s) => {
136 let s = s.trim();
137 if s.is_empty() {
138 Ok(None)
139 } else {
140 s.parse::<Decimal>()
141 .map(Some)
142 .map_err(serde::de::Error::custom)
143 }
144 },
145 serde_json::Value::Number(n) => Decimal::from_str(&n.to_string())
146 .map(Some)
147 .map_err(serde::de::Error::custom),
148 other => Err(serde::de::Error::custom(format!(
149 "Expected decimal as string/number/null, got {other}"
150 ))),
151 }
152 }
153
154 pub fn optional_decimal_from_string_default_on_error<'de, D>(
156 deserializer: D,
157 ) -> std::result::Result<Option<Decimal>, D::Error>
158 where
159 D: Deserializer<'de>,
160 {
161 let value = serde_json::Value::deserialize(deserializer)?;
162 match value {
163 serde_json::Value::Null => Ok(None),
164 serde_json::Value::String(s) => {
165 let s = s.trim();
166 if s.is_empty() {
167 Ok(None)
168 } else {
169 Ok(s.parse::<Decimal>().ok())
170 }
171 },
172 serde_json::Value::Number(n) => Ok(Decimal::from_str(&n.to_string()).ok()),
173 _ => Ok(None),
174 }
175 }
176
177 pub fn decimal_from_string<'de, D>(deserializer: D) -> std::result::Result<Decimal, D::Error>
182 where
183 D: Deserializer<'de>,
184 {
185 optional_decimal_from_string(deserializer)?.ok_or_else(|| {
186 serde::de::Error::custom("Expected decimal as string/number, got null/empty string")
187 })
188 }
189
190 pub fn decimal_from_string_or_zero<'de, D>(
192 deserializer: D,
193 ) -> std::result::Result<Decimal, D::Error>
194 where
195 D: Deserializer<'de>,
196 {
197 Ok(optional_decimal_from_string(deserializer)?.unwrap_or(Decimal::ZERO))
198 }
199}
200
/// Order-book payload in its wire shape, before conversion to `OrderBook`
/// through the `Decoder` trait.
#[derive(Debug, Deserialize)]
pub struct RawOrderBookResponse {
    /// Market identifier as sent by the server; not used by `decode`.
    pub market: String,
    /// Asset identifier; `decode` copies it into `OrderBook::token_id`.
    pub asset_id: String,
    /// Hash string as sent by the server; not used by `decode`.
    pub hash: String,
    /// Timestamp accepted as a JSON number or numeric string; `decode`
    /// interprets it as whole seconds since the Unix epoch.
    #[serde(deserialize_with = "deserializers::number_from_string")]
    pub timestamp: u64,
    /// Bid levels in the order received.
    pub bids: Vec<RawBookLevel>,
    /// Ask levels in the order received.
    pub asks: Vec<RawBookLevel>,
}
212
/// One price level of a raw order book; both fields are decoded from
/// decimal strings.
#[derive(Debug, Deserialize)]
pub struct RawBookLevel {
    /// Price of the level.
    #[serde(with = "rust_decimal::serde::str")]
    pub price: Decimal,
    /// Size available at the level.
    #[serde(with = "rust_decimal::serde::str")]
    pub size: Decimal,
}
220
/// Order payload in its wire shape, before conversion to `Order` through the
/// `Decoder` trait.
#[derive(Debug, Deserialize)]
pub struct RawOrderResponse {
    /// Order identifier.
    pub id: String,
    /// Status string; `decode` accepts `LIVE`, `CANCELLED`, `FILLED`,
    /// `PARTIAL` and `EXPIRED` and rejects anything else.
    pub status: String,
    /// Market identifier; not used by `decode`.
    pub market: String,
    /// Asset identifier; `decode` copies it into `Order::token_id`.
    pub asset_id: String,
    /// Maker address as a string; not used by `decode`.
    pub maker_address: String,
    /// Owner as a string; not used by `decode`.
    pub owner: String,
    /// Outcome label; not used by `decode`.
    pub outcome: String,
    /// Order type, read from the JSON key `type`.
    #[serde(rename = "type")]
    pub order_type: OrderType,
    /// Order side.
    pub side: Side,
    /// Size the order was created with (decimal string on the wire).
    #[serde(with = "rust_decimal::serde::str")]
    pub original_size: Decimal,
    /// Order price (decimal string on the wire).
    #[serde(with = "rust_decimal::serde::str")]
    pub price: Decimal,
    /// Size already matched (decimal string on the wire).
    #[serde(with = "rust_decimal::serde::str")]
    pub size_matched: Decimal,
    /// Expiration in seconds since the Unix epoch, as a number or numeric
    /// string; `decode` treats `0` as "no expiration".
    #[serde(deserialize_with = "deserializers::number_from_string")]
    pub expiration: u64,
    /// Creation time in seconds since the Unix epoch, as a number or numeric
    /// string.
    #[serde(deserialize_with = "deserializers::number_from_string")]
    pub created_at: u64,
}
244
/// Trade payload in its wire shape, before conversion to `FillEvent` through
/// the `Decoder` trait.
#[derive(Debug, Deserialize)]
pub struct RawTradeResponse {
    /// Trade identifier.
    pub id: String,
    /// Market identifier; not used by `decode`.
    pub market: String,
    /// Asset identifier; `decode` copies it into `FillEvent::token_id`.
    pub asset_id: String,
    /// Trade side.
    pub side: Side,
    /// Execution price (decimal string on the wire).
    #[serde(with = "rust_decimal::serde::str")]
    pub price: Decimal,
    /// Executed size (decimal string on the wire).
    #[serde(with = "rust_decimal::serde::str")]
    pub size: Decimal,
    /// Maker address as text; `decode` parses it into an `Address`.
    pub maker_address: String,
    /// Taker address as text; `decode` parses it into an `Address`.
    pub taker_address: String,
    /// Trade time as a number or numeric string; `decode` interprets it as
    /// whole seconds since the Unix epoch.
    #[serde(deserialize_with = "deserializers::number_from_string")]
    pub timestamp: u64,
}
260
/// Market payload in its wire shape, before conversion to `Market` through
/// the `Decoder` trait.
#[derive(Debug, Deserialize)]
pub struct RawMarketResponse {
    /// Condition identifier; `decode` also reuses it for `question_id` and
    /// to build `market_slug`.
    pub condition_id: String,
    /// Exactly two tokens; any other array length fails deserialization.
    pub tokens: [RawToken; 2],
    /// Active flag, copied through by `decode`.
    pub active: bool,
    /// Closed flag, copied through by `decode`.
    pub closed: bool,
    /// Market question text.
    pub question: String,
    /// Market description text.
    pub description: String,
    /// Optional category label.
    pub category: Option<String>,
    /// Optional end date kept as the raw string (not parsed here).
    pub end_date_iso: Option<String>,
    /// Minimum order size (decimal string on the wire).
    #[serde(with = "rust_decimal::serde::str")]
    pub minimum_order_size: Decimal,
    /// Minimum tick size (decimal string on the wire).
    #[serde(with = "rust_decimal::serde::str")]
    pub minimum_tick_size: Decimal,
}
276
/// Token entry of a raw market payload.
#[derive(Debug, Deserialize)]
pub struct RawToken {
    /// Token identifier.
    pub token_id: String,
    /// Outcome label associated with the token.
    pub outcome: String,
}
282
/// Conversion from a raw, wire-shaped value into the domain type `T`.
pub trait Decoder<T> {
    /// Builds a `T` from `self` without consuming it.
    ///
    /// # Errors
    ///
    /// Returns an error when a field of `self` cannot be converted.
    fn decode(&self) -> Result<T>;
}
287
288impl Decoder<OrderBook> for RawOrderBookResponse {
289 fn decode(&self) -> Result<OrderBook> {
290 let timestamp = chrono::DateTime::from_timestamp(self.timestamp as i64, 0)
291 .ok_or_else(|| PolyfillError::parse("Invalid timestamp".to_string(), None))?;
292
293 let bids = self
294 .bids
295 .iter()
296 .map(|level| BookLevel {
297 price: level.price,
298 size: level.size,
299 })
300 .collect();
301
302 let asks = self
303 .asks
304 .iter()
305 .map(|level| BookLevel {
306 price: level.price,
307 size: level.size,
308 })
309 .collect();
310
311 Ok(OrderBook {
312 token_id: self.asset_id.clone(),
313 timestamp,
314 bids,
315 asks,
316 sequence: 0, })
318 }
319}
320
321impl Decoder<Order> for RawOrderResponse {
322 fn decode(&self) -> Result<Order> {
323 let status = match self.status.as_str() {
324 "LIVE" => OrderStatus::Live,
325 "CANCELLED" => OrderStatus::Cancelled,
326 "FILLED" => OrderStatus::Filled,
327 "PARTIAL" => OrderStatus::Partial,
328 "EXPIRED" => OrderStatus::Expired,
329 _ => {
330 return Err(PolyfillError::parse(
331 format!("Unknown order status: {}", self.status),
332 None,
333 ))
334 },
335 };
336
337 let created_at =
338 chrono::DateTime::from_timestamp(self.created_at as i64, 0).ok_or_else(|| {
339 PolyfillError::parse("Invalid created_at timestamp".to_string(), None)
340 })?;
341
342 let expiration = if self.expiration > 0 {
343 Some(
344 chrono::DateTime::from_timestamp(self.expiration as i64, 0).ok_or_else(|| {
345 PolyfillError::parse("Invalid expiration timestamp".to_string(), None)
346 })?,
347 )
348 } else {
349 None
350 };
351
352 Ok(Order {
353 id: self.id.clone(),
354 token_id: self.asset_id.clone(),
355 side: self.side,
356 price: self.price,
357 original_size: self.original_size,
358 filled_size: self.size_matched,
359 remaining_size: self.original_size - self.size_matched,
360 status,
361 order_type: self.order_type,
362 created_at,
363 updated_at: created_at, expiration,
365 client_id: None,
366 })
367 }
368}
369
370impl Decoder<FillEvent> for RawTradeResponse {
371 fn decode(&self) -> Result<FillEvent> {
372 let timestamp = chrono::DateTime::from_timestamp(self.timestamp as i64, 0)
373 .ok_or_else(|| PolyfillError::parse("Invalid trade timestamp".to_string(), None))?;
374
375 let maker_address = Address::from_str(&self.maker_address)
376 .map_err(|e| PolyfillError::parse(format!("Invalid maker address: {}", e), None))?;
377
378 let taker_address = Address::from_str(&self.taker_address)
379 .map_err(|e| PolyfillError::parse(format!("Invalid taker address: {}", e), None))?;
380
381 Ok(FillEvent {
382 id: self.id.clone(),
383 order_id: "".to_string(), token_id: self.asset_id.clone(),
385 side: self.side,
386 price: self.price,
387 size: self.size,
388 timestamp,
389 maker_address,
390 taker_address,
391 fee: Decimal::ZERO, })
393 }
394}
395
396impl Decoder<Market> for RawMarketResponse {
397 fn decode(&self) -> Result<Market> {
398 let tokens = [
399 Token {
400 token_id: self.tokens[0].token_id.clone(),
401 outcome: self.tokens[0].outcome.clone(),
402 price: Decimal::ZERO,
403 winner: false,
404 },
405 Token {
406 token_id: self.tokens[1].token_id.clone(),
407 outcome: self.tokens[1].outcome.clone(),
408 price: Decimal::ZERO,
409 winner: false,
410 },
411 ];
412
413 Ok(Market {
414 condition_id: self.condition_id.clone(),
415 tokens,
416 rewards: crate::types::Rewards {
417 rates: None,
418 min_size: Decimal::ZERO,
419 max_spread: Decimal::ONE,
420 event_start_date: None,
421 event_end_date: None,
422 in_game_multiplier: None,
423 reward_epoch: None,
424 },
425 min_incentive_size: None,
426 max_incentive_spread: None,
427 active: self.active,
428 closed: self.closed,
429 question_id: self.condition_id.clone(), minimum_order_size: self.minimum_order_size,
431 minimum_tick_size: self.minimum_tick_size,
432 description: self.description.clone(),
433 category: self.category.clone(),
434 end_date_iso: self.end_date_iso.clone(),
435 game_start_time: None,
436 question: self.question.clone(),
437 market_slug: format!("market-{}", self.condition_id), seconds_delay: Decimal::ZERO,
439 icon: String::new(),
440 fpmm: String::new(),
441 enable_order_book: false,
443 archived: false,
444 accepting_orders: false,
445 accepting_order_timestamp: None,
446 maker_base_fee: Decimal::ZERO,
447 taker_base_fee: Decimal::ZERO,
448 notifications_enabled: false,
449 neg_risk: false,
450 neg_risk_market_id: String::new(),
451 neg_risk_request_id: String::new(),
452 image: String::new(),
453 is_50_50_outcome: false,
454 })
455 }
456}
457
458pub fn parse_stream_messages(raw: &str) -> Result<Vec<StreamMessage>> {
465 parse_stream_messages_bytes(raw.as_bytes())
466}
467
468pub fn parse_stream_messages_bytes(bytes: &[u8]) -> Result<Vec<StreamMessage>> {
470 let value: Value = serde_json::from_slice(bytes)?;
471
472 match value {
473 Value::Object(map) => {
474 let event_type = map.get("event_type").and_then(Value::as_str);
475 match event_type {
476 None => Ok(vec![]),
477 Some(_) => {
478 let msg: StreamMessage = serde_json::from_value(Value::Object(map))?;
479 match msg {
480 StreamMessage::Unknown => Ok(vec![]),
481 other => Ok(vec![other]),
482 }
483 },
484 }
485 },
486 Value::Array(arr) => Ok(arr
487 .into_iter()
488 .filter_map(|elem| {
489 let Value::Object(map) = elem else {
490 return None;
491 };
492
493 let event_type = map.get("event_type").and_then(Value::as_str)?;
494 match event_type {
496 "book" | "price_change" | "tick_size_change" | "last_trade_price"
497 | "best_bid_ask" | "new_market" | "market_resolved" | "trade" | "order" => {},
498 _ => return None,
499 }
500
501 match serde_json::from_value::<StreamMessage>(Value::Object(map)) {
502 Ok(StreamMessage::Unknown) => None,
503 Ok(msg) => Some(msg),
504 Err(_) => None,
505 }
506 })
507 .collect()),
508 _ => Ok(vec![]),
509 }
510}
511
/// Incremental decoder that keeps received bytes between calls to
/// `parse_json_stream`, so an object split across chunks can be completed by
/// a later chunk.
pub struct BatchDecoder {
    /// Bytes received so far that have not yet been consumed as part of a
    /// complete top-level object.
    buffer: Vec<u8>,
}
516
517impl BatchDecoder {
518 pub fn new() -> Self {
519 Self {
520 buffer: Vec::with_capacity(8192),
521 }
522 }
523
524 pub fn parse_json_stream<T>(&mut self, data: &[u8]) -> Result<Vec<T>>
526 where
527 T: for<'de> serde::Deserialize<'de>,
528 {
529 self.buffer.extend_from_slice(data);
530 let mut results = Vec::new();
531 let mut start = 0;
532
533 while let Some(end) = self.find_json_boundary(start) {
534 let json_slice = &self.buffer[start..end];
535 if let Ok(obj) = serde_json::from_slice::<T>(json_slice) {
536 results.push(obj);
537 }
538 start = end;
539 }
540
541 if start > 0 {
543 self.buffer.drain(0..start);
544 }
545
546 Ok(results)
547 }
548
549 fn find_json_boundary(&self, start: usize) -> Option<usize> {
551 let mut depth = 0;
552 let mut in_string = false;
553 let mut escaped = false;
554
555 for (i, &byte) in self.buffer[start..].iter().enumerate() {
556 if escaped {
557 escaped = false;
558 continue;
559 }
560
561 match byte {
562 b'\\' if in_string => escaped = true,
563 b'"' => in_string = !in_string,
564 b'{' if !in_string => depth += 1,
565 b'}' if !in_string => {
566 depth -= 1;
567 if depth == 0 {
568 return Some(start + i + 1);
569 }
570 },
571 _ => {},
572 }
573 }
574
575 None
576 }
577}
578
579impl Default for BatchDecoder {
580 fn default() -> Self {
581 Self::new()
582 }
583}
584
585pub mod fast_parse {
587 use super::*;
588
589 #[inline]
591 pub fn parse_decimal(s: &str) -> Result<Decimal> {
592 Decimal::from_str(s)
593 .map_err(|e| PolyfillError::parse(format!("Invalid decimal: {}", e), None))
594 }
595
596 #[inline]
598 pub fn parse_address(s: &str) -> Result<Address> {
599 Address::from_str(s)
600 .map_err(|e| PolyfillError::parse(format!("Invalid address: {}", e), None))
601 }
602
603 #[inline]
607 pub fn parse_json_fast<T>(bytes: &mut [u8]) -> Result<T>
608 where
609 T: for<'de> serde::Deserialize<'de>,
610 {
611 match simd_json::serde::from_slice(bytes) {
613 Ok(val) => Ok(val),
614 Err(_) => {
615 serde_json::from_slice(bytes)
617 .map_err(|e| PolyfillError::parse(format!("JSON parse error: {}", e), None))
618 },
619 }
620 }
621
622 #[inline]
624 pub fn parse_json_fast_owned<T>(bytes: &[u8]) -> Result<T>
625 where
626 T: for<'de> serde::Deserialize<'de>,
627 {
628 let mut data = bytes.to_vec();
630 parse_json_fast(&mut data)
631 }
632
633 #[inline]
635 pub fn parse_u256(s: &str) -> Result<U256> {
636 U256::from_str_radix(s, 10)
637 .map_err(|e| PolyfillError::parse(format!("Invalid U256: {}", e), None))
638 }
639
640 #[inline]
642 pub fn parse_side(s: &str) -> Result<Side> {
643 match s.to_uppercase().as_str() {
644 "BUY" => Ok(Side::BUY),
645 "SELL" => Ok(Side::SELL),
646 _ => Err(PolyfillError::parse(format!("Invalid side: {}", s), None)),
647 }
648 }
649}
650
#[cfg(test)]
mod tests {
    use super::*;

    /// `parse_decimal` agrees with `Decimal::from_str` on a plain decimal.
    #[test]
    fn test_parse_decimal() {
        let expected = Decimal::from_str("123.456").unwrap();
        let parsed = fast_parse::parse_decimal("123.456").unwrap();
        assert_eq!(parsed, expected);
    }

    /// `parse_side` accepts either case and rejects unknown words.
    #[test]
    fn test_parse_side() {
        let buy = fast_parse::parse_side("BUY").unwrap();
        let sell = fast_parse::parse_side("sell").unwrap();
        let invalid = fast_parse::parse_side("invalid");

        assert_eq!(buy, Side::BUY);
        assert_eq!(sell, Side::SELL);
        assert!(invalid.is_err());
    }

    /// Two back-to-back objects in one chunk are both decoded.
    #[test]
    fn test_batch_decoder() {
        let input: &[u8] = br#"{"test":1}{"test":2}"#;
        let mut decoder = BatchDecoder::new();

        let decoded: Vec<serde_json::Value> = decoder.parse_json_stream(input).unwrap();
        assert_eq!(decoded.len(), 2);
    }
}