1pub mod chunker;
10pub mod embedding_worker;
11pub mod graph_ingest;
12pub mod link_extractor;
13pub mod loopback;
14
15#[cfg(test)]
16mod tests;
17
18#[cfg(test)]
19mod inline_tests;
20
21use std::collections::HashMap;
22use std::path::{Path, PathBuf};
23use std::sync::Mutex;
24use std::time::{Duration, Instant};
25
26use notify_debouncer_full::{
27 new_debouncer, notify::RecursiveMode, DebounceEventResult, Debouncer, RecommendedCache,
28};
29use sha2::{Digest, Sha256};
30use tokio_util::sync::CancellationToken;
31
32use crate::config::{ConnectorConfig, ContentSourcesConfig};
33use crate::source::ContentSourceProvider;
34use crate::storage::accounts::DEFAULT_ACCOUNT_ID;
35use crate::storage::watchtower as store;
36use crate::storage::DbPool;
37
/// Errors surfaced by the Watchtower ingest pipeline.
#[derive(Debug, thiserror::Error)]
pub enum WatchtowerError {
    /// Filesystem read or directory-walk failure.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// Database-layer failure while upserting content or updating source state.
    #[error("storage error: {0}")]
    Storage(#[from] crate::error::StorageError),

    /// Failure from the `notify` filesystem watcher.
    #[error("notify error: {0}")]
    Notify(#[from] notify::Error),

    /// Invalid or missing configuration (human-readable reason).
    #[error("config error: {0}")]
    Config(String),

    /// Failure while chunking ingested content.
    #[error("chunker error: {0}")]
    Chunker(#[from] chunker::ChunkerError),
}
60
/// Tally of a batch ingest run (see [`ingest_files`]).
#[derive(Debug, Default)]
pub struct IngestSummary {
    // Files that were inserted or updated.
    pub ingested: u32,
    // Files whose stored content was already up to date.
    pub skipped: u32,
    // Per-file failure messages, formatted "<relative path>: <error>".
    pub errors: Vec<String>,
}
72
/// Fields extracted from a document's YAML front matter block.
#[derive(Debug, Default)]
pub struct ParsedFrontMatter {
    // `title:` value, when present and a string.
    pub title: Option<String>,
    // `tags:` value normalized to a comma-separated string.
    pub tags: Option<String>,
    // The raw YAML text of the front matter block, unparsed.
    pub raw_yaml: Option<String>,
}
80
81pub fn parse_front_matter(content: &str) -> (ParsedFrontMatter, &str) {
89 let (yaml_str, body) = loopback::split_front_matter(content);
90
91 let yaml_str = match yaml_str {
92 Some(y) => y,
93 None => return (ParsedFrontMatter::default(), content),
94 };
95
96 let parsed: Result<serde_yaml::Value, _> = serde_yaml::from_str(yaml_str);
97 match parsed {
98 Ok(serde_yaml::Value::Mapping(map)) => {
99 let title = map
100 .get(serde_yaml::Value::String("title".to_string()))
101 .and_then(|v| v.as_str())
102 .map(|s| s.to_string());
103
104 let tags = map
105 .get(serde_yaml::Value::String("tags".to_string()))
106 .map(|v| match v {
107 serde_yaml::Value::Sequence(seq) => seq
108 .iter()
109 .filter_map(|item| item.as_str())
110 .collect::<Vec<_>>()
111 .join(","),
112 serde_yaml::Value::String(s) => s.clone(),
113 _ => String::new(),
114 })
115 .filter(|s| !s.is_empty());
116
117 let fm = ParsedFrontMatter {
118 title,
119 tags,
120 raw_yaml: Some(yaml_str.to_string()),
121 };
122 (fm, body)
123 }
124 _ => (
125 ParsedFrontMatter {
126 raw_yaml: Some(yaml_str.to_string()),
127 ..Default::default()
128 },
129 body,
130 ),
131 }
132}
133
134pub fn matches_patterns(path: &Path, patterns: &[String]) -> bool {
143 let file_name = match path.file_name().and_then(|n| n.to_str()) {
144 Some(n) => n,
145 None => return false,
146 };
147
148 for pattern in patterns {
149 if let Ok(p) = glob::Pattern::new(pattern) {
150 if p.matches(file_name) {
151 return true;
152 }
153 }
154 }
155 false
156}
157
/// Render a path as a `/`-separated string, independent of the platform's
/// native separator, for use as a stable storage key.
fn relative_path_string(path: &Path) -> String {
    let mut rendered = String::new();
    for component in path {
        if !rendered.is_empty() {
            rendered.push('/');
        }
        rendered.push_str(&component.to_string_lossy());
    }
    rendered
}
165
166pub async fn ingest_content(
176 pool: &DbPool,
177 source_id: i64,
178 provider_id: &str,
179 content: &str,
180 force: bool,
181) -> Result<store::UpsertResult, WatchtowerError> {
182 let (fm, body) = parse_front_matter(content);
183
184 let hash = if force {
185 let mut hasher = Sha256::new();
186 hasher.update(content.as_bytes());
187 hasher.update(
188 std::time::SystemTime::now()
189 .duration_since(std::time::UNIX_EPOCH)
190 .unwrap_or_default()
191 .as_nanos()
192 .to_le_bytes(),
193 );
194 format!("{:x}", hasher.finalize())
195 } else {
196 let mut hasher = Sha256::new();
197 hasher.update(content.as_bytes());
198 format!("{:x}", hasher.finalize())
199 };
200
201 let result = store::upsert_content_node(
202 pool,
203 source_id,
204 provider_id,
205 &hash,
206 fm.title.as_deref(),
207 body,
208 fm.raw_yaml.as_deref(),
209 fm.tags.as_deref(),
210 )
211 .await?;
212
213 Ok(result)
214}
215
216pub async fn ingest_file(
220 pool: &DbPool,
221 source_id: i64,
222 base_path: &Path,
223 relative_path: &str,
224 force: bool,
225) -> Result<store::UpsertResult, WatchtowerError> {
226 let full_path = base_path.join(relative_path);
227 let content = tokio::fs::read_to_string(&full_path).await?;
228 ingest_content(pool, source_id, relative_path, &content, force).await
229}
230
231pub async fn ingest_files(
233 pool: &DbPool,
234 source_id: i64,
235 base_path: &Path,
236 paths: &[String],
237 force: bool,
238) -> IngestSummary {
239 let mut summary = IngestSummary::default();
240
241 for rel_path in paths {
242 match ingest_file(pool, source_id, base_path, rel_path, force).await {
243 Ok(store::UpsertResult::Inserted | store::UpsertResult::Updated) => {
244 summary.ingested += 1;
245 }
246 Ok(store::UpsertResult::Skipped) => {
247 summary.skipped += 1;
248 }
249 Err(e) => {
250 summary.errors.push(format!("{rel_path}: {e}"));
251 }
252 }
253 }
254
255 summary
256}
257
/// Set of paths that should be temporarily ignored by the watcher, e.g. files
/// the service itself just wrote. Each entry expires `ttl` after being marked.
struct CooldownSet {
    entries: HashMap<PathBuf, Instant>,
    ttl: Duration,
}

impl CooldownSet {
    /// Empty set whose entries expire after `ttl`.
    fn new(ttl: Duration) -> Self {
        Self {
            entries: HashMap::new(),
            ttl,
        }
    }

    /// Record `path` as just-touched; it cools down for `ttl` from now.
    #[allow(dead_code)]
    fn mark(&mut self, path: PathBuf) {
        self.entries.insert(path, Instant::now());
    }

    /// True while `path` was marked less than `ttl` ago.
    fn is_cooling(&self, path: &Path) -> bool {
        self.entries
            .get(path)
            .map_or(false, |marked| marked.elapsed() < self.ttl)
    }

    /// Drop entries whose cooldown has fully elapsed.
    fn cleanup(&mut self) {
        let ttl = self.ttl;
        self.entries.retain(|_, marked| marked.elapsed() < ttl);
    }
}
296
/// A registered remote source: (source row id, content provider,
/// file patterns, poll interval).
type RemoteSource = (i64, Box<dyn ContentSourceProvider>, Vec<String>, Duration);
303
/// Long-running service that keeps configured content sources synchronized:
/// watches local directories, polls remote folders, and chunks new content.
pub struct WatchtowerLoop {
    // Database handle used for all source/content bookkeeping.
    pool: DbPool,
    // Declared content sources (local_fs and google_drive entries).
    config: ContentSourcesConfig,
    // Connector settings used to build connection-backed remote providers.
    connector_config: ConnectorConfig,
    // App data directory; passed to the connector key store.
    data_dir: PathBuf,
    // Quiet period applied to filesystem events before delivery.
    debounce_duration: Duration,
    // Period of the safety-net rescan; also the polling period when the
    // native watcher cannot be created.
    fallback_scan_interval: Duration,
    // How long a marked path stays in the event cooldown set.
    cooldown_ttl: Duration,
}
317
318impl WatchtowerLoop {
319 pub fn new(
321 pool: DbPool,
322 config: ContentSourcesConfig,
323 connector_config: ConnectorConfig,
324 data_dir: PathBuf,
325 ) -> Self {
326 Self {
327 pool,
328 config,
329 connector_config,
330 data_dir,
331 debounce_duration: Duration::from_secs(2),
332 fallback_scan_interval: Duration::from_secs(300), cooldown_ttl: Duration::from_secs(5),
334 }
335 }
336
337 pub async fn run(&self, cancel: CancellationToken) {
343 let local_sources: Vec<_> = self
346 .config
347 .sources
348 .iter()
349 .filter(|s| s.source_type == "local_fs" && s.is_enabled() && s.path.is_some())
350 .collect();
351
352 let remote_sources: Vec<_> = self
353 .config
354 .sources
355 .iter()
356 .filter(|s| s.source_type == "google_drive" && s.is_enabled() && s.folder_id.is_some())
357 .collect();
358
359 if local_sources.is_empty() && remote_sources.is_empty() {
360 tracing::info!("Watchtower: no watch sources configured, exiting");
361 return;
362 }
363
364 let mut source_map: Vec<(i64, PathBuf, Vec<String>)> = Vec::new();
366 for src in &local_sources {
367 let path_str = src.path.as_deref().unwrap();
368 let expanded = PathBuf::from(crate::storage::expand_tilde(path_str));
369
370 let config_json = serde_json::json!({
371 "path": path_str,
372 "file_patterns": src.file_patterns,
373 "loop_back_enabled": src.loop_back_enabled,
374 "analytics_sync_enabled": src.analytics_sync_enabled,
375 })
376 .to_string();
377
378 match store::ensure_local_fs_source(&self.pool, path_str, &config_json).await {
379 Ok(source_id) => {
380 source_map.push((source_id, expanded, src.file_patterns.clone()));
381 }
382 Err(e) => {
383 tracing::error!(path = path_str, error = %e, "Failed to register source context");
384 }
385 }
386 }
387
388 let mut remote_map: Vec<RemoteSource> = Vec::new();
390 for src in &remote_sources {
391 let folder_id = src.folder_id.as_deref().unwrap();
392 let config_json = serde_json::json!({
393 "folder_id": folder_id,
394 "file_patterns": src.file_patterns,
395 "service_account_key": src.service_account_key,
396 "connection_id": src.connection_id,
397 })
398 .to_string();
399
400 match store::ensure_google_drive_source(&self.pool, folder_id, &config_json).await {
401 Ok(source_id) => {
402 let interval = Duration::from_secs(src.poll_interval_seconds.unwrap_or(300));
403
404 let provider: Box<dyn ContentSourceProvider> =
406 if let Some(connection_id) = src.connection_id {
407 match self.build_connection_provider(folder_id, connection_id) {
408 Ok(p) => Box::new(p),
409 Err(reason) => {
410 tracing::warn!(
411 folder_id,
412 connection_id,
413 reason = %reason,
414 "Skipping connection-based source"
415 );
416 continue;
417 }
418 }
419 } else if src.service_account_key.is_some() {
420 let key_path = src.service_account_key.clone().unwrap_or_default();
421 Box::new(crate::source::google_drive::GoogleDriveProvider::new(
422 folder_id.to_string(),
423 key_path,
424 ))
425 } else {
426 tracing::warn!(
427 folder_id,
428 "Skipping Google Drive source: no connection_id or service_account_key"
429 );
430 continue;
431 };
432
433 remote_map.push((source_id, provider, src.file_patterns.clone(), interval));
434 }
435 Err(e) => {
436 tracing::error!(
437 folder_id = folder_id,
438 error = %e,
439 "Failed to register Google Drive source"
440 );
441 }
442 }
443 }
444
445 if source_map.is_empty() && remote_map.is_empty() {
446 tracing::warn!("Watchtower: no sources registered, exiting");
447 return;
448 }
449
450 for (source_id, base_path, patterns) in &source_map {
452 let _ = store::update_source_status(&self.pool, *source_id, "syncing", None).await;
453 match self.scan_directory(*source_id, base_path, patterns).await {
454 Ok(_) => {
455 let _ =
456 store::update_source_status(&self.pool, *source_id, "active", None).await;
457 }
458 Err(e) => {
459 tracing::error!(
460 path = %base_path.display(),
461 error = %e,
462 "Initial scan failed"
463 );
464 let _ = store::update_source_status(
465 &self.pool,
466 *source_id,
467 "error",
468 Some(&e.to_string()),
469 )
470 .await;
471 }
472 }
473 }
474
475 self.chunk_pending().await;
477
478 if !remote_map.is_empty() {
480 self.poll_remote_sources(&remote_map).await;
481 self.chunk_pending().await;
482 }
483
484 let watch_source_map: Vec<_> = source_map
488 .iter()
489 .zip(local_sources.iter())
490 .filter(|(_, src)| !src.is_scan_only())
491 .map(|(entry, _)| entry.clone())
492 .collect();
493
494 let notify_source_map: Vec<_> = source_map
496 .iter()
497 .zip(local_sources.iter())
498 .filter(|(_, src)| src.effective_change_detection() == "auto")
499 .map(|(entry, _)| entry.clone())
500 .collect();
501
502 if watch_source_map.is_empty() {
504 if remote_map.is_empty() {
505 tracing::info!(
506 "Watchtower: all local sources are scan-only and no remote sources, exiting"
507 );
508 return;
509 }
510 self.remote_only_loop(&remote_map, cancel).await;
511 return;
512 }
513
514 let (async_tx, mut async_rx) = tokio::sync::mpsc::channel::<DebounceEventResult>(256);
516
517 let handler = move |result: DebounceEventResult| {
518 let _ = async_tx.blocking_send(result);
519 };
520
521 let debouncer_result = new_debouncer(self.debounce_duration, None, handler);
522 let mut debouncer: Debouncer<notify::RecommendedWatcher, RecommendedCache> =
523 match debouncer_result {
524 Ok(d) => d,
525 Err(e) => {
526 tracing::error!(error = %e, "Failed to create filesystem watcher, falling back to polling");
527 self.polling_loop(&watch_source_map, cancel).await;
528 return;
529 }
530 };
531
532 for (_, base_path, _) in ¬ify_source_map {
534 if let Err(e) = debouncer.watch(base_path, RecursiveMode::Recursive) {
535 tracing::error!(
536 path = %base_path.display(),
537 error = %e,
538 "Failed to watch directory"
539 );
540 }
541 }
542
543 tracing::info!(
544 local_sources = source_map.len(),
545 watching = notify_source_map.len(),
546 polling = watch_source_map.len() - notify_source_map.len(),
547 remote_sources = remote_map.len(),
548 "Watchtower watching for changes"
549 );
550
551 let cooldown = Mutex::new(CooldownSet::new(self.cooldown_ttl));
552
553 let mut fallback_timer = tokio::time::interval(self.fallback_scan_interval);
555 fallback_timer.tick().await; let remote_interval = remote_map
559 .iter()
560 .map(|(_, _, _, d)| *d)
561 .min()
562 .unwrap_or(self.fallback_scan_interval);
563 let mut remote_timer = tokio::time::interval(remote_interval);
564 remote_timer.tick().await; loop {
567 tokio::select! {
568 () = cancel.cancelled() => {
569 tracing::info!("Watchtower: cancellation received, shutting down");
570 break;
571 }
572 _ = fallback_timer.tick() => {
573 for (source_id, base_path, patterns) in &watch_source_map {
576 if let Err(e) = self.scan_directory(*source_id, base_path, patterns).await {
577 tracing::warn!(
578 path = %base_path.display(),
579 error = %e,
580 "Fallback scan failed"
581 );
582 }
583 }
584 if let Ok(mut cd) = cooldown.lock() {
585 cd.cleanup();
586 }
587 self.chunk_pending().await;
588 }
589 _ = remote_timer.tick(), if !remote_map.is_empty() => {
590 self.poll_remote_sources(&remote_map).await;
591 self.chunk_pending().await;
592 }
593 result = async_rx.recv() => {
594 match result {
595 Some(Ok(events)) => {
596 for event in events {
597 for path in &event.paths {
598 self.handle_event(path, &source_map, &cooldown).await;
599 }
600 }
601 self.chunk_pending().await;
602 }
603 Some(Err(errs)) => {
604 for e in errs {
605 tracing::warn!(error = %e, "Watcher error");
606 }
607 }
608 None => {
609 tracing::warn!("Watcher event channel closed");
610 break;
611 }
612 }
613 }
614 }
615 }
616
617 drop(debouncer);
619 tracing::info!("Watchtower shut down");
620 }
621
622 async fn handle_event(
624 &self,
625 path: &Path,
626 source_map: &[(i64, PathBuf, Vec<String>)],
627 cooldown: &Mutex<CooldownSet>,
628 ) {
629 if let Ok(cd) = cooldown.lock() {
631 if cd.is_cooling(path) {
632 tracing::debug!(path = %path.display(), "Skipping cooldown path");
633 return;
634 }
635 }
636
637 for (source_id, base_path, patterns) in source_map {
639 if path.starts_with(base_path) {
640 if !matches_patterns(path, patterns) {
642 return;
643 }
644
645 let rel = match path.strip_prefix(base_path) {
647 Ok(r) => relative_path_string(r),
648 Err(_) => return,
649 };
650
651 match ingest_file(&self.pool, *source_id, base_path, &rel, false).await {
652 Ok(result) => {
653 tracing::debug!(
654 path = %rel,
655 result = ?result,
656 "Watchtower ingested file"
657 );
658 }
659 Err(e) => {
660 tracing::warn!(
661 path = %rel,
662 error = %e,
663 "Watchtower ingest failed"
664 );
665 }
666 }
667 return;
668 }
669 }
670 }
671
672 async fn scan_directory(
674 &self,
675 source_id: i64,
676 base_path: &Path,
677 patterns: &[String],
678 ) -> Result<IngestSummary, WatchtowerError> {
679 let mut rel_paths = Vec::new();
680 Self::walk_directory(base_path, base_path, patterns, &mut rel_paths)?;
681
682 let summary = ingest_files(&self.pool, source_id, base_path, &rel_paths, false).await;
683
684 tracing::debug!(
685 path = %base_path.display(),
686 ingested = summary.ingested,
687 skipped = summary.skipped,
688 errors = summary.errors.len(),
689 "Directory scan complete"
690 );
691
692 let cursor = chrono::Utc::now().to_rfc3339();
694 if let Err(e) = store::update_sync_cursor(&self.pool, source_id, &cursor).await {
695 tracing::warn!(error = %e, "Failed to update sync cursor");
696 }
697
698 Ok(summary)
699 }
700
701 fn walk_directory(
703 base: &Path,
704 current: &Path,
705 patterns: &[String],
706 out: &mut Vec<String>,
707 ) -> Result<(), WatchtowerError> {
708 let entries = std::fs::read_dir(current)?;
709 for entry in entries {
710 let entry = entry?;
711 let file_type = entry.file_type()?;
712 let path = entry.path();
713
714 if file_type.is_dir() {
715 if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
717 if name.starts_with('.') {
718 continue;
719 }
720 }
721 Self::walk_directory(base, &path, patterns, out)?;
722 } else if file_type.is_file() && matches_patterns(&path, patterns) {
723 if let Ok(rel) = path.strip_prefix(base) {
724 out.push(relative_path_string(rel));
725 }
726 }
727 }
728 Ok(())
729 }
730
    /// Poll every remote source once: resume from the stored sync cursor,
    /// fetch changed files, ingest their content, then advance the cursor and
    /// update the source's status ("syncing" -> "active"/"error").
    async fn poll_remote_sources(&self, remote_sources: &[RemoteSource]) {
        for (source_id, provider, patterns, _interval) in remote_sources {
            let _ = store::update_source_status(&self.pool, *source_id, "syncing", None).await;

            // Resume from the last recorded cursor, if any; a context lookup
            // failure skips this source for this round.
            let cursor = match store::get_source_context(&self.pool, *source_id).await {
                Ok(Some(ctx)) => ctx.sync_cursor,
                Ok(None) => None,
                Err(e) => {
                    tracing::warn!(source_id, error = %e, "Failed to get source context");
                    continue;
                }
            };

            match provider.scan_for_changes(cursor.as_deref(), patterns).await {
                Ok(files) => {
                    let mut ingested = 0u32;
                    let mut skipped = 0u32;
                    // Per-file failures are logged but never abort the batch.
                    for file in &files {
                        match provider.read_content(&file.provider_id).await {
                            Ok(content) => {
                                match ingest_content(
                                    &self.pool,
                                    *source_id,
                                    &file.provider_id,
                                    &content,
                                    false,
                                )
                                .await
                                {
                                    Ok(
                                        store::UpsertResult::Inserted
                                        | store::UpsertResult::Updated,
                                    ) => {
                                        ingested += 1;
                                    }
                                    Ok(store::UpsertResult::Skipped) => {
                                        skipped += 1;
                                    }
                                    Err(e) => {
                                        tracing::warn!(
                                            provider_id = %file.provider_id,
                                            error = %e,
                                            "Remote ingest failed"
                                        );
                                    }
                                }
                            }
                            Err(e) => {
                                tracing::warn!(
                                    provider_id = %file.provider_id,
                                    error = %e,
                                    "Failed to read remote content"
                                );
                            }
                        }
                    }

                    tracing::debug!(
                        source_type = provider.source_type(),
                        ingested,
                        skipped,
                        total = files.len(),
                        "Remote poll complete"
                    );

                    // Advance the cursor to "now" so the next poll only sees
                    // later changes; cursor/status failures are non-fatal.
                    let new_cursor = chrono::Utc::now().to_rfc3339();
                    if let Err(e) =
                        store::update_sync_cursor(&self.pool, *source_id, &new_cursor).await
                    {
                        tracing::warn!(error = %e, "Failed to update remote sync cursor");
                    }
                    let _ =
                        store::update_source_status(&self.pool, *source_id, "active", None).await;
                }
                Err(crate::source::SourceError::ConnectionBroken {
                    connection_id,
                    ref reason,
                }) => {
                    // Mark both the source and the connection so callers can
                    // surface a re-authorization prompt.
                    tracing::warn!(
                        source_id,
                        connection_id,
                        reason = %reason,
                        "Connection broken -- marking source as error"
                    );
                    let _ =
                        store::update_source_status(&self.pool, *source_id, "error", Some(reason))
                            .await;
                    let _ =
                        store::update_connection_status(&self.pool, connection_id, "expired").await;
                }
                Err(e) => {
                    tracing::warn!(
                        source_type = provider.source_type(),
                        error = %e,
                        "Remote scan failed"
                    );
                    let _ = store::update_source_status(
                        &self.pool,
                        *source_id,
                        "error",
                        Some(&e.to_string()),
                    )
                    .await;
                }
            }
        }
    }
840
841 async fn remote_only_loop(&self, remote_map: &[RemoteSource], cancel: CancellationToken) {
843 let interval_dur = remote_map
844 .iter()
845 .map(|(_, _, _, d)| *d)
846 .min()
847 .unwrap_or(self.fallback_scan_interval);
848 let mut interval = tokio::time::interval(interval_dur);
849 interval.tick().await;
850
851 loop {
852 tokio::select! {
853 () = cancel.cancelled() => {
854 tracing::info!("Watchtower remote-only loop cancelled");
855 break;
856 }
857 _ = interval.tick() => {
858 self.poll_remote_sources(remote_map).await;
859 self.chunk_pending().await;
860 }
861 }
862 }
863 }
864
865 fn build_connection_provider(
871 &self,
872 folder_id: &str,
873 connection_id: i64,
874 ) -> Result<crate::source::google_drive::GoogleDriveProvider, String> {
875 let key = crate::source::connector::crypto::ensure_connector_key(&self.data_dir)
876 .map_err(|e| format!("connector key error: {e}"))?;
877
878 let connector = crate::source::connector::google_drive::GoogleDriveConnector::new(
879 &self.connector_config.google_drive,
880 )
881 .map_err(|e| format!("connector config error: {e}"))?;
882
883 Ok(
884 crate::source::google_drive::GoogleDriveProvider::from_connection(
885 folder_id.to_string(),
886 connection_id,
887 self.pool.clone(),
888 key,
889 connector,
890 ),
891 )
892 }
893
894 pub async fn reindex_local_source(
899 pool: &DbPool,
900 source_id: i64,
901 base_path: &Path,
902 patterns: &[String],
903 ) -> Result<IngestSummary, WatchtowerError> {
904 store::update_source_status(pool, source_id, "syncing", None).await?;
905
906 let mut rel_paths = Vec::new();
907 Self::walk_directory(base_path, base_path, patterns, &mut rel_paths)?;
908
909 let summary = ingest_files(pool, source_id, base_path, &rel_paths, true).await;
910
911 let cursor = chrono::Utc::now().to_rfc3339();
912 let _ = store::update_sync_cursor(pool, source_id, &cursor).await;
913
914 if summary.errors.is_empty() {
915 let _ = store::update_source_status(pool, source_id, "active", None).await;
916 } else {
917 let msg = format!("{} errors during reindex", summary.errors.len());
918 let _ = store::update_source_status(pool, source_id, "error", Some(&msg)).await;
919 }
920
921 Ok(summary)
922 }
923
924 async fn chunk_pending(&self) {
926 let chunked = chunker::chunk_pending_nodes(&self.pool, DEFAULT_ACCOUNT_ID, 100).await;
927 if chunked > 0 {
928 tracing::debug!(chunked, "Watchtower chunked pending nodes");
929 }
930 }
931
932 async fn polling_loop(
934 &self,
935 source_map: &[(i64, PathBuf, Vec<String>)],
936 cancel: CancellationToken,
937 ) {
938 let mut interval = tokio::time::interval(self.fallback_scan_interval);
939 interval.tick().await; loop {
942 tokio::select! {
943 () = cancel.cancelled() => {
944 tracing::info!("Watchtower polling loop cancelled");
945 break;
946 }
947 _ = interval.tick() => {
948 for (source_id, base_path, patterns) in source_map {
949 if let Err(e) = self.scan_directory(*source_id, base_path, patterns).await {
950 tracing::warn!(
951 path = %base_path.display(),
952 error = %e,
953 "Polling scan failed"
954 );
955 }
956 }
957 }
958 }
959 }
960 }
961}