use std::cmp::Ordering;
use xxhash_rust::xxh3::xxh3_64;
use crate::calvin::sequencer::error::SequencerError;
use crate::calvin::sequencer::inbox::{AdmittedTx, RejectedTx};
use crate::calvin::sequencer::metrics::ConflictKey;
use crate::calvin::types::{EngineKeySet, SequencedTxn};
/// One written key of one candidate transaction, flattened out of its
/// write set.
///
/// Entries gathered from every candidate in a batch are sorted by
/// [`WriteEntry::sort_key`] so that writes to the same key end up adjacent.
#[derive(Debug)]
struct WriteEntry {
    /// Engine tag as assigned by `flatten_write_set`:
    /// 0 = document, 1 = vector, 2 = kv, 3 = edge.
    discriminant: u8,
    /// Engine label matching `discriminant` (e.g. `"document"`).
    engine_name: &'static str,
    /// Collection the written key belongs to.
    collection: String,
    /// Engine-specific byte encoding of the written key.
    key_bytes: Vec<u8>,
    /// Index of the owning candidate within the sorted batch.
    txn_index: usize,
}

impl WriteEntry {
    /// Identity of the written key as `(discriminant, collection, key_bytes)`.
    ///
    /// Two entries refer to the same key exactly when these tuples are equal.
    /// `engine_name` and `txn_index` are not part of the key.
    fn sort_key(&self) -> (&u8, &str, &[u8]) {
        let Self {
            discriminant,
            collection,
            key_bytes,
            ..
        } = self;
        (discriminant, collection.as_str(), key_bytes.as_slice())
    }
}
/// Expands a candidate's write set into one [`WriteEntry`] per written key.
///
/// Every entry is stamped with `txn_index`, the candidate's index in the
/// sorted batch. Key bytes are encoded per engine:
/// - `Document` / `Vector`: the surrogate's little-endian bytes;
/// - `Kv`: a copy of the raw key bytes;
/// - `Edge`: `src`'s little-endian bytes followed by `dst`'s.
///
/// Entries are produced in write-set order. Within one key set they follow
/// the order yielded by its `as_slice()`.
fn flatten_write_set(tx: &AdmittedTx, txn_index: usize) -> Vec<WriteEntry> {
    let mut entries = Vec::new();
    for key_set in &tx.tx_class.write_set.0 {
        match key_set {
            EngineKeySet::Document {
                collection,
                surrogates,
            } => entries.extend(surrogates.as_slice().iter().map(|&surrogate| WriteEntry {
                discriminant: 0,
                engine_name: "document",
                collection: collection.clone(),
                key_bytes: surrogate.to_le_bytes().to_vec(),
                txn_index,
            })),
            EngineKeySet::Vector {
                collection,
                surrogates,
            } => entries.extend(surrogates.as_slice().iter().map(|&surrogate| WriteEntry {
                discriminant: 1,
                engine_name: "vector",
                collection: collection.clone(),
                key_bytes: surrogate.to_le_bytes().to_vec(),
                txn_index,
            })),
            EngineKeySet::Kv { collection, keys } => {
                entries.extend(keys.as_slice().iter().map(|key| WriteEntry {
                    discriminant: 2,
                    engine_name: "kv",
                    collection: collection.clone(),
                    key_bytes: key.clone(),
                    txn_index,
                }))
            }
            EngineKeySet::Edge { collection, edges } => {
                entries.extend(edges.as_slice().iter().map(|&(src, dst)| {
                    // Fixed-width halves, so the concatenation is unambiguous.
                    let mut key_bytes = src.to_le_bytes().to_vec();
                    key_bytes.extend_from_slice(&dst.to_le_bytes());
                    WriteEntry {
                        discriminant: 3,
                        engine_name: "edge",
                        collection: collection.clone(),
                        key_bytes,
                        txn_index,
                    }
                }))
            }
        }
    }
    entries
}
/// Batch ordering key for a candidate: `(inbox_seq, tenant id, xxh3-64 of the
/// plan bytes)`.
///
/// Tuples compare lexicographically, so `inbox_seq` dominates. The tenant id
/// and the plan digest only break ties between candidates with the same
/// `inbox_seq`.
fn admitted_sort_key(tx: &AdmittedTx) -> (u64, u64, u64) {
    let class = &tx.tx_class;
    let tenant = class.tenant_id.as_u64();
    let plan_digest = xxh3_64(&class.plans);
    (tx.inbox_seq, tenant, plan_digest)
}
/// Outcome of greedy admission for one candidate of a batch.
enum Verdict {
    /// Admitted at this dense position within the epoch.
    Admitted(u32),
    /// Rejected: one of its write keys was already held by an admitted
    /// candidate.
    Rejected {
        /// Epoch position of the admitted candidate holding the contested key.
        winner_position: u32,
        /// Index into the sorted write list of an entry for the contested key.
        key_entry: usize,
    },
}

