use crate::{Database, Error, Result};
use borsh::{BorshDeserialize, BorshSerialize};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::time::{SystemTime, UNIX_EPOCH};
use tracing::{debug, error, info, instrument, warn};
/// A single replicated operation as shipped between nodes.
///
/// Serialized with borsh for the wire format. `checksum`, when present, is a
/// BLAKE3 digest over the other fields (see `compute_checksum`).
#[derive(Debug, Clone, BorshSerialize, BorshDeserialize)]
pub struct ReplicationLog {
    /// Globally unique id used for de-duplication on the receiving node.
    pub operation_id: String,
    /// Identifier of the node that originated this operation.
    pub source_node_id: String,
    /// Origin wall-clock time in microseconds since the Unix epoch;
    /// drives conflict resolution on the receiver.
    pub timestamp_micros: u64,
    /// The actual mutation to apply.
    pub operation: ReplicationOperation,
    /// Optional integrity checksum; verified only when
    /// `ReplicationConfig::verify_checksums` is set AND a checksum is present.
    pub checksum: Option<[u8; 32]>,
}
/// The kinds of mutations that can be replicated between nodes.
#[derive(Debug, Clone, BorshSerialize, BorshDeserialize)]
pub enum ReplicationOperation {
    /// Write `value` under `key` in `collection`.
    Put {
        collection: String,
        key: Vec<u8>,
        value: Vec<u8>,
    },
    /// Remove `key` from `collection`.
    Delete {
        collection: String,
        key: Vec<u8>,
    },
    /// Apply several puts/deletes to one `collection` in a single write batch.
    Batch {
        collection: String,
        operations: Vec<BatchOp>,
    },
}
/// One entry inside a `ReplicationOperation::Batch`.
#[derive(Debug, Clone, BorshSerialize, BorshDeserialize)]
pub enum BatchOp {
    /// Write `value` under `key`.
    Put { key: Vec<u8>, value: Vec<u8> },
    /// Remove `key`.
    Delete { key: Vec<u8> },
}
/// Per-key bookkeeping stored in a shadow column family
/// (`__ngdb_repl_meta_<collection>`); consulted for conflict resolution and
/// to remember deletions.
#[derive(Debug, Clone, BorshSerialize, BorshDeserialize)]
struct ReplicationMetadata {
    /// Timestamp (µs since epoch) of the last operation that won for this key.
    timestamp_micros: u64,
    /// True when the winning operation was a delete (tombstone).
    is_tombstone: bool,
}
impl ReplicationMetadata {
    /// Metadata for a live (non-deleted) value written at `timestamp_micros`.
    fn new(timestamp_micros: u64) -> Self {
        Self::with_flag(timestamp_micros, false)
    }

    /// Metadata marking a deletion (tombstone) at `timestamp_micros`.
    fn tombstone(timestamp_micros: u64) -> Self {
        Self::with_flag(timestamp_micros, true)
    }

