Skip to main content

uni_store/runtime/
occ.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Optimistic concurrency control: commit-time conflict detection (SSI/OCC).
5//!
6//! Commits are already serialized at the Writer's `flush_lock`, which gives the
7//! validate phase a natural critical section. Each transaction captures the
8//! Writer's commit-sequence at begin (`L0Buffer::occ_read_seq`); at commit it
9//! checks its write-set (and, under SSI, read-set) against the write-sets of
10//! every transaction that committed since. A conflict aborts the committer with
11//! a retryable error.
12
13use std::collections::{HashSet, VecDeque};
14
15use crate::runtime::l0::{L0Buffer, OccReadSet, try_as_crdt};
16use crate::runtime::sync::{AtomicU64, Ordering};
17use uni_common::core::id::{Eid, Vid};
18
19/// The set of items a transaction wrote, used for conflict detection.
20#[derive(Debug, Default, Clone)]
21pub struct WriteSet {
22    /// Vertices created, updated, or deleted by the transaction.
23    pub vertices: HashSet<Vid>,
24    /// Edges created, updated, or deleted by the transaction.
25    pub edges: HashSet<Eid>,
26}
27
28impl WriteSet {
29    /// Builds a write-set from a transaction's private L0 buffer.
30    ///
31    /// Item-level granularity: a touched vertex/edge id is a conflict candidate
32    /// regardless of which columns were written (the conservative lost-update
33    /// rule). The one exception is the CRDT carve-out: a vertex whose write
34    /// touched *only* CRDT-mergeable properties — with no delete and no label
35    /// change — is excluded, because `L0Buffer::merge_crdt_properties` will
36    /// commute those writes at commit. This lets concurrent CRDT-counter
37    /// increments to the same vertex both commit (and merge) instead of aborting.
38    ///
39    /// Mixed CRDT+non-CRDT writes, label changes, and deletes stay conflictable
40    /// (their last-writer-wins / structural part can still be lost). Edges are
41    /// always conflictable: every live edge write asserts endpoints/type, which
42    /// is non-commutative topology that no CRDT carve-out can cover.
43    pub fn from_l0(l0: &L0Buffer) -> Self {
44        let mut vertices: HashSet<Vid> = HashSet::new();
45        for (vid, props) in &l0.vertex_properties {
46            if !is_crdt_carveout(l0, vid, props) {
47                vertices.insert(*vid);
48            }
49        }
50        // A delete is never commutative with a concurrent CRDT increment.
51        vertices.extend(l0.vertex_tombstones.iter().copied());
52        // A label-only mutation (`SET n:Label` / `REMOVE n:Label`) is a
53        // structural write — not CRDT-commutative — so the vertex is
54        // conflictable. `vertex_label_overwrites` flags exactly the vids whose
55        // labels were explicitly replaced, so this is precise: a pure-CRDT
56        // increment (no label op) is never flagged and stays carved out.
57        vertices.extend(l0.vertex_label_overwrites.iter().copied());
58
59        let mut edges: HashSet<Eid> = l0.edge_properties.keys().copied().collect();
60        edges.extend(l0.edge_endpoints.keys().copied());
61        edges.extend(l0.tombstones.keys().copied());
62        Self { vertices, edges }
63    }
64
65    /// Returns `true` when the write-set touches nothing (a read-only commit).
66    pub fn is_empty(&self) -> bool {
67        self.vertices.is_empty() && self.edges.is_empty()
68    }
69
70    /// Returns `true` when this and `other` write any common vertex or edge.
71    pub fn intersects(&self, other: &WriteSet) -> bool {
72        // Iterate the smaller side for cheaper membership checks.
73        let (small, large) = if self.vertices.len() <= other.vertices.len() {
74            (&self.vertices, &other.vertices)
75        } else {
76            (&other.vertices, &self.vertices)
77        };
78        if small.iter().any(|v| large.contains(v)) {
79            return true;
80        }
81        let (small, large) = if self.edges.len() <= other.edges.len() {
82            (&self.edges, &other.edges)
83        } else {
84            (&other.edges, &self.edges)
85        };
86        small.iter().any(|e| large.contains(e))
87    }
88}
89
90/// Returns `true` when a vertex write is a pure CRDT-mergeable carve-out.
91///
92/// A write qualifies when every property is a CRDT value and the write made no
93/// label change. Such a write commutes at commit via
94/// [`L0Buffer::merge_crdt_properties`], so it is excluded from the write-set to
95/// let concurrent CRDT increments to the same vertex both commit. A
96/// re-asserted/changed non-empty label set is not CRDT-mergeable, and a pure
97/// increment is written with no labels (`&[]`), so it stays eligible. Tombstones
98/// are handled by the caller (a delete never commutes with an increment).
99///
100/// Shared by [`WriteSet::from_l0`] and [`crdt_carveout_overwrite`] so the
101/// carve-out decision and its commit-time soundness check stay identical.
102fn is_crdt_carveout(l0: &L0Buffer, vid: &Vid, props: &uni_common::Properties) -> bool {
103    let label_changed = l0
104        .vertex_labels
105        .get(vid)
106        .is_some_and(|labels| !labels.is_empty());
107    let all_crdt = !props.is_empty() && props.values().all(|v| try_as_crdt(v).is_some());
108    all_crdt && !label_changed
109}
110
111/// A carved-out CRDT write whose committed value is a different CRDT variant.
112///
113/// The write-set carve-out ([`WriteSet::from_l0`]) drops a pure-CRDT vertex
114/// write from conflict detection assuming its merge commutes. That holds only
115/// when the committed value is the *same* CRDT variant. For a different variant,
116/// `merge_crdt_properties` falls through to a last-writer-wins overwrite — a
117/// silent lost update the carve-out would otherwise hide.
118#[derive(Debug)]
119pub struct CrdtVariantConflict {
120    /// The vertex whose carved-out CRDT write would be overwritten.
121    pub vid: Vid,
122    /// The property whose committed CRDT variant differs from the write.
123    pub property: String,
124}
125
126impl std::fmt::Display for CrdtVariantConflict {
127    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
128        write!(
129            f,
130            "carved-out CRDT write to property {:?} would overwrite a different \
131             committed CRDT variant (a lost update); aborting",
132            self.property
133        )
134    }
135}
136
137/// Detects carved-out CRDT writes that would silently overwrite a committed value.
138///
139/// The write-set carve-out removes pure-CRDT writes from conflict detection, so
140/// this commit-time check (against the merged main L0, under `flush_lock`) is
141/// what keeps the carve-out sound when a property's committed value is a
142/// *different* CRDT variant than the write — the one case `merge_crdt_properties`
143/// would overwrite rather than merge. Returns the first such mismatch, or `None`
144/// when every carved-out write merges cleanly. Declared CRDT properties are
145/// additionally guarded at write time; this also covers undeclared CRDT-shaped
146/// values that bypass that path.
147pub fn crdt_carveout_overwrite(tx_l0: &L0Buffer, main: &L0Buffer) -> Option<CrdtVariantConflict> {
148    for (vid, props) in &tx_l0.vertex_properties {
149        if tx_l0.vertex_tombstones.contains(vid) || !is_crdt_carveout(tx_l0, vid, props) {
150            continue;
151        }
152        let Some(existing_props) = main.vertex_properties.get(vid) else {
153            continue;
154        };
155        for (key, value) in props {
156            let (Some(new_crdt), Some(existing_crdt)) = (
157                try_as_crdt(value),
158                existing_props.get(key).and_then(try_as_crdt),
159            ) else {
160                continue;
161            };
162            if new_crdt.type_name() != existing_crdt.type_name() {
163                return Some(CrdtVariantConflict {
164                    vid: *vid,
165                    property: key.clone(),
166                });
167            }
168        }
169    }
170    None
171}
172
173/// Returns `true` when a committed write touched something the read-set saw.
174fn read_set_intersects(read_set: &OccReadSet, w: &WriteSet) -> bool {
175    read_set.vertices.iter().any(|v| w.vertices.contains(v))
176        || read_set.edges.iter().any(|e| w.edges.contains(e))
177}
178
/// Why a commit-time conflict check refused a commit.
#[derive(Debug)]
pub enum Conflict {
    /// A commit newer than the transaction's snapshot wrote an item this
    /// transaction also wrote (lost update).
    WriteWrite { seq: u64 },
    /// A commit newer than the transaction's snapshot wrote an item this
    /// transaction read (SSI antidependency).
    ReadWrite { seq: u64 },
    /// Commits after `read_seq` were pruned from the registry (the oldest
    /// retained is `oldest`), so a conflict cannot be ruled out and the
    /// transaction is aborted conservatively.
    HistoryTruncated { read_seq: u64, oldest: u64 },
}
190
191impl std::fmt::Display for Conflict {
192    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
193        match self {
194            Conflict::WriteWrite { seq } => {
195                write!(f, "write-write conflict with commit sequence {seq}")
196            }
197            Conflict::ReadWrite { seq } => {
198                write!(f, "read-write antidependency with commit sequence {seq}")
199            }
200            Conflict::HistoryTruncated { read_seq, oldest } => write!(
201                f,
202                "commit history truncated below read sequence {read_seq} \
203                 (oldest retained {oldest}); aborting conservatively"
204            ),
205        }
206    }
207}
208
209/// Bounded log of recently-committed write-sets, keyed by commit sequence.
210///
211/// Mutated only under the Writer's `flush_lock`, so it needs no internal
212/// synchronization beyond the `Mutex` the Writer wraps it in.
213#[derive(Debug)]
214pub struct CommitRegistry {
215    entries: VecDeque<(u64, WriteSet)>,
216    capacity: usize,
217}
218
219impl CommitRegistry {
220    /// Creates a registry retaining at most `capacity` recent commits.
221    ///
222    /// # Panics
223    /// Panics if `capacity` is zero (a programming error — the registry must
224    /// retain at least one commit to detect any conflict).
225    pub fn new(capacity: usize) -> Self {
226        assert!(capacity > 0, "CommitRegistry capacity must be non-zero");
227        Self {
228            entries: VecDeque::new(),
229            capacity,
230        }
231    }
232
233    /// Records a committed write-set under `seq`, pruning to capacity.
234    pub fn record(&mut self, seq: u64, write_set: WriteSet) {
235        self.entries.push_back((seq, write_set));
236        while self.entries.len() > self.capacity {
237            self.entries.pop_front();
238        }
239    }
240
241    /// Allocates the next commit sequence and records `write_set` under it,
242    /// returning the assigned sequence.
243    ///
244    /// This is the single shared mutation a committer performs on the OCC state.
245    /// The production commit path and the loom/shuttle models call exactly this
246    /// method, so the checked bump-then-record step cannot drift from production.
247    /// Must be called under the Writer's `flush_lock` (the registry `Mutex`
248    /// serializes the `record`, and the bump is the sole writer of `seq`).
249    pub(crate) fn commit(&mut self, seq: &AtomicU64, write_set: WriteSet) -> u64 {
250        let next = seq.fetch_add(1, Ordering::Relaxed) + 1;
251        self.record(next, write_set);
252        next
253    }
254
255    /// Checks a committing transaction against all commits newer than its read
256    /// sequence. Returns the first [`Conflict`] found, or `None` if it may commit.
257    ///
258    /// `read_set` is `Some` only for SSI read-write transactions; passing `None`
259    /// performs write-set-only (lost-update) detection.
260    pub fn check(
261        &self,
262        read_seq: u64,
263        write_set: &WriteSet,
264        read_set: Option<&OccReadSet>,
265    ) -> Option<Conflict> {
266        // If the oldest retained commit is newer than read_seq+1, commits in the
267        // gap were pruned and cannot be checked — abort conservatively (sound:
268        // never misses a real conflict, at the cost of rare false aborts).
269        if let Some(&(oldest, _)) = self.entries.front()
270            && oldest > read_seq.saturating_add(1)
271        {
272            return Some(Conflict::HistoryTruncated { read_seq, oldest });
273        }
274        for (seq, committed) in &self.entries {
275            if *seq <= read_seq {
276                continue;
277            }
278            if write_set.intersects(committed) {
279                return Some(Conflict::WriteWrite { seq: *seq });
280            }
281            if let Some(rs) = read_set
282                && read_set_intersects(rs, committed)
283            {
284                return Some(Conflict::ReadWrite { seq: *seq });
285            }
286        }
287        None
288    }
289}
290
#[cfg(test)]
mod tests {
    use super::*;

