forge-runtime 0.9.0

Runtime executors and gateway for the Forge framework
use opentelemetry::{KeyValue, global, trace::TracerProvider as _};
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use opentelemetry_otlp::{LogExporter, MetricExporter, SpanExporter, WithExportConfig};
use opentelemetry_sdk::{
    Resource,
    logs::LoggerProvider,
    metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider},
    propagation::TraceContextPropagator,
    resource::{EnvResourceDetector, SdkProvidedResourceDetector},
    trace::{RandomIdGenerator, Sampler, TracerProvider},
};
use opentelemetry_semantic_conventions::resource::{SERVICE_NAME, SERVICE_VERSION};

/// Resource attribute key for the deployment environment
/// (OTel semantic convention `deployment.environment.name`).
///
/// Defined locally; only `SERVICE_NAME` / `SERVICE_VERSION` are imported from
/// `opentelemetry_semantic_conventions`. NOTE(review): presumably because the
/// semconv crate version in use does not export this key (or only behind a
/// feature flag) — confirm before switching to the crate constant.
const DEPLOYMENT_ENVIRONMENT_NAME: &str = "deployment.environment.name";
use std::sync::OnceLock;
use thiserror::Error;
use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::{
    EnvFilter, Layer, Registry,
    filter::{FilterExt, FilterFn},
    layer::SubscriberExt,
    util::SubscriberInitExt,
};

// Providers installed by `init_telemetry`, retained so `shutdown_telemetry`
// can shut them down. `OnceLock` means each can be stored at most once per
// process; a second store attempt surfaces as `TelemetryError::AlreadyInitialized`.

/// Tracer provider stored when traces are enabled.
static TRACER_PROVIDER: OnceLock<TracerProvider> = OnceLock::new();
/// Meter provider stored when metrics are enabled and the subscriber was installed.
static METER_PROVIDER: OnceLock<SdkMeterProvider> = OnceLock::new();
/// Logger provider backing the OTel log bridge layer, stored when logs are enabled.
static LOGGER_PROVIDER: OnceLock<LoggerProvider> = OnceLock::new();

/// Errors that can occur while initializing telemetry.
#[derive(Debug, Error)]
pub enum TelemetryError {
    /// Building the OTLP span exporter failed; carries the exporter's error text.
    #[error("failed to initialize tracer: {0}")]
    TracerInit(String),
    /// Building the OTLP metric exporter failed; carries the exporter's error text.
    #[error("failed to initialize meter: {0}")]
    MeterInit(String),
    /// Building the OTLP log exporter failed; carries the exporter's error text.
    #[error("failed to initialize logger: {0}")]
    LoggerInit(String),
    /// A provider for an enabled signal was already stored, e.g. because
    /// telemetry initialization ran more than once in this process.
    #[error("telemetry already initialized")]
    AlreadyInitialized,
    /// Installing the global tracing subscriber failed.
    ///
    /// NOTE(review): not constructed anywhere in this file — subscriber
    /// installation failure is reported as `Ok(false)` instead. Kept for API
    /// compatibility; confirm whether other modules use it.
    #[error("tracing subscriber init failed: {0}")]
    SubscriberInit(String),
}

/// Settings controlling which OpenTelemetry signals are exported and where.
#[derive(Debug, Clone)]
pub struct TelemetryConfig {
    /// OTLP/HTTP collector endpoint handed to the span, metric and log exporters.
    pub otlp_endpoint: String,
    /// Value of the `service.name` resource attribute; also used as the tracer name.
    pub service_name: String,
    /// Value of the `service.version` resource attribute.
    pub service_version: String,
    /// Value of the `deployment.environment.name` resource attribute.
    pub environment: String,
    /// Export spans over OTLP and attach the OpenTelemetry tracing layer.
    pub enable_traces: bool,
    /// Export metrics over OTLP via a periodic reader.
    pub enable_metrics: bool,
    /// Forward `tracing` events to OTLP through the OTel log bridge.
    pub enable_logs: bool,
    /// Fraction of traces to sample: `>= 1.0` samples everything, `<= 0.0`
    /// samples nothing, values in between use trace-id ratio sampling.
    pub sampling_ratio: f64,
    /// Interval, in seconds, between periodic metric exports.
    pub metrics_interval_secs: u64,
}

