Skip to main content

omni_dev/datadog/
events_api.rs

1//! Datadog Events API wrapper.
2//!
3//! Exposes a thin façade over [`DatadogClient`] for the read-only events
4//! stream (`GET /api/v2/events`).
5//!
6//! Datadog v2 events use cursor pagination via `meta.page.after`.
7//! [`EventsApi::list`] issues a single request optionally seeded with an
8//! `after` cursor token; [`EventsApi::list_all`] auto-paginates up to a
9//! caller-supplied limit (or [`HARD_CAP`] when the limit is `0`),
10//! mirroring [`MonitorsApi::list`].
11//!
12//! [`MonitorsApi::list`]: crate::datadog::monitors_api::MonitorsApi::list
13
14use anyhow::{Context, Result};
15use url::Url;
16
17use crate::datadog::client::DatadogClient;
18use crate::datadog::types::EventsResponse;
19
20/// Per-page upper bound enforced by Datadog's v2 events API.
21pub const MAX_PAGE_LIMIT: usize = 1000;
22
23/// Per-call upper bound on the number of events returned by
24/// [`EventsApi::list_all`], even when the caller passes `limit = 0`.
25pub const HARD_CAP: usize = 10_000;
26
27/// Filters accepted by `GET /api/v2/events`.
28///
29/// Each field is optional: the URL builder appends a query parameter
30/// only when the field is `Some(_)`. `from` / `to` are Unix epoch
31/// **seconds** — the `EventsApi::list` method converts them to RFC 3339
32/// before sending, matching the Datadog v2 API expectations.
33#[derive(Debug, Default, Clone)]
34pub struct EventsListFilter {
35    /// Datadog events query string (e.g. `service:api`).
36    pub query: Option<String>,
37    /// Comma-separated list of source names (e.g. `aws,kubernetes`).
38    pub sources: Option<String>,
39    /// Comma-separated list of `key:value` tags.
40    pub tags: Option<String>,
41}
42
43/// Events API façade.
44#[derive(Debug)]
45pub struct EventsApi<'a> {
46    client: &'a DatadogClient,
47}
48
49impl<'a> EventsApi<'a> {
50    /// Wraps an existing [`DatadogClient`] for events operations.
51    #[must_use]
52    pub fn new(client: &'a DatadogClient) -> Self {
53        Self { client }
54    }
55
56    /// Lists events matching `filter` between `from` and `to` (RFC 3339
57    /// strings) capped at `limit` per page.
58    ///
59    /// Single-page only. When `after` is `Some`, Datadog resumes
60    /// pagination at that cursor token (`page[cursor]` in the query
61    /// string). The next-page token is preserved on `meta.page.after`
62    /// of the response so callers (or [`EventsApi::list_all`]) can
63    /// iterate.
64    pub async fn list(
65        &self,
66        filter: &EventsListFilter,
67        from: &str,
68        to: &str,
69        limit: usize,
70        after: Option<&str>,
71    ) -> Result<EventsResponse> {
72        if limit > MAX_PAGE_LIMIT {
73            return Err(anyhow::anyhow!(
74                "`limit` must be <= {MAX_PAGE_LIMIT} (Datadog v2 events per-page cap; use `EventsApi::list_all` to auto-paginate across pages)"
75            ));
76        }
77        let url = build_list_url(self.client.base_url(), filter, from, to, limit, after)?;
78        let response = self.client.get_json(url.as_str()).await?;
79        if !response.status().is_success() {
80            return Err(DatadogClient::response_to_error(response).await.into());
81        }
82        response
83            .json::<EventsResponse>()
84            .await
85            .context("Failed to parse /api/v2/events response")
86    }
87
88    /// Lists events, auto-paginating via cursor as needed.
89    ///
90    /// `limit == 0` means "fetch every match up to [`HARD_CAP`]". Any
91    /// non-zero `limit` is upper-bounded by [`HARD_CAP`] to keep a single
92    /// invocation from issuing more than 10k items' worth of requests.
93    /// Per-request page size is clamped to [`MAX_PAGE_LIMIT`].
94    ///
95    /// Termination follows cursor-pagination semantics: the loop stops
96    /// when the response omits `meta.page.after` (Datadog signals "no
97    /// more pages" only via the absent cursor — a short page on its own
98    /// is *not* a terminator) or when `cap` items have been collected.
99    ///
100    /// The returned envelope keeps the `meta` and `links` blocks from the
101    /// *last* successful page so the response's cursor reflects the
102    /// iterator's final position (typically `None` when the API is
103    /// exhausted).
104    pub async fn list_all(
105        &self,
106        filter: &EventsListFilter,
107        from: &str,
108        to: &str,
109        limit: usize,
110    ) -> Result<EventsResponse> {
111        let cap = effective_cap(limit);
112        let mut acc: Option<EventsResponse> = None;
113        let mut cursor: Option<String> = None;
114        loop {
115            let collected = acc.as_ref().map_or(0, |r| r.data.len());
116            let remaining = cap - collected;
117            let page_size = remaining.min(MAX_PAGE_LIMIT);
118            let page = self
119                .list(filter, from, to, page_size, cursor.as_deref())
120                .await?;
121            let next_cursor = page
122                .meta
123                .as_ref()
124                .and_then(|m| m.page.as_ref())
125                .and_then(|p| p.after.clone());
126            match acc.as_mut() {
127                Some(existing) => {
128                    existing.data.extend(page.data);
129                    existing.meta = page.meta;
130                    existing.links = page.links;
131                }
132                None => acc = Some(page),
133            }
134            let collected = acc.as_ref().map_or(0, |r| r.data.len());
135            if collected >= cap || next_cursor.is_none() {
136                break;
137            }
138            cursor = next_cursor;
139        }
140        let mut result = acc.unwrap_or_default();
141        result.data.truncate(cap);
142        Ok(result)
143    }
144}
145
146/// Clamps a caller-supplied limit to [`HARD_CAP`], treating `0` as
147/// "fetch as many as the cap allows".
148fn effective_cap(limit: usize) -> usize {
149    if limit == 0 {
150        HARD_CAP
151    } else {
152        limit.min(HARD_CAP)
153    }
154}
155
156/// Builds `{base_url}/api/v2/events?filter[query]=…&filter[from]=…&filter[to]=…&page[limit]=N&page[cursor]=…`.
157fn build_list_url(
158    base_url: &str,
159    filter: &EventsListFilter,
160    from: &str,
161    to: &str,
162    limit: usize,
163    after: Option<&str>,
164) -> Result<Url> {
165    let mut url =
166        Url::parse(&format!("{base_url}/api/v2/events")).context("Invalid Datadog base URL")?;
167    {
168        let mut q = url.query_pairs_mut();
169        if let Some(query) = filter.query.as_deref() {
170            q.append_pair("filter[query]", query);
171        }
172        if let Some(sources) = filter.sources.as_deref() {
173            q.append_pair("filter[sources]", sources);
174        }
175        if let Some(tags) = filter.tags.as_deref() {
176            q.append_pair("filter[tags]", tags);
177        }
178        q.append_pair("filter[from]", from);
179        q.append_pair("filter[to]", to);
180        q.append_pair("page[limit]", &limit.to_string());
181        if let Some(cursor) = after {
182            q.append_pair("page[cursor]", cursor);
183        }
184    }
185    Ok(url)
186}
187
188#[cfg(test)]
189#[allow(clippy::unwrap_used, clippy::expect_used)]
190mod tests {
191    use super::*;
192
193    // ── effective_cap ──────────────────────────────────────────────
194
195    #[test]
196    fn effective_cap_zero_means_hard_cap() {
197        assert_eq!(effective_cap(0), HARD_CAP);
198    }
199
200    #[test]
201    fn effective_cap_clamps_to_hard_cap() {
202        assert_eq!(effective_cap(HARD_CAP + 5), HARD_CAP);
203    }
204
205    #[test]
206    fn effective_cap_passes_through_small_limits() {
207        assert_eq!(effective_cap(42), 42);
208    }
209
210    // ── URL builder ────────────────────────────────────────────────
211
212    #[test]
213    fn build_list_url_appends_only_provided_filters() {
214        let filter = EventsListFilter {
215            query: Some("service:api".into()),
216            sources: None,
217            tags: None,
218        };
219        let url = build_list_url(
220            "https://api.datadoghq.com",
221            &filter,
222            "2026-04-22T09:00:00Z",
223            "2026-04-22T10:00:00Z",
224            50,
225            None,
226        )
227        .unwrap();
228        let qs = url.query().unwrap();
229        assert!(qs.contains("filter%5Bquery%5D=service%3Aapi"));
230        assert!(qs.contains("filter%5Bfrom%5D=2026-04-22T09%3A00%3A00Z"));
231        assert!(qs.contains("filter%5Bto%5D=2026-04-22T10%3A00%3A00Z"));
232        assert!(qs.contains("page%5Blimit%5D=50"));
233        assert!(!qs.contains("filter%5Bsources%5D"));
234        assert!(!qs.contains("filter%5Btags%5D"));
235        assert!(!qs.contains("page%5Bcursor%5D"));
236    }
237
238    #[test]
239    fn build_list_url_encodes_sources_and_tags() {
240        let filter = EventsListFilter {
241            query: None,
242            sources: Some("aws,kubernetes".into()),
243            tags: Some("env:prod,team:sre".into()),
244        };
245        let url = build_list_url(
246            "https://api.datadoghq.com",
247            &filter,
248            "2026-04-22T09:00:00Z",
249            "2026-04-22T10:00:00Z",
250            10,
251            None,
252        )
253        .unwrap();
254        let qs = url.query().unwrap();
255        assert!(qs.contains("filter%5Bsources%5D=aws%2Ckubernetes"));
256        assert!(qs.contains("filter%5Btags%5D=env%3Aprod%2Cteam%3Asre"));
257    }
258
259    #[test]
260    fn build_list_url_appends_cursor_when_provided() {
261        let url = build_list_url(
262            "https://api.datadoghq.com",
263            &EventsListFilter::default(),
264            "2026-04-22T09:00:00Z",
265            "2026-04-22T10:00:00Z",
266            10,
267            Some("tok-2"),
268        )
269        .unwrap();
270        let qs = url.query().unwrap();
271        assert!(qs.contains("page%5Bcursor%5D=tok-2"));
272    }
273
274    #[test]
275    fn build_list_url_rejects_invalid_base() {
276        let err = build_list_url(
277            "not a url",
278            &EventsListFilter::default(),
279            "2026-04-22T09:00:00Z",
280            "2026-04-22T10:00:00Z",
281            10,
282            None,
283        )
284        .unwrap_err();
285        assert!(err.to_string().contains("Invalid Datadog base URL"));
286    }
287
288    // ── fixtures ───────────────────────────────────────────────────
289
290    fn event_json(id: &str) -> serde_json::Value {
291        serde_json::json!({
292            "id": id,
293            "type": "event",
294            "attributes": {
295                "timestamp": "2026-04-22T10:00:00.000Z",
296                "title": "Deploy",
297                "source": "github",
298                "tags": ["env:prod"]
299            }
300        })
301    }
302
303    fn page_body(ids: &[&str], next_cursor: Option<&str>) -> serde_json::Value {
304        let data: Vec<serde_json::Value> = ids.iter().map(|id| event_json(id)).collect();
305        let meta = match next_cursor {
306            Some(c) => serde_json::json!({ "page": { "after": c }, "status": "done" }),
307            None => serde_json::json!({ "page": {}, "status": "done" }),
308        };
309        serde_json::json!({ "data": data, "meta": meta })
310    }
311
312    fn sample_body() -> serde_json::Value {
313        serde_json::json!({
314            "data": [event_json("EV1")],
315            "meta": {"page": {"after": "next"}, "status": "done"}
316        })
317    }
318
319    // ── happy path ─────────────────────────────────────────────────
320
321    #[tokio::test]
322    async fn list_sends_filters_and_returns_parsed_response() {
323        let server = wiremock::MockServer::start().await;
324        wiremock::Mock::given(wiremock::matchers::method("GET"))
325            .and(wiremock::matchers::path("/api/v2/events"))
326            .and(wiremock::matchers::query_param(
327                "filter[query]",
328                "service:api",
329            ))
330            .and(wiremock::matchers::query_param(
331                "filter[from]",
332                "2026-04-22T09:00:00Z",
333            ))
334            .and(wiremock::matchers::query_param(
335                "filter[to]",
336                "2026-04-22T10:00:00Z",
337            ))
338            .and(wiremock::matchers::query_param("page[limit]", "10"))
339            .and(wiremock::matchers::header("DD-API-KEY", "api"))
340            .and(wiremock::matchers::header("DD-APPLICATION-KEY", "app"))
341            .respond_with(wiremock::ResponseTemplate::new(200).set_body_json(sample_body()))
342            .expect(1)
343            .mount(&server)
344            .await;
345
346        let client = DatadogClient::new(&server.uri(), "api", "app").unwrap();
347        let result = EventsApi::new(&client)
348            .list(
349                &EventsListFilter {
350                    query: Some("service:api".into()),
351                    sources: None,
352                    tags: None,
353                },
354                "2026-04-22T09:00:00Z",
355                "2026-04-22T10:00:00Z",
356                10,
357                None,
358            )
359            .await
360            .unwrap();
361        assert_eq!(result.data.len(), 1);
362        assert_eq!(result.data[0].id, "EV1");
363    }
364
365    #[tokio::test]
366    async fn list_includes_cursor_in_query_when_after_is_some() {
367        let server = wiremock::MockServer::start().await;
368        wiremock::Mock::given(wiremock::matchers::method("GET"))
369            .and(wiremock::matchers::path("/api/v2/events"))
370            .and(wiremock::matchers::query_param("page[cursor]", "tok-2"))
371            .respond_with(wiremock::ResponseTemplate::new(200).set_body_json(sample_body()))
372            .expect(1)
373            .mount(&server)
374            .await;
375
376        let client = DatadogClient::new(&server.uri(), "api", "app").unwrap();
377        EventsApi::new(&client)
378            .list(
379                &EventsListFilter::default(),
380                "2026-04-22T09:00:00Z",
381                "2026-04-22T10:00:00Z",
382                10,
383                Some("tok-2"),
384            )
385            .await
386            .unwrap();
387    }
388
389    // ── client-side / API errors ───────────────────────────────────
390
391    #[tokio::test]
392    async fn list_rejects_limit_above_max_page_limit_client_side() {
393        let client = DatadogClient::new("http://127.0.0.1:1", "api", "app").unwrap();
394        let err = EventsApi::new(&client)
395            .list(
396                &EventsListFilter::default(),
397                "2026-04-22T09:00:00Z",
398                "2026-04-22T10:00:00Z",
399                MAX_PAGE_LIMIT + 1,
400                None,
401            )
402            .await
403            .unwrap_err();
404        assert!(err.to_string().contains("limit"));
405        assert!(err.to_string().contains(&MAX_PAGE_LIMIT.to_string()));
406        assert!(err.to_string().contains("list_all"));
407    }
408
409    #[tokio::test]
410    async fn list_propagates_api_errors() {
411        let server = wiremock::MockServer::start().await;
412        wiremock::Mock::given(wiremock::matchers::method("GET"))
413            .and(wiremock::matchers::path("/api/v2/events"))
414            .respond_with(
415                wiremock::ResponseTemplate::new(403).set_body_string(r#"{"errors":["nope"]}"#),
416            )
417            .mount(&server)
418            .await;
419
420        let client = DatadogClient::new(&server.uri(), "api", "app").unwrap();
421        let err = EventsApi::new(&client)
422            .list(
423                &EventsListFilter::default(),
424                "2026-04-22T09:00:00Z",
425                "2026-04-22T10:00:00Z",
426                10,
427                None,
428            )
429            .await
430            .unwrap_err();
431        let msg = err.to_string();
432        assert!(msg.contains("403"));
433        assert!(msg.contains("nope"));
434    }
435
436    #[tokio::test]
437    async fn list_propagates_invalid_base_url_error() {
438        let client = DatadogClient::new("not a url", "api", "app").unwrap();
439        let err = EventsApi::new(&client)
440            .list(
441                &EventsListFilter::default(),
442                "2026-04-22T09:00:00Z",
443                "2026-04-22T10:00:00Z",
444                10,
445                None,
446            )
447            .await
448            .unwrap_err();
449        assert!(err.to_string().contains("Invalid Datadog base URL"));
450    }
451
452    #[tokio::test]
453    async fn list_propagates_network_errors() {
454        let client = DatadogClient::new("http://127.0.0.1:1", "api", "app").unwrap();
455        let err = EventsApi::new(&client)
456            .list(
457                &EventsListFilter::default(),
458                "2026-04-22T09:00:00Z",
459                "2026-04-22T10:00:00Z",
460                10,
461                None,
462            )
463            .await
464            .unwrap_err();
465        assert!(err.to_string().contains("Failed to send"));
466    }
467
468    #[tokio::test]
469    async fn list_errors_on_malformed_response() {
470        let server = wiremock::MockServer::start().await;
471        wiremock::Mock::given(wiremock::matchers::method("GET"))
472            .and(wiremock::matchers::path("/api/v2/events"))
473            .respond_with(wiremock::ResponseTemplate::new(200).set_body_string("not json"))
474            .mount(&server)
475            .await;
476
477        let client = DatadogClient::new(&server.uri(), "api", "app").unwrap();
478        let err = EventsApi::new(&client)
479            .list(
480                &EventsListFilter::default(),
481                "2026-04-22T09:00:00Z",
482                "2026-04-22T10:00:00Z",
483                10,
484                None,
485            )
486            .await
487            .unwrap_err();
488        assert!(err.to_string().contains("Failed to parse"));
489    }
490
491    // ── list_all ───────────────────────────────────────────────────
492
493    #[tokio::test]
494    async fn list_all_single_page_when_response_has_no_cursor() {
495        let server = wiremock::MockServer::start().await;
496        wiremock::Mock::given(wiremock::matchers::method("GET"))
497            .and(wiremock::matchers::path("/api/v2/events"))
498            .and(wiremock::matchers::query_param("page[limit]", "100"))
499            .respond_with(
500                wiremock::ResponseTemplate::new(200).set_body_json(page_body(&["a", "b"], None)),
501            )
502            .expect(1)
503            .mount(&server)
504            .await;
505
506        let client = DatadogClient::new(&server.uri(), "api", "app").unwrap();
507        let result = EventsApi::new(&client)
508            .list_all(
509                &EventsListFilter::default(),
510                "2026-04-22T09:00:00Z",
511                "2026-04-22T10:00:00Z",
512                100,
513            )
514            .await
515            .unwrap();
516        assert_eq!(result.data.len(), 2);
517    }
518
519    #[tokio::test]
520    async fn list_all_follows_cursor_until_no_more_pages() {
521        // Page 1 returns 2 events + cursor "c1"; page 2 returns 1 event +
522        // no cursor. With `limit == 0`, the loop should issue both
523        // requests and concatenate their data.
524        let server = wiremock::MockServer::start().await;
525        let limit_str = MAX_PAGE_LIMIT.to_string();
526        wiremock::Mock::given(wiremock::matchers::method("GET"))
527            .and(wiremock::matchers::path("/api/v2/events"))
528            .and(wiremock::matchers::query_param(
529                "page[limit]",
530                limit_str.as_str(),
531            ))
532            .and(wiremock::matchers::query_param_is_missing("page[cursor]"))
533            .respond_with(
534                wiremock::ResponseTemplate::new(200)
535                    .set_body_json(page_body(&["a", "b"], Some("c1"))),
536            )
537            .expect(1)
538            .mount(&server)
539            .await;
540        wiremock::Mock::given(wiremock::matchers::method("GET"))
541            .and(wiremock::matchers::path("/api/v2/events"))
542            .and(wiremock::matchers::query_param("page[cursor]", "c1"))
543            .respond_with(
544                wiremock::ResponseTemplate::new(200).set_body_json(page_body(&["c"], None)),
545            )
546            .expect(1)
547            .mount(&server)
548            .await;
549
550        let client = DatadogClient::new(&server.uri(), "api", "app").unwrap();
551        let result = EventsApi::new(&client)
552            .list_all(
553                &EventsListFilter::default(),
554                "2026-04-22T09:00:00Z",
555                "2026-04-22T10:00:00Z",
556                0,
557            )
558            .await
559            .unwrap();
560        let ids: Vec<&str> = result.data.iter().map(|e| e.id.as_str()).collect();
561        assert_eq!(ids, ["a", "b", "c"]);
562        assert!(result
563            .meta
564            .as_ref()
565            .and_then(|m| m.page.as_ref())
566            .and_then(|p| p.after.as_deref())
567            .is_none());
568    }
569
570    #[tokio::test]
571    async fn list_all_stops_at_explicit_limit_within_first_page() {
572        let server = wiremock::MockServer::start().await;
573        let ids = ["a", "b", "c"];
574        wiremock::Mock::given(wiremock::matchers::method("GET"))
575            .and(wiremock::matchers::path("/api/v2/events"))
576            .and(wiremock::matchers::query_param("page[limit]", "3"))
577            .respond_with(
578                wiremock::ResponseTemplate::new(200).set_body_json(page_body(&ids, Some("c1"))),
579            )
580            .expect(1)
581            .mount(&server)
582            .await;
583
584        let client = DatadogClient::new(&server.uri(), "api", "app").unwrap();
585        let result = EventsApi::new(&client)
586            .list_all(
587                &EventsListFilter::default(),
588                "2026-04-22T09:00:00Z",
589                "2026-04-22T10:00:00Z",
590                3,
591            )
592            .await
593            .unwrap();
594        assert_eq!(result.data.len(), 3);
595    }
596
597    #[tokio::test]
598    async fn list_all_truncates_to_hard_cap_when_unbounded() {
599        let server = wiremock::MockServer::start().await;
600        let full_page: Vec<serde_json::Value> = (0..MAX_PAGE_LIMIT)
601            .map(|i| event_json(&format!("e{i}")))
602            .collect();
603        let body = serde_json::json!({
604            "data": full_page,
605            "meta": { "page": { "after": "always-more" } }
606        });
607        wiremock::Mock::given(wiremock::matchers::method("GET"))
608            .and(wiremock::matchers::path("/api/v2/events"))
609            .respond_with(wiremock::ResponseTemplate::new(200).set_body_json(body))
610            .mount(&server)
611            .await;
612
613        let client = DatadogClient::new(&server.uri(), "api", "app").unwrap();
614        let result = EventsApi::new(&client)
615            .list_all(
616                &EventsListFilter::default(),
617                "2026-04-22T09:00:00Z",
618                "2026-04-22T10:00:00Z",
619                0,
620            )
621            .await
622            .unwrap();
623        assert_eq!(result.data.len(), HARD_CAP);
624    }
625
626    #[tokio::test]
627    async fn list_all_propagates_api_errors_on_first_page() {
628        let server = wiremock::MockServer::start().await;
629        wiremock::Mock::given(wiremock::matchers::method("GET"))
630            .and(wiremock::matchers::path("/api/v2/events"))
631            .respond_with(
632                wiremock::ResponseTemplate::new(403).set_body_string(r#"{"errors":["nope"]}"#),
633            )
634            .mount(&server)
635            .await;
636
637        let client = DatadogClient::new(&server.uri(), "api", "app").unwrap();
638        let err = EventsApi::new(&client)
639            .list_all(
640                &EventsListFilter::default(),
641                "2026-04-22T09:00:00Z",
642                "2026-04-22T10:00:00Z",
643                0,
644            )
645            .await
646            .unwrap_err();
647        let msg = err.to_string();
648        assert!(msg.contains("403"));
649        assert!(msg.contains("nope"));
650    }
651
652    #[tokio::test]
653    async fn list_all_caps_explicit_limit_at_hard_cap() {
654        let server = wiremock::MockServer::start().await;
655        let full_page: Vec<serde_json::Value> = (0..MAX_PAGE_LIMIT)
656            .map(|i| event_json(&format!("e{i}")))
657            .collect();
658        let body = serde_json::json!({
659            "data": full_page,
660            "meta": { "page": { "after": "always-more" } }
661        });
662        wiremock::Mock::given(wiremock::matchers::method("GET"))
663            .and(wiremock::matchers::path("/api/v2/events"))
664            .respond_with(wiremock::ResponseTemplate::new(200).set_body_json(body))
665            .mount(&server)
666            .await;
667
668        let client = DatadogClient::new(&server.uri(), "api", "app").unwrap();
669        let result = EventsApi::new(&client)
670            .list_all(
671                &EventsListFilter::default(),
672                "2026-04-22T09:00:00Z",
673                "2026-04-22T10:00:00Z",
674                HARD_CAP + 50,
675            )
676            .await
677            .unwrap();
678        assert_eq!(result.data.len(), HARD_CAP);
679    }
680}