opentelemetry_lambda_extension/
config.rs

1//! Configuration loading and management.
2//!
3//! This module provides layered configuration for the extension using figment.
4//! Configuration is loaded from (in order of priority):
5//! 1. Default values (compiled in)
6//! 2. Config file: `/var/task/otel-extension.toml` (optional)
7//! 3. Standard OpenTelemetry environment variables (`OTEL_*`)
8//! 4. Extension-specific environment variables (`LAMBDA_OTEL_*`)
9//!
10//! # Supported Standard Environment Variables
11//!
12//! The following standard OpenTelemetry environment variables are supported:
13//!
14//! | Variable | Config Path | Description |
15//! |----------|-------------|-------------|
16//! | `OTEL_EXPORTER_OTLP_ENDPOINT` | `exporter.endpoint` | OTLP endpoint URL |
17//! | `OTEL_EXPORTER_OTLP_PROTOCOL` | `exporter.protocol` | Protocol (http or grpc) |
18//! | `OTEL_EXPORTER_OTLP_HEADERS` | `exporter.headers` | Comma-separated key=value pairs |
19//! | `OTEL_EXPORTER_OTLP_COMPRESSION` | `exporter.compression` | Compression (gzip or none) |
20//!
21//! Extension-specific variables with `LAMBDA_OTEL_` prefix take precedence.
22
23use figment::{
24    Figment,
25    providers::{Env, Format, Serialized, Toml},
26};
27use serde::{Deserialize, Serialize};
28use std::collections::HashMap;
29use std::path::Path;
30use std::time::Duration;
31
32const DEFAULT_CONFIG_PATH: &str = "/var/task/otel-extension.toml";
33const ENV_PREFIX: &str = "LAMBDA_OTEL_";
34
35/// OTLP protocol for export.
36#[non_exhaustive]
37#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
38#[serde(rename_all = "lowercase")]
39pub enum Protocol {
40    /// gRPC protocol (port 4317).
41    Grpc,
42    /// HTTP/protobuf protocol (port 4318).
43    #[default]
44    Http,
45}
46
47/// Compression algorithm for OTLP export.
48#[non_exhaustive]
49#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
50#[serde(rename_all = "lowercase")]
51pub enum Compression {
52    /// No compression.
53    None,
54    /// Gzip compression.
55    #[default]
56    Gzip,
57}
58
59/// Flush strategy for buffered signals.
60#[non_exhaustive]
61#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
62#[serde(rename_all = "lowercase")]
63pub enum FlushStrategy {
64    /// Adaptive strategy based on invocation patterns.
65    #[default]
66    Default,
67    /// Flush at the end of each invocation.
68    End,
69    /// Periodic flush at fixed intervals.
70    Periodic,
71    /// Continuous flush every 20 seconds.
72    Continuous,
73}
74
75/// Main configuration struct for the extension.
76#[derive(Debug, Clone, Default, Serialize, Deserialize)]
77#[serde(default)]
78pub struct Config {
79    /// OTLP exporter configuration.
80    pub exporter: ExporterConfig,
81    /// OTLP receiver configuration.
82    pub receiver: ReceiverConfig,
83    /// Flush behaviour configuration.
84    pub flush: FlushConfig,
85    /// Span correlation configuration.
86    pub correlation: CorrelationConfig,
87    /// Telemetry API configuration.
88    pub telemetry_api: TelemetryApiConfig,
89}
90
91impl Config {
92    /// Loads configuration from all sources.
93    ///
94    /// Configuration is loaded in the following order (later sources override earlier):
95    /// 1. Default values
96    /// 2. Config file at `/var/task/otel-extension.toml` (if it exists)
97    /// 3. Environment variables with `LAMBDA_OTEL_` prefix
98    ///
99    /// # Errors
100    ///
101    /// Returns an error if configuration parsing fails.
102    #[allow(clippy::result_large_err)]
103    pub fn load() -> Result<Self, figment::Error> {
104        Self::load_from_path(DEFAULT_CONFIG_PATH)
105    }
106
107    /// Loads configuration from a custom config file path.
108    ///
109    /// # Errors
110    ///
111    /// Returns an error if configuration parsing fails.
112    #[allow(clippy::result_large_err)]
113    pub fn load_from_path<P: AsRef<Path>>(config_path: P) -> Result<Self, figment::Error> {
114        let mut figment = Figment::from(Serialized::defaults(Config::default()));
115
116        if config_path.as_ref().exists() {
117            figment = figment.merge(Toml::file(config_path));
118        }
119
120        figment = figment.merge(standard_otel_env());
121        figment = figment.merge(Env::prefixed(ENV_PREFIX).split("_"));
122
123        figment.extract()
124    }
125
126    /// Creates a new config builder for testing.
127    pub fn builder() -> ConfigBuilder {
128        ConfigBuilder::new()
129    }
130}
131
132/// OTLP exporter configuration.
133#[derive(Debug, Clone, Serialize, Deserialize)]
134#[serde(default)]
135pub struct ExporterConfig {
136    /// OTLP endpoint URL.
137    pub endpoint: Option<String>,
138    /// Protocol to use for export.
139    pub protocol: Protocol,
140    /// Request timeout in milliseconds.
141    #[serde(with = "duration_ms")]
142    pub timeout: Duration,
143    /// Compression algorithm.
144    pub compression: Compression,
145    /// Additional headers to send with requests.
146    #[serde(default)]
147    pub headers: HashMap<String, String>,
148}
149
150impl Default for ExporterConfig {
151    fn default() -> Self {
152        Self {
153            endpoint: None,
154            protocol: Protocol::Http,
155            timeout: Duration::from_millis(500),
156            compression: Compression::Gzip,
157            headers: HashMap::new(),
158        }
159    }
160}
161
162/// OTLP receiver configuration.
163#[derive(Debug, Clone, Serialize, Deserialize)]
164#[serde(default)]
165pub struct ReceiverConfig {
166    /// gRPC port (default 4317).
167    pub grpc_port: u16,
168    /// HTTP port (default 4318).
169    pub http_port: u16,
170    /// Whether to enable the gRPC receiver.
171    pub grpc_enabled: bool,
172    /// Whether to enable the HTTP receiver.
173    pub http_enabled: bool,
174}
175
176impl Default for ReceiverConfig {
177    fn default() -> Self {
178        Self {
179            grpc_port: 4317,
180            http_port: 4318,
181            grpc_enabled: true,
182            http_enabled: true,
183        }
184    }
185}
186
187/// Flush behaviour configuration.
188#[derive(Debug, Clone, Serialize, Deserialize)]
189#[serde(default)]
190pub struct FlushConfig {
191    /// Flush strategy to use.
192    pub strategy: FlushStrategy,
193    /// Periodic flush interval in milliseconds.
194    #[serde(with = "duration_ms")]
195    pub interval: Duration,
196    /// Maximum batch size in bytes.
197    pub max_batch_bytes: usize,
198    /// Maximum entries per batch.
199    pub max_batch_entries: usize,
200}
201
202impl Default for FlushConfig {
203    fn default() -> Self {
204        Self {
205            strategy: FlushStrategy::Default,
206            interval: Duration::from_secs(20),
207            max_batch_bytes: 4 * 1024 * 1024,
208            max_batch_entries: 1000,
209        }
210    }
211}
212
213/// Span correlation configuration.
214#[derive(Debug, Clone, Serialize, Deserialize)]
215#[serde(default)]
216pub struct CorrelationConfig {
217    /// Maximum time to wait for parent span context in milliseconds.
218    #[serde(with = "duration_ms")]
219    pub max_correlation_delay: Duration,
220    /// Maximum buffered events per invocation.
221    pub max_buffered_events_per_invocation: usize,
222    /// Maximum total buffered events.
223    pub max_total_buffered_events: usize,
224    /// Maximum lifetime for invocation context in milliseconds.
225    #[serde(with = "duration_ms")]
226    pub max_invocation_lifetime: Duration,
227    /// Whether to emit orphaned spans without parent context.
228    pub emit_orphaned_spans: bool,
229}
230
231impl Default for CorrelationConfig {
232    fn default() -> Self {
233        Self {
234            max_correlation_delay: Duration::from_millis(500),
235            max_buffered_events_per_invocation: 50,
236            max_total_buffered_events: 500,
237            max_invocation_lifetime: Duration::from_secs(15 * 60),
238            emit_orphaned_spans: true,
239        }
240    }
241}
242
243/// Telemetry API configuration.
244#[derive(Debug, Clone, Serialize, Deserialize)]
245#[serde(default)]
246pub struct TelemetryApiConfig {
247    /// Whether to enable Telemetry API subscription.
248    pub enabled: bool,
249    /// Port for receiving Telemetry API events.
250    pub listener_port: u16,
251    /// Buffer size for Telemetry API events.
252    pub buffer_size: usize,
253}
254
255impl Default for TelemetryApiConfig {
256    fn default() -> Self {
257        Self {
258            enabled: true,
259            listener_port: 9999,
260            buffer_size: 256,
261        }
262    }
263}
264
265/// Builder for constructing configuration programmatically.
266#[must_use = "builders do nothing unless .build() is called"]
267pub struct ConfigBuilder {
268    config: Config,
269}
270
271impl ConfigBuilder {
272    /// Creates a new config builder with default values.
273    pub fn new() -> Self {
274        Self {
275            config: Config::default(),
276        }
277    }
278
279    /// Sets the exporter endpoint.
280    pub fn exporter_endpoint(mut self, endpoint: impl Into<String>) -> Self {
281        self.config.exporter.endpoint = Some(endpoint.into());
282        self
283    }
284
285    /// Sets the exporter protocol.
286    pub fn exporter_protocol(mut self, protocol: Protocol) -> Self {
287        self.config.exporter.protocol = protocol;
288        self
289    }
290
291    /// Sets the exporter timeout.
292    pub fn exporter_timeout(mut self, timeout: Duration) -> Self {
293        self.config.exporter.timeout = timeout;
294        self
295    }
296
297    /// Sets the flush strategy.
298    pub fn flush_strategy(mut self, strategy: FlushStrategy) -> Self {
299        self.config.flush.strategy = strategy;
300        self
301    }
302
303    /// Sets the flush interval.
304    pub fn flush_interval(mut self, interval: Duration) -> Self {
305        self.config.flush.interval = interval;
306        self
307    }
308
309    /// Sets the correlation delay.
310    pub fn correlation_delay(mut self, delay: Duration) -> Self {
311        self.config.correlation.max_correlation_delay = delay;
312        self
313    }
314
315    /// Sets whether to emit orphaned spans.
316    pub fn emit_orphaned_spans(mut self, emit: bool) -> Self {
317        self.config.correlation.emit_orphaned_spans = emit;
318        self
319    }
320
321    /// Enables or disables the gRPC receiver.
322    pub fn grpc_receiver(mut self, enabled: bool) -> Self {
323        self.config.receiver.grpc_enabled = enabled;
324        self
325    }
326
327    /// Enables or disables the HTTP receiver.
328    pub fn http_receiver(mut self, enabled: bool) -> Self {
329        self.config.receiver.http_enabled = enabled;
330        self
331    }
332
333    /// Sets the gRPC receiver port.
334    pub fn grpc_port(mut self, port: u16) -> Self {
335        self.config.receiver.grpc_port = port;
336        self
337    }
338
339    /// Sets the HTTP receiver port.
340    pub fn http_port(mut self, port: u16) -> Self {
341        self.config.receiver.http_port = port;
342        self
343    }
344
345    /// Enables or disables the Telemetry API.
346    pub fn telemetry_api(mut self, enabled: bool) -> Self {
347        self.config.telemetry_api.enabled = enabled;
348        self
349    }
350
351    /// Builds the configuration.
352    pub fn build(self) -> Config {
353        self.config
354    }
355}
356
357impl Default for ConfigBuilder {
358    fn default() -> Self {
359        Self::new()
360    }
361}
362
363/// Partial exporter config for standard OTEL env var overrides.
364#[derive(Debug, Default, Serialize)]
365struct PartialExporterConfig {
366    #[serde(skip_serializing_if = "Option::is_none")]
367    endpoint: Option<String>,
368    #[serde(skip_serializing_if = "Option::is_none")]
369    protocol: Option<Protocol>,
370    #[serde(skip_serializing_if = "Option::is_none")]
371    compression: Option<Compression>,
372    #[serde(skip_serializing_if = "HashMap::is_empty")]
373    headers: HashMap<String, String>,
374}
375
376/// Partial config for standard OTEL env var overrides.
377#[derive(Debug, Default, Serialize)]
378struct PartialConfig {
379    #[serde(skip_serializing_if = "is_partial_exporter_empty")]
380    exporter: PartialExporterConfig,
381}
382
383fn is_partial_exporter_empty(config: &PartialExporterConfig) -> bool {
384    config.endpoint.is_none()
385        && config.protocol.is_none()
386        && config.compression.is_none()
387        && config.headers.is_empty()
388}
389
390fn standard_otel_env() -> Serialized<PartialConfig> {
391    let mut config = PartialConfig::default();
392
393    if let Ok(endpoint) = std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT") {
394        config.exporter.endpoint = Some(endpoint);
395    }
396
397    if let Ok(protocol) = std::env::var("OTEL_EXPORTER_OTLP_PROTOCOL") {
398        config.exporter.protocol = match protocol.to_lowercase().as_str() {
399            "grpc" => Some(Protocol::Grpc),
400            "http/protobuf" | "http" => Some(Protocol::Http),
401            _ => None,
402        };
403    }
404
405    if let Ok(compression) = std::env::var("OTEL_EXPORTER_OTLP_COMPRESSION") {
406        config.exporter.compression = match compression.to_lowercase().as_str() {
407            "gzip" => Some(Compression::Gzip),
408            "none" => Some(Compression::None),
409            _ => None,
410        };
411    }
412
413    if let Ok(headers_str) = std::env::var("OTEL_EXPORTER_OTLP_HEADERS") {
414        for pair in headers_str.split(',') {
415            if let Some((key, value)) = pair.split_once('=') {
416                config
417                    .exporter
418                    .headers
419                    .insert(key.trim().to_string(), value.trim().to_string());
420            }
421        }
422    }
423
424    Serialized::defaults(config)
425}
426
427mod duration_ms {
428    use serde::{Deserialize, Deserializer, Serializer};
429    use std::time::Duration;
430
431    pub fn serialize<S>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error>
432    where
433        S: Serializer,
434    {
435        serializer.serialize_u64(duration.as_millis() as u64)
436    }
437
438    pub fn deserialize<'de, D>(deserializer: D) -> Result<Duration, D::Error>
439    where
440        D: Deserializer<'de>,
441    {
442        let ms = u64::deserialize(deserializer)?;
443        Ok(Duration::from_millis(ms))
444    }
445}
446
447#[cfg(test)]
448mod tests {
449    use super::*;
450    use std::io::Write;
451    use tempfile::NamedTempFile;
452
453    #[test]
454    fn test_default_config() {
455        let config = Config::default();
456
457        assert!(config.exporter.endpoint.is_none());
458        assert_eq!(config.exporter.protocol, Protocol::Http);
459        assert_eq!(config.exporter.timeout, Duration::from_millis(500));
460        assert_eq!(config.exporter.compression, Compression::Gzip);
461
462        assert_eq!(config.receiver.grpc_port, 4317);
463        assert_eq!(config.receiver.http_port, 4318);
464        assert!(config.receiver.grpc_enabled);
465        assert!(config.receiver.http_enabled);
466
467        assert_eq!(config.flush.strategy, FlushStrategy::Default);
468        assert_eq!(config.flush.interval, Duration::from_secs(20));
469
470        assert_eq!(
471            config.correlation.max_correlation_delay,
472            Duration::from_millis(500)
473        );
474        assert!(config.correlation.emit_orphaned_spans);
475
476        assert!(config.telemetry_api.enabled);
477    }
478
479    #[test]
480    fn test_config_builder() {
481        let config = Config::builder()
482            .exporter_endpoint("https://collector:4318")
483            .exporter_protocol(Protocol::Grpc)
484            .exporter_timeout(Duration::from_millis(1000))
485            .flush_strategy(FlushStrategy::Continuous)
486            .flush_interval(Duration::from_secs(10))
487            .correlation_delay(Duration::from_millis(200))
488            .emit_orphaned_spans(false)
489            .grpc_receiver(false)
490            .http_receiver(true)
491            .grpc_port(5317)
492            .http_port(5318)
493            .telemetry_api(false)
494            .build();
495
496        assert_eq!(
497            config.exporter.endpoint,
498            Some("https://collector:4318".to_string())
499        );
500        assert_eq!(config.exporter.protocol, Protocol::Grpc);
501        assert_eq!(config.exporter.timeout, Duration::from_millis(1000));
502        assert_eq!(config.flush.strategy, FlushStrategy::Continuous);
503        assert_eq!(config.flush.interval, Duration::from_secs(10));
504        assert_eq!(
505            config.correlation.max_correlation_delay,
506            Duration::from_millis(200)
507        );
508        assert!(!config.correlation.emit_orphaned_spans);
509        assert!(!config.receiver.grpc_enabled);
510        assert!(config.receiver.http_enabled);
511        assert_eq!(config.receiver.grpc_port, 5317);
512        assert_eq!(config.receiver.http_port, 5318);
513        assert!(!config.telemetry_api.enabled);
514    }
515
516    #[test]
517    fn test_load_from_toml() {
518        let toml_content = r#"
519[exporter]
520endpoint = "https://test-collector:4318"
521protocol = "grpc"
522timeout = 1000
523
524[receiver]
525grpc_port = 5317
526http_port = 5318
527grpc_enabled = false
528
529[flush]
530strategy = "periodic"
531interval = 15000
532
533[correlation]
534max_correlation_delay = 300
535emit_orphaned_spans = false
536"#;
537
538        let mut temp_file = NamedTempFile::new().unwrap();
539        temp_file.write_all(toml_content.as_bytes()).unwrap();
540
541        let config = Config::load_from_path(temp_file.path()).unwrap();
542
543        assert_eq!(
544            config.exporter.endpoint,
545            Some("https://test-collector:4318".to_string())
546        );
547        assert_eq!(config.exporter.protocol, Protocol::Grpc);
548        assert_eq!(config.exporter.timeout, Duration::from_millis(1000));
549        assert_eq!(config.receiver.grpc_port, 5317);
550        assert_eq!(config.receiver.http_port, 5318);
551        assert!(!config.receiver.grpc_enabled);
552        assert_eq!(config.flush.strategy, FlushStrategy::Periodic);
553        assert_eq!(config.flush.interval, Duration::from_secs(15));
554        assert_eq!(
555            config.correlation.max_correlation_delay,
556            Duration::from_millis(300)
557        );
558        assert!(!config.correlation.emit_orphaned_spans);
559    }
560
561    #[test]
562    fn test_load_nonexistent_file_uses_defaults() {
563        let config = Config::load_from_path("/nonexistent/path/config.toml").unwrap();
564
565        assert!(config.exporter.endpoint.is_none());
566        assert_eq!(config.receiver.grpc_port, 4317);
567    }
568
569    #[test]
570    fn test_protocol_serialization() {
571        assert_eq!(serde_json::to_string(&Protocol::Grpc).unwrap(), "\"grpc\"");
572        assert_eq!(serde_json::to_string(&Protocol::Http).unwrap(), "\"http\"");
573    }
574
575    #[test]
576    fn test_compression_serialization() {
577        assert_eq!(
578            serde_json::to_string(&Compression::None).unwrap(),
579            "\"none\""
580        );
581        assert_eq!(
582            serde_json::to_string(&Compression::Gzip).unwrap(),
583            "\"gzip\""
584        );
585    }
586
587    #[test]
588    fn test_flush_strategy_serialization() {
589        assert_eq!(
590            serde_json::to_string(&FlushStrategy::Default).unwrap(),
591            "\"default\""
592        );
593        assert_eq!(
594            serde_json::to_string(&FlushStrategy::End).unwrap(),
595            "\"end\""
596        );
597        assert_eq!(
598            serde_json::to_string(&FlushStrategy::Periodic).unwrap(),
599            "\"periodic\""
600        );
601        assert_eq!(
602            serde_json::to_string(&FlushStrategy::Continuous).unwrap(),
603            "\"continuous\""
604        );
605    }
606}