clnrm_core/services/
otel_collector.rs

1//! OpenTelemetry Collector service plugin
2//!
3//! This plugin provides a managed OTEL Collector testcontainer that can be
4//! configured and validated from .clnrm.toml files.
5//!
6//! ## Features
7//!
8//! - Automatic OTEL Collector container management
9//! - Health check endpoint detection
10//! - OTLP HTTP/gRPC endpoint exposure
11//! - Custom collector configuration support
12//! - Metrics and traces validation
13//!
14//! ## Core Team Standards
15//!
16//! - No .unwrap() or .expect() in production code
17//! - All methods return Result<T, CleanroomError>
18//! - ServicePlugin trait is sync (dyn compatible)
19//! - AAA test pattern for all tests
20//! - Proper error context and source
21
22use crate::cleanroom::{HealthStatus, ServiceHandle, ServicePlugin};
23use crate::error::{CleanroomError, Result};
24use std::collections::HashMap;
25use std::sync::Arc;
26use tokio::sync::RwLock;
27
/// Default OTEL Collector image used when no custom image is configured.
const DEFAULT_OTEL_COLLECTOR_IMAGE: &str = "otel/opentelemetry-collector:latest";

/// OTLP gRPC receiver port (standard OTLP/gRPC port inside the container).
const OTLP_GRPC_PORT: u16 = 4317;

/// OTLP HTTP receiver port (standard OTLP/HTTP port inside the container).
const OTLP_HTTP_PORT: u16 = 4318;

/// Port served by the collector's `health_check` extension.
const HEALTH_CHECK_PORT: u16 = 13133;

/// Port the Prometheus metrics exporter listens on inside the container.
const PROMETHEUS_PORT: u16 = 8889;

