Skip to main content

kaizen/telemetry/datadog/
mod.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Datadog Logs API exporter ([docs](https://docs.datadoghq.com/api/latest/logs/)).
3//!
4//! Architecture: pure JSON build + chunking in `build`, HTTP POST in `transport`. The
5//! exporter is the boundary: it expands a redacted batch, builds DD-shaped log objects with
6//! `timestamp` and `hostname`, chunks under DD's 1000-entry / 5 MB request caps, then fans
7//! the chunks to the intake.
8
9mod build;
10mod transport;
11
12use crate::sync::IngestExportBatch;
13use crate::telemetry::TelemetryExporter;
14use anyhow::Result;
15use reqwest::blocking::Client;
16use std::time::Duration;
17
/// Re-export of the pure DD log builder for the `tests/spec/telemetry_exporters.rs` connect
/// driver, so the spec invariant `dd_records_well_formed` checks the *real* JSON shape.
///
/// Thin pass-through to `build::dd_log_object`: both arguments are forwarded unchanged and
/// the builder's return value is handed back as-is.
///
/// * `item` - canonical item to render as a Datadog log object.
/// * `hostname` - hostname forwarded to the builder.
pub fn dd_log_object_for_test(
    item: &crate::sync::canonical::CanonicalItem,
    hostname: &str,
) -> serde_json::Value {
    build::dd_log_object(item, hostname)
}
26
/// Timeout (30 seconds) passed to the blocking `reqwest` client builder in
/// `DatadogExporter::new`.
const TIMEOUT: Duration = Duration::from_secs(30);
28
/// Exporter that turns ingest batches into Datadog log objects and posts them to the
/// Datadog logs intake.
///
/// Construct it with `DatadogExporter::new`; the exporting itself happens through the
/// `TelemetryExporter` implementation.
pub struct DatadogExporter {
    /// Blocking HTTP client built once in `new` and reused for every export.
    client: Client,
    /// `https://http-intake.logs.<site>/api/v2/logs`
    logs_url: String,
    /// Datadog API key, handed to `transport::post_chunks` on every export.
    api_key: String,
    /// Hostname captured once at construction via `build::current_hostname()` and passed to
    /// the log-object builder on every export.
    hostname: String,
}
36
37impl DatadogExporter {
38    pub fn new(site: &str, api_key: &str) -> Self {
39        let s = site.trim();
40        let logs_url = format!("https://http-intake.logs.{s}/api/v2/logs");
41        let client = Client::builder()
42            .timeout(TIMEOUT)
43            .build()
44            .expect("reqwest for Datadog");
45        Self {
46            client,
47            logs_url,
48            api_key: api_key.to_string(),
49            hostname: build::current_hostname(),
50        }
51    }
52}
53
54impl TelemetryExporter for DatadogExporter {
55    fn name(&self) -> &str {
56        "datadog"
57    }
58
59    fn export(&self, batch: &IngestExportBatch) -> Result<()> {
60        let logs = build::build_log_objects(batch, &self.hostname);
61        if logs.is_empty() {
62            return Ok(());
63        }
64        let chunks = build::chunk_for_dd(logs);
65        transport::post_chunks(&self.client, &self.logs_url, &self.api_key, chunks)
66    }
67}
68
#[cfg(test)]
mod tests {
    use super::build::{MAX_BYTES_PER_CHUNK, chunk_for_dd, dd_log_object};
    use crate::sync::canonical::expand_ingest_batch;
    use crate::sync::outbound::{EventsBatchBody, OutboundEvent};
    use crate::sync::{IngestExportBatch, smart::OutboundToolSpan, smart::ToolSpansBatchBody};
    use serde_json::json;

    /// Fixture: an `Events` batch carrying exactly one event.
    fn one_event_batch() -> IngestExportBatch {
        let event = OutboundEvent {
            session_id_hash: "sid".into(),
            event_seq: 0,
            ts_ms: 1_700_000_000_000,
            agent: "cursor".into(),
            model: "gpt-5".into(),
            kind: "message".into(),
            source: "hook".into(),
            tool: Some("Edit".into()),
            tool_call_id: None,
            tokens_in: Some(120),
            tokens_out: Some(30),
            reasoning_tokens: None,
            cost_usd_e6: Some(900),
            payload: json!({}),
        };
        let body = EventsBatchBody {
            team_id: "t".into(),
            workspace_hash: "wh".into(),
            events: vec![event],
        };
        IngestExportBatch::Events(body)
    }

    #[test]
    fn dd_log_object_has_timestamp_and_hostname_top_level() {
        let batch = one_event_batch();
        let items = expand_ingest_batch(&batch);
        let record = dd_log_object(&items[0], "host-1");

        // Fields that must be present at the top level of the log object.
        let expected_fields = [
            ("timestamp", json!(1_700_000_000_000_i64)),
            ("hostname", json!("host-1")),
            ("agent", json!("cursor")),
            ("model", json!("gpt-5")),
            ("tokens_in", json!(120)),
        ];
        for (key, expected) in expected_fields {
            assert_eq!(record[key], expected, "top-level field `{key}`");
        }

        let tags = record["ddtags"].as_str().unwrap();
        for tag in ["agent:cursor", "model:gpt-5", "kaizen.type:kaizen.event"] {
            assert!(tags.contains(tag), "missing tag `{tag}` in `{tags}`");
        }

        // Full canonical item nested under `kaizen` (not double-encoded as a string).
        assert!(record["kaizen"].is_object());
        assert!(record["message"].is_string());
    }

    #[test]
    fn dd_log_object_handles_tool_span_timestamp_fallback() {
        // No start time: only `ended_at_ms` is available to supply the timestamp.
        let span = OutboundToolSpan {
            session_id_hash: "sid".into(),
            span_id_hash: "ph".into(),
            tool: Some("Read".into()),
            status: "ok".into(),
            started_at_ms: None,
            ended_at_ms: Some(42),
            lead_time_ms: None,
            tokens_in: None,
            tokens_out: None,
            reasoning_tokens: None,
            cost_usd_e6: None,
            path_hashes: vec![],
        };
        let batch = IngestExportBatch::ToolSpans(ToolSpansBatchBody {
            team_id: "t".into(),
            workspace_hash: "wh".into(),
            spans: vec![span],
        });

        let items = expand_ingest_batch(&batch);
        let record = dd_log_object(&items[0], "h");

        assert_eq!(record["timestamp"], json!(42_i64));
        assert_eq!(record["status"], json!("ok"));
    }

    #[test]
    fn chunk_for_dd_respects_item_cap() {
        let logs: Vec<_> = (0..2_500).map(|i| json!({"i": i})).collect();
        let chunks = chunk_for_dd(logs);

        assert_eq!(chunks.len(), 3);

        let mut total = 0_usize;
        for chunk in &chunks {
            assert!(chunk.len() <= 1000);
            total += chunk.len();
        }
        assert_eq!(total, 2_500);
    }

    #[test]
    fn chunk_for_dd_respects_byte_cap() {
        // ~64 KB string per item -> 100 items > 5 MB -> at least 2 chunks.
        let big = "x".repeat(64 * 1024);
        let logs: Vec<_> = std::iter::repeat_with(|| json!({"s": big}))
            .take(100)
            .collect();

        let chunks = chunk_for_dd(logs);
        assert!(chunks.len() >= 2);

        let limit = MAX_BYTES_PER_CHUNK + 64 * 1024;
        for chunk in &chunks {
            let bytes = serde_json::to_vec(chunk).unwrap().len();
            assert!(bytes <= limit, "chunk too big: {bytes} bytes");
        }
    }
}