// fathomdb_engine/telemetry.rs
1//! Resource telemetry: always-on counters and `SQLite` cache statistics.
2//!
3//! See `dev/design-note-telemetry-and-profiling.md` for the full design.
4
5use std::sync::atomic::{AtomicU64, Ordering};
6
7use rusqlite::Connection;
8use serde::Serialize;
9
/// Controls how much telemetry the engine collects.
///
/// Levels are additive — each level includes everything from below it.
/// Level 0 counters are always maintained regardless of this setting;
/// the level controls whether higher-cost collection is enabled.
///
/// Variants are declared in ascending order, so the derived `Ord` follows
/// level order and callers can gate work with comparisons such as
/// `level >= TelemetryLevel::Statements`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
pub enum TelemetryLevel {
    /// Level 0: cumulative counters only. Always active.
    #[default]
    Counters,
    /// Level 1: per-statement profiling (`trace_v2` + `stmt_status`).
    Statements,
    /// Level 2: deep profiling (scan status + process snapshots).
    /// Requires high-telemetry build for full scan-status data.
    Profiling,
}
26
/// Always-on cumulative counters, shared across all engine components.
///
/// All increments use [`Ordering::Relaxed`] — these are statistical counters,
/// not synchronization primitives.
#[derive(Debug, Default)]
#[allow(clippy::struct_field_names)] // `_total` suffixes deliberately mirror `TelemetrySnapshot`'s field names
pub struct TelemetryCounters {
    /// Total read operations executed.
    queries_total: AtomicU64,
    /// Total write operations committed.
    writes_total: AtomicU64,
    /// Total rows written; see [`TelemetrySnapshot::write_rows_total`].
    write_rows_total: AtomicU64,
    /// Total operation errors.
    errors_total: AtomicU64,
    /// Total admin operations.
    admin_ops_total: AtomicU64,
}
40
41impl TelemetryCounters {
42    /// Increment the query counter by one.
43    pub fn increment_queries(&self) {
44        self.queries_total.fetch_add(1, Ordering::Relaxed);
45    }
46
47    /// Increment the write counter by one and add `row_count` to the row total.
48    pub fn increment_writes(&self, row_count: u64) {
49        self.writes_total.fetch_add(1, Ordering::Relaxed);
50        self.write_rows_total
51            .fetch_add(row_count, Ordering::Relaxed);
52    }
53
54    /// Increment the error counter by one.
55    pub fn increment_errors(&self) {
56        self.errors_total.fetch_add(1, Ordering::Relaxed);
57    }
58
59    /// Increment the admin operations counter by one.
60    pub fn increment_admin_ops(&self) {
61        self.admin_ops_total.fetch_add(1, Ordering::Relaxed);
62    }
63
64    /// Read all counters into a [`TelemetrySnapshot`].
65    ///
66    /// The `sqlite_cache` field is left at defaults — callers that need
67    /// cache status should populate it separately via
68    /// [`read_db_cache_status`].
69    #[must_use]
70    pub fn snapshot(&self) -> TelemetrySnapshot {
71        TelemetrySnapshot {
72            queries_total: self.queries_total.load(Ordering::Relaxed),
73            writes_total: self.writes_total.load(Ordering::Relaxed),
74            write_rows_total: self.write_rows_total.load(Ordering::Relaxed),
75            errors_total: self.errors_total.load(Ordering::Relaxed),
76            admin_ops_total: self.admin_ops_total.load(Ordering::Relaxed),
77            sqlite_cache: SqliteCacheStatus::default(),
78        }
79    }
80}
81
/// Cumulative `SQLite` page-cache counters for a single connection.
///
/// Uses `i64` to allow safe summing across pool connections without overflow.
/// Populated by [`read_db_cache_status`]; aggregate with [`Self::add`].
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)]
pub struct SqliteCacheStatus {
    /// Page cache hits (`SQLITE_DBSTATUS_CACHE_HIT`).
    pub cache_hits: i64,
    /// Page cache misses (`SQLITE_DBSTATUS_CACHE_MISS`).
    pub cache_misses: i64,
    /// Pages written to cache (`SQLITE_DBSTATUS_CACHE_WRITE`).
    pub cache_writes: i64,
    /// Cache pages spilled to disk (`SQLITE_DBSTATUS_CACHE_SPILL`).
    pub cache_spills: i64,
}
96
97impl SqliteCacheStatus {
98    /// Add another status into this one (for aggregating across connections).
99    ///
100    /// Uses saturating arithmetic to avoid panics on overflow in debug builds.
101    pub fn add(&mut self, other: &Self) {
102        self.cache_hits = self.cache_hits.saturating_add(other.cache_hits);
103        self.cache_misses = self.cache_misses.saturating_add(other.cache_misses);
104        self.cache_writes = self.cache_writes.saturating_add(other.cache_writes);
105        self.cache_spills = self.cache_spills.saturating_add(other.cache_spills);
106    }
107}
108
109/// Read cumulative page-cache counters from a `SQLite` connection.
110///
111/// Calls `sqlite3_db_status()` for `CACHE_HIT`, `CACHE_MISS`, `CACHE_WRITE`,
112/// and `CACHE_SPILL` with `resetFlag=0` (non-destructive read).
113///
114/// If any `sqlite3_db_status` call returns an error, that counter is left
115/// at zero rather than propagating garbage. This should not happen with
116/// valid connection handles and known status codes, but a database engine
117/// must not return misleading data.
118///
119/// # Safety contract
120///
121/// The function is safe because `Connection::handle()` returns a valid
122/// `sqlite3*` for the connection's lifetime, and `sqlite3_db_status` is
123/// read-only and thread-safe for the owning connection.
124pub fn read_db_cache_status(conn: &Connection) -> SqliteCacheStatus {
125    let mut status = SqliteCacheStatus::default();
126
127    // Helper: read one db_status code, returning the current value.
128    // Returns 0 if sqlite3_db_status reports an error.
129    let read_one = |op: i32| -> i64 {
130        let mut current: i32 = 0;
131        let mut highwater: i32 = 0;
132        // Safety: conn.handle() is valid for the connection's lifetime.
133        // sqlite3_db_status with resetFlag=0 is a non-destructive read.
134        let rc = unsafe {
135            rusqlite::ffi::sqlite3_db_status(
136                conn.handle(),
137                op,
138                &raw mut current,
139                &raw mut highwater,
140                0, // resetFlag
141            )
142        };
143        if rc == rusqlite::ffi::SQLITE_OK {
144            i64::from(current)
145        } else {
146            0
147        }
148    };
149
150    status.cache_hits = read_one(rusqlite::ffi::SQLITE_DBSTATUS_CACHE_HIT);
151    status.cache_misses = read_one(rusqlite::ffi::SQLITE_DBSTATUS_CACHE_MISS);
152    status.cache_writes = read_one(rusqlite::ffi::SQLITE_DBSTATUS_CACHE_WRITE);
153    status.cache_spills = read_one(rusqlite::ffi::SQLITE_DBSTATUS_CACHE_SPILL);
154
155    status
156}
157
/// Point-in-time snapshot of all telemetry counters.
///
/// Serializes flat: `#[serde(flatten)]` lifts the cache fields into the
/// top-level JSON object alongside the counter fields.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)]
pub struct TelemetrySnapshot {
    /// Total read operations executed.
    pub queries_total: u64,
    /// Total write operations committed.
    pub writes_total: u64,
    /// Total rows written (nodes + edges + chunks).
    pub write_rows_total: u64,
    /// Total operation errors.
    pub errors_total: u64,
    /// Total admin operations.
    pub admin_ops_total: u64,
    /// Aggregated `SQLite` page-cache counters (summed across pool connections).
    #[serde(flatten)]
    pub sqlite_cache: SqliteCacheStatus,
}
175
#[cfg(test)]
#[allow(clippy::expect_used)]
mod tests {
    use rusqlite::Connection;

