Skip to main content

tuitbot_core/automation/watchtower/
mod.rs

1//! Watchtower content source watcher and shared ingest pipeline.
2//!
3//! Watches configured local directories for `.md` and `.txt` changes via
4//! the `notify` crate with debouncing, and polls remote content sources
5//! (e.g. Google Drive) on a configurable interval.  Both local filesystem
6//! events and remote polls funnel through `ingest_content()`, ensuring
7//! identical state transitions.
8
9pub mod chunker;
10pub mod embedding_worker;
11pub mod graph_ingest;
12pub mod link_extractor;
13pub mod loopback;
14
15#[cfg(test)]
16mod tests;
17
18#[cfg(test)]
19mod inline_tests;
20
21use std::collections::HashMap;
22use std::path::{Path, PathBuf};
23use std::sync::Mutex;
24use std::time::{Duration, Instant};
25
26use notify_debouncer_full::{
27    new_debouncer, notify::RecursiveMode, DebounceEventResult, Debouncer, RecommendedCache,
28};
29use sha2::{Digest, Sha256};
30use tokio_util::sync::CancellationToken;
31
32use crate::config::{ConnectorConfig, ContentSourcesConfig};
33use crate::source::ContentSourceProvider;
34use crate::storage::accounts::DEFAULT_ACCOUNT_ID;
35use crate::storage::watchtower as store;
36use crate::storage::DbPool;
37
38// ---------------------------------------------------------------------------
39// Error type
40// ---------------------------------------------------------------------------
41
/// Errors specific to Watchtower operations.
#[derive(Debug, thiserror::Error)]
pub enum WatchtowerError {
    /// Filesystem I/O failure (directory walk, file read, watcher setup).
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// Database-layer failure from the storage module.
    #[error("storage error: {0}")]
    Storage(#[from] crate::error::StorageError),

    /// Failure from the `notify` filesystem-watcher crate.
    #[error("notify error: {0}")]
    Notify(#[from] notify::Error),

    /// Invalid or missing configuration, with a human-readable message.
    #[error("config error: {0}")]
    Config(String),

    /// Failure while chunking ingested content.
    #[error("chunker error: {0}")]
    Chunker(#[from] chunker::ChunkerError),
}
60
61// ---------------------------------------------------------------------------
62// Ingest result types
63// ---------------------------------------------------------------------------
64
/// Summary of a batch ingest operation.
#[derive(Debug, Default)]
pub struct IngestSummary {
    /// Count of files that were inserted or updated in the content store.
    pub ingested: u32,
    /// Count of files skipped (upsert reported no change).
    pub skipped: u32,
    /// Per-file failure messages, formatted as "<relative path>: <error>".
    pub errors: Vec<String>,
}
72
/// Parsed front-matter from a markdown file.
#[derive(Debug, Default)]
pub struct ParsedFrontMatter {
    /// Value of the `title` key, when the front-matter is a YAML mapping.
    pub title: Option<String>,
    /// Comma-joined tags from the `tags` key (YAML sequence or scalar string).
    pub tags: Option<String>,
    /// Raw YAML text of the front-matter block, if one was present.
    pub raw_yaml: Option<String>,
}
80
81// ---------------------------------------------------------------------------
82// Front-matter parsing
83// ---------------------------------------------------------------------------
84
85/// Parse YAML front-matter from file content.
86///
87/// Returns extracted metadata and the body text (content after front-matter).
88pub fn parse_front_matter(content: &str) -> (ParsedFrontMatter, &str) {
89    let (yaml_str, body) = loopback::split_front_matter(content);
90
91    let yaml_str = match yaml_str {
92        Some(y) => y,
93        None => return (ParsedFrontMatter::default(), content),
94    };
95
96    let parsed: Result<serde_yaml::Value, _> = serde_yaml::from_str(yaml_str);
97    match parsed {
98        Ok(serde_yaml::Value::Mapping(map)) => {
99            let title = map
100                .get(serde_yaml::Value::String("title".to_string()))
101                .and_then(|v| v.as_str())
102                .map(|s| s.to_string());
103
104            let tags = map
105                .get(serde_yaml::Value::String("tags".to_string()))
106                .map(|v| match v {
107                    serde_yaml::Value::Sequence(seq) => seq
108                        .iter()
109                        .filter_map(|item| item.as_str())
110                        .collect::<Vec<_>>()
111                        .join(","),
112                    serde_yaml::Value::String(s) => s.clone(),
113                    _ => String::new(),
114                })
115                .filter(|s| !s.is_empty());
116
117            let fm = ParsedFrontMatter {
118                title,
119                tags,
120                raw_yaml: Some(yaml_str.to_string()),
121            };
122            (fm, body)
123        }
124        _ => (
125            ParsedFrontMatter {
126                raw_yaml: Some(yaml_str.to_string()),
127                ..Default::default()
128            },
129            body,
130        ),
131    }
132}
133
134// ---------------------------------------------------------------------------
135// Pattern matching
136// ---------------------------------------------------------------------------
137
138/// Check whether a file path matches any of the given glob patterns.
139///
140/// Matches against the file name only (not the full path), so `*.md`
141/// matches `sub/dir/note.md`.
142pub fn matches_patterns(path: &Path, patterns: &[String]) -> bool {
143    let file_name = match path.file_name().and_then(|n| n.to_str()) {
144        Some(n) => n,
145        None => return false,
146    };
147
148    for pattern in patterns {
149        if let Ok(p) = glob::Pattern::new(pattern) {
150            if p.matches(file_name) {
151                return true;
152            }
153        }
154    }
155    false
156}
157
/// Convert a relative path into a stable slash-delimited string across platforms.
///
/// Joins the path's components with `/` so the same relative path produces
/// the same string on Windows and Unix; non-UTF-8 components are lossily
/// converted.
fn relative_path_string(path: &Path) -> String {
    let mut joined = String::new();
    for (idx, component) in path.iter().enumerate() {
        if idx > 0 {
            joined.push('/');
        }
        joined.push_str(&component.to_string_lossy());
    }
    joined
}
165
166// ---------------------------------------------------------------------------
167// Shared ingest pipeline
168// ---------------------------------------------------------------------------
169
170/// Ingest raw text content into the Watchtower pipeline.
171///
172/// This is the provider-agnostic code path that both local file reads and
173/// remote content fetches funnel through. It parses front-matter, computes
174/// a content hash, and upserts the content node in the database.
175pub async fn ingest_content(
176    pool: &DbPool,
177    source_id: i64,
178    provider_id: &str,
179    content: &str,
180    force: bool,
181) -> Result<store::UpsertResult, WatchtowerError> {
182    let (fm, body) = parse_front_matter(content);
183
184    let hash = if force {
185        let mut hasher = Sha256::new();
186        hasher.update(content.as_bytes());
187        hasher.update(
188            std::time::SystemTime::now()
189                .duration_since(std::time::UNIX_EPOCH)
190                .unwrap_or_default()
191                .as_nanos()
192                .to_le_bytes(),
193        );
194        format!("{:x}", hasher.finalize())
195    } else {
196        let mut hasher = Sha256::new();
197        hasher.update(content.as_bytes());
198        format!("{:x}", hasher.finalize())
199    };
200
201    let result = store::upsert_content_node(
202        pool,
203        source_id,
204        provider_id,
205        &hash,
206        fm.title.as_deref(),
207        body,
208        fm.raw_yaml.as_deref(),
209        fm.tags.as_deref(),
210    )
211    .await?;
212
213    Ok(result)
214}
215
216/// Ingest a single file from the local filesystem into the Watchtower pipeline.
217///
218/// Convenience wrapper that reads the file then delegates to `ingest_content`.
219pub async fn ingest_file(
220    pool: &DbPool,
221    source_id: i64,
222    base_path: &Path,
223    relative_path: &str,
224    force: bool,
225) -> Result<store::UpsertResult, WatchtowerError> {
226    let full_path = base_path.join(relative_path);
227    let content = tokio::fs::read_to_string(&full_path).await?;
228    ingest_content(pool, source_id, relative_path, &content, force).await
229}
230
231/// Ingest multiple files, collecting results into a summary.
232pub async fn ingest_files(
233    pool: &DbPool,
234    source_id: i64,
235    base_path: &Path,
236    paths: &[String],
237    force: bool,
238) -> IngestSummary {
239    let mut summary = IngestSummary::default();
240
241    for rel_path in paths {
242        match ingest_file(pool, source_id, base_path, rel_path, force).await {
243            Ok(store::UpsertResult::Inserted | store::UpsertResult::Updated) => {
244                summary.ingested += 1;
245            }
246            Ok(store::UpsertResult::Skipped) => {
247                summary.skipped += 1;
248            }
249            Err(e) => {
250                summary.errors.push(format!("{rel_path}: {e}"));
251            }
252        }
253    }
254
255    summary
256}
257
258// ---------------------------------------------------------------------------
259// Cooldown set
260// ---------------------------------------------------------------------------
261
/// Tracks recently-written paths to prevent re-ingestion of our own writes.
struct CooldownSet {
    // Timestamp of the most recent write we performed for each path.
    entries: HashMap<PathBuf, Instant>,
    // How long a path stays "hot" after being marked.
    ttl: Duration,
}

impl CooldownSet {
    /// Create an empty set whose marks expire after `ttl`.
    fn new(ttl: Duration) -> Self {
        Self {
            entries: HashMap::new(),
            ttl,
        }
    }

    /// Mark a path as recently written (used by loop-back writes and tests).
    #[allow(dead_code)]
    fn mark(&mut self, path: PathBuf) {
        self.entries.insert(path, Instant::now());
    }

    /// Check if a path is in cooldown (recently written by us).
    fn is_cooling(&self, path: &Path) -> bool {
        self.entries
            .get(path)
            .map_or(false, |written_at| written_at.elapsed() < self.ttl)
    }

    /// Remove expired entries to prevent unbounded growth.
    fn cleanup(&mut self) {
        // Copy the TTL out first so the closure doesn't borrow `self`.
        let ttl = self.ttl;
        self.entries.retain(|_, written_at| written_at.elapsed() < ttl);
    }
}
296
297// ---------------------------------------------------------------------------
298// WatchtowerLoop
299// ---------------------------------------------------------------------------
300
/// A registered remote source, as a tuple of:
/// (database source-context id, boxed provider implementation,
/// glob file patterns, poll interval between `scan_for_changes` calls).
type RemoteSource = (i64, Box<dyn ContentSourceProvider>, Vec<String>, Duration);
303
/// The Watchtower content source watcher service.
///
/// Watches configured source directories for file changes, debounces events,
/// and ingests changed files into the database via the shared pipeline.
pub struct WatchtowerLoop {
    // Database pool used for all source/content persistence.
    pool: DbPool,
    // Configured content sources (local directories and remote providers).
    config: ContentSourcesConfig,
    // Connector settings; presumably consumed by `build_connection_provider`
    // (defined outside this view) — not used directly in this chunk.
    connector_config: ConnectorConfig,
    // Application data directory; not used directly in this chunk.
    data_dir: PathBuf,
    // Quiet window applied to filesystem events before they are delivered.
    debounce_duration: Duration,
    // Interval for the periodic full rescan of watched directories.
    fallback_scan_interval: Duration,
    // How long our own writes suppress re-ingestion of the same path.
    cooldown_ttl: Duration,
}
317
318impl WatchtowerLoop {
319    /// Create a new WatchtowerLoop.
320    pub fn new(
321        pool: DbPool,
322        config: ContentSourcesConfig,
323        connector_config: ConnectorConfig,
324        data_dir: PathBuf,
325    ) -> Self {
326        Self {
327            pool,
328            config,
329            connector_config,
330            data_dir,
331            debounce_duration: Duration::from_secs(2),
332            fallback_scan_interval: Duration::from_secs(300), // 5 minutes
333            cooldown_ttl: Duration::from_secs(5),
334        }
335    }
336
    /// Run the watchtower loop until the cancellation token is triggered.
    ///
    /// Registers both local filesystem and remote sources, then runs:
    /// - `notify` watcher + fallback polling for local sources
    /// - interval-based polling for remote sources (e.g. Google Drive)
    ///
    /// Phases: (1) split and register sources in the DB, (2) initial scan /
    /// poll of everything, (3) select-loop over cancellation, fallback-scan
    /// ticks, remote-poll ticks, and debounced filesystem events.
    pub async fn run(&self, cancel: CancellationToken) {
        // Split config into local (watchable) and remote (pollable) sources.
        // Uses `is_enabled()` which respects both `enabled` and legacy `watch`.
        let local_sources: Vec<_> = self
            .config
            .sources
            .iter()
            .filter(|s| s.source_type == "local_fs" && s.is_enabled() && s.path.is_some())
            .collect();

        let remote_sources: Vec<_> = self
            .config
            .sources
            .iter()
            .filter(|s| s.source_type == "google_drive" && s.is_enabled() && s.folder_id.is_some())
            .collect();

        if local_sources.is_empty() && remote_sources.is_empty() {
            tracing::info!("Watchtower: no watch sources configured, exiting");
            return;
        }

        // Register local source contexts in DB.
        // Each entry: (db source id, tilde-expanded base path, glob patterns).
        let mut source_map: Vec<(i64, PathBuf, Vec<String>)> = Vec::new();
        for src in &local_sources {
            // `path.is_some()` was checked by the filter above.
            let path_str = src.path.as_deref().unwrap();
            let expanded = PathBuf::from(crate::storage::expand_tilde(path_str));

            let config_json = serde_json::json!({
                "path": path_str,
                "file_patterns": src.file_patterns,
                "loop_back_enabled": src.loop_back_enabled,
                "analytics_sync_enabled": src.analytics_sync_enabled,
            })
            .to_string();

            match store::ensure_local_fs_source(&self.pool, path_str, &config_json).await {
                Ok(source_id) => {
                    source_map.push((source_id, expanded, src.file_patterns.clone()));
                }
                Err(e) => {
                    // Registration failure skips this source but keeps the rest.
                    tracing::error!(path = path_str, error = %e, "Failed to register source context");
                }
            }
        }

        // Register remote source contexts and build provider instances.
        let mut remote_map: Vec<RemoteSource> = Vec::new();
        for src in &remote_sources {
            // `folder_id.is_some()` was checked by the filter above.
            let folder_id = src.folder_id.as_deref().unwrap();
            let config_json = serde_json::json!({
                "folder_id": folder_id,
                "file_patterns": src.file_patterns,
                "service_account_key": src.service_account_key,
                "connection_id": src.connection_id,
            })
            .to_string();

            match store::ensure_google_drive_source(&self.pool, folder_id, &config_json).await {
                Ok(source_id) => {
                    let interval = Duration::from_secs(src.poll_interval_seconds.unwrap_or(300));

                    // connection_id takes precedence over service_account_key.
                    let provider: Box<dyn ContentSourceProvider> =
                        if let Some(connection_id) = src.connection_id {
                            match self.build_connection_provider(folder_id, connection_id) {
                                Ok(p) => Box::new(p),
                                Err(reason) => {
                                    tracing::warn!(
                                        folder_id,
                                        connection_id,
                                        reason = %reason,
                                        "Skipping connection-based source"
                                    );
                                    continue;
                                }
                            }
                        } else if src.service_account_key.is_some() {
                            let key_path = src.service_account_key.clone().unwrap_or_default();
                            Box::new(crate::source::google_drive::GoogleDriveProvider::new(
                                folder_id.to_string(),
                                key_path,
                            ))
                        } else {
                            // Neither credential mechanism configured: skip.
                            tracing::warn!(
                            folder_id,
                            "Skipping Google Drive source: no connection_id or service_account_key"
                        );
                            continue;
                        };

                    remote_map.push((source_id, provider, src.file_patterns.clone(), interval));
                }
                Err(e) => {
                    tracing::error!(
                        folder_id = folder_id,
                        error = %e,
                        "Failed to register Google Drive source"
                    );
                }
            }
        }

        if source_map.is_empty() && remote_map.is_empty() {
            tracing::warn!("Watchtower: no sources registered, exiting");
            return;
        }

        // Initial scan of all local directories (all enabled sources, regardless of change_detection).
        for (source_id, base_path, patterns) in &source_map {
            let _ = store::update_source_status(&self.pool, *source_id, "syncing", None).await;
            match self.scan_directory(*source_id, base_path, patterns).await {
                Ok(_) => {
                    let _ =
                        store::update_source_status(&self.pool, *source_id, "active", None).await;
                }
                Err(e) => {
                    tracing::error!(
                        path = %base_path.display(),
                        error = %e,
                        "Initial scan failed"
                    );
                    let _ = store::update_source_status(
                        &self.pool,
                        *source_id,
                        "error",
                        Some(&e.to_string()),
                    )
                    .await;
                }
            }
        }

        // Chunk any nodes created during initial local scan.
        self.chunk_pending().await;

        // Initial poll of remote sources.
        if !remote_map.is_empty() {
            self.poll_remote_sources(&remote_map).await;
            self.chunk_pending().await;
        }

        // Partition local sources: those with ongoing monitoring vs scan-only.
        // Scan-only sources (`change_detection = "none"`) already did their initial
        // scan above and don't participate in the event loop.
        let watch_source_map: Vec<_> = source_map
            .iter()
            .zip(local_sources.iter())
            .filter(|(_, src)| !src.is_scan_only())
            .map(|(entry, _)| entry.clone())
            .collect();

        // Further split: sources that need notify vs poll-only.
        // NOTE(review): these zips assume `source_map` and `local_sources` are
        // index-aligned; that holds only if no `ensure_local_fs_source` call
        // failed above — confirm whether a registration failure can skew this.
        let notify_source_map: Vec<_> = source_map
            .iter()
            .zip(local_sources.iter())
            .filter(|(_, src)| src.effective_change_detection() == "auto")
            .map(|(entry, _)| entry.clone())
            .collect();

        // If no sources need ongoing monitoring, only run remote polling.
        if watch_source_map.is_empty() {
            if remote_map.is_empty() {
                tracing::info!(
                    "Watchtower: all local sources are scan-only and no remote sources, exiting"
                );
                return;
            }
            self.remote_only_loop(&remote_map, cancel).await;
            return;
        }

        // Bridge notify's sync callback to an async-friendly tokio channel.
        let (async_tx, mut async_rx) = tokio::sync::mpsc::channel::<DebounceEventResult>(256);

        // `blocking_send` assumes this closure runs on the debouncer's own
        // thread rather than a tokio worker — presumably how
        // notify-debouncer-full invokes handlers; confirm if changed.
        let handler = move |result: DebounceEventResult| {
            let _ = async_tx.blocking_send(result);
        };

        let debouncer_result = new_debouncer(self.debounce_duration, None, handler);
        let mut debouncer: Debouncer<notify::RecommendedWatcher, RecommendedCache> =
            match debouncer_result {
                Ok(d) => d,
                Err(e) => {
                    // Watcher creation failed (e.g. inotify limits): degrade
                    // to pure polling instead of aborting.
                    tracing::error!(error = %e, "Failed to create filesystem watcher, falling back to polling");
                    self.polling_loop(&watch_source_map, cancel).await;
                    return;
                }
            };

        // Register directories with the notify watcher (only "auto" sources, not poll-only).
        for (_, base_path, _) in &notify_source_map {
            if let Err(e) = debouncer.watch(base_path, RecursiveMode::Recursive) {
                tracing::error!(
                    path = %base_path.display(),
                    error = %e,
                    "Failed to watch directory"
                );
            }
        }

        tracing::info!(
            local_sources = source_map.len(),
            watching = notify_source_map.len(),
            polling = watch_source_map.len() - notify_source_map.len(),
            remote_sources = remote_map.len(),
            "Watchtower watching for changes"
        );

        let cooldown = Mutex::new(CooldownSet::new(self.cooldown_ttl));

        // Main event loop.
        let mut fallback_timer = tokio::time::interval(self.fallback_scan_interval);
        fallback_timer.tick().await; // Consume the immediate first tick.

        // Remote poll interval (use minimum configured or fallback default).
        let remote_interval = remote_map
            .iter()
            .map(|(_, _, _, d)| *d)
            .min()
            .unwrap_or(self.fallback_scan_interval);
        let mut remote_timer = tokio::time::interval(remote_interval);
        remote_timer.tick().await; // Consume the immediate first tick.

        loop {
            tokio::select! {
                () = cancel.cancelled() => {
                    tracing::info!("Watchtower: cancellation received, shutting down");
                    break;
                }
                _ = fallback_timer.tick() => {
                    // Periodic fallback scan for all local sources with ongoing monitoring
                    // (both "auto" and "poll" change_detection modes).
                    for (source_id, base_path, patterns) in &watch_source_map {
                        if let Err(e) = self.scan_directory(*source_id, base_path, patterns).await {
                            tracing::warn!(
                                path = %base_path.display(),
                                error = %e,
                                "Fallback scan failed"
                            );
                        }
                    }
                    // Piggy-back cooldown GC on the fallback tick.
                    if let Ok(mut cd) = cooldown.lock() {
                        cd.cleanup();
                    }
                    self.chunk_pending().await;
                }
                _ = remote_timer.tick(), if !remote_map.is_empty() => {
                    self.poll_remote_sources(&remote_map).await;
                    self.chunk_pending().await;
                }
                result = async_rx.recv() => {
                    match result {
                        Some(Ok(events)) => {
                            // A debounced batch may cover several paths.
                            for event in events {
                                for path in &event.paths {
                                    self.handle_event(path, &source_map, &cooldown).await;
                                }
                            }
                            self.chunk_pending().await;
                        }
                        Some(Err(errs)) => {
                            for e in errs {
                                tracing::warn!(error = %e, "Watcher error");
                            }
                        }
                        None => {
                            // Sender dropped: the watcher is gone, so exit.
                            tracing::warn!("Watcher event channel closed");
                            break;
                        }
                    }
                }
            }
        }

        // Drop the debouncer to stop watching.
        drop(debouncer);
        tracing::info!("Watchtower shut down");
    }
621
    /// Handle a single filesystem event for a changed path.
    ///
    /// Skips paths inside the loop-back cooldown window, then finds the first
    /// registered source whose base directory contains `path`; if the file
    /// name matches that source's glob patterns, the file is (re-)ingested.
    async fn handle_event(
        &self,
        path: &Path,
        source_map: &[(i64, PathBuf, Vec<String>)],
        cooldown: &Mutex<CooldownSet>,
    ) {
        // Check cooldown: ignore events triggered by our own recent writes.
        // (A poisoned mutex silently skips the check.)
        if let Ok(cd) = cooldown.lock() {
            if cd.is_cooling(path) {
                tracing::debug!(path = %path.display(), "Skipping cooldown path");
                return;
            }
        }

        // Find matching source.
        for (source_id, base_path, patterns) in source_map {
            if path.starts_with(base_path) {
                // Check pattern match.
                // NOTE(review): a mismatch returns (not continues), so with
                // overlapping base directories later sources are never tried
                // — confirm this first-match-wins behavior is intended.
                if !matches_patterns(path, patterns) {
                    return;
                }

                // Compute relative path.
                let rel = match path.strip_prefix(base_path) {
                    Ok(r) => relative_path_string(r),
                    Err(_) => return,
                };

                // Ingest failures are logged but never abort the event loop.
                match ingest_file(&self.pool, *source_id, base_path, &rel, false).await {
                    Ok(result) => {
                        tracing::debug!(
                            path = %rel,
                            result = ?result,
                            "Watchtower ingested file"
                        );
                    }
                    Err(e) => {
                        tracing::warn!(
                            path = %rel,
                            error = %e,
                            "Watchtower ingest failed"
                        );
                    }
                }
                return;
            }
        }
    }
671
672    /// Scan a directory for all matching files and ingest them.
673    async fn scan_directory(
674        &self,
675        source_id: i64,
676        base_path: &Path,
677        patterns: &[String],
678    ) -> Result<IngestSummary, WatchtowerError> {
679        let mut rel_paths = Vec::new();
680        Self::walk_directory(base_path, base_path, patterns, &mut rel_paths)?;
681
682        let summary = ingest_files(&self.pool, source_id, base_path, &rel_paths, false).await;
683
684        tracing::debug!(
685            path = %base_path.display(),
686            ingested = summary.ingested,
687            skipped = summary.skipped,
688            errors = summary.errors.len(),
689            "Directory scan complete"
690        );
691
692        // Update sync cursor.
693        let cursor = chrono::Utc::now().to_rfc3339();
694        if let Err(e) = store::update_sync_cursor(&self.pool, source_id, &cursor).await {
695            tracing::warn!(error = %e, "Failed to update sync cursor");
696        }
697
698        Ok(summary)
699    }
700
701    /// Recursively walk a directory, collecting relative paths of matching files.
702    fn walk_directory(
703        base: &Path,
704        current: &Path,
705        patterns: &[String],
706        out: &mut Vec<String>,
707    ) -> Result<(), WatchtowerError> {
708        let entries = std::fs::read_dir(current)?;
709        for entry in entries {
710            let entry = entry?;
711            let file_type = entry.file_type()?;
712            let path = entry.path();
713
714            if file_type.is_dir() {
715                // Skip hidden directories.
716                if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
717                    if name.starts_with('.') {
718                        continue;
719                    }
720                }
721                Self::walk_directory(base, &path, patterns, out)?;
722            } else if file_type.is_file() && matches_patterns(&path, patterns) {
723                if let Ok(rel) = path.strip_prefix(base) {
724                    out.push(relative_path_string(rel));
725                }
726            }
727        }
728        Ok(())
729    }
730
    /// Poll all remote sources for changes, ingest new/updated content.
    ///
    /// For each source: marks it "syncing", asks the provider for files
    /// changed since the stored sync cursor, ingests each file's content,
    /// then advances the cursor and marks the source "active". A scan error
    /// marks the source "error"; a broken connection additionally marks the
    /// connection record "expired". Per-file failures are logged and do not
    /// abort the poll.
    async fn poll_remote_sources(&self, remote_sources: &[RemoteSource]) {
        for (source_id, provider, patterns, _interval) in remote_sources {
            let _ = store::update_source_status(&self.pool, *source_id, "syncing", None).await;

            // Resume from the last recorded sync cursor, if any.
            let cursor = match store::get_source_context(&self.pool, *source_id).await {
                Ok(Some(ctx)) => ctx.sync_cursor,
                Ok(None) => None,
                Err(e) => {
                    tracing::warn!(source_id, error = %e, "Failed to get source context");
                    continue;
                }
            };

            match provider.scan_for_changes(cursor.as_deref(), patterns).await {
                Ok(files) => {
                    let mut ingested = 0u32;
                    let mut skipped = 0u32;
                    for file in &files {
                        match provider.read_content(&file.provider_id).await {
                            Ok(content) => {
                                match ingest_content(
                                    &self.pool,
                                    *source_id,
                                    &file.provider_id,
                                    &content,
                                    false,
                                )
                                .await
                                {
                                    Ok(
                                        store::UpsertResult::Inserted
                                        | store::UpsertResult::Updated,
                                    ) => {
                                        ingested += 1;
                                    }
                                    Ok(store::UpsertResult::Skipped) => {
                                        skipped += 1;
                                    }
                                    Err(e) => {
                                        tracing::warn!(
                                            provider_id = %file.provider_id,
                                            error = %e,
                                            "Remote ingest failed"
                                        );
                                    }
                                }
                            }
                            Err(e) => {
                                tracing::warn!(
                                    provider_id = %file.provider_id,
                                    error = %e,
                                    "Failed to read remote content"
                                );
                            }
                        }
                    }

                    tracing::debug!(
                        source_type = provider.source_type(),
                        ingested,
                        skipped,
                        total = files.len(),
                        "Remote poll complete"
                    );

                    // Update sync cursor and mark active.
                    // NOTE(review): the cursor is the local wall-clock time,
                    // not a provider-supplied token — confirm providers accept
                    // an RFC 3339 timestamp as their change cursor.
                    let new_cursor = chrono::Utc::now().to_rfc3339();
                    if let Err(e) =
                        store::update_sync_cursor(&self.pool, *source_id, &new_cursor).await
                    {
                        tracing::warn!(error = %e, "Failed to update remote sync cursor");
                    }
                    let _ =
                        store::update_source_status(&self.pool, *source_id, "active", None).await;
                }
                Err(crate::source::SourceError::ConnectionBroken {
                    connection_id,
                    ref reason,
                }) => {
                    // Broken connection: flag both the source and the
                    // connection record (presumably consumed elsewhere to
                    // prompt re-authentication).
                    tracing::warn!(
                        source_id,
                        connection_id,
                        reason = %reason,
                        "Connection broken -- marking source as error"
                    );
                    let _ =
                        store::update_source_status(&self.pool, *source_id, "error", Some(reason))
                            .await;
                    let _ =
                        store::update_connection_status(&self.pool, connection_id, "expired").await;
                }
                Err(e) => {
                    tracing::warn!(
                        source_type = provider.source_type(),
                        error = %e,
                        "Remote scan failed"
                    );
                    let _ = store::update_source_status(
                        &self.pool,
                        *source_id,
                        "error",
                        Some(&e.to_string()),
                    )
                    .await;
                }
            }
        }
    }
840
841    /// Loop for when only remote sources are configured (no local watchers).
842    async fn remote_only_loop(&self, remote_map: &[RemoteSource], cancel: CancellationToken) {
843        let interval_dur = remote_map
844            .iter()
845            .map(|(_, _, _, d)| *d)
846            .min()
847            .unwrap_or(self.fallback_scan_interval);
848        let mut interval = tokio::time::interval(interval_dur);
849        interval.tick().await;
850
851        loop {
852            tokio::select! {
853                () = cancel.cancelled() => {
854                    tracing::info!("Watchtower remote-only loop cancelled");
855                    break;
856                }
857                _ = interval.tick() => {
858                    self.poll_remote_sources(remote_map).await;
859                    self.chunk_pending().await;
860                }
861            }
862        }
863    }
864
865    /// Build a GoogleDriveProvider backed by a linked-account connection.
866    ///
867    /// Loads the connector encryption key and constructs the connector
868    /// from config. Returns an error string if setup fails (caller
869    /// logs and skips the source).
870    fn build_connection_provider(
871        &self,
872        folder_id: &str,
873        connection_id: i64,
874    ) -> Result<crate::source::google_drive::GoogleDriveProvider, String> {
875        let key = crate::source::connector::crypto::ensure_connector_key(&self.data_dir)
876            .map_err(|e| format!("connector key error: {e}"))?;
877
878        let connector = crate::source::connector::google_drive::GoogleDriveConnector::new(
879            &self.connector_config.google_drive,
880        )
881        .map_err(|e| format!("connector config error: {e}"))?;
882
883        Ok(
884            crate::source::google_drive::GoogleDriveProvider::from_connection(
885                folder_id.to_string(),
886                connection_id,
887                self.pool.clone(),
888                key,
889                connector,
890            ),
891        )
892    }
893
894    /// Perform a one-shot full rescan of a single local source.
895    ///
896    /// Used by the reindex API. Sets status to `"syncing"` before the scan
897    /// and `"active"` (or `"error"`) afterward.
898    pub async fn reindex_local_source(
899        pool: &DbPool,
900        source_id: i64,
901        base_path: &Path,
902        patterns: &[String],
903    ) -> Result<IngestSummary, WatchtowerError> {
904        store::update_source_status(pool, source_id, "syncing", None).await?;
905
906        let mut rel_paths = Vec::new();
907        Self::walk_directory(base_path, base_path, patterns, &mut rel_paths)?;
908
909        let summary = ingest_files(pool, source_id, base_path, &rel_paths, true).await;
910
911        let cursor = chrono::Utc::now().to_rfc3339();
912        let _ = store::update_sync_cursor(pool, source_id, &cursor).await;
913
914        if summary.errors.is_empty() {
915            let _ = store::update_source_status(pool, source_id, "active", None).await;
916        } else {
917            let msg = format!("{} errors during reindex", summary.errors.len());
918            let _ = store::update_source_status(pool, source_id, "error", Some(&msg)).await;
919        }
920
921        Ok(summary)
922    }
923
924    /// Process pending content nodes: extract fragments and persist as chunks.
925    async fn chunk_pending(&self) {
926        let chunked = chunker::chunk_pending_nodes(&self.pool, DEFAULT_ACCOUNT_ID, 100).await;
927        if chunked > 0 {
928            tracing::debug!(chunked, "Watchtower chunked pending nodes");
929        }
930    }
931
932    /// Polling-only fallback loop when the notify watcher fails to initialize.
933    async fn polling_loop(
934        &self,
935        source_map: &[(i64, PathBuf, Vec<String>)],
936        cancel: CancellationToken,
937    ) {
938        let mut interval = tokio::time::interval(self.fallback_scan_interval);
939        interval.tick().await; // Consume immediate tick.
940
941        loop {
942            tokio::select! {
943                () = cancel.cancelled() => {
944                    tracing::info!("Watchtower polling loop cancelled");
945                    break;
946                }
947                _ = interval.tick() => {
948                    for (source_id, base_path, patterns) in source_map {
949                        if let Err(e) = self.scan_directory(*source_id, base_path, patterns).await {
950                            tracing::warn!(
951                                path = %base_path.display(),
952                                error = %e,
953                                "Polling scan failed"
954                            );
955                        }
956                    }
957                }
958            }
959        }
960    }
961}