use alloc::string::ToString;
use alloc::vec::Vec;
use core::sync::atomic::{AtomicU64, Ordering};
use lazy_static::lazy_static;
use spin::Mutex;
use super::{MetricId, Telemetry, TelemetryError};
use crate::storage::zpl::ZPL;
// Lock-free statistics updated on hot paths and harvested by
// `collect_all_metrics`. All accesses use `Ordering::Relaxed`: each counter
// is an independent statistic with no cross-counter ordering requirement.
//
// Counters drained (swap-to-zero) on collection:

// VFS operation counts and byte totals.
pub static READ_OPS: AtomicU64 = AtomicU64::new(0);
pub static WRITE_OPS: AtomicU64 = AtomicU64::new(0);
pub static BYTES_READ: AtomicU64 = AtomicU64::new(0);
pub static BYTES_WRITTEN: AtomicU64 = AtomicU64::new(0);
pub static FSYNC_OPS: AtomicU64 = AtomicU64::new(0);
pub static LOOKUP_OPS: AtomicU64 = AtomicU64::new(0);
pub static CREATE_OPS: AtomicU64 = AtomicU64::new(0);
pub static DELETE_OPS: AtomicU64 = AtomicU64::new(0);
// Gauge (read with `load`, never reset): currently open handles.
pub static OPEN_HANDLES: AtomicU64 = AtomicU64::new(0);
// ARC / L2ARC cache counters (drained on collection).
pub static ARC_HITS: AtomicU64 = AtomicU64::new(0);
pub static ARC_MISSES: AtomicU64 = AtomicU64::new(0);
pub static ARC_EVICTIONS: AtomicU64 = AtomicU64::new(0);
pub static L2ARC_HITS: AtomicU64 = AtomicU64::new(0);
pub static L2ARC_MISSES: AtomicU64 = AtomicU64::new(0);
// Transaction-group state: CURRENT_TXG is a monotonically increasing gauge
// (starts at 1, bumped by `record_txg_sync`); TXG_SYNCS is drained.
pub static CURRENT_TXG: AtomicU64 = AtomicU64::new(1);
pub static TXG_SYNCS: AtomicU64 = AtomicU64::new(0);
// Cumulative sync latency in microseconds (accumulated, not exported here).
pub static TXG_SYNC_TIME_US: AtomicU64 = AtomicU64::new(0);
// Dedup counters: hits/misses drained; DEDUP_ENTRIES is a gauge set via
// `update_dedup_entries`.
pub static DEDUP_HITS: AtomicU64 = AtomicU64::new(0);
pub static DEDUP_MISSES: AtomicU64 = AtomicU64::new(0);
pub static DEDUP_ENTRIES: AtomicU64 = AtomicU64::new(0);
/// Handles for every metric registered by [`init_lcpfs_metrics`].
///
/// Every field is a `MetricId` — an opaque, cheaply clonable handle — so the
/// whole struct is cloned out of the `LCPFS_METRICS` lock by the collection
/// path. All metrics carry a single `pool` label except `io_errors`, which is
/// additionally labelled by error `type`.
#[derive(Debug, Clone)]
pub struct LcpfsMetrics {
    // Storage capacity gauges.
    pub used_bytes: MetricId,
    pub quota_bytes: MetricId,
    pub available_bytes: MetricId,
    // Object population gauges.
    pub file_count: MetricId,
    pub dir_count: MetricId,
    // VFS operation counters.
    pub read_ops: MetricId,
    pub write_ops: MetricId,
    pub read_bytes: MetricId,
    pub write_bytes: MetricId,
    pub fsync_ops: MetricId,
    pub lookup_ops: MetricId,
    pub create_ops: MetricId,
    pub delete_ops: MetricId,
    // Open-handle gauge.
    pub open_handles: MetricId,
    // ARC / L2ARC cache metrics.
    pub arc_hits: MetricId,
    pub arc_misses: MetricId,
    pub arc_hit_ratio: MetricId,
    pub arc_size_bytes: MetricId,
    pub arc_evictions: MetricId,
    pub l2arc_hits: MetricId,
    pub l2arc_misses: MetricId,
    // Transaction-group metrics (latency is a histogram).
    pub current_txg: MetricId,
    pub txg_syncs: MetricId,
    pub txg_sync_latency: MetricId,
    // Deduplication metrics.
    pub dedup_ratio: MetricId,
    pub dedup_hits: MetricId,
    pub dedup_misses: MetricId,
    pub dedup_entries: MetricId,
    // Error counters; `io_errors` is labelled ["pool", "type"].
    pub io_errors: MetricId,
    pub checksum_errors: MetricId,
}
// Global slot holding the registered metric handles. `None` until
// `init_lcpfs_metrics` has completed successfully; guarded by a spin mutex
// because this module is no_std.
lazy_static! {
    static ref LCPFS_METRICS: Mutex<Option<LcpfsMetrics>> = Mutex::new(None);
}
/// Registers all LCPFS metrics with the telemetry subsystem and stores the
/// resulting handles in `LCPFS_METRICS`.
///
/// Idempotent: returns `Ok(())` immediately when metrics are already
/// registered. The registration order is significant only in that it is kept
/// identical to the field order of [`LcpfsMetrics`].
///
/// # Errors
/// Propagates the first `TelemetryError` from registration; in that case
/// `LCPFS_METRICS` remains unset, so a later call may retry (any metrics
/// registered before the failure stay registered with the backend).
pub fn init_lcpfs_metrics() -> Result<(), TelemetryError> {
    let mut metrics_guard = LCPFS_METRICS.lock();
    if metrics_guard.is_some() {
        // Already initialized; registration is one-shot.
        return Ok(());
    }
    // Every metric except `io_errors` (labelled ["pool", "type"]) and the
    // latency histogram uses the single "pool" label; these helpers remove
    // the boilerplate of repeating the label set at each call site.
    let gauge = |name, help| Telemetry::register_gauge(name, help, &["pool"]);
    let counter = |name, help| Telemetry::register_counter(name, help, &["pool"]);
    // The struct literal evaluates fields top-to-bottom, preserving the
    // original registration order.
    *metrics_guard = Some(LcpfsMetrics {
        used_bytes: gauge("lcpfs_storage_used_bytes", "Used storage space in bytes")?,
        quota_bytes: gauge(
            "lcpfs_storage_quota_bytes",
            "Storage quota in bytes (0 = unlimited)",
        )?,
        available_bytes: gauge(
            "lcpfs_storage_available_bytes",
            "Available storage space in bytes",
        )?,
        file_count: gauge("lcpfs_objects_files_total", "Total number of files")?,
        dir_count: gauge("lcpfs_objects_directories_total", "Total number of directories")?,
        read_ops: counter("lcpfs_operations_read_total", "Total read operations")?,
        write_ops: counter("lcpfs_operations_write_total", "Total write operations")?,
        read_bytes: counter("lcpfs_io_read_bytes_total", "Total bytes read")?,
        write_bytes: counter("lcpfs_io_write_bytes_total", "Total bytes written")?,
        fsync_ops: counter("lcpfs_operations_fsync_total", "Total fsync operations")?,
        lookup_ops: counter("lcpfs_operations_lookup_total", "Total lookup operations")?,
        create_ops: counter("lcpfs_operations_create_total", "Total create operations")?,
        delete_ops: counter("lcpfs_operations_delete_total", "Total delete operations")?,
        open_handles: gauge("lcpfs_handles_open", "Currently open file handles")?,
        arc_hits: counter("lcpfs_arc_hits_total", "ARC cache hits")?,
        arc_misses: counter("lcpfs_arc_misses_total", "ARC cache misses")?,
        arc_hit_ratio: gauge("lcpfs_arc_hit_ratio", "ARC cache hit ratio (0.0-1.0)")?,
        arc_size_bytes: gauge("lcpfs_arc_size_bytes", "ARC cache size in bytes")?,
        arc_evictions: counter("lcpfs_arc_evictions_total", "ARC cache evictions")?,
        l2arc_hits: counter("lcpfs_l2arc_hits_total", "L2ARC cache hits")?,
        l2arc_misses: counter("lcpfs_l2arc_misses_total", "L2ARC cache misses")?,
        current_txg: gauge("lcpfs_txg_current", "Current transaction group number")?,
        txg_syncs: counter("lcpfs_txg_syncs_total", "Total TXG sync operations")?,
        txg_sync_latency: Telemetry::register_histogram(
            "lcpfs_txg_sync_latency_seconds",
            "TXG sync latency in seconds",
            &["pool"],
            &[
                0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0,
            ],
        )?,
        dedup_ratio: gauge(
            "lcpfs_dedup_ratio",
            "Deduplication ratio (1.0 = no dedup, >1.0 = space savings)",
        )?,
        dedup_hits: counter(
            "lcpfs_dedup_hits_total",
            "Dedup table hits (duplicate blocks found)",
        )?,
        dedup_misses: counter(
            "lcpfs_dedup_misses_total",
            "Dedup table misses (new unique blocks)",
        )?,
        dedup_entries: gauge("lcpfs_dedup_entries", "Dedup table entry count")?,
        io_errors: Telemetry::register_counter(
            "lcpfs_errors_io_total",
            "Total I/O errors",
            &["pool", "type"],
        )?,
        checksum_errors: counter("lcpfs_errors_checksum_total", "Total checksum errors")?,
    });
    Ok(())
}
/// Harvests all LCPFS statistics into the telemetry backend for `pool`.
///
/// Drained counters (`swap(0)`) report the delta accumulated since the last
/// collection; increments racing with the swap are simply picked up by the
/// next collection, so no events are lost. Gauges (`load`) report the current
/// value. No-op until `init_lcpfs_metrics` has run.
pub fn collect_all_metrics(pool: &str) {
    // Clone the handle struct so the LCPFS_METRICS lock is released before
    // any telemetry calls are made.
    let metrics = match LCPFS_METRICS.lock().as_ref() {
        Some(m) => m.clone(),
        None => return,
    };
    let labels = &[pool];
    // Storage gauges come straight from the ZPL layer; keep the ZPL lock
    // scoped to just these reads.
    {
        let zpl = ZPL.lock();
        Telemetry::gauge_set(metrics.used_bytes, labels, zpl.get_used_bytes() as f64);
        Telemetry::gauge_set(metrics.quota_bytes, labels, zpl.get_quota() as f64);
        // Quota 0 means "unlimited"; exported as u64::MAX in that case.
        let available = if zpl.get_quota() > 0 {
            zpl.get_quota().saturating_sub(zpl.get_used_bytes())
        } else {
            u64::MAX
        };
        Telemetry::gauge_set(metrics.available_bytes, labels, available as f64);
    }
    // Drain the VFS operation counters.
    Telemetry::counter_add(
        metrics.read_ops,
        labels,
        READ_OPS.swap(0, Ordering::Relaxed) as f64,
    );
    Telemetry::counter_add(
        metrics.write_ops,
        labels,
        WRITE_OPS.swap(0, Ordering::Relaxed) as f64,
    );
    Telemetry::counter_add(
        metrics.read_bytes,
        labels,
        BYTES_READ.swap(0, Ordering::Relaxed) as f64,
    );
    Telemetry::counter_add(
        metrics.write_bytes,
        labels,
        BYTES_WRITTEN.swap(0, Ordering::Relaxed) as f64,
    );
    Telemetry::counter_add(
        metrics.fsync_ops,
        labels,
        FSYNC_OPS.swap(0, Ordering::Relaxed) as f64,
    );
    Telemetry::counter_add(
        metrics.lookup_ops,
        labels,
        LOOKUP_OPS.swap(0, Ordering::Relaxed) as f64,
    );
    Telemetry::counter_add(
        metrics.create_ops,
        labels,
        CREATE_OPS.swap(0, Ordering::Relaxed) as f64,
    );
    Telemetry::counter_add(
        metrics.delete_ops,
        labels,
        DELETE_OPS.swap(0, Ordering::Relaxed) as f64,
    );
    // Open handles is a gauge: read the current value, do not reset it.
    Telemetry::gauge_set(
        metrics.open_handles,
        labels,
        OPEN_HANDLES.load(Ordering::Relaxed) as f64,
    );
    // ARC counters are drained into locals first so the hit ratio below is
    // computed from exactly the values reported this cycle.
    let arc_hits_val = ARC_HITS.swap(0, Ordering::Relaxed);
    let arc_misses_val = ARC_MISSES.swap(0, Ordering::Relaxed);
    Telemetry::counter_add(metrics.arc_hits, labels, arc_hits_val as f64);
    Telemetry::counter_add(metrics.arc_misses, labels, arc_misses_val as f64);
    Telemetry::counter_add(
        metrics.arc_evictions,
        labels,
        ARC_EVICTIONS.swap(0, Ordering::Relaxed) as f64,
    );
    // Only update the hit-ratio gauge when there was traffic this interval;
    // otherwise it retains its previous value.
    let total_accesses = arc_hits_val + arc_misses_val;
    if total_accesses > 0 {
        let hit_ratio = arc_hits_val as f64 / total_accesses as f64;
        Telemetry::gauge_set(metrics.arc_hit_ratio, labels, hit_ratio);
    }
    Telemetry::counter_add(
        metrics.l2arc_hits,
        labels,
        L2ARC_HITS.swap(0, Ordering::Relaxed) as f64,
    );
    Telemetry::counter_add(
        metrics.l2arc_misses,
        labels,
        L2ARC_MISSES.swap(0, Ordering::Relaxed) as f64,
    );
    // ARC size is best-effort; skip silently if the snapshot fails.
    if let Ok(arc_stats) = get_arc_stats() {
        Telemetry::gauge_set(metrics.arc_size_bytes, labels, arc_stats.size_bytes as f64);
    }
    Telemetry::gauge_set(
        metrics.current_txg,
        labels,
        CURRENT_TXG.load(Ordering::Relaxed) as f64,
    );
    Telemetry::counter_add(
        metrics.txg_syncs,
        labels,
        TXG_SYNCS.swap(0, Ordering::Relaxed) as f64,
    );
    let dedup_hits_val = DEDUP_HITS.swap(0, Ordering::Relaxed);
    let dedup_misses_val = DEDUP_MISSES.swap(0, Ordering::Relaxed);
    Telemetry::counter_add(metrics.dedup_hits, labels, dedup_hits_val as f64);
    Telemetry::counter_add(metrics.dedup_misses, labels, dedup_misses_val as f64);
    Telemetry::gauge_set(
        metrics.dedup_entries,
        labels,
        DEDUP_ENTRIES.load(Ordering::Relaxed) as f64,
    );
    // Dedup ratio = blocks written / unique blocks stored, computed from this
    // interval's deltas. Skipped (gauge retains last value) when no unique
    // blocks were seen, which also avoids division by zero.
    if dedup_misses_val > 0 {
        let dedup_ratio = (dedup_hits_val + dedup_misses_val) as f64 / dedup_misses_val as f64;
        Telemetry::gauge_set(metrics.dedup_ratio, labels, dedup_ratio);
    }
}
/// Accounts one completed read of `bytes` payload bytes.
///
/// Both statistics are independent relaxed atomics, drained by
/// `collect_all_metrics`; safe to call from any context.
#[inline]
pub fn record_read(bytes: u64) {
    BYTES_READ.fetch_add(bytes, Ordering::Relaxed);
    READ_OPS.fetch_add(1, Ordering::Relaxed);
}
/// Accounts one completed write of `bytes` payload bytes.
///
/// Both statistics are independent relaxed atomics, drained by
/// `collect_all_metrics`; safe to call from any context.
#[inline]
pub fn record_write(bytes: u64) {
    BYTES_WRITTEN.fetch_add(bytes, Ordering::Relaxed);
    WRITE_OPS.fetch_add(1, Ordering::Relaxed);
}
/// Accounts one fsync operation (drained on the next collection).
#[inline]
pub fn record_fsync() {
    FSYNC_OPS.fetch_add(1, Ordering::Relaxed);
}
/// Accounts one directory/name lookup operation.
#[inline]
pub fn record_lookup() {
    LOOKUP_OPS.fetch_add(1, Ordering::Relaxed);
}
/// Accounts one create operation.
#[inline]
pub fn record_create() {
    CREATE_OPS.fetch_add(1, Ordering::Relaxed);
}
/// Accounts one delete operation.
#[inline]
pub fn record_delete() {
    DELETE_OPS.fetch_add(1, Ordering::Relaxed);
}
/// Increments the open-handle gauge; pair with `record_handle_close`.
#[inline]
pub fn record_handle_open() {
    OPEN_HANDLES.fetch_add(1, Ordering::Relaxed);
}
/// Decrements the open-handle gauge; pair with `record_handle_open`.
///
/// Uses a saturating decrement: an unbalanced close (a close without a
/// matching open) previously wrapped the unsigned gauge to `u64::MAX`,
/// making the exported handle count nonsensical. Now it clamps at zero.
#[inline]
pub fn record_handle_close() {
    // The closure always returns Some, so fetch_update cannot fail; the
    // result is discarded deliberately.
    let _ = OPEN_HANDLES.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
        Some(v.saturating_sub(1))
    });
}
/// Accounts one ARC cache hit.
#[inline]
pub fn record_arc_hit() {
    ARC_HITS.fetch_add(1, Ordering::Relaxed);
}
/// Accounts one ARC cache miss.
#[inline]
pub fn record_arc_miss() {
    ARC_MISSES.fetch_add(1, Ordering::Relaxed);
}
/// Accounts one ARC cache eviction.
#[inline]
pub fn record_arc_eviction() {
    ARC_EVICTIONS.fetch_add(1, Ordering::Relaxed);
}
/// Accounts one L2ARC cache hit.
#[inline]
pub fn record_l2arc_hit() {
    L2ARC_HITS.fetch_add(1, Ordering::Relaxed);
}
/// Accounts one L2ARC cache miss.
#[inline]
pub fn record_l2arc_miss() {
    L2ARC_MISSES.fetch_add(1, Ordering::Relaxed);
}
/// Accounts one completed transaction-group sync taking `latency_us`
/// microseconds, advances the current TXG number, and feeds the latency
/// histogram (converted to seconds) if metrics are initialized.
#[inline]
pub fn record_txg_sync(latency_us: u64) {
    TXG_SYNCS.fetch_add(1, Ordering::Relaxed);
    TXG_SYNC_TIME_US.fetch_add(latency_us, Ordering::Relaxed);
    CURRENT_TXG.fetch_add(1, Ordering::Relaxed);
    // The metrics lock is held only for the single histogram observation.
    if let Some(metrics) = LCPFS_METRICS.lock().as_ref() {
        let latency_secs = latency_us as f64 / 1_000_000.0;
        // NOTE(review): the pool label is hard-coded to "default" here, while
        // collect_all_metrics labels every other series by the caller's pool
        // name — confirm whether multi-pool setups need the real pool here.
        Telemetry::histogram_observe(metrics.txg_sync_latency, &["default"], latency_secs);
    }
}
/// Accounts one dedup-table hit (a duplicate block was found).
#[inline]
pub fn record_dedup_hit() {
    DEDUP_HITS.fetch_add(1, Ordering::Relaxed);
}
/// Accounts one dedup-table miss (a new unique block was stored).
#[inline]
pub fn record_dedup_miss() {
    DEDUP_MISSES.fetch_add(1, Ordering::Relaxed);
}
/// Overwrites the dedup-table entry-count gauge with `count`.
#[inline]
pub fn update_dedup_entries(count: u64) {
    DEDUP_ENTRIES.store(count, Ordering::Relaxed);
}
/// Increments the I/O error counter labelled by `pool` and `error_type`.
/// No-op until `init_lcpfs_metrics` has run.
pub fn record_io_error(pool: &str, error_type: &str) {
    if let Some(metrics) = LCPFS_METRICS.lock().as_ref() {
        Telemetry::counter_inc(metrics.io_errors, &[pool, error_type]);
    }
}
/// Increments the checksum error counter for `pool`.
/// No-op until `init_lcpfs_metrics` has run.
pub fn record_checksum_error(pool: &str) {
    if let Some(metrics) = LCPFS_METRICS.lock().as_ref() {
        Telemetry::counter_inc(metrics.checksum_errors, &[pool]);
    }
}
/// Point-in-time snapshot of ARC cache sizing, as produced by
/// `get_arc_stats`. All values are in bytes.
#[derive(Debug, Clone, Default)]
pub struct ArcStats {
    // Bytes currently held by the ARC.
    pub size_bytes: u64,
    // Derived target size (see get_arc_stats for how it is computed).
    pub target_bytes: u64,
    // Derived lower bound on the cache size.
    pub min_bytes: u64,
    // Configured maximum cache size.
    pub max_bytes: u64,
}
/// Builds an [`ArcStats`] snapshot from the global ARC cache state.
///
/// Currently infallible in practice; the `Result` return is kept for
/// call-site compatibility.
fn get_arc_stats() -> Result<ArcStats, &'static str> {
    use crate::cache::arc::ARC;
    let arc = ARC.lock();
    let max = arc.max_bytes as u64;
    Ok(ArcStats {
        size_bytes: arc.current_size as u64,
        // NOTE(review): target = max/2 and min = max/8 are heuristics baked
        // in here, not values read from the ARC — confirm they match the
        // cache implementation's actual tuning.
        target_bytes: max / 2,
        min_bytes: max / 8,
        max_bytes: max,
    })
}
#[cfg(test)]
mod tests {
    use super::*;

