1use crate::constants::{defaults, env_vars};
102use crate::logger::Logger;
103use bon::bon;
104
105static LOGGER: Logger = Logger::const_new("processor");
107
108use opentelemetry::Context;
109use opentelemetry_sdk::{
110 error::{OTelSdkError, OTelSdkResult},
111 trace::{Span, SpanProcessor},
112 trace::{SpanData, SpanExporter},
113 Resource,
114};
115use std::env;
116use std::sync::{
117 atomic::{AtomicBool, AtomicUsize, Ordering},
118 Arc, Mutex,
119};
120
121#[derive(Debug)]
144struct SpanRingBuffer {
145 buffer: Vec<Option<SpanData>>,
146 head: usize, tail: usize, size: usize, capacity: usize,
150}
151
152impl Default for SpanRingBuffer {
153 fn default() -> Self {
154 Self::new(2048) }
156}
157
158impl SpanRingBuffer {
159 fn new(capacity: usize) -> Self {
160 let mut buffer = Vec::with_capacity(capacity);
161 buffer.extend((0..capacity).map(|_| None));
162 Self {
163 buffer,
164 head: 0,
165 tail: 0,
166 size: 0,
167 capacity,
168 }
169 }
170
171 fn push(&mut self, span: SpanData) -> bool {
172 if self.size == self.capacity {
173 return false;
174 }
175
176 self.buffer[self.head] = Some(span);
177 self.head = (self.head + 1) % self.capacity;
178 self.size += 1;
179 true
180 }
181
182 fn take_batch(&mut self, max_batch_size: usize) -> Vec<SpanData> {
183 let batch_size = self.size.min(max_batch_size);
184 let mut result = Vec::with_capacity(batch_size);
185
186 for _ in 0..batch_size {
187 if let Some(span) = self.buffer[self.tail].take() {
188 result.push(span);
189 }
190 self.tail = (self.tail + 1) % self.capacity;
191 self.size -= 1;
192 }
193
194 if self.size == 0 {
195 self.head = 0;
196 self.tail = 0;
197 }
198
199 result
200 }
201}
202
203#[derive(Debug)]
234pub struct LambdaSpanProcessor<E>
235where
236 E: SpanExporter + std::fmt::Debug,
237{
238 exporter: Mutex<E>,
240
241 spans: Mutex<SpanRingBuffer>,
243
244 is_shutdown: Arc<AtomicBool>,
246
247 dropped_count: AtomicUsize,
249}
250
251#[bon]
252impl<E> LambdaSpanProcessor<E>
253where
254 E: SpanExporter + std::fmt::Debug,
255{
256 #[builder]
268 pub fn new(exporter: E, max_queue_size: Option<usize>) -> Self {
269 let max_queue_size = match env::var(env_vars::QUEUE_SIZE) {
271 Ok(value) => match value.parse::<usize>() {
272 Ok(size) => size,
273 Err(_) => {
274 LOGGER.warn(format!(
275 "Failed to parse {}: {}, using fallback",
276 env_vars::QUEUE_SIZE,
277 value
278 ));
279 max_queue_size.unwrap_or(defaults::QUEUE_SIZE)
280 }
281 },
282 Err(_) => max_queue_size.unwrap_or(defaults::QUEUE_SIZE),
283 };
284
285 Self {
286 exporter: Mutex::new(exporter),
287 spans: Mutex::new(SpanRingBuffer::new(max_queue_size)),
288 is_shutdown: Arc::new(AtomicBool::new(false)),
289 dropped_count: AtomicUsize::new(0),
290 }
291 }
292}
293
294impl<E> SpanProcessor for LambdaSpanProcessor<E>
295where
296 E: SpanExporter + std::fmt::Debug,
297{
298 fn on_start(&self, _span: &mut Span, _cx: &Context) {
299 }
301
302 fn on_end(&self, span: SpanData) {
303 if self.is_shutdown.load(Ordering::Relaxed) {
304 LOGGER.warn("LambdaSpanProcessor.on_end: processor is shut down, dropping span");
305 self.dropped_count.fetch_add(1, Ordering::Relaxed);
306 return;
307 }
308
309 if !span.span_context.is_sampled() {
311 return;
312 }
313
314 if let Ok(mut spans) = self.spans.lock() {
316 if !spans.push(span) {
317 let prev = self.dropped_count.fetch_add(1, Ordering::Relaxed);
318 if prev == 0 || prev % 100 == 0 {
319 LOGGER.warn(format!(
320 "LambdaSpanProcessor.on_end: Dropping span because buffer is full (dropped_spans={})",
321 prev + 1
322 ));
323 }
324 }
325 } else {
326 LOGGER.warn("LambdaSpanProcessor.on_end: Failed to acquire spans lock in on_end");
327 }
328 }
329
330 fn force_flush(&self) -> OTelSdkResult {
331 LOGGER.debug("LambdaSpanProcessor.force_flush: flushing spans");
332
333 let spans_result = self.spans.lock();
335 let all_spans = match spans_result {
336 Ok(mut spans) => {
337 let current_size = spans.size;
339 spans.take_batch(current_size)
341 }
342 Err(_) => {
343 return Err(OTelSdkError::InternalFailure(
345 "Failed to acquire spans lock in force_flush".to_string(),
346 ));
347 }
348 };
349 let exporter_result = self.exporter.lock();
353 match exporter_result {
354 Ok(exporter) => {
355 let result = futures_executor::block_on(exporter.export(all_spans));
358
359 if let Err(ref err) = result {
361 LOGGER.debug(format!(
362 "LambdaSpanProcessor.force_flush export error: {err:?}"
363 ));
364 }
365
366 result
368 }
369 Err(_) => {
370 Err(OTelSdkError::InternalFailure(
372 "Failed to acquire exporter lock in force_flush".to_string(),
373 ))
374 }
375 }
376 }
378
379 fn shutdown(&self) -> OTelSdkResult {
380 self.is_shutdown.store(true, Ordering::Relaxed);
381 self.force_flush()?;
383 if let Ok(mut exporter) = self.exporter.lock() {
384 exporter.shutdown()
385 } else {
386 Err(OTelSdkError::InternalFailure(
387 "Failed to acquire exporter lock in shutdown".to_string(),
388 ))
389 }
390 }
391
392 fn shutdown_with_timeout(&self, timeout: std::time::Duration) -> OTelSdkResult {
393 self.is_shutdown.store(true, Ordering::Relaxed);
394 self.force_flush()?;
396 if let Ok(mut exporter) = self.exporter.lock() {
397 exporter.shutdown_with_timeout(timeout)
398 } else {
399 Err(OTelSdkError::InternalFailure(
400 "Failed to acquire exporter lock in shutdown_with_timeout".to_string(),
401 ))
402 }
403 }
404
405 fn set_resource(&mut self, resource: &Resource) {
406 if let Ok(mut exporter) = self.exporter.lock() {
407 exporter.set_resource(resource);
408 }
409 }
410}
411
#[cfg(test)]
mod tests {
    use super::*;
    use crate::logger::Logger;
    use opentelemetry::{
        trace::{SpanContext, SpanId, TraceFlags, TraceId, TraceState},
        InstrumentationScope,
    };
    use opentelemetry_sdk::{
        trace::SpanExporter,
        trace::{SpanEvents, SpanLinks},
    };
    use serial_test::serial;
    use std::{borrow::Cow, sync::Arc};
    use tokio::sync::Mutex;

