cloud_util/
tracer.rs

1// Copyright Rivtower Technologies LLC.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use chrono::{Local, Offset};
16use opentelemetry::{global, propagation::Extractor, trace::TracerProvider, KeyValue};
17use opentelemetry_sdk::{
18    propagation::TraceContextPropagator,
19    runtime,
20    trace::{BatchConfig, Sampler},
21    Resource,
22};
23use serde::{Deserialize, Serialize};
24use std::str::FromStr;
25use time::{format_description::well_known, UtcOffset};
26use tonic::Request;
27use tracing_opentelemetry::{OpenTelemetryLayer, OpenTelemetrySpanExt};
28use tracing_subscriber::{fmt::format, fmt::time::OffsetTime, prelude::*, EnvFilter};
29
30struct MetadataMap<'a>(&'a tonic::metadata::MetadataMap);
31
32impl<'a> Extractor for MetadataMap<'a> {
33    /// Get a value for a key from the MetadataMap.  If the value can't be converted to &str, returns None
34    fn get(&self, key: &str) -> Option<&str> {
35        self.0.get(key).and_then(|metadata| metadata.to_str().ok())
36    }
37
38    /// Collect all the keys from the MetadataMap.
39    fn keys(&self) -> Vec<&str> {
40        self.0
41            .keys()
42            .map(|key| match key {
43                tonic::metadata::KeyRef::Ascii(v) => v.as_str(),
44                tonic::metadata::KeyRef::Binary(v) => v.as_str(),
45            })
46            .collect::<Vec<_>>()
47    }
48}
49
50#[derive(Debug, Clone, Serialize, Deserialize)]
51#[serde(default)]
52pub struct LogConfig {
53    max_level: String,
54    filter: String,
55    service_name: String,
56    rolling_file_path: Option<String>,
57    agent_endpoint: Option<String>,
58}
59
60impl Default for LogConfig {
61    fn default() -> Self {
62        Self {
63            max_level: "info".to_owned(),
64            filter: "info".to_owned(),
65            service_name: Default::default(),
66            rolling_file_path: Default::default(),
67            agent_endpoint: Default::default(),
68        }
69    }
70}
71
72pub fn init_tracer(
73    domain: String,
74    log_config: &LogConfig,
75) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
76    // agent
77    let mut agent = None;
78    if let Some(agent_endpoint) = &log_config.agent_endpoint {
79        global::set_text_map_propagator(TraceContextPropagator::new());
80        agent = Some(
81            opentelemetry_otlp::new_pipeline()
82                .tracing()
83                .with_trace_config(
84                    opentelemetry_sdk::trace::Config::default()
85                        .with_sampler(
86                            Sampler::jaeger_remote(
87                                runtime::Tokio,
88                                reqwest::Client::new(),
89                                Sampler::AlwaysOff,
90                                &log_config.service_name,
91                            )
92                            .with_endpoint(agent_endpoint)
93                            .build()
94                            .unwrap(),
95                        )
96                        .with_resource(Resource::new(vec![KeyValue::new("domain", domain)])),
97                )
98                .with_batch_config(BatchConfig::default())
99                .with_exporter(opentelemetry_otlp::new_exporter().tonic())
100                .install_batch(runtime::Tokio)?
101                .tracer("cita_cloud_tracer"),
102        );
103    }
104
105    // log
106    let mut logfile = None;
107    let mut stdout = None;
108    if let Some(rolling_file_path) = &log_config.rolling_file_path {
109        // logfile
110        logfile = Some(tracing_appender::rolling::daily(
111            rolling_file_path,
112            &log_config.service_name,
113        ));
114    } else {
115        // stdout
116        stdout = Some(
117            std::io::stdout
118                .with_max_level(tracing::Level::from_str(&log_config.max_level).unwrap()),
119        );
120    }
121
122    // set timer
123    let local_offset_sec = Local::now().offset().fix().local_minus_utc();
124    let utc_offset = UtcOffset::from_whole_seconds(local_offset_sec)
125        .unwrap_or(UtcOffset::from_hms(8, 0, 0).unwrap());
126    let timer = OffsetTime::new(utc_offset, well_known::Rfc3339);
127
128    if let Some(agent) = agent {
129        if let Some(stdout) = stdout {
130            tracing_subscriber::registry()
131                .with(EnvFilter::new(&log_config.filter))
132                .with(OpenTelemetryLayer::new(agent))
133                .with(
134                    tracing_subscriber::fmt::layer()
135                        .event_format(format().compact())
136                        .with_ansi(false)
137                        .with_timer(timer)
138                        .with_writer(stdout),
139                )
140                .try_init()?;
141        } else {
142            tracing_subscriber::registry()
143                .with(EnvFilter::new(&log_config.filter))
144                .with(OpenTelemetryLayer::new(agent))
145                .with(
146                    tracing_subscriber::fmt::layer()
147                        .event_format(format().compact())
148                        .with_ansi(false)
149                        .with_timer(timer)
150                        .with_writer(logfile.unwrap()),
151                )
152                .try_init()?;
153        }
154    } else if let Some(stdout) = stdout {
155        tracing_subscriber::registry()
156            .with(EnvFilter::new(&log_config.filter))
157            .with(
158                tracing_subscriber::fmt::layer()
159                    .event_format(format().compact())
160                    .with_ansi(false)
161                    .with_timer(timer)
162                    .with_writer(stdout),
163            )
164            .try_init()?;
165    } else {
166        tracing_subscriber::registry()
167            .with(EnvFilter::new(&log_config.filter))
168            .with(
169                tracing_subscriber::fmt::layer()
170                    .event_format(format().compact())
171                    .with_ansi(false)
172                    .with_timer(timer)
173                    .with_writer(logfile.unwrap()),
174            )
175            .try_init()?;
176    }
177
178    Ok(())
179}
180
181pub fn shutdown_tracer() {
182    opentelemetry::global::shutdown_tracer_provider();
183}
184
185pub fn set_parent<T>(request: &Request<T>) {
186    let parent_cx =
187        global::get_text_map_propagator(|prop| prop.extract(&MetadataMap(request.metadata())));
188    tracing::Span::current().set_parent(parent_cx);
189}