1use crate::{
4 file_ops::FileOperator,
5 models::{OperationParams, OperationType, TargetType},
6};
7use anyhow::{anyhow, Result};
8use notify::{
9 event::{CreateKind, EventKind, ModifyKind},
10 Event, RecommendedWatcher, RecursiveMode, Watcher,
11};
12use std::{
13 path::{Path, PathBuf},
14 sync::mpsc::{self, Receiver, Sender},
15 time::Duration,
16};
17use thiserror::Error;
18
19#[derive(Error, Debug)]
21pub enum WatchError {
22 #[error("Watch error: {0}")]
23 NotifyError(#[from] notify::Error),
24 #[error("IO error: {0}")]
25 IoError(#[from] std::io::Error),
26 #[error("Channel error: {0}")]
27 ChannelError(String),
28 #[error("Invalid watch configuration: {0}")]
29 InvalidConfig(String),
30}
31
32#[derive(Debug, Clone)]
34pub struct WatchConfig {
35 pub watch_dir: PathBuf,
37 pub target_dir: Option<PathBuf>,
39 pub operation: OperationType,
41 pub password: String,
43 pub delete_source: bool,
45 pub compress: bool,
47 pub watch_extensions: Option<Vec<String>>,
49 pub process_existing: bool,
51 pub debounce_ms: u64,
53}
54
55impl WatchConfig {
56 pub fn new(
58 watch_dir: PathBuf,
59 operation: OperationType,
60 password: String,
61 ) -> Self {
62 Self {
63 watch_dir,
64 target_dir: None,
65 operation,
66 password,
67 delete_source: false,
68 compress: false,
69 watch_extensions: None,
70 process_existing: false,
71 debounce_ms: 1000, }
73 }
74
75 pub fn with_target_dir(mut self, target_dir: PathBuf) -> Self {
77 self.target_dir = Some(target_dir);
78 self
79 }
80
81 pub fn with_delete_source(mut self, delete_source: bool) -> Self {
83 self.delete_source = delete_source;
84 self
85 }
86
87 pub fn with_compression(mut self, compress: bool) -> Self {
89 self.compress = compress;
90 self
91 }
92
93 pub fn with_extensions(mut self, extensions: Vec<String>) -> Self {
95 self.watch_extensions = Some(extensions);
96 self
97 }
98
99 pub fn with_process_existing(mut self, process_existing: bool) -> Self {
101 self.process_existing = process_existing;
102 self
103 }
104
105 pub fn with_debounce_ms(mut self, debounce_ms: u64) -> Self {
107 self.debounce_ms = debounce_ms;
108 self
109 }
110
111 pub fn should_process_file(&self, path: &Path) -> bool {
113 match &self.watch_extensions {
114 Some(extensions) => {
115 if let Some(ext) = path.extension().and_then(|e| e.to_str()) {
116 extensions.iter().any(|e| e.eq_ignore_ascii_case(ext))
117 } else {
118 false
119 }
120 }
121 None => true, }
123 }
124
125 pub fn should_process_by_operation(&self, path: &Path) -> bool {
127 let path_str = path.to_string_lossy();
128 match self.operation {
129 OperationType::Encrypt => {
130 !path_str.ends_with(".sf") && !path_str.ends_with(".sf.gz") && !path_str.ends_with(".hsf")
132 }
133 OperationType::Decrypt => {
134 path_str.ends_with(".sf") || path_str.ends_with(".sf.gz")
136 }
137 OperationType::HybridEncrypt => {
138 !path_str.ends_with(".sf") && !path_str.ends_with(".sf.gz") && !path_str.ends_with(".hsf")
140 }
141 OperationType::HybridDecrypt => {
142 path_str.ends_with(".hsf")
144 }
145 }
146 }
147}
148
149pub struct FileWatcher {
151 config: WatchConfig,
152 operator: FileOperator,
153}
154
155impl FileWatcher {
156 pub fn new(config: WatchConfig) -> Self {
158 Self {
159 config,
160 operator: FileOperator::new(),
161 }
162 }
163
164 pub fn config(&self) -> &WatchConfig {
166 &self.config
167 }
168
169 pub async fn start(&self) -> Result<()> {
171 if !self.config.watch_dir.exists() {
173 return Err(anyhow!(
174 "Watch directory does not exist: {}",
175 self.config.watch_dir.display()
176 ));
177 }
178
179 if let Some(ref target_dir) = self.config.target_dir {
181 std::fs::create_dir_all(target_dir)?;
182 }
183
184 println!(
185 "🔍 Starting {} watcher on directory: {}",
186 match self.config.operation {
187 OperationType::Encrypt => "encryption",
188 OperationType::Decrypt => "decryption",
189 OperationType::HybridEncrypt => "hybrid encryption",
190 OperationType::HybridDecrypt => "hybrid decryption",
191 },
192 self.config.watch_dir.display()
193 );
194
195 if let Some(ref target_dir) = self.config.target_dir {
196 println!("📁 Target directory: {}", target_dir.display());
197 }
198
199 println!("🗑️ Delete source files: {}", self.config.delete_source);
200 println!("🗜️ Compression: {}", self.config.compress);
201
202 if self.config.process_existing {
204 println!("📂 Processing existing files...");
205 self.process_existing_files().await?;
206 }
207
208 let (tx, rx): (Sender<notify::Result<Event>>, Receiver<notify::Result<Event>>) =
210 mpsc::channel();
211
212 let mut watcher = RecommendedWatcher::new(
213 move |res| {
214 if let Err(e) = tx.send(res) {
215 eprintln!("Failed to send watch event: {}", e);
216 }
217 },
218 notify::Config::default(),
219 )?;
220
221 watcher.watch(&self.config.watch_dir, RecursiveMode::Recursive)?;
222
223 println!("👀 Watching for file changes... Press Ctrl+C to stop.");
224
225 self.event_loop(rx).await?;
227
228 Ok(())
229 }
230
231 async fn process_existing_files(&self) -> Result<()> {
233 fn collect_files(dir: &Path, files: &mut Vec<PathBuf>) -> Result<()> {
234 for entry in std::fs::read_dir(dir)? {
235 let entry = entry?;
236 let path = entry.path();
237 if path.is_dir() {
238 collect_files(&path, files)?;
239 } else {
240 files.push(path);
241 }
242 }
243 Ok(())
244 }
245
246 let mut files = Vec::new();
247 collect_files(&self.config.watch_dir, &mut files)?;
248
249 for file in files {
250 if self.should_process_file(&file) {
251 self.process_file(&file).await?;
252 }
253 }
254
255 Ok(())
256 }
257
258 async fn event_loop(&self, rx: Receiver<notify::Result<Event>>) -> Result<()> {
260 let mut debounce_map = std::collections::HashMap::new();
261
262 loop {
263 match rx.recv_timeout(Duration::from_millis(100)) {
264 Ok(Ok(event)) => {
265 self.handle_event(event, &mut debounce_map).await?;
266 }
267 Ok(Err(e)) => {
268 eprintln!("Watch error: {}", e);
269 }
270 Err(mpsc::RecvTimeoutError::Timeout) => {
271 self.process_debounced_files(&mut debounce_map).await?;
273 }
274 Err(mpsc::RecvTimeoutError::Disconnected) => {
275 println!("Watch channel disconnected, stopping...");
276 break;
277 }
278 }
279 }
280
281 Ok(())
282 }
283
284 async fn handle_event(
286 &self,
287 event: Event,
288 debounce_map: &mut std::collections::HashMap<PathBuf, std::time::Instant>,
289 ) -> Result<()> {
290 match event.kind {
291 EventKind::Create(CreateKind::File) | EventKind::Modify(ModifyKind::Data(_)) => {
292 for path in event.paths {
293 if path.is_file() && self.should_process_file(&path) {
294 debounce_map.insert(path, std::time::Instant::now());
296 }
297 }
298 }
299 _ => {}
300 }
301
302 Ok(())
303 }
304
305 async fn process_debounced_files(
307 &self,
308 debounce_map: &mut std::collections::HashMap<PathBuf, std::time::Instant>,
309 ) -> Result<()> {
310 let now = std::time::Instant::now();
311 let debounce_duration = Duration::from_millis(self.config.debounce_ms);
312
313 let mut files_to_process = Vec::new();
314 let mut files_to_remove = Vec::new();
315
316 for (path, timestamp) in debounce_map.iter() {
317 if now.duration_since(*timestamp) >= debounce_duration {
318 if path.exists() {
319 files_to_process.push(path.clone());
320 }
321 files_to_remove.push(path.clone());
322 }
323 }
324
325 for path in files_to_remove {
327 debounce_map.remove(&path);
328 }
329
330 for path in files_to_process {
332 if let Err(e) = self.process_file(&path).await {
333 eprintln!("Failed to process file {}: {}", path.display(), e);
334 }
335 }
336
337 Ok(())
338 }
339
340 fn should_process_file(&self, path: &Path) -> bool {
342 if !path.is_file() {
344 return false;
345 }
346
347 if path
349 .file_name()
350 .and_then(|n| n.to_str())
351 .map(|n| n.starts_with('.'))
352 .unwrap_or(false)
353 {
354 return false;
355 }
356
357 if !self.config.should_process_file(path) {
359 return false;
360 }
361
362 if !self.config.should_process_by_operation(path) {
364 return false;
365 }
366
367 true
368 }
369
370 async fn process_file(&self, source_path: &Path) -> Result<()> {
372 let target_path = if let Some(ref target_dir) = self.config.target_dir {
374 let relative_path = source_path
375 .strip_prefix(&self.config.watch_dir)
376 .unwrap_or(source_path.file_name().unwrap().as_ref());
377 target_dir.join(relative_path)
378 } else {
379 source_path.to_path_buf()
380 };
381
382 let mut params = OperationParams::new(
384 self.config.operation.clone(),
385 TargetType::File,
386 source_path.to_path_buf(),
387 )
388 .with_compression(self.config.compress)
389 .with_delete_source(self.config.delete_source)
390 .with_progress(false); if self.config.target_dir.is_some() {
394 let destination = match self.config.operation {
395 OperationType::Encrypt => {
396 if self.config.compress {
397 target_path.with_extension("sf.gz")
398 } else {
399 target_path.with_extension("sf")
400 }
401 }
402 OperationType::Decrypt => {
403 let path_str = target_path.to_string_lossy();
404 if path_str.ends_with(".sf.gz") {
405 PathBuf::from(path_str.trim_end_matches(".sf.gz"))
406 } else if path_str.ends_with(".sf") {
407 PathBuf::from(path_str.trim_end_matches(".sf"))
408 } else {
409 target_path.with_extension("decrypted")
410 }
411 }
412 OperationType::HybridEncrypt => {
413 target_path.with_extension("hsf")
414 }
415 OperationType::HybridDecrypt => {
416 let path_str = target_path.to_string_lossy();
417 if path_str.ends_with(".hsf") {
418 PathBuf::from(path_str.trim_end_matches(".hsf"))
419 } else {
420 target_path.with_extension("decrypted")
421 }
422 }
423 };
424 params = params.with_destination(destination);
425 }
426
427 if let Some(destination) = ¶ms.destination {
429 if let Some(parent) = destination.parent() {
430 std::fs::create_dir_all(parent)?;
431 }
432 }
433
434 println!(
435 "🔄 Processing file: {} -> {}",
436 source_path.display(),
437 params.get_destination().display()
438 );
439
440 let result = self.operator.process(¶ms, &self.config.password).await;
442
443 if result.success {
444 println!("✅ {}", result);
445 } else {
446 eprintln!("❌ {}", result);
447 return Err(anyhow!(
448 "Failed to process file: {}",
449 result.error.unwrap_or_default()
450 ));
451 }
452
453 Ok(())
454 }
455}
456
457#[cfg(test)]
458mod tests {
459 use super::*;
460
461 #[tokio::test]
462 async fn test_watch_config_creation() {
463 let config = WatchConfig::new(
464 PathBuf::from("/tmp/watch"),
465 OperationType::Encrypt,
466 "password123".to_string(),
467 );
468
469 assert_eq!(config.operation, OperationType::Encrypt);
470 assert_eq!(config.password, "password123");
471 assert!(!config.delete_source);
472 assert!(!config.compress);
473 }
474
475 #[tokio::test]
476 async fn test_should_process_file() {
477 let config = WatchConfig::new(
478 PathBuf::from("/tmp"),
479 OperationType::Encrypt,
480 "password".to_string(),
481 );
482
483 assert!(config.should_process_by_operation(Path::new("test.txt")));
485
486 assert!(!config.should_process_by_operation(Path::new("test.sf")));
488 assert!(!config.should_process_by_operation(Path::new("test.sf.gz")));
489
490 let decrypt_config = WatchConfig::new(
492 PathBuf::from("/tmp"),
493 OperationType::Decrypt,
494 "password".to_string(),
495 );
496
497 assert!(decrypt_config.should_process_by_operation(Path::new("test.sf")));
499 assert!(decrypt_config.should_process_by_operation(Path::new("test.sf.gz")));
500
501 assert!(!decrypt_config.should_process_by_operation(Path::new("test.txt")));
503 }
504
505 #[tokio::test]
506 async fn test_extension_filtering() {
507 let config = WatchConfig::new(
508 PathBuf::from("/tmp"),
509 OperationType::Encrypt,
510 "password".to_string(),
511 ).with_extensions(vec!["txt".to_string(), "doc".to_string()]);
512
513 assert!(config.should_process_file(Path::new("test.txt")));
515 assert!(config.should_process_file(Path::new("document.doc")));
516
517 assert!(!config.should_process_file(Path::new("image.jpg")));
519 assert!(!config.should_process_file(Path::new("video.mp4")));
520 }
521}