    /// Creates the logger instance used by the processor tests.
    fn setup_test_logger() -> Logger {
        Logger::new("test")
    }

    /// Exporter double that records every exported span in a shared vector.
    #[derive(Debug)]
    struct MockExporter {
        spans: Arc<Mutex<Vec<SpanData>>>,
    }

    impl MockExporter {
        /// Creates an exporter with an empty record of exported spans.
        fn new() -> Self {
            let spans = Arc::new(Mutex::new(Vec::new()));
            Self { spans }
        }
    }

    impl SpanExporter for MockExporter {
        /// Appends `batch` to the shared record and reports success.
        fn export(
            &self,
            batch: Vec<SpanData>,
        ) -> impl std::future::Future<Output = OTelSdkResult> + Send {
            let sink = Arc::clone(&self.spans);
            Box::pin(async move {
                sink.lock().await.extend(batch);
                Ok(())
            })
        }

        fn shutdown(&mut self) -> OTelSdkResult {
            Ok(())
        }
    }

    /// Builds a sampled, parentless span called `name` with fixed ids.
    fn create_test_span(name: &str) -> SpanData {
        let sampled = TraceFlags::default().with_sampled(true);
        let trace_id = TraceId::from_hex("01000000000000000000000000000000").unwrap();
        let span_id = SpanId::from_hex("0100000000000001").unwrap();
        let span_context =
            SpanContext::new(trace_id, span_id, sampled, false, TraceState::default());

        SpanData {
            span_context,
            parent_span_id: SpanId::INVALID,
            parent_span_is_remote: false,
            span_kind: opentelemetry::trace::SpanKind::Internal,
            name: Cow::Owned(name.to_string()),
            start_time: std::time::SystemTime::now(),
            end_time: std::time::SystemTime::now(),
            attributes: Vec::new(),
            dropped_attributes_count: 0,
            events: SpanEvents::default(),
            links: SpanLinks::default(),
            status: opentelemetry::trace::Status::default(),
            instrumentation_scope: InstrumentationScope::builder("test").build(),
        }
    }

