1use crate::backend::{Backend, Cmd, TestcontainerBackend};
8use crate::error::{CleanroomError, Result};
9use opentelemetry::global;
10use opentelemetry::trace::{Span, Tracer, TracerProvider};
11use opentelemetry::KeyValue;
12use std::any::Any;
13use std::collections::HashMap;
14use std::os::unix::process::ExitStatusExt;
15use std::sync::Arc;
16use tokio::sync::RwLock;
17use uuid::Uuid;
18
/// A pluggable service (container, model server, database, …) that the
/// [`ServiceRegistry`] can start, stop and health-check.
///
/// Implementations must be `Send + Sync` because the registry is shared
/// behind an async `RwLock`, and `Debug` so the registry itself can derive
/// `Debug`.
pub trait ServicePlugin: Send + Sync + std::fmt::Debug {
    /// Unique name used to register and look up the plugin.
    fn name(&self) -> &str;

    /// Starts the service and returns a handle describing the running
    /// instance (id, service name, connection metadata).
    fn start(&self) -> Result<ServiceHandle>;

    /// Stops the service identified by `handle`, consuming the handle.
    fn stop(&self, handle: ServiceHandle) -> Result<()>;

    /// Probes the health of the service identified by `handle`.
    fn health_check(&self, handle: &ServiceHandle) -> HealthStatus;
}
33
/// Handle to a service instance started by a [`ServicePlugin`].
#[derive(Debug, Clone)]
pub struct ServiceHandle {
    /// Unique identifier for this running instance (a UUID string for the
    /// plugins in this file).
    pub id: String,
    /// Name of the plugin that started the service; used to route `stop`
    /// and `health_check` calls back to it.
    pub service_name: String,
    /// Plugin-supplied details such as `host`, `port` or `container_id`
    /// (`execute_in_service` requires a `container_id` entry).
    pub metadata: HashMap<String, String>,
}
44
/// Result of a service health probe.
#[derive(Debug, Clone, PartialEq)]
pub enum HealthStatus {
    /// The service is operating normally.
    Healthy,
    /// The service is not operating correctly.
    Unhealthy,
    /// Health could not be determined (e.g. the owning plugin is missing).
    Unknown,
}
55
/// Registry of service plugins and the service instances they have started.
#[derive(Debug, Default)]
pub struct ServiceRegistry {
    // Available plugins, keyed by the name each plugin reports.
    plugins: HashMap<String, Box<dyn ServicePlugin>>,
    // Currently running services, keyed by `ServiceHandle::id`.
    active_services: HashMap<String, ServiceHandle>,
}
64
65impl ServiceRegistry {
66 pub fn new() -> Self {
68 Self::default()
69 }
70
71 pub fn with_default_plugins(mut self) -> Self {
73 use crate::services::{
74 generic::GenericContainerPlugin, ollama::OllamaPlugin, tgi::TgiPlugin, vllm::VllmPlugin,
75 };
76
77 let generic_plugin = Box::new(GenericContainerPlugin::new(
79 "generic_container",
80 "alpine:latest",
81 ));
82 self.register_plugin(generic_plugin);
83
84 let ollama_config = crate::services::ollama::OllamaConfig {
86 endpoint: "http://localhost:11434".to_string(),
87 default_model: "qwen3-coder:30b".to_string(),
88 timeout_seconds: 60,
89 };
90 let ollama_plugin = Box::new(OllamaPlugin::new("ollama", ollama_config));
91 self.register_plugin(ollama_plugin);
92
93 let vllm_config = crate::services::vllm::VllmConfig {
94 endpoint: "http://localhost:8000".to_string(),
95 model: "microsoft/DialoGPT-medium".to_string(),
96 max_num_seqs: Some(100),
97 max_model_len: Some(2048),
98 tensor_parallel_size: Some(1),
99 gpu_memory_utilization: Some(0.9),
100 enable_prefix_caching: Some(true),
101 timeout_seconds: 60,
102 };
103 let vllm_plugin = Box::new(VllmPlugin::new("vllm", vllm_config));
104 self.register_plugin(vllm_plugin);
105
106 let tgi_config = crate::services::tgi::TgiConfig {
107 endpoint: "http://localhost:8080".to_string(),
108 model_id: "microsoft/DialoGPT-medium".to_string(),
109 max_total_tokens: Some(2048),
110 max_input_length: Some(1024),
111 max_batch_prefill_tokens: Some(4096),
112 max_concurrent_requests: Some(32),
113 max_batch_total_tokens: Some(8192),
114 timeout_seconds: 60,
115 };
116 let tgi_plugin = Box::new(TgiPlugin::new("tgi", tgi_config));
117 self.register_plugin(tgi_plugin);
118
119 self
120 }
121
122 pub fn register_plugin(&mut self, plugin: Box<dyn ServicePlugin>) {
124 let name = plugin.name().to_string();
125 self.plugins.insert(name, plugin);
126 }
127
128 pub async fn start_service(&mut self, service_name: &str) -> Result<ServiceHandle> {
130 let plugin = self.plugins.get(service_name).ok_or_else(|| {
131 CleanroomError::internal_error(format!("Service plugin '{}' not found", service_name))
132 })?;
133
134 let handle = plugin.start()?;
135 self.active_services
136 .insert(handle.id.clone(), handle.clone());
137
138 Ok(handle)
139 }
140
141 pub async fn stop_service(&mut self, handle_id: &str) -> Result<()> {
143 if let Some(handle) = self.active_services.remove(handle_id) {
144 let plugin = self.plugins.get(&handle.service_name).ok_or_else(|| {
145 CleanroomError::internal_error(format!(
146 "Service plugin '{}' not found for handle '{}'",
147 handle.service_name, handle_id
148 ))
149 })?;
150
151 plugin.stop(handle)?;
152 }
153
154 Ok(())
155 }
156
157 pub async fn check_all_health(&self) -> HashMap<String, HealthStatus> {
159 let mut health_status = HashMap::new();
160
161 for (handle_id, handle) in &self.active_services {
162 if let Some(plugin) = self.plugins.get(&handle.service_name) {
163 health_status.insert(handle_id.clone(), plugin.health_check(handle));
164 } else {
165 health_status.insert(handle_id.clone(), HealthStatus::Unknown);
166 }
167 }
168
169 health_status
170 }
171
172 pub fn active_services(&self) -> &HashMap<String, ServiceHandle> {
174 &self.active_services
175 }
176
177 pub fn is_service_running(&self, service_name: &str) -> bool {
179 self.active_services
180 .values()
181 .any(|handle| handle.service_name == service_name)
182 }
183
184 pub async fn get_service_logs(&self, service_id: &str, lines: usize) -> Result<Vec<String>> {
186 let handle = self.active_services.get(service_id).ok_or_else(|| {
187 CleanroomError::internal_error(format!("Service with ID '{}' not found", service_id))
188 })?;
189
190 let _plugin = self.plugins.get(&handle.service_name).ok_or_else(|| {
191 CleanroomError::internal_error(format!(
192 "Service plugin '{}' not found",
193 handle.service_name
194 ))
195 })?;
196
197 let mock_logs = vec![
200 format!(
201 "[{}] Service '{}' started",
202 chrono::Utc::now().format("%Y-%m-%d %H:%M:%S"),
203 handle.service_name
204 ),
205 format!(
206 "[{}] Service '{}' is running",
207 chrono::Utc::now().format("%Y-%m-%d %H:%M:%S"),
208 handle.service_name
209 ),
210 ];
211
212 Ok(mock_logs.into_iter().take(lines).collect())
214 }
215}
216
/// Lightweight per-session counters for test and container activity.
#[derive(Debug, Clone)]
pub struct SimpleMetrics {
    /// Id of the session these metrics belong to.
    pub session_id: Uuid,
    /// Instant the session started.
    pub start_time: std::time::Instant,
    /// Total number of tests executed.
    pub tests_executed: u32,
    /// Number of tests that passed.
    pub tests_passed: u32,
    /// Number of tests that failed.
    pub tests_failed: u32,
    /// Accumulated test duration in milliseconds.
    pub total_duration_ms: u64,
    /// Number of currently active containers.
    /// NOTE(review): never updated in this file — confirm where it is
    /// maintained.
    pub active_containers: u32,
    /// Number of currently active services.
    /// NOTE(review): never updated in this file — confirm where it is
    /// maintained.
    pub active_services: u32,
    /// Containers created fresh by `get_or_create_container`.
    pub containers_created: u32,
    /// Containers served from the reuse cache.
    pub containers_reused: u32,
}
241
242impl SimpleMetrics {
243 pub fn new() -> Self {
244 Self {
245 session_id: Uuid::new_v4(),
246 start_time: std::time::Instant::now(),
247 tests_executed: 0,
248 tests_passed: 0,
249 tests_failed: 0,
250 total_duration_ms: 0,
251 active_containers: 0,
252 active_services: 0,
253 containers_created: 0,
254 containers_reused: 0,
255 }
256 }
257}
258
259impl Default for SimpleMetrics {
260 fn default() -> Self {
261 Self::new()
262 }
263}
264
/// Outcome of a command executed in a container or service.
#[derive(Debug, Clone)]
pub struct ExecutionResult {
    /// Process exit code (0 means success).
    pub exit_code: i32,
    /// Captured standard output.
    pub stdout: String,
    /// Captured standard error.
    pub stderr: String,
    /// Wall-clock execution time.
    pub duration: std::time::Duration,
    /// The command line that was executed.
    pub command: Vec<String>,
    /// Name of the container or service the command was attributed to.
    pub container_name: String,
    /// Container id, when one was available.
    pub container_id: Option<String>,
}
283
284impl ExecutionResult {
285 pub fn matches_regex(&self, pattern: &str) -> Result<bool> {
287 use regex::Regex;
288 let regex = Regex::new(pattern).map_err(|e| {
289 CleanroomError::validation_error(format!("Invalid regex pattern '{}': {}", pattern, e))
290 })?;
291 Ok(regex.is_match(&self.stdout))
292 }
293
294 pub fn does_not_match_regex(&self, pattern: &str) -> Result<bool> {
296 Ok(!self.matches_regex(pattern)?)
297 }
298
299 pub fn succeeded(&self) -> bool {
301 self.exit_code == 0
302 }
303
304 pub fn failed(&self) -> bool {
306 !self.succeeded()
307 }
308}
309
/// Isolated test environment: a container backend plus service registry,
/// metrics, a type-erased container cache and telemetry, all shared behind
/// async locks so the environment can be used from concurrent tasks.
#[allow(dead_code)]
#[derive(Debug)]
pub struct CleanroomEnvironment {
    // Unique id for this environment instance; attached to spans/metrics.
    session_id: Uuid,
    // Backend used to run commands.
    backend: Arc<dyn Backend>,
    // Registered service plugins and their running instances.
    services: Arc<RwLock<ServiceRegistry>>,
    // Session counters (tests run, containers created/reused, ...).
    metrics: Arc<RwLock<SimpleMetrics>>,
    // Type-erased cache of containers registered for reuse, keyed by name.
    container_registry: Arc<RwLock<HashMap<String, Box<dyn Any + Send + Sync>>>>,
    // OpenTelemetry meter used for counters and histograms.
    meter: opentelemetry::metrics::Meter,
    // Local tracing/metrics toggles and collected trace strings.
    telemetry: Arc<RwLock<TelemetryState>>,
}
329
330impl Default for CleanroomEnvironment {
331 fn default() -> Self {
350 Self {
353 session_id: Uuid::new_v4(),
354 backend: Arc::new(
355 TestcontainerBackend::new("alpine:latest")
356 .unwrap_or_else(|_| panic!("Default CleanroomEnvironment requires Docker. Tests should ensure Docker is available. Production code should use CleanroomEnvironment::new() instead."))
357 ),
358 services: Arc::new(RwLock::new(ServiceRegistry::new())),
359 metrics: Arc::new(RwLock::new(SimpleMetrics::new())),
360 container_registry: Arc::new(RwLock::new(HashMap::new())),
361 meter: global::meter("clnrm-cleanroom"),
362 telemetry: Arc::new(RwLock::new(TelemetryState::new())),
363 }
364 }
365}
366
/// In-process record of telemetry toggles and collected trace strings.
#[derive(Debug)]
pub struct TelemetryState {
    /// Whether tracing has been enabled for this environment.
    pub tracing_enabled: bool,
    /// Whether metrics collection has been enabled.
    pub metrics_enabled: bool,
    /// Trace entries accumulated via [`TelemetryState::add_trace`].
    pub traces: Vec<String>,
}
377
378impl TelemetryState {
379 pub fn new() -> Self {
381 Self {
382 tracing_enabled: false,
383 metrics_enabled: false,
384 traces: Vec::new(),
385 }
386 }
387
388 pub fn enable_tracing(&mut self) {
390 self.tracing_enabled = true;
391 }
392
393 pub fn enable_metrics(&mut self) {
395 self.metrics_enabled = true;
396 }
397
398 pub fn add_trace(&mut self, trace: String) {
400 self.traces.push(trace);
401 }
402
403 pub fn get_traces(&self) -> Vec<String> {
405 self.traces.clone()
406 }
407}
408
409impl Default for TelemetryState {
410 fn default() -> Self {
411 Self::new()
412 }
413}
414
415impl CleanroomEnvironment {
    /// Creates a cleanroom environment with the default configuration
    /// (delegates to [`CleanroomEnvironment::with_config`] with `None`).
    ///
    /// # Errors
    ///
    /// Fails if the test container backend cannot be initialized.
    pub async fn new() -> Result<Self> {
        Self::with_config(None).await
    }
420
    /// Creates a cleanroom environment, optionally from an explicit
    /// configuration.
    ///
    /// The backend image comes from `config.containers.default_image` when a
    /// configuration is given, otherwise `"alpine:latest"`. The service
    /// registry is pre-populated with the default plugins.
    ///
    /// # Errors
    ///
    /// Returns a container error if the Testcontainer backend cannot be
    /// initialized with the chosen image.
    pub async fn with_config(config: Option<crate::config::CleanroomConfig>) -> Result<Self> {
        let default_image = config
            .as_ref()
            .map(|c| c.containers.default_image.clone())
            .unwrap_or_else(|| "alpine:latest".to_string());

        Ok(Self {
            session_id: Uuid::new_v4(),
            backend: Arc::new(TestcontainerBackend::new(&default_image).map_err(|e| {
                CleanroomError::container_error("Failed to initialize test container backend")
                    .with_context(format!("Cannot use default image '{}'", default_image))
                    .with_source(e.to_string())
            })?),
            services: Arc::new(RwLock::new(ServiceRegistry::new().with_default_plugins())),
            metrics: Arc::new(RwLock::new(SimpleMetrics::default())),
            container_registry: Arc::new(RwLock::new(HashMap::new())),
            // Resolve the meter through the globally installed provider so a
            // provider registered before this call is picked up.
            meter: {
                let meter_provider = global::meter_provider();
                meter_provider.meter("clnrm-cleanroom")
            },
            telemetry: Arc::new(RwLock::new(TelemetryState::new())),
        })
    }
456
457 pub async fn execute_test<F, T>(&self, _test_name: &str, test_fn: F) -> Result<T>
463 where
464 F: FnOnce() -> Result<T>,
465 {
466 use std::time::{SystemTime, UNIX_EPOCH};
467
468 let tracer_provider = global::tracer_provider();
469 let mut span = tracer_provider
470 .tracer("clnrm-cleanroom")
471 .start(format!("test.{}", _test_name));
472
473 let start_timestamp = SystemTime::now()
475 .duration_since(UNIX_EPOCH)
476 .map_err(|e| CleanroomError::internal_error(format!("System time error: {}", e)))?
477 .as_millis() as i64;
478
479 let start_time = std::time::Instant::now();
480
481 span.set_attributes(vec![
483 KeyValue::new("test.name", _test_name.to_string()),
484 KeyValue::new("test.suite", "core_tests"), KeyValue::new("test.isolated", true), KeyValue::new("test.start_timestamp", start_timestamp),
487 KeyValue::new("session.id", self.session_id.to_string()),
488 KeyValue::new("container.image.name", "alpine:latest"), KeyValue::new("test.cleanup_performed", true), ]);
491
492 {
494 let mut metrics = self.metrics.write().await;
495 metrics.tests_executed += 1;
496 }
497
498 let result = test_fn();
499
500 let duration = start_time.elapsed();
501 let duration_ms = duration.as_millis() as f64;
502
503 let end_timestamp = SystemTime::now()
505 .duration_since(UNIX_EPOCH)
506 .map_err(|e| CleanroomError::internal_error(format!("System time error: {}", e)))?
507 .as_millis() as i64;
508
509 let success = result.is_ok();
511 let test_result = if success { "pass" } else { "fail" };
512
513 if success {
514 let mut metrics = self.metrics.write().await;
515 metrics.tests_passed += 1;
516 } else {
517 let mut metrics = self.metrics.write().await;
518 metrics.tests_failed += 1;
519 }
520
521 let mut metrics = self.metrics.write().await;
522 metrics.total_duration_ms += duration_ms as u64;
523
524 span.set_attributes(vec![
526 KeyValue::new("test.result", test_result.to_string()),
527 KeyValue::new("test.duration_ms", duration_ms),
528 KeyValue::new("test.end_timestamp", end_timestamp),
529 KeyValue::new("container.id", self.session_id.to_string()), KeyValue::new("container.exit_code", if success { 0 } else { 1 }),
531 ]);
532
533 if !success {
535 if let Err(ref error) = result {
536 span.set_attribute(KeyValue::new("error.message", error.to_string()));
537 span.set_attribute(KeyValue::new("error.type", "TestFailure"));
538 }
539 }
540
541 {
543 let attributes = vec![
544 KeyValue::new("test.name", _test_name.to_string()),
545 KeyValue::new("test.result", test_result.to_string()),
546 KeyValue::new("session.id", self.session_id.to_string()),
547 ];
548
549 let counter = self
550 .meter
551 .u64_counter("test.executions")
552 .with_description("Number of test executions")
553 .build();
554 counter.add(1, &attributes);
555
556 let histogram = self
557 .meter
558 .f64_histogram("test.duration")
559 .with_description("Test execution duration")
560 .build();
561 histogram.record(duration.as_secs_f64(), &attributes);
562 }
563
564 if !success {
565 span.set_status(opentelemetry::trace::Status::error("Test failed"));
566 }
567
568 span.end();
569
570 result
571 }
572
573 pub async fn get_metrics(&self) -> Result<SimpleMetrics> {
575 Ok(self.metrics.read().await.clone())
576 }
577
578 pub async fn enable_tracing(&self) -> Result<()> {
580 {
581 let mut telemetry = self.telemetry.write().await;
582 telemetry.enable_tracing();
583 }
584 Ok(())
585 }
586
587 pub async fn enable_metrics(&self) -> Result<()> {
589 {
590 let mut telemetry = self.telemetry.write().await;
591 telemetry.enable_metrics();
592 }
593 Ok(())
594 }
595
596 pub async fn get_traces(&self) -> Result<Vec<String>> {
598 {
599 let telemetry = self.telemetry.read().await;
600 Ok(telemetry.get_traces())
601 }
602 }
603
604 pub async fn get_container_reuse_stats(&self) -> (u32, u32) {
606 let metrics = self.metrics.read().await;
607 (metrics.containers_created, metrics.containers_reused)
608 }
609
610 pub async fn has_container(&self, name: &str) -> bool {
612 let registry = self.container_registry.read().await;
613 registry.contains_key(name)
614 }
615
616 pub async fn register_service(&self, plugin: Box<dyn ServicePlugin>) -> Result<()> {
618 let mut services = self.services.write().await;
619 services.register_plugin(plugin);
620 Ok(())
621 }
622
623 pub async fn start_service(&self, service_name: &str) -> Result<ServiceHandle> {
625 let mut services = self.services.write().await;
626 services.start_service(service_name).await
627 }
628
629 pub async fn stop_service(&self, handle_id: &str) -> Result<()> {
631 let mut services = self.services.write().await;
632 services.stop_service(handle_id).await
633 }
634
635 pub async fn execute_command_with_output(
644 &self,
645 _handle: &ServiceHandle,
646 command_args: &[String],
647 ) -> Result<std::process::Output> {
648 if command_args.is_empty() {
649 return Err(CleanroomError::validation_error(
650 "Command arguments cannot be empty",
651 ));
652 }
653
654 let mut cmd = Cmd::new(&command_args[0]);
656 for arg in &command_args[1..] {
657 cmd = cmd.arg(arg);
658 }
659
660 let backend = self.backend.clone();
662 let run_result = tokio::task::spawn_blocking(move || backend.run_cmd(cmd))
663 .await
664 .map_err(|e| {
665 CleanroomError::internal_error(format!("Failed to spawn backend execution: {}", e))
666 })?
667 .map_err(|e| {
668 CleanroomError::container_error("Failed to execute command in container")
669 .with_context("Command execution failed in test container")
670 .with_source(e.to_string())
671 })?;
672
673 let output = std::process::Output {
675 status: std::process::ExitStatus::from_raw(run_result.exit_code),
676 stdout: run_result.stdout.into_bytes(),
677 stderr: run_result.stderr.into_bytes(),
678 };
679
680 Ok(output)
681 }
682
    /// Returns a read guard on the service registry.
    ///
    /// The guard holds the registry's read lock; drop it before calling any
    /// method on this environment that takes the write lock (such as
    /// `start_service`), or that call will block until the guard is
    /// released.
    pub async fn services(&self) -> tokio::sync::RwLockReadGuard<'_, ServiceRegistry> {
        self.services.read().await
    }
