Skip to main content

lambda_otel_lite/
processor.rs

1//! Span processor implementation optimized for AWS Lambda functions.
2//!
3//! This module provides a Lambda-optimized span processor that efficiently manages OpenTelemetry spans
4//! in a serverless environment. It uses a ring buffer to store spans in memory and provides efficient
5//! batch processing capabilities.
6//!
7//! # Architecture
8//!
9//! The processor is designed specifically for the Lambda execution environment:
10//!
11//! 1. **Ring Buffer Storage**:
12//!    - Fixed-size circular buffer prevents memory growth
13//!    - O(1) push operations with no memory reallocation
14//!    - FIFO ordering ensures spans are processed in order
15//!    - Efficient batch removal for export
16//!    - When full, new spans are dropped (with warning logs)
17//!
18//! 2. **Thread Safety**:
19//!    - All operations are thread-safe
20//!    - Uses Mutex for span buffer access
21//!    - Atomic operations for state management
22//!    - Safe for concurrent span submission
23//!
24//! # Configuration
25//!
26//! The processor can be configured through environment variables:
27//!
28//! - `LAMBDA_SPAN_PROCESSOR_QUEUE_SIZE`: Controls buffer size
29//!   - Defaults to 2048 spans
30//!   - Should be tuned based on span volume
31//!
32//! # Usage Examples
33//!
34//! Basic setup with default configuration:
35//!
36//! ```no_run
37//! use lambda_otel_lite::LambdaSpanProcessor;
38//! use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
39//!
40//! let processor = LambdaSpanProcessor::builder()
41//!     .exporter(OtlpStdoutSpanExporter::default())
42//!     .build();
43//! ```
44//!
45//! Using with an OTLP HTTP exporter:
46//!
47//! ```no_run
48//! use lambda_otel_lite::LambdaSpanProcessor;
49//! use opentelemetry_otlp::{SpanExporter, Protocol};
50//! use opentelemetry_otlp::{WithExportConfig, WithHttpConfig};
51//!
52//! // Important: When using HTTP exporters, always use reqwest::blocking::Client
53//! // Using async clients will cause deadlocks
54//! let exporter = SpanExporter::builder()
55//!     .with_http()
56//!     .with_http_client(reqwest::blocking::Client::new())
57//!     .with_protocol(Protocol::HttpBinary)
58//!     .build()
59//!     .expect("Failed to create exporter");
60//!
61//! let processor = LambdaSpanProcessor::builder()
62//!     .exporter(exporter)
63//!     .max_queue_size(4096)
64//!     .build();
65//! ```
66//!
67//! # Performance Considerations
68//!
69//! 1. **Memory Usage**:
70//!    - Fixed memory footprint based on queue size
71//!    - Each span typically uses 100-500 bytes
//!    - Default 2048 spans ≈ 0.2-1MB memory (2048 × 100-500 bytes)
73//!
74//! 2. **Latency Impact**:
75//!    - Batch processing reduces network overhead
//!    - Configurable queue size (`LAMBDA_SPAN_PROCESSOR_QUEUE_SIZE` or `max_queue_size`) allows tuning for your use case
77//!    - Force flush available for immediate export when needed
78//!
79//! 3. **Reliability**:
80//!    - Spans may be dropped if buffer fills
81//!    - Warning logs indicate dropped spans
82//!    - Consider increasing buffer size if spans are dropped
83//!
84//! # Best Practices
85//!
86//! 1. **Buffer Sizing**:
//!    - Monitor the `dropped_spans=` count reported in warning logs
88//!    - Size based on max spans per invocation
89//!    - Consider function memory when sizing
90//!
91//! 2. **Batch Configuration**:
92//!    - Larger batches improve throughput but increase memory usage
93//!    - Smaller batches reduce memory but increase network overhead
94//!    - Default values work well for most use cases
95//!
96//! 3. **Error Handling**:
97//!    - Export errors are logged but don't fail function
98//!    - Monitor for export failures in logs
99//!    - Consider retry strategies in custom exporters
100
101use crate::constants::{defaults, env_vars};
102use crate::logger::Logger;
103use bon::bon;
104
/// Module-specific logger, constructed with the name "processor".
///
/// Every warning and debug message emitted by the ring buffer and the span
/// processor in this file goes through this instance.
static LOGGER: Logger = Logger::const_new("processor");
107
108use opentelemetry::Context;
109use opentelemetry_sdk::{
110    error::{OTelSdkError, OTelSdkResult},
111    trace::{Span, SpanProcessor},
112    trace::{SpanData, SpanExporter},
113    Resource,
114};
115use std::env;
116use std::sync::{
117    atomic::{AtomicBool, AtomicUsize, Ordering},
118    Arc, Mutex,
119};
120
/// A fixed-size ring buffer for storing spans efficiently.
///
/// This implementation provides a memory-efficient way to store spans with
/// predictable performance characteristics:
///
/// # Performance Characteristics
///
/// - Push Operation: O(1)
/// - Memory Usage: Fixed based on capacity (all slots are allocated up front)
/// - Order: FIFO (First In, First Out)
/// - Batch Operations: Efficient removal of up to N spans, oldest first
///
/// # Implementation Details
///
/// The buffer uses a circular array with head and tail pointers:
/// - `head`: Points to next write position
/// - `tail`: Points to next read position
/// - `size`: Current number of elements
/// - `capacity`: Maximum number of elements
///
/// When the buffer is full, new spans are rejected rather than overwriting old ones.
/// The rejection is reported to the caller through the return value of `push`, so
/// the caller decides how to account for (and log) the dropped span.
#[derive(Debug)]
struct SpanRingBuffer {
    /// Backing storage; a slot is `None` when it holds no span.
    buffer: Vec<Option<SpanData>>,
    /// Index of the slot the next `push` writes to.
    head: usize, // Where to write next
    /// Index of the slot the next `take_batch` reads from.
    tail: usize, // Where to read next
    /// Number of spans currently stored.
    size: usize, // Current number of elements
    /// Total number of slots; fixed at construction.
    capacity: usize,
}
151
152impl Default for SpanRingBuffer {
153    fn default() -> Self {
154        Self::new(2048) // Default capacity
155    }
156}
157
158impl SpanRingBuffer {
159    fn new(capacity: usize) -> Self {
160        let mut buffer = Vec::with_capacity(capacity);
161        buffer.extend((0..capacity).map(|_| None));
162        Self {
163            buffer,
164            head: 0,
165            tail: 0,
166            size: 0,
167            capacity,
168        }
169    }
170
171    fn push(&mut self, span: SpanData) -> bool {
172        if self.size == self.capacity {
173            return false;
174        }
175
176        self.buffer[self.head] = Some(span);
177        self.head = (self.head + 1) % self.capacity;
178        self.size += 1;
179        true
180    }
181
182    fn take_batch(&mut self, max_batch_size: usize) -> Vec<SpanData> {
183        let batch_size = self.size.min(max_batch_size);
184        let mut result = Vec::with_capacity(batch_size);
185
186        for _ in 0..batch_size {
187            if let Some(span) = self.buffer[self.tail].take() {
188                result.push(span);
189            }
190            self.tail = (self.tail + 1) % self.capacity;
191            self.size -= 1;
192        }
193
194        if self.size == 0 {
195            self.head = 0;
196            self.tail = 0;
197        }
198
199        result
200    }
201}
202
/// A span processor optimized for AWS Lambda functions.
///
/// This processor efficiently manages spans in a Lambda environment:
/// - Uses a fixed-size ring buffer to prevent memory growth
/// - Supports synchronous and asynchronous export modes
/// - Handles graceful shutdown for Lambda termination
/// - Exports *all* buffered spans in a single batch when `force_flush` is called.
///
/// Sampled spans are only buffered when they end; nothing is exported until
/// `force_flush` or `shutdown` is called on the processor.
///
/// # Examples
///
/// ```
/// use lambda_otel_lite::LambdaSpanProcessor;
/// use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
///
/// let processor = LambdaSpanProcessor::builder()
///     .exporter(OtlpStdoutSpanExporter::default())
///     .build();
/// ```
///
/// With custom configuration:
///
/// ```
/// use lambda_otel_lite::LambdaSpanProcessor;
/// use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
///
/// let processor = LambdaSpanProcessor::builder()
///     .exporter(OtlpStdoutSpanExporter::default())
///     .max_queue_size(1000)
///     .build();
/// ```
#[derive(Debug)]
pub struct LambdaSpanProcessor<E>
where
    E: SpanExporter + std::fmt::Debug,
{
    /// The exporter used to export spans.
    ///
    /// Kept behind a `Mutex` because flushing, shutdown and resource updates
    /// all reach it through the processor.
    exporter: Mutex<E>,

    /// Internal buffer for storing spans until the next flush
    spans: Mutex<SpanRingBuffer>,

    /// Flag indicating whether the processor is shut down; once set, spans
    /// arriving in `on_end` are dropped instead of buffered
    is_shutdown: Arc<AtomicBool>,

    /// Counter for dropped spans; also used to rate-limit the "buffer is full"
    /// warning
    dropped_count: AtomicUsize,
}
250
251#[bon]
252impl<E> LambdaSpanProcessor<E>
253where
254    E: SpanExporter + std::fmt::Debug,
255{
256    /// Creates a new LambdaSpanProcessor with the given exporter and configuration
257    ///
258    /// # Environment Variable Precedence
259    ///
260    /// Configuration values follow this precedence order:
261    /// 1. Environment variables (highest precedence)
262    /// 2. Constructor parameters
263    /// 3. Default values (lowest precedence)
264    ///
265    /// The relevant environment variables are:
266    /// - `LAMBDA_SPAN_PROCESSOR_QUEUE_SIZE`: Controls the maximum queue size
267    #[builder]
268    pub fn new(exporter: E, max_queue_size: Option<usize>) -> Self {
269        // Get queue size with proper precedence (env var > param > default)
270        let max_queue_size = match env::var(env_vars::QUEUE_SIZE) {
271            Ok(value) => match value.parse::<usize>() {
272                Ok(size) => size,
273                Err(_) => {
274                    LOGGER.warn(format!(
275                        "Failed to parse {}: {}, using fallback",
276                        env_vars::QUEUE_SIZE,
277                        value
278                    ));
279                    max_queue_size.unwrap_or(defaults::QUEUE_SIZE)
280                }
281            },
282            Err(_) => max_queue_size.unwrap_or(defaults::QUEUE_SIZE),
283        };
284
285        Self {
286            exporter: Mutex::new(exporter),
287            spans: Mutex::new(SpanRingBuffer::new(max_queue_size)),
288            is_shutdown: Arc::new(AtomicBool::new(false)),
289            dropped_count: AtomicUsize::new(0),
290        }
291    }
292}
293
294impl<E> SpanProcessor for LambdaSpanProcessor<E>
295where
296    E: SpanExporter + std::fmt::Debug,
297{
298    fn on_start(&self, _span: &mut Span, _cx: &Context) {
299        // No-op, as we only process spans on end
300    }
301
302    fn on_end(&self, span: SpanData) {
303        if self.is_shutdown.load(Ordering::Relaxed) {
304            LOGGER.warn("LambdaSpanProcessor.on_end: processor is shut down, dropping span");
305            self.dropped_count.fetch_add(1, Ordering::Relaxed);
306            return;
307        }
308
309        // Skip unsampled spans
310        if !span.span_context.is_sampled() {
311            return;
312        }
313
314        // Try to add span to the buffer
315        if let Ok(mut spans) = self.spans.lock() {
316            if !spans.push(span) {
317                let prev = self.dropped_count.fetch_add(1, Ordering::Relaxed);
318                if prev == 0 || prev % 100 == 0 {
319                    LOGGER.warn(format!(
320                        "LambdaSpanProcessor.on_end: Dropping span because buffer is full (dropped_spans={})",
321                        prev + 1
322                    ));
323                }
324            }
325        } else {
326            LOGGER.warn("LambdaSpanProcessor.on_end: Failed to acquire spans lock in on_end");
327        }
328    }
329
330    fn force_flush(&self) -> OTelSdkResult {
331        LOGGER.debug("LambdaSpanProcessor.force_flush: flushing spans");
332
333        // Acquire lock on the span buffer
334        let spans_result = self.spans.lock();
335        let all_spans = match spans_result {
336            Ok(mut spans) => {
337                // Determine the number of spans currently in the buffer
338                let current_size = spans.size;
339                // Drain all spans from the buffer
340                spans.take_batch(current_size)
341            }
342            Err(_) => {
343                // If locking the span buffer fails, return an internal error
344                return Err(OTelSdkError::InternalFailure(
345                    "Failed to acquire spans lock in force_flush".to_string(),
346                ));
347            }
348        };
349        // Mutex guard for spans is dropped here, releasing the lock
350
351        // Acquire lock on the exporter
352        let exporter_result = self.exporter.lock();
353        match exporter_result {
354            Ok(exporter) => {
355                // Export all drained spans in a single batch
356                // This handles both empty (all_spans is empty Vec) and non-empty cases
357                let result = futures_executor::block_on(exporter.export(all_spans));
358
359                // Log error if export failed
360                if let Err(ref err) = result {
361                    LOGGER.debug(format!(
362                        "LambdaSpanProcessor.force_flush export error: {err:?}"
363                    ));
364                }
365
366                // Return the result of the export operation
367                result
368            }
369            Err(_) => {
370                // If locking the exporter fails, return an internal error
371                Err(OTelSdkError::InternalFailure(
372                    "Failed to acquire exporter lock in force_flush".to_string(),
373                ))
374            }
375        }
376        // Mutex guard for exporter is dropped here, releasing the lock
377    }
378
379    fn shutdown(&self) -> OTelSdkResult {
380        self.is_shutdown.store(true, Ordering::Relaxed);
381        // Flush any remaining spans
382        self.force_flush()?;
383        if let Ok(mut exporter) = self.exporter.lock() {
384            exporter.shutdown()
385        } else {
386            Err(OTelSdkError::InternalFailure(
387                "Failed to acquire exporter lock in shutdown".to_string(),
388            ))
389        }
390    }
391
392    fn shutdown_with_timeout(&self, timeout: std::time::Duration) -> OTelSdkResult {
393        self.is_shutdown.store(true, Ordering::Relaxed);
394        // Flush any remaining spans
395        self.force_flush()?;
396        if let Ok(mut exporter) = self.exporter.lock() {
397            exporter.shutdown_with_timeout(timeout)
398        } else {
399            Err(OTelSdkError::InternalFailure(
400                "Failed to acquire exporter lock in shutdown_with_timeout".to_string(),
401            ))
402        }
403    }
404
405    fn set_resource(&mut self, resource: &Resource) {
406        if let Ok(mut exporter) = self.exporter.lock() {
407            exporter.set_resource(resource);
408        }
409    }
410}
411
#[cfg(test)]
mod tests {
    use super::*;
    use crate::logger::Logger;
    use opentelemetry::{
        trace::{SpanContext, SpanId, TraceFlags, TraceId, TraceState},
        InstrumentationScope,
    };
    use opentelemetry_sdk::{
        trace::SpanExporter,
        trace::{SpanEvents, SpanLinks},
    };
    use serial_test::serial;
    use std::{borrow::Cow, sync::Arc};
    use tokio::sync::Mutex;

