1use reqwest::Client;
2use serde::de::DeserializeOwned;
3use std::marker::PhantomData;
4use std::time::{Duration, Instant};
5use tokio::time::interval;
6
/// Default delay between consecutive polls, in milliseconds.
pub const POLL_INTERVAL_MS: u64 = 500;
/// Default maximum number of idle pooled connections kept per host.
pub const POOL_MAX_IDLE_PER_HOST: usize = 1;
/// Default idle timeout for pooled connections, in seconds.
pub const POOL_IDLE_TIMEOUT_SECS: u64 = 90;
/// Default per-request timeout, in milliseconds.
pub const REQUEST_TIMEOUT_MS: u64 = 1_000;
/// Default TCP keep-alive setting, in seconds.
pub const TCP_KEEPALIVE_SECS: u64 = 60;
12
13pub struct JsonPoller<T> {
14 client: Client,
15 url: String,
16 poll_interval: Duration,
17 _phantom: PhantomData<T>,
18}
19
/// Collects the configuration used to construct a poller for payload type `T`.
///
/// All durations are stored as raw integers (milliseconds or seconds, as the
/// field names indicate) and are only converted when the poller is built.
pub struct JsonPollerBuilder<T> {
    /// Endpoint the resulting poller will request.
    url: String,
    /// Delay between consecutive polls, in milliseconds.
    poll_interval_ms: u64,
    /// Maximum number of idle pooled connections kept per host.
    pool_max_idle_per_host: usize,
    /// Idle timeout for pooled connections, in seconds.
    pool_idle_timeout_secs: u64,
    /// Per-request timeout, in milliseconds.
    request_timeout_ms: u64,
    /// TCP keep-alive setting, in seconds.
    tcp_keepalive_secs: u64,
    /// Marker for the payload type.
    ///
    /// The builder holds plain configuration and never a `T`, so the marker is
    /// `fn() -> T`: the builder stays `Send + Sync` regardless of `T`.
    _phantom: PhantomData<fn() -> T>,
}
29
30impl<T> JsonPollerBuilder<T> {
31 pub fn new(url: impl Into<String>) -> Self {
32 Self {
33 url: url.into(),
34 poll_interval_ms: POLL_INTERVAL_MS,
35 pool_max_idle_per_host: POOL_MAX_IDLE_PER_HOST,
36 pool_idle_timeout_secs: POOL_IDLE_TIMEOUT_SECS,
37 request_timeout_ms: REQUEST_TIMEOUT_MS,
38 tcp_keepalive_secs: TCP_KEEPALIVE_SECS,
39 _phantom: PhantomData,
40 }
41 }
42
43 pub fn poll_interval_ms(mut self, ms: u64) -> Self {
44 self.poll_interval_ms = ms;
45 self
46 }
47
48 pub fn pool_max_idle_per_host(mut self, max: usize) -> Self {
49 self.pool_max_idle_per_host = max;
50 self
51 }
52
53 pub fn pool_idle_timeout_secs(mut self, secs: u64) -> Self {
54 self.pool_idle_timeout_secs = secs;
55 self
56 }
57
58 pub fn request_timeout_ms(mut self, ms: u64) -> Self {
59 self.request_timeout_ms = ms;
60 self
61 }
62
63 pub fn tcp_keepalive_secs(mut self, secs: u64) -> Self {
64 self.tcp_keepalive_secs = secs;
65 self
66 }
67
68 pub fn build(self) -> Result<JsonPoller<T>, reqwest::Error> {
69 let client = Client::builder()
70 .pool_max_idle_per_host(self.pool_max_idle_per_host)
71 .pool_idle_timeout(Duration::from_secs(self.pool_idle_timeout_secs))
72 .timeout(Duration::from_millis(self.request_timeout_ms))
73 .tcp_keepalive(Duration::from_secs(self.tcp_keepalive_secs))
74 .build()?;
75
76 Ok(JsonPoller {
77 client,
78 url: self.url,
79 poll_interval: Duration::from_millis(self.poll_interval_ms),
80 _phantom: PhantomData,
81 })
82 }
83}
84
85impl<T> JsonPoller<T>
113where
114 T: DeserializeOwned + Send,
115{
116 pub fn builder(url: impl Into<String>) -> JsonPollerBuilder<T> {
117 JsonPollerBuilder::new(url)
118 }
119
120 pub async fn start<F>(&self, mut on_data: F)
121 where
122 F: FnMut(T, Duration) + Send,
123 {
124 let mut interval_timer = interval(self.poll_interval);
125 interval_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
126
127 loop {
128 interval_timer.tick().await;
129 let request_start = Instant::now();
130 match self.fetch().await {
131 Ok(data) => {
132 let elapsed = request_start.elapsed();
133 on_data(data, elapsed);
134 }
135 Err(e) => {
136 tracing::error!("Failed to fetch data: {:?}", e);
137 }
138 }
139 }
140 }
141
142 async fn fetch(&self) -> Result<T, Box<dyn std::error::Error>> {
143 let response = self.client.get(&self.url).send().await.map_err(|e| {
144 tracing::error!("Request failed: {:?}", e);
145 e
146 })?;
147
148 let status = response.status();
149 if !status.is_success() {
150 tracing::error!("HTTP error: {}", status);
151 return Err(format!("HTTP {}", status).into());
152 }
153
154 let data = response.json::<T>().await.map_err(|e| {
155 tracing::error!("JSON parse failed: {:?}", e);
156 e
157 })?;
158
159 Ok(data)
160 }
161
162 pub async fn fetch_once(&self) -> Result<T, Box<dyn std::error::Error>> {
163 self.fetch().await
164 }
165}
166
#[cfg(test)]
mod tests {
    use super::*;
    use serde::Deserialize;

