Skip to main content

infinite_db/
db.rs

1//! `InfiniteDb` — the top-level embedded database handle.
2//!
3//! This is the single entry point for embedded use. It owns:
4//!   - A `BlockStore` (NVMe-aware file storage)
5//!   - A `WalWriter` (crash-safe append log)
6//!   - A `SpaceRegistry` (named dimension spaces)
7//!   - A `BranchRegistry` (named branch heads)
8//!   - A monotonic revision counter
9//!
10//! All writes go through the WAL before touching any block. On `open()`,
11//! the WAL is replayed to recover any in-flight writes from a prior crash.
12//!
13//! # Example
14//! ```no_run
15//! use infinite_db::InfiniteDb;
16//! use infinite_db::infinitedb_core::address::{DimensionVector, SpaceId};
17//! use infinite_db::infinitedb_core::space::{SpaceConfig, SpaceRegistry};
18//!
19//! let mut db = InfiniteDb::open("./mydb").unwrap();
20//! let space = SpaceId(1);
21//! let point = DimensionVector::new(vec![128, 64]);
22//! let data  = bincode::encode_to_vec(&42u32, bincode::config::standard()).unwrap();
23//! db.insert(space, point, data).unwrap();
24//! ```
25
26use std::{
27    collections::BTreeMap,
28    io,
29    path::{Path, PathBuf},
30    sync::atomic::{AtomicU64, Ordering},
31};
32#[cfg(feature = "sync")]
33use std::{
34    sync::{Arc, Mutex},
35    time::Duration,
36};
37
38use bincode::{config::standard, decode_from_slice, encode_to_vec};
39
40use crate::infinitedb_core::{
41    adapter::{AdapterEndpoint, KindLabel, SpaceBinding},
42    address::{Address, DimensionVector, RevisionId, SpaceId},
43    block::{Block, BlockId, Record},
44    branch::{Branch, BranchId, BranchRegistry},
45    endpoint_index::{
46        decode_hyperedge_id, endpoint_index_point, endpoint_lookup_prefix,
47        edge_endpoints, ENDPOINT_INDEX_BITS_PER_DIM, ENDPOINT_INDEX_DIMS, ENDPOINT_INDEX_SPACE,
48    },
49    hyperedge::{EndpointRef, Hyperedge, HyperedgeId},
50    traversal::{Subgraph, TraversalSpec},
51    kind_catalog::KindCatalog,
52    query::Query,
53    signal::SignalSample,
54    snapshot::{BlockIndexEntry, Snapshot, SnapshotId},
55    space::{SpaceConfig, SpaceRegistry},
56};
57use crate::infinitedb_index::composite::KeyConfig;
58use crate::infinitedb_index::key::{hilbert_key_for, hilbert_key_standard};
59use crate::infinitedb_storage::{
60    compaction::{compact, CompactionConfig, CompactionResult},
61    gc::safe_to_delete,
62    nvme::{compute_checksum, BlockStore},
63    wal::{WalDurability, WalEntry, WalWriter},
64};
65
66#[path = "bulk/mod.rs"]
67mod bulk;
68pub use bulk::{
69    BulkHyperedgeImport, BulkHyperedgeImportOptions, BulkImportResult, BulkRecordImport,
70    BulkSignalImport, BulkWriteOptions, BulkWriteResult,
71};
72#[cfg(feature = "sync")]
73use crate::infinitedb_sync::{delta::Delta, merkle};
74#[cfg(feature = "sync")]
75use crate::infinitedb_sync::{
76    outbox::{load_outbox, save_outbox, OutboxState, SyncReport},
77    transport::{SyncOperation, SyncTransport},
78    worker::BackgroundSyncWorker,
79};
80
81// ---------------------------------------------------------------------------
82// Open options
83// ---------------------------------------------------------------------------
84
/// Options for opening an embedded database.
///
/// Start from [`OpenOptions::default`], override the fields that matter, then
/// call [`OpenOptions::open`].
#[derive(Debug, Clone)]
pub struct OpenOptions {
    /// WAL fsync policy. Default: [`WalDurability::Strict`].
    pub wal_durability: WalDurability,
    /// In-memory records before auto-sealing a block. Default: 256.
    pub flush_threshold: usize,
    /// LRU cache size, in bytes, for decoded blocks. Default: 10 MiB.
    pub block_cache_bytes: usize,
}
95
96impl Default for OpenOptions {
97    fn default() -> Self {
98        Self {
99            wal_durability: WalDurability::Strict,
100            flush_threshold: 256,
101            block_cache_bytes: 10 * 1024 * 1024,
102        }
103    }
104}
105
106impl OpenOptions {
107    /// Open (or create) a database directory with these options.
108    pub fn open<P: AsRef<Path>>(&self, dir: P) -> io::Result<InfiniteDb> {
109        InfiniteDb::open_with_options(dir, self)
110    }
111}
112
113// ---------------------------------------------------------------------------
114// InfiniteDb
115// ---------------------------------------------------------------------------
116
/// The embedded database handle. Not `Send`/`Sync` — create one per thread
/// or wrap in a `Mutex` for multi-threaded access.
///
/// Bundles the block store, the WAL writer, the space and branch registries,
/// the in-memory write buffer and the ID/revision counters.
pub struct InfiniteDb {
    /// Block storage backend; also supplies the WAL path when opening.
    store: BlockStore,
    /// Append-only write-ahead log; writes are appended here before buffering.
    wal: WalWriter,
    /// Registry of configured spaces, looked up by `SpaceId`.
    spaces: SpaceRegistry,
    /// Registry of named branches (a `main` branch is ensured on open).
    branches: BranchRegistry,
    /// In-memory write buffer: accumulated records not yet sealed into a block.
    buffer: Vec<Record>,
    /// Monotonic revision counter. Persisted via WAL checkpoints.
    revision: AtomicU64,
    /// Next block ID to assign.
    next_block_id: AtomicU64,
    /// Next snapshot ID to assign.
    next_snapshot_id: AtomicU64,
    /// Next branch ID to assign.
    next_branch_id: AtomicU64,
    /// Active snapshot per space (space_id → snapshot).
    snapshots: BTreeMap<u64, Snapshot>,
    /// Flush threshold: seal a block after this many buffered records.
    flush_threshold: usize,
    /// When true, auto-flush on buffer size is suppressed (bulk import).
    defer_auto_flush: bool,
    /// True while a bulk write guard holds an active session.
    bulk_session_active: bool,
    /// Location of the persisted sync outbox (`<root>/meta/sync_outbox.bin`).
    #[cfg(feature = "sync")]
    outbox_path: PathBuf,
    /// Outbox state loaded from `outbox_path` when the database is opened.
    #[cfg(feature = "sync")]
    outbox_state: Arc<Mutex<OutboxState>>,
    /// Background sync worker; `None` right after open.
    // NOTE(review): presumably populated when background sync is started — confirm.
    #[cfg(feature = "sync")]
    sync_worker: Option<BackgroundSyncWorker>,
}
149
150impl InfiniteDb {
151    /// Open (or create) a database in `dir`. Replays the WAL on first open.
152    pub fn open<P: AsRef<Path>>(dir: P) -> io::Result<Self> {
153        OpenOptions::default().open(dir)
154    }
155
156    /// Open with explicit tuning (cache size, flush threshold, WAL policy).
157    pub fn open_with_options<P: AsRef<Path>>(dir: P, options: &OpenOptions) -> io::Result<Self> {
158        let root = dir.as_ref().to_path_buf();
159        let store = BlockStore::open_with_cache(root.clone(), options.block_cache_bytes)?;
160        let wal_path = store.wal_path();
161        #[cfg(feature = "sync")]
162        let outbox_path = root.join("meta").join("sync_outbox.bin");
163
164        // Replay WAL to recover in-flight writes.
165        let recovered = recover_wal(&wal_path)?;
166
167        let wal = WalWriter::open_with_durability(wal_path, options.wal_durability)?;
168
169        // Load persisted metadata (spaces, branches, snapshots) if present.
170        let (spaces, branches, snapshots, next_rev, next_block, next_snap, next_branch) =
171            load_meta(&store).unwrap_or_else(default_meta);
172
173        let mut db = Self {
174            store,
175            wal,
176            spaces,
177            branches,
178            buffer: Vec::new(),
179            revision: AtomicU64::new(next_rev),
180            next_block_id: AtomicU64::new(next_block),
181            next_snapshot_id: AtomicU64::new(next_snap),
182            next_branch_id: AtomicU64::new(next_branch), // 1 is reserved for main
183            snapshots,
184            flush_threshold: options.flush_threshold,
185            defer_auto_flush: false,
186            bulk_session_active: false,
187            #[cfg(feature = "sync")]
188            outbox_state: Arc::new(Mutex::new(load_outbox(&outbox_path)?)),
189            #[cfg(feature = "sync")]
190            outbox_path,
191            #[cfg(feature = "sync")]
192            sync_worker: None,
193        };
194
195        // Re-apply recovered WAL entries, advancing the revision counter to
196        // cover any replayed write so live queries don't filter them out.
197        let mut max_rev = db.revision.load(Ordering::Relaxed);
198        for entry in recovered {
199            if let WalEntry::Write { revision, .. } | WalEntry::Tombstone { revision, .. } = &entry {
200                max_rev = max_rev.max(revision.0);
201            }
202            db.apply_wal_entry(entry)?;
203        }
204        db.revision.store(max_rev, Ordering::Relaxed);
205
206        // Ensure a `main` branch exists.
207        if db.branches.get_by_name("main").is_none() {
208            let snap_id = db.alloc_snapshot_id();
209            // No spaces yet — main branch starts with no snapshot content.
210            let _ = db.branches.insert(Branch {
211                id: BranchId(1),
212                name: "main".to_string(),
213                head: snap_id,
214                parent: None,
215                forked_at: RevisionId::ZERO,
216            });
217        }
218
219        Ok(db)
220    }
221
222    // -----------------------------------------------------------------------
223    // Space management
224    // -----------------------------------------------------------------------
225
226    /// Register a new space. Must be called before inserting records into it.
227    pub fn register_space(&mut self, config: SpaceConfig) -> Result<(), String> {
228        if config.bits_per_dim == 0 {
229            return Err("bits_per_dim must be at least 1".to_string());
230        }
231        if config.dims as u32 * config.bits_per_dim > 128 {
232            return Err(format!(
233                "dims * bits_per_dim must be <= 128 (got {} * {})",
234                config.dims, config.bits_per_dim
235            ));
236        }
237        self.spaces.register(config).map_err(|e| format!("{:?}", e))?;
238        self.persist_meta().map_err(|e| e.to_string())?;
239        Ok(())
240    }
241
242    /// Hilbert key for a point using the precision configured for its space.
243    /// Falls back to standard 8-bit precision for unregistered/reserved spaces.
244    fn space_key(&self, space: SpaceId, point: &DimensionVector) -> u128 {
245        match self.spaces.get(space) {
246            Some(config) => hilbert_key_for(point, KeyConfig { bits_per_dim: config.bits_per_dim }),
247            None => hilbert_key_standard(point),
248        }
249    }
250
251    /// Register the reserved endpoint index space if not already present.
252    fn ensure_endpoint_index_space(&mut self) -> io::Result<()> {
253        if self.spaces.get(ENDPOINT_INDEX_SPACE).is_none() {
254            self.register_space(
255                SpaceConfig::new(
256                    ENDPOINT_INDEX_SPACE,
257                    "__endpoint_index__",
258                    ENDPOINT_INDEX_DIMS,
259                )
260                .with_bits_per_dim(ENDPOINT_INDEX_BITS_PER_DIM),
261            )
262            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
263        }
264        Ok(())
265    }
266
267    /// True when `space` is configured to key hyperedges by endpoint centroid.
268    fn uses_centroid_keying(&self, space: SpaceId) -> bool {
269        self.spaces.get(space).map(|c| c.centroid_keying).unwrap_or(false)
270    }
271
272    /// Choose the storage point for a hyperedge record.
273    ///
274    /// Returns `(point, is_centroid)`. Centroid keying yields spatial locality
275    /// for spatially-related edges; it falls back to id-based keying for spaces
276    /// without the flag or for purely cross-space edges with no shared frame.
277    fn edge_storage_point(&self, space: SpaceId, edge: &Hyperedge) -> (DimensionVector, bool) {
278        if self.uses_centroid_keying(space) {
279            if let Some(point) = centroid_hyperedge_point(edge) {
280                return (point, true);
281            }
282        }
283        (hyperedge_point(edge.id), false)
284    }
285
286    /// Register the reserved hyperedge locator space if not already present.
287    fn ensure_locator_space(&mut self) -> io::Result<()> {
288        if self.spaces.get(HYPEREDGE_LOCATOR_SPACE).is_none() {
289            self.register_space(SpaceConfig::new(
290                HYPEREDGE_LOCATOR_SPACE,
291                "__hyperedge_locator__",
292                HYPEREDGE_LOCATOR_DIMS,
293            ))
294            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
295        }
296        Ok(())
297    }
298
299    /// Resolve the stored point for a centroid-keyed edge via its locator.
300    fn lookup_edge_locator(
301        &mut self,
302        space: SpaceId,
303        id: HyperedgeId,
304        as_of: Option<RevisionId>,
305    ) -> io::Result<Option<DimensionVector>> {
306        self.ensure_locator_space()?;
307        let p = locator_point(space, id);
308        let records = self.query_bbox(HYPEREDGE_LOCATOR_SPACE, p.clone(), p, as_of)?;
309        for r in records {
310            if let Ok((point, _)) = decode_from_slice::<DimensionVector, _>(&r.data, standard()) {
311                return Ok(Some(point));
312            }
313        }
314        Ok(None)
315    }
316
317    /// Tombstone the id→point locator for a centroid-keyed edge.
318    fn tombstone_edge_locator(&mut self, space: SpaceId, id: HyperedgeId) -> io::Result<()> {
319        self.ensure_locator_space()?;
320        self.delete(HYPEREDGE_LOCATOR_SPACE, locator_point(space, id))?;
321        Ok(())
322    }
323
324    /// Tombstone reverse-index records for every endpoint on a hyperedge.
325    fn tombstone_hyperedge_index(&mut self, edge: &Hyperedge) -> io::Result<()> {
326        self.ensure_endpoint_index_space()?;
327        for ep in edge_endpoints(edge) {
328            let point = endpoint_index_point(ep, edge.id);
329            self.delete(ENDPOINT_INDEX_SPACE, point)?;
330        }
331        Ok(())
332    }
333
334    /// Return hyperedge IDs registered at an endpoint index coordinate.
335    fn query_endpoint_index_ids(
336        &mut self,
337        prefix: &[u32],
338        as_of: Option<RevisionId>,
339    ) -> io::Result<Vec<HyperedgeId>> {
340        // Full-space scan: Hilbert subscope can drop index entries in 16-D layout
341        // (see endpoint_index_point). Prefix filter is cheap vs per-edge hyperedge scan.
342        let records = self.query(ENDPOINT_INDEX_SPACE, as_of)?;
343        Ok(records
344            .iter()
345            .filter(|r| {
346                prefix
347                    .iter()
348                    .enumerate()
349                    .all(|(i, &v)| r.address.point.coords.get(i) == Some(&v))
350            })
351            .filter_map(|r| decode_hyperedge_id(&r.data))
352            .collect())
353    }
354
355    /// Load a single hyperedge by id from the hyperedge space.
356    fn fetch_hyperedge_by_id(
357        &mut self,
358        space: SpaceId,
359        id: HyperedgeId,
360        as_of: Option<RevisionId>,
361    ) -> io::Result<Option<Hyperedge>> {
362        let p = if self.uses_centroid_keying(space) {
363            match self.lookup_edge_locator(space, id, as_of)? {
364                Some(point) => point,
365                None => return Ok(None),
366            }
367        } else {
368            hyperedge_point(id)
369        };
370        let records = self.query_bbox(space, p.clone(), p, as_of)?;
371        for r in records {
372            if let Ok((edge, _)) = decode_from_slice::<Hyperedge, _>(&r.data, standard()) {
373                if edge.id == id {
374                    return Ok(Some(edge));
375                }
376            }
377        }
378        Ok(None)
379    }
380
381    // -----------------------------------------------------------------------
382    // Writes
383    // -----------------------------------------------------------------------
384
385    /// Insert or update a record. Appends a new revision to the WAL.
386    /// The record is buffered in memory; call `flush()` to seal it into a block.
387    pub fn insert(
388        &mut self,
389        space: SpaceId,
390        point: DimensionVector,
391        data: Vec<u8>,
392    ) -> io::Result<RevisionId> {
393        let rev = self.next_revision();
394        let address = Address::new(space, point);
395        let entry = WalEntry::Write {
396            address: address.clone(),
397            revision: rev,
398            data: data.clone(),
399        };
400        self.wal.append(&entry)?;
401        #[cfg(feature = "sync")]
402        self.enqueue_sync(SyncOperation::Write {
403            address: address.clone(),
404            revision: rev,
405            data: data.clone(),
406        })?;
407        self.buffer.push(Record {
408            address,
409            revision: rev,
410            data,
411            tombstone: false,
412        });
413        if !self.defer_auto_flush && self.buffer.len() >= self.flush_threshold {
414            self.flush(space)?;
415        }
416        Ok(rev)
417    }
418
419    /// Logically delete a record at `point` in `space`.
420    pub fn delete(&mut self, space: SpaceId, point: DimensionVector) -> io::Result<RevisionId> {
421        let rev = self.next_revision();
422        let address = Address::new(space, point);
423        let entry = WalEntry::Tombstone {
424            address: address.clone(),
425            revision: rev,
426        };
427        self.wal.append(&entry)?;
428        #[cfg(feature = "sync")]
429        self.enqueue_sync(SyncOperation::Tombstone {
430            address: address.clone(),
431            revision: rev,
432        })?;
433        self.buffer.push(Record {
434            address,
435            revision: rev,
436            data: vec![],
437            tombstone: true,
438        });
439        Ok(rev)
440    }
441
442    /// Insert or update a typed hyperedge record.
443    pub fn insert_hyperedge(
444        &mut self,
445        space: SpaceId,
446        mut edge: Hyperedge,
447    ) -> io::Result<RevisionId> {
448        edge.validate()
449            .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, format!("{:?}", e)))?;
450        if self.edge_storage_point(space, &edge).1 {
451            self.ensure_locator_space()?;
452        }
453        let build_index = true;
454        self.ensure_endpoint_index_space()?;
455        let rows = self.prepare_hyperedge_writes(space, &edge, build_index)?;
456        let rev = self.apply_prepared_writes_strict(rows)?;
457        edge.valid_from = rev;
458        if !self.defer_auto_flush && self.buffer.len() >= self.flush_threshold {
459            self.flush(space)?;
460        }
461        #[cfg(feature = "sync")]
462        self.enqueue_sync(SyncOperation::WriteHyperedge {
463            space,
464            edge,
465            revision: rev,
466        })?;
467        Ok(rev)
468    }
469
470    /// Logically delete a hyperedge record by ID.
471    pub fn delete_hyperedge(&mut self, space: SpaceId, id: HyperedgeId) -> io::Result<RevisionId> {
472        let edge = self.fetch_hyperedge_by_id(space, id, None)?;
473        // Tombstone at the edge's actual stored point. For centroid-keyed
474        // spaces this differs from the id-based point.
475        let point = match &edge {
476            Some(e) => self.edge_storage_point(space, e).0,
477            None => hyperedge_point(id),
478        };
479        if let Some(e) = &edge {
480            self.tombstone_hyperedge_index(e)?;
481        }
482        let rev = self.next_revision();
483        let address = Address::new(space, point);
484        self.wal.append(&WalEntry::Tombstone {
485            address: address.clone(),
486            revision: rev,
487        })?;
488        self.buffer.push(Record {
489            address,
490            revision: rev,
491            data: vec![],
492            tombstone: true,
493        });
494        if edge.is_some() && self.uses_centroid_keying(space) {
495            self.tombstone_edge_locator(space, id)?;
496        }
497        #[cfg(feature = "sync")]
498        self.enqueue_sync(SyncOperation::DeleteHyperedge {
499            space,
500            edge_id: id,
501            revision: rev,
502        })?;
503        Ok(rev)
504    }
505
506    /// Query all decodable hyperedges from a space.
507    pub fn query_hyperedges(
508        &mut self,
509        space: SpaceId,
510        as_of: Option<RevisionId>,
511    ) -> io::Result<Vec<Hyperedge>> {
512        self.query(space, as_of)?
513            .into_iter()
514            .map(|r| {
515                decode_from_slice::<Hyperedge, _>(&r.data, standard())
516                    .map(|(edge, _)| edge)
517                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
518            })
519            .collect()
520    }
521
522    /// Query hyperedges that reference the provided endpoint.
523    ///
524    /// Uses the reverse endpoint index in [`ENDPOINT_INDEX_SPACE`] so this does
525    /// not scan every hyperedge in the space.
526    pub fn query_hyperedges_for_endpoint(
527        &mut self,
528        space: SpaceId,
529        endpoint: &EndpointRef,
530        as_of: Option<RevisionId>,
531    ) -> io::Result<Vec<Hyperedge>> {
532        self.ensure_endpoint_index_space()?;
533        let prefix = endpoint_lookup_prefix(endpoint);
534        let ids = self.query_endpoint_index_ids(&prefix, as_of)?;
535        let rev_ceiling = as_of.unwrap_or_else(|| {
536            RevisionId(self.revision.load(Ordering::Relaxed))
537        });
538        let mut edges = Vec::new();
539        for id in ids {
540            if let Some(edge) = self.fetch_hyperedge_by_id(space, id, as_of)? {
541                if edge.is_active_at(rev_ceiling)
542                    && edge.endpoints.iter().any(|ep| {
543                        ep.space == endpoint.space && ep.node.coords == endpoint.node.coords
544                    })
545                {
546                    edges.push(edge);
547                }
548            }
549        }
550        Ok(edges)
551    }
552
553    /// BFS walk from `spec.start`, following hyperedges up to `spec.max_depth`.
554    pub fn traverse(&mut self, edge_space: SpaceId, spec: &TraversalSpec) -> io::Result<Subgraph> {
555        let rev_ceiling = spec.as_of.unwrap_or_else(|| {
556            RevisionId(self.revision.load(Ordering::Relaxed))
557        });
558        let mut subgraph = Subgraph::default();
559        subgraph.add_node(spec.start.clone());
560
561        let mut frontier: std::collections::VecDeque<(EndpointRef, usize)> =
562            std::collections::VecDeque::from([(spec.start.clone(), 0)]);
563        let mut visited: std::collections::HashSet<(u64, Vec<u32>)> =
564            std::collections::HashSet::from([(spec.start.space.0, spec.start.node.coords.clone())]);
565
566        while let Some((node, depth)) = frontier.pop_front() {
567            let incident = self.query_hyperedges_for_endpoint(edge_space, &node, spec.as_of)?;
568            for edge in incident {
569                if let Some(ref kinds) = spec.follow_kinds {
570                    if !kinds.iter().any(|k| k.as_str() == edge.kind.as_str()) {
571                        continue;
572                    }
573                }
574                if !edge.is_active_at(rev_ceiling) {
575                    continue;
576                }
577                subgraph.add_edge(edge.clone());
578                for ep in &edge.endpoints {
579                    if ep.space == node.space && ep.node.coords == node.node.coords {
580                        continue;
581                    }
582                    let next_depth = depth + 1;
583                    if next_depth > spec.max_depth {
584                        continue;
585                    }
586                    let key = (ep.space.0, ep.node.coords.clone());
587                    if visited.insert(key) {
588                        subgraph.add_node(ep.clone());
589                        frontier.push_back((ep.clone(), next_depth));
590                    }
591                }
592            }
593        }
594        Ok(subgraph)
595    }
596
597    /// Query hyperedges by relationship kind.
598    pub fn query_hyperedges_by_kind(
599        &mut self,
600        space: SpaceId,
601        kind: &str,
602        as_of: Option<RevisionId>,
603    ) -> io::Result<Vec<Hyperedge>> {
604        let edges = self.query_hyperedges(space, as_of)?;
605        Ok(edges
606            .into_iter()
607            .filter(|e| e.kind.as_str() == kind)
608            .collect())
609    }
610
611    /// Adapter-friendly hyperedge write API with optional catalog enforcement.
612    pub fn insert_hyperedge_typed<K: KindLabel>(
613        &mut self,
614        space: SpaceId,
615        id: HyperedgeId,
616        kind: K,
617        endpoints: Vec<AdapterEndpoint>,
618        weight_milli: Option<i64>,
619        metadata: std::collections::BTreeMap<String, String>,
620        valid_to: Option<RevisionId>,
621        catalog: Option<&KindCatalog>,
622    ) -> io::Result<RevisionId> {
623        let kind_label = kind.label().to_string();
624        if let Some(catalog) = catalog {
625            catalog
626                .validate_edge_kind(&kind_label)
627                .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e.to_string()))?;
628            for ep in &endpoints {
629                catalog
630                    .validate_endpoint_role(&ep.role)
631                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e.to_string()))?;
632            }
633        }
634        let edge = Hyperedge {
635            id,
636            kind: kind_label.into(),
637            endpoints: endpoints.into_iter().map(EndpointRef::from).collect(),
638            weight_milli,
639            metadata,
640            valid_from: RevisionId::ZERO,
641            valid_to,
642        };
643        self.insert_hyperedge(space, edge)
644    }
645
646    /// Adapter-friendly kind-filter query.
647    pub fn query_hyperedges_by_kind_typed<K: KindLabel>(
648        &mut self,
649        space: SpaceId,
650        kind: K,
651        as_of: Option<RevisionId>,
652    ) -> io::Result<Vec<Hyperedge>> {
653        self.query_hyperedges_by_kind(space, kind.label(), as_of)
654    }
655
656    /// Insert a signal sample in the provided signal space.
657    pub fn insert_signal_sample(
658        &mut self,
659        space: SpaceId,
660        sample: SignalSample,
661    ) -> io::Result<RevisionId> {
662        sample.validate()
663            .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, format!("{:?}", e)))?;
664        let full_coords = sample
665            .scope
666            .address_coords(&sample.local_coords)
667            .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, format!("{:?}", e)))?;
668        if let Some(cfg) = self.spaces.get(space) {
669            if cfg.dims != full_coords.len() {
670                return Err(io::Error::new(
671                    io::ErrorKind::InvalidInput,
672                    "signal coordinates do not match space dimensions",
673                ));
674            }
675        }
676        let data = encode_to_vec(&sample, standard())
677            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
678        let rev = self.next_revision();
679        let address = Address::new(space, DimensionVector::new(full_coords));
680        self.wal.append(&WalEntry::Write {
681            address: address.clone(),
682            revision: rev,
683            data: data.clone(),
684        })?;
685        self.buffer.push(Record {
686            address,
687            revision: rev,
688            data,
689            tombstone: false,
690        });
691        if !self.defer_auto_flush && self.buffer.len() >= self.flush_threshold {
692            self.flush(space)?;
693        }
694        #[cfg(feature = "sync")]
695        self.enqueue_sync(SyncOperation::WriteSignal {
696            space,
697            sample,
698            revision: rev,
699        })?;
700        Ok(rev)
701    }
702
703    /// Adapter-friendly signal write API bound to a typed space.
704    pub fn insert_signal_sample_typed<SB: SpaceBinding, K: KindLabel>(
705        &mut self,
706        signal_id: crate::infinitedb_core::signal::SignalId,
707        kind: K,
708        parent_prefix: DimensionVector,
709        local_coords: Vec<u32>,
710        value_milli: i64,
711        source_revision: Option<RevisionId>,
712        constraint: Option<crate::infinitedb_core::signal::SignalConstraint>,
713        catalog: Option<&KindCatalog>,
714    ) -> io::Result<RevisionId> {
715        if let Some(cfg) = self.spaces.get(SB::SPACE_ID) {
716            if cfg.dims != SB::DIMS {
717                return Err(io::Error::new(
718                    io::ErrorKind::InvalidInput,
719                    format!(
720                        "SpaceBinding dims mismatch for space {}: trait={}, registry={}",
721                        SB::SPACE_ID.0,
722                        SB::DIMS,
723                        cfg.dims
724                    ),
725                ));
726            }
727        } else {
728            return Err(io::Error::new(
729                io::ErrorKind::InvalidInput,
730                format!("SpaceBinding refers to unregistered space {}", SB::SPACE_ID.0),
731            ));
732        }
733        let kind_label = kind.label().to_string();
734        if let Some(catalog) = catalog {
735            catalog
736                .validate_signal_kind(&kind_label)
737                .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e.to_string()))?;
738        }
739        let sample = crate::infinitedb_core::signal::SignalSample {
740            signal_id,
741            kind: kind_label.into(),
742            scope: crate::infinitedb_core::signal::SignalScope {
743                parent_prefix,
744                total_dims: SB::DIMS,
745            },
746            local_coords,
747            value_milli,
748            source_revision,
749            constraint,
750        };
751        self.insert_signal_sample(SB::SPACE_ID, sample)
752    }
753
754    /// Query all signal samples under a parent scope prefix.
755    pub fn query_signal_scope(
756        &mut self,
757        space: SpaceId,
758        parent_coords: &[u32],
759        as_of: Option<RevisionId>,
760    ) -> io::Result<Vec<SignalSample>> {
761        let rows = self.query_subscope(space, parent_coords, as_of)?;
762        rows.into_iter()
763            .map(|r| {
764                decode_from_slice::<SignalSample, _>(&r.data, standard())
765                    .map(|(sample, _)| sample)
766                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
767            })
768            .collect()
769    }
770
771    /// Query signal samples in a local coordinate range under a parent scope.
772    pub fn query_signal_range(
773        &mut self,
774        space: SpaceId,
775        parent_coords: &[u32],
776        min_local: &[u32],
777        max_local: &[u32],
778        as_of: Option<RevisionId>,
779    ) -> io::Result<Vec<SignalSample>> {
780        if min_local.len() != max_local.len() {
781            return Err(io::Error::new(
782                io::ErrorKind::InvalidInput,
783                "min_local and max_local dimensions differ",
784            ));
785        }
786        let mut min = parent_coords.to_vec();
787        min.extend_from_slice(min_local);
788        let mut max = parent_coords.to_vec();
789        max.extend_from_slice(max_local);
790        let rows = self.query_bbox(
791            space,
792            DimensionVector::new(min),
793            DimensionVector::new(max),
794            as_of,
795        )?;
796        rows.into_iter()
797            .map(|r| {
798                decode_from_slice::<SignalSample, _>(&r.data, standard())
799                    .map(|(sample, _)| sample)
800                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
801            })
802            .collect()
803    }
804
805    /// Seal all buffered records for `space` into a new block on disk.
806    pub fn flush(&mut self, space: SpaceId) -> io::Result<()> {
807        if self.buffer.is_empty() {
808            return Ok(());
809        }
810
811        // Partition: take only records for this space; keep the rest in the buffer.
812        let mut remaining = Vec::new();
813        let mut records: Vec<Record> = Vec::new();
814        for record in self.buffer.drain(..) {
815            if record.address.space == space {
816                records.push(record);
817            } else {
818                remaining.push(record);
819            }
820        }
821        self.buffer = remaining;
822
823        if records.is_empty() {
824            return Ok(());
825        }
826
827        // Sort by Hilbert key then revision.
828        records.sort_by_key(|r| {
829            let key = self.space_key(space, &r.address.point);
830            (key, r.revision.0)
831        });
832
833        let min_rev = records.iter().map(|r| r.revision).min().unwrap_or(RevisionId::ZERO);
834        let max_rev = records.iter().map(|r| r.revision).max().unwrap_or(RevisionId::ZERO);
835        let block_id = self.alloc_block_id();
836
837        let mut block = Block {
838            id: block_id,
839            space,
840            records,
841            min_revision: min_rev,
842            max_revision: max_rev,
843            checksum: [0u8; 32],
844        };
845        block.checksum = compute_checksum(&block)?;
846
847        // Write block to NVMe store.
848        self.store.write_block(&block)?;
849
850        // Record block seal in WAL.
851        let snap_id = self.alloc_snapshot_id();
852        self.wal.append(&WalEntry::BlockSealed {
853            block_id,
854            space,
855            snapshot: snap_id,
856        })?;
857
858        // Records are sorted by Hilbert key, so the first/last give the block's
859        // key interval used for range pruning. Compute before borrowing the
860        // snapshot mutably below.
861        let hilbert_min = block
862            .records
863            .first()
864            .map(|r| self.space_key(space, &r.address.point))
865            .unwrap_or(0);
866        let hilbert_max = block
867            .records
868            .last()
869            .map(|r| self.space_key(space, &r.address.point))
870            .unwrap_or(hilbert_min);
871
872        // Advance the space's active snapshot.
873        let snapshot = self.snapshots.entry(space.0).or_insert_with(|| {
874            Snapshot::root(snap_id, space)
875        });
876        snapshot.blocks.insert(
877            hilbert_min,
878            BlockIndexEntry { block_id, max_key: hilbert_max },
879        );
880        snapshot.revision = max_rev;
881
882        self.persist_meta()?;
883        self.wal.sync()?;
884
885        // The sealed records are now durable in `snapshots.bin`. Rewrite the WAL
886        // so it retains only the records still buffered (other spaces, not yet
887        // sealed) plus a checkpoint marker — otherwise replay on the next open
888        // would re-add records that already live in the sealed block.
889        self.rewrite_wal_from_buffer(max_rev)?;
890        Ok(())
891    }
892
893    /// Rewrite the WAL to mirror the current buffer, terminated by a checkpoint.
894    fn rewrite_wal_from_buffer(&mut self, checkpoint: RevisionId) -> io::Result<()> {
895        let mut entries: Vec<WalEntry> = self
896            .buffer
897            .iter()
898            .map(|r| {
899                if r.tombstone {
900                    WalEntry::Tombstone { address: r.address.clone(), revision: r.revision }
901                } else {
902                    WalEntry::Write {
903                        address: r.address.clone(),
904                        revision: r.revision,
905                        data: r.data.clone(),
906                    }
907                }
908            })
909            .collect();
910        entries.push(WalEntry::Checkpoint { revision: checkpoint });
911        self.wal.rewrite(&entries)
912    }
913
914    // -----------------------------------------------------------------------
915    // Reads
916    // -----------------------------------------------------------------------
917
918    /// Return the current snapshot ID for `space`.
919    pub fn current_snapshot(&self, space: SpaceId) -> Option<SnapshotId> {
920        self.snapshots.get(&space.0).map(|s| s.id)
921    }
922
923    /// Scan all live records in `space`, optionally capped at `as_of`.
924    /// Full-space scan — no spatial filtering. Use `query_bbox` to filter by coordinates.
925    pub fn query(
926        &mut self,
927        space: SpaceId,
928        as_of: Option<RevisionId>,
929    ) -> io::Result<Vec<Record>> {
930        self.query_inner(space, None, as_of, false)
931    }
932
933    /// Execute a `Query` descriptor against the space's current snapshot.
934    ///
935    /// Resolution order for the block-pruning key interval:
936    /// 1. `q.key_range` when present (a precomputed Hilbert interval);
937    /// 2. otherwise derived from `q.range`'s corner coordinates;
938    /// 3. otherwise a full-space scan.
939    ///
940    /// When `q.range` is set, an exact per-record coordinate filter is applied
941    /// so results match `query_bbox`. Historical snapshot reads are not yet
942    /// supported, so `q.snapshot` must match the space's current snapshot.
943    pub fn execute(&mut self, q: &Query) -> io::Result<Vec<Record>> {
944        if let Some(current) = self.current_snapshot(q.space) {
945            if current != q.snapshot {
946                return Err(io::Error::new(
947                    io::ErrorKind::InvalidInput,
948                    "query snapshot does not match the current snapshot for this space",
949                ));
950            }
951        }
952
953        let key_range = match q.key_range {
954            Some(kr) => Some(kr),
955            None => q.range.as_ref().map(|r| {
956                let ka = self.space_key(q.space, &r.min);
957                let kb = self.space_key(q.space, &r.max);
958                if ka <= kb { (ka, kb) } else { (kb, ka) }
959            }),
960        };
961
962        let mut results = self.query_inner(q.space, key_range, q.as_of, q.include_tombstones)?;
963
964        if let Some(range) = &q.range {
965            results.retain(|r| r.address.point.within(&range.min, &range.max));
966        }
967
968        Ok(results)
969    }
970
971    /// Bounding-box query in N dimensions.
972    ///
973    /// Returns every live record in `space` whose point satisfies
974    /// `min[i] <= point[i] <= max[i]` on **every** axis simultaneously.
975    ///
976    /// Works correctly for any dimensionality (1–16 dims). A Hilbert-key
977    /// interval is used to prune candidate blocks at the block level; an
978    /// exact `within()` check per record eliminates any false positives caused
979    /// by the Hilbert curve mapping a bounding box to many disjoint intervals.
980    ///
981    /// `min` and `max` must have the same number of coordinates as the points
982    /// stored in the space.
983    pub fn query_bbox(
984        &mut self,
985        space: SpaceId,
986        min: DimensionVector,
987        max: DimensionVector,
988        as_of: Option<RevisionId>,
989    ) -> io::Result<Vec<Record>> {
990        assert_eq!(min.dims(), max.dims(), "min and max must have equal dimensions");
991        // Compute Hilbert keys for both bounding corners and use the interval
992        // as a block-level pre-filter (over-approximation — false positives OK).
993        let k_min = self.space_key(space, &min);
994        let k_max = self.space_key(space, &max);
995        let (lo, hi) = if k_min <= k_max { (k_min, k_max) } else { (k_max, k_min) };
996        let mut results = self.query_inner(space, Some((lo, hi)), as_of, false)?;
997        // Exact per-record coordinate filter removes all Hilbert false positives.
998        results.retain(|r| r.address.point.within(&min, &max));
999        Ok(results)
1000    }
1001
1002    /// Sub-space query: pins the first `parent_coords.len()` dimensions to exact
1003    /// values and leaves the remaining inner dimensions fully open (0..u32::MAX).
1004    ///
1005    /// This is the idiomatic way to retrieve all property records for a specific
1006    /// parent element in the nested Hilbert design:
1007    /// ```ignore
1008    /// // All load properties of element 42:
1009    /// db.query_subscope(SPACE_LOADS, &[42], None);
1010    /// ```
1011    pub fn query_subscope(
1012        &mut self,
1013        space: SpaceId,
1014        parent_coords: &[u32],
1015        as_of: Option<RevisionId>,
1016    ) -> io::Result<Vec<Record>> {
1017        // We need to know the total dims for this space to build the full vectors.
1018        let dims = self.spaces.get(space)
1019            .map(|c| c.dims)
1020            .unwrap_or(parent_coords.len() + 1);
1021        assert!(
1022            parent_coords.len() <= dims,
1023            "parent_coords has more dimensions than the space"
1024        );
1025        let inner_dims = dims - parent_coords.len();
1026        let mut min_coords: Vec<u32> = parent_coords.to_vec();
1027        let mut max_coords: Vec<u32> = parent_coords.to_vec();
1028        min_coords.extend(std::iter::repeat(0).take(inner_dims));
1029        max_coords.extend(std::iter::repeat(u32::MAX).take(inner_dims));
1030        self.query_bbox(
1031            space,
1032            DimensionVector::new(min_coords),
1033            DimensionVector::new(max_coords),
1034            as_of,
1035        )
1036    }
1037
1038    // Shared core: reads from sealed blocks + the in-memory write buffer.
1039    fn query_inner(
1040        &mut self,
1041        space: SpaceId,
1042        key_range: Option<(u128, u128)>,
1043        as_of: Option<RevisionId>,
1044        include_tombstones: bool,
1045    ) -> io::Result<Vec<Record>> {
1046        let rev_ceiling = as_of.unwrap_or_else(|| {
1047            RevisionId(self.revision.load(Ordering::Relaxed))
1048        });
1049
1050        let mut results: Vec<Record> = Vec::new();
1051
1052        let mut tombstoned: std::collections::HashSet<_> = self
1053            .buffer
1054            .iter()
1055            .filter(|r| r.address.space == space && r.tombstone && r.revision <= rev_ceiling)
1056            .map(|r| r.address.point.coords.clone())
1057            .collect();
1058
1059        // Query sealed blocks if a snapshot exists.
1060        if let Some(snapshot) = self.snapshots.get(&space.0) {
1061            let block_ids: Vec<BlockId> = match key_range {
1062                None => snapshot.blocks.values().map(|e| e.block_id).collect(),
1063                Some((lo, hi)) => {
1064                    // Each block covers the Hilbert interval [min_key, max_key]
1065                    // (min_key is the map key). Prune to blocks whose interval
1066                    // overlaps the query interval [lo, hi]; the per-record
1067                    // within() filter still removes Hilbert false positives.
1068                    snapshot
1069                        .blocks
1070                        .iter()
1071                        .filter(|(min_key, entry)| **min_key <= hi && entry.max_key >= lo)
1072                        .map(|(_, entry)| entry.block_id)
1073                        .collect()
1074                }
1075            };
1076            for block_id in &block_ids {
1077                let block = self.store.read_block(*block_id)?;
1078                for record in &block.records {
1079                    if record.address.space == space
1080                        && record.tombstone
1081                        && record.revision <= rev_ceiling
1082                    {
1083                        tombstoned.insert(record.address.point.coords.clone());
1084                    }
1085                }
1086            }
1087            for block_id in block_ids {
1088                let block = self.store.read_block(block_id)?;
1089                for record in block.records {
1090                    if record.revision > rev_ceiling {
1091                        continue;
1092                    }
1093                    if !include_tombstones && record.tombstone {
1094                        continue;
1095                    }
1096                    // Point lookups (lo == hi): records are sorted by Hilbert key at seal.
1097                    if let Some((lo, hi)) = key_range {
1098                        if lo == hi {
1099                            let k = self.space_key(space, &record.address.point);
1100                            if k != lo {
1101                                continue;
1102                            }
1103                        }
1104                    }
1105                    results.push(record);
1106                }
1107            }
1108            if !include_tombstones {
1109                results.retain(|r| !tombstoned.contains(&r.address.point.coords));
1110            }
1111        }
1112
1113        // In-memory buffer (records not yet flushed).
1114        for record in &self.buffer {
1115            let visible = record.address.space == space
1116                && record.revision <= rev_ceiling
1117                && (include_tombstones || !record.tombstone)
1118                && (include_tombstones || !tombstoned.contains(&record.address.point.coords));
1119            if visible {
1120                if let Some((lo, hi)) = key_range {
1121                    let k = self.space_key(space, &record.address.point);
1122                    if k < lo || k > hi {
1123                        continue;
1124                    }
1125                }
1126                results.push(record.clone());
1127            }
1128        }
1129
1130        Ok(results)
1131    }
1132
1133    // -----------------------------------------------------------------------
1134    // Branch management
1135    // -----------------------------------------------------------------------
1136
1137    /// Look up a branch ID by name, if one exists.
1138    pub fn branch_id(&self, name: &str) -> Option<BranchId> {
1139        self.branches.get_by_name(name).map(|b| b.id)
1140    }
1141
1142    /// Create a new branch forked from an existing one at the current revision.
1143    pub fn create_branch(
1144        &mut self,
1145        name: impl Into<String>,
1146        from: BranchId,
1147    ) -> Result<BranchId, String> {
1148        let parent = self.branches.get(from).ok_or("Branch not found")?;
1149        let new_id = BranchId(self.next_branch_id.fetch_add(1, Ordering::Relaxed));
1150        let rev = RevisionId(self.revision.load(Ordering::Relaxed));
1151        let branch = Branch {
1152            id: new_id,
1153            name: name.into(),
1154            head: parent.head,
1155            parent: Some(from),
1156            forked_at: rev,
1157        };
1158        self.branches.insert(branch).map_err(|e| format!("{:?}", e))?;
1159        self.persist_meta().map_err(|e| e.to_string())?;
1160        Ok(new_id)
1161    }
1162
1163    // -----------------------------------------------------------------------
1164    // Diagnostics
1165    // -----------------------------------------------------------------------
1166
1167    /// Returns a snapshot of current memory and cache usage.
1168    pub fn memory_stats(&self) -> MemoryStats {
1169        let buffer_records = self.buffer.len();
1170        let buffer_bytes: usize = self.buffer.iter()
1171            .map(|r| 48 + r.data.len())
1172            .sum();
1173        let (cache_bytes, cache_blocks) = self.store.cache_stats();
1174        let snapshot_entries: usize = self.snapshots.values()
1175            .map(|s| s.blocks.len())
1176            .sum();
1177        MemoryStats {
1178            buffer_records,
1179            buffer_bytes,
1180            cache_bytes,
1181            cache_blocks,
1182            snapshot_index_entries: snapshot_entries,
1183            total_revision: self.revision.load(Ordering::Relaxed),
1184            sealed_blocks: self.next_block_id.load(Ordering::Relaxed),
1185        }
1186    }
1187
1188    // -----------------------------------------------------------------------
1189    // Snapshot access, compaction, and block I/O
1190    // -----------------------------------------------------------------------
1191
1192    /// Return the active snapshot for a space, if one exists.
1193    pub fn snapshot_for_space(&self, space: SpaceId) -> Option<Snapshot> {
1194        self.snapshots.get(&space.0).cloned()
1195    }
1196
1197    /// Current monotonic revision counter.
1198    pub fn revision(&self) -> u64 {
1199        self.revision.load(Ordering::Relaxed)
1200    }
1201
1202    /// Read a sealed block from disk (through the LRU cache).
1203    pub fn read_block(&mut self, id: BlockId) -> io::Result<Block> {
1204        self.store.read_block(id)
1205    }
1206
1207    /// Merge fragmented blocks in `space` into fewer larger blocks.
1208    pub fn compact_space(
1209        &mut self,
1210        space: SpaceId,
1211        config: &CompactionConfig,
1212    ) -> io::Result<CompactionResult> {
1213        let snapshot = self
1214            .snapshots
1215            .get(&space.0)
1216            .cloned()
1217            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "no snapshot for space"))?;
1218
1219        let input_blocks: Vec<Block> = snapshot
1220            .blocks
1221            .values()
1222            .map(|e| self.store.read_block(e.block_id))
1223            .collect::<io::Result<_>>()?;
1224
1225        if input_blocks.len() <= 1 {
1226            return Ok(CompactionResult {
1227                new_blocks: vec![],
1228                superseded: vec![],
1229            });
1230        }
1231
1232        let result = compact(
1233            input_blocks,
1234            config,
1235            snapshot.id,
1236            || self.alloc_block_id(),
1237        );
1238
1239        let mut new_blocks = Vec::new();
1240        for mut block in result.new_blocks {
1241            let mut recs = block.records.clone();
1242            recs.sort_by_key(|r| self.space_key(space, &r.address.point));
1243            block.records = recs;
1244            block.checksum = compute_checksum(&block)?;
1245            self.store.write_block(&block)?;
1246            new_blocks.push(block);
1247        }
1248
1249        let mut updated = Snapshot::root(snapshot.id, space);
1250        updated.revision = snapshot.revision;
1251        for block in &new_blocks {
1252            let min_key = block
1253                .records
1254                .first()
1255                .map(|r| self.space_key(space, &r.address.point))
1256                .unwrap_or(0);
1257            let max_key = block
1258                .records
1259                .last()
1260                .map(|r| self.space_key(space, &r.address.point))
1261                .unwrap_or(min_key);
1262            updated.blocks.insert(
1263                min_key,
1264                BlockIndexEntry {
1265                    block_id: block.id,
1266                    max_key,
1267                },
1268            );
1269        }
1270        self.snapshots.insert(space.0, updated);
1271
1272        let live: Vec<Snapshot> = self.snapshots.values().cloned().collect();
1273        let deletable = safe_to_delete(&result.superseded, &live);
1274        self.gc_blocks(&deletable)?;
1275
1276        let rev = RevisionId(self.revision.load(Ordering::Relaxed));
1277        self.wal.append(&WalEntry::Checkpoint { revision: rev })?;
1278        self.persist_meta()?;
1279        Ok(CompactionResult {
1280            new_blocks,
1281            superseded: result.superseded,
1282        })
1283    }
1284
1285    /// Delete block files that are no longer referenced by any live snapshot.
1286    pub fn gc_blocks(&mut self, superseded: &[BlockId]) -> io::Result<usize> {
1287        let live: Vec<Snapshot> = self.snapshots.values().cloned().collect();
1288        let deletable = safe_to_delete(superseded, &live);
1289        for id in &deletable {
1290            self.store.delete_block(*id)?;
1291        }
1292        Ok(deletable.len())
1293    }
1294
1295    #[cfg(feature = "sync")]
1296    /// Build a Merkle tree over all records in `space`'s snapshot (Hilbert order).
1297    pub fn snapshot_merkle(&mut self, space: SpaceId) -> io::Result<merkle::MerkleTree> {
1298        let snapshot = self
1299            .snapshots
1300            .get(&space.0)
1301            .cloned()
1302            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "no snapshot for space"))?;
1303
1304        let mut leaves = Vec::new();
1305        for (_min_key, entry) in &snapshot.blocks {
1306            let block = self.store.read_block(entry.block_id)?;
1307            let mut recs = block.records;
1308            recs.sort_by_key(|r| (self.space_key(space, &r.address.point), r.revision.0));
1309            for record in &recs {
1310                let encoded = encode_to_vec(record, standard())
1311                    .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
1312                leaves.push(merkle::hash_record(&encoded));
1313            }
1314        }
1315        Ok(merkle::MerkleTree::build(&leaves))
1316    }
1317
1318    #[cfg(feature = "sync")]
1319    /// Apply a replication delta: write blocks, update snapshot index, GC superseded files.
1320    pub fn apply_delta(&mut self, space: SpaceId, delta: &Delta) -> io::Result<()> {
1321        for block in &delta.added_blocks {
1322            self.store.write_block(block)?;
1323        }
1324        let current = self
1325            .snapshots
1326            .get(&space.0)
1327            .cloned()
1328            .unwrap_or_else(|| Snapshot::root(delta.target_snapshot, space));
1329        let updated = delta.apply(&current);
1330        self.snapshots.insert(space.0, updated);
1331        self.gc_blocks(&delta.removed_block_ids)?;
1332        self.persist_meta()?;
1333        Ok(())
1334    }
1335
1336    // -----------------------------------------------------------------------
1337    // Replication sync (offline-first)
1338    // -----------------------------------------------------------------------
1339
1340    /// Run one immediate sync pass against the provided transport.
1341    #[cfg(feature = "sync")]
1342    pub fn sync_now(
1343        &mut self,
1344        transport: &dyn SyncTransport,
1345        max_batch: usize,
1346    ) -> io::Result<SyncReport> {
1347        let mut state = self
1348            .outbox_state
1349            .lock()
1350            .map_err(|_| io::Error::new(io::ErrorKind::Other, "sync outbox lock poisoned"))?;
1351        let report = state.process_once(transport, max_batch);
1352        if report.attempted > 0 {
1353            save_outbox(&self.outbox_path, &state)?;
1354        }
1355        Ok(report)
1356    }
1357
1358    /// Start a background worker that retries queued outbox writes.
1359    #[cfg(feature = "sync")]
1360    pub fn start_background_sync(
1361        &mut self,
1362        transport: Arc<dyn SyncTransport>,
1363        interval: Duration,
1364        batch_size: usize,
1365    ) -> io::Result<()> {
1366        self.stop_background_sync();
1367        let worker = BackgroundSyncWorker::start(
1368            Arc::clone(&self.outbox_state),
1369            self.outbox_path.clone(),
1370            transport,
1371            interval,
1372            batch_size,
1373        )?;
1374        self.sync_worker = Some(worker);
1375        Ok(())
1376    }
1377
1378    /// Stop the running background sync worker (if any).
1379    #[cfg(feature = "sync")]
1380    pub fn stop_background_sync(&mut self) {
1381        if let Some(mut worker) = self.sync_worker.take() {
1382            worker.stop();
1383        }
1384    }
1385
1386    /// Current number of queued operations waiting to sync.
1387    #[cfg(feature = "sync")]
1388    pub fn sync_pending_count(&self) -> usize {
1389        self.outbox_state
1390            .lock()
1391            .map(|s| s.pending_count())
1392            .unwrap_or(0)
1393    }
1394
1395    /// Last successful outbox sync timestamp (epoch ms).
1396    #[cfg(feature = "sync")]
1397    pub fn last_successful_sync_at_ms(&self) -> Option<u64> {
1398        self.outbox_state
1399            .lock()
1400            .ok()
1401            .and_then(|s| s.last_success_at_ms)
1402    }
1403
1404    /// Last sync error seen by the outbox processor.
1405    #[cfg(feature = "sync")]
1406    pub fn last_sync_error(&self) -> Option<String> {
1407        self.outbox_state
1408            .lock()
1409            .ok()
1410            .and_then(|s| s.last_error.clone())
1411    }
1412
1413    // -----------------------------------------------------------------------
1414    // Internal helpers
1415    // -----------------------------------------------------------------------
1416
1417    fn next_revision(&self) -> RevisionId {
1418        RevisionId(self.revision.fetch_add(1, Ordering::Relaxed) + 1)
1419    }
1420
1421    fn alloc_block_id(&self) -> BlockId {
1422        BlockId(self.next_block_id.fetch_add(1, Ordering::Relaxed))
1423    }
1424
1425    fn alloc_snapshot_id(&self) -> SnapshotId {
1426        SnapshotId(self.next_snapshot_id.fetch_add(1, Ordering::Relaxed))
1427    }
1428
1429    fn apply_wal_entry(&mut self, entry: WalEntry) -> io::Result<()> {
1430        match entry {
1431            WalEntry::Write { address, revision, data } => {
1432                self.buffer.push(Record { address, revision, data, tombstone: false });
1433            }
1434            WalEntry::Tombstone { address, revision } => {
1435                self.buffer.push(Record { address, revision, data: vec![], tombstone: true });
1436            }
1437            WalEntry::BlockSealed { block_id, space, snapshot } => {
1438                self.reconcile_sealed_block(block_id, space, snapshot)?;
1439            }
1440            WalEntry::Checkpoint { .. } => {}
1441        }
1442        Ok(())
1443    }
1444
1445    /// Re-link a sealed block into the snapshot index during recovery.
1446    ///
1447    /// This covers a crash between `write_block` and the `persist_meta` that
1448    /// records the block in `snapshots.bin`: the block exists on disk and the
1449    /// WAL has its `BlockSealed` marker, but the persisted snapshot does not yet
1450    /// reference it. We read the block, insert its index entry, and drop the now
1451    /// sealed records from the recovery buffer so queries don't double-count.
1452    ///
1453    /// If the block file is absent (crash before it was durably written) the
1454    /// records remain in the buffer via their `Write` entries and will be
1455    /// resealed on the next flush, so we simply skip.
1456    fn reconcile_sealed_block(
1457        &mut self,
1458        block_id: BlockId,
1459        space: SpaceId,
1460        snapshot_id: SnapshotId,
1461    ) -> io::Result<()> {
1462        let block = match self.store.read_block(block_id) {
1463            Ok(block) => block,
1464            Err(_) => return Ok(()),
1465        };
1466
1467        let min_key = block
1468            .records
1469            .first()
1470            .map(|r| self.space_key(space, &r.address.point))
1471            .unwrap_or(0);
1472        let max_key = block
1473            .records
1474            .last()
1475            .map(|r| self.space_key(space, &r.address.point))
1476            .unwrap_or(min_key);
1477        let block_max_rev = block.max_revision;
1478
1479        let sealed: std::collections::HashSet<(Vec<u32>, u64)> = block
1480            .records
1481            .iter()
1482            .map(|r| (r.address.point.coords.clone(), r.revision.0))
1483            .collect();
1484
1485        let snapshot = self
1486            .snapshots
1487            .entry(space.0)
1488            .or_insert_with(|| Snapshot::root(snapshot_id, space));
1489        snapshot
1490            .blocks
1491            .insert(min_key, BlockIndexEntry { block_id, max_key });
1492        if snapshot.revision < block_max_rev {
1493            snapshot.revision = block_max_rev;
1494        }
1495
1496        self.buffer
1497            .retain(|r| !sealed.contains(&(r.address.point.coords.clone(), r.revision.0)));
1498        Ok(())
1499    }
1500
1501    fn persist_meta(&mut self) -> io::Result<()> {
1502        // Persist spaces registry.
1503        let spaces_bytes = encode_to_vec(&self.spaces, standard())
1504            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
1505        self.store.write_meta("spaces.bin", &spaces_bytes)?;
1506
1507        // Persist branch registry so user branches survive a restart.
1508        let branches_bytes = encode_to_vec(&self.branches, standard())
1509            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
1510        self.store.write_meta("branches.bin", &branches_bytes)?;
1511
1512        // Persist the per-space snapshot index. Without this the sealed blocks
1513        // on disk become unreachable after reopen.
1514        let snapshots_bytes = encode_to_vec(&self.snapshots, standard())
1515            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
1516        self.store.write_meta("snapshots.bin", &snapshots_bytes)?;
1517
1518        // Persist revision counters as a [u64; 4].
1519        let counters: [u64; 4] = [
1520            self.revision.load(Ordering::Relaxed),
1521            self.next_block_id.load(Ordering::Relaxed),
1522            self.next_snapshot_id.load(Ordering::Relaxed),
1523            self.next_branch_id.load(Ordering::Relaxed),
1524        ];
1525        let counters_bytes = encode_to_vec(&counters, standard())
1526            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
1527        self.store.write_meta("counters.bin", &counters_bytes)?;
1528
1529        Ok(())
1530    }
1531
1532    #[cfg(feature = "sync")]
1533    pub(super) fn enqueue_sync(&mut self, op: SyncOperation) -> io::Result<()> {
1534        let mut state = self
1535            .outbox_state
1536            .lock()
1537            .map_err(|_| io::Error::new(io::ErrorKind::Other, "sync outbox lock poisoned"))?;
1538        state.enqueue(op);
1539        save_outbox(&self.outbox_path, &state)
1540    }
1541}
1542
/// Dropping the database handle stops the background sync worker, if any.
#[cfg(feature = "sync")]
impl Drop for InfiniteDb {
    fn drop(&mut self) {
        InfiniteDb::stop_background_sync(self);
    }
}
1549
1550// ---------------------------------------------------------------------------
1551// WAL recovery
1552// ---------------------------------------------------------------------------
1553
1554fn recover_wal(wal_path: &PathBuf) -> io::Result<Vec<WalEntry>> {
1555    if !wal_path.exists() {
1556        return Ok(vec![]);
1557    }
1558    let mut reader = crate::infinitedb_storage::wal::WalReader::open(wal_path.clone())?;
1559    reader.entries()
1560}
1561
1562// ---------------------------------------------------------------------------
1563// MemoryStats
1564// ---------------------------------------------------------------------------
1565
/// Point-in-time snapshot of database memory and cache usage.
///
/// All fields are plain counters. `total_ram_bytes` combines the byte fields
/// and the snapshot index entry count into a single estimate, and `print`
/// writes a human-readable summary to stdout.
#[derive(Debug, Clone)]
pub struct MemoryStats {
    /// Records currently in the in-memory write buffer (not yet flushed).
    pub buffer_records: usize,
    /// Approximate bytes occupied by the write buffer.
    pub buffer_bytes: usize,
    /// Bytes currently resident in the LRU block cache.
    pub cache_bytes: usize,
    /// Number of blocks held in the LRU block cache.
    pub cache_blocks: usize,
    /// Total Hilbert index entries across all active snapshots.
    /// `total_ram_bytes` charges 24 bytes for each entry.
    pub snapshot_index_entries: usize,
    /// Highest revision number issued so far.
    pub total_revision: u64,
    /// Total sealed blocks written (cumulative, not current on-disk count).
    pub sealed_blocks: u64,
}
1584
1585impl MemoryStats {
1586    /// Estimated total process-level RAM attributed to the database.
1587    pub fn total_ram_bytes(&self) -> usize {
1588        self.buffer_bytes
1589            + self.cache_bytes
1590            // Snapshot index: each entry is approx (u128 key + u64 id) = 24 bytes.
1591            + self.snapshot_index_entries * 24
1592            // Fixed overhead: registries, atomics, BTreeMap nodes.
1593            + 4096
1594    }
1595
1596    /// Pretty-print the current memory statistics to stdout.
1597    pub fn print(&self) {
1598        println!("\n╔═══ InfiniteDb Memory Stats ═══╗");
1599        println!("║  Write buffer       {:>6} records  ({} bytes)",
1600            self.buffer_records, fmt_bytes(self.buffer_bytes));
1601        println!("║  LRU block cache    {:>6} blocks   ({} bytes / 10 MB limit)",
1602            self.cache_blocks, fmt_bytes(self.cache_bytes));
1603        println!("║  Snapshot index     {:>6} entries", self.snapshot_index_entries);
1604        println!("║  Total revisions    {:>6}", self.total_revision);
1605        println!("║  Sealed blocks      {:>6}", self.sealed_blocks);
1606        println!("║  ──────────────────────────────────────────────");
1607        println!("║  Est. total RAM     {}", fmt_bytes(self.total_ram_bytes()));
1608        println!("╚════════════════════════════════");
1609    }
1610}
1611
/// Renders a byte count with a unit suffix: whole bytes below 1024, one
/// decimal of `KB` below 1024 * 1024, and two decimals of `MB` otherwise.
fn fmt_bytes(b: usize) -> String {
    const KIB: usize = 1024;
    const MIB: usize = KIB * KIB;

    if b >= MIB {
        return format!("{:.2} MB", b as f64 / MIB as f64);
    }
    if b >= KIB {
        return format!("{:.1} KB", b as f64 / KIB as f64);
    }
    format!("{} B", b)
}
1617
1618// ---------------------------------------------------------------------------
1619// Metadata persistence helpers
1620// ---------------------------------------------------------------------------
1621
1622/// Load persisted metadata from the block store.
1623/// Returns defaults when no metadata exists yet.
1624#[allow(clippy::type_complexity)]
1625fn load_meta(store: &BlockStore) -> Option<MetaTuple> {
1626    let counters_bytes = store.read_meta("counters.bin").ok()?;
1627    // Current format is [u64; 4]; fall back to the legacy [u64; 3] layout
1628    // (pre branch-persistence) and default the branch counter.
1629    let (revision, next_block, next_snapshot, next_branch) =
1630        match decode_from_slice::<[u64; 4], _>(&counters_bytes, standard()) {
1631            Ok((c, _)) => (c[0], c[1], c[2], c[3]),
1632            Err(_) => {
1633                let (c, _): ([u64; 3], _) =
1634                    decode_from_slice(&counters_bytes, standard()).ok()?;
1635                (c[0], c[1], c[2], 2)
1636            }
1637        };
1638
1639    let spaces_bytes = store.read_meta("spaces.bin").ok()?;
1640    let (spaces, _): (SpaceRegistry, _) = decode_from_slice(&spaces_bytes, standard()).ok()?;
1641
1642    // Branches and snapshots may be absent on databases created before they
1643    // were persisted — fall back to empty registries in that case.
1644    let branches = store
1645        .read_meta("branches.bin")
1646        .ok()
1647        .and_then(|b| decode_from_slice::<BranchRegistry, _>(&b, standard()).ok())
1648        .map(|(r, _)| r)
1649        .unwrap_or_else(BranchRegistry::new);
1650
1651    let snapshots = store
1652        .read_meta("snapshots.bin")
1653        .ok()
1654        .and_then(|b| decode_from_slice::<BTreeMap<u64, Snapshot>, _>(&b, standard()).ok())
1655        .map(|(m, _)| m)
1656        .unwrap_or_default();
1657
1658    Some((spaces, branches, snapshots, revision, next_block, next_snapshot, next_branch))
1659}
1660
/// Persisted metadata: spaces, branches, snapshots, and the four ID counters
/// (`revision`, `next_block`, `next_snapshot`, `next_branch`).
///
/// The element order is positional and must match what `load_meta` and
/// `default_meta` build.
type MetaTuple = (
    // Registered spaces.
    SpaceRegistry,
    // Registered branches.
    BranchRegistry,
    // Snapshots, keyed by a `u64`.
    BTreeMap<u64, Snapshot>,
    // `revision`
    u64,
    // `next_block`
    u64,
    // `next_snapshot`
    u64,
    // `next_branch`
    u64,
);
1672
1673fn default_meta() -> MetaTuple {
1674    (
1675        SpaceRegistry::new(),
1676        BranchRegistry::new(),
1677        BTreeMap::new(),
1678        0,
1679        1,
1680        1,
1681        2,
1682    )
1683}
1684
1685fn hyperedge_point(id: HyperedgeId) -> DimensionVector {
1686    DimensionVector::new(vec![(id.0 >> 32) as u32, (id.0 & 0xFFFF_FFFF) as u32])
1687}
1688
/// Reserved space mapping `(edge space, HyperedgeId)` → stored centroid point.
///
/// Its id is `u64::MAX - 2`.
pub(super) const HYPEREDGE_LOCATOR_SPACE: SpaceId = SpaceId(u64::MAX - 2);
/// Locator key dims: `[space_hi, space_lo, id_hi, id_lo]`.
///
/// Four coordinates, matching the vector built by `locator_point`.
const HYPEREDGE_LOCATOR_DIMS: usize = 4;
1693
1694/// Deterministic locator key for an `(edge space, edge id)` pair.
1695pub(super) fn locator_point(space: SpaceId, id: HyperedgeId) -> DimensionVector {
1696    DimensionVector::new(vec![
1697        (space.0 >> 32) as u32,
1698        (space.0 & 0xFFFF_FFFF) as u32,
1699        (id.0 >> 32) as u32,
1700        (id.0 & 0xFFFF_FFFF) as u32,
1701    ])
1702}
1703
1704/// Centroid-based storage point for a hyperedge: `[centroid…, id_hi, id_lo]`.
1705///
1706/// Spatially-related edges (similar endpoint centroids) get numerically close
1707/// Hilbert keys; the trailing id dimensions keep distinct edges from colliding
1708/// at the same record address. Returns `None` when the edge has no shared-space
1709/// centroid (caller falls back to id-based keying).
1710fn centroid_hyperedge_point(edge: &Hyperedge) -> Option<DimensionVector> {
1711    let (_space, centroid) = edge.endpoint_centroid()?;
1712    // Reserve the final two dimensions for the id; cap the centroid accordingly.
1713    let mut coords = centroid;
1714    coords.truncate(14);
1715    coords.push((edge.id.0 >> 32) as u32);
1716    coords.push((edge.id.0 & 0xFFFF_FFFF) as u32);
1717    Some(DimensionVector::new(coords))
1718}
1719
1720// ---------------------------------------------------------------------------
1721// Tests
1722// ---------------------------------------------------------------------------
1723
1724#[cfg(test)]
1725mod tests {
1726    use super::*;
1727    use tempfile::TempDir;
1728    use crate::infinitedb_core::address::{DimensionVector, SpaceId};
1729    use crate::infinitedb_core::adapter::{AdapterEndpoint, KindLabel, SpaceBinding};
1730    use crate::infinitedb_core::branch::BranchId;
1731    use crate::infinitedb_core::hyperedge::{EndpointRef, EndpointRole, Hyperedge, HyperedgeId, HyperedgeKind};
1732    use crate::infinitedb_core::kind_catalog::{KindCatalog, KindDefinition, UnknownKindPolicy};
1733    use crate::infinitedb_core::signal::{SignalId, SignalKind, SignalSample, SignalScope};
1734    use crate::infinitedb_core::space::SpaceConfig;
1735    #[cfg(feature = "sync")]
1736    use std::sync::Arc;
1737    #[cfg(feature = "sync")]
1738    use crate::infinitedb_sync::transport::{SyncEnvelope, SyncResult, SyncTransport};
1739
1740    fn open_tmp() -> (InfiniteDb, TempDir) {
1741        let dir = TempDir::new().unwrap();
1742        let db = InfiniteDb::open(dir.path()).unwrap();
1743        (db, dir)
1744    }
1745
1746    enum BeamKinds {
1747        BearsOn,
1748        BendingMoment,
1749    }
1750
1751    impl KindLabel for BeamKinds {
1752        fn label(&self) -> &str {
1753            match self {
1754                BeamKinds::BearsOn => "beam.bears_on",
1755                BeamKinds::BendingMoment => "beam.bending_moment",
1756            }
1757        }
1758    }
1759
    /// Marker type binding the typed signal helpers to one fixed space.
    struct BeamSignalSpace;
    impl SpaceBinding for BeamSignalSpace {
        // Same id, dimensionality, and name that `signal_scope_and_range_queries`
        // registers by hand.
        const SPACE_ID: SpaceId = SpaceId(88);
        const DIMS: usize = 3;
        const SPACE_NAME: &'static str = "beam_signals";
    }
1766
1767    #[test]
1768    fn insert_and_query_unflushed() {
1769        let (mut db, _dir) = open_tmp();
1770        let space = SpaceId(1);
1771        db.insert(space, DimensionVector::new(vec![10, 20]), vec![1, 2, 3]).unwrap();
1772        let results = db.query(space, None).unwrap();
1773        assert_eq!(results.len(), 1);
1774    }
1775
1776    #[test]
1777    fn insert_flush_query() {
1778        let (mut db, _dir) = open_tmp();
1779        let space = SpaceId(1);
1780        db.insert(space, DimensionVector::new(vec![5, 5]), vec![42]).unwrap();
1781        db.flush(space).unwrap();
1782        let results = db.query(space, None).unwrap();
1783        assert_eq!(results.len(), 1);
1784        assert_eq!(results[0].data, vec![42]);
1785    }
1786
1787    #[test]
1788    fn flush_records_block_key_interval() {
1789        let (mut db, _dir) = open_tmp();
1790        let space = SpaceId(1);
1791        let p_lo = DimensionVector::new(vec![1, 1]);
1792        let p_hi = DimensionVector::new(vec![200, 200]);
1793        db.insert(space, p_lo.clone(), vec![1]).unwrap();
1794        db.insert(space, p_hi.clone(), vec![2]).unwrap();
1795        db.flush(space).unwrap();
1796
1797        let snapshot = db.snapshots.get(&space.0).unwrap();
1798        assert_eq!(snapshot.blocks.len(), 1);
1799        let (min_key, entry) = snapshot.blocks.iter().next().unwrap();
1800        let ka = hilbert_key_standard(&p_lo);
1801        let kb = hilbert_key_standard(&p_hi);
1802        assert_eq!(*min_key, ka.min(kb));
1803        assert_eq!(entry.max_key, ka.max(kb));
1804    }
1805
1806    #[test]
1807    fn range_pruning_skips_non_overlapping_blocks() {
1808        let (mut db, _dir) = open_tmp();
1809        let space = SpaceId(1);
1810        // Three separated points, each sealed into its own single-record block.
1811        let points = [
1812            DimensionVector::new(vec![1, 1]),
1813            DimensionVector::new(vec![120, 30]),
1814            DimensionVector::new(vec![250, 200]),
1815        ];
1816        for (i, p) in points.iter().enumerate() {
1817            db.insert(space, p.clone(), vec![i as u8]).unwrap();
1818            db.flush(space).unwrap();
1819        }
1820        assert_eq!(db.snapshots.get(&space.0).unwrap().blocks.len(), 3);
1821
1822        // Query the exact key interval of the middle block only.
1823        let k_mid = hilbert_key_standard(&points[1]);
1824        let results = db.query_inner(space, Some((k_mid, k_mid)), None, false).unwrap();
1825        assert_eq!(results.len(), 1);
1826        assert_eq!(results[0].address.point, points[1]);
1827    }
1828
1829    #[test]
1830    fn execute_range_matches_query_bbox() {
1831        use crate::infinitedb_core::query::{Query, SpatialRange};
1832        let (mut db, _dir) = open_tmp();
1833        let space = SpaceId(1);
1834        db.register_space(SpaceConfig::new(space, "s", 2)).unwrap();
1835        db.insert(space, DimensionVector::new(vec![5, 5]), vec![1]).unwrap();
1836        db.insert(space, DimensionVector::new(vec![8, 2]), vec![2]).unwrap();
1837        db.insert(space, DimensionVector::new(vec![200, 200]), vec![3]).unwrap();
1838        db.flush(space).unwrap();
1839
1840        let min = DimensionVector::new(vec![0, 0]);
1841        let max = DimensionVector::new(vec![10, 10]);
1842
1843        let mut via_bbox = db
1844            .query_bbox(space, min.clone(), max.clone(), None)
1845            .unwrap();
1846        let snap = db.current_snapshot(space).unwrap();
1847        let q = Query::new(space, snap).with_range(SpatialRange::new(min, max));
1848        let mut via_execute = db.execute(&q).unwrap();
1849
1850        via_bbox.sort_by_key(|r| r.data.clone());
1851        via_execute.sort_by_key(|r| r.data.clone());
1852        assert_eq!(via_bbox.len(), 2);
1853        assert_eq!(
1854            via_bbox.iter().map(|r| r.data.clone()).collect::<Vec<_>>(),
1855            via_execute.iter().map(|r| r.data.clone()).collect::<Vec<_>>()
1856        );
1857    }
1858
1859    #[test]
1860    fn execute_include_tombstones_flag() {
1861        use crate::infinitedb_core::query::Query;
1862        let (mut db, _dir) = open_tmp();
1863        let space = SpaceId(1);
1864        let point = DimensionVector::new(vec![3, 3]);
1865        db.insert(space, point.clone(), vec![1]).unwrap();
1866        db.delete(space, point).unwrap();
1867
1868        // No snapshot exists yet (nothing flushed); validation is skipped.
1869        let snap = SnapshotId(0);
1870        let default = db.execute(&Query::new(space, snap)).unwrap();
1871        assert_eq!(default.len(), 0, "tombstoned record hidden by default");
1872
1873        let with_tombstones = db
1874            .execute(&Query::new(space, snap).include_tombstones())
1875            .unwrap();
1876        assert!(
1877            with_tombstones.iter().any(|r| r.tombstone),
1878            "include_tombstones surfaces the tombstone"
1879        );
1880    }
1881
    /// Reopening after a block and its `BlockSealed` WAL marker were written,
    /// but before metadata was persisted, must return the record exactly once
    /// and link exactly one block for the space.
    #[test]
    fn block_sealed_replay_reconciles_after_partial_crash() {
        let dir = TempDir::new().unwrap();
        let space = SpaceId(1);
        let point = DimensionVector::new(vec![7, 9]);
        // Inner scope: the first handle is dropped at its end, before reopening.
        {
            let mut db = InfiniteDb::open(dir.path()).unwrap();
            db.register_space(SpaceConfig::new(space, "s", 2)).unwrap();
            let rev = db.insert(space, point.clone(), vec![5]).unwrap();

            // Simulate a crash between sealing a block and persisting metadata:
            // write the block durably and append its WAL marker, but never call
            // persist_meta / rewrite the WAL.
            let block_id = db.alloc_block_id();
            let snap_id = db.alloc_snapshot_id();
            // Same address, revision, and payload as the record inserted above.
            let record = Record {
                address: Address::new(space, point.clone()),
                revision: rev,
                data: vec![5],
                tombstone: false,
            };
            let mut block = Block {
                id: block_id,
                space,
                records: vec![record],
                min_revision: rev,
                max_revision: rev,
                // Placeholder; replaced by the computed checksum just below.
                checksum: [0u8; 32],
            };
            block.checksum = compute_checksum(&block).unwrap();
            db.store.write_block(&block).unwrap();
            db.wal
                .append(&WalEntry::BlockSealed { block_id, space, snapshot: snap_id })
                .unwrap();
        }

        // Reopen: snapshots.bin never recorded the block, but BlockSealed replay
        // must relink it and the record must surface exactly once.
        let mut db = InfiniteDb::open(dir.path()).unwrap();
        let results = db.query(space, None).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].data, vec![5]);
        assert_eq!(db.snapshots.get(&space.0).unwrap().blocks.len(), 1);
    }
