lambda_otel_lite/
processor.rs

1//! Span processor implementation optimized for AWS Lambda functions.
2//!
3//! This module provides a Lambda-optimized span processor that efficiently manages OpenTelemetry spans
4//! in a serverless environment. It uses a ring buffer to store spans in memory and provides efficient
5//! batch processing capabilities.
6//!
7//! # Architecture
8//!
9//! The processor is designed specifically for the Lambda execution environment:
10//!
11//! 1. **Ring Buffer Storage**:
12//!    - Fixed-size circular buffer prevents memory growth
13//!    - O(1) push operations with no memory reallocation
14//!    - FIFO ordering ensures spans are processed in order
15//!    - Efficient batch removal for export
16//!    - When full, new spans are dropped (with warning logs)
17//!
18//! 2. **Thread Safety**:
19//!    - All operations are thread-safe
20//!    - Uses Mutex for span buffer access
21//!    - Atomic operations for state management
22//!    - Safe for concurrent span submission
23//!
24//! # Configuration
25//!
26//! The processor can be configured through environment variables:
27//!
28//! - `LAMBDA_SPAN_PROCESSOR_QUEUE_SIZE`: Controls buffer size
29//!   - Defaults to 2048 spans
30//!   - Should be tuned based on span volume
31//!
32//! # Usage Examples
33//!
34//! Basic setup with default configuration:
35//!
36//! ```no_run
37//! use lambda_otel_lite::LambdaSpanProcessor;
38//! use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
39//!
40//! let processor = LambdaSpanProcessor::builder()
41//!     .exporter(OtlpStdoutSpanExporter::default())
42//!     .build();
43//! ```
44//!
45//! Using with an OTLP HTTP exporter:
46//!
47//! ```no_run
48//! use lambda_otel_lite::LambdaSpanProcessor;
49//! use opentelemetry_otlp::{SpanExporter, Protocol};
50//! use opentelemetry_otlp::{WithExportConfig, WithHttpConfig};
51//!
52//! // Important: When using HTTP exporters, always use reqwest::blocking::Client
53//! // Using async clients will cause deadlocks
54//! let exporter = SpanExporter::builder()
55//!     .with_http()
56//!     .with_http_client(reqwest::blocking::Client::new())
57//!     .with_protocol(Protocol::HttpBinary)
58//!     .build()
59//!     .expect("Failed to create exporter");
60//!
61//! let processor = LambdaSpanProcessor::builder()
62//!     .exporter(exporter)
63//!     .max_queue_size(4096)
64//!     .build();
65//! ```
66//!
67//! # Performance Considerations
68//!
69//! 1. **Memory Usage**:
70//!    - Fixed memory footprint based on queue size
71//!    - Each span typically uses 100-500 bytes
72//!    - Default 2048 spans ≈ 0.5-1MB memory
73//!
74//! 2. **Latency Impact**:
75//!    - Batch processing reduces network overhead
//!    - `force_flush` exports every buffered span in a single batch
77//!    - Force flush available for immediate export when needed
78//!
79//! 3. **Reliability**:
80//!    - Spans may be dropped if buffer fills
81//!    - Warning logs indicate dropped spans
82//!    - Consider increasing buffer size if spans are dropped
83//!
84//! # Best Practices
85//!
86//! 1. **Buffer Sizing**:
87//!    - Monitor dropped_spans metric
88//!    - Size based on max spans per invocation
89//!    - Consider function memory when sizing
90//!
91//! 2. **Batch Configuration**:
92//!    - Larger batches improve throughput but increase memory usage
93//!    - Smaller batches reduce memory but increase network overhead
94//!    - Default values work well for most use cases
95//!
96//! 3. **Error Handling**:
97//!    - Export errors are logged but don't fail function
98//!    - Monitor for export failures in logs
99//!    - Consider retry strategies in custom exporters
100
101use crate::constants::{defaults, env_vars};
102use crate::logger::Logger;
103use bon::bon;
104
/// Logger shared by all code in this module, constructed with the name `processor`.
static LOGGER: Logger = Logger::const_new("processor");
107
108use opentelemetry::Context;
109use opentelemetry_sdk::{
110    error::{OTelSdkError, OTelSdkResult},
111    trace::{Span, SpanProcessor},
112    trace::{SpanData, SpanExporter},
113    Resource,
114};
115use std::env;
116use std::sync::{
117    atomic::{AtomicBool, AtomicUsize, Ordering},
118    Arc, Mutex,
119};
120
121/// A fixed-size ring buffer for storing spans efficiently.
122///
123/// This implementation provides a memory-efficient way to store spans with
124/// predictable performance characteristics:
125///
126/// # Performance Characteristics
127///
128/// - Push Operation: O(1)
129/// - Memory Usage: Fixed based on capacity
130/// - Order: FIFO (First In, First Out)
131/// - Batch Operations: Efficient removal of all spans
132///
133/// # Implementation Details
134///
135/// The buffer uses a circular array with head and tail pointers:
136/// - `head`: Points to next write position
137/// - `tail`: Points to next read position
138/// - `size`: Current number of elements
139/// - `capacity`: Maximum number of elements
140///
141/// When the buffer is full, new spans are rejected rather than overwriting old ones.
142/// This ensures no data loss occurs silently.
143#[derive(Debug)]
144struct SpanRingBuffer {
145    buffer: Vec<Option<SpanData>>,
146    head: usize, // Where to write next
147    tail: usize, // Where to read next
148    size: usize, // Current number of elements
149    capacity: usize,
150}
151
152impl Default for SpanRingBuffer {
153    fn default() -> Self {
154        Self::new(2048) // Default capacity
155    }
156}
157
158impl SpanRingBuffer {
159    fn new(capacity: usize) -> Self {
160        let mut buffer = Vec::with_capacity(capacity);
161        buffer.extend((0..capacity).map(|_| None));
162        Self {
163            buffer,
164            head: 0,
165            tail: 0,
166            size: 0,
167            capacity,
168        }
169    }
170
171    fn push(&mut self, span: SpanData) -> bool {
172        if self.size == self.capacity {
173            return false;
174        }
175
176        self.buffer[self.head] = Some(span);
177        self.head = (self.head + 1) % self.capacity;
178        self.size += 1;
179        true
180    }
181
182    fn take_batch(&mut self, max_batch_size: usize) -> Vec<SpanData> {
183        let batch_size = self.size.min(max_batch_size);
184        let mut result = Vec::with_capacity(batch_size);
185
186        for _ in 0..batch_size {
187            if let Some(span) = self.buffer[self.tail].take() {
188                result.push(span);
189            }
190            self.tail = (self.tail + 1) % self.capacity;
191            self.size -= 1;
192        }
193
194        if self.size == 0 {
195            self.head = 0;
196            self.tail = 0;
197        }
198
199        result
200    }
201}
202
203/// A span processor optimized for AWS Lambda functions.
204///
205/// This processor efficiently manages spans in a Lambda environment:
206/// - Uses a fixed-size ring buffer to prevent memory growth
207/// - Supports synchronous and asynchronous export modes
208/// - Handles graceful shutdown for Lambda termination
209/// - Exports *all* buffered spans in a single batch when `force_flush` is called.
210///
211/// # Examples
212///
213/// ```
214/// use lambda_otel_lite::LambdaSpanProcessor;
215/// use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
216///
217/// let processor = LambdaSpanProcessor::builder()
218///     .exporter(OtlpStdoutSpanExporter::default())
219///     .build();
220/// ```
221///
222/// With custom configuration:
223///
224/// ```
225/// use lambda_otel_lite::LambdaSpanProcessor;
226/// use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
227///
228/// let processor = LambdaSpanProcessor::builder()
229///     .exporter(OtlpStdoutSpanExporter::default())
230///     .max_queue_size(1000)
231///     .build();
232/// ```
233#[derive(Debug)]
234pub struct LambdaSpanProcessor<E>
235where
236    E: SpanExporter + std::fmt::Debug,
237{
238    /// The exporter used to export spans
239    exporter: Mutex<E>,
240
241    /// Internal buffer for storing spans
242    spans: Mutex<SpanRingBuffer>,
243
244    /// Flag indicating whether the processor is shut down
245    is_shutdown: Arc<AtomicBool>,
246
247    /// Counter for dropped spans
248    dropped_count: AtomicUsize,
249}
250
251#[bon]
252impl<E> LambdaSpanProcessor<E>
253where
254    E: SpanExporter + std::fmt::Debug,
255{
256    /// Creates a new LambdaSpanProcessor with the given exporter and configuration
257    ///
258    /// # Environment Variable Precedence
259    ///
260    /// Configuration values follow this precedence order:
261    /// 1. Environment variables (highest precedence)
262    /// 2. Constructor parameters
263    /// 3. Default values (lowest precedence)
264    ///
265    /// The relevant environment variables are:
266    /// - `LAMBDA_SPAN_PROCESSOR_QUEUE_SIZE`: Controls the maximum queue size
267    #[builder]
268    pub fn new(exporter: E, max_queue_size: Option<usize>) -> Self {
269        // Get queue size with proper precedence (env var > param > default)
270        let max_queue_size = match env::var(env_vars::QUEUE_SIZE) {
271            Ok(value) => match value.parse::<usize>() {
272                Ok(size) => size,
273                Err(_) => {
274                    LOGGER.warn(format!(
275                        "Failed to parse {}: {}, using fallback",
276                        env_vars::QUEUE_SIZE,
277                        value
278                    ));
279                    max_queue_size.unwrap_or(defaults::QUEUE_SIZE)
280                }
281            },
282            Err(_) => max_queue_size.unwrap_or(defaults::QUEUE_SIZE),
283        };
284
285        Self {
286            exporter: Mutex::new(exporter),
287            spans: Mutex::new(SpanRingBuffer::new(max_queue_size)),
288            is_shutdown: Arc::new(AtomicBool::new(false)),
289            dropped_count: AtomicUsize::new(0),
290        }
291    }
292}
293
294impl<E> SpanProcessor for LambdaSpanProcessor<E>
295where
296    E: SpanExporter + std::fmt::Debug,
297{
298    fn on_start(&self, _span: &mut Span, _cx: &Context) {
299        // No-op, as we only process spans on end
300    }
301
302    fn on_end(&self, span: SpanData) {
303        if self.is_shutdown.load(Ordering::Relaxed) {
304            LOGGER.warn("LambdaSpanProcessor.on_end: processor is shut down, dropping span");
305            self.dropped_count.fetch_add(1, Ordering::Relaxed);
306            return;
307        }
308
309        // Skip unsampled spans
310        if !span.span_context.is_sampled() {
311            return;
312        }
313
314        // Try to add span to the buffer
315        if let Ok(mut spans) = self.spans.lock() {
316            if !spans.push(span) {
317                let prev = self.dropped_count.fetch_add(1, Ordering::Relaxed);
318                if prev == 0 || prev % 100 == 0 {
319                    LOGGER.warn(format!(
320                        "LambdaSpanProcessor.on_end: Dropping span because buffer is full (dropped_spans={})",
321                        prev + 1
322                    ));
323                }
324            }
325        } else {
326            LOGGER.warn("LambdaSpanProcessor.on_end: Failed to acquire spans lock in on_end");
327        }
328    }
329
330    fn force_flush(&self) -> OTelSdkResult {
331        LOGGER.debug("LambdaSpanProcessor.force_flush: flushing spans");
332
333        // Acquire lock on the span buffer
334        let spans_result = self.spans.lock();
335        let all_spans = match spans_result {
336            Ok(mut spans) => {
337                // Determine the number of spans currently in the buffer
338                let current_size = spans.size;
339                // Drain all spans from the buffer
340                spans.take_batch(current_size)
341            }
342            Err(_) => {
343                // If locking the span buffer fails, return an internal error
344                return Err(OTelSdkError::InternalFailure(
345                    "Failed to acquire spans lock in force_flush".to_string(),
346                ));
347            }
348        };
349        // Mutex guard for spans is dropped here, releasing the lock
350
351        // Acquire lock on the exporter
352        let exporter_result = self.exporter.lock();
353        match exporter_result {
354            Ok(exporter) => {
355                // Export all drained spans in a single batch
356                // This handles both empty (all_spans is empty Vec) and non-empty cases
357                let result = futures_executor::block_on(exporter.export(all_spans));
358
359                // Log error if export failed
360                if let Err(ref err) = result {
361                    LOGGER.debug(format!(
362                        "LambdaSpanProcessor.force_flush export error: {:?}",
363                        err
364                    ));
365                }
366
367                // Return the result of the export operation
368                result
369            }
370            Err(_) => {
371                // If locking the exporter fails, return an internal error
372                Err(OTelSdkError::InternalFailure(
373                    "Failed to acquire exporter lock in force_flush".to_string(),
374                ))
375            }
376        }
377        // Mutex guard for exporter is dropped here, releasing the lock
378    }
379
380    fn shutdown(&self) -> OTelSdkResult {
381        self.is_shutdown.store(true, Ordering::Relaxed);
382        // Flush any remaining spans
383        self.force_flush()?;
384        if let Ok(mut exporter) = self.exporter.lock() {
385            exporter.shutdown()
386        } else {
387            Err(OTelSdkError::InternalFailure(
388                "Failed to acquire exporter lock in shutdown".to_string(),
389            ))
390        }
391    }
392
393    fn set_resource(&mut self, resource: &Resource) {
394        if let Ok(mut exporter) = self.exporter.lock() {
395            exporter.set_resource(resource);
396        }
397    }
398}
399
#[cfg(test)]
mod tests {
    use super::*;
    use crate::logger::Logger;
    use opentelemetry::{
        trace::{SpanContext, SpanId, TraceFlags, TraceId, TraceState},
        InstrumentationScope,
    };
    use opentelemetry_sdk::{
        trace::SpanExporter,
        trace::{SpanEvents, SpanLinks},
    };
    use serial_test::serial;
    use std::{borrow::Cow, sync::Arc};
    use tokio::sync::Mutex;

