fathomdb_engine/runtime.rs

use std::path::Path;
use std::sync::Arc;
use std::sync::mpsc;

use fathomdb_schema::SchemaManager;

use crate::{
    AdminHandle, AdminService, EngineError, ExecutionCoordinator, ProvenanceMode, QueryEmbedder,
    WriterActor,
    database_lock::DatabaseLock,
    rebuild_actor::{RebuildActor, RebuildRequest, recover_interrupted_rebuilds},
    telemetry::{TelemetryCounters, TelemetryLevel, TelemetrySnapshot},
};

/// Core engine runtime.
///
/// # Drop order invariant
///
/// Fields are ordered so that `coordinator` (reader connections) drops before
/// `writer` (writer thread + connection).  This ensures the writer's
/// `sqlite3_close()` closes the last connection to the database, which
/// triggers `SQLite`'s automatic passive WAL checkpoint and WAL/shm file
/// cleanup.
/// `_rebuild` drops before `_lock` so the rebuild thread's connection closes
/// before the exclusive file lock is released.
/// `_lock` drops last so the exclusive file lock is released only after all
/// connections are closed.  Do not reorder these fields.
///
/// `telemetry` holds shared counters and has no drop-order concern (atomics).
#[derive(Debug)]
pub struct EngineRuntime {
    telemetry: Arc<TelemetryCounters>,
    coordinator: ExecutionCoordinator,
    writer: WriterActor,
    admin: AdminHandle,
    /// Sender side of the rebuild channel.  Dropped before `_rebuild` so the
    /// rebuild thread's loop exits before we join it.
    _rebuild_sender: mpsc::SyncSender<RebuildRequest>,
    _rebuild: RebuildActor,
    _lock: DatabaseLock,
}

// Required by #[pyclass(frozen)] — guards against future fields breaking thread safety.
#[allow(clippy::used_underscore_items)]
const _: () = {
    fn _assert_send_sync<T: Send + Sync>() {}
    fn _check() {
        _assert_send_sync::<EngineRuntime>();
    }
};

impl EngineRuntime {
    /// # Errors
    /// Returns [`EngineError`] if the database connection cannot be opened, schema bootstrap fails,
    /// or the writer actor cannot be started.
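    ///
    /// # Example
    ///
    /// A minimal usage sketch; it assumes these types are re-exported at the
    /// crate root, and the path and parameter values are illustrative only:
    ///
    /// ```no_run
    /// # use fathomdb_engine::{EngineRuntime, ProvenanceMode, TelemetryLevel};
    /// let runtime = EngineRuntime::open(
    ///     "graph.db",               // database file path
    ///     ProvenanceMode::Warn,     // provenance enforcement mode
    ///     None,                     // no vector dimension configured
    ///     4,                        // reader pool size, must be >= 1
    ///     TelemetryLevel::Counters, // counter-level telemetry
    ///     None,                     // no query embedder
    /// )?;
    /// let _snapshot = runtime.telemetry_snapshot();
    /// # Ok::<(), fathomdb_engine::EngineError>(())
    /// ```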
    pub fn open(
        path: impl AsRef<Path>,
        provenance_mode: ProvenanceMode,
        vector_dimension: Option<usize>,
        read_pool_size: usize,
        telemetry_level: TelemetryLevel,
        query_embedder: Option<Arc<dyn QueryEmbedder>>,
    ) -> Result<Self, EngineError> {
        let lock = DatabaseLock::acquire(path.as_ref())?;

        if read_pool_size == 0 {
            return Err(EngineError::InvalidConfig(
                "read_pool_size must be >= 1, got 0".to_owned(),
            ));
        }

        trace_info!(
            path = %path.as_ref().display(),
            provenance_mode = ?provenance_mode,
            vector_dimension = ?vector_dimension,
            read_pool_size,
            telemetry_level = ?telemetry_level,
            "engine opening"
        );
        let _ = telemetry_level; // Used by trace_info! when tracing feature is active
        let telemetry = Arc::new(TelemetryCounters::default());
        let schema_manager = Arc::new(SchemaManager::new());
        let coordinator = ExecutionCoordinator::open(
            path.as_ref(),
            Arc::clone(&schema_manager),
            vector_dimension,
            read_pool_size,
            Arc::clone(&telemetry),
            query_embedder,
        )?;
        let writer = WriterActor::start(
            path.as_ref(),
            Arc::clone(&schema_manager),
            provenance_mode,
            Arc::clone(&telemetry),
        )?;

        // Crash recovery: mark any interrupted rebuilds (PENDING/BUILDING/SWAPPING)
        // as FAILED and clean up their staging rows.  Must run before the
        // RebuildActor starts so the actor never sees stale non-terminal state.
        {
            let recovery_conn = crate::sqlite::open_connection(path.as_ref())?;
            recover_interrupted_rebuilds(&recovery_conn)?;
        }

        // Rebuild actor: create channel, start thread, pass sender to AdminService.
        let (rebuild_sender, rebuild_receiver) = mpsc::sync_channel::<RebuildRequest>(64);
        let rebuild_actor =
            RebuildActor::start(path.as_ref(), Arc::clone(&schema_manager), rebuild_receiver)?;
        let admin = AdminHandle::new(AdminService::new_with_rebuild(
            path.as_ref(),
            schema_manager,
            rebuild_sender.clone(),
        ));

        trace_info!(path = %path.as_ref().display(), "engine opened");
        Ok(Self {
            telemetry,
            coordinator,
            writer,
            admin,
            _rebuild_sender: rebuild_sender,
            _rebuild: rebuild_actor,
            _lock: lock,
        })
    }

    pub fn coordinator(&self) -> &ExecutionCoordinator {
        &self.coordinator
    }

    pub fn writer(&self) -> &WriterActor {
        &self.writer
    }

    pub fn admin(&self) -> &AdminHandle {
        &self.admin
    }

    /// Shared telemetry counters for incrementing from the public API layer.
    pub fn telemetry(&self) -> &Arc<TelemetryCounters> {
        &self.telemetry
    }

    /// Read all telemetry counters and aggregate `SQLite` cache status across
    /// the reader pool.
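    ///
    /// A small sketch of reading the snapshot; the field names follow the
    /// tests below (e.g. `queries_total`, `sqlite_cache`) and the crate-root
    /// re-export of `EngineRuntime` is assumed:
    ///
    /// ```no_run
    /// # fn report(runtime: &fathomdb_engine::EngineRuntime) {
    /// let snap = runtime.telemetry_snapshot();
    /// println!("queries so far: {}", snap.queries_total);
    /// println!("cache hits: {}", snap.sqlite_cache.cache_hits);
    /// # }
    /// ```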
    #[must_use]
    pub fn telemetry_snapshot(&self) -> TelemetrySnapshot {
        let mut snapshot = self.telemetry.snapshot();
        snapshot.sqlite_cache = self.coordinator.aggregate_cache_status();
        snapshot
    }
}

#[cfg(test)]
#[allow(clippy::expect_used)]
mod tests {
    use std::sync::Arc;

    use fathomdb_query::QueryBuilder;

    use crate::{
        ChunkInsert, ChunkPolicy, NodeInsert, ProvenanceMode, TelemetryLevel, WriteRequest,
    };

    use super::EngineRuntime;

    /// Issue #30: the engine must support concurrent reads from multiple threads.
    #[test]
    fn concurrent_reads_from_multiple_threads() {
        let dir = tempfile::tempdir().expect("temp dir");
        let runtime = Arc::new(
            EngineRuntime::open(
                dir.path().join("test.db"),
                ProvenanceMode::Warn,
                None,
                4,
                TelemetryLevel::Counters,
                None,
            )
            .expect("open"),
        );

        runtime
            .writer()
            .submit(WriteRequest {
                label: "seed".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "r1".to_owned(),
                    logical_id: "t:1".to_owned(),
                    kind: "Test".to_owned(),
                    properties: r#"{"v":1}"#.to_owned(),
                    source_ref: Some("test".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![ChunkInsert {
                    id: "c1".to_owned(),
                    node_logical_id: "t:1".to_owned(),
                    text_content: "hello world".to_owned(),
                    byte_start: None,
                    byte_end: None,
                    content_hash: None,
                }],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("seed write");

        let compiled = QueryBuilder::nodes("Test")
            .limit(10)
            .compile()
            .expect("compile");

        let handles: Vec<_> = (0..4)
            .map(|_| {
                let rt = Arc::clone(&runtime);
                let q = compiled.clone();
                std::thread::spawn(move || {
                    let rows = rt
                        .coordinator()
                        .execute_compiled_read(&q)
                        .expect("query succeeds");
                    assert_eq!(rows.nodes.len(), 1);
                })
            })
            .collect();

        for h in handles {
            h.join().expect("worker thread panicked");
        }
    }

    #[test]
    fn open_same_database_twice_returns_database_locked() {
        let dir = tempfile::tempdir().expect("temp dir");
        let db_path = dir.path().join("test.db");

        let _first = EngineRuntime::open(
            &db_path,
            ProvenanceMode::Warn,
            None,
            4,
            TelemetryLevel::Counters,
            None,
        )
        .expect("open");
        let second = EngineRuntime::open(
            &db_path,
            ProvenanceMode::Warn,
            None,
            4,
            TelemetryLevel::Counters,
            None,
        );

        assert!(second.is_err(), "second open must fail");
        let err = second.expect_err("second open must fail");
        assert!(
            matches!(err, crate::EngineError::DatabaseLocked(_)),
            "expected DatabaseLocked, got: {err:?}"
        );
        assert!(
            err.to_string().contains("already in use"),
            "error must mention 'already in use': {err}"
        );
    }

    #[test]
    fn database_reopens_after_drop() {
        let dir = tempfile::tempdir().expect("temp dir");
        let db_path = dir.path().join("test.db");

        {
            let _runtime = EngineRuntime::open(
                &db_path,
                ProvenanceMode::Warn,
                None,
                4,
                TelemetryLevel::Counters,
                None,
            )
            .expect("first open");
        }

        let runtime = EngineRuntime::open(
            &db_path,
            ProvenanceMode::Warn,
            None,
            4,
            TelemetryLevel::Counters,
            None,
        )
        .expect("reopen");
        let compiled = QueryBuilder::nodes("Test")
            .limit(10)
            .compile()
            .expect("compile");
        let rows = runtime
            .coordinator()
            .execute_compiled_read(&compiled)
            .expect("query");
        assert!(rows.nodes.is_empty());
    }

    #[test]
    fn lock_error_includes_pid() {
        let dir = tempfile::tempdir().expect("temp dir");
        let db_path = dir.path().join("test.db");

        let _first = EngineRuntime::open(
            &db_path,
            ProvenanceMode::Warn,
            None,
            4,
            TelemetryLevel::Counters,
            None,
        )
        .expect("open");
        let err = EngineRuntime::open(
            &db_path,
            ProvenanceMode::Warn,
            None,
            4,
            TelemetryLevel::Counters,
            None,
        )
        .expect_err("second open must fail");

        let msg = err.to_string();
        assert!(
            msg.contains("already in use"),
            "error must mention 'already in use': {msg}"
        );
        // PID is best-effort; on Windows exclusive locks prevent reading the
        // lock file from a second handle.
        if cfg!(unix) {
            let our_pid = std::process::id().to_string();
            assert!(
                msg.contains(&our_pid),
                "error must contain holder pid {our_pid}: {msg}"
            );
        }
    }

    /// Verify that dropping `EngineRuntime` joins the writer thread and triggers
    /// `SQLite`'s automatic passive WAL checkpoint (readers drop before writer).
    #[test]
    fn drop_joins_writer_and_checkpoints_wal() {
        let dir = tempfile::tempdir().expect("temp dir");
        let db_path = dir.path().join("test.db");
        let wal_path = dir.path().join("test.db-wal");

        {
            let runtime = EngineRuntime::open(
                &db_path,
                ProvenanceMode::Warn,
                None,
                4,
                TelemetryLevel::Counters,
                None,
            )
            .expect("open");

            runtime
                .writer()
                .submit(WriteRequest {
                    label: "seed".to_owned(),
                    nodes: vec![NodeInsert {
                        row_id: "r1".to_owned(),
                        logical_id: "t:1".to_owned(),
                        kind: "Test".to_owned(),
                        properties: r#"{"v":1}"#.to_owned(),
                        source_ref: Some("test".to_owned()),
                        upsert: false,
                        chunk_policy: ChunkPolicy::Preserve,
                        content_ref: None,
                    }],
                    node_retires: vec![],
                    edges: vec![],
                    edge_retires: vec![],
                    chunks: vec![],
                    runs: vec![],
                    steps: vec![],
                    actions: vec![],
                    optional_backfills: vec![],
                    vec_inserts: vec![],
                    operational_writes: vec![],
                })
                .expect("seed write");
        }
        // After drop: WAL should be checkpointed and removed.
        assert!(
            !wal_path.exists(),
            "WAL file should be cleaned up after graceful drop"
        );

        // Reopen and verify data persists.
        let runtime = EngineRuntime::open(
            &db_path,
            ProvenanceMode::Warn,
            None,
            4,
            TelemetryLevel::Counters,
            None,
        )
        .expect("reopen");
        let compiled = QueryBuilder::nodes("Test")
            .limit(10)
            .compile()
            .expect("compile");
        let rows = runtime
            .coordinator()
            .execute_compiled_read(&compiled)
            .expect("query");
        assert_eq!(rows.nodes.len(), 1);
    }

    /// Helper: create a seeded runtime with one node and one chunk.
    fn seeded_runtime() -> (tempfile::TempDir, EngineRuntime) {
        let dir = tempfile::tempdir().expect("temp dir");
        let runtime = EngineRuntime::open(
            dir.path().join("test.db"),
            ProvenanceMode::Warn,
            None,
            4,
            TelemetryLevel::Counters,
            None,
        )
        .expect("open");

        runtime
            .writer()
            .submit(WriteRequest {
                label: "seed".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "r1".to_owned(),
                    logical_id: "t:1".to_owned(),
                    kind: "Test".to_owned(),
                    properties: r#"{"v":1}"#.to_owned(),
                    source_ref: Some("test".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![ChunkInsert {
                    id: "c1".to_owned(),
                    node_logical_id: "t:1".to_owned(),
                    text_content: "hello world".to_owned(),
                    byte_start: None,
                    byte_end: None,
                    content_hash: None,
                }],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("seed write");

        (dir, runtime)
    }

    #[test]
    fn telemetry_counts_queries() {
        let (_dir, runtime) = seeded_runtime();
        let compiled = QueryBuilder::nodes("Test")
            .limit(10)
            .compile()
            .expect("compile");

        for _ in 0..3 {
            runtime
                .coordinator()
                .execute_compiled_read(&compiled)
                .expect("query");
        }

        let snap = runtime.telemetry_snapshot();
        assert!(
            snap.queries_total >= 3,
            "expected at least 3 queries, got {}",
            snap.queries_total,
        );
    }

    #[test]
    fn telemetry_counts_writes() {
        let (_dir, runtime) = seeded_runtime();

        // seeded_runtime already submitted 1 write
        runtime
            .writer()
            .submit(WriteRequest {
                label: "second".to_owned(),
                nodes: vec![NodeInsert {
                    row_id: "r2".to_owned(),
                    logical_id: "t:2".to_owned(),
                    kind: "Test".to_owned(),
                    properties: r#"{"v":2}"#.to_owned(),
                    source_ref: Some("test".to_owned()),
                    upsert: false,
                    chunk_policy: ChunkPolicy::Preserve,
                    content_ref: None,
                }],
                node_retires: vec![],
                edges: vec![],
                edge_retires: vec![],
                chunks: vec![],
                runs: vec![],
                steps: vec![],
                actions: vec![],
                optional_backfills: vec![],
                vec_inserts: vec![],
                operational_writes: vec![],
            })
            .expect("second write");

        let snap = runtime.telemetry_snapshot();
        assert!(
            snap.writes_total >= 2,
            "expected at least 2 writes, got {}",
            snap.writes_total,
        );
    }

    #[test]
    fn telemetry_counts_write_rows() {
        let (_dir, runtime) = seeded_runtime();
        // The seed write has 1 node + 1 chunk = 2 rows
        let snap = runtime.telemetry_snapshot();
        assert!(
            snap.write_rows_total >= 2,
            "expected at least 2 write rows, got {}",
            snap.write_rows_total,
        );
    }

    #[test]
    fn telemetry_snapshot_includes_cache_status() {
        let (_dir, runtime) = seeded_runtime();
        let compiled = QueryBuilder::nodes("Test")
            .limit(10)
            .compile()
            .expect("compile");

        // Run several queries to exercise the page cache.
        for _ in 0..5 {
            runtime
                .coordinator()
                .execute_compiled_read(&compiled)
                .expect("query");
        }

        let snap = runtime.telemetry_snapshot();
        assert!(
            snap.sqlite_cache.cache_hits + snap.sqlite_cache.cache_misses > 0,
            "expected cache activity, got hits={} misses={}",
            snap.sqlite_cache.cache_hits,
            snap.sqlite_cache.cache_misses,
        );
    }
}