Skip to main content

maw/merge/
commit.rs

1#![allow(clippy::missing_errors_doc)]
2
3use std::fs::{self, File};
4use std::io::Write;
5use std::path::{Path, PathBuf};
6use std::time::{SystemTime, UNIX_EPOCH};
7
8use serde::{Deserialize, Serialize};
9
10use crate::model::types::GitOid;
11use crate::refs::{self, RefError};
12
/// Location of the COMMIT-phase state file, relative to the repository root.
///
/// Deliberately different from `.manifold/merge-state.json`, which belongs to
/// the main merge state machine (`merge_state.rs`).
const MERGE_STATE_REL_PATH: &str = ".manifold/commit-state.json";
18
/// Outcome of a successful COMMIT phase run.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CommitResult {
    /// The epoch ref and the branch ref both now point at the candidate commit.
    Committed,
}
25
/// What crash recovery found (and did) for a possibly half-finished COMMIT phase.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CommitRecovery {
    /// Epoch ref and branch ref were both already at the candidate commit.
    AlreadyCommitted,
    /// Epoch ref was at the candidate; recovery moved the branch ref to match.
    FinalizedMainRef,
    /// Neither ref had moved away from the pre-commit epoch.
    NotCommitted,
}
36
37/// COMMIT phase and merge-state errors.
38#[derive(Debug)]
39pub enum CommitError {
40    Ref(RefError),
41    Io(std::io::Error),
42    Serde(serde_json::Error),
43    /// Epoch was advanced but branch ref update failed.
44    /// This is recoverable by calling [`recover_partial_commit`].
45    PartialCommit,
46    /// Ref state does not match any expected crash-recovery shape.
47    InconsistentRefState {
48        epoch: Option<GitOid>,
49        branch: Option<GitOid>,
50    },
51    /// Injected failpoint fired during commit phase.
52    #[cfg(feature = "failpoints")]
53    Failpoint(String),
54}
55
56impl std::fmt::Display for CommitError {
57    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58        match self {
59            Self::Ref(e) => write!(f, "ref update failed: {e}"),
60            Self::Io(e) => write!(f, "I/O error: {e}"),
61            Self::Serde(e) => write!(f, "merge-state JSON error: {e}"),
62            Self::PartialCommit => write!(
63                f,
64                "commit phase partially applied: epoch ref moved but branch ref did not"
65            ),
66            Self::InconsistentRefState { epoch, branch } => write!(
67                f,
68                "inconsistent ref state during commit recovery (epoch={epoch:?}, branch={branch:?})"
69            ),
70            #[cfg(feature = "failpoints")]
71            Self::Failpoint(msg) => write!(f, "failpoint: {msg}"),
72        }
73    }
74}
75
76impl std::error::Error for CommitError {
77    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
78        match self {
79            Self::Ref(e) => Some(e),
80            Self::Io(e) => Some(e),
81            Self::Serde(e) => Some(e),
82            Self::PartialCommit | Self::InconsistentRefState { .. } => None,
83            #[cfg(feature = "failpoints")]
84            Self::Failpoint(_) => None,
85        }
86    }
87}
88
89impl From<RefError> for CommitError {
90    fn from(value: RefError) -> Self {
91        Self::Ref(value)
92    }
93}
94
95impl From<std::io::Error> for CommitError {
96    fn from(value: std::io::Error) -> Self {
97        Self::Io(value)
98    }
99}
100
101impl From<serde_json::Error> for CommitError {
102    fn from(value: serde_json::Error) -> Self {
103        Self::Serde(value)
104    }
105}
106
107/// Persisted merge-state for COMMIT phase recovery.
108#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
109pub struct CommitStateFile {
110    pub phase: CommitPhase,
111    pub epoch_before: GitOid,
112    pub epoch_candidate: GitOid,
113    pub epoch_ref_updated: bool,
114    pub branch_ref_updated: bool,
115    pub updated_at_unix_ms: u128,
116}
117
118#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
119#[serde(rename_all = "snake_case")]
120pub enum CommitPhase {
121    Commit,
122    Committed,
123}
124
125/// Run COMMIT phase:
126/// 1. CAS move `refs/manifold/epoch/current` old->new
127/// 2. CAS move `refs/heads/<branch>` old->new
128/// 3. Persist merge-state at each transition (atomic write + fsync)
129pub fn run_commit_phase(
130    root: &Path,
131    branch: &str,
132    epoch_before: &GitOid,
133    epoch_candidate: &GitOid,
134) -> Result<CommitResult, CommitError> {
135    let mut state = CommitStateFile {
136        phase: CommitPhase::Commit,
137        epoch_before: epoch_before.clone(),
138        epoch_candidate: epoch_candidate.clone(),
139        epoch_ref_updated: false,
140        branch_ref_updated: false,
141        updated_at_unix_ms: now_unix_ms(),
142    };
143
144    write_merge_state(root, &state)?;
145
146    // FP: crash before the atomic CAS that moves epoch + branch refs.
147    fp_commit("FP_COMMIT_BEFORE_BRANCH_CAS")?;
148
149    let branch_ref = format!("refs/heads/{branch}");
150    refs::update_refs_atomic(
151        root,
152        &[
153            (refs::EPOCH_CURRENT, epoch_before, epoch_candidate),
154            (&branch_ref, epoch_before, epoch_candidate),
155        ],
156    )?;
157
158    // FP: crash after epoch ref moved — HIGHEST risk point: refs advanced
159    // but commit-state.json still says "Commit".
160    fp_commit("FP_COMMIT_BETWEEN_CAS_OPS")?;
161
162    // FP: crash after CAS, before final state persistence.
163    fp_commit("FP_COMMIT_AFTER_EPOCH_CAS")?;
164
165    state.phase = CommitPhase::Committed;
166    state.epoch_ref_updated = true;
167    state.branch_ref_updated = true;
168    state.updated_at_unix_ms = now_unix_ms();
169    write_merge_state(root, &state)?;
170
171    Ok(CommitResult::Committed)
172}
173
174/// Crash-recover COMMIT phase by inspecting refs and finalizing if safe.
175pub fn recover_partial_commit(
176    root: &Path,
177    branch: &str,
178    epoch_before: &GitOid,
179    epoch_candidate: &GitOid,
180) -> Result<CommitRecovery, CommitError> {
181    let branch_ref = format!("refs/heads/{branch}");
182    let epoch = refs::read_ref(root, refs::EPOCH_CURRENT)?;
183    let branch_head = refs::read_ref(root, &branch_ref)?;
184
185    if epoch.as_ref() == Some(epoch_candidate) && branch_head.as_ref() == Some(epoch_candidate) {
186        return Ok(CommitRecovery::AlreadyCommitted);
187    }
188
189    if epoch.as_ref() == Some(epoch_candidate) && branch_head.as_ref() == Some(epoch_before) {
190        refs::write_ref_cas(root, &branch_ref, epoch_before, epoch_candidate)?;
191
192        let state = CommitStateFile {
193            phase: CommitPhase::Committed,
194            epoch_before: epoch_before.clone(),
195            epoch_candidate: epoch_candidate.clone(),
196            epoch_ref_updated: true,
197            branch_ref_updated: true,
198            updated_at_unix_ms: now_unix_ms(),
199        };
200        write_merge_state(root, &state)?;
201
202        return Ok(CommitRecovery::FinalizedMainRef);
203    }
204
205    if epoch.as_ref() == Some(epoch_before) && branch_head.as_ref() == Some(epoch_before) {
206        return Ok(CommitRecovery::NotCommitted);
207    }
208
209    Err(CommitError::InconsistentRefState {
210        epoch,
211        branch: branch_head,
212    })
213}
214
215pub fn read_merge_state(root: &Path) -> Result<CommitStateFile, CommitError> {
216    let path = merge_state_path(root);
217    let bytes = fs::read(path)?;
218    Ok(serde_json::from_slice(&bytes)?)
219}
220
221fn merge_state_path(root: &Path) -> PathBuf {
222    root.join(MERGE_STATE_REL_PATH)
223}
224
225fn write_merge_state(root: &Path, state: &CommitStateFile) -> Result<(), CommitError> {
226    let path = merge_state_path(root);
227    if let Some(parent) = path.parent() {
228        fs::create_dir_all(parent)?;
229    }
230
231    let tmp = path.with_extension("tmp");
232    let data = serde_json::to_vec_pretty(state)?;
233
234    let mut file = File::create(&tmp)?;
235    file.write_all(&data)?;
236    file.write_all(b"\n")?;
237    file.sync_all()?;
238
239    fs::rename(&tmp, &path)?;
240
241    if let Some(parent) = path.parent() {
242        // Fsync parent directory so the rename is durable across power loss.
243        let dir = File::open(parent)?;
244        dir.sync_all()?;
245    }
246
247    Ok(())
248}
249
250/// Invoke a failpoint and convert the result to [`CommitError`].
251///
252/// Without the `failpoints` feature this compiles to a no-op.
253fn fp_commit(_name: &str) -> Result<(), CommitError> {
254    #[cfg(feature = "failpoints")]
255    {
256        crate::fp!(_name).map_err(|e| CommitError::Failpoint(e.to_string()))?;
257    }
258    Ok(())
259}
260
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reads earlier than the epoch.
fn now_unix_ms() -> u128 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_millis())
}
267
#[cfg(test)]
mod tests {
    use std::process::Command;

