use std::collections::{HashMap, HashSet};
use crate::{
cilassembly::{changes::ChangeRefRc, Operation, TableOperation},
metadata::tables::TableDataOwned,
Error, Result,
};
/// Pending modifications for a single metadata table.
///
/// A table is either modified incrementally (`Sparse`), by recording individual
/// insert/update/delete operations on top of the original rows, or replaced
/// wholesale (`Replaced`) by a complete new list of rows.
#[derive(Debug, Clone)]
pub enum TableModifications {
/// Incremental modifications recorded as individual operations.
Sparse {
/// Recorded operations, kept ordered by ascending timestamp by
/// `apply_operation` (operations sharing a timestamp keep arrival order).
operations: Vec<TableOperation>,
/// RIDs currently considered deleted; consulted first by `has_row`.
deleted_rows: HashSet<u32>,
/// RIDs added through insert operations and not deleted afterwards.
inserted_rows: HashSet<u32>,
/// Change references registered per RID via `register_change_ref`.
change_refs: HashMap<u32, ChangeRefRc>,
/// Next RID considered free; starts at the value given to `new_sparse` and is
/// advanced past any inserted RID that is greater than or equal to it.
next_rid: u32,
/// Number of rows assumed to exist before any modification, derived in
/// `new_sparse` as `next_rid - 1` (saturating at zero).
original_row_count: u32,
},
/// Complete replacement of the table content. `has_row` treats the rows as
/// occupying RIDs `1..=len`; further operations cannot be applied.
Replaced(Vec<TableDataOwned>),
}
impl TableModifications {
pub fn new_sparse(next_rid: u32) -> Self {
let original_row_count = next_rid.saturating_sub(1);
Self::Sparse {
operations: Vec::new(),
deleted_rows: HashSet::new(),
inserted_rows: HashSet::new(),
change_refs: HashMap::new(),
next_rid,
original_row_count,
}
}
pub fn new_replaced(rows: Vec<TableDataOwned>) -> Self {
Self::Replaced(rows)
}
pub fn operation_count(&self) -> usize {
match self {
Self::Sparse { operations, .. } => operations.len(),
Self::Replaced(rows) => rows.len(),
}
}
pub fn has_modifications(&self) -> bool {
match self {
Self::Sparse { operations, .. } => !operations.is_empty(),
Self::Replaced(rows) => !rows.is_empty(),
}
}
pub fn apply_operation(&mut self, op: TableOperation) -> Result<()> {
match self {
Self::Sparse {
operations,
deleted_rows,
inserted_rows,
next_rid,
..
} => {
if let Operation::Delete(rid) = &op.operation {
if deleted_rows.contains(rid) {
return Ok(()); }
}
let insert_pos = match operations
.binary_search_by_key(&op.timestamp, |o| o.timestamp)
{
Ok(mut pos) => {
while pos < operations.len() && operations[pos].timestamp == op.timestamp {
pos += 1;
}
pos
}
Err(pos) => pos, };
operations.insert(insert_pos, op);
let inserted_op = &operations[insert_pos];
match &inserted_op.operation {
Operation::Insert(rid, _) => {
inserted_rows.insert(*rid);
if *rid >= *next_rid {
*next_rid = *rid + 1;
}
}
Operation::Delete(rid) => {
deleted_rows.insert(*rid);
inserted_rows.remove(rid);
}
Operation::Update(rid, _) => {
deleted_rows.remove(rid);
}
}
Ok(())
}
Self::Replaced(_) => Err(Error::CannotModifyReplacedTable),
}
}
pub fn consolidate_operations(&mut self) {
match self {
Self::Sparse {
operations,
deleted_rows,
inserted_rows,
..
} => {
if operations.is_empty() {
return;
}
let mut latest_ops: std::collections::HashMap<u32, usize> =
std::collections::HashMap::new();
for (index, op) in operations.iter().enumerate() {
let rid = op.operation.get_rid();
latest_ops.insert(rid, index);
}
let mut indices_to_remove: Vec<usize> = Vec::new();
for (index, op) in operations.iter().enumerate() {
let rid = op.operation.get_rid();
if latest_ops.get(&rid) != Some(&index) {
indices_to_remove.push(index);
}
}
indices_to_remove.sort_unstable();
for &index in indices_to_remove.iter().rev() {
operations.remove(index);
}
deleted_rows.clear();
inserted_rows.clear();
for op in operations {
match &op.operation {
Operation::Delete(rid) => {
deleted_rows.insert(*rid);
}
Operation::Insert(rid, _) => {
inserted_rows.insert(*rid);
}
Operation::Update(_, _) => {}
}
}
}
Self::Replaced(_) => {
}
}
}
pub fn validate_operation(&self, op: &TableOperation) -> Result<()> {
match &op.operation {
Operation::Insert(rid, _) => {
if *rid == 0 {
return Err(Error::ModificationInvalid(format!(
"RID cannot be zero: {rid}"
)));
}
if self.has_row(*rid) {
return Err(Error::ModificationInvalid(format!(
"Cannot insert row: RID {rid} already exists in table (duplicate insert or existing row)"
)));
}
Ok(())
}
Operation::Update(rid, _) => {
if *rid == 0 {
return Err(Error::ModificationInvalid(format!(
"RID cannot be zero: {rid}"
)));
}
if !self.has_row(*rid) {
return Err(Error::ModificationInvalid(format!(
"RID {rid} not found for update"
)));
}
Ok(())
}
Operation::Delete(rid) => {
if *rid == 0 {
return Err(Error::ModificationInvalid(format!(
"RID cannot be zero: {rid}"
)));
}
if !self.has_row(*rid) {
return Err(Error::ModificationInvalid(format!(
"RID {rid} not found for deletion"
)));
}
Ok(())
}
}
}
pub fn has_row(&self, rid: u32) -> bool {
match self {
Self::Sparse {
deleted_rows,
inserted_rows,
..
} => {
if deleted_rows.contains(&rid) {
return false;
}
if inserted_rows.contains(&rid) {
return true;
}
rid > 0 && rid <= self.original_row_count()
}
Self::Replaced(rows) => {
rid > 0 && (rid as usize) <= rows.len()
}
}
}
fn original_row_count(&self) -> u32 {
match self {
Self::Sparse {
original_row_count, ..
} => *original_row_count,
Self::Replaced(_) => 0, }
}
pub fn register_change_ref(&mut self, rid: u32, change_ref: ChangeRefRc) {
if let Self::Sparse { change_refs, .. } = self {
change_refs.insert(rid, change_ref);
}
}
pub fn get_change_ref(&self, rid: u32) -> Option<&ChangeRefRc> {
match self {
Self::Sparse { change_refs, .. } => change_refs.get(&rid),
Self::Replaced(_) => None,
}
}
pub fn change_refs(&self) -> impl Iterator<Item = (&u32, &ChangeRefRc)> {
match self {
Self::Sparse { change_refs, .. } => {
Box::new(change_refs.iter()) as Box<dyn Iterator<Item = (&u32, &ChangeRefRc)>>
}
Self::Replaced(_) => Box::new(std::iter::empty()),
}
}
pub fn deleted_rids(&self) -> impl Iterator<Item = u32> + '_ {
match self {
Self::Sparse { deleted_rows, .. } => {
Box::new(deleted_rows.iter().copied()) as Box<dyn Iterator<Item = u32>>
}
Self::Replaced(_) => Box::new(std::iter::empty()),
}
}
pub fn next_rid(&self) -> crate::Result<u32> {
match self {
Self::Sparse { next_rid, .. } => Ok(*next_rid),
Self::Replaced(rows) => {
let len = u32::try_from(rows.len()).map_err(|_| {
crate::Error::LayoutFailed(format!(
"Table row count {} exceeds maximum u32 value",
rows.len()
))
})?;
Ok(len + 1)
}
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cilassembly::{Operation, TableOperation};
    use crate::metadata::tables::{ModuleRaw, TableDataOwned};
    use crate::metadata::token::Token;

    /// Builds a `Module` row whose `name` field is `name_idx`; all other fields
    /// are fixed so rows can be told apart by `name` alone.
    fn make_test_row(name_idx: u32) -> TableDataOwned {
        TableDataOwned::Module(ModuleRaw {
            rid: 1,
            token: Token::new(0x00000001),
            offset: 0,
            generation: 0,
            name: name_idx,
            mvid: 1,
            encid: 0,
            encbaseid: 0,
        })
    }

    /// Wraps `op` in a `TableOperation` carrying an explicit timestamp.
    fn make_op_with_timestamp(op: Operation, timestamp: u64) -> TableOperation {
        TableOperation::new_with_timestamp(op, timestamp)
    }

    #[test]
    fn test_table_modifications_creation() {
        let sparse = TableModifications::new_sparse(1);
        assert!(!sparse.has_modifications());
        assert_eq!(sparse.operation_count(), 0);

        let replaced = TableModifications::new_replaced(vec![]);
        assert!(!replaced.has_modifications());
        assert_eq!(replaced.operation_count(), 0);
    }

    #[test]
    fn test_sparse_with_existing_rows() {
        // next_rid = 6 means rows 1..=5 already exist.
        let sparse = TableModifications::new_sparse(6);
        assert!(!sparse.has_modifications());
        assert!(sparse.has_row(1));
        assert!(sparse.has_row(5));
        assert!(!sparse.has_row(6));
        assert!(!sparse.has_row(0));
    }

    #[test]
    fn test_apply_insert_operation() {
        let mut mods = TableModifications::new_sparse(1);
        let op = TableOperation::new(Operation::Insert(1, make_test_row(100)));

        assert!(mods.apply_operation(op).is_ok());
        assert!(mods.has_modifications());
        assert_eq!(mods.operation_count(), 1);
        assert!(mods.has_row(1));
    }

    #[test]
    fn test_apply_update_operation() {
        let mut mods = TableModifications::new_sparse(6);
        let op = TableOperation::new(Operation::Update(3, make_test_row(200)));

        assert!(mods.apply_operation(op).is_ok());
        assert!(mods.has_modifications());
        assert_eq!(mods.operation_count(), 1);
        assert!(mods.has_row(3));
    }

    #[test]
    fn test_apply_delete_operation() {
        let mut mods = TableModifications::new_sparse(6);
        let op = TableOperation::new(Operation::Delete(3));

        assert!(mods.apply_operation(op).is_ok());
        assert!(mods.has_modifications());
        assert_eq!(mods.operation_count(), 1);
        // Only the targeted row disappears; its neighbours are untouched.
        assert!(!mods.has_row(3));
        assert!(mods.has_row(2));
        assert!(mods.has_row(4));
    }

    #[test]
    fn test_validate_operation_rid_zero() {
        let mods = TableModifications::new_sparse(6);

        let insert_op = TableOperation::new(Operation::Insert(0, make_test_row(1)));
        assert!(mods.validate_operation(&insert_op).is_err());

        let update_op = TableOperation::new(Operation::Update(0, make_test_row(1)));
        assert!(mods.validate_operation(&update_op).is_err());

        let delete_op = TableOperation::new(Operation::Delete(0));
        assert!(mods.validate_operation(&delete_op).is_err());
    }

    #[test]
    fn test_validate_insert_duplicate_rid() {
        let mods = TableModifications::new_sparse(6);
        let op = TableOperation::new(Operation::Insert(3, make_test_row(1)));
        assert!(mods.validate_operation(&op).is_err());
    }

    #[test]
    fn test_validate_update_nonexistent_rid() {
        let mods = TableModifications::new_sparse(6);
        let op = TableOperation::new(Operation::Update(10, make_test_row(1)));
        assert!(mods.validate_operation(&op).is_err());
    }

    #[test]
    fn test_validate_delete_nonexistent_rid() {
        let mods = TableModifications::new_sparse(6);
        let op = TableOperation::new(Operation::Delete(10));
        assert!(mods.validate_operation(&op).is_err());
    }

    #[test]
    fn test_consolidate_operations_keeps_latest() {
        let mut mods = TableModifications::new_sparse(1);
        let op1 = make_op_with_timestamp(Operation::Insert(1, make_test_row(100)), 1000);
        let op2 = make_op_with_timestamp(Operation::Update(1, make_test_row(200)), 2000);
        let op3 = make_op_with_timestamp(Operation::Update(1, make_test_row(300)), 3000);
        mods.apply_operation(op1).unwrap();
        mods.apply_operation(op2).unwrap();
        mods.apply_operation(op3).unwrap();
        assert_eq!(mods.operation_count(), 3);

        mods.consolidate_operations();
        assert_eq!(mods.operation_count(), 1);

        // Fail loudly instead of silently skipping the checks on a mismatch.
        if let TableModifications::Sparse { operations, .. } = &mods {
            assert_eq!(operations[0].timestamp, 3000);
            if let Operation::Update(rid, data) = &operations[0].operation {
                assert_eq!(*rid, 1);
                if let TableDataOwned::Module(row) = data {
                    assert_eq!(row.name, 300);
                } else {
                    panic!("Expected Module row data");
                }
            } else {
                panic!("Expected Update operation");
            }
        } else {
            panic!("Expected Sparse modifications");
        }
    }

    #[test]
    fn test_consolidate_operations_multiple_rids() {
        let mut mods = TableModifications::new_sparse(1);
        let op1 = make_op_with_timestamp(Operation::Insert(1, make_test_row(100)), 1000);
        let op2 = make_op_with_timestamp(Operation::Insert(2, make_test_row(200)), 2000);
        let op3 = make_op_with_timestamp(Operation::Update(1, make_test_row(150)), 3000);
        mods.apply_operation(op1).unwrap();
        mods.apply_operation(op2).unwrap();
        mods.apply_operation(op3).unwrap();
        assert_eq!(mods.operation_count(), 3);

        // One surviving operation per distinct RID.
        mods.consolidate_operations();
        assert_eq!(mods.operation_count(), 2);
    }

    #[test]
    fn test_consolidate_updates_deleted_rows() {
        let mut mods = TableModifications::new_sparse(6);
        let op1 = make_op_with_timestamp(Operation::Delete(3), 1000);
        mods.apply_operation(op1).unwrap();
        assert!(!mods.has_row(3));

        let op2 = make_op_with_timestamp(Operation::Update(3, make_test_row(300)), 2000);
        mods.apply_operation(op2).unwrap();
        mods.consolidate_operations();

        if let TableModifications::Sparse { deleted_rows, .. } = &mods {
            assert!(!deleted_rows.contains(&3));
        } else {
            panic!("Expected Sparse modifications");
        }
    }

    #[test]
    fn test_replaced_table_operations() {
        let rows = vec![make_test_row(1), make_test_row(2), make_test_row(3)];
        let mut replaced = TableModifications::new_replaced(rows);

        assert!(replaced.has_modifications());
        assert_eq!(replaced.operation_count(), 3);
        assert!(replaced.has_row(1));
        assert!(replaced.has_row(2));
        assert!(replaced.has_row(3));
        assert!(!replaced.has_row(4));
        assert!(!replaced.has_row(0));

        // A replaced table rejects incremental operations.
        let op = TableOperation::new(Operation::Insert(4, make_test_row(4)));
        assert!(replaced.apply_operation(op).is_err());
    }

    #[test]
    fn test_has_row_after_insert() {
        let mut mods = TableModifications::new_sparse(1);
        assert!(!mods.has_row(1));

        let op = TableOperation::new(Operation::Insert(1, make_test_row(100)));
        mods.apply_operation(op).unwrap();
        assert!(mods.has_row(1));
    }

    #[test]
    fn test_next_rid_updates_on_insert() {
        let mut mods = TableModifications::new_sparse(1);
        let op = TableOperation::new(Operation::Insert(5, make_test_row(100)));
        mods.apply_operation(op).unwrap();

        if let TableModifications::Sparse { next_rid, .. } = &mods {
            assert_eq!(*next_rid, 6);
        } else {
            panic!("Expected Sparse modifications");
        }
    }

    #[test]
    fn test_empty_consolidate() {
        let mut mods = TableModifications::new_sparse(1);
        mods.consolidate_operations();
        assert!(!mods.has_modifications());
    }

    #[test]
    fn test_same_timestamp_fifo_ordering() {
        let mut mods = TableModifications::new_sparse(1);
        let fixed_timestamp = 1000u64;
        let insert_op = TableOperation::new_with_timestamp(
            Operation::Insert(10, make_test_row(100)),
            fixed_timestamp,
        );
        let update_op = TableOperation::new_with_timestamp(
            Operation::Update(10, make_test_row(200)),
            fixed_timestamp,
        );
        mods.apply_operation(insert_op).unwrap();
        mods.apply_operation(update_op).unwrap();

        if let TableModifications::Sparse { operations, .. } = &mods {
            assert_eq!(operations.len(), 2);
            assert!(operations[0].is_insert(), "Insert should be first");
            assert!(operations[1].is_update(), "Update should be second");
        } else {
            panic!("Expected Sparse modifications");
        }
    }

    #[test]
    fn test_same_timestamp_multiple_operations() {
        let mut mods = TableModifications::new_sparse(1);
        let fixed_timestamp = 1000u64;
        let op1 = TableOperation::new_with_timestamp(
            Operation::Insert(10, make_test_row(100)),
            fixed_timestamp,
        );
        let op2 = TableOperation::new_with_timestamp(
            Operation::Insert(11, make_test_row(200)),
            fixed_timestamp,
        );
        let op3 = TableOperation::new_with_timestamp(
            Operation::Insert(12, make_test_row(300)),
            fixed_timestamp,
        );
        mods.apply_operation(op1).unwrap();
        mods.apply_operation(op2).unwrap();
        mods.apply_operation(op3).unwrap();

        if let TableModifications::Sparse { operations, .. } = &mods {
            assert_eq!(operations.len(), 3);
            assert_eq!(
                operations[0].get_rid(),
                10,
                "First operation should be RID 10"
            );
            assert_eq!(
                operations[1].get_rid(),
                11,
                "Second operation should be RID 11"
            );
            assert_eq!(
                operations[2].get_rid(),
                12,
                "Third operation should be RID 12"
            );
        } else {
            panic!("Expected Sparse modifications");
        }
    }
}