//! `fathomdb_engine` — core engine runtime (`runtime.rs`).

1use std::path::Path;
2use std::sync::Arc;
3use std::sync::atomic::AtomicBool;
4use std::sync::mpsc;
5
6use fathomdb_schema::SchemaManager;
7
8use crate::{
9    AdminHandle, AdminService, EngineError, ExecutionCoordinator, ProvenanceMode, QueryEmbedder,
10    VectorProjectionActor, WriterActor,
11    database_lock::DatabaseLock,
12    rebuild_actor::{RebuildActor, RebuildClient, RebuildRequest, recover_interrupted_rebuilds},
13    telemetry::{TelemetryCounters, TelemetryLevel, TelemetrySnapshot},
14};
15
/// Core engine runtime.
///
/// # Drop order invariant
///
/// Rust drops struct fields in declaration order, and shutdown correctness
/// relies on that order — **do not reorder these fields**.
///
/// * `coordinator` (reader connections) drops before `writer` (writer thread
///   + connection).  This makes the writer's `sqlite3_close()` the last
///   connection to the database, which triggers `SQLite`'s automatic passive
///   WAL checkpoint and WAL/shm file cleanup.
/// * `_vector_actor` drops after `writer` and before `_rebuild` so the vector
///   projection thread stops submitting writer messages before the writer
///   channel closes.  (In practice the writer is held in an `Arc` shared with
///   the admin service, so the final strong reference — and the writer itself
///   — only goes away once `admin` has dropped; effectively the admin handle
///   drops before the vector actor's join.)
/// * `_rebuild` drops before `_lock` so the rebuild thread's connection
///   closes before the exclusive file lock is released.
/// * `_lock` drops last, releasing the exclusive file lock only after all
///   connections are closed.
///
/// `telemetry` holds shared counters and has no drop-order concern (atomics).
#[derive(Debug)]
pub struct EngineRuntime {
    /// Shared atomic counters; safe in any drop position.
    telemetry: Arc<TelemetryCounters>,
    /// Reader connection pool + query execution; must drop before `writer`.
    coordinator: ExecutionCoordinator,
    /// Admin service handle; holds a clone of the `writer` `Arc`.
    admin: AdminHandle,
    /// Writer thread + connection, shared with the admin service.
    writer: Arc<WriterActor>,
    /// Background worker for `vector_projection_work`.  Held between
    /// `writer` and `_rebuild` in drop order.
    _vector_actor: VectorProjectionActor,
    /// Rebuild worker; its connection must close before `_lock` drops.
    _rebuild: RebuildActor,
    /// Exclusive database file lock; must drop last.
    _lock: DatabaseLock,
}
48
49// Required by #[pyclass(frozen)] — guards against future fields breaking thread safety.
50#[allow(clippy::used_underscore_items)]
51const _: () = {
52    fn _assert_send_sync<T: Send + Sync>() {}
53    fn _check() {
54        _assert_send_sync::<EngineRuntime>();
55    }
56};
57
impl EngineRuntime {
    /// Open (or create) the database at `path` and start the engine's
    /// background workers (writer, rebuild actor, vector projection actor).
    ///
    /// Acquires an exclusive file lock on the database first; the lock is
    /// held for the lifetime of the returned runtime, so a second concurrent
    /// open of the same path fails (see `DatabaseLock::acquire`).
    ///
    /// # Errors
    /// Returns [`EngineError`] if the database connection cannot be opened, schema bootstrap fails,
    /// or the writer actor cannot be started.
    pub fn open(
        path: impl AsRef<Path>,
        provenance_mode: ProvenanceMode,
        vector_dimension: Option<usize>,
        read_pool_size: usize,
        telemetry_level: TelemetryLevel,
        query_embedder: Option<Arc<dyn QueryEmbedder>>,
    ) -> Result<Self, EngineError> {
        let lock = DatabaseLock::acquire(path.as_ref())?;

        // Validated after the lock; an early return drops `lock`, which
        // releases the file lock again.
        if read_pool_size == 0 {
            return Err(EngineError::InvalidConfig(
                "read_pool_size must be >= 1, got 0".to_owned(),
            ));
        }

        trace_info!(
            path = %path.as_ref().display(),
            provenance_mode = ?provenance_mode,
            vector_dimension = ?vector_dimension,
            read_pool_size,
            telemetry_level = ?telemetry_level,
            "engine opening"
        );
        let _ = telemetry_level; // Used by trace_info! when tracing feature is active
        let telemetry = Arc::new(TelemetryCounters::default());
        let schema_manager = Arc::new(SchemaManager::new());
        // Reader side first: opens the read pool and bootstraps the schema.
        let coordinator = ExecutionCoordinator::open(
            path.as_ref(),
            Arc::clone(&schema_manager),
            vector_dimension,
            read_pool_size,
            Arc::clone(&telemetry),
            query_embedder,
        )?;
        // Ensure the sqlite-vec auto-extension is registered globally BEFORE
        // the writer thread opens its connection, so it can create/insert
        // into `vec_<kind>` virtual tables for vector projection apply
        // flows.  Registration is idempotent at the SQLite level.
        #[cfg(feature = "sqlite-vec")]
        {
            let _prime = crate::sqlite::open_connection_with_vec(path.as_ref())?;
        }

        let writer = Arc::new(WriterActor::start(
            path.as_ref(),
            Arc::clone(&schema_manager),
            provenance_mode,
            Arc::clone(&telemetry),
        )?);

        // Crash recovery: PENDING is durable queued work and survives restart.
        // BUILDING/SWAPPING were in-flight during a crash and are marked FAILED.
        {
            // Short-lived connection, closed again at end of this scope.
            let recovery_conn = crate::sqlite::open_connection(path.as_ref())?;
            recover_interrupted_rebuilds(&recovery_conn)?;
        }

        // Rebuild actor: create channel, start thread, pass sender to AdminService.
        let (rebuild_sender, rebuild_receiver) = mpsc::sync_channel::<RebuildRequest>(64);
        let rebuild_shutdown = Arc::new(AtomicBool::new(false));
        let rebuild_actor = RebuildActor::start(
            path.as_ref(),
            Arc::clone(&schema_manager),
            rebuild_receiver,
            Arc::clone(&rebuild_shutdown),
        )?;
        let rebuild_client = RebuildClient::new(rebuild_sender, rebuild_shutdown);
        let admin = AdminHandle::new(AdminService::new_with_engine(
            path.as_ref(),
            schema_manager,
            rebuild_client,
            Arc::clone(&writer),
        ));
        let vector_actor = VectorProjectionActor::start(writer.as_ref())?;

        trace_info!(path = %path.as_ref().display(), "engine opened");
        // NOTE: field order in this struct literal mirrors the declaration
        // order, which encodes the drop-order invariant documented on the type.
        Ok(Self {
            telemetry,
            coordinator,
            admin,
            writer,
            _vector_actor: vector_actor,
            _rebuild: rebuild_actor,
            _lock: lock,
        })
    }

