Skip to main content

infinite_db/
db.rs

1//! `InfiniteDb` — the top-level embedded database handle.
2//!
3//! This is the single entry point for embedded use. It owns:
4//!   - A `BlockStore` (NVMe-aware file storage)
5//!   - A `WalWriter` (crash-safe append log)
6//!   - A `SpaceRegistry` (named dimension spaces)
7//!   - A `BranchRegistry` (named branch heads)
8//!   - A monotonic revision counter
9//!
10//! All writes go through the WAL before touching any block. On `open()`,
11//! the WAL is replayed to recover any in-flight writes from a prior crash.
12//!
13//! # Example
14//! ```no_run
15//! use infinitedb::InfiniteDb;
16//! use infinitedb::infinitedb_core::address::{DimensionVector, SpaceId};
17//! use infinitedb::infinitedb_core::space::{SpaceConfig, SpaceRegistry};
18//!
19//! let mut db = InfiniteDb::open("./mydb").unwrap();
20//! let space = SpaceId(1);
21//! let point = DimensionVector::new(vec![128, 64]);
22//! let data  = bincode::encode_to_vec(&42u32, bincode::config::standard()).unwrap();
23//! db.insert(space, point, data).unwrap();
24//! ```
25
26use std::{
27    collections::BTreeMap,
28    io,
29    path::{Path, PathBuf},
30    sync::atomic::{AtomicU64, Ordering},
31};
32
33use bincode::{config::standard, decode_from_slice, encode_to_vec};
34
35use crate::infinitedb_core::{
36    address::{Address, DimensionVector, RevisionId, SpaceId},
37    block::{Block, BlockId, Record},
38    branch::{Branch, BranchId, BranchRegistry},
39    snapshot::{Snapshot, SnapshotId},
40    space::{SpaceConfig, SpaceRegistry},
41};
42use crate::infinitedb_index::{
43    composite::{CompositeKey, Dimension, KeyConfig},
44};
45use crate::infinitedb_storage::{
46    nvme::{compute_checksum, BlockStore},
47    wal::{WalEntry, WalWriter},
48};
49
50// ---------------------------------------------------------------------------
51// InfiniteDb
52// ---------------------------------------------------------------------------
53
/// The embedded database handle.
///
/// Every read and write method on this type takes `&mut self`, so a handle is
/// driven by one caller at a time: create one per thread or wrap it in a
/// `Mutex` for multi-threaded access.
// NOTE(review): the previous comment stated the type is not `Send`/`Sync`;
// that depends on `BlockStore` and `WalWriter`, which are not visible here —
// confirm before relying on it.
pub struct InfiniteDb {
    /// Block storage backend; also serves the metadata files and the WAL path.
    store: BlockStore,
    /// Append-only write-ahead log; every insert/delete is appended here
    /// before the record is buffered.
    wal: WalWriter,
    /// Registered spaces.
    spaces: SpaceRegistry,
    /// Named branches; `open()` guarantees a `main` branch exists.
    branches: BranchRegistry,
    /// In-memory write buffer: accumulated records not yet sealed into a block.
    buffer: Vec<Record>,
    /// Monotonic revision counter holding the most recently issued revision.
    /// Written to `counters.bin` whenever metadata is persisted.
    revision: AtomicU64,
    /// Next block ID to assign.
    next_block_id: AtomicU64,
    /// Next snapshot ID to assign.
    next_snapshot_id: AtomicU64,
    /// Next branch ID to assign.
    next_branch_id: AtomicU64,
    /// Active snapshot per space (space_id → snapshot).
    snapshots: BTreeMap<u64, Snapshot>,
    /// Flush threshold: seal a block after this many buffered records.
    flush_threshold: usize,
}
76
77impl InfiniteDb {
78    /// Open (or create) a database in `dir`. Replays the WAL on first open.
79    pub fn open<P: AsRef<Path>>(dir: P) -> io::Result<Self> {
80        let root = dir.as_ref().to_path_buf();
81        let store = BlockStore::open(root.clone())?;
82        let wal_path = store.wal_path();
83
84        // Replay WAL to recover in-flight writes.
85        let recovered = recover_wal(&wal_path)?;
86
87        let wal = WalWriter::open(wal_path)?;
88
89        // Load persisted metadata (spaces, branches, snapshots) if present.
90        let (spaces, branches, snapshots, next_rev, next_block, next_snap) =
91            load_meta(&store).unwrap_or_else(default_meta);
92
93        let mut db = Self {
94            store,
95            wal,
96            spaces,
97            branches,
98            buffer: Vec::new(),
99            revision: AtomicU64::new(next_rev),
100            next_block_id: AtomicU64::new(next_block),
101            next_snapshot_id: AtomicU64::new(next_snap),
102            next_branch_id: AtomicU64::new(2), // 1 is reserved for main
103            snapshots,
104            flush_threshold: 256,
105        };
106
107        // Re-apply recovered WAL entries.
108        for entry in recovered {
109            db.apply_wal_entry(entry)?;
110        }
111
112        // Ensure a `main` branch exists.
113        if db.branches.get_by_name("main").is_none() {
114            let snap_id = db.alloc_snapshot_id();
115            // No spaces yet — main branch starts with no snapshot content.
116            let _ = db.branches.insert(Branch {
117                id: BranchId(1),
118                name: "main".to_string(),
119                head: snap_id,
120                parent: None,
121                forked_at: RevisionId::ZERO,
122            });
123        }
124
125        Ok(db)
126    }
127
128    // -----------------------------------------------------------------------
129    // Space management
130    // -----------------------------------------------------------------------
131
132    /// Register a new space. Must be called before inserting records into it.
133    pub fn register_space(&mut self, config: SpaceConfig) -> Result<(), String> {
134        self.spaces.register(config).map_err(|e| format!("{:?}", e))?;
135        self.persist_meta().map_err(|e| e.to_string())?;
136        Ok(())
137    }
138
139    // -----------------------------------------------------------------------
140    // Writes
141    // -----------------------------------------------------------------------
142
143    /// Insert or update a record. Appends a new revision to the WAL.
144    /// The record is buffered in memory; call `flush()` to seal it into a block.
145    pub fn insert(
146        &mut self,
147        space: SpaceId,
148        point: DimensionVector,
149        data: Vec<u8>,
150    ) -> io::Result<RevisionId> {
151        let rev = self.next_revision();
152        let address = Address::new(space, point);
153        let entry = WalEntry::Write {
154            address: address.clone(),
155            revision: rev,
156            data: data.clone(),
157        };
158        self.wal.append(&entry)?;
159        self.buffer.push(Record {
160            address,
161            revision: rev,
162            data,
163            tombstone: false,
164        });
165        if self.buffer.len() >= self.flush_threshold {
166            self.flush(space)?;
167        }
168        Ok(rev)
169    }
170
171    /// Logically delete a record at `point` in `space`.
172    pub fn delete(&mut self, space: SpaceId, point: DimensionVector) -> io::Result<RevisionId> {
173        let rev = self.next_revision();
174        let address = Address::new(space, point);
175        let entry = WalEntry::Tombstone {
176            address: address.clone(),
177            revision: rev,
178        };
179        self.wal.append(&entry)?;
180        self.buffer.push(Record {
181            address,
182            revision: rev,
183            data: vec![],
184            tombstone: true,
185        });
186        Ok(rev)
187    }
188
189    /// Seal all buffered records for `space` into a new block on disk.
190    pub fn flush(&mut self, space: SpaceId) -> io::Result<()> {
191        if self.buffer.is_empty() {
192            return Ok(());
193        }
194
195        // Partition: take only records for this space; keep the rest in the buffer.
196        let mut remaining = Vec::new();
197        let mut records: Vec<Record> = Vec::new();
198        for record in self.buffer.drain(..) {
199            if record.address.space == space {
200                records.push(record);
201            } else {
202                remaining.push(record);
203            }
204        }
205        self.buffer = remaining;
206
207        if records.is_empty() {
208            return Ok(());
209        }
210
211        // Sort by Hilbert key then revision.
212        records.sort_by_key(|r| {
213            let key = hilbert_key_for(&r.address.point);
214            (key, r.revision.0)
215        });
216
217        let min_rev = records.iter().map(|r| r.revision).min().unwrap_or(RevisionId::ZERO);
218        let max_rev = records.iter().map(|r| r.revision).max().unwrap_or(RevisionId::ZERO);
219        let block_id = self.alloc_block_id();
220
221        let mut block = Block {
222            id: block_id,
223            space,
224            records,
225            min_revision: min_rev,
226            max_revision: max_rev,
227            checksum: [0u8; 32],
228        };
229        block.checksum = compute_checksum(&block)?;
230
231        // Write block to NVMe store.
232        self.store.write_block(&block)?;
233
234        // Record block seal in WAL.
235        let snap_id = self.alloc_snapshot_id();
236        self.wal.append(&WalEntry::BlockSealed {
237            block_id,
238            space,
239            snapshot: snap_id,
240        })?;
241
242        // Advance the space's active snapshot.
243        let snapshot = self.snapshots.entry(space.0).or_insert_with(|| {
244            Snapshot::root(snap_id, space)
245        });
246        let hilbert_min = block
247            .records
248            .first()
249            .map(|r| hilbert_key_for(&r.address.point))
250            .unwrap_or(0);
251        snapshot.blocks.insert(hilbert_min, block_id);
252        snapshot.revision = max_rev;
253
254        self.persist_meta()?;
255        Ok(())
256    }
257
258    // -----------------------------------------------------------------------
259    // Reads
260    // -----------------------------------------------------------------------
261
262    /// Return the current snapshot ID for `space`.
263    pub fn current_snapshot(&self, space: SpaceId) -> Option<SnapshotId> {
264        self.snapshots.get(&space.0).map(|s| s.id)
265    }
266
267    /// Scan all live records in `space`, optionally capped at `as_of`.
268    /// Full-space scan — no spatial filtering. Use `query_bbox` to filter by coordinates.
269    pub fn query(
270        &mut self,
271        space: SpaceId,
272        as_of: Option<RevisionId>,
273    ) -> io::Result<Vec<Record>> {
274        self.query_inner(space, None, as_of)
275    }
276
277    /// Bounding-box query in N dimensions.
278    ///
279    /// Returns every live record in `space` whose point satisfies
280    /// `min[i] <= point[i] <= max[i]` on **every** axis simultaneously.
281    ///
282    /// Works correctly for any dimensionality (1–16 dims). A Hilbert-key
283    /// interval is used to prune candidate blocks at the block level; an
284    /// exact `within()` check per record eliminates any false positives caused
285    /// by the Hilbert curve mapping a bounding box to many disjoint intervals.
286    ///
287    /// `min` and `max` must have the same number of coordinates as the points
288    /// stored in the space.
289    pub fn query_bbox(
290        &mut self,
291        space: SpaceId,
292        min: DimensionVector,
293        max: DimensionVector,
294        as_of: Option<RevisionId>,
295    ) -> io::Result<Vec<Record>> {
296        assert_eq!(min.dims(), max.dims(), "min and max must have equal dimensions");
297        // Compute Hilbert keys for both bounding corners and use the interval
298        // as a block-level pre-filter (over-approximation — false positives OK).
299        let k_min = hilbert_key_for(&min);
300        let k_max = hilbert_key_for(&max);
301        let (lo, hi) = if k_min <= k_max { (k_min, k_max) } else { (k_max, k_min) };
302        let mut results = self.query_inner(space, Some((lo, hi)), as_of)?;
303        // Exact per-record coordinate filter removes all Hilbert false positives.
304        results.retain(|r| r.address.point.within(&min, &max));
305        Ok(results)
306    }
307
308    /// Sub-space query: pins the first `parent_coords.len()` dimensions to exact
309    /// values and leaves the remaining inner dimensions fully open (0..u32::MAX).
310    ///
311    /// This is the idiomatic way to retrieve all property records for a specific
312    /// parent element in the nested Hilbert design:
313    /// ```ignore
314    /// // All load properties of element 42:
315    /// db.query_subscope(SPACE_LOADS, &[42], None);
316    /// ```
317    pub fn query_subscope(
318        &mut self,
319        space: SpaceId,
320        parent_coords: &[u32],
321        as_of: Option<RevisionId>,
322    ) -> io::Result<Vec<Record>> {
323        // We need to know the total dims for this space to build the full vectors.
324        let dims = self.spaces.get(space)
325            .map(|c| c.dims)
326            .unwrap_or(parent_coords.len() + 1);
327        assert!(
328            parent_coords.len() <= dims,
329            "parent_coords has more dimensions than the space"
330        );
331        let inner_dims = dims - parent_coords.len();
332        let mut min_coords: Vec<u32> = parent_coords.to_vec();
333        let mut max_coords: Vec<u32> = parent_coords.to_vec();
334        min_coords.extend(std::iter::repeat(0).take(inner_dims));
335        max_coords.extend(std::iter::repeat(u32::MAX).take(inner_dims));
336        self.query_bbox(
337            space,
338            DimensionVector::new(min_coords),
339            DimensionVector::new(max_coords),
340            as_of,
341        )
342    }
343
344    // Shared core: reads from sealed blocks + the in-memory write buffer.
345    fn query_inner(
346        &mut self,
347        space: SpaceId,
348        key_range: Option<(u128, u128)>,
349        as_of: Option<RevisionId>,
350    ) -> io::Result<Vec<Record>> {
351        let rev_ceiling = as_of.unwrap_or_else(|| {
352            RevisionId(self.revision.load(Ordering::Relaxed))
353        });
354
355        let mut results: Vec<Record> = Vec::new();
356
357        // Query sealed blocks if a snapshot exists.
358        if let Some(snapshot) = self.snapshots.get(&space.0) {
359            let block_ids: Vec<BlockId> = match key_range {
360                None => snapshot.blocks.values().copied().collect(),
361                Some((_, hi)) => {
362                    // The snapshot map is keyed by the Hilbert key of each
363                    // block's first record. Include all blocks with a start key
364                    // <= hi; the within() filter handles exactness.
365                    snapshot.blocks.range(..=hi).map(|(_, id)| *id).collect()
366                }
367            };
368            for block_id in block_ids {
369                let block = self.store.read_block(block_id)?;
370                for record in block.records {
371                    if record.revision <= rev_ceiling && !record.tombstone {
372                        results.push(record);
373                    }
374                }
375            }
376        }
377
378        // Always check the in-memory buffer (records not yet flushed to disk).
379        // Collect tombstoned coordinates first so we can suppress stale sealed records.
380        let tombstoned: std::collections::HashSet<_> = self
381            .buffer
382            .iter()
383            .filter(|r| r.address.space == space && r.tombstone && r.revision <= rev_ceiling)
384            .map(|r| r.address.point.coords.clone())
385            .collect();
386
387        results.retain(|r| !tombstoned.contains(&r.address.point.coords));
388
389        for record in &self.buffer {
390            if record.address.space == space
391                && record.revision <= rev_ceiling
392                && !record.tombstone
393                && !tombstoned.contains(&record.address.point.coords)
394            {
395                if let Some((lo, hi)) = key_range {
396                    let k = hilbert_key_for(&record.address.point);
397                    if k < lo || k > hi {
398                        continue;
399                    }
400                }
401                results.push(record.clone());
402            }
403        }
404
405        Ok(results)
406    }
407
408    // -----------------------------------------------------------------------
409    // Branch management
410    // -----------------------------------------------------------------------
411
412    /// Create a new branch forked from an existing one at the current revision.
413    pub fn create_branch(
414        &mut self,
415        name: impl Into<String>,
416        from: BranchId,
417    ) -> Result<BranchId, String> {
418        let parent = self.branches.get(from).ok_or("Branch not found")?;
419        let new_id = BranchId(self.next_branch_id.fetch_add(1, Ordering::Relaxed));
420        let rev = RevisionId(self.revision.load(Ordering::Relaxed));
421        let branch = Branch {
422            id: new_id,
423            name: name.into(),
424            head: parent.head,
425            parent: Some(from),
426            forked_at: rev,
427        };
428        self.branches.insert(branch).map_err(|e| format!("{:?}", e))?;
429        Ok(new_id)
430    }
431
432    // -----------------------------------------------------------------------
433    // Diagnostics
434    // -----------------------------------------------------------------------
435
436    /// Returns a snapshot of current memory and cache usage.
437    pub fn memory_stats(&self) -> MemoryStats {
438        let buffer_records = self.buffer.len();
439        let buffer_bytes: usize = self.buffer.iter()
440            .map(|r| 48 + r.data.len())
441            .sum();
442        let (cache_bytes, cache_blocks) = self.store.cache_stats();
443        let snapshot_entries: usize = self.snapshots.values()
444            .map(|s| s.blocks.len())
445            .sum();
446        MemoryStats {
447            buffer_records,
448            buffer_bytes,
449            cache_bytes,
450            cache_blocks,
451            snapshot_index_entries: snapshot_entries,
452            total_revision: self.revision.load(Ordering::Relaxed),
453            sealed_blocks: self.next_block_id.load(Ordering::Relaxed),
454        }
455    }
456
457    // -----------------------------------------------------------------------
458    // Internal helpers
459    // -----------------------------------------------------------------------
460
461    fn next_revision(&self) -> RevisionId {
462        RevisionId(self.revision.fetch_add(1, Ordering::Relaxed) + 1)
463    }
464
465    fn alloc_block_id(&self) -> BlockId {
466        BlockId(self.next_block_id.fetch_add(1, Ordering::Relaxed))
467    }
468
469    fn alloc_snapshot_id(&self) -> SnapshotId {
470        SnapshotId(self.next_snapshot_id.fetch_add(1, Ordering::Relaxed))
471    }
472
473    fn apply_wal_entry(&mut self, entry: WalEntry) -> io::Result<()> {
474        match entry {
475            WalEntry::Write { address, revision, data } => {
476                self.buffer.push(Record { address, revision, data, tombstone: false });
477            }
478            WalEntry::Tombstone { address, revision } => {
479                self.buffer.push(Record { address, revision, data: vec![], tombstone: true });
480            }
481            WalEntry::BlockSealed { .. } | WalEntry::Checkpoint { .. } => {}
482        }
483        Ok(())
484    }
485
486    fn persist_meta(&mut self) -> io::Result<()> {
487        // Persist spaces registry.
488        let spaces_bytes = encode_to_vec(&self.spaces, standard())
489            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
490        self.store.write_meta("spaces.bin", &spaces_bytes)?;
491
492        // Persist revision counters as a simple [u64; 3].
493        let counters: [u64; 3] = [
494            self.revision.load(Ordering::Relaxed),
495            self.next_block_id.load(Ordering::Relaxed),
496            self.next_snapshot_id.load(Ordering::Relaxed),
497        ];
498        let counters_bytes = encode_to_vec(&counters, standard())
499            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
500        self.store.write_meta("counters.bin", &counters_bytes)?;
501
502        Ok(())
503    }
504}
505
506// ---------------------------------------------------------------------------
507// WAL recovery
508// ---------------------------------------------------------------------------
509
510fn recover_wal(wal_path: &PathBuf) -> io::Result<Vec<WalEntry>> {
511    if !wal_path.exists() {
512        return Ok(vec![]);
513    }
514    let mut reader = crate::infinitedb_storage::wal::WalReader::open(wal_path.clone())?;
515    reader.entries()
516}
517
518// ---------------------------------------------------------------------------
519// MemoryStats
520// ---------------------------------------------------------------------------
521
522/// Point-in-time snapshot of database memory and cache usage.
523#[derive(Debug, Clone)]
524pub struct MemoryStats {
525    /// Records currently in the in-memory write buffer (not yet flushed).
526    pub buffer_records: usize,
527    /// Approximate bytes occupied by the write buffer.
528    pub buffer_bytes: usize,
529    /// Bytes currently resident in the LRU block cache.
530    pub cache_bytes: usize,
531    /// Number of blocks held in the LRU block cache.
532    pub cache_blocks: usize,
533    /// Total Hilbert index entries across all active snapshots.
534    pub snapshot_index_entries: usize,
535    /// Highest revision number issued so far.
536    pub total_revision: u64,
537    /// Total sealed blocks written (cumulative, not current on-disk count).
538    pub sealed_blocks: u64,
539}
540
541impl MemoryStats {
542    /// Estimated total process-level RAM attributed to the database.
543    pub fn total_ram_bytes(&self) -> usize {
544        self.buffer_bytes
545            + self.cache_bytes
546            // Snapshot index: each entry is approx (u128 key + u64 id) = 24 bytes.
547            + self.snapshot_index_entries * 24
548            // Fixed overhead: registries, atomics, BTreeMap nodes.
549            + 4096
550    }
551
552    /// Pretty-print the current memory statistics to stdout.
553    pub fn print(&self) {
554        println!("\n╔═══ InfiniteDb Memory Stats ═══╗");
555        println!("║  Write buffer       {:>6} records  ({} bytes)",
556            self.buffer_records, fmt_bytes(self.buffer_bytes));
557        println!("║  LRU block cache    {:>6} blocks   ({} bytes / 10 MB limit)",
558            self.cache_blocks, fmt_bytes(self.cache_bytes));
559        println!("║  Snapshot index     {:>6} entries", self.snapshot_index_entries);
560        println!("║  Total revisions    {:>6}", self.total_revision);
561        println!("║  Sealed blocks      {:>6}", self.sealed_blocks);
562        println!("║  ──────────────────────────────────────────────");
563        println!("║  Est. total RAM     {}", fmt_bytes(self.total_ram_bytes()));
564        println!("╚════════════════════════════════");
565    }
566}
567
/// Render a byte count with a binary unit: plain bytes below 1024, `KB`
/// with one decimal below 1024 * 1024, `MB` with two decimals otherwise.
fn fmt_bytes(b: usize) -> String {
    const KIB: usize = 1024;
    const MIB: usize = 1024 * 1024;

    match b {
        small if small < KIB => format!("{} B", small),
        medium if medium < MIB => format!("{:.1} KB", medium as f64 / 1024.0),
        large => format!("{:.2} MB", large as f64 / (1024.0 * 1024.0)),
    }
}
573
574// ---------------------------------------------------------------------------
575// Metadata persistence helpers
576// ---------------------------------------------------------------------------
577
578/// Load persisted metadata from the block store.
579/// Returns defaults when no metadata exists yet.
580#[allow(clippy::type_complexity)]
581fn load_meta(
582    store: &BlockStore,
583) -> Option<(SpaceRegistry, BranchRegistry, BTreeMap<u64, Snapshot>, u64, u64, u64)> {
584    let counters_bytes = store.read_meta("counters.bin").ok()?;
585    let (counters, _): ([u64; 3], _) = decode_from_slice(&counters_bytes, standard()).ok()?;
586    let spaces_bytes = store.read_meta("spaces.bin").ok()?;
587    let (spaces, _): (SpaceRegistry, _) = decode_from_slice(&spaces_bytes, standard()).ok()?;
588    Some((
589        spaces,
590        BranchRegistry::new(),
591        BTreeMap::new(),
592        counters[0],
593        counters[1],
594        counters[2],
595    ))
596}
597
598/// Default state for a fresh database.
599type MetaTuple = (SpaceRegistry, BranchRegistry, BTreeMap<u64, Snapshot>, u64, u64, u64);
600
601fn default_meta() -> MetaTuple {
602    (SpaceRegistry::new(), BranchRegistry::new(), BTreeMap::new(), 0, 1, 1)
603}
604
605/// Compute the Hilbert key for a point using STANDARD 8-bit precision.
606/// All dimensions in the point are encoded.
607fn hilbert_key_for(point: &DimensionVector) -> u128 {
608    if point.coords.is_empty() {
609        return 0;
610    }
611    let mut key = CompositeKey::new(KeyConfig::STANDARD);
612    for &c in &point.coords {
613        key = key.push(Dimension::new("_", c));
614    }
615    key.encode()
616}
617
618// ---------------------------------------------------------------------------
619// Tests
620// ---------------------------------------------------------------------------
621
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;
    use crate::infinitedb_core::address::{DimensionVector, SpaceId};
    use crate::infinitedb_core::branch::BranchId;

    /// Open a database in a fresh temporary directory. The `TempDir` is
    /// returned so the directory outlives the handle for the whole test.
    fn open_tmp() -> (InfiniteDb, TempDir) {
        let dir = TempDir::new().unwrap();
        let db = InfiniteDb::open(dir.path()).unwrap();
        (db, dir)
    }

    /// A buffered (unflushed) record is visible to queries.
    #[test]
    fn insert_and_query_unflushed() {
        let (mut db, _dir) = open_tmp();
        let space = SpaceId(1);
        db.insert(space, DimensionVector::new(vec![10, 20]), vec![1, 2, 3]).unwrap();
        let results = db.query(space, None).unwrap();
        assert_eq!(results.len(), 1);
    }

    /// A record sealed into a block is read back with its payload intact.
    #[test]
    fn insert_flush_query() {
        let (mut db, _dir) = open_tmp();
        let space = SpaceId(1);
        db.insert(space, DimensionVector::new(vec![5, 5]), vec![42]).unwrap();
        db.flush(space).unwrap();
        let results = db.query(space, None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].data, vec![42]);
    }

    /// Deleting the only record leaves nothing visible.
    #[test]
    fn delete_tombstones_record() {
        let (mut db, _dir) = open_tmp();
        let space = SpaceId(1);
        let point = DimensionVector::new(vec![1, 1]);
        db.insert(space, point.clone(), vec![99]).unwrap();
        db.delete(space, point).unwrap();
        let results = db.query(space, None).unwrap();
        // Tombstone suppresses the record in live queries. Checking only
        // that no returned record is a tombstone would also pass if the
        // delete had no effect, so require the result to be empty.
        assert!(results.is_empty());
    }

    /// An `as_of` ceiling hides revisions issued after it.
    #[test]
    fn as_of_returns_historical_state() {
        let (mut db, _dir) = open_tmp();
        let space = SpaceId(1);
        let rev1 = db.insert(space, DimensionVector::new(vec![1, 0]), vec![1]).unwrap();
        let _rev2 = db.insert(space, DimensionVector::new(vec![2, 0]), vec![2]).unwrap();
        // Query at rev1 should see only the first record.
        let results = db.query(space, Some(rev1)).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].data, vec![1]);
    }

    /// Forking from `main` yields a distinct branch ID.
    #[test]
    fn create_branch_succeeds() {
        let (mut db, _dir) = open_tmp();
        let main = BranchId(1);
        let feature = db.create_branch("feature", main).unwrap();
        assert_ne!(feature, main);
    }
}
686}