1use crate::constants::{defaults, env_vars};
107use crate::logger::Logger;
108use bon::bon;
109
110static LOGGER: Logger = Logger::const_new("processor");
112
113use opentelemetry::Context;
114use opentelemetry_sdk::{
115 error::{OTelSdkError, OTelSdkResult},
116 trace::{Span, SpanProcessor},
117 trace::{SpanData, SpanExporter},
118 Resource,
119};
120use std::env;
121use std::sync::{
122 atomic::{AtomicBool, AtomicUsize, Ordering},
123 Arc, Mutex,
124};
125
126#[derive(Debug)]
149struct SpanRingBuffer {
150 buffer: Vec<Option<SpanData>>,
151 head: usize, tail: usize, size: usize, capacity: usize,
155}
156
157impl Default for SpanRingBuffer {
158 fn default() -> Self {
159 Self::new(2048) }
161}
162
163impl SpanRingBuffer {
164 fn new(capacity: usize) -> Self {
165 let mut buffer = Vec::with_capacity(capacity);
166 buffer.extend((0..capacity).map(|_| None));
167 Self {
168 buffer,
169 head: 0,
170 tail: 0,
171 size: 0,
172 capacity,
173 }
174 }
175
176 fn push(&mut self, span: SpanData) -> bool {
177 if self.size == self.capacity {
178 return false;
179 }
180
181 self.buffer[self.head] = Some(span);
182 self.head = (self.head + 1) % self.capacity;
183 self.size += 1;
184 true
185 }
186
187 fn take_batch(&mut self, max_batch_size: usize) -> Vec<SpanData> {
188 let batch_size = self.size.min(max_batch_size);
189 let mut result = Vec::with_capacity(batch_size);
190
191 for _ in 0..batch_size {
192 if let Some(span) = self.buffer[self.tail].take() {
193 result.push(span);
194 }
195 self.tail = (self.tail + 1) % self.capacity;
196 self.size -= 1;
197 }
198
199 if self.size == 0 {
200 self.head = 0;
201 self.tail = 0;
202 }
203
204 result
205 }
206
207 fn is_empty(&self) -> bool {
208 self.size == 0
209 }
210}
211
212#[derive(Debug)]
243pub struct LambdaSpanProcessor<E>
244where
245 E: SpanExporter + std::fmt::Debug,
246{
247 exporter: Mutex<E>,
249
250 spans: Mutex<SpanRingBuffer>,
252
253 is_shutdown: Arc<AtomicBool>,
255
256 dropped_count: AtomicUsize,
258
259 max_batch_size: usize,
261}
262
263#[bon]
264impl<E> LambdaSpanProcessor<E>
265where
266 E: SpanExporter + std::fmt::Debug,
267{
268 #[builder]
281 pub fn new(exporter: E, max_batch_size: Option<usize>, max_queue_size: Option<usize>) -> Self {
282 let max_batch_size = match env::var(env_vars::BATCH_SIZE) {
284 Ok(value) => match value.parse::<usize>() {
285 Ok(size) => size,
286 Err(_) => {
287 LOGGER.warn(format!(
288 "Failed to parse {}: {}, using fallback",
289 env_vars::BATCH_SIZE,
290 value
291 ));
292 max_batch_size.unwrap_or(defaults::BATCH_SIZE)
293 }
294 },
295 Err(_) => max_batch_size.unwrap_or(defaults::BATCH_SIZE),
296 };
297
298 let max_queue_size = match env::var(env_vars::QUEUE_SIZE) {
300 Ok(value) => match value.parse::<usize>() {
301 Ok(size) => size,
302 Err(_) => {
303 LOGGER.warn(format!(
304 "Failed to parse {}: {}, using fallback",
305 env_vars::QUEUE_SIZE,
306 value
307 ));
308 max_queue_size.unwrap_or(defaults::QUEUE_SIZE)
309 }
310 },
311 Err(_) => max_queue_size.unwrap_or(defaults::QUEUE_SIZE),
312 };
313
314 Self {
315 exporter: Mutex::new(exporter),
316 spans: Mutex::new(SpanRingBuffer::new(max_queue_size)),
317 is_shutdown: Arc::new(AtomicBool::new(false)),
318 dropped_count: AtomicUsize::new(0),
319 max_batch_size,
320 }
321 }
322}
323
324impl<E> SpanProcessor for LambdaSpanProcessor<E>
325where
326 E: SpanExporter + std::fmt::Debug,
327{
328 fn on_start(&self, _span: &mut Span, _cx: &Context) {
329 }
331
332 fn on_end(&self, span: SpanData) {
333 if self.is_shutdown.load(Ordering::Relaxed) {
334 LOGGER.warn("LambdaSpanProcessor.on_end: processor is shut down, dropping span");
335 self.dropped_count.fetch_add(1, Ordering::Relaxed);
336 return;
337 }
338
339 if !span.span_context.is_sampled() {
341 return;
342 }
343
344 if let Ok(mut spans) = self.spans.lock() {
346 if !spans.push(span) {
347 let prev = self.dropped_count.fetch_add(1, Ordering::Relaxed);
348 if prev == 0 || prev % 100 == 0 {
349 LOGGER.warn(format!(
350 "LambdaSpanProcessor.on_end: Dropping span because buffer is full (dropped_spans={})",
351 prev + 1
352 ));
353 }
354 }
355 } else {
356 LOGGER.warn("LambdaSpanProcessor.on_end: Failed to acquire spans lock in on_end");
357 }
358 }
359
360 fn force_flush(&self) -> OTelSdkResult {
361 LOGGER.debug("LambdaSpanProcessor.force_flush: flushing spans");
362 if let Ok(mut spans) = self.spans.lock() {
363 if spans.is_empty() {
364 return Ok(());
365 }
366
367 let exporter = self.exporter.lock().map_err(|_| {
368 OTelSdkError::InternalFailure(
369 "Failed to acquire exporter lock in force_flush".to_string(),
370 )
371 })?;
372
373 while !spans.is_empty() {
375 let batch = spans.take_batch(self.max_batch_size);
376 if !batch.is_empty() {
377 let result = futures_executor::block_on(exporter.export(batch));
378 if let Err(err) = &result {
379 LOGGER.debug(format!("LambdaSpanProcessor.force_flush.Error: {:?}", err));
380 return result;
381 }
382 }
383 }
384 Ok(())
385 } else {
386 Err(OTelSdkError::InternalFailure(
387 "Failed to acquire spans lock in force_flush".to_string(),
388 ))
389 }
390 }
391
392 fn shutdown(&self) -> OTelSdkResult {
393 self.is_shutdown.store(true, Ordering::Relaxed);
394 self.force_flush()?;
396 if let Ok(mut exporter) = self.exporter.lock() {
397 exporter.shutdown()
398 } else {
399 Err(OTelSdkError::InternalFailure(
400 "Failed to acquire exporter lock in shutdown".to_string(),
401 ))
402 }
403 }
404
405 fn set_resource(&mut self, resource: &Resource) {
406 if let Ok(mut exporter) = self.exporter.lock() {
407 exporter.set_resource(resource);
408 }
409 }
410}
411
#[cfg(test)]
mod tests {
    use super::*;
    use crate::logger::Logger;
    use opentelemetry::{
        trace::{SpanContext, SpanId, TraceFlags, TraceId, TraceState},
        InstrumentationScope,
    };
    use opentelemetry_sdk::{
        trace::SpanExporter,
        trace::{SpanEvents, SpanLinks},
    };
    use serial_test::serial;
    use std::{borrow::Cow, sync::Arc};
    use tokio::sync::Mutex;

