1mod catalog_ops;
8mod file_scan;
9mod fs_ops;
10mod handle_registry;
11mod helpers;
12mod maintenance;
13mod open;
14mod read;
15mod recover;
16mod replay;
17pub(crate) mod row_materialize;
18mod row_merge;
19pub(crate) mod row_paths;
20pub(crate) use row_paths::validate_unknown_fields_for_multiseg_schema;
21mod segment_write;
22mod write;
23mod writer_registry;
24pub(crate) use row_materialize::{build_non_pk_values_in_schema_order, row_value_at_path};
25
26pub(crate) use handle_registry::SharedDbHandle;
27pub use handle_registry::SharedDbState;
28
29pub use file_scan::{
30 read_header_and_superblocks, scan_database_file, scan_database_store, select_superblock,
31 DatabaseFileScan, DatabaseScanMode, SEGMENT_REGION_START,
32};
33
34use std::collections::{BTreeMap, HashMap};
35use std::marker::PhantomData;
36use std::path::{Path, PathBuf};
37use std::sync::Arc;
38
39use crate::catalog::Catalog;
40use crate::config::{OpenMode, OpenOptions, OpenRecoveryInfo};
41use crate::error::{DbError, FormatError, SchemaError};
42use crate::index::IndexState;
43use crate::index::{IndexEntry, IndexOp};
44use crate::record::{
45 encode_record_payload_v2, encode_record_payload_v3, non_pk_defs_in_order, RowValue, ScalarValue,
46};
47use crate::schema::{CollectionId, FieldDef};
48use crate::storage::{FileStore, Store, VecStore};
49use crate::validation;
50
51use self::fs_ops::{FsOps, StdFsOps};
52
53#[cfg(all(test, unix))]
54pub(crate) use maintenance::best_effort_fsync_parent_dir;
55
/// In-memory row table keyed by `(collection id, primary-key bytes)`.
///
/// The key bytes are the output of `ScalarValue::canonical_key_bytes` for the row's primary
/// key; the value maps top-level field names to their values (nested fields live inside
/// `RowValue::Object` entries).
pub(crate) type LatestMap = HashMap<(u32, Vec<u8>), BTreeMap<String, RowValue>>;
57
/// Everything `plan_insert_row` produces for one insert, in tuple order:
///
/// 0. the encoded record payload,
/// 1. the `(primary-key bytes, full row map)` pair,
/// 2. the index entries (all `IndexOp::Insert`) derived from the row,
/// 3. the primary key as a scalar.
type PlannedInsert = (
    Vec<u8>,
    (Vec<u8>, BTreeMap<String, RowValue>),
    Vec<IndexEntry>,
    ScalarValue,
);
64
65fn plan_insert_row(
66 catalog: &Catalog,
67 collection_id: CollectionId,
68 mut row: BTreeMap<String, RowValue>,
69) -> Result<PlannedInsert, DbError> {
70 let col =
71 catalog
72 .get(collection_id)
73 .ok_or(DbError::Schema(SchemaError::UnknownCollection {
74 id: collection_id.0,
75 }))?;
76 let pk_name =
77 col.primary_field
78 .as_deref()
79 .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
80 collection_id: collection_id.0,
81 }))?;
82 let pk_def = col
83 .fields
84 .iter()
85 .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
86 .ok_or(DbError::Schema(SchemaError::PrimaryFieldNotFound {
87 name: pk_name.to_string(),
88 }))?;
89 let pk_ty = &pk_def.ty;
90 validation::ensure_pk_type_primitive(pk_ty)?;
91 let mut pk_path = vec![pk_name.to_string()];
92 let pk_cell = row
93 .get(pk_name)
94 .ok_or(DbError::Schema(SchemaError::RowMissingPrimary {
95 name: pk_name.to_string(),
96 }))?;
97 validation::validate_value(&mut pk_path, pk_ty, &pk_def.constraints, pk_cell)?;
98 if let Ok(scalar) = pk_cell.clone().into_scalar() {
99 validation::ensure_pk_scalar_finite(&scalar)?;
100 }
101 let has_multi_segment_schema = col.fields.iter().any(|f| f.path.0.len() != 1);
104 if !has_multi_segment_schema {
105 validation::validate_top_level_row(&col.fields, pk_name, &row)?;
106 } else {
107 validation::validate_multiseg_row(&col.fields, pk_name, &row)?;
108 }
109
110 let pk_val = row.remove(pk_name).unwrap();
112 let pk_scalar = pk_val.clone().into_scalar()?;
113
114 let non_pk_defs = if has_multi_segment_schema {
118 col.fields
119 .iter()
120 .filter(|f| !(f.path.0.len() == 1 && f.path.0[0] == pk_name))
121 .collect::<Vec<_>>()
122 } else {
123 non_pk_defs_in_order(&col.fields, pk_name)
124 };
125 let non_pk = row_materialize::build_non_pk_values_in_schema_order(&row, &non_pk_defs)?;
126
127 let payload = if has_multi_segment_schema {
128 encode_record_payload_v3(
129 collection_id.0,
130 col.current_version.0,
131 &pk_scalar,
132 pk_ty,
133 &non_pk,
134 )?
135 } else {
136 encode_record_payload_v2(
137 collection_id.0,
138 col.current_version.0,
139 &pk_scalar,
140 pk_ty,
141 &non_pk,
142 )?
143 };
144
145 let mut full_map: BTreeMap<String, RowValue> = BTreeMap::new();
146 full_map.insert(pk_name.to_string(), pk_val);
147 for (def, v) in &non_pk {
148 let parts: Vec<String> = def.path.0.iter().map(|s| s.as_ref().to_string()).collect();
149 if parts.len() == 1 {
150 full_map.insert(parts[0].clone(), v.clone());
151 } else {
152 debug_assert!(parts.len() >= 2);
153 row_merge::merge_non_pk_into_full_map(&mut full_map, &parts, v);
154 }
155 }
156 let mut index_entries: Vec<IndexEntry> = Vec::new();
157 for idx in &col.indexes {
158 let Some(v) = scalar_at_path(&full_map, &idx.path) else {
159 if idx.kind == crate::schema::IndexKind::Unique {
160 #[cfg(feature = "tracing")]
161 tracing::warn!(
162 collection_id = collection_id.0,
163 index = %idx.name,
164 "unique index field absent or null; row is not indexed (SQL NULL semantics)"
165 );
166 }
167 continue;
168 };
169 index_entries.push(IndexEntry {
170 collection_id: collection_id.0,
171 index_name: idx.name.clone(),
172 kind: idx.kind,
173 op: IndexOp::Insert,
174 index_key: v.canonical_key_bytes(),
175 pk_key: pk_scalar.canonical_key_bytes(),
176 });
177 }
178 let pk_key = pk_scalar.canonical_key_bytes();
179 Ok((payload, (pk_key, full_map), index_entries, pk_scalar))
180}
181
182pub(crate) fn index_deletes_for_existing_row(
183 collection_id: CollectionId,
184 pk_scalar: &ScalarValue,
185 indexes: &[crate::schema::IndexDef],
186 existing_row: &BTreeMap<String, RowValue>,
187) -> Vec<IndexEntry> {
188 let mut out = Vec::new();
189 for idx in indexes {
190 let Some(v) = scalar_at_path(existing_row, &idx.path) else {
191 continue;
192 };
193 out.push(IndexEntry {
194 collection_id: collection_id.0,
195 index_name: idx.name.clone(),
196 kind: idx.kind,
197 op: IndexOp::Delete,
198 index_key: v.canonical_key_bytes(),
199 pk_key: pk_scalar.canonical_key_bytes(),
200 });
201 }
202 out
203}
204
205pub fn rebuild_indexes_from_latest(
207 catalog: &Catalog,
208 latest: &LatestMap,
209) -> Result<IndexState, DbError> {
210 let mut state = IndexState::default();
211 for col in catalog.collections() {
212 let Some(pk_name) = col.primary_field.as_deref() else {
213 continue;
214 };
215 let Some(pk_def) = col
216 .fields
217 .iter()
218 .find(|f| f.path.0.len() == 1 && f.path.0[0] == pk_name)
219 else {
220 continue;
221 };
222 for ((cid, _), row) in latest.iter() {
223 if *cid != col.id.0 {
224 continue;
225 }
226 let Ok(pk_scalar) = row
227 .get(pk_name)
228 .cloned()
229 .ok_or(())
230 .and_then(|c| c.into_scalar().map_err(|_| ()))
231 else {
232 continue;
233 };
234 if !pk_scalar.ty_matches(&pk_def.ty) {
235 continue;
236 }
237 for idx in &col.indexes {
238 let Some(v) = scalar_at_path(row, &idx.path) else {
239 continue;
240 };
241 state.apply(IndexEntry {
242 collection_id: col.id.0,
243 index_name: idx.name.clone(),
244 kind: idx.kind,
245 op: IndexOp::Insert,
246 index_key: v.canonical_key_bytes(),
247 pk_key: pk_scalar.canonical_key_bytes(),
248 })?;
249 }
250 }
251 }
252 Ok(state)
253}
254
255fn index_snapshot(entries: &mut [IndexEntry]) {
256 entries.sort_by(|a, b| {
257 let kind_key = |k: crate::schema::IndexKind| match k {
258 crate::schema::IndexKind::Unique => 0u8,
259 crate::schema::IndexKind::NonUnique => 1u8,
260 };
261 a.collection_id
262 .cmp(&b.collection_id)
263 .then_with(|| a.index_name.cmp(&b.index_name))
264 .then_with(|| kind_key(a.kind).cmp(&kind_key(b.kind)))
265 .then_with(|| a.index_key.cmp(&b.index_key))
266 .then_with(|| a.pk_key.cmp(&b.pk_key))
267 });
268}
269
270pub fn verify_indexes_match_rows(
272 catalog: &Catalog,
273 latest: &LatestMap,
274 indexes: &IndexState,
275) -> Result<(), DbError> {
276 let expected = rebuild_indexes_from_latest(catalog, latest)?;
277 let mut got = indexes.entries_for_checkpoint();
278 let mut want = expected.entries_for_checkpoint();
279 index_snapshot(&mut got);
280 index_snapshot(&mut want);
281 if got != want {
282 return Err(DbError::Format(FormatError::InvalidCatalogPayload {
283 message: "index state does not match row data".into(),
284 }));
285 }
286 Ok(())
287}
288
/// State staged for one in-progress transaction.
///
/// NOTE(review): the `shadow_*` fields look like working copies of the database's
/// catalog, rows and indexes that are swapped in on commit — confirm against the
/// transaction code, which is not part of this file.
pub(crate) struct TxnStaging {
    /// Identifier of the transaction this staging area belongs to.
    pub(crate) txn_id: u64,
    /// Transaction-local catalog.
    pub(crate) shadow_catalog: Catalog,
    /// Transaction-local row table.
    pub(crate) shadow_latest: LatestMap,
    /// Transaction-local index state.
    pub(crate) shadow_indexes: IndexState,
    /// Pending `(segment type, payload bytes)` pairs; presumably segments queued for
    /// writing when the transaction commits — TODO confirm.
    pub(crate) pending: Vec<(crate::segments::header::SegmentType, Vec<u8>)>,
}
297
/// A database handle over a backing store `S` (file-backed `FileStore` by default).
///
/// Holds the store together with in-memory copies of the catalog, the row table and
/// the index state.
pub struct Database<S: Store = FileStore> {
    /// Path the database was opened with (`":memory:"` for in-memory databases).
    path: PathBuf,
    /// Backing byte store.
    store: S,
    /// In-memory catalog.
    catalog: Catalog,
    /// NOTE(review): presumably the byte offset where the segment region starts — confirm.
    segment_start: u64,
    /// NOTE(review): presumably the on-disk format minor version — confirm.
    format_minor: u16,
    /// In-memory row table keyed by `(collection id, primary-key bytes)`.
    latest: LatestMap,
    /// In-memory index state.
    indexes: IndexState,
    /// Transaction sequence counter; initialised to 0 for read-only attached handles.
    txn_seq: u64,
    /// Staging area of the in-progress transaction, if any.
    txn_staging: Option<TxnStaging>,
    /// Writer registration guard; only set for file databases opened in `ReadWrite` mode.
    #[allow(dead_code)]
    writer_registry: Option<writer_registry::WriterRegistryGuard>,
    /// Shared state handle: set when this handle registered itself as a writer or
    /// attached to an existing writer's shared state.
    shared_mirror: Option<SharedDbHandle>,
    /// `true` when this handle was built from a live writer's shared state instead of
    /// scanning the file itself.
    read_only_attached: bool,
    /// Recovery information recorded at open time (default for attached handles).
    recovery_info: OpenRecoveryInfo,
    /// Test-only hook that may mutate a planned replace row.
    #[cfg(test)]
    #[doc(hidden)]
    #[allow(clippy::type_complexity)]
    pub(crate) test_poison_planned_replace_row:
        Option<fn(CollectionId, &mut BTreeMap<String, RowValue>)>,
    /// Test-only hook that may replace the scalar used when encoding a delete.
    #[cfg(test)]
    #[doc(hidden)]
    pub(crate) test_poison_delete_encode_scalar: Option<fn(ScalarValue) -> ScalarValue>,
}
337
impl<S: Store> Database<S> {
    /// Opens a database over an already-constructed `store`; delegates to
    /// `open::open_with_store`.
    pub(crate) fn open_with_store(
        path: PathBuf,
        store: S,
        opts: OpenOptions,
    ) -> Result<Self, DbError> {
        open::open_with_store(path, store, opts)
    }