    /// Clears the environment variables that could influence these tests.
    fn cleanup_env() {
        env::remove_var(env_vars::QUEUE_SIZE);
        env::remove_var(env_vars::PROCESSOR_MODE);
        env::remove_var(env_vars::COMPRESSION_LEVEL);
        env::remove_var(env_vars::SERVICE_NAME);
    }

    /// Push/drain round trip on a small buffer.
    #[test]
    #[serial]
    fn test_ring_buffer_basic_operations() {
        let mut ring = SpanRingBuffer::new(2);

        assert_eq!(ring.size, 0);
        assert!(ring.take_batch(2).is_empty());

        ring.push(create_test_span("span1"));
        ring.push(create_test_span("span2"));

        assert_ne!(ring.size, 0);

        let drained = ring.take_batch(2);
        assert_eq!(drained.len(), 2);
        assert_eq!(ring.size, 0);
    }

    /// A full buffer rejects new spans and keeps the ones it already holds.
    #[test]
    #[serial]
    fn test_ring_buffer_overflow() {
        let mut ring = SpanRingBuffer::new(2);

        ring.push(create_test_span("span1"));
        ring.push(create_test_span("span2"));

        let accepted = ring.push(create_test_span("span3"));
        assert!(!accepted);

        let drained = ring.take_batch(2);
        assert_eq!(drained.len(), 2);
        for expected in ["span1", "span2"] {
            assert!(drained.iter().any(|s| s.name == expected));
        }
    }

    /// Draining in batches returns at most the requested number of spans.
    #[test]
    #[serial]
    fn test_ring_buffer_batch_operations() {
        let mut ring = SpanRingBuffer::new(5);

        (0..5).for_each(|i| {
            ring.push(create_test_span(&format!("span{i}")));
        });

        for expected_len in [2, 2, 1] {
            assert_eq!(ring.take_batch(2).len(), expected_len);
        }
        assert_eq!(ring.size, 0);
    }

