// fathomdb_engine/runtime.rs
1use std::path::Path;
2use std::sync::Arc;
3
4use fathomdb_schema::SchemaManager;
5
6use crate::{
7    AdminHandle, AdminService, EngineError, ExecutionCoordinator, ProvenanceMode, WriterActor,
8    database_lock::DatabaseLock,
9    telemetry::{TelemetryCounters, TelemetryLevel, TelemetrySnapshot},
10};
11
/// Core engine runtime.
///
/// # Drop order invariant
///
/// Fields are ordered so that `coordinator` (reader connections) drops before
/// `writer` (writer thread + connection).  This ensures the writer's
/// `sqlite3_close()` is the last connection to the database, which triggers
/// `SQLite`'s automatic passive WAL checkpoint and WAL/shm file cleanup.
/// `_lock` drops last so the exclusive file lock is released only after all
/// connections are closed.  Do not reorder these fields.
///
/// `telemetry` holds shared counters and has no drop-order concern (atomics).
#[derive(Debug)]
pub struct EngineRuntime {
    // Shared counters; clones are handed to the coordinator and writer in `open`.
    telemetry: Arc<TelemetryCounters>,
    // Reader connection pool — must drop before `writer` (see invariant above).
    coordinator: ExecutionCoordinator,
    // Writer thread + its connection; closing it last triggers the WAL checkpoint.
    writer: WriterActor,
    // Handle for administrative operations.
    admin: AdminHandle,
    // Exclusive database file lock — must drop last, after all connections close.
    _lock: DatabaseLock,
}
32
33// Required by #[pyclass(frozen)] — guards against future fields breaking thread safety.
34#[allow(clippy::used_underscore_items)]
35const _: () = {
36    fn _assert_send_sync<T: Send + Sync>() {}
37    fn _check() {
38        _assert_send_sync::<EngineRuntime>();
39    }
40};
41
42impl EngineRuntime {
43    /// # Errors
44    /// Returns [`EngineError`] if the database connection cannot be opened, schema bootstrap fails,
45    /// or the writer actor cannot be started.
46    pub fn open(
47        path: impl AsRef<Path>,
48        provenance_mode: ProvenanceMode,
49        vector_dimension: Option<usize>,
50        read_pool_size: usize,
51        telemetry_level: TelemetryLevel,
52    ) -> Result<Self, EngineError> {
53        let lock = DatabaseLock::acquire(path.as_ref())?;
54
55        if read_pool_size == 0 {
56            return Err(EngineError::InvalidConfig(
57                "read_pool_size must be >= 1, got 0".to_owned(),
58            ));
59        }
60
61        trace_info!(
62            path = %path.as_ref().display(),
63            provenance_mode = ?provenance_mode,
64            vector_dimension = ?vector_dimension,
65            read_pool_size,
66            telemetry_level = ?telemetry_level,
67            "engine opening"
68        );
69        let _ = telemetry_level; // Used by trace_info! when tracing feature is active
70        let telemetry = Arc::new(TelemetryCounters::default());
71        let schema_manager = Arc::new(SchemaManager::new());
72        let coordinator = ExecutionCoordinator::open(
73            path.as_ref(),
74            Arc::clone(&schema_manager),
75            vector_dimension,
76            read_pool_size,
77            Arc::clone(&telemetry),
78        )?;
79        let writer = WriterActor::start(
80            path.as_ref(),
81            Arc::clone(&schema_manager),
82            provenance_mode,
83            Arc::clone(&telemetry),
84        )?;
85        let admin = AdminHandle::new(AdminService::new(path.as_ref(), schema_manager));
86
87        trace_info!(path = %path.as_ref().display(), "engine opened");
88        Ok(Self {
89            telemetry,
90            coordinator,
91            writer,
92            admin,
93            _lock: lock,
94        })
95    }
96
97    pub fn coordinator(&self) -> &ExecutionCoordinator {
98        &self.coordinator
99    }
100
101    pub fn writer(&self) -> &WriterActor {
102        &self.writer
103    }
104
105    pub fn admin(&self) -> &AdminHandle {
106        &self.admin
107    }
108
109    /// Shared telemetry counters for incrementing from the public API layer.
110    pub fn telemetry(&self) -> &Arc<TelemetryCounters> {
111        &self.telemetry
112    }
113
114    /// Read all telemetry counters and aggregate `SQLite` cache status across
115    /// the reader pool.
116    #[must_use]
117    pub fn telemetry_snapshot(&self) -> TelemetrySnapshot {
118        let mut snapshot = self.telemetry.snapshot();
119        snapshot.sqlite_cache = self.coordinator.aggregate_cache_status();
120        snapshot
121    }
122}
123
#[cfg(test)]
#[allow(clippy::expect_used)]
mod tests {
    use std::path::Path;
    use std::sync::Arc;

