lambda_otel_lite/
processor.rs

1//! Span processor implementation optimized for AWS Lambda functions.
2//!
3//! This module provides a Lambda-optimized span processor that efficiently manages OpenTelemetry spans
4//! in a serverless environment. It uses a ring buffer to store spans in memory and provides efficient
5//! batch processing capabilities.
6//!
7//! # Architecture
8//!
9//! The processor is designed specifically for the Lambda execution environment:
10//!
11//! 1. **Ring Buffer Storage**:
12//!    - Fixed-size circular buffer prevents memory growth
13//!    - O(1) push operations with no memory reallocation
14//!    - FIFO ordering ensures spans are processed in order
15//!    - Efficient batch removal for export
16//!    - When full, new spans are dropped (with warning logs)
17//!
18//! 2. **Thread Safety**:
19//!    - All operations are thread-safe
20//!    - Uses Mutex for span buffer access
21//!    - Atomic operations for state management
22//!    - Safe for concurrent span submission
23//!
24//! # Configuration
25//!
26//! The processor can be configured through environment variables:
27//!
28//! - `LAMBDA_SPAN_PROCESSOR_QUEUE_SIZE`: Controls buffer size
29//!   - Defaults to 2048 spans
30//!   - Should be tuned based on span volume
31//!
32//! - `LAMBDA_SPAN_PROCESSOR_BATCH_SIZE`: Controls batch size
33//!   - Defaults to 512 spans
34//!   - Should be tuned based on span volume
35//!
36//! # Usage Examples
37//!
38//! Basic setup with default configuration:
39//!
40//! ```no_run
41//! use lambda_otel_lite::LambdaSpanProcessor;
42//! use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
43//!
44//! let processor = LambdaSpanProcessor::builder()
45//!     .exporter(OtlpStdoutSpanExporter::default())
46//!     .build();
47//! ```
48//!
49//! Using with an OTLP HTTP exporter:
50//!
51//! ```no_run
52//! use lambda_otel_lite::LambdaSpanProcessor;
53//! use opentelemetry_otlp::{SpanExporter, Protocol};
54//! use opentelemetry_otlp::{WithExportConfig, WithHttpConfig};
55//!
56//! // Important: When using HTTP exporters, always use reqwest::blocking::Client
57//! // Using async clients will cause deadlocks
58//! let exporter = SpanExporter::builder()
59//!     .with_http()
60//!     .with_http_client(reqwest::blocking::Client::new())
61//!     .with_protocol(Protocol::HttpBinary)
62//!     .build()
63//!     .expect("Failed to create exporter");
64//!
65//! let processor = LambdaSpanProcessor::builder()
66//!     .exporter(exporter)
67//!     .max_queue_size(4096)
68//!     .max_batch_size(1024)
69//!     .build();
70//! ```
71//!
72//! # Performance Considerations
73//!
74//! 1. **Memory Usage**:
75//!    - Fixed memory footprint based on queue size
76//!    - Each span typically uses 100-500 bytes
//!    - Default 2048 spans ≈ 0.2-1 MB memory
78//!
79//! 2. **Latency Impact**:
80//!    - Batch processing reduces network overhead
81//!    - Configurable batch size allows tuning for your use case
82//!    - Force flush available for immediate export when needed
83//!
84//! 3. **Reliability**:
85//!    - Spans may be dropped if buffer fills
86//!    - Warning logs indicate dropped spans
87//!    - Consider increasing buffer size if spans are dropped
88//!
89//! # Best Practices
90//!
91//! 1. **Buffer Sizing**:
//!    - Monitor the `dropped_spans` count reported in warning logs
93//!    - Size based on max spans per invocation
94//!    - Consider function memory when sizing
95//!
96//! 2. **Batch Configuration**:
97//!    - Larger batches improve throughput but increase memory usage
98//!    - Smaller batches reduce memory but increase network overhead
99//!    - Default values work well for most use cases
100//!
101//! 3. **Error Handling**:
102//!    - Export errors are logged but don't fail function
103//!    - Monitor for export failures in logs
104//!    - Consider retry strategies in custom exporters
105
106use crate::constants::{defaults, env_vars};
107use crate::logger::Logger;
108use bon::bon;
109
/// Module-specific logger.
///
/// Every log line emitted from this file goes through this instance. It is constructed
/// with the name `"processor"` (presumably used to tag its output — confirm in
/// `crate::logger`).
static LOGGER: Logger = Logger::const_new("processor");
112
113use opentelemetry::Context;
114use opentelemetry_sdk::{
115    error::{OTelSdkError, OTelSdkResult},
116    trace::{Span, SpanProcessor},
117    trace::{SpanData, SpanExporter},
118    Resource,
119};
120use std::env;
121use std::sync::{
122    atomic::{AtomicBool, AtomicUsize, Ordering},
123    Arc, Mutex,
124};
125
126/// A fixed-size ring buffer for storing spans efficiently.
127///
128/// This implementation provides a memory-efficient way to store spans with
129/// predictable performance characteristics:
130///
131/// # Performance Characteristics
132///
133/// - Push Operation: O(1)
134/// - Memory Usage: Fixed based on capacity
135/// - Order: FIFO (First In, First Out)
136/// - Batch Operations: Efficient removal of all spans
137///
138/// # Implementation Details
139///
140/// The buffer uses a circular array with head and tail pointers:
141/// - `head`: Points to next write position
142/// - `tail`: Points to next read position
143/// - `size`: Current number of elements
144/// - `capacity`: Maximum number of elements
145///
146/// When the buffer is full, new spans are rejected rather than overwriting old ones.
147/// This ensures no data loss occurs silently.
148#[derive(Debug)]
149struct SpanRingBuffer {
150    buffer: Vec<Option<SpanData>>,
151    head: usize, // Where to write next
152    tail: usize, // Where to read next
153    size: usize, // Current number of elements
154    capacity: usize,
155}
156
157impl Default for SpanRingBuffer {
158    fn default() -> Self {
159        Self::new(2048) // Default capacity
160    }
161}
162
163impl SpanRingBuffer {
164    fn new(capacity: usize) -> Self {
165        let mut buffer = Vec::with_capacity(capacity);
166        buffer.extend((0..capacity).map(|_| None));
167        Self {
168            buffer,
169            head: 0,
170            tail: 0,
171            size: 0,
172            capacity,
173        }
174    }
175
176    fn push(&mut self, span: SpanData) -> bool {
177        if self.size == self.capacity {
178            return false;
179        }
180
181        self.buffer[self.head] = Some(span);
182        self.head = (self.head + 1) % self.capacity;
183        self.size += 1;
184        true
185    }
186
187    fn take_batch(&mut self, max_batch_size: usize) -> Vec<SpanData> {
188        let batch_size = self.size.min(max_batch_size);
189        let mut result = Vec::with_capacity(batch_size);
190
191        for _ in 0..batch_size {
192            if let Some(span) = self.buffer[self.tail].take() {
193                result.push(span);
194            }
195            self.tail = (self.tail + 1) % self.capacity;
196            self.size -= 1;
197        }
198
199        if self.size == 0 {
200            self.head = 0;
201            self.tail = 0;
202        }
203
204        result
205    }
206
207    fn is_empty(&self) -> bool {
208        self.size == 0
209    }
210}
211
212/// A span processor optimized for AWS Lambda functions.
213///
214/// This processor efficiently manages spans in a Lambda environment:
215/// - Uses a fixed-size ring buffer to prevent memory growth
216/// - Supports synchronous and asynchronous export modes
217/// - Handles graceful shutdown for Lambda termination
218///
219/// # Examples
220///
221/// ```
222/// use lambda_otel_lite::LambdaSpanProcessor;
223/// use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
224///
225/// let processor = LambdaSpanProcessor::builder()
226///     .exporter(OtlpStdoutSpanExporter::default())
227///     .build();
228/// ```
229///
230/// With custom configuration:
231///
232/// ```
233/// use lambda_otel_lite::LambdaSpanProcessor;
234/// use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
235///
236/// let processor = LambdaSpanProcessor::builder()
237///     .exporter(OtlpStdoutSpanExporter::default())
238///     .max_queue_size(1000)
239///     .max_batch_size(100)
240///     .build();
241/// ```
242#[derive(Debug)]
243pub struct LambdaSpanProcessor<E>
244where
245    E: SpanExporter + std::fmt::Debug,
246{
247    /// The exporter used to export spans
248    exporter: Mutex<E>,
249
250    /// Internal buffer for storing spans
251    spans: Mutex<SpanRingBuffer>,
252
253    /// Flag indicating whether the processor is shut down
254    is_shutdown: Arc<AtomicBool>,
255
256    /// Counter for dropped spans
257    dropped_count: AtomicUsize,
258
259    /// Maximum number of spans to export in a single batch
260    max_batch_size: usize,
261}
262
263#[bon]
264impl<E> LambdaSpanProcessor<E>
265where
266    E: SpanExporter + std::fmt::Debug,
267{
268    /// Creates a new LambdaSpanProcessor with the given exporter and configuration
269    ///
270    /// # Environment Variable Precedence
271    ///
272    /// Configuration values follow this precedence order:
273    /// 1. Environment variables (highest precedence)
274    /// 2. Constructor parameters
275    /// 3. Default values (lowest precedence)
276    ///
277    /// The relevant environment variables are:
278    /// - `LAMBDA_SPAN_PROCESSOR_BATCH_SIZE`: Controls the maximum batch size
279    /// - `LAMBDA_SPAN_PROCESSOR_QUEUE_SIZE`: Controls the maximum queue size
280    #[builder]
281    pub fn new(exporter: E, max_batch_size: Option<usize>, max_queue_size: Option<usize>) -> Self {
282        // Get batch size with proper precedence (env var > param > default)
283        let max_batch_size = match env::var(env_vars::BATCH_SIZE) {
284            Ok(value) => match value.parse::<usize>() {
285                Ok(size) => size,
286                Err(_) => {
287                    LOGGER.warn(format!(
288                        "Failed to parse {}: {}, using fallback",
289                        env_vars::BATCH_SIZE,
290                        value
291                    ));
292                    max_batch_size.unwrap_or(defaults::BATCH_SIZE)
293                }
294            },
295            Err(_) => max_batch_size.unwrap_or(defaults::BATCH_SIZE),
296        };
297
298        // Get queue size with proper precedence (env var > param > default)
299        let max_queue_size = match env::var(env_vars::QUEUE_SIZE) {
300            Ok(value) => match value.parse::<usize>() {
301                Ok(size) => size,
302                Err(_) => {
303                    LOGGER.warn(format!(
304                        "Failed to parse {}: {}, using fallback",
305                        env_vars::QUEUE_SIZE,
306                        value
307                    ));
308                    max_queue_size.unwrap_or(defaults::QUEUE_SIZE)
309                }
310            },
311            Err(_) => max_queue_size.unwrap_or(defaults::QUEUE_SIZE),
312        };
313
314        Self {
315            exporter: Mutex::new(exporter),
316            spans: Mutex::new(SpanRingBuffer::new(max_queue_size)),
317            is_shutdown: Arc::new(AtomicBool::new(false)),
318            dropped_count: AtomicUsize::new(0),
319            max_batch_size,
320        }
321    }
322}
323
324impl<E> SpanProcessor for LambdaSpanProcessor<E>
325where
326    E: SpanExporter + std::fmt::Debug,
327{
328    fn on_start(&self, _span: &mut Span, _cx: &Context) {
329        // No-op, as we only process spans on end
330    }
331
332    fn on_end(&self, span: SpanData) {
333        if self.is_shutdown.load(Ordering::Relaxed) {
334            LOGGER.warn("LambdaSpanProcessor.on_end: processor is shut down, dropping span");
335            self.dropped_count.fetch_add(1, Ordering::Relaxed);
336            return;
337        }
338
339        // Skip unsampled spans
340        if !span.span_context.is_sampled() {
341            return;
342        }
343
344        // Try to add span to the buffer
345        if let Ok(mut spans) = self.spans.lock() {
346            if !spans.push(span) {
347                let prev = self.dropped_count.fetch_add(1, Ordering::Relaxed);
348                if prev == 0 || prev % 100 == 0 {
349                    LOGGER.warn(format!(
350                        "LambdaSpanProcessor.on_end: Dropping span because buffer is full (dropped_spans={})",
351                        prev + 1
352                    ));
353                }
354            }
355        } else {
356            LOGGER.warn("LambdaSpanProcessor.on_end: Failed to acquire spans lock in on_end");
357        }
358    }
359
360    fn force_flush(&self) -> OTelSdkResult {
361        LOGGER.debug("LambdaSpanProcessor.force_flush: flushing spans");
362        if let Ok(mut spans) = self.spans.lock() {
363            if spans.is_empty() {
364                return Ok(());
365            }
366
367            let mut exporter = self.exporter.lock().map_err(|_| {
368                OTelSdkError::InternalFailure(
369                    "Failed to acquire exporter lock in force_flush".to_string(),
370                )
371            })?;
372
373            // Process spans in batches
374            while !spans.is_empty() {
375                let batch = spans.take_batch(self.max_batch_size);
376                if !batch.is_empty() {
377                    let result = futures_executor::block_on(exporter.export(batch));
378                    if let Err(err) = &result {
379                        LOGGER.debug(format!("LambdaSpanProcessor.force_flush.Error: {:?}", err));
380                        return result;
381                    }
382                }
383            }
384            Ok(())
385        } else {
386            Err(OTelSdkError::InternalFailure(
387                "Failed to acquire spans lock in force_flush".to_string(),
388            ))
389        }
390    }
391
392    fn shutdown(&self) -> OTelSdkResult {
393        self.is_shutdown.store(true, Ordering::Relaxed);
394        // Flush any remaining spans
395        self.force_flush()?;
396        if let Ok(mut exporter) = self.exporter.lock() {
397            exporter.shutdown()
398        } else {
399            Err(OTelSdkError::InternalFailure(
400                "Failed to acquire exporter lock in shutdown".to_string(),
401            ))
402        }
403    }
404
405    fn set_resource(&mut self, resource: &Resource) {
406        if let Ok(mut exporter) = self.exporter.lock() {
407            exporter.set_resource(resource);
408        }
409    }
410}
411
#[cfg(test)]
mod tests {
    use super::*;
    use crate::logger::Logger;
    use opentelemetry::{
        trace::{SpanContext, SpanId, TraceFlags, TraceId, TraceState},
        InstrumentationScope,
    };
    use opentelemetry_sdk::{
        trace::SpanExporter,
        trace::{SpanEvents, SpanLinks},
    };
    use serial_test::serial;
    use std::{borrow::Cow, future::Future, pin::Pin, sync::Arc};
    use tokio::sync::Mutex;

