Skip to main content

fathomdb_engine/
telemetry.rs

//! Resource telemetry: always-on counters and `SQLite` cache statistics.
//!
//! See `dev/design-note-telemetry-and-profiling.md` for the full design.
4
5use std::sync::atomic::{AtomicU64, Ordering};
6
7use rusqlite::Connection;
8
/// Controls how much telemetry the engine collects.
///
/// Levels are additive — each level includes everything from below it.
/// Level 0 counters are always maintained regardless of this setting;
/// the level controls whether higher-cost collection is enabled.
///
/// Variants are declared from least to most detailed, and the derived
/// [`Ord`] follows declaration order, so
/// `Counters < Statements < Profiling`. Callers can therefore gate work with a
/// single comparison such as `level >= TelemetryLevel::Statements` instead of
/// matching every variant that implies a capability.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum TelemetryLevel {
    /// Level 0: cumulative counters only. Always active.
    #[default]
    Counters,
    /// Level 1: per-statement profiling (`trace_v2` + `stmt_status`).
    Statements,
    /// Level 2: deep profiling (scan status + process snapshots).
    /// Requires high-telemetry build for full scan-status data.
    Profiling,
}
25
/// Always-on cumulative counters, shared across all engine components.
///
/// Every counter is an [`AtomicU64`] that is only ever incremented through the
/// `increment_*` methods, which take `&self`. A single instance can therefore
/// be updated concurrently through a shared reference.
///
/// All accesses use [`Ordering::Relaxed`] — these are statistical counters,
/// not synchronization primitives.
#[derive(Debug, Default)]
// Every field deliberately ends in `_total` so the names line up one-to-one
// with the fields of `TelemetrySnapshot`. Clippy's `struct_field_names` lint
// would otherwise flag the shared postfix.
#[allow(clippy::struct_field_names)]
pub struct TelemetryCounters {
    /// Read operations recorded via `increment_queries`.
    queries_total: AtomicU64,
    /// Write operations recorded via `increment_writes`.
    writes_total: AtomicU64,
    /// Sum of the `row_count` arguments passed to `increment_writes`.
    write_rows_total: AtomicU64,
    /// Errors recorded via `increment_errors`.
    errors_total: AtomicU64,
    /// Admin operations recorded via `increment_admin_ops`.
    admin_ops_total: AtomicU64,
}
39
40impl TelemetryCounters {
41    /// Increment the query counter by one.
42    pub fn increment_queries(&self) {
43        self.queries_total.fetch_add(1, Ordering::Relaxed);
44    }
45
46    /// Increment the write counter by one and add `row_count` to the row total.
47    pub fn increment_writes(&self, row_count: u64) {
48        self.writes_total.fetch_add(1, Ordering::Relaxed);
49        self.write_rows_total
50            .fetch_add(row_count, Ordering::Relaxed);
51    }
52
53    /// Increment the error counter by one.
54    pub fn increment_errors(&self) {
55        self.errors_total.fetch_add(1, Ordering::Relaxed);
56    }
57
58    /// Increment the admin operations counter by one.
59    pub fn increment_admin_ops(&self) {
60        self.admin_ops_total.fetch_add(1, Ordering::Relaxed);
61    }
62
63    /// Read all counters into a [`TelemetrySnapshot`].
64    ///
65    /// The `sqlite_cache` field is left at defaults — callers that need
66    /// cache status should populate it separately via
67    /// [`read_db_cache_status`].
68    #[must_use]
69    pub fn snapshot(&self) -> TelemetrySnapshot {
70        TelemetrySnapshot {
71            queries_total: self.queries_total.load(Ordering::Relaxed),
72            writes_total: self.writes_total.load(Ordering::Relaxed),
73            write_rows_total: self.write_rows_total.load(Ordering::Relaxed),
74            errors_total: self.errors_total.load(Ordering::Relaxed),
75            admin_ops_total: self.admin_ops_total.load(Ordering::Relaxed),
76            sqlite_cache: SqliteCacheStatus::default(),
77        }
78    }
79}
80
/// Cumulative `SQLite` page-cache counters.
///
/// Holds either the values read from one connection by
/// [`read_db_cache_status`], or a running total across several connections
/// built with `SqliteCacheStatus::add`.
///
/// `SQLite` reports each counter as a C `int`. Widening to `i64` leaves ample
/// headroom when summing the counters of many pool connections.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct SqliteCacheStatus {
    /// Pager cache hits (`SQLITE_DBSTATUS_CACHE_HIT`).
    pub cache_hits: i64,
    /// Pager cache misses (`SQLITE_DBSTATUS_CACHE_MISS`).
    pub cache_misses: i64,
    /// Dirty cache pages written out to disk — to the WAL file in WAL mode,
    /// otherwise to the database file (`SQLITE_DBSTATUS_CACHE_WRITE`).
    pub cache_writes: i64,
    /// Dirty cache pages written to disk in the middle of a transaction
    /// because the page cache overflowed (`SQLITE_DBSTATUS_CACHE_SPILL`).
    pub cache_spills: i64,
}
95
96impl SqliteCacheStatus {
97    /// Add another status into this one (for aggregating across connections).
98    ///
99    /// Uses saturating arithmetic to avoid panics on overflow in debug builds.
100    pub fn add(&mut self, other: &Self) {
101        self.cache_hits = self.cache_hits.saturating_add(other.cache_hits);
102        self.cache_misses = self.cache_misses.saturating_add(other.cache_misses);
103        self.cache_writes = self.cache_writes.saturating_add(other.cache_writes);
104        self.cache_spills = self.cache_spills.saturating_add(other.cache_spills);
105    }
106}
107
108/// Read cumulative page-cache counters from a `SQLite` connection.
109///
110/// Calls `sqlite3_db_status()` for `CACHE_HIT`, `CACHE_MISS`, `CACHE_WRITE`,
111/// and `CACHE_SPILL` with `resetFlag=0` (non-destructive read).
112///
113/// If any `sqlite3_db_status` call returns an error, that counter is left
114/// at zero rather than propagating garbage. This should not happen with
115/// valid connection handles and known status codes, but a database engine
116/// must not return misleading data.
117///
118/// # Safety contract
119///
120/// The function is safe because `Connection::handle()` returns a valid
121/// `sqlite3*` for the connection's lifetime, and `sqlite3_db_status` is
122/// read-only and thread-safe for the owning connection.
123pub fn read_db_cache_status(conn: &Connection) -> SqliteCacheStatus {
124    let mut status = SqliteCacheStatus::default();
125
126    // Helper: read one db_status code, returning the current value.
127    // Returns 0 if sqlite3_db_status reports an error.
128    let read_one = |op: i32| -> i64 {
129        let mut current: i32 = 0;
130        let mut highwater: i32 = 0;
131        // Safety: conn.handle() is valid for the connection's lifetime.
132        // sqlite3_db_status with resetFlag=0 is a non-destructive read.
133        let rc = unsafe {
134            rusqlite::ffi::sqlite3_db_status(
135                conn.handle(),
136                op,
137                &raw mut current,
138                &raw mut highwater,
139                0, // resetFlag
140            )
141        };
142        if rc == rusqlite::ffi::SQLITE_OK {
143            i64::from(current)
144        } else {
145            0
146        }
147    };
148
149    status.cache_hits = read_one(rusqlite::ffi::SQLITE_DBSTATUS_CACHE_HIT);
150    status.cache_misses = read_one(rusqlite::ffi::SQLITE_DBSTATUS_CACHE_MISS);
151    status.cache_writes = read_one(rusqlite::ffi::SQLITE_DBSTATUS_CACHE_WRITE);
152    status.cache_spills = read_one(rusqlite::ffi::SQLITE_DBSTATUS_CACHE_SPILL);
153
154    status
155}
156
157/// Point-in-time snapshot of all telemetry counters.
158#[derive(Clone, Debug, Default, PartialEq, Eq)]
159pub struct TelemetrySnapshot {
160    /// Total read operations executed.
161    pub queries_total: u64,
162    /// Total write operations committed.
163    pub writes_total: u64,
164    /// Total rows written (nodes + edges + chunks).
165    pub write_rows_total: u64,
166    /// Total operation errors.
167    pub errors_total: u64,
168    /// Total admin operations.
169    pub admin_ops_total: u64,
170    /// Aggregated `SQLite` page-cache counters (summed across pool connections).
171    pub sqlite_cache: SqliteCacheStatus,
172}
173
#[cfg(test)]
#[allow(clippy::expect_used)]
mod tests {
    use rusqlite::Connection;