impl Default for TelemetryConfig {
    /// Local-development defaults: service `forge-service` v`0.1.0` in the
    /// `development` environment, all three signals exported to an OTLP/HTTP
    /// collector on `localhost:4318`, every trace sampled, and metrics
    /// exported every 15 seconds.
    fn default() -> Self {
        Self {
            service_name: String::from("forge-service"),
            service_version: String::from("0.1.0"),
            environment: String::from("development"),
            otlp_endpoint: String::from("http://localhost:4318"),
            enable_traces: true,
            enable_metrics: true,
            enable_logs: true,
            sampling_ratio: 1.0,
            metrics_interval_secs: 15,
        }
    }
}

impl TelemetryConfig {
    /// Create a config for `service_name`; every other field comes from
    /// [`TelemetryConfig::default`].
    pub fn new(service_name: impl Into<String>) -> Self {
        Self {
            service_name: service_name.into(),
            ..Default::default()
        }
    }

    /// Create config from ForgeConfig's observability settings.
    ///
    /// * `service_name` is `obs.service_name`, falling back to `project_name`.
    /// * `service_version` is `project_version`.
    /// * A signal is enabled only when `obs.enabled` AND its per-signal flag
    ///   are both set, so disabling observability turns off all OTLP export
    ///   while the console subscriber keeps working.
    /// * `sampling_ratio` is clamped to `[0.0, 1.0]`, the same normalization
    ///   [`TelemetryConfig::with_sampling_ratio`] applies.
    ///
    /// NOTE(review): `environment` is always `"production"` here; there is no
    /// visible way to derive it from `obs` — confirm this is intended.
    pub fn from_observability_config(
        obs: &forge_core::config::ObservabilityConfig,
        project_name: &str,
        project_version: &str,
    ) -> Self {
        // When observability is disabled, turn off OTLP export but keep the
        // fmt subscriber so console logs still work.
        let otlp_enabled = obs.enabled;
        Self {
            otlp_endpoint: obs.otlp_endpoint.clone(),
            service_name: obs
                .service_name
                .clone()
                .unwrap_or_else(|| project_name.to_string()),
            service_version: project_version.to_string(),
            environment: "production".to_string(),
            enable_traces: otlp_enabled && obs.enable_traces,
            enable_metrics: otlp_enabled && obs.enable_metrics,
            enable_logs: otlp_enabled && obs.enable_logs,
            // Keep both construction paths consistent: the stored ratio is
            // always within [0.0, 1.0], exactly as with `with_sampling_ratio`.
            sampling_ratio: obs.sampling_ratio.clamp(0.0, 1.0),
            metrics_interval_secs: obs.metrics_interval_secs,
        }
    }

    /// Set the OTLP/HTTP collector endpoint.
    pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
        self.otlp_endpoint = endpoint.into();
        self
    }

    /// Set the `service.version` resource attribute.
    pub fn with_version(mut self, version: impl Into<String>) -> Self {
        self.service_version = version.into();
        self
    }

    /// Set the `deployment.environment.name` resource attribute.
    pub fn with_environment(mut self, env: impl Into<String>) -> Self {
        self.environment = env.into();
        self
    }

    /// Enable or disable trace export.
    pub fn with_traces(mut self, enabled: bool) -> Self {
        self.enable_traces = enabled;
        self
    }

    /// Enable or disable metric export.
    pub fn with_metrics(mut self, enabled: bool) -> Self {
        self.enable_metrics = enabled;
        self
    }

    /// Enable or disable log export through the OTel log bridge.
    pub fn with_logs(mut self, enabled: bool) -> Self {
        self.enable_logs = enabled;
        self
    }

    /// Set the trace sampling ratio, clamped to `[0.0, 1.0]`.
    ///
    /// A `NaN` input is stored unchanged (`f64::clamp` returns `NaN` for `NaN`).
    pub fn with_sampling_ratio(mut self, ratio: f64) -> Self {
        self.sampling_ratio = ratio.clamp(0.0, 1.0);
        self
    }

    /// Set the interval, in seconds, between periodic metric exports.
    pub fn with_metrics_interval_secs(mut self, secs: u64) -> Self {
        self.metrics_interval_secs = secs;
        self
    }
}

