use std::collections::VecDeque;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use anyhow::{Context, Result};
use tantivy::Index;
use tokio::sync::Mutex;
use tokio::time;
use tracing::{debug, error, info};
use crate::indexer::{IndexBuilder, IndexConfig, IndexSchema};
use crate::scanner::ScannedFile;
use crate::watcher::{FileEvent, FileWatcher, WatcherConfig};
/// Bounded, in-memory log of the most recently applied index operations.
///
/// Entries are kept in insertion order. Once more than `max_entries` entries
/// have been recorded, the oldest ones are discarded. Despite the name, the log
/// lives only in a `VecDeque` and is never written to disk.
#[derive(Debug, Clone)]
pub struct WriteAheadLog {
    operations: VecDeque<WalEntry>,
    max_entries: usize,
}

/// A single operation recorded in a [`WriteAheadLog`].
#[derive(Debug, Clone)]
pub struct WalEntry {
    /// Kind of change that was applied.
    pub operation: WalOperation,
    /// Path the change applied to.
    pub path: PathBuf,
    /// Wall-clock time at which the entry was recorded.
    pub timestamp: SystemTime,
}

/// Kind of change recorded in a [`WalEntry`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WalOperation {
    Create,
    Modify,
    Delete,
}

impl WriteAheadLog {
    /// Retention limit used by [`WriteAheadLog::new`] and [`Default`].
    pub const DEFAULT_MAX_ENTRIES: usize = 1000;

    /// Creates an empty log retaining up to [`Self::DEFAULT_MAX_ENTRIES`] entries.
    #[must_use]
    pub fn new() -> Self {
        Self::with_max_entries(Self::DEFAULT_MAX_ENTRIES)
    }

    /// Creates an empty log that retains at most `max_entries` entries.
    ///
    /// With `max_entries == 0`, every entry is discarded as soon as it is added.
    #[must_use]
    pub fn with_max_entries(max_entries: usize) -> Self {
        Self { operations: VecDeque::new(), max_entries }
    }

    /// Returns the maximum number of entries this log retains.
    pub fn max_entries(&self) -> usize {
        self.max_entries
    }

    /// Records that `path` was created.
    pub fn log_create(&mut self, path: &Path) {
        self.add_entry(WalOperation::Create, path);
    }

    /// Records that `path` was modified.
    pub fn log_modify(&mut self, path: &Path) {
        self.add_entry(WalOperation::Modify, path);
    }

    /// Records that `path` was deleted.
    pub fn log_delete(&mut self, path: &Path) {
        self.add_entry(WalOperation::Delete, path);
    }

    /// Appends a timestamped entry, then evicts the oldest entries beyond the limit.
    fn add_entry(&mut self, operation: WalOperation, path: &Path) {
        self.operations.push_back(WalEntry {
            operation,
            path: path.to_path_buf(),
            timestamp: SystemTime::now(),
        });
        while self.operations.len() > self.max_entries {
            self.operations.pop_front();
        }
    }

    /// Returns up to `limit` entries, newest first.
    #[must_use]
    pub fn recent_operations(&self, limit: usize) -> Vec<&WalEntry> {
        self.operations.iter().rev().take(limit).collect()
    }

    /// Removes every entry; the retention limit is unchanged.
    pub fn clear(&mut self) {
        self.operations.clear();
    }

    /// Number of entries currently retained.
    pub fn len(&self) -> usize {
        self.operations.len()
    }

    /// Returns `true` when no entries are retained.
    pub fn is_empty(&self) -> bool {
        self.operations.is_empty()
    }
}