    /// Creates a logger named `test` for use inside a test body.
    fn setup_test_logger() -> Logger {
        Logger::new("test")
    }

    /// Exporter double that records every exported span in a shared vector.
    #[derive(Debug)]
    struct MockExporter {
        spans: Arc<Mutex<Vec<SpanData>>>,
    }

    impl MockExporter {
        fn new() -> Self {
            Self {
                spans: Arc::new(Mutex::new(Vec::new())),
            }
        }
    }

    impl SpanExporter for MockExporter {
        fn export(
            &self,
            batch: Vec<SpanData>,
        ) -> impl std::future::Future<Output = opentelemetry_sdk::error::OTelSdkResult> + Send
        {
            let spans = self.spans.clone();
            Box::pin(async move {
                let mut spans = spans.lock().await;
                spans.extend(batch);
                Ok(())
            })
        }

        fn shutdown(&mut self) -> OTelSdkResult {
            Ok(())
        }
    }

    /// Builds a sampled span with the given name and fixed trace/span ids.
    fn create_test_span(name: &str) -> SpanData {
        let flags = TraceFlags::default().with_sampled(true);

        SpanData {
            span_context: SpanContext::new(
                TraceId::from_hex("01000000000000000000000000000000").unwrap(),
                SpanId::from_hex("0100000000000001").unwrap(),
                flags,
                false,
                TraceState::default(),
            ),
            parent_span_id: SpanId::INVALID,
            span_kind: opentelemetry::trace::SpanKind::Internal,
            name: Cow::Owned(name.to_string()),
            start_time: std::time::SystemTime::now(),
            end_time: std::time::SystemTime::now(),
            attributes: Vec::new(),
            dropped_attributes_count: 0,
            events: SpanEvents::default(),
            links: SpanLinks::default(),
            status: opentelemetry::trace::Status::default(),
            instrumentation_scope: InstrumentationScope::builder("test").build(),
        }
    }

