finance_query/adapters/polygon/
websocket.rs1use std::pin::Pin;
29use std::task::{Context, Poll};
30
31use futures::stream::Stream;
32use serde::{Deserialize, Serialize};
33use tokio_tungstenite::tungstenite::Message;
34
35use crate::error::{FinanceError, Result};
36
/// Market-data cluster a Polygon WebSocket connection is opened against.
///
/// Each variant maps to one lowercase name (see [`Cluster::as_str`]) that the
/// stream builder appends to the WebSocket base URL.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Cluster {
    /// The `stocks` cluster.
    Stocks,
    /// The `options` cluster.
    Options,
    /// The `forex` cluster.
    Forex,
    /// The `crypto` cluster.
    Crypto,
    /// The `futures` cluster.
    Futures,
    /// The `indices` cluster.
    Indices,
}

impl Cluster {
    /// Returns the lowercase cluster name used as the URL path segment.
    fn as_str(&self) -> &'static str {
        // Exhaustive on purpose: adding a variant must force a decision here.
        match *self {
            Cluster::Stocks => "stocks",
            Cluster::Options => "options",
            Cluster::Forex => "forex",
            Cluster::Crypto => "crypto",
            Cluster::Futures => "futures",
            Cluster::Indices => "indices",
        }
    }
}
66
67#[derive(Debug, Clone, Serialize, Deserialize)]
69#[non_exhaustive]
70pub struct StreamTrade {
71 pub ev: Option<String>,
73 pub sym: Option<String>,
75 pub p: Option<f64>,
77 pub s: Option<f64>,
79 pub x: Option<i32>,
81 pub c: Option<Vec<i32>>,
83 pub t: Option<i64>,
85}
86
87#[derive(Debug, Clone, Serialize, Deserialize)]
89#[non_exhaustive]
90pub struct StreamQuote {
91 pub ev: Option<String>,
93 pub sym: Option<String>,
95 pub bp: Option<f64>,
97 pub bs: Option<f64>,
99 pub ap: Option<f64>,
101 #[serde(rename = "as")]
103 pub ask_size: Option<f64>,
104 pub bx: Option<i32>,
106 pub ax: Option<i32>,
108 pub c: Option<Vec<i32>>,
110 pub t: Option<i64>,
112}
113
114#[derive(Debug, Clone, Serialize, Deserialize)]
116#[non_exhaustive]
117pub struct StreamAggregate {
118 pub ev: Option<String>,
120 pub sym: Option<String>,
122 pub o: Option<f64>,
124 pub h: Option<f64>,
126 pub l: Option<f64>,
128 pub c: Option<f64>,
130 pub v: Option<f64>,
132 pub vw: Option<f64>,
134 pub s: Option<i64>,
136 pub e: Option<i64>,
138 pub z: Option<u64>,
140}
141
142#[derive(Debug, Clone)]
144pub enum PolygonMessage {
145 Trade(StreamTrade),
147 Quote(StreamQuote),
149 Aggregate(StreamAggregate),
151 Status(serde_json::Value),
153 Unknown(String),
155}
156
157pub struct PolygonStreamBuilder {
159 api_key: String,
160 cluster: Cluster,
161 subscriptions: Vec<String>,
162}
163
164impl PolygonStreamBuilder {
165 pub fn cluster(mut self, cluster: Cluster) -> Self {
167 self.cluster = cluster;
168 self
169 }
170
171 pub fn subscribe(mut self, channels: &[&str]) -> Self {
179 self.subscriptions
180 .extend(channels.iter().map(|s| s.to_string()));
181 self
182 }
183
184 pub async fn build(self) -> Result<PolygonStream> {
186 let url = format!("wss://socket.polygon.io/{}", self.cluster.as_str());
187
188 let (ws_stream, _) = tokio_tungstenite::connect_async(&url)
189 .await
190 .map_err(|e| FinanceError::ApiError(format!("Polygon WebSocket connect error: {e}")))?;
191
192 let (write, read) = futures::StreamExt::split(ws_stream);
193 let write = std::sync::Arc::new(tokio::sync::Mutex::new(write));
194
195 {
197 use futures::SinkExt;
198 let auth_msg = serde_json::json!({
199 "action": "auth",
200 "params": self.api_key
201 });
202 write
203 .lock()
204 .await
205 .send(Message::Text(auth_msg.to_string().into()))
206 .await
207 .map_err(|e| {
208 FinanceError::ApiError(format!("Polygon WebSocket auth error: {e}"))
209 })?;
210 }
211
212 if !self.subscriptions.is_empty() {
214 use futures::SinkExt;
215 let sub_msg = serde_json::json!({
216 "action": "subscribe",
217 "params": self.subscriptions.join(",")
218 });
219 write
220 .lock()
221 .await
222 .send(Message::Text(sub_msg.to_string().into()))
223 .await
224 .map_err(|e| {
225 FinanceError::ApiError(format!("Polygon WebSocket subscribe error: {e}"))
226 })?;
227 }
228
229 Ok(PolygonStream {
230 read: Box::pin(read),
231 _write: write,
232 })
233 }
234}
235
236pub struct PolygonStream {
240 read: Pin<
241 Box<
242 dyn Stream<Item = std::result::Result<Message, tokio_tungstenite::tungstenite::Error>>
243 + Send,
244 >,
245 >,
246 _write: std::sync::Arc<
247 tokio::sync::Mutex<
248 futures::stream::SplitSink<
249 tokio_tungstenite::WebSocketStream<
250 tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
251 >,
252 Message,
253 >,
254 >,
255 >,
256}
257
258impl PolygonStream {
259 pub fn from_singleton() -> Result<PolygonStreamBuilder> {
263 Ok(PolygonStreamBuilder {
264 api_key: super::api_key()?,
265 cluster: Cluster::Stocks,
266 subscriptions: Vec::new(),
267 })
268 }
269}
270
271impl Stream for PolygonStream {
272 type Item = PolygonMessage;
273
274 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
275 loop {
276 match self.read.as_mut().poll_next(cx) {
277 Poll::Ready(Some(Ok(Message::Text(text)))) => {
278 return Poll::Ready(Some(parse_message(&text)));
279 }
280 Poll::Ready(Some(Ok(Message::Close(_)))) | Poll::Ready(None) => {
281 return Poll::Ready(None);
282 }
283 Poll::Ready(Some(Ok(_))) => continue, Poll::Ready(Some(Err(_))) => return Poll::Ready(None),
285 Poll::Pending => return Poll::Pending,
286 }
287 }
288 }
289}
290
291fn parse_message(text: &str) -> PolygonMessage {
292 let events: Vec<serde_json::Value> = match serde_json::from_str(text) {
294 Ok(v) => v,
295 Err(_) => return PolygonMessage::Unknown(text.to_string()),
296 };
297
298 for event in events {
300 let ev = event.get("ev").and_then(|v| v.as_str()).unwrap_or("");
301 match ev {
302 "T" | "XT" => {
303 if let Ok(trade) = serde_json::from_value(event) {
304 return PolygonMessage::Trade(trade);
305 }
306 }
307 "Q" | "XQ" => {
308 if let Ok(quote) = serde_json::from_value(event) {
309 return PolygonMessage::Quote(quote);
310 }
311 }
312 "A" | "AM" | "XA" | "XAM" => {
313 if let Ok(agg) = serde_json::from_value(event) {
314 return PolygonMessage::Aggregate(agg);
315 }
316 }
317 "status" => {
318 return PolygonMessage::Status(event);
319 }
320 _ => {}
321 }
322 }
323
324 PolygonMessage::Unknown(text.to_string())
325}
326
#[cfg(test)]
mod tests {
    use super::*;

