1use crate::cleanroom::{HealthStatus, ServiceHandle, ServicePlugin};
23use crate::error::{CleanroomError, Result};
24use std::collections::HashMap;
25use std::sync::Arc;
26use tokio::sync::RwLock;
27
/// Image reference (`name:tag`) used by `OtelCollectorConfig::default()`.
const DEFAULT_OTEL_COLLECTOR_IMAGE: &str = "otel/opentelemetry-collector:latest";

/// Container-side port of the OTLP gRPC receiver; `start` resolves the host
/// port mapped to it when `enable_otlp_grpc` is set.
const OTLP_GRPC_PORT: u16 = 4317;

/// Container-side port of the OTLP HTTP receiver; `start` resolves the host
/// port mapped to it when `enable_otlp_http` is set.
const OTLP_HTTP_PORT: u16 = 4318;

/// Container-side port of the collector's health check endpoint; `start`
/// resolves the mapped host port and probes it when `enable_health_check` is set.
const HEALTH_CHECK_PORT: u16 = 13133;

/// Container-side port whose host mapping is published as
/// `prometheus_port` / `prometheus_endpoint` when `enable_prometheus` is set.
const PROMETHEUS_PORT: u16 = 8889;

// Only referenced from `build_config`, which is itself marked dead code, so the
// compiler would otherwise warn that this constant is unused.
#[allow(dead_code)]
/// Container-side port used for the `zpages` extension in the generated config.
const ZPAGES_PORT: u16 = 55679;
46
/// Settings that control how the OpenTelemetry Collector container is started
/// and which of its endpoints are reported back in the service handle.
#[derive(Debug, Clone)]
pub struct OtelCollectorConfig {
    /// Container image reference, optionally carrying a `:tag` suffix.
    pub image: String,
    /// Resolve and publish the OTLP gRPC endpoint (`otlp_grpc_*` metadata keys).
    pub enable_otlp_grpc: bool,
    /// Resolve and publish the OTLP HTTP endpoint (`otlp_http_*` metadata keys).
    pub enable_otlp_http: bool,
    /// Resolve the health check endpoint and probe it before `start` returns.
    pub enable_health_check: bool,
    /// Resolve and publish the Prometheus endpoint (`prometheus_*` metadata keys).
    pub enable_prometheus: bool,
    /// Include the `zpages` extension in the generated collector config.
    /// NOTE(review): in the code visible here this flag is only read by
    /// `build_config`; `start` does not look at it.
    pub enable_zpages: bool,
    /// Path to a collector configuration file, set via `with_config_file`.
    /// NOTE(review): nothing visible in this file reads the value back —
    /// confirm whether it is consumed elsewhere.
    pub config_file: Option<String>,
    /// Environment variables passed to the container when it is started.
    pub env_vars: HashMap<String, String>,
}
67
68impl Default for OtelCollectorConfig {
69 fn default() -> Self {
70 Self {
71 image: DEFAULT_OTEL_COLLECTOR_IMAGE.to_string(),
72 enable_otlp_grpc: true,
73 enable_otlp_http: true,
74 enable_health_check: true,
75 enable_prometheus: false,
76 enable_zpages: false,
77 config_file: None,
78 env_vars: HashMap::new(),
79 }
80 }
81}
82
/// Service plugin that runs an OpenTelemetry Collector in a container.
///
/// Cloning the plugin clones the `Arc`, so all clones share the same
/// `container_id` slot.
#[derive(Debug, Clone)]
pub struct OtelCollectorPlugin {
    /// Service name reported by `ServicePlugin::name` and copied into every
    /// `ServiceHandle` this plugin produces.
    name: String,
    /// Image, endpoint toggles and environment used when starting the collector.
    config: OtelCollectorConfig,
    /// `Some(id)` after a successful `start`, reset to `None` by `stop`.
    /// The id is the generated `otel-collector-<uuid>` string that is also
    /// returned in the handle.
    container_id: Arc<RwLock<Option<String>>>,
}
96
97impl OtelCollectorPlugin {
98 pub fn new(name: &str) -> Self {
100 Self {
101 name: name.to_string(),
102 config: OtelCollectorConfig::default(),
103 container_id: Arc::new(RwLock::new(None)),
104 }
105 }
106
107 pub fn with_config(name: &str, config: OtelCollectorConfig) -> Self {
109 Self {
110 name: name.to_string(),
111 config,
112 container_id: Arc::new(RwLock::new(None)),
113 }
114 }
115
116 pub fn with_name(mut self, name: &str) -> Self {
118 self.name = name.to_string();
119 self
120 }
121
122 pub fn with_otlp_grpc(mut self, enable: bool) -> Self {
124 self.config.enable_otlp_grpc = enable;
125 self
126 }
127
128 pub fn with_otlp_http(mut self, enable: bool) -> Self {
130 self.config.enable_otlp_http = enable;
131 self
132 }
133
134 pub fn with_prometheus(mut self, enable: bool) -> Self {
136 self.config.enable_prometheus = enable;
137 self
138 }
139
140 pub fn with_env(mut self, key: &str, value: &str) -> Self {
142 self.config
143 .env_vars
144 .insert(key.to_string(), value.to_string());
145 self
146 }
147
148 pub fn with_config_file(mut self, path: &str) -> Self {
150 self.config.config_file = Some(path.to_string());
151 self
152 }
153
154 async fn verify_health(&self, host_port: u16) -> Result<()> {
156 let url = format!("http://127.0.0.1:{}/", host_port);
157
158 let client = reqwest::Client::builder()
159 .timeout(std::time::Duration::from_secs(5))
160 .build()
161 .map_err(|e| {
162 CleanroomError::connection_failed("Failed to create HTTP client")
163 .with_source(e.to_string())
164 })?;
165
166 let response = client.get(&url).send().await.map_err(|e| {
167 CleanroomError::connection_failed("Failed to connect to OTEL Collector health endpoint")
168 .with_source(e.to_string())
169 })?;
170
171 if !response.status().is_success() {
172 return Err(CleanroomError::service_error(format!(
173 "OTEL Collector health check returned status: {}",
174 response.status()
175 )));
176 }
177
178 Ok(())
179 }
180
181 #[allow(dead_code)]
183 fn build_config(&self) -> String {
184 let mut config = String::from("receivers:\n");
186
187 if self.config.enable_otlp_grpc || self.config.enable_otlp_http {
188 config.push_str(" otlp:\n");
189 config.push_str(" protocols:\n");
190
191 if self.config.enable_otlp_grpc {
192 config.push_str(&format!(
193 " grpc:\n endpoint: 0.0.0.0:{}\n",
194 OTLP_GRPC_PORT
195 ));
196 }
197
198 if self.config.enable_otlp_http {
199 config.push_str(&format!(
200 " http:\n endpoint: 0.0.0.0:{}\n",
201 OTLP_HTTP_PORT
202 ));
203 }
204 }
205
206 config.push_str("\nprocessors:\n batch:\n timeout: 10s\n\n");
207
208 config.push_str("exporters:\n logging:\n loglevel: debug\n\n");
209
210 config.push_str("service:\n pipelines:\n");
211 config.push_str(
212 " traces:\n receivers: [otlp]\n processors: [batch]\n exporters: [logging]\n",
213 );
214 config.push_str(
215 " metrics:\n receivers: [otlp]\n processors: [batch]\n exporters: [logging]\n",
216 );
217 config.push_str(
218 " logs:\n receivers: [otlp]\n processors: [batch]\n exporters: [logging]\n",
219 );
220
221 if self.config.enable_health_check || self.config.enable_zpages {
222 config.push_str("\n extensions: [");
223 let mut extensions = Vec::new();
224 if self.config.enable_health_check {
225 extensions.push("health_check");
226 }
227 if self.config.enable_zpages {
228 extensions.push("zpages");
229 }
230 config.push_str(&extensions.join(", "));
231 config.push_str("]\n\nextensions:\n");
232
233 if self.config.enable_health_check {
234 config.push_str(&format!(
235 " health_check:\n endpoint: 0.0.0.0:{}\n",
236 HEALTH_CHECK_PORT
237 ));
238 }
239
240 if self.config.enable_zpages {
241 config.push_str(&format!(
242 " zpages:\n endpoint: 0.0.0.0:{}\n",
243 ZPAGES_PORT
244 ));
245 }
246 }
247
248 config
249 }
250}
251
252impl ServicePlugin for OtelCollectorPlugin {
253 fn name(&self) -> &str {
254 &self.name
255 }
256
257 fn start(&self) -> Result<ServiceHandle> {
258 use testcontainers::{runners::AsyncRunner, GenericImage, ImageExt};
259
260 tokio::task::block_in_place(|| {
262 tokio::runtime::Handle::current().block_on(async {
263 let image = GenericImage::new(
265 self.config
266 .image
267 .split(':')
268 .next()
269 .unwrap_or("otel/opentelemetry-collector"),
270 self.config.image.split(':').nth(1).unwrap_or("latest"),
271 );
272
273 let mut container_request: testcontainers::core::ContainerRequest<GenericImage> =
275 image.into();
276
277 for (key, value) in &self.config.env_vars {
279 container_request = container_request.with_env_var(key, value);
280 }
281
282 let node = container_request.start().await.map_err(|e| {
284 CleanroomError::container_error("Failed to start OTEL Collector container")
285 .with_context("Container startup failed")
286 .with_source(e.to_string())
287 })?;
288
289 let mut metadata = HashMap::new();
291 metadata.insert("image".to_string(), self.config.image.clone());
292 metadata.insert("service_type".to_string(), "otel_collector".to_string());
293
294 if self.config.enable_otlp_grpc {
295 let grpc_port = node.get_host_port_ipv4(OTLP_GRPC_PORT).await.map_err(|e| {
296 CleanroomError::container_error("Failed to get OTLP gRPC port")
297 .with_source(e.to_string())
298 })?;
299 metadata.insert("otlp_grpc_port".to_string(), grpc_port.to_string());
300 metadata.insert(
301 "otlp_grpc_endpoint".to_string(),
302 format!("http://127.0.0.1:{}", grpc_port),
303 );
304 }
305
306 if self.config.enable_otlp_http {
307 let http_port = node.get_host_port_ipv4(OTLP_HTTP_PORT).await.map_err(|e| {
308 CleanroomError::container_error("Failed to get OTLP HTTP port")
309 .with_source(e.to_string())
310 })?;
311 metadata.insert("otlp_http_port".to_string(), http_port.to_string());
312 metadata.insert(
313 "otlp_http_endpoint".to_string(),
314 format!("http://127.0.0.1:{}", http_port),
315 );
316 }
317
318 if self.config.enable_health_check {
319 let health_port =
320 node.get_host_port_ipv4(HEALTH_CHECK_PORT)
321 .await
322 .map_err(|e| {
323 CleanroomError::container_error("Failed to get health check port")
324 .with_source(e.to_string())
325 })?;
326 metadata.insert("health_check_port".to_string(), health_port.to_string());
327 metadata.insert(
328 "health_check_endpoint".to_string(),
329 format!("http://127.0.0.1:{}", health_port),
330 );
331
332 self.verify_health(health_port).await?;
334 }
335
336 if self.config.enable_prometheus {
337 let prom_port =
338 node.get_host_port_ipv4(PROMETHEUS_PORT)
339 .await
340 .map_err(|e| {
341 CleanroomError::container_error("Failed to get Prometheus port")
342 .with_source(e.to_string())
343 })?;
344 metadata.insert("prometheus_port".to_string(), prom_port.to_string());
345 metadata.insert(
346 "prometheus_endpoint".to_string(),
347 format!("http://127.0.0.1:{}", prom_port),
348 );
349 }
350
351 let container_id = format!("otel-collector-{}", uuid::Uuid::new_v4());
353 let mut container_guard = self.container_id.write().await;
354 *container_guard = Some(container_id.clone());
355
356 Ok(ServiceHandle {
357 id: container_id,
358 service_name: self.name.clone(),
359 metadata,
360 })
361 })
362 })
363 }
364
365 fn stop(&self, handle: ServiceHandle) -> Result<()> {
366 tokio::task::block_in_place(|| {
367 tokio::runtime::Handle::current().block_on(async {
368 let mut container_guard = self.container_id.write().await;
369
370 if container_guard.is_none() {
371 return Err(CleanroomError::service_error(format!(
372 "Cannot stop OTEL Collector '{}': not running",
373 handle.service_name
374 )));
375 }
376
377 *container_guard = None;
379
380 Ok(())
381 })
382 })
383 }
384
385 fn health_check(&self, handle: &ServiceHandle) -> HealthStatus {
386 if let Some(health_endpoint) = handle.metadata.get("health_check_endpoint") {
388 tokio::task::block_in_place(|| {
389 tokio::runtime::Handle::current().block_on(async {
390 let client = match reqwest::Client::builder()
391 .timeout(std::time::Duration::from_secs(2))
392 .build()
393 {
394 Ok(c) => c,
395 Err(_) => return HealthStatus::Unknown,
396 };
397
398 match client.get(health_endpoint).send().await {
399 Ok(response) if response.status().is_success() => HealthStatus::Healthy,
400 Ok(_) => HealthStatus::Unhealthy,
401 Err(_) => HealthStatus::Unhealthy,
402 }
403 })
404 })
405 } else {
406 HealthStatus::Unknown
407 }
408 }
409}