1use std::time::{Duration, Instant};
2
3use anyhow::{Context, Result, bail};
4use async_trait::async_trait;
5use serde::Deserialize;
6use serde_json::Value;
7use tokio::sync::mpsc::Sender;
8use tokio::time::{MissedTickBehavior, interval};
9use tokio_util::sync::CancellationToken;
10
11use crate::config::{parse_config, redact_secret};
12use crate::envelope::Envelope;
13use crate::observability::{NodeCtx, SendStopped, SourceCtx};
14use crate::retry::RetryPolicy;
15use crate::sources::Source;
16use crate::sources::retry::PollScheduler;
17
18pub struct ApiPollSource {
26 id: String,
27 url: String,
28 interval: Duration,
29 retry: Option<RetryPolicy>,
30 source_ctx: SourceCtx,
31}
32
33impl ApiPollSource {
34 pub fn new(id: impl Into<String>, url: impl Into<String>, poll_interval: Duration) -> Self {
35 let id = id.into();
36 Self {
37 source_ctx: SourceCtx::new(&id),
38 id,
39 url: url.into(),
40 interval: poll_interval,
41 retry: None,
42 }
43 }
44
45 pub fn with_retry(mut self, retry: RetryPolicy) -> Self {
46 self.retry = Some(retry);
47 self
48 }
49}
50
51#[async_trait]
52impl Source for ApiPollSource {
53 fn id(&self) -> &str {
54 &self.id
55 }
56
57 fn set_node_ctx(&mut self, ctx: NodeCtx) {
58 self.source_ctx = SourceCtx::from_node_ctx(ctx);
59 }
60
61 async fn run(self: Box<Self>, tx: Sender<Envelope>, cancel: CancellationToken) {
62 let mut scheduler = PollScheduler::new(self.interval, self.retry.clone());
63 let mut ticker = interval(self.interval);
64 ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
65 ticker.tick().await; log::info!(
68 "[{}] starting poll loop at {:?}",
69 redact_secret(&self.id),
70 self.interval
71 );
72 let source_ctx = self.source_ctx.clone();
73
74 loop {
75 let start = Instant::now();
76
77 let payload = tokio::select! {
78 _ = cancel.cancelled() => {
79 log::info!("[{}] cancelled", redact_secret(&self.id));
80 return;
81 }
82 result = fetch(&self.url) => match result {
83 Ok(v) => v,
84 Err(e) => {
85 let delay = scheduler.record_failure();
86 log::error!(
87 "[{}] fetch failed (consecutive failures: {}), next attempt in {:?}: {e}",
88 redact_secret(&self.id),
89 scheduler.consecutive_failures(),
90 delay,
91 );
92 tokio::select! {
99 _ = cancel.cancelled() => return,
100 _ = tokio::time::sleep(delay) => {}
101 }
102 continue;
103 }
104 },
105 };
106
107 scheduler.record_success();
108 log::debug!(
109 "[{}] fetch completed in {:?}",
110 redact_secret(&self.id),
111 start.elapsed()
112 );
113
114 let env = Envelope::new(&self.id, payload);
115 match source_ctx.send(&tx, env, &cancel).await {
116 Ok(()) => {}
117 Err(SendStopped::Cancelled) => return,
118 Err(SendStopped::DownstreamClosed) => {
119 log::info!("[{}] downstream closed, stopping", redact_secret(&self.id));
120 return;
121 }
122 }
123
124 let elapsed = start.elapsed();
125 if elapsed > self.interval {
126 log::warn!(
127 "[{}] iteration took {:?}, exceeding interval {:?}",
128 redact_secret(&self.id),
129 elapsed,
130 self.interval,
131 );
132 }
133
134 tokio::select! {
135 _ = cancel.cancelled() => return,
136 _ = ticker.tick() => {}
137 }
138 }
139 }
140}
141
142async fn fetch(url: &str) -> anyhow::Result<Value> {
143 let resp = reqwest::get(url).await.map_err(|e| {
144 let e = e.without_url();
145 anyhow::anyhow!("HTTP request to {} failed: {e}", redact_secret(url))
146 })?;
147 if !resp.status().is_success() {
148 return Err(anyhow::anyhow!("HTTP error: {}", resp.status()));
149 }
150 resp.json::<Value>().await.map_err(|e| {
151 let e = e.without_url();
152 anyhow::anyhow!(
153 "HTTP response from {} was not valid JSON: {e}",
154 redact_secret(url)
155 )
156 })
157}
158
159#[derive(Debug, Deserialize)]
160struct ApiPollSourceConfig {
161 url: String,
162 interval_secs: u64,
163}
164
165pub fn api_poll_source_factory(
170 id: &str,
171 config: Value,
172 retry: Option<RetryPolicy>,
173) -> Result<Box<dyn Source>> {
174 let config: ApiPollSourceConfig = parse_config("api_poll", config)?;
175 reqwest::Url::parse(&config.url).with_context(|| {
176 format!(
177 "invalid config for component type 'api_poll': invalid url '{}'",
178 redact_secret(&config.url)
179 )
180 })?;
181 if config.interval_secs == 0 {
182 bail!("invalid config for component type 'api_poll': interval_secs must be greater than 0");
183 }
184 let mut source = ApiPollSource::new(id, config.url, Duration::from_secs(config.interval_secs));
185 if let Some(policy) = retry {
186 source = source.with_retry(policy);
187 }
188 Ok(Box::new(source))
189}
190
#[cfg(test)]
mod tests {
    use super::*;
    use crate::retry::ExhaustedPolicy;
    use serde_json::json;
    use tokio::sync::mpsc;
    use tokio::task::JoinHandle;
    use wiremock::matchers::{method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    /// How long tests wait for an envelope, or for the source to exit after cancel.
    const WAIT: Duration = Duration::from_secs(2);

    /// Returns a URL for a local listener that accepts one connection and immediately
    /// drops it, so a request to it fails at the transport level.
    fn closing_local_url(path: &str) -> String {
        let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
        let addr = listener.local_addr().unwrap();
        std::thread::spawn(move || {
            if let Ok((stream, _)) = listener.accept() {
                drop(stream);
            }
        });
        format!("http://{addr}{path}")
    }

    /// Mounts `GET /data` on `server`: one 500 response, then 200 with `body` thereafter.
    async fn mount_flaky_data(server: &MockServer, body: Value) {
        Mock::given(method("GET"))
            .and(path("/data"))
            .respond_with(ResponseTemplate::new(500))
            .up_to_n_times(1)
            .mount(server)
            .await;
        Mock::given(method("GET"))
            .and(path("/data"))
            .respond_with(ResponseTemplate::new(200).set_body_json(body))
            .mount(server)
            .await;
    }

    /// Spawns `source` and returns its envelope receiver, its cancel token and the task.
    fn spawn_source(
        source: ApiPollSource,
    ) -> (mpsc::Receiver<Envelope>, CancellationToken, JoinHandle<()>) {
        let (tx, rx) = mpsc::channel(8);
        let cancel = CancellationToken::new();
        let token = cancel.clone();
        let handle = tokio::spawn(async move { Box::new(source).run(tx, token).await });
        (rx, cancel, handle)
    }

    /// Waits up to `WAIT` for the next envelope; panics with `timeout_msg` on timeout.
    async fn recv_within(rx: &mut mpsc::Receiver<Envelope>, timeout_msg: &str) -> Envelope {
        tokio::time::timeout(WAIT, rx.recv())
            .await
            .expect(timeout_msg)
            .expect("source closed before emitting")
    }

    /// Cancels the source and asserts its task exits, without panicking, within `WAIT`.
    async fn shutdown(cancel: CancellationToken, handle: JoinHandle<()>) {
        cancel.cancel();
        let joined = tokio::time::timeout(WAIT, handle)
            .await
            .expect("source did not exit after cancel");
        joined.expect("source task panicked");
    }

    #[test]
    fn factory_rejects_invalid_url() {
        let err = api_poll_source_factory(
            "api",
            json!({
                "url": "not a url",
                "interval_secs": 60
            }),
            None,
        )
        .err()
        .expect("expected invalid URL to fail");
        let msg = format!("{err:#}");
        assert!(
            msg.contains("invalid config for component type 'api_poll'"),
            "{msg}"
        );
        assert!(msg.contains("invalid url"), "{msg}");
    }

    #[test]
    fn factory_rejects_zero_interval() {
        let err = api_poll_source_factory(
            "api",
            json!({
                "url": "http://localhost/data",
                "interval_secs": 0
            }),
            None,
        )
        .err()
        .expect("expected zero interval to fail");
        let msg = format!("{err:#}");
        assert!(
            msg.contains("interval_secs must be greater than 0"),
            "{msg}"
        );
    }

    #[tokio::test]
    async fn fetch_errors_do_not_repeat_url_from_reqwest_error() {
        let url = closing_local_url("/token-in-url");

        let err = fetch(&url).await.expect_err("expected connection failure");
        let msg = format!("{err:#}");
        assert_eq!(msg.matches(&url).count(), 1, "{msg}");
    }

    #[tokio::test]
    async fn emits_envelope_per_poll() {
        let server = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/data"))
            .respond_with(ResponseTemplate::new(200).set_body_json(json!({ "v": 1 })))
            .mount(&server)
            .await;

        let url = format!("{}/data", server.uri());
        let source = ApiPollSource::new("api", url, Duration::from_millis(20));
        let (mut rx, cancel, handle) = spawn_source(source);

        let env = recv_within(&mut rx, "poll timed out").await;
        assert_eq!(env.meta.source_id, "api");
        assert_eq!(env.payload, json!({ "v": 1 }));

        shutdown(cancel, handle).await;
    }

    #[tokio::test]
    async fn recovers_from_transient_http_error() {
        let server = MockServer::start().await;
        mount_flaky_data(&server, json!({ "ok": true })).await;

        let url = format!("{}/data", server.uri());
        let source = ApiPollSource::new("api", url, Duration::from_millis(20));
        let (mut rx, cancel, handle) = spawn_source(source);

        let env = recv_within(&mut rx, "poll timed out after retry").await;
        assert_eq!(env.payload, json!({ "ok": true }));

        shutdown(cancel, handle).await;
    }

    #[tokio::test]
    async fn retry_backoff_recovers_faster_than_polling_interval() {
        let server = MockServer::start().await;
        mount_flaky_data(&server, json!({ "ok": true })).await;

        let url = format!("{}/data", server.uri());
        let source =
            ApiPollSource::new("api", url, Duration::from_secs(5)).with_retry(RetryPolicy {
                max_attempts: 5,
                initial_delay_ms: 20,
                backoff_multiplier: 2.0,
                max_delay_ms: 1_000,
                on_exhausted: ExhaustedPolicy::Propagate,
            });

        let started = Instant::now();
        let (mut rx, cancel, handle) = spawn_source(source);

        let env = recv_within(
            &mut rx,
            "poll timed out — retry did not compress the cadence",
        )
        .await;
        assert_eq!(env.payload, json!({ "ok": true }));
        let elapsed = started.elapsed();
        assert!(
            elapsed < Duration::from_secs(2),
            "recovery took {:?}, retry should have fired well under interval (5s)",
            elapsed,
        );

        shutdown(cancel, handle).await;
    }

    #[tokio::test]
    async fn retry_is_disregarded_when_polling_interval_is_smaller() {
        let server = MockServer::start().await;
        mount_flaky_data(&server, json!({ "ok": true })).await;

        let url = format!("{}/data", server.uri());
        let source =
            ApiPollSource::new("api", url, Duration::from_millis(50)).with_retry(RetryPolicy {
                max_attempts: 5,
                initial_delay_ms: 5_000,
                backoff_multiplier: 2.0,
                max_delay_ms: 30_000,
                on_exhausted: ExhaustedPolicy::Propagate,
            });

        let started = Instant::now();
        let (mut rx, cancel, handle) = spawn_source(source);

        let env = recv_within(
            &mut rx,
            "poll timed out — interval should override the larger backoff",
        )
        .await;
        assert_eq!(env.payload, json!({ "ok": true }));
        let elapsed = started.elapsed();
        assert!(
            elapsed < Duration::from_secs(2),
            "recovery took {:?}, interval should have overridden the 5s backoff",
            elapsed,
        );

        shutdown(cancel, handle).await;
    }

    #[tokio::test]
    async fn stops_when_cancelled() {
        let server = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/data"))
            .respond_with(ResponseTemplate::new(200).set_body_json(json!({})))
            .mount(&server)
            .await;

        let url = format!("{}/data", server.uri());
        let source = ApiPollSource::new("api", url, Duration::from_secs(60));
        // Keep the receiver alive so the source stops because of cancel, not a closed
        // channel.
        let (_rx, cancel, handle) = spawn_source(source);

        tokio::time::sleep(Duration::from_millis(50)).await;
        cancel.cancel();

        let joined = tokio::time::timeout(Duration::from_secs(1), handle)
            .await
            .expect("source did not exit within 1s of cancel");
        joined.expect("source task panicked");
    }
}