    use super::{SqliteCacheStatus, TelemetryCounters, TelemetryLevel, read_db_cache_status};

    #[test]
    fn telemetry_level_default_is_counters() {
        assert_eq!(TelemetryLevel::default(), TelemetryLevel::Counters);
    }

    #[test]
    fn counter_defaults_to_zero() {
        let counters = TelemetryCounters::default();
        let snap = counters.snapshot();
        assert_eq!(snap.queries_total, 0);
        assert_eq!(snap.writes_total, 0);
        assert_eq!(snap.write_rows_total, 0);
        assert_eq!(snap.errors_total, 0);
        assert_eq!(snap.admin_ops_total, 0);
    }

    #[test]
    fn counter_increment_and_snapshot() {
        let counters = TelemetryCounters::default();

        counters.increment_queries();
        counters.increment_queries();
        counters.increment_writes(5);
        counters.increment_writes(3);
        counters.increment_errors();
        counters.increment_admin_ops();
        counters.increment_admin_ops();
        counters.increment_admin_ops();

        let snap = counters.snapshot();
        assert_eq!(snap.queries_total, 2);
        assert_eq!(snap.writes_total, 2);
        assert_eq!(snap.write_rows_total, 8);
        assert_eq!(snap.errors_total, 1);
        assert_eq!(snap.admin_ops_total, 3);
    }

