use std::{
io::{BufRead, BufReader, BufWriter, Write},
path::Path,
process::Stdio,
thread,
time::Duration,
};
use defer::defer;
use log::{debug, warn};
use unindent::unindent;
use anyhow::{anyhow, bail, Context, Result};
use backoff::{ExponentialBackoff, ExponentialBackoffBuilder};
use itertools::Itertools;
use chrono::prelude::*;
use rand::{thread_rng, Rng};
use crate::config;
use super::git_definitions::{
GIT_ORIGIN, GIT_PERF_REMOTE, REFS_NOTES_ADD_TARGET_PREFIX, REFS_NOTES_BRANCH,
REFS_NOTES_MERGE_BRANCH_PREFIX, REFS_NOTES_READ_PREFIX, REFS_NOTES_REWRITE_TARGET_PREFIX,
REFS_NOTES_WRITE_SYMBOLIC_REF, REFS_NOTES_WRITE_TARGET_PREFIX,
};
use super::git_lowlevel::{
capture_git_output, get_git_perf_remote, git_rev_parse, git_rev_parse_symbolic_ref,
git_update_ref, internal_get_head_revision, is_shallow_repo, map_git_error,
set_git_perf_remote, spawn_git_command,
};
use super::git_types::GitError;
use super::git_types::Reference;
pub use super::git_lowlevel::get_head_revision;
pub use super::git_lowlevel::check_git_version;
fn map_git_error_for_backoff(e: GitError) -> ::backoff::Error<GitError> {
match e {
GitError::RefFailedToPush { .. }
| GitError::RefFailedToLock { .. }
| GitError::RefConcurrentModification { .. } => ::backoff::Error::transient(e),
GitError::ExecError { .. }
| GitError::IoError(..)
| GitError::ShallowRepository
| GitError::MissingHead { .. }
| GitError::NoRemoteMeasurements { .. }
| GitError::NoUpstream { .. }
| GitError::MissingMeasurements => ::backoff::Error::permanent(e),
}
}
/// Builds the exponential backoff policy used by the retrying operations in
/// this module.
///
/// Only the maximum elapsed time is customised; it is read (in seconds) from
/// `config::backoff_max_elapsed_seconds()`. Everything else keeps the builder
/// defaults.
fn default_backoff() -> ExponentialBackoff {
    let time_budget = Duration::from_secs(config::backoff_max_elapsed_seconds());
    ExponentialBackoffBuilder::default()
        .with_max_elapsed_time(Some(time_budget))
        .build()
}
pub fn add_note_line_to_head(line: &str) -> Result<()> {
let op = || -> Result<(), ::backoff::Error<GitError>> {
raw_add_note_line_to_head(line).map_err(map_git_error_for_backoff)
};
let backoff = default_backoff();
::backoff::retry(backoff, op).map_err(|e| match e {
::backoff::Error::Permanent(err) => {
anyhow!(err).context("Permanent failure while adding note line to head")
}
::backoff::Error::Transient { err, .. } => {
anyhow!(err).context("Timed out while adding note line to head")
}
})?;
Ok(())
}
/// Performs a single, non-retrying attempt to append `line` as a note.
///
/// Steps:
/// 1. Make sure the symbolic write ref exists.
/// 2. Resolve the current head of the write ref (falling back to
///    [`EMPTY_OID`] when it does not resolve) and the ref the symbolic ref
///    points at.
/// 3. Create a temporary ref starting at that head and append the note there
///    with `git notes --ref <temp> append -m <line>`.
/// 4. In one `update-ref` transaction, verify that the symbolic ref still
///    points at the same target and move that target to the temporary ref,
///    expecting the previously observed head as old value.
///
/// # Errors
/// Returns [`GitError::MissingHead`] when `HEAD` cannot be resolved, or the
/// error of any failing git invocation.
///
/// # Panics
/// Panics if the symbolic ref target cannot be read, or if the temporary ref
/// cannot be deleted when the function exits.
fn raw_add_note_line_to_head(line: &str) -> Result<(), GitError> {
    ensure_symbolic_write_ref_exists()?;

    // A write ref that does not resolve yet is represented by the all-zero oid.
    let current_note_head =
        git_rev_parse(REFS_NOTES_WRITE_SYMBOLIC_REF).unwrap_or_else(|_| EMPTY_OID.to_string());
    let current_symbolic_ref_target = git_rev_parse_symbolic_ref(REFS_NOTES_WRITE_SYMBOLIC_REF)
        .expect("Missing symbolic-ref for target");
    let temp_target = create_temp_add_head(&current_note_head)?;

    // Runs on every exit path from here on, including the early return below.
    defer!(remove_reference(&temp_target)
        .expect("Deleting our own temp ref for adding should never fail"));

    if internal_get_head_revision().is_err() {
        return Err(GitError::MissingHead {
            reference: "HEAD".to_string(),
        });
    }

    capture_git_output(
        &[
            "notes",
            "--ref",
            &temp_target,
            "append",
            "-m",
            line,
        ],
        &None,
    )?;

    // `unindent` normalises the literal, so its content is kept flush-left.
    git_update_ref(unindent(
        format!(
            r#"
start
symref-verify {REFS_NOTES_WRITE_SYMBOLIC_REF} {current_symbolic_ref_target}
update {current_symbolic_ref_target} {temp_target} {current_note_head}
commit
"#
        )
        .as_str(),
    ))?;

    Ok(())
}
/// Makes sure the dedicated `GIT_PERF_REMOTE` remote is configured.
///
/// If `get_git_perf_remote(GIT_PERF_REMOTE)` already yields a value nothing
/// is changed. Otherwise the value looked up for `GIT_ORIGIN` is handed to
/// `set_git_perf_remote` for `GIT_PERF_REMOTE`.
///
/// # Errors
/// Returns [`GitError::NoUpstream`] when neither lookup yields a value, or
/// the error reported by `set_git_perf_remote`.
fn ensure_remote_exists() -> Result<(), GitError> {
    if get_git_perf_remote(GIT_PERF_REMOTE).is_some() {
        return Ok(());
    }

    match get_git_perf_remote(GIT_ORIGIN) {
        Some(origin_value) => set_git_perf_remote(GIT_PERF_REMOTE, &origin_value),
        None => Err(GitError::NoUpstream {}),
    }
}
/// Returns `prefix` followed by a fresh suffix from [`random_suffix`].
///
/// Only the name is produced; no ref is created.
fn create_temp_ref_name(prefix: &str) -> String {
    [prefix, random_suffix().as_str()].concat()
}
/// Creates the symbolic write ref when it does not resolve yet.
///
/// When `REFS_NOTES_WRITE_SYMBOLIC_REF` cannot be resolved, a new target name
/// is generated and `symref-create` is issued through `git update-ref`.
/// A [`GitError::RefFailedToLock`] from that command is treated as success
/// (presumably another process created the ref concurrently; confirm against
/// `git_update_ref`'s error mapping).
///
/// # Errors
/// Returns any other error reported by `git_update_ref`.
fn ensure_symbolic_write_ref_exists() -> Result<(), GitError> {
    if git_rev_parse(REFS_NOTES_WRITE_SYMBOLIC_REF).is_ok() {
        return Ok(());
    }

    let target = create_temp_ref_name(REFS_NOTES_WRITE_TARGET_PREFIX);
    let created = git_update_ref(unindent(
        format!(
            r#"
start
symref-create {REFS_NOTES_WRITE_SYMBOLIC_REF} {target}
commit
"#
        )
        .as_str(),
    ));

    match created {
        Err(GitError::RefFailedToLock { .. }) => Ok(()),
        other => other,
    }
}
/// Returns a random `u32` rendered as eight lowercase, zero-padded hex digits.
fn random_suffix() -> String {
    let suffix = thread_rng().gen::<u32>();
    format!("{suffix:08x}")
}
/// Fetches `REFS_NOTES_BRANCH` from `GIT_PERF_REMOTE` into the local ref of
/// the same name, using a forced (`+`) refspec.
///
/// `work_dir` is forwarded to `capture_git_output` for the fetch command.
/// Prints "Already up to date" when the local ref resolves to the same value
/// before and after the fetch (including when it resolves in neither case).
///
/// # Errors
/// Returns the error from [`ensure_remote_exists`], or the fetch failure
/// after it was passed through `map_git_error`.
fn fetch(work_dir: Option<&Path>) -> Result<(), GitError> {
    ensure_remote_exists()?;

    let resolve_notes_head = || git_rev_parse(REFS_NOTES_BRANCH).ok();

    let head_before = resolve_notes_head();
    let refspec = format!("+{REFS_NOTES_BRANCH}:{REFS_NOTES_BRANCH}");
    capture_git_output(
        &[
            "fetch",
            "--atomic",
            "--no-write-fetch-head",
            GIT_PERF_REMOTE,
            refspec.as_str(),
        ],
        &work_dir,
    )
    .map_err(map_git_error)?;
    let head_after = resolve_notes_head();

    if head_before == head_after {
        println!("Already up to date");
    }
    Ok(())
}
/// Merges the notes found at `branch` into the notes ref `target` by running
/// `git notes --ref <target> merge -s cat_sort_uniq <branch>`.
///
/// The command output is discarded.
///
/// # Errors
/// Returns the error of the git invocation.
fn reconcile_branch_with(target: &str, branch: &str) -> Result<(), GitError> {
    capture_git_output(
        &[
            "notes",
            "--ref",
            target,
            "merge",
            "-s",
            "cat_sort_uniq",
            branch,
        ],
        &None,
    )
    .map(|_| ())
}
/// Generates a temporary ref name from `prefix` and, unless `current_head`
/// equals [`EMPTY_OID`], creates that ref pointing at `current_head`.
///
/// With [`EMPTY_OID`] only the name is returned and no ref is created.
///
/// # Errors
/// Returns the error of the `update-ref` transaction that creates the ref.
fn create_temp_ref(prefix: &str, current_head: &str) -> Result<String, GitError> {
    let target = create_temp_ref_name(prefix);

    // Nothing to point at yet: hand back the name only.
    if current_head == EMPTY_OID {
        return Ok(target);
    }

    git_update_ref(unindent(
        format!(
            r#"
start
create {target} {current_head}
commit
"#
        )
        .as_str(),
    ))?;
    Ok(target)
}
/// Creates a temporary ref under `REFS_NOTES_REWRITE_TARGET_PREFIX` starting
/// at `current_notes_head`; see [`create_temp_ref`].
fn create_temp_rewrite_head(current_notes_head: &str) -> Result<String, GitError> {
    let prefix = REFS_NOTES_REWRITE_TARGET_PREFIX;
    create_temp_ref(prefix, current_notes_head)
}
/// Creates a temporary ref under `REFS_NOTES_ADD_TARGET_PREFIX` starting at
/// `current_notes_head`; see [`create_temp_ref`].
fn create_temp_add_head(current_notes_head: &str) -> Result<String, GitError> {
    let prefix = REFS_NOTES_ADD_TARGET_PREFIX;
    create_temp_ref(prefix, current_notes_head)
}
/// Replaces the history of `target` with a single parentless commit.
///
/// The tree of `target` is resolved, wrapped into a new commit via
/// `git commit-tree -m "cutoff history" <tree>` (no parent is passed), and
/// `target` is then updated to that commit.
///
/// # Errors
/// Returns the error of any failing git invocation.
fn compact_head(target: &str) -> Result<(), GitError> {
    let tree_spec = format!("{target}^{{tree}}");
    let tree_oid = git_rev_parse(tree_spec.as_str())?;

    let commit_tree = capture_git_output(
        &["commit-tree", "-m", "cutoff history", &tree_oid],
        &None,
    )?;
    // `commit-tree` prints the new commit id followed by a newline.
    let compaction_head = commit_tree.stdout.trim();

    git_update_ref(unindent(
        format!(
            r#"
start
update {target} {compaction_head}
commit
"#
        )
        .as_str(),
    ))
}
/// Notification callback handed to `::backoff::retry_notify` by the retrying
/// operations in this module.
///
/// Logs the error together with the duration supplied by the caller at debug
/// level, then emits a "Retrying..." warning.
fn retry_notify(err: GitError, dur: Duration) {
debug!("Error happened at {dur:?}: {err}");
warn!("Retrying...");
}
/// Removes measurements via [`raw_remove_measurements_from_commits`],
/// retrying transient git failures with the module's default backoff policy.
///
/// `older_than` is forwarded unchanged to the raw operation on every attempt.
///
/// # Errors
/// Returns the underlying [`GitError`] wrapped with context, either because
/// the failure was classified as permanent or because the operation was still
/// failing transiently when retrying stopped.
pub fn remove_measurements_from_commits(older_than: DateTime<Utc>) -> Result<()> {
    let op = || -> Result<(), ::backoff::Error<GitError>> {
        raw_remove_measurements_from_commits(older_than).map_err(map_git_error_for_backoff)
    };

    let backoff = default_backoff();

    // The contexts name this operation; they previously repeated the
    // "adding note line to head" wording of `add_note_line_to_head`.
    ::backoff::retry_notify(backoff, op, retry_notify).map_err(|e| match e {
        ::backoff::Error::Permanent(err) => {
            anyhow!(err).context("Permanent failure while removing measurements")
        }
        ::backoff::Error::Transient { err, .. } => {
            anyhow!(err).context("Timed out while removing measurements")
        }
    })?;

    Ok(())
}
fn raw_remove_measurements_from_commits(older_than: DateTime<Utc>) -> Result<(), GitError> {
fetch(None)?;
let current_notes_head = git_rev_parse(REFS_NOTES_BRANCH)?;
let target = create_temp_rewrite_head(¤t_notes_head)?;
remove_measurements_from_reference(&target, older_than)?;
compact_head(&target)?;
git_push_notes_ref(¤t_notes_head, &target, &None)?;
git_update_ref(unindent(
format!(
r#"
start
update {REFS_NOTES_BRANCH} {target}
commit
"#
)
.as_str(),
))?;
remove_reference(&target)?;
Ok(())
}
/// Removes the notes on `reference` that belong to commits whose `%ct`
/// timestamp is less than or equal to `older_than`.
///
/// Three git child processes are chained through pipes:
/// `git notes list` -> `git log --no-walk --stdin` -> `git notes remove --stdin`.
/// The calling thread feeds the first hop, one spawned thread filters the
/// second hop by timestamp, and a second spawned thread prints whatever the
/// removal command writes to its stdout.
///
/// # Errors
/// Returns an error when spawning or waiting for one of the child processes
/// fails.
///
/// # Panics
/// Panics if a piped stdin/stdout handle of a child is missing, if a write to
/// a pipe reports an error, or if a helper thread cannot be joined.
fn remove_measurements_from_reference(
reference: &str,
older_than: DateTime<Utc>,
) -> Result<(), GitError> {
let oldest_timestamp = older_than.timestamp();
// Stage 1: list the notes stored on `reference`.
let mut list_notes = spawn_git_command(&["notes", "--ref", reference, "list"], &None, None)?;
let notes_out = list_notes.stdout.take().unwrap();
// Stage 2: print "<hash> <%ct timestamp>" for every commit id read from stdin.
let mut get_commit_dates = spawn_git_command(
&[
"log",
"--ignore-missing",
"--no-walk",
"--pretty=format:%H %ct",
"--stdin",
],
&None,
Some(Stdio::piped()),
)?;
let dates_in = get_commit_dates.stdin.take().unwrap();
let dates_out = get_commit_dates.stdout.take().unwrap();
// Stage 3: remove the notes of every commit id read from stdin.
let mut remove_measurements = spawn_git_command(
&[
"notes",
"--ref",
reference,
"remove",
"--stdin",
"--ignore-missing",
],
&None,
Some(Stdio::piped()),
)?;
let removal_in = remove_measurements.stdin.take().unwrap();
let removal_out = remove_measurements.stdout.take().unwrap();
// Stage 2 -> stage 3: forward only the commits at or before the cutoff.
// `writer` (and with it `removal_in`) is dropped when the closure returns.
let removal_handler = thread::spawn(move || {
let reader = BufReader::new(dates_out);
let mut writer = BufWriter::new(removal_in);
// `map_while(Result::ok)` ends the loop silently at the first read error.
for line in reader.lines().map_while(Result::ok) {
// Lines that do not split into two fields, or whose second field is not
// an integer, are skipped.
if let Some((commit, timestamp)) = line.split_whitespace().take(2).collect_tuple() {
if let Ok(timestamp) = timestamp.parse::<i64>() {
if timestamp <= oldest_timestamp {
writeln!(writer, "{commit}").expect("Could not write to stream");
}
}
}
}
});
// Echo the stdout of the removal command line by line.
let debugging_handler = thread::spawn(move || {
let reader = BufReader::new(removal_out);
reader
.lines()
.map_while(Result::ok)
.for_each(|l| println!("{l}"))
});
// Stage 1 -> stage 2, on the calling thread. The block scope drops `writer`
// (and with it `dates_in`) as soon as all lines have been forwarded.
{
let reader = BufReader::new(notes_out);
let mut writer = BufWriter::new(dates_in);
reader.lines().map_while(Result::ok).for_each(|line| {
// Only the second whitespace-separated field of each listed line is forwarded
// (presumably the annotated commit id; confirm against `git notes list`).
if let Some(line) = line.split_whitespace().nth(1) {
writeln!(writer, "{line}").expect("Failed to write to pipe");
}
});
}
removal_handler.join().expect("Failed to join");
debugging_handler.join().expect("Failed to join");
// NOTE(review): the values returned by `wait()` are discarded; if they are exit
// statuses, a non-zero exit of a child is not reported. Confirm whether intended.
list_notes.wait()?;
get_commit_dates.wait()?;
remove_measurements.wait()?;
Ok(())
}
/// Points `REFS_NOTES_WRITE_SYMBOLIC_REF` at a freshly generated target name
/// (via `symref-update`) and returns that name.
///
/// # Errors
/// Returns the error of the `update-ref` transaction.
fn new_symbolic_write_ref() -> Result<String, GitError> {
    let target = create_temp_ref_name(REFS_NOTES_WRITE_TARGET_PREFIX);
    let commands = unindent(
        format!(
            r#"
start
symref-update {REFS_NOTES_WRITE_SYMBOLIC_REF} {target}
commit
"#
        )
        .as_str(),
    );
    git_update_ref(commands).map(|()| target)
}
/// Object id made of 40 zero digits. Within this module it stands in for a
/// ref that does not resolve and serves as the expected old value when a ref
/// is updated in `consolidate_write_branches_into`.
const EMPTY_OID: &str = "0000000000000000000000000000000000000000";
/// Seeds `target` with the upstream notes and merges all local write refs
/// into it.
///
/// One `update-ref` transaction first issues a `verify` of
/// `REFS_NOTES_BRANCH` against `current_upstream_oid` and an `update` of
/// `target` to `current_upstream_oid` with [`EMPTY_OID`] as expected old
/// value. Afterwards every ref matching `REFS_NOTES_WRITE_TARGET_PREFIX*`,
/// except the one named by `except_ref`, is merged into `target`.
///
/// Returns the refs that were merged, in the order reported by [`get_refs`].
///
/// # Errors
/// Returns the first error of the transaction, the ref listing or a merge.
fn consolidate_write_branches_into(
    current_upstream_oid: &str,
    target: &str,
    except_ref: Option<&str>,
) -> Result<Vec<Reference>, GitError> {
    git_update_ref(unindent(
        format!(
            r#"
start
verify {REFS_NOTES_BRANCH} {current_upstream_oid}
update {target} {current_upstream_oid} {EMPTY_OID}
commit
"#
        )
        .as_str(),
    ))?;

    // `None` becomes the empty string, exactly as in the per-item comparison.
    let excluded = except_ref.unwrap_or_default();
    let pattern = format!("{REFS_NOTES_WRITE_TARGET_PREFIX}*");

    let mut refs = get_refs(vec![pattern])?;
    refs.retain(|candidate| candidate.refname != excluded);

    refs.iter()
        .try_for_each(|reference| reconcile_branch_with(target, &reference.oid))?;

    Ok(refs)
}
/// Deletes `ref_name` through a single-command `update-ref` transaction.
///
/// # Errors
/// Returns the error reported by `git_update_ref`.
fn remove_reference(ref_name: &str) -> Result<(), GitError> {
    let commands = unindent(
        format!(
            r#"
start
delete {ref_name}
commit
"#
        )
        .as_str(),
    );
    git_update_ref(commands)
}
/// Performs a single, non-retrying attempt to publish local measurements.
///
/// Steps:
/// 1. Ensure the remote exists and redirect the symbolic write ref to a new
///    target, so the refs collected below are no longer written to through it.
/// 2. Merge the upstream notes and all other write refs into a temporary
///    merge ref ([`consolidate_write_branches_into`]).
/// 3. Push the merge ref (using `work_dir`), then fetch the notes branch.
/// 4. Delete the write refs that were merged, each guarded by the object id
///    it had when it was collected.
///
/// # Errors
/// Returns [`GitError::MissingMeasurements`] when there is neither a local
/// notes branch nor any write ref to push, or the error of any failing step.
///
/// # Panics
/// Panics if the temporary merge ref cannot be deleted when the function exits.
fn raw_push(work_dir: Option<&Path>) -> Result<(), GitError> {
    ensure_remote_exists()?;

    let new_write_ref = new_symbolic_write_ref()?;

    let merge_ref = create_temp_ref_name(REFS_NOTES_MERGE_BRANCH_PREFIX);

    // Runs on every exit path from here on.
    defer!(remove_reference(&merge_ref).expect("Deleting our own branch should never fail"));

    // A notes branch that does not resolve is represented by the all-zero oid.
    let current_upstream_oid =
        git_rev_parse(REFS_NOTES_BRANCH).unwrap_or_else(|_| EMPTY_OID.to_string());
    let refs =
        consolidate_write_branches_into(&current_upstream_oid, &merge_ref, Some(&new_write_ref))?;

    if refs.is_empty() && current_upstream_oid == EMPTY_OID {
        return Err(GitError::MissingMeasurements);
    }

    git_push_notes_ref(&current_upstream_oid, &merge_ref, &work_dir)?;

    fetch(None)?;

    // Build one transaction that deletes every merged write ref; the trailing
    // empty entry makes the joined text end with a newline.
    let mut commands = Vec::new();
    commands.push(String::from("start"));
    for Reference { refname, oid } in &refs {
        commands.push(format!("delete {refname} {oid}"));
    }
    commands.push(String::from("commit"));
    commands.push(String::new());
    let commands = commands.join("\n");
    git_update_ref(commands)?;

    Ok(())
}
/// Pushes `push_ref` to `REFS_NOTES_BRANCH` on `GIT_PERF_REMOTE`.
///
/// The push uses `--force-with-lease=<REFS_NOTES_BRANCH>:<expected_upstream>`
/// and `--porcelain`. On success the command's stdout is printed.
///
/// When git reports an execution error, the stdout is still inspected: if
/// any line mentions `<REFS_NOTES_BRANCH>:` and does not start with `!`, the
/// push of the notes ref is treated as successful.
///
/// # Errors
/// Returns [`GitError::RefFailedToPush`] when an execution error occurred and
/// no such line is present; any other error is passed through unchanged.
fn git_push_notes_ref(
    expected_upstream: &str,
    push_ref: &str,
    working_dir: &Option<&Path>,
) -> Result<(), GitError> {
    let lease = format!("--force-with-lease={REFS_NOTES_BRANCH}:{expected_upstream}");
    let refspec = format!("{push_ref}:{REFS_NOTES_BRANCH}");

    let push_result = capture_git_output(
        &[
            "push",
            "--porcelain",
            lease.as_str(),
            GIT_PERF_REMOTE,
            refspec.as_str(),
        ],
        working_dir,
    );

    let output = match push_result {
        Ok(output) => {
            print!("{}", &output.stdout);
            return Ok(());
        }
        Err(GitError::ExecError { output, .. }) => output,
        Err(e) => return Err(e),
    };

    let branch_marker = format!("{REFS_NOTES_BRANCH}:");
    let successful_push = output
        .stdout
        .lines()
        .any(|l| l.contains(branch_marker.as_str()) && !l.starts_with('!'));

    if successful_push {
        Ok(())
    } else {
        Err(GitError::RefFailedToPush { output })
    }
}
pub fn prune() -> Result<()> {
let op = || -> Result<(), ::backoff::Error<GitError>> {
raw_prune().map_err(map_git_error_for_backoff)
};
let backoff = default_backoff();
::backoff::retry_notify(backoff, op, retry_notify).map_err(|e| match e {
::backoff::Error::Permanent(err) => {
anyhow!(err).context("Permanent failure while pruning refs")
}
::backoff::Error::Transient { err, .. } => anyhow!(err).context("Timed out pushing refs"),
})?;
Ok(())
}
fn raw_prune() -> Result<(), GitError> {
if is_shallow_repo()? {
return Err(GitError::ShallowRepository);
}
pull_internal(None)?;
let current_notes_head = git_rev_parse(REFS_NOTES_BRANCH)?;
let target = create_temp_rewrite_head(¤t_notes_head)?;
capture_git_output(&["notes", "--ref", &target, "prune"], &None)?;
compact_head(&target)?;
git_push_notes_ref(¤t_notes_head, &target, &None)?;
git_update_ref(unindent(
format!(
r#"
start
update {REFS_NOTES_BRANCH} {target}
commit
"#
)
.as_str(),
))?;
remove_reference(&target)?;
Ok(())
}
/// Lists refs with `git for-each-ref`, passing `additional_args` (for
/// example ref patterns) after the fixed format argument.
///
/// Each output line is expected to hold the ref name and the object name
/// separated by a NUL byte.
///
/// # Errors
/// Returns the error of the git invocation.
///
/// # Panics
/// Panics when an output line does not contain the NUL separator.
fn get_refs(additional_args: Vec<String>) -> Result<Vec<Reference>, GitError> {
    let mut args = vec!["for-each-ref", "--format=%(refname)%00%(objectname)"];
    args.extend(additional_args.iter().map(String::as_str));

    let output = capture_git_output(&args, &None)?;

    let mut references = Vec::new();
    for record in output.stdout.lines() {
        let items = record.split('\0').take(2).collect_vec();
        assert!(items.len() == 2);
        references.push(Reference {
            refname: items[0].to_string(),
            oid: items[1].to_string(),
        });
    }
    Ok(references)
}
/// Holds the name of a temporary ref; the `Drop` implementation in this file
/// removes that ref when the value goes out of scope.
struct TempRef {
/// Name of the temporary ref.
ref_name: String,
}
impl TempRef {
    /// Reserves a temporary ref name under `prefix`.
    ///
    /// [`create_temp_ref`] is called with [`EMPTY_OID`], so only the name is
    /// generated here; no ref is created by this constructor.
    ///
    /// # Errors
    /// Returns the error reported by [`create_temp_ref`].
    fn new(prefix: &str) -> Result<Self, GitError> {
        let ref_name = create_temp_ref(prefix, EMPTY_OID)?;
        Ok(Self { ref_name })
    }
}
impl Drop for TempRef {
    /// Removes the temporary ref.
    ///
    /// # Panics
    /// Panics when [`remove_reference`] reports an error.
    fn drop(&mut self) {
        if remove_reference(&self.ref_name).is_err() {
            panic!("Failed to remove reference: {}", self.ref_name);
        }
    }
}
/// Builds a temporary read ref that combines the local notes branch with all
/// local write refs.
///
/// The returned [`TempRef`] removes the ref again when it is dropped, so the
/// caller must keep it alive for as long as the ref is read.
///
/// # Errors
/// Returns the error of [`TempRef::new`] or of
/// [`consolidate_write_branches_into`].
fn update_read_branch() -> Result<TempRef, GitError> {
    let temp_ref = TempRef::new(REFS_NOTES_READ_PREFIX)?;

    // A notes branch that does not resolve is represented by the all-zero oid.
    let current_upstream_oid =
        git_rev_parse(REFS_NOTES_BRANCH).unwrap_or_else(|_| EMPTY_OID.to_string());

    // The list of merged refs is not needed for reading.
    let _ = consolidate_write_branches_into(&current_upstream_oid, &temp_ref.ref_name, None)?;

    Ok(temp_ref)
}
pub fn walk_commits(num_commits: usize) -> Result<Vec<(String, Vec<String>)>> {
let temp_ref = update_read_branch()?;
let output = capture_git_output(
&[
"--no-pager",
"log",
"--no-color",
"--ignore-missing",
"-n",
num_commits.to_string().as_str(),
"--first-parent",
"--pretty=--,%H,%D%n%N",
"--decorate=full",
format!("--notes={}", temp_ref.ref_name).as_str(),
"HEAD",
],
&None,
)
.context("Failed to retrieve commits")?;
let mut commits: Vec<(String, Vec<String>)> = Vec::new();
let mut detected_shallow = false;
let mut current_commit: Option<String> = None;
for l in output.stdout.lines() {
if l.starts_with("--") {
let info = l.split(',').collect_vec();
let commit_hash = info
.get(1)
.expect("No commit header found before measurement line in git log output");
detected_shallow |= info[2..].contains(&"grafted");
current_commit = Some(commit_hash.to_string());
commits.push((commit_hash.to_string(), Vec::new()));
} else if let Some(commit_hash) = current_commit.as_ref() {
if let Some(last) = commits.last_mut() {
last.1.push(l.to_string());
} else {
commits.push((commit_hash.to_string(), vec![l.to_string()]));
}
}
}
if detected_shallow && commits.len() < num_commits {
bail!("Refusing to continue as commit log depth was limited by shallow clone");
}
Ok(commits)
}
/// Public entry point for pulling measurements: runs [`pull_internal`] with
/// `work_dir` and converts its error into an `anyhow` error.
pub fn pull(work_dir: Option<&Path>) -> Result<()> {
    pull_internal(work_dir).map_err(Into::into)
}
/// Runs [`fetch`] and treats [`GitError::RefConcurrentModification`] and
/// [`GitError::RefFailedToLock`] as success.
///
/// # Errors
/// Returns every other error reported by [`fetch`].
fn pull_internal(work_dir: Option<&Path>) -> Result<(), GitError> {
    match fetch(work_dir) {
        Err(GitError::RefConcurrentModification { .. } | GitError::RefFailedToLock { .. }) => {
            Ok(())
        }
        other => other,
    }
}
pub fn push(work_dir: Option<&Path>) -> Result<()> {
let op = || {
raw_push(work_dir)
.map_err(map_git_error_for_backoff)
.map_err(|e: ::backoff::Error<GitError>| match e {
::backoff::Error::Transient { .. } => {
match pull_internal(work_dir).map_err(map_git_error_for_backoff) {
Ok(_) => e,
Err(e) => e,
}
}
::backoff::Error::Permanent { .. } => e,
})
};
let backoff = default_backoff();
::backoff::retry_notify(backoff, op, retry_notify).map_err(|e| match e {
::backoff::Error::Permanent(err) => {
anyhow!(err).context("Permanent failure while pushing refs")
}
::backoff::Error::Transient { err, .. } => anyhow!(err).context("Timed out pushing refs"),
})?;
Ok(())
}
#[cfg(test)]
mod test {
    use super::*;
    use std::env::{self, set_current_dir};
    use std::process;

