1use std::{
27 collections::BTreeMap,
28 io,
29 path::{Path, PathBuf},
30 sync::atomic::{AtomicU64, Ordering},
31};
32#[cfg(feature = "sync")]
33use std::{
34 sync::{Arc, Mutex},
35 time::Duration,
36};
37
38use bincode::{config::standard, decode_from_slice, encode_to_vec};
39
40use crate::infinitedb_core::{
41 adapter::{AdapterEndpoint, KindLabel, SpaceBinding},
42 address::{Address, DimensionVector, RevisionId, SpaceId},
43 block::{Block, BlockId, Record},
44 branch::{Branch, BranchId, BranchRegistry},
45 endpoint_index::{
46 decode_hyperedge_id, endpoint_index_point, endpoint_lookup_prefix,
47 edge_endpoints, ENDPOINT_INDEX_BITS_PER_DIM, ENDPOINT_INDEX_DIMS, ENDPOINT_INDEX_SPACE,
48 },
49 hyperedge::{EndpointRef, Hyperedge, HyperedgeId},
50 traversal::{Subgraph, TraversalSpec},
51 kind_catalog::KindCatalog,
52 query::Query,
53 signal::SignalSample,
54 snapshot::{BlockIndexEntry, Snapshot, SnapshotId},
55 space::{SpaceConfig, SpaceRegistry},
56};
57use crate::infinitedb_index::composite::KeyConfig;
58use crate::infinitedb_index::key::{hilbert_key_for, hilbert_key_standard};
59use crate::infinitedb_storage::{
60 compaction::{compact, CompactionConfig, CompactionResult},
61 gc::safe_to_delete,
62 nvme::{compute_checksum, BlockStore},
63 wal::{WalDurability, WalEntry, WalWriter},
64};
65
66#[path = "bulk/mod.rs"]
67mod bulk;
68pub use bulk::{
69 BulkHyperedgeImport, BulkHyperedgeImportOptions, BulkImportResult, BulkRecordImport,
70 BulkSignalImport, BulkWriteOptions, BulkWriteResult,
71};
72#[cfg(feature = "sync")]
73use crate::infinitedb_sync::{delta::Delta, merkle};
74#[cfg(feature = "sync")]
75use crate::infinitedb_sync::{
76 outbox::{load_outbox, save_outbox, OutboxState, SyncReport},
77 transport::{SyncOperation, SyncTransport},
78 worker::BackgroundSyncWorker,
79};
80
81#[derive(Debug, Clone)]
87pub struct OpenOptions {
88 pub wal_durability: WalDurability,
90 pub flush_threshold: usize,
92 pub block_cache_bytes: usize,
94}
95
96impl Default for OpenOptions {
97 fn default() -> Self {
98 Self {
99 wal_durability: WalDurability::Strict,
100 flush_threshold: 256,
101 block_cache_bytes: 10 * 1024 * 1024,
102 }
103 }
104}
105
106impl OpenOptions {
107 pub fn open<P: AsRef<Path>>(&self, dir: P) -> io::Result<InfiniteDb> {
109 InfiniteDb::open_with_options(dir, self)
110 }
111}
112
113pub struct InfiniteDb {
120 store: BlockStore,
121 wal: WalWriter,
122 spaces: SpaceRegistry,
123 branches: BranchRegistry,
124 buffer: Vec<Record>,
126 revision: AtomicU64,
128 next_block_id: AtomicU64,
130 next_snapshot_id: AtomicU64,
132 next_branch_id: AtomicU64,
134 snapshots: BTreeMap<u64, Snapshot>,
136 flush_threshold: usize,
138 defer_auto_flush: bool,
140 bulk_session_active: bool,
142 #[cfg(feature = "sync")]
143 outbox_path: PathBuf,
144 #[cfg(feature = "sync")]
145 outbox_state: Arc<Mutex<OutboxState>>,
146 #[cfg(feature = "sync")]
147 sync_worker: Option<BackgroundSyncWorker>,
148}
149
150impl InfiniteDb {
151 pub fn open<P: AsRef<Path>>(dir: P) -> io::Result<Self> {
153 OpenOptions::default().open(dir)
154 }
155
156 pub fn open_with_options<P: AsRef<Path>>(dir: P, options: &OpenOptions) -> io::Result<Self> {
158 let root = dir.as_ref().to_path_buf();
159 let store = BlockStore::open_with_cache(root.clone(), options.block_cache_bytes)?;
160 let wal_path = store.wal_path();
161 #[cfg(feature = "sync")]
162 let outbox_path = root.join("meta").join("sync_outbox.bin");
163
164 let recovered = recover_wal(&wal_path)?;
166
167 let wal = WalWriter::open_with_durability(wal_path, options.wal_durability)?;
168
169 let (spaces, branches, snapshots, next_rev, next_block, next_snap, next_branch) =
171 load_meta(&store).unwrap_or_else(default_meta);
172
173 let mut db = Self {
174 store,
175 wal,
176 spaces,
177 branches,
178 buffer: Vec::new(),
179 revision: AtomicU64::new(next_rev),
180 next_block_id: AtomicU64::new(next_block),
181 next_snapshot_id: AtomicU64::new(next_snap),
182 next_branch_id: AtomicU64::new(next_branch), snapshots,
184 flush_threshold: options.flush_threshold,
185 defer_auto_flush: false,
186 bulk_session_active: false,
187 #[cfg(feature = "sync")]
188 outbox_state: Arc::new(Mutex::new(load_outbox(&outbox_path)?)),
189 #[cfg(feature = "sync")]
190 outbox_path,
191 #[cfg(feature = "sync")]
192 sync_worker: None,
193 };
194
195 let mut max_rev = db.revision.load(Ordering::Relaxed);
198 for entry in recovered {
199 if let WalEntry::Write { revision, .. } | WalEntry::Tombstone { revision, .. } = &entry {
200 max_rev = max_rev.max(revision.0);
201 }
202 db.apply_wal_entry(entry)?;
203 }
204 db.revision.store(max_rev, Ordering::Relaxed);
205
206 if db.branches.get_by_name("main").is_none() {
208 let snap_id = db.alloc_snapshot_id();
209 let _ = db.branches.insert(Branch {
211 id: BranchId(1),
212 name: "main".to_string(),
213 head: snap_id,
214 parent: None,
215 forked_at: RevisionId::ZERO,
216 });
217 }
218
219 Ok(db)
220 }
221
222 pub fn register_space(&mut self, config: SpaceConfig) -> Result<(), String> {
228 if config.bits_per_dim == 0 {
229 return Err("bits_per_dim must be at least 1".to_string());
230 }
231 if config.dims as u32 * config.bits_per_dim > 128 {
232 return Err(format!(
233 "dims * bits_per_dim must be <= 128 (got {} * {})",
234 config.dims, config.bits_per_dim
235 ));
236 }
237 self.spaces.register(config).map_err(|e| format!("{:?}", e))?;
238 self.persist_meta().map_err(|e| e.to_string())?;
239 Ok(())
240 }
241
242 fn space_key(&self, space: SpaceId, point: &DimensionVector) -> u128 {
245 match self.spaces.get(space) {
246 Some(config) => hilbert_key_for(point, KeyConfig { bits_per_dim: config.bits_per_dim }),
247 None => hilbert_key_standard(point),
248 }
249 }
250
251 fn ensure_endpoint_index_space(&mut self) -> io::Result<()> {
253 if self.spaces.get(ENDPOINT_INDEX_SPACE).is_none() {
254 self.register_space(
255 SpaceConfig::new(
256 ENDPOINT_INDEX_SPACE,
257 "__endpoint_index__",
258 ENDPOINT_INDEX_DIMS,
259 )
260 .with_bits_per_dim(ENDPOINT_INDEX_BITS_PER_DIM),
261 )
262 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
263 }
264 Ok(())
265 }
266
267 fn uses_centroid_keying(&self, space: SpaceId) -> bool {
269 self.spaces.get(space).map(|c| c.centroid_keying).unwrap_or(false)
270 }
271
272 fn edge_storage_point(&self, space: SpaceId, edge: &Hyperedge) -> (DimensionVector, bool) {
278 if self.uses_centroid_keying(space) {
279 if let Some(point) = centroid_hyperedge_point(edge) {
280 return (point, true);
281 }
282 }
283 (hyperedge_point(edge.id), false)
284 }
285
286 fn ensure_locator_space(&mut self) -> io::Result<()> {
288 if self.spaces.get(HYPEREDGE_LOCATOR_SPACE).is_none() {
289 self.register_space(SpaceConfig::new(
290 HYPEREDGE_LOCATOR_SPACE,
291 "__hyperedge_locator__",
292 HYPEREDGE_LOCATOR_DIMS,
293 ))
294 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
295 }
296 Ok(())
297 }
298
299 fn lookup_edge_locator(
301 &mut self,
302 space: SpaceId,
303 id: HyperedgeId,
304 as_of: Option<RevisionId>,
305 ) -> io::Result<Option<DimensionVector>> {
306 self.ensure_locator_space()?;
307 let p = locator_point(space, id);
308 let records = self.query_bbox(HYPEREDGE_LOCATOR_SPACE, p.clone(), p, as_of)?;
309 for r in records {
310 if let Ok((point, _)) = decode_from_slice::<DimensionVector, _>(&r.data, standard()) {
311 return Ok(Some(point));
312 }
313 }
314 Ok(None)
315 }
316
317 fn tombstone_edge_locator(&mut self, space: SpaceId, id: HyperedgeId) -> io::Result<()> {
319 self.ensure_locator_space()?;
320 self.delete(HYPEREDGE_LOCATOR_SPACE, locator_point(space, id))?;
321 Ok(())
322 }
323
324 fn tombstone_hyperedge_index(&mut self, edge: &Hyperedge) -> io::Result<()> {
326 self.ensure_endpoint_index_space()?;
327 for ep in edge_endpoints(edge) {
328 let point = endpoint_index_point(ep, edge.id);
329 self.delete(ENDPOINT_INDEX_SPACE, point)?;
330 }
331 Ok(())
332 }
333
334 fn query_endpoint_index_ids(
336 &mut self,
337 prefix: &[u32],
338 as_of: Option<RevisionId>,
339 ) -> io::Result<Vec<HyperedgeId>> {
340 let records = self.query(ENDPOINT_INDEX_SPACE, as_of)?;
343 Ok(records
344 .iter()
345 .filter(|r| {
346 prefix
347 .iter()
348 .enumerate()
349 .all(|(i, &v)| r.address.point.coords.get(i) == Some(&v))
350 })
351 .filter_map(|r| decode_hyperedge_id(&r.data))
352 .collect())
353 }
354
355 fn fetch_hyperedge_by_id(
357 &mut self,
358 space: SpaceId,
359 id: HyperedgeId,
360 as_of: Option<RevisionId>,
361 ) -> io::Result<Option<Hyperedge>> {
362 let p = if self.uses_centroid_keying(space) {
363 match self.lookup_edge_locator(space, id, as_of)? {
364 Some(point) => point,
365 None => return Ok(None),
366 }
367 } else {
368 hyperedge_point(id)
369 };
370 let records = self.query_bbox(space, p.clone(), p, as_of)?;
371 for r in records {
372 if let Ok((edge, _)) = decode_from_slice::<Hyperedge, _>(&r.data, standard()) {
373 if edge.id == id {
374 return Ok(Some(edge));
375 }
376 }
377 }
378 Ok(None)
379 }
380
381 pub fn insert(
388 &mut self,
389 space: SpaceId,
390 point: DimensionVector,
391 data: Vec<u8>,
392 ) -> io::Result<RevisionId> {
393 let rev = self.next_revision();
394 let address = Address::new(space, point);
395 let entry = WalEntry::Write {
396 address: address.clone(),
397 revision: rev,
398 data: data.clone(),
399 };
400 self.wal.append(&entry)?;
401 #[cfg(feature = "sync")]
402 self.enqueue_sync(SyncOperation::Write {
403 address: address.clone(),
404 revision: rev,
405 data: data.clone(),
406 })?;
407 self.buffer.push(Record {
408 address,
409 revision: rev,
410 data,
411 tombstone: false,
412 });
413 if !self.defer_auto_flush && self.buffer.len() >= self.flush_threshold {
414 self.flush(space)?;
415 }
416 Ok(rev)
417 }
418
419 pub fn delete(&mut self, space: SpaceId, point: DimensionVector) -> io::Result<RevisionId> {
421 let rev = self.next_revision();
422 let address = Address::new(space, point);
423 let entry = WalEntry::Tombstone {
424 address: address.clone(),
425 revision: rev,
426 };
427 self.wal.append(&entry)?;
428 #[cfg(feature = "sync")]
429 self.enqueue_sync(SyncOperation::Tombstone {
430 address: address.clone(),
431 revision: rev,
432 })?;
433 self.buffer.push(Record {
434 address,
435 revision: rev,
436 data: vec![],
437 tombstone: true,
438 });
439 Ok(rev)
440 }
441
442 pub fn insert_hyperedge(
444 &mut self,
445 space: SpaceId,
446 mut edge: Hyperedge,
447 ) -> io::Result<RevisionId> {
448 edge.validate()
449 .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, format!("{:?}", e)))?;
450 if self.edge_storage_point(space, &edge).1 {
451 self.ensure_locator_space()?;
452 }
453 let build_index = true;
454 self.ensure_endpoint_index_space()?;
455 let rows = self.prepare_hyperedge_writes(space, &edge, build_index)?;
456 let rev = self.apply_prepared_writes_strict(rows)?;
457 edge.valid_from = rev;
458 if !self.defer_auto_flush && self.buffer.len() >= self.flush_threshold {
459 self.flush(space)?;
460 }
461 #[cfg(feature = "sync")]
462 self.enqueue_sync(SyncOperation::WriteHyperedge {
463 space,
464 edge,
465 revision: rev,
466 })?;
467 Ok(rev)
468 }
469
470 pub fn delete_hyperedge(&mut self, space: SpaceId, id: HyperedgeId) -> io::Result<RevisionId> {
472 let edge = self.fetch_hyperedge_by_id(space, id, None)?;
473 let point = match &edge {
476 Some(e) => self.edge_storage_point(space, e).0,
477 None => hyperedge_point(id),
478 };
479 if let Some(e) = &edge {
480 self.tombstone_hyperedge_index(e)?;
481 }
482 let rev = self.next_revision();
483 let address = Address::new(space, point);
484 self.wal.append(&WalEntry::Tombstone {
485 address: address.clone(),
486 revision: rev,
487 })?;
488 self.buffer.push(Record {
489 address,
490 revision: rev,
491 data: vec![],
492 tombstone: true,
493 });
494 if edge.is_some() && self.uses_centroid_keying(space) {
495 self.tombstone_edge_locator(space, id)?;
496 }
497 #[cfg(feature = "sync")]
498 self.enqueue_sync(SyncOperation::DeleteHyperedge {
499 space,
500 edge_id: id,
501 revision: rev,
502 })?;
503 Ok(rev)
504 }
505
506 pub fn query_hyperedges(
508 &mut self,
509 space: SpaceId,
510 as_of: Option<RevisionId>,
511 ) -> io::Result<Vec<Hyperedge>> {
512 self.query(space, as_of)?
513 .into_iter()
514 .map(|r| {
515 decode_from_slice::<Hyperedge, _>(&r.data, standard())
516 .map(|(edge, _)| edge)
517 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
518 })
519 .collect()
520 }
521
522 pub fn query_hyperedges_for_endpoint(
527 &mut self,
528 space: SpaceId,
529 endpoint: &EndpointRef,
530 as_of: Option<RevisionId>,
531 ) -> io::Result<Vec<Hyperedge>> {
532 self.ensure_endpoint_index_space()?;
533 let prefix = endpoint_lookup_prefix(endpoint);
534 let ids = self.query_endpoint_index_ids(&prefix, as_of)?;
535 let rev_ceiling = as_of.unwrap_or_else(|| {
536 RevisionId(self.revision.load(Ordering::Relaxed))
537 });
538 let mut edges = Vec::new();
539 for id in ids {
540 if let Some(edge) = self.fetch_hyperedge_by_id(space, id, as_of)? {
541 if edge.is_active_at(rev_ceiling)
542 && edge.endpoints.iter().any(|ep| {
543 ep.space == endpoint.space && ep.node.coords == endpoint.node.coords
544 })
545 {
546 edges.push(edge);
547 }
548 }
549 }
550 Ok(edges)
551 }
552
553 pub fn traverse(&mut self, edge_space: SpaceId, spec: &TraversalSpec) -> io::Result<Subgraph> {
555 let rev_ceiling = spec.as_of.unwrap_or_else(|| {
556 RevisionId(self.revision.load(Ordering::Relaxed))
557 });
558 let mut subgraph = Subgraph::default();
559 subgraph.add_node(spec.start.clone());
560
561 let mut frontier: std::collections::VecDeque<(EndpointRef, usize)> =
562 std::collections::VecDeque::from([(spec.start.clone(), 0)]);
563 let mut visited: std::collections::HashSet<(u64, Vec<u32>)> =
564 std::collections::HashSet::from([(spec.start.space.0, spec.start.node.coords.clone())]);
565
566 while let Some((node, depth)) = frontier.pop_front() {
567 let incident = self.query_hyperedges_for_endpoint(edge_space, &node, spec.as_of)?;
568 for edge in incident {
569 if let Some(ref kinds) = spec.follow_kinds {
570 if !kinds.iter().any(|k| k.as_str() == edge.kind.as_str()) {
571 continue;
572 }
573 }
574 if !edge.is_active_at(rev_ceiling) {
575 continue;
576 }
577 subgraph.add_edge(edge.clone());
578 for ep in &edge.endpoints {
579 if ep.space == node.space && ep.node.coords == node.node.coords {
580 continue;
581 }
582 let next_depth = depth + 1;
583 if next_depth > spec.max_depth {
584 continue;
585 }
586 let key = (ep.space.0, ep.node.coords.clone());
587 if visited.insert(key) {
588 subgraph.add_node(ep.clone());
589 frontier.push_back((ep.clone(), next_depth));
590 }
591 }
592 }
593 }
594 Ok(subgraph)
595 }
596
597 pub fn query_hyperedges_by_kind(
599 &mut self,
600 space: SpaceId,
601 kind: &str,
602 as_of: Option<RevisionId>,
603 ) -> io::Result<Vec<Hyperedge>> {
604 let edges = self.query_hyperedges(space, as_of)?;
605 Ok(edges
606 .into_iter()
607 .filter(|e| e.kind.as_str() == kind)
608 .collect())
609 }
610
611 pub fn insert_hyperedge_typed<K: KindLabel>(
613 &mut self,
614 space: SpaceId,
615 id: HyperedgeId,
616 kind: K,
617 endpoints: Vec<AdapterEndpoint>,
618 weight_milli: Option<i64>,
619 metadata: std::collections::BTreeMap<String, String>,
620 valid_to: Option<RevisionId>,
621 catalog: Option<&KindCatalog>,
622 ) -> io::Result<RevisionId> {
623 let kind_label = kind.label().to_string();
624 if let Some(catalog) = catalog {
625 catalog
626 .validate_edge_kind(&kind_label)
627 .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e.to_string()))?;
628 for ep in &endpoints {
629 catalog
630 .validate_endpoint_role(&ep.role)
631 .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e.to_string()))?;
632 }
633 }
634 let edge = Hyperedge {
635 id,
636 kind: kind_label.into(),
637 endpoints: endpoints.into_iter().map(EndpointRef::from).collect(),
638 weight_milli,
639 metadata,
640 valid_from: RevisionId::ZERO,
641 valid_to,
642 };
643 self.insert_hyperedge(space, edge)
644 }
645
646 pub fn query_hyperedges_by_kind_typed<K: KindLabel>(
648 &mut self,
649 space: SpaceId,
650 kind: K,
651 as_of: Option<RevisionId>,
652 ) -> io::Result<Vec<Hyperedge>> {
653 self.query_hyperedges_by_kind(space, kind.label(), as_of)
654 }
655
656 pub fn insert_signal_sample(
658 &mut self,
659 space: SpaceId,
660 sample: SignalSample,
661 ) -> io::Result<RevisionId> {
662 sample.validate()
663 .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, format!("{:?}", e)))?;
664 let full_coords = sample
665 .scope
666 .address_coords(&sample.local_coords)
667 .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, format!("{:?}", e)))?;
668 if let Some(cfg) = self.spaces.get(space) {
669 if cfg.dims != full_coords.len() {
670 return Err(io::Error::new(
671 io::ErrorKind::InvalidInput,
672 "signal coordinates do not match space dimensions",
673 ));
674 }
675 }
676 let data = encode_to_vec(&sample, standard())
677 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
678 let rev = self.next_revision();
679 let address = Address::new(space, DimensionVector::new(full_coords));
680 self.wal.append(&WalEntry::Write {
681 address: address.clone(),
682 revision: rev,
683 data: data.clone(),
684 })?;
685 self.buffer.push(Record {
686 address,
687 revision: rev,
688 data,
689 tombstone: false,
690 });
691 if !self.defer_auto_flush && self.buffer.len() >= self.flush_threshold {
692 self.flush(space)?;
693 }
694 #[cfg(feature = "sync")]
695 self.enqueue_sync(SyncOperation::WriteSignal {
696 space,
697 sample,
698 revision: rev,
699 })?;
700 Ok(rev)
701 }
702
703 pub fn insert_signal_sample_typed<SB: SpaceBinding, K: KindLabel>(
705 &mut self,
706 signal_id: crate::infinitedb_core::signal::SignalId,
707 kind: K,
708 parent_prefix: DimensionVector,
709 local_coords: Vec<u32>,
710 value_milli: i64,
711 source_revision: Option<RevisionId>,
712 constraint: Option<crate::infinitedb_core::signal::SignalConstraint>,
713 catalog: Option<&KindCatalog>,
714 ) -> io::Result<RevisionId> {
715 if let Some(cfg) = self.spaces.get(SB::SPACE_ID) {
716 if cfg.dims != SB::DIMS {
717 return Err(io::Error::new(
718 io::ErrorKind::InvalidInput,
719 format!(
720 "SpaceBinding dims mismatch for space {}: trait={}, registry={}",
721 SB::SPACE_ID.0,
722 SB::DIMS,
723 cfg.dims
724 ),
725 ));
726 }
727 } else {
728 return Err(io::Error::new(
729 io::ErrorKind::InvalidInput,
730 format!("SpaceBinding refers to unregistered space {}", SB::SPACE_ID.0),
731 ));
732 }
733 let kind_label = kind.label().to_string();
734 if let Some(catalog) = catalog {
735 catalog
736 .validate_signal_kind(&kind_label)
737 .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e.to_string()))?;
738 }
739 let sample = crate::infinitedb_core::signal::SignalSample {
740 signal_id,
741 kind: kind_label.into(),
742 scope: crate::infinitedb_core::signal::SignalScope {
743 parent_prefix,
744 total_dims: SB::DIMS,
745 },
746 local_coords,
747 value_milli,
748 source_revision,
749 constraint,
750 };
751 self.insert_signal_sample(SB::SPACE_ID, sample)
752 }
753
754 pub fn query_signal_scope(
756 &mut self,
757 space: SpaceId,
758 parent_coords: &[u32],
759 as_of: Option<RevisionId>,
760 ) -> io::Result<Vec<SignalSample>> {
761 let rows = self.query_subscope(space, parent_coords, as_of)?;
762 rows.into_iter()
763 .map(|r| {
764 decode_from_slice::<SignalSample, _>(&r.data, standard())
765 .map(|(sample, _)| sample)
766 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
767 })
768 .collect()
769 }
770
771 pub fn query_signal_range(
773 &mut self,
774 space: SpaceId,
775 parent_coords: &[u32],
776 min_local: &[u32],
777 max_local: &[u32],
778 as_of: Option<RevisionId>,
779 ) -> io::Result<Vec<SignalSample>> {
780 if min_local.len() != max_local.len() {
781 return Err(io::Error::new(
782 io::ErrorKind::InvalidInput,
783 "min_local and max_local dimensions differ",
784 ));
785 }
786 let mut min = parent_coords.to_vec();
787 min.extend_from_slice(min_local);
788 let mut max = parent_coords.to_vec();
789 max.extend_from_slice(max_local);
790 let rows = self.query_bbox(
791 space,
792 DimensionVector::new(min),
793 DimensionVector::new(max),
794 as_of,
795 )?;
796 rows.into_iter()
797 .map(|r| {
798 decode_from_slice::<SignalSample, _>(&r.data, standard())
799 .map(|(sample, _)| sample)
800 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
801 })
802 .collect()
803 }
804
805 pub fn flush(&mut self, space: SpaceId) -> io::Result<()> {
807 if self.buffer.is_empty() {
808 return Ok(());
809 }
810
811 let mut remaining = Vec::new();
813 let mut records: Vec<Record> = Vec::new();
814 for record in self.buffer.drain(..) {
815 if record.address.space == space {
816 records.push(record);
817 } else {
818 remaining.push(record);
819 }
820 }
821 self.buffer = remaining;
822
823 if records.is_empty() {
824 return Ok(());
825 }
826
827 records.sort_by_key(|r| {
829 let key = self.space_key(space, &r.address.point);
830 (key, r.revision.0)
831 });
832
833 let min_rev = records.iter().map(|r| r.revision).min().unwrap_or(RevisionId::ZERO);
834 let max_rev = records.iter().map(|r| r.revision).max().unwrap_or(RevisionId::ZERO);
835 let block_id = self.alloc_block_id();
836
837 let mut block = Block {
838 id: block_id,
839 space,
840 records,
841 min_revision: min_rev,
842 max_revision: max_rev,
843 checksum: [0u8; 32],
844 };
845 block.checksum = compute_checksum(&block)?;
846
847 self.store.write_block(&block)?;
849
850 let snap_id = self.alloc_snapshot_id();
852 self.wal.append(&WalEntry::BlockSealed {
853 block_id,
854 space,
855 snapshot: snap_id,
856 })?;
857
858 let hilbert_min = block
862 .records
863 .first()
864 .map(|r| self.space_key(space, &r.address.point))
865 .unwrap_or(0);
866 let hilbert_max = block
867 .records
868 .last()
869 .map(|r| self.space_key(space, &r.address.point))
870 .unwrap_or(hilbert_min);
871
872 let snapshot = self.snapshots.entry(space.0).or_insert_with(|| {
874 Snapshot::root(snap_id, space)
875 });
876 snapshot.blocks.insert(
877 hilbert_min,
878 BlockIndexEntry { block_id, max_key: hilbert_max },
879 );
880 snapshot.revision = max_rev;
881
882 self.persist_meta()?;
883 self.wal.sync()?;
884
885 self.rewrite_wal_from_buffer(max_rev)?;
890 Ok(())
891 }
892
893 fn rewrite_wal_from_buffer(&mut self, checkpoint: RevisionId) -> io::Result<()> {
895 let mut entries: Vec<WalEntry> = self
896 .buffer
897 .iter()
898 .map(|r| {
899 if r.tombstone {
900 WalEntry::Tombstone { address: r.address.clone(), revision: r.revision }
901 } else {
902 WalEntry::Write {
903 address: r.address.clone(),
904 revision: r.revision,
905 data: r.data.clone(),
906 }
907 }
908 })
909 .collect();
910 entries.push(WalEntry::Checkpoint { revision: checkpoint });
911 self.wal.rewrite(&entries)
912 }
913
914 pub fn current_snapshot(&self, space: SpaceId) -> Option<SnapshotId> {
920 self.snapshots.get(&space.0).map(|s| s.id)
921 }
922
923 pub fn query(
926 &mut self,
927 space: SpaceId,
928 as_of: Option<RevisionId>,
929 ) -> io::Result<Vec<Record>> {
930 self.query_inner(space, None, as_of, false)
931 }
932
933 pub fn execute(&mut self, q: &Query) -> io::Result<Vec<Record>> {
944 if let Some(current) = self.current_snapshot(q.space) {
945 if current != q.snapshot {
946 return Err(io::Error::new(
947 io::ErrorKind::InvalidInput,
948 "query snapshot does not match the current snapshot for this space",
949 ));
950 }
951 }
952
953 let key_range = match q.key_range {
954 Some(kr) => Some(kr),
955 None => q.range.as_ref().map(|r| {
956 let ka = self.space_key(q.space, &r.min);
957 let kb = self.space_key(q.space, &r.max);
958 if ka <= kb { (ka, kb) } else { (kb, ka) }
959 }),
960 };
961
962 let mut results = self.query_inner(q.space, key_range, q.as_of, q.include_tombstones)?;
963
964 if let Some(range) = &q.range {
965 results.retain(|r| r.address.point.within(&range.min, &range.max));
966 }
967
968 Ok(results)
969 }
970
971 pub fn query_bbox(
984 &mut self,
985 space: SpaceId,
986 min: DimensionVector,
987 max: DimensionVector,
988 as_of: Option<RevisionId>,
989 ) -> io::Result<Vec<Record>> {
990 assert_eq!(min.dims(), max.dims(), "min and max must have equal dimensions");
991 let k_min = self.space_key(space, &min);
994 let k_max = self.space_key(space, &max);
995 let (lo, hi) = if k_min <= k_max { (k_min, k_max) } else { (k_max, k_min) };
996 let mut results = self.query_inner(space, Some((lo, hi)), as_of, false)?;
997 results.retain(|r| r.address.point.within(&min, &max));
999 Ok(results)
1000 }
1001
1002 pub fn query_subscope(
1012 &mut self,
1013 space: SpaceId,
1014 parent_coords: &[u32],
1015 as_of: Option<RevisionId>,
1016 ) -> io::Result<Vec<Record>> {
1017 let dims = self.spaces.get(space)
1019 .map(|c| c.dims)
1020 .unwrap_or(parent_coords.len() + 1);
1021 assert!(
1022 parent_coords.len() <= dims,
1023 "parent_coords has more dimensions than the space"
1024 );
1025 let inner_dims = dims - parent_coords.len();
1026 let mut min_coords: Vec<u32> = parent_coords.to_vec();
1027 let mut max_coords: Vec<u32> = parent_coords.to_vec();
1028 min_coords.extend(std::iter::repeat(0).take(inner_dims));
1029 max_coords.extend(std::iter::repeat(u32::MAX).take(inner_dims));
1030 self.query_bbox(
1031 space,
1032 DimensionVector::new(min_coords),
1033 DimensionVector::new(max_coords),
1034 as_of,
1035 )
1036 }
1037
1038 fn query_inner(
1040 &mut self,
1041 space: SpaceId,
1042 key_range: Option<(u128, u128)>,
1043 as_of: Option<RevisionId>,
1044 include_tombstones: bool,
1045 ) -> io::Result<Vec<Record>> {
1046 let rev_ceiling = as_of.unwrap_or_else(|| {
1047 RevisionId(self.revision.load(Ordering::Relaxed))
1048 });
1049
1050 let mut results: Vec<Record> = Vec::new();
1051
1052 let mut tombstoned: std::collections::HashSet<_> = self
1053 .buffer
1054 .iter()
1055 .filter(|r| r.address.space == space && r.tombstone && r.revision <= rev_ceiling)
1056 .map(|r| r.address.point.coords.clone())
1057 .collect();
1058
1059 if let Some(snapshot) = self.snapshots.get(&space.0) {
1061 let block_ids: Vec<BlockId> = match key_range {
1062 None => snapshot.blocks.values().map(|e| e.block_id).collect(),
1063 Some((lo, hi)) => {
1064 snapshot
1069 .blocks
1070 .iter()
1071 .filter(|(min_key, entry)| **min_key <= hi && entry.max_key >= lo)
1072 .map(|(_, entry)| entry.block_id)
1073 .collect()
1074 }
1075 };
1076 for block_id in &block_ids {
1077 let block = self.store.read_block(*block_id)?;
1078 for record in &block.records {
1079 if record.address.space == space
1080 && record.tombstone
1081 && record.revision <= rev_ceiling
1082 {
1083 tombstoned.insert(record.address.point.coords.clone());
1084 }
1085 }
1086 }
1087 for block_id in block_ids {
1088 let block = self.store.read_block(block_id)?;
1089 for record in block.records {
1090 if record.revision > rev_ceiling {
1091 continue;
1092 }
1093 if !include_tombstones && record.tombstone {
1094 continue;
1095 }
1096 if let Some((lo, hi)) = key_range {
1098 if lo == hi {
1099 let k = self.space_key(space, &record.address.point);
1100 if k != lo {
1101 continue;
1102 }
1103 }
1104 }
1105 results.push(record);
1106 }
1107 }
1108 if !include_tombstones {
1109 results.retain(|r| !tombstoned.contains(&r.address.point.coords));
1110 }
1111 }
1112
1113 for record in &self.buffer {
1115 let visible = record.address.space == space
1116 && record.revision <= rev_ceiling
1117 && (include_tombstones || !record.tombstone)
1118 && (include_tombstones || !tombstoned.contains(&record.address.point.coords));
1119 if visible {
1120 if let Some((lo, hi)) = key_range {
1121 let k = self.space_key(space, &record.address.point);
1122 if k < lo || k > hi {
1123 continue;
1124 }
1125 }
1126 results.push(record.clone());
1127 }
1128 }
1129
1130 Ok(results)
1131 }
1132
1133 pub fn branch_id(&self, name: &str) -> Option<BranchId> {
1139 self.branches.get_by_name(name).map(|b| b.id)
1140 }
1141
1142 pub fn create_branch(
1144 &mut self,
1145 name: impl Into<String>,
1146 from: BranchId,
1147 ) -> Result<BranchId, String> {
1148 let parent = self.branches.get(from).ok_or("Branch not found")?;
1149 let new_id = BranchId(self.next_branch_id.fetch_add(1, Ordering::Relaxed));
1150 let rev = RevisionId(self.revision.load(Ordering::Relaxed));
1151 let branch = Branch {
1152 id: new_id,
1153 name: name.into(),
1154 head: parent.head,
1155 parent: Some(from),
1156 forked_at: rev,
1157 };
1158 self.branches.insert(branch).map_err(|e| format!("{:?}", e))?;
1159 self.persist_meta().map_err(|e| e.to_string())?;
1160 Ok(new_id)
1161 }
1162
1163 pub fn memory_stats(&self) -> MemoryStats {
1169 let buffer_records = self.buffer.len();
1170 let buffer_bytes: usize = self.buffer.iter()
1171 .map(|r| 48 + r.data.len())
1172 .sum();
1173 let (cache_bytes, cache_blocks) = self.store.cache_stats();
1174 let snapshot_entries: usize = self.snapshots.values()
1175 .map(|s| s.blocks.len())
1176 .sum();
1177 MemoryStats {
1178 buffer_records,
1179 buffer_bytes,
1180 cache_bytes,
1181 cache_blocks,
1182 snapshot_index_entries: snapshot_entries,
1183 total_revision: self.revision.load(Ordering::Relaxed),
1184 sealed_blocks: self.next_block_id.load(Ordering::Relaxed),
1185 }
1186 }
1187
1188 pub fn snapshot_for_space(&self, space: SpaceId) -> Option<Snapshot> {
1194 self.snapshots.get(&space.0).cloned()
1195 }
1196
1197 pub fn revision(&self) -> u64 {
1199 self.revision.load(Ordering::Relaxed)
1200 }
1201
1202 pub fn read_block(&mut self, id: BlockId) -> io::Result<Block> {
1204 self.store.read_block(id)
1205 }
1206
1207 pub fn compact_space(
1209 &mut self,
1210 space: SpaceId,
1211 config: &CompactionConfig,
1212 ) -> io::Result<CompactionResult> {
1213 let snapshot = self
1214 .snapshots
1215 .get(&space.0)
1216 .cloned()
1217 .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "no snapshot for space"))?;
1218
1219 let input_blocks: Vec<Block> = snapshot
1220 .blocks
1221 .values()
1222 .map(|e| self.store.read_block(e.block_id))
1223 .collect::<io::Result<_>>()?;
1224
1225 if input_blocks.len() <= 1 {
1226 return Ok(CompactionResult {
1227 new_blocks: vec![],
1228 superseded: vec![],
1229 });
1230 }
1231
1232 let result = compact(
1233 input_blocks,
1234 config,
1235 snapshot.id,
1236 || self.alloc_block_id(),
1237 );
1238
1239 let mut new_blocks = Vec::new();
1240 for mut block in result.new_blocks {
1241 let mut recs = block.records.clone();
1242 recs.sort_by_key(|r| self.space_key(space, &r.address.point));
1243 block.records = recs;
1244 block.checksum = compute_checksum(&block)?;
1245 self.store.write_block(&block)?;
1246 new_blocks.push(block);
1247 }
1248
1249 let mut updated = Snapshot::root(snapshot.id, space);
1250 updated.revision = snapshot.revision;
1251 for block in &new_blocks {
1252 let min_key = block
1253 .records
1254 .first()
1255 .map(|r| self.space_key(space, &r.address.point))
1256 .unwrap_or(0);
1257 let max_key = block
1258 .records
1259 .last()
1260 .map(|r| self.space_key(space, &r.address.point))
1261 .unwrap_or(min_key);
1262 updated.blocks.insert(
1263 min_key,
1264 BlockIndexEntry {
1265 block_id: block.id,
1266 max_key,
1267 },
1268 );
1269 }
1270 self.snapshots.insert(space.0, updated);
1271
1272 let live: Vec<Snapshot> = self.snapshots.values().cloned().collect();
1273 let deletable = safe_to_delete(&result.superseded, &live);
1274 self.gc_blocks(&deletable)?;
1275
1276 let rev = RevisionId(self.revision.load(Ordering::Relaxed));
1277 self.wal.append(&WalEntry::Checkpoint { revision: rev })?;
1278 self.persist_meta()?;
1279 Ok(CompactionResult {
1280 new_blocks,
1281 superseded: result.superseded,
1282 })
1283 }
1284
1285 pub fn gc_blocks(&mut self, superseded: &[BlockId]) -> io::Result<usize> {
1287 let live: Vec<Snapshot> = self.snapshots.values().cloned().collect();
1288 let deletable = safe_to_delete(superseded, &live);
1289 for id in &deletable {
1290 self.store.delete_block(*id)?;
1291 }
1292 Ok(deletable.len())
1293 }
1294
1295 #[cfg(feature = "sync")]
1296 pub fn snapshot_merkle(&mut self, space: SpaceId) -> io::Result<merkle::MerkleTree> {
1298 let snapshot = self
1299 .snapshots
1300 .get(&space.0)
1301 .cloned()
1302 .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "no snapshot for space"))?;
1303
1304 let mut leaves = Vec::new();
1305 for (_min_key, entry) in &snapshot.blocks {
1306 let block = self.store.read_block(entry.block_id)?;
1307 let mut recs = block.records;
1308 recs.sort_by_key(|r| (self.space_key(space, &r.address.point), r.revision.0));
1309 for record in &recs {
1310 let encoded = encode_to_vec(record, standard())
1311 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
1312 leaves.push(merkle::hash_record(&encoded));
1313 }
1314 }
1315 Ok(merkle::MerkleTree::build(&leaves))
1316 }
1317
1318 #[cfg(feature = "sync")]
1319 pub fn apply_delta(&mut self, space: SpaceId, delta: &Delta) -> io::Result<()> {
1321 for block in &delta.added_blocks {
1322 self.store.write_block(block)?;
1323 }
1324 let current = self
1325 .snapshots
1326 .get(&space.0)
1327 .cloned()
1328 .unwrap_or_else(|| Snapshot::root(delta.target_snapshot, space));
1329 let updated = delta.apply(¤t);
1330 self.snapshots.insert(space.0, updated);
1331 self.gc_blocks(&delta.removed_block_ids)?;
1332 self.persist_meta()?;
1333 Ok(())
1334 }
1335
1336 #[cfg(feature = "sync")]
1342 pub fn sync_now(
1343 &mut self,
1344 transport: &dyn SyncTransport,
1345 max_batch: usize,
1346 ) -> io::Result<SyncReport> {
1347 let mut state = self
1348 .outbox_state
1349 .lock()
1350 .map_err(|_| io::Error::new(io::ErrorKind::Other, "sync outbox lock poisoned"))?;
1351 let report = state.process_once(transport, max_batch);
1352 if report.attempted > 0 {
1353 save_outbox(&self.outbox_path, &state)?;
1354 }
1355 Ok(report)
1356 }
1357
1358 #[cfg(feature = "sync")]
1360 pub fn start_background_sync(
1361 &mut self,
1362 transport: Arc<dyn SyncTransport>,
1363 interval: Duration,
1364 batch_size: usize,
1365 ) -> io::Result<()> {
1366 self.stop_background_sync();
1367 let worker = BackgroundSyncWorker::start(
1368 Arc::clone(&self.outbox_state),
1369 self.outbox_path.clone(),
1370 transport,
1371 interval,
1372 batch_size,
1373 )?;
1374 self.sync_worker = Some(worker);
1375 Ok(())
1376 }
1377
1378 #[cfg(feature = "sync")]
1380 pub fn stop_background_sync(&mut self) {
1381 if let Some(mut worker) = self.sync_worker.take() {
1382 worker.stop();
1383 }
1384 }
1385
1386 #[cfg(feature = "sync")]
1388 pub fn sync_pending_count(&self) -> usize {
1389 self.outbox_state
1390 .lock()
1391 .map(|s| s.pending_count())
1392 .unwrap_or(0)
1393 }
1394
1395 #[cfg(feature = "sync")]
1397 pub fn last_successful_sync_at_ms(&self) -> Option<u64> {
1398 self.outbox_state
1399 .lock()
1400 .ok()
1401 .and_then(|s| s.last_success_at_ms)
1402 }
1403
1404 #[cfg(feature = "sync")]
1406 pub fn last_sync_error(&self) -> Option<String> {
1407 self.outbox_state
1408 .lock()
1409 .ok()
1410 .and_then(|s| s.last_error.clone())
1411 }
1412
1413 fn next_revision(&self) -> RevisionId {
1418 RevisionId(self.revision.fetch_add(1, Ordering::Relaxed) + 1)
1419 }
1420
1421 fn alloc_block_id(&self) -> BlockId {
1422 BlockId(self.next_block_id.fetch_add(1, Ordering::Relaxed))
1423 }
1424
1425 fn alloc_snapshot_id(&self) -> SnapshotId {
1426 SnapshotId(self.next_snapshot_id.fetch_add(1, Ordering::Relaxed))
1427 }
1428
1429 fn apply_wal_entry(&mut self, entry: WalEntry) -> io::Result<()> {
1430 match entry {
1431 WalEntry::Write { address, revision, data } => {
1432 self.buffer.push(Record { address, revision, data, tombstone: false });
1433 }
1434 WalEntry::Tombstone { address, revision } => {
1435 self.buffer.push(Record { address, revision, data: vec![], tombstone: true });
1436 }
1437 WalEntry::BlockSealed { block_id, space, snapshot } => {
1438 self.reconcile_sealed_block(block_id, space, snapshot)?;
1439 }
1440 WalEntry::Checkpoint { .. } => {}
1441 }
1442 Ok(())
1443 }
1444
1445 fn reconcile_sealed_block(
1457 &mut self,
1458 block_id: BlockId,
1459 space: SpaceId,
1460 snapshot_id: SnapshotId,
1461 ) -> io::Result<()> {
1462 let block = match self.store.read_block(block_id) {
1463 Ok(block) => block,
1464 Err(_) => return Ok(()),
1465 };
1466
1467 let min_key = block
1468 .records
1469 .first()
1470 .map(|r| self.space_key(space, &r.address.point))
1471 .unwrap_or(0);
1472 let max_key = block
1473 .records
1474 .last()
1475 .map(|r| self.space_key(space, &r.address.point))
1476 .unwrap_or(min_key);
1477 let block_max_rev = block.max_revision;
1478
1479 let sealed: std::collections::HashSet<(Vec<u32>, u64)> = block
1480 .records
1481 .iter()
1482 .map(|r| (r.address.point.coords.clone(), r.revision.0))
1483 .collect();
1484
1485 let snapshot = self
1486 .snapshots
1487 .entry(space.0)
1488 .or_insert_with(|| Snapshot::root(snapshot_id, space));
1489 snapshot
1490 .blocks
1491 .insert(min_key, BlockIndexEntry { block_id, max_key });
1492 if snapshot.revision < block_max_rev {
1493 snapshot.revision = block_max_rev;
1494 }
1495
1496 self.buffer
1497 .retain(|r| !sealed.contains(&(r.address.point.coords.clone(), r.revision.0)));
1498 Ok(())
1499 }
1500
1501 fn persist_meta(&mut self) -> io::Result<()> {
1502 let spaces_bytes = encode_to_vec(&self.spaces, standard())
1504 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
1505 self.store.write_meta("spaces.bin", &spaces_bytes)?;
1506
1507 let branches_bytes = encode_to_vec(&self.branches, standard())
1509 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
1510 self.store.write_meta("branches.bin", &branches_bytes)?;
1511
1512 let snapshots_bytes = encode_to_vec(&self.snapshots, standard())
1515 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
1516 self.store.write_meta("snapshots.bin", &snapshots_bytes)?;
1517
1518 let counters: [u64; 4] = [
1520 self.revision.load(Ordering::Relaxed),
1521 self.next_block_id.load(Ordering::Relaxed),
1522 self.next_snapshot_id.load(Ordering::Relaxed),
1523 self.next_branch_id.load(Ordering::Relaxed),
1524 ];
1525 let counters_bytes = encode_to_vec(&counters, standard())
1526 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
1527 self.store.write_meta("counters.bin", &counters_bytes)?;
1528
1529 Ok(())
1530 }
1531
1532 #[cfg(feature = "sync")]
1533 pub(super) fn enqueue_sync(&mut self, op: SyncOperation) -> io::Result<()> {
1534 let mut state = self
1535 .outbox_state
1536 .lock()
1537 .map_err(|_| io::Error::new(io::ErrorKind::Other, "sync outbox lock poisoned"))?;
1538 state.enqueue(op);
1539 save_outbox(&self.outbox_path, &state)
1540 }
1541}
1542
1543#[cfg(feature = "sync")]
1544impl Drop for InfiniteDb {
1545 fn drop(&mut self) {
1546 self.stop_background_sync();
1547 }
1548}
1549
1550fn recover_wal(wal_path: &PathBuf) -> io::Result<Vec<WalEntry>> {
1555 if !wal_path.exists() {
1556 return Ok(vec![]);
1557 }
1558 let mut reader = crate::infinitedb_storage::wal::WalReader::open(wal_path.clone())?;
1559 reader.entries()
1560}
1561
1562#[derive(Debug, Clone)]
1568pub struct MemoryStats {
1569 pub buffer_records: usize,
1571 pub buffer_bytes: usize,
1573 pub cache_bytes: usize,
1575 pub cache_blocks: usize,
1577 pub snapshot_index_entries: usize,
1579 pub total_revision: u64,
1581 pub sealed_blocks: u64,
1583}
1584
1585impl MemoryStats {
1586 pub fn total_ram_bytes(&self) -> usize {
1588 self.buffer_bytes
1589 + self.cache_bytes
1590 + self.snapshot_index_entries * 24
1592 + 4096
1594 }
1595
1596 pub fn print(&self) {
1598 println!("\n╔═══ InfiniteDb Memory Stats ═══╗");
1599 println!("║ Write buffer {:>6} records ({} bytes)",
1600 self.buffer_records, fmt_bytes(self.buffer_bytes));
1601 println!("║ LRU block cache {:>6} blocks ({} bytes / 10 MB limit)",
1602 self.cache_blocks, fmt_bytes(self.cache_bytes));
1603 println!("║ Snapshot index {:>6} entries", self.snapshot_index_entries);
1604 println!("║ Total revisions {:>6}", self.total_revision);
1605 println!("║ Sealed blocks {:>6}", self.sealed_blocks);
1606 println!("║ ──────────────────────────────────────────────");
1607 println!("║ Est. total RAM {}", fmt_bytes(self.total_ram_bytes()));
1608 println!("╚════════════════════════════════");
1609 }
1610}
1611
1612fn fmt_bytes(b: usize) -> String {
1613 if b < 1024 { format!("{} B", b) }
1614 else if b < 1024 * 1024 { format!("{:.1} KB", b as f64 / 1024.0) }
1615 else { format!("{:.2} MB", b as f64 / (1024.0 * 1024.0)) }
1616}
1617
1618#[allow(clippy::type_complexity)]
1625fn load_meta(store: &BlockStore) -> Option<MetaTuple> {
1626 let counters_bytes = store.read_meta("counters.bin").ok()?;
1627 let (revision, next_block, next_snapshot, next_branch) =
1630 match decode_from_slice::<[u64; 4], _>(&counters_bytes, standard()) {
1631 Ok((c, _)) => (c[0], c[1], c[2], c[3]),
1632 Err(_) => {
1633 let (c, _): ([u64; 3], _) =
1634 decode_from_slice(&counters_bytes, standard()).ok()?;
1635 (c[0], c[1], c[2], 2)
1636 }
1637 };
1638
1639 let spaces_bytes = store.read_meta("spaces.bin").ok()?;
1640 let (spaces, _): (SpaceRegistry, _) = decode_from_slice(&spaces_bytes, standard()).ok()?;
1641
1642 let branches = store
1645 .read_meta("branches.bin")
1646 .ok()
1647 .and_then(|b| decode_from_slice::<BranchRegistry, _>(&b, standard()).ok())
1648 .map(|(r, _)| r)
1649 .unwrap_or_else(BranchRegistry::new);
1650
1651 let snapshots = store
1652 .read_meta("snapshots.bin")
1653 .ok()
1654 .and_then(|b| decode_from_slice::<BTreeMap<u64, Snapshot>, _>(&b, standard()).ok())
1655 .map(|(m, _)| m)
1656 .unwrap_or_default();
1657
1658 Some((spaces, branches, snapshots, revision, next_block, next_snapshot, next_branch))
1659}
1660
1661type MetaTuple = (
1664 SpaceRegistry,
1665 BranchRegistry,
1666 BTreeMap<u64, Snapshot>,
1667 u64,
1668 u64,
1669 u64,
1670 u64,
1671);
1672
1673fn default_meta() -> MetaTuple {
1674 (
1675 SpaceRegistry::new(),
1676 BranchRegistry::new(),
1677 BTreeMap::new(),
1678 0,
1679 1,
1680 1,
1681 2,
1682 )
1683}
1684
1685fn hyperedge_point(id: HyperedgeId) -> DimensionVector {
1686 DimensionVector::new(vec![(id.0 >> 32) as u32, (id.0 & 0xFFFF_FFFF) as u32])
1687}
1688
1689pub(super) const HYPEREDGE_LOCATOR_SPACE: SpaceId = SpaceId(u64::MAX - 2);
1691const HYPEREDGE_LOCATOR_DIMS: usize = 4;
1693
1694pub(super) fn locator_point(space: SpaceId, id: HyperedgeId) -> DimensionVector {
1696 DimensionVector::new(vec![
1697 (space.0 >> 32) as u32,
1698 (space.0 & 0xFFFF_FFFF) as u32,
1699 (id.0 >> 32) as u32,
1700 (id.0 & 0xFFFF_FFFF) as u32,
1701 ])
1702}
1703
1704fn centroid_hyperedge_point(edge: &Hyperedge) -> Option<DimensionVector> {
1711 let (_space, centroid) = edge.endpoint_centroid()?;
1712 let mut coords = centroid;
1714 coords.truncate(14);
1715 coords.push((edge.id.0 >> 32) as u32);
1716 coords.push((edge.id.0 & 0xFFFF_FFFF) as u32);
1717 Some(DimensionVector::new(coords))
1718}
1719
1720#[cfg(test)]
1725mod tests {
1726 use super::*;
1727 use tempfile::TempDir;
1728 use crate::infinitedb_core::address::{DimensionVector, SpaceId};
1729 use crate::infinitedb_core::adapter::{AdapterEndpoint, KindLabel, SpaceBinding};
1730 use crate::infinitedb_core::branch::BranchId;
1731 use crate::infinitedb_core::hyperedge::{EndpointRef, EndpointRole, Hyperedge, HyperedgeId, HyperedgeKind};
1732 use crate::infinitedb_core::kind_catalog::{KindCatalog, KindDefinition, UnknownKindPolicy};
1733 use crate::infinitedb_core::signal::{SignalId, SignalKind, SignalSample, SignalScope};
1734 use crate::infinitedb_core::space::SpaceConfig;
1735 #[cfg(feature = "sync")]
1736 use std::sync::Arc;
1737 #[cfg(feature = "sync")]
1738 use crate::infinitedb_sync::transport::{SyncEnvelope, SyncResult, SyncTransport};
1739
1740 fn open_tmp() -> (InfiniteDb, TempDir) {
1741 let dir = TempDir::new().unwrap();
1742 let db = InfiniteDb::open(dir.path()).unwrap();
1743 (db, dir)
1744 }
1745
1746 enum BeamKinds {
1747 BearsOn,
1748 BendingMoment,
1749 }
1750
1751 impl KindLabel for BeamKinds {
1752 fn label(&self) -> &str {
1753 match self {
1754 BeamKinds::BearsOn => "beam.bears_on",
1755 BeamKinds::BendingMoment => "beam.bending_moment",
1756 }
1757 }
1758 }
1759
1760 struct BeamSignalSpace;
1761 impl SpaceBinding for BeamSignalSpace {
1762 const SPACE_ID: SpaceId = SpaceId(88);
1763 const DIMS: usize = 3;
1764 const SPACE_NAME: &'static str = "beam_signals";
1765 }
1766
1767 #[test]
1768 fn insert_and_query_unflushed() {
1769 let (mut db, _dir) = open_tmp();
1770 let space = SpaceId(1);
1771 db.insert(space, DimensionVector::new(vec![10, 20]), vec![1, 2, 3]).unwrap();
1772 let results = db.query(space, None).unwrap();
1773 assert_eq!(results.len(), 1);
1774 }
1775
1776 #[test]
1777 fn insert_flush_query() {
1778 let (mut db, _dir) = open_tmp();
1779 let space = SpaceId(1);
1780 db.insert(space, DimensionVector::new(vec![5, 5]), vec![42]).unwrap();
1781 db.flush(space).unwrap();
1782 let results = db.query(space, None).unwrap();
1783 assert_eq!(results.len(), 1);
1784 assert_eq!(results[0].data, vec![42]);
1785 }
1786
1787 #[test]
1788 fn flush_records_block_key_interval() {
1789 let (mut db, _dir) = open_tmp();
1790 let space = SpaceId(1);
1791 let p_lo = DimensionVector::new(vec![1, 1]);
1792 let p_hi = DimensionVector::new(vec![200, 200]);
1793 db.insert(space, p_lo.clone(), vec![1]).unwrap();
1794 db.insert(space, p_hi.clone(), vec![2]).unwrap();
1795 db.flush(space).unwrap();
1796
1797 let snapshot = db.snapshots.get(&space.0).unwrap();
1798 assert_eq!(snapshot.blocks.len(), 1);
1799 let (min_key, entry) = snapshot.blocks.iter().next().unwrap();
1800 let ka = hilbert_key_standard(&p_lo);
1801 let kb = hilbert_key_standard(&p_hi);
1802 assert_eq!(*min_key, ka.min(kb));
1803 assert_eq!(entry.max_key, ka.max(kb));
1804 }
1805
1806 #[test]
1807 fn range_pruning_skips_non_overlapping_blocks() {
1808 let (mut db, _dir) = open_tmp();
1809 let space = SpaceId(1);
1810 let points = [
1812 DimensionVector::new(vec![1, 1]),
1813 DimensionVector::new(vec![120, 30]),
1814 DimensionVector::new(vec![250, 200]),
1815 ];
1816 for (i, p) in points.iter().enumerate() {
1817 db.insert(space, p.clone(), vec![i as u8]).unwrap();
1818 db.flush(space).unwrap();
1819 }
1820 assert_eq!(db.snapshots.get(&space.0).unwrap().blocks.len(), 3);
1821
1822 let k_mid = hilbert_key_standard(&points[1]);
1824 let results = db.query_inner(space, Some((k_mid, k_mid)), None, false).unwrap();
1825 assert_eq!(results.len(), 1);
1826 assert_eq!(results[0].address.point, points[1]);
1827 }
1828
1829 #[test]
1830 fn execute_range_matches_query_bbox() {
1831 use crate::infinitedb_core::query::{Query, SpatialRange};
1832 let (mut db, _dir) = open_tmp();
1833 let space = SpaceId(1);
1834 db.register_space(SpaceConfig::new(space, "s", 2)).unwrap();
1835 db.insert(space, DimensionVector::new(vec![5, 5]), vec![1]).unwrap();
1836 db.insert(space, DimensionVector::new(vec![8, 2]), vec![2]).unwrap();
1837 db.insert(space, DimensionVector::new(vec![200, 200]), vec![3]).unwrap();
1838 db.flush(space).unwrap();
1839
1840 let min = DimensionVector::new(vec![0, 0]);
1841 let max = DimensionVector::new(vec![10, 10]);
1842
1843 let mut via_bbox = db
1844 .query_bbox(space, min.clone(), max.clone(), None)
1845 .unwrap();
1846 let snap = db.current_snapshot(space).unwrap();
1847 let q = Query::new(space, snap).with_range(SpatialRange::new(min, max));
1848 let mut via_execute = db.execute(&q).unwrap();
1849
1850 via_bbox.sort_by_key(|r| r.data.clone());
1851 via_execute.sort_by_key(|r| r.data.clone());
1852 assert_eq!(via_bbox.len(), 2);
1853 assert_eq!(
1854 via_bbox.iter().map(|r| r.data.clone()).collect::<Vec<_>>(),
1855 via_execute.iter().map(|r| r.data.clone()).collect::<Vec<_>>()
1856 );
1857 }
1858
1859 #[test]
1860 fn execute_include_tombstones_flag() {
1861 use crate::infinitedb_core::query::Query;
1862 let (mut db, _dir) = open_tmp();
1863 let space = SpaceId(1);
1864 let point = DimensionVector::new(vec![3, 3]);
1865 db.insert(space, point.clone(), vec![1]).unwrap();
1866 db.delete(space, point).unwrap();
1867
1868 let snap = SnapshotId(0);
1870 let default = db.execute(&Query::new(space, snap)).unwrap();
1871 assert_eq!(default.len(), 0, "tombstoned record hidden by default");
1872
1873 let with_tombstones = db
1874 .execute(&Query::new(space, snap).include_tombstones())
1875 .unwrap();
1876 assert!(
1877 with_tombstones.iter().any(|r| r.tombstone),
1878 "include_tombstones surfaces the tombstone"
1879 );
1880 }
1881
1882 #[test]
1883 fn block_sealed_replay_reconciles_after_partial_crash() {
1884 let dir = TempDir::new().unwrap();
1885 let space = SpaceId(1);
1886 let point = DimensionVector::new(vec![7, 9]);
1887 {
1888 let mut db = InfiniteDb::open(dir.path()).unwrap();
1889 db.register_space(SpaceConfig::new(space, "s", 2)).unwrap();
1890 let rev = db.insert(space, point.clone(), vec![5]).unwrap();
1891
1892 let block_id = db.alloc_block_id();
1896 let snap_id = db.alloc_snapshot_id();
1897 let record = Record {
1898 address: Address::new(space, point.clone()),
1899 revision: rev,
1900 data: vec![5],
1901 tombstone: false,
1902 };
1903 let mut block = Block {
1904 id: block_id,
1905 space,
1906 records: vec![record],
1907 min_revision: rev,
1908 max_revision: rev,
1909 checksum: [0u8; 32],
1910 };
1911 block.checksum = compute_checksum(&block).unwrap();
1912 db.store.write_block(&block).unwrap();
1913 db.wal
1914 .append(&WalEntry::BlockSealed { block_id, space, snapshot: snap_id })
1915 .unwrap();
1916 }
1917
1918 let mut db = InfiniteDb::open(dir.path()).unwrap();
1921 let results = db.query(space, None).unwrap();
1922 assert_eq!(results.len(), 1);
1923 assert_eq!(results[0].data, vec![5]);
1924 assert_eq!(db.snapshots.get(&space.0).unwrap().blocks.len(), 1);
1925 }
1926
1927 #[test]
1928 fn delete_tombstones_record() {
1929 let (mut db, _dir) = open_tmp();
1930 let space = SpaceId(1);
1931 let point = DimensionVector::new(vec![1, 1]);
1932 db.insert(space, point.clone(), vec![99]).unwrap();
1933 db.delete(space, point).unwrap();
1934 let results = db.query(space, None).unwrap();
1935 assert!(results.iter().all(|r| !r.tombstone));
1937 }
1938
1939 #[test]
1940 fn as_of_returns_historical_state() {
1941 let (mut db, _dir) = open_tmp();
1942 let space = SpaceId(1);
1943 let rev1 = db.insert(space, DimensionVector::new(vec![1, 0]), vec![1]).unwrap();
1944 let _rev2 = db.insert(space, DimensionVector::new(vec![2, 0]), vec![2]).unwrap();
1945 let results = db.query(space, Some(rev1)).unwrap();
1947 assert_eq!(results.len(), 1);
1948 assert_eq!(results[0].data, vec![1]);
1949 }
1950
1951 #[test]
1952 fn register_space_rejects_precision_overflow() {
1953 let (mut db, _dir) = open_tmp();
1954 let err = db
1955 .register_space(
1956 SpaceConfig::new(SpaceId(99), "big", 16).with_bits_per_dim(9),
1957 )
1958 .unwrap_err();
1959 assert!(err.contains("dims * bits_per_dim"));
1960 }
1961
1962 #[test]
1963 fn different_space_precision_produces_different_keys() {
1964 use crate::infinitedb_index::hilbert_key_for;
1965 use crate::infinitedb_index::composite::KeyConfig;
1966 let (mut db, _dir) = open_tmp();
1967 let coarse = SpaceId(10);
1968 let fine = SpaceId(11);
1969 db.register_space(SpaceConfig::new(coarse, "coarse", 2).with_bits_per_dim(4))
1970 .unwrap();
1971 db.register_space(SpaceConfig::new(fine, "fine", 2).with_bits_per_dim(8))
1972 .unwrap();
1973 let point = DimensionVector::new(vec![100, 200]);
1974 let k_coarse = hilbert_key_for(&point, KeyConfig { bits_per_dim: 4 });
1975 let k_fine = hilbert_key_for(&point, KeyConfig { bits_per_dim: 8 });
1976 assert_ne!(k_coarse, k_fine);
1977 db.insert(coarse, point.clone(), vec![1]).unwrap();
1978 db.insert(fine, point, vec![2]).unwrap();
1979 db.flush(coarse).unwrap();
1980 db.flush(fine).unwrap();
1981 }
1982
1983 #[test]
1984 fn endpoint_index_returns_incident_edges_only() {
1985 let (mut db, _dir) = open_tmp();
1986 let edge_space = SpaceId(50);
1987 db.register_space(SpaceConfig::new(edge_space, "edges", 2)).unwrap();
1988 let shared = EndpointRef {
1989 role: EndpointRole::new("hub"),
1990 space: SpaceId(1),
1991 node: DimensionVector::new(vec![5]),
1992 };
1993 let other = EndpointRef {
1994 role: EndpointRole::new("leaf"),
1995 space: SpaceId(2),
1996 node: DimensionVector::new(vec![99]),
1997 };
1998 for (id, ep_b) in [
1999 (1u64, DimensionVector::new(vec![10, 0])),
2000 (2, DimensionVector::new(vec![20, 0])),
2001 (3, DimensionVector::new(vec![30, 0])),
2002 ] {
2003 let edge = Hyperedge {
2004 id: HyperedgeId(id),
2005 kind: HyperedgeKind::new("link"),
2006 endpoints: vec![
2007 shared.clone(),
2008 EndpointRef {
2009 role: EndpointRole::new("peer"),
2010 space: SpaceId(3),
2011 node: ep_b,
2012 },
2013 ],
2014 weight_milli: None,
2015 metadata: Default::default(),
2016 valid_from: RevisionId::ZERO,
2017 valid_to: None,
2018 };
2019 db.insert_hyperedge(edge_space, edge).unwrap();
2020 }
2021 for id in [10u64, 11] {
2023 let edge = Hyperedge {
2024 id: HyperedgeId(id),
2025 kind: HyperedgeKind::new("other"),
2026 endpoints: vec![other.clone(), other.clone()],
2027 weight_milli: None,
2028 metadata: Default::default(),
2029 valid_from: RevisionId::ZERO,
2030 valid_to: None,
2031 };
2032 db.insert_hyperedge(edge_space, edge).unwrap();
2033 }
2034 db.flush(edge_space).unwrap();
2035 db.flush(ENDPOINT_INDEX_SPACE).unwrap();
2036 let found = db
2037 .query_hyperedges_for_endpoint(edge_space, &shared, None)
2038 .unwrap();
2039 assert_eq!(found.len(), 3, "only edges incident on the shared endpoint");
2040 }
2041
2042 #[test]
2043 fn traverse_respects_max_depth() {
2044 use crate::infinitedb_core::traversal::TraversalSpec;
2045 let (mut db, _dir) = open_tmp();
2046 let edge_space = SpaceId(60);
2047 db.register_space(SpaceConfig::new(edge_space, "edges", 2)).unwrap();
2048 let n0 = EndpointRef {
2049 role: EndpointRole::new("n"),
2050 space: SpaceId(10),
2051 node: DimensionVector::new(vec![1]),
2052 };
2053 let n1 = EndpointRef {
2054 role: EndpointRole::new("n"),
2055 space: SpaceId(11),
2056 node: DimensionVector::new(vec![2]),
2057 };
2058 let n2 = EndpointRef {
2059 role: EndpointRole::new("n"),
2060 space: SpaceId(12),
2061 node: DimensionVector::new(vec![3]),
2062 };
2063 let n3 = EndpointRef {
2064 role: EndpointRole::new("n"),
2065 space: SpaceId(13),
2066 node: DimensionVector::new(vec![4]),
2067 };
2068 db.insert_hyperedge(
2069 edge_space,
2070 Hyperedge {
2071 id: HyperedgeId(1),
2072 kind: HyperedgeKind::new("chain"),
2073 endpoints: vec![n0.clone(), n1.clone()],
2074 weight_milli: None,
2075 metadata: Default::default(),
2076 valid_from: RevisionId::ZERO,
2077 valid_to: None,
2078 },
2079 )
2080 .unwrap();
2081 db.insert_hyperedge(
2082 edge_space,
2083 Hyperedge {
2084 id: HyperedgeId(2),
2085 kind: HyperedgeKind::new("chain"),
2086 endpoints: vec![n1.clone(), n2.clone()],
2087 weight_milli: None,
2088 metadata: Default::default(),
2089 valid_from: RevisionId::ZERO,
2090 valid_to: None,
2091 },
2092 )
2093 .unwrap();
2094 db.insert_hyperedge(
2095 edge_space,
2096 Hyperedge {
2097 id: HyperedgeId(3),
2098 kind: HyperedgeKind::new("chain"),
2099 endpoints: vec![n2.clone(), n3.clone()],
2100 weight_milli: None,
2101 metadata: Default::default(),
2102 valid_from: RevisionId::ZERO,
2103 valid_to: None,
2104 },
2105 )
2106 .unwrap();
2107 db.flush(edge_space).unwrap();
2108 db.flush(ENDPOINT_INDEX_SPACE).unwrap();
2109
2110 assert!(
2111 db.query_hyperedges_for_endpoint(edge_space, &n0, None)
2112 .unwrap()
2113 .len()
2114 >= 1,
2115 "index must list edges incident on n0"
2116 );
2117 assert!(
2118 db.query_hyperedges_for_endpoint(edge_space, &n1, None)
2119 .unwrap()
2120 .len()
2121 >= 2,
2122 "index must list all edges incident on n1"
2123 );
2124
2125 let depth2 = db
2126 .traverse(
2127 edge_space,
2128 &TraversalSpec {
2129 start: n0.clone(),
2130 max_depth: 2,
2131 follow_kinds: None,
2132 as_of: None,
2133 },
2134 )
2135 .unwrap();
2136 assert!(
2137 depth2.edges.iter().any(|e| e.id == HyperedgeId(2)),
2138 "expected edge n1–n2 in subgraph, edges={:?}",
2139 depth2.edges.iter().map(|e| e.id.0).collect::<Vec<_>>()
2140 );
2141 assert!(
2142 depth2.nodes.iter().any(|n| n.space == SpaceId(12)),
2143 "expected n2 at depth 2, nodes={:?}",
2144 depth2
2145 .nodes
2146 .iter()
2147 .map(|n| (n.space.0, n.node.coords.clone()))
2148 .collect::<Vec<_>>()
2149 );
2150 assert!(!depth2.nodes.iter().any(|n| n.space == SpaceId(13)));
2151
2152 let depth1 = db
2153 .traverse(
2154 edge_space,
2155 &TraversalSpec {
2156 start: n0,
2157 max_depth: 1,
2158 follow_kinds: None,
2159 as_of: None,
2160 },
2161 )
2162 .unwrap();
2163 assert!(depth1.nodes.iter().any(|n| n.space == SpaceId(11)));
2164 assert!(!depth1.nodes.iter().any(|n| n.space == SpaceId(12)));
2165 }
2166
2167 #[test]
2168 fn compact_space_reduces_block_count() {
2169 use crate::infinitedb_storage::compaction::CompactionConfig;
2170 let (mut db, _dir) = open_tmp();
2171 let space = SpaceId(70);
2172 db.register_space(SpaceConfig::new(space, "data", 1)).unwrap();
2173 for i in 0..4u32 {
2174 db.insert(space, DimensionVector::new(vec![i]), vec![i as u8]).unwrap();
2175 db.flush(space).unwrap();
2176 }
2177 assert_eq!(db.snapshots.get(&space.0).unwrap().blocks.len(), 4);
2178 let result = db
2179 .compact_space(
2180 space,
2181 &CompactionConfig {
2182 max_records_per_block: 16,
2183 retain_history: true,
2184 },
2185 )
2186 .unwrap();
2187 assert_eq!(result.superseded.len(), 4);
2188 assert_eq!(result.new_blocks.len(), 1);
2189 assert_eq!(db.snapshots.get(&space.0).unwrap().blocks.len(), 1);
2190 let records = db.query(space, None).unwrap();
2191 assert_eq!(records.len(), 4);
2192 }
2193
2194 #[test]
2195 fn create_branch_succeeds() {
2196 let (mut db, _dir) = open_tmp();
2197 let main = BranchId(1);
2198 let feature = db.create_branch("feature", main).unwrap();
2199 assert_ne!(feature, main);
2200 }
2201
2202 fn clustered_edge(id: u64, a: Vec<u32>, b: Vec<u32>) -> Hyperedge {
2203 Hyperedge {
2204 id: HyperedgeId(id),
2205 kind: HyperedgeKind::new("near"),
2206 endpoints: vec![
2207 EndpointRef {
2208 role: EndpointRole::new("a"),
2209 space: SpaceId(1),
2210 node: DimensionVector::new(a),
2211 },
2212 EndpointRef {
2213 role: EndpointRole::new("b"),
2214 space: SpaceId(1),
2215 node: DimensionVector::new(b),
2216 },
2217 ],
2218 weight_milli: None,
2219 metadata: Default::default(),
2220 valid_from: RevisionId::ZERO,
2221 valid_to: None,
2222 }
2223 }
2224
2225 #[test]
2226 fn centroid_keying_clusters_nearby_edges() {
2227 let a = clustered_edge(1, vec![10, 10], vec![12, 12]); let b = clustered_edge(2, vec![13, 13], vec![15, 15]); let c = clustered_edge(3, vec![240, 240], vec![250, 250]); let pa = centroid_hyperedge_point(&a).unwrap();
2234 let pb = centroid_hyperedge_point(&b).unwrap();
2235 let pc = centroid_hyperedge_point(&c).unwrap();
2236
2237 let ka = hilbert_key_standard(&pa);
2238 let kb = hilbert_key_standard(&pb);
2239 let kc = hilbert_key_standard(&pc);
2240
2241 let near = ka.abs_diff(kb);
2242 let far = ka.abs_diff(kc);
2243 assert!(
2244 near < far,
2245 "nearby edges should cluster: near={near} far={far}"
2246 );
2247 }
2248
2249 #[test]
2250 fn centroid_keyed_edges_are_addressable_by_id() {
2251 let (mut db, _dir) = open_tmp();
2252 let edge_space = SpaceId(80);
2253 db.register_space(
2254 SpaceConfig::new(edge_space, "centroid_edges", 4).with_centroid_keying(),
2255 )
2256 .unwrap();
2257
2258 let edge = clustered_edge(7, vec![20, 30], vec![22, 34]);
2259 db.insert_hyperedge(edge_space, edge.clone()).unwrap();
2260 db.flush(edge_space).unwrap();
2261
2262 let fetched = db.fetch_hyperedge_by_id(edge_space, HyperedgeId(7), None).unwrap();
2264 assert!(fetched.is_some());
2265 assert_eq!(fetched.unwrap().id, HyperedgeId(7));
2266
2267 let incident = db
2269 .query_hyperedges_for_endpoint(edge_space, &edge.endpoints[0], None)
2270 .unwrap();
2271 assert!(incident.iter().any(|e| e.id == HyperedgeId(7)));
2272
2273 db.delete_hyperedge(edge_space, HyperedgeId(7)).unwrap();
2275 db.flush(edge_space).unwrap();
2276 let after = db.fetch_hyperedge_by_id(edge_space, HyperedgeId(7), None).unwrap();
2277 assert!(after.is_none(), "deleted edge must not be addressable");
2278 }
2279
2280 #[test]
2281 fn hyperedge_insert_query_and_delete() {
2282 let (mut db, _dir) = open_tmp();
2283 let edge_space = SpaceId(77);
2284 db.register_space(SpaceConfig::new(edge_space, "hyperedges", 2))
2285 .unwrap();
2286 let edge = Hyperedge {
2287 id: HyperedgeId(42),
2288 kind: HyperedgeKind::new("beam.bears_on"),
2289 endpoints: vec![
2290 EndpointRef {
2291 role: EndpointRole::new("parent"),
2292 space: SpaceId(10),
2293 node: DimensionVector::new(vec![100]),
2294 },
2295 EndpointRef {
2296 role: EndpointRole::new("support"),
2297 space: SpaceId(11),
2298 node: DimensionVector::new(vec![200]),
2299 },
2300 ],
2301 weight_milli: Some(1_000),
2302 metadata: std::collections::BTreeMap::new(),
2303 valid_from: RevisionId::ZERO,
2304 valid_to: None,
2305 };
2306 db.insert_hyperedge(edge_space, edge.clone()).unwrap();
2307 db.flush(edge_space).unwrap();
2308 let by_kind = db
2309 .query_hyperedges_by_kind(edge_space, "beam.bears_on", None)
2310 .unwrap();
2311 assert_eq!(by_kind.len(), 1);
2312 assert_eq!(by_kind[0].id.0, edge.id.0);
2313 db.delete_hyperedge(edge_space, edge.id).unwrap();
2314 let after_delete = db.query_hyperedges(edge_space, None).unwrap();
2315 assert!(after_delete.is_empty());
2316 }
2317
2318 #[test]
2319 fn signal_scope_and_range_queries() {
2320 let (mut db, _dir) = open_tmp();
2321 let signal_space = SpaceId(88);
2322 db.register_space(SpaceConfig::new(signal_space, "beam_signals", 3))
2323 .unwrap();
2324
2325 let scope = SignalScope {
2326 parent_prefix: DimensionVector::new(vec![7]),
2327 total_dims: 3,
2328 };
2329 db.insert_signal_sample(
2330 signal_space,
2331 SignalSample {
2332 signal_id: SignalId(1),
2333 kind: SignalKind::new("beam.bending_moment"),
2334 scope: scope.clone(),
2335 local_coords: vec![0, 0],
2336 value_milli: 10_000,
2337 source_revision: None,
2338 constraint: None,
2339 },
2340 )
2341 .unwrap();
2342 db.insert_signal_sample(
2343 signal_space,
2344 SignalSample {
2345 signal_id: SignalId(1),
2346 kind: SignalKind::new("beam.bending_moment"),
2347 scope,
2348 local_coords: vec![5, 0],
2349 value_milli: 20_000,
2350 source_revision: None,
2351 constraint: None,
2352 },
2353 )
2354 .unwrap();
2355 db.flush(signal_space).unwrap();
2356
2357 let scoped = db.query_signal_scope(signal_space, &[7], None).unwrap();
2358 assert_eq!(scoped.len(), 2);
2359 let ranged = db
2360 .query_signal_range(signal_space, &[7], &[0, 0], &[2, u32::MAX], None)
2361 .unwrap();
2362 assert_eq!(ranged.len(), 1);
2363 }
2364
2365 #[test]
2366 fn adapter_wrappers_accept_typed_kinds_and_space_binding() {
2367 let (mut db, _dir) = open_tmp();
2368 let edge_space = SpaceId(177);
2369 db.register_space(SpaceConfig::new(edge_space, "adapter_edges", 2))
2370 .unwrap();
2371 db.register_space(SpaceConfig::new(
2372 BeamSignalSpace::SPACE_ID,
2373 BeamSignalSpace::SPACE_NAME,
2374 BeamSignalSpace::DIMS,
2375 ))
2376 .unwrap();
2377
2378 let mut catalog = KindCatalog::new(UnknownKindPolicy::RejectUnknown);
2379 catalog.register_edge_kind(KindDefinition::new("beam.bears_on"));
2380 catalog.register_endpoint_role(KindDefinition::new("parent"));
2381 catalog.register_endpoint_role(KindDefinition::new("support"));
2382 catalog.register_signal_kind(KindDefinition::new("beam.bending_moment"));
2383
2384 db.insert_hyperedge_typed(
2385 edge_space,
2386 HyperedgeId(900),
2387 BeamKinds::BearsOn,
2388 vec![
2389 AdapterEndpoint::new("parent", SpaceId(1), DimensionVector::new(vec![10])),
2390 AdapterEndpoint::new("support", SpaceId(2), DimensionVector::new(vec![20])),
2391 ],
2392 Some(1000),
2393 std::collections::BTreeMap::new(),
2394 None,
2395 Some(&catalog),
2396 )
2397 .unwrap();
2398 db.flush(edge_space).unwrap();
2399 let edges = db
2400 .query_hyperedges_by_kind_typed(edge_space, BeamKinds::BearsOn, None)
2401 .unwrap();
2402 assert_eq!(edges.len(), 1);
2403
2404 db.insert_signal_sample_typed::<BeamSignalSpace, _>(
2405 SignalId(1),
2406 BeamKinds::BendingMoment,
2407 DimensionVector::new(vec![7]),
2408 vec![0, 0],
2409 1234,
2410 None,
2411 None,
2412 Some(&catalog),
2413 )
2414 .unwrap();
2415 }
2416
2417 #[test]
2418 fn adapter_rejects_unknown_kind_under_reject_policy() {
2419 let (mut db, _dir) = open_tmp();
2420 let edge_space = SpaceId(178);
2421 db.register_space(SpaceConfig::new(edge_space, "adapter_edges2", 2)).unwrap();
2422
2423 let catalog = KindCatalog::new(UnknownKindPolicy::RejectUnknown);
2424 let err = db
2425 .insert_hyperedge_typed(
2426 edge_space,
2427 HyperedgeId(901),
2428 "unknown.edge.kind",
2429 vec![
2430 AdapterEndpoint::new("parent", SpaceId(1), DimensionVector::new(vec![1])),
2431 AdapterEndpoint::new("support", SpaceId(2), DimensionVector::new(vec![2])),
2432 ],
2433 None,
2434 std::collections::BTreeMap::new(),
2435 None,
2436 Some(&catalog),
2437 )
2438 .unwrap_err();
2439 assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
2440 }
2441
2442 #[test]
2443 fn adapter_rejects_space_binding_dim_mismatch() {
2444 struct WrongDimSpace;
2445 impl SpaceBinding for WrongDimSpace {
2446 const SPACE_ID: SpaceId = SpaceId(188);
2447 const DIMS: usize = 4;
2448 }
2449 let (mut db, _dir) = open_tmp();
2450 db.register_space(SpaceConfig::new(SpaceId(188), "wrong_dim", 3)).unwrap();
2451 let err = db
2452 .insert_signal_sample_typed::<WrongDimSpace, _>(
2453 SignalId(2),
2454 "beam.any",
2455 DimensionVector::new(vec![7]),
2456 vec![0, 0],
2457 999,
2458 None,
2459 None,
2460 None,
2461 )
2462 .unwrap_err();
2463 assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
2464 }
2465
2466 #[cfg(feature = "sync")]
2467 struct AckTransport;
2468 #[cfg(feature = "sync")]
2469 impl SyncTransport for AckTransport {
2470 fn push_batch(&self, batch: &[SyncEnvelope]) -> Result<Vec<SyncResult>, String> {
2471 Ok(batch
2472 .iter()
2473 .map(|item| SyncResult::Ack { op_id: item.op_id })
2474 .collect())
2475 }
2476 }
2477
2478 #[cfg(feature = "sync")]
2479 struct FlakyTransport;
2480 #[cfg(feature = "sync")]
2481 impl SyncTransport for FlakyTransport {
2482 fn push_batch(&self, batch: &[SyncEnvelope]) -> Result<Vec<SyncResult>, String> {
2483 Ok(batch
2484 .iter()
2485 .map(|item| SyncResult::Retry {
2486 op_id: item.op_id,
2487 error: "offline".to_string(),
2488 })
2489 .collect())
2490 }
2491 }
2492
2493 #[cfg(feature = "sync")]
2494 struct StaleConflictTransport;
2495 #[cfg(feature = "sync")]
2496 impl SyncTransport for StaleConflictTransport {
2497 fn push_batch(&self, batch: &[SyncEnvelope]) -> Result<Vec<SyncResult>, String> {
2498 Ok(batch
2499 .iter()
2500 .map(|item| SyncResult::ConflictStale {
2501 op_id: item.op_id,
2502 reason: "stale write".to_string(),
2503 })
2504 .collect())
2505 }
2506 }
2507
2508 #[cfg(feature = "sync")]
2509 #[test]
2510 fn outbox_survives_restart() {
2511 let dir = TempDir::new().unwrap();
2512 let space = SpaceId(1);
2513 {
2514 let mut db = InfiniteDb::open(dir.path()).unwrap();
2515 db.insert(space, DimensionVector::new(vec![1, 2]), vec![3]).unwrap();
2516 assert_eq!(db.sync_pending_count(), 1);
2517 }
2518 let db = InfiniteDb::open(dir.path()).unwrap();
2519 assert_eq!(db.sync_pending_count(), 1);
2520 }
2521
2522 #[cfg(feature = "sync")]
2523 #[test]
2524 fn offline_queue_then_manual_sync_drains() {
2525 let (mut db, _dir) = open_tmp();
2526 let space = SpaceId(1);
2527 db.insert(space, DimensionVector::new(vec![10, 10]), vec![7]).unwrap();
2528 let retry_report = db.sync_now(&FlakyTransport, 32).unwrap();
2529 assert_eq!(retry_report.retried, 1);
2530 assert_eq!(db.sync_pending_count(), 1);
2531 std::thread::sleep(Duration::from_millis(2100));
2532 let ack_report = db.sync_now(&AckTransport, 32).unwrap();
2533 assert_eq!(ack_report.acked, 1);
2534 assert_eq!(db.sync_pending_count(), 0);
2535 }
2536
2537 #[cfg(feature = "sync")]
2538 #[test]
2539 fn stale_conflict_is_dropped_under_lww() {
2540 let (mut db, _dir) = open_tmp();
2541 let space = SpaceId(1);
2542 db.insert(space, DimensionVector::new(vec![11, 11]), vec![8]).unwrap();
2543 let report = db.sync_now(&StaleConflictTransport, 32).unwrap();
2544 assert_eq!(report.dropped_stale, 1);
2545 assert_eq!(db.sync_pending_count(), 0);
2546 }
2547
2548 #[cfg(feature = "sync")]
2549 #[test]
2550 fn background_worker_retries_and_acks() {
2551 let (mut db, _dir) = open_tmp();
2552 let space = SpaceId(1);
2553 db.insert(space, DimensionVector::new(vec![20, 20]), vec![1]).unwrap();
2554 db.start_background_sync(
2555 Arc::new(AckTransport),
2556 Duration::from_millis(20),
2557 16,
2558 )
2559 .unwrap();
2560 std::thread::sleep(Duration::from_millis(120));
2561 db.stop_background_sync();
2562 assert_eq!(db.sync_pending_count(), 0);
2563 }
2564}