cloud-util 0.8.2-beta1

cita-cloud utility library
Documentation
// Copyright Rivtower Technologies LLC.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use chrono::{Local, Offset};
use opentelemetry::{global, propagation::Extractor, trace::TracerProvider, KeyValue};
use opentelemetry_sdk::{
    propagation::TraceContextPropagator,
    runtime,
    trace::{BatchConfig, Sampler},
    Resource,
};
use serde::{Deserialize, Serialize};
use std::str::FromStr;
use time::{format_description::well_known, UtcOffset};
use tonic::Request;
use tracing_opentelemetry::{OpenTelemetryLayer, OpenTelemetrySpanExt};
use tracing_subscriber::{fmt::format, fmt::time::OffsetTime, prelude::*, EnvFilter};

struct MetadataMap<'a>(&'a tonic::metadata::MetadataMap);

impl<'a> Extractor for MetadataMap<'a> {
    /// Get a value for a key from the MetadataMap.  If the value can't be converted to &str, returns None
    fn get(&self, key: &str) -> Option<&str> {
        self.0.get(key).and_then(|metadata| metadata.to_str().ok())
    }

    /// Collect all the keys from the MetadataMap.
    fn keys(&self) -> Vec<&str> {
        self.0
            .keys()
            .map(|key| match key {
                tonic::metadata::KeyRef::Ascii(v) => v.as_str(),
                tonic::metadata::KeyRef::Binary(v) => v.as_str(),
            })
            .collect::<Vec<_>>()
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct LogConfig {
    max_level: String,
    filter: String,
    service_name: String,
    rolling_file_path: Option<String>,
    agent_endpoint: Option<String>,
}

impl Default for LogConfig {
    fn default() -> Self {
        Self {
            max_level: "info".to_owned(),
            filter: "info".to_owned(),
            service_name: Default::default(),
            rolling_file_path: Default::default(),
            agent_endpoint: Default::default(),
        }
    }
}

pub fn init_tracer(
    domain: String,
    log_config: &LogConfig,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
    // agent
    let mut agent = None;
    if let Some(agent_endpoint) = &log_config.agent_endpoint {
        global::set_text_map_propagator(TraceContextPropagator::new());
        agent = Some(
            opentelemetry_otlp::new_pipeline()
                .tracing()
                .with_trace_config(
                    opentelemetry_sdk::trace::Config::default()
                        .with_sampler(
                            Sampler::jaeger_remote(
                                runtime::Tokio,
                                reqwest::Client::new(),
                                Sampler::AlwaysOff,
                                &log_config.service_name,
                            )
                            .with_endpoint(agent_endpoint)
                            .build()
                            .unwrap(),
                        )
                        .with_resource(Resource::new(vec![KeyValue::new("domain", domain)])),
                )
                .with_batch_config(BatchConfig::default())
                .with_exporter(opentelemetry_otlp::new_exporter().tonic())
                .install_batch(runtime::Tokio)?
                .tracer("cita_cloud_tracer"),
        );
    }

    // log
    let mut logfile = None;
    let mut stdout = None;
    if let Some(rolling_file_path) = &log_config.rolling_file_path {
        // logfile
        logfile = Some(tracing_appender::rolling::daily(
            rolling_file_path,
            &log_config.service_name,
        ));
    } else {
        // stdout
        stdout = Some(
            std::io::stdout
                .with_max_level(tracing::Level::from_str(&log_config.max_level).unwrap()),
        );
    }

    // set timer
    let local_offset_sec = Local::now().offset().fix().local_minus_utc();
    let utc_offset = UtcOffset::from_whole_seconds(local_offset_sec)
        .unwrap_or(UtcOffset::from_hms(8, 0, 0).unwrap());
    let timer = OffsetTime::new(utc_offset, well_known::Rfc3339);

    if let Some(agent) = agent {
        if let Some(stdout) = stdout {
            tracing_subscriber::registry()
                .with(EnvFilter::new(&log_config.filter))
                .with(OpenTelemetryLayer::new(agent))
                .with(
                    tracing_subscriber::fmt::layer()
                        .event_format(format().compact())
                        .with_ansi(false)
                        .with_timer(timer)
                        .with_writer(stdout),
                )
                .try_init()?;
        } else {
            tracing_subscriber::registry()
                .with(EnvFilter::new(&log_config.filter))
                .with(OpenTelemetryLayer::new(agent))
                .with(
                    tracing_subscriber::fmt::layer()
                        .event_format(format().compact())
                        .with_ansi(false)
                        .with_timer(timer)
                        .with_writer(logfile.unwrap()),
                )
                .try_init()?;
        }
    } else if let Some(stdout) = stdout {
        tracing_subscriber::registry()
            .with(EnvFilter::new(&log_config.filter))
            .with(
                tracing_subscriber::fmt::layer()
                    .event_format(format().compact())
                    .with_ansi(false)
                    .with_timer(timer)
                    .with_writer(stdout),
            )
            .try_init()?;
    } else {
        tracing_subscriber::registry()
            .with(EnvFilter::new(&log_config.filter))
            .with(
                tracing_subscriber::fmt::layer()
                    .event_format(format().compact())
                    .with_ansi(false)
                    .with_timer(timer)
                    .with_writer(logfile.unwrap()),
            )
            .try_init()?;
    }

    Ok(())
}

pub fn shutdown_tracer() {
    opentelemetry::global::shutdown_tracer_provider();
}

pub fn set_parent<T>(request: &Request<T>) {
    let parent_cx =
        global::get_text_map_propagator(|prop| prop.extract(&MetadataMap(request.metadata())));
    tracing::Span::current().set_parent(parent_cx);
}