1use crate::errors::{PolyfillError, Result};
7use crate::types::*;
8use alloy_primitives::{Address, U256};
9use chrono::{DateTime, Utc};
10use rust_decimal::Decimal;
11use serde::{Deserialize, Deserializer};
12use serde_json::Value;
13use std::str::FromStr;
14
15pub mod deserializers {
17 use super::*;
18 use std::fmt::Display;
19
20 pub fn number_from_string<'de, T, D>(deserializer: D) -> std::result::Result<T, D::Error>
22 where
23 D: Deserializer<'de>,
24 T: FromStr + serde::Deserialize<'de> + Clone,
25 <T as FromStr>::Err: Display,
26 {
27 let value = serde_json::Value::deserialize(deserializer)?;
28 match value {
29 serde_json::Value::Number(n) => {
30 if let Some(v) = n.as_u64() {
31 T::deserialize(serde_json::Value::Number(serde_json::Number::from(v)))
32 .map_err(|_| serde::de::Error::custom("Failed to deserialize number"))
33 } else if let Some(v) = n.as_f64() {
34 T::deserialize(serde_json::Value::Number(serde_json::Number::from_f64(v).unwrap()))
35 .map_err(|_| serde::de::Error::custom("Failed to deserialize number"))
36 } else {
37 Err(serde::de::Error::custom("Invalid number format"))
38 }
39 }
40 serde_json::Value::String(s) => {
41 s.parse::<T>().map_err(serde::de::Error::custom)
42 }
43 _ => Err(serde::de::Error::custom("Expected number or string")),
44 }
45 }
46
47 pub fn optional_number_from_string<'de, T, D>(
49 deserializer: D,
50 ) -> std::result::Result<Option<T>, D::Error>
51 where
52 D: Deserializer<'de>,
53 T: FromStr + serde::Deserialize<'de> + Clone,
54 <T as FromStr>::Err: Display,
55 {
56 let value = serde_json::Value::deserialize(deserializer)?;
57 match value {
58 serde_json::Value::Null => Ok(None),
59 serde_json::Value::Number(n) => {
60 if let Some(v) = n.as_u64() {
61 T::deserialize(serde_json::Value::Number(serde_json::Number::from(v)))
62 .map(Some)
63 .map_err(|_| serde::de::Error::custom("Failed to deserialize number"))
64 } else if let Some(v) = n.as_f64() {
65 T::deserialize(serde_json::Value::Number(serde_json::Number::from_f64(v).unwrap()))
66 .map(Some)
67 .map_err(|_| serde::de::Error::custom("Failed to deserialize number"))
68 } else {
69 Err(serde::de::Error::custom("Invalid number format"))
70 }
71 }
72 serde_json::Value::String(s) => {
73 if s.is_empty() {
74 Ok(None)
75 } else {
76 s.parse::<T>()
77 .map(Some)
78 .map_err(serde::de::Error::custom)
79 }
80 }
81 _ => Err(serde::de::Error::custom("Expected number, string, or null")),
82 }
83 }
84
85 pub fn datetime_from_timestamp<'de, D>(deserializer: D) -> std::result::Result<DateTime<Utc>, D::Error>
87 where
88 D: Deserializer<'de>,
89 {
90 let timestamp = number_from_string::<u64, D>(deserializer)?;
91 DateTime::from_timestamp(timestamp as i64, 0)
92 .ok_or_else(|| serde::de::Error::custom("Invalid timestamp"))
93 }
94
95 pub fn optional_datetime_from_timestamp<'de, D>(
97 deserializer: D,
98 ) -> std::result::Result<Option<DateTime<Utc>>, D::Error>
99 where
100 D: Deserializer<'de>,
101 {
102 match optional_number_from_string::<u64, D>(deserializer)? {
103 Some(timestamp) => DateTime::from_timestamp(timestamp as i64, 0)
104 .map(Some)
105 .ok_or_else(|| serde::de::Error::custom("Invalid timestamp")),
106 None => Ok(None),
107 }
108 }
109}
110
111#[derive(Debug, Deserialize)]
113pub struct RawOrderBookResponse {
114 pub market: String,
115 pub asset_id: String,
116 pub hash: String,
117 #[serde(deserialize_with = "deserializers::number_from_string")]
118 pub timestamp: u64,
119 pub bids: Vec<RawBookLevel>,
120 pub asks: Vec<RawBookLevel>,
121}
122
123#[derive(Debug, Deserialize)]
124pub struct RawBookLevel {
125 #[serde(with = "rust_decimal::serde::str")]
126 pub price: Decimal,
127 #[serde(with = "rust_decimal::serde::str")]
128 pub size: Decimal,
129}
130
131#[derive(Debug, Deserialize)]
132pub struct RawOrderResponse {
133 pub id: String,
134 pub status: String,
135 pub market: String,
136 pub asset_id: String,
137 pub maker_address: String,
138 pub owner: String,
139 pub outcome: String,
140 #[serde(rename = "type")]
141 pub order_type: OrderType,
142 pub side: Side,
143 #[serde(with = "rust_decimal::serde::str")]
144 pub original_size: Decimal,
145 #[serde(with = "rust_decimal::serde::str")]
146 pub price: Decimal,
147 #[serde(with = "rust_decimal::serde::str")]
148 pub size_matched: Decimal,
149 #[serde(deserialize_with = "deserializers::number_from_string")]
150 pub expiration: u64,
151 #[serde(deserialize_with = "deserializers::number_from_string")]
152 pub created_at: u64,
153}
154
155#[derive(Debug, Deserialize)]
156pub struct RawTradeResponse {
157 pub id: String,
158 pub market: String,
159 pub asset_id: String,
160 pub side: Side,
161 #[serde(with = "rust_decimal::serde::str")]
162 pub price: Decimal,
163 #[serde(with = "rust_decimal::serde::str")]
164 pub size: Decimal,
165 pub maker_address: String,
166 pub taker_address: String,
167 #[serde(deserialize_with = "deserializers::number_from_string")]
168 pub timestamp: u64,
169}
170
171#[derive(Debug, Deserialize)]
172pub struct RawMarketResponse {
173 pub condition_id: String,
174 pub tokens: [RawToken; 2],
175 pub active: bool,
176 pub closed: bool,
177 pub question: String,
178 pub description: String,
179 pub category: Option<String>,
180 pub end_date_iso: Option<String>,
181 #[serde(with = "rust_decimal::serde::str")]
182 pub minimum_order_size: Decimal,
183 #[serde(with = "rust_decimal::serde::str")]
184 pub minimum_tick_size: Decimal,
185}
186
187#[derive(Debug, Deserialize)]
188pub struct RawToken {
189 pub token_id: String,
190 pub outcome: String,
191}
192
193pub trait Decoder<T> {
195 fn decode(&self) -> Result<T>;
196}
197
198impl Decoder<OrderBook> for RawOrderBookResponse {
199 fn decode(&self) -> Result<OrderBook> {
200 let timestamp = chrono::DateTime::from_timestamp(self.timestamp as i64, 0)
201 .ok_or_else(|| PolyfillError::parse("Invalid timestamp".to_string(), None))?;
202
203 let bids = self
204 .bids
205 .iter()
206 .map(|level| BookLevel {
207 price: level.price,
208 size: level.size,
209 })
210 .collect();
211
212 let asks = self
213 .asks
214 .iter()
215 .map(|level| BookLevel {
216 price: level.price,
217 size: level.size,
218 })
219 .collect();
220
221 Ok(OrderBook {
222 token_id: self.asset_id.clone(),
223 timestamp,
224 bids,
225 asks,
226 sequence: 0, })
228 }
229}
230
231impl Decoder<Order> for RawOrderResponse {
232 fn decode(&self) -> Result<Order> {
233 let status = match self.status.as_str() {
234 "LIVE" => OrderStatus::Live,
235 "CANCELLED" => OrderStatus::Cancelled,
236 "FILLED" => OrderStatus::Filled,
237 "PARTIAL" => OrderStatus::Partial,
238 "EXPIRED" => OrderStatus::Expired,
239 _ => return Err(PolyfillError::parse(
240 format!("Unknown order status: {}", self.status),
241 None,
242 )),
243 };
244
245 let created_at = chrono::DateTime::from_timestamp(self.created_at as i64, 0)
246 .ok_or_else(|| PolyfillError::parse("Invalid created_at timestamp".to_string(), None))?;
247
248 let expiration = if self.expiration > 0 {
249 Some(chrono::DateTime::from_timestamp(self.expiration as i64, 0)
250 .ok_or_else(|| PolyfillError::parse("Invalid expiration timestamp".to_string(), None))?)
251 } else {
252 None
253 };
254
255 Ok(Order {
256 id: self.id.clone(),
257 token_id: self.asset_id.clone(),
258 side: self.side,
259 price: self.price,
260 original_size: self.original_size,
261 filled_size: self.size_matched,
262 remaining_size: self.original_size - self.size_matched,
263 status,
264 order_type: self.order_type,
265 created_at,
266 updated_at: created_at, expiration,
268 client_id: None,
269 })
270 }
271}
272
273impl Decoder<FillEvent> for RawTradeResponse {
274 fn decode(&self) -> Result<FillEvent> {
275 let timestamp = chrono::DateTime::from_timestamp(self.timestamp as i64, 0)
276 .ok_or_else(|| PolyfillError::parse("Invalid trade timestamp".to_string(), None))?;
277
278 let maker_address = Address::from_str(&self.maker_address)
279 .map_err(|e| PolyfillError::parse(format!("Invalid maker address: {}", e), None))?;
280
281 let taker_address = Address::from_str(&self.taker_address)
282 .map_err(|e| PolyfillError::parse(format!("Invalid taker address: {}", e), None))?;
283
284 Ok(FillEvent {
285 id: self.id.clone(),
286 order_id: "".to_string(), token_id: self.asset_id.clone(),
288 side: self.side,
289 price: self.price,
290 size: self.size,
291 timestamp,
292 maker_address,
293 taker_address,
294 fee: Decimal::ZERO, })
296 }
297}
298
299impl Decoder<Market> for RawMarketResponse {
300 fn decode(&self) -> Result<Market> {
301 let tokens = [
302 Token {
303 token_id: self.tokens[0].token_id.clone(),
304 outcome: self.tokens[0].outcome.clone(),
305 },
306 Token {
307 token_id: self.tokens[1].token_id.clone(),
308 outcome: self.tokens[1].outcome.clone(),
309 },
310 ];
311
312 Ok(Market {
313 condition_id: self.condition_id.clone(),
314 tokens,
315 rewards: crate::types::Rewards {
316 rates: None,
317 min_size: Decimal::ZERO,
318 max_spread: Decimal::ONE,
319 event_start_date: None,
320 event_end_date: None,
321 in_game_multiplier: None,
322 reward_epoch: None,
323 },
324 min_incentive_size: None,
325 max_incentive_spread: None,
326 active: self.active,
327 closed: self.closed,
328 question_id: self.condition_id.clone(), minimum_order_size: self.minimum_order_size,
330 minimum_tick_size: self.minimum_tick_size,
331 description: self.description.clone(),
332 category: self.category.clone(),
333 end_date_iso: self.end_date_iso.clone(),
334 game_start_time: None,
335 question: self.question.clone(),
336 market_slug: format!("market-{}", self.condition_id), seconds_delay: Decimal::ZERO,
338 icon: String::new(),
339 fpmm: String::new(),
340 })
341 }
342}
343
344pub fn parse_stream_message(raw: &str) -> Result<StreamMessage> {
346 let value: Value = serde_json::from_str(raw)?;
347
348 let msg_type = value["type"]
349 .as_str()
350 .ok_or_else(|| PolyfillError::parse("Missing message type".to_string(), None))?;
351
352 match msg_type {
353 "book_update" => {
354 let data = value["data"].clone();
355 let delta: OrderDelta = serde_json::from_value(data)?;
356 Ok(StreamMessage::BookUpdate { data: delta })
357 }
358 "trade" => {
359 let data = value["data"].clone();
360 let raw_trade: RawTradeResponse = serde_json::from_value(data)?;
361 let fill = raw_trade.decode()?;
362 Ok(StreamMessage::Trade { data: fill })
363 }
364 "order_update" => {
365 let data = value["data"].clone();
366 let raw_order: RawOrderResponse = serde_json::from_value(data)?;
367 let order = raw_order.decode()?;
368 Ok(StreamMessage::OrderUpdate { data: order })
369 }
370 "heartbeat" => {
371 let timestamp = value["timestamp"]
372 .as_str()
373 .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
374 .map(|dt| dt.with_timezone(&Utc))
375 .unwrap_or_else(|| Utc::now());
376 Ok(StreamMessage::Heartbeat { timestamp })
377 }
378 _ => Err(PolyfillError::parse(
379 format!("Unknown message type: {}", msg_type),
380 None,
381 )),
382 }
383}
384
385pub struct BatchDecoder {
387 buffer: Vec<u8>,
388}
389
390impl BatchDecoder {
391 pub fn new() -> Self {
392 Self {
393 buffer: Vec::with_capacity(8192),
394 }
395 }
396
397 pub fn parse_json_stream<T>(&mut self, data: &[u8]) -> Result<Vec<T>>
399 where
400 T: for<'de> serde::Deserialize<'de>,
401 {
402 self.buffer.extend_from_slice(data);
403 let mut results = Vec::new();
404 let mut start = 0;
405
406 while let Some(end) = self.find_json_boundary(start) {
407 let json_slice = &self.buffer[start..end];
408 if let Ok(obj) = serde_json::from_slice::<T>(json_slice) {
409 results.push(obj);
410 }
411 start = end;
412 }
413
414 if start > 0 {
416 self.buffer.drain(0..start);
417 }
418
419 Ok(results)
420 }
421
422 fn find_json_boundary(&self, start: usize) -> Option<usize> {
424 let mut depth = 0;
425 let mut in_string = false;
426 let mut escaped = false;
427
428 for (i, &byte) in self.buffer[start..].iter().enumerate() {
429 if escaped {
430 escaped = false;
431 continue;
432 }
433
434 match byte {
435 b'\\' if in_string => escaped = true,
436 b'"' => in_string = !in_string,
437 b'{' if !in_string => depth += 1,
438 b'}' if !in_string => {
439 depth -= 1;
440 if depth == 0 {
441 return Some(start + i + 1);
442 }
443 }
444 _ => {}
445 }
446 }
447
448 None
449 }
450}
451
452impl Default for BatchDecoder {
453 fn default() -> Self {
454 Self::new()
455 }
456}
457
458pub mod fast_parse {
460 use super::*;
461
462 #[inline]
464 pub fn parse_decimal(s: &str) -> Result<Decimal> {
465 Decimal::from_str(s)
466 .map_err(|e| PolyfillError::parse(format!("Invalid decimal: {}", e), None))
467 }
468
469 #[inline]
471 pub fn parse_address(s: &str) -> Result<Address> {
472 Address::from_str(s)
473 .map_err(|e| PolyfillError::parse(format!("Invalid address: {}", e), None))
474 }
475
476 #[inline]
478 pub fn parse_u256(s: &str) -> Result<U256> {
479 U256::from_str_radix(s, 10)
480 .map_err(|e| PolyfillError::parse(format!("Invalid U256: {}", e), None))
481 }
482
483 #[inline]
485 pub fn parse_side(s: &str) -> Result<Side> {
486 match s.to_uppercase().as_str() {
487 "BUY" => Ok(Side::BUY),
488 "SELL" => Ok(Side::SELL),
489 _ => Err(PolyfillError::parse(format!("Invalid side: {}", s), None)),
490 }
491 }
492}
493
494#[cfg(test)]
495mod tests {
496 use super::*;
497
498 #[test]
499 fn test_parse_decimal() {
500 let result = fast_parse::parse_decimal("123.456").unwrap();
501 assert_eq!(result, Decimal::from_str("123.456").unwrap());
502 }
503
504 #[test]
505 fn test_parse_side() {
506 assert_eq!(fast_parse::parse_side("BUY").unwrap(), Side::BUY);
507 assert_eq!(fast_parse::parse_side("sell").unwrap(), Side::SELL);
508 assert!(fast_parse::parse_side("invalid").is_err());
509 }
510
511 #[test]
512 fn test_batch_decoder() {
513 let mut decoder = BatchDecoder::new();
514 let data = r#"{"test":1}{"test":2}"#.as_bytes();
515
516 let results: Vec<serde_json::Value> = decoder.parse_json_stream(data).unwrap();
517 assert_eq!(results.len(), 2);
518 }
519}