Skip to main content

orbok_fs/
scanner.rs

1//! File scanner and change detection (RFC-004).
2//!
3//! The scanner is the authority for source traversal and file catalog
4//! state (Appendix A §13: localcache freshness checks happen later, in
5//! workers, never here). Per-file failures never abort the scan
6//! (RFC-004 §16); cancellation leaves the catalog valid because every
7//! file is committed individually.
8
9use crate::hashing::sha256_file;
10use crate::policy::{CompiledPolicy, FileTypeClass, classify_file_type};
11use orbok_core::{FileStatus, JobType, OrbokResult, SourceId, now_iso8601, system_time_iso8601};
12use orbok_db::Catalog;
13use orbok_db::repo::{
14    FileRepository, IndexJobRepository, NewFile, ObservedMetadata, SourceRecord, SourceRepository,
15};
16use std::path::{Path, PathBuf};
17use std::sync::atomic::{AtomicBool, Ordering};
18use std::time::Instant;
19
/// Scan request (RFC-004 §20).
///
/// Describes one scan of one source; the scanner only reads it.
#[derive(Debug, Clone)]
pub struct ScanRequest {
    /// Identifier of the source to scan; resolved against the catalog
    /// when the scan starts.
    pub source_id: SourceId,
    /// Hash even when the metadata fast-check says unchanged.
    pub force_hash: bool,
    /// Queue `extract` jobs for new/stale files (RFC-004 §13).
    pub enqueue_index_jobs: bool,
}
29
/// Per-file classification produced during a scan.
///
/// NOTE(review): the variant names line up with the per-category
/// counters on `ScanSummary`, but nothing in the visible part of this
/// file constructs or matches on this enum — confirm the intended
/// meaning of each variant against its users before relying on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ScanOutcomeKind {
    New,
    Unchanged,
    Stale,
    Unsupported,
    PermissionDenied,
    Failed,
}
40
/// Scan summary counts (RFC-004 §14).
///
/// Starts from `Default` (all zero, not canceled) and is filled in as
/// the scan progresses, so a canceled scan still reports what it did
/// before stopping.
#[derive(Debug, Clone, Default)]
pub struct ScanSummary {
    /// Regular files reached by the traversal, counted before the
    /// include and size filters are applied.
    pub seen_files: u64,
    /// Supported files cataloged for the first time.
    pub new_files: u64,
    /// Already-cataloged files judged unchanged (by metadata or by hash).
    pub unchanged_files: u64,
    /// Already-cataloged files whose content hash differs from the
    /// stored one.
    pub stale_files: u64,
    /// Count reported by the catalog when marking files not seen by this
    /// scan as missing; stays zero when the scan was canceled.
    pub missing_files: u64,
    /// Files written to the catalog with `FileStatus::Unsupported`.
    pub unsupported_files: u64,
    /// Directories that could not be listed and files whose metadata
    /// could not be read because permission was denied.
    pub permission_denied_files: u64,
    /// Directories, directory entries and files that failed for any
    /// other reason.
    pub failed_files: u64,
    /// `extract` jobs enqueued by this scan.
    pub queued_index_jobs: u64,
    /// Elapsed time of the scan in milliseconds.
    pub duration_ms: u64,
    /// Set when the scan stopped early because the cancel flag was seen.
    pub canceled: bool,
}
56
/// The scanner. Holds the catalog handle; one scan call per source.
pub struct Scanner<'a> {
    /// Borrowed catalog; the repositories used during a scan are built
    /// from this handle.
    catalog: &'a Catalog,
}
61
62impl<'a> Scanner<'a> {
63    pub fn new(catalog: &'a Catalog) -> Self {
64        Self { catalog }
65    }
66
67    /// Scan one active source (RFC-004 §10). `cancel` may be flipped by
68    /// the UI at any time; the scan stops at the next file boundary.
69    pub fn scan(&self, request: &ScanRequest, cancel: &AtomicBool) -> OrbokResult<ScanSummary> {
70        let started = Instant::now();
71        let scan_started_at = now_iso8601();
72        let mut summary = ScanSummary::default();
73
74        let sources = SourceRepository::new(self.catalog);
75        let source = sources
76            .get(&request.source_id)?
77            .ok_or(orbok_core::OrbokError::SourceNotFound)?;
78        let policy = CompiledPolicy::from_source(&source);
79        let root = PathBuf::from(&source.canonical_path);
80
81        let files = FileRepository::new(self.catalog);
82        let jobs = IndexJobRepository::new(self.catalog);
83
84        let mut stack = vec![root.clone()];
85        'walk: while let Some(dir) = stack.pop() {
86            if cancel.load(Ordering::Relaxed) {
87                summary.canceled = true;
88                break 'walk;
89            }
90            let entries = match std::fs::read_dir(&dir) {
91                Ok(entries) => entries,
92                Err(e) if e.kind() == std::io::ErrorKind::PermissionDenied => {
93                    summary.permission_denied_files += 1;
94                    continue;
95                }
96                Err(_) => {
97                    summary.failed_files += 1;
98                    continue;
99                }
100            };
101            for entry in entries {
102                if cancel.load(Ordering::Relaxed) {
103                    summary.canceled = true;
104                    break 'walk;
105                }
106                let Ok(entry) = entry else {
107                    summary.failed_files += 1;
108                    continue;
109                };
110                let path = entry.path();
111                let name = entry.file_name().to_string_lossy().into_owned();
112
113                if skip_component(&policy, &source, &name) {
114                    continue;
115                }
116                let Ok(file_type) = entry.file_type() else {
117                    summary.failed_files += 1;
118                    continue;
119                };
120                if file_type.is_symlink() {
121                    // RFC-003 §6.2: v1 default Ignore; FollowWithinSource
122                    // resolves and verifies containment.
123                    if !symlink_allowed(&policy, &root, &path) {
124                        continue;
125                    }
126                }
127                if path.is_dir() {
128                    stack.push(path);
129                    continue;
130                }
131                if !path.is_file() {
132                    continue;
133                }
134                summary.seen_files += 1;
135                let outcome = self.process_file(
136                    &source,
137                    &policy,
138                    &files,
139                    &jobs,
140                    &path,
141                    request,
142                    &mut summary,
143                );
144                match outcome {
145                    Ok(()) => {}
146                    Err(_) => summary.failed_files += 1,
147                }
148            }
149        }
150
151        if !summary.canceled {
152            summary.missing_files =
153                files.mark_missing_unseen(&source.source_id, &scan_started_at)?;
154            sources.touch_scanned(&source.source_id)?;
155        }
156        summary.duration_ms = started.elapsed().as_millis() as u64;
157        tracing::info!(
158            source = source.source_id.as_str(),
159            seen = summary.seen_files,
160            new = summary.new_files,
161            stale = summary.stale_files,
162            missing = summary.missing_files,
163            "scan finished"
164        );
165        Ok(summary)
166    }
167
168    /// Catalog one regular file. Failure here affects this file only
169    /// (RFC-004 §16).
170    #[allow(clippy::too_many_arguments)]
171    fn process_file(
172        &self,
173        source: &SourceRecord,
174        policy: &CompiledPolicy,
175        files: &FileRepository<'_>,
176        jobs: &IndexJobRepository<'_>,
177        path: &Path,
178        request: &ScanRequest,
179        summary: &mut ScanSummary,
180    ) -> OrbokResult<()> {
181        let file_name = path
182            .file_name()
183            .map(|n| n.to_string_lossy().into_owned())
184            .unwrap_or_default();
185        if !policy.file_included(&file_name) {
186            return Ok(());
187        }
188
189        let metadata = match std::fs::metadata(path) {
190            Ok(m) => m,
191            Err(e) if e.kind() == std::io::ErrorKind::PermissionDenied => {
192                summary.permission_denied_files += 1;
193                self.upsert_status_only(source, files, path, FileStatus::PermissionDenied)?;
194                return Ok(());
195            }
196            Err(e) => return Err(e.into()),
197        };
198        if !policy.size_allowed(metadata.len()) {
199            return Ok(()); // over limit: skipped, not cataloged (RFC-004 §10)
200        }
201
202        let supported = classify_file_type(path) == FileTypeClass::Supported;
203        let canonical = path.to_string_lossy().into_owned();
204        let observed = ObservedMetadata {
205            file_size_bytes: metadata.len(),
206            modified_at: metadata.modified().ok().map(system_time_iso8601),
207            platform_file_key: platform_file_key(&metadata),
208            content_hash: None,
209        };
210
211        let existing = files.get_by_path(&source.source_id, &canonical)?;
212        match existing {
213            None => {
214                // New file: hash if supported (content identity required
215                // before "indexed", RFC-004 §9.2).
216                let mut observed = observed;
217                let status = if supported {
218                    observed.content_hash = Some(sha256_file(path)?);
219                    FileStatus::Discovered
220                } else {
221                    summary.unsupported_files += 1;
222                    FileStatus::Unsupported
223                };
224                let record = files.insert(NewFile {
225                    source_id: source.source_id.clone(),
226                    original_path: canonical.clone(),
227                    canonical_path: canonical.clone(),
228                    display_path: display_path(&source.canonical_path, &canonical),
229                    extension: path
230                        .extension()
231                        .map(|e| e.to_string_lossy().to_ascii_lowercase()),
232                    metadata: observed,
233                    status,
234                })?;
235                if supported {
236                    summary.new_files += 1;
237                    if request.enqueue_index_jobs {
238                        jobs.enqueue(
239                            JobType::Extract,
240                            Some(&source.source_id),
241                            Some(&record.file_id),
242                        )?;
243                        summary.queued_index_jobs += 1;
244                    }
245                }
246            }
247            Some(record) => {
248                // RFC-004 §11: a missing file that reappears with the
249                // same content returns to its previous state (Indexed if
250                // it ever was, otherwise Discovered).
251                let restored_status = (record.file_status == FileStatus::Missing).then(|| {
252                    if record.last_indexed_at.is_some() {
253                        FileStatus::Indexed
254                    } else {
255                        FileStatus::Discovered
256                    }
257                });
258                // Fast check (RFC-004 §9.1). `modified_at` strings are
259                // RFC 3339 with nanosecond precision where the
260                // filesystem provides it — same-second overwrites with
261                // unchanged size are still detected (the defect class
262                // fixed in localcache 0.20.0). On coarse-timestamp
263                // filesystems, `force_hash` remains the escape hatch.
264                let metadata_unchanged = record.file_size_bytes == observed.file_size_bytes
265                    && record.modified_at == observed.modified_at;
266                if metadata_unchanged && !request.force_hash {
267                    match restored_status {
268                        Some(status) => {
269                            files.update_observed(&record.file_id, &observed, status)?
270                        }
271                        None => files.touch_seen(&record.file_id)?,
272                    }
273                    summary.unchanged_files += 1;
274                    return Ok(());
275                }
276                // Metadata changed (or forced): confirm with hash.
277                let mut observed = observed;
278                let new_hash = sha256_file(path)?;
279                if record.content_hash.as_deref() == Some(new_hash.as_str()) {
280                    match restored_status {
281                        Some(status) => {
282                            files.update_observed(&record.file_id, &observed, status)?
283                        }
284                        None => files.touch_seen(&record.file_id)?,
285                    }
286                    summary.unchanged_files += 1;
287                    return Ok(());
288                }
289                observed.content_hash = Some(new_hash);
290                let status = match record.file_status {
291                    FileStatus::Indexed | FileStatus::Stale => FileStatus::Stale,
292                    _ => FileStatus::Discovered,
293                };
294                files.update_observed(&record.file_id, &observed, status)?;
295                summary.stale_files += 1;
296                if request.enqueue_index_jobs {
297                    jobs.enqueue(
298                        JobType::Extract,
299                        Some(&source.source_id),
300                        Some(&record.file_id),
301                    )?;
302                    summary.queued_index_jobs += 1;
303                }
304            }
305        }
306        Ok(())
307    }
308
309    fn upsert_status_only(
310        &self,
311        source: &SourceRecord,
312        files: &FileRepository<'_>,
313        path: &Path,
314        status: FileStatus,
315    ) -> OrbokResult<()> {
316        let canonical = path.to_string_lossy().into_owned();
317        match files.get_by_path(&source.source_id, &canonical)? {
318            Some(record) => files.set_status(&record.file_id, status),
319            None => files
320                .insert(NewFile {
321                    source_id: source.source_id.clone(),
322                    original_path: canonical.clone(),
323                    canonical_path: canonical.clone(),
324                    display_path: display_path(&source.canonical_path, &canonical),
325                    extension: None,
326                    metadata: ObservedMetadata::default(),
327                    status,
328                })
329                .map(|_| ()),
330        }
331    }
332}
333
334/// Hidden/excluded component skipping for directory descent and files.
335fn skip_component(policy: &CompiledPolicy, source: &SourceRecord, name: &str) -> bool {
336    if policy.component_excluded(name) {
337        return true;
338    }
339    if CompiledPolicy::component_hidden(name)
340        && source.hidden_file_policy == orbok_core::HiddenFilePolicy::Exclude
341    {
342        return true;
343    }
344    false
345}
346
347/// Symlink admission per policy (RFC-003 §12.2): resolved target must
348/// stay inside the source root for FollowWithinSource; Ignore admits
349/// nothing.
350fn symlink_allowed(policy: &CompiledPolicy, root: &Path, path: &Path) -> bool {
351    match policy.symlink_policy {
352        orbok_core::SymlinkPolicy::Ignore => false,
353        orbok_core::SymlinkPolicy::FollowWithinSource
354        | orbok_core::SymlinkPolicy::FollowAllWithWarning => match std::fs::canonicalize(path) {
355            Ok(resolved) => resolved.starts_with(root),
356            Err(_) => false,
357        },
358    }
359}
360
/// UI display path: relative to the source root where possible.
///
/// Returns `canonical` with the `root` prefix and any leading path
/// separators (`/` or `\`) removed. The full `canonical` string is
/// returned unchanged when:
/// - `canonical` does not start with `root`;
/// - the prefix match ends in the middle of a path component (root
///   `/data/docs` must not relativize `/data/docs2/a.txt`);
/// - nothing remains after stripping (the path is the root itself).
fn display_path(root: &str, canonical: &str) -> String {
    let is_separator = |c: char| matches!(c, '/' | '\\');

    let relative = canonical.strip_prefix(root).and_then(|rest| {
        // The stripped prefix must end exactly at a component boundary.
        let on_boundary =
            root.is_empty() || root.ends_with(is_separator) || rest.starts_with(is_separator);
        let trimmed = rest.trim_start_matches(is_separator);
        (on_boundary && !trimmed.is_empty()).then_some(trimmed)
    });
    relative.unwrap_or(canonical).to_string()
}
369
/// Unix: device+inode identity (RFC-004 §9.3).
///
/// Formats the file's device id and inode number as `"<dev>:<ino>"`.
#[cfg(unix)]
fn platform_file_key(metadata: &std::fs::Metadata) -> Option<String> {
    use std::os::unix::fs::MetadataExt;
    let (device, inode) = (metadata.dev(), metadata.ino());
    Some(format!("{device}:{inode}"))
}

/// Non-Unix targets: no platform file key is produced.
#[cfg(not(unix))]
fn platform_file_key(_metadata: &std::fs::Metadata) -> Option<String> {
    None
}