1use std::path::Path;
2use std::sync::Arc;
3use std::sync::atomic::AtomicBool;
4use std::sync::mpsc;
5
6use fathomdb_schema::SchemaManager;
7
8use crate::{
9 AdminHandle, AdminService, EngineError, ExecutionCoordinator, ProvenanceMode, QueryEmbedder,
10 VectorProjectionActor, WriterActor,
11 database_lock::DatabaseLock,
12 rebuild_actor::{RebuildActor, RebuildClient, RebuildRequest, recover_interrupted_rebuilds},
13 telemetry::{TelemetryCounters, TelemetryLevel, TelemetrySnapshot},
14};
15
/// Top-level handle that owns every long-lived component of the engine:
/// telemetry, the read coordinator, the admin service, and the background
/// actors. Dropping it shuts the engine down.
#[derive(Debug)]
pub struct EngineRuntime {
    // Shared counters; clones of this Arc are also handed to the
    // coordinator and writer at startup.
    telemetry: Arc<TelemetryCounters>,
    // Read-side query execution (owns the read connection pool).
    coordinator: ExecutionCoordinator,
    // Handle to administrative operations (schema, rebuilds).
    admin: AdminHandle,
    // Single writer actor; Arc so `writer_arc` can hand out owning clones.
    writer: Arc<WriterActor>,
    // The underscore fields are held only for their Drop effects. Rust drops
    // struct fields in declaration order, so the actors below are torn down
    // before `_lock` releases the on-disk database lock.
    _vector_actor: VectorProjectionActor,
    _rebuild: RebuildActor,
    _lock: DatabaseLock,
}
48
49#[allow(clippy::used_underscore_items)]
51const _: () = {
52 fn _assert_send_sync<T: Send + Sync>() {}
53 fn _check() {
54 _assert_send_sync::<EngineRuntime>();
55 }
56};
57
58impl EngineRuntime {
59 pub fn open(
63 path: impl AsRef<Path>,
64 provenance_mode: ProvenanceMode,
65 vector_dimension: Option<usize>,
66 read_pool_size: usize,
67 telemetry_level: TelemetryLevel,
68 query_embedder: Option<Arc<dyn QueryEmbedder>>,
69 ) -> Result<Self, EngineError> {
70 let lock = DatabaseLock::acquire(path.as_ref())?;
71
72 if read_pool_size == 0 {
73 return Err(EngineError::InvalidConfig(
74 "read_pool_size must be >= 1, got 0".to_owned(),
75 ));
76 }
77
78 trace_info!(
79 path = %path.as_ref().display(),
80 provenance_mode = ?provenance_mode,
81 vector_dimension = ?vector_dimension,
82 read_pool_size,
83 telemetry_level = ?telemetry_level,
84 "engine opening"
85 );
86 let _ = telemetry_level; let telemetry = Arc::new(TelemetryCounters::default());
88 let schema_manager = Arc::new(SchemaManager::new());
89 let coordinator = ExecutionCoordinator::open(
90 path.as_ref(),
91 Arc::clone(&schema_manager),
92 vector_dimension,
93 read_pool_size,
94 Arc::clone(&telemetry),
95 query_embedder,
96 )?;
97 #[cfg(feature = "sqlite-vec")]
102 {
103 let _prime = crate::sqlite::open_connection_with_vec(path.as_ref())?;
104 }
105
106 let writer = Arc::new(WriterActor::start(
107 path.as_ref(),
108 Arc::clone(&schema_manager),
109 provenance_mode,
110 Arc::clone(&telemetry),
111 )?);
112
113 {
116 let recovery_conn = crate::sqlite::open_connection(path.as_ref())?;
117 recover_interrupted_rebuilds(&recovery_conn)?;
118 }
119
120 let (rebuild_sender, rebuild_receiver) = mpsc::sync_channel::<RebuildRequest>(64);
122 let rebuild_shutdown = Arc::new(AtomicBool::new(false));
123 let rebuild_actor = RebuildActor::start(
124 path.as_ref(),
125 Arc::clone(&schema_manager),
126 rebuild_receiver,
127 Arc::clone(&rebuild_shutdown),
128 )?;
129 let rebuild_client = RebuildClient::new(rebuild_sender, rebuild_shutdown);
130 let admin = AdminHandle::new(AdminService::new_with_engine(
131 path.as_ref(),
132 schema_manager,
133 rebuild_client,
134 Arc::clone(&writer),
135 ));
136 let vector_actor = VectorProjectionActor::start(writer.as_ref())?;
137
138 trace_info!(path = %path.as_ref().display(), "engine opened");
139 Ok(Self {
140 telemetry,
141 coordinator,
142 admin,
143 writer,
144 _vector_actor: vector_actor,
145 _rebuild: rebuild_actor,
146 _lock: lock,
147 })
148 }
149
150 pub fn coordinator(&self) -> &ExecutionCoordinator {
151 &self.coordinator
152 }
153
154 pub fn writer(&self) -> &WriterActor {
155 &self.writer
156 }
157
158 #[must_use]
160 pub fn writer_arc(&self) -> Arc<WriterActor> {
161 Arc::clone(&self.writer)
162 }
163
164 pub fn admin(&self) -> &AdminHandle {
165 &self.admin
166 }
167
168 pub fn telemetry(&self) -> &Arc<TelemetryCounters> {
170 &self.telemetry
171 }
172
173 #[must_use]
176 pub fn telemetry_snapshot(&self) -> TelemetrySnapshot {
177 let mut snapshot = self.telemetry.snapshot();
178 snapshot.sqlite_cache = self.coordinator.aggregate_cache_status();
179 snapshot
180 }
181}
182
#[cfg(test)]
#[allow(clippy::expect_used)]
mod tests {
    use std::sync::Arc;