687
688 pub async fn register_container<T: Send + Sync + 'static>(
690 &self,
691 name: String,
692 container: T,
693 ) -> Result<()> {
694 let mut registry = self.container_registry.write().await;
695 registry.insert(name, Box::new(container));
696 Ok(())
697 }
698
699 pub async fn get_or_create_container<F, T>(&self, name: &str, factory: F) -> Result<T>
704 where
705 F: FnOnce() -> Result<T>,
706 T: Send + Sync + Clone + 'static,
707 {
708 let existing_container = {
710 let registry = self.container_registry.read().await;
711 if let Some(existing_container) = registry.get(name) {
712 existing_container.downcast_ref::<T>().cloned()
714 } else {
715 None
716 }
717 };
718
719 if let Some(container) = existing_container {
720 {
722 let mut metrics = self.metrics.write().await;
723 metrics.containers_reused += 1;
724 }
725
726 return Ok(container);
727 }
728
729 let container = factory()?;
731
732 let mut registry = self.container_registry.write().await;
734 registry.insert(name.to_string(), Box::new(container.clone()));
735
736 {
738 let mut metrics = self.metrics.write().await;
739 metrics.containers_created += 1;
740 }
741
742 Ok(container)
743 }
744
745 pub async fn check_health(&self) -> HashMap<String, HealthStatus> {
747 self.services.read().await.check_all_health().await
748 }
749
750 pub async fn get_service_logs(&self, service_id: &str, lines: usize) -> Result<Vec<String>> {
752 let services = self.services.read().await;
753 services.get_service_logs(service_id, lines).await
754 }
755
    /// Returns the unique id of this environment session.
    pub fn session_id(&self) -> Uuid {
        self.session_id
    }
