1use crate::backend::{Backend, TestcontainerBackend, Cmd};
8use crate::error::{CleanroomError, Result};
9use std::collections::HashMap;
10use std::sync::Arc;
11use tokio::sync::RwLock;
12use uuid::Uuid;
13#[cfg(feature = "otel-traces")]
14use opentelemetry::global;
15#[cfg(feature = "otel-traces")]
16use opentelemetry::KeyValue;
17#[cfg(feature = "otel-traces")]
18use opentelemetry::trace::{TracerProvider, Tracer, Span};
19use testcontainers::runners::AsyncRunner;
20use testcontainers_modules::surrealdb::{SurrealDb, SURREALDB_PORT};
21use std::future::Future;
22use std::pin::Pin;
23use std::any::Any;
24
25pub trait ServicePlugin: Send + Sync {
27 fn name(&self) -> &str;
29
30 fn start(&self) -> Pin<Box<dyn Future<Output = Result<ServiceHandle>> + Send + '_>>;
32
33 fn stop(&self, handle: ServiceHandle) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>>;
35
36 fn health_check(&self, handle: &ServiceHandle) -> HealthStatus;
38}
39
/// Describes one started service instance.
#[derive(Debug, Clone)]
pub struct ServiceHandle {
    /// Identifier of this instance; the service registry keys active services by it.
    pub id: String,
    /// Name of the plugin that owns the instance; used to find the plugin again
    /// when stopping or health-checking.
    pub service_name: String,
    /// Free-form key/value details supplied by the plugin.
    pub metadata: HashMap<String, String>,
}
50
/// Coarse health state reported for a service instance.
///
/// The enum is fieldless, so it is `Copy` and has a total equality (`Eq`);
/// callers can pass it by value and compare it freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HealthStatus {
    /// The service is considered healthy.
    Healthy,
    /// The service is considered unhealthy.
    Unhealthy,
    /// Health could not be determined (for example, no plugin is registered
    /// for the handle being checked).
    Unknown,
}
61
62#[derive(Default)]
64pub struct ServiceRegistry {
65 plugins: HashMap<String, Box<dyn ServicePlugin>>,
67 active_services: HashMap<String, ServiceHandle>,
69}
70
71impl ServiceRegistry {
72 pub fn new() -> Self {
74 Self::default()
75 }
76
77 pub fn register_plugin(&mut self, plugin: Box<dyn ServicePlugin>) {
79 let name = plugin.name().to_string();
80 self.plugins.insert(name, plugin);
81 }
82
83 pub async fn start_service(&mut self, service_name: &str) -> Result<ServiceHandle> {
85 let plugin = self.plugins.get(service_name)
86 .ok_or_else(|| CleanroomError::internal_error(&format!(
87 "Service plugin '{}' not found", service_name
88 )))?;
89
90 let handle = plugin.start().await?;
91 self.active_services.insert(handle.id.clone(), handle.clone());
92
93 Ok(handle)
94 }
95
96 pub async fn stop_service(&mut self, handle_id: &str) -> Result<()> {
98 if let Some(handle) = self.active_services.remove(handle_id) {
99 let plugin = self.plugins.get(&handle.service_name)
100 .ok_or_else(|| CleanroomError::internal_error(&format!(
101 "Service plugin '{}' not found for handle '{}'",
102 handle.service_name, handle_id
103 )))?;
104
105 plugin.stop(handle).await?;
106 }
107
108 Ok(())
109 }
110
111 pub async fn check_all_health(&self) -> HashMap<String, HealthStatus> {
113 let mut health_status = HashMap::new();
114
115 for (handle_id, handle) in &self.active_services {
116 if let Some(plugin) = self.plugins.get(&handle.service_name) {
117 health_status.insert(handle_id.clone(), plugin.health_check(handle));
118 } else {
119 health_status.insert(handle_id.clone(), HealthStatus::Unknown);
120 }
121 }
122
123 health_status
124 }
125
126 pub fn active_services(&self) -> &HashMap<String, ServiceHandle> {
128 &self.active_services
129 }
130
131 pub fn is_service_running(&self, service_name: &str) -> bool {
133 self.active_services.values()
134 .any(|handle| handle.service_name == service_name)
135 }
136}
137
138#[derive(Debug, Clone)]
140pub struct SimpleMetrics {
141 pub session_id: Uuid,
143 pub start_time: std::time::Instant,
145 pub tests_executed: u32,
147 pub tests_passed: u32,
149 pub tests_failed: u32,
151 pub total_duration_ms: u64,
153 pub active_containers: u32,
155 pub active_services: u32,
157 pub containers_created: u32,
159 pub containers_reused: u32,
161}
162
163impl Default for SimpleMetrics {
164 fn default() -> Self {
165 Self {
166 session_id: Uuid::new_v4(),
167 start_time: std::time::Instant::now(),
168 tests_executed: 0,
169 tests_passed: 0,
170 tests_failed: 0,
171 total_duration_ms: 0,
172 active_containers: 0,
173 active_services: 0,
174 containers_created: 0,
175 containers_reused: 0,
176 }
177 }
178}
179
/// Outcome of running a command through `CleanroomEnvironment::execute_in_container`.
#[derive(Debug, Clone)]
pub struct ExecutionResult {
    /// Exit code reported for the command; `0` is treated as success.
    pub exit_code: i32,
    /// Captured standard output.
    pub stdout: String,
    /// Captured standard error.
    pub stderr: String,
    /// Wall-clock time measured around the execution.
    pub duration: std::time::Duration,
    /// The command as it was requested, one element per argument.
    pub command: Vec<String>,
    /// Name of the container the command was requested for.
    pub container_name: String,
}
196
197impl ExecutionResult {
198 pub fn matches_regex(&self, pattern: &str) -> Result<bool> {
200 use regex::Regex;
201 let regex = Regex::new(pattern)
202 .map_err(|e| CleanroomError::validation_error(format!("Invalid regex pattern '{}': {}", pattern, e)))?;
203 Ok(regex.is_match(&self.stdout))
204 }
205
206 pub fn does_not_match_regex(&self, pattern: &str) -> Result<bool> {
208 Ok(!self.matches_regex(pattern)?)
209 }
210
211 pub fn succeeded(&self) -> bool {
213 self.exit_code == 0
214 }
215
216 pub fn failed(&self) -> bool {
218 !self.succeeded()
219 }
220}
221
222#[allow(dead_code)]
224pub struct CleanroomEnvironment {
225 session_id: Uuid,
227 backend: Arc<dyn Backend>,
229 services: Arc<RwLock<ServiceRegistry>>,
231 metrics: Arc<RwLock<SimpleMetrics>>,
233 container_registry: Arc<RwLock<HashMap<String, Box<dyn Any + Send + Sync>>>>,
235 #[cfg(feature = "otel-metrics")]
237 meter: opentelemetry::metrics::Meter,
238 telemetry: Arc<RwLock<TelemetryState>>,
240}
241
/// Telemetry switches and collected traces for an environment.
#[derive(Debug)]
pub struct TelemetryState {
    /// Whether tracing has been switched on.
    pub tracing_enabled: bool,
    /// Whether metrics collection has been switched on.
    pub metrics_enabled: bool,
    /// Collected trace entries.
    pub traces: Vec<String>,
}

