Skip to main content

finance_query/adapters/polygon/
websocket.rs

1//! Polygon.io WebSocket streaming for real-time market data.
2//!
3//! Provides real-time trades, quotes, and aggregate bars for stocks, options, forex,
4//! crypto, futures, and indices.
5//!
6//! # Example
7//!
8//! ```no_run
9//! use finance_query::adapters::polygon;
10//! use finance_query::adapters::polygon::websocket::*;
11//! use futures::StreamExt;
12//!
13//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
14//! polygon::init("YOUR_KEY")?;
15//! let mut stream = PolygonStream::from_singleton()?
16//!     .cluster(Cluster::Stocks)
17//!     .subscribe(&["T.AAPL", "Q.AAPL", "AM.AAPL"])
18//!     .build()
19//!     .await?;
20//!
21//! while let Some(msg) = stream.next().await {
22//!     println!("{:?}", msg);
23//! }
24//! # Ok(())
25//! # }
26//! ```
27
28use std::pin::Pin;
29use std::task::{Context, Poll};
30
31use futures::stream::Stream;
32use serde::{Deserialize, Serialize};
33use tokio_tungstenite::tungstenite::Message;
34
35use crate::error::{FinanceError, Result};
36
37/// WebSocket cluster (asset class).
38#[derive(Debug, Clone, Copy, PartialEq, Eq)]
39pub enum Cluster {
40    /// Real-time stock data.
41    Stocks,
42    /// Real-time options data.
43    Options,
44    /// Real-time forex data.
45    Forex,
46    /// Real-time crypto data.
47    Crypto,
48    /// Real-time futures data.
49    Futures,
50    /// Real-time index data.
51    Indices,
52}
53
54impl Cluster {
55    fn as_str(&self) -> &'static str {
56        match self {
57            Self::Stocks => "stocks",
58            Self::Options => "options",
59            Self::Forex => "forex",
60            Self::Crypto => "crypto",
61            Self::Futures => "futures",
62            Self::Indices => "indices",
63        }
64    }
65}
66
67/// A real-time trade message.
68#[derive(Debug, Clone, Serialize, Deserialize)]
69#[non_exhaustive]
70pub struct StreamTrade {
71    /// Event type (e.g., `"T"`).
72    pub ev: Option<String>,
73    /// Symbol.
74    pub sym: Option<String>,
75    /// Price.
76    pub p: Option<f64>,
77    /// Size.
78    pub s: Option<f64>,
79    /// Exchange ID.
80    pub x: Option<i32>,
81    /// Conditions.
82    pub c: Option<Vec<i32>>,
83    /// Timestamp (milliseconds).
84    pub t: Option<i64>,
85}
86
87/// A real-time quote message.
88#[derive(Debug, Clone, Serialize, Deserialize)]
89#[non_exhaustive]
90pub struct StreamQuote {
91    /// Event type (e.g., `"Q"`).
92    pub ev: Option<String>,
93    /// Symbol.
94    pub sym: Option<String>,
95    /// Bid price.
96    pub bp: Option<f64>,
97    /// Bid size.
98    pub bs: Option<f64>,
99    /// Ask price.
100    pub ap: Option<f64>,
101    /// Ask size.
102    #[serde(rename = "as")]
103    pub ask_size: Option<f64>,
104    /// Bid exchange.
105    pub bx: Option<i32>,
106    /// Ask exchange.
107    pub ax: Option<i32>,
108    /// Conditions.
109    pub c: Option<Vec<i32>>,
110    /// Timestamp (milliseconds).
111    pub t: Option<i64>,
112}
113
114/// A real-time aggregate bar message.
115#[derive(Debug, Clone, Serialize, Deserialize)]
116#[non_exhaustive]
117pub struct StreamAggregate {
118    /// Event type (e.g., `"A"` per-second, `"AM"` per-minute).
119    pub ev: Option<String>,
120    /// Symbol.
121    pub sym: Option<String>,
122    /// Open.
123    pub o: Option<f64>,
124    /// High.
125    pub h: Option<f64>,
126    /// Low.
127    pub l: Option<f64>,
128    /// Close.
129    pub c: Option<f64>,
130    /// Volume.
131    pub v: Option<f64>,
132    /// VWAP.
133    pub vw: Option<f64>,
134    /// Start timestamp.
135    pub s: Option<i64>,
136    /// End timestamp.
137    pub e: Option<i64>,
138    /// Number of trades.
139    pub z: Option<u64>,
140}
141
142/// A parsed WebSocket message from Polygon.
143#[derive(Debug, Clone)]
144pub enum PolygonMessage {
145    /// Trade event.
146    Trade(StreamTrade),
147    /// Quote event.
148    Quote(StreamQuote),
149    /// Aggregate bar (per-second or per-minute).
150    Aggregate(StreamAggregate),
151    /// Status/control message (auth, subscription confirmations).
152    Status(serde_json::Value),
153    /// Unknown/unparsed message.
154    Unknown(String),
155}
156
157/// Builder for a Polygon WebSocket stream.
158pub struct PolygonStreamBuilder {
159    api_key: String,
160    cluster: Cluster,
161    subscriptions: Vec<String>,
162}
163
164impl PolygonStreamBuilder {
165    /// Set the cluster (asset class) to connect to.
166    pub fn cluster(mut self, cluster: Cluster) -> Self {
167        self.cluster = cluster;
168        self
169    }
170
171    /// Add subscription channels.
172    ///
173    /// Channel prefixes:
174    /// - `T.*` — Trades (e.g., `"T.AAPL"`)
175    /// - `Q.*` — Quotes (e.g., `"Q.AAPL"`)
176    /// - `A.*` — Per-second aggregates (e.g., `"A.AAPL"`)
177    /// - `AM.*` — Per-minute aggregates (e.g., `"AM.AAPL"`)
178    pub fn subscribe(mut self, channels: &[&str]) -> Self {
179        self.subscriptions
180            .extend(channels.iter().map(|s| s.to_string()));
181        self
182    }
183
184    /// Connect and return a `PolygonStream`.
185    pub async fn build(self) -> Result<PolygonStream> {
186        let url = format!("wss://socket.polygon.io/{}", self.cluster.as_str());
187
188        let (ws_stream, _) = tokio_tungstenite::connect_async(&url)
189            .await
190            .map_err(|e| FinanceError::ApiError(format!("Polygon WebSocket connect error: {e}")))?;
191
192        let (write, read) = futures::StreamExt::split(ws_stream);
193        let write = std::sync::Arc::new(tokio::sync::Mutex::new(write));
194
195        // Auth
196        {
197            use futures::SinkExt;
198            let auth_msg = serde_json::json!({
199                "action": "auth",
200                "params": self.api_key
201            });
202            write
203                .lock()
204                .await
205                .send(Message::Text(auth_msg.to_string().into()))
206                .await
207                .map_err(|e| {
208                    FinanceError::ApiError(format!("Polygon WebSocket auth error: {e}"))
209                })?;
210        }
211
212        // Subscribe
213        if !self.subscriptions.is_empty() {
214            use futures::SinkExt;
215            let sub_msg = serde_json::json!({
216                "action": "subscribe",
217                "params": self.subscriptions.join(",")
218            });
219            write
220                .lock()
221                .await
222                .send(Message::Text(sub_msg.to_string().into()))
223                .await
224                .map_err(|e| {
225                    FinanceError::ApiError(format!("Polygon WebSocket subscribe error: {e}"))
226                })?;
227        }
228
229        Ok(PolygonStream {
230            read: Box::pin(read),
231            _write: write,
232        })
233    }
234}
235
236/// A real-time Polygon WebSocket stream.
237///
238/// Implements `futures::Stream<Item = PolygonMessage>`.
239pub struct PolygonStream {
240    read: Pin<
241        Box<
242            dyn Stream<Item = std::result::Result<Message, tokio_tungstenite::tungstenite::Error>>
243                + Send,
244        >,
245    >,
246    _write: std::sync::Arc<
247        tokio::sync::Mutex<
248            futures::stream::SplitSink<
249                tokio_tungstenite::WebSocketStream<
250                    tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
251                >,
252                Message,
253            >,
254        >,
255    >,
256}
257
258impl PolygonStream {
259    /// Create a new builder for a Polygon WebSocket stream.
260    ///
261    /// Requires [`crate::adapters::polygon::init`] to have been called first.
262    pub fn from_singleton() -> Result<PolygonStreamBuilder> {
263        Ok(PolygonStreamBuilder {
264            api_key: super::api_key()?,
265            cluster: Cluster::Stocks,
266            subscriptions: Vec::new(),
267        })
268    }
269}
270
271impl Stream for PolygonStream {
272    type Item = PolygonMessage;
273
274    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
275        loop {
276            match self.read.as_mut().poll_next(cx) {
277                Poll::Ready(Some(Ok(Message::Text(text)))) => {
278                    return Poll::Ready(Some(parse_message(&text)));
279                }
280                Poll::Ready(Some(Ok(Message::Close(_)))) | Poll::Ready(None) => {
281                    return Poll::Ready(None);
282                }
283                Poll::Ready(Some(Ok(_))) => continue, // skip ping/pong/binary
284                Poll::Ready(Some(Err(_))) => return Poll::Ready(None),
285                Poll::Pending => return Poll::Pending,
286            }
287        }
288    }
289}
290
291fn parse_message(text: &str) -> PolygonMessage {
292    // Polygon sends arrays of events
293    let events: Vec<serde_json::Value> = match serde_json::from_str(text) {
294        Ok(v) => v,
295        Err(_) => return PolygonMessage::Unknown(text.to_string()),
296    };
297
298    // Return the first meaningful event
299    for event in events {
300        let ev = event.get("ev").and_then(|v| v.as_str()).unwrap_or("");
301        match ev {
302            "T" | "XT" => {
303                if let Ok(trade) = serde_json::from_value(event) {
304                    return PolygonMessage::Trade(trade);
305                }
306            }
307            "Q" | "XQ" => {
308                if let Ok(quote) = serde_json::from_value(event) {
309                    return PolygonMessage::Quote(quote);
310                }
311            }
312            "A" | "AM" | "XA" | "XAM" => {
313                if let Ok(agg) = serde_json::from_value(event) {
314                    return PolygonMessage::Aggregate(agg);
315                }
316            }
317            "status" => {
318                return PolygonMessage::Status(event);
319            }
320            _ => {}
321        }
322    }
323
324    PolygonMessage::Unknown(text.to_string())
325}
326
327#[cfg(test)]
328mod tests {
329    use super::*;
330
331    #[test]
332    fn test_parse_trade_message() {
333        let msg =
334            r#"[{"ev":"T","sym":"AAPL","p":186.19,"s":100,"x":4,"c":[12,37],"t":1705363200000}]"#;
335        match parse_message(msg) {
336            PolygonMessage::Trade(t) => {
337                assert_eq!(t.sym.as_deref(), Some("AAPL"));
338                assert!((t.p.unwrap() - 186.19).abs() < 0.01);
339                assert_eq!(t.s.unwrap() as u64, 100);
340            }
341            other => panic!("Expected Trade, got {:?}", other),
342        }
343    }
344
345    #[test]
346    fn test_parse_quote_message() {
347        let msg = r#"[{"ev":"Q","sym":"AAPL","bp":186.18,"bs":2,"ap":186.25,"as":3,"bx":19,"ax":11,"t":1705363200000}]"#;
348        match parse_message(msg) {
349            PolygonMessage::Quote(q) => {
350                assert_eq!(q.sym.as_deref(), Some("AAPL"));
351                assert!((q.bp.unwrap() - 186.18).abs() < 0.01);
352                assert!((q.ap.unwrap() - 186.25).abs() < 0.01);
353            }
354            other => panic!("Expected Quote, got {:?}", other),
355        }
356    }
357
358    #[test]
359    fn test_parse_aggregate_message() {
360        let msg = r#"[{"ev":"AM","sym":"AAPL","o":186.0,"h":186.25,"l":185.90,"c":186.19,"v":1500000,"vw":186.05,"s":1705363200000,"e":1705363260000,"z":823}]"#;
361        match parse_message(msg) {
362            PolygonMessage::Aggregate(a) => {
363                assert_eq!(a.sym.as_deref(), Some("AAPL"));
364                assert!((a.c.unwrap() - 186.19).abs() < 0.01);
365                assert_eq!(a.ev.as_deref(), Some("AM"));
366            }
367            other => panic!("Expected Aggregate, got {:?}", other),
368        }
369    }
370
371    #[test]
372    fn test_parse_status_message() {
373        let msg = r#"[{"ev":"status","status":"auth_success","message":"authenticated"}]"#;
374        match parse_message(msg) {
375            PolygonMessage::Status(v) => {
376                assert_eq!(v.get("status").unwrap().as_str().unwrap(), "auth_success");
377            }
378            other => panic!("Expected Status, got {:?}", other),
379        }
380    }
381
382    #[test]
383    fn test_parse_unknown_message() {
384        let msg = "not json at all";
385        assert!(matches!(parse_message(msg), PolygonMessage::Unknown(_)));
386    }
387
388    #[test]
389    fn test_cluster_as_str() {
390        assert_eq!(Cluster::Stocks.as_str(), "stocks");
391        assert_eq!(Cluster::Options.as_str(), "options");
392        assert_eq!(Cluster::Crypto.as_str(), "crypto");
393        assert_eq!(Cluster::Futures.as_str(), "futures");
394        assert_eq!(Cluster::Indices.as_str(), "indices");
395    }
396}