use std::collections::HashMap;
use std::fs;
use std::path::{Path, PathBuf};
use std::time::SystemTime;
use chrono::Utc;
use super::import::{ImportPipeline, MarkdownImporter, JsonImporter};
use super::storage::KnowledgeStore;
use super::types::*;
/// Configuration for a [`DirectoryWatcher`]: where to look for files and
/// how to import them.
#[derive(Clone, Debug)]
pub struct WatcherConfig {
/// Root directory scanned for incoming files.
pub inbox_dir: PathBuf,
/// Name of the subdirectory (under `inbox_dir`) that successfully
/// imported files are moved into.
pub processed_subdir: String,
/// Name of the subdirectory (under `inbox_dir`) that files which failed
/// to import are moved into.
pub error_subdir: String,
/// Settings forwarded to the import pipeline for each file.
pub import_config: ImportConfig,
/// Files larger than this many bytes are skipped rather than imported.
pub max_file_size: u64,
}
impl WatcherConfig {
    /// Builds a configuration rooted at `inbox_dir` with the default
    /// subdirectory names (`processed` / `error`), a 10 MiB per-document
    /// import limit, and a 50 MiB per-file size cap.
    pub fn new(inbox_dir: impl Into<PathBuf>) -> Self {
        let import_config = ImportConfig {
            default_policy: ImportPolicy::Merge,
            split_strategy: SplitStrategy::ByHeading,
            duplicate_strategy: DuplicateStrategy::Skip,
            max_document_size_bytes: 10 * 1024 * 1024,
        };
        Self {
            inbox_dir: inbox_dir.into(),
            processed_subdir: String::from("processed"),
            error_subdir: String::from("error"),
            import_config,
            max_file_size: 50 * 1024 * 1024,
        }
    }
}
/// What happened to a single inbox file during one poll pass.
#[derive(Clone, Debug)]
pub enum FileOutcome {
/// The file was imported successfully; `report` carries the pipeline's
/// summary of what was stored.
Imported {
report: ImportReport,
},
/// The file was deliberately left unprocessed (unsupported extension,
/// over the size limit, ...) — see `reason`.
Skipped {
reason: String,
},
/// Processing was attempted but failed; `error` is the rendered cause.
Failed {
error: String,
},
/// Audio file: left in place in the inbox until a transcript is supplied
/// via `DirectoryWatcher::import_transcription`.
NeedsStt {
audio_path: PathBuf,
},
}
/// Per-file result of a `DirectoryWatcher::poll` pass (or of
/// `import_transcription`).
#[derive(Clone, Debug)]
pub struct WatchResult {
/// File name as UTF-8 ("unknown"/"audio" fallbacks for non-UTF-8 names).
pub file_name: String,
/// Path the file was found at in the inbox.
pub original_path: PathBuf,
/// Destination the file was moved to, if it was moved at all.
pub moved_to: Option<PathBuf>,
/// What happened to the file.
pub outcome: FileOutcome,
}
/// Polls an inbox directory and routes new files into a `KnowledgeStore`.
pub struct DirectoryWatcher {
/// Directories, size limits, and import options.
config: WatcherConfig,
/// Modification time of each file already handled, keyed by path; lets
/// `poll` skip files that have not changed since the previous pass.
seen: HashMap<PathBuf, SystemTime>,
}
impl DirectoryWatcher {
pub fn new(config: WatcherConfig) -> Result<Self, KcError> {
fs::create_dir_all(&config.inbox_dir).map_err(|e| {
KcError::ImportError(format!(
"Failed to create inbox directory '{}': {}",
config.inbox_dir.display(),
e
))
})?;
let processed_dir = config.inbox_dir.join(&config.processed_subdir);
fs::create_dir_all(&processed_dir).map_err(|e| {
KcError::ImportError(format!(
"Failed to create processed directory '{}': {}",
processed_dir.display(),
e
))
})?;
let error_dir = config.inbox_dir.join(&config.error_subdir);
fs::create_dir_all(&error_dir).map_err(|e| {
KcError::ImportError(format!(
"Failed to create error directory '{}': {}",
error_dir.display(),
e
))
})?;
Ok(Self {
config,
seen: HashMap::new(),
})
}
/// Scans the inbox once and processes every new or modified regular file.
///
/// Subdirectories, dot-prefixed files, and files whose names are not valid
/// UTF-8 are ignored, as are files whose modification time has not advanced
/// since they were last handled. Returns one `WatchResult` per file that
/// was actually processed.
pub fn poll<S: KnowledgeStore>(&mut self, store: &S) -> Result<Vec<WatchResult>, KcError> {
    let dir_iter = fs::read_dir(&self.config.inbox_dir).map_err(|e| {
        KcError::ImportError(format!(
            "Failed to read inbox directory '{}': {}",
            self.config.inbox_dir.display(),
            e
        ))
    })?;
    let mut processed = Vec::new();
    for maybe_entry in dir_iter {
        let dir_entry = match maybe_entry {
            Ok(entry) => entry,
            Err(err) => {
                log::warn!("Failed to read directory entry: {}", err);
                continue;
            }
        };
        let path = dir_entry.path();
        // Subdirectories (including processed/ and error/) are never scanned.
        if path.is_dir() {
            continue;
        }
        // Hidden files — and files with non-UTF-8 names — are ignored.
        let hidden = path
            .file_name()
            .and_then(|name| name.to_str())
            .map_or(true, |name| name.starts_with('.'));
        if hidden {
            continue;
        }
        // Without a readable mtime we cannot do change tracking; skip.
        let mtime = match dir_entry.metadata().and_then(|m| m.modified()) {
            Ok(t) => t,
            Err(_) => continue,
        };
        // Already handled and not modified since? Nothing to do.
        let unchanged = self.seen.get(&path).map_or(false, |prev| *prev >= mtime);
        if unchanged {
            continue;
        }
        let result = self.process_file(&path, store);
        self.seen.insert(path.clone(), mtime);
        processed.push(result);
    }
    // Drop tracking entries for files that no longer exist (moved/deleted).
    self.seen.retain(|tracked, _| tracked.exists());
    Ok(processed)
}
/// Routes one inbox file through the appropriate importer based on its
/// extension, then files it under processed/ or error/ depending on the
/// outcome. Never returns an error: every failure is captured inside the
/// returned `WatchResult`.
fn process_file<S: KnowledgeStore>(&self, path: &Path, store: &S) -> WatchResult {
    let file_name = path
        .file_name()
        .and_then(|n| n.to_str())
        .unwrap_or("unknown")
        .to_owned();
    // Stat the file up front so we can enforce the size cap.
    let size = match fs::metadata(path) {
        Ok(meta) => meta.len(),
        Err(err) => {
            return WatchResult {
                file_name,
                original_path: path.to_owned(),
                moved_to: None,
                outcome: FileOutcome::Failed {
                    error: format!("Cannot read file metadata: {}", err),
                },
            };
        }
    };
    if size > self.config.max_file_size {
        return WatchResult {
            file_name,
            original_path: path.to_owned(),
            moved_to: None,
            outcome: FileOutcome::Skipped {
                reason: format!(
                    "File too large ({} bytes, max {})",
                    size, self.config.max_file_size
                ),
            },
        };
    }
    // Dispatch on the (lowercased) extension.
    let extension = path
        .extension()
        .and_then(|e| e.to_str())
        .map(str::to_lowercase)
        .unwrap_or_default();
    let attempted = match extension.as_str() {
        "md" | "txt" => self.import_text_file(path, store),
        "json" => self.import_json_file(path, store),
        "ogg" | "wav" | "mp3" | "m4a" | "flac" | "webm" => Ok(FileOutcome::NeedsStt {
            audio_path: path.to_owned(),
        }),
        other => Ok(FileOutcome::Skipped {
            reason: format!("Unsupported file extension: .{}", other),
        }),
    };
    // Imported files go to processed/, failures to error/; skipped files and
    // audio awaiting transcription stay where they are. A failed move is
    // tolerated (moved_to stays None).
    let (outcome, moved_to) = match attempted {
        Ok(outcome) => {
            let destination = match &outcome {
                FileOutcome::Imported { .. } => self
                    .move_file(path, &self.config.processed_subdir)
                    .unwrap_or(None),
                FileOutcome::Failed { .. } => self
                    .move_file(path, &self.config.error_subdir)
                    .unwrap_or(None),
                FileOutcome::NeedsStt { .. } | FileOutcome::Skipped { .. } => None,
            };
            (outcome, destination)
        }
        Err(err) => {
            let destination = self
                .move_file(path, &self.config.error_subdir)
                .unwrap_or(None);
            (
                FileOutcome::Failed {
                    error: format!("{}", err),
                },
                destination,
            )
        }
    };
    WatchResult {
        file_name,
        original_path: path.to_owned(),
        moved_to,
        outcome,
    }
}
/// Imports a Markdown/plain-text file using the configured split strategy.
fn import_text_file<S: KnowledgeStore>(
    &self,
    path: &Path,
    store: &S,
) -> Result<FileOutcome, KcError> {
    let split = self.config.import_config.split_strategy.clone();
    let importer = MarkdownImporter { split };
    ImportPipeline::run(store, &importer, path, &self.config.import_config)
        .map(|report| FileOutcome::Imported { report })
}
/// Imports a JSON file of memory entries through the JSON importer.
fn import_json_file<S: KnowledgeStore>(
    &self,
    path: &Path,
    store: &S,
) -> Result<FileOutcome, KcError> {
    ImportPipeline::run(store, &JsonImporter, path, &self.config.import_config)
        .map(|report| FileOutcome::Imported { report })
}
/// Moves `path` into `subdir` (relative to the inbox directory).
///
/// If a file with the same name already exists at the destination, a UTC
/// timestamp suffix is appended to avoid the collision. `fs::rename` cannot
/// cross filesystem boundaries, so on that specific error we fall back to
/// copy-then-delete.
///
/// Returns the final destination path on success, or
/// `KcError::ImportError` if the move (and any fallback) failed.
fn move_file(&self, path: &Path, subdir: &str) -> Result<Option<PathBuf>, KcError> {
    // OS error code for a cross-device/cross-volume rename. The previous
    // hard-coded `Some(18)` was the Unix EXDEV value only, so the copy
    // fallback could never trigger on Windows (ERROR_NOT_SAME_DEVICE = 17).
    #[cfg(windows)]
    const CROSS_DEVICE: i32 = 17; // ERROR_NOT_SAME_DEVICE
    #[cfg(not(windows))]
    const CROSS_DEVICE: i32 = 18; // EXDEV on Linux and macOS
    let dest_dir = self.config.inbox_dir.join(subdir);
    let file_name = path
        .file_name()
        .ok_or_else(|| KcError::ImportError("File has no name".to_owned()))?;
    let mut dest = dest_dir.join(file_name);
    if dest.exists() {
        // Collision: append a timestamp. Second resolution means two
        // collisions within the same second would still overwrite each
        // other — accepted as unlikely for an inbox workflow.
        let stem = path
            .file_stem()
            .and_then(|s| s.to_str())
            .unwrap_or("file");
        let ext = path
            .extension()
            .and_then(|e| e.to_str())
            .unwrap_or("");
        let ts = Utc::now().format("%Y%m%d-%H%M%S");
        let new_name = if ext.is_empty() {
            format!("{}-{}", stem, ts)
        } else {
            format!("{}-{}.{}", stem, ts, ext)
        };
        dest = dest_dir.join(new_name);
    }
    if let Err(e) = fs::rename(path, &dest) {
        if e.raw_os_error() == Some(CROSS_DEVICE) {
            // Cross-filesystem move: copy the bytes, then remove the source.
            fs::copy(path, &dest)
                .and_then(|_| fs::remove_file(path))
                .map_err(|copy_err| {
                    KcError::ImportError(format!(
                        "Failed to move file '{}' → '{}': {} (and copy fallback: {})",
                        path.display(),
                        dest.display(),
                        e,
                        copy_err
                    ))
                })?;
        } else {
            return Err(KcError::ImportError(format!(
                "Failed to move file '{}' → '{}': {}",
                path.display(),
                dest.display(),
                e
            )));
        }
    }
    Ok(Some(dest))
}
/// Wraps an externally produced STT transcript in a Markdown document,
/// imports it, and then files the originating audio under processed/
/// (or error/ if the import failed).
///
/// The transcript is staged as a dot-prefixed temp file so a concurrent
/// `poll` will not pick it up; the temp file is deleted afterwards
/// regardless of the import result. Returns an error only if staging the
/// transcript or moving the audio file fails.
pub fn import_transcription<S: KnowledgeStore>(
    &self,
    audio_path: &Path,
    transcript: &str,
    store: &S,
) -> Result<WatchResult, KcError> {
    let file_name = audio_path
        .file_name()
        .and_then(|n| n.to_str())
        .unwrap_or("audio")
        .to_owned();
    let document = format!(
        "# Voice Note: {}\n\nTranscribed: {}\n\n{}",
        file_name,
        Utc::now().format("%Y-%m-%d %H:%M"),
        transcript
    );
    let staging_path = self.config.inbox_dir.join(format!(
        ".tmp-transcript-{}.md",
        Utc::now().timestamp_millis()
    ));
    fs::write(&staging_path, &document)
        .map_err(|e| KcError::ImportError(format!("Failed to write transcript: {}", e)))?;
    let importer = MarkdownImporter {
        split: SplitStrategy::Smart,
    };
    let import_result =
        ImportPipeline::run(store, &importer, &staging_path, &self.config.import_config);
    // Best-effort cleanup of the staging file; failure here is not fatal.
    let _ = fs::remove_file(&staging_path);
    // Success files the audio under processed/, failure under error/.
    let (subdir, outcome) = match import_result {
        Ok(report) => (
            &self.config.processed_subdir,
            FileOutcome::Imported { report },
        ),
        Err(e) => (
            &self.config.error_subdir,
            FileOutcome::Failed {
                error: format!("{}", e),
            },
        ),
    };
    let moved_to = self.move_file(audio_path, subdir)?;
    Ok(WatchResult {
        file_name,
        original_path: audio_path.to_owned(),
        moved_to,
        outcome,
    })
}
/// The inbox directory this watcher scans.
pub fn inbox_dir(&self) -> &Path {
&self.config.inbox_dir
}
/// Full path of the subdirectory successfully imported files are moved to.
pub fn processed_dir(&self) -> PathBuf {
self.config.inbox_dir.join(&self.config.processed_subdir)
}
/// Full path of the subdirectory failed files are moved to.
pub fn error_dir(&self) -> PathBuf {
self.config.inbox_dir.join(&self.config.error_subdir)
}
/// Number of files currently tracked in the seen/mtime dedup map.
pub fn tracked_count(&self) -> usize {
self.seen.len()
}
/// Forgets all seen-file tracking, so the next `poll` re-processes every
/// file still present in the inbox.
pub fn reset(&mut self) {
self.seen.clear();
}
}
#[cfg(test)]
mod tests {
use super::*;
use super::super::storage::SqliteKnowledgeStore;
use tempfile::TempDir;
// Fresh in-memory store with the schema applied, for per-test isolation.
fn make_store() -> SqliteKnowledgeStore {
let store = SqliteKnowledgeStore::in_memory().unwrap();
store.init_schema().unwrap();
store
}
// Watcher with default config rooted at `dir` (creates the directory tree).
fn make_watcher(dir: &Path) -> DirectoryWatcher {
let config = WatcherConfig::new(dir);
DirectoryWatcher::new(config).unwrap()
}
#[test]
fn test_new_creates_directories() {
    // The constructor must build the whole inbox tree from scratch.
    let tmp_dir = TempDir::new().unwrap();
    let inbox_path = tmp_dir.path().join("inbox");
    assert!(!inbox_path.exists());
    let _watcher = make_watcher(&inbox_path);
    assert!(inbox_path.exists());
    assert!(inbox_path.join("processed").exists());
    assert!(inbox_path.join("error").exists());
}

#[test]
fn test_new_with_existing_directory() {
    // Pre-existing directories are fine, and tracking starts empty.
    let tmp_dir = TempDir::new().unwrap();
    let inbox_path = tmp_dir.path().join("inbox");
    fs::create_dir_all(&inbox_path).unwrap();
    let watcher = make_watcher(&inbox_path);
    assert_eq!(watcher.tracked_count(), 0);
}

#[test]
fn test_poll_empty_inbox() {
    // Polling an empty inbox yields no results.
    let tmp_dir = TempDir::new().unwrap();
    let inbox_path = tmp_dir.path().join("inbox");
    let store = make_store();
    let mut watcher = make_watcher(&inbox_path);
    assert!(watcher.poll(&store).unwrap().is_empty());
}
#[test]
fn test_poll_imports_markdown_file() {
    let tmp_dir = TempDir::new().unwrap();
    let inbox_path = tmp_dir.path().join("inbox");
    let store = make_store();
    let mut watcher = make_watcher(&inbox_path);
    let md_path = inbox_path.join("note.md");
    fs::write(&md_path, "# My Note\n\nThis is a test note about Rust.\n").unwrap();
    let results = watcher.poll(&store).unwrap();
    assert_eq!(results.len(), 1);
    let first = &results[0];
    assert_eq!(first.file_name, "note.md");
    assert!(matches!(first.outcome, FileOutcome::Imported { .. }));
    assert!(first.moved_to.is_some());
    // The source file is relocated into processed/ after import.
    assert!(!md_path.exists(), "original file should be moved");
    assert!(inbox_path.join("processed/note.md").exists());
}

#[test]
fn test_poll_imports_txt_file() {
    let tmp_dir = TempDir::new().unwrap();
    let inbox_path = tmp_dir.path().join("inbox");
    let store = make_store();
    let mut watcher = make_watcher(&inbox_path);
    let txt_path = inbox_path.join("thought.txt");
    fs::write(&txt_path, "Plain text thought about AI agents.\n").unwrap();
    let results = watcher.poll(&store).unwrap();
    assert_eq!(results.len(), 1);
    assert!(matches!(results[0].outcome, FileOutcome::Imported { .. }));
    assert!(!txt_path.exists());
}

#[test]
fn test_poll_imports_json_file() {
    let tmp_dir = TempDir::new().unwrap();
    let inbox_path = tmp_dir.path().join("inbox");
    let store = make_store();
    let mut watcher = make_watcher(&inbox_path);
    let json_path = inbox_path.join("data.json");
    let payload = r#"[{"content": "JSON memory entry", "source": "test", "content_hash": "h1", "metadata": {}}]"#;
    fs::write(&json_path, payload).unwrap();
    let results = watcher.poll(&store).unwrap();
    assert_eq!(results.len(), 1);
    let _outcome = &results[0].outcome;
}
#[test]
fn test_poll_skips_unsupported_extension() {
    let tmp_dir = TempDir::new().unwrap();
    let inbox_path = tmp_dir.path().join("inbox");
    let store = make_store();
    let mut watcher = make_watcher(&inbox_path);
    let bmp_path = inbox_path.join("image.bmp");
    fs::write(&bmp_path, b"fake image data").unwrap();
    let results = watcher.poll(&store).unwrap();
    assert_eq!(results.len(), 1);
    assert!(matches!(results[0].outcome, FileOutcome::Skipped { .. }));
    // Skipped files are left in place in the inbox.
    assert!(bmp_path.exists());
}

#[test]
fn test_poll_skips_dotfiles() {
    let tmp_dir = TempDir::new().unwrap();
    let inbox_path = tmp_dir.path().join("inbox");
    let store = make_store();
    let mut watcher = make_watcher(&inbox_path);
    fs::write(inbox_path.join(".hidden"), "hidden file").unwrap();
    fs::write(inbox_path.join(".DS_Store"), "mac garbage").unwrap();
    let results = watcher.poll(&store).unwrap();
    assert!(results.is_empty(), "dotfiles should be ignored");
}

#[test]
fn test_poll_skips_oversized_file() {
    let tmp_dir = TempDir::new().unwrap();
    let inbox_path = tmp_dir.path().join("inbox");
    let store = make_store();
    // Lower the size cap so a 200-byte file trips it.
    let mut config = WatcherConfig::new(&inbox_path);
    config.max_file_size = 100;
    let mut watcher = DirectoryWatcher::new(config).unwrap();
    let big_path = inbox_path.join("big.md");
    fs::write(&big_path, "x".repeat(200)).unwrap();
    let results = watcher.poll(&store).unwrap();
    assert_eq!(results.len(), 1);
    assert!(matches!(results[0].outcome, FileOutcome::Skipped { .. }));
}
#[test]
fn test_poll_audio_needs_stt() {
    let tmp_dir = TempDir::new().unwrap();
    let inbox_path = tmp_dir.path().join("inbox");
    let store = make_store();
    let mut watcher = make_watcher(&inbox_path);
    let audio_exts = ["ogg", "wav", "mp3", "m4a", "flac", "webm"];
    for ext in &audio_exts {
        fs::write(inbox_path.join(format!("voice.{}", ext)), b"fake audio data").unwrap();
    }
    let results = watcher.poll(&store).unwrap();
    assert_eq!(results.len(), 6);
    for r in &results {
        assert!(
            matches!(r.outcome, FileOutcome::NeedsStt { .. }),
            "Audio file {} should need STT, got {:?}",
            r.file_name,
            r.outcome
        );
    }
    // Audio stays in the inbox until a transcript is imported.
    for ext in &audio_exts {
        assert!(inbox_path.join(format!("voice.{}", ext)).exists());
    }
}

#[test]
fn test_poll_does_not_reprocess() {
    let tmp_dir = TempDir::new().unwrap();
    let inbox_path = tmp_dir.path().join("inbox");
    let store = make_store();
    let mut watcher = make_watcher(&inbox_path);
    let png_path = inbox_path.join("image.png");
    fs::write(&png_path, b"fake image").unwrap();
    assert_eq!(watcher.poll(&store).unwrap().len(), 1);
    let second = watcher.poll(&store).unwrap();
    assert!(second.is_empty(), "Second poll should skip already-seen files");
}

#[test]
fn test_poll_reprocesses_modified_file() {
    let tmp_dir = TempDir::new().unwrap();
    let inbox_path = tmp_dir.path().join("inbox");
    let store = make_store();
    let mut watcher = make_watcher(&inbox_path);
    let png_path = inbox_path.join("image.png");
    fs::write(&png_path, b"v1").unwrap();
    assert_eq!(watcher.poll(&store).unwrap().len(), 1);
    // Make sure the mtime actually advances before rewriting.
    std::thread::sleep(std::time::Duration::from_millis(50));
    fs::write(&png_path, b"v2 modified").unwrap();
    let second = watcher.poll(&store).unwrap();
    assert_eq!(second.len(), 1, "Modified file should be re-processed");
}
#[test]
fn test_import_transcription() {
    let tmp_dir = TempDir::new().unwrap();
    let inbox_path = tmp_dir.path().join("inbox");
    let store = make_store();
    let watcher = make_watcher(&inbox_path);
    let audio_path = inbox_path.join("recording.ogg");
    fs::write(&audio_path, b"fake audio").unwrap();
    let transcript =
        "This is the transcribed text from a voice recording about Rust programming.";
    let result = watcher
        .import_transcription(&audio_path, transcript, &store)
        .unwrap();
    assert_eq!(result.file_name, "recording.ogg");
    assert!(matches!(result.outcome, FileOutcome::Imported { .. }));
    assert!(!audio_path.exists(), "audio file should be moved");
    assert!(inbox_path.join("processed/recording.ogg").exists());
}

#[test]
fn test_reset_clears_tracking() {
    let tmp_dir = TempDir::new().unwrap();
    let inbox_path = tmp_dir.path().join("inbox");
    let store = make_store();
    let mut watcher = make_watcher(&inbox_path);
    fs::write(inbox_path.join("skip.png"), b"data").unwrap();
    let _ = watcher.poll(&store).unwrap();
    assert_eq!(watcher.tracked_count(), 1);
    watcher.reset();
    assert_eq!(watcher.tracked_count(), 0);
    // After reset, the same (still-present) file is picked up again.
    assert_eq!(watcher.poll(&store).unwrap().len(), 1);
}
#[test]
fn test_poll_multiple_files() {
    let tmp_dir = TempDir::new().unwrap();
    let inbox_path = tmp_dir.path().join("inbox");
    let store = make_store();
    let mut watcher = make_watcher(&inbox_path);
    fs::write(inbox_path.join("a.md"), "# Note A\n\nContent A\n").unwrap();
    fs::write(inbox_path.join("b.txt"), "Note B content\n").unwrap();
    fs::write(inbox_path.join("c.png"), b"image data").unwrap();
    fs::write(inbox_path.join("d.ogg"), b"audio data").unwrap();
    let results = watcher.poll(&store).unwrap();
    assert_eq!(results.len(), 4);
    // Index outcomes by file name — poll order is not guaranteed.
    let by_name: HashMap<String, &FileOutcome> = results
        .iter()
        .map(|r| (r.file_name.clone(), &r.outcome))
        .collect();
    assert!(matches!(by_name["a.md"], FileOutcome::Imported { .. }));
    assert!(matches!(by_name["b.txt"], FileOutcome::Imported { .. }));
    assert!(matches!(by_name["c.png"], FileOutcome::Skipped { .. }));
    assert!(matches!(by_name["d.ogg"], FileOutcome::NeedsStt { .. }));
}

#[test]
fn test_move_file_handles_collision() {
    let tmp_dir = TempDir::new().unwrap();
    let inbox_path = tmp_dir.path().join("inbox");
    let store = make_store();
    let mut watcher = make_watcher(&inbox_path);
    // Pre-populate processed/ with the same name to force a rename.
    fs::write(inbox_path.join("processed/note.md"), "old processed").unwrap();
    fs::write(inbox_path.join("note.md"), "# New Note\n\nNew content\n").unwrap();
    let results = watcher.poll(&store).unwrap();
    assert_eq!(results.len(), 1);
    assert!(matches!(results[0].outcome, FileOutcome::Imported { .. }));
    let moved = results[0].moved_to.as_ref().unwrap();
    assert!(moved.exists());
    assert_ne!(
        moved.file_name().unwrap().to_str().unwrap(),
        "note.md",
        "Should have timestamp suffix to avoid collision"
    );
}

#[test]
fn test_poll_ignores_subdirectories() {
    let tmp_dir = TempDir::new().unwrap();
    let inbox_path = tmp_dir.path().join("inbox");
    let store = make_store();
    let mut watcher = make_watcher(&inbox_path);
    let nested = inbox_path.join("nested");
    fs::create_dir_all(&nested).unwrap();
    fs::write(nested.join("nested.md"), "# Nested\n\nShould be ignored\n").unwrap();
    let results = watcher.poll(&store).unwrap();
    assert!(results.is_empty(), "Files in subdirectories should be ignored");
}

#[test]
fn test_directory_accessors() {
    let tmp_dir = TempDir::new().unwrap();
    let inbox_path = tmp_dir.path().join("inbox");
    let watcher = make_watcher(&inbox_path);
    assert_eq!(watcher.inbox_dir(), inbox_path);
    assert_eq!(watcher.processed_dir(), inbox_path.join("processed"));
    assert_eq!(watcher.error_dir(), inbox_path.join("error"));
}
}