uni_store/fork/id_alloc.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Fork-scoped [`IdAllocator`] factory.
5//!
6//! Each fork gets its own VID/EID counter persisted at
//! `catalog/forks/{fork_id}/id_allocator.json`. This isolates fork
//! allocations from primary's *concurrent* allocator (honoring the §10
//! requirement that "fork operations don't block primary") while still
//! ensuring fork VIDs do not collide with primary's existing on-disk
//! data, which the fork sees through the `base_paths` chain.
12//!
13//! # The collision problem
14//!
15//! Naively starting fork allocators at zero produces a read-time
16//! collision: primary's Lance branch (visible to the fork through
17//! `base_paths`) has rows at VIDs that the fork would re-allocate.
18//! Lance read merges prefer branch data, so the fork's first writes
19//! get shadowed by primary's pre-existing rows at the same VID.
20//!
21//! # Resolution: bootstrap from primary's HWM
22//!
//! At fork-creation time we write a fresh manifest, seeded with
//! primary's in-memory HWM, to
//! `catalog/forks/{fork_id}/id_allocator.json` (see
//! [`bootstrap_fork_from_primary_hwm`]) and load the fork's allocator
//! from that file. The fork starts allocating *above* primary's
//! snapshot HWM, so its first VID does not collide with any primary
//! row that was on disk at fork-point.
28//!
29//! Subsequent primary allocations may eventually reach the fork's
30//! range; that's a Phase 6 (promotion) concern. UniId is content-
31//! hashed, so user-visible identity stays consistent even when
32//! VIDs collide on the disk layer.
33//!
34//! # Path convention
35//!
36//! - Primary: `id_allocator.json` (under the database root store)
37//! - Fork: `catalog/forks/{fork_id}/id_allocator.json`
38//!
39//! These paths live on the same `ObjectStore` as the primary allocator.
40//! No new store is created per fork.
41
42// Rust guideline compliant
43
44use std::sync::Arc;
45
46use anyhow::Result;
47use object_store::ObjectStore;
48use object_store::path::Path as ObjectStorePath;
49use uni_common::core::fork::ForkId;
50
51use crate::runtime::id_allocator::IdAllocator;
52
/// Number of IDs a fork allocator reserves per batch when the caller
/// has no specific preference.
///
/// Kept equal to the primary allocator's default (`UniBuilder::build`
/// reserves 1000 IDs at a time).
pub const DEFAULT_FORK_BATCH_SIZE: u64 = 1_000;
56
57/// Build the canonical persistence path for a fork's IdAllocator.
58///
59/// Used by both the factory below and the recovery driver in Day 6,
60/// which needs to know where to look for fork-local allocator state
61/// when reconstructing a fork's `Writer` on `Uni::open`.
62#[must_use]
63pub fn id_allocator_path(fork_id: &ForkId) -> ObjectStorePath {
64 ObjectStorePath::from(format!("catalog/forks/{fork_id}/id_allocator.json"))
65}
66
67/// Construct (or reload) a fork-scoped [`IdAllocator`].
68///
69/// Uses the same primary `ObjectStore`, just at a different path.
70/// Reusing the store keeps a single fsync/flush story for the
71/// database; the fork has its own counter file but isn't otherwise
72/// independent.
73///
74/// # Errors
75///
76/// Returns the underlying [`anyhow::Error`] if the store cannot be
77/// read or the persisted manifest is malformed.
78pub async fn new_for_fork(
79 store: Arc<dyn ObjectStore>,
80 fork_id: &ForkId,
81 batch_size: u64,
82) -> Result<IdAllocator> {
83 let path = id_allocator_path(fork_id);
84 IdAllocator::new(store, path, batch_size).await
85}
86
87/// Convenience for callers who want an `Arc<IdAllocator>`.
88///
89/// Day 4's Writer factory will use this directly.
90///
91/// # Errors
92///
93/// Same as [`new_for_fork`].
94pub async fn new_for_fork_arc(
95 store: Arc<dyn ObjectStore>,
96 fork_id: &ForkId,
97 batch_size: u64,
98) -> Result<Arc<IdAllocator>> {
99 Ok(Arc::new(new_for_fork(store, fork_id, batch_size).await?))
100}
101
102/// Bootstrap a fork's allocator file from primary's in-memory HWM.
103///
104/// Writes a fresh `CounterManifest` to
105/// `catalog/forks/{fork_id}/id_allocator.json` whose `next_vid_batch`
106/// and `next_eid_batch` are at least `vid_hwm` and `eid_hwm`. After
107/// this call, [`new_for_fork`] returns an allocator whose first
108/// allocations land *above* primary's HWM at fork-creation time —
109/// preventing the read-time VID collision documented in the module
110/// rustdoc.
111///
112/// Caller obtains the HWMs via [`IdAllocator::current_hwm`] on
113/// primary's allocator. Reading from primary's in-memory state
114/// avoids fragile file-copy across potentially-different
115/// `ObjectStore` roots (primary's allocator file may live on the
116/// database root store while the fork's allocator file lives on the
117/// storage-rooted store).
118///
119/// Idempotent: if the fork's file already exists, this is a no-op.
120/// Called at fork creation (Phase 2 Day 7); never at fork open.
121///
122/// # Errors
123///
124/// - Object-store IO failure on the fork's path.
125pub async fn bootstrap_fork_from_primary_hwm(
126 store: Arc<dyn ObjectStore>,
127 fork_id: &ForkId,
128 vid_hwm: u64,
129 eid_hwm: u64,
130) -> Result<()> {
131 use crate::store_utils::{DEFAULT_TIMEOUT, get_with_timeout, put_with_timeout};
132
133 let target = id_allocator_path(fork_id);
134
135 // Skip if the fork's allocator already exists — bootstrap is a
136 // creation-time operation, not an open-time one.
137 if get_with_timeout(&store, &target, DEFAULT_TIMEOUT)
138 .await
139 .is_ok()
140 {
141 return Ok(());
142 }
143
144 // Build a manifest that sets next_*_batch to primary's HWM. The
145 // fork's IdAllocator constructor sets `current_vid =
146 // manifest.next_vid_batch`, so loading this file produces a fork
147 // allocator that allocates from `vid_hwm` upward.
148 //
149 // We mirror the IdAllocator's persisted format directly.
150 #[derive(serde::Serialize)]
151 struct CounterManifestSnapshot {
152 next_vid_batch: u64,
153 next_eid_batch: u64,
154 }
155 let manifest = CounterManifestSnapshot {
156 next_vid_batch: vid_hwm,
157 next_eid_batch: eid_hwm,
158 };
159 let bytes = serde_json::to_vec_pretty(&manifest)?;
160 put_with_timeout(&store, &target, bytes::Bytes::from(bytes), DEFAULT_TIMEOUT).await?;
161 Ok(())
162}
163
#[cfg(test)]
mod tests {
    use super::*;
    use object_store::local::LocalFileSystem;
    use tempfile::TempDir;

