Skip to main content

cuenv_events/
layer.rs

1//! Custom tracing Layer for capturing cuenv events.
2//!
3//! This layer intercepts tracing events with specific targets and fields,
4//! converts them to `CuenvEvent` instances, and sends them to the `EventBus`.
5
6// These casts are intentional for tracing field extraction - values come from trusted sources
7#![allow(
8    clippy::cast_possible_truncation,
9    clippy::cast_sign_loss,
10    clippy::too_many_lines
11)]
12
13use crate::event::{
14    CiEvent, CommandEvent, CuenvEvent, EventCategory, EventSource, InteractiveEvent, OutputEvent,
15    Stream, SystemEvent, TaskEvent,
16};
17use crate::metadata::correlation_id;
18use crate::redaction::redact;
19use tokio::sync::mpsc;
20use tracing::Subscriber;
21use tracing::field::{Field, Visit};
22use tracing_subscriber::Layer;
23use tracing_subscriber::layer::Context;
24use tracing_subscriber::registry::LookupSpan;
25
26/// A tracing Layer that captures cuenv-specific events.
27///
28/// Events are identified by their `target` (must start with "cuenv")
29/// and an `event_type` field that specifies the event category.
30pub struct CuenvEventLayer {
31    sender: mpsc::UnboundedSender<CuenvEvent>,
32}
33
34impl CuenvEventLayer {
35    /// Create a new layer that sends events to the given channel.
36    #[must_use]
37    pub const fn new(sender: mpsc::UnboundedSender<CuenvEvent>) -> Self {
38        Self { sender }
39    }
40}
41
42impl<S> Layer<S> for CuenvEventLayer
43where
44    S: Subscriber + for<'a> LookupSpan<'a>,
45{
46    fn on_event(&self, event: &tracing::Event<'_>, _ctx: Context<'_, S>) {
47        let meta = event.metadata();
48        let target = meta.target();
49
50        // Only capture events with cuenv target
51        if !target.starts_with("cuenv") {
52            return;
53        }
54
55        // Extract fields using visitor pattern
56        let mut visitor = CuenvEventVisitor::new(target);
57        event.record(&mut visitor);
58
59        // Build and send event if it has required fields
60        if let Some(cuenv_event) = visitor.build() {
61            let _ = self.sender.send(cuenv_event);
62        }
63    }
64}
65
66/// Visitor for extracting typed fields from tracing events.
67struct CuenvEventVisitor {
68    target: String,
69    event_type: Option<String>,
70
71    // Task event fields
72    task_name: Option<String>,
73    command: Option<String>,
74    hermetic: Option<bool>,
75    cache_key: Option<String>,
76    stream: Option<Stream>,
77    content: Option<String>,
78    success: Option<bool>,
79    exit_code: Option<i32>,
80    duration_ms: Option<u64>,
81    sequential: Option<bool>,
82    task_count: Option<usize>,
83
84    // CI event fields
85    provider: Option<String>,
86    event_type_ci: Option<String>,
87    ref_name: Option<String>,
88    count: Option<usize>,
89    path: Option<String>,
90    project: Option<String>,
91    task: Option<String>,
92    reason: Option<String>,
93    error: Option<String>,
94
95    // Command event fields
96    args: Option<Vec<String>>,
97    progress: Option<f32>,
98    message: Option<String>,
99
100    // Interactive event fields
101    prompt_id: Option<String>,
102    options: Option<Vec<String>>,
103    response: Option<String>,
104    elapsed_secs: Option<u64>,
105
106    // System event fields
107    tag: Option<String>,
108}
109
110impl CuenvEventVisitor {
111    fn new(target: &str) -> Self {
112        Self {
113            target: target.to_string(),
114            event_type: None,
115            task_name: None,
116            command: None,
117            hermetic: None,
118            cache_key: None,
119            stream: None,
120            content: None,
121            success: None,
122            exit_code: None,
123            duration_ms: None,
124            sequential: None,
125            task_count: None,
126            provider: None,
127            event_type_ci: None,
128            ref_name: None,
129            count: None,
130            path: None,
131            project: None,
132            task: None,
133            reason: None,
134            error: None,
135            args: None,
136            progress: None,
137            message: None,
138            prompt_id: None,
139            options: None,
140            response: None,
141            elapsed_secs: None,
142            tag: None,
143        }
144    }
145
146    #[allow(clippy::cognitive_complexity)]
147    fn build(self) -> Option<CuenvEvent> {
148        let event_type = self.event_type.as_deref()?;
149        let source = EventSource::new(&self.target);
150        let correlation = correlation_id();
151
152        // Apply secret redaction to content fields before building events
153        let content = self.content.map(|c| redact(&c));
154        let message = self.message.map(|m| redact(&m));
155        let error = self.error.map(|e| redact(&e));
156
157        let category = match event_type {
158            // Task events
159            "task.started" => EventCategory::Task(TaskEvent::Started {
160                name: self.task_name?,
161                command: self.command?,
162                hermetic: self.hermetic.unwrap_or(false),
163            }),
164            "task.cache_hit" => EventCategory::Task(TaskEvent::CacheHit {
165                name: self.task_name?,
166                cache_key: self.cache_key?,
167            }),
168            "task.cache_miss" => EventCategory::Task(TaskEvent::CacheMiss {
169                name: self.task_name?,
170            }),
171            "task.output" => EventCategory::Task(TaskEvent::Output {
172                name: self.task_name?,
173                stream: self.stream.unwrap_or(Stream::Stdout),
174                content: content?,
175            }),
176            "task.completed" => EventCategory::Task(TaskEvent::Completed {
177                name: self.task_name?,
178                success: self.success?,
179                exit_code: self.exit_code,
180                duration_ms: self.duration_ms.unwrap_or(0),
181            }),
182            "task.group_started" => EventCategory::Task(TaskEvent::GroupStarted {
183                name: self.task_name?,
184                sequential: self.sequential.unwrap_or(false),
185                task_count: self.task_count.unwrap_or(0),
186            }),
187            "task.group_completed" => EventCategory::Task(TaskEvent::GroupCompleted {
188                name: self.task_name?,
189                success: self.success?,
190                duration_ms: self.duration_ms.unwrap_or(0),
191            }),
192
193            // CI events
194            "ci.context_detected" => EventCategory::Ci(CiEvent::ContextDetected {
195                provider: self.provider?,
196                event_type: self.event_type_ci?,
197                ref_name: self.ref_name?,
198            }),
199            "ci.changed_files" => {
200                EventCategory::Ci(CiEvent::ChangedFilesFound { count: self.count? })
201            }
202            "ci.projects_discovered" => {
203                EventCategory::Ci(CiEvent::ProjectsDiscovered { count: self.count? })
204            }
205            "ci.project_skipped" => EventCategory::Ci(CiEvent::ProjectSkipped {
206                path: self.path?,
207                reason: self.reason?,
208            }),
209            "ci.task_executing" => EventCategory::Ci(CiEvent::TaskExecuting {
210                project: self.project?,
211                task: self.task?,
212            }),
213            "ci.task_result" => EventCategory::Ci(CiEvent::TaskResult {
214                project: self.project?,
215                task: self.task?,
216                success: self.success?,
217                error,
218            }),
219            "ci.report_generated" => {
220                EventCategory::Ci(CiEvent::ReportGenerated { path: self.path? })
221            }
222
223            // Command events
224            "command.started" => EventCategory::Command(CommandEvent::Started {
225                command: self.command?,
226                args: self.args.unwrap_or_default(),
227            }),
228            "command.progress" => EventCategory::Command(CommandEvent::Progress {
229                command: self.command?,
230                progress: self.progress?,
231                message: message?,
232            }),
233            "command.completed" => EventCategory::Command(CommandEvent::Completed {
234                command: self.command?,
235                success: self.success?,
236                duration_ms: self.duration_ms.unwrap_or(0),
237            }),
238
239            // Interactive events
240            "interactive.prompt_requested" => {
241                EventCategory::Interactive(InteractiveEvent::PromptRequested {
242                    prompt_id: self.prompt_id?,
243                    message: message?,
244                    options: self.options.unwrap_or_default(),
245                })
246            }
247            "interactive.prompt_resolved" => {
248                EventCategory::Interactive(InteractiveEvent::PromptResolved {
249                    prompt_id: self.prompt_id?,
250                    response: self.response?,
251                })
252            }
253            "interactive.wait_progress" => {
254                EventCategory::Interactive(InteractiveEvent::WaitProgress {
255                    target: self.task_name.or(self.path)?,
256                    elapsed_secs: self.elapsed_secs?,
257                })
258            }
259
260            // System events
261            "system.supervisor_log" => EventCategory::System(SystemEvent::SupervisorLog {
262                tag: self.tag?,
263                message: message?,
264            }),
265            "system.shutdown" => EventCategory::System(SystemEvent::Shutdown),
266
267            // Output events
268            "output.stdout" => EventCategory::Output(OutputEvent::Stdout { content: content? }),
269            "output.stderr" => EventCategory::Output(OutputEvent::Stderr { content: content? }),
270
271            _ => return None,
272        };
273
274        Some(CuenvEvent::new(correlation, source, category))
275    }
276}
277
278impl Visit for CuenvEventVisitor {
279    fn record_str(&mut self, field: &Field, value: &str) {
280        match field.name() {
281            "event_type" => self.event_type = Some(value.to_string()),
282            "task_name" | "name" => self.task_name = Some(value.to_string()),
283            "command" | "cmd" => self.command = Some(value.to_string()),
284            "cache_key" => self.cache_key = Some(value.to_string()),
285            "content" => self.content = Some(value.to_string()),
286            "provider" => self.provider = Some(value.to_string()),
287            "ci_event_type" => self.event_type_ci = Some(value.to_string()),
288            "ref_name" => self.ref_name = Some(value.to_string()),
289            "path" => self.path = Some(value.to_string()),
290            "project" => self.project = Some(value.to_string()),
291            "task" => self.task = Some(value.to_string()),
292            "reason" => self.reason = Some(value.to_string()),
293            "error" => self.error = Some(value.to_string()),
294            "message" => self.message = Some(value.to_string()),
295            "prompt_id" => self.prompt_id = Some(value.to_string()),
296            "response" => self.response = Some(value.to_string()),
297            "tag" => self.tag = Some(value.to_string()),
298            "stream" => {
299                self.stream = match value {
300                    "stdout" => Some(Stream::Stdout),
301                    "stderr" => Some(Stream::Stderr),
302                    _ => None,
303                };
304            }
305            _ => {}
306        }
307    }
308
309    fn record_i64(&mut self, field: &Field, value: i64) {
310        match field.name() {
311            "exit_code" => self.exit_code = Some(value as i32),
312            "duration_ms" => self.duration_ms = Some(value as u64),
313            "count" => self.count = Some(value as usize),
314            "task_count" => self.task_count = Some(value as usize),
315            "elapsed_secs" => self.elapsed_secs = Some(value as u64),
316            _ => {}
317        }
318    }
319
320    fn record_u64(&mut self, field: &Field, value: u64) {
321        match field.name() {
322            "duration_ms" => self.duration_ms = Some(value),
323            "count" => self.count = Some(value as usize),
324            "task_count" => self.task_count = Some(value as usize),
325            "elapsed_secs" => self.elapsed_secs = Some(value),
326            _ => {}
327        }
328    }
329
330    fn record_f64(&mut self, field: &Field, value: f64) {
331        if field.name() == "progress" {
332            self.progress = Some(value as f32);
333        }
334    }
335
336    fn record_bool(&mut self, field: &Field, value: bool) {
337        match field.name() {
338            "hermetic" => self.hermetic = Some(value),
339            "success" => self.success = Some(value),
340            "sequential" => self.sequential = Some(value),
341            _ => {}
342        }
343    }
344
345    fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
346        // Handle debug-formatted fields as strings
347        let value_str = format!("{value:?}");
348        match field.name() {
349            "args" => {
350                // Try to parse as JSON array
351                if let Ok(args) = serde_json::from_str::<Vec<String>>(&value_str) {
352                    self.args = Some(args);
353                }
354            }
355            "options" => {
356                if let Ok(options) = serde_json::from_str::<Vec<String>>(&value_str) {
357                    self.options = Some(options);
358                }
359            }
360            // Fallback: try to extract string fields from debug formatting
361            // When tracing uses Display formatting (%), it wraps values in a DisplayValue
362            // which then gets passed to record_debug instead of record_str
363            "event_type" | "task_name" | "name" | "command" | "cmd" | "content" | "cache_key"
364            | "stream" => {
365                // Remove surrounding quotes if present (debug format adds them for strings)
366                let cleaned = value_str.trim_matches('"');
367                match field.name() {
368                    "event_type" => self.event_type = Some(cleaned.to_string()),
369                    "task_name" | "name" => self.task_name = Some(cleaned.to_string()),
370                    "command" | "cmd" => self.command = Some(cleaned.to_string()),
371                    "content" => self.content = Some(cleaned.to_string()),
372                    "cache_key" => self.cache_key = Some(cleaned.to_string()),
373                    "stream" => {
374                        self.stream = match cleaned {
375                            "stdout" => Some(Stream::Stdout),
376                            "stderr" => Some(Stream::Stderr),
377                            _ => None,
378                        };
379                    }
380                    _ => {}
381                }
382            }
383            // Handle exit_code which uses Debug formatting (?exit_code)
384            // Can be "Some(0)", "None", or just "0" depending on context
385            "exit_code" => {
386                // Try to parse as Option<i32> format first (e.g., "Some(0)")
387                if let Some(inner) = value_str
388                    .strip_prefix("Some(")
389                    .and_then(|s| s.strip_suffix(')'))
390                    && let Ok(code) = inner.parse::<i32>()
391                {
392                    self.exit_code = Some(code);
393                } else if value_str != "None" {
394                    // Try to parse as plain integer
395                    if let Ok(code) = value_str.parse::<i32>() {
396                        self.exit_code = Some(code);
397                    }
398                }
399            }
400            _ => {}
401        }
402    }
403}
404
405#[cfg(test)]
406mod tests {
407    use super::*;
408    use tokio::sync::mpsc;
409    use tracing_subscriber::layer::SubscriberExt;
410
411    #[tokio::test]
412    async fn test_layer_captures_cuenv_events() {
413        let (tx, mut rx) = mpsc::unbounded_channel();
414        let layer = CuenvEventLayer::new(tx);
415
416        let subscriber = tracing_subscriber::registry().with(layer);
417
418        tracing::subscriber::with_default(subscriber, || {
419            tracing::info!(
420                target: "cuenv::output",
421                event_type = "output.stdout",
422                content = "test output",
423                "Test event"
424            );
425        });
426
427        let event = rx.recv().await.unwrap();
428        match event.category {
429            EventCategory::Output(OutputEvent::Stdout { content }) => {
430                assert_eq!(content, "test output");
431            }
432            _ => panic!("Expected stdout output event"),
433        }
434    }
435
436    #[tokio::test]
437    async fn test_layer_ignores_non_cuenv_events() {
438        let (tx, mut rx) = mpsc::unbounded_channel();
439        let layer = CuenvEventLayer::new(tx);
440
441        let subscriber = tracing_subscriber::registry().with(layer);
442
443        tracing::subscriber::with_default(subscriber, || {
444            tracing::info!(
445                target: "other::target",
446                event_type = "output.stdout",
447                content = "should be ignored",
448                "Other event"
449            );
450        });
451
452        // Give a moment for any event to be sent
453        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
454        assert!(rx.try_recv().is_err());
455    }
456
457    #[tokio::test]
458    async fn test_layer_captures_task_events() {
459        let (tx, mut rx) = mpsc::unbounded_channel();
460        let layer = CuenvEventLayer::new(tx);
461
462        let subscriber = tracing_subscriber::registry().with(layer);
463
464        tracing::subscriber::with_default(subscriber, || {
465            tracing::info!(
466                target: "cuenv::task",
467                event_type = "task.started",
468                task_name = "build",
469                command = "cargo build",
470                hermetic = true,
471                "Task started"
472            );
473        });
474
475        let event = rx.recv().await.unwrap();
476        match event.category {
477            EventCategory::Task(TaskEvent::Started {
478                name,
479                command,
480                hermetic,
481            }) => {
482                assert_eq!(name, "build");
483                assert_eq!(command, "cargo build");
484                assert!(hermetic);
485            }
486            _ => panic!("Expected task started event"),
487        }
488    }
489}