opentelemetry_lambda_extension/
config.rs

//! Configuration loading and management.
//!
//! This module provides layered configuration for the extension using figment.
//! Configuration is loaded from the following sources, listed from lowest to
//! highest priority (later sources override earlier ones):
//! 1. Default values (compiled in)
//! 2. Config file: `/var/task/otel-extension.toml` (optional)
//! 3. Environment variables (prefix: `LAMBDA_OTEL_`)
8
9use figment::{
10    Figment,
11    providers::{Env, Format, Serialized, Toml},
12};
13use serde::{Deserialize, Serialize};
14use std::collections::HashMap;
15use std::path::Path;
16use std::time::Duration;
17
18const DEFAULT_CONFIG_PATH: &str = "/var/task/otel-extension.toml";
19const ENV_PREFIX: &str = "LAMBDA_OTEL_";
20
21/// OTLP protocol for export.
22#[non_exhaustive]
23#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
24#[serde(rename_all = "lowercase")]
25pub enum Protocol {
26    /// gRPC protocol (port 4317).
27    Grpc,
28    /// HTTP/protobuf protocol (port 4318).
29    #[default]
30    Http,
31}
32
33/// Compression algorithm for OTLP export.
34#[non_exhaustive]
35#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
36#[serde(rename_all = "lowercase")]
37pub enum Compression {
38    /// No compression.
39    None,
40    /// Gzip compression.
41    #[default]
42    Gzip,
43}
44
45/// Flush strategy for buffered signals.
46#[non_exhaustive]
47#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
48#[serde(rename_all = "lowercase")]
49pub enum FlushStrategy {
50    /// Adaptive strategy based on invocation patterns.
51    #[default]
52    Default,
53    /// Flush at the end of each invocation.
54    End,
55    /// Periodic flush at fixed intervals.
56    Periodic,
57    /// Continuous flush every 20 seconds.
58    Continuous,
59}
60
61/// Main configuration struct for the extension.
62#[derive(Debug, Clone, Default, Serialize, Deserialize)]
63#[serde(default)]
64pub struct Config {
65    /// OTLP exporter configuration.
66    pub exporter: ExporterConfig,
67    /// OTLP receiver configuration.
68    pub receiver: ReceiverConfig,
69    /// Flush behaviour configuration.
70    pub flush: FlushConfig,
71    /// Span correlation configuration.
72    pub correlation: CorrelationConfig,
73    /// Telemetry API configuration.
74    pub telemetry_api: TelemetryApiConfig,
75}
76
77impl Config {
78    /// Loads configuration from all sources.
79    ///
80    /// Configuration is loaded in the following order (later sources override earlier):
81    /// 1. Default values
82    /// 2. Config file at `/var/task/otel-extension.toml` (if it exists)
83    /// 3. Environment variables with `LAMBDA_OTEL_` prefix
84    ///
85    /// # Errors
86    ///
87    /// Returns an error if configuration parsing fails.
88    #[allow(clippy::result_large_err)]
89    pub fn load() -> Result<Self, figment::Error> {
90        Self::load_from_path(DEFAULT_CONFIG_PATH)
91    }
92
93    /// Loads configuration from a custom config file path.
94    ///
95    /// # Errors
96    ///
97    /// Returns an error if configuration parsing fails.
98    #[allow(clippy::result_large_err)]
99    pub fn load_from_path<P: AsRef<Path>>(config_path: P) -> Result<Self, figment::Error> {
100        let mut figment = Figment::from(Serialized::defaults(Config::default()));
101
102        if config_path.as_ref().exists() {
103            figment = figment.merge(Toml::file(config_path));
104        }
105
106        figment = figment.merge(Env::prefixed(ENV_PREFIX).split("_"));
107
108        figment.extract()
109    }
110
111    /// Creates a new config builder for testing.
112    pub fn builder() -> ConfigBuilder {
113        ConfigBuilder::new()
114    }
115}
116
117/// OTLP exporter configuration.
118#[derive(Debug, Clone, Serialize, Deserialize)]
119#[serde(default)]
120pub struct ExporterConfig {
121    /// OTLP endpoint URL.
122    pub endpoint: Option<String>,
123    /// Protocol to use for export.
124    pub protocol: Protocol,
125    /// Request timeout in milliseconds.
126    #[serde(with = "duration_ms")]
127    pub timeout: Duration,
128    /// Compression algorithm.
129    pub compression: Compression,
130    /// Additional headers to send with requests.
131    #[serde(default)]
132    pub headers: HashMap<String, String>,
133}
134
135impl Default for ExporterConfig {
136    fn default() -> Self {
137        Self {
138            endpoint: None,
139            protocol: Protocol::Http,
140            timeout: Duration::from_millis(500),
141            compression: Compression::Gzip,
142            headers: HashMap::new(),
143        }
144    }
145}
146
147/// OTLP receiver configuration.
148#[derive(Debug, Clone, Serialize, Deserialize)]
149#[serde(default)]
150pub struct ReceiverConfig {
151    /// gRPC port (default 4317).
152    pub grpc_port: u16,
153    /// HTTP port (default 4318).
154    pub http_port: u16,
155    /// Whether to enable the gRPC receiver.
156    pub grpc_enabled: bool,
157    /// Whether to enable the HTTP receiver.
158    pub http_enabled: bool,
159}
160
161impl Default for ReceiverConfig {
162    fn default() -> Self {
163        Self {
164            grpc_port: 4317,
165            http_port: 4318,
166            grpc_enabled: true,
167            http_enabled: true,
168        }
169    }
170}
171
172/// Flush behaviour configuration.
173#[derive(Debug, Clone, Serialize, Deserialize)]
174#[serde(default)]
175pub struct FlushConfig {
176    /// Flush strategy to use.
177    pub strategy: FlushStrategy,
178    /// Periodic flush interval in milliseconds.
179    #[serde(with = "duration_ms")]
180    pub interval: Duration,
181    /// Maximum batch size in bytes.
182    pub max_batch_bytes: usize,
183    /// Maximum entries per batch.
184    pub max_batch_entries: usize,
185}
186
187impl Default for FlushConfig {
188    fn default() -> Self {
189        Self {
190            strategy: FlushStrategy::Default,
191            interval: Duration::from_secs(20),
192            max_batch_bytes: 4 * 1024 * 1024,
193            max_batch_entries: 1000,
194        }
195    }
196}
197
198/// Span correlation configuration.
199#[derive(Debug, Clone, Serialize, Deserialize)]
200#[serde(default)]
201pub struct CorrelationConfig {
202    /// Maximum time to wait for parent span context in milliseconds.
203    #[serde(with = "duration_ms")]
204    pub max_correlation_delay: Duration,
205    /// Maximum buffered events per invocation.
206    pub max_buffered_events_per_invocation: usize,
207    /// Maximum total buffered events.
208    pub max_total_buffered_events: usize,
209    /// Maximum lifetime for invocation context in milliseconds.
210    #[serde(with = "duration_ms")]
211    pub max_invocation_lifetime: Duration,
212    /// Whether to emit orphaned spans without parent context.
213    pub emit_orphaned_spans: bool,
214}
215
216impl Default for CorrelationConfig {
217    fn default() -> Self {
218        Self {
219            max_correlation_delay: Duration::from_millis(500),
220            max_buffered_events_per_invocation: 50,
221            max_total_buffered_events: 500,
222            max_invocation_lifetime: Duration::from_secs(15 * 60),
223            emit_orphaned_spans: true,
224        }
225    }
226}
227
228/// Telemetry API configuration.
229#[derive(Debug, Clone, Serialize, Deserialize)]
230#[serde(default)]
231pub struct TelemetryApiConfig {
232    /// Whether to enable Telemetry API subscription.
233    pub enabled: bool,
234    /// Port for receiving Telemetry API events.
235    pub listener_port: u16,
236    /// Buffer size for Telemetry API events.
237    pub buffer_size: usize,
238}
239
240impl Default for TelemetryApiConfig {
241    fn default() -> Self {
242        Self {
243            enabled: true,
244            listener_port: 9999,
245            buffer_size: 256,
246        }
247    }
248}
249
250/// Builder for constructing configuration programmatically.
251#[must_use = "builders do nothing unless .build() is called"]
252pub struct ConfigBuilder {
253    config: Config,
254}
255
256impl ConfigBuilder {
257    /// Creates a new config builder with default values.
258    pub fn new() -> Self {
259        Self {
260            config: Config::default(),
261        }
262    }
263
264    /// Sets the exporter endpoint.
265    pub fn exporter_endpoint(mut self, endpoint: impl Into<String>) -> Self {
266        self.config.exporter.endpoint = Some(endpoint.into());
267        self
268    }
269
270    /// Sets the exporter protocol.
271    pub fn exporter_protocol(mut self, protocol: Protocol) -> Self {
272        self.config.exporter.protocol = protocol;
273        self
274    }
275
276    /// Sets the exporter timeout.
277    pub fn exporter_timeout(mut self, timeout: Duration) -> Self {
278        self.config.exporter.timeout = timeout;
279        self
280    }
281
282    /// Sets the flush strategy.
283    pub fn flush_strategy(mut self, strategy: FlushStrategy) -> Self {
284        self.config.flush.strategy = strategy;
285        self
286    }
287
288    /// Sets the flush interval.
289    pub fn flush_interval(mut self, interval: Duration) -> Self {
290        self.config.flush.interval = interval;
291        self
292    }
293
294    /// Sets the correlation delay.
295    pub fn correlation_delay(mut self, delay: Duration) -> Self {
296        self.config.correlation.max_correlation_delay = delay;
297        self
298    }
299
300    /// Sets whether to emit orphaned spans.
301    pub fn emit_orphaned_spans(mut self, emit: bool) -> Self {
302        self.config.correlation.emit_orphaned_spans = emit;
303        self
304    }
305
306    /// Enables or disables the gRPC receiver.
307    pub fn grpc_receiver(mut self, enabled: bool) -> Self {
308        self.config.receiver.grpc_enabled = enabled;
309        self
310    }
311
312    /// Enables or disables the HTTP receiver.
313    pub fn http_receiver(mut self, enabled: bool) -> Self {
314        self.config.receiver.http_enabled = enabled;
315        self
316    }
317
318    /// Sets the gRPC receiver port.
319    pub fn grpc_port(mut self, port: u16) -> Self {
320        self.config.receiver.grpc_port = port;
321        self
322    }
323
324    /// Sets the HTTP receiver port.
325    pub fn http_port(mut self, port: u16) -> Self {
326        self.config.receiver.http_port = port;
327        self
328    }
329
330    /// Enables or disables the Telemetry API.
331    pub fn telemetry_api(mut self, enabled: bool) -> Self {
332        self.config.telemetry_api.enabled = enabled;
333        self
334    }
335
336    /// Builds the configuration.
337    pub fn build(self) -> Config {
338        self.config
339    }
340}
341
342impl Default for ConfigBuilder {
343    fn default() -> Self {
344        Self::new()
345    }
346}
347
348mod duration_ms {
349    use serde::{Deserialize, Deserializer, Serializer};
350    use std::time::Duration;
351
352    pub fn serialize<S>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error>
353    where
354        S: Serializer,
355    {
356        serializer.serialize_u64(duration.as_millis() as u64)
357    }
358
359    pub fn deserialize<'de, D>(deserializer: D) -> Result<Duration, D::Error>
360    where
361        D: Deserializer<'de>,
362    {
363        let ms = u64::deserialize(deserializer)?;
364        Ok(Duration::from_millis(ms))
365    }
366}
367
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// `Config::default()` carries the compiled-in default values.
    #[test]
    fn test_default_config() {
        let Config {
            exporter,
            receiver,
            flush,
            correlation,
            telemetry_api,
        } = Config::default();

        assert_eq!(exporter.endpoint, None);
        assert_eq!(exporter.protocol, Protocol::Http);
        assert_eq!(exporter.timeout, Duration::from_millis(500));
        assert_eq!(exporter.compression, Compression::Gzip);

        assert_eq!(receiver.grpc_port, 4317);
        assert_eq!(receiver.http_port, 4318);
        assert!(receiver.grpc_enabled);
        assert!(receiver.http_enabled);

        assert_eq!(flush.strategy, FlushStrategy::Default);
        assert_eq!(flush.interval, Duration::from_secs(20));

        assert_eq!(correlation.max_correlation_delay, Duration::from_millis(500));
        assert!(correlation.emit_orphaned_spans);

        assert!(telemetry_api.enabled);
    }

    /// Every builder setter lands in the matching field of the built config.
    #[test]
    fn test_config_builder() {
        let built = Config::builder()
            .exporter_endpoint("https://collector:4318")
            .exporter_protocol(Protocol::Grpc)
            .exporter_timeout(Duration::from_millis(1000))
            .flush_strategy(FlushStrategy::Continuous)
            .flush_interval(Duration::from_secs(10))
            .correlation_delay(Duration::from_millis(200))
            .emit_orphaned_spans(false)
            .grpc_receiver(false)
            .http_receiver(true)
            .grpc_port(5317)
            .http_port(5318)
            .telemetry_api(false)
            .build();

        assert_eq!(
            built.exporter.endpoint.as_deref(),
            Some("https://collector:4318")
        );
        assert_eq!(built.exporter.protocol, Protocol::Grpc);
        assert_eq!(built.exporter.timeout, Duration::from_millis(1000));

        assert_eq!(built.flush.strategy, FlushStrategy::Continuous);
        assert_eq!(built.flush.interval, Duration::from_secs(10));

        assert_eq!(
            built.correlation.max_correlation_delay,
            Duration::from_millis(200)
        );
        assert!(!built.correlation.emit_orphaned_spans);

        assert!(!built.receiver.grpc_enabled);
        assert!(built.receiver.http_enabled);
        assert_eq!(built.receiver.grpc_port, 5317);
        assert_eq!(built.receiver.http_port, 5318);

        assert!(!built.telemetry_api.enabled);
    }

    /// Values from a TOML file override the defaults.
    #[test]
    fn test_load_from_toml() {
        let toml_content = r#"
[exporter]
endpoint = "https://test-collector:4318"
protocol = "grpc"
timeout = 1000

[receiver]
grpc_port = 5317
http_port = 5318
grpc_enabled = false

[flush]
strategy = "periodic"
interval = 15000

[correlation]
max_correlation_delay = 300
emit_orphaned_spans = false
"#;

        let mut file = NamedTempFile::new().unwrap();
        write!(file, "{}", toml_content).unwrap();

        let loaded = Config::load_from_path(file.path()).unwrap();

        assert_eq!(
            loaded.exporter.endpoint.as_deref(),
            Some("https://test-collector:4318")
        );
        assert_eq!(loaded.exporter.protocol, Protocol::Grpc);
        assert_eq!(loaded.exporter.timeout, Duration::from_millis(1000));

        assert_eq!(loaded.receiver.grpc_port, 5317);
        assert_eq!(loaded.receiver.http_port, 5318);
        assert!(!loaded.receiver.grpc_enabled);

        assert_eq!(loaded.flush.strategy, FlushStrategy::Periodic);
        assert_eq!(loaded.flush.interval, Duration::from_secs(15));

        assert_eq!(
            loaded.correlation.max_correlation_delay,
            Duration::from_millis(300)
        );
        assert!(!loaded.correlation.emit_orphaned_spans);
    }

    /// A missing config file is not an error; defaults are used instead.
    #[test]
    fn test_load_nonexistent_file_uses_defaults() {
        let loaded = Config::load_from_path("/nonexistent/path/config.toml").unwrap();

        assert_eq!(loaded.exporter.endpoint, None);
        assert_eq!(loaded.receiver.grpc_port, 4317);
    }

    /// `Protocol` variants serialize to their lowercase names.
    #[test]
    fn test_protocol_serialization() {
        let cases = [(Protocol::Grpc, "\"grpc\""), (Protocol::Http, "\"http\"")];

        for (variant, expected) in cases {
            assert_eq!(serde_json::to_string(&variant).unwrap(), expected);
        }
    }

    /// `Compression` variants serialize to their lowercase names.
    #[test]
    fn test_compression_serialization() {
        let cases = [
            (Compression::None, "\"none\""),
            (Compression::Gzip, "\"gzip\""),
        ];

        for (variant, expected) in cases {
            assert_eq!(serde_json::to_string(&variant).unwrap(), expected);
        }
    }

    /// `FlushStrategy` variants serialize to their lowercase names.
    #[test]
    fn test_flush_strategy_serialization() {
        let cases = [
            (FlushStrategy::Default, "\"default\""),
            (FlushStrategy::End, "\"end\""),
            (FlushStrategy::Periodic, "\"periodic\""),
            (FlushStrategy::Continuous, "\"continuous\""),
        ];

        for (variant, expected) in cases {
            assert_eq!(serde_json::to_string(&variant).unwrap(), expected);
        }
    }
}