1use std::collections::{HashSet, VecDeque};
14
15use crate::runtime::l0::{L0Buffer, OccReadSet, try_as_crdt};
16use crate::runtime::sync::{AtomicU64, Ordering};
17use uni_common::core::id::{Eid, Vid};
18
19#[derive(Debug, Default, Clone)]
21pub struct WriteSet {
22 pub vertices: HashSet<Vid>,
24 pub edges: HashSet<Eid>,
26}
27
28impl WriteSet {
29 pub fn from_l0(l0: &L0Buffer) -> Self {
44 let mut vertices: HashSet<Vid> = HashSet::new();
45 for (vid, props) in &l0.vertex_properties {
46 if !is_crdt_carveout(l0, vid, props) {
47 vertices.insert(*vid);
48 }
49 }
50 vertices.extend(l0.vertex_tombstones.iter().copied());
52 vertices.extend(l0.vertex_label_overwrites.iter().copied());
58
59 let mut edges: HashSet<Eid> = l0.edge_properties.keys().copied().collect();
60 edges.extend(l0.edge_endpoints.keys().copied());
61 edges.extend(l0.tombstones.keys().copied());
62 Self { vertices, edges }
63 }
64
65 pub fn is_empty(&self) -> bool {
67 self.vertices.is_empty() && self.edges.is_empty()
68 }
69
70 pub fn intersects(&self, other: &WriteSet) -> bool {
72 let (small, large) = if self.vertices.len() <= other.vertices.len() {
74 (&self.vertices, &other.vertices)
75 } else {
76 (&other.vertices, &self.vertices)
77 };
78 if small.iter().any(|v| large.contains(v)) {
79 return true;
80 }
81 let (small, large) = if self.edges.len() <= other.edges.len() {
82 (&self.edges, &other.edges)
83 } else {
84 (&other.edges, &self.edges)
85 };
86 small.iter().any(|e| large.contains(e))
87 }
88}
89
90fn is_crdt_carveout(l0: &L0Buffer, vid: &Vid, props: &uni_common::Properties) -> bool {
103 let label_changed = l0
104 .vertex_labels
105 .get(vid)
106 .is_some_and(|labels| !labels.is_empty());
107 let all_crdt = !props.is_empty() && props.values().all(|v| try_as_crdt(v).is_some());
108 all_crdt && !label_changed
109}
110
111#[derive(Debug)]
119pub struct CrdtVariantConflict {
120 pub vid: Vid,
122 pub property: String,
124}
125
126impl std::fmt::Display for CrdtVariantConflict {
127 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
128 write!(
129 f,
130 "carved-out CRDT write to property {:?} would overwrite a different \
131 committed CRDT variant (a lost update); aborting",
132 self.property
133 )
134 }
135}
136
137pub fn crdt_carveout_overwrite(tx_l0: &L0Buffer, main: &L0Buffer) -> Option<CrdtVariantConflict> {
148 for (vid, props) in &tx_l0.vertex_properties {
149 if tx_l0.vertex_tombstones.contains(vid) || !is_crdt_carveout(tx_l0, vid, props) {
150 continue;
151 }
152 let Some(existing_props) = main.vertex_properties.get(vid) else {
153 continue;
154 };
155 for (key, value) in props {
156 let (Some(new_crdt), Some(existing_crdt)) = (
157 try_as_crdt(value),
158 existing_props.get(key).and_then(try_as_crdt),
159 ) else {
160 continue;
161 };
162 if new_crdt.type_name() != existing_crdt.type_name() {
163 return Some(CrdtVariantConflict {
164 vid: *vid,
165 property: key.clone(),
166 });
167 }
168 }
169 }
170 None
171}
172
173fn read_set_intersects(read_set: &OccReadSet, w: &WriteSet) -> bool {
175 read_set.vertices.iter().any(|v| w.vertices.contains(v))
176 || read_set.edges.iter().any(|e| w.edges.contains(e))
177}
178
/// Reason a transaction failed validation against the commit history.
#[derive(Debug)]
pub enum Conflict {
    /// The transaction's write set overlaps the write set committed at `seq`.
    WriteWrite { seq: u64 },
    /// The transaction's read set overlaps the write set committed at `seq`.
    ReadWrite { seq: u64 },
    /// The oldest retained commit (`oldest`) is too far past `read_seq` for
    /// the retained history to be checked completely.
    HistoryTruncated { read_seq: u64, oldest: u64 },
}