    use tempfile::TempDir;

    use super::*;

    /// Run `git <args>` inside `root`, panicking with git's stderr on failure.
    fn run_git(root: &Path, args: &[&str]) {
        let out = Command::new("git")
            .args(args)
            .current_dir(root)
            .output()
            .unwrap();
        assert!(
            out.status.success(),
            "git {} failed: {}",
            args.join(" "),
            String::from_utf8_lossy(&out.stderr)
        );
    }

    /// Build a throwaway repo with two commits on `main`.
    ///
    /// Returns `(dir, old, new)`, where `old` is the first commit and `new` is
    /// the second (the candidate). `refs/heads/main` and the epoch ref are both
    /// reset to `old`, so a COMMIT phase can advance them to `new`.
    fn setup_repo_with_main() -> (TempDir, GitOid, GitOid) {
        let dir = TempDir::new().unwrap();
        let root = dir.path();

        run_git(root, &["init"]);
        run_git(root, &["config", "user.name", "Test"]);
        run_git(root, &["config", "user.email", "test@test.com"]);
        run_git(root, &["config", "commit.gpgsign", "false"]);

        fs::write(root.join("README.md"), "hello\n").unwrap();
        run_git(root, &["add", "."]);
        run_git(root, &["commit", "-m", "initial"]);
        run_git(root, &["branch", "-M", "main"]);

        let old = git_oid(root, "HEAD");

        fs::write(root.join("README.md"), "hello world\n").unwrap();
        run_git(root, &["add", "."]);
        run_git(root, &["commit", "-m", "candidate"]);

        let new = git_oid(root, "HEAD");

        // Reset branch and epoch ref to old so COMMIT phase can advance both.
        run_git(root, &["update-ref", "refs/heads/main", old.as_str()]);
        run_git(root, &["update-ref", refs::EPOCH_CURRENT, old.as_str()]);

        (dir, old, new)
    }

