cuenv_events/
layer.rs

1//! Custom tracing Layer for capturing cuenv events.
2//!
3//! This layer intercepts tracing events with specific targets and fields,
4//! converts them to `CuenvEvent` instances, and sends them to the `EventBus`.
5
6// These casts are intentional for tracing field extraction - values come from trusted sources
7#![allow(
8    clippy::cast_possible_truncation,
9    clippy::cast_sign_loss,
10    clippy::too_many_lines
11)]
12
13use crate::event::{
14    CiEvent, CommandEvent, CuenvEvent, EventCategory, EventSource, InteractiveEvent, OutputEvent,
15    Stream, SystemEvent, TaskEvent,
16};
17use crate::metadata::correlation_id;
18use crate::redaction::redact;
19use tokio::sync::mpsc;
20use tracing::Subscriber;
21use tracing::field::{Field, Visit};
22use tracing_subscriber::Layer;
23use tracing_subscriber::layer::Context;
24use tracing_subscriber::registry::LookupSpan;
25
26/// A tracing Layer that captures cuenv-specific events.
27///
28/// Events are identified by their `target` (must start with "cuenv")
29/// and an `event_type` field that specifies the event category.
30pub struct CuenvEventLayer {
31    sender: mpsc::UnboundedSender<CuenvEvent>,
32}
33
34impl CuenvEventLayer {
35    /// Create a new layer that sends events to the given channel.
36    #[must_use]
37    pub fn new(sender: mpsc::UnboundedSender<CuenvEvent>) -> Self {
38        Self { sender }
39    }
40}
41
42impl<S> Layer<S> for CuenvEventLayer
43where
44    S: Subscriber + for<'a> LookupSpan<'a>,
45{
46    fn on_event(&self, event: &tracing::Event<'_>, _ctx: Context<'_, S>) {
47        let meta = event.metadata();
48        let target = meta.target();
49
50        // Only capture events with cuenv target
51        if !target.starts_with("cuenv") {
52            return;
53        }
54
55        // Extract fields using visitor pattern
56        let mut visitor = CuenvEventVisitor::new(target);
57        event.record(&mut visitor);
58
59        // Build and send event if it has required fields
60        if let Some(cuenv_event) = visitor.build() {
61            let _ = self.sender.send(cuenv_event);
62        }
63    }
64}
65
66/// Visitor for extracting typed fields from tracing events.
67struct CuenvEventVisitor {
68    target: String,
69    event_type: Option<String>,
70
71    // Task event fields
72    task_name: Option<String>,
73    command: Option<String>,
74    hermetic: Option<bool>,
75    cache_key: Option<String>,
76    stream: Option<Stream>,
77    content: Option<String>,
78    success: Option<bool>,
79    exit_code: Option<i32>,
80    duration_ms: Option<u64>,
81    sequential: Option<bool>,
82    task_count: Option<usize>,
83
84    // CI event fields
85    provider: Option<String>,
86    event_type_ci: Option<String>,
87    ref_name: Option<String>,
88    count: Option<usize>,
89    path: Option<String>,
90    project: Option<String>,
91    task: Option<String>,
92    reason: Option<String>,
93    error: Option<String>,
94
95    // Command event fields
96    args: Option<Vec<String>>,
97    progress: Option<f32>,
98    message: Option<String>,
99
100    // Interactive event fields
101    prompt_id: Option<String>,
102    options: Option<Vec<String>>,
103    response: Option<String>,
104    elapsed_secs: Option<u64>,
105
106    // System event fields
107    tag: Option<String>,
108}
109
110impl CuenvEventVisitor {
111    fn new(target: &str) -> Self {
112        Self {
113            target: target.to_string(),
114            event_type: None,
115            task_name: None,
116            command: None,
117            hermetic: None,
118            cache_key: None,
119            stream: None,
120            content: None,
121            success: None,
122            exit_code: None,
123            duration_ms: None,
124            sequential: None,
125            task_count: None,
126            provider: None,
127            event_type_ci: None,
128            ref_name: None,
129            count: None,
130            path: None,
131            project: None,
132            task: None,
133            reason: None,
134            error: None,
135            args: None,
136            progress: None,
137            message: None,
138            prompt_id: None,
139            options: None,
140            response: None,
141            elapsed_secs: None,
142            tag: None,
143        }
144    }
145
146    fn build(self) -> Option<CuenvEvent> {
147        let event_type = self.event_type.as_deref()?;
148        let source = EventSource::new(&self.target);
149        let correlation = correlation_id();
150
151        // Apply secret redaction to content fields before building events
152        let content = self.content.map(|c| redact(&c));
153        let message = self.message.map(|m| redact(&m));
154        let error = self.error.map(|e| redact(&e));
155
156        let category = match event_type {
157            // Task events
158            "task.started" => EventCategory::Task(TaskEvent::Started {
159                name: self.task_name?,
160                command: self.command?,
161                hermetic: self.hermetic.unwrap_or(false),
162            }),
163            "task.cache_hit" => EventCategory::Task(TaskEvent::CacheHit {
164                name: self.task_name?,
165                cache_key: self.cache_key?,
166            }),
167            "task.cache_miss" => EventCategory::Task(TaskEvent::CacheMiss {
168                name: self.task_name?,
169            }),
170            "task.output" => EventCategory::Task(TaskEvent::Output {
171                name: self.task_name?,
172                stream: self.stream.unwrap_or(Stream::Stdout),
173                content: content?,
174            }),
175            "task.completed" => EventCategory::Task(TaskEvent::Completed {
176                name: self.task_name?,
177                success: self.success?,
178                exit_code: self.exit_code,
179                duration_ms: self.duration_ms.unwrap_or(0),
180            }),
181            "task.group_started" => EventCategory::Task(TaskEvent::GroupStarted {
182                name: self.task_name?,
183                sequential: self.sequential.unwrap_or(false),
184                task_count: self.task_count.unwrap_or(0),
185            }),
186            "task.group_completed" => EventCategory::Task(TaskEvent::GroupCompleted {
187                name: self.task_name?,
188                success: self.success?,
189                duration_ms: self.duration_ms.unwrap_or(0),
190            }),
191
192            // CI events
193            "ci.context_detected" => EventCategory::Ci(CiEvent::ContextDetected {
194                provider: self.provider?,
195                event_type: self.event_type_ci?,
196                ref_name: self.ref_name?,
197            }),
198            "ci.changed_files" => {
199                EventCategory::Ci(CiEvent::ChangedFilesFound { count: self.count? })
200            }
201            "ci.projects_discovered" => {
202                EventCategory::Ci(CiEvent::ProjectsDiscovered { count: self.count? })
203            }
204            "ci.project_skipped" => EventCategory::Ci(CiEvent::ProjectSkipped {
205                path: self.path?,
206                reason: self.reason?,
207            }),
208            "ci.task_executing" => EventCategory::Ci(CiEvent::TaskExecuting {
209                project: self.project?,
210                task: self.task?,
211            }),
212            "ci.task_result" => EventCategory::Ci(CiEvent::TaskResult {
213                project: self.project?,
214                task: self.task?,
215                success: self.success?,
216                error,
217            }),
218            "ci.report_generated" => {
219                EventCategory::Ci(CiEvent::ReportGenerated { path: self.path? })
220            }
221
222            // Command events
223            "command.started" => EventCategory::Command(CommandEvent::Started {
224                command: self.command?,
225                args: self.args.unwrap_or_default(),
226            }),
227            "command.progress" => EventCategory::Command(CommandEvent::Progress {
228                command: self.command?,
229                progress: self.progress?,
230                message: message.clone()?,
231            }),
232            "command.completed" => EventCategory::Command(CommandEvent::Completed {
233                command: self.command?,
234                success: self.success?,
235                duration_ms: self.duration_ms.unwrap_or(0),
236            }),
237
238            // Interactive events
239            "interactive.prompt_requested" => {
240                EventCategory::Interactive(InteractiveEvent::PromptRequested {
241                    prompt_id: self.prompt_id?,
242                    message: message.clone()?,
243                    options: self.options.unwrap_or_default(),
244                })
245            }
246            "interactive.prompt_resolved" => {
247                EventCategory::Interactive(InteractiveEvent::PromptResolved {
248                    prompt_id: self.prompt_id?,
249                    response: self.response?,
250                })
251            }
252            "interactive.wait_progress" => {
253                EventCategory::Interactive(InteractiveEvent::WaitProgress {
254                    target: self.task_name.or(self.path)?,
255                    elapsed_secs: self.elapsed_secs?,
256                })
257            }
258
259            // System events
260            "system.supervisor_log" => EventCategory::System(SystemEvent::SupervisorLog {
261                tag: self.tag?,
262                message: message?,
263            }),
264            "system.shutdown" => EventCategory::System(SystemEvent::Shutdown),
265
266            // Output events
267            "output.stdout" => EventCategory::Output(OutputEvent::Stdout {
268                content: content.clone()?,
269            }),
270            "output.stderr" => EventCategory::Output(OutputEvent::Stderr { content: content? }),
271
272            _ => return None,
273        };
274
275        Some(CuenvEvent::new(correlation, source, category))
276    }
277}
278
279impl Visit for CuenvEventVisitor {
280    fn record_str(&mut self, field: &Field, value: &str) {
281        match field.name() {
282            "event_type" => self.event_type = Some(value.to_string()),
283            "task_name" | "name" => self.task_name = Some(value.to_string()),
284            "command" | "cmd" => self.command = Some(value.to_string()),
285            "cache_key" => self.cache_key = Some(value.to_string()),
286            "content" => self.content = Some(value.to_string()),
287            "provider" => self.provider = Some(value.to_string()),
288            "ci_event_type" => self.event_type_ci = Some(value.to_string()),
289            "ref_name" => self.ref_name = Some(value.to_string()),
290            "path" => self.path = Some(value.to_string()),
291            "project" => self.project = Some(value.to_string()),
292            "task" => self.task = Some(value.to_string()),
293            "reason" => self.reason = Some(value.to_string()),
294            "error" => self.error = Some(value.to_string()),
295            "message" => self.message = Some(value.to_string()),
296            "prompt_id" => self.prompt_id = Some(value.to_string()),
297            "response" => self.response = Some(value.to_string()),
298            "tag" => self.tag = Some(value.to_string()),
299            "stream" => {
300                self.stream = match value {
301                    "stdout" => Some(Stream::Stdout),
302                    "stderr" => Some(Stream::Stderr),
303                    _ => None,
304                };
305            }
306            _ => {}
307        }
308    }
309
310    fn record_i64(&mut self, field: &Field, value: i64) {
311        match field.name() {
312            "exit_code" => self.exit_code = Some(value as i32),
313            "duration_ms" => self.duration_ms = Some(value as u64),
314            "count" => self.count = Some(value as usize),
315            "task_count" => self.task_count = Some(value as usize),
316            "elapsed_secs" => self.elapsed_secs = Some(value as u64),
317            _ => {}
318        }
319    }
320
321    fn record_u64(&mut self, field: &Field, value: u64) {
322        match field.name() {
323            "duration_ms" => self.duration_ms = Some(value),
324            "count" => self.count = Some(value as usize),
325            "task_count" => self.task_count = Some(value as usize),
326            "elapsed_secs" => self.elapsed_secs = Some(value),
327            _ => {}
328        }
329    }
330
331    fn record_f64(&mut self, field: &Field, value: f64) {
332        if field.name() == "progress" {
333            self.progress = Some(value as f32);
334        }
335    }
336
337    fn record_bool(&mut self, field: &Field, value: bool) {
338        match field.name() {
339            "hermetic" => self.hermetic = Some(value),
340            "success" => self.success = Some(value),
341            "sequential" => self.sequential = Some(value),
342            _ => {}
343        }
344    }
345
346    fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
347        // Handle debug-formatted fields as strings
348        let value_str = format!("{value:?}");
349        match field.name() {
350            "args" => {
351                // Try to parse as JSON array
352                if let Ok(args) = serde_json::from_str::<Vec<String>>(&value_str) {
353                    self.args = Some(args);
354                }
355            }
356            "options" => {
357                if let Ok(options) = serde_json::from_str::<Vec<String>>(&value_str) {
358                    self.options = Some(options);
359                }
360            }
361            // Fallback: try to extract string fields from debug formatting
362            // When tracing uses Display formatting (%), it wraps values in a DisplayValue
363            // which then gets passed to record_debug instead of record_str
364            "event_type" | "task_name" | "name" | "command" | "cmd" | "content" | "cache_key"
365            | "stream" => {
366                // Remove surrounding quotes if present (debug format adds them for strings)
367                let cleaned = value_str.trim_matches('"');
368                match field.name() {
369                    "event_type" => self.event_type = Some(cleaned.to_string()),
370                    "task_name" | "name" => self.task_name = Some(cleaned.to_string()),
371                    "command" | "cmd" => self.command = Some(cleaned.to_string()),
372                    "content" => self.content = Some(cleaned.to_string()),
373                    "cache_key" => self.cache_key = Some(cleaned.to_string()),
374                    "stream" => {
375                        self.stream = match cleaned {
376                            "stdout" => Some(Stream::Stdout),
377                            "stderr" => Some(Stream::Stderr),
378                            _ => None,
379                        };
380                    }
381                    _ => {}
382                }
383            }
384            // Handle exit_code which uses Debug formatting (?exit_code)
385            // Can be "Some(0)", "None", or just "0" depending on context
386            "exit_code" => {
387                // Try to parse as Option<i32> format first (e.g., "Some(0)")
388                if let Some(inner) = value_str
389                    .strip_prefix("Some(")
390                    .and_then(|s| s.strip_suffix(')'))
391                    && let Ok(code) = inner.parse::<i32>()
392                {
393                    self.exit_code = Some(code);
394                } else if value_str != "None" {
395                    // Try to parse as plain integer
396                    if let Ok(code) = value_str.parse::<i32>() {
397                        self.exit_code = Some(code);
398                    }
399                }
400            }
401            _ => {}
402        }
403    }
404}
405
406#[cfg(test)]
407mod tests {
408    use super::*;
409    use tokio::sync::mpsc;
410    use tracing_subscriber::layer::SubscriberExt;
411
412    #[tokio::test]
413    async fn test_layer_captures_cuenv_events() {
414        let (tx, mut rx) = mpsc::unbounded_channel();
415        let layer = CuenvEventLayer::new(tx);
416
417        let subscriber = tracing_subscriber::registry().with(layer);
418
419        tracing::subscriber::with_default(subscriber, || {
420            tracing::info!(
421                target: "cuenv::output",
422                event_type = "output.stdout",
423                content = "test output",
424                "Test event"
425            );
426        });
427
428        let event = rx.recv().await.unwrap();
429        match event.category {
430            EventCategory::Output(OutputEvent::Stdout { content }) => {
431                assert_eq!(content, "test output");
432            }
433            _ => panic!("Expected stdout output event"),
434        }
435    }
436
437    #[tokio::test]
438    async fn test_layer_ignores_non_cuenv_events() {
439        let (tx, mut rx) = mpsc::unbounded_channel();
440        let layer = CuenvEventLayer::new(tx);
441
442        let subscriber = tracing_subscriber::registry().with(layer);
443
444        tracing::subscriber::with_default(subscriber, || {
445            tracing::info!(
446                target: "other::target",
447                event_type = "output.stdout",
448                content = "should be ignored",
449                "Other event"
450            );
451        });
452
453        // Give a moment for any event to be sent
454        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
455        assert!(rx.try_recv().is_err());
456    }
457
458    #[tokio::test]
459    async fn test_layer_captures_task_events() {
460        let (tx, mut rx) = mpsc::unbounded_channel();
461        let layer = CuenvEventLayer::new(tx);
462
463        let subscriber = tracing_subscriber::registry().with(layer);
464
465        tracing::subscriber::with_default(subscriber, || {
466            tracing::info!(
467                target: "cuenv::task",
468                event_type = "task.started",
469                task_name = "build",
470                command = "cargo build",
471                hermetic = true,
472                "Task started"
473            );
474        });
475
476        let event = rx.recv().await.unwrap();
477        match event.category {
478            EventCategory::Task(TaskEvent::Started {
479                name,
480                command,
481                hermetic,
482            }) => {
483                assert_eq!(name, "build");
484                assert_eq!(command, "cargo build");
485                assert!(hermetic);
486            }
487            _ => panic!("Expected task started event"),
488        }
489    }
490}