Skip to main content

uni_store/runtime/
l0_manager.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::runtime::l0::L0Buffer;
5use crate::runtime::wal::WriteAheadLog;
6use parking_lot::RwLock;
7use std::sync::Arc;
8
/// Marker whose `Arc` reference count records how many snapshots pin an L0
/// generation (Component C1).
///
/// Exactly two kinds of holder exist. The [`L0Manager`] owns one clone for its
/// current generation, and each live [`SnapshotView`] owns another. The
/// manager's clone therefore has `Arc::strong_count == 1 + live snapshots of the
/// current generation`. [`L0Manager::is_current_pinned`] relies on that count to
/// decide whether a commit has to freeze the generation aside before mutating it.
/// The field is private so that no other code can mint a token and skew the count.
///
/// The type is compiled unconditionally, so the inert threading types exist in
/// every build. Tokens are only created by [`L0Manager::pin_snapshot`], and a
/// transaction calls that only when `UniConfig::ssi_enabled` is `true`.
#[derive(Debug)]
pub struct PinToken(());
24
25/// An isolated, reference-counted view of the L0 tier captured at a point in time.
26///
27/// Reads built from a `SnapshotView` see the L0 generation(s) that were visible
28/// at capture, not later commits: while any view of a generation is alive a
29/// commit that would mutate it first freezes it aside
30/// ([`L0Manager::freeze_current_for_snapshot`]), so the buffers behind `main`
31/// and `extra` are never mutated after capture. Dropping the view releases its
32/// pin; `Arc` reference counting reclaims a frozen generation once no view holds
33/// it. `started_at_version` is captured for the future C2 base-pinning hook and
34/// is not yet consulted.
35///
36/// Always compiled so it can thread through the executor as an inert
37/// `Option<SnapshotView>` in every build; it is only ever *constructed* by
38/// [`L0Manager::pin_snapshot`], which a transaction calls only when
39/// `UniConfig::ssi_enabled` is `true`, so with SSI off the threaded option is
40/// always `None`.
41#[derive(Clone)]
42pub struct SnapshotView {
43    /// The pinned main L0 generation at capture time.
44    pub main: Arc<RwLock<L0Buffer>>,
45    /// Generations being flushed at capture time, read after `main` (oldest visible state).
46    pub extra: Vec<Arc<RwLock<L0Buffer>>>,
47    /// Pin marker keeping the captured generation freeze-on-commit.
48    pin: Arc<PinToken>,
49    /// Main-L0 version at capture; the hook a future C2 will feed to `open_at`.
50    pub started_at_version: u64,
51}
52
53impl std::fmt::Debug for SnapshotView {
54    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55        // Avoid requiring `L0Buffer: Debug` and dumping buffer contents.
56        f.debug_struct("SnapshotView")
57            .field("extra_generations", &self.extra.len())
58            .field("pins", &Arc::strong_count(&self.pin))
59            .field("started_at_version", &self.started_at_version)
60            .finish_non_exhaustive()
61    }
62}
63
64pub struct L0Manager {
65    // The current active L0 buffer.
66    // Outer RwLock protects the Arc (swapping L0s).
67    // Inner RwLock protects the L0Buffer content (concurrent reads/writes).
68    current: RwLock<Arc<RwLock<L0Buffer>>>,
69    // L0 buffers currently being flushed to L1.
70    // These remain visible to reads until flush completes successfully.
71    // This prevents data loss if L1 writes fail after rotation.
72    pending_flush: RwLock<Vec<Arc<RwLock<L0Buffer>>>>,
73    // Snapshot-isolation pin token for the current generation (Component C1).
74    // Reset on every rotate so a fresh generation starts unpinned. Read/cloned
75    // only under the `current` lock so a snapshot captures a buffer and token
76    // from the same generation. See `PinToken`.
77    current_pin: RwLock<Arc<PinToken>>,
78}
79
80impl L0Manager {
81    pub fn new(start_version: u64, wal: Option<Arc<WriteAheadLog>>) -> Self {
82        let l0 = L0Buffer::new(start_version, wal);
83        Self {
84            current: RwLock::new(Arc::new(RwLock::new(l0))),
85            pending_flush: RwLock::new(Vec::new()),
86            current_pin: RwLock::new(Arc::new(PinToken(()))),
87        }
88    }
89
90    /// Create a read-only snapshot L0Manager from existing buffers.
91    ///
92    /// Used by the algorithm execution path to provide L0 visibility
93    /// without owning the actual L0 lifecycle (rotation, flush, WAL).
94    pub fn from_snapshot(
95        current: Arc<RwLock<L0Buffer>>,
96        pending_flush: Vec<Arc<RwLock<L0Buffer>>>,
97    ) -> Self {
98        Self {
99            current: RwLock::new(current),
100            pending_flush: RwLock::new(pending_flush),
101            current_pin: RwLock::new(Arc::new(PinToken(()))),
102        }
103    }
104
105    /// Get the current L0 buffer.
106    pub fn get_current(&self) -> Arc<RwLock<L0Buffer>> {
107        self.current.read().clone()
108    }
109
110    /// Get all L0 buffers that should be visible to reads.
111    /// This includes the current L0 plus any L0s being flushed.
112    pub fn get_all_readable(&self) -> Vec<Arc<RwLock<L0Buffer>>> {
113        let current = self.get_current();
114        let pending = self.pending_flush.read().clone();
115        let mut all = vec![current];
116        all.extend(pending);
117        all
118    }
119
120    /// Get L0 buffers currently being flushed (for QueryContext).
121    pub fn get_pending_flush(&self) -> Vec<Arc<RwLock<L0Buffer>>> {
122        self.pending_flush.read().clone()
123    }
124
125    /// Rotate L0. Returns the OLD L0 buffer.
126    /// The new L0 is initialized with `next_version` and `new_wal`.
127    pub fn rotate(
128        &self,
129        next_version: u64,
130        new_wal: Option<Arc<WriteAheadLog>>,
131    ) -> Arc<RwLock<L0Buffer>> {
132        let mut guard = self.current.write();
133        let old_l0 = guard.clone();
134
135        let new_l0 = L0Buffer::new(next_version, new_wal);
136        *guard = Arc::new(RwLock::new(new_l0));
137
138        // A fresh generation starts unpinned. Reset the pin token while still
139        // holding the `current` write guard: `pin_snapshot` clones the buffer
140        // and token under `current.read()`, so this serializes against it and a
141        // snapshot can never capture a buffer/token from different generations.
142        *self.current_pin.write() = Arc::new(PinToken(()));
143
144        old_l0
145    }
146
147    /// Begin flush: rotate L0 and add old L0 to pending flush list.
148    /// The old L0 remains visible to reads until `complete_flush` is called.
149    /// Returns the old L0 buffer to be flushed.
150    pub fn begin_flush(
151        &self,
152        next_version: u64,
153        new_wal: Option<Arc<WriteAheadLog>>,
154    ) -> Arc<RwLock<L0Buffer>> {
155        let old_l0 = self.rotate(next_version, new_wal);
156        self.pending_flush.write().push(old_l0.clone());
157        old_l0
158    }
159
160    /// Complete flush: remove the flushed L0 from pending list.
161    /// Call this only after L1 writes have succeeded.
162    pub fn complete_flush(&self, l0: &Arc<RwLock<L0Buffer>>) {
163        let mut pending = self.pending_flush.write();
164        pending.retain(|x| !Arc::ptr_eq(x, l0));
165    }
166
167    /// Captures an isolated snapshot of the current L0 (strategy D).
168    ///
169    /// Freezes the current buffer by rotating it aside — writers re-fetch
170    /// `get_current()` at write time, so they move to the fresh buffer and can
171    /// never mutate the frozen one — and keeps it readable via the pending
172    /// list. Returns the `(frozen_main, pending)` pair used to build a
173    /// [`QueryContext`] whose reads are isolated from later writes. Capture is
174    /// O(1): one empty-buffer allocation and an `Arc` move, with no deep copy.
175    ///
176    /// The caller must coordinate with the commit path (e.g. hold the writer's
177    /// `flush_lock`) so the rotation does not race an in-flight merge into the
178    /// current buffer. The frozen generation currently rides the pending-flush
179    /// list; a dedicated generation list with reader-count GC is the production
180    /// follow-up (see the proposal's open questions).
181    ///
182    /// [`QueryContext`]: crate::runtime::QueryContext
183    pub fn snapshot_isolated(
184        &self,
185        next_version: u64,
186        new_wal: Option<Arc<WriteAheadLog>>,
187    ) -> (Arc<RwLock<L0Buffer>>, Vec<Arc<RwLock<L0Buffer>>>) {
188        // Capture pending before freezing so the frozen buffer becomes the
189        // snapshot's main view rather than one of its pending peers.
190        let pending = self.pending_flush.read().clone();
191        let frozen = self.rotate(next_version, new_wal);
192        // Keep the frozen generation visible to latest (non-snapshot) reads.
193        self.pending_flush.write().push(frozen.clone());
194        (frozen, pending)
195    }
196
197    /// Pins an isolated view of the current L0 tier for a transaction.
198    ///
199    /// O(1): clones the current buffer handle, the pending-flush set, and the
200    /// generation's pin token. No freeze happens here — the current buffer keeps
201    /// taking writes; it is frozen aside lazily, and only if still pinned, when a
202    /// commit would next mutate it (see [`Self::freeze_current_for_snapshot`] and
203    /// [`Self::is_current_pinned`]). Holds the `current` read lock across the
204    /// buffer and token clones so both come from the same generation even if a
205    /// rotate races. Does not require the writer's `flush_lock`.
206    ///
207    /// # Examples
208    /// ```ignore
209    /// let snap = writer.l0_manager().pin_snapshot();
210    /// // build a QueryContext from `snap.main` + `snap.extra`
211    /// ```
212    pub fn pin_snapshot(&self) -> SnapshotView {
213        // Hold `current` read across both clones: a concurrent `rotate` needs
214        // `current.write()` and resets the pin token under it, so it cannot
215        // interleave and split the buffer/token across generations.
216        let current_guard = self.current.read();
217        let main = current_guard.clone();
218        let pin = self.current_pin.read().clone();
219        let started_at_version = main.read().current_version;
220        let extra = self.pending_flush.read().clone();
221        drop(current_guard);
222        SnapshotView {
223            main,
224            extra,
225            pin,
226            started_at_version,
227        }
228    }
229
230    /// Returns `true` if any live [`SnapshotView`] pins the current generation.
231    ///
232    /// `strong_count > 1` means a snapshot besides the manager holds the token.
233    /// Call under the writer's `flush_lock` at commit so the decision and any
234    /// resulting freeze are atomic with respect to the merge.
235    pub fn is_current_pinned(&self) -> bool {
236        Arc::strong_count(&self.current_pin.read()) > 1
237    }
238
239    /// Clones the current (pinned) generation aside so a commit can mutate a
240    /// fresh buffer without the pinning snapshots observing the write — lazy
241    /// copy-on-write, performed only when [`Self::is_current_pinned`] holds.
242    ///
243    /// The outgoing buffer — which the pinning [`SnapshotView`]s hold via `main`
244    /// — becomes immutable: a deep copy carrying the same data is installed as
245    /// the new current, the commit merges into that copy, and the original is
246    /// never mutated again. `L0Buffer::clone` drops the WAL handle, so the
247    /// original's WAL (already flushed at this commit's WAL step) is handed to
248    /// the copy; the frozen original keeps none, as it takes no more writes. The
249    /// original is **not** placed on the pending-flush list — it is reclaimed by
250    /// `Arc` refcount once the last snapshot drops, so nothing leaks. The new
251    /// generation starts unpinned (the pin token is reset). Must be called under
252    /// the writer's `flush_lock`, since it swaps the current buffer.
253    pub fn freeze_current_for_snapshot(&self) {
254        let mut guard = self.current.write();
255        let frozen = guard.clone();
256        let mut new_buf = frozen.read().clone();
257        // Hand the WAL from the now-frozen original to the writable copy.
258        new_buf.wal = frozen.write().wal.take();
259        *guard = Arc::new(RwLock::new(new_buf));
260        // The fresh generation starts unpinned; reset under the `current` write
261        // guard (consistent with `rotate`, which a non-clone path would use).
262        *self.current_pin.write() = Arc::new(PinToken(()));
263    }
264
265    /// Get the minimum WAL LSN across all pending flush L0s.
266    /// WAL truncation should not go past this LSN to preserve data for pending flushes.
267    /// Returns None if no pending flushes exist.
268    pub fn min_pending_wal_lsn(&self) -> Option<u64> {
269        let pending = self.pending_flush.read();
270        if pending.is_empty() {
271            return None;
272        }
273        pending
274            .iter()
275            .map(|l0_arc| {
276                let l0 = l0_arc.read();
277                l0.wal_lsn_at_flush
278            })
279            .min()
280    }
281}
282
#[cfg(test)]
mod snapshot_tests {
    use super::*;
    use crate::runtime::QueryContext;
    use crate::runtime::l0_visibility::lookup_vertex_prop;
    use uni_common::core::id::Vid;
    use uni_common::{Properties, Value};

