clnrm_core/telemetry/
weaver_controller.rs

//! Weaver Controller - Manages Weaver live-check lifecycle and validation reporting
//!
//! This module provides the critical integration between clnrm tests and Weaver's
//! semantic convention validation. It makes Weaver the single source of truth for
//! telemetry correctness.
//!
//! ## Lifecycle
//!
//! 1. Start Weaver live-check listener before tests
//! 2. Tests emit telemetry via OTLP
//! 3. Weaver validates telemetry in real-time
//! 4. Stop Weaver after tests and get validation report
//! 5. Exit with error if violations detected
//!
//! ## Architecture
//!
//! The WeaverController spawns Weaver as a child process and manages its lifecycle:
//! - Uses SIGHUP for graceful shutdown on Unix (other platforms kill the process)
//! - Parses JSON validation reports
//! - Provides both streaming and final validation status
21
22use crate::error::{CleanroomError, Result};
23use serde::{Deserialize, Serialize};
24use std::io::{BufRead, BufReader};
25use std::net::TcpListener;
26use std::path::PathBuf;
27use std::process::{Child, Command, Stdio};
28use std::sync::atomic::{AtomicBool, Ordering};
29use std::sync::Arc;
30use std::thread;
31use std::time::{Duration, Instant};
32use tracing::{debug, error, info, warn};
33
/// Runtime details of a started Weaver instance.
///
/// Produced by `start_and_coordinate()`. OTEL setup should read the ports from here
/// so that telemetry is exported to the port Weaver is actually listening on,
/// rather than to a hard-coded default.
#[derive(Debug, Clone)]
pub struct WeaverCoordination {
    /// OS process ID of the spawned Weaver child
    pub weaver_pid: u32,
    /// Port on which Weaver accepts OTLP gRPC telemetry
    pub otlp_grpc_port: u16,
    /// Port of Weaver's admin/control interface
    pub admin_port: u16,
    /// Moment at which the readiness check succeeded
    pub ready_at: Instant,
}
50
51/// Validation status returned by Weaver
52#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
53#[serde(rename_all = "lowercase")]
54pub enum ValidationStatus {
55    /// Validation passed with no violations
56    Success,
57    /// Validation failed with violations detected
58    Failure,
59}
60
61/// Detailed information about a single validation issue
62#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct ValidationDetail {
64    /// Severity level (violation, improvement, information)
65    pub level: String,
66    /// Metric or span name
67    pub metric_name: Option<String>,
68    /// Span name if applicable
69    pub span_name: Option<String>,
70    /// Description of the issue
71    pub message: String,
72    /// Path in registry
73    pub registry_path: Option<String>,
74}
75
76/// Complete validation report from Weaver
77#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct ValidationReport {
79    /// Overall validation status
80    pub status: ValidationStatus,
81    /// Number of violations (blocking issues)
82    pub violations: u32,
83    /// Number of improvements (suggestions)
84    pub improvements: u32,
85    /// Number of information messages
86    pub information: u32,
87    /// Percentage of registry coverage (0.0 - 1.0)
88    pub registry_coverage: f64,
89    /// Number of telemetry samples received (CRITICAL: must be > 0 for valid validation)
90    #[serde(default)]
91    pub sample_count: u32,
92    /// Detailed list of validation issues
93    pub details: Vec<ValidationDetail>,
94}
95
96impl Default for ValidationReport {
97    fn default() -> Self {
98        Self {
99            status: ValidationStatus::Failure, // Default to failure (no telemetry = failed validation)
100            violations: 0,
101            improvements: 0,
102            information: 0,
103            registry_coverage: 0.0,
104            sample_count: 0,
105            details: Vec::new(),
106        }
107    }
108}
109
/// Settings used to launch `weaver registry live-check`.
#[derive(Debug, Clone)]
pub struct WeaverConfig {
    /// Location of the semantic convention registry passed via `--registry`
    pub registry_path: PathBuf,
    /// OTLP gRPC port (`--otlp-grpc-port`); replaced by a discovered free port on start
    pub otlp_port: u16,
    /// Admin/control port (`--admin-port`); replaced by a discovered free port on start
    pub admin_port: u16,
    /// Directory where Weaver writes its reports (`--output`)
    pub output_dir: PathBuf,
    /// When `true`, Weaver output is streamed and monitored; otherwise `--no-stream` is passed
    pub stream: bool,
}
124
125impl Default for WeaverConfig {
126    fn default() -> Self {
127        Self {
128            registry_path: PathBuf::from("registry"),
129            otlp_port: 0,  // 0 = auto-discover available port
130            admin_port: 0, // 0 = auto-discover available port
131            output_dir: PathBuf::from("./validation_output"),
132            stream: false,
133        }
134    }
135}
136
137/// Controller for Weaver live-check process
138///
139/// Manages the lifecycle of a Weaver validation process, including:
140/// - Starting the live-check listener
141/// - Monitoring validation status
142/// - Stopping and retrieving final report
143///
144/// # Example
145///
146/// ```no_run
147/// use clnrm_core::telemetry::weaver_controller::{WeaverController, WeaverConfig};
148/// use std::path::PathBuf;
149///
150/// # async fn example() -> clnrm_core::error::Result<()> {
151/// let config = WeaverConfig {
152///     registry_path: PathBuf::from("registry"),
153///     output_dir: PathBuf::from("./validation_output"),
154///     ..Default::default()
155/// };
156///
157/// let mut controller = WeaverController::new(config);
158///
159/// // Start Weaver before tests
160/// controller.start_live_check()?;
161///
162/// // Run tests (they emit telemetry to OTLP)
163/// // ...
164///
165/// // Stop Weaver and get validation results
166/// let report = controller.stop_and_report()?;
167///
168/// if report.violations > 0 {
169///     eprintln!("Validation failed with {} violations", report.violations);
170/// }
171/// # Ok(())
172/// # }
173/// ```
174pub struct WeaverController {
175    config: WeaverConfig,
176    live_check_process: Option<Child>,
177    has_violations: Arc<AtomicBool>,
178    monitor_thread: Option<thread::JoinHandle<()>>,
179    coordination: Option<WeaverCoordination>,
180}
181
182impl WeaverController {
183    /// Create a new WeaverController with the given configuration
184    pub fn new(config: WeaverConfig) -> Self {
185        Self {
186            config,
187            live_check_process: None,
188            has_violations: Arc::new(AtomicBool::new(false)),
189            monitor_thread: None,
190            coordination: None,
191        }
192    }
193
194    /// Start Weaver and return coordination info (Weaver-first pattern)
195    ///
196    /// This is the PRIMARY method for Weaver-first initialization.
197    /// It blocks until Weaver is ready and returns coordination metadata
198    /// that MUST be used to configure OTEL exporters.
199    ///
200    /// # Weaver-First Pattern
201    ///
202    /// ```no_run
203    /// use clnrm_core::telemetry::weaver_controller::{WeaverController, WeaverConfig};
204    /// use clnrm_core::telemetry::{init_otel, OtelConfig, Export};
205    /// use std::path::PathBuf;
206    ///
207    /// # fn example() -> clnrm_core::error::Result<()> {
208    /// let config = WeaverConfig::default();
209    /// let mut controller = WeaverController::new(config);
210    ///
211    /// // Step 1: Start Weaver and get coordination
212    /// let coordination = controller.start_and_coordinate()?;
213    /// println!("Weaver listening on port {}", coordination.otlp_grpc_port);
214    ///
215    /// // Step 2: Initialize OTEL with Weaver's actual port
216    /// let endpoint = format!("http://localhost:{}", coordination.otlp_grpc_port);
217    /// let _otel_guard = init_otel(OtelConfig {
218    ///     service_name: "clnrm",
219    ///     deployment_env: "testing",
220    ///     sample_ratio: 1.0,
221    ///     export: Export::OtlpGrpc {
222    ///         endpoint: Box::leak(endpoint.into_boxed_str()),
223    ///     },
224    ///     enable_fmt_layer: false,
225    ///     headers: None,
226    /// })?;
227    ///
228    /// // Step 3: Run tests (telemetry goes to Weaver)
229    /// // ...
230    ///
231    /// // Step 4: Flush OTEL before stopping Weaver
232    /// drop(_otel_guard);
233    /// std::thread::sleep(std::time::Duration::from_millis(500));
234    ///
235    /// // Step 5: Stop Weaver and get validation report
236    /// let report = controller.stop_and_report()?;
237    /// # Ok(())
238    /// # }
239    /// ```
240    ///
241    /// # Errors
242    ///
243    /// Returns an error if:
244    /// - Weaver binary not found
245    /// - No available ports in range
246    /// - Weaver process fails to start
247    /// - Health check timeout
248    pub fn start_and_coordinate(&mut self) -> Result<WeaverCoordination> {
249        info!("🚀 Starting Weaver with coordination (Weaver-first pattern)");
250
251        // Cleanup any orphaned processes first
252        Self::cleanup_old_weaver_processes()?;
253
254        // Find available ports with intelligent fallback
255        let otlp_port = Self::find_available_port_with_fallback()?;
256        let admin_port = Self::find_available_port(8080, 8090).or_else(|_| {
257            warn!("Primary admin port range exhausted, trying fallback");
258            Self::find_available_port(9080, 9090)
259        })?;
260
261        // Update config with discovered ports
262        self.config.otlp_port = otlp_port;
263        self.config.admin_port = admin_port;
264
265        info!("📡 Discovered OTLP port: {}", otlp_port);
266        info!("🔧 Discovered admin port: {}", admin_port);
267
268        // Ensure output directory exists
269        std::fs::create_dir_all(&self.config.output_dir).map_err(|e| {
270            CleanroomError::io_error(format!("Failed to create output directory: {}", e))
271        })?;
272
273        // Build Weaver command
274        let mut cmd = Command::new("weaver");
275        cmd.args([
276            "registry",
277            "live-check",
278            "--registry",
279            &self.config.registry_path.display().to_string(),
280            "--otlp-grpc-port",
281            &self.config.otlp_port.to_string(),
282            "--admin-port",
283            &self.config.admin_port.to_string(),
284            "--output",
285            &self.config.output_dir.display().to_string(),
286            "--format",
287            "json",
288        ]);
289
290        // Add streaming flag if disabled
291        if !self.config.stream {
292            cmd.arg("--no-stream");
293        }
294
295        debug!("Weaver command: {:?}", cmd);
296
297        // Spawn process with piped output
298        let mut child = cmd
299            .stdout(Stdio::piped())
300            .stderr(Stdio::piped())
301            .spawn()
302            .map_err(|e| {
303                CleanroomError::internal_error(format!(
304                    "Failed to start Weaver (is it installed?): {}",
305                    e
306                ))
307            })?;
308
309        let weaver_pid = child.id();
310        info!("🔍 Weaver process started (PID: {})", weaver_pid);
311
312        // If streaming is enabled, spawn a monitor thread
313        if self.config.stream {
314            let stderr = child
315                .stderr
316                .take()
317                .ok_or_else(|| CleanroomError::internal_error("Failed to capture Weaver stderr"))?;
318
319            let violations_flag = Arc::clone(&self.has_violations);
320            let monitor = thread::spawn(move || {
321                let reader = BufReader::new(stderr);
322                for line in reader.lines() {
323                    match line {
324                        Ok(line) => {
325                            debug!("Weaver: {}", line);
326                            // Check for violation indicators
327                            if line.contains("violation") || line.contains("error") {
328                                violations_flag.store(true, Ordering::Relaxed);
329                            }
330                        }
331                        Err(e) => {
332                            warn!("Error reading Weaver output: {}", e);
333                            break;
334                        }
335                    }
336                }
337            });
338
339            self.monitor_thread = Some(monitor);
340        }
341
342        self.live_check_process = Some(child);
343
344        // Wait for Weaver to be ready
345        self.wait_for_ready(Duration::from_secs(10))?;
346
347        let ready_at = Instant::now();
348        info!("✅ Weaver is ready and coordinated");
349
350        // Create coordination metadata
351        let coordination = WeaverCoordination {
352            weaver_pid,
353            otlp_grpc_port: otlp_port,
354            admin_port,
355            ready_at,
356        };
357
358        // Store coordination for later queries
359        self.coordination = Some(coordination.clone());
360
361        Ok(coordination)
362    }
363
364    /// Get current coordination state (non-blocking)
365    ///
366    /// Returns None if Weaver not started via `start_and_coordinate()`,
367    /// otherwise returns the coordination metadata.
368    pub fn coordination(&self) -> Option<WeaverCoordination> {
369        self.coordination.clone()
370    }
371
372    /// Find available port with intelligent fallback
373    ///
374    /// Strategy:
375    /// 1. Try primary range (4317-4327) - standard OTLP gRPC ports
376    /// 2. Fallback to secondary range (5317-5327) if primary exhausted
377    ///
378    /// # Errors
379    ///
380    /// Returns an error if no ports available in any range
381    fn find_available_port_with_fallback() -> Result<u16> {
382        debug!("Searching for available OTLP port with fallback");
383
384        // Try primary range (standard OTLP gRPC ports)
385        if let Ok(port) = Self::find_available_port(4317, 4327) {
386            info!("✅ Found available port in primary range: {}", port);
387            return Ok(port);
388        }
389
390        // Fallback to secondary range
391        warn!("Primary OTLP port range (4317-4327) exhausted, trying fallback range");
392        Self::find_available_port(5317, 5327).map_err(|_| {
393            CleanroomError::validation_error(
394                "No available ports in range 4317-4327, 5317-5327. \
395                 All ports in use. Stop other OTLP services or use custom port range.",
396            )
397        })
398    }
399
400    /// Wait for Weaver to become ready
401    ///
402    /// Health check strategy:
403    /// 1. Initial delay (1000ms) for process startup
404    /// 2. Check process still running (not crashed)
405    /// 3. Return success if process is running
406    ///
407    /// Future enhancement: Add HTTP health check to admin port
408    fn wait_for_ready(&mut self, _timeout: Duration) -> Result<()> {
409        info!("⏳ Waiting for Weaver to become ready...");
410        let start = Instant::now();
411
412        // Initial startup delay
413        thread::sleep(Duration::from_millis(1000));
414
415        // Check process state
416        if let Some(ref mut process) = self.live_check_process {
417            match process.try_wait() {
418                Ok(Some(status)) => {
419                    return Err(CleanroomError::internal_error(format!(
420                        "Weaver exited prematurely with status: {}. \
421                         Check Weaver logs in validation_output/ for details.",
422                        status
423                    )));
424                }
425                Ok(None) => {
426                    // Still running, assume ready
427                    let elapsed = start.elapsed();
428                    info!("✅ Weaver ready (elapsed: {}ms)", elapsed.as_millis());
429                    return Ok(());
430                }
431                Err(e) => {
432                    return Err(CleanroomError::internal_error(format!(
433                        "Failed to check Weaver status: {}",
434                        e
435                    )));
436                }
437            }
438        }
439
440        Err(CleanroomError::timeout_error(
441            "Weaver not ready within timeout",
442        ))
443    }
444
445    /// Find an available port in the specified range
446    ///
447    /// Attempts to bind to each port in range [start, end] and returns the first
448    /// available port. This prevents port conflicts when starting Weaver.
449    ///
450    /// # Arguments
451    ///
452    /// * `start` - Start of port range (inclusive)
453    /// * `end` - End of port range (inclusive)
454    ///
455    /// # Errors
456    ///
457    /// Returns an error if no ports are available in the range
458    fn find_available_port(start: u16, end: u16) -> Result<u16> {
459        debug!("Searching for available port in range {}-{}", start, end);
460
461        for port in start..=end {
462            match TcpListener::bind(("127.0.0.1", port)) {
463                Ok(_) => {
464                    debug!("Found available port: {}", port);
465                    return Ok(port);
466                }
467                Err(_) => continue,
468            }
469        }
470
471        Err(CleanroomError::validation_error(format!(
472            "No available ports in range {}-{}. All ports are in use.",
473            start, end
474        )))
475    }
476
477    /// Cleanup any orphaned Weaver processes
478    ///
479    /// Attempts to kill any existing `weaver registry live-check` processes
480    /// to prevent port conflicts and resource leaks. This is called before
481    /// starting a new Weaver instance.
482    fn cleanup_old_weaver_processes() -> Result<()> {
483        debug!("Cleaning up orphaned Weaver processes");
484
485        // Try to kill any existing weaver live-check processes
486        #[cfg(unix)]
487        {
488            let _ = Command::new("pkill")
489                .args(["-9", "-f", "weaver registry live-check"])
490                .output();
491
492            // Give processes time to terminate
493            thread::sleep(Duration::from_millis(500));
494        }
495
496        #[cfg(not(unix))]
497        {
498            // On Windows, use taskkill
499            let _ = Command::new("taskkill")
500                .args(&["/F", "/IM", "weaver.exe"])
501                .output();
502
503            thread::sleep(Duration::from_millis(500));
504        }
505
506        Ok(())
507    }
508
509    /// Start Weaver live-check listener
510    ///
511    /// Spawns Weaver as a child process and waits for it to be ready.
512    /// Includes intelligent port management and process cleanup.
513    ///
514    /// # Errors
515    ///
516    /// Returns an error if:
517    /// - Weaver binary is not found
518    /// - Failed to create output directory
519    /// - Process failed to start
520    /// - Listener failed to become ready
521    /// - No available ports in range
522    pub fn start_live_check(&mut self) -> Result<()> {
523        info!("🔍 Starting Weaver live-check validation");
524
525        // Cleanup any orphaned processes first
526        Self::cleanup_old_weaver_processes()?;
527
528        // Find available ports with retry logic
529        let otlp_port = Self::find_available_port(4317, 4327).or_else(|_| {
530            warn!("Primary OTLP port range exhausted, trying fallback range");
531            Self::find_available_port(5317, 5327)
532        })?;
533
534        let admin_port = Self::find_available_port(8080, 8090).or_else(|_| {
535            warn!("Primary admin port range exhausted, trying fallback range");
536            Self::find_available_port(9080, 9090)
537        })?;
538
539        // Update config with discovered ports
540        self.config.otlp_port = otlp_port;
541        self.config.admin_port = admin_port;
542
543        info!("📡 Using OTLP port: {}", otlp_port);
544        info!("🔧 Using admin port: {}", admin_port);
545
546        // Ensure output directory exists
547        std::fs::create_dir_all(&self.config.output_dir).map_err(|e| {
548            CleanroomError::io_error(format!("Failed to create output directory: {}", e))
549        })?;
550
551        // Build Weaver command
552        let mut cmd = Command::new("weaver");
553        cmd.args([
554            "registry",
555            "live-check",
556            "--registry",
557            &self.config.registry_path.display().to_string(),
558            "--otlp-grpc-port",
559            &self.config.otlp_port.to_string(),
560            "--admin-port",
561            &self.config.admin_port.to_string(),
562            "--output",
563            &self.config.output_dir.display().to_string(),
564            "--format",
565            "json",
566        ]);
567
568        // Add streaming flag if disabled
569        if !self.config.stream {
570            cmd.arg("--no-stream");
571        }
572
573        debug!("Weaver command: {:?}", cmd);
574
575        // Spawn process with piped output
576        let mut child = cmd
577            .stdout(Stdio::piped())
578            .stderr(Stdio::piped())
579            .spawn()
580            .map_err(|e| {
581                CleanroomError::internal_error(format!(
582                    "Failed to start Weaver (is it installed?): {}",
583                    e
584                ))
585            })?;
586
587        // If streaming is enabled, spawn a monitor thread
588        if self.config.stream {
589            let stderr = child
590                .stderr
591                .take()
592                .ok_or_else(|| CleanroomError::internal_error("Failed to capture Weaver stderr"))?;
593
594            let violations_flag = Arc::clone(&self.has_violations);
595            let monitor = thread::spawn(move || {
596                let reader = BufReader::new(stderr);
597                for line in reader.lines() {
598                    match line {
599                        Ok(line) => {
600                            debug!("Weaver: {}", line);
601                            // Check for violation indicators
602                            if line.contains("violation") || line.contains("error") {
603                                violations_flag.store(true, Ordering::Relaxed);
604                            }
605                        }
606                        Err(e) => {
607                            warn!("Error reading Weaver output: {}", e);
608                            break;
609                        }
610                    }
611                }
612            });
613
614            self.monitor_thread = Some(monitor);
615        }
616
617        self.live_check_process = Some(child);
618
619        // Wait for listener to be ready with proper health check
620        info!("⏳ Waiting for Weaver to initialize...");
621        self.wait_for_weaver_ready()?;
622
623        Ok(())
624    }
625
626    /// Stop Weaver and retrieve validation report
627    ///
628    /// Sends SIGHUP to gracefully stop Weaver, then parses the validation report.
629    ///
630    /// # Errors
631    ///
632    /// Returns an error if:
633    /// - Weaver is not running
634    /// - Failed to stop the process
635    /// - Report file not found or invalid JSON
636    pub fn stop_and_report(&mut self) -> Result<ValidationReport> {
637        info!("🛑 Stopping Weaver and retrieving validation report");
638
639        let mut process = self
640            .live_check_process
641            .take()
642            .ok_or_else(|| CleanroomError::internal_error("Weaver live-check is not running"))?;
643
644        // Send graceful shutdown signal
645        #[cfg(unix)]
646        {
647            use nix::sys::signal::{kill, Signal};
648            use nix::unistd::Pid;
649
650            let pid = Pid::from_raw(process.id() as i32);
651            debug!("Sending SIGHUP to Weaver (PID: {})", pid);
652
653            kill(pid, Signal::SIGHUP).map_err(|e| {
654                CleanroomError::internal_error(format!("Failed to send SIGHUP: {}", e))
655            })?;
656        }
657
658        // On non-Unix, just kill the process
659        #[cfg(not(unix))]
660        {
661            warn!("Graceful shutdown not supported on this platform, killing process");
662            process.kill().map_err(|e| {
663                CleanroomError::internal_error(format!("Failed to kill Weaver: {}", e))
664            })?;
665        }
666
667        // Wait for process to finish (with timeout)
668        let output = self.wait_with_timeout(&mut process, Duration::from_secs(10))?;
669
670        // Log output for debugging
671        if !output.stdout.is_empty() {
672            debug!("Weaver stdout: {}", String::from_utf8_lossy(&output.stdout));
673        }
674        if !output.stderr.is_empty() {
675            debug!("Weaver stderr: {}", String::from_utf8_lossy(&output.stderr));
676        }
677
678        // Wait for monitor thread to finish
679        if let Some(monitor) = self.monitor_thread.take() {
680            let _ = monitor.join();
681        }
682
683        // Parse validation report
684        let report_path = self.config.output_dir.join("validation_report.json");
685        if !report_path.exists() {
686            warn!("Validation report not found at {:?}", report_path);
687            // Return a default report indicating unknown status
688            return Ok(ValidationReport::default());
689        }
690
691        let report_json = std::fs::read_to_string(&report_path).map_err(|e| {
692            CleanroomError::io_error(format!("Failed to read validation report: {}", e))
693        })?;
694
695        let mut report: ValidationReport = serde_json::from_str(&report_json).map_err(|e| {
696            CleanroomError::serialization_error(format!("Failed to parse validation report: {}", e))
697        })?;
698
699        // CRITICAL: Zero-sample validation (prevents false positives)
700        if report.sample_count == 0 {
701            error!("🚨 CRITICAL: Weaver received ZERO telemetry samples!");
702            error!("   This means validation did not actually test anything.");
703            error!("   Possible causes:");
704            error!("   - OTEL exporter not configured correctly");
705            error!("   - Telemetry sent to wrong port");
706            error!("   - Tests failed before emitting telemetry");
707            report.status = ValidationStatus::Failure;
708        }
709
710        // Log summary
711        info!("📊 Validation Report Summary:");
712        info!("   Status: {:?}", report.status);
713        info!("   Samples Received: {}", report.sample_count);
714        info!("   Violations: {}", report.violations);
715        info!("   Improvements: {}", report.improvements);
716        info!("   Information: {}", report.information);
717        info!(
718            "   Registry Coverage: {:.1}%",
719            report.registry_coverage * 100.0
720        );
721
722        if report.violations > 0 {
723            error!("❌ Weaver detected {} violations", report.violations);
724            for detail in &report.details {
725                if detail.level == "violation" {
726                    error!("   - {}", detail.message);
727                }
728            }
729        } else {
730            info!("✅ No violations detected");
731        }
732
733        Ok(report)
734    }
735
736    /// Check if validation has detected violations during streaming
737    ///
738    /// This is only useful when streaming is enabled in the configuration.
739    pub fn is_validation_passing(&self) -> bool {
740        !self.has_violations.load(Ordering::Relaxed)
741    }
742
743    /// Wait for Weaver to become ready by polling the admin health endpoint
744    ///
745    /// Uses exponential backoff with a maximum timeout. Polls the HTTP health endpoint
746    /// at `http://localhost:{admin_port}/health` to verify Weaver is actually listening.
747    ///
748    /// # Errors
749    ///
750    /// Returns an error if:
751    /// - Weaver process exits before becoming ready
752    /// - Health check times out (default: 30 seconds)
753    /// - HTTP health check fails persistently
754    fn wait_for_weaver_ready(&mut self) -> Result<()> {
755        const MAX_TIMEOUT: Duration = Duration::from_secs(30);
756        const INITIAL_DELAY: Duration = Duration::from_millis(100);
757        const MAX_DELAY: Duration = Duration::from_millis(1000);
758
759        let start = Instant::now();
760        let mut delay = INITIAL_DELAY;
761
762        // Check if process is still running
763        if let Some(ref mut process) = self.live_check_process {
764            match process.try_wait() {
765                Ok(Some(status)) => {
766                    return Err(CleanroomError::internal_error(format!(
767                        "Weaver exited prematurely with status: {}. \
768                         Check Weaver logs in validation_output/ for details.",
769                        status
770                    )));
771                }
772                Ok(None) => {
773                    // Process still running, continue with health check
774                }
775                Err(e) => {
776                    return Err(CleanroomError::internal_error(format!(
777                        "Failed to check Weaver status: {}",
778                        e
779                    )));
780                }
781            }
782        }
783
784        let admin_port = self.config.admin_port;
785        let health_url = format!("http://localhost:{}/health", admin_port);
786
787        loop {
788            // Check timeout
789            if start.elapsed() > MAX_TIMEOUT {
790                return Err(CleanroomError::timeout_error(format!(
791                    "Weaver health check timed out after {}s. \
792                     Admin port: {}, Health URL: {}",
793                    MAX_TIMEOUT.as_secs(),
794                    admin_port,
795                    health_url
796                )));
797            }
798
799            // Attempt HTTP health check using blocking reqwest client
800            match Self::check_weaver_health(&health_url) {
801                Ok(true) => {
802                    let elapsed = start.elapsed();
803                    info!(
804                        "✅ Weaver health check passed (elapsed: {}ms)",
805                        elapsed.as_millis()
806                    );
807                    return Ok(());
808                }
809                Ok(false) => {
810                    // Health check failed or network error, continue polling
811                    debug!("Health check not ready yet, will retry...");
812                }
813                Err(e) => {
814                    // Unexpected error, but continue polling
815                    debug!("Health check error: {} (will retry)", e);
816                }
817            }
818
819            // Wait before next attempt with exponential backoff
820            thread::sleep(delay);
821            delay = std::cmp::min(delay * 2, MAX_DELAY);
822
823            // Re-check process state periodically
824            if let Some(ref mut process) = self.live_check_process {
825                if let Ok(Some(status)) = process.try_wait() {
826                    return Err(CleanroomError::internal_error(format!(
827                        "Weaver exited during health check with status: {}. \
828                         Check Weaver logs in validation_output/ for details.",
829                        status
830                    )));
831                }
832            }
833        }
834    }
835
836    /// Check Weaver health endpoint
837    ///
838    /// Performs a blocking HTTP GET request to the Weaver health endpoint.
839    /// Returns `Ok(true)` if health check succeeds, `Ok(false)` if it fails
840    /// or network error occurs (allowing polling to continue).
841    fn check_weaver_health(url: &str) -> Result<bool> {
842        // Use blocking reqwest client for health checks
843        let client = match reqwest::blocking::Client::builder()
844            .timeout(Duration::from_secs(2))
845            .build()
846        {
847            Ok(client) => client,
848            Err(e) => {
849                debug!("Failed to create HTTP client: {} (will retry)", e);
850                return Ok(false);
851            }
852        };
853
854        match client.get(url).send() {
855            Ok(response) => {
856                let status = response.status();
857                Ok(status.is_success())
858            }
859            Err(e) => {
860                // Network errors are expected during startup, return false to continue polling
861                debug!("Health check request failed: {} (will retry)", e);
862                Ok(false)
863            }
864        }
865    }
866
867    /// Get the OTLP port that Weaver is listening on
868    ///
869    /// This returns the port discovered during `start_live_check()`.
870    /// Useful for configuring OTEL exporters to send to the correct endpoint.
871    pub fn get_otlp_port(&self) -> u16 {
872        self.config.otlp_port
873    }
874
875    /// Get the admin port that Weaver is listening on
876    ///
877    /// This returns the port discovered during `start_live_check()`.
878    pub fn get_admin_port(&self) -> u16 {
879        self.config.admin_port
880    }
881
882    /// Wait for process to finish with timeout
883    fn wait_with_timeout(
884        &self,
885        process: &mut Child,
886        timeout: Duration,
887    ) -> Result<std::process::Output> {
888        let start = std::time::Instant::now();
889
890        loop {
891            match process.try_wait() {
892                Ok(Some(status)) => {
893                    // Process finished, collect any remaining output
894                    use std::io::Read;
895
896                    let mut stdout = Vec::new();
897                    let mut stderr = Vec::new();
898
899                    if let Some(mut out) = process.stdout.take() {
900                        let _ = out.read_to_end(&mut stdout);
901                    }
902                    if let Some(mut err) = process.stderr.take() {
903                        let _ = err.read_to_end(&mut stderr);
904                    }
905
906                    return Ok(std::process::Output {
907                        status,
908                        stdout,
909                        stderr,
910                    });
911                }
912                Ok(None) => {
913                    // Still running
914                    if start.elapsed() > timeout {
915                        // Timeout reached, kill process
916                        warn!("Weaver did not stop gracefully, killing");
917                        let _ = process.kill();
918                        return Err(CleanroomError::timeout_error(
919                            "Weaver did not stop within timeout",
920                        ));
921                    }
922                    thread::sleep(Duration::from_millis(100));
923                }
924                Err(e) => {
925                    return Err(CleanroomError::internal_error(format!(
926                        "Failed to check Weaver status: {}",
927                        e
928                    )));
929                }
930            }
931        }
932    }
933}
934
935impl Drop for WeaverController {
936    fn drop(&mut self) {
937        // Ensure process is killed if still running
938        if let Some(mut process) = self.live_check_process.take() {
939            debug!("Cleaning up Weaver process in Drop");
940            let _ = process.kill();
941            let _ = process.wait();
942        }
943
944        // Wait for monitor thread
945        if let Some(monitor) = self.monitor_thread.take() {
946            let _ = monitor.join();
947        }
948    }
949}
950
#[cfg(test)]
mod tests {
    //! Unit tests for config/report defaults, status JSON encoding, and controller
    //! construction. End-to-end tests that need a real Weaver binary live under
    //! `tests/weaver/`.
    use super::*;

