//! fusen-common 0.8.12
//!
//! Documentation: logging and tracing initialization for `fusen-common`.
use chrono::Local;
use opentelemetry::trace::TracerProvider;
use opentelemetry::{StringValue, Value};
use opentelemetry_otlp::{ExporterBuildError, SpanExporter, WithExportConfig};
use opentelemetry_sdk::runtime;
use opentelemetry_sdk::trace::span_processor_with_async_runtime;
use opentelemetry_sdk::{Resource, trace::SdkTracerProvider};
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use std::str::FromStr;
use tokio::time::Instant;
use tracing::field::Field;
use tracing::{Subscriber, error, info};
use tracing_appender::{
    non_blocking::WorkerGuard,
    rolling::{RollingFileAppender, Rotation},
};
use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::EnvFilter;
use tracing_subscriber::fmt::format::Writer;
use tracing_subscriber::fmt::time::FormatTime;
use tracing_subscriber::registry::LookupSpan;
use tracing_subscriber::{Layer, layer::SubscriberExt, util::SubscriberInitExt};

/// Logging configuration consumed by [`init_log`].
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct LogConfig {
    /// Text substituted for every `{level}` placeholder in `env_filter`.
    pub level: String,
    /// Location handed to the daily-rotating file appender; `None` disables
    /// the JSON file layer.
    pub path: Option<String>,
    /// OTLP exporter endpoint; `None` disables the OpenTelemetry layer.
    pub endpoint: Option<String>,
    /// Filter directive template. When `None`, the filter is built with
    /// `EnvFilter::from_default_env()` instead.
    pub env_filter: Option<String>,
}

/// Owns the resources created by [`init_log`]; dropping it shuts down the
/// tracer provider, if one was created.
pub struct LogWorkGroup {
    /// Guard returned by `tracing_appender::non_blocking`; never read, only
    /// held so that it is dropped together with this value.
    _work_guard: Option<WorkerGuard>,
    /// Tracer provider created when an OTLP endpoint was configured.
    trace_provider: Option<SdkTracerProvider>,
}

impl Drop for LogWorkGroup {
    /// Shuts down the tracer provider when one is present; the result of the
    /// shutdown call is discarded.
    fn drop(&mut self) {
        let Some(provider) = self.trace_provider.as_ref() else {
            return;
        };
        // Nothing can be propagated out of `drop`, so the outcome is ignored.
        let _ = provider.shutdown();
    }
}

/// Builds a tracer provider that batches spans and sends them through a
/// tonic-based OTLP span exporter.
///
/// * `otlp_url` - endpoint the exporter is configured with.
/// * `app_name` - value used as the resource's service name.
///
/// # Errors
/// Returns the [`ExporterBuildError`] reported when the span exporter cannot
/// be built.
fn init_opentelemetry_trace(
    otlp_url: &str,
    app_name: &str,
) -> Result<SdkTracerProvider, ExporterBuildError> {
    let span_exporter = SpanExporter::builder()
        .with_tonic()
        .with_endpoint(otlp_url)
        .build()?;

    let service_name = Value::String(StringValue::from(app_name.to_owned()));
    let resource = Resource::builder().with_service_name(service_name).build();

    // Batch processor that runs on the Tokio runtime.
    let batch_processor = span_processor_with_async_runtime::BatchSpanProcessor::builder(
        span_exporter,
        runtime::Tokio,
    )
    .build();

    let provider = SdkTracerProvider::builder()
        .with_resource(resource)
        .with_span_processor(batch_processor)
        .build();
    Ok(provider)
}

