clnrm_core/telemetry/weaver_controller.rs
1//! Weaver Controller - Manages Weaver live-check lifecycle and validation reporting
2//!
3//! This module provides the critical integration between clnrm tests and Weaver's
4//! semantic convention validation. It makes Weaver the single source of truth for
5//! telemetry correctness.
6//!
7//! ## Lifecycle
8//!
9//! 1. Start Weaver live-check listener before tests
10//! 2. Tests emit telemetry via OTLP
11//! 3. Weaver validates telemetry in real-time
12//! 4. Stop Weaver after tests and get validation report
13//! 5. Exit with error if violations detected
14//!
15//! ## Architecture
16//!
17//! The WeaverController spawns Weaver as a child process and manages its lifecycle:
18//! - Uses SIGHUP for graceful shutdown on Unix
19//! - Parses JSON validation reports
20//! - Provides both streaming and final validation status
21
22use crate::error::{CleanroomError, Result};
23use serde::{Deserialize, Serialize};
24use std::io::{BufRead, BufReader};
25use std::net::TcpListener;
26use std::path::PathBuf;
27use std::process::{Child, Command, Stdio};
28use std::sync::atomic::{AtomicBool, Ordering};
29use std::sync::Arc;
30use std::thread;
31use std::time::{Duration, Instant};
32use tracing::{debug, error, info, warn};
33
/// Snapshot of a running Weaver instance, used to point OTEL exporters at it.
///
/// Returned by `start_and_coordinate()` once the Weaver child process has been
/// spawned and the readiness wait has completed. Build the OTLP endpoint from
/// `otlp_grpc_port` instead of assuming a fixed port.
#[derive(Clone, Debug)]
pub struct WeaverCoordination {
    /// OS process ID of the spawned Weaver child
    pub weaver_pid: u32,
    /// Port handed to Weaver as its OTLP gRPC listening port
    pub otlp_grpc_port: u16,
    /// Port handed to Weaver for its admin/control interface
    pub admin_port: u16,
    /// Monotonic timestamp taken right after the readiness wait returned
    pub ready_at: Instant,
}
50
51/// Validation status returned by Weaver
52#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
53#[serde(rename_all = "lowercase")]
54pub enum ValidationStatus {
55 /// Validation passed with no violations
56 Success,
57 /// Validation failed with violations detected
58 Failure,
59}
60
61/// Detailed information about a single validation issue
62#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct ValidationDetail {
64 /// Severity level (violation, improvement, information)
65 pub level: String,
66 /// Metric or span name
67 pub metric_name: Option<String>,
68 /// Span name if applicable
69 pub span_name: Option<String>,
70 /// Description of the issue
71 pub message: String,
72 /// Path in registry
73 pub registry_path: Option<String>,
74}
75
76/// Complete validation report from Weaver
77#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct ValidationReport {
79 /// Overall validation status
80 pub status: ValidationStatus,
81 /// Number of violations (blocking issues)
82 pub violations: u32,
83 /// Number of improvements (suggestions)
84 pub improvements: u32,
85 /// Number of information messages
86 pub information: u32,
87 /// Percentage of registry coverage (0.0 - 1.0)
88 pub registry_coverage: f64,
89 /// Number of telemetry samples received (CRITICAL: must be > 0 for valid validation)
90 #[serde(default)]
91 pub sample_count: u32,
92 /// Detailed list of validation issues
93 pub details: Vec<ValidationDetail>,
94}
95
96impl Default for ValidationReport {
97 fn default() -> Self {
98 Self {
99 status: ValidationStatus::Failure, // Default to failure (no telemetry = failed validation)
100 violations: 0,
101 improvements: 0,
102 information: 0,
103 registry_coverage: 0.0,
104 sample_count: 0,
105 details: Vec::new(),
106 }
107 }
108}
109
/// Settings used to launch a Weaver live-check process.
#[derive(Clone, Debug)]
pub struct WeaverConfig {
    /// Location of the semantic convention registry
    pub registry_path: PathBuf,
    /// OTLP gRPC port Weaver should listen on
    pub otlp_port: u16,
    /// Port for Weaver's admin/control interface
    pub admin_port: u16,
    /// Directory that receives Weaver's output reports
    pub output_dir: PathBuf,
    /// Whether Weaver streams output for real-time feedback
    pub stream: bool,
}
124
125impl Default for WeaverConfig {
126 fn default() -> Self {
127 Self {
128 registry_path: PathBuf::from("registry"),
129 otlp_port: 0, // 0 = auto-discover available port
130 admin_port: 0, // 0 = auto-discover available port
131 output_dir: PathBuf::from("./validation_output"),
132 stream: false,
133 }
134 }
135}
136
137/// Controller for Weaver live-check process
138///
139/// Manages the lifecycle of a Weaver validation process, including:
140/// - Starting the live-check listener
141/// - Monitoring validation status
142/// - Stopping and retrieving final report
143///
144/// # Example
145///
146/// ```no_run
147/// use clnrm_core::telemetry::weaver_controller::{WeaverController, WeaverConfig};
148/// use std::path::PathBuf;
149///
150/// # async fn example() -> clnrm_core::error::Result<()> {
151/// let config = WeaverConfig {
152/// registry_path: PathBuf::from("registry"),
153/// output_dir: PathBuf::from("./validation_output"),
154/// ..Default::default()
155/// };
156///
157/// let mut controller = WeaverController::new(config);
158///
159/// // Start Weaver before tests
160/// controller.start_live_check()?;
161///
162/// // Run tests (they emit telemetry to OTLP)
163/// // ...
164///
165/// // Stop Weaver and get validation results
166/// let report = controller.stop_and_report()?;
167///
168/// if report.violations > 0 {
169/// eprintln!("Validation failed with {} violations", report.violations);
170/// }
171/// # Ok(())
172/// # }
173/// ```
174pub struct WeaverController {
175 config: WeaverConfig,
176 live_check_process: Option<Child>,
177 has_violations: Arc<AtomicBool>,
178 monitor_thread: Option<thread::JoinHandle<()>>,
179 coordination: Option<WeaverCoordination>,
180}
181
182impl WeaverController {
183 /// Create a new WeaverController with the given configuration
184 pub fn new(config: WeaverConfig) -> Self {
185 Self {
186 config,
187 live_check_process: None,
188 has_violations: Arc::new(AtomicBool::new(false)),
189 monitor_thread: None,
190 coordination: None,
191 }
192 }
193
194 /// Start Weaver and return coordination info (Weaver-first pattern)
195 ///
196 /// This is the PRIMARY method for Weaver-first initialization.
197 /// It blocks until Weaver is ready and returns coordination metadata
198 /// that MUST be used to configure OTEL exporters.
199 ///
200 /// # Weaver-First Pattern
201 ///
202 /// ```no_run
203 /// use clnrm_core::telemetry::weaver_controller::{WeaverController, WeaverConfig};
204 /// use clnrm_core::telemetry::{init_otel, OtelConfig, Export};
205 /// use std::path::PathBuf;
206 ///
207 /// # fn example() -> clnrm_core::error::Result<()> {
208 /// let config = WeaverConfig::default();
209 /// let mut controller = WeaverController::new(config);
210 ///
211 /// // Step 1: Start Weaver and get coordination
212 /// let coordination = controller.start_and_coordinate()?;
213 /// println!("Weaver listening on port {}", coordination.otlp_grpc_port);
214 ///
215 /// // Step 2: Initialize OTEL with Weaver's actual port
216 /// let endpoint = format!("http://localhost:{}", coordination.otlp_grpc_port);
217 /// let _otel_guard = init_otel(OtelConfig {
218 /// service_name: "clnrm",
219 /// deployment_env: "testing",
220 /// sample_ratio: 1.0,
221 /// export: Export::OtlpGrpc {
222 /// endpoint: Box::leak(endpoint.into_boxed_str()),
223 /// },
224 /// enable_fmt_layer: false,
225 /// headers: None,
226 /// })?;
227 ///
228 /// // Step 3: Run tests (telemetry goes to Weaver)
229 /// // ...
230 ///
231 /// // Step 4: Flush OTEL before stopping Weaver
232 /// drop(_otel_guard);
233 /// std::thread::sleep(std::time::Duration::from_millis(500));
234 ///
235 /// // Step 5: Stop Weaver and get validation report
236 /// let report = controller.stop_and_report()?;
237 /// # Ok(())
238 /// # }
239 /// ```
240 ///
241 /// # Errors
242 ///
243 /// Returns an error if:
244 /// - Weaver binary not found
245 /// - No available ports in range
246 /// - Weaver process fails to start
247 /// - Health check timeout
248 pub fn start_and_coordinate(&mut self) -> Result<WeaverCoordination> {
249 info!("🚀 Starting Weaver with coordination (Weaver-first pattern)");
250
251 // Cleanup any orphaned processes first
252 Self::cleanup_old_weaver_processes()?;
253
254 // Find available ports with intelligent fallback
255 let otlp_port = Self::find_available_port_with_fallback()?;
256 let admin_port = Self::find_available_port(8080, 8090).or_else(|_| {
257 warn!("Primary admin port range exhausted, trying fallback");
258 Self::find_available_port(9080, 9090)
259 })?;
260
261 // Update config with discovered ports
262 self.config.otlp_port = otlp_port;
263 self.config.admin_port = admin_port;
264
265 info!("📡 Discovered OTLP port: {}", otlp_port);
266 info!("🔧 Discovered admin port: {}", admin_port);
267
268 // Ensure output directory exists
269 std::fs::create_dir_all(&self.config.output_dir).map_err(|e| {
270 CleanroomError::io_error(format!("Failed to create output directory: {}", e))
271 })?;
272
273 // Build Weaver command
274 let mut cmd = Command::new("weaver");
275 cmd.args([
276 "registry",
277 "live-check",
278 "--registry",
279 &self.config.registry_path.display().to_string(),
280 "--otlp-grpc-port",
281 &self.config.otlp_port.to_string(),
282 "--admin-port",
283 &self.config.admin_port.to_string(),
284 "--output",
285 &self.config.output_dir.display().to_string(),
286 "--format",
287 "json",
288 ]);
289
290 // Add streaming flag if disabled
291 if !self.config.stream {
292 cmd.arg("--no-stream");
293 }
294
295 debug!("Weaver command: {:?}", cmd);
296
297 // Spawn process with piped output
298 let mut child = cmd
299 .stdout(Stdio::piped())
300 .stderr(Stdio::piped())
301 .spawn()
302 .map_err(|e| {
303 CleanroomError::internal_error(format!(
304 "Failed to start Weaver (is it installed?): {}",
305 e
306 ))
307 })?;
308
309 let weaver_pid = child.id();
310 info!("🔍 Weaver process started (PID: {})", weaver_pid);
311
312 // If streaming is enabled, spawn a monitor thread
313 if self.config.stream {
314 let stderr = child
315 .stderr
316 .take()
317 .ok_or_else(|| CleanroomError::internal_error("Failed to capture Weaver stderr"))?;
318
319 let violations_flag = Arc::clone(&self.has_violations);
320 let monitor = thread::spawn(move || {
321 let reader = BufReader::new(stderr);
322 for line in reader.lines() {
323 match line {
324 Ok(line) => {
325 debug!("Weaver: {}", line);
326 // Check for violation indicators
327 if line.contains("violation") || line.contains("error") {
328 violations_flag.store(true, Ordering::Relaxed);
329 }
330 }
331 Err(e) => {
332 warn!("Error reading Weaver output: {}", e);
333 break;
334 }
335 }
336 }
337 });
338
339 self.monitor_thread = Some(monitor);
340 }
341
342 self.live_check_process = Some(child);
343
344 // Wait for Weaver to be ready
345 self.wait_for_ready(Duration::from_secs(10))?;
346
347 let ready_at = Instant::now();
348 info!("✅ Weaver is ready and coordinated");
349
350 // Create coordination metadata
351 let coordination = WeaverCoordination {
352 weaver_pid,
353 otlp_grpc_port: otlp_port,
354 admin_port,
355 ready_at,
356 };
357
358 // Store coordination for later queries
359 self.coordination = Some(coordination.clone());
360
361 Ok(coordination)
362 }
363
364 /// Get current coordination state (non-blocking)
365 ///
366 /// Returns None if Weaver not started via `start_and_coordinate()`,
367 /// otherwise returns the coordination metadata.
368 pub fn coordination(&self) -> Option<WeaverCoordination> {
369 self.coordination.clone()
370 }
371
372 /// Find available port with intelligent fallback
373 ///
374 /// Strategy:
375 /// 1. Try primary range (4317-4327) - standard OTLP gRPC ports
376 /// 2. Fallback to secondary range (5317-5327) if primary exhausted
377 ///
378 /// # Errors
379 ///
380 /// Returns an error if no ports available in any range
381 fn find_available_port_with_fallback() -> Result<u16> {
382 debug!("Searching for available OTLP port with fallback");
383
384 // Try primary range (standard OTLP gRPC ports)
385 if let Ok(port) = Self::find_available_port(4317, 4327) {
386 info!("✅ Found available port in primary range: {}", port);
387 return Ok(port);
388 }
389
390 // Fallback to secondary range
391 warn!("Primary OTLP port range (4317-4327) exhausted, trying fallback range");
392 Self::find_available_port(5317, 5327).map_err(|_| {
393 CleanroomError::validation_error(
394 "No available ports in range 4317-4327, 5317-5327. \
395 All ports in use. Stop other OTLP services or use custom port range.",
396 )
397 })
398 }
399
400 /// Wait for Weaver to become ready
401 ///
402 /// Health check strategy:
403 /// 1. Initial delay (1000ms) for process startup
404 /// 2. Check process still running (not crashed)
405 /// 3. Return success if process is running
406 ///
407 /// Future enhancement: Add HTTP health check to admin port
408 fn wait_for_ready(&mut self, _timeout: Duration) -> Result<()> {
409 info!("⏳ Waiting for Weaver to become ready...");
410 let start = Instant::now();
411
412 // Initial startup delay
413 thread::sleep(Duration::from_millis(1000));
414
415 // Check process state
416 if let Some(ref mut process) = self.live_check_process {
417 match process.try_wait() {
418 Ok(Some(status)) => {
419 return Err(CleanroomError::internal_error(format!(
420 "Weaver exited prematurely with status: {}. \
421 Check Weaver logs in validation_output/ for details.",
422 status
423 )));
424 }
425 Ok(None) => {
426 // Still running, assume ready
427 let elapsed = start.elapsed();
428 info!("✅ Weaver ready (elapsed: {}ms)", elapsed.as_millis());
429 return Ok(());
430 }
431 Err(e) => {
432 return Err(CleanroomError::internal_error(format!(
433 "Failed to check Weaver status: {}",
434 e
435 )));
436 }
437 }
438 }
439
440 Err(CleanroomError::timeout_error(
441 "Weaver not ready within timeout",
442 ))
443 }
444
445 /// Find an available port in the specified range
446 ///
447 /// Attempts to bind to each port in range [start, end] and returns the first
448 /// available port. This prevents port conflicts when starting Weaver.
449 ///
450 /// # Arguments
451 ///
452 /// * `start` - Start of port range (inclusive)
453 /// * `end` - End of port range (inclusive)
454 ///
455 /// # Errors
456 ///
457 /// Returns an error if no ports are available in the range
458 fn find_available_port(start: u16, end: u16) -> Result<u16> {
459 debug!("Searching for available port in range {}-{}", start, end);
460
461 for port in start..=end {
462 match TcpListener::bind(("127.0.0.1", port)) {
463 Ok(_) => {
464 debug!("Found available port: {}", port);
465 return Ok(port);
466 }
467 Err(_) => continue,
468 }
469 }
470
471 Err(CleanroomError::validation_error(format!(
472 "No available ports in range {}-{}. All ports are in use.",
473 start, end
474 )))
475 }
476
477 /// Cleanup any orphaned Weaver processes
478 ///
479 /// Attempts to kill any existing `weaver registry live-check` processes
480 /// to prevent port conflicts and resource leaks. This is called before
481 /// starting a new Weaver instance.
482 fn cleanup_old_weaver_processes() -> Result<()> {
483 debug!("Cleaning up orphaned Weaver processes");
484
485 // Try to kill any existing weaver live-check processes
486 #[cfg(unix)]
487 {
488 let _ = Command::new("pkill")
489 .args(["-9", "-f", "weaver registry live-check"])
490 .output();
491
492 // Give processes time to terminate
493 thread::sleep(Duration::from_millis(500));
494 }
495
496 #[cfg(not(unix))]
497 {
498 // On Windows, use taskkill
499 let _ = Command::new("taskkill")
500 .args(&["/F", "/IM", "weaver.exe"])
501 .output();
502
503 thread::sleep(Duration::from_millis(500));
504 }
505
506 Ok(())
507 }
508
509 /// Start Weaver live-check listener
510 ///
511 /// Spawns Weaver as a child process and waits for it to be ready.
512 /// Includes intelligent port management and process cleanup.
513 ///
514 /// # Errors
515 ///
516 /// Returns an error if:
517 /// - Weaver binary is not found
518 /// - Failed to create output directory
519 /// - Process failed to start
520 /// - Listener failed to become ready
521 /// - No available ports in range
522 pub fn start_live_check(&mut self) -> Result<()> {
523 info!("🔍 Starting Weaver live-check validation");
524
525 // Cleanup any orphaned processes first
526 Self::cleanup_old_weaver_processes()?;
527
528 // Find available ports with retry logic
529 let otlp_port = Self::find_available_port(4317, 4327).or_else(|_| {
530 warn!("Primary OTLP port range exhausted, trying fallback range");
531 Self::find_available_port(5317, 5327)
532 })?;
533
534 let admin_port = Self::find_available_port(8080, 8090).or_else(|_| {
535 warn!("Primary admin port range exhausted, trying fallback range");
536 Self::find_available_port(9080, 9090)
537 })?;
538
539 // Update config with discovered ports
540 self.config.otlp_port = otlp_port;
541 self.config.admin_port = admin_port;
542
543 info!("📡 Using OTLP port: {}", otlp_port);
544 info!("🔧 Using admin port: {}", admin_port);
545
546 // Ensure output directory exists
547 std::fs::create_dir_all(&self.config.output_dir).map_err(|e| {
548 CleanroomError::io_error(format!("Failed to create output directory: {}", e))
549 })?;
550
551 // Build Weaver command
552 let mut cmd = Command::new("weaver");
553 cmd.args([
554 "registry",
555 "live-check",
556 "--registry",
557 &self.config.registry_path.display().to_string(),
558 "--otlp-grpc-port",
559 &self.config.otlp_port.to_string(),
560 "--admin-port",
561 &self.config.admin_port.to_string(),
562 "--output",
563 &self.config.output_dir.display().to_string(),
564 "--format",
565 "json",
566 ]);
567
568 // Add streaming flag if disabled
569 if !self.config.stream {
570 cmd.arg("--no-stream");
571 }
572
573 debug!("Weaver command: {:?}", cmd);
574
575 // Spawn process with piped output
576 let mut child = cmd
577 .stdout(Stdio::piped())
578 .stderr(Stdio::piped())
579 .spawn()
580 .map_err(|e| {
581 CleanroomError::internal_error(format!(
582 "Failed to start Weaver (is it installed?): {}",
583 e
584 ))
585 })?;
586
587 // If streaming is enabled, spawn a monitor thread
588 if self.config.stream {
589 let stderr = child
590 .stderr
591 .take()
592 .ok_or_else(|| CleanroomError::internal_error("Failed to capture Weaver stderr"))?;
593
594 let violations_flag = Arc::clone(&self.has_violations);
595 let monitor = thread::spawn(move || {
596 let reader = BufReader::new(stderr);
597 for line in reader.lines() {
598 match line {
599 Ok(line) => {
600 debug!("Weaver: {}", line);
601 // Check for violation indicators
602 if line.contains("violation") || line.contains("error") {
603 violations_flag.store(true, Ordering::Relaxed);
604 }
605 }
606 Err(e) => {
607 warn!("Error reading Weaver output: {}", e);
608 break;
609 }
610 }
611 }
612 });
613
614 self.monitor_thread = Some(monitor);
615 }
616
617 self.live_check_process = Some(child);
618
619 // Wait for listener to be ready with proper health check
620 info!("⏳ Waiting for Weaver to initialize...");
621 self.wait_for_weaver_ready()?;
622
623 Ok(())
624 }
625
626 /// Stop Weaver and retrieve validation report
627 ///
628 /// Sends SIGHUP to gracefully stop Weaver, then parses the validation report.
629 ///
630 /// # Errors
631 ///
632 /// Returns an error if:
633 /// - Weaver is not running
634 /// - Failed to stop the process
635 /// - Report file not found or invalid JSON
636 pub fn stop_and_report(&mut self) -> Result<ValidationReport> {
637 info!("🛑 Stopping Weaver and retrieving validation report");
638
639 let mut process = self
640 .live_check_process
641 .take()
642 .ok_or_else(|| CleanroomError::internal_error("Weaver live-check is not running"))?;
643
644 // Send graceful shutdown signal
645 #[cfg(unix)]
646 {
647 use nix::sys::signal::{kill, Signal};
648 use nix::unistd::Pid;
649
650 let pid = Pid::from_raw(process.id() as i32);
651 debug!("Sending SIGHUP to Weaver (PID: {})", pid);
652
653 kill(pid, Signal::SIGHUP).map_err(|e| {
654 CleanroomError::internal_error(format!("Failed to send SIGHUP: {}", e))
655 })?;
656 }
657
658 // On non-Unix, just kill the process
659 #[cfg(not(unix))]
660 {
661 warn!("Graceful shutdown not supported on this platform, killing process");
662 process.kill().map_err(|e| {
663 CleanroomError::internal_error(format!("Failed to kill Weaver: {}", e))
664 })?;
665 }
666
667 // Wait for process to finish (with timeout)
668 let output = self.wait_with_timeout(&mut process, Duration::from_secs(10))?;
669
670 // Log output for debugging
671 if !output.stdout.is_empty() {
672 debug!("Weaver stdout: {}", String::from_utf8_lossy(&output.stdout));
673 }
674 if !output.stderr.is_empty() {
675 debug!("Weaver stderr: {}", String::from_utf8_lossy(&output.stderr));
676 }
677
678 // Wait for monitor thread to finish
679 if let Some(monitor) = self.monitor_thread.take() {
680 let _ = monitor.join();
681 }
682
683 // Parse validation report
684 let report_path = self.config.output_dir.join("validation_report.json");
685 if !report_path.exists() {
686 warn!("Validation report not found at {:?}", report_path);
687 // Return a default report indicating unknown status
688 return Ok(ValidationReport::default());
689 }
690
691 let report_json = std::fs::read_to_string(&report_path).map_err(|e| {
692 CleanroomError::io_error(format!("Failed to read validation report: {}", e))
693 })?;
694
695 let mut report: ValidationReport = serde_json::from_str(&report_json).map_err(|e| {
696 CleanroomError::serialization_error(format!("Failed to parse validation report: {}", e))
697 })?;
698
699 // CRITICAL: Zero-sample validation (prevents false positives)
700 if report.sample_count == 0 {
701 error!("🚨 CRITICAL: Weaver received ZERO telemetry samples!");
702 error!(" This means validation did not actually test anything.");
703 error!(" Possible causes:");
704 error!(" - OTEL exporter not configured correctly");
705 error!(" - Telemetry sent to wrong port");
706 error!(" - Tests failed before emitting telemetry");
707 report.status = ValidationStatus::Failure;
708 }
709
710 // Log summary
711 info!("📊 Validation Report Summary:");
712 info!(" Status: {:?}", report.status);
713 info!(" Samples Received: {}", report.sample_count);
714 info!(" Violations: {}", report.violations);
715 info!(" Improvements: {}", report.improvements);
716 info!(" Information: {}", report.information);
717 info!(
718 " Registry Coverage: {:.1}%",
719 report.registry_coverage * 100.0
720 );
721
722 if report.violations > 0 {
723 error!("❌ Weaver detected {} violations", report.violations);
724 for detail in &report.details {
725 if detail.level == "violation" {
726 error!(" - {}", detail.message);
727 }
728 }
729 } else {
730 info!("✅ No violations detected");
731 }
732
733 Ok(report)
734 }
735
736 /// Check if validation has detected violations during streaming
737 ///
738 /// This is only useful when streaming is enabled in the configuration.
739 pub fn is_validation_passing(&self) -> bool {
740 !self.has_violations.load(Ordering::Relaxed)
741 }
742
743 /// Wait for Weaver to become ready by polling the admin health endpoint
744 ///
745 /// Uses exponential backoff with a maximum timeout. Polls the HTTP health endpoint
746 /// at `http://localhost:{admin_port}/health` to verify Weaver is actually listening.
747 ///
748 /// # Errors
749 ///
750 /// Returns an error if:
751 /// - Weaver process exits before becoming ready
752 /// - Health check times out (default: 30 seconds)
753 /// - HTTP health check fails persistently
754 fn wait_for_weaver_ready(&mut self) -> Result<()> {
755 const MAX_TIMEOUT: Duration = Duration::from_secs(30);
756 const INITIAL_DELAY: Duration = Duration::from_millis(100);
757 const MAX_DELAY: Duration = Duration::from_millis(1000);
758
759 let start = Instant::now();
760 let mut delay = INITIAL_DELAY;
761
762 // Check if process is still running
763 if let Some(ref mut process) = self.live_check_process {
764 match process.try_wait() {
765 Ok(Some(status)) => {
766 return Err(CleanroomError::internal_error(format!(
767 "Weaver exited prematurely with status: {}. \
768 Check Weaver logs in validation_output/ for details.",
769 status
770 )));
771 }
772 Ok(None) => {
773 // Process still running, continue with health check
774 }
775 Err(e) => {
776 return Err(CleanroomError::internal_error(format!(
777 "Failed to check Weaver status: {}",
778 e
779 )));
780 }
781 }
782 }
783
784 let admin_port = self.config.admin_port;
785 let health_url = format!("http://localhost:{}/health", admin_port);
786
787 loop {
788 // Check timeout
789 if start.elapsed() > MAX_TIMEOUT {
790 return Err(CleanroomError::timeout_error(format!(
791 "Weaver health check timed out after {}s. \
792 Admin port: {}, Health URL: {}",
793 MAX_TIMEOUT.as_secs(),
794 admin_port,
795 health_url
796 )));
797 }
798
799 // Attempt HTTP health check using blocking reqwest client
800 match Self::check_weaver_health(&health_url) {
801 Ok(true) => {
802 let elapsed = start.elapsed();
803 info!(
804 "✅ Weaver health check passed (elapsed: {}ms)",
805 elapsed.as_millis()
806 );
807 return Ok(());
808 }
809 Ok(false) => {
810 // Health check failed or network error, continue polling
811 debug!("Health check not ready yet, will retry...");
812 }
813 Err(e) => {
814 // Unexpected error, but continue polling
815 debug!("Health check error: {} (will retry)", e);
816 }
817 }
818
819 // Wait before next attempt with exponential backoff
820 thread::sleep(delay);
821 delay = std::cmp::min(delay * 2, MAX_DELAY);
822
823 // Re-check process state periodically
824 if let Some(ref mut process) = self.live_check_process {
825 if let Ok(Some(status)) = process.try_wait() {
826 return Err(CleanroomError::internal_error(format!(
827 "Weaver exited during health check with status: {}. \
828 Check Weaver logs in validation_output/ for details.",
829 status
830 )));
831 }
832 }
833 }
834 }
835
836 /// Check Weaver health endpoint
837 ///
838 /// Performs a blocking HTTP GET request to the Weaver health endpoint.
839 /// Returns `Ok(true)` if health check succeeds, `Ok(false)` if it fails
840 /// or network error occurs (allowing polling to continue).
841 fn check_weaver_health(url: &str) -> Result<bool> {
842 // Use blocking reqwest client for health checks
843 let client = match reqwest::blocking::Client::builder()
844 .timeout(Duration::from_secs(2))
845 .build()
846 {
847 Ok(client) => client,
848 Err(e) => {
849 debug!("Failed to create HTTP client: {} (will retry)", e);
850 return Ok(false);
851 }
852 };
853
854 match client.get(url).send() {
855 Ok(response) => {
856 let status = response.status();
857 Ok(status.is_success())
858 }
859 Err(e) => {
860 // Network errors are expected during startup, return false to continue polling
861 debug!("Health check request failed: {} (will retry)", e);
862 Ok(false)
863 }
864 }
865 }
866
867 /// Get the OTLP port that Weaver is listening on
868 ///
869 /// This returns the port discovered during `start_live_check()`.
870 /// Useful for configuring OTEL exporters to send to the correct endpoint.
871 pub fn get_otlp_port(&self) -> u16 {
872 self.config.otlp_port
873 }
874
875 /// Get the admin port that Weaver is listening on
876 ///
877 /// This returns the port discovered during `start_live_check()`.
878 pub fn get_admin_port(&self) -> u16 {
879 self.config.admin_port
880 }
881
882 /// Wait for process to finish with timeout
883 fn wait_with_timeout(
884 &self,
885 process: &mut Child,
886 timeout: Duration,
887 ) -> Result<std::process::Output> {
888 let start = std::time::Instant::now();
889
890 loop {
891 match process.try_wait() {
892 Ok(Some(status)) => {
893 // Process finished, collect any remaining output
894 use std::io::Read;
895
896 let mut stdout = Vec::new();
897 let mut stderr = Vec::new();
898
899 if let Some(mut out) = process.stdout.take() {
900 let _ = out.read_to_end(&mut stdout);
901 }
902 if let Some(mut err) = process.stderr.take() {
903 let _ = err.read_to_end(&mut stderr);
904 }
905
906 return Ok(std::process::Output {
907 status,
908 stdout,
909 stderr,
910 });
911 }
912 Ok(None) => {
913 // Still running
914 if start.elapsed() > timeout {
915 // Timeout reached, kill process
916 warn!("Weaver did not stop gracefully, killing");
917 let _ = process.kill();
918 return Err(CleanroomError::timeout_error(
919 "Weaver did not stop within timeout",
920 ));
921 }
922 thread::sleep(Duration::from_millis(100));
923 }
924 Err(e) => {
925 return Err(CleanroomError::internal_error(format!(
926 "Failed to check Weaver status: {}",
927 e
928 )));
929 }
930 }
931 }
932 }
933}
934
935impl Drop for WeaverController {
936 fn drop(&mut self) {
937 // Ensure process is killed if still running
938 if let Some(mut process) = self.live_check_process.take() {
939 debug!("Cleaning up Weaver process in Drop");
940 let _ = process.kill();
941 let _ = process.wait();
942 }
943
944 // Wait for monitor thread
945 if let Some(monitor) = self.monitor_thread.take() {
946 let _ = monitor.join();
947 }
948 }
949}
950
#[cfg(test)]
mod tests {
    use super::*;

