uni-store 2.1.0

Storage layer for Uni graph database - Lance datasets, LSM deltas, and WAL
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
// SPDX-License-Identifier: Apache-2.0
// Copyright 2024-2026 Dragonscale Team

use crate::runtime::l0::L0Buffer;
use crate::runtime::wal::WriteAheadLog;
use parking_lot::RwLock;
use std::sync::Arc;

/// Per-generation pin marker for snapshot isolation (Component C1).
///
/// The token carries no data; only its `Arc` reference count matters. It has
/// exactly two kinds of holder: the [`L0Manager`] keeps one clone for the
/// current generation, and every live [`SnapshotView`] holds one. So
/// `Arc::strong_count` on the manager's clone is `1 + (live snapshots of the
/// current generation)`, which [`L0Manager::is_current_pinned`] uses to decide
/// whether a commit must freeze the generation aside before mutating it.
///
/// The single field is private so that code outside this module cannot mint a
/// token and break that counting invariant.
///
/// Always compiled (so the inert threading types exist in every build); it is
/// only ever *minted* by [`L0Manager::pin_snapshot`], which a transaction calls
/// only when `UniConfig::ssi_enabled` is `true`.
#[derive(Debug)]
pub struct PinToken(());

/// An isolated, reference-counted view of the L0 tier captured at a point in time.
///
/// Reads built from a `SnapshotView` see the L0 generation(s) that were visible
/// at capture, not later commits: while any view of a generation is alive a
/// commit that would mutate it first freezes it aside
/// ([`L0Manager::freeze_current_for_snapshot`]), so the buffers behind `main`
/// and `extra` are never mutated after capture. Dropping the view releases its
/// pin; `Arc` reference counting reclaims a frozen generation once no view holds
/// it.
///
/// `started_at_version` is the main-L0 version at capture. It is the version a
/// `pinned_storage` manager, when one is installed, is pinned to; a view
/// returned by [`L0Manager::pin_snapshot`] starts with `pinned_storage` set to
/// `None`.
///
/// Always compiled so it can thread through the executor as an inert
/// `Option<SnapshotView>` in every build; it is only ever *constructed* by
/// [`L0Manager::pin_snapshot`], which a transaction calls only when
/// `UniConfig::ssi_enabled` is `true`, so with SSI off the threaded option is
/// always `None`.
#[derive(Clone)]
pub struct SnapshotView {
    /// The pinned main L0 generation at capture time.
    pub main: Arc<RwLock<L0Buffer>>,
    /// Generations being flushed at capture time, read after `main` (oldest visible state).
    pub extra: Vec<Arc<RwLock<L0Buffer>>>,
    /// Pin marker keeping the captured generation freeze-on-commit. Private so
    /// a view can only be built inside this module; cloning the view clones
    /// the token and therefore adds another pin.
    pin: Arc<PinToken>,
    /// Main-L0 version at capture (the C2 hwm fed into `pinned_storage`).
    pub started_at_version: u64,
    /// C2: a `StorageManager` clone pinned to `started_at_version`
    /// (`StorageManager::pinned_at_version`), so L1 scans filter to
    /// `_version <= started_at_version` and an L0→L1 flush completing
    /// mid-transaction cannot leak post-snapshot rows. Installed by the
    /// transaction at begin (one per transaction — the pinned manager
    /// carries a fresh `AdjacencyManager`); `None` for snapshots taken
    /// without a storage pin.
    pub pinned_storage: Option<Arc<crate::storage::manager::StorageManager>>,
}

impl std::fmt::Debug for SnapshotView {
    /// Formats a summary of the view: how many extra generations it carries,
    /// the current strong count of its pin token, and the capture version.
    ///
    /// Buffer contents are deliberately left out, which also means `L0Buffer`
    /// does not need to implement `Debug`; the output is marked non-exhaustive
    /// to signal the omitted fields.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let extra_generations = self.extra.len();
        let pins = Arc::strong_count(&self.pin);

        let mut summary = f.debug_struct("SnapshotView");
        summary.field("extra_generations", &extra_generations);
        summary.field("pins", &pins);
        summary.field("started_at_version", &self.started_at_version);
        summary.finish_non_exhaustive()
    }
}

