//! fathomdb_engine/runtime.rs — core engine runtime (writer actor, reader
//! pool, admin service, and exclusive database lock for one database).

1use std::path::Path;
2use std::sync::Arc;
3
4use fathomdb_schema::SchemaManager;
5
6use crate::{
7    AdminHandle, AdminService, EngineError, ExecutionCoordinator, ProvenanceMode, WriterActor,
8    database_lock::DatabaseLock,
9    telemetry::{TelemetryCounters, TelemetryLevel, TelemetrySnapshot},
10};
11
/// Core engine runtime.
///
/// # Drop order invariant
///
/// Fields are ordered so that `coordinator` (reader connections) drops before
/// `writer` (writer thread + connection).  This ensures the writer's
/// `sqlite3_close()` is the last connection to the database, which triggers
/// `SQLite`'s automatic passive WAL checkpoint and WAL/shm file cleanup.
/// `_lock` drops last so the exclusive file lock is released only after all
/// connections are closed.  Do not reorder these fields.
///
/// `telemetry` holds shared counters and has no drop-order concern (atomics).
#[derive(Debug)]
pub struct EngineRuntime {
    // Shared counters; clones of this Arc are handed to the coordinator and
    // writer at open time. Drop order is irrelevant for it (atomics only).
    telemetry: Arc<TelemetryCounters>,
    // Read side (reader connection pool). MUST drop before `writer` — see
    // the drop-order invariant above.
    coordinator: ExecutionCoordinator,
    // Write side (writer thread + its connection). Drops after all readers so
    // its close is the last one and triggers the WAL checkpoint/cleanup.
    writer: WriterActor,
    admin: AdminHandle,
    // Exclusive on-disk lock; MUST drop last, after every connection closes.
    _lock: DatabaseLock,
}
32
33// Required by #[pyclass(frozen)] — guards against future fields breaking thread safety.
34#[allow(clippy::used_underscore_items)]
35const _: () = {
36    fn _assert_send_sync<T: Send + Sync>() {}
37    fn _check() {
38        _assert_send_sync::<EngineRuntime>();
39    }
40};
41
42impl EngineRuntime {
43    /// # Errors
44    /// Returns [`EngineError`] if the database connection cannot be opened, schema bootstrap fails,
45    /// or the writer actor cannot be started.
46    pub fn open(
47        path: impl AsRef<Path>,
48        provenance_mode: ProvenanceMode,
49        vector_dimension: Option<usize>,
50        read_pool_size: usize,
51        telemetry_level: TelemetryLevel,
52    ) -> Result<Self, EngineError> {
53        let lock = DatabaseLock::acquire(path.as_ref())?;
54
55        if read_pool_size == 0 {
56            return Err(EngineError::InvalidConfig(
57                "read_pool_size must be >= 1, got 0".to_owned(),
58            ));
59        }
60
61        trace_info!(
62            path = %path.as_ref().display(),
63            provenance_mode = ?provenance_mode,
64            vector_dimension = ?vector_dimension,
65            read_pool_size,
66            telemetry_level = ?telemetry_level,
67            "engine opening"
68        );
69        let _ = telemetry_level; // Used by trace_info! when tracing feature is active
70        let telemetry = Arc::new(TelemetryCounters::default());
71        let schema_manager = Arc::new(SchemaManager::new());
72        let coordinator = ExecutionCoordinator::open(
73            path.as_ref(),
74            Arc::clone(&schema_manager),
75            vector_dimension,
76            read_pool_size,
77            Arc::clone(&telemetry),
78        )?;
79        let writer = WriterActor::start(
80            path.as_ref(),
81            Arc::clone(&schema_manager),
82            provenance_mode,
83            Arc::clone(&telemetry),
84        )?;
85        let admin = AdminHandle::new(AdminService::new(path.as_ref(), schema_manager));
86
87        trace_info!(path = %path.as_ref().display(), "engine opened");
88        Ok(Self {
89            telemetry,
90            coordinator,
91            writer,
92            admin,
93            _lock: lock,
94        })
95    }
96
97    pub fn coordinator(&self) -> &ExecutionCoordinator {
98        &self.coordinator
99    }
100
101    pub fn writer(&self) -> &WriterActor {
102        &self.writer
103    }
104
105    pub fn admin(&self) -> &AdminHandle {
106        &self.admin
107    }
108
109    /// Shared telemetry counters for incrementing from the public API layer.
110    pub fn telemetry(&self) -> &Arc<TelemetryCounters> {
111        &self.telemetry
112    }
113
114    /// Read all telemetry counters and aggregate `SQLite` cache status across
115    /// the reader pool.
116    #[must_use]
117    pub fn telemetry_snapshot(&self) -> TelemetrySnapshot {
118        let mut snapshot = self.telemetry.snapshot();
119        snapshot.sqlite_cache = self.coordinator.aggregate_cache_status();
120        snapshot
121    }
122}
123
#[cfg(test)]
#[allow(clippy::expect_used)]
mod tests {
    use std::path::Path;
    use std::sync::Arc;

