use std::path::PathBuf;
use std::time::Duration;
use thiserror::Error;
use tokio::process::Command;
use tokio::sync::{mpsc, watch};
use tracing::{debug, info, warn};
use super::events::FileEvent;
use super::git_state::{GitState, GitStateError};
/// Tunable settings for a `GitPoller`.
#[derive(Debug, Clone)]
pub struct GitPollerConfig {
    /// Period of the polling timer used by `GitPoller::run`.
    pub poll_interval: Duration,
    /// When `true`, `git status` is invoked with `-u`.
    pub include_untracked: bool,
    /// When `true`, `git status` is invoked with `-M`.
    pub detect_renames: bool,
    /// Upper bound on how long a single asynchronous git invocation may take.
    pub git_timeout: Duration,
    /// Maximum number of poll attempts made per polling cycle.
    pub retry_count: u32,
    /// Pause between two consecutive poll attempts within a cycle.
    pub retry_delay: Duration,
    /// Capacity of the bounded event channel created by `GitPoller::new`.
    pub channel_capacity: usize,
}

impl Default for GitPollerConfig {
    /// Returns the stock configuration: a 3 s poll interval, a 10 s git
    /// timeout, three attempts spaced 1 s apart, a 1000-slot event channel,
    /// and both untracked-file reporting and rename detection switched on.
    fn default() -> Self {
        const POLL_INTERVAL: Duration = Duration::from_secs(3);
        const GIT_TIMEOUT: Duration = Duration::from_secs(10);
        const RETRY_DELAY: Duration = Duration::from_secs(1);
        const RETRY_COUNT: u32 = 3;
        const CHANNEL_CAPACITY: usize = 1000;

        GitPollerConfig {
            poll_interval: POLL_INTERVAL,
            include_untracked: true,
            detect_renames: true,
            git_timeout: GIT_TIMEOUT,
            retry_count: RETRY_COUNT,
            retry_delay: RETRY_DELAY,
            channel_capacity: CHANNEL_CAPACITY,
        }
    }
}
/// Errors reported while constructing or running a `GitPoller`.
#[derive(Debug, Error)]
pub enum GitPollerError {
/// A git subprocess could not be started, or it exited unsuccessfully;
/// `stderr` carries the spawn error text or the captured standard error.
#[error("git command failed: {stderr}")]
GitExecutionError { stderr: String },
/// A git subprocess did not finish within the configured `git_timeout`.
#[error("git command timed out after {timeout:?}")]
GitTimeout { timeout: Duration },
/// `path` is not inside a git repository (as reported by git itself).
#[error("not a git repository: {path}")]
NotGitRepository { path: PathBuf },
/// `git status` failed with a message mentioning `index.lock`, i.e. another
/// git operation currently holds the index.
#[error("git operation in progress (index.lock exists)")]
GitOperationInProgress,
/// The status output could not be interpreted. `line` holds the offending
/// input (or path); note that only `reason` appears in the display message.
#[error("failed to parse git status: {reason}")]
ParseError { line: String, reason: String },
/// The receiving half of the event channel has been dropped.
#[error("event channel closed")]
ChannelClosed,
/// Wrapper for `std::io::Error`, convertible via `?` thanks to `#[from]`.
#[error("io error: {0}")]
IoError(#[from] std::io::Error),
}
impl From<GitStateError> for GitPollerError {
fn from(err: GitStateError) -> Self {
match err {
GitStateError::InvalidPath { path, reason } => GitPollerError::ParseError {
line: path.display().to_string(),
reason,
},
GitStateError::ParseError { line, reason } => {
GitPollerError::ParseError { line, reason }
}
}
}
}
/// Polls a git working tree and publishes the detected changes as
/// `FileEvent`s on a bounded channel.
pub struct GitPoller {
// Directory in which every git command is executed; also the base used to
// turn event paths into absolute paths.
root: PathBuf,
// Settings supplied at construction time.
config: GitPollerConfig,
// State parsed from the most recent successful `git status`; the next
// status is diffed against it.
previous_state: GitState,
// Output of the last successful `git rev-parse HEAD`, `None` until one
// succeeds.
previous_head: Option<String>,
// Sending half of the event channel handed out by `new`.
event_tx: mpsc::Sender<FileEvent>,
// Receives the shutdown flag; `run` returns once it observes `true`.
shutdown_rx: watch::Receiver<bool>,
}
impl GitPoller {
pub fn new(
root: PathBuf,
config: GitPollerConfig,
) -> Result<(Self, mpsc::Receiver<FileEvent>, watch::Sender<bool>), GitPollerError> {
let output = std::process::Command::new("git")
.args(["rev-parse", "--git-dir"])
.current_dir(&root)
.output()
.map_err(|e| GitPollerError::GitExecutionError {
stderr: e.to_string(),
})?;
if !output.status.success() {
return Err(GitPollerError::NotGitRepository { path: root });
}
let (event_tx, event_rx) = mpsc::channel(config.channel_capacity);
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let poller = Self {
root,
config,
previous_state: GitState::default(),
previous_head: None,
event_tx,
shutdown_rx,
};
Ok((poller, event_rx, shutdown_tx))
}
pub async fn run(&mut self) -> Result<(), GitPollerError> {
let mut interval = tokio::time::interval(self.config.poll_interval);
loop {
tokio::select! {
_ = interval.tick() => {
match self.poll_with_retry().await {
Ok(events) => {
for event in events {
let absolute_event = self.make_event_absolute(event);
if self.event_tx.send(absolute_event).await.is_err() {
return Err(GitPollerError::ChannelClosed);
}
}
}
Err(GitPollerError::GitOperationInProgress) => {
debug!("git operation in progress, skipping poll cycle");
}
Err(e) => {
warn!("git poll error: {}", e);
}
}
}
_ = self.shutdown_rx.changed() => {
if *self.shutdown_rx.borrow() {
info!("git poller shutting down");
return Ok(());
}
}
}
}
}
pub async fn poll_once(&mut self) -> Result<Vec<FileEvent>, GitPollerError> {
let events = self.poll_once_inner().await?;
debug!("poll_once: {} events detected", events.len());
Ok(events)
}
async fn poll_with_retry(&mut self) -> Result<Vec<FileEvent>, GitPollerError> {
let mut last_error = None;
for attempt in 0..self.config.retry_count {
match self.poll_once_inner().await {
Ok(events) => return Ok(events),
Err(GitPollerError::GitOperationInProgress) => {
return Err(GitPollerError::GitOperationInProgress);
}
Err(e) => {
debug!("poll attempt {} failed: {}", attempt + 1, e);
last_error = Some(e);
if attempt < self.config.retry_count - 1 {
tokio::time::sleep(self.config.retry_delay).await;
}
}
}
}
Err(last_error.unwrap_or(GitPollerError::GitExecutionError {
stderr: "unknown error after retries".to_string(),
}))
}
async fn poll_once_inner(&mut self) -> Result<Vec<FileEvent>, GitPollerError> {
let mut events = Vec::new();
if let Ok(current_head) = self.get_head().await {
if let Some(ref prev_head) = self.previous_head {
if ¤t_head != prev_head {
debug!(
"HEAD changed from {} to {}",
&prev_head[..8.min(prev_head.len())],
¤t_head[..8.min(current_head.len())]
);
match self.get_commit_changes(prev_head, ¤t_head).await {
Ok(changed_files) => {
for file in changed_files {
events.push(FileEvent::Modified(file));
}
}
Err(e) => {
warn!("failed to get commit changes: {}", e);
}
}
}
}
self.previous_head = Some(current_head);
}
let output = self.run_git_status().await?;
let new_state = GitState::from_git_status(&output)?;
let status_events = self.previous_state.diff(&new_state);
self.previous_state = new_state;
events.extend(status_events);
Ok(events)
}
async fn get_head(&self) -> Result<String, GitPollerError> {
let output = tokio::time::timeout(
self.config.git_timeout,
Command::new("git")
.args(["rev-parse", "HEAD"])
.current_dir(&self.root)
.output(),
)
.await
.map_err(|_| GitPollerError::GitTimeout {
timeout: self.config.git_timeout,
})?
.map_err(|e| GitPollerError::GitExecutionError {
stderr: e.to_string(),
})?;
if !output.status.success() {
return Err(GitPollerError::GitExecutionError {
stderr: String::from_utf8_lossy(&output.stderr).to_string(),
});
}
Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
}
async fn get_commit_changes(
&self,
old: &str,
new: &str,
) -> Result<Vec<PathBuf>, GitPollerError> {
let output = tokio::time::timeout(
self.config.git_timeout,
Command::new("git")
.args(["diff", "--name-only", &format!("{}..{}", old, new)])
.current_dir(&self.root)
.output(),
)
.await
.map_err(|_| GitPollerError::GitTimeout {
timeout: self.config.git_timeout,
})?
.map_err(|e| GitPollerError::GitExecutionError {
stderr: e.to_string(),
})?;
if !output.status.success() {
debug!(
"git diff failed (old commit may not exist): {}",
String::from_utf8_lossy(&output.stderr)
);
return Ok(vec![]);
}
let files = String::from_utf8_lossy(&output.stdout)
.lines()
.filter(|line| !line.is_empty())
.map(|line| PathBuf::from(line.trim()))
.collect();
Ok(files)
}
async fn run_git_status(&self) -> Result<String, GitPollerError> {
let mut cmd = Command::new("git");
cmd.args(["status", "--porcelain"]);
if self.config.detect_renames {
cmd.arg("-M");
}
if self.config.include_untracked {
cmd.arg("-u");
}
cmd.current_dir(&self.root);
let output = tokio::time::timeout(self.config.git_timeout, cmd.output())
.await
.map_err(|_| GitPollerError::GitTimeout {
timeout: self.config.git_timeout,
})?
.map_err(|e| GitPollerError::GitExecutionError {
stderr: e.to_string(),
})?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr).to_string();
if stderr.contains("not a git repository") {
return Err(GitPollerError::NotGitRepository {
path: self.root.clone(),
});
}
if stderr.contains(".git/index.lock") || stderr.contains("index.lock") {
return Err(GitPollerError::GitOperationInProgress);
}
return Err(GitPollerError::GitExecutionError { stderr });
}
Ok(String::from_utf8_lossy(&output.stdout).to_string())
}
pub fn root(&self) -> &PathBuf {
&self.root
}
pub fn config(&self) -> &GitPollerConfig {
&self.config
}
pub fn stats(&self) -> GitPollerStats {
GitPollerStats {
tracked_files: self.previous_state.len(),
}
}
fn make_event_absolute(&self, event: FileEvent) -> FileEvent {
match event {
FileEvent::Modified(path) => FileEvent::Modified(self.root.join(path)),
FileEvent::Deleted(path) => FileEvent::Deleted(self.root.join(path)),
FileEvent::Renamed(old, new) => {
FileEvent::Renamed(self.root.join(old), self.root.join(new))
}
}
}
}
/// Statistics snapshot returned by `GitPoller::stats`.
#[derive(Debug, Clone)]
pub struct GitPollerStats {
/// Length of the poller's most recently recorded status state.
pub tracked_files: usize,
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Runs `git <args>` inside `dir`. Only a failure to launch git panics;
    /// the exit status is deliberately not inspected.
    fn git(dir: &std::path::Path, args: &[&str]) {
        std::process::Command::new("git")
            .args(args)
            .current_dir(dir)
            .output()
            .unwrap();
    }