    fn setup_test_logger() -> Logger {
        Logger::new("test")
    }

    // Mock exporter that captures exported spans
    #[derive(Debug)]
    struct MockExporter {
        spans: Arc<Mutex<Vec<SpanData>>>,
    }

    impl MockExporter {
        fn new() -> Self {
            Self {
                spans: Arc::new(Mutex::new(Vec::new())),
            }
        }
    }

    impl SpanExporter for MockExporter {
        fn export(
            &self,
            batch: Vec<SpanData>,
        ) -> impl std::future::Future<Output = opentelemetry_sdk::error::OTelSdkResult> + Send
        {
            let spans = self.spans.clone();
            // A plain async block already satisfies the `impl Future + Send`
            // return type; no boxing or pinning is needed.
            async move {
                let mut spans = spans.lock().await;
                spans.extend(batch);
                Ok(())
            }
        }

        fn shutdown(&mut self) -> OTelSdkResult {
            Ok(())
        }
    }

    // Helper function to create a sampled test span with the given name
    fn create_test_span(name: &str) -> SpanData {
        let flags = TraceFlags::default().with_sampled(true);

        SpanData {
            span_context: SpanContext::new(
                TraceId::from_hex("01000000000000000000000000000000").unwrap(),
                SpanId::from_hex("0100000000000001").unwrap(),
                flags,
                false,
                TraceState::default(),
            ),
            parent_span_id: SpanId::INVALID,
            parent_span_is_remote: false,
            span_kind: opentelemetry::trace::SpanKind::Internal,
            name: Cow::Owned(name.to_string()),
            start_time: std::time::SystemTime::now(),
            end_time: std::time::SystemTime::now(),
            attributes: Vec::new(),
            dropped_attributes_count: 0,
            events: SpanEvents::default(),
            links: SpanLinks::default(),
            status: opentelemetry::trace::Status::default(),
            instrumentation_scope: InstrumentationScope::builder("test").build(),
        }
    }