    // Every test below resets and asserts on process-wide atomic statics.
    // The default test harness runs tests on multiple threads, so two tests
    // could interleave their store/record/assert sequences and fail
    // spuriously. Each test holds this lock for its whole duration to
    // serialize access to the shared counters.
    static TEST_LOCK: Mutex<()> = Mutex::new(());

    #[test]
    fn test_record_operations() {
        let _guard = TEST_LOCK.lock();
        READ_OPS.store(0, Ordering::Relaxed);
        WRITE_OPS.store(0, Ordering::Relaxed);
        BYTES_READ.store(0, Ordering::Relaxed);
        BYTES_WRITTEN.store(0, Ordering::Relaxed);
        record_read(1024);
        record_read(2048);
        record_write(512);
        assert_eq!(READ_OPS.load(Ordering::Relaxed), 2);
        assert_eq!(WRITE_OPS.load(Ordering::Relaxed), 1);
        assert_eq!(BYTES_READ.load(Ordering::Relaxed), 3072);
        assert_eq!(BYTES_WRITTEN.load(Ordering::Relaxed), 512);
    }

    #[test]
    fn test_arc_counters() {
        let _guard = TEST_LOCK.lock();
        ARC_HITS.store(0, Ordering::Relaxed);
        ARC_MISSES.store(0, Ordering::Relaxed);
        record_arc_hit();
        record_arc_hit();
        record_arc_miss();
        assert_eq!(ARC_HITS.load(Ordering::Relaxed), 2);
        assert_eq!(ARC_MISSES.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn test_handle_tracking() {
        let _guard = TEST_LOCK.lock();
        OPEN_HANDLES.store(0, Ordering::Relaxed);
        record_handle_open();
        record_handle_open();
        record_handle_close();
        assert_eq!(OPEN_HANDLES.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn test_txg_sync() {
        let _guard = TEST_LOCK.lock();
        TXG_SYNCS.store(0, Ordering::Relaxed);
        CURRENT_TXG.store(100, Ordering::Relaxed);
        record_txg_sync(5000);
        assert_eq!(TXG_SYNCS.load(Ordering::Relaxed), 1);
        assert_eq!(CURRENT_TXG.load(Ordering::Relaxed), 101);
    }

    #[test]
    fn test_dedup_counters() {
        let _guard = TEST_LOCK.lock();
        DEDUP_HITS.store(0, Ordering::Relaxed);
        DEDUP_MISSES.store(0, Ordering::Relaxed);
        DEDUP_ENTRIES.store(0, Ordering::Relaxed);
        record_dedup_hit();
        record_dedup_hit();
        record_dedup_miss();
        update_dedup_entries(100);
        assert_eq!(DEDUP_HITS.load(Ordering::Relaxed), 2);
        assert_eq!(DEDUP_MISSES.load(Ordering::Relaxed), 1);
        assert_eq!(DEDUP_ENTRIES.load(Ordering::Relaxed), 100);
    }
}