    /// Resolve `rev` to an object id with `git rev-parse`.
    fn git_oid(root: &Path, rev: &str) -> GitOid {
        let out = Command::new("git")
            .args(["rev-parse", rev])
            .current_dir(root)
            .output()
            .unwrap();
        assert!(
            out.status.success(),
            "rev-parse {rev} failed: {}",
            String::from_utf8_lossy(&out.stderr)
        );
        GitOid::new(String::from_utf8_lossy(&out.stdout).trim()).unwrap()
    }

    /// Assert that `git fsck --connectivity-only` still passes.
    fn assert_repo_usable(root: &Path) {
        run_git(root, &["fsck", "--no-progress", "--connectivity-only"]);
    }

    /// Assert that `oid` names an existing commit object.
    fn assert_commit_exists(root: &Path, oid: &GitOid) {
        run_git(
            root,
            &["cat-file", "-e", &format!("{}^{{commit}}", oid.as_str())],
        );
    }

    #[test]
    fn commit_phase_updates_epoch_and_main() {
        let (dir, old, new) = setup_repo_with_main();
        let root = dir.path();

        let result = run_commit_phase(root, "main", &old, &new).unwrap();
        assert_eq!(result, CommitResult::Committed);

        let epoch = refs::read_ref(root, refs::EPOCH_CURRENT).unwrap();
        let main = refs::read_ref(root, "refs/heads/main").unwrap();
        assert_eq!(epoch, Some(new.clone()));
        assert_eq!(main, Some(new.clone()));

        let state = read_merge_state(root).unwrap();
        assert_eq!(state.phase, CommitPhase::Committed);
        assert!(state.epoch_ref_updated);
        assert!(state.branch_ref_updated);

        assert_repo_usable(root);
        assert_commit_exists(root, &old);
        assert_commit_exists(root, &new);
    }