impl std::fmt::Display for Conflict {
    /// Renders a one-line, human-readable description of the conflict.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The two sequence-only variants share a message shape; the
        // truncation message is different and is written out directly.
        let (kind, seq) = match *self {
            Self::WriteWrite { seq } => ("write-write conflict", seq),
            Self::ReadWrite { seq } => ("read-write antidependency", seq),
            Self::HistoryTruncated { read_seq, oldest } => {
                return write!(
                    f,
                    "commit history truncated below read sequence {read_seq} \
                     (oldest retained {oldest}); aborting conservatively"
                );
            }
        };
        write!(f, "{kind} with commit sequence {seq}")
    }
}
208
209#[derive(Debug)]
214pub struct CommitRegistry {
215 entries: VecDeque<(u64, WriteSet)>,
216 capacity: usize,
217}
218
219impl CommitRegistry {
220 pub fn new(capacity: usize) -> Self {
226 assert!(capacity > 0, "CommitRegistry capacity must be non-zero");
227 Self {
228 entries: VecDeque::new(),
229 capacity,
230 }
231 }
232
233 pub fn record(&mut self, seq: u64, write_set: WriteSet) {
235 self.entries.push_back((seq, write_set));
236 while self.entries.len() > self.capacity {
237 self.entries.pop_front();
238 }
239 }
240
241 pub(crate) fn commit(&mut self, seq: &AtomicU64, write_set: WriteSet) -> u64 {
250 let next = seq.fetch_add(1, Ordering::Relaxed) + 1;
251 self.record(next, write_set);
252 next
253 }
254
255 pub fn check(
261 &self,
262 read_seq: u64,
263 write_set: &WriteSet,
264 read_set: Option<&OccReadSet>,
265 ) -> Option<Conflict> {
266 if let Some(&(oldest, _)) = self.entries.front()
270 && oldest > read_seq.saturating_add(1)
271 {
272 return Some(Conflict::HistoryTruncated { read_seq, oldest });
273 }
274 for (seq, committed) in &self.entries {
275 if *seq <= read_seq {
276 continue;
277 }
278 if write_set.intersects(committed) {
279 return Some(Conflict::WriteWrite { seq: *seq });
280 }
281 if let Some(rs) = read_set
282 && read_set_intersects(rs, committed)
283 {
284 return Some(Conflict::ReadWrite { seq: *seq });
285 }
286 }
287 None
288 }
289}
290
#[cfg(test)]
mod tests {
    use super::*;

    // ---------------------------------------------------------------------
    // Fixture helpers
    // ---------------------------------------------------------------------

    /// Shorthand for building a vertex id from a raw integer.
    fn vid(n: u64) -> Vid {
        Vid::from(n)
    }

    /// Write set that touches only the given vertex ids.
    fn ws(vids: &[u64]) -> WriteSet {
        WriteSet {
            vertices: vids.iter().copied().map(vid).collect(),
            ..WriteSet::default()
        }
    }

    /// Write set that touches only the given edge ids.
    fn es(eids: &[u64]) -> WriteSet {
        WriteSet {
            edges: eids.iter().copied().map(Eid::from).collect(),
            ..WriteSet::default()
        }
    }

    /// Property map holding exactly one entry.
    fn single_prop(key: &str, value: uni_common::Value) -> uni_common::Properties {
        uni_common::Properties::from([(key.to_string(), value)])
    }

    /// Serializes a CRDT into the property value representation.
    fn crdt_value(crdt: uni_crdt::Crdt) -> uni_common::Value {
        serde_json::to_value(crdt).unwrap().into()
    }

    /// `counter` property holding a `GCounter` incremented by `n` for `actor`.
    fn crdt_props(actor: &str, n: u64) -> uni_common::Properties {
        let mut counter = uni_crdt::GCounter::new();
        counter.increment(actor, n);
        single_prop("counter", crdt_value(uni_crdt::Crdt::GCounter(counter)))
    }

    /// `n` property holding a plain integer (not a CRDT).
    fn int_props(n: i64) -> uni_common::Properties {
        single_prop("n", uni_common::Value::Int(n))
    }

