1use crate::errors::{PolyfillError, Result};
7use crate::types::*;
8use alloy_primitives::{Address, U256};
9use chrono::{DateTime, Utc};
10use rust_decimal::Decimal;
11use serde::{Deserialize, Deserializer};
12use serde_json::Value;
13use std::str::FromStr;
14
15pub mod deserializers {
17 use super::*;
18 use std::fmt::Display;
19
20 pub fn number_from_string<'de, T, D>(deserializer: D) -> std::result::Result<T, D::Error>
22 where
23 D: Deserializer<'de>,
24 T: FromStr + serde::Deserialize<'de> + Clone,
25 <T as FromStr>::Err: Display,
26 {
27 let value = serde_json::Value::deserialize(deserializer)?;
28 match value {
29 serde_json::Value::Number(n) => {
30 if let Some(v) = n.as_u64() {
31 T::deserialize(serde_json::Value::Number(serde_json::Number::from(v)))
32 .map_err(|_| serde::de::Error::custom("Failed to deserialize number"))
33 } else if let Some(v) = n.as_f64() {
34 T::deserialize(serde_json::Value::Number(
35 serde_json::Number::from_f64(v).unwrap(),
36 ))
37 .map_err(|_| serde::de::Error::custom("Failed to deserialize number"))
38 } else {
39 Err(serde::de::Error::custom("Invalid number format"))
40 }
41 },
42 serde_json::Value::String(s) => s.parse::<T>().map_err(serde::de::Error::custom),
43 _ => Err(serde::de::Error::custom("Expected number or string")),
44 }
45 }
46
47 pub fn optional_number_from_string<'de, T, D>(
49 deserializer: D,
50 ) -> std::result::Result<Option<T>, D::Error>
51 where
52 D: Deserializer<'de>,
53 T: FromStr + serde::Deserialize<'de> + Clone,
54 <T as FromStr>::Err: Display,
55 {
56 let value = serde_json::Value::deserialize(deserializer)?;
57 match value {
58 serde_json::Value::Null => Ok(None),
59 serde_json::Value::Number(n) => {
60 if let Some(v) = n.as_u64() {
61 T::deserialize(serde_json::Value::Number(serde_json::Number::from(v)))
62 .map(Some)
63 .map_err(|_| serde::de::Error::custom("Failed to deserialize number"))
64 } else if let Some(v) = n.as_f64() {
65 T::deserialize(serde_json::Value::Number(
66 serde_json::Number::from_f64(v).unwrap(),
67 ))
68 .map(Some)
69 .map_err(|_| serde::de::Error::custom("Failed to deserialize number"))
70 } else {
71 Err(serde::de::Error::custom("Invalid number format"))
72 }
73 },
74 serde_json::Value::String(s) => {
75 if s.is_empty() {
76 Ok(None)
77 } else {
78 s.parse::<T>().map(Some).map_err(serde::de::Error::custom)
79 }
80 },
81 _ => Err(serde::de::Error::custom("Expected number, string, or null")),
82 }
83 }
84
85 pub fn datetime_from_timestamp<'de, D>(
87 deserializer: D,
88 ) -> std::result::Result<DateTime<Utc>, D::Error>
89 where
90 D: Deserializer<'de>,
91 {
92 let timestamp = number_from_string::<u64, D>(deserializer)?;
93 DateTime::from_timestamp(timestamp as i64, 0)
94 .ok_or_else(|| serde::de::Error::custom("Invalid timestamp"))
95 }
96
97 pub fn optional_datetime_from_timestamp<'de, D>(
99 deserializer: D,
100 ) -> std::result::Result<Option<DateTime<Utc>>, D::Error>
101 where
102 D: Deserializer<'de>,
103 {
104 match optional_number_from_string::<u64, D>(deserializer)? {
105 Some(timestamp) => DateTime::from_timestamp(timestamp as i64, 0)
106 .map(Some)
107 .ok_or_else(|| serde::de::Error::custom("Invalid timestamp")),
108 None => Ok(None),
109 }
110 }
111
112 pub fn vec_from_null<'de, D, T>(deserializer: D) -> std::result::Result<Vec<T>, D::Error>
114 where
115 D: Deserializer<'de>,
116 T: serde::Deserialize<'de>,
117 {
118 Ok(Option::<Vec<T>>::deserialize(deserializer)?.unwrap_or_default())
119 }
120
121 pub fn optional_decimal_from_string<'de, D>(
127 deserializer: D,
128 ) -> std::result::Result<Option<Decimal>, D::Error>
129 where
130 D: Deserializer<'de>,
131 {
132 let value = serde_json::Value::deserialize(deserializer)?;
133 match value {
134 serde_json::Value::Null => Ok(None),
135 serde_json::Value::String(s) => {
136 let s = s.trim();
137 if s.is_empty() {
138 Ok(None)
139 } else {
140 s.parse::<Decimal>()
141 .map(Some)
142 .map_err(serde::de::Error::custom)
143 }
144 },
145 serde_json::Value::Number(n) => Decimal::from_str(&n.to_string())
146 .map(Some)
147 .map_err(serde::de::Error::custom),
148 other => Err(serde::de::Error::custom(format!(
149 "Expected decimal as string/number/null, got {other}"
150 ))),
151 }
152 }
153
154 pub fn optional_decimal_from_string_default_on_error<'de, D>(
156 deserializer: D,
157 ) -> std::result::Result<Option<Decimal>, D::Error>
158 where
159 D: Deserializer<'de>,
160 {
161 let value = serde_json::Value::deserialize(deserializer)?;
162 match value {
163 serde_json::Value::Null => Ok(None),
164 serde_json::Value::String(s) => {
165 let s = s.trim();
166 if s.is_empty() {
167 Ok(None)
168 } else {
169 Ok(s.parse::<Decimal>().ok())
170 }
171 },
172 serde_json::Value::Number(n) => Ok(Decimal::from_str(&n.to_string()).ok()),
173 _ => Ok(None),
174 }
175 }
176}
177
/// Order book payload in wire format, before conversion to an `OrderBook`
/// through its `Decoder` implementation.
#[derive(Debug, Deserialize)]
pub struct RawOrderBookResponse {
    /// Not carried over when decoding.
    pub market: String,
    /// Becomes the decoded book's `token_id`.
    pub asset_id: String,
    /// Not carried over when decoding.
    pub hash: String,
    /// Accepts a JSON number or a numeric string; decoding passes it to
    /// `DateTime::from_timestamp` as whole seconds.
    #[serde(deserialize_with = "deserializers::number_from_string")]
    pub timestamp: u64,
    /// Bid levels, in the order received.
    pub bids: Vec<RawBookLevel>,
    /// Ask levels, in the order received.
    pub asks: Vec<RawBookLevel>,
}
189
/// One price level of a raw order book; both fields are read from decimal
/// strings.
#[derive(Debug, Deserialize)]
pub struct RawBookLevel {
    #[serde(with = "rust_decimal::serde::str")]
    pub price: Decimal,
    #[serde(with = "rust_decimal::serde::str")]
    pub size: Decimal,
}
197
/// Order payload in wire format, before conversion to an `Order` through its
/// `Decoder` implementation.
#[derive(Debug, Deserialize)]
pub struct RawOrderResponse {
    pub id: String,
    /// Decoding recognises `LIVE`, `CANCELLED`, `FILLED`, `PARTIAL` and
    /// `EXPIRED`; any other value is rejected.
    pub status: String,
    pub market: String,
    /// Becomes the decoded order's `token_id`.
    pub asset_id: String,
    pub maker_address: String,
    pub owner: String,
    pub outcome: String,
    /// Read from the JSON key `type`.
    #[serde(rename = "type")]
    pub order_type: OrderType,
    pub side: Side,
    /// Read from a decimal string.
    #[serde(with = "rust_decimal::serde::str")]
    pub original_size: Decimal,
    /// Read from a decimal string.
    #[serde(with = "rust_decimal::serde::str")]
    pub price: Decimal,
    /// Read from a decimal string; becomes the decoded order's `filled_size`.
    #[serde(with = "rust_decimal::serde::str")]
    pub size_matched: Decimal,
    /// Number or numeric string; `0` is decoded as "no expiration", any other
    /// value as whole seconds.
    #[serde(deserialize_with = "deserializers::number_from_string")]
    pub expiration: u64,
    /// Number or numeric string; decoded as whole seconds.
    #[serde(deserialize_with = "deserializers::number_from_string")]
    pub created_at: u64,
}
221
/// Trade payload in wire format, before conversion to a `FillEvent` through
/// its `Decoder` implementation.
#[derive(Debug, Deserialize)]
pub struct RawTradeResponse {
    pub id: String,
    pub market: String,
    /// Becomes the decoded fill's `token_id`.
    pub asset_id: String,
    pub side: Side,
    /// Read from a decimal string.
    #[serde(with = "rust_decimal::serde::str")]
    pub price: Decimal,
    /// Read from a decimal string.
    #[serde(with = "rust_decimal::serde::str")]
    pub size: Decimal,
    /// Kept as text here; parsed into an `Address` when decoding.
    pub maker_address: String,
    /// Kept as text here; parsed into an `Address` when decoding.
    pub taker_address: String,
    /// Number or numeric string; decoded as whole seconds.
    #[serde(deserialize_with = "deserializers::number_from_string")]
    pub timestamp: u64,
}
237
/// Market payload in wire format, before conversion to a `Market` through its
/// `Decoder` implementation.
#[derive(Debug, Deserialize)]
pub struct RawMarketResponse {
    pub condition_id: String,
    /// Exactly two tokens are required; any other count fails to deserialize.
    pub tokens: [RawToken; 2],
    pub active: bool,
    pub closed: bool,
    pub question: String,
    pub description: String,
    pub category: Option<String>,
    /// Kept as the raw string; it is not parsed into a date here.
    pub end_date_iso: Option<String>,
    /// Read from a decimal string.
    #[serde(with = "rust_decimal::serde::str")]
    pub minimum_order_size: Decimal,
    /// Read from a decimal string.
    #[serde(with = "rust_decimal::serde::str")]
    pub minimum_tick_size: Decimal,
}
253
/// Token entry of a raw market payload.
#[derive(Debug, Deserialize)]
pub struct RawToken {
    pub token_id: String,
    pub outcome: String,
}
259
/// Conversion from a raw wire-format value into the domain type `T`.
pub trait Decoder<T> {
    /// Builds a `T` from `self` without consuming it, or reports why the raw
    /// value cannot be converted.
    fn decode(&self) -> Result<T>;
}
264
265impl Decoder<OrderBook> for RawOrderBookResponse {
266 fn decode(&self) -> Result<OrderBook> {
267 let timestamp = chrono::DateTime::from_timestamp(self.timestamp as i64, 0)
268 .ok_or_else(|| PolyfillError::parse("Invalid timestamp".to_string(), None))?;
269
270 let bids = self
271 .bids
272 .iter()
273 .map(|level| BookLevel {
274 price: level.price,
275 size: level.size,
276 })
277 .collect();
278
279 let asks = self
280 .asks
281 .iter()
282 .map(|level| BookLevel {
283 price: level.price,
284 size: level.size,
285 })
286 .collect();
287
288 Ok(OrderBook {
289 token_id: self.asset_id.clone(),
290 timestamp,
291 bids,
292 asks,
293 sequence: 0, })
295 }
296}
297
298impl Decoder<Order> for RawOrderResponse {
299 fn decode(&self) -> Result<Order> {
300 let status = match self.status.as_str() {
301 "LIVE" => OrderStatus::Live,
302 "CANCELLED" => OrderStatus::Cancelled,
303 "FILLED" => OrderStatus::Filled,
304 "PARTIAL" => OrderStatus::Partial,
305 "EXPIRED" => OrderStatus::Expired,
306 _ => {
307 return Err(PolyfillError::parse(
308 format!("Unknown order status: {}", self.status),
309 None,
310 ))
311 },
312 };
313
314 let created_at =
315 chrono::DateTime::from_timestamp(self.created_at as i64, 0).ok_or_else(|| {
316 PolyfillError::parse("Invalid created_at timestamp".to_string(), None)
317 })?;
318
319 let expiration = if self.expiration > 0 {
320 Some(
321 chrono::DateTime::from_timestamp(self.expiration as i64, 0).ok_or_else(|| {
322 PolyfillError::parse("Invalid expiration timestamp".to_string(), None)
323 })?,
324 )
325 } else {
326 None
327 };
328
329 Ok(Order {
330 id: self.id.clone(),
331 token_id: self.asset_id.clone(),
332 side: self.side,
333 price: self.price,
334 original_size: self.original_size,
335 filled_size: self.size_matched,
336 remaining_size: self.original_size - self.size_matched,
337 status,
338 order_type: self.order_type,
339 created_at,
340 updated_at: created_at, expiration,
342 client_id: None,
343 })
344 }
345}
346
347impl Decoder<FillEvent> for RawTradeResponse {
348 fn decode(&self) -> Result<FillEvent> {
349 let timestamp = chrono::DateTime::from_timestamp(self.timestamp as i64, 0)
350 .ok_or_else(|| PolyfillError::parse("Invalid trade timestamp".to_string(), None))?;
351
352 let maker_address = Address::from_str(&self.maker_address)
353 .map_err(|e| PolyfillError::parse(format!("Invalid maker address: {}", e), None))?;
354
355 let taker_address = Address::from_str(&self.taker_address)
356 .map_err(|e| PolyfillError::parse(format!("Invalid taker address: {}", e), None))?;
357
358 Ok(FillEvent {
359 id: self.id.clone(),
360 order_id: "".to_string(), token_id: self.asset_id.clone(),
362 side: self.side,
363 price: self.price,
364 size: self.size,
365 timestamp,
366 maker_address,
367 taker_address,
368 fee: Decimal::ZERO, })
370 }
371}
372
373impl Decoder<Market> for RawMarketResponse {
374 fn decode(&self) -> Result<Market> {
375 let tokens = [
376 Token {
377 token_id: self.tokens[0].token_id.clone(),
378 outcome: self.tokens[0].outcome.clone(),
379 price: Decimal::ZERO,
380 winner: false,
381 },
382 Token {
383 token_id: self.tokens[1].token_id.clone(),
384 outcome: self.tokens[1].outcome.clone(),
385 price: Decimal::ZERO,
386 winner: false,
387 },
388 ];
389
390 Ok(Market {
391 condition_id: self.condition_id.clone(),
392 tokens,
393 rewards: crate::types::Rewards {
394 rates: None,
395 min_size: Decimal::ZERO,
396 max_spread: Decimal::ONE,
397 event_start_date: None,
398 event_end_date: None,
399 in_game_multiplier: None,
400 reward_epoch: None,
401 },
402 min_incentive_size: None,
403 max_incentive_spread: None,
404 active: self.active,
405 closed: self.closed,
406 question_id: self.condition_id.clone(), minimum_order_size: self.minimum_order_size,
408 minimum_tick_size: self.minimum_tick_size,
409 description: self.description.clone(),
410 category: self.category.clone(),
411 end_date_iso: self.end_date_iso.clone(),
412 game_start_time: None,
413 question: self.question.clone(),
414 market_slug: format!("market-{}", self.condition_id), seconds_delay: Decimal::ZERO,
416 icon: String::new(),
417 fpmm: String::new(),
418 enable_order_book: false,
420 archived: false,
421 accepting_orders: false,
422 accepting_order_timestamp: None,
423 maker_base_fee: Decimal::ZERO,
424 taker_base_fee: Decimal::ZERO,
425 notifications_enabled: false,
426 neg_risk: false,
427 neg_risk_market_id: String::new(),
428 neg_risk_request_id: String::new(),
429 image: String::new(),
430 is_50_50_outcome: false,
431 })
432 }
433}
434
435pub fn parse_stream_messages(raw: &str) -> Result<Vec<StreamMessage>> {
442 parse_stream_messages_bytes(raw.as_bytes())
443}
444
445pub fn parse_stream_messages_bytes(bytes: &[u8]) -> Result<Vec<StreamMessage>> {
447 let value: Value = serde_json::from_slice(bytes)?;
448
449 match value {
450 Value::Object(map) => {
451 let event_type = map.get("event_type").and_then(Value::as_str);
452 match event_type {
453 None => Ok(vec![]),
454 Some(_) => {
455 let msg: StreamMessage = serde_json::from_value(Value::Object(map))?;
456 match msg {
457 StreamMessage::Unknown => Ok(vec![]),
458 other => Ok(vec![other]),
459 }
460 },
461 }
462 },
463 Value::Array(arr) => Ok(arr
464 .into_iter()
465 .filter_map(|elem| {
466 let Value::Object(map) = elem else {
467 return None;
468 };
469
470 let event_type = map.get("event_type").and_then(Value::as_str)?;
471 match event_type {
473 "book" | "price_change" | "tick_size_change" | "last_trade_price"
474 | "best_bid_ask" | "new_market" | "market_resolved" | "trade" | "order" => {},
475 _ => return None,
476 }
477
478 match serde_json::from_value::<StreamMessage>(Value::Object(map)) {
479 Ok(StreamMessage::Unknown) => None,
480 Ok(msg) => Some(msg),
481 Err(_) => None,
482 }
483 })
484 .collect()),
485 _ => Ok(vec![]),
486 }
487}
488
/// Incremental decoder for a byte stream of concatenated JSON objects.
pub struct BatchDecoder {
    /// Bytes received so far that have not yet been consumed as a complete
    /// object.
    buffer: Vec<u8>,
}
493
494impl BatchDecoder {
495 pub fn new() -> Self {
496 Self {
497 buffer: Vec::with_capacity(8192),
498 }
499 }
500
501 pub fn parse_json_stream<T>(&mut self, data: &[u8]) -> Result<Vec<T>>
503 where
504 T: for<'de> serde::Deserialize<'de>,
505 {
506 self.buffer.extend_from_slice(data);
507 let mut results = Vec::new();
508 let mut start = 0;
509
510 while let Some(end) = self.find_json_boundary(start) {
511 let json_slice = &self.buffer[start..end];
512 if let Ok(obj) = serde_json::from_slice::<T>(json_slice) {
513 results.push(obj);
514 }
515 start = end;
516 }
517
518 if start > 0 {
520 self.buffer.drain(0..start);
521 }
522
523 Ok(results)
524 }
525
526 fn find_json_boundary(&self, start: usize) -> Option<usize> {
528 let mut depth = 0;
529 let mut in_string = false;
530 let mut escaped = false;
531
532 for (i, &byte) in self.buffer[start..].iter().enumerate() {
533 if escaped {
534 escaped = false;
535 continue;
536 }
537
538 match byte {
539 b'\\' if in_string => escaped = true,
540 b'"' => in_string = !in_string,
541 b'{' if !in_string => depth += 1,
542 b'}' if !in_string => {
543 depth -= 1;
544 if depth == 0 {
545 return Some(start + i + 1);
546 }
547 },
548 _ => {},
549 }
550 }
551
552 None
553 }
554}
555
556impl Default for BatchDecoder {
557 fn default() -> Self {
558 Self::new()
559 }
560}
561
562pub mod fast_parse {
564 use super::*;
565
566 #[inline]
568 pub fn parse_decimal(s: &str) -> Result<Decimal> {
569 Decimal::from_str(s)
570 .map_err(|e| PolyfillError::parse(format!("Invalid decimal: {}", e), None))
571 }
572
573 #[inline]
575 pub fn parse_address(s: &str) -> Result<Address> {
576 Address::from_str(s)
577 .map_err(|e| PolyfillError::parse(format!("Invalid address: {}", e), None))
578 }
579
580 #[inline]
584 pub fn parse_json_fast<T>(bytes: &mut [u8]) -> Result<T>
585 where
586 T: for<'de> serde::Deserialize<'de>,
587 {
588 match simd_json::serde::from_slice(bytes) {
590 Ok(val) => Ok(val),
591 Err(_) => {
592 serde_json::from_slice(bytes)
594 .map_err(|e| PolyfillError::parse(format!("JSON parse error: {}", e), None))
595 },
596 }
597 }
598
599 #[inline]
601 pub fn parse_json_fast_owned<T>(bytes: &[u8]) -> Result<T>
602 where
603 T: for<'de> serde::Deserialize<'de>,
604 {
605 let mut data = bytes.to_vec();
607 parse_json_fast(&mut data)
608 }
609
610 #[inline]
612 pub fn parse_u256(s: &str) -> Result<U256> {
613 U256::from_str_radix(s, 10)
614 .map_err(|e| PolyfillError::parse(format!("Invalid U256: {}", e), None))
615 }
616
617 #[inline]
619 pub fn parse_side(s: &str) -> Result<Side> {
620 match s.to_uppercase().as_str() {
621 "BUY" => Ok(Side::BUY),
622 "SELL" => Ok(Side::SELL),
623 _ => Err(PolyfillError::parse(format!("Invalid side: {}", s), None)),
624 }
625 }
626}
627
#[cfg(test)]
mod tests {
    use super::*;

    /// A plain decimal literal parses to the same value as `Decimal::from_str`.
    #[test]
    fn test_parse_decimal() {
        let expected = Decimal::from_str("123.456").unwrap();
        let parsed = fast_parse::parse_decimal("123.456").unwrap();
        assert_eq!(parsed, expected);
    }

    /// Sides are accepted in upper and lower case; other text is rejected.
    #[test]
    fn test_parse_side() {
        for (input, expected) in [("BUY", Side::BUY), ("sell", Side::SELL)] {
            assert_eq!(fast_parse::parse_side(input).unwrap(), expected);
        }
        assert!(fast_parse::parse_side("invalid").is_err());
    }

    /// Two back-to-back objects in one chunk are both decoded.
    #[test]
    fn test_batch_decoder() {
        let payload = br#"{"test":1}{"test":2}"#;
        let mut decoder = BatchDecoder::new();

        let decoded: Vec<serde_json::Value> = decoder.parse_json_stream(payload).unwrap();
        assert_eq!(decoded.len(), 2);
    }
}
653}