    use super::{
        SqliteCacheStatus, TelemetryCounters, TelemetryLevel, TelemetrySnapshot,
        read_db_cache_status,
    };

    #[test]
    fn telemetry_snapshot_serializes_to_json() {
        let snapshot = TelemetrySnapshot {
            queries_total: 5,
            ..Default::default()
        };
        let value = serde_json::to_value(&snapshot).expect("serializes");
        assert_eq!(value["queries_total"], 5);
        // `#[serde(flatten)]` lifts the cache fields to the top level.
        assert_eq!(value["cache_hits"], 0);
        assert!(
            value.get("sqlite_cache").is_none(),
            "sqlite_cache must be flattened"
        );
    }

    #[test]
    fn telemetry_level_default_is_counters() {
        assert_eq!(TelemetryLevel::default(), TelemetryLevel::Counters);
    }

    #[test]
    fn counter_defaults_to_zero() {
        let snap = TelemetryCounters::default().snapshot();
        for value in [
            snap.queries_total,
            snap.writes_total,
            snap.write_rows_total,
            snap.errors_total,
            snap.admin_ops_total,
        ] {
            assert_eq!(value, 0);
        }
    }

    #[test]
    fn counter_increment_and_snapshot() {
        let counters = TelemetryCounters::default();

        for _ in 0..2 {
            counters.increment_queries();
        }
        counters.increment_writes(5);
        counters.increment_writes(3);
        counters.increment_errors();
        for _ in 0..3 {
            counters.increment_admin_ops();
        }

        let snap = counters.snapshot();
        assert_eq!(snap.queries_total, 2);
        assert_eq!(snap.writes_total, 2);
        assert_eq!(snap.write_rows_total, 8);
        assert_eq!(snap.errors_total, 1);
        assert_eq!(snap.admin_ops_total, 3);
    }

