use crate::{
errors::{MarketDataError, Result},
types::{Bar, Quote, Trade},
websocket::{WebSocketClient, WebSocketStream},
{HealthStatus, MarketDataProvider, QuoteStream, TradeStream},
};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use futures::{Stream, StreamExt};
use governor::{Quota, RateLimiter};
use parking_lot::RwLock;
use rust_decimal::Decimal;
use serde::{Deserialize, Serialize};
use std::{
sync::Arc,
time::Duration,
};
use tokio::sync::broadcast;
use tokio_tungstenite::tungstenite::Message;
use tracing::{debug, error, info, warn};
const POLYGON_WS_URL: &str = "wss://socket.polygon.io";
const POLYGON_REST_URL: &str = "https://api.polygon.io";
const MAX_RECONNECT_DELAY: Duration = Duration::from_secs(60);
const INITIAL_RECONNECT_DELAY: Duration = Duration::from_secs(1);
const CHANNEL_BUFFER_SIZE: usize = 10000;
const RATE_LIMIT_PER_MINUTE: u32 = 1000;
#[derive(Debug, Clone)]
pub enum PolygonEvent {
Status {
status: String,
message: String,
},
Trade {
symbol: String,
timestamp: i64,
price: f64,
size: u64,
conditions: Vec<i32>,
exchange: u8,
},
Quote {
symbol: String,
timestamp: i64,
bid_price: f64,
ask_price: f64,
bid_size: u64,
ask_size: u64,
bid_exchange: u8,
ask_exchange: u8,
},
AggregateBar {
symbol: String,
start_timestamp: i64,
end_timestamp: i64,
open: f64,
high: f64,
low: f64,
close: f64,
volume: u64,
vwap: f64,
},
Level2 {
symbol: String,
timestamp: i64,
bids: Vec<PriceLevel>,
asks: Vec<PriceLevel>,
},
}
impl<'de> Deserialize<'de> for PolygonEvent {
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
use serde::de::Error;
#[derive(Debug, Deserialize)]
struct RawEvent {
ev: String,
#[serde(default)]
status: Option<String>,
#[serde(default)]
message: Option<String>,
#[serde(default, rename = "sym")]
symbol: Option<String>,
#[serde(default, rename = "t")]
timestamp: Option<i64>,
#[serde(default, rename = "p")]
price: Option<f64>,
#[serde(default, rename = "s")]
size_or_start: Option<i64>,
#[serde(default, rename = "c")]
conditions_or_close: Option<serde_json::Value>,
#[serde(default, rename = "x")]
exchange: Option<u8>,
#[serde(default, rename = "bp")]
bid_price: Option<f64>,
#[serde(default, rename = "ap")]
ask_price: Option<f64>,
#[serde(default, rename = "bs")]
bid_size: Option<u64>,
#[serde(default, rename = "as")]
ask_size: Option<u64>,
#[serde(default, rename = "bx")]
bid_exchange: Option<u8>,
#[serde(default, rename = "ax")]
ask_exchange: Option<u8>,
#[serde(default, rename = "e")]
end_timestamp: Option<i64>,
#[serde(default, rename = "o")]
open: Option<f64>,
#[serde(default, rename = "h")]
high: Option<f64>,
#[serde(default, rename = "l")]
low: Option<f64>,
#[serde(default, rename = "v")]
volume: Option<u64>,
#[serde(default, rename = "vw")]
vwap: Option<f64>,
#[serde(default, rename = "b")]
bids: Option<Vec<PriceLevel>>,
#[serde(default, rename = "a")]
asks: Option<Vec<PriceLevel>>,
}
let raw = RawEvent::deserialize(deserializer)?;
match raw.ev.as_str() {
"status" => Ok(PolygonEvent::Status {
status: raw.status.ok_or_else(|| D::Error::missing_field("status"))?,
message: raw.message.ok_or_else(|| D::Error::missing_field("message"))?,
}),
"T" => Ok(PolygonEvent::Trade {
symbol: raw.symbol.ok_or_else(|| D::Error::missing_field("sym"))?,
timestamp: raw.timestamp.ok_or_else(|| D::Error::missing_field("t"))?,
price: raw.price.ok_or_else(|| D::Error::missing_field("p"))?,
size: raw.size_or_start.ok_or_else(|| D::Error::missing_field("s"))? as u64,
conditions: raw.conditions_or_close
.and_then(|v| serde_json::from_value(v).ok())
.unwrap_or_default(),
exchange: raw.exchange.ok_or_else(|| D::Error::missing_field("x"))?,
}),
"Q" => Ok(PolygonEvent::Quote {
symbol: raw.symbol.ok_or_else(|| D::Error::missing_field("sym"))?,
timestamp: raw.timestamp.ok_or_else(|| D::Error::missing_field("t"))?,
bid_price: raw.bid_price.ok_or_else(|| D::Error::missing_field("bp"))?,
ask_price: raw.ask_price.ok_or_else(|| D::Error::missing_field("ap"))?,
bid_size: raw.bid_size.ok_or_else(|| D::Error::missing_field("bs"))?,
ask_size: raw.ask_size.ok_or_else(|| D::Error::missing_field("as"))?,
bid_exchange: raw.bid_exchange.ok_or_else(|| D::Error::missing_field("bx"))?,
ask_exchange: raw.ask_exchange.ok_or_else(|| D::Error::missing_field("ax"))?,
}),
"AM" => Ok(PolygonEvent::AggregateBar {
symbol: raw.symbol.ok_or_else(|| D::Error::missing_field("sym"))?,
start_timestamp: raw.size_or_start.ok_or_else(|| D::Error::missing_field("s"))?,
end_timestamp: raw.end_timestamp.ok_or_else(|| D::Error::missing_field("e"))?,
open: raw.open.ok_or_else(|| D::Error::missing_field("o"))?,
high: raw.high.ok_or_else(|| D::Error::missing_field("h"))?,
low: raw.low.ok_or_else(|| D::Error::missing_field("l"))?,
close: raw.conditions_or_close
.and_then(|v| v.as_f64())
.ok_or_else(|| D::Error::missing_field("c"))?,
volume: raw.volume.ok_or_else(|| D::Error::missing_field("v"))?,
vwap: raw.vwap.ok_or_else(|| D::Error::missing_field("vw"))?,
}),
"L2" => Ok(PolygonEvent::Level2 {
symbol: raw.symbol.ok_or_else(|| D::Error::missing_field("sym"))?,
timestamp: raw.timestamp.ok_or_else(|| D::Error::missing_field("t"))?,
bids: raw.bids.unwrap_or_default(),
asks: raw.asks.unwrap_or_default(),
}),
other => Err(D::Error::unknown_variant(other, &["status", "T", "Q", "AM", "L2"])),
}
}
}
impl Serialize for PolygonEvent {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeMap;
let mut map = serializer.serialize_map(None)?;
match self {
PolygonEvent::Status { status, message } => {
map.serialize_entry("ev", "status")?;
map.serialize_entry("status", status)?;
map.serialize_entry("message", message)?;
}
PolygonEvent::Trade {
symbol,
timestamp,
price,
size,
conditions,
exchange,
} => {
map.serialize_entry("ev", "T")?;
map.serialize_entry("sym", symbol)?;
map.serialize_entry("t", timestamp)?;
map.serialize_entry("p", price)?;
map.serialize_entry("s", size)?;
map.serialize_entry("c", conditions)?;
map.serialize_entry("x", exchange)?;
}
PolygonEvent::Quote {
symbol,
timestamp,
bid_price,
ask_price,
bid_size,
ask_size,
bid_exchange,
ask_exchange,
} => {
map.serialize_entry("ev", "Q")?;
map.serialize_entry("sym", symbol)?;
map.serialize_entry("t", timestamp)?;
map.serialize_entry("bp", bid_price)?;
map.serialize_entry("ap", ask_price)?;
map.serialize_entry("bs", bid_size)?;
map.serialize_entry("as", ask_size)?;
map.serialize_entry("bx", bid_exchange)?;
map.serialize_entry("ax", ask_exchange)?;
}
PolygonEvent::AggregateBar {
symbol,
start_timestamp,
end_timestamp,
open,
high,
low,
close,
volume,
vwap,
} => {
map.serialize_entry("ev", "AM")?;
map.serialize_entry("sym", symbol)?;
map.serialize_entry("s", start_timestamp)?;
map.serialize_entry("e", end_timestamp)?;
map.serialize_entry("o", open)?;
map.serialize_entry("h", high)?;
map.serialize_entry("l", low)?;
map.serialize_entry("c", close)?;
map.serialize_entry("v", volume)?;
map.serialize_entry("vw", vwap)?;
}
PolygonEvent::Level2 {
symbol,
timestamp,
bids,
asks,
} => {
map.serialize_entry("ev", "L2")?;
map.serialize_entry("sym", symbol)?;
map.serialize_entry("t", timestamp)?;
map.serialize_entry("b", bids)?;
map.serialize_entry("a", asks)?;
}
}
map.end()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PriceLevel {
#[serde(rename = "p")]
pub price: f64,
#[serde(rename = "s")]
pub size: u64,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum PolygonChannel {
Trades,
Quotes,
AggregateBars,
Level2,
}
impl PolygonChannel {
fn prefix(&self) -> &'static str {
match self {
PolygonChannel::Trades => "T",
PolygonChannel::Quotes => "Q",
PolygonChannel::AggregateBars => "AM",
PolygonChannel::Level2 => "L2",
}
}
fn format_subscription(&self, symbol: &str) -> String {
format!("{}.{}", self.prefix(), symbol)
}
}
#[derive(Debug, Clone)]
struct Subscription {
symbols: Vec<String>,
channels: Vec<PolygonChannel>,
active: bool,
}
pub struct PolygonWebSocket {
api_key: String,
ws_url: String,
subscriptions: Arc<DashMap<String, Subscription>>,
event_tx: Arc<RwLock<Option<broadcast::Sender<PolygonEvent>>>>,
reconnect_delay: Arc<RwLock<Duration>>,
rate_limiter: Arc<RateLimiter<
governor::state::direct::NotKeyed,
governor::state::InMemoryState,
governor::clock::DefaultClock,
>>,
connection_active: Arc<RwLock<bool>>,
}
impl PolygonWebSocket {
pub fn new(api_key: String) -> Self {
let quota = Quota::per_minute(std::num::NonZeroU32::new(RATE_LIMIT_PER_MINUTE).unwrap());
let rate_limiter = Arc::new(RateLimiter::direct(quota));
Self {
api_key,
ws_url: POLYGON_WS_URL.to_string(),
subscriptions: Arc::new(DashMap::new()),
event_tx: Arc::new(RwLock::new(None)),
reconnect_delay: Arc::new(RwLock::new(INITIAL_RECONNECT_DELAY)),
rate_limiter,
connection_active: Arc::new(RwLock::new(false)),
}
}
pub async fn connect(&self) -> Result<()> {
let ws_client = WebSocketClient::new(format!("{}/stocks", self.ws_url))
.with_reconnect_delay(INITIAL_RECONNECT_DELAY)
.with_max_attempts(10);
let mut stream = ws_client.connect_with_retry().await?;
self.authenticate(&mut stream).await?;
*self.connection_active.write() = true;
let (tx, _) = broadcast::channel(CHANNEL_BUFFER_SIZE);
*self.event_tx.write() = Some(tx.clone());
let event_tx = self.event_tx.clone();
let subscriptions = self.subscriptions.clone();
let connection_active = self.connection_active.clone();
let reconnect_delay = self.reconnect_delay.clone();
tokio::spawn(async move {
Self::process_messages(
stream,
event_tx,
subscriptions,
connection_active,
reconnect_delay,
)
.await;
});
info!("Polygon WebSocket connected and authenticated");
Ok(())
}
async fn authenticate(&self, stream: &mut WebSocketStream) -> Result<()> {
let auth_msg = serde_json::json!({
"action": "auth",
"params": self.api_key
});
stream
.send(Message::Text(auth_msg.to_string()))
.await
.map_err(|e| MarketDataError::Auth(format!("Failed to send auth: {}", e)))?;
match tokio::time::timeout(Duration::from_secs(10), stream.next()).await {
Ok(Some(Ok(Message::Text(msg)))) => {
if let Ok(events) = serde_json::from_str::<Vec<PolygonEvent>>(&msg) {
for event in events {
if let PolygonEvent::Status { status, message } = event {
if status == "auth_success" {
info!("Polygon authentication successful");
return Ok(());
} else if status == "auth_failed" {
return Err(MarketDataError::Auth(format!(
"Authentication failed: {}",
message
)));
}
}
}
}
Err(MarketDataError::Auth(
"Unexpected auth response".to_string(),
))
}
Ok(Some(Ok(_))) => Err(MarketDataError::Auth(
"Invalid auth response format".to_string(),
)),
Ok(Some(Err(e))) => Err(e),
Ok(None) => Err(MarketDataError::Auth(
"Connection closed during auth".to_string(),
)),
Err(_) => Err(MarketDataError::Timeout),
}
}
pub async fn subscribe(
&self,
symbols: Vec<String>,
channels: Vec<PolygonChannel>,
) -> Result<()> {
self.rate_limiter.until_ready().await;
let subscription_strings: Vec<String> = symbols
.iter()
.flat_map(|symbol| {
channels
.iter()
.map(move |channel| channel.format_subscription(symbol))
})
.collect();
let _subscribe_msg = serde_json::json!({
"action": "subscribe",
"params": subscription_strings.join(",")
});
for symbol in &symbols {
self.subscriptions.insert(
symbol.clone(),
Subscription {
symbols: vec![symbol.clone()],
channels: channels.clone(),
active: true,
},
);
}
if let Some(_tx) = self.event_tx.read().as_ref() {
debug!(
"Subscribing to {} channels for {} symbols",
channels.len(),
symbols.len()
);
}
info!(
"Subscribed to {} symbols across {} channels",
symbols.len(),
channels.len()
);
Ok(())
}
pub async fn unsubscribe(&self, symbols: Vec<String>) -> Result<()> {
self.rate_limiter.until_ready().await;
let mut unsubscribe_list = Vec::new();
for symbol in &symbols {
if let Some(sub) = self.subscriptions.get(symbol.as_str()) {
for channel in &sub.channels {
unsubscribe_list.push(channel.format_subscription(symbol));
}
drop(sub); self.subscriptions.remove(symbol.as_str());
}
}
let _unsubscribe_msg = serde_json::json!({
"action": "unsubscribe",
"params": unsubscribe_list.join(",")
});
debug!("Unsubscribed from {} symbols", symbols.len());
Ok(())
}
pub fn stream(&self) -> impl Stream<Item = PolygonEvent> {
let rx = self
.event_tx
.read()
.as_ref()
.expect("Not connected")
.subscribe();
futures::stream::unfold(rx, |mut rx| async move {
match rx.recv().await {
Ok(event) => Some((event, rx)),
Err(_) => None,
}
})
}
pub fn quote_stream(&self) -> impl Stream<Item = Result<Quote>> {
self.stream().filter_map(|event| async move {
match event {
PolygonEvent::Quote {
symbol,
timestamp,
bid_price,
ask_price,
bid_size,
ask_size,
..
} => {
let timestamp_dt = Self::timestamp_to_datetime(timestamp);
Some(Ok(Quote {
symbol,
timestamp: timestamp_dt,
bid: Decimal::from_f64_retain(bid_price)?,
ask: Decimal::from_f64_retain(ask_price)?,
bid_size,
ask_size,
}))
}
_ => None,
}
})
}
pub fn trade_stream(&self) -> impl Stream<Item = Result<Trade>> {
self.stream().filter_map(|event| async move {
match event {
PolygonEvent::Trade {
symbol,
timestamp,
price,
size,
conditions,
..
} => {
let timestamp_dt = Self::timestamp_to_datetime(timestamp);
Some(Ok(Trade {
symbol,
timestamp: timestamp_dt,
price: Decimal::from_f64_retain(price)?,
size,
conditions: conditions.iter().map(|c| c.to_string()).collect(),
}))
}
_ => None,
}
})
}
pub fn bar_stream(&self) -> impl Stream<Item = Result<Bar>> {
self.stream().filter_map(|event| async move {
match event {
PolygonEvent::AggregateBar {
symbol,
start_timestamp,
open,
high,
low,
close,
volume,
..
} => {
let timestamp_dt = Self::timestamp_to_datetime(start_timestamp);
Some(Ok(Bar {
symbol,
timestamp: timestamp_dt,
open: Decimal::from_f64_retain(open)?,
high: Decimal::from_f64_retain(high)?,
low: Decimal::from_f64_retain(low)?,
close: Decimal::from_f64_retain(close)?,
volume,
}))
}
_ => None,
}
})
}
async fn process_messages(
mut stream: WebSocketStream,
event_tx: Arc<RwLock<Option<broadcast::Sender<PolygonEvent>>>>,
subscriptions: Arc<DashMap<String, Subscription>>,
connection_active: Arc<RwLock<bool>>,
reconnect_delay: Arc<RwLock<Duration>>,
) {
let mut message_count = 0u64;
let mut error_count = 0u64;
while let Some(msg) = stream.next().await {
match msg {
Ok(Message::Text(text)) => {
message_count += 1;
match serde_json::from_str::<Vec<PolygonEvent>>(&text) {
Ok(events) => {
if let Some(tx) = event_tx.read().as_ref() {
for event in events {
if let PolygonEvent::Status { status, message } = &event {
info!("Polygon status: {} - {}", status, message);
}
let _ = tx.send(event);
}
}
*reconnect_delay.write() = INITIAL_RECONNECT_DELAY;
}
Err(e) => {
error_count += 1;
error!("Failed to parse Polygon message: {}", e);
debug!("Raw message: {}", text);
if error_count > 100 {
warn!("Too many parse errors, reconnecting...");
break;
}
}
}
}
Ok(Message::Binary(_)) => {
warn!("Unexpected binary message from Polygon");
}
Ok(Message::Ping(data)) => {
if let Err(e) = stream.send(Message::Pong(data)).await {
error!("Failed to send pong: {}", e);
break;
}
}
Ok(Message::Close(_)) => {
info!("WebSocket closed by Polygon");
break;
}
Err(e) => {
error!("WebSocket error: {}", e);
break;
}
_ => {}
}
if message_count % 10000 == 0 {
info!(
"Processed {} messages, {} errors, {} active subscriptions",
message_count,
error_count,
subscriptions.len()
);
}
}
*connection_active.write() = false;
let mut delay = *reconnect_delay.read();
if delay < MAX_RECONNECT_DELAY {
delay = (delay * 2).min(MAX_RECONNECT_DELAY);
*reconnect_delay.write() = delay;
}
warn!(
"Polygon WebSocket disconnected. Reconnecting in {:?}...",
delay
);
}
fn timestamp_to_datetime(nanos: i64) -> DateTime<Utc> {
let secs = nanos / 1_000_000_000;
let nsecs = (nanos % 1_000_000_000) as u32;
DateTime::from_timestamp(secs, nsecs).unwrap_or_else(|| Utc::now())
}
pub fn is_connected(&self) -> bool {
*self.connection_active.read()
}
pub fn get_subscriptions(&self) -> Vec<(String, Vec<PolygonChannel>)> {
self.subscriptions
.iter()
.filter(|entry| entry.value().active)
.map(|entry| (entry.key().clone(), entry.value().channels.clone()))
.collect()
}
}
pub struct PolygonClient {
api_key: String,
base_url: String,
ws: Arc<PolygonWebSocket>,
}
impl PolygonClient {
pub fn new(api_key: String) -> Self {
let ws = Arc::new(PolygonWebSocket::new(api_key.clone()));
Self {
api_key,
base_url: POLYGON_REST_URL.to_string(),
ws,
}
}
pub fn websocket(&self) -> Arc<PolygonWebSocket> {
self.ws.clone()
}
}
#[async_trait]
impl MarketDataProvider for PolygonClient {
async fn get_quote(&self, symbol: &str) -> Result<Quote> {
let url = format!(
"{}/v2/last/nbbo/{}?apiKey={}",
self.base_url, symbol, self.api_key
);
let response: serde_json::Value = reqwest::get(&url)
.await
.map_err(|e| MarketDataError::Network(e.to_string()))?
.json()
.await
.map_err(|e| MarketDataError::Parse(e.to_string()))?;
let results = response["results"]
.as_object()
.ok_or_else(|| MarketDataError::Parse("No results".to_string()))?;
Ok(Quote {
symbol: symbol.to_string(),
timestamp: Utc::now(),
bid: Decimal::from_f64_retain(
results["P"]
.as_f64()
.ok_or_else(|| MarketDataError::Parse("Invalid bid".to_string()))?,
)
.ok_or_else(|| MarketDataError::Parse("Invalid bid decimal".to_string()))?,
ask: Decimal::from_f64_retain(
results["p"]
.as_f64()
.ok_or_else(|| MarketDataError::Parse("Invalid ask".to_string()))?,
)
.ok_or_else(|| MarketDataError::Parse("Invalid ask decimal".to_string()))?,
bid_size: results["S"].as_u64().unwrap_or(0),
ask_size: results["s"].as_u64().unwrap_or(0),
})
}
async fn get_bars(
&self,
symbol: &str,
start: DateTime<Utc>,
end: DateTime<Utc>,
timeframe: crate::types::Timeframe,
) -> Result<Vec<Bar>> {
let tf_str = match timeframe {
crate::types::Timeframe::Minute1 => "1/minute",
crate::types::Timeframe::Minute5 => "5/minute",
crate::types::Timeframe::Minute15 => "15/minute",
crate::types::Timeframe::Hour1 => "1/hour",
crate::types::Timeframe::Day1 => "1/day",
};
let url = format!(
"{}/v2/aggs/ticker/{}/range/{}/{}/{}?apiKey={}",
self.base_url,
symbol,
tf_str,
start.timestamp_millis(),
end.timestamp_millis(),
self.api_key
);
let response: serde_json::Value = reqwest::get(&url)
.await
.map_err(|e| MarketDataError::Network(e.to_string()))?
.json()
.await
.map_err(|e| MarketDataError::Parse(e.to_string()))?;
let results = response["results"]
.as_array()
.ok_or_else(|| MarketDataError::Parse("No results".to_string()))?;
results
.iter()
.map(|bar| {
Ok(Bar {
symbol: symbol.to_string(),
timestamp: DateTime::from_timestamp_millis(
bar["t"]
.as_i64()
.ok_or_else(|| MarketDataError::Parse("Invalid timestamp".to_string()))?,
)
.ok_or_else(|| MarketDataError::Parse("Invalid datetime".to_string()))?,
open: Decimal::from_f64_retain(
bar["o"]
.as_f64()
.ok_or_else(|| MarketDataError::Parse("Invalid open".to_string()))?,
)
.ok_or_else(|| MarketDataError::Parse("Invalid open decimal".to_string()))?,
high: Decimal::from_f64_retain(
bar["h"]
.as_f64()
.ok_or_else(|| MarketDataError::Parse("Invalid high".to_string()))?,
)
.ok_or_else(|| MarketDataError::Parse("Invalid high decimal".to_string()))?,
low: Decimal::from_f64_retain(
bar["l"]
.as_f64()
.ok_or_else(|| MarketDataError::Parse("Invalid low".to_string()))?,
)
.ok_or_else(|| MarketDataError::Parse("Invalid low decimal".to_string()))?,
close: Decimal::from_f64_retain(
bar["c"]
.as_f64()
.ok_or_else(|| MarketDataError::Parse("Invalid close".to_string()))?,
)
.ok_or_else(|| MarketDataError::Parse("Invalid close decimal".to_string()))?,
volume: bar["v"].as_u64().unwrap_or(0),
})
})
.collect()
}
async fn subscribe_quotes(&self, symbols: Vec<String>) -> Result<QuoteStream> {
if !self.ws.is_connected() {
self.ws.connect().await?;
}
self.ws
.subscribe(symbols.clone(), vec![PolygonChannel::Quotes])
.await?;
let stream = self.ws.quote_stream();
Ok(Box::pin(stream))
}
async fn subscribe_trades(&self, symbols: Vec<String>) -> Result<TradeStream> {
if !self.ws.is_connected() {
self.ws.connect().await?;
}
self.ws
.subscribe(symbols.clone(), vec![PolygonChannel::Trades])
.await?;
let stream = self.ws.trade_stream();
Ok(Box::pin(stream))
}
async fn health_check(&self) -> Result<HealthStatus> {
if self.ws.is_connected() {
Ok(HealthStatus::Healthy)
} else {
Ok(HealthStatus::Degraded)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_polygon_channel_formatting() {
assert_eq!(
PolygonChannel::Trades.format_subscription("AAPL"),
"T.AAPL"
);
assert_eq!(
PolygonChannel::Quotes.format_subscription("TSLA"),
"Q.TSLA"
);
assert_eq!(
PolygonChannel::AggregateBars.format_subscription("MSFT"),
"AM.MSFT"
);
}
#[test]
fn test_timestamp_conversion() {
let nanos = 1640000000000000000i64; let dt = PolygonWebSocket::timestamp_to_datetime(nanos);
assert_eq!(dt.timestamp(), 1640000000);
}
#[tokio::test]
async fn test_websocket_creation() {
let ws = PolygonWebSocket::new("test_key".to_string());
assert!(!ws.is_connected());
assert_eq!(ws.get_subscriptions().len(), 0);
}
#[test]
fn test_polygon_event_deserialization() {
let status_json = r#"[{"ev":"status","status":"connected","message":"Connected Successfully"}]"#;
let events: Vec<PolygonEvent> = serde_json::from_str(status_json).unwrap();
assert_eq!(events.len(), 1);
match &events[0] {
PolygonEvent::Status { status, .. } => {
assert_eq!(status, "connected");
}
_ => panic!("Wrong event type"),
}
let trade_json = r#"[{"ev":"T","sym":"AAPL","t":1640000000000000000,"p":150.00,"s":100,"c":[12,37],"x":4}]"#;
let events: Vec<PolygonEvent> = serde_json::from_str(trade_json).unwrap();
assert_eq!(events.len(), 1);
match &events[0] {
PolygonEvent::Trade {
symbol, price, size, ..
} => {
assert_eq!(symbol, "AAPL");
assert_eq!(*price, 150.00);
assert_eq!(*size, 100);
}
_ => panic!("Wrong event type"),
}
}
#[test]
fn test_quote_event_deserialization() {
let quote_json = r#"[{"ev":"Q","sym":"TSLA","t":1640000000000000000,"bp":900.00,"ap":901.00,"bs":50,"as":75,"bx":4,"ax":4}]"#;
let events: Vec<PolygonEvent> = serde_json::from_str(quote_json).unwrap();
assert_eq!(events.len(), 1);
match &events[0] {
PolygonEvent::Quote {
symbol,
bid_price,
ask_price,
..
} => {
assert_eq!(symbol, "TSLA");
assert_eq!(*bid_price, 900.00);
assert_eq!(*ask_price, 901.00);
}
_ => panic!("Wrong event type"),
}
}
#[tokio::test]
async fn test_subscription_management() {
let ws = PolygonWebSocket::new("test_key".to_string());
assert_eq!(ws.get_subscriptions().len(), 0);
}
#[test]
fn test_aggregate_bar_deserialization() {
let bar_json = r#"[{"ev":"AM","sym":"NVDA","s":1640000000000000000,"e":1640000060000000000,"o":500.00,"h":505.00,"l":499.00,"c":503.00,"v":1000000,"vw":502.00}]"#;
let events: Vec<PolygonEvent> = serde_json::from_str(bar_json).unwrap();
assert_eq!(events.len(), 1);
match &events[0] {
PolygonEvent::AggregateBar {
symbol,
open,
high,
low,
close,
volume,
..
} => {
assert_eq!(symbol, "NVDA");
assert_eq!(*open, 500.00);
assert_eq!(*high, 505.00);
assert_eq!(*low, 499.00);
assert_eq!(*close, 503.00);
assert_eq!(*volume, 1000000);
}
_ => panic!("Wrong event type"),
}
}
}