Skip to main content

orbok_fs/
scanner.rs

1//! File scanner and change detection (RFC-004).
2//!
3//! The scanner is the authority for source traversal and file catalog
4//! state (Appendix A §13: localcache freshness checks happen later, in
5//! workers, never here). Per-file failures never abort the scan
6//! (RFC-004 §16); cancellation leaves the catalog valid because every
7//! file is committed individually.
8
9use crate::hashing::sha256_file;
10use crate::policy::{CompiledPolicy, FileTypeClass, classify_file_type};
11use orbok_core::{
12    FileStatus, JobType, OrbokResult, SourceId, now_iso8601, system_time_iso8601,
13};
14use orbok_db::Catalog;
15use orbok_db::repo::{
16    FileRepository, IndexJobRepository, NewFile, ObservedMetadata, SourceRecord, SourceRepository,
17};
18use std::path::{Path, PathBuf};
19use std::sync::atomic::{AtomicBool, Ordering};
20use std::time::Instant;
21
/// Scan request (RFC-004 §20): which source to scan and how thorough
/// the change detection should be.
#[derive(Debug, Clone)]
pub struct ScanRequest {
    /// Source to scan. The scan fails with `SourceNotFound` when the
    /// catalog has no record for this id.
    pub source_id: SourceId,
    /// Hash even when the metadata fast-check says unchanged.
    pub force_hash: bool,
    /// Queue `extract` jobs for new/stale files (RFC-004 §13).
    pub enqueue_index_jobs: bool,
}
31
/// Per-file classification produced during a scan.
///
/// The variants parallel the per-file counters on `ScanSummary`
/// (`new_files`, `unchanged_files`, `stale_files`, ...).
// NOTE(review): nothing in the visible part of this file constructs or
// matches on this enum; presumably it is consumed by callers elsewhere.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ScanOutcomeKind {
    New,
    Unchanged,
    Stale,
    Unsupported,
    PermissionDenied,
    Failed,
}
42
/// Scan summary counts (RFC-004 §14).
///
/// All counters start at zero (`Default`) and are only ever incremented
/// while a scan runs.
#[derive(Debug, Clone, Default)]
pub struct ScanSummary {
    /// Regular files reached by the walk, counted before the include
    /// and size policies are applied.
    pub seen_files: u64,
    /// Supported files cataloged for the first time.
    pub new_files: u64,
    /// Known files whose metadata or content hash matched the catalog.
    pub unchanged_files: u64,
    /// Known files whose content hash differed from the catalog.
    pub stale_files: u64,
    /// Count reported by the catalog when marking files not seen by this
    /// scan as missing; stays 0 when the scan was canceled.
    pub missing_files: u64,
    /// Files whose type is classified as unsupported.
    pub unsupported_files: u64,
    /// Directories or files that could not be read because of a
    /// permission error.
    pub permission_denied_files: u64,
    /// Directory entries or files that failed for any other reason.
    pub failed_files: u64,
    /// `extract` jobs enqueued by this scan.
    pub queued_index_jobs: u64,
    /// Wall-clock duration of the scan in milliseconds.
    pub duration_ms: u64,
    /// True when the cancel flag stopped the scan before it finished.
    pub canceled: bool,
}
58
/// The scanner. Holds the catalog handle; one scan call per source.
///
/// The scanner borrows the catalog for `'a` and builds the repositories
/// it needs from that handle on every `scan` call.
pub struct Scanner<'a> {
    /// Catalog every repository used during a scan is created from.
    catalog: &'a Catalog,
}
63
64impl<'a> Scanner<'a> {
65    pub fn new(catalog: &'a Catalog) -> Self {
66        Self { catalog }
67    }
68
69    /// Scan one active source (RFC-004 §10). `cancel` may be flipped by
70    /// the UI at any time; the scan stops at the next file boundary.
71    pub fn scan(&self, request: &ScanRequest, cancel: &AtomicBool) -> OrbokResult<ScanSummary> {
72        let started = Instant::now();
73        let scan_started_at = now_iso8601();
74        let mut summary = ScanSummary::default();
75
76        let sources = SourceRepository::new(self.catalog);
77        let source = sources
78            .get(&request.source_id)?
79            .ok_or(orbok_core::OrbokError::SourceNotFound)?;
80        let policy = CompiledPolicy::from_source(&source);
81        let root = PathBuf::from(&source.canonical_path);
82
83        let files = FileRepository::new(self.catalog);
84        let jobs = IndexJobRepository::new(self.catalog);
85
86        let mut stack = vec![root.clone()];
87        'walk: while let Some(dir) = stack.pop() {
88            if cancel.load(Ordering::Relaxed) {
89                summary.canceled = true;
90                break 'walk;
91            }
92            let entries = match std::fs::read_dir(&dir) {
93                Ok(entries) => entries,
94                Err(e) if e.kind() == std::io::ErrorKind::PermissionDenied => {
95                    summary.permission_denied_files += 1;
96                    continue;
97                }
98                Err(_) => {
99                    summary.failed_files += 1;
100                    continue;
101                }
102            };
103            for entry in entries {
104                if cancel.load(Ordering::Relaxed) {
105                    summary.canceled = true;
106                    break 'walk;
107                }
108                let Ok(entry) = entry else {
109                    summary.failed_files += 1;
110                    continue;
111                };
112                let path = entry.path();
113                let name = entry.file_name().to_string_lossy().into_owned();
114
115                if skip_component(&policy, &source, &name) {
116                    continue;
117                }
118                let Ok(file_type) = entry.file_type() else {
119                    summary.failed_files += 1;
120                    continue;
121                };
122                if file_type.is_symlink() {
123                    // RFC-003 §6.2: v1 default Ignore; FollowWithinSource
124                    // resolves and verifies containment.
125                    if !symlink_allowed(&policy, &root, &path) {
126                        continue;
127                    }
128                }
129                if path.is_dir() {
130                    stack.push(path);
131                    continue;
132                }
133                if !path.is_file() {
134                    continue;
135                }
136                summary.seen_files += 1;
137                let outcome =
138                    self.process_file(&source, &policy, &files, &jobs, &path, request, &mut summary);
139                match outcome {
140                    Ok(()) => {}
141                    Err(_) => summary.failed_files += 1,
142                }
143            }
144        }
145
146        if !summary.canceled {
147            summary.missing_files = files.mark_missing_unseen(&source.source_id, &scan_started_at)?;
148            sources.touch_scanned(&source.source_id)?;
149        }
150        summary.duration_ms = started.elapsed().as_millis() as u64;
151        tracing::info!(
152            source = source.source_id.as_str(),
153            seen = summary.seen_files,
154            new = summary.new_files,
155            stale = summary.stale_files,
156            missing = summary.missing_files,
157            "scan finished"
158        );
159        Ok(summary)
160    }
161
162    /// Catalog one regular file. Failure here affects this file only
163    /// (RFC-004 §16).
164    #[allow(clippy::too_many_arguments)]
165    fn process_file(
166        &self,
167        source: &SourceRecord,
168        policy: &CompiledPolicy,
169        files: &FileRepository<'_>,
170        jobs: &IndexJobRepository<'_>,
171        path: &Path,
172        request: &ScanRequest,
173        summary: &mut ScanSummary,
174    ) -> OrbokResult<()> {
175        let file_name = path
176            .file_name()
177            .map(|n| n.to_string_lossy().into_owned())
178            .unwrap_or_default();
179        if !policy.file_included(&file_name) {
180            return Ok(());
181        }
182
183        let metadata = match std::fs::metadata(path) {
184            Ok(m) => m,
185            Err(e) if e.kind() == std::io::ErrorKind::PermissionDenied => {
186                summary.permission_denied_files += 1;
187                self.upsert_status_only(source, files, path, FileStatus::PermissionDenied)?;
188                return Ok(());
189            }
190            Err(e) => return Err(e.into()),
191        };
192        if !policy.size_allowed(metadata.len()) {
193            return Ok(()); // over limit: skipped, not cataloged (RFC-004 §10)
194        }
195
196        let supported = classify_file_type(path) == FileTypeClass::Supported;
197        let canonical = path.to_string_lossy().into_owned();
198        let observed = ObservedMetadata {
199            file_size_bytes: metadata.len(),
200            modified_at: metadata.modified().ok().map(system_time_iso8601),
201            platform_file_key: platform_file_key(&metadata),
202            content_hash: None,
203        };
204
205        let existing = files.get_by_path(&source.source_id, &canonical)?;
206        match existing {
207            None => {
208                // New file: hash if supported (content identity required
209                // before "indexed", RFC-004 §9.2).
210                let mut observed = observed;
211                let status = if supported {
212                    observed.content_hash = Some(sha256_file(path)?);
213                    FileStatus::Discovered
214                } else {
215                    summary.unsupported_files += 1;
216                    FileStatus::Unsupported
217                };
218                let record = files.insert(NewFile {
219                    source_id: source.source_id.clone(),
220                    original_path: canonical.clone(),
221                    canonical_path: canonical.clone(),
222                    display_path: display_path(&source.canonical_path, &canonical),
223                    extension: path
224                        .extension()
225                        .map(|e| e.to_string_lossy().to_ascii_lowercase()),
226                    metadata: observed,
227                    status,
228                })?;
229                if supported {
230                    summary.new_files += 1;
231                    if request.enqueue_index_jobs {
232                        jobs.enqueue(JobType::Extract, Some(&source.source_id), Some(&record.file_id))?;
233                        summary.queued_index_jobs += 1;
234                    }
235                }
236            }
237            Some(record) => {
238                // RFC-004 §11: a missing file that reappears with the
239                // same content returns to its previous state (Indexed if
240                // it ever was, otherwise Discovered).
241                let restored_status = (record.file_status == FileStatus::Missing).then(|| {
242                    if record.last_indexed_at.is_some() {
243                        FileStatus::Indexed
244                    } else {
245                        FileStatus::Discovered
246                    }
247                });
248                // Fast check (RFC-004 §9.1). `modified_at` strings are
249                // RFC 3339 with nanosecond precision where the
250                // filesystem provides it — same-second overwrites with
251                // unchanged size are still detected (the defect class
252                // fixed in localcache 0.20.0). On coarse-timestamp
253                // filesystems, `force_hash` remains the escape hatch.
254                let metadata_unchanged = record.file_size_bytes == observed.file_size_bytes
255                    && record.modified_at == observed.modified_at;
256                if metadata_unchanged && !request.force_hash {
257                    match restored_status {
258                        Some(status) => files.update_observed(&record.file_id, &observed, status)?,
259                        None => files.touch_seen(&record.file_id)?,
260                    }
261                    summary.unchanged_files += 1;
262                    return Ok(());
263                }
264                // Metadata changed (or forced): confirm with hash.
265                let mut observed = observed;
266                let new_hash = sha256_file(path)?;
267                if record.content_hash.as_deref() == Some(new_hash.as_str()) {
268                    match restored_status {
269                        Some(status) => files.update_observed(&record.file_id, &observed, status)?,
270                        None => files.touch_seen(&record.file_id)?,
271                    }
272                    summary.unchanged_files += 1;
273                    return Ok(());
274                }
275                observed.content_hash = Some(new_hash);
276                let status = match record.file_status {
277                    FileStatus::Indexed | FileStatus::Stale => FileStatus::Stale,
278                    _ => FileStatus::Discovered,
279                };
280                files.update_observed(&record.file_id, &observed, status)?;
281                summary.stale_files += 1;
282                if request.enqueue_index_jobs {
283                    jobs.enqueue(JobType::Extract, Some(&source.source_id), Some(&record.file_id))?;
284                    summary.queued_index_jobs += 1;
285                }
286            }
287        }
288        Ok(())
289    }
290
291    fn upsert_status_only(
292        &self,
293        source: &SourceRecord,
294        files: &FileRepository<'_>,
295        path: &Path,
296        status: FileStatus,
297    ) -> OrbokResult<()> {
298        let canonical = path.to_string_lossy().into_owned();
299        match files.get_by_path(&source.source_id, &canonical)? {
300            Some(record) => files.set_status(&record.file_id, status),
301            None => files
302                .insert(NewFile {
303                    source_id: source.source_id.clone(),
304                    original_path: canonical.clone(),
305                    canonical_path: canonical.clone(),
306                    display_path: display_path(&source.canonical_path, &canonical),
307                    extension: None,
308                    metadata: ObservedMetadata::default(),
309                    status,
310                })
311                .map(|_| ()),
312        }
313    }
314}
315
316/// Hidden/excluded component skipping for directory descent and files.
317fn skip_component(policy: &CompiledPolicy, source: &SourceRecord, name: &str) -> bool {
318    if policy.component_excluded(name) {
319        return true;
320    }
321    if CompiledPolicy::component_hidden(name)
322        && source.hidden_file_policy == orbok_core::HiddenFilePolicy::Exclude
323    {
324        return true;
325    }
326    false
327}
328
329/// Symlink admission per policy (RFC-003 §12.2): resolved target must
330/// stay inside the source root for FollowWithinSource; Ignore admits
331/// nothing.
332fn symlink_allowed(policy: &CompiledPolicy, root: &Path, path: &Path) -> bool {
333    match policy.symlink_policy {
334        orbok_core::SymlinkPolicy::Ignore => false,
335        orbok_core::SymlinkPolicy::FollowWithinSource
336        | orbok_core::SymlinkPolicy::FollowAllWithWarning => match std::fs::canonicalize(path) {
337            Ok(resolved) => resolved.starts_with(root),
338            Err(_) => false,
339        },
340    }
341}
342
/// UI display path: relative to the source root where possible.
///
/// `root` is stripped from the front of `canonical` only when the match
/// ends on a path-component boundary, so root `/data/src` is not taken
/// to be a prefix of `/data/src2/file`. Leading separators (`/` or `\`)
/// are removed from the remainder. When `canonical` is not under `root`,
/// or equals it, `canonical` is returned unchanged.
fn display_path(root: &str, canonical: &str) -> String {
    fn is_separator(c: char) -> bool {
        c == '/' || c == '\\'
    }

    canonical
        .strip_prefix(root)
        .filter(|rest| {
            // A plain string prefix is not enough: the remainder must
            // begin a new component (or the root itself must end one).
            root.is_empty() || root.ends_with(is_separator) || rest.starts_with(is_separator)
        })
        .map(|rest| rest.trim_start_matches(is_separator))
        .filter(|relative| !relative.is_empty())
        .map_or_else(|| canonical.to_string(), str::to_string)
}
351
/// Unix: device+inode identity (RFC-004 §9.3).
///
/// The key is formatted as `"<device>:<inode>"` and is always present
/// on Unix.
#[cfg(unix)]
fn platform_file_key(metadata: &std::fs::Metadata) -> Option<String> {
    use std::os::unix::fs::MetadataExt;
    let (device, inode) = (metadata.dev(), metadata.ino());
    Some(format!("{device}:{inode}"))
}
358
/// Non-Unix: no platform file identity is derived, so the key is
/// always `None`.
#[cfg(not(unix))]
fn platform_file_key(_metadata: &std::fs::Metadata) -> Option<String> {
    None
}