use crate::error::OTelSdkResult;
use crate::{logs::SdkLogRecord, Resource};
use opentelemetry::logs::Severity;
use opentelemetry::{otel_warn, InstrumentationScope};
use std::fmt::Debug;
use std::time::Duration;
/// Interface for components that receive log records and can be flushed and shut down.
///
/// Implementors must be `Send + Sync + Debug`. Only [`LogProcessor::emit`] and
/// [`LogProcessor::force_flush`] have no default body; every other method ships with a
/// default that implementors may override.
pub trait LogProcessor: Send + Sync + Debug {
    /// Handles one log record.
    ///
    /// `data` is passed mutably, so an implementation may change the record in place.
    /// `instrumentation` is the [`InstrumentationScope`] handed over together with the record.
    fn emit(&self, data: &mut SdkLogRecord, instrumentation: &InstrumentationScope);

    /// Flushes the processor and reports the outcome as an [`OTelSdkResult`].
    // NOTE(review): presumably meant to push out any buffered records; confirm against implementors.
    fn force_flush(&self) -> OTelSdkResult;

    /// Shuts the processor down, given a timeout.
    ///
    /// The default body ignores `_timeout`, emits a warning through `otel_warn!` and
    /// returns `Ok(())`. The warning text asks implementors that own resources needing
    /// cleanup to override this method.
    fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
        otel_warn!(
            name: "LogProcessor.DefaultShutdownWithTimeout",
            message = "LogProcessor is using default shutdown implementation. If this processor manages background threads, network connections, file handles, or other resources that need cleanup, implement `shutdown_with_timeout()` to properly release them. Simple processors that only transform log data can safely use this default."
        );
        Ok(())
    }

    /// Shuts the processor down by calling [`LogProcessor::shutdown_with_timeout`] with a
    /// five-second timeout.
    fn shutdown(&self) -> OTelSdkResult {
        let default_timeout = Duration::from_secs(5);
        self.shutdown_with_timeout(default_timeout)
    }

    /// Reports whether an event with the given severity, target and optional name is enabled.
    ///
    /// The default body ignores all three arguments and always answers `true`.
    // NOTE(review): presumably callers use this to skip building records nobody wants; confirm.
    fn event_enabled(&self, _level: Severity, _target: &str, _name: Option<&str>) -> bool {
        true
    }

    /// Passes a [`Resource`] to the processor. The default body does nothing with it.
    fn set_resource(&mut self, _resource: &Resource) {}
}
#[cfg(all(test, feature = "testing", feature = "logs"))]
pub(crate) mod tests {
use crate::logs::{LogBatch, LogExporter, SdkLogRecord};
use crate::Resource;
use crate::{
error::OTelSdkResult,
logs::{LogProcessor, SdkLoggerProvider},
};
use opentelemetry::logs::AnyValue;
use opentelemetry::logs::LogRecord as _;
use opentelemetry::logs::{Logger, LoggerProvider};
use opentelemetry::{InstrumentationScope, Key};
use std::sync::{Arc, Mutex};
/// Test exporter that drops every batch and keeps the last resource it was handed.
#[derive(Debug, Clone)]
pub(crate) struct MockLogExporter {
    /// Shared, lock-protected slot for the most recently set resource (`None` until one is set).
    pub resource: Arc<Mutex<Option<Resource>>>,
}

impl MockLogExporter {
    /// Returns a clone of whatever is currently stored in the resource slot.
    ///
    /// Panics if the mutex is poisoned.
    pub(crate) fn get_resource(&self) -> Option<Resource> {
        let slot = self.resource.lock().unwrap();
        slot.clone()
    }
}

impl LogExporter for MockLogExporter {
    /// Ignores the batch and reports success.
    async fn export(&self, _batch: LogBatch<'_>) -> OTelSdkResult {
        Ok(())
    }