pub fn init_log(app_name: &str, log_config: LogConfig) -> Option<LogWorkGroup> {
    std::panic::set_hook(Box::new(|error| {
        error!("panic : {error:?} : {}", error.to_string());
    }));
    let mut work_guard = None;
    let mut trace_provider = None;
    let mut layter_list = vec![];
    let env_filter = || {
        if let Some(env_filter) = &log_config.env_filter {
            let env_filter = env_filter.replace("{level}", &log_config.level);
            EnvFilter::from_str(&env_filter).unwrap()
        } else {
            EnvFilter::from_default_env()
        }
    };
    if let Some(path) = &log_config.path {
        let file_appender = RollingFileAppender::builder()
            .rotation(Rotation::DAILY)
            .filename_prefix(app_name)
            .filename_suffix("log")
            .build(path)
            .expect("initializing rolling file appender failed");
        let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);
        let _ = work_guard.insert(guard);
        let tracing = tracing_subscriber::fmt::layer()
            .with_line_number(true)
            .with_thread_ids(true)
            .with_timer(LocalTimer);
        let json_tracing = tracing.json().with_writer(non_blocking);
        layter_list.push(json_tracing.boxed());
    };
    let tracing = tracing_subscriber::fmt::layer()
        .with_line_number(true)
        .with_thread_ids(true)
        .with_timer(LocalTimer);
    layter_list.push(tracing.boxed());
    if let Some(endpoint) = &log_config.endpoint {
        let provider = init_opentelemetry_trace(endpoint, app_name).unwrap();
        let _ = trace_provider.insert(provider.clone());
        let opentelemetry = OpenTelemetryLayer::new(provider.tracer(app_name.to_owned()));
        layter_list.push(opentelemetry.boxed());
    }
    if layter_list.is_empty() {
        return None;
    }
    let mut layer = layter_list.remove(0);
    for item in layter_list {
        layer = Box::new(layer.and_then(item));
    }
    tracing_subscriber::registry()
        .with(env_filter())
        .with(layer)
        .with(TimingLayer)
        .init();
    Some(LogWorkGroup {
        _work_guard: work_guard,
        trace_provider,
    })
}

/// Timestamp formatter for the `fmt` layers that prints the current local time.
struct LocalTimer;

impl FormatTime for LocalTimer {
    /// Writes the current local time using the `%FT%T%.3f` pattern
    /// (date, `T`, time of day, millisecond fraction).
    fn format_time(&self, w: &mut Writer<'_>) -> std::fmt::Result {
        let timestamp = Local::now().format("%FT%T%.3f");
        write!(w, "{timestamp}")
    }
}

/// Layer that logs how long each span lived, measured from creation to close.
struct TimingLayer;

/// Per-span data that `TimingLayer` stores in the span's extensions.
struct TempStatus {
    /// Instant captured when the span was created.
    time: Instant,
    /// Debug-formatted value of the span's `span_type` field with double
    /// quotes removed, if the span declared that field.
    span_type: Option<String>,
}

impl<S> Layer<S> for TimingLayer
where
    S: Subscriber + for<'a> LookupSpan<'a>,
{
    /// Records the creation instant and the optional `span_type` field of a
    /// new span in that span's extensions.
    fn on_new_span(
        &self,
        attrs: &tracing::span::Attributes<'_>,
        id: &tracing::span::Id,
        ctx: tracing_subscriber::layer::Context<'_, S>,
    ) {
        let mut span_type = None;
        attrs.record(&mut |field: &Field, value: &dyn std::fmt::Debug| {
            if field.name() != "span_type" {
                return;
            }
            // The value is Debug-formatted, so strip any double quotes.
            span_type = Some(format!("{value:?}").replace('"', ""));
        });

        let Some(span) = ctx.span(id) else {
            return;
        };
        span.extensions_mut().insert(TempStatus {
            time: Instant::now(),
            span_type,
        });
    }

    /// Emits a `span_timer` info event carrying the span's name, type and
    /// elapsed milliseconds when the span closes.
    fn on_close(&self, id: tracing::span::Id, ctx: tracing_subscriber::layer::Context<'_, S>) {
        let Some(span) = ctx.span(&id) else {
            return;
        };
        let extensions = span.extensions();
        let Some(status) = extensions.get::<TempStatus>() else {
            return;
        };

        // Compute the elapsed time at the moment the span closes.
        let elapsed_ms = status.time.elapsed().as_millis();
        let name = span.name();
        // Spans without a `span_type` field are reported as "unknown".
        let kind = status.span_type.as_deref().unwrap_or("unknown");
        info!(
            log_type = "span_timer",
            span_name = name,
            span_type = kind,
            elapsed = elapsed_ms
        );
    }
}