Skip to main content

logforth_append_opentelemetry/
lib.rs

// Copyright 2024 FastLabs Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
//! Appenders and utilities for integrating with OpenTelemetry.
16
17#![cfg_attr(docsrs, feature(doc_cfg))]
18#![deny(missing_docs)]
19
20use std::borrow::Cow;
21use std::collections::HashMap;
22use std::fmt;
23use std::time::SystemTime;
24
25use logforth_core::Diagnostic;
26use logforth_core::Error;
27use logforth_core::Layout;
28use logforth_core::append::Append;
29use logforth_core::kv::KeyView;
30use logforth_core::kv::ValueView;
31use logforth_core::kv::Visitor;
32use logforth_core::record::Level;
33use logforth_core::record::Record;
34use opentelemetry::InstrumentationScope;
35use opentelemetry::Key;
36use opentelemetry::logs::AnyValue;
37use opentelemetry::logs::LogRecord;
38use opentelemetry::logs::Logger;
39use opentelemetry::logs::LoggerProvider;
40use opentelemetry_otlp::LogExporter;
41use opentelemetry_sdk::logs::SdkLogRecord;
42use opentelemetry_sdk::logs::SdkLoggerProvider;
43
44/// A builder to configure and create an [`OpentelemetryLog`] appender.
45#[derive(Debug)]
46pub struct OpentelemetryLogBuilder {
47    name: String,
48    log_exporter: LogExporter,
49    labels: Vec<(Cow<'static, str>, Cow<'static, str>)>,
50    make_body: Option<Box<dyn MakeBody>>,
51}
52
53impl OpentelemetryLogBuilder {
54    /// Creates a new [`OpentelemetryLogBuilder`].
55    ///
56    /// # Examples
57    ///
58    /// ```
59    /// use logforth_append_opentelemetry::OpentelemetryLogBuilder;
60    /// use opentelemetry_otlp::LogExporter;
61    /// use opentelemetry_otlp::WithExportConfig;
62    ///
63    /// let log_exporter = LogExporter::builder()
64    ///     .with_http()
65    ///     .with_endpoint("http://localhost:4317")
66    ///     .build()
67    ///     .unwrap();
68    /// let builder = OpentelemetryLogBuilder::new("my_service", log_exporter);
69    /// ```
70    pub fn new(name: impl Into<String>, log_exporter: impl Into<LogExporter>) -> Self {
71        OpentelemetryLogBuilder {
72            name: name.into(),
73            log_exporter: log_exporter.into(),
74            labels: vec![],
75            make_body: None,
76        }
77    }
78
79    /// Adds a label to the logs.
80    ///
81    /// # Examples
82    ///
83    /// ```
84    /// use logforth_append_opentelemetry::OpentelemetryLogBuilder;
85    /// use opentelemetry_otlp::LogExporter;
86    /// use opentelemetry_otlp::WithExportConfig;
87    ///
88    /// let log_exporter = LogExporter::builder()
89    ///     .with_http()
90    ///     .with_endpoint("http://localhost:4317")
91    ///     .build()
92    ///     .unwrap();
93    /// let builder = OpentelemetryLogBuilder::new("my_service", log_exporter);
94    /// builder.label("env", "production");
95    /// ```
96    pub fn label(
97        mut self,
98        key: impl Into<Cow<'static, str>>,
99        value: impl Into<Cow<'static, str>>,
100    ) -> Self {
101        self.labels.push((key.into(), value.into()));
102        self
103    }
104
105    /// Adds multiple labels to the logs.
106    ///
107    /// # Examples
108    ///
109    /// ```
110    /// use logforth_append_opentelemetry::OpentelemetryLogBuilder;
111    /// use opentelemetry_otlp::LogExporter;
112    /// use opentelemetry_otlp::WithExportConfig;
113    ///
114    /// let log_exporter = LogExporter::builder()
115    ///     .with_http()
116    ///     .with_endpoint("http://localhost:4317")
117    ///     .build()
118    ///     .unwrap();
119    /// let builder = OpentelemetryLogBuilder::new("my_service", log_exporter);
120    /// builder.labels(vec![("env", "production"), ("version", "1.0")]);
121    /// ```
122    pub fn labels<K, V>(mut self, labels: impl IntoIterator<Item = (K, V)>) -> Self
123    where
124        K: Into<Cow<'static, str>>,
125        V: Into<Cow<'static, str>>,
126    {
127        self.labels
128            .extend(labels.into_iter().map(|(k, v)| (k.into(), v.into())));
129        self
130    }
131
132    /// Set the layout for the logs.
133    ///
134    /// # Examples
135    ///
136    /// ```
137    /// use logforth_append_opentelemetry::MakeBodyLayout;
138    /// use logforth_append_opentelemetry::OpentelemetryLogBuilder;
139    /// use logforth_layout_json::JsonLayout;
140    /// use opentelemetry_otlp::LogExporter;
141    /// use opentelemetry_otlp::WithExportConfig;
142    ///
143    /// let log_exporter = LogExporter::builder()
144    ///     .with_http()
145    ///     .with_endpoint("http://localhost:4317")
146    ///     .build()
147    ///     .unwrap();
148    /// let builder = OpentelemetryLogBuilder::new("my_service", log_exporter);
149    /// builder.make_body(MakeBodyLayout::new(JsonLayout::default()));
150    /// ```
151    pub fn make_body(mut self, make_body: impl Into<Box<dyn MakeBody>>) -> Self {
152        self.make_body = Some(make_body.into());
153        self
154    }
155
156    /// Builds the [`OpentelemetryLog`] appender.
157    ///
158    /// # Examples
159    ///
160    /// ```
161    /// use logforth_append_opentelemetry::OpentelemetryLogBuilder;
162    /// use opentelemetry_otlp::LogExporter;
163    /// use opentelemetry_otlp::WithExportConfig;
164    ///
165    /// let log_exporter = LogExporter::builder()
166    ///     .with_http()
167    ///     .with_endpoint("http://localhost:4317")
168    ///     .build()
169    ///     .unwrap();
170    /// let builder = OpentelemetryLogBuilder::new("my_service", log_exporter);
171    /// let otlp_appender = builder.build();
172    /// ```
173    pub fn build(self) -> OpentelemetryLog {
174        let OpentelemetryLogBuilder {
175            name,
176            log_exporter,
177            labels,
178            make_body,
179        } = self;
180
181        let resource = opentelemetry_sdk::Resource::builder()
182            .with_attributes(
183                labels
184                    .into_iter()
185                    .map(|(key, value)| opentelemetry::KeyValue::new(key, value)),
186            )
187            .build();
188
189        let provider = SdkLoggerProvider::builder()
190            .with_batch_exporter(log_exporter)
191            .with_resource(resource)
192            .build();
193
194        let library = InstrumentationScope::builder(name).build();
195
196        let logger = provider.logger_with_scope(library);
197
198        OpentelemetryLog {
199            make_body,
200            logger,
201            provider,
202        }
203    }
204}
205
206/// An appender that sends log records to OpenTelemetry.
207///
208/// # Examples
209///
210/// ```
211/// use logforth_append_opentelemetry::OpentelemetryLogBuilder;
212/// use opentelemetry_otlp::LogExporter;
213/// use opentelemetry_otlp::WithExportConfig;
214///
215/// let log_exporter = LogExporter::builder()
216///     .with_http()
217///     .with_endpoint("http://localhost:4317")
218///     .build()
219///     .unwrap();
220/// let otlp_appender = OpentelemetryLogBuilder::new("service_name", log_exporter).build();
221/// ```
222#[derive(Debug)]
223pub struct OpentelemetryLog {
224    make_body: Option<Box<dyn MakeBody>>,
225    logger: opentelemetry_sdk::logs::SdkLogger,
226    provider: SdkLoggerProvider,
227}
228
229impl Append for OpentelemetryLog {
230    fn append(&self, record: &Record, diags: &[Box<dyn Diagnostic>]) -> Result<(), Error> {
231        let now = SystemTime::now();
232
233        let mut log_record = self.logger.create_log_record();
234        log_record.set_timestamp(now);
235        log_record.set_observed_timestamp(now);
236        log_record.set_severity_number(log_level_to_otel_severity(record.level()));
237        log_record.set_severity_text(record.level().name());
238
239        if let Some(target) = record.target_static() {
240            log_record.set_target(target);
241        } else {
242            log_record.set_target(record.target().to_owned());
243        }
244
245        if let Some(make_body) = self.make_body.as_ref() {
246            log_record.set_body(make_body.create(record, diags)?);
247        } else if let Some(payload) = record.payload_static() {
248            log_record.set_body(AnyValue::from(payload));
249        } else {
250            log_record.set_body(AnyValue::from(record.payload().to_string()));
251        }
252
253        if let Some(module_path) = record.module_path_static() {
254            log_record.add_attribute("module_path", module_path);
255        } else if let Some(module_path) = record.module_path() {
256            log_record.add_attribute("module_path", module_path.to_owned());
257        }
258
259        if let Some(file) = record.file_static() {
260            log_record.add_attribute("file", file);
261        } else if let Some(file) = record.file() {
262            log_record.add_attribute("file", file.to_owned());
263        }
264
265        if let Some(line) = record.line() {
266            log_record.add_attribute("line", line);
267        }
268
269        if let Some(column) = record.column() {
270            log_record.add_attribute("column", column);
271        }
272
273        let mut extractor = KvExtractor {
274            record: &mut log_record,
275        };
276        record.key_values().visit(&mut extractor)?;
277        for d in diags {
278            d.visit(&mut extractor)?;
279        }
280
281        self.logger.emit(log_record);
282        Ok(())
283    }
284
285    fn flush(&self) -> Result<(), Error> {
286        self.provider
287            .force_flush()
288            .map_err(|err| Error::new("failed to flush records").with_source(err))
289    }
290}
291
292impl Drop for OpentelemetryLog {
293    fn drop(&mut self) {
294        let _ = self.provider.force_flush();
295    }
296}
297
298fn log_level_to_otel_severity(level: Level) -> opentelemetry::logs::Severity {
299    match level {
300        Level::Trace => opentelemetry::logs::Severity::Trace,
301        Level::Trace2 => opentelemetry::logs::Severity::Trace2,
302        Level::Trace3 => opentelemetry::logs::Severity::Trace3,
303        Level::Trace4 => opentelemetry::logs::Severity::Trace4,
304        Level::Debug => opentelemetry::logs::Severity::Debug,
305        Level::Debug2 => opentelemetry::logs::Severity::Debug2,
306        Level::Debug3 => opentelemetry::logs::Severity::Debug3,
307        Level::Debug4 => opentelemetry::logs::Severity::Debug4,
308        Level::Info => opentelemetry::logs::Severity::Info,
309        Level::Info2 => opentelemetry::logs::Severity::Info2,
310        Level::Info3 => opentelemetry::logs::Severity::Info3,
311        Level::Info4 => opentelemetry::logs::Severity::Info4,
312        Level::Warn => opentelemetry::logs::Severity::Warn,
313        Level::Warn2 => opentelemetry::logs::Severity::Warn2,
314        Level::Warn3 => opentelemetry::logs::Severity::Warn3,
315        Level::Warn4 => opentelemetry::logs::Severity::Warn4,
316        Level::Error => opentelemetry::logs::Severity::Error,
317        Level::Error2 => opentelemetry::logs::Severity::Error2,
318        Level::Error3 => opentelemetry::logs::Severity::Error3,
319        Level::Error4 => opentelemetry::logs::Severity::Error4,
320        Level::Fatal => opentelemetry::logs::Severity::Fatal,
321        Level::Fatal2 => opentelemetry::logs::Severity::Fatal2,
322        Level::Fatal3 => opentelemetry::logs::Severity::Fatal3,
323        Level::Fatal4 => opentelemetry::logs::Severity::Fatal4,
324    }
325}
326
327/// A trait for formatting log records into a body that can be sent to OpenTelemetry.
328pub trait MakeBody: fmt::Debug + Send + Sync + 'static {
329    /// Creates a log record with optional diagnostics.
330    fn create(&self, record: &Record, diags: &[Box<dyn Diagnostic>]) -> Result<AnyValue, Error>;
331}
332
333impl<T: MakeBody> From<T> for Box<dyn MakeBody> {
334    fn from(value: T) -> Self {
335        Box::new(value)
336    }
337}
338
339/// Make an OpenTelemetry body with the configured [`Layout`].
340#[derive(Debug)]
341pub struct MakeBodyLayout {
342    layout: Box<dyn Layout>,
343}
344
345impl MakeBodyLayout {
346    /// Creates a new `MakeBodyLayout` with the given layout.
347    pub fn new(layout: impl Into<Box<dyn Layout>>) -> Self {
348        MakeBodyLayout {
349            layout: layout.into(),
350        }
351    }
352}
353
354impl MakeBody for MakeBodyLayout {
355    fn create(&self, record: &Record, diags: &[Box<dyn Diagnostic>]) -> Result<AnyValue, Error> {
356        let body = self.layout.format(record, diags)?;
357        Ok(AnyValue::Bytes(Box::new(body)))
358    }
359}
360
361struct KvExtractor<'a> {
362    record: &'a mut SdkLogRecord,
363}
364
365impl Visitor for KvExtractor<'_> {
366    fn visit(&mut self, key: KeyView, value: ValueView) -> Result<(), Error> {
367        let key = key_to_key(key);
368        let value = value_to_any_value(value);
369        self.record.add_attribute(key, value);
370        Ok(())
371    }
372}
373
374fn key_to_key(key: KeyView) -> Key {
375    Key::from(key.to_cow())
376}
377
378fn value_to_any_value(value: ValueView) -> AnyValue {
379    match value {
380        // TODO(@tisonkun): see https://github.com/open-telemetry/opentelemetry-rust/issues/3528
381        ValueView::None => AnyValue::String("null".into()),
382        ValueView::BorrowedStr(v) => AnyValue::String(v.to_string().into()),
383        ValueView::StaticStr(v) => AnyValue::String(v.into()),
384        ValueView::Char(v) => AnyValue::String(v.to_string().into()),
385        ValueView::Debug(v) => AnyValue::String(v.to_string().into()),
386        ValueView::Display(v) => AnyValue::String(v.to_string().into()),
387        ValueView::Bool(v) => AnyValue::Boolean(v),
388        ValueView::I64(v) => AnyValue::Int(v),
389        ValueView::F64(v) => AnyValue::Double(v),
390        // the following three integer transforms follow what `opentelemetry-appender-log` does:
391        // https://github.com/open-telemetry/opentelemetry-rust/blob/f7b0dd99/opentelemetry-appender-log/src/lib.rs#L259-L287
392        ValueView::U64(v) => {
393            if let Ok(i) = i64::try_from(v) {
394                AnyValue::Int(i)
395            } else {
396                AnyValue::String(v.to_string().into())
397            }
398        }
399        ValueView::I128(v) => {
400            if let Ok(i) = i64::try_from(v) {
401                AnyValue::Int(i)
402            } else {
403                AnyValue::String(v.to_string().into())
404            }
405        }
406        ValueView::U128(v) => {
407            if let Ok(i) = i64::try_from(v) {
408                AnyValue::Int(i)
409            } else {
410                AnyValue::String(v.to_string().into())
411            }
412        }
413        ValueView::List(v) => {
414            let mut l = Vec::new();
415            for item in v.iter() {
416                l.push(value_to_any_value(item));
417            }
418            AnyValue::ListAny(Box::new(l))
419        }
420        ValueView::Map(v) => {
421            let mut m = HashMap::new();
422            for (k, v) in v.iter() {
423                m.insert(key_to_key(k), value_to_any_value(v));
424            }
425            AnyValue::Map(Box::new(m))
426        }
427        v => AnyValue::String(v.to_string().into()),
428    }
429}