impl TelemetryState {
    /// Switches tracing on.
    pub fn enable_tracing(&mut self) {
        self.tracing_enabled = true;
    }

    /// Switches metrics collection on.
    pub fn enable_metrics(&mut self) {
        self.metrics_enabled = true;
    }

    /// Returns an owned copy of the collected traces; later changes to the
    /// returned vector do not affect the state.
    pub fn get_traces(&self) -> Vec<String> {
        self.traces.to_vec()
    }
}
269
270impl CleanroomEnvironment {
271 pub async fn new() -> Result<Self> {
273 Ok(Self {
274 session_id: Uuid::new_v4(),
275 #[cfg(feature = "otel-metrics")]
276 meter: {
277 let meter_provider = global::meter_provider();
278 meter_provider.meter("clnrm-cleanroom")
279 },
280 backend: Arc::new(TestcontainerBackend::new("alpine:latest")?),
281 services: Arc::new(RwLock::new(ServiceRegistry::new())),
282 metrics: Arc::new(RwLock::new(SimpleMetrics::default())),
283 container_registry: Arc::new(RwLock::new(HashMap::new())),
284 telemetry: Arc::new(RwLock::new(TelemetryState {
285 tracing_enabled: false,
286 metrics_enabled: false,
287 traces: Vec::new(),
288 })),
289 })
290 }
291
292 pub async fn execute_test<F, T>(&self, _test_name: &str, test_fn: F) -> Result<T>
294 where
295 F: FnOnce() -> Result<T>,
296 {
297 #[cfg(feature = "otel-traces")]
298 let tracer_provider = global::tracer_provider();
299 #[cfg(feature = "otel-traces")]
300 let mut span = tracer_provider.tracer("clnrm-cleanroom").start(format!("test.{}", _test_name));
301 #[cfg(feature = "otel-traces")]
302 span.set_attributes(vec![
303 KeyValue::new("test.name", _test_name.to_string()),
304 KeyValue::new("session.id", self.session_id.to_string()),
305 ]);
306
307 let start_time = std::time::Instant::now();
308
309 {
311 let mut metrics = self.metrics.write().await;
312 metrics.tests_executed += 1;
313 }
314
315 let result = test_fn();
316
317 let duration = start_time.elapsed();
318
319 let success = result.is_ok();
321 if success {
322 let mut metrics = self.metrics.write().await;
323 metrics.tests_passed += 1;
324 } else {
325 let mut metrics = self.metrics.write().await;
326 metrics.tests_failed += 1;
327 }
328
329 let mut metrics = self.metrics.write().await;
330 metrics.total_duration_ms += duration.as_millis() as u64;
331
332 #[cfg(feature = "otel-metrics")]
333 {
334 let attributes = vec![
336 KeyValue::new("test.name", _test_name.to_string()),
337 KeyValue::new("session.id", self.session_id.to_string()),
338 ];
339
340 let counter = self.meter.u64_counter("test.executions")
341 .with_description("Number of test executions")
342 .build();
343 counter.add(1, &attributes);
344
345 let histogram = self.meter.f64_histogram("test.duration")
346 .with_description("Test execution duration")
347 .build();
348 histogram.record(duration.as_secs_f64(), &attributes);
349 }
350
351 #[cfg(feature = "otel-traces")]
352 if !success {
353 span.set_status(opentelemetry::trace::Status::error("Test failed"));
354 }
355
356 #[cfg(feature = "otel-traces")]
357 span.end();
358
359 result
360 }
361
362 pub async fn get_metrics(&self) -> Result<SimpleMetrics> {
364 Ok(self.metrics.read().await.clone())
365 }
366
367 pub async fn enable_tracing(&self) -> Result<()> {
369 #[cfg(feature = "otel-traces")]
370 {
371 let mut telemetry = self.telemetry.write().await;
372 telemetry.enable_tracing();
373 }
374 Ok(())
375 }
376
377 pub async fn enable_metrics(&self) -> Result<()> {
379 #[cfg(feature = "otel-traces")]
380 {
381 let mut telemetry = self.telemetry.write().await;
382 telemetry.enable_metrics();
383 }
384 Ok(())
385 }
386
387 pub async fn get_traces(&self) -> Result<Vec<String>> {
389 #[cfg(feature = "otel-traces")]
390 {
391 let telemetry = self.telemetry.read().await;
392 Ok(telemetry.get_traces())
393 }
394 #[cfg(not(feature = "otel-traces"))]
395 {
396 Ok(Vec::new())
397 }
398 }
399
400 pub async fn get_container_reuse_stats(&self) -> (u32, u32) {
402 let metrics = self.metrics.read().await;
403 (metrics.containers_created, metrics.containers_reused)
404 }
405
406 pub async fn has_container(&self, name: &str) -> bool {
408 let registry = self.container_registry.read().await;
409 registry.contains_key(name)
410 }
411
412 pub async fn register_service(&self, plugin: Box<dyn ServicePlugin>) -> Result<()> {
414 let mut services = self.services.write().await;
415 services.register_plugin(plugin);
416 Ok(())
417 }
418
419 pub async fn start_service(&self, service_name: &str) -> Result<ServiceHandle> {
421 let mut services = self.services.write().await;
422 services.start_service(service_name).await
423 }
424
425 pub async fn stop_service(&self, handle_id: &str) -> Result<()> {
427 let mut services = self.services.write().await;
428 services.stop_service(handle_id).await
429 }
430
431 pub async fn services(&self) -> tokio::sync::RwLockReadGuard<'_, ServiceRegistry> {
433 self.services.read().await
434 }
435
436 pub async fn register_container<T: Send + Sync + 'static>(&self, name: String, container: T) -> Result<()> {
438 let mut registry = self.container_registry.write().await;
439 registry.insert(name, Box::new(container));
440 Ok(())
441 }
442
443 pub async fn get_or_create_container<F, T>(
448 &self,
449 name: &str,
450 factory: F,
451 ) -> Result<T>
452 where
453 F: FnOnce() -> Result<T>,
454 T: Send + Sync + Clone + 'static,
455 {
456 let existing_container = {
458 let registry = self.container_registry.read().await;
459 if let Some(existing_container) = registry.get(name) {
460 if let Some(typed_container) = existing_container.downcast_ref::<T>() {
462 Some(typed_container.clone())
463 } else {
464 None
465 }
466 } else {
467 None
468 }
469 };
470
471 if let Some(container) = existing_container {
472 {
474 let mut metrics = self.metrics.write().await;
475 metrics.containers_reused += 1;
476 }
477
478 return Ok(container);
479 }
480
481 let container = factory()?;
483
484 let mut registry = self.container_registry.write().await;
486 registry.insert(name.to_string(), Box::new(container.clone()));
487
488 {
490 let mut metrics = self.metrics.write().await;
491 metrics.containers_created += 1;
492 }
493
494 Ok(container)
495 }
496
497 pub async fn check_health(&self) -> HashMap<String, HealthStatus> {
499 self.services.read().await.check_all_health().await
500 }
501
502 pub fn session_id(&self) -> Uuid {
504 self.session_id
505 }
506
507 pub fn backend(&self) -> &dyn Backend {
509 self.backend.as_ref() as &dyn Backend
510 }
511
512 pub async fn execute_in_container(
518 &self,
519 container_name: &str,
520 command: &[String],
521 ) -> Result<ExecutionResult> {
522 #[cfg(feature = "otel-traces")]
523 let tracer_provider = global::tracer_provider();
524 #[cfg(feature = "otel-traces")]
525 let mut span = tracer_provider.tracer("clnrm-cleanroom").start(format!("container.exec.{}", container_name));
526 #[cfg(feature = "otel-traces")]
527 span.set_attributes(vec![
528 KeyValue::new("container.name", container_name.to_string()),
529 KeyValue::new("command", command.join(" ")),
530 KeyValue::new("session.id", self.session_id.to_string()),
531 ]);
532
533 let start_time = std::time::Instant::now();
534
535 let cmd = Cmd::new("sh")
538 .arg("-c")
539 .arg(command.join(" "))
540 .env("CONTAINER_NAME", container_name);
541
542 let backend = self.backend.clone();
545 let execution_result = tokio::task::spawn_blocking(move || {
546 backend.run_cmd(cmd)
547 }).await.map_err(|e| {
548 #[cfg(feature = "otel-traces")]
549 {
550 span.set_status(opentelemetry::trace::Status::error("Task join failed"));
551 span.end();
552 }
553 CleanroomError::internal_error("Failed to execute command in blocking task")
554 .with_context("Command execution task failed")
555 .with_source(e.to_string())
556 })?.map_err(|e| {
557 #[cfg(feature = "otel-traces")]
558 {
559 span.set_status(opentelemetry::trace::Status::error("Command execution failed"));
560 span.end();
561 }
562 CleanroomError::container_error("Failed to execute command in container")
563 .with_context(format!("Container: {}, Command: {}", container_name, command.join(" ")))
564 .with_source(e.to_string())
565 })?;
566
567 let duration = start_time.elapsed();
568
569 #[cfg(feature = "otel-metrics")]
571 {
572 let histogram = self.meter.f64_histogram("container.command.duration")
573 .with_description("Container command execution duration")
574 .build();
575 histogram.record(duration.as_secs_f64(), &[
576 KeyValue::new("container.name", container_name.to_string()),
577 KeyValue::new("command", command.join(" ")),
578 ]);
579 }
580
581 #[cfg(feature = "otel-traces")]
582 span.set_attributes(vec![
583 KeyValue::new("execution.exit_code", execution_result.exit_code.to_string()),
584 KeyValue::new("execution.duration_ms", duration.as_millis().to_string()),
585 ]);
586
587 #[cfg(feature = "otel-traces")]
588 if execution_result.exit_code != 0 {
589 span.set_status(opentelemetry::trace::Status::error("Command failed"));
590 }
591
592 #[cfg(feature = "otel-traces")]
593 span.end();
594
595 Ok(ExecutionResult {
596 exit_code: execution_result.exit_code,
597 stdout: execution_result.stdout,
598 stderr: execution_result.stderr,
599 duration,
600 command: command.to_vec(),
601 container_name: container_name.to_string(),
602 })
603 }
604}
605
606impl Default for CleanroomEnvironment {
607 fn default() -> Self {
608 #[cfg(feature = "otel-metrics")]
609 let meter_provider = global::meter_provider();
610 #[cfg(feature = "otel-metrics")]
611 let meter = meter_provider.meter("cleanroom");
612
613 let backend = TestcontainerBackend::new("alpine:latest")
614 .unwrap_or_else(|_| panic!("Failed to create default backend"));
615
616 Self {
617 session_id: Uuid::new_v4(),
618 #[cfg(feature = "otel-metrics")]
619 meter,
620 backend: Arc::new(backend),
621 services: Arc::new(RwLock::new(ServiceRegistry::new())),
622 metrics: Arc::new(RwLock::new(SimpleMetrics::default())),
623 container_registry: Arc::new(RwLock::new(HashMap::new())),
624 telemetry: Arc::new(RwLock::new(TelemetryState {
625 tracing_enabled: false,
626 metrics_enabled: false,
627 traces: Vec::new(),
628 })),
629 }
630 }
631}
632
633pub struct MockDatabasePlugin {
637 name: String,
638 container_id: Arc<RwLock<Option<String>>>,
639}
640
641impl MockDatabasePlugin {
642 pub fn new() -> Self {
643 Self {
644 name: "mock_database".to_string(),
645 container_id: Arc::new(RwLock::new(None)),
646 }
647 }
648}
649
650impl ServicePlugin for MockDatabasePlugin {
651 fn name(&self) -> &str {
652 &self.name
653 }
654
655 fn start(&self) -> Pin<Box<dyn Future<Output = Result<ServiceHandle>> + Send + '_>> {
656 Box::pin(async move {
657 let node = SurrealDb::default()
659 .start()
660 .await
661 .map_err(|e| CleanroomError::container_error("Failed to start SurrealDB")
662 .with_context("SurrealDB container startup failed")
663 .with_source(e.to_string()))?;
664
665 let host_port = node.get_host_port_ipv4(SURREALDB_PORT)
666 .await
667 .map_err(|e| CleanroomError::container_error("Failed to get port")
668 .with_source(e.to_string()))?;
669
670 let mut container_guard = self.container_id.write().await;
672 *container_guard = Some(format!("container-{}", host_port));
673
674 let mut metadata = HashMap::new();
676 metadata.insert("host".to_string(), "127.0.0.1".to_string());
677 metadata.insert("port".to_string(), host_port.to_string());
678 metadata.insert("username".to_string(), "root".to_string());
679 metadata.insert("password".to_string(), "root".to_string());
680
681 Ok(ServiceHandle {
682 id: Uuid::new_v4().to_string(),
683 service_name: self.name.clone(),
684 metadata,
685 })
686 })
687 }
688
689 fn stop(&self, _handle: ServiceHandle) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
690 Box::pin(async move {
691 let mut container_guard = self.container_id.write().await;
692 *container_guard = None; Ok(())
694 })
695 }
696
697 fn health_check(&self, handle: &ServiceHandle) -> HealthStatus {
698 if handle.metadata.contains_key("port") {
700 HealthStatus::Healthy
701 } else {
702 HealthStatus::Unknown
703 }
704 }
705}
706
#[cfg(test)]
mod tests {
    use super::*;