    fn ws(vids: &[u64]) -> WriteSet {
        WriteSet {
            vertices: vids.iter().map(|&v| Vid::from(v)).collect(),
            edges: HashSet::new(),
        }
    }

    /// An edge-only write-set (mirror of [`ws`] for the edge conflict path).
    fn es(eids: &[u64]) -> WriteSet {
        WriteSet {
            vertices: HashSet::new(),
            edges: eids.iter().map(|&e| Eid::from(e)).collect(),
        }
    }

    #[test]
    fn disjoint_writes_do_not_conflict() {
        let mut reg = CommitRegistry::new(16);
        reg.record(1, ws(&[1, 2]));
        assert!(reg.check(0, &ws(&[3, 4]), None).is_none());
    }

    #[test]
    fn overlapping_write_after_read_seq_conflicts() {
        let mut reg = CommitRegistry::new(16);
        reg.record(1, ws(&[1, 2]));
        // A tx that began at read_seq 0 and writes vertex 2 must abort.
        assert!(matches!(
            reg.check(0, &ws(&[2]), None),
            Some(Conflict::WriteWrite { seq: 1 })
        ));
    }

    #[test]
    fn commit_at_or_before_read_seq_is_ignored() {
        let mut reg = CommitRegistry::new(16);
        reg.record(1, ws(&[1]));
        // A tx that began AFTER commit 1 (read_seq 1) does not conflict with it.
        assert!(reg.check(1, &ws(&[1]), None).is_none());
    }

