logforth_append_opentelemetry/
lib.rs1#![cfg_attr(docsrs, feature(doc_cfg))]
18
19use std::borrow::Cow;
20use std::fmt;
21use std::time::SystemTime;
22
23use logforth_core::Diagnostic;
24use logforth_core::Error;
25use logforth_core::Layout;
26use logforth_core::append::Append;
27use logforth_core::kv::Key;
28use logforth_core::kv::Value;
29use logforth_core::kv::Visitor;
30use logforth_core::record::Level;
31use logforth_core::record::Record;
32use opentelemetry::InstrumentationScope;
33use opentelemetry::logs::AnyValue;
34use opentelemetry::logs::LogRecord;
35use opentelemetry::logs::Logger;
36use opentelemetry::logs::LoggerProvider;
37use opentelemetry_otlp::LogExporter;
38use opentelemetry_sdk::logs::SdkLogRecord;
39use opentelemetry_sdk::logs::SdkLoggerProvider;
40
41#[derive(Debug)]
43pub struct OpentelemetryLogBuilder {
44 name: String,
45 log_exporter: LogExporter,
46 labels: Vec<(Cow<'static, str>, Cow<'static, str>)>,
47 make_body: Option<Box<dyn MakeBody>>,
48}
49
50impl OpentelemetryLogBuilder {
51 pub fn new(name: impl Into<String>, log_exporter: impl Into<LogExporter>) -> Self {
68 OpentelemetryLogBuilder {
69 name: name.into(),
70 log_exporter: log_exporter.into(),
71 labels: vec![],
72 make_body: None,
73 }
74 }
75
76 pub fn label(
94 mut self,
95 key: impl Into<Cow<'static, str>>,
96 value: impl Into<Cow<'static, str>>,
97 ) -> Self {
98 self.labels.push((key.into(), value.into()));
99 self
100 }
101
102 pub fn labels<K, V>(mut self, labels: impl IntoIterator<Item = (K, V)>) -> Self
120 where
121 K: Into<Cow<'static, str>>,
122 V: Into<Cow<'static, str>>,
123 {
124 self.labels
125 .extend(labels.into_iter().map(|(k, v)| (k.into(), v.into())));
126 self
127 }
128
129 pub fn make_body(mut self, make_body: impl Into<Box<dyn MakeBody>>) -> Self {
149 self.make_body = Some(make_body.into());
150 self
151 }
152
153 pub fn build(self) -> OpentelemetryLog {
171 let OpentelemetryLogBuilder {
172 name,
173 log_exporter,
174 labels,
175 make_body,
176 } = self;
177
178 let resource = opentelemetry_sdk::Resource::builder()
179 .with_attributes(
180 labels
181 .into_iter()
182 .map(|(key, value)| opentelemetry::KeyValue::new(key, value)),
183 )
184 .build();
185
186 let provider = SdkLoggerProvider::builder()
187 .with_batch_exporter(log_exporter)
188 .with_resource(resource)
189 .build();
190
191 let library = InstrumentationScope::builder(name).build();
192
193 let logger = provider.logger_with_scope(library);
194
195 OpentelemetryLog {
196 make_body,
197 logger,
198 provider,
199 }
200 }
201}
202
203#[derive(Debug)]
220pub struct OpentelemetryLog {
221 make_body: Option<Box<dyn MakeBody>>,
222 logger: opentelemetry_sdk::logs::SdkLogger,
223 provider: SdkLoggerProvider,
224}
225
226impl Append for OpentelemetryLog {
227 fn append(&self, record: &Record, diags: &[Box<dyn Diagnostic>]) -> Result<(), Error> {
228 let now = SystemTime::now();
229
230 let mut log_record = self.logger.create_log_record();
231 log_record.set_timestamp(now);
232 log_record.set_observed_timestamp(now);
233 log_record.set_severity_number(log_level_to_otel_severity(record.level()));
234 log_record.set_severity_text(record.level().as_str());
235 log_record.set_target(record.target().to_owned());
236
237 if let Some(make_body) = self.make_body.as_ref() {
238 log_record.set_body(make_body.create(record, diags)?);
239 } else if let Some(payload) = record.payload_static() {
240 log_record.set_body(AnyValue::from(payload));
241 } else {
242 log_record.set_body(AnyValue::from(record.payload().to_owned()));
243 }
244
245 if let Some(module_path) = record.module_path_static() {
246 log_record.add_attribute("module_path", module_path);
247 } else if let Some(module_path) = record.module_path() {
248 log_record.add_attribute("module_path", module_path.to_owned());
249 }
250
251 if let Some(file) = record.file_static() {
252 log_record.add_attribute("file", file);
253 } else if let Some(file) = record.file() {
254 log_record.add_attribute("file", file.to_owned());
255 }
256
257 if let Some(line) = record.line() {
258 log_record.add_attribute("line", line);
259 }
260
261 let mut extractor = KvExtractor {
262 record: &mut log_record,
263 };
264 record.key_values().visit(&mut extractor)?;
265 for d in diags {
266 d.visit(&mut extractor)?;
267 }
268
269 self.logger.emit(log_record);
270 Ok(())
271 }
272
273 fn flush(&self) -> Result<(), Error> {
274 self.provider
275 .force_flush()
276 .map_err(|err| Error::new("failed to flush records").set_source(err))
277 }
278}
279
280impl Drop for OpentelemetryLog {
281 fn drop(&mut self) {
282 let _ = self.provider.force_flush();
283 }
284}
285
286fn log_level_to_otel_severity(level: Level) -> opentelemetry::logs::Severity {
287 match level {
288 Level::Error => opentelemetry::logs::Severity::Error,
289 Level::Warn => opentelemetry::logs::Severity::Warn,
290 Level::Info => opentelemetry::logs::Severity::Info,
291 Level::Debug => opentelemetry::logs::Severity::Debug,
292 Level::Trace => opentelemetry::logs::Severity::Trace,
293 }
294}
295
296pub trait MakeBody: fmt::Debug + Send + Sync + 'static {
298 fn create(&self, record: &Record, diags: &[Box<dyn Diagnostic>]) -> Result<AnyValue, Error>;
300}
301
302impl<T: MakeBody> From<T> for Box<dyn MakeBody> {
303 fn from(value: T) -> Self {
304 Box::new(value)
305 }
306}
307
308#[derive(Debug)]
310pub struct MakeBodyLayout {
311 layout: Box<dyn Layout>,
312}
313
314impl MakeBodyLayout {
315 pub fn new(layout: impl Into<Box<dyn Layout>>) -> Self {
317 MakeBodyLayout {
318 layout: layout.into(),
319 }
320 }
321}
322
323impl MakeBody for MakeBodyLayout {
324 fn create(&self, record: &Record, diags: &[Box<dyn Diagnostic>]) -> Result<AnyValue, Error> {
325 let body = self.layout.format(record, diags)?;
326 Ok(AnyValue::Bytes(Box::new(body)))
327 }
328}
329
330struct KvExtractor<'a> {
331 record: &'a mut SdkLogRecord,
332}
333
334impl Visitor for KvExtractor<'_> {
335 fn visit(&mut self, key: Key, value: Value) -> Result<(), Error> {
336 let key = key.to_cow();
337 let value = value.to_string();
338 self.record.add_attribute(key, value);
339 Ok(())
340 }
341}