    // Removes every environment variable the crate reads so tests start clean
    fn cleanup_env() {
        env::remove_var(env_vars::QUEUE_SIZE);
        env::remove_var(env_vars::PROCESSOR_MODE);
        env::remove_var(env_vars::COMPRESSION_LEVEL);
        env::remove_var(env_vars::SERVICE_NAME);
    }

    #[test]
    #[serial]
    fn test_ring_buffer_basic_operations() {
        let mut buffer = SpanRingBuffer::new(2);

        // Test empty buffer
        assert_eq!(buffer.size, 0);
        assert!(buffer.take_batch(2).is_empty());

        // Test adding spans
        assert!(buffer.push(create_test_span("span1")));
        assert!(buffer.push(create_test_span("span2")));

        assert_eq!(buffer.size, 2);

        // Test taking spans
        let spans = buffer.take_batch(2);
        assert_eq!(spans.len(), 2);
        assert_eq!(buffer.size, 0);
    }

    #[test]
    #[serial]
    fn test_ring_buffer_overflow() {
        let mut buffer = SpanRingBuffer::new(2);

        // Fill buffer
        assert!(buffer.push(create_test_span("span1")));
        assert!(buffer.push(create_test_span("span2")));

        // Add one more span: it must be rejected, not overwrite the oldest
        let success = buffer.push(create_test_span("span3"));
        assert!(!success); // Should fail since buffer is full

        // The original two spans are still there, in FIFO order
        let spans = buffer.take_batch(2);
        assert_eq!(spans.len(), 2);
        assert_eq!(spans[0].name, "span1");
        assert_eq!(spans[1].name, "span2");
    }