    /// `counter` property holding a `GSet` — same key as `crdt_props`, but a
    /// different CRDT variant.
    fn gset_props(item: &str) -> uni_common::Properties {
        let mut set = uni_crdt::GSet::new();
        set.add(item.to_string());
        single_prop("counter", crdt_value(uni_crdt::Crdt::GSet(set)))
    }

    /// Fresh buffer containing vertex 1 with the given properties and labels.
    fn buffer_with(props: uni_common::Properties, labels: &[String]) -> L0Buffer {
        let mut buf = L0Buffer::new(0, None);
        buf.insert_vertex_with_labels(vid(1), props, labels);
        buf
    }

    /// Whether vertex 1 appears in the write set derived from `buf`.
    fn writes_vertex_one(buf: &L0Buffer) -> bool {
        WriteSet::from_l0(buf).vertices.contains(&vid(1))
    }

    // ---------------------------------------------------------------------
    // CommitRegistry::check — vertex write sets
    // ---------------------------------------------------------------------

    #[test]
    fn disjoint_writes_do_not_conflict() {
        let mut registry = CommitRegistry::new(16);
        registry.record(1, ws(&[1, 2]));

        let outcome = registry.check(0, &ws(&[3, 4]), None);
        assert!(outcome.is_none());
    }

    #[test]
    fn overlapping_write_after_read_seq_conflicts() {
        let mut registry = CommitRegistry::new(16);
        registry.record(1, ws(&[1, 2]));

        let outcome = registry.check(0, &ws(&[2]), None);
        assert!(matches!(outcome, Some(Conflict::WriteWrite { seq: 1 })));
    }

    #[test]
    fn commit_at_or_before_read_seq_is_ignored() {
        let mut registry = CommitRegistry::new(16);
        registry.record(1, ws(&[1]));

        let outcome = registry.check(1, &ws(&[1]), None);
        assert!(outcome.is_none());
    }

    #[test]
    fn read_write_antidependency_detected() {
        let mut registry = CommitRegistry::new(16);
        registry.record(1, ws(&[5]));

        let mut reads = OccReadSet::default();
        reads.vertices.insert(vid(5));

        let outcome = registry.check(0, &ws(&[99]), Some(&reads));
        assert!(matches!(outcome, Some(Conflict::ReadWrite { seq: 1 })));
    }

    #[test]
    fn truncated_history_aborts_conservatively() {
        // Capacity 2 with three commits: sequence 1 is evicted.
        let mut registry = CommitRegistry::new(2);
        for seq in 1..=3 {
            registry.record(seq, ws(&[seq]));
        }

        let outcome = registry.check(0, &ws(&[42]), None);
        assert!(matches!(
            outcome,
            Some(Conflict::HistoryTruncated {
                read_seq: 0,
                oldest: 2
            })
        ));
    }

    #[test]
    fn commit_bumps_sequence_and_records() {
        let seq = AtomicU64::new(0);
        let mut registry = CommitRegistry::new(16);

        let first = registry.commit(&seq, ws(&[1]));
        assert_eq!(first, 1);
        let second = registry.commit(&seq, ws(&[2]));
        assert_eq!(second, 2);
        assert_eq!(seq.load(Ordering::Relaxed), 2);

        let outcome = registry.check(0, &ws(&[1]), None);
        assert!(matches!(outcome, Some(Conflict::WriteWrite { seq: 1 })));
    }

    // ---------------------------------------------------------------------
    // Edge write sets
    // ---------------------------------------------------------------------

    #[test]
    fn intersects_detects_overlapping_edges() {
        let base = es(&[1, 2]);
        assert!(base.intersects(&es(&[2, 3])));
        assert!(!base.intersects(&es(&[3, 4])));
        // A vertex id and an edge id with the same raw value do not collide.
        assert!(!ws(&[1]).intersects(&es(&[1])));
    }

    #[test]
    fn overlapping_edge_write_after_read_seq_conflicts() {
        let mut registry = CommitRegistry::new(16);
        registry.record(1, es(&[10, 11]));

        let overlapping = registry.check(0, &es(&[11]), None);
        assert!(matches!(
            overlapping,
            Some(Conflict::WriteWrite { seq: 1 })
        ));

        let disjoint = registry.check(0, &es(&[12]), None);
        assert!(disjoint.is_none());
    }