/// Build the OTel `Resource` attached to the tracer, meter and logger providers.
///
/// Attributes produced by the SDK-provided and environment resource detectors
/// (run with a 5 second timeout) are combined with the service name, version
/// and deployment environment from `config`. The config-derived resource is
/// the argument to `merge`, so its values win when a key appears in both.
fn build_resource(config: &TelemetryConfig) -> Resource {
    let configured = Resource::new(vec![
        KeyValue::new(SERVICE_NAME, config.service_name.clone()),
        KeyValue::new(SERVICE_VERSION, config.service_version.clone()),
        KeyValue::new(DEPLOYMENT_ENVIRONMENT_NAME, config.environment.clone()),
    ]);

    let detection_timeout = std::time::Duration::from_secs(5);
    let detected = Resource::from_detectors(
        detection_timeout,
        vec![
            Box::new(SdkProvidedResourceDetector),
            Box::new(EnvResourceDetector::new()),
        ],
    );

    detected.merge(&configured)
}

/// Build the full OTLP/HTTP URL for one signal from the configured base endpoint.
///
/// The OTLP HTTP exporter uses an endpoint passed through `with_endpoint`
/// verbatim: it appends the per-signal path (`/v1/traces`, `/v1/metrics`,
/// `/v1/logs`) only for the `OTEL_EXPORTER_OTLP_ENDPOINT` env var and its
/// built-in default. `TelemetryConfig::otlp_endpoint` is a single base URL
/// shared by all three signals (default `http://localhost:4318`), so without
/// this every export would be POSTed to the collector root.
///
/// Trailing `/` on `base` is trimmed. An endpoint that already ends with
/// `signal_path` is returned unchanged, and an empty endpoint is left empty
/// (so the exporter behaves exactly as before for that case).
fn otlp_signal_endpoint(base: &str, signal_path: &str) -> String {
    let base = base.trim_end_matches('/');
    if base.is_empty() || base.ends_with(signal_path) {
        base.to_string()
    } else {
        format!("{base}{signal_path}")
    }
}

/// Build a tracer provider that exports spans over OTLP/HTTP to
/// `<otlp_endpoint>/v1/traces` through a Tokio batch processor.
///
/// The sampler follows `config.sampling_ratio`: `>= 1.0` always samples,
/// `<= 0.0` never samples, anything else uses trace-id ratio sampling.
///
/// # Errors
/// `TelemetryError::TracerInit` if the span exporter cannot be built.
fn init_tracer(config: &TelemetryConfig) -> Result<TracerProvider, TelemetryError> {
    let exporter = SpanExporter::builder()
        .with_http()
        .with_endpoint(otlp_signal_endpoint(&config.otlp_endpoint, "/v1/traces"))
        .build()
        .map_err(|e| TelemetryError::TracerInit(e.to_string()))?;

    let sampler = if config.sampling_ratio >= 1.0 {
        Sampler::AlwaysOn
    } else if config.sampling_ratio <= 0.0 {
        Sampler::AlwaysOff
    } else {
        Sampler::TraceIdRatioBased(config.sampling_ratio)
    };

    let provider = TracerProvider::builder()
        .with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
        .with_sampler(sampler)
        .with_id_generator(RandomIdGenerator::default())
        .with_resource(build_resource(config))
        .build();

    Ok(provider)
}