    use fathomdb_query::QueryBuilder;

    use crate::{
        ChunkInsert, ChunkPolicy, NodeInsert, ProvenanceMode, TelemetryLevel, WriteRequest,
    };

    use super::EngineRuntime;

    /// Opens a runtime at `path` with the parameter set shared by every test.
    fn open_default(path: &std::path::Path) -> Result<EngineRuntime, crate::EngineError> {
        EngineRuntime::open(
            path,
            ProvenanceMode::Warn,
            None,
            4,
            TelemetryLevel::Counters,
            None,
        )
    }

    /// Builds a single-node write request with the shared test defaults
    /// (kind "Test", source_ref "test", no upsert, `ChunkPolicy::Preserve`).
    fn node_write(
        label: &str,
        row_id: &str,
        logical_id: &str,
        properties: &str,
        chunks: Vec<ChunkInsert>,
    ) -> WriteRequest {
        WriteRequest {
            label: label.to_owned(),
            nodes: vec![NodeInsert {
                row_id: row_id.to_owned(),
                logical_id: logical_id.to_owned(),
                kind: "Test".to_owned(),
                properties: properties.to_owned(),
                source_ref: Some("test".to_owned()),
                upsert: false,
                chunk_policy: ChunkPolicy::Preserve,
                content_ref: None,
            }],
            node_retires: vec![],
            edges: vec![],
            edge_retires: vec![],
            chunks,
            runs: vec![],
            steps: vec![],
            actions: vec![],
            optional_backfills: vec![],
            vec_inserts: vec![],
            operational_writes: vec![],
        }
    }

    /// The one chunk attached to the seeded node.
    fn seed_chunk() -> ChunkInsert {
        ChunkInsert {
            id: "c1".to_owned(),
            node_logical_id: "t:1".to_owned(),
            text_content: "hello world".to_owned(),
            byte_start: None,
            byte_end: None,
            content_hash: None,
        }
    }

    /// Opens a fresh runtime and seeds it with one "Test" node and one chunk.
    /// The `TempDir` must stay alive for as long as the runtime is used.
    fn seeded_runtime() -> (tempfile::TempDir, EngineRuntime) {
        let dir = tempfile::tempdir().expect("temp dir");
        let runtime = open_default(&dir.path().join("test.db")).expect("open");

        runtime
            .writer()
            .submit(node_write(
                "seed",
                "r1",
                "t:1",
                r#"{"v":1}"#,
                vec![seed_chunk()],
            ))
            .expect("seed write");

        (dir, runtime)
    }

    #[test]
    fn concurrent_reads_from_multiple_threads() {
        let (_dir, runtime) = seeded_runtime();
        let runtime = Arc::new(runtime);

        let compiled = QueryBuilder::nodes("Test")
            .limit(10)
            .compile()
            .expect("compile");

        let handles: Vec<_> = (0..4)
            .map(|_| {
                let rt = Arc::clone(&runtime);
                let q = compiled.clone();
                std::thread::spawn(move || {
                    let rows = rt
                        .coordinator()
                        .execute_compiled_read(&q)
                        .expect("query succeeds");
                    assert_eq!(rows.nodes.len(), 1);
                })
            })
            .collect();

        for h in handles {
            h.join().expect("worker thread panicked");
        }
    }

    #[test]
    fn open_same_database_twice_returns_database_locked() {
        let dir = tempfile::tempdir().expect("temp dir");
        let db_path = dir.path().join("test.db");

        let _first = open_default(&db_path).expect("open");
        let err = open_default(&db_path).expect_err("second open must fail");

        assert!(
            matches!(err, crate::EngineError::DatabaseLocked(_)),
            "expected DatabaseLocked, got: {err:?}"
        );
        assert!(
            err.to_string().contains("already in use"),
            "error must mention 'already in use': {err}"
        );
    }

