// fathomdb_engine/src/runtime.rs

1use std::path::Path;
2use std::sync::Arc;
3
4use fathomdb_schema::SchemaManager;
5
6use crate::{
7    AdminHandle, AdminService, EngineError, ExecutionCoordinator, ProvenanceMode, QueryEmbedder,
8    WriterActor,
9    database_lock::DatabaseLock,
10    telemetry::{TelemetryCounters, TelemetryLevel, TelemetrySnapshot},
11};
12
/// Core engine runtime.
///
/// # Drop order invariant
///
/// Fields are ordered so that `coordinator` (reader connections) drops before
/// `writer` (writer thread + connection).  This ensures the writer's
/// `sqlite3_close()` is the last connection to the database, which triggers
/// `SQLite`'s automatic passive WAL checkpoint and WAL/shm file cleanup.
/// `_lock` drops last so the exclusive file lock is released only after all
/// connections are closed.  Do not reorder these fields.
///
/// `telemetry` holds shared counters and has no drop-order concern (atomics).
#[derive(Debug)]
pub struct EngineRuntime {
    // Shared atomic counters; clones of this Arc are handed to both the
    // coordinator and the writer in `open()`. No drop-order concern.
    telemetry: Arc<TelemetryCounters>,
    // Reader pool — must drop BEFORE `writer` (see invariant above).
    coordinator: ExecutionCoordinator,
    // Writer thread + its connection — last SQLite connection to close.
    writer: WriterActor,
    admin: AdminHandle,
    // Exclusive file lock — drops last, after every connection is closed.
    _lock: DatabaseLock,
}
33
34// Required by #[pyclass(frozen)] — guards against future fields breaking thread safety.
35#[allow(clippy::used_underscore_items)]
36const _: () = {
37    fn _assert_send_sync<T: Send + Sync>() {}
38    fn _check() {
39        _assert_send_sync::<EngineRuntime>();
40    }
41};
42
43impl EngineRuntime {
44    /// # Errors
45    /// Returns [`EngineError`] if the database connection cannot be opened, schema bootstrap fails,
46    /// or the writer actor cannot be started.
47    pub fn open(
48        path: impl AsRef<Path>,
49        provenance_mode: ProvenanceMode,
50        vector_dimension: Option<usize>,
51        read_pool_size: usize,
52        telemetry_level: TelemetryLevel,
53        query_embedder: Option<Arc<dyn QueryEmbedder>>,
54    ) -> Result<Self, EngineError> {
55        let lock = DatabaseLock::acquire(path.as_ref())?;
56
57        if read_pool_size == 0 {
58            return Err(EngineError::InvalidConfig(
59                "read_pool_size must be >= 1, got 0".to_owned(),
60            ));
61        }
62
63        trace_info!(
64            path = %path.as_ref().display(),
65            provenance_mode = ?provenance_mode,
66            vector_dimension = ?vector_dimension,
67            read_pool_size,
68            telemetry_level = ?telemetry_level,
69            "engine opening"
70        );
71        let _ = telemetry_level; // Used by trace_info! when tracing feature is active
72        let telemetry = Arc::new(TelemetryCounters::default());
73        let schema_manager = Arc::new(SchemaManager::new());
74        let coordinator = ExecutionCoordinator::open(
75            path.as_ref(),
76            Arc::clone(&schema_manager),
77            vector_dimension,
78            read_pool_size,
79            Arc::clone(&telemetry),
80            query_embedder,
81        )?;
82        let writer = WriterActor::start(
83            path.as_ref(),
84            Arc::clone(&schema_manager),
85            provenance_mode,
86            Arc::clone(&telemetry),
87        )?;
88        let admin = AdminHandle::new(AdminService::new(path.as_ref(), schema_manager));
89
90        trace_info!(path = %path.as_ref().display(), "engine opened");
91        Ok(Self {
92            telemetry,
93            coordinator,
94            writer,
95            admin,
96            _lock: lock,
97        })
98    }
99
100    pub fn coordinator(&self) -> &ExecutionCoordinator {
101        &self.coordinator
102    }
103
104    pub fn writer(&self) -> &WriterActor {
105        &self.writer
106    }
107
108    pub fn admin(&self) -> &AdminHandle {
109        &self.admin
110    }
111
112    /// Shared telemetry counters for incrementing from the public API layer.
113    pub fn telemetry(&self) -> &Arc<TelemetryCounters> {
114        &self.telemetry
115    }
116
117    /// Read all telemetry counters and aggregate `SQLite` cache status across
118    /// the reader pool.
119    #[must_use]
120    pub fn telemetry_snapshot(&self) -> TelemetrySnapshot {
121        let mut snapshot = self.telemetry.snapshot();
122        snapshot.sqlite_cache = self.coordinator.aggregate_cache_status();
123        snapshot
124    }
125}
126
#[cfg(test)]
#[allow(clippy::expect_used)]
mod tests {
    use std::sync::Arc;

