use crate::config::Config;
use crate::dashboard::server::DashboardServer;
use crate::error::OtlpError;
use crate::otlp::{BatchBuffer, OtlpFileExporter};
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::time::{Duration, interval};
use tracing::{error, info, warn};
/// Top-level handle for the OTLP file-export library.
///
/// Owns the in-memory batch buffer, the file exporter, and the background
/// tasks spawned by [`OtlpLibrary::new`] (periodic batch write, trace/metric
/// file cleanup, and the optional dashboard HTTP server). Cloning is cheap:
/// every field is behind an `Arc`, so clones share the same buffers and
/// task handles.
#[derive(Clone, Debug)]
pub struct OtlpLibrary {
// Validated configuration the library was constructed with.
config: Config,
// Writes traces/metrics to files under `config.output_dir`.
file_exporter: Arc<OtlpFileExporter>,
// Buffers spans (and, legacy-path only, protobuf metrics) between writes.
batch_buffer: Arc<BatchBuffer>,
// Periodic task draining `batch_buffer` to disk; aborted in `shutdown`.
write_handle: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
// Trace- and metric-file cleanup tasks; aborted in `shutdown`.
cleanup_handles: Arc<Mutex<Vec<tokio::task::JoinHandle<()>>>>,
// Dashboard server task; `Some` only when `config.dashboard.enabled`.
dashboard_handle: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
}
impl OtlpLibrary {
pub async fn new(config: Config) -> Result<Self, OtlpError> {
config.validate().map_err(OtlpError::from)?;
std::fs::create_dir_all(config.output_dir.join("otlp/traces")).map_err(|e| {
OtlpError::Io(std::io::Error::other(format!(
"Failed to create traces directory: {}",
e
)))
})?;
std::fs::create_dir_all(config.output_dir.join("otlp/metrics")).map_err(|e| {
OtlpError::Io(std::io::Error::other(format!(
"Failed to create metrics directory: {}",
e
)))
})?;
let file_exporter = Arc::new(
OtlpFileExporter::new(&config)
.map_err(|e| OtlpError::Io(std::io::Error::other(e.to_string())))?,
);
let batch_buffer = Arc::new(BatchBuffer::new(
config.write_interval_secs,
config.max_trace_buffer_size,
config.max_metric_buffer_size,
));
let write_handle = Arc::new(Mutex::new(None));
let file_exporter_clone = file_exporter.clone();
let batch_buffer_clone = batch_buffer.clone();
let write_interval = Duration::from_secs(config.write_interval_secs);
let handle = tokio::spawn(async move {
let mut interval_timer = interval(write_interval);
loop {
interval_timer.tick().await;
if batch_buffer_clone.should_write().await {
let traces = batch_buffer_clone.take_traces().await;
if !traces.is_empty()
&& let Err(e) = file_exporter_clone.export_traces(traces).await
{
warn!("Failed to export traces: {}", e);
}
let metrics_protobuf = batch_buffer_clone.take_metrics().await;
if !metrics_protobuf.is_empty() {
warn!(
"Batch buffer contains protobuf metrics - this should not happen with new API"
);
}
batch_buffer_clone.update_last_write().await;
}
}
});
{
let mut handle_guard = write_handle.lock().await;
*handle_guard = Some(handle);
}
let file_exporter_traces_cleanup = file_exporter.clone();
let trace_cleanup_interval = Duration::from_secs(config.trace_cleanup_interval_secs);
let trace_cleanup_handle = tokio::spawn(async move {
let mut interval_timer = interval(trace_cleanup_interval);
loop {
interval_timer.tick().await;
if let Err(e) = file_exporter_traces_cleanup
.cleanup_traces(config.trace_cleanup_interval_secs)
.await
{
warn!("Failed to cleanup trace files: {}", e);
}
}
});
let file_exporter_metrics_cleanup = file_exporter.clone();
let metric_cleanup_interval = Duration::from_secs(config.metric_cleanup_interval_secs);
let metric_cleanup_handle = tokio::spawn(async move {
let mut interval_timer = interval(metric_cleanup_interval);
loop {
interval_timer.tick().await;
if let Err(e) = file_exporter_metrics_cleanup
.cleanup_metrics(config.metric_cleanup_interval_secs)
.await
{
warn!("Failed to cleanup metric files: {}", e);
}
}
});
let cleanup_handles = Arc::new(Mutex::new(vec![
trace_cleanup_handle,
metric_cleanup_handle,
]));
let dashboard_handle = Arc::new(Mutex::new(None));
if config.dashboard.enabled {
let dashboard_server = DashboardServer::new(
config.dashboard.static_dir.clone(),
config.output_dir.clone(),
config.dashboard.port,
config.dashboard.bind_address.clone(),
);
match dashboard_server.start().await {
Ok(handle) => {
info!(
port = config.dashboard.port,
bind_address = %config.dashboard.bind_address,
static_dir = %config.dashboard.static_dir.display(),
"Dashboard HTTP server started"
);
info!(
"Dashboard available at http://{}:{}",
config.dashboard.bind_address, config.dashboard.port
);
{
let mut handle_guard = dashboard_handle.lock().await;
*handle_guard = Some(handle);
}
}
Err(e) => {
error!(
error = %e,
"Failed to start dashboard HTTP server, continuing without dashboard"
);
}
}
}
info!(
"OTLP library initialized with output directory: {}",
config.output_dir.display()
);
Ok(Self {
config,
file_exporter,
batch_buffer,
write_handle,
cleanup_handles,
dashboard_handle,
})
}
/// Convenience entry point: returns a [`crate::config::ConfigBuilder`] for
/// assembling a `Config` fluently before calling [`OtlpLibrary::new`].
pub fn with_config_builder() -> crate::config::ConfigBuilder {
crate::config::ConfigBuilder::new()
}
/// Queues one finished span for the next periodic batch write.
///
/// The span is only buffered here; the background write task (or an explicit
/// [`flush`](Self::flush)) persists it to disk later.
pub async fn export_trace(
    &self,
    span: opentelemetry_sdk::trace::SpanData,
) -> Result<(), OtlpError> {
    let buffer = &self.batch_buffer;
    buffer.add_trace(span).await
}
/// Queues a batch of finished spans for the next periodic write.
///
/// Like [`export_trace`](Self::export_trace), this only buffers; persistence
/// happens on the write task's schedule or on an explicit flush.
pub async fn export_traces(
    &self,
    spans: Vec<opentelemetry_sdk::trace::SpanData>,
) -> Result<(), OtlpError> {
    let buffer = &self.batch_buffer;
    buffer.add_traces(spans).await
}
/// Writes an OTLP protobuf metrics request straight through the file
/// exporter, bypassing the batch buffer.
pub async fn export_metrics(
    &self,
    request: opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest,
) -> Result<(), OtlpError> {
    let exporter = &self.file_exporter;
    exporter.export_metrics_from_protobuf(&request).await
}
/// Builds an OpenTelemetry span exporter backed by a clone of this library.
pub fn span_exporter(&self) -> crate::otlp::OtlpSpanExporter {
    let library = Arc::new(self.clone());
    crate::otlp::OtlpSpanExporter::new(library)
}
/// Immediately drains the batch buffer to disk and flushes the exporter.
///
/// Pending spans are written first, then any protobuf metric requests still
/// sitting in the buffer, after which the exporter itself is flushed and the
/// buffer's last-write timestamp is refreshed.
///
/// # Errors
///
/// Propagates the first export or flush failure.
pub async fn flush(&self) -> Result<(), OtlpError> {
    let pending_spans = self.batch_buffer.take_traces().await;
    if !pending_spans.is_empty() {
        self.file_exporter.export_traces(pending_spans).await?;
    }
    for request in self.batch_buffer.take_metrics().await {
        self.export_metrics(request).await?;
    }
    self.file_exporter.flush().await?;
    self.batch_buffer.update_last_write().await;
    Ok(())
}
/// Returns a shared handle to the underlying file exporter.
pub fn file_exporter(&self) -> Arc<OtlpFileExporter> {
    Arc::clone(&self.file_exporter)
}
/// Flushes all buffered telemetry, then stops every background task.
///
/// The dashboard server, the periodic write task, and the cleanup tasks are
/// aborted (not joined); buffered data was already persisted by the initial
/// flush, so nothing is lost.
///
/// # Errors
///
/// Propagates any error from the final flush; task teardown is infallible.
pub async fn shutdown(&self) -> Result<(), OtlpError> {
    // Persist anything still buffered before tearing the tasks down.
    self.flush().await?;
    if let Some(dashboard) = self.dashboard_handle.lock().await.take() {
        dashboard.abort();
    }
    if let Some(writer) = self.write_handle.lock().await.take() {
        writer.abort();
    }
    for task in self.cleanup_handles.lock().await.drain(..) {
        task.abort();
    }
    info!("OTLP library shutdown complete");
    Ok(())
}
/// Returns the configuration this library was constructed with.
pub fn config(&self) -> &Config {
&self.config
}
}