/// Build a meter provider that exports metrics over OTLP/HTTP to
/// `<otlp_endpoint>/v1/metrics` every `config.metrics_interval_secs` seconds.
///
/// # Errors
/// `TelemetryError::MeterInit` if the metric exporter cannot be built.
fn init_meter(config: &TelemetryConfig) -> Result<SdkMeterProvider, TelemetryError> {
    let exporter = MetricExporter::builder()
        .with_http()
        .with_endpoint(otlp_signal_endpoint(&config.otlp_endpoint, "/v1/metrics"))
        .build()
        .map_err(|e| TelemetryError::MeterInit(e.to_string()))?;

    let reader = PeriodicReader::builder(exporter, opentelemetry_sdk::runtime::Tokio)
        .with_interval(std::time::Duration::from_secs(config.metrics_interval_secs))
        .build();

    let provider = MeterProviderBuilder::default()
        .with_reader(reader)
        .with_resource(build_resource(config))
        .build();

    Ok(provider)
}

/// Build a logger provider that exports log records over OTLP/HTTP to
/// `<otlp_endpoint>/v1/logs` through a Tokio batch processor.
///
/// # Errors
/// `TelemetryError::LoggerInit` if the log exporter cannot be built.
fn init_logger(config: &TelemetryConfig) -> Result<LoggerProvider, TelemetryError> {
    let exporter = LogExporter::builder()
        .with_http()
        .with_endpoint(otlp_signal_endpoint(&config.otlp_endpoint, "/v1/logs"))
        .build()
        .map_err(|e| TelemetryError::LoggerInit(e.to_string()))?;

    let provider = LoggerProvider::builder()
        .with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
        .with_resource(build_resource(config))
        .build();

    Ok(provider)
}

/// Build an `EnvFilter` for console output.
///
/// Ensures the user's crate is visible at the configured level even when
/// `RUST_LOG` is set to something restrictive (e.g. `warn`).
///
/// This is the same filter the telemetry setup in this module attaches to its
/// console `fmt` layer. NOTE(review): presumably public so callers that build
/// their own subscriber can reuse it — confirm.
///
/// * `project_name` - the application's crate/package name; `-` is mapped to
///   `_` to form the `tracing` target.
/// * `log_level` - level applied to that crate, and the global level when
///   `RUST_LOG` is unset or fails to parse.
pub fn build_env_filter(project_name: &str, log_level: &str) -> EnvFilter {
    build_console_filter(project_name, log_level)
}

/// Build the per-layer `EnvFilter` used for console output (and reused by the
/// OTel trace and log layers).
///
/// The base filter is `RUST_LOG` when it is set and parses, otherwise
/// `log_level` applied globally. A `<crate>=<log_level>` directive is then
/// added for `project_name` (with `-` mapped to `_` to match the Rust target
/// name). This keeps the application's own events visible under a restrictive
/// `RUST_LOG` such as `warn`.
///
/// If `RUST_LOG` already contains a directive for exactly that crate, the
/// crate directive is NOT added. `add_directive` replaces an existing
/// directive with the same target regardless of level, so adding it
/// unconditionally would silently override an explicit `RUST_LOG=my_app=trace`
/// (or `my_app=warn`).
fn build_console_filter(project_name: &str, log_level: &str) -> EnvFilter {
    let crate_name = project_name.replace('-', "_");

    let (base, env_sets_crate_level) = match EnvFilter::try_from_default_env() {
        Ok(filter) => {
            let explicit = std::env::var(EnvFilter::DEFAULT_ENV)
                .is_ok_and(|spec| directives_name_target(&spec, &crate_name));
            (filter, explicit)
        }
        // RUST_LOG unset or unparsable: fall back to the configured level.
        Err(_) => (EnvFilter::new(log_level), false),
    };

    if env_sets_crate_level {
        // Respect the user's explicit per-crate choice from RUST_LOG.
        return base;
    }

    // An invalid `log_level` yields an unparsable directive; keep the base
    // filter in that case rather than failing.
    match format!("{crate_name}={log_level}").parse() {
        Ok(directive) => base.add_directive(directive),
        Err(_) => base,
    }
}

/// Returns `true` if the comma-separated directive list `spec` contains a
/// directive whose target is exactly `target` (`my_app=debug` or a bare
/// `my_app`).
///
/// Directives with span/field qualifiers (`my_app[span]=debug`) or sub-module
/// targets (`my_app::db=trace`) do not count. They are more specific than a
/// plain `target=level` directive, so they do not collide with it. An empty
/// `target` never matches.
fn directives_name_target(spec: &str, target: &str) -> bool {
    !target.is_empty()
        && spec
            .split(',')
            .filter_map(|directive| directive.split('=').next())
            .any(|name| name.trim() == target)
}