    /// Read-side execution coordinator (reader connection pool).
    pub fn coordinator(&self) -> &ExecutionCoordinator {
        &self.coordinator
    }

    /// Borrowed handle to the writer actor for submitting write requests.
    pub fn writer(&self) -> &WriterActor {
        &self.writer
    }

    /// Cloneable shared handle to the writer actor.
    #[must_use]
    pub fn writer_arc(&self) -> Arc<WriterActor> {
        Arc::clone(&self.writer)
    }

    /// Handle to the admin service (schema/rebuild administration).
    pub fn admin(&self) -> &AdminHandle {
        &self.admin
    }

    /// Shared telemetry counters for incrementing from the public API layer.
    pub fn telemetry(&self) -> &Arc<TelemetryCounters> {
        &self.telemetry
    }

    /// Read all telemetry counters and aggregate `SQLite` cache status across
    /// the reader pool.
    #[must_use]
    pub fn telemetry_snapshot(&self) -> TelemetrySnapshot {
        let mut snapshot = self.telemetry.snapshot();
        snapshot.sqlite_cache = self.coordinator.aggregate_cache_status();
        snapshot
    }
}
182
#[cfg(test)]
#[allow(clippy::expect_used)]
mod tests {
    use std::sync::Arc;

    use fathomdb_query::QueryBuilder;

    use crate::{
        ChunkInsert, ChunkPolicy, NodeInsert, ProvenanceMode, TelemetryLevel, WriteRequest,
    };

    use super::EngineRuntime;