    use httptest::{
        http::{header::AUTHORIZATION, Uri},
        matchers::{self, request},
        responders::status_code,
        Expectation, Server,
    };
    use serial_test::serial;
    use tempfile::{tempdir, TempDir};

    /// Runs `git` with `args` in `dir` using a fixed identity and without
    /// system/global configuration; panics when the command fails.
    fn run_git_command(args: &[&str], dir: &Path) {
        assert!(process::Command::new("git")
            .args(args)
            .envs([
                ("GIT_CONFIG_NOSYSTEM", "true"),
                ("GIT_CONFIG_GLOBAL", "/dev/null"),
                ("GIT_AUTHOR_NAME", "testuser"),
                ("GIT_AUTHOR_EMAIL", "testuser@example.com"),
                ("GIT_COMMITTER_NAME", "testuser"),
                ("GIT_COMMITTER_EMAIL", "testuser@example.com"),
            ])
            .stdout(Stdio::null())
            .stderr(Stdio::null())
            .current_dir(dir)
            .status()
            .expect("Failed to spawn git command")
            .success());
    }

    /// Initialises a repository with a single empty commit in `dir`.
    fn init_repo(dir: &Path) {
        run_git_command(&["init", "--initial-branch", "master"], dir);
        run_git_command(&["commit", "--allow-empty", "-m", "Initial commit"], dir);
    }