/// Returns `true` for event targets that belong to the OTLP export transport
/// stack. Such events must not be forwarded through the OTel log bridge,
/// because that would feed the HTTP exporter's own log events back into it.
///
/// Matching uses the crate segment of the target (text before the first
/// `::`).
/// * `hyper*`, `reqwest*`, `tonic*`, `tower*` and `opentelemetry*` are prefix
///   families, e.g. `hyper_util` and `opentelemetry_sdk`.
/// * `h2` must match exactly, so unrelated crates whose names merely start
///   with "h2" (e.g. `h264`) keep their logs.
fn is_otlp_transport_target(target: &str) -> bool {
    const PREFIX_FAMILIES: [&str; 5] = ["hyper", "reqwest", "tonic", "tower", "opentelemetry"];

    let krate = target.split("::").next().unwrap_or(target);
    krate == "h2" || PREFIX_FAMILIES.iter().any(|prefix| krate.starts_with(prefix))
}

/// Set up tracing so logs work without any user boilerplate.
///
/// Installs a global `Registry` subscriber made of:
/// * a console `fmt` layer (always);
/// * an OpenTelemetry trace layer when `config.enable_traces`;
/// * an OTel log-bridge layer when `config.enable_logs`.
///
/// Every layer carries its own `build_console_filter(project_name, log_level)`
/// filter. The W3C trace-context propagator is always installed globally.
/// When the subscriber is installed and `config.enable_metrics` is set, a
/// global meter provider is registered too. Created providers are stored so
/// [`shutdown_telemetry`] can shut them down.
///
/// # Returns
/// `Ok(true)` when the subscriber was installed. `Ok(false)` if a subscriber
/// already exists (user configured their own). In that case the tracer
/// provider (if enabled) has already been stored and set as the global tracer
/// provider, and the logger provider (if enabled) stored; metrics are skipped.
///
/// # Errors
/// * `TracerInit` / `LoggerInit` / `MeterInit` if building an OTLP exporter fails.
/// * `AlreadyInitialized` if a provider for an enabled signal was already
///   stored by an earlier call.
pub fn init_telemetry(
    config: &TelemetryConfig,
    project_name: &str,
    log_level: &str,
) -> Result<bool, TelemetryError> {
    global::set_text_map_propagator(TraceContextPropagator::new());

    // Per-layer filters avoid the global EnvFilter + per-layer filter conflict
    // that causes the OTel log bridge to silently drop events.
    let fmt_layer = tracing_subscriber::fmt::layer()
        .with_target(true)
        .with_thread_ids(false)
        .with_file(false)
        .with_line_number(false)
        .with_filter(build_console_filter(project_name, log_level));

    // Optional OTel trace layer. It gets its own instance of the console
    // filter, so spans are exported at the same verbosity as console output.
    let otel_trace_layer = if config.enable_traces {
        let tracer_provider = init_tracer(config)?;
        let tracer = tracer_provider.tracer(config.service_name.clone());

        TRACER_PROVIDER
            .set(tracer_provider.clone())
            .map_err(|_| TelemetryError::AlreadyInitialized)?;

        global::set_tracer_provider(tracer_provider);

        Some(
            OpenTelemetryLayer::new(tracer)
                .with_filter(build_console_filter(project_name, log_level)),
        )
    } else {
        None
    };

    // Optional OTel log bridge layer: the console filter plus an
    // anti-recursion guard (`is_otlp_transport_target`). The guard blocks
    // events from the OTLP transport stack to prevent infinite feedback
    // through the HTTP exporter.
    let otel_log_layer = if config.enable_logs {
        let logger_provider = init_logger(config)?;

        let env_filter = build_console_filter(project_name, log_level);
        let log_layer = OpenTelemetryTracingBridge::new(&logger_provider).with_filter(
            env_filter.and(FilterFn::new(|metadata| {
                !is_otlp_transport_target(metadata.target())
            })),
        );

        LOGGER_PROVIDER
            .set(logger_provider)
            .map_err(|_| TelemetryError::AlreadyInitialized)?;

        Some(log_layer)
    } else {
        None
    };

    // No global filter: each layer carries its own per-layer filter.
    // NOTE: the tracer/logger providers above are already stored (and the
    // tracer provider set globally) at this point. They are not rolled back if
    // another subscriber is already installed and we return Ok(false).
    if Registry::default()
        .with(fmt_layer)
        .with(otel_trace_layer)
        .with(otel_log_layer)
        .try_init()
        .is_err()
    {
        return Ok(false);
    }

    if config.enable_metrics {
        let meter_provider = init_meter(config)?;

        METER_PROVIDER
            .set(meter_provider.clone())
            .map_err(|_| TelemetryError::AlreadyInitialized)?;

        global::set_meter_provider(meter_provider);
    }

    tracing::info!(
        service = %config.service_name,
        version = %config.service_version,
        environment = %config.environment,
        traces = config.enable_traces,
        metrics = config.enable_metrics,
        logs = config.enable_logs,
        "telemetry initialized"
    );

    Ok(true)
}