    // ---------------------------------------------------------------------
    // WriteSet::from_l0 — CRDT carve-out
    // ---------------------------------------------------------------------

    #[test]
    fn crdt_only_write_without_labels_is_carved_out() {
        let buf = buffer_with(crdt_props("a", 5), &[]);
        assert!(!writes_vertex_one(&buf));
    }

    #[test]
    fn non_crdt_write_without_labels_is_conflictable() {
        let buf = buffer_with(int_props(1), &[]);
        assert!(writes_vertex_one(&buf));
    }

    #[test]
    fn crdt_write_with_labels_stays_conflictable() {
        let buf = buffer_with(crdt_props("a", 5), &["Counter".to_string()]);
        assert!(writes_vertex_one(&buf));
    }

    #[test]
    fn mixed_crdt_and_lww_write_is_conflictable() {
        let mut props = crdt_props("a", 5);
        props.insert("n".to_string(), uni_common::Value::Int(1));

        let buf = buffer_with(props, &[]);
        assert!(writes_vertex_one(&buf));
    }

    #[test]
    fn plain_map_value_is_not_mistaken_for_crdt() {
        let inner = std::collections::HashMap::from([(
            "x".to_string(),
            uni_common::Value::Int(1),
        )]);
        let props = single_prop("data", uni_common::Value::Map(inner));

        let buf = buffer_with(props, &[]);
        assert!(writes_vertex_one(&buf));
    }

    #[test]
    fn tombstoned_vertex_is_conflictable() {
        let mut buf = buffer_with(crdt_props("a", 5), &[]);
        buf.delete_vertex(vid(1)).unwrap();
        assert!(writes_vertex_one(&buf));
    }

    // ---------------------------------------------------------------------
    // crdt_carveout_overwrite
    // ---------------------------------------------------------------------

    #[test]
    fn crdt_carveout_overwrite_detects_variant_mismatch() {
        let main = buffer_with(crdt_props("a", 5), &[]);
        let tx = buffer_with(gset_props("x"), &[]);

        let conflict = crdt_carveout_overwrite(&tx, &main).expect("variant mismatch");
        assert_eq!(conflict.vid, vid(1));
        assert_eq!(conflict.property, "counter");
    }

    #[test]
    fn crdt_carveout_overwrite_allows_same_variant() {
        let main = buffer_with(crdt_props("a", 5), &[]);
        let tx = buffer_with(crdt_props("b", 7), &[]);

        assert!(crdt_carveout_overwrite(&tx, &main).is_none());
    }

    #[test]
    fn crdt_carveout_overwrite_allows_new_vertex() {
        let main = L0Buffer::new(0, None);
        let tx = buffer_with(gset_props("x"), &[]);

        assert!(crdt_carveout_overwrite(&tx, &main).is_none());
    }

    #[test]
    fn crdt_carveout_overwrite_ignores_conflictable_writes() {
        let main = buffer_with(crdt_props("a", 5), &[]);
        // The label makes this write conflictable rather than carved out.
        let tx = buffer_with(gset_props("x"), &["Counter".to_string()]);

        assert!(crdt_carveout_overwrite(&tx, &main).is_none());
    }

    // ---------------------------------------------------------------------
    // History retention
    // ---------------------------------------------------------------------

    #[test]
    fn long_lived_reader_within_retained_history_does_not_abort() {
        let mut registry = CommitRegistry::new(16);
        for seq in 1..=5 {
            registry.record(seq, ws(&[seq + 100]));
        }

        let outcome = registry.check(0, &ws(&[1]), None);
        assert!(outcome.is_none());
    }

    #[test]
    fn truncated_history_aborts_read_set_txn_conservatively() {
        // Capacity 2 with three commits: sequence 1 is evicted.
        let mut registry = CommitRegistry::new(2);
        for seq in 1..=3 {
            registry.record(seq, ws(&[seq]));
        }

        let mut reads = OccReadSet::default();
        reads.vertices.insert(vid(7));

        let outcome = registry.check(0, &ws(&[42]), Some(&reads));
        assert!(matches!(outcome, Some(Conflict::HistoryTruncated { .. })));
    }
}