    fn setup_test_logger() -> Logger {
        Logger::new("test")
    }

    /// Exporter that records every exported span in a shared vector so tests
    /// can inspect what was exported.
    #[derive(Debug)]
    struct MockExporter {
        spans: Arc<Mutex<Vec<SpanData>>>,
    }

    impl MockExporter {
        fn new() -> Self {
            Self {
                spans: Arc::new(Mutex::new(Vec::new())),
            }
        }
    }

    impl SpanExporter for MockExporter {
        fn export(
            &self,
            batch: Vec<SpanData>,
        ) -> impl std::future::Future<Output = opentelemetry_sdk::error::OTelSdkResult> + Send
        {
            let spans = self.spans.clone();
            Box::pin(async move {
                let mut spans = spans.lock().await;
                spans.extend(batch);
                Ok(())
            })
        }

        fn shutdown(&mut self) -> OTelSdkResult {
            Ok(())
        }
    }

    /// Builds a sampled span with the given name and fixed trace/span ids.
    fn create_test_span(name: &str) -> SpanData {
        let flags = TraceFlags::default().with_sampled(true);

        SpanData {
            span_context: SpanContext::new(
                TraceId::from_hex("01000000000000000000000000000000").unwrap(),
                SpanId::from_hex("0100000000000001").unwrap(),
                flags,
                false,
                TraceState::default(),
            ),
            parent_span_id: SpanId::INVALID,
            span_kind: opentelemetry::trace::SpanKind::Internal,
            name: Cow::Owned(name.to_string()),
            start_time: std::time::SystemTime::now(),
            end_time: std::time::SystemTime::now(),
            attributes: Vec::new(),
            dropped_attributes_count: 0,
            events: SpanEvents::default(),
            links: SpanLinks::default(),
            status: opentelemetry::trace::Status::default(),
            instrumentation_scope: InstrumentationScope::builder("test").build(),
        }
    }

