// opentelemetry_lambda_extension/runtime.rs

//! Extension runtime orchestrator.
//!
//! This module provides the main runtime that coordinates all extension components:
//! - Extensions API client for lifecycle management (via the `lambda_extension` crate)
//! - Telemetry API subscription for platform events
//! - OTLP receiver for function telemetry
//! - Signal aggregation and export

9use crate::config::Config;
10use crate::receiver::{OtlpReceiver, Signal};
11use crate::resource::{ResourceBuilder, detect_resource, to_proto_resource};
12use crate::service::{EventsService, ExtensionState, TelemetryService};
13use lambda_extension::{Extension, SharedService};
14use opentelemetry_sdk::resource::Resource as SdkResource;
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::sync::mpsc;
18use tokio_util::sync::CancellationToken;
19
20/// Extension runtime that orchestrates all components.
21pub struct ExtensionRuntime {
22    config: Config,
23    cancel_token: CancellationToken,
24    resource: SdkResource,
25}
26
27impl ExtensionRuntime {
28    /// Creates a new extension runtime with the given configuration.
29    pub fn new(config: Config) -> Self {
30        Self {
31            config,
32            cancel_token: CancellationToken::new(),
33            resource: detect_resource(),
34        }
35    }
36
37    /// Creates a runtime with default configuration.
38    pub fn with_defaults() -> Self {
39        Self::new(Config::default())
40    }
41
42    /// Sets a custom resource for this runtime.
43    pub fn with_resource(mut self, resource: SdkResource) -> Self {
44        self.resource = resource;
45        self
46    }
47
48    /// Returns a handle to the cancellation token.
49    pub fn cancellation_token(&self) -> CancellationToken {
50        self.cancel_token.clone()
51    }
52
53    /// Runs the extension runtime.
54    ///
55    /// This method uses the official `lambda_extension` crate with Tower services
56    /// for proper lifecycle management. It:
57    /// 1. Registers with the Extensions API
58    /// 2. Subscribes to the Telemetry API for platform events
59    /// 3. Starts the OTLP receiver for function telemetry
60    /// 4. Runs the event loop until shutdown
61    ///
62    /// # Errors
63    ///
64    /// Returns an error if any component fails to start or if the extension
65    /// cannot register with Lambda.
66    pub async fn run(self) -> Result<(), RuntimeError> {
67        tracing::debug!("Starting extension with lambda_extension crate");
68
69        // Create shared state for services
70        // Convert SDK Resource to proto Resource for internal use
71        let proto_resource = to_proto_resource(&self.resource);
72        let (state, shutdown_rx) = ExtensionState::new(self.config.clone(), proto_resource)
73            .map_err(|e| RuntimeError::StateInit(Box::new(e)))?;
74        let state = Arc::new(state);
75        tracing::debug!("Extension state created");
76
77        // Create Tower services
78        let events_service = EventsService::new(Arc::clone(&state));
79        let telemetry_service = TelemetryService::new(Arc::clone(&state));
80
81        // Start OTLP receiver for function telemetry
82        let signal_tx = {
83            let aggregator = Arc::clone(&state.aggregator);
84            let (tx, mut rx) = mpsc::channel::<Signal>(self.config.telemetry_api.buffer_size);
85
86            // Spawn task to forward signals to aggregator
87            tokio::spawn(async move {
88                while let Some(signal) = rx.recv().await {
89                    aggregator.add(signal).await;
90                }
91            });
92
93            tx
94        };
95
96        let receiver = OtlpReceiver::new(
97            self.config.receiver.clone(),
98            signal_tx,
99            self.cancel_token.clone(),
100        );
101
102        let (_receiver_handle, receiver_future) = receiver
103            .start()
104            .await
105            .map_err(RuntimeError::ReceiverStart)?;
106
107        let receiver_task = tokio::spawn(receiver_future);
108
109        // Build and run the extension using lambda_extension
110        // Note: Each with_* method returns a new type, so we must chain or branch.
111        // We always enable telemetry processing when using this method since it's
112        // the recommended path for proper lifecycle handling.
113        tracing::debug!("Building Extension and starting run loop");
114
115        let extension_future = Extension::new()
116            .with_events_processor(events_service)
117            .with_telemetry_types(&["platform", "function", "extension"])
118            .with_telemetry_processor(SharedService::new(telemetry_service))
119            .run();
120
121        // Race between the extension event loop and the shutdown signal.
122        // The shutdown signal is sent after final_flush completes in the SHUTDOWN handler.
123        // This allows us to exit gracefully before the extension tries to poll /next again.
124        let result = tokio::select! {
125            result = extension_future => {
126                result.map_err(|e| {
127                    tracing::error!(error = %e, "Extension run failed");
128                    RuntimeError::EventLoop(e)
129                })
130            }
131            _ = shutdown_rx => {
132                tracing::info!("Shutdown complete, exiting event loop");
133                Ok(())
134            }
135        };
136        tracing::debug!(?result, "Extension finished");
137
138        self.cancel_token.cancel();
139        let _ = tokio::time::timeout(Duration::from_secs(2), receiver_task).await;
140
141        result
142    }
143}
144
145/// Errors from the extension runtime.
146#[non_exhaustive]
147#[derive(Debug, thiserror::Error)]
148pub enum RuntimeError {
149    /// Failed to create extension state during initialisation.
150    #[error("failed to create extension state")]
151    StateInit(#[source] Box<crate::exporter::ExportError>),
152
153    /// Failed to start OTLP receiver.
154    #[error("failed to start OTLP receiver")]
155    ReceiverStart(#[source] std::io::Error),
156
157    /// Event loop encountered an error.
158    #[error("event loop error")]
159    EventLoop(#[source] Box<dyn std::error::Error + Send + Sync>),
160}
161
162/// Builder for configuring the extension runtime.
163#[must_use = "builders do nothing unless .build() is called"]
164pub struct RuntimeBuilder {
165    config: Config,
166    resource: Option<SdkResource>,
167}
168
169impl RuntimeBuilder {
170    /// Creates a new runtime builder with default configuration.
171    pub fn new() -> Self {
172        Self {
173            config: Config::default(),
174            resource: None,
175        }
176    }
177
178    /// Sets the configuration.
179    pub fn config(mut self, config: Config) -> Self {
180        self.config = config;
181        self
182    }
183
184    /// Sets the OTLP exporter endpoint.
185    pub fn exporter_endpoint(mut self, endpoint: impl Into<String>) -> Self {
186        self.config.exporter.endpoint = Some(endpoint.into());
187        self
188    }
189
190    /// Sets the flush strategy.
191    pub fn flush_strategy(mut self, strategy: crate::config::FlushStrategy) -> Self {
192        self.config.flush.strategy = strategy;
193        self
194    }
195
196    /// Disables the Telemetry API subscription.
197    pub fn disable_telemetry_api(mut self) -> Self {
198        self.config.telemetry_api.enabled = false;
199        self
200    }
201
202    /// Sets a custom resource.
203    pub fn resource(mut self, resource: SdkResource) -> Self {
204        self.resource = Some(resource);
205        self
206    }
207
208    /// Adds custom resource attributes.
209    pub fn with_resource_attributes<F>(mut self, f: F) -> Self
210    where
211        F: FnOnce(ResourceBuilder) -> ResourceBuilder,
212    {
213        let builder = ResourceBuilder::new();
214        self.resource = Some(f(builder).build());
215        self
216    }
217
218    /// Builds the extension runtime.
219    pub fn build(self) -> ExtensionRuntime {
220        let mut runtime = ExtensionRuntime::new(self.config);
221        if let Some(resource) = self.resource {
222            runtime = runtime.with_resource(resource);
223        }
224        runtime
225    }
226}
227
228impl Default for RuntimeBuilder {
229    fn default() -> Self {
230        Self::new()
231    }
232}
233
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::FlushStrategy;

    #[test]
    fn test_runtime_builder() {
        let runtime = RuntimeBuilder::new()
            .exporter_endpoint("http://localhost:4318")
            .flush_strategy(FlushStrategy::End)
            .disable_telemetry_api()
            .build();

        assert_eq!(
            runtime.config.exporter.endpoint.as_deref(),
            Some("http://localhost:4318")
        );
        assert_eq!(runtime.config.flush.strategy, FlushStrategy::End);
        assert!(!runtime.config.telemetry_api.enabled);
    }

    #[test]
    fn test_runtime_with_defaults() {
        let runtime = ExtensionRuntime::with_defaults();
        assert!(runtime.config.telemetry_api.enabled);
    }

    #[test]
    fn test_runtime_error_display() {
        use std::error::Error;

        let io_err = std::io::Error::new(std::io::ErrorKind::AddrInUse, "port in use");
        let err = RuntimeError::ReceiverStart(io_err);

        assert_eq!(err.to_string(), "failed to start OTLP receiver");
        let source = err
            .source()
            .expect("ReceiverStart should expose the io::Error as its source");
        assert_eq!(source.to_string(), "port in use");
    }

    #[test]
    fn test_cancellation_token() {
        let runtime = ExtensionRuntime::with_defaults();
        let token = runtime.cancellation_token();
        assert!(!token.is_cancelled());

        // The returned handle is a clone of the runtime's own token, so cancelling it
        // must be observed by the runtime (and every component holding a clone).
        token.cancel();
        assert!(runtime.cancel_token.is_cancelled());
    }
}