use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread::JoinHandle;
use std::time::Duration;
/// Callback interface for observing database lifecycle and failure events.
///
/// Implementations must be `Send + Sync + 'static` because events may be
/// delivered from background threads (the variant names in [`DbEvent`]
/// reference a flush thread, snapshots, and retention work).
pub trait DbEventListener: std::fmt::Debug + Send + Sync + 'static {
    /// Receives a single event. Implementations should return quickly, as the
    /// call runs on the emitting thread.
    fn on_event(&self, event: DbEvent);
}
/// Events emitted by the database engine to a registered [`DbEventListener`].
///
/// Failure variants carry the underlying error pre-rendered as a `String`,
/// which keeps the enum `Clone` and avoids exposing internal error types.
#[derive(Debug, Clone)]
pub enum DbEvent {
    /// The background flush thread started.
    FlushThreadStarted,
    /// The background flush thread is shutting down.
    FlushThreadStopping,
    /// The background flush thread terminated via a panic.
    FlushThreadPanicked,
    /// Rotating the write-ahead log failed.
    WalRotateFailed { error: String },
    /// Checkpointing the write-ahead log failed.
    WalCheckpointFailed { error: String },
    /// Ingesting a segment failed.
    SegmentIngestFailed { error: String },
    /// Compacting segments failed.
    SegmentCompactionFailed { error: String },
    /// A snapshot was created at `path`.
    /// NOTE(review): `timestamp` units/epoch are not visible here — confirm
    /// with the emitting code.
    SnapshotCreated { path: PathBuf, timestamp: u64 },
    /// Snapshot creation failed.
    SnapshotFailed { error: String },
    /// The retention watermark advanced to `delete_before`.
    RetentionAdvanced { delete_before: u64 },
    /// Advancing the retention watermark to `delete_before` failed.
    RetentionAdvanceFailed { delete_before: u64, error: String },
}
/// A [`DbEventListener`] that discards every event.
#[derive(Debug)]
pub struct NoopEventListener;