    /// Writes `contents` to `dir/name`, stages the file and commits it.
    fn commit_file(dir: &std::path::Path, name: &str, contents: &str, message: &str) {
        std::fs::write(dir.join(name), contents).unwrap();
        git(dir, &["add", name]);
        git(dir, &["commit", "-m", message]);
    }

    /// Fresh temporary repository with a test identity configured.
    fn create_temp_git_repo() -> TempDir {
        let dir = TempDir::new().unwrap();
        git(dir.path(), &["init"]);
        git(dir.path(), &["config", "user.email", "test@test.com"]);
        git(dir.path(), &["config", "user.name", "Test"]);
        dir
    }

    /// Poller with the default configuration rooted at `dir`.
    fn new_poller(dir: &TempDir) -> (GitPoller, mpsc::Receiver<FileEvent>, watch::Sender<bool>) {
        GitPoller::new(dir.path().to_path_buf(), GitPollerConfig::default()).unwrap()
    }

    /// True when `events` holds a `Modified` event whose file name is `name`.
    fn has_modified(events: &[FileEvent], name: &str) -> bool {
        events.iter().any(|event| match event {
            FileEvent::Modified(path) => path.file_name().unwrap() == name,
            _ => false,
        })
    }

    #[test]
    fn test_config_default() {
        let GitPollerConfig {
            poll_interval,
            include_untracked,
            detect_renames,
            git_timeout,
            retry_count,
            retry_delay,
            channel_capacity,
        } = GitPollerConfig::default();
        assert_eq!(poll_interval, Duration::from_secs(3));
        assert!(include_untracked);
        assert!(detect_renames);
        assert_eq!(git_timeout, Duration::from_secs(10));
        assert_eq!(retry_count, 3);
        assert_eq!(retry_delay, Duration::from_secs(1));
        assert_eq!(channel_capacity, 1000);
    }

