1use std::path::Path;
2use std::sync::Arc;
3use std::sync::mpsc;
4
5use fathomdb_schema::SchemaManager;
6
7use crate::{
8 AdminHandle, AdminService, EngineError, ExecutionCoordinator, ProvenanceMode, QueryEmbedder,
9 WriterActor,
10 database_lock::DatabaseLock,
11 rebuild_actor::{RebuildActor, RebuildRequest, recover_interrupted_rebuilds},
12 telemetry::{TelemetryCounters, TelemetryLevel, TelemetrySnapshot},
13};
14
/// Top-level handle for one open database: owns the read coordinator, the
/// writer actor, the admin service, the rebuild actor, and the exclusive
/// on-disk database lock. Constructed via [`EngineRuntime::open`].
#[derive(Debug)]
pub struct EngineRuntime {
    /// Shared telemetry counters; clones are handed to the coordinator and
    /// writer during `open`, so all components report into the same sink.
    telemetry: Arc<TelemetryCounters>,
    /// Read-side query execution (pool size fixed by `read_pool_size` at open).
    coordinator: ExecutionCoordinator,
    /// Write-side actor handle.
    writer: WriterActor,
    /// Admin/maintenance service; holds a clone of the rebuild sender so it
    /// can enqueue rebuild requests.
    admin: AdminHandle,
    /// Kept alive so the rebuild request channel stays open for the runtime's
    /// lifetime; never used directly after construction.
    _rebuild_sender: mpsc::SyncSender<RebuildRequest>,
    /// Rebuild actor handle, retained so the actor lives as long as the runtime.
    _rebuild: RebuildActor,
    /// Exclusive database lock. Declared last on purpose: struct fields drop
    /// in declaration order, so the lock is released only after the actors
    /// above have been dropped.
    _lock: DatabaseLock,
}
41
42#[allow(clippy::used_underscore_items)]
44const _: () = {
45 fn _assert_send_sync<T: Send + Sync>() {}
46 fn _check() {
47 _assert_send_sync::<EngineRuntime>();
48 }
49};
50
51impl EngineRuntime {
52 pub fn open(
56 path: impl AsRef<Path>,
57 provenance_mode: ProvenanceMode,
58 vector_dimension: Option<usize>,
59 read_pool_size: usize,
60 telemetry_level: TelemetryLevel,
61 query_embedder: Option<Arc<dyn QueryEmbedder>>,
62 ) -> Result<Self, EngineError> {
63 let lock = DatabaseLock::acquire(path.as_ref())?;
64
65 if read_pool_size == 0 {
66 return Err(EngineError::InvalidConfig(
67 "read_pool_size must be >= 1, got 0".to_owned(),
68 ));
69 }
70
71 trace_info!(
72 path = %path.as_ref().display(),
73 provenance_mode = ?provenance_mode,
74 vector_dimension = ?vector_dimension,
75 read_pool_size,
76 telemetry_level = ?telemetry_level,
77 "engine opening"
78 );
79 let _ = telemetry_level; let telemetry = Arc::new(TelemetryCounters::default());
81 let schema_manager = Arc::new(SchemaManager::new());
82 let coordinator = ExecutionCoordinator::open(
83 path.as_ref(),
84 Arc::clone(&schema_manager),
85 vector_dimension,
86 read_pool_size,
87 Arc::clone(&telemetry),
88 query_embedder,
89 )?;
90 let writer = WriterActor::start(
91 path.as_ref(),
92 Arc::clone(&schema_manager),
93 provenance_mode,
94 Arc::clone(&telemetry),
95 )?;
96
97 {
101 let recovery_conn = crate::sqlite::open_connection(path.as_ref())?;
102 recover_interrupted_rebuilds(&recovery_conn)?;
103 }
104
105 let (rebuild_sender, rebuild_receiver) = mpsc::sync_channel::<RebuildRequest>(64);
107 let rebuild_actor =
108 RebuildActor::start(path.as_ref(), Arc::clone(&schema_manager), rebuild_receiver)?;
109 let admin = AdminHandle::new(AdminService::new_with_rebuild(
110 path.as_ref(),
111 schema_manager,
112 rebuild_sender.clone(),
113 ));
114
115 trace_info!(path = %path.as_ref().display(), "engine opened");
116 Ok(Self {
117 telemetry,
118 coordinator,
119 writer,
120 admin,
121 _rebuild_sender: rebuild_sender,
122 _rebuild: rebuild_actor,
123 _lock: lock,
124 })
125 }
126
127 pub fn coordinator(&self) -> &ExecutionCoordinator {
128 &self.coordinator
129 }
130
131 pub fn writer(&self) -> &WriterActor {
132 &self.writer
133 }
134
135 pub fn admin(&self) -> &AdminHandle {
136 &self.admin
137 }
138
139 pub fn telemetry(&self) -> &Arc<TelemetryCounters> {
141 &self.telemetry
142 }
143
144 #[must_use]
147 pub fn telemetry_snapshot(&self) -> TelemetrySnapshot {
148 let mut snapshot = self.telemetry.snapshot();
149 snapshot.sqlite_cache = self.coordinator.aggregate_cache_status();
150 snapshot
151 }
152}
153
#[cfg(test)]
#[allow(clippy::expect_used)]
mod tests {
    use std::path::Path;
    use std::sync::Arc;