    /// Request timeout for tests that talk to the public httpbin service.
    ///
    /// Larger than the library default so that a slow connection does not
    /// turn into a spurious timeout error.
    const NETWORK_TEST_TIMEOUT_MS: u64 = 10_000;

    /// Shape of the document served by `https://httpbin.org/json`.
    #[derive(Debug, Deserialize, PartialEq)]
    struct HttpBinJson {
        slideshow: Slideshow,
    }

    #[derive(Debug, Deserialize, PartialEq)]
    struct Slideshow {
        author: String,
        date: String,
        title: String,
        slides: Vec<Slide>,
    }

    #[derive(Debug, Deserialize, PartialEq)]
    struct Slide {
        title: String,
        #[serde(rename = "type")]
        slide_type: String,
        // Not every slide carries items, so default to an empty list.
        #[serde(default)]
        items: Vec<String>,
    }

    /// Builds a poller for `url` with the relaxed network-test timeout.
    fn network_poller(url: &str) -> JsonPoller<HttpBinJson> {
        JsonPoller::<HttpBinJson>::builder(url)
            .request_timeout_ms(NETWORK_TEST_TIMEOUT_MS)
            .build()
            .expect("building the HTTP client should succeed")
    }

    #[test]
    fn test_builder_defaults() {
        let poller = JsonPoller::<HttpBinJson>::builder("https://example.com")
            .build()
            .unwrap();

        assert_eq!(
            poller.poll_interval,
            Duration::from_millis(POLL_INTERVAL_MS)
        );
        assert_eq!(poller.url, "https://example.com");
    }

    #[test]
    fn test_builder_custom_config() {
        let poller = JsonPoller::<HttpBinJson>::builder("https://example.com")
            .poll_interval_ms(1000)
            .request_timeout_ms(2000)
            .build()
            .unwrap();

        assert_eq!(poller.poll_interval, Duration::from_millis(1000));
    }

    #[tokio::test]
    async fn test_http_error() {
        let err = network_poller("https://httpbin.org/status/404")
            .fetch_once()
            .await
            .expect_err("a 404 response must be reported as an error");

        // Check the status branch specifically, so that an unrelated failure
        // (DNS, timeout) cannot make this test pass.
        assert!(
            err.to_string().contains("404"),
            "expected an HTTP 404 error, got: {}",
            err
        );
    }

    #[tokio::test]
    async fn test_invalid_json() {
        let err = network_poller("https://httpbin.org/html")
            .fetch_once()
            .await
            .expect_err("an HTML body must not deserialize as JSON");

        // Only a body-decoding failure counts; a transport error would mean
        // the JSON path was never exercised.
        let is_decode_failure = err
            .downcast_ref::<reqwest::Error>()
            .map_or(false, |e| e.is_decode());
        assert!(
            is_decode_failure,
            "expected a JSON decode error, got: {}",
            err
        );
    }

    #[tokio::test]
    async fn test_fetch_once() {
        let data = network_poller("https://httpbin.org/json")
            .fetch_once()
            .await
            .unwrap();

        assert_eq!(data.slideshow.author, "Yours Truly");
        assert_eq!(data.slideshow.title, "Sample Slide Show");
    }
}