    /// Builds a property map holding a single `name` string.
    fn named(name: &str) -> Properties {
        let mut props = Properties::new();
        props.insert("name".to_string(), Value::String(name.to_string()));
        props
    }

    /// Reads the `name` property of `vid` through `ctx`. Returns `None` if the
    /// property is absent or not a string.
    fn name_of(vid: Vid, ctx: &QueryContext) -> Option<String> {
        match lookup_vertex_prop(vid, "name", Some(ctx)) {
            Some(Value::String(s)) => Some(s),
            _ => None,
        }
    }

    /// A strategy-D snapshot must not observe writes that land after capture.
    /// A fresh latest view must observe them, and frozen data must stay visible.
    #[test]
    fn snapshot_isolated_from_later_writes() {
        let mgr = L0Manager::new(0, None);
        let alice = Vid::from(1_u64);
        let bob = Vid::from(2_u64);
        let labels = ["Node".to_string()];

        // Pre-snapshot state.
        {
            let current = mgr.get_current();
            let mut guard = current.write();
            guard.insert_vertex_with_labels(alice, named("alice"), &labels);
            guard.insert_vertex_with_labels(bob, named("bob"), &labels);
        }

        // Freeze-rotate snapshot.
        let (frozen, pending) = mgr.snapshot_isolated(1, None);
        let snap = QueryContext::new_with_pending(frozen, None, pending);

        // Post-snapshot write into the fresh current buffer.
        mgr.get_current()
            .write()
            .insert_vertex_with_labels(alice, named("alice2"), &labels);

        // The snapshot is isolated: it still sees the pre-write value.
        assert_eq!(name_of(alice, &snap).as_deref(), Some("alice"));

        // A fresh latest view sees the new value...
        let latest =
            QueryContext::new_with_pending(mgr.get_current(), None, mgr.get_pending_flush());
        assert_eq!(name_of(alice, &latest).as_deref(), Some("alice2"));

        // ...and the untouched vertex remains visible via the frozen generation.
        assert_eq!(name_of(bob, &latest).as_deref(), Some("bob"));
    }

