uni_store/runtime/l0_manager.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::runtime::l0::L0Buffer;
5use crate::runtime::wal::WriteAheadLog;
6use parking_lot::RwLock;
7use std::sync::Arc;
8
/// Per-generation pin marker for snapshot isolation (Component C1).
///
/// Held by exactly two classes of owner: the [`L0Manager`] keeps one clone for
/// the current generation, and every live [`SnapshotView`] holds one. So
/// `Arc::strong_count` on the manager's clone is `1 + (live snapshots of the
/// current generation)`, which [`L0Manager::is_current_pinned`] uses to decide
/// whether a commit must freeze the generation aside before mutating it. The
/// private unit field means a token can only be constructed inside this
/// module, so no other code can mint one and break that invariant.
///
/// Always compiled (so the inert threading types exist in every build). The
/// manager mints a fresh token whenever it installs a generation (its
/// constructors, [`L0Manager::rotate`] and
/// [`L0Manager::freeze_current_for_snapshot`]). A token is only ever *shared*
/// with a snapshot by [`L0Manager::pin_snapshot`], which a transaction calls
/// only when `UniConfig::ssi_enabled` is `true`.
#[derive(Debug)]
pub struct PinToken(());
24
/// An isolated, reference-counted view of the L0 tier captured at a point in time.
///
/// Reads built from a `SnapshotView` see the L0 generation(s) that were visible
/// at capture, not later commits: while any view of a generation is alive, a
/// commit that would mutate it first freezes it aside
/// ([`L0Manager::freeze_current_for_snapshot`]), so the buffers behind `main`
/// and `extra` are never mutated after capture. Dropping the view releases its
/// pin; `Arc` reference counting reclaims a frozen generation once no view holds
/// it. Cloning a view clones its pin as well, so each clone keeps the
/// generation pinned until it is dropped too.
///
/// `started_at_version` is the main buffer's version at capture; it is the
/// bound that `pinned_storage`, when installed, restricts L1 scans to (C2).
///
/// Always compiled so it can thread through the executor as an inert
/// `Option<SnapshotView>` in every build; it is only ever *constructed* by
/// [`L0Manager::pin_snapshot`] (the private `pin` field prevents construction
/// outside this module), which a transaction calls only when
/// `UniConfig::ssi_enabled` is `true`, so with SSI off the threaded option is
/// always `None`.
#[derive(Clone)]
pub struct SnapshotView {
    /// The pinned main L0 generation at capture time.
    pub main: Arc<RwLock<L0Buffer>>,
    /// Generations being flushed at capture time, read after `main` (oldest visible state).
    pub extra: Vec<Arc<RwLock<L0Buffer>>>,
    /// Pin marker keeping the captured generation freeze-on-commit. Private so
    /// the pin count can only change through capture, clone and drop.
    pin: Arc<PinToken>,
    /// Main-L0 version at capture (the C2 hwm fed into `pinned_storage`).
    pub started_at_version: u64,
    /// C2: a `StorageManager` clone pinned to `started_at_version`
    /// (`StorageManager::pinned_at_version`), so L1 scans filter to
    /// `_version <= started_at_version` and an L0→L1 flush completing
    /// mid-transaction cannot leak post-snapshot rows. Installed by the
    /// transaction at begin (one per transaction — the pinned manager
    /// carries a fresh `AdjacencyManager`); `None` for snapshots taken
    /// without a storage pin, which is what [`L0Manager::pin_snapshot`]
    /// returns.
    pub pinned_storage: Option<Arc<crate::storage::manager::StorageManager>>,
}
60
61impl std::fmt::Debug for SnapshotView {
62 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63 // Avoid requiring `L0Buffer: Debug` and dumping buffer contents.
64 f.debug_struct("SnapshotView")
65 .field("extra_generations", &self.extra.len())
66 .field("pins", &Arc::strong_count(&self.pin))
67 .field("started_at_version", &self.started_at_version)
68 .finish_non_exhaustive()
69 }
70}
71
/// Tracks the L0 tier: the active buffer, the buffers currently being flushed
/// to L1, and the snapshot pin token of the active generation.
pub struct L0Manager {
    /// The current active L0 buffer.
    /// Outer RwLock protects the Arc (swapping L0s).
    /// Inner RwLock protects the L0Buffer content (concurrent reads/writes).
    current: RwLock<Arc<RwLock<L0Buffer>>>,
    /// L0 buffers currently being flushed to L1.
    /// These remain visible to reads until flush completes successfully.
    /// This prevents data loss if L1 writes fail after rotation.
    pending_flush: RwLock<Vec<Arc<RwLock<L0Buffer>>>>,
    /// Snapshot-isolation pin token for the current generation (Component C1).
    /// Reset on every rotate so a fresh generation starts unpinned. Read/cloned
    /// only under the `current` lock so a snapshot captures a buffer and token
    /// from the same generation. See `PinToken`.
    current_pin: RwLock<Arc<PinToken>>,
}
87
88impl L0Manager {
89 pub fn new(start_version: u64, wal: Option<Arc<WriteAheadLog>>) -> Self {
90 let l0 = L0Buffer::new(start_version, wal);
91 Self {
92 current: RwLock::new(Arc::new(RwLock::new(l0))),
93 pending_flush: RwLock::new(Vec::new()),
94 current_pin: RwLock::new(Arc::new(PinToken(()))),
95 }
96 }
97
98 /// Create a read-only snapshot L0Manager from existing buffers.
99 ///
100 /// Used by the algorithm execution path to provide L0 visibility
101 /// without owning the actual L0 lifecycle (rotation, flush, WAL).
102 pub fn from_snapshot(
103 current: Arc<RwLock<L0Buffer>>,
104 pending_flush: Vec<Arc<RwLock<L0Buffer>>>,
105 ) -> Self {
106 Self {
107 current: RwLock::new(current),
108 pending_flush: RwLock::new(pending_flush),
109 current_pin: RwLock::new(Arc::new(PinToken(()))),
110 }
111 }
112
113 /// Get the current L0 buffer.
114 pub fn get_current(&self) -> Arc<RwLock<L0Buffer>> {
115 self.current.read().clone()
116 }
117
118 /// Get all L0 buffers that should be visible to reads.
119 /// This includes the current L0 plus any L0s being flushed.
120 pub fn get_all_readable(&self) -> Vec<Arc<RwLock<L0Buffer>>> {
121 let current = self.get_current();
122 let pending = self.pending_flush.read().clone();
123 let mut all = vec![current];
124 all.extend(pending);
125 all
126 }
127
128 /// Get L0 buffers currently being flushed (for QueryContext).
129 pub fn get_pending_flush(&self) -> Vec<Arc<RwLock<L0Buffer>>> {
130 self.pending_flush.read().clone()
131 }
132
133 /// Rotate L0. Returns the OLD L0 buffer.
134 /// The new L0 is initialized with `next_version` and `new_wal`.
135 pub fn rotate(
136 &self,
137 next_version: u64,
138 new_wal: Option<Arc<WriteAheadLog>>,
139 ) -> Arc<RwLock<L0Buffer>> {
140 let mut guard = self.current.write();
141 let old_l0 = guard.clone();
142
143 let new_l0 = L0Buffer::new(next_version, new_wal);
144 *guard = Arc::new(RwLock::new(new_l0));
145
146 // A fresh generation starts unpinned. Reset the pin token while still
147 // holding the `current` write guard: `pin_snapshot` clones the buffer
148 // and token under `current.read()`, so this serializes against it and a
149 // snapshot can never capture a buffer/token from different generations.
150 *self.current_pin.write() = Arc::new(PinToken(()));
151
152 old_l0
153 }
154
155 /// Begin flush: rotate L0 and add old L0 to pending flush list.
156 /// The old L0 remains visible to reads until `complete_flush` is called.
157 /// Returns the old L0 buffer to be flushed.
158 pub fn begin_flush(
159 &self,
160 next_version: u64,
161 new_wal: Option<Arc<WriteAheadLog>>,
162 ) -> Arc<RwLock<L0Buffer>> {
163 let old_l0 = self.rotate(next_version, new_wal);
164 self.pending_flush.write().push(old_l0.clone());
165 old_l0
166 }
167
168 /// Complete flush: remove the flushed L0 from pending list.
169 /// Call this only after L1 writes have succeeded.
170 pub fn complete_flush(&self, l0: &Arc<RwLock<L0Buffer>>) {
171 let mut pending = self.pending_flush.write();
172 pending.retain(|x| !Arc::ptr_eq(x, l0));
173 }
174
175 /// Captures an isolated snapshot of the current L0 (strategy D).
176 ///
177 /// Freezes the current buffer by rotating it aside — writers re-fetch
178 /// `get_current()` at write time, so they move to the fresh buffer and can
179 /// never mutate the frozen one — and keeps it readable via the pending
180 /// list. Returns the `(frozen_main, pending)` pair used to build a
181 /// [`QueryContext`] whose reads are isolated from later writes. Capture is
182 /// O(1): one empty-buffer allocation and an `Arc` move, with no deep copy.
183 ///
184 /// The caller must coordinate with the commit path (e.g. hold the writer's
185 /// `flush_lock`) so the rotation does not race an in-flight merge into the
186 /// current buffer. The frozen generation currently rides the pending-flush
187 /// list; a dedicated generation list with reader-count GC is the production
188 /// follow-up (see the proposal's open questions).
189 ///
190 /// [`QueryContext`]: crate::runtime::QueryContext
191 pub fn snapshot_isolated(
192 &self,
193 next_version: u64,
194 new_wal: Option<Arc<WriteAheadLog>>,
195 ) -> (Arc<RwLock<L0Buffer>>, Vec<Arc<RwLock<L0Buffer>>>) {
196 // Capture pending before freezing so the frozen buffer becomes the
197 // snapshot's main view rather than one of its pending peers.
198 let pending = self.pending_flush.read().clone();
199 let frozen = self.rotate(next_version, new_wal);
200 // Keep the frozen generation visible to latest (non-snapshot) reads.
201 self.pending_flush.write().push(frozen.clone());
202 (frozen, pending)
203 }
204
205 /// Pins an isolated view of the current L0 tier for a transaction.
206 ///
207 /// O(1): clones the current buffer handle, the pending-flush set, and the
208 /// generation's pin token. No freeze happens here — the current buffer keeps
209 /// taking writes; it is frozen aside lazily, and only if still pinned, when a
210 /// commit would next mutate it (see [`Self::freeze_current_for_snapshot`] and
211 /// [`Self::is_current_pinned`]). Holds the `current` read lock across the
212 /// buffer and token clones so both come from the same generation even if a
213 /// rotate races. Does not require the writer's `flush_lock`.
214 ///
215 /// # Examples
216 /// ```ignore
217 /// let snap = writer.l0_manager().pin_snapshot();
218 /// // build a QueryContext from `snap.main` + `snap.extra`
219 /// ```
220 pub fn pin_snapshot(&self) -> SnapshotView {
221 // Hold `current` read across both clones: a concurrent `rotate` needs
222 // `current.write()` and resets the pin token under it, so it cannot
223 // interleave and split the buffer/token across generations.
224 let current_guard = self.current.read();
225 let main = current_guard.clone();
226 let pin = self.current_pin.read().clone();
227 let started_at_version = main.read().current_version;
228 let extra = self.pending_flush.read().clone();
229 drop(current_guard);
230 SnapshotView {
231 main,
232 extra,
233 pin,
234 started_at_version,
235 pinned_storage: None,
236 }
237 }
238
239 /// Returns `true` if any live [`SnapshotView`] pins the current generation.
240 ///
241 /// `strong_count > 1` means a snapshot besides the manager holds the token.
242 /// Call under the writer's `flush_lock` at commit so the decision and any
243 /// resulting freeze are atomic with respect to the merge.
244 pub fn is_current_pinned(&self) -> bool {
245 Arc::strong_count(&self.current_pin.read()) > 1
246 }
247
248 /// Clones the current (pinned) generation aside so a commit can mutate a
249 /// fresh buffer without the pinning snapshots observing the write — lazy
250 /// copy-on-write, performed only when [`Self::is_current_pinned`] holds.
251 ///
252 /// The outgoing buffer — which the pinning [`SnapshotView`]s hold via `main`
253 /// — becomes immutable: a deep copy carrying the same data is installed as
254 /// the new current, the commit merges into that copy, and the original is
255 /// never mutated again. `L0Buffer::clone` drops the WAL handle, so the
256 /// original's WAL (already flushed at this commit's WAL step) is handed to
257 /// the copy; the frozen original keeps none, as it takes no more writes. The
258 /// original is **not** placed on the pending-flush list — it is reclaimed by
259 /// `Arc` refcount once the last snapshot drops, so nothing leaks. The new
260 /// generation starts unpinned (the pin token is reset). Must be called under
261 /// the writer's `flush_lock`, since it swaps the current buffer.
262 pub fn freeze_current_for_snapshot(&self) {
263 let mut guard = self.current.write();
264 let frozen = guard.clone();
265 let mut new_buf = frozen.read().clone();
266 // Hand the WAL from the now-frozen original to the writable copy.
267 new_buf.wal = frozen.write().wal.take();
268 *guard = Arc::new(RwLock::new(new_buf));
269 // The fresh generation starts unpinned; reset under the `current` write
270 // guard (consistent with `rotate`, which a non-clone path would use).
271 *self.current_pin.write() = Arc::new(PinToken(()));
272 }
273
274 /// Get the minimum WAL LSN across all pending flush L0s.
275 /// WAL truncation should not go past this LSN to preserve data for pending flushes.
276 /// Returns None if no pending flushes exist.
277 pub fn min_pending_wal_lsn(&self) -> Option<u64> {
278 let pending = self.pending_flush.read();
279 if pending.is_empty() {
280 return None;
281 }
282 pending
283 .iter()
284 .map(|l0_arc| {
285 let l0 = l0_arc.read();
286 l0.wal_lsn_at_flush
287 })
288 .min()
289 }
290}
291
292#[cfg(test)]
293mod snapshot_tests {
294 use super::*;
295 use crate::runtime::QueryContext;
296 use crate::runtime::l0_visibility::lookup_vertex_prop;
297 use uni_common::core::id::Vid;
298 use uni_common::{Properties, Value};
299
300 fn named(name: &str) -> Properties {
301 let mut props = Properties::new();
302 props.insert("name".to_string(), Value::String(name.to_string()));
303 props
304 }
305
306 fn name_of(vid: Vid, ctx: &QueryContext) -> Option<String> {
307 match lookup_vertex_prop(vid, "name", Some(ctx)) {
308 Some(Value::String(s)) => Some(s),
309 _ => None,
310 }
311 }
312
313 /// A strategy-D snapshot must not observe writes that land after capture,
314 /// while a fresh latest view must, and frozen data must stay visible.
315 #[test]
316 fn snapshot_isolated_from_later_writes() {
317 let mgr = L0Manager::new(0, None);
318 let alice = Vid::from(1_u64);
319 let bob = Vid::from(2_u64);
320 let labels = ["Node".to_string()];
321
322 // Pre-snapshot state.
323 {
324 let current = mgr.get_current();
325 let mut guard = current.write();
326 guard.insert_vertex_with_labels(alice, named("alice"), &labels);
327 guard.insert_vertex_with_labels(bob, named("bob"), &labels);
328 }
329
330 // Freeze-rotate snapshot.
331 let (frozen, pending) = mgr.snapshot_isolated(1, None);
332 let snap = QueryContext::new_with_pending(frozen, None, pending);
333
334 // Post-snapshot write into the fresh current buffer.
335 mgr.get_current()
336 .write()
337 .insert_vertex_with_labels(alice, named("alice2"), &labels);
338
339 // The snapshot is isolated: it still sees the pre-write value.
340 assert_eq!(name_of(alice, &snap).as_deref(), Some("alice"));
341
342 // A fresh latest view sees the new value...
343 let latest =
344 QueryContext::new_with_pending(mgr.get_current(), None, mgr.get_pending_flush());
345 assert_eq!(name_of(alice, &latest).as_deref(), Some("alice2"));
346
347 // ...and the untouched vertex remains visible via the frozen generation.
348 assert_eq!(name_of(bob, &latest).as_deref(), Some("bob"));
349 }
350
351 /// A pin marks the current generation; dropping the snapshot releases it.
352 #[test]
353 fn pin_marks_current_generation() {
354 let mgr = L0Manager::new(0, None);
355 assert!(!mgr.is_current_pinned());
356 let snap = mgr.pin_snapshot();
357 assert!(mgr.is_current_pinned());
358 drop(snap);
359 assert!(
360 !mgr.is_current_pinned(),
361 "dropping the snapshot releases the pin"
362 );
363 }
364
365 /// Clone-on-freeze: after a pinned generation is frozen aside, the snapshot
366 /// still observes its captured state while the new generation takes writes,
367 /// and the new generation starts unpinned.
368 #[test]
369 fn clone_freeze_isolates_pinned_snapshot() {
370 let mgr = L0Manager::new(0, None);
371 let alice = Vid::from(1_u64);
372 let labels = ["Node".to_string()];
373 mgr.get_current()
374 .write()
375 .insert_vertex_with_labels(alice, named("alice"), &labels);
376
377 let snap = mgr.pin_snapshot();
378 assert!(mgr.is_current_pinned());
379
380 // Commit-equivalent: freeze the pinned generation aside, then mutate the
381 // fresh current (where a real commit's merge would land).
382 mgr.freeze_current_for_snapshot();
383 assert!(
384 !mgr.is_current_pinned(),
385 "the fresh generation starts unpinned"
386 );
387 mgr.get_current()
388 .write()
389 .insert_vertex_with_labels(alice, named("alice2"), &labels);
390
391 // The snapshot still sees the pre-freeze value (isolated).
392 let snap_ctx = QueryContext::new_with_pending(snap.main.clone(), None, snap.extra.clone());
393 assert_eq!(name_of(alice, &snap_ctx).as_deref(), Some("alice"));
394
395 // A fresh latest view sees the post-freeze value.
396 let latest =
397 QueryContext::new_with_pending(mgr.get_current(), None, mgr.get_pending_flush());
398 assert_eq!(name_of(alice, &latest).as_deref(), Some("alice2"));
399
400 // Dropping the snapshot releases its hold on the frozen generation.
401 drop(snap);
402 }
403}