1#![allow(
8 clippy::cast_possible_truncation,
9 clippy::cast_sign_loss,
10 clippy::too_many_lines
11)]
12
13use crate::event::{
14 CiEvent, CommandEvent, CuenvEvent, EventCategory, EventSource, InteractiveEvent, OutputEvent,
15 Stream, SystemEvent, TaskEvent,
16};
17use crate::metadata::correlation_id;
18use tokio::sync::mpsc;
19use tracing::Subscriber;
20use tracing::field::{Field, Visit};
21use tracing_subscriber::Layer;
22use tracing_subscriber::layer::Context;
23use tracing_subscriber::registry::LookupSpan;
24
25pub struct CuenvEventLayer {
30 sender: mpsc::UnboundedSender<CuenvEvent>,
31}
32
33impl CuenvEventLayer {
34 #[must_use]
36 pub fn new(sender: mpsc::UnboundedSender<CuenvEvent>) -> Self {
37 Self { sender }
38 }
39}
40
41impl<S> Layer<S> for CuenvEventLayer
42where
43 S: Subscriber + for<'a> LookupSpan<'a>,
44{
45 fn on_event(&self, event: &tracing::Event<'_>, _ctx: Context<'_, S>) {
46 let meta = event.metadata();
47 let target = meta.target();
48
49 if !target.starts_with("cuenv") {
51 return;
52 }
53
54 let mut visitor = CuenvEventVisitor::new(target);
56 event.record(&mut visitor);
57
58 if let Some(cuenv_event) = visitor.build() {
60 let _ = self.sender.send(cuenv_event);
61 }
62 }
63}
64
65struct CuenvEventVisitor {
67 target: String,
68 event_type: Option<String>,
69
70 task_name: Option<String>,
72 command: Option<String>,
73 hermetic: Option<bool>,
74 cache_key: Option<String>,
75 stream: Option<Stream>,
76 content: Option<String>,
77 success: Option<bool>,
78 exit_code: Option<i32>,
79 duration_ms: Option<u64>,
80 sequential: Option<bool>,
81 task_count: Option<usize>,
82
83 provider: Option<String>,
85 event_type_ci: Option<String>,
86 ref_name: Option<String>,
87 count: Option<usize>,
88 path: Option<String>,
89 project: Option<String>,
90 task: Option<String>,
91 reason: Option<String>,
92 error: Option<String>,
93
94 args: Option<Vec<String>>,
96 progress: Option<f32>,
97 message: Option<String>,
98
99 prompt_id: Option<String>,
101 options: Option<Vec<String>>,
102 response: Option<String>,
103 elapsed_secs: Option<u64>,
104
105 tag: Option<String>,
107}
108
109impl CuenvEventVisitor {
110 fn new(target: &str) -> Self {
111 Self {
112 target: target.to_string(),
113 event_type: None,
114 task_name: None,
115 command: None,
116 hermetic: None,
117 cache_key: None,
118 stream: None,
119 content: None,
120 success: None,
121 exit_code: None,
122 duration_ms: None,
123 sequential: None,
124 task_count: None,
125 provider: None,
126 event_type_ci: None,
127 ref_name: None,
128 count: None,
129 path: None,
130 project: None,
131 task: None,
132 reason: None,
133 error: None,
134 args: None,
135 progress: None,
136 message: None,
137 prompt_id: None,
138 options: None,
139 response: None,
140 elapsed_secs: None,
141 tag: None,
142 }
143 }
144
145 fn build(self) -> Option<CuenvEvent> {
146 let event_type = self.event_type.as_deref()?;
147 let source = EventSource::new(&self.target);
148 let correlation = correlation_id();
149
150 let category = match event_type {
151 "task.started" => EventCategory::Task(TaskEvent::Started {
153 name: self.task_name?,
154 command: self.command?,
155 hermetic: self.hermetic.unwrap_or(false),
156 }),
157 "task.cache_hit" => EventCategory::Task(TaskEvent::CacheHit {
158 name: self.task_name?,
159 cache_key: self.cache_key?,
160 }),
161 "task.cache_miss" => EventCategory::Task(TaskEvent::CacheMiss {
162 name: self.task_name?,
163 }),
164 "task.output" => EventCategory::Task(TaskEvent::Output {
165 name: self.task_name?,
166 stream: self.stream.unwrap_or(Stream::Stdout),
167 content: self.content?,
168 }),
169 "task.completed" => EventCategory::Task(TaskEvent::Completed {
170 name: self.task_name?,
171 success: self.success?,
172 exit_code: self.exit_code,
173 duration_ms: self.duration_ms.unwrap_or(0),
174 }),
175 "task.group_started" => EventCategory::Task(TaskEvent::GroupStarted {
176 name: self.task_name?,
177 sequential: self.sequential.unwrap_or(false),
178 task_count: self.task_count.unwrap_or(0),
179 }),
180 "task.group_completed" => EventCategory::Task(TaskEvent::GroupCompleted {
181 name: self.task_name?,
182 success: self.success?,
183 duration_ms: self.duration_ms.unwrap_or(0),
184 }),
185
186 "ci.context_detected" => EventCategory::Ci(CiEvent::ContextDetected {
188 provider: self.provider?,
189 event_type: self.event_type_ci?,
190 ref_name: self.ref_name?,
191 }),
192 "ci.changed_files" => {
193 EventCategory::Ci(CiEvent::ChangedFilesFound { count: self.count? })
194 }
195 "ci.projects_discovered" => {
196 EventCategory::Ci(CiEvent::ProjectsDiscovered { count: self.count? })
197 }
198 "ci.project_skipped" => EventCategory::Ci(CiEvent::ProjectSkipped {
199 path: self.path?,
200 reason: self.reason?,
201 }),
202 "ci.task_executing" => EventCategory::Ci(CiEvent::TaskExecuting {
203 project: self.project?,
204 task: self.task?,
205 }),
206 "ci.task_result" => EventCategory::Ci(CiEvent::TaskResult {
207 project: self.project?,
208 task: self.task?,
209 success: self.success?,
210 error: self.error,
211 }),
212 "ci.report_generated" => {
213 EventCategory::Ci(CiEvent::ReportGenerated { path: self.path? })
214 }
215
216 "command.started" => EventCategory::Command(CommandEvent::Started {
218 command: self.command?,
219 args: self.args.unwrap_or_default(),
220 }),
221 "command.progress" => EventCategory::Command(CommandEvent::Progress {
222 command: self.command?,
223 progress: self.progress?,
224 message: self.message?,
225 }),
226 "command.completed" => EventCategory::Command(CommandEvent::Completed {
227 command: self.command?,
228 success: self.success?,
229 duration_ms: self.duration_ms.unwrap_or(0),
230 }),
231
232 "interactive.prompt_requested" => {
234 EventCategory::Interactive(InteractiveEvent::PromptRequested {
235 prompt_id: self.prompt_id?,
236 message: self.message?,
237 options: self.options.unwrap_or_default(),
238 })
239 }
240 "interactive.prompt_resolved" => {
241 EventCategory::Interactive(InteractiveEvent::PromptResolved {
242 prompt_id: self.prompt_id?,
243 response: self.response?,
244 })
245 }
246 "interactive.wait_progress" => {
247 EventCategory::Interactive(InteractiveEvent::WaitProgress {
248 target: self.task_name.or(self.path)?,
249 elapsed_secs: self.elapsed_secs?,
250 })
251 }
252
253 "system.supervisor_log" => EventCategory::System(SystemEvent::SupervisorLog {
255 tag: self.tag?,
256 message: self.message?,
257 }),
258 "system.shutdown" => EventCategory::System(SystemEvent::Shutdown),
259
260 "output.stdout" => EventCategory::Output(OutputEvent::Stdout {
262 content: self.content?,
263 }),
264 "output.stderr" => EventCategory::Output(OutputEvent::Stderr {
265 content: self.content?,
266 }),
267
268 _ => return None,
269 };
270
271 Some(CuenvEvent::new(correlation, source, category))
272 }
273}
274
275impl Visit for CuenvEventVisitor {
276 fn record_str(&mut self, field: &Field, value: &str) {
277 match field.name() {
278 "event_type" => self.event_type = Some(value.to_string()),
279 "task_name" | "name" => self.task_name = Some(value.to_string()),
280 "command" | "cmd" => self.command = Some(value.to_string()),
281 "cache_key" => self.cache_key = Some(value.to_string()),
282 "content" => self.content = Some(value.to_string()),
283 "provider" => self.provider = Some(value.to_string()),
284 "ci_event_type" => self.event_type_ci = Some(value.to_string()),
285 "ref_name" => self.ref_name = Some(value.to_string()),
286 "path" => self.path = Some(value.to_string()),
287 "project" => self.project = Some(value.to_string()),
288 "task" => self.task = Some(value.to_string()),
289 "reason" => self.reason = Some(value.to_string()),
290 "error" => self.error = Some(value.to_string()),
291 "message" => self.message = Some(value.to_string()),
292 "prompt_id" => self.prompt_id = Some(value.to_string()),
293 "response" => self.response = Some(value.to_string()),
294 "tag" => self.tag = Some(value.to_string()),
295 "stream" => {
296 self.stream = match value {
297 "stdout" => Some(Stream::Stdout),
298 "stderr" => Some(Stream::Stderr),
299 _ => None,
300 };
301 }
302 _ => {}
303 }
304 }
305
306 fn record_i64(&mut self, field: &Field, value: i64) {
307 match field.name() {
308 "exit_code" => self.exit_code = Some(value as i32),
309 "duration_ms" => self.duration_ms = Some(value as u64),
310 "count" => self.count = Some(value as usize),
311 "task_count" => self.task_count = Some(value as usize),
312 "elapsed_secs" => self.elapsed_secs = Some(value as u64),
313 _ => {}
314 }
315 }
316
317 fn record_u64(&mut self, field: &Field, value: u64) {
318 match field.name() {
319 "duration_ms" => self.duration_ms = Some(value),
320 "count" => self.count = Some(value as usize),
321 "task_count" => self.task_count = Some(value as usize),
322 "elapsed_secs" => self.elapsed_secs = Some(value),
323 _ => {}
324 }
325 }
326
327 fn record_f64(&mut self, field: &Field, value: f64) {
328 if field.name() == "progress" {
329 self.progress = Some(value as f32);
330 }
331 }
332
333 fn record_bool(&mut self, field: &Field, value: bool) {
334 match field.name() {
335 "hermetic" => self.hermetic = Some(value),
336 "success" => self.success = Some(value),
337 "sequential" => self.sequential = Some(value),
338 _ => {}
339 }
340 }
341
342 fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
343 let value_str = format!("{value:?}");
345 match field.name() {
346 "args" => {
347 if let Ok(args) = serde_json::from_str::<Vec<String>>(&value_str) {
349 self.args = Some(args);
350 }
351 }
352 "options" => {
353 if let Ok(options) = serde_json::from_str::<Vec<String>>(&value_str) {
354 self.options = Some(options);
355 }
356 }
357 "event_type" | "task_name" | "name" | "command" | "cmd" | "content" | "cache_key"
361 | "stream" => {
362 let cleaned = value_str.trim_matches('"');
364 match field.name() {
365 "event_type" => self.event_type = Some(cleaned.to_string()),
366 "task_name" | "name" => self.task_name = Some(cleaned.to_string()),
367 "command" | "cmd" => self.command = Some(cleaned.to_string()),
368 "content" => self.content = Some(cleaned.to_string()),
369 "cache_key" => self.cache_key = Some(cleaned.to_string()),
370 "stream" => {
371 self.stream = match cleaned {
372 "stdout" => Some(Stream::Stdout),
373 "stderr" => Some(Stream::Stderr),
374 _ => None,
375 };
376 }
377 _ => {}
378 }
379 }
380 "exit_code" => {
383 if let Some(inner) = value_str
385 .strip_prefix("Some(")
386 .and_then(|s| s.strip_suffix(')'))
387 && let Ok(code) = inner.parse::<i32>()
388 {
389 self.exit_code = Some(code);
390 } else if value_str != "None" {
391 if let Ok(code) = value_str.parse::<i32>() {
393 self.exit_code = Some(code);
394 }
395 }
396 }
397 _ => {}
398 }
399 }
400}
401
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::sync::mpsc;
    use tracing_subscriber::layer::SubscriberExt;

    /// Runs `emit` with a registry carrying a fresh [`CuenvEventLayer`] as the
    /// default subscriber and returns the receiving half of the layer's channel.
    fn capture(emit: impl FnOnce()) -> mpsc::UnboundedReceiver<CuenvEvent> {
        let (tx, rx) = mpsc::unbounded_channel();
        let subscriber = tracing_subscriber::registry().with(CuenvEventLayer::new(tx));
        tracing::subscriber::with_default(subscriber, emit);
        rx
    }

    /// An `output.stdout` event under a `cuenv` target reaches the channel.
    #[tokio::test]
    async fn test_layer_captures_cuenv_events() {
        let mut rx = capture(|| {
            tracing::info!(
                target: "cuenv::output",
                event_type = "output.stdout",
                content = "test output",
                "Test event"
            );
        });

        let event = rx.recv().await.unwrap();
        let EventCategory::Output(OutputEvent::Stdout { content }) = event.category else {
            panic!("Expected stdout output event");
        };
        assert_eq!(content, "test output");
    }

    /// Events whose target does not start with `cuenv` never reach the channel.
    #[tokio::test]
    async fn test_layer_ignores_non_cuenv_events() {
        let mut rx = capture(|| {
            tracing::info!(
                target: "other::target",
                event_type = "output.stdout",
                content = "should be ignored",
                "Other event"
            );
        });

        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
        assert!(rx.try_recv().is_err());
    }

    /// A `task.started` event carries its name, command and hermetic flag through.
    #[tokio::test]
    async fn test_layer_captures_task_events() {
        let mut rx = capture(|| {
            tracing::info!(
                target: "cuenv::task",
                event_type = "task.started",
                task_name = "build",
                command = "cargo build",
                hermetic = true,
                "Task started"
            );
        });

        let event = rx.recv().await.unwrap();
        let EventCategory::Task(TaskEvent::Started {
            name,
            command,
            hermetic,
        }) = event.category
        else {
            panic!("Expected task started event");
        };
        assert_eq!(name, "build");
        assert_eq!(command, "cargo build");
        assert!(hermetic);
    }
}