    /// Removes the environment variables that can override builder settings.
    fn cleanup_env() {
        env::remove_var(env_vars::BATCH_SIZE);
        env::remove_var(env_vars::QUEUE_SIZE);
        env::remove_var(env_vars::PROCESSOR_MODE);
        env::remove_var(env_vars::COMPRESSION_LEVEL);
        env::remove_var(env_vars::SERVICE_NAME);
    }

    #[test]
    #[serial]
    fn test_ring_buffer_basic_operations() {
        let mut buffer = SpanRingBuffer::new(2);

        assert!(buffer.is_empty());
        assert_eq!(buffer.take_batch(2), vec![]);

        buffer.push(create_test_span("span1"));
        buffer.push(create_test_span("span2"));

        assert!(!buffer.is_empty());

        let spans = buffer.take_batch(2);
        assert_eq!(spans.len(), 2);
        assert!(buffer.is_empty());
    }

    #[test]
    #[serial]
    fn test_ring_buffer_overflow() {
        let mut buffer = SpanRingBuffer::new(2);

        buffer.push(create_test_span("span1"));
        buffer.push(create_test_span("span2"));

        // The buffer is full, so the third span must be rejected.
        let success = buffer.push(create_test_span("span3"));
        assert!(!success);

        let spans = buffer.take_batch(2);
        assert_eq!(spans.len(), 2);
        assert!(spans.iter().any(|s| s.name == "span1"));
        assert!(spans.iter().any(|s| s.name == "span2"));
    }

    #[test]
    #[serial]
    fn test_ring_buffer_batch_operations() {
        let mut buffer = SpanRingBuffer::new(5);

        for i in 0..5 {
            buffer.push(create_test_span(&format!("span{}", i)));
        }

        assert_eq!(buffer.take_batch(2).len(), 2);
        assert_eq!(buffer.take_batch(2).len(), 2);
        assert_eq!(buffer.take_batch(2).len(), 1);
        assert!(buffer.is_empty());
    }

    #[tokio::test]
    #[serial]
    async fn test_processor_sync_mode() {
        // Environment overrides would take precedence over the builder values.
        cleanup_env();
        let _logger = setup_test_logger();
        let mock_exporter = MockExporter::new();
        let spans_exported = mock_exporter.spans.clone();

        let processor = LambdaSpanProcessor::builder()
            .exporter(mock_exporter)
            .max_queue_size(10)
            .max_batch_size(5)
            .build();

        processor.on_end(create_test_span("test_span"));

        processor.force_flush().unwrap();

        let exported = spans_exported.lock().await;
        assert_eq!(exported.len(), 1);
        assert_eq!(exported[0].name, "test_span");
    }

    #[tokio::test]
    #[serial]
    async fn test_shutdown_exports_remaining_spans() {
        cleanup_env();
        let _logger = setup_test_logger();
        let mock_exporter = MockExporter::new();
        let spans_exported = mock_exporter.spans.clone();

        let processor = LambdaSpanProcessor::builder()
            .exporter(mock_exporter)
            .max_queue_size(10)
            .max_batch_size(5)
            .build();

        processor.on_end(create_test_span("span1"));
        processor.on_end(create_test_span("span2"));

        processor.shutdown().unwrap();

        let exported = spans_exported.lock().await;
        assert_eq!(exported.len(), 2);

        // A span ending after shutdown is dropped and counted, never exported.
        processor.on_end(create_test_span("span3"));
        assert_eq!(exported.len(), 2);
        assert_eq!(processor.dropped_count.load(Ordering::Relaxed), 1);
    }