    /// Test-only: installs `poison` as the planned-replace-row hook.
    ///
    /// NOTE(review): the name says "once"; this method only stores the hook, so any
    /// disarming must happen where the hook is consumed — confirm.
    #[cfg(test)]
    #[doc(hidden)]
    pub(crate) fn test_arm_replace_encode_poison_once(
        &mut self,
        poison: fn(CollectionId, &mut BTreeMap<String, RowValue>),
    ) {
        self.test_poison_planned_replace_row = Some(poison);
    }

    /// Test-only: installs `poison` as the delete-encode scalar hook.
    ///
    /// NOTE(review): as above, this method only stores the hook — confirm where it is
    /// cleared.
    #[cfg(test)]
    #[doc(hidden)]
    pub(crate) fn test_arm_delete_encode_poison_once(
        &mut self,
        poison: fn(ScalarValue) -> ScalarValue,
    ) {
        self.test_poison_delete_encode_scalar = Some(poison);
    }

    /// Test-only: mutable access to the in-memory catalog.
    #[cfg(test)]
    #[doc(hidden)]
    pub(crate) fn test_catalog_mut(&mut self) -> &mut Catalog {
        &mut self.catalog
    }

    /// Test-only: overwrites one top-level cell of an existing in-memory row, with no
    /// validation and without touching the store or the indexes.
    ///
    /// # Panics
    /// Panics when no row exists for `(collection_id, pk)`.
    #[cfg(test)]
    #[doc(hidden)]
    pub(crate) fn test_write_latest_cell_unchecked(
        &mut self,
        collection_id: CollectionId,
        pk: &ScalarValue,
        field: &str,
        value: RowValue,
    ) {
        let pk_key = pk.canonical_key_bytes();
        let row = self
            .latest
            .get_mut(&(collection_id.0, pk_key))
            .expect("test_write_latest_cell_unchecked: unknown row key");
        row.insert(field.to_string(), value);
    }
}
390
/// Typed, read-only view of one collection, borrowed from a `Database`.
///
/// `T` is the model type whose field definitions are used to project returned rows.
pub struct Collection<'a, S: Store, T: crate::schema::DbModel> {
    /// Database the queries run against.
    db: &'a Database<S>,
    /// Collection this view is bound to.
    collection_id: CollectionId,
    /// Ties the view to the model type without storing a value of it.
    _marker: PhantomData<T>,
}
396
397impl<'a, S: Store, T: crate::schema::DbModel> Collection<'a, S, T> {
398 pub fn where_eq(
399 &self,
400 path: crate::schema::FieldPath,
401 value: ScalarValue,
402 ) -> QueryBuilder<'a, S, T> {
403 QueryBuilder {
404 db: self.db,
405 collection_id: self.collection_id,
406 predicate: Some(crate::query::Predicate::Eq { path, value }),
407 limit: None,
408 _marker: PhantomData,
409 }
410 }
411
412 pub fn all(&self) -> Result<Vec<BTreeMap<String, RowValue>>, DbError> {
413 let q = crate::query::Query {
414 collection: self.collection_id,
415 predicate: None,
416 limit: None,
417 order_by: None,
418 };
419 let rows = self.db.query(&q)?;
420 Ok(rows.into_iter().map(project_row::<T>).collect())
421 }
422}
423
/// Builder for a query over one collection, with rows projected onto the model `T`.
pub struct QueryBuilder<'a, S: Store, T: crate::schema::DbModel> {
    /// Database the query runs against.
    db: &'a Database<S>,
    /// Collection being queried.
    collection_id: CollectionId,
    /// Optional filter; `None` means no filtering.
    predicate: Option<crate::query::Predicate>,
    /// Optional row limit passed through to the query.
    limit: Option<usize>,
    /// Ties the builder to the model type without storing a value of it.
    _marker: PhantomData<T>,
}
431
432impl<'a, S: Store, T: crate::schema::DbModel> QueryBuilder<'a, S, T> {
433 pub fn limit(mut self, n: usize) -> Self {
434 self.limit = Some(n);
435 self
436 }
437
438 pub fn all(self) -> Result<Vec<BTreeMap<String, RowValue>>, DbError> {
439 let q = crate::query::Query {
440 collection: self.collection_id,
441 predicate: self.predicate,
442 limit: self.limit,
443 order_by: None,
444 };
445 let rows = self.db.query(&q)?;
446 Ok(rows.into_iter().map(project_row::<T>).collect())
447 }
448
449 pub fn explain(self) -> Result<String, DbError> {
450 let q = crate::query::Query {
451 collection: self.collection_id,
452 predicate: self.predicate,
453 limit: self.limit,
454 order_by: None,
455 };
456 self.db.explain_query(&q)
457 }
458}
459
460fn validate_subset_model<T: crate::schema::DbModel>(
461 col: &crate::catalog::CollectionInfo,
462) -> Result<(), DbError> {
463 crate::schema_compat::validate_model_fields_against_catalog(
464 col,
465 T::primary_field(),
466 &T::fields(),
467 &T::indexes(),
468 )
469}
470
471pub fn row_subset_by_field_defs(
473 row: &BTreeMap<String, RowValue>,
474 wanted: &[FieldDef],
475) -> BTreeMap<String, RowValue> {
476 let mut out: BTreeMap<String, RowValue> = BTreeMap::new();
477 for f in wanted {
478 let segs = &f.path.0;
479 if segs.is_empty() {
480 continue;
481 }
482 let Some(leaf) = row_value_at_path_segments(row, segs) else {
483 continue;
484 };
485 let root = segs[0].to_string();
486 if segs.len() == 1 {
487 out.insert(root, leaf);
488 } else {
489 let nested = row_value_nested_object_path(&segs[1..], leaf);
490 match out.get_mut(&root) {
491 Some(existing) => merge_row_value_trees(existing, nested),
492 None => {
493 out.insert(root, nested);
494 }
495 }
496 }
497 }
498 out
499}
500
501pub(crate) fn row_value_at_path_segments(
502 row: &BTreeMap<String, RowValue>,
503 path: &[std::borrow::Cow<'static, str>],
504) -> Option<RowValue> {
505 if path.is_empty() {
506 return None;
507 }
508 let mut cur = row.get(path[0].as_ref())?;
509 for seg in path.iter().skip(1) {
510 cur = match cur {
511 RowValue::Object(m) => m.get(seg.as_ref())?,
512 RowValue::None => return None,
513 _ => return None,
514 };
515 }
516 Some(cur.clone())
517}
518
519fn row_value_nested_object_path(
521 segments: &[std::borrow::Cow<'static, str>],
522 leaf: RowValue,
523) -> RowValue {
524 debug_assert!(!segments.is_empty());
525 if segments.len() == 1 {
526 let mut m = BTreeMap::new();
527 m.insert(segments[0].to_string(), leaf);
528 RowValue::Object(m)
529 } else {
530 let mut m = BTreeMap::new();
531 m.insert(
532 segments[0].to_string(),
533 row_value_nested_object_path(&segments[1..], leaf),
534 );
535 RowValue::Object(m)
536 }
537}
538
539fn merge_row_value_trees(into: &mut RowValue, from: RowValue) {
540 match (&mut *into, from) {
541 (RowValue::Object(m1), RowValue::Object(m2)) => {
542 for (k, v2) in m2 {
543 match m1.entry(k) {
544 std::collections::btree_map::Entry::Vacant(e) => {
545 e.insert(v2);
546 }
547 std::collections::btree_map::Entry::Occupied(mut e) => {
548 merge_row_value_trees(e.get_mut(), v2);
549 }
550 }
551 }
552 }
553 (slot, from) => *slot = from,
554 }
555}
556
557fn project_row<T: crate::schema::DbModel>(
558 row: BTreeMap<String, RowValue>,
559) -> BTreeMap<String, RowValue> {
560 row_subset_by_field_defs(&row, &T::fields())
561}
562
563pub(crate) fn scalar_at_path(
564 row: &BTreeMap<String, RowValue>,
565 path: &crate::schema::FieldPath,
566) -> Option<ScalarValue> {
567 let mut cur: Option<&RowValue> = None;
568 for (i, seg) in path.0.iter().enumerate() {
569 let key = seg.as_ref();
570 cur = match (i, cur) {
571 (0, _) => row.get(key),
572 (_, Some(RowValue::Object(map))) => map.get(key),
573 (_, Some(RowValue::None)) => return None,
574 _ => return None,
575 };
576 }
577 cur.and_then(|v| v.as_scalar())
578}
579
impl Database<FileStore> {
    /// Opens the database file at `path` with default `OpenOptions`.
    pub fn open(path: impl AsRef<Path>) -> Result<Self, DbError> {
        Self::open_with_options(path, crate::config::OpenOptions::default())
    }

    /// Opens the database file at `path` in `ReadOnly` mode with `Strict` recovery.
    pub fn open_read_only(path: impl AsRef<Path>) -> Result<Self, DbError> {
        Self::open_with_options(
            path,
            crate::config::OpenOptions {
                recovery: crate::config::RecoveryMode::Strict,
                mode: OpenMode::ReadOnly,
            },
        )
    }

    /// Opens the database file at `path` with explicit options.
    ///
    /// Two paths exist:
    /// * **Attach** — for a `ReadOnly` open while `writer_registry` reports the path as
    ///   writably open and `handle_registry` has a shared handle for it, the new handle
    ///   is built from clones of the shared catalog, rows and indexes rather than by
    ///   opening through `open_with_store`.
    /// * **Regular** — otherwise the file is opened (locked) in the requested mode and
    ///   loaded through `open_with_store`; a `ReadWrite` open additionally registers the
    ///   writer and publishes its state in the handle registry.
    ///
    /// # Errors
    /// Returns `DbError::Io` when the shared-state lock is poisoned, and propagates any
    /// error from opening the store, loading the database, or registering the writer.
    pub fn open_with_options(
        path: impl AsRef<Path>,
        opts: crate::config::OpenOptions,
    ) -> Result<Self, DbError> {
        let path = path.as_ref().to_path_buf();
        if opts.mode == OpenMode::ReadOnly && writer_registry::is_writable_open(&path) {
            if let Some(shared) = handle_registry::get(&path) {
                // Hold the read lock only long enough to clone the `Arc` to the state.
                let state = {
                    let g = shared.read().map_err(|_| {
                        DbError::Io(std::io::Error::other("shared database lock poisoned"))
                    })?;
                    Arc::clone(&*g)
                };
                let db = Self {
                    path: path.clone(),
                    store: FileStore::open_locked(&path, OpenMode::ReadOnly)?,
                    catalog: state.catalog.clone(),
                    segment_start: state.segment_start,
                    format_minor: state.format_minor,
                    latest: state.latest.clone(),
                    indexes: state.indexes.clone(),
                    txn_seq: 0,
                    txn_staging: None,
                    // An attached reader never registers as a writer.
                    writer_registry: None,
                    shared_mirror: Some(shared),
                    read_only_attached: true,
                    recovery_info: OpenRecoveryInfo::default(),
                    #[cfg(test)]
                    test_poison_planned_replace_row: None,
                    #[cfg(test)]
                    test_poison_delete_encode_scalar: None,
                };
                return Ok(db);
            }
        }
        let store = FileStore::open_locked(&path, opts.mode)?;
        let mut db = Self::open_with_store(path.clone(), store, opts)?;
        if opts.mode == OpenMode::ReadWrite {
            // Register as the writer for this path, then publish the freshly loaded
            // state (generation 0) in the handle registry.
            db.writer_registry = Some(writer_registry::WriterRegistryGuard::new(path.clone())?);
            db.shared_mirror = Some(handle_registry::register(
                &path,
                handle_registry::SharedDbState {
                    catalog: db.catalog.clone(),
                    latest: db.latest.clone(),
                    indexes: db.indexes.clone(),
                    segment_start: db.segment_start,
                    format_minor: db.format_minor,
                    generation: 0,
                },
            )?);
        }
        Ok(db)
    }
}
654
655impl Database<VecStore> {
656 pub fn open_in_memory() -> Result<Self, DbError> {
658 Self::open_in_memory_with_options(crate::config::OpenOptions::default())
659 }
660
661 pub fn open_in_memory_with_options(opts: crate::config::OpenOptions) -> Result<Self, DbError> {
663 Self::open_with_store(PathBuf::from(":memory:"), VecStore::new(), opts)
664 }
665
666 pub fn from_snapshot_bytes(bytes: Vec<u8>) -> Result<Self, DbError> {
668 Self::open_with_store(
669 PathBuf::from(":memory:"),
670 VecStore::from_vec(bytes),
671 crate::config::OpenOptions::default(),
672 )
673 }
674
675 pub fn into_snapshot_bytes(self) -> Vec<u8> {
677 self.store.into_inner()
678 }
679
680 pub fn snapshot_bytes(&self) -> Vec<u8> {
682 self.store.as_slice().to_vec()
683 }
684
685 pub fn export_snapshot_to_path(&self, dest_path: impl AsRef<Path>) -> Result<(), DbError> {
687 Self::export_snapshot_to_path_with_fsops(&StdFsOps, dest_path, &self.snapshot_bytes())
688 }
689
690 pub(crate) fn export_snapshot_to_path_with_fsops(
691 fs: &dyn fs_ops::FsOps,
692 dest_path: impl AsRef<Path>,
693 bytes: &[u8],
694 ) -> Result<(), DbError> {
695 fs.write(dest_path.as_ref(), bytes).map_err(DbError::Io)?;
696 Ok(())
697 }
698
699 pub fn open_snapshot_path(path: impl AsRef<Path>) -> Result<Self, DbError> {
701 let bytes = StdFsOps.read(path.as_ref()).map_err(DbError::Io)?;
702 Self::from_snapshot_bytes(bytes)
703 }
704}
705
// Unit tests for `scalar_at_path`. The test source lives outside `src/` and is textually
// included, so it is compiled as a child module of this file.
#[cfg(test)]
mod scalar_at_path_tests {
    include!(concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/tests/unit/src_db_mod_scalar_at_path_tests.rs"
    ));
}
713
// General unit tests for this module. The test source lives outside `src/` and is
// textually included, so it is compiled as a child module of this file.
#[cfg(test)]
mod tests {
    include!(concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/tests/unit/src_db_mod_tests.rs"
    ));
}