Skip to main content

uni_store/runtime/
occ.rs

// SPDX-License-Identifier: Apache-2.0
// Copyright 2024-2026 Dragonscale Team

//! Optimistic concurrency control: commit-time conflict detection (SSI/OCC).
//!
//! Commits are already serialized at the Writer's `flush_lock`, which gives the
//! validate phase a natural critical section. Each transaction captures the
//! Writer's commit sequence at begin (`L0Buffer::occ_read_seq`); at commit it
//! checks its write-set (and, under SSI, its read-set) against the write-sets of
//! every transaction that has committed since then. A conflict aborts the
//! committer with a retryable error.
12
13use std::collections::{HashSet, VecDeque};
14
15use crate::runtime::l0::{L0Buffer, OccReadSet, try_as_crdt};
16use uni_common::core::id::{Eid, Vid};
17
18/// The set of items a transaction wrote, used for conflict detection.
19#[derive(Debug, Default, Clone)]
20pub struct WriteSet {
21    /// Vertices created, updated, or deleted by the transaction.
22    pub vertices: HashSet<Vid>,
23    /// Edges created, updated, or deleted by the transaction.
24    pub edges: HashSet<Eid>,
25}
26
27impl WriteSet {
28    /// Builds a write-set from a transaction's private L0 buffer.
29    ///
30    /// Item-level granularity: a touched vertex/edge id is a conflict candidate
31    /// regardless of which columns were written (the conservative lost-update
32    /// rule). The one exception is the CRDT carve-out: a vertex whose write
33    /// touched *only* CRDT-mergeable properties — with no delete and no label
34    /// change — is excluded, because `L0Buffer::merge_crdt_properties` will
35    /// commute those writes at commit. This lets concurrent CRDT-counter
36    /// increments to the same vertex both commit (and merge) instead of aborting.
37    ///
38    /// Mixed CRDT+non-CRDT writes, label changes, and deletes stay conflictable
39    /// (their last-writer-wins / structural part can still be lost). Edges are
40    /// always conflictable: every live edge write asserts endpoints/type, which
41    /// is non-commutative topology that no CRDT carve-out can cover.
42    pub fn from_l0(l0: &L0Buffer) -> Self {
43        let mut vertices: HashSet<Vid> = HashSet::new();
44        for (vid, props) in &l0.vertex_properties {
45            if !is_crdt_carveout(l0, vid, props) {
46                vertices.insert(*vid);
47            }
48        }
49        // A delete is never commutative with a concurrent CRDT increment.
50        vertices.extend(l0.vertex_tombstones.iter().copied());
51        // A label-only mutation (`SET n:Label` / `REMOVE n:Label`) is a
52        // structural write — not CRDT-commutative — so the vertex is
53        // conflictable. `vertex_label_overwrites` flags exactly the vids whose
54        // labels were explicitly replaced, so this is precise: a pure-CRDT
55        // increment (no label op) is never flagged and stays carved out.
56        vertices.extend(l0.vertex_label_overwrites.iter().copied());
57
58        let mut edges: HashSet<Eid> = l0.edge_properties.keys().copied().collect();
59        edges.extend(l0.edge_endpoints.keys().copied());
60        edges.extend(l0.tombstones.keys().copied());
61        Self { vertices, edges }
62    }
63
64    /// Returns `true` when the write-set touches nothing (a read-only commit).
65    pub fn is_empty(&self) -> bool {
66        self.vertices.is_empty() && self.edges.is_empty()
67    }
68
69    /// Returns `true` when this and `other` write any common vertex or edge.
70    pub fn intersects(&self, other: &WriteSet) -> bool {
71        // Iterate the smaller side for cheaper membership checks.
72        let (small, large) = if self.vertices.len() <= other.vertices.len() {
73            (&self.vertices, &other.vertices)
74        } else {
75            (&other.vertices, &self.vertices)
76        };
77        if small.iter().any(|v| large.contains(v)) {
78            return true;
79        }
80        let (small, large) = if self.edges.len() <= other.edges.len() {
81            (&self.edges, &other.edges)
82        } else {
83            (&other.edges, &self.edges)
84        };
85        small.iter().any(|e| large.contains(e))
86    }
87}
88
89/// Returns `true` when a vertex write is a pure CRDT-mergeable carve-out.
90///
91/// A write qualifies when every property is a CRDT value and the write made no
92/// label change. Such a write commutes at commit via
93/// [`L0Buffer::merge_crdt_properties`], so it is excluded from the write-set to
94/// let concurrent CRDT increments to the same vertex both commit. A
95/// re-asserted/changed non-empty label set is not CRDT-mergeable, and a pure
96/// increment is written with no labels (`&[]`), so it stays eligible. Tombstones
97/// are handled by the caller (a delete never commutes with an increment).
98///
99/// Shared by [`WriteSet::from_l0`] and [`crdt_carveout_overwrite`] so the
100/// carve-out decision and its commit-time soundness check stay identical.
101fn is_crdt_carveout(l0: &L0Buffer, vid: &Vid, props: &uni_common::Properties) -> bool {
102    let label_changed = l0
103        .vertex_labels
104        .get(vid)
105        .is_some_and(|labels| !labels.is_empty());
106    let all_crdt = !props.is_empty() && props.values().all(|v| try_as_crdt(v).is_some());
107    all_crdt && !label_changed
108}
109
110/// A carved-out CRDT write whose committed value is a different CRDT variant.
111///
112/// The write-set carve-out ([`WriteSet::from_l0`]) drops a pure-CRDT vertex
113/// write from conflict detection assuming its merge commutes. That holds only
114/// when the committed value is the *same* CRDT variant. For a different variant,
115/// `merge_crdt_properties` falls through to a last-writer-wins overwrite — a
116/// silent lost update the carve-out would otherwise hide.
117#[derive(Debug)]
118pub struct CrdtVariantConflict {
119    /// The vertex whose carved-out CRDT write would be overwritten.
120    pub vid: Vid,
121    /// The property whose committed CRDT variant differs from the write.
122    pub property: String,
123}
124
125impl std::fmt::Display for CrdtVariantConflict {
126    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127        write!(
128            f,
129            "carved-out CRDT write to property {:?} would overwrite a different \
130             committed CRDT variant (a lost update); aborting",
131            self.property
132        )
133    }
134}
135
136/// Detects carved-out CRDT writes that would silently overwrite a committed value.
137///
138/// The write-set carve-out removes pure-CRDT writes from conflict detection, so
139/// this commit-time check (against the merged main L0, under `flush_lock`) is
140/// what keeps the carve-out sound when a property's committed value is a
141/// *different* CRDT variant than the write — the one case `merge_crdt_properties`
142/// would overwrite rather than merge. Returns the first such mismatch, or `None`
143/// when every carved-out write merges cleanly. Declared CRDT properties are
144/// additionally guarded at write time; this also covers undeclared CRDT-shaped
145/// values that bypass that path.
146pub fn crdt_carveout_overwrite(tx_l0: &L0Buffer, main: &L0Buffer) -> Option<CrdtVariantConflict> {
147    for (vid, props) in &tx_l0.vertex_properties {
148        if tx_l0.vertex_tombstones.contains(vid) || !is_crdt_carveout(tx_l0, vid, props) {
149            continue;
150        }
151        let Some(existing_props) = main.vertex_properties.get(vid) else {
152            continue;
153        };
154        for (key, value) in props {
155            let (Some(new_crdt), Some(existing_crdt)) = (
156                try_as_crdt(value),
157                existing_props.get(key).and_then(try_as_crdt),
158            ) else {
159                continue;
160            };
161            if new_crdt.type_name() != existing_crdt.type_name() {
162                return Some(CrdtVariantConflict {
163                    vid: *vid,
164                    property: key.clone(),
165                });
166            }
167        }
168    }
169    None
170}
171
172/// Returns `true` when a committed write touched something the read-set saw.
173fn read_set_intersects(read_set: &OccReadSet, w: &WriteSet) -> bool {
174    read_set.vertices.iter().any(|v| w.vertices.contains(v))
175        || read_set.edges.iter().any(|e| w.edges.contains(e))
176}
177
/// Why a committing transaction must abort, as reported by [`CommitRegistry::check`].
#[derive(Debug)]
pub enum Conflict {
    /// A commit newer than this transaction's read sequence wrote an item this
    /// transaction also wrote (a lost update).
    WriteWrite { seq: u64 },
    /// A commit newer than this transaction's read sequence wrote an item this
    /// transaction read (an SSI read-write antidependency).
    ReadWrite { seq: u64 },
    /// Commits after this transaction's read sequence were pruned from history,
    /// so a conflict cannot be ruled out and the commit aborts conservatively.
    HistoryTruncated { read_seq: u64, oldest: u64 },
}
189
190impl std::fmt::Display for Conflict {
191    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
192        match self {
193            Conflict::WriteWrite { seq } => {
194                write!(f, "write-write conflict with commit sequence {seq}")
195            }
196            Conflict::ReadWrite { seq } => {
197                write!(f, "read-write antidependency with commit sequence {seq}")
198            }
199            Conflict::HistoryTruncated { read_seq, oldest } => write!(
200                f,
201                "commit history truncated below read sequence {read_seq} \
202                 (oldest retained {oldest}); aborting conservatively"
203            ),
204        }
205    }
206}
207
208/// Bounded log of recently-committed write-sets, keyed by commit sequence.
209///
210/// Mutated only under the Writer's `flush_lock`, so it needs no internal
211/// synchronization beyond the `Mutex` the Writer wraps it in.
212#[derive(Debug)]
213pub struct CommitRegistry {
214    entries: VecDeque<(u64, WriteSet)>,
215    capacity: usize,
216}
217
218impl CommitRegistry {
219    /// Creates a registry retaining at most `capacity` recent commits.
220    ///
221    /// # Panics
222    /// Panics if `capacity` is zero (a programming error — the registry must
223    /// retain at least one commit to detect any conflict).
224    pub fn new(capacity: usize) -> Self {
225        assert!(capacity > 0, "CommitRegistry capacity must be non-zero");
226        Self {
227            entries: VecDeque::new(),
228            capacity,
229        }
230    }
231
232    /// Records a committed write-set under `seq`, pruning to capacity.
233    pub fn record(&mut self, seq: u64, write_set: WriteSet) {
234        self.entries.push_back((seq, write_set));
235        while self.entries.len() > self.capacity {
236            self.entries.pop_front();
237        }
238    }
239
240    /// Checks a committing transaction against all commits newer than its read
241    /// sequence. Returns the first [`Conflict`] found, or `None` if it may commit.
242    ///
243    /// `read_set` is `Some` only for SSI read-write transactions; passing `None`
244    /// performs write-set-only (lost-update) detection.
245    pub fn check(
246        &self,
247        read_seq: u64,
248        write_set: &WriteSet,
249        read_set: Option<&OccReadSet>,
250    ) -> Option<Conflict> {
251        // If the oldest retained commit is newer than read_seq+1, commits in the
252        // gap were pruned and cannot be checked — abort conservatively (sound:
253        // never misses a real conflict, at the cost of rare false aborts).
254        if let Some(&(oldest, _)) = self.entries.front()
255            && oldest > read_seq.saturating_add(1)
256        {
257            return Some(Conflict::HistoryTruncated { read_seq, oldest });
258        }
259        for (seq, committed) in &self.entries {
260            if *seq <= read_seq {
261                continue;
262            }
263            if write_set.intersects(committed) {
264                return Some(Conflict::WriteWrite { seq: *seq });
265            }
266            if let Some(rs) = read_set
267                && read_set_intersects(rs, committed)
268            {
269                return Some(Conflict::ReadWrite { seq: *seq });
270            }
271        }
272        None
273    }
274}
275
#[cfg(test)]
mod tests {
    // Unit tests for commit-registry conflict detection, the write-set CRDT
    // carve-out (`WriteSet::from_l0`), and its commit-time soundness check
    // (`crdt_carveout_overwrite`).