    /// Shared constructor backing both public shapes.
    fn with_flag(timestamp_micros: u64, is_tombstone: bool) -> Self {
        Self {
            timestamp_micros,
            is_tombstone,
        }
    }
}
impl ReplicationLog {
pub fn new(source_node_id: String, operation: ReplicationOperation) -> Self {
Self {
operation_id: generate_operation_id(),
source_node_id,
timestamp_micros: current_timestamp_micros(),
operation,
checksum: None,
}
}
pub fn validate(&self) -> Result<()> {
if self.source_node_id.is_empty() {
return Err(Error::Replication(
"Source node ID cannot be empty".to_string(),
));
}
if self.operation_id.is_empty() {
return Err(Error::Replication(
"Operation ID cannot be empty".to_string(),
));
}
Ok(())
}
pub fn collection(&self) -> &str {
match &self.operation {
ReplicationOperation::Put { collection, .. } => collection,
ReplicationOperation::Delete { collection, .. } => collection,
ReplicationOperation::Batch { collection, .. } => collection,
}
}
}
/// Static configuration for the replication subsystem.
#[derive(Debug, Clone)]
pub struct ReplicationConfig {
    /// Master switch; `apply_replication` refuses to run when false.
    pub enabled: bool,
    /// This node's identifier; required whenever `enabled` is true.
    pub node_id: String,
    /// Known peers. NOTE(review): not consulted anywhere in this file —
    /// presumably used by a transport layer elsewhere; confirm.
    pub peer_nodes: Vec<String>,
    /// Strategy applied when an incoming write collides with local state.
    pub conflict_resolution: ConflictResolution,
    /// When true, logs that carry a checksum are verified before applying.
    pub verify_checksums: bool,
}
/// How concurrent writes to the same key are reconciled.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConflictResolution {
    /// Higher timestamp wins (the existing value wins on ties).
    LastWriteWins,
    /// The value already present locally is always kept.
    FirstWriteWins,
    /// Delegate to registered `ReplicationHook::resolve_conflict` hooks.
    Custom,
}
impl Default for ReplicationConfig {
    /// Replication is off by default; checksum verification defaults to on so
    /// that enabling replication later is safe-by-default.
    fn default() -> Self {
        Self {
            enabled: false,
            node_id: String::new(),
            peer_nodes: Vec::new(),
            conflict_resolution: ConflictResolution::LastWriteWins,
            verify_checksums: true,
        }
    }
}
impl ReplicationConfig {
pub fn new(node_id: impl Into<String>) -> Self {
Self {
node_id: node_id.into(),
..Default::default()
}
}
pub fn enable(mut self) -> Self {
self.enabled = true;
self
}
pub fn with_peers(mut self, peers: Vec<String>) -> Self {
self.peer_nodes = peers;
self
}
pub fn conflict_resolution(mut self, strategy: ConflictResolution) -> Self {
self.conflict_resolution = strategy;
self
}
pub fn validate(&self) -> Result<()> {
if self.enabled && self.node_id.is_empty() {
return Err(Error::Replication(
"Node ID is required when replication is enabled".to_string(),
));
}
Ok(())
}
}
/// Observer/extension points invoked around replication application.
///
/// All methods have no-op defaults so implementors override only what they
/// need. `Send + Sync` is required because the manager may be shared across
/// threads.
pub trait ReplicationHook: Send + Sync {
    /// Called before a log is applied; returning `Err` aborts the apply.
    fn before_apply(&self, _log: &ReplicationLog) -> Result<()> {
        Ok(())
    }
    /// Called after a successful apply; errors here are logged, not fatal.
    fn after_apply(&self, _log: &ReplicationLog) -> Result<()> {
        Ok(())
    }
    /// Called when applying a log fails; purely informational.
    fn on_replication_error(&self, _log: &ReplicationLog, _error: &Error) {
    }
    /// Resolves a write conflict under `ConflictResolution::Custom`.
    /// Default behavior keeps the incoming (`new`) value.
    fn resolve_conflict(&self, _existing: &[u8], new: &[u8]) -> Result<Vec<u8>> {
        Ok(new.to_vec())
    }
}
/// Applies incoming replication logs to the local database, handling
/// de-duplication, conflict resolution, and hook dispatch.
pub struct ReplicationManager {
    /// Configuration captured at construction time.
    config: ReplicationConfig,
    /// Hooks, invoked in registration order.
    hooks: Vec<Arc<dyn ReplicationHook>>,
    /// Handle to the underlying database.
    db: Database,
    /// operation_id -> timestamp of already-applied operations, used to skip
    /// duplicates; pruned of entries older than one hour (see
    /// `apply_replication`).
    applied_operations: Arc<RwLock<HashMap<String, u64>>>,
}
impl ReplicationManager {
    /// Name of the shadow column family holding per-key replication metadata
    /// for `collection`.
    fn meta_cf_name(collection: &str) -> String {
        format!("__ngdb_repl_meta_{}", collection)
    }
    /// Creates the metadata column family for `collection` if missing.
    ///
    /// NOTE(review): the check-then-create is not atomic; two concurrent
    /// callers could both attempt `create_cf` — confirm the rocksdb binding
    /// tolerates that.
    fn ensure_meta_cf(&self, collection: &str) -> Result<()> {
        let meta_cf = Self::meta_cf_name(collection);
        if self.db.inner.db.cf_handle(&meta_cf).is_none() {
            debug!("Auto-creating metadata column family: {}", meta_cf);
            self.db
                .inner
                .db
                .create_cf(&meta_cf, &rocksdb::Options::default())
                .map_err(|e| {
                    Error::Database(format!(
                        "Failed to create metadata column family '{}': {}",
                        meta_cf, e
                    ))
                })?;
        }
        Ok(())
    }
    /// Reads and deserializes the metadata record for `key`, or `None` when
    /// the key has never been written through replication.
    fn get_metadata(&self, collection: &str, key: &[u8]) -> Result<Option<ReplicationMetadata>> {
        let meta_cf_name = Self::meta_cf_name(collection);
        let meta_cf =
            self.db.inner.db.cf_handle(&meta_cf_name).ok_or_else(|| {
                Error::Database(format!("Metadata CF '{}' not found", meta_cf_name))
            })?;
        match self.db.inner.db.get_cf(&meta_cf, key)? {
            Some(bytes) => {
                let metadata = ReplicationMetadata::try_from_slice(&bytes).map_err(|e| {
                    Error::Deserialization(format!("Failed to deserialize metadata: {}", e))
                })?;
                Ok(Some(metadata))
            }
            None => Ok(None),
        }
    }
    /// Serializes `metadata` with borsh and writes it to the shadow CF.
    fn set_metadata(
        &self,
        collection: &str,
        key: &[u8],
        metadata: &ReplicationMetadata,
    ) -> Result<()> {
        let meta_cf_name = Self::meta_cf_name(collection);
        let meta_cf =
            self.db.inner.db.cf_handle(&meta_cf_name).ok_or_else(|| {
                Error::Database(format!("Metadata CF '{}' not found", meta_cf_name))
            })?;
        let bytes = borsh::to_vec(metadata)
            .map_err(|e| Error::Serialization(format!("Failed to serialize metadata: {}", e)))?;
        self.db
            .inner
            .db
            .put_cf(&meta_cf, key, bytes)
            .map_err(|e| Error::Database(format!("Failed to write metadata: {}", e)))?;
        Ok(())
    }
}
impl ReplicationManager {
/// Validates `config` and wires up a manager over `db` with no hooks and an
/// empty de-duplication map.
#[instrument(skip(db, config))]
pub fn new(db: Database, config: ReplicationConfig) -> Result<Self> {
    // Fail fast on invalid configs (e.g. enabled without a node id).
    config.validate()?;
    info!(
        "Creating replication manager for node {} with {} peers",
        config.node_id,
        config.peer_nodes.len()
    );
    let applied_operations = Arc::new(RwLock::new(HashMap::new()));
    Ok(Self {
        db,
        config,
        hooks: Vec::new(),
        applied_operations,
    })
}
/// Appends a hook; hooks are invoked in registration order by
/// `apply_replication` and the conflict-resolution paths.
#[instrument(skip(self, hook))]
pub fn register_hook(&mut self, hook: Arc<dyn ReplicationHook>) {
    info!("Registering replication hook");
    self.hooks.push(hook);
}
/// Applies an incoming replication log to the local database.
///
/// Pipeline: enabled check → log validation → optional checksum verification
/// → duplicate detection → `before_apply` hooks → apply → record in dedup map
/// (with pruning) → `after_apply` hooks.
///
/// Returns `Ok(())` without applying when the operation was already seen with
/// an equal-or-newer timestamp.
#[instrument(skip(self, log), fields(op_id = %log.operation_id, source = %log.source_node_id))]
pub fn apply_replication(&self, log: ReplicationLog) -> Result<()> {
    if !self.config.enabled {
        warn!("Attempted to apply replication but replication is not enabled");
        return Err(Error::Replication("Replication is not enabled".to_string()));
    }
    info!("Applying replication from {}", log.source_node_id);
    log.validate()?;
    // Checksums are only verified when the sender attached one; logs without
    // a checksum pass through even with verify_checksums=true.
    if self.config.verify_checksums && log.checksum.is_some() {
        debug!("Verifying checksum for operation {}", log.operation_id);
        let computed = compute_checksum(&log);
        if log.checksum.as_ref() != Some(&computed) {
            error!(
                "Checksum mismatch for operation {}: expected {:?}, got {:?}",
                log.operation_id, log.checksum, computed
            );
            return Err(Error::Replication(format!(
                "Checksum mismatch for operation {}",
                log.operation_id
            )));
        }
    }
    // Duplicate suppression. The read lock is released at the end of this
    // scope so hooks and the apply itself run without holding it.
    {
        let applied = self
            .applied_operations
            .read()
            .map_err(|e| Error::Replication(format!("Failed to acquire read lock: {}", e)))?;
        if let Some(&existing_ts) = applied.get(&log.operation_id) {
            if existing_ts >= log.timestamp_micros {
                debug!(
                    "Skipping duplicate operation {} (existing: {}, incoming: {})",
                    log.operation_id, existing_ts, log.timestamp_micros
                );
                return Ok(());
            }
            info!(
                "Re-applying operation {} with newer timestamp (existing: {}, incoming: {})",
                log.operation_id, existing_ts, log.timestamp_micros
            );
        }
    }
    // A failing before_apply hook vetoes the whole operation.
    for hook in &self.hooks {
        hook.before_apply(&log)?;
    }
    let result = self.apply_operation(&log, log.timestamp_micros);
    if let Err(ref e) = result {
        error!("Replication application failed: {}", e);
        for hook in &self.hooks {
            hook.on_replication_error(&log, e);
        }
        return result;
    }
    info!("Replication applied successfully: {}", log.operation_id);
    // Record the op for dedup, then prune while holding the write lock.
    {
        let mut applied = self
            .applied_operations
            .write()
            .map_err(|e| Error::Replication(format!("Failed to acquire write lock: {}", e)))?;
        applied.insert(log.operation_id.clone(), log.timestamp_micros);
        // Age-based pruning: drop entries older than one hour.
        // NOTE(review): an op whose own timestamp is already >1h old is pruned
        // immediately, losing dedup protection for it — confirm acceptable.
        const MAX_AGE_MICROS: u64 = 3_600_000_000; let current_time = current_timestamp_micros();
        let cutoff_time = current_time.saturating_sub(MAX_AGE_MICROS);
        let before_count = applied.len();
        applied.retain(|_k, &mut v| v >= cutoff_time);
        let after_count = applied.len();
        if before_count != after_count {
            info!(
                "Cleaned up {} old operation entries (kept {} recent entries)",
                before_count - after_count,
                after_count
            );
        }
        // Size-based safety valve: if age pruning was not enough, keep only
        // the 25000 newest entries.
        if applied.len() > 50000 {
            warn!(
                "Operation tracking map exceeded 50000 entries ({}), performing emergency cleanup",
                applied.len()
            );
            let mut entries: Vec<_> = applied.iter().map(|(k, v)| (k.clone(), *v)).collect();
            entries.sort_by_key(|(_k, v)| *v);
            let remove_count = entries.len().saturating_sub(25000);
            for (key, _) in entries.iter().take(remove_count) {
                applied.remove(key);
            }
            info!(
                "Emergency cleanup completed, now tracking {} entries",
                applied.len()
            );
        }
    }
    // after_apply failures are logged but do not fail the operation.
    for hook in &self.hooks {
        if let Err(e) = hook.after_apply(&log) {
            warn!("Hook after_apply failed: {}", e);
        }
    }
    Ok(())
}
#[instrument(skip(self, log))]
fn apply_operation(&self, log: &ReplicationLog, timestamp: u64) -> Result<()> {
match &log.operation {
ReplicationOperation::Put {
collection,
key,
value,
} => {
debug!("Applying PUT operation to collection '{}'", collection);
self.apply_put(collection, key, value, timestamp)
}
ReplicationOperation::Delete { collection, key } => {
debug!("Applying DELETE operation to collection '{}'", collection);
self.apply_delete(collection, key, timestamp)
}
ReplicationOperation::Batch {
collection,
operations,
} => {
debug!(
"Applying BATCH operation with {} ops to collection '{}'",
operations.len(),
collection
);
self.apply_batch(collection, operations, timestamp)
}
}
}
/// Replicates a single PUT into `collection`, resolving any conflict with the
/// existing value according to the configured strategy, then records the
/// winning timestamp in the shadow metadata CF.
///
/// BUG FIX: previously, `ConflictResolution::Custom` with no registered hooks
/// warned about "defaulting to LastWriteWins" but then accepted the incoming
/// value unconditionally — overwriting newer local data. The fallback now
/// genuinely applies LastWriteWins semantics (existing value wins on ties or
/// when newer).
#[instrument(skip(self, key, value))]
fn apply_put(&self, collection: &str, key: &[u8], value: &[u8], timestamp: u64) -> Result<()> {
    self.ensure_meta_cf(collection)?;
    let cf =
        self.db.inner.db.cf_handle(collection).ok_or_else(|| {
            Error::Database(format!("Column family '{}' not found", collection))
        })?;
    let existing_metadata = self.get_metadata(collection, key)?;
    let existing_data = self
        .db
        .inner
        .db
        .get_cf(&cf, key)
        .map_err(|e| Error::Database(format!("Failed to get existing value: {}", e)))?;
    let final_value = if let Some(existing_metadata) = existing_metadata {
        match self.config.conflict_resolution {
            ConflictResolution::LastWriteWins => {
                if existing_metadata.timestamp_micros >= timestamp {
                    debug!(
                        "Conflict resolved: LastWriteWins - keeping existing value (ts: {} >= {})",
                        existing_metadata.timestamp_micros, timestamp
                    );
                    return Ok(());
                }
                debug!("Conflict resolved: LastWriteWins - accepting new value");
                value.to_vec()
            }
            ConflictResolution::FirstWriteWins => {
                debug!("Conflict resolved: FirstWriteWins - keeping existing value");
                return Ok(());
            }
            ConflictResolution::Custom => {
                if self.hooks.is_empty() {
                    warn!(
                        "Custom conflict resolution set but no hook registered, defaulting to LastWriteWins"
                    );
                    // Actual LastWriteWins fallback: keep the existing value
                    // unless the incoming write is strictly newer.
                    if existing_metadata.timestamp_micros >= timestamp {
                        return Ok(());
                    }
                    value.to_vec()
                } else {
                    // Chain hooks: each hook sees the output of the previous.
                    let existing_value = existing_data.unwrap_or_default();
                    let mut resolved_value = value.to_vec();
                    for hook in &self.hooks {
                        debug!("Applying custom conflict resolution hook");
                        resolved_value =
                            hook.resolve_conflict(&existing_value, &resolved_value)?;
                    }
                    resolved_value
                }
            }
        }
    } else {
        // No prior metadata: first replicated write for this key, accept it.
        value.to_vec()
    };
    self.db
        .inner
        .db
        .put_cf(&cf, key, &final_value)
        .map_err(|e| Error::Database(format!("Failed to put value: {}", e)))?;
    // Record the winner's timestamp so future conflicts compare against it.
    let metadata = ReplicationMetadata::new(timestamp);
    self.set_metadata(collection, key, &metadata)?;
    Ok(())
}
/// Replicates a DELETE into `collection`, leaving a tombstone record in the
/// metadata CF so later out-of-date writes can be rejected.
///
/// NOTE(review): the comparison here is strict (`>`), so a delete with a
/// timestamp equal to the existing value's wins — whereas `apply_put` keeps
/// the existing value on ties (`>=`). Looks like a deliberate delete-wins
/// tie-break; confirm.
#[instrument(skip(self, key))]
fn apply_delete(&self, collection: &str, key: &[u8], timestamp: u64) -> Result<()> {
    self.ensure_meta_cf(collection)?;
    let cf =
        self.db.inner.db.cf_handle(collection).ok_or_else(|| {
            Error::Database(format!("Column family '{}' not found", collection))
        })?;
    if let Some(existing_metadata) = self.get_metadata(collection, key)? {
        if existing_metadata.timestamp_micros > timestamp {
            debug!(
                "Delete rejected: existing value is newer (ts: {} > {})",
                existing_metadata.timestamp_micros, timestamp
            );
            return Ok(());
        }
    }
    self.db
        .inner
        .db
        .delete_cf(&cf, key)
        .map_err(|e| Error::Database(format!("Failed to delete value: {}", e)))?;
    // Tombstone remembers the deletion so stale replicated puts are rejected.
    let tombstone = ReplicationMetadata::tombstone(timestamp);
    self.set_metadata(collection, key, &tombstone)?;
    Ok(())
}
/// Replicates a batch of puts/deletes into `collection` through a single
/// RocksDB `WriteBatch` (data and metadata writes commit atomically).
///
/// All ops in the batch share `batch_timestamp` for conflict resolution.
///
/// NOTE(review): conflict checks read on-disk metadata only; writes buffered
/// earlier in this same batch are not visible to later checks, so duplicate
/// keys within one batch are resolved against pre-batch state. Also, under
/// `Custom` with zero hooks the incoming value is accepted without the
/// LastWriteWins warning that `apply_put` emits — confirm intended.
#[instrument(skip(self, operations))]
fn apply_batch(
    &self,
    collection: &str,
    operations: &[BatchOp],
    batch_timestamp: u64,
) -> Result<()> {
    self.ensure_meta_cf(collection)?;
    let cf =
        self.db.inner.db.cf_handle(collection).ok_or_else(|| {
            Error::Database(format!("Column family '{}' not found", collection))
        })?;
    let meta_cf_name = Self::meta_cf_name(collection);
    let meta_cf =
        self.db.inner.db.cf_handle(&meta_cf_name).ok_or_else(|| {
            Error::Database(format!("Metadata CF '{}' not found", meta_cf_name))
        })?;
    let mut batch = rocksdb::WriteBatch::default();
    for op in operations {
        match op {
            BatchOp::Put { key, value } => {
                // Per-key conflict resolution against existing metadata.
                if let Some(existing_metadata) = self.get_metadata(collection, key)? {
                    match self.config.conflict_resolution {
                        ConflictResolution::LastWriteWins => {
                            if existing_metadata.timestamp_micros >= batch_timestamp {
                                debug!(
                                    "Batch: LastWriteWins - skipping outdated write (existing: {} >= batch: {})",
                                    existing_metadata.timestamp_micros, batch_timestamp
                                );
                                continue;
                            }
                        }
                        ConflictResolution::FirstWriteWins => {
                            debug!("Batch: FirstWriteWins - skipping existing key");
                            continue; }
                        ConflictResolution::Custom => {
                            // Run the hook chain over the current on-disk value.
                            let existing_data = self
                                .db
                                .inner
                                .db
                                .get_cf(&cf, key)
                                .map_err(|e| {
                                    Error::Database(format!(
                                        "Failed to get existing value: {}",
                                        e
                                    ))
                                })?
                                .unwrap_or_default();
                            let mut resolved_value = value.clone();
                            for hook in &self.hooks {
                                resolved_value =
                                    hook.resolve_conflict(&existing_data, &resolved_value)?;
                            }
                            batch.put_cf(&cf, key, &resolved_value);
                            let metadata = ReplicationMetadata::new(batch_timestamp);
                            let meta_bytes = borsh::to_vec(&metadata).map_err(|e| {
                                Error::Serialization(format!(
                                    "Failed to serialize metadata: {}",
                                    e
                                ))
                            })?;
                            batch.put_cf(&meta_cf, key, &meta_bytes);
                            continue;
                        }
                    }
                }
                // No conflict (or LastWriteWins accepted): buffer value + metadata.
                batch.put_cf(&cf, key, value);
                let metadata = ReplicationMetadata::new(batch_timestamp);
                let meta_bytes = borsh::to_vec(&metadata).map_err(|e| {
                    Error::Serialization(format!("Failed to serialize metadata: {}", e))
                })?;
                batch.put_cf(&meta_cf, key, &meta_bytes);
            }
            BatchOp::Delete { key } => {
                // Deletes in a batch are unconditional; a tombstone records them.
                batch.delete_cf(&cf, key);
                let tombstone = ReplicationMetadata::tombstone(batch_timestamp);
                let meta_bytes = borsh::to_vec(&tombstone).map_err(|e| {
                    Error::Serialization(format!("Failed to serialize metadata: {}", e))
                })?;
                batch.put_cf(&meta_cf, key, &meta_bytes);
            }
        }
    }
    // Single atomic commit for all buffered data + metadata writes.
    self.db
        .inner
        .db
        .write(batch)
        .map_err(|e| Error::Database(format!("Failed to write batch: {}", e)))?;
    Ok(())
}
/// Builds a checksummed PUT log entry attributed to this node.
pub fn create_put_log(
    &self,
    collection: impl Into<String>,
    key: Vec<u8>,
    value: Vec<u8>,
) -> ReplicationLog {
    let operation = ReplicationOperation::Put {
        collection: collection.into(),
        key,
        value,
    };
    ReplicationLog::new(self.config.node_id.clone(), operation).with_checksum()
}
/// Builds a checksummed DELETE log entry attributed to this node.
pub fn create_delete_log(&self, collection: impl Into<String>, key: Vec<u8>) -> ReplicationLog {
    let operation = ReplicationOperation::Delete {
        collection: collection.into(),
        key,
    };
    ReplicationLog::new(self.config.node_id.clone(), operation).with_checksum()
}
/// Borrowing accessor for the manager's configuration.
pub fn config(&self) -> &ReplicationConfig {
    &self.config
}
/// Snapshot of the dedup-tracking map: entry count plus the oldest and
/// newest applied-operation timestamps (both `None` when the map is empty).
pub fn stats(&self) -> Result<ReplicationStats> {
    let guard = self
        .applied_operations
        .read()
        .map_err(|e| Error::Replication(format!("Failed to acquire read lock: {}", e)))?;
    let timestamps = guard.values().copied();
    Ok(ReplicationStats {
        total_operations: guard.len(),
        oldest_operation_timestamp: timestamps.clone().min(),
        newest_operation_timestamp: timestamps.max(),
    })
}
/// Empties the dedup-tracking map. After this, previously seen operation ids
/// will be applied again if re-delivered.
pub fn clear_operation_history(&self) -> Result<()> {
    let mut applied = self
        .applied_operations
        .write()
        .map_err(|e| Error::Replication(format!("Failed to acquire write lock: {}", e)))?;
    applied.clear();
    info!("Cleared replication operation history");
    Ok(())
}
}
/// Point-in-time snapshot of the dedup-tracking map, produced by
/// `ReplicationManager::stats`.
#[derive(Debug, Clone)]
pub struct ReplicationStats {
    /// Number of operation ids currently tracked.
    pub total_operations: usize,
    /// Smallest tracked timestamp, `None` when no operations are tracked.
    pub oldest_operation_timestamp: Option<u64>,
    /// Largest tracked timestamp, `None` when no operations are tracked.
    pub newest_operation_timestamp: Option<u64>,
}
/// Produces a unique operation id of the form
/// `{pid:08x}_{timestamp:016x}_{counter:08x}_{mix:08x}`.
///
/// Uniqueness comes from the (process id, timestamp, atomic counter) triple;
/// the final component is a BLAKE3 mix of the other three — deterministic,
/// not random.
fn generate_operation_id() -> String {
    use std::sync::atomic::{AtomicU64, Ordering};
    static COUNTER: AtomicU64 = AtomicU64::new(0);
    let counter = COUNTER.fetch_add(1, Ordering::SeqCst);
    let timestamp = current_timestamp_micros();
    let process_id = std::process::id();
    // Fold the three components through BLAKE3 and keep the first 4 bytes.
    let random_component = {
        let mut hasher = blake3::Hasher::new();
        hasher.update(&timestamp.to_le_bytes());
        hasher.update(&counter.to_le_bytes());
        hasher.update(&process_id.to_le_bytes());
        let digest = hasher.finalize();
        let mut prefix = [0u8; 4];
        prefix.copy_from_slice(&digest.as_bytes()[..4]);
        u32::from_le_bytes(prefix)
    };
    format!(
        "{:08x}_{:016x}_{:08x}_{:08x}",
        process_id, timestamp, counter, random_component
    )
}
/// Microseconds since the Unix epoch, truncated to `u64`.
///
/// # Panics
/// Panics if the system clock reports a time before the epoch, which would
/// indicate a badly misconfigured host.
fn current_timestamp_micros() -> u64 {
    let elapsed = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("System time before Unix epoch");
    elapsed.as_micros() as u64
}
/// Computes a BLAKE3 digest over the log's identifying fields and payload.
/// The `checksum` field itself is excluded so the digest is stable.
///
/// BUG FIX: the batch length was hashed via `usize::to_le_bytes()`, which is
/// 4 bytes on 32-bit targets and 8 on 64-bit ones — mixed-architecture peers
/// would disagree on every Batch checksum. It is now hashed as a fixed-width
/// `u64` (byte-identical output on 64-bit platforms).
///
/// NOTE(review): fields are concatenated without length prefixes, so e.g.
/// PUT(key=[1,2], value=[3]) and PUT(key=[1], value=[2,3]) hash identically;
/// fine for corruption detection, not for collision resistance — confirm
/// that is the intended threat model before strengthening (it would break
/// compatibility with existing checksums).
fn compute_checksum(log: &ReplicationLog) -> [u8; 32] {
    let mut hasher = blake3::Hasher::new();
    hasher.update(log.operation_id.as_bytes());
    hasher.update(log.source_node_id.as_bytes());
    hasher.update(&log.timestamp_micros.to_le_bytes());
    match &log.operation {
        ReplicationOperation::Put {
            collection,
            key,
            value,
        } => {
            hasher.update(b"PUT");
            hasher.update(collection.as_bytes());
            hasher.update(key);
            hasher.update(value);
        }
        ReplicationOperation::Delete { collection, key } => {
            hasher.update(b"DELETE");
            hasher.update(collection.as_bytes());
            hasher.update(key);
        }
        ReplicationOperation::Batch {
            collection,
            operations,
        } => {
            hasher.update(b"BATCH");
            hasher.update(collection.as_bytes());
            // Fixed-width length so the digest is platform-independent.
            hasher.update(&(operations.len() as u64).to_le_bytes());
            for op in operations {
                match op {
                    BatchOp::Put { key, value } => {
                        hasher.update(b"PUT");
                        hasher.update(key);
                        hasher.update(value);
                    }
                    BatchOp::Delete { key } => {
                        hasher.update(b"DELETE");
                        hasher.update(key);
                    }
                }
            }
        }
    }
    *hasher.finalize().as_bytes()
}
impl ReplicationLog {
    /// Consumes the log and returns it with `checksum` populated from
    /// `compute_checksum`. Call once all other fields are final — the digest
    /// covers them.
    pub fn with_checksum(mut self) -> Self {
        let digest = compute_checksum(&self);
        self.checksum = Some(digest);
        self
    }
}
/// Unit tests for log construction, serialization, checksums, and an
/// integration test of LastWriteWins conflict resolution over a temp DB.
#[cfg(test)]
mod tests {
    use super::*;
    // Fresh logs get a non-empty id and a positive timestamp.
    #[test]
    fn test_replication_log_creation() {
        let log = ReplicationLog::new(
            "node-1".to_string(),
            ReplicationOperation::Put {
                collection: "test".to_string(),
                key: vec![1, 2, 3],
                value: vec![4, 5, 6],
            },
        );
        assert_eq!(log.source_node_id, "node-1");
        assert!(!log.operation_id.is_empty());
        assert!(log.timestamp_micros > 0);
    }
    // Builder methods compose and preserve each setting.
    #[test]
    fn test_replication_config() {
        let config = ReplicationConfig::new("node-1")
            .enable()
            .with_peers(vec!["node-2".to_string()]);
        assert!(config.enabled);
        assert_eq!(config.node_id, "node-1");
        assert_eq!(config.peer_nodes.len(), 1);
    }
    // Sanity check that each variant constructs and matches its pattern.
    #[test]
    fn test_operation_types() {
        let put_op = ReplicationOperation::Put {
            collection: "test".to_string(),
            key: vec![1],
            value: vec![2],
        };
        let delete_op = ReplicationOperation::Delete {
            collection: "test".to_string(),
            key: vec![1],
        };
        let batch_op = ReplicationOperation::Batch {
            collection: "test".to_string(),
            operations: vec![
                BatchOp::Put {
                    key: vec![1],
                    value: vec![2],
                },
                BatchOp::Delete { key: vec![3] },
            ],
        };
        assert!(matches!(put_op, ReplicationOperation::Put { .. }));
        assert!(matches!(delete_op, ReplicationOperation::Delete { .. }));
        assert!(matches!(batch_op, ReplicationOperation::Batch { .. }));
    }
    // The checksum must be deterministic for an identical log.
    #[test]
    fn test_compute_checksum() {
        let log = ReplicationLog::new(
            "node-1".to_string(),
            ReplicationOperation::Put {
                collection: "test".to_string(),
                key: vec![1, 2, 3],
                value: vec![4, 5, 6],
            },
        );
        let checksum1 = compute_checksum(&log);
        let checksum2 = compute_checksum(&log);
        assert_eq!(checksum1, checksum2);
        assert_eq!(checksum1.len(), 32);
    }
    // with_checksum populates the optional checksum field.
    #[test]
    fn test_with_checksum() {
        let log = ReplicationLog::new(
            "node-1".to_string(),
            ReplicationOperation::Put {
                collection: "test".to_string(),
                key: vec![1],
                value: vec![2],
            },
        )
        .with_checksum();
        assert!(log.checksum.is_some());
    }
    // Constructors set the tombstone flag as expected.
    #[test]
    fn test_replication_metadata() {
        let metadata = ReplicationMetadata::new(12345);
        assert_eq!(metadata.timestamp_micros, 12345);
        assert!(!metadata.is_tombstone);
        let tombstone = ReplicationMetadata::tombstone(67890);
        assert_eq!(tombstone.timestamp_micros, 67890);
        assert!(tombstone.is_tombstone);
    }
    // Borsh round-trip preserves both metadata variants.
    #[test]
    fn test_metadata_serialization() {
        let metadata = ReplicationMetadata::new(12345);
        let serialized = borsh::to_vec(&metadata).unwrap();
        let deserialized: ReplicationMetadata = borsh::from_slice(&serialized).unwrap();
        assert_eq!(deserialized.timestamp_micros, 12345);
        assert!(!deserialized.is_tombstone);
        let tombstone = ReplicationMetadata::tombstone(67890);
        let serialized = borsh::to_vec(&tombstone).unwrap();
        let deserialized: ReplicationMetadata = borsh::from_slice(&serialized).unwrap();
        assert_eq!(deserialized.timestamp_micros, 67890);
        assert!(deserialized.is_tombstone);
    }
    // Borsh round-trip preserves the whole log, including the checksum.
    #[test]
    fn test_replication_log_serialization() {
        let log = ReplicationLog::new(
            "node-1".to_string(),
            ReplicationOperation::Put {
                collection: "test".to_string(),
                key: vec![1, 2, 3],
                value: vec![4, 5, 6],
            },
        )
        .with_checksum();
        let serialized = borsh::to_vec(&log).unwrap();
        let deserialized: ReplicationLog = borsh::from_slice(&serialized).unwrap();
        assert_eq!(deserialized.operation_id, log.operation_id);
        assert_eq!(deserialized.source_node_id, log.source_node_id);
        assert_eq!(deserialized.timestamp_micros, log.timestamp_micros);
        assert_eq!(deserialized.checksum, log.checksum);
    }
    // Ids are four underscore-separated hex fields.
    #[test]
    fn test_operation_id_format() {
        let op_id = generate_operation_id();
        let parts: Vec<&str> = op_id.split('_').collect();
        assert_eq!(parts.len(), 4, "Operation ID should have 4 parts");
        for part in parts {
            assert!(u64::from_str_radix(part, 16).is_ok() || u32::from_str_radix(part, 16).is_ok());
        }
    }
    // stats() on a fresh manager reports an empty tracking map.
    #[test]
    fn test_stats_no_panic() {
        use crate::DatabaseConfig;
        let temp_dir = tempfile::tempdir().unwrap();
        let db = DatabaseConfig::new(temp_dir.path())
            .create_if_missing(true)
            .add_column_family("test")
            .open()
            .unwrap();
        let config = ReplicationConfig::new("test-node").enable();
        let manager = ReplicationManager::new(db, config).unwrap();
        let stats = manager.stats().unwrap();
        assert_eq!(stats.total_operations, 0);
        assert_eq!(stats.oldest_operation_timestamp, None);
        assert_eq!(stats.newest_operation_timestamp, None);
    }
    // End-to-end LastWriteWins: an older batch must not clobber a newer put,
    // and a newer batch must win.
    #[test]
    fn test_batch_last_write_wins_conflict() {
        use crate::DatabaseConfig;
        let temp_dir = tempfile::tempdir().unwrap();
        let db = DatabaseConfig::new(temp_dir.path())
            .create_if_missing(true)
            .add_column_family("test")
            .open()
            .unwrap();
        let config = ReplicationConfig::new("test-node")
            .enable()
            .conflict_resolution(ConflictResolution::LastWriteWins);
        let manager = ReplicationManager::new(db.clone(), config).unwrap();
        let key = vec![1, 2, 3];
        let newer_value = vec![10, 20, 30];
        let older_value = vec![40, 50, 60];
        // Seed the key at timestamp 1000.
        let newer_log = ReplicationLog {
            operation_id: "op1".to_string(),
            source_node_id: "node1".to_string(),
            timestamp_micros: 1000,
            operation: ReplicationOperation::Put {
                collection: "test".to_string(),
                key: key.clone(),
                value: newer_value.clone(),
            },
            checksum: None,
        };
        manager.apply_replication(newer_log).unwrap();
        let cf = db.inner.db.cf_handle("test").unwrap();
        let stored = db.inner.db.get_cf(&cf, &key).unwrap();
        assert_eq!(stored, Some(newer_value.clone()));
        // A batch stamped 500 (< 1000) must be rejected.
        let older_batch_log = ReplicationLog {
            operation_id: "op2".to_string(),
            source_node_id: "node2".to_string(),
            timestamp_micros: 500, operation: ReplicationOperation::Batch {
                collection: "test".to_string(),
                operations: vec![BatchOp::Put {
                    key: key.clone(),
                    value: older_value.clone(),
                }],
            },
            checksum: None,
        };
        manager.apply_replication(older_batch_log).unwrap();
        let stored = db.inner.db.get_cf(&cf, &key).unwrap();
        assert_eq!(
            stored,
            Some(newer_value),
            "LastWriteWins should reject older batch operation"
        );
        // A batch stamped 2000 (> 1000) must be accepted.
        let even_newer_value = vec![70, 80, 90];
        let newer_batch_log = ReplicationLog {
            operation_id: "op3".to_string(),
            source_node_id: "node3".to_string(),
            timestamp_micros: 2000, operation: ReplicationOperation::Batch {
                collection: "test".to_string(),
                operations: vec![BatchOp::Put {
                    key: key.clone(),
                    value: even_newer_value.clone(),
                }],
            },
            checksum: None,
        };
        manager.apply_replication(newer_batch_log).unwrap();
        let stored = db.inner.db.get_cf(&cf, &key).unwrap();
        assert_eq!(
            stored,
            Some(even_newer_value),
            "LastWriteWins should accept newer batch operation"
        );
    }
}