use super::conflict::{ChangeEntry as SyncChangeEntry, ConflictDetector, ConflictReport as SyncConflictReport, ConflictType};
use super::message::{Operation, RowDelta, RowId};
use super::vector_clock::VectorClock;
use crate::storage::{StorageEngine, Transaction};
use crate::{Error, Result, Tuple};
use chrono::Utc;
use parking_lot::RwLock;
use rocksdb::DB;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::sync::Arc;
use tracing::{debug, info, warn};
use uuid::Uuid;
/// Running counters maintained by a `DeltaApplicator`.
///
/// NOTE(review): this type derives `Serialize`/`Deserialize`; for positional
/// serde formats (e.g. bincode) field order is part of the encoded layout, so
/// append new fields at the end rather than reordering.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct DeltaStats {
    /// Number of changes applied successfully.
    pub applied_count: u64,
    /// Number of failed changes that the conflict detector turned into a conflict report.
    pub conflict_count: u64,
    /// Number of recorded rollbacks (failed batches and explicit `rollback` calls).
    pub rollback_count: u64,
    /// LSN of the most recently applied change; not necessarily the highest LSN applied.
    pub last_applied_lsn: u64,
    /// Count of skipped changes (failed checksum or already-applied LSN).
    /// NOTE(review): verify that the applicator actually increments this counter.
    pub skipped_count: u64,
}
/// Outcome of applying a single `ChangeEntry`.
#[derive(Debug, Clone)]
pub enum ApplyResult {
    /// The change was written and its LSN recorded as applied.
    Applied { lsn: u64 },
    /// The change failed and the conflict detector produced a resolution report.
    Conflict { report: ConflictReport },
    /// The change was not attempted; `reason` says why.
    Skipped { reason: String },
}
/// Summary of a batch applied through `DeltaApplicator::apply_batch`.
#[derive(Debug, Clone)]
pub struct BatchApplyResult {
    /// LSNs that were applied, in the ascending LSN order the batch was processed in.
    pub applied: Vec<u64>,
    /// Conflict reports for changes that failed with a detected conflict.
    pub conflicts: Vec<ConflictReport>,
    /// `(lsn, error message)` for changes that failed with a hard error; the batch
    /// stops at the first such error, so changes after it are not reported.
    pub failed: Vec<(u64, String)>,
    /// Number of changes submitted in the batch.
    pub total: usize,
}
/// Conflict report type re-exported from the `conflict` module for callers of this module.
pub type ConflictReport = SyncConflictReport;
/// A single replicated row change, as consumed by `DeltaApplicator`.
///
/// NOTE(review): this type is serialized; keep field order stable for
/// positional serde formats.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChangeEntry {
    /// Log sequence number; used as the idempotency key when applying.
    pub lsn: u64,
    /// Target table name.
    pub table: String,
    /// Kind of change (insert / update / delete).
    pub operation: Operation,
    /// Row identifier; decoded to a `u64` by the applicator.
    pub row_id: RowId,
    /// Payload; for inserts and updates it is deserialized as a bincode-encoded `Tuple`.
    pub data: Vec<u8>,
    /// Vector clock attached to the change by its producer.
    pub vector_clock: VectorClock,
    /// Low 32 bits of a `DefaultHasher` hash of `data` (see `verify_checksum`).
    pub checksum: u32,
    /// Node that produced the change; reported as the remote node on conflicts.
    pub node_id: Uuid,
}
impl ChangeEntry {
pub fn from_row_delta(delta: RowDelta, lsn: u64, node_id: Uuid) -> Self {
Self {
lsn,
table: delta.table,
operation: delta.operation,
row_id: delta.row_id,
data: delta.data,
vector_clock: delta.vector_clock,
checksum: delta.checksum,
node_id,
}
}
pub fn verify_checksum(&self) -> bool {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
self.data.hash(&mut hasher);
let calculated = hasher.finish() as u32;
calculated == self.checksum
}
}
/// Persistent record of which LSNs have been applied, stored as RocksDB keys
/// under `sync:applied_lsn:` and mirrored in an in-memory set for lookups.
struct AppliedLSNTracker {
    /// Database holding the persisted applied-LSN markers.
    db: Arc<DB>,
    /// In-memory mirror of the persisted LSNs, loaded at construction.
    cache: Arc<RwLock<HashSet<u64>>>,
}
impl AppliedLSNTracker {
fn new(db: Arc<DB>) -> Result<Self> {
let cache = Arc::new(RwLock::new(HashSet::new()));
let tracker = Self {
db: Arc::clone(&db),
cache: Arc::clone(&cache)
};
tracker.load_cache()?;
Ok(tracker)
}
fn load_cache(&self) -> Result<()> {
let prefix = b"sync:applied_lsn:";
let iter = self.db.prefix_iterator(prefix);
let mut cache = self.cache.write();
for item in iter {
let (key, _) = item.map_err(|e| Error::storage(format!("LSN iterator error: {}", e)))?;
if let Ok(key_str) = std::str::from_utf8(&key) {
if let Some(lsn_str) = key_str.strip_prefix("sync:applied_lsn:") {
if let Ok(lsn) = lsn_str.parse::<u64>() {
cache.insert(lsn);
}
}
}
}
debug!("Loaded {} applied LSNs into cache", cache.len());
Ok(())
}
fn mark_applied(&self, lsn: u64) -> Result<()> {
let key = format!("sync:applied_lsn:{}", lsn).into_bytes();
self.db.put(&key, &[1])
.map_err(|e| Error::storage(format!("Failed to mark LSN applied: {}", e)))?;
self.cache.write().insert(lsn);
Ok(())
}
fn is_applied(&self, lsn: u64) -> bool {
self.cache.read().contains(&lsn)
}
}
/// Applies replicated `ChangeEntry` values to local storage, tracking applied
/// LSNs for idempotency and per-row vector clocks for conflict checks.
pub struct DeltaApplicator {
    /// Target storage engine; its RocksDB handle is also accessed directly.
    storage: Arc<StorageEngine>,
    /// Classifies and resolves changes that fail to apply.
    conflict_detector: Arc<ConflictDetector>,
    /// Running counters returned by `get_stats`.
    stats: Arc<RwLock<DeltaStats>>,
    /// Persistent set of LSNs already applied.
    lsn_tracker: Arc<AppliedLSNTracker>,
    /// Identity this applicator uses when bumping vector clocks.
    node_id: Uuid,
    /// In-memory vector clock per `(table, row_id)`; not persisted.
    row_clocks: Arc<
        RwLock<std::collections::HashMap<(String, Vec<u8>), VectorClock>>,
    >,
}
impl DeltaApplicator {
pub fn new(
storage: Arc<StorageEngine>,
conflict_detector: Arc<ConflictDetector>
) -> Result<Self> {
let db = Arc::clone(&storage.db);
let lsn_tracker = Arc::new(AppliedLSNTracker::new(db)?);
Ok(Self {
storage,
conflict_detector,
stats: Arc::new(RwLock::new(DeltaStats::default())),
lsn_tracker,
node_id: Uuid::new_v4(),
row_clocks: Arc::new(RwLock::new(std::collections::HashMap::new())),
})
}
pub fn apply_change(&self, change: &ChangeEntry) -> Result<ApplyResult> {
if !change.verify_checksum() {
return Ok(ApplyResult::Skipped {
reason: "Checksum verification failed".to_string(),
});
}
if self.lsn_tracker.is_applied(change.lsn) {
debug!("Skipping already applied LSN {}", change.lsn);
return Ok(ApplyResult::Skipped {
reason: format!("LSN {} already applied", change.lsn),
});
}
let result = match &change.operation {
Operation::Insert => self.apply_insert(change),
Operation::Update { columns: _ } => self.apply_update(change),
Operation::Delete => self.apply_delete(change),
};
match result {
Ok(()) => {
self.lsn_tracker.mark_applied(change.lsn)?;
let mut stats = self.stats.write();
stats.applied_count += 1;
stats.last_applied_lsn = change.lsn;
Ok(ApplyResult::Applied { lsn: change.lsn })
}
Err(e) => {
if let Some(report) = self.detect_conflict(change, &e) {
let mut stats = self.stats.write();
stats.conflict_count += 1;
Ok(ApplyResult::Conflict { report })
} else {
Err(e)
}
}
}
}
pub fn apply_batch(&self, changes: Vec<ChangeEntry>) -> Result<BatchApplyResult> {
let total = changes.len();
let mut sorted_changes = changes;
sorted_changes.sort_by_key(|c| c.lsn);
let tx = self.storage.begin_transaction()?;
let mut result = BatchApplyResult {
applied: Vec::new(),
conflicts: Vec::new(),
failed: Vec::new(),
total,
};
let mut should_rollback = false;
for change in sorted_changes {
match self.apply_change_in_transaction(&tx, &change) {
Ok(ApplyResult::Applied { lsn }) => {
result.applied.push(lsn);
}
Ok(ApplyResult::Conflict { report }) => {
result.conflicts.push(report);
}
Ok(ApplyResult::Skipped { reason }) => {
debug!("Skipped LSN {}: {}", change.lsn, reason);
}
Err(e) => {
warn!("Critical error applying LSN {}: {}", change.lsn, e);
result.failed.push((change.lsn, e.to_string()));
should_rollback = true;
break; }
}
}
if should_rollback || !result.failed.is_empty() {
tx.rollback()?;
let mut stats = self.stats.write();
stats.rollback_count += 1;
info!("Rolled back batch due to {} failures", result.failed.len());
} else {
tx.commit()?;
info!("Committed batch: {} applied, {} conflicts",
result.applied.len(), result.conflicts.len());
}
Ok(result)
}
fn apply_change_in_transaction(
&self,
_tx: &Transaction,
change: &ChangeEntry
) -> Result<ApplyResult> {
self.apply_change(change)
}
fn apply_insert(&self, change: &ChangeEntry) -> Result<()> {
if self.row_exists(&change.table, &change.row_id)? {
return Err(Error::storage(
format!("INSERT conflict: row already exists in table {}", change.table)
));
}
let tuple: Tuple = bincode::deserialize(&change.data)
.map_err(|e| Error::storage(format!("Failed to deserialize tuple: {}", e)))?;
self.storage.insert_tuple(&change.table, tuple)?;
{
let key = (change.table.clone(), change.row_id.clone());
let mut row_clocks = self.row_clocks.write();
let mut clock = change.vector_clock.clone();
clock.increment(self.node_id);
row_clocks.insert(key, clock);
}
debug!("Applied INSERT to table {} (LSN {})", change.table, change.lsn);
Ok(())
}
fn apply_update(&self, change: &ChangeEntry) -> Result<()> {
if !self.row_exists(&change.table, &change.row_id)? {
return Err(Error::storage(
format!("UPDATE conflict: row does not exist in table {}", change.table)
));
}
let _current = self.get_tuple(&change.table, &change.row_id)?;
let key = (change.table.clone(), change.row_id.clone());
{
let row_clocks = self.row_clocks.read();
if let Some(local_clock) = row_clocks.get(&key) {
if change.vector_clock.happens_before(local_clock) {
debug!(
"Rejecting stale UPDATE to table {} (LSN {}): incoming clock happens-before local",
change.table, change.lsn
);
return Err(Error::storage(
format!("UPDATE conflict: stale update for row in table {}", change.table)
));
}
if change.vector_clock.conflicts_with(local_clock) {
warn!(
"Concurrent modification detected for table {} (LSN {})",
change.table, change.lsn
);
return Err(Error::storage(
format!("UPDATE conflict: concurrent modification in table {}", change.table)
));
}
}
}
let tuple: Tuple = bincode::deserialize(&change.data)
.map_err(|e| Error::storage(format!("Failed to deserialize tuple: {}", e)))?;
let row_id_u64 = self.row_id_to_u64(&change.row_id)?;
self.storage.insert_tuple(&change.table, tuple)?;
{
let mut row_clocks = self.row_clocks.write();
let entry = row_clocks.entry(key).or_insert_with(VectorClock::new);
entry.merge(&change.vector_clock);
entry.increment(self.node_id);
}
debug!("Applied UPDATE to table {} row {} (LSN {})",
change.table, row_id_u64, change.lsn);
Ok(())
}
fn apply_delete(&self, change: &ChangeEntry) -> Result<()> {
let row_id = self.row_id_to_u64(&change.row_id)?;
let key = format!("data:{}:{}", change.table, row_id).into_bytes();
self.storage.db.delete(&key)
.map_err(|e| Error::storage(format!("Failed to delete row: {}", e)))?;
debug!("Applied DELETE to table {} row {} (LSN {})",
change.table, row_id, change.lsn);
Ok(())
}
fn row_exists(&self, table: &str, row_id: &RowId) -> Result<bool> {
let row_id_u64 = self.row_id_to_u64(row_id)?;
let key = format!("data:{}:{}", table, row_id_u64).into_bytes();
self.storage.db.get(&key)
.map(|opt| opt.is_some())
.map_err(|e| Error::storage(format!("Failed to check row existence: {}", e)))
}
fn get_tuple(&self, table: &str, row_id: &RowId) -> Result<Option<Vec<u8>>> {
let row_id_u64 = self.row_id_to_u64(row_id)?;
let key = format!("data:{}:{}", table, row_id_u64).into_bytes();
self.storage.db.get(&key)
.map_err(|e| Error::storage(format!("Failed to get tuple: {}", e)))
}
fn row_id_to_u64(&self, row_id: &RowId) -> Result<u64> {
if row_id.len() == 8 {
let bytes: [u8; 8] = row_id.as_slice().try_into()
.map_err(|_| Error::storage("Invalid row ID length"))?;
Ok(u64::from_be_bytes(bytes))
} else if row_id.len() == 1 {
Ok(u64::from(row_id[0]))
} else {
String::from_utf8(row_id.clone())
.ok()
.and_then(|s| s.parse().ok())
.ok_or_else(|| Error::storage("Failed to parse row ID"))
}
}
fn detect_conflict(&self, change: &ChangeEntry, _error: &Error) -> Option<ConflictReport> {
let local_data = self.get_tuple(&change.table, &change.row_id).ok().flatten();
if let Some(local_bytes) = local_data {
let local_clock = {
let key = (change.table.clone(), change.row_id.clone());
let row_clocks = self.row_clocks.read();
row_clocks.get(&key).cloned().unwrap_or_default()
};
let local_entry = SyncChangeEntry {
data: local_bytes,
timestamp: Utc::now(),
node_id: self.node_id,
vector_clock: local_clock, operation: super::conflict::ChangeOperation::Update,
};
let remote_entry = SyncChangeEntry {
data: change.data.clone(),
timestamp: Utc::now(),
node_id: change.node_id,
vector_clock: change.vector_clock.clone(),
operation: match &change.operation {
Operation::Insert => super::conflict::ChangeOperation::Insert,
Operation::Update { .. } => super::conflict::ChangeOperation::Update,
Operation::Delete => super::conflict::ChangeOperation::Delete,
},
};
if let Some(conflict) = self.conflict_detector.detect(
&change.table,
&change.row_id,
&local_entry,
&remote_entry,
) {
if let Ok(report) = self.conflict_detector.resolve(conflict) {
return Some(report);
}
}
}
None
}
pub fn get_stats(&self) -> DeltaStats {
self.stats.read().clone()
}
pub fn rollback(&self, _transaction_id: u64) -> Result<()> {
let mut stats = self.stats.write();
stats.rollback_count += 1;
Ok(())
}
pub fn reset_stats(&self) {
let mut stats = self.stats.write();
*stats = DeltaStats::default();
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;
    use crate::{Config, Value};
    use uuid::Uuid;

    /// Opens a fresh in-memory storage engine with the default configuration.
    fn create_test_storage() -> Arc<StorageEngine> {
        Arc::new(StorageEngine::open_in_memory(&Config::default()).unwrap())
    }

    /// Builds an applicator over fresh storage with a default conflict detector.
    fn make_applicator() -> DeltaApplicator {
        let detector = Arc::new(ConflictDetector::default());
        DeltaApplicator::new(create_test_storage(), detector).unwrap()
    }

    /// Computes the same checksum that `ChangeEntry::verify_checksum` expects.
    fn checksum_of(bytes: &[u8]) -> u32 {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};
        let mut hasher = DefaultHasher::new();
        bytes.hash(&mut hasher);
        hasher.finish() as u32
    }