/// Owner of the L0 tier: the active write buffer, the buffers that are still
/// being flushed to L1, and the snapshot pin token of the active generation.
///
/// Each field has its own lock. Where `pin_snapshot` nests them it takes
/// `current` first, then `current_pin`, then `pending_flush`.
pub struct L0Manager {
    // The current active L0 buffer.
    // Outer RwLock protects the Arc (swapping L0s).
    // Inner RwLock protects the L0Buffer content (concurrent reads/writes).
    current: RwLock<Arc<RwLock<L0Buffer>>>,
    // L0 buffers currently being flushed to L1.
    // These remain visible to reads until flush completes successfully.
    // This prevents data loss if L1 writes fail after rotation.
    pending_flush: RwLock<Vec<Arc<RwLock<L0Buffer>>>>,
    // Snapshot-isolation pin token for the current generation (Component C1).
    // Reset on every rotate so a fresh generation starts unpinned. Read/cloned
    // only under the `current` lock so a snapshot captures a buffer and token
    // from the same generation. See `PinToken`.
    current_pin: RwLock<Arc<PinToken>>,
}

impl L0Manager {
    /// Creates a manager whose first L0 generation starts at `start_version`.
    ///
    /// `wal` is handed to the initial [`L0Buffer`]. The pending-flush list
    /// starts empty and the generation starts unpinned.
    pub fn new(start_version: u64, wal: Option<Arc<WriteAheadLog>>) -> Self {
        let l0 = L0Buffer::new(start_version, wal);
        Self {
            current: RwLock::new(Arc::new(RwLock::new(l0))),
            pending_flush: RwLock::new(Vec::new()),
            current_pin: RwLock::new(Arc::new(PinToken(()))),
        }
    }

    /// Create a read-only snapshot L0Manager from existing buffers.
    ///
    /// Used by the algorithm execution path to provide L0 visibility
    /// without owning the actual L0 lifecycle (rotation, flush, WAL).
    /// The returned manager gets its own fresh pin token; it does not share
    /// pins with the manager the buffers came from.
    pub fn from_snapshot(
        current: Arc<RwLock<L0Buffer>>,
        pending_flush: Vec<Arc<RwLock<L0Buffer>>>,
    ) -> Self {
        Self {
            current: RwLock::new(current),
            pending_flush: RwLock::new(pending_flush),
            current_pin: RwLock::new(Arc::new(PinToken(()))),
        }
    }

    /// Get the current L0 buffer.
    pub fn get_current(&self) -> Arc<RwLock<L0Buffer>> {
        self.current.read().clone()
    }

    /// Get all L0 buffers that should be visible to reads.
    /// This includes the current L0 plus any L0s being flushed.
    ///
    /// The current buffer comes first, followed by the pending buffers in
    /// list order.
    pub fn get_all_readable(&self) -> Vec<Arc<RwLock<L0Buffer>>> {
        // Hold `current` across the pending read. `begin_flush` and
        // `snapshot_isolated` publish the outgoing generation to the pending
        // list before releasing `current.write()`, so this reader sees either
        // the state before the rotation or the state after it, never a view
        // in which the outgoing buffer is missing or listed twice.
        let current = self.current.read();
        let pending = self.pending_flush.read();

        let mut readable = Vec::with_capacity(pending.len() + 1);
        readable.push(current.clone());
        readable.extend(pending.iter().cloned());
        readable
    }

    /// Get L0 buffers currently being flushed (for QueryContext).
    pub fn get_pending_flush(&self) -> Vec<Arc<RwLock<L0Buffer>>> {
        self.pending_flush.read().clone()
    }

    /// Installs a fresh, unpinned generation into `current` and returns the
    /// outgoing buffer.
    ///
    /// `current` must be the target of a held `self.current.write()` guard.
    /// Taking it as a parameter lets callers keep that guard while they
    /// publish the outgoing buffer elsewhere. The pin token is reset here,
    /// under the same guard: `pin_snapshot` clones the buffer and token under
    /// `current.read()`, so it can never capture a buffer and token from
    /// different generations.
    fn swap_in_fresh_generation(
        &self,
        current: &mut Arc<RwLock<L0Buffer>>,
        next_version: u64,
        new_wal: Option<Arc<WriteAheadLog>>,
    ) -> Arc<RwLock<L0Buffer>> {
        let fresh = Arc::new(RwLock::new(L0Buffer::new(next_version, new_wal)));
        // Move the old handle out instead of cloning it and then overwriting it.
        let outgoing = std::mem::replace(current, fresh);
        *self.current_pin.write() = Arc::new(PinToken(()));
        outgoing
    }