1926
1927    #[test]
1928    fn delete_tombstones_record() {
1929        let (mut db, _dir) = open_tmp();
1930        let space = SpaceId(1);
1931        let point = DimensionVector::new(vec![1, 1]);
1932        db.insert(space, point.clone(), vec![99]).unwrap();
1933        db.delete(space, point).unwrap();
1934        let results = db.query(space, None).unwrap();
1935        // Tombstone suppresses the record in live queries.
1936        assert!(results.iter().all(|r| !r.tombstone));
1937    }
1938
1939    #[test]
1940    fn as_of_returns_historical_state() {
1941        let (mut db, _dir) = open_tmp();
1942        let space = SpaceId(1);
1943        let rev1 = db.insert(space, DimensionVector::new(vec![1, 0]), vec![1]).unwrap();
1944        let _rev2 = db.insert(space, DimensionVector::new(vec![2, 0]), vec![2]).unwrap();
1945        // Query at rev1 should see only the first record.
1946        let results = db.query(space, Some(rev1)).unwrap();
1947        assert_eq!(results.len(), 1);
1948        assert_eq!(results[0].data, vec![1]);
1949    }
1950
1951    #[test]
1952    fn register_space_rejects_precision_overflow() {
1953        let (mut db, _dir) = open_tmp();
1954        let err = db
1955            .register_space(
1956                SpaceConfig::new(SpaceId(99), "big", 16).with_bits_per_dim(9),
1957            )
1958            .unwrap_err();
1959        assert!(err.contains("dims * bits_per_dim"));
1960    }
1961
1962    #[test]
1963    fn different_space_precision_produces_different_keys() {
1964        use crate::infinitedb_index::hilbert_key_for;
1965        use crate::infinitedb_index::composite::KeyConfig;
1966        let (mut db, _dir) = open_tmp();
1967        let coarse = SpaceId(10);
1968        let fine = SpaceId(11);
1969        db.register_space(SpaceConfig::new(coarse, "coarse", 2).with_bits_per_dim(4))
1970            .unwrap();
1971        db.register_space(SpaceConfig::new(fine, "fine", 2).with_bits_per_dim(8))
1972            .unwrap();
1973        let point = DimensionVector::new(vec![100, 200]);
1974        let k_coarse = hilbert_key_for(&point, KeyConfig { bits_per_dim: 4 });
1975        let k_fine = hilbert_key_for(&point, KeyConfig { bits_per_dim: 8 });
1976        assert_ne!(k_coarse, k_fine);
1977        db.insert(coarse, point.clone(), vec![1]).unwrap();
1978        db.insert(fine, point, vec![2]).unwrap();
1979        db.flush(coarse).unwrap();
1980        db.flush(fine).unwrap();
1981    }
1982
1983    #[test]
1984    fn endpoint_index_returns_incident_edges_only() {
1985        let (mut db, _dir) = open_tmp();
1986        let edge_space = SpaceId(50);
1987        db.register_space(SpaceConfig::new(edge_space, "edges", 2)).unwrap();
1988        let shared = EndpointRef {
1989            role: EndpointRole::new("hub"),
1990            space: SpaceId(1),
1991            node: DimensionVector::new(vec![5]),
1992        };
1993        let other = EndpointRef {
1994            role: EndpointRole::new("leaf"),
1995            space: SpaceId(2),
1996            node: DimensionVector::new(vec![99]),
1997        };
1998        for (id, ep_b) in [
1999            (1u64, DimensionVector::new(vec![10, 0])),
2000            (2, DimensionVector::new(vec![20, 0])),
2001            (3, DimensionVector::new(vec![30, 0])),
2002        ] {
2003            let edge = Hyperedge {
2004                id: HyperedgeId(id),
2005                kind: HyperedgeKind::new("link"),
2006                endpoints: vec![
2007                    shared.clone(),
2008                    EndpointRef {
2009                        role: EndpointRole::new("peer"),
2010                        space: SpaceId(3),
2011                        node: ep_b,
2012                    },
2013                ],
2014                weight_milli: None,
2015                metadata: Default::default(),
2016                valid_from: RevisionId::ZERO,
2017                valid_to: None,
2018            };
2019            db.insert_hyperedge(edge_space, edge).unwrap();
2020        }
2021        // Two edges that do not touch `shared`.
2022        for id in [10u64, 11] {
2023            let edge = Hyperedge {
2024                id: HyperedgeId(id),
2025                kind: HyperedgeKind::new("other"),
2026                endpoints: vec![other.clone(), other.clone()],
2027                weight_milli: None,
2028                metadata: Default::default(),
2029                valid_from: RevisionId::ZERO,
2030                valid_to: None,
2031            };
2032            db.insert_hyperedge(edge_space, edge).unwrap();
2033        }
2034        db.flush(edge_space).unwrap();
2035        db.flush(ENDPOINT_INDEX_SPACE).unwrap();
2036        let found = db
2037            .query_hyperedges_for_endpoint(edge_space, &shared, None)
2038            .unwrap();
2039        assert_eq!(found.len(), 3, "only edges incident on the shared endpoint");
2040    }
2041
2042    #[test]
2043    fn traverse_respects_max_depth() {
2044        use crate::infinitedb_core::traversal::TraversalSpec;
2045        let (mut db, _dir) = open_tmp();
2046        let edge_space = SpaceId(60);
2047        db.register_space(SpaceConfig::new(edge_space, "edges", 2)).unwrap();
2048        let n0 = EndpointRef {
2049            role: EndpointRole::new("n"),
2050            space: SpaceId(10),
2051            node: DimensionVector::new(vec![1]),
2052        };
2053        let n1 = EndpointRef {
2054            role: EndpointRole::new("n"),
2055            space: SpaceId(11),
2056            node: DimensionVector::new(vec![2]),
2057        };
2058        let n2 = EndpointRef {
2059            role: EndpointRole::new("n"),
2060            space: SpaceId(12),
2061            node: DimensionVector::new(vec![3]),
2062        };
2063        let n3 = EndpointRef {
2064            role: EndpointRole::new("n"),
2065            space: SpaceId(13),
2066            node: DimensionVector::new(vec![4]),
2067        };
2068        db.insert_hyperedge(
2069            edge_space,
2070            Hyperedge {
2071                id: HyperedgeId(1),
2072                kind: HyperedgeKind::new("chain"),
2073                endpoints: vec![n0.clone(), n1.clone()],
2074                weight_milli: None,
2075                metadata: Default::default(),
2076                valid_from: RevisionId::ZERO,
2077                valid_to: None,
2078            },
2079        )
2080        .unwrap();
2081        db.insert_hyperedge(
2082            edge_space,
2083            Hyperedge {
2084                id: HyperedgeId(2),
2085                kind: HyperedgeKind::new("chain"),
2086                endpoints: vec![n1.clone(), n2.clone()],
2087                weight_milli: None,
2088                metadata: Default::default(),
2089                valid_from: RevisionId::ZERO,
2090                valid_to: None,
2091            },
2092        )
2093        .unwrap();
2094        db.insert_hyperedge(
2095            edge_space,
2096            Hyperedge {
2097                id: HyperedgeId(3),
2098                kind: HyperedgeKind::new("chain"),
2099                endpoints: vec![n2.clone(), n3.clone()],
2100                weight_milli: None,
2101                metadata: Default::default(),
2102                valid_from: RevisionId::ZERO,
2103                valid_to: None,
2104            },
2105        )
2106        .unwrap();
2107        db.flush(edge_space).unwrap();
2108        db.flush(ENDPOINT_INDEX_SPACE).unwrap();
2109
2110        assert!(
2111            db.query_hyperedges_for_endpoint(edge_space, &n0, None)
2112                .unwrap()
2113                .len()
2114                >= 1,
2115            "index must list edges incident on n0"
2116        );
2117        assert!(
2118            db.query_hyperedges_for_endpoint(edge_space, &n1, None)
2119                .unwrap()
2120                .len()
2121                >= 2,
2122            "index must list all edges incident on n1"
2123        );
2124
2125        let depth2 = db
2126            .traverse(
2127                edge_space,
2128                &TraversalSpec {
2129                    start: n0.clone(),
2130                    max_depth: 2,
2131                    follow_kinds: None,
2132                    as_of: None,
2133                },
2134            )
2135            .unwrap();
2136        assert!(
2137            depth2.edges.iter().any(|e| e.id == HyperedgeId(2)),
2138            "expected edge n1–n2 in subgraph, edges={:?}",
2139            depth2.edges.iter().map(|e| e.id.0).collect::<Vec<_>>()
2140        );
2141        assert!(
2142            depth2.nodes.iter().any(|n| n.space == SpaceId(12)),
2143            "expected n2 at depth 2, nodes={:?}",
2144            depth2
2145                .nodes
2146                .iter()
2147                .map(|n| (n.space.0, n.node.coords.clone()))
2148                .collect::<Vec<_>>()
2149        );
2150        assert!(!depth2.nodes.iter().any(|n| n.space == SpaceId(13)));
2151
2152        let depth1 = db
2153            .traverse(
2154                edge_space,
2155                &TraversalSpec {
2156                    start: n0,
2157                    max_depth: 1,
2158                    follow_kinds: None,
2159                    as_of: None,
2160                },
2161            )
2162            .unwrap();
2163        assert!(depth1.nodes.iter().any(|n| n.space == SpaceId(11)));
2164        assert!(!depth1.nodes.iter().any(|n| n.space == SpaceId(12)));
2165    }
2166
2167    #[test]
2168    fn compact_space_reduces_block_count() {
2169        use crate::infinitedb_storage::compaction::CompactionConfig;
2170        let (mut db, _dir) = open_tmp();
2171        let space = SpaceId(70);
2172        db.register_space(SpaceConfig::new(space, "data", 1)).unwrap();
2173        for i in 0..4u32 {
2174            db.insert(space, DimensionVector::new(vec![i]), vec![i as u8]).unwrap();
2175            db.flush(space).unwrap();
2176        }
2177        assert_eq!(db.snapshots.get(&space.0).unwrap().blocks.len(), 4);
2178        let result = db
2179            .compact_space(
2180                space,
2181                &CompactionConfig {
2182                    max_records_per_block: 16,
2183                    retain_history: true,
2184                },
2185            )
2186            .unwrap();
2187        assert_eq!(result.superseded.len(), 4);
2188        assert_eq!(result.new_blocks.len(), 1);
2189        assert_eq!(db.snapshots.get(&space.0).unwrap().blocks.len(), 1);
2190        let records = db.query(space, None).unwrap();
2191        assert_eq!(records.len(), 4);
2192    }
2193
2194    #[test]
2195    fn create_branch_succeeds() {
2196        let (mut db, _dir) = open_tmp();
2197        let main = BranchId(1);
2198        let feature = db.create_branch("feature", main).unwrap();
2199        assert_ne!(feature, main);
2200    }
2201
2202    fn clustered_edge(id: u64, a: Vec<u32>, b: Vec<u32>) -> Hyperedge {
2203        Hyperedge {
2204            id: HyperedgeId(id),
2205            kind: HyperedgeKind::new("near"),
2206            endpoints: vec![
2207                EndpointRef {
2208                    role: EndpointRole::new("a"),
2209                    space: SpaceId(1),
2210                    node: DimensionVector::new(a),
2211                },
2212                EndpointRef {
2213                    role: EndpointRole::new("b"),
2214                    space: SpaceId(1),
2215                    node: DimensionVector::new(b),
2216                },
2217            ],
2218            weight_milli: None,
2219            metadata: Default::default(),
2220            valid_from: RevisionId::ZERO,
2221            valid_to: None,
2222        }
2223    }
2224
2225    #[test]
2226    fn centroid_keying_clusters_nearby_edges() {
2227        // Edges over nearby endpoints should get closer Hilbert keys than edges
2228        // over distant endpoints.
2229        let a = clustered_edge(1, vec![10, 10], vec![12, 12]); // centroid ~ (11,11)
2230        let b = clustered_edge(2, vec![13, 13], vec![15, 15]); // centroid ~ (14,14)
2231        let c = clustered_edge(3, vec![240, 240], vec![250, 250]); // centroid ~ (245,245)
2232
2233        let pa = centroid_hyperedge_point(&a).unwrap();
2234        let pb = centroid_hyperedge_point(&b).unwrap();
2235        let pc = centroid_hyperedge_point(&c).unwrap();
2236
2237        let ka = hilbert_key_standard(&pa);
2238        let kb = hilbert_key_standard(&pb);
2239        let kc = hilbert_key_standard(&pc);
2240
2241        let near = ka.abs_diff(kb);
2242        let far = ka.abs_diff(kc);
2243        assert!(
2244            near < far,
2245            "nearby edges should cluster: near={near} far={far}"
2246        );
2247    }
2248
2249    #[test]
2250    fn centroid_keyed_edges_are_addressable_by_id() {
2251        let (mut db, _dir) = open_tmp();
2252        let edge_space = SpaceId(80);
2253        db.register_space(
2254            SpaceConfig::new(edge_space, "centroid_edges", 4).with_centroid_keying(),
2255        )
2256        .unwrap();
2257
2258        let edge = clustered_edge(7, vec![20, 30], vec![22, 34]);
2259        db.insert_hyperedge(edge_space, edge.clone()).unwrap();
2260        db.flush(edge_space).unwrap();
2261
2262        // Fetch by id resolves through the locator (point is not derivable from id).
2263        let fetched = db.fetch_hyperedge_by_id(edge_space, HyperedgeId(7), None).unwrap();
2264        assert!(fetched.is_some());
2265        assert_eq!(fetched.unwrap().id, HyperedgeId(7));
2266
2267        // The endpoint index still resolves incident edges.
2268        let incident = db
2269            .query_hyperedges_for_endpoint(edge_space, &edge.endpoints[0], None)
2270            .unwrap();
2271        assert!(incident.iter().any(|e| e.id == HyperedgeId(7)));
2272
2273        // Delete removes it from id lookup.
2274        db.delete_hyperedge(edge_space, HyperedgeId(7)).unwrap();
2275        db.flush(edge_space).unwrap();
2276        let after = db.fetch_hyperedge_by_id(edge_space, HyperedgeId(7), None).unwrap();
2277        assert!(after.is_none(), "deleted edge must not be addressable");
2278    }
2279
2280    #[test]
2281    fn hyperedge_insert_query_and_delete() {
2282        let (mut db, _dir) = open_tmp();
2283        let edge_space = SpaceId(77);
2284        db.register_space(SpaceConfig::new(edge_space, "hyperedges", 2))
2285        .unwrap();
2286        let edge = Hyperedge {
2287            id: HyperedgeId(42),
2288            kind: HyperedgeKind::new("beam.bears_on"),
2289            endpoints: vec![
2290                EndpointRef {
2291                    role: EndpointRole::new("parent"),
2292                    space: SpaceId(10),
2293                    node: DimensionVector::new(vec![100]),
2294                },
2295                EndpointRef {
2296                    role: EndpointRole::new("support"),
2297                    space: SpaceId(11),
2298                    node: DimensionVector::new(vec![200]),
2299                },
2300            ],
2301            weight_milli: Some(1_000),
2302            metadata: std::collections::BTreeMap::new(),
2303            valid_from: RevisionId::ZERO,
2304            valid_to: None,
2305        };
2306        db.insert_hyperedge(edge_space, edge.clone()).unwrap();
2307        db.flush(edge_space).unwrap();
2308        let by_kind = db
2309            .query_hyperedges_by_kind(edge_space, "beam.bears_on", None)
2310            .unwrap();
2311        assert_eq!(by_kind.len(), 1);
2312        assert_eq!(by_kind[0].id.0, edge.id.0);
2313        db.delete_hyperedge(edge_space, edge.id).unwrap();
2314        let after_delete = db.query_hyperedges(edge_space, None).unwrap();
2315        assert!(after_delete.is_empty());
2316    }
2317
2318    #[test]
2319    fn signal_scope_and_range_queries() {
2320        let (mut db, _dir) = open_tmp();
2321        let signal_space = SpaceId(88);
2322        db.register_space(SpaceConfig::new(signal_space, "beam_signals", 3))
2323        .unwrap();
2324
2325        let scope = SignalScope {
2326            parent_prefix: DimensionVector::new(vec![7]),
2327            total_dims: 3,
2328        };
2329        db.insert_signal_sample(
2330            signal_space,
2331            SignalSample {
2332                signal_id: SignalId(1),
2333                kind: SignalKind::new("beam.bending_moment"),
2334                scope: scope.clone(),
2335                local_coords: vec![0, 0],
2336                value_milli: 10_000,
2337                source_revision: None,
2338                constraint: None,
2339            },
2340        )
2341        .unwrap();
2342        db.insert_signal_sample(
2343            signal_space,
2344            SignalSample {
2345                signal_id: SignalId(1),
2346                kind: SignalKind::new("beam.bending_moment"),
2347                scope,
2348                local_coords: vec![5, 0],
2349                value_milli: 20_000,
2350                source_revision: None,
2351                constraint: None,
2352            },
2353        )
2354        .unwrap();
2355        db.flush(signal_space).unwrap();
2356
2357        let scoped = db.query_signal_scope(signal_space, &[7], None).unwrap();
2358        assert_eq!(scoped.len(), 2);
2359        let ranged = db
2360            .query_signal_range(signal_space, &[7], &[0, 0], &[2, u32::MAX], None)
2361            .unwrap();
2362        assert_eq!(ranged.len(), 1);
2363    }
2364
2365    #[test]
2366    fn adapter_wrappers_accept_typed_kinds_and_space_binding() {
2367        let (mut db, _dir) = open_tmp();
2368        let edge_space = SpaceId(177);
2369        db.register_space(SpaceConfig::new(edge_space, "adapter_edges", 2))
2370        .unwrap();
2371        db.register_space(SpaceConfig::new(
2372            BeamSignalSpace::SPACE_ID,
2373            BeamSignalSpace::SPACE_NAME,
2374            BeamSignalSpace::DIMS,
2375        ))
2376        .unwrap();
2377
2378        let mut catalog = KindCatalog::new(UnknownKindPolicy::RejectUnknown);
2379        catalog.register_edge_kind(KindDefinition::new("beam.bears_on"));
2380        catalog.register_endpoint_role(KindDefinition::new("parent"));
2381        catalog.register_endpoint_role(KindDefinition::new("support"));
2382        catalog.register_signal_kind(KindDefinition::new("beam.bending_moment"));
2383
2384        db.insert_hyperedge_typed(
2385            edge_space,
2386            HyperedgeId(900),
2387            BeamKinds::BearsOn,
2388            vec![
2389                AdapterEndpoint::new("parent", SpaceId(1), DimensionVector::new(vec![10])),
2390                AdapterEndpoint::new("support", SpaceId(2), DimensionVector::new(vec![20])),
2391            ],
2392            Some(1000),
2393            std::collections::BTreeMap::new(),
2394            None,
2395            Some(&catalog),
2396        )
2397        .unwrap();
2398        db.flush(edge_space).unwrap();
2399        let edges = db
2400            .query_hyperedges_by_kind_typed(edge_space, BeamKinds::BearsOn, None)
2401            .unwrap();
2402        assert_eq!(edges.len(), 1);
2403
2404        db.insert_signal_sample_typed::<BeamSignalSpace, _>(
2405            SignalId(1),
2406            BeamKinds::BendingMoment,
2407            DimensionVector::new(vec![7]),
2408            vec![0, 0],
2409            1234,
2410            None,
2411            None,
2412            Some(&catalog),
2413        )
2414        .unwrap();
2415    }
2416
2417    #[test]
2418    fn adapter_rejects_unknown_kind_under_reject_policy() {
2419        let (mut db, _dir) = open_tmp();
2420        let edge_space = SpaceId(178);
2421        db.register_space(SpaceConfig::new(edge_space, "adapter_edges2", 2)).unwrap();
2422
2423        let catalog = KindCatalog::new(UnknownKindPolicy::RejectUnknown);
2424        let err = db
2425            .insert_hyperedge_typed(
2426                edge_space,
2427                HyperedgeId(901),
2428                "unknown.edge.kind",
2429                vec![
2430                    AdapterEndpoint::new("parent", SpaceId(1), DimensionVector::new(vec![1])),
2431                    AdapterEndpoint::new("support", SpaceId(2), DimensionVector::new(vec![2])),
2432                ],
2433                None,
2434                std::collections::BTreeMap::new(),
2435                None,
2436                Some(&catalog),
2437            )
2438            .unwrap_err();
2439        assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
2440    }
2441
2442    #[test]
2443    fn adapter_rejects_space_binding_dim_mismatch() {
2444        struct WrongDimSpace;
2445        impl SpaceBinding for WrongDimSpace {
2446            const SPACE_ID: SpaceId = SpaceId(188);
2447            const DIMS: usize = 4;
2448        }
2449        let (mut db, _dir) = open_tmp();
2450        db.register_space(SpaceConfig::new(SpaceId(188), "wrong_dim", 3)).unwrap();
2451        let err = db
2452            .insert_signal_sample_typed::<WrongDimSpace, _>(
2453                SignalId(2),
2454                "beam.any",
2455                DimensionVector::new(vec![7]),
2456                vec![0, 0],
2457                999,
2458                None,
2459                None,
2460                None,
2461            )
2462            .unwrap_err();
2463        assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
2464    }
2465
2466    #[cfg(feature = "sync")]
2467    struct AckTransport;
2468    #[cfg(feature = "sync")]
2469    impl SyncTransport for AckTransport {
2470        fn push_batch(&self, batch: &[SyncEnvelope]) -> Result<Vec<SyncResult>, String> {
2471            Ok(batch
2472                .iter()
2473                .map(|item| SyncResult::Ack { op_id: item.op_id })
2474                .collect())
2475        }
2476    }
2477
2478    #[cfg(feature = "sync")]
2479    struct FlakyTransport;
2480    #[cfg(feature = "sync")]
2481    impl SyncTransport for FlakyTransport {
2482        fn push_batch(&self, batch: &[SyncEnvelope]) -> Result<Vec<SyncResult>, String> {
2483            Ok(batch
2484                .iter()
2485                .map(|item| SyncResult::Retry {
2486                    op_id: item.op_id,
2487                    error: "offline".to_string(),
2488                })
2489                .collect())
2490        }
2491    }
2492
2493    #[cfg(feature = "sync")]
2494    struct StaleConflictTransport;
2495    #[cfg(feature = "sync")]
2496    impl SyncTransport for StaleConflictTransport {
2497        fn push_batch(&self, batch: &[SyncEnvelope]) -> Result<Vec<SyncResult>, String> {
2498            Ok(batch
2499                .iter()
2500                .map(|item| SyncResult::ConflictStale {
2501                    op_id: item.op_id,
2502                    reason: "stale write".to_string(),
2503                })
2504                .collect())
2505        }
2506    }
2507
2508    #[cfg(feature = "sync")]
2509    #[test]
2510    fn outbox_survives_restart() {
2511        let dir = TempDir::new().unwrap();
2512        let space = SpaceId(1);
2513        {
2514            let mut db = InfiniteDb::open(dir.path()).unwrap();
2515            db.insert(space, DimensionVector::new(vec![1, 2]), vec![3]).unwrap();
2516            assert_eq!(db.sync_pending_count(), 1);
2517        }
2518        let db = InfiniteDb::open(dir.path()).unwrap();
2519        assert_eq!(db.sync_pending_count(), 1);
2520    }
2521
2522    #[cfg(feature = "sync")]
2523    #[test]
2524    fn offline_queue_then_manual_sync_drains() {
2525        let (mut db, _dir) = open_tmp();
2526        let space = SpaceId(1);
2527        db.insert(space, DimensionVector::new(vec![10, 10]), vec![7]).unwrap();
2528        let retry_report = db.sync_now(&FlakyTransport, 32).unwrap();
2529        assert_eq!(retry_report.retried, 1);
2530        assert_eq!(db.sync_pending_count(), 1);
2531        std::thread::sleep(Duration::from_millis(2100));
2532        let ack_report = db.sync_now(&AckTransport, 32).unwrap();
2533        assert_eq!(ack_report.acked, 1);
2534        assert_eq!(db.sync_pending_count(), 0);
2535    }
2536
2537    #[cfg(feature = "sync")]
2538    #[test]
2539    fn stale_conflict_is_dropped_under_lww() {
2540        let (mut db, _dir) = open_tmp();
2541        let space = SpaceId(1);
2542        db.insert(space, DimensionVector::new(vec![11, 11]), vec![8]).unwrap();
2543        let report = db.sync_now(&StaleConflictTransport, 32).unwrap();
2544        assert_eq!(report.dropped_stale, 1);
2545        assert_eq!(db.sync_pending_count(), 0);
2546    }
2547
2548    #[cfg(feature = "sync")]
2549    #[test]
2550    fn background_worker_retries_and_acks() {
2551        let (mut db, _dir) = open_tmp();
2552        let space = SpaceId(1);
2553        db.insert(space, DimensionVector::new(vec![20, 20]), vec![1]).unwrap();
2554        db.start_background_sync(
2555            Arc::new(AckTransport),
2556            Duration::from_millis(20),
2557            16,
2558        )
2559        .unwrap();
2560        std::thread::sleep(Duration::from_millis(120));
2561        db.stop_background_sync();
2562        assert_eq!(db.sync_pending_count(), 0);
2563    }
2564}