Skip to main content

json_poller/
lib.rs

1use reqwest::Client;
2use serde::de::DeserializeOwned;
3use std::marker::PhantomData;
4use std::time::{Duration, Instant};
5use tokio::time::interval;
6
/// Default delay between polls, in milliseconds, used by `JsonPollerBuilder::new`.
pub const POLL_INTERVAL_MS: u64 = 500;
/// Default value the builder forwards to reqwest's `pool_max_idle_per_host`.
pub const POOL_MAX_IDLE_PER_HOST: usize = 1;
/// Default value, in seconds, the builder forwards to reqwest's `pool_idle_timeout`.
pub const POOL_IDLE_TIMEOUT_SECS: u64 = 90;
/// Default value, in milliseconds, the builder forwards to reqwest's `timeout`.
pub const REQUEST_TIMEOUT_MS: u64 = 1_000;
/// Default value, in seconds, the builder forwards to reqwest's `tcp_keepalive`.
pub const TCP_KEEPALIVE_SECS: u64 = 60;
12
13pub struct JsonPoller<T> {
14    client: Client,
15    url: String,
16    poll_interval: Duration,
17    _phantom: PhantomData<T>,
18}
19
/// Collects the settings for a [`JsonPoller`] before its HTTP client is constructed.
///
/// Every field except `url` starts at the matching crate-level default constant.
#[derive(Debug, Clone)]
pub struct JsonPollerBuilder<T> {
    /// Endpoint the finished poller will request.
    url: String,
    /// Delay between polls, in milliseconds.
    poll_interval_ms: u64,
    /// Forwarded to reqwest's `pool_max_idle_per_host`.
    pool_max_idle_per_host: usize,
    /// Forwarded to reqwest's `pool_idle_timeout`, in seconds.
    pool_idle_timeout_secs: u64,
    /// Forwarded to reqwest's `timeout`, in milliseconds.
    request_timeout_ms: u64,
    /// Forwarded to reqwest's `tcp_keepalive`, in seconds.
    tcp_keepalive_secs: u64,
    /// Carries the payload type through to `build` without storing a `T`.
    ///
    /// `fn() -> T` keeps the builder's `Send`/`Sync` independent of `T`.
    _phantom: PhantomData<fn() -> T>,
}
29
30impl<T> JsonPollerBuilder<T> {
31    pub fn new(url: impl Into<String>) -> Self {
32        Self {
33            url: url.into(),
34            poll_interval_ms: POLL_INTERVAL_MS,
35            pool_max_idle_per_host: POOL_MAX_IDLE_PER_HOST,
36            pool_idle_timeout_secs: POOL_IDLE_TIMEOUT_SECS,
37            request_timeout_ms: REQUEST_TIMEOUT_MS,
38            tcp_keepalive_secs: TCP_KEEPALIVE_SECS,
39            _phantom: PhantomData,
40        }
41    }
42
43    pub fn poll_interval_ms(mut self, ms: u64) -> Self {
44        self.poll_interval_ms = ms;
45        self
46    }
47
48    pub fn pool_max_idle_per_host(mut self, max: usize) -> Self {
49        self.pool_max_idle_per_host = max;
50        self
51    }
52
53    pub fn pool_idle_timeout_secs(mut self, secs: u64) -> Self {
54        self.pool_idle_timeout_secs = secs;
55        self
56    }
57
58    pub fn request_timeout_ms(mut self, ms: u64) -> Self {
59        self.request_timeout_ms = ms;
60        self
61    }
62
63    pub fn tcp_keepalive_secs(mut self, secs: u64) -> Self {
64        self.tcp_keepalive_secs = secs;
65        self
66    }
67
68    pub fn build(self) -> Result<JsonPoller<T>, reqwest::Error> {
69        let client = Client::builder()
70            .pool_max_idle_per_host(self.pool_max_idle_per_host)
71            .pool_idle_timeout(Duration::from_secs(self.pool_idle_timeout_secs))
72            .timeout(Duration::from_millis(self.request_timeout_ms))
73            .tcp_keepalive(Duration::from_secs(self.tcp_keepalive_secs))
74            .build()?;
75
76        Ok(JsonPoller {
77            client,
78            url: self.url,
79            poll_interval: Duration::from_millis(self.poll_interval_ms),
80            _phantom: PhantomData,
81        })
82    }
83}
84
85/// Polls a JSON endpoint at regular intervals with connection reuse.
86///
87/// Keeps HTTP connections alive between requests for faster polling.
88///
89/// # Example
90///
91/// ```no_run
92/// use json_poller::JsonPoller;
93/// use serde::Deserialize;
94///
95/// #[derive(Deserialize)]
96/// struct Price {
97///     symbol: String,
98///     price: f64,
99/// }
100///
101/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
102/// let poller = JsonPoller::<Price>::builder("https://api.example.com/price")
103///     .poll_interval_ms(250)
104///     .build()?;
105///
106/// poller.start(|price, _duration| {
107///     println!("{}: ${}", price.symbol, price.price);
108/// }).await;
109/// # Ok(())
110/// # }
111/// ```
112impl<T> JsonPoller<T>
113where
114    T: DeserializeOwned + Send,
115{
116    pub fn builder(url: impl Into<String>) -> JsonPollerBuilder<T> {
117        JsonPollerBuilder::new(url)
118    }
119
120    pub async fn start<F>(&self, mut on_data: F)
121    where
122        F: FnMut(T, Duration) + Send,
123    {
124        let mut interval_timer = interval(self.poll_interval);
125        interval_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
126
127        loop {
128            interval_timer.tick().await;
129            let request_start = Instant::now();
130            match self.fetch().await {
131                Ok(data) => {
132                    let elapsed = request_start.elapsed();
133                    on_data(data, elapsed);
134                }
135                Err(e) => {
136                    tracing::error!("Failed to fetch data: {:?}", e);
137                }
138            }
139        }
140    }
141
142    async fn fetch(&self) -> Result<T, Box<dyn std::error::Error>> {
143        let response = self.client.get(&self.url).send().await.map_err(|e| {
144            tracing::error!("Request failed: {:?}", e);
145            e
146        })?;
147
148        let status = response.status();
149        if !status.is_success() {
150            tracing::error!("HTTP error: {}", status);
151            return Err(format!("HTTP {}", status).into());
152        }
153
154        let data = response.json::<T>().await.map_err(|e| {
155            tracing::error!("JSON parse failed: {:?}", e);
156            e
157        })?;
158
159        Ok(data)
160    }
161
162    pub async fn fetch_once(&self) -> Result<T, Box<dyn std::error::Error>> {
163        self.fetch().await
164    }
165}
166
#[cfg(test)]
mod tests {
    use super::*;
    use serde::Deserialize;

