Skip to main content

omni_dev/datadog/
logs_api.rs

//! Datadog Logs API wrapper.
//!
//! Exposes a thin façade over [`DatadogClient`] for the v2 logs search
//! endpoint. Datadog v2 logs search uses **cursor pagination**
//! (`meta.page.after`), not offset. [`LogsApi::search`] issues a single
//! request optionally seeded with an `after` cursor token;
//! [`LogsApi::search_all`] auto-paginates up to a caller-supplied limit
//! (or [`HARD_CAP`] when the limit is `0`), mirroring [`MonitorsApi::list`].
//!
//! [`MonitorsApi::list`]: crate::datadog::monitors_api::MonitorsApi::list
11
12use anyhow::{Context, Result};
13use serde::Serialize;
14
15use crate::datadog::client::DatadogClient;
16use crate::datadog::types::{LogSearchResult, SortOrder};
17
/// Largest per-request page size for `POST /api/v2/logs/events/search`.
///
/// Datadog rejects page sizes above 1000. Checking the size before the
/// request is sent lets the API client report a clearer error than the
/// server's HTTP 400.
pub const MAX_PAGE_LIMIT: usize = 1_000;
24
/// Ceiling on how many log events a single [`LogsApi::search_all`] call
/// returns. It applies even when the caller passes `limit = 0`.
pub const HARD_CAP: usize = 10_000;
28
29/// Logs API façade.
30#[derive(Debug)]
31pub struct LogsApi<'a> {
32    client: &'a DatadogClient,
33}
34
35impl<'a> LogsApi<'a> {
36    /// Wraps an existing [`DatadogClient`] for log operations.
37    #[must_use]
38    pub fn new(client: &'a DatadogClient) -> Self {
39        Self { client }
40    }
41
42    /// Searches log events.
43    ///
44    /// `from` and `to` are passed through to Datadog as strings. Datadog
45    /// accepts ISO 8601 timestamps, epoch milliseconds, and relative
46    /// shorthand like `now-15m` / `now`; callers are expected to convert
47    /// CLI-level inputs into a form Datadog understands before calling.
48    ///
49    /// Returns a single page only. When `after` is `Some`, Datadog
50    /// resumes pagination at that cursor token (`page.cursor` in the
51    /// request body). The next-page token is preserved on
52    /// `meta.page.after` of the response so the caller (or
53    /// [`LogsApi::search_all`]) can iterate.
54    ///
55    /// `limit` is rejected client-side when it exceeds [`MAX_PAGE_LIMIT`].
56    pub async fn search(
57        &self,
58        query: &str,
59        from: &str,
60        to: &str,
61        limit: usize,
62        sort: SortOrder,
63        after: Option<&str>,
64    ) -> Result<LogSearchResult> {
65        if limit > MAX_PAGE_LIMIT {
66            return Err(anyhow::anyhow!(
67                "`limit` must be <= {MAX_PAGE_LIMIT} (Datadog v2 logs search per-page cap; use `LogsApi::search_all` to auto-paginate across pages)"
68            ));
69        }
70        let body = SearchRequest {
71            filter: Filter { query, from, to },
72            page: Page {
73                limit,
74                cursor: after,
75            },
76            sort,
77        };
78        let url = format!("{}/api/v2/logs/events/search", self.client.base_url());
79        let response = self.client.post_json(&url, &body).await?;
80        if !response.status().is_success() {
81            return Err(DatadogClient::response_to_error(response).await.into());
82        }
83        response
84            .json::<LogSearchResult>()
85            .await
86            .context("Failed to parse /api/v2/logs/events/search response")
87    }
88
89    /// Searches log events, auto-paginating via cursor as needed.
90    ///
91    /// `limit == 0` means "fetch every match up to [`HARD_CAP`]". Any
92    /// non-zero `limit` is upper-bounded by [`HARD_CAP`] to keep a single
93    /// invocation from issuing more than 10k items' worth of requests.
94    /// Per-request page size is clamped to [`MAX_PAGE_LIMIT`].
95    ///
96    /// Termination follows cursor-pagination semantics: the loop stops
97    /// when the response omits `meta.page.after` (Datadog signals "no
98    /// more pages" only via the absent cursor — a short page on its own
99    /// is *not* a terminator) or when `cap` items have been collected.
100    ///
101    /// The returned envelope keeps the `meta` block from the *last*
102    /// successful page so the response's cursor reflects the iterator's
103    /// final position (typically `None` when the API is exhausted).
104    pub async fn search_all(
105        &self,
106        query: &str,
107        from: &str,
108        to: &str,
109        limit: usize,
110        sort: SortOrder,
111    ) -> Result<LogSearchResult> {
112        let cap = effective_cap(limit);
113        let mut acc: Option<LogSearchResult> = None;
114        let mut cursor: Option<String> = None;
115        loop {
116            let collected = acc.as_ref().map_or(0, |r| r.data.len());
117            let remaining = cap - collected;
118            let page_size = remaining.min(MAX_PAGE_LIMIT);
119            let page = self
120                .search(query, from, to, page_size, sort, cursor.as_deref())
121                .await?;
122            let next_cursor = page
123                .meta
124                .as_ref()
125                .and_then(|m| m.page.as_ref())
126                .and_then(|p| p.after.clone());
127            match acc.as_mut() {
128                Some(existing) => {
129                    existing.data.extend(page.data);
130                    existing.meta = page.meta;
131                }
132                None => acc = Some(page),
133            }
134            let collected = acc.as_ref().map_or(0, |r| r.data.len());
135            if collected >= cap || next_cursor.is_none() {
136                break;
137            }
138            cursor = next_cursor;
139        }
140        let mut result = acc.unwrap_or_default();
141        result.data.truncate(cap);
142        Ok(result)
143    }
144}
145
146/// Clamps a caller-supplied limit to [`HARD_CAP`], treating `0` as
147/// "fetch as many as the cap allows".
148fn effective_cap(limit: usize) -> usize {
149    if limit == 0 {
150        HARD_CAP
151    } else {
152        limit.min(HARD_CAP)
153    }
154}
155
156#[derive(Debug, Serialize)]
157struct SearchRequest<'a> {
158    filter: Filter<'a>,
159    page: Page<'a>,
160    sort: SortOrder,
161}
162
163#[derive(Debug, Serialize)]
164struct Filter<'a> {
165    query: &'a str,
166    from: &'a str,
167    to: &'a str,
168}
169
170#[derive(Debug, Serialize)]
171struct Page<'a> {
172    limit: usize,
173    #[serde(skip_serializing_if = "Option::is_none")]
174    cursor: Option<&'a str>,
175}
176
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    use serde_json::{json, Value};
    use wiremock::matchers::{body_json, header, method, path};
    use wiremock::{Mock, MockBuilder, MockServer, ResponseTemplate};

    /// Path of the endpoint every mock in this module listens on.
    const SEARCH_PATH: &str = "/api/v2/logs/events/search";

    /// Base URL used by tests that must not reach a server (presumably
    /// nothing listens on this port).
    const UNREACHABLE_URL: &str = "http://127.0.0.1:1";

    // ── fixtures ───────────────────────────────────────────────────

    /// Starts a mock definition matching `POST` requests to the search path.
    fn search_mock() -> MockBuilder {
        Mock::given(method("POST")).and(path(SEARCH_PATH))
    }

    /// HTTP 200 response carrying `body` as JSON.
    fn ok_json(body: Value) -> ResponseTemplate {
        ResponseTemplate::new(200).set_body_json(body)
    }

    /// Client aimed at `server`, using the keys the header matchers expect.
    fn client_for(server: &MockServer) -> DatadogClient {
        DatadogClient::new(&server.uri(), "api", "app").unwrap()
    }

    /// Client aimed at [`UNREACHABLE_URL`].
    fn unreachable_client() -> DatadogClient {
        DatadogClient::new(UNREACHABLE_URL, "api", "app").unwrap()
    }

    /// Request body expected for a `*` query over `now-1h`..`now`.
    fn wildcard_request(limit: usize, cursor: Option<&str>, sort: &str) -> Value {
        let mut page = json!({ "limit": limit });
        if let Some(token) = cursor {
            page["cursor"] = json!(token);
        }
        json!({
            "filter": { "query": "*", "from": "now-1h", "to": "now" },
            "page": page,
            "sort": sort
        })
    }

    /// Reads `meta.page.after` from a result, if present.
    fn next_cursor(result: &LogSearchResult) -> Option<&str> {
        result
            .meta
            .as_ref()
            .and_then(|m| m.page.as_ref())
            .and_then(|p| p.after.as_deref())
    }

    /// One-event response with a next-page cursor.
    fn sample_search_body() -> Value {
        json!({
            "data": [
                {
                    "id": "AAAA",
                    "type": "log",
                    "attributes": {
                        "timestamp": "2026-04-22T10:00:00.000Z",
                        "service": "api",
                        "status": "info",
                        "message": "hello",
                        "tags": ["env:prod"]
                    }
                }
            ],
            "meta": {
                "page": { "after": "next-cursor" },
                "status": "done",
                "elapsed": 12
            }
        })
    }

    /// Minimal log event whose `id` and `message` are both `id`.
    fn log_event_json(id: &str) -> Value {
        json!({
            "id": id,
            "type": "log",
            "attributes": {
                "timestamp": "2026-04-22T10:00:00.000Z",
                "service": "api",
                "status": "info",
                "message": id,
                "tags": []
            }
        })
    }

    /// Response page holding one event per id; `meta.page.after` is set
    /// only when `next_cursor` is `Some`.
    fn page_body(ids: &[&str], next_cursor: Option<&str>) -> Value {
        let events: Vec<Value> = ids.iter().copied().map(log_event_json).collect();
        let page = next_cursor.map_or_else(|| json!({}), |token| json!({ "after": token }));
        json!({
            "data": events,
            "meta": { "page": page, "status": "done" }
        })
    }

    /// Mounts a mock that answers every search with a full
    /// `MAX_PAGE_LIMIT` page plus a cursor, so only the cap can end
    /// pagination.
    async fn mount_endless_full_pages(server: &MockServer) {
        let events: Vec<Value> = (0..MAX_PAGE_LIMIT)
            .map(|i| log_event_json(&format!("e{i}")))
            .collect();
        let body = json!({
            "data": events,
            "meta": { "page": { "after": "always-more" } }
        });
        search_mock()
            .respond_with(ok_json(body))
            .mount(server)
            .await;
    }

    // ── effective_cap ──────────────────────────────────────────────

    #[test]
    fn effective_cap_zero_means_hard_cap() {
        assert_eq!(effective_cap(0), HARD_CAP);
    }

    #[test]
    fn effective_cap_clamps_to_hard_cap() {
        assert_eq!(effective_cap(HARD_CAP + 5), HARD_CAP);
    }

    #[test]
    fn effective_cap_passes_through_small_limits() {
        assert_eq!(effective_cap(42), 42);
    }

    // ── search ─────────────────────────────────────────────────────

    #[tokio::test]
    async fn search_posts_exact_body_shape_and_parses_response() {
        let server = MockServer::start().await;
        let expected_request = json!({
            "filter": {
                "query": "service:api status:error",
                "from": "now-15m",
                "to": "now"
            },
            "page": { "limit": 100 },
            "sort": "-timestamp"
        });
        search_mock()
            .and(header("DD-API-KEY", "api"))
            .and(header("DD-APPLICATION-KEY", "app"))
            .and(header("Content-Type", "application/json"))
            .and(body_json(expected_request))
            .respond_with(ok_json(sample_search_body()))
            .expect(1)
            .mount(&server)
            .await;

        let client = client_for(&server);
        let result = LogsApi::new(&client)
            .search(
                "service:api status:error",
                "now-15m",
                "now",
                100,
                SortOrder::TimestampDesc,
                None,
            )
            .await
            .unwrap();
        assert_eq!(result.data.len(), 1);
        assert_eq!(result.data[0].id, "AAAA");
        assert_eq!(next_cursor(&result), Some("next-cursor"));
    }

    #[tokio::test]
    async fn search_includes_cursor_in_body_when_after_is_some() {
        let server = MockServer::start().await;
        search_mock()
            .and(body_json(wildcard_request(50, Some("tok-2"), "-timestamp")))
            .respond_with(ok_json(sample_search_body()))
            .expect(1)
            .mount(&server)
            .await;

        let client = client_for(&server);
        LogsApi::new(&client)
            .search(
                "*",
                "now-1h",
                "now",
                50,
                SortOrder::TimestampDesc,
                Some("tok-2"),
            )
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn search_serializes_ascending_sort_without_minus_prefix() {
        let server = MockServer::start().await;
        search_mock()
            .and(body_json(wildcard_request(50, None, "timestamp")))
            .respond_with(ok_json(sample_search_body()))
            .expect(1)
            .mount(&server)
            .await;

        let client = client_for(&server);
        LogsApi::new(&client)
            .search("*", "now-1h", "now", 50, SortOrder::TimestampAsc, None)
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn search_rejects_limit_above_max_page_limit_client_side() {
        let client = unreachable_client();
        let err = LogsApi::new(&client)
            .search(
                "*",
                "now-1h",
                "now",
                MAX_PAGE_LIMIT + 1,
                SortOrder::TimestampDesc,
                None,
            )
            .await
            .unwrap_err();
        let msg = err.to_string();
        assert!(msg.contains("limit"));
        assert!(msg.contains("1000"));
        assert!(msg.contains("search_all"));
    }

    #[tokio::test]
    async fn search_accepts_limit_at_max_page_limit_boundary() {
        let server = MockServer::start().await;
        search_mock()
            .and(body_json(wildcard_request(
                MAX_PAGE_LIMIT,
                None,
                "-timestamp",
            )))
            .respond_with(ok_json(sample_search_body()))
            .expect(1)
            .mount(&server)
            .await;

        let client = client_for(&server);
        LogsApi::new(&client)
            .search(
                "*",
                "now-1h",
                "now",
                MAX_PAGE_LIMIT,
                SortOrder::TimestampDesc,
                None,
            )
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn search_propagates_api_errors() {
        let server = MockServer::start().await;
        search_mock()
            .respond_with(
                ResponseTemplate::new(400).set_body_string(r#"{"errors":["bad query"]}"#),
            )
            .mount(&server)
            .await;

        let client = client_for(&server);
        let err = LogsApi::new(&client)
            .search("???", "now-1h", "now", 10, SortOrder::TimestampDesc, None)
            .await
            .unwrap_err();
        let msg = err.to_string();
        assert!(msg.contains("400"));
        assert!(msg.contains("bad query"));
    }

    #[tokio::test]
    async fn search_propagates_network_errors() {
        let client = unreachable_client();
        let err = LogsApi::new(&client)
            .search("*", "now-1h", "now", 10, SortOrder::TimestampDesc, None)
            .await
            .unwrap_err();
        assert!(err.to_string().contains("Failed to send"));
    }

    #[tokio::test]
    async fn search_errors_on_malformed_response() {
        let server = MockServer::start().await;
        search_mock()
            .respond_with(ResponseTemplate::new(200).set_body_string("not json"))
            .mount(&server)
            .await;

        let client = client_for(&server);
        let err = LogsApi::new(&client)
            .search("*", "now-1h", "now", 10, SortOrder::TimestampDesc, None)
            .await
            .unwrap_err();
        assert!(err.to_string().contains("Failed to parse"));
    }

    // ── search_all ─────────────────────────────────────────────────

    #[tokio::test]
    async fn search_all_single_page_when_response_has_no_cursor() {
        let server = MockServer::start().await;
        search_mock()
            .and(body_json(wildcard_request(100, None, "-timestamp")))
            .respond_with(ok_json(page_body(&["a", "b"], None)))
            .expect(1)
            .mount(&server)
            .await;

        let client = client_for(&server);
        let result = LogsApi::new(&client)
            .search_all("*", "now-1h", "now", 100, SortOrder::TimestampDesc)
            .await
            .unwrap();
        assert_eq!(result.data.len(), 2);
    }

    #[tokio::test]
    async fn search_all_follows_cursor_until_no_more_pages() {
        // First page: two events and cursor "c1". Second page: one event
        // and no cursor. With `limit == 0` both requests must be issued
        // and their data concatenated in order.
        let server = MockServer::start().await;
        search_mock()
            .and(body_json(wildcard_request(
                MAX_PAGE_LIMIT,
                None,
                "-timestamp",
            )))
            .respond_with(ok_json(page_body(&["a", "b"], Some("c1"))))
            .expect(1)
            .mount(&server)
            .await;
        search_mock()
            .and(body_json(wildcard_request(
                MAX_PAGE_LIMIT,
                Some("c1"),
                "-timestamp",
            )))
            .respond_with(ok_json(page_body(&["c"], None)))
            .expect(1)
            .mount(&server)
            .await;

        let client = client_for(&server);
        let result = LogsApi::new(&client)
            .search_all("*", "now-1h", "now", 0, SortOrder::TimestampDesc)
            .await
            .unwrap();
        let ids: Vec<&str> = result.data.iter().map(|e| e.id.as_str()).collect();
        assert_eq!(ids, ["a", "b", "c"]);
        // The last response carried no cursor, so none is reported.
        assert!(next_cursor(&result).is_none());
    }

    #[tokio::test]
    async fn search_all_stops_at_explicit_limit_within_first_page() {
        // limit=5 gives page_size=5; the API returns exactly five events
        // (plus a cursor), so the cap is met and no second call is made.
        let server = MockServer::start().await;
        let ids = ["a", "b", "c", "d", "e"];
        search_mock()
            .and(body_json(wildcard_request(5, None, "-timestamp")))
            .respond_with(ok_json(page_body(&ids, Some("c1"))))
            .expect(1)
            .mount(&server)
            .await;

        let client = client_for(&server);
        let result = LogsApi::new(&client)
            .search_all("*", "now-1h", "now", 5, SortOrder::TimestampDesc)
            .await
            .unwrap();
        assert_eq!(result.data.len(), 5);
    }

    #[tokio::test]
    async fn search_all_truncates_to_hard_cap_when_unbounded() {
        // Every response is a full page with a cursor, so HARD_CAP is the
        // only thing that can stop the loop.
        let server = MockServer::start().await;
        mount_endless_full_pages(&server).await;

        let client = client_for(&server);
        let result = LogsApi::new(&client)
            .search_all("*", "now-1h", "now", 0, SortOrder::TimestampDesc)
            .await
            .unwrap();
        assert_eq!(result.data.len(), HARD_CAP);
    }

    #[tokio::test]
    async fn search_all_propagates_api_errors_on_first_page() {
        let server = MockServer::start().await;
        search_mock()
            .respond_with(ResponseTemplate::new(403).set_body_string(r#"{"errors":["nope"]}"#))
            .mount(&server)
            .await;

        let client = client_for(&server);
        let err = LogsApi::new(&client)
            .search_all("*", "now-1h", "now", 0, SortOrder::TimestampDesc)
            .await
            .unwrap_err();
        let msg = err.to_string();
        assert!(msg.contains("403"));
        assert!(msg.contains("nope"));
    }

    #[tokio::test]
    async fn search_all_caps_explicit_limit_at_hard_cap() {
        // The caller asks for HARD_CAP + 50 while the server keeps
        // returning full pages; the surplus must not be honoured.
        let server = MockServer::start().await;
        mount_endless_full_pages(&server).await;

        let client = client_for(&server);
        let result = LogsApi::new(&client)
            .search_all(
                "*",
                "now-1h",
                "now",
                HARD_CAP + 50,
                SortOrder::TimestampDesc,
            )
            .await
            .unwrap();
        assert_eq!(result.data.len(), HARD_CAP);
    }
}