1use std::collections::{HashSet, VecDeque};
14
15use crate::runtime::l0::{L0Buffer, OccReadSet, try_as_crdt};
16use uni_common::core::id::{Eid, Vid};
17
18#[derive(Debug, Default, Clone)]
20pub struct WriteSet {
21 pub vertices: HashSet<Vid>,
23 pub edges: HashSet<Eid>,
25}
26
27impl WriteSet {
28 pub fn from_l0(l0: &L0Buffer) -> Self {
43 let mut vertices: HashSet<Vid> = HashSet::new();
44 for (vid, props) in &l0.vertex_properties {
45 if !is_crdt_carveout(l0, vid, props) {
46 vertices.insert(*vid);
47 }
48 }
49 vertices.extend(l0.vertex_tombstones.iter().copied());
51 vertices.extend(l0.vertex_label_overwrites.iter().copied());
57
58 let mut edges: HashSet<Eid> = l0.edge_properties.keys().copied().collect();
59 edges.extend(l0.edge_endpoints.keys().copied());
60 edges.extend(l0.tombstones.keys().copied());
61 Self { vertices, edges }
62 }
63
64 pub fn is_empty(&self) -> bool {
66 self.vertices.is_empty() && self.edges.is_empty()
67 }
68
69 pub fn intersects(&self, other: &WriteSet) -> bool {
71 let (small, large) = if self.vertices.len() <= other.vertices.len() {
73 (&self.vertices, &other.vertices)
74 } else {
75 (&other.vertices, &self.vertices)
76 };
77 if small.iter().any(|v| large.contains(v)) {
78 return true;
79 }
80 let (small, large) = if self.edges.len() <= other.edges.len() {
81 (&self.edges, &other.edges)
82 } else {
83 (&other.edges, &self.edges)
84 };
85 small.iter().any(|e| large.contains(e))
86 }
87}
88
89fn is_crdt_carveout(l0: &L0Buffer, vid: &Vid, props: &uni_common::Properties) -> bool {
102 let label_changed = l0
103 .vertex_labels
104 .get(vid)
105 .is_some_and(|labels| !labels.is_empty());
106 let all_crdt = !props.is_empty() && props.values().all(|v| try_as_crdt(v).is_some());
107 all_crdt && !label_changed
108}
109
110#[derive(Debug)]
118pub struct CrdtVariantConflict {
119 pub vid: Vid,
121 pub property: String,
123}
124
125impl std::fmt::Display for CrdtVariantConflict {
126 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127 write!(
128 f,
129 "carved-out CRDT write to property {:?} would overwrite a different \
130 committed CRDT variant (a lost update); aborting",
131 self.property
132 )
133 }
134}
135
136pub fn crdt_carveout_overwrite(tx_l0: &L0Buffer, main: &L0Buffer) -> Option<CrdtVariantConflict> {
147 for (vid, props) in &tx_l0.vertex_properties {
148 if tx_l0.vertex_tombstones.contains(vid) || !is_crdt_carveout(tx_l0, vid, props) {
149 continue;
150 }
151 let Some(existing_props) = main.vertex_properties.get(vid) else {
152 continue;
153 };
154 for (key, value) in props {
155 let (Some(new_crdt), Some(existing_crdt)) = (
156 try_as_crdt(value),
157 existing_props.get(key).and_then(try_as_crdt),
158 ) else {
159 continue;
160 };
161 if new_crdt.type_name() != existing_crdt.type_name() {
162 return Some(CrdtVariantConflict {
163 vid: *vid,
164 property: key.clone(),
165 });
166 }
167 }
168 }
169 None
170}
171
172fn read_set_intersects(read_set: &OccReadSet, w: &WriteSet) -> bool {
174 read_set.vertices.iter().any(|v| w.vertices.contains(v))
175 || read_set.edges.iter().any(|e| w.edges.contains(e))
176}
177
/// Reason returned by `CommitRegistry::check` for rejecting a transaction.
#[derive(Debug)]
pub enum Conflict {
    /// The transaction's write set overlaps the write set recorded at commit
    /// sequence `seq`.
    WriteWrite { seq: u64 },
    /// The transaction's read set overlaps the write set recorded at commit
    /// sequence `seq`.
    ReadWrite { seq: u64 },
    /// The oldest retained commit (`oldest`) is too far past the transaction's
    /// `read_seq` for the check to be performed.
    HistoryTruncated { read_seq: u64, oldest: u64 },
}

impl std::fmt::Display for Conflict {
    /// Writes a one-line description of the conflict.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Every field is a `u64`, so the variants are matched by value.
        match *self {
            Self::WriteWrite { seq } => {
                write!(f, "write-write conflict with commit sequence {seq}")
            }
            Self::ReadWrite { seq } => {
                write!(f, "read-write antidependency with commit sequence {seq}")
            }
            Self::HistoryTruncated { read_seq, oldest } => {
                write!(
                    f,
                    "commit history truncated below read sequence {read_seq} \
                     (oldest retained {oldest}); aborting conservatively"
                )
            }
        }
    }
}
207
208#[derive(Debug)]
213pub struct CommitRegistry {
214 entries: VecDeque<(u64, WriteSet)>,
215 capacity: usize,
216}
217
218impl CommitRegistry {
219 pub fn new(capacity: usize) -> Self {
225 assert!(capacity > 0, "CommitRegistry capacity must be non-zero");
226 Self {
227 entries: VecDeque::new(),
228 capacity,
229 }
230 }
231
232 pub fn record(&mut self, seq: u64, write_set: WriteSet) {
234 self.entries.push_back((seq, write_set));
235 while self.entries.len() > self.capacity {
236 self.entries.pop_front();
237 }
238 }
239
240 pub fn check(
246 &self,
247 read_seq: u64,
248 write_set: &WriteSet,
249 read_set: Option<&OccReadSet>,
250 ) -> Option<Conflict> {
251 if let Some(&(oldest, _)) = self.entries.front()
255 && oldest > read_seq.saturating_add(1)
256 {
257 return Some(Conflict::HistoryTruncated { read_seq, oldest });
258 }
259 for (seq, committed) in &self.entries {
260 if *seq <= read_seq {
261 continue;
262 }
263 if write_set.intersects(committed) {
264 return Some(Conflict::WriteWrite { seq: *seq });
265 }
266 if let Some(rs) = read_set
267 && read_set_intersects(rs, committed)
268 {
269 return Some(Conflict::ReadWrite { seq: *seq });
270 }
271 }
272 None
273 }
274}
275
#[cfg(test)]
mod tests {
    use super::*;
    use uni_common::{Properties, Value};

    // ----- fixtures -------------------------------------------------------

    /// Builds a `Vid` from a raw integer.
    fn vid(n: u64) -> Vid {
        Vid::from(n)
    }

    /// Write set containing exactly the given vertices and no edges.
    fn ws(vids: &[u64]) -> WriteSet {
        WriteSet {
            vertices: vids.iter().copied().map(Vid::from).collect(),
            ..WriteSet::default()
        }
    }

    /// Serialises a CRDT into a property value through `serde_json`.
    fn crdt_value(crdt: uni_crdt::Crdt) -> Value {
        serde_json::to_value(crdt).unwrap().into()
    }

    /// Property map holding a single `key -> value` entry.
    fn single_prop(key: &str, value: Value) -> Properties {
        Properties::from([(key.to_string(), value)])
    }

    /// `"counter"` property holding a `GCounter` incremented by `n` for `actor`.
    fn crdt_props(actor: &str, n: u64) -> Properties {
        let mut counter = uni_crdt::GCounter::new();
        counter.increment(actor, n);
        single_prop("counter", crdt_value(uni_crdt::Crdt::GCounter(counter)))
    }

    /// `"n"` property holding a plain integer.
    fn int_props(n: i64) -> Properties {
        single_prop("n", Value::Int(n))
    }

    /// `"counter"` property holding a `GSet`. It deliberately reuses the key
    /// written by `crdt_props` so the two variants collide on one property.
    fn gset_props(item: &str) -> Properties {
        let mut set = uni_crdt::GSet::new();
        set.add(item.to_string());
        single_prop("counter", crdt_value(uni_crdt::Crdt::GSet(set)))
    }

    /// Fresh buffer with one vertex inserted.
    fn l0_with_vertex(id: u64, props: Properties, labels: &[String]) -> L0Buffer {
        let mut buf = L0Buffer::new(0, None);
        buf.insert_vertex_with_labels(vid(id), props, labels);
        buf
    }

    /// Whether the write set derived from `buf` contains vertex `id`.
    fn counts_as_written(buf: &L0Buffer, id: u64) -> bool {
        WriteSet::from_l0(buf).vertices.contains(&vid(id))
    }

    // ----- CommitRegistry::check ------------------------------------------

    #[test]
    fn disjoint_writes_do_not_conflict() {
        let mut registry = CommitRegistry::new(16);
        registry.record(1, ws(&[1, 2]));
        let outcome = registry.check(0, &ws(&[3, 4]), None);
        assert!(outcome.is_none());
    }

    #[test]
    fn overlapping_write_after_read_seq_conflicts() {
        let mut registry = CommitRegistry::new(16);
        registry.record(1, ws(&[1, 2]));
        let outcome = registry.check(0, &ws(&[2]), None);
        assert!(matches!(outcome, Some(Conflict::WriteWrite { seq: 1 })));
    }

    #[test]
    fn commit_at_or_before_read_seq_is_ignored() {
        let mut registry = CommitRegistry::new(16);
        registry.record(1, ws(&[1]));
        let outcome = registry.check(1, &ws(&[1]), None);
        assert!(outcome.is_none());
    }

    #[test]
    fn read_write_antidependency_detected() {
        let mut registry = CommitRegistry::new(16);
        registry.record(1, ws(&[5]));
        let mut reads = OccReadSet::default();
        reads.vertices.insert(vid(5));
        let outcome = registry.check(0, &ws(&[99]), Some(&reads));
        assert!(matches!(outcome, Some(Conflict::ReadWrite { seq: 1 })));
    }

    #[test]
    fn truncated_history_aborts_conservatively() {
        // Capacity 2 with three commits: sequence 1 is evicted, 2 is oldest.
        let mut registry = CommitRegistry::new(2);
        for seq in 1..=3 {
            registry.record(seq, ws(&[seq]));
        }
        let outcome = registry.check(0, &ws(&[42]), None);
        assert!(matches!(
            outcome,
            Some(Conflict::HistoryTruncated {
                read_seq: 0,
                oldest: 2
            })
        ));
    }

    #[test]
    fn long_lived_reader_within_retained_history_does_not_abort() {
        let mut registry = CommitRegistry::new(16);
        (1..=5u64).for_each(|seq| registry.record(seq, ws(&[seq + 100])));
        let outcome = registry.check(0, &ws(&[1]), None);
        assert!(outcome.is_none());
    }

    #[test]
    fn truncated_history_aborts_read_set_txn_conservatively() {
        let mut registry = CommitRegistry::new(2);
        for seq in 1..=3 {
            registry.record(seq, ws(&[seq]));
        }
        let mut reads = OccReadSet::default();
        reads.vertices.insert(vid(7));
        let outcome = registry.check(0, &ws(&[42]), Some(&reads));
        assert!(matches!(outcome, Some(Conflict::HistoryTruncated { .. })));
    }

    // ----- WriteSet::from_l0 ----------------------------------------------

    #[test]
    fn crdt_only_write_without_labels_is_carved_out() {
        let buf = l0_with_vertex(1, crdt_props("a", 5), &[]);
        assert!(!counts_as_written(&buf, 1));
    }

    #[test]
    fn non_crdt_write_without_labels_is_conflictable() {
        let buf = l0_with_vertex(1, int_props(1), &[]);
        assert!(counts_as_written(&buf, 1));
    }

    #[test]
    fn crdt_write_with_labels_stays_conflictable() {
        let buf = l0_with_vertex(1, crdt_props("a", 5), &["Counter".to_string()]);
        assert!(counts_as_written(&buf, 1));
    }

    #[test]
    fn mixed_crdt_and_lww_write_is_conflictable() {
        let mut props = crdt_props("a", 5);
        props.insert("n".to_string(), Value::Int(1));
        let buf = l0_with_vertex(1, props, &[]);
        assert!(counts_as_written(&buf, 1));
    }

    #[test]
    fn plain_map_value_is_not_mistaken_for_crdt() {
        let map = Value::Map(std::collections::HashMap::from([(
            "x".to_string(),
            Value::Int(1),
        )]));
        let buf = l0_with_vertex(1, single_prop("data", map), &[]);
        assert!(counts_as_written(&buf, 1));
    }

    #[test]
    fn tombstoned_vertex_is_conflictable() {
        let mut buf = l0_with_vertex(1, crdt_props("a", 5), &[]);
        buf.delete_vertex(vid(1)).unwrap();
        assert!(counts_as_written(&buf, 1));
    }

    // ----- crdt_carveout_overwrite ----------------------------------------

    #[test]
    fn crdt_carveout_overwrite_detects_variant_mismatch() {
        let main = l0_with_vertex(1, crdt_props("a", 5), &[]);
        let tx = l0_with_vertex(1, gset_props("x"), &[]);
        let conflict = crdt_carveout_overwrite(&tx, &main).expect("variant mismatch");
        assert_eq!(conflict.vid, vid(1));
        assert_eq!(conflict.property, "counter");
    }

    #[test]
    fn crdt_carveout_overwrite_allows_same_variant() {
        let main = l0_with_vertex(1, crdt_props("a", 5), &[]);
        let tx = l0_with_vertex(1, crdt_props("b", 7), &[]);
        assert!(crdt_carveout_overwrite(&tx, &main).is_none());
    }

    #[test]
    fn crdt_carveout_overwrite_allows_new_vertex() {
        let main = L0Buffer::new(0, None);
        let tx = l0_with_vertex(1, gset_props("x"), &[]);
        assert!(crdt_carveout_overwrite(&tx, &main).is_none());
    }

    #[test]
    fn crdt_carveout_overwrite_ignores_conflictable_writes() {
        let main = l0_with_vertex(1, crdt_props("a", 5), &[]);
        // The label keeps this write out of the carve-out, so the variant
        // check does not examine it.
        let tx = l0_with_vertex(1, gset_props("x"), &["Counter".to_string()]);
        assert!(crdt_carveout_overwrite(&tx, &main).is_none());
    }
}