1#![allow(clippy::missing_errors_doc)]
2
3use std::fs::{self, File};
4use std::io::Write;
5use std::path::{Path, PathBuf};
6use std::time::{SystemTime, UNIX_EPOCH};
7
8use serde::{Deserialize, Serialize};
9
10use crate::model::types::GitOid;
11use crate::refs::{self, RefError};
12
/// Location of the persisted commit-state JSON file, relative to the `root`
/// directory handed to the functions in this module.
const MERGE_STATE_REL_PATH: &str = ".manifold/commit-state.json";
18
/// Successful outcome of [`run_commit_phase`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CommitResult {
    /// The ref update succeeded and the final `Committed` state record was written.
    Committed,
}
25
/// What [`recover_partial_commit`] found, or did, when inspecting the refs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CommitRecovery {
    /// Both the epoch ref and the branch ref already point at the candidate.
    AlreadyCommitted,
    /// The epoch ref was at the candidate and the branch ref was still at the
    /// previous epoch; recovery moved the branch ref forward and rewrote the
    /// state file as `Committed`.
    FinalizedMainRef,
    /// Both refs still point at the previous epoch; nothing was applied.
    NotCommitted,
}
36
/// Errors produced by the commit phase and its recovery routine.
#[derive(Debug)]
pub enum CommitError {
    /// A ref read or update in `crate::refs` failed.
    Ref(RefError),
    /// Reading or writing the state file failed at the filesystem level.
    Io(std::io::Error),
    /// The state file could not be serialized or deserialized as JSON.
    Serde(serde_json::Error),
    /// The epoch ref moved but the branch ref did not (see the `Display`
    /// message). NOTE(review): nothing in this part of the file constructs
    /// this variant — confirm whether other callers still do.
    PartialCommit,
    /// Recovery read a combination of ref values that matches none of the
    /// recognized states; carries the values that were read.
    InconsistentRefState {
        /// Value read from the epoch ref (`None` if the read returned none).
        epoch: Option<GitOid>,
        /// Value read from the branch ref (`None` if the read returned none).
        branch: Option<GitOid>,
    },
    /// An injected failpoint fired; only present with the `failpoints` feature.
    #[cfg(feature = "failpoints")]
    Failpoint(String),
}
55
56impl std::fmt::Display for CommitError {
57 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58 match self {
59 Self::Ref(e) => write!(f, "ref update failed: {e}"),
60 Self::Io(e) => write!(f, "I/O error: {e}"),
61 Self::Serde(e) => write!(f, "merge-state JSON error: {e}"),
62 Self::PartialCommit => write!(
63 f,
64 "commit phase partially applied: epoch ref moved but branch ref did not"
65 ),
66 Self::InconsistentRefState { epoch, branch } => write!(
67 f,
68 "inconsistent ref state during commit recovery (epoch={epoch:?}, branch={branch:?})"
69 ),
70 #[cfg(feature = "failpoints")]
71 Self::Failpoint(msg) => write!(f, "failpoint: {msg}"),
72 }
73 }
74}
75
76impl std::error::Error for CommitError {
77 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
78 match self {
79 Self::Ref(e) => Some(e),
80 Self::Io(e) => Some(e),
81 Self::Serde(e) => Some(e),
82 Self::PartialCommit | Self::InconsistentRefState { .. } => None,
83 #[cfg(feature = "failpoints")]
84 Self::Failpoint(_) => None,
85 }
86 }
87}
88
89impl From<RefError> for CommitError {
90 fn from(value: RefError) -> Self {
91 Self::Ref(value)
92 }
93}
94
95impl From<std::io::Error> for CommitError {
96 fn from(value: std::io::Error) -> Self {
97 Self::Io(value)
98 }
99}
100
101impl From<serde_json::Error> for CommitError {
102 fn from(value: serde_json::Error) -> Self {
103 Self::Serde(value)
104 }
105}
106
/// On-disk record of commit-phase progress, stored as pretty-printed JSON at
/// `MERGE_STATE_REL_PATH` under the root directory.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct CommitStateFile {
    /// Phase the commit had reached when this record was written.
    pub phase: CommitPhase,
    /// OID both refs are expected to hold before the commit (the old value of
    /// the ref update).
    pub epoch_before: GitOid,
    /// OID the refs are moved to by the commit.
    pub epoch_candidate: GitOid,
    /// `false` in the record written before the ref update, `true` in the
    /// record written after it.
    pub epoch_ref_updated: bool,
    /// `false` in the record written before the ref update, `true` in the
    /// record written after it.
    pub branch_ref_updated: bool,
    /// Milliseconds since the Unix epoch at the time the record was built.
    pub updated_at_unix_ms: u128,
}
117
/// Phase marker stored in [`CommitStateFile`]; serialized in `snake_case`
/// (`"commit"`, `"committed"`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum CommitPhase {
    /// Written before the refs are updated: the commit has started.
    Commit,
    /// Written after the refs were updated: the commit is complete.
    Committed,
}
124
125pub fn run_commit_phase(
130 root: &Path,
131 branch: &str,
132 epoch_before: &GitOid,
133 epoch_candidate: &GitOid,
134) -> Result<CommitResult, CommitError> {
135 let mut state = CommitStateFile {
136 phase: CommitPhase::Commit,
137 epoch_before: epoch_before.clone(),
138 epoch_candidate: epoch_candidate.clone(),
139 epoch_ref_updated: false,
140 branch_ref_updated: false,
141 updated_at_unix_ms: now_unix_ms(),
142 };
143
144 write_merge_state(root, &state)?;
145
146 fp_commit("FP_COMMIT_BEFORE_BRANCH_CAS")?;
148
149 let branch_ref = format!("refs/heads/{branch}");
150 refs::update_refs_atomic(
151 root,
152 &[
153 (refs::EPOCH_CURRENT, epoch_before, epoch_candidate),
154 (&branch_ref, epoch_before, epoch_candidate),
155 ],
156 )?;
157
158 fp_commit("FP_COMMIT_BETWEEN_CAS_OPS")?;
161
162 fp_commit("FP_COMMIT_AFTER_EPOCH_CAS")?;
164
165 state.phase = CommitPhase::Committed;
166 state.epoch_ref_updated = true;
167 state.branch_ref_updated = true;
168 state.updated_at_unix_ms = now_unix_ms();
169 write_merge_state(root, &state)?;
170
171 Ok(CommitResult::Committed)
172}
173
174pub fn recover_partial_commit(
176 root: &Path,
177 branch: &str,
178 epoch_before: &GitOid,
179 epoch_candidate: &GitOid,
180) -> Result<CommitRecovery, CommitError> {
181 let branch_ref = format!("refs/heads/{branch}");
182 let epoch = refs::read_ref(root, refs::EPOCH_CURRENT)?;
183 let branch_head = refs::read_ref(root, &branch_ref)?;
184
185 if epoch.as_ref() == Some(epoch_candidate) && branch_head.as_ref() == Some(epoch_candidate) {
186 return Ok(CommitRecovery::AlreadyCommitted);
187 }
188
189 if epoch.as_ref() == Some(epoch_candidate) && branch_head.as_ref() == Some(epoch_before) {
190 refs::write_ref_cas(root, &branch_ref, epoch_before, epoch_candidate)?;
191
192 let state = CommitStateFile {
193 phase: CommitPhase::Committed,
194 epoch_before: epoch_before.clone(),
195 epoch_candidate: epoch_candidate.clone(),
196 epoch_ref_updated: true,
197 branch_ref_updated: true,
198 updated_at_unix_ms: now_unix_ms(),
199 };
200 write_merge_state(root, &state)?;
201
202 return Ok(CommitRecovery::FinalizedMainRef);
203 }
204
205 if epoch.as_ref() == Some(epoch_before) && branch_head.as_ref() == Some(epoch_before) {
206 return Ok(CommitRecovery::NotCommitted);
207 }
208
209 Err(CommitError::InconsistentRefState {
210 epoch,
211 branch: branch_head,
212 })
213}
214
215pub fn read_merge_state(root: &Path) -> Result<CommitStateFile, CommitError> {
216 let path = merge_state_path(root);
217 let bytes = fs::read(path)?;
218 Ok(serde_json::from_slice(&bytes)?)
219}
220
221fn merge_state_path(root: &Path) -> PathBuf {
222 root.join(MERGE_STATE_REL_PATH)
223}
224
225fn write_merge_state(root: &Path, state: &CommitStateFile) -> Result<(), CommitError> {
226 let path = merge_state_path(root);
227 if let Some(parent) = path.parent() {
228 fs::create_dir_all(parent)?;
229 }
230
231 let tmp = path.with_extension("tmp");
232 let data = serde_json::to_vec_pretty(state)?;
233
234 let mut file = File::create(&tmp)?;
235 file.write_all(&data)?;
236 file.write_all(b"\n")?;
237 file.sync_all()?;
238
239 fs::rename(&tmp, &path)?;
240
241 if let Some(parent) = path.parent() {
242 let dir = File::open(parent)?;
244 dir.sync_all()?;
245 }
246
247 Ok(())
248}
249
250fn fp_commit(_name: &str) -> Result<(), CommitError> {
254 #[cfg(feature = "failpoints")]
255 {
256 crate::fp!(_name).map_err(|e| CommitError::Failpoint(e.to_string()))?;
257 }
258 Ok(())
259}
260
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn now_unix_ms() -> u128 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        // Clock is set before 1970: fall back to a zero duration.
        Err(_) => 0,
    }
}
267
// Integration-style tests for the commit phase and its recovery routine.
// They shell out to the `git` binary, so `git` must be available on PATH.
#[cfg(test)]
mod tests {
    use std::process::Command;