    /// A single-trade frame decodes into `PolygonMessage::Trade`.
    #[test]
    fn test_parse_trade_message() {
        let msg =
            r#"[{"ev":"T","sym":"AAPL","p":186.19,"s":100,"x":4,"c":[12,37],"t":1705363200000}]"#;
        match parse_message(msg) {
            PolygonMessage::Trade(t) => {
                assert_eq!(t.sym.as_deref(), Some("AAPL"));
                assert!((t.p.unwrap() - 186.19).abs() < 0.01);
                assert_eq!(t.s.unwrap() as u64, 100);
            }
            other => panic!("Expected Trade, got {:?}", other),
        }
    }

    /// A single-quote frame decodes into `PolygonMessage::Quote`, including
    /// the `as` key that is renamed to `ask_size`.
    #[test]
    fn test_parse_quote_message() {
        let msg = r#"[{"ev":"Q","sym":"AAPL","bp":186.18,"bs":2,"ap":186.25,"as":3,"bx":19,"ax":11,"t":1705363200000}]"#;
        match parse_message(msg) {
            PolygonMessage::Quote(q) => {
                assert_eq!(q.sym.as_deref(), Some("AAPL"));
                assert!((q.bp.unwrap() - 186.18).abs() < 0.01);
                assert!((q.ap.unwrap() - 186.25).abs() < 0.01);
                assert_eq!(q.ask_size, Some(3.0));
            }
            other => panic!("Expected Quote, got {:?}", other),
        }
    }

