1use std::path::Path;
2use std::sync::Arc;
3
4use fathomdb_schema::SchemaManager;
5
6use crate::{
7 AdminHandle, AdminService, EngineError, ExecutionCoordinator, ProvenanceMode, QueryEmbedder,
8 WriterActor,
9 database_lock::DatabaseLock,
10 telemetry::{TelemetryCounters, TelemetryLevel, TelemetrySnapshot},
11};
12
13#[derive(Debug)]
26pub struct EngineRuntime {
27 telemetry: Arc<TelemetryCounters>,
28 coordinator: ExecutionCoordinator,
29 writer: WriterActor,
30 admin: AdminHandle,
31 _lock: DatabaseLock,
32}
33
34#[allow(clippy::used_underscore_items)]
36const _: () = {
37 fn _assert_send_sync<T: Send + Sync>() {}
38 fn _check() {
39 _assert_send_sync::<EngineRuntime>();
40 }
41};
42
43impl EngineRuntime {
44 pub fn open(
48 path: impl AsRef<Path>,
49 provenance_mode: ProvenanceMode,
50 vector_dimension: Option<usize>,
51 read_pool_size: usize,
52 telemetry_level: TelemetryLevel,
53 query_embedder: Option<Arc<dyn QueryEmbedder>>,
54 ) -> Result<Self, EngineError> {
55 let lock = DatabaseLock::acquire(path.as_ref())?;
56
57 if read_pool_size == 0 {
58 return Err(EngineError::InvalidConfig(
59 "read_pool_size must be >= 1, got 0".to_owned(),
60 ));
61 }
62
63 trace_info!(
64 path = %path.as_ref().display(),
65 provenance_mode = ?provenance_mode,
66 vector_dimension = ?vector_dimension,
67 read_pool_size,
68 telemetry_level = ?telemetry_level,
69 "engine opening"
70 );
71 let _ = telemetry_level; let telemetry = Arc::new(TelemetryCounters::default());
73 let schema_manager = Arc::new(SchemaManager::new());
74 let coordinator = ExecutionCoordinator::open(
75 path.as_ref(),
76 Arc::clone(&schema_manager),
77 vector_dimension,
78 read_pool_size,
79 Arc::clone(&telemetry),
80 query_embedder,
81 )?;
82 let writer = WriterActor::start(
83 path.as_ref(),
84 Arc::clone(&schema_manager),
85 provenance_mode,
86 Arc::clone(&telemetry),
87 )?;
88 let admin = AdminHandle::new(AdminService::new(path.as_ref(), schema_manager));
89
90 trace_info!(path = %path.as_ref().display(), "engine opened");
91 Ok(Self {
92 telemetry,
93 coordinator,
94 writer,
95 admin,
96 _lock: lock,
97 })
98 }
99
100 pub fn coordinator(&self) -> &ExecutionCoordinator {
101 &self.coordinator
102 }
103
104 pub fn writer(&self) -> &WriterActor {
105 &self.writer
106 }
107
108 pub fn admin(&self) -> &AdminHandle {
109 &self.admin
110 }
111
112 pub fn telemetry(&self) -> &Arc<TelemetryCounters> {
114 &self.telemetry
115 }
116
117 #[must_use]
120 pub fn telemetry_snapshot(&self) -> TelemetrySnapshot {
121 let mut snapshot = self.telemetry.snapshot();
122 snapshot.sqlite_cache = self.coordinator.aggregate_cache_status();
123 snapshot
124 }
125}
126
127#[cfg(test)]
128#[allow(clippy::expect_used)]
129mod tests {
130 use std::sync::Arc;
131
132 use fathomdb_query::QueryBuilder;
133
134 use crate::{
135 ChunkInsert, ChunkPolicy, NodeInsert, ProvenanceMode, TelemetryLevel, WriteRequest,
136 };
137
138 use super::EngineRuntime;
139
140 #[test]
142 fn concurrent_reads_from_multiple_threads() {
143 let dir = tempfile::tempdir().expect("temp dir");
144 let runtime = Arc::new(
145 EngineRuntime::open(
146 dir.path().join("test.db"),
147 ProvenanceMode::Warn,
148 None,
149 4,
150 TelemetryLevel::Counters,
151 None,
152 )
153 .expect("open"),
154 );
155
156 runtime
157 .writer()
158 .submit(WriteRequest {
159 label: "seed".to_owned(),
160 nodes: vec![NodeInsert {
161 row_id: "r1".to_owned(),
162 logical_id: "t:1".to_owned(),
163 kind: "Test".to_owned(),
164 properties: r#"{"v":1}"#.to_owned(),
165 source_ref: Some("test".to_owned()),
166 upsert: false,
167 chunk_policy: ChunkPolicy::Preserve,
168 content_ref: None,
169 }],
170 node_retires: vec![],
171 edges: vec![],
172 edge_retires: vec![],
173 chunks: vec![ChunkInsert {
174 id: "c1".to_owned(),
175 node_logical_id: "t:1".to_owned(),
176 text_content: "hello world".to_owned(),
177 byte_start: None,
178 byte_end: None,
179 content_hash: None,
180 }],
181 runs: vec![],
182 steps: vec![],
183 actions: vec![],
184 optional_backfills: vec![],
185 vec_inserts: vec![],
186 operational_writes: vec![],
187 })
188 .expect("seed write");
189
190 let compiled = QueryBuilder::nodes("Test")
191 .limit(10)
192 .compile()
193 .expect("compile");
194
195 let handles: Vec<_> = (0..4)
196 .map(|_| {
197 let rt = Arc::clone(&runtime);
198 let q = compiled.clone();
199 std::thread::spawn(move || {
200 let rows = rt
201 .coordinator()
202 .execute_compiled_read(&q)
203 .expect("query succeeds");
204 assert_eq!(rows.nodes.len(), 1);
205 })
206 })
207 .collect();
208
209 for h in handles {
210 h.join().expect("worker thread panicked");
211 }
212 }
213
214 #[test]
215 fn open_same_database_twice_returns_database_locked() {
216 let dir = tempfile::tempdir().expect("temp dir");
217 let db_path = dir.path().join("test.db");
218
219 let _first = EngineRuntime::open(
220 &db_path,
221 ProvenanceMode::Warn,
222 None,
223 4,
224 TelemetryLevel::Counters,
225 None,
226 )
227 .expect("open");
228 let second = EngineRuntime::open(
229 &db_path,
230 ProvenanceMode::Warn,
231 None,
232 4,
233 TelemetryLevel::Counters,
234 None,
235 );
236
237 assert!(second.is_err(), "second open must fail");
238 let err = second.expect_err("second open must fail");
239 assert!(
240 matches!(err, crate::EngineError::DatabaseLocked(_)),
241 "expected DatabaseLocked, got: {err:?}"
242 );
243 assert!(
244 err.to_string().contains("already in use"),
245 "error must mention 'already in use': {err}"
246 );
247 }
248
249 #[test]
250 fn database_reopens_after_drop() {
251 let dir = tempfile::tempdir().expect("temp dir");
252 let db_path = dir.path().join("test.db");
253
254 {
255 let _runtime = EngineRuntime::open(
256 &db_path,
257 ProvenanceMode::Warn,
258 None,
259 4,
260 TelemetryLevel::Counters,
261 None,
262 )
263 .expect("first open");
264 }
265
266 let runtime = EngineRuntime::open(
267 &db_path,
268 ProvenanceMode::Warn,
269 None,
270 4,
271 TelemetryLevel::Counters,
272 None,
273 )
274 .expect("reopen");
275 let compiled = QueryBuilder::nodes("Test")
276 .limit(10)
277 .compile()
278 .expect("compile");
279 let rows = runtime
280 .coordinator()
281 .execute_compiled_read(&compiled)
282 .expect("query");
283 assert!(rows.nodes.is_empty());
284 }
285
286 #[test]
287 fn lock_error_includes_pid() {
288 let dir = tempfile::tempdir().expect("temp dir");
289 let db_path = dir.path().join("test.db");
290
291 let _first = EngineRuntime::open(
292 &db_path,
293 ProvenanceMode::Warn,
294 None,
295 4,
296 TelemetryLevel::Counters,
297 None,
298 )
299 .expect("open");
300 let err = EngineRuntime::open(
301 &db_path,
302 ProvenanceMode::Warn,
303 None,
304 4,
305 TelemetryLevel::Counters,
306 None,
307 )
308 .expect_err("second open must fail");
309
310 let msg = err.to_string();
311 assert!(
312 msg.contains("already in use"),
313 "error must mention 'already in use': {msg}"
314 );
315 if cfg!(unix) {
318 let our_pid = std::process::id().to_string();
319 assert!(
320 msg.contains(&our_pid),
321 "error must contain holder pid {our_pid}: {msg}"
322 );
323 }
324 }
325
326 #[test]
329 fn drop_joins_writer_and_checkpoints_wal() {
330 let dir = tempfile::tempdir().expect("temp dir");
331 let db_path = dir.path().join("test.db");
332 let wal_path = dir.path().join("test.db-wal");
333
334 {
335 let runtime = EngineRuntime::open(
336 &db_path,
337 ProvenanceMode::Warn,
338 None,
339 4,
340 TelemetryLevel::Counters,
341 None,
342 )
343 .expect("open");
344
345 runtime
346 .writer()
347 .submit(WriteRequest {
348 label: "seed".to_owned(),
349 nodes: vec![NodeInsert {
350 row_id: "r1".to_owned(),
351 logical_id: "t:1".to_owned(),
352 kind: "Test".to_owned(),
353 properties: r#"{"v":1}"#.to_owned(),
354 source_ref: Some("test".to_owned()),
355 upsert: false,
356 chunk_policy: ChunkPolicy::Preserve,
357 content_ref: None,
358 }],
359 node_retires: vec![],
360 edges: vec![],
361 edge_retires: vec![],
362 chunks: vec![],
363 runs: vec![],
364 steps: vec![],
365 actions: vec![],
366 optional_backfills: vec![],
367 vec_inserts: vec![],
368 operational_writes: vec![],
369 })
370 .expect("seed write");
371 }
372 assert!(
374 !wal_path.exists(),
375 "WAL file should be cleaned up after graceful drop"
376 );
377
378 let runtime = EngineRuntime::open(
380 &db_path,
381 ProvenanceMode::Warn,
382 None,
383 4,
384 TelemetryLevel::Counters,
385 None,
386 )
387 .expect("reopen");
388 let compiled = QueryBuilder::nodes("Test")
389 .limit(10)
390 .compile()
391 .expect("compile");
392 let rows = runtime
393 .coordinator()
394 .execute_compiled_read(&compiled)
395 .expect("query");
396 assert_eq!(rows.nodes.len(), 1);
397 }
398
399 fn seeded_runtime() -> (tempfile::TempDir, EngineRuntime) {
401 let dir = tempfile::tempdir().expect("temp dir");
402 let runtime = EngineRuntime::open(
403 dir.path().join("test.db"),
404 ProvenanceMode::Warn,
405 None,
406 4,
407 TelemetryLevel::Counters,
408 None,
409 )
410 .expect("open");
411
412 runtime
413 .writer()
414 .submit(WriteRequest {
415 label: "seed".to_owned(),
416 nodes: vec![NodeInsert {
417 row_id: "r1".to_owned(),
418 logical_id: "t:1".to_owned(),
419 kind: "Test".to_owned(),
420 properties: r#"{"v":1}"#.to_owned(),
421 source_ref: Some("test".to_owned()),
422 upsert: false,
423 chunk_policy: ChunkPolicy::Preserve,
424 content_ref: None,
425 }],
426 node_retires: vec![],
427 edges: vec![],
428 edge_retires: vec![],
429 chunks: vec![ChunkInsert {
430 id: "c1".to_owned(),
431 node_logical_id: "t:1".to_owned(),
432 text_content: "hello world".to_owned(),
433 byte_start: None,
434 byte_end: None,
435 content_hash: None,
436 }],
437 runs: vec![],
438 steps: vec![],
439 actions: vec![],
440 optional_backfills: vec![],
441 vec_inserts: vec![],
442 operational_writes: vec![],
443 })
444 .expect("seed write");
445
446 (dir, runtime)
447 }
448
449 #[test]
450 fn telemetry_counts_queries() {
451 let (_dir, runtime) = seeded_runtime();
452 let compiled = QueryBuilder::nodes("Test")
453 .limit(10)
454 .compile()
455 .expect("compile");
456
457 for _ in 0..3 {
458 runtime
459 .coordinator()
460 .execute_compiled_read(&compiled)
461 .expect("query");
462 }
463
464 let snap = runtime.telemetry_snapshot();
465 assert!(
466 snap.queries_total >= 3,
467 "expected at least 3 queries, got {}",
468 snap.queries_total,
469 );
470 }
471
472 #[test]
473 fn telemetry_counts_writes() {
474 let (_dir, runtime) = seeded_runtime();
475
476 runtime
478 .writer()
479 .submit(WriteRequest {
480 label: "second".to_owned(),
481 nodes: vec![NodeInsert {
482 row_id: "r2".to_owned(),
483 logical_id: "t:2".to_owned(),
484 kind: "Test".to_owned(),
485 properties: r#"{"v":2}"#.to_owned(),
486 source_ref: Some("test".to_owned()),
487 upsert: false,
488 chunk_policy: ChunkPolicy::Preserve,
489 content_ref: None,
490 }],
491 node_retires: vec![],
492 edges: vec![],
493 edge_retires: vec![],
494 chunks: vec![],
495 runs: vec![],
496 steps: vec![],
497 actions: vec![],
498 optional_backfills: vec![],
499 vec_inserts: vec![],
500 operational_writes: vec![],
501 })
502 .expect("second write");
503
504 let snap = runtime.telemetry_snapshot();
505 assert!(
506 snap.writes_total >= 2,
507 "expected at least 2 writes, got {}",
508 snap.writes_total,
509 );
510 }
511
512 #[test]
513 fn telemetry_counts_write_rows() {
514 let (_dir, runtime) = seeded_runtime();
515 let snap = runtime.telemetry_snapshot();
517 assert!(
518 snap.write_rows_total >= 2,
519 "expected at least 2 write rows, got {}",
520 snap.write_rows_total,
521 );
522 }
523
524 #[test]
525 fn telemetry_snapshot_includes_cache_status() {
526 let (_dir, runtime) = seeded_runtime();
527 let compiled = QueryBuilder::nodes("Test")
528 .limit(10)
529 .compile()
530 .expect("compile");
531
532 for _ in 0..5 {
534 runtime
535 .coordinator()
536 .execute_compiled_read(&compiled)
537 .expect("query");
538 }
539
540 let snap = runtime.telemetry_snapshot();
541 assert!(
542 snap.sqlite_cache.cache_hits + snap.sqlite_cache.cache_misses > 0,
543 "expected cache activity, got hits={} misses={}",
544 snap.sqlite_cache.cache_hits,
545 snap.sqlite_cache.cache_misses,
546 );
547 }
548}