market_data/publishers/
polygon_io.rs

//! Fetch time series stock data from [Polygon.io](https://polygon.io/docs/stocks/get_v2_aggs_ticker__stocksticker__range__multiplier___timespan___from___to)
//!
//! Claim your [API Key](https://polygon.io/pricing)
//!
//! Example daily request:
//! `https://api.polygon.io/v2/aggs/ticker/AAPL/range/1/day/2023-01-09/2024-01-09?adjusted=true&sort=asc&limit=120&apiKey=<your_api_key>`
//!
//! Example intraday request:
//! `https://api.polygon.io/v2/aggs/ticker/AAPL/range/1/hour/2023-01-09/2024-01-09?adjusted=true&sort=asc&limit=120&apiKey=<your_api_key>`
//!
11use chrono::DateTime;
12use serde::{Deserialize, Serialize};
13use url::Url;
14
15use crate::{
16    client::{Interval, MarketSeries, Series},
17    errors::{MarketError, MarketResult},
18    publishers::Publisher,
19    rest_call::Client,
20};
21
/// Base URL of the Polygon.io aggregates (bars) endpoint.
///
/// The trailing slash is significant: request URLs are built by appending
/// `<ticker>/range/<multiplier>/<timespan>/<from>/<to>` to this base.
const BASE_URL: &str = "https://api.polygon.io/v2/aggs/ticker/";
23
/// Fetch time series stock data from [Polygon.io](https://polygon.io), implements the `Publisher` trait.
///
/// The struct is a small staged pipeline, each stage consuming the output of the previous one:
/// the `*_series` methods queue `requests`, `create_endpoint` turns them into `endpoints`,
/// `get_data` downloads them into `data`, and `transform_data` converts `data` into market series.
#[derive(Debug, Default)]
pub struct Polygon {
    // API key, sent as the `apiKey` query parameter of every request URL
    token: String,
    // requests queued by the `*_series` methods, cleared by `create_endpoint`
    requests: Vec<PolygonRequest>,
    // request URLs built by `create_endpoint`, cleared by `get_data`
    endpoints: Vec<url::Url>,
    // raw responses downloaded by `get_data`, cleared by `transform_data`
    data: Vec<PolygonPrices>,
    // interval should be maintained, as it is necessary in the transformation phase
    // TO FIX, should be a Vec<Interval>: only the interval of the most recently queued
    // request is kept, and `transform_data` applies it to every downloaded series
    interval: Interval,
}
35
/// Parameters of a single aggregates request, as queued by the `*_series` methods of `Polygon`.
///
/// The fields are interpolated into the request URL in the order
/// `<symbol>/range/<multiplier>/<timespan>/<from_date>/<to_date>`, with `limit` sent as a
/// query parameter.
#[derive(Debug, Default)]
pub struct PolygonRequest {
    // The ticker symbol to request, e.g. "AAPL".
    symbol: String,
    // The size of the time window ("minute", "hour", "day", "week" or "month").
    timespan: String,
    // The size of the timespan multiplier.
    multiplier: i32,
    // The start of the aggregate time window (YYYY-MM-DD)
    from_date: String,
    // The end of the aggregate time window (YYYY-MM-DD)
    to_date: String,
    // Limits the number of base aggregates queried to create the aggregate results. Max 50000 and Default 5000
    limit: i32,
}
50
51impl Polygon {
52    /// create new instance of Twelvedata
53    pub fn new(token: impl Into<String>) -> Self {
54        Polygon {
55            token: token.into(),
56            ..Default::default()
57        }
58    }
59
60    /// Request for intraday series
61    /// it supports only the following intervals: 1min, 5min, 15min, 30min, 1h, 2h, 4h
62    pub fn intraday_series(
63        &mut self,
64        symbol: impl Into<String>,
65        from_date: impl Into<String>,
66        to_date: impl Into<String>,
67        interval: Interval,
68        limit: i32,
69    ) -> MarketResult<()> {
70        let (timespan, multiplier) = match interval {
71            Interval::Min1 => ("minute", 1),
72            Interval::Min5 => ("minute", 5),
73            Interval::Min15 => ("minute", 15),
74            Interval::Min30 => ("minute", 30),
75            Interval::Hour1 => ("hour", 1),
76            Interval::Hour2 => ("hour", 2),
77            Interval::Hour4 => ("hour", 4),
78            _ => Err(MarketError::UnsuportedInterval(format!(
79                "{} Unsuported Interval",
80                interval
81            )))?,
82        };
83        self.interval = interval;
84        self.requests.push(PolygonRequest {
85            symbol: symbol.into(),
86            timespan: timespan.into(),
87            multiplier,
88            from_date: from_date.into(),
89            to_date: to_date.into(),
90            limit,
91        });
92        Ok(())
93    }
94
95    /// Request for daily series
96    pub fn daily_series(
97        &mut self,
98        symbol: impl Into<String>,
99        from_date: impl Into<String>,
100        to_date: impl Into<String>,
101        limit: i32,
102    ) -> () {
103        self.interval = Interval::Daily;
104        self.requests.push(PolygonRequest {
105            symbol: symbol.into(),
106            timespan: String::from("day"),
107            multiplier: 1,
108            from_date: from_date.into(),
109            to_date: to_date.into(),
110            limit,
111        });
112    }
113
114    /// Request for weekly series
115    pub fn weekly_series(
116        &mut self,
117        symbol: impl Into<String>,
118        from_date: impl Into<String>,
119        to_date: impl Into<String>,
120        limit: i32,
121    ) -> () {
122        self.interval = Interval::Weekly;
123        self.requests.push(PolygonRequest {
124            symbol: symbol.into(),
125            timespan: String::from("week"),
126            multiplier: 1,
127            from_date: from_date.into(),
128            to_date: to_date.into(),
129            limit,
130        });
131    }
132
133    /// Request for monthly series
134    pub fn monthly_series(
135        &mut self,
136        symbol: impl Into<String>,
137        from_date: impl Into<String>,
138        to_date: impl Into<String>,
139        limit: i32,
140    ) -> () {
141        self.interval = Interval::Monthly;
142        self.requests.push(PolygonRequest {
143            symbol: symbol.into(),
144            timespan: String::from("month"),
145            multiplier: 1,
146            from_date: from_date.into(),
147            to_date: to_date.into(),
148            limit,
149        });
150    }
151}
152
153impl Publisher for Polygon {
154    fn create_endpoint(&mut self) -> MarketResult<()> {
155        let base_url = Url::parse(BASE_URL)?;
156        self.endpoints = self
157            .requests
158            .iter()
159            .map(|request| {
160                let constructed_url = base_url
161                    .join(&format!(
162                        "{}/range/{}/{}/{}/{}?sort=asc&limit={}&apiKey={}",
163                        request.symbol,
164                        request.multiplier,
165                        request.timespan,
166                        request.from_date,
167                        request.to_date,
168                        request.limit,
169                        self.token,
170                    ))
171                    .unwrap();
172                constructed_url
173            })
174            .collect();
175
176        // self.requests have to be consumed once used for creating the endpoints
177        self.requests.clear();
178
179        Ok(())
180    }
181
182    #[cfg(feature = "use-sync")]
183    fn get_data(&mut self) -> MarketResult<()> {
184        let rest_client = Client::new();
185        for endpoint in &self.endpoints {
186            let response = rest_client.get_data(endpoint)?;
187            let body = response.into_string()?;
188
189            let prices: PolygonPrices = serde_json::from_str(&body)?;
190            self.data.push(prices);
191        }
192
193        // self.endpoints have to be consumed once the data was downloaded for requested URL
194        self.endpoints.clear();
195
196        Ok(())
197    }
198
199    #[cfg(feature = "use-async")]
200    async fn get_data(&mut self) -> MarketResult<()> {
201        let client = Client::new();
202        for endpoint in &self.endpoints {
203            let response = client.get_data(endpoint).await?;
204            let body = response.text().await?;
205
206            let prices: PolygonPrices = serde_json::from_str(&body)?;
207            self.data.push(prices);
208        }
209
210        // self.endpoints have to be consumed once the data was downloaded for requested URL
211        self.endpoints.clear();
212
213        Ok(())
214    }
215
216    fn to_writer(&self, writer: impl std::io::Write) -> MarketResult<()> {
217        serde_json::to_writer(writer, &self.data).map_err(|err| {
218            MarketError::ToWriter(format!("Unable to write to writer, got the error: {}", err))
219        })?;
220
221        Ok(())
222    }
223
224    fn transform_data(&mut self) -> Vec<MarketResult<MarketSeries>> {
225        let mut result = Vec::new();
226        for data in self.data.iter() {
227            let parsed_data = transform(data, self.interval.clone());
228            result.push(parsed_data)
229        }
230
231        // self.data have to be consumed once the data is transformed to MarketSeries
232        self.data.clear();
233        result
234    }
235}
236
237// PolygonPrices is the struct returned by Polygon.io on Aggregates API
238#[derive(Debug, Serialize, Deserialize)]
239struct PolygonPrices {
240    // Whether or not this response was adjusted for splits.
241    adjusted: bool,
242    // If present, this value can be used to fetch the next page of data.
243    next_url: Option<String>,
244    // The number of aggregates (minute or day) used to generate the response.
245    #[serde(rename = "queryCount")]
246    query_count: i32,
247    // A request id assigned by the server.
248    request_id: String,
249    // The total number of results for this request.
250    #[serde(rename = "results")]
251    time_series: Vec<TimeSeriesData>,
252    #[serde(rename = "resultsCount")]
253    results_count: i32,
254    // The status of this request's response.
255    status: String,
256    // The exchange symbol that this item is traded under.
257    ticker: String,
258}
259
260#[derive(Debug, Serialize, Deserialize)]
261struct TimeSeriesData {
262    // The close price for the symbol in the given time period.
263    c: f32,
264    // The highest price for the symbol in the given time period.
265    h: f32,
266    // The lowest price for the symbol in the given time period.
267    l: f32,
268    // The number of transactions in the aggregate window.
269    n: i32,
270    // The open price for the symbol in the given time period.
271    o: f32,
272    // The Unix Msec timestamp for the start of the aggregate window.
273    t: i64,
274    // The trading volume of the symbol in the given time period.
275    v: f64,
276    // The volume weighted average price.
277    vw: f32,
278}
279
280fn transform(data: &PolygonPrices, interval: Interval) -> MarketResult<MarketSeries> {
281    // validate the data, first check is status
282    if data.status != "OK".to_string() {
283        return Err(MarketError::DownloadedData(format!(
284            "Downloaded data status is: {}",
285            data.status
286        )));
287    }
288
289    let mut data_series: Vec<Series> = Vec::with_capacity(data.time_series.len());
290
291    for series in data.time_series.iter() {
292        let open: f32 = series.o;
293        let close: f32 = series.c;
294        let high: f32 = series.h;
295        let low: f32 = series.l;
296        let volume: f32 = series.v as f32;
297
298        // Create a NaiveDateTime from the Unix timestamp
299        let datetime = DateTime::from_timestamp_millis(series.t).ok_or(
300            MarketError::ParsingError(format!("Unable to parse the timestamp")),
301        )?;
302
303        // Extract the date part
304        let date = datetime.date_naive();
305
306        data_series.push(Series {
307            date,
308            open,
309            close,
310            high,
311            low,
312            volume,
313        })
314    }
315
316    // sort the series by date - No needed as it is coming already sorted
317    // data_series.sort_by_key(|item| item.date);
318
319    Ok(MarketSeries {
320        symbol: data.ticker.clone(),
321        interval: interval,
322        data: data_series,
323    })
324}