clnrm_core/telemetry/
weaver_emit.rs

1//! Weaver Emit Integration - Test Data Generation from Schemas
2//!
3//! This module integrates with Weaver's `registry emit` command to provide:
4//! - Schema-based test data generation
5//! - Example telemetry emission for validation
6//! - Fixture generation for integration tests
7//! - Live-check validation with synthetic data
8//!
9//! ## Purpose
10//!
11//! The `emit` command generates example telemetry conforming to schemas,
12//! which is invaluable for:
13//! - Testing validation pipelines without production data
14//! - Demonstrating schema compliance
15//! - Seeding test environments with realistic data
16//! - Validating collector configurations
17//!
18//! ## Integration
19//!
//! ```rust,ignore
//! use clnrm_core::telemetry::weaver_emit::WeaverEmitter;
//!
//! let emitter = WeaverEmitter::new("registry/");
//! // Emits to the default OTLP endpoint (http://localhost:4317)
//! emitter.emit()?;
//! // Telemetry is now flowing to collector for validation
//! ```
27
28use crate::error::{CleanroomError, Result};
29use serde::{Deserialize, Serialize};
30use std::path::{Path, PathBuf};
31use std::process::{Child, Command, Stdio};
32use std::time::Duration;
33use tracing::{debug, info, warn};
34
/// Settings that control how Weaver's `registry emit` command is invoked
#[derive(Debug, Clone)]
pub struct EmitConfig {
    /// Location of the semantic convention registry handed to Weaver
    pub registry_path: PathBuf,
    /// OTLP endpoint that receives the telemetry (default: `http://localhost:4317`)
    pub endpoint: String,
    /// When `true`, telemetry is written to stdout instead of the OTLP endpoint
    pub stdout: bool,
    /// When `true`, Weaver is invoked with `--debug`
    pub debug: bool,
}

impl Default for EmitConfig {
    /// Registry at `registry`, endpoint `http://localhost:4317`, with stdout
    /// and debug output both switched off.
    fn default() -> Self {
        EmitConfig {
            registry_path: "registry".into(),
            endpoint: String::from("http://localhost:4317"),
            stdout: false,
            debug: false,
        }
    }
}

impl EmitConfig {
    /// Default configuration, except that output goes to stdout (handy for tests)
    pub fn stdout() -> Self {
        let base = Self::default();
        Self {
            stdout: true,
            ..base
        }
    }