    /// A pin marks the current generation; dropping the snapshot releases it.
    #[test]
    fn pin_marks_current_generation() {
        let mgr = L0Manager::new(0, None);
        assert!(!mgr.is_current_pinned());
        let snap = mgr.pin_snapshot();
        assert!(mgr.is_current_pinned());
        drop(snap);
        assert!(
            !mgr.is_current_pinned(),
            "dropping the snapshot releases the pin"
        );
    }

    /// A clone of a snapshot is itself a live pin. The generation stays pinned
    /// until every clone is dropped.
    #[test]
    fn cloned_snapshot_keeps_pin_alive() {
        let mgr = L0Manager::new(0, None);
        let snap = mgr.pin_snapshot();
        let twin = snap.clone();
        drop(snap);
        assert!(
            mgr.is_current_pinned(),
            "the clone still pins the generation"
        );
        drop(twin);
        assert!(!mgr.is_current_pinned());
    }

    /// Rotating installs a fresh, unpinned generation. An existing snapshot
    /// keeps the outgoing buffer as its `main`.
    #[test]
    fn rotate_starts_unpinned_generation() {
        let mgr = L0Manager::new(0, None);
        let snap = mgr.pin_snapshot();
        let old = mgr.rotate(1, None);
        assert!(Arc::ptr_eq(&snap.main, &old));
        assert!(!Arc::ptr_eq(&mgr.get_current(), &old));
        assert!(
            !mgr.is_current_pinned(),
            "the rotated-in generation starts unpinned"
        );
    }

