1use anyhow::{Context, Result};
13use serde::Serialize;
14
15use crate::datadog::client::DatadogClient;
16use crate::datadog::types::{LogSearchResult, SortOrder};
17
18pub const MAX_PAGE_LIMIT: usize = 1000;
24
25pub const HARD_CAP: usize = 10_000;
28
29#[derive(Debug)]
31pub struct LogsApi<'a> {
32 client: &'a DatadogClient,
33}
34
35impl<'a> LogsApi<'a> {
36 #[must_use]
38 pub fn new(client: &'a DatadogClient) -> Self {
39 Self { client }
40 }
41
42 pub async fn search(
57 &self,
58 query: &str,
59 from: &str,
60 to: &str,
61 limit: usize,
62 sort: SortOrder,
63 after: Option<&str>,
64 ) -> Result<LogSearchResult> {
65 if limit > MAX_PAGE_LIMIT {
66 return Err(anyhow::anyhow!(
67 "`limit` must be <= {MAX_PAGE_LIMIT} (Datadog v2 logs search per-page cap; use `LogsApi::search_all` to auto-paginate across pages)"
68 ));
69 }
70 let body = SearchRequest {
71 filter: Filter { query, from, to },
72 page: Page {
73 limit,
74 cursor: after,
75 },
76 sort,
77 };
78 let url = format!("{}/api/v2/logs/events/search", self.client.base_url());
79 let response = self.client.post_json(&url, &body).await?;
80 if !response.status().is_success() {
81 return Err(DatadogClient::response_to_error(response).await.into());
82 }
83 response
84 .json::<LogSearchResult>()
85 .await
86 .context("Failed to parse /api/v2/logs/events/search response")
87 }
88
89 pub async fn search_all(
105 &self,
106 query: &str,
107 from: &str,
108 to: &str,
109 limit: usize,
110 sort: SortOrder,
111 ) -> Result<LogSearchResult> {
112 let cap = effective_cap(limit);
113 let mut acc: Option<LogSearchResult> = None;
114 let mut cursor: Option<String> = None;
115 loop {
116 let collected = acc.as_ref().map_or(0, |r| r.data.len());
117 let remaining = cap - collected;
118 let page_size = remaining.min(MAX_PAGE_LIMIT);
119 let page = self
120 .search(query, from, to, page_size, sort, cursor.as_deref())
121 .await?;
122 let next_cursor = page
123 .meta
124 .as_ref()
125 .and_then(|m| m.page.as_ref())
126 .and_then(|p| p.after.clone());
127 match acc.as_mut() {
128 Some(existing) => {
129 existing.data.extend(page.data);
130 existing.meta = page.meta;
131 }
132 None => acc = Some(page),
133 }
134 let collected = acc.as_ref().map_or(0, |r| r.data.len());
135 if collected >= cap || next_cursor.is_none() {
136 break;
137 }
138 cursor = next_cursor;
139 }
140 let mut result = acc.unwrap_or_default();
141 result.data.truncate(cap);
142 Ok(result)
143 }
144}
145
146fn effective_cap(limit: usize) -> usize {
149 if limit == 0 {
150 HARD_CAP
151 } else {
152 limit.min(HARD_CAP)
153 }
154}
155
156#[derive(Debug, Serialize)]
157struct SearchRequest<'a> {
158 filter: Filter<'a>,
159 page: Page<'a>,
160 sort: SortOrder,
161}
162
163#[derive(Debug, Serialize)]
164struct Filter<'a> {
165 query: &'a str,
166 from: &'a str,
167 to: &'a str,
168}
169
170#[derive(Debug, Serialize)]
171struct Page<'a> {
172 limit: usize,
173 #[serde(skip_serializing_if = "Option::is_none")]
174 cursor: Option<&'a str>,
175}
176
177#[cfg(test)]
178#[allow(clippy::unwrap_used, clippy::expect_used)]
179mod tests {
180 use super::*;
181
182 fn sample_search_body() -> serde_json::Value {
183 serde_json::json!({
184 "data": [
185 {
186 "id": "AAAA",
187 "type": "log",
188 "attributes": {
189 "timestamp": "2026-04-22T10:00:00.000Z",
190 "service": "api",
191 "status": "info",
192 "message": "hello",
193 "tags": ["env:prod"]
194 }
195 }
196 ],
197 "meta": {
198 "page": { "after": "next-cursor" },
199 "status": "done",
200 "elapsed": 12
201 }
202 })
203 }
204
205 fn log_event_json(id: &str) -> serde_json::Value {
206 serde_json::json!({
207 "id": id,
208 "type": "log",
209 "attributes": {
210 "timestamp": "2026-04-22T10:00:00.000Z",
211 "service": "api",
212 "status": "info",
213 "message": id,
214 "tags": []
215 }
216 })
217 }
218
219 fn page_body(ids: &[&str], next_cursor: Option<&str>) -> serde_json::Value {
220 let data: Vec<serde_json::Value> = ids.iter().map(|id| log_event_json(id)).collect();
221 let meta = match next_cursor {
222 Some(c) => serde_json::json!({ "page": { "after": c }, "status": "done" }),
223 None => serde_json::json!({ "page": {}, "status": "done" }),
224 };
225 serde_json::json!({ "data": data, "meta": meta })
226 }
227
228 #[test]
231 fn effective_cap_zero_means_hard_cap() {
232 assert_eq!(effective_cap(0), HARD_CAP);
233 }
234
235 #[test]
236 fn effective_cap_clamps_to_hard_cap() {
237 assert_eq!(effective_cap(HARD_CAP + 5), HARD_CAP);
238 }
239
240 #[test]
241 fn effective_cap_passes_through_small_limits() {
242 assert_eq!(effective_cap(42), 42);
243 }
244
245 #[tokio::test]
248 async fn search_posts_exact_body_shape_and_parses_response() {
249 let server = wiremock::MockServer::start().await;
250 wiremock::Mock::given(wiremock::matchers::method("POST"))
251 .and(wiremock::matchers::path("/api/v2/logs/events/search"))
252 .and(wiremock::matchers::header("DD-API-KEY", "api"))
253 .and(wiremock::matchers::header("DD-APPLICATION-KEY", "app"))
254 .and(wiremock::matchers::header(
255 "Content-Type",
256 "application/json",
257 ))
258 .and(wiremock::matchers::body_json(serde_json::json!({
259 "filter": {
260 "query": "service:api status:error",
261 "from": "now-15m",
262 "to": "now"
263 },
264 "page": { "limit": 100 },
265 "sort": "-timestamp"
266 })))
267 .respond_with(wiremock::ResponseTemplate::new(200).set_body_json(sample_search_body()))
268 .expect(1)
269 .mount(&server)
270 .await;
271
272 let client = DatadogClient::new(&server.uri(), "api", "app").unwrap();
273 let result = LogsApi::new(&client)
274 .search(
275 "service:api status:error",
276 "now-15m",
277 "now",
278 100,
279 SortOrder::TimestampDesc,
280 None,
281 )
282 .await
283 .unwrap();
284 assert_eq!(result.data.len(), 1);
285 assert_eq!(result.data[0].id, "AAAA");
286 assert_eq!(
287 result
288 .meta
289 .as_ref()
290 .and_then(|m| m.page.as_ref())
291 .and_then(|p| p.after.as_deref()),
292 Some("next-cursor")
293 );
294 }
295
296 #[tokio::test]
297 async fn search_includes_cursor_in_body_when_after_is_some() {
298 let server = wiremock::MockServer::start().await;
299 wiremock::Mock::given(wiremock::matchers::method("POST"))
300 .and(wiremock::matchers::path("/api/v2/logs/events/search"))
301 .and(wiremock::matchers::body_json(serde_json::json!({
302 "filter": { "query": "*", "from": "now-1h", "to": "now" },
303 "page": { "limit": 50, "cursor": "tok-2" },
304 "sort": "-timestamp"
305 })))
306 .respond_with(wiremock::ResponseTemplate::new(200).set_body_json(sample_search_body()))
307 .expect(1)
308 .mount(&server)
309 .await;
310
311 let client = DatadogClient::new(&server.uri(), "api", "app").unwrap();
312 LogsApi::new(&client)
313 .search(
314 "*",
315 "now-1h",
316 "now",
317 50,
318 SortOrder::TimestampDesc,
319 Some("tok-2"),
320 )
321 .await
322 .unwrap();
323 }
324
325 #[tokio::test]
326 async fn search_serializes_ascending_sort_without_minus_prefix() {
327 let server = wiremock::MockServer::start().await;
328 wiremock::Mock::given(wiremock::matchers::method("POST"))
329 .and(wiremock::matchers::path("/api/v2/logs/events/search"))
330 .and(wiremock::matchers::body_json(serde_json::json!({
331 "filter": { "query": "*", "from": "now-1h", "to": "now" },
332 "page": { "limit": 50 },
333 "sort": "timestamp"
334 })))
335 .respond_with(wiremock::ResponseTemplate::new(200).set_body_json(sample_search_body()))
336 .expect(1)
337 .mount(&server)
338 .await;
339
340 let client = DatadogClient::new(&server.uri(), "api", "app").unwrap();
341 LogsApi::new(&client)
342 .search("*", "now-1h", "now", 50, SortOrder::TimestampAsc, None)
343 .await
344 .unwrap();
345 }
346
347 #[tokio::test]
348 async fn search_rejects_limit_above_max_page_limit_client_side() {
349 let client = DatadogClient::new("http://127.0.0.1:1", "api", "app").unwrap();
350 let err = LogsApi::new(&client)
351 .search(
352 "*",
353 "now-1h",
354 "now",
355 MAX_PAGE_LIMIT + 1,
356 SortOrder::TimestampDesc,
357 None,
358 )
359 .await
360 .unwrap_err();
361 let msg = err.to_string();
362 assert!(msg.contains("limit"));
363 assert!(msg.contains("1000"));
364 assert!(msg.contains("search_all"));
365 }
366
367 #[tokio::test]
368 async fn search_accepts_limit_at_max_page_limit_boundary() {
369 let server = wiremock::MockServer::start().await;
370 wiremock::Mock::given(wiremock::matchers::method("POST"))
371 .and(wiremock::matchers::path("/api/v2/logs/events/search"))
372 .and(wiremock::matchers::body_json(serde_json::json!({
373 "filter": { "query": "*", "from": "now-1h", "to": "now" },
374 "page": { "limit": MAX_PAGE_LIMIT },
375 "sort": "-timestamp"
376 })))
377 .respond_with(wiremock::ResponseTemplate::new(200).set_body_json(sample_search_body()))
378 .expect(1)
379 .mount(&server)
380 .await;
381
382 let client = DatadogClient::new(&server.uri(), "api", "app").unwrap();
383 LogsApi::new(&client)
384 .search(
385 "*",
386 "now-1h",
387 "now",
388 MAX_PAGE_LIMIT,
389 SortOrder::TimestampDesc,
390 None,
391 )
392 .await
393 .unwrap();
394 }
395
396 #[tokio::test]
397 async fn search_propagates_api_errors() {
398 let server = wiremock::MockServer::start().await;
399 wiremock::Mock::given(wiremock::matchers::method("POST"))
400 .and(wiremock::matchers::path("/api/v2/logs/events/search"))
401 .respond_with(
402 wiremock::ResponseTemplate::new(400).set_body_string(r#"{"errors":["bad query"]}"#),
403 )
404 .mount(&server)
405 .await;
406
407 let client = DatadogClient::new(&server.uri(), "api", "app").unwrap();
408 let err = LogsApi::new(&client)
409 .search("???", "now-1h", "now", 10, SortOrder::TimestampDesc, None)
410 .await
411 .unwrap_err();
412 let msg = err.to_string();
413 assert!(msg.contains("400"));
414 assert!(msg.contains("bad query"));
415 }
416
417 #[tokio::test]
418 async fn search_propagates_network_errors() {
419 let client = DatadogClient::new("http://127.0.0.1:1", "api", "app").unwrap();
420 let err = LogsApi::new(&client)
421 .search("*", "now-1h", "now", 10, SortOrder::TimestampDesc, None)
422 .await
423 .unwrap_err();
424 assert!(err.to_string().contains("Failed to send"));
425 }
426
427 #[tokio::test]
428 async fn search_errors_on_malformed_response() {
429 let server = wiremock::MockServer::start().await;
430 wiremock::Mock::given(wiremock::matchers::method("POST"))
431 .and(wiremock::matchers::path("/api/v2/logs/events/search"))
432 .respond_with(wiremock::ResponseTemplate::new(200).set_body_string("not json"))
433 .mount(&server)
434 .await;
435
436 let client = DatadogClient::new(&server.uri(), "api", "app").unwrap();
437 let err = LogsApi::new(&client)
438 .search("*", "now-1h", "now", 10, SortOrder::TimestampDesc, None)
439 .await
440 .unwrap_err();
441 assert!(err.to_string().contains("Failed to parse"));
442 }
443
444 #[tokio::test]
447 async fn search_all_single_page_when_response_has_no_cursor() {
448 let server = wiremock::MockServer::start().await;
449 wiremock::Mock::given(wiremock::matchers::method("POST"))
450 .and(wiremock::matchers::path("/api/v2/logs/events/search"))
451 .and(wiremock::matchers::body_json(serde_json::json!({
452 "filter": { "query": "*", "from": "now-1h", "to": "now" },
453 "page": { "limit": 100 },
454 "sort": "-timestamp"
455 })))
456 .respond_with(
457 wiremock::ResponseTemplate::new(200).set_body_json(page_body(&["a", "b"], None)),
458 )
459 .expect(1)
460 .mount(&server)
461 .await;
462
463 let client = DatadogClient::new(&server.uri(), "api", "app").unwrap();
464 let result = LogsApi::new(&client)
465 .search_all("*", "now-1h", "now", 100, SortOrder::TimestampDesc)
466 .await
467 .unwrap();
468 assert_eq!(result.data.len(), 2);
469 }
470
471 #[tokio::test]
472 async fn search_all_follows_cursor_until_no_more_pages() {
473 let server = wiremock::MockServer::start().await;
477 wiremock::Mock::given(wiremock::matchers::method("POST"))
478 .and(wiremock::matchers::path("/api/v2/logs/events/search"))
479 .and(wiremock::matchers::body_json(serde_json::json!({
480 "filter": { "query": "*", "from": "now-1h", "to": "now" },
481 "page": { "limit": MAX_PAGE_LIMIT },
482 "sort": "-timestamp"
483 })))
484 .respond_with(
485 wiremock::ResponseTemplate::new(200)
486 .set_body_json(page_body(&["a", "b"], Some("c1"))),
487 )
488 .expect(1)
489 .mount(&server)
490 .await;
491 wiremock::Mock::given(wiremock::matchers::method("POST"))
492 .and(wiremock::matchers::path("/api/v2/logs/events/search"))
493 .and(wiremock::matchers::body_json(serde_json::json!({
494 "filter": { "query": "*", "from": "now-1h", "to": "now" },
495 "page": { "limit": MAX_PAGE_LIMIT, "cursor": "c1" },
496 "sort": "-timestamp"
497 })))
498 .respond_with(
499 wiremock::ResponseTemplate::new(200).set_body_json(page_body(&["c"], None)),
500 )
501 .expect(1)
502 .mount(&server)
503 .await;
504
505 let client = DatadogClient::new(&server.uri(), "api", "app").unwrap();
506 let result = LogsApi::new(&client)
507 .search_all("*", "now-1h", "now", 0, SortOrder::TimestampDesc)
508 .await
509 .unwrap();
510 let ids: Vec<&str> = result.data.iter().map(|e| e.id.as_str()).collect();
511 assert_eq!(ids, ["a", "b", "c"]);
512 assert!(result
514 .meta
515 .as_ref()
516 .and_then(|m| m.page.as_ref())
517 .and_then(|p| p.after.as_deref())
518 .is_none());
519 }
520
521 #[tokio::test]
522 async fn search_all_stops_at_explicit_limit_within_first_page() {
523 let server = wiremock::MockServer::start().await;
526 let ids = ["a", "b", "c", "d", "e"];
527 wiremock::Mock::given(wiremock::matchers::method("POST"))
528 .and(wiremock::matchers::path("/api/v2/logs/events/search"))
529 .and(wiremock::matchers::body_json(serde_json::json!({
530 "filter": { "query": "*", "from": "now-1h", "to": "now" },
531 "page": { "limit": 5 },
532 "sort": "-timestamp"
533 })))
534 .respond_with(
535 wiremock::ResponseTemplate::new(200).set_body_json(page_body(&ids, Some("c1"))),
536 )
537 .expect(1)
538 .mount(&server)
539 .await;
540
541 let client = DatadogClient::new(&server.uri(), "api", "app").unwrap();
542 let result = LogsApi::new(&client)
543 .search_all("*", "now-1h", "now", 5, SortOrder::TimestampDesc)
544 .await
545 .unwrap();
546 assert_eq!(result.data.len(), 5);
547 }
548
549 #[tokio::test]
550 async fn search_all_truncates_to_hard_cap_when_unbounded() {
551 let server = wiremock::MockServer::start().await;
554 let full_page: Vec<serde_json::Value> = (0..MAX_PAGE_LIMIT)
555 .map(|i| log_event_json(&format!("e{i}")))
556 .collect();
557 let body = serde_json::json!({
558 "data": full_page,
559 "meta": { "page": { "after": "always-more" } }
560 });
561 wiremock::Mock::given(wiremock::matchers::method("POST"))
562 .and(wiremock::matchers::path("/api/v2/logs/events/search"))
563 .respond_with(wiremock::ResponseTemplate::new(200).set_body_json(body))
564 .mount(&server)
565 .await;
566
567 let client = DatadogClient::new(&server.uri(), "api", "app").unwrap();
568 let result = LogsApi::new(&client)
569 .search_all("*", "now-1h", "now", 0, SortOrder::TimestampDesc)
570 .await
571 .unwrap();
572 assert_eq!(result.data.len(), HARD_CAP);
573 }
574
575 #[tokio::test]
576 async fn search_all_propagates_api_errors_on_first_page() {
577 let server = wiremock::MockServer::start().await;
578 wiremock::Mock::given(wiremock::matchers::method("POST"))
579 .and(wiremock::matchers::path("/api/v2/logs/events/search"))
580 .respond_with(
581 wiremock::ResponseTemplate::new(403).set_body_string(r#"{"errors":["nope"]}"#),
582 )
583 .mount(&server)
584 .await;
585
586 let client = DatadogClient::new(&server.uri(), "api", "app").unwrap();
587 let err = LogsApi::new(&client)
588 .search_all("*", "now-1h", "now", 0, SortOrder::TimestampDesc)
589 .await
590 .unwrap_err();
591 let msg = err.to_string();
592 assert!(msg.contains("403"));
593 assert!(msg.contains("nope"));
594 }
595
596 #[tokio::test]
597 async fn search_all_caps_explicit_limit_at_hard_cap() {
598 let server = wiremock::MockServer::start().await;
601 let full_page: Vec<serde_json::Value> = (0..MAX_PAGE_LIMIT)
602 .map(|i| log_event_json(&format!("e{i}")))
603 .collect();
604 let body = serde_json::json!({
605 "data": full_page,
606 "meta": { "page": { "after": "always-more" } }
607 });
608 wiremock::Mock::given(wiremock::matchers::method("POST"))
609 .and(wiremock::matchers::path("/api/v2/logs/events/search"))
610 .respond_with(wiremock::ResponseTemplate::new(200).set_body_json(body))
611 .mount(&server)
612 .await;
613
614 let client = DatadogClient::new(&server.uri(), "api", "app").unwrap();
615 let result = LogsApi::new(&client)
616 .search_all(
617 "*",
618 "now-1h",
619 "now",
620 HARD_CAP + 50,
621 SortOrder::TimestampDesc,
622 )
623 .await
624 .unwrap();
625 assert_eq!(result.data.len(), HARD_CAP);
626 }
627}