use std::io::{BufRead, BufReader, Write};
use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, LazyLock, Mutex, OnceLock};
use std::thread;
use std::time::{Duration, Instant};
use crate::shell_exec::Cmd;
use dashmap::DashMap;
use once_cell::sync::OnceCell;
use wait_timeout::ChildExt;
use anyhow::{Context, bail};
use dunce::canonicalize;
use crate::config::{LoadError, ProjectConfig, ResolvedConfig, UserConfig};
use super::{DefaultBranchName, GitError, IntegrationReason, LineDiff, WorktreeInfo};
pub(super) use super::{BranchCategory, CompletionBranch, DiffStats, GitRemoteUrl};
mod branch;
mod branches;
mod config;
mod diff;
mod integration;
mod remotes;
mod sha_cache;
mod working_tree;
mod worktrees;
pub use branch::Branch;
pub use working_tree::WorkingTree;
pub(super) use working_tree::path_to_logging_context;
#[derive(Debug)]
pub(crate) struct StreamCommandError {
pub output: String,
pub command: String,
pub exit_info: String,
}
impl std::fmt::Display for StreamCommandError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.output)
}
}
impl std::error::Error for StreamCommandError {}
fn stream_exit_result(
status: std::process::ExitStatus,
buffer: &Arc<Mutex<Vec<String>>>,
cmd_str: &str,
) -> anyhow::Result<()> {
if status.success() {
return Ok(());
}
let lines = buffer.lock().unwrap();
let exit_info = status
.code()
.map(|c| format!("exit code {c}"))
.unwrap_or_else(|| "killed by signal".to_string());
Err(StreamCommandError {
output: lines.join("\n"),
command: cmd_str.to_string(),
exit_info,
}
.into())
}
#[derive(Debug, Default)]
pub(super) struct RepoCache {
pub(super) is_bare: OnceCell<bool>,
pub(super) repo_path: OnceCell<PathBuf>,
pub(super) default_branch: OnceCell<Option<String>>,
pub(super) invalid_default_branch: OnceCell<Option<String>>,
pub(super) integration_target: OnceCell<Option<String>>,
pub(super) primary_remote: OnceCell<Option<String>>,
pub(super) primary_remote_url: OnceCell<Option<String>>,
pub(super) project_identifier: OnceCell<String>,
pub(super) project_config: OnceCell<Option<ProjectConfig>>,
pub(super) user_config: OnceCell<UserConfig>,
pub(super) resolved_config: OnceCell<ResolvedConfig>,
pub(super) sparse_checkout_paths: OnceCell<Vec<String>>,
pub(super) merge_base: DashMap<(String, String), Option<String>>,
pub(super) ahead_behind: DashMap<(String, String), (usize, usize)>,
pub(super) effective_remote_urls: DashMap<String, Option<String>>,
pub(super) resolved_refs: DashMap<String, String>,
pub(super) effective_integration_targets: DashMap<String, String>,
pub(super) integration_reasons: DashMap<(String, String), (String, Option<IntegrationReason>)>,
pub(super) tree_shas: DashMap<String, String>,
pub(super) commit_shas: DashMap<String, String>,
pub(super) git_dirs: DashMap<PathBuf, PathBuf>,
pub(super) worktree_roots: DashMap<PathBuf, PathBuf>,
pub(super) current_branches: DashMap<PathBuf, Option<String>>,
}
#[derive(Debug, Clone)]
pub enum ResolvedWorktree {
Worktree {
path: PathBuf,
branch: Option<String>,
},
BranchOnly {
branch: String,
},
}
static BASE_PATH: OnceLock<PathBuf> = OnceLock::new();
static DEFAULT_BASE_PATH: LazyLock<PathBuf> = LazyLock::new(|| PathBuf::from("."));
static GIT_COMMON_DIR_CACHE: LazyLock<DashMap<PathBuf, PathBuf>> = LazyLock::new(DashMap::new);
pub fn set_base_path(path: PathBuf) {
BASE_PATH.set(path).ok();
}
fn base_path() -> &'static PathBuf {
BASE_PATH.get().unwrap_or(&DEFAULT_BASE_PATH)
}
#[derive(Debug, Clone)]
pub struct Repository {
discovery_path: PathBuf,
git_common_dir: PathBuf,
pub(super) cache: Arc<RepoCache>,
}
impl Repository {
pub fn current() -> anyhow::Result<Self> {
Self::at(base_path().clone())
}
pub fn at(path: impl Into<PathBuf>) -> anyhow::Result<Self> {
let discovery_path = path.into();
let git_common_dir = Self::resolve_git_common_dir(&discovery_path)?;
Ok(Self {
discovery_path,
git_common_dir,
cache: Arc::new(RepoCache::default()),
})
}
pub fn config(&self) -> &ResolvedConfig {
self.cache.resolved_config.get_or_init(|| {
let project_id = self.project_identifier().ok();
self.user_config().resolved(project_id.as_deref())
})
}
pub fn user_config(&self) -> &UserConfig {
self.cache.user_config.get_or_init(|| {
let (config, warnings) = UserConfig::load_with_warnings();
for warning in &warnings {
match warning {
LoadError::File { path, label, err } => {
crate::styling::eprintln!(
"{}",
crate::styling::warning_message(format!(
"{label} at {} failed to parse, skipping",
crate::path::format_path_for_display(path),
))
);
crate::styling::eprintln!(
"{}",
crate::styling::format_with_gutter(&err.to_string(), None)
);
}
LoadError::Env { err, vars } => {
let var_list: Vec<_> = vars
.iter()
.map(|(name, value)| format!("{name}={value}"))
.collect();
crate::styling::eprintln!(
"{}",
crate::styling::warning_message(format!(
"Ignoring env var overrides: {}",
var_list.join(", ")
))
);
crate::styling::eprintln!(
"{}",
crate::styling::format_with_gutter(err.trim(), None)
);
}
LoadError::Validation(err) => {
crate::styling::eprintln!(
"{}",
crate::styling::warning_message(format!(
"Config validation warning: {err}"
))
);
}
}
}
config
})
}
#[doc(hidden)]
pub fn shares_cache_with(&self, other: &Repository) -> bool {
Arc::ptr_eq(&self.cache, &other.cache)
}
fn resolve_git_common_dir(discovery_path: &Path) -> anyhow::Result<PathBuf> {
if let Some(cached) = GIT_COMMON_DIR_CACHE.get(discovery_path) {
return Ok(cached.clone());
}
let output = Cmd::new("git")
.args(["rev-parse", "--git-common-dir"])
.current_dir(discovery_path)
.context(path_to_logging_context(discovery_path))
.run()
.context("Failed to execute: git rev-parse --git-common-dir")?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
bail!("{}", stderr.trim());
}
let stdout = String::from_utf8_lossy(&output.stdout);
let path = PathBuf::from(stdout.trim());
let absolute_path = if path.is_relative() {
discovery_path.join(&path)
} else {
path
};
let resolved =
canonicalize(&absolute_path).context("Failed to resolve git common directory")?;
GIT_COMMON_DIR_CACHE.insert(discovery_path.to_path_buf(), resolved.clone());
Ok(resolved)
}
pub fn discovery_path(&self) -> &Path {
&self.discovery_path
}
pub fn current_worktree(&self) -> WorkingTree<'_> {
self.worktree_at(base_path().clone())
}
pub fn worktree_at(&self, path: impl Into<PathBuf>) -> WorkingTree<'_> {
let raw = path.into();
let path = canonicalize(&raw).unwrap_or(raw);
WorkingTree { repo: self, path }
}
pub fn branch(&self, name: &str) -> Branch<'_> {
Branch {
repo: self,
name: name.to_string(),
}
}
pub fn require_current_branch(&self, action: &str) -> anyhow::Result<String> {
self.current_worktree().branch()?.ok_or_else(|| {
GitError::DetachedHead {
action: Some(action.into()),
}
.into()
})
}
pub fn git_common_dir(&self) -> &Path {
&self.git_common_dir
}
pub fn last_fetch_epoch(&self) -> Option<u64> {
let fetch_head = self.git_common_dir().join("FETCH_HEAD");
let metadata = std::fs::metadata(fetch_head).ok()?;
let modified = metadata.modified().ok()?;
modified
.duration_since(std::time::UNIX_EPOCH)
.ok()
.map(|d| d.as_secs())
}
pub fn wt_dir(&self) -> PathBuf {
self.git_common_dir().join("wt")
}
pub fn clear_git_commands_cache(&self) -> usize {
sha_cache::clear_all(self)
}
pub fn wt_logs_dir(&self) -> PathBuf {
self.wt_dir().join("logs")
}
pub fn wt_trash_dir(&self) -> PathBuf {
self.wt_dir().join("trash")
}
pub fn repo_path(&self) -> anyhow::Result<&Path> {
self.cache
.repo_path
.get_or_try_init(|| {
if let Ok(out) = Cmd::new("git")
.args(["rev-parse", "--show-toplevel"])
.current_dir(&self.git_common_dir)
.context(path_to_logging_context(&self.git_common_dir))
.run()
&& out.status.success()
{
return Ok(PathBuf::from(String::from_utf8_lossy(&out.stdout).trim()));
}
if self.is_bare()? {
Ok(self.git_common_dir.clone())
} else {
Ok(self
.git_common_dir
.parent()
.expect("Git directory has no parent")
.to_path_buf())
}
})
.map(|p| p.as_path())
}
pub fn is_bare(&self) -> anyhow::Result<bool> {
self.cache
.is_bare
.get_or_try_init(|| {
let output = Cmd::new("git")
.args(["config", "--type=bool", "core.bare"])
.current_dir(&self.git_common_dir)
.context(path_to_logging_context(&self.git_common_dir))
.run()
.context("failed to check if repository is bare")?;
match output.status.code() {
Some(0) => Ok(String::from_utf8_lossy(&output.stdout).trim() == "true"),
Some(1) => Ok(false),
_ => {
let stderr = String::from_utf8_lossy(&output.stderr);
bail!("git config core.bare failed: {}", stderr.trim());
}
}
})
.copied()
}
pub fn sparse_checkout_paths(&self) -> &[String] {
self.cache.sparse_checkout_paths.get_or_init(|| {
let output = match self.run_command_output(&["sparse-checkout", "list"]) {
Ok(out) => out,
Err(_) => return Vec::new(),
};
if output.status.success() {
let stdout = String::from_utf8_lossy(&output.stdout);
stdout.lines().map(String::from).collect()
} else {
Vec::new()
}
})
}
pub fn is_builtin_fsmonitor_enabled(&self) -> bool {
self.run_command(&["config", "--get", "core.fsmonitor"])
.ok()
.map(|s| s.trim() == "true")
.unwrap_or(false)
}
pub fn start_fsmonitor_daemon_at(&self, path: &Path) {
log::debug!("$ git fsmonitor--daemon start [{}]", path.display());
let mut cmd = std::process::Command::new("git");
cmd.args(["fsmonitor--daemon", "start"])
.current_dir(path)
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::null());
crate::shell_exec::scrub_directive_env_vars(&mut cmd);
let result = cmd.status();
match result {
Ok(status) if !status.success() => {
log::debug!("fsmonitor daemon start exited {status} (usually fine)");
}
Err(e) => {
log::debug!("fsmonitor daemon start failed (usually fine): {e}");
}
_ => {}
}
}
pub fn worktree_state(&self) -> anyhow::Result<Option<String>> {
let git_dir = self.worktree_at(self.discovery_path()).git_dir()?;
if git_dir.join("MERGE_HEAD").exists() {
return Ok(Some("MERGING".to_string()));
}
if git_dir.join("rebase-merge").exists() || git_dir.join("rebase-apply").exists() {
let rebase_dir = if git_dir.join("rebase-merge").exists() {
git_dir.join("rebase-merge")
} else {
git_dir.join("rebase-apply")
};
if let (Ok(msgnum), Ok(end)) = (
std::fs::read_to_string(rebase_dir.join("msgnum")),
std::fs::read_to_string(rebase_dir.join("end")),
) {
let current = msgnum.trim();
let total = end.trim();
return Ok(Some(format!("REBASING {}/{}", current, total)));
}
return Ok(Some("REBASING".to_string()));
}
if git_dir.join("CHERRY_PICK_HEAD").exists() {
return Ok(Some("CHERRY-PICKING".to_string()));
}
if git_dir.join("REVERT_HEAD").exists() {
return Ok(Some("REVERTING".to_string()));
}
if git_dir.join("BISECT_LOG").exists() {
return Ok(Some("BISECTING".to_string()));
}
Ok(None)
}
fn logging_context(&self) -> String {
path_to_logging_context(&self.discovery_path)
}
pub fn run_command(&self, args: &[&str]) -> anyhow::Result<String> {
let output = Cmd::new("git")
.args(args.iter().copied())
.current_dir(&self.discovery_path)
.context(self.logging_context())
.run()
.with_context(|| format!("Failed to execute: git {}", args.join(" ")))?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
let stderr = stderr.replace('\r', "\n");
let stdout = String::from_utf8_lossy(&output.stdout);
let error_msg = [stderr.trim(), stdout.trim()]
.into_iter()
.filter(|s| !s.is_empty())
.collect::<Vec<_>>()
.join("\n");
bail!("{}", error_msg);
}
let stdout = String::from_utf8_lossy(&output.stdout).into_owned();
Ok(stdout)
}
pub fn run_command_check(&self, args: &[&str]) -> anyhow::Result<bool> {
Ok(self.run_command_output(args)?.status.success())
}
pub const SLOW_OPERATION_DELAY_MS: i64 = 400;
pub fn run_command_delayed_stream(
&self,
args: &[&str],
delay_ms: i64,
progress_message: Option<String>,
) -> anyhow::Result<()> {
let delay_ms = std::env::var("WORKTRUNK_TEST_DELAYED_STREAM_MS")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(delay_ms);
let cmd_str = format!("git {}", args.join(" "));
log::debug!(
"$ {} [{}] (delayed stream, {}ms)",
cmd_str,
self.logging_context(),
delay_ms
);
let mut cmd = std::process::Command::new("git");
cmd.args(args)
.current_dir(&self.discovery_path)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped());
crate::shell_exec::scrub_directive_env_vars(&mut cmd);
let mut child = cmd
.spawn()
.with_context(|| format!("Failed to spawn: {}", cmd_str))?;
let stdout = child.stdout.take().expect("stdout was piped");
let stderr = child.stderr.take().expect("stderr was piped");
let streaming = Arc::new(AtomicBool::new(false));
let buffer = Arc::new(Mutex::new(Vec::new()));
let stdout_handle = {
let streaming = streaming.clone();
let buffer = buffer.clone();
thread::spawn(move || {
let reader = BufReader::new(stdout);
for line in reader.lines().map_while(Result::ok) {
if streaming.load(Ordering::Relaxed) {
let _ = writeln!(std::io::stderr(), "{}", line);
let _ = std::io::stderr().flush();
} else {
buffer.lock().unwrap().push(line);
}
}
})
};
let stderr_handle = {
let streaming = streaming.clone();
let buffer = buffer.clone();
thread::spawn(move || {
let reader = BufReader::new(stderr);
for line in reader.lines().map_while(Result::ok) {
if streaming.load(Ordering::Relaxed) {
let _ = writeln!(std::io::stderr(), "{}", line);
let _ = std::io::stderr().flush();
} else {
buffer.lock().unwrap().push(line);
}
}
})
};
let start = Instant::now();
if delay_ms >= 0 {
let delay = Duration::from_millis(delay_ms as u64);
let remaining = delay.saturating_sub(start.elapsed());
if !remaining.is_zero()
&& let Some(status) = child
.wait_timeout(remaining)
.context("Failed to wait for command")?
{
let _ = stdout_handle.join();
let _ = stderr_handle.join();
return stream_exit_result(status, &buffer, &cmd_str);
}
streaming.store(true, Ordering::Relaxed);
if let Some(ref msg) = progress_message {
let _ = writeln!(std::io::stderr(), "{}", msg);
}
for line in buffer.lock().unwrap().drain(..) {
let _ = writeln!(std::io::stderr(), "{}", line);
}
let _ = std::io::stderr().flush();
}
let status = child.wait().context("Failed to wait for command")?;
let _ = stdout_handle.join();
let _ = stderr_handle.join();
stream_exit_result(status, &buffer, &cmd_str)
}
pub(super) fn run_command_output(&self, args: &[&str]) -> anyhow::Result<std::process::Output> {
Cmd::new("git")
.args(args.iter().copied())
.current_dir(&self.discovery_path)
.context(self.logging_context())
.run()
.with_context(|| format!("Failed to execute: git {}", args.join(" ")))
}
pub fn extract_failed_command(
err: &anyhow::Error,
) -> (String, Option<super::error::FailedCommand>) {
match err.downcast_ref::<StreamCommandError>() {
Some(e) => (
e.output.clone(),
Some(super::error::FailedCommand {
command: e.command.clone(),
exit_info: e.exit_info.clone(),
}),
),
None => (err.to_string(), None),
}
}
}
#[cfg(test)]
mod tests;