    use super::*;

    /// Builds a vertex-only write-set (no edges) from raw vertex ids.
    fn ws(vids: &[u64]) -> WriteSet {
        WriteSet {
            vertices: vids.iter().map(|&v| Vid::from(v)).collect(),
            edges: HashSet::new(),
        }
    }

    #[test]
    fn disjoint_writes_do_not_conflict() {
        let mut reg = CommitRegistry::new(16);
        reg.record(1, ws(&[1, 2]));
        // No shared vertex with commit 1, so the tx may commit.
        assert!(reg.check(0, &ws(&[3, 4]), None).is_none());
    }

    #[test]
    fn overlapping_write_after_read_seq_conflicts() {
        let mut reg = CommitRegistry::new(16);
        reg.record(1, ws(&[1, 2]));
        // A tx that began at read_seq 0 and writes vertex 2 must abort.
        assert!(matches!(
            reg.check(0, &ws(&[2]), None),
            Some(Conflict::WriteWrite { seq: 1 })
        ));
    }

    #[test]
    fn commit_at_or_before_read_seq_is_ignored() {
        let mut reg = CommitRegistry::new(16);
        reg.record(1, ws(&[1]));
        // A tx that began AFTER commit 1 (read_seq 1) does not conflict with it.
        assert!(reg.check(1, &ws(&[1]), None).is_none());
    }