    fn setup_test_logger() -> Logger {
        Logger::new("test")
    }

    /// Mock exporter that records every exported span and the size of each batch.
    #[derive(Debug)]
    struct MockExporter {
        spans: Arc<Mutex<Vec<SpanData>>>,
        batch_sizes: Arc<Mutex<Vec<usize>>>,
    }

    impl MockExporter {
        fn new() -> Self {
            Self {
                spans: Arc::new(Mutex::new(Vec::new())),
                batch_sizes: Arc::new(Mutex::new(Vec::new())),
            }
        }
    }

    impl SpanExporter for MockExporter {
        fn export(
            &mut self,
            batch: Vec<SpanData>,
        ) -> Pin<Box<dyn Future<Output = OTelSdkResult> + Send>> {
            let spans = Arc::clone(&self.spans);
            let batch_sizes = Arc::clone(&self.batch_sizes);
            Box::pin(async move {
                batch_sizes.lock().await.push(batch.len());
                spans.lock().await.extend(batch);
                Ok(())
            })
        }

        fn shutdown(&mut self) -> OTelSdkResult {
            Ok(())
        }
    }

    /// Creates a sampled test span with the given name.
    fn create_test_span(name: &str) -> SpanData {
        let flags = TraceFlags::default().with_sampled(true);

        SpanData {
            span_context: SpanContext::new(
                TraceId::from_hex("01000000000000000000000000000000").unwrap(),
                SpanId::from_hex("0100000000000001").unwrap(),
                flags,
                false,
                TraceState::default(),
            ),
            parent_span_id: SpanId::INVALID,
            span_kind: opentelemetry::trace::SpanKind::Internal,
            name: Cow::Owned(name.to_string()),
            start_time: std::time::SystemTime::now(),
            end_time: std::time::SystemTime::now(),
            attributes: Vec::new(),
            dropped_attributes_count: 0,
            events: SpanEvents::default(),
            links: SpanLinks::default(),
            status: opentelemetry::trace::Status::default(),
            instrumentation_scope: InstrumentationScope::builder("test").build(),
        }
    }