    /// Removes the environment variables these tests may set or depend on.
    fn cleanup_env() {
        env::remove_var(env_vars::QUEUE_SIZE);
        env::remove_var(env_vars::PROCESSOR_MODE);
        env::remove_var(env_vars::COMPRESSION_LEVEL);
        env::remove_var(env_vars::SERVICE_NAME);
    }

    #[test]
    #[serial]
    fn test_ring_buffer_basic_operations() {
        let mut buffer = SpanRingBuffer::new(2);

        // Test empty buffer
        assert!(buffer.size == 0);
        assert_eq!(buffer.take_batch(2), vec![]);

        // Test adding spans
        buffer.push(create_test_span("span1"));
        buffer.push(create_test_span("span2"));

        assert!(buffer.size != 0);

        // Test taking spans
        let spans = buffer.take_batch(2);
        assert_eq!(spans.len(), 2);
        assert!(buffer.size == 0);
    }

    #[test]
    #[serial]
    fn test_ring_buffer_overflow() {
        let mut buffer = SpanRingBuffer::new(2);

        // Fill buffer
        buffer.push(create_test_span("span1"));
        buffer.push(create_test_span("span2"));

        // A third span must be rejected; the stored spans are never overwritten
        let success = buffer.push(create_test_span("span3"));
        assert!(!success); // Should fail since buffer is full

        // The original two spans come back, oldest first
        let spans = buffer.take_batch(2);
        assert_eq!(spans.len(), 2);
        assert_eq!(spans[0].name, "span1");
        assert_eq!(spans[1].name, "span2");
    }

