1#![allow(
8 clippy::cast_possible_truncation,
9 clippy::cast_sign_loss,
10 clippy::too_many_lines
11)]
12
13use crate::event::{
14 CiEvent, CommandEvent, CuenvEvent, EventCategory, EventSource, InteractiveEvent, OutputEvent,
15 Stream, SystemEvent, TaskEvent,
16};
17use crate::metadata::correlation_id;
18use crate::redaction::redact;
19use tokio::sync::mpsc;
20use tracing::Subscriber;
21use tracing::field::{Field, Visit};
22use tracing_subscriber::Layer;
23use tracing_subscriber::layer::Context;
24use tracing_subscriber::registry::LookupSpan;
25
26pub struct CuenvEventLayer {
31 sender: mpsc::UnboundedSender<CuenvEvent>,
32}
33
34impl CuenvEventLayer {
35 #[must_use]
37 pub const fn new(sender: mpsc::UnboundedSender<CuenvEvent>) -> Self {
38 Self { sender }
39 }
40}
41
42impl<S> Layer<S> for CuenvEventLayer
43where
44 S: Subscriber + for<'a> LookupSpan<'a>,
45{
46 fn on_event(&self, event: &tracing::Event<'_>, _ctx: Context<'_, S>) {
47 let meta = event.metadata();
48 let target = meta.target();
49
50 if !target.starts_with("cuenv") {
52 return;
53 }
54
55 let mut visitor = CuenvEventVisitor::new(target);
57 event.record(&mut visitor);
58
59 if let Some(cuenv_event) = visitor.build() {
61 let _ = self.sender.send(cuenv_event);
62 }
63 }
64}
65
66struct CuenvEventVisitor {
68 target: String,
69 event_type: Option<String>,
70
71 task_name: Option<String>,
73 command: Option<String>,
74 hermetic: Option<bool>,
75 cache_key: Option<String>,
76 stream: Option<Stream>,
77 content: Option<String>,
78 success: Option<bool>,
79 exit_code: Option<i32>,
80 duration_ms: Option<u64>,
81 sequential: Option<bool>,
82 task_count: Option<usize>,
83
84 provider: Option<String>,
86 event_type_ci: Option<String>,
87 ref_name: Option<String>,
88 count: Option<usize>,
89 path: Option<String>,
90 project: Option<String>,
91 task: Option<String>,
92 reason: Option<String>,
93 error: Option<String>,
94
95 args: Option<Vec<String>>,
97 progress: Option<f32>,
98 message: Option<String>,
99
100 prompt_id: Option<String>,
102 options: Option<Vec<String>>,
103 response: Option<String>,
104 elapsed_secs: Option<u64>,
105
106 tag: Option<String>,
108}
109
110impl CuenvEventVisitor {
111 fn new(target: &str) -> Self {
112 Self {
113 target: target.to_string(),
114 event_type: None,
115 task_name: None,
116 command: None,
117 hermetic: None,
118 cache_key: None,
119 stream: None,
120 content: None,
121 success: None,
122 exit_code: None,
123 duration_ms: None,
124 sequential: None,
125 task_count: None,
126 provider: None,
127 event_type_ci: None,
128 ref_name: None,
129 count: None,
130 path: None,
131 project: None,
132 task: None,
133 reason: None,
134 error: None,
135 args: None,
136 progress: None,
137 message: None,
138 prompt_id: None,
139 options: None,
140 response: None,
141 elapsed_secs: None,
142 tag: None,
143 }
144 }
145
146 #[allow(clippy::cognitive_complexity)]
147 fn build(self) -> Option<CuenvEvent> {
148 let event_type = self.event_type.as_deref()?;
149 let source = EventSource::new(&self.target);
150 let correlation = correlation_id();
151
152 let content = self.content.map(|c| redact(&c));
154 let message = self.message.map(|m| redact(&m));
155 let error = self.error.map(|e| redact(&e));
156
157 let category = match event_type {
158 "task.started" => EventCategory::Task(TaskEvent::Started {
160 name: self.task_name?,
161 command: self.command?,
162 hermetic: self.hermetic.unwrap_or(false),
163 }),
164 "task.cache_hit" => EventCategory::Task(TaskEvent::CacheHit {
165 name: self.task_name?,
166 cache_key: self.cache_key?,
167 }),
168 "task.cache_miss" => EventCategory::Task(TaskEvent::CacheMiss {
169 name: self.task_name?,
170 }),
171 "task.output" => EventCategory::Task(TaskEvent::Output {
172 name: self.task_name?,
173 stream: self.stream.unwrap_or(Stream::Stdout),
174 content: content?,
175 }),
176 "task.completed" => EventCategory::Task(TaskEvent::Completed {
177 name: self.task_name?,
178 success: self.success?,
179 exit_code: self.exit_code,
180 duration_ms: self.duration_ms.unwrap_or(0),
181 }),
182 "task.group_started" => EventCategory::Task(TaskEvent::GroupStarted {
183 name: self.task_name?,
184 sequential: self.sequential.unwrap_or(false),
185 task_count: self.task_count.unwrap_or(0),
186 }),
187 "task.group_completed" => EventCategory::Task(TaskEvent::GroupCompleted {
188 name: self.task_name?,
189 success: self.success?,
190 duration_ms: self.duration_ms.unwrap_or(0),
191 }),
192
193 "ci.context_detected" => EventCategory::Ci(CiEvent::ContextDetected {
195 provider: self.provider?,
196 event_type: self.event_type_ci?,
197 ref_name: self.ref_name?,
198 }),
199 "ci.changed_files" => {
200 EventCategory::Ci(CiEvent::ChangedFilesFound { count: self.count? })
201 }
202 "ci.projects_discovered" => {
203 EventCategory::Ci(CiEvent::ProjectsDiscovered { count: self.count? })
204 }
205 "ci.project_skipped" => EventCategory::Ci(CiEvent::ProjectSkipped {
206 path: self.path?,
207 reason: self.reason?,
208 }),
209 "ci.task_executing" => EventCategory::Ci(CiEvent::TaskExecuting {
210 project: self.project?,
211 task: self.task?,
212 }),
213 "ci.task_result" => EventCategory::Ci(CiEvent::TaskResult {
214 project: self.project?,
215 task: self.task?,
216 success: self.success?,
217 error,
218 }),
219 "ci.report_generated" => {
220 EventCategory::Ci(CiEvent::ReportGenerated { path: self.path? })
221 }
222
223 "command.started" => EventCategory::Command(CommandEvent::Started {
225 command: self.command?,
226 args: self.args.unwrap_or_default(),
227 }),
228 "command.progress" => EventCategory::Command(CommandEvent::Progress {
229 command: self.command?,
230 progress: self.progress?,
231 message: message?,
232 }),
233 "command.completed" => EventCategory::Command(CommandEvent::Completed {
234 command: self.command?,
235 success: self.success?,
236 duration_ms: self.duration_ms.unwrap_or(0),
237 }),
238
239 "interactive.prompt_requested" => {
241 EventCategory::Interactive(InteractiveEvent::PromptRequested {
242 prompt_id: self.prompt_id?,
243 message: message?,
244 options: self.options.unwrap_or_default(),
245 })
246 }
247 "interactive.prompt_resolved" => {
248 EventCategory::Interactive(InteractiveEvent::PromptResolved {
249 prompt_id: self.prompt_id?,
250 response: self.response?,
251 })
252 }
253 "interactive.wait_progress" => {
254 EventCategory::Interactive(InteractiveEvent::WaitProgress {
255 target: self.task_name.or(self.path)?,
256 elapsed_secs: self.elapsed_secs?,
257 })
258 }
259
260 "system.supervisor_log" => EventCategory::System(SystemEvent::SupervisorLog {
262 tag: self.tag?,
263 message: message?,
264 }),
265 "system.shutdown" => EventCategory::System(SystemEvent::Shutdown),
266
267 "output.stdout" => EventCategory::Output(OutputEvent::Stdout { content: content? }),
269 "output.stderr" => EventCategory::Output(OutputEvent::Stderr { content: content? }),
270
271 _ => return None,
272 };
273
274 Some(CuenvEvent::new(correlation, source, category))
275 }
276}
277
278impl Visit for CuenvEventVisitor {
279 fn record_str(&mut self, field: &Field, value: &str) {
280 match field.name() {
281 "event_type" => self.event_type = Some(value.to_string()),
282 "task_name" | "name" => self.task_name = Some(value.to_string()),
283 "command" | "cmd" => self.command = Some(value.to_string()),
284 "cache_key" => self.cache_key = Some(value.to_string()),
285 "content" => self.content = Some(value.to_string()),
286 "provider" => self.provider = Some(value.to_string()),
287 "ci_event_type" => self.event_type_ci = Some(value.to_string()),
288 "ref_name" => self.ref_name = Some(value.to_string()),
289 "path" => self.path = Some(value.to_string()),
290 "project" => self.project = Some(value.to_string()),
291 "task" => self.task = Some(value.to_string()),
292 "reason" => self.reason = Some(value.to_string()),
293 "error" => self.error = Some(value.to_string()),
294 "message" => self.message = Some(value.to_string()),
295 "prompt_id" => self.prompt_id = Some(value.to_string()),
296 "response" => self.response = Some(value.to_string()),
297 "tag" => self.tag = Some(value.to_string()),
298 "stream" => {
299 self.stream = match value {
300 "stdout" => Some(Stream::Stdout),
301 "stderr" => Some(Stream::Stderr),
302 _ => None,
303 };
304 }
305 _ => {}
306 }
307 }
308
309 fn record_i64(&mut self, field: &Field, value: i64) {
310 match field.name() {
311 "exit_code" => self.exit_code = Some(value as i32),
312 "duration_ms" => self.duration_ms = Some(value as u64),
313 "count" => self.count = Some(value as usize),
314 "task_count" => self.task_count = Some(value as usize),
315 "elapsed_secs" => self.elapsed_secs = Some(value as u64),
316 _ => {}
317 }
318 }
319
320 fn record_u64(&mut self, field: &Field, value: u64) {
321 match field.name() {
322 "duration_ms" => self.duration_ms = Some(value),
323 "count" => self.count = Some(value as usize),
324 "task_count" => self.task_count = Some(value as usize),
325 "elapsed_secs" => self.elapsed_secs = Some(value),
326 _ => {}
327 }
328 }
329
330 fn record_f64(&mut self, field: &Field, value: f64) {
331 if field.name() == "progress" {
332 self.progress = Some(value as f32);
333 }
334 }
335
336 fn record_bool(&mut self, field: &Field, value: bool) {
337 match field.name() {
338 "hermetic" => self.hermetic = Some(value),
339 "success" => self.success = Some(value),
340 "sequential" => self.sequential = Some(value),
341 _ => {}
342 }
343 }
344
345 fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
346 let value_str = format!("{value:?}");
348 match field.name() {
349 "args" => {
350 if let Ok(args) = serde_json::from_str::<Vec<String>>(&value_str) {
352 self.args = Some(args);
353 }
354 }
355 "options" => {
356 if let Ok(options) = serde_json::from_str::<Vec<String>>(&value_str) {
357 self.options = Some(options);
358 }
359 }
360 "event_type" | "task_name" | "name" | "command" | "cmd" | "content" | "cache_key"
364 | "stream" => {
365 let cleaned = value_str.trim_matches('"');
367 match field.name() {
368 "event_type" => self.event_type = Some(cleaned.to_string()),
369 "task_name" | "name" => self.task_name = Some(cleaned.to_string()),
370 "command" | "cmd" => self.command = Some(cleaned.to_string()),
371 "content" => self.content = Some(cleaned.to_string()),
372 "cache_key" => self.cache_key = Some(cleaned.to_string()),
373 "stream" => {
374 self.stream = match cleaned {
375 "stdout" => Some(Stream::Stdout),
376 "stderr" => Some(Stream::Stderr),
377 _ => None,
378 };
379 }
380 _ => {}
381 }
382 }
383 "exit_code" => {
386 if let Some(inner) = value_str
388 .strip_prefix("Some(")
389 .and_then(|s| s.strip_suffix(')'))
390 && let Ok(code) = inner.parse::<i32>()
391 {
392 self.exit_code = Some(code);
393 } else if value_str != "None" {
394 if let Ok(code) = value_str.parse::<i32>() {
396 self.exit_code = Some(code);
397 }
398 }
399 }
400 _ => {}
401 }
402 }
403}
404
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::sync::mpsc;
    use tracing_subscriber::layer::SubscriberExt;

    /// Runs `emit` with a [`CuenvEventLayer`] installed as the default
    /// subscriber and returns the receiving half of the layer's channel.
    ///
    /// The layer sends from inside `on_event`, so anything it captured is
    /// already queued when this returns; the tests can therefore poll with
    /// `try_recv` and need neither an async runtime nor a sleep.
    fn capture(emit: impl FnOnce()) -> mpsc::UnboundedReceiver<CuenvEvent> {
        let (tx, rx) = mpsc::unbounded_channel();
        let subscriber = tracing_subscriber::registry().with(CuenvEventLayer::new(tx));
        tracing::subscriber::with_default(subscriber, emit);
        rx
    }

    #[test]
    fn test_layer_captures_cuenv_events() {
        let mut rx = capture(|| {
            tracing::info!(
                target: "cuenv::output",
                event_type = "output.stdout",
                content = "test output",
                "Test event"
            );
        });

        let event = rx.try_recv().expect("layer should have emitted an event");
        match event.category {
            EventCategory::Output(OutputEvent::Stdout { content }) => {
                assert_eq!(content, "test output");
            }
            _ => panic!("Expected stdout output event"),
        }
    }

    #[test]
    fn test_layer_ignores_non_cuenv_events() {
        let mut rx = capture(|| {
            tracing::info!(
                target: "other::target",
                event_type = "output.stdout",
                content = "should be ignored",
                "Other event"
            );
        });

        assert!(rx.try_recv().is_err());
    }

    #[test]
    fn test_layer_captures_task_events() {
        let mut rx = capture(|| {
            tracing::info!(
                target: "cuenv::task",
                event_type = "task.started",
                task_name = "build",
                command = "cargo build",
                hermetic = true,
                "Task started"
            );
        });

        let event = rx.try_recv().expect("layer should have emitted an event");
        match event.category {
            EventCategory::Task(TaskEvent::Started {
                name,
                command,
                hermetic,
            }) => {
                assert_eq!(name, "build");
                assert_eq!(command, "cargo build");
                assert!(hermetic);
            }
            _ => panic!("Expected task started event"),
        }
    }

    #[test]
    fn test_layer_drops_events_missing_required_fields() {
        // `task.started` requires both a task name and a command.
        let mut rx = capture(|| {
            tracing::info!(
                target: "cuenv::task",
                event_type = "task.started",
                command = "cargo build",
                "Task started without a name"
            );
        });

        assert!(rx.try_recv().is_err());
    }

    #[test]
    fn test_layer_reads_debug_formatted_exit_code() {
        let mut rx = capture(|| {
            tracing::info!(
                target: "cuenv::task",
                event_type = "task.completed",
                task_name = "build",
                success = false,
                exit_code = ?Some(3),
                duration_ms = 12_u64,
                "Task completed"
            );
        });

        let event = rx.try_recv().expect("layer should have emitted an event");
        match event.category {
            EventCategory::Task(TaskEvent::Completed {
                name,
                success,
                exit_code,
                duration_ms,
            }) => {
                assert_eq!(name, "build");
                assert!(!success);
                assert_eq!(exit_code, Some(3));
                assert_eq!(duration_ms, 12);
            }
            _ => panic!("Expected task completed event"),
        }
    }
}