use std::fs;
use std::io::Write;
use std::path::{Path, PathBuf};
use crate::error::PawError;
use crate::logging;
/// Parser state used by [`strip_ansi`] while scanning the input one character at a time.
#[derive(Debug, Clone, Copy)]
enum AnsiState {
    /// Outside any escape sequence; characters are copied to the output.
    Normal,
    /// An ESC (`0x1b`) was just consumed; the next character selects the sequence kind.
    SeenEsc,
    /// Inside a CSI sequence (`ESC [`), waiting for its final byte (`@`..=`~`).
    InCsi,
    /// Inside an OSC sequence (`ESC ]`), waiting for BEL or the `ESC \` terminator.
    InOsc,
    /// Inside an OSC sequence with an ESC just seen; a following `\` ends the sequence.
    InOscEscSeen,
}

/// Returns `input` with ANSI escape sequences removed.
///
/// Two sequence families are dropped in full:
/// * CSI: `ESC [` up to and including the first character in `@`..=`~`.
/// * OSC: `ESC ]` up to and including either BEL (`0x07`) or `ESC \`.
///
/// For any other `ESC x` pair only the ESC is dropped and `x` is kept.
/// A sequence that is still open when the input ends is discarded.
///
/// A run of consecutive ESC characters is treated as a single introducer, so a
/// raw ESC is never copied to the output and an OSC terminator preceded by a
/// stray ESC is still recognised.
pub fn strip_ansi(input: &str) -> String {
    // The result can only be shorter than the input, so this never reallocates.
    let mut output = String::with_capacity(input.len());
    let mut state = AnsiState::Normal;

    for ch in input.chars() {
        state = match (state, ch) {
            (AnsiState::Normal, '\x1b') => AnsiState::SeenEsc,
            (AnsiState::Normal, _) => {
                output.push(ch);
                AnsiState::Normal
            }

            (AnsiState::SeenEsc, '[') => AnsiState::InCsi,
            (AnsiState::SeenEsc, ']') => AnsiState::InOsc,
            // ESC ESC: the newest ESC becomes the pending introducer instead of
            // being emitted as literal text.
            (AnsiState::SeenEsc, '\x1b') => AnsiState::SeenEsc,
            (AnsiState::SeenEsc, _) => {
                output.push(ch);
                AnsiState::Normal
            }

            // Parameter and intermediate bytes are skipped until the final byte.
            (AnsiState::InCsi, '@'..='~') => AnsiState::Normal,
            (AnsiState::InCsi, _) => AnsiState::InCsi,

            (AnsiState::InOsc, '\x07') => AnsiState::Normal,
            (AnsiState::InOsc, '\x1b') => AnsiState::InOscEscSeen,
            (AnsiState::InOsc, _) => AnsiState::InOsc,

            (AnsiState::InOscEscSeen, '\\') => AnsiState::Normal,
            // Another ESC may still be the start of the `ESC \` terminator.
            (AnsiState::InOscEscSeen, '\x1b') => AnsiState::InOscEscSeen,
            (AnsiState::InOscEscSeen, _) => AnsiState::InOsc,
        };
    }

    output
}
/// Chooses which log session to use.
///
/// * When `session_flag` is given, it must name one of the sessions reported by
///   `logging::list_log_sessions`; otherwise an error pointing at
///   `git paw replay --list` is returned.
/// * When it is `None`, the session whose directory under
///   `<repo_root>/.git-paw/logs/` has the newest modification time wins.
///   Sessions whose metadata or mtime cannot be read are skipped, and on equal
///   timestamps the session listed first is kept.
///
/// # Errors
///
/// Returns `PawError::ReplayError` when there are no sessions at all, when the
/// requested session is unknown, or when no session directory is readable.
/// Errors from `logging::list_log_sessions` are propagated unchanged.
pub fn resolve_session(repo_root: &Path, session_flag: Option<&str>) -> Result<String, PawError> {
    let known = logging::list_log_sessions(repo_root)?;
    if known.is_empty() {
        return Err(PawError::ReplayError(
            "No log sessions found. Session logging may not be enabled.".to_string(),
        ));
    }

    // An explicit request is either an exact hit or an error; it never falls
    // through to the "most recent" heuristic.
    if let Some(name) = session_flag {
        return if known.iter().any(|candidate| candidate == name) {
            Ok(name.to_string())
        } else {
            Err(PawError::ReplayError(format!(
                "Session '{name}' not found. Run `git paw replay --list` to see available sessions."
            )))
        };
    }

    let logs_root = repo_root.join(".git-paw").join("logs");
    let mut newest: Option<(String, std::time::SystemTime)> = None;
    for candidate in &known {
        let Ok(meta) = fs::metadata(logs_root.join(candidate)) else {
            continue;
        };
        let Ok(mtime) = meta.modified() else {
            continue;
        };
        // Strictly newer only, so the earliest-listed session wins a tie.
        let replaces_current = match &newest {
            None => true,
            Some((_, seen)) => mtime > *seen,
        };
        if replaces_current {
            newest = Some((candidate.clone(), mtime));
        }
    }

    match newest {
        Some((winner, _)) => Ok(winner),
        None => Err(PawError::ReplayError(
            "No accessible log sessions found.".to_string(),
        )),
    }
}
/// Locates the log file in `session` that corresponds to `branch_query`.
///
/// An entry matches when `branch_query` equals any of:
/// * the entry's `branch` field,
/// * the log file name with a single trailing `.log` removed, or
/// * the full log file name.
///
/// The first matching entry, in the order returned by
/// `logging::list_logs_for_session`, is used.
///
/// # Errors
///
/// Returns `PawError::ReplayError` when the session has no logs or when nothing
/// matches; the latter message lists every available file and its branch.
/// Errors from `logging::list_logs_for_session` are propagated unchanged.
pub fn find_log(repo_root: &Path, session: &str, branch_query: &str) -> Result<PathBuf, PawError> {
    let entries = logging::list_logs_for_session(repo_root, session)?;
    if entries.is_empty() {
        return Err(PawError::ReplayError(format!(
            "No logs found in session '{session}'."
        )));
    }

    let hit = entries.iter().find(|entry| {
        let filename = entry.path.file_name().unwrap_or_default().to_string_lossy();
        // Remove exactly one ".log" extension. `trim_end_matches` would strip
        // every repetition, turning "x.log.log" into "x" and letting a query
        // for "x" pick up a log that belongs to a different name.
        let stem = filename.strip_suffix(".log").unwrap_or(&*filename);
        entry.branch == branch_query || stem == branch_query || *filename == *branch_query
    });
    if let Some(entry) = hit {
        return Ok(entry.path.clone());
    }

    let available: Vec<String> = entries
        .iter()
        .map(|e| {
            let filename = e.path.file_name().unwrap_or_default().to_string_lossy();
            format!(" {filename} \u{2192} {}", e.branch)
        })
        .collect();
    Err(PawError::ReplayError(format!(
        "No log matching '{branch_query}' in session '{session}'. Available branches:\n{}",
        available.join("\n")
    )))
}
/// Writes the log at `log_path` to stdout with ANSI escape sequences removed.
///
/// The file is read as raw bytes and decoded lossily: any byte sequence that is
/// not valid UTF-8 is replaced with U+FFFD instead of failing the whole replay.
/// An empty file produces no output.
///
/// # Errors
///
/// Returns `PawError::ReplayError` when the file cannot be read or stdout
/// cannot be written.
pub fn replay_stripped(log_path: &Path) -> Result<(), PawError> {
    let raw = fs::read(log_path)
        .map_err(|e| PawError::ReplayError(format!("cannot read log file: {e}")))?;
    if raw.is_empty() {
        return Ok(());
    }

    // `fs::read_to_string` would reject the entire file on a single invalid
    // byte; lossy decoding keeps the rest of the log readable.
    let content = String::from_utf8_lossy(&raw);
    let stripped = strip_ansi(&content);

    std::io::stdout()
        .write_all(stripped.as_bytes())
        .map_err(|e| PawError::ReplayError(format!("cannot write to stdout: {e}")))?;
    Ok(())
}
/// Replays the log at `log_path` with its escape sequences left intact.
///
/// Reads the current `PATH` environment variable (if set) and hands it, together
/// with `log_path`, to `replay_colored_with_path`, whose result is returned as is.
pub fn replay_colored(log_path: &Path) -> Result<(), PawError> {
    let search_path = std::env::var_os("PATH");
    replay_colored_with_path(log_path, search_path.as_ref())
}
fn replay_colored_with_path(
log_path: &Path,
path: Option<&std::ffi::OsString>,
) -> Result<(), PawError> {
let content = fs::read(log_path)
.map_err(|e| PawError::ReplayError(format!("cannot read log file: {e}")))?;
if content.is_empty() {
return Ok(());
}
let less_available = if let Some(path_str) = path {
let less_path = Path::new(path_str).join("less");
less_path.exists()
} else {
which::which("less").is_ok()
};
if less_available {
let less_binary = if let Some(path_str) = path {
Path::new(path_str).join("less")
} else {
which::which("less").unwrap_or_default()
};
let mut child = std::process::Command::new(less_binary)
.arg("-R")
.stdin(std::process::Stdio::piped())
.spawn()
.map_err(|e| PawError::ReplayError(format!("cannot start less: {e}")))?;
if let Some(ref mut stdin) = child.stdin {
let _ = stdin.write_all(&content);
}
let _ = child.wait();
} else {
eprintln!("warning: 'less' not found, printing raw output");
std::io::stdout()
.write_all(&content)
.map_err(|e| PawError::ReplayError(format!("cannot write to stdout: {e}")))?;
}
Ok(())
}
/// Prints every log session and the log files it contains to stdout.
///
/// For each session a header line with the session name and its log count is
/// printed, followed by one line per log showing the file name and the branch
/// it belongs to. When there are no sessions a short hint is printed instead.
///
/// # Errors
///
/// Propagates any error returned by `logging::list_log_sessions` or
/// `logging::list_logs_for_session`.
pub fn display_list(repo_root: &Path) -> Result<(), PawError> {
    let sessions = logging::list_log_sessions(repo_root)?;
    if sessions.is_empty() {
        println!("No log sessions found. Start a session with logging enabled to capture output.");
        return Ok(());
    }

    for session in &sessions {
        let logs = logging::list_logs_for_session(repo_root, session)?;
        let count = logs.len();
        // Singular only for exactly one log; zero is plural as well.
        let label = match count {
            1 => "branch",
            _ => "branches",
        };
        println!("{session} ({} {label})", count);

        logs.iter().for_each(|log| {
            let filename = log.path.file_name().unwrap_or_default().to_string_lossy();
            println!(" {filename} \u{2192} {}", log.branch);
        });
    }

    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    // ---- strip_ansi: CSI handling ----

    #[test]
    fn plain_text_unchanged() {
        let text = "hello world";
        assert_eq!(strip_ansi(text), text);
    }

    #[test]
    fn sgr_removed() {
        let stripped = strip_ansi("\x1b[31mred text\x1b[0m");
        assert_eq!(stripped, "red text");
    }

    #[test]
    fn cursor_sequences_removed() {
        let stripped = strip_ansi("\x1b[Hhello\x1b[2J");
        assert_eq!(stripped, "hello");
    }

    #[test]
    fn multiple_sequences_per_line() {
        let stripped = strip_ansi("\x1b[1m\x1b[31mBold Red\x1b[0m Normal");
        assert_eq!(stripped, "Bold Red Normal");
    }

    #[test]
    fn incomplete_csi_at_end() {
        let stripped = strip_ansi("hello\x1b[");
        assert_eq!(stripped, "hello");
    }

    #[test]
    fn incomplete_esc_at_end() {
        let stripped = strip_ansi("hello\x1b");
        assert_eq!(stripped, "hello");
    }

    #[test]
    fn empty_input() {
        assert_eq!(strip_ansi(""), "");
    }

    #[test]
    fn cursor_movement_codes() {
        let stripped = strip_ansi("\x1b[5Aup\x1b[3Bdown");
        assert_eq!(stripped, "updown");
    }

    #[test]
    fn erase_line_codes() {
        let stripped = strip_ansi("\x1b[Ktext\x1b[2K");
        assert_eq!(stripped, "text");
    }

    #[test]
    fn complex_sgr_params() {
        let stripped = strip_ansi("\x1b[38;5;196mcolor\x1b[0m");
        assert_eq!(stripped, "color");
    }

    // ---- strip_ansi: OSC handling ----

    #[test]
    fn osc_title_bel_terminated() {
        let stripped = strip_ansi("\x1b]0;my title\x07hello");
        assert_eq!(stripped, "hello");
    }

    #[test]
    fn osc_title_st_terminated() {
        let stripped = strip_ansi("\x1b]0;my title\x1b\\hello");
        assert_eq!(stripped, "hello");
    }

    #[test]
    fn osc_file_uri() {
        let stripped = strip_ansi("\x1b]7;file:///Users/me/project\x07prompt$ ");
        assert_eq!(stripped, "prompt$ ");
    }

    #[test]
    fn osc_hyperlink() {
        let stripped = strip_ansi("\x1b]8;;https://example.com\x07click me\x1b]8;;\x07");
        assert_eq!(stripped, "click me");
    }

    #[test]
    fn osc_mixed_with_csi() {
        let stripped = strip_ansi("\x1b]0;title\x07\x1b[31mred\x1b[0m plain");
        assert_eq!(stripped, "red plain");
    }

    #[test]
    fn osc_incomplete_at_end() {
        let stripped = strip_ansi("hello\x1b]0;partial title");
        assert_eq!(stripped, "hello");
    }

    #[test]
    fn osc_incomplete_st_at_end() {
        let stripped = strip_ansi("hello\x1b]0;title\x1b");
        assert_eq!(stripped, "hello");
    }

    // ---- fixtures ----

    /// Creates `<root>/.git-paw/logs/<session>/` and writes one placeholder log
    /// per name in `files`.
    fn setup_log_dir(root: &Path, session: &str, files: &[&str]) {
        let session_dir = root.join(".git-paw").join("logs").join(session);
        fs::create_dir_all(&session_dir).unwrap();
        files
            .iter()
            .for_each(|name| fs::write(session_dir.join(name), "log content").unwrap());
    }

    // ---- resolve_session ----

    #[test]
    fn resolve_explicit_session_found() {
        let repo = TempDir::new().unwrap();
        setup_log_dir(repo.path(), "paw-test", &["main.log"]);

        let resolved = resolve_session(repo.path(), Some("paw-test")).unwrap();
        assert_eq!(resolved, "paw-test");
    }

    #[test]
    fn resolve_explicit_session_not_found() {
        let repo = TempDir::new().unwrap();
        setup_log_dir(repo.path(), "paw-test", &["main.log"]);

        let msg = resolve_session(repo.path(), Some("nope"))
            .unwrap_err()
            .to_string();
        assert!(msg.contains("nope"));
        assert!(msg.contains("--list"));
    }

    #[test]
    fn resolve_default_most_recent() {
        let repo = TempDir::new().unwrap();
        setup_log_dir(repo.path(), "paw-old", &["main.log"]);
        // Separate the two directory mtimes so the ordering is observable.
        std::thread::sleep(std::time::Duration::from_millis(50));
        setup_log_dir(repo.path(), "paw-new", &["main.log"]);

        let resolved = resolve_session(repo.path(), None).unwrap();
        assert_eq!(resolved, "paw-new");
    }

    #[test]
    fn resolve_no_sessions_error() {
        let repo = TempDir::new().unwrap();
        let failure = resolve_session(repo.path(), None).unwrap_err();
        assert!(failure.to_string().contains("No log sessions"));
    }

    #[test]
    fn resolve_session_no_logs_dir_mentions_logging() {
        let repo = TempDir::new().unwrap();
        let msg = resolve_session(repo.path(), None).unwrap_err().to_string();
        assert!(
            msg.contains("logging"),
            "error should mention logging not enabled, got: {msg}"
        );
    }

    // ---- find_log ----

    #[test]
    fn find_log_by_original_branch() {
        let repo = TempDir::new().unwrap();
        setup_log_dir(repo.path(), "s", &["feat--add-auth.log"]);

        let found = find_log(repo.path(), "s", "feat/add-auth");
        assert!(found.is_ok());
    }

    #[test]
    fn find_log_by_sanitized_name() {
        let repo = TempDir::new().unwrap();
        setup_log_dir(repo.path(), "s", &["feat--add-auth.log"]);

        let found = find_log(repo.path(), "s", "feat--add-auth");
        assert!(found.is_ok());
    }

    #[test]
    fn find_log_no_match_lists_available() {
        let repo = TempDir::new().unwrap();
        setup_log_dir(repo.path(), "s", &["main.log", "feat--auth.log"]);

        let msg = find_log(repo.path(), "s", "nonexistent")
            .unwrap_err()
            .to_string();
        assert!(msg.contains("nonexistent"));
        assert!(msg.contains("main"));
    }

    // ---- display_list ----

    #[test]
    fn display_list_no_sessions() {
        let repo = TempDir::new().unwrap();
        let outcome = display_list(repo.path());
        assert!(outcome.is_ok());
    }

    #[test]
    fn display_list_with_sessions() {
        let repo = TempDir::new().unwrap();
        setup_log_dir(repo.path(), "paw-proj", &["main.log", "feat--x.log"]);

        let outcome = display_list(repo.path());
        assert!(outcome.is_ok());
    }

    // ---- replay_stripped / replay_colored ----

    #[test]
    fn replay_stripped_empty_file() {
        let dir = TempDir::new().unwrap();
        let log_file = dir.path().join("empty.log");
        fs::write(&log_file, "").unwrap();

        assert!(replay_stripped(&log_file).is_ok());
    }

    #[test]
    fn replay_stripped_strips_ansi() {
        let dir = TempDir::new().unwrap();
        let log_file = dir.path().join("colored.log");
        fs::write(&log_file, "\x1b[31mred\x1b[0m plain").unwrap();

        assert!(replay_stripped(&log_file).is_ok());
    }

    #[test]
    fn replay_colored_succeeds_with_ansi_content() {
        let dir = TempDir::new().unwrap();
        let log_file = dir.path().join("colored.log");
        fs::write(&log_file, "\x1b[31mred text\x1b[0m and plain").unwrap();

        // On Unix a stand-in `less` that exits immediately is placed in the temp
        // directory, which is then used as the search path.
        let mock_less = dir.path().join("less");
        #[cfg(unix)]
        {
            use std::os::unix::fs::PermissionsExt;
            fs::write(&mock_less, "#!/bin/sh\nexit 0").unwrap();
            let mut mode = fs::metadata(&mock_less).unwrap().permissions();
            mode.set_mode(0o755);
            fs::set_permissions(&mock_less, mode).unwrap();
        }
        #[cfg(not(unix))]
        {
            if which::which("less").is_err() {
                return;
            }
        }

        let search_path = dir.path().as_os_str().to_os_string();
        let result = replay_colored_with_path(&log_file, Some(&search_path));
        assert!(
            result.is_ok(),
            "replay_colored should succeed, got: {result:?}"
        );
    }

    #[test]
    fn replay_colored_empty_file_succeeds() {
        let dir = TempDir::new().unwrap();
        let log_file = dir.path().join("empty.log");
        fs::write(&log_file, "").unwrap();

        assert!(replay_colored(&log_file).is_ok());
    }
}