    #[test]
    fn commit_phase_fails_without_moving_refs_when_branch_is_stale() {
        let (dir, old, new) = setup_repo_with_main();
        let root = dir.path();

        // Someone else already moved main, so the CAS expectation `old` no longer holds.
        run_git(root, &["update-ref", "refs/heads/main", new.as_str()]);

        let result = run_commit_phase(root, "main", &old, &new);
        assert!(result.is_err(), "expected CAS failure, got {result:?}");

        // The ref update is atomic, so the epoch must not have advanced on its own.
        assert_eq!(
            refs::read_ref(root, refs::EPOCH_CURRENT).unwrap(),
            Some(old.clone())
        );
        assert_eq!(
            refs::read_ref(root, "refs/heads/main").unwrap(),
            Some(new.clone())
        );

        // The pre-CAS state was persisted; the post-CAS state was not.
        let state = read_merge_state(root).unwrap();
        assert_eq!(state.phase, CommitPhase::Commit);
        assert!(!state.epoch_ref_updated);
        assert!(!state.branch_ref_updated);

        assert_repo_usable(root);
    }

    #[test]
    fn recovery_finalizes_when_only_epoch_moved() {
        let (dir, old, new) = setup_repo_with_main();
        let root = dir.path();

        refs::advance_epoch(root, &old, &new).unwrap();

        let recovery = recover_partial_commit(root, "main", &old, &new).unwrap();
        assert_eq!(recovery, CommitRecovery::FinalizedMainRef);

        let main = refs::read_ref(root, "refs/heads/main").unwrap();
        assert_eq!(main, Some(new.clone()));

        assert_repo_usable(root);
        assert_commit_exists(root, &old);
        assert_commit_exists(root, &new);
    }

    #[test]
    fn recovery_reports_already_committed_when_both_refs_new() {
        let (dir, old, new) = setup_repo_with_main();
        let root = dir.path();

        run_git(root, &["update-ref", refs::EPOCH_CURRENT, new.as_str()]);
        run_git(root, &["update-ref", "refs/heads/main", new.as_str()]);

        let recovery = recover_partial_commit(root, "main", &old, &new).unwrap();
        assert_eq!(recovery, CommitRecovery::AlreadyCommitted);

        assert_repo_usable(root);
        assert_commit_exists(root, &old);
        assert_commit_exists(root, &new);
    }

    #[test]
    fn recovery_reports_not_committed_when_both_refs_old() {
        let (dir, old, new) = setup_repo_with_main();
        let root = dir.path();

        let recovery = recover_partial_commit(root, "main", &old, &new).unwrap();
        assert_eq!(recovery, CommitRecovery::NotCommitted);

        assert_repo_usable(root);
        assert_commit_exists(root, &old);
        assert_commit_exists(root, &new);
    }

    #[test]
    fn recovery_rejects_unexpected_ref_combination() {
        let (dir, old, new) = setup_repo_with_main();
        let root = dir.path();

        // The branch moved without the epoch, a shape recovery does not handle.
        run_git(root, &["update-ref", "refs/heads/main", new.as_str()]);

        let err = recover_partial_commit(root, "main", &old, &new).unwrap_err();
        match err {
            CommitError::InconsistentRefState { epoch, branch } => {
                assert_eq!(epoch, Some(old.clone()));
                assert_eq!(branch, Some(new.clone()));
            }
            other => panic!("expected InconsistentRefState, got {other:?}"),
        }

        // Recovery must leave both refs untouched.
        assert_eq!(
            refs::read_ref(root, refs::EPOCH_CURRENT).unwrap(),
            Some(old)
        );
        assert_eq!(refs::read_ref(root, "refs/heads/main").unwrap(), Some(new));
    }

    #[test]
    fn merge_state_round_trips_through_disk() {
        let (dir, old, new) = setup_repo_with_main();
        let root = dir.path();

        let state = CommitStateFile {
            phase: CommitPhase::Commit,
            epoch_before: old,
            epoch_candidate: new,
            epoch_ref_updated: false,
            branch_ref_updated: false,
            updated_at_unix_ms: 42,
        };
        write_merge_state(root, &state).unwrap();

        assert_eq!(read_merge_state(root).unwrap(), state);
        // The temp file is renamed into place, not left behind.
        assert!(!merge_state_path(root).with_extension("tmp").exists());
    }
}