1use anyhow::{Context, Result};
15use url::Url;
16
17use crate::datadog::client::DatadogClient;
18use crate::datadog::types::EventsResponse;
19
20pub const MAX_PAGE_LIMIT: usize = 1000;
22
23pub const HARD_CAP: usize = 10_000;
26
27#[derive(Debug, Default, Clone)]
34pub struct EventsListFilter {
35 pub query: Option<String>,
37 pub sources: Option<String>,
39 pub tags: Option<String>,
41}
42
43#[derive(Debug)]
45pub struct EventsApi<'a> {
46 client: &'a DatadogClient,
47}
48
49impl<'a> EventsApi<'a> {
50 #[must_use]
52 pub fn new(client: &'a DatadogClient) -> Self {
53 Self { client }
54 }
55
56 pub async fn list(
65 &self,
66 filter: &EventsListFilter,
67 from: &str,
68 to: &str,
69 limit: usize,
70 after: Option<&str>,
71 ) -> Result<EventsResponse> {
72 if limit > MAX_PAGE_LIMIT {
73 return Err(anyhow::anyhow!(
74 "`limit` must be <= {MAX_PAGE_LIMIT} (Datadog v2 events per-page cap; use `EventsApi::list_all` to auto-paginate across pages)"
75 ));
76 }
77 let url = build_list_url(self.client.base_url(), filter, from, to, limit, after)?;
78 let response = self.client.get_json(url.as_str()).await?;
79 if !response.status().is_success() {
80 return Err(DatadogClient::response_to_error(response).await.into());
81 }
82 response
83 .json::<EventsResponse>()
84 .await
85 .context("Failed to parse /api/v2/events response")
86 }
87
88 pub async fn list_all(
105 &self,
106 filter: &EventsListFilter,
107 from: &str,
108 to: &str,
109 limit: usize,
110 ) -> Result<EventsResponse> {
111 let cap = effective_cap(limit);
112 let mut acc: Option<EventsResponse> = None;
113 let mut cursor: Option<String> = None;
114 loop {
115 let collected = acc.as_ref().map_or(0, |r| r.data.len());
116 let remaining = cap - collected;
117 let page_size = remaining.min(MAX_PAGE_LIMIT);
118 let page = self
119 .list(filter, from, to, page_size, cursor.as_deref())
120 .await?;
121 let next_cursor = page
122 .meta
123 .as_ref()
124 .and_then(|m| m.page.as_ref())
125 .and_then(|p| p.after.clone());
126 match acc.as_mut() {
127 Some(existing) => {
128 existing.data.extend(page.data);
129 existing.meta = page.meta;
130 existing.links = page.links;
131 }
132 None => acc = Some(page),
133 }
134 let collected = acc.as_ref().map_or(0, |r| r.data.len());
135 if collected >= cap || next_cursor.is_none() {
136 break;
137 }
138 cursor = next_cursor;
139 }
140 let mut result = acc.unwrap_or_default();
141 result.data.truncate(cap);
142 Ok(result)
143 }
144}
145
146fn effective_cap(limit: usize) -> usize {
149 if limit == 0 {
150 HARD_CAP
151 } else {
152 limit.min(HARD_CAP)
153 }
154}
155
156fn build_list_url(
158 base_url: &str,
159 filter: &EventsListFilter,
160 from: &str,
161 to: &str,
162 limit: usize,
163 after: Option<&str>,
164) -> Result<Url> {
165 let mut url =
166 Url::parse(&format!("{base_url}/api/v2/events")).context("Invalid Datadog base URL")?;
167 {
168 let mut q = url.query_pairs_mut();
169 if let Some(query) = filter.query.as_deref() {
170 q.append_pair("filter[query]", query);
171 }
172 if let Some(sources) = filter.sources.as_deref() {
173 q.append_pair("filter[sources]", sources);
174 }
175 if let Some(tags) = filter.tags.as_deref() {
176 q.append_pair("filter[tags]", tags);
177 }
178 q.append_pair("filter[from]", from);
179 q.append_pair("filter[to]", to);
180 q.append_pair("page[limit]", &limit.to_string());
181 if let Some(cursor) = after {
182 q.append_pair("page[cursor]", cursor);
183 }
184 }
185 Ok(url)
186}
187
188#[cfg(test)]
189#[allow(clippy::unwrap_used, clippy::expect_used)]
190mod tests {
191 use super::*;
192
193 #[test]
196 fn effective_cap_zero_means_hard_cap() {
197 assert_eq!(effective_cap(0), HARD_CAP);
198 }
199
200 #[test]
201 fn effective_cap_clamps_to_hard_cap() {
202 assert_eq!(effective_cap(HARD_CAP + 5), HARD_CAP);
203 }
204
205 #[test]
206 fn effective_cap_passes_through_small_limits() {
207 assert_eq!(effective_cap(42), 42);
208 }
209
210 #[test]
213 fn build_list_url_appends_only_provided_filters() {
214 let filter = EventsListFilter {
215 query: Some("service:api".into()),
216 sources: None,
217 tags: None,
218 };
219 let url = build_list_url(
220 "https://api.datadoghq.com",
221 &filter,
222 "2026-04-22T09:00:00Z",
223 "2026-04-22T10:00:00Z",
224 50,
225 None,
226 )
227 .unwrap();
228 let qs = url.query().unwrap();
229 assert!(qs.contains("filter%5Bquery%5D=service%3Aapi"));
230 assert!(qs.contains("filter%5Bfrom%5D=2026-04-22T09%3A00%3A00Z"));
231 assert!(qs.contains("filter%5Bto%5D=2026-04-22T10%3A00%3A00Z"));
232 assert!(qs.contains("page%5Blimit%5D=50"));
233 assert!(!qs.contains("filter%5Bsources%5D"));
234 assert!(!qs.contains("filter%5Btags%5D"));
235 assert!(!qs.contains("page%5Bcursor%5D"));
236 }
237
238 #[test]
239 fn build_list_url_encodes_sources_and_tags() {
240 let filter = EventsListFilter {
241 query: None,
242 sources: Some("aws,kubernetes".into()),
243 tags: Some("env:prod,team:sre".into()),
244 };
245 let url = build_list_url(
246 "https://api.datadoghq.com",
247 &filter,
248 "2026-04-22T09:00:00Z",
249 "2026-04-22T10:00:00Z",
250 10,
251 None,
252 )
253 .unwrap();
254 let qs = url.query().unwrap();
255 assert!(qs.contains("filter%5Bsources%5D=aws%2Ckubernetes"));
256 assert!(qs.contains("filter%5Btags%5D=env%3Aprod%2Cteam%3Asre"));
257 }
258
259 #[test]
260 fn build_list_url_appends_cursor_when_provided() {
261 let url = build_list_url(
262 "https://api.datadoghq.com",
263 &EventsListFilter::default(),
264 "2026-04-22T09:00:00Z",
265 "2026-04-22T10:00:00Z",
266 10,
267 Some("tok-2"),
268 )
269 .unwrap();
270 let qs = url.query().unwrap();
271 assert!(qs.contains("page%5Bcursor%5D=tok-2"));
272 }
273
274 #[test]
275 fn build_list_url_rejects_invalid_base() {
276 let err = build_list_url(
277 "not a url",
278 &EventsListFilter::default(),
279 "2026-04-22T09:00:00Z",
280 "2026-04-22T10:00:00Z",
281 10,
282 None,
283 )
284 .unwrap_err();
285 assert!(err.to_string().contains("Invalid Datadog base URL"));
286 }
287
288 fn event_json(id: &str) -> serde_json::Value {
291 serde_json::json!({
292 "id": id,
293 "type": "event",
294 "attributes": {
295 "timestamp": "2026-04-22T10:00:00.000Z",
296 "title": "Deploy",
297 "source": "github",
298 "tags": ["env:prod"]
299 }
300 })
301 }
302
303 fn page_body(ids: &[&str], next_cursor: Option<&str>) -> serde_json::Value {
304 let data: Vec<serde_json::Value> = ids.iter().map(|id| event_json(id)).collect();
305 let meta = match next_cursor {
306 Some(c) => serde_json::json!({ "page": { "after": c }, "status": "done" }),
307 None => serde_json::json!({ "page": {}, "status": "done" }),
308 };
309 serde_json::json!({ "data": data, "meta": meta })
310 }
311
312 fn sample_body() -> serde_json::Value {
313 serde_json::json!({
314 "data": [event_json("EV1")],
315 "meta": {"page": {"after": "next"}, "status": "done"}
316 })
317 }
318
319 #[tokio::test]
322 async fn list_sends_filters_and_returns_parsed_response() {
323 let server = wiremock::MockServer::start().await;
324 wiremock::Mock::given(wiremock::matchers::method("GET"))
325 .and(wiremock::matchers::path("/api/v2/events"))
326 .and(wiremock::matchers::query_param(
327 "filter[query]",
328 "service:api",
329 ))
330 .and(wiremock::matchers::query_param(
331 "filter[from]",
332 "2026-04-22T09:00:00Z",
333 ))
334 .and(wiremock::matchers::query_param(
335 "filter[to]",
336 "2026-04-22T10:00:00Z",
337 ))
338 .and(wiremock::matchers::query_param("page[limit]", "10"))
339 .and(wiremock::matchers::header("DD-API-KEY", "api"))
340 .and(wiremock::matchers::header("DD-APPLICATION-KEY", "app"))
341 .respond_with(wiremock::ResponseTemplate::new(200).set_body_json(sample_body()))
342 .expect(1)
343 .mount(&server)
344 .await;
345
346 let client = DatadogClient::new(&server.uri(), "api", "app").unwrap();
347 let result = EventsApi::new(&client)
348 .list(
349 &EventsListFilter {
350 query: Some("service:api".into()),
351 sources: None,
352 tags: None,
353 },
354 "2026-04-22T09:00:00Z",
355 "2026-04-22T10:00:00Z",
356 10,
357 None,
358 )
359 .await
360 .unwrap();
361 assert_eq!(result.data.len(), 1);
362 assert_eq!(result.data[0].id, "EV1");
363 }
364
365 #[tokio::test]
366 async fn list_includes_cursor_in_query_when_after_is_some() {
367 let server = wiremock::MockServer::start().await;
368 wiremock::Mock::given(wiremock::matchers::method("GET"))
369 .and(wiremock::matchers::path("/api/v2/events"))
370 .and(wiremock::matchers::query_param("page[cursor]", "tok-2"))
371 .respond_with(wiremock::ResponseTemplate::new(200).set_body_json(sample_body()))
372 .expect(1)
373 .mount(&server)
374 .await;
375
376 let client = DatadogClient::new(&server.uri(), "api", "app").unwrap();
377 EventsApi::new(&client)
378 .list(
379 &EventsListFilter::default(),
380 "2026-04-22T09:00:00Z",
381 "2026-04-22T10:00:00Z",
382 10,
383 Some("tok-2"),
384 )
385 .await
386 .unwrap();
387 }
388
389 #[tokio::test]
392 async fn list_rejects_limit_above_max_page_limit_client_side() {
393 let client = DatadogClient::new("http://127.0.0.1:1", "api", "app").unwrap();
394 let err = EventsApi::new(&client)
395 .list(
396 &EventsListFilter::default(),
397 "2026-04-22T09:00:00Z",
398 "2026-04-22T10:00:00Z",
399 MAX_PAGE_LIMIT + 1,
400 None,
401 )
402 .await
403 .unwrap_err();
404 assert!(err.to_string().contains("limit"));
405 assert!(err.to_string().contains(&MAX_PAGE_LIMIT.to_string()));
406 assert!(err.to_string().contains("list_all"));
407 }
408
409 #[tokio::test]
410 async fn list_propagates_api_errors() {
411 let server = wiremock::MockServer::start().await;
412 wiremock::Mock::given(wiremock::matchers::method("GET"))
413 .and(wiremock::matchers::path("/api/v2/events"))
414 .respond_with(
415 wiremock::ResponseTemplate::new(403).set_body_string(r#"{"errors":["nope"]}"#),
416 )
417 .mount(&server)
418 .await;
419
420 let client = DatadogClient::new(&server.uri(), "api", "app").unwrap();
421 let err = EventsApi::new(&client)
422 .list(
423 &EventsListFilter::default(),
424 "2026-04-22T09:00:00Z",
425 "2026-04-22T10:00:00Z",
426 10,
427 None,
428 )
429 .await
430 .unwrap_err();
431 let msg = err.to_string();
432 assert!(msg.contains("403"));
433 assert!(msg.contains("nope"));
434 }
435
436 #[tokio::test]
437 async fn list_propagates_invalid_base_url_error() {
438 let client = DatadogClient::new("not a url", "api", "app").unwrap();
439 let err = EventsApi::new(&client)
440 .list(
441 &EventsListFilter::default(),
442 "2026-04-22T09:00:00Z",
443 "2026-04-22T10:00:00Z",
444 10,
445 None,
446 )
447 .await
448 .unwrap_err();
449 assert!(err.to_string().contains("Invalid Datadog base URL"));
450 }
451
452 #[tokio::test]
453 async fn list_propagates_network_errors() {
454 let client = DatadogClient::new("http://127.0.0.1:1", "api", "app").unwrap();
455 let err = EventsApi::new(&client)
456 .list(
457 &EventsListFilter::default(),
458 "2026-04-22T09:00:00Z",
459 "2026-04-22T10:00:00Z",
460 10,
461 None,
462 )
463 .await
464 .unwrap_err();
465 assert!(err.to_string().contains("Failed to send"));
466 }
467
468 #[tokio::test]
469 async fn list_errors_on_malformed_response() {
470 let server = wiremock::MockServer::start().await;
471 wiremock::Mock::given(wiremock::matchers::method("GET"))
472 .and(wiremock::matchers::path("/api/v2/events"))
473 .respond_with(wiremock::ResponseTemplate::new(200).set_body_string("not json"))
474 .mount(&server)
475 .await;
476
477 let client = DatadogClient::new(&server.uri(), "api", "app").unwrap();
478 let err = EventsApi::new(&client)
479 .list(
480 &EventsListFilter::default(),
481 "2026-04-22T09:00:00Z",
482 "2026-04-22T10:00:00Z",
483 10,
484 None,
485 )
486 .await
487 .unwrap_err();
488 assert!(err.to_string().contains("Failed to parse"));
489 }
490
491 #[tokio::test]
494 async fn list_all_single_page_when_response_has_no_cursor() {
495 let server = wiremock::MockServer::start().await;
496 wiremock::Mock::given(wiremock::matchers::method("GET"))
497 .and(wiremock::matchers::path("/api/v2/events"))
498 .and(wiremock::matchers::query_param("page[limit]", "100"))
499 .respond_with(
500 wiremock::ResponseTemplate::new(200).set_body_json(page_body(&["a", "b"], None)),
501 )
502 .expect(1)
503 .mount(&server)
504 .await;
505
506 let client = DatadogClient::new(&server.uri(), "api", "app").unwrap();
507 let result = EventsApi::new(&client)
508 .list_all(
509 &EventsListFilter::default(),
510 "2026-04-22T09:00:00Z",
511 "2026-04-22T10:00:00Z",
512 100,
513 )
514 .await
515 .unwrap();
516 assert_eq!(result.data.len(), 2);
517 }
518
519 #[tokio::test]
520 async fn list_all_follows_cursor_until_no_more_pages() {
521 let server = wiremock::MockServer::start().await;
525 let limit_str = MAX_PAGE_LIMIT.to_string();
526 wiremock::Mock::given(wiremock::matchers::method("GET"))
527 .and(wiremock::matchers::path("/api/v2/events"))
528 .and(wiremock::matchers::query_param(
529 "page[limit]",
530 limit_str.as_str(),
531 ))
532 .and(wiremock::matchers::query_param_is_missing("page[cursor]"))
533 .respond_with(
534 wiremock::ResponseTemplate::new(200)
535 .set_body_json(page_body(&["a", "b"], Some("c1"))),
536 )
537 .expect(1)
538 .mount(&server)
539 .await;
540 wiremock::Mock::given(wiremock::matchers::method("GET"))
541 .and(wiremock::matchers::path("/api/v2/events"))
542 .and(wiremock::matchers::query_param("page[cursor]", "c1"))
543 .respond_with(
544 wiremock::ResponseTemplate::new(200).set_body_json(page_body(&["c"], None)),
545 )
546 .expect(1)
547 .mount(&server)
548 .await;
549
550 let client = DatadogClient::new(&server.uri(), "api", "app").unwrap();
551 let result = EventsApi::new(&client)
552 .list_all(
553 &EventsListFilter::default(),
554 "2026-04-22T09:00:00Z",
555 "2026-04-22T10:00:00Z",
556 0,
557 )
558 .await
559 .unwrap();
560 let ids: Vec<&str> = result.data.iter().map(|e| e.id.as_str()).collect();
561 assert_eq!(ids, ["a", "b", "c"]);
562 assert!(result
563 .meta
564 .as_ref()
565 .and_then(|m| m.page.as_ref())
566 .and_then(|p| p.after.as_deref())
567 .is_none());
568 }
569
570 #[tokio::test]
571 async fn list_all_stops_at_explicit_limit_within_first_page() {
572 let server = wiremock::MockServer::start().await;
573 let ids = ["a", "b", "c"];
574 wiremock::Mock::given(wiremock::matchers::method("GET"))
575 .and(wiremock::matchers::path("/api/v2/events"))
576 .and(wiremock::matchers::query_param("page[limit]", "3"))
577 .respond_with(
578 wiremock::ResponseTemplate::new(200).set_body_json(page_body(&ids, Some("c1"))),
579 )
580 .expect(1)
581 .mount(&server)
582 .await;
583
584 let client = DatadogClient::new(&server.uri(), "api", "app").unwrap();
585 let result = EventsApi::new(&client)
586 .list_all(
587 &EventsListFilter::default(),
588 "2026-04-22T09:00:00Z",
589 "2026-04-22T10:00:00Z",
590 3,
591 )
592 .await
593 .unwrap();
594 assert_eq!(result.data.len(), 3);
595 }
596
597 #[tokio::test]
598 async fn list_all_truncates_to_hard_cap_when_unbounded() {
599 let server = wiremock::MockServer::start().await;
600 let full_page: Vec<serde_json::Value> = (0..MAX_PAGE_LIMIT)
601 .map(|i| event_json(&format!("e{i}")))
602 .collect();
603 let body = serde_json::json!({
604 "data": full_page,
605 "meta": { "page": { "after": "always-more" } }
606 });
607 wiremock::Mock::given(wiremock::matchers::method("GET"))
608 .and(wiremock::matchers::path("/api/v2/events"))
609 .respond_with(wiremock::ResponseTemplate::new(200).set_body_json(body))
610 .mount(&server)
611 .await;
612
613 let client = DatadogClient::new(&server.uri(), "api", "app").unwrap();
614 let result = EventsApi::new(&client)
615 .list_all(
616 &EventsListFilter::default(),
617 "2026-04-22T09:00:00Z",
618 "2026-04-22T10:00:00Z",
619 0,
620 )
621 .await
622 .unwrap();
623 assert_eq!(result.data.len(), HARD_CAP);
624 }
625
626 #[tokio::test]
627 async fn list_all_propagates_api_errors_on_first_page() {
628 let server = wiremock::MockServer::start().await;
629 wiremock::Mock::given(wiremock::matchers::method("GET"))
630 .and(wiremock::matchers::path("/api/v2/events"))
631 .respond_with(
632 wiremock::ResponseTemplate::new(403).set_body_string(r#"{"errors":["nope"]}"#),
633 )
634 .mount(&server)
635 .await;
636
637 let client = DatadogClient::new(&server.uri(), "api", "app").unwrap();
638 let err = EventsApi::new(&client)
639 .list_all(
640 &EventsListFilter::default(),
641 "2026-04-22T09:00:00Z",
642 "2026-04-22T10:00:00Z",
643 0,
644 )
645 .await
646 .unwrap_err();
647 let msg = err.to_string();
648 assert!(msg.contains("403"));
649 assert!(msg.contains("nope"));
650 }
651
652 #[tokio::test]
653 async fn list_all_caps_explicit_limit_at_hard_cap() {
654 let server = wiremock::MockServer::start().await;
655 let full_page: Vec<serde_json::Value> = (0..MAX_PAGE_LIMIT)
656 .map(|i| event_json(&format!("e{i}")))
657 .collect();
658 let body = serde_json::json!({
659 "data": full_page,
660 "meta": { "page": { "after": "always-more" } }
661 });
662 wiremock::Mock::given(wiremock::matchers::method("GET"))
663 .and(wiremock::matchers::path("/api/v2/events"))
664 .respond_with(wiremock::ResponseTemplate::new(200).set_body_json(body))
665 .mount(&server)
666 .await;
667
668 let client = DatadogClient::new(&server.uri(), "api", "app").unwrap();
669 let result = EventsApi::new(&client)
670 .list_all(
671 &EventsListFilter::default(),
672 "2026-04-22T09:00:00Z",
673 "2026-04-22T10:00:00Z",
674 HARD_CAP + 50,
675 )
676 .await
677 .unwrap();
678 assert_eq!(result.data.len(), HARD_CAP);
679 }
680}