rusty_files/watcher/
synchronizer.rs

1use crate::core::config::SearchConfig;
2use crate::core::error::Result;
3use crate::filters::ExclusionFilter;
4use crate::indexer::incremental::IncrementalIndexer;
5use crate::storage::Database;
6use crate::watcher::debouncer::FileEventType;
7use std::path::PathBuf;
8use std::sync::Arc;
9use tokio::sync::mpsc;
10
11#[derive(Clone, Debug)]
12pub struct FileEvent {
13    pub path: PathBuf,
14    pub event_type: FileEventType,
15}
16
17pub struct IndexSynchronizer {
18    indexer: Arc<IncrementalIndexer>,
19    event_receiver: Option<mpsc::UnboundedReceiver<FileEvent>>,
20    event_sender: mpsc::UnboundedSender<FileEvent>,
21}
22
23impl IndexSynchronizer {
24    pub fn new(
25        database: Arc<Database>,
26        config: Arc<SearchConfig>,
27        exclusion_filter: Arc<ExclusionFilter>,
28    ) -> Self {
29        let (sender, receiver) = mpsc::unbounded_channel();
30
31        let indexer = Arc::new(IncrementalIndexer::new(database, config, exclusion_filter));
32
33        Self {
34            indexer,
35            event_receiver: Some(receiver),
36            event_sender: sender,
37        }
38    }
39
40    pub fn get_sender(&self) -> mpsc::UnboundedSender<FileEvent> {
41        self.event_sender.clone()
42    }
43
44    pub async fn start(&mut self) -> Result<()> {
45        let mut receiver = self.event_receiver.take().ok_or_else(|| {
46            crate::core::error::SearchError::NotInitialized(
47                "Synchronizer already started".to_string(),
48            )
49        })?;
50
51        while let Some(event) = receiver.recv().await {
52            if let Err(e) = self.handle_event(event).await {
53                log::error!("Failed to handle file event: {}", e);
54            }
55        }
56
57        Ok(())
58    }
59
60    async fn handle_event(&self, event: FileEvent) -> Result<()> {
61        match event.event_type {
62            FileEventType::Created | FileEventType::Modified => {
63                self.indexer.update_file(&event.path)?;
64            }
65            FileEventType::Deleted => {
66                self.indexer
67                    .update_file(&event.path)?;
68            }
69            FileEventType::Renamed => {
70                self.indexer.update_file(&event.path)?;
71            }
72        }
73
74        Ok(())
75    }
76
77    pub fn sync_path(&self, path: PathBuf) -> Result<()> {
78        self.indexer.update_file(path)?;
79        Ok(())
80    }
81
82    pub fn sync_paths(&self, paths: Vec<PathBuf>) -> Result<usize> {
83        self.indexer.update_files(&paths)
84    }
85}
86
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::config::SearchConfig;
    use std::fs;
    use tempfile::TempDir;

    /// Syncing one freshly written file through an in-memory database must
    /// succeed.
    #[tokio::test]
    async fn test_synchronizer() {
        // Keep the temp dir guard alive for the whole test.
        let workspace = TempDir::new().unwrap();
        let target = workspace.path().join("test.txt");
        fs::write(&target, "content").unwrap();

        let synchronizer = IndexSynchronizer::new(
            Arc::new(Database::in_memory(10).unwrap()),
            Arc::new(SearchConfig::default()),
            Arc::new(ExclusionFilter::default()),
        );

        assert!(synchronizer.sync_path(target).is_ok());
    }
}