1use std::path::Path;
2use std::sync::Arc;
3
4use fathomdb_schema::SchemaManager;
5
6use crate::{
7 AdminHandle, AdminService, EngineError, ExecutionCoordinator, ProvenanceMode, WriterActor,
8 database_lock::DatabaseLock,
9 telemetry::{TelemetryCounters, TelemetryLevel, TelemetrySnapshot},
10};
11
12#[derive(Debug)]
25pub struct EngineRuntime {
26 telemetry: Arc<TelemetryCounters>,
27 coordinator: ExecutionCoordinator,
28 writer: WriterActor,
29 admin: AdminHandle,
30 _lock: DatabaseLock,
31}
32
33#[allow(clippy::used_underscore_items)]
35const _: () = {
36 fn _assert_send_sync<T: Send + Sync>() {}
37 fn _check() {
38 _assert_send_sync::<EngineRuntime>();
39 }
40};
41
42impl EngineRuntime {
43 pub fn open(
47 path: impl AsRef<Path>,
48 provenance_mode: ProvenanceMode,
49 vector_dimension: Option<usize>,
50 read_pool_size: usize,
51 telemetry_level: TelemetryLevel,
52 ) -> Result<Self, EngineError> {
53 let lock = DatabaseLock::acquire(path.as_ref())?;
54
55 if read_pool_size == 0 {
56 return Err(EngineError::InvalidConfig(
57 "read_pool_size must be >= 1, got 0".to_owned(),
58 ));
59 }
60
61 trace_info!(
62 path = %path.as_ref().display(),
63 provenance_mode = ?provenance_mode,
64 vector_dimension = ?vector_dimension,
65 read_pool_size,
66 telemetry_level = ?telemetry_level,
67 "engine opening"
68 );
69 let _ = telemetry_level; let telemetry = Arc::new(TelemetryCounters::default());
71 let schema_manager = Arc::new(SchemaManager::new());
72 let coordinator = ExecutionCoordinator::open(
73 path.as_ref(),
74 Arc::clone(&schema_manager),
75 vector_dimension,
76 read_pool_size,
77 Arc::clone(&telemetry),
78 )?;
79 let writer = WriterActor::start(
80 path.as_ref(),
81 Arc::clone(&schema_manager),
82 provenance_mode,
83 Arc::clone(&telemetry),
84 )?;
85 let admin = AdminHandle::new(AdminService::new(path.as_ref(), schema_manager));
86
87 trace_info!(path = %path.as_ref().display(), "engine opened");
88 Ok(Self {
89 telemetry,
90 coordinator,
91 writer,
92 admin,
93 _lock: lock,
94 })
95 }
96
97 pub fn coordinator(&self) -> &ExecutionCoordinator {
98 &self.coordinator
99 }
100
101 pub fn writer(&self) -> &WriterActor {
102 &self.writer
103 }
104
105 pub fn admin(&self) -> &AdminHandle {
106 &self.admin
107 }
108
109 pub fn telemetry(&self) -> &Arc<TelemetryCounters> {
111 &self.telemetry
112 }
113
114 #[must_use]
117 pub fn telemetry_snapshot(&self) -> TelemetrySnapshot {
118 let mut snapshot = self.telemetry.snapshot();
119 snapshot.sqlite_cache = self.coordinator.aggregate_cache_status();
120 snapshot
121 }
122}
123
#[cfg(test)]
#[allow(clippy::expect_used)]
mod tests {
    use std::path::Path;
    use std::sync::Arc;

    use fathomdb_query::QueryBuilder;

    use crate::{
        ChunkInsert, ChunkPolicy, EngineError, NodeInsert, ProvenanceMode, TelemetryLevel,
        WriteRequest,
    };

    use super::EngineRuntime;

    /// Opens a runtime at `db_path` with the configuration shared by every test
    /// in this module: `ProvenanceMode::Warn`, no vector dimension, a read pool of
    /// 4, and `TelemetryLevel::Counters`.
    ///
    /// Returns the raw `Result` so callers can assert on either outcome.
    fn open_runtime(db_path: &Path) -> Result<EngineRuntime, EngineError> {
        EngineRuntime::open(
            db_path,
            ProvenanceMode::Warn,
            None,
            4,
            TelemetryLevel::Counters,
        )
    }

