datadog/
logs.rs

1use chrono::{DateTime, Utc};
2use colored::Colorize;
3use serde::{Deserialize, Serialize};
4use std::env;
5
/// Parameters for a logs search query.
///
/// The value is plain data: it can be cloned, printed for diagnostics and
/// compared for equality.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LogsQuery {
    /// Search expression, sent as the request's `filter.query`.
    pub query: String,
    /// Start of the time range, sent unchanged as `filter.from`.
    pub from: String,
    /// End of the time range, sent unchanged as `filter.to`.
    pub to: String,
    /// Maximum number of logs to retrieve. `None` = fetch all.
    pub limit: Option<u32>,
}

impl LogsQuery {
    /// Creates a query from its four components.
    ///
    /// The arguments are stored exactly as given; no validation or
    /// normalization is performed here.
    pub fn new(query: String, from: String, to: String, limit: Option<u32>) -> Self {
        Self {
            query,
            from,
            to,
            limit,
        }
    }
}
26
27// Request structures (internal to API)
28#[derive(Serialize)]
29struct LogsSearchRequest {
30    filter: LogsFilter,
31    page: PageOptions,
32    sort: String,
33}
34
35#[derive(Serialize)]
36struct LogsFilter {
37    query: String,
38    from: String,
39    to: String,
40}
41
42#[derive(Serialize)]
43struct PageOptions {
44    limit: u32,
45    #[serde(skip_serializing_if = "Option::is_none")]
46    cursor: Option<String>,
47}
48
49// Internal response structure (includes pagination metadata)
50#[derive(Deserialize)]
51struct LogsSearchResponseInternal {
52    data: Option<Vec<LogEntry>>,
53    meta: Option<Meta>,
54}
55
56#[derive(Deserialize)]
57struct Meta {
58    page: Option<PageMeta>,
59}
60
61#[derive(Deserialize)]
62struct PageMeta {
63    after: Option<String>,
64}
65
66// Public response structure
67#[derive(Deserialize, Serialize)]
68pub struct LogsSearchResponse {
69    pub data: Option<Vec<LogEntry>>,
70}
71
72#[derive(Deserialize, Serialize)]
73pub struct LogEntry {
74    pub id: Option<String>,
75    #[serde(rename = "type")]
76    pub entry_type: Option<String>,
77    pub attributes: LogAttributes,
78}
79
80#[derive(Deserialize, Serialize)]
81pub struct LogAttributes {
82    pub timestamp: Option<String>,
83    pub status: Option<String>,
84    pub message: Option<String>,
85    pub host: Option<String>,
86    pub service: Option<String>,
87    pub tags: Option<Vec<String>>,
88    #[serde(flatten)]
89    pub attributes: Option<serde_json::Map<String, serde_json::Value>>,
90}
91
92pub struct DatadogClient {
93    pub(crate) api_key: String,
94    pub(crate) app_key: String,
95    pub(crate) client: reqwest::blocking::Client,
96}
97
98impl DatadogClient {
99    pub fn new() -> Result<Self, String> {
100        let api_key = env::var("DD_API_KEY")
101            .map_err(|_| "Missing environment variable: DD_API_KEY".to_string())?;
102        let app_key = env::var("DD_APP_KEY")
103            .map_err(|_| "Missing environment variable: DD_APP_KEY".to_string())?;
104
105        Ok(Self {
106            api_key,
107            app_key,
108            client: reqwest::blocking::Client::new(),
109        })
110    }
111
112    /// Search logs with streaming output. Calls `on_batch` with each page of results as they arrive.
113    /// Returns the total number of logs retrieved.
114    pub fn search_logs<F>(&self, query: &LogsQuery, mut on_batch: F) -> Result<usize, String>
115    where
116        F: FnMut(&[LogEntry]),
117    {
118        const MAX_PAGE_SIZE: u32 = 5000;
119
120        let mut total_count: usize = 0;
121        let mut cursor: Option<String> = None;
122
123        loop {
124            // Calculate page size: min(remaining, 5000)
125            let page_size = match query.limit {
126                Some(limit) => {
127                    let remaining = limit.saturating_sub(total_count as u32);
128                    remaining.min(MAX_PAGE_SIZE)
129                }
130                None => MAX_PAGE_SIZE,
131            };
132
133            // If we've already collected enough, stop
134            if page_size == 0 {
135                break;
136            }
137
138            let request_body = LogsSearchRequest {
139                filter: LogsFilter {
140                    query: query.query.clone(),
141                    from: query.from.clone(),
142                    to: query.to.clone(),
143                },
144                page: PageOptions {
145                    limit: page_size,
146                    cursor: cursor.clone(),
147                },
148                sort: "timestamp".to_string(),
149            };
150
151            let response = self
152                .client
153                .post("https://api.datadoghq.com/api/v2/logs/events/search")
154                .header("DD-API-KEY", &self.api_key)
155                .header("DD-APPLICATION-KEY", &self.app_key)
156                .header("Content-Type", "application/json")
157                .json(&request_body)
158                .send()
159                .map_err(|e| format!("Request failed: {}", e))?;
160
161            if !response.status().is_success() {
162                let status = response.status();
163                let body = response.text().unwrap_or_default();
164                return Err(format!("API error ({}): {}", status, body));
165            }
166
167            let internal_response: LogsSearchResponseInternal = response
168                .json()
169                .map_err(|e| format!("Failed to parse response: {}", e))?;
170
171            // Stream logs from this page immediately
172            if let Some(logs) = internal_response.data {
173                on_batch(&logs);
174                total_count += logs.len();
175            }
176
177            // Check for next page cursor
178            let next_cursor = internal_response
179                .meta
180                .and_then(|m| m.page)
181                .and_then(|p| p.after);
182
183            match next_cursor {
184                Some(c) => cursor = Some(c),
185                None => break, // No more pages
186            }
187
188            // Check if we've collected enough
189            if let Some(limit) = query.limit
190                && total_count >= limit as usize
191            {
192                break;
193            }
194        }
195
196        Ok(total_count)
197    }
198}
199
200pub fn format_log_entry(entry: &LogEntry) -> String {
201    let timestamp = entry
202        .attributes
203        .timestamp
204        .as_ref()
205        .and_then(|ts| DateTime::parse_from_rfc3339(ts).ok())
206        .map(|dt| {
207            dt.with_timezone(&Utc)
208                .format("%Y-%m-%d %H:%M:%S")
209                .to_string()
210        })
211        .unwrap_or_else(|| "--------------------".to_string());
212
213    let status_raw = entry
214        .attributes
215        .status
216        .as_ref()
217        .map(|s| s.to_uppercase())
218        .unwrap_or_else(|| "-----".to_string());
219
220    let status_colored = match status_raw.as_str() {
221        "ERROR" | "CRITICAL" | "EMERGENCY" | "ALERT" => format!("{:5}", status_raw).red().bold(),
222        "WARN" | "WARNING" => format!("{:5}", status_raw).yellow(),
223        "INFO" => format!("{:5}", status_raw).green(),
224        "DEBUG" => format!("{:5}", status_raw).blue(),
225        "TRACE" => format!("{:5}", status_raw).cyan(),
226        _ => format!("{:5}", status_raw).normal(),
227    };
228
229    let message = entry.attributes.message.as_deref().unwrap_or("");
230
231    format!(
232        "[{}] {} | {}",
233        timestamp.bright_black(),
234        status_colored,
235        message
236    )
237}