    #[test]
    fn read_write_antidependency_detected() {
        let mut reg = CommitRegistry::new(16);
        reg.record(1, ws(&[5]));
        // The tx only *read* vertex 5 (its write-set is disjoint), so the
        // conflict must come from the SSI read-set path.
        let mut rs = OccReadSet::default();
        rs.vertices.insert(Vid::from(5));
        assert!(matches!(
            reg.check(0, &ws(&[99]), Some(&rs)),
            Some(Conflict::ReadWrite { seq: 1 })
        ));
    }

    #[test]
    fn truncated_history_aborts_conservatively() {
        let mut reg = CommitRegistry::new(2);
        reg.record(1, ws(&[1]));
        reg.record(2, ws(&[2]));
        reg.record(3, ws(&[3])); // evicts seq 1
        // A tx with read_seq 0 cannot verify against the evicted seq 1.
        assert!(matches!(
            reg.check(0, &ws(&[42]), None),
            Some(Conflict::HistoryTruncated {
                read_seq: 0,
                oldest: 2
            })
        ));
    }

    // ── CRDT carve-out (`from_l0`) ───────────────────────────────────────────

    /// Shorthand for building a vertex id.
    fn vid(n: u64) -> Vid {
        Vid::from(n)
    }

    /// A property map with a single GCounter CRDT value under `counter`.
    fn crdt_props(actor: &str, n: u64) -> uni_common::Properties {
        let mut gc = uni_crdt::GCounter::new();
        gc.increment(actor, n);
        let v: uni_common::Value = serde_json::to_value(uni_crdt::Crdt::GCounter(gc))
            .unwrap()
            .into();
        uni_common::Properties::from([("counter".to_string(), v)])
    }