    /// Top-level document the tests decode from `https://httpbin.org/json`.
    #[derive(Debug, Deserialize, PartialEq)]
    struct HttpBinJson {
        slideshow: Slideshow,
    }

    /// The `slideshow` object inside [`HttpBinJson`].
    #[derive(Debug, Deserialize, PartialEq)]
    struct Slideshow {
        author: String,
        date: String,
        title: String,
        slides: Vec<Slide>,
    }

    /// One entry of `slideshow.slides`; `items` falls back to empty when absent.
    #[derive(Debug, Deserialize, PartialEq)]
    struct Slide {
        title: String,
        #[serde(rename = "type")]
        slide_type: String,
        #[serde(default)]
        items: Vec<String>,
    }

    /// Builds a poller for `url` with default settings, panicking if the client cannot be built.
    fn default_poller(url: &str) -> JsonPoller<HttpBinJson> {
        JsonPoller::<HttpBinJson>::builder(url).build().unwrap()
    }

    #[test]
    fn test_builder_defaults() {
        let built = default_poller("https://example.com");

        assert_eq!(built.poll_interval, Duration::from_millis(POLL_INTERVAL_MS));
        assert_eq!(built.url, "https://example.com");
    }

    #[test]
    fn test_builder_custom_config() {
        let configured = JsonPoller::<HttpBinJson>::builder("https://example.com")
            .poll_interval_ms(1000)
            .request_timeout_ms(2000);
        let built = configured.build().unwrap();

        assert_eq!(built.poll_interval, Duration::from_millis(1000));
    }

    // The remaining tests talk to httpbin.org and therefore need network access.

    #[tokio::test]
    async fn test_http_error() {
        let outcome = default_poller("https://httpbin.org/status/404")
            .fetch_once()
            .await;
        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn test_invalid_json() {
        let outcome = default_poller("https://httpbin.org/html")
            .fetch_once()
            .await;
        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn test_fetch_once() {
        let document = default_poller("https://httpbin.org/json")
            .fetch_once()
            .await
            .unwrap();

        assert_eq!(document.slideshow.author, "Yours Truly");
        assert_eq!(document.slideshow.title, "Sample Slide Show");
    }
}