use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use nodedb_raft::{InstallSnapshotRequest, InstallSnapshotResponse};
use crate::error::ClusterError;
use crate::install_snapshot::state::PartialSnapshotState;
use crate::multi_raft::MultiRaft;
pub async fn commit(
state: PartialSnapshotState,
multi_raft: &Arc<Mutex<MultiRaft>>,
) -> Result<InstallSnapshotResponse, ClusterError> {
let group_id = state.group_id;
let partial_path = state.partial_path.clone();
let expected_crc = state.running_crc;
if let Some(file) = state.partial_file {
tokio::task::spawn_blocking(move || -> std::io::Result<()> { file.sync_all() })
.await
.map_err(|e| ClusterError::PartialSnapshotCorrupt {
group_id,
detail: format!("spawn_blocking join error on sync: {e}"),
})?
.map_err(|e| ClusterError::Storage {
detail: format!("sync partial file for group {group_id}: {e}"),
})?;
}
let file_bytes = tokio::task::spawn_blocking({
let path = partial_path.clone();
move || std::fs::read(&path)
})
.await
.map_err(|e| ClusterError::PartialSnapshotCorrupt {
group_id,
detail: format!("spawn_blocking join error on read: {e}"),
})?
.map_err(|e| ClusterError::Storage {
detail: format!("read partial file for group {group_id}: {e}"),
})?;
if !file_bytes.is_empty() {
let computed = crc32c::crc32c(&file_bytes);
if computed != expected_crc {
return Err(ClusterError::SnapshotCrcMismatch {
group_id,
stored: expected_crc,
computed,
});
}
}
let snap_path = snap_path_for(&partial_path);
tokio::task::spawn_blocking({
let from = partial_path.clone();
let to = snap_path.clone();
move || std::fs::rename(&from, &to)
})
.await
.map_err(|e| ClusterError::PartialSnapshotCorrupt {
group_id,
detail: format!("spawn_blocking join error on rename: {e}"),
})?
.map_err(|e| ClusterError::Storage {
detail: format!("rename partial to snap for group {group_id}: {e}"),
})?;
let req = InstallSnapshotRequest {
term: state.term,
leader_id: state.leader_id,
last_included_index: state.last_included_index,
last_included_term: state.last_included_term,
offset: 0,
data: file_bytes,
done: true,
group_id,
total_size: 0,
};
let mut mr = multi_raft.lock().unwrap_or_else(|p| p.into_inner());
let resp = mr.handle_install_snapshot(&req)?;
Ok(resp)
}
fn snap_path_for(partial: &std::path::Path) -> PathBuf {
let parent = partial
.parent()
.unwrap_or_else(|| std::path::Path::new("."));
let stem = partial
.file_stem()
.unwrap_or_else(|| std::ffi::OsStr::new("unknown"));
parent.join(format!("{}.snap", stem.to_string_lossy()))
}