    /// Rotate L0. Returns the OLD L0 buffer.
    /// The new L0 is initialized with `next_version` and `new_wal`.
    ///
    /// The old buffer is *not* added to the pending-flush list; use
    /// [`Self::begin_flush`] when it must stay visible to reads.
    pub fn rotate(
        &self,
        next_version: u64,
        new_wal: Option<Arc<WriteAheadLog>>,
    ) -> Arc<RwLock<L0Buffer>> {
        let mut current = self.current.write();
        self.swap_in_fresh_generation(&mut current, next_version, new_wal)
    }

    /// Begin flush: rotate L0 and add old L0 to pending flush list.
    /// The old L0 remains visible to reads until `complete_flush` is called.
    /// Returns the old L0 buffer to be flushed.
    ///
    /// The rotation and the push happen under one `current` write guard, so
    /// readers that hold `current` while reading the pending list
    /// ([`Self::pin_snapshot`], [`Self::get_all_readable`]) never observe the
    /// gap in which the old buffer is neither current nor pending.
    pub fn begin_flush(
        &self,
        next_version: u64,
        new_wal: Option<Arc<WriteAheadLog>>,
    ) -> Arc<RwLock<L0Buffer>> {
        let mut current = self.current.write();
        let old_l0 = self.swap_in_fresh_generation(&mut current, next_version, new_wal);
        self.pending_flush.write().push(Arc::clone(&old_l0));
        // Release `current` only after the old buffer is published.
        drop(current);
        old_l0
    }

    /// Complete flush: remove the flushed L0 from pending list.
    /// Call this only after L1 writes have succeeded.
    ///
    /// Buffers are matched by pointer identity; passing a buffer that is not
    /// on the list is a no-op.
    pub fn complete_flush(&self, l0: &Arc<RwLock<L0Buffer>>) {
        self.pending_flush
            .write()
            .retain(|pending| !Arc::ptr_eq(pending, l0));
    }

    /// Captures an isolated snapshot of the current L0 (strategy D).
    ///
    /// Freezes the current buffer by rotating it aside — writers re-fetch
    /// `get_current()` at write time, so they move to the fresh buffer and can
    /// never mutate the frozen one — and keeps it readable via the pending
    /// list. Returns the `(frozen_main, pending)` pair used to build a
    /// [`QueryContext`] whose reads are isolated from later writes. Capture
    /// makes no deep copy: it allocates one empty buffer, moves an `Arc`, and
    /// clones the pending list's `Arc` handles.
    ///
    /// The returned `pending` is the list as it was *before* the frozen buffer
    /// was appended, so the frozen buffer is the snapshot's main view rather
    /// than one of its pending peers. The rotation, the capture of `pending`
    /// and the append all happen under one `current` write guard.
    ///
    /// The caller must coordinate with the commit path (e.g. hold the writer's
    /// `flush_lock`) so the rotation does not race an in-flight merge into the
    /// current buffer. The frozen generation currently rides the pending-flush
    /// list; a dedicated generation list with reader-count GC is the production
    /// follow-up (see the proposal's open questions).
    ///
    /// [`QueryContext`]: crate::runtime::QueryContext
    pub fn snapshot_isolated(
        &self,
        next_version: u64,
        new_wal: Option<Arc<WriteAheadLog>>,
    ) -> (Arc<RwLock<L0Buffer>>, Vec<Arc<RwLock<L0Buffer>>>) {
        let mut current = self.current.write();
        let frozen = self.swap_in_fresh_generation(&mut current, next_version, new_wal);

        let mut pending_list = self.pending_flush.write();
        // Capture the peers before appending the frozen buffer.
        let pending = pending_list.clone();
        // Keep the frozen generation visible to latest (non-snapshot) reads.
        pending_list.push(Arc::clone(&frozen));

        (frozen, pending)
    }