    #[test]
    fn test_new_valid_repo() {
        let dir = create_temp_git_repo();
        let outcome = GitPoller::new(dir.path().to_path_buf(), GitPollerConfig::default());
        assert!(outcome.is_ok());
    }

    #[test]
    fn test_new_invalid_repo() {
        let dir = TempDir::new().unwrap();
        let outcome = GitPoller::new(dir.path().to_path_buf(), GitPollerConfig::default());
        assert!(matches!(
            outcome,
            Err(GitPollerError::NotGitRepository { .. })
        ));
    }

    #[tokio::test]
    async fn test_poll_once_empty_repo() {
        let dir = create_temp_git_repo();
        let (mut poller, _rx, _shutdown) = new_poller(&dir);
        let events = poller.poll_once().await.unwrap();
        assert!(events.is_empty());
    }

    #[tokio::test]
    async fn test_poll_once_detects_new_file() {
        let dir = create_temp_git_repo();
        let (mut poller, _rx, _shutdown) = new_poller(&dir);
        // Establish the baseline before the file appears.
        let _ = poller.poll_once().await.unwrap();

        std::fs::write(dir.path().join("test.txt"), "hello").unwrap();

        let events = poller.poll_once().await.unwrap();
        assert_eq!(events.len(), 1);
        if let FileEvent::Modified(path) = &events[0] {
            assert_eq!(path.file_name().unwrap(), "test.txt");
        } else {
            panic!("Expected Modified event");
        }
    }