    #[test]
    #[serial]
    fn test_ring_buffer_batch_operations() {
        let mut buffer = SpanRingBuffer::new(5);

        // Add 5 spans
        for i in 0..5 {
            buffer.push(create_test_span(&format!("span{}", i)));
        }

        assert_eq!(buffer.take_batch(2).len(), 2);
        assert_eq!(buffer.take_batch(2).len(), 2);
        assert_eq!(buffer.take_batch(2).len(), 1);
        assert!(buffer.size == 0);
    }

    #[test]
    #[serial]
    fn test_ring_buffer_wraparound_preserves_fifo_order() {
        let mut buffer = SpanRingBuffer::new(3);

        assert!(buffer.push(create_test_span("a")));
        assert!(buffer.push(create_test_span("b")));
        assert!(buffer.push(create_test_span("c")));

        // Partially drain so the next writes wrap around the end of the array
        let first = buffer.take_batch(2);
        assert_eq!(first.len(), 2);
        assert_eq!(first[0].name, "a");
        assert_eq!(first[1].name, "b");

        assert!(buffer.push(create_test_span("d")));
        assert!(buffer.push(create_test_span("e")));
        assert!(!buffer.push(create_test_span("f"))); // Full again

        // Asking for more than is stored returns only what is stored, in order
        let rest = buffer.take_batch(10);
        assert_eq!(rest.len(), 3);
        assert_eq!(rest[0].name, "c");
        assert_eq!(rest[1].name, "d");
        assert_eq!(rest[2].name, "e");
        assert_eq!(buffer.size, 0);
    }