    /// Pins an isolated view of the current L0 tier for a transaction.
    ///
    /// Makes no deep copy: it clones the current buffer handle, the handles in
    /// the pending-flush list, and the generation's pin token. No freeze
    /// happens here — the current buffer keeps taking writes; it is frozen
    /// aside lazily, and only if still pinned, when a commit would next mutate
    /// it (see [`Self::freeze_current_for_snapshot`] and
    /// [`Self::is_current_pinned`]). Holds the `current` read lock across all
    /// clones so the buffer, the token and the pending list come from the same
    /// generation even if a rotate races. Does not require the writer's
    /// `flush_lock`.
    ///
    /// The returned view has `pinned_storage` set to `None`.
    ///
    /// # Examples
    /// ```ignore
    /// let snap = writer.l0_manager().pin_snapshot();
    /// // build a QueryContext from `snap.main` + `snap.extra`
    /// ```
    pub fn pin_snapshot(&self) -> SnapshotView {
        // Hold `current` read across every clone: a concurrent rotation needs
        // `current.write()` and resets the pin token under it, so it cannot
        // interleave and split the buffer/token across generations.
        let current_guard = self.current.read();
        let main = current_guard.clone();
        let pin = self.current_pin.read().clone();
        let started_at_version = main.read().current_version;
        let extra = self.pending_flush.read().clone();
        drop(current_guard);

        SnapshotView {
            main,
            extra,
            pin,
            started_at_version,
            pinned_storage: None,
        }
    }

    /// Returns `true` if any live [`SnapshotView`] pins the current generation.
    ///
    /// `strong_count > 1` means a snapshot besides the manager holds the token.
    /// Call under the writer's `flush_lock` at commit so the decision and any
    /// resulting freeze are atomic with respect to the merge.
    pub fn is_current_pinned(&self) -> bool {
        Arc::strong_count(&self.current_pin.read()) > 1
    }

    /// Clones the current (pinned) generation aside so a commit can mutate a
    /// fresh buffer without the pinning snapshots observing the write — lazy
    /// copy-on-write, performed only when [`Self::is_current_pinned`] holds.
    ///
    /// The outgoing buffer — which the pinning [`SnapshotView`]s hold via `main`
    /// — becomes immutable: a deep copy carrying the same data is installed as
    /// the new current, the commit merges into that copy, and the original is
    /// never mutated again. `L0Buffer::clone` drops the WAL handle, so the
    /// original's WAL (already flushed at this commit's WAL step) is handed to
    /// the copy; the frozen original keeps none, as it takes no more writes. The
    /// original is **not** placed on the pending-flush list — it is reclaimed by
    /// `Arc` refcount once the last snapshot drops, so nothing leaks. The new
    /// generation starts unpinned (the pin token is reset). Must be called under
    /// the writer's `flush_lock`, since it swaps the current buffer.
    pub fn freeze_current_for_snapshot(&self) {
        let mut guard = self.current.write();
        let frozen = guard.clone();

        let writable = {
            // One write lock covers both the copy and the WAL hand-off, so the
            // original cannot change between the two steps.
            let mut original = frozen.write();
            let mut copy = original.clone();
            // Hand the WAL from the now-frozen original to the writable copy.
            copy.wal = original.wal.take();
            copy
        };
        *guard = Arc::new(RwLock::new(writable));

        // The fresh generation starts unpinned; reset under the `current` write
        // guard (consistent with `rotate`, which a non-clone path would use).
        *self.current_pin.write() = Arc::new(PinToken(()));
    }

    /// Get the minimum WAL LSN across all pending flush L0s.
    /// WAL truncation should not go past this LSN to preserve data for pending flushes.
    /// Returns None if no pending flushes exist.
    pub fn min_pending_wal_lsn(&self) -> Option<u64> {
        // `min` on an empty iterator is already `None`, so no separate
        // emptiness check is needed.
        self.pending_flush
            .read()
            .iter()
            .map(|l0| l0.read().wal_lsn_at_flush)
            .min()
    }
}

#[cfg(test)]
mod snapshot_tests {
    use super::*;
    use crate::runtime::QueryContext;
    use crate::runtime::l0_visibility::lookup_vertex_prop;
    use uni_common::core::id::Vid;
    use uni_common::{Properties, Value};

    /// Builds a property map holding a single `name` entry.
    fn named(name: &str) -> Properties {
        let mut single = Properties::new();
        single.insert(String::from("name"), Value::String(String::from(name)));
        single
    }