    use fathomdb_query::QueryBuilder;

    use crate::{
        ChunkInsert, ChunkPolicy, EngineError, NodeInsert, ProvenanceMode, TelemetryLevel,
        WriteRequest,
    };

    use super::EngineRuntime;

    /// Opens a runtime with the argument set shared by every test in this
    /// module (Warn provenance, no vectors, 4 readers, counter telemetry).
    fn open_db(path: &Path) -> Result<EngineRuntime, EngineError> {
        EngineRuntime::open(
            path,
            ProvenanceMode::Warn,
            None,
            4,
            TelemetryLevel::Counters,
            None,
        )
    }

    /// Builds a single-node write request. `index` varies the row id,
    /// logical id, and `v` property so repeated writes do not collide.
    fn write_request(label: &str, index: u32, chunks: Vec<ChunkInsert>) -> WriteRequest {
        WriteRequest {
            label: label.to_owned(),
            nodes: vec![NodeInsert {
                row_id: format!("r{index}"),
                logical_id: format!("t:{index}"),
                kind: "Test".to_owned(),
                properties: format!("{{\"v\":{index}}}"),
                source_ref: Some("test".to_owned()),
                upsert: false,
                chunk_policy: ChunkPolicy::Preserve,
                content_ref: None,
            }],
            node_retires: vec![],
            edges: vec![],
            edge_retires: vec![],
            chunks,
            runs: vec![],
            steps: vec![],
            actions: vec![],
            optional_backfills: vec![],
            vec_inserts: vec![],
            operational_writes: vec![],
        }
    }

    /// One chunk attached to node `t:1`, matching the seed write.
    fn seed_chunk() -> Vec<ChunkInsert> {
        vec![ChunkInsert {
            id: "c1".to_owned(),
            node_logical_id: "t:1".to_owned(),
            text_content: "hello world".to_owned(),
            byte_start: None,
            byte_end: None,
            content_hash: None,
        }]
    }

    /// Fresh database seeded with one `Test` node and one chunk. Returns the
    /// temp dir alongside the runtime so the directory outlives the test body.
    fn seeded_runtime() -> (tempfile::TempDir, EngineRuntime) {
        let dir = tempfile::tempdir().expect("temp dir");
        let runtime = open_db(&dir.path().join("test.db")).expect("open");

        runtime
            .writer()
            .submit(write_request("seed", 1, seed_chunk()))
            .expect("seed write");

        (dir, runtime)
    }

    #[test]
    fn concurrent_reads_from_multiple_threads() {
        let (_dir, runtime) = seeded_runtime();
        let runtime = Arc::new(runtime);

        let compiled = QueryBuilder::nodes("Test")
            .limit(10)
            .compile()
            .expect("compile");

        let handles: Vec<_> = (0..4)
            .map(|_| {
                let rt = Arc::clone(&runtime);
                let q = compiled.clone();
                std::thread::spawn(move || {
                    let rows = rt
                        .coordinator()
                        .execute_compiled_read(&q)
                        .expect("query succeeds");
                    assert_eq!(rows.nodes.len(), 1);
                })
            })
            .collect();

        for h in handles {
            h.join().expect("worker thread panicked");
        }
    }

    #[test]
    fn open_same_database_twice_returns_database_locked() {
        let dir = tempfile::tempdir().expect("temp dir");
        let db_path = dir.path().join("test.db");

        let _first = open_db(&db_path).expect("open");
        let second = open_db(&db_path);

        assert!(second.is_err(), "second open must fail");
        let err = second.expect_err("second open must fail");
        assert!(
            matches!(err, EngineError::DatabaseLocked(_)),
            "expected DatabaseLocked, got: {err:?}"
        );
        assert!(
            err.to_string().contains("already in use"),
            "error must mention 'already in use': {err}"
        );
    }

