lambda_otel_lite/
processor.rs

1//! Span processor implementation optimized for AWS Lambda functions.
2//!
3//! This module provides a Lambda-optimized span processor that efficiently manages OpenTelemetry spans
4//! in a serverless environment. It uses a ring buffer to store spans in memory and provides efficient
5//! batch processing capabilities.
6//!
7//! # Architecture
8//!
9//! The processor is designed specifically for the Lambda execution environment:
10//!
11//! 1. **Ring Buffer Storage**:
12//!    - Fixed-size circular buffer prevents memory growth
13//!    - O(1) push operations with no memory reallocation
14//!    - FIFO ordering ensures spans are processed in order
15//!    - Efficient batch removal for export
16//!    - When full, new spans are dropped (with warning logs)
17//!
18//! 2. **Thread Safety**:
19//!    - All operations are thread-safe
20//!    - Uses Mutex for span buffer access
21//!    - Atomic operations for state management
22//!    - Safe for concurrent span submission
23//!
24//! # Configuration
25//!
26//! The processor can be configured through environment variables:
27//!
28//! - `LAMBDA_SPAN_PROCESSOR_QUEUE_SIZE`: Controls buffer size
29//!   - Defaults to 2048 spans
30//!   - Should be tuned based on span volume
31//!
32//! - `LAMBDA_SPAN_PROCESSOR_BATCH_SIZE`: Controls batch size
33//!   - Defaults to 512 spans
34//!   - Should be tuned based on span volume
35//!
36//! # Usage Examples
37//!
38//! Basic setup with default configuration:
39//!
40//! ```no_run
41//! use lambda_otel_lite::LambdaSpanProcessor;
42//! use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
43//!
44//! let processor = LambdaSpanProcessor::builder()
45//!     .exporter(OtlpStdoutSpanExporter::default())
46//!     .build();
47//! ```
48//!
49//! Using with an OTLP HTTP exporter:
50//!
51//! ```no_run
52//! use lambda_otel_lite::LambdaSpanProcessor;
53//! use opentelemetry_otlp::{SpanExporter, Protocol};
54//! use opentelemetry_otlp::{WithExportConfig, WithHttpConfig};
55//!
56//! // Important: When using HTTP exporters, always use reqwest::blocking::Client
57//! // Using async clients will cause deadlocks
58//! let exporter = SpanExporter::builder()
59//!     .with_http()
60//!     .with_http_client(reqwest::blocking::Client::new())
61//!     .with_protocol(Protocol::HttpBinary)
62//!     .build()
63//!     .expect("Failed to create exporter");
64//!
65//! let processor = LambdaSpanProcessor::builder()
66//!     .exporter(exporter)
67//!     .max_queue_size(4096)
68//!     .max_batch_size(1024)
69//!     .build();
70//! ```
71//!
72//! # Performance Considerations
73//!
74//! 1. **Memory Usage**:
75//!    - Fixed memory footprint based on queue size
76//!    - Each span typically uses 100-500 bytes
77//!    - Default 2048 spans ≈ 0.5-1MB memory
78//!
79//! 2. **Latency Impact**:
80//!    - Batch processing reduces network overhead
81//!    - Configurable batch size allows tuning for your use case
82//!    - Force flush available for immediate export when needed
83//!
84//! 3. **Reliability**:
85//!    - Spans may be dropped if buffer fills
86//!    - Warning logs indicate dropped spans
87//!    - Consider increasing buffer size if spans are dropped
88//!
89//! # Best Practices
90//!
91//! 1. **Buffer Sizing**:
92//!    - Monitor dropped_spans metric
93//!    - Size based on max spans per invocation
94//!    - Consider function memory when sizing
95//!
96//! 2. **Batch Configuration**:
97//!    - Larger batches improve throughput but increase memory usage
98//!    - Smaller batches reduce memory but increase network overhead
99//!    - Default values work well for most use cases
100//!
101//! 3. **Error Handling**:
102//!    - Export errors are logged but don't fail function
103//!    - Monitor for export failures in logs
104//!    - Consider retry strategies in custom exporters
105
106use crate::constants::{defaults, env_vars};
107use crate::logger::Logger;
108use bon::bon;
109
110/// Module-specific logger
111static LOGGER: Logger = Logger::const_new("processor");
112
113use opentelemetry::Context;
114use opentelemetry_sdk::{
115    error::{OTelSdkError, OTelSdkResult},
116    trace::{Span, SpanProcessor},
117    trace::{SpanData, SpanExporter},
118    Resource,
119};
120use std::env;
121use std::sync::{
122    atomic::{AtomicBool, AtomicUsize, Ordering},
123    Arc, Mutex,
124};
125
126/// A fixed-size ring buffer for storing spans efficiently.
127///
128/// This implementation provides a memory-efficient way to store spans with
129/// predictable performance characteristics:
130///
131/// # Performance Characteristics
132///
133/// - Push Operation: O(1)
134/// - Memory Usage: Fixed based on capacity
135/// - Order: FIFO (First In, First Out)
136/// - Batch Operations: Efficient removal of all spans
137///
138/// # Implementation Details
139///
140/// The buffer uses a circular array with head and tail pointers:
141/// - `head`: Points to next write position
142/// - `tail`: Points to next read position
143/// - `size`: Current number of elements
144/// - `capacity`: Maximum number of elements
145///
146/// When the buffer is full, new spans are rejected rather than overwriting old ones.
147/// This ensures no data loss occurs silently.
148#[derive(Debug)]
149struct SpanRingBuffer {
150    buffer: Vec<Option<SpanData>>,
151    head: usize, // Where to write next
152    tail: usize, // Where to read next
153    size: usize, // Current number of elements
154    capacity: usize,
155}
156
157impl Default for SpanRingBuffer {
158    fn default() -> Self {
159        Self::new(2048) // Default capacity
160    }
161}
162
163impl SpanRingBuffer {
164    fn new(capacity: usize) -> Self {
165        let mut buffer = Vec::with_capacity(capacity);
166        buffer.extend((0..capacity).map(|_| None));
167        Self {
168            buffer,
169            head: 0,
170            tail: 0,
171            size: 0,
172            capacity,
173        }
174    }
175
176    fn push(&mut self, span: SpanData) -> bool {
177        if self.size == self.capacity {
178            return false;
179        }
180
181        self.buffer[self.head] = Some(span);
182        self.head = (self.head + 1) % self.capacity;
183        self.size += 1;
184        true
185    }
186
187    fn take_batch(&mut self, max_batch_size: usize) -> Vec<SpanData> {
188        let batch_size = self.size.min(max_batch_size);
189        let mut result = Vec::with_capacity(batch_size);
190
191        for _ in 0..batch_size {
192            if let Some(span) = self.buffer[self.tail].take() {
193                result.push(span);
194            }
195            self.tail = (self.tail + 1) % self.capacity;
196            self.size -= 1;
197        }
198
199        if self.size == 0 {
200            self.head = 0;
201            self.tail = 0;
202        }
203
204        result
205    }
206
207    fn is_empty(&self) -> bool {
208        self.size == 0
209    }
210}
211
212/// A span processor optimized for AWS Lambda functions.
213///
214/// This processor efficiently manages spans in a Lambda environment:
215/// - Uses a fixed-size ring buffer to prevent memory growth
216/// - Supports synchronous and asynchronous export modes
217/// - Handles graceful shutdown for Lambda termination
218///
219/// # Examples
220///
221/// ```
222/// use lambda_otel_lite::LambdaSpanProcessor;
223/// use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
224///
225/// let processor = LambdaSpanProcessor::builder()
226///     .exporter(OtlpStdoutSpanExporter::default())
227///     .build();
228/// ```
229///
230/// With custom configuration:
231///
232/// ```
233/// use lambda_otel_lite::LambdaSpanProcessor;
234/// use otlp_stdout_span_exporter::OtlpStdoutSpanExporter;
235///
236/// let processor = LambdaSpanProcessor::builder()
237///     .exporter(OtlpStdoutSpanExporter::default())
238///     .max_queue_size(1000)
239///     .max_batch_size(100)
240///     .build();
241/// ```
242#[derive(Debug)]
243pub struct LambdaSpanProcessor<E>
244where
245    E: SpanExporter + std::fmt::Debug,
246{
247    /// The exporter used to export spans
248    exporter: Mutex<E>,
249
250    /// Internal buffer for storing spans
251    spans: Mutex<SpanRingBuffer>,
252
253    /// Flag indicating whether the processor is shut down
254    is_shutdown: Arc<AtomicBool>,
255
256    /// Counter for dropped spans
257    dropped_count: AtomicUsize,
258
259    /// Maximum number of spans to export in a single batch
260    max_batch_size: usize,
261}
262
263#[bon]
264impl<E> LambdaSpanProcessor<E>
265where
266    E: SpanExporter + std::fmt::Debug,
267{
268    /// Creates a new LambdaSpanProcessor with the given exporter and configuration
269    ///
270    /// # Environment Variable Precedence
271    ///
272    /// Configuration values follow this precedence order:
273    /// 1. Environment variables (highest precedence)
274    /// 2. Constructor parameters
275    /// 3. Default values (lowest precedence)
276    ///
277    /// The relevant environment variables are:
278    /// - `LAMBDA_SPAN_PROCESSOR_BATCH_SIZE`: Controls the maximum batch size
279    /// - `LAMBDA_SPAN_PROCESSOR_QUEUE_SIZE`: Controls the maximum queue size
280    #[builder]
281    pub fn new(exporter: E, max_batch_size: Option<usize>, max_queue_size: Option<usize>) -> Self {
282        // Get batch size with proper precedence (env var > param > default)
283        let max_batch_size = match env::var(env_vars::BATCH_SIZE) {
284            Ok(value) => match value.parse::<usize>() {
285                Ok(size) => size,
286                Err(_) => {
287                    LOGGER.warn(format!(
288                        "Failed to parse {}: {}, using fallback",
289                        env_vars::BATCH_SIZE,
290                        value
291                    ));
292                    max_batch_size.unwrap_or(defaults::BATCH_SIZE)
293                }
294            },
295            Err(_) => max_batch_size.unwrap_or(defaults::BATCH_SIZE),
296        };
297
298        // Get queue size with proper precedence (env var > param > default)
299        let max_queue_size = match env::var(env_vars::QUEUE_SIZE) {
300            Ok(value) => match value.parse::<usize>() {
301                Ok(size) => size,
302                Err(_) => {
303                    LOGGER.warn(format!(
304                        "Failed to parse {}: {}, using fallback",
305                        env_vars::QUEUE_SIZE,
306                        value
307                    ));
308                    max_queue_size.unwrap_or(defaults::QUEUE_SIZE)
309                }
310            },
311            Err(_) => max_queue_size.unwrap_or(defaults::QUEUE_SIZE),
312        };
313
314        Self {
315            exporter: Mutex::new(exporter),
316            spans: Mutex::new(SpanRingBuffer::new(max_queue_size)),
317            is_shutdown: Arc::new(AtomicBool::new(false)),
318            dropped_count: AtomicUsize::new(0),
319            max_batch_size,
320        }
321    }
322}
323
324impl<E> SpanProcessor for LambdaSpanProcessor<E>
325where
326    E: SpanExporter + std::fmt::Debug,
327{
328    fn on_start(&self, _span: &mut Span, _cx: &Context) {
329        // No-op, as we only process spans on end
330    }
331
332    fn on_end(&self, span: SpanData) {
333        if self.is_shutdown.load(Ordering::Relaxed) {
334            LOGGER.warn("LambdaSpanProcessor.on_end: processor is shut down, dropping span");
335            self.dropped_count.fetch_add(1, Ordering::Relaxed);
336            return;
337        }
338
339        // Skip unsampled spans
340        if !span.span_context.is_sampled() {
341            return;
342        }
343
344        // Try to add span to the buffer
345        if let Ok(mut spans) = self.spans.lock() {
346            if !spans.push(span) {
347                let prev = self.dropped_count.fetch_add(1, Ordering::Relaxed);
348                if prev == 0 || prev % 100 == 0 {
349                    LOGGER.warn(format!(
350                        "LambdaSpanProcessor.on_end: Dropping span because buffer is full (dropped_spans={})",
351                        prev + 1
352                    ));
353                }
354            }
355        } else {
356            LOGGER.warn("LambdaSpanProcessor.on_end: Failed to acquire spans lock in on_end");
357        }
358    }
359
360    fn force_flush(&self) -> OTelSdkResult {
361        LOGGER.debug("LambdaSpanProcessor.force_flush: flushing spans");
362        if let Ok(mut spans) = self.spans.lock() {
363            if spans.is_empty() {
364                return Ok(());
365            }
366
367            let exporter = self.exporter.lock().map_err(|_| {
368                OTelSdkError::InternalFailure(
369                    "Failed to acquire exporter lock in force_flush".to_string(),
370                )
371            })?;
372
373            // Process spans in batches
374            while !spans.is_empty() {
375                let batch = spans.take_batch(self.max_batch_size);
376                if !batch.is_empty() {
377                    let result = futures_executor::block_on(exporter.export(batch));
378                    if let Err(err) = &result {
379                        LOGGER.debug(format!("LambdaSpanProcessor.force_flush.Error: {:?}", err));
380                        return result;
381                    }
382                }
383            }
384            Ok(())
385        } else {
386            Err(OTelSdkError::InternalFailure(
387                "Failed to acquire spans lock in force_flush".to_string(),
388            ))
389        }
390    }
391
392    fn shutdown(&self) -> OTelSdkResult {
393        self.is_shutdown.store(true, Ordering::Relaxed);
394        // Flush any remaining spans
395        self.force_flush()?;
396        if let Ok(mut exporter) = self.exporter.lock() {
397            exporter.shutdown()
398        } else {
399            Err(OTelSdkError::InternalFailure(
400                "Failed to acquire exporter lock in shutdown".to_string(),
401            ))
402        }
403    }
404
405    fn set_resource(&mut self, resource: &Resource) {
406        if let Ok(mut exporter) = self.exporter.lock() {
407            exporter.set_resource(resource);
408        }
409    }
410}
411
412#[cfg(test)]
413mod tests {
414    use super::*;
415    use crate::logger::Logger;
416    use opentelemetry::{
417        trace::{SpanContext, SpanId, TraceFlags, TraceId, TraceState},
418        InstrumentationScope,
419    };
420    use opentelemetry_sdk::{
421        trace::SpanExporter,
422        trace::{SpanEvents, SpanLinks},
423    };
424    use serial_test::serial;
425    use std::{borrow::Cow, sync::Arc};
426    use tokio::sync::Mutex;
427
428    fn setup_test_logger() -> Logger {
429        Logger::new("test")
430    }
431
432    // Mock exporter that captures exported spans
433    #[derive(Debug)]
434    struct MockExporter {
435        spans: Arc<Mutex<Vec<SpanData>>>,
436    }
437
438    impl MockExporter {
439        fn new() -> Self {
440            Self {
441                spans: Arc::new(Mutex::new(Vec::new())),
442            }
443        }
444    }
445
446    impl SpanExporter for MockExporter {
447        fn export(
448            &self,
449            batch: Vec<SpanData>,
450        ) -> impl std::future::Future<Output = opentelemetry_sdk::error::OTelSdkResult> + Send
451        {
452            let spans = self.spans.clone();
453            Box::pin(async move {
454                let mut spans = spans.lock().await;
455                spans.extend(batch);
456                Ok(())
457            })
458        }
459
460        fn shutdown(&mut self) -> OTelSdkResult {
461            Ok(())
462        }
463    }
464
465    // Helper function to create a test span
466    fn create_test_span(name: &str) -> SpanData {
467        let flags = TraceFlags::default().with_sampled(true);
468
469        SpanData {
470            span_context: SpanContext::new(
471                TraceId::from_hex("01000000000000000000000000000000").unwrap(),
472                SpanId::from_hex("0100000000000001").unwrap(),
473                flags,
474                false,
475                TraceState::default(),
476            ),
477            parent_span_id: SpanId::INVALID,
478            span_kind: opentelemetry::trace::SpanKind::Internal,
479            name: Cow::Owned(name.to_string()),
480            start_time: std::time::SystemTime::now(),
481            end_time: std::time::SystemTime::now(),
482            attributes: Vec::new(),
483            dropped_attributes_count: 0,
484            events: SpanEvents::default(),
485            links: SpanLinks::default(),
486            status: opentelemetry::trace::Status::default(),
487            instrumentation_scope: InstrumentationScope::builder("test").build(),
488        }
489    }
490
491    fn cleanup_env() {
492        env::remove_var(env_vars::BATCH_SIZE);
493        env::remove_var(env_vars::QUEUE_SIZE);
494        env::remove_var(env_vars::PROCESSOR_MODE);
495        env::remove_var(env_vars::COMPRESSION_LEVEL);
496        env::remove_var(env_vars::SERVICE_NAME);
497    }
498
499    #[test]
500    #[serial]
501    fn test_ring_buffer_basic_operations() {
502        let mut buffer = SpanRingBuffer::new(2);
503
504        // Test empty buffer
505        assert!(buffer.is_empty());
506        assert_eq!(buffer.take_batch(2), vec![]);
507
508        // Test adding spans
509        buffer.push(create_test_span("span1"));
510        buffer.push(create_test_span("span2"));
511
512        assert!(!buffer.is_empty());
513
514        // Test taking spans
515        let spans = buffer.take_batch(2);
516        assert_eq!(spans.len(), 2);
517        assert!(buffer.is_empty());
518    }
519
520    #[test]
521    #[serial]
522    fn test_ring_buffer_overflow() {
523        let mut buffer = SpanRingBuffer::new(2);
524
525        // Fill buffer
526        buffer.push(create_test_span("span1"));
527        buffer.push(create_test_span("span2"));
528
529        // Add one more span, should overwrite the oldest
530        let success = buffer.push(create_test_span("span3"));
531        assert!(!success); // Should fail since buffer is full
532
533        let spans = buffer.take_batch(2);
534        assert_eq!(spans.len(), 2);
535        assert!(spans.iter().any(|s| s.name == "span1"));
536        assert!(spans.iter().any(|s| s.name == "span2"));
537    }
538
539    #[test]
540    #[serial]
541    fn test_ring_buffer_batch_operations() {
542        let mut buffer = SpanRingBuffer::new(5);
543
544        // Add 5 spans
545        for i in 0..5 {
546            buffer.push(create_test_span(&format!("span{}", i)));
547        }
548
549        assert_eq!(buffer.take_batch(2).len(), 2);
550        assert_eq!(buffer.take_batch(2).len(), 2);
551        assert_eq!(buffer.take_batch(2).len(), 1);
552        assert!(buffer.is_empty());
553    }
554
555    #[tokio::test]
556    #[serial]
557    async fn test_processor_sync_mode() {
558        let _logger = setup_test_logger();
559        let mock_exporter = MockExporter::new();
560        let spans_exported = mock_exporter.spans.clone();
561
562        let processor = LambdaSpanProcessor::builder()
563            .exporter(mock_exporter)
564            .max_queue_size(10)
565            .max_batch_size(5)
566            .build();
567
568        // Test span processing
569        processor.on_end(create_test_span("test_span"));
570
571        // Force flush to ensure export
572        processor.force_flush().unwrap();
573
574        // Verify span was exported
575        let exported = spans_exported.lock().await;
576        assert_eq!(exported.len(), 1);
577        assert_eq!(exported[0].name, "test_span");
578    }
579
580    #[tokio::test]
581    #[serial]
582    async fn test_shutdown_exports_remaining_spans() {
583        let _logger = setup_test_logger();
584        let mock_exporter = MockExporter::new();
585        let spans_exported = mock_exporter.spans.clone();
586
587        let processor = LambdaSpanProcessor::builder()
588            .exporter(mock_exporter)
589            .max_queue_size(10)
590            .max_batch_size(5)
591            .build();
592
593        // Add some spans
594        processor.on_end(create_test_span("span1"));
595        processor.on_end(create_test_span("span2"));
596
597        // Shutdown should export all spans
598        processor.shutdown().unwrap();
599
600        // Verify all spans were exported
601        let exported = spans_exported.lock().await;
602        assert_eq!(exported.len(), 2);
603
604        // Verify new spans are dropped after shutdown
605        processor.on_end(create_test_span("span3"));
606        assert_eq!(exported.len(), 2); // No new spans after shutdown
607    }
608
609    #[tokio::test]
610    #[serial]
611    async fn test_concurrent_span_processing() {
612        let _logger = setup_test_logger();
613        let mock_exporter = MockExporter::new();
614        let spans_exported = mock_exporter.spans.clone();
615
616        let processor = Arc::new(
617            LambdaSpanProcessor::builder()
618                .exporter(mock_exporter)
619                .max_queue_size(100)
620                .max_batch_size(25)
621                .build(),
622        );
623
624        let mut handles = Vec::new();
625
626        // Spawn 10 tasks, each adding 10 spans
627        for i in 0..10 {
628            let processor = processor.clone();
629            handles.push(tokio::spawn(async move {
630                for j in 0..10 {
631                    processor.on_end(create_test_span(&format!("span_{}_{}", i, j)));
632                }
633            }));
634        }
635
636        // Wait for all tasks to complete
637        for handle in handles {
638            handle.await.unwrap();
639        }
640
641        // Force flush and verify all spans were processed
642        processor.force_flush().unwrap();
643
644        let exported = spans_exported.lock().await;
645        assert_eq!(exported.len(), 100);
646        assert_eq!(processor.dropped_count.load(Ordering::Relaxed), 0);
647    }
648
649    #[test]
650    #[serial]
651    fn test_batch_processing() {
652        let _logger = setup_test_logger();
653        let mock_exporter = MockExporter::new();
654        let processor = LambdaSpanProcessor::builder()
655            .exporter(mock_exporter)
656            .max_queue_size(10)
657            .max_batch_size(3)
658            .build();
659
660        // Add 5 spans
661        for i in 0..5 {
662            processor.on_end(create_test_span(&format!("span{}", i)));
663        }
664
665        // Force flush should process in batches of 3
666        processor.force_flush().unwrap();
667
668        // Add 2 more spans
669        processor.on_end(create_test_span("span5"));
670        processor.on_end(create_test_span("span6"));
671
672        // Final flush
673        processor.force_flush().unwrap();
674    }
675
676    #[test]
677    #[serial]
678    fn test_builder_default_values() {
679        cleanup_env();
680
681        let mock_exporter = MockExporter::new();
682
683        let processor = LambdaSpanProcessor::builder()
684            .exporter(mock_exporter)
685            .build();
686
687        // Check default values
688        assert_eq!(processor.max_batch_size, 512); // Default batch size
689        assert_eq!(processor.spans.lock().unwrap().capacity, 2048); // Default queue size
690    }
691
692    #[test]
693    #[serial]
694    fn test_builder_env_var_values() {
695        cleanup_env();
696
697        let mock_exporter = MockExporter::new();
698
699        // Set custom values via env vars
700        env::set_var(env_vars::BATCH_SIZE, "100");
701        env::set_var(env_vars::QUEUE_SIZE, "1000");
702
703        let processor = LambdaSpanProcessor::builder()
704            .exporter(mock_exporter)
705            .build();
706
707        // Check that env var values were used
708        assert_eq!(processor.max_batch_size, 100);
709        assert_eq!(processor.spans.lock().unwrap().capacity, 1000);
710
711        cleanup_env();
712    }
713
714    #[test]
715    #[serial]
716    fn test_builder_env_var_precedence() {
717        cleanup_env();
718
719        let mock_exporter = MockExporter::new();
720
721        // Set custom values via env vars
722        env::set_var(env_vars::BATCH_SIZE, "100");
723        env::set_var(env_vars::QUEUE_SIZE, "1000");
724
725        // Create with explicit values (should be overridden by env vars)
726        let processor = LambdaSpanProcessor::builder()
727            .exporter(mock_exporter)
728            .max_batch_size(50)
729            .max_queue_size(500)
730            .build();
731
732        // Check that env var values took precedence
733        assert_eq!(processor.max_batch_size, 100);
734        assert_eq!(processor.spans.lock().unwrap().capacity, 1000);
735
736        cleanup_env();
737    }
738
739    #[test]
740    #[serial]
741    fn test_invalid_env_vars() {
742        cleanup_env();
743
744        let mock_exporter = MockExporter::new();
745
746        // Set invalid values via env vars
747        env::set_var(env_vars::BATCH_SIZE, "not_a_number");
748        env::set_var(env_vars::QUEUE_SIZE, "invalid");
749
750        // Create with explicit values (should be used as fallbacks)
751        let processor = LambdaSpanProcessor::builder()
752            .exporter(mock_exporter)
753            .max_batch_size(50)
754            .max_queue_size(500)
755            .build();
756
757        // Check that fallback values were used
758        assert_eq!(processor.max_batch_size, 50);
759        assert_eq!(processor.spans.lock().unwrap().capacity, 500);
760
761        cleanup_env();
762    }
763}