opentelemetry_lambda_extension/
config.rs1use figment::{
10 Figment,
11 providers::{Env, Format, Serialized, Toml},
12};
13use serde::{Deserialize, Serialize};
14use std::collections::HashMap;
15use std::path::Path;
16use std::time::Duration;
17
18const DEFAULT_CONFIG_PATH: &str = "/var/task/otel-extension.toml";
19const ENV_PREFIX: &str = "LAMBDA_OTEL_";
20
21#[non_exhaustive]
23#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
24#[serde(rename_all = "lowercase")]
25pub enum Protocol {
26 Grpc,
28 #[default]
30 Http,
31}
32
33#[non_exhaustive]
35#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
36#[serde(rename_all = "lowercase")]
37pub enum Compression {
38 None,
40 #[default]
42 Gzip,
43}
44
45#[non_exhaustive]
47#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
48#[serde(rename_all = "lowercase")]
49pub enum FlushStrategy {
50 #[default]
52 Default,
53 End,
55 Periodic,
57 Continuous,
59}
60
61#[derive(Debug, Clone, Default, Serialize, Deserialize)]
63#[serde(default)]
64pub struct Config {
65 pub exporter: ExporterConfig,
67 pub receiver: ReceiverConfig,
69 pub flush: FlushConfig,
71 pub correlation: CorrelationConfig,
73 pub telemetry_api: TelemetryApiConfig,
75}
76
77impl Config {
78 #[allow(clippy::result_large_err)]
89 pub fn load() -> Result<Self, figment::Error> {
90 Self::load_from_path(DEFAULT_CONFIG_PATH)
91 }
92
93 #[allow(clippy::result_large_err)]
99 pub fn load_from_path<P: AsRef<Path>>(config_path: P) -> Result<Self, figment::Error> {
100 let mut figment = Figment::from(Serialized::defaults(Config::default()));
101
102 if config_path.as_ref().exists() {
103 figment = figment.merge(Toml::file(config_path));
104 }
105
106 figment = figment.merge(Env::prefixed(ENV_PREFIX).split("_"));
107
108 figment.extract()
109 }
110
111 pub fn builder() -> ConfigBuilder {
113 ConfigBuilder::new()
114 }
115}
116
117#[derive(Debug, Clone, Serialize, Deserialize)]
119#[serde(default)]
120pub struct ExporterConfig {
121 pub endpoint: Option<String>,
123 pub protocol: Protocol,
125 #[serde(with = "duration_ms")]
127 pub timeout: Duration,
128 pub compression: Compression,
130 #[serde(default)]
132 pub headers: HashMap<String, String>,
133}
134
135impl Default for ExporterConfig {
136 fn default() -> Self {
137 Self {
138 endpoint: None,
139 protocol: Protocol::Http,
140 timeout: Duration::from_millis(500),
141 compression: Compression::Gzip,
142 headers: HashMap::new(),
143 }
144 }
145}
146
147#[derive(Debug, Clone, Serialize, Deserialize)]
149#[serde(default)]
150pub struct ReceiverConfig {
151 pub grpc_port: u16,
153 pub http_port: u16,
155 pub grpc_enabled: bool,
157 pub http_enabled: bool,
159}
160
161impl Default for ReceiverConfig {
162 fn default() -> Self {
163 Self {
164 grpc_port: 4317,
165 http_port: 4318,
166 grpc_enabled: true,
167 http_enabled: true,
168 }
169 }
170}
171
172#[derive(Debug, Clone, Serialize, Deserialize)]
174#[serde(default)]
175pub struct FlushConfig {
176 pub strategy: FlushStrategy,
178 #[serde(with = "duration_ms")]
180 pub interval: Duration,
181 pub max_batch_bytes: usize,
183 pub max_batch_entries: usize,
185}
186
187impl Default for FlushConfig {
188 fn default() -> Self {
189 Self {
190 strategy: FlushStrategy::Default,
191 interval: Duration::from_secs(20),
192 max_batch_bytes: 4 * 1024 * 1024,
193 max_batch_entries: 1000,
194 }
195 }
196}
197
198#[derive(Debug, Clone, Serialize, Deserialize)]
200#[serde(default)]
201pub struct CorrelationConfig {
202 #[serde(with = "duration_ms")]
204 pub max_correlation_delay: Duration,
205 pub max_buffered_events_per_invocation: usize,
207 pub max_total_buffered_events: usize,
209 #[serde(with = "duration_ms")]
211 pub max_invocation_lifetime: Duration,
212 pub emit_orphaned_spans: bool,
214}
215
216impl Default for CorrelationConfig {
217 fn default() -> Self {
218 Self {
219 max_correlation_delay: Duration::from_millis(500),
220 max_buffered_events_per_invocation: 50,
221 max_total_buffered_events: 500,
222 max_invocation_lifetime: Duration::from_secs(15 * 60),
223 emit_orphaned_spans: true,
224 }
225 }
226}
227
228#[derive(Debug, Clone, Serialize, Deserialize)]
230#[serde(default)]
231pub struct TelemetryApiConfig {
232 pub enabled: bool,
234 pub listener_port: u16,
236 pub buffer_size: usize,
238}
239
240impl Default for TelemetryApiConfig {
241 fn default() -> Self {
242 Self {
243 enabled: true,
244 listener_port: 9999,
245 buffer_size: 256,
246 }
247 }
248}
249
250#[must_use = "builders do nothing unless .build() is called"]
252pub struct ConfigBuilder {
253 config: Config,
254}
255
256impl ConfigBuilder {
257 pub fn new() -> Self {
259 Self {
260 config: Config::default(),
261 }
262 }
263
264 pub fn exporter_endpoint(mut self, endpoint: impl Into<String>) -> Self {
266 self.config.exporter.endpoint = Some(endpoint.into());
267 self
268 }
269
270 pub fn exporter_protocol(mut self, protocol: Protocol) -> Self {
272 self.config.exporter.protocol = protocol;
273 self
274 }
275
276 pub fn exporter_timeout(mut self, timeout: Duration) -> Self {
278 self.config.exporter.timeout = timeout;
279 self
280 }
281
282 pub fn flush_strategy(mut self, strategy: FlushStrategy) -> Self {
284 self.config.flush.strategy = strategy;
285 self
286 }
287
288 pub fn flush_interval(mut self, interval: Duration) -> Self {
290 self.config.flush.interval = interval;
291 self
292 }
293
294 pub fn correlation_delay(mut self, delay: Duration) -> Self {
296 self.config.correlation.max_correlation_delay = delay;
297 self
298 }
299
300 pub fn emit_orphaned_spans(mut self, emit: bool) -> Self {
302 self.config.correlation.emit_orphaned_spans = emit;
303 self
304 }
305
306 pub fn grpc_receiver(mut self, enabled: bool) -> Self {
308 self.config.receiver.grpc_enabled = enabled;
309 self
310 }
311
312 pub fn http_receiver(mut self, enabled: bool) -> Self {
314 self.config.receiver.http_enabled = enabled;
315 self
316 }
317
318 pub fn grpc_port(mut self, port: u16) -> Self {
320 self.config.receiver.grpc_port = port;
321 self
322 }
323
324 pub fn http_port(mut self, port: u16) -> Self {
326 self.config.receiver.http_port = port;
327 self
328 }
329
330 pub fn telemetry_api(mut self, enabled: bool) -> Self {
332 self.config.telemetry_api.enabled = enabled;
333 self
334 }
335
336 pub fn build(self) -> Config {
338 self.config
339 }
340}
341
342impl Default for ConfigBuilder {
343 fn default() -> Self {
344 Self::new()
345 }
346}
347
348mod duration_ms {
349 use serde::{Deserialize, Deserializer, Serializer};
350 use std::time::Duration;
351
352 pub fn serialize<S>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error>
353 where
354 S: Serializer,
355 {
356 serializer.serialize_u64(duration.as_millis() as u64)
357 }
358
359 pub fn deserialize<'de, D>(deserializer: D) -> Result<Duration, D::Error>
360 where
361 D: Deserializer<'de>,
362 {
363 let ms = u64::deserialize(deserializer)?;
364 Ok(Duration::from_millis(ms))
365 }
366}
367
368#[cfg(test)]
369mod tests {
370 use super::*;
371 use std::io::Write;
372 use tempfile::NamedTempFile;
373
374 #[test]
375 fn test_default_config() {
376 let config = Config::default();
377
378 assert!(config.exporter.endpoint.is_none());
379 assert_eq!(config.exporter.protocol, Protocol::Http);
380 assert_eq!(config.exporter.timeout, Duration::from_millis(500));
381 assert_eq!(config.exporter.compression, Compression::Gzip);
382
383 assert_eq!(config.receiver.grpc_port, 4317);
384 assert_eq!(config.receiver.http_port, 4318);
385 assert!(config.receiver.grpc_enabled);
386 assert!(config.receiver.http_enabled);
387
388 assert_eq!(config.flush.strategy, FlushStrategy::Default);
389 assert_eq!(config.flush.interval, Duration::from_secs(20));
390
391 assert_eq!(
392 config.correlation.max_correlation_delay,
393 Duration::from_millis(500)
394 );
395 assert!(config.correlation.emit_orphaned_spans);
396
397 assert!(config.telemetry_api.enabled);
398 }
399
400 #[test]
401 fn test_config_builder() {
402 let config = Config::builder()
403 .exporter_endpoint("https://collector:4318")
404 .exporter_protocol(Protocol::Grpc)
405 .exporter_timeout(Duration::from_millis(1000))
406 .flush_strategy(FlushStrategy::Continuous)
407 .flush_interval(Duration::from_secs(10))
408 .correlation_delay(Duration::from_millis(200))
409 .emit_orphaned_spans(false)
410 .grpc_receiver(false)
411 .http_receiver(true)
412 .grpc_port(5317)
413 .http_port(5318)
414 .telemetry_api(false)
415 .build();
416
417 assert_eq!(
418 config.exporter.endpoint,
419 Some("https://collector:4318".to_string())
420 );
421 assert_eq!(config.exporter.protocol, Protocol::Grpc);
422 assert_eq!(config.exporter.timeout, Duration::from_millis(1000));
423 assert_eq!(config.flush.strategy, FlushStrategy::Continuous);
424 assert_eq!(config.flush.interval, Duration::from_secs(10));
425 assert_eq!(
426 config.correlation.max_correlation_delay,
427 Duration::from_millis(200)
428 );
429 assert!(!config.correlation.emit_orphaned_spans);
430 assert!(!config.receiver.grpc_enabled);
431 assert!(config.receiver.http_enabled);
432 assert_eq!(config.receiver.grpc_port, 5317);
433 assert_eq!(config.receiver.http_port, 5318);
434 assert!(!config.telemetry_api.enabled);
435 }
436
437 #[test]
438 fn test_load_from_toml() {
439 let toml_content = r#"
440[exporter]
441endpoint = "https://test-collector:4318"
442protocol = "grpc"
443timeout = 1000
444
445[receiver]
446grpc_port = 5317
447http_port = 5318
448grpc_enabled = false
449
450[flush]
451strategy = "periodic"
452interval = 15000
453
454[correlation]
455max_correlation_delay = 300
456emit_orphaned_spans = false
457"#;
458
459 let mut temp_file = NamedTempFile::new().unwrap();
460 temp_file.write_all(toml_content.as_bytes()).unwrap();
461
462 let config = Config::load_from_path(temp_file.path()).unwrap();
463
464 assert_eq!(
465 config.exporter.endpoint,
466 Some("https://test-collector:4318".to_string())
467 );
468 assert_eq!(config.exporter.protocol, Protocol::Grpc);
469 assert_eq!(config.exporter.timeout, Duration::from_millis(1000));
470 assert_eq!(config.receiver.grpc_port, 5317);
471 assert_eq!(config.receiver.http_port, 5318);
472 assert!(!config.receiver.grpc_enabled);
473 assert_eq!(config.flush.strategy, FlushStrategy::Periodic);
474 assert_eq!(config.flush.interval, Duration::from_secs(15));
475 assert_eq!(
476 config.correlation.max_correlation_delay,
477 Duration::from_millis(300)
478 );
479 assert!(!config.correlation.emit_orphaned_spans);
480 }
481
482 #[test]
483 fn test_load_nonexistent_file_uses_defaults() {
484 let config = Config::load_from_path("/nonexistent/path/config.toml").unwrap();
485
486 assert!(config.exporter.endpoint.is_none());
487 assert_eq!(config.receiver.grpc_port, 4317);
488 }
489
490 #[test]
491 fn test_protocol_serialization() {
492 assert_eq!(serde_json::to_string(&Protocol::Grpc).unwrap(), "\"grpc\"");
493 assert_eq!(serde_json::to_string(&Protocol::Http).unwrap(), "\"http\"");
494 }
495
496 #[test]
497 fn test_compression_serialization() {
498 assert_eq!(
499 serde_json::to_string(&Compression::None).unwrap(),
500 "\"none\""
501 );
502 assert_eq!(
503 serde_json::to_string(&Compression::Gzip).unwrap(),
504 "\"gzip\""
505 );
506 }
507
508 #[test]
509 fn test_flush_strategy_serialization() {
510 assert_eq!(
511 serde_json::to_string(&FlushStrategy::Default).unwrap(),
512 "\"default\""
513 );
514 assert_eq!(
515 serde_json::to_string(&FlushStrategy::End).unwrap(),
516 "\"end\""
517 );
518 assert_eq!(
519 serde_json::to_string(&FlushStrategy::Periodic).unwrap(),
520 "\"periodic\""
521 );
522 assert_eq!(
523 serde_json::to_string(&FlushStrategy::Continuous).unwrap(),
524 "\"continuous\""
525 );
526 }
527}