    /// Stores a clone of `resource` in the shared slot, replacing any earlier value.
    ///
    /// Panics if the mutex is poisoned.
    fn set_resource(&mut self, resource: &Resource) {
        let mut slot = self
            .resource
            .lock()
            .expect("mock log exporter shouldn't error when setting resource");
        *slot = Some(resource.clone());
    }
}
/// Test processor that marks each record as processed, rewrites its body and keeps a copy.
#[derive(Debug)]
struct FirstProcessor {
    /// Shared list of `(record, scope)` copies, one per call to `emit`.
    pub(crate) logs: Arc<Mutex<Vec<(SdkLogRecord, InstrumentationScope)>>>,
}

impl LogProcessor for FirstProcessor {
    /// Adds the `processed_by` attribute, overwrites the body, then stores a clone of the
    /// modified record together with a clone of its scope.
    fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) {
        let marker_key = Key::from_static_str("processed_by");
        let marker_value = AnyValue::String("FirstProcessor".into());
        record.add_attribute(marker_key, marker_value);
        record.body = Some("Updated by FirstProcessor".into());

        // Take the lock before cloning, matching the order of the chained original.
        let mut captured = self.logs.lock().unwrap();
        captured.push((record.clone(), instrumentation.clone()));
    }

    /// Nothing is buffered here, so flushing always succeeds.
    fn force_flush(&self) -> OTelSdkResult {
        Ok(())
    }

    /// Always succeeds; the timeout is ignored.
    fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult {
        Ok(())
    }
}
/// Test processor that checks each record already carries the first processor's changes,
/// then keeps a copy.
#[derive(Debug)]
struct SecondProcessor {
    /// Shared list of `(record, scope)` copies, one per call to `emit`.
    pub(crate) logs: Arc<Mutex<Vec<(SdkLogRecord, InstrumentationScope)>>>,
}

impl LogProcessor for SecondProcessor {
    /// Asserts that the record has the `processed_by` attribute and the rewritten body,
    /// then stores a clone of the record together with a clone of its scope.
    fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) {
        let expected_key = Key::from_static_str("processed_by");
        let expected_marker = AnyValue::String("FirstProcessor".into());
        assert!(record.attributes_contains(&expected_key, &expected_marker));

        let expected_body = AnyValue::String("Updated by FirstProcessor".into());
        assert!(record.body.clone().unwrap() == expected_body);

        let mut captured = self.logs.lock().unwrap();
        captured.push((record.clone(), instrumentation.clone()));
    }

    /// Nothing is buffered here, so flushing always succeeds.
    fn force_flush(&self) -> OTelSdkResult {
        Ok(())
    }

    /// Always succeeds; the timeout is ignored.
    fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult {
        Ok(())
    }
}
/// Registers two processors on one provider and checks that changes made to a record by
/// the first are visible both to the second and in the copies each one stored.
#[test]
fn test_log_data_modification_by_multiple_processors() {
    let first_sink = Arc::new(Mutex::new(Vec::new()));
    let second_sink = Arc::new(Mutex::new(Vec::new()));

    // Registration order matters: `SecondProcessor::emit` asserts on the first one's edits.
    let logger_provider = SdkLoggerProvider::builder()
        .with_log_processor(FirstProcessor {
            logs: Arc::clone(&first_sink),
        })
        .with_log_processor(SecondProcessor {
            logs: Arc::clone(&second_sink),
        })
        .build();

    let logger = logger_provider.logger("test-logger");
    let mut log_record = logger.create_log_record();
    log_record.body = Some(AnyValue::String("Test log".into()));
    logger.emit(log_record);

    // Each processor must have captured exactly one record.
    assert_eq!(first_sink.lock().unwrap().len(), 1);
    assert_eq!(second_sink.lock().unwrap().len(), 1);

    // Hold both guards for the rest of the test so the captured records can be borrowed.
    let first_guard = first_sink.lock().unwrap();
    let second_guard = second_sink.lock().unwrap();
    let captured_records = [&first_guard[0].0, &second_guard[0].0];

    let expected_key = Key::from_static_str("processed_by");
    let expected_marker = AnyValue::String("FirstProcessor".into());
    for captured in captured_records {
        assert!(captured.attributes_contains(&expected_key, &expected_marker));
    }

    let expected_body = AnyValue::String("Updated by FirstProcessor".into());
    for captured in captured_records {
        assert!(captured.body.clone().unwrap() == expected_body);
    }
}
}