    #[test]
    fn snapshot_leaves_sqlite_cache_at_default() {
        // `snapshot()` only reads the atomic counters; page-cache stats must be
        // filled in separately by the caller.
        let counters = TelemetryCounters::default();
        counters.increment_queries();
        let snap = counters.snapshot();
        assert_eq!(snap.sqlite_cache, SqliteCacheStatus::default());
    }

    #[test]
    fn counters_accumulate_across_threads() {
        const THREADS: u64 = 4;
        const PER_THREAD: u64 = 1_000;

        let counters = TelemetryCounters::default();
        // Relaxed increments must still never lose an update.
        std::thread::scope(|scope| {
            for _ in 0..THREADS {
                scope.spawn(|| {
                    for _ in 0..PER_THREAD {
                        counters.increment_queries();
                        counters.increment_writes(2);
                    }
                });
            }
        });

        let snap = counters.snapshot();
        assert_eq!(snap.queries_total, THREADS * PER_THREAD);
        assert_eq!(snap.writes_total, THREADS * PER_THREAD);
        assert_eq!(snap.write_rows_total, 2 * THREADS * PER_THREAD);
    }

    #[test]
    fn read_db_cache_status_on_fresh_connection() {
        let conn = Connection::open_in_memory().expect("open in-memory db");
        let status = read_db_cache_status(&conn);
        // Fresh connection should have valid (non-negative) values.
        assert!(status.cache_hits >= 0);
        assert!(status.cache_misses >= 0);
        assert!(status.cache_writes >= 0);
        assert!(status.cache_spills >= 0);
    }

    #[test]
    fn cache_status_reflects_queries() {
        let conn = Connection::open_in_memory().expect("open in-memory db");
        conn.execute_batch(
            "CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT);
             INSERT INTO t VALUES (1, 'a');
             INSERT INTO t VALUES (2, 'b');
             INSERT INTO t VALUES (3, 'c');",
        )
        .expect("setup");

        // Run several queries to exercise the cache.
        for _ in 0..10 {
            let mut stmt = conn.prepare("SELECT * FROM t").expect("prepare");
            let _rows: Vec<i64> = stmt
                .query_map([], |row| row.get(0))
                .expect("query")
                .map(|r| r.expect("row"))
                .collect();
        }

        let status = read_db_cache_status(&conn);
        // After queries, we should see cache activity.
        assert!(
            status.cache_hits + status.cache_misses > 0,
            "expected cache activity after queries, got hits={} misses={}",
            status.cache_hits,
            status.cache_misses,
        );
    }

    #[test]
    fn read_db_cache_status_is_non_destructive() {
        // With resetFlag = 0, reading the counters must not clear them, so two
        // back-to-back reads with no intervening work agree.
        let conn = Connection::open_in_memory().expect("open in-memory db");
        conn.execute_batch(
            "CREATE TABLE t (id INTEGER PRIMARY KEY);
             INSERT INTO t VALUES (1);
             SELECT * FROM t;",
        )
        .expect("setup");

        let first = read_db_cache_status(&conn);
        let second = read_db_cache_status(&conn);
        assert_eq!(first, second);
    }

    #[test]
    fn cache_status_add_sums_correctly() {
        let a = SqliteCacheStatus {
            cache_hits: 10,
            cache_misses: 2,
            cache_writes: 5,
            cache_spills: 1,
        };
        let b = SqliteCacheStatus {
            cache_hits: 3,
            cache_misses: 7,
            cache_writes: 0,
            cache_spills: 4,
        };
        let mut total = SqliteCacheStatus::default();
        total.add(&a);
        total.add(&b);
        assert_eq!(total.cache_hits, 13);
        assert_eq!(total.cache_misses, 9);
        assert_eq!(total.cache_writes, 5);
        assert_eq!(total.cache_spills, 5);
    }

    #[test]
    fn cache_status_add_saturates_instead_of_overflowing() {
        let mut total = SqliteCacheStatus {
            cache_hits: i64::MAX,
            ..SqliteCacheStatus::default()
        };
        total.add(&SqliteCacheStatus {
            cache_hits: 1,
            cache_misses: 1,
            ..SqliteCacheStatus::default()
        });
        assert_eq!(total.cache_hits, i64::MAX);
        assert_eq!(total.cache_misses, 1);
    }
}