use super::*;
/// Test fixture: builds node 1 of a three-member cluster (ids 1, 2, 3) that
/// already holds a snapshot of `state_data`.
///
/// Steps performed:
/// 1. Configure a fresh temp directory as the snapshot directory and set the
///    snapshot threshold to 5.
/// 2. Start an election and feed the node one granted vote from peer 2.
///    Callers unwrap `leader_state` afterwards, so this is expected to make
///    the node leader.
/// 3. Propose six commands and mark index 6 as committed and applied.
/// 4. Ask the node to snapshot `state_data` and verify that it really did.
///
/// Returns the node together with the `TempDir` guard; the caller must keep
/// the guard alive for as long as the snapshot files are needed, because the
/// directory is deleted when the guard is dropped.
///
/// # Panics
///
/// Panics if any setup step fails or if no snapshot was created.
fn create_leader_with_snapshot(state_data: Vec<u8>) -> (RaftNode, tempfile::TempDir) {
    let dir = tempfile::TempDir::new().expect("create temp dir");

    let mut config = RaftConfig::new(1, vec![1, 2, 3]);
    config.snapshot_dir = Some(dir.path().to_path_buf());
    config.snapshot_threshold = 5;
    let node = RaftNode::new(config).expect("create node");

    node.start_election();
    // NOTE(review): the argument to `granted` is presumably the term — confirm.
    node.handle_vote_response(2, RequestVoteResponse::granted(1));

    for i in 0..6 {
        node.propose(Command::from_str(&format!("cmd{}", i)))
            .expect("propose");
    }

    // Scope the write guard so the log lock is released before snapshotting.
    {
        let mut log = node.log.write();
        log.set_commit_index(6).expect("set commit");
        log.set_applied_index(6).expect("set applied");
    }

    // `maybe_create_snapshot` reports whether a snapshot was actually taken.
    // Check it here so a silently skipped snapshot fails in the fixture with
    // a clear message instead of surfacing later in an unrelated assertion.
    let created = node
        .maybe_create_snapshot(state_data)
        .expect("create snapshot");
    assert!(
        created,
        "fixture expected a snapshot to be created after applying 6 entries"
    );

    (node, dir)
}
/// A snapshot built from a tiny state payload must reach a lagging peer as a
/// single request: `done` set, offset 0, covering index 6, and with no
/// streamer left registered on the leader afterwards.
#[test]
fn test_prepare_install_snapshot_small_single_shot() -> RaftResult<()> {
    let (leader, _snapshot_dir) = create_leader_with_snapshot(b"small state".to_vec());

    // Rewind peer 2 to index 1 so that it counts as lagging. The write guard
    // is a temporary and is released at the end of this statement.
    leader
        .leader_state
        .write()
        .as_mut()
        .expect("leader state")
        .next_index
        .insert(2, 1);

    let request = leader
        .prepare_install_snapshot(2)?
        .expect("should produce a request for a lagging peer");

    assert!(request.done, "small snapshot must have done=true");
    assert_eq!(
        request.offset, 0,
        "complete transfer must start at offset 0"
    );
    assert_eq!(request.last_included_index, 6);

    let no_streamers_left = leader.snapshot_streamers.read().is_empty();
    assert!(
        no_streamers_left,
        "no streamers should remain after single-shot transfer"
    );

    Ok(())
}
/// A snapshot of a 1024-byte state, with the chunk threshold set to 16 bytes
/// and the chunk size to 64 bytes, must be delivered to a lagging peer over
/// several `prepare_install_snapshot` calls.
///
/// Every produced request is fed to a `SnapshotReceiver`. The test checks
/// that the receiver yields the assembled snapshot exactly on the request
/// flagged `done`, that more than one chunk was needed, that the reassembled
/// snapshot passes its checksum, and that the leader drops its streamer once
/// the final chunk has been handed out.
#[test]
fn test_prepare_install_snapshot_large_streams_chunks() -> RaftResult<()> {
    // Safety net: far more chunks than a ~1 KiB snapshot split into 64-byte
    // pieces can need. Turns a stalled transfer into a failure, not a hang.
    const MAX_CHUNKS: usize = 4096;

    let dir = tempfile::TempDir::new().expect("create temp dir");
    let state_data: Vec<u8> = (0..1024u16).map(|i| (i % 256) as u8).collect();

    let mut config = RaftConfig::new(1, vec![1, 2, 3]);
    config.snapshot_dir = Some(dir.path().to_path_buf());
    config.snapshot_threshold = 5;
    config.snapshot_chunk_threshold_bytes = 16;
    config.snapshot_chunk_size_bytes = 64;
    let node = RaftNode::new(config).expect("create node");

    node.start_election();
    // NOTE(review): the argument to `granted` is presumably the term — confirm.
    node.handle_vote_response(2, RequestVoteResponse::granted(1));

    for i in 0..6 {
        node.propose(Command::from_str(&format!("cmd{}", i)))
            .expect("propose");
    }

    // Scope the write guard so the log lock is released before snapshotting.
    {
        let mut log = node.log.write();
        log.set_commit_index(6).expect("set commit");
        log.set_applied_index(6).expect("set applied");
    }

    // The payload is not needed afterwards, so it is moved rather than cloned.
    let created = node
        .maybe_create_snapshot(state_data)
        .expect("create snapshot");
    assert!(
        created,
        "a snapshot must have been created before streaming can be tested"
    );

    // Rewind peer 2 to index 1 so that it counts as lagging.
    {
        let mut ls = node.leader_state.write();
        let state = ls.as_mut().expect("leader state");
        state.next_index.insert(2, 1);
    }

    // NOTE(review): the arguments are presumably (last included index, term) — confirm.
    let mut receiver = crate::snapshot::SnapshotReceiver::new(6, 1);
    let mut chunk_count = 0usize;

    let assembled_snapshot = loop {
        assert!(
            chunk_count < MAX_CHUNKS,
            "snapshot transfer did not complete within {} chunks",
            MAX_CHUNKS
        );

        let req = node
            .prepare_install_snapshot(2)?
            .expect("should produce a chunk while peer is still lagging");
        chunk_count += 1;

        // Capture the flag first; the request is still borrowed by the receiver below.
        let done = req.done;
        match receiver.receive_chunk(&req)? {
            Some(snap) => {
                assert!(done, "receive_chunk returned Some but done was false");
                break snap;
            }
            None => {
                // This fires when `done` is true yet nothing was assembled.
                assert!(
                    !done,
                    "chunk was marked done but receiver returned None"
                );
            }
        }
    };

    assert!(
        chunk_count > 1,
        "expected multiple chunks, got {}",
        chunk_count
    );
    assert!(
        assembled_snapshot.verify_checksum(),
        "reassembled snapshot checksum must pass"
    );
    assert!(
        node.snapshot_streamers.read().is_empty(),
        "streamer map must be empty after the final chunk is delivered"
    );

    Ok(())
}
/// Once a chunked transfer has started for a peer, moving that peer's
/// `next_index` forward (to 10 here) must make the next
/// `prepare_install_snapshot` call return `Ok(None)` and remove the peer's
/// streamer from the leader.
#[test]
fn test_streamer_cleared_when_follower_catches_up() -> RaftResult<()> {
    let snapshot_dir = tempfile::TempDir::new().expect("create temp dir");

    // 512 bytes: the sequence 0..=255 repeated twice.
    let payload: Vec<u8> = (0u8..=255).cycle().take(512).collect();

    let mut cfg = RaftConfig::new(1, vec![1, 2, 3]);
    cfg.snapshot_threshold = 5;
    cfg.snapshot_chunk_threshold_bytes = 16;
    cfg.snapshot_chunk_size_bytes = 64;
    cfg.snapshot_dir = Some(snapshot_dir.path().to_path_buf());

    let leader = RaftNode::new(cfg).expect("create node");
    leader.start_election();
    leader.handle_vote_response(2, RequestVoteResponse::granted(1));

    (0..6).for_each(|i| {
        leader
            .propose(Command::from_str(&format!("cmd{}", i)))
            .expect("propose");
    });

    // Scope the write guard so the log lock is released before snapshotting.
    {
        let mut log_guard = leader.log.write();
        log_guard.set_commit_index(6).expect("set commit");
        log_guard.set_applied_index(6).expect("set applied");
    }
    leader
        .maybe_create_snapshot(payload)
        .expect("create snapshot");

    // Peer 2 starts out lagging at index 1.
    leader
        .leader_state
        .write()
        .as_mut()
        .expect("leader state")
        .next_index
        .insert(2, 1);

    let first_chunk = leader.prepare_install_snapshot(2)?;
    assert!(first_chunk.is_some(), "first call must produce a chunk");

    let streamer_registered = !leader.snapshot_streamers.read().is_empty();
    assert!(
        streamer_registered,
        "streamer must be registered after the first chunk"
    );

    // Simulate the peer catching up by other means while the transfer is in flight.
    leader
        .leader_state
        .write()
        .as_mut()
        .expect("leader state")
        .next_index
        .insert(2, 10);

    let after_catch_up = leader.prepare_install_snapshot(2)?;
    assert!(
        after_catch_up.is_none(),
        "caught-up peer must receive Ok(None)"
    );

    let streamers_cleared = leader.snapshot_streamers.read().is_empty();
    assert!(
        streamers_cleared,
        "streamer must be removed after the peer catches up"
    );

    Ok(())
}
/// End-to-end snapshot hand-off: a leader that has snapshotted through index
/// 6 prepares an install-snapshot request for node 4, and a freshly created
/// node 4 installs it. Afterwards the response term must equal the leader's
/// term and the follower's log must report snapshot index 6.
#[test]
fn test_snapshot_transfer_to_new_node() -> RaftResult<()> {
    // --- Leader side -------------------------------------------------------
    let snapshot_dir = tempfile::TempDir::new().expect("create temp dir");

    let mut leader_cfg = RaftConfig::new(1, vec![1, 2, 3]);
    leader_cfg.snapshot_threshold = 5;
    leader_cfg.snapshot_dir = Some(snapshot_dir.path().to_path_buf());

    let leader = RaftNode::new(leader_cfg).expect("create leader node");
    leader.start_election();
    leader.handle_vote_response(2, RequestVoteResponse::granted(1));

    (0..6u64).for_each(|i| {
        leader
            .propose(Command::from_str(&format!("SET k{} {}", i, i)))
            .expect("propose must succeed on leader");
    });

    // Scope the write guard so the log lock is released before snapshotting.
    {
        let mut leader_log = leader.log.write();
        leader_log.set_commit_index(6).expect("set commit index");
        leader_log.set_applied_index(6).expect("set applied index");
    }

    let was_snapshotted = leader
        .maybe_create_snapshot(b"cluster-state-v1".to_vec())
        .expect("snapshot creation must succeed");
    assert!(was_snapshotted, "snapshot must have been created");

    let leader_snapshot_index = leader.log.read().snapshot_index();
    assert_eq!(
        leader_snapshot_index, 6,
        "log must be compacted to snap index 6"
    );

    // Register node 4 as a peer positioned at index 1 with no matched entries.
    {
        let mut guard = leader.leader_state.write();
        let peers = guard.as_mut().expect("leader state must exist");
        peers.next_index.insert(4, 1);
        peers.match_index.entry(4).or_insert(0);
    }

    assert!(
        leader.follower_needs_snapshot(4),
        "leader must identify node 4 as needing a snapshot"
    );

    let install_req = leader
        .prepare_install_snapshot(4)
        .expect("prepare_install_snapshot must not error")
        .expect("must produce a request for a lagging peer");
    assert_eq!(
        install_req.last_included_index, 6,
        "snapshot request must cover up to the applied index"
    );
    assert!(
        install_req.done,
        "small snapshot must complete in a single request"
    );

    // --- Follower side -----------------------------------------------------
    let new_node = RaftNode::new(RaftConfig::new(4, vec![1, 2, 3, 4, 5]))
        .expect("create follower node 4");

    let response = new_node
        .handle_install_snapshot(install_req)
        .expect("handle_install_snapshot must succeed");
    assert_eq!(
        response.term,
        leader.current_term(),
        "follower term must match leader term post-install"
    );

    let follower_snapshot_index = new_node.log.read().snapshot_index();
    assert_eq!(
        follower_snapshot_index, 6,
        "follower log snapshot index must be 6 after install"
    );

    Ok(())
}