use crate::constants::{defaults, env_vars};
use crate::logger::Logger;
use bon::bon;
use opentelemetry::Context;
use opentelemetry_sdk::{
    error::{OTelSdkError, OTelSdkResult},
    trace::{Span, SpanData, SpanExporter, SpanProcessor},
    Resource,
};
use std::env;
use std::sync::{
    atomic::{AtomicBool, AtomicUsize, Ordering},
    Arc, Mutex,
};

static LOGGER: Logger = Logger::const_new("processor");

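/// Fixed-capacity ring buffer holding finished spans until they are exported.
/// `head` is the next write slot and `tail` the next read slot; `push` refuses
/// new spans once `size` reaches `capacity`, and `take_batch` drains spans in
/// FIFO order.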
#[derive(Debug)]
struct SpanRingBuffer {
    buffer: Vec<Option<SpanData>>,
    head: usize,
    tail: usize,
    size: usize,
    capacity: usize,
}

impl Default for SpanRingBuffer {
    fn default() -> Self {
        Self::new(2048)
    }
}

impl SpanRingBuffer {
    fn new(capacity: usize) -> Self {
        let mut buffer = Vec::with_capacity(capacity);
        buffer.extend((0..capacity).map(|_| None));
        Self {
            buffer,
            head: 0,
            tail: 0,
            size: 0,
            capacity,
        }
    }

    /// Adds a span to the buffer, returning `false` if the buffer is full.
    fn push(&mut self, span: SpanData) -> bool {
        if self.size == self.capacity {
            return false;
        }

        self.buffer[self.head] = Some(span);
        self.head = (self.head + 1) % self.capacity;
        self.size += 1;
        true
    }

    /// Removes and returns up to `max_batch_size` spans in FIFO order.
    fn take_batch(&mut self, max_batch_size: usize) -> Vec<SpanData> {
        let batch_size = self.size.min(max_batch_size);
        let mut result = Vec::with_capacity(batch_size);

        for _ in 0..batch_size {
            if let Some(span) = self.buffer[self.tail].take() {
                result.push(span);
            }
            self.tail = (self.tail + 1) % self.capacity;
            self.size -= 1;
        }

        // Reset the cursors once the buffer drains so slots are reused from the start.
        if self.size == 0 {
            self.head = 0;
            self.tail = 0;
        }

        result
    }
}

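/// A `SpanProcessor` that buffers finished spans in a bounded ring buffer and
/// exports them synchronously when `force_flush` or `shutdown` is called; there
/// is no background export thread.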
#[derive(Debug)]
pub struct LambdaSpanProcessor<E>
where
    E: SpanExporter + std::fmt::Debug,
{
    /// Exporter used to send batches of finished spans.
    exporter: Mutex<E>,

    /// Ring buffer of finished spans awaiting export.
    spans: Mutex<SpanRingBuffer>,

    /// Set once `shutdown` has been called; spans ended afterwards are dropped.
    is_shutdown: Arc<AtomicBool>,

    /// Number of spans dropped because the buffer was full or the processor was shut down.
    dropped_count: AtomicUsize,
}

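// The constructor is exposed through a `bon`-generated builder
// (`LambdaSpanProcessor::builder()`), as used in the tests below.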
#[bon]
impl<E> LambdaSpanProcessor<E>
where
    E: SpanExporter + std::fmt::Debug,
{
    /// Creates a new `LambdaSpanProcessor`. The queue size is taken from the
    /// environment variable named by `env_vars::QUEUE_SIZE` when it is set and
    /// parses as a `usize`; otherwise the builder's `max_queue_size` is used,
    /// falling back to `defaults::QUEUE_SIZE`.
    #[builder]
    pub fn new(exporter: E, max_queue_size: Option<usize>) -> Self {
        let max_queue_size = match env::var(env_vars::QUEUE_SIZE) {
            Ok(value) => match value.parse::<usize>() {
                Ok(size) => size,
                Err(_) => {
                    LOGGER.warn(format!(
                        "Failed to parse {}: {}, using fallback",
                        env_vars::QUEUE_SIZE,
                        value
                    ));
                    max_queue_size.unwrap_or(defaults::QUEUE_SIZE)
                }
            },
            Err(_) => max_queue_size.unwrap_or(defaults::QUEUE_SIZE),
        };

        Self {
            exporter: Mutex::new(exporter),
            spans: Mutex::new(SpanRingBuffer::new(max_queue_size)),
            is_shutdown: Arc::new(AtomicBool::new(false)),
            dropped_count: AtomicUsize::new(0),
        }
    }
}

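// SpanProcessor implementation: spans are buffered on `on_end` and exported
// synchronously (via `futures_executor::block_on`) when `force_flush` or
// `shutdown` runs, so no background worker is needed.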
impl<E> SpanProcessor for LambdaSpanProcessor<E>
where
    E: SpanExporter + std::fmt::Debug,
{
    fn on_start(&self, _span: &mut Span, _cx: &Context) {
        // Nothing to do when a span starts.
    }

    fn on_end(&self, span: SpanData) {
        if self.is_shutdown.load(Ordering::Relaxed) {
            LOGGER.warn("LambdaSpanProcessor.on_end: processor is shut down, dropping span");
            self.dropped_count.fetch_add(1, Ordering::Relaxed);
            return;
        }

        if !span.span_context.is_sampled() {
            return;
        }

        if let Ok(mut spans) = self.spans.lock() {
            if !spans.push(span) {
                // Log the first drop and then every hundredth one to avoid flooding the logs.
                let prev = self.dropped_count.fetch_add(1, Ordering::Relaxed);
                if prev == 0 || prev % 100 == 0 {
                    LOGGER.warn(format!(
                        "LambdaSpanProcessor.on_end: Dropping span because buffer is full (dropped_spans={})",
                        prev + 1
                    ));
                }
            }
        } else {
            LOGGER.warn("LambdaSpanProcessor.on_end: Failed to acquire spans lock in on_end");
        }
    }

    fn force_flush(&self) -> OTelSdkResult {
        LOGGER.debug("LambdaSpanProcessor.force_flush: flushing spans");

        // Drain everything currently buffered while holding the spans lock.
        let spans_result = self.spans.lock();
        let all_spans = match spans_result {
            Ok(mut spans) => {
                let current_size = spans.size;
                spans.take_batch(current_size)
            }
            Err(_) => {
                return Err(OTelSdkError::InternalFailure(
                    "Failed to acquire spans lock in force_flush".to_string(),
                ));
            }
        };

        // Export synchronously; the exporter lock serializes concurrent flushes.
        let exporter_result = self.exporter.lock();
        match exporter_result {
            Ok(exporter) => {
                let result = futures_executor::block_on(exporter.export(all_spans));

                if let Err(ref err) = result {
                    LOGGER.debug(format!(
                        "LambdaSpanProcessor.force_flush export error: {:?}",
                        err
                    ));
                }

                result
            }
            Err(_) => {
                Err(OTelSdkError::InternalFailure(
                    "Failed to acquire exporter lock in force_flush".to_string(),
                ))
            }
        }
    }

    fn shutdown(&self) -> OTelSdkResult {
        // Mark the processor as shut down, flush what is buffered, then shut the exporter down.
        self.is_shutdown.store(true, Ordering::Relaxed);
        self.force_flush()?;
        if let Ok(mut exporter) = self.exporter.lock() {
            exporter.shutdown()
        } else {
            Err(OTelSdkError::InternalFailure(
                "Failed to acquire exporter lock in shutdown".to_string(),
            ))
        }
    }

    fn set_resource(&mut self, resource: &Resource) {
        if let Ok(mut exporter) = self.exporter.lock() {
            exporter.set_resource(resource);
        }
    }
}

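// The tests use a MockExporter that records exported spans in memory, and
// `serial_test` keeps the environment-variable tests from interfering with
// each other.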
#[cfg(test)]
mod tests {
    use super::*;
    use crate::logger::Logger;
    use opentelemetry::{
        trace::{SpanContext, SpanId, TraceFlags, TraceId, TraceState},
        InstrumentationScope,
    };
    use opentelemetry_sdk::{
        trace::{SpanEvents, SpanExporter, SpanLinks},
    };
    use serial_test::serial;
    use std::{borrow::Cow, sync::Arc};
    use tokio::sync::Mutex;

    fn setup_test_logger() -> Logger {
        Logger::new("test")
    }

    /// Exporter that records exported spans in memory so tests can assert on them.
    #[derive(Debug)]
    struct MockExporter {
        spans: Arc<Mutex<Vec<SpanData>>>,
    }

    impl MockExporter {
        fn new() -> Self {
            Self {
                spans: Arc::new(Mutex::new(Vec::new())),
            }
        }
    }

    impl SpanExporter for MockExporter {
        fn export(
            &self,
            batch: Vec<SpanData>,
        ) -> impl std::future::Future<Output = opentelemetry_sdk::error::OTelSdkResult> + Send
        {
            let spans = self.spans.clone();
            Box::pin(async move {
                let mut spans = spans.lock().await;
                spans.extend(batch);
                Ok(())
            })
        }

        fn shutdown(&mut self) -> OTelSdkResult {
            Ok(())
        }
    }

    /// Builds a minimal sampled span with the given name for use in tests.
    fn create_test_span(name: &str) -> SpanData {
        let flags = TraceFlags::default().with_sampled(true);

        SpanData {
            span_context: SpanContext::new(
                TraceId::from_hex("01000000000000000000000000000000").unwrap(),
                SpanId::from_hex("0100000000000001").unwrap(),
                flags,
                false,
                TraceState::default(),
            ),
            parent_span_id: SpanId::INVALID,
            span_kind: opentelemetry::trace::SpanKind::Internal,
            name: Cow::Owned(name.to_string()),
            start_time: std::time::SystemTime::now(),
            end_time: std::time::SystemTime::now(),
            attributes: Vec::new(),
            dropped_attributes_count: 0,
            events: SpanEvents::default(),
            links: SpanLinks::default(),
            status: opentelemetry::trace::Status::default(),
            instrumentation_scope: InstrumentationScope::builder("test").build(),
        }
    }

    fn cleanup_env() {
        env::remove_var(env_vars::QUEUE_SIZE);
        env::remove_var(env_vars::PROCESSOR_MODE);
        env::remove_var(env_vars::COMPRESSION_LEVEL);
        env::remove_var(env_vars::SERVICE_NAME);
    }

    #[test]
    #[serial]
    fn test_ring_buffer_basic_operations() {
        let mut buffer = SpanRingBuffer::new(2);

        assert!(buffer.size == 0);
        assert_eq!(buffer.take_batch(2), vec![]);

        buffer.push(create_test_span("span1"));
        buffer.push(create_test_span("span2"));

        assert!(buffer.size != 0);

        let spans = buffer.take_batch(2);
        assert_eq!(spans.len(), 2);
        assert!(buffer.size == 0);
    }

    #[test]
    #[serial]
    fn test_ring_buffer_overflow() {
        let mut buffer = SpanRingBuffer::new(2);

        buffer.push(create_test_span("span1"));
        buffer.push(create_test_span("span2"));

        let success = buffer.push(create_test_span("span3"));
        assert!(!success);

        let spans = buffer.take_batch(2);
        assert_eq!(spans.len(), 2);
        assert!(spans.iter().any(|s| s.name == "span1"));
        assert!(spans.iter().any(|s| s.name == "span2"));
    }

    #[test]
    #[serial]
    fn test_ring_buffer_batch_operations() {
        let mut buffer = SpanRingBuffer::new(5);

        for i in 0..5 {
            buffer.push(create_test_span(&format!("span{}", i)));
        }

        assert_eq!(buffer.take_batch(2).len(), 2);
        assert_eq!(buffer.take_batch(2).len(), 2);
        assert_eq!(buffer.take_batch(2).len(), 1);
        assert!(buffer.size == 0);
    }

    #[tokio::test]
    #[serial]
    async fn test_processor_sync_mode() {
        let _logger = setup_test_logger();
        let mock_exporter = MockExporter::new();
        let spans_exported = mock_exporter.spans.clone();

        let processor = LambdaSpanProcessor::builder()
            .exporter(mock_exporter)
            .max_queue_size(10)
            .build();

        processor.on_end(create_test_span("test_span"));

        processor.force_flush().unwrap();

        let exported = spans_exported.lock().await;
        assert_eq!(exported.len(), 1);
        assert_eq!(exported[0].name, "test_span");
    }

    #[tokio::test]
    #[serial]
    async fn test_shutdown_exports_remaining_spans() {
        let _logger = setup_test_logger();
        let mock_exporter = MockExporter::new();
        let spans_exported = mock_exporter.spans.clone();

        let processor = LambdaSpanProcessor::builder()
            .exporter(mock_exporter)
            .max_queue_size(10)
            .build();

        processor.on_end(create_test_span("span1"));
        processor.on_end(create_test_span("span2"));

        processor.shutdown().unwrap();

        let exported = spans_exported.lock().await;
        assert_eq!(exported.len(), 2);

        // Spans ended after shutdown are dropped and never exported.
        processor.on_end(create_test_span("span3"));
        assert_eq!(exported.len(), 2);
    }

    #[tokio::test]
    #[serial]
    async fn test_concurrent_span_processing() {
        let _logger = setup_test_logger();
        let mock_exporter = MockExporter::new();
        let spans_exported = mock_exporter.spans.clone();

        let processor = Arc::new(
            LambdaSpanProcessor::builder()
                .exporter(mock_exporter)
                .max_queue_size(100)
                .build(),
        );

        let mut handles = Vec::new();

        // Ten tasks each end ten spans concurrently.
        for i in 0..10 {
            let processor = processor.clone();
            handles.push(tokio::spawn(async move {
                for j in 0..10 {
                    processor.on_end(create_test_span(&format!("span_{}_{}", i, j)));
                }
            }));
        }

        for handle in handles {
            handle.await.unwrap();
        }

        processor.force_flush().unwrap();

        let exported = spans_exported.lock().await;
        assert_eq!(exported.len(), 100);
        assert_eq!(processor.dropped_count.load(Ordering::Relaxed), 0);
    }

    #[test]
    #[serial]
    fn test_builder_default_values() {
        cleanup_env();

        let mock_exporter = MockExporter::new();

        let processor = LambdaSpanProcessor::builder()
            .exporter(mock_exporter)
            .build();

        assert_eq!(processor.spans.lock().unwrap().capacity, 2048);
    }

    #[test]
    #[serial]
    fn test_builder_env_var_values() {
        cleanup_env();

        let mock_exporter = MockExporter::new();

        env::set_var(env_vars::QUEUE_SIZE, "1000");

        let processor = LambdaSpanProcessor::builder()
            .exporter(mock_exporter)
            .build();

        assert_eq!(processor.spans.lock().unwrap().capacity, 1000);

        cleanup_env();
    }

    #[test]
    #[serial]
    fn test_builder_env_var_precedence() {
        cleanup_env();

        let mock_exporter = MockExporter::new();

        env::set_var(env_vars::QUEUE_SIZE, "1000");

        // The environment variable wins over the value passed to the builder.
        let processor = LambdaSpanProcessor::builder()
            .exporter(mock_exporter)
            .max_queue_size(500)
            .build();

        assert_eq!(processor.spans.lock().unwrap().capacity, 1000);

        cleanup_env();
    }

    #[test]
    #[serial]
    fn test_invalid_env_vars() {
        cleanup_env();

        let mock_exporter = MockExporter::new();

        env::set_var(env_vars::QUEUE_SIZE, "invalid");

        // An unparsable value falls back to the builder-provided queue size.
        let processor = LambdaSpanProcessor::builder()
            .exporter(mock_exporter)
            .max_queue_size(500)
            .build();

        assert_eq!(processor.spans.lock().unwrap().capacity, 500);

        cleanup_env();
    }
}