// tuitbot_core/automation/watchtower/mod.rs
//! Watchtower content source watcher and shared ingest pipeline.
//!
//! Watches configured local directories for `.md` and `.txt` changes via
//! the `notify` crate with debouncing, and polls remote content sources
//! (e.g. Google Drive) on a configurable interval. Both local filesystem
//! events and remote polls funnel through `ingest_content()`, ensuring
//! identical state transitions.
pub mod loopback;

#[cfg(test)]
mod tests;

use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use std::time::{Duration, Instant};

use notify_debouncer_full::{
    new_debouncer, notify::RecursiveMode, DebounceEventResult, Debouncer, RecommendedCache,
};
use sha2::{Digest, Sha256};
use tokio_util::sync::CancellationToken;

use crate::config::{ConnectorConfig, ContentSourcesConfig};
use crate::source::ContentSourceProvider;
use crate::storage::watchtower as store;
use crate::storage::DbPool;
// ---------------------------------------------------------------------------
// Error type
// ---------------------------------------------------------------------------

/// Errors specific to Watchtower operations.
#[derive(Debug, thiserror::Error)]
pub enum WatchtowerError {
    /// Filesystem failure (directory walks, file reads during ingest).
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// Database failure surfaced from the storage layer.
    #[error("storage error: {0}")]
    Storage(#[from] crate::error::StorageError),

    /// Failure reported by the `notify` filesystem watcher.
    #[error("notify error: {0}")]
    Notify(#[from] notify::Error),

    /// Invalid or incomplete source configuration.
    #[error("config error: {0}")]
    Config(String),
}
49
// ---------------------------------------------------------------------------
// Ingest result types
// ---------------------------------------------------------------------------

/// Summary of a batch ingest operation.
#[derive(Debug, Default)]
pub struct IngestSummary {
    // Count of files the store reported as Inserted or Updated.
    pub ingested: u32,
    // Count of files the store reported as Skipped (no change).
    pub skipped: u32,
    // Per-file failures, each formatted as "<relative path>: <error>".
    pub errors: Vec<String>,
}
61
/// Parsed front-matter from a markdown file.
#[derive(Debug, Default)]
pub struct ParsedFrontMatter {
    // `title` key of the YAML mapping, when present and a string.
    pub title: Option<String>,
    // `tags` key flattened to a comma-separated string (from a YAML
    // sequence of strings or a single string); None when absent or empty.
    pub tags: Option<String>,
    // Verbatim front-matter YAML text, kept even if it failed to parse.
    pub raw_yaml: Option<String>,
}
69
70// ---------------------------------------------------------------------------
71// Front-matter parsing
72// ---------------------------------------------------------------------------
73
74/// Parse YAML front-matter from file content.
75///
76/// Returns extracted metadata and the body text (content after front-matter).
77pub fn parse_front_matter(content: &str) -> (ParsedFrontMatter, &str) {
78    let (yaml_str, body) = loopback::split_front_matter(content);
79
80    let yaml_str = match yaml_str {
81        Some(y) => y,
82        None => return (ParsedFrontMatter::default(), content),
83    };
84
85    let parsed: Result<serde_yaml::Value, _> = serde_yaml::from_str(yaml_str);
86    match parsed {
87        Ok(serde_yaml::Value::Mapping(map)) => {
88            let title = map
89                .get(serde_yaml::Value::String("title".to_string()))
90                .and_then(|v| v.as_str())
91                .map(|s| s.to_string());
92
93            let tags = map
94                .get(serde_yaml::Value::String("tags".to_string()))
95                .map(|v| match v {
96                    serde_yaml::Value::Sequence(seq) => seq
97                        .iter()
98                        .filter_map(|item| item.as_str())
99                        .collect::<Vec<_>>()
100                        .join(","),
101                    serde_yaml::Value::String(s) => s.clone(),
102                    _ => String::new(),
103                })
104                .filter(|s| !s.is_empty());
105
106            let fm = ParsedFrontMatter {
107                title,
108                tags,
109                raw_yaml: Some(yaml_str.to_string()),
110            };
111            (fm, body)
112        }
113        _ => (
114            ParsedFrontMatter {
115                raw_yaml: Some(yaml_str.to_string()),
116                ..Default::default()
117            },
118            body,
119        ),
120    }
121}
122
123// ---------------------------------------------------------------------------
124// Pattern matching
125// ---------------------------------------------------------------------------
126
127/// Check whether a file path matches any of the given glob patterns.
128///
129/// Matches against the file name only (not the full path), so `*.md`
130/// matches `sub/dir/note.md`.
131pub fn matches_patterns(path: &Path, patterns: &[String]) -> bool {
132    let file_name = match path.file_name().and_then(|n| n.to_str()) {
133        Some(n) => n,
134        None => return false,
135    };
136
137    for pattern in patterns {
138        if let Ok(p) = glob::Pattern::new(pattern) {
139            if p.matches(file_name) {
140                return true;
141            }
142        }
143    }
144    false
145}
146
/// Convert a relative path into a stable slash-delimited string across platforms.
fn relative_path_string(path: &Path) -> String {
    // Re-join the components with `/` so Windows `\` separators normalize.
    let mut parts: Vec<String> = Vec::new();
    for component in path.iter() {
        parts.push(component.to_string_lossy().into_owned());
    }
    parts.join("/")
}
154
155// ---------------------------------------------------------------------------
156// Shared ingest pipeline
157// ---------------------------------------------------------------------------
158
159/// Ingest raw text content into the Watchtower pipeline.
160///
161/// This is the provider-agnostic code path that both local file reads and
162/// remote content fetches funnel through. It parses front-matter, computes
163/// a content hash, and upserts the content node in the database.
164pub async fn ingest_content(
165    pool: &DbPool,
166    source_id: i64,
167    provider_id: &str,
168    content: &str,
169    force: bool,
170) -> Result<store::UpsertResult, WatchtowerError> {
171    let (fm, body) = parse_front_matter(content);
172
173    let hash = if force {
174        let mut hasher = Sha256::new();
175        hasher.update(content.as_bytes());
176        hasher.update(
177            std::time::SystemTime::now()
178                .duration_since(std::time::UNIX_EPOCH)
179                .unwrap_or_default()
180                .as_nanos()
181                .to_le_bytes(),
182        );
183        format!("{:x}", hasher.finalize())
184    } else {
185        let mut hasher = Sha256::new();
186        hasher.update(content.as_bytes());
187        format!("{:x}", hasher.finalize())
188    };
189
190    let result = store::upsert_content_node(
191        pool,
192        source_id,
193        provider_id,
194        &hash,
195        fm.title.as_deref(),
196        body,
197        fm.raw_yaml.as_deref(),
198        fm.tags.as_deref(),
199    )
200    .await?;
201
202    Ok(result)
203}
204
205/// Ingest a single file from the local filesystem into the Watchtower pipeline.
206///
207/// Convenience wrapper that reads the file then delegates to `ingest_content`.
208pub async fn ingest_file(
209    pool: &DbPool,
210    source_id: i64,
211    base_path: &Path,
212    relative_path: &str,
213    force: bool,
214) -> Result<store::UpsertResult, WatchtowerError> {
215    let full_path = base_path.join(relative_path);
216    let content = tokio::fs::read_to_string(&full_path).await?;
217    ingest_content(pool, source_id, relative_path, &content, force).await
218}
219
220/// Ingest multiple files, collecting results into a summary.
221pub async fn ingest_files(
222    pool: &DbPool,
223    source_id: i64,
224    base_path: &Path,
225    paths: &[String],
226    force: bool,
227) -> IngestSummary {
228    let mut summary = IngestSummary::default();
229
230    for rel_path in paths {
231        match ingest_file(pool, source_id, base_path, rel_path, force).await {
232            Ok(store::UpsertResult::Inserted | store::UpsertResult::Updated) => {
233                summary.ingested += 1;
234            }
235            Ok(store::UpsertResult::Skipped) => {
236                summary.skipped += 1;
237            }
238            Err(e) => {
239                summary.errors.push(format!("{rel_path}: {e}"));
240            }
241        }
242    }
243
244    summary
245}
246
// ---------------------------------------------------------------------------
// Cooldown set
// ---------------------------------------------------------------------------

/// Tracks recently-written paths to prevent re-ingestion of our own writes.
struct CooldownSet {
    // Path -> instant at which it was last marked.
    recent: HashMap<PathBuf, Instant>,
    // How long a marked path remains in cooldown.
    ttl: Duration,
}

impl CooldownSet {
    /// Create an empty set whose entries expire after `ttl`.
    fn new(ttl: Duration) -> Self {
        Self {
            recent: HashMap::new(),
            ttl,
        }
    }

    /// Mark a path as recently written (used by loop-back writes and tests).
    #[allow(dead_code)]
    fn mark(&mut self, path: PathBuf) {
        self.recent.insert(path, Instant::now());
    }

    /// Check if a path is in cooldown (recently written by us).
    fn is_cooling(&self, path: &Path) -> bool {
        self.recent
            .get(path)
            .map_or(false, |marked| marked.elapsed() < self.ttl)
    }

    /// Remove expired entries to prevent unbounded growth.
    fn cleanup(&mut self) {
        let ttl = self.ttl;
        self.recent.retain(|_, marked| marked.elapsed() < ttl);
    }
}
285
// ---------------------------------------------------------------------------
// WatchtowerLoop
// ---------------------------------------------------------------------------

/// A registered remote source: (db_source_id, provider, file_patterns, poll_interval).
///
/// `db_source_id` is the row id returned by source registration; the
/// provider is boxed because different source types share one list.
type RemoteSource = (i64, Box<dyn ContentSourceProvider>, Vec<String>, Duration);
292
/// The Watchtower content source watcher service.
///
/// Watches configured source directories for file changes, debounces events,
/// and ingests changed files into the database via the shared pipeline.
pub struct WatchtowerLoop {
    // Database handle used for source registration and content upserts.
    pool: DbPool,
    // Declared content sources (local directories and remote folders).
    config: ContentSourcesConfig,
    // Connector settings used when building connection-backed providers.
    connector_config: ConnectorConfig,
    // App data directory; the connector encryption key is loaded from here.
    data_dir: PathBuf,
    // How long filesystem events are coalesced before delivery (2 s in `new`).
    debounce_duration: Duration,
    // Interval of the periodic safety-net rescan (5 min in `new`); also the
    // default remote poll interval when none is configured.
    fallback_scan_interval: Duration,
    // How long a just-written path is ignored by the watcher (5 s in `new`).
    cooldown_ttl: Duration,
}
306
impl WatchtowerLoop {
    /// Create a new WatchtowerLoop.
    ///
    /// Timing knobs are fixed here: 2 s event debounce, 5 min fallback
    /// scan interval (also the remote-poll default), 5 s loop-back cooldown.
    pub fn new(
        pool: DbPool,
        config: ContentSourcesConfig,
        connector_config: ConnectorConfig,
        data_dir: PathBuf,
    ) -> Self {
        Self {
            pool,
            config,
            connector_config,
            data_dir,
            debounce_duration: Duration::from_secs(2),
            fallback_scan_interval: Duration::from_secs(300), // 5 minutes
            cooldown_ttl: Duration::from_secs(5),
        }
    }
325
    /// Run the watchtower loop until the cancellation token is triggered.
    ///
    /// Registers both local filesystem and remote sources, then runs:
    /// - `notify` watcher + fallback polling for local sources
    /// - interval-based polling for remote sources (e.g. Google Drive)
    pub async fn run(&self, cancel: CancellationToken) {
        // Split config into local (watchable) and remote (pollable) sources.
        let local_sources: Vec<_> = self
            .config
            .sources
            .iter()
            .filter(|s| s.source_type == "local_fs" && s.watch && s.path.is_some())
            .collect();

        let remote_sources: Vec<_> = self
            .config
            .sources
            .iter()
            .filter(|s| s.source_type == "google_drive" && s.folder_id.is_some())
            .collect();

        if local_sources.is_empty() && remote_sources.is_empty() {
            tracing::info!("Watchtower: no watch sources configured, exiting");
            return;
        }

        // Register local source contexts in DB.
        // Each entry: (db source id, expanded base path, file patterns).
        let mut source_map: Vec<(i64, PathBuf, Vec<String>)> = Vec::new();
        for src in &local_sources {
            // `unwrap` is safe: the filter above required `path.is_some()`.
            let path_str = src.path.as_deref().unwrap();
            let expanded = PathBuf::from(crate::storage::expand_tilde(path_str));

            let config_json = serde_json::json!({
                "path": path_str,
                "file_patterns": src.file_patterns,
                "loop_back_enabled": src.loop_back_enabled,
            })
            .to_string();

            match store::ensure_local_fs_source(&self.pool, path_str, &config_json).await {
                Ok(source_id) => {
                    source_map.push((source_id, expanded, src.file_patterns.clone()));
                }
                Err(e) => {
                    // Registration failure disables this source, not the loop.
                    tracing::error!(path = path_str, error = %e, "Failed to register source context");
                }
            }
        }

        // Register remote source contexts and build provider instances.
        let mut remote_map: Vec<RemoteSource> = Vec::new();
        for src in &remote_sources {
            // `unwrap` is safe: the filter above required `folder_id.is_some()`.
            let folder_id = src.folder_id.as_deref().unwrap();
            let config_json = serde_json::json!({
                "folder_id": folder_id,
                "file_patterns": src.file_patterns,
                "service_account_key": src.service_account_key,
                "connection_id": src.connection_id,
            })
            .to_string();

            match store::ensure_google_drive_source(&self.pool, folder_id, &config_json).await {
                Ok(source_id) => {
                    let interval = Duration::from_secs(src.poll_interval_seconds.unwrap_or(300));

                    // connection_id takes precedence over service_account_key.
                    let provider: Box<dyn ContentSourceProvider> =
                        if let Some(connection_id) = src.connection_id {
                            match self.build_connection_provider(folder_id, connection_id) {
                                Ok(p) => Box::new(p),
                                Err(reason) => {
                                    tracing::warn!(
                                        folder_id,
                                        connection_id,
                                        reason = %reason,
                                        "Skipping connection-based source"
                                    );
                                    continue;
                                }
                            }
                        } else if src.service_account_key.is_some() {
                            let key_path = src.service_account_key.clone().unwrap_or_default();
                            Box::new(crate::source::google_drive::GoogleDriveProvider::new(
                                folder_id.to_string(),
                                key_path,
                            ))
                        } else {
                            tracing::warn!(
                                folder_id,
                                "Skipping Google Drive source: no connection_id or service_account_key"
                            );
                            continue;
                        };

                    remote_map.push((source_id, provider, src.file_patterns.clone(), interval));
                }
                Err(e) => {
                    tracing::error!(
                        folder_id = folder_id,
                        error = %e,
                        "Failed to register Google Drive source"
                    );
                }
            }
        }

        if source_map.is_empty() && remote_map.is_empty() {
            tracing::warn!("Watchtower: no sources registered, exiting");
            return;
        }

        // Initial scan of all local directories.
        for (source_id, base_path, patterns) in &source_map {
            if let Err(e) = self.scan_directory(*source_id, base_path, patterns).await {
                tracing::error!(
                    path = %base_path.display(),
                    error = %e,
                    "Initial scan failed"
                );
            }
        }

        // Initial poll of remote sources.
        if !remote_map.is_empty() {
            self.poll_remote_sources(&remote_map).await;
        }

        // If there are no local sources, only run remote polling.
        if source_map.is_empty() {
            self.remote_only_loop(&remote_map, cancel).await;
            return;
        }

        // Bridge notify's sync callback to an async-friendly tokio channel.
        // NOTE(review): `blocking_send` assumes the debouncer invokes the
        // handler on its own worker thread, not inside the tokio runtime —
        // confirm against the notify-debouncer-full docs.
        let (async_tx, mut async_rx) = tokio::sync::mpsc::channel::<DebounceEventResult>(256);

        let handler = move |result: DebounceEventResult| {
            let _ = async_tx.blocking_send(result);
        };

        let debouncer_result = new_debouncer(self.debounce_duration, None, handler);
        let mut debouncer: Debouncer<notify::RecommendedWatcher, RecommendedCache> =
            match debouncer_result {
                Ok(d) => d,
                Err(e) => {
                    // Watcher creation failed: degrade to interval polling
                    // instead of exiting the service.
                    tracing::error!(error = %e, "Failed to create filesystem watcher, falling back to polling");
                    self.polling_loop(&source_map, cancel).await;
                    return;
                }
            };

        // Register directories with the watcher.
        for (_, base_path, _) in &source_map {
            if let Err(e) = debouncer.watch(base_path, RecursiveMode::Recursive) {
                tracing::error!(
                    path = %base_path.display(),
                    error = %e,
                    "Failed to watch directory"
                );
            }
        }

        tracing::info!(
            local_sources = source_map.len(),
            remote_sources = remote_map.len(),
            "Watchtower watching for changes"
        );

        // Guards against re-ingesting files this process itself just wrote.
        let cooldown = Mutex::new(CooldownSet::new(self.cooldown_ttl));

        // Main event loop.
        let mut fallback_timer = tokio::time::interval(self.fallback_scan_interval);
        fallback_timer.tick().await; // Consume the immediate first tick.

        // Remote poll interval (use minimum configured or fallback default).
        let remote_interval = remote_map
            .iter()
            .map(|(_, _, _, d)| *d)
            .min()
            .unwrap_or(self.fallback_scan_interval);
        let mut remote_timer = tokio::time::interval(remote_interval);
        remote_timer.tick().await; // Consume the immediate first tick.

        loop {
            tokio::select! {
                () = cancel.cancelled() => {
                    tracing::info!("Watchtower: cancellation received, shutting down");
                    break;
                }
                _ = fallback_timer.tick() => {
                    // Periodic fallback scan to catch any missed events.
                    for (source_id, base_path, patterns) in &source_map {
                        if let Err(e) = self.scan_directory(*source_id, base_path, patterns).await {
                            tracing::warn!(
                                path = %base_path.display(),
                                error = %e,
                                "Fallback scan failed"
                            );
                        }
                    }
                    // Piggyback cooldown eviction on the fallback timer.
                    if let Ok(mut cd) = cooldown.lock() {
                        cd.cleanup();
                    }
                }
                _ = remote_timer.tick(), if !remote_map.is_empty() => {
                    self.poll_remote_sources(&remote_map).await;
                }
                result = async_rx.recv() => {
                    match result {
                        Some(Ok(events)) => {
                            for event in events {
                                for path in &event.paths {
                                    self.handle_event(path, &source_map, &cooldown).await;
                                }
                            }
                        }
                        Some(Err(errs)) => {
                            for e in errs {
                                tracing::warn!(error = %e, "Watcher error");
                            }
                        }
                        None => {
                            // Sender dropped: the watcher side is gone.
                            tracing::warn!("Watcher event channel closed");
                            break;
                        }
                    }
                }
            }
        }

        // Drop the debouncer to stop watching.
        drop(debouncer);
        tracing::info!("Watchtower shut down");
    }
560
561    /// Handle a single filesystem event for a changed path.
562    async fn handle_event(
563        &self,
564        path: &Path,
565        source_map: &[(i64, PathBuf, Vec<String>)],
566        cooldown: &Mutex<CooldownSet>,
567    ) {
568        // Check cooldown.
569        if let Ok(cd) = cooldown.lock() {
570            if cd.is_cooling(path) {
571                tracing::debug!(path = %path.display(), "Skipping cooldown path");
572                return;
573            }
574        }
575
576        // Find matching source.
577        for (source_id, base_path, patterns) in source_map {
578            if path.starts_with(base_path) {
579                // Check pattern match.
580                if !matches_patterns(path, patterns) {
581                    return;
582                }
583
584                // Compute relative path.
585                let rel = match path.strip_prefix(base_path) {
586                    Ok(r) => relative_path_string(r),
587                    Err(_) => return,
588                };
589
590                match ingest_file(&self.pool, *source_id, base_path, &rel, false).await {
591                    Ok(result) => {
592                        tracing::debug!(
593                            path = %rel,
594                            result = ?result,
595                            "Watchtower ingested file"
596                        );
597                    }
598                    Err(e) => {
599                        tracing::warn!(
600                            path = %rel,
601                            error = %e,
602                            "Watchtower ingest failed"
603                        );
604                    }
605                }
606                return;
607            }
608        }
609    }
610
611    /// Scan a directory for all matching files and ingest them.
612    async fn scan_directory(
613        &self,
614        source_id: i64,
615        base_path: &Path,
616        patterns: &[String],
617    ) -> Result<IngestSummary, WatchtowerError> {
618        let mut rel_paths = Vec::new();
619        Self::walk_directory(base_path, base_path, patterns, &mut rel_paths)?;
620
621        let summary = ingest_files(&self.pool, source_id, base_path, &rel_paths, false).await;
622
623        tracing::debug!(
624            path = %base_path.display(),
625            ingested = summary.ingested,
626            skipped = summary.skipped,
627            errors = summary.errors.len(),
628            "Directory scan complete"
629        );
630
631        // Update sync cursor.
632        let cursor = chrono::Utc::now().to_rfc3339();
633        if let Err(e) = store::update_sync_cursor(&self.pool, source_id, &cursor).await {
634            tracing::warn!(error = %e, "Failed to update sync cursor");
635        }
636
637        Ok(summary)
638    }
639
640    /// Recursively walk a directory, collecting relative paths of matching files.
641    fn walk_directory(
642        base: &Path,
643        current: &Path,
644        patterns: &[String],
645        out: &mut Vec<String>,
646    ) -> Result<(), WatchtowerError> {
647        let entries = std::fs::read_dir(current)?;
648        for entry in entries {
649            let entry = entry?;
650            let file_type = entry.file_type()?;
651            let path = entry.path();
652
653            if file_type.is_dir() {
654                // Skip hidden directories.
655                if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
656                    if name.starts_with('.') {
657                        continue;
658                    }
659                }
660                Self::walk_directory(base, &path, patterns, out)?;
661            } else if file_type.is_file() && matches_patterns(&path, patterns) {
662                if let Ok(rel) = path.strip_prefix(base) {
663                    out.push(relative_path_string(rel));
664                }
665            }
666        }
667        Ok(())
668    }
669
    /// Poll all remote sources for changes, ingest new/updated content.
    ///
    /// For each source: load its sync cursor, ask the provider for files
    /// changed since that cursor, ingest each file's content, then advance
    /// the cursor. A `ConnectionBroken` error marks both the source and its
    /// connection; any other scan error marks only the source.
    async fn poll_remote_sources(&self, remote_sources: &[RemoteSource]) {
        for (source_id, provider, patterns, _interval) in remote_sources {
            // A missing context row is treated the same as "no cursor yet".
            let cursor = match store::get_source_context(&self.pool, *source_id).await {
                Ok(Some(ctx)) => ctx.sync_cursor,
                Ok(None) => None,
                Err(e) => {
                    tracing::warn!(source_id, error = %e, "Failed to get source context");
                    continue;
                }
            };

            match provider.scan_for_changes(cursor.as_deref(), patterns).await {
                Ok(files) => {
                    let mut ingested = 0u32;
                    let mut skipped = 0u32;
                    for file in &files {
                        match provider.read_content(&file.provider_id).await {
                            Ok(content) => {
                                match ingest_content(
                                    &self.pool,
                                    *source_id,
                                    &file.provider_id,
                                    &content,
                                    false,
                                )
                                .await
                                {
                                    Ok(
                                        store::UpsertResult::Inserted
                                        | store::UpsertResult::Updated,
                                    ) => {
                                        ingested += 1;
                                    }
                                    Ok(store::UpsertResult::Skipped) => {
                                        skipped += 1;
                                    }
                                    Err(e) => {
                                        // Per-file failures don't abort the batch.
                                        tracing::warn!(
                                            provider_id = %file.provider_id,
                                            error = %e,
                                            "Remote ingest failed"
                                        );
                                    }
                                }
                            }
                            Err(e) => {
                                tracing::warn!(
                                    provider_id = %file.provider_id,
                                    error = %e,
                                    "Failed to read remote content"
                                );
                            }
                        }
                    }

                    tracing::debug!(
                        source_type = provider.source_type(),
                        ingested,
                        skipped,
                        total = files.len(),
                        "Remote poll complete"
                    );

                    // Update sync cursor.
                    // NOTE(review): the cursor is wall-clock "now", not a
                    // provider change token — confirm providers accept an
                    // RFC 3339 timestamp as a cursor.
                    let new_cursor = chrono::Utc::now().to_rfc3339();
                    if let Err(e) =
                        store::update_sync_cursor(&self.pool, *source_id, &new_cursor).await
                    {
                        tracing::warn!(error = %e, "Failed to update remote sync cursor");
                    }
                }
                Err(crate::source::SourceError::ConnectionBroken {
                    connection_id,
                    ref reason,
                }) => {
                    // Auth/link failure: flag the source and its connection,
                    // then keep polling the remaining sources.
                    tracing::warn!(
                        source_id,
                        connection_id,
                        reason = %reason,
                        "Connection broken -- marking source as error"
                    );
                    let _ =
                        store::update_source_status(&self.pool, *source_id, "error", Some(reason))
                            .await;
                    let _ =
                        store::update_connection_status(&self.pool, connection_id, "expired").await;
                }
                Err(e) => {
                    tracing::warn!(
                        source_type = provider.source_type(),
                        error = %e,
                        "Remote scan failed"
                    );
                    // Best-effort status update; errors here are ignored.
                    let _ = store::update_source_status(
                        &self.pool,
                        *source_id,
                        "error",
                        Some(&e.to_string()),
                    )
                    .await;
                }
            }
        }
    }
775
776    /// Loop for when only remote sources are configured (no local watchers).
777    async fn remote_only_loop(&self, remote_map: &[RemoteSource], cancel: CancellationToken) {
778        let interval_dur = remote_map
779            .iter()
780            .map(|(_, _, _, d)| *d)
781            .min()
782            .unwrap_or(self.fallback_scan_interval);
783        let mut interval = tokio::time::interval(interval_dur);
784        interval.tick().await;
785
786        loop {
787            tokio::select! {
788                () = cancel.cancelled() => {
789                    tracing::info!("Watchtower remote-only loop cancelled");
790                    break;
791                }
792                _ = interval.tick() => {
793                    self.poll_remote_sources(remote_map).await;
794                }
795            }
796        }
797    }
798
799    /// Build a GoogleDriveProvider backed by a linked-account connection.
800    ///
801    /// Loads the connector encryption key and constructs the connector
802    /// from config. Returns an error string if setup fails (caller
803    /// logs and skips the source).
804    fn build_connection_provider(
805        &self,
806        folder_id: &str,
807        connection_id: i64,
808    ) -> Result<crate::source::google_drive::GoogleDriveProvider, String> {
809        let key = crate::source::connector::crypto::ensure_connector_key(&self.data_dir)
810            .map_err(|e| format!("connector key error: {e}"))?;
811
812        let connector = crate::source::connector::google_drive::GoogleDriveConnector::new(
813            &self.connector_config.google_drive,
814        )
815        .map_err(|e| format!("connector config error: {e}"))?;
816
817        Ok(
818            crate::source::google_drive::GoogleDriveProvider::from_connection(
819                folder_id.to_string(),
820                connection_id,
821                self.pool.clone(),
822                key,
823                connector,
824            ),
825        )
826    }
827
828    /// Polling-only fallback loop when the notify watcher fails to initialize.
829    async fn polling_loop(
830        &self,
831        source_map: &[(i64, PathBuf, Vec<String>)],
832        cancel: CancellationToken,
833    ) {
834        let mut interval = tokio::time::interval(self.fallback_scan_interval);
835        interval.tick().await; // Consume immediate tick.
836
837        loop {
838            tokio::select! {
839                () = cancel.cancelled() => {
840                    tracing::info!("Watchtower polling loop cancelled");
841                    break;
842                }
843                _ = interval.tick() => {
844                    for (source_id, base_path, patterns) in source_map {
845                        if let Err(e) = self.scan_directory(*source_id, base_path, patterns).await {
846                            tracing::warn!(
847                                path = %base_path.display(),
848                                error = %e,
849                                "Polling scan failed"
850                            );
851                        }
852                    }
853                }
854            }
855        }
856    }
857}