1use crate::{
10 errors::{MarketDataError, Result},
11 types::{Bar, Quote, Trade},
12 websocket::{WebSocketClient, WebSocketStream},
13 {HealthStatus, MarketDataProvider, QuoteStream, TradeStream},
14};
15use async_trait::async_trait;
16use chrono::{DateTime, Utc};
17use dashmap::DashMap;
18use futures::{Stream, StreamExt};
19use governor::{Quota, RateLimiter};
20use parking_lot::RwLock;
21use rust_decimal::Decimal;
22use serde::{Deserialize, Serialize};
23use std::{
24 sync::Arc,
25 time::Duration,
26};
27use tokio::sync::broadcast;
28use tokio_tungstenite::tungstenite::Message;
29use tracing::{debug, error, info, warn};
30
31const POLYGON_WS_URL: &str = "wss://socket.polygon.io";
32const POLYGON_REST_URL: &str = "https://api.polygon.io";
33const MAX_RECONNECT_DELAY: Duration = Duration::from_secs(60);
34const INITIAL_RECONNECT_DELAY: Duration = Duration::from_secs(1);
35const CHANNEL_BUFFER_SIZE: usize = 10000;
36const RATE_LIMIT_PER_MINUTE: u32 = 1000;
37
38#[derive(Debug, Clone)]
41pub enum PolygonEvent {
42 Status {
44 status: String,
45 message: String,
46 },
47
48 Trade {
50 symbol: String,
51 timestamp: i64,
52 price: f64,
53 size: u64,
54 conditions: Vec<i32>,
55 exchange: u8,
56 },
57
58 Quote {
60 symbol: String,
61 timestamp: i64,
62 bid_price: f64,
63 ask_price: f64,
64 bid_size: u64,
65 ask_size: u64,
66 bid_exchange: u8,
67 ask_exchange: u8,
68 },
69
70 AggregateBar {
72 symbol: String,
73 start_timestamp: i64,
74 end_timestamp: i64,
75 open: f64,
76 high: f64,
77 low: f64,
78 close: f64,
79 volume: u64,
80 vwap: f64,
81 },
82
83 Level2 {
85 symbol: String,
86 timestamp: i64,
87 bids: Vec<PriceLevel>,
88 asks: Vec<PriceLevel>,
89 },
90}
91
92impl<'de> Deserialize<'de> for PolygonEvent {
94 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
95 where
96 D: serde::Deserializer<'de>,
97 {
98 use serde::de::Error;
99
100 #[derive(Debug, Deserialize)]
101 struct RawEvent {
102 ev: String,
103 #[serde(default)]
104 status: Option<String>,
105 #[serde(default)]
106 message: Option<String>,
107 #[serde(default, rename = "sym")]
108 symbol: Option<String>,
109 #[serde(default, rename = "t")]
110 timestamp: Option<i64>,
111 #[serde(default, rename = "p")]
112 price: Option<f64>,
113 #[serde(default, rename = "s")]
114 size_or_start: Option<i64>,
115 #[serde(default, rename = "c")]
116 conditions_or_close: Option<serde_json::Value>,
117 #[serde(default, rename = "x")]
118 exchange: Option<u8>,
119 #[serde(default, rename = "bp")]
120 bid_price: Option<f64>,
121 #[serde(default, rename = "ap")]
122 ask_price: Option<f64>,
123 #[serde(default, rename = "bs")]
124 bid_size: Option<u64>,
125 #[serde(default, rename = "as")]
126 ask_size: Option<u64>,
127 #[serde(default, rename = "bx")]
128 bid_exchange: Option<u8>,
129 #[serde(default, rename = "ax")]
130 ask_exchange: Option<u8>,
131 #[serde(default, rename = "e")]
132 end_timestamp: Option<i64>,
133 #[serde(default, rename = "o")]
134 open: Option<f64>,
135 #[serde(default, rename = "h")]
136 high: Option<f64>,
137 #[serde(default, rename = "l")]
138 low: Option<f64>,
139 #[serde(default, rename = "v")]
140 volume: Option<u64>,
141 #[serde(default, rename = "vw")]
142 vwap: Option<f64>,
143 #[serde(default, rename = "b")]
144 bids: Option<Vec<PriceLevel>>,
145 #[serde(default, rename = "a")]
146 asks: Option<Vec<PriceLevel>>,
147 }
148
149 let raw = RawEvent::deserialize(deserializer)?;
150
151 match raw.ev.as_str() {
152 "status" => Ok(PolygonEvent::Status {
153 status: raw.status.ok_or_else(|| D::Error::missing_field("status"))?,
154 message: raw.message.ok_or_else(|| D::Error::missing_field("message"))?,
155 }),
156 "T" => Ok(PolygonEvent::Trade {
157 symbol: raw.symbol.ok_or_else(|| D::Error::missing_field("sym"))?,
158 timestamp: raw.timestamp.ok_or_else(|| D::Error::missing_field("t"))?,
159 price: raw.price.ok_or_else(|| D::Error::missing_field("p"))?,
160 size: raw.size_or_start.ok_or_else(|| D::Error::missing_field("s"))? as u64,
161 conditions: raw.conditions_or_close
162 .and_then(|v| serde_json::from_value(v).ok())
163 .unwrap_or_default(),
164 exchange: raw.exchange.ok_or_else(|| D::Error::missing_field("x"))?,
165 }),
166 "Q" => Ok(PolygonEvent::Quote {
167 symbol: raw.symbol.ok_or_else(|| D::Error::missing_field("sym"))?,
168 timestamp: raw.timestamp.ok_or_else(|| D::Error::missing_field("t"))?,
169 bid_price: raw.bid_price.ok_or_else(|| D::Error::missing_field("bp"))?,
170 ask_price: raw.ask_price.ok_or_else(|| D::Error::missing_field("ap"))?,
171 bid_size: raw.bid_size.ok_or_else(|| D::Error::missing_field("bs"))?,
172 ask_size: raw.ask_size.ok_or_else(|| D::Error::missing_field("as"))?,
173 bid_exchange: raw.bid_exchange.ok_or_else(|| D::Error::missing_field("bx"))?,
174 ask_exchange: raw.ask_exchange.ok_or_else(|| D::Error::missing_field("ax"))?,
175 }),
176 "AM" => Ok(PolygonEvent::AggregateBar {
177 symbol: raw.symbol.ok_or_else(|| D::Error::missing_field("sym"))?,
178 start_timestamp: raw.size_or_start.ok_or_else(|| D::Error::missing_field("s"))?,
179 end_timestamp: raw.end_timestamp.ok_or_else(|| D::Error::missing_field("e"))?,
180 open: raw.open.ok_or_else(|| D::Error::missing_field("o"))?,
181 high: raw.high.ok_or_else(|| D::Error::missing_field("h"))?,
182 low: raw.low.ok_or_else(|| D::Error::missing_field("l"))?,
183 close: raw.conditions_or_close
184 .and_then(|v| v.as_f64())
185 .ok_or_else(|| D::Error::missing_field("c"))?,
186 volume: raw.volume.ok_or_else(|| D::Error::missing_field("v"))?,
187 vwap: raw.vwap.ok_or_else(|| D::Error::missing_field("vw"))?,
188 }),
189 "L2" => Ok(PolygonEvent::Level2 {
190 symbol: raw.symbol.ok_or_else(|| D::Error::missing_field("sym"))?,
191 timestamp: raw.timestamp.ok_or_else(|| D::Error::missing_field("t"))?,
192 bids: raw.bids.unwrap_or_default(),
193 asks: raw.asks.unwrap_or_default(),
194 }),
195 other => Err(D::Error::unknown_variant(other, &["status", "T", "Q", "AM", "L2"])),
196 }
197 }
198}
199
200impl Serialize for PolygonEvent {
202 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
203 where
204 S: serde::Serializer,
205 {
206 use serde::ser::SerializeMap;
207
208 let mut map = serializer.serialize_map(None)?;
209
210 match self {
211 PolygonEvent::Status { status, message } => {
212 map.serialize_entry("ev", "status")?;
213 map.serialize_entry("status", status)?;
214 map.serialize_entry("message", message)?;
215 }
216 PolygonEvent::Trade {
217 symbol,
218 timestamp,
219 price,
220 size,
221 conditions,
222 exchange,
223 } => {
224 map.serialize_entry("ev", "T")?;
225 map.serialize_entry("sym", symbol)?;
226 map.serialize_entry("t", timestamp)?;
227 map.serialize_entry("p", price)?;
228 map.serialize_entry("s", size)?;
229 map.serialize_entry("c", conditions)?;
230 map.serialize_entry("x", exchange)?;
231 }
232 PolygonEvent::Quote {
233 symbol,
234 timestamp,
235 bid_price,
236 ask_price,
237 bid_size,
238 ask_size,
239 bid_exchange,
240 ask_exchange,
241 } => {
242 map.serialize_entry("ev", "Q")?;
243 map.serialize_entry("sym", symbol)?;
244 map.serialize_entry("t", timestamp)?;
245 map.serialize_entry("bp", bid_price)?;
246 map.serialize_entry("ap", ask_price)?;
247 map.serialize_entry("bs", bid_size)?;
248 map.serialize_entry("as", ask_size)?;
249 map.serialize_entry("bx", bid_exchange)?;
250 map.serialize_entry("ax", ask_exchange)?;
251 }
252 PolygonEvent::AggregateBar {
253 symbol,
254 start_timestamp,
255 end_timestamp,
256 open,
257 high,
258 low,
259 close,
260 volume,
261 vwap,
262 } => {
263 map.serialize_entry("ev", "AM")?;
264 map.serialize_entry("sym", symbol)?;
265 map.serialize_entry("s", start_timestamp)?;
266 map.serialize_entry("e", end_timestamp)?;
267 map.serialize_entry("o", open)?;
268 map.serialize_entry("h", high)?;
269 map.serialize_entry("l", low)?;
270 map.serialize_entry("c", close)?;
271 map.serialize_entry("v", volume)?;
272 map.serialize_entry("vw", vwap)?;
273 }
274 PolygonEvent::Level2 {
275 symbol,
276 timestamp,
277 bids,
278 asks,
279 } => {
280 map.serialize_entry("ev", "L2")?;
281 map.serialize_entry("sym", symbol)?;
282 map.serialize_entry("t", timestamp)?;
283 map.serialize_entry("b", bids)?;
284 map.serialize_entry("a", asks)?;
285 }
286 }
287
288 map.end()
289 }
290}
291
292#[derive(Debug, Clone, Serialize, Deserialize)]
293pub struct PriceLevel {
294 #[serde(rename = "p")]
295 pub price: f64,
296 #[serde(rename = "s")]
297 pub size: u64,
298}
299
300#[derive(Debug, Clone, PartialEq, Eq, Hash)]
302pub enum PolygonChannel {
303 Trades,
304 Quotes,
305 AggregateBars,
306 Level2,
307}
308
309impl PolygonChannel {
310 fn prefix(&self) -> &'static str {
311 match self {
312 PolygonChannel::Trades => "T",
313 PolygonChannel::Quotes => "Q",
314 PolygonChannel::AggregateBars => "AM",
315 PolygonChannel::Level2 => "L2",
316 }
317 }
318
319 fn format_subscription(&self, symbol: &str) -> String {
320 format!("{}.{}", self.prefix(), symbol)
321 }
322}
323
324#[derive(Debug, Clone)]
326struct Subscription {
327 symbols: Vec<String>,
328 channels: Vec<PolygonChannel>,
329 active: bool,
330}
331
332pub struct PolygonWebSocket {
334 api_key: String,
335 ws_url: String,
336 subscriptions: Arc<DashMap<String, Subscription>>,
337 event_tx: Arc<RwLock<Option<broadcast::Sender<PolygonEvent>>>>,
338 reconnect_delay: Arc<RwLock<Duration>>,
339 rate_limiter: Arc<RateLimiter<
340 governor::state::direct::NotKeyed,
341 governor::state::InMemoryState,
342 governor::clock::DefaultClock,
343 >>,
344 connection_active: Arc<RwLock<bool>>,
345}
346
347impl PolygonWebSocket {
348 pub fn new(api_key: String) -> Self {
350 let quota = Quota::per_minute(std::num::NonZeroU32::new(RATE_LIMIT_PER_MINUTE).unwrap());
351 let rate_limiter = Arc::new(RateLimiter::direct(quota));
352
353 Self {
354 api_key,
355 ws_url: POLYGON_WS_URL.to_string(),
356 subscriptions: Arc::new(DashMap::new()),
357 event_tx: Arc::new(RwLock::new(None)),
358 reconnect_delay: Arc::new(RwLock::new(INITIAL_RECONNECT_DELAY)),
359 rate_limiter,
360 connection_active: Arc::new(RwLock::new(false)),
361 }
362 }
363
364 pub async fn connect(&self) -> Result<()> {
366 let ws_client = WebSocketClient::new(format!("{}/stocks", self.ws_url))
367 .with_reconnect_delay(INITIAL_RECONNECT_DELAY)
368 .with_max_attempts(10);
369
370 let mut stream = ws_client.connect_with_retry().await?;
371
372 self.authenticate(&mut stream).await?;
374
375 *self.connection_active.write() = true;
377
378 let (tx, _) = broadcast::channel(CHANNEL_BUFFER_SIZE);
380 *self.event_tx.write() = Some(tx.clone());
381
382 let event_tx = self.event_tx.clone();
384 let subscriptions = self.subscriptions.clone();
385 let connection_active = self.connection_active.clone();
386 let reconnect_delay = self.reconnect_delay.clone();
387
388 tokio::spawn(async move {
389 Self::process_messages(
390 stream,
391 event_tx,
392 subscriptions,
393 connection_active,
394 reconnect_delay,
395 )
396 .await;
397 });
398
399 info!("Polygon WebSocket connected and authenticated");
400 Ok(())
401 }
402
403 async fn authenticate(&self, stream: &mut WebSocketStream) -> Result<()> {
405 let auth_msg = serde_json::json!({
406 "action": "auth",
407 "params": self.api_key
408 });
409
410 stream
411 .send(Message::Text(auth_msg.to_string()))
412 .await
413 .map_err(|e| MarketDataError::Auth(format!("Failed to send auth: {}", e)))?;
414
415 match tokio::time::timeout(Duration::from_secs(10), stream.next()).await {
417 Ok(Some(Ok(Message::Text(msg)))) => {
418 if let Ok(events) = serde_json::from_str::<Vec<PolygonEvent>>(&msg) {
419 for event in events {
420 if let PolygonEvent::Status { status, message } = event {
421 if status == "auth_success" {
422 info!("Polygon authentication successful");
423 return Ok(());
424 } else if status == "auth_failed" {
425 return Err(MarketDataError::Auth(format!(
426 "Authentication failed: {}",
427 message
428 )));
429 }
430 }
431 }
432 }
433 Err(MarketDataError::Auth(
434 "Unexpected auth response".to_string(),
435 ))
436 }
437 Ok(Some(Ok(_))) => Err(MarketDataError::Auth(
438 "Invalid auth response format".to_string(),
439 )),
440 Ok(Some(Err(e))) => Err(e),
441 Ok(None) => Err(MarketDataError::Auth(
442 "Connection closed during auth".to_string(),
443 )),
444 Err(_) => Err(MarketDataError::Timeout),
445 }
446 }
447
448 pub async fn subscribe(
450 &self,
451 symbols: Vec<String>,
452 channels: Vec<PolygonChannel>,
453 ) -> Result<()> {
454 self.rate_limiter.until_ready().await;
455
456 let subscription_strings: Vec<String> = symbols
457 .iter()
458 .flat_map(|symbol| {
459 channels
460 .iter()
461 .map(move |channel| channel.format_subscription(symbol))
462 })
463 .collect();
464
465 let _subscribe_msg = serde_json::json!({
466 "action": "subscribe",
467 "params": subscription_strings.join(",")
468 });
469
470 for symbol in &symbols {
472 self.subscriptions.insert(
473 symbol.clone(),
474 Subscription {
475 symbols: vec![symbol.clone()],
476 channels: channels.clone(),
477 active: true,
478 },
479 );
480 }
481
482 if let Some(_tx) = self.event_tx.read().as_ref() {
484 debug!(
487 "Subscribing to {} channels for {} symbols",
488 channels.len(),
489 symbols.len()
490 );
491 }
492
493 info!(
494 "Subscribed to {} symbols across {} channels",
495 symbols.len(),
496 channels.len()
497 );
498 Ok(())
499 }
500
501 pub async fn unsubscribe(&self, symbols: Vec<String>) -> Result<()> {
503 self.rate_limiter.until_ready().await;
504
505 let mut unsubscribe_list = Vec::new();
506
507 for symbol in &symbols {
508 if let Some(sub) = self.subscriptions.get(symbol.as_str()) {
509 for channel in &sub.channels {
510 unsubscribe_list.push(channel.format_subscription(symbol));
511 }
512 drop(sub); self.subscriptions.remove(symbol.as_str());
514 }
515 }
516
517 let _unsubscribe_msg = serde_json::json!({
518 "action": "unsubscribe",
519 "params": unsubscribe_list.join(",")
520 });
521
522 debug!("Unsubscribed from {} symbols", symbols.len());
523 Ok(())
524 }
525
526 pub fn stream(&self) -> impl Stream<Item = PolygonEvent> {
528 let rx = self
529 .event_tx
530 .read()
531 .as_ref()
532 .expect("Not connected")
533 .subscribe();
534
535 futures::stream::unfold(rx, |mut rx| async move {
536 match rx.recv().await {
537 Ok(event) => Some((event, rx)),
538 Err(_) => None,
539 }
540 })
541 }
542
543 pub fn quote_stream(&self) -> impl Stream<Item = Result<Quote>> {
545 self.stream().filter_map(|event| async move {
546 match event {
547 PolygonEvent::Quote {
548 symbol,
549 timestamp,
550 bid_price,
551 ask_price,
552 bid_size,
553 ask_size,
554 ..
555 } => {
556 let timestamp_dt = Self::timestamp_to_datetime(timestamp);
557 Some(Ok(Quote {
558 symbol,
559 timestamp: timestamp_dt,
560 bid: Decimal::from_f64_retain(bid_price)?,
561 ask: Decimal::from_f64_retain(ask_price)?,
562 bid_size,
563 ask_size,
564 }))
565 }
566 _ => None,
567 }
568 })
569 }
570
571 pub fn trade_stream(&self) -> impl Stream<Item = Result<Trade>> {
573 self.stream().filter_map(|event| async move {
574 match event {
575 PolygonEvent::Trade {
576 symbol,
577 timestamp,
578 price,
579 size,
580 conditions,
581 ..
582 } => {
583 let timestamp_dt = Self::timestamp_to_datetime(timestamp);
584 Some(Ok(Trade {
585 symbol,
586 timestamp: timestamp_dt,
587 price: Decimal::from_f64_retain(price)?,
588 size,
589 conditions: conditions.iter().map(|c| c.to_string()).collect(),
590 }))
591 }
592 _ => None,
593 }
594 })
595 }
596
597 pub fn bar_stream(&self) -> impl Stream<Item = Result<Bar>> {
599 self.stream().filter_map(|event| async move {
600 match event {
601 PolygonEvent::AggregateBar {
602 symbol,
603 start_timestamp,
604 open,
605 high,
606 low,
607 close,
608 volume,
609 ..
610 } => {
611 let timestamp_dt = Self::timestamp_to_datetime(start_timestamp);
612 Some(Ok(Bar {
613 symbol,
614 timestamp: timestamp_dt,
615 open: Decimal::from_f64_retain(open)?,
616 high: Decimal::from_f64_retain(high)?,
617 low: Decimal::from_f64_retain(low)?,
618 close: Decimal::from_f64_retain(close)?,
619 volume,
620 }))
621 }
622 _ => None,
623 }
624 })
625 }
626
627 async fn process_messages(
629 mut stream: WebSocketStream,
630 event_tx: Arc<RwLock<Option<broadcast::Sender<PolygonEvent>>>>,
631 subscriptions: Arc<DashMap<String, Subscription>>,
632 connection_active: Arc<RwLock<bool>>,
633 reconnect_delay: Arc<RwLock<Duration>>,
634 ) {
635 let mut message_count = 0u64;
636 let mut error_count = 0u64;
637
638 while let Some(msg) = stream.next().await {
639 match msg {
640 Ok(Message::Text(text)) => {
641 message_count += 1;
642
643 match serde_json::from_str::<Vec<PolygonEvent>>(&text) {
645 Ok(events) => {
646 if let Some(tx) = event_tx.read().as_ref() {
647 for event in events {
648 if let PolygonEvent::Status { status, message } = &event {
650 info!("Polygon status: {} - {}", status, message);
651 }
652
653 let _ = tx.send(event);
655 }
656 }
657
658 *reconnect_delay.write() = INITIAL_RECONNECT_DELAY;
660 }
661 Err(e) => {
662 error_count += 1;
663 error!("Failed to parse Polygon message: {}", e);
664 debug!("Raw message: {}", text);
665
666 if error_count > 100 {
667 warn!("Too many parse errors, reconnecting...");
668 break;
669 }
670 }
671 }
672 }
673 Ok(Message::Binary(_)) => {
674 warn!("Unexpected binary message from Polygon");
675 }
676 Ok(Message::Ping(data)) => {
677 if let Err(e) = stream.send(Message::Pong(data)).await {
678 error!("Failed to send pong: {}", e);
679 break;
680 }
681 }
682 Ok(Message::Close(_)) => {
683 info!("WebSocket closed by Polygon");
684 break;
685 }
686 Err(e) => {
687 error!("WebSocket error: {}", e);
688 break;
689 }
690 _ => {}
691 }
692
693 if message_count % 10000 == 0 {
695 info!(
696 "Processed {} messages, {} errors, {} active subscriptions",
697 message_count,
698 error_count,
699 subscriptions.len()
700 );
701 }
702 }
703
704 *connection_active.write() = false;
706
707 let mut delay = *reconnect_delay.read();
709 if delay < MAX_RECONNECT_DELAY {
710 delay = (delay * 2).min(MAX_RECONNECT_DELAY);
711 *reconnect_delay.write() = delay;
712 }
713
714 warn!(
715 "Polygon WebSocket disconnected. Reconnecting in {:?}...",
716 delay
717 );
718 }
719
720 fn timestamp_to_datetime(nanos: i64) -> DateTime<Utc> {
722 let secs = nanos / 1_000_000_000;
723 let nsecs = (nanos % 1_000_000_000) as u32;
724 DateTime::from_timestamp(secs, nsecs).unwrap_or_else(|| Utc::now())
725 }
726
727 pub fn is_connected(&self) -> bool {
729 *self.connection_active.read()
730 }
731
732 pub fn get_subscriptions(&self) -> Vec<(String, Vec<PolygonChannel>)> {
734 self.subscriptions
735 .iter()
736 .filter(|entry| entry.value().active)
737 .map(|entry| (entry.key().clone(), entry.value().channels.clone()))
738 .collect()
739 }
740}
741
742pub struct PolygonClient {
744 api_key: String,
745 base_url: String,
746 ws: Arc<PolygonWebSocket>,
747}
748
749impl PolygonClient {
750 pub fn new(api_key: String) -> Self {
751 let ws = Arc::new(PolygonWebSocket::new(api_key.clone()));
752
753 Self {
754 api_key,
755 base_url: POLYGON_REST_URL.to_string(),
756 ws,
757 }
758 }
759
760 pub fn websocket(&self) -> Arc<PolygonWebSocket> {
761 self.ws.clone()
762 }
763}
764
765#[async_trait]
766impl MarketDataProvider for PolygonClient {
767 async fn get_quote(&self, symbol: &str) -> Result<Quote> {
768 let url = format!(
769 "{}/v2/last/nbbo/{}?apiKey={}",
770 self.base_url, symbol, self.api_key
771 );
772
773 let response: serde_json::Value = reqwest::get(&url)
774 .await
775 .map_err(|e| MarketDataError::Network(e.to_string()))?
776 .json()
777 .await
778 .map_err(|e| MarketDataError::Parse(e.to_string()))?;
779
780 let results = response["results"]
781 .as_object()
782 .ok_or_else(|| MarketDataError::Parse("No results".to_string()))?;
783
784 Ok(Quote {
785 symbol: symbol.to_string(),
786 timestamp: Utc::now(),
787 bid: Decimal::from_f64_retain(
788 results["P"]
789 .as_f64()
790 .ok_or_else(|| MarketDataError::Parse("Invalid bid".to_string()))?,
791 )
792 .ok_or_else(|| MarketDataError::Parse("Invalid bid decimal".to_string()))?,
793 ask: Decimal::from_f64_retain(
794 results["p"]
795 .as_f64()
796 .ok_or_else(|| MarketDataError::Parse("Invalid ask".to_string()))?,
797 )
798 .ok_or_else(|| MarketDataError::Parse("Invalid ask decimal".to_string()))?,
799 bid_size: results["S"].as_u64().unwrap_or(0),
800 ask_size: results["s"].as_u64().unwrap_or(0),
801 })
802 }
803
804 async fn get_bars(
805 &self,
806 symbol: &str,
807 start: DateTime<Utc>,
808 end: DateTime<Utc>,
809 timeframe: crate::types::Timeframe,
810 ) -> Result<Vec<Bar>> {
811 let tf_str = match timeframe {
813 crate::types::Timeframe::Minute1 => "1/minute",
814 crate::types::Timeframe::Minute5 => "5/minute",
815 crate::types::Timeframe::Minute15 => "15/minute",
816 crate::types::Timeframe::Hour1 => "1/hour",
817 crate::types::Timeframe::Day1 => "1/day",
818 };
819
820 let url = format!(
821 "{}/v2/aggs/ticker/{}/range/{}/{}/{}?apiKey={}",
822 self.base_url,
823 symbol,
824 tf_str,
825 start.timestamp_millis(),
826 end.timestamp_millis(),
827 self.api_key
828 );
829
830 let response: serde_json::Value = reqwest::get(&url)
831 .await
832 .map_err(|e| MarketDataError::Network(e.to_string()))?
833 .json()
834 .await
835 .map_err(|e| MarketDataError::Parse(e.to_string()))?;
836
837 let results = response["results"]
838 .as_array()
839 .ok_or_else(|| MarketDataError::Parse("No results".to_string()))?;
840
841 results
842 .iter()
843 .map(|bar| {
844 Ok(Bar {
845 symbol: symbol.to_string(),
846 timestamp: DateTime::from_timestamp_millis(
847 bar["t"]
848 .as_i64()
849 .ok_or_else(|| MarketDataError::Parse("Invalid timestamp".to_string()))?,
850 )
851 .ok_or_else(|| MarketDataError::Parse("Invalid datetime".to_string()))?,
852 open: Decimal::from_f64_retain(
853 bar["o"]
854 .as_f64()
855 .ok_or_else(|| MarketDataError::Parse("Invalid open".to_string()))?,
856 )
857 .ok_or_else(|| MarketDataError::Parse("Invalid open decimal".to_string()))?,
858 high: Decimal::from_f64_retain(
859 bar["h"]
860 .as_f64()
861 .ok_or_else(|| MarketDataError::Parse("Invalid high".to_string()))?,
862 )
863 .ok_or_else(|| MarketDataError::Parse("Invalid high decimal".to_string()))?,
864 low: Decimal::from_f64_retain(
865 bar["l"]
866 .as_f64()
867 .ok_or_else(|| MarketDataError::Parse("Invalid low".to_string()))?,
868 )
869 .ok_or_else(|| MarketDataError::Parse("Invalid low decimal".to_string()))?,
870 close: Decimal::from_f64_retain(
871 bar["c"]
872 .as_f64()
873 .ok_or_else(|| MarketDataError::Parse("Invalid close".to_string()))?,
874 )
875 .ok_or_else(|| MarketDataError::Parse("Invalid close decimal".to_string()))?,
876 volume: bar["v"].as_u64().unwrap_or(0),
877 })
878 })
879 .collect()
880 }
881
882 async fn subscribe_quotes(&self, symbols: Vec<String>) -> Result<QuoteStream> {
883 if !self.ws.is_connected() {
884 self.ws.connect().await?;
885 }
886
887 self.ws
888 .subscribe(symbols.clone(), vec![PolygonChannel::Quotes])
889 .await?;
890
891 let stream = self.ws.quote_stream();
892 Ok(Box::pin(stream))
893 }
894
895 async fn subscribe_trades(&self, symbols: Vec<String>) -> Result<TradeStream> {
896 if !self.ws.is_connected() {
897 self.ws.connect().await?;
898 }
899
900 self.ws
901 .subscribe(symbols.clone(), vec![PolygonChannel::Trades])
902 .await?;
903
904 let stream = self.ws.trade_stream();
905 Ok(Box::pin(stream))
906 }
907
908 async fn health_check(&self) -> Result<HealthStatus> {
909 if self.ws.is_connected() {
910 Ok(HealthStatus::Healthy)
911 } else {
912 Ok(HealthStatus::Degraded)
913 }
914 }
915}
916
917#[cfg(test)]
918mod tests {
919 use super::*;
920
921 #[test]
922 fn test_polygon_channel_formatting() {
923 assert_eq!(
924 PolygonChannel::Trades.format_subscription("AAPL"),
925 "T.AAPL"
926 );
927 assert_eq!(
928 PolygonChannel::Quotes.format_subscription("TSLA"),
929 "Q.TSLA"
930 );
931 assert_eq!(
932 PolygonChannel::AggregateBars.format_subscription("MSFT"),
933 "AM.MSFT"
934 );
935 }
936
937 #[test]
938 fn test_timestamp_conversion() {
939 let nanos = 1640000000000000000i64; let dt = PolygonWebSocket::timestamp_to_datetime(nanos);
941 assert_eq!(dt.timestamp(), 1640000000);
942 }
943
944 #[tokio::test]
945 async fn test_websocket_creation() {
946 let ws = PolygonWebSocket::new("test_key".to_string());
947 assert!(!ws.is_connected());
948 assert_eq!(ws.get_subscriptions().len(), 0);
949 }
950
951 #[test]
952 fn test_polygon_event_deserialization() {
953 let status_json = r#"[{"ev":"status","status":"connected","message":"Connected Successfully"}]"#;
955 let events: Vec<PolygonEvent> = serde_json::from_str(status_json).unwrap();
956 assert_eq!(events.len(), 1);
957 match &events[0] {
958 PolygonEvent::Status { status, .. } => {
959 assert_eq!(status, "connected");
960 }
961 _ => panic!("Wrong event type"),
962 }
963
964 let trade_json = r#"[{"ev":"T","sym":"AAPL","t":1640000000000000000,"p":150.00,"s":100,"c":[12,37],"x":4}]"#;
966 let events: Vec<PolygonEvent> = serde_json::from_str(trade_json).unwrap();
967 assert_eq!(events.len(), 1);
968
969 match &events[0] {
970 PolygonEvent::Trade {
971 symbol, price, size, ..
972 } => {
973 assert_eq!(symbol, "AAPL");
974 assert_eq!(*price, 150.00);
975 assert_eq!(*size, 100);
976 }
977 _ => panic!("Wrong event type"),
978 }
979 }
980
981 #[test]
982 fn test_quote_event_deserialization() {
983 let quote_json = r#"[{"ev":"Q","sym":"TSLA","t":1640000000000000000,"bp":900.00,"ap":901.00,"bs":50,"as":75,"bx":4,"ax":4}]"#;
984
985 let events: Vec<PolygonEvent> = serde_json::from_str(quote_json).unwrap();
986 assert_eq!(events.len(), 1);
987
988 match &events[0] {
989 PolygonEvent::Quote {
990 symbol,
991 bid_price,
992 ask_price,
993 ..
994 } => {
995 assert_eq!(symbol, "TSLA");
996 assert_eq!(*bid_price, 900.00);
997 assert_eq!(*ask_price, 901.00);
998 }
999 _ => panic!("Wrong event type"),
1000 }
1001 }
1002
1003 #[tokio::test]
1004 async fn test_subscription_management() {
1005 let ws = PolygonWebSocket::new("test_key".to_string());
1006
1007 assert_eq!(ws.get_subscriptions().len(), 0);
1009
1010 }
1014
1015 #[test]
1016 fn test_aggregate_bar_deserialization() {
1017 let bar_json = r#"[{"ev":"AM","sym":"NVDA","s":1640000000000000000,"e":1640000060000000000,"o":500.00,"h":505.00,"l":499.00,"c":503.00,"v":1000000,"vw":502.00}]"#;
1018
1019 let events: Vec<PolygonEvent> = serde_json::from_str(bar_json).unwrap();
1020 assert_eq!(events.len(), 1);
1021
1022 match &events[0] {
1023 PolygonEvent::AggregateBar {
1024 symbol,
1025 open,
1026 high,
1027 low,
1028 close,
1029 volume,
1030 ..
1031 } => {
1032 assert_eq!(symbol, "NVDA");
1033 assert_eq!(*open, 500.00);
1034 assert_eq!(*high, 505.00);
1035 assert_eq!(*low, 499.00);
1036 assert_eq!(*close, 503.00);
1037 assert_eq!(*volume, 1000000);
1038 }
1039 _ => panic!("Wrong event type"),
1040 }
1041 }
1042}