//! kaizen/telemetry/datadog/mod.rs — exporter that ships telemetry batches
//! to the Datadog Logs intake API.

mod build;
mod transport;

use crate::sync::IngestExportBatch;
use crate::telemetry::TelemetryExporter;
use anyhow::Result;
use reqwest::blocking::Client;
use std::time::Duration;
18pub fn dd_log_object_for_test(
21 item: &crate::sync::canonical::CanonicalItem,
22 hostname: &str,
23) -> serde_json::Value {
24 build::dd_log_object(item, hostname)
25}
26
27const TIMEOUT: Duration = Duration::from_secs(30);
28
29pub struct DatadogExporter {
30 client: Client,
31 logs_url: String,
33 api_key: String,
34 hostname: String,
35}
36
37impl DatadogExporter {
38 pub fn new(site: &str, api_key: &str) -> Self {
39 let s = site.trim();
40 let logs_url = format!("https://http-intake.logs.{s}/api/v2/logs");
41 let client = Client::builder()
42 .timeout(TIMEOUT)
43 .build()
44 .expect("reqwest for Datadog");
45 Self {
46 client,
47 logs_url,
48 api_key: api_key.to_string(),
49 hostname: build::current_hostname(),
50 }
51 }
52}
53
54impl TelemetryExporter for DatadogExporter {
55 fn name(&self) -> &str {
56 "datadog"
57 }
58
59 fn export(&self, batch: &IngestExportBatch) -> Result<()> {
60 let logs = build::build_log_objects(batch, &self.hostname);
61 if logs.is_empty() {
62 return Ok(());
63 }
64 let chunks = build::chunk_for_dd(logs);
65 transport::post_chunks(&self.client, &self.logs_url, &self.api_key, chunks)
66 }
67}
68
#[cfg(test)]
mod tests {
    use super::build::{chunk_for_dd, dd_log_object};
    use crate::sync::canonical::expand_ingest_batch;
    use crate::sync::outbound::{EventsBatchBody, OutboundEvent};
    use crate::sync::{IngestExportBatch, smart::OutboundToolSpan, smart::ToolSpansBatchBody};

    /// Builds a one-event batch with fixed, known values for assertions.
    fn one_event_batch() -> IngestExportBatch {
        IngestExportBatch::Events(EventsBatchBody {
            team_id: "t".into(),
            workspace_hash: "wh".into(),
            project_name: Some("kaizen".into()),
            events: vec![OutboundEvent {
                session_id_hash: "sid".into(),
                event_seq: 0,
                ts_ms: 1_700_000_000_000,
                agent: "cursor".into(),
                model: "gpt-5".into(),
                kind: "message".into(),
                source: "hook".into(),
                tool: Some("Edit".into()),
                tool_call_id: None,
                tokens_in: Some(120),
                tokens_out: Some(30),
                reasoning_tokens: None,
                cost_usd_e6: Some(900),
                payload: serde_json::json!({}),
            }],
        })
    }

    #[test]
    fn dd_log_object_has_timestamp_and_hostname_top_level() {
        let b = one_event_batch();
        let items = expand_ingest_batch(&b);
        let v = dd_log_object(&items[0], "host-1");

        // Reserved/top-level attributes Datadog keys on directly.
        assert_eq!(v["timestamp"], serde_json::json!(1_700_000_000_000_i64));
        assert_eq!(v["hostname"], serde_json::json!("host-1"));
        assert_eq!(v["agent"], serde_json::json!("cursor"));
        assert_eq!(v["model"], serde_json::json!("gpt-5"));
        assert_eq!(v["project_name"], serde_json::json!("kaizen"));
        assert_eq!(v["tokens_in"], serde_json::json!(120));

        // ddtags is a single comma-joined string; check each expected tag.
        let tags = v["ddtags"].as_str().unwrap();
        assert!(tags.contains("agent:cursor"));
        assert!(tags.contains("model:gpt-5"));
        assert!(tags.contains("project_name:kaizen"));
        assert!(tags.contains("kaizen.type:kaizen.event"));

        assert!(v["kaizen"].is_object());
        assert!(v["message"].is_string());
    }

    #[test]
    fn dd_log_object_handles_tool_span_timestamp_fallback() {
        // started_at_ms is None, so the log timestamp must fall back to
        // ended_at_ms (42).
        let b = IngestExportBatch::ToolSpans(ToolSpansBatchBody {
            team_id: "t".into(),
            workspace_hash: "wh".into(),
            project_name: Some("kaizen".into()),
            spans: vec![OutboundToolSpan {
                session_id_hash: "sid".into(),
                span_id_hash: "ph".into(),
                tool: Some("Read".into()),
                status: "ok".into(),
                started_at_ms: None,
                ended_at_ms: Some(42),
                lead_time_ms: Some(40),
                tokens_in: Some(10),
                tokens_out: Some(4),
                reasoning_tokens: Some(2),
                cost_usd_e6: Some(25),
                path_hashes: vec![],
            }],
        });
        let items = expand_ingest_batch(&b);
        let v = dd_log_object(&items[0], "h");
        assert_eq!(v["timestamp"], serde_json::json!(42_i64));
        assert_eq!(v["status"], serde_json::json!("ok"));
        assert_eq!(v["lead_time_ms"], serde_json::json!(40));
        assert_eq!(v["tokens_in"], serde_json::json!(10));
        assert_eq!(v["tokens_out"], serde_json::json!(4));
        assert_eq!(v["reasoning_tokens"], serde_json::json!(2));
        assert_eq!(v["cost_usd_e6"], serde_json::json!(25));
        assert_eq!(v["project_name"], serde_json::json!("kaizen"));
    }

    #[test]
    fn chunk_for_dd_respects_item_cap() {
        // 2,500 tiny logs must split into ceil(2500/1000) = 3 chunks with no
        // items lost.
        let logs: Vec<_> = (0..2_500).map(|i| serde_json::json!({"i": i})).collect();
        let chunks = chunk_for_dd(logs);
        assert_eq!(chunks.len(), 3);
        assert!(chunks.iter().all(|c| c.len() <= 1000));
        assert_eq!(chunks.iter().map(|c| c.len()).sum::<usize>(), 2_500);
    }

    #[test]
    fn chunk_for_dd_respects_byte_cap() {
        // 100 × 64 KiB payloads must force multiple chunks; each serialized
        // chunk may exceed the cap by at most one item's worth of bytes.
        let big = "x".repeat(64 * 1024);
        let logs: Vec<_> = (0..100).map(|_| serde_json::json!({"s": big})).collect();
        let chunks = chunk_for_dd(logs);
        assert!(chunks.len() >= 2);
        for c in &chunks {
            let bytes = serde_json::to_vec(c).unwrap().len();
            assert!(
                bytes <= super::build::MAX_BYTES_PER_CHUNK + 64 * 1024,
                "chunk too big: {bytes} bytes"
            );
        }
    }
}