    #[tokio::test]
    async fn test_poll_once_detects_modification() {
        let dir = create_temp_git_repo();
        commit_file(dir.path(), "test.txt", "initial", "initial");
        let (mut poller, _rx, _shutdown) = new_poller(&dir);
        let _ = poller.poll_once().await.unwrap();

        std::fs::write(dir.path().join("test.txt"), "modified").unwrap();

        let events = poller.poll_once().await.unwrap();
        assert_eq!(events.len(), 1);
        if let FileEvent::Modified(path) = &events[0] {
            assert_eq!(path.file_name().unwrap(), "test.txt");
        } else {
            panic!("Expected Modified event");
        }
    }

    #[tokio::test]
    async fn test_poll_once_detects_deletion() {
        let dir = create_temp_git_repo();
        let target = dir.path().join("test.txt");
        std::fs::write(&target, "content").unwrap();
        let (mut poller, _rx, _shutdown) = new_poller(&dir);
        let _ = poller.poll_once().await.unwrap();

        std::fs::remove_file(&target).unwrap();

        let events = poller.poll_once().await.unwrap();
        assert_eq!(events.len(), 1);
        if let FileEvent::Deleted(path) = &events[0] {
            assert_eq!(path.file_name().unwrap(), "test.txt");
        } else {
            panic!("Expected Deleted event");
        }
    }

    #[tokio::test]
    async fn test_shutdown_signal() {
        let dir = create_temp_git_repo();
        let fast_config = GitPollerConfig {
            poll_interval: Duration::from_millis(50),
            ..Default::default()
        };
        let (mut poller, _rx, shutdown_tx) =
            GitPoller::new(dir.path().to_path_buf(), fast_config).unwrap();

        let task = tokio::spawn(async move { poller.run().await });
        tokio::time::sleep(Duration::from_millis(100)).await;
        shutdown_tx.send(true).unwrap();

        let joined = tokio::time::timeout(Duration::from_secs(1), task)
            .await
            .expect("poller should stop within timeout");
        assert!(joined.is_ok());
        assert!(joined.unwrap().is_ok());
    }

    #[test]
    fn test_stats() {
        let dir = create_temp_git_repo();
        let (poller, _rx, _shutdown) = new_poller(&dir);
        assert_eq!(poller.stats().tracked_files, 0);
    }

