1use std::{
27 collections::BTreeMap,
28 io,
29 path::{Path, PathBuf},
30 sync::atomic::{AtomicU64, Ordering},
31};
32
33use bincode::{config::standard, decode_from_slice, encode_to_vec};
34
35use crate::infinitedb_core::{
36 address::{Address, DimensionVector, RevisionId, SpaceId},
37 block::{Block, BlockId, Record},
38 branch::{Branch, BranchId, BranchRegistry},
39 snapshot::{Snapshot, SnapshotId},
40 space::{SpaceConfig, SpaceRegistry},
41};
42use crate::infinitedb_index::{
43 composite::{CompositeKey, Dimension, KeyConfig},
44};
45use crate::infinitedb_storage::{
46 nvme::{compute_checksum, BlockStore},
47 wal::{WalEntry, WalWriter},
48};
49
50pub struct InfiniteDb {
57 store: BlockStore,
58 wal: WalWriter,
59 spaces: SpaceRegistry,
60 branches: BranchRegistry,
61 buffer: Vec<Record>,
63 revision: AtomicU64,
65 next_block_id: AtomicU64,
67 next_snapshot_id: AtomicU64,
69 next_branch_id: AtomicU64,
71 snapshots: BTreeMap<u64, Snapshot>,
73 flush_threshold: usize,
75}
76
77impl InfiniteDb {
78 pub fn open<P: AsRef<Path>>(dir: P) -> io::Result<Self> {
80 let root = dir.as_ref().to_path_buf();
81 let store = BlockStore::open(root.clone())?;
82 let wal_path = store.wal_path();
83
84 let recovered = recover_wal(&wal_path)?;
86
87 let wal = WalWriter::open(wal_path)?;
88
89 let (spaces, branches, snapshots, next_rev, next_block, next_snap) =
91 load_meta(&store).unwrap_or_else(default_meta);
92
93 let mut db = Self {
94 store,
95 wal,
96 spaces,
97 branches,
98 buffer: Vec::new(),
99 revision: AtomicU64::new(next_rev),
100 next_block_id: AtomicU64::new(next_block),
101 next_snapshot_id: AtomicU64::new(next_snap),
102 next_branch_id: AtomicU64::new(2), snapshots,
104 flush_threshold: 256,
105 };
106
107 for entry in recovered {
109 db.apply_wal_entry(entry)?;
110 }
111
112 if db.branches.get_by_name("main").is_none() {
114 let snap_id = db.alloc_snapshot_id();
115 let _ = db.branches.insert(Branch {
117 id: BranchId(1),
118 name: "main".to_string(),
119 head: snap_id,
120 parent: None,
121 forked_at: RevisionId::ZERO,
122 });
123 }
124
125 Ok(db)
126 }
127
128 pub fn register_space(&mut self, config: SpaceConfig) -> Result<(), String> {
134 self.spaces.register(config).map_err(|e| format!("{:?}", e))?;
135 self.persist_meta().map_err(|e| e.to_string())?;
136 Ok(())
137 }
138
139 pub fn insert(
146 &mut self,
147 space: SpaceId,
148 point: DimensionVector,
149 data: Vec<u8>,
150 ) -> io::Result<RevisionId> {
151 let rev = self.next_revision();
152 let address = Address::new(space, point);
153 let entry = WalEntry::Write {
154 address: address.clone(),
155 revision: rev,
156 data: data.clone(),
157 };
158 self.wal.append(&entry)?;
159 self.buffer.push(Record {
160 address,
161 revision: rev,
162 data,
163 tombstone: false,
164 });
165 if self.buffer.len() >= self.flush_threshold {
166 self.flush(space)?;
167 }
168 Ok(rev)
169 }
170
171 pub fn delete(&mut self, space: SpaceId, point: DimensionVector) -> io::Result<RevisionId> {
173 let rev = self.next_revision();
174 let address = Address::new(space, point);
175 let entry = WalEntry::Tombstone {
176 address: address.clone(),
177 revision: rev,
178 };
179 self.wal.append(&entry)?;
180 self.buffer.push(Record {
181 address,
182 revision: rev,
183 data: vec![],
184 tombstone: true,
185 });
186 Ok(rev)
187 }
188
189 pub fn flush(&mut self, space: SpaceId) -> io::Result<()> {
191 if self.buffer.is_empty() {
192 return Ok(());
193 }
194
195 let mut remaining = Vec::new();
197 let mut records: Vec<Record> = Vec::new();
198 for record in self.buffer.drain(..) {
199 if record.address.space == space {
200 records.push(record);
201 } else {
202 remaining.push(record);
203 }
204 }
205 self.buffer = remaining;
206
207 if records.is_empty() {
208 return Ok(());
209 }
210
211 records.sort_by_key(|r| {
213 let key = hilbert_key_for(&r.address.point);
214 (key, r.revision.0)
215 });
216
217 let min_rev = records.iter().map(|r| r.revision).min().unwrap_or(RevisionId::ZERO);
218 let max_rev = records.iter().map(|r| r.revision).max().unwrap_or(RevisionId::ZERO);
219 let block_id = self.alloc_block_id();
220
221 let mut block = Block {
222 id: block_id,
223 space,
224 records,
225 min_revision: min_rev,
226 max_revision: max_rev,
227 checksum: [0u8; 32],
228 };
229 block.checksum = compute_checksum(&block)?;
230
231 self.store.write_block(&block)?;
233
234 let snap_id = self.alloc_snapshot_id();
236 self.wal.append(&WalEntry::BlockSealed {
237 block_id,
238 space,
239 snapshot: snap_id,
240 })?;
241
242 let snapshot = self.snapshots.entry(space.0).or_insert_with(|| {
244 Snapshot::root(snap_id, space)
245 });
246 let hilbert_min = block
247 .records
248 .first()
249 .map(|r| hilbert_key_for(&r.address.point))
250 .unwrap_or(0);
251 snapshot.blocks.insert(hilbert_min, block_id);
252 snapshot.revision = max_rev;
253
254 self.persist_meta()?;
255 Ok(())
256 }
257
258 pub fn current_snapshot(&self, space: SpaceId) -> Option<SnapshotId> {
264 self.snapshots.get(&space.0).map(|s| s.id)
265 }
266
267 pub fn query(
270 &mut self,
271 space: SpaceId,
272 as_of: Option<RevisionId>,
273 ) -> io::Result<Vec<Record>> {
274 self.query_inner(space, None, as_of)
275 }
276
277 pub fn query_bbox(
290 &mut self,
291 space: SpaceId,
292 min: DimensionVector,
293 max: DimensionVector,
294 as_of: Option<RevisionId>,
295 ) -> io::Result<Vec<Record>> {
296 assert_eq!(min.dims(), max.dims(), "min and max must have equal dimensions");
297 let k_min = hilbert_key_for(&min);
300 let k_max = hilbert_key_for(&max);
301 let (lo, hi) = if k_min <= k_max { (k_min, k_max) } else { (k_max, k_min) };
302 let mut results = self.query_inner(space, Some((lo, hi)), as_of)?;
303 results.retain(|r| r.address.point.within(&min, &max));
305 Ok(results)
306 }
307
308 pub fn query_subscope(
318 &mut self,
319 space: SpaceId,
320 parent_coords: &[u32],
321 as_of: Option<RevisionId>,
322 ) -> io::Result<Vec<Record>> {
323 let dims = self.spaces.get(space)
325 .map(|c| c.dims)
326 .unwrap_or(parent_coords.len() + 1);
327 assert!(
328 parent_coords.len() <= dims,
329 "parent_coords has more dimensions than the space"
330 );
331 let inner_dims = dims - parent_coords.len();
332 let mut min_coords: Vec<u32> = parent_coords.to_vec();
333 let mut max_coords: Vec<u32> = parent_coords.to_vec();
334 min_coords.extend(std::iter::repeat(0).take(inner_dims));
335 max_coords.extend(std::iter::repeat(u32::MAX).take(inner_dims));
336 self.query_bbox(
337 space,
338 DimensionVector::new(min_coords),
339 DimensionVector::new(max_coords),
340 as_of,
341 )
342 }
343
344 fn query_inner(
346 &mut self,
347 space: SpaceId,
348 key_range: Option<(u128, u128)>,
349 as_of: Option<RevisionId>,
350 ) -> io::Result<Vec<Record>> {
351 let rev_ceiling = as_of.unwrap_or_else(|| {
352 RevisionId(self.revision.load(Ordering::Relaxed))
353 });
354
355 let mut results: Vec<Record> = Vec::new();
356
357 if let Some(snapshot) = self.snapshots.get(&space.0) {
359 let block_ids: Vec<BlockId> = match key_range {
360 None => snapshot.blocks.values().copied().collect(),
361 Some((_, hi)) => {
362 snapshot.blocks.range(..=hi).map(|(_, id)| *id).collect()
366 }
367 };
368 for block_id in block_ids {
369 let block = self.store.read_block(block_id)?;
370 for record in block.records {
371 if record.revision <= rev_ceiling && !record.tombstone {
372 results.push(record);
373 }
374 }
375 }
376 }
377
378 let tombstoned: std::collections::HashSet<_> = self
381 .buffer
382 .iter()
383 .filter(|r| r.address.space == space && r.tombstone && r.revision <= rev_ceiling)
384 .map(|r| r.address.point.coords.clone())
385 .collect();
386
387 results.retain(|r| !tombstoned.contains(&r.address.point.coords));
388
389 for record in &self.buffer {
390 if record.address.space == space
391 && record.revision <= rev_ceiling
392 && !record.tombstone
393 && !tombstoned.contains(&record.address.point.coords)
394 {
395 if let Some((lo, hi)) = key_range {
396 let k = hilbert_key_for(&record.address.point);
397 if k < lo || k > hi {
398 continue;
399 }
400 }
401 results.push(record.clone());
402 }
403 }
404
405 Ok(results)
406 }
407
408 pub fn create_branch(
414 &mut self,
415 name: impl Into<String>,
416 from: BranchId,
417 ) -> Result<BranchId, String> {
418 let parent = self.branches.get(from).ok_or("Branch not found")?;
419 let new_id = BranchId(self.next_branch_id.fetch_add(1, Ordering::Relaxed));
420 let rev = RevisionId(self.revision.load(Ordering::Relaxed));
421 let branch = Branch {
422 id: new_id,
423 name: name.into(),
424 head: parent.head,
425 parent: Some(from),
426 forked_at: rev,
427 };
428 self.branches.insert(branch).map_err(|e| format!("{:?}", e))?;
429 Ok(new_id)
430 }
431
432 pub fn memory_stats(&self) -> MemoryStats {
438 let buffer_records = self.buffer.len();
439 let buffer_bytes: usize = self.buffer.iter()
440 .map(|r| 48 + r.data.len())
441 .sum();
442 let (cache_bytes, cache_blocks) = self.store.cache_stats();
443 let snapshot_entries: usize = self.snapshots.values()
444 .map(|s| s.blocks.len())
445 .sum();
446 MemoryStats {
447 buffer_records,
448 buffer_bytes,
449 cache_bytes,
450 cache_blocks,
451 snapshot_index_entries: snapshot_entries,
452 total_revision: self.revision.load(Ordering::Relaxed),
453 sealed_blocks: self.next_block_id.load(Ordering::Relaxed),
454 }
455 }
456
457 fn next_revision(&self) -> RevisionId {
462 RevisionId(self.revision.fetch_add(1, Ordering::Relaxed) + 1)
463 }
464
465 fn alloc_block_id(&self) -> BlockId {
466 BlockId(self.next_block_id.fetch_add(1, Ordering::Relaxed))
467 }
468
469 fn alloc_snapshot_id(&self) -> SnapshotId {
470 SnapshotId(self.next_snapshot_id.fetch_add(1, Ordering::Relaxed))
471 }
472
473 fn apply_wal_entry(&mut self, entry: WalEntry) -> io::Result<()> {
474 match entry {
475 WalEntry::Write { address, revision, data } => {
476 self.buffer.push(Record { address, revision, data, tombstone: false });
477 }
478 WalEntry::Tombstone { address, revision } => {
479 self.buffer.push(Record { address, revision, data: vec![], tombstone: true });
480 }
481 WalEntry::BlockSealed { .. } | WalEntry::Checkpoint { .. } => {}
482 }
483 Ok(())
484 }
485
486 fn persist_meta(&mut self) -> io::Result<()> {
487 let spaces_bytes = encode_to_vec(&self.spaces, standard())
489 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
490 self.store.write_meta("spaces.bin", &spaces_bytes)?;
491
492 let counters: [u64; 3] = [
494 self.revision.load(Ordering::Relaxed),
495 self.next_block_id.load(Ordering::Relaxed),
496 self.next_snapshot_id.load(Ordering::Relaxed),
497 ];
498 let counters_bytes = encode_to_vec(&counters, standard())
499 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
500 self.store.write_meta("counters.bin", &counters_bytes)?;
501
502 Ok(())
503 }
504}
505
506fn recover_wal(wal_path: &PathBuf) -> io::Result<Vec<WalEntry>> {
511 if !wal_path.exists() {
512 return Ok(vec![]);
513 }
514 let mut reader = crate::infinitedb_storage::wal::WalReader::open(wal_path.clone())?;
515 reader.entries()
516}
517
/// Snapshot of memory-usage figures reported by [`InfiniteDb::memory_stats`].
///
/// Byte counts are estimates, not measurements.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct MemoryStats {
    /// Number of records in the write buffer.
    pub buffer_records: usize,
    /// Estimated bytes held by buffered records.
    pub buffer_bytes: usize,
    /// Bytes reported by the block store's cache.
    pub cache_bytes: usize,
    /// Number of blocks reported by the block store's cache.
    pub cache_blocks: usize,
    /// Total block entries across all per-space snapshot indexes.
    pub snapshot_index_entries: usize,
    /// Latest revision number issued.
    pub total_revision: u64,
    /// Sealed-block count as reported by `InfiniteDb::memory_stats`.
    pub sealed_blocks: u64,
}
540
541impl MemoryStats {
542 pub fn total_ram_bytes(&self) -> usize {
544 self.buffer_bytes
545 + self.cache_bytes
546 + self.snapshot_index_entries * 24
548 + 4096
550 }
551
552 pub fn print(&self) {
554 println!("\n╔═══ InfiniteDb Memory Stats ═══╗");
555 println!("║ Write buffer {:>6} records ({} bytes)",
556 self.buffer_records, fmt_bytes(self.buffer_bytes));
557 println!("║ LRU block cache {:>6} blocks ({} bytes / 10 MB limit)",
558 self.cache_blocks, fmt_bytes(self.cache_bytes));
559 println!("║ Snapshot index {:>6} entries", self.snapshot_index_entries);
560 println!("║ Total revisions {:>6}", self.total_revision);
561 println!("║ Sealed blocks {:>6}", self.sealed_blocks);
562 println!("║ ──────────────────────────────────────────────");
563 println!("║ Est. total RAM {}", fmt_bytes(self.total_ram_bytes()));
564 println!("╚════════════════════════════════");
565 }
566}
567
/// Formats a byte count for humans: whole bytes below 1 KiB, one decimal
/// of KB below 1 MiB, otherwise two decimals of MB (1 KB = 1024 B).
fn fmt_bytes(b: usize) -> String {
    const KIB: usize = 1024;
    const MIB: usize = KIB * KIB;
    match b {
        n if n < KIB => format!("{} B", n),
        n if n < MIB => format!("{:.1} KB", n as f64 / KIB as f64),
        n => format!("{:.2} MB", n as f64 / MIB as f64),
    }
}
573
574#[allow(clippy::type_complexity)]
581fn load_meta(
582 store: &BlockStore,
583) -> Option<(SpaceRegistry, BranchRegistry, BTreeMap<u64, Snapshot>, u64, u64, u64)> {
584 let counters_bytes = store.read_meta("counters.bin").ok()?;
585 let (counters, _): ([u64; 3], _) = decode_from_slice(&counters_bytes, standard()).ok()?;
586 let spaces_bytes = store.read_meta("spaces.bin").ok()?;
587 let (spaces, _): (SpaceRegistry, _) = decode_from_slice(&spaces_bytes, standard()).ok()?;
588 Some((
589 spaces,
590 BranchRegistry::new(),
591 BTreeMap::new(),
592 counters[0],
593 counters[1],
594 counters[2],
595 ))
596}
597
598type MetaTuple = (SpaceRegistry, BranchRegistry, BTreeMap<u64, Snapshot>, u64, u64, u64);
600
601fn default_meta() -> MetaTuple {
602 (SpaceRegistry::new(), BranchRegistry::new(), BTreeMap::new(), 0, 1, 1)
603}
604
605fn hilbert_key_for(point: &DimensionVector) -> u128 {
608 if point.coords.is_empty() {
609 return 0;
610 }
611 let mut key = CompositeKey::new(KeyConfig::STANDARD);
612 for &c in &point.coords {
613 key = key.push(Dimension::new("_", c));
614 }
615 key.encode()
616}
617
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;
    use crate::infinitedb_core::address::{DimensionVector, SpaceId};
    use crate::infinitedb_core::branch::BranchId;

    /// Opens a database in a fresh temporary directory. The returned `TempDir`
    /// must stay alive for the whole test, or the directory is removed.
    fn open_tmp() -> (InfiniteDb, TempDir) {
        let dir = TempDir::new().unwrap();
        let db = InfiniteDb::open(dir.path()).unwrap();
        (db, dir)
    }

    #[test]
    fn insert_and_query_unflushed() {
        let (mut db, _dir) = open_tmp();
        let space = SpaceId(1);
        db.insert(space, DimensionVector::new(vec![10, 20]), vec![1, 2, 3]).unwrap();
        let results = db.query(space, None).unwrap();
        assert_eq!(results.len(), 1);
    }

    #[test]
    fn insert_flush_query() {
        let (mut db, _dir) = open_tmp();
        let space = SpaceId(1);
        db.insert(space, DimensionVector::new(vec![5, 5]), vec![42]).unwrap();
        db.flush(space).unwrap();
        let results = db.query(space, None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].data, vec![42]);
    }

    #[test]
    fn delete_tombstones_record() {
        let (mut db, _dir) = open_tmp();
        let space = SpaceId(1);
        let point = DimensionVector::new(vec![1, 1]);
        db.insert(space, point.clone(), vec![99]).unwrap();
        db.delete(space, point).unwrap();
        let results = db.query(space, None).unwrap();
        // The deleted point must not be returned at all.
        assert!(results.is_empty(), "deleted point was returned: {} records", results.len());
    }

    #[test]
    fn delete_hides_sealed_record() {
        let (mut db, _dir) = open_tmp();
        let space = SpaceId(1);
        let point = DimensionVector::new(vec![3, 4]);
        db.insert(space, point.clone(), vec![7]).unwrap();
        db.flush(space).unwrap();
        db.delete(space, point).unwrap();
        assert!(db.query(space, None).unwrap().is_empty());
    }

    #[test]
    fn flush_with_nothing_buffered_is_noop() {
        let (mut db, _dir) = open_tmp();
        let space = SpaceId(1);
        db.flush(space).unwrap();
        assert!(db.query(space, None).unwrap().is_empty());
        assert!(db.current_snapshot(space).is_none());
    }

    #[test]
    fn query_is_scoped_to_space() {
        let (mut db, _dir) = open_tmp();
        db.insert(SpaceId(1), DimensionVector::new(vec![1, 2]), vec![1]).unwrap();
        assert!(db.query(SpaceId(2), None).unwrap().is_empty());
    }

    #[test]
    fn as_of_zero_sees_nothing() {
        let (mut db, _dir) = open_tmp();
        let space = SpaceId(1);
        db.insert(space, DimensionVector::new(vec![1, 0]), vec![1]).unwrap();
        assert!(db.query(space, Some(RevisionId::ZERO)).unwrap().is_empty());
    }

    #[test]
    fn as_of_returns_historical_state() {
        let (mut db, _dir) = open_tmp();
        let space = SpaceId(1);
        let rev1 = db.insert(space, DimensionVector::new(vec![1, 0]), vec![1]).unwrap();
        let _rev2 = db.insert(space, DimensionVector::new(vec![2, 0]), vec![2]).unwrap();
        let results = db.query(space, Some(rev1)).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].data, vec![1]);
    }

    #[test]
    fn create_branch_succeeds() {
        let (mut db, _dir) = open_tmp();
        let main = BranchId(1);
        let feature = db.create_branch("feature", main).unwrap();
        assert_ne!(feature, main);
    }
}