    use tempfile::TempDir;

    use super::*;

    /// Runs `git <args>` inside `root` and panics with git's stderr if the
    /// command exits unsuccessfully.
    fn run_git(root: &Path, args: &[&str]) {
        let out = Command::new("git")
            .args(args)
            .current_dir(root)
            .output()
            .unwrap();
        assert!(
            out.status.success(),
            "git {} failed: {}",
            args.join(" "),
            String::from_utf8_lossy(&out.stderr)
        );
    }

    /// Creates a temporary repository with two commits on `main` and returns
    /// `(tempdir, old, new)`, where `old` is the first commit and `new` the
    /// second. Before returning, `refs/heads/main` and the epoch ref are both
    /// set to `old`, so the candidate commit exists but neither ref points at it.
    fn setup_repo_with_main() -> (TempDir, GitOid, GitOid) {
        let dir = TempDir::new().unwrap();
        let root = dir.path();

        run_git(root, &["init"]);
        run_git(root, &["config", "user.name", "Test"]);
        run_git(root, &["config", "user.email", "test@test.com"]);
        run_git(root, &["config", "commit.gpgsign", "false"]);

        // First commit: becomes the "before" epoch.
        fs::write(root.join("README.md"), "hello\n").unwrap();
        run_git(root, &["add", "."]);
        run_git(root, &["commit", "-m", "initial"]);
        // Force the branch name to `main` regardless of git's default.
        run_git(root, &["branch", "-M", "main"]);

        let old = git_oid(root, "HEAD");

        // Second commit: becomes the candidate epoch.
        fs::write(root.join("README.md"), "hello world\n").unwrap();
        run_git(root, &["add", "."]);
        run_git(root, &["commit", "-m", "candidate"]);

        let new = git_oid(root, "HEAD");

        // Rewind the branch ref to the first commit.
        run_git(root, &["update-ref", "refs/heads/main", old.as_str()]);
        // Start the epoch ref at the first commit as well.
        run_git(root, &["update-ref", refs::EPOCH_CURRENT, old.as_str()]);

        (dir, old, new)
    }