    /// Helper: open a runtime with the standard test configuration
    /// (Warn provenance, no vector dimension, 4 readers, counter telemetry).
    fn open_runtime(
        path: impl AsRef<std::path::Path>,
    ) -> Result<EngineRuntime, crate::EngineError> {
        EngineRuntime::open(
            path,
            ProvenanceMode::Warn,
            None,
            4,
            TelemetryLevel::Counters,
            None,
        )
    }

    /// Helper: build a single-node write request.
    ///
    /// `seq` parameterizes the row id (`r{seq}`), logical id (`t:{seq}`) and
    /// the JSON property value; `with_chunk` attaches one "hello world" chunk
    /// to the node (1 node + 1 chunk = 2 write rows).
    fn seed_request(label: &str, seq: u32, with_chunk: bool) -> WriteRequest {
        let chunks = if with_chunk {
            vec![ChunkInsert {
                id: "c1".to_owned(),
                node_logical_id: format!("t:{seq}"),
                text_content: "hello world".to_owned(),
                byte_start: None,
                byte_end: None,
                content_hash: None,
            }]
        } else {
            vec![]
        };
        WriteRequest {
            label: label.to_owned(),
            nodes: vec![NodeInsert {
                row_id: format!("r{seq}"),
                logical_id: format!("t:{seq}"),
                kind: "Test".to_owned(),
                properties: format!(r#"{{"v":{seq}}}"#),
                source_ref: Some("test".to_owned()),
                upsert: false,
                chunk_policy: ChunkPolicy::Preserve,
                content_ref: None,
            }],
            node_retires: vec![],
            edges: vec![],
            edge_retires: vec![],
            chunks,
            runs: vec![],
            steps: vec![],
            actions: vec![],
            optional_backfills: vec![],
            vec_inserts: vec![],
            operational_writes: vec![],
        }
    }

    /// Helper: create a seeded runtime with one node and one chunk.
    fn seeded_runtime() -> (tempfile::TempDir, EngineRuntime) {
        let dir = tempfile::tempdir().expect("temp dir");
        let runtime = open_runtime(dir.path().join("test.db")).expect("open");
        runtime
            .writer()
            .submit(seed_request("seed", 1, true))
            .expect("seed write");
        (dir, runtime)
    }

    /// Issue #30: the engine must support concurrent reads from multiple threads.
    #[test]
    fn concurrent_reads_from_multiple_threads() {
        let (_dir, runtime) = seeded_runtime();
        let runtime = Arc::new(runtime);

        let compiled = QueryBuilder::nodes("Test")
            .limit(10)
            .compile()
            .expect("compile");

        let handles: Vec<_> = (0..4)
            .map(|_| {
                let rt = Arc::clone(&runtime);
                let q = compiled.clone();
                std::thread::spawn(move || {
                    let rows = rt
                        .coordinator()
                        .execute_compiled_read(&q)
                        .expect("query succeeds");
                    assert_eq!(rows.nodes.len(), 1);
                })
            })
            .collect();

        for h in handles {
            h.join().expect("worker thread panicked");
        }
    }

    #[test]
    fn open_same_database_twice_returns_database_locked() {
        let dir = tempfile::tempdir().expect("temp dir");
        let db_path = dir.path().join("test.db");

        let _first = open_runtime(&db_path).expect("open");
        let second = open_runtime(&db_path);

        let err = second.expect_err("second open must fail");
        assert!(
            matches!(err, crate::EngineError::DatabaseLocked(_)),
            "expected DatabaseLocked, got: {err:?}"
        );
        assert!(
            err.to_string().contains("already in use"),
            "error must mention 'already in use': {err}"
        );
    }

    #[test]
    fn database_reopens_after_drop() {
        let dir = tempfile::tempdir().expect("temp dir");
        let db_path = dir.path().join("test.db");

        {
            let _runtime = open_runtime(&db_path).expect("first open");
        }

        let runtime = open_runtime(&db_path).expect("reopen");
        let compiled = QueryBuilder::nodes("Test")
            .limit(10)
            .compile()
            .expect("compile");
        let rows = runtime
            .coordinator()
            .execute_compiled_read(&compiled)
            .expect("query");
        assert!(rows.nodes.is_empty());
    }

    #[test]
    fn lock_error_includes_pid() {
        let dir = tempfile::tempdir().expect("temp dir");
        let db_path = dir.path().join("test.db");

        let _first = open_runtime(&db_path).expect("open");
        let err = open_runtime(&db_path).expect_err("second open must fail");

        let msg = err.to_string();
        assert!(
            msg.contains("already in use"),
            "error must mention 'already in use': {msg}"
        );
        // PID is best-effort; on Windows exclusive locks prevent reading the
        // lock file from a second handle.
        if cfg!(unix) {
            let our_pid = std::process::id().to_string();
            assert!(
                msg.contains(&our_pid),
                "error must contain holder pid {our_pid}: {msg}"
            );
        }
    }

    /// Verify that dropping `EngineRuntime` joins the writer thread and triggers
    /// `SQLite`'s automatic passive WAL checkpoint (readers drop before writer).
    #[test]
    fn drop_joins_writer_and_checkpoints_wal() {
        let dir = tempfile::tempdir().expect("temp dir");
        let db_path = dir.path().join("test.db");
        let wal_path = dir.path().join("test.db-wal");

        {
            let runtime = open_runtime(&db_path).expect("open");
            runtime
                .writer()
                .submit(seed_request("seed", 1, false))
                .expect("seed write");
        }
        // After drop: WAL should be checkpointed and removed. SQLite removes
        // the -wal file lazily on some platforms (macos in particular); poll
        // with a deadline to absorb the delay.  The existence check runs once
        // more after the final sleep so a late removal is not misreported.
        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
        let wal_cleaned = loop {
            if !wal_path.exists() {
                break true;
            }
            if std::time::Instant::now() >= deadline {
                break false;
            }
            std::thread::sleep(std::time::Duration::from_millis(100));
        };
        assert!(
            wal_cleaned,
            "WAL file should be cleaned up after graceful drop"
        );

        // Reopen and verify data persists.
        let runtime = open_runtime(&db_path).expect("reopen");
        let compiled = QueryBuilder::nodes("Test")
            .limit(10)
            .compile()
            .expect("compile");
        let rows = runtime
            .coordinator()
            .execute_compiled_read(&compiled)
            .expect("query");
        assert_eq!(rows.nodes.len(), 1);
    }

    #[test]
    fn telemetry_counts_queries() {
        let (_dir, runtime) = seeded_runtime();
        let compiled = QueryBuilder::nodes("Test")
            .limit(10)
            .compile()
            .expect("compile");

        for _ in 0..3 {
            runtime
                .coordinator()
                .execute_compiled_read(&compiled)
                .expect("query");
        }

        let snap = runtime.telemetry_snapshot();
        assert!(
            snap.queries_total >= 3,
            "expected at least 3 queries, got {}",
            snap.queries_total,
        );
    }

    #[test]
    fn telemetry_counts_writes() {
        let (_dir, runtime) = seeded_runtime();

        // seeded_runtime already submitted 1 write
        runtime
            .writer()
            .submit(seed_request("second", 2, false))
            .expect("second write");

        let snap = runtime.telemetry_snapshot();
        assert!(
            snap.writes_total >= 2,
            "expected at least 2 writes, got {}",
            snap.writes_total,
        );
    }

    #[test]
    fn telemetry_counts_write_rows() {
        let (_dir, runtime) = seeded_runtime();
        // The seed write has 1 node + 1 chunk = 2 rows
        let snap = runtime.telemetry_snapshot();
        assert!(
            snap.write_rows_total >= 2,
            "expected at least 2 write rows, got {}",
            snap.write_rows_total,
        );
    }

    #[test]
    fn telemetry_snapshot_includes_cache_status() {
        let (_dir, runtime) = seeded_runtime();
        let compiled = QueryBuilder::nodes("Test")
            .limit(10)
            .compile()
            .expect("compile");

        // Run several queries to exercise the page cache.
        for _ in 0..5 {
            runtime
                .coordinator()
                .execute_compiled_read(&compiled)
                .expect("query");
        }

        let snap = runtime.telemetry_snapshot();
        assert!(
            snap.sqlite_cache.cache_hits + snap.sqlite_cache.cache_misses > 0,
            "expected cache activity, got hits={} misses={}",
            snap.sqlite_cache.cache_hits,
            snap.sqlite_cache.cache_misses,
        );
    }
}
613}