    /// A property map with a single plain (non-CRDT) integer under `n`.
    fn int_props(n: i64) -> uni_common::Properties {
        uni_common::Properties::from([("n".to_string(), uni_common::Value::Int(n))])
    }

    /// A property map with a single GSet CRDT value under `counter` — a
    /// *different* CRDT variant than [`crdt_props`]'s GCounter.
    fn gset_props(item: &str) -> uni_common::Properties {
        let mut gs = uni_crdt::GSet::new();
        gs.add(item.to_string());
        let v: uni_common::Value = serde_json::to_value(uni_crdt::Crdt::GSet(gs))
            .unwrap()
            .into();
        uni_common::Properties::from([("counter".to_string(), v)])
    }

    #[test]
    fn crdt_only_write_without_labels_is_carved_out() {
        let mut buf = L0Buffer::new(0, None);
        buf.insert_vertex_with_labels(vid(1), crdt_props("a", 5), &[]);
        // A pure CRDT increment with no label change merges at commit, so it must
        // not be a conflict candidate — this is what lets concurrent increments
        // both commit.
        assert!(!WriteSet::from_l0(&buf).vertices.contains(&vid(1)));
    }

    #[test]
    fn non_crdt_write_without_labels_is_conflictable() {
        let mut buf = L0Buffer::new(0, None);
        buf.insert_vertex_with_labels(vid(1), int_props(1), &[]);
        // A plain (non-CRDT) property is not carved out, so the vertex is a
        // conflict candidate.
        assert!(WriteSet::from_l0(&buf).vertices.contains(&vid(1)));
    }

    #[test]
    fn crdt_write_with_labels_stays_conflictable() {
        let mut buf = L0Buffer::new(0, None);
        // A label change is not CRDT-mergeable, so even an otherwise pure CRDT
        // write stays a conflict candidate.
        buf.insert_vertex_with_labels(vid(1), crdt_props("a", 5), &["Counter".to_string()]);
        assert!(WriteSet::from_l0(&buf).vertices.contains(&vid(1)));
    }

    #[test]
    fn mixed_crdt_and_lww_write_is_conflictable() {
        let mut buf = L0Buffer::new(0, None);
        let mut props = crdt_props("a", 5);
        props.insert("n".to_string(), uni_common::Value::Int(1));
        buf.insert_vertex_with_labels(vid(1), props, &[]);
        // The LWW `n` can be lost, so the vertex must stay conflictable.
        assert!(WriteSet::from_l0(&buf).vertices.contains(&vid(1)));
    }

