1use figment::{
24 Figment,
25 providers::{Env, Format, Serialized, Toml},
26};
27use serde::{Deserialize, Serialize};
28use std::collections::HashMap;
29use std::path::Path;
30use std::time::Duration;
31
/// Config file location used by `Config::load` when the caller supplies no path.
const DEFAULT_CONFIG_PATH: &str = "/var/task/otel-extension.toml";
/// Prefix selecting the environment variables that `Config::load_from_path`
/// merges on top of the other configuration sources.
const ENV_PREFIX: &str = "LAMBDA_OTEL_";
34
/// Protocol selection for the exporter.
///
/// Variant names are (de)serialized in lowercase (`"grpc"`, `"http"`).
/// The enum is `#[non_exhaustive]`, so matches outside this crate need a
/// wildcard arm.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum Protocol {
    /// Serialized as `"grpc"`.
    Grpc,
    /// Serialized as `"http"`; this is the default variant.
    #[default]
    Http,
}
46
/// Compression selection for the exporter.
///
/// Variant names are (de)serialized in lowercase (`"none"`, `"gzip"`).
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum Compression {
    /// Serialized as `"none"`.
    None,
    /// Serialized as `"gzip"`; this is the default variant.
    #[default]
    Gzip,
}
58
/// Flush strategy selection.
///
/// Variant names are (de)serialized in lowercase (`"default"`, `"end"`,
/// `"periodic"`, `"continuous"`).
// NOTE(review): the runtime meaning of each strategy is implemented outside
// this file; the per-variant docs below only state what is visible here.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum FlushStrategy {
    /// Serialized as `"default"`; this is the default variant.
    #[default]
    Default,
    /// Serialized as `"end"`.
    End,
    /// Serialized as `"periodic"`.
    Periodic,
    /// Serialized as `"continuous"`.
    Continuous,
}
74
/// Top-level configuration, one field per configuration section.
///
/// Because of `#[serde(default)]`, any section missing from the input is
/// filled in from that section's `Default` implementation.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct Config {
    /// The `exporter` section.
    pub exporter: ExporterConfig,
    /// The `receiver` section.
    pub receiver: ReceiverConfig,
    /// The `flush` section.
    pub flush: FlushConfig,
    /// The `correlation` section.
    pub correlation: CorrelationConfig,
    /// The `telemetry_api` section.
    pub telemetry_api: TelemetryApiConfig,
}
90
91impl Config {
92 #[allow(clippy::result_large_err)]
103 pub fn load() -> Result<Self, figment::Error> {
104 Self::load_from_path(DEFAULT_CONFIG_PATH)
105 }
106
107 #[allow(clippy::result_large_err)]
113 pub fn load_from_path<P: AsRef<Path>>(config_path: P) -> Result<Self, figment::Error> {
114 let mut figment = Figment::from(Serialized::defaults(Config::default()));
115
116 if config_path.as_ref().exists() {
117 figment = figment.merge(Toml::file(config_path));
118 }
119
120 figment = figment.merge(standard_otel_env());
121 figment = figment.merge(Env::prefixed(ENV_PREFIX).split("_"));
122
123 figment.extract()
124 }
125
126 pub fn builder() -> ConfigBuilder {
128 ConfigBuilder::new()
129 }
130}
131
/// The `exporter` section of [`Config`].
///
/// Fields missing from the input take the values from this type's `Default`
/// implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ExporterConfig {
    /// Exporter endpoint; `None` when not configured.
    pub endpoint: Option<String>,
    /// Exporter protocol.
    pub protocol: Protocol,
    /// Exporter timeout, (de)serialized as an integer number of milliseconds.
    #[serde(with = "duration_ms")]
    pub timeout: Duration,
    /// Exporter compression setting.
    pub compression: Compression,
    /// Header name/value pairs; empty when omitted from the input.
    #[serde(default)]
    pub headers: HashMap<String, String>,
}
149
150impl Default for ExporterConfig {
151 fn default() -> Self {
152 Self {
153 endpoint: None,
154 protocol: Protocol::Http,
155 timeout: Duration::from_millis(500),
156 compression: Compression::Gzip,
157 headers: HashMap::new(),
158 }
159 }
160}
161
/// The `receiver` section of [`Config`].
///
/// Fields missing from the input take the values from this type's `Default`
/// implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ReceiverConfig {
    /// Port for the gRPC receiver (defaults to 4317).
    pub grpc_port: u16,
    /// Port for the HTTP receiver (defaults to 4318).
    pub http_port: u16,
    /// Whether the gRPC receiver is enabled (defaults to `true`).
    pub grpc_enabled: bool,
    /// Whether the HTTP receiver is enabled (defaults to `true`).
    pub http_enabled: bool,
}
175
176impl Default for ReceiverConfig {
177 fn default() -> Self {
178 Self {
179 grpc_port: 4317,
180 http_port: 4318,
181 grpc_enabled: true,
182 http_enabled: true,
183 }
184 }
185}
186
/// The `flush` section of [`Config`].
///
/// Fields missing from the input take the values from this type's `Default`
/// implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct FlushConfig {
    /// Selected flush strategy.
    pub strategy: FlushStrategy,
    /// Flush interval, (de)serialized as an integer number of milliseconds.
    #[serde(with = "duration_ms")]
    pub interval: Duration,
    /// Batch size limit in bytes (defaults to 4 * 1024 * 1024).
    pub max_batch_bytes: usize,
    /// Batch size limit in entries (defaults to 1000).
    pub max_batch_entries: usize,
}
201
202impl Default for FlushConfig {
203 fn default() -> Self {
204 Self {
205 strategy: FlushStrategy::Default,
206 interval: Duration::from_secs(20),
207 max_batch_bytes: 4 * 1024 * 1024,
208 max_batch_entries: 1000,
209 }
210 }
211}
212
/// The `correlation` section of [`Config`].
///
/// Fields missing from the input take the values from this type's `Default`
/// implementation.
// NOTE(review): how these limits are enforced is implemented outside this file;
// the field docs below only restate names, types and defaults.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct CorrelationConfig {
    /// Maximum correlation delay, (de)serialized as integer milliseconds
    /// (defaults to 500 ms).
    #[serde(with = "duration_ms")]
    pub max_correlation_delay: Duration,
    /// Per-invocation buffered event limit (defaults to 50).
    pub max_buffered_events_per_invocation: usize,
    /// Total buffered event limit (defaults to 500).
    pub max_total_buffered_events: usize,
    /// Maximum invocation lifetime, (de)serialized as integer milliseconds
    /// (defaults to 15 minutes).
    #[serde(with = "duration_ms")]
    pub max_invocation_lifetime: Duration,
    /// Whether orphaned spans are emitted (defaults to `true`).
    pub emit_orphaned_spans: bool,
}
230
231impl Default for CorrelationConfig {
232 fn default() -> Self {
233 Self {
234 max_correlation_delay: Duration::from_millis(500),
235 max_buffered_events_per_invocation: 50,
236 max_total_buffered_events: 500,
237 max_invocation_lifetime: Duration::from_secs(15 * 60),
238 emit_orphaned_spans: true,
239 }
240 }
241}
242
/// The `telemetry_api` section of [`Config`].
///
/// Fields missing from the input take the values from this type's `Default`
/// implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct TelemetryApiConfig {
    /// Whether the telemetry API integration is enabled (defaults to `true`).
    pub enabled: bool,
    /// Listener port (defaults to 9999).
    pub listener_port: u16,
    /// Buffer size (defaults to 256).
    // NOTE(review): the unit (events vs. bytes) is not visible in this file — confirm.
    pub buffer_size: usize,
}
254
255impl Default for TelemetryApiConfig {
256 fn default() -> Self {
257 Self {
258 enabled: true,
259 listener_port: 9999,
260 buffer_size: 256,
261 }
262 }
263}
264
265#[must_use = "builders do nothing unless .build() is called"]
267pub struct ConfigBuilder {
268 config: Config,
269}
270
271impl ConfigBuilder {
272 pub fn new() -> Self {
274 Self {
275 config: Config::default(),
276 }
277 }
278
279 pub fn exporter_endpoint(mut self, endpoint: impl Into<String>) -> Self {
281 self.config.exporter.endpoint = Some(endpoint.into());
282 self
283 }
284
285 pub fn exporter_protocol(mut self, protocol: Protocol) -> Self {
287 self.config.exporter.protocol = protocol;
288 self
289 }
290
291 pub fn exporter_timeout(mut self, timeout: Duration) -> Self {
293 self.config.exporter.timeout = timeout;
294 self
295 }
296
297 pub fn flush_strategy(mut self, strategy: FlushStrategy) -> Self {
299 self.config.flush.strategy = strategy;
300 self
301 }
302
303 pub fn flush_interval(mut self, interval: Duration) -> Self {
305 self.config.flush.interval = interval;
306 self
307 }
308
309 pub fn correlation_delay(mut self, delay: Duration) -> Self {
311 self.config.correlation.max_correlation_delay = delay;
312 self
313 }
314
315 pub fn emit_orphaned_spans(mut self, emit: bool) -> Self {
317 self.config.correlation.emit_orphaned_spans = emit;
318 self
319 }
320
321 pub fn grpc_receiver(mut self, enabled: bool) -> Self {
323 self.config.receiver.grpc_enabled = enabled;
324 self
325 }
326
327 pub fn http_receiver(mut self, enabled: bool) -> Self {
329 self.config.receiver.http_enabled = enabled;
330 self
331 }
332
333 pub fn grpc_port(mut self, port: u16) -> Self {
335 self.config.receiver.grpc_port = port;
336 self
337 }
338
339 pub fn http_port(mut self, port: u16) -> Self {
341 self.config.receiver.http_port = port;
342 self
343 }
344
345 pub fn telemetry_api(mut self, enabled: bool) -> Self {
347 self.config.telemetry_api.enabled = enabled;
348 self
349 }
350
351 pub fn build(self) -> Config {
353 self.config
354 }
355}
356
357impl Default for ConfigBuilder {
358 fn default() -> Self {
359 Self::new()
360 }
361}
362
/// Sparse counterpart of [`ExporterConfig`] used to layer environment
/// overrides.
///
/// Unset (`None`) or empty fields are skipped during serialization, so they do
/// not produce keys of their own.
#[derive(Debug, Default, Serialize)]
struct PartialExporterConfig {
    /// Endpoint override, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    endpoint: Option<String>,
    /// Protocol override, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    protocol: Option<Protocol>,
    /// Compression override, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    compression: Option<Compression>,
    /// Header overrides; omitted from the output when empty.
    #[serde(skip_serializing_if = "HashMap::is_empty")]
    headers: HashMap<String, String>,
}
375
/// Sparse counterpart of [`Config`] holding only the sections that
/// `standard_otel_env` can populate.
#[derive(Debug, Default, Serialize)]
struct PartialConfig {
    /// Exporter overrides; the whole section is omitted when nothing is set.
    #[serde(skip_serializing_if = "is_partial_exporter_empty")]
    exporter: PartialExporterConfig,
}
382
383fn is_partial_exporter_empty(config: &PartialExporterConfig) -> bool {
384 config.endpoint.is_none()
385 && config.protocol.is_none()
386 && config.compression.is_none()
387 && config.headers.is_empty()
388}
389
390fn standard_otel_env() -> Serialized<PartialConfig> {
391 let mut config = PartialConfig::default();
392
393 if let Ok(endpoint) = std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT") {
394 config.exporter.endpoint = Some(endpoint);
395 }
396
397 if let Ok(protocol) = std::env::var("OTEL_EXPORTER_OTLP_PROTOCOL") {
398 config.exporter.protocol = match protocol.to_lowercase().as_str() {
399 "grpc" => Some(Protocol::Grpc),
400 "http/protobuf" | "http" => Some(Protocol::Http),
401 _ => None,
402 };
403 }
404
405 if let Ok(compression) = std::env::var("OTEL_EXPORTER_OTLP_COMPRESSION") {
406 config.exporter.compression = match compression.to_lowercase().as_str() {
407 "gzip" => Some(Compression::Gzip),
408 "none" => Some(Compression::None),
409 _ => None,
410 };
411 }
412
413 if let Ok(headers_str) = std::env::var("OTEL_EXPORTER_OTLP_HEADERS") {
414 for pair in headers_str.split(',') {
415 if let Some((key, value)) = pair.split_once('=') {
416 config
417 .exporter
418 .headers
419 .insert(key.trim().to_string(), value.trim().to_string());
420 }
421 }
422 }
423
424 Serialized::defaults(config)
425}
426
427mod duration_ms {
428 use serde::{Deserialize, Deserializer, Serializer};
429 use std::time::Duration;
430
431 pub fn serialize<S>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error>
432 where
433 S: Serializer,
434 {
435 serializer.serialize_u64(duration.as_millis() as u64)
436 }
437
438 pub fn deserialize<'de, D>(deserializer: D) -> Result<Duration, D::Error>
439 where
440 D: Deserializer<'de>,
441 {
442 let ms = u64::deserialize(deserializer)?;
443 Ok(Duration::from_millis(ms))
444 }
445}
446
/// Unit tests for defaults, the builder, TOML loading and enum serialization.
// NOTE(review): the tests that call `Config::load_from_path` also merge the
// process environment (`OTEL_EXPORTER_OTLP_*` and `LAMBDA_OTEL_*`), so their
// outcome depends on those variables being unset when the tests run.
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// `Config::default()` exposes the documented built-in values.
    #[test]
    fn test_default_config() {
        let config = Config::default();

        assert!(config.exporter.endpoint.is_none());
        assert_eq!(config.exporter.protocol, Protocol::Http);
        assert_eq!(config.exporter.timeout, Duration::from_millis(500));
        assert_eq!(config.exporter.compression, Compression::Gzip);

        assert_eq!(config.receiver.grpc_port, 4317);
        assert_eq!(config.receiver.http_port, 4318);
        assert!(config.receiver.grpc_enabled);
        assert!(config.receiver.http_enabled);

        assert_eq!(config.flush.strategy, FlushStrategy::Default);
        assert_eq!(config.flush.interval, Duration::from_secs(20));

        assert_eq!(
            config.correlation.max_correlation_delay,
            Duration::from_millis(500)
        );
        assert!(config.correlation.emit_orphaned_spans);

        assert!(config.telemetry_api.enabled);
    }

    /// Every builder setter lands in the corresponding `Config` field.
    #[test]
    fn test_config_builder() {
        let config = Config::builder()
            .exporter_endpoint("https://collector:4318")
            .exporter_protocol(Protocol::Grpc)
            .exporter_timeout(Duration::from_millis(1000))
            .flush_strategy(FlushStrategy::Continuous)
            .flush_interval(Duration::from_secs(10))
            .correlation_delay(Duration::from_millis(200))
            .emit_orphaned_spans(false)
            .grpc_receiver(false)
            .http_receiver(true)
            .grpc_port(5317)
            .http_port(5318)
            .telemetry_api(false)
            .build();

        assert_eq!(
            config.exporter.endpoint,
            Some("https://collector:4318".to_string())
        );
        assert_eq!(config.exporter.protocol, Protocol::Grpc);
        assert_eq!(config.exporter.timeout, Duration::from_millis(1000));
        assert_eq!(config.flush.strategy, FlushStrategy::Continuous);
        assert_eq!(config.flush.interval, Duration::from_secs(10));
        assert_eq!(
            config.correlation.max_correlation_delay,
            Duration::from_millis(200)
        );
        assert!(!config.correlation.emit_orphaned_spans);
        assert!(!config.receiver.grpc_enabled);
        assert!(config.receiver.http_enabled);
        assert_eq!(config.receiver.grpc_port, 5317);
        assert_eq!(config.receiver.http_port, 5318);
        assert!(!config.telemetry_api.enabled);
    }

    /// Values from a TOML file override the defaults; durations are given in
    /// milliseconds.
    #[test]
    fn test_load_from_toml() {
        let toml_content = r#"
[exporter]
endpoint = "https://test-collector:4318"
protocol = "grpc"
timeout = 1000

[receiver]
grpc_port = 5317
http_port = 5318
grpc_enabled = false

[flush]
strategy = "periodic"
interval = 15000

[correlation]
max_correlation_delay = 300
emit_orphaned_spans = false
"#;

        let mut temp_file = NamedTempFile::new().unwrap();
        temp_file.write_all(toml_content.as_bytes()).unwrap();

        let config = Config::load_from_path(temp_file.path()).unwrap();

        assert_eq!(
            config.exporter.endpoint,
            Some("https://test-collector:4318".to_string())
        );
        assert_eq!(config.exporter.protocol, Protocol::Grpc);
        assert_eq!(config.exporter.timeout, Duration::from_millis(1000));
        assert_eq!(config.receiver.grpc_port, 5317);
        assert_eq!(config.receiver.http_port, 5318);
        assert!(!config.receiver.grpc_enabled);
        assert_eq!(config.flush.strategy, FlushStrategy::Periodic);
        assert_eq!(config.flush.interval, Duration::from_secs(15));
        assert_eq!(
            config.correlation.max_correlation_delay,
            Duration::from_millis(300)
        );
        assert!(!config.correlation.emit_orphaned_spans);
    }

    /// A missing config file is not an error; the defaults are used instead.
    #[test]
    fn test_load_nonexistent_file_uses_defaults() {
        let config = Config::load_from_path("/nonexistent/path/config.toml").unwrap();

        assert!(config.exporter.endpoint.is_none());
        assert_eq!(config.receiver.grpc_port, 4317);
    }

    /// `Protocol` variants serialize to lowercase strings.
    #[test]
    fn test_protocol_serialization() {
        assert_eq!(serde_json::to_string(&Protocol::Grpc).unwrap(), "\"grpc\"");
        assert_eq!(serde_json::to_string(&Protocol::Http).unwrap(), "\"http\"");
    }

    /// `Compression` variants serialize to lowercase strings.
    #[test]
    fn test_compression_serialization() {
        assert_eq!(
            serde_json::to_string(&Compression::None).unwrap(),
            "\"none\""
        );
        assert_eq!(
            serde_json::to_string(&Compression::Gzip).unwrap(),
            "\"gzip\""
        );
    }

    /// `FlushStrategy` variants serialize to lowercase strings.
    #[test]
    fn test_flush_strategy_serialization() {
        assert_eq!(
            serde_json::to_string(&FlushStrategy::Default).unwrap(),
            "\"default\""
        );
        assert_eq!(
            serde_json::to_string(&FlushStrategy::End).unwrap(),
            "\"end\""
        );
        assert_eq!(
            serde_json::to_string(&FlushStrategy::Periodic).unwrap(),
            "\"periodic\""
        );
        assert_eq!(
            serde_json::to_string(&FlushStrategy::Continuous).unwrap(),
            "\"continuous\""
        );
    }
}