    /// Resolves `rev` with `git rev-parse` inside `root` and wraps the trimmed
    /// output in a `GitOid`.
    fn git_oid(root: &Path, rev: &str) -> GitOid {
        let out = Command::new("git")
            .args(["rev-parse", rev])
            .current_dir(root)
            .output()
            .unwrap();
        assert!(
            out.status.success(),
            "rev-parse {rev} failed: {}",
            String::from_utf8_lossy(&out.stderr)
        );
        GitOid::new(String::from_utf8_lossy(&out.stdout).trim()).unwrap()
    }

    /// Sanity check: `git fsck --connectivity-only` must succeed in `root`.
    fn assert_repo_usable(root: &Path) {
        run_git(root, &["fsck", "--no-progress", "--connectivity-only"]);
    }

    /// Asserts that `oid` resolves to a commit object in the repository.
    fn assert_commit_exists(root: &Path, oid: &GitOid) {
        run_git(
            root,
            &["cat-file", "-e", &format!("{}^{{commit}}", oid.as_str())],
        );
    }

    // Happy path: both refs end at the candidate and the state file records
    // a completed commit.
    #[test]
    fn commit_phase_updates_epoch_and_main() {
        let (dir, old, new) = setup_repo_with_main();
        let root = dir.path();

        let result = run_commit_phase(root, "main", &old, &new).unwrap();
        assert_eq!(result, CommitResult::Committed);

        let epoch = refs::read_ref(root, refs::EPOCH_CURRENT).unwrap();
        let main = refs::read_ref(root, "refs/heads/main").unwrap();
        assert_eq!(epoch, Some(new.clone()));
        assert_eq!(main, Some(new.clone()));

        let state = read_merge_state(root).unwrap();
        assert_eq!(state.phase, CommitPhase::Committed);
        assert!(state.epoch_ref_updated);
        assert!(state.branch_ref_updated);

        assert_repo_usable(root);
        assert_commit_exists(root, &old);
        assert_commit_exists(root, &new);
    }