    /// Create an empty local-filesystem object store rooted in a fresh
    /// temporary directory.
    ///
    /// The `TempDir` is returned alongside the store so the caller can
    /// keep the directory alive for the duration of the test.
    fn fresh_store() -> (TempDir, Arc<dyn ObjectStore>) {
        let dir = TempDir::new().unwrap();
        let store: Arc<dyn ObjectStore> =
            Arc::new(LocalFileSystem::new_with_prefix(dir.path()).unwrap());
        (dir, store)
    }

    // Pure string check — no async runtime needed.
    #[test]
    fn path_includes_fork_id_under_catalog_forks() {
        let id = ForkId::new();
        let p = id_allocator_path(&id);
        let s = p.to_string();
        assert!(s.starts_with("catalog/forks/"));
        assert!(s.ends_with("/id_allocator.json"));
        assert!(s.contains(&id.to_string()));
    }

    #[tokio::test]
    async fn fresh_allocator_starts_from_zero() {
        // Mirrors the primary `IdAllocator`'s `CounterManifest::default`
        // — VIDs and EIDs are 0-indexed.
        let (_dir, store) = fresh_store();
        let id = ForkId::new();
        let alloc = new_for_fork(store, &id, DEFAULT_FORK_BATCH_SIZE)
            .await
            .unwrap();
        let v = alloc.allocate_vid().await.unwrap();
        assert_eq!(u64::from(v), 0, "fresh fork allocator starts at VID 0");
    }

    #[tokio::test]
    async fn arc_factory_behaves_like_plain_factory() {
        // `new_for_fork_arc` only wraps `new_for_fork`'s result in an
        // `Arc`, so a fresh allocator obtained through it also starts
        // at VID 0.
        let (_dir, store) = fresh_store();
        let id = ForkId::new();
        let alloc = new_for_fork_arc(store, &id, DEFAULT_FORK_BATCH_SIZE)
            .await
            .unwrap();
        let v = alloc.allocate_vid().await.unwrap();
        assert_eq!(u64::from(v), 0);
    }

