1use crate::constants::{defaults, env_vars};
102use crate::logger::Logger;
103use bon::bon;
104
105static LOGGER: Logger = Logger::const_new("processor");
107
108use opentelemetry::Context;
109use opentelemetry_sdk::{
110 error::{OTelSdkError, OTelSdkResult},
111 trace::{Span, SpanProcessor},
112 trace::{SpanData, SpanExporter},
113 Resource,
114};
115use std::env;
116use std::sync::{
117 atomic::{AtomicBool, AtomicUsize, Ordering},
118 Arc, Mutex,
119};
120
121#[derive(Debug)]
144struct SpanRingBuffer {
145 buffer: Vec<Option<SpanData>>,
146 head: usize, tail: usize, size: usize, capacity: usize,
150}
151
152impl Default for SpanRingBuffer {
153 fn default() -> Self {
154 Self::new(2048) }
156}
157
158impl SpanRingBuffer {
159 fn new(capacity: usize) -> Self {
160 let mut buffer = Vec::with_capacity(capacity);
161 buffer.extend((0..capacity).map(|_| None));
162 Self {
163 buffer,
164 head: 0,
165 tail: 0,
166 size: 0,
167 capacity,
168 }
169 }
170
171 fn push(&mut self, span: SpanData) -> bool {
172 if self.size == self.capacity {
173 return false;
174 }
175
176 self.buffer[self.head] = Some(span);
177 self.head = (self.head + 1) % self.capacity;
178 self.size += 1;
179 true
180 }
181
182 fn take_batch(&mut self, max_batch_size: usize) -> Vec<SpanData> {
183 let batch_size = self.size.min(max_batch_size);
184 let mut result = Vec::with_capacity(batch_size);
185
186 for _ in 0..batch_size {
187 if let Some(span) = self.buffer[self.tail].take() {
188 result.push(span);
189 }
190 self.tail = (self.tail + 1) % self.capacity;
191 self.size -= 1;
192 }
193
194 if self.size == 0 {
195 self.head = 0;
196 self.tail = 0;
197 }
198
199 result
200 }
201}
202
203#[derive(Debug)]
234pub struct LambdaSpanProcessor<E>
235where
236 E: SpanExporter + std::fmt::Debug,
237{
238 exporter: Mutex<E>,
240
241 spans: Mutex<SpanRingBuffer>,
243
244 is_shutdown: Arc<AtomicBool>,
246
247 dropped_count: AtomicUsize,
249}
250
251#[bon]
252impl<E> LambdaSpanProcessor<E>
253where
254 E: SpanExporter + std::fmt::Debug,
255{
256 #[builder]
268 pub fn new(exporter: E, max_queue_size: Option<usize>) -> Self {
269 let max_queue_size = match env::var(env_vars::QUEUE_SIZE) {
271 Ok(value) => match value.parse::<usize>() {
272 Ok(size) => size,
273 Err(_) => {
274 LOGGER.warn(format!(
275 "Failed to parse {}: {}, using fallback",
276 env_vars::QUEUE_SIZE,
277 value
278 ));
279 max_queue_size.unwrap_or(defaults::QUEUE_SIZE)
280 }
281 },
282 Err(_) => max_queue_size.unwrap_or(defaults::QUEUE_SIZE),
283 };
284
285 Self {
286 exporter: Mutex::new(exporter),
287 spans: Mutex::new(SpanRingBuffer::new(max_queue_size)),
288 is_shutdown: Arc::new(AtomicBool::new(false)),
289 dropped_count: AtomicUsize::new(0),
290 }
291 }
292}
293
294impl<E> SpanProcessor for LambdaSpanProcessor<E>
295where
296 E: SpanExporter + std::fmt::Debug,
297{
298 fn on_start(&self, _span: &mut Span, _cx: &Context) {
299 }
301
302 fn on_end(&self, span: SpanData) {
303 if self.is_shutdown.load(Ordering::Relaxed) {
304 LOGGER.warn("LambdaSpanProcessor.on_end: processor is shut down, dropping span");
305 self.dropped_count.fetch_add(1, Ordering::Relaxed);
306 return;
307 }
308
309 if !span.span_context.is_sampled() {
311 return;
312 }
313
314 if let Ok(mut spans) = self.spans.lock() {
316 if !spans.push(span) {
317 let prev = self.dropped_count.fetch_add(1, Ordering::Relaxed);
318 if prev == 0 || prev % 100 == 0 {
319 LOGGER.warn(format!(
320 "LambdaSpanProcessor.on_end: Dropping span because buffer is full (dropped_spans={})",
321 prev + 1
322 ));
323 }
324 }
325 } else {
326 LOGGER.warn("LambdaSpanProcessor.on_end: Failed to acquire spans lock in on_end");
327 }
328 }
329
330 fn force_flush(&self) -> OTelSdkResult {
331 LOGGER.debug("LambdaSpanProcessor.force_flush: flushing spans");
332
333 let spans_result = self.spans.lock();
335 let all_spans = match spans_result {
336 Ok(mut spans) => {
337 let current_size = spans.size;
339 spans.take_batch(current_size)
341 }
342 Err(_) => {
343 return Err(OTelSdkError::InternalFailure(
345 "Failed to acquire spans lock in force_flush".to_string(),
346 ));
347 }
348 };
349 let exporter_result = self.exporter.lock();
353 match exporter_result {
354 Ok(exporter) => {
355 let result = futures_executor::block_on(exporter.export(all_spans));
358
359 if let Err(ref err) = result {
361 LOGGER.debug(format!(
362 "LambdaSpanProcessor.force_flush export error: {err:?}"
363 ));
364 }
365
366 result
368 }
369 Err(_) => {
370 Err(OTelSdkError::InternalFailure(
372 "Failed to acquire exporter lock in force_flush".to_string(),
373 ))
374 }
375 }
376 }
378
379 fn shutdown(&self) -> OTelSdkResult {
380 self.is_shutdown.store(true, Ordering::Relaxed);
381 self.force_flush()?;
383 if let Ok(mut exporter) = self.exporter.lock() {
384 exporter.shutdown()
385 } else {
386 Err(OTelSdkError::InternalFailure(
387 "Failed to acquire exporter lock in shutdown".to_string(),
388 ))
389 }
390 }
391
392 fn shutdown_with_timeout(&self, timeout: std::time::Duration) -> OTelSdkResult {
393 self.is_shutdown.store(true, Ordering::Relaxed);
394 self.force_flush()?;
396 if let Ok(mut exporter) = self.exporter.lock() {
397 exporter.shutdown_with_timeout(timeout)
398 } else {
399 Err(OTelSdkError::InternalFailure(
400 "Failed to acquire exporter lock in shutdown_with_timeout".to_string(),
401 ))
402 }
403 }
404
405 fn set_resource(&mut self, resource: &Resource) {
406 if let Ok(mut exporter) = self.exporter.lock() {
407 exporter.set_resource(resource);
408 }
409 }
410}
411
412#[cfg(test)]
413mod tests {
414 use super::*;
415 use crate::logger::Logger;
416 use opentelemetry::{
417 trace::{SpanContext, SpanId, TraceFlags, TraceId, TraceState},
418 InstrumentationScope,
419 };
420 use opentelemetry_sdk::{
421 trace::SpanExporter,
422 trace::{SpanEvents, SpanLinks},
423 };
424 use serial_test::serial;
425 use std::{borrow::Cow, sync::Arc};
426 use tokio::sync::Mutex;
427
428 fn setup_test_logger() -> Logger {
429 Logger::new("test")
430 }
431
432 #[derive(Debug)]
434 struct MockExporter {
435 spans: Arc<Mutex<Vec<SpanData>>>,
436 }
437
438 impl MockExporter {
439 fn new() -> Self {
440 Self {
441 spans: Arc::new(Mutex::new(Vec::new())),
442 }
443 }
444 }
445
446 impl SpanExporter for MockExporter {
447 fn export(
448 &self,
449 batch: Vec<SpanData>,
450 ) -> impl std::future::Future<Output = opentelemetry_sdk::error::OTelSdkResult> + Send
451 {
452 let spans = self.spans.clone();
453 Box::pin(async move {
454 let mut spans = spans.lock().await;
455 spans.extend(batch);
456 Ok(())
457 })
458 }
459
460 fn shutdown(&mut self) -> OTelSdkResult {
461 Ok(())
462 }
463 }
464
465 fn create_test_span(name: &str) -> SpanData {
467 let flags = TraceFlags::default().with_sampled(true);
468
469 SpanData {
470 span_context: SpanContext::new(
471 TraceId::from_hex("01000000000000000000000000000000").unwrap(),
472 SpanId::from_hex("0100000000000001").unwrap(),
473 flags,
474 false,
475 TraceState::default(),
476 ),
477 parent_span_id: SpanId::INVALID,
478 span_kind: opentelemetry::trace::SpanKind::Internal,
479 name: Cow::Owned(name.to_string()),
480 start_time: std::time::SystemTime::now(),
481 end_time: std::time::SystemTime::now(),
482 attributes: Vec::new(),
483 dropped_attributes_count: 0,
484 events: SpanEvents::default(),
485 links: SpanLinks::default(),
486 status: opentelemetry::trace::Status::default(),
487 instrumentation_scope: InstrumentationScope::builder("test").build(),
488 }
489 }
490
491 fn cleanup_env() {
492 env::remove_var(env_vars::QUEUE_SIZE);
493 env::remove_var(env_vars::PROCESSOR_MODE);
494 env::remove_var(env_vars::COMPRESSION_LEVEL);
495 env::remove_var(env_vars::SERVICE_NAME);
496 }
497
498 #[test]
499 #[serial]
500 fn test_ring_buffer_basic_operations() {
501 let mut buffer = SpanRingBuffer::new(2);
502
503 assert!(buffer.size == 0);
505 assert_eq!(buffer.take_batch(2), vec![]);
506
507 buffer.push(create_test_span("span1"));
509 buffer.push(create_test_span("span2"));
510
511 assert!(buffer.size != 0);
512
513 let spans = buffer.take_batch(2);
515 assert_eq!(spans.len(), 2);
516 assert!(buffer.size == 0);
517 }
518
519 #[test]
520 #[serial]
521 fn test_ring_buffer_overflow() {
522 let mut buffer = SpanRingBuffer::new(2);
523
524 buffer.push(create_test_span("span1"));
526 buffer.push(create_test_span("span2"));
527
528 let success = buffer.push(create_test_span("span3"));
530 assert!(!success); let spans = buffer.take_batch(2);
533 assert_eq!(spans.len(), 2);
534 assert!(spans.iter().any(|s| s.name == "span1"));
535 assert!(spans.iter().any(|s| s.name == "span2"));
536 }
537
538 #[test]
539 #[serial]
540 fn test_ring_buffer_batch_operations() {
541 let mut buffer = SpanRingBuffer::new(5);
542
543 for i in 0..5 {
545 buffer.push(create_test_span(&format!("span{i}")));
546 }
547
548 assert_eq!(buffer.take_batch(2).len(), 2);
549 assert_eq!(buffer.take_batch(2).len(), 2);
550 assert_eq!(buffer.take_batch(2).len(), 1);
551 assert!(buffer.size == 0);
552 }
553
554 #[tokio::test]
555 #[serial]
556 async fn test_processor_sync_mode() {
557 let _logger = setup_test_logger();
558 let mock_exporter = MockExporter::new();
559 let spans_exported = mock_exporter.spans.clone();
560
561 let processor = LambdaSpanProcessor::builder()
562 .exporter(mock_exporter)
563 .max_queue_size(10)
564 .build();
565
566 processor.on_end(create_test_span("test_span"));
568
569 processor.force_flush().unwrap();
571
572 let exported = spans_exported.lock().await;
574 assert_eq!(exported.len(), 1);
575 assert_eq!(exported[0].name, "test_span");
576 }
577
578 #[tokio::test]
579 #[serial]
580 async fn test_shutdown_exports_remaining_spans() {
581 let _logger = setup_test_logger();
582 let mock_exporter = MockExporter::new();
583 let spans_exported = mock_exporter.spans.clone();
584
585 let processor = LambdaSpanProcessor::builder()
586 .exporter(mock_exporter)
587 .max_queue_size(10)
588 .build();
589
590 processor.on_end(create_test_span("span1"));
592 processor.on_end(create_test_span("span2"));
593
594 processor.shutdown().unwrap();
596
597 let exported = spans_exported.lock().await;
599 assert_eq!(exported.len(), 2);
600
601 processor.on_end(create_test_span("span3"));
603 assert_eq!(exported.len(), 2); }
605
606 #[tokio::test]
607 #[serial]
608 async fn test_concurrent_span_processing() {
609 let _logger = setup_test_logger();
610 let mock_exporter = MockExporter::new();
611 let spans_exported = mock_exporter.spans.clone();
612
613 let processor = Arc::new(
614 LambdaSpanProcessor::builder()
615 .exporter(mock_exporter)
616 .max_queue_size(100)
617 .build(),
618 );
619
620 let mut handles = Vec::new();
621
622 for i in 0..10 {
624 let processor = processor.clone();
625 handles.push(tokio::spawn(async move {
626 for j in 0..10 {
627 processor.on_end(create_test_span(&format!("span_{i}_{j}")));
628 }
629 }));
630 }
631
632 for handle in handles {
634 handle.await.unwrap();
635 }
636
637 processor.force_flush().unwrap();
639
640 let exported = spans_exported.lock().await;
641 assert_eq!(exported.len(), 100);
642 assert_eq!(processor.dropped_count.load(Ordering::Relaxed), 0);
643 }
644
645 #[test]
646 #[serial]
647 fn test_builder_default_values() {
648 cleanup_env();
649
650 let mock_exporter = MockExporter::new();
651
652 let processor = LambdaSpanProcessor::builder()
653 .exporter(mock_exporter)
654 .build();
655
656 assert_eq!(processor.spans.lock().unwrap().capacity, 2048); }
659
660 #[test]
661 #[serial]
662 fn test_builder_env_var_values() {
663 cleanup_env();
664
665 let mock_exporter = MockExporter::new();
666
667 env::set_var(env_vars::QUEUE_SIZE, "1000");
669
670 let processor = LambdaSpanProcessor::builder()
671 .exporter(mock_exporter)
672 .build();
673
674 assert_eq!(processor.spans.lock().unwrap().capacity, 1000);
676
677 cleanup_env();
678 }
679
680 #[test]
681 #[serial]
682 fn test_builder_env_var_precedence() {
683 cleanup_env();
684
685 let mock_exporter = MockExporter::new();
686
687 env::set_var(env_vars::QUEUE_SIZE, "1000");
689
690 let processor = LambdaSpanProcessor::builder()
692 .exporter(mock_exporter)
693 .max_queue_size(500)
694 .build();
695
696 assert_eq!(processor.spans.lock().unwrap().capacity, 1000);
698
699 cleanup_env();
700 }
701
702 #[test]
703 #[serial]
704 fn test_invalid_env_vars() {
705 cleanup_env();
706
707 let mock_exporter = MockExporter::new();
708
709 env::set_var(env_vars::QUEUE_SIZE, "invalid");
711
712 let processor = LambdaSpanProcessor::builder()
714 .exporter(mock_exporter)
715 .max_queue_size(500)
716 .build();
717
718 assert_eq!(processor.spans.lock().unwrap().capacity, 500);
720
721 cleanup_env();
722 }
723}