use std::sync::atomic::{AtomicU64, Ordering};
use rusqlite::Connection;
use serde::Serialize;
/// Telemetry detail level.
///
/// NOTE(review): nothing in this file reads the level, so the exact effect of
/// each variant should be confirmed against the code that consumes it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum TelemetryLevel {
    /// The level produced by `TelemetryLevel::default()`.
    #[default]
    Counters,
    /// Presumably adds per-statement detail — TODO confirm with the consumer.
    Statements,
    /// Presumably adds profiling detail — TODO confirm with the consumer.
    Profiling,
}
/// Atomic `u64` tallies that can be updated through a shared reference.
///
/// Every field starts at zero via the derived `Default`.
// All fields share the `_total` suffix, which is presumably what trips
// `clippy::struct_field_names`; the lint is silenced instead of renaming them.
#[derive(Default, Debug)]
#[allow(clippy::struct_field_names)]
pub struct TelemetryCounters {
    /// Number of recorded queries.
    queries_total: AtomicU64,
    /// Number of recorded write operations.
    writes_total: AtomicU64,
    /// Sum of the row counts reported with each recorded write.
    write_rows_total: AtomicU64,
    /// Number of recorded errors.
    errors_total: AtomicU64,
    /// Number of recorded admin operations.
    admin_ops_total: AtomicU64,
}
impl TelemetryCounters {
pub fn increment_queries(&self) {
self.queries_total.fetch_add(1, Ordering::Relaxed);
}
pub fn increment_writes(&self, row_count: u64) {
self.writes_total.fetch_add(1, Ordering::Relaxed);
self.write_rows_total
.fetch_add(row_count, Ordering::Relaxed);
}
pub fn increment_errors(&self) {
self.errors_total.fetch_add(1, Ordering::Relaxed);
}
pub fn increment_admin_ops(&self) {
self.admin_ops_total.fetch_add(1, Ordering::Relaxed);
}
#[must_use]
pub fn snapshot(&self) -> TelemetrySnapshot {
TelemetrySnapshot {
queries_total: self.queries_total.load(Ordering::Relaxed),
writes_total: self.writes_total.load(Ordering::Relaxed),
write_rows_total: self.write_rows_total.load(Ordering::Relaxed),
errors_total: self.errors_total.load(Ordering::Relaxed),
admin_ops_total: self.admin_ops_total.load(Ordering::Relaxed),
sqlite_cache: SqliteCacheStatus::default(),
}
}
}
/// Page-cache counters for SQLite, as gathered by `read_db_cache_status`.
///
/// Each field holds the value read for the correspondingly named
/// `SQLITE_DBSTATUS_CACHE_*` operation, widened to `i64`.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize)]
pub struct SqliteCacheStatus {
    /// Value read for `SQLITE_DBSTATUS_CACHE_HIT`.
    pub cache_hits: i64,
    /// Value read for `SQLITE_DBSTATUS_CACHE_MISS`.
    pub cache_misses: i64,
    /// Value read for `SQLITE_DBSTATUS_CACHE_WRITE`.
    pub cache_writes: i64,
    /// Value read for `SQLITE_DBSTATUS_CACHE_SPILL`.
    pub cache_spills: i64,
}
impl SqliteCacheStatus {
    /// Accumulates the counters of `other` into `self`, field by field.
    ///
    /// Each sum saturates at the `i64` bounds instead of overflowing.
    pub fn add(&mut self, other: &Self) {
        // Pair every destination field with the amount to fold into it.
        let updates = [
            (&mut self.cache_hits, other.cache_hits),
            (&mut self.cache_misses, other.cache_misses),
            (&mut self.cache_writes, other.cache_writes),
            (&mut self.cache_spills, other.cache_spills),
        ];
        for (slot, delta) in updates {
            *slot = slot.saturating_add(delta);
        }
    }
}
pub fn read_db_cache_status(conn: &Connection) -> SqliteCacheStatus {
let mut status = SqliteCacheStatus::default();
let read_one = |op: i32| -> i64 {
let mut current: i32 = 0;
let mut highwater: i32 = 0;
let rc = unsafe {
rusqlite::ffi::sqlite3_db_status(
conn.handle(),
op,
&raw mut current,
&raw mut highwater,
0, )
};
if rc == rusqlite::ffi::SQLITE_OK {
i64::from(current)
} else {
0
}
};
status.cache_hits = read_one(rusqlite::ffi::SQLITE_DBSTATUS_CACHE_HIT);
status.cache_misses = read_one(rusqlite::ffi::SQLITE_DBSTATUS_CACHE_MISS);
status.cache_writes = read_one(rusqlite::ffi::SQLITE_DBSTATUS_CACHE_WRITE);
status.cache_spills = read_one(rusqlite::ffi::SQLITE_DBSTATUS_CACHE_SPILL);
status
}
/// Plain-value copy of the telemetry tallies together with SQLite cache counters.
///
/// When serialized, the fields of `sqlite_cache` are flattened into the same
/// object as the tallies rather than nested under a `sqlite_cache` key.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize)]
pub struct TelemetrySnapshot {
    /// Number of recorded queries.
    pub queries_total: u64,
    /// Number of recorded write operations.
    pub writes_total: u64,
    /// Sum of the row counts reported with each recorded write.
    pub write_rows_total: u64,
    /// Number of recorded errors.
    pub errors_total: u64,
    /// Number of recorded admin operations.
    pub admin_ops_total: u64,
    /// SQLite page-cache counters, serialized inline with the fields above.
    #[serde(flatten)]
    pub sqlite_cache: SqliteCacheStatus,
}
/// Unit tests for the telemetry tallies and the SQLite cache-status reader.
#[cfg(test)]
// `expect` is how these tests report setup and conversion failures.
#[allow(clippy::expect_used)]
mod tests {
    use super::{
        read_db_cache_status, SqliteCacheStatus, TelemetryCounters, TelemetryLevel,
        TelemetrySnapshot,
    };
    use rusqlite::Connection;