    /// Creates a temporary directory containing an initialised repository.
    fn dir_with_repo() -> TempDir {
        let tempdir = tempdir().unwrap();
        init_repo(tempdir.path());
        tempdir
    }

    /// Adds `origin_url` as remote `origin` and configures `extra_header` as
    /// HTTP extra header for that URL.
    fn add_server_remote(origin_url: Uri, extra_header: &str, dir: &Path) {
        let url = origin_url.to_string();

        run_git_command(&["remote", "add", "origin", &url], dir);
        run_git_command(
            &[
                "config",
                "--add",
                format!("http.{}.extraHeader", url).as_str(),
                extra_header,
            ],
            dir,
        );
    }

    /// Sets the same identity/config environment as `run_git_command`, but
    /// for the current process, so git commands spawned by the code under
    /// test see it as well. The variables stay set for later tests.
    fn hermetic_git_env() {
        env::set_var("GIT_CONFIG_NOSYSTEM", "true");
        env::set_var("GIT_CONFIG_GLOBAL", "/dev/null");
        env::set_var("GIT_AUTHOR_NAME", "testuser");
        env::set_var("GIT_AUTHOR_EMAIL", "testuser@example.com");
        env::set_var("GIT_COMMITTER_NAME", "testuser");
        env::set_var("GIT_COMMITTER_EMAIL", "testuser@example.com");
    }

