git_perf/
git_interop.rs

1use std::{
2    env::current_dir,
3    io::{self, BufRead, BufReader, BufWriter, Write},
4    path::{Path, PathBuf},
5    process::{self, Child, Stdio},
6    thread,
7    time::Duration,
8};
9
10use defer::defer;
11use log::{debug, trace, warn};
12use unindent::unindent;
13
14use ::backoff::ExponentialBackoffBuilder;
15use anyhow::{anyhow, bail, Context, Result};
16use itertools::Itertools;
17
18use chrono::prelude::*;
19use rand::{thread_rng, Rng};
20
/// Captured textual output of a finished `git` invocation.
#[derive(Debug)]
struct GitOutput {
    /// Everything the process wrote to stdout, decoded lossily as UTF-8.
    stdout: String,
    /// Everything the process wrote to stderr, decoded lossily as UTF-8.
    stderr: String,
}
26
27// TODO(kaihowl) separate into git low and high level logic
28
/// Errors produced by the git plumbing in this file.
///
/// `map_git_error_for_backoff` decides which of these are retried (transient) and which
/// abort an operation immediately (permanent).
#[derive(Debug, thiserror::Error)]
enum GitError {
    /// `git push` of the notes ref failed; produced by `git_push_notes_ref`.
    #[error("A ref failed to be pushed:\n{0}\n{1}", output.stdout, output.stderr)]
    RefFailedToPush { output: GitOutput },

    /// `git rev-parse --verify` could not resolve `reference`; produced by `git_rev_parse`.
    #[error("Missing HEAD for {reference}")]
    MissingHead { reference: String },

    /// git's stderr contained "cannot lock ref"; classified by `map_git_error`.
    #[error("A ref failed to be locked:\n{0}\n{1}", output.stdout, output.stderr)]
    RefFailedToLock { output: GitOutput },

    /// The repository is shallow; `raw_prune` refuses to run in that case.
    #[error("Shallow repository. Refusing operation.")]
    ShallowRepository,

    /// There is neither an upstream notes ref nor any local write ref to push.
    #[error("This repo does not have any measurements.")]
    MissingMeasurements,

    /// git's stderr contained "but expected"; classified by `map_git_error`.
    #[error("A concurrent change to the ref occurred:\n{0}\n{1}", output.stdout, output.stderr)]
    RefConcurrentModification { output: GitOutput },

    /// git exited with a non-success status. `command` holds the space-joined arguments
    /// that were passed to git; it is not part of the rendered message.
    #[error("Git failed to execute.\n\nstdout:\n{0}\nstderr:\n{1}", output.stdout, output.stderr)]
    ExecError { command: String, output: GitOutput },

    /// git's stderr contained "find remote ref"; classified by `map_git_error`.
    #[error("No measurements found on remote")]
    NoRemoteMeasurements {},

    /// Neither the dedicated remote nor `origin` is configured; see `ensure_remote_exists`.
    #[error("No upstream found. Consider setting origin or {}.", GIT_PERF_REMOTE)]
    NoUpstream {},

