Skip to main content

cp_desktop/
lib.rs

1//! CP Desktop - Canon node ingestion pipeline and local knowledge management
2//!
3//! Watches directories for documents, parses them, computes embeddings,
4//! builds a local search index, and tracks changes for network sync.
5
6use cp_core::hlc::Hlc;
7use cp_core::{CPError, Document, Embedding, Result};
8use cp_embeddings::EmbeddingEngine;
9use cp_graph::GraphStore;
10use cp_parser::{ChunkConfig, Chunker};
11use std::path::PathBuf;
12use std::sync::atomic::{AtomicU64, Ordering};
13use std::sync::{Arc, Mutex};
14use tracing::info;
15use uuid::Uuid;
16
17pub mod arweave;
18mod sync;
19mod watcher;
20
21pub use arweave::{ArweaveConfig, ArweaveWorker};
22pub use sync::SyncManager;
23pub use watcher::FileWatcher;
24
25#[derive(Clone)]
26pub struct DesktopApp {
27    watch_dirs: Vec<PathBuf>,
28    graph: Arc<Mutex<GraphStore>>,
29    sync_manager: Arc<SyncManager>,
30    status: Arc<Mutex<String>>,
31}
32
33/// Worker that processes files
34struct IngestionWorker {
35    graph: Arc<Mutex<GraphStore>>,
36    embedder: Arc<EmbeddingEngine>,
37    chunker: Chunker,
38    sync_manager: Arc<SyncManager>,
39    status: Arc<Mutex<String>>,
40    device_id: Uuid,
41    root_seq: Arc<AtomicU64>,
42    /// Last HLC timestamp, for proper counter advancement
43    last_hlc: Mutex<Hlc>,
44}
45
46impl IngestionWorker {
47    /// Compute and store the new Merkle state root after a graph mutation
48    fn update_state_root(&self) {
49        let mut graph = self.graph.lock().expect("graph lock poisoned");
50        match graph.compute_merkle_root() {
51            Ok(hash) => {
52                let seq = self.root_seq.fetch_add(1, Ordering::SeqCst) + 1;
53                let parent = graph.get_latest_root().ok().flatten().map(|r| r.hash);
54                let now_ms = std::time::SystemTime::now()
55                    .duration_since(std::time::UNIX_EPOCH)
56                    .unwrap()
57                    .as_millis() as u64;
58                let hlc = {
59                    let mut last = self.last_hlc.lock().expect("hlc lock poisoned");
60                    last.tick(now_ms);
61                    last.clone()
62                };
63                let root = cp_core::StateRoot::new(hash, parent, hlc, self.device_id, seq);
64                if let Err(e) = graph.set_latest_root(&root) {
65                    tracing::error!("Failed to store state root: {}", e);
66                } else {
67                    self.sync_manager.update_state_root(hash);
68                    info!("State root updated: {}", hex::encode(&hash[..8]));
69                }
70            }
71            Err(e) => {
72                tracing::error!("Failed to compute Merkle root: {}", e);
73            }
74        }
75    }
76
77    fn process_event(&self, event: notify::Event) {
78        use notify::event::{ModifyKind, RenameMode};
79        use notify::EventKind;
80
81        // Log all events for debugging
82        tracing::debug!("FS Event: {:?}", event);
83
84        match event.kind {
85            EventKind::Modify(ModifyKind::Name(rename_mode)) => {
86                match rename_mode {
87                    RenameMode::From => {
88                        // File left watched directory
89                        info!("Rename-from detected: {:?}", event.paths);
90                        for path in event.paths {
91                            if let Err(e) = self.remove_file(&path) {
92                                tracing::error!(
93                                    "Failed to remove renamed-from file {:?}: {}",
94                                    path,
95                                    e
96                                );
97                            }
98                        }
99                    }
100                    RenameMode::To => {
101                        // File arrived in watched directory
102                        info!("Rename-to detected: {:?}", event.paths);
103                        for path in event.paths {
104                            if path.is_file() {
105                                let filename =
106                                    path.file_name().and_then(|n| n.to_str()).unwrap_or("");
107                                if filename == ".DS_Store" || filename.starts_with('.') {
108                                    continue;
109                                }
110                                if let Err(e) = self.process_file(&path) {
111                                    tracing::error!(
112                                        "Failed to process renamed-to file {:?}: {}",
113                                        path,
114                                        e
115                                    );
116                                }
117                            }
118                        }
119                    }
120                    RenameMode::Both => {
121                        // Both old and new paths provided: paths[0] = old, paths[1] = new
122                        info!("Rename-both detected: {:?}", event.paths);
123                        if let Some(old_path) = event.paths.first() {
124                            if let Err(e) = self.remove_file(old_path) {
125                                tracing::error!("Failed to remove old path {:?}: {}", old_path, e);
126                            }
127                        }
128                        if let Some(new_path) = event.paths.get(1) {
129                            if new_path.is_file() {
130                                let filename =
131                                    new_path.file_name().and_then(|n| n.to_str()).unwrap_or("");
132                                if filename != ".DS_Store" && !filename.starts_with('.') {
133                                    if let Err(e) = self.process_file(new_path) {
134                                        tracing::error!(
135                                            "Failed to process new path {:?}: {}",
136                                            new_path,
137                                            e
138                                        );
139                                    }
140                                }
141                            }
142                        }
143                    }
144                    RenameMode::Any | RenameMode::Other => {
145                        // Ambiguous rename: check if file exists to decide
146                        info!("Rename-any detected: {:?}", event.paths);
147                        for path in event.paths {
148                            if path.exists() && path.is_file() {
149                                let filename =
150                                    path.file_name().and_then(|n| n.to_str()).unwrap_or("");
151                                if filename == ".DS_Store" || filename.starts_with('.') {
152                                    continue;
153                                }
154                                if let Err(e) = self.process_file(&path) {
155                                    tracing::error!(
156                                        "Failed to process renamed file {:?}: {}",
157                                        path,
158                                        e
159                                    );
160                                }
161                            } else if let Err(e) = self.remove_file(&path) {
162                                tracing::error!("Failed to remove renamed file {:?}: {}", path, e);
163                            }
164                        }
165                    }
166                }
167            }
168            EventKind::Create(_) | EventKind::Modify(_) => {
169                for path in event.paths {
170                    if path.is_file() {
171                        let filename = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
172                        if filename == ".DS_Store" || filename.starts_with('.') {
173                            continue;
174                        }
175                        if let Err(e) = self.process_file(&path) {
176                            tracing::error!("Failed to process file {:?}: {}", path, e);
177                        }
178                    }
179                }
180            }
181            EventKind::Remove(_) => {
182                info!("Remove event detected: {:?}", event.paths);
183                for path in event.paths {
184                    if let Err(e) = self.remove_file(&path) {
185                        tracing::error!("Failed to remove file {:?}: {}", path, e);
186                    }
187                }
188            }
189            _ => {}
190        }
191    }
192
193    fn process_file(&self, path: &PathBuf) -> Result<()> {
194        let filename = path
195            .file_name()
196            .and_then(|n| n.to_str())
197            .unwrap_or("unknown");
198        *self.status.lock().expect("status lock poisoned") = format!("Parsing {filename}");
199        let start_total = std::time::Instant::now();
200
201        // 1. Read metadata
202        let metadata = std::fs::metadata(path).map_err(CPError::Io)?;
203        let mtime = metadata
204            .modified()
205            .unwrap_or(std::time::SystemTime::UNIX_EPOCH)
206            .duration_since(std::time::UNIX_EPOCH)
207            .unwrap()
208            .as_secs() as i64;
209
210        // 2. Parse
211        let start_parse = std::time::Instant::now();
212        let text = cp_parser::parse_file(path)?;
213        let parse_duration = start_parse.elapsed();
214
215        // 3. Create Document
216        let doc = Document::new(path.clone(), text.as_bytes(), mtime);
217
218        // 4. Chunk
219        *self.status.lock().expect("status lock poisoned") = format!("Chunking {filename}");
220        let start_chunk = std::time::Instant::now();
221        let chunks = self.chunker.chunk(doc.id, &text)?;
222        let chunk_duration = start_chunk.elapsed();
223
224        // 5. Embed (with Hierarchical Hash calculation)
225        *self.status.lock().expect("status lock poisoned") = format!("Embedding {filename}");
226        let start_embed = std::time::Instant::now();
227        let chunk_hashes: Vec<[u8; 32]> = chunks.iter().map(|c| c.text_hash).collect();
228        let h_hash = Document::compute_hierarchical_hash(&chunk_hashes);
229        let mut doc = doc;
230        doc.set_hierarchical_hash(h_hash);
231
232        let chunk_texts: Vec<&str> = chunks.iter().map(|c| c.text.as_str()).collect();
233        let embeddings_vec = self.embedder.embed_batch(&chunk_texts)?;
234        let embed_duration = start_embed.elapsed();
235
236        // 6. Store
237        let start_store = std::time::Instant::now();
238        {
239            let mut graph = self.graph.lock().expect("graph lock poisoned");
240
241            graph.insert_document(&doc)?;
242            self.sync_manager.record_add_doc(doc.clone());
243
244            // Insert Chunks & Embeddings
245            for (i, chunk) in chunks.iter().enumerate() {
246                graph.insert_chunk(chunk)?;
247                self.sync_manager.record_add_chunk(chunk.clone());
248
249                if let Some(vec) = embeddings_vec.get(i) {
250                    let emb = Embedding::new(chunk.id, vec, self.embedder.model_hash()?, 0);
251                    graph.insert_embedding(&emb)?;
252                    self.sync_manager.record_add_embedding(emb.clone());
253                }
254            }
255        }
256        let store_duration = start_store.elapsed();
257        let total_duration = start_total.elapsed();
258
259        // Update Merkle state root
260        self.update_state_root();
261
262        *self.status.lock().expect("status lock poisoned") = "Idle".to_string();
263
264        info!(
265            "Processed {:?}: chunks={}, h_hash={}, parse={:?}, chunk={:?}, embed={:?}, store={:?}, total={:?}",
266            path,
267            chunks.len(),
268            hex::encode(&h_hash[..4]),
269            parse_duration,
270            chunk_duration,
271            embed_duration,
272            store_duration,
273            total_duration
274        );
275        Ok(())
276    }
277
278    fn remove_file(&self, path: &PathBuf) -> Result<()> {
279        let filename = path
280            .file_name()
281            .and_then(|n| n.to_str())
282            .unwrap_or("unknown");
283        *self.status.lock().expect("status lock poisoned") = format!("Removing {filename}");
284
285        info!("Remove event received for: {:?}", path);
286
287        let found = {
288            let mut graph = self.graph.lock().expect("graph lock poisoned");
289            if let Some(doc) = graph.get_document_by_path(path)? {
290                info!("Removing document from substrate: {:?}", path);
291
292                // 1. Get associated IDs for sync
293                let chunks = graph.get_chunks_for_doc(doc.id)?;
294                let chunk_ids: Vec<Uuid> = chunks.iter().map(|c| c.id).collect();
295                let mut embedding_ids = Vec::new();
296                for cid in &chunk_ids {
297                    if let Some(emb) = graph.get_embedding_for_chunk(*cid)? {
298                        embedding_ids.push(emb.id);
299                    }
300                }
301
302                // 2. Perform deletion in DB
303                graph.delete_document(doc.id)?;
304
305                // 3. Record for sync
306                self.sync_manager
307                    .record_remove_doc(doc.id, chunk_ids, embedding_ids);
308                true
309            } else {
310                tracing::warn!("Remove event for unknown document: {:?}", path);
311                false
312            }
313        }; // graph lock dropped here
314
315        if found {
316            self.update_state_root();
317            info!("Document removed successfully: {:?}", path);
318        }
319
320        *self.status.lock().expect("status lock poisoned") = "Idle".to_string();
321
322        Ok(())
323    }
324}
325
326impl DesktopApp {
327    pub fn new(data_dir: impl AsRef<std::path::Path>) -> Result<Self> {
328        Self::new_inner(data_dir.as_ref(), false)
329    }
330
331    /// Create a `DesktopApp` that skips platform keychain access.
332    ///
333    /// Uses file-based key storage only. Suitable for tests, CI,
334    /// containers, and headless environments.
335    pub fn new_file_only(data_dir: impl AsRef<std::path::Path>) -> Result<Self> {
336        Self::new_inner(data_dir.as_ref(), true)
337    }
338
339    fn new_inner(data_dir: &std::path::Path, file_only: bool) -> Result<Self> {
340        std::fs::create_dir_all(data_dir).map_err(CPError::Io)?;
341
342        let db_path = data_dir.join("graph.db");
343        let graph = GraphStore::open(db_path.to_str().unwrap())?;
344        let graph_arc = Arc::new(Mutex::new(graph));
345
346        let key_path = data_dir.join("cp.key");
347        let sync_manager = if file_only {
348            Arc::new(SyncManager::new_file_only(&key_path)?)
349        } else {
350            Arc::new(SyncManager::new(&key_path)?)
351        };
352
353        Ok(Self {
354            watch_dirs: Vec::new(),
355            graph: graph_arc,
356            sync_manager,
357            status: Arc::new(Mutex::new("Idle".to_string())),
358        })
359    }
360
361    pub fn add_watch_dir(&mut self, path: PathBuf) -> Result<()> {
362        if !path.exists() {
363            return Err(CPError::Io(std::io::Error::new(
364                std::io::ErrorKind::NotFound,
365                format!("Path not found: {}", path.display()),
366            )));
367        }
368        self.watch_dirs.push(path);
369        Ok(())
370    }
371
372    pub async fn start(&self) -> Result<()> {
373        let (tx, rx) = std::sync::mpsc::channel();
374        let mut watcher = FileWatcher::new(tx)?;
375
376        for dir in &self.watch_dirs {
377            watcher.watch(dir)?;
378        }
379
380        let embedder = Arc::new(EmbeddingEngine::new()?);
381
382        let device_id = device_uuid_from_sync_manager(&self.sync_manager);
383        let current_seq = {
384            let graph = self.graph.lock().expect("graph lock poisoned");
385            graph.get_latest_root().ok().flatten().map_or(0, |r| r.seq)
386        };
387        let root_seq = Arc::new(AtomicU64::new(current_seq));
388
389        let worker = IngestionWorker {
390            graph: self.graph.clone(),
391            embedder,
392            chunker: Chunker::new(ChunkConfig::default()),
393            sync_manager: self.sync_manager.clone(),
394            status: self.status.clone(),
395            device_id,
396            root_seq: root_seq.clone(),
397            last_hlc: Mutex::new(Hlc::now(device_id)),
398        };
399
400        // Initial crawl (recursive to match the recursive watcher)
401        let mut found_paths = std::collections::HashSet::new();
402        for dir in &self.watch_dirs {
403            crawl_directory_recursive(dir, &worker, &mut found_paths)?;
404        }
405
406        // Pruning: Remove docs from DB that are no longer in watch dirs
407        {
408            let mut graph = self.graph.lock().expect("graph lock poisoned");
409            let all_docs = graph.get_all_documents()?;
410            for doc in all_docs {
411                // Only prune if the doc is in one of the watch dirs but not found in the crawl
412                let in_watch_dir = self.watch_dirs.iter().any(|d| doc.path.starts_with(d));
413                if in_watch_dir && !found_paths.contains(&doc.path) {
414                    info!("Pruning stale document from substrate: {:?}", doc.path);
415                    // Get associated IDs for sync
416                    let chunks = graph.get_chunks_for_doc(doc.id)?;
417                    let chunk_ids: Vec<Uuid> = chunks.iter().map(|c| c.id).collect();
418                    let mut embedding_ids = Vec::new();
419                    for cid in &chunk_ids {
420                        if let Some(emb) = graph.get_embedding_for_chunk(*cid)? {
421                            embedding_ids.push(emb.id);
422                        }
423                    }
424                    graph.delete_document(doc.id)?;
425                    // Record removal diff so the relay (and other devices) know it's gone
426                    self.sync_manager
427                        .record_remove_doc(doc.id, chunk_ids, embedding_ids);
428                }
429            }
430        }
431
432        // Periodic stale document pruning
433        let app_clone = self.clone();
434        tokio::spawn(async move {
435            info!("Starting background prune loop (every 30s)");
436            loop {
437                tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
438                match app_clone.prune_stale_documents() {
439                    Ok(count) if count > 0 => {
440                        info!("Auto-pruned {} stale documents", count);
441                    }
442                    Err(e) => {
443                        tracing::error!("Auto-prune failed: {}", e);
444                    }
445                    _ => {}
446                }
447            }
448        });
449
450        // Run ingestion loop (blocking)
451        tokio::task::spawn_blocking(move || {
452            worker.run(rx);
453        })
454        .await
455        .map_err(|e| CPError::Io(std::io::Error::other(e.to_string())))?;
456
457        Ok(())
458    }
459
460    pub fn graph(&self) -> Arc<Mutex<GraphStore>> {
461        self.graph.clone()
462    }
463
464    pub fn status(&self) -> String {
465        self.status.lock().expect("status lock poisoned").clone()
466    }
467
468    /// Prune stale documents (files that no longer exist) and record for sync
469    pub fn prune_stale_documents(&self) -> Result<usize> {
470        let mut graph = self.graph.lock().expect("graph lock poisoned");
471        let all_docs = graph.get_all_documents()?;
472
473        let mut removed = 0;
474        for doc in all_docs {
475            if !doc.path.exists() {
476                info!("Pruning stale document: {:?}", doc.path);
477
478                // Get associated IDs for sync
479                let chunks = graph.get_chunks_for_doc(doc.id)?;
480                let chunk_ids: Vec<Uuid> = chunks.iter().map(|c| c.id).collect();
481                let mut embedding_ids = Vec::new();
482                for cid in &chunk_ids {
483                    if let Some(emb) = graph.get_embedding_for_chunk(*cid)? {
484                        embedding_ids.push(emb.id);
485                    }
486                }
487
488                // Delete from DB
489                graph.delete_document(doc.id)?;
490
491                // Record for sync
492                self.sync_manager
493                    .record_remove_doc(doc.id, chunk_ids, embedding_ids);
494
495                removed += 1;
496            }
497        }
498
499        // Update state root if anything was pruned (within the same graph lock)
500        if removed > 0 {
501            if let Ok(hash) = graph.compute_merkle_root() {
502                let device_uuid = device_uuid_from_sync_manager(&self.sync_manager);
503                let latest = graph.get_latest_root().ok().flatten();
504                let seq = latest.as_ref().map_or(0, |r| r.seq) + 1;
505                let parent = latest.map(|r| r.hash);
506                let hlc = Hlc::now(device_uuid);
507                let root = cp_core::StateRoot::new(hash, parent, hlc, device_uuid, seq);
508                if let Err(e) = graph.set_latest_root(&root) {
509                    tracing::error!("Failed to store state root after prune: {}", e);
510                } else {
511                    self.sync_manager.update_state_root(hash);
512                    info!(
513                        "State root updated after pruning: {}",
514                        hex::encode(&hash[..8])
515                    );
516                }
517            }
518        }
519
520        Ok(removed)
521    }
522
523    /// Clear the entire graph and re-ingest all files from watched directories
524    pub fn reingest_all(&self) -> Result<usize> {
525        // 1. Clear graph
526        {
527            let mut graph = self.graph.lock().expect("graph lock poisoned");
528            graph.clear_all()?;
529            info!("Graph cleared for full reingest");
530        }
531
532        // 2. Set up embedder and chunker
533        let embedder = Arc::new(EmbeddingEngine::new()?);
534        let chunker = Chunker::new(ChunkConfig::default());
535        let model_hash = embedder.model_hash()?;
536
537        // 3. Collect files recursively (matching watcher and initial crawl)
538        let mut files = Vec::new();
539        for dir in &self.watch_dirs {
540            collect_files_for_reingest(dir, &mut files)?;
541        }
542
543        let mut count = 0;
544        for path in &files {
545            let filename = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
546            *self.status.lock().expect("status lock poisoned") = format!("Reingesting {filename}");
547
548            let text = match cp_parser::parse_file(path) {
549                Ok(t) => t,
550                Err(e) => {
551                    tracing::error!("Failed to parse {:?}: {}", path, e);
552                    continue;
553                }
554            };
555
556            let metadata = std::fs::metadata(path).map_err(CPError::Io)?;
557            let mtime = metadata
558                .modified()
559                .unwrap_or(std::time::SystemTime::UNIX_EPOCH)
560                .duration_since(std::time::UNIX_EPOCH)
561                .unwrap()
562                .as_secs() as i64;
563
564            let mut doc = Document::new(path.clone(), text.as_bytes(), mtime);
565            let chunks = chunker.chunk(doc.id, &text)?;
566
567            let chunk_hashes: Vec<[u8; 32]> = chunks.iter().map(|c| c.text_hash).collect();
568            let h_hash = Document::compute_hierarchical_hash(&chunk_hashes);
569            doc.set_hierarchical_hash(h_hash);
570
571            let chunk_texts: Vec<&str> = chunks.iter().map(|c| c.text.as_str()).collect();
572            let embeddings_vec = embedder.embed_batch(&chunk_texts)?;
573
574            {
575                let mut graph = self.graph.lock().expect("graph lock poisoned");
576                graph.insert_document(&doc)?;
577                self.sync_manager.record_add_doc(doc.clone());
578                for (i, chunk) in chunks.iter().enumerate() {
579                    graph.insert_chunk(chunk)?;
580                    self.sync_manager.record_add_chunk(chunk.clone());
581                    if let Some(vec) = embeddings_vec.get(i) {
582                        let emb = Embedding::new(chunk.id, vec, model_hash, 0);
583                        graph.insert_embedding(&emb)?;
584                        self.sync_manager.record_add_embedding(emb.clone());
585                    }
586                }
587            }
588
589            count += 1;
590            info!("Reingested {:?} ({} chunks)", path, chunks.len());
591        }
592
593        // 4. Compute final state root
594        {
595            let mut graph = self.graph.lock().expect("graph lock poisoned");
596            let hash = graph.compute_merkle_root()?;
597            let device_uuid = device_uuid_from_sync_manager(&self.sync_manager);
598            let hlc = Hlc::now(device_uuid);
599            let root = cp_core::StateRoot::new(hash, None, hlc, device_uuid, 1);
600            graph.set_latest_root(&root)?;
601            self.sync_manager.update_state_root(hash);
602        }
603
604        *self.status.lock().expect("status lock poisoned") = "Idle".to_string();
605        info!("Reingest complete: {} files", count);
606        Ok(count)
607    }
608
609    /// Get the node's device ID as hex
610    pub fn get_device_id_hex(&self) -> String {
611        self.sync_manager.get_device_id_hex()
612    }
613
614    /// Get the sync manager
615    pub fn sync_manager(&self) -> &Arc<SyncManager> {
616        &self.sync_manager
617    }
618}
619
620// Need to update IngestionWorker::run signature
621impl IngestionWorker {
622    #[allow(clippy::needless_pass_by_value)]
623    fn run(&self, rx: std::sync::mpsc::Receiver<notify::Result<notify::Event>>) {
624        loop {
625            match rx.recv() {
626                Ok(res) => match res {
627                    Ok(event) => {
628                        self.process_event(event);
629                    }
630                    Err(e) => {
631                        tracing::error!("Watch error event: {}", e);
632                    }
633                },
634                Err(e) => {
635                    tracing::error!("Watch channel closed: {}", e);
636                    break;
637                }
638            }
639        }
640    }
641}
642
643/// Derive a stable UUID from the sync manager's device public key.
644fn device_uuid_from_sync_manager(sync_manager: &SyncManager) -> Uuid {
645    let device_id_hex = sync_manager.get_device_id_hex();
646    if let Ok(key_bytes) = hex::decode(&device_id_hex) {
647        let mut uuid_bytes = [0u8; 16];
648        let len = key_bytes.len().min(16);
649        uuid_bytes[..len].copy_from_slice(&key_bytes[..len]);
650        Uuid::from_bytes(uuid_bytes)
651    } else {
652        Uuid::new_v4()
653    }
654}
655
656/// Recursively collect files for reingest, skipping dotfiles/dotdirs.
657fn collect_files_for_reingest(dir: &PathBuf, out: &mut Vec<PathBuf>) -> Result<()> {
658    let entries = std::fs::read_dir(dir).map_err(CPError::Io)?;
659    for entry in entries {
660        let entry = entry.map_err(CPError::Io)?;
661        let path = entry.path();
662        let filename = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
663        if filename.starts_with('.') {
664            continue;
665        }
666        if path.is_dir() {
667            collect_files_for_reingest(&path, out)?;
668        } else if path.is_file() {
669            out.push(path);
670        }
671    }
672    Ok(())
673}
674
675/// Recursively crawl a directory and process all files, matching the
676/// recursive watcher behavior.
677fn crawl_directory_recursive(
678    dir: &PathBuf,
679    worker: &IngestionWorker,
680    found_paths: &mut std::collections::HashSet<PathBuf>,
681) -> Result<()> {
682    let entries = std::fs::read_dir(dir).map_err(CPError::Io)?;
683    for entry in entries {
684        let entry = entry.map_err(CPError::Io)?;
685        let path = entry.path();
686        let filename = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
687        if filename.starts_with('.') {
688            continue;
689        }
690        if path.is_dir() {
691            crawl_directory_recursive(&path, worker, found_paths)?;
692        } else if path.is_file() {
693            found_paths.insert(path.clone());
694            if let Err(e) = worker.process_file(&path) {
695                tracing::error!("Failed to process existing file {:?}: {}", path, e);
696            }
697        }
698    }
699    Ok(())
700}