nodedb_cluster/install_snapshot/
finalize.rs1use std::path::PathBuf;
26use std::sync::{Arc, Mutex};
27
28use nodedb_raft::{InstallSnapshotRequest, InstallSnapshotResponse};
29
30use crate::error::ClusterError;
31use crate::install_snapshot::state::PartialSnapshotState;
32use crate::multi_raft::MultiRaft;
33
34pub async fn commit(
40 state: PartialSnapshotState,
41 multi_raft: &Arc<Mutex<MultiRaft>>,
42) -> Result<InstallSnapshotResponse, ClusterError> {
43 let group_id = state.group_id;
44 let partial_path = state.partial_path.clone();
45 let expected_crc = state.running_crc;
46
47 if let Some(file) = state.partial_file {
51 tokio::task::spawn_blocking(move || -> std::io::Result<()> { file.sync_all() })
52 .await
53 .map_err(|e| ClusterError::PartialSnapshotCorrupt {
54 group_id,
55 detail: format!("spawn_blocking join error on sync: {e}"),
56 })?
57 .map_err(|e| ClusterError::Storage {
58 detail: format!("sync partial file for group {group_id}: {e}"),
59 })?;
60 }
61
62 let file_bytes = tokio::task::spawn_blocking({
65 let path = partial_path.clone();
66 move || std::fs::read(&path)
67 })
68 .await
69 .map_err(|e| ClusterError::PartialSnapshotCorrupt {
70 group_id,
71 detail: format!("spawn_blocking join error on read: {e}"),
72 })?
73 .map_err(|e| ClusterError::Storage {
74 detail: format!("read partial file for group {group_id}: {e}"),
75 })?;
76
77 if !file_bytes.is_empty() {
78 let computed = crc32c::crc32c(&file_bytes);
79 if computed != expected_crc {
80 return Err(ClusterError::SnapshotCrcMismatch {
81 group_id,
82 stored: expected_crc,
83 computed,
84 });
85 }
86 }
87
88 let snap_path = snap_path_for(&partial_path);
90 tokio::task::spawn_blocking({
91 let from = partial_path.clone();
92 let to = snap_path.clone();
93 move || std::fs::rename(&from, &to)
94 })
95 .await
96 .map_err(|e| ClusterError::PartialSnapshotCorrupt {
97 group_id,
98 detail: format!("spawn_blocking join error on rename: {e}"),
99 })?
100 .map_err(|e| ClusterError::Storage {
101 detail: format!("rename partial to snap for group {group_id}: {e}"),
102 })?;
103
104 let req = InstallSnapshotRequest {
108 term: state.term,
109 leader_id: state.leader_id,
110 last_included_index: state.last_included_index,
111 last_included_term: state.last_included_term,
112 offset: 0,
113 data: file_bytes,
114 done: true,
115 group_id,
116 total_size: 0,
117 };
118
119 let mut mr = multi_raft.lock().unwrap_or_else(|p| p.into_inner());
120 let resp = mr.handle_install_snapshot(&req)?;
121 Ok(resp)
122}
123
/// Derives the final snapshot path from the path of a partial snapshot file.
///
/// The result lives in the same directory as `partial` and is named
/// `<stem>.snap`, where `<stem>` is `partial`'s file name with its last
/// extension removed (`group_7.partial` becomes `group_7.snap`).
///
/// Fallbacks:
/// * if `partial` has no parent (e.g. it is a filesystem root), `.` is used
///   as the directory;
/// * if `partial` has no file name, the stem `unknown` is used.
///
/// The file name is assembled as an `OsString`, so a stem that is not valid
/// UTF-8 keeps its exact bytes instead of being rewritten with U+FFFD
/// replacement characters.
fn snap_path_for(partial: &std::path::Path) -> PathBuf {
    let parent = partial
        .parent()
        .unwrap_or_else(|| std::path::Path::new("."));
    let stem = partial
        .file_stem()
        .unwrap_or_else(|| std::ffi::OsStr::new("unknown"));

    // Append the extension at the OS-string level; a lossy round-trip
    // through `String` would change the name of non-UTF-8 files.
    let mut file_name = stem.to_os_string();
    file_name.push(".snap");
    parent.join(file_name)
}