    use fathomdb_query::QueryBuilder;

    use crate::{
        ChunkInsert, ChunkPolicy, EngineError, NodeInsert, ProvenanceMode, TelemetryLevel,
        WriteRequest,
    };

    use super::EngineRuntime;

    /// Open a runtime with the standard test configuration:
    /// warn-only provenance, no vectors, 4 readers, counter telemetry.
    fn open_runtime(path: &Path) -> Result<EngineRuntime, EngineError> {
        EngineRuntime::open(
            path,
            ProvenanceMode::Warn,
            None,
            4,
            TelemetryLevel::Counters,
        )
    }

    /// Build a `Test`-kind node insert with the given ids and property value.
    fn test_node(row_id: &str, logical_id: &str, value: u32) -> NodeInsert {
        NodeInsert {
            row_id: row_id.to_owned(),
            logical_id: logical_id.to_owned(),
            kind: "Test".to_owned(),
            properties: format!(r#"{{"v":{value}}}"#),
            source_ref: Some("test".to_owned()),
            upsert: false,
            chunk_policy: ChunkPolicy::Preserve,
        }
    }

    /// Build a write request carrying only `nodes` and `chunks`; every other
    /// payload list is empty.
    fn write_request(
        label: &str,
        nodes: Vec<NodeInsert>,
        chunks: Vec<ChunkInsert>,
    ) -> WriteRequest {
        WriteRequest {
            label: label.to_owned(),
            nodes,
            node_retires: vec![],
            edges: vec![],
            edge_retires: vec![],
            chunks,
            runs: vec![],
            steps: vec![],
            actions: vec![],
            optional_backfills: vec![],
            vec_inserts: vec![],
            operational_writes: vec![],
        }
    }

    /// Helper: create a seeded runtime with one node (`t:1`) and one chunk.
    fn seeded_runtime() -> (tempfile::TempDir, EngineRuntime) {
        let dir = tempfile::tempdir().expect("temp dir");
        let runtime = open_runtime(&dir.path().join("test.db")).expect("open");

        runtime
            .writer()
            .submit(write_request(
                "seed",
                vec![test_node("r1", "t:1", 1)],
                vec![ChunkInsert {
                    id: "c1".to_owned(),
                    node_logical_id: "t:1".to_owned(),
                    text_content: "hello world".to_owned(),
                    byte_start: None,
                    byte_end: None,
                }],
            ))
            .expect("seed write");

        (dir, runtime)
    }

    /// Issue #30: the engine must support concurrent reads from multiple threads.
    #[test]
    fn concurrent_reads_from_multiple_threads() {
        let (_dir, runtime) = seeded_runtime();
        let runtime = Arc::new(runtime);

        let compiled = QueryBuilder::nodes("Test")
            .limit(10)
            .compile()
            .expect("compile");

        let handles: Vec<_> = (0..4)
            .map(|_| {
                let rt = Arc::clone(&runtime);
                let q = compiled.clone();
                std::thread::spawn(move || {
                    let rows = rt
                        .coordinator()
                        .execute_compiled_read(&q)
                        .expect("query succeeds");
                    assert_eq!(rows.nodes.len(), 1);
                })
            })
            .collect();

        for h in handles {
            h.join().expect("worker thread panicked");
        }
    }

    #[test]
    fn open_same_database_twice_returns_database_locked() {
        let dir = tempfile::tempdir().expect("temp dir");
        let db_path = dir.path().join("test.db");

        let _first = open_runtime(&db_path).expect("open");
        let second = open_runtime(&db_path);

        assert!(second.is_err(), "second open must fail");
        let err = second.expect_err("second open must fail");
        assert!(
            matches!(err, EngineError::DatabaseLocked(_)),
            "expected DatabaseLocked, got: {err:?}"
        );
        assert!(
            err.to_string().contains("already in use"),
            "error must mention 'already in use': {err}"
        );
    }

    #[test]
    fn database_reopens_after_drop() {
        let dir = tempfile::tempdir().expect("temp dir");
        let db_path = dir.path().join("test.db");

        {
            let _runtime = open_runtime(&db_path).expect("first open");
        }

        let runtime = open_runtime(&db_path).expect("reopen");
        let compiled = QueryBuilder::nodes("Test")
            .limit(10)
            .compile()
            .expect("compile");
        let rows = runtime
            .coordinator()
            .execute_compiled_read(&compiled)
            .expect("query");
        assert!(rows.nodes.is_empty());
    }

    #[test]
    fn lock_error_includes_pid() {
        let dir = tempfile::tempdir().expect("temp dir");
        let db_path = dir.path().join("test.db");

        let _first = open_runtime(&db_path).expect("open");
        let err = open_runtime(&db_path).expect_err("second open must fail");

        let msg = err.to_string();
        assert!(
            msg.contains("already in use"),
            "error must mention 'already in use': {msg}"
        );
        // PID is best-effort; on Windows exclusive locks prevent reading the
        // lock file from a second handle.
        if cfg!(unix) {
            let our_pid = std::process::id().to_string();
            assert!(
                msg.contains(&our_pid),
                "error must contain holder pid {our_pid}: {msg}"
            );
        }
    }

    /// Verify that dropping `EngineRuntime` joins the writer thread and triggers
    /// `SQLite`'s automatic passive WAL checkpoint (readers drop before writer).
    #[test]
    fn drop_joins_writer_and_checkpoints_wal() {
        let dir = tempfile::tempdir().expect("temp dir");
        let db_path = dir.path().join("test.db");
        let wal_path = dir.path().join("test.db-wal");

        {
            let runtime = open_runtime(&db_path).expect("open");
            runtime
                .writer()
                .submit(write_request(
                    "seed",
                    vec![test_node("r1", "t:1", 1)],
                    vec![],
                ))
                .expect("seed write");
        }
        // After drop: WAL should be checkpointed and removed.
        assert!(
            !wal_path.exists(),
            "WAL file should be cleaned up after graceful drop"
        );

        // Reopen and verify data persists.
        let runtime = open_runtime(&db_path).expect("reopen");
        let compiled = QueryBuilder::nodes("Test")
            .limit(10)
            .compile()
            .expect("compile");
        let rows = runtime
            .coordinator()
            .execute_compiled_read(&compiled)
            .expect("query");
        assert_eq!(rows.nodes.len(), 1);
    }

    #[test]
    fn telemetry_counts_queries() {
        let (_dir, runtime) = seeded_runtime();
        let compiled = QueryBuilder::nodes("Test")
            .limit(10)
            .compile()
            .expect("compile");

        for _ in 0..3 {
            runtime
                .coordinator()
                .execute_compiled_read(&compiled)
                .expect("query");
        }

        let snap = runtime.telemetry_snapshot();
        assert!(
            snap.queries_total >= 3,
            "expected at least 3 queries, got {}",
            snap.queries_total,
        );
    }

    #[test]
    fn telemetry_counts_writes() {
        let (_dir, runtime) = seeded_runtime();

        // seeded_runtime already submitted 1 write
        runtime
            .writer()
            .submit(write_request(
                "second",
                vec![test_node("r2", "t:2", 2)],
                vec![],
            ))
            .expect("second write");

        let snap = runtime.telemetry_snapshot();
        assert!(
            snap.writes_total >= 2,
            "expected at least 2 writes, got {}",
            snap.writes_total,
        );
    }

    #[test]
    fn telemetry_counts_write_rows() {
        let (_dir, runtime) = seeded_runtime();
        // The seed write has 1 node + 1 chunk = 2 rows
        let snap = runtime.telemetry_snapshot();
        assert!(
            snap.write_rows_total >= 2,
            "expected at least 2 write rows, got {}",
            snap.write_rows_total,
        );
    }

    #[test]
    fn telemetry_snapshot_includes_cache_status() {
        let (_dir, runtime) = seeded_runtime();
        let compiled = QueryBuilder::nodes("Test")
            .limit(10)
            .compile()
            .expect("compile");

        // Run several queries to exercise the page cache.
        for _ in 0..5 {
            runtime
                .coordinator()
                .execute_compiled_read(&compiled)
                .expect("query");
        }

        let snap = runtime.telemetry_snapshot();
        assert!(
            snap.sqlite_cache.cache_hits + snap.sqlite_cache.cache_misses > 0,
            "expected cache activity, got hits={} misses={}",
            snap.sqlite_cache.cache_hits,
            snap.sqlite_cache.cache_misses,
        );
    }
}
529}