use crate::node::Node;
use crate::sync::SyncManager;
use crate::types::{compare_entries, Entry, Metadata};
use std::sync::Arc;
use tokio::time::{sleep, Duration};
#[tokio::test]
async fn test_dynamic_membership() {
    // Start with node 1 knowing peers 2 and 3.
    let store = Node::new(1, vec![2, 3]);

    let initial_peers = store.read().get_peers();
    assert_eq!(initial_peers.len(), 2);
    for id in [2, 3] {
        assert!(initial_peers.contains(&id));
    }

    // The node counts itself among all known nodes.
    let everyone = store.read().get_all_nodes();
    assert_eq!(everyone.len(), 3);
    assert!(everyone.contains(&1));

    // A freshly tracked peer starts with zeroed ack state.
    let state = store.read().get_peer_state(2).unwrap();
    assert_eq!(state.local_ack, 0);
    assert_eq!(state.peer_ack, 0);

    // Adding a brand-new peer succeeds and grows the peer set.
    assert!(store.write().add_peer(4).unwrap());
    let after_add = store.read().get_peers();
    assert_eq!(after_add.len(), 3);
    assert!(after_add.contains(&4));

    // Re-adding an existing peer is a no-op.
    assert!(!store.write().add_peer(4).unwrap());
    assert_eq!(store.read().get_peers().len(), 3);

    // A node may not add itself as a peer.
    assert!(!store.write().add_peer(1).unwrap());
    assert_eq!(store.read().get_peers().len(), 3);

    // Removing a known peer shrinks the set.
    assert!(store.write().remove_peer(3).unwrap());
    let after_remove = store.read().get_peers();
    assert_eq!(after_remove.len(), 2);
    assert!(!after_remove.contains(&3));

    // Removing self or an unknown id reports "nothing removed".
    assert!(!store.write().remove_peer(1).unwrap());
    assert!(!store.write().remove_peer(99).unwrap());

    // Final membership: self plus peers 2 and 4.
    let everyone = store.read().get_all_nodes();
    assert_eq!(everyone.len(), 3);
    for id in [1, 2, 4] {
        assert!(everyone.contains(&id));
    }
}
#[tokio::test]
async fn test_prefix_scan() {
    let store = Node::new(1, vec![]);

    // Seed two "user" records and one "product" record, two fields each.
    let rows: [(&str, &[u8]); 6] = [
        ("user:1001:name", b"Alice"),
        ("user:1001:age", b"25"),
        ("user:1002:name", b"Bob"),
        ("user:1002:age", b"30"),
        ("product:2001:name", b"Laptop"),
        ("product:2001:price", b"1000"),
    ];
    for (key, value) in rows {
        store.write().put(key.to_string(), value.to_vec()).unwrap();
    }

    // A narrow prefix returns just that record's fields.
    let user_1001 = store.read().get_by_prefix("user:1001:");
    assert_eq!(user_1001.len(), 2);
    assert_eq!(
        user_1001.get("user:1001:name").unwrap().value,
        Some(b"Alice".to_vec())
    );

    // Broader prefixes widen the match; unknown prefixes match nothing.
    assert_eq!(store.read().get_by_prefix("user:").len(), 4);
    assert_eq!(store.read().get_by_prefix("product:").len(), 2);
    assert_eq!(store.read().get_by_prefix("order:").len(), 0);
}
#[tokio::test]
async fn test_basic_put_get() {
    let store = Node::new(1, vec![]);

    // put() echoes back the stored entry.
    let stored = store
        .write()
        .put("key1".to_string(), b"value1".to_vec())
        .unwrap();
    assert_eq!(stored.key, "key1");
    assert_eq!(stored.value, Some(b"value1".to_vec()));

    // A subsequent get() observes the same key/value pair.
    match store.read().get("key1") {
        Some(found) => {
            assert_eq!(found.key, "key1");
            assert_eq!(found.value, Some(b"value1".to_vec()));
        }
        None => panic!("key1 should be present after put"),
    }

    // Keys never written are absent.
    assert!(store.read().get("nonexistent").is_none());
}
#[tokio::test]
async fn test_multiple_puts_same_key() {
    let store = Node::new(1, vec![]);
    // Write the same key twice; the later local write must win.
    for value in [b"value1".to_vec(), b"value2".to_vec()] {
        store.write().put("key1".to_string(), value).unwrap();
    }
    let current = store.read().get("key1").unwrap();
    assert_eq!(current.value, Some(b"value2".to_vec()));
}
#[tokio::test]
async fn test_delete() {
    let store = Node::new(1, vec![]);
    store
        .write()
        .put("key1".to_string(), "value1".to_string())
        .unwrap();
    // After a delete, the key is invisible to plain reads.
    store.write().delete("key1".to_string()).unwrap();
    assert!(store.read().get("key1").is_none());
}
#[tokio::test]
async fn test_item_comparison() {
    use std::cmp::Ordering;

    // Two writes to the same key; the second carries a later timestamp.
    let older = Entry::new_put(
        Metadata::new(1, 1, 1000),
        "key1".to_string(),
        b"value1".to_vec(),
    );
    let newer = Entry::new_put(
        Metadata::new(2, 2, 2000),
        "key1".to_string(),
        b"value2".to_vec(),
    );

    // Comparison orders by recency, symmetrically in both directions.
    assert_eq!(compare_entries(&older, &newer), Ordering::Less);
    assert_eq!(compare_entries(&newer, &older), Ordering::Greater);
}
#[tokio::test]
async fn test_sync_between_stores() {
    let source = Arc::new(Node::new(1, vec![2]));
    let target = Arc::new(Node::new(2, vec![1]));

    // Write on node 1, then hand the resulting entry to node 2.
    let entry = source
        .write()
        .put("key1".to_string(), "value1".to_string())
        .unwrap();
    let accepted = target.write().sync(entry.clone()).unwrap();
    assert!(accepted);

    // Node 2 now serves the replicated value.
    let replicated = target.read().get("key1");
    assert!(replicated.is_some());
    assert_eq!(replicated.unwrap().value, Some(b"value1".to_vec()));
}
#[tokio::test]
async fn test_delete_propagation() {
    let source = Arc::new(Node::new(1, vec![2]));
    let target = Arc::new(Node::new(2, vec![1]));

    // Both replicas start with the same key present.
    for replica in [&source, &target] {
        replica
            .write()
            .put("key1".to_string(), "value1".to_string())
            .unwrap();
    }

    // Brief pause so the delete carries a later timestamp than the puts.
    sleep(Duration::from_millis(2)).await;
    source.write().delete("key1".to_string()).unwrap();

    // The delete shows up as a tombstone: an entry with no value.
    let tombstone = source.read().get_including_tombstones("key1").unwrap();
    assert!(tombstone.value.is_none());

    // Syncing the tombstone removes the key on the other replica.
    target.write().sync(tombstone).unwrap();
    assert!(target.read().get("key1").is_none());
}
#[tokio::test]
async fn test_concurrent_writes_resolution() {
    let node_a = Arc::new(Node::new(1, vec![]));
    let node_b = Arc::new(Node::new(2, vec![]));
    sleep(Duration::from_millis(10)).await;

    // node_a writes first ...
    let _earlier = node_a
        .write()
        .put("key1".to_string(), "value1".to_string())
        .unwrap();

    // ... and node_b writes the same key strictly later.
    sleep(Duration::from_millis(10)).await;
    let later = node_b
        .write()
        .put("key1".to_string(), "value2".to_string())
        .unwrap();

    // Last-writer-wins: syncing the later write overrides node_a's value.
    node_a.write().sync(later.clone()).unwrap();
    let winner = node_a.read().get("key1").unwrap();
    assert_eq!(winner.value, Some(b"value2".to_vec()));
}
#[tokio::test]
async fn test_lww_equal_timestamp_tombstone_wins_compare() {
    use std::cmp::Ordering;

    // Identical timestamps; only one of the entries is a tombstone.
    let put_entry = Entry::new_put(Metadata::new(1, 1, 1234), "k".into(), b"v".to_vec());
    let delete_entry = Entry::new_delete(Metadata::new(2, 2, 1234), "k".into());

    // On a timestamp tie the tombstone ranks higher than the put.
    assert_eq!(compare_entries(&put_entry, &delete_entry), Ordering::Less);
    assert_eq!(compare_entries(&delete_entry, &put_entry), Ordering::Greater);
}
#[tokio::test]
async fn test_lww_equal_timestamp_node_id_tie_non_tombstone() {
    use std::cmp::Ordering;

    // Identical timestamps, neither entry is a tombstone.
    let from_node1 = Entry::new_put(Metadata::new(1, 1, 2000), "k".into(), b"a".to_vec());
    let from_node2 = Entry::new_put(Metadata::new(2, 2, 2000), "k".into(), b"b".to_vec());

    // The tie resolves deterministically (lower metadata orders first).
    assert_eq!(compare_entries(&from_node1, &from_node2), Ordering::Less);
    assert_eq!(compare_entries(&from_node2, &from_node1), Ordering::Greater);
}
#[tokio::test]
async fn test_sync_equal_timestamp_tombstone_wins() {
    let store = Node::new(1, vec![]);

    // Apply a put, then a delete carrying the identical timestamp.
    let put_entry = Entry::new_put(Metadata::new(1, 1, 1000), "k".into(), b"v".to_vec());
    store.write().sync(put_entry).unwrap();
    let delete_entry = Entry::new_delete(Metadata::new(2, 2, 1000), "k".into());
    let accepted = store.write().sync(delete_entry).unwrap();

    // The tombstone wins the tie, so the key disappears.
    assert!(accepted);
    assert!(store.read().get("k").is_none());
}
#[tokio::test]
async fn test_wal_recovery_equal_timestamp_tombstone_wins() {
// NOTE(review): despite its name, this test never writes a tombstone or a
// pair of entries with equal timestamps -- it only checks that a single put
// survives a restart from the WAL, largely duplicating test_wal_persistence.
// TODO: rename it, or extend it to cover equal-timestamp tombstone
// resolution across recovery.
use tempfile::TempDir;
let temp_dir = TempDir::new().unwrap();
let wal_path = temp_dir.path();
// Write one key, then drop the store so the on-disk WAL is all that remains.
let store = Node::new_with_persistence(1, vec![], wal_path).unwrap();
store
.write()
.put("key1".to_string(), "value1".to_string())
.unwrap();
drop(store);
// A fresh store over the same WAL directory must recover the value.
let recovered = Node::new_with_persistence(1, vec![], wal_path).unwrap();
let item = recovered.read().get("key1").unwrap();
assert_eq!(item.value, Some(b"value1".to_vec()));
}
#[tokio::test]
async fn test_tombstone_cleanup() {
let store = Node::new(1, vec![]);
store
.write()
.put("key1".to_string(), "value1".to_string())
.unwrap();
// Deleting leaves a tombstone: invisible to get(), but returned by
// get_including_tombstones() with value == None.
store.write().delete("key1".to_string()).unwrap();
assert!(store.read().get_including_tombstones("key1").is_some());
assert!(store
.read()
.get_including_tombstones("key1")
.unwrap()
.value
.is_none());
// Let the tombstone age. NOTE(review): the 2s wait presumably guarantees a
// nonzero tombstone age even at second-granularity timestamps -- confirm
// whether a shorter sleep keeps the zero-grace assertion below reliable
// before shrinking it (it adds 2s to every test run).
sleep(Duration::from_secs(2)).await;
// A generous grace period keeps the still-young tombstone alive...
let removed = store
.write()
.cleanup_expired_tombstones(Duration::from_secs(100))
.unwrap();
assert_eq!(removed, 0);
// ...while a zero grace period reaps it immediately.
let removed = store
.write()
.cleanup_expired_tombstones(Duration::from_secs(0))
.unwrap();
assert_eq!(removed, 1);
}
#[tokio::test]
async fn test_log_exchange() {
    use crate::sync::{ExchangeInterface, SyncMessage, SyncResponse};
    use anyhow::Result;

    /// In-process "network": applies pushed entries directly to a target
    /// store and answers with whatever that store thinks the sender lacks.
    #[derive(Clone)]
    struct TestNetworkHandler {
        target_store: Node,
    }
    impl ExchangeInterface for TestNetworkHandler {
        async fn sync_to(
            &self,
            _node: &Node,
            _peer: u32,
            msg: SyncMessage,
        ) -> Result<SyncResponse> {
            let store = self.target_store.clone();
            store.write().apply_pushed_entries(msg.clone())?;
            // Serve the delta when the sender's ack map is usable; otherwise
            // fall back to a full snapshot of the KV state.
            let (entries, is_snapshot) = match store.read().get_peer_missing_logs(&msg.sender_ack) {
                Some(entries) => (entries, false),
                None => (store.read().kv_to_log_entries(), true),
            };
            let progress = store.read().get_local_ack();
            let peer_id = store.read().id;
            Ok(SyncResponse {
                entries,
                is_snapshot,
                progress,
                peer_id,
            })
        }
    }

    let store1 = Node::new(1, vec![2]);
    let store2 = Node::new(2, vec![1]);
    store1
        .write()
        .put("key1".to_string(), "value1".to_string())
        .unwrap();
    store1
        .write()
        .put("key2".to_string(), b"value2".to_vec())
        .unwrap();
    let network1 = TestNetworkHandler {
        target_store: store2.clone(),
    };
    let sync1 = SyncManager::new(store1.clone(), network1);

    // Ask store1's manager to answer a sync request carrying no entries but an
    // up-to-date ack map.
    let msg = SyncMessage {
        sender_id: store1.read().id,
        sender_uuid: vec![],
        sender_ack: store1.read().get_local_ack(),
        entries: vec![],
    };
    // A single-variant struct needs no `match`
    // (clippy::infallible_destructuring_match) -- destructure it directly.
    let SyncResponse {
        entries,
        is_snapshot,
        progress,
        peer_id: _,
    } = sync1.handle_sync(msg).unwrap();
    assert!(!is_snapshot);
    assert_eq!(entries.len(), 0);
    assert!(progress.contains_key(&2));
}
#[tokio::test]
async fn test_wal_persistence() {
    use tempfile::TempDir;
    let temp_dir = TempDir::new().unwrap();
    let wal_path = temp_dir.path();

    // First lifetime: two puts and one delete, then drop the store so the
    // WAL on disk is the only surviving state.
    {
        let store = Node::new_with_persistence(1, vec![], wal_path).unwrap();
        store
            .write()
            .put("key1".to_string(), "value1".to_string())
            .unwrap();
        store
            .write()
            .put("key2".to_string(), b"value2".to_vec())
            .unwrap();
        store.write().delete("key1".to_string()).unwrap();
    }

    // Second lifetime: recovery must replay the delete as well as the put.
    {
        let reopened = Node::new_with_persistence(1, vec![], wal_path).unwrap();
        assert!(reopened.read().get("key1").is_none());
        let survivor = reopened.read().get("key2");
        assert!(survivor.is_some());
        assert_eq!(survivor.unwrap().value, Some(b"value2".to_vec()));
    }
}
#[tokio::test]
async fn test_origin_seq_preservation() {
    let origin = Arc::new(Node::new(1, vec![2]));
    let replica = Arc::new(Node::new(2, vec![1]));

    // Sequence numbers are assigned by the originating node, in order.
    let first = origin
        .write()
        .put("key1".to_string(), "value1".to_string())
        .unwrap();
    assert_eq!(first.meta.seq, 1);
    let second = origin
        .write()
        .put("key2".to_string(), "value2".to_string())
        .unwrap();
    assert_eq!(second.meta.seq, 2);

    // Replication must carry the original (node, seq) through unchanged.
    replica.write().sync(first.clone()).unwrap();
    replica.write().sync(second.clone()).unwrap();
    for (key, expected_seq) in [("key1", 1), ("key2", 2)] {
        let replicated = replica.read().get_including_tombstones(key).unwrap();
        assert_eq!(replicated.meta.seq, expected_seq);
        assert_eq!(replicated.meta.node, 1);
    }
}
#[tokio::test]
async fn test_wal_recovery_lww() {
    use tempfile::TempDir;
    let temp_dir = TempDir::new().unwrap();
    let wal_path = temp_dir.path();
    {
        // Write the same key twice, pausing so the second write gets a
        // strictly later timestamp, then drop the store.
        let store = Node::new_with_persistence(1, vec![], wal_path).unwrap();
        store
            .write()
            .put("key1".to_string(), "value1".to_string())
            .unwrap();
        sleep(Duration::from_millis(10)).await;
        store
            .write()
            .put("key1".to_string(), "value2".to_string())
            .unwrap();
    }
    // Replaying the WAL must resolve the key last-writer-wins.
    let reopened = Node::new_with_persistence(1, vec![], wal_path).unwrap();
    let winner = reopened.read().get("key1").unwrap();
    assert_eq!(winner.value, Some(b"value2".to_vec()));
}
#[tokio::test]
async fn test_get_missing_log_entries() {
    let store = Node::new(1, vec![2]);

    // Three local writes, producing log sequence numbers 1..=3.
    for (key, value) in [("key1", "value1"), ("key2", "value2"), ("key3", "value3")] {
        store
            .write()
            .put(key.to_string(), value.as_bytes().to_vec())
            .unwrap();
    }

    // A peer that has only acked seq 1 from node 1 is missing the other two.
    let mut peer_progress = std::collections::HashMap::new();
    peer_progress.insert(1, 1_u64);
    let missing = store.read().get_peer_missing_logs(&peer_progress).unwrap();
    assert_eq!(missing.len(), 2);
}
#[tokio::test]
async fn test_kv_to_log_entries() {
    let store = Node::new(1, vec![]);

    // Two live keys plus one tombstone (delete of a never-written key).
    store
        .write()
        .put("key1".to_string(), "value1".to_string())
        .unwrap();
    store
        .write()
        .put("key2".to_string(), "value2".to_string())
        .unwrap();
    store.write().delete("key3".to_string()).unwrap();

    // The KV-to-log conversion emits one entry per key, tombstones included.
    let entries = store.read().kv_to_log_entries();
    assert_eq!(entries.len(), 3);
}
#[tokio::test]
async fn test_sync_config_defaults() {
    use crate::sync::SyncConfig;
    // Defaults: sync every 30 seconds, give up on a peer after 10.
    let SyncConfig { interval, timeout } = SyncConfig::default();
    assert_eq!(interval, Duration::from_secs(30));
    assert_eq!(timeout, Duration::from_secs(10));
}
#[tokio::test]
async fn test_sync_manager_with_config() {
// Verifies that a SyncManager built via with_config() still answers
// handle_sync() correctly under non-default interval/timeout settings.
use crate::sync::{ExchangeInterface, SyncConfig, SyncMessage, SyncResponse};
use anyhow::Result;
// In-process stand-in for the network: applies pushed entries directly to
// `target` and answers from `target`'s log.
#[derive(Clone)]
struct TestNetwork {
target: Node,
}
impl ExchangeInterface for TestNetwork {
async fn sync_to(
&self,
_node: &Node,
_peer: u32,
msg: SyncMessage,
) -> Result<SyncResponse> {
self.target.write().apply_pushed_entries(msg.clone())?;
// Delta when the sender's ack map is usable, full snapshot otherwise.
let (entries, is_snapshot) =
match self.target.read().get_peer_missing_logs(&msg.sender_ack) {
Some(e) => (e, false),
None => (self.target.read().kv_to_log_entries(), true),
};
Ok(SyncResponse {
peer_id: self.target.read().id,
entries,
progress: self.target.read().get_local_ack(),
is_snapshot,
})
}
}
let store1 = Node::new(1, vec![2]);
let store2 = Node::new(2, vec![1]);
// Non-default knobs: fast interval, generous timeout.
let config = SyncConfig {
interval: Duration::from_millis(100),
timeout: Duration::from_secs(5),
};
let network = TestNetwork {
target: store2.clone(),
};
let sync = SyncManager::with_config(store1.clone(), network, config);
store1
.write()
.put("key".to_string(), "value".to_string())
.unwrap();
// A request whose ack map already matches store1's local ack should be
// answered without falling back to a snapshot.
let msg = SyncMessage {
sender_id: 1,
sender_uuid: vec![],
sender_ack: store1.read().get_local_ack(),
entries: vec![],
};
let response = sync.handle_sync(msg).unwrap();
assert!(!response.is_snapshot);
}
#[tokio::test]
async fn test_sync_timeout() {
use crate::sync::{ExchangeInterface, SyncConfig, SyncMessage, SyncResponse};
use anyhow::Result;
use std::sync::Arc;
#[derive(Clone)]
struct SlowNetwork {
delay: Duration,
target: Node,
}
impl ExchangeInterface for SlowNetwork {
async fn sync_to(
&self,
_node: &Node,
_peer: u32,
msg: SyncMessage,
) -> Result<SyncResponse> {
sleep(self.delay).await;
self.target.write().apply_pushed_entries(msg.clone())?;
Ok(SyncResponse {
peer_id: self.target.read().id,
entries: vec![],
progress: self.target.read().get_local_ack(),
is_snapshot: false,
})
}
}
let store1 = Node::new(1, vec![2]);
let store2 = Node::new(2, vec![1]);
let config = SyncConfig {
interval: Duration::from_secs(30),
timeout: Duration::from_millis(50),
};
let network = SlowNetwork {
delay: Duration::from_millis(200),
target: store2.clone(),
};
let sync = Arc::new(SyncManager::with_config(store1.clone(), network, config));
store1
.write()
.put("key".to_string(), "value".to_string())
.unwrap();
let result = sync.bootstrap().await;
assert!(result.is_ok());
}