use std::fs;
use std::time::UNIX_EPOCH;
use async_trait::async_trait;
use tokio::sync::{mpsc, oneshot};
use walkdir::WalkDir;
use crate::backend::{Backend, SourceEvent, WatchHandle};
use crate::error::{AgentdirError, Result};
use crate::types::{EntryType, SourceMetadata, SourcePath};
pub struct LocalBackend;
#[async_trait]
impl Backend for LocalBackend {
async fn scan(&self, root: &SourcePath) -> Result<Vec<(SourcePath, SourceMetadata)>> {
let root_path = root.as_path().to_path_buf();
tokio::task::spawn_blocking(move || {
let mut results = Vec::new();
for entry in WalkDir::new(&root_path)
.follow_links(false) .into_iter()
.filter_map(|e| e.ok())
{
let path = entry.path().to_path_buf();
let metadata = fs::symlink_metadata(&path)?;
let mtime_ns = metadata
.modified()
.ok()
.and_then(|t| t.duration_since(UNIX_EPOCH).ok())
.map(|d| d.as_nanos())
.unwrap_or(0);
if metadata.is_symlink() {
continue;
}
let entry_type = if metadata.is_dir() {
EntryType::Directory
} else {
EntryType::File
};
let source_meta = SourceMetadata {
mtime_ns,
size_bytes: metadata.len(),
entry_type,
};
results.push((SourcePath::new(path), source_meta));
}
Ok(results)
})
.await
.map_err(|e| AgentdirError::Io(std::io::Error::other(e)))?
}
async fn metadata(&self, path: &SourcePath) -> Result<SourceMetadata> {
let path = path.as_path().to_path_buf();
tokio::task::spawn_blocking(move || {
let metadata = fs::symlink_metadata(&path)?;
let mtime_ns = metadata
.modified()
.ok()
.and_then(|t| t.duration_since(UNIX_EPOCH).ok())
.map(|d| d.as_nanos())
.unwrap_or(0);
let entry_type = if metadata.is_symlink() {
EntryType::File
} else if metadata.is_dir() {
EntryType::Directory
} else {
EntryType::File
};
Ok(SourceMetadata {
mtime_ns,
size_bytes: metadata.len(),
entry_type,
})
})
.await
.map_err(|e| AgentdirError::Io(std::io::Error::other(e)))?
}
async fn read_bytes(&self, path: &SourcePath) -> Result<Vec<u8>> {
let path = path.as_path().to_path_buf();
tokio::task::spawn_blocking(move || fs::read(&path).map_err(AgentdirError::Io))
.await
.map_err(|e| AgentdirError::Io(std::io::Error::other(e)))?
}
async fn watch(
&self,
roots: &[SourcePath],
tx: mpsc::Sender<SourceEvent>,
) -> Result<WatchHandle> {
use notify::RecursiveMode;
use notify_debouncer_full::new_debouncer;
use std::time::Duration;
let tx_clone = tx.clone();
let roots_owned: Vec<std::path::PathBuf> =
roots.iter().map(|r| r.as_path().to_path_buf()).collect();
let cancel_token = tokio_util::sync::CancellationToken::new();
let cancel_child = cancel_token.child_token();
let (ready_tx, ready_rx) = oneshot::channel();
std::thread::spawn(move || {
let (debounce_tx, debounce_rx) = std::sync::mpsc::channel();
let mut debouncer = match new_debouncer(Duration::from_millis(500), None, debounce_tx) {
Ok(d) => d,
Err(e) => {
let error = AgentdirError::Io(std::io::Error::other(format!(
"failed to create watcher debouncer: {e}"
)));
let _ = ready_tx.send(Err(error));
let _ = tx_clone.blocking_send(SourceEvent::RescanNeeded);
tracing::error!("failed to create debouncer: {}", e);
return;
}
};
for root in &roots_owned {
if let Err(e) = debouncer.watch(root, RecursiveMode::Recursive) {
let error = AgentdirError::Io(std::io::Error::other(format!(
"failed to watch {root:?}: {e}"
)));
let _ = ready_tx.send(Err(error));
tracing::warn!("failed to watch {:?}: {}", root, e);
return;
}
}
if ready_tx.send(Ok(())).is_err() {
return;
}
'watch_loop: loop {
if cancel_child.is_cancelled() {
break;
}
match debounce_rx.recv_timeout(Duration::from_millis(100)) {
Ok(Ok(events)) => {
for event in events {
if event.need_rescan() {
if tx_clone.blocking_send(SourceEvent::RescanNeeded).is_err() {
break 'watch_loop;
}
continue;
}
use notify::EventKind;
let source_event = match event.kind {
EventKind::Create(_) => {
event.paths.first().map(|p| SourceEvent::Created {
path: SourcePath::new(p.clone()),
})
}
EventKind::Modify(notify::event::ModifyKind::Data(_)) => {
event.paths.first().map(|p| SourceEvent::Modified {
path: SourcePath::new(p.clone()),
})
}
EventKind::Remove(_) => {
event.paths.first().map(|p| SourceEvent::Deleted {
path: SourcePath::new(p.clone()),
})
}
EventKind::Modify(notify::event::ModifyKind::Name(
notify::event::RenameMode::Both,
)) => {
if event.paths.len() >= 2 {
Some(SourceEvent::Renamed {
from: SourcePath::new(event.paths[0].clone()),
to: SourcePath::new(event.paths[1].clone()),
})
} else {
None
}
}
_ => None,
};
if let Some(ev) = source_event {
if tx_clone.blocking_send(ev).is_err() {
break 'watch_loop;
}
}
}
}
Ok(Err(errors)) => {
for e in errors {
tracing::warn!("watcher error: {}", e);
}
if tx_clone.blocking_send(SourceEvent::RescanNeeded).is_err() {
break;
}
}
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {}
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
break;
}
}
}
});
ready_rx
.await
.map_err(|e| AgentdirError::Io(std::io::Error::other(e)))??;
Ok(WatchHandle::new(cancel_token))
}
fn name(&self) -> &str {
"local"
}
fn supports_reflink(&self) -> bool {
true
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
#[tokio::test]
async fn test_scan_directory() {
let dir = TempDir::new().unwrap();
std::fs::write(dir.path().join("file1.txt"), b"hello").unwrap();
std::fs::write(dir.path().join("file2.txt"), b"world").unwrap();
std::fs::create_dir(dir.path().join("subdir")).unwrap();
std::fs::write(dir.path().join("subdir/nested.txt"), b"nested").unwrap();
let backend = LocalBackend;
let root = SourcePath::new(dir.path().to_path_buf());
let entries = backend.scan(&root).await.unwrap();
assert!(entries.len() >= 4);
let file1 = entries
.iter()
.find(|(p, _)| p.as_path().ends_with("file1.txt"));
assert!(file1.is_some());
assert_eq!(file1.unwrap().1.size_bytes, 5);
}
#[tokio::test]
async fn test_metadata_returns_correct_size() {
let dir = TempDir::new().unwrap();
let path = dir.path().join("test.txt");
std::fs::write(&path, b"12345").unwrap();
let backend = LocalBackend;
let source = SourcePath::new(path);
let meta = backend.metadata(&source).await.unwrap();
assert_eq!(meta.size_bytes, 5);
assert!(matches!(meta.entry_type, EntryType::File));
}
#[tokio::test]
async fn test_read_bytes_returns_content() {
let dir = TempDir::new().unwrap();
let path = dir.path().join("test.txt");
std::fs::write(&path, b"hello agentdir").unwrap();
let backend = LocalBackend;
let source = SourcePath::new(path);
let bytes = backend.read_bytes(&source).await.unwrap();
assert_eq!(bytes, b"hello agentdir");
}
#[tokio::test]
async fn test_scan_empty_directory() {
let dir = TempDir::new().unwrap();
let backend = LocalBackend;
let root = SourcePath::new(dir.path().to_path_buf());
let entries = backend.scan(&root).await.unwrap();
assert!(!entries.is_empty());
}
}