    /// A span that ended before `force_flush` reaches the exporter.
    #[tokio::test]
    #[serial]
    async fn test_processor_sync_mode() {
        let _logger = setup_test_logger();
        let exporter = MockExporter::new();
        let recorded = Arc::clone(&exporter.spans);

        let processor = LambdaSpanProcessor::builder()
            .exporter(exporter)
            .max_queue_size(10)
            .build();

        processor.on_end(create_test_span("test_span"));

        processor.force_flush().unwrap();

        let recorded = recorded.lock().await;
        assert_eq!(recorded.len(), 1);
        assert_eq!(recorded[0].name, "test_span");
    }

    /// `shutdown` flushes buffered spans; spans ending afterwards are not exported.
    #[tokio::test]
    #[serial]
    async fn test_shutdown_exports_remaining_spans() {
        let _logger = setup_test_logger();
        let exporter = MockExporter::new();
        let recorded = Arc::clone(&exporter.spans);

        let processor = LambdaSpanProcessor::builder()
            .exporter(exporter)
            .max_queue_size(10)
            .build();

        for name in ["span1", "span2"] {
            processor.on_end(create_test_span(name));
        }

        processor.shutdown().unwrap();

        let recorded = recorded.lock().await;
        assert_eq!(recorded.len(), 2);

        processor.on_end(create_test_span("span3"));
        assert_eq!(recorded.len(), 2);
    }

    /// Ten tasks ending ten spans each lose nothing when the queue is large enough.
    #[tokio::test]
    #[serial]
    async fn test_concurrent_span_processing() {
        let _logger = setup_test_logger();
        let exporter = MockExporter::new();
        let recorded = Arc::clone(&exporter.spans);

        let processor = Arc::new(
            LambdaSpanProcessor::builder()
                .exporter(exporter)
                .max_queue_size(100)
                .build(),
        );

        let workers: Vec<_> = (0..10)
            .map(|i| {
                let processor = Arc::clone(&processor);
                tokio::spawn(async move {
                    for j in 0..10 {
                        processor.on_end(create_test_span(&format!("span_{i}_{j}")));
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.await.unwrap();
        }

        processor.force_flush().unwrap();

        let recorded = recorded.lock().await;
        assert_eq!(recorded.len(), 100);
        assert_eq!(processor.dropped_count.load(Ordering::Relaxed), 0);
    }

    /// Without an argument or environment variable the default capacity applies.
    #[test]
    #[serial]
    fn test_builder_default_values() {
        cleanup_env();

        let processor = LambdaSpanProcessor::builder()
            .exporter(MockExporter::new())
            .build();

        let capacity = processor.spans.lock().unwrap().capacity;
        assert_eq!(capacity, 2048);
    }

    /// The queue size environment variable sets the capacity.
    #[test]
    #[serial]
    fn test_builder_env_var_values() {
        cleanup_env();

        let exporter = MockExporter::new();

        env::set_var(env_vars::QUEUE_SIZE, "1000");

        let processor = LambdaSpanProcessor::builder().exporter(exporter).build();

        let capacity = processor.spans.lock().unwrap().capacity;
        assert_eq!(capacity, 1000);

        cleanup_env();
    }

    /// The environment variable wins over an explicit builder argument.
    #[test]
    #[serial]
    fn test_builder_env_var_precedence() {
        cleanup_env();

        let exporter = MockExporter::new();

        env::set_var(env_vars::QUEUE_SIZE, "1000");

        let processor = LambdaSpanProcessor::builder()
            .exporter(exporter)
            .max_queue_size(500)
            .build();

        let capacity = processor.spans.lock().unwrap().capacity;
        assert_eq!(capacity, 1000);

        cleanup_env();
    }

    /// An unparsable environment value falls back to the builder argument.
    #[test]
    #[serial]
    fn test_invalid_env_vars() {
        cleanup_env();

        let exporter = MockExporter::new();

        env::set_var(env_vars::QUEUE_SIZE, "invalid");

        let processor = LambdaSpanProcessor::builder()
            .exporter(exporter)
            .max_queue_size(500)
            .build();

        let capacity = processor.spans.lock().unwrap().capacity;
        assert_eq!(capacity, 500);

        cleanup_env();
    }
}