pub fn shutdown_telemetry() {
    tracing::info!("shutting down telemetry");

    if let Some(provider) = TRACER_PROVIDER.get()
        && let Err(e) = provider.shutdown()
    {
        tracing::warn!(error = %e, "failed to shutdown tracer provider");
    }

    if let Some(provider) = METER_PROVIDER.get()
        && let Err(e) = provider.shutdown()
    {
        tracing::warn!(error = %e, "failed to shutdown meter provider");
    }

    if let Some(provider) = LOGGER_PROVIDER.get()
        && let Err(e) = provider.shutdown()
    {
        tracing::warn!(error = %e, "failed to shutdown logger provider");
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Every builder method writes its own field.
    #[test]
    fn test_config_builder() {
        let config = TelemetryConfig::new("test-service")
            .with_endpoint("http://otel:4318")
            .with_version("1.0.0")
            .with_environment("production")
            .with_traces(true)
            .with_metrics(false)
            .with_logs(true);

        assert_eq!(config.service_name, "test-service");
        assert_eq!(config.otlp_endpoint, "http://otel:4318");
        assert_eq!(config.service_version, "1.0.0");
        assert_eq!(config.environment, "production");
        assert!(config.enable_traces);
        assert!(!config.enable_metrics);
        assert!(config.enable_logs);
    }

    /// `Default` targets a local collector with every signal enabled.
    #[test]
    fn test_default_config() {
        let config = TelemetryConfig::default();

        assert_eq!(config.otlp_endpoint, "http://localhost:4318");
        assert_eq!(config.service_name, "forge-service");
        assert_eq!(config.environment, "development");
        assert!(config.enable_traces);
        assert!(config.enable_metrics);
        assert!(config.enable_logs);
    }

    /// `with_sampling_ratio` clamps out-of-range ratios into [0.0, 1.0] and
    /// leaves in-range ratios untouched.
    #[test]
    fn test_sampling_ratio_is_clamped() {
        let ratio = |r: f64| TelemetryConfig::new("svc").with_sampling_ratio(r).sampling_ratio;

        assert_eq!(ratio(1.5), 1.0);
        assert_eq!(ratio(-0.5), 0.0);
        assert_eq!(ratio(0.25), 0.25);
    }

    /// `new` only overrides the service name; everything else is the default.
    #[test]
    fn test_new_keeps_defaults() {
        let config = TelemetryConfig::new("svc");
        let defaults = TelemetryConfig::default();

        assert_eq!(config.service_name, "svc");
        assert_eq!(config.otlp_endpoint, defaults.otlp_endpoint);
        assert_eq!(config.service_version, defaults.service_version);
        assert_eq!(config.environment, defaults.environment);
        assert_eq!(config.sampling_ratio, defaults.sampling_ratio);
        assert_eq!(config.metrics_interval_secs, defaults.metrics_interval_secs);
    }
}