impl DbEventListener for NoopEventListener {
    // No-op: drop the event without any side effects.
    #[inline]
    fn on_event(&self, _event: DbEvent) {}
}
/// Returns a shared listener that silently ignores all events; useful as a
/// default when the caller does not care about engine notifications.
pub fn noop_event_listener() -> Arc<dyn DbEventListener> {
    let listener: Arc<dyn DbEventListener> = Arc::new(NoopEventListener);
    listener
}
pub mod db_metrics {
use super::*;
use ::metrics::{Unit, describe_counter, describe_gauge, describe_histogram};
#[cfg(feature = "prometheus")]
use metrics_exporter_prometheus::{BuildError, PrometheusBuilder, PrometheusHandle};
// Metric name constants. Each name is registered with a unit and help text in
// `describe_all` below; the kind noted on each constant matches that
// registration (describe_counter / describe_histogram / describe_gauge).

/// Counter: total points ingested via `DbCore::insert`.
pub const INGEST_POINTS: &str = "ugnos_ingest_points";
/// Counter: total points flushed from the write buffer.
pub const FLUSH_POINTS: &str = "ugnos_flush_points";
/// Histogram: flush processing time in the background flush thread.
pub const FLUSH_DURATION_SECONDS: &str = "ugnos_flush_duration_seconds";
/// Histogram: end-to-end flush latency (enqueue -> ack).
pub const FLUSH_E2E_DURATION_SECONDS: &str = "ugnos_flush_end_to_end_duration_seconds";
/// Counter: logical bytes written to the WAL.
pub const WAL_BYTES_WRITTEN: &str = "ugnos_wal_bytes_written";
/// Histogram: duration of WAL fsync/sync_data calls.
pub const WAL_FSYNC_DURATION_SECONDS: &str = "ugnos_wal_fsync_duration_seconds";
/// Histogram: time to create and durably install a snapshot.
pub const SNAPSHOT_DURATION_SECONDS: &str = "ugnos_snapshot_duration_seconds";
/// Gauge: size of the most recently created snapshot file.
pub const SNAPSHOT_SIZE_BYTES: &str = "ugnos_snapshot_size_bytes";
/// Counter: inserts rejected by the series cardinality limit (labelled by scope).
pub const CARDINALITY_LIMIT_REJECTIONS: &str = "ugnos_cardinality_limit_rejections";
/// Gauge: current number of distinct series per scope.
pub const SERIES_CARDINALITY: &str = "ugnos_series_cardinality";
/// Counter: series blocks skipped thanks to segment-level tag postings.
pub const TAG_POSTINGS_SEGMENT_SKIPS: &str = "ugnos_tag_postings_segment_skips";
/// Counter: gRPC requests, labelled by method and grpc_code.
pub const GRPC_REQUESTS: &str = "ugnos_grpc_requests";
/// Histogram: gRPC request processing duration, labelled by method.
pub const GRPC_REQUEST_DURATION_SECONDS: &str = "ugnos_grpc_request_duration_seconds";
/// Counter: rejected remote-write requests, labelled by reason.
pub const REMOTE_WRITE_REJECTIONS: &str = "ugnos_remote_write_rejections";
/// An in-process Prometheus recorder plus the background thread that runs its
/// periodic upkeep (`PrometheusHandle::run_upkeep`).
///
/// Dropping this value stops and joins the upkeep thread (see the `Drop` impl).
#[cfg(feature = "prometheus")]
#[derive(Debug)]
pub struct InProcessPrometheus {
    // Handle used both for rendering and for periodic upkeep.
    handle: PrometheusHandle,
    // Shared stop flag observed by the upkeep thread.
    stop: Arc<AtomicBool>,
    // Join handle for the upkeep thread; `Option` so `Drop` can `take` it.
    upkeep_thread: Mutex<Option<JoinHandle<()>>>,
}
#[cfg(feature = "prometheus")]
impl InProcessPrometheus {
    /// Installs a global Prometheus metrics recorder and starts a background
    /// thread that runs `PrometheusHandle::run_upkeep` every `upkeep_interval`.
    ///
    /// # Errors
    ///
    /// Returns [`MetricsInitError::AlreadyInstalled`] if a global recorder is
    /// already set, [`MetricsInitError::Install`] for other recorder build
    /// failures, and [`MetricsInitError::ThreadSpawn`] if the upkeep thread
    /// cannot be started.
    pub fn install(upkeep_interval: Duration) -> Result<Self, MetricsInitError> {
        let builder = PrometheusBuilder::new();
        let handle = builder
            .install_recorder()
            .map_err(MetricsInitError::from_build_error)?;
        // BUGFIX: register metric descriptions only AFTER the recorder is
        // installed. The `describe_*!` macros forward to the global recorder
        // and are silent no-ops when none is set, so describing first would
        // drop all unit/help metadata.
        describe_all();
        let stop = Arc::new(AtomicBool::new(false));
        let stop_clone = stop.clone();
        let handle_clone = handle.clone();
        let upkeep_thread = std::thread::Builder::new()
            .name("ugnos-metrics-upkeep".to_string())
            .spawn(move || {
                // Sleep in short slices so a stop request (from `Drop`) is
                // noticed promptly instead of only after a full
                // `upkeep_interval` has elapsed.
                const POLL_SLICE: Duration = Duration::from_millis(100);
                'run: while !stop_clone.load(Ordering::Relaxed) {
                    let mut remaining = upkeep_interval;
                    while remaining > Duration::ZERO {
                        let step = remaining.min(POLL_SLICE);
                        std::thread::sleep(step);
                        remaining = remaining.saturating_sub(step);
                        if stop_clone.load(Ordering::Relaxed) {
                            break 'run;
                        }
                    }
                    handle_clone.run_upkeep();
                }
            })
            .map_err(|e| MetricsInitError::ThreadSpawn(e.to_string()))?;
        Ok(Self {
            handle,
            stop,
            upkeep_thread: Mutex::new(Some(upkeep_thread)),
        })
    }

