opentelemetry_lambda_extension/
runtime.rs1use crate::config::Config;
10use crate::receiver::{OtlpReceiver, Signal};
11use crate::resource::{ResourceBuilder, detect_resource, to_proto_resource};
12use crate::service::{EventsService, ExtensionState, TelemetryService};
13use lambda_extension::{Extension, SharedService};
14use opentelemetry_sdk::resource::Resource as SdkResource;
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::sync::mpsc;
18use tokio_util::sync::CancellationToken;
19
20pub struct ExtensionRuntime {
22 config: Config,
23 cancel_token: CancellationToken,
24 resource: SdkResource,
25}
26
27impl ExtensionRuntime {
28 pub fn new(config: Config) -> Self {
30 Self {
31 config,
32 cancel_token: CancellationToken::new(),
33 resource: detect_resource(),
34 }
35 }
36
37 pub fn with_defaults() -> Self {
39 Self::new(Config::default())
40 }
41
42 pub fn with_resource(mut self, resource: SdkResource) -> Self {
44 self.resource = resource;
45 self
46 }
47
48 pub fn cancellation_token(&self) -> CancellationToken {
50 self.cancel_token.clone()
51 }
52
53 pub async fn run(self) -> Result<(), RuntimeError> {
67 tracing::debug!("Starting extension with lambda_extension crate");
68
69 let proto_resource = to_proto_resource(&self.resource);
72 let (state, shutdown_rx) = ExtensionState::new(self.config.clone(), proto_resource)
73 .map_err(|e| RuntimeError::StateInit(Box::new(e)))?;
74 let state = Arc::new(state);
75 tracing::debug!("Extension state created");
76
77 let events_service = EventsService::new(Arc::clone(&state));
79 let telemetry_service = TelemetryService::new(Arc::clone(&state));
80
81 let signal_tx = {
83 let aggregator = Arc::clone(&state.aggregator);
84 let (tx, mut rx) = mpsc::channel::<Signal>(self.config.telemetry_api.buffer_size);
85
86 tokio::spawn(async move {
88 while let Some(signal) = rx.recv().await {
89 aggregator.add(signal).await;
90 }
91 });
92
93 tx
94 };
95
96 let receiver = OtlpReceiver::new(
97 self.config.receiver.clone(),
98 signal_tx,
99 self.cancel_token.clone(),
100 );
101
102 let (_receiver_handle, receiver_future) = receiver
103 .start()
104 .await
105 .map_err(RuntimeError::ReceiverStart)?;
106
107 let receiver_task = tokio::spawn(receiver_future);
108
109 tracing::debug!("Building Extension and starting run loop");
114
115 let extension_future = Extension::new()
116 .with_events_processor(events_service)
117 .with_telemetry_types(&["platform", "function", "extension"])
118 .with_telemetry_processor(SharedService::new(telemetry_service))
119 .run();
120
121 let result = tokio::select! {
125 result = extension_future => {
126 result.map_err(|e| {
127 tracing::error!(error = %e, "Extension run failed");
128 RuntimeError::EventLoop(e)
129 })
130 }
131 _ = shutdown_rx => {
132 tracing::info!("Shutdown complete, exiting event loop");
133 Ok(())
134 }
135 };
136 tracing::debug!(?result, "Extension finished");
137
138 self.cancel_token.cancel();
139 let _ = tokio::time::timeout(Duration::from_secs(2), receiver_task).await;
140
141 result
142 }
143}
144
145#[non_exhaustive]
147#[derive(Debug, thiserror::Error)]
148pub enum RuntimeError {
149 #[error("failed to create extension state")]
151 StateInit(#[source] Box<crate::exporter::ExportError>),
152
153 #[error("failed to start OTLP receiver")]
155 ReceiverStart(#[source] std::io::Error),
156
157 #[error("event loop error")]
159 EventLoop(#[source] Box<dyn std::error::Error + Send + Sync>),
160}
161
162#[must_use = "builders do nothing unless .build() is called"]
164pub struct RuntimeBuilder {
165 config: Config,
166 resource: Option<SdkResource>,
167}
168
169impl RuntimeBuilder {
170 pub fn new() -> Self {
172 Self {
173 config: Config::default(),
174 resource: None,
175 }
176 }
177
178 pub fn config(mut self, config: Config) -> Self {
180 self.config = config;
181 self
182 }
183
184 pub fn exporter_endpoint(mut self, endpoint: impl Into<String>) -> Self {
186 self.config.exporter.endpoint = Some(endpoint.into());
187 self
188 }
189
190 pub fn flush_strategy(mut self, strategy: crate::config::FlushStrategy) -> Self {
192 self.config.flush.strategy = strategy;
193 self
194 }
195
196 pub fn disable_telemetry_api(mut self) -> Self {
198 self.config.telemetry_api.enabled = false;
199 self
200 }
201
202 pub fn resource(mut self, resource: SdkResource) -> Self {
204 self.resource = Some(resource);
205 self
206 }
207
208 pub fn with_resource_attributes<F>(mut self, f: F) -> Self
210 where
211 F: FnOnce(ResourceBuilder) -> ResourceBuilder,
212 {
213 let builder = ResourceBuilder::new();
214 self.resource = Some(f(builder).build());
215 self
216 }
217
218 pub fn build(self) -> ExtensionRuntime {
220 let mut runtime = ExtensionRuntime::new(self.config);
221 if let Some(resource) = self.resource {
222 runtime = runtime.with_resource(resource);
223 }
224 runtime
225 }
226}
227
228impl Default for RuntimeBuilder {
229 fn default() -> Self {
230 Self::new()
231 }
232}
233
234#[cfg(test)]
235mod tests {
236 use super::*;
237
238 #[test]
239 fn test_runtime_builder() {
240 let runtime = RuntimeBuilder::new()
241 .exporter_endpoint("http://localhost:4318")
242 .flush_strategy(crate::config::FlushStrategy::End)
243 .disable_telemetry_api()
244 .build();
245
246 assert_eq!(
247 runtime.config.exporter.endpoint,
248 Some("http://localhost:4318".to_string())
249 );
250 assert_eq!(
251 runtime.config.flush.strategy,
252 crate::config::FlushStrategy::End
253 );
254 assert!(!runtime.config.telemetry_api.enabled);
255 }
256
257 #[test]
258 fn test_runtime_with_defaults() {
259 let runtime = ExtensionRuntime::with_defaults();
260 assert!(runtime.config.telemetry_api.enabled);
261 }
262
263 #[test]
264 fn test_runtime_error_display() {
265 use std::error::Error;
266
267 let io_err = std::io::Error::new(std::io::ErrorKind::AddrInUse, "port in use");
268 let err = RuntimeError::ReceiverStart(io_err);
269
270 assert!(format!("{}", err).contains("receiver"));
271 assert!(err.source().is_some());
272 }
273
274 #[test]
275 fn test_cancellation_token() {
276 let runtime = ExtensionRuntime::with_defaults();
277 let token = runtime.cancellation_token();
278 assert!(!token.is_cancelled());
279 }
280}