    /// Defaults leave both ports at 0 (auto-discover) with streaming off.
    #[test]
    fn test_weaver_config_defaults() {
        let config = WeaverConfig::default();
        // Default config uses 0 for auto-discovery, not hardcoded ports
        assert_eq!(config.otlp_port, 0); // 0 = auto-discover available port
        assert_eq!(config.admin_port, 0); // 0 = auto-discover available port
        assert_eq!(config.registry_path, PathBuf::from("registry"));
        assert_eq!(config.output_dir, PathBuf::from("./validation_output"));
        assert!(!config.stream);
    }

    /// An unpopulated report must read as a failure with nothing counted.
    #[test]
    fn test_validation_report_default() {
        let report = ValidationReport::default();
        // Default status is Failure (no telemetry = failed validation)
        assert_eq!(report.status, ValidationStatus::Failure);
        assert_eq!(report.violations, 0);
        assert_eq!(report.improvements, 0);
        assert_eq!(report.information, 0);
        assert_eq!(report.registry_coverage, 0.0);
        assert_eq!(report.sample_count, 0);
        assert!(report.details.is_empty());
    }

    /// A report without `sample_count` must deserialize with a count of 0, so
    /// that the zero-sample guard in `stop_and_report` still applies.
    #[test]
    fn test_validation_report_missing_sample_count_defaults_to_zero() {
        let json = r#"{
            "status": "success",
            "violations": 0,
            "improvements": 0,
            "information": 0,
            "registry_coverage": 1.0,
            "details": []
        }"#;

