use crate::config::Config;
use crate::receiver::{OtlpReceiver, Signal};
use crate::resource::{ResourceBuilder, detect_resource, to_proto_resource};
use crate::service::{EventsService, ExtensionState, TelemetryService};
use lambda_extension::{Extension, SharedService};
use opentelemetry_sdk::resource::Resource as SdkResource;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
/// Runtime that owns the configuration, resource, and cancellation token
/// needed to drive the extension.
///
/// Constructed with `new` / `with_defaults` (or via `RuntimeBuilder`) and
/// consumed by `run`.
pub struct ExtensionRuntime {
// Cloned into the extension state and read for the receiver settings and the
// signal channel capacity in `run`.
config: Config,
// Cancelled once `run` leaves its event loop; the OTLP receiver is given a
// child of this token.
cancel_token: CancellationToken,
// Converted with `to_proto_resource` and handed to the extension state in `run`.
resource: SdkResource,
}
impl ExtensionRuntime {
pub fn new(config: Config) -> Self {
Self {
config,
cancel_token: CancellationToken::new(),
resource: detect_resource(),
}
}
pub fn with_defaults() -> Self {
Self::new(Config::default())
}
pub fn with_resource(mut self, resource: SdkResource) -> Self {
self.resource = resource;
self
}
pub fn cancellation_token(&self) -> CancellationToken {
self.cancel_token.clone()
}
pub async fn run(self) -> Result<(), RuntimeError> {
tracing::debug!("Starting extension with lambda_extension crate");
let proto_resource = to_proto_resource(&self.resource);
let (state, shutdown_rx) = ExtensionState::new(self.config.clone(), proto_resource)
.map_err(|e| RuntimeError::StateInit(Box::new(e)))?;
let state = Arc::new(state);
tracing::debug!("Extension state created");
let events_service = EventsService::new(Arc::clone(&state));
let telemetry_service = TelemetryService::new(Arc::clone(&state));
let signal_tx = {
let aggregator = Arc::clone(&state.aggregator);
let (tx, mut rx) = mpsc::channel::<Signal>(self.config.telemetry_api.buffer_size);
tokio::spawn(async move {
while let Some(signal) = rx.recv().await {
aggregator.add(signal).await;
}
});
tx
};
let receiver = OtlpReceiver::new(
self.config.receiver.clone(),
signal_tx,
self.cancel_token.child_token(),
);
let (_receiver_handle, receiver_future) = receiver
.start()
.await
.map_err(RuntimeError::ReceiverStart)?;
let receiver_task = tokio::spawn(receiver_future);
tracing::debug!("Building Extension and starting run loop");
let extension_future = Extension::new()
.with_events_processor(events_service)
.with_telemetry_types(&["platform", "function", "extension"])
.with_telemetry_processor(SharedService::new(telemetry_service))
.run();
let result = tokio::select! {
result = extension_future => {
result.map_err(|e| {
tracing::error!(error = %e, "Extension run failed");
RuntimeError::EventLoop(e)
})
}
_ = shutdown_rx => {
tracing::info!("Shutdown complete, exiting event loop");
Ok(())
}
};
tracing::debug!(?result, "Extension finished");
self.cancel_token.cancel();
let _ = tokio::time::timeout(Duration::from_secs(2), receiver_task).await;
result
}
}
/// Errors returned by `ExtensionRuntime::run`.
///
/// Each variant keeps the underlying failure as its `source()`.
#[non_exhaustive]
#[derive(Debug, thiserror::Error)]
pub enum RuntimeError {
/// `ExtensionState::new` failed; carries the boxed exporter error.
#[error("failed to create extension state")]
StateInit(#[source] Box<crate::exporter::ExportError>),
/// Starting the OTLP receiver returned an I/O error.
#[error("failed to start OTLP receiver")]
ReceiverStart(#[source] std::io::Error),
/// The `lambda_extension` run loop returned an error.
#[error("event loop error")]
EventLoop(#[source] Box<dyn std::error::Error + Send + Sync>),
}
/// Builder that assembles an `ExtensionRuntime` from a configuration and an
/// optional resource override.
#[must_use = "builders do nothing unless .build() is called"]
pub struct RuntimeBuilder {
// Configuration handed to the runtime; starts as `Config::default()`.
config: Config,
// Resource override applied in `build`; when `None`, the runtime keeps the
// resource it set up in `ExtensionRuntime::new`.
resource: Option<SdkResource>,
}
impl RuntimeBuilder {
    /// Starts a builder with `Config::default()` and no resource override.
    pub fn new() -> Self {
        let config = Config::default();
        Self {
            config,
            resource: None,
        }
    }

    /// Replaces the whole configuration with `config`.
    ///
    /// Settings applied earlier through the per-field methods are discarded
    /// along with the previous configuration.
    pub fn config(self, config: Config) -> Self {
        Self { config, ..self }
    }

    /// Sets the exporter endpoint in the configuration.
    pub fn exporter_endpoint(mut self, endpoint: impl Into<String>) -> Self {
        let endpoint: String = endpoint.into();
        self.config.exporter.endpoint = Some(endpoint);
        self
    }

    /// Sets the flush strategy in the configuration.
    pub fn flush_strategy(mut self, strategy: crate::config::FlushStrategy) -> Self {
        self.config.flush.strategy = strategy;
        self
    }

    /// Sets `telemetry_api.enabled` to `false` in the configuration.
    pub fn disable_telemetry_api(mut self) -> Self {
        self.config.telemetry_api.enabled = false;
        self
    }

    /// Uses `resource` for the runtime, replacing any previously set override.
    pub fn resource(self, resource: SdkResource) -> Self {
        Self {
            resource: Some(resource),
            ..self
        }
    }

    /// Builds the resource override by passing a fresh `ResourceBuilder`
    /// through `f`, replacing any previously set override.
    pub fn with_resource_attributes<F>(mut self, f: F) -> Self
    where
        F: FnOnce(ResourceBuilder) -> ResourceBuilder,
    {
        let customized = f(ResourceBuilder::new()).build();
        self.resource = Some(customized);
        self
    }

    /// Consumes the builder and produces the runtime.
    ///
    /// NOTE(review): `ExtensionRuntime::new` always runs `detect_resource()`,
    /// even when an override is supplied here and immediately replaces the
    /// detected value.
    pub fn build(self) -> ExtensionRuntime {
        let Self { config, resource } = self;
        let runtime = ExtensionRuntime::new(config);
        match resource {
            Some(custom) => runtime.with_resource(custom),
            None => runtime,
        }
    }
}
impl Default for RuntimeBuilder {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Every builder setter should be reflected in the built runtime's config.
    #[test]
    fn test_runtime_builder() {
        let endpoint = "http://localhost:4318";
        let built = RuntimeBuilder::new()
            .exporter_endpoint(endpoint)
            .flush_strategy(crate::config::FlushStrategy::End)
            .disable_telemetry_api()
            .build();
        let cfg = &built.config;

        assert_eq!(cfg.exporter.endpoint.as_deref(), Some(endpoint));
        assert_eq!(cfg.flush.strategy, crate::config::FlushStrategy::End);
        assert!(!cfg.telemetry_api.enabled);
    }

    /// The default runtime keeps the telemetry API enabled.
    #[test]
    fn test_runtime_with_defaults() {
        let enabled = ExtensionRuntime::with_defaults()
            .config
            .telemetry_api
            .enabled;
        assert!(enabled);
    }

    /// The error's message mentions the receiver and exposes the I/O error as its source.
    #[test]
    fn test_runtime_error_display() {
        use std::error::Error;

        let err = RuntimeError::ReceiverStart(std::io::Error::new(
            std::io::ErrorKind::AddrInUse,
            "port in use",
        ));
        let rendered = err.to_string();

        assert!(rendered.contains("receiver"));
        assert!(err.source().is_some());
    }

    /// A freshly created runtime hands out a token that is not yet cancelled.
    #[test]
    fn test_cancellation_token() {
        let cancelled = ExtensionRuntime::with_defaults()
            .cancellation_token()
            .is_cancelled();
        assert!(!cancelled);
    }
}