    #[test]
    #[serial]
    fn test_ring_buffer_batch_operations() {
        let mut buffer = SpanRingBuffer::new(5);

        // Add 5 spans
        for i in 0..5 {
            assert!(buffer.push(create_test_span(&format!("span{i}"))));
        }

        assert_eq!(buffer.take_batch(2).len(), 2);
        assert_eq!(buffer.take_batch(2).len(), 2);
        assert_eq!(buffer.take_batch(2).len(), 1);
        assert_eq!(buffer.size, 0);
    }

    #[test]
    #[serial]
    fn test_ring_buffer_wraparound_preserves_order() {
        let mut buffer = SpanRingBuffer::new(3);

        // Fill the buffer, free two slots, then write past the end of storage
        for name in ["a", "b", "c"] {
            assert!(buffer.push(create_test_span(name)));
        }
        assert_eq!(buffer.take_batch(2).len(), 2);
        assert!(buffer.push(create_test_span("d")));
        assert!(buffer.push(create_test_span("e")));
        assert!(!buffer.push(create_test_span("f"))); // Full again

        // Spans come back oldest first even though the writes wrapped around
        let spans = buffer.take_batch(3);
        assert_eq!(spans.len(), 3);
        assert_eq!(spans[0].name, "c");
        assert_eq!(spans[1].name, "d");
        assert_eq!(spans[2].name, "e");
        assert_eq!(buffer.size, 0);
    }

