Skip to main content

uni_store/fork/
id_alloc.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Fork-scoped [`IdAllocator`] factory.
5//!
6//! Each fork gets its own VID/EID counter persisted at
7//! `catalog/forks/{fork_id}/id_allocator.json`. This isolates fork
//! allocations from primary's *concurrent* allocator (honoring §10,
//! "fork operations don't block primary") while still ensuring fork
10//! VIDs do not collide with primary's existing on-disk data via the
11//! `base_paths` chain.
12//!
13//! # The collision problem
14//!
15//! Naively starting fork allocators at zero produces a read-time
16//! collision: primary's Lance branch (visible to the fork through
17//! `base_paths`) has rows at VIDs that the fork would re-allocate.
18//! Lance read merges prefer branch data, so the fork's first writes
19//! get shadowed by primary's pre-existing rows at the same VID.
20//!
21//! # Resolution: bootstrap from primary's HWM
22//!
//! At fork-creation time we seed
//! `catalog/forks/{fork_id}/id_allocator.json` from primary's
//! in-memory high-water mark (see [`IdAllocator::current_hwm`]) rather
//! than copying primary's `id_allocator.json` file, then load the
//! fork's allocator from that seeded file. The fork starts allocating
//! at or above primary's snapshot HWM, so its first VID does not
//! collide with any primary row that was on disk at fork-point.
28//!
29//! Subsequent primary allocations may eventually reach the fork's
30//! range; that's a Phase 6 (promotion) concern. UniId is content-
31//! hashed, so user-visible identity stays consistent even when
32//! VIDs collide on the disk layer.
33//!
34//! # Path convention
35//!
36//! - Primary: `id_allocator.json` (under the database root store)
37//! - Fork: `catalog/forks/{fork_id}/id_allocator.json`
38//!
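//! Fork allocator files live on whichever `ObjectStore` the caller passes
//! to the factories below; no new store is created per fork. That store
//! is not necessarily the one holding primary's `id_allocator.json`
//! (primary's file may sit on the database root store), which is why
//! bootstrap reads primary's HWM from memory instead of copying the file.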
41
42// Rust guideline compliant
43
44use std::sync::Arc;
45
46use anyhow::Result;
47use object_store::ObjectStore;
48use object_store::path::Path as ObjectStorePath;
49use uni_common::core::fork::ForkId;
50
51use crate::runtime::id_allocator::IdAllocator;
52
/// Number of IDs a fork allocator reserves (and persists) per batch.
///
/// Deliberately equal to the primary allocator's batch size, which
/// `UniBuilder::build` configures as 1000.
pub const DEFAULT_FORK_BATCH_SIZE: u64 = 1_000;
56
57/// Build the canonical persistence path for a fork's IdAllocator.
58///
59/// Used by both the factory below and the recovery driver in Day 6,
60/// which needs to know where to look for fork-local allocator state
61/// when reconstructing a fork's `Writer` on `Uni::open`.
62#[must_use]
63pub fn id_allocator_path(fork_id: &ForkId) -> ObjectStorePath {
64    ObjectStorePath::from(format!("catalog/forks/{fork_id}/id_allocator.json"))
65}
66
67/// Construct (or reload) a fork-scoped [`IdAllocator`].
68///
69/// Uses the same primary `ObjectStore`, just at a different path.
70/// Reusing the store keeps a single fsync/flush story for the
71/// database; the fork has its own counter file but isn't otherwise
72/// independent.
73///
74/// # Errors
75///
76/// Returns the underlying [`anyhow::Error`] if the store cannot be
77/// read or the persisted manifest is malformed.
78pub async fn new_for_fork(
79    store: Arc<dyn ObjectStore>,
80    fork_id: &ForkId,
81    batch_size: u64,
82) -> Result<IdAllocator> {
83    let path = id_allocator_path(fork_id);
84    IdAllocator::new(store, path, batch_size).await
85}
86
87/// Convenience for callers who want an `Arc<IdAllocator>`.
88///
89/// Day 4's Writer factory will use this directly.
90///
91/// # Errors
92///
93/// Same as [`new_for_fork`].
94pub async fn new_for_fork_arc(
95    store: Arc<dyn ObjectStore>,
96    fork_id: &ForkId,
97    batch_size: u64,
98) -> Result<Arc<IdAllocator>> {
99    Ok(Arc::new(new_for_fork(store, fork_id, batch_size).await?))
100}
101
102/// Bootstrap a fork's allocator file from primary's in-memory HWM.
103///
104/// Writes a fresh `CounterManifest` to
105/// `catalog/forks/{fork_id}/id_allocator.json` whose `next_vid_batch`
106/// and `next_eid_batch` are at least `vid_hwm` and `eid_hwm`. After
107/// this call, [`new_for_fork`] returns an allocator whose first
108/// allocations land *above* primary's HWM at fork-creation time —
109/// preventing the read-time VID collision documented in the module
110/// rustdoc.
111///
112/// Caller obtains the HWMs via [`IdAllocator::current_hwm`] on
113/// primary's allocator. Reading from primary's in-memory state
114/// avoids fragile file-copy across potentially-different
115/// `ObjectStore` roots (primary's allocator file may live on the
116/// database root store while the fork's allocator file lives on the
117/// storage-rooted store).
118///
119/// Idempotent: if the fork's file already exists, this is a no-op.
120/// Called at fork creation (Phase 2 Day 7); never at fork open.
121///
122/// # Errors
123///
124/// - Object-store IO failure on the fork's path.
125pub async fn bootstrap_fork_from_primary_hwm(
126    store: Arc<dyn ObjectStore>,
127    fork_id: &ForkId,
128    vid_hwm: u64,
129    eid_hwm: u64,
130) -> Result<()> {
131    use crate::store_utils::{DEFAULT_TIMEOUT, get_with_timeout, put_with_timeout};
132
133    let target = id_allocator_path(fork_id);
134
135    // Skip if the fork's allocator already exists — bootstrap is a
136    // creation-time operation, not an open-time one.
137    if get_with_timeout(&store, &target, DEFAULT_TIMEOUT)
138        .await
139        .is_ok()
140    {
141        return Ok(());
142    }
143
144    // Build a manifest that sets next_*_batch to primary's HWM. The
145    // fork's IdAllocator constructor sets `current_vid =
146    // manifest.next_vid_batch`, so loading this file produces a fork
147    // allocator that allocates from `vid_hwm` upward.
148    //
149    // We mirror the IdAllocator's persisted format directly.
150    #[derive(serde::Serialize)]
151    struct CounterManifestSnapshot {
152        next_vid_batch: u64,
153        next_eid_batch: u64,
154    }
155    let manifest = CounterManifestSnapshot {
156        next_vid_batch: vid_hwm,
157        next_eid_batch: eid_hwm,
158    };
159    let bytes = serde_json::to_vec_pretty(&manifest)?;
160    put_with_timeout(&store, &target, bytes::Bytes::from(bytes), DEFAULT_TIMEOUT).await?;
161    Ok(())
162}
163
#[cfg(test)]
mod tests {
    use super::*;
    use object_store::local::LocalFileSystem;
    use tempfile::TempDir;

    /// Fresh local-filesystem store rooted in a temp dir. The `TempDir`
    /// is returned so it outlives the store for the whole test.
    async fn fresh_store() -> (TempDir, Arc<dyn ObjectStore>) {
        let dir = TempDir::new().unwrap();
        let store: Arc<dyn ObjectStore> =
            Arc::new(LocalFileSystem::new_with_prefix(dir.path()).unwrap());
        (dir, store)
    }

    #[tokio::test]
    async fn path_includes_fork_id_under_catalog_forks() {
        let id = ForkId::new();
        let p = id_allocator_path(&id);
        let s = p.to_string();
        assert!(s.starts_with("catalog/forks/"));
        assert!(s.ends_with("/id_allocator.json"));
        assert!(s.contains(&id.to_string()));
    }

    #[tokio::test]
    async fn fresh_allocator_starts_from_zero() {
        // Mirrors the primary `IdAllocator`'s `CounterManifest::default`
        // — VIDs and EIDs are 0-indexed.
        let (_dir, store) = fresh_store().await;
        let id = ForkId::new();
        let alloc = new_for_fork(store, &id, DEFAULT_FORK_BATCH_SIZE)
            .await
            .unwrap();
        let v = alloc.allocate_vid().await.unwrap();
        assert_eq!(u64::from(v), 0, "fresh fork allocator starts at VID 0");
    }

    #[tokio::test]
    async fn two_forks_have_independent_vid_streams() {
        let (_dir, store) = fresh_store().await;
        let id_a = ForkId::new();
        let id_b = ForkId::new();

        let alloc_a = new_for_fork(store.clone(), &id_a, DEFAULT_FORK_BATCH_SIZE)
            .await
            .unwrap();
        let alloc_b = new_for_fork(store.clone(), &id_b, DEFAULT_FORK_BATCH_SIZE)
            .await
            .unwrap();

        // Each fork allocates the same VID values — they're independent
        // namespaces. Promotion (Phase 6) translates fork-local VIDs
        // via UniId dedup, so collisions are not a hazard.
        let a_first = alloc_a.allocate_vid().await.unwrap();
        let b_first = alloc_b.allocate_vid().await.unwrap();
        assert_eq!(u64::from(a_first), 0);
        assert_eq!(u64::from(b_first), 0);

        // Pull a few more on each — they continue independently.
        let a_next = alloc_a.allocate_vid().await.unwrap();
        let b_next = alloc_b.allocate_vid().await.unwrap();
        assert_eq!(u64::from(a_next), 1);
        assert_eq!(u64::from(b_next), 1);
    }

    #[tokio::test]
    async fn allocator_persists_across_reopen() {
        let (_dir, store) = fresh_store().await;
        let id = ForkId::new();

        // First open: allocate three VIDs (0, 1, 2).
        {
            let alloc = new_for_fork(store.clone(), &id, DEFAULT_FORK_BATCH_SIZE)
                .await
                .unwrap();
            for _ in 0..3 {
                alloc.allocate_vid().await.unwrap();
            }
        }

        // Reopen on the same path. The IdAllocator persists batch
        // reservations, so the next VID jumps to the start of the
        // reserved batch (>= batch_size) — durability without
        // per-allocation fsync.
        let alloc2 = new_for_fork(store, &id, DEFAULT_FORK_BATCH_SIZE)
            .await
            .unwrap();
        let next = alloc2.allocate_vid().await.unwrap();
        assert!(
            u64::from(next) >= DEFAULT_FORK_BATCH_SIZE,
            "reopened allocator must skip past previous batch's HWM; got {}",
            u64::from(next)
        );
    }

    #[tokio::test]
    async fn primary_id_allocator_unaffected_by_fork_allocator() {
        // Path isolation: the fork allocator's writes go to
        // `catalog/forks/{id}/id_allocator.json`, not to the primary's
        // `id_allocator.json`. Primary's file doesn't exist after a
        // fork-only session, and a primary IdAllocator opened against
        // the same store starts from scratch.
        let (_dir, store) = fresh_store().await;
        let id = ForkId::new();
        let fork_alloc = new_for_fork(store.clone(), &id, DEFAULT_FORK_BATCH_SIZE)
            .await
            .unwrap();
        for _ in 0..5 {
            fork_alloc.allocate_vid().await.unwrap();
        }

        // Primary allocator opens against the canonical primary path.
        let primary = IdAllocator::new(
            store,
            ObjectStorePath::from("id_allocator.json"),
            DEFAULT_FORK_BATCH_SIZE,
        )
        .await
        .unwrap();
        let primary_first = primary.allocate_vid().await.unwrap();
        assert_eq!(
            u64::from(primary_first),
            0,
            "primary allocator must not see any fork-side allocations"
        );
    }

    #[tokio::test]
    async fn bootstrap_seeds_fork_at_or_above_primary_hwm() {
        // A bootstrapped fork must not hand out VIDs below primary's HWM;
        // otherwise its rows would be shadowed by primary's on-disk rows.
        let (_dir, store) = fresh_store().await;
        let id = ForkId::new();
        bootstrap_fork_from_primary_hwm(store.clone(), &id, 500, 700)
            .await
            .unwrap();

        let alloc = new_for_fork(store, &id, DEFAULT_FORK_BATCH_SIZE)
            .await
            .unwrap();
        let first = alloc.allocate_vid().await.unwrap();
        assert!(
            u64::from(first) >= 500,
            "bootstrapped fork must allocate at or above primary HWM; got {}",
            u64::from(first)
        );
    }

    #[tokio::test]
    async fn bootstrap_is_noop_when_fork_allocator_exists() {
        // A second bootstrap (here with a lower HWM) must leave the
        // existing fork allocator file untouched rather than rewinding it.
        let (_dir, store) = fresh_store().await;
        let id = ForkId::new();
        bootstrap_fork_from_primary_hwm(store.clone(), &id, 500, 500)
            .await
            .unwrap();
        bootstrap_fork_from_primary_hwm(store.clone(), &id, 0, 0)
            .await
            .unwrap();

        let alloc = new_for_fork(store, &id, DEFAULT_FORK_BATCH_SIZE)
            .await
            .unwrap();
        let first = alloc.allocate_vid().await.unwrap();
        assert!(
            u64::from(first) >= 500,
            "repeated bootstrap must not rewind the fork allocator; got {}",
            u64::from(first)
        );
    }
}