use super::events::FileEvent;
use super::git_poller::{GitPoller, GitPollerConfig, GitPollerError};
use std::path::PathBuf;
use std::time::Duration;
use thiserror::Error;
use tokio::sync::{mpsc, watch};
use tokio::task::JoinHandle;
use tracing::{debug, info};
/// Errors reported by [`FileWatcher`].
#[derive(Debug, Error)]
pub enum WatcherError {
    /// The poller reported that the given path is not a git repository.
    #[error("not a git repository: {0}")]
    NotGitRepository(PathBuf),

    /// Any other failure coming from the underlying git poller.
    ///
    /// The `#[from]` attribute lets `?` convert a [`GitPollerError`] into this variant.
    #[error("git poller error: {0}")]
    GitPollerError(#[from] GitPollerError),

    /// The spawned poller task could not be joined; carries the join error's message.
    #[error("watcher task failed: {0}")]
    TaskFailed(String),

    /// A watcher is already running.
    ///
    /// NOTE(review): not constructed anywhere in the visible code — confirm it is still needed.
    #[error("watcher is already running")]
    AlreadyRunning,
}
/// Tunable settings for [`FileWatcher`].
///
/// All durations are expressed in milliseconds; the conversion into
/// `GitPollerConfig` turns the relevant ones into `Duration` values.
#[derive(Debug, Clone)]
pub struct WatcherConfig {
    /// Debounce window, in milliseconds.
    ///
    /// NOTE(review): the `GitPollerConfig` conversion in this file does not
    /// forward this field — confirm where (or whether) it is consumed.
    pub debounce_ms: u64,

    /// Forwarded to the poller as its `channel_capacity`.
    pub channel_capacity: usize,

    /// Interval between polls, in milliseconds.
    pub poll_interval_ms: u64,

    /// Forwarded to the poller as its `include_untracked` flag.
    pub include_untracked: bool,

    /// Forwarded to the poller as its `detect_renames` flag.
    pub detect_renames: bool,

    /// Timeout handed to the poller as `git_timeout`, in milliseconds.
    pub git_timeout_ms: u64,
}
impl Default for WatcherConfig {
fn default() -> Self {
Self {
debounce_ms: 500, channel_capacity: 1000,
poll_interval_ms: 3000,
include_untracked: true,
detect_renames: true,
git_timeout_ms: 10000,
}
}
}
impl From<&WatcherConfig> for GitPollerConfig {
    /// Builds the poller configuration from the watcher settings.
    ///
    /// The millisecond fields become `Duration`s, the flags and the channel
    /// capacity are copied over, and every other `GitPollerConfig` field keeps
    /// its default. `debounce_ms` is not read here.
    fn from(watcher: &WatcherConfig) -> Self {
        let poll_interval = Duration::from_millis(watcher.poll_interval_ms);
        let git_timeout = Duration::from_millis(watcher.git_timeout_ms);

        Self {
            poll_interval,
            include_untracked: watcher.include_untracked,
            detect_renames: watcher.detect_renames,
            git_timeout,
            channel_capacity: watcher.channel_capacity,
            ..Self::default()
        }
    }
}
/// Handle to a background git poller watching one repository.
///
/// Dropping the watcher sends the shutdown signal but does not wait for the
/// poller task to finish.
pub struct FileWatcher {
    /// Path the watcher was created for.
    root: PathBuf,

    /// Settings the watcher was created with.
    config: WatcherConfig,

    /// Shutdown signal sender; `None` once `stop` has been called.
    shutdown_tx: Option<watch::Sender<bool>>,

