// lambda_otel_lite/processor.rs

//! Span processor implementation optimized for AWS Lambda functions.
//!
//! This module provides a Lambda-optimized span processor that efficiently manages OpenTelemetry spans
//! in a serverless environment. It stores finished spans in an in-memory ring buffer and exports
//! them in batches when flushed.
//!
//! # Architecture
//!
//! The processor is designed specifically for the Lambda execution environment:
//!
//! 1. **Ring Buffer Storage**:
//!    - Fixed-size circular buffer prevents memory growth
//!    - O(1) push operations with no memory reallocation
//!    - FIFO ordering ensures spans are exported in the order they were submitted
//!    - Efficient batch removal for export
//!    - When full, new spans are dropped; a warning is logged for the first dropped
//!      span and for every 100th one after that
//!
//! 2. **Thread Safety**:
//!    - All operations are thread-safe
//!    - Uses a `Mutex` for span buffer access
//!    - Atomic operations for state management
//!    - Safe for concurrent span submission
//!
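//! # Configuration
//!
//! The processor can be configured through environment variables. Values passed
//! explicitly to the builder take precedence over them, and values that cannot be
//! parsed as an unsigned integer fall back to the default:
//!
//! - `LAMBDA_SPAN_PROCESSOR_QUEUE_SIZE`: Controls the buffer size (maximum number of queued spans)
//!   - Defaults to 2048 spans
//!   - Should be tuned based on span volume
//!
//! - `LAMBDA_SPAN_PROCESSOR_BATCH_SIZE`: Controls the batch size (maximum spans per export call)
//!   - Defaults to 512 spans
//!   - Should be tuned based on span volume
//!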
//! # Usage Examples
//!
//! Basic setup with default configuration:
//!
//! ```no_run
//! use lambda_otel_lite::LambdaSpanProcessor;
//! use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
//!
//! let processor = LambdaSpanProcessor::builder()
//!     .exporter(OtlpStdoutSpanExporter::default())
//!     .build();
//! ```
//!
//! Using with an OTLP HTTP exporter:
//!
//! ```no_run
//! use lambda_otel_lite::LambdaSpanProcessor;
//! use opentelemetry_otlp::{SpanExporter, Protocol};
//! use opentelemetry_otlp::{WithExportConfig, WithHttpConfig};
//!
//! // Important: the processor exports synchronously by blocking the calling thread
//! // on the exporter's future, so HTTP exporters must use reqwest::blocking::Client.
//! // Async clients can deadlock.
//! let exporter = SpanExporter::builder()
//!     .with_http()
//!     .with_http_client(reqwest::blocking::Client::new())
//!     .with_protocol(Protocol::HttpBinary)
//!     .build()
//!     .expect("Failed to create exporter");
//!
//! let processor = LambdaSpanProcessor::builder()
//!     .exporter(exporter)
//!     .max_queue_size(4096)
//!     .max_batch_size(1024)
//!     .build();
//! ```
//!
//! # Performance Considerations
//!
//! 1. **Memory Usage**:
//!    - Memory is bounded by the queue size: the slot array is allocated once and never grows
//!    - Each span typically uses 100-500 bytes (a rough estimate; spans with many
//!      attributes or events are larger)
//!    - Default 2048 spans ≈ 0.2-1 MB of memory
//!
//! 2. **Latency Impact**:
//!    - Spans are exported only when `force_flush` or `shutdown` is called, synchronously
//!      on the calling thread
//!    - Batch processing reduces network overhead
//!    - Configurable batch size allows tuning for your use case
//!
//! 3. **Reliability**:
//!    - Spans may be dropped if the buffer fills
//!    - Warning logs indicate dropped spans
//!    - Consider increasing the buffer size if spans are dropped
//!
//! # Best Practices
//!
//! 1. **Buffer Sizing**:
//!    - Watch for the buffer-full warning, which reports the running `dropped_spans` count
//!    - Size based on max spans per invocation
//!    - Consider function memory when sizing
//!
//! 2. **Batch Configuration**:
//!    - Larger batches improve throughput but increase memory usage
//!    - Smaller batches reduce memory but increase network overhead
//!    - Default values work well for most use cases
//!
//! 3. **Error Handling**:
//!    - Export errors are logged (at debug level) and returned from `force_flush` and
//!      `shutdown`; the caller decides whether they should fail the function
//!    - The processor does not retry: a batch whose export fails is discarded
//!    - Consider retry strategies in custom exporters

106use crate::logger::Logger;
107use bon::bon;
108
109/// Module-specific logger
110static LOGGER: Logger = Logger::const_new("processor");
111
112use opentelemetry::Context;
113use opentelemetry_sdk::{
114    error::{OTelSdkError, OTelSdkResult},
115    trace::{Span, SpanProcessor},
116    trace::{SpanData, SpanExporter},
117    Resource,
118};
119use std::env;
120use std::sync::{
121    atomic::{AtomicBool, AtomicUsize, Ordering},
122    Arc, Mutex,
123};
124
125/// A fixed-size ring buffer for storing spans efficiently.
126///
127/// This implementation provides a memory-efficient way to store spans with
128/// predictable performance characteristics:
129///
130/// # Performance Characteristics
131///
132/// - Push Operation: O(1)
133/// - Memory Usage: Fixed based on capacity
134/// - Order: FIFO (First In, First Out)
135/// - Batch Operations: Efficient removal of all spans
136///
137/// # Implementation Details
138///
139/// The buffer uses a circular array with head and tail pointers:
140/// - `head`: Points to next write position
141/// - `tail`: Points to next read position
142/// - `size`: Current number of elements
143/// - `capacity`: Maximum number of elements
144///
145/// When the buffer is full, new spans are rejected rather than overwriting old ones.
146/// This ensures no data loss occurs silently.
147#[derive(Debug)]
148struct SpanRingBuffer {
149    buffer: Vec<Option<SpanData>>,
150    head: usize, // Where to write next
151    tail: usize, // Where to read next
152    size: usize, // Current number of elements
153    capacity: usize,
154}
155
156impl Default for SpanRingBuffer {
157    fn default() -> Self {
158        Self::new(2048) // Default capacity
159    }
160}
161
162impl SpanRingBuffer {
163    fn new(capacity: usize) -> Self {
164        let mut buffer = Vec::with_capacity(capacity);
165        buffer.extend((0..capacity).map(|_| None));
166        Self {
167            buffer,
168            head: 0,
169            tail: 0,
170            size: 0,
171            capacity,
172        }
173    }
174
175    fn push(&mut self, span: SpanData) -> bool {
176        if self.size == self.capacity {
177            return false;
178        }
179
180        self.buffer[self.head] = Some(span);
181        self.head = (self.head + 1) % self.capacity;
182        self.size += 1;
183        true
184    }
185
186    fn take_batch(&mut self, max_batch_size: usize) -> Vec<SpanData> {
187        let batch_size = self.size.min(max_batch_size);
188        let mut result = Vec::with_capacity(batch_size);
189
190        for _ in 0..batch_size {
191            if let Some(span) = self.buffer[self.tail].take() {
192                result.push(span);
193            }
194            self.tail = (self.tail + 1) % self.capacity;
195            self.size -= 1;
196        }
197
198        if self.size == 0 {
199            self.head = 0;
200            self.tail = 0;
201        }
202
203        result
204    }
205
206    fn is_empty(&self) -> bool {
207        self.size == 0
208    }
209}
210
211/// A span processor optimized for AWS Lambda functions.
212///
213/// This processor efficiently manages spans in a Lambda environment:
214/// - Uses a fixed-size ring buffer to prevent memory growth
215/// - Supports synchronous and asynchronous export modes
216/// - Handles graceful shutdown for Lambda termination
217///
218/// # Examples
219///
220/// ```
221/// use lambda_otel_lite::LambdaSpanProcessor;
222/// use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
223///
224/// let processor = LambdaSpanProcessor::builder()
225///     .exporter(OtlpStdoutSpanExporter::default())
226///     .build();
227/// ```
228///
229/// With custom configuration:
230///
231/// ```
232/// use lambda_otel_lite::LambdaSpanProcessor;
233/// use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
234///
235/// let processor = LambdaSpanProcessor::builder()
236///     .exporter(OtlpStdoutSpanExporter::default())
237///     .max_queue_size(1000)
238///     .max_batch_size(100)
239///     .build();
240/// ```
241#[derive(Debug)]
242pub struct LambdaSpanProcessor<E>
243where
244    E: SpanExporter + std::fmt::Debug,
245{
246    /// The exporter used to export spans
247    exporter: Mutex<E>,
248
249    /// Internal buffer for storing spans
250    spans: Mutex<SpanRingBuffer>,
251
252    /// Flag indicating whether the processor is shut down
253    is_shutdown: Arc<AtomicBool>,
254
255    /// Counter for dropped spans
256    dropped_count: AtomicUsize,
257
258    /// Maximum number of spans to export in a single batch
259    max_batch_size: usize,
260}
261
262#[bon]
263impl<E> LambdaSpanProcessor<E>
264where
265    E: SpanExporter + std::fmt::Debug,
266{
267    /// Returns the default max batch size from environment or fallback value
268    fn default_max_batch_size() -> usize {
269        env::var("LAMBDA_SPAN_PROCESSOR_BATCH_SIZE")
270            .ok()
271            .and_then(|s| s.parse().ok())
272            .unwrap_or(512)
273    }
274
275    /// Returns the default max queue size from environment or fallback value
276    fn default_max_queue_size() -> usize {
277        env::var("LAMBDA_SPAN_PROCESSOR_QUEUE_SIZE")
278            .ok()
279            .and_then(|s| s.parse().ok())
280            .unwrap_or(2048)
281    }
282
283    /// Creates a new LambdaSpanProcessor with the given exporter and configuration
284    #[builder]
285    pub fn new(exporter: E, max_batch_size: Option<usize>, max_queue_size: Option<usize>) -> Self {
286        let max_batch_size = max_batch_size.unwrap_or_else(Self::default_max_batch_size);
287        let max_queue_size = max_queue_size.unwrap_or_else(Self::default_max_queue_size);
288
289        Self {
290            exporter: Mutex::new(exporter),
291            spans: Mutex::new(SpanRingBuffer::new(max_queue_size)),
292            is_shutdown: Arc::new(AtomicBool::new(false)),
293            dropped_count: AtomicUsize::new(0),
294            max_batch_size,
295        }
296    }
297}
298
299impl<E> SpanProcessor for LambdaSpanProcessor<E>
300where
301    E: SpanExporter + std::fmt::Debug,
302{
303    fn on_start(&self, _span: &mut Span, _cx: &Context) {
304        // No-op, as we only process spans on end
305    }
306
307    fn on_end(&self, span: SpanData) {
308        if self.is_shutdown.load(Ordering::Relaxed) {
309            LOGGER.warn("LambdaSpanProcessor.on_end: processor is shut down, dropping span");
310            self.dropped_count.fetch_add(1, Ordering::Relaxed);
311            return;
312        }
313
314        // Skip unsampled spans
315        if !span.span_context.is_sampled() {
316            return;
317        }
318
319        // Try to add span to the buffer
320        if let Ok(mut spans) = self.spans.lock() {
321            if !spans.push(span) {
322                let prev = self.dropped_count.fetch_add(1, Ordering::Relaxed);
323                if prev == 0 || prev % 100 == 0 {
324                    LOGGER.warn(format!(
325                        "LambdaSpanProcessor.on_end: Dropping span because buffer is full (dropped_spans={})",
326                        prev + 1
327                    ));
328                }
329            }
330        } else {
331            LOGGER.warn("LambdaSpanProcessor.on_end: Failed to acquire spans lock in on_end");
332        }
333    }
334
335    fn force_flush(&self) -> OTelSdkResult {
336        LOGGER.debug("LambdaSpanProcessor.force_flush: flushing spans");
337        if let Ok(mut spans) = self.spans.lock() {
338            if spans.is_empty() {
339                return Ok(());
340            }
341
342            let mut exporter = self.exporter.lock().map_err(|_| {
343                OTelSdkError::InternalFailure(
344                    "Failed to acquire exporter lock in force_flush".to_string(),
345                )
346            })?;
347
348            // Process spans in batches
349            while !spans.is_empty() {
350                let batch = spans.take_batch(self.max_batch_size);
351                if !batch.is_empty() {
352                    let result = futures_executor::block_on(exporter.export(batch));
353                    if let Err(err) = &result {
354                        LOGGER.debug(format!("LambdaSpanProcessor.force_flush.Error: {:?}", err));
355                        return result;
356                    }
357                }
358            }
359            Ok(())
360        } else {
361            Err(OTelSdkError::InternalFailure(
362                "Failed to acquire spans lock in force_flush".to_string(),
363            ))
364        }
365    }
366
367    fn shutdown(&self) -> OTelSdkResult {
368        self.is_shutdown.store(true, Ordering::Relaxed);
369        // Flush any remaining spans
370        self.force_flush()?;
371        if let Ok(mut exporter) = self.exporter.lock() {
372            exporter.shutdown()
373        } else {
374            Err(OTelSdkError::InternalFailure(
375                "Failed to acquire exporter lock in shutdown".to_string(),
376            ))
377        }
378    }
379
380    fn set_resource(&mut self, resource: &Resource) {
381        if let Ok(mut exporter) = self.exporter.lock() {
382            exporter.set_resource(resource);
383        }
384    }
385}
386
#[cfg(test)]
mod tests {
    use super::*;
    use crate::logger::Logger;
    use opentelemetry::{
        trace::{SpanContext, SpanId, TraceFlags, TraceId, TraceState},
        InstrumentationScope,
    };
    use opentelemetry_sdk::{
        trace::SpanExporter,
        trace::{SpanEvents, SpanLinks},
    };
    use serial_test::serial;
    use std::{borrow::Cow, future::Future, pin::Pin, sync::Arc};
    use tokio::sync::Mutex;

    fn setup_test_logger() -> Logger {
        Logger::new("test")
    }

    /// Mock exporter that records every exported span and the size of each batch.
    #[derive(Debug)]
    struct MockExporter {
        spans: Arc<Mutex<Vec<SpanData>>>,
        batch_sizes: Arc<Mutex<Vec<usize>>>,
    }

    impl MockExporter {
        fn new() -> Self {
            Self {
                spans: Arc::new(Mutex::new(Vec::new())),
                batch_sizes: Arc::new(Mutex::new(Vec::new())),
            }
        }
    }

    impl SpanExporter for MockExporter {
        fn export(
            &mut self,
            batch: Vec<SpanData>,
        ) -> Pin<Box<dyn Future<Output = OTelSdkResult> + Send>> {
            let spans = self.spans.clone();
            let batch_sizes = self.batch_sizes.clone();
            Box::pin(async move {
                batch_sizes.lock().await.push(batch.len());
                spans.lock().await.extend(batch);
                Ok(())
            })
        }

        fn shutdown(&mut self) -> OTelSdkResult {
            Ok(())
        }
    }

    /// Span names in order, so tests can assert on FIFO/export ordering.
    fn span_names(spans: &[SpanData]) -> Vec<String> {
        spans.iter().map(|span| span.name.to_string()).collect()
    }

    // Helper function to create a sampled test span
    fn create_test_span(name: &str) -> SpanData {
        let flags = TraceFlags::default().with_sampled(true);

        SpanData {
            span_context: SpanContext::new(
                TraceId::from_hex("01000000000000000000000000000000").unwrap(),
                SpanId::from_hex("0100000000000001").unwrap(),
                flags,
                false,
                TraceState::default(),
            ),
            parent_span_id: SpanId::INVALID,
            span_kind: opentelemetry::trace::SpanKind::Internal,
            name: Cow::Owned(name.to_string()),
            start_time: std::time::SystemTime::now(),
            end_time: std::time::SystemTime::now(),
            attributes: Vec::new(),
            dropped_attributes_count: 0,
            events: SpanEvents::default(),
            links: SpanLinks::default(),
            status: opentelemetry::trace::Status::default(),
            instrumentation_scope: InstrumentationScope::builder("test").build(),
        }
    }

    #[test]
    #[serial]
    fn test_ring_buffer_basic_operations() {
        let mut buffer = SpanRingBuffer::new(2);

        // Test empty buffer
        assert!(buffer.is_empty());
        assert!(buffer.take_batch(2).is_empty());

        // Test adding spans
        assert!(buffer.push(create_test_span("span1")));
        assert!(buffer.push(create_test_span("span2")));

        assert!(!buffer.is_empty());

        // Test taking spans, oldest first
        let spans = buffer.take_batch(2);
        assert_eq!(span_names(&spans), ["span1", "span2"]);
        assert!(buffer.is_empty());
    }

    #[test]
    #[serial]
    fn test_ring_buffer_overflow() {
        let mut buffer = SpanRingBuffer::new(2);

        // Fill buffer
        assert!(buffer.push(create_test_span("span1")));
        assert!(buffer.push(create_test_span("span2")));

        // A full buffer rejects new spans instead of overwriting the oldest ones
        assert!(!buffer.push(create_test_span("span3")));

        let spans = buffer.take_batch(2);
        assert_eq!(span_names(&spans), ["span1", "span2"]);
    }

    #[test]
    #[serial]
    fn test_ring_buffer_batch_operations() {
        let mut buffer = SpanRingBuffer::new(5);

        // Add 5 spans
        for i in 0..5 {
            buffer.push(create_test_span(&format!("span{}", i)));
        }

        assert_eq!(buffer.take_batch(2).len(), 2);
        assert_eq!(buffer.take_batch(2).len(), 2);
        assert_eq!(buffer.take_batch(2).len(), 1);
        assert!(buffer.is_empty());
    }

    #[test]
    #[serial]
    fn test_ring_buffer_wraparound_preserves_order() {
        let mut buffer = SpanRingBuffer::new(3);

        for name in ["a", "b", "c"] {
            assert!(buffer.push(create_test_span(name)));
        }
        assert_eq!(span_names(&buffer.take_batch(2)), ["a", "b"]);

        // These pushes wrap around the end of the underlying slot array
        assert!(buffer.push(create_test_span("d")));
        assert!(buffer.push(create_test_span("e")));
        assert!(!buffer.push(create_test_span("f")));

        assert_eq!(span_names(&buffer.take_batch(10)), ["c", "d", "e"]);
        assert!(buffer.is_empty());
    }

    #[tokio::test]
    #[serial]
    async fn test_processor_sync_mode() {
        let _logger = setup_test_logger();
        let mock_exporter = MockExporter::new();
        let spans_exported = mock_exporter.spans.clone();

        let processor = LambdaSpanProcessor::builder()
            .exporter(mock_exporter)
            .max_queue_size(10)
            .max_batch_size(5)
            .build();

        // Test span processing
        processor.on_end(create_test_span("test_span"));

        // Force flush to ensure export
        processor.force_flush().unwrap();

        // Verify span was exported
        let exported = spans_exported.lock().await;
        assert_eq!(exported.len(), 1);
        assert_eq!(exported[0].name, "test_span");
    }

    #[tokio::test]
    #[serial]
    async fn test_shutdown_exports_remaining_spans() {
        let _logger = setup_test_logger();
        let mock_exporter = MockExporter::new();
        let spans_exported = mock_exporter.spans.clone();

        let processor = LambdaSpanProcessor::builder()
            .exporter(mock_exporter)
            .max_queue_size(10)
            .max_batch_size(5)
            .build();

        // Add some spans
        processor.on_end(create_test_span("span1"));
        processor.on_end(create_test_span("span2"));

        // Shutdown should export all spans
        processor.shutdown().unwrap();
        assert_eq!(spans_exported.lock().await.len(), 2);

        // Spans ending after shutdown are counted as dropped and never buffered
        processor.on_end(create_test_span("span3"));
        assert_eq!(processor.dropped_count.load(Ordering::Relaxed), 1);
        assert!(processor.spans.lock().unwrap().is_empty());

        // A later flush therefore has nothing new to export
        processor.force_flush().unwrap();
        assert_eq!(spans_exported.lock().await.len(), 2);
    }

    #[test]
    #[serial]
    fn test_concurrent_span_processing() {
        let _logger = setup_test_logger();
        let mock_exporter = MockExporter::new();
        let spans_exported = mock_exporter.spans.clone();

        let processor = Arc::new(
            LambdaSpanProcessor::builder()
                .exporter(mock_exporter)
                .max_queue_size(100)
                .max_batch_size(25)
                .build(),
        );

        // OS threads make span submission genuinely concurrent (tasks on a
        // current-thread runtime would run one after another).
        let handles: Vec<_> = (0..10)
            .map(|i| {
                let processor = Arc::clone(&processor);
                std::thread::spawn(move || {
                    for j in 0..10 {
                        processor.on_end(create_test_span(&format!("span_{}_{}", i, j)));
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }

        // Force flush and verify all spans were processed
        processor.force_flush().unwrap();

        assert_eq!(spans_exported.try_lock().unwrap().len(), 100);
        assert_eq!(processor.dropped_count.load(Ordering::Relaxed), 0);
    }

    #[test]
    #[serial]
    fn test_batch_processing() {
        let _logger = setup_test_logger();
        let mock_exporter = MockExporter::new();
        let spans_exported = mock_exporter.spans.clone();
        let batch_sizes = mock_exporter.batch_sizes.clone();
        let processor = LambdaSpanProcessor::builder()
            .exporter(mock_exporter)
            .max_queue_size(10)
            .max_batch_size(3)
            .build();

        // Add 5 spans
        for i in 0..5 {
            processor.on_end(create_test_span(&format!("span{}", i)));
        }

        // Force flush should export them in batches of 3 and 2
        processor.force_flush().unwrap();
        assert_eq!(*batch_sizes.try_lock().unwrap(), vec![3, 2]);

        // Add 2 more spans
        processor.on_end(create_test_span("span5"));
        processor.on_end(create_test_span("span6"));

        // Final flush exports the remaining pair as one batch
        processor.force_flush().unwrap();
        assert_eq!(*batch_sizes.try_lock().unwrap(), vec![3, 2, 2]);

        // Export order matches submission order
        let exported = spans_exported.try_lock().unwrap();
        assert_eq!(
            span_names(&exported),
            ["span0", "span1", "span2", "span3", "span4", "span5", "span6"]
        );
    }

    #[test]
    #[serial]
    fn test_builder_default_values() {
        let mock_exporter = MockExporter::new();

        // Remove any existing env vars to test true defaults
        env::remove_var("LAMBDA_SPAN_PROCESSOR_BATCH_SIZE");
        env::remove_var("LAMBDA_SPAN_PROCESSOR_QUEUE_SIZE");

        let processor = LambdaSpanProcessor::builder()
            .exporter(mock_exporter)
            .build();

        // Check default values
        assert_eq!(processor.max_batch_size, 512); // Default batch size
        assert_eq!(processor.spans.lock().unwrap().capacity, 2048); // Default queue size
    }

    #[test]
    #[serial]
    fn test_builder_env_var_values() {
        let mock_exporter = MockExporter::new();

        // Set custom values via env vars
        env::set_var("LAMBDA_SPAN_PROCESSOR_BATCH_SIZE", "100");
        env::set_var("LAMBDA_SPAN_PROCESSOR_QUEUE_SIZE", "1000");

        let processor = LambdaSpanProcessor::builder()
            .exporter(mock_exporter)
            .build();

        // Check that env var values were used
        assert_eq!(processor.max_batch_size, 100);
        assert_eq!(processor.spans.lock().unwrap().capacity, 1000);

        // Clean up
        env::remove_var("LAMBDA_SPAN_PROCESSOR_BATCH_SIZE");
        env::remove_var("LAMBDA_SPAN_PROCESSOR_QUEUE_SIZE");
    }

    #[test]
    #[serial]
    fn test_builder_explicit_values_override_env() {
        let mock_exporter = MockExporter::new();

        // Set env vars that should be overridden
        env::set_var("LAMBDA_SPAN_PROCESSOR_BATCH_SIZE", "100");
        env::set_var("LAMBDA_SPAN_PROCESSOR_QUEUE_SIZE", "1000");

        let processor = LambdaSpanProcessor::builder()
            .exporter(mock_exporter)
            .max_batch_size(200)
            .max_queue_size(2000)
            .build();

        // Check that explicit values were used instead of env vars
        assert_eq!(processor.max_batch_size, 200);
        assert_eq!(processor.spans.lock().unwrap().capacity, 2000);

        // Clean up
        env::remove_var("LAMBDA_SPAN_PROCESSOR_BATCH_SIZE");
        env::remove_var("LAMBDA_SPAN_PROCESSOR_QUEUE_SIZE");
    }

    #[test]
    #[serial]
    fn test_builder_invalid_env_vars() {
        let mock_exporter = MockExporter::new();

        // Set invalid values in env vars
        env::set_var("LAMBDA_SPAN_PROCESSOR_BATCH_SIZE", "not_a_number");
        env::set_var("LAMBDA_SPAN_PROCESSOR_QUEUE_SIZE", "also_not_a_number");

        let processor = LambdaSpanProcessor::builder()
            .exporter(mock_exporter)
            .build();

        // Check that defaults were used
        assert_eq!(processor.max_batch_size, 512);
        assert_eq!(processor.spans.lock().unwrap().capacity, 2048);

        // Clean up
        env::remove_var("LAMBDA_SPAN_PROCESSOR_BATCH_SIZE");
        env::remove_var("LAMBDA_SPAN_PROCESSOR_QUEUE_SIZE");
    }
}