    /// Spawning git or talking to its pipes failed at the I/O level.
    #[error("Failed to execute git command")]
    IoError(#[from] io::Error),
}
61
62fn spawn_git_command(
63    args: &[&str],
64    working_dir: &Option<&Path>,
65    stdin: Option<Stdio>,
66) -> Result<Child, io::Error> {
67    let working_dir = working_dir.map(PathBuf::from).unwrap_or(current_dir()?);
68    let stdin = stdin.unwrap_or(Stdio::null());
69    debug!("execute: git {}", args.join(" "));
70    process::Command::new("git")
71        // TODO(kaihowl) set correct encoding and lang?
72        .env("LANG", "")
73        .stdin(stdin)
74        .stdout(Stdio::piped())
75        .stderr(Stdio::piped())
76        .env("LC_ALL", "C")
77        .current_dir(working_dir)
78        .args(args)
79        .spawn()
80}
81
82fn capture_git_output(args: &[&str], working_dir: &Option<&Path>) -> Result<GitOutput, GitError> {
83    feed_git_command(args, working_dir, None)
84}
85
86fn feed_git_command(
87    args: &[&str],
88    working_dir: &Option<&Path>,
89    input: Option<&str>,
90) -> Result<GitOutput, GitError> {
91    let stdin = input.and_then(|_s| Some(Stdio::piped()));
92
93    let child = spawn_git_command(args, working_dir, stdin)?;
94
95    debug!("input: {}", input.unwrap_or(""));
96
97    let output = match child.stdin {
98        Some(ref stdin) => {
99            let mut writer = BufWriter::new(stdin);
100            writer.write_all(input.unwrap().as_bytes())?;
101            drop(writer);
102            child.wait_with_output()
103        }
104        None => child.wait_with_output(),
105    }?;
106
107    let stdout = String::from_utf8_lossy(&output.stdout).to_string();
108    trace!("stdout: {}", stdout);
109
110    let stderr = String::from_utf8_lossy(&output.stderr).to_string();
111    trace!("stderr: {}", stderr);
112
113    let git_output = GitOutput { stdout, stderr };
114
115    if output.status.success() {
116        trace!("exec succeeded");
117    } else {
118        trace!("exec failed");
119        return Err(GitError::ExecError {
120            command: args.join(" "),
121            output: git_output,
122        });
123    }
124
125    Ok(git_output)
126}
127
/// Notes ref that is exchanged with the remote: `fetch` force-updates it from the remote
/// and `git_push_notes_ref` pushes to it.
const REFS_NOTES_BRANCH: &str = "refs/notes/perf-v3";
/// Symbolic ref that points at the write target ref currently in use. Created by
/// `ensure_symbolic_write_ref_exists`, re-pointed by `new_symbolic_write_ref`.
const REFS_NOTES_WRITE_SYMBOLIC_REF: &str = "refs/notes/perf-v3-write";
/// Prefix of write target refs; a random suffix is appended.
const REFS_NOTES_WRITE_TARGET_PREFIX: &str = "refs/notes/perf-v3-write-";
/// Prefix of the temporary refs used while appending a single note line
/// (see `create_temp_add_head`).
const REFS_NOTES_ADD_TARGET_PREFIX: &str = "refs/notes/perf-v3-add-";
/// Prefix of the temporary refs used while rewriting notes for pruning/removal
/// (see `create_temp_rewrite_head`).
const REFS_NOTES_REWRITE_TARGET_PREFIX: &str = "refs/notes/perf-v3-rewrite-";
/// Prefix of the temporary merge refs created by `raw_push`.
const REFS_NOTES_MERGE_BRANCH_PREFIX: &str = "refs/notes/perf-v3-merge-";
/// Ref rebuilt by `update_read_branch` and used as the notes source when walking commits.
const REFS_NOTES_READ_BRANCH: &str = "refs/notes/perf-v3-read";
/// Name of the dedicated remote; `ensure_remote_exists` creates it from the URL of
/// `GIT_ORIGIN` when it is missing.
const GIT_PERF_REMOTE: &str = "git-perf-origin";
/// Name of the conventional default remote, used as the fallback source for the URL.
const GIT_ORIGIN: &str = "origin";
138
139fn map_git_error_for_backoff(e: GitError) -> ::backoff::Error<GitError> {
140    match e {
141        GitError::RefFailedToPush { .. }
142        | GitError::RefFailedToLock { .. }
143        | GitError::RefConcurrentModification { .. } => ::backoff::Error::transient(e),
144        GitError::ExecError { .. }
145        | GitError::IoError(..)
146        | GitError::ShallowRepository
147        | GitError::MissingHead { .. }
148        | GitError::NoRemoteMeasurements { .. }
149        | GitError::NoUpstream { .. }
150        | GitError::MissingMeasurements => ::backoff::Error::permanent(e),
151    }
152}
153
154pub fn add_note_line_to_head(line: &str) -> Result<()> {
155    let op = || -> Result<(), ::backoff::Error<GitError>> {
156        raw_add_note_line_to_head(line).map_err(map_git_error_for_backoff)
157    };
158
159    // TODO(kaihowl) configure
160    let backoff = ExponentialBackoffBuilder::default()
161        .with_max_elapsed_time(Some(Duration::from_secs(60)))
162        .build();
163
164    ::backoff::retry(backoff, op).map_err(|e| match e {
165        ::backoff::Error::Permanent(err) => {
166            anyhow!(err).context("Permanent failure while adding note line to head")
167        }
168        ::backoff::Error::Transient { err, .. } => {
169            anyhow!(err).context("Timed out while adding note line to head")
170        }
171    })?;
172
173    Ok(())
174}
175
176fn raw_add_note_line_to_head(line: &str) -> Result<(), GitError> {
177    ensure_symbolic_write_ref_exists()?;
178
179    // `git notes append` is not safe to use concurrently.
180    // We create a new type of temporary reference: Cannot reuse the normal write references as
181    // they only get merged upon push. This can take arbitrarily long.
182    let current_note_head =
183        git_rev_parse(REFS_NOTES_WRITE_SYMBOLIC_REF).unwrap_or(EMPTY_OID.to_string());
184    let current_symbolic_ref_target = git_rev_parse_symbolic_ref(REFS_NOTES_WRITE_SYMBOLIC_REF)
185        .expect("Missing symbolic-ref for target");
186    let temp_target = create_temp_add_head(&current_note_head)?;
187
188    defer!(git_update_ref(unindent(
189        format!(
190            r#"
191            start
192            delete {temp_target}
193            commit
194            "#
195        )
196        .as_str(),
197    ))
198    .expect("Deleting our own temp ref for adding should never fail"));
199
200    // Test if the repo has any commit checked out at HEAD
201    if let Err(_) = internal_get_head_revision() {
202        return Err(GitError::MissingHead {
203            reference: "HEAD".to_string(),
204        });
205    }
206
207    capture_git_output(
208        &[
209            "notes",
210            "--ref",
211            &temp_target,
212            "append",
213            // TODO(kaihowl) disabled until #96 is solved
214            // "--no-separator",
215            "-m",
216            line,
217        ],
218        &None,
219    )?;
220
221    // Update current write branch with pending write
222    // TODO(kaihowl) duplication
223    git_update_ref(unindent(
224        format!(
225            r#"
226            start
227            symref-verify {REFS_NOTES_WRITE_SYMBOLIC_REF} {current_symbolic_ref_target}
228            update {current_symbolic_ref_target} {temp_target} {current_note_head}
229            commit
230            "#
231        )
232        .as_str(),
233    ))?;
234
235    Ok(())
236}
237
238fn get_git_perf_remote(remote: &str) -> Option<String> {
239    capture_git_output(&["remote", "get-url", remote], &None)
240        .ok()
241        .map(|s| s.stdout.trim().to_owned())
242}
243
244fn set_git_perf_remote(remote: &str, url: &str) -> Result<(), GitError> {
245    capture_git_output(&["remote", "add", remote, url], &None).map(|_| ())
246}
247
248fn ensure_remote_exists() -> Result<(), GitError> {
249    if let Some(_) = get_git_perf_remote(GIT_PERF_REMOTE) {
250        return Ok(());
251    }
252
253    if let Some(x) = get_git_perf_remote(GIT_ORIGIN) {
254        return set_git_perf_remote(GIT_PERF_REMOTE, &x);
255    }
256
257    return Err(GitError::NoUpstream {});
258}
259
260fn ensure_symbolic_write_ref_exists() -> Result<(), GitError> {
261    if let Err(GitError::MissingHead { .. }) = git_rev_parse(REFS_NOTES_WRITE_SYMBOLIC_REF) {
262        let suffix = random_suffix();
263        let target = format!("{REFS_NOTES_WRITE_TARGET_PREFIX}{suffix}");
264
265        git_update_ref(unindent(
266            format!(
267                // Commit only if not yet created
268                r#"
269                start
270                symref-create {REFS_NOTES_WRITE_SYMBOLIC_REF} {target}
271                commit
272                "#
273            )
274            .as_str(),
275        ))
276        .or_else(|err| {
277            if let GitError::RefFailedToLock { .. } = err {
278                return Ok(());
279            } else {
280                return Err(err);
281            }
282        })?;
283    }
284    Ok(())
285}
286
287fn random_suffix() -> String {
288    let suffix: u32 = thread_rng().gen();
289    format!("{:08x}", suffix)
290}
291
292fn git_update_ref(commands: impl AsRef<str>) -> Result<(), GitError> {
293    feed_git_command(
294        &[
295            "update-ref",
296            // When updating existing symlinks, we want to update the source symlink and not its target
297            "--no-deref",
298            "--stdin",
299        ],
300        &None,
301        Some(commands.as_ref()),
302    )
303    .map_err(map_git_error)
304    .map(|_| ())
305}
306
307pub fn get_head_revision() -> Result<String> {
308    Ok(internal_get_head_revision()?)
309}
310
311fn internal_get_head_revision() -> Result<String, GitError> {
312    git_rev_parse("HEAD")
313}
314
315fn map_git_error(err: GitError) -> GitError {
316    // TODO(kaihowl) is parsing user facing string such a good idea. Probably not...
317    match err {
318        GitError::ExecError { command: _, output } if output.stderr.contains("cannot lock ref") => {
319            GitError::RefFailedToLock { output }
320        }
321        GitError::ExecError { command: _, output } if output.stderr.contains("but expected") => {
322            GitError::RefConcurrentModification { output }
323        }
324        GitError::ExecError { command: _, output } if output.stderr.contains("find remote ref") => {
325            GitError::NoRemoteMeasurements {}
326        }
327        _ => err,
328    }
329}
330
331fn fetch(work_dir: Option<&Path>) -> Result<(), GitError> {
332    ensure_remote_exists()?;
333
334    let ref_before = git_rev_parse(REFS_NOTES_BRANCH).ok();
335    // Use git directly to avoid having to implement ssh-agent and/or extraHeader handling
336    capture_git_output(
337        &[
338            "fetch",
339            "--no-write-fetch-head",
340            "origin",
341            // Always force overwrite the local reference
342            // Separation into write, merge, and read branches ensures that this does not lead to
343            // any data loss
344            format!("+{REFS_NOTES_BRANCH}:{REFS_NOTES_BRANCH}").as_str(),
345        ],
346        &work_dir,
347    )
348    .map(|output| print!("{}", output.stderr))
349    .map_err(map_git_error)?;
350
351    let ref_after = git_rev_parse(REFS_NOTES_BRANCH).ok();
352
353    if ref_before == ref_after {
354        println!("Already up to date");
355    }
356
357    Ok(())
358}
359
360fn reconcile_branch_with(target: &str, branch: &str) -> Result<(), GitError> {
361    _ = capture_git_output(
362        &[
363            "notes",
364            "--ref",
365            target,
366            "merge",
367            "-s",
368            "cat_sort_uniq",
369            branch,
370        ],
371        &None,
372    )?;
373    Ok(())
374}
375
376// TODO(kaihowl) duplication
377fn create_temp_rewrite_head(current_notes_head: &str) -> Result<String, GitError> {
378    let suffix = random_suffix();
379    let target = format!("{REFS_NOTES_REWRITE_TARGET_PREFIX}{suffix}");
380
381    // Clone reference
382    git_update_ref(unindent(
383        format!(
384            r#"
385            start
386            create {target} {current_notes_head}
387            commit
388            "#
389        )
390        .as_str(),
391    ))?;
392
393    Ok(target)
394}
395
396fn create_temp_add_head(current_notes_head: &str) -> Result<String, GitError> {
397    let suffix = random_suffix();
398    let target = format!("{REFS_NOTES_ADD_TARGET_PREFIX}{suffix}");
399
400    // TODO(kaihowl) humpty dumpty
401    if current_notes_head != EMPTY_OID {
402        // Clone reference
403        git_update_ref(unindent(
404            format!(
405                r#"
406            start
407            create {target} {current_notes_head}
408            commit
409            "#
410            )
411            .as_str(),
412        ))?;
413    }
414
415    Ok(target)
416}
417
418fn compact_head(target: &str) -> Result<(), GitError> {
419    let new_removal_head = git_rev_parse(&format!("{target}^{{tree}}").as_str())?;
420
421    // Orphan compaction commit
422    let compaction_head = capture_git_output(
423        &["commit-tree", "-m", "cutoff history", &new_removal_head],
424        &None,
425    )?
426    .stdout;
427
428    let compaction_head = compaction_head.trim();
429
430    git_update_ref(unindent(
431        format!(
432            r#"
433            start
434            update {target} {compaction_head}
435            commit
436            "#
437        )
438        .as_str(),
439    ))?;
440
441    Ok(())
442}
443
/// Notification callback handed to `::backoff::retry_notify`: logs the error and the
/// duration it was called with at debug level and announces the retry at warn level.
fn retry_notify(err: GitError, dur: Duration) {
    debug!("Error happened at {:?}: {}", dur, err);
    warn!("Retrying...");
}
448
449pub fn remove_measurements_from_commits(older_than: DateTime<Utc>) -> Result<()> {
450    let op = || -> Result<(), ::backoff::Error<GitError>> {
451        raw_remove_measurements_from_commits(older_than).map_err(map_git_error_for_backoff)
452    };
453
454    // TODO(kaihowl) configure
455    let backoff = ExponentialBackoffBuilder::default()
456        .with_max_elapsed_time(Some(Duration::from_secs(60)))
457        .build();
458
459    ::backoff::retry_notify(backoff, op, retry_notify).map_err(|e| match e {
460        ::backoff::Error::Permanent(err) => {
461            anyhow!(err).context("Permanent failure while adding note line to head")
462        }
463        ::backoff::Error::Transient { err, .. } => {
464            anyhow!(err).context("Timed out while adding note line to head")
465        }
466    })?;
467
468    Ok(())
469}
470
471fn raw_remove_measurements_from_commits(older_than: DateTime<Utc>) -> Result<(), GitError> {
472    // TODO(kaihowl) flow
473    // 1. pull
474    // 2. remove measurements
475    // 3. compact
476    // 4. try to push
477    // TODO(kaihowl) repeat with back off
478    // TODO(kaihowl) clean up branches
479
480    // TODO(kaihowl) better error message for remote empty / never pushed
481    fetch(None)?;
482
483    let current_notes_head = git_rev_parse(REFS_NOTES_BRANCH)?;
484
485    let target = create_temp_rewrite_head(&current_notes_head)?;
486
487    remove_measurements_from_reference(&target, older_than)?;
488
489    compact_head(&target)?;
490
491    // TODO(kaihowl) actual push needed
492    git_push_notes_ref(&current_notes_head, &target, &None)?;
493
494    git_update_ref(unindent(
495        format!(
496            r#"
497            start
498            update {REFS_NOTES_BRANCH} {target}
499            commit
500            "#
501        )
502        .as_str(),
503    ))?;
504
505    // Delete target
506    git_update_ref(unindent(
507        format!(
508            r#"
509            start
510            delete {target}
511            commit
512            "#
513        )
514        .as_str(),
515    ))?;
516
517    Ok(())
518}
519
/// Remove notes pertaining to git commits whose commit date is older than specified.
///
/// Three git processes are chained through pipes:
///
/// 1. `git notes --ref <reference> list` prints the annotated objects,
/// 2. `git log --no-walk --stdin` turns them into `<commit> <commit timestamp>` lines,
/// 3. `git notes --ref <reference> remove --stdin` receives every commit whose timestamp
///    is less than or equal to the timestamp of `older_than`.
///
/// Two helper threads move data between the processes while the calling thread feeds the
/// first stage into the second.
///
/// # Errors
/// Returns `GitError::IoError` if a process cannot be spawned or waited for. The exit
/// statuses of the three processes are not inspected.
///
/// # Panics
/// Panics if writing to one of the pipes fails or if a helper thread panicked.
fn remove_measurements_from_reference(
    reference: &str,
    older_than: DateTime<Utc>,
) -> Result<(), GitError> {
    let oldest_timestamp = older_than.timestamp();
    // Outputs line-by-line <note_oid> <annotated_oid>
    let mut list_notes = spawn_git_command(&["notes", "--ref", reference, "list"], &None, None)?;
    let notes_out = list_notes.stdout.take().unwrap();

    // Prints "<commit> <commit timestamp>" for every object name received on stdin.
    let mut get_commit_dates = spawn_git_command(
        &[
            "log",
            "--ignore-missing",
            "--no-walk",
            "--pretty=format:%H %ct",
            "--stdin",
        ],
        &None,
        Some(Stdio::piped()),
    )?;
    let dates_in = get_commit_dates.stdin.take().unwrap();
    let dates_out = get_commit_dates.stdout.take().unwrap();

    // Removes the note of every object name received on stdin.
    let mut remove_measurements = spawn_git_command(
        &[
            "notes",
            "--ref",
            reference,
            "remove",
            "--stdin",
            "--ignore-missing",
        ],
        &None,
        Some(Stdio::piped()),
    )?;
    let removal_in = remove_measurements.stdin.take().unwrap();
    let removal_out = remove_measurements.stdout.take().unwrap();

    // Stage 2 -> stage 3: forward only commits that are old enough. Lines that do not
    // consist of a commit and a parseable timestamp are skipped.
    let removal_handler = thread::spawn(move || {
        let reader = BufReader::new(dates_out);
        let mut writer = BufWriter::new(removal_in);
        for line in reader.lines().map_while(Result::ok) {
            if let Some((commit, timestamp)) = line.split_whitespace().take(2).collect_tuple() {
                if let Ok(timestamp) = timestamp.parse::<i64>() {
                    if timestamp <= oldest_timestamp {
                        writeln!(writer, "{}", commit).expect("Could not write to stream");
                    }
                }
            }
        }
        // `writer` (and with it the removal process' stdin) is dropped when the closure ends.
    });

    // Echo whatever the removal process prints on its stdout.
    let debugging_handler = thread::spawn(move || {
        let reader = BufReader::new(removal_out);
        reader
            .lines()
            .map_while(Result::ok)
            .for_each(|l| println!("{}", l))
    });

    // Stage 1 -> stage 2: pass on the second column (the annotated object) of each line.
    {
        let reader = BufReader::new(notes_out);
        let mut writer = BufWriter::new(dates_in);

        reader.lines().map_while(Result::ok).for_each(|line| {
            if let Some(line) = line.split_whitespace().nth(1) {
                writeln!(writer, "{}", line).expect("Failed to write to pipe");
            }
        });

        // TODO(kaihowl) necessary?
        drop(writer);
    }

    removal_handler.join().expect("Failed to join");
    debugging_handler.join().expect("Failed to join");

    list_notes.wait()?;
    get_commit_dates.wait()?;
    remove_measurements.wait()?;

    Ok(())
}
604
605fn new_symbolic_write_ref() -> Result<String, GitError> {
606    let suffix = random_suffix();
607    let target = format!("{REFS_NOTES_WRITE_TARGET_PREFIX}{suffix}");
608
609    // TODO(kaihowl) can this actually return a failure upon abort?
610    // TODO(kaihowl) does this actually run atomically as it claims?
611    // See https://github.com/libgit2/libgit2/issues/5918 for a counter example
612    // Also source code for the refs/files-backend.c does not look up to the task?
613    // Do we need packed references after all? Or the new reftable format?
614    git_update_ref(unindent(
615        format!(
616            r#"
617            start
618            symref-update {REFS_NOTES_WRITE_SYMBOLIC_REF} {target}
619            commit
620            "#
621        )
622        .as_str(),
623    ))?;
624    Ok(target)
625}
626
/// Object id consisting of 40 zeros. Used in this file as the stand-in value when a ref
/// cannot be resolved and as the expected old value in `update-ref` commands.
const EMPTY_OID: &str = "0000000000000000000000000000000000000000";
628
629fn git_rev_parse(reference: &str) -> Result<String, GitError> {
630    capture_git_output(&["rev-parse", "--verify", "-q", reference], &None)
631        .map_err(|_e| GitError::MissingHead {
632            reference: reference.into(),
633        })
634        .map(|s| s.stdout.trim().to_owned())
635}
636
637fn git_rev_parse_symbolic_ref(reference: &str) -> Option<String> {
638    capture_git_output(&["symbolic-ref", "-q", reference], &None)
639        .ok()
640        .map(|s| s.stdout.trim().to_owned())
641}
642
643fn consolidate_write_branches_into(
644    current_upstream_oid: &str,
645    target: &str,
646    except_ref: Option<&str>,
647) -> Result<Vec<Reference>, GitError> {
648    // - Reset the merge ref to the upstream perf ref iff it still matches the captured OID
649    //   - otherwise concurrent pull occurred.
650    git_update_ref(unindent(
651        format!(
652            r#"
653                start
654                verify {REFS_NOTES_BRANCH} {current_upstream_oid}
655                update {target} {current_upstream_oid} {EMPTY_OID}
656                commit
657            "#
658        )
659        .as_str(),
660    ))?;
661
662    // - merge in all existing write refs, except for the newly created one from first step
663    //     - Same step (except for filtering of the new ref) happens on local read as well.)
664    //     - Relies on unrelated histories, cat_sort_uniq merge strategy
665    //     - Allows to cut off the history on upstream periodically
666    let additional_args = vec![format!("{REFS_NOTES_WRITE_TARGET_PREFIX}*")];
667    let refs = get_refs(additional_args)?
668        .into_iter()
669        .filter(|r| r.refname != except_ref.unwrap_or_default())
670        .collect_vec();
671
672    // TODO(kaihowl) explicit test needed, currently only fails in concurrency test
673    // when push is called before the first add.
674    if refs.is_empty() {
675        return Ok([].into());
676    }
677
678    for reference in &refs {
679        reconcile_branch_with(&target, &reference.oid)?;
680    }
681
682    Ok(refs)
683}
684
685//TODO(kaihowl) clean up pub methods
/// Single (non-retrying) attempt to publish all locally written measurements.
///
/// The write symbolic ref is re-pointed to a fresh target, all older write refs are merged
/// together with the current upstream notes into a temporary merge ref, and that merge ref
/// is pushed with a compare-and-set against the upstream value noted down before. After a
/// successful push the upstream notes are fetched again and the merged-in write refs are
/// deleted. The temporary merge ref is deleted when the function returns.
///
/// * `work_dir` - directory in which `git push` is run (`None`: current directory).
///
/// # Errors
/// * `GitError::NoUpstream` if no remote can be determined.
/// * `GitError::MissingMeasurements` if there is neither an upstream notes ref nor any
///   write ref to merge.
/// * Errors of the individual git invocations.
///
/// # Panics
/// Panics if deleting the temporary merge ref fails.
fn raw_push(work_dir: Option<&Path>) -> Result<(), GitError> {
    ensure_remote_exists()?;
    // This might merge concurrently created write branches. There is no protection against that.
    // This wants to achieve an at-least-once semantic. The exactly-once semantic is ensured by the
    // cat_sort_uniq merge strategy.

    // - Reset the symbolic-ref "write" to a new unique write ref.
    //     - Allows to continue committing measurements while pushing.
    //     - ?? What happens when a git notes amend concurrently still writes to the old ref?
    let new_write_ref = new_symbolic_write_ref()?;

    // TODO(kaihowl) catch all dupes with this pattern
    let suffix = random_suffix();
    let merge_ref = format!("{REFS_NOTES_MERGE_BRANCH_PREFIX}{suffix}");

    // Runs when this function is left, no matter through which exit.
    defer!(git_update_ref(unindent(
        format!(
            r#"
                    start
                    delete {merge_ref}
                    commit
                "#
        )
        .as_str()
    ))
    .expect("Deleting our own branch should never fail"));

    // - Create a temporary merge ref, set to the upstream perf ref, merge in all existing write refs except the newly created one from the previous step.
    //     - Same step (except for filtering of the new ref) happens on local read as well.)
    //     - Relies on unrelated histories, cat_sort_uniq merge strategy
    //     - Allows to cut off the history on upstream periodically
    // NEW
    // - Note down the current upstream perf ref oid
    let current_upstream_oid = git_rev_parse(REFS_NOTES_BRANCH).unwrap_or(EMPTY_OID.to_string());
    let refs =
        consolidate_write_branches_into(&current_upstream_oid, &merge_ref, Some(&new_write_ref))?;

    // Nothing upstream and nothing written locally: there is nothing to push.
    if refs.is_empty() && current_upstream_oid == EMPTY_OID {
        return Err(GitError::MissingMeasurements);
    }

    git_push_notes_ref(&current_upstream_oid, &merge_ref, &work_dir)?;

    // TODO(kaihowl) can git push immediately update the local ref as well?
    // This might be necessary for a concurrent push in between the last push from here and the now
    // following fetch. Otherwise, the `verify` will fail in the update-ref call later.
    fetch(None)?;

    // Delete merged-in write references; each delete only applies if the ref still has the
    // value that was merged.
    let mut commands = Vec::new();
    commands.push(String::from("start"));
    for Reference { refname, oid } in &refs {
        commands.push(format!("delete {refname} {oid}"));
    }
    commands.push(String::from("commit"));
    // empty line
    commands.push(String::new());
    let commands = commands.join("\n");
    git_update_ref(commands)?;

    Ok(())

    // TODO(kaihowl) - Clean up all local write refs that have been merged into the upstream branch.
}
750
751fn git_push_notes_ref(
752    expected_upstream: &str,
753    push_ref: &str,
754    working_dir: &Option<&Path>,
755) -> Result<(), GitError> {
756    // TODO(kaihowl) configure remote?
757    // TODO(kaihowl) factor into constants
758    // TODO(kaihowl) capture output
759    // - CAS push the temporary merge ref to upstream using the noted down upstream ref
760    //     - In case of concurrent pushes, back off and restart fresh from previous step.
761    let output = capture_git_output(
762        &[
763            "push",
764            "--porcelain",
765            format!("--force-with-lease={REFS_NOTES_BRANCH}:{expected_upstream}").as_str(),
766            "origin",
767            format!("{push_ref}:{REFS_NOTES_BRANCH}").as_str(),
768        ],
769        &working_dir,
770    );
771
772    // - Clean your own temporary merge ref and all others with a merge commit older than x days.
773    //     - In case of crashes before clean up, old merge refs are eliminated eventually.
774
775    match output {
776        Ok(output) => {
777            print!("{}", &output.stdout);
778            Ok(())
779        }
780        Err(GitError::ExecError { command: _, output }) => {
781            let successful_push = output.stdout.lines().any(|l| {
782                l.contains(format!("{REFS_NOTES_BRANCH}:").as_str()) && !l.starts_with('!')
783            });
784            if successful_push {
785                Ok(())
786            } else {
787                Err(GitError::RefFailedToPush { output })
788            }
789        }
790        Err(e) => Err(e),
791    }?;
792
793    Ok(())
794}
795
796// TODO(kaihowl) what happens with a git dir supplied with -C?
797pub fn prune() -> Result<()> {
798    // TODO(kaihowl) put the transient / permanent error in its own function, reuse
799    let op = || -> Result<(), ::backoff::Error<GitError>> {
800        raw_prune().map_err(map_git_error_for_backoff)
801    };
802
803    // TODO(kaihowl) configure
804    let backoff = ExponentialBackoffBuilder::default()
805        .with_max_elapsed_time(Some(Duration::from_secs(60)))
806        .build();
807
808    ::backoff::retry_notify(backoff, op, retry_notify).map_err(|e| match e {
809        ::backoff::Error::Permanent(err) => {
810            anyhow!(err).context("Permanent failure while pushing refs")
811        }
812        ::backoff::Error::Transient { err, .. } => anyhow!(err).context("Timed out pushing refs"),
813    })?;
814
815    Ok(())
816}
817
818fn raw_prune() -> Result<(), GitError> {
819    // TODO(kaihowl) missing raw + retry
820    if is_shallow_repo()? {
821        // TODO(kaihowl) is this not already checked by git itself?
822        return Err(GitError::ShallowRepository);
823    }
824
825    // TODO(kaihowl) code duplication with remove_measurements_from_commits
826
827    // - update local upstream from remote
828    pull_internal(None)?;
829
830    // - create temp branch for pruning and set to current upstream
831    let current_notes_head = git_rev_parse(REFS_NOTES_BRANCH)?;
832    let target = create_temp_rewrite_head(&current_notes_head)?;
833
834    // - invoke prune
835    capture_git_output(&["notes", "--ref", &target, "prune"], &None)?;
836
837    // - compact the new head
838    compact_head(&target)?;
839
840    // TODO(kaihowl) add additional test coverage checking that the head has been compacted
841    // / elements are dropped
842
843    // - CAS remote upstream
844    git_push_notes_ref(&current_notes_head, &target, &None)?;
845    git_update_ref(unindent(
846        format!(
847            r#"
848            start
849            update {REFS_NOTES_BRANCH} {target}
850            commit
851            "#
852        )
853        .as_str(),
854    ))?;
855
856    // - clean up temp branch
857    // TODO(kaihowl) clean up old temp branches
858    git_update_ref(unindent(
859        format!(
860            r#"
861            start
862            delete {target}
863            commit
864            "#
865        )
866        .as_str(),
867    ))?;
868
869    Ok(())
870}
871
872fn is_shallow_repo() -> Result<bool, GitError> {
873    let output = capture_git_output(&["rev-parse", "--is-shallow-repository"], &None)?;
874
875    Ok(output.stdout.starts_with("true"))
876}
877
/// A git ref together with the object it points at, as listed by `get_refs`.
#[derive(Debug, PartialEq)]
struct Reference {
    /// Full name of the ref (the `%(refname)` field of `git for-each-ref`).
    refname: String,
    /// Object name the ref points at (the `%(objectname)` field of `git for-each-ref`).
    oid: String,
}
883
884fn get_refs(additional_args: Vec<String>) -> Result<Vec<Reference>, GitError> {
885    let mut args = vec!["for-each-ref", "--format=%(refname)%00%(objectname)"];
886    args.extend(additional_args.iter().map(|s| s.as_str()));
887
888    let output = capture_git_output(&args, &None)?;
889    Ok(output
890        .stdout
891        .lines()
892        .map(|s| {
893            let items = s.split('\0').take(2).collect_vec();
894            assert!(items.len() == 2);
895            Reference {
896                refname: items[0].to_string(),
897                oid: items[1].to_string(),
898            }
899        })
900        .collect_vec())
901}
902
903fn update_read_branch() -> Result<()> {
904    // TODO(kaihowl) use temp branches and return RAII object
905    git_update_ref(unindent(
906        format!(
907            r#"
908            start
909            delete {REFS_NOTES_READ_BRANCH}
910            commit
911            "#
912        )
913        .as_str(),
914    ))?;
915
916    // - With the upstream refs/notes/perf-v3
917    //     - If not merged into refs/notes/perf-v3-read: set refs/notes/perf-v3-read to refs/notes/perf-v3
918    //     - Protect against concurrent invocations by checking that the refs/notes/perf-v3-read has
919    //     not changed between invocations!
920    //
921    // TODO(kaihowl) add test for bug:
922    //   read branch might not be up to date with the remote branch after a history cut off.
923    //   Then the _old_ read branch might have all writes already merged in.
924    //   But the upstream does not. But we check the pending write branches against the old read
925    //   branch......
926    //   Better to just create the read branch fresh from the remote and add in all pending write
927    //   branches and not optimize. This should be the same as creating the merge branch. Can the
928    //   code be ..merged..?
929
930    let current_upstream_oid = git_rev_parse(REFS_NOTES_BRANCH).unwrap_or(EMPTY_OID.to_string());
931    // TODO(kaihowl) protect against concurrent writes with temp read branch?
932    let _ = consolidate_write_branches_into(&current_upstream_oid, REFS_NOTES_READ_BRANCH, None)?;
933
934    Ok(())
935}
936
937// TODO(kaihowl) return a nested iterator / generator instead?
938pub fn walk_commits(num_commits: usize) -> Result<Vec<(String, Vec<String>)>> {
939    // update local read branch
940    update_read_branch()?;
941
942    // TODO(kaihowl) update the local read branch
943    let output = capture_git_output(
944        &[
945            "--no-pager",
946            "log",
947            "--no-color",
948            "--ignore-missing",
949            "-n",
950            num_commits.to_string().as_str(),
951            "--first-parent",
952            "--pretty=--,%H,%D%n%N",
953            "--decorate=full",
954            format!("--notes={REFS_NOTES_READ_BRANCH}").as_str(),
955            "HEAD",
956        ],
957        &None,
958    )
959    .context("Failed to retrieve commits")?;
960
961    let mut current_commit = None;
962    let mut detected_shallow = false;
963
964    // TODO(kaihowl) iterator or generator instead / how to propagate exit code?
965    let it = output.stdout.lines().filter_map(|l| {
966        if l.starts_with("--") {
967            let info = l.split(',').collect_vec();
968
969            current_commit = Some(
970                info.get(1)
971                    .expect("Could not read commit header.")
972                    .to_owned(),
973            );
974
975            detected_shallow |= info[2..].iter().any(|s| *s == "grafted");
976
977            None
978        } else {
979            // TODO(kaihowl) lot's of string copies...
980            Some((
981                current_commit.as_ref().expect("TODO(kaihowl)").to_owned(),
982                l,
983            ))
984        }
985    });
986
987    let commits: Vec<_> = it
988        .group_by(|it| it.0.to_owned())
989        .into_iter()
990        .map(|(k, v)| {
991            (
992                k.to_owned(),
993                // TODO(kaihowl) joining what was split above already
994                // TODO(kaihowl) lot's of string copies...
995                v.map(|(_, v)| v.to_owned()).collect::<Vec<_>>(),
996            )
997        })
998        .collect();
999
1000    if detected_shallow && commits.len() < num_commits {
1001        bail!("Refusing to continue as commit log depth was limited by shallow clone");
1002    }
1003
1004    Ok(commits)
1005}
1006
1007pub fn pull(work_dir: Option<&Path>) -> Result<()> {
1008    pull_internal(work_dir)?;
1009    Ok(())
1010}
1011
1012fn pull_internal(work_dir: Option<&Path>) -> Result<(), GitError> {
1013    fetch(work_dir).or_else(|err| match err {
1014        // A concurrent modification comes from a concurrent fetch.
1015        // Don't fail for that.
1016        // TODO(kaihowl) must potentially be moved into the retry logic from the push backoff as it
1017        // only is there safe to assume that we successfully pulled.
1018        GitError::RefConcurrentModification { .. } | GitError::RefFailedToLock { .. } => Ok(()),
1019        _ => Err(err),
1020    })?;
1021
1022    Ok(())
1023}
1024
1025pub fn push(work_dir: Option<&Path>) -> Result<()> {
1026    // TODO(kaihowl) check transient/permanent error
1027    let op = || {
1028        raw_push(work_dir)
1029            .map_err(map_git_error_for_backoff)
1030            .map_err(|e: ::backoff::Error<GitError>| match e {
1031                ::backoff::Error::Transient { .. } => {
1032                    match pull_internal(work_dir).map_err(map_git_error_for_backoff) {
1033                        Ok(_) => e,
1034                        Err(e) => e,
1035                    }
1036                }
1037                ::backoff::Error::Permanent { .. } => e,
1038            })
1039    };
1040
1041    // TODO(kaihowl) configure
1042    let backoff = ExponentialBackoffBuilder::default()
1043        .with_max_elapsed_time(Some(Duration::from_secs(60)))
1044        .build();
1045
1046    ::backoff::retry_notify(backoff, op, retry_notify).map_err(|e| match e {
1047        ::backoff::Error::Permanent(err) => {
1048            anyhow!(err).context("Permanent failure while pushing refs")
1049        }
1050        ::backoff::Error::Transient { err, .. } => anyhow!(err).context("Timed out pushing refs"),
1051    })?;
1052
1053    Ok(())
1054}
1055
1056fn parse_git_version(version: &str) -> Result<(i32, i32, i32)> {
1057    let version = version
1058        .split_whitespace()
1059        .nth(2)
1060        .ok_or(anyhow!("Could not find git version in string {version}"))?;
1061    match version.split('.').collect_vec()[..] {
1062        [major, minor, patch] => Ok((major.parse()?, minor.parse()?, patch.parse()?)),
1063        _ => Err(anyhow!("Failed determine semantic version from {version}")),
1064    }
1065}
1066
1067fn get_git_version() -> Result<(i32, i32, i32)> {
1068    let version = capture_git_output(&["--version"], &None)
1069        .context("Determine git version")?
1070        .stdout;
1071    parse_git_version(&version)
1072}
1073
/// Renders a `(major, minor, patch)` tuple as `major.minor.patch`.
fn concat_version(version_tuple: (i32, i32, i32)) -> String {
    let (major, minor, patch) = version_tuple;
    format!("{major}.{minor}.{patch}")
}
1080
1081pub fn check_git_version() -> Result<()> {
1082    let version_tuple = get_git_version().context("Determining compatible git version")?;
1083    let expected_version = (2, 41, 0);
1084    if version_tuple < expected_version {
1085        bail!(
1086            "Version {} is smaller than {}",
1087            concat_version(version_tuple),
1088            concat_version(expected_version)
1089        )
1090    }
1091    Ok(())
1092}
1093
#[cfg(test)]
mod test {
    use super::*;
    use std::env::{self, set_current_dir};