760
761 pub fn backend(&self) -> &dyn Backend {
763 self.backend.as_ref() as &dyn Backend
764 }
765
    /// Runs `command` via `sh -c` attributed to a running service, tracing
    /// the execution and recording a duration histogram.
    ///
    /// The target container id is taken from the handle's `container_id`
    /// metadata entry; it is passed to the backend command as the
    /// `CONTAINER_ID` environment variable rather than by attaching to that
    /// container directly — NOTE(review): confirm the backend routes on
    /// `CONTAINER_ID`; from this file the command appears to run in the
    /// backend's own container.
    ///
    /// # Errors
    ///
    /// Fails if the handle has no `container_id` metadata, if the blocking
    /// task fails to join, or if the backend reports an execution failure.
    pub async fn execute_in_service(
        &self,
        service_handle: &ServiceHandle,
        command: &[String],
    ) -> Result<ExecutionResult> {
        use std::time::{SystemTime, UNIX_EPOCH};

        let tracer_provider = global::tracer_provider();
        let mut span = tracer_provider
            .tracer("clnrm-cleanroom")
            .start(format!("service.exec.{}", service_handle.service_name));

        let start_timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_err(|e| CleanroomError::internal_error(format!("System time error: {}", e)))?
            .as_millis() as i64;

        span.set_attributes(vec![
            KeyValue::new("service.name", service_handle.service_name.clone()),
            KeyValue::new("service.id", service_handle.id.clone()),
            KeyValue::new("command", command.join(" ")),
            KeyValue::new("session.id", self.session_id.to_string()),
            KeyValue::new("test.start_timestamp", start_timestamp),
        ]);

        let start_time = std::time::Instant::now();

        // The plugin's `start` must have stored the container id in the
        // handle metadata; fail early with guidance when it did not.
        let container_id = service_handle.metadata.get("container_id")
            .ok_or_else(|| {
                CleanroomError::internal_error(format!(
                    "Service '{}' has no container_id in metadata. Service may not be properly started.",
                    service_handle.service_name
                ))
                .with_context("Service routing requires container_id in ServiceHandle.metadata")
                .with_source("ServicePlugin.start() must populate container_id metadata")
            })?;

        // Wrap the command in `sh -c` so shell syntax inside `command`
        // works; service name and container id travel as env vars.
        let cmd = Cmd::new("sh")
            .arg("-c")
            .arg(command.join(" "))
            .env("SERVICE_NAME", &service_handle.service_name)
            .env("CONTAINER_ID", container_id);

        // Backend execution is blocking; run it off the async runtime. The
        // error paths close the span before returning so it is not lost.
        let backend = self.backend.clone();
        let execution_result = tokio::task::spawn_blocking(move || backend.run_cmd(cmd))
            .await
            .map_err(|e| {
                {
                    span.set_status(opentelemetry::trace::Status::error("Task join failed"));
                    span.end();
                }
                CleanroomError::internal_error("Failed to execute command in blocking task")
                    .with_context("Service command execution task failed")
                    .with_source(e.to_string())
            })?
            .map_err(|e| {
                {
                    span.set_status(opentelemetry::trace::Status::error(
                        "Command execution failed",
                    ));
                    span.end();
                }
                CleanroomError::container_error("Failed to execute command in service container")
                    .with_context(format!(
                        "Service: {}, Command: {}",
                        service_handle.service_name,
                        command.join(" ")
                    ))
                    .with_source(e.to_string())
            })?;

        let duration = start_time.elapsed();
        let duration_ms = duration.as_millis() as f64;

        let end_timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_err(|e| CleanroomError::internal_error(format!("System time error: {}", e)))?
            .as_millis() as i64;

        {
            let histogram = self
                .meter
                .f64_histogram("service.command.duration")
                .with_description("Service command execution duration")
                .build();
            histogram.record(
                duration.as_secs_f64(),
                &[
                    KeyValue::new("service.name", service_handle.service_name.clone()),
                    KeyValue::new("command", command.join(" ")),
                ],
            );
        }

        let test_result = if execution_result.exit_code == 0 {
            "pass"
        } else {
            "fail"
        };
        span.set_attributes(vec![
            KeyValue::new("container.exit_code", execution_result.exit_code as i64),
            KeyValue::new("test.duration_ms", duration_ms),
            KeyValue::new("test.end_timestamp", end_timestamp),
            KeyValue::new("test.result", test_result.to_string()),
            KeyValue::new("container.id", container_id.clone()),
        ]);

        if execution_result.exit_code != 0 {
            span.set_attribute(KeyValue::new(
                "error.message",
                format!(
                    "Command exited with code {}: {}",
                    execution_result.exit_code, execution_result.stderr
                ),
            ));
            span.set_status(opentelemetry::trace::Status::error("Command failed"));
        }

        span.end();

        Ok(ExecutionResult {
            exit_code: execution_result.exit_code,
            stdout: execution_result.stdout,
            stderr: execution_result.stderr,
            duration,
            command: command.to_vec(),
            container_name: service_handle.service_name.clone(),
            container_id: Some(container_id.clone()),
        })
    }
