cuenv_events/
layer.rs

1//! Custom tracing Layer for capturing cuenv events.
2//!
3//! This layer intercepts tracing events with specific targets and fields,
4//! converts them to `CuenvEvent` instances, and sends them to the `EventBus`.
5
6// These casts are intentional for tracing field extraction - values come from trusted sources
7#![allow(
8    clippy::cast_possible_truncation,
9    clippy::cast_sign_loss,
10    clippy::too_many_lines
11)]
12
13use crate::event::{
14    CiEvent, CommandEvent, CuenvEvent, EventCategory, EventSource, InteractiveEvent, OutputEvent,
15    Stream, SystemEvent, TaskEvent,
16};
17use crate::metadata::correlation_id;
18use tokio::sync::mpsc;
19use tracing::Subscriber;
20use tracing::field::{Field, Visit};
21use tracing_subscriber::Layer;
22use tracing_subscriber::layer::Context;
23use tracing_subscriber::registry::LookupSpan;
24
25/// A tracing Layer that captures cuenv-specific events.
26///
27/// Events are identified by their `target` (must start with "cuenv")
28/// and an `event_type` field that specifies the event category.
29pub struct CuenvEventLayer {
30    sender: mpsc::UnboundedSender<CuenvEvent>,
31}
32
33impl CuenvEventLayer {
34    /// Create a new layer that sends events to the given channel.
35    #[must_use]
36    pub fn new(sender: mpsc::UnboundedSender<CuenvEvent>) -> Self {
37        Self { sender }
38    }
39}
40
41impl<S> Layer<S> for CuenvEventLayer
42where
43    S: Subscriber + for<'a> LookupSpan<'a>,
44{
45    fn on_event(&self, event: &tracing::Event<'_>, _ctx: Context<'_, S>) {
46        let meta = event.metadata();
47        let target = meta.target();
48
49        // Only capture events with cuenv target
50        if !target.starts_with("cuenv") {
51            return;
52        }
53
54        // Extract fields using visitor pattern
55        let mut visitor = CuenvEventVisitor::new(target);
56        event.record(&mut visitor);
57
58        // Build and send event if it has required fields
59        if let Some(cuenv_event) = visitor.build() {
60            let _ = self.sender.send(cuenv_event);
61        }
62    }
63}
64
65/// Visitor for extracting typed fields from tracing events.
66struct CuenvEventVisitor {
67    target: String,
68    event_type: Option<String>,
69
70    // Task event fields
71    task_name: Option<String>,
72    command: Option<String>,
73    hermetic: Option<bool>,
74    cache_key: Option<String>,
75    stream: Option<Stream>,
76    content: Option<String>,
77    success: Option<bool>,
78    exit_code: Option<i32>,
79    duration_ms: Option<u64>,
80    sequential: Option<bool>,
81    task_count: Option<usize>,
82
83    // CI event fields
84    provider: Option<String>,
85    event_type_ci: Option<String>,
86    ref_name: Option<String>,
87    count: Option<usize>,
88    path: Option<String>,
89    project: Option<String>,
90    task: Option<String>,
91    reason: Option<String>,
92    error: Option<String>,
93
94    // Command event fields
95    args: Option<Vec<String>>,
96    progress: Option<f32>,
97    message: Option<String>,
98
99    // Interactive event fields
100    prompt_id: Option<String>,
101    options: Option<Vec<String>>,
102    response: Option<String>,
103    elapsed_secs: Option<u64>,
104
105    // System event fields
106    tag: Option<String>,
107}
108
109impl CuenvEventVisitor {
110    fn new(target: &str) -> Self {
111        Self {
112            target: target.to_string(),
113            event_type: None,
114            task_name: None,
115            command: None,
116            hermetic: None,
117            cache_key: None,
118            stream: None,
119            content: None,
120            success: None,
121            exit_code: None,
122            duration_ms: None,
123            sequential: None,
124            task_count: None,
125            provider: None,
126            event_type_ci: None,
127            ref_name: None,
128            count: None,
129            path: None,
130            project: None,
131            task: None,
132            reason: None,
133            error: None,
134            args: None,
135            progress: None,
136            message: None,
137            prompt_id: None,
138            options: None,
139            response: None,
140            elapsed_secs: None,
141            tag: None,
142        }
143    }
144
145    fn build(self) -> Option<CuenvEvent> {
146        let event_type = self.event_type.as_deref()?;
147        let source = EventSource::new(&self.target);
148        let correlation = correlation_id();
149
150        let category = match event_type {
151            // Task events
152            "task.started" => EventCategory::Task(TaskEvent::Started {
153                name: self.task_name?,
154                command: self.command?,
155                hermetic: self.hermetic.unwrap_or(false),
156            }),
157            "task.cache_hit" => EventCategory::Task(TaskEvent::CacheHit {
158                name: self.task_name?,
159                cache_key: self.cache_key?,
160            }),
161            "task.cache_miss" => EventCategory::Task(TaskEvent::CacheMiss {
162                name: self.task_name?,
163            }),
164            "task.output" => EventCategory::Task(TaskEvent::Output {
165                name: self.task_name?,
166                stream: self.stream.unwrap_or(Stream::Stdout),
167                content: self.content?,
168            }),
169            "task.completed" => EventCategory::Task(TaskEvent::Completed {
170                name: self.task_name?,
171                success: self.success?,
172                exit_code: self.exit_code,
173                duration_ms: self.duration_ms.unwrap_or(0),
174            }),
175            "task.group_started" => EventCategory::Task(TaskEvent::GroupStarted {
176                name: self.task_name?,
177                sequential: self.sequential.unwrap_or(false),
178                task_count: self.task_count.unwrap_or(0),
179            }),
180            "task.group_completed" => EventCategory::Task(TaskEvent::GroupCompleted {
181                name: self.task_name?,
182                success: self.success?,
183                duration_ms: self.duration_ms.unwrap_or(0),
184            }),
185
186            // CI events
187            "ci.context_detected" => EventCategory::Ci(CiEvent::ContextDetected {
188                provider: self.provider?,
189                event_type: self.event_type_ci?,
190                ref_name: self.ref_name?,
191            }),
192            "ci.changed_files" => {
193                EventCategory::Ci(CiEvent::ChangedFilesFound { count: self.count? })
194            }
195            "ci.projects_discovered" => {
196                EventCategory::Ci(CiEvent::ProjectsDiscovered { count: self.count? })
197            }
198            "ci.project_skipped" => EventCategory::Ci(CiEvent::ProjectSkipped {
199                path: self.path?,
200                reason: self.reason?,
201            }),
202            "ci.task_executing" => EventCategory::Ci(CiEvent::TaskExecuting {
203                project: self.project?,
204                task: self.task?,
205            }),
206            "ci.task_result" => EventCategory::Ci(CiEvent::TaskResult {
207                project: self.project?,
208                task: self.task?,
209                success: self.success?,
210                error: self.error,
211            }),
212            "ci.report_generated" => {
213                EventCategory::Ci(CiEvent::ReportGenerated { path: self.path? })
214            }
215
216            // Command events
217            "command.started" => EventCategory::Command(CommandEvent::Started {
218                command: self.command?,
219                args: self.args.unwrap_or_default(),
220            }),
221            "command.progress" => EventCategory::Command(CommandEvent::Progress {
222                command: self.command?,
223                progress: self.progress?,
224                message: self.message?,
225            }),
226            "command.completed" => EventCategory::Command(CommandEvent::Completed {
227                command: self.command?,
228                success: self.success?,
229                duration_ms: self.duration_ms.unwrap_or(0),
230            }),
231
232            // Interactive events
233            "interactive.prompt_requested" => {
234                EventCategory::Interactive(InteractiveEvent::PromptRequested {
235                    prompt_id: self.prompt_id?,
236                    message: self.message?,
237                    options: self.options.unwrap_or_default(),
238                })
239            }
240            "interactive.prompt_resolved" => {
241                EventCategory::Interactive(InteractiveEvent::PromptResolved {
242                    prompt_id: self.prompt_id?,
243                    response: self.response?,
244                })
245            }
246            "interactive.wait_progress" => {
247                EventCategory::Interactive(InteractiveEvent::WaitProgress {
248                    target: self.task_name.or(self.path)?,
249                    elapsed_secs: self.elapsed_secs?,
250                })
251            }
252
253            // System events
254            "system.supervisor_log" => EventCategory::System(SystemEvent::SupervisorLog {
255                tag: self.tag?,
256                message: self.message?,
257            }),
258            "system.shutdown" => EventCategory::System(SystemEvent::Shutdown),
259
260            // Output events
261            "output.stdout" => EventCategory::Output(OutputEvent::Stdout {
262                content: self.content?,
263            }),
264            "output.stderr" => EventCategory::Output(OutputEvent::Stderr {
265                content: self.content?,
266            }),
267
268            _ => return None,
269        };
270
271        Some(CuenvEvent::new(correlation, source, category))
272    }
273}
274
275impl Visit for CuenvEventVisitor {
276    fn record_str(&mut self, field: &Field, value: &str) {
277        match field.name() {
278            "event_type" => self.event_type = Some(value.to_string()),
279            "task_name" | "name" => self.task_name = Some(value.to_string()),
280            "command" | "cmd" => self.command = Some(value.to_string()),
281            "cache_key" => self.cache_key = Some(value.to_string()),
282            "content" => self.content = Some(value.to_string()),
283            "provider" => self.provider = Some(value.to_string()),
284            "ci_event_type" => self.event_type_ci = Some(value.to_string()),
285            "ref_name" => self.ref_name = Some(value.to_string()),
286            "path" => self.path = Some(value.to_string()),
287            "project" => self.project = Some(value.to_string()),
288            "task" => self.task = Some(value.to_string()),
289            "reason" => self.reason = Some(value.to_string()),
290            "error" => self.error = Some(value.to_string()),
291            "message" => self.message = Some(value.to_string()),
292            "prompt_id" => self.prompt_id = Some(value.to_string()),
293            "response" => self.response = Some(value.to_string()),
294            "tag" => self.tag = Some(value.to_string()),
295            "stream" => {
296                self.stream = match value {
297                    "stdout" => Some(Stream::Stdout),
298                    "stderr" => Some(Stream::Stderr),
299                    _ => None,
300                };
301            }
302            _ => {}
303        }
304    }
305
306    fn record_i64(&mut self, field: &Field, value: i64) {
307        match field.name() {
308            "exit_code" => self.exit_code = Some(value as i32),
309            "duration_ms" => self.duration_ms = Some(value as u64),
310            "count" => self.count = Some(value as usize),
311            "task_count" => self.task_count = Some(value as usize),
312            "elapsed_secs" => self.elapsed_secs = Some(value as u64),
313            _ => {}
314        }
315    }
316
317    fn record_u64(&mut self, field: &Field, value: u64) {
318        match field.name() {
319            "duration_ms" => self.duration_ms = Some(value),
320            "count" => self.count = Some(value as usize),
321            "task_count" => self.task_count = Some(value as usize),
322            "elapsed_secs" => self.elapsed_secs = Some(value),
323            _ => {}
324        }
325    }
326
327    fn record_f64(&mut self, field: &Field, value: f64) {
328        if field.name() == "progress" {
329            self.progress = Some(value as f32);
330        }
331    }
332
333    fn record_bool(&mut self, field: &Field, value: bool) {
334        match field.name() {
335            "hermetic" => self.hermetic = Some(value),
336            "success" => self.success = Some(value),
337            "sequential" => self.sequential = Some(value),
338            _ => {}
339        }
340    }
341
342    fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
343        // Handle debug-formatted fields as strings
344        let value_str = format!("{value:?}");
345        match field.name() {
346            "args" => {
347                // Try to parse as JSON array
348                if let Ok(args) = serde_json::from_str::<Vec<String>>(&value_str) {
349                    self.args = Some(args);
350                }
351            }
352            "options" => {
353                if let Ok(options) = serde_json::from_str::<Vec<String>>(&value_str) {
354                    self.options = Some(options);
355                }
356            }
357            _ => {}
358        }
359    }
360}
361
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::sync::mpsc;
    use tracing_subscriber::layer::SubscriberExt;

    /// Runs `emit` under a subscriber that contains only a
    /// `CuenvEventLayer` and returns the receiver holding whatever the layer
    /// captured.
    ///
    /// The layer sends synchronously from `on_event`, so every captured event
    /// is already queued when this function returns.
    fn capture(emit: impl FnOnce()) -> mpsc::UnboundedReceiver<CuenvEvent> {
        let (tx, rx) = mpsc::unbounded_channel();
        let subscriber = tracing_subscriber::registry().with(CuenvEventLayer::new(tx));
        tracing::subscriber::with_default(subscriber, emit);
        rx
    }

    #[tokio::test]
    async fn test_layer_captures_cuenv_events() {
        let mut rx = capture(|| {
            tracing::info!(
                target: "cuenv::output",
                event_type = "output.stdout",
                content = "test output",
                "Test event"
            );
        });

        let event = rx.recv().await.expect("layer should emit an event");
        match event.category {
            EventCategory::Output(OutputEvent::Stdout { content }) => {
                assert_eq!(content, "test output");
            }
            _ => panic!("Expected stdout output event"),
        }
    }

    // No async runtime or sleep is needed here: the send (if any) happens
    // synchronously while the event is emitted, so the channel can be
    // inspected right away.
    #[test]
    fn test_layer_ignores_non_cuenv_events() {
        let mut rx = capture(|| {
            tracing::info!(
                target: "other::target",
                event_type = "output.stdout",
                content = "should be ignored",
                "Other event"
            );
        });

        assert!(rx.try_recv().is_err());
    }

    #[tokio::test]
    async fn test_layer_captures_task_events() {
        let mut rx = capture(|| {
            tracing::info!(
                target: "cuenv::task",
                event_type = "task.started",
                task_name = "build",
                command = "cargo build",
                hermetic = true,
                "Task started"
            );
        });

        let event = rx.recv().await.expect("layer should emit an event");
        match event.category {
            EventCategory::Task(TaskEvent::Started {
                name,
                command,
                hermetic,
            }) => {
                assert_eq!(name, "build");
                assert_eq!(command, "cargo build");
                assert!(hermetic);
            }
            _ => panic!("Expected task started event"),
        }
    }
}