    #[tokio::test]
    async fn two_forks_have_independent_vid_streams() {
        let (_dir, store) = fresh_store();
        let id_a = ForkId::new();
        let id_b = ForkId::new();

        let alloc_a = new_for_fork(store.clone(), &id_a, DEFAULT_FORK_BATCH_SIZE)
            .await
            .unwrap();
        let alloc_b = new_for_fork(store.clone(), &id_b, DEFAULT_FORK_BATCH_SIZE)
            .await
            .unwrap();

        // Each fork allocates the same VID values — they're independent
        // namespaces. Promotion (Phase 6) translates fork-local VIDs
        // via UniId dedup, so collisions are not a hazard.
        let a_first = alloc_a.allocate_vid().await.unwrap();
        let b_first = alloc_b.allocate_vid().await.unwrap();
        assert_eq!(u64::from(a_first), 0);
        assert_eq!(u64::from(b_first), 0);

        // Pull a few more on each — they continue independently.
        let a_next = alloc_a.allocate_vid().await.unwrap();
        let b_next = alloc_b.allocate_vid().await.unwrap();
        assert_eq!(u64::from(a_next), 1);
        assert_eq!(u64::from(b_next), 1);
    }

    #[tokio::test]
    async fn allocator_persists_across_reopen() {
        let (_dir, store) = fresh_store();
        let id = ForkId::new();

        // First open: allocate three VIDs (0, 1, 2).
        {
            let alloc = new_for_fork(store.clone(), &id, DEFAULT_FORK_BATCH_SIZE)
                .await
                .unwrap();
            for _ in 0..3 {
                alloc.allocate_vid().await.unwrap();
            }
        }

        // Reopen on the same path. The IdAllocator persists batch
        // reservations, so the next VID jumps to the start of the
        // reserved batch (>= batch_size) — durability without
        // per-allocation fsync.
        let alloc2 = new_for_fork(store, &id, DEFAULT_FORK_BATCH_SIZE)
            .await
            .unwrap();
        let next = alloc2.allocate_vid().await.unwrap();
        assert!(
            u64::from(next) >= DEFAULT_FORK_BATCH_SIZE,
            "reopened allocator must skip past previous batch's HWM; got {}",
            u64::from(next)
        );
    }

    #[tokio::test]
    async fn primary_id_allocator_unaffected_by_fork_allocator() {
        // Path isolation: the fork allocator's writes go to
        // `catalog/forks/{id}/id_allocator.json`, not to the primary's
        // `id_allocator.json`. Primary's file doesn't exist after a
        // fork-only session, and a primary IdAllocator opened against
        // the same store starts from scratch.
        let (_dir, store) = fresh_store();
        let id = ForkId::new();
        let fork_alloc = new_for_fork(store.clone(), &id, DEFAULT_FORK_BATCH_SIZE)
            .await
            .unwrap();
        for _ in 0..5 {
            fork_alloc.allocate_vid().await.unwrap();
        }

        // Primary allocator opens against the canonical primary path.
        let primary = IdAllocator::new(
            store,
            ObjectStorePath::from("id_allocator.json"),
            DEFAULT_FORK_BATCH_SIZE,
        )
        .await
        .unwrap();
        let primary_first = primary.allocate_vid().await.unwrap();
        assert_eq!(
            u64::from(primary_first),
            0,
            "primary allocator must not see any fork-side allocations"
        );
    }

    #[tokio::test]
    async fn bootstrapped_fork_allocates_at_or_above_primary_hwm() {
        // Seeding the fork's file with primary's HWM must keep the
        // fork's first VID out of the range primary had already used.
        let (_dir, store) = fresh_store();
        let id = ForkId::new();
        bootstrap_fork_from_primary_hwm(store.clone(), &id, 5_000, 7_000)
            .await
            .unwrap();

        let alloc = new_for_fork(store, &id, DEFAULT_FORK_BATCH_SIZE)
            .await
            .unwrap();
        let first = alloc.allocate_vid().await.unwrap();
        assert!(
            u64::from(first) >= 5_000,
            "bootstrapped fork must not allocate below primary's HWM; got {}",
            u64::from(first)
        );
    }

    #[tokio::test]
    async fn bootstrap_is_noop_when_fork_file_exists() {
        // A second bootstrap with different HWMs must leave the file
        // written by the first one untouched.
        let (_dir, store) = fresh_store();
        let id = ForkId::new();
        bootstrap_fork_from_primary_hwm(store.clone(), &id, 5_000, 7_000)
            .await
            .unwrap();
        bootstrap_fork_from_primary_hwm(store.clone(), &id, 9_000, 9_000)
            .await
            .unwrap();

        let raw = store
            .get(&id_allocator_path(&id))
            .await
            .unwrap()
            .bytes()
            .await
            .unwrap();
        let manifest: serde_json::Value = serde_json::from_slice(&raw).unwrap();
        assert_eq!(manifest["next_vid_batch"].as_u64(), Some(5_000));
        assert_eq!(manifest["next_eid_batch"].as_u64(), Some(7_000));
    }
}