    #[tokio::test]
    #[serial]
    async fn test_processor_sync_mode() {
        let _logger = setup_test_logger();
        let mock_exporter = MockExporter::new();
        let spans_exported = mock_exporter.spans.clone();

        let processor = LambdaSpanProcessor::builder()
            .exporter(mock_exporter)
            .max_queue_size(10)
            .build();

        // Test span processing
        processor.on_end(create_test_span("test_span"));

        // Force flush to ensure export
        processor.force_flush().unwrap();

        // Verify span was exported
        let exported = spans_exported.lock().await;
        assert_eq!(exported.len(), 1);
        assert_eq!(exported[0].name, "test_span");
    }

    #[tokio::test]
    #[serial]
    async fn test_shutdown_exports_remaining_spans() {
        let _logger = setup_test_logger();
        let mock_exporter = MockExporter::new();
        let spans_exported = mock_exporter.spans.clone();

        let processor = LambdaSpanProcessor::builder()
            .exporter(mock_exporter)
            .max_queue_size(10)
            .build();

        // Add some spans
        processor.on_end(create_test_span("span1"));
        processor.on_end(create_test_span("span2"));

        // Shutdown should export all spans
        processor.shutdown().unwrap();

        // Verify all spans were exported
        let exported = spans_exported.lock().await;
        assert_eq!(exported.len(), 2);

        // Verify new spans are dropped after shutdown: nothing is exported,
        // nothing is buffered, and the drop is counted
        processor.on_end(create_test_span("span3"));
        assert_eq!(exported.len(), 2); // No new spans after shutdown
        assert_eq!(processor.spans.lock().unwrap().size, 0);
        assert_eq!(processor.dropped_count.load(Ordering::Relaxed), 1);
    }

