lambda_otel_lite/
processor.rs

1//! Span processor implementation optimized for AWS Lambda functions.
2//!
3//! This module provides a Lambda-optimized span processor that efficiently manages OpenTelemetry spans
4//! in a serverless environment. It uses a ring buffer to store spans in memory and provides efficient
5//! batch processing capabilities.
6//!
7//! # Architecture
8//!
9//! The processor is designed specifically for the Lambda execution environment:
10//!
11//! 1. **Ring Buffer Storage**:
12//!    - Fixed-size circular buffer prevents memory growth
13//!    - O(1) push operations with no memory reallocation
14//!    - FIFO ordering ensures spans are processed in order
15//!    - Efficient batch removal for export
16//!    - When full, new spans are dropped (with warning logs)
17//!
18//! 2. **Thread Safety**:
19//!    - All operations are thread-safe
20//!    - Uses Mutex for span buffer access
21//!    - Atomic operations for state management
22//!    - Safe for concurrent span submission
23//!
24//! # Configuration
25//!
26//! The processor can be configured through environment variables:
27//!
28//! - `LAMBDA_SPAN_PROCESSOR_QUEUE_SIZE`: Controls buffer size
29//!   - Defaults to 2048 spans
30//!   - Should be tuned based on span volume
31//!
32//! # Usage Examples
33//!
34//! Basic setup with default configuration:
35//!
36//! ```no_run
37//! use lambda_otel_lite::LambdaSpanProcessor;
38//! use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
39//!
40//! let processor = LambdaSpanProcessor::builder()
41//!     .exporter(OtlpStdoutSpanExporter::default())
42//!     .build();
43//! ```
44//!
45//! Using with an OTLP HTTP exporter:
46//!
47//! ```no_run
48//! use lambda_otel_lite::LambdaSpanProcessor;
49//! use opentelemetry_otlp::{SpanExporter, Protocol};
50//! use opentelemetry_otlp::{WithExportConfig, WithHttpConfig};
51//!
52//! // Important: When using HTTP exporters, always use reqwest::blocking::Client
53//! // Using async clients will cause deadlocks
54//! let exporter = SpanExporter::builder()
55//!     .with_http()
56//!     .with_http_client(reqwest::blocking::Client::new())
57//!     .with_protocol(Protocol::HttpBinary)
58//!     .build()
59//!     .expect("Failed to create exporter");
60//!
61//! let processor = LambdaSpanProcessor::builder()
62//!     .exporter(exporter)
63//!     .max_queue_size(4096)
64//!     .build();
65//! ```
66//!
67//! # Performance Considerations
68//!
69//! 1. **Memory Usage**:
70//!    - Fixed memory footprint based on queue size
71//!    - Each span typically uses 100-500 bytes
//!    - Default 2048 spans ≈ 0.2-1 MB memory
73//!
74//! 2. **Latency Impact**:
75//!    - Batch processing reduces network overhead
//!    - Every flush exports all buffered spans as one batch, so the queue size is the
//!      upper bound on batch size
77//!    - Force flush available for immediate export when needed
78//!
79//! 3. **Reliability**:
80//!    - Spans may be dropped if buffer fills
81//!    - Warning logs indicate dropped spans
82//!    - Consider increasing buffer size if spans are dropped
83//!
84//! # Best Practices
85//!
86//! 1. **Buffer Sizing**:
//!    - Watch the warning logs for the `dropped_spans=` counter
88//!    - Size based on max spans per invocation
89//!    - Consider function memory when sizing
90//!
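//! 2. **Batch Configuration**:
//!    - This processor has no separate batch-size setting: each flush exports the whole
//!      buffer, so the queue size bounds the batch size
//!    - A larger queue allows larger batches and improves throughput but increases memory usage
//!    - A smaller queue reduces memory but increases the risk of dropped spans
//!    - Default values work well for most use cases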
95//!
96//! 3. **Error Handling**:
97//!    - Export errors are logged but don't fail function
98//!    - Monitor for export failures in logs
99//!    - Consider retry strategies in custom exporters
100
101use crate::constants::{defaults, env_vars};
102use crate::logger::Logger;
103use bon::bon;
104
105/// Module-specific logger
106static LOGGER: Logger = Logger::const_new("processor");
107
108use opentelemetry::Context;
109use opentelemetry_sdk::{
110    error::{OTelSdkError, OTelSdkResult},
111    trace::{Span, SpanProcessor},
112    trace::{SpanData, SpanExporter},
113    Resource,
114};
115use std::env;
116use std::sync::{
117    atomic::{AtomicBool, AtomicUsize, Ordering},
118    Arc, Mutex,
119};
120
121/// A fixed-size ring buffer for storing spans efficiently.
122///
123/// This implementation provides a memory-efficient way to store spans with
124/// predictable performance characteristics:
125///
126/// # Performance Characteristics
127///
128/// - Push Operation: O(1)
129/// - Memory Usage: Fixed based on capacity
130/// - Order: FIFO (First In, First Out)
131/// - Batch Operations: Efficient removal of all spans
132///
133/// # Implementation Details
134///
135/// The buffer uses a circular array with head and tail pointers:
136/// - `head`: Points to next write position
137/// - `tail`: Points to next read position
138/// - `size`: Current number of elements
139/// - `capacity`: Maximum number of elements
140///
141/// When the buffer is full, new spans are rejected rather than overwriting old ones.
142/// This ensures no data loss occurs silently.
143#[derive(Debug)]
144struct SpanRingBuffer {
145    buffer: Vec<Option<SpanData>>,
146    head: usize, // Where to write next
147    tail: usize, // Where to read next
148    size: usize, // Current number of elements
149    capacity: usize,
150}
151
152impl Default for SpanRingBuffer {
153    fn default() -> Self {
154        Self::new(2048) // Default capacity
155    }
156}
157
158impl SpanRingBuffer {
159    fn new(capacity: usize) -> Self {
160        let mut buffer = Vec::with_capacity(capacity);
161        buffer.extend((0..capacity).map(|_| None));
162        Self {
163            buffer,
164            head: 0,
165            tail: 0,
166            size: 0,
167            capacity,
168        }
169    }
170
171    fn push(&mut self, span: SpanData) -> bool {
172        if self.size == self.capacity {
173            return false;
174        }
175
176        self.buffer[self.head] = Some(span);
177        self.head = (self.head + 1) % self.capacity;
178        self.size += 1;
179        true
180    }
181
182    fn take_batch(&mut self, max_batch_size: usize) -> Vec<SpanData> {
183        let batch_size = self.size.min(max_batch_size);
184        let mut result = Vec::with_capacity(batch_size);
185
186        for _ in 0..batch_size {
187            if let Some(span) = self.buffer[self.tail].take() {
188                result.push(span);
189            }
190            self.tail = (self.tail + 1) % self.capacity;
191            self.size -= 1;
192        }
193
194        if self.size == 0 {
195            self.head = 0;
196            self.tail = 0;
197        }
198
199        result
200    }
201}
202
203/// A span processor optimized for AWS Lambda functions.
204///
205/// This processor efficiently manages spans in a Lambda environment:
206/// - Uses a fixed-size ring buffer to prevent memory growth
207/// - Supports synchronous and asynchronous export modes
208/// - Handles graceful shutdown for Lambda termination
209/// - Exports *all* buffered spans in a single batch when `force_flush` is called.
210///
211/// # Examples
212///
213/// ```
214/// use lambda_otel_lite::LambdaSpanProcessor;
215/// use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
216///
217/// let processor = LambdaSpanProcessor::builder()
218///     .exporter(OtlpStdoutSpanExporter::default())
219///     .build();
220/// ```
221///
222/// With custom configuration:
223///
224/// ```
225/// use lambda_otel_lite::LambdaSpanProcessor;
226/// use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
227///
228/// let processor = LambdaSpanProcessor::builder()
229///     .exporter(OtlpStdoutSpanExporter::default())
230///     .max_queue_size(1000)
231///     .build();
232/// ```
233#[derive(Debug)]
234pub struct LambdaSpanProcessor<E>
235where
236    E: SpanExporter + std::fmt::Debug,
237{
238    /// The exporter used to export spans
239    exporter: Mutex<E>,
240
241    /// Internal buffer for storing spans
242    spans: Mutex<SpanRingBuffer>,
243
244    /// Flag indicating whether the processor is shut down
245    is_shutdown: Arc<AtomicBool>,
246
247    /// Counter for dropped spans
248    dropped_count: AtomicUsize,
249}
250
251#[bon]
252impl<E> LambdaSpanProcessor<E>
253where
254    E: SpanExporter + std::fmt::Debug,
255{
256    /// Creates a new LambdaSpanProcessor with the given exporter and configuration
257    ///
258    /// # Environment Variable Precedence
259    ///
260    /// Configuration values follow this precedence order:
261    /// 1. Environment variables (highest precedence)
262    /// 2. Constructor parameters
263    /// 3. Default values (lowest precedence)
264    ///
265    /// The relevant environment variables are:
266    /// - `LAMBDA_SPAN_PROCESSOR_QUEUE_SIZE`: Controls the maximum queue size
267    #[builder]
268    pub fn new(exporter: E, max_queue_size: Option<usize>) -> Self {
269        // Get queue size with proper precedence (env var > param > default)
270        let max_queue_size = match env::var(env_vars::QUEUE_SIZE) {
271            Ok(value) => match value.parse::<usize>() {
272                Ok(size) => size,
273                Err(_) => {
274                    LOGGER.warn(format!(
275                        "Failed to parse {}: {}, using fallback",
276                        env_vars::QUEUE_SIZE,
277                        value
278                    ));
279                    max_queue_size.unwrap_or(defaults::QUEUE_SIZE)
280                }
281            },
282            Err(_) => max_queue_size.unwrap_or(defaults::QUEUE_SIZE),
283        };
284
285        Self {
286            exporter: Mutex::new(exporter),
287            spans: Mutex::new(SpanRingBuffer::new(max_queue_size)),
288            is_shutdown: Arc::new(AtomicBool::new(false)),
289            dropped_count: AtomicUsize::new(0),
290        }
291    }
292}
293
294impl<E> SpanProcessor for LambdaSpanProcessor<E>
295where
296    E: SpanExporter + std::fmt::Debug,
297{
298    fn on_start(&self, _span: &mut Span, _cx: &Context) {
299        // No-op, as we only process spans on end
300    }
301
302    fn on_end(&self, span: SpanData) {
303        if self.is_shutdown.load(Ordering::Relaxed) {
304            LOGGER.warn("LambdaSpanProcessor.on_end: processor is shut down, dropping span");
305            self.dropped_count.fetch_add(1, Ordering::Relaxed);
306            return;
307        }
308
309        // Skip unsampled spans
310        if !span.span_context.is_sampled() {
311            return;
312        }
313
314        // Try to add span to the buffer
315        if let Ok(mut spans) = self.spans.lock() {
316            if !spans.push(span) {
317                let prev = self.dropped_count.fetch_add(1, Ordering::Relaxed);
318                if prev == 0 || prev % 100 == 0 {
319                    LOGGER.warn(format!(
320                        "LambdaSpanProcessor.on_end: Dropping span because buffer is full (dropped_spans={})",
321                        prev + 1
322                    ));
323                }
324            }
325        } else {
326            LOGGER.warn("LambdaSpanProcessor.on_end: Failed to acquire spans lock in on_end");
327        }
328    }
329
330    fn force_flush(&self) -> OTelSdkResult {
331        LOGGER.debug("LambdaSpanProcessor.force_flush: flushing spans");
332
333        // Acquire lock on the span buffer
334        let spans_result = self.spans.lock();
335        let all_spans = match spans_result {
336            Ok(mut spans) => {
337                // Determine the number of spans currently in the buffer
338                let current_size = spans.size;
339                // Drain all spans from the buffer
340                spans.take_batch(current_size)
341            }
342            Err(_) => {
343                // If locking the span buffer fails, return an internal error
344                return Err(OTelSdkError::InternalFailure(
345                    "Failed to acquire spans lock in force_flush".to_string(),
346                ));
347            }
348        };
349        // Mutex guard for spans is dropped here, releasing the lock
350
351        // Acquire lock on the exporter
352        let exporter_result = self.exporter.lock();
353        match exporter_result {
354            Ok(exporter) => {
355                // Export all drained spans in a single batch
356                // This handles both empty (all_spans is empty Vec) and non-empty cases
357                let result = futures_executor::block_on(exporter.export(all_spans));
358
359                // Log error if export failed
360                if let Err(ref err) = result {
361                    LOGGER.debug(format!(
362                        "LambdaSpanProcessor.force_flush export error: {err:?}"
363                    ));
364                }
365
366                // Return the result of the export operation
367                result
368            }
369            Err(_) => {
370                // If locking the exporter fails, return an internal error
371                Err(OTelSdkError::InternalFailure(
372                    "Failed to acquire exporter lock in force_flush".to_string(),
373                ))
374            }
375        }
376        // Mutex guard for exporter is dropped here, releasing the lock
377    }
378
379    fn shutdown(&self) -> OTelSdkResult {
380        self.is_shutdown.store(true, Ordering::Relaxed);
381        // Flush any remaining spans
382        self.force_flush()?;
383        if let Ok(mut exporter) = self.exporter.lock() {
384            exporter.shutdown()
385        } else {
386            Err(OTelSdkError::InternalFailure(
387                "Failed to acquire exporter lock in shutdown".to_string(),
388            ))
389        }
390    }
391
392    fn shutdown_with_timeout(&self, timeout: std::time::Duration) -> OTelSdkResult {
393        self.is_shutdown.store(true, Ordering::Relaxed);
394        // Flush any remaining spans
395        self.force_flush()?;
396        if let Ok(mut exporter) = self.exporter.lock() {
397            exporter.shutdown_with_timeout(timeout)
398        } else {
399            Err(OTelSdkError::InternalFailure(
400                "Failed to acquire exporter lock in shutdown_with_timeout".to_string(),
401            ))
402        }
403    }
404
405    fn set_resource(&mut self, resource: &Resource) {
406        if let Ok(mut exporter) = self.exporter.lock() {
407            exporter.set_resource(resource);
408        }
409    }
410}
411
#[cfg(test)]
mod tests {
    use super::*;
    use crate::logger::Logger;
    use opentelemetry::{
        trace::{SpanContext, SpanId, TraceFlags, TraceId, TraceState},
        InstrumentationScope,
    };
    use opentelemetry_sdk::{
        trace::SpanExporter,
        trace::{SpanEvents, SpanLinks},
    };
    use serial_test::serial;
    use std::{borrow::Cow, sync::Arc};
    use tokio::sync::Mutex;

    fn setup_test_logger() -> Logger {
        Logger::new("test")
    }

    /// Exporter double that records every span handed to `export`.
    #[derive(Debug)]
    struct MockExporter {
        spans: Arc<Mutex<Vec<SpanData>>>,
    }

    impl MockExporter {
        fn new() -> Self {
            Self {
                spans: Arc::new(Mutex::new(Vec::new())),
            }
        }
    }

    impl SpanExporter for MockExporter {
        fn export(
            &self,
            batch: Vec<SpanData>,
        ) -> impl std::future::Future<Output = opentelemetry_sdk::error::OTelSdkResult> + Send
        {
            let spans = self.spans.clone();
            Box::pin(async move {
                let mut spans = spans.lock().await;
                spans.extend(batch);
                Ok(())
            })
        }

        fn shutdown(&mut self) -> OTelSdkResult {
            Ok(())
        }
    }

    /// Builds a sampled span with the given name and otherwise fixed contents.
    fn create_test_span(name: &str) -> SpanData {
        let flags = TraceFlags::default().with_sampled(true);

        SpanData {
            span_context: SpanContext::new(
                TraceId::from_hex("01000000000000000000000000000000").unwrap(),
                SpanId::from_hex("0100000000000001").unwrap(),
                flags,
                false,
                TraceState::default(),
            ),
            parent_span_id: SpanId::INVALID,
            span_kind: opentelemetry::trace::SpanKind::Internal,
            name: Cow::Owned(name.to_string()),
            start_time: std::time::SystemTime::now(),
            end_time: std::time::SystemTime::now(),
            attributes: Vec::new(),
            dropped_attributes_count: 0,
            events: SpanEvents::default(),
            links: SpanLinks::default(),
            status: opentelemetry::trace::Status::default(),
            instrumentation_scope: InstrumentationScope::builder("test").build(),
        }
    }

    /// Removes every environment variable these tests may have set or depend on.
    fn cleanup_env() {
        env::remove_var(env_vars::QUEUE_SIZE);
        env::remove_var(env_vars::PROCESSOR_MODE);
        env::remove_var(env_vars::COMPRESSION_LEVEL);
        env::remove_var(env_vars::SERVICE_NAME);
    }

    /// Builds a processor around a fresh mock exporter and returns it together
    /// with a handle to the spans the exporter has received.
    fn processor_with_mock(
        queue_size: usize,
    ) -> (LambdaSpanProcessor<MockExporter>, Arc<Mutex<Vec<SpanData>>>) {
        let mock_exporter = MockExporter::new();
        let spans_exported = mock_exporter.spans.clone();
        let processor = LambdaSpanProcessor::builder()
            .exporter(mock_exporter)
            .max_queue_size(queue_size)
            .build();
        (processor, spans_exported)
    }

    /// Reads the capacity of the processor's span buffer.
    fn queue_capacity(processor: &LambdaSpanProcessor<MockExporter>) -> usize {
        processor.spans.lock().unwrap().capacity
    }

    #[test]
    #[serial]
    fn test_ring_buffer_basic_operations() {
        let mut buffer = SpanRingBuffer::new(2);

        // Test empty buffer
        assert_eq!(buffer.size, 0);
        assert!(buffer.take_batch(2).is_empty());

        // Test adding spans
        assert!(buffer.push(create_test_span("span1")));
        assert!(buffer.push(create_test_span("span2")));
        assert_eq!(buffer.size, 2);

        // Test taking spans
        let spans = buffer.take_batch(2);
        assert_eq!(spans.len(), 2);
        assert_eq!(buffer.size, 0);
    }

    #[test]
    #[serial]
    fn test_ring_buffer_overflow() {
        let mut buffer = SpanRingBuffer::new(2);

        // Fill buffer
        buffer.push(create_test_span("span1"));
        buffer.push(create_test_span("span2"));

        // A push into a full buffer is rejected; the stored spans are untouched
        let success = buffer.push(create_test_span("span3"));
        assert!(!success);

        // The original two spans come back, oldest first
        let spans = buffer.take_batch(2);
        assert_eq!(spans.len(), 2);
        assert_eq!(spans[0].name, "span1");
        assert_eq!(spans[1].name, "span2");
    }

    #[test]
    #[serial]
    fn test_ring_buffer_wraparound_preserves_fifo_order() {
        let mut buffer = SpanRingBuffer::new(3);

        for name in ["a", "b", "c"] {
            assert!(buffer.push(create_test_span(name)));
        }

        // Free the oldest slot so the next push wraps around into it
        let first = buffer.take_batch(1);
        assert_eq!(first.len(), 1);
        assert_eq!(first[0].name, "a");

        assert!(buffer.push(create_test_span("d")));
        assert!(!buffer.push(create_test_span("e"))); // Full again

        // Reading also wraps and still yields insertion order
        let rest = buffer.take_batch(10);
        assert_eq!(rest.len(), 3);
        assert_eq!(rest[0].name, "b");
        assert_eq!(rest[1].name, "c");
        assert_eq!(rest[2].name, "d");
        assert_eq!(buffer.size, 0);
    }

    #[test]
    #[serial]
    fn test_ring_buffer_batch_operations() {
        let mut buffer = SpanRingBuffer::new(5);

        // Add 5 spans
        for i in 0..5 {
            buffer.push(create_test_span(&format!("span{i}")));
        }

        assert_eq!(buffer.take_batch(2).len(), 2);
        assert_eq!(buffer.take_batch(2).len(), 2);
        assert_eq!(buffer.take_batch(2).len(), 1);
        assert_eq!(buffer.size, 0);
    }

    #[tokio::test]
    #[serial]
    async fn test_processor_sync_mode() {
        let _logger = setup_test_logger();
        let (processor, spans_exported) = processor_with_mock(10);

        // Test span processing
        processor.on_end(create_test_span("test_span"));

        // Force flush to ensure export
        processor.force_flush().unwrap();

        // Verify span was exported
        let exported = spans_exported.lock().await;
        assert_eq!(exported.len(), 1);
        assert_eq!(exported[0].name, "test_span");
    }

    #[tokio::test]
    #[serial]
    async fn test_shutdown_exports_remaining_spans() {
        let _logger = setup_test_logger();
        let (processor, spans_exported) = processor_with_mock(10);

        // Add some spans
        processor.on_end(create_test_span("span1"));
        processor.on_end(create_test_span("span2"));

        // Shutdown should export all spans
        processor.shutdown().unwrap();

        // Verify all spans were exported. The guard is scoped so it is released
        // before the flush below, which needs the same mutex.
        {
            let exported = spans_exported.lock().await;
            assert_eq!(exported.len(), 2);
        }

        // A span ending after shutdown is dropped and counted
        processor.on_end(create_test_span("span3"));
        assert_eq!(processor.dropped_count.load(Ordering::Relaxed), 1);

        // Even an explicit flush must not deliver the late span
        processor.force_flush().unwrap();
        assert_eq!(spans_exported.lock().await.len(), 2);
    }

    #[tokio::test]
    #[serial]
    async fn test_concurrent_span_processing() {
        let _logger = setup_test_logger();
        let (processor, spans_exported) = processor_with_mock(100);
        let processor = Arc::new(processor);

        // Spawn 10 tasks, each adding 10 spans
        let handles: Vec<_> = (0..10)
            .map(|i| {
                let processor = processor.clone();
                tokio::spawn(async move {
                    for j in 0..10 {
                        processor.on_end(create_test_span(&format!("span_{i}_{j}")));
                    }
                })
            })
            .collect();

        // Wait for all tasks to complete
        for handle in handles {
            handle.await.unwrap();
        }

        // Force flush and verify all spans were processed
        processor.force_flush().unwrap();

        let exported = spans_exported.lock().await;
        assert_eq!(exported.len(), 100);
        assert_eq!(processor.dropped_count.load(Ordering::Relaxed), 0);
    }

    #[test]
    #[serial]
    fn test_builder_default_values() {
        cleanup_env();

        let processor = LambdaSpanProcessor::builder()
            .exporter(MockExporter::new())
            .build();

        // Check default values
        assert_eq!(queue_capacity(&processor), 2048); // Default queue size
    }

    #[test]
    #[serial]
    fn test_builder_env_var_values() {
        cleanup_env();

        // Set custom values via env vars
        env::set_var(env_vars::QUEUE_SIZE, "1000");

        let processor = LambdaSpanProcessor::builder()
            .exporter(MockExporter::new())
            .build();

        // Check that env var values were used
        assert_eq!(queue_capacity(&processor), 1000);

        cleanup_env();
    }

    #[test]
    #[serial]
    fn test_builder_env_var_precedence() {
        cleanup_env();

        // Set custom values via env vars
        env::set_var(env_vars::QUEUE_SIZE, "1000");

        // Create with explicit values (should be overridden by env vars)
        let (processor, _spans_exported) = processor_with_mock(500);

        // Check that env var values took precedence
        assert_eq!(queue_capacity(&processor), 1000);

        cleanup_env();
    }

    #[test]
    #[serial]
    fn test_invalid_env_vars() {
        cleanup_env();

        // Set invalid values via env vars
        env::set_var(env_vars::QUEUE_SIZE, "invalid");

        // Create with explicit values (should be used as fallbacks)
        let (processor, _spans_exported) = processor_with_mock(500);

        // Check that fallback values were used
        assert_eq!(queue_capacity(&processor), 500);

        cleanup_env();
    }
}
721        cleanup_env();
722    }
723}