    use httptest::{
        http::{header::AUTHORIZATION, Uri},
        matchers::{self, request},
        responders::status_code,
        Expectation, Server,
    };
    use serial_test::serial;
    use tempfile::{tempdir, TempDir};

    /// Environment variables that keep git independent of the system and global
    /// configuration and give it a fixed author/committer identity.
    const HERMETIC_GIT_ENV: [(&str, &str); 6] = [
        ("GIT_CONFIG_NOSYSTEM", "true"),
        ("GIT_CONFIG_GLOBAL", "/dev/null"),
        ("GIT_AUTHOR_NAME", "testuser"),
        ("GIT_AUTHOR_EMAIL", "testuser@example.com"),
        ("GIT_COMMITTER_NAME", "testuser"),
        ("GIT_COMMITTER_EMAIL", "testuser@example.com"),
    ];

    /// Runs `git <args>` in `dir` with the hermetic environment and panics on failure.
    fn run_git_command(args: &[&str], dir: &Path) {
        let status = process::Command::new("git")
            .args(args)
            .envs(HERMETIC_GIT_ENV)
            .current_dir(dir)
            .status()
            .expect("Failed to spawn git command");
        assert!(status.success(), "git {args:?} failed with {status}");
    }

    /// Initializes a repository with a single empty commit on `master` in `dir`.
    fn init_repo(dir: &Path) {
        run_git_command(&["init", "--initial-branch", "master"], dir);
        run_git_command(&["commit", "--allow-empty", "-m", "Initial commit"], dir);
    }