        let report: ValidationReport = serde_json::from_str(json).unwrap();
        assert_eq!(report.status, ValidationStatus::Success);
        assert_eq!(report.sample_count, 0);
        assert!(report.details.is_empty());
    }

    /// Status values use lowercase names on the wire, in both directions.
    #[test]
    fn test_validation_status_serialization() {
        let success = ValidationStatus::Success;
        let json = serde_json::to_string(&success).unwrap();
        assert_eq!(json, "\"success\"");

        let failure = ValidationStatus::Failure;
        let json = serde_json::to_string(&failure).unwrap();
        assert_eq!(json, "\"failure\"");

        let parsed: ValidationStatus = serde_json::from_str("\"failure\"").unwrap();
        assert_eq!(parsed, ValidationStatus::Failure);
    }

    /// A new controller owns no process, no monitor and no coordination data.
    #[test]
    fn test_weaver_controller_creation() {
        let config = WeaverConfig::default();
        let controller = WeaverController::new(config);
        assert!(controller.live_check_process.is_none());
        assert!(controller.monitor_thread.is_none());
        assert!(controller.coordination().is_none());
        assert!(!controller.has_violations.load(Ordering::Relaxed));
    }

    /// Validation counts as passing until the monitor flags a violation.
    #[test]
    fn test_validation_passing_initial_state() {
        let config = WeaverConfig::default();
        let controller = WeaverController::new(config);
        assert!(controller.is_validation_passing());
    }

    // Integration tests are in tests/weaver/ directory
    // See tests/weaver/otel_integration_tests.rs for complete integration test suite

    #[test]
    #[ignore = "Requires Weaver installation"]
    fn test_weaver_controller_lifecycle() {
        // Intentionally empty placeholder: exercising the lifecycle requires
        // Weaver to be installed and a registry to exist.
        // For comprehensive integration tests, see:
        // - tests/weaver/otel_integration_tests.rs (24 integration tests)
        // - tests/weaver/phase2_coordination/ (coordination pattern tests)
        // - tests/weaver/phase3_otel_integration/ (OTEL contract tests)
        //
        // These integration tests verify end-to-end Weaver coordination,
        // telemetry export, and schema validation.
    }
}