    /// Join handle of the spawned poller task; `None` after `stop_and_wait`
    /// has joined the task.
    poller_handle: Option<JoinHandle<Result<(), GitPollerError>>>,
}
impl FileWatcher {
pub fn new(
path: PathBuf,
config: WatcherConfig,
) -> Result<(Self, mpsc::Receiver<FileEvent>), WatcherError> {
let poller_config = GitPollerConfig::from(&config);
let (poller, event_rx, shutdown_tx) =
GitPoller::new(path.clone(), poller_config).map_err(|e| match e {
GitPollerError::NotGitRepository { path } => WatcherError::NotGitRepository(path),
other => WatcherError::GitPollerError(other),
})?;
let handle = tokio::spawn(async move {
let mut poller = poller;
poller.run().await
});
let watcher = Self {
root: path,
config,
shutdown_tx: Some(shutdown_tx),
poller_handle: Some(handle),
};
info!(
"Created git polling watcher for: {}",
watcher.root.display()
);
Ok((watcher, event_rx))
}
pub fn watch(&mut self, path: &std::path::Path) -> Result<(), WatcherError> {
debug!(
"watch() called for {} (polling already active for {})",
path.display(),
self.root.display()
);
if path != self.root {
debug!(
"Note: watch path {} differs from root {}",
path.display(),
self.root.display()
);
}
Ok(())
}
pub fn stop(&mut self) -> Result<(), WatcherError> {
if let Some(shutdown_tx) = self.shutdown_tx.take() {
debug!("Sending shutdown signal to git poller");
let _ = shutdown_tx.send(true);
}
Ok(())
}
pub async fn stop_and_wait(&mut self) -> Result<(), WatcherError> {
self.stop()?;
if let Some(handle) = self.poller_handle.take() {
match handle.await {
Ok(Ok(())) => {
debug!("Git poller stopped successfully");
}
Ok(Err(e)) => {
debug!("Git poller stopped with error: {}", e);
}
Err(e) => {
return Err(WatcherError::TaskFailed(e.to_string()));
}
}
}
Ok(())
}
pub fn root(&self) -> &PathBuf {
&self.root
}
pub fn config(&self) -> &WatcherConfig {
&self.config
}
}
impl Drop for FileWatcher {
    /// Sends the shutdown signal on a best-effort basis when the watcher goes
    /// out of scope; the poller task itself is not awaited here.
    fn drop(&mut self) {
        // Nothing useful can be done with a failure while dropping.
        self.stop().ok();
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Runs `git <args>` inside `dir`.
    ///
    /// Panics when the command cannot be spawned or exits unsuccessfully, so a
    /// broken fixture fails at setup instead of surfacing later as an
    /// unrelated watcher error.
    fn run_git(dir: &std::path::Path, args: &[&str]) {
        let output = std::process::Command::new("git")
            .args(args)
            .current_dir(dir)
            .output()
            .unwrap();
        assert!(
            output.status.success(),
            "git {:?} failed: {}",
            args,
            String::from_utf8_lossy(&output.stderr)
        );
    }

    /// Creates a temporary directory holding a freshly initialised git
    /// repository with a test user identity configured.
    fn create_temp_git_repo() -> TempDir {
        let dir = TempDir::new().unwrap();
        run_git(dir.path(), &["init"]);
        run_git(dir.path(), &["config", "user.email", "test@test.com"]);
        run_git(dir.path(), &["config", "user.name", "Test"]);
        dir
    }

    /// The default configuration exposes the documented stock values.
    #[test]
    fn test_watcher_config_default() {
        let config = WatcherConfig::default();
        assert_eq!(config.debounce_ms, 500);
        assert_eq!(config.channel_capacity, 1000);
        assert_eq!(config.poll_interval_ms, 3000);
        assert!(config.include_untracked);
        assert!(config.detect_renames);
        assert_eq!(config.git_timeout_ms, 10000);
    }

    /// Every forwarded field reaches the poller configuration unchanged.
    #[test]
    fn test_watcher_config_to_git_poller_config() {
        let config = WatcherConfig {
            debounce_ms: 100,
            channel_capacity: 500,
            poll_interval_ms: 5000,
            include_untracked: false,
            detect_renames: false,
            git_timeout_ms: 30000,
        };
        let poller_config = GitPollerConfig::from(&config);
        assert_eq!(poller_config.poll_interval, Duration::from_millis(5000));
        assert!(!poller_config.include_untracked);
        assert!(!poller_config.detect_renames);
        assert_eq!(poller_config.git_timeout, Duration::from_millis(30000));
        assert_eq!(poller_config.channel_capacity, 500);
    }

    /// A watcher can be created for a real repository and shut down cleanly.
    #[tokio::test]
    async fn test_new_valid_repo() {
        let dir = create_temp_git_repo();
        let config = WatcherConfig::default();
        let result = FileWatcher::new(dir.path().to_path_buf(), config);
        assert!(result.is_ok());
        let (mut watcher, _rx) = result.unwrap();
        watcher.stop_and_wait().await.unwrap();
    }

    /// A plain directory is rejected with `NotGitRepository`.
    #[tokio::test]
    async fn test_new_invalid_repo() {
        let dir = TempDir::new().unwrap();
        let config = WatcherConfig::default();
        let result = FileWatcher::new(dir.path().to_path_buf(), config);
        assert!(matches!(result, Err(WatcherError::NotGitRepository(_))));
    }

    /// `watch` succeeds without altering the running watcher.
    #[tokio::test]
    async fn test_watch_is_noop() {
        let dir = create_temp_git_repo();
        let config = WatcherConfig::default();
        let (mut watcher, _rx) = FileWatcher::new(dir.path().to_path_buf(), config).unwrap();
        let result = watcher.watch(dir.path());
        assert!(result.is_ok());
        watcher.stop_and_wait().await.unwrap();
    }

    /// Writes a new file into the repository and inspects the first event.
    #[tokio::test]
    async fn test_detects_new_file() {
        let dir = create_temp_git_repo();
        let config = WatcherConfig {
            poll_interval_ms: 100,
            ..Default::default()
        };
        let (mut watcher, mut rx) = FileWatcher::new(dir.path().to_path_buf(), config).unwrap();
        // Let at least one poll happen before the file appears.
        tokio::time::sleep(Duration::from_millis(150)).await;
        std::fs::write(dir.path().join("test.txt"), "hello").unwrap();
        // Give the poller time to notice the new file.
        tokio::time::sleep(Duration::from_millis(200)).await;
        let event = tokio::time::timeout(Duration::from_millis(100), rx.recv()).await;
        // NOTE(review): the assertion only runs when a `Modified` event arrives
        // within the timeout; no event, or any other variant, lets the test
        // pass. Tighten once the variant emitted for new files is confirmed.
        if let Ok(Some(FileEvent::Modified(path))) = event {
            assert_eq!(path.file_name().unwrap(), "test.txt");
        }
        watcher.stop_and_wait().await.unwrap();
    }
}