    /// Builds a correctly checksummed change for row `[1]` of `test_table`.
    fn create_test_change(lsn: u64, operation: Operation) -> ChangeEntry {
        let payload = Tuple {
            values: vec![Value::Int8(42), Value::String("test".to_string())],
        };
        let data = bincode::serialize(&payload).unwrap();
        ChangeEntry {
            lsn,
            table: "test_table".to_string(),
            operation,
            row_id: vec![1],
            checksum: checksum_of(&data),
            data,
            vector_clock: VectorClock::new(),
            node_id: Uuid::new_v4(),
        }
    }

    #[test]
    fn test_apply_single_change() {
        let applicator = make_applicator();
        let outcome = applicator
            .apply_change(&create_test_change(1, Operation::Insert))
            .unwrap();
        if let ApplyResult::Applied { lsn } = outcome {
            assert_eq!(lsn, 1);
        } else {
            panic!("Expected Applied result");
        }
        let stats = applicator.get_stats();
        assert_eq!(stats.applied_count, 1);
        assert_eq!(stats.last_applied_lsn, 1);
    }

    #[test]
    fn test_idempotency() {
        let applicator = make_applicator();
        let change = create_test_change(1, Operation::Insert);
        applicator.apply_change(&change).unwrap();
        let replay = applicator.apply_change(&change).unwrap();
        if let ApplyResult::Skipped { reason } = replay {
            assert!(reason.contains("already applied"));
        } else {
            panic!("Expected Skipped result");
        }
        // The replay must not count as a second application.
        assert_eq!(applicator.get_stats().applied_count, 1);
    }

