1#![allow(
8 clippy::cast_possible_truncation,
9 clippy::cast_sign_loss,
10 clippy::too_many_lines
11)]
12
13use crate::event::{
14 CiEvent, CommandEvent, CuenvEvent, EventCategory, EventSource, InteractiveEvent, OutputEvent,
15 Stream, SystemEvent, TaskEvent,
16};
17use crate::metadata::correlation_id;
18use crate::redaction::redact;
19use tokio::sync::mpsc;
20use tracing::Subscriber;
21use tracing::field::{Field, Visit};
22use tracing_subscriber::Layer;
23use tracing_subscriber::layer::Context;
24use tracing_subscriber::registry::LookupSpan;
25
26pub struct CuenvEventLayer {
31 sender: mpsc::UnboundedSender<CuenvEvent>,
32}
33
34impl CuenvEventLayer {
35 #[must_use]
37 pub fn new(sender: mpsc::UnboundedSender<CuenvEvent>) -> Self {
38 Self { sender }
39 }
40}
41
42impl<S> Layer<S> for CuenvEventLayer
43where
44 S: Subscriber + for<'a> LookupSpan<'a>,
45{
46 fn on_event(&self, event: &tracing::Event<'_>, _ctx: Context<'_, S>) {
47 let meta = event.metadata();
48 let target = meta.target();
49
50 if !target.starts_with("cuenv") {
52 return;
53 }
54
55 let mut visitor = CuenvEventVisitor::new(target);
57 event.record(&mut visitor);
58
59 if let Some(cuenv_event) = visitor.build() {
61 let _ = self.sender.send(cuenv_event);
62 }
63 }
64}
65
66struct CuenvEventVisitor {
68 target: String,
69 event_type: Option<String>,
70
71 task_name: Option<String>,
73 command: Option<String>,
74 hermetic: Option<bool>,
75 cache_key: Option<String>,
76 stream: Option<Stream>,
77 content: Option<String>,
78 success: Option<bool>,
79 exit_code: Option<i32>,
80 duration_ms: Option<u64>,
81 sequential: Option<bool>,
82 task_count: Option<usize>,
83
84 provider: Option<String>,
86 event_type_ci: Option<String>,
87 ref_name: Option<String>,
88 count: Option<usize>,
89 path: Option<String>,
90 project: Option<String>,
91 task: Option<String>,
92 reason: Option<String>,
93 error: Option<String>,
94
95 args: Option<Vec<String>>,
97 progress: Option<f32>,
98 message: Option<String>,
99
100 prompt_id: Option<String>,
102 options: Option<Vec<String>>,
103 response: Option<String>,
104 elapsed_secs: Option<u64>,
105
106 tag: Option<String>,
108}
109
110impl CuenvEventVisitor {
111 fn new(target: &str) -> Self {
112 Self {
113 target: target.to_string(),
114 event_type: None,
115 task_name: None,
116 command: None,
117 hermetic: None,
118 cache_key: None,
119 stream: None,
120 content: None,
121 success: None,
122 exit_code: None,
123 duration_ms: None,
124 sequential: None,
125 task_count: None,
126 provider: None,
127 event_type_ci: None,
128 ref_name: None,
129 count: None,
130 path: None,
131 project: None,
132 task: None,
133 reason: None,
134 error: None,
135 args: None,
136 progress: None,
137 message: None,
138 prompt_id: None,
139 options: None,
140 response: None,
141 elapsed_secs: None,
142 tag: None,
143 }
144 }
145
146 fn build(self) -> Option<CuenvEvent> {
147 let event_type = self.event_type.as_deref()?;
148 let source = EventSource::new(&self.target);
149 let correlation = correlation_id();
150
151 let content = self.content.map(|c| redact(&c));
153 let message = self.message.map(|m| redact(&m));
154 let error = self.error.map(|e| redact(&e));
155
156 let category = match event_type {
157 "task.started" => EventCategory::Task(TaskEvent::Started {
159 name: self.task_name?,
160 command: self.command?,
161 hermetic: self.hermetic.unwrap_or(false),
162 }),
163 "task.cache_hit" => EventCategory::Task(TaskEvent::CacheHit {
164 name: self.task_name?,
165 cache_key: self.cache_key?,
166 }),
167 "task.cache_miss" => EventCategory::Task(TaskEvent::CacheMiss {
168 name: self.task_name?,
169 }),
170 "task.output" => EventCategory::Task(TaskEvent::Output {
171 name: self.task_name?,
172 stream: self.stream.unwrap_or(Stream::Stdout),
173 content: content?,
174 }),
175 "task.completed" => EventCategory::Task(TaskEvent::Completed {
176 name: self.task_name?,
177 success: self.success?,
178 exit_code: self.exit_code,
179 duration_ms: self.duration_ms.unwrap_or(0),
180 }),
181 "task.group_started" => EventCategory::Task(TaskEvent::GroupStarted {
182 name: self.task_name?,
183 sequential: self.sequential.unwrap_or(false),
184 task_count: self.task_count.unwrap_or(0),
185 }),
186 "task.group_completed" => EventCategory::Task(TaskEvent::GroupCompleted {
187 name: self.task_name?,
188 success: self.success?,
189 duration_ms: self.duration_ms.unwrap_or(0),
190 }),
191
192 "ci.context_detected" => EventCategory::Ci(CiEvent::ContextDetected {
194 provider: self.provider?,
195 event_type: self.event_type_ci?,
196 ref_name: self.ref_name?,
197 }),
198 "ci.changed_files" => {
199 EventCategory::Ci(CiEvent::ChangedFilesFound { count: self.count? })
200 }
201 "ci.projects_discovered" => {
202 EventCategory::Ci(CiEvent::ProjectsDiscovered { count: self.count? })
203 }
204 "ci.project_skipped" => EventCategory::Ci(CiEvent::ProjectSkipped {
205 path: self.path?,
206 reason: self.reason?,
207 }),
208 "ci.task_executing" => EventCategory::Ci(CiEvent::TaskExecuting {
209 project: self.project?,
210 task: self.task?,
211 }),
212 "ci.task_result" => EventCategory::Ci(CiEvent::TaskResult {
213 project: self.project?,
214 task: self.task?,
215 success: self.success?,
216 error,
217 }),
218 "ci.report_generated" => {
219 EventCategory::Ci(CiEvent::ReportGenerated { path: self.path? })
220 }
221
222 "command.started" => EventCategory::Command(CommandEvent::Started {
224 command: self.command?,
225 args: self.args.unwrap_or_default(),
226 }),
227 "command.progress" => EventCategory::Command(CommandEvent::Progress {
228 command: self.command?,
229 progress: self.progress?,
230 message: message.clone()?,
231 }),
232 "command.completed" => EventCategory::Command(CommandEvent::Completed {
233 command: self.command?,
234 success: self.success?,
235 duration_ms: self.duration_ms.unwrap_or(0),
236 }),
237
238 "interactive.prompt_requested" => {
240 EventCategory::Interactive(InteractiveEvent::PromptRequested {
241 prompt_id: self.prompt_id?,
242 message: message.clone()?,
243 options: self.options.unwrap_or_default(),
244 })
245 }
246 "interactive.prompt_resolved" => {
247 EventCategory::Interactive(InteractiveEvent::PromptResolved {
248 prompt_id: self.prompt_id?,
249 response: self.response?,
250 })
251 }
252 "interactive.wait_progress" => {
253 EventCategory::Interactive(InteractiveEvent::WaitProgress {
254 target: self.task_name.or(self.path)?,
255 elapsed_secs: self.elapsed_secs?,
256 })
257 }
258
259 "system.supervisor_log" => EventCategory::System(SystemEvent::SupervisorLog {
261 tag: self.tag?,
262 message: message?,
263 }),
264 "system.shutdown" => EventCategory::System(SystemEvent::Shutdown),
265
266 "output.stdout" => EventCategory::Output(OutputEvent::Stdout {
268 content: content.clone()?,
269 }),
270 "output.stderr" => EventCategory::Output(OutputEvent::Stderr { content: content? }),
271
272 _ => return None,
273 };
274
275 Some(CuenvEvent::new(correlation, source, category))
276 }
277}
278
279impl Visit for CuenvEventVisitor {
280 fn record_str(&mut self, field: &Field, value: &str) {
281 match field.name() {
282 "event_type" => self.event_type = Some(value.to_string()),
283 "task_name" | "name" => self.task_name = Some(value.to_string()),
284 "command" | "cmd" => self.command = Some(value.to_string()),
285 "cache_key" => self.cache_key = Some(value.to_string()),
286 "content" => self.content = Some(value.to_string()),
287 "provider" => self.provider = Some(value.to_string()),
288 "ci_event_type" => self.event_type_ci = Some(value.to_string()),
289 "ref_name" => self.ref_name = Some(value.to_string()),
290 "path" => self.path = Some(value.to_string()),
291 "project" => self.project = Some(value.to_string()),
292 "task" => self.task = Some(value.to_string()),
293 "reason" => self.reason = Some(value.to_string()),
294 "error" => self.error = Some(value.to_string()),
295 "message" => self.message = Some(value.to_string()),
296 "prompt_id" => self.prompt_id = Some(value.to_string()),
297 "response" => self.response = Some(value.to_string()),
298 "tag" => self.tag = Some(value.to_string()),
299 "stream" => {
300 self.stream = match value {
301 "stdout" => Some(Stream::Stdout),
302 "stderr" => Some(Stream::Stderr),
303 _ => None,
304 };
305 }
306 _ => {}
307 }
308 }
309
310 fn record_i64(&mut self, field: &Field, value: i64) {
311 match field.name() {
312 "exit_code" => self.exit_code = Some(value as i32),
313 "duration_ms" => self.duration_ms = Some(value as u64),
314 "count" => self.count = Some(value as usize),
315 "task_count" => self.task_count = Some(value as usize),
316 "elapsed_secs" => self.elapsed_secs = Some(value as u64),
317 _ => {}
318 }
319 }
320
321 fn record_u64(&mut self, field: &Field, value: u64) {
322 match field.name() {
323 "duration_ms" => self.duration_ms = Some(value),
324 "count" => self.count = Some(value as usize),
325 "task_count" => self.task_count = Some(value as usize),
326 "elapsed_secs" => self.elapsed_secs = Some(value),
327 _ => {}
328 }
329 }
330
331 fn record_f64(&mut self, field: &Field, value: f64) {
332 if field.name() == "progress" {
333 self.progress = Some(value as f32);
334 }
335 }
336
337 fn record_bool(&mut self, field: &Field, value: bool) {
338 match field.name() {
339 "hermetic" => self.hermetic = Some(value),
340 "success" => self.success = Some(value),
341 "sequential" => self.sequential = Some(value),
342 _ => {}
343 }
344 }
345
346 fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
347 let value_str = format!("{value:?}");
349 match field.name() {
350 "args" => {
351 if let Ok(args) = serde_json::from_str::<Vec<String>>(&value_str) {
353 self.args = Some(args);
354 }
355 }
356 "options" => {
357 if let Ok(options) = serde_json::from_str::<Vec<String>>(&value_str) {
358 self.options = Some(options);
359 }
360 }
361 "event_type" | "task_name" | "name" | "command" | "cmd" | "content" | "cache_key"
365 | "stream" => {
366 let cleaned = value_str.trim_matches('"');
368 match field.name() {
369 "event_type" => self.event_type = Some(cleaned.to_string()),
370 "task_name" | "name" => self.task_name = Some(cleaned.to_string()),
371 "command" | "cmd" => self.command = Some(cleaned.to_string()),
372 "content" => self.content = Some(cleaned.to_string()),
373 "cache_key" => self.cache_key = Some(cleaned.to_string()),
374 "stream" => {
375 self.stream = match cleaned {
376 "stdout" => Some(Stream::Stdout),
377 "stderr" => Some(Stream::Stderr),
378 _ => None,
379 };
380 }
381 _ => {}
382 }
383 }
384 "exit_code" => {
387 if let Some(inner) = value_str
389 .strip_prefix("Some(")
390 .and_then(|s| s.strip_suffix(')'))
391 && let Ok(code) = inner.parse::<i32>()
392 {
393 self.exit_code = Some(code);
394 } else if value_str != "None" {
395 if let Ok(code) = value_str.parse::<i32>() {
397 self.exit_code = Some(code);
398 }
399 }
400 }
401 _ => {}
402 }
403 }
404}
405
406#[cfg(test)]
407mod tests {
408 use super::*;
409 use tokio::sync::mpsc;
410 use tracing_subscriber::layer::SubscriberExt;
411
412 #[tokio::test]
413 async fn test_layer_captures_cuenv_events() {
414 let (tx, mut rx) = mpsc::unbounded_channel();
415 let layer = CuenvEventLayer::new(tx);
416
417 let subscriber = tracing_subscriber::registry().with(layer);
418
419 tracing::subscriber::with_default(subscriber, || {
420 tracing::info!(
421 target: "cuenv::output",
422 event_type = "output.stdout",
423 content = "test output",
424 "Test event"
425 );
426 });
427
428 let event = rx.recv().await.unwrap();
429 match event.category {
430 EventCategory::Output(OutputEvent::Stdout { content }) => {
431 assert_eq!(content, "test output");
432 }
433 _ => panic!("Expected stdout output event"),
434 }
435 }
436
437 #[tokio::test]
438 async fn test_layer_ignores_non_cuenv_events() {
439 let (tx, mut rx) = mpsc::unbounded_channel();
440 let layer = CuenvEventLayer::new(tx);
441
442 let subscriber = tracing_subscriber::registry().with(layer);
443
444 tracing::subscriber::with_default(subscriber, || {
445 tracing::info!(
446 target: "other::target",
447 event_type = "output.stdout",
448 content = "should be ignored",
449 "Other event"
450 );
451 });
452
453 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
455 assert!(rx.try_recv().is_err());
456 }
457
458 #[tokio::test]
459 async fn test_layer_captures_task_events() {
460 let (tx, mut rx) = mpsc::unbounded_channel();
461 let layer = CuenvEventLayer::new(tx);
462
463 let subscriber = tracing_subscriber::registry().with(layer);
464
465 tracing::subscriber::with_default(subscriber, || {
466 tracing::info!(
467 target: "cuenv::task",
468 event_type = "task.started",
469 task_name = "build",
470 command = "cargo build",
471 hermetic = true,
472 "Task started"
473 );
474 });
475
476 let event = rx.recv().await.unwrap();
477 match event.category {
478 EventCategory::Task(TaskEvent::Started {
479 name,
480 command,
481 hermetic,
482 }) => {
483 assert_eq!(name, "build");
484 assert_eq!(command, "cargo build");
485 assert!(hermetic);
486 }
487 _ => panic!("Expected task started event"),
488 }
489 }
490}