    #[tokio::test]
    #[serial]
    async fn test_concurrent_span_processing() {
        cleanup_env();
        let _logger = setup_test_logger();
        let mock_exporter = MockExporter::new();
        let spans_exported = mock_exporter.spans.clone();

        let processor = Arc::new(
            LambdaSpanProcessor::builder()
                .exporter(mock_exporter)
                .max_queue_size(100)
                .max_batch_size(25)
                .build(),
        );

        let mut handles = Vec::new();

        for i in 0..10 {
            let processor = processor.clone();
            handles.push(tokio::spawn(async move {
                for j in 0..10 {
                    processor.on_end(create_test_span(&format!("span_{}_{}", i, j)));
                }
            }));
        }

        for handle in handles {
            handle.await.unwrap();
        }

        processor.force_flush().unwrap();

        let exported = spans_exported.lock().await;
        assert_eq!(exported.len(), 100);
        assert_eq!(processor.dropped_count.load(Ordering::Relaxed), 0);
    }

    #[test]
    #[serial]
    fn test_batch_processing() {
        cleanup_env();
        let _logger = setup_test_logger();
        let mock_exporter = MockExporter::new();
        let spans_exported = mock_exporter.spans.clone();
        let processor = LambdaSpanProcessor::builder()
            .exporter(mock_exporter)
            .max_queue_size(10)
            .max_batch_size(3)
            .build();

        for i in 0..5 {
            processor.on_end(create_test_span(&format!("span{}", i)));
        }

        // Five spans with a batch size of three need two export calls.
        processor.force_flush().unwrap();
        // The guard is a temporary here, so the exporter can lock again below.
        assert_eq!(spans_exported.blocking_lock().len(), 5);

        processor.on_end(create_test_span("span5"));
        processor.on_end(create_test_span("span6"));

        processor.force_flush().unwrap();

        let exported = spans_exported.blocking_lock();
        let names: Vec<&str> = exported.iter().map(|s| &*s.name).collect();
        assert_eq!(
            names,
            vec!["span0", "span1", "span2", "span3", "span4", "span5", "span6"]
        );
        assert_eq!(processor.dropped_count.load(Ordering::Relaxed), 0);
    }

    #[test]
    #[serial]
    fn test_builder_default_values() {
        cleanup_env();

        let mock_exporter = MockExporter::new();

        let processor = LambdaSpanProcessor::builder()
            .exporter(mock_exporter)
            .build();

        assert_eq!(processor.max_batch_size, 512);
        assert_eq!(processor.spans.lock().unwrap().capacity, 2048);
    }

    #[test]
    #[serial]
    fn test_builder_env_var_values() {
        cleanup_env();

        let mock_exporter = MockExporter::new();

        env::set_var(env_vars::BATCH_SIZE, "100");
        env::set_var(env_vars::QUEUE_SIZE, "1000");

        let processor = LambdaSpanProcessor::builder()
            .exporter(mock_exporter)
            .build();

        assert_eq!(processor.max_batch_size, 100);
        assert_eq!(processor.spans.lock().unwrap().capacity, 1000);

        cleanup_env();
    }

    #[test]
    #[serial]
    fn test_builder_env_var_precedence() {
        cleanup_env();

        let mock_exporter = MockExporter::new();

        env::set_var(env_vars::BATCH_SIZE, "100");
        env::set_var(env_vars::QUEUE_SIZE, "1000");

        // Environment values must win over the explicit builder values.
        let processor = LambdaSpanProcessor::builder()
            .exporter(mock_exporter)
            .max_batch_size(50)
            .max_queue_size(500)
            .build();

        assert_eq!(processor.max_batch_size, 100);
        assert_eq!(processor.spans.lock().unwrap().capacity, 1000);

        cleanup_env();
    }

    #[test]
    #[serial]
    fn test_invalid_env_vars() {
        cleanup_env();

        let mock_exporter = MockExporter::new();

        env::set_var(env_vars::BATCH_SIZE, "not_a_number");
        env::set_var(env_vars::QUEUE_SIZE, "invalid");

        // Unparseable environment values fall back to the builder values.
        let processor = LambdaSpanProcessor::builder()
            .exporter(mock_exporter)
            .max_batch_size(50)
            .max_queue_size(500)
            .build();

        assert_eq!(processor.max_batch_size, 50);
        assert_eq!(processor.spans.lock().unwrap().capacity, 500);

        cleanup_env();
    }
}