    /// `new()` succeeds when the default backend can be constructed.
    #[tokio::test]
    async fn test_cleanroom_creation() {
        let created = CleanroomEnvironment::new().await;
        assert!(created.is_ok());
    }

    /// A default environment carries a non-nil session id.
    #[tokio::test]
    async fn test_cleanroom_session_id() {
        let environment = CleanroomEnvironment::default();
        assert!(!environment.session_id().is_nil());
    }

    /// `execute_test` hands back the closure's value unchanged.
    #[tokio::test]
    async fn test_cleanroom_execute_test() {
        let environment = CleanroomEnvironment::default();
        let outcome = environment
            .execute_test("test", || Ok::<i32, CleanroomError>(42))
            .await;
        assert!(outcome.is_ok());
        assert_eq!(outcome.unwrap(), 42);
    }

    /// A fresh environment has no active services.
    #[tokio::test]
    async fn test_service_registry() {
        let environment = CleanroomEnvironment::default();
        let registry = environment.services().await;
        assert!(registry.active_services().is_empty());
    }

    /// Registering a plugin succeeds.
    #[tokio::test]
    async fn test_service_plugin_registration() {
        let environment = CleanroomEnvironment::default();
        let registered = environment
            .register_service(Box::new(MockDatabasePlugin::new()))
            .await;
        assert!(registered.is_ok());
    }

