Skip to main content

tuitbot_core/automation/watchtower/
mod.rs

1//! Watchtower content source watcher and shared ingest pipeline.
2//!
3//! Watches configured local directories for `.md` and `.txt` changes via
4//! the `notify` crate with debouncing, and polls remote content sources
5//! (e.g. Google Drive) on a configurable interval.  Both local filesystem
6//! events and remote polls funnel through `ingest_content()`, ensuring
7//! identical state transitions.
8
9pub mod chunker;
10pub mod loopback;
11
12#[cfg(test)]
13mod tests;
14
15#[cfg(test)]
16mod inline_tests;
17
18use std::collections::HashMap;
19use std::path::{Path, PathBuf};
20use std::sync::Mutex;
21use std::time::{Duration, Instant};
22
23use notify_debouncer_full::{
24    new_debouncer, notify::RecursiveMode, DebounceEventResult, Debouncer, RecommendedCache,
25};
26use sha2::{Digest, Sha256};
27use tokio_util::sync::CancellationToken;
28
29use crate::config::{ConnectorConfig, ContentSourcesConfig};
30use crate::source::ContentSourceProvider;
31use crate::storage::accounts::DEFAULT_ACCOUNT_ID;
32use crate::storage::watchtower as store;
33use crate::storage::DbPool;
34
35// ---------------------------------------------------------------------------
36// Error type
37// ---------------------------------------------------------------------------
38
/// Errors specific to Watchtower operations.
#[derive(Debug, thiserror::Error)]
pub enum WatchtowerError {
    /// Filesystem failure while reading or walking local source directories.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// Database-layer failure propagated from the storage crate.
    #[error("storage error: {0}")]
    Storage(#[from] crate::error::StorageError),

    /// Failure reported by the `notify` filesystem watcher.
    #[error("notify error: {0}")]
    Notify(#[from] notify::Error),

    /// Configuration problem, described by the contained message.
    #[error("config error: {0}")]
    Config(String),

    /// Failure while chunking ingested content (see the `chunker` submodule).
    #[error("chunker error: {0}")]
    Chunker(#[from] chunker::ChunkerError),
}
57
58// ---------------------------------------------------------------------------
59// Ingest result types
60// ---------------------------------------------------------------------------
61
/// Summary of a batch ingest operation.
#[derive(Debug, Default)]
pub struct IngestSummary {
    /// Number of files whose content was inserted or updated.
    pub ingested: u32,
    /// Number of files skipped (the upsert reported no change).
    pub skipped: u32,
    /// Per-file error descriptions, formatted as "<path>: <error>".
    pub errors: Vec<String>,
}
69
/// Parsed front-matter from a markdown file.
#[derive(Debug, Default)]
pub struct ParsedFrontMatter {
    /// Value of the `title` key, when present and a YAML string.
    pub title: Option<String>,
    /// Tags as a comma-joined string (YAML sequences are joined; a plain
    /// string is taken as-is). `None` when absent or empty.
    pub tags: Option<String>,
    /// The raw front-matter YAML text, when a front-matter block was found
    /// (set even if the YAML failed to parse as a mapping).
    pub raw_yaml: Option<String>,
}
77
78// ---------------------------------------------------------------------------
79// Front-matter parsing
80// ---------------------------------------------------------------------------
81
82/// Parse YAML front-matter from file content.
83///
84/// Returns extracted metadata and the body text (content after front-matter).
85pub fn parse_front_matter(content: &str) -> (ParsedFrontMatter, &str) {
86    let (yaml_str, body) = loopback::split_front_matter(content);
87
88    let yaml_str = match yaml_str {
89        Some(y) => y,
90        None => return (ParsedFrontMatter::default(), content),
91    };
92
93    let parsed: Result<serde_yaml::Value, _> = serde_yaml::from_str(yaml_str);
94    match parsed {
95        Ok(serde_yaml::Value::Mapping(map)) => {
96            let title = map
97                .get(serde_yaml::Value::String("title".to_string()))
98                .and_then(|v| v.as_str())
99                .map(|s| s.to_string());
100
101            let tags = map
102                .get(serde_yaml::Value::String("tags".to_string()))
103                .map(|v| match v {
104                    serde_yaml::Value::Sequence(seq) => seq
105                        .iter()
106                        .filter_map(|item| item.as_str())
107                        .collect::<Vec<_>>()
108                        .join(","),
109                    serde_yaml::Value::String(s) => s.clone(),
110                    _ => String::new(),
111                })
112                .filter(|s| !s.is_empty());
113
114            let fm = ParsedFrontMatter {
115                title,
116                tags,
117                raw_yaml: Some(yaml_str.to_string()),
118            };
119            (fm, body)
120        }
121        _ => (
122            ParsedFrontMatter {
123                raw_yaml: Some(yaml_str.to_string()),
124                ..Default::default()
125            },
126            body,
127        ),
128    }
129}
130
131// ---------------------------------------------------------------------------
132// Pattern matching
133// ---------------------------------------------------------------------------
134
135/// Check whether a file path matches any of the given glob patterns.
136///
137/// Matches against the file name only (not the full path), so `*.md`
138/// matches `sub/dir/note.md`.
139pub fn matches_patterns(path: &Path, patterns: &[String]) -> bool {
140    let file_name = match path.file_name().and_then(|n| n.to_str()) {
141        Some(n) => n,
142        None => return false,
143    };
144
145    for pattern in patterns {
146        if let Ok(p) = glob::Pattern::new(pattern) {
147            if p.matches(file_name) {
148                return true;
149            }
150        }
151    }
152    false
153}
154
/// Convert a relative path into a stable slash-delimited string across platforms.
fn relative_path_string(path: &Path) -> String {
    let mut joined = String::new();
    for component in path.iter() {
        if !joined.is_empty() {
            joined.push('/');
        }
        joined.push_str(&component.to_string_lossy());
    }
    joined
}
162
163// ---------------------------------------------------------------------------
164// Shared ingest pipeline
165// ---------------------------------------------------------------------------
166
167/// Ingest raw text content into the Watchtower pipeline.
168///
169/// This is the provider-agnostic code path that both local file reads and
170/// remote content fetches funnel through. It parses front-matter, computes
171/// a content hash, and upserts the content node in the database.
172pub async fn ingest_content(
173    pool: &DbPool,
174    source_id: i64,
175    provider_id: &str,
176    content: &str,
177    force: bool,
178) -> Result<store::UpsertResult, WatchtowerError> {
179    let (fm, body) = parse_front_matter(content);
180
181    let hash = if force {
182        let mut hasher = Sha256::new();
183        hasher.update(content.as_bytes());
184        hasher.update(
185            std::time::SystemTime::now()
186                .duration_since(std::time::UNIX_EPOCH)
187                .unwrap_or_default()
188                .as_nanos()
189                .to_le_bytes(),
190        );
191        format!("{:x}", hasher.finalize())
192    } else {
193        let mut hasher = Sha256::new();
194        hasher.update(content.as_bytes());
195        format!("{:x}", hasher.finalize())
196    };
197
198    let result = store::upsert_content_node(
199        pool,
200        source_id,
201        provider_id,
202        &hash,
203        fm.title.as_deref(),
204        body,
205        fm.raw_yaml.as_deref(),
206        fm.tags.as_deref(),
207    )
208    .await?;
209
210    Ok(result)
211}
212
213/// Ingest a single file from the local filesystem into the Watchtower pipeline.
214///
215/// Convenience wrapper that reads the file then delegates to `ingest_content`.
216pub async fn ingest_file(
217    pool: &DbPool,
218    source_id: i64,
219    base_path: &Path,
220    relative_path: &str,
221    force: bool,
222) -> Result<store::UpsertResult, WatchtowerError> {
223    let full_path = base_path.join(relative_path);
224    let content = tokio::fs::read_to_string(&full_path).await?;
225    ingest_content(pool, source_id, relative_path, &content, force).await
226}
227
228/// Ingest multiple files, collecting results into a summary.
229pub async fn ingest_files(
230    pool: &DbPool,
231    source_id: i64,
232    base_path: &Path,
233    paths: &[String],
234    force: bool,
235) -> IngestSummary {
236    let mut summary = IngestSummary::default();
237
238    for rel_path in paths {
239        match ingest_file(pool, source_id, base_path, rel_path, force).await {
240            Ok(store::UpsertResult::Inserted | store::UpsertResult::Updated) => {
241                summary.ingested += 1;
242            }
243            Ok(store::UpsertResult::Skipped) => {
244                summary.skipped += 1;
245            }
246            Err(e) => {
247                summary.errors.push(format!("{rel_path}: {e}"));
248            }
249        }
250    }
251
252    summary
253}
254
255// ---------------------------------------------------------------------------
256// Cooldown set
257// ---------------------------------------------------------------------------
258
/// Tracks recently-written paths to prevent re-ingestion of our own writes.
struct CooldownSet {
    /// Path -> instant of the last write we recorded for it.
    entries: HashMap<PathBuf, Instant>,
    /// How long an entry stays "hot" after being marked.
    ttl: Duration,
}

impl CooldownSet {
    /// Create an empty set whose entries expire after `ttl`.
    fn new(ttl: Duration) -> Self {
        Self {
            entries: HashMap::new(),
            ttl,
        }
    }

    /// Mark a path as recently written (used by loop-back writes and tests).
    #[allow(dead_code)]
    fn mark(&mut self, path: PathBuf) {
        self.entries.insert(path, Instant::now());
    }

    /// Check if a path is in cooldown (recently written by us).
    fn is_cooling(&self, path: &Path) -> bool {
        self.entries
            .get(path)
            .map_or(false, |marked| marked.elapsed() < self.ttl)
    }

    /// Remove expired entries to prevent unbounded growth.
    fn cleanup(&mut self) {
        let ttl = self.ttl;
        self.entries.retain(|_, marked| marked.elapsed() < ttl);
    }
}
293
294// ---------------------------------------------------------------------------
295// WatchtowerLoop
296// ---------------------------------------------------------------------------
297
/// A registered remote source, as a tuple of
/// `(db_source_id, provider, file_patterns, poll_interval)`.
type RemoteSource = (i64, Box<dyn ContentSourceProvider>, Vec<String>, Duration);
300
/// The Watchtower content source watcher service.
///
/// Watches configured source directories for file changes, debounces events,
/// and ingests changed files into the database via the shared pipeline.
pub struct WatchtowerLoop {
    /// Database handle used for all ingest, cursor, and status updates.
    pool: DbPool,
    /// Configured content sources (local filesystem and remote).
    config: ContentSourcesConfig,
    /// Connector settings. NOTE(review): not referenced in the visible
    /// methods — presumably consumed by `build_connection_provider`; confirm.
    connector_config: ConnectorConfig,
    /// Base data directory. NOTE(review): not referenced in the visible
    /// methods — confirm usage elsewhere in this impl.
    data_dir: PathBuf,
    /// Debounce window applied to filesystem events (default 2s).
    debounce_duration: Duration,
    /// Interval of the periodic fallback directory scan (default 5 minutes).
    fallback_scan_interval: Duration,
    /// How long a loop-back-written path stays in cooldown (default 5s).
    cooldown_ttl: Duration,
}
314
315impl WatchtowerLoop {
316    /// Create a new WatchtowerLoop.
317    pub fn new(
318        pool: DbPool,
319        config: ContentSourcesConfig,
320        connector_config: ConnectorConfig,
321        data_dir: PathBuf,
322    ) -> Self {
323        Self {
324            pool,
325            config,
326            connector_config,
327            data_dir,
328            debounce_duration: Duration::from_secs(2),
329            fallback_scan_interval: Duration::from_secs(300), // 5 minutes
330            cooldown_ttl: Duration::from_secs(5),
331        }
332    }
333
334    /// Run the watchtower loop until the cancellation token is triggered.
335    ///
336    /// Registers both local filesystem and remote sources, then runs:
337    /// - `notify` watcher + fallback polling for local sources
338    /// - interval-based polling for remote sources (e.g. Google Drive)
339    pub async fn run(&self, cancel: CancellationToken) {
340        // Split config into local (watchable) and remote (pollable) sources.
341        // Uses `is_enabled()` which respects both `enabled` and legacy `watch`.
342        let local_sources: Vec<_> = self
343            .config
344            .sources
345            .iter()
346            .filter(|s| s.source_type == "local_fs" && s.is_enabled() && s.path.is_some())
347            .collect();
348
349        let remote_sources: Vec<_> = self
350            .config
351            .sources
352            .iter()
353            .filter(|s| s.source_type == "google_drive" && s.is_enabled() && s.folder_id.is_some())
354            .collect();
355
356        if local_sources.is_empty() && remote_sources.is_empty() {
357            tracing::info!("Watchtower: no watch sources configured, exiting");
358            return;
359        }
360
361        // Register local source contexts in DB.
362        let mut source_map: Vec<(i64, PathBuf, Vec<String>)> = Vec::new();
363        for src in &local_sources {
364            let path_str = src.path.as_deref().unwrap();
365            let expanded = PathBuf::from(crate::storage::expand_tilde(path_str));
366
367            let config_json = serde_json::json!({
368                "path": path_str,
369                "file_patterns": src.file_patterns,
370                "loop_back_enabled": src.loop_back_enabled,
371            })
372            .to_string();
373
374            match store::ensure_local_fs_source(&self.pool, path_str, &config_json).await {
375                Ok(source_id) => {
376                    source_map.push((source_id, expanded, src.file_patterns.clone()));
377                }
378                Err(e) => {
379                    tracing::error!(path = path_str, error = %e, "Failed to register source context");
380                }
381            }
382        }
383
384        // Register remote source contexts and build provider instances.
385        let mut remote_map: Vec<RemoteSource> = Vec::new();
386        for src in &remote_sources {
387            let folder_id = src.folder_id.as_deref().unwrap();
388            let config_json = serde_json::json!({
389                "folder_id": folder_id,
390                "file_patterns": src.file_patterns,
391                "service_account_key": src.service_account_key,
392                "connection_id": src.connection_id,
393            })
394            .to_string();
395
396            match store::ensure_google_drive_source(&self.pool, folder_id, &config_json).await {
397                Ok(source_id) => {
398                    let interval = Duration::from_secs(src.poll_interval_seconds.unwrap_or(300));
399
400                    // connection_id takes precedence over service_account_key.
401                    let provider: Box<dyn ContentSourceProvider> =
402                        if let Some(connection_id) = src.connection_id {
403                            match self.build_connection_provider(folder_id, connection_id) {
404                                Ok(p) => Box::new(p),
405                                Err(reason) => {
406                                    tracing::warn!(
407                                        folder_id,
408                                        connection_id,
409                                        reason = %reason,
410                                        "Skipping connection-based source"
411                                    );
412                                    continue;
413                                }
414                            }
415                        } else if src.service_account_key.is_some() {
416                            let key_path = src.service_account_key.clone().unwrap_or_default();
417                            Box::new(crate::source::google_drive::GoogleDriveProvider::new(
418                                folder_id.to_string(),
419                                key_path,
420                            ))
421                        } else {
422                            tracing::warn!(
423                            folder_id,
424                            "Skipping Google Drive source: no connection_id or service_account_key"
425                        );
426                            continue;
427                        };
428
429                    remote_map.push((source_id, provider, src.file_patterns.clone(), interval));
430                }
431                Err(e) => {
432                    tracing::error!(
433                        folder_id = folder_id,
434                        error = %e,
435                        "Failed to register Google Drive source"
436                    );
437                }
438            }
439        }
440
441        if source_map.is_empty() && remote_map.is_empty() {
442            tracing::warn!("Watchtower: no sources registered, exiting");
443            return;
444        }
445
446        // Initial scan of all local directories (all enabled sources, regardless of change_detection).
447        for (source_id, base_path, patterns) in &source_map {
448            let _ = store::update_source_status(&self.pool, *source_id, "syncing", None).await;
449            match self.scan_directory(*source_id, base_path, patterns).await {
450                Ok(_) => {
451                    let _ =
452                        store::update_source_status(&self.pool, *source_id, "active", None).await;
453                }
454                Err(e) => {
455                    tracing::error!(
456                        path = %base_path.display(),
457                        error = %e,
458                        "Initial scan failed"
459                    );
460                    let _ = store::update_source_status(
461                        &self.pool,
462                        *source_id,
463                        "error",
464                        Some(&e.to_string()),
465                    )
466                    .await;
467                }
468            }
469        }
470
471        // Chunk any nodes created during initial local scan.
472        self.chunk_pending().await;
473
474        // Initial poll of remote sources.
475        if !remote_map.is_empty() {
476            self.poll_remote_sources(&remote_map).await;
477            self.chunk_pending().await;
478        }
479
480        // Partition local sources: those with ongoing monitoring vs scan-only.
481        // Scan-only sources (`change_detection = "none"`) already did their initial
482        // scan above and don't participate in the event loop.
483        let watch_source_map: Vec<_> = source_map
484            .iter()
485            .zip(local_sources.iter())
486            .filter(|(_, src)| !src.is_scan_only())
487            .map(|(entry, _)| entry.clone())
488            .collect();
489
490        // Further split: sources that need notify vs poll-only.
491        let notify_source_map: Vec<_> = source_map
492            .iter()
493            .zip(local_sources.iter())
494            .filter(|(_, src)| src.effective_change_detection() == "auto")
495            .map(|(entry, _)| entry.clone())
496            .collect();
497
498        // If no sources need ongoing monitoring, only run remote polling.
499        if watch_source_map.is_empty() {
500            if remote_map.is_empty() {
501                tracing::info!(
502                    "Watchtower: all local sources are scan-only and no remote sources, exiting"
503                );
504                return;
505            }
506            self.remote_only_loop(&remote_map, cancel).await;
507            return;
508        }
509
510        // Bridge notify's sync callback to an async-friendly tokio channel.
511        let (async_tx, mut async_rx) = tokio::sync::mpsc::channel::<DebounceEventResult>(256);
512
513        let handler = move |result: DebounceEventResult| {
514            let _ = async_tx.blocking_send(result);
515        };
516
517        let debouncer_result = new_debouncer(self.debounce_duration, None, handler);
518        let mut debouncer: Debouncer<notify::RecommendedWatcher, RecommendedCache> =
519            match debouncer_result {
520                Ok(d) => d,
521                Err(e) => {
522                    tracing::error!(error = %e, "Failed to create filesystem watcher, falling back to polling");
523                    self.polling_loop(&watch_source_map, cancel).await;
524                    return;
525                }
526            };
527
528        // Register directories with the notify watcher (only "auto" sources, not poll-only).
529        for (_, base_path, _) in &notify_source_map {
530            if let Err(e) = debouncer.watch(base_path, RecursiveMode::Recursive) {
531                tracing::error!(
532                    path = %base_path.display(),
533                    error = %e,
534                    "Failed to watch directory"
535                );
536            }
537        }
538
539        tracing::info!(
540            local_sources = source_map.len(),
541            watching = notify_source_map.len(),
542            polling = watch_source_map.len() - notify_source_map.len(),
543            remote_sources = remote_map.len(),
544            "Watchtower watching for changes"
545        );
546
547        let cooldown = Mutex::new(CooldownSet::new(self.cooldown_ttl));
548
549        // Main event loop.
550        let mut fallback_timer = tokio::time::interval(self.fallback_scan_interval);
551        fallback_timer.tick().await; // Consume the immediate first tick.
552
553        // Remote poll interval (use minimum configured or fallback default).
554        let remote_interval = remote_map
555            .iter()
556            .map(|(_, _, _, d)| *d)
557            .min()
558            .unwrap_or(self.fallback_scan_interval);
559        let mut remote_timer = tokio::time::interval(remote_interval);
560        remote_timer.tick().await; // Consume the immediate first tick.
561
562        loop {
563            tokio::select! {
564                () = cancel.cancelled() => {
565                    tracing::info!("Watchtower: cancellation received, shutting down");
566                    break;
567                }
568                _ = fallback_timer.tick() => {
569                    // Periodic fallback scan for all local sources with ongoing monitoring
570                    // (both "auto" and "poll" change_detection modes).
571                    for (source_id, base_path, patterns) in &watch_source_map {
572                        if let Err(e) = self.scan_directory(*source_id, base_path, patterns).await {
573                            tracing::warn!(
574                                path = %base_path.display(),
575                                error = %e,
576                                "Fallback scan failed"
577                            );
578                        }
579                    }
580                    if let Ok(mut cd) = cooldown.lock() {
581                        cd.cleanup();
582                    }
583                    self.chunk_pending().await;
584                }
585                _ = remote_timer.tick(), if !remote_map.is_empty() => {
586                    self.poll_remote_sources(&remote_map).await;
587                    self.chunk_pending().await;
588                }
589                result = async_rx.recv() => {
590                    match result {
591                        Some(Ok(events)) => {
592                            for event in events {
593                                for path in &event.paths {
594                                    self.handle_event(path, &source_map, &cooldown).await;
595                                }
596                            }
597                            self.chunk_pending().await;
598                        }
599                        Some(Err(errs)) => {
600                            for e in errs {
601                                tracing::warn!(error = %e, "Watcher error");
602                            }
603                        }
604                        None => {
605                            tracing::warn!("Watcher event channel closed");
606                            break;
607                        }
608                    }
609                }
610            }
611        }
612
613        // Drop the debouncer to stop watching.
614        drop(debouncer);
615        tracing::info!("Watchtower shut down");
616    }
617
618    /// Handle a single filesystem event for a changed path.
619    async fn handle_event(
620        &self,
621        path: &Path,
622        source_map: &[(i64, PathBuf, Vec<String>)],
623        cooldown: &Mutex<CooldownSet>,
624    ) {
625        // Check cooldown.
626        if let Ok(cd) = cooldown.lock() {
627            if cd.is_cooling(path) {
628                tracing::debug!(path = %path.display(), "Skipping cooldown path");
629                return;
630            }
631        }
632
633        // Find matching source.
634        for (source_id, base_path, patterns) in source_map {
635            if path.starts_with(base_path) {
636                // Check pattern match.
637                if !matches_patterns(path, patterns) {
638                    return;
639                }
640
641                // Compute relative path.
642                let rel = match path.strip_prefix(base_path) {
643                    Ok(r) => relative_path_string(r),
644                    Err(_) => return,
645                };
646
647                match ingest_file(&self.pool, *source_id, base_path, &rel, false).await {
648                    Ok(result) => {
649                        tracing::debug!(
650                            path = %rel,
651                            result = ?result,
652                            "Watchtower ingested file"
653                        );
654                    }
655                    Err(e) => {
656                        tracing::warn!(
657                            path = %rel,
658                            error = %e,
659                            "Watchtower ingest failed"
660                        );
661                    }
662                }
663                return;
664            }
665        }
666    }
667
668    /// Scan a directory for all matching files and ingest them.
669    async fn scan_directory(
670        &self,
671        source_id: i64,
672        base_path: &Path,
673        patterns: &[String],
674    ) -> Result<IngestSummary, WatchtowerError> {
675        let mut rel_paths = Vec::new();
676        Self::walk_directory(base_path, base_path, patterns, &mut rel_paths)?;
677
678        let summary = ingest_files(&self.pool, source_id, base_path, &rel_paths, false).await;
679
680        tracing::debug!(
681            path = %base_path.display(),
682            ingested = summary.ingested,
683            skipped = summary.skipped,
684            errors = summary.errors.len(),
685            "Directory scan complete"
686        );
687
688        // Update sync cursor.
689        let cursor = chrono::Utc::now().to_rfc3339();
690        if let Err(e) = store::update_sync_cursor(&self.pool, source_id, &cursor).await {
691            tracing::warn!(error = %e, "Failed to update sync cursor");
692        }
693
694        Ok(summary)
695    }
696
697    /// Recursively walk a directory, collecting relative paths of matching files.
698    fn walk_directory(
699        base: &Path,
700        current: &Path,
701        patterns: &[String],
702        out: &mut Vec<String>,
703    ) -> Result<(), WatchtowerError> {
704        let entries = std::fs::read_dir(current)?;
705        for entry in entries {
706            let entry = entry?;
707            let file_type = entry.file_type()?;
708            let path = entry.path();
709
710            if file_type.is_dir() {
711                // Skip hidden directories.
712                if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
713                    if name.starts_with('.') {
714                        continue;
715                    }
716                }
717                Self::walk_directory(base, &path, patterns, out)?;
718            } else if file_type.is_file() && matches_patterns(&path, patterns) {
719                if let Ok(rel) = path.strip_prefix(base) {
720                    out.push(relative_path_string(rel));
721                }
722            }
723        }
724        Ok(())
725    }
726
    /// Poll all remote sources for changes, ingest new/updated content.
    ///
    /// For each source: mark it "syncing", load its sync cursor, ask the
    /// provider for changed files since that cursor, ingest each file's
    /// content, then advance the cursor and mark the source "active".
    /// On scan failure the source is marked "error"; a broken connection
    /// additionally marks the connection row "expired". Per-file failures
    /// are logged but never abort the batch.
    async fn poll_remote_sources(&self, remote_sources: &[RemoteSource]) {
        for (source_id, provider, patterns, _interval) in remote_sources {
            // Status transitions are best-effort; errors are ignored.
            let _ = store::update_source_status(&self.pool, *source_id, "syncing", None).await;

            // Resume from the stored cursor, or None for a full scan.
            let cursor = match store::get_source_context(&self.pool, *source_id).await {
                Ok(Some(ctx)) => ctx.sync_cursor,
                Ok(None) => None,
                Err(e) => {
                    tracing::warn!(source_id, error = %e, "Failed to get source context");
                    continue;
                }
            };

            match provider.scan_for_changes(cursor.as_deref(), patterns).await {
                Ok(files) => {
                    let mut ingested = 0u32;
                    let mut skipped = 0u32;
                    for file in &files {
                        match provider.read_content(&file.provider_id).await {
                            Ok(content) => {
                                match ingest_content(
                                    &self.pool,
                                    *source_id,
                                    &file.provider_id,
                                    &content,
                                    false,
                                )
                                .await
                                {
                                    Ok(
                                        store::UpsertResult::Inserted
                                        | store::UpsertResult::Updated,
                                    ) => {
                                        ingested += 1;
                                    }
                                    Ok(store::UpsertResult::Skipped) => {
                                        skipped += 1;
                                    }
                                    Err(e) => {
                                        // Log and continue with the next file.
                                        tracing::warn!(
                                            provider_id = %file.provider_id,
                                            error = %e,
                                            "Remote ingest failed"
                                        );
                                    }
                                }
                            }
                            Err(e) => {
                                // Content fetch failed; skip this file only.
                                tracing::warn!(
                                    provider_id = %file.provider_id,
                                    error = %e,
                                    "Failed to read remote content"
                                );
                            }
                        }
                    }

                    tracing::debug!(
                        source_type = provider.source_type(),
                        ingested,
                        skipped,
                        total = files.len(),
                        "Remote poll complete"
                    );

                    // Update sync cursor and mark active.
                    let new_cursor = chrono::Utc::now().to_rfc3339();
                    if let Err(e) =
                        store::update_sync_cursor(&self.pool, *source_id, &new_cursor).await
                    {
                        tracing::warn!(error = %e, "Failed to update remote sync cursor");
                    }
                    let _ =
                        store::update_source_status(&self.pool, *source_id, "active", None).await;
                }
                Err(crate::source::SourceError::ConnectionBroken {
                    connection_id,
                    ref reason,
                }) => {
                    // Mark both the source (error) and the underlying
                    // connection (expired) so the breakage is visible.
                    tracing::warn!(
                        source_id,
                        connection_id,
                        reason = %reason,
                        "Connection broken -- marking source as error"
                    );
                    let _ =
                        store::update_source_status(&self.pool, *source_id, "error", Some(reason))
                            .await;
                    let _ =
                        store::update_connection_status(&self.pool, connection_id, "expired").await;
                }
                Err(e) => {
                    // Any other scan failure marks the source as errored.
                    tracing::warn!(
                        source_type = provider.source_type(),
                        error = %e,
                        "Remote scan failed"
                    );
                    let _ = store::update_source_status(
                        &self.pool,
                        *source_id,
                        "error",
                        Some(&e.to_string()),
                    )
                    .await;
                }
            }
        }
    }
836
837    /// Loop for when only remote sources are configured (no local watchers).
838    async fn remote_only_loop(&self, remote_map: &[RemoteSource], cancel: CancellationToken) {
839        let interval_dur = remote_map
840            .iter()
841            .map(|(_, _, _, d)| *d)
842            .min()
843            .unwrap_or(self.fallback_scan_interval);
844        let mut interval = tokio::time::interval(interval_dur);
845        interval.tick().await;
846
847        loop {
848            tokio::select! {
849                () = cancel.cancelled() => {
850                    tracing::info!("Watchtower remote-only loop cancelled");
851                    break;
852                }
853                _ = interval.tick() => {
854                    self.poll_remote_sources(remote_map).await;
855                    self.chunk_pending().await;
856                }
857            }
858        }
859    }
860
861    /// Build a GoogleDriveProvider backed by a linked-account connection.
862    ///
863    /// Loads the connector encryption key and constructs the connector
864    /// from config. Returns an error string if setup fails (caller
865    /// logs and skips the source).
866    fn build_connection_provider(
867        &self,
868        folder_id: &str,
869        connection_id: i64,
870    ) -> Result<crate::source::google_drive::GoogleDriveProvider, String> {
871        let key = crate::source::connector::crypto::ensure_connector_key(&self.data_dir)
872            .map_err(|e| format!("connector key error: {e}"))?;
873
874        let connector = crate::source::connector::google_drive::GoogleDriveConnector::new(
875            &self.connector_config.google_drive,
876        )
877        .map_err(|e| format!("connector config error: {e}"))?;
878
879        Ok(
880            crate::source::google_drive::GoogleDriveProvider::from_connection(
881                folder_id.to_string(),
882                connection_id,
883                self.pool.clone(),
884                key,
885                connector,
886            ),
887        )
888    }
889
890    /// Perform a one-shot full rescan of a single local source.
891    ///
892    /// Used by the reindex API. Sets status to `"syncing"` before the scan
893    /// and `"active"` (or `"error"`) afterward.
894    pub async fn reindex_local_source(
895        pool: &DbPool,
896        source_id: i64,
897        base_path: &Path,
898        patterns: &[String],
899    ) -> Result<IngestSummary, WatchtowerError> {
900        store::update_source_status(pool, source_id, "syncing", None).await?;
901
902        let mut rel_paths = Vec::new();
903        Self::walk_directory(base_path, base_path, patterns, &mut rel_paths)?;
904
905        let summary = ingest_files(pool, source_id, base_path, &rel_paths, true).await;
906
907        let cursor = chrono::Utc::now().to_rfc3339();
908        let _ = store::update_sync_cursor(pool, source_id, &cursor).await;
909
910        if summary.errors.is_empty() {
911            let _ = store::update_source_status(pool, source_id, "active", None).await;
912        } else {
913            let msg = format!("{} errors during reindex", summary.errors.len());
914            let _ = store::update_source_status(pool, source_id, "error", Some(&msg)).await;
915        }
916
917        Ok(summary)
918    }
919
920    /// Process pending content nodes: extract fragments and persist as chunks.
921    async fn chunk_pending(&self) {
922        let chunked = chunker::chunk_pending_nodes(&self.pool, DEFAULT_ACCOUNT_ID, 100).await;
923        if chunked > 0 {
924            tracing::debug!(chunked, "Watchtower chunked pending nodes");
925        }
926    }
927
928    /// Polling-only fallback loop when the notify watcher fails to initialize.
929    async fn polling_loop(
930        &self,
931        source_map: &[(i64, PathBuf, Vec<String>)],
932        cancel: CancellationToken,
933    ) {
934        let mut interval = tokio::time::interval(self.fallback_scan_interval);
935        interval.tick().await; // Consume immediate tick.
936
937        loop {
938            tokio::select! {
939                () = cancel.cancelled() => {
940                    tracing::info!("Watchtower polling loop cancelled");
941                    break;
942                }
943                _ = interval.tick() => {
944                    for (source_id, base_path, patterns) in source_map {
945                        if let Err(e) = self.scan_directory(*source_id, base_path, patterns).await {
946                            tracing::warn!(
947                                path = %base_path.display(),
948                                error = %e,
949                                "Polling scan failed"
950                            );
951                        }
952                    }
953                }
954            }
955        }
956    }
957}