    // Epoch ref advanced, branch ref still old: recovery must move the branch
    // ref to the candidate.
    #[test]
    fn recovery_finalizes_when_only_epoch_moved() {
        let (dir, old, new) = setup_repo_with_main();
        let root = dir.path();

        // Simulate the partial state by advancing only the epoch ref.
        refs::advance_epoch(root, &old, &new).unwrap();

        let recovery = recover_partial_commit(root, "main", &old, &new).unwrap();
        assert_eq!(recovery, CommitRecovery::FinalizedMainRef);

        let main = refs::read_ref(root, "refs/heads/main").unwrap();
        assert_eq!(main, Some(new.clone()));

        assert_repo_usable(root);
        assert_commit_exists(root, &old);
        assert_commit_exists(root, &new);
    }

    // Both refs already at the candidate: recovery reports the commit as done.
    #[test]
    fn recovery_reports_already_committed_when_both_refs_new() {
        let (dir, old, new) = setup_repo_with_main();
        let root = dir.path();

        run_git(root, &["update-ref", refs::EPOCH_CURRENT, new.as_str()]);
        run_git(root, &["update-ref", "refs/heads/main", new.as_str()]);

        let recovery = recover_partial_commit(root, "main", &old, &new).unwrap();
        assert_eq!(recovery, CommitRecovery::AlreadyCommitted);

        assert_repo_usable(root);
        assert_commit_exists(root, &old);
        assert_commit_exists(root, &new);
    }

    // Both refs still at the old commit: recovery reports nothing was applied.
    #[test]
    fn recovery_reports_not_committed_when_both_refs_old() {
        let (dir, old, new) = setup_repo_with_main();
        let root = dir.path();

        let recovery = recover_partial_commit(root, "main", &old, &new).unwrap();
        assert_eq!(recovery, CommitRecovery::NotCommitted);

        assert_repo_usable(root);
        assert_commit_exists(root, &old);
        assert_commit_exists(root, &new);
    }
}