    use fathomdb_query::QueryBuilder;

    use crate::{
        ChunkInsert, ChunkPolicy, NodeInsert, ProvenanceMode, TelemetryLevel, WriteRequest,
    };

    use super::EngineRuntime;

    /// Open a runtime at `path` with the test-default configuration:
    /// Warn provenance, no vector index, 4 readers, counter-level telemetry.
    fn open_default(path: &Path) -> Result<EngineRuntime, crate::EngineError> {
        EngineRuntime::open(
            path,
            ProvenanceMode::Warn,
            None,
            4,
            TelemetryLevel::Counters,
        )
    }

    /// Build a write request containing one `Test`-kind node plus the given
    /// chunks. All other request sections are empty.
    fn test_node_request(
        label: &str,
        row_id: &str,
        logical_id: &str,
        properties: &str,
        chunks: Vec<ChunkInsert>,
    ) -> WriteRequest {
        WriteRequest {
            label: label.to_owned(),
            nodes: vec![NodeInsert {
                row_id: row_id.to_owned(),
                logical_id: logical_id.to_owned(),
                kind: "Test".to_owned(),
                properties: properties.to_owned(),
                source_ref: Some("test".to_owned()),
                upsert: false,
                chunk_policy: ChunkPolicy::Preserve,
                content_ref: None,
            }],
            node_retires: vec![],
            edges: vec![],
            edge_retires: vec![],
            chunks,
            runs: vec![],
            steps: vec![],
            actions: vec![],
            optional_backfills: vec![],
            vec_inserts: vec![],
            operational_writes: vec![],
        }
    }

    /// The single chunk attached to node `t:1` in the seeded fixture.
    fn seed_chunk() -> ChunkInsert {
        ChunkInsert {
            id: "c1".to_owned(),
            node_logical_id: "t:1".to_owned(),
            text_content: "hello world".to_owned(),
            byte_start: None,
            byte_end: None,
            content_hash: None,
        }
    }

    /// Helper: create a seeded runtime with one node and one chunk.
    /// The `TempDir` is returned so the database outlives the runtime.
    fn seeded_runtime() -> (tempfile::TempDir, EngineRuntime) {
        let dir = tempfile::tempdir().expect("temp dir");
        let runtime = open_default(&dir.path().join("test.db")).expect("open");

        runtime
            .writer()
            .submit(test_node_request(
                "seed",
                "r1",
                "t:1",
                r#"{"v":1}"#,
                vec![seed_chunk()],
            ))
            .expect("seed write");

        (dir, runtime)
    }

    /// Issue #30: the engine must support concurrent reads from multiple threads.
    #[test]
    fn concurrent_reads_from_multiple_threads() {
        let (_dir, runtime) = seeded_runtime();
        let runtime = Arc::new(runtime);

        let compiled = QueryBuilder::nodes("Test")
            .limit(10)
            .compile()
            .expect("compile");

        let handles: Vec<_> = (0..4)
            .map(|_| {
                let rt = Arc::clone(&runtime);
                let q = compiled.clone();
                std::thread::spawn(move || {
                    let rows = rt
                        .coordinator()
                        .execute_compiled_read(&q)
                        .expect("query succeeds");
                    assert_eq!(rows.nodes.len(), 1);
                })
            })
            .collect();

        for h in handles {
            h.join().expect("worker thread panicked");
        }
    }

    #[test]
    fn open_same_database_twice_returns_database_locked() {
        let dir = tempfile::tempdir().expect("temp dir");
        let db_path = dir.path().join("test.db");

        let _first = open_default(&db_path).expect("open");
        let second = open_default(&db_path);

        assert!(second.is_err(), "second open must fail");
        let err = second.expect_err("second open must fail");
        assert!(
            matches!(err, crate::EngineError::DatabaseLocked(_)),
            "expected DatabaseLocked, got: {err:?}"
        );
        assert!(
            err.to_string().contains("already in use"),
            "error must mention 'already in use': {err}"
        );
    }