    /// Looks up the `name` property of `vid` through `ctx`; yields `None` when
    /// the property is absent or is not a string.
    fn name_of(vid: Vid, ctx: &QueryContext) -> Option<String> {
        if let Some(Value::String(found)) = lookup_vertex_prop(vid, "name", Some(ctx)) {
            Some(found)
        } else {
            None
        }
    }

    /// Strategy D: a freeze-rotate snapshot keeps its captured values, a fresh
    /// latest view observes later writes, and data in the frozen generation
    /// stays reachable from the latest view.
    #[test]
    fn snapshot_isolated_from_later_writes() {
        let manager = L0Manager::new(0, None);
        let alice = Vid::from(1_u64);
        let bob = Vid::from(2_u64);
        let labels = ["Node".to_string()];

        // Seed both vertices before the snapshot is taken.
        {
            let seed_buffer = manager.get_current();
            let mut seed = seed_buffer.write();
            seed.insert_vertex_with_labels(alice, named("alice"), &labels);
            seed.insert_vertex_with_labels(bob, named("bob"), &labels);
        }

        // Take the freeze-rotate snapshot and build a context over it.
        let (frozen, pending) = manager.snapshot_isolated(1, None);
        let snapshot_ctx = QueryContext::new_with_pending(frozen, None, pending);

        // Overwrite alice in the fresh current buffer, after capture.
        let fresh = manager.get_current();
        fresh
            .write()
            .insert_vertex_with_labels(alice, named("alice2"), &labels);

        // The snapshot context still reports the value from before the write.
        let seen_by_snapshot = name_of(alice, &snapshot_ctx);
        assert_eq!(seen_by_snapshot.as_deref(), Some("alice"));

        // A context over the latest state reports the overwrite...
        let latest_ctx = QueryContext::new_with_pending(
            manager.get_current(),
            None,
            manager.get_pending_flush(),
        );
        let alice_latest = name_of(alice, &latest_ctx);
        assert_eq!(alice_latest.as_deref(), Some("alice2"));

        // ...while bob, untouched since the freeze, is served by the frozen
        // generation on the pending list.
        let bob_latest = name_of(bob, &latest_ctx);
        assert_eq!(bob_latest.as_deref(), Some("bob"));
    }

    /// Taking a snapshot pins the current generation, and dropping that
    /// snapshot unpins it again.
    #[test]
    fn pin_marks_current_generation() {
        let manager = L0Manager::new(0, None);
        assert!(!manager.is_current_pinned());

        let view = manager.pin_snapshot();
        assert!(manager.is_current_pinned());

        drop(view);
        assert!(
            !manager.is_current_pinned(),
            "dropping the snapshot releases the pin"
        );
    }

    /// Clone-on-freeze: once a pinned generation has been frozen aside, the
    /// snapshot keeps observing its captured state, the replacement generation
    /// accepts writes, and that replacement starts out unpinned.
    #[test]
    fn clone_freeze_isolates_pinned_snapshot() {
        let manager = L0Manager::new(0, None);
        let alice = Vid::from(1_u64);
        let labels = ["Node".to_string()];

        let initial = manager.get_current();
        initial
            .write()
            .insert_vertex_with_labels(alice, named("alice"), &labels);

        let view = manager.pin_snapshot();
        assert!(manager.is_current_pinned());

        // Stand-in for a commit: freeze the pinned generation aside, then write
        // to the replacement buffer, which is where a real merge would land.
        manager.freeze_current_for_snapshot();
        assert!(
            !manager.is_current_pinned(),
            "the fresh generation starts unpinned"
        );
        let replacement = manager.get_current();
        replacement
            .write()
            .insert_vertex_with_labels(alice, named("alice2"), &labels);

        // Through the snapshot, alice still has her pre-freeze name.
        let view_ctx = QueryContext::new_with_pending(view.main.clone(), None, view.extra.clone());
        let seen_by_view = name_of(alice, &view_ctx);
        assert_eq!(seen_by_view.as_deref(), Some("alice"));

        // Through the latest state, alice has the post-freeze name.
        let latest_ctx = QueryContext::new_with_pending(
            manager.get_current(),
            None,
            manager.get_pending_flush(),
        );
        let seen_by_latest = name_of(alice, &latest_ctx);
        assert_eq!(seen_by_latest.as_deref(), Some("alice2"));

        // Releasing the snapshot lets go of the frozen generation.
        drop(view);
    }
}