logforth/append/
opentelemetry.rs

1// Copyright 2024 FastLabs Developers
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Appenders and utilities for integrating with OpenTelemetry.
16
17use std::borrow::Cow;
18use std::time::SystemTime;
19
20use log::Record;
21use opentelemetry::logs::AnyValue;
22use opentelemetry::logs::LogRecord;
23use opentelemetry::logs::Logger;
24use opentelemetry::logs::LoggerProvider;
25use opentelemetry::InstrumentationScope;
26use opentelemetry_otlp::LogExporter;
27use opentelemetry_sdk::logs::SdkLogRecord;
28use opentelemetry_sdk::logs::SdkLoggerProvider;
29
30use crate::append::Append;
31use crate::diagnostic::Visitor;
32use crate::Diagnostic;
33use crate::Layout;
34
35/// A builder to configure and create an [`OpentelemetryLog`] appender.
36#[derive(Debug)]
37pub struct OpentelemetryLogBuilder {
38    name: String,
39    log_exporter: LogExporter,
40    labels: Vec<(Cow<'static, str>, Cow<'static, str>)>,
41    layout: Option<Box<dyn Layout>>,
42}
43
44impl OpentelemetryLogBuilder {
45    /// Creates a new [`OpentelemetryLogBuilder`].
46    ///
47    /// # Examples
48    ///
49    /// ```
50    /// use logforth::append::opentelemetry::OpentelemetryLogBuilder;
51    /// use opentelemetry_otlp::{LogExporter, WithExportConfig};
52    ///
53    /// let log_exporter = LogExporter::builder()
54    ///     .with_http()
55    ///     .with_endpoint("http://localhost:4317")
56    ///     .build()
57    ///     .unwrap();
58    /// let builder = OpentelemetryLogBuilder::new("my_service", log_exporter);
59    /// ```
60    pub fn new(name: impl Into<String>, log_exporter: impl Into<LogExporter>) -> Self {
61        OpentelemetryLogBuilder {
62            name: name.into(),
63            log_exporter: log_exporter.into(),
64            labels: vec![],
65            layout: None,
66        }
67    }
68
69    /// Adds a label to the logs.
70    ///
71    /// # Examples
72    ///
73    /// ```
74    /// use logforth::append::opentelemetry::OpentelemetryLogBuilder;
75    /// use opentelemetry_otlp::{LogExporter, WithExportConfig};
76    ///
77    /// let log_exporter = LogExporter::builder()
78    ///     .with_http()
79    ///     .with_endpoint("http://localhost:4317")
80    ///     .build()
81    ///     .unwrap();
82    /// let builder = OpentelemetryLogBuilder::new("my_service", log_exporter);
83    /// builder.label("env", "production");
84    /// ```
85    pub fn label(
86        mut self,
87        key: impl Into<Cow<'static, str>>,
88        value: impl Into<Cow<'static, str>>,
89    ) -> Self {
90        self.labels.push((key.into(), value.into()));
91        self
92    }
93
94    /// Adds multiple labels to the logs.
95    ///
96    /// # Examples
97    ///
98    /// ```
99    /// use logforth::append::opentelemetry::OpentelemetryLogBuilder;
100    /// use opentelemetry_otlp::{LogExporter, WithExportConfig};
101    ///
102    /// let log_exporter = LogExporter::builder()
103    ///     .with_http()
104    ///     .with_endpoint("http://localhost:4317")
105    ///     .build()
106    ///     .unwrap();
107    /// let builder = OpentelemetryLogBuilder::new("my_service", log_exporter);
108    /// builder.labels(vec![("env", "production"), ("version", "1.0")]);
109    /// ```
110    pub fn labels<K, V>(mut self, labels: impl IntoIterator<Item = (K, V)>) -> Self
111    where
112        K: Into<Cow<'static, str>>,
113        V: Into<Cow<'static, str>>,
114    {
115        self.labels
116            .extend(labels.into_iter().map(|(k, v)| (k.into(), v.into())));
117        self
118    }
119
120    /// Sets the layout for the logs.
121    ///
122    /// # Examples
123    ///
124    /// ```
125    /// use logforth::append::opentelemetry::OpentelemetryLogBuilder;
126    /// use logforth::layout::JsonLayout;
127    /// use opentelemetry_otlp::{LogExporter, WithExportConfig};
128    ///
129    /// let log_exporter = LogExporter::builder()
130    ///     .with_http()
131    ///     .with_endpoint("http://localhost:4317")
132    ///     .build()
133    ///     .unwrap();
134    /// let builder = OpentelemetryLogBuilder::new("my_service", log_exporter);
135    /// builder.layout(JsonLayout::default());
136    /// ```
137    pub fn layout(mut self, layout: impl Into<Box<dyn Layout>>) -> Self {
138        self.layout = Some(layout.into());
139        self
140    }
141
142    /// Builds the [`OpentelemetryLog`] appender.
143    ///
144    /// # Examples
145    ///
146    /// ```
147    /// use logforth::append::opentelemetry::OpentelemetryLogBuilder;
148    /// use opentelemetry_otlp::{LogExporter, WithExportConfig};
149    ///
150    /// let log_exporter = LogExporter::builder()
151    ///     .with_http()
152    ///     .with_endpoint("http://localhost:4317")
153    ///     .build()
154    ///     .unwrap();
155    /// let builder = OpentelemetryLogBuilder::new("my_service", log_exporter);
156    /// let otlp_appender = builder.build().unwrap();
157    /// ```
158    pub fn build(self) -> Result<OpentelemetryLog, opentelemetry_otlp::ExporterBuildError> {
159        let OpentelemetryLogBuilder {
160            name,
161            log_exporter,
162            labels,
163            layout,
164        } = self;
165
166        let resource = opentelemetry_sdk::Resource::builder()
167            .with_attributes(
168                labels
169                    .into_iter()
170                    .map(|(key, value)| opentelemetry::KeyValue::new(key, value)),
171            )
172            .build();
173
174        let provider = SdkLoggerProvider::builder()
175            .with_batch_exporter(log_exporter)
176            .with_resource(resource)
177            .build();
178
179        let library = InstrumentationScope::builder(name).build();
180        let logger = provider.logger_with_scope(library);
181        Ok(OpentelemetryLog {
182            layout,
183            logger,
184            provider,
185        })
186    }
187}
188
189/// An appender that sends log records to OpenTelemetry.
190///
191/// # Examples
192///
193/// ```
194/// use logforth::append::opentelemetry::OpentelemetryLogBuilder;
195/// use opentelemetry_otlp::{LogExporter, WithExportConfig};
196///
197/// let log_exporter = LogExporter::builder()
198///     .with_http()
199///     .with_endpoint("http://localhost:4317")
200///     .build()
201///     .unwrap();
202/// let otlp_appender =
203///     OpentelemetryLogBuilder::new("service_name", log_exporter)
204///         .build()
205///         .unwrap();
206/// ```
207#[derive(Debug)]
208pub struct OpentelemetryLog {
209    layout: Option<Box<dyn Layout>>,
210    logger: opentelemetry_sdk::logs::SdkLogger,
211    provider: SdkLoggerProvider,
212}
213
214impl Append for OpentelemetryLog {
215    fn append(&self, record: &Record, diagnostics: &[Box<dyn Diagnostic>]) -> anyhow::Result<()> {
216        let mut log_record = self.logger.create_log_record();
217        log_record.set_observed_timestamp(SystemTime::now());
218        log_record.set_severity_number(log_level_to_otel_severity(record.level()));
219        log_record.set_severity_text(record.level().as_str());
220        log_record.set_target(record.target().to_string());
221        log_record.set_body(AnyValue::Bytes(Box::new(match self.layout.as_ref() {
222            None => record.args().to_string().into_bytes(),
223            Some(layout) => layout.format(record, diagnostics)?,
224        })));
225
226        if let Some(module_path) = record.module_path() {
227            log_record.add_attribute("module_path", module_path.to_string());
228        }
229        if let Some(file) = record.file() {
230            log_record.add_attribute("file", file.to_string());
231        }
232        if let Some(line) = record.line() {
233            log_record.add_attribute("line", line);
234        }
235
236        let mut extractor = KvExtractor {
237            record: &mut log_record,
238        };
239        record.key_values().visit(&mut extractor)?;
240        for d in diagnostics {
241            d.visit(&mut extractor)?;
242        }
243
244        self.logger.emit(log_record);
245        Ok(())
246    }
247
248    fn flush(&self) -> anyhow::Result<()> {
249        self.provider.force_flush()?;
250        Ok(())
251    }
252}
253
254fn log_level_to_otel_severity(level: log::Level) -> opentelemetry::logs::Severity {
255    match level {
256        log::Level::Error => opentelemetry::logs::Severity::Error,
257        log::Level::Warn => opentelemetry::logs::Severity::Warn,
258        log::Level::Info => opentelemetry::logs::Severity::Info,
259        log::Level::Debug => opentelemetry::logs::Severity::Debug,
260        log::Level::Trace => opentelemetry::logs::Severity::Trace,
261    }
262}
263
264struct KvExtractor<'a> {
265    record: &'a mut SdkLogRecord,
266}
267
268impl<'kvs> log::kv::VisitSource<'kvs> for KvExtractor<'_> {
269    fn visit_pair(
270        &mut self,
271        key: log::kv::Key<'kvs>,
272        value: log::kv::Value<'kvs>,
273    ) -> Result<(), log::kv::Error> {
274        let key = key.to_string();
275        let value = value.to_string();
276        self.record.add_attribute(key, value);
277        Ok(())
278    }
279}
280
281impl Visitor for KvExtractor<'_> {
282    fn visit(&mut self, key: Cow<str>, value: Cow<str>) -> anyhow::Result<()> {
283        let key = key.into_owned();
284        let value = value.into_owned();
285        self.record.add_attribute(key, value);
286        Ok(())
287    }
288}