    #[tokio::test]
    #[serial]
    async fn test_processor_sync_mode() {
        let _logger = setup_test_logger();
        let mock_exporter = MockExporter::new();
        let spans_exported = mock_exporter.spans.clone();

        let processor = LambdaSpanProcessor::builder()
            .exporter(mock_exporter)
            .max_queue_size(10)
            .build();

        // Test span processing
        processor.on_end(create_test_span("test_span"));

        // Force flush to ensure export
        processor.force_flush().unwrap();

        // Verify span was exported
        let exported = spans_exported.lock().await;
        assert_eq!(exported.len(), 1);
        assert_eq!(exported[0].name, "test_span");
    }

    #[tokio::test]
    #[serial]
    async fn test_shutdown_exports_remaining_spans() {
        let _logger = setup_test_logger();
        let mock_exporter = MockExporter::new();
        let spans_exported = mock_exporter.spans.clone();

        let processor = LambdaSpanProcessor::builder()
            .exporter(mock_exporter)
            .max_queue_size(10)
            .build();

        // Add some spans
        processor.on_end(create_test_span("span1"));
        processor.on_end(create_test_span("span2"));

        // Shutdown should export all spans
        processor.shutdown().unwrap();

        // Verify all spans were exported
        let exported = spans_exported.lock().await;
        assert_eq!(exported.len(), 2);

        // Verify new spans are dropped (and counted) after shutdown
        processor.on_end(create_test_span("span3"));
        assert_eq!(exported.len(), 2); // No new spans after shutdown
        assert_eq!(processor.dropped_count.load(Ordering::Relaxed), 1);
    }

