kaizen/telemetry/datadog/mod.rs

// SPDX-License-Identifier: AGPL-3.0-or-later
//! Datadog Logs API exporter ([docs](https://docs.datadoghq.com/api/latest/logs/)).
//!
//! Architecture: pure JSON build + chunking in `build`, HTTP POST in `transport`. The
//! exporter is the boundary: it expands a redacted batch, builds DD-shaped log objects with
//! `timestamp` and `hostname`, chunks under DD's 1000-entry / 5 MB request caps, then fans
//! the chunks out to the intake.
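//!
//! A sketch of the greedy chunking invariant this module leans on (illustrative only;
//! the constant names here are assumptions, and the real logic and exact byte
//! accounting live in `build::chunk_for_dd`):
//!
//! ```ignore
//! // Start a new chunk whenever adding the next log would cross either cap.
//! fn chunk(logs: Vec<serde_json::Value>) -> Vec<Vec<serde_json::Value>> {
//!     const MAX_ITEMS: usize = 1_000;
//!     const MAX_BYTES: usize = 5 * 1024 * 1024;
//!     let (mut chunks, mut bytes) = (vec![Vec::new()], 0usize);
//!     for log in logs {
//!         let sz = serde_json::to_vec(&log).map_or(0, |v| v.len());
//!         let last = chunks.last().unwrap();
//!         if !last.is_empty() && (last.len() >= MAX_ITEMS || bytes + sz > MAX_BYTES) {
//!             chunks.push(Vec::new());
//!             bytes = 0;
//!         }
//!         chunks.last_mut().unwrap().push(log);
//!         bytes += sz;
//!     }
//!     chunks
//! }
//! ```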

mod build;
mod transport;

use crate::sync::IngestExportBatch;
use crate::telemetry::TelemetryExporter;
use anyhow::Result;
use reqwest::blocking::Client;
use std::time::Duration;
/// Re-export of the pure DD log builder for the `tests/spec/telemetry_exporters.rs` connect
/// driver, so the spec invariant `dd_records_well_formed` checks the *real* JSON shape.
pub fn dd_log_object_for_test(
    item: &crate::sync::canonical::CanonicalItem,
    hostname: &str,
) -> serde_json::Value {
    build::dd_log_object(item, hostname)
}

const TIMEOUT: Duration = Duration::from_secs(30);

pub struct DatadogExporter {
    client: Client,
    /// `https://http-intake.logs.<site>/api/v2/logs`
    logs_url: String,
    api_key: String,
    hostname: String,
}

impl DatadogExporter {
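    /// Builds a blocking exporter for a DD site (e.g. `datadoghq.com`) and API key.
    ///
    /// A hypothetical wiring sketch; the literal values are placeholders, not config
    /// this module reads:
    ///
    /// ```ignore
    /// let exporter = DatadogExporter::new("datadoghq.com", "dd-api-key");
    /// assert_eq!(exporter.name(), "datadog"); // `name` comes from `TelemetryExporter`
    /// ```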
    pub fn new(site: &str, api_key: &str) -> Self {
        let s = site.trim();
        let logs_url = format!("https://http-intake.logs.{s}/api/v2/logs");
        let client = Client::builder()
            .timeout(TIMEOUT)
            .build()
            .expect("reqwest for Datadog");
        Self {
            client,
            logs_url,
            api_key: api_key.to_string(),
            hostname: build::current_hostname(),
        }
    }
}

impl TelemetryExporter for DatadogExporter {
    fn name(&self) -> &str {
        "datadog"
    }

    fn export(&self, batch: &IngestExportBatch) -> Result<()> {
        let logs = build::build_log_objects(batch, &self.hostname);
        if logs.is_empty() {
            return Ok(());
        }
        let chunks = build::chunk_for_dd(logs);
        transport::post_chunks(&self.client, &self.logs_url, &self.api_key, chunks)
    }
}
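
// A minimal sketch of what `transport::post_chunks` is expected to do (illustrative;
// the real module may add retries or richer error mapping). `DD-API-KEY` is the
// standard auth header for the v2 Logs intake:
//
//     for chunk in chunks {
//         client
//             .post(url)
//             .header("DD-API-KEY", api_key)
//             .json(&chunk)
//             .send()?
//             .error_for_status()?;
//     }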

#[cfg(test)]
mod tests {
    use super::build::{chunk_for_dd, dd_log_object};
    use crate::sync::canonical::expand_ingest_batch;
    use crate::sync::outbound::{EventsBatchBody, OutboundEvent};
    use crate::sync::{IngestExportBatch, smart::OutboundToolSpan, smart::ToolSpansBatchBody};

    fn one_event_batch() -> IngestExportBatch {
        IngestExportBatch::Events(EventsBatchBody {
            team_id: "t".into(),
            workspace_hash: "wh".into(),
            project_name: Some("kaizen".into()),
            events: vec![OutboundEvent {
                session_id_hash: "sid".into(),
                event_seq: 0,
                ts_ms: 1_700_000_000_000,
                agent: "cursor".into(),
                model: "gpt-5".into(),
                kind: "message".into(),
                source: "hook".into(),
                tool: Some("Edit".into()),
                tool_call_id: None,
                tokens_in: Some(120),
                tokens_out: Some(30),
                reasoning_tokens: None,
                cost_usd_e6: Some(900),
                payload: serde_json::json!({}),
            }],
        })
    }

    #[test]
    fn dd_log_object_has_timestamp_and_hostname_top_level() {
        let b = one_event_batch();
        let items = expand_ingest_batch(&b);
        let v = dd_log_object(&items[0], "host-1");
        assert_eq!(v["timestamp"], serde_json::json!(1_700_000_000_000_i64));
        assert_eq!(v["hostname"], serde_json::json!("host-1"));
        assert_eq!(v["agent"], serde_json::json!("cursor"));
        assert_eq!(v["model"], serde_json::json!("gpt-5"));
        assert_eq!(v["project_name"], serde_json::json!("kaizen"));
        assert_eq!(v["tokens_in"], serde_json::json!(120));
        let tags = v["ddtags"].as_str().unwrap();
        assert!(tags.contains("agent:cursor"));
        assert!(tags.contains("model:gpt-5"));
        assert!(tags.contains("project_name:kaizen"));
        assert!(tags.contains("kaizen.type:kaizen.event"));
        // Full canonical item nested under `kaizen` (not double-encoded as a string).
        assert!(v["kaizen"].is_object());
        assert!(v["message"].is_string());
    }

    #[test]
    fn dd_log_object_handles_tool_span_timestamp_fallback() {
        let b = IngestExportBatch::ToolSpans(ToolSpansBatchBody {
            team_id: "t".into(),
            workspace_hash: "wh".into(),
            project_name: Some("kaizen".into()),
            spans: vec![OutboundToolSpan {
                session_id_hash: "sid".into(),
                span_id_hash: "ph".into(),
                tool: Some("Read".into()),
                status: "ok".into(),
                started_at_ms: None,
                ended_at_ms: Some(42),
                lead_time_ms: Some(40),
                tokens_in: Some(10),
                tokens_out: Some(4),
                reasoning_tokens: Some(2),
                cost_usd_e6: Some(25),
                path_hashes: vec![],
            }],
        });
        let items = expand_ingest_batch(&b);
        let v = dd_log_object(&items[0], "h");
        assert_eq!(v["timestamp"], serde_json::json!(42_i64));
        assert_eq!(v["status"], serde_json::json!("ok"));
        assert_eq!(v["lead_time_ms"], serde_json::json!(40));
        assert_eq!(v["tokens_in"], serde_json::json!(10));
        assert_eq!(v["tokens_out"], serde_json::json!(4));
        assert_eq!(v["reasoning_tokens"], serde_json::json!(2));
        assert_eq!(v["cost_usd_e6"], serde_json::json!(25));
        assert_eq!(v["project_name"], serde_json::json!("kaizen"));
    }

    #[test]
    fn chunk_for_dd_respects_item_cap() {
        let logs: Vec<_> = (0..2_500).map(|i| serde_json::json!({"i": i})).collect();
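        // Greedy split under the 1000-item cap: ceil(2_500 / 1_000) = 3 chunks.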
        let chunks = chunk_for_dd(logs);
        assert_eq!(chunks.len(), 3);
        assert!(chunks.iter().all(|c| c.len() <= 1000));
        assert_eq!(chunks.iter().map(|c| c.len()).sum::<usize>(), 2_500);
    }

    #[test]
    fn chunk_for_dd_respects_byte_cap() {
        // ~64 KB string per item -> 100 items > 5 MB -> at least 2 chunks.
        let big = "x".repeat(64 * 1024);
        let logs: Vec<_> = (0..100).map(|_| serde_json::json!({"s": big})).collect();
        let chunks = chunk_for_dd(logs);
        assert!(chunks.len() >= 2);
        for c in &chunks {
            let bytes = serde_json::to_vec(c).unwrap().len();
            assert!(
                bytes <= super::build::MAX_BYTES_PER_CHUNK + 64 * 1024,
                "chunk too big: {bytes} bytes"
            );
        }
    }
}