1pub mod loopback;
10
11#[cfg(test)]
12mod tests;
13
14use std::collections::HashMap;
15use std::path::{Path, PathBuf};
16use std::sync::Mutex;
17use std::time::{Duration, Instant};
18
19use notify_debouncer_full::{
20 new_debouncer, notify::RecursiveMode, DebounceEventResult, Debouncer, RecommendedCache,
21};
22use sha2::{Digest, Sha256};
23use tokio_util::sync::CancellationToken;
24
25use crate::config::{ConnectorConfig, ContentSourcesConfig};
26use crate::source::ContentSourceProvider;
27use crate::storage::watchtower as store;
28use crate::storage::DbPool;
29
/// Errors that can occur while scanning, ingesting, or watching sources.
#[derive(Debug, thiserror::Error)]
pub enum WatchtowerError {
    /// Filesystem failures (directory walks, file reads).
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// Database failures bubbled up from the storage layer.
    #[error("storage error: {0}")]
    Storage(#[from] crate::error::StorageError),

    /// Failures from the `notify` filesystem watcher.
    #[error("notify error: {0}")]
    Notify(#[from] notify::Error),

    /// Invalid or inconsistent watchtower configuration.
    #[error("config error: {0}")]
    Config(String),
}
49
/// Aggregate outcome of ingesting a batch of files.
#[derive(Debug, Default)]
pub struct IngestSummary {
    // Files inserted or updated in storage.
    pub ingested: u32,
    // Files skipped (upsert reported no change).
    pub skipped: u32,
    // Per-file failures, formatted as "<relative path>: <error>".
    pub errors: Vec<String>,
}
61
/// Fields extracted from a document's leading YAML front matter block.
#[derive(Debug, Default)]
pub struct ParsedFrontMatter {
    // `title:` value, when present and a string.
    pub title: Option<String>,
    // `tags:` flattened to a comma-separated string (sequence or scalar).
    pub tags: Option<String>,
    // Raw YAML text of the front matter block, if one was found.
    pub raw_yaml: Option<String>,
}
69
70pub fn parse_front_matter(content: &str) -> (ParsedFrontMatter, &str) {
78 let (yaml_str, body) = loopback::split_front_matter(content);
79
80 let yaml_str = match yaml_str {
81 Some(y) => y,
82 None => return (ParsedFrontMatter::default(), content),
83 };
84
85 let parsed: Result<serde_yaml::Value, _> = serde_yaml::from_str(yaml_str);
86 match parsed {
87 Ok(serde_yaml::Value::Mapping(map)) => {
88 let title = map
89 .get(serde_yaml::Value::String("title".to_string()))
90 .and_then(|v| v.as_str())
91 .map(|s| s.to_string());
92
93 let tags = map
94 .get(serde_yaml::Value::String("tags".to_string()))
95 .map(|v| match v {
96 serde_yaml::Value::Sequence(seq) => seq
97 .iter()
98 .filter_map(|item| item.as_str())
99 .collect::<Vec<_>>()
100 .join(","),
101 serde_yaml::Value::String(s) => s.clone(),
102 _ => String::new(),
103 })
104 .filter(|s| !s.is_empty());
105
106 let fm = ParsedFrontMatter {
107 title,
108 tags,
109 raw_yaml: Some(yaml_str.to_string()),
110 };
111 (fm, body)
112 }
113 _ => (
114 ParsedFrontMatter {
115 raw_yaml: Some(yaml_str.to_string()),
116 ..Default::default()
117 },
118 body,
119 ),
120 }
121}
122
123pub fn matches_patterns(path: &Path, patterns: &[String]) -> bool {
132 let file_name = match path.file_name().and_then(|n| n.to_str()) {
133 Some(n) => n,
134 None => return false,
135 };
136
137 for pattern in patterns {
138 if let Ok(p) = glob::Pattern::new(pattern) {
139 if p.matches(file_name) {
140 return true;
141 }
142 }
143 }
144 false
145}
146
/// Renders `path` with `/` separators regardless of platform, lossily
/// converting any non-UTF-8 components.
fn relative_path_string(path: &Path) -> String {
    let parts: Vec<String> = path
        .iter()
        .map(|component| component.to_string_lossy().into_owned())
        .collect();
    parts.join("/")
}
154
155pub async fn ingest_content(
165 pool: &DbPool,
166 source_id: i64,
167 provider_id: &str,
168 content: &str,
169 force: bool,
170) -> Result<store::UpsertResult, WatchtowerError> {
171 let (fm, body) = parse_front_matter(content);
172
173 let hash = if force {
174 let mut hasher = Sha256::new();
175 hasher.update(content.as_bytes());
176 hasher.update(
177 std::time::SystemTime::now()
178 .duration_since(std::time::UNIX_EPOCH)
179 .unwrap_or_default()
180 .as_nanos()
181 .to_le_bytes(),
182 );
183 format!("{:x}", hasher.finalize())
184 } else {
185 let mut hasher = Sha256::new();
186 hasher.update(content.as_bytes());
187 format!("{:x}", hasher.finalize())
188 };
189
190 let result = store::upsert_content_node(
191 pool,
192 source_id,
193 provider_id,
194 &hash,
195 fm.title.as_deref(),
196 body,
197 fm.raw_yaml.as_deref(),
198 fm.tags.as_deref(),
199 )
200 .await?;
201
202 Ok(result)
203}
204
205pub async fn ingest_file(
209 pool: &DbPool,
210 source_id: i64,
211 base_path: &Path,
212 relative_path: &str,
213 force: bool,
214) -> Result<store::UpsertResult, WatchtowerError> {
215 let full_path = base_path.join(relative_path);
216 let content = tokio::fs::read_to_string(&full_path).await?;
217 ingest_content(pool, source_id, relative_path, &content, force).await
218}
219
220pub async fn ingest_files(
222 pool: &DbPool,
223 source_id: i64,
224 base_path: &Path,
225 paths: &[String],
226 force: bool,
227) -> IngestSummary {
228 let mut summary = IngestSummary::default();
229
230 for rel_path in paths {
231 match ingest_file(pool, source_id, base_path, rel_path, force).await {
232 Ok(store::UpsertResult::Inserted | store::UpsertResult::Updated) => {
233 summary.ingested += 1;
234 }
235 Ok(store::UpsertResult::Skipped) => {
236 summary.skipped += 1;
237 }
238 Err(e) => {
239 summary.errors.push(format!("{rel_path}: {e}"));
240 }
241 }
242 }
243
244 summary
245}
246
/// Tracks recently written paths so they can be skipped by the event
/// handler until their entry expires.
struct CooldownSet {
    // Path -> time it was marked.
    entries: HashMap<PathBuf, Instant>,
    // How long a marked path stays "cooling".
    ttl: Duration,
}

impl CooldownSet {
    /// Creates an empty set whose entries expire after `ttl`.
    fn new(ttl: Duration) -> Self {
        Self {
            entries: HashMap::new(),
            ttl,
        }
    }

    /// Records `path` with the current timestamp. Currently unused
    /// (`#[allow(dead_code)]`).
    #[allow(dead_code)]
    fn mark(&mut self, path: PathBuf) {
        self.entries.insert(path, Instant::now());
    }

    /// Returns true while `path` was marked less than `ttl` ago.
    fn is_cooling(&self, path: &Path) -> bool {
        self.entries
            .get(path)
            .map_or(false, |marked_at| marked_at.elapsed() < self.ttl)
    }

    /// Drops every entry that has outlived `ttl`.
    fn cleanup(&mut self) {
        let ttl = self.ttl;
        self.entries.retain(|_, marked_at| marked_at.elapsed() < ttl);
    }
}
285
/// A registered remote source: (source row id, provider, file-name glob
/// patterns, poll interval).
type RemoteSource = (i64, Box<dyn ContentSourceProvider>, Vec<String>, Duration);
292
/// Long-running ingestion loop: watches local directories for changes and
/// polls remote providers, writing content into the database.
pub struct WatchtowerLoop {
    pool: DbPool,
    config: ContentSourcesConfig,
    connector_config: ConnectorConfig,
    // App data directory; used to load/create the connector encryption key.
    data_dir: PathBuf,
    // How long filesystem events are coalesced before delivery.
    debounce_duration: Duration,
    // Period of the full rescan that backstops missed watcher events.
    fallback_scan_interval: Duration,
    // How long a marked path is ignored by the event handler.
    cooldown_ttl: Duration,
}
306
impl WatchtowerLoop {
    /// Builds a loop with default timing: 2s event debounce, 5-minute
    /// fallback rescan, 5s write cooldown.
    pub fn new(
        pool: DbPool,
        config: ContentSourcesConfig,
        connector_config: ConnectorConfig,
        data_dir: PathBuf,
    ) -> Self {
        Self {
            pool,
            config,
            connector_config,
            data_dir,
            debounce_duration: Duration::from_secs(2),
            fallback_scan_interval: Duration::from_secs(300),
            cooldown_ttl: Duration::from_secs(5),
        }
    }

    /// Runs the watchtower until `cancel` fires.
    ///
    /// Registers configured local and remote sources in storage, performs an
    /// initial scan/poll of each, then watches local directories for
    /// debounced filesystem events while periodically rescanning (to catch
    /// missed events) and polling remote sources. Returns early when nothing
    /// is configured or no source registers successfully; falls back to
    /// interval polling if the filesystem watcher cannot be created.
    pub async fn run(&self, cancel: CancellationToken) {
        // Local sources must be watchable and carry a path.
        let local_sources: Vec<_> = self
            .config
            .sources
            .iter()
            .filter(|s| s.source_type == "local_fs" && s.watch && s.path.is_some())
            .collect();

        // Remote sources must carry a Google Drive folder id.
        let remote_sources: Vec<_> = self
            .config
            .sources
            .iter()
            .filter(|s| s.source_type == "google_drive" && s.folder_id.is_some())
            .collect();

        if local_sources.is_empty() && remote_sources.is_empty() {
            tracing::info!("Watchtower: no watch sources configured, exiting");
            return;
        }

        // Register each local source; entries are (source_id, base path, patterns).
        let mut source_map: Vec<(i64, PathBuf, Vec<String>)> = Vec::new();
        for src in &local_sources {
            let path_str = src.path.as_deref().unwrap();
            let expanded = PathBuf::from(crate::storage::expand_tilde(path_str));

            let config_json = serde_json::json!({
                "path": path_str,
                "file_patterns": src.file_patterns,
                "loop_back_enabled": src.loop_back_enabled,
            })
            .to_string();

            match store::ensure_local_fs_source(&self.pool, path_str, &config_json).await {
                Ok(source_id) => {
                    source_map.push((source_id, expanded, src.file_patterns.clone()));
                }
                Err(e) => {
                    tracing::error!(path = path_str, error = %e, "Failed to register source context");
                }
            }
        }

        // Register each Google Drive source and build its provider.
        let mut remote_map: Vec<RemoteSource> = Vec::new();
        for src in &remote_sources {
            let folder_id = src.folder_id.as_deref().unwrap();
            let config_json = serde_json::json!({
                "folder_id": folder_id,
                "file_patterns": src.file_patterns,
                "service_account_key": src.service_account_key,
                "connection_id": src.connection_id,
            })
            .to_string();

            match store::ensure_google_drive_source(&self.pool, folder_id, &config_json).await {
                Ok(source_id) => {
                    let interval = Duration::from_secs(src.poll_interval_seconds.unwrap_or(300));

                    // Prefer a stored connection; fall back to a service
                    // account key; otherwise skip the source entirely.
                    let provider: Box<dyn ContentSourceProvider> =
                        if let Some(connection_id) = src.connection_id {
                            match self.build_connection_provider(folder_id, connection_id) {
                                Ok(p) => Box::new(p),
                                Err(reason) => {
                                    tracing::warn!(
                                        folder_id,
                                        connection_id,
                                        reason = %reason,
                                        "Skipping connection-based source"
                                    );
                                    continue;
                                }
                            }
                        } else if src.service_account_key.is_some() {
                            let key_path = src.service_account_key.clone().unwrap_or_default();
                            Box::new(crate::source::google_drive::GoogleDriveProvider::new(
                                folder_id.to_string(),
                                key_path,
                            ))
                        } else {
                            tracing::warn!(
                                folder_id,
                                "Skipping Google Drive source: no connection_id or service_account_key"
                            );
                            continue;
                        };

                    remote_map.push((source_id, provider, src.file_patterns.clone(), interval));
                }
                Err(e) => {
                    tracing::error!(
                        folder_id = folder_id,
                        error = %e,
                        "Failed to register Google Drive source"
                    );
                }
            }
        }

        if source_map.is_empty() && remote_map.is_empty() {
            tracing::warn!("Watchtower: no sources registered, exiting");
            return;
        }

        // Initial full scan so pre-existing files are ingested before we
        // start reacting to events.
        for (source_id, base_path, patterns) in &source_map {
            if let Err(e) = self.scan_directory(*source_id, base_path, patterns).await {
                tracing::error!(
                    path = %base_path.display(),
                    error = %e,
                    "Initial scan failed"
                );
            }
        }

        // Initial remote poll.
        if !remote_map.is_empty() {
            self.poll_remote_sources(&remote_map).await;
        }

        // No local directories registered: only remote polling remains.
        if source_map.is_empty() {
            self.remote_only_loop(&remote_map, cancel).await;
            return;
        }

        // Bridge the debouncer's callback thread into this async loop.
        let (async_tx, mut async_rx) = tokio::sync::mpsc::channel::<DebounceEventResult>(256);

        // Runs on the watcher's own thread, so blocking_send is appropriate.
        let handler = move |result: DebounceEventResult| {
            let _ = async_tx.blocking_send(result);
        };

        let debouncer_result = new_debouncer(self.debounce_duration, None, handler);
        let mut debouncer: Debouncer<notify::RecommendedWatcher, RecommendedCache> =
            match debouncer_result {
                Ok(d) => d,
                Err(e) => {
                    tracing::error!(error = %e, "Failed to create filesystem watcher, falling back to polling");
                    self.polling_loop(&source_map, cancel).await;
                    return;
                }
            };

        for (_, base_path, _) in &source_map {
            if let Err(e) = debouncer.watch(base_path, RecursiveMode::Recursive) {
                tracing::error!(
                    path = %base_path.display(),
                    error = %e,
                    "Failed to watch directory"
                );
            }
        }

        tracing::info!(
            local_sources = source_map.len(),
            remote_sources = remote_map.len(),
            "Watchtower watching for changes"
        );

        let cooldown = Mutex::new(CooldownSet::new(self.cooldown_ttl));

        // Periodic rescan that backstops missed watcher events.
        let mut fallback_timer = tokio::time::interval(self.fallback_scan_interval);
        fallback_timer.tick().await; // consume the interval's immediate first tick

        // Poll remotes at the shortest configured interval.
        let remote_interval = remote_map
            .iter()
            .map(|(_, _, _, d)| *d)
            .min()
            .unwrap_or(self.fallback_scan_interval);
        let mut remote_timer = tokio::time::interval(remote_interval);
        remote_timer.tick().await; // consume the interval's immediate first tick

        loop {
            tokio::select! {
                () = cancel.cancelled() => {
                    tracing::info!("Watchtower: cancellation received, shutting down");
                    break;
                }
                _ = fallback_timer.tick() => {
                    for (source_id, base_path, patterns) in &source_map {
                        if let Err(e) = self.scan_directory(*source_id, base_path, patterns).await {
                            tracing::warn!(
                                path = %base_path.display(),
                                error = %e,
                                "Fallback scan failed"
                            );
                        }
                    }
                    // Evict expired cooldown entries on the same cadence.
                    if let Ok(mut cd) = cooldown.lock() {
                        cd.cleanup();
                    }
                }
                _ = remote_timer.tick(), if !remote_map.is_empty() => {
                    self.poll_remote_sources(&remote_map).await;
                }
                result = async_rx.recv() => {
                    match result {
                        Some(Ok(events)) => {
                            for event in events {
                                for path in &event.paths {
                                    self.handle_event(path, &source_map, &cooldown).await;
                                }
                            }
                        }
                        Some(Err(errs)) => {
                            for e in errs {
                                tracing::warn!(error = %e, "Watcher error");
                            }
                        }
                        None => {
                            // Sender dropped: the watcher is gone, stop the loop.
                            tracing::warn!("Watcher event channel closed");
                            break;
                        }
                    }
                }
            }
        }

        // Explicitly stop the watcher before reporting shutdown.
        drop(debouncer);
        tracing::info!("Watchtower shut down");
    }

    /// Handles one debounced filesystem event path: skips paths still in
    /// write cooldown, then ingests the file under the first source whose
    /// base directory contains it, provided it matches that source's
    /// patterns.
    async fn handle_event(
        &self,
        path: &Path,
        source_map: &[(i64, PathBuf, Vec<String>)],
        cooldown: &Mutex<CooldownSet>,
    ) {
        // Lock guard is released before any await below.
        if let Ok(cd) = cooldown.lock() {
            if cd.is_cooling(path) {
                tracing::debug!(path = %path.display(), "Skipping cooldown path");
                return;
            }
        }

        for (source_id, base_path, patterns) in source_map {
            if path.starts_with(base_path) {
                // First containing source wins; non-matching files are dropped.
                if !matches_patterns(path, patterns) {
                    return;
                }

                let rel = match path.strip_prefix(base_path) {
                    Ok(r) => relative_path_string(r),
                    Err(_) => return,
                };

                match ingest_file(&self.pool, *source_id, base_path, &rel, false).await {
                    Ok(result) => {
                        tracing::debug!(
                            path = %rel,
                            result = ?result,
                            "Watchtower ingested file"
                        );
                    }
                    Err(e) => {
                        tracing::warn!(
                            path = %rel,
                            error = %e,
                            "Watchtower ingest failed"
                        );
                    }
                }
                return;
            }
        }
    }

    /// Walks `base_path` recursively, ingests every file matching
    /// `patterns`, and advances the source's sync cursor to the current
    /// time.
    async fn scan_directory(
        &self,
        source_id: i64,
        base_path: &Path,
        patterns: &[String],
    ) -> Result<IngestSummary, WatchtowerError> {
        let mut rel_paths = Vec::new();
        Self::walk_directory(base_path, base_path, patterns, &mut rel_paths)?;

        let summary = ingest_files(&self.pool, source_id, base_path, &rel_paths, false).await;

        tracing::debug!(
            path = %base_path.display(),
            ingested = summary.ingested,
            skipped = summary.skipped,
            errors = summary.errors.len(),
            "Directory scan complete"
        );

        // Cursor update failure is non-fatal; the next scan retries it.
        let cursor = chrono::Utc::now().to_rfc3339();
        if let Err(e) = store::update_sync_cursor(&self.pool, source_id, &cursor).await {
            tracing::warn!(error = %e, "Failed to update sync cursor");
        }

        Ok(summary)
    }

    /// Recursively collects files under `current` matching `patterns` into
    /// `out` as paths relative to `base`. Dot-prefixed directories are
    /// skipped.
    fn walk_directory(
        base: &Path,
        current: &Path,
        patterns: &[String],
        out: &mut Vec<String>,
    ) -> Result<(), WatchtowerError> {
        let entries = std::fs::read_dir(current)?;
        for entry in entries {
            let entry = entry?;
            let file_type = entry.file_type()?;
            let path = entry.path();

            if file_type.is_dir() {
                // Skip hidden directories (".git", ".obsidian", etc.).
                if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
                    if name.starts_with('.') {
                        continue;
                    }
                }
                Self::walk_directory(base, &path, patterns, out)?;
            } else if file_type.is_file() && matches_patterns(&path, patterns) {
                if let Ok(rel) = path.strip_prefix(base) {
                    out.push(relative_path_string(rel));
                }
            }
        }
        Ok(())
    }

    /// Polls each remote source once: scans for changes since the stored
    /// sync cursor, ingests changed files, then advances the cursor. A
    /// broken connection marks both the source and the connection; other
    /// scan errors mark only the source.
    async fn poll_remote_sources(&self, remote_sources: &[RemoteSource]) {
        for (source_id, provider, patterns, _interval) in remote_sources {
            // Resume from the last recorded cursor, if any.
            let cursor = match store::get_source_context(&self.pool, *source_id).await {
                Ok(Some(ctx)) => ctx.sync_cursor,
                Ok(None) => None,
                Err(e) => {
                    tracing::warn!(source_id, error = %e, "Failed to get source context");
                    continue;
                }
            };

            match provider.scan_for_changes(cursor.as_deref(), patterns).await {
                Ok(files) => {
                    let mut ingested = 0u32;
                    let mut skipped = 0u32;
                    for file in &files {
                        match provider.read_content(&file.provider_id).await {
                            Ok(content) => {
                                match ingest_content(
                                    &self.pool,
                                    *source_id,
                                    &file.provider_id,
                                    &content,
                                    false,
                                )
                                .await
                                {
                                    Ok(
                                        store::UpsertResult::Inserted
                                        | store::UpsertResult::Updated,
                                    ) => {
                                        ingested += 1;
                                    }
                                    Ok(store::UpsertResult::Skipped) => {
                                        skipped += 1;
                                    }
                                    Err(e) => {
                                        tracing::warn!(
                                            provider_id = %file.provider_id,
                                            error = %e,
                                            "Remote ingest failed"
                                        );
                                    }
                                }
                            }
                            Err(e) => {
                                tracing::warn!(
                                    provider_id = %file.provider_id,
                                    error = %e,
                                    "Failed to read remote content"
                                );
                            }
                        }
                    }

                    tracing::debug!(
                        source_type = provider.source_type(),
                        ingested,
                        skipped,
                        total = files.len(),
                        "Remote poll complete"
                    );

                    // Cursor update failure is non-fatal; the next poll retries.
                    let new_cursor = chrono::Utc::now().to_rfc3339();
                    if let Err(e) =
                        store::update_sync_cursor(&self.pool, *source_id, &new_cursor).await
                    {
                        tracing::warn!(error = %e, "Failed to update remote sync cursor");
                    }
                }
                Err(crate::source::SourceError::ConnectionBroken {
                    connection_id,
                    ref reason,
                }) => {
                    tracing::warn!(
                        source_id,
                        connection_id,
                        reason = %reason,
                        "Connection broken -- marking source as error"
                    );
                    // Best-effort status updates; failures are ignored.
                    let _ =
                        store::update_source_status(&self.pool, *source_id, "error", Some(reason))
                            .await;
                    let _ =
                        store::update_connection_status(&self.pool, connection_id, "expired").await;
                }
                Err(e) => {
                    tracing::warn!(
                        source_type = provider.source_type(),
                        error = %e,
                        "Remote scan failed"
                    );
                    let _ = store::update_source_status(
                        &self.pool,
                        *source_id,
                        "error",
                        Some(&e.to_string()),
                    )
                    .await;
                }
            }
        }
    }

    /// Loop used when there are no local directories to watch: polls all
    /// remote sources at the shortest configured interval until cancelled.
    async fn remote_only_loop(&self, remote_map: &[RemoteSource], cancel: CancellationToken) {
        let interval_dur = remote_map
            .iter()
            .map(|(_, _, _, d)| *d)
            .min()
            .unwrap_or(self.fallback_scan_interval);
        let mut interval = tokio::time::interval(interval_dur);
        interval.tick().await; // consume the interval's immediate first tick

        loop {
            tokio::select! {
                () = cancel.cancelled() => {
                    tracing::info!("Watchtower remote-only loop cancelled");
                    break;
                }
                _ = interval.tick() => {
                    self.poll_remote_sources(remote_map).await;
                }
            }
        }
    }

    /// Builds a Google Drive provider backed by a stored connection.
    ///
    /// Returns a human-readable reason on failure (connector key or
    /// connector configuration error) so the caller can log and skip the
    /// source.
    fn build_connection_provider(
        &self,
        folder_id: &str,
        connection_id: i64,
    ) -> Result<crate::source::google_drive::GoogleDriveProvider, String> {
        let key = crate::source::connector::crypto::ensure_connector_key(&self.data_dir)
            .map_err(|e| format!("connector key error: {e}"))?;

        let connector = crate::source::connector::google_drive::GoogleDriveConnector::new(
            &self.connector_config.google_drive,
        )
        .map_err(|e| format!("connector config error: {e}"))?;

        Ok(
            crate::source::google_drive::GoogleDriveProvider::from_connection(
                folder_id.to_string(),
                connection_id,
                self.pool.clone(),
                key,
                connector,
            ),
        )
    }

    /// Fallback when the filesystem watcher cannot be created: rescans all
    /// local sources on the fallback interval until cancelled.
    async fn polling_loop(
        &self,
        source_map: &[(i64, PathBuf, Vec<String>)],
        cancel: CancellationToken,
    ) {
        let mut interval = tokio::time::interval(self.fallback_scan_interval);
        interval.tick().await; // consume the interval's immediate first tick

        loop {
            tokio::select! {
                () = cancel.cancelled() => {
                    tracing::info!("Watchtower polling loop cancelled");
                    break;
                }
                _ = interval.tick() => {
                    for (source_id, base_path, patterns) in source_map {
                        if let Err(e) = self.scan_directory(*source_id, base_path, patterns).await {
                            tracing::warn!(
                                path = %base_path.display(),
                                error = %e,
                                "Polling scan failed"
                            );
                        }
                    }
                }
            }
        }
    }
}