use crate::config::OtlpSdkConfig;
use crate::error::ZerobusError;
#[cfg(feature = "observability")]
use std::sync::Arc;
#[cfg(feature = "observability")]
use otlp_arrow_library::{Config as OtlpLibraryConfig, OtlpLibrary};
#[derive(Clone)]
pub struct ObservabilityManager {
#[cfg(feature = "observability")]
library: Option<Arc<OtlpLibrary>>,
#[cfg(not(feature = "observability"))]
_phantom: std::marker::PhantomData<()>,
}
impl ObservabilityManager {
pub async fn new_async(config: Option<OtlpSdkConfig>) -> Option<Self> {
let _config = match config {
Some(c) => c,
None => return None,
};
#[cfg(feature = "observability")]
{
use otlp_arrow_library::ConfigBuilder;
let mut builder = ConfigBuilder::default();
if let Some(output_dir) = &_config.output_dir {
builder = builder.output_dir(output_dir.clone());
}
builder = builder.write_interval_secs(_config.write_interval_secs);
let log_level = _config.log_level.to_lowercase();
std::env::set_var(
"RUST_LOG",
format!("arrow_zerobus_sdk_wrapper={}", log_level),
);
let library_config = builder.build().unwrap_or_else(|_| {
tracing::warn!("Failed to build SDK config, using defaults");
OtlpLibraryConfig::default()
});
match OtlpLibrary::new(library_config).await {
Ok(library) => Some(Self {
library: Some(Arc::new(library)),
}),
Err(e) => {
tracing::warn!("Failed to initialize OtlpLibrary: {}", e);
None
}
}
}
#[cfg(not(feature = "observability"))]
{
None
}
}
pub async fn record_batch_sent(&self, batch_size_bytes: usize, success: bool, latency_ms: u64) {
#[cfg(feature = "observability")]
{
if self.library.is_some() {
tracing::info!(
metric.name = "zerobus.batch.size_bytes",
metric.value = batch_size_bytes,
metric.unit = "bytes",
batch_size_bytes = batch_size_bytes,
success = success,
latency_ms = latency_ms,
"zerobus.batch.metrics"
);
tracing::info!(
metric.name = "zerobus.batch.success",
metric.value = if success { 1i64 } else { 0i64 },
success = success,
"zerobus.batch.metrics"
);
tracing::info!(
metric.name = "zerobus.batch.latency_ms",
metric.value = latency_ms,
metric.unit = "ms",
latency_ms = latency_ms,
"zerobus.batch.metrics"
);
}
}
#[cfg(not(feature = "observability"))]
{
let _ = (batch_size_bytes, success, latency_ms);
}
}
pub fn start_send_batch_span(&self, table_name: &str) -> ObservabilitySpan {
let start_time = std::time::SystemTime::now();
#[cfg(feature = "observability")]
{
ObservabilitySpan {
_table_name: table_name.to_string(),
start_time,
library: self.library.clone(),
}
}
#[cfg(not(feature = "observability"))]
{
let _ = table_name;
ObservabilitySpan {
_table_name: String::new(),
start_time,
}
}
}
pub async fn flush(&self) -> Result<(), ZerobusError> {
#[cfg(feature = "observability")]
{
if let Some(library) = &self.library {
library.flush().await.map_err(|e| {
ZerobusError::ConfigurationError(format!(
"Failed to flush observability data: {}",
e
))
})?;
}
}
Ok(())
}
pub async fn shutdown(&self) -> Result<(), ZerobusError> {
#[cfg(feature = "observability")]
{
if let Some(library) = &self.library {
library.shutdown().await.map_err(|e| {
ZerobusError::ConfigurationError(format!(
"Failed to shutdown observability: {}",
e
))
})?;
}
}
Ok(())
}
}
pub struct ObservabilitySpan {
_table_name: String,
#[allow(dead_code)] start_time: std::time::SystemTime,
#[cfg(feature = "observability")]
library: Option<Arc<OtlpLibrary>>,
}
impl Drop for ObservabilitySpan {
fn drop(&mut self) {
#[cfg(feature = "observability")]
{
if self.library.is_some() {
let end_time = std::time::SystemTime::now();
let duration = end_time
.duration_since(self.start_time)
.unwrap_or_default()
.as_millis() as u64;
tracing::info!(
span.name = "zerobus.send_batch",
span.table_name = %self._table_name,
span.duration_ms = duration,
"zerobus.send_batch.completed"
);
}
}
}
}