use crate::error::{OTelSdkError, OTelSdkResult};
use crate::logs::SdkLogRecord;
use crate::logs::{LogBatch, LogExporter};
use crate::Resource;
use opentelemetry::InstrumentationScope;
use std::borrow::Cow;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, Mutex};
use std::time;
/// A log exporter that keeps every exported record in memory so it can be
/// inspected later through `get_emitted_logs`.
///
/// All mutable state lives behind `Arc`s, so a clone of the exporter shares the
/// same log buffer, resource and shutdown flag as the value it was cloned from.
#[derive(Clone, Debug)]
pub struct InMemoryLogExporter {
// Records captured by `export`, in the order they were pushed.
logs: Arc<Mutex<Vec<OwnedLogData>>>,
// Resource last supplied via `set_resource`; attached to each entry returned by `get_emitted_logs`.
resource: Arc<Mutex<Resource>>,
// When true, `shutdown_with_timeout` calls `reset` and the stored records are cleared.
should_reset_on_shutdown: bool,
// Set to true by `shutdown_with_timeout`; read by `is_shutdown_called`.
shutdown_called: Arc<AtomicBool>,
}
impl Default for InMemoryLogExporter {
    /// Builds an exporter from a freshly created [`InMemoryLogExporterBuilder`],
    /// i.e. with the builder's initial configuration.
    fn default() -> Self {
        let builder = InMemoryLogExporterBuilder::new();
        builder.build()
    }
}
/// An owned copy of one exported log entry, as stored inside the exporter.
#[derive(Debug, Clone)]
pub struct OwnedLogData {
/// The log record, cloned from the batch handed to `export`.
pub record: SdkLogRecord,
/// The instrumentation scope that accompanied the record in the batch.
pub instrumentation: InstrumentationScope,
}
/// A stored log entry paired with a resource, as returned by
/// `InMemoryLogExporter::get_emitted_logs`.
#[derive(Clone, Debug)]
pub struct LogDataWithResource {
/// The log record.
pub record: SdkLogRecord,
/// The instrumentation scope associated with the record.
pub instrumentation: InstrumentationScope,
/// The resource attached to this entry; `get_emitted_logs` fills it with an
/// owned clone of the exporter's resource at the time of the call.
pub resource: Cow<'static, Resource>,
}
/// Builder for [`InMemoryLogExporter`].
#[derive(Debug, Clone)]
pub struct InMemoryLogExporterBuilder {
// Copied into the exporter's `should_reset_on_shutdown` by `build`.
reset_on_shutdown: bool,
}
impl Default for InMemoryLogExporterBuilder {
fn default() -> Self {
Self::new()
}
}
impl InMemoryLogExporterBuilder {
    /// Creates a builder configured so that the resulting exporter clears its
    /// stored logs when it is shut down.
    pub fn new() -> Self {
        let reset_on_shutdown = true;
        Self { reset_on_shutdown }
    }

    /// Creates a new [`InMemoryLogExporter`] with an empty log buffer, a
    /// resource obtained from `Resource::builder().build()`, a cleared
    /// shutdown flag, and this builder's reset-on-shutdown setting.
    ///
    /// The builder is only borrowed, so it can be reused to build further
    /// exporters; each call allocates fresh, unshared state.
    pub fn build(&self) -> InMemoryLogExporter {
        let logs = Arc::new(Mutex::new(Vec::new()));
        let resource = Arc::new(Mutex::new(Resource::builder().build()));
        let shutdown_called = Arc::new(AtomicBool::new(false));

        InMemoryLogExporter {
            logs,
            resource,
            should_reset_on_shutdown: self.reset_on_shutdown,
            shutdown_called,
        }
    }

    /// Test-only switch: makes the built exporter keep its stored records
    /// when it is shut down instead of clearing them.
    #[cfg(test)]
    #[allow(dead_code)]
    pub(crate) fn keep_records_on_shutdown(mut self) -> Self {
        self.reset_on_shutdown = false;
        self
    }
}
impl InMemoryLogExporter {
    /// Returns `true` once `shutdown_with_timeout` has been called on this
    /// exporter or on any clone sharing its shutdown flag.
    pub fn is_shutdown_called(&self) -> bool {
        self.shutdown_called
            .load(std::sync::atomic::Ordering::Relaxed)
    }

    /// Returns a snapshot of every log currently stored in the exporter.
    ///
    /// Each stored record and its instrumentation scope are cloned, and every
    /// returned entry carries an owned clone of the exporter's current
    /// resource. The exporter's own buffer is left untouched.
    ///
    /// # Errors
    ///
    /// Returns an [`OTelSdkError`] if the lock guarding the log buffer or the
    /// lock guarding the resource cannot be acquired (i.e. it is poisoned).
    pub fn get_emitted_logs(&self) -> Result<Vec<LogDataWithResource>, OTelSdkError> {
        let logs_guard = self.logs.lock().map_err(OTelSdkError::from)?;
        let resource_guard = self.resource.lock().map_err(OTelSdkError::from)?;
        let logs: Vec<LogDataWithResource> = logs_guard
            .iter()
            .map(|log_data| LogDataWithResource {
                record: log_data.record.clone(),
                resource: Cow::Owned(resource_guard.clone()),
                instrumentation: log_data.instrumentation.clone(),
            })
            .collect();
        Ok(logs)
    }

    /// Removes all stored logs.
    ///
    /// This is best-effort: the method has no way to report a failure, so if
    /// the log buffer's lock is poisoned the buffer is left as it is.
    pub fn reset(&self) {
        // Previously the poisoned-lock path formatted an error message only to
        // discard it; skipping that work makes the best-effort intent explicit.
        if let Ok(mut logs_guard) = self.logs.lock() {
            logs_guard.clear();
        }
    }
}
impl LogExporter for InMemoryLogExporter {
    /// Appends an owned copy of every `(record, scope)` pair in `batch` to the
    /// in-memory buffer.
    ///
    /// # Errors
    ///
    /// Returns `OTelSdkError::InternalFailure` if the buffer's lock cannot be
    /// acquired; nothing is stored in that case.
    async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
        let mut stored = match self.logs.lock() {
            Ok(guard) => guard,
            Err(e) => {
                return Err(OTelSdkError::InternalFailure(format!(
                    "Failed to lock logs for export: {e}"
                )))
            }
        };

        // The batch only lends its contents, so each entry is cloned into an
        // owned value before being kept.
        stored.extend(batch.iter().map(|(record, scope)| OwnedLogData {
            record: (*record).clone(),
            instrumentation: (*scope).clone(),
        }));

        Ok(())
    }

    /// Marks the exporter as shut down and, if it was configured to do so,
    /// clears the stored logs. The timeout is not used and the call always
    /// returns `Ok(())`.
    fn shutdown_with_timeout(&self, _timeout: time::Duration) -> OTelSdkResult {
        use std::sync::atomic::Ordering;

        self.shutdown_called.store(true, Ordering::Relaxed);
        if self.should_reset_on_shutdown {
            self.reset();
        }
        Ok(())
    }

    /// Replaces the exporter's resource with a clone of `resource`.
    ///
    /// # Panics
    ///
    /// Panics if the lock guarding the resource is poisoned.
    fn set_resource(&mut self, resource: &Resource) {
        let replacement = resource.clone();
        let mut current = self.resource.lock().expect("Resource lock poisoned");
        *current = replacement;
    }
}