nt_execution/
polygon_broker.rs

1// Polygon.io market data integration
2//
3// Features:
4// - REST API v2 for market data
5// - WebSocket streaming for real-time quotes, trades, and aggregates
6// - Historical data retrieval (stocks, options, forex, crypto)
7// - Technical indicators and aggregates
8// - Options chain data
9
10use chrono::{DateTime, Utc};
11use futures::{SinkExt, StreamExt};
12use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
13use reqwest::{Client, Method};
14use rust_decimal::Decimal;
15use serde::{Deserialize, Serialize};
16use std::collections::HashMap;
17use std::num::NonZeroU32;
18use std::sync::Arc;
19use std::time::Duration;
20use tokio::sync::RwLock;
21use tokio_tungstenite::{connect_async, tungstenite::Message};
22use tracing::{debug, error, info, warn};
23use url::Url;
24
25/// Polygon.io configuration
26#[derive(Debug, Clone)]
27pub struct PolygonConfig {
28    /// API key from polygon.io
29    pub api_key: String,
30    /// Enable WebSocket streaming
31    pub streaming: bool,
32    /// Request timeout
33    pub timeout: Duration,
34}
35
36impl Default for PolygonConfig {
37    fn default() -> Self {
38        Self {
39            api_key: String::new(),
40            streaming: true,
41            timeout: Duration::from_secs(30),
42        }
43    }
44}
45
46/// Polygon.io client for market data
47pub struct PolygonClient {
48    client: Client,
49    config: PolygonConfig,
50    base_url: String,
51    ws_url: String,
52    rate_limiter: DefaultDirectRateLimiter,
53    last_quote: Arc<RwLock<Option<PolygonQuote>>>,
54}
55
56impl PolygonClient {
57    /// Create a new Polygon.io client
58    pub fn new(config: PolygonConfig) -> Self {
59        let client = Client::builder()
60            .timeout(config.timeout)
61            .build()
62            .expect("Failed to create HTTP client");
63
64        // Free tier: 5 requests/minute, Paid: higher limits
65        let quota = Quota::per_minute(NonZeroU32::new(5).unwrap());
66        let rate_limiter = RateLimiter::direct(quota);
67
68        Self {
69            client,
70            config,
71            base_url: "https://api.polygon.io".to_string(),
72            ws_url: "wss://socket.polygon.io".to_string(),
73            rate_limiter,
74            last_quote: Arc::new(RwLock::new(None)),
75        }
76    }
77
78    /// Get last quote for a symbol
79    pub async fn get_last_quote(&self, symbol: &str) -> Result<PolygonQuote, PolygonError> {
80        self.rate_limiter.until_ready().await;
81
82        let url = format!(
83            "{}/v2/last/nbbo/{}?apiKey={}",
84            self.base_url, symbol, self.config.api_key
85        );
86
87        debug!("Polygon API request: GET {}", url);
88
89        let response = self.client.get(&url).send().await?;
90
91        if response.status().is_success() {
92            let result: PolygonLastQuoteResponse = response.json().await?;
93            Ok(result.results)
94        } else {
95            let error_text = response.text().await.unwrap_or_default();
96            error!("Polygon API error: {}", error_text);
97            Err(PolygonError::ApiError(error_text))
98        }
99    }
100
101    /// Get daily aggregates (OHLCV) for a symbol
102    pub async fn get_daily_aggregates(
103        &self,
104        symbol: &str,
105        from: DateTime<Utc>,
106        to: DateTime<Utc>,
107    ) -> Result<Vec<PolygonAggregate>, PolygonError> {
108        self.rate_limiter.until_ready().await;
109
110        let url = format!(
111            "{}/v2/aggs/ticker/{}/range/1/day/{}/{}?adjusted=true&sort=asc&apiKey={}",
112            self.base_url,
113            symbol,
114            from.format("%Y-%m-%d"),
115            to.format("%Y-%m-%d"),
116            self.config.api_key
117        );
118
119        debug!("Polygon API request: GET {}", url);
120
121        let response = self.client.get(&url).send().await?;
122
123        if response.status().is_success() {
124            let result: PolygonAggregatesResponse = response.json().await?;
125            Ok(result.results)
126        } else {
127            let error_text = response.text().await.unwrap_or_default();
128            error!("Polygon API error: {}", error_text);
129            Err(PolygonError::ApiError(error_text))
130        }
131    }
132
133    /// Get intraday aggregates with custom timespan
134    pub async fn get_aggregates(
135        &self,
136        symbol: &str,
137        multiplier: u32,
138        timespan: &str, // minute, hour, day, week, month, quarter, year
139        from: DateTime<Utc>,
140        to: DateTime<Utc>,
141    ) -> Result<Vec<PolygonAggregate>, PolygonError> {
142        self.rate_limiter.until_ready().await;
143
144        let url = format!(
145            "{}/v2/aggs/ticker/{}/range/{}/{}/{}/{}?adjusted=true&sort=asc&apiKey={}",
146            self.base_url,
147            symbol,
148            multiplier,
149            timespan,
150            from.timestamp_millis(),
151            to.timestamp_millis(),
152            self.config.api_key
153        );
154
155        debug!("Polygon API request: GET {}", url);
156
157        let response = self.client.get(&url).send().await?;
158
159        if response.status().is_success() {
160            let result: PolygonAggregatesResponse = response.json().await?;
161            Ok(result.results)
162        } else {
163            let error_text = response.text().await.unwrap_or_default();
164            Err(PolygonError::ApiError(error_text))
165        }
166    }
167
168    /// Get snapshot of all tickers
169    pub async fn get_snapshot_all(&self) -> Result<Vec<PolygonTickerSnapshot>, PolygonError> {
170        self.rate_limiter.until_ready().await;
171
172        let url = format!(
173            "{}/v2/snapshot/locale/us/markets/stocks/tickers?apiKey={}",
174            self.base_url, self.config.api_key
175        );
176
177        let response = self.client.get(&url).send().await?;
178
179        if response.status().is_success() {
180            let result: PolygonSnapshotResponse = response.json().await?;
181            Ok(result.tickers)
182        } else {
183            let error_text = response.text().await.unwrap_or_default();
184            Err(PolygonError::ApiError(error_text))
185        }
186    }
187
188    /// Start WebSocket streaming for real-time data
189    pub async fn start_streaming(
190        &self,
191        symbols: Vec<String>,
192    ) -> Result<(), PolygonError> {
193        if !self.config.streaming {
194            return Ok(());
195        }
196
197        let ws_url = format!("{}/stocks", self.ws_url);
198        let url = Url::parse(&ws_url).map_err(|e| PolygonError::WebSocketError(e.to_string()))?;
199
200        let (ws_stream, _) = connect_async(url)
201            .await
202            .map_err(|e| PolygonError::WebSocketError(e.to_string()))?;
203
204        let (mut write, mut read) = ws_stream.split();
205
206        // Authenticate
207        let auth_msg = serde_json::json!({
208            "action": "auth",
209            "params": self.config.api_key
210        });
211
212        write
213            .send(Message::Text(auth_msg.to_string()))
214            .await
215            .map_err(|e| PolygonError::WebSocketError(e.to_string()))?;
216
217        // Subscribe to symbols
218        let num_symbols = symbols.len();
219        for symbol in symbols {
220            let subscribe_msg = serde_json::json!({
221                "action": "subscribe",
222                "params": format!("T.{},Q.{},A.{}", symbol, symbol, symbol) // Trades, Quotes, Aggregates
223            });
224
225            write
226                .send(Message::Text(subscribe_msg.to_string()))
227                .await
228                .map_err(|e| PolygonError::WebSocketError(e.to_string()))?;
229        }
230
231        info!("Polygon WebSocket streaming started for {} symbols", num_symbols);
232
233        // Spawn task to handle incoming messages
234        let last_quote = self.last_quote.clone();
235        tokio::spawn(async move {
236            while let Some(msg) = read.next().await {
237                match msg {
238                    Ok(Message::Text(text)) => {
239                        if let Ok(events) = serde_json::from_str::<Vec<PolygonWebSocketEvent>>(&text)
240                        {
241                            for event in events {
242                                match event.ev.as_str() {
243                                    "Q" => {
244                                        // Quote update
245                                        debug!("Quote: {:?}", event);
246                                    }
247                                    "T" => {
248                                        // Trade update
249                                        debug!("Trade: {:?}", event);
250                                    }
251                                    "A" | "AM" => {
252                                        // Aggregate update
253                                        debug!("Aggregate: {:?}", event);
254                                    }
255                                    _ => {}
256                                }
257                            }
258                        }
259                    }
260                    Ok(Message::Close(_)) => {
261                        warn!("Polygon WebSocket closed");
262                        break;
263                    }
264                    Err(e) => {
265                        error!("Polygon WebSocket error: {}", e);
266                        break;
267                    }
268                    _ => {}
269                }
270            }
271        });
272
273        Ok(())
274    }
275
276    /// Get ticker details
277    pub async fn get_ticker_details(&self, symbol: &str) -> Result<PolygonTickerDetails, PolygonError> {
278        self.rate_limiter.until_ready().await;
279
280        let url = format!(
281            "{}/v3/reference/tickers/{}?apiKey={}",
282            self.base_url, symbol, self.config.api_key
283        );
284
285        let response = self.client.get(&url).send().await?;
286
287        if response.status().is_success() {
288            let result: PolygonTickerDetailsResponse = response.json().await?;
289            Ok(result.results)
290        } else {
291            let error_text = response.text().await.unwrap_or_default();
292            Err(PolygonError::ApiError(error_text))
293        }
294    }
295}
296
297// Polygon API types
298#[derive(Debug, Clone, Serialize, Deserialize)]
299pub struct PolygonQuote {
300    #[serde(rename = "T")]
301    pub symbol: String,
302    #[serde(rename = "t")]
303    pub sip_timestamp: i64,
304    #[serde(rename = "y")]
305    pub exchange_timestamp: i64,
306    #[serde(rename = "p")]
307    pub bid_price: Decimal,
308    #[serde(rename = "s")]
309    pub bid_size: i64,
310    #[serde(rename = "P")]
311    pub ask_price: Decimal,
312    #[serde(rename = "S")]
313    pub ask_size: i64,
314}
315
316#[derive(Debug, Clone, Serialize, Deserialize)]
317pub struct PolygonAggregate {
318    #[serde(rename = "o")]
319    pub open: Decimal,
320    #[serde(rename = "h")]
321    pub high: Decimal,
322    #[serde(rename = "l")]
323    pub low: Decimal,
324    #[serde(rename = "c")]
325    pub close: Decimal,
326    #[serde(rename = "v")]
327    pub volume: i64,
328    #[serde(rename = "vw")]
329    pub vwap: Option<Decimal>,
330    #[serde(rename = "t")]
331    pub timestamp: i64,
332    #[serde(rename = "n")]
333    pub transactions: Option<i64>,
334}
335
336#[derive(Debug, Deserialize)]
337struct PolygonLastQuoteResponse {
338    status: String,
339    results: PolygonQuote,
340}
341
342#[derive(Debug, Deserialize)]
343struct PolygonAggregatesResponse {
344    ticker: String,
345    #[serde(default)]
346    results: Vec<PolygonAggregate>,
347    status: String,
348}
349
350#[derive(Debug, Clone, Deserialize)]
351pub struct PolygonTickerSnapshot {
352    pub ticker: String,
353    pub day: Option<PolygonAggregate>,
354    #[serde(rename = "lastQuote")]
355    pub last_quote: Option<PolygonQuote>,
356    #[serde(rename = "lastTrade")]
357    pub last_trade: Option<PolygonTrade>,
358    #[serde(rename = "prevDay")]
359    pub prev_day: Option<PolygonAggregate>,
360}
361
362#[derive(Debug, Clone, Deserialize)]
363pub struct PolygonTrade {
364    #[serde(rename = "p")]
365    pub price: Decimal,
366    #[serde(rename = "s")]
367    pub size: i64,
368    #[serde(rename = "t")]
369    pub timestamp: i64,
370}
371
372#[derive(Debug, Deserialize)]
373struct PolygonSnapshotResponse {
374    status: String,
375    tickers: Vec<PolygonTickerSnapshot>,
376}
377
378#[derive(Debug, Clone, Deserialize)]
379pub struct PolygonWebSocketEvent {
380    pub ev: String,
381    #[serde(rename = "sym")]
382    pub symbol: Option<String>,
383    #[serde(flatten)]
384    pub data: serde_json::Value,
385}
386
387#[derive(Debug, Clone, Deserialize)]
388pub struct PolygonTickerDetails {
389    pub ticker: String,
390    pub name: String,
391    pub market: String,
392    pub locale: String,
393    pub primary_exchange: String,
394    #[serde(rename = "type")]
395    pub ticker_type: String,
396    pub active: bool,
397    pub currency_name: String,
398    pub cik: Option<String>,
399    pub description: Option<String>,
400}
401
402#[derive(Debug, Deserialize)]
403struct PolygonTickerDetailsResponse {
404    status: String,
405    results: PolygonTickerDetails,
406}
407
408/// Polygon.io error types
409#[derive(Debug, thiserror::Error)]
410pub enum PolygonError {
411    #[error("API error: {0}")]
412    ApiError(String),
413
414    #[error("WebSocket error: {0}")]
415    WebSocketError(String),
416
417    #[error("Network error: {0}")]
418    Network(#[from] reqwest::Error),
419
420    #[error("Parse error: {0}")]
421    Parse(#[from] serde_json::Error),
422
423    #[error("Rate limit exceeded")]
424    RateLimit,
425
426    #[error(transparent)]
427    Other(#[from] anyhow::Error),
428}
429
430#[cfg(test)]
431mod tests {
432    use super::*;
433
434    #[test]
435    fn test_polygon_client_creation() {
436        let _config = PolygonConfig {
437            api_key: "test_key".to_string(),
438            ..Default::default()
439        };
440        let client = PolygonClient::new(config);
441        assert_eq!(client.base_url, "https://api.polygon.io");
442    }
443}