    #[test]
    fn read_write_antidependency_detected() {
        let mut reg = CommitRegistry::new(16);
        reg.record(1, ws(&[5]));
        let mut rs = OccReadSet::default();
        rs.vertices.insert(Vid::from(5));
        assert!(matches!(
            reg.check(0, &ws(&[99]), Some(&rs)),
            Some(Conflict::ReadWrite { seq: 1 })
        ));
    }

    #[test]
    fn read_write_antidependency_detected_on_edges() {
        // The SSI read-set check must consult edges too, not only vertices.
        let mut reg = CommitRegistry::new(16);
        reg.record(1, es(&[10]));
        let mut rs = OccReadSet::default();
        rs.edges.insert(Eid::from(10));
        assert!(matches!(
            reg.check(0, &es(&[99]), Some(&rs)),
            Some(Conflict::ReadWrite { seq: 1 })
        ));
    }

    #[test]
    fn truncated_history_aborts_conservatively() {
        let mut reg = CommitRegistry::new(2);
        reg.record(1, ws(&[1]));
        reg.record(2, ws(&[2]));
        reg.record(3, ws(&[3])); // evicts seq 1
        // A tx with read_seq 0 cannot verify against the evicted seq 1.
        assert!(matches!(
            reg.check(0, &ws(&[42]), None),
            Some(Conflict::HistoryTruncated {
                read_seq: 0,
                oldest: 2
            })
        ));
    }