    /// Clone-on-freeze: after a pinned generation is frozen aside, the snapshot
    /// still observes its captured state while the new generation takes writes.
    /// The new generation starts unpinned.
    #[test]
    fn clone_freeze_isolates_pinned_snapshot() {
        let mgr = L0Manager::new(0, None);
        let alice = Vid::from(1_u64);
        let labels = ["Node".to_string()];
        mgr.get_current()
            .write()
            .insert_vertex_with_labels(alice, named("alice"), &labels);

        let snap = mgr.pin_snapshot();
        assert!(mgr.is_current_pinned());

        // Commit-equivalent: freeze the pinned generation aside, then mutate the
        // fresh current (where a real commit's merge would land).
        mgr.freeze_current_for_snapshot();
        assert!(
            !mgr.is_current_pinned(),
            "the fresh generation starts unpinned"
        );
        mgr.get_current()
            .write()
            .insert_vertex_with_labels(alice, named("alice2"), &labels);

        // The snapshot still sees the pre-freeze value (isolated).
        let snap_ctx = QueryContext::new_with_pending(snap.main.clone(), None, snap.extra.clone());
        assert_eq!(name_of(alice, &snap_ctx).as_deref(), Some("alice"));

        // A fresh latest view sees the post-freeze value.
        let latest =
            QueryContext::new_with_pending(mgr.get_current(), None, mgr.get_pending_flush());
        assert_eq!(name_of(alice, &latest).as_deref(), Some("alice2"));

        // Dropping the snapshot releases its hold on the frozen generation.
        drop(snap);
    }

    /// A generation handed to `begin_flush` stays readable via the pending list
    /// until `complete_flush` removes it.
    #[test]
    fn begin_and_complete_flush_track_pending() {
        let mgr = L0Manager::new(0, None);
        let alice = Vid::from(1_u64);
        let labels = ["Node".to_string()];
        mgr.get_current()
            .write()
            .insert_vertex_with_labels(alice, named("alice"), &labels);
        assert_eq!(mgr.min_pending_wal_lsn(), None);

        let flushing = mgr.begin_flush(1, None);
        let pending = mgr.get_pending_flush();
        assert_eq!(pending.len(), 1);
        assert!(Arc::ptr_eq(&pending[0], &flushing));
        assert_eq!(mgr.get_all_readable().len(), 2);

        // The flushing generation's data is still visible to latest reads.
        let latest =
            QueryContext::new_with_pending(mgr.get_current(), None, mgr.get_pending_flush());
        assert_eq!(name_of(alice, &latest).as_deref(), Some("alice"));

        mgr.complete_flush(&flushing);
        assert!(mgr.get_pending_flush().is_empty());
        assert_eq!(mgr.get_all_readable().len(), 1);
        assert_eq!(mgr.min_pending_wal_lsn(), None);
    }
}