    /// Removes every environment variable that could influence the processor under test.
    ///
    /// Environment variables take precedence over builder parameters, so tests that rely
    /// on explicit builder values call this first.
    fn cleanup_env() {
        env::remove_var(env_vars::BATCH_SIZE);
        env::remove_var(env_vars::QUEUE_SIZE);
        env::remove_var(env_vars::PROCESSOR_MODE);
        env::remove_var(env_vars::COMPRESSION_LEVEL);
        env::remove_var(env_vars::SERVICE_NAME);
    }

    #[test]
    #[serial]
    fn test_ring_buffer_basic_operations() {
        let mut buffer = SpanRingBuffer::new(2);

        // Test empty buffer
        assert!(buffer.is_empty());
        assert_eq!(buffer.take_batch(2), vec![]);

        // Test adding spans
        assert!(buffer.push(create_test_span("span1")));
        assert!(buffer.push(create_test_span("span2")));

        assert!(!buffer.is_empty());

        // Test taking spans
        let spans = buffer.take_batch(2);
        assert_eq!(spans.len(), 2);
        assert!(buffer.is_empty());
    }

    #[test]
    #[serial]
    fn test_ring_buffer_overflow() {
        let mut buffer = SpanRingBuffer::new(2);

        // Fill buffer
        buffer.push(create_test_span("span1"));
        buffer.push(create_test_span("span2"));

        // A push into a full buffer is rejected; the queued spans are left untouched
        let success = buffer.push(create_test_span("span3"));
        assert!(!success); // Should fail since buffer is full

        let spans = buffer.take_batch(2);
        let names: Vec<&str> = spans.iter().map(|s| &*s.name).collect();
        assert_eq!(names, vec!["span1", "span2"]);
    }