    #[test]
    fn read_db_cache_status_on_fresh_connection() {
        let conn = Connection::open_in_memory().expect("open in-memory db");
        let status = read_db_cache_status(&conn);
        // Cumulative counters must never be negative, even on a fresh handle.
        for value in [
            status.cache_hits,
            status.cache_misses,
            status.cache_writes,
            status.cache_spills,
        ] {
            assert!(value >= 0);
        }
    }

    #[test]
    fn cache_status_reflects_queries() {
        let conn = Connection::open_in_memory().expect("open in-memory db");
        conn.execute_batch(
            "CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT);
             INSERT INTO t VALUES (1, 'a');
             INSERT INTO t VALUES (2, 'b');
             INSERT INTO t VALUES (3, 'c');",
        )
        .expect("setup");

        // Exercise the page cache with repeated reads.
        for _ in 0..10 {
            let mut stmt = conn.prepare("SELECT * FROM t").expect("prepare");
            let _ids: Vec<i64> = stmt
                .query_map([], |row| row.get(0))
                .expect("query")
                .map(|r| r.expect("row"))
                .collect();
        }

        let status = read_db_cache_status(&conn);
        assert!(
            status.cache_hits + status.cache_misses > 0,
            "expected cache activity after queries, got hits={} misses={}",
            status.cache_hits,
            status.cache_misses,
        );
    }

    #[test]
    fn cache_status_add_sums_correctly() {
        let mut total = SqliteCacheStatus::default();
        total.add(&SqliteCacheStatus {
            cache_hits: 10,
            cache_misses: 2,
            cache_writes: 5,
            cache_spills: 1,
        });
        total.add(&SqliteCacheStatus {
            cache_hits: 3,
            cache_misses: 7,
            cache_writes: 0,
            cache_spills: 4,
        });
        assert_eq!(
            total,
            SqliteCacheStatus {
                cache_hits: 13,
                cache_misses: 9,
                cache_writes: 5,
                cache_spills: 5,
            }
        );
    }
}
303}