    /// A registered service can be started and then stopped through its handle.
    #[tokio::test]
    async fn test_service_start_stop() {
        let environment = CleanroomEnvironment::default();
        environment
            .register_service(Box::new(MockDatabasePlugin::new()))
            .await
            .unwrap();

        let handle = environment.start_service("mock_database").await.unwrap();
        assert_eq!(handle.service_name, "mock_database");

        let stopped = environment.stop_service(&handle.id).await;
        assert!(stopped.is_ok());
    }

    /// A registered container is visible through `has_container`.
    #[tokio::test]
    async fn test_register_container() {
        let environment = CleanroomEnvironment::default();
        let registered = environment
            .register_container("test-container".to_string(), "container-123".to_string())
            .await;
        assert!(registered.is_ok());

        assert!(environment.has_container("test-container").await);
    }

    /// The first call creates the container, the second reuses it and ignores
    /// its factory.
    #[tokio::test]
    async fn test_get_or_create_container() {
        let environment = CleanroomEnvironment::default();

        // First call: nothing registered yet, so the factory runs.
        let first = environment
            .get_or_create_container("test-container", || {
                Ok::<String, CleanroomError>("container-instance".to_string())
            })
            .await;
        assert!(first.is_ok());
        assert_eq!(first.unwrap(), "container-instance");

        assert!(environment.has_container("test-container").await);
        let (created, reused) = environment.get_container_reuse_stats().await;
        assert_eq!(created, 1);
        assert_eq!(reused, 0);

        // Second call: the stored instance is returned, not the factory's value.
        let second = environment
            .get_or_create_container("test-container", || {
                Ok::<String, CleanroomError>("container-instance-2".to_string())
            })
            .await;
        assert!(second.is_ok());
        assert_eq!(second.unwrap(), "container-instance");

        let (created, reused) = environment.get_container_reuse_stats().await;
        assert_eq!(created, 1);
        assert_eq!(reused, 1);
    }

    /// With no active services the health report is empty.
    #[tokio::test]
    async fn test_check_health_delegates_to_service_registry() {
        let environment = CleanroomEnvironment::default();
        let report = environment.check_health().await;
        assert!(report.is_empty());
    }
}