    /// A minute-aggregate frame decodes into `PolygonMessage::Aggregate`.
    #[test]
    fn test_parse_aggregate_message() {
        let msg = r#"[{"ev":"AM","sym":"AAPL","o":186.0,"h":186.25,"l":185.90,"c":186.19,"v":1500000,"vw":186.05,"s":1705363200000,"e":1705363260000,"z":823}]"#;
        match parse_message(msg) {
            PolygonMessage::Aggregate(a) => {
                assert_eq!(a.sym.as_deref(), Some("AAPL"));
                assert!((a.c.unwrap() - 186.19).abs() < 0.01);
                assert_eq!(a.ev.as_deref(), Some("AM"));
            }
            other => panic!("Expected Aggregate, got {:?}", other),
        }
    }

    /// A status frame is passed through as raw JSON.
    #[test]
    fn test_parse_status_message() {
        let msg = r#"[{"ev":"status","status":"auth_success","message":"authenticated"}]"#;
        match parse_message(msg) {
            PolygonMessage::Status(v) => {
                assert_eq!(v.get("status").unwrap().as_str().unwrap(), "auth_success");
            }
            other => panic!("Expected Status, got {:?}", other),
        }
    }

    /// Text that is not JSON falls back to `Unknown`.
    #[test]
    fn test_parse_unknown_message() {
        let msg = "not json at all";
        assert!(matches!(parse_message(msg), PolygonMessage::Unknown(_)));
    }

    /// An empty JSON array carries no event and falls back to `Unknown` with
    /// the raw text preserved.
    #[test]
    fn test_parse_empty_frame_is_unknown() {
        let msg = "[]";
        match parse_message(msg) {
            PolygonMessage::Unknown(raw) => assert_eq!(raw, msg),
            other => panic!("Expected Unknown, got {:?}", other),
        }
    }

    /// A well-formed frame whose only event has an unrecognised tag falls
    /// back to `Unknown` with the raw text preserved.
    #[test]
    fn test_parse_unrecognised_event_is_unknown() {
        let msg = r#"[{"ev":"ZZ","sym":"AAPL"}]"#;
        match parse_message(msg) {
            PolygonMessage::Unknown(raw) => assert_eq!(raw, msg),
            other => panic!("Expected Unknown, got {:?}", other),
        }
    }

    /// Every cluster maps to its lowercase URL path segment.
    #[test]
    fn test_cluster_as_str() {
        assert_eq!(Cluster::Stocks.as_str(), "stocks");
        assert_eq!(Cluster::Options.as_str(), "options");
        assert_eq!(Cluster::Forex.as_str(), "forex");
        assert_eq!(Cluster::Crypto.as_str(), "crypto");
        assert_eq!(Cluster::Futures.as_str(), "futures");
        assert_eq!(Cluster::Indices.as_str(), "indices");
    }
}