logforth_append_opentelemetry/
lib.rs1#![cfg_attr(docsrs, feature(doc_cfg))]
18
19use std::borrow::Cow;
20use std::fmt;
21use std::time::SystemTime;
22
23use logforth_core::Diagnostic;
24use logforth_core::Error;
25use logforth_core::Layout;
26use logforth_core::append::Append;
27use logforth_core::kv::Key;
28use logforth_core::kv::Value;
29use logforth_core::kv::Visitor;
30use logforth_core::record::Level;
31use logforth_core::record::Record;
32use opentelemetry::InstrumentationScope;
33use opentelemetry::logs::AnyValue;
34use opentelemetry::logs::LogRecord;
35use opentelemetry::logs::Logger;
36use opentelemetry::logs::LoggerProvider;
37use opentelemetry_otlp::LogExporter;
38use opentelemetry_sdk::logs::SdkLogRecord;
39use opentelemetry_sdk::logs::SdkLoggerProvider;
40
/// Builder that collects the settings needed to construct an [`OpentelemetryLog`].
#[derive(Debug)]
pub struct OpentelemetryLogBuilder {
    /// Name given to the instrumentation scope of the logger created by `build`.
    name: String,
    /// Exporter installed on the logger provider as a batch exporter.
    log_exporter: LogExporter,
    /// Key/value pairs that `build` turns into resource attributes.
    labels: Vec<(Cow<'static, str>, Cow<'static, str>)>,
    /// Optional custom producer of the log record body; `None` unless set via `make_body`.
    make_body: Option<Box<dyn MakeBody>>,
}
49
50impl OpentelemetryLogBuilder {
51 pub fn new(name: impl Into<String>, log_exporter: impl Into<LogExporter>) -> Self {
68 OpentelemetryLogBuilder {
69 name: name.into(),
70 log_exporter: log_exporter.into(),
71 labels: vec![],
72 make_body: None,
73 }
74 }
75
76 pub fn label(
94 mut self,
95 key: impl Into<Cow<'static, str>>,
96 value: impl Into<Cow<'static, str>>,
97 ) -> Self {
98 self.labels.push((key.into(), value.into()));
99 self
100 }
101
102 pub fn labels<K, V>(mut self, labels: impl IntoIterator<Item = (K, V)>) -> Self
120 where
121 K: Into<Cow<'static, str>>,
122 V: Into<Cow<'static, str>>,
123 {
124 self.labels
125 .extend(labels.into_iter().map(|(k, v)| (k.into(), v.into())));
126 self
127 }
128
129 pub fn make_body(mut self, make_body: impl Into<Box<dyn MakeBody>>) -> Self {
149 self.make_body = Some(make_body.into());
150 self
151 }
152
153 pub fn build(self) -> OpentelemetryLog {
171 let OpentelemetryLogBuilder {
172 name,
173 log_exporter,
174 labels,
175 make_body,
176 } = self;
177
178 let resource = opentelemetry_sdk::Resource::builder()
179 .with_attributes(
180 labels
181 .into_iter()
182 .map(|(key, value)| opentelemetry::KeyValue::new(key, value)),
183 )
184 .build();
185
186 let provider = SdkLoggerProvider::builder()
187 .with_batch_exporter(log_exporter)
188 .with_resource(resource)
189 .build();
190
191 let library = InstrumentationScope::builder(name).build();
192
193 let logger = provider.logger_with_scope(library);
194
195 OpentelemetryLog {
196 make_body,
197 logger,
198 provider,
199 }
200 }
201}
202
/// Appender that converts log records into OpenTelemetry log records and emits
/// them through an SDK logger.
#[derive(Debug)]
pub struct OpentelemetryLog {
    /// Optional custom producer of the record body; when `None`, the record's
    /// payload is used as the body.
    make_body: Option<Box<dyn MakeBody>>,
    /// Logger used to create and emit log records.
    logger: opentelemetry_sdk::logs::SdkLogger,
    /// Provider the logger was created from; used to force a flush.
    provider: SdkLoggerProvider,
}
225
226impl Append for OpentelemetryLog {
227 fn append(&self, record: &Record, diags: &[Box<dyn Diagnostic>]) -> Result<(), Error> {
228 let now = SystemTime::now();
229
230 let mut log_record = self.logger.create_log_record();
231 log_record.set_timestamp(now);
232 log_record.set_observed_timestamp(now);
233 log_record.set_severity_number(log_level_to_otel_severity(record.level()));
234 log_record.set_severity_text(record.level().name());
235 log_record.set_target(record.target().to_owned());
236
237 if let Some(make_body) = self.make_body.as_ref() {
238 log_record.set_body(make_body.create(record, diags)?);
239 } else if let Some(payload) = record.payload_static() {
240 log_record.set_body(AnyValue::from(payload));
241 } else {
242 log_record.set_body(AnyValue::from(record.payload().to_owned()));
243 }
244
245 if let Some(module_path) = record.module_path_static() {
246 log_record.add_attribute("module_path", module_path);
247 } else if let Some(module_path) = record.module_path() {
248 log_record.add_attribute("module_path", module_path.to_owned());
249 }
250
251 if let Some(file) = record.file_static() {
252 log_record.add_attribute("file", file);
253 } else if let Some(file) = record.file() {
254 log_record.add_attribute("file", file.to_owned());
255 }
256
257 if let Some(line) = record.line() {
258 log_record.add_attribute("line", line);
259 }
260
261 let mut extractor = KvExtractor {
262 record: &mut log_record,
263 };
264 record.key_values().visit(&mut extractor)?;
265 for d in diags {
266 d.visit(&mut extractor)?;
267 }
268
269 self.logger.emit(log_record);
270 Ok(())
271 }
272
273 fn flush(&self) -> Result<(), Error> {
274 self.provider
275 .force_flush()
276 .map_err(|err| Error::new("failed to flush records").set_source(err))
277 }
278}
279
280impl Drop for OpentelemetryLog {
281 fn drop(&mut self) {
282 let _ = self.provider.force_flush();
283 }
284}
285
286fn log_level_to_otel_severity(level: Level) -> opentelemetry::logs::Severity {
287 match level {
288 Level::Trace => opentelemetry::logs::Severity::Trace,
289 Level::Trace2 => opentelemetry::logs::Severity::Trace2,
290 Level::Trace3 => opentelemetry::logs::Severity::Trace3,
291 Level::Trace4 => opentelemetry::logs::Severity::Trace4,
292 Level::Debug => opentelemetry::logs::Severity::Debug,
293 Level::Debug2 => opentelemetry::logs::Severity::Debug2,
294 Level::Debug3 => opentelemetry::logs::Severity::Debug3,
295 Level::Debug4 => opentelemetry::logs::Severity::Debug4,
296 Level::Info => opentelemetry::logs::Severity::Info,
297 Level::Info2 => opentelemetry::logs::Severity::Info2,
298 Level::Info3 => opentelemetry::logs::Severity::Info3,
299 Level::Info4 => opentelemetry::logs::Severity::Info4,
300 Level::Warn => opentelemetry::logs::Severity::Warn,
301 Level::Warn2 => opentelemetry::logs::Severity::Warn2,
302 Level::Warn3 => opentelemetry::logs::Severity::Warn3,
303 Level::Warn4 => opentelemetry::logs::Severity::Warn4,
304 Level::Error => opentelemetry::logs::Severity::Error,
305 Level::Error2 => opentelemetry::logs::Severity::Error2,
306 Level::Error3 => opentelemetry::logs::Severity::Error3,
307 Level::Error4 => opentelemetry::logs::Severity::Error4,
308 Level::Fatal => opentelemetry::logs::Severity::Fatal,
309 Level::Fatal2 => opentelemetry::logs::Severity::Fatal2,
310 Level::Fatal3 => opentelemetry::logs::Severity::Fatal3,
311 Level::Fatal4 => opentelemetry::logs::Severity::Fatal4,
312 }
313}
314
/// Produces the body of an OpenTelemetry log record.
///
/// Implementations must be `Debug`, `Send`, `Sync` and `'static`.
pub trait MakeBody: fmt::Debug + Send + Sync + 'static {
    /// Creates the body value for `record` and its accompanying `diags`.
    ///
    /// # Errors
    ///
    /// Returns an [`Error`] when the body cannot be produced.
    fn create(&self, record: &Record, diags: &[Box<dyn Diagnostic>]) -> Result<AnyValue, Error>;
}
320
321impl<T: MakeBody> From<T> for Box<dyn MakeBody> {
322 fn from(value: T) -> Self {
323 Box::new(value)
324 }
325}
326
/// A [`MakeBody`] that formats the record with a [`Layout`] and uses the
/// resulting bytes as the body.
#[derive(Debug)]
pub struct MakeBodyLayout {
    /// Layout used to format each record.
    layout: Box<dyn Layout>,
}
332
333impl MakeBodyLayout {
334 pub fn new(layout: impl Into<Box<dyn Layout>>) -> Self {
336 MakeBodyLayout {
337 layout: layout.into(),
338 }
339 }
340}
341
342impl MakeBody for MakeBodyLayout {
343 fn create(&self, record: &Record, diags: &[Box<dyn Diagnostic>]) -> Result<AnyValue, Error> {
344 let body = self.layout.format(record, diags)?;
345 Ok(AnyValue::Bytes(Box::new(body)))
346 }
347}
348
/// Visitor that copies visited key/value pairs onto a log record as attributes.
struct KvExtractor<'a> {
    /// Log record that receives the attributes.
    record: &'a mut SdkLogRecord,
}
352
353impl Visitor for KvExtractor<'_> {
354 fn visit(&mut self, key: Key, value: Value) -> Result<(), Error> {
355 let key = key.to_cow();
356 let value = value.to_string();
357 self.record.add_attribute(key, value);
358 Ok(())
359 }
360}