    #[tokio::test]
    #[serial]
    async fn test_concurrent_span_processing() {
        let _logger = setup_test_logger();
        let mock_exporter = MockExporter::new();
        let spans_exported = mock_exporter.spans.clone();

        let processor = Arc::new(
            LambdaSpanProcessor::builder()
                .exporter(mock_exporter)
                .max_queue_size(100)
                .build(),
        );

        let mut handles = Vec::new();

        // Spawn 10 tasks, each adding 10 spans
        for i in 0..10 {
            let processor = processor.clone();
            handles.push(tokio::spawn(async move {
                for j in 0..10 {
                    processor.on_end(create_test_span(&format!("span_{i}_{j}")));
                }
            }));
        }

        // Wait for all tasks to complete
        for handle in handles {
            handle.await.unwrap();
        }

        // Force flush and verify all spans were processed
        processor.force_flush().unwrap();

        let exported = spans_exported.lock().await;
        assert_eq!(exported.len(), 100);
        assert_eq!(processor.dropped_count.load(Ordering::Relaxed), 0);
    }

    #[test]
    #[serial]
    fn test_builder_default_values() {
        cleanup_env();

        let mock_exporter = MockExporter::new();

        let processor = LambdaSpanProcessor::builder()
            .exporter(mock_exporter)
            .build();

        // Check default values
        assert_eq!(processor.spans.lock().unwrap().capacity, 2048); // Default queue size
    }

    #[test]
    #[serial]
    fn test_builder_env_var_values() {
        cleanup_env();

        let mock_exporter = MockExporter::new();

        // Set custom values via env vars
        env::set_var(env_vars::QUEUE_SIZE, "1000");

        let processor = LambdaSpanProcessor::builder()
            .exporter(mock_exporter)
            .build();

        // Check that env var values were used
        assert_eq!(processor.spans.lock().unwrap().capacity, 1000);

        cleanup_env();
    }

    #[test]
    #[serial]
    fn test_builder_env_var_precedence() {
        cleanup_env();

        let mock_exporter = MockExporter::new();

        // Set custom values via env vars
        env::set_var(env_vars::QUEUE_SIZE, "1000");

        // Create with explicit values (should be overridden by env vars)
        let processor = LambdaSpanProcessor::builder()
            .exporter(mock_exporter)
            .max_queue_size(500)
            .build();

        // Check that env var values took precedence
        assert_eq!(processor.spans.lock().unwrap().capacity, 1000);

        cleanup_env();
    }

    #[test]
    #[serial]
    fn test_invalid_env_vars() {
        cleanup_env();

        let mock_exporter = MockExporter::new();

        // Set invalid values via env vars
        env::set_var(env_vars::QUEUE_SIZE, "invalid");

        // Create with explicit values (should be used as fallbacks)
        let processor = LambdaSpanProcessor::builder()
            .exporter(mock_exporter)
            .max_queue_size(500)
            .build();

        // Check that fallback values were used
        assert_eq!(processor.spans.lock().unwrap().capacity, 500);

        cleanup_env();
    }
}