1use crate::error::{CleanroomError, Result};
29use serde::{Deserialize, Serialize};
30use std::path::{Path, PathBuf};
31use std::process::{Child, Command, Stdio};
32use std::time::Duration;
33use tracing::{debug, info, warn};
34
/// Settings that control how the `weaver registry emit` command is invoked.
#[derive(Debug, Clone)]
pub struct EmitConfig {
    /// Registry location handed to weaver via `--registry`.
    pub registry_path: PathBuf,
    /// Endpoint handed to weaver via `--endpoint` when `stdout` is `false`.
    pub endpoint: String,
    /// When `true`, `--stdout` is passed instead of `--endpoint`.
    pub stdout: bool,
    /// When `true`, `--debug` is passed and captured output is logged at debug level.
    pub debug: bool,
}
47
48impl Default for EmitConfig {
49 fn default() -> Self {
50 Self {
51 registry_path: PathBuf::from("registry"),
52 endpoint: "http://localhost:4317".to_string(),
53 stdout: false,
54 debug: false,
55 }
56 }
57}
58
59impl EmitConfig {
60 pub fn stdout() -> Self {
62 Self {
63 stdout: true,
64 ..Default::default()
65 }
66 }
67
68 pub fn with_endpoint<S: Into<String>>(endpoint: S) -> Self {
70 Self {
71 endpoint: endpoint.into(),
72 ..Default::default()
73 }
74 }
75}
76
77#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct EmitResult {
80 pub spans_emitted: usize,
82 pub metrics_emitted: usize,
84 pub events_emitted: usize,
86 pub total_signals: usize,
88 pub success: bool,
90 pub error: Option<String>,
92}
93
94impl EmitResult {
95 pub fn success(spans: usize, metrics: usize, events: usize) -> Self {
97 Self {
98 spans_emitted: spans,
99 metrics_emitted: metrics,
100 events_emitted: events,
101 total_signals: spans + metrics + events,
102 success: true,
103 error: None,
104 }
105 }
106
107 pub fn failure(error: String) -> Self {
109 Self {
110 spans_emitted: 0,
111 metrics_emitted: 0,
112 events_emitted: 0,
113 total_signals: 0,
114 success: false,
115 error: Some(error),
116 }
117 }
118}
119
120pub struct WeaverEmitter {
125 config: EmitConfig,
126}
127
128impl WeaverEmitter {
129 pub fn new<P: AsRef<Path>>(registry_path: P) -> Self {
131 Self {
132 config: EmitConfig {
133 registry_path: registry_path.as_ref().to_path_buf(),
134 ..Default::default()
135 },
136 }
137 }
138
139 pub fn with_config(config: EmitConfig) -> Self {
141 Self { config }
142 }
143
144 pub fn emit(&self) -> Result<EmitResult> {
157 info!(
158 "🚀 Emitting telemetry from registry: {}",
159 self.config.registry_path.display()
160 );
161
162 if !self.config.registry_path.exists() {
164 return Err(CleanroomError::validation_error(format!(
165 "Registry not found: {}",
166 self.config.registry_path.display()
167 )));
168 }
169
170 let mut cmd = Command::new("weaver");
172 cmd.args([
173 "registry",
174 "emit",
175 "--registry",
176 &self.config.registry_path.display().to_string(),
177 ]);
178
179 if self.config.stdout {
180 cmd.arg("--stdout");
181 } else {
182 cmd.args(["--endpoint", &self.config.endpoint]);
183 }
184
185 if self.config.debug {
186 cmd.arg("--debug");
187 }
188
189 debug!("Weaver emit command: {:?}", cmd);
190
191 let output = cmd.output().map_err(|e| {
193 CleanroomError::internal_error(format!(
194 "Failed to run weaver emit (is it installed?): {}",
195 e
196 ))
197 })?;
198
199 let stdout = String::from_utf8_lossy(&output.stdout);
200 let stderr = String::from_utf8_lossy(&output.stderr);
201
202 if self.config.debug {
203 debug!("Weaver emit stdout: {}", stdout);
204 debug!("Weaver emit stderr: {}", stderr);
205 }
206
207 if !output.status.success() {
208 return Ok(EmitResult::failure(format!(
209 "Weaver emit failed: {}",
210 stderr
211 )));
212 }
213
214 let result = self.parse_emit_output(&stdout, &stderr)?;
216
217 info!(
218 "✅ Emitted {} signals ({} spans, {} metrics, {} events)",
219 result.total_signals,
220 result.spans_emitted,
221 result.metrics_emitted,
222 result.events_emitted
223 );
224
225 Ok(result)
226 }
227
228 pub fn start_continuous(&self) -> Result<EmitHandle> {
237 info!("🔄 Starting continuous telemetry emission");
238
239 let mut cmd = Command::new("weaver");
241 cmd.args([
242 "registry",
243 "emit",
244 "--registry",
245 &self.config.registry_path.display().to_string(),
246 "--endpoint",
247 &self.config.endpoint,
248 ]);
249
250 if self.config.debug {
251 cmd.arg("--debug");
252 }
253
254 let child = cmd
256 .stdout(Stdio::piped())
257 .stderr(Stdio::piped())
258 .spawn()
259 .map_err(|e| {
260 CleanroomError::internal_error(format!("Failed to spawn weaver emit: {}", e))
261 })?;
262
263 info!("✅ Continuous emission started (PID: {})", child.id());
264
265 Ok(EmitHandle { process: child })
266 }
267
268 pub fn emit_to_string(&self) -> Result<String> {
272 let mut config = self.config.clone();
273 config.stdout = true;
274
275 let emitter = WeaverEmitter::with_config(config);
276 let result = emitter.emit()?;
277
278 if !result.success {
279 return Err(CleanroomError::validation_error(format!(
280 "Emission failed: {}",
281 result.error.unwrap_or_default()
282 )));
283 }
284
285 let output = Command::new("weaver")
287 .args([
288 "registry",
289 "emit",
290 "--registry",
291 &self.config.registry_path.display().to_string(),
292 "--stdout",
293 ])
294 .output()
295 .map_err(|e| CleanroomError::internal_error(format!("Failed to run weaver: {}", e)))?;
296
297 Ok(String::from_utf8_lossy(&output.stdout).to_string())
298 }
299
300 fn parse_emit_output(&self, _stdout: &str, stderr: &str) -> Result<EmitResult> {
302 let mut spans = 0;
306 let mut metrics = 0;
307 let mut events = 0;
308
309 for line in stderr.lines() {
310 if line.contains("Emitted") || line.contains("emitted") {
311 let words: Vec<&str> = line.split_whitespace().collect();
313 for (i, word) in words.iter().enumerate() {
314 if let Ok(num) = word.parse::<usize>() {
315 if i + 1 < words.len() {
316 let next = words[i + 1].to_lowercase();
317 if next.contains("span") {
318 spans = num;
319 } else if next.contains("metric") {
320 metrics = num;
321 } else if next.contains("event") || next.contains("log") {
322 events = num;
323 }
324 }
325 }
326 }
327 }
328 }
329
330 if spans == 0 && metrics == 0 && events == 0 {
332 warn!("Could not parse emission counts from output, assuming success");
333 spans = 1; }
335
336 Ok(EmitResult::success(spans, metrics, events))
337 }
338}
339
/// Owns the child process started by `WeaverEmitter::start_continuous`.
#[derive(Debug)]
pub struct EmitHandle {
    /// The spawned `weaver` process.
    process: Child,
}
346
347impl EmitHandle {
348 pub fn stop(mut self) -> Result<()> {
350 info!("🛑 Stopping continuous emission");
351
352 self.process.kill().map_err(|e| {
353 CleanroomError::internal_error(format!("Failed to kill emitter process: {}", e))
354 })?;
355
356 let _ = self.process.wait();
357 info!("✅ Emission stopped");
358
359 Ok(())
360 }
361
362 pub fn is_running(&mut self) -> bool {
364 match self.process.try_wait() {
365 Ok(None) => true,
366 _ => false,
367 }
368 }
369
370 pub fn wait_with_timeout(&mut self, timeout: Duration) -> Result<()> {
372 let start = std::time::Instant::now();
373
374 while start.elapsed() < timeout {
375 if !self.is_running() {
376 return Ok(());
377 }
378 std::thread::sleep(Duration::from_millis(100));
379 }
380
381 Err(CleanroomError::timeout_error(
382 "Emitter did not stop within timeout",
383 ))
384 }
385}
386
387impl Drop for EmitHandle {
388 fn drop(&mut self) {
389 debug!("Cleaning up emitter process in Drop");
390 let _ = self.process.kill();
391 let _ = self.process.wait();
392 }
393}
394
/// Produces telemetry fixtures from a registry by driving a `WeaverEmitter`.
#[derive(Debug)]
pub struct FixtureGenerator {
    /// Registry location used for every emitter this generator creates.
    registry_path: PathBuf,
}
401
402impl FixtureGenerator {
403 pub fn new<P: AsRef<Path>>(registry_path: P) -> Self {
405 Self {
406 registry_path: registry_path.as_ref().to_path_buf(),
407 }
408 }
409
410 pub fn generate_json_fixtures(&self) -> Result<String> {
412 let config = EmitConfig {
413 registry_path: self.registry_path.clone(),
414 stdout: true,
415 ..Default::default()
416 };
417
418 let emitter = WeaverEmitter::with_config(config);
419 emitter.emit_to_string()
420 }
421
422 pub fn emit_to_file<P: AsRef<Path>>(&self, output_path: P) -> Result<()> {
424 let fixtures = self.generate_json_fixtures()?;
425
426 std::fs::write(output_path.as_ref(), fixtures)
427 .map_err(|e| CleanroomError::io_error(format!("Failed to write fixtures: {}", e)))?;
428
429 info!("✅ Fixtures written to: {}", output_path.as_ref().display());
430
431 Ok(())
432 }
433
434 pub fn seed_collector(&self, endpoint: &str) -> Result<EmitResult> {
436 let config = EmitConfig {
437 registry_path: self.registry_path.clone(),
438 endpoint: endpoint.to_string(),
439 ..Default::default()
440 };
441
442 let emitter = WeaverEmitter::with_config(config);
443 emitter.emit()
444 }
445}
446
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration targets the local registry and endpoint with flags off.
    #[test]
    fn test_emit_config_default() {
        let EmitConfig {
            registry_path,
            endpoint,
            stdout,
            debug,
        } = EmitConfig::default();

        assert_eq!(registry_path, PathBuf::from("registry"));
        assert_eq!(endpoint, "http://localhost:4317");
        assert!(!stdout);
        assert!(!debug);
    }

    /// `EmitConfig::stdout` switches the stdout flag on.
    #[test]
    fn test_emit_config_stdout() {
        assert!(EmitConfig::stdout().stdout);
    }

    /// `EmitConfig::with_endpoint` stores the supplied endpoint.
    #[test]
    fn test_emit_config_with_endpoint() {
        let endpoint = EmitConfig::with_endpoint("http://example.com:4317").endpoint;
        assert_eq!(endpoint, "http://example.com:4317");
    }

    /// A successful result keeps the counts, sums them, and carries no error.
    #[test]
    fn test_emit_result_success() {
        let outcome = EmitResult::success(10, 5, 3);

        assert!(outcome.success);
        assert_eq!(outcome.spans_emitted, 10);
        assert_eq!(outcome.metrics_emitted, 5);
        assert_eq!(outcome.events_emitted, 3);
        assert_eq!(outcome.total_signals, 18);
        assert!(outcome.error.is_none());
    }

    /// A failed result has no signals and keeps the error message.
    #[test]
    fn test_emit_result_failure() {
        let outcome = EmitResult::failure("Test error".to_string());

        assert!(!outcome.success);
        assert_eq!(outcome.total_signals, 0);
        assert_eq!(outcome.error, Some("Test error".to_string()));
    }

    /// `WeaverEmitter::new` records the registry path it was given.
    #[test]
    fn test_weaver_emitter_creation() {
        let emitter = WeaverEmitter::new("registry/");
        let expected = PathBuf::from("registry/");
        assert_eq!(emitter.config.registry_path, expected);
    }

    /// `FixtureGenerator::new` records the registry path it was given.
    #[test]
    fn test_fixture_generator_creation() {
        let generator = FixtureGenerator::new("registry/");
        let expected = PathBuf::from("registry/");
        assert_eq!(generator.registry_path, expected);
    }

    /// Counts are read from an "Emitted ..." summary line on stderr.
    #[test]
    fn test_parse_emit_output() {
        let summary = "Emitted 15 spans, 10 metrics, 5 events successfully";

        let parsed = WeaverEmitter::new("test")
            .parse_emit_output("", summary)
            .unwrap();

        assert_eq!(parsed.spans_emitted, 15);
        assert_eq!(parsed.metrics_emitted, 10);
        assert_eq!(parsed.events_emitted, 5);
        assert!(parsed.success);
    }

    /// Placeholder for an end-to-end run; intentionally empty and ignored.
    #[test]
    #[ignore = "Requires Weaver installation"]
    fn test_emit_integration() {}
}