    /// Opens the in-memory database shared by the SQLite-backed tests.
    fn open_memory_db() -> Connection {
        Connection::open_in_memory().expect("open in-memory db")
    }

    /// Cache fields must appear at the top level of the serialized object.
    #[test]
    fn telemetry_snapshot_serializes_to_json() {
        let snapshot = TelemetrySnapshot {
            queries_total: 5,
            ..TelemetrySnapshot::default()
        };
        let value = serde_json::to_value(&snapshot).expect("serializes");
        assert_eq!(value["queries_total"], 5);
        assert_eq!(value["cache_hits"], 0);
        let nested = value.get("sqlite_cache");
        assert!(nested.is_none(), "sqlite_cache must be flattened");
    }

    /// `Counters` is the level produced by `Default`.
    #[test]
    fn telemetry_level_default_is_counters() {
        let level = TelemetryLevel::default();
        assert_eq!(level, TelemetryLevel::Counters);
    }

    /// A freshly created counter set snapshots to all zeros.
    #[test]
    fn counter_defaults_to_zero() {
        let TelemetrySnapshot {
            queries_total,
            writes_total,
            write_rows_total,
            errors_total,
            admin_ops_total,
            ..
        } = TelemetryCounters::default().snapshot();
        assert_eq!(queries_total, 0);
        assert_eq!(writes_total, 0);
        assert_eq!(write_rows_total, 0);
        assert_eq!(errors_total, 0);
        assert_eq!(admin_ops_total, 0);
    }

    /// Every increment method is reflected in the next snapshot.
    #[test]
    fn counter_increment_and_snapshot() {
        let counters = TelemetryCounters::default();
        for _ in 0..2 {
            counters.increment_queries();
        }
        for rows in [5, 3] {
            counters.increment_writes(rows);
        }
        counters.increment_errors();
        for _ in 0..3 {
            counters.increment_admin_ops();
        }

        let TelemetrySnapshot {
            queries_total,
            writes_total,
            write_rows_total,
            errors_total,
            admin_ops_total,
            ..
        } = counters.snapshot();
        assert_eq!(queries_total, 2);
        assert_eq!(writes_total, 2);
        assert_eq!(write_rows_total, 8);
        assert_eq!(errors_total, 1);
        assert_eq!(admin_ops_total, 3);
    }

    /// Reading the status of an untouched connection yields non-negative values.
    #[test]
    fn read_db_cache_status_on_fresh_connection() {
        let conn = open_memory_db();
        let SqliteCacheStatus {
            cache_hits,
            cache_misses,
            cache_writes,
            cache_spills,
        } = read_db_cache_status(&conn);
        for counter in [cache_hits, cache_misses, cache_writes, cache_spills] {
            assert!(counter >= 0);
        }
    }

    /// Running queries must register as cache hits and/or misses.
    #[test]
    fn cache_status_reflects_queries() {
        let conn = open_memory_db();
        conn.execute_batch(
            "CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT);
INSERT INTO t VALUES (1, 'a');
INSERT INTO t VALUES (2, 'b');
INSERT INTO t VALUES (3, 'c');",
        )
        .expect("setup");

        for _ in 0..10 {
            let mut stmt = conn.prepare("SELECT * FROM t").expect("prepare");
            let mapped = stmt
                .query_map([], |row| row.get::<_, i64>(0))
                .expect("query");
            let _ids: Vec<i64> = mapped.map(|item| item.expect("row")).collect();
        }

        let status = read_db_cache_status(&conn);
        let activity = status.cache_hits + status.cache_misses;
        assert!(
            activity > 0,
            "expected cache activity after queries, got hits={} misses={}",
            status.cache_hits,
            status.cache_misses,
        );
    }

    /// `add` sums each field independently.
    #[test]
    fn cache_status_add_sums_correctly() {
        let parts = [
            SqliteCacheStatus {
                cache_hits: 10,
                cache_misses: 2,
                cache_writes: 5,
                cache_spills: 1,
            },
            SqliteCacheStatus {
                cache_hits: 3,
                cache_misses: 7,
                cache_writes: 0,
                cache_spills: 4,
            },
        ];

        let mut total = SqliteCacheStatus::default();
        for part in &parts {
            total.add(part);
        }

        assert_eq!(total.cache_hits, 13);
        assert_eq!(total.cache_misses, 9);
        assert_eq!(total.cache_writes, 5);
        assert_eq!(total.cache_spills, 5);
    }
}