    #[test]
    #[should_panic(expected = "CommitRegistry capacity must be non-zero")]
    fn zero_capacity_registry_panics() {
        let _ = CommitRegistry::new(0);
    }

    #[test]
    fn commit_bumps_sequence_and_records() {
        let seq = AtomicU64::new(0);
        let mut reg = CommitRegistry::new(16);
        // Sequences are allocated 1, 2, … (the bump returns the new value).
        assert_eq!(reg.commit(&seq, ws(&[1])), 1);
        assert_eq!(reg.commit(&seq, ws(&[2])), 2);
        assert_eq!(seq.load(Ordering::Relaxed), 2);
        // The recorded write to vertex 1 is now visible to a stale committer.
        assert!(matches!(
            reg.check(0, &ws(&[1]), None),
            Some(Conflict::WriteWrite { seq: 1 })
        ));
    }

    #[test]
    fn intersects_detects_overlapping_edges() {
        // `intersects` must check the edge sets, not just the vertices.
        assert!(es(&[1, 2]).intersects(&es(&[2, 3])));
        assert!(!es(&[1, 2]).intersects(&es(&[3, 4])));
        // Vertices and edges live in separate namespaces: vertex id 1 and edge
        // id 1 must NOT be mistaken for an overlap.
        assert!(!ws(&[1]).intersects(&es(&[1])));
    }

    #[test]
    fn empty_write_set_reports_empty() {
        assert!(WriteSet::default().is_empty());
        assert!(!ws(&[1]).is_empty());
        assert!(!es(&[1]).is_empty());
    }

    #[test]
    fn overlapping_edge_write_after_read_seq_conflicts() {
        let mut reg = CommitRegistry::new(16);
        reg.record(1, es(&[10, 11]));
        // A tx that began at read_seq 0 and writes edge 11 must abort.
        assert!(matches!(
            reg.check(0, &es(&[11]), None),
            Some(Conflict::WriteWrite { seq: 1 })
        ));
        // A disjoint edge write does not conflict.
        assert!(reg.check(0, &es(&[12]), None).is_none());
    }