    #[test]
    fn database_reopens_after_drop() {
        let dir = tempfile::tempdir().expect("temp dir");
        let db_path = dir.path().join("test.db");

        // First open is scoped so its lock is released on drop.
        {
            let _runtime = open_default(&db_path).expect("first open");
        }

        let runtime = open_default(&db_path).expect("reopen");
        let compiled = QueryBuilder::nodes("Test")
            .limit(10)
            .compile()
            .expect("compile");
        let rows = runtime
            .coordinator()
            .execute_compiled_read(&compiled)
            .expect("query");
        assert!(rows.nodes.is_empty());
    }

    #[test]
    fn lock_error_includes_pid() {
        let dir = tempfile::tempdir().expect("temp dir");
        let db_path = dir.path().join("test.db");

        let _first = open_default(&db_path).expect("open");
        let err = open_default(&db_path).expect_err("second open must fail");

        let msg = err.to_string();
        assert!(
            msg.contains("already in use"),
            "error must mention 'already in use': {msg}"
        );
        // The holder pid is only asserted on unix.
        if cfg!(unix) {
            let our_pid = std::process::id().to_string();
            assert!(
                msg.contains(&our_pid),
                "error must contain holder pid {our_pid}: {msg}"
            );
        }
    }

    #[test]
    fn drop_joins_writer_and_checkpoints_wal() {
        let dir = tempfile::tempdir().expect("temp dir");
        let db_path = dir.path().join("test.db");
        let wal_path = dir.path().join("test.db-wal");

        {
            let runtime = open_default(&db_path).expect("open");
            runtime
                .writer()
                .submit(node_write("seed", "r1", "t:1", r#"{"v":1}"#, vec![]))
                .expect("seed write");
        }

        // Poll up to ~2s for the WAL file to disappear; cleanup triggered by
        // the drop above may take a moment to become visible on disk.
        let mut wal_cleaned = false;
        for _ in 0..20 {
            if !wal_path.exists() {
                wal_cleaned = true;
                break;
            }
            std::thread::sleep(std::time::Duration::from_millis(100));
        }
        assert!(
            wal_cleaned,
            "WAL file should be cleaned up after graceful drop"
        );

        // The seeded row must survive the shutdown/checkpoint cycle.
        let runtime = open_default(&db_path).expect("reopen");
        let compiled = QueryBuilder::nodes("Test")
            .limit(10)
            .compile()
            .expect("compile");
        let rows = runtime
            .coordinator()
            .execute_compiled_read(&compiled)
            .expect("query");
        assert_eq!(rows.nodes.len(), 1);
    }

    #[test]
    fn telemetry_counts_queries() {
        let (_dir, runtime) = seeded_runtime();
        let compiled = QueryBuilder::nodes("Test")
            .limit(10)
            .compile()
            .expect("compile");

        for _ in 0..3 {
            runtime
                .coordinator()
                .execute_compiled_read(&compiled)
                .expect("query");
        }

        let snap = runtime.telemetry_snapshot();
        assert!(
            snap.queries_total >= 3,
            "expected at least 3 queries, got {}",
            snap.queries_total,
        );
    }

    #[test]
    fn telemetry_counts_writes() {
        let (_dir, runtime) = seeded_runtime();

        runtime
            .writer()
            .submit(node_write("second", "r2", "t:2", r#"{"v":2}"#, vec![]))
            .expect("second write");

        let snap = runtime.telemetry_snapshot();
        assert!(
            snap.writes_total >= 2,
            "expected at least 2 writes, got {}",
            snap.writes_total,
        );
    }

    #[test]
    fn telemetry_counts_write_rows() {
        let (_dir, runtime) = seeded_runtime();
        let snap = runtime.telemetry_snapshot();
        // Seed write inserted one node row and one chunk row.
        assert!(
            snap.write_rows_total >= 2,
            "expected at least 2 write rows, got {}",
            snap.write_rows_total,
        );
    }

    #[test]
    fn telemetry_snapshot_includes_cache_status() {
        let (_dir, runtime) = seeded_runtime();
        let compiled = QueryBuilder::nodes("Test")
            .limit(10)
            .compile()
            .expect("compile");

        for _ in 0..5 {
            runtime
                .coordinator()
                .execute_compiled_read(&compiled)
                .expect("query");
        }

        let snap = runtime.telemetry_snapshot();
        assert!(
            snap.sqlite_cache.cache_hits + snap.sqlite_cache.cache_misses > 0,
            "expected cache activity, got hits={} misses={}",
            snap.sqlite_cache.cache_hits,
            snap.sqlite_cache.cache_misses,
        );
    }
}