    #[test]
    #[serial]
    fn test_ring_buffer_wraparound_preserves_fifo_order() {
        let mut buffer = SpanRingBuffer::new(3);

        assert!(buffer.push(create_test_span("a")));
        assert!(buffer.push(create_test_span("b")));
        assert!(buffer.push(create_test_span("c")));

        // Free the first two slots, then write past the end of the backing array
        let first = buffer.take_batch(2);
        let first_names: Vec<&str> = first.iter().map(|s| &*s.name).collect();
        assert_eq!(first_names, vec!["a", "b"]);

        assert!(buffer.push(create_test_span("d")));
        assert!(buffer.push(create_test_span("e")));
        assert!(!buffer.push(create_test_span("f"))); // Full again

        let rest = buffer.take_batch(3);
        let rest_names: Vec<&str> = rest.iter().map(|s| &*s.name).collect();
        assert_eq!(rest_names, vec!["c", "d", "e"]);
        assert!(buffer.is_empty());
    }

    #[test]
    #[serial]
    fn test_ring_buffer_batch_operations() {
        let mut buffer = SpanRingBuffer::new(5);

        // Add 5 spans
        for i in 0..5 {
            buffer.push(create_test_span(&format!("span{}", i)));
        }

        assert_eq!(buffer.take_batch(2).len(), 2);
        assert_eq!(buffer.take_batch(2).len(), 2);
        assert_eq!(buffer.take_batch(2).len(), 1);
        assert!(buffer.is_empty());
    }