    #[test]
    fn test_error_conversion_from_git_state_error() {
        let converted = GitPollerError::from(GitStateError::InvalidPath {
            path: PathBuf::from("/test"),
            reason: "test reason".to_string(),
        });
        assert!(matches!(converted, GitPollerError::ParseError { .. }));
    }

    #[tokio::test]
    async fn test_detects_file_created_and_committed_within_poll_interval() {
        let dir = create_temp_git_repo();
        commit_file(dir.path(), "initial.txt", "initial", "initial commit");
        let (mut poller, _rx, _shutdown) = new_poller(&dir);
        let _ = poller.poll_once().await.unwrap();

        // Create and commit between two polls: status alone would miss it.
        commit_file(dir.path(), "new_file.txt", "new content", "add new file");

        let events = poller.poll_once().await.unwrap();
        assert!(
            !events.is_empty(),
            "Expected events for committed file, got none"
        );
        assert!(
            has_modified(&events, "new_file.txt"),
            "Expected Modified event for new_file.txt, got: {:?}",
            events
        );
    }

    #[tokio::test]
    async fn test_detects_file_modified_and_committed_within_poll_interval() {
        let dir = create_temp_git_repo();
        commit_file(dir.path(), "test.txt", "initial", "initial commit");
        let (mut poller, _rx, _shutdown) = new_poller(&dir);
        let _ = poller.poll_once().await.unwrap();

        commit_file(dir.path(), "test.txt", "modified content", "modify file");

        let events = poller.poll_once().await.unwrap();
        assert!(
            has_modified(&events, "test.txt"),
            "Expected Modified event for test.txt, got: {:?}",
            events
        );
    }

    #[tokio::test]
    async fn test_multiple_commits_within_poll_interval() {
        let dir = create_temp_git_repo();
        commit_file(dir.path(), "initial.txt", "initial", "initial");
        let (mut poller, _rx, _shutdown) = new_poller(&dir);
        let _ = poller.poll_once().await.unwrap();

        commit_file(dir.path(), "file1.txt", "content1", "add file1");
        commit_file(dir.path(), "file2.txt", "content2", "add file2");

        let events = poller.poll_once().await.unwrap();
        let mut file_names: Vec<String> = Vec::new();
        for event in &events {
            if let FileEvent::Modified(path) = event {
                if let Some(name) = path.file_name() {
                    file_names.push(name.to_string_lossy().to_string());
                }
            }
        }
        for expected in ["file1.txt", "file2.txt"] {
            assert!(
                file_names.contains(&expected.to_string()),
                "Expected {} in events: {:?}",
                expected,
                file_names
            );
        }
    }

    #[tokio::test]
    async fn test_empty_repo_no_head_tracking() {
        let dir = create_temp_git_repo();
        let (mut poller, _rx, _shutdown) = new_poller(&dir);
        // Without any commit there is no HEAD; both polls must stay silent.
        for _ in 0..2 {
            let events = poller.poll_once().await.unwrap();
            assert!(events.is_empty());
        }
    }

    #[tokio::test]
    async fn test_first_poll_records_head_no_diff() {
        let dir = create_temp_git_repo();
        commit_file(dir.path(), "initial.txt", "content", "initial");
        let (mut poller, _rx, _shutdown) = new_poller(&dir);

        let events = poller.poll_once().await.unwrap();
        assert!(
            events.is_empty(),
            "Expected no events on first poll, got: {:?}",
            events
        );
        assert!(poller.previous_head.is_some());
    }

    #[tokio::test]
    async fn test_head_unchanged_no_extra_events() {
        let dir = create_temp_git_repo();
        commit_file(dir.path(), "initial.txt", "content", "initial");
        let (mut poller, _rx, _shutdown) = new_poller(&dir);
        let _ = poller.poll_once().await.unwrap();

        std::fs::write(dir.path().join("untracked.txt"), "untracked").unwrap();

        let events = poller.poll_once().await.unwrap();
        assert_eq!(events.len(), 1, "Expected 1 event, got: {:?}", events);
        if let FileEvent::Modified(path) = &events[0] {
            assert_eq!(path.file_name().unwrap(), "untracked.txt");
        } else {
            panic!("Expected Modified event for untracked.txt");
        }
    }
}