use nodedb_raft::{InstallSnapshotRequest, transport::RaftTransport};
use crate::error::ClusterError;
use crate::transport::NexarTransport;
/// Inputs for [`send_chunked`], grouped into one value so the call site stays
/// readable.
///
/// Apart from `peer`, `snapshot_bytes` and `chunk_bytes`, every field is copied
/// verbatim into each `InstallSnapshotRequest` that `send_chunked` builds.
pub struct SendChunkedParams<'a> {
    /// Identifier of the peer the requests are sent to.
    pub peer: u64,
    /// Value placed in each request's `group_id` field.
    pub group_id: u64,
    /// Value placed in each request's `term` field.
    pub term: u64,
    /// Value placed in each request's `leader_id` field.
    pub leader_id: u64,
    /// Value placed in each request's `last_included_index` field.
    pub last_included_index: u64,
    /// Value placed in each request's `last_included_term` field.
    pub last_included_term: u64,
    /// The complete snapshot payload; borrowed, and copied chunk by chunk into
    /// the outgoing requests.
    pub snapshot_bytes: &'a [u8],
    /// Upper bound on the payload bytes carried by a single request. A value
    /// of `0` is treated as `1`.
    pub chunk_bytes: u64,
}
/// Sends a snapshot to `params.peer` as one or more `InstallSnapshotRequest`s.
///
/// The payload in `params.snapshot_bytes` is split into consecutive chunks of
/// at most `params.chunk_bytes` bytes (a value of `0` is treated as `1`). Each
/// request carries the chunk's byte `offset` within the snapshot, the total
/// snapshot size, and `done = true` on the final chunk only. An empty snapshot
/// is sent as a single request with no data and `done = true`.
///
/// Requests are sent sequentially; each one is awaited before the next is
/// built.
///
/// # Returns
///
/// The `term` reported in the last response received. If any response reports
/// a term greater than `params.term`, sending stops immediately and that term
/// is returned, so the remaining chunks are not transmitted.
///
/// # Errors
///
/// Returns [`ClusterError::Transport`] as soon as any request fails; chunks
/// after the failing one are not sent.
pub async fn send_chunked(
    transport: &NexarTransport,
    params: SendChunkedParams<'_>,
) -> Result<u64, ClusterError> {
    let SendChunkedParams {
        peer,
        group_id,
        term,
        leader_id,
        last_included_index,
        last_included_term,
        snapshot_bytes,
        chunk_bytes,
    } = params;

    // `usize` -> `u64` never loses bits on targets with pointers of 64 bits or fewer.
    let total_size = snapshot_bytes.len() as u64;

    // Every request shares the same header fields; only the chunk position,
    // payload and completion flag vary.
    let build_request = move |offset: u64, data: Vec<u8>, done: bool| InstallSnapshotRequest {
        term,
        leader_id,
        last_included_index,
        last_included_term,
        offset,
        data,
        done,
        group_id,
        total_size,
    };

    if snapshot_bytes.is_empty() {
        // `chunks()` yields nothing for an empty slice, so the empty snapshot
        // is announced explicitly with a single terminal request.
        let resp = transport
            .install_snapshot(peer, build_request(0, Vec::new(), true))
            .await
            .map_err(|e| ClusterError::Transport {
                detail: format!("install_snapshot peer={peer} group={group_id}: {e}"),
            })?;
        return Ok(resp.term);
    }

    // Clamp *before* narrowing: a plain `as usize` cast wraps on targets where
    // `usize` is narrower than `u64`, and a wrap to 0 would never advance.
    let chunk_size = chunk_bytes.clamp(1, usize::MAX as u64) as usize;

    let mut offset = 0usize;
    let mut last_term = term;
    for chunk in snapshot_bytes.chunks(chunk_size) {
        let end = offset + chunk.len();
        let done = end == snapshot_bytes.len();
        let req = build_request(offset as u64, chunk.to_vec(), done);
        let resp = transport
            .install_snapshot(peer, req)
            .await
            .map_err(|e| ClusterError::Transport {
                detail: format!(
                    "install_snapshot peer={peer} group={group_id} offset={offset}: {e}"
                ),
            })?;
        last_term = resp.term;
        if last_term > term {
            // The peer is already in a newer term, so this sender is stale.
            // Stop streaming and let the caller react to the higher term.
            return Ok(last_term);
        }
        offset = end;
    }
    Ok(last_term)
}