    /// Builds a non-upsert node of kind `"Test"` with source ref `"test"`.
    fn test_node(row_id: &str, logical_id: &str, properties: &str) -> NodeInsert {
        NodeInsert {
            row_id: row_id.to_owned(),
            logical_id: logical_id.to_owned(),
            kind: "Test".to_owned(),
            properties: properties.to_owned(),
            source_ref: Some("test".to_owned()),
            upsert: false,
            chunk_policy: ChunkPolicy::Preserve,
        }
    }

    /// The node every seeding test inserts: row `r1`, logical id `t:1`.
    fn seed_node() -> NodeInsert {
        test_node("r1", "t:1", r#"{"v":1}"#)
    }

    /// The chunk attached to [`seed_node`]: id `c1`, text `"hello world"`.
    fn seed_chunk() -> ChunkInsert {
        ChunkInsert {
            id: "c1".to_owned(),
            node_logical_id: "t:1".to_owned(),
            text_content: "hello world".to_owned(),
            byte_start: None,
            byte_end: None,
        }
    }

    /// Builds a write request carrying only `nodes` and `chunks`; every other
    /// collection in the request is left empty.
    fn write_request(
        label: &str,
        nodes: Vec<NodeInsert>,
        chunks: Vec<ChunkInsert>,
    ) -> WriteRequest {
        WriteRequest {
            label: label.to_owned(),
            nodes,
            node_retires: vec![],
            edges: vec![],
            edge_retires: vec![],
            chunks,
            runs: vec![],
            steps: vec![],
            actions: vec![],
            optional_backfills: vec![],
            vec_inserts: vec![],
            operational_writes: vec![],
        }
    }

    /// Opens a runtime in a fresh temp dir and writes one node plus one chunk.
    ///
    /// The `TempDir` is returned alongside the runtime so the directory outlives
    /// the database that lives inside it.
    fn seeded_runtime() -> (tempfile::TempDir, EngineRuntime) {
        let dir = tempfile::tempdir().expect("temp dir");
        let runtime = open_runtime(&dir.path().join("test.db")).expect("open");

        runtime
            .writer()
            .submit(write_request("seed", vec![seed_node()], vec![seed_chunk()]))
            .expect("seed write");

        (dir, runtime)
    }

    /// Four threads run the same compiled read against one shared runtime and
    /// each must see the single seeded node.
    #[test]
    fn concurrent_reads_from_multiple_threads() {
        let (_dir, runtime) = seeded_runtime();
        let runtime = Arc::new(runtime);

        let compiled = QueryBuilder::nodes("Test")
            .limit(10)
            .compile()
            .expect("compile");

        let handles: Vec<_> = (0..4)
            .map(|_| {
                let rt = Arc::clone(&runtime);
                let q = compiled.clone();
                std::thread::spawn(move || {
                    let rows = rt
                        .coordinator()
                        .execute_compiled_read(&q)
                        .expect("query succeeds");
                    assert_eq!(rows.nodes.len(), 1);
                })
            })
            .collect();

        for h in handles {
            h.join().expect("worker thread panicked");
        }
    }

    /// A second open of the same path, while the first runtime is alive, must
    /// fail with `DatabaseLocked`.
    #[test]
    fn open_same_database_twice_returns_database_locked() {
        let dir = tempfile::tempdir().expect("temp dir");
        let db_path = dir.path().join("test.db");

        let _first = open_runtime(&db_path).expect("open");
        let err = open_runtime(&db_path).expect_err("second open must fail");

        assert!(
            matches!(err, EngineError::DatabaseLocked(_)),
            "expected DatabaseLocked, got: {err:?}"
        );
        assert!(
            err.to_string().contains("already in use"),
            "error must mention 'already in use': {err}"
        );
    }

    /// Dropping the runtime must make the same path openable again.
    #[test]
    fn database_reopens_after_drop() {
        let dir = tempfile::tempdir().expect("temp dir");
        let db_path = dir.path().join("test.db");

        {
            let _runtime = open_runtime(&db_path).expect("first open");
        }

        let runtime = open_runtime(&db_path).expect("reopen");
        let compiled = QueryBuilder::nodes("Test")
            .limit(10)
            .compile()
            .expect("compile");
        let rows = runtime
            .coordinator()
            .execute_compiled_read(&compiled)
            .expect("query");
        assert!(rows.nodes.is_empty());
    }

    /// The lock error message must mention "already in use" and, on Unix, the
    /// pid of the process holding the lock.
    #[test]
    fn lock_error_includes_pid() {
        let dir = tempfile::tempdir().expect("temp dir");
        let db_path = dir.path().join("test.db");

        let _first = open_runtime(&db_path).expect("open");
        let err = open_runtime(&db_path).expect_err("second open must fail");

        let msg = err.to_string();
        assert!(
            msg.contains("already in use"),
            "error must mention 'already in use': {msg}"
        );
        if cfg!(unix) {
            // Both opens happen inside this test process, so the lock holder is
            // this process itself.
            let our_pid = std::process::id().to_string();
            assert!(
                msg.contains(&our_pid),
                "error must contain holder pid {our_pid}: {msg}"
            );
        }
    }

    /// After a graceful drop the `-wal` file must be gone, and the data written
    /// before the drop must still be readable after reopening.
    #[test]
    fn drop_joins_writer_and_checkpoints_wal() {
        let dir = tempfile::tempdir().expect("temp dir");
        let db_path = dir.path().join("test.db");
        let wal_path = dir.path().join("test.db-wal");

        {
            let runtime = open_runtime(&db_path).expect("open");

            runtime
                .writer()
                .submit(write_request("seed", vec![seed_node()], vec![]))
                .expect("seed write");
        }

        assert!(
            !wal_path.exists(),
            "WAL file should be cleaned up after graceful drop"
        );

        let runtime = open_runtime(&db_path).expect("reopen");
        let compiled = QueryBuilder::nodes("Test")
            .limit(10)
            .compile()
            .expect("compile");
        let rows = runtime
            .coordinator()
            .execute_compiled_read(&compiled)
            .expect("query");
        assert_eq!(rows.nodes.len(), 1);
    }

    /// Three reads must be reflected in `queries_total`.
    #[test]
    fn telemetry_counts_queries() {
        let (_dir, runtime) = seeded_runtime();
        let compiled = QueryBuilder::nodes("Test")
            .limit(10)
            .compile()
            .expect("compile");

        for _ in 0..3 {
            runtime
                .coordinator()
                .execute_compiled_read(&compiled)
                .expect("query");
        }

        let snap = runtime.telemetry_snapshot();
        assert!(
            snap.queries_total >= 3,
            "expected at least 3 queries, got {}",
            snap.queries_total,
        );
    }

    /// The seed write plus one more write must yield `writes_total >= 2`.
    #[test]
    fn telemetry_counts_writes() {
        let (_dir, runtime) = seeded_runtime();

        runtime
            .writer()
            .submit(write_request(
                "second",
                vec![test_node("r2", "t:2", r#"{"v":2}"#)],
                vec![],
            ))
            .expect("second write");

        let snap = runtime.telemetry_snapshot();
        assert!(
            snap.writes_total >= 2,
            "expected at least 2 writes, got {}",
            snap.writes_total,
        );
    }

    /// The seed write inserts one node and one chunk, so at least two rows must
    /// have been counted.
    #[test]
    fn telemetry_counts_write_rows() {
        let (_dir, runtime) = seeded_runtime();
        let snap = runtime.telemetry_snapshot();
        assert!(
            snap.write_rows_total >= 2,
            "expected at least 2 write rows, got {}",
            snap.write_rows_total,
        );
    }

    /// After several reads the snapshot must report some cache activity
    /// (hits and misses combined).
    #[test]
    fn telemetry_snapshot_includes_cache_status() {
        let (_dir, runtime) = seeded_runtime();
        let compiled = QueryBuilder::nodes("Test")
            .limit(10)
            .compile()
            .expect("compile");

        for _ in 0..5 {
            runtime
                .coordinator()
                .execute_compiled_read(&compiled)
                .expect("query");
        }

        let snap = runtime.telemetry_snapshot();
        assert!(
            snap.sqlite_cache.cache_hits + snap.sqlite_cache.cache_misses > 0,
            "expected cache activity, got hits={} misses={}",
            snap.sqlite_cache.cache_hits,
            snap.sqlite_cache.cache_misses,
        );
    }
}