1use crate::constants::{defaults, env_vars};
107use crate::logger::Logger;
108use bon::bon;
109
110static LOGGER: Logger = Logger::const_new("processor");
112
113use opentelemetry::Context;
114use opentelemetry_sdk::{
115 error::{OTelSdkError, OTelSdkResult},
116 trace::{Span, SpanProcessor},
117 trace::{SpanData, SpanExporter},
118 Resource,
119};
120use std::env;
121use std::sync::{
122 atomic::{AtomicBool, AtomicUsize, Ordering},
123 Arc, Mutex,
124};
125
126#[derive(Debug)]
149struct SpanRingBuffer {
150 buffer: Vec<Option<SpanData>>,
151 head: usize, tail: usize, size: usize, capacity: usize,
155}
156
157impl Default for SpanRingBuffer {
158 fn default() -> Self {
159 Self::new(2048) }
161}
162
163impl SpanRingBuffer {
164 fn new(capacity: usize) -> Self {
165 let mut buffer = Vec::with_capacity(capacity);
166 buffer.extend((0..capacity).map(|_| None));
167 Self {
168 buffer,
169 head: 0,
170 tail: 0,
171 size: 0,
172 capacity,
173 }
174 }
175
176 fn push(&mut self, span: SpanData) -> bool {
177 if self.size == self.capacity {
178 return false;
179 }
180
181 self.buffer[self.head] = Some(span);
182 self.head = (self.head + 1) % self.capacity;
183 self.size += 1;
184 true
185 }
186
187 fn take_batch(&mut self, max_batch_size: usize) -> Vec<SpanData> {
188 let batch_size = self.size.min(max_batch_size);
189 let mut result = Vec::with_capacity(batch_size);
190
191 for _ in 0..batch_size {
192 if let Some(span) = self.buffer[self.tail].take() {
193 result.push(span);
194 }
195 self.tail = (self.tail + 1) % self.capacity;
196 self.size -= 1;
197 }
198
199 if self.size == 0 {
200 self.head = 0;
201 self.tail = 0;
202 }
203
204 result
205 }
206
207 fn is_empty(&self) -> bool {
208 self.size == 0
209 }
210}
211
212#[derive(Debug)]
243pub struct LambdaSpanProcessor<E>
244where
245 E: SpanExporter + std::fmt::Debug,
246{
247 exporter: Mutex<E>,
249
250 spans: Mutex<SpanRingBuffer>,
252
253 is_shutdown: Arc<AtomicBool>,
255
256 dropped_count: AtomicUsize,
258
259 max_batch_size: usize,
261}
262
263#[bon]
264impl<E> LambdaSpanProcessor<E>
265where
266 E: SpanExporter + std::fmt::Debug,
267{
268 #[builder]
281 pub fn new(exporter: E, max_batch_size: Option<usize>, max_queue_size: Option<usize>) -> Self {
282 let max_batch_size = match env::var(env_vars::BATCH_SIZE) {
284 Ok(value) => match value.parse::<usize>() {
285 Ok(size) => size,
286 Err(_) => {
287 LOGGER.warn(format!(
288 "Failed to parse {}: {}, using fallback",
289 env_vars::BATCH_SIZE,
290 value
291 ));
292 max_batch_size.unwrap_or(defaults::BATCH_SIZE)
293 }
294 },
295 Err(_) => max_batch_size.unwrap_or(defaults::BATCH_SIZE),
296 };
297
298 let max_queue_size = match env::var(env_vars::QUEUE_SIZE) {
300 Ok(value) => match value.parse::<usize>() {
301 Ok(size) => size,
302 Err(_) => {
303 LOGGER.warn(format!(
304 "Failed to parse {}: {}, using fallback",
305 env_vars::QUEUE_SIZE,
306 value
307 ));
308 max_queue_size.unwrap_or(defaults::QUEUE_SIZE)
309 }
310 },
311 Err(_) => max_queue_size.unwrap_or(defaults::QUEUE_SIZE),
312 };
313
314 Self {
315 exporter: Mutex::new(exporter),
316 spans: Mutex::new(SpanRingBuffer::new(max_queue_size)),
317 is_shutdown: Arc::new(AtomicBool::new(false)),
318 dropped_count: AtomicUsize::new(0),
319 max_batch_size,
320 }
321 }
322}
323
324impl<E> SpanProcessor for LambdaSpanProcessor<E>
325where
326 E: SpanExporter + std::fmt::Debug,
327{
328 fn on_start(&self, _span: &mut Span, _cx: &Context) {
329 }
331
332 fn on_end(&self, span: SpanData) {
333 if self.is_shutdown.load(Ordering::Relaxed) {
334 LOGGER.warn("LambdaSpanProcessor.on_end: processor is shut down, dropping span");
335 self.dropped_count.fetch_add(1, Ordering::Relaxed);
336 return;
337 }
338
339 if !span.span_context.is_sampled() {
341 return;
342 }
343
344 if let Ok(mut spans) = self.spans.lock() {
346 if !spans.push(span) {
347 let prev = self.dropped_count.fetch_add(1, Ordering::Relaxed);
348 if prev == 0 || prev % 100 == 0 {
349 LOGGER.warn(format!(
350 "LambdaSpanProcessor.on_end: Dropping span because buffer is full (dropped_spans={})",
351 prev + 1
352 ));
353 }
354 }
355 } else {
356 LOGGER.warn("LambdaSpanProcessor.on_end: Failed to acquire spans lock in on_end");
357 }
358 }
359
360 fn force_flush(&self) -> OTelSdkResult {
361 LOGGER.debug("LambdaSpanProcessor.force_flush: flushing spans");
362 if let Ok(mut spans) = self.spans.lock() {
363 if spans.is_empty() {
364 return Ok(());
365 }
366
367 let mut exporter = self.exporter.lock().map_err(|_| {
368 OTelSdkError::InternalFailure(
369 "Failed to acquire exporter lock in force_flush".to_string(),
370 )
371 })?;
372
373 while !spans.is_empty() {
375 let batch = spans.take_batch(self.max_batch_size);
376 if !batch.is_empty() {
377 let result = futures_executor::block_on(exporter.export(batch));
378 if let Err(err) = &result {
379 LOGGER.debug(format!("LambdaSpanProcessor.force_flush.Error: {:?}", err));
380 return result;
381 }
382 }
383 }
384 Ok(())
385 } else {
386 Err(OTelSdkError::InternalFailure(
387 "Failed to acquire spans lock in force_flush".to_string(),
388 ))
389 }
390 }
391
392 fn shutdown(&self) -> OTelSdkResult {
393 self.is_shutdown.store(true, Ordering::Relaxed);
394 self.force_flush()?;
396 if let Ok(mut exporter) = self.exporter.lock() {
397 exporter.shutdown()
398 } else {
399 Err(OTelSdkError::InternalFailure(
400 "Failed to acquire exporter lock in shutdown".to_string(),
401 ))
402 }
403 }
404
405 fn set_resource(&mut self, resource: &Resource) {
406 if let Ok(mut exporter) = self.exporter.lock() {
407 exporter.set_resource(resource);
408 }
409 }
410}
411
412#[cfg(test)]
413mod tests {
414 use super::*;
415 use crate::logger::Logger;
416 use opentelemetry::{
417 trace::{SpanContext, SpanId, TraceFlags, TraceId, TraceState},
418 InstrumentationScope,
419 };
420 use opentelemetry_sdk::{
421 trace::SpanExporter,
422 trace::{SpanEvents, SpanLinks},
423 };
424 use serial_test::serial;
425 use std::{borrow::Cow, future::Future, pin::Pin, sync::Arc};
426 use tokio::sync::Mutex;
427
428 fn setup_test_logger() -> Logger {
429 Logger::new("test")
430 }
431
432 #[derive(Debug)]
434 struct MockExporter {
435 spans: Arc<Mutex<Vec<SpanData>>>,
436 }
437
438 impl MockExporter {
439 fn new() -> Self {
440 Self {
441 spans: Arc::new(Mutex::new(Vec::new())),
442 }
443 }
444 }
445
446 impl SpanExporter for MockExporter {
447 fn export(
448 &mut self,
449 batch: Vec<SpanData>,
450 ) -> Pin<Box<dyn Future<Output = OTelSdkResult> + Send>> {
451 let spans = self.spans.clone();
452 Box::pin(async move {
453 let mut spans = spans.lock().await;
454 spans.extend(batch);
455 Ok(())
456 })
457 }
458
459 fn shutdown(&mut self) -> OTelSdkResult {
460 Ok(())
461 }
462 }
463
464 fn create_test_span(name: &str) -> SpanData {
466 let flags = TraceFlags::default().with_sampled(true);
467
468 SpanData {
469 span_context: SpanContext::new(
470 TraceId::from_hex("01000000000000000000000000000000").unwrap(),
471 SpanId::from_hex("0100000000000001").unwrap(),
472 flags,
473 false,
474 TraceState::default(),
475 ),
476 parent_span_id: SpanId::INVALID,
477 span_kind: opentelemetry::trace::SpanKind::Internal,
478 name: Cow::Owned(name.to_string()),
479 start_time: std::time::SystemTime::now(),
480 end_time: std::time::SystemTime::now(),
481 attributes: Vec::new(),
482 dropped_attributes_count: 0,
483 events: SpanEvents::default(),
484 links: SpanLinks::default(),
485 status: opentelemetry::trace::Status::default(),
486 instrumentation_scope: InstrumentationScope::builder("test").build(),
487 }
488 }
489
490 fn cleanup_env() {
491 env::remove_var(env_vars::BATCH_SIZE);
492 env::remove_var(env_vars::QUEUE_SIZE);
493 env::remove_var(env_vars::PROCESSOR_MODE);
494 env::remove_var(env_vars::COMPRESSION_LEVEL);
495 env::remove_var(env_vars::SERVICE_NAME);
496 }
497
498 #[test]
499 #[serial]
500 fn test_ring_buffer_basic_operations() {
501 let mut buffer = SpanRingBuffer::new(2);
502
503 assert!(buffer.is_empty());
505 assert_eq!(buffer.take_batch(2), vec![]);
506
507 buffer.push(create_test_span("span1"));
509 buffer.push(create_test_span("span2"));
510
511 assert!(!buffer.is_empty());
512
513 let spans = buffer.take_batch(2);
515 assert_eq!(spans.len(), 2);
516 assert!(buffer.is_empty());
517 }
518
519 #[test]
520 #[serial]
521 fn test_ring_buffer_overflow() {
522 let mut buffer = SpanRingBuffer::new(2);
523
524 buffer.push(create_test_span("span1"));
526 buffer.push(create_test_span("span2"));
527
528 let success = buffer.push(create_test_span("span3"));
530 assert!(!success); let spans = buffer.take_batch(2);
533 assert_eq!(spans.len(), 2);
534 assert!(spans.iter().any(|s| s.name == "span1"));
535 assert!(spans.iter().any(|s| s.name == "span2"));
536 }
537
538 #[test]
539 #[serial]
540 fn test_ring_buffer_batch_operations() {
541 let mut buffer = SpanRingBuffer::new(5);
542
543 for i in 0..5 {
545 buffer.push(create_test_span(&format!("span{}", i)));
546 }
547
548 assert_eq!(buffer.take_batch(2).len(), 2);
549 assert_eq!(buffer.take_batch(2).len(), 2);
550 assert_eq!(buffer.take_batch(2).len(), 1);
551 assert!(buffer.is_empty());
552 }
553
554 #[tokio::test]
555 #[serial]
556 async fn test_processor_sync_mode() {
557 let _logger = setup_test_logger();
558 let mock_exporter = MockExporter::new();
559 let spans_exported = mock_exporter.spans.clone();
560
561 let processor = LambdaSpanProcessor::builder()
562 .exporter(mock_exporter)
563 .max_queue_size(10)
564 .max_batch_size(5)
565 .build();
566
567 processor.on_end(create_test_span("test_span"));
569
570 processor.force_flush().unwrap();
572
573 let exported = spans_exported.lock().await;
575 assert_eq!(exported.len(), 1);
576 assert_eq!(exported[0].name, "test_span");
577 }
578
579 #[tokio::test]
580 #[serial]
581 async fn test_shutdown_exports_remaining_spans() {
582 let _logger = setup_test_logger();
583 let mock_exporter = MockExporter::new();
584 let spans_exported = mock_exporter.spans.clone();
585
586 let processor = LambdaSpanProcessor::builder()
587 .exporter(mock_exporter)
588 .max_queue_size(10)
589 .max_batch_size(5)
590 .build();
591
592 processor.on_end(create_test_span("span1"));
594 processor.on_end(create_test_span("span2"));
595
596 processor.shutdown().unwrap();
598
599 let exported = spans_exported.lock().await;
601 assert_eq!(exported.len(), 2);
602
603 processor.on_end(create_test_span("span3"));
605 assert_eq!(exported.len(), 2); }
607
608 #[tokio::test]
609 #[serial]
610 async fn test_concurrent_span_processing() {
611 let _logger = setup_test_logger();
612 let mock_exporter = MockExporter::new();
613 let spans_exported = mock_exporter.spans.clone();
614
615 let processor = Arc::new(
616 LambdaSpanProcessor::builder()
617 .exporter(mock_exporter)
618 .max_queue_size(100)
619 .max_batch_size(25)
620 .build(),
621 );
622
623 let mut handles = Vec::new();
624
625 for i in 0..10 {
627 let processor = processor.clone();
628 handles.push(tokio::spawn(async move {
629 for j in 0..10 {
630 processor.on_end(create_test_span(&format!("span_{}_{}", i, j)));
631 }
632 }));
633 }
634
635 for handle in handles {
637 handle.await.unwrap();
638 }
639
640 processor.force_flush().unwrap();
642
643 let exported = spans_exported.lock().await;
644 assert_eq!(exported.len(), 100);
645 assert_eq!(processor.dropped_count.load(Ordering::Relaxed), 0);
646 }
647
648 #[test]
649 #[serial]
650 fn test_batch_processing() {
651 let _logger = setup_test_logger();
652 let mock_exporter = MockExporter::new();
653 let processor = LambdaSpanProcessor::builder()
654 .exporter(mock_exporter)
655 .max_queue_size(10)
656 .max_batch_size(3)
657 .build();
658
659 for i in 0..5 {
661 processor.on_end(create_test_span(&format!("span{}", i)));
662 }
663
664 processor.force_flush().unwrap();
666
667 processor.on_end(create_test_span("span5"));
669 processor.on_end(create_test_span("span6"));
670
671 processor.force_flush().unwrap();
673 }
674
675 #[test]
676 #[serial]
677 fn test_builder_default_values() {
678 cleanup_env();
679
680 let mock_exporter = MockExporter::new();
681
682 let processor = LambdaSpanProcessor::builder()
683 .exporter(mock_exporter)
684 .build();
685
686 assert_eq!(processor.max_batch_size, 512); assert_eq!(processor.spans.lock().unwrap().capacity, 2048); }
690
691 #[test]
692 #[serial]
693 fn test_builder_env_var_values() {
694 cleanup_env();
695
696 let mock_exporter = MockExporter::new();
697
698 env::set_var(env_vars::BATCH_SIZE, "100");
700 env::set_var(env_vars::QUEUE_SIZE, "1000");
701
702 let processor = LambdaSpanProcessor::builder()
703 .exporter(mock_exporter)
704 .build();
705
706 assert_eq!(processor.max_batch_size, 100);
708 assert_eq!(processor.spans.lock().unwrap().capacity, 1000);
709
710 cleanup_env();
711 }
712
713 #[test]
714 #[serial]
715 fn test_builder_env_var_precedence() {
716 cleanup_env();
717
718 let mock_exporter = MockExporter::new();
719
720 env::set_var(env_vars::BATCH_SIZE, "100");
722 env::set_var(env_vars::QUEUE_SIZE, "1000");
723
724 let processor = LambdaSpanProcessor::builder()
726 .exporter(mock_exporter)
727 .max_batch_size(50)
728 .max_queue_size(500)
729 .build();
730
731 assert_eq!(processor.max_batch_size, 100);
733 assert_eq!(processor.spans.lock().unwrap().capacity, 1000);
734
735 cleanup_env();
736 }
737
738 #[test]
739 #[serial]
740 fn test_invalid_env_vars() {
741 cleanup_env();
742
743 let mock_exporter = MockExporter::new();
744
745 env::set_var(env_vars::BATCH_SIZE, "not_a_number");
747 env::set_var(env_vars::QUEUE_SIZE, "invalid");
748
749 let processor = LambdaSpanProcessor::builder()
751 .exporter(mock_exporter)
752 .max_batch_size(50)
753 .max_queue_size(500)
754 .build();
755
756 assert_eq!(processor.max_batch_size, 50);
758 assert_eq!(processor.spans.lock().unwrap().capacity, 500);
759
760 cleanup_env();
761 }
762}