1use std::path::Path;
2use std::sync::Arc;
3
4use fathomdb_schema::SchemaManager;
5
6use crate::{
7 AdminHandle, AdminService, EngineError, ExecutionCoordinator, ProvenanceMode, WriterActor,
8 database_lock::DatabaseLock,
9 telemetry::{TelemetryCounters, TelemetryLevel, TelemetrySnapshot},
10};
11
12#[derive(Debug)]
25pub struct EngineRuntime {
26 telemetry: Arc<TelemetryCounters>,
27 coordinator: ExecutionCoordinator,
28 writer: WriterActor,
29 admin: AdminHandle,
30 _lock: DatabaseLock,
31}
32
33#[allow(clippy::used_underscore_items)]
35const _: () = {
36 fn _assert_send_sync<T: Send + Sync>() {}
37 fn _check() {
38 _assert_send_sync::<EngineRuntime>();
39 }
40};
41
42impl EngineRuntime {
43 pub fn open(
47 path: impl AsRef<Path>,
48 provenance_mode: ProvenanceMode,
49 vector_dimension: Option<usize>,
50 read_pool_size: usize,
51 telemetry_level: TelemetryLevel,
52 ) -> Result<Self, EngineError> {
53 let lock = DatabaseLock::acquire(path.as_ref())?;
54
55 if read_pool_size == 0 {
56 return Err(EngineError::InvalidConfig(
57 "read_pool_size must be >= 1, got 0".to_owned(),
58 ));
59 }
60
61 trace_info!(
62 path = %path.as_ref().display(),
63 provenance_mode = ?provenance_mode,
64 vector_dimension = ?vector_dimension,
65 read_pool_size,
66 telemetry_level = ?telemetry_level,
67 "engine opening"
68 );
69 let _ = telemetry_level; let telemetry = Arc::new(TelemetryCounters::default());
71 let schema_manager = Arc::new(SchemaManager::new());
72 let coordinator = ExecutionCoordinator::open(
73 path.as_ref(),
74 Arc::clone(&schema_manager),
75 vector_dimension,
76 read_pool_size,
77 Arc::clone(&telemetry),
78 )?;
79 let writer = WriterActor::start(
80 path.as_ref(),
81 Arc::clone(&schema_manager),
82 provenance_mode,
83 Arc::clone(&telemetry),
84 )?;
85 let admin = AdminHandle::new(AdminService::new(path.as_ref(), schema_manager));
86
87 trace_info!(path = %path.as_ref().display(), "engine opened");
88 Ok(Self {
89 telemetry,
90 coordinator,
91 writer,
92 admin,
93 _lock: lock,
94 })
95 }
96
97 pub fn coordinator(&self) -> &ExecutionCoordinator {
98 &self.coordinator
99 }
100
101 pub fn writer(&self) -> &WriterActor {
102 &self.writer
103 }
104
105 pub fn admin(&self) -> &AdminHandle {
106 &self.admin
107 }
108
109 pub fn telemetry(&self) -> &Arc<TelemetryCounters> {
111 &self.telemetry
112 }
113
114 #[must_use]
117 pub fn telemetry_snapshot(&self) -> TelemetrySnapshot {
118 let mut snapshot = self.telemetry.snapshot();
119 snapshot.sqlite_cache = self.coordinator.aggregate_cache_status();
120 snapshot
121 }
122}
123
124#[cfg(test)]
125#[allow(clippy::expect_used)]
126mod tests {
127 use std::sync::Arc;
128
129 use fathomdb_query::QueryBuilder;
130
131 use crate::{
132 ChunkInsert, ChunkPolicy, NodeInsert, ProvenanceMode, TelemetryLevel, WriteRequest,
133 };
134
135 use super::EngineRuntime;
136
137 #[test]
139 fn concurrent_reads_from_multiple_threads() {
140 let dir = tempfile::tempdir().expect("temp dir");
141 let runtime = Arc::new(
142 EngineRuntime::open(
143 dir.path().join("test.db"),
144 ProvenanceMode::Warn,
145 None,
146 4,
147 TelemetryLevel::Counters,
148 )
149 .expect("open"),
150 );
151
152 runtime
153 .writer()
154 .submit(WriteRequest {
155 label: "seed".to_owned(),
156 nodes: vec![NodeInsert {
157 row_id: "r1".to_owned(),
158 logical_id: "t:1".to_owned(),
159 kind: "Test".to_owned(),
160 properties: r#"{"v":1}"#.to_owned(),
161 source_ref: Some("test".to_owned()),
162 upsert: false,
163 chunk_policy: ChunkPolicy::Preserve,
164 content_ref: None,
165 }],
166 node_retires: vec![],
167 edges: vec![],
168 edge_retires: vec![],
169 chunks: vec![ChunkInsert {
170 id: "c1".to_owned(),
171 node_logical_id: "t:1".to_owned(),
172 text_content: "hello world".to_owned(),
173 byte_start: None,
174 byte_end: None,
175 content_hash: None,
176 }],
177 runs: vec![],
178 steps: vec![],
179 actions: vec![],
180 optional_backfills: vec![],
181 vec_inserts: vec![],
182 operational_writes: vec![],
183 })
184 .expect("seed write");
185
186 let compiled = QueryBuilder::nodes("Test")
187 .limit(10)
188 .compile()
189 .expect("compile");
190
191 let handles: Vec<_> = (0..4)
192 .map(|_| {
193 let rt = Arc::clone(&runtime);
194 let q = compiled.clone();
195 std::thread::spawn(move || {
196 let rows = rt
197 .coordinator()
198 .execute_compiled_read(&q)
199 .expect("query succeeds");
200 assert_eq!(rows.nodes.len(), 1);
201 })
202 })
203 .collect();
204
205 for h in handles {
206 h.join().expect("worker thread panicked");
207 }
208 }
209
210 #[test]
211 fn open_same_database_twice_returns_database_locked() {
212 let dir = tempfile::tempdir().expect("temp dir");
213 let db_path = dir.path().join("test.db");
214
215 let _first = EngineRuntime::open(
216 &db_path,
217 ProvenanceMode::Warn,
218 None,
219 4,
220 TelemetryLevel::Counters,
221 )
222 .expect("open");
223 let second = EngineRuntime::open(
224 &db_path,
225 ProvenanceMode::Warn,
226 None,
227 4,
228 TelemetryLevel::Counters,
229 );
230
231 assert!(second.is_err(), "second open must fail");
232 let err = second.expect_err("second open must fail");
233 assert!(
234 matches!(err, crate::EngineError::DatabaseLocked(_)),
235 "expected DatabaseLocked, got: {err:?}"
236 );
237 assert!(
238 err.to_string().contains("already in use"),
239 "error must mention 'already in use': {err}"
240 );
241 }
242
243 #[test]
244 fn database_reopens_after_drop() {
245 let dir = tempfile::tempdir().expect("temp dir");
246 let db_path = dir.path().join("test.db");
247
248 {
249 let _runtime = EngineRuntime::open(
250 &db_path,
251 ProvenanceMode::Warn,
252 None,
253 4,
254 TelemetryLevel::Counters,
255 )
256 .expect("first open");
257 }
258
259 let runtime = EngineRuntime::open(
260 &db_path,
261 ProvenanceMode::Warn,
262 None,
263 4,
264 TelemetryLevel::Counters,
265 )
266 .expect("reopen");
267 let compiled = QueryBuilder::nodes("Test")
268 .limit(10)
269 .compile()
270 .expect("compile");
271 let rows = runtime
272 .coordinator()
273 .execute_compiled_read(&compiled)
274 .expect("query");
275 assert!(rows.nodes.is_empty());
276 }
277
278 #[test]
279 fn lock_error_includes_pid() {
280 let dir = tempfile::tempdir().expect("temp dir");
281 let db_path = dir.path().join("test.db");
282
283 let _first = EngineRuntime::open(
284 &db_path,
285 ProvenanceMode::Warn,
286 None,
287 4,
288 TelemetryLevel::Counters,
289 )
290 .expect("open");
291 let err = EngineRuntime::open(
292 &db_path,
293 ProvenanceMode::Warn,
294 None,
295 4,
296 TelemetryLevel::Counters,
297 )
298 .expect_err("second open must fail");
299
300 let msg = err.to_string();
301 assert!(
302 msg.contains("already in use"),
303 "error must mention 'already in use': {msg}"
304 );
305 if cfg!(unix) {
308 let our_pid = std::process::id().to_string();
309 assert!(
310 msg.contains(&our_pid),
311 "error must contain holder pid {our_pid}: {msg}"
312 );
313 }
314 }
315
316 #[test]
319 fn drop_joins_writer_and_checkpoints_wal() {
320 let dir = tempfile::tempdir().expect("temp dir");
321 let db_path = dir.path().join("test.db");
322 let wal_path = dir.path().join("test.db-wal");
323
324 {
325 let runtime = EngineRuntime::open(
326 &db_path,
327 ProvenanceMode::Warn,
328 None,
329 4,
330 TelemetryLevel::Counters,
331 )
332 .expect("open");
333
334 runtime
335 .writer()
336 .submit(WriteRequest {
337 label: "seed".to_owned(),
338 nodes: vec![NodeInsert {
339 row_id: "r1".to_owned(),
340 logical_id: "t:1".to_owned(),
341 kind: "Test".to_owned(),
342 properties: r#"{"v":1}"#.to_owned(),
343 source_ref: Some("test".to_owned()),
344 upsert: false,
345 chunk_policy: ChunkPolicy::Preserve,
346 content_ref: None,
347 }],
348 node_retires: vec![],
349 edges: vec![],
350 edge_retires: vec![],
351 chunks: vec![],
352 runs: vec![],
353 steps: vec![],
354 actions: vec![],
355 optional_backfills: vec![],
356 vec_inserts: vec![],
357 operational_writes: vec![],
358 })
359 .expect("seed write");
360 }
361 assert!(
363 !wal_path.exists(),
364 "WAL file should be cleaned up after graceful drop"
365 );
366
367 let runtime = EngineRuntime::open(
369 &db_path,
370 ProvenanceMode::Warn,
371 None,
372 4,
373 TelemetryLevel::Counters,
374 )
375 .expect("reopen");
376 let compiled = QueryBuilder::nodes("Test")
377 .limit(10)
378 .compile()
379 .expect("compile");
380 let rows = runtime
381 .coordinator()
382 .execute_compiled_read(&compiled)
383 .expect("query");
384 assert_eq!(rows.nodes.len(), 1);
385 }
386
387 fn seeded_runtime() -> (tempfile::TempDir, EngineRuntime) {
389 let dir = tempfile::tempdir().expect("temp dir");
390 let runtime = EngineRuntime::open(
391 dir.path().join("test.db"),
392 ProvenanceMode::Warn,
393 None,
394 4,
395 TelemetryLevel::Counters,
396 )
397 .expect("open");
398
399 runtime
400 .writer()
401 .submit(WriteRequest {
402 label: "seed".to_owned(),
403 nodes: vec![NodeInsert {
404 row_id: "r1".to_owned(),
405 logical_id: "t:1".to_owned(),
406 kind: "Test".to_owned(),
407 properties: r#"{"v":1}"#.to_owned(),
408 source_ref: Some("test".to_owned()),
409 upsert: false,
410 chunk_policy: ChunkPolicy::Preserve,
411 content_ref: None,
412 }],
413 node_retires: vec![],
414 edges: vec![],
415 edge_retires: vec![],
416 chunks: vec![ChunkInsert {
417 id: "c1".to_owned(),
418 node_logical_id: "t:1".to_owned(),
419 text_content: "hello world".to_owned(),
420 byte_start: None,
421 byte_end: None,
422 content_hash: None,
423 }],
424 runs: vec![],
425 steps: vec![],
426 actions: vec![],
427 optional_backfills: vec![],
428 vec_inserts: vec![],
429 operational_writes: vec![],
430 })
431 .expect("seed write");
432
433 (dir, runtime)
434 }
435
436 #[test]
437 fn telemetry_counts_queries() {
438 let (_dir, runtime) = seeded_runtime();
439 let compiled = QueryBuilder::nodes("Test")
440 .limit(10)
441 .compile()
442 .expect("compile");
443
444 for _ in 0..3 {
445 runtime
446 .coordinator()
447 .execute_compiled_read(&compiled)
448 .expect("query");
449 }
450
451 let snap = runtime.telemetry_snapshot();
452 assert!(
453 snap.queries_total >= 3,
454 "expected at least 3 queries, got {}",
455 snap.queries_total,
456 );
457 }
458
459 #[test]
460 fn telemetry_counts_writes() {
461 let (_dir, runtime) = seeded_runtime();
462
463 runtime
465 .writer()
466 .submit(WriteRequest {
467 label: "second".to_owned(),
468 nodes: vec![NodeInsert {
469 row_id: "r2".to_owned(),
470 logical_id: "t:2".to_owned(),
471 kind: "Test".to_owned(),
472 properties: r#"{"v":2}"#.to_owned(),
473 source_ref: Some("test".to_owned()),
474 upsert: false,
475 chunk_policy: ChunkPolicy::Preserve,
476 content_ref: None,
477 }],
478 node_retires: vec![],
479 edges: vec![],
480 edge_retires: vec![],
481 chunks: vec![],
482 runs: vec![],
483 steps: vec![],
484 actions: vec![],
485 optional_backfills: vec![],
486 vec_inserts: vec![],
487 operational_writes: vec![],
488 })
489 .expect("second write");
490
491 let snap = runtime.telemetry_snapshot();
492 assert!(
493 snap.writes_total >= 2,
494 "expected at least 2 writes, got {}",
495 snap.writes_total,
496 );
497 }
498
499 #[test]
500 fn telemetry_counts_write_rows() {
501 let (_dir, runtime) = seeded_runtime();
502 let snap = runtime.telemetry_snapshot();
504 assert!(
505 snap.write_rows_total >= 2,
506 "expected at least 2 write rows, got {}",
507 snap.write_rows_total,
508 );
509 }
510
511 #[test]
512 fn telemetry_snapshot_includes_cache_status() {
513 let (_dir, runtime) = seeded_runtime();
514 let compiled = QueryBuilder::nodes("Test")
515 .limit(10)
516 .compile()
517 .expect("compile");
518
519 for _ in 0..5 {
521 runtime
522 .coordinator()
523 .execute_compiled_read(&compiled)
524 .expect("query");
525 }
526
527 let snap = runtime.telemetry_snapshot();
528 assert!(
529 snap.sqlite_cache.cache_hits + snap.sqlite_cache.cache_misses > 0,
530 "expected cache activity, got hits={} misses={}",
531 snap.sqlite_cache.cache_hits,
532 snap.sqlite_cache.cache_misses,
533 );
534 }
535}