    #[test]
    fn conflict_display_messages() {
        assert_eq!(
            Conflict::WriteWrite { seq: 3 }.to_string(),
            "write-write conflict with commit sequence 3"
        );
        assert_eq!(
            Conflict::ReadWrite { seq: 4 }.to_string(),
            "read-write antidependency with commit sequence 4"
        );
        assert_eq!(
            Conflict::HistoryTruncated {
                read_seq: 0,
                oldest: 2
            }
            .to_string(),
            "commit history truncated below read sequence 0 (oldest retained 2); \
             aborting conservatively"
        );
    }

    // ── CRDT carve-out (`from_l0`) ───────────────────────────────────────────

    fn vid(n: u64) -> Vid {
        Vid::from(n)
    }

    /// A property map with a single GCounter CRDT value under `counter`.
    fn crdt_props(actor: &str, n: u64) -> uni_common::Properties {
        let mut gc = uni_crdt::GCounter::new();
        gc.increment(actor, n);
        let v: uni_common::Value = serde_json::to_value(uni_crdt::Crdt::GCounter(gc))
            .unwrap()
            .into();
        uni_common::Properties::from([("counter".to_string(), v)])
    }

    fn int_props(n: i64) -> uni_common::Properties {
        uni_common::Properties::from([("n".to_string(), uni_common::Value::Int(n))])
    }

    /// A property map with a single GSet CRDT value under `counter` — a
    /// *different* CRDT variant than [`crdt_props`]'s GCounter.
    fn gset_props(item: &str) -> uni_common::Properties {
        let mut gs = uni_crdt::GSet::new();
        gs.add(item.to_string());
        let v: uni_common::Value = serde_json::to_value(uni_crdt::Crdt::GSet(gs))
            .unwrap()
            .into();
        uni_common::Properties::from([("counter".to_string(), v)])
    }

    #[test]
    fn crdt_only_write_without_labels_is_carved_out() {
        let mut buf = L0Buffer::new(0, None);
        buf.insert_vertex_with_labels(vid(1), crdt_props("a", 5), &[]);
        // A pure CRDT increment with no label change merges at commit, so it must
        // not be a conflict candidate — this is what lets concurrent increments
        // both commit.
        assert!(!WriteSet::from_l0(&buf).vertices.contains(&vid(1)));
    }

    #[test]
    fn non_crdt_write_without_labels_is_conflictable() {
        let mut buf = L0Buffer::new(0, None);
        buf.insert_vertex_with_labels(vid(1), int_props(1), &[]);
        assert!(WriteSet::from_l0(&buf).vertices.contains(&vid(1)));
    }

    #[test]
    fn crdt_write_with_labels_stays_conflictable() {
        let mut buf = L0Buffer::new(0, None);
        // A label change is not CRDT-mergeable, so even an otherwise pure CRDT
        // write stays a conflict candidate.
        buf.insert_vertex_with_labels(vid(1), crdt_props("a", 5), &["Counter".to_string()]);
        assert!(WriteSet::from_l0(&buf).vertices.contains(&vid(1)));
    }

    #[test]
    fn mixed_crdt_and_lww_write_is_conflictable() {
        let mut buf = L0Buffer::new(0, None);
        let mut props = crdt_props("a", 5);
        props.insert("n".to_string(), uni_common::Value::Int(1));
        buf.insert_vertex_with_labels(vid(1), props, &[]);
        // The LWW `n` can be lost, so the vertex must stay conflictable.
        assert!(WriteSet::from_l0(&buf).vertices.contains(&vid(1)));
    }

    #[test]
    fn plain_map_value_is_not_mistaken_for_crdt() {
        let mut buf = L0Buffer::new(0, None);
        let map = uni_common::Value::Map(std::collections::HashMap::from([(
            "x".to_string(),
            uni_common::Value::Int(1),
        )]));
        buf.insert_vertex_with_labels(
            vid(1),
            uni_common::Properties::from([("data".to_string(), map)]),
            &[],
        );
        // A non-CRDT map is overwritten (LWW) by `merge_crdt_properties`, so it
        // must remain conflictable.
        assert!(WriteSet::from_l0(&buf).vertices.contains(&vid(1)));
    }