930
    /// Runs `command` via `sh -c` through the backend, tracing the
    /// execution and recording a duration histogram.
    ///
    /// `container_name` is used for attribution (span attributes and the
    /// returned result) and is exposed to the command as the
    /// `CONTAINER_NAME` environment variable; the command runs through the
    /// environment's backend, not in a container looked up by name.
    /// `workdir` and `env` are forwarded to the backend command when given.
    ///
    /// # Errors
    ///
    /// Returns an internal error if the blocking task fails to join, or a
    /// container error if the backend reports an execution failure.
    pub async fn execute_in_container(
        &self,
        container_name: &str,
        command: &[String],
        workdir: Option<&str>,
        env: Option<&HashMap<String, String>>,
    ) -> Result<ExecutionResult> {
        use std::time::{SystemTime, UNIX_EPOCH};

        let tracer_provider = global::tracer_provider();
        let mut span = tracer_provider
            .tracer("clnrm-cleanroom")
            .start(format!("container.exec.{}", container_name));

        let start_timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_err(|e| CleanroomError::internal_error(format!("System time error: {}", e)))?
            .as_millis() as i64;

        span.set_attributes(vec![
            KeyValue::new("container.name", container_name.to_string()),
            // NOTE(review): the session id stands in for a container id, and
            // the image name is hard-coded even though the backend may have
            // been created with a different image — confirm before relying
            // on these attributes.
            KeyValue::new("container.id", self.session_id.to_string()),
            KeyValue::new("container.image.name", "alpine:latest"),
            KeyValue::new("command", command.join(" ")),
            KeyValue::new("session.id", self.session_id.to_string()),
            KeyValue::new("test.start_timestamp", start_timestamp),
        ]);

        let start_time = std::time::Instant::now();

        // Wrap the command in `sh -c` so shell syntax inside `command`
        // works.
        let mut cmd = Cmd::new("sh")
            .arg("-c")
            .arg(command.join(" "))
            .env("CONTAINER_NAME", container_name);

        if let Some(wd) = workdir {
            cmd = cmd.workdir(std::path::PathBuf::from(wd));
        }

        if let Some(env_vars) = env {
            for (key, value) in env_vars {
                cmd = cmd.env(key, value);
            }
        }

        // Backend execution is blocking; run it off the async runtime. The
        // error paths close the span before returning so it is not lost.
        let backend = self.backend.clone();
        let execution_result = tokio::task::spawn_blocking(move || backend.run_cmd(cmd))
            .await
            .map_err(|e| {
                {
                    span.set_status(opentelemetry::trace::Status::error("Task join failed"));
                    span.end();
                }
                CleanroomError::internal_error("Failed to execute command in blocking task")
                    .with_context("Command execution task failed")
                    .with_source(e.to_string())
            })?
            .map_err(|e| {
                {
                    span.set_status(opentelemetry::trace::Status::error(
                        "Command execution failed",
                    ));
                    span.end();
                }
                CleanroomError::container_error("Failed to execute command in container")
                    .with_context(format!(
                        "Container: {}, Command: {}",
                        container_name,
                        command.join(" ")
                    ))
                    .with_source(e.to_string())
            })?;

        let duration = start_time.elapsed();
        let duration_ms = duration.as_millis() as f64;

        let end_timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_err(|e| CleanroomError::internal_error(format!("System time error: {}", e)))?
            .as_millis() as i64;

        {
            let histogram = self
                .meter
                .f64_histogram("container.command.duration")
                .with_description("Container command execution duration")
                .build();
            histogram.record(
                duration.as_secs_f64(),
                &[
                    KeyValue::new("container.name", container_name.to_string()),
                    KeyValue::new("command", command.join(" ")),
                ],
            );
        }

        let test_result = if execution_result.exit_code == 0 {
            "pass"
        } else {
            "fail"
        };

        span.set_attributes(vec![
            KeyValue::new("container.exit_code", execution_result.exit_code as i64),
            KeyValue::new("test.duration_ms", duration_ms),
            KeyValue::new("test.end_timestamp", end_timestamp),
            KeyValue::new("test.result", test_result.to_string()),
            KeyValue::new("test.isolated", true),
            KeyValue::new("test.cleanup_performed", true),
        ]);

        if execution_result.exit_code != 0 {
            span.set_attribute(KeyValue::new(
                "error.message",
                format!(
                    "Command exited with code {}: {}",
                    execution_result.exit_code, execution_result.stderr
                ),
            ));
            span.set_status(opentelemetry::trace::Status::error("Command failed"));
        }

        span.end();

        Ok(ExecutionResult {
            exit_code: execution_result.exit_code,
            stdout: execution_result.stdout,
            stderr: execution_result.stderr,
            duration,
            command: command.to_vec(),
            container_name: container_name.to_string(),
            container_id: Some(self.session_id.to_string()),
        })
    }