    /// Renders the current metrics in the Prometheus text exposition format.
    pub fn render(&self) -> String {
        self.handle.render()
    }
}
#[cfg(feature = "prometheus")]
impl Drop for InProcessPrometheus {
    /// Signals the upkeep thread to stop, then waits for it to exit.
    fn drop(&mut self) {
        // Raise the flag first so the worker observes it on its next check.
        self.stop.store(true, Ordering::Relaxed);
        // A poisoned lock means the upkeep thread already died; in that case
        // there is nothing left to join.
        let worker = self
            .upkeep_thread
            .lock()
            .ok()
            .and_then(|mut slot| slot.take());
        if let Some(worker) = worker {
            let _ = worker.join();
        }
    }
}
/// Errors that can occur while installing the in-process Prometheus recorder.
#[cfg(feature = "prometheus")]
#[derive(Debug, thiserror::Error)]
pub enum MetricsInitError {
    /// A global metrics recorder was already installed for this process.
    #[error("metrics recorder already installed")]
    AlreadyInstalled,
    /// The Prometheus recorder failed to build/install for another reason.
    #[error("failed to install prometheus recorder: {0}")]
    Install(String),
    /// The background upkeep thread could not be spawned.
    #[error("failed to spawn upkeep thread: {0}")]
    ThreadSpawn(String),
}
#[cfg(feature = "prometheus")]
impl MetricsInitError {
    /// Maps a `PrometheusBuilder` build error onto this error type, singling
    /// out the "global recorder already installed" case.
    fn from_build_error(e: BuildError) -> Self {
        if matches!(e, BuildError::FailedToSetGlobalRecorder(_)) {
            Self::AlreadyInstalled
        } else {
            Self::Install(e.to_string())
        }
    }
}
/// Adds `points` to the ingest counter; zero-point inserts are skipped so the
/// counter is only touched when real work happened.
#[inline]
pub fn record_ingest_points(points: u64) {
    if points == 0 {
        return;
    }
    ::metrics::counter!(INGEST_POINTS).increment(points);
}
/// Records one flush: its processing duration always, and the flushed point
/// count only when non-zero.
#[inline]
pub fn record_flush(duration: Duration, points: u64) {
    let secs = duration.as_secs_f64();
    ::metrics::histogram!(FLUSH_DURATION_SECONDS).record(secs);
    if points == 0 {
        return;
    }
    ::metrics::counter!(FLUSH_POINTS).increment(points);
}
/// Records the end-to-end flush latency observed by the caller.
#[inline]
pub fn record_flush_end_to_end(duration: Duration) {
    let secs = duration.as_secs_f64();
    ::metrics::histogram!(FLUSH_E2E_DURATION_SECONDS).record(secs);
}
/// Adds `bytes` to the WAL bytes-written counter; zero-byte writes are skipped.
#[inline]
pub fn record_wal_bytes_written(bytes: u64) {
    if bytes == 0 {
        return;
    }
    ::metrics::counter!(WAL_BYTES_WRITTEN).increment(bytes);
}
/// Records the duration of one WAL fsync/sync_data call.
#[inline]
pub fn record_wal_fsync(duration: Duration) {
    let secs = duration.as_secs_f64();
    ::metrics::histogram!(WAL_FSYNC_DURATION_SECONDS).record(secs);
}
/// Records one snapshot: creation duration into the histogram and the file
/// size into the "latest snapshot size" gauge.
#[inline]
pub fn record_snapshot(duration: Duration, size_bytes: u64) {
    let secs = duration.as_secs_f64();
    ::metrics::histogram!(SNAPSHOT_DURATION_SECONDS).record(secs);
    // Gauge API is f64-only; precision loss only matters above 2^53 bytes.
    let size = size_bytes as f64;
    ::metrics::gauge!(SNAPSHOT_SIZE_BYTES).set(size);
}
/// Counts one insert rejected by the series cardinality limit, labelled by
/// `scope`.
#[inline]
pub fn record_cardinality_limit_rejected(scope: &str) {
    let scope_label = scope.to_owned();
    ::metrics::counter!(CARDINALITY_LIMIT_REJECTIONS, "scope" => scope_label).increment(1);
}
/// Sets the distinct-series gauge for `scope` to `count`.
#[inline]
pub fn record_series_cardinality(scope: &str, count: u64) {
    let scope_label = scope.to_owned();
    // Gauge API is f64-only; precision loss only matters above 2^53 series.
    let value = count as f64;
    ::metrics::gauge!(SERIES_CARDINALITY, "scope" => scope_label).set(value);
}
/// Counts one series block skipped thanks to segment-level tag postings.
#[inline]
pub fn record_tag_postings_segment_skip() {
    let skips = ::metrics::counter!(TAG_POSTINGS_SEGMENT_SKIPS);
    skips.increment(1);
}
/// Counts one rejected remote-write request, labelled by `reason`.
#[inline]
pub fn record_remote_write_rejected(reason: &str) {
    let reason_label = reason.to_owned();
    ::metrics::counter!(REMOTE_WRITE_REJECTIONS, "reason" => reason_label).increment(1);
}
/// Records one completed gRPC request: bumps the request counter (labelled by
/// method and grpc_code) and records its duration (labelled by method).
#[inline]
pub fn record_grpc_request(method: &str, duration: Duration, grpc_code: &str) {
    let secs = duration.as_secs_f64();
    ::metrics::counter!(
        GRPC_REQUESTS,
        "method" => method.to_owned(),
        "grpc_code" => grpc_code.to_owned()
    )
    .increment(1);
    ::metrics::histogram!(
        GRPC_REQUEST_DURATION_SECONDS,
        "method" => method.to_owned()
    )
    .record(secs);
}
/// Registers units and help text for every metric name declared above.
///
/// Registrations are grouped by metric kind for readability; ordering is
/// irrelevant because descriptions are keyed by metric name.
fn describe_all() {
    // --- Counters ---
    describe_counter!(
        INGEST_POINTS,
        Unit::Count,
        "Total number of points ingested via DbCore::insert."
    );
    describe_counter!(
        FLUSH_POINTS,
        Unit::Count,
        "Total number of points flushed from the write buffer."
    );
    describe_counter!(
        WAL_BYTES_WRITTEN,
        Unit::Bytes,
        "Total number of bytes written to the WAL (logical bytes, not including filesystem metadata)."
    );
    describe_counter!(
        CARDINALITY_LIMIT_REJECTIONS,
        Unit::Count,
        "Number of inserts rejected due to series cardinality limit."
    );
    describe_counter!(
        TAG_POSTINGS_SEGMENT_SKIPS,
        Unit::Count,
        "Number of times segment-level postings avoided reading a non-matching series block."
    );
    describe_counter!(
        GRPC_REQUESTS,
        Unit::Count,
        "Total gRPC requests processed by the service, labelled by method and grpc_code."
    );
    describe_counter!(
        REMOTE_WRITE_REJECTIONS,
        Unit::Count,
        "Remote write (POST /api/v1/write) requests rejected, labelled by reason (invalid_payload, cardinality_limit)."
    );
    // --- Histograms ---
    describe_histogram!(
        FLUSH_DURATION_SECONDS,
        Unit::Seconds,
        "Flush processing time in the background flush thread."
    );
    describe_histogram!(
        FLUSH_E2E_DURATION_SECONDS,
        Unit::Seconds,
        "End-to-end latency observed by DbCore::flush (enqueue->ack)."
    );
    describe_histogram!(
        WAL_FSYNC_DURATION_SECONDS,
        Unit::Seconds,
        "Duration of WAL fsync/sync_data calls."
    );
    describe_histogram!(
        SNAPSHOT_DURATION_SECONDS,
        Unit::Seconds,
        "Time to create and durably install a snapshot."
    );
    describe_histogram!(
        GRPC_REQUEST_DURATION_SECONDS,
        Unit::Seconds,
        "gRPC request processing duration by method."
    );
    // --- Gauges ---
    describe_gauge!(
        SNAPSHOT_SIZE_BYTES,
        Unit::Bytes,
        "Size of the most recently created snapshot file."
    );
    describe_gauge!(
        SERIES_CARDINALITY,
        Unit::Count,
        "Current number of distinct series (series key = series name + tag set) per scope."
    );
}
}