    /// Default configuration, except that telemetry is sent to `endpoint`
    pub fn with_endpoint<S: Into<String>>(endpoint: S) -> Self {
        let endpoint = endpoint.into();
        Self {
            endpoint,
            ..Self::default()
        }
    }
}
76
77/// Result of telemetry emission
78#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct EmitResult {
80    /// Number of spans emitted
81    pub spans_emitted: usize,
82    /// Number of metrics emitted
83    pub metrics_emitted: usize,
84    /// Number of events emitted (log records)
85    pub events_emitted: usize,
86    /// Total signals emitted
87    pub total_signals: usize,
88    /// Whether emission succeeded
89    pub success: bool,
90    /// Error message if failed
91    pub error: Option<String>,
92}
93
94impl EmitResult {
95    /// Create successful result
96    pub fn success(spans: usize, metrics: usize, events: usize) -> Self {
97        Self {
98            spans_emitted: spans,
99            metrics_emitted: metrics,
100            events_emitted: events,
101            total_signals: spans + metrics + events,
102            success: true,
103            error: None,
104        }
105    }
106
107    /// Create failed result
108    pub fn failure(error: String) -> Self {
109        Self {
110            spans_emitted: 0,
111            metrics_emitted: 0,
112            events_emitted: 0,
113            total_signals: 0,
114            success: false,
115            error: Some(error),
116        }
117    }
118}
119
120/// Weaver telemetry emitter
121///
122/// Wraps Weaver's `registry emit` command to generate schema-compliant
123/// test data for validation and testing purposes.
124pub struct WeaverEmitter {
125    config: EmitConfig,
126}
127
128impl WeaverEmitter {
129    /// Create a new emitter with default configuration
130    pub fn new<P: AsRef<Path>>(registry_path: P) -> Self {
131        Self {
132            config: EmitConfig {
133                registry_path: registry_path.as_ref().to_path_buf(),
134                ..Default::default()
135            },
136        }
137    }
138
139    /// Create emitter with custom configuration
140    pub fn with_config(config: EmitConfig) -> Self {
141        Self { config }
142    }
143
144    /// Emit telemetry to OTLP endpoint (one-shot)
145    ///
146    /// Generates example telemetry from all schemas in the registry
147    /// and sends to the configured OTLP endpoint.
148    ///
149    /// # Errors
150    ///
151    /// Returns an error if:
152    /// - Weaver binary not found
153    /// - Registry invalid
154    /// - Endpoint unreachable
155    /// - Emission failed
156    pub fn emit(&self) -> Result<EmitResult> {
157        info!(
158            "🚀 Emitting telemetry from registry: {}",
159            self.config.registry_path.display()
160        );
161
162        // Validate registry exists
163        if !self.config.registry_path.exists() {
164            return Err(CleanroomError::validation_error(format!(
165                "Registry not found: {}",
166                self.config.registry_path.display()
167            )));
168        }
169
170        // Build weaver emit command
171        let mut cmd = Command::new("weaver");
172        cmd.args([
173            "registry",
174            "emit",
175            "--registry",
176            &self.config.registry_path.display().to_string(),
177        ]);
178
179        if self.config.stdout {
180            cmd.arg("--stdout");
181        } else {
182            cmd.args(["--endpoint", &self.config.endpoint]);
183        }
184
185        if self.config.debug {
186            cmd.arg("--debug");
187        }
188
189        debug!("Weaver emit command: {:?}", cmd);
190
191        // Execute command
192        let output = cmd.output().map_err(|e| {
193            CleanroomError::internal_error(format!(
194                "Failed to run weaver emit (is it installed?): {}",
195                e
196            ))
197        })?;
198
199        let stdout = String::from_utf8_lossy(&output.stdout);
200        let stderr = String::from_utf8_lossy(&output.stderr);
201
202        if self.config.debug {
203            debug!("Weaver emit stdout: {}", stdout);
204            debug!("Weaver emit stderr: {}", stderr);
205        }
206
207        if !output.status.success() {
208            return Ok(EmitResult::failure(format!(
209                "Weaver emit failed: {}",
210                stderr
211            )));
212        }
213
214        // Parse result from output
215        let result = self.parse_emit_output(&stdout, &stderr)?;
216
217        info!(
218            "✅ Emitted {} signals ({} spans, {} metrics, {} events)",
219            result.total_signals,
220            result.spans_emitted,
221            result.metrics_emitted,
222            result.events_emitted
223        );
224
225        Ok(result)
226    }
227
228    /// Start continuous emission (for long-running validation)
229    ///
230    /// Spawns Weaver emit as a background process that continuously
231    /// generates telemetry. Useful for testing collectors and pipelines.
232    ///
233    /// # Returns
234    ///
235    /// Returns a handle to the background process that can be stopped.
236    pub fn start_continuous(&self) -> Result<EmitHandle> {
237        info!("🔄 Starting continuous telemetry emission");
238
239        // Build command
240        let mut cmd = Command::new("weaver");
241        cmd.args([
242            "registry",
243            "emit",
244            "--registry",
245            &self.config.registry_path.display().to_string(),
246            "--endpoint",
247            &self.config.endpoint,
248        ]);
249
250        if self.config.debug {
251            cmd.arg("--debug");
252        }
253
254        // Spawn as background process
255        let child = cmd
256            .stdout(Stdio::piped())
257            .stderr(Stdio::piped())
258            .spawn()
259            .map_err(|e| {
260                CleanroomError::internal_error(format!("Failed to spawn weaver emit: {}", e))
261            })?;
262
263        info!("✅ Continuous emission started (PID: {})", child.id());
264
265        Ok(EmitHandle { process: child })
266    }
267
268    /// Emit telemetry and capture to string (for testing)
269    ///
270    /// Uses --stdout flag to capture generated telemetry as JSON.
271    pub fn emit_to_string(&self) -> Result<String> {
272        let mut config = self.config.clone();
273        config.stdout = true;
274
275        let emitter = WeaverEmitter::with_config(config);
276        let result = emitter.emit()?;
277
278        if !result.success {
279            return Err(CleanroomError::validation_error(format!(
280                "Emission failed: {}",
281                result.error.unwrap_or_default()
282            )));
283        }
284
285        // Re-run to capture stdout (previous call validated)
286        let output = Command::new("weaver")
287            .args([
288                "registry",
289                "emit",
290                "--registry",
291                &self.config.registry_path.display().to_string(),
292                "--stdout",
293            ])
294            .output()
295            .map_err(|e| CleanroomError::internal_error(format!("Failed to run weaver: {}", e)))?;
296
297        Ok(String::from_utf8_lossy(&output.stdout).to_string())
298    }
299
300    /// Parse emission result from Weaver output
301    fn parse_emit_output(&self, _stdout: &str, stderr: &str) -> Result<EmitResult> {
302        // Weaver emit outputs summary to stderr
303        // Example: "Emitted 15 spans, 10 metrics, 5 events"
304
305        let mut spans = 0;
306        let mut metrics = 0;
307        let mut events = 0;
308
309        for line in stderr.lines() {
310            if line.contains("Emitted") || line.contains("emitted") {
311                // Try to extract numbers
312                let words: Vec<&str> = line.split_whitespace().collect();
313                for (i, word) in words.iter().enumerate() {
314                    if let Ok(num) = word.parse::<usize>() {
315                        if i + 1 < words.len() {
316                            let next = words[i + 1].to_lowercase();
317                            if next.contains("span") {
318                                spans = num;
319                            } else if next.contains("metric") {
320                                metrics = num;
321                            } else if next.contains("event") || next.contains("log") {
322                                events = num;
323                            }
324                        }
325                    }
326                }
327            }
328        }
329
330        // If we couldn't parse counts, assume success with unknown counts
331        if spans == 0 && metrics == 0 && events == 0 {
332            warn!("Could not parse emission counts from output, assuming success");
333            spans = 1; // Assume at least some emission happened
334        }
335
336        Ok(EmitResult::success(spans, metrics, events))
337    }
338}
339
340/// Handle to a continuously emitting Weaver process
341///
342/// Automatically stops the process when dropped.
343pub struct EmitHandle {
344    process: Child,
345}
346
347impl EmitHandle {
348    /// Stop the emission process
349    pub fn stop(mut self) -> Result<()> {
350        info!("🛑 Stopping continuous emission");
351
352        self.process.kill().map_err(|e| {
353            CleanroomError::internal_error(format!("Failed to kill emitter process: {}", e))
354        })?;
355
356        let _ = self.process.wait();
357        info!("✅ Emission stopped");
358
359        Ok(())
360    }
361
362    /// Check if process is still running
363    pub fn is_running(&mut self) -> bool {
364        match self.process.try_wait() {
365            Ok(None) => true,
366            _ => false,
367        }
368    }
369
370    /// Wait for process to finish (with timeout)
371    pub fn wait_with_timeout(&mut self, timeout: Duration) -> Result<()> {
372        let start = std::time::Instant::now();
373
374        while start.elapsed() < timeout {
375            if !self.is_running() {
376                return Ok(());
377            }
378            std::thread::sleep(Duration::from_millis(100));
379        }
380
381        Err(CleanroomError::timeout_error(
382            "Emitter did not stop within timeout",
383        ))
384    }
385}
386
387impl Drop for EmitHandle {
388    fn drop(&mut self) {
389        debug!("Cleaning up emitter process in Drop");
390        let _ = self.process.kill();
391        let _ = self.process.wait();
392    }
393}
394
395/// Helper to emit test fixtures for integration tests
396///
397/// This is a convenience wrapper for common testing scenarios.
398pub struct FixtureGenerator {
399    registry_path: PathBuf,
400}
401
402impl FixtureGenerator {
403    /// Create a new fixture generator
404    pub fn new<P: AsRef<Path>>(registry_path: P) -> Self {
405        Self {
406            registry_path: registry_path.as_ref().to_path_buf(),
407        }
408    }
409
410    /// Generate JSON fixtures for all schemas
411    pub fn generate_json_fixtures(&self) -> Result<String> {
412        let config = EmitConfig {
413            registry_path: self.registry_path.clone(),
414            stdout: true,
415            ..Default::default()
416        };
417
418        let emitter = WeaverEmitter::with_config(config);
419        emitter.emit_to_string()
420    }
421
422    /// Emit fixtures to a file
423    pub fn emit_to_file<P: AsRef<Path>>(&self, output_path: P) -> Result<()> {
424        let fixtures = self.generate_json_fixtures()?;
425
426        std::fs::write(output_path.as_ref(), fixtures)
427            .map_err(|e| CleanroomError::io_error(format!("Failed to write fixtures: {}", e)))?;
428
429        info!("✅ Fixtures written to: {}", output_path.as_ref().display());
430
431        Ok(())
432    }
433
434    /// Emit test data to collector for validation testing
435    pub fn seed_collector(&self, endpoint: &str) -> Result<EmitResult> {
436        let config = EmitConfig {
437            registry_path: self.registry_path.clone(),
438            endpoint: endpoint.to_string(),
439            ..Default::default()
440        };
441
442        let emitter = WeaverEmitter::with_config(config);
443        emitter.emit()
444    }
445}
446
#[cfg(test)]
mod tests {
    use super::*;