    #[test]
    fn test_batch_application() {
        let applicator = make_applicator();
        let batch: Vec<ChangeEntry> = (1..=3u64)
            .map(|lsn| create_test_change(lsn, Operation::Insert))
            .collect();
        let outcome = applicator.apply_batch(batch).unwrap();
        assert_eq!(outcome.applied.len(), 3);
        assert_eq!(outcome.conflicts.len(), 0);
        assert_eq!(outcome.failed.len(), 0);
        assert_eq!(outcome.total, 3);
    }

    #[test]
    fn test_checksum_verification() {
        let applicator = make_applicator();
        let mut corrupted = create_test_change(1, Operation::Insert);
        corrupted.checksum = 0;
        if let ApplyResult::Skipped { reason } = applicator.apply_change(&corrupted).unwrap() {
            assert!(reason.contains("Checksum"));
        } else {
            panic!("Expected Skipped result");
        }
    }

    #[test]
    fn test_lsn_ordering() {
        let applicator = make_applicator();
        let shuffled: Vec<ChangeEntry> = [3u64, 1, 2]
            .iter()
            .map(|&lsn| create_test_change(lsn, Operation::Insert))
            .collect();
        let outcome = applicator.apply_batch(shuffled).unwrap();
        assert_eq!(outcome.applied, vec![1, 2, 3]);
    }
}