impl Default for WriteAheadLog {
    fn default() -> Self {
        Self::new()
    }
}
/// Running counters describing what the incremental indexer has done.
#[derive(Debug, Clone, Default)]
pub struct IncrementalStats {
    /// Files added to the index in response to create events (including the
    /// target path of a rename).
    pub created_count: usize,
    /// Files re-indexed in response to modify events.
    pub modified_count: usize,
    /// Delete operations applied, including rename sources and modify events
    /// for files that no longer exist.
    pub deleted_count: usize,
    /// Changes applied since the last successful commit; reset to 0 on commit.
    pub pending_changes: usize,
    /// Watcher events whose handling returned an error.
    pub error_count: usize,
    /// Time of the most recently applied change, or `None` if none yet.
    pub last_update: Option<SystemTime>,
}
/// Settings for an [`IncrementalIndexer`].
#[derive(Debug, Clone)]
pub struct IncrementalConfig {
    /// Configuration handed to the underlying index builder.
    pub index_config: IndexConfig,
    /// Configuration handed to the file watcher.
    pub watcher_config: WatcherConfig,
    /// Number of pending changes at which a processed batch of watcher events
    /// triggers a commit.
    pub commit_threshold: usize,
    /// Period, in seconds, of the event loop's periodic commit check.
    pub auto_commit_interval: u64,
}
impl Default for IncrementalConfig {
fn default() -> Self {
Self {
index_config: IndexConfig::default(),
watcher_config: WatcherConfig::default(),
commit_threshold: 50,
auto_commit_interval: 30,
}
}
}
/// Keeps a search index in sync with file-system changes reported by a
/// [`FileWatcher`].
///
/// Mutable components sit behind `Arc<tokio::sync::Mutex<_>>` so that methods
/// taking `&self` can update them.
pub struct IncrementalIndexer {
    /// Index handle cloned from the builder at construction time.
    index: Index,
    /// Schema cloned from the builder at construction time.
    schema: IndexSchema,
    /// Applies document additions/deletions and commits them.
    builder: Arc<Mutex<IndexBuilder>>,
    /// Produces file events for the watched paths.
    watcher: Arc<Mutex<FileWatcher>>,
    /// In-memory, bounded record of recently applied operations.
    wal: Arc<Mutex<WriteAheadLog>>,
    /// Activity counters returned by `stats()`.
    stats: Arc<Mutex<IncrementalStats>>,
    /// Configuration the indexer was created with.
    config: IncrementalConfig,
}
impl IncrementalIndexer {
pub async fn new(config: IncrementalConfig) -> Result<Self> {
info!("Creating incremental indexer");
let builder = IndexBuilder::with_config(config.index_config.clone())?;
let index = builder.index().clone();
let schema = builder.schema().clone();
let watcher = FileWatcher::new(config.watcher_config.clone())?;
Ok(Self {
index,
schema,
builder: Arc::new(Mutex::new(builder)),
watcher: Arc::new(Mutex::new(watcher)),
wal: Arc::new(Mutex::new(WriteAheadLog::new())),
stats: Arc::new(Mutex::new(IncrementalStats::default())),
config,
})
}
pub async fn watch(&self, path: &Path) -> Result<()> {
info!("Starting watch on: {}", path.display());
let mut watcher = self.watcher.lock().await;
watcher.watch_path(path)?;
Ok(())
}
pub async fn unwatch(&self, path: &Path) -> Result<()> {
info!("Stopping watch on: {}", path.display());
let mut watcher = self.watcher.lock().await;
watcher.unwatch_path(path)?;
Ok(())
}
pub async fn run(&self) -> Result<()> {
info!("Starting incremental indexer event loop");
let mut commit_interval =
time::interval(Duration::from_secs(self.config.auto_commit_interval));
loop {
tokio::select! {
_ = commit_interval.tick() => {
if let Err(e) = self.commit_if_needed().await {
error!("Auto commit failed: {}", e);
}
}
_ = time::sleep(Duration::from_millis(100)) => {
if let Err(e) = self.process_events().await {
error!("Process events failed: {}", e);
}
}
}
}
}
async fn process_events(&self) -> Result<()> {
let mut watcher = self.watcher.lock().await;
let events = watcher.recv_events()?;
if events.is_empty() {
return Ok(());
}
debug!("Processing {} file events", events.len());
for event in events {
if let Err(e) = self.handle_event(event).await {
error!("Handle event failed: {}", e);
let mut stats = self.stats.lock().await;
stats.error_count += 1;
}
}
self.commit_if_needed().await?;
Ok(())
}
async fn handle_event(&self, event: FileEvent) -> Result<()> {
match event {
FileEvent::Created(path) => {
self.apply_create(path).await?;
}
FileEvent::Modified(path) => {
self.apply_modify(path).await?;
}
FileEvent::Deleted(path) => {
self.apply_delete(path).await?;
}
FileEvent::Renamed { from, to } => {
self.apply_delete(from).await?;
self.apply_create(to).await?;
}
}
Ok(())
}
async fn apply_create(&self, path: PathBuf) -> Result<()> {
debug!("Applying create: {}", path.display());
if !path.exists() {
debug!("File no longer exists, skipping: {}", path.display());
return Ok(());
}
let file = self.scan_file(&path)?;
let mut builder = self.builder.lock().await;
builder.add_document(&file)?;
let mut wal = self.wal.lock().await;
wal.log_create(&path);
let mut stats = self.stats.lock().await;
stats.created_count += 1;
stats.pending_changes += 1;
stats.last_update = Some(SystemTime::now());
info!("Indexed new file: {}", path.display());
Ok(())
}
async fn apply_modify(&self, path: PathBuf) -> Result<()> {
debug!("Applying modify: {}", path.display());
if !path.exists() {
debug!("File no longer exists, treating as delete: {}", path.display());
return self.apply_delete(path).await;
}
let mut builder = self.builder.lock().await;
builder.delete_document(&path)?;
let file = self.scan_file(&path)?;
builder.add_document(&file)?;
let mut wal = self.wal.lock().await;
wal.log_modify(&path);
let mut stats = self.stats.lock().await;
stats.modified_count += 1;
stats.pending_changes += 1;
stats.last_update = Some(SystemTime::now());
info!("Updated file in index: {}", path.display());
Ok(())
}
async fn apply_delete(&self, path: PathBuf) -> Result<()> {
debug!("Applying delete: {}", path.display());
let mut builder = self.builder.lock().await;
builder.delete_document(&path)?;
let mut wal = self.wal.lock().await;
wal.log_delete(&path);
let mut stats = self.stats.lock().await;
stats.deleted_count += 1;
stats.pending_changes += 1;
stats.last_update = Some(SystemTime::now());
Ok(())
}
fn scan_file(&self, path: &Path) -> Result<ScannedFile> {
let metadata =
fs::metadata(path).with_context(|| format!("Failed to read metadata: {:?}", path))?;
Ok(ScannedFile {
path: path.to_path_buf(),
size: metadata.len(),
modified: metadata.modified().ok(),
is_dir: metadata.is_dir(),
})
}
async fn commit_if_needed(&self) -> Result<()> {
let stats = self.stats.lock().await;
let pending = stats.pending_changes;
drop(stats);
if pending >= self.config.commit_threshold {
info!("Committing {} pending changes", pending);
self.commit().await?;
}
Ok(())
}
pub async fn commit(&self) -> Result<()> {
info!("Committing index changes");
let mut builder = self.builder.lock().await;
builder.commit_changes()?;
let mut stats = self.stats.lock().await;
stats.pending_changes = 0;
Ok(())
}
pub async fn stats(&self) -> IncrementalStats {
self.stats.lock().await.clone()
}
pub async fn recent_operations(&self, limit: usize) -> Vec<WalEntry> {
let wal = self.wal.lock().await;
wal.recent_operations(limit).into_iter().cloned().collect()
}
pub fn index(&self) -> &Index {
&self.index
}
pub fn schema(&self) -> &IndexSchema {
&self.schema
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Default configuration whose index is stored under `dir/index`.
    fn config_in(dir: &TempDir) -> IncrementalConfig {
        IncrementalConfig {
            index_config: IndexConfig {
                index_path: dir.path().join("index"),
                ..Default::default()
            },
            ..Default::default()
        }
    }

    /// Builds an indexer for `dir`, panicking if construction fails.
    async fn indexer_in(dir: &TempDir) -> IncrementalIndexer {
        IncrementalIndexer::new(config_in(dir)).await.unwrap()
    }

    #[test]
    fn test_wal_basic() {
        let mut wal = WriteAheadLog::new();
        wal.log_create(Path::new("test1.txt"));
        wal.log_modify(Path::new("test2.txt"));
        wal.log_delete(Path::new("test3.txt"));
        assert_eq!(wal.len(), 3);

        // Newest first, truncated to the requested limit.
        let newest: Vec<WalOperation> = wal
            .recent_operations(2)
            .into_iter()
            .map(|entry| entry.operation.clone())
            .collect();
        assert_eq!(newest, vec![WalOperation::Delete, WalOperation::Modify]);
    }

    #[test]
    fn test_wal_max_entries() {
        let mut wal = WriteAheadLog::new();
        (0..1500).for_each(|i| wal.log_create(&PathBuf::from(format!("test{}.txt", i))));
        assert_eq!(wal.len(), 1000);
    }

    #[test]
    fn test_wal_clear() {
        let mut wal = WriteAheadLog::new();
        wal.log_create(Path::new("test.txt"));
        assert_eq!(wal.len(), 1);
        wal.clear();
        assert_eq!(wal.len(), 0);
        assert!(wal.is_empty());
    }

    #[tokio::test]
    async fn test_incremental_config_default() {
        let IncrementalConfig { commit_threshold, auto_commit_interval, .. } =
            IncrementalConfig::default();
        assert_eq!(commit_threshold, 50);
        assert_eq!(auto_commit_interval, 30);
    }

    #[tokio::test]
    async fn test_incremental_indexer_creation() {
        let temp_dir = TempDir::new().unwrap();
        assert!(IncrementalIndexer::new(config_in(&temp_dir)).await.is_ok());
    }

    #[tokio::test]
    async fn test_incremental_stats() {
        let temp_dir = TempDir::new().unwrap();
        let indexer = indexer_in(&temp_dir).await;
        let snapshot = indexer.stats().await;
        assert_eq!(snapshot.created_count, 0);
        assert_eq!(snapshot.modified_count, 0);
        assert_eq!(snapshot.deleted_count, 0);
    }

    #[tokio::test]
    async fn test_incremental_watch() {
        let temp_dir = TempDir::new().unwrap();
        let indexer = indexer_in(&temp_dir).await;
        assert!(indexer.watch(temp_dir.path()).await.is_ok());
    }

    #[tokio::test]
    async fn test_scan_file() {
        let temp_dir = TempDir::new().unwrap();
        let file_path = temp_dir.path().join("test.txt");
        fs::write(&file_path, "hello").unwrap();
        let indexer = indexer_in(&temp_dir).await;

        let scanned = indexer.scan_file(&file_path).unwrap();
        assert_eq!(scanned.path, file_path);
        assert_eq!(scanned.size, 5);
        assert!(!scanned.is_dir);
    }

    #[tokio::test]
    async fn test_apply_create() {
        let temp_dir = TempDir::new().unwrap();
        let file_path = temp_dir.path().join("test.txt");
        fs::write(&file_path, "hello").unwrap();
        let indexer = indexer_in(&temp_dir).await;

        assert!(indexer.apply_create(file_path).await.is_ok());
        let snapshot = indexer.stats().await;
        assert_eq!(snapshot.created_count, 1);
        assert_eq!(snapshot.pending_changes, 1);
    }

    #[tokio::test]
    async fn test_apply_delete() {
        let temp_dir = TempDir::new().unwrap();
        let file_path = temp_dir.path().join("test.txt");
        let indexer = indexer_in(&temp_dir).await;

        assert!(indexer.apply_delete(file_path).await.is_ok());
        assert_eq!(indexer.stats().await.deleted_count, 1);
    }

    #[tokio::test]
    async fn test_recent_operations() {
        let temp_dir = TempDir::new().unwrap();
        let indexer = indexer_in(&temp_dir).await;
        {
            let mut wal = indexer.wal.lock().await;
            wal.log_create(Path::new("file1.txt"));
            wal.log_modify(Path::new("file2.txt"));
            wal.log_delete(Path::new("file3.txt"));
        }
        assert_eq!(indexer.recent_operations(5).await.len(), 3);
    }
}