    /// Defaults point at the local registry and the local collector.
    #[test]
    fn test_emit_config_default() {
        let EmitConfig {
            registry_path,
            endpoint,
            stdout,
            debug,
        } = EmitConfig::default();

        assert_eq!(registry_path, PathBuf::from("registry"));
        assert_eq!(endpoint, "http://localhost:4317");
        assert!(!stdout);
        assert!(!debug);
    }

    /// The stdout constructor switches stdout mode on.
    #[test]
    fn test_emit_config_stdout() {
        assert!(EmitConfig::stdout().stdout);
    }

    /// A custom endpoint is stored verbatim.
    #[test]
    fn test_emit_config_with_endpoint() {
        let expected = "http://example.com:4317";
        let config = EmitConfig::with_endpoint(expected);
        assert_eq!(config.endpoint, expected);
    }

    /// A successful result carries the counts and their sum.
    #[test]
    fn test_emit_result_success() {
        let outcome = EmitResult::success(10, 5, 3);

        assert!(outcome.success);
        assert!(outcome.error.is_none());
        assert_eq!(outcome.spans_emitted, 10);
        assert_eq!(outcome.metrics_emitted, 5);
        assert_eq!(outcome.events_emitted, 3);
        assert_eq!(outcome.total_signals, 18);
    }