    #[tokio::test]
    #[serial]
    async fn test_processor_sync_mode() {
        cleanup_env();
        let _logger = setup_test_logger();
        let mock_exporter = MockExporter::new();
        let spans_exported = mock_exporter.spans.clone();

        let processor = LambdaSpanProcessor::builder()
            .exporter(mock_exporter)
            .max_queue_size(10)
            .max_batch_size(5)
            .build();

        // Test span processing
        processor.on_end(create_test_span("test_span"));

        // Force flush to ensure export
        processor.force_flush().unwrap();

        // Verify span was exported
        let exported = spans_exported.lock().await;
        assert_eq!(exported.len(), 1);
        assert_eq!(exported[0].name, "test_span");
    }

    #[tokio::test]
    #[serial]
    async fn test_shutdown_exports_remaining_spans() {
        cleanup_env();
        let _logger = setup_test_logger();
        let mock_exporter = MockExporter::new();
        let spans_exported = mock_exporter.spans.clone();

        let processor = LambdaSpanProcessor::builder()
            .exporter(mock_exporter)
            .max_queue_size(10)
            .max_batch_size(5)
            .build();

        // Add some spans
        processor.on_end(create_test_span("span1"));
        processor.on_end(create_test_span("span2"));

        // Shutdown should export all spans
        processor.shutdown().unwrap();

        // Verify all spans were exported
        assert_eq!(spans_exported.lock().await.len(), 2);
        assert_eq!(processor.dropped_count.load(Ordering::Relaxed), 0);

        // Verify new spans are dropped (and counted) after shutdown
        processor.on_end(create_test_span("span3"));
        assert_eq!(processor.dropped_count.load(Ordering::Relaxed), 1);
        assert_eq!(spans_exported.lock().await.len(), 2); // No new spans after shutdown
    }

