use std::io::Write as _;
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use std::collections::HashMap;
use nodedb_raft::InstallSnapshotRequest;
use crate::error::ClusterError;
use crate::install_snapshot::finalize;
use crate::install_snapshot::state::PartialSnapshotState;
use crate::multi_raft::MultiRaft;
pub type PartialSnapshotMap = Mutex<HashMap<u64, PartialSnapshotState>>;
#[derive(Debug)]
pub enum ChunkOutcome {
Pending,
Committed(nodedb_raft::InstallSnapshotResponse),
}
pub async fn handle_chunk(
req: &InstallSnapshotRequest,
partial_map: &PartialSnapshotMap,
data_dir: &Path,
multi_raft: &std::sync::Arc<std::sync::Mutex<MultiRaft>>,
) -> Result<ChunkOutcome, ClusterError> {
let group_id = req.group_id;
let recv_dir = data_dir.join("recv_snapshots");
tokio::task::spawn_blocking({
let recv_dir = recv_dir.clone();
move || std::fs::create_dir_all(&recv_dir)
})
.await
.map_err(|e| ClusterError::PartialSnapshotCorrupt {
group_id,
detail: format!("spawn_blocking join error: {e}"),
})?
.map_err(|e| ClusterError::Storage {
detail: format!("create recv_snapshots dir: {e}"),
})?;
if req.offset == 0 {
let partial_path = partial_path_for(&recv_dir, group_id);
let partial_file = tokio::task::spawn_blocking({
let path = partial_path.clone();
move || {
std::fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(&path)
}
})
.await
.map_err(|e| ClusterError::PartialSnapshotCorrupt {
group_id,
detail: format!("spawn_blocking join error: {e}"),
})?
.map_err(|e| ClusterError::Storage {
detail: format!("open partial file for group {group_id}: {e}"),
})?;
let state = PartialSnapshotState {
group_id,
leader_id: req.leader_id,
term: req.term,
last_included_index: req.last_included_index,
last_included_term: req.last_included_term,
next_expected_offset: 0,
running_crc: 0,
running_crc_initialized: false,
partial_file: Some(partial_file),
partial_path,
};
let mut map = partial_map.lock().unwrap_or_else(|p| p.into_inner());
map.insert(group_id, state);
} else {
let map = partial_map.lock().unwrap_or_else(|p| p.into_inner());
match map.get(&group_id) {
None => {
return Err(ClusterError::SnapshotOffsetRegression {
group_id,
expected: 0,
actual: req.offset,
});
}
Some(state) if state.next_expected_offset != req.offset => {
let expected = state.next_expected_offset;
let actual = req.offset;
drop(map);
return Err(ClusterError::SnapshotOffsetRegression {
group_id,
expected,
actual,
});
}
Some(_) => {}
}
}
let chunk_bytes = req.data.clone();
let written_len = chunk_bytes.len() as u64;
let file = {
let file = {
let mut map = partial_map.lock().unwrap_or_else(|p| p.into_inner());
let state =
map.get_mut(&group_id)
.ok_or_else(|| ClusterError::PartialSnapshotCorrupt {
group_id,
detail: "partial state disappeared during write".into(),
})?;
state
.partial_file
.take()
.ok_or_else(|| ClusterError::PartialSnapshotCorrupt {
group_id,
detail: "partial file already taken".into(),
})?
};
tokio::task::spawn_blocking({
let bytes = chunk_bytes.clone();
move || -> std::io::Result<std::fs::File> {
let mut f = file;
f.write_all(&bytes)?;
f.flush()?;
Ok(f)
}
})
.await
.map_err(|e| ClusterError::PartialSnapshotCorrupt {
group_id,
detail: format!("spawn_blocking join error during write: {e}"),
})?
.map_err(|e| ClusterError::Storage {
detail: format!("write to partial file for group {group_id}: {e}"),
})?
};
{
let mut map = partial_map.lock().unwrap_or_else(|p| p.into_inner());
let state = map
.get_mut(&group_id)
.ok_or_else(|| ClusterError::PartialSnapshotCorrupt {
group_id,
detail: "partial state disappeared after write".into(),
})?;
if written_len > 0 {
if !state.running_crc_initialized {
state.running_crc = crc32c::crc32c(&chunk_bytes);
state.running_crc_initialized = true;
} else {
state.running_crc = crc32c::crc32c_append(state.running_crc, &chunk_bytes);
}
}
state.next_expected_offset += written_len;
state.partial_file = Some(file);
}
if !req.done {
return Ok(ChunkOutcome::Pending);
}
let state = {
let mut map = partial_map.lock().unwrap_or_else(|p| p.into_inner());
map.remove(&group_id)
.ok_or_else(|| ClusterError::PartialSnapshotCorrupt {
group_id,
detail: "partial state disappeared before finalization".into(),
})?
};
let resp = finalize::commit(state, multi_raft).await?;
Ok(ChunkOutcome::Committed(resp))
}
pub fn partial_path_for(recv_dir: &Path, group_id: u64) -> PathBuf {
recv_dir.join(format!("{group_id}.partial"))
}