    #[test]
    fn tombstoned_vertex_is_conflictable() {
        let mut buf = L0Buffer::new(0, None);
        buf.insert_vertex_with_labels(vid(1), crdt_props("a", 5), &[]);
        buf.delete_vertex(vid(1)).unwrap();
        // Deletion is not commutative with a concurrent increment.
        assert!(WriteSet::from_l0(&buf).vertices.contains(&vid(1)));
    }

    // ── CRDT carve-out soundness (`crdt_carveout_overwrite`) ─────────────────

    #[test]
    fn crdt_carveout_overwrite_detects_variant_mismatch() {
        // main holds a GCounter; a carved-out write puts a GSet under the same
        // property. `merge_crdt_properties` would silently overwrite the GCounter
        // (a lost update the carve-out hid), so this must be flagged.
        let mut main = L0Buffer::new(0, None);
        main.insert_vertex_with_labels(vid(1), crdt_props("a", 5), &[]);
        let mut tx = L0Buffer::new(0, None);
        tx.insert_vertex_with_labels(vid(1), gset_props("x"), &[]);
        let conflict = crdt_carveout_overwrite(&tx, &main).expect("variant mismatch");
        assert_eq!(conflict.vid, vid(1));
        assert_eq!(conflict.property, "counter");
    }

    #[test]
    fn crdt_variant_conflict_display_names_property() {
        let conflict = CrdtVariantConflict {
            vid: vid(1),
            property: "counter".to_string(),
        };
        assert_eq!(
            conflict.to_string(),
            "carved-out CRDT write to property \"counter\" would overwrite a different \
             committed CRDT variant (a lost update); aborting"
        );
    }

    #[test]
    fn crdt_carveout_overwrite_allows_same_variant() {
        // Same CRDT variant merges commutatively — the carve-out is sound, no abort.
        let mut main = L0Buffer::new(0, None);
        main.insert_vertex_with_labels(vid(1), crdt_props("a", 5), &[]);
        let mut tx = L0Buffer::new(0, None);
        tx.insert_vertex_with_labels(vid(1), crdt_props("b", 7), &[]);
        assert!(crdt_carveout_overwrite(&tx, &main).is_none());
    }

    #[test]
    fn crdt_carveout_overwrite_allows_new_vertex() {
        // No committed value to overwrite — the merge just inserts.
        let main = L0Buffer::new(0, None);
        let mut tx = L0Buffer::new(0, None);
        tx.insert_vertex_with_labels(vid(1), gset_props("x"), &[]);
        assert!(crdt_carveout_overwrite(&tx, &main).is_none());
    }

    #[test]
    fn crdt_carveout_overwrite_ignores_conflictable_writes() {
        // A labelled (non-carved-out) write is already in the write-set and
        // handled by ordinary conflict detection, so it is not re-flagged here.
        let mut main = L0Buffer::new(0, None);
        main.insert_vertex_with_labels(vid(1), crdt_props("a", 5), &[]);
        let mut tx = L0Buffer::new(0, None);
        tx.insert_vertex_with_labels(vid(1), gset_props("x"), &["Counter".to_string()]);
        assert!(crdt_carveout_overwrite(&tx, &main).is_none());
    }

    // ── Registry pruning under a long-lived reader ───────────────────────────

    #[test]
    fn long_lived_reader_within_retained_history_does_not_abort() {
        // Capacity comfortably holds every commit since the reader's snapshot, so
        // a long-lived reader (low read_seq) is not falsely aborted by truncation.
        let mut reg = CommitRegistry::new(16);
        for seq in 1..=5 {
            reg.record(seq, ws(&[seq + 100])); // disjoint vids → no real conflict
        }
        assert!(reg.check(0, &ws(&[1]), None).is_none());
    }

    #[test]
    fn truncated_history_aborts_read_set_txn_conservatively() {
        // A read-write (SSI) transaction whose snapshot predates evicted commits
        // also aborts conservatively, not just write-set-only transactions.
        let mut reg = CommitRegistry::new(2);
        reg.record(1, ws(&[1]));
        reg.record(2, ws(&[2]));
        reg.record(3, ws(&[3])); // evicts seq 1
        let mut rs = OccReadSet::default();
        rs.vertices.insert(Vid::from(7));
        assert!(matches!(
            reg.check(0, &ws(&[42]), Some(&rs)),
            Some(Conflict::HistoryTruncated { .. })
        ));
    }
}