uni_store/fork/scope.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! `ForkScope` — read-only state shared by every component of a forked
5//! session.
6//!
7//! A `ForkScope` is owned by a forked `Session`'s `UniInner` and carries
8//! everything `StorageManager` and `SchemaManager` need to resolve fork-
9//! aware reads:
10//!
11//! - `fork_info` — registry record, including the dataset → branch map
12//! used to route Lance reads through the fork's branches.
13//! - `overlay` — `SchemaDelta` merged on top of primary's schema by
14//! `UniInner::at_fork` at construction time.
15//! - `registry` — back-reference for liveness queries; holders are
16//! tracked here so drop refuses while sessions are alive.
17//! - `_holder` — RAII guard that decrements the holder count when the
18//! scope is dropped.
19//!
//! `fork_info` is wrapped in a plain `Arc` because it is never mutated
//! through the scope. Datasets branched after fork-point are recorded in
//! the scope's separate in-memory dynamic-branch map (via
//! `register_dynamic_branch`) and persisted by the caller through the
//! registry; they are never written into `fork_info`. `overlay` is
//! wrapped in `ArcSwap` so fork-local strict-schema additions can be
//! applied atomically without rebuilding the scope.
25
26// Rust guideline compliant
27
28use std::sync::Arc;
29
30use anyhow::Context;
31use arc_swap::ArcSwap;
32use dashmap::DashMap;
33use tokio::sync::Mutex as AsyncMutex;
34use uni_common::core::fork::{ForkId, ForkInfo, SchemaDelta};
35use uni_common::core::schema::{EdgeTypeMeta, LabelMeta};
36
37use super::registry::{ForkHolderGuard, ForkRegistryHandle};
38
/// Tag for an entry in the fork-local index registry on `ForkScope`.
///
/// Each variant names the fusion strategy the planner uses when it
/// combines a fork-local index with the index inherited from primary:
///
/// - Phase 5a: [`Self::ScalarBtree`], [`Self::Sorted`], [`Self::VidUid`].
/// - Phase 5b (lossy fusion): [`Self::Vector`], [`Self::FullText`].
/// - Issue #95: [`Self::Sparse`].
///
/// `#[non_exhaustive]` so additional kinds (e.g. inverted-set,
/// JSON path) can land additively without breaking match sites.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
#[non_exhaustive]
pub enum ForkLocalIndexKind {
    /// Scalar BTree on a property — union fusion (Phase 5a-impl).
    ScalarBtree,
    /// Sorted on a property (range / ORDER BY) — k-way merge fusion (Phase 5a-impl).
    Sorted,
    /// VID/UID lookup index — fork-first fusion (Phase 5a-impl).
    VidUid,
    /// Vector (IVF/HNSW) index — top-k merge + rerank fusion (Phase 5b).
    Vector,
    /// Lance native FTS / inverted index — RRF fusion (Phase 5b).
    FullText,
    /// Learned-sparse (SPLADE) dot-product index — `SparseDot` rerank fusion
    /// (issue #95 Task #4).
    ///
    /// v1 retrieval on a fork is a brute-force branch scan re-scored by
    /// `sparse_dot` (see [`crate::storage::StorageManager::sparse_search`]); this
    /// variant is a planner/EXPLAIN marker that switches `uni.sparse.query` to the
    /// fused operator. A dedicated fork-local sparse postings dataset (Approach B)
    /// is deferred behind the M5 benchmark.
    Sparse,
}
67
68/// Read-only scope identifying a forked session.
69///
70/// Constructed by `Session::fork(name).build()` (Day 7) via
71/// [`ForkScope::new`]. Once built, both `fork_info` and `overlay` are
72/// immutable for the scope's lifetime — Phase 1 forks are read-only.
73pub struct ForkScope {
74 fork_id: ForkId,
75 fork_info: Arc<ForkInfo>,
76 /// Schema additions on top of primary's schema. Mutable so that
77 /// `Session::fork_schema()` can introduce fork-local labels and
78 /// edge types without touching primary's `catalog/schema.json`.
79 /// `ArcSwap` makes reads cheap and atomic; the `overlay_lock`
80 /// below serializes the read-modify-write on the persistence side.
81 ///
82 /// # Invariant: fork-origin numeric ids are fork-local (L7)
83 ///
84 /// The overlay is frozen at fork time, so a label/edge-type id minted
85 /// inside a fork (via `max(existing)+1`) does not observe primary's
86 /// later additions and **can collide** with a primary id allocated
87 /// after the fork point. This is benign because nothing trusts a
88 /// fork-origin id across the fork↔primary boundary: promote
89 /// (`uni_fork::diff`) re-creates by NAME, primary re-allocates its own
90 /// id, and storage keys rows by label name. A fork-origin numeric id
91 /// MUST NOT be trusted outside the fork's own view.
92 overlay: Arc<ArcSwap<SchemaDelta>>,
93 /// Serializes overlay updates *within a single fork* so two
94 /// concurrent `add_label_to_overlay` calls don't clobber each
95 /// other's persisted state. Held across the registry PUT and the
96 /// `ArcSwap::store`. Cross-fork updates remain parallel.
97 overlay_lock: Arc<AsyncMutex<()>>,
98 registry: Arc<ForkRegistryHandle>,
99 /// Branches created after fork construction, e.g. by
100 /// [`crate::backend::BranchedBackend`] when the fork's writer
101 /// flushes to a label whose dataset wasn't branched at fork-point.
102 /// Consulted alongside `fork_info.datasets` by [`Self::branch_for`]
103 /// so reads on the same session see writes through the same
104 /// branch that produced them. Persisted out-of-band via
105 /// [`ForkRegistryHandle::register_dataset_branch`] so a restart
106 /// recovers the same mapping.
107 dynamic_branches: Arc<DashMap<String, String>>,
108 /// Phase 5a: per-table row count contributed by this fork's
109 /// writes. Bumped by `BranchedBackend` after each successful
110 /// flush. Read by `IndexRebuildManager` to decide whether to
111 /// schedule a fork-local index build for the table. In-memory
112 /// only — a process restart resets the counter, so the trigger
113 /// re-fires on the next flush. The on-disk row count is the
114 /// ground truth; this counter is only a flush-time accumulator.
115 fragment_counts: Arc<DashMap<String, u64>>,
116 /// Phase 5a: registry of completed fork-local index builds.
117 /// Keyed on `(label, column)`; value is the index kind that was
118 /// built. Read by the planner's `fork_index_exists` check to
119 /// decide whether to emit `FusedIndexScan`. Written by the
120 /// `IndexRebuildManager` after a fork-local build completes.
121 /// In-memory only — a restart re-detects existing fork-local
122 /// indexes by listing the fork's branch directory once at
123 /// `Uni::open` time (Phase 5a uses lazy first-touch detection;
124 /// see `repopulate_indexes_from_disk`).
125 fork_local_indexes: Arc<DashMap<(String, String), ForkLocalIndexKind>>,
126 /// RAII guard. Lifetime-tied to this `ForkScope`. Cloning the
127 /// containing `Arc<ForkScope>` does *not* increment the holder
128 /// count — only the constructor does, via `register_holder`.
129 _holder: ForkHolderGuard,
130}
131
132impl std::fmt::Debug for ForkScope {
133 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
134 f.debug_struct("ForkScope")
135 .field("fork_id", &self.fork_id)
136 .field("fork_name", &self.fork_info.name)
137 .finish_non_exhaustive()
138 }
139}
140
141impl ForkScope {
142 /// Build a new fork scope, registering a holder on `registry`.
143 ///
144 /// `fork_info` must already be in `Active` status; callers should
145 /// have walked the registry's open-or-create flow before invoking.
146 /// `overlay` is the schema delta loaded from
147 /// `catalog/fork_schemas/{fork_id}.json`.
148 #[must_use]
149 pub fn new(
150 fork_info: Arc<ForkInfo>,
151 overlay: SchemaDelta,
152 registry: Arc<ForkRegistryHandle>,
153 ) -> Self {
154 let holder = registry.register_holder(fork_info.id);
155 Self {
156 fork_id: fork_info.id,
157 fork_info,
158 overlay: Arc::new(ArcSwap::from_pointee(overlay)),
159 overlay_lock: Arc::new(AsyncMutex::new(())),
160 registry,
161 dynamic_branches: Arc::new(DashMap::new()),
162 fragment_counts: Arc::new(DashMap::new()),
163 fork_local_indexes: Arc::new(DashMap::new()),
164 _holder: holder,
165 }
166 }
167
168 /// Phase 5a: record `rows_added` rows newly written through this
169 /// fork to `table_name`. Idempotent under repeated calls — the
170 /// counter is monotonically increasing within a process lifetime.
171 pub fn record_fork_fragment(&self, table_name: &str, rows_added: u64) {
172 if rows_added == 0 {
173 return;
174 }
175 self.fragment_counts
176 .entry(table_name.to_string())
177 .and_modify(|c| *c += rows_added)
178 .or_insert(rows_added);
179 }
180
181 /// Phase 5a: current accumulated row count for `table_name` on
182 /// this fork. Returns 0 if the fork has never written to it.
183 #[must_use]
184 pub fn fragment_count(&self, table_name: &str) -> u64 {
185 self.fragment_counts
186 .get(table_name)
187 .map(|r| *r.value())
188 .unwrap_or(0)
189 }
190
191 /// Phase 5a: snapshot of every (table, count) pair recorded on
192 /// this fork. Used by `IndexRebuildManager` to enumerate build
193 /// candidates each polling tick.
194 #[must_use]
195 pub fn all_fragment_counts(&self) -> Vec<(String, u64)> {
196 self.fragment_counts
197 .iter()
198 .map(|r| (r.key().clone(), *r.value()))
199 .collect()
200 }
201
202 /// Phase 5a: register a completed fork-local index build.
203 /// Called by `IndexRebuildManager` after the build lands on
204 /// the fork's branch.
205 pub fn register_fork_local_index(&self, label: &str, column: &str, kind: ForkLocalIndexKind) {
206 self.fork_local_indexes
207 .insert((label.to_string(), column.to_string()), kind);
208 }
209
210 /// Phase 5a: lookup the fork-local index kind for a `(label,
211 /// column)` pair, if one has been built. Returns `None` when
212 /// the planner should fall back to the inherited primary index
213 /// (or to a plain scan).
214 #[must_use]
215 pub fn fork_local_index(&self, label: &str, column: &str) -> Option<ForkLocalIndexKind> {
216 self.fork_local_indexes
217 .get(&(label.to_string(), column.to_string()))
218 .map(|r| *r.value())
219 }
220
221 /// Phase 5a: snapshot of every registered fork-local index.
222 #[must_use]
223 pub fn all_fork_local_indexes(&self) -> Vec<((String, String), ForkLocalIndexKind)> {
224 self.fork_local_indexes
225 .iter()
226 .map(|r| (r.key().clone(), *r.value()))
227 .collect()
228 }
229
230 /// Stable fork identifier.
231 #[must_use]
232 pub fn fork_id(&self) -> ForkId {
233 self.fork_id
234 }
235
236 /// Fork registry record (cheap `Arc::clone`).
237 #[must_use]
238 pub fn fork_info(&self) -> Arc<ForkInfo> {
239 self.fork_info.clone()
240 }
241
242 /// Parent fork id (Phase 3). `None` ⇒ parent is primary.
243 ///
244 /// Used by `UniInner::at_fork` to walk the ancestor chain for
245 /// overlay composition, and by `BranchedBackend` to route
246 /// on-the-fly dataset creation through the parent's branch.
247 #[must_use]
248 pub fn parent_fork_id(&self) -> Option<ForkId> {
249 self.fork_info.parent_fork_id
250 }
251
252 /// Schema delta to merge on top of primary's schema. Returns a
253 /// snapshot of the current overlay; subsequent
254 /// [`Self::add_label_to_overlay`] calls will not affect the
255 /// returned `Arc`.
256 #[must_use]
257 pub fn overlay(&self) -> Arc<SchemaDelta> {
258 self.overlay.load_full()
259 }
260
261 /// Branch name for a given Lance dataset, if this fork has one.
262 ///
263 /// Used by `StorageManager` dataset factories to route reads.
264 /// Consults both the immutable fork-point datasets map (set by
265 /// `finish_create`) and the dynamic-branches map (populated by
266 /// [`Self::register_dynamic_branch`] when a flush hits a dataset
267 /// that wasn't branched at fork-point). Returns `None` only if no
268 /// branch exists on either side — the BranchedBackend then either
269 /// creates one on the fly or surfaces an error.
270 #[must_use]
271 pub fn branch_for(&self, dataset_name: &str) -> Option<String> {
272 if let Some(b) = self.fork_info.datasets.get(dataset_name) {
273 return Some(b.clone());
274 }
275 self.dynamic_branches
276 .get(dataset_name)
277 .map(|r| r.value().clone())
278 }
279
280 /// Record a branch created after fork-point (e.g. for a dataset
281 /// that didn't exist on primary at fork creation, or for
282 /// compaction-only adjacency tables).
283 ///
284 /// In-memory only; the caller is responsible for persisting via
285 /// [`ForkRegistryHandle::register_dataset_branch`] so a restart
286 /// recovers the same mapping. Idempotent — re-registering an
287 /// existing entry is a no-op.
288 pub fn register_dynamic_branch(&self, dataset: String, branch: String) {
289 self.dynamic_branches.insert(dataset, branch);
290 }
291
292 /// Append a label to the fork-local schema overlay and persist
293 /// the new overlay to disk.
294 ///
295 /// Idempotent: if a label with the same name is already in the
296 /// overlay (or in primary's schema, accessible to the caller via
297 /// the merged `SchemaManager` not consulted here), the append
298 /// still records this entry — callers should check for duplicates
299 /// before invoking. The persistence-then-swap order means a
300 /// failed PUT leaves the in-memory `ArcSwap` untouched and the
301 /// returned error surfaces to the caller.
302 ///
303 /// Concurrency: serialized within a single fork by `overlay_lock`
304 /// so two concurrent appends don't clobber each other's
305 /// persisted state.
306 pub async fn add_label_to_overlay(&self, name: String, meta: LabelMeta) -> anyhow::Result<()> {
307 let _guard = self.overlay_lock.lock().await;
308 let mut next = (**self.overlay.load()).clone();
309 next.added_labels.push((name, meta));
310 self.registry
311 .update_schema_overlay(&self.fork_id, &next)
312 .await
313 .with_context(|| format!("persist schema overlay for fork {}", self.fork_id))?;
314 self.overlay.store(Arc::new(next));
315 Ok(())
316 }
317
318 /// Append an edge type to the fork-local schema overlay and
319 /// persist. Same semantics as [`Self::add_label_to_overlay`].
320 pub async fn add_edge_type_to_overlay(
321 &self,
322 name: String,
323 meta: EdgeTypeMeta,
324 ) -> anyhow::Result<()> {
325 let _guard = self.overlay_lock.lock().await;
326 let mut next = (**self.overlay.load()).clone();
327 next.added_edge_types.push((name, meta));
328 self.registry
329 .update_schema_overlay(&self.fork_id, &next)
330 .await
331 .with_context(|| format!("persist schema overlay for fork {}", self.fork_id))?;
332 self.overlay.store(Arc::new(next));
333 Ok(())
334 }
335
336 /// Registry handle (used by admin paths to e.g. compute holder counts).
337 #[must_use]
338 pub fn registry(&self) -> Arc<ForkRegistryHandle> {
339 self.registry.clone()
340 }
341}