    /// A failed result has no signals and keeps the error message.
    #[test]
    fn test_emit_result_failure() {
        let outcome = EmitResult::failure(String::from("Test error"));

        assert!(!outcome.success);
        assert_eq!(outcome.total_signals, 0);
        assert_eq!(outcome.error.as_deref(), Some("Test error"));
    }

    /// `new` stores the registry path it was given.
    #[test]
    fn test_weaver_emitter_creation() {
        let registry = PathBuf::from("registry/");
        let emitter = WeaverEmitter::new("registry/");
        assert_eq!(emitter.config.registry_path, registry);
    }

    /// `new` stores the registry path it was given.
    #[test]
    fn test_fixture_generator_creation() {
        let registry = PathBuf::from("registry/");
        let generator = FixtureGenerator::new("registry/");
        assert_eq!(generator.registry_path, registry);
    }

    /// Counts are read from a summary line on stderr.
    #[test]
    fn test_parse_emit_output() {
        let summary = "Emitted 15 spans, 10 metrics, 5 events successfully";

        let parsed = WeaverEmitter::new("test")
            .parse_emit_output("", summary)
            .unwrap();

        assert!(parsed.success);
        assert_eq!(parsed.spans_emitted, 15);
        assert_eq!(parsed.metrics_emitted, 10);
        assert_eq!(parsed.events_emitted, 5);
    }

    /// Placeholder for an end-to-end run against a real Weaver binary.
    #[test]
    #[ignore = "Requires Weaver installation"]
    fn test_emit_integration() {
        // This test requires Weaver and a valid registry
        // Run with: cargo test --ignored
    }
}