Skip to main content

nodedb_cluster/install_snapshot/
finalize.rs

// SPDX-License-Identifier: BUSL-1.1

//! Final snapshot commit: CRC validation → atomic rename → Raft log boundary advance.
//!
//! Called only when the last chunk (`done == true`) has been written to the
//! `.partial` file. Performs three operations in sequence:
//!
//! 1. **CRC validation** — re-reads the assembled file and recomputes the
//!    CRC32C. If it disagrees with the running CRC accumulated during chunk
//!    writes, the partial file is left in place and `SnapshotCrcMismatch` is
//!    returned. The partial file is intentionally *not* deleted on CRC failure
//!    so the operator can inspect it.
//!
//! 2. **Atomic rename** — the `.partial` file is renamed to `<group_id>.snap`.
//!    The rename is atomic on POSIX filesystems because source and destination
//!    live in the same directory, and therefore on the same filesystem. If the
//!    process crashes between steps 1 and 2, the partial file survives; the GC
//!    sweeper will remove it after `orphan_partial_max_age_secs`.
//!
//! 3. **Raft log boundary advance** — calls
//!    `MultiRaft::handle_install_snapshot` to advance the Raft log pointer to
//!    `last_included_index` / `last_included_term`. This is the same call the
//!    existing stub in `handle_rpc.rs` made; we now call it only here, after
//!    CRC validation, to prevent advancing Raft state on corrupt data.

25use std::path::PathBuf;
26use std::sync::{Arc, Mutex};
27
28use nodedb_raft::{InstallSnapshotRequest, InstallSnapshotResponse};
29
30use crate::error::ClusterError;
31use crate::install_snapshot::state::PartialSnapshotState;
32use crate::multi_raft::MultiRaft;
33
34/// Validate, rename, and advance Raft state after the last chunk.
35///
36/// Returns the `InstallSnapshotResponse` produced by
37/// `MultiRaft::handle_install_snapshot` so callers can propagate the
38/// Raft term back to the leader.
39pub async fn commit(
40    state: PartialSnapshotState,
41    multi_raft: &Arc<Mutex<MultiRaft>>,
42) -> Result<InstallSnapshotResponse, ClusterError> {
43    let group_id = state.group_id;
44    let partial_path = state.partial_path.clone();
45    let expected_crc = state.running_crc;
46
47    // Flush and close the partial file before reading it back.
48    // `state.partial_file` may be `None` if the snapshot had zero bytes
49    // (bootstrap stub). In that case skip the I/O validation.
50    if let Some(file) = state.partial_file {
51        tokio::task::spawn_blocking(move || -> std::io::Result<()> { file.sync_all() })
52            .await
53            .map_err(|e| ClusterError::PartialSnapshotCorrupt {
54                group_id,
55                detail: format!("spawn_blocking join error on sync: {e}"),
56            })?
57            .map_err(|e| ClusterError::Storage {
58                detail: format!("sync partial file for group {group_id}: {e}"),
59            })?;
60    }
61
62    // CRC validation: re-read the file and compare against running CRC.
63    // If the file is empty (bootstrap stub), skip.
64    let file_bytes = tokio::task::spawn_blocking({
65        let path = partial_path.clone();
66        move || std::fs::read(&path)
67    })
68    .await
69    .map_err(|e| ClusterError::PartialSnapshotCorrupt {
70        group_id,
71        detail: format!("spawn_blocking join error on read: {e}"),
72    })?
73    .map_err(|e| ClusterError::Storage {
74        detail: format!("read partial file for group {group_id}: {e}"),
75    })?;
76
77    if !file_bytes.is_empty() {
78        let computed = crc32c::crc32c(&file_bytes);
79        if computed != expected_crc {
80            return Err(ClusterError::SnapshotCrcMismatch {
81                group_id,
82                stored: expected_crc,
83                computed,
84            });
85        }
86    }
87
88    // Atomic rename: .partial → .snap
89    let snap_path = snap_path_for(&partial_path);
90    tokio::task::spawn_blocking({
91        let from = partial_path.clone();
92        let to = snap_path.clone();
93        move || std::fs::rename(&from, &to)
94    })
95    .await
96    .map_err(|e| ClusterError::PartialSnapshotCorrupt {
97        group_id,
98        detail: format!("spawn_blocking join error on rename: {e}"),
99    })?
100    .map_err(|e| ClusterError::Storage {
101        detail: format!("rename partial to snap for group {group_id}: {e}"),
102    })?;
103
104    // Advance Raft log boundary. Build a minimal InstallSnapshotRequest
105    // that satisfies `handle_install_snapshot` — `data` is the assembled
106    // bytes (may be empty for the bootstrap stub), `done` is always `true`.
107    let req = InstallSnapshotRequest {
108        term: state.term,
109        leader_id: state.leader_id,
110        last_included_index: state.last_included_index,
111        last_included_term: state.last_included_term,
112        offset: 0,
113        data: file_bytes,
114        done: true,
115        group_id,
116        total_size: 0,
117    };
118
119    let mut mr = multi_raft.lock().unwrap_or_else(|p| p.into_inner());
120    let resp = mr.handle_install_snapshot(&req)?;
121    Ok(resp)
122}
123
/// Derive the `.snap` path from the `.partial` path: same directory, same
/// file stem, `.snap` extension (e.g. `dir/7.partial` → `dir/7.snap`).
///
/// The new file name is built as an `OsString`, so non-UTF-8 stems are
/// preserved byte-for-byte instead of being lossily rewritten with U+FFFD
/// (which would rename to a different file than intended). A path with no
/// parent falls back to `.`; a path with no file stem falls back to
/// `unknown`.
fn snap_path_for(partial: &std::path::Path) -> PathBuf {
    let parent = partial
        .parent()
        .unwrap_or_else(|| std::path::Path::new("."));
    let mut file_name = partial
        .file_stem()
        .unwrap_or_else(|| std::ffi::OsStr::new("unknown"))
        .to_os_string();
    file_name.push(".snap");
    parent.join(file_name)
}