Skip to main content

courier/sources/
api.rs

1use std::time::{Duration, Instant};
2
3use anyhow::{Context, Result, bail};
4use async_trait::async_trait;
5use serde::Deserialize;
6use serde_json::Value;
7use tokio::sync::mpsc::Sender;
8use tokio::time::{MissedTickBehavior, interval};
9use tokio_util::sync::CancellationToken;
10
11use crate::config::{parse_config, redact_secret};
12use crate::envelope::Envelope;
13use crate::observability::{NodeCtx, SendStopped, SourceCtx};
14use crate::retry::RetryPolicy;
15use crate::sources::Source;
16use crate::sources::retry::PollScheduler;
17
18/// Polls an HTTP endpoint at a fixed interval, emitting each JSON response
19/// body as the envelope payload. Logs a warning when an iteration exceeds
20/// the configured interval.
21///
22/// When `retry` is configured, consecutive fetch failures schedule the
23/// next attempt sooner than the normal cadence — see `PollScheduler` for
24/// the exact rule.
25pub struct ApiPollSource {
26    id: String,
27    url: String,
28    interval: Duration,
29    retry: Option<RetryPolicy>,
30    source_ctx: SourceCtx,
31}
32
33impl ApiPollSource {
34    pub fn new(id: impl Into<String>, url: impl Into<String>, poll_interval: Duration) -> Self {
35        let id = id.into();
36        Self {
37            source_ctx: SourceCtx::new(&id),
38            id,
39            url: url.into(),
40            interval: poll_interval,
41            retry: None,
42        }
43    }
44
45    pub fn with_retry(mut self, retry: RetryPolicy) -> Self {
46        self.retry = Some(retry);
47        self
48    }
49}
50
51#[async_trait]
52impl Source for ApiPollSource {
53    fn id(&self) -> &str {
54        &self.id
55    }
56
57    fn set_node_ctx(&mut self, ctx: NodeCtx) {
58        self.source_ctx = SourceCtx::from_node_ctx(ctx);
59    }
60
61    async fn run(self: Box<Self>, tx: Sender<Envelope>, cancel: CancellationToken) {
62        let mut scheduler = PollScheduler::new(self.interval, self.retry.clone());
63        let mut ticker = interval(self.interval);
64        ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
65        ticker.tick().await; // first tick completes immediately
66
67        log::info!(
68            "[{}] starting poll loop at {:?}",
69            redact_secret(&self.id),
70            self.interval
71        );
72        let source_ctx = self.source_ctx.clone();
73
74        loop {
75            let start = Instant::now();
76
77            let payload = tokio::select! {
78                _ = cancel.cancelled() => {
79                    log::info!("[{}] cancelled", redact_secret(&self.id));
80                    return;
81                }
82                result = fetch(&self.url) => match result {
83                    Ok(v) => v,
84                    Err(e) => {
85                        let delay = scheduler.record_failure();
86                        log::error!(
87                            "[{}] fetch failed (consecutive failures: {}), next attempt in {:?}: {e}",
88                            redact_secret(&self.id),
89                            scheduler.consecutive_failures(),
90                            delay,
91                        );
92                        // Backoff bypasses the ticker. If the retry finishes
93                        // before the original deadline, the next tick.tick()
94                        // still blocks until that deadline — original cadence
95                        // preserved. If the deadline has already passed,
96                        // MissedTickBehavior::Delay schedules the next tick
97                        // `interval` from now, so we don't burst-catch-up.
98                        tokio::select! {
99                            _ = cancel.cancelled() => return,
100                            _ = tokio::time::sleep(delay) => {}
101                        }
102                        continue;
103                    }
104                },
105            };
106
107            scheduler.record_success();
108            log::debug!(
109                "[{}] fetch completed in {:?}",
110                redact_secret(&self.id),
111                start.elapsed()
112            );
113
114            let env = Envelope::new(&self.id, payload);
115            match source_ctx.send(&tx, env, &cancel).await {
116                Ok(()) => {}
117                Err(SendStopped::Cancelled) => return,
118                Err(SendStopped::DownstreamClosed) => {
119                    log::info!("[{}] downstream closed, stopping", redact_secret(&self.id));
120                    return;
121                }
122            }
123
124            let elapsed = start.elapsed();
125            if elapsed > self.interval {
126                log::warn!(
127                    "[{}] iteration took {:?}, exceeding interval {:?}",
128                    redact_secret(&self.id),
129                    elapsed,
130                    self.interval,
131                );
132            }
133
134            tokio::select! {
135                _ = cancel.cancelled() => return,
136                _ = ticker.tick() => {}
137            }
138        }
139    }
140}
141
142async fn fetch(url: &str) -> anyhow::Result<Value> {
143    let resp = reqwest::get(url).await.map_err(|e| {
144        let e = e.without_url();
145        anyhow::anyhow!("HTTP request to {} failed: {e}", redact_secret(url))
146    })?;
147    if !resp.status().is_success() {
148        return Err(anyhow::anyhow!("HTTP error: {}", resp.status()));
149    }
150    resp.json::<Value>().await.map_err(|e| {
151        let e = e.without_url();
152        anyhow::anyhow!(
153            "HTTP response from {} was not valid JSON: {e}",
154            redact_secret(url)
155        )
156    })
157}
158
159#[derive(Debug, Deserialize)]
160struct ApiPollSourceConfig {
161    url: String,
162    interval_secs: u64,
163}
164
165/// Registry factory for [`ApiPollSource`]. Registered by
166/// `courier::registry::register_builtin` under kind `"api_poll"`. The
167/// optional `retry` policy is extracted by the registry and threaded into
168/// the source's `PollScheduler`.
169pub fn api_poll_source_factory(
170    id: &str,
171    config: Value,
172    retry: Option<RetryPolicy>,
173) -> Result<Box<dyn Source>> {
174    let config: ApiPollSourceConfig = parse_config("api_poll", config)?;
175    reqwest::Url::parse(&config.url).with_context(|| {
176        format!(
177            "invalid config for component type 'api_poll': invalid url '{}'",
178            redact_secret(&config.url)
179        )
180    })?;
181    if config.interval_secs == 0 {
182        bail!("invalid config for component type 'api_poll': interval_secs must be greater than 0");
183    }
184    let mut source = ApiPollSource::new(id, config.url, Duration::from_secs(config.interval_secs));
185    if let Some(policy) = retry {
186        source = source.with_retry(policy);
187    }
188    Ok(Box::new(source))
189}
190
#[cfg(test)]
mod tests {
    use super::*;
    use crate::retry::{ExhaustedPolicy, RetryPolicy};
    use serde_json::json;
    use std::time::Instant;
    use tokio::sync::mpsc;
    use tokio::task::JoinHandle;
    use wiremock::matchers::{method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    /// How long a test waits for an envelope, and for the task to wind down.
    const WAIT: Duration = Duration::from_secs(2);

    /// Returns a URL on a local port whose listener accepts one connection
    /// and drops it immediately, so a request to it fails at the transport
    /// level.
    fn closing_local_url(path: &str) -> String {
        let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
        let addr = listener.local_addr().unwrap();
        std::thread::spawn(move || {
            if let Ok((stream, _)) = listener.accept() {
                drop(stream);
            }
        });
        format!("http://{addr}{path}")
    }

    /// Mounts a `GET /data` mock that answers 200 with `body`.
    async fn mount_ok(server: &MockServer, body: Value) {
        Mock::given(method("GET"))
            .and(path("/data"))
            .respond_with(ResponseTemplate::new(200).set_body_json(body))
            .mount(server)
            .await;
    }

    /// Starts a server whose `GET /data` always answers 200 with `body`.
    async fn server_returning(body: Value) -> MockServer {
        let server = MockServer::start().await;
        mount_ok(&server, body).await;
        server
    }

    /// Starts a server whose first `GET /data` answers 500 and whose later
    /// requests answer 200 with `{"ok": true}`.
    async fn server_failing_once() -> MockServer {
        let server = MockServer::start().await;
        // Mounted first and limited to one use, so it serves only the first
        // request.
        Mock::given(method("GET"))
            .and(path("/data"))
            .respond_with(ResponseTemplate::new(500))
            .up_to_n_times(1)
            .mount(&server)
            .await;
        mount_ok(&server, json!({ "ok": true })).await;
        server
    }

    /// URL of the `/data` endpoint on `server`.
    fn data_url(server: &MockServer) -> String {
        format!("{}/data", server.uri())
    }

    /// Spawns `source.run` on the current runtime and returns the envelope
    /// receiver, the cancellation token and the task handle.
    fn spawn_source(
        source: ApiPollSource,
    ) -> (mpsc::Receiver<Envelope>, CancellationToken, JoinHandle<()>) {
        let (tx, rx) = mpsc::channel(8);
        let cancel = CancellationToken::new();
        let task_cancel = cancel.clone();
        let handle = tokio::spawn(async move { Box::new(source).run(tx, task_cancel).await });
        (rx, cancel, handle)
    }

    /// Cancels the source and gives the task up to [`WAIT`] to finish. The
    /// outcome is deliberately ignored: this is best-effort cleanup, not an
    /// assertion.
    async fn stop(cancel: CancellationToken, handle: JoinHandle<()>) {
        cancel.cancel();
        let _ = tokio::time::timeout(WAIT, handle).await;
    }

    #[test]
    fn factory_rejects_invalid_url() {
        let err = api_poll_source_factory(
            "api",
            json!({
                "url": "not a url",
                "interval_secs": 60
            }),
            None,
        )
        .err()
        .expect("expected invalid URL to fail");
        let msg = format!("{err:#}");
        assert!(
            msg.contains("invalid config for component type 'api_poll'"),
            "{msg}"
        );
        assert!(msg.contains("invalid url"), "{msg}");
    }

    #[test]
    fn factory_rejects_zero_interval() {
        let err = api_poll_source_factory(
            "api",
            json!({
                "url": "http://localhost/data",
                "interval_secs": 0
            }),
            None,
        )
        .err()
        .expect("expected zero interval to fail");
        let msg = format!("{err:#}");
        assert!(
            msg.contains("interval_secs must be greater than 0"),
            "{msg}"
        );
    }

    #[tokio::test]
    async fn fetch_errors_do_not_repeat_url_from_reqwest_error() {
        let url = closing_local_url("/token-in-url");

        let err = fetch(&url).await.expect_err("expected connection failure");
        let msg = format!("{err:#}");
        // The URL must appear exactly once: from our own message, not a
        // second time from the wrapped reqwest error.
        assert_eq!(msg.matches(&url).count(), 1, "{msg}");
    }

    #[tokio::test]
    async fn emits_envelope_per_poll() {
        let server = server_returning(json!({ "v": 1 })).await;
        let source = ApiPollSource::new("api", data_url(&server), Duration::from_millis(20));
        let (mut rx, cancel, handle) = spawn_source(source);

        let env = tokio::time::timeout(WAIT, rx.recv())
            .await
            .expect("poll timed out")
            .expect("source closed before emitting");

        assert_eq!(env.meta.source_id, "api");
        assert_eq!(env.payload, json!({ "v": 1 }));

        stop(cancel, handle).await;
    }

    #[tokio::test]
    async fn recovers_from_transient_http_error() {
        // First response is a 500; subsequent requests succeed.
        let server = server_failing_once().await;
        let source = ApiPollSource::new("api", data_url(&server), Duration::from_millis(20));
        let (mut rx, cancel, handle) = spawn_source(source);

        let env = tokio::time::timeout(WAIT, rx.recv())
            .await
            .expect("poll timed out after retry")
            .expect("source closed before emitting");
        assert_eq!(env.payload, json!({ "ok": true }));

        stop(cancel, handle).await;
    }

    #[tokio::test]
    async fn retry_backoff_recovers_faster_than_polling_interval() {
        // Interval is 5s — without retry, recovery from a transient 500 would
        // wait the full 5s. With retry (initial 20ms < interval), the next
        // attempt fires far sooner. This is the whole point of the policy:
        // retry can compress the cadence below interval to recover quickly.
        let server = server_failing_once().await;
        let source = ApiPollSource::new("api", data_url(&server), Duration::from_secs(5))
            .with_retry(RetryPolicy {
                max_attempts: 5,
                initial_delay_ms: 20,
                backoff_multiplier: 2.0,
                max_delay_ms: 1_000,
                on_exhausted: ExhaustedPolicy::Propagate,
            });

        let started = Instant::now();
        let (mut rx, cancel, handle) = spawn_source(source);

        let env = tokio::time::timeout(WAIT, rx.recv())
            .await
            .expect("poll timed out — retry did not compress the cadence")
            .expect("source closed before emitting");
        assert_eq!(env.payload, json!({ "ok": true }));
        assert!(
            started.elapsed() < Duration::from_secs(2),
            "recovery took {:?}, retry should have fired well under interval (5s)",
            started.elapsed(),
        );

        stop(cancel, handle).await;
    }

    #[tokio::test]
    async fn retry_is_disregarded_when_polling_interval_is_smaller() {
        // Interval (50ms) is shorter than initial backoff (5s). The policy
        // says: when interval < retry, disregard retry — i.e. the next
        // attempt fires at the polling cadence, not after the longer
        // backoff. Recovery completes well under the backoff window.
        let server = server_failing_once().await;
        let source = ApiPollSource::new("api", data_url(&server), Duration::from_millis(50))
            .with_retry(RetryPolicy {
                max_attempts: 5,
                initial_delay_ms: 5_000,
                backoff_multiplier: 2.0,
                max_delay_ms: 30_000,
                on_exhausted: ExhaustedPolicy::Propagate,
            });

        let started = Instant::now();
        let (mut rx, cancel, handle) = spawn_source(source);

        let env = tokio::time::timeout(WAIT, rx.recv())
            .await
            .expect("poll timed out — interval should override the larger backoff")
            .expect("source closed before emitting");
        assert_eq!(env.payload, json!({ "ok": true }));
        assert!(
            started.elapsed() < Duration::from_secs(2),
            "recovery took {:?}, interval should have overridden the 5s backoff",
            started.elapsed(),
        );

        stop(cancel, handle).await;
    }

    #[tokio::test]
    async fn stops_when_cancelled() {
        let server = server_returning(json!({})).await;
        // Long interval: once the first poll is done the source waits in the
        // ticker/cancel `select!` at the bottom of `run` until either the
        // interval elapses or cancel fires.
        let source = ApiPollSource::new("api", data_url(&server), Duration::from_secs(60));
        // Keep the receiver alive so the source cannot exit because the
        // downstream closed.
        let (_rx, cancel, handle) = spawn_source(source);

        tokio::time::sleep(Duration::from_millis(50)).await;
        cancel.cancel();

        let res = tokio::time::timeout(Duration::from_secs(1), handle).await;
        assert!(res.is_ok(), "source did not exit within 1s of cancel");
    }
}