1use std::io;
4use std::path::{Path, PathBuf};
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::sync::Arc;
7
8use bincode::{config::standard, decode_from_slice, encode_to_vec};
9use parking_lot::{Mutex, RwLock};
10
11use crate::engine::branch_overlay::BranchOverlayStore;
12use crate::engine::coordinator::SpaceCoordinator;
13use crate::engine::hilbert_coordinator::HilbertCoordinator;
14use crate::engine::hilbert_live_tails::HilbertLiveTails;
15use crate::engine::io_thread::{open_io_pipeline, IoStats, IoThreadConfig, IoThreadHandle};
16use crate::engine::live_tail::LiveTailView;
17use crate::engine::merge::merge_branches;
18use crate::engine::query::{query_bbox, query_inner, snapshots_map_for_persist};
19use crate::engine::snapshot_store::SnapshotStore;
20use crate::engine::space_live_tails::SpaceLiveTails;
21use crate::engine::write_queue::{WriteJob, WriteQueueSender};
22use crate::infinitedb_core::{
23 address::{Address, DimensionVector, RevisionId, SpaceId},
24 block::Record,
25 branch::{Branch, BranchId, BranchRegistry},
26 merge::{MergeConflict, MergeResult, MergeStrategy},
27 space::{SpaceConfig, SpaceRegistry},
28 snapshot::SnapshotId,
29};
30use crate::infinitedb_storage::{
31 format::{FormatVersion, FORMAT_VERSION_V2, FORMAT_VERSION_V3, FORMAT_VERSION_V4},
32 nvme::BlockStore,
33 wal::WalEntry,
34};
35
36#[derive(Debug, Clone)]
38pub struct OpenOptions {
39 pub io_thread: IoThreadConfig,
41 pub block_cache_bytes: usize,
43 pub format_version: Option<u32>,
45}
46
47impl Default for OpenOptions {
48 fn default() -> Self {
49 Self {
50 io_thread: IoThreadConfig::default(),
51 block_cache_bytes: 10 * 1024 * 1024,
52 format_version: None,
53 }
54 }
55}
56
57impl OpenOptions {
58 pub fn open<P: AsRef<Path>>(&self, dir: P) -> io::Result<InfiniteDb> {
60 InfiniteDb::open_with_options(dir, self)
61 }
62}
63
64enum WriteBackend {
65 V2 {
67 queue: WriteQueueSender,
68 io_handle: Mutex<IoThreadHandle>,
69 live_tail: Arc<LiveTailView>,
70 },
71 V3 {
73 coordinator: SpaceCoordinator,
74 },
75 V4 {
77 coordinator: HilbertCoordinator,
78 },
79}
80
81pub struct InfiniteDb {
83 root: PathBuf,
84 format_version: u32,
85 pub(crate) store: Arc<BlockStore>,
86 pub(crate) spaces: Arc<RwLock<SpaceRegistry>>,
87 branches: Arc<RwLock<BranchRegistry>>,
88 pub(crate) snapshots: Arc<SnapshotStore>,
89 pub(crate) revision: Arc<AtomicU64>,
90 next_block_id: Arc<AtomicU64>,
91 next_snapshot_id: Arc<AtomicU64>,
92 next_branch_id: Arc<AtomicU64>,
93 pub(crate) branch_overlays: Arc<BranchOverlayStore>,
94 #[cfg(feature = "sync")]
95 conflicts: Arc<crate::infinitedb_sync::conflict_queue::ConflictQueue>,
96 backend: WriteBackend,
97}
98
99impl InfiniteDb {
100 pub fn open<P: AsRef<Path>>(dir: P) -> io::Result<Self> {
102 OpenOptions::default().open(dir)
103 }
104
105 pub fn open_with_options<P: AsRef<Path>>(dir: P, options: &OpenOptions) -> io::Result<Self> {
107 let root = dir.as_ref().to_path_buf();
108 let store = Arc::new(BlockStore::open_with_cache(
109 root.clone(),
110 options.block_cache_bytes,
111 )?);
112
113 let format_version = match FormatVersion::read_from_meta(&root.join("meta"))? {
114 Some(v) => v.0,
115 None => options.format_version.unwrap_or(FORMAT_VERSION_V4),
116 };
117
118 match format_version {
119 FORMAT_VERSION_V2 | FORMAT_VERSION_V3 | FORMAT_VERSION_V4 => {}
120 other => {
121 return Err(io::Error::new(
122 io::ErrorKind::InvalidData,
123 format!("unsupported concurrent format version {other}"),
124 ));
125 }
126 }
127
128 if FormatVersion::read_from_meta(&root.join("meta"))?.is_none() {
129 FormatVersion(format_version).write_to_meta(&root.join("meta"))?;
130 if format_version == FORMAT_VERSION_V2 {
131 std::fs::create_dir_all(root.join("hot"))?;
132 std::fs::create_dir_all(root.join("wal"))?;
133 } else {
134 std::fs::create_dir_all(root.join("spaces"))?;
135 }
136 }
137
138 let branch_overlays = Arc::new(BranchOverlayStore::new());
139 #[cfg(feature = "sync")]
140 let conflicts = Arc::new(crate::infinitedb_sync::conflict_queue::ConflictQueue::open(&root)?);
141
142 let (spaces, branches, snapshots, next_rev, next_block, next_snap, next_branch) =
143 load_meta(&store).unwrap_or_else(default_meta);
144
145 let spaces = Arc::new(RwLock::new(spaces));
146 let branches = Arc::new(RwLock::new(branches));
147 let snapshots = Arc::new(SnapshotStore::new(snapshots));
148 let revision = Arc::new(AtomicU64::new(next_rev));
149 let next_block_id = Arc::new(AtomicU64::new(next_block));
150 let next_snapshot_id = Arc::new(AtomicU64::new(next_snap));
151 let next_branch_id = Arc::new(AtomicU64::new(next_branch));
152
153 if branches.read().get_by_name("main").is_none() {
154 let snap_id = SnapshotId(next_snap);
155 let _ = branches.write().insert(Branch {
156 id: BranchId(1),
157 name: "main".to_string(),
158 head: snap_id,
159 parent: None,
160 forked_at: RevisionId::ZERO,
161 });
162 }
163
164 let backend = if format_version == FORMAT_VERSION_V4 {
165 let coordinator = HilbertCoordinator::new(
166 root.clone(),
167 Arc::clone(&store),
168 Arc::clone(&snapshots),
169 Arc::clone(&branch_overlays),
170 Arc::clone(&spaces),
171 Arc::clone(&next_block_id),
172 Arc::clone(&next_snapshot_id),
173 options.io_thread.clone(),
174 );
175 coordinator.bootstrap_registered_spaces()?;
176 WriteBackend::V4 { coordinator }
177 } else if format_version == FORMAT_VERSION_V3 {
178 let coordinator = SpaceCoordinator::new(
179 root.clone(),
180 Arc::clone(&store),
181 Arc::clone(&snapshots),
182 Arc::clone(&spaces),
183 Arc::clone(&next_block_id),
184 Arc::clone(&next_snapshot_id),
185 options.io_thread.clone(),
186 );
187 coordinator.bootstrap_registered_spaces()?;
188 WriteBackend::V3 { coordinator }
189 } else {
190 let live_tail = Arc::new(LiveTailView::new());
191 let (queue, io_handle) = open_io_pipeline(
192 root.clone(),
193 Arc::clone(&store),
194 Arc::clone(&snapshots),
195 Arc::clone(&live_tail),
196 Arc::clone(&spaces),
197 Arc::clone(&revision),
198 Arc::clone(&next_block_id),
199 Arc::clone(&next_snapshot_id),
200 options.io_thread.clone(),
201 );
202 WriteBackend::V2 {
203 queue,
204 io_handle: Mutex::new(io_handle),
205 live_tail,
206 }
207 };
208
209 Ok(Self {
210 root,
211 format_version,
212 store,
213 spaces,
214 branches,
215 snapshots,
216 revision,
217 next_block_id,
218 next_snapshot_id,
219 next_branch_id,
220 branch_overlays,
221 #[cfg(feature = "sync")]
222 conflicts,
223 backend,
224 })
225 }
226
227 pub fn branch_head(&self, branch: BranchId) -> Option<SnapshotId> {
229 self.branches.read().get(branch).map(|b| b.head)
230 }
231
232 pub fn branch_id(&self, name: &str) -> Option<BranchId> {
234 self.branches.read().get_by_name(name).map(|b| b.id)
235 }
236
/// Conflict queue opened next to the database (only with the `sync` feature).
#[cfg(feature = "sync")]
pub fn conflicts(&self) -> &crate::infinitedb_sync::conflict_queue::ConflictQueue {
    self.conflicts.as_ref()
}
242
243 pub fn format_version(&self) -> u32 {
245 self.format_version
246 }
247
248 pub fn register_space(&self, config: SpaceConfig) -> Result<(), String> {
250 if config.bits_per_dim == 0 {
251 return Err("bits_per_dim must be at least 1".to_string());
252 }
253 if config.dims as u32 * config.bits_per_dim > 128 {
254 return Err(format!(
255 "dims * bits_per_dim must be <= 128 (got {} * {})",
256 config.dims, config.bits_per_dim
257 ));
258 }
259 let space_id = config.id.0;
260 let shard_bits = config.shard_bits;
261 self.spaces
262 .write()
263 .register(config)
264 .map_err(|e| format!("{:?}", e))?;
265 match &self.backend {
266 WriteBackend::V3 { coordinator } => {
267 coordinator
268 .ensure_space(space_id)
269 .map_err(|e| e.to_string())?;
270 }
271 WriteBackend::V4 { coordinator } => {
272 let count = crate::engine::hilbert_shard::shard_count(shard_bits);
273 for shard_id in 0..count {
274 coordinator
275 .ensure_shard(space_id, shard_id)
276 .map_err(|e| e.to_string())?;
277 }
278 }
279 WriteBackend::V2 { .. } => {}
280 }
281 self.persist_meta().map_err(|e| e.to_string())?;
282 Ok(())
283 }
284
285 pub fn insert(
287 &self,
288 space: SpaceId,
289 point: DimensionVector,
290 data: Vec<u8>,
291 ) -> io::Result<RevisionId> {
292 self.insert_on_branch(BranchId::MAIN, space, point, data)
293 }
294
295 pub fn insert_on_branch(
297 &self,
298 branch: BranchId,
299 space: SpaceId,
300 point: DimensionVector,
301 data: Vec<u8>,
302 ) -> io::Result<RevisionId> {
303 let rev = self.next_revision();
304 let address = Address::new(space, point);
305 let entry = WalEntry::Write {
306 address: address.clone(),
307 revision: rev,
308 data: data.clone(),
309 };
310 let record = Record {
311 address,
312 revision: rev,
313 data,
314 tombstone: false,
315 };
316 let job = WriteJob {
317 branch_id: branch,
318 revision: rev,
319 entry,
320 record,
321 };
322 self.enqueue(job)?;
323 Ok(rev)
324 }
325
326 pub fn delete(&self, space: SpaceId, point: DimensionVector) -> io::Result<RevisionId> {
328 self.delete_on_branch(BranchId::MAIN, space, point)
329 }
330
331 pub fn delete_on_branch(
333 &self,
334 branch: BranchId,
335 space: SpaceId,
336 point: DimensionVector,
337 ) -> io::Result<RevisionId> {
338 let rev = self.next_revision();
339 let address = Address::new(space, point);
340 let entry = WalEntry::Tombstone {
341 address: address.clone(),
342 revision: rev,
343 };
344 let record = Record {
345 address,
346 revision: rev,
347 data: vec![],
348 tombstone: true,
349 };
350 let job = WriteJob {
351 branch_id: branch,
352 revision: rev,
353 entry,
354 record,
355 };
356 self.enqueue(job)?;
357 Ok(rev)
358 }
359
360 pub fn create_branch(&self, name: &str, from: BranchId) -> Result<BranchId, String> {
362 let parent = self
363 .branches
364 .read()
365 .get(from)
366 .ok_or_else(|| format!("parent branch {:?} not found", from))?
367 .clone();
368 let id = BranchId(self.next_branch_id.fetch_add(1, Ordering::Relaxed));
369 let forked_at = RevisionId(self.revision.load(Ordering::Relaxed));
370 let branch = Branch {
371 id,
372 name: name.to_string(),
373 head: parent.head,
374 parent: Some(from),
375 forked_at,
376 };
377 self.branches
378 .write()
379 .insert(branch)
380 .map_err(|e| format!("{:?}", e))?;
381 self.persist_meta().map_err(|e| e.to_string())?;
382 Ok(id)
383 }
384
385 pub fn merge_branch(
387 &self,
388 target: BranchId,
389 source: BranchId,
390 strategy: MergeStrategy,
391 resolver: Option<Box<dyn Fn(MergeConflict) -> Record + Send + Sync>>,
392 ) -> io::Result<MergeResult> {
393 self.sync()?;
394 let ctx = self.query_ctx();
395 let mut result = merge_branches(
396 &self.store,
397 &self.snapshots,
398 ctx.live_tail,
399 ctx.space_tails,
400 ctx.hilbert_tails,
401 &self.branch_overlays,
402 &self.spaces.read(),
403 &self.revision,
404 &self.branches.read(),
405 target,
406 source,
407 strategy,
408 resolver.as_deref(),
409 )?;
410 if strategy == MergeStrategy::Interactive && !result.conflicts.is_empty() {
411 return Ok(result);
412 }
413 let applied = std::mem::take(&mut result.applied_records);
414 for record in applied {
415 if record.tombstone {
416 self.delete_on_branch(target, record.address.space, record.address.point)?;
417 } else {
418 self.insert_on_branch(
419 target,
420 record.address.space,
421 record.address.point,
422 record.data,
423 )?;
424 }
425 }
426 self.branch_overlays.clear_branch(source);
427 self.sync()?;
428 Ok(result)
429 }
430
431 pub fn query_on_branch(
433 &self,
434 branch: BranchId,
435 space: SpaceId,
436 as_of: Option<RevisionId>,
437 ) -> io::Result<Vec<Record>> {
438 let ctx = self.query_ctx();
439 let branch_id = if branch == BranchId::MAIN {
440 None
441 } else {
442 Some(branch)
443 };
444 query_inner(
445 &self.store,
446 &self.snapshots,
447 ctx.live_tail,
448 ctx.space_tails,
449 &self.spaces.read(),
450 &self.revision,
451 space,
452 None,
453 as_of,
454 false,
455 ctx.hilbert_tails,
456 Some(&self.branch_overlays),
457 branch_id,
458 )
459 }
460
461 pub fn enqueue_batch(&self, jobs: Vec<WriteJob>) -> io::Result<()> {
466 for job in &jobs {
467 self.revision
468 .fetch_max(job.revision.0, Ordering::Relaxed);
469 }
470 let mut main_jobs = Vec::with_capacity(jobs.len());
471 for job in jobs {
472 if job.branch_id != BranchId::MAIN {
473 self.branch_overlays
474 .append(job.branch_id, job.record.address.space, job.record);
475 } else {
476 main_jobs.push(job);
477 }
478 }
479 if main_jobs.is_empty() {
480 return Ok(());
481 }
482 match &self.backend {
483 WriteBackend::V4 { coordinator } => coordinator.enqueue_batch(main_jobs),
484 WriteBackend::V3 { coordinator } => coordinator.enqueue_batch(main_jobs),
485 WriteBackend::V2 { queue, .. } => {
486 for job in main_jobs {
487 queue.enqueue_write(job)?;
488 }
489 Ok(())
490 }
491 }
492 }
493
494 pub fn query(
496 &self,
497 space: SpaceId,
498 as_of: Option<RevisionId>,
499 ) -> io::Result<Vec<Record>> {
500 self.query_on_branch(BranchId::MAIN, space, as_of)
501 }
502
503 pub fn query_bbox(
505 &self,
506 space: SpaceId,
507 min: DimensionVector,
508 max: DimensionVector,
509 as_of: Option<RevisionId>,
510 ) -> io::Result<Vec<Record>> {
511 self.query_bbox_on_branch(BranchId::MAIN, space, min, max, as_of)
512 }
513
514 pub fn query_bbox_on_branch(
516 &self,
517 branch: BranchId,
518 space: SpaceId,
519 min: DimensionVector,
520 max: DimensionVector,
521 as_of: Option<RevisionId>,
522 ) -> io::Result<Vec<Record>> {
523 let ctx = self.query_ctx();
524 let branch_id = if branch == BranchId::MAIN {
525 None
526 } else {
527 Some(branch)
528 };
529 query_bbox(
530 &self.store,
531 &self.snapshots,
532 ctx.live_tail,
533 ctx.space_tails,
534 &self.spaces.read(),
535 &self.revision,
536 space,
537 min,
538 max,
539 as_of,
540 ctx.hilbert_tails,
541 Some(&self.branch_overlays),
542 branch_id,
543 )
544 }
545
546 pub fn flush(&self, space: SpaceId) -> io::Result<()> {
548 match &self.backend {
549 WriteBackend::V4 { coordinator } => coordinator.flush_space(space.0)?,
550 WriteBackend::V3 { coordinator } => coordinator.flush_space(space.0)?,
551 WriteBackend::V2 { queue, .. } => queue.request_flush(space.0)?,
552 }
553 self.persist_meta()
554 }
555
556 pub fn sync(&self) -> io::Result<()> {
558 match &self.backend {
559 WriteBackend::V4 { coordinator } => coordinator.sync_all()?,
560 WriteBackend::V3 { coordinator } => coordinator.sync_all()?,
561 WriteBackend::V2 { queue, .. } => queue.request_sync()?,
562 }
563 self.persist_meta()
564 }
565
566 pub fn revision(&self) -> u64 {
568 self.revision.load(Ordering::Relaxed)
569 }
570
571 pub fn read(&self) -> crate::concurrent::read_txn::ReadTxn<'_> {
573 crate::concurrent::read_txn::ReadTxn::new(self)
574 }
575
576 pub fn io_stats(&self) -> IoStats {
578 match &self.backend {
579 WriteBackend::V4 { coordinator } => coordinator.io_stats(),
580 WriteBackend::V3 { coordinator } => coordinator.io_stats(),
581 WriteBackend::V2 { queue, io_handle, .. } => {
582 let handle = io_handle.lock();
583 IoStats {
584 queue_depth: queue.queued_count(),
585 direct_writes: handle.direct_writes(),
586 staged_writes: handle.staged_writes(),
587 staging_wal_frames: 0,
588 }
589 }
590 }
591 }
592
593 pub fn space_shard_count(&self) -> usize {
595 match &self.backend {
596 WriteBackend::V4 { coordinator } => coordinator.shard_count(),
597 WriteBackend::V3 { coordinator } => coordinator.shard_count(),
598 WriteBackend::V2 { .. } => 1,
599 }
600 }
601
602 pub(crate) fn live_tail_refs(&self) -> (Option<&LiveTailView>, Option<&SpaceLiveTails>) {
603 let ctx = self.query_ctx();
604 (ctx.live_tail, ctx.space_tails)
605 }
606
607 pub(crate) fn query_ctx(&self) -> QueryCtx<'_> {
608 match &self.backend {
609 WriteBackend::V2 { live_tail, .. } => QueryCtx {
610 live_tail: Some(live_tail.as_ref()),
611 space_tails: None,
612 hilbert_tails: None,
613 },
614 WriteBackend::V3 { coordinator } => QueryCtx {
615 live_tail: None,
616 space_tails: Some(coordinator.live_tails()),
617 hilbert_tails: None,
618 },
619 WriteBackend::V4 { coordinator } => QueryCtx {
620 live_tail: None,
621 space_tails: None,
622 hilbert_tails: Some(coordinator.live_tails()),
623 },
624 }
625 }
626
627 fn enqueue(&self, job: WriteJob) -> io::Result<()> {
628 if job.branch_id != BranchId::MAIN {
629 self.branch_overlays
630 .append(job.branch_id, job.record.address.space, job.record);
631 return Ok(());
632 }
633 match &self.backend {
634 WriteBackend::V4 { coordinator } => coordinator.enqueue_write(job),
635 WriteBackend::V3 { coordinator } => coordinator.enqueue_write(job),
636 WriteBackend::V2 { queue, .. } => queue.enqueue_write(job),
637 }
638 }
639
640 fn next_revision(&self) -> RevisionId {
641 RevisionId(self.revision.fetch_add(1, Ordering::Relaxed) + 1)
642 }
643
644 fn persist_meta(&self) -> io::Result<()> {
645 let spaces_bytes = encode_to_vec(&*self.spaces.read(), standard())
646 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
647 self.store.write_meta("spaces.bin", &spaces_bytes)?;
648
649 let branches_bytes = encode_to_vec(&*self.branches.read(), standard())
650 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
651 self.store.write_meta("branches.bin", &branches_bytes)?;
652
653 let snapshots = snapshots_map_for_persist(&self.snapshots);
654 let snapshots_bytes = encode_to_vec(&snapshots, standard())
655 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
656 self.store.write_meta("snapshots.bin", &snapshots_bytes)?;
657
658 let counters: [u64; 4] = [
659 self.revision.load(Ordering::Relaxed),
660 self.next_block_id.load(Ordering::Relaxed),
661 self.next_snapshot_id.load(Ordering::Relaxed),
662 self.next_branch_id.load(Ordering::Relaxed),
663 ];
664 let counters_bytes = encode_to_vec(&counters, standard())
665 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
666 self.store.write_meta("counters.bin", &counters_bytes)?;
667 Ok(())
668 }
669}
670
671impl Drop for InfiniteDb {
672 fn drop(&mut self) {
673 let _ = self.persist_meta();
674 match &self.backend {
675 WriteBackend::V4 { coordinator } => {
676 let _ = coordinator.shutdown_all();
677 }
678 WriteBackend::V3 { coordinator } => {
679 let _ = coordinator.shutdown_all();
680 }
681 WriteBackend::V2 { queue, io_handle, .. } => {
682 let _ = queue.shutdown();
683 let _ = io_handle.lock().join();
684 }
685 }
686 }
687}
688
689type MetaTuple = (
690 SpaceRegistry,
691 BranchRegistry,
692 std::collections::BTreeMap<u64, crate::infinitedb_core::snapshot::Snapshot>,
693 u64,
694 u64,
695 u64,
696 u64,
697);
698
699fn load_meta(store: &BlockStore) -> Option<MetaTuple> {
700 let counters_bytes = store.read_meta("counters.bin").ok()?;
701 let (revision, next_block, next_snapshot, next_branch) =
702 match decode_from_slice::<[u64; 4], _>(&counters_bytes, standard()) {
703 Ok((c, _)) => (c[0], c[1], c[2], c[3]),
704 Err(_) => {
705 let (c, _): ([u64; 3], _) = decode_from_slice(&counters_bytes, standard()).ok()?;
706 (c[0], c[1], c[2], 2)
707 }
708 };
709
710 let spaces_bytes = store.read_meta("spaces.bin").ok()?;
711 let (spaces, _): (SpaceRegistry, _) = decode_from_slice(&spaces_bytes, standard()).ok()?;
712
713 let branches = store
714 .read_meta("branches.bin")
715 .ok()
716 .and_then(|b| decode_from_slice::<BranchRegistry, _>(&b, standard()).ok())
717 .map(|(r, _)| r)
718 .unwrap_or_else(BranchRegistry::new);
719
720 let snapshots = store
721 .read_meta("snapshots.bin")
722 .ok()
723 .and_then(|b| {
724 decode_from_slice::<
725 std::collections::BTreeMap<u64, crate::infinitedb_core::snapshot::Snapshot>,
726 _,
727 >(&b, standard())
728 .ok()
729 })
730 .map(|(m, _)| m)
731 .unwrap_or_default();
732
733 Some((
734 spaces,
735 branches,
736 snapshots,
737 revision,
738 next_block,
739 next_snapshot,
740 next_branch,
741 ))
742}
743
744pub(crate) struct QueryCtx<'a> {
745 pub live_tail: Option<&'a LiveTailView>,
746 pub space_tails: Option<&'a SpaceLiveTails>,
747 pub hilbert_tails: Option<&'a HilbertLiveTails>,
748}
749
750fn default_meta() -> MetaTuple {
751 (
752 SpaceRegistry::new(),
753 BranchRegistry::new(),
754 std::collections::BTreeMap::new(),
755 0,
756 1,
757 1,
758 2,
759 )
760}