    #[test]
    #[serial]
    fn test_customheader_pull() {
        let tempdir = dir_with_repo();
        set_current_dir(tempdir.path()).expect("Failed to change dir");

        let test_server = Server::run();
        add_server_remote(
            test_server.url(""),
            "AUTHORIZATION: sometoken",
            tempdir.path(),
        );

        test_server.expect(
            Expectation::matching(request::headers(matchers::contains((
                AUTHORIZATION.as_str(),
                "sometoken",
            ))))
            .times(1..)
            .respond_with(status_code(200)),
        );

        hermetic_git_env();
        pull(None).expect_err("We have no valid git http server setup -> should fail");
    }

    #[test]
    #[serial]
    fn test_customheader_push() {
        let tempdir = dir_with_repo();
        set_current_dir(tempdir.path()).expect("Failed to change dir");

        let test_server = Server::run();
        add_server_remote(
            test_server.url(""),
            "AUTHORIZATION: someothertoken",
            tempdir.path(),
        );

        test_server.expect(
            Expectation::matching(request::headers(matchers::contains((
                AUTHORIZATION.as_str(),
                "someothertoken",
            ))))
            .times(1..)
            .respond_with(status_code(200)),
        );

        // Must be in place before the first git command of the code under
        // test: appending a note creates a commit and needs an identity.
        hermetic_git_env();

        // A pending local write is added before pushing.
        ensure_symbolic_write_ref_exists().expect("Failed to ensure symbolic write ref exists");
        add_note_line_to_head("test note line").expect("Failed to add note line");

        push(None).expect_err("We have no valid git http server setup -> should fail");
    }