    #[tokio::test]
    #[serial]
    async fn test_concurrent_span_processing() {
        cleanup_env();
        let _logger = setup_test_logger();
        let mock_exporter = MockExporter::new();
        let spans_exported = mock_exporter.spans.clone();

        let processor = Arc::new(
            LambdaSpanProcessor::builder()
                .exporter(mock_exporter)
                .max_queue_size(100)
                .max_batch_size(25)
                .build(),
        );

        let mut handles = Vec::new();

        // Spawn 10 tasks, each adding 10 spans
        for i in 0..10 {
            let processor = processor.clone();
            handles.push(tokio::spawn(async move {
                for j in 0..10 {
                    processor.on_end(create_test_span(&format!("span_{}_{}", i, j)));
                }
            }));
        }

        // Wait for all tasks to complete
        for handle in handles {
            handle.await.unwrap();
        }

        // Force flush and verify all spans were processed
        processor.force_flush().unwrap();

        let exported = spans_exported.lock().await;
        assert_eq!(exported.len(), 100);
        assert_eq!(processor.dropped_count.load(Ordering::Relaxed), 0);
    }

    #[test]
    #[serial]
    fn test_batch_processing() {
        cleanup_env();
        let _logger = setup_test_logger();
        let mock_exporter = MockExporter::new();
        let spans_exported = mock_exporter.spans.clone();
        let batch_sizes = mock_exporter.batch_sizes.clone();
        let processor = LambdaSpanProcessor::builder()
            .exporter(mock_exporter)
            .max_queue_size(10)
            .max_batch_size(3)
            .build();

        // Add 5 spans
        for i in 0..5 {
            processor.on_end(create_test_span(&format!("span{}", i)));
        }

        // Force flush should process in batches of 3
        processor.force_flush().unwrap();
        assert_eq!(*batch_sizes.try_lock().unwrap(), vec![3, 2]);

        // Add 2 more spans
        processor.on_end(create_test_span("span5"));
        processor.on_end(create_test_span("span6"));

        // Final flush
        processor.force_flush().unwrap();
        assert_eq!(*batch_sizes.try_lock().unwrap(), vec![3, 2, 2]);
        assert_eq!(spans_exported.try_lock().unwrap().len(), 7);
        assert_eq!(processor.dropped_count.load(Ordering::Relaxed), 0);
    }

    #[test]
    #[serial]
    fn test_builder_default_values() {
        cleanup_env();

        let mock_exporter = MockExporter::new();

        let processor = LambdaSpanProcessor::builder()
            .exporter(mock_exporter)
            .build();

        // Check default values
        assert_eq!(processor.max_batch_size, 512); // Default batch size
        assert_eq!(processor.spans.lock().unwrap().capacity, 2048); // Default queue size
    }

    #[test]
    #[serial]
    fn test_builder_env_var_values() {
        cleanup_env();

        let mock_exporter = MockExporter::new();

        // Set custom values via env vars
        env::set_var(env_vars::BATCH_SIZE, "100");
        env::set_var(env_vars::QUEUE_SIZE, "1000");

        let processor = LambdaSpanProcessor::builder()
            .exporter(mock_exporter)
            .build();

        // Check that env var values were used
        assert_eq!(processor.max_batch_size, 100);
        assert_eq!(processor.spans.lock().unwrap().capacity, 1000);

        cleanup_env();
    }

    #[test]
    #[serial]
    fn test_builder_env_var_precedence() {
        cleanup_env();

        let mock_exporter = MockExporter::new();

        // Set custom values via env vars
        env::set_var(env_vars::BATCH_SIZE, "100");
        env::set_var(env_vars::QUEUE_SIZE, "1000");

        // Create with explicit values (should be overridden by env vars)
        let processor = LambdaSpanProcessor::builder()
            .exporter(mock_exporter)
            .max_batch_size(50)
            .max_queue_size(500)
            .build();

        // Check that env var values took precedence
        assert_eq!(processor.max_batch_size, 100);
        assert_eq!(processor.spans.lock().unwrap().capacity, 1000);

        cleanup_env();
    }

    #[test]
    #[serial]
    fn test_invalid_env_vars() {
        cleanup_env();

        let mock_exporter = MockExporter::new();

        // Set invalid values via env vars
        env::set_var(env_vars::BATCH_SIZE, "not_a_number");
        env::set_var(env_vars::QUEUE_SIZE, "invalid");

        // Create with explicit values (should be used as fallbacks)
        let processor = LambdaSpanProcessor::builder()
            .exporter(mock_exporter)
            .max_batch_size(50)
            .max_queue_size(500)
            .build();

        // Check that fallback values were used
        assert_eq!(processor.max_batch_size, 50);
        assert_eq!(processor.spans.lock().unwrap().capacity, 500);

        cleanup_env();
    }
}