    /// Creates a temporary directory containing a freshly initialized repository.
    fn dir_with_repo() -> TempDir {
        let tempdir = tempdir().unwrap();
        init_repo(tempdir.path());
        tempdir
    }

    /// Adds `origin_url` as remote `origin` and configures `extra_header` for that URL.
    fn add_server_remote(origin_url: Uri, extra_header: &str, dir: &Path) {
        let url = origin_url.to_string();

        run_git_command(&["remote", "add", "origin", &url], dir);
        run_git_command(
            &[
                "config",
                "--add",
                format!("http.{}.extraHeader", url).as_str(),
                extra_header,
            ],
            dir,
        );
    }

    /// Applies the hermetic git environment to the current process.
    ///
    /// TODO(kaihowl) the variables stay set after the calling test finishes.
    fn hermetic_git_env() {
        for (key, value) in HERMETIC_GIT_ENV {
            env::set_var(key, value);
        }
    }

    #[test]
    #[serial]
    fn test_customheader_pull() {
        // Set up the environment first so every in-process git call is hermetic.
        hermetic_git_env();

        let tempdir = dir_with_repo();
        set_current_dir(tempdir.path()).expect("Failed to change dir");

        let test_server = Server::run();
        add_server_remote(
            test_server.url(""),
            "AUTHORIZATION: sometoken",
            tempdir.path(),
        );

        test_server.expect(
            Expectation::matching(request::headers(matchers::contains((
                AUTHORIZATION.as_str(),
                "sometoken",
            ))))
            .times(1..)
            .respond_with(status_code(200)),
        );

        // TODO(kaihowl) not so great test as this fails with/without authorization
        // We only want to verify that a call on the server with the authorization header was
        // received.
        pull(None).expect_err("We have no valid git http server setup -> should fail");
    }

