logforth_append_opentelemetry/
lib.rs1#![cfg_attr(docsrs, feature(doc_cfg))]
18#![deny(missing_docs)]
19
20use std::borrow::Cow;
21use std::collections::HashMap;
22use std::fmt;
23use std::time::SystemTime;
24
25use logforth_core::Diagnostic;
26use logforth_core::Error;
27use logforth_core::Layout;
28use logforth_core::append::Append;
29use logforth_core::kv::KeyView;
30use logforth_core::kv::ValueView;
31use logforth_core::kv::Visitor;
32use logforth_core::record::Level;
33use logforth_core::record::Record;
34use opentelemetry::InstrumentationScope;
35use opentelemetry::Key;
36use opentelemetry::logs::AnyValue;
37use opentelemetry::logs::LogRecord;
38use opentelemetry::logs::Logger;
39use opentelemetry::logs::LoggerProvider;
40use opentelemetry_otlp::LogExporter;
41use opentelemetry_sdk::logs::SdkLogRecord;
42use opentelemetry_sdk::logs::SdkLoggerProvider;
43
44#[derive(Debug)]
46pub struct OpentelemetryLogBuilder {
47 name: String,
48 log_exporter: LogExporter,
49 labels: Vec<(Cow<'static, str>, Cow<'static, str>)>,
50 make_body: Option<Box<dyn MakeBody>>,
51}
52
53impl OpentelemetryLogBuilder {
54 pub fn new(name: impl Into<String>, log_exporter: impl Into<LogExporter>) -> Self {
71 OpentelemetryLogBuilder {
72 name: name.into(),
73 log_exporter: log_exporter.into(),
74 labels: vec![],
75 make_body: None,
76 }
77 }
78
79 pub fn label(
97 mut self,
98 key: impl Into<Cow<'static, str>>,
99 value: impl Into<Cow<'static, str>>,
100 ) -> Self {
101 self.labels.push((key.into(), value.into()));
102 self
103 }
104
105 pub fn labels<K, V>(mut self, labels: impl IntoIterator<Item = (K, V)>) -> Self
123 where
124 K: Into<Cow<'static, str>>,
125 V: Into<Cow<'static, str>>,
126 {
127 self.labels
128 .extend(labels.into_iter().map(|(k, v)| (k.into(), v.into())));
129 self
130 }
131
132 pub fn make_body(mut self, make_body: impl Into<Box<dyn MakeBody>>) -> Self {
152 self.make_body = Some(make_body.into());
153 self
154 }
155
156 pub fn build(self) -> OpentelemetryLog {
174 let OpentelemetryLogBuilder {
175 name,
176 log_exporter,
177 labels,
178 make_body,
179 } = self;
180
181 let resource = opentelemetry_sdk::Resource::builder()
182 .with_attributes(
183 labels
184 .into_iter()
185 .map(|(key, value)| opentelemetry::KeyValue::new(key, value)),
186 )
187 .build();
188
189 let provider = SdkLoggerProvider::builder()
190 .with_batch_exporter(log_exporter)
191 .with_resource(resource)
192 .build();
193
194 let library = InstrumentationScope::builder(name).build();
195
196 let logger = provider.logger_with_scope(library);
197
198 OpentelemetryLog {
199 make_body,
200 logger,
201 provider,
202 }
203 }
204}
205
206#[derive(Debug)]
223pub struct OpentelemetryLog {
224 make_body: Option<Box<dyn MakeBody>>,
225 logger: opentelemetry_sdk::logs::SdkLogger,
226 provider: SdkLoggerProvider,
227}
228
229impl Append for OpentelemetryLog {
230 fn append(&self, record: &Record, diags: &[Box<dyn Diagnostic>]) -> Result<(), Error> {
231 let now = SystemTime::now();
232
233 let mut log_record = self.logger.create_log_record();
234 log_record.set_timestamp(now);
235 log_record.set_observed_timestamp(now);
236 log_record.set_severity_number(log_level_to_otel_severity(record.level()));
237 log_record.set_severity_text(record.level().name());
238
239 if let Some(target) = record.target_static() {
240 log_record.set_target(target);
241 } else {
242 log_record.set_target(record.target().to_owned());
243 }
244
245 if let Some(make_body) = self.make_body.as_ref() {
246 log_record.set_body(make_body.create(record, diags)?);
247 } else if let Some(payload) = record.payload_static() {
248 log_record.set_body(AnyValue::from(payload));
249 } else {
250 log_record.set_body(AnyValue::from(record.payload().to_string()));
251 }
252
253 if let Some(module_path) = record.module_path_static() {
254 log_record.add_attribute("module_path", module_path);
255 } else if let Some(module_path) = record.module_path() {
256 log_record.add_attribute("module_path", module_path.to_owned());
257 }
258
259 if let Some(file) = record.file_static() {
260 log_record.add_attribute("file", file);
261 } else if let Some(file) = record.file() {
262 log_record.add_attribute("file", file.to_owned());
263 }
264
265 if let Some(line) = record.line() {
266 log_record.add_attribute("line", line);
267 }
268
269 if let Some(column) = record.column() {
270 log_record.add_attribute("column", column);
271 }
272
273 let mut extractor = KvExtractor {
274 record: &mut log_record,
275 };
276 record.key_values().visit(&mut extractor)?;
277 for d in diags {
278 d.visit(&mut extractor)?;
279 }
280
281 self.logger.emit(log_record);
282 Ok(())
283 }
284
285 fn flush(&self) -> Result<(), Error> {
286 self.provider
287 .force_flush()
288 .map_err(|err| Error::new("failed to flush records").with_source(err))
289 }
290}
291
292impl Drop for OpentelemetryLog {
293 fn drop(&mut self) {
294 let _ = self.provider.force_flush();
295 }
296}
297
298fn log_level_to_otel_severity(level: Level) -> opentelemetry::logs::Severity {
299 match level {
300 Level::Trace => opentelemetry::logs::Severity::Trace,
301 Level::Trace2 => opentelemetry::logs::Severity::Trace2,
302 Level::Trace3 => opentelemetry::logs::Severity::Trace3,
303 Level::Trace4 => opentelemetry::logs::Severity::Trace4,
304 Level::Debug => opentelemetry::logs::Severity::Debug,
305 Level::Debug2 => opentelemetry::logs::Severity::Debug2,
306 Level::Debug3 => opentelemetry::logs::Severity::Debug3,
307 Level::Debug4 => opentelemetry::logs::Severity::Debug4,
308 Level::Info => opentelemetry::logs::Severity::Info,
309 Level::Info2 => opentelemetry::logs::Severity::Info2,
310 Level::Info3 => opentelemetry::logs::Severity::Info3,
311 Level::Info4 => opentelemetry::logs::Severity::Info4,
312 Level::Warn => opentelemetry::logs::Severity::Warn,
313 Level::Warn2 => opentelemetry::logs::Severity::Warn2,
314 Level::Warn3 => opentelemetry::logs::Severity::Warn3,
315 Level::Warn4 => opentelemetry::logs::Severity::Warn4,
316 Level::Error => opentelemetry::logs::Severity::Error,
317 Level::Error2 => opentelemetry::logs::Severity::Error2,
318 Level::Error3 => opentelemetry::logs::Severity::Error3,
319 Level::Error4 => opentelemetry::logs::Severity::Error4,
320 Level::Fatal => opentelemetry::logs::Severity::Fatal,
321 Level::Fatal2 => opentelemetry::logs::Severity::Fatal2,
322 Level::Fatal3 => opentelemetry::logs::Severity::Fatal3,
323 Level::Fatal4 => opentelemetry::logs::Severity::Fatal4,
324 }
325}
326
327pub trait MakeBody: fmt::Debug + Send + Sync + 'static {
329 fn create(&self, record: &Record, diags: &[Box<dyn Diagnostic>]) -> Result<AnyValue, Error>;
331}
332
333impl<T: MakeBody> From<T> for Box<dyn MakeBody> {
334 fn from(value: T) -> Self {
335 Box::new(value)
336 }
337}
338
339#[derive(Debug)]
341pub struct MakeBodyLayout {
342 layout: Box<dyn Layout>,
343}
344
345impl MakeBodyLayout {
346 pub fn new(layout: impl Into<Box<dyn Layout>>) -> Self {
348 MakeBodyLayout {
349 layout: layout.into(),
350 }
351 }
352}
353
354impl MakeBody for MakeBodyLayout {
355 fn create(&self, record: &Record, diags: &[Box<dyn Diagnostic>]) -> Result<AnyValue, Error> {
356 let body = self.layout.format(record, diags)?;
357 Ok(AnyValue::Bytes(Box::new(body)))
358 }
359}
360
361struct KvExtractor<'a> {
362 record: &'a mut SdkLogRecord,
363}
364
365impl Visitor for KvExtractor<'_> {
366 fn visit(&mut self, key: KeyView, value: ValueView) -> Result<(), Error> {
367 let key = key_to_key(key);
368 let value = value_to_any_value(value);
369 self.record.add_attribute(key, value);
370 Ok(())
371 }
372}
373
374fn key_to_key(key: KeyView) -> Key {
375 Key::from(key.to_cow())
376}
377
378fn value_to_any_value(value: ValueView) -> AnyValue {
379 match value {
380 ValueView::None => AnyValue::String("null".into()),
382 ValueView::BorrowedStr(v) => AnyValue::String(v.to_string().into()),
383 ValueView::StaticStr(v) => AnyValue::String(v.into()),
384 ValueView::Char(v) => AnyValue::String(v.to_string().into()),
385 ValueView::Debug(v) => AnyValue::String(v.to_string().into()),
386 ValueView::Display(v) => AnyValue::String(v.to_string().into()),
387 ValueView::Bool(v) => AnyValue::Boolean(v),
388 ValueView::I64(v) => AnyValue::Int(v),
389 ValueView::F64(v) => AnyValue::Double(v),
390 ValueView::U64(v) => {
393 if let Ok(i) = i64::try_from(v) {
394 AnyValue::Int(i)
395 } else {
396 AnyValue::String(v.to_string().into())
397 }
398 }
399 ValueView::I128(v) => {
400 if let Ok(i) = i64::try_from(v) {
401 AnyValue::Int(i)
402 } else {
403 AnyValue::String(v.to_string().into())
404 }
405 }
406 ValueView::U128(v) => {
407 if let Ok(i) = i64::try_from(v) {
408 AnyValue::Int(i)
409 } else {
410 AnyValue::String(v.to_string().into())
411 }
412 }
413 ValueView::List(v) => {
414 let mut l = Vec::new();
415 for item in v.iter() {
416 l.push(value_to_any_value(item));
417 }
418 AnyValue::ListAny(Box::new(l))
419 }
420 ValueView::Map(v) => {
421 let mut m = HashMap::new();
422 for (k, v) in v.iter() {
423 m.insert(key_to_key(k), value_to_any_value(v));
424 }
425 AnyValue::Map(Box::new(m))
426 }
427 v => AnyValue::String(v.to_string().into()),
428 }
429}