1pub mod chunker;
10pub mod loopback;
11
12#[cfg(test)]
13mod tests;
14
15#[cfg(test)]
16mod inline_tests;
17
18use std::collections::HashMap;
19use std::path::{Path, PathBuf};
20use std::sync::Mutex;
21use std::time::{Duration, Instant};
22
23use notify_debouncer_full::{
24 new_debouncer, notify::RecursiveMode, DebounceEventResult, Debouncer, RecommendedCache,
25};
26use sha2::{Digest, Sha256};
27use tokio_util::sync::CancellationToken;
28
29use crate::config::{ConnectorConfig, ContentSourcesConfig};
30use crate::source::ContentSourceProvider;
31use crate::storage::accounts::DEFAULT_ACCOUNT_ID;
32use crate::storage::watchtower as store;
33use crate::storage::DbPool;
34
/// Errors produced by the watchtower ingestion pipeline.
#[derive(Debug, thiserror::Error)]
pub enum WatchtowerError {
    /// Filesystem read or directory-walk failure.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// Database persistence failure from the storage layer.
    #[error("storage error: {0}")]
    Storage(#[from] crate::error::StorageError),

    /// Failure from the `notify` filesystem watcher.
    #[error("notify error: {0}")]
    Notify(#[from] notify::Error),

    /// Invalid or incomplete source configuration.
    #[error("config error: {0}")]
    Config(String),

    /// Failure while chunking ingested content.
    #[error("chunker error: {0}")]
    Chunker(#[from] chunker::ChunkerError),
}
57
/// Aggregate result of ingesting a batch of files.
#[derive(Debug, Default)]
pub struct IngestSummary {
    // Files that were inserted or updated by the upsert.
    pub ingested: u32,
    // Files the upsert reported as unchanged (skipped).
    pub skipped: u32,
    // Per-file error messages, formatted as "relative_path: error".
    pub errors: Vec<String>,
}
69
/// Metadata extracted from a document's YAML front matter.
#[derive(Debug, Default)]
pub struct ParsedFrontMatter {
    // Value of the "title" key, when present and a string.
    pub title: Option<String>,
    // Comma-joined tags from the "tags" key (string or string sequence).
    pub tags: Option<String>,
    // The raw front-matter YAML text, unparsed.
    pub raw_yaml: Option<String>,
}
77
78pub fn parse_front_matter(content: &str) -> (ParsedFrontMatter, &str) {
86 let (yaml_str, body) = loopback::split_front_matter(content);
87
88 let yaml_str = match yaml_str {
89 Some(y) => y,
90 None => return (ParsedFrontMatter::default(), content),
91 };
92
93 let parsed: Result<serde_yaml::Value, _> = serde_yaml::from_str(yaml_str);
94 match parsed {
95 Ok(serde_yaml::Value::Mapping(map)) => {
96 let title = map
97 .get(serde_yaml::Value::String("title".to_string()))
98 .and_then(|v| v.as_str())
99 .map(|s| s.to_string());
100
101 let tags = map
102 .get(serde_yaml::Value::String("tags".to_string()))
103 .map(|v| match v {
104 serde_yaml::Value::Sequence(seq) => seq
105 .iter()
106 .filter_map(|item| item.as_str())
107 .collect::<Vec<_>>()
108 .join(","),
109 serde_yaml::Value::String(s) => s.clone(),
110 _ => String::new(),
111 })
112 .filter(|s| !s.is_empty());
113
114 let fm = ParsedFrontMatter {
115 title,
116 tags,
117 raw_yaml: Some(yaml_str.to_string()),
118 };
119 (fm, body)
120 }
121 _ => (
122 ParsedFrontMatter {
123 raw_yaml: Some(yaml_str.to_string()),
124 ..Default::default()
125 },
126 body,
127 ),
128 }
129}
130
131pub fn matches_patterns(path: &Path, patterns: &[String]) -> bool {
140 let file_name = match path.file_name().and_then(|n| n.to_str()) {
141 Some(n) => n,
142 None => return false,
143 };
144
145 for pattern in patterns {
146 if let Ok(p) = glob::Pattern::new(pattern) {
147 if p.matches(file_name) {
148 return true;
149 }
150 }
151 }
152 false
153}
154
/// Joins the components of `path` with forward slashes, producing a
/// platform-independent relative-path string.
fn relative_path_string(path: &Path) -> String {
    let mut joined = String::new();
    for (idx, component) in path.iter().enumerate() {
        if idx > 0 {
            joined.push('/');
        }
        joined.push_str(&component.to_string_lossy());
    }
    joined
}
162
163pub async fn ingest_content(
173 pool: &DbPool,
174 source_id: i64,
175 provider_id: &str,
176 content: &str,
177 force: bool,
178) -> Result<store::UpsertResult, WatchtowerError> {
179 let (fm, body) = parse_front_matter(content);
180
181 let hash = if force {
182 let mut hasher = Sha256::new();
183 hasher.update(content.as_bytes());
184 hasher.update(
185 std::time::SystemTime::now()
186 .duration_since(std::time::UNIX_EPOCH)
187 .unwrap_or_default()
188 .as_nanos()
189 .to_le_bytes(),
190 );
191 format!("{:x}", hasher.finalize())
192 } else {
193 let mut hasher = Sha256::new();
194 hasher.update(content.as_bytes());
195 format!("{:x}", hasher.finalize())
196 };
197
198 let result = store::upsert_content_node(
199 pool,
200 source_id,
201 provider_id,
202 &hash,
203 fm.title.as_deref(),
204 body,
205 fm.raw_yaml.as_deref(),
206 fm.tags.as_deref(),
207 )
208 .await?;
209
210 Ok(result)
211}
212
213pub async fn ingest_file(
217 pool: &DbPool,
218 source_id: i64,
219 base_path: &Path,
220 relative_path: &str,
221 force: bool,
222) -> Result<store::UpsertResult, WatchtowerError> {
223 let full_path = base_path.join(relative_path);
224 let content = tokio::fs::read_to_string(&full_path).await?;
225 ingest_content(pool, source_id, relative_path, &content, force).await
226}
227
228pub async fn ingest_files(
230 pool: &DbPool,
231 source_id: i64,
232 base_path: &Path,
233 paths: &[String],
234 force: bool,
235) -> IngestSummary {
236 let mut summary = IngestSummary::default();
237
238 for rel_path in paths {
239 match ingest_file(pool, source_id, base_path, rel_path, force).await {
240 Ok(store::UpsertResult::Inserted | store::UpsertResult::Updated) => {
241 summary.ingested += 1;
242 }
243 Ok(store::UpsertResult::Skipped) => {
244 summary.skipped += 1;
245 }
246 Err(e) => {
247 summary.errors.push(format!("{rel_path}: {e}"));
248 }
249 }
250 }
251
252 summary
253}
254
/// Tracks recently-touched paths so filesystem events for them can be
/// ignored for a short TTL window.
struct CooldownSet {
    entries: HashMap<PathBuf, Instant>,
    ttl: Duration,
}

impl CooldownSet {
    /// Creates an empty set whose entries expire after `ttl`.
    fn new(ttl: Duration) -> Self {
        Self {
            entries: HashMap::new(),
            ttl,
        }
    }

    /// Records `path` as just-touched, starting its cooldown window now.
    #[allow(dead_code)]
    fn mark(&mut self, path: PathBuf) {
        self.entries.insert(path, Instant::now());
    }

    /// True while `path` was marked less than `ttl` ago.
    fn is_cooling(&self, path: &Path) -> bool {
        self.entries
            .get(path)
            .map_or(false, |marked_at| marked_at.elapsed() < self.ttl)
    }

    /// Drops every entry whose cooldown window has elapsed.
    fn cleanup(&mut self) {
        self.entries.retain(|_, marked_at| marked_at.elapsed() < self.ttl);
    }
}
293
294type RemoteSource = (i64, Box<dyn ContentSourceProvider>, Vec<String>, Duration);
300
/// Long-running ingestion loop that watches local directories and polls
/// remote providers, feeding changed content into storage and chunking.
pub struct WatchtowerLoop {
    // Database handle used for all source/content persistence.
    pool: DbPool,
    // Configured content sources (local and remote).
    config: ContentSourcesConfig,
    // Connector settings used to build connection-based remote providers.
    connector_config: ConnectorConfig,
    // App data directory; the connector key is loaded from here.
    data_dir: PathBuf,
    // How long filesystem events are debounced before delivery.
    debounce_duration: Duration,
    // Interval for periodic full rescans (also the polling fallback rate).
    fallback_scan_interval: Duration,
    // TTL for entries in the event-cooldown set.
    cooldown_ttl: Duration,
}
314
315impl WatchtowerLoop {
316 pub fn new(
318 pool: DbPool,
319 config: ContentSourcesConfig,
320 connector_config: ConnectorConfig,
321 data_dir: PathBuf,
322 ) -> Self {
323 Self {
324 pool,
325 config,
326 connector_config,
327 data_dir,
328 debounce_duration: Duration::from_secs(2),
329 fallback_scan_interval: Duration::from_secs(300), cooldown_ttl: Duration::from_secs(5),
331 }
332 }
333
334 pub async fn run(&self, cancel: CancellationToken) {
340 let local_sources: Vec<_> = self
343 .config
344 .sources
345 .iter()
346 .filter(|s| s.source_type == "local_fs" && s.is_enabled() && s.path.is_some())
347 .collect();
348
349 let remote_sources: Vec<_> = self
350 .config
351 .sources
352 .iter()
353 .filter(|s| s.source_type == "google_drive" && s.is_enabled() && s.folder_id.is_some())
354 .collect();
355
356 if local_sources.is_empty() && remote_sources.is_empty() {
357 tracing::info!("Watchtower: no watch sources configured, exiting");
358 return;
359 }
360
361 let mut source_map: Vec<(i64, PathBuf, Vec<String>)> = Vec::new();
363 for src in &local_sources {
364 let path_str = src.path.as_deref().unwrap();
365 let expanded = PathBuf::from(crate::storage::expand_tilde(path_str));
366
367 let config_json = serde_json::json!({
368 "path": path_str,
369 "file_patterns": src.file_patterns,
370 "loop_back_enabled": src.loop_back_enabled,
371 })
372 .to_string();
373
374 match store::ensure_local_fs_source(&self.pool, path_str, &config_json).await {
375 Ok(source_id) => {
376 source_map.push((source_id, expanded, src.file_patterns.clone()));
377 }
378 Err(e) => {
379 tracing::error!(path = path_str, error = %e, "Failed to register source context");
380 }
381 }
382 }
383
384 let mut remote_map: Vec<RemoteSource> = Vec::new();
386 for src in &remote_sources {
387 let folder_id = src.folder_id.as_deref().unwrap();
388 let config_json = serde_json::json!({
389 "folder_id": folder_id,
390 "file_patterns": src.file_patterns,
391 "service_account_key": src.service_account_key,
392 "connection_id": src.connection_id,
393 })
394 .to_string();
395
396 match store::ensure_google_drive_source(&self.pool, folder_id, &config_json).await {
397 Ok(source_id) => {
398 let interval = Duration::from_secs(src.poll_interval_seconds.unwrap_or(300));
399
400 let provider: Box<dyn ContentSourceProvider> =
402 if let Some(connection_id) = src.connection_id {
403 match self.build_connection_provider(folder_id, connection_id) {
404 Ok(p) => Box::new(p),
405 Err(reason) => {
406 tracing::warn!(
407 folder_id,
408 connection_id,
409 reason = %reason,
410 "Skipping connection-based source"
411 );
412 continue;
413 }
414 }
415 } else if src.service_account_key.is_some() {
416 let key_path = src.service_account_key.clone().unwrap_or_default();
417 Box::new(crate::source::google_drive::GoogleDriveProvider::new(
418 folder_id.to_string(),
419 key_path,
420 ))
421 } else {
422 tracing::warn!(
423 folder_id,
424 "Skipping Google Drive source: no connection_id or service_account_key"
425 );
426 continue;
427 };
428
429 remote_map.push((source_id, provider, src.file_patterns.clone(), interval));
430 }
431 Err(e) => {
432 tracing::error!(
433 folder_id = folder_id,
434 error = %e,
435 "Failed to register Google Drive source"
436 );
437 }
438 }
439 }
440
441 if source_map.is_empty() && remote_map.is_empty() {
442 tracing::warn!("Watchtower: no sources registered, exiting");
443 return;
444 }
445
446 for (source_id, base_path, patterns) in &source_map {
448 let _ = store::update_source_status(&self.pool, *source_id, "syncing", None).await;
449 match self.scan_directory(*source_id, base_path, patterns).await {
450 Ok(_) => {
451 let _ =
452 store::update_source_status(&self.pool, *source_id, "active", None).await;
453 }
454 Err(e) => {
455 tracing::error!(
456 path = %base_path.display(),
457 error = %e,
458 "Initial scan failed"
459 );
460 let _ = store::update_source_status(
461 &self.pool,
462 *source_id,
463 "error",
464 Some(&e.to_string()),
465 )
466 .await;
467 }
468 }
469 }
470
471 self.chunk_pending().await;
473
474 if !remote_map.is_empty() {
476 self.poll_remote_sources(&remote_map).await;
477 self.chunk_pending().await;
478 }
479
480 let watch_source_map: Vec<_> = source_map
484 .iter()
485 .zip(local_sources.iter())
486 .filter(|(_, src)| !src.is_scan_only())
487 .map(|(entry, _)| entry.clone())
488 .collect();
489
490 let notify_source_map: Vec<_> = source_map
492 .iter()
493 .zip(local_sources.iter())
494 .filter(|(_, src)| src.effective_change_detection() == "auto")
495 .map(|(entry, _)| entry.clone())
496 .collect();
497
498 if watch_source_map.is_empty() {
500 if remote_map.is_empty() {
501 tracing::info!(
502 "Watchtower: all local sources are scan-only and no remote sources, exiting"
503 );
504 return;
505 }
506 self.remote_only_loop(&remote_map, cancel).await;
507 return;
508 }
509
510 let (async_tx, mut async_rx) = tokio::sync::mpsc::channel::<DebounceEventResult>(256);
512
513 let handler = move |result: DebounceEventResult| {
514 let _ = async_tx.blocking_send(result);
515 };
516
517 let debouncer_result = new_debouncer(self.debounce_duration, None, handler);
518 let mut debouncer: Debouncer<notify::RecommendedWatcher, RecommendedCache> =
519 match debouncer_result {
520 Ok(d) => d,
521 Err(e) => {
522 tracing::error!(error = %e, "Failed to create filesystem watcher, falling back to polling");
523 self.polling_loop(&watch_source_map, cancel).await;
524 return;
525 }
526 };
527
528 for (_, base_path, _) in ¬ify_source_map {
530 if let Err(e) = debouncer.watch(base_path, RecursiveMode::Recursive) {
531 tracing::error!(
532 path = %base_path.display(),
533 error = %e,
534 "Failed to watch directory"
535 );
536 }
537 }
538
539 tracing::info!(
540 local_sources = source_map.len(),
541 watching = notify_source_map.len(),
542 polling = watch_source_map.len() - notify_source_map.len(),
543 remote_sources = remote_map.len(),
544 "Watchtower watching for changes"
545 );
546
547 let cooldown = Mutex::new(CooldownSet::new(self.cooldown_ttl));
548
549 let mut fallback_timer = tokio::time::interval(self.fallback_scan_interval);
551 fallback_timer.tick().await; let remote_interval = remote_map
555 .iter()
556 .map(|(_, _, _, d)| *d)
557 .min()
558 .unwrap_or(self.fallback_scan_interval);
559 let mut remote_timer = tokio::time::interval(remote_interval);
560 remote_timer.tick().await; loop {
563 tokio::select! {
564 () = cancel.cancelled() => {
565 tracing::info!("Watchtower: cancellation received, shutting down");
566 break;
567 }
568 _ = fallback_timer.tick() => {
569 for (source_id, base_path, patterns) in &watch_source_map {
572 if let Err(e) = self.scan_directory(*source_id, base_path, patterns).await {
573 tracing::warn!(
574 path = %base_path.display(),
575 error = %e,
576 "Fallback scan failed"
577 );
578 }
579 }
580 if let Ok(mut cd) = cooldown.lock() {
581 cd.cleanup();
582 }
583 self.chunk_pending().await;
584 }
585 _ = remote_timer.tick(), if !remote_map.is_empty() => {
586 self.poll_remote_sources(&remote_map).await;
587 self.chunk_pending().await;
588 }
589 result = async_rx.recv() => {
590 match result {
591 Some(Ok(events)) => {
592 for event in events {
593 for path in &event.paths {
594 self.handle_event(path, &source_map, &cooldown).await;
595 }
596 }
597 self.chunk_pending().await;
598 }
599 Some(Err(errs)) => {
600 for e in errs {
601 tracing::warn!(error = %e, "Watcher error");
602 }
603 }
604 None => {
605 tracing::warn!("Watcher event channel closed");
606 break;
607 }
608 }
609 }
610 }
611 }
612
613 drop(debouncer);
615 tracing::info!("Watchtower shut down");
616 }
617
    /// Handles one debounced filesystem event path: skips paths still in
    /// cooldown, finds the owning source, and re-ingests the file if its
    /// name matches that source's patterns.
    async fn handle_event(
        &self,
        path: &Path,
        source_map: &[(i64, PathBuf, Vec<String>)],
        cooldown: &Mutex<CooldownSet>,
    ) {
        // Cooldown check first. The mutex guard is dropped at the end of
        // this `if let` block, before any `.await` below.
        if let Ok(cd) = cooldown.lock() {
            if cd.is_cooling(path) {
                tracing::debug!(path = %path.display(), "Skipping cooldown path");
                return;
            }
        }

        // Route to the first source whose base path contains this path;
        // a non-matching file name under that base ends handling entirely.
        for (source_id, base_path, patterns) in source_map {
            if path.starts_with(base_path) {
                if !matches_patterns(path, patterns) {
                    return;
                }

                let rel = match path.strip_prefix(base_path) {
                    Ok(r) => relative_path_string(r),
                    Err(_) => return,
                };

                match ingest_file(&self.pool, *source_id, base_path, &rel, false).await {
                    Ok(result) => {
                        tracing::debug!(
                            path = %rel,
                            result = ?result,
                            "Watchtower ingested file"
                        );
                    }
                    Err(e) => {
                        tracing::warn!(
                            path = %rel,
                            error = %e,
                            "Watchtower ingest failed"
                        );
                    }
                }
                return;
            }
        }
    }
667
    /// Walks `base_path` recursively, ingests every file matching
    /// `patterns`, then advances the source's sync cursor to now.
    ///
    /// # Errors
    /// Returns an error only if the directory walk fails; per-file ingest
    /// errors are collected inside the returned summary.
    async fn scan_directory(
        &self,
        source_id: i64,
        base_path: &Path,
        patterns: &[String],
    ) -> Result<IngestSummary, WatchtowerError> {
        let mut rel_paths = Vec::new();
        Self::walk_directory(base_path, base_path, patterns, &mut rel_paths)?;

        let summary = ingest_files(&self.pool, source_id, base_path, &rel_paths, false).await;

        tracing::debug!(
            path = %base_path.display(),
            ingested = summary.ingested,
            skipped = summary.skipped,
            errors = summary.errors.len(),
            "Directory scan complete"
        );

        // Record scan time as the sync cursor; a failure here is non-fatal.
        let cursor = chrono::Utc::now().to_rfc3339();
        if let Err(e) = store::update_sync_cursor(&self.pool, source_id, &cursor).await {
            tracing::warn!(error = %e, "Failed to update sync cursor");
        }

        Ok(summary)
    }
696
697 fn walk_directory(
699 base: &Path,
700 current: &Path,
701 patterns: &[String],
702 out: &mut Vec<String>,
703 ) -> Result<(), WatchtowerError> {
704 let entries = std::fs::read_dir(current)?;
705 for entry in entries {
706 let entry = entry?;
707 let file_type = entry.file_type()?;
708 let path = entry.path();
709
710 if file_type.is_dir() {
711 if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
713 if name.starts_with('.') {
714 continue;
715 }
716 }
717 Self::walk_directory(base, &path, patterns, out)?;
718 } else if file_type.is_file() && matches_patterns(&path, patterns) {
719 if let Ok(rel) = path.strip_prefix(base) {
720 out.push(relative_path_string(rel));
721 }
722 }
723 }
724 Ok(())
725 }
726
    /// Polls each remote source once: fetches files changed since the
    /// stored sync cursor, ingests their content, then advances the cursor
    /// and source status. A broken connection marks the source errored and
    /// the connection expired.
    async fn poll_remote_sources(&self, remote_sources: &[RemoteSource]) {
        for (source_id, provider, patterns, _interval) in remote_sources {
            let _ = store::update_source_status(&self.pool, *source_id, "syncing", None).await;

            // Resume from the stored cursor; a missing context row just
            // means there is no cursor yet.
            let cursor = match store::get_source_context(&self.pool, *source_id).await {
                Ok(Some(ctx)) => ctx.sync_cursor,
                Ok(None) => None,
                Err(e) => {
                    tracing::warn!(source_id, error = %e, "Failed to get source context");
                    continue;
                }
            };

            match provider.scan_for_changes(cursor.as_deref(), patterns).await {
                Ok(files) => {
                    let mut ingested = 0u32;
                    let mut skipped = 0u32;
                    // Each file is fetched and ingested independently;
                    // failures are logged and do not abort the batch.
                    for file in &files {
                        match provider.read_content(&file.provider_id).await {
                            Ok(content) => {
                                match ingest_content(
                                    &self.pool,
                                    *source_id,
                                    &file.provider_id,
                                    &content,
                                    false,
                                )
                                .await
                                {
                                    Ok(
                                        store::UpsertResult::Inserted
                                        | store::UpsertResult::Updated,
                                    ) => {
                                        ingested += 1;
                                    }
                                    Ok(store::UpsertResult::Skipped) => {
                                        skipped += 1;
                                    }
                                    Err(e) => {
                                        tracing::warn!(
                                            provider_id = %file.provider_id,
                                            error = %e,
                                            "Remote ingest failed"
                                        );
                                    }
                                }
                            }
                            Err(e) => {
                                tracing::warn!(
                                    provider_id = %file.provider_id,
                                    error = %e,
                                    "Failed to read remote content"
                                );
                            }
                        }
                    }

                    tracing::debug!(
                        source_type = provider.source_type(),
                        ingested,
                        skipped,
                        total = files.len(),
                        "Remote poll complete"
                    );

                    // Advance the cursor to the poll time; non-fatal on failure.
                    let new_cursor = chrono::Utc::now().to_rfc3339();
                    if let Err(e) =
                        store::update_sync_cursor(&self.pool, *source_id, &new_cursor).await
                    {
                        tracing::warn!(error = %e, "Failed to update remote sync cursor");
                    }
                    let _ =
                        store::update_source_status(&self.pool, *source_id, "active", None).await;
                }
                Err(crate::source::SourceError::ConnectionBroken {
                    connection_id,
                    ref reason,
                }) => {
                    // The backing connection is unusable: record the error
                    // on the source and mark the connection expired.
                    tracing::warn!(
                        source_id,
                        connection_id,
                        reason = %reason,
                        "Connection broken -- marking source as error"
                    );
                    let _ =
                        store::update_source_status(&self.pool, *source_id, "error", Some(reason))
                            .await;
                    let _ =
                        store::update_connection_status(&self.pool, connection_id, "expired").await;
                }
                Err(e) => {
                    tracing::warn!(
                        source_type = provider.source_type(),
                        error = %e,
                        "Remote scan failed"
                    );
                    let _ = store::update_source_status(
                        &self.pool,
                        *source_id,
                        "error",
                        Some(&e.to_string()),
                    )
                    .await;
                }
            }
        }
    }
836
837 async fn remote_only_loop(&self, remote_map: &[RemoteSource], cancel: CancellationToken) {
839 let interval_dur = remote_map
840 .iter()
841 .map(|(_, _, _, d)| *d)
842 .min()
843 .unwrap_or(self.fallback_scan_interval);
844 let mut interval = tokio::time::interval(interval_dur);
845 interval.tick().await;
846
847 loop {
848 tokio::select! {
849 () = cancel.cancelled() => {
850 tracing::info!("Watchtower remote-only loop cancelled");
851 break;
852 }
853 _ = interval.tick() => {
854 self.poll_remote_sources(remote_map).await;
855 self.chunk_pending().await;
856 }
857 }
858 }
859 }
860
    /// Builds a Google Drive provider backed by a stored connection.
    ///
    /// Loads the connector key from the data directory and constructs the
    /// Drive connector from configuration. On failure, returns a
    /// human-readable reason so the caller can log it and skip the source.
    fn build_connection_provider(
        &self,
        folder_id: &str,
        connection_id: i64,
    ) -> Result<crate::source::google_drive::GoogleDriveProvider, String> {
        let key = crate::source::connector::crypto::ensure_connector_key(&self.data_dir)
            .map_err(|e| format!("connector key error: {e}"))?;

        let connector = crate::source::connector::google_drive::GoogleDriveConnector::new(
            &self.connector_config.google_drive,
        )
        .map_err(|e| format!("connector config error: {e}"))?;

        Ok(
            crate::source::google_drive::GoogleDriveProvider::from_connection(
                folder_id.to_string(),
                connection_id,
                self.pool.clone(),
                key,
                connector,
            ),
        )
    }
889
    /// Force-reingests every matching file under `base_path` for the given
    /// source, bypassing change detection, and sets the source status to
    /// "active" or "error" depending on whether per-file errors occurred.
    ///
    /// # Errors
    /// Returns an error if the initial status update or the directory walk
    /// fails; individual file errors are collected in the summary instead.
    pub async fn reindex_local_source(
        pool: &DbPool,
        source_id: i64,
        base_path: &Path,
        patterns: &[String],
    ) -> Result<IngestSummary, WatchtowerError> {
        store::update_source_status(pool, source_id, "syncing", None).await?;

        let mut rel_paths = Vec::new();
        Self::walk_directory(base_path, base_path, patterns, &mut rel_paths)?;

        // force = true: unchanged files are rewritten rather than skipped.
        let summary = ingest_files(pool, source_id, base_path, &rel_paths, true).await;

        let cursor = chrono::Utc::now().to_rfc3339();
        let _ = store::update_sync_cursor(pool, source_id, &cursor).await;

        if summary.errors.is_empty() {
            let _ = store::update_source_status(pool, source_id, "active", None).await;
        } else {
            let msg = format!("{} errors during reindex", summary.errors.len());
            let _ = store::update_source_status(pool, source_id, "error", Some(&msg)).await;
        }

        Ok(summary)
    }
919
    /// Chunks up to 100 pending content nodes for the default account,
    /// logging only when any work was actually done.
    async fn chunk_pending(&self) {
        let chunked = chunker::chunk_pending_nodes(&self.pool, DEFAULT_ACCOUNT_ID, 100).await;
        if chunked > 0 {
            tracing::debug!(chunked, "Watchtower chunked pending nodes");
        }
    }
927
928 async fn polling_loop(
930 &self,
931 source_map: &[(i64, PathBuf, Vec<String>)],
932 cancel: CancellationToken,
933 ) {
934 let mut interval = tokio::time::interval(self.fallback_scan_interval);
935 interval.tick().await; loop {
938 tokio::select! {
939 () = cancel.cancelled() => {
940 tracing::info!("Watchtower polling loop cancelled");
941 break;
942 }
943 _ = interval.tick() => {
944 for (source_id, base_path, patterns) in source_map {
945 if let Err(e) = self.scan_directory(*source_id, base_path, patterns).await {
946 tracing::warn!(
947 path = %base_path.display(),
948 error = %e,
949 "Polling scan failed"
950 );
951 }
952 }
953 }
954 }
955 }
956 }
957}