    #[test]
    #[serial]
    fn test_on_end_counts_spans_dropped_when_buffer_is_full() {
        // Make sure the env var cannot override the tiny queue size used here
        cleanup_env();

        let processor = LambdaSpanProcessor::builder()
            .exporter(MockExporter::new())
            .max_queue_size(1)
            .build();

        processor.on_end(create_test_span("kept"));
        processor.on_end(create_test_span("dropped"));

        assert_eq!(processor.spans.lock().unwrap().size, 1);
        assert_eq!(processor.dropped_count.load(Ordering::Relaxed), 1);
    }

    #[tokio::test]
    #[serial]
    async fn test_concurrent_span_processing() {
        let _logger = setup_test_logger();
        let mock_exporter = MockExporter::new();
        let spans_exported = mock_exporter.spans.clone();

        let processor = Arc::new(
            LambdaSpanProcessor::builder()
                .exporter(mock_exporter)
                .max_queue_size(100)
                .build(),
        );

        let mut handles = Vec::new();

        // Spawn 10 tasks, each adding 10 spans
        for i in 0..10 {
            let processor = processor.clone();
            handles.push(tokio::spawn(async move {
                for j in 0..10 {
                    processor.on_end(create_test_span(&format!("span_{}_{}", i, j)));
                }
            }));
        }

        // Wait for all tasks to complete
        for handle in handles {
            handle.await.unwrap();
        }

        // Force flush and verify all spans were processed
        processor.force_flush().unwrap();

        let exported = spans_exported.lock().await;
        assert_eq!(exported.len(), 100);
        assert_eq!(processor.dropped_count.load(Ordering::Relaxed), 0);
    }

    #[test]
    #[serial]
    fn test_builder_default_values() {
        cleanup_env();

        let mock_exporter = MockExporter::new();

        let processor = LambdaSpanProcessor::builder()
            .exporter(mock_exporter)
            .build();

        // Check default values
        assert_eq!(processor.spans.lock().unwrap().capacity, 2048); // Default queue size
    }

    #[test]
    #[serial]
    fn test_builder_env_var_values() {
        cleanup_env();

        let mock_exporter = MockExporter::new();

        // Set custom values via env vars
        env::set_var(env_vars::QUEUE_SIZE, "1000");

        let processor = LambdaSpanProcessor::builder()
            .exporter(mock_exporter)
            .build();

        // Check that env var values were used
        assert_eq!(processor.spans.lock().unwrap().capacity, 1000);

        cleanup_env();
    }

    #[test]
    #[serial]
    fn test_builder_env_var_precedence() {
        cleanup_env();

        let mock_exporter = MockExporter::new();

        // Set custom values via env vars
        env::set_var(env_vars::QUEUE_SIZE, "1000");

        // Create with explicit values (should be overridden by env vars)
        let processor = LambdaSpanProcessor::builder()
            .exporter(mock_exporter)
            .max_queue_size(500)
            .build();

        // Check that env var values took precedence
        assert_eq!(processor.spans.lock().unwrap().capacity, 1000);

        cleanup_env();
    }

    #[test]
    #[serial]
    fn test_invalid_env_vars() {
        cleanup_env();

        let mock_exporter = MockExporter::new();

        // Set invalid values via env vars
        env::set_var(env_vars::QUEUE_SIZE, "invalid");

        // Create with explicit values (should be used as fallbacks)
        let processor = LambdaSpanProcessor::builder()
            .exporter(mock_exporter)
            .max_queue_size(500)
            .build();

        // Check that fallback values were used
        assert_eq!(processor.spans.lock().unwrap().capacity, 500);

        cleanup_env();
    }
}