1use cp_core::hlc::Hlc;
7use cp_core::{CPError, Document, Embedding, Result};
8use cp_embeddings::EmbeddingEngine;
9use cp_graph::GraphStore;
10use cp_parser::{ChunkConfig, Chunker};
11use std::path::PathBuf;
12use std::sync::atomic::{AtomicU64, Ordering};
13use std::sync::{Arc, Mutex};
14use tracing::info;
15use uuid::Uuid;
16
17pub mod arweave;
18mod sync;
19mod watcher;
20
21pub use arweave::{ArweaveConfig, ArweaveWorker};
22pub use sync::SyncManager;
23pub use watcher::FileWatcher;
24
25#[derive(Clone)]
26pub struct DesktopApp {
27 watch_dirs: Vec<PathBuf>,
28 graph: Arc<Mutex<GraphStore>>,
29 sync_manager: Arc<SyncManager>,
30 status: Arc<Mutex<String>>,
31}
32
33struct IngestionWorker {
35 graph: Arc<Mutex<GraphStore>>,
36 embedder: Arc<EmbeddingEngine>,
37 chunker: Chunker,
38 sync_manager: Arc<SyncManager>,
39 status: Arc<Mutex<String>>,
40 device_id: Uuid,
41 root_seq: Arc<AtomicU64>,
42 last_hlc: Mutex<Hlc>,
44}
45
46impl IngestionWorker {
47 fn update_state_root(&self) {
49 let mut graph = self.graph.lock().expect("graph lock poisoned");
50 match graph.compute_merkle_root() {
51 Ok(hash) => {
52 let seq = self.root_seq.fetch_add(1, Ordering::SeqCst) + 1;
53 let parent = graph.get_latest_root().ok().flatten().map(|r| r.hash);
54 let now_ms = std::time::SystemTime::now()
55 .duration_since(std::time::UNIX_EPOCH)
56 .unwrap()
57 .as_millis() as u64;
58 let hlc = {
59 let mut last = self.last_hlc.lock().expect("hlc lock poisoned");
60 last.tick(now_ms);
61 last.clone()
62 };
63 let root = cp_core::StateRoot::new(hash, parent, hlc, self.device_id, seq);
64 if let Err(e) = graph.set_latest_root(&root) {
65 tracing::error!("Failed to store state root: {}", e);
66 } else {
67 self.sync_manager.update_state_root(hash);
68 info!("State root updated: {}", hex::encode(&hash[..8]));
69 }
70 }
71 Err(e) => {
72 tracing::error!("Failed to compute Merkle root: {}", e);
73 }
74 }
75 }
76
77 fn process_event(&self, event: notify::Event) {
78 use notify::event::{ModifyKind, RenameMode};
79 use notify::EventKind;
80
81 tracing::debug!("FS Event: {:?}", event);
83
84 match event.kind {
85 EventKind::Modify(ModifyKind::Name(rename_mode)) => {
86 match rename_mode {
87 RenameMode::From => {
88 info!("Rename-from detected: {:?}", event.paths);
90 for path in event.paths {
91 if let Err(e) = self.remove_file(&path) {
92 tracing::error!(
93 "Failed to remove renamed-from file {:?}: {}",
94 path,
95 e
96 );
97 }
98 }
99 }
100 RenameMode::To => {
101 info!("Rename-to detected: {:?}", event.paths);
103 for path in event.paths {
104 if path.is_file() {
105 let filename =
106 path.file_name().and_then(|n| n.to_str()).unwrap_or("");
107 if filename == ".DS_Store" || filename.starts_with('.') {
108 continue;
109 }
110 if let Err(e) = self.process_file(&path) {
111 tracing::error!(
112 "Failed to process renamed-to file {:?}: {}",
113 path,
114 e
115 );
116 }
117 }
118 }
119 }
120 RenameMode::Both => {
121 info!("Rename-both detected: {:?}", event.paths);
123 if let Some(old_path) = event.paths.first() {
124 if let Err(e) = self.remove_file(old_path) {
125 tracing::error!("Failed to remove old path {:?}: {}", old_path, e);
126 }
127 }
128 if let Some(new_path) = event.paths.get(1) {
129 if new_path.is_file() {
130 let filename =
131 new_path.file_name().and_then(|n| n.to_str()).unwrap_or("");
132 if filename != ".DS_Store" && !filename.starts_with('.') {
133 if let Err(e) = self.process_file(new_path) {
134 tracing::error!(
135 "Failed to process new path {:?}: {}",
136 new_path,
137 e
138 );
139 }
140 }
141 }
142 }
143 }
144 RenameMode::Any | RenameMode::Other => {
145 info!("Rename-any detected: {:?}", event.paths);
147 for path in event.paths {
148 if path.exists() && path.is_file() {
149 let filename =
150 path.file_name().and_then(|n| n.to_str()).unwrap_or("");
151 if filename == ".DS_Store" || filename.starts_with('.') {
152 continue;
153 }
154 if let Err(e) = self.process_file(&path) {
155 tracing::error!(
156 "Failed to process renamed file {:?}: {}",
157 path,
158 e
159 );
160 }
161 } else if let Err(e) = self.remove_file(&path) {
162 tracing::error!("Failed to remove renamed file {:?}: {}", path, e);
163 }
164 }
165 }
166 }
167 }
168 EventKind::Create(_) | EventKind::Modify(_) => {
169 for path in event.paths {
170 if path.is_file() {
171 let filename = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
172 if filename == ".DS_Store" || filename.starts_with('.') {
173 continue;
174 }
175 if let Err(e) = self.process_file(&path) {
176 tracing::error!("Failed to process file {:?}: {}", path, e);
177 }
178 }
179 }
180 }
181 EventKind::Remove(_) => {
182 info!("Remove event detected: {:?}", event.paths);
183 for path in event.paths {
184 if let Err(e) = self.remove_file(&path) {
185 tracing::error!("Failed to remove file {:?}: {}", path, e);
186 }
187 }
188 }
189 _ => {}
190 }
191 }
192
193 fn process_file(&self, path: &PathBuf) -> Result<()> {
194 let filename = path
195 .file_name()
196 .and_then(|n| n.to_str())
197 .unwrap_or("unknown");
198 *self.status.lock().expect("status lock poisoned") = format!("Parsing {filename}");
199 let start_total = std::time::Instant::now();
200
201 let metadata = std::fs::metadata(path).map_err(CPError::Io)?;
203 let mtime = metadata
204 .modified()
205 .unwrap_or(std::time::SystemTime::UNIX_EPOCH)
206 .duration_since(std::time::UNIX_EPOCH)
207 .unwrap()
208 .as_secs() as i64;
209
210 let start_parse = std::time::Instant::now();
212 let text = cp_parser::parse_file(path)?;
213 let parse_duration = start_parse.elapsed();
214
215 let doc = Document::new(path.clone(), text.as_bytes(), mtime);
217
218 *self.status.lock().expect("status lock poisoned") = format!("Chunking {filename}");
220 let start_chunk = std::time::Instant::now();
221 let chunks = self.chunker.chunk(doc.id, &text)?;
222 let chunk_duration = start_chunk.elapsed();
223
224 *self.status.lock().expect("status lock poisoned") = format!("Embedding {filename}");
226 let start_embed = std::time::Instant::now();
227 let chunk_hashes: Vec<[u8; 32]> = chunks.iter().map(|c| c.text_hash).collect();
228 let h_hash = Document::compute_hierarchical_hash(&chunk_hashes);
229 let mut doc = doc;
230 doc.set_hierarchical_hash(h_hash);
231
232 let chunk_texts: Vec<&str> = chunks.iter().map(|c| c.text.as_str()).collect();
233 let embeddings_vec = self.embedder.embed_batch(&chunk_texts)?;
234 let embed_duration = start_embed.elapsed();
235
236 let start_store = std::time::Instant::now();
238 {
239 let mut graph = self.graph.lock().expect("graph lock poisoned");
240
241 graph.insert_document(&doc)?;
242 self.sync_manager.record_add_doc(doc.clone());
243
244 for (i, chunk) in chunks.iter().enumerate() {
246 graph.insert_chunk(chunk)?;
247 self.sync_manager.record_add_chunk(chunk.clone());
248
249 if let Some(vec) = embeddings_vec.get(i) {
250 let emb = Embedding::new(chunk.id, vec, self.embedder.model_hash()?, 0);
251 graph.insert_embedding(&emb)?;
252 self.sync_manager.record_add_embedding(emb.clone());
253 }
254 }
255 }
256 let store_duration = start_store.elapsed();
257 let total_duration = start_total.elapsed();
258
259 self.update_state_root();
261
262 *self.status.lock().expect("status lock poisoned") = "Idle".to_string();
263
264 info!(
265 "Processed {:?}: chunks={}, h_hash={}, parse={:?}, chunk={:?}, embed={:?}, store={:?}, total={:?}",
266 path,
267 chunks.len(),
268 hex::encode(&h_hash[..4]),
269 parse_duration,
270 chunk_duration,
271 embed_duration,
272 store_duration,
273 total_duration
274 );
275 Ok(())
276 }
277
278 fn remove_file(&self, path: &PathBuf) -> Result<()> {
279 let filename = path
280 .file_name()
281 .and_then(|n| n.to_str())
282 .unwrap_or("unknown");
283 *self.status.lock().expect("status lock poisoned") = format!("Removing {filename}");
284
285 info!("Remove event received for: {:?}", path);
286
287 let found = {
288 let mut graph = self.graph.lock().expect("graph lock poisoned");
289 if let Some(doc) = graph.get_document_by_path(path)? {
290 info!("Removing document from substrate: {:?}", path);
291
292 let chunks = graph.get_chunks_for_doc(doc.id)?;
294 let chunk_ids: Vec<Uuid> = chunks.iter().map(|c| c.id).collect();
295 let mut embedding_ids = Vec::new();
296 for cid in &chunk_ids {
297 if let Some(emb) = graph.get_embedding_for_chunk(*cid)? {
298 embedding_ids.push(emb.id);
299 }
300 }
301
302 graph.delete_document(doc.id)?;
304
305 self.sync_manager
307 .record_remove_doc(doc.id, chunk_ids, embedding_ids);
308 true
309 } else {
310 tracing::warn!("Remove event for unknown document: {:?}", path);
311 false
312 }
313 }; if found {
316 self.update_state_root();
317 info!("Document removed successfully: {:?}", path);
318 }
319
320 *self.status.lock().expect("status lock poisoned") = "Idle".to_string();
321
322 Ok(())
323 }
324}
325
326impl DesktopApp {
327 pub fn new(data_dir: impl AsRef<std::path::Path>) -> Result<Self> {
328 Self::new_inner(data_dir.as_ref(), false)
329 }
330
331 pub fn new_file_only(data_dir: impl AsRef<std::path::Path>) -> Result<Self> {
336 Self::new_inner(data_dir.as_ref(), true)
337 }
338
339 fn new_inner(data_dir: &std::path::Path, file_only: bool) -> Result<Self> {
340 std::fs::create_dir_all(data_dir).map_err(CPError::Io)?;
341
342 let db_path = data_dir.join("graph.db");
343 let graph = GraphStore::open(db_path.to_str().unwrap())?;
344 let graph_arc = Arc::new(Mutex::new(graph));
345
346 let key_path = data_dir.join("cp.key");
347 let sync_manager = if file_only {
348 Arc::new(SyncManager::new_file_only(&key_path)?)
349 } else {
350 Arc::new(SyncManager::new(&key_path)?)
351 };
352
353 Ok(Self {
354 watch_dirs: Vec::new(),
355 graph: graph_arc,
356 sync_manager,
357 status: Arc::new(Mutex::new("Idle".to_string())),
358 })
359 }
360
361 pub fn add_watch_dir(&mut self, path: PathBuf) -> Result<()> {
362 if !path.exists() {
363 return Err(CPError::Io(std::io::Error::new(
364 std::io::ErrorKind::NotFound,
365 format!("Path not found: {}", path.display()),
366 )));
367 }
368 self.watch_dirs.push(path);
369 Ok(())
370 }
371
372 pub async fn start(&self) -> Result<()> {
373 let (tx, rx) = std::sync::mpsc::channel();
374 let mut watcher = FileWatcher::new(tx)?;
375
376 for dir in &self.watch_dirs {
377 watcher.watch(dir)?;
378 }
379
380 let embedder = Arc::new(EmbeddingEngine::new()?);
381
382 let device_id = device_uuid_from_sync_manager(&self.sync_manager);
383 let current_seq = {
384 let graph = self.graph.lock().expect("graph lock poisoned");
385 graph.get_latest_root().ok().flatten().map_or(0, |r| r.seq)
386 };
387 let root_seq = Arc::new(AtomicU64::new(current_seq));
388
389 let worker = IngestionWorker {
390 graph: self.graph.clone(),
391 embedder,
392 chunker: Chunker::new(ChunkConfig::default()),
393 sync_manager: self.sync_manager.clone(),
394 status: self.status.clone(),
395 device_id,
396 root_seq: root_seq.clone(),
397 last_hlc: Mutex::new(Hlc::now(device_id)),
398 };
399
400 let mut found_paths = std::collections::HashSet::new();
402 for dir in &self.watch_dirs {
403 crawl_directory_recursive(dir, &worker, &mut found_paths)?;
404 }
405
406 {
408 let mut graph = self.graph.lock().expect("graph lock poisoned");
409 let all_docs = graph.get_all_documents()?;
410 for doc in all_docs {
411 let in_watch_dir = self.watch_dirs.iter().any(|d| doc.path.starts_with(d));
413 if in_watch_dir && !found_paths.contains(&doc.path) {
414 info!("Pruning stale document from substrate: {:?}", doc.path);
415 let chunks = graph.get_chunks_for_doc(doc.id)?;
417 let chunk_ids: Vec<Uuid> = chunks.iter().map(|c| c.id).collect();
418 let mut embedding_ids = Vec::new();
419 for cid in &chunk_ids {
420 if let Some(emb) = graph.get_embedding_for_chunk(*cid)? {
421 embedding_ids.push(emb.id);
422 }
423 }
424 graph.delete_document(doc.id)?;
425 self.sync_manager
427 .record_remove_doc(doc.id, chunk_ids, embedding_ids);
428 }
429 }
430 }
431
432 let app_clone = self.clone();
434 tokio::spawn(async move {
435 info!("Starting background prune loop (every 30s)");
436 loop {
437 tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
438 match app_clone.prune_stale_documents() {
439 Ok(count) if count > 0 => {
440 info!("Auto-pruned {} stale documents", count);
441 }
442 Err(e) => {
443 tracing::error!("Auto-prune failed: {}", e);
444 }
445 _ => {}
446 }
447 }
448 });
449
450 tokio::task::spawn_blocking(move || {
452 worker.run(rx);
453 })
454 .await
455 .map_err(|e| CPError::Io(std::io::Error::other(e.to_string())))?;
456
457 Ok(())
458 }
459
460 pub fn graph(&self) -> Arc<Mutex<GraphStore>> {
461 self.graph.clone()
462 }
463
464 pub fn status(&self) -> String {
465 self.status.lock().expect("status lock poisoned").clone()
466 }
467
468 pub fn prune_stale_documents(&self) -> Result<usize> {
470 let mut graph = self.graph.lock().expect("graph lock poisoned");
471 let all_docs = graph.get_all_documents()?;
472
473 let mut removed = 0;
474 for doc in all_docs {
475 if !doc.path.exists() {
476 info!("Pruning stale document: {:?}", doc.path);
477
478 let chunks = graph.get_chunks_for_doc(doc.id)?;
480 let chunk_ids: Vec<Uuid> = chunks.iter().map(|c| c.id).collect();
481 let mut embedding_ids = Vec::new();
482 for cid in &chunk_ids {
483 if let Some(emb) = graph.get_embedding_for_chunk(*cid)? {
484 embedding_ids.push(emb.id);
485 }
486 }
487
488 graph.delete_document(doc.id)?;
490
491 self.sync_manager
493 .record_remove_doc(doc.id, chunk_ids, embedding_ids);
494
495 removed += 1;
496 }
497 }
498
499 if removed > 0 {
501 if let Ok(hash) = graph.compute_merkle_root() {
502 let device_uuid = device_uuid_from_sync_manager(&self.sync_manager);
503 let latest = graph.get_latest_root().ok().flatten();
504 let seq = latest.as_ref().map_or(0, |r| r.seq) + 1;
505 let parent = latest.map(|r| r.hash);
506 let hlc = Hlc::now(device_uuid);
507 let root = cp_core::StateRoot::new(hash, parent, hlc, device_uuid, seq);
508 if let Err(e) = graph.set_latest_root(&root) {
509 tracing::error!("Failed to store state root after prune: {}", e);
510 } else {
511 self.sync_manager.update_state_root(hash);
512 info!(
513 "State root updated after pruning: {}",
514 hex::encode(&hash[..8])
515 );
516 }
517 }
518 }
519
520 Ok(removed)
521 }
522
523 pub fn reingest_all(&self) -> Result<usize> {
525 {
527 let mut graph = self.graph.lock().expect("graph lock poisoned");
528 graph.clear_all()?;
529 info!("Graph cleared for full reingest");
530 }
531
532 let embedder = Arc::new(EmbeddingEngine::new()?);
534 let chunker = Chunker::new(ChunkConfig::default());
535 let model_hash = embedder.model_hash()?;
536
537 let mut files = Vec::new();
539 for dir in &self.watch_dirs {
540 collect_files_for_reingest(dir, &mut files)?;
541 }
542
543 let mut count = 0;
544 for path in &files {
545 let filename = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
546 *self.status.lock().expect("status lock poisoned") = format!("Reingesting {filename}");
547
548 let text = match cp_parser::parse_file(path) {
549 Ok(t) => t,
550 Err(e) => {
551 tracing::error!("Failed to parse {:?}: {}", path, e);
552 continue;
553 }
554 };
555
556 let metadata = std::fs::metadata(path).map_err(CPError::Io)?;
557 let mtime = metadata
558 .modified()
559 .unwrap_or(std::time::SystemTime::UNIX_EPOCH)
560 .duration_since(std::time::UNIX_EPOCH)
561 .unwrap()
562 .as_secs() as i64;
563
564 let mut doc = Document::new(path.clone(), text.as_bytes(), mtime);
565 let chunks = chunker.chunk(doc.id, &text)?;
566
567 let chunk_hashes: Vec<[u8; 32]> = chunks.iter().map(|c| c.text_hash).collect();
568 let h_hash = Document::compute_hierarchical_hash(&chunk_hashes);
569 doc.set_hierarchical_hash(h_hash);
570
571 let chunk_texts: Vec<&str> = chunks.iter().map(|c| c.text.as_str()).collect();
572 let embeddings_vec = embedder.embed_batch(&chunk_texts)?;
573
574 {
575 let mut graph = self.graph.lock().expect("graph lock poisoned");
576 graph.insert_document(&doc)?;
577 self.sync_manager.record_add_doc(doc.clone());
578 for (i, chunk) in chunks.iter().enumerate() {
579 graph.insert_chunk(chunk)?;
580 self.sync_manager.record_add_chunk(chunk.clone());
581 if let Some(vec) = embeddings_vec.get(i) {
582 let emb = Embedding::new(chunk.id, vec, model_hash, 0);
583 graph.insert_embedding(&emb)?;
584 self.sync_manager.record_add_embedding(emb.clone());
585 }
586 }
587 }
588
589 count += 1;
590 info!("Reingested {:?} ({} chunks)", path, chunks.len());
591 }
592
593 {
595 let mut graph = self.graph.lock().expect("graph lock poisoned");
596 let hash = graph.compute_merkle_root()?;
597 let device_uuid = device_uuid_from_sync_manager(&self.sync_manager);
598 let hlc = Hlc::now(device_uuid);
599 let root = cp_core::StateRoot::new(hash, None, hlc, device_uuid, 1);
600 graph.set_latest_root(&root)?;
601 self.sync_manager.update_state_root(hash);
602 }
603
604 *self.status.lock().expect("status lock poisoned") = "Idle".to_string();
605 info!("Reingest complete: {} files", count);
606 Ok(count)
607 }
608
609 pub fn get_device_id_hex(&self) -> String {
611 self.sync_manager.get_device_id_hex()
612 }
613
614 pub fn sync_manager(&self) -> &Arc<SyncManager> {
616 &self.sync_manager
617 }
618}
619
620impl IngestionWorker {
622 #[allow(clippy::needless_pass_by_value)]
623 fn run(&self, rx: std::sync::mpsc::Receiver<notify::Result<notify::Event>>) {
624 loop {
625 match rx.recv() {
626 Ok(res) => match res {
627 Ok(event) => {
628 self.process_event(event);
629 }
630 Err(e) => {
631 tracing::error!("Watch error event: {}", e);
632 }
633 },
634 Err(e) => {
635 tracing::error!("Watch channel closed: {}", e);
636 break;
637 }
638 }
639 }
640 }
641}
642
643fn device_uuid_from_sync_manager(sync_manager: &SyncManager) -> Uuid {
645 let device_id_hex = sync_manager.get_device_id_hex();
646 if let Ok(key_bytes) = hex::decode(&device_id_hex) {
647 let mut uuid_bytes = [0u8; 16];
648 let len = key_bytes.len().min(16);
649 uuid_bytes[..len].copy_from_slice(&key_bytes[..len]);
650 Uuid::from_bytes(uuid_bytes)
651 } else {
652 Uuid::new_v4()
653 }
654}
655
656fn collect_files_for_reingest(dir: &PathBuf, out: &mut Vec<PathBuf>) -> Result<()> {
658 let entries = std::fs::read_dir(dir).map_err(CPError::Io)?;
659 for entry in entries {
660 let entry = entry.map_err(CPError::Io)?;
661 let path = entry.path();
662 let filename = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
663 if filename.starts_with('.') {
664 continue;
665 }
666 if path.is_dir() {
667 collect_files_for_reingest(&path, out)?;
668 } else if path.is_file() {
669 out.push(path);
670 }
671 }
672 Ok(())
673}
674
675fn crawl_directory_recursive(
678 dir: &PathBuf,
679 worker: &IngestionWorker,
680 found_paths: &mut std::collections::HashSet<PathBuf>,
681) -> Result<()> {
682 let entries = std::fs::read_dir(dir).map_err(CPError::Io)?;
683 for entry in entries {
684 let entry = entry.map_err(CPError::Io)?;
685 let path = entry.path();
686 let filename = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
687 if filename.starts_with('.') {
688 continue;
689 }
690 if path.is_dir() {
691 crawl_directory_recursive(&path, worker, found_paths)?;
692 } else if path.is_file() {
693 found_paths.insert(path.clone());
694 if let Err(e) = worker.process_file(&path) {
695 tracing::error!("Failed to process existing file {:?}: {}", path, e);
696 }
697 }
698 }
699 Ok(())
700}