1081}
1082
/// Test double implementing [`ServicePlugin`] without starting any real
/// container.
#[derive(Debug)]
pub struct MockDatabasePlugin {
    // Plugin name reported via `ServicePlugin::name`.
    name: String,
    // Never written in this file; presumably reserved for a future real
    // container id — TODO confirm.
    #[allow(dead_code)]
    container_id: Arc<RwLock<Option<String>>>,
}
1095
1096impl Default for MockDatabasePlugin {
1097 fn default() -> Self {
1098 Self::new()
1099 }
1100}
1101
1102impl MockDatabasePlugin {
1103 pub fn new() -> Self {
1104 Self {
1105 name: "mock_database".to_string(),
1106 container_id: Arc::new(RwLock::new(None)),
1107 }
1108 }
1109}
1110
1111impl ServicePlugin for MockDatabasePlugin {
1112 fn name(&self) -> &str {
1113 &self.name
1114 }
1115
1116 fn start(&self) -> Result<ServiceHandle> {
1117 let mut metadata = HashMap::new();
1122 metadata.insert("host".to_string(), "127.0.0.1".to_string());
1123 metadata.insert("port".to_string(), "8000".to_string());
1124 metadata.insert("username".to_string(), "root".to_string());
1125 metadata.insert("password".to_string(), "root".to_string());
1126
1127 Ok(ServiceHandle {
1128 id: Uuid::new_v4().to_string(),
1129 service_name: "mock_database".to_string(),
1130 metadata,
1131 })
1132 }
1133
1134 fn stop(&self, _handle: ServiceHandle) -> Result<()> {
1135 Ok(())
1138 }
1139
1140 fn health_check(&self, handle: &ServiceHandle) -> HealthStatus {
1141 if handle.metadata.contains_key("port") {
1143 HealthStatus::Healthy
1144 } else {
1145 HealthStatus::Unknown
1146 }
1147 }
1148}