    #[test]
    fn database_reopens_after_drop() {
        let dir = tempfile::tempdir().expect("temp dir");
        let db_path = dir.path().join("test.db");

        // First open is scoped so its lock is released before the reopen.
        {
            let _runtime = open_db(&db_path).expect("first open");
        }

        let runtime = open_db(&db_path).expect("reopen");
        let compiled = QueryBuilder::nodes("Test")
            .limit(10)
            .compile()
            .expect("compile");
        let rows = runtime
            .coordinator()
            .execute_compiled_read(&compiled)
            .expect("query");
        assert!(rows.nodes.is_empty());
    }

    #[test]
    fn lock_error_includes_pid() {
        let dir = tempfile::tempdir().expect("temp dir");
        let db_path = dir.path().join("test.db");

        let _first = open_db(&db_path).expect("open");
        let err = open_db(&db_path).expect_err("second open must fail");

        let msg = err.to_string();
        assert!(
            msg.contains("already in use"),
            "error must mention 'already in use': {msg}"
        );
        // The holder pid is only asserted on unix, matching the original test.
        if cfg!(unix) {
            let our_pid = std::process::id().to_string();
            assert!(
                msg.contains(&our_pid),
                "error must contain holder pid {our_pid}: {msg}"
            );
        }
    }

    #[test]
    fn drop_joins_writer_and_checkpoints_wal() {
        let dir = tempfile::tempdir().expect("temp dir");
        let db_path = dir.path().join("test.db");
        let wal_path = dir.path().join("test.db-wal");

        // Write inside a scope so the runtime is dropped (and the writer
        // joined) before we inspect the WAL file.
        {
            let runtime = open_db(&db_path).expect("open");

            runtime
                .writer()
                .submit(write_request("seed", 1, vec![]))
                .expect("seed write");
        }
        assert!(
            !wal_path.exists(),
            "WAL file should be cleaned up after graceful drop"
        );

        let runtime = open_db(&db_path).expect("reopen");
        let compiled = QueryBuilder::nodes("Test")
            .limit(10)
            .compile()
            .expect("compile");
        let rows = runtime
            .coordinator()
            .execute_compiled_read(&compiled)
            .expect("query");
        assert_eq!(rows.nodes.len(), 1);
    }

    #[test]
    fn telemetry_counts_queries() {
        let (_dir, runtime) = seeded_runtime();
        let compiled = QueryBuilder::nodes("Test")
            .limit(10)
            .compile()
            .expect("compile");

        for _ in 0..3 {
            runtime
                .coordinator()
                .execute_compiled_read(&compiled)
                .expect("query");
        }

        let snap = runtime.telemetry_snapshot();
        assert!(
            snap.queries_total >= 3,
            "expected at least 3 queries, got {}",
            snap.queries_total,
        );
    }

    #[test]
    fn telemetry_counts_writes() {
        let (_dir, runtime) = seeded_runtime();

        runtime
            .writer()
            .submit(write_request("second", 2, vec![]))
            .expect("second write");

        let snap = runtime.telemetry_snapshot();
        assert!(
            snap.writes_total >= 2,
            "expected at least 2 writes, got {}",
            snap.writes_total,
        );
    }

    #[test]
    fn telemetry_counts_write_rows() {
        // The seed write inserts one node row and one chunk row.
        let (_dir, runtime) = seeded_runtime();
        let snap = runtime.telemetry_snapshot();
        assert!(
            snap.write_rows_total >= 2,
            "expected at least 2 write rows, got {}",
            snap.write_rows_total,
        );
    }

    #[test]
    fn telemetry_snapshot_includes_cache_status() {
        let (_dir, runtime) = seeded_runtime();
        let compiled = QueryBuilder::nodes("Test")
            .limit(10)
            .compile()
            .expect("compile");

        for _ in 0..5 {
            runtime
                .coordinator()
                .execute_compiled_read(&compiled)
                .expect("query");
        }

        let snap = runtime.telemetry_snapshot();
        assert!(
            snap.sqlite_cache.cache_hits + snap.sqlite_cache.cache_misses > 0,
            "expected cache activity, got hits={} misses={}",
            snap.sqlite_cache.cache_hits,
            snap.sqlite_cache.cache_misses,
        );
    }
}
575}