    #[test]
    fn test_weaver_config_defaults() {
        let config = WeaverConfig::default();
        // Default config uses 0 for auto-discovery, not hardcoded ports
        assert_eq!(config.otlp_port, 0); // 0 = auto-discover available port
        assert_eq!(config.admin_port, 0); // 0 = auto-discover available port
        assert_eq!(config.registry_path, PathBuf::from("registry"));
        assert!(!config.stream);
    }

    #[test]
    fn test_validation_report_default() {
        let report = ValidationReport::default();
        // Default status is Failure (no telemetry = failed validation)
        assert_eq!(report.status, ValidationStatus::Failure);
        assert_eq!(report.violations, 0);
        assert_eq!(report.improvements, 0);
        assert_eq!(report.information, 0);
        assert_eq!(report.registry_coverage, 0.0);
        assert!(report.details.is_empty());
    }

    #[test]
    fn test_validation_status_serialization() {
        // Statuses are encoded as lowercase JSON strings. The module parses Weaver's
        // JSON reports, so the deserialize direction is checked as well.
        for (status, expected_json) in [
            (ValidationStatus::Success, "\"success\""),
            (ValidationStatus::Failure, "\"failure\""),
        ] {
            let json = serde_json::to_string(&status).expect("ValidationStatus should serialize");
            assert_eq!(json, expected_json);

            let parsed: ValidationStatus =
                serde_json::from_str(expected_json).expect("ValidationStatus should deserialize");
            assert_eq!(parsed, status);
        }
    }

    #[test]
    fn test_weaver_controller_creation() {
        let config = WeaverConfig::default();
        let controller = WeaverController::new(config);
        assert!(controller.live_check_process.is_none());
        assert!(!controller.has_violations.load(Ordering::Relaxed));
    }

    #[test]
    fn test_validation_passing_initial_state() {
        let config = WeaverConfig::default();
        let controller = WeaverController::new(config);
        assert!(controller.is_validation_passing());
    }

    // Integration tests are in tests/weaver/ directory
    // See tests/weaver/otel_integration_tests.rs for complete integration test suite

    #[test]
    #[ignore = "Requires Weaver installation"]
    fn test_weaver_controller_lifecycle() {
        // Placeholder: intentionally empty, so it passes trivially even with --ignored.
        // This test requires Weaver to be installed and a registry to exist
        // For comprehensive integration tests, see:
        // - tests/weaver/otel_integration_tests.rs (24 integration tests)
        // - tests/weaver/phase2_coordination/ (coordination pattern tests)
        // - tests/weaver/phase3_otel_integration/ (OTEL contract tests)
        //
        // These integration tests verify end-to-end Weaver coordination,
        // telemetry export, and schema validation.
    }
}