1use chrono::{DateTime, Utc};
11use futures::{SinkExt, StreamExt};
12use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
13use reqwest::{Client, Method};
14use rust_decimal::Decimal;
15use serde::{Deserialize, Serialize};
16use std::collections::HashMap;
17use std::num::NonZeroU32;
18use std::sync::Arc;
19use std::time::Duration;
20use tokio::sync::RwLock;
21use tokio_tungstenite::{connect_async, tungstenite::Message};
22use tracing::{debug, error, info, warn};
23use url::Url;
24
25#[derive(Debug, Clone)]
27pub struct PolygonConfig {
28 pub api_key: String,
30 pub streaming: bool,
32 pub timeout: Duration,
34}
35
36impl Default for PolygonConfig {
37 fn default() -> Self {
38 Self {
39 api_key: String::new(),
40 streaming: true,
41 timeout: Duration::from_secs(30),
42 }
43 }
44}
45
46pub struct PolygonClient {
48 client: Client,
49 config: PolygonConfig,
50 base_url: String,
51 ws_url: String,
52 rate_limiter: DefaultDirectRateLimiter,
53 last_quote: Arc<RwLock<Option<PolygonQuote>>>,
54}
55
56impl PolygonClient {
57 pub fn new(config: PolygonConfig) -> Self {
59 let client = Client::builder()
60 .timeout(config.timeout)
61 .build()
62 .expect("Failed to create HTTP client");
63
64 let quota = Quota::per_minute(NonZeroU32::new(5).unwrap());
66 let rate_limiter = RateLimiter::direct(quota);
67
68 Self {
69 client,
70 config,
71 base_url: "https://api.polygon.io".to_string(),
72 ws_url: "wss://socket.polygon.io".to_string(),
73 rate_limiter,
74 last_quote: Arc::new(RwLock::new(None)),
75 }
76 }
77
78 pub async fn get_last_quote(&self, symbol: &str) -> Result<PolygonQuote, PolygonError> {
80 self.rate_limiter.until_ready().await;
81
82 let url = format!(
83 "{}/v2/last/nbbo/{}?apiKey={}",
84 self.base_url, symbol, self.config.api_key
85 );
86
87 debug!("Polygon API request: GET {}", url);
88
89 let response = self.client.get(&url).send().await?;
90
91 if response.status().is_success() {
92 let result: PolygonLastQuoteResponse = response.json().await?;
93 Ok(result.results)
94 } else {
95 let error_text = response.text().await.unwrap_or_default();
96 error!("Polygon API error: {}", error_text);
97 Err(PolygonError::ApiError(error_text))
98 }
99 }
100
101 pub async fn get_daily_aggregates(
103 &self,
104 symbol: &str,
105 from: DateTime<Utc>,
106 to: DateTime<Utc>,
107 ) -> Result<Vec<PolygonAggregate>, PolygonError> {
108 self.rate_limiter.until_ready().await;
109
110 let url = format!(
111 "{}/v2/aggs/ticker/{}/range/1/day/{}/{}?adjusted=true&sort=asc&apiKey={}",
112 self.base_url,
113 symbol,
114 from.format("%Y-%m-%d"),
115 to.format("%Y-%m-%d"),
116 self.config.api_key
117 );
118
119 debug!("Polygon API request: GET {}", url);
120
121 let response = self.client.get(&url).send().await?;
122
123 if response.status().is_success() {
124 let result: PolygonAggregatesResponse = response.json().await?;
125 Ok(result.results)
126 } else {
127 let error_text = response.text().await.unwrap_or_default();
128 error!("Polygon API error: {}", error_text);
129 Err(PolygonError::ApiError(error_text))
130 }
131 }
132
133 pub async fn get_aggregates(
135 &self,
136 symbol: &str,
137 multiplier: u32,
138 timespan: &str, from: DateTime<Utc>,
140 to: DateTime<Utc>,
141 ) -> Result<Vec<PolygonAggregate>, PolygonError> {
142 self.rate_limiter.until_ready().await;
143
144 let url = format!(
145 "{}/v2/aggs/ticker/{}/range/{}/{}/{}/{}?adjusted=true&sort=asc&apiKey={}",
146 self.base_url,
147 symbol,
148 multiplier,
149 timespan,
150 from.timestamp_millis(),
151 to.timestamp_millis(),
152 self.config.api_key
153 );
154
155 debug!("Polygon API request: GET {}", url);
156
157 let response = self.client.get(&url).send().await?;
158
159 if response.status().is_success() {
160 let result: PolygonAggregatesResponse = response.json().await?;
161 Ok(result.results)
162 } else {
163 let error_text = response.text().await.unwrap_or_default();
164 Err(PolygonError::ApiError(error_text))
165 }
166 }
167
168 pub async fn get_snapshot_all(&self) -> Result<Vec<PolygonTickerSnapshot>, PolygonError> {
170 self.rate_limiter.until_ready().await;
171
172 let url = format!(
173 "{}/v2/snapshot/locale/us/markets/stocks/tickers?apiKey={}",
174 self.base_url, self.config.api_key
175 );
176
177 let response = self.client.get(&url).send().await?;
178
179 if response.status().is_success() {
180 let result: PolygonSnapshotResponse = response.json().await?;
181 Ok(result.tickers)
182 } else {
183 let error_text = response.text().await.unwrap_or_default();
184 Err(PolygonError::ApiError(error_text))
185 }
186 }
187
188 pub async fn start_streaming(
190 &self,
191 symbols: Vec<String>,
192 ) -> Result<(), PolygonError> {
193 if !self.config.streaming {
194 return Ok(());
195 }
196
197 let ws_url = format!("{}/stocks", self.ws_url);
198 let url = Url::parse(&ws_url).map_err(|e| PolygonError::WebSocketError(e.to_string()))?;
199
200 let (ws_stream, _) = connect_async(url)
201 .await
202 .map_err(|e| PolygonError::WebSocketError(e.to_string()))?;
203
204 let (mut write, mut read) = ws_stream.split();
205
206 let auth_msg = serde_json::json!({
208 "action": "auth",
209 "params": self.config.api_key
210 });
211
212 write
213 .send(Message::Text(auth_msg.to_string()))
214 .await
215 .map_err(|e| PolygonError::WebSocketError(e.to_string()))?;
216
217 let num_symbols = symbols.len();
219 for symbol in symbols {
220 let subscribe_msg = serde_json::json!({
221 "action": "subscribe",
222 "params": format!("T.{},Q.{},A.{}", symbol, symbol, symbol) });
224
225 write
226 .send(Message::Text(subscribe_msg.to_string()))
227 .await
228 .map_err(|e| PolygonError::WebSocketError(e.to_string()))?;
229 }
230
231 info!("Polygon WebSocket streaming started for {} symbols", num_symbols);
232
233 let last_quote = self.last_quote.clone();
235 tokio::spawn(async move {
236 while let Some(msg) = read.next().await {
237 match msg {
238 Ok(Message::Text(text)) => {
239 if let Ok(events) = serde_json::from_str::<Vec<PolygonWebSocketEvent>>(&text)
240 {
241 for event in events {
242 match event.ev.as_str() {
243 "Q" => {
244 debug!("Quote: {:?}", event);
246 }
247 "T" => {
248 debug!("Trade: {:?}", event);
250 }
251 "A" | "AM" => {
252 debug!("Aggregate: {:?}", event);
254 }
255 _ => {}
256 }
257 }
258 }
259 }
260 Ok(Message::Close(_)) => {
261 warn!("Polygon WebSocket closed");
262 break;
263 }
264 Err(e) => {
265 error!("Polygon WebSocket error: {}", e);
266 break;
267 }
268 _ => {}
269 }
270 }
271 });
272
273 Ok(())
274 }
275
276 pub async fn get_ticker_details(&self, symbol: &str) -> Result<PolygonTickerDetails, PolygonError> {
278 self.rate_limiter.until_ready().await;
279
280 let url = format!(
281 "{}/v3/reference/tickers/{}?apiKey={}",
282 self.base_url, symbol, self.config.api_key
283 );
284
285 let response = self.client.get(&url).send().await?;
286
287 if response.status().is_success() {
288 let result: PolygonTickerDetailsResponse = response.json().await?;
289 Ok(result.results)
290 } else {
291 let error_text = response.text().await.unwrap_or_default();
292 Err(PolygonError::ApiError(error_text))
293 }
294 }
295}
296
297#[derive(Debug, Clone, Serialize, Deserialize)]
299pub struct PolygonQuote {
300 #[serde(rename = "T")]
301 pub symbol: String,
302 #[serde(rename = "t")]
303 pub sip_timestamp: i64,
304 #[serde(rename = "y")]
305 pub exchange_timestamp: i64,
306 #[serde(rename = "p")]
307 pub bid_price: Decimal,
308 #[serde(rename = "s")]
309 pub bid_size: i64,
310 #[serde(rename = "P")]
311 pub ask_price: Decimal,
312 #[serde(rename = "S")]
313 pub ask_size: i64,
314}
315
316#[derive(Debug, Clone, Serialize, Deserialize)]
317pub struct PolygonAggregate {
318 #[serde(rename = "o")]
319 pub open: Decimal,
320 #[serde(rename = "h")]
321 pub high: Decimal,
322 #[serde(rename = "l")]
323 pub low: Decimal,
324 #[serde(rename = "c")]
325 pub close: Decimal,
326 #[serde(rename = "v")]
327 pub volume: i64,
328 #[serde(rename = "vw")]
329 pub vwap: Option<Decimal>,
330 #[serde(rename = "t")]
331 pub timestamp: i64,
332 #[serde(rename = "n")]
333 pub transactions: Option<i64>,
334}
335
336#[derive(Debug, Deserialize)]
337struct PolygonLastQuoteResponse {
338 status: String,
339 results: PolygonQuote,
340}
341
342#[derive(Debug, Deserialize)]
343struct PolygonAggregatesResponse {
344 ticker: String,
345 #[serde(default)]
346 results: Vec<PolygonAggregate>,
347 status: String,
348}
349
350#[derive(Debug, Clone, Deserialize)]
351pub struct PolygonTickerSnapshot {
352 pub ticker: String,
353 pub day: Option<PolygonAggregate>,
354 #[serde(rename = "lastQuote")]
355 pub last_quote: Option<PolygonQuote>,
356 #[serde(rename = "lastTrade")]
357 pub last_trade: Option<PolygonTrade>,
358 #[serde(rename = "prevDay")]
359 pub prev_day: Option<PolygonAggregate>,
360}
361
362#[derive(Debug, Clone, Deserialize)]
363pub struct PolygonTrade {
364 #[serde(rename = "p")]
365 pub price: Decimal,
366 #[serde(rename = "s")]
367 pub size: i64,
368 #[serde(rename = "t")]
369 pub timestamp: i64,
370}
371
372#[derive(Debug, Deserialize)]
373struct PolygonSnapshotResponse {
374 status: String,
375 tickers: Vec<PolygonTickerSnapshot>,
376}
377
378#[derive(Debug, Clone, Deserialize)]
379pub struct PolygonWebSocketEvent {
380 pub ev: String,
381 #[serde(rename = "sym")]
382 pub symbol: Option<String>,
383 #[serde(flatten)]
384 pub data: serde_json::Value,
385}
386
387#[derive(Debug, Clone, Deserialize)]
388pub struct PolygonTickerDetails {
389 pub ticker: String,
390 pub name: String,
391 pub market: String,
392 pub locale: String,
393 pub primary_exchange: String,
394 #[serde(rename = "type")]
395 pub ticker_type: String,
396 pub active: bool,
397 pub currency_name: String,
398 pub cik: Option<String>,
399 pub description: Option<String>,
400}
401
402#[derive(Debug, Deserialize)]
403struct PolygonTickerDetailsResponse {
404 status: String,
405 results: PolygonTickerDetails,
406}
407
408#[derive(Debug, thiserror::Error)]
410pub enum PolygonError {
411 #[error("API error: {0}")]
412 ApiError(String),
413
414 #[error("WebSocket error: {0}")]
415 WebSocketError(String),
416
417 #[error("Network error: {0}")]
418 Network(#[from] reqwest::Error),
419
420 #[error("Parse error: {0}")]
421 Parse(#[from] serde_json::Error),
422
423 #[error("Rate limit exceeded")]
424 RateLimit,
425
426 #[error(transparent)]
427 Other(#[from] anyhow::Error),
428}
429
430#[cfg(test)]
431mod tests {
432 use super::*;
433
434 #[test]
435 fn test_polygon_client_creation() {
436 let _config = PolygonConfig {
437 api_key: "test_key".to_string(),
438 ..Default::default()
439 };
440 let client = PolygonClient::new(config);
441 assert_eq!(client.base_url, "https://api.polygon.io");
442 }
443}