logforth_append_opentelemetry/
lib.rs

1// Copyright 2024 FastLabs Developers
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Appenders and utilities for integrating with OpenTelemetry.
16
17#![cfg_attr(docsrs, feature(doc_cfg))]
18
19use std::borrow::Cow;
20use std::fmt;
21use std::time::SystemTime;
22
23use logforth_core::Diagnostic;
24use logforth_core::Error;
25use logforth_core::Layout;
26use logforth_core::append::Append;
27use logforth_core::kv::Key;
28use logforth_core::kv::Value;
29use logforth_core::kv::Visitor;
30use logforth_core::record::Level;
31use logforth_core::record::Record;
32use opentelemetry::InstrumentationScope;
33use opentelemetry::logs::AnyValue;
34use opentelemetry::logs::LogRecord;
35use opentelemetry::logs::Logger;
36use opentelemetry::logs::LoggerProvider;
37use opentelemetry_otlp::LogExporter;
38use opentelemetry_sdk::logs::SdkLogRecord;
39use opentelemetry_sdk::logs::SdkLoggerProvider;
40
/// A builder to configure and create an [`OpentelemetryLog`] appender.
///
/// Start with [`OpentelemetryLogBuilder::new`], optionally adjust the configuration with
/// [`label`](Self::label), [`labels`](Self::labels) and [`make_body`](Self::make_body), and
/// finish with [`build`](Self::build).
#[derive(Debug)]
pub struct OpentelemetryLogBuilder {
    // Name of the instrumentation scope the logger is created with in `build`.
    name: String,
    // Exporter handed to the logger provider as a batch exporter in `build`.
    log_exporter: LogExporter,
    // Key/value pairs turned into resource attributes in `build`.
    labels: Vec<(Cow<'static, str>, Cow<'static, str>)>,
    // Optional custom body factory; `None` means the appender uses the record's payload.
    make_body: Option<Box<dyn MakeBody>>,
}
49
50impl OpentelemetryLogBuilder {
51    /// Creates a new [`OpentelemetryLogBuilder`].
52    ///
53    /// # Examples
54    ///
55    /// ```
56    /// use logforth_append_opentelemetry::OpentelemetryLogBuilder;
57    /// use opentelemetry_otlp::LogExporter;
58    /// use opentelemetry_otlp::WithExportConfig;
59    ///
60    /// let log_exporter = LogExporter::builder()
61    ///     .with_http()
62    ///     .with_endpoint("http://localhost:4317")
63    ///     .build()
64    ///     .unwrap();
65    /// let builder = OpentelemetryLogBuilder::new("my_service", log_exporter);
66    /// ```
67    pub fn new(name: impl Into<String>, log_exporter: impl Into<LogExporter>) -> Self {
68        OpentelemetryLogBuilder {
69            name: name.into(),
70            log_exporter: log_exporter.into(),
71            labels: vec![],
72            make_body: None,
73        }
74    }
75
76    /// Adds a label to the logs.
77    ///
78    /// # Examples
79    ///
80    /// ```
81    /// use logforth_append_opentelemetry::OpentelemetryLogBuilder;
82    /// use opentelemetry_otlp::LogExporter;
83    /// use opentelemetry_otlp::WithExportConfig;
84    ///
85    /// let log_exporter = LogExporter::builder()
86    ///     .with_http()
87    ///     .with_endpoint("http://localhost:4317")
88    ///     .build()
89    ///     .unwrap();
90    /// let builder = OpentelemetryLogBuilder::new("my_service", log_exporter);
91    /// builder.label("env", "production");
92    /// ```
93    pub fn label(
94        mut self,
95        key: impl Into<Cow<'static, str>>,
96        value: impl Into<Cow<'static, str>>,
97    ) -> Self {
98        self.labels.push((key.into(), value.into()));
99        self
100    }
101
102    /// Adds multiple labels to the logs.
103    ///
104    /// # Examples
105    ///
106    /// ```
107    /// use logforth_append_opentelemetry::OpentelemetryLogBuilder;
108    /// use opentelemetry_otlp::LogExporter;
109    /// use opentelemetry_otlp::WithExportConfig;
110    ///
111    /// let log_exporter = LogExporter::builder()
112    ///     .with_http()
113    ///     .with_endpoint("http://localhost:4317")
114    ///     .build()
115    ///     .unwrap();
116    /// let builder = OpentelemetryLogBuilder::new("my_service", log_exporter);
117    /// builder.labels(vec![("env", "production"), ("version", "1.0")]);
118    /// ```
119    pub fn labels<K, V>(mut self, labels: impl IntoIterator<Item = (K, V)>) -> Self
120    where
121        K: Into<Cow<'static, str>>,
122        V: Into<Cow<'static, str>>,
123    {
124        self.labels
125            .extend(labels.into_iter().map(|(k, v)| (k.into(), v.into())));
126        self
127    }
128
129    /// Set the layout for the logs.
130    ///
131    /// # Examples
132    ///
133    /// ```
134    /// use logforth_append_opentelemetry::MakeBodyLayout;
135    /// use logforth_append_opentelemetry::OpentelemetryLogBuilder;
136    /// use logforth_layout_json::JsonLayout;
137    /// use opentelemetry_otlp::LogExporter;
138    /// use opentelemetry_otlp::WithExportConfig;
139    ///
140    /// let log_exporter = LogExporter::builder()
141    ///     .with_http()
142    ///     .with_endpoint("http://localhost:4317")
143    ///     .build()
144    ///     .unwrap();
145    /// let builder = OpentelemetryLogBuilder::new("my_service", log_exporter);
146    /// builder.make_body(MakeBodyLayout::new(JsonLayout::default()));
147    /// ```
148    pub fn make_body(mut self, make_body: impl Into<Box<dyn MakeBody>>) -> Self {
149        self.make_body = Some(make_body.into());
150        self
151    }
152
153    /// Builds the [`OpentelemetryLog`] appender.
154    ///
155    /// # Examples
156    ///
157    /// ```
158    /// use logforth_append_opentelemetry::OpentelemetryLogBuilder;
159    /// use opentelemetry_otlp::LogExporter;
160    /// use opentelemetry_otlp::WithExportConfig;
161    ///
162    /// let log_exporter = LogExporter::builder()
163    ///     .with_http()
164    ///     .with_endpoint("http://localhost:4317")
165    ///     .build()
166    ///     .unwrap();
167    /// let builder = OpentelemetryLogBuilder::new("my_service", log_exporter);
168    /// let otlp_appender = builder.build();
169    /// ```
170    pub fn build(self) -> OpentelemetryLog {
171        let OpentelemetryLogBuilder {
172            name,
173            log_exporter,
174            labels,
175            make_body,
176        } = self;
177
178        let resource = opentelemetry_sdk::Resource::builder()
179            .with_attributes(
180                labels
181                    .into_iter()
182                    .map(|(key, value)| opentelemetry::KeyValue::new(key, value)),
183            )
184            .build();
185
186        let provider = SdkLoggerProvider::builder()
187            .with_batch_exporter(log_exporter)
188            .with_resource(resource)
189            .build();
190
191        let library = InstrumentationScope::builder(name).build();
192
193        let logger = provider.logger_with_scope(library);
194
195        OpentelemetryLog {
196            make_body,
197            logger,
198            provider,
199        }
200    }
201}
202
/// An appender that sends log records to OpenTelemetry.
///
/// Instances are created with [`OpentelemetryLogBuilder`].
///
/// # Examples
///
/// ```
/// use logforth_append_opentelemetry::OpentelemetryLogBuilder;
/// use opentelemetry_otlp::LogExporter;
/// use opentelemetry_otlp::WithExportConfig;
///
/// let log_exporter = LogExporter::builder()
///     .with_http()
///     .with_endpoint("http://localhost:4317")
///     .build()
///     .unwrap();
/// let otlp_appender = OpentelemetryLogBuilder::new("service_name", log_exporter).build();
/// ```
#[derive(Debug)]
pub struct OpentelemetryLog {
    // Optional custom body factory; when absent, `append` uses the record's payload as body.
    make_body: Option<Box<dyn MakeBody>>,
    // Logger used to create and emit each log record.
    logger: opentelemetry_sdk::logs::SdkLogger,
    // Provider the logger was obtained from; `flush` and `drop` call `force_flush` on it.
    provider: SdkLoggerProvider,
}
225
226impl Append for OpentelemetryLog {
227    fn append(&self, record: &Record, diags: &[Box<dyn Diagnostic>]) -> Result<(), Error> {
228        let now = SystemTime::now();
229
230        let mut log_record = self.logger.create_log_record();
231        log_record.set_timestamp(now);
232        log_record.set_observed_timestamp(now);
233        log_record.set_severity_number(log_level_to_otel_severity(record.level()));
234        log_record.set_severity_text(record.level().as_str());
235        log_record.set_target(record.target().to_owned());
236
237        if let Some(make_body) = self.make_body.as_ref() {
238            log_record.set_body(make_body.create(record, diags)?);
239        } else if let Some(payload) = record.payload_static() {
240            log_record.set_body(AnyValue::from(payload));
241        } else {
242            log_record.set_body(AnyValue::from(record.payload().to_owned()));
243        }
244
245        if let Some(module_path) = record.module_path_static() {
246            log_record.add_attribute("module_path", module_path);
247        } else if let Some(module_path) = record.module_path() {
248            log_record.add_attribute("module_path", module_path.to_owned());
249        }
250
251        if let Some(file) = record.file_static() {
252            log_record.add_attribute("file", file);
253        } else if let Some(file) = record.file() {
254            log_record.add_attribute("file", file.to_owned());
255        }
256
257        if let Some(line) = record.line() {
258            log_record.add_attribute("line", line);
259        }
260
261        let mut extractor = KvExtractor {
262            record: &mut log_record,
263        };
264        record.key_values().visit(&mut extractor)?;
265        for d in diags {
266            d.visit(&mut extractor)?;
267        }
268
269        self.logger.emit(log_record);
270        Ok(())
271    }
272
273    fn flush(&self) -> Result<(), Error> {
274        self.provider
275            .force_flush()
276            .map_err(|err| Error::new("failed to flush records").set_source(err))
277    }
278}
279
280impl Drop for OpentelemetryLog {
281    fn drop(&mut self) {
282        let _ = self.provider.force_flush();
283    }
284}
285
286fn log_level_to_otel_severity(level: Level) -> opentelemetry::logs::Severity {
287    match level {
288        Level::Error => opentelemetry::logs::Severity::Error,
289        Level::Warn => opentelemetry::logs::Severity::Warn,
290        Level::Info => opentelemetry::logs::Severity::Info,
291        Level::Debug => opentelemetry::logs::Severity::Debug,
292        Level::Trace => opentelemetry::logs::Severity::Trace,
293    }
294}
295
/// A trait for formatting log records into a body that can be sent to OpenTelemetry.
pub trait MakeBody: fmt::Debug + Send + Sync + 'static {
    /// Creates the body for the given log record and its diagnostics.
    ///
    /// # Errors
    ///
    /// Returns an error if the body cannot be produced; the appender propagates it to the
    /// caller of `append`.
    fn create(&self, record: &Record, diags: &[Box<dyn Diagnostic>]) -> Result<AnyValue, Error>;
}
301
302impl<T: MakeBody> From<T> for Box<dyn MakeBody> {
303    fn from(value: T) -> Self {
304        Box::new(value)
305    }
306}
307
/// Make an OpenTelemetry body with the configured [`Layout`].
///
/// The bytes produced by the layout are used as the body of the log record.
#[derive(Debug)]
pub struct MakeBodyLayout {
    // Layout that formats a record (and its diagnostics) into the body bytes.
    layout: Box<dyn Layout>,
}
313
314impl MakeBodyLayout {
315    /// Creates a new `MakeBodyLayout` with the given layout.
316    pub fn new(layout: impl Into<Box<dyn Layout>>) -> Self {
317        MakeBodyLayout {
318            layout: layout.into(),
319        }
320    }
321}
322
323impl MakeBody for MakeBodyLayout {
324    fn create(&self, record: &Record, diags: &[Box<dyn Diagnostic>]) -> Result<AnyValue, Error> {
325        let body = self.layout.format(record, diags)?;
326        Ok(AnyValue::Bytes(Box::new(body)))
327    }
328}
329
/// A [`Visitor`] that copies every visited key-value pair into a log record as an attribute.
struct KvExtractor<'a> {
    // The log record that receives the attributes.
    record: &'a mut SdkLogRecord,
}
333
334impl Visitor for KvExtractor<'_> {
335    fn visit(&mut self, key: Key, value: Value) -> Result<(), Error> {
336        let key = key.to_cow();
337        let value = value.to_string();
338        self.record.add_attribute(key, value);
339        Ok(())
340    }
341}