    use fathomdb_query::QueryBuilder;

    use crate::{
        ChunkInsert, ChunkPolicy, NodeInsert, ProvenanceMode, TelemetryLevel, WriteRequest,
    };

    use super::EngineRuntime;

    /// Open an engine at `path` with the standard test configuration:
    /// warn-level provenance, no vector index, 4 readers, counter-level
    /// telemetry, no query embedder.
    fn open_at(path: &std::path::Path) -> Result<EngineRuntime, crate::EngineError> {
        EngineRuntime::open(
            path,
            ProvenanceMode::Warn,
            None,
            4,
            TelemetryLevel::Counters,
            None,
        )
    }

    /// Build a write request inserting one `Test` node, optionally with a
    /// single chunk attached to it.
    fn write_request(
        label: &str,
        row_id: &str,
        logical_id: &str,
        properties: &str,
        with_chunk: bool,
    ) -> WriteRequest {
        WriteRequest {
            label: label.to_owned(),
            nodes: vec![NodeInsert {
                row_id: row_id.to_owned(),
                logical_id: logical_id.to_owned(),
                kind: "Test".to_owned(),
                properties: properties.to_owned(),
                source_ref: Some("test".to_owned()),
                upsert: false,
                chunk_policy: ChunkPolicy::Preserve,
                content_ref: None,
            }],
            node_retires: vec![],
            edges: vec![],
            edge_retires: vec![],
            chunks: if with_chunk {
                vec![ChunkInsert {
                    id: "c1".to_owned(),
                    node_logical_id: logical_id.to_owned(),
                    text_content: "hello world".to_owned(),
                    byte_start: None,
                    byte_end: None,
                    content_hash: None,
                }]
            } else {
                vec![]
            },
            runs: vec![],
            steps: vec![],
            actions: vec![],
            optional_backfills: vec![],
            vec_inserts: vec![],
            operational_writes: vec![],
        }
    }