    #[test]
    fn test_random_suffix() {
        let all_hex = |s: &String| s.chars().all(|c| c.is_ascii_hexdigit());

        for _ in 1..1000 {
            let first = random_suffix();
            let second = random_suffix();

            assert_ne!(first, second);
            assert_eq!(first.len(), 8);
            assert_eq!(second.len(), 8);
            assert!(all_hex(&first));
            assert!(all_hex(&second));
        }
    }

    #[test]
    #[serial]
    fn test_empty_or_never_pushed_remote_error_for_fetch() {
        let tempdir = tempdir().unwrap();
        init_repo(tempdir.path());
        set_current_dir(tempdir.path()).expect("Failed to change dir");

        // Do not depend on another test having set up the environment first.
        hermetic_git_env();

        let git_dir_url = format!("file://{}", tempdir.path().display());
        run_git_command(&["remote", "add", "origin", &git_dir_url], tempdir.path());

        // With tracing enabled the executed commands show up in stderr.
        std::env::set_var("GIT_TRACE", "true");

        let result = super::fetch(Some(tempdir.path()));
        match result {
            Err(GitError::NoRemoteMeasurements { output }) => {
                assert!(
                    output.stderr.contains(GIT_PERF_REMOTE),
                    "Expected output to contain {GIT_PERF_REMOTE}. Output: '{}'",
                    output.stderr
                )
            }
            other => panic!("Expected NoRemoteMeasurements error, got: {:?}", other),
        }
    }

    #[test]
    #[serial]
    fn test_empty_or_never_pushed_remote_error_for_push() {
        let tempdir = tempdir().unwrap();
        init_repo(tempdir.path());
        set_current_dir(tempdir.path()).expect("Failed to change dir");

        // Appending a note below creates a commit and needs an identity; do
        // not depend on another test having set up the environment first.
        hermetic_git_env();

        run_git_command(
            &["remote", "add", "origin", "invalid invalid"],
            tempdir.path(),
        );

        // With tracing enabled the executed commands show up in stderr.
        std::env::set_var("GIT_TRACE", "true");

        add_note_line_to_head("test line, invalid measurement, does not matter").unwrap();

        let result = super::raw_push(Some(tempdir.path()));
        match result {
            Err(GitError::RefFailedToPush { output }) => {
                assert!(
                    output.stderr.contains(GIT_PERF_REMOTE),
                    "Expected output to contain {GIT_PERF_REMOTE}, got: {}",
                    output.stderr
                )
            }
            other => panic!("Expected RefFailedToPush error, got: {:?}", other),
        }
    }
}