    #[test]
    fn database_reopens_after_drop() {
        let dir = tempfile::tempdir().expect("temp dir");
        let db_path = dir.path().join("test.db");

        {
            let _runtime = open_default(&db_path).expect("first open");
        }

        let runtime = open_default(&db_path).expect("reopen");
        let compiled = QueryBuilder::nodes("Test")
            .limit(10)
            .compile()
            .expect("compile");
        let rows = runtime
            .coordinator()
            .execute_compiled_read(&compiled)
            .expect("query");
        assert!(rows.nodes.is_empty());
    }

    #[test]
    fn lock_error_includes_pid() {
        let dir = tempfile::tempdir().expect("temp dir");
        let db_path = dir.path().join("test.db");

        let _first = open_default(&db_path).expect("open");
        let err = open_default(&db_path).expect_err("second open must fail");

        let msg = err.to_string();
        assert!(
            msg.contains("already in use"),
            "error must mention 'already in use': {msg}"
        );
        // PID is best-effort; on Windows exclusive locks prevent reading the
        // lock file from a second handle.
        if cfg!(unix) {
            let our_pid = std::process::id().to_string();
            assert!(
                msg.contains(&our_pid),
                "error must contain holder pid {our_pid}: {msg}"
            );
        }
    }

    /// Verify that dropping `EngineRuntime` joins the writer thread and triggers
    /// `SQLite`'s automatic passive WAL checkpoint (readers drop before writer).
    #[test]
    fn drop_joins_writer_and_checkpoints_wal() {
        let dir = tempfile::tempdir().expect("temp dir");
        let db_path = dir.path().join("test.db");
        let wal_path = dir.path().join("test.db-wal");

        {
            let runtime = open_default(&db_path).expect("open");

            runtime
                .writer()
                .submit(test_node_request("seed", "r1", "t:1", r#"{"v":1}"#, vec![]))
                .expect("seed write");
        }
        // After drop: WAL should be checkpointed and removed.
        assert!(
            !wal_path.exists(),
            "WAL file should be cleaned up after graceful drop"
        );

        // Reopen and verify data persists.
        let runtime = open_default(&db_path).expect("reopen");
        let compiled = QueryBuilder::nodes("Test")
            .limit(10)
            .compile()
            .expect("compile");
        let rows = runtime
            .coordinator()
            .execute_compiled_read(&compiled)
            .expect("query");
        assert_eq!(rows.nodes.len(), 1);
    }

    #[test]
    fn telemetry_counts_queries() {
        let (_dir, runtime) = seeded_runtime();
        let compiled = QueryBuilder::nodes("Test")
            .limit(10)
            .compile()
            .expect("compile");

        for _ in 0..3 {
            runtime
                .coordinator()
                .execute_compiled_read(&compiled)
                .expect("query");
        }

        let snap = runtime.telemetry_snapshot();
        assert!(
            snap.queries_total >= 3,
            "expected at least 3 queries, got {}",
            snap.queries_total,
        );
    }

    #[test]
    fn telemetry_counts_writes() {
        let (_dir, runtime) = seeded_runtime();

        // seeded_runtime already submitted 1 write
        runtime
            .writer()
            .submit(test_node_request("second", "r2", "t:2", r#"{"v":2}"#, vec![]))
            .expect("second write");

        let snap = runtime.telemetry_snapshot();
        assert!(
            snap.writes_total >= 2,
            "expected at least 2 writes, got {}",
            snap.writes_total,
        );
    }

    #[test]
    fn telemetry_counts_write_rows() {
        let (_dir, runtime) = seeded_runtime();
        // The seed write has 1 node + 1 chunk = 2 rows
        let snap = runtime.telemetry_snapshot();
        assert!(
            snap.write_rows_total >= 2,
            "expected at least 2 write rows, got {}",
            snap.write_rows_total,
        );
    }

    #[test]
    fn telemetry_snapshot_includes_cache_status() {
        let (_dir, runtime) = seeded_runtime();
        let compiled = QueryBuilder::nodes("Test")
            .limit(10)
            .compile()
            .expect("compile");

        // Run several queries to exercise the page cache.
        for _ in 0..5 {
            runtime
                .coordinator()
                .execute_compiled_read(&compiled)
                .expect("query");
        }

        let snap = runtime.telemetry_snapshot();
        assert!(
            snap.sqlite_cache.cache_hits + snap.sqlite_cache.cache_misses > 0,
            "expected cache activity, got hits={} misses={}",
            snap.sqlite_cache.cache_hits,
            snap.sqlite_cache.cache_misses,
        );
    }
}
535}