    /// Helper: create a seeded runtime with one node and one chunk.
    fn seeded_runtime() -> (tempfile::TempDir, EngineRuntime) {
        let dir = tempfile::tempdir().expect("temp dir");
        let runtime = open_at(&dir.path().join("test.db")).expect("open");

        runtime
            .writer()
            .submit(write_request("seed", "r1", "t:1", r#"{"v":1}"#, true))
            .expect("seed write");

        (dir, runtime)
    }

    /// Issue #30: the engine must support concurrent reads from multiple threads.
    #[test]
    fn concurrent_reads_from_multiple_threads() {
        let (_dir, runtime) = seeded_runtime();
        let runtime = Arc::new(runtime);

        let compiled = QueryBuilder::nodes("Test")
            .limit(10)
            .compile()
            .expect("compile");

        let handles: Vec<_> = (0..4)
            .map(|_| {
                let rt = Arc::clone(&runtime);
                let q = compiled.clone();
                std::thread::spawn(move || {
                    let rows = rt
                        .coordinator()
                        .execute_compiled_read(&q)
                        .expect("query succeeds");
                    assert_eq!(rows.nodes.len(), 1);
                })
            })
            .collect();

        for h in handles {
            h.join().expect("worker thread panicked");
        }
    }

    #[test]
    fn open_same_database_twice_returns_database_locked() {
        let dir = tempfile::tempdir().expect("temp dir");
        let db_path = dir.path().join("test.db");

        let _first = open_at(&db_path).expect("open");
        // `expect_err` both asserts failure and yields the error.
        let err = open_at(&db_path).expect_err("second open must fail");

        assert!(
            matches!(err, crate::EngineError::DatabaseLocked(_)),
            "expected DatabaseLocked, got: {err:?}"
        );
        assert!(
            err.to_string().contains("already in use"),
            "error must mention 'already in use': {err}"
        );
    }

    #[test]
    fn database_reopens_after_drop() {
        let dir = tempfile::tempdir().expect("temp dir");
        let db_path = dir.path().join("test.db");

        {
            let _runtime = open_at(&db_path).expect("first open");
        }

        let runtime = open_at(&db_path).expect("reopen");
        let compiled = QueryBuilder::nodes("Test")
            .limit(10)
            .compile()
            .expect("compile");
        let rows = runtime
            .coordinator()
            .execute_compiled_read(&compiled)
            .expect("query");
        assert!(rows.nodes.is_empty());
    }

    #[test]
    fn lock_error_includes_pid() {
        let dir = tempfile::tempdir().expect("temp dir");
        let db_path = dir.path().join("test.db");

        let _first = open_at(&db_path).expect("open");
        let err = open_at(&db_path).expect_err("second open must fail");

        let msg = err.to_string();
        assert!(
            msg.contains("already in use"),
            "error must mention 'already in use': {msg}"
        );
        // PID is best-effort; on Windows exclusive locks prevent reading the
        // lock file from a second handle.
        if cfg!(unix) {
            let our_pid = std::process::id().to_string();
            assert!(
                msg.contains(&our_pid),
                "error must contain holder pid {our_pid}: {msg}"
            );
        }
    }

    /// Verify that dropping `EngineRuntime` joins the writer thread and triggers
    /// `SQLite`'s automatic passive WAL checkpoint (readers drop before writer).
    #[test]
    fn drop_joins_writer_and_checkpoints_wal() {
        let dir = tempfile::tempdir().expect("temp dir");
        let db_path = dir.path().join("test.db");
        let wal_path = dir.path().join("test.db-wal");

        {
            let runtime = open_at(&db_path).expect("open");

            runtime
                .writer()
                .submit(write_request("seed", "r1", "t:1", r#"{"v":1}"#, false))
                .expect("seed write");
        }
        // After drop: WAL should be checkpointed and removed.
        assert!(
            !wal_path.exists(),
            "WAL file should be cleaned up after graceful drop"
        );

        // Reopen and verify data persists.
        let runtime = open_at(&db_path).expect("reopen");
        let compiled = QueryBuilder::nodes("Test")
            .limit(10)
            .compile()
            .expect("compile");
        let rows = runtime
            .coordinator()
            .execute_compiled_read(&compiled)
            .expect("query");
        assert_eq!(rows.nodes.len(), 1);
    }

    #[test]
    fn telemetry_counts_queries() {
        let (_dir, runtime) = seeded_runtime();
        let compiled = QueryBuilder::nodes("Test")
            .limit(10)
            .compile()
            .expect("compile");

        for _ in 0..3 {
            runtime
                .coordinator()
                .execute_compiled_read(&compiled)
                .expect("query");
        }

        let snap = runtime.telemetry_snapshot();
        assert!(
            snap.queries_total >= 3,
            "expected at least 3 queries, got {}",
            snap.queries_total,
        );
    }

    #[test]
    fn telemetry_counts_writes() {
        let (_dir, runtime) = seeded_runtime();

        // seeded_runtime already submitted 1 write
        runtime
            .writer()
            .submit(write_request("second", "r2", "t:2", r#"{"v":2}"#, false))
            .expect("second write");

        let snap = runtime.telemetry_snapshot();
        assert!(
            snap.writes_total >= 2,
            "expected at least 2 writes, got {}",
            snap.writes_total,
        );
    }

    #[test]
    fn telemetry_counts_write_rows() {
        let (_dir, runtime) = seeded_runtime();
        // The seed write has 1 node + 1 chunk = 2 rows
        let snap = runtime.telemetry_snapshot();
        assert!(
            snap.write_rows_total >= 2,
            "expected at least 2 write rows, got {}",
            snap.write_rows_total,
        );
    }

    #[test]
    fn telemetry_snapshot_includes_cache_status() {
        let (_dir, runtime) = seeded_runtime();
        let compiled = QueryBuilder::nodes("Test")
            .limit(10)
            .compile()
            .expect("compile");

        // Run several queries to exercise the page cache.
        for _ in 0..5 {
            runtime
                .coordinator()
                .execute_compiled_read(&compiled)
                .expect("query");
        }

        let snap = runtime.telemetry_snapshot();
        assert!(
            snap.sqlite_cache.cache_hits + snap.sqlite_cache.cache_misses > 0,
            "expected cache activity, got hits={} misses={}",
            snap.sqlite_cache.cache_hits,
            snap.sqlite_cache.cache_misses,
        );
    }
}