    #[test]
    fn plain_map_value_is_not_mistaken_for_crdt() {
        let mut buf = L0Buffer::new(0, None);
        let map = uni_common::Value::Map(std::collections::HashMap::from([(
            "x".to_string(),
            uni_common::Value::Int(1),
        )]));
        buf.insert_vertex_with_labels(
            vid(1),
            uni_common::Properties::from([("data".to_string(), map)]),
            &[],
        );
        // A non-CRDT map is overwritten (LWW) by `merge_crdt_properties`, so it
        // must remain conflictable.
        assert!(WriteSet::from_l0(&buf).vertices.contains(&vid(1)));
    }

    #[test]
    fn tombstoned_vertex_is_conflictable() {
        let mut buf = L0Buffer::new(0, None);
        buf.insert_vertex_with_labels(vid(1), crdt_props("a", 5), &[]);
        buf.delete_vertex(vid(1)).unwrap();
        // Deletion is not commutative with a concurrent increment.
        assert!(WriteSet::from_l0(&buf).vertices.contains(&vid(1)));
    }

    // ── CRDT carve-out soundness (`crdt_carveout_overwrite`) ─────────────────

    #[test]
    fn crdt_carveout_overwrite_detects_variant_mismatch() {
        // main holds a GCounter; a carved-out write puts a GSet under the same
        // property. `merge_crdt_properties` would silently overwrite the GCounter
        // (a lost update the carve-out hid), so this must be flagged.
        let mut main = L0Buffer::new(0, None);
        main.insert_vertex_with_labels(vid(1), crdt_props("a", 5), &[]);
        let mut tx = L0Buffer::new(0, None);
        tx.insert_vertex_with_labels(vid(1), gset_props("x"), &[]);
        let conflict = crdt_carveout_overwrite(&tx, &main).expect("variant mismatch");
        assert_eq!(conflict.vid, vid(1));
        assert_eq!(conflict.property, "counter");
    }

    #[test]
    fn crdt_carveout_overwrite_allows_same_variant() {
        // Same CRDT variant merges commutatively — the carve-out is sound, no abort.
        let mut main = L0Buffer::new(0, None);
        main.insert_vertex_with_labels(vid(1), crdt_props("a", 5), &[]);
        let mut tx = L0Buffer::new(0, None);
        tx.insert_vertex_with_labels(vid(1), crdt_props("b", 7), &[]);
        assert!(crdt_carveout_overwrite(&tx, &main).is_none());
    }

    #[test]
    fn crdt_carveout_overwrite_allows_new_vertex() {
        // No committed value to overwrite — the merge just inserts.
        let main = L0Buffer::new(0, None);
        let mut tx = L0Buffer::new(0, None);
        tx.insert_vertex_with_labels(vid(1), gset_props("x"), &[]);
        assert!(crdt_carveout_overwrite(&tx, &main).is_none());
    }

    #[test]
    fn crdt_carveout_overwrite_ignores_conflictable_writes() {
        // A labelled (non-carved-out) write is already in the write-set and
        // handled by ordinary conflict detection, so it is not re-flagged here.
        let mut main = L0Buffer::new(0, None);
        main.insert_vertex_with_labels(vid(1), crdt_props("a", 5), &[]);
        let mut tx = L0Buffer::new(0, None);
        tx.insert_vertex_with_labels(vid(1), gset_props("x"), &["Counter".to_string()]);
        assert!(crdt_carveout_overwrite(&tx, &main).is_none());
    }

    // ── Registry pruning under a long-lived reader ───────────────────────────

    #[test]
    fn long_lived_reader_within_retained_history_does_not_abort() {
        // Capacity comfortably holds every commit since the reader's snapshot, so
        // a long-lived reader (low read_seq) is not falsely aborted by truncation.
        let mut reg = CommitRegistry::new(16);
        for seq in 1..=5 {
            reg.record(seq, ws(&[seq + 100])); // disjoint vids → no real conflict
        }
        assert!(reg.check(0, &ws(&[1]), None).is_none());
    }

    #[test]
    fn truncated_history_aborts_read_set_txn_conservatively() {
        // A read-write (SSI) transaction whose snapshot predates evicted commits
        // also aborts conservatively, not just write-set-only transactions.
        let mut reg = CommitRegistry::new(2);
        reg.record(1, ws(&[1]));
        reg.record(2, ws(&[2]));
        reg.record(3, ws(&[3])); // evicts seq 1
        let mut rs = OccReadSet::default();
        rs.vertices.insert(Vid::from(7));
        assert!(matches!(
            reg.check(0, &ws(&[42]), Some(&rs)),
            Some(Conflict::HistoryTruncated { .. })
        ));
    }
}