1use crate::errors::{PolyfillError, Result};
7use crate::types::*;
8use alloy_primitives::{Address, U256};
9use chrono::{DateTime, Utc};
10use rust_decimal::Decimal;
11use serde::{Deserialize, Deserializer};
12use serde_json::Value;
13use std::str::FromStr;
14
15pub mod deserializers {
17 use super::*;
18 use std::fmt::Display;
19
20 pub fn number_from_string<'de, T, D>(deserializer: D) -> std::result::Result<T, D::Error>
22 where
23 D: Deserializer<'de>,
24 T: FromStr + serde::Deserialize<'de> + Clone,
25 <T as FromStr>::Err: Display,
26 {
27 let value = serde_json::Value::deserialize(deserializer)?;
28 match value {
29 serde_json::Value::Number(n) => {
30 if let Some(v) = n.as_u64() {
31 T::deserialize(serde_json::Value::Number(serde_json::Number::from(v)))
32 .map_err(|_| serde::de::Error::custom("Failed to deserialize number"))
33 } else if let Some(v) = n.as_f64() {
34 T::deserialize(serde_json::Value::Number(
35 serde_json::Number::from_f64(v).unwrap(),
36 ))
37 .map_err(|_| serde::de::Error::custom("Failed to deserialize number"))
38 } else {
39 Err(serde::de::Error::custom("Invalid number format"))
40 }
41 },
42 serde_json::Value::String(s) => s.parse::<T>().map_err(serde::de::Error::custom),
43 _ => Err(serde::de::Error::custom("Expected number or string")),
44 }
45 }
46
47 pub fn optional_number_from_string<'de, T, D>(
49 deserializer: D,
50 ) -> std::result::Result<Option<T>, D::Error>
51 where
52 D: Deserializer<'de>,
53 T: FromStr + serde::Deserialize<'de> + Clone,
54 <T as FromStr>::Err: Display,
55 {
56 let value = serde_json::Value::deserialize(deserializer)?;
57 match value {
58 serde_json::Value::Null => Ok(None),
59 serde_json::Value::Number(n) => {
60 if let Some(v) = n.as_u64() {
61 T::deserialize(serde_json::Value::Number(serde_json::Number::from(v)))
62 .map(Some)
63 .map_err(|_| serde::de::Error::custom("Failed to deserialize number"))
64 } else if let Some(v) = n.as_f64() {
65 T::deserialize(serde_json::Value::Number(
66 serde_json::Number::from_f64(v).unwrap(),
67 ))
68 .map(Some)
69 .map_err(|_| serde::de::Error::custom("Failed to deserialize number"))
70 } else {
71 Err(serde::de::Error::custom("Invalid number format"))
72 }
73 },
74 serde_json::Value::String(s) => {
75 if s.is_empty() {
76 Ok(None)
77 } else {
78 s.parse::<T>().map(Some).map_err(serde::de::Error::custom)
79 }
80 },
81 _ => Err(serde::de::Error::custom("Expected number, string, or null")),
82 }
83 }
84
85 pub fn datetime_from_timestamp<'de, D>(
87 deserializer: D,
88 ) -> std::result::Result<DateTime<Utc>, D::Error>
89 where
90 D: Deserializer<'de>,
91 {
92 let timestamp = number_from_string::<u64, D>(deserializer)?;
93 DateTime::from_timestamp(timestamp as i64, 0)
94 .ok_or_else(|| serde::de::Error::custom("Invalid timestamp"))
95 }
96
97 pub fn optional_datetime_from_timestamp<'de, D>(
99 deserializer: D,
100 ) -> std::result::Result<Option<DateTime<Utc>>, D::Error>
101 where
102 D: Deserializer<'de>,
103 {
104 match optional_number_from_string::<u64, D>(deserializer)? {
105 Some(timestamp) => DateTime::from_timestamp(timestamp as i64, 0)
106 .map(Some)
107 .ok_or_else(|| serde::de::Error::custom("Invalid timestamp")),
108 None => Ok(None),
109 }
110 }
111}
112
113#[derive(Debug, Deserialize)]
115pub struct RawOrderBookResponse {
116 pub market: String,
117 pub asset_id: String,
118 pub hash: String,
119 #[serde(deserialize_with = "deserializers::number_from_string")]
120 pub timestamp: u64,
121 pub bids: Vec<RawBookLevel>,
122 pub asks: Vec<RawBookLevel>,
123}
124
125#[derive(Debug, Deserialize)]
126pub struct RawBookLevel {
127 #[serde(with = "rust_decimal::serde::str")]
128 pub price: Decimal,
129 #[serde(with = "rust_decimal::serde::str")]
130 pub size: Decimal,
131}
132
133#[derive(Debug, Deserialize)]
134pub struct RawOrderResponse {
135 pub id: String,
136 pub status: String,
137 pub market: String,
138 pub asset_id: String,
139 pub maker_address: String,
140 pub owner: String,
141 pub outcome: String,
142 #[serde(rename = "type")]
143 pub order_type: OrderType,
144 pub side: Side,
145 #[serde(with = "rust_decimal::serde::str")]
146 pub original_size: Decimal,
147 #[serde(with = "rust_decimal::serde::str")]
148 pub price: Decimal,
149 #[serde(with = "rust_decimal::serde::str")]
150 pub size_matched: Decimal,
151 #[serde(deserialize_with = "deserializers::number_from_string")]
152 pub expiration: u64,
153 #[serde(deserialize_with = "deserializers::number_from_string")]
154 pub created_at: u64,
155}
156
157#[derive(Debug, Deserialize)]
158pub struct RawTradeResponse {
159 pub id: String,
160 pub market: String,
161 pub asset_id: String,
162 pub side: Side,
163 #[serde(with = "rust_decimal::serde::str")]
164 pub price: Decimal,
165 #[serde(with = "rust_decimal::serde::str")]
166 pub size: Decimal,
167 pub maker_address: String,
168 pub taker_address: String,
169 #[serde(deserialize_with = "deserializers::number_from_string")]
170 pub timestamp: u64,
171}
172
173#[derive(Debug, Deserialize)]
174pub struct RawMarketResponse {
175 pub condition_id: String,
176 pub tokens: [RawToken; 2],
177 pub active: bool,
178 pub closed: bool,
179 pub question: String,
180 pub description: String,
181 pub category: Option<String>,
182 pub end_date_iso: Option<String>,
183 #[serde(with = "rust_decimal::serde::str")]
184 pub minimum_order_size: Decimal,
185 #[serde(with = "rust_decimal::serde::str")]
186 pub minimum_tick_size: Decimal,
187}
188
189#[derive(Debug, Deserialize)]
190pub struct RawToken {
191 pub token_id: String,
192 pub outcome: String,
193}
194
195pub trait Decoder<T> {
197 fn decode(&self) -> Result<T>;
198}
199
200impl Decoder<OrderBook> for RawOrderBookResponse {
201 fn decode(&self) -> Result<OrderBook> {
202 let timestamp = chrono::DateTime::from_timestamp(self.timestamp as i64, 0)
203 .ok_or_else(|| PolyfillError::parse("Invalid timestamp".to_string(), None))?;
204
205 let bids = self
206 .bids
207 .iter()
208 .map(|level| BookLevel {
209 price: level.price,
210 size: level.size,
211 })
212 .collect();
213
214 let asks = self
215 .asks
216 .iter()
217 .map(|level| BookLevel {
218 price: level.price,
219 size: level.size,
220 })
221 .collect();
222
223 Ok(OrderBook {
224 token_id: self.asset_id.clone(),
225 timestamp,
226 bids,
227 asks,
228 sequence: 0, })
230 }
231}
232
233impl Decoder<Order> for RawOrderResponse {
234 fn decode(&self) -> Result<Order> {
235 let status = match self.status.as_str() {
236 "LIVE" => OrderStatus::Live,
237 "CANCELLED" => OrderStatus::Cancelled,
238 "FILLED" => OrderStatus::Filled,
239 "PARTIAL" => OrderStatus::Partial,
240 "EXPIRED" => OrderStatus::Expired,
241 _ => {
242 return Err(PolyfillError::parse(
243 format!("Unknown order status: {}", self.status),
244 None,
245 ))
246 },
247 };
248
249 let created_at =
250 chrono::DateTime::from_timestamp(self.created_at as i64, 0).ok_or_else(|| {
251 PolyfillError::parse("Invalid created_at timestamp".to_string(), None)
252 })?;
253
254 let expiration = if self.expiration > 0 {
255 Some(
256 chrono::DateTime::from_timestamp(self.expiration as i64, 0).ok_or_else(|| {
257 PolyfillError::parse("Invalid expiration timestamp".to_string(), None)
258 })?,
259 )
260 } else {
261 None
262 };
263
264 Ok(Order {
265 id: self.id.clone(),
266 token_id: self.asset_id.clone(),
267 side: self.side,
268 price: self.price,
269 original_size: self.original_size,
270 filled_size: self.size_matched,
271 remaining_size: self.original_size - self.size_matched,
272 status,
273 order_type: self.order_type,
274 created_at,
275 updated_at: created_at, expiration,
277 client_id: None,
278 })
279 }
280}
281
282impl Decoder<FillEvent> for RawTradeResponse {
283 fn decode(&self) -> Result<FillEvent> {
284 let timestamp = chrono::DateTime::from_timestamp(self.timestamp as i64, 0)
285 .ok_or_else(|| PolyfillError::parse("Invalid trade timestamp".to_string(), None))?;
286
287 let maker_address = Address::from_str(&self.maker_address)
288 .map_err(|e| PolyfillError::parse(format!("Invalid maker address: {}", e), None))?;
289
290 let taker_address = Address::from_str(&self.taker_address)
291 .map_err(|e| PolyfillError::parse(format!("Invalid taker address: {}", e), None))?;
292
293 Ok(FillEvent {
294 id: self.id.clone(),
295 order_id: "".to_string(), token_id: self.asset_id.clone(),
297 side: self.side,
298 price: self.price,
299 size: self.size,
300 timestamp,
301 maker_address,
302 taker_address,
303 fee: Decimal::ZERO, })
305 }
306}
307
308impl Decoder<Market> for RawMarketResponse {
309 fn decode(&self) -> Result<Market> {
310 let tokens = [
311 Token {
312 token_id: self.tokens[0].token_id.clone(),
313 outcome: self.tokens[0].outcome.clone(),
314 },
315 Token {
316 token_id: self.tokens[1].token_id.clone(),
317 outcome: self.tokens[1].outcome.clone(),
318 },
319 ];
320
321 Ok(Market {
322 condition_id: self.condition_id.clone(),
323 tokens,
324 rewards: crate::types::Rewards {
325 rates: None,
326 min_size: Decimal::ZERO,
327 max_spread: Decimal::ONE,
328 event_start_date: None,
329 event_end_date: None,
330 in_game_multiplier: None,
331 reward_epoch: None,
332 },
333 min_incentive_size: None,
334 max_incentive_spread: None,
335 active: self.active,
336 closed: self.closed,
337 question_id: self.condition_id.clone(), minimum_order_size: self.minimum_order_size,
339 minimum_tick_size: self.minimum_tick_size,
340 description: self.description.clone(),
341 category: self.category.clone(),
342 end_date_iso: self.end_date_iso.clone(),
343 game_start_time: None,
344 question: self.question.clone(),
345 market_slug: format!("market-{}", self.condition_id), seconds_delay: Decimal::ZERO,
347 icon: String::new(),
348 fpmm: String::new(),
349 })
350 }
351}
352
353pub fn parse_stream_message(raw: &str) -> Result<StreamMessage> {
355 let value: Value = serde_json::from_str(raw)?;
356
357 let msg_type = value["type"]
358 .as_str()
359 .ok_or_else(|| PolyfillError::parse("Missing message type".to_string(), None))?;
360
361 match msg_type {
362 "book_update" => {
363 let data = value["data"].clone();
364 let delta: OrderDelta = serde_json::from_value(data)?;
365 Ok(StreamMessage::BookUpdate { data: delta })
366 },
367 "trade" => {
368 let data = value["data"].clone();
369 let raw_trade: RawTradeResponse = serde_json::from_value(data)?;
370 let fill = raw_trade.decode()?;
371 Ok(StreamMessage::Trade { data: fill })
372 },
373 "order_update" => {
374 let data = value["data"].clone();
375 let raw_order: RawOrderResponse = serde_json::from_value(data)?;
376 let order = raw_order.decode()?;
377 Ok(StreamMessage::OrderUpdate { data: order })
378 },
379 "heartbeat" => {
380 let timestamp = value["timestamp"]
381 .as_str()
382 .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
383 .map(|dt| dt.with_timezone(&Utc))
384 .unwrap_or_else(Utc::now);
385 Ok(StreamMessage::Heartbeat { timestamp })
386 },
387 _ => Err(PolyfillError::parse(
388 format!("Unknown message type: {}", msg_type),
389 None,
390 )),
391 }
392}
393
394pub struct BatchDecoder {
396 buffer: Vec<u8>,
397}
398
399impl BatchDecoder {
400 pub fn new() -> Self {
401 Self {
402 buffer: Vec::with_capacity(8192),
403 }
404 }
405
406 pub fn parse_json_stream<T>(&mut self, data: &[u8]) -> Result<Vec<T>>
408 where
409 T: for<'de> serde::Deserialize<'de>,
410 {
411 self.buffer.extend_from_slice(data);
412 let mut results = Vec::new();
413 let mut start = 0;
414
415 while let Some(end) = self.find_json_boundary(start) {
416 let json_slice = &self.buffer[start..end];
417 if let Ok(obj) = serde_json::from_slice::<T>(json_slice) {
418 results.push(obj);
419 }
420 start = end;
421 }
422
423 if start > 0 {
425 self.buffer.drain(0..start);
426 }
427
428 Ok(results)
429 }
430
431 fn find_json_boundary(&self, start: usize) -> Option<usize> {
433 let mut depth = 0;
434 let mut in_string = false;
435 let mut escaped = false;
436
437 for (i, &byte) in self.buffer[start..].iter().enumerate() {
438 if escaped {
439 escaped = false;
440 continue;
441 }
442
443 match byte {
444 b'\\' if in_string => escaped = true,
445 b'"' => in_string = !in_string,
446 b'{' if !in_string => depth += 1,
447 b'}' if !in_string => {
448 depth -= 1;
449 if depth == 0 {
450 return Some(start + i + 1);
451 }
452 },
453 _ => {},
454 }
455 }
456
457 None
458 }
459}
460
461impl Default for BatchDecoder {
462 fn default() -> Self {
463 Self::new()
464 }
465}
466
467pub mod fast_parse {
469 use super::*;
470
471 #[inline]
473 pub fn parse_decimal(s: &str) -> Result<Decimal> {
474 Decimal::from_str(s)
475 .map_err(|e| PolyfillError::parse(format!("Invalid decimal: {}", e), None))
476 }
477
478 #[inline]
480 pub fn parse_address(s: &str) -> Result<Address> {
481 Address::from_str(s)
482 .map_err(|e| PolyfillError::parse(format!("Invalid address: {}", e), None))
483 }
484
485 #[inline]
487 pub fn parse_u256(s: &str) -> Result<U256> {
488 U256::from_str_radix(s, 10)
489 .map_err(|e| PolyfillError::parse(format!("Invalid U256: {}", e), None))
490 }
491
492 #[inline]
494 pub fn parse_side(s: &str) -> Result<Side> {
495 match s.to_uppercase().as_str() {
496 "BUY" => Ok(Side::BUY),
497 "SELL" => Ok(Side::SELL),
498 _ => Err(PolyfillError::parse(format!("Invalid side: {}", s), None)),
499 }
500 }
501}
502
503#[cfg(test)]
504mod tests {
505 use super::*;
506
507 #[test]
508 fn test_parse_decimal() {
509 let result = fast_parse::parse_decimal("123.456").unwrap();
510 assert_eq!(result, Decimal::from_str("123.456").unwrap());
511 }
512
513 #[test]
514 fn test_parse_side() {
515 assert_eq!(fast_parse::parse_side("BUY").unwrap(), Side::BUY);
516 assert_eq!(fast_parse::parse_side("sell").unwrap(), Side::SELL);
517 assert!(fast_parse::parse_side("invalid").is_err());
518 }
519
520 #[test]
521 fn test_batch_decoder() {
522 let mut decoder = BatchDecoder::new();
523 let data = r#"{"test":1}{"test":2}"#.as_bytes();
524
525 let results: Vec<serde_json::Value> = decoder.parse_json_stream(data).unwrap();
526 assert_eq!(results.len(), 2);
527 }
528}