Skip to main content

infinite_db/concurrent/
concurrent_db.rs

//! [`InfiniteDb`] — fire-and-forget writes with global I/O (v2), per-space I/O (v3), or Hilbert-sharded I/O with branch overlays (v4).
2
3use std::io;
4use std::path::{Path, PathBuf};
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::sync::Arc;
7
8use bincode::{config::standard, decode_from_slice, encode_to_vec};
9use parking_lot::{Mutex, RwLock};
10
11use crate::engine::branch_overlay::BranchOverlayStore;
12use crate::engine::coordinator::SpaceCoordinator;
13use crate::engine::hilbert_coordinator::HilbertCoordinator;
14use crate::engine::hilbert_live_tails::HilbertLiveTails;
15use crate::engine::io_thread::{open_io_pipeline, IoStats, IoThreadConfig, IoThreadHandle};
16use crate::engine::live_tail::LiveTailView;
17use crate::engine::merge::merge_branches;
18use crate::engine::query::{query_bbox, query_inner, snapshots_map_for_persist};
19use crate::engine::snapshot_store::SnapshotStore;
20use crate::engine::space_live_tails::SpaceLiveTails;
21use crate::engine::write_queue::{WriteJob, WriteQueueSender};
22use crate::infinitedb_core::{
23    address::{Address, DimensionVector, RevisionId, SpaceId},
24    block::Record,
25    branch::{Branch, BranchId, BranchRegistry},
26    merge::{MergeConflict, MergeResult, MergeStrategy},
27    space::{SpaceConfig, SpaceRegistry},
28    snapshot::SnapshotId,
29};
30use crate::infinitedb_storage::{
31    format::{FormatVersion, FORMAT_VERSION_V2, FORMAT_VERSION_V3, FORMAT_VERSION_V4},
32    nvme::BlockStore,
33    wal::WalEntry,
34};
35
36/// Options for opening [`InfiniteDb`] (formats v2–v4).
37#[derive(Debug, Clone)]
38pub struct OpenOptions {
39    /// I/O thread queue depth, staging, and durability settings.
40    pub io_thread: IoThreadConfig,
41    /// In-memory block cache size in bytes for the block store.
42    pub block_cache_bytes: usize,
43    /// When `None`, new databases use format v4 (Hilbert shards + branches).
44    pub format_version: Option<u32>,
45}
46
47impl Default for OpenOptions {
48    fn default() -> Self {
49        Self {
50            io_thread: IoThreadConfig::default(),
51            block_cache_bytes: 10 * 1024 * 1024,
52            format_version: None,
53        }
54    }
55}
56
57impl OpenOptions {
58    /// Open or create a database at `dir` using these options.
59    pub fn open<P: AsRef<Path>>(&self, dir: P) -> io::Result<InfiniteDb> {
60        InfiniteDb::open_with_options(dir, self)
61    }
62}
63
64enum WriteBackend {
65    /// Format v2: single global I/O thread.
66    V2 {
67        queue: WriteQueueSender,
68        io_handle: Mutex<IoThreadHandle>,
69        live_tail: Arc<LiveTailView>,
70    },
71    /// Format v3: one I/O thread per space.
72    V3 {
73        coordinator: SpaceCoordinator,
74    },
75    /// Format v4: Hilbert shards per space + branch overlays.
76    V4 {
77        coordinator: HilbertCoordinator,
78    },
79}
80
81/// Thread-safe embedded database with concurrent reads and fire-and-forget writes.
82pub struct InfiniteDb {
83    root: PathBuf,
84    format_version: u32,
85    pub(crate) store: Arc<BlockStore>,
86    pub(crate) spaces: Arc<RwLock<SpaceRegistry>>,
87    branches: Arc<RwLock<BranchRegistry>>,
88    pub(crate) snapshots: Arc<SnapshotStore>,
89    pub(crate) revision: Arc<AtomicU64>,
90    next_block_id: Arc<AtomicU64>,
91    next_snapshot_id: Arc<AtomicU64>,
92    next_branch_id: Arc<AtomicU64>,
93    pub(crate) branch_overlays: Arc<BranchOverlayStore>,
94    #[cfg(feature = "sync")]
95    conflicts: Arc<crate::infinitedb_sync::conflict_queue::ConflictQueue>,
96    backend: WriteBackend,
97}
98
99impl InfiniteDb {
100    /// Open or create a database at `dir` with default [`OpenOptions`].
101    pub fn open<P: AsRef<Path>>(dir: P) -> io::Result<Self> {
102        OpenOptions::default().open(dir)
103    }
104
105    /// Open or create a database at `dir` with explicit tuning and format version.
106    pub fn open_with_options<P: AsRef<Path>>(dir: P, options: &OpenOptions) -> io::Result<Self> {
107        let root = dir.as_ref().to_path_buf();
108        let store = Arc::new(BlockStore::open_with_cache(
109            root.clone(),
110            options.block_cache_bytes,
111        )?);
112
113        let format_version = match FormatVersion::read_from_meta(&root.join("meta"))? {
114            Some(v) => v.0,
115            None => options.format_version.unwrap_or(FORMAT_VERSION_V4),
116        };
117
118        match format_version {
119            FORMAT_VERSION_V2 | FORMAT_VERSION_V3 | FORMAT_VERSION_V4 => {}
120            other => {
121                return Err(io::Error::new(
122                    io::ErrorKind::InvalidData,
123                    format!("unsupported concurrent format version {other}"),
124                ));
125            }
126        }
127
128        if FormatVersion::read_from_meta(&root.join("meta"))?.is_none() {
129            FormatVersion(format_version).write_to_meta(&root.join("meta"))?;
130            if format_version == FORMAT_VERSION_V2 {
131                std::fs::create_dir_all(root.join("hot"))?;
132                std::fs::create_dir_all(root.join("wal"))?;
133            } else {
134                std::fs::create_dir_all(root.join("spaces"))?;
135            }
136        }
137
138        let branch_overlays = Arc::new(BranchOverlayStore::new());
139        #[cfg(feature = "sync")]
140        let conflicts = Arc::new(crate::infinitedb_sync::conflict_queue::ConflictQueue::open(&root)?);
141
142        let (spaces, branches, snapshots, next_rev, next_block, next_snap, next_branch) =
143            load_meta(&store).unwrap_or_else(default_meta);
144
145        let spaces = Arc::new(RwLock::new(spaces));
146        let branches = Arc::new(RwLock::new(branches));
147        let snapshots = Arc::new(SnapshotStore::new(snapshots));
148        let revision = Arc::new(AtomicU64::new(next_rev));
149        let next_block_id = Arc::new(AtomicU64::new(next_block));
150        let next_snapshot_id = Arc::new(AtomicU64::new(next_snap));
151        let next_branch_id = Arc::new(AtomicU64::new(next_branch));
152
153        if branches.read().get_by_name("main").is_none() {
154            let snap_id = SnapshotId(next_snap);
155            let _ = branches.write().insert(Branch {
156                id: BranchId(1),
157                name: "main".to_string(),
158                head: snap_id,
159                parent: None,
160                forked_at: RevisionId::ZERO,
161            });
162        }
163
164        let backend = if format_version == FORMAT_VERSION_V4 {
165            let coordinator = HilbertCoordinator::new(
166                root.clone(),
167                Arc::clone(&store),
168                Arc::clone(&snapshots),
169                Arc::clone(&branch_overlays),
170                Arc::clone(&spaces),
171                Arc::clone(&next_block_id),
172                Arc::clone(&next_snapshot_id),
173                options.io_thread.clone(),
174            );
175            coordinator.bootstrap_registered_spaces()?;
176            WriteBackend::V4 { coordinator }
177        } else if format_version == FORMAT_VERSION_V3 {
178            let coordinator = SpaceCoordinator::new(
179                root.clone(),
180                Arc::clone(&store),
181                Arc::clone(&snapshots),
182                Arc::clone(&spaces),
183                Arc::clone(&next_block_id),
184                Arc::clone(&next_snapshot_id),
185                options.io_thread.clone(),
186            );
187            coordinator.bootstrap_registered_spaces()?;
188            WriteBackend::V3 { coordinator }
189        } else {
190            let live_tail = Arc::new(LiveTailView::new());
191            let (queue, io_handle) = open_io_pipeline(
192                root.clone(),
193                Arc::clone(&store),
194                Arc::clone(&snapshots),
195                Arc::clone(&live_tail),
196                Arc::clone(&spaces),
197                Arc::clone(&revision),
198                Arc::clone(&next_block_id),
199                Arc::clone(&next_snapshot_id),
200                options.io_thread.clone(),
201            );
202            WriteBackend::V2 {
203                queue,
204                io_handle: Mutex::new(io_handle),
205                live_tail,
206            }
207        };
208
209        Ok(Self {
210            root,
211            format_version,
212            store,
213            spaces,
214            branches,
215            snapshots,
216            revision,
217            next_block_id,
218            next_snapshot_id,
219            next_branch_id,
220            branch_overlays,
221            #[cfg(feature = "sync")]
222            conflicts,
223            backend,
224        })
225    }
226
227    /// Head snapshot pointer for `branch`.
228    pub fn branch_head(&self, branch: BranchId) -> Option<SnapshotId> {
229        self.branches.read().get(branch).map(|b| b.head)
230    }
231
232    /// Resolve a branch id by name.
233    pub fn branch_id(&self, name: &str) -> Option<BranchId> {
234        self.branches.read().get_by_name(name).map(|b| b.id)
235    }
236
237    /// Conflict queue populated during sync replication (requires `sync` feature).
238    #[cfg(feature = "sync")]
239    pub fn conflicts(&self) -> &crate::infinitedb_sync::conflict_queue::ConflictQueue {
240        &self.conflicts
241    }
242
243    /// On-disk format version (2, 3, or 4) for this database directory.
244    pub fn format_version(&self) -> u32 {
245        self.format_version
246    }
247
248    /// Register a new space and persist catalog metadata. Required before writes to that space.
249    pub fn register_space(&self, config: SpaceConfig) -> Result<(), String> {
250        if config.bits_per_dim == 0 {
251            return Err("bits_per_dim must be at least 1".to_string());
252        }
253        if config.dims as u32 * config.bits_per_dim > 128 {
254            return Err(format!(
255                "dims * bits_per_dim must be <= 128 (got {} * {})",
256                config.dims, config.bits_per_dim
257            ));
258        }
259        let space_id = config.id.0;
260        let shard_bits = config.shard_bits;
261        self.spaces
262            .write()
263            .register(config)
264            .map_err(|e| format!("{:?}", e))?;
265        match &self.backend {
266            WriteBackend::V3 { coordinator } => {
267                coordinator
268                    .ensure_space(space_id)
269                    .map_err(|e| e.to_string())?;
270            }
271            WriteBackend::V4 { coordinator } => {
272                let count = crate::engine::hilbert_shard::shard_count(shard_bits);
273                for shard_id in 0..count {
274                    coordinator
275                        .ensure_shard(space_id, shard_id)
276                        .map_err(|e| e.to_string())?;
277                }
278            }
279            WriteBackend::V2 { .. } => {}
280        }
281        self.persist_meta().map_err(|e| e.to_string())?;
282        Ok(())
283    }
284
285    /// Fire-and-forget insert on `main`. Blocks only when the target queue is full.
286    pub fn insert(
287        &self,
288        space: SpaceId,
289        point: DimensionVector,
290        data: Vec<u8>,
291    ) -> io::Result<RevisionId> {
292        self.insert_on_branch(BranchId::MAIN, space, point, data)
293    }
294
295    /// Fire-and-forget insert on a branch (overlay for non-`main` branches).
296    pub fn insert_on_branch(
297        &self,
298        branch: BranchId,
299        space: SpaceId,
300        point: DimensionVector,
301        data: Vec<u8>,
302    ) -> io::Result<RevisionId> {
303        let rev = self.next_revision();
304        let address = Address::new(space, point);
305        let entry = WalEntry::Write {
306            address: address.clone(),
307            revision: rev,
308            data: data.clone(),
309        };
310        let record = Record {
311            address,
312            revision: rev,
313            data,
314            tombstone: false,
315        };
316        let job = WriteJob {
317            branch_id: branch,
318            revision: rev,
319            entry,
320            record,
321        };
322        self.enqueue(job)?;
323        Ok(rev)
324    }
325
326    /// Fire-and-forget delete on `main`.
327    pub fn delete(&self, space: SpaceId, point: DimensionVector) -> io::Result<RevisionId> {
328        self.delete_on_branch(BranchId::MAIN, space, point)
329    }
330
331    /// Fire-and-forget delete on a branch.
332    pub fn delete_on_branch(
333        &self,
334        branch: BranchId,
335        space: SpaceId,
336        point: DimensionVector,
337    ) -> io::Result<RevisionId> {
338        let rev = self.next_revision();
339        let address = Address::new(space, point);
340        let entry = WalEntry::Tombstone {
341            address: address.clone(),
342            revision: rev,
343        };
344        let record = Record {
345            address,
346            revision: rev,
347            data: vec![],
348            tombstone: true,
349        };
350        let job = WriteJob {
351            branch_id: branch,
352            revision: rev,
353            entry,
354            record,
355        };
356        self.enqueue(job)?;
357        Ok(rev)
358    }
359
360    /// Fork a new branch from `from` at the current revision.
361    pub fn create_branch(&self, name: &str, from: BranchId) -> Result<BranchId, String> {
362        let parent = self
363            .branches
364            .read()
365            .get(from)
366            .ok_or_else(|| format!("parent branch {:?} not found", from))?
367            .clone();
368        let id = BranchId(self.next_branch_id.fetch_add(1, Ordering::Relaxed));
369        let forked_at = RevisionId(self.revision.load(Ordering::Relaxed));
370        let branch = Branch {
371            id,
372            name: name.to_string(),
373            head: parent.head,
374            parent: Some(from),
375            forked_at,
376        };
377        self.branches
378            .write()
379            .insert(branch)
380            .map_err(|e| format!("{:?}", e))?;
381        self.persist_meta().map_err(|e| e.to_string())?;
382        Ok(id)
383    }
384
385    /// Three-way merge `source` into `target` (usually `main`).
386    pub fn merge_branch(
387        &self,
388        target: BranchId,
389        source: BranchId,
390        strategy: MergeStrategy,
391        resolver: Option<Box<dyn Fn(MergeConflict) -> Record + Send + Sync>>,
392    ) -> io::Result<MergeResult> {
393        self.sync()?;
394        let ctx = self.query_ctx();
395        let mut result = merge_branches(
396            &self.store,
397            &self.snapshots,
398            ctx.live_tail,
399            ctx.space_tails,
400            ctx.hilbert_tails,
401            &self.branch_overlays,
402            &self.spaces.read(),
403            &self.revision,
404            &self.branches.read(),
405            target,
406            source,
407            strategy,
408            resolver.as_deref(),
409        )?;
410        if strategy == MergeStrategy::Interactive && !result.conflicts.is_empty() {
411            return Ok(result);
412        }
413        let applied = std::mem::take(&mut result.applied_records);
414        for record in applied {
415            if record.tombstone {
416                self.delete_on_branch(target, record.address.space, record.address.point)?;
417            } else {
418                self.insert_on_branch(
419                    target,
420                    record.address.space,
421                    record.address.point,
422                    record.data,
423                )?;
424            }
425        }
426        self.branch_overlays.clear_branch(source);
427        self.sync()?;
428        Ok(result)
429    }
430
431    /// Query `space` through a branch overlay.
432    pub fn query_on_branch(
433        &self,
434        branch: BranchId,
435        space: SpaceId,
436        as_of: Option<RevisionId>,
437    ) -> io::Result<Vec<Record>> {
438        let ctx = self.query_ctx();
439        let branch_id = if branch == BranchId::MAIN {
440            None
441        } else {
442            Some(branch)
443        };
444        query_inner(
445            &self.store,
446            &self.snapshots,
447            ctx.live_tail,
448            ctx.space_tails,
449            &self.spaces.read(),
450            &self.revision,
451            space,
452            None,
453            as_of,
454            false,
455            ctx.hilbert_tails,
456            Some(&self.branch_overlays),
457            branch_id,
458        )
459    }
460
461    /// Enqueue writes across multiple spaces (ordered by space id).
462    ///
463    /// Caller-supplied [`WriteJob::revision`] values must not exceed the global
464    /// revision counter; this method advances the counter to the maximum job revision.
465    pub fn enqueue_batch(&self, jobs: Vec<WriteJob>) -> io::Result<()> {
466        for job in &jobs {
467            self.revision
468                .fetch_max(job.revision.0, Ordering::Relaxed);
469        }
470        let mut main_jobs = Vec::with_capacity(jobs.len());
471        for job in jobs {
472            if job.branch_id != BranchId::MAIN {
473                self.branch_overlays
474                    .append(job.branch_id, job.record.address.space, job.record);
475            } else {
476                main_jobs.push(job);
477            }
478        }
479        if main_jobs.is_empty() {
480            return Ok(());
481        }
482        match &self.backend {
483            WriteBackend::V4 { coordinator } => coordinator.enqueue_batch(main_jobs),
484            WriteBackend::V3 { coordinator } => coordinator.enqueue_batch(main_jobs),
485            WriteBackend::V2 { queue, .. } => {
486                for job in main_jobs {
487                    queue.enqueue_write(job)?;
488                }
489                Ok(())
490            }
491        }
492    }
493
494    /// Query all live records in `space` on `main`, optionally capped at `as_of`.
495    pub fn query(
496        &self,
497        space: SpaceId,
498        as_of: Option<RevisionId>,
499    ) -> io::Result<Vec<Record>> {
500        self.query_on_branch(BranchId::MAIN, space, as_of)
501    }
502
503    /// Bounding-box query on `main`.
504    pub fn query_bbox(
505        &self,
506        space: SpaceId,
507        min: DimensionVector,
508        max: DimensionVector,
509        as_of: Option<RevisionId>,
510    ) -> io::Result<Vec<Record>> {
511        self.query_bbox_on_branch(BranchId::MAIN, space, min, max, as_of)
512    }
513
514    /// Bounding-box query through a branch overlay.
515    pub fn query_bbox_on_branch(
516        &self,
517        branch: BranchId,
518        space: SpaceId,
519        min: DimensionVector,
520        max: DimensionVector,
521        as_of: Option<RevisionId>,
522    ) -> io::Result<Vec<Record>> {
523        let ctx = self.query_ctx();
524        let branch_id = if branch == BranchId::MAIN {
525            None
526        } else {
527            Some(branch)
528        };
529        query_bbox(
530            &self.store,
531            &self.snapshots,
532            ctx.live_tail,
533            ctx.space_tails,
534            &self.spaces.read(),
535            &self.revision,
536            space,
537            min,
538            max,
539            as_of,
540            ctx.hilbert_tails,
541            Some(&self.branch_overlays),
542            branch_id,
543        )
544    }
545
546    /// Flush pending writes for one space to durable storage without syncing all spaces.
547    pub fn flush(&self, space: SpaceId) -> io::Result<()> {
548        match &self.backend {
549            WriteBackend::V4 { coordinator } => coordinator.flush_space(space.0)?,
550            WriteBackend::V3 { coordinator } => coordinator.flush_space(space.0)?,
551            WriteBackend::V2 { queue, .. } => queue.request_flush(space.0)?,
552        }
553        self.persist_meta()
554    }
555
556    /// Flush all write queues and persist metadata. Call after writes to make data queryable.
557    pub fn sync(&self) -> io::Result<()> {
558        match &self.backend {
559            WriteBackend::V4 { coordinator } => coordinator.sync_all()?,
560            WriteBackend::V3 { coordinator } => coordinator.sync_all()?,
561            WriteBackend::V2 { queue, .. } => queue.request_sync()?,
562        }
563        self.persist_meta()
564    }
565
566    /// Current monotonic revision counter (logical write clock).
567    pub fn revision(&self) -> u64 {
568        self.revision.load(Ordering::Relaxed)
569    }
570
571    /// Begin a concurrent read transaction pinned at the current revision.
572    pub fn read(&self) -> crate::concurrent::read_txn::ReadTxn<'_> {
573        crate::concurrent::read_txn::ReadTxn::new(self)
574    }
575
576    /// I/O queue depth and write-path counters across all backend threads.
577    pub fn io_stats(&self) -> IoStats {
578        match &self.backend {
579            WriteBackend::V4 { coordinator } => coordinator.io_stats(),
580            WriteBackend::V3 { coordinator } => coordinator.io_stats(),
581            WriteBackend::V2 { queue, io_handle, .. } => {
582                let handle = io_handle.lock();
583                IoStats {
584                    queue_depth: queue.queued_count(),
585                    direct_writes: handle.direct_writes(),
586                    staged_writes: handle.staged_writes(),
587                    staging_wal_frames: 0,
588                }
589            }
590        }
591    }
592
593    /// Number of I/O shards (1 for format v2, per-space or per-Hilbert-shard for v3/v4).
594    pub fn space_shard_count(&self) -> usize {
595        match &self.backend {
596            WriteBackend::V4 { coordinator } => coordinator.shard_count(),
597            WriteBackend::V3 { coordinator } => coordinator.shard_count(),
598            WriteBackend::V2 { .. } => 1,
599        }
600    }
601
602    pub(crate) fn live_tail_refs(&self) -> (Option<&LiveTailView>, Option<&SpaceLiveTails>) {
603        let ctx = self.query_ctx();
604        (ctx.live_tail, ctx.space_tails)
605    }
606
607    pub(crate) fn query_ctx(&self) -> QueryCtx<'_> {
608        match &self.backend {
609            WriteBackend::V2 { live_tail, .. } => QueryCtx {
610                live_tail: Some(live_tail.as_ref()),
611                space_tails: None,
612                hilbert_tails: None,
613            },
614            WriteBackend::V3 { coordinator } => QueryCtx {
615                live_tail: None,
616                space_tails: Some(coordinator.live_tails()),
617                hilbert_tails: None,
618            },
619            WriteBackend::V4 { coordinator } => QueryCtx {
620                live_tail: None,
621                space_tails: None,
622                hilbert_tails: Some(coordinator.live_tails()),
623            },
624        }
625    }
626
627    fn enqueue(&self, job: WriteJob) -> io::Result<()> {
628        if job.branch_id != BranchId::MAIN {
629            self.branch_overlays
630                .append(job.branch_id, job.record.address.space, job.record);
631            return Ok(());
632        }
633        match &self.backend {
634            WriteBackend::V4 { coordinator } => coordinator.enqueue_write(job),
635            WriteBackend::V3 { coordinator } => coordinator.enqueue_write(job),
636            WriteBackend::V2 { queue, .. } => queue.enqueue_write(job),
637        }
638    }
639
640    fn next_revision(&self) -> RevisionId {
641        RevisionId(self.revision.fetch_add(1, Ordering::Relaxed) + 1)
642    }
643
644    fn persist_meta(&self) -> io::Result<()> {
645        let spaces_bytes = encode_to_vec(&*self.spaces.read(), standard())
646            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
647        self.store.write_meta("spaces.bin", &spaces_bytes)?;
648
649        let branches_bytes = encode_to_vec(&*self.branches.read(), standard())
650            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
651        self.store.write_meta("branches.bin", &branches_bytes)?;
652
653        let snapshots = snapshots_map_for_persist(&self.snapshots);
654        let snapshots_bytes = encode_to_vec(&snapshots, standard())
655            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
656        self.store.write_meta("snapshots.bin", &snapshots_bytes)?;
657
658        let counters: [u64; 4] = [
659            self.revision.load(Ordering::Relaxed),
660            self.next_block_id.load(Ordering::Relaxed),
661            self.next_snapshot_id.load(Ordering::Relaxed),
662            self.next_branch_id.load(Ordering::Relaxed),
663        ];
664        let counters_bytes = encode_to_vec(&counters, standard())
665            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
666        self.store.write_meta("counters.bin", &counters_bytes)?;
667        Ok(())
668    }
669}
670
671impl Drop for InfiniteDb {
672    fn drop(&mut self) {
673        let _ = self.persist_meta();
674        match &self.backend {
675            WriteBackend::V4 { coordinator } => {
676                let _ = coordinator.shutdown_all();
677            }
678            WriteBackend::V3 { coordinator } => {
679                let _ = coordinator.shutdown_all();
680            }
681            WriteBackend::V2 { queue, io_handle, .. } => {
682                let _ = queue.shutdown();
683                let _ = io_handle.lock().join();
684            }
685        }
686    }
687}
688
689type MetaTuple = (
690    SpaceRegistry,
691    BranchRegistry,
692    std::collections::BTreeMap<u64, crate::infinitedb_core::snapshot::Snapshot>,
693    u64,
694    u64,
695    u64,
696    u64,
697);
698
699fn load_meta(store: &BlockStore) -> Option<MetaTuple> {
700    let counters_bytes = store.read_meta("counters.bin").ok()?;
701    let (revision, next_block, next_snapshot, next_branch) =
702        match decode_from_slice::<[u64; 4], _>(&counters_bytes, standard()) {
703            Ok((c, _)) => (c[0], c[1], c[2], c[3]),
704            Err(_) => {
705                let (c, _): ([u64; 3], _) = decode_from_slice(&counters_bytes, standard()).ok()?;
706                (c[0], c[1], c[2], 2)
707            }
708        };
709
710    let spaces_bytes = store.read_meta("spaces.bin").ok()?;
711    let (spaces, _): (SpaceRegistry, _) = decode_from_slice(&spaces_bytes, standard()).ok()?;
712
713    let branches = store
714        .read_meta("branches.bin")
715        .ok()
716        .and_then(|b| decode_from_slice::<BranchRegistry, _>(&b, standard()).ok())
717        .map(|(r, _)| r)
718        .unwrap_or_else(BranchRegistry::new);
719
720    let snapshots = store
721        .read_meta("snapshots.bin")
722        .ok()
723        .and_then(|b| {
724            decode_from_slice::<
725                std::collections::BTreeMap<u64, crate::infinitedb_core::snapshot::Snapshot>,
726                _,
727            >(&b, standard())
728            .ok()
729        })
730        .map(|(m, _)| m)
731        .unwrap_or_default();
732
733    Some((
734        spaces,
735        branches,
736        snapshots,
737        revision,
738        next_block,
739        next_snapshot,
740        next_branch,
741    ))
742}
743
744pub(crate) struct QueryCtx<'a> {
745    pub live_tail: Option<&'a LiveTailView>,
746    pub space_tails: Option<&'a SpaceLiveTails>,
747    pub hilbert_tails: Option<&'a HilbertLiveTails>,
748}
749
750fn default_meta() -> MetaTuple {
751    (
752        SpaceRegistry::new(),
753        BranchRegistry::new(),
754        std::collections::BTreeMap::new(),
755        0,
756        1,
757        1,
758        2,
759    )
760}