    #[test]
    // TODO(kaihowl) properly pass current working directory into commands and remove serial
    // execution again
    #[serial]
    fn test_customheader_push() {
        // Must happen before the note is added below: that call runs git in-process and
        // would otherwise depend on the ambient user/global git configuration.
        hermetic_git_env();

        let tempdir = dir_with_repo();
        set_current_dir(tempdir.path()).expect("Failed to change dir");

        let test_server = Server::run();
        add_server_remote(
            test_server.url(""),
            "AUTHORIZATION: someothertoken",
            tempdir.path(),
        );

        test_server.expect(
            Expectation::matching(request::headers(matchers::contains((
                AUTHORIZATION.as_str(),
                "someothertoken",
            ))))
            .times(1..)
            .respond_with(status_code(200)),
        );

        // Must add a single write as a push without pending local writes just succeeds
        add_note_line_to_head("test note line").expect("Failed to add note line");

        push(None).expect_err("We have no valid git http server setup -> should fail");
    }

    #[test]
    #[serial]
    fn test_get_head_revision() {
        let repo_dir = dir_with_repo();
        set_current_dir(repo_dir.path()).expect("Failed to change dir");
        let revision = internal_get_head_revision().unwrap();
        assert!(
            revision.chars().all(|c| c.is_ascii_alphanumeric()),
            "'{}' contained non alphanumeric or non ASCII characters",
            &revision
        )
    }

    #[test]
    fn test_parse_git_version() {
        let version = parse_git_version("git version 2.52.0");
        assert_eq!(version.unwrap(), (2, 52, 0));

        let version = parse_git_version("git version 2.52.0\n");
        assert_eq!(version.unwrap(), (2, 52, 0));
    }

    #[test]
    fn test_random_suffix() {
        let all_hex = |s: &str| s.chars().all(|c| c.is_ascii_hexdigit());

        for _ in 1..1000 {
            let first = random_suffix();
            let second = random_suffix();

            assert_ne!(first, second);
            for suffix in [&first, &second] {
                assert_eq!(suffix.len(), 8, "unexpected length of suffix '{suffix}'");
                assert!(
                    all_hex(suffix.as_str()),
                    "suffix '{suffix}' contains non-hex characters"
                );
            }
        }
    }
}