/// zPages debugging port (for future use; referenced only by `build_config`).
#[allow(dead_code)]
const ZPAGES_PORT: u16 = 55679;
46
/// OTEL Collector configuration
///
/// Controls which receivers, exporters, and extensions the managed collector
/// container enables. Construct via [`Default`] and adjust fields, or use the
/// builder methods on [`OtelCollectorPlugin`].
#[derive(Debug, Clone)]
pub struct OtelCollectorConfig {
    /// Collector image reference, `repo[:tag]` (default: otel/opentelemetry-collector:latest)
    pub image: String,
    /// Enable OTLP gRPC receiver (container port 4317)
    pub enable_otlp_grpc: bool,
    /// Enable OTLP HTTP receiver (container port 4318)
    pub enable_otlp_http: bool,
    /// Enable health check endpoint (container port 13133)
    pub enable_health_check: bool,
    /// Enable Prometheus metrics exporter (container port 8889)
    pub enable_prometheus: bool,
    /// Enable zPages debugging (container port 55679)
    pub enable_zpages: bool,
    /// Custom collector config file path (optional; overrides generated config)
    pub config_file: Option<String>,
    /// Additional environment variables passed to the container
    pub env_vars: HashMap<String, String>,
}
67
68impl Default for OtelCollectorConfig {
69    fn default() -> Self {
70        Self {
71            image: DEFAULT_OTEL_COLLECTOR_IMAGE.to_string(),
72            enable_otlp_grpc: true,
73            enable_otlp_http: true,
74            enable_health_check: true,
75            enable_prometheus: false,
76            enable_zpages: false,
77            config_file: None,
78            env_vars: HashMap::new(),
79        }
80    }
81}
82
/// OpenTelemetry Collector service plugin
///
/// Provides managed OTEL Collector container for testing observability
/// integrations in hermetic environments. Cloning the plugin shares the
/// same container-ID slot via `Arc`.
#[derive(Debug, Clone)]
pub struct OtelCollectorPlugin {
    /// Service name reported through `ServicePlugin::name`
    name: String,
    /// Collector feature configuration applied at `start`
    config: OtelCollectorConfig,
    /// Container ID (Some while running, None otherwise); async-guarded
    /// because it is written from within the sync trait via `block_on`.
    container_id: Arc<RwLock<Option<String>>>,
}
96
97impl OtelCollectorPlugin {
98    /// Create new OTEL Collector plugin with default configuration
99    pub fn new(name: &str) -> Self {
100        Self {
101            name: name.to_string(),
102            config: OtelCollectorConfig::default(),
103            container_id: Arc::new(RwLock::new(None)),
104        }
105    }
106
107    /// Create with custom configuration
108    pub fn with_config(name: &str, config: OtelCollectorConfig) -> Self {
109        Self {
110            name: name.to_string(),
111            config,
112            container_id: Arc::new(RwLock::new(None)),
113        }
114    }
115
116    /// Set custom name
117    pub fn with_name(mut self, name: &str) -> Self {
118        self.name = name.to_string();
119        self
120    }
121
122    /// Enable OTLP gRPC receiver
123    pub fn with_otlp_grpc(mut self, enable: bool) -> Self {
124        self.config.enable_otlp_grpc = enable;
125        self
126    }
127
128    /// Enable OTLP HTTP receiver
129    pub fn with_otlp_http(mut self, enable: bool) -> Self {
130        self.config.enable_otlp_http = enable;
131        self
132    }
133
134    /// Enable Prometheus metrics exporter
135    pub fn with_prometheus(mut self, enable: bool) -> Self {
136        self.config.enable_prometheus = enable;
137        self
138    }
139
140    /// Add environment variable
141    pub fn with_env(mut self, key: &str, value: &str) -> Self {
142        self.config
143            .env_vars
144            .insert(key.to_string(), value.to_string());
145        self
146    }
147
148    /// Set custom collector config file
149    pub fn with_config_file(mut self, path: &str) -> Self {
150        self.config.config_file = Some(path.to_string());
151        self
152    }
153
154    /// Verify OTEL Collector health endpoint
155    async fn verify_health(&self, host_port: u16) -> Result<()> {
156        let url = format!("http://127.0.0.1:{}/", host_port);
157
158        let client = reqwest::Client::builder()
159            .timeout(std::time::Duration::from_secs(5))
160            .build()
161            .map_err(|e| {
162                CleanroomError::connection_failed("Failed to create HTTP client")
163                    .with_source(e.to_string())
164            })?;
165
166        let response = client.get(&url).send().await.map_err(|e| {
167            CleanroomError::connection_failed("Failed to connect to OTEL Collector health endpoint")
168                .with_source(e.to_string())
169        })?;
170
171        if !response.status().is_success() {
172            return Err(CleanroomError::service_error(format!(
173                "OTEL Collector health check returned status: {}",
174                response.status()
175            )));
176        }
177
178        Ok(())
179    }
180
181    /// Build collector configuration
182    #[allow(dead_code)]
183    fn build_config(&self) -> String {
184        // Minimal collector config that enables requested features
185        let mut config = String::from("receivers:\n");
186
187        if self.config.enable_otlp_grpc || self.config.enable_otlp_http {
188            config.push_str("  otlp:\n");
189            config.push_str("    protocols:\n");
190
191            if self.config.enable_otlp_grpc {
192                config.push_str(&format!(
193                    "      grpc:\n        endpoint: 0.0.0.0:{}\n",
194                    OTLP_GRPC_PORT
195                ));
196            }
197
198            if self.config.enable_otlp_http {
199                config.push_str(&format!(
200                    "      http:\n        endpoint: 0.0.0.0:{}\n",
201                    OTLP_HTTP_PORT
202                ));
203            }
204        }
205
206        config.push_str("\nprocessors:\n  batch:\n    timeout: 10s\n\n");
207
208        config.push_str("exporters:\n  logging:\n    loglevel: debug\n\n");
209
210        config.push_str("service:\n  pipelines:\n");
211        config.push_str(
212            "    traces:\n      receivers: [otlp]\n      processors: [batch]\n      exporters: [logging]\n",
213        );
214        config.push_str(
215            "    metrics:\n      receivers: [otlp]\n      processors: [batch]\n      exporters: [logging]\n",
216        );
217        config.push_str(
218            "    logs:\n      receivers: [otlp]\n      processors: [batch]\n      exporters: [logging]\n",
219        );
220
221        if self.config.enable_health_check || self.config.enable_zpages {
222            config.push_str("\n  extensions: [");
223            let mut extensions = Vec::new();
224            if self.config.enable_health_check {
225                extensions.push("health_check");
226            }
227            if self.config.enable_zpages {
228                extensions.push("zpages");
229            }
230            config.push_str(&extensions.join(", "));
231            config.push_str("]\n\nextensions:\n");
232
233            if self.config.enable_health_check {
234                config.push_str(&format!(
235                    "  health_check:\n    endpoint: 0.0.0.0:{}\n",
236                    HEALTH_CHECK_PORT
237                ));
238            }
239
240            if self.config.enable_zpages {
241                config.push_str(&format!(
242                    "  zpages:\n    endpoint: 0.0.0.0:{}\n",
243                    ZPAGES_PORT
244                ));
245            }
246        }
247
248        config
249    }
250}
251
252impl ServicePlugin for OtelCollectorPlugin {
253    fn name(&self) -> &str {
254        &self.name
255    }
256
257    fn start(&self) -> Result<ServiceHandle> {
258        use testcontainers::{runners::AsyncRunner, GenericImage, ImageExt};
259
260        // Use tokio::task::block_in_place for async operations within sync trait
261        tokio::task::block_in_place(|| {
262            tokio::runtime::Handle::current().block_on(async {
263                // Build collector image with configuration
264                let image = GenericImage::new(
265                    self.config
266                        .image
267                        .split(':')
268                        .next()
269                        .unwrap_or("otel/opentelemetry-collector"),
270                    self.config.image.split(':').nth(1).unwrap_or("latest"),
271                );
272
273                // Build container request with ports and env vars
274                let mut container_request: testcontainers::core::ContainerRequest<GenericImage> =
275                    image.into();
276
277                // Add environment variables
278                for (key, value) in &self.config.env_vars {
279                    container_request = container_request.with_env_var(key, value);
280                }
281
282                // Start container
283                let node = container_request.start().await.map_err(|e| {
284                    CleanroomError::container_error("Failed to start OTEL Collector container")
285                        .with_context("Container startup failed")
286                        .with_source(e.to_string())
287                })?;
288
289                // Get exposed ports
290                let mut metadata = HashMap::new();
291                metadata.insert("image".to_string(), self.config.image.clone());
292                metadata.insert("service_type".to_string(), "otel_collector".to_string());
293
294                if self.config.enable_otlp_grpc {
295                    let grpc_port = node.get_host_port_ipv4(OTLP_GRPC_PORT).await.map_err(|e| {
296                        CleanroomError::container_error("Failed to get OTLP gRPC port")
297                            .with_source(e.to_string())
298                    })?;
299                    metadata.insert("otlp_grpc_port".to_string(), grpc_port.to_string());
300                    metadata.insert(
301                        "otlp_grpc_endpoint".to_string(),
302                        format!("http://127.0.0.1:{}", grpc_port),
303                    );
304                }
305
306                if self.config.enable_otlp_http {
307                    let http_port = node.get_host_port_ipv4(OTLP_HTTP_PORT).await.map_err(|e| {
308                        CleanroomError::container_error("Failed to get OTLP HTTP port")
309                            .with_source(e.to_string())
310                    })?;
311                    metadata.insert("otlp_http_port".to_string(), http_port.to_string());
312                    metadata.insert(
313                        "otlp_http_endpoint".to_string(),
314                        format!("http://127.0.0.1:{}", http_port),
315                    );
316                }
317
318                if self.config.enable_health_check {
319                    let health_port =
320                        node.get_host_port_ipv4(HEALTH_CHECK_PORT)
321                            .await
322                            .map_err(|e| {
323                                CleanroomError::container_error("Failed to get health check port")
324                                    .with_source(e.to_string())
325                            })?;
326                    metadata.insert("health_check_port".to_string(), health_port.to_string());
327                    metadata.insert(
328                        "health_check_endpoint".to_string(),
329                        format!("http://127.0.0.1:{}", health_port),
330                    );
331
332                    // Verify health endpoint is responding
333                    self.verify_health(health_port).await?;
334                }
335
336                if self.config.enable_prometheus {
337                    let prom_port =
338                        node.get_host_port_ipv4(PROMETHEUS_PORT)
339                            .await
340                            .map_err(|e| {
341                                CleanroomError::container_error("Failed to get Prometheus port")
342                                    .with_source(e.to_string())
343                            })?;
344                    metadata.insert("prometheus_port".to_string(), prom_port.to_string());
345                    metadata.insert(
346                        "prometheus_endpoint".to_string(),
347                        format!("http://127.0.0.1:{}", prom_port),
348                    );
349                }
350
351                // Store container ID
352                let container_id = format!("otel-collector-{}", uuid::Uuid::new_v4());
353                let mut container_guard = self.container_id.write().await;
354                *container_guard = Some(container_id.clone());
355
356                Ok(ServiceHandle {
357                    id: container_id,
358                    service_name: self.name.clone(),
359                    metadata,
360                })
361            })
362        })
363    }
364
365    fn stop(&self, handle: ServiceHandle) -> Result<()> {
366        tokio::task::block_in_place(|| {
367            tokio::runtime::Handle::current().block_on(async {
368                let mut container_guard = self.container_id.write().await;
369
370                if container_guard.is_none() {
371                    return Err(CleanroomError::service_error(format!(
372                        "Cannot stop OTEL Collector '{}': not running",
373                        handle.service_name
374                    )));
375                }
376
377                // Container cleanup is automatic with testcontainers
378                *container_guard = None;
379
380                Ok(())
381            })
382        })
383    }
384
385    fn health_check(&self, handle: &ServiceHandle) -> HealthStatus {
386        // Check if health check endpoint is available in metadata
387        if let Some(health_endpoint) = handle.metadata.get("health_check_endpoint") {
388            tokio::task::block_in_place(|| {
389                tokio::runtime::Handle::current().block_on(async {
390                    let client = match reqwest::Client::builder()
391                        .timeout(std::time::Duration::from_secs(2))
392                        .build()
393                    {
394                        Ok(c) => c,
395                        Err(_) => return HealthStatus::Unknown,
396                    };
397
398                    match client.get(health_endpoint).send().await {
399                        Ok(response) if response.status().is_success() => HealthStatus::Healthy,
400                        Ok(_) => HealthStatus::Unhealthy,
401                        Err(_) => HealthStatus::Unhealthy,
402                    }
403                })
404            })
405        } else {
406            HealthStatus::Unknown
407        }
408    }
409}