1use crate::logger::Logger;
107use bon::bon;
108
109static LOGGER: Logger = Logger::const_new("processor");
111
112use opentelemetry::Context;
113use opentelemetry_sdk::{
114 error::{OTelSdkError, OTelSdkResult},
115 trace::{Span, SpanProcessor},
116 trace::{SpanData, SpanExporter},
117 Resource,
118};
119use std::env;
120use std::sync::{
121 atomic::{AtomicBool, AtomicUsize, Ordering},
122 Arc, Mutex,
123};
124
125#[derive(Debug)]
148struct SpanRingBuffer {
149 buffer: Vec<Option<SpanData>>,
150 head: usize, tail: usize, size: usize, capacity: usize,
154}
155
156impl Default for SpanRingBuffer {
157 fn default() -> Self {
158 Self::new(2048) }
160}
161
162impl SpanRingBuffer {
163 fn new(capacity: usize) -> Self {
164 let mut buffer = Vec::with_capacity(capacity);
165 buffer.extend((0..capacity).map(|_| None));
166 Self {
167 buffer,
168 head: 0,
169 tail: 0,
170 size: 0,
171 capacity,
172 }
173 }
174
175 fn push(&mut self, span: SpanData) -> bool {
176 if self.size == self.capacity {
177 return false;
178 }
179
180 self.buffer[self.head] = Some(span);
181 self.head = (self.head + 1) % self.capacity;
182 self.size += 1;
183 true
184 }
185
186 fn take_batch(&mut self, max_batch_size: usize) -> Vec<SpanData> {
187 let batch_size = self.size.min(max_batch_size);
188 let mut result = Vec::with_capacity(batch_size);
189
190 for _ in 0..batch_size {
191 if let Some(span) = self.buffer[self.tail].take() {
192 result.push(span);
193 }
194 self.tail = (self.tail + 1) % self.capacity;
195 self.size -= 1;
196 }
197
198 if self.size == 0 {
199 self.head = 0;
200 self.tail = 0;
201 }
202
203 result
204 }
205
206 fn is_empty(&self) -> bool {
207 self.size == 0
208 }
209}
210
211#[derive(Debug)]
242pub struct LambdaSpanProcessor<E>
243where
244 E: SpanExporter + std::fmt::Debug,
245{
246 exporter: Mutex<E>,
248
249 spans: Mutex<SpanRingBuffer>,
251
252 is_shutdown: Arc<AtomicBool>,
254
255 dropped_count: AtomicUsize,
257
258 max_batch_size: usize,
260}
261
262#[bon]
263impl<E> LambdaSpanProcessor<E>
264where
265 E: SpanExporter + std::fmt::Debug,
266{
267 fn default_max_batch_size() -> usize {
269 env::var("LAMBDA_SPAN_PROCESSOR_BATCH_SIZE")
270 .ok()
271 .and_then(|s| s.parse().ok())
272 .unwrap_or(512)
273 }
274
275 fn default_max_queue_size() -> usize {
277 env::var("LAMBDA_SPAN_PROCESSOR_QUEUE_SIZE")
278 .ok()
279 .and_then(|s| s.parse().ok())
280 .unwrap_or(2048)
281 }
282
283 #[builder]
285 pub fn new(exporter: E, max_batch_size: Option<usize>, max_queue_size: Option<usize>) -> Self {
286 let max_batch_size = max_batch_size.unwrap_or_else(Self::default_max_batch_size);
287 let max_queue_size = max_queue_size.unwrap_or_else(Self::default_max_queue_size);
288
289 Self {
290 exporter: Mutex::new(exporter),
291 spans: Mutex::new(SpanRingBuffer::new(max_queue_size)),
292 is_shutdown: Arc::new(AtomicBool::new(false)),
293 dropped_count: AtomicUsize::new(0),
294 max_batch_size,
295 }
296 }
297}
298
299impl<E> SpanProcessor for LambdaSpanProcessor<E>
300where
301 E: SpanExporter + std::fmt::Debug,
302{
303 fn on_start(&self, _span: &mut Span, _cx: &Context) {
304 }
306
307 fn on_end(&self, span: SpanData) {
308 if self.is_shutdown.load(Ordering::Relaxed) {
309 LOGGER.warn("LambdaSpanProcessor.on_end: processor is shut down, dropping span");
310 self.dropped_count.fetch_add(1, Ordering::Relaxed);
311 return;
312 }
313
314 if !span.span_context.is_sampled() {
316 return;
317 }
318
319 if let Ok(mut spans) = self.spans.lock() {
321 if !spans.push(span) {
322 let prev = self.dropped_count.fetch_add(1, Ordering::Relaxed);
323 if prev == 0 || prev % 100 == 0 {
324 LOGGER.warn(format!(
325 "LambdaSpanProcessor.on_end: Dropping span because buffer is full (dropped_spans={})",
326 prev + 1
327 ));
328 }
329 }
330 } else {
331 LOGGER.warn("LambdaSpanProcessor.on_end: Failed to acquire spans lock in on_end");
332 }
333 }
334
335 fn force_flush(&self) -> OTelSdkResult {
336 LOGGER.debug("LambdaSpanProcessor.force_flush: flushing spans");
337 if let Ok(mut spans) = self.spans.lock() {
338 if spans.is_empty() {
339 return Ok(());
340 }
341
342 let mut exporter = self.exporter.lock().map_err(|_| {
343 OTelSdkError::InternalFailure(
344 "Failed to acquire exporter lock in force_flush".to_string(),
345 )
346 })?;
347
348 while !spans.is_empty() {
350 let batch = spans.take_batch(self.max_batch_size);
351 if !batch.is_empty() {
352 let result = futures_executor::block_on(exporter.export(batch));
353 if let Err(err) = &result {
354 LOGGER.debug(format!("LambdaSpanProcessor.force_flush.Error: {:?}", err));
355 return result;
356 }
357 }
358 }
359 Ok(())
360 } else {
361 Err(OTelSdkError::InternalFailure(
362 "Failed to acquire spans lock in force_flush".to_string(),
363 ))
364 }
365 }
366
367 fn shutdown(&self) -> OTelSdkResult {
368 self.is_shutdown.store(true, Ordering::Relaxed);
369 self.force_flush()?;
371 if let Ok(mut exporter) = self.exporter.lock() {
372 exporter.shutdown()
373 } else {
374 Err(OTelSdkError::InternalFailure(
375 "Failed to acquire exporter lock in shutdown".to_string(),
376 ))
377 }
378 }
379
380 fn set_resource(&mut self, resource: &Resource) {
381 if let Ok(mut exporter) = self.exporter.lock() {
382 exporter.set_resource(resource);
383 }
384 }
385}
386
387#[cfg(test)]
388mod tests {
389 use super::*;
390 use crate::logger::Logger;
391 use opentelemetry::{
392 trace::{SpanContext, SpanId, TraceFlags, TraceId, TraceState},
393 InstrumentationScope,
394 };
395 use opentelemetry_sdk::{
396 trace::SpanExporter,
397 trace::{SpanEvents, SpanLinks},
398 };
399 use serial_test::serial;
400 use std::{borrow::Cow, future::Future, pin::Pin, sync::Arc};
401 use tokio::sync::Mutex;
402
403 fn setup_test_logger() -> Logger {
404 Logger::new("test")
405 }
406
407 #[derive(Debug)]
409 struct MockExporter {
410 spans: Arc<Mutex<Vec<SpanData>>>,
411 }
412
413 impl MockExporter {
414 fn new() -> Self {
415 Self {
416 spans: Arc::new(Mutex::new(Vec::new())),
417 }
418 }
419 }
420
421 impl SpanExporter for MockExporter {
422 fn export(
423 &mut self,
424 batch: Vec<SpanData>,
425 ) -> Pin<Box<dyn Future<Output = OTelSdkResult> + Send>> {
426 let spans = self.spans.clone();
427 Box::pin(async move {
428 let mut spans = spans.lock().await;
429 spans.extend(batch);
430 Ok(())
431 })
432 }
433
434 fn shutdown(&mut self) -> OTelSdkResult {
435 Ok(())
436 }
437 }
438
439 fn create_test_span(name: &str) -> SpanData {
441 let flags = TraceFlags::default().with_sampled(true);
442
443 SpanData {
444 span_context: SpanContext::new(
445 TraceId::from_hex("01000000000000000000000000000000").unwrap(),
446 SpanId::from_hex("0100000000000001").unwrap(),
447 flags,
448 false,
449 TraceState::default(),
450 ),
451 parent_span_id: SpanId::INVALID,
452 span_kind: opentelemetry::trace::SpanKind::Internal,
453 name: Cow::Owned(name.to_string()),
454 start_time: std::time::SystemTime::now(),
455 end_time: std::time::SystemTime::now(),
456 attributes: Vec::new(),
457 dropped_attributes_count: 0,
458 events: SpanEvents::default(),
459 links: SpanLinks::default(),
460 status: opentelemetry::trace::Status::default(),
461 instrumentation_scope: InstrumentationScope::builder("test").build(),
462 }
463 }
464
465 #[test]
466 #[serial]
467 fn test_ring_buffer_basic_operations() {
468 let mut buffer = SpanRingBuffer::new(2);
469
470 assert!(buffer.is_empty());
472 assert_eq!(buffer.take_batch(2), vec![]);
473
474 buffer.push(create_test_span("span1"));
476 buffer.push(create_test_span("span2"));
477
478 assert!(!buffer.is_empty());
479
480 let spans = buffer.take_batch(2);
482 assert_eq!(spans.len(), 2);
483 assert!(buffer.is_empty());
484 }
485
486 #[test]
487 #[serial]
488 fn test_ring_buffer_overflow() {
489 let mut buffer = SpanRingBuffer::new(2);
490
491 buffer.push(create_test_span("span1"));
493 buffer.push(create_test_span("span2"));
494
495 let success = buffer.push(create_test_span("span3"));
497 assert!(!success); let spans = buffer.take_batch(2);
500 assert_eq!(spans.len(), 2);
501 assert!(spans.iter().any(|s| s.name == "span1"));
502 assert!(spans.iter().any(|s| s.name == "span2"));
503 }
504
505 #[test]
506 #[serial]
507 fn test_ring_buffer_batch_operations() {
508 let mut buffer = SpanRingBuffer::new(5);
509
510 for i in 0..5 {
512 buffer.push(create_test_span(&format!("span{}", i)));
513 }
514
515 assert_eq!(buffer.take_batch(2).len(), 2);
516 assert_eq!(buffer.take_batch(2).len(), 2);
517 assert_eq!(buffer.take_batch(2).len(), 1);
518 assert!(buffer.is_empty());
519 }
520
521 #[tokio::test]
522 #[serial]
523 async fn test_processor_sync_mode() {
524 let _logger = setup_test_logger();
525 let mock_exporter = MockExporter::new();
526 let spans_exported = mock_exporter.spans.clone();
527
528 let processor = LambdaSpanProcessor::builder()
529 .exporter(mock_exporter)
530 .max_queue_size(10)
531 .max_batch_size(5)
532 .build();
533
534 processor.on_end(create_test_span("test_span"));
536
537 processor.force_flush().unwrap();
539
540 let exported = spans_exported.lock().await;
542 assert_eq!(exported.len(), 1);
543 assert_eq!(exported[0].name, "test_span");
544 }
545
546 #[tokio::test]
547 #[serial]
548 async fn test_shutdown_exports_remaining_spans() {
549 let _logger = setup_test_logger();
550 let mock_exporter = MockExporter::new();
551 let spans_exported = mock_exporter.spans.clone();
552
553 let processor = LambdaSpanProcessor::builder()
554 .exporter(mock_exporter)
555 .max_queue_size(10)
556 .max_batch_size(5)
557 .build();
558
559 processor.on_end(create_test_span("span1"));
561 processor.on_end(create_test_span("span2"));
562
563 processor.shutdown().unwrap();
565
566 let exported = spans_exported.lock().await;
568 assert_eq!(exported.len(), 2);
569
570 processor.on_end(create_test_span("span3"));
572 assert_eq!(exported.len(), 2); }
574
575 #[tokio::test]
576 #[serial]
577 async fn test_concurrent_span_processing() {
578 let _logger = setup_test_logger();
579 let mock_exporter = MockExporter::new();
580 let spans_exported = mock_exporter.spans.clone();
581
582 let processor = Arc::new(
583 LambdaSpanProcessor::builder()
584 .exporter(mock_exporter)
585 .max_queue_size(100)
586 .max_batch_size(25)
587 .build(),
588 );
589
590 let mut handles = Vec::new();
591
592 for i in 0..10 {
594 let processor = processor.clone();
595 handles.push(tokio::spawn(async move {
596 for j in 0..10 {
597 processor.on_end(create_test_span(&format!("span_{}_{}", i, j)));
598 }
599 }));
600 }
601
602 for handle in handles {
604 handle.await.unwrap();
605 }
606
607 processor.force_flush().unwrap();
609
610 let exported = spans_exported.lock().await;
611 assert_eq!(exported.len(), 100);
612 assert_eq!(processor.dropped_count.load(Ordering::Relaxed), 0);
613 }
614
615 #[test]
616 #[serial]
617 fn test_batch_processing() {
618 let _logger = setup_test_logger();
619 let mock_exporter = MockExporter::new();
620 let processor = LambdaSpanProcessor::builder()
621 .exporter(mock_exporter)
622 .max_queue_size(10)
623 .max_batch_size(3)
624 .build();
625
626 for i in 0..5 {
628 processor.on_end(create_test_span(&format!("span{}", i)));
629 }
630
631 processor.force_flush().unwrap();
633
634 processor.on_end(create_test_span("span5"));
636 processor.on_end(create_test_span("span6"));
637
638 processor.force_flush().unwrap();
640 }
641
642 #[test]
643 #[serial]
644 fn test_builder_default_values() {
645 let mock_exporter = MockExporter::new();
646
647 env::remove_var("LAMBDA_SPAN_PROCESSOR_BATCH_SIZE");
649 env::remove_var("LAMBDA_SPAN_PROCESSOR_QUEUE_SIZE");
650
651 let processor = LambdaSpanProcessor::builder()
652 .exporter(mock_exporter)
653 .build();
654
655 assert_eq!(processor.max_batch_size, 512); assert_eq!(processor.spans.lock().unwrap().capacity, 2048); }
659
660 #[test]
661 #[serial]
662 fn test_builder_env_var_values() {
663 let mock_exporter = MockExporter::new();
664
665 env::set_var("LAMBDA_SPAN_PROCESSOR_BATCH_SIZE", "100");
667 env::set_var("LAMBDA_SPAN_PROCESSOR_QUEUE_SIZE", "1000");
668
669 let processor = LambdaSpanProcessor::builder()
670 .exporter(mock_exporter)
671 .build();
672
673 assert_eq!(processor.max_batch_size, 100);
675 assert_eq!(processor.spans.lock().unwrap().capacity, 1000);
676
677 env::remove_var("LAMBDA_SPAN_PROCESSOR_BATCH_SIZE");
679 env::remove_var("LAMBDA_SPAN_PROCESSOR_QUEUE_SIZE");
680 }
681
682 #[test]
683 #[serial]
684 fn test_builder_explicit_values_override_env() {
685 let mock_exporter = MockExporter::new();
686
687 env::set_var("LAMBDA_SPAN_PROCESSOR_BATCH_SIZE", "100");
689 env::set_var("LAMBDA_SPAN_PROCESSOR_QUEUE_SIZE", "1000");
690
691 let processor = LambdaSpanProcessor::builder()
692 .exporter(mock_exporter)
693 .max_batch_size(200)
694 .max_queue_size(2000)
695 .build();
696
697 assert_eq!(processor.max_batch_size, 200);
699 assert_eq!(processor.spans.lock().unwrap().capacity, 2000);
700
701 env::remove_var("LAMBDA_SPAN_PROCESSOR_BATCH_SIZE");
703 env::remove_var("LAMBDA_SPAN_PROCESSOR_QUEUE_SIZE");
704 }
705
706 #[test]
707 #[serial]
708 fn test_builder_invalid_env_vars() {
709 let mock_exporter = MockExporter::new();
710
711 env::set_var("LAMBDA_SPAN_PROCESSOR_BATCH_SIZE", "not_a_number");
713 env::set_var("LAMBDA_SPAN_PROCESSOR_QUEUE_SIZE", "also_not_a_number");
714
715 let processor = LambdaSpanProcessor::builder()
716 .exporter(mock_exporter)
717 .build();
718
719 assert_eq!(processor.max_batch_size, 512);
721 assert_eq!(processor.spans.lock().unwrap().capacity, 2048);
722
723 env::remove_var("LAMBDA_SPAN_PROCESSOR_BATCH_SIZE");
725 env::remove_var("LAMBDA_SPAN_PROCESSOR_QUEUE_SIZE");
726 }
727}