kaizen/telemetry/datadog/
mod.rs1mod build;
10mod transport;
11
use crate::sync::IngestExportBatch;
use crate::telemetry::TelemetryExporter;
use anyhow::{Context, Result};
use reqwest::blocking::Client;
use std::time::Duration;

18pub fn dd_log_object_for_test(
21 item: &crate::sync::canonical::CanonicalItem,
22 hostname: &str,
23) -> serde_json::Value {
24 build::dd_log_object(item, hostname)
25}
26
/// Request timeout handed to the reqwest blocking client builder (30 s), so a
/// stalled Datadog intake cannot hang an export forever.
const TIMEOUT: Duration = Duration::from_millis(30_000);

29pub struct DatadogExporter {
30 client: Client,
31 logs_url: String,
33 api_key: String,
34 hostname: String,
35}
36
37impl DatadogExporter {
38 pub fn new(site: &str, api_key: &str) -> Self {
39 let s = site.trim();
40 let logs_url = format!("https://http-intake.logs.{s}/api/v2/logs");
41 let client = Client::builder()
42 .timeout(TIMEOUT)
43 .build()
44 .expect("reqwest for Datadog");
45 Self {
46 client,
47 logs_url,
48 api_key: api_key.to_string(),
49 hostname: build::current_hostname(),
50 }
51 }
52}
53
54impl TelemetryExporter for DatadogExporter {
55 fn name(&self) -> &str {
56 "datadog"
57 }
58
59 fn export(&self, batch: &IngestExportBatch) -> Result<()> {
60 let logs = build::build_log_objects(batch, &self.hostname);
61 if logs.is_empty() {
62 return Ok(());
63 }
64 let chunks = build::chunk_for_dd(logs);
65 transport::post_chunks(&self.client, &self.logs_url, &self.api_key, chunks)
66 }
67}
68
#[cfg(test)]
mod tests {
    use super::build::{chunk_for_dd, dd_log_object};
    use crate::sync::canonical::expand_ingest_batch;
    use crate::sync::outbound::{EventsBatchBody, OutboundEvent};
    use crate::sync::smart::{OutboundToolSpan, ToolSpansBatchBody};
    use crate::sync::IngestExportBatch;
    use serde_json::json;

    /// Events batch with a single cursor/gpt-5 message event carrying token
    /// counts and a cost, shared by the log-object tests.
    fn sample_event_batch() -> IngestExportBatch {
        let event = OutboundEvent {
            session_id_hash: "sid".into(),
            event_seq: 0,
            ts_ms: 1_700_000_000_000,
            agent: "cursor".into(),
            model: "gpt-5".into(),
            kind: "message".into(),
            source: "hook".into(),
            tool: Some("Edit".into()),
            tool_call_id: None,
            tokens_in: Some(120),
            tokens_out: Some(30),
            reasoning_tokens: None,
            cost_usd_e6: Some(900),
            payload: json!({}),
        };
        IngestExportBatch::Events(EventsBatchBody {
            team_id: "t".into(),
            workspace_hash: "wh".into(),
            events: vec![event],
        })
    }

    #[test]
    fn dd_log_object_has_timestamp_and_hostname_top_level() {
        let batch = sample_event_batch();
        let expanded = expand_ingest_batch(&batch);
        let obj = dd_log_object(&expanded[0], "host-1");

        let expected_fields = [
            ("timestamp", json!(1_700_000_000_000_i64)),
            ("hostname", json!("host-1")),
            ("agent", json!("cursor")),
            ("model", json!("gpt-5")),
            ("tokens_in", json!(120)),
        ];
        for (field, want) in expected_fields {
            assert_eq!(obj[field], want);
        }

        let tags = obj["ddtags"].as_str().unwrap();
        for tag in ["agent:cursor", "model:gpt-5", "kaizen.type:kaizen.event"] {
            assert!(tags.contains(tag));
        }

        assert!(obj["kaizen"].is_object());
        assert!(obj["message"].is_string());
    }

    #[test]
    fn dd_log_object_handles_tool_span_timestamp_fallback() {
        // No start time: the timestamp is expected to come from `ended_at_ms`.
        let span = OutboundToolSpan {
            session_id_hash: "sid".into(),
            span_id_hash: "ph".into(),
            tool: Some("Read".into()),
            status: "ok".into(),
            started_at_ms: None,
            ended_at_ms: Some(42),
            lead_time_ms: None,
            tokens_in: None,
            tokens_out: None,
            reasoning_tokens: None,
            cost_usd_e6: None,
            path_hashes: vec![],
        };
        let batch = IngestExportBatch::ToolSpans(ToolSpansBatchBody {
            team_id: "t".into(),
            workspace_hash: "wh".into(),
            spans: vec![span],
        });
        let expanded = expand_ingest_batch(&batch);
        let obj = dd_log_object(&expanded[0], "h");
        assert_eq!(obj["timestamp"], json!(42_i64));
        assert_eq!(obj["status"], json!("ok"));
    }

    #[test]
    fn chunk_for_dd_respects_item_cap() {
        let total = 2_500;
        let logs: Vec<_> = (0..total).map(|i| json!({ "i": i })).collect();
        let chunks = chunk_for_dd(logs);
        assert_eq!(chunks.len(), 3);

        let mut seen = 0;
        for chunk in chunks.iter() {
            assert!(chunk.len() <= 1000);
            seen += chunk.len();
        }
        assert_eq!(seen, total);
    }

    #[test]
    fn chunk_for_dd_respects_byte_cap() {
        let big = "x".repeat(64 * 1024);
        let logs: Vec<_> = (0..100).map(|_| json!({ "s": big })).collect();
        let chunks = chunk_for_dd(logs);
        assert!(chunks.len() >= 2);

        // Tolerance of one item's 64 KiB payload over the configured cap.
        let limit = super::build::MAX_BYTES_PER_CHUNK + big.len();
        for chunk in chunks.iter() {
            let bytes = serde_json::to_vec(chunk).unwrap().len();
            assert!(bytes <= limit, "chunk too big: {bytes} bytes");
        }
    }
}