logforth_append_opentelemetry/
lib.rs

1// Copyright 2024 FastLabs Developers
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Appenders and utilities for integrating with OpenTelemetry.
16
17#![cfg_attr(docsrs, feature(doc_cfg))]
18
19use std::borrow::Cow;
20use std::fmt;
21use std::time::SystemTime;
22
23use logforth_core::Diagnostic;
24use logforth_core::Error;
25use logforth_core::Layout;
26use logforth_core::append::Append;
27use logforth_core::kv::Key;
28use logforth_core::kv::Value;
29use logforth_core::kv::Visitor;
30use logforth_core::record::Level;
31use logforth_core::record::Record;
32use opentelemetry::InstrumentationScope;
33use opentelemetry::logs::AnyValue;
34use opentelemetry::logs::LogRecord;
35use opentelemetry::logs::Logger;
36use opentelemetry::logs::LoggerProvider;
37use opentelemetry_otlp::LogExporter;
38use opentelemetry_sdk::logs::SdkLogRecord;
39use opentelemetry_sdk::logs::SdkLoggerProvider;
40
41/// A builder to configure and create an [`OpentelemetryLog`] appender.
42#[derive(Debug)]
43pub struct OpentelemetryLogBuilder {
44    name: String,
45    log_exporter: LogExporter,
46    labels: Vec<(Cow<'static, str>, Cow<'static, str>)>,
47    make_body: Option<Box<dyn MakeBody>>,
48}
49
50impl OpentelemetryLogBuilder {
51    /// Creates a new [`OpentelemetryLogBuilder`].
52    ///
53    /// # Examples
54    ///
55    /// ```
56    /// use logforth_append_opentelemetry::OpentelemetryLogBuilder;
57    /// use opentelemetry_otlp::LogExporter;
58    /// use opentelemetry_otlp::WithExportConfig;
59    ///
60    /// let log_exporter = LogExporter::builder()
61    ///     .with_http()
62    ///     .with_endpoint("http://localhost:4317")
63    ///     .build()
64    ///     .unwrap();
65    /// let builder = OpentelemetryLogBuilder::new("my_service", log_exporter);
66    /// ```
67    pub fn new(name: impl Into<String>, log_exporter: impl Into<LogExporter>) -> Self {
68        OpentelemetryLogBuilder {
69            name: name.into(),
70            log_exporter: log_exporter.into(),
71            labels: vec![],
72            make_body: None,
73        }
74    }
75
76    /// Adds a label to the logs.
77    ///
78    /// # Examples
79    ///
80    /// ```
81    /// use logforth_append_opentelemetry::OpentelemetryLogBuilder;
82    /// use opentelemetry_otlp::LogExporter;
83    /// use opentelemetry_otlp::WithExportConfig;
84    ///
85    /// let log_exporter = LogExporter::builder()
86    ///     .with_http()
87    ///     .with_endpoint("http://localhost:4317")
88    ///     .build()
89    ///     .unwrap();
90    /// let builder = OpentelemetryLogBuilder::new("my_service", log_exporter);
91    /// builder.label("env", "production");
92    /// ```
93    pub fn label(
94        mut self,
95        key: impl Into<Cow<'static, str>>,
96        value: impl Into<Cow<'static, str>>,
97    ) -> Self {
98        self.labels.push((key.into(), value.into()));
99        self
100    }
101
102    /// Adds multiple labels to the logs.
103    ///
104    /// # Examples
105    ///
106    /// ```
107    /// use logforth_append_opentelemetry::OpentelemetryLogBuilder;
108    /// use opentelemetry_otlp::LogExporter;
109    /// use opentelemetry_otlp::WithExportConfig;
110    ///
111    /// let log_exporter = LogExporter::builder()
112    ///     .with_http()
113    ///     .with_endpoint("http://localhost:4317")
114    ///     .build()
115    ///     .unwrap();
116    /// let builder = OpentelemetryLogBuilder::new("my_service", log_exporter);
117    /// builder.labels(vec![("env", "production"), ("version", "1.0")]);
118    /// ```
119    pub fn labels<K, V>(mut self, labels: impl IntoIterator<Item = (K, V)>) -> Self
120    where
121        K: Into<Cow<'static, str>>,
122        V: Into<Cow<'static, str>>,
123    {
124        self.labels
125            .extend(labels.into_iter().map(|(k, v)| (k.into(), v.into())));
126        self
127    }
128
129    /// Set the layout for the logs.
130    ///
131    /// # Examples
132    ///
133    /// ```
134    /// use logforth_append_opentelemetry::MakeBodyLayout;
135    /// use logforth_append_opentelemetry::OpentelemetryLogBuilder;
136    /// use logforth_layout_json::JsonLayout;
137    /// use opentelemetry_otlp::LogExporter;
138    /// use opentelemetry_otlp::WithExportConfig;
139    ///
140    /// let log_exporter = LogExporter::builder()
141    ///     .with_http()
142    ///     .with_endpoint("http://localhost:4317")
143    ///     .build()
144    ///     .unwrap();
145    /// let builder = OpentelemetryLogBuilder::new("my_service", log_exporter);
146    /// builder.make_body(MakeBodyLayout::new(JsonLayout::default()));
147    /// ```
148    pub fn make_body(mut self, make_body: impl Into<Box<dyn MakeBody>>) -> Self {
149        self.make_body = Some(make_body.into());
150        self
151    }
152
153    /// Builds the [`OpentelemetryLog`] appender.
154    ///
155    /// # Examples
156    ///
157    /// ```
158    /// use logforth_append_opentelemetry::OpentelemetryLogBuilder;
159    /// use opentelemetry_otlp::LogExporter;
160    /// use opentelemetry_otlp::WithExportConfig;
161    ///
162    /// let log_exporter = LogExporter::builder()
163    ///     .with_http()
164    ///     .with_endpoint("http://localhost:4317")
165    ///     .build()
166    ///     .unwrap();
167    /// let builder = OpentelemetryLogBuilder::new("my_service", log_exporter);
168    /// let otlp_appender = builder.build();
169    /// ```
170    pub fn build(self) -> OpentelemetryLog {
171        let OpentelemetryLogBuilder {
172            name,
173            log_exporter,
174            labels,
175            make_body,
176        } = self;
177
178        let resource = opentelemetry_sdk::Resource::builder()
179            .with_attributes(
180                labels
181                    .into_iter()
182                    .map(|(key, value)| opentelemetry::KeyValue::new(key, value)),
183            )
184            .build();
185
186        let provider = SdkLoggerProvider::builder()
187            .with_batch_exporter(log_exporter)
188            .with_resource(resource)
189            .build();
190
191        let library = InstrumentationScope::builder(name).build();
192
193        let logger = provider.logger_with_scope(library);
194
195        OpentelemetryLog {
196            make_body,
197            logger,
198            provider,
199        }
200    }
201}
202
203/// An appender that sends log records to OpenTelemetry.
204///
205/// # Examples
206///
207/// ```
208/// use logforth_append_opentelemetry::OpentelemetryLogBuilder;
209/// use opentelemetry_otlp::LogExporter;
210/// use opentelemetry_otlp::WithExportConfig;
211///
212/// let log_exporter = LogExporter::builder()
213///     .with_http()
214///     .with_endpoint("http://localhost:4317")
215///     .build()
216///     .unwrap();
217/// let otlp_appender = OpentelemetryLogBuilder::new("service_name", log_exporter).build();
218/// ```
219#[derive(Debug)]
220pub struct OpentelemetryLog {
221    make_body: Option<Box<dyn MakeBody>>,
222    logger: opentelemetry_sdk::logs::SdkLogger,
223    provider: SdkLoggerProvider,
224}
225
226impl Append for OpentelemetryLog {
227    fn append(&self, record: &Record, diags: &[Box<dyn Diagnostic>]) -> Result<(), Error> {
228        let now = SystemTime::now();
229
230        let mut log_record = self.logger.create_log_record();
231        log_record.set_timestamp(now);
232        log_record.set_observed_timestamp(now);
233        log_record.set_severity_number(log_level_to_otel_severity(record.level()));
234        log_record.set_severity_text(record.level().name());
235
236        if let Some(target) = record.target_static() {
237            log_record.set_target(target);
238        } else {
239            log_record.set_target(record.target().to_owned());
240        }
241
242        if let Some(make_body) = self.make_body.as_ref() {
243            log_record.set_body(make_body.create(record, diags)?);
244        } else if let Some(payload) = record.payload_static() {
245            log_record.set_body(AnyValue::from(payload));
246        } else {
247            log_record.set_body(AnyValue::from(record.payload().to_owned()));
248        }
249
250        if let Some(module_path) = record.module_path_static() {
251            log_record.add_attribute("module_path", module_path);
252        } else if let Some(module_path) = record.module_path() {
253            log_record.add_attribute("module_path", module_path.to_owned());
254        }
255
256        if let Some(file) = record.file_static() {
257            log_record.add_attribute("file", file);
258        } else if let Some(file) = record.file() {
259            log_record.add_attribute("file", file.to_owned());
260        }
261
262        if let Some(line) = record.line() {
263            log_record.add_attribute("line", line);
264        }
265
266        if let Some(column) = record.column() {
267            log_record.add_attribute("column", column);
268        }
269
270        let mut extractor = KvExtractor {
271            record: &mut log_record,
272        };
273        record.key_values().visit(&mut extractor)?;
274        for d in diags {
275            d.visit(&mut extractor)?;
276        }
277
278        self.logger.emit(log_record);
279        Ok(())
280    }
281
282    fn flush(&self) -> Result<(), Error> {
283        self.provider
284            .force_flush()
285            .map_err(|err| Error::new("failed to flush records").set_source(err))
286    }
287}
288
289impl Drop for OpentelemetryLog {
290    fn drop(&mut self) {
291        let _ = self.provider.force_flush();
292    }
293}
294
295fn log_level_to_otel_severity(level: Level) -> opentelemetry::logs::Severity {
296    match level {
297        Level::Trace => opentelemetry::logs::Severity::Trace,
298        Level::Trace2 => opentelemetry::logs::Severity::Trace2,
299        Level::Trace3 => opentelemetry::logs::Severity::Trace3,
300        Level::Trace4 => opentelemetry::logs::Severity::Trace4,
301        Level::Debug => opentelemetry::logs::Severity::Debug,
302        Level::Debug2 => opentelemetry::logs::Severity::Debug2,
303        Level::Debug3 => opentelemetry::logs::Severity::Debug3,
304        Level::Debug4 => opentelemetry::logs::Severity::Debug4,
305        Level::Info => opentelemetry::logs::Severity::Info,
306        Level::Info2 => opentelemetry::logs::Severity::Info2,
307        Level::Info3 => opentelemetry::logs::Severity::Info3,
308        Level::Info4 => opentelemetry::logs::Severity::Info4,
309        Level::Warn => opentelemetry::logs::Severity::Warn,
310        Level::Warn2 => opentelemetry::logs::Severity::Warn2,
311        Level::Warn3 => opentelemetry::logs::Severity::Warn3,
312        Level::Warn4 => opentelemetry::logs::Severity::Warn4,
313        Level::Error => opentelemetry::logs::Severity::Error,
314        Level::Error2 => opentelemetry::logs::Severity::Error2,
315        Level::Error3 => opentelemetry::logs::Severity::Error3,
316        Level::Error4 => opentelemetry::logs::Severity::Error4,
317        Level::Fatal => opentelemetry::logs::Severity::Fatal,
318        Level::Fatal2 => opentelemetry::logs::Severity::Fatal2,
319        Level::Fatal3 => opentelemetry::logs::Severity::Fatal3,
320        Level::Fatal4 => opentelemetry::logs::Severity::Fatal4,
321    }
322}
323
324/// A trait for formatting log records into a body that can be sent to OpenTelemetry.
325pub trait MakeBody: fmt::Debug + Send + Sync + 'static {
326    /// Creates a log record with optional diagnostics.
327    fn create(&self, record: &Record, diags: &[Box<dyn Diagnostic>]) -> Result<AnyValue, Error>;
328}
329
330impl<T: MakeBody> From<T> for Box<dyn MakeBody> {
331    fn from(value: T) -> Self {
332        Box::new(value)
333    }
334}
335
336/// Make an OpenTelemetry body with the configured [`Layout`].
337#[derive(Debug)]
338pub struct MakeBodyLayout {
339    layout: Box<dyn Layout>,
340}
341
342impl MakeBodyLayout {
343    /// Creates a new `MakeBodyLayout` with the given layout.
344    pub fn new(layout: impl Into<Box<dyn Layout>>) -> Self {
345        MakeBodyLayout {
346            layout: layout.into(),
347        }
348    }
349}
350
351impl MakeBody for MakeBodyLayout {
352    fn create(&self, record: &Record, diags: &[Box<dyn Diagnostic>]) -> Result<AnyValue, Error> {
353        let body = self.layout.format(record, diags)?;
354        Ok(AnyValue::Bytes(Box::new(body)))
355    }
356}
357
358struct KvExtractor<'a> {
359    record: &'a mut SdkLogRecord,
360}
361
362impl Visitor for KvExtractor<'_> {
363    fn visit(&mut self, key: Key, value: Value) -> Result<(), Error> {
364        let key = key.to_cow();
365        let value = value.to_string();
366        self.record.add_attribute(key, value);
367        Ok(())
368    }
369}