rusty_files/watcher/
synchronizer.rs1use crate::core::config::SearchConfig;
2use crate::core::error::Result;
3use crate::filters::ExclusionFilter;
4use crate::indexer::incremental::IncrementalIndexer;
5use crate::storage::Database;
6use crate::watcher::debouncer::FileEventType;
7use std::path::PathBuf;
8use std::sync::Arc;
9use tokio::sync::mpsc;
10
11#[derive(Clone, Debug)]
12pub struct FileEvent {
13 pub path: PathBuf,
14 pub event_type: FileEventType,
15}
16
17pub struct IndexSynchronizer {
18 indexer: Arc<IncrementalIndexer>,
19 event_receiver: Option<mpsc::UnboundedReceiver<FileEvent>>,
20 event_sender: mpsc::UnboundedSender<FileEvent>,
21}
22
23impl IndexSynchronizer {
24 pub fn new(
25 database: Arc<Database>,
26 config: Arc<SearchConfig>,
27 exclusion_filter: Arc<ExclusionFilter>,
28 ) -> Self {
29 let (sender, receiver) = mpsc::unbounded_channel();
30
31 let indexer = Arc::new(IncrementalIndexer::new(database, config, exclusion_filter));
32
33 Self {
34 indexer,
35 event_receiver: Some(receiver),
36 event_sender: sender,
37 }
38 }
39
40 pub fn get_sender(&self) -> mpsc::UnboundedSender<FileEvent> {
41 self.event_sender.clone()
42 }
43
44 pub async fn start(&mut self) -> Result<()> {
45 let mut receiver = self.event_receiver.take().ok_or_else(|| {
46 crate::core::error::SearchError::NotInitialized(
47 "Synchronizer already started".to_string(),
48 )
49 })?;
50
51 while let Some(event) = receiver.recv().await {
52 if let Err(e) = self.handle_event(event).await {
53 log::error!("Failed to handle file event: {}", e);
54 }
55 }
56
57 Ok(())
58 }
59
60 async fn handle_event(&self, event: FileEvent) -> Result<()> {
61 match event.event_type {
62 FileEventType::Created | FileEventType::Modified => {
63 self.indexer.update_file(&event.path)?;
64 }
65 FileEventType::Deleted => {
66 self.indexer
67 .update_file(&event.path)?;
68 }
69 FileEventType::Renamed => {
70 self.indexer.update_file(&event.path)?;
71 }
72 }
73
74 Ok(())
75 }
76
77 pub fn sync_path(&self, path: PathBuf) -> Result<()> {
78 self.indexer.update_file(path)?;
79 Ok(())
80 }
81
82 pub fn sync_paths(&self, paths: Vec<PathBuf>) -> Result<usize> {
83 self.indexer.update_files(&paths)
84 }
85}
86
87#[cfg(test)]
88mod tests {
89 use super::*;
90 use crate::core::config::SearchConfig;
91 use std::fs;
92 use tempfile::TempDir;
93
94 #[tokio::test]
95 async fn test_synchronizer() {
96 let temp_dir = TempDir::new().unwrap();
97 let file_path = temp_dir.path().join("test.txt");
98
99 fs::write(&file_path, "content").unwrap();
100
101 let db = Arc::new(Database::in_memory(10).unwrap());
102 let config = Arc::new(SearchConfig::default());
103 let filter = Arc::new(ExclusionFilter::default());
104
105 let synchronizer = IndexSynchronizer::new(db, config, filter);
106
107 let result = synchronizer.sync_path(file_path);
108 assert!(result.is_ok());
109 }
110}