/// Total order on write keys: engine discriminant, then collection, then key
/// bytes.
fn cmp_write_key(a: &WriteEntry, b: &WriteEntry) -> Ordering {
    a.sort_key().cmp(&b.sort_key())
}

/// Deterministically validates one epoch's candidates and assigns positions.
///
/// Candidates are first ordered by `(inbox_seq, tenant_id, xxh3(plans))`.
/// They are then admitted greedily in that order. A candidate is admitted
/// iff none of its write keys `(engine, collection, key bytes)` is already
/// written by an earlier *admitted* candidate. A candidate that was itself
/// rejected therefore never blocks later ones. Only write/write overlaps are
/// checked; read sets are not consulted.
///
/// Admitted candidates receive dense positions `0, 1, 2, …` in that order.
/// Every other candidate becomes a [`RejectedTx`] carrying
/// [`SequencerError::Conflict`], whose `position_admitted` is the position of
/// the admitted candidate holding the contested key. It also carries a
/// [`ConflictKey`] naming the loser's tenant and the contested
/// engine/collection. When several keys clash, the smallest one in
/// write-key order is reported.
///
/// Returns `(admitted, rejected)`, both in sorted candidate order. Each
/// admitted transaction is paired with its original `inbox_seq`.
pub fn validate_batch_with_assignments(
    epoch: u64,
    mut candidates: Vec<AdmittedTx>,
) -> (Vec<(u64, SequencedTxn)>, Vec<RejectedTx>) {
    if candidates.is_empty() {
        return (vec![], vec![]);
    }
    // The key hashes the plan bytes; compute it once per candidate.
    // `sort_by_cached_key` is stable, like `sort_by_key`.
    candidates.sort_by_cached_key(admitted_sort_key);

    // Every key written by every candidate, sorted so identical keys are
    // adjacent. Only adjacency matters, so an unstable sort suffices.
    let mut flat: Vec<WriteEntry> = candidates
        .iter()
        .enumerate()
        .flat_map(|(i, tx)| flatten_write_set(tx, i))
        .collect();
    flat.sort_unstable_by(cmp_write_key);

    // Give each distinct key a dense id, and list per candidate the ids it
    // writes. Ids follow key order, so each list is ascending by key.
    let n = candidates.len();
    let mut keys_of: Vec<Vec<usize>> = vec![Vec::new(); n];
    // key id -> index in `flat` of the first entry for that key.
    let mut key_rep: Vec<usize> = Vec::new();
    for (pos, entry) in flat.iter().enumerate() {
        let starts_new_key = match key_rep.last() {
            Some(&rep) => cmp_write_key(&flat[rep], entry) != Ordering::Equal,
            None => true,
        };
        if starts_new_key {
            key_rep.push(pos);
        }
        keys_of[entry.txn_index].push(key_rep.len() - 1);
    }

    // Greedy first-writer-wins among admitted candidates.
    // `holder[k]` is the position of the admitted candidate writing key `k`.
    // All of a candidate's keys are checked before any are claimed, so
    // duplicate keys inside one candidate never conflict with themselves.
    let mut holder: Vec<Option<u32>> = vec![None; key_rep.len()];
    let mut next_position: u32 = 0;
    let mut verdicts: Vec<Verdict> = Vec::with_capacity(n);
    for keys in &keys_of {
        let clash = keys
            .iter()
            .find_map(|&k| holder[k].map(|winner| (winner, k)));
        match clash {
            Some((winner_position, k)) => verdicts.push(Verdict::Rejected {
                winner_position,
                key_entry: key_rep[k],
            }),
            None => {
                let position = next_position;
                for &k in keys {
                    holder[k] = Some(position);
                }
                next_position += 1;
                verdicts.push(Verdict::Admitted(position));
            }
        }
    }

    let mut admitted_out: Vec<(u64, SequencedTxn)> =
        Vec::with_capacity(next_position as usize);
    let mut rejected_out: Vec<RejectedTx> = Vec::with_capacity(n - next_position as usize);
    for (tx, verdict) in candidates.into_iter().zip(verdicts) {
        match verdict {
            Verdict::Admitted(position) => admitted_out.push((
                tx.inbox_seq,
                SequencedTxn {
                    epoch,
                    position,
                    tx_class: tx.tx_class,
                    epoch_system_ms: 0,
                },
            )),
            Verdict::Rejected {
                winner_position,
                key_entry,
            } => {
                // Every entry for a key shares its engine and collection, so
                // the representative entry describes the contested key.
                let key = &flat[key_entry];
                let conflict_context = ConflictKey {
                    tenant: tx.tx_class.tenant_id.as_u64(),
                    engine: key.engine_name,
                    collection: key.collection.clone(),
                };
                rejected_out.push(RejectedTx {
                    admitted: tx,
                    reason: SequencerError::Conflict {
                        position_admitted: winner_position,
                    },
                    conflict_context: Some(conflict_context),
                });
            }
        }
    }
    (admitted_out, rejected_out)
}

/// Same as [`validate_batch_with_assignments`], but drops the `inbox_seq`
/// paired with each admitted transaction.
pub fn validate_batch(
    epoch: u64,
    candidates: Vec<AdmittedTx>,
) -> (Vec<SequencedTxn>, Vec<RejectedTx>) {
    let (admitted, rejected) = validate_batch_with_assignments(epoch, candidates);
    (admitted.into_iter().map(|(_, txn)| txn).collect(), rejected)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::calvin::sequencer::inbox::AdmittedTx;
    use crate::calvin::types::{EngineKeySet, ReadWriteSet, SortedVec, TxClass};
    use nodedb_types::{
        TenantId,
        id::{DatabaseId, VShardId},
    };

    /// Returns `col_0` and the first later `col_{i}` (i < 512) whose vshard in
    /// the default database differs from `col_0`'s.
    ///
    /// NOTE(review): each test transaction writes to two distinct-vshard
    /// collections, presumably because `TxClass::new` expects that. Confirm.
    fn find_two_distinct_collections() -> (String, String) {
        let mut first: Option<(String, u32)> = None;
        for i in 0u32..512 {
            let name = format!("col_{i}");
            let vshard =
                VShardId::from_collection_in_database(DatabaseId::DEFAULT, &name).as_u32();
            if let Some((ref fname, fv)) = first {
                if fv != vshard {
                    return (fname.clone(), name);
                }
            } else {
                first = Some((name, vshard));
            }
        }
        panic!("could not find two distinct-vshard collections in 512 tries");
    }

    /// Builds a tenant-1 candidate with the given `inbox_seq`.
    ///
    /// The candidate has an empty read set and writes the given document
    /// surrogates in `col_a` and `col_b`. Its plan bytes are `[inbox_seq as u8]`.
    fn make_tx(
        inbox_seq: u64,
        col_a: &str,
        surrogates_a: Vec<u32>,
        col_b: &str,
        surrogates_b: Vec<u32>,
    ) -> AdmittedTx {
        let write_set = ReadWriteSet::new(vec![
            EngineKeySet::Document {
                collection: col_a.to_owned(),
                surrogates: SortedVec::new(surrogates_a),
            },
            EngineKeySet::Document {
                collection: col_b.to_owned(),
                surrogates: SortedVec::new(surrogates_b),
            },
        ]);
        let tx_class = TxClass::new(
            ReadWriteSet::new(vec![]),
            write_set,
            vec![inbox_seq as u8],
            TenantId::new(1),
            None,
        )
        .expect("valid TxClass");
        AdmittedTx {
            inbox_seq,
            tx_class,
        }
    }

    #[test]
    fn empty_input_produces_empty_output() {
        let (admitted, rejected) = validate_batch(1, vec![]);
        assert!(admitted.is_empty());
        assert!(rejected.is_empty());
    }

    #[test]
    fn single_txn_admitted_at_position_zero() {
        let (col_a, col_b) = find_two_distinct_collections();
        let tx = make_tx(0, &col_a, vec![1], &col_b, vec![2]);
        let (admitted, rejected) = validate_batch(1, vec![tx]);
        assert_eq!(admitted.len(), 1);
        assert!(rejected.is_empty());
        assert_eq!(admitted[0].position, 0);
        assert_eq!(admitted[0].epoch, 1);
    }

    #[test]
    fn two_non_conflicting_txns_both_admitted_in_inbox_seq_order() {
        let (col_a, col_b) = find_two_distinct_collections();
        let tx0 = make_tx(0, &col_a, vec![1], &col_b, vec![10]);
        let tx1 = make_tx(1, &col_a, vec![2], &col_b, vec![20]);
        // Submitted out of order: the validator must re-order by inbox_seq.
        let (admitted, rejected) = validate_batch_with_assignments(2, vec![tx1, tx0]);
        assert!(rejected.is_empty());
        let order: Vec<(u64, u32)> = admitted
            .iter()
            .map(|(seq, txn)| (*seq, txn.position))
            .collect();
        assert_eq!(order, vec![(0, 0), (1, 1)]);
    }

    #[test]
    fn two_conflicting_txns_first_admitted_second_rejected() {
        let (col_a, col_b) = find_two_distinct_collections();
        let tx0 = make_tx(0, &col_a, vec![42], &col_b, vec![1]);
        let tx1 = make_tx(1, &col_a, vec![42], &col_b, vec![2]);
        let (admitted, rejected) = validate_batch(3, vec![tx0, tx1]);
        assert_eq!(admitted.len(), 1);
        assert_eq!(rejected.len(), 1);
        assert_eq!(admitted[0].position, 0);
        assert!(matches!(
            rejected[0].reason,
            SequencerError::Conflict { .. }
        ));
    }

    #[test]
    fn deterministic_ordering_across_repeated_runs() {
        let (col_a, col_b) = find_two_distinct_collections();
        let tx0 = make_tx(0, &col_a, vec![1], &col_b, vec![10]);
        let tx1 = make_tx(1, &col_a, vec![1], &col_b, vec![20]);
        let (admitted1, rejected1) =
            validate_batch_with_assignments(1, vec![tx0.clone(), tx1.clone()]);
        // Same candidates, opposite submission order.
        let (admitted2, rejected2) = validate_batch_with_assignments(1, vec![tx1, tx0]);

        assert_eq!(admitted1.len(), admitted2.len());
        for ((seq1, a), (seq2, b)) in admitted1.iter().zip(&admitted2) {
            assert_eq!(seq1, seq2);
            assert_eq!(a.position, b.position);
        }

        assert_eq!(rejected1.len(), rejected2.len());
        for (a, b) in rejected1.iter().zip(&rejected2) {
            assert_eq!(a.admitted.inbox_seq, b.admitted.inbox_seq);
            assert_eq!(a.reason, b.reason);
        }
    }

    #[test]
    fn rejected_txn_carries_conflict_error_with_winner_position() {
        let (col_a, col_b) = find_two_distinct_collections();
        let tx0 = make_tx(0, &col_a, vec![99], &col_b, vec![1]);
        let tx1 = make_tx(1, &col_a, vec![99], &col_b, vec![2]);
        let (_admitted, rejected) = validate_batch(5, vec![tx0, tx1]);
        assert_eq!(rejected.len(), 1);
        assert_eq!(
            rejected[0].reason,
            SequencerError::Conflict {
                position_admitted: 0
            }
        );
    }

    #[test]
    fn rejected_txn_reports_nonzero_winner_position() {
        // tx0 is unrelated and takes position 0. tx1 takes position 1 and
        // wins the conflict with tx2 on col_a/99. The reported winner must
        // therefore be 1, not a defaulted 0.
        let (col_a, col_b) = find_two_distinct_collections();
        let tx0 = make_tx(0, &col_a, vec![5], &col_b, vec![50]);
        let tx1 = make_tx(1, &col_a, vec![99], &col_b, vec![1]);
        let tx2 = make_tx(2, &col_a, vec![99], &col_b, vec![2]);
        let (admitted, rejected) = validate_batch_with_assignments(6, vec![tx0, tx1, tx2]);
        assert_eq!(admitted.len(), 2);
        assert_eq!(rejected.len(), 1);
        assert_eq!(rejected[0].admitted.inbox_seq, 2);
        assert_eq!(
            rejected[0].reason,
            SequencerError::Conflict {
                position_admitted: 1
            }
        );
    }

    #[test]
    fn conflict_fairness_metric_keyed() {
        let (col_a, col_b) = find_two_distinct_collections();
        let tx0 = make_tx(0, &col_a, vec![7], &col_b, vec![1]);
        let tx1 = make_tx(1, &col_a, vec![7], &col_b, vec![2]);
        let (_admitted, rejected) = validate_batch(10, vec![tx0, tx1]);
        assert_eq!(rejected.len(), 1);
        let ctx = rejected[0]
            .conflict_context
            .as_ref()
            .expect("conflict_context must be Some for a Conflict rejection");
        assert_eq!(ctx.tenant, 1, "tenant should match tx tenant_id");
        assert_eq!(
            ctx.engine, "document",
            "engine should be 'document' for Document key set"
        );
        assert_eq!(
            ctx.collection, col_a,
            "collection should be the conflicting collection"
        );
    }
}