Skip to main content

musefs_core/
scan.rs

1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::path::{Path, PathBuf};
3
4use musefs_db::convert::usize_from;
5use musefs_db::{Db, Format, NewArt, NewTrack, Tag, TrackArt};
6use musefs_format::{EmbeddedBinaryTag, EmbeddedPicture, Extent, flac, mp3, mp4, ogg, wav};
7
8use crate::byte_budget::ByteBudget;
9use crate::error::Result;
10use crate::freshness::BackingStamp;
11use std::fmt;
12use std::sync::Arc;
13use std::sync::mpsc::sync_channel;
14
// NOTE(review): presumably the per-batch file-count flush threshold paired
// with `BATCH_BYTES`; its use site is outside this view — confirm.
const BATCH_FILES: usize = 256;
/// Default for `ScanOptions::batch_bytes` (64 MiB): the in-flight payload-byte
/// budget and per-batch byte-flush threshold.
const BATCH_BYTES: u64 = 64 * 1024 * 1024;

/// Initial bounded-read window (64 KiB). Big enough to hold most files'
/// metadata in a single read; larger metadata (e.g. embedded cover art) makes
/// the parser report a precise `NeedMore`, and the window is widened to match.
const WINDOW: usize = 64 * 1024;
/// Maximum number of `NeedMore` widenings before the probe gives up on
/// incremental reads and falls back to a full-buffer read.
const MAX_WIDEN_RETRIES: usize = 8;
/// Hard ceiling (64 MiB) on the bytes read while probing one file. Real audio
/// metadata sits far below this, so a file still unparsed at the cap is treated
/// as malformed instead of being read whole into RAM. This protects against a
/// multi-GB file misnamed with an audio extension, and against a corrupt header
/// whose length field requests an enormous `NeedMore` widen.
pub(crate) const MAX_PROBE_BYTES: u64 = 64 * 1024 * 1024;

/// Artwork-size ceiling (16 MiB minus 64 KiB). Enforced at ingest, where
/// oversize scanned art is dropped, and at resolve in
/// `mapping::track_art_to_inputs`, where oversize art from any writer is
/// rejected. Chosen to stay under FLAC's 24-bit metadata-block length while
/// leaving headroom for the picture-block framing.
pub(crate) const MAX_ART_BYTES: usize = (16 << 20) - (64 << 10);

/// Per-frame cap on opaque binary tags, deliberately equal to `MAX_ART_BYTES`.
/// Larger payloads (e.g. a GEOB frame embedding a multi-MB file) are logged and
/// skipped.
const MAX_BINARY_TAG_BYTES: usize = MAX_ART_BYTES;
39
/// Outcome of probing one backing file (produced by `probe_file`).
///
/// - `Probed` carries the parsed metadata plus the `BackingStamp` taken before
///   the probe, which matched the stamp taken after it.
/// - `Unparseable` is a supported-extension file whose bytes did not parse
///   (counted as a scan `failed`).
/// - `Raced` means the file changed under us between the pre- and post-probe
///   `fstat`. The probe may be torn, so nothing is committed for it (#276).
#[derive(Debug)]
enum ProbeOutcome {
    /// Parsed metadata and the stamp shared by both `fstat`s.
    Probed(Probed, BackingStamp),
    /// Supported extension, but the bytes did not parse. A parser panic caught
    /// by `probe_file_caught` is also folded into this variant.
    Unparseable,
    /// The stamp changed during the probe; the result was discarded.
    Raced,
}
50
#[cfg(test)]
thread_local! {
    /// Test-only callback that `probe_file` runs right after its first `fstat`.
    /// Presumably used by tests to change the file mid-probe.
    static AFTER_S1_HOOK: std::cell::RefCell<Option<Box<dyn FnMut()>>> =
        const { std::cell::RefCell::new(None) };
}

/// Run the installed after-first-`fstat` hook, if any (test builds only).
#[cfg(test)]
fn fire_after_s1() {
    AFTER_S1_HOOK.with_borrow_mut(|slot| {
        if let Some(hook) = slot {
            hook();
        }
    });
}

/// Install `f` as this thread's after-first-`fstat` hook, replacing any
/// previous one (test builds only).
#[cfg(test)]
fn set_after_s1_hook(f: impl FnMut() + 'static) {
    AFTER_S1_HOOK.with_borrow_mut(|slot| *slot = Some(Box::new(f)));
}

/// Remove this thread's after-first-`fstat` hook (test builds only).
#[cfg(test)]
fn clear_after_s1_hook() {
    AFTER_S1_HOOK.with_borrow_mut(|slot| *slot = None);
}
72
/// A progress event emitted during a scan or revalidate.
///
/// `Ingested` borrows the current path instead of owning it, which saves one
/// allocation per file in the writer. That saving is negligible next to the
/// existing per-file `to_string_lossy` and DB write. If the borrow ever gets in
/// the way, switch to an owned path rather than contorting the API to keep it.
#[derive(Debug, Clone, Copy)]
pub enum ScanProgress<'a> {
    /// A supported-audio file was found during the walk; `found` is the running
    /// count of collected files (after symlink-target deduplication when
    /// following symlinks).
    Discovered { found: u64 },
    /// The walk (and, for revalidate, the skip-unchanged pass) finished;
    /// `total` files will be ingested and tracked by the determinate bar.
    Walked { total: u64 },
    /// A file was committed. `done` runs 1..=total; `path` is its absolute path.
    Ingested {
        done: u64,
        total: u64,
        path: &'a str,
    },
}
92
93/// UI-agnostic progress callback for [`ScanOptions`]. Invoked only from the
94/// caller's thread (the walk and the single writer), never from probe workers.
95/// The `Send + Sync` bound is not required by today's code; it is deliberate
96/// future-proofing and free here (`indicatif::ProgressBar` is `Send + Sync`).
97#[derive(Clone)]
98pub struct ProgressSink(Arc<dyn for<'a> Fn(ScanProgress<'a>) + Send + Sync>);
99
100impl ProgressSink {
101    pub fn new(f: impl for<'a> Fn(ScanProgress<'a>) + Send + Sync + 'static) -> Self {
102        ProgressSink(Arc::new(f))
103    }
104
105    fn emit(&self, ev: ScanProgress<'_>) {
106        (self.0)(ev);
107    }
108}
109
110impl fmt::Debug for ProgressSink {
111    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
112        f.write_str("ProgressSink")
113    }
114}
115
/// Counters reported at the end of a scan.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ScanStats {
    /// Files ingested. NOTE(review): presumably files successfully probed and
    /// committed — confirm against the writer.
    pub scanned: u64,
    /// Files skipped. The walk's per-extension `SkipTally` breaks this count
    /// down in the end-of-scan log line (#341).
    pub skipped: u64,
    /// Supported-extension files whose bytes did not parse, including a probe
    /// that panicked. Presumably also files that hit I/O errors — confirm.
    pub failed: u64,
    /// Files that changed between the pre- and post-probe `fstat`; nothing is
    /// committed for them.
    pub raced: u64,
}
123
/// Per-extension count of files the directory walk passed over because they
/// are not a supported audio format. Special files such as FIFOs and sockets
/// land here too. It feeds the end-of-scan summary log line (#341), which
/// splits the single `skipped` count by extension. That lets an operator tell
/// expected sidecars (cover art, `.cue`, `.log`, `.nfo`) from genuinely
/// unexpected files. It is kept out of `ScanStats` on purpose: the breakdown
/// only goes to the log and does not change the CLI summary.
#[derive(Debug, Default)]
struct SkipTally {
    total: u64,
    by_ext: BTreeMap<String, u64>,
}

impl SkipTally {
    /// Count one skipped file under its lowercased extension. Files with no
    /// extension, or a non-UTF-8 one, share the `<none>` bucket.
    fn record(&mut self, path: &Path) {
        let bucket = match path.extension().and_then(|e| e.to_str()) {
            Some(ext) => ext.to_ascii_lowercase(),
            None => String::from("<none>"),
        };
        self.total += 1;
        *self.by_ext.entry(bucket).or_default() += 1;
    }

    /// Render the summary line, e.g. `skipped 42: jpg=20, cue=10, log=8,
    /// <none>=4`. Buckets are listed by descending count, and equal counts are
    /// ordered by extension name. Returns `None` when nothing was skipped.
    fn summary(&self) -> Option<String> {
        if self.total == 0 {
            return None;
        }
        let mut ranked: Vec<(&String, &u64)> = self.by_ext.iter().collect();
        ranked.sort_by(|(ext_a, n_a), (ext_b, n_b)| {
            n_b.cmp(n_a).then_with(|| ext_a.cmp(ext_b))
        });
        let parts: Vec<String> = ranked
            .into_iter()
            .map(|(ext, count)| format!("{ext}={count}"))
            .collect();
        Some(format!("skipped {}: {}", self.total, parts.join(", ")))
    }
}
165
/// Counters reported at the end of a revalidate pass.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RevalidateStats {
    /// NOTE(review): presumably tracks whose backing file changed and was
    /// re-probed — confirm against the revalidate implementation.
    pub updated: u64,
    /// Presumably tracks left alone by the skip-unchanged pass — confirm.
    pub unchanged: u64,
    /// Presumably tracks removed because their backing file is gone — confirm.
    pub pruned: u64,
    /// Files that failed to probe (presumably the same meaning as
    /// `ScanStats::failed`).
    pub failed: u64,
    /// Files that changed between the pre- and post-probe `fstat`; nothing is
    /// committed for them.
    pub raced: u64,
}
174
/// True when `path`'s extension equals `ext`, ignoring ASCII case. A path with
/// no extension, or a non-UTF-8 one, never matches.
fn has_ext(path: &Path, ext: &str) -> bool {
    match path.extension().and_then(|os| os.to_str()) {
        Some(actual) => actual.eq_ignore_ascii_case(ext),
        None => false,
    }
}

/// True if `path` has an extension for a format the scanner can probe.
fn is_supported_audio(path: &Path) -> bool {
    const EXTS: [&str; 8] = ["flac", "mp3", "m4a", "m4b", "ogg", "oga", "opus", "wav"];
    EXTS.iter().any(|ext| has_ext(path, ext))
}
192
193fn collect_audio(
194    root: &Path,
195    out: &mut Vec<PathBuf>,
196    follow_symlinks: bool,
197) -> std::io::Result<SkipTally> {
198    collect_audio_with(root, out, follow_symlinks, None)
199}
200
201fn collect_audio_with(
202    root: &Path,
203    out: &mut Vec<PathBuf>,
204    follow_symlinks: bool,
205    progress: Option<&ProgressSink>,
206) -> std::io::Result<SkipTally> {
207    let mut visited = HashSet::new();
208    let mut files_visited = HashSet::new();
209    let mut tally = SkipTally::default();
210    if follow_symlinks && let Ok(meta) = std::fs::metadata(root) {
211        visited.insert(dir_key(&meta));
212    }
213    collect_audio_inner(
214        root,
215        out,
216        follow_symlinks,
217        &mut visited,
218        &mut files_visited,
219        &mut tally,
220        progress,
221    )?;
222    Ok(tally)
223}
224
225fn collect_audio_inner(
226    root: &Path,
227    out: &mut Vec<PathBuf>,
228    follow_symlinks: bool,
229    visited: &mut HashSet<(u64, u64)>,
230    files_visited: &mut HashSet<(u64, u64)>,
231    tally: &mut SkipTally,
232    progress: Option<&ProgressSink>,
233) -> std::io::Result<()> {
234    // A single unreadable subtree or vanished entry must drop only that entry,
235    // not abort the whole ingest — matching the log-and-continue resilience of
236    // the symlink arm below and `probe_file` (#534). The top-level root is
237    // validated upstream by `scan_directory_with`'s canonicalize, so a genuine
238    // bad root is still reported there.
239    let entries = match std::fs::read_dir(root) {
240        Ok(entries) => entries,
241        Err(e) => {
242            log::warn!("skipping directory {}: {e}", root.display());
243            return Ok(());
244        }
245    };
246    for entry in entries {
247        let entry = match entry {
248            Ok(entry) => entry,
249            Err(e) => {
250                log::warn!("skipping unreadable entry in {}: {e}", root.display());
251                continue;
252            }
253        };
254        let path = entry.path();
255        let ftype = match entry.file_type() {
256            Ok(ftype) => ftype,
257            Err(e) => {
258                log::warn!("skipping {}: {e}", path.display());
259                continue;
260            }
261        };
262        if ftype.is_dir() {
263            descend(
264                &path,
265                out,
266                follow_symlinks,
267                visited,
268                files_visited,
269                tally,
270                progress,
271            )?;
272        } else if ftype.is_file() {
273            if is_supported_audio(&path) {
274                push_file(&path, out, follow_symlinks, files_visited, None, progress);
275            } else {
276                tally.record(&path);
277            }
278        } else if ftype.is_symlink() {
279            if !follow_symlinks {
280                // Routine and expected (symlinks are off by default); a library
281                // sitting next to symlinked dirs would otherwise flood stderr at
282                // the default `warn` floor. The end-of-scan skip tally still
283                // surfaces what was passed over.
284                log::debug!(
285                    "skipping symlink {} (pass --follow-symlinks to scan it)",
286                    path.display()
287                );
288                continue;
289            }
290            match std::fs::metadata(&path) {
291                Ok(meta) if meta.is_dir() => {
292                    descend(
293                        &path,
294                        out,
295                        follow_symlinks,
296                        visited,
297                        files_visited,
298                        tally,
299                        progress,
300                    )?;
301                }
302                Ok(meta) if meta.is_file() => {
303                    if is_supported_audio(&path) {
304                        push_file(
305                            &path,
306                            out,
307                            follow_symlinks,
308                            files_visited,
309                            Some(&meta),
310                            progress,
311                        );
312                    } else {
313                        tally.record(&path);
314                    }
315                }
316                Ok(_) => {}
317                Err(e) => {
318                    log::warn!("skipping broken symlink {}: {e}", path.display());
319                }
320            }
321        } else {
322            // A direct special file (FIFO, char/block device, socket) — not a
323            // file, dir, or symlink. The audio invariant is unaffected (it is
324            // never opened), but tally it so it surfaces in the skip breakdown
325            // rather than vanishing without a trace, matching unsupported
326            // regular files above (#544).
327            tally.record(&path);
328        }
329    }
330    Ok(())
331}
332
333fn descend(
334    path: &Path,
335    out: &mut Vec<PathBuf>,
336    follow_symlinks: bool,
337    visited: &mut HashSet<(u64, u64)>,
338    files_visited: &mut HashSet<(u64, u64)>,
339    tally: &mut SkipTally,
340    progress: Option<&ProgressSink>,
341) -> std::io::Result<()> {
342    if !follow_symlinks {
343        return collect_audio_inner(
344            path,
345            out,
346            follow_symlinks,
347            visited,
348            files_visited,
349            tally,
350            progress,
351        );
352    }
353    let meta = match std::fs::metadata(path) {
354        Ok(m) => m,
355        Err(e) => {
356            log::warn!("skipping directory {}: {e}", path.display());
357            return Ok(());
358        }
359    };
360    if !visited.insert(dir_key(&meta)) {
361        log::warn!("skipping symlink cycle at {}", path.display());
362        return Ok(());
363    }
364    collect_audio_inner(
365        path,
366        out,
367        follow_symlinks,
368        visited,
369        files_visited,
370        tally,
371        progress,
372    )
373}
374
/// Identity of a filesystem object as `(device, inode)`. Used as the key for
/// both the visited-directory and visited-file sets during the walk.
fn dir_key(meta: &std::fs::Metadata) -> (u64, u64) {
    use std::os::unix::fs::MetadataExt as _;
    let device = meta.dev();
    let inode = meta.ino();
    (device, inode)
}
379
380/// Collect one supported-extension file into `out`, deduplicating by target
381/// identity when following symlinks so a real file and a symlink to it (or a
382/// file reached via two symlink paths) are ingested once. `known_meta` is the
383/// already-resolved target metadata when the caller has it (the symlink arm),
384/// avoiding a second `stat`. Dedup is best-effort: if the target cannot be
385/// `stat`ed we push it and let the probe pipeline count it rather than dropping
386/// it silently.
387fn push_file(
388    path: &Path,
389    out: &mut Vec<PathBuf>,
390    follow_symlinks: bool,
391    files_visited: &mut HashSet<(u64, u64)>,
392    known_meta: Option<&std::fs::Metadata>,
393    progress: Option<&ProgressSink>,
394) {
395    if !follow_symlinks {
396        out.push(path.to_path_buf());
397        if let Some(p) = progress {
398            p.emit(ScanProgress::Discovered {
399                found: out.len() as u64,
400            });
401        }
402        return;
403    }
404    let key = match known_meta {
405        Some(m) => Some(dir_key(m)),
406        None => std::fs::metadata(path).ok().map(|m| dir_key(&m)),
407    };
408    match key {
409        Some(k) if !files_visited.insert(k) => {
410            log::debug!("skipping duplicate backing target {}", path.display());
411        }
412        _ => {
413            out.push(path.to_path_buf());
414            if let Some(p) = progress {
415                p.emit(ScanProgress::Discovered {
416                    found: out.len() as u64,
417                });
418            }
419        }
420    }
421}
422
/// A backing file parsed into the fields a track row needs, plus its raw
/// `(key, value)` tags to seed.
#[derive(Debug)]
pub(crate) struct Probed {
    /// Container/codec detected for the file.
    format: Format,
    /// Byte offset at which the audio payload starts.
    audio_offset: u64,
    /// Length in bytes of the audio payload starting at `audio_offset`.
    audio_length: u64,
    /// Text tags as `(key, value)` pairs. For MP3 and WAV this also includes
    /// the `promoted` tags returned alongside the binary tags.
    tags: Vec<(String, String)>,
    /// Embedded pictures (cover art).
    pictures: Vec<EmbeddedPicture>,
    /// Opaque binary tags; each payload counts toward `payload_weight`.
    binary_tags: Vec<EmbeddedBinaryTag>,
    /// FLAC STREAMINFO/SEEKTABLE as (kind, body) pairs; empty for other formats.
    structural_blocks: Vec<(String, Vec<u8>)>,
}
436
437/// Assemble a WAV [`Probed`] from located audio bounds, reading tags and pictures
438/// from `prefix`. Shared by the bounded, full-buffer, and ceiling probe paths.
439fn wav_probed(prefix: &[u8], bounds: &wav::WavBounds) -> Probed {
440    let (binary_tags, promoted) = wav::read_binary_tags(prefix);
441    let mut tags = wav::read_tags(prefix);
442    tags.extend(promoted);
443    Probed {
444        format: Format::Wav,
445        audio_offset: bounds.audio_offset,
446        audio_length: bounds.audio_length,
447        tags,
448        pictures: wav::read_pictures(prefix),
449        binary_tags,
450        structural_blocks: Vec::new(),
451    }
452}
453
454/// Full-buffer probe (legacy path). Retained as the reference implementation the
455/// bounded path is checked against (see the equivalence property test).
456pub(crate) fn probe_full(path: &Path, bytes: &[u8]) -> Option<Probed> {
457    if has_ext(path, "flac") {
458        let scan = flac::locate_audio(bytes).ok()?;
459        let (structural_blocks, binary_tags) = flac::split_preserved(&scan.preserved);
460        Some(Probed {
461            format: Format::Flac,
462            audio_offset: scan.audio_offset,
463            audio_length: scan.audio_length,
464            tags: flac::read_vorbis_comments(bytes).unwrap_or_default(),
465            pictures: flac::read_pictures(bytes).unwrap_or_default(),
466            binary_tags,
467            structural_blocks,
468        })
469    } else if has_ext(path, "mp3") {
470        let bounds = mp3::locate_audio(bytes).ok()?;
471        let (binary_tags, promoted) = mp3::read_binary_tags(bytes);
472        let mut tags = mp3::read_tags(bytes);
473        tags.extend(promoted);
474        Some(Probed {
475            format: Format::Mp3,
476            audio_offset: bounds.audio_offset,
477            audio_length: bounds.audio_length,
478            tags,
479            pictures: mp3::read_pictures(bytes),
480            binary_tags,
481            structural_blocks: Vec::new(),
482        })
483    } else if has_ext(path, "m4a") || has_ext(path, "m4b") {
484        let bounds = mp4::locate_audio(bytes).ok()?;
485        let (pictures, art_drops) = mp4::read_pictures_reporting(bytes, MAX_ART_BYTES);
486        let (binary_tags, bin_drops) = mp4::read_binary_tags_reporting(bytes, MAX_BINARY_TAG_BYTES);
487        log_mp4_oversize_drops(path, &art_drops, &bin_drops);
488        Some(Probed {
489            format: Format::M4a,
490            audio_offset: bounds.audio_offset,
491            audio_length: bounds.audio_length,
492            tags: mp4::read_tags(bytes),
493            pictures,
494            binary_tags,
495            structural_blocks: Vec::new(),
496        })
497    } else if has_ext(path, "ogg") || has_ext(path, "oga") || has_ext(path, "opus") {
498        let scan = ogg::locate_audio(bytes).ok()?;
499        let format = match scan.codec {
500            ogg::Codec::Opus => Format::Opus,
501            ogg::Codec::Vorbis => Format::Vorbis,
502            ogg::Codec::OggFlac => Format::OggFlac,
503        };
504        Some(Probed {
505            format,
506            audio_offset: scan.audio_offset,
507            audio_length: scan.audio_length,
508            tags: ogg::read_tags(bytes).unwrap_or_default(),
509            pictures: ogg::read_pictures(bytes).unwrap_or_default(),
510            binary_tags: Vec::new(),
511            structural_blocks: Vec::new(),
512        })
513    } else if has_ext(path, "wav") {
514        let bounds = wav::locate_audio(bytes).ok()?;
515        Some(wav_probed(bytes, &bounds))
516    } else {
517        None
518    }
519}
520
521/// Read `[0, len)` of `path` into a buffer, counting the read. A short read at
522/// EOF is fine (`len` may exceed the file size).
523fn read_window(file: &std::fs::File, len: usize) -> std::io::Result<Vec<u8>> {
524    use std::os::unix::fs::FileExt;
525    let mut buf = vec![0u8; len];
526    let n = file.read_at(&mut buf, 0)?;
527    buf.truncate(n);
528    crate::metrics::on_scan_read(n as u64);
529    Ok(buf)
530}
531
532/// Read the file's last 128 bytes (for the MP3 ID3v1 trailer check), or `None`
533/// if the file is shorter than 128 bytes.
534fn read_tail_128(file: &std::fs::File, file_len: u64) -> std::io::Result<Option<[u8; 128]>> {
535    if file_len < 128 {
536        return Ok(None);
537    }
538    use std::os::unix::fs::FileExt;
539    let mut buf = [0u8; 128];
540    file.read_exact_at(&mut buf, file_len - 128)?;
541    crate::metrics::on_scan_read(128);
542    Ok(Some(buf))
543}
544
545/// Bounded probe of one backing file: open once, fstat before and after the
546/// probe, and report `Raced` when the file moved mid-probe — so the stored
547/// stamp and the probed bytes provably share one inode held still across the
548/// probe. Never reads the audio payload (M4A uses the seek reader;
549/// front-anchored formats read only the metadata extent).
550///
551/// Returns `ProbeOutcome::Unparseable` for a supported-extension file that does
552/// not parse (counted as `failed`) and `ProbeOutcome::Raced` if the file
553/// changed under us.
554fn probe_file(path: &Path, window: usize) -> std::io::Result<ProbeOutcome> {
555    let file = std::fs::File::open(path)?;
556    crate::metrics::on_scan_open();
557    let s1 = BackingStamp::from_metadata(&file.metadata()?);
558    #[cfg(test)]
559    fire_after_s1();
560
561    let probed = probe_body(path, &file, s1.size, window)?;
562
563    let s2 = BackingStamp::from_metadata(&file.metadata()?);
564    if s1 != s2 {
565        log::warn!("skipping {}: changed during probe", path.display());
566        return Ok(ProbeOutcome::Raced);
567    }
568    Ok(match probed {
569        Some(p) => ProbeOutcome::Probed(p, s1),
570        None => ProbeOutcome::Unparseable,
571    })
572}
573
574/// Run [`probe_file`] under a panic boundary so a residual parser panic — one
575/// the format-layer alloc guards (`id3v2_alloc_safe` and friends) don't catch —
576/// drops just that file instead of unwinding the scan worker thread. An unwound
577/// worker would skip its `failed.fetch_add`, and a crafted directory could kill
578/// every worker, closing the channel so the writer reports success while
579/// silently truncating the rest of the library (#425). A caught panic is logged
580/// and folded into `ProbeOutcome::Unparseable`, which the worker already counts
581/// as `failed`. Mirrors the read path's `read_outcome` boundary (#359).
582fn probe_file_caught(path: &Path, window: usize) -> std::io::Result<ProbeOutcome> {
583    match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| probe_file(path, window))) {
584        Ok(res) => res,
585        Err(payload) => {
586            let msg = payload
587                .downcast_ref::<&str>()
588                .copied()
589                .or_else(|| payload.downcast_ref::<String>().map(String::as_str))
590                .unwrap_or("<non-string panic>");
591            log::error!(
592                "scan worker panicked probing {}: {msg}; counting as failed",
593                path.display()
594            );
595            Ok(ProbeOutcome::Unparseable)
596        }
597    }
598}
599
/// The per-format metadata dispatch for one already-opened backing file, over
/// its first `file_len` bytes. Split out of `probe_file` so the fstat-sandwich
/// wrapper stays legible. Never reads the audio payload (M4A uses the seek
/// reader; front-anchored formats read only the metadata extent). Returns
/// `Ok(None)` for an unsupported/unparseable file; I/O errors from the reads
/// propagate as `Err`.
///
/// Front-anchored formats (FLAC, MP3, Ogg, WAV) are probed in growing windows:
/// 1. Start with `window` bytes.
/// 2. Widen on each `Probe::NeedMore`, at most `MAX_WIDEN_RETRIES` times and
///    never past `MAX_PROBE_BYTES`.
/// 3. Fall back to `probe_full` over everything up to that ceiling.
///
/// A WAV longer than the ceiling gets one more chance via
/// `wav::locate_audio_at_ceiling`.
fn probe_body(
    path: &Path,
    file: &std::fs::File,
    file_len: u64,
    window: usize,
) -> std::io::Result<Option<Probed>> {
    // M4A: seek reader, never touches mdat.
    // NOTE(review): no MAX_PROBE_BYTES cap is applied on this path here; any
    // size limit must live in `mp4::read_structure_from` — confirm.
    if has_ext(path, "m4a") || has_ext(path, "m4b") {
        // `&File` implements `Read + Seek`, so a mutable binding to the shared
        // reference is enough for the seek reader.
        let mut f = file;
        let scan = match mp4::read_structure_from(&mut f, file_len) {
            Ok(s) => s,
            Err(e) => {
                log::warn!("skipping {}: {e}", path.display());
                return Ok(None);
            }
        };
        let (pictures, art_drops) = mp4::read_pictures_reporting(&scan.moov, MAX_ART_BYTES);
        let (binary_tags, bin_drops) =
            mp4::read_binary_tags_reporting(&scan.moov, MAX_BINARY_TAG_BYTES);
        log_mp4_oversize_drops(path, &art_drops, &bin_drops);
        return Ok(Some(Probed {
            format: Format::M4a,
            audio_offset: scan.mdat_payload_offset,
            audio_length: scan.mdat_payload_len,
            tags: mp4::read_tags(&scan.moov),
            pictures,
            binary_tags,
            structural_blocks: Vec::new(),
        }));
    }

    // Front-anchored formats: read a window, widen on NeedMore. Only the MP3
    // arm of probe_prefix consumes the ID3v1 tail, and dispatch is by
    // extension — so only .mp3 pays the tail read (#67).
    let tail = if has_ext(path, "mp3") {
        read_tail_128(file, file_len)?
    } else {
        None
    };
    // Never read past the probe ceiling, however large the file or whatever a
    // (possibly corrupt) header asks for via `NeedMore`.
    let probe_cap = file_len.min(MAX_PROBE_BYTES);
    let mut want = usize_from((window as u64).min(probe_cap));
    let mut prefix = read_window(file, want)?;
    // Note: a prefix widened on the final iteration is not re-run through
    // `probe_prefix`; it falls through to the full-buffer fallback below.
    for _ in 0..MAX_WIDEN_RETRIES {
        match probe_prefix(path, &prefix, file_len, tail.as_ref()) {
            Probe::Done(p) => return Ok(Some(p)),
            Probe::Skip => {
                log::warn!("skipping {}: no parseable audio metadata", path.display());
                return Ok(None);
            }
            Probe::NeedMore(up_to) => {
                // Read everything we're willing to probe? Widening can't help.
                if want as u64 >= probe_cap {
                    break;
                }
                // Grow to at least `up_to` (capped at `probe_cap`), always making
                // progress (`+1`), then retry.
                want = usize_from(up_to.min(probe_cap))
                    .max(want + 1)
                    .min(usize_from(probe_cap));
                prefix = read_window(file, want)?;
            }
        }
    }
    // Fallback: full-buffer probe over the bytes we were willing to read.
    if (prefix.len() as u64) < probe_cap {
        prefix = read_window(file, usize_from(probe_cap))?;
    }
    if let Some(p) = probe_full(path, &prefix) {
        return Ok(Some(p));
    }
    // A WAV whose `data` payload runs past the probe ceiling fails the strict
    // full-buffer parse (the payload isn't present to bound), yet its `fmt `/`data`
    // headers sit at the front: trust the declared bounds and serve the audio,
    // accepting the loss of any tag chunks trailing the payload.
    if has_ext(path, "wav")
        && file_len > MAX_PROBE_BYTES
        && let Ok(bounds) = wav::locate_audio_at_ceiling(&prefix, file_len)
    {
        return Ok(Some(wav_probed(&prefix, &bounds)));
    }
    // Distinguish "gave up at the ceiling" from "parsed everything and failed"
    // in the log.
    if file_len > MAX_PROBE_BYTES {
        log::warn!(
            "skipping {}: no parseable metadata within first {MAX_PROBE_BYTES} bytes",
            path.display()
        );
    } else {
        log::warn!("skipping {}: no parseable audio metadata", path.display());
    }
    Ok(None)
}
697
/// Outcome of a single bounded dispatch attempt against the current `prefix`.
enum Probe {
    /// The format's bounded reader found everything it needed in the prefix.
    Done(Probed),
    /// More input is needed. `probe_body` widens its read (taken from the
    /// start of the file) to at least this many bytes, capped at the probe
    /// ceiling.
    NeedMore(u64),
    /// The bytes did not parse, or the extension is not one `probe_prefix`
    /// handles.
    Skip,
}
704
705/// Dispatch the front-anchored formats against `prefix` + `file_len`.
706fn probe_prefix(path: &Path, prefix: &[u8], file_len: u64, tail: Option<&[u8; 128]>) -> Probe {
707    if has_ext(path, "flac") {
708        match flac::read_metadata_bounded(prefix) {
709            Ok(Extent::Complete(meta)) => {
710                let (structural_blocks, binary_tags) = flac::split_preserved(&meta.preserved);
711                Probe::Done(Probed {
712                    format: Format::Flac,
713                    audio_offset: meta.audio_offset,
714                    audio_length: file_len - meta.audio_offset,
715                    tags: flac::read_vorbis_comments(prefix).unwrap_or_default(),
716                    pictures: flac::read_pictures(prefix).unwrap_or_default(),
717                    binary_tags,
718                    structural_blocks,
719                })
720            }
721            Ok(Extent::NeedMore { up_to }) => Probe::NeedMore(up_to),
722            Err(_) => Probe::Skip,
723        }
724    } else if has_ext(path, "mp3") {
725        match mp3::locate_audio_bounded(prefix, file_len, tail) {
726            Ok(Extent::Complete(b)) => {
727                let (binary_tags, promoted) = mp3::read_binary_tags(prefix);
728                let mut tags = mp3::read_tags(prefix);
729                tags.extend(promoted);
730                Probe::Done(Probed {
731                    format: Format::Mp3,
732                    audio_offset: b.audio_offset,
733                    audio_length: b.audio_length,
734                    tags,
735                    pictures: mp3::read_pictures(prefix),
736                    binary_tags,
737                    structural_blocks: Vec::new(),
738                })
739            }
740            Ok(Extent::NeedMore { up_to }) => Probe::NeedMore(up_to),
741            Err(_) => Probe::Skip,
742        }
743    } else if has_ext(path, "ogg") || has_ext(path, "oga") || has_ext(path, "opus") {
744        match ogg::read_metadata_bounded(prefix, file_len) {
745            Ok(Extent::Complete(header)) => {
746                let format = match header.codec {
747                    ogg::Codec::Opus => Format::Opus,
748                    ogg::Codec::Vorbis => Format::Vorbis,
749                    ogg::Codec::OggFlac => Format::OggFlac,
750                };
751                Probe::Done(Probed {
752                    format,
753                    audio_offset: header.audio_offset,
754                    audio_length: file_len - header.audio_offset,
755                    tags: ogg::read_tags(prefix).unwrap_or_default(),
756                    pictures: ogg::read_pictures(prefix).unwrap_or_default(),
757                    binary_tags: Vec::new(),
758                    structural_blocks: Vec::new(),
759                })
760            }
761            Ok(Extent::NeedMore { up_to }) => Probe::NeedMore(up_to),
762            Err(_) => Probe::Skip,
763        }
764    } else if has_ext(path, "wav") {
765        match wav::locate_audio_bounded(prefix, file_len) {
766            Ok(Extent::Complete(b)) => Probe::Done(wav_probed(prefix, &b)),
767            Ok(Extent::NeedMore { up_to }) => Probe::NeedMore(up_to),
768            Err(_) => Probe::Skip,
769        }
770    } else {
771        Probe::Skip
772    }
773}
774
/// How much checksum work a scan does per file. `ScanOptions::default()`
/// selects `Fingerprint`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChecksumTier {
    /// No checksums (legacy behavior).
    None,
    /// Compute the cheap fingerprint only (rides the probe).
    Fingerprint,
    /// Fingerprint plus an eager full-file SHA-256.
    Full,
}
785
/// How a fingerprint match is confirmed before a retarget (used for the
/// refind match described on `ScanOptions::strictness`).
/// `ScanOptions::default()` selects `Auto`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MatchStrictness {
    /// Confirm with the full hash when the candidate has one; else trust the
    /// fingerprint.
    Auto,
    /// Fingerprint match is always sufficient; never read the full file.
    Fast,
    /// Require a full-hash match; refuse the retarget if the candidate has no
    /// stored content_hash.
    Strict,
}
798
/// Knobs for a scan. `jobs == 0` means "use available parallelism".
#[derive(Debug, Clone)]
pub struct ScanOptions {
    /// Worker-thread count; `0` means "use available parallelism" (resolved
    /// by `effective_jobs`).
    pub jobs: usize,
    /// Initial probe read window in bytes; widened on `NeedMore`.
    pub window: usize,
    /// In-flight payload-byte budget (pictures, binary tags and FLAC
    /// structural blocks — see `payload_weight`) and per-batch byte-flush
    /// threshold.
    pub batch_bytes: u64,
    /// Follow symlinks during collection. Off by default: symlinks are logged
    /// (at `debug`) and skipped, which keeps the walk immune to
    /// directory-symlink cycles.
    pub follow_symlinks: bool,
    /// Optional progress callback. `None` (the default) disables reporting.
    pub progress: Option<ProgressSink>,
    /// Which checksums to compute and store this scan.
    pub checksum: ChecksumTier,
    /// How a refind fingerprint match is confirmed before retargeting.
    pub strictness: MatchStrictness,
}

impl Default for ScanOptions {
    /// Defaults:
    /// - auto-sized jobs;
    /// - the 64 KiB `WINDOW`;
    /// - the 64 MiB `BATCH_BYTES` budget;
    /// - symlinks not followed;
    /// - no progress reporting;
    /// - fingerprint-only checksums;
    /// - `Auto` match strictness.
    fn default() -> Self {
        Self {
            jobs: 0,
            window: WINDOW,
            batch_bytes: BATCH_BYTES,
            follow_symlinks: false,
            progress: None,
            checksum: ChecksumTier::Fingerprint,
            strictness: MatchStrictness::Auto,
        }
    }
}
831
/// Resolves the requested worker count. A non-zero `jobs` is used verbatim;
/// zero asks the OS for its available parallelism and falls back to a single
/// worker when that query fails.
fn effective_jobs(jobs: usize) -> usize {
    match jobs {
        0 => std::thread::available_parallelism().map_or(1, std::num::NonZero::get),
        explicit => explicit,
    }
}
838
/// One probed file ready to write, plus its payload weight for backpressure.
struct Unit {
    /// Path written as the track's `backing_path`.
    abs_path: String,
    /// Validation stamp captured alongside the probe.
    stamp: BackingStamp,
    probed: Probed,
    /// `payload_weight(&probed)`: reserved from the byte budget by the worker
    /// and released by the writer once the unit's batch commits.
    weight: u64,
    /// Cheap fingerprint; `None` under `ChecksumTier::None`.
    fingerprint: Option<String>,
    /// Full-file hash; only set under `ChecksumTier::Full` when the read succeeded.
    content_hash: Option<String>,
}
848
849/// In-memory byte weight of a `Probed`, used for batch backpressure
850/// (`ScanOptions::batch_bytes`). Counts every buffered payload — pictures plus FLAC
851/// structural blocks and binary tags — so large preserved blocks can't slip the
852/// budget the way picture-only accounting did.
853fn payload_weight(p: &Probed) -> u64 {
854    let pictures: u64 = p.pictures.iter().map(|pic| pic.data.len() as u64).sum();
855    let binary: u64 = p.binary_tags.iter().map(|t| t.payload.len() as u64).sum();
856    let structural: u64 = p
857        .structural_blocks
858        .iter()
859        .map(|(_, body)| body.len() as u64)
860        .sum();
861    pictures + binary + structural
862}
863
/// Mirrors the DB's universal `tags.key` `CHECK` exactly. A key must be
/// non-empty and free of every byte below 0x20, which are the control
/// characters the DB rejects through its GLOB range. NUL, the DB's documented
/// blind spot, is rejected here as well. DEL (0x7F) and high/non-ASCII bytes
/// pass, as they do in the DB.
///
/// This is deliberately looser than the strict Vorbis `is_valid_key`, which also
/// forbids `=`, 0x7E, 0x7F and non-ASCII. Using that check here would wrongly
/// discard legal MP3/M4A custom keys containing `=`, `:` or spaces.
fn key_passes_floor(key: &str) -> bool {
    const LOWEST_ALLOWED: u8 = 0x20;
    match key.as_bytes() {
        [] => false,
        bytes => !bytes.iter().any(|&b| b < LOWEST_ALLOWED),
    }
}
874
875/// Drops embedded pictures over [`MAX_ART_BYTES`], logging each so a cover that
876/// vanishes from the synthesized view is explained rather than silent (#284).
877/// Filtering here, before the caller enumerates, keeps stored art ordinals
878/// gap-free. Note: the mp4 `covr` path caps oversize art earlier, inside
879/// `mp4::read_pictures`, so those drops never reach this filter.
880fn accept_pictures(abs_path: &str, pictures: Vec<EmbeddedPicture>) -> Vec<EmbeddedPicture> {
881    pictures
882        .into_iter()
883        .filter(|p| {
884            if p.data.len() > MAX_ART_BYTES {
885                log::warn!(
886                    "{abs_path}: dropping embedded {} art ({} bytes), over the {MAX_ART_BYTES}-byte cap",
887                    p.mime,
888                    p.data.len(),
889                );
890                return false;
891            }
892            true
893        })
894        .collect()
895}
896
897/// Filters embedded binary tags to those worth storing, logging oversize drops
898/// (#284). Empty payloads carry nothing to serve, so they are dropped silently;
899/// payloads over [`MAX_BINARY_TAG_BYTES`] are a lossy drop and get a warning.
900fn accept_binary_tags(abs_path: &str, tags: Vec<EmbeddedBinaryTag>) -> Vec<musefs_db::BinaryTag> {
901    tags.into_iter()
902        .filter(|b| {
903            if b.payload.len() > MAX_BINARY_TAG_BYTES {
904                log::warn!(
905                    "{abs_path}: dropping binary tag {} ({} bytes), over the {MAX_BINARY_TAG_BYTES}-byte cap",
906                    b.key,
907                    b.payload.len(),
908                );
909                return false;
910            }
911            !b.payload.is_empty()
912        })
913        .enumerate()
914        .map(|(ordinal, b)| musefs_db::BinaryTag {
915            key: b.key,
916            payload: b.payload,
917            ordinal: ordinal as u64,
918        })
919        .collect()
920}
921
922/// Logs each oversized mp4 `covr` image / binary `----` value that the format
923/// layer skipped before materialization (#343). These drops happen inside
924/// `mp4::read_pictures` / `mp4::read_binary_tags` — earlier than the `accept_*`
925/// ingest filters that log the lossy drops for the other formats (#284), and
926/// deliberately so, to avoid building a large image out of a large `moov` — so
927/// they are surfaced here at probe time, mirroring the `accept_*` message shape.
928fn log_mp4_oversize_drops(path: &Path, art: &[mp4::OversizeDrop], binary: &[mp4::OversizeDrop]) {
929    for d in art {
930        log::warn!(
931            "{}: dropping embedded {} art ({} bytes), over the {MAX_ART_BYTES}-byte cap",
932            path.display(),
933            d.descriptor,
934            d.bytes,
935        );
936    }
937    for d in binary {
938        log::warn!(
939            "{}: dropping binary tag {} ({} bytes), over the {MAX_BINARY_TAG_BYTES}-byte cap",
940            path.display(),
941            d.descriptor,
942            d.bytes,
943        );
944    }
945}
946
/// The write surface `ingest_into` drives: satisfied by both a direct `&Db`
/// (its methods take `&self`) and a batched `&mut BulkWriter` (`&mut self`), so
/// the upsert body lives in exactly one place. Each method delegates through the
/// concrete type path (`Db::`/`BulkWriter::`), which names the inherent method
/// unambiguously so the same-named trait method can't recurse into itself.
///
/// Every method takes `&mut self` so the `&mut BulkWriter` impl can forward to
/// its `&mut self` inherent methods; the `&Db` impl never needs the mutability.
/// Besides the writes, `ingest_unit` uses the read-side methods
/// (`track_exists_at`, `tracks_by_fingerprint`) to detect relocated files.
trait TrackSink {
    /// Inserts or updates the track row described by `t` (presumably keyed on
    /// `backing_path`) and returns its id.
    fn upsert_track(&mut self, t: &NewTrack) -> musefs_db::Result<i64>;
    /// Replaces the track's text tags with `tags`.
    fn replace_tags(&mut self, track_id: i64, tags: &[Tag]) -> musefs_db::Result<()>;
    /// Sets the track's opaque binary tags.
    fn set_binary_tags(
        &mut self,
        track_id: i64,
        tags: &[musefs_db::BinaryTag],
    ) -> musefs_db::Result<()>;
    /// Sets the track's preserved structural blocks.
    fn set_structural_blocks(
        &mut self,
        track_id: i64,
        blocks: &[musefs_db::StructuralBlock],
    ) -> musefs_db::Result<()>;
    /// Stores one art image and returns the art id to link against.
    fn upsert_art(&mut self, a: &NewArt) -> musefs_db::Result<i64>;
    /// Sets the track's ordered art links.
    fn set_track_art(&mut self, track_id: i64, items: &[TrackArt]) -> musefs_db::Result<()>;
    /// Records the track's fingerprint and full-file hash (either may be `None`).
    fn set_track_checksums(
        &mut self,
        track_id: i64,
        fingerprint: Option<&str>,
        content_hash: Option<&str>,
    ) -> musefs_db::Result<()>;
    /// Whether a track row already exists with backing path `path`.
    fn track_exists_at(&mut self, path: &str) -> musefs_db::Result<bool>;
    /// Track rows recorded with fingerprint `fp`: the relocation candidates.
    fn tracks_by_fingerprint(&mut self, fp: &str) -> musefs_db::Result<Vec<musefs_db::Track>>;
    /// Repoints existing track `id` at `new_backing_path`, refreshing its
    /// validation stamp, audio bounds and checksums.
    // One parameter per value the retarget rewrites; a wrapper struct would
    // exist for this single call path only.
    #[allow(clippy::too_many_arguments)]
    fn retarget_track(
        &mut self,
        id: i64,
        new_backing_path: &str,
        stamp: BackingStamp,
        audio_offset: u64,
        audio_length: u64,
        fingerprint: Option<&str>,
        content_hash: Option<&str>,
    ) -> musefs_db::Result<()>;
}
987
// Direct-connection sink: every method forwards straight to the inherent `Db`
// method of the same name. `Db` methods take `&self`; the `&mut self` receiver
// (a `&mut &Db`) coerces down to `&Db` at each call.
impl TrackSink for &Db {
    fn upsert_track(&mut self, t: &NewTrack) -> musefs_db::Result<i64> {
        Db::upsert_track(self, t)
    }
    fn replace_tags(&mut self, track_id: i64, tags: &[Tag]) -> musefs_db::Result<()> {
        Db::replace_tags(self, track_id, tags)
    }
    fn set_binary_tags(
        &mut self,
        track_id: i64,
        tags: &[musefs_db::BinaryTag],
    ) -> musefs_db::Result<()> {
        Db::set_binary_tags(self, track_id, tags)
    }
    fn set_structural_blocks(
        &mut self,
        track_id: i64,
        blocks: &[musefs_db::StructuralBlock],
    ) -> musefs_db::Result<()> {
        Db::set_structural_blocks(self, track_id, blocks)
    }
    fn upsert_art(&mut self, a: &NewArt) -> musefs_db::Result<i64> {
        Db::upsert_art(self, a)
    }
    fn set_track_art(&mut self, track_id: i64, items: &[TrackArt]) -> musefs_db::Result<()> {
        Db::set_track_art(self, track_id, items)
    }
    fn set_track_checksums(
        &mut self,
        track_id: i64,
        fingerprint: Option<&str>,
        content_hash: Option<&str>,
    ) -> musefs_db::Result<()> {
        Db::set_track_checksums(self, track_id, fingerprint, content_hash)
    }
    // Existence is derived from a full row lookup; only presence matters here.
    fn track_exists_at(&mut self, path: &str) -> musefs_db::Result<bool> {
        Ok(Db::get_track_by_path(self, path)?.is_some())
    }
    fn tracks_by_fingerprint(&mut self, fp: &str) -> musefs_db::Result<Vec<musefs_db::Track>> {
        Db::tracks_by_fingerprint(self, fp)
    }
    // The DB layer takes the stamp as separate size / mtime / ctime values,
    // so the `BackingStamp` is unpacked here.
    fn retarget_track(
        &mut self,
        id: i64,
        new_backing_path: &str,
        stamp: BackingStamp,
        audio_offset: u64,
        audio_length: u64,
        fingerprint: Option<&str>,
        content_hash: Option<&str>,
    ) -> musefs_db::Result<()> {
        Db::retarget_track(
            self,
            id,
            new_backing_path,
            stamp.size,
            stamp.mtime_ns,
            stamp.ctime_ns,
            audio_offset,
            audio_length,
            fingerprint,
            content_hash,
        )
    }
}
1053
// Batched sink: every method forwards to the inherent `BulkWriter` method of
// the same name. The caller commits the writer afterwards (see the batch flush
// in `run_pipeline`). The fully qualified `musefs_db::BulkWriter::` path is
// what keeps these from resolving back to the trait methods.
impl TrackSink for &mut musefs_db::BulkWriter<'_> {
    fn upsert_track(&mut self, t: &NewTrack) -> musefs_db::Result<i64> {
        musefs_db::BulkWriter::upsert_track(self, t)
    }
    fn replace_tags(&mut self, track_id: i64, tags: &[Tag]) -> musefs_db::Result<()> {
        musefs_db::BulkWriter::replace_tags(self, track_id, tags)
    }
    fn set_binary_tags(
        &mut self,
        track_id: i64,
        tags: &[musefs_db::BinaryTag],
    ) -> musefs_db::Result<()> {
        musefs_db::BulkWriter::set_binary_tags(self, track_id, tags)
    }
    fn set_structural_blocks(
        &mut self,
        track_id: i64,
        blocks: &[musefs_db::StructuralBlock],
    ) -> musefs_db::Result<()> {
        musefs_db::BulkWriter::set_structural_blocks(self, track_id, blocks)
    }
    fn upsert_art(&mut self, a: &NewArt) -> musefs_db::Result<i64> {
        musefs_db::BulkWriter::upsert_art(self, a)
    }
    fn set_track_art(&mut self, track_id: i64, items: &[TrackArt]) -> musefs_db::Result<()> {
        musefs_db::BulkWriter::set_track_art(self, track_id, items)
    }
    fn set_track_checksums(
        &mut self,
        track_id: i64,
        fingerprint: Option<&str>,
        content_hash: Option<&str>,
    ) -> musefs_db::Result<()> {
        musefs_db::BulkWriter::set_track_checksums(self, track_id, fingerprint, content_hash)
    }
    // Existence is derived from a full row lookup; only presence matters here.
    fn track_exists_at(&mut self, path: &str) -> musefs_db::Result<bool> {
        Ok(musefs_db::BulkWriter::get_track_by_path(self, path)?.is_some())
    }
    fn tracks_by_fingerprint(&mut self, fp: &str) -> musefs_db::Result<Vec<musefs_db::Track>> {
        musefs_db::BulkWriter::tracks_by_fingerprint(self, fp)
    }
    // The DB layer takes the stamp as separate size / mtime / ctime values,
    // so the `BackingStamp` is unpacked here.
    fn retarget_track(
        &mut self,
        id: i64,
        new_backing_path: &str,
        stamp: BackingStamp,
        audio_offset: u64,
        audio_length: u64,
        fingerprint: Option<&str>,
        content_hash: Option<&str>,
    ) -> musefs_db::Result<()> {
        musefs_db::BulkWriter::retarget_track(
            self,
            id,
            new_backing_path,
            stamp.size,
            stamp.mtime_ns,
            stamp.ctime_ns,
            audio_offset,
            audio_length,
            fingerprint,
            content_hash,
        )
    }
}
1119
1120/// Upsert a track from a probed backing file into `w`: write the track row,
1121/// replace its seeded tags, and ingest its embedded art (capped, deduped,
1122/// clamped). The single source of the ingest body shared by `ingest` (direct
1123/// `&Db`), `ingest_unit` (production batch path), and `ingest_bulk` (test-only
1124/// `BulkWriter` wrapper). Takes `probed` by value so
1125/// picture/binary-tag/structural-block bytes are moved, not cloned (#68).
1126fn ingest_into(
1127    mut w: impl TrackSink,
1128    abs_path: &str,
1129    stamp: BackingStamp,
1130    probed: Probed,
1131    fingerprint: Option<&str>,
1132    content_hash: Option<&str>,
1133) -> Result<()> {
1134    let track_id = w.upsert_track(&NewTrack {
1135        backing_path: abs_path.to_string(),
1136        format: probed.format,
1137        audio_offset: probed.audio_offset,
1138        audio_length: probed.audio_length,
1139        backing_size: stamp.size,
1140        backing_mtime_ns: stamp.mtime_ns,
1141        backing_ctime_ns: stamp.ctime_ns,
1142    })?;
1143    w.set_track_checksums(track_id, fingerprint, content_hash)?;
1144
1145    let mut tags = Vec::new();
1146    let mut ordinals: HashMap<String, u64> = HashMap::new();
1147    for (key, value) in probed.tags {
1148        if !key_passes_floor(&key) {
1149            continue;
1150        }
1151        let ord = ordinals.entry(key.clone()).or_insert(0);
1152        tags.push(Tag::new(&key, &value, *ord));
1153        *ord += 1;
1154    }
1155    w.replace_tags(track_id, &tags)?;
1156
1157    let binary_tags = accept_binary_tags(abs_path, probed.binary_tags);
1158    w.set_binary_tags(track_id, &binary_tags)?;
1159
1160    let mut sb_ordinals: HashMap<String, u64> = HashMap::new();
1161    let structural_blocks: Vec<musefs_db::StructuralBlock> = probed
1162        .structural_blocks
1163        .into_iter()
1164        .map(|(kind, body)| {
1165            let ord = sb_ordinals.entry(kind.clone()).or_insert(0);
1166            let sb = musefs_db::StructuralBlock {
1167                kind,
1168                ordinal: *ord,
1169                body,
1170            };
1171            *ord += 1;
1172            sb
1173        })
1174        .collect();
1175    w.set_structural_blocks(track_id, &structural_blocks)?;
1176
1177    let mut track_arts = Vec::new();
1178    for (ordinal, pic) in accept_pictures(abs_path, probed.pictures)
1179        .into_iter()
1180        .enumerate()
1181    {
1182        let art_id = w.upsert_art(&NewArt {
1183            mime: pic.mime,
1184            width: (pic.width != 0).then_some(pic.width),
1185            height: (pic.height != 0).then_some(pic.height),
1186            data: pic.data,
1187        })?;
1188        let picture_type = pic.picture_type.get();
1189        track_arts.push(TrackArt {
1190            art_id,
1191            picture_type,
1192            description: pic.description,
1193            ordinal: ordinal as u64,
1194        });
1195    }
1196    w.set_track_art(track_id, &track_arts)?;
1197    Ok(())
1198}
1199
/// Decide how to ingest one probed unit: retarget a relocated row when a unique
/// fingerprint match exists whose backing file is gone, otherwise ingest fresh.
/// The strict/auto confirm hash, if computed here, is persisted on the retarget
/// (so a fingerprint-tier strict move doesn't re-read the file next scan).
///
/// Flow:
/// 1. A row already at `unit.abs_path` => plain re-ingest via [`ingest_into`].
/// 2. Otherwise, if the unit has a fingerprint, collect rows sharing it whose
///    backing path stats as `NotFound`. Any other stat error skips that
///    candidate with a warning instead of treating it as moved.
/// 3. Exactly one such candidate => confirm per `strictness`. If confirmed,
///    call `retarget_track` with the candidate's id and the new path, stamp,
///    audio bounds and checksums. Zero or several candidates, or a failed
///    confirm, fall through to a fresh ingest.
///
/// Note: if a confirm hash is computed but the confirm fails, the fresh ingest
/// stores `unit.content_hash`, not that computed hash.
///
/// # Errors
/// Only sink errors propagate. A failed hash read is logged and treated as an
/// unconfirmed match.
fn ingest_unit(mut w: impl TrackSink, unit: Unit, strictness: MatchStrictness) -> Result<()> {
    // Known path => ordinary upsert (re-scan of an in-place file).
    if w.track_exists_at(&unit.abs_path)? {
        return ingest_into(
            w,
            &unit.abs_path,
            unit.stamp,
            unit.probed,
            unit.fingerprint.as_deref(),
            unit.content_hash.as_deref(),
        );
    }
    if let Some(fp) = unit.fingerprint.as_deref() {
        // Relocation candidates: same fingerprint, backing file verifiably gone.
        let candidates: Vec<musefs_db::Track> = w
            .tracks_by_fingerprint(fp)?
            .into_iter()
            .filter(|t| match std::fs::metadata(&t.backing_path) {
                Err(e) if e.kind() == std::io::ErrorKind::NotFound => true,
                Ok(_) => false,
                Err(e) => {
                    log::warn!(
                        "skipping retarget candidate {}: cannot stat backing path ({e})",
                        t.backing_path
                    );
                    false
                }
            })
            .collect();
        if candidates.len() == 1 {
            let cand = &candidates[0];
            // Does this strictness need a full-hash confirm against this candidate?
            let needs_full = match strictness {
                MatchStrictness::Fast => false,
                MatchStrictness::Auto | MatchStrictness::Strict => cand.content_hash.is_some(),
            };
            // The new file's full hash: worker-computed if present, else read now
            // (the file is present — it's the move destination). A read error here
            // must not abort the whole scan — log it and fall through with `None`,
            // which fails the confirm and inserts this unit fresh.
            let new_hash: Option<String> = match (&unit.content_hash, needs_full) {
                (Some(h), _) => Some(h.clone()),
                (None, true) => match full_file_hash(std::path::Path::new(&unit.abs_path)) {
                    Ok(h) => Some(h),
                    Err(e) => {
                        log::warn!(
                            "hash confirm failed for {}: {e}; inserting fresh",
                            unit.abs_path
                        );
                        None
                    }
                },
                (None, false) => None,
            };
            let confirmed = match strictness {
                MatchStrictness::Fast => true,
                MatchStrictness::Auto | MatchStrictness::Strict => match &cand.content_hash {
                    // Strict with no stored hash => refuse; Auto with none => fingerprint is enough.
                    None => matches!(strictness, MatchStrictness::Auto),
                    Some(stored) => new_hash.as_deref() == Some(stored.as_str()),
                },
            };
            // Re-check the destination path right before writing. Since the check
            // at the top, the only sink call is `tracks_by_fingerprint`
            // (presumably read-only), so this looks defensive — NOTE(review).
            if confirmed && !w.track_exists_at(&unit.abs_path)? {
                w.retarget_track(
                    cand.id,
                    &unit.abs_path,
                    unit.stamp,
                    unit.probed.audio_offset,
                    unit.probed.audio_length,
                    unit.fingerprint.as_deref(),
                    new_hash.as_deref(),
                )?;
                return Ok(());
            }
            if !confirmed {
                log::warn!(
                    "fingerprint match for {} not confirmed (strictness {:?}); inserting fresh",
                    unit.abs_path,
                    strictness,
                );
            }
        } else if candidates.len() > 1 {
            // Several vanished rows share the fingerprint: refuse to guess.
            log::warn!(
                "ambiguous fingerprint match for {} ({} missing candidates); inserting fresh",
                unit.abs_path,
                candidates.len(),
            );
        }
    }
    // No usable match: ingest as a new track.
    ingest_into(
        w,
        &unit.abs_path,
        unit.stamp,
        unit.probed,
        unit.fingerprint.as_deref(),
        unit.content_hash.as_deref(),
    )
}
1301
1302/// Upsert a track from a probed backing file through a direct `&Db`. Thin
1303/// wrapper over [`ingest_into`]; the `oracle`/non-bulk scan path.
1304fn ingest(db: &Db, abs_path: &str, meta: &std::fs::Metadata, probed: Probed) -> Result<()> {
1305    ingest_into(
1306        db,
1307        abs_path,
1308        BackingStamp::from_metadata(meta),
1309        probed,
1310        None,
1311        None,
1312    )
1313}
1314
/// Test-only counterpart of [`ingest`] that writes through a batch
/// `BulkWriter` instead of a direct connection, again without checksums. The
/// caller supplies the `stamp` it captured with a single `fstat`. Production
/// batches go through `ingest_unit`, which threads per-unit checksums, so this
/// wrapper exists only for the hardening tests' bulk-writer coverage.
#[cfg(test)]
fn ingest_bulk(
    bw: &mut musefs_db::BulkWriter<'_>,
    abs_path: &str,
    stamp: BackingStamp,
    probed: Probed,
) -> Result<()> {
    let (fingerprint, content_hash) = (None, None);
    ingest_into(bw, abs_path, stamp, probed, fingerprint, content_hash)
}
1328
1329/// Public entry: parallel-probe / single-writer scan of `root`.
1330///
1331/// Insert/update a track row for each supported audio file (FLAC, MP3, M4A,
1332/// Opus, Vorbis, FLAC-in-Ogg) under `root` (with audio bounds and validation
1333/// stamps), seeding its tags from the file's existing metadata. `root` may be
1334/// a single audio file (only that file is scanned) or a directory (walked
1335/// recursively). Files whose extension is not a supported audio format
1336/// increment `ScanStats::skipped` and are tallied by extension for the
1337/// end-of-scan summary log line (#341); supported-extension files with a
1338/// per-file I/O or parse error increment `ScanStats::failed` and do not abort
1339/// the scan.
1340pub fn scan_directory_with(db: &Db, root: &Path, opts: &ScanOptions) -> Result<ScanStats> {
1341    // Canonicalize the root once. With symlinks unfollowed (the default) every
1342    // path the walk yields is then already absolute and symlink-free — i.e.
1343    // canonical — so the workers need not canonicalize each probed file (#440).
1344    let canon = std::fs::canonicalize(root)?;
1345    let root = canon.as_path();
1346    let mut files = Vec::new();
1347    let mut tally = SkipTally::default();
1348    if root.is_file() {
1349        if is_supported_audio(root) {
1350            files.push(root.to_path_buf());
1351        } else {
1352            tally.record(root);
1353        }
1354    } else {
1355        tally = collect_audio_with(
1356            root,
1357            &mut files,
1358            opts.follow_symlinks,
1359            opts.progress.as_ref(),
1360        )?;
1361    }
1362    if let Some(p) = &opts.progress {
1363        p.emit(ScanProgress::Walked {
1364            total: files.len() as u64,
1365        });
1366    }
1367    db.apply_bulk_pragmas_self()?; // scan-scoped tuning on the caller's connection
1368    let mut stats = run_pipeline(db, files, opts)?;
1369    // skipped is tallied during the walk, not the pipeline
1370    stats.skipped = tally.total;
1371    // Per-extension breakdown of the skip count, so a large `skipped` is
1372    // diagnosable (#341). Log-only: never folded into `stats`/the CLI summary.
1373    if let Some(summary) = tally.summary() {
1374        log::warn!("{summary}");
1375    }
1376    Ok(stats)
1377}
1378
1379/// Back-compat shim used by the CLI and existing tests.
1380pub fn scan_directory(db: &Db, root: &Path) -> Result<ScanStats> {
1381    scan_directory_with(db, root, &ScanOptions::default())
1382}
1383
1384/// Probe `files` across `jobs` workers (no DB access) and write the results from a
1385/// single writer (this thread) in batched transactions. Per-file errors are
1386/// counted, not fatal.
1387fn run_pipeline(db: &Db, files: Vec<PathBuf>, opts: &ScanOptions) -> Result<ScanStats> {
1388    use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
1389
1390    let jobs = effective_jobs(opts.jobs);
1391    let total = files.len() as u64;
1392    let progress = opts.progress.as_ref();
1393    let window = opts.window;
1394    let follow_symlinks = opts.follow_symlinks;
1395    let tier = opts.checksum;
1396    let strictness = opts.strictness;
1397    let cap = opts.batch_bytes;
1398    let budget = Arc::new(ByteBudget::new(cap));
1399    let failed = Arc::new(AtomicU64::new(0));
1400    let raced = Arc::new(AtomicU64::new(0));
1401
1402    // Work queue: a shared slice with an atomic cursor — each worker claims the
1403    // next index with a single relaxed `fetch_add`, no per-file lock contention.
1404    let files = Arc::new(files);
1405    let cursor = Arc::new(AtomicUsize::new(0));
1406    let (tx, rx) = sync_channel::<Unit>(jobs * 2);
1407
1408    let mut workers = Vec::with_capacity(jobs);
1409    for _ in 0..jobs {
1410        let files = Arc::clone(&files);
1411        let cursor = Arc::clone(&cursor);
1412        let tx = tx.clone();
1413        let budget = Arc::clone(&budget);
1414        let failed = Arc::clone(&failed);
1415        let raced = Arc::clone(&raced);
1416        workers.push(std::thread::spawn(move || {
1417            loop {
1418                let i = cursor.fetch_add(1, Ordering::Relaxed);
1419                let Some(path) = files.get(i) else { break };
1420                match probe_file_caught(path, window) {
1421                    Ok(ProbeOutcome::Probed(probed, stamp)) => {
1422                        // No-follow paths are canonical by construction (the root
1423                        // was canonicalized up front); only the opt-in symlink walk
1424                        // can yield a path with a symlink component to resolve (#440).
1425                        let abs_path = if follow_symlinks {
1426                            match std::fs::canonicalize(path) {
1427                                Ok(abs) => abs.to_string_lossy().into_owned(),
1428                                Err(e) => {
1429                                    log::warn!("skipping {}: {e}", path.display());
1430                                    failed.fetch_add(1, Ordering::Relaxed);
1431                                    continue;
1432                                }
1433                            }
1434                        } else {
1435                            path.to_string_lossy().into_owned()
1436                        };
1437                        let weight = payload_weight(&probed);
1438                        budget.acquire(weight); // backpressure on in-flight art bytes
1439                        let fingerprint = match tier {
1440                            ChecksumTier::None => None,
1441                            ChecksumTier::Fingerprint | ChecksumTier::Full => {
1442                                Some(fingerprint_of(&probed))
1443                            }
1444                        };
1445                        let content_hash = match tier {
1446                            ChecksumTier::Full => {
1447                                match full_file_hash(std::path::Path::new(&abs_path)) {
1448                                    Ok(h) => Some(h),
1449                                    Err(e) => {
1450                                        log::warn!("content hash failed for {abs_path}: {e}");
1451                                        None
1452                                    }
1453                                }
1454                            }
1455                            _ => None,
1456                        };
1457                        let unit = Unit {
1458                            abs_path,
1459                            stamp,
1460                            probed,
1461                            weight,
1462                            fingerprint,
1463                            content_hash,
1464                        };
1465                        if tx.send(unit).is_err() {
1466                            budget.release(weight);
1467                            break;
1468                        }
1469                    }
1470                    Ok(ProbeOutcome::Unparseable) => {
1471                        failed.fetch_add(1, Ordering::Relaxed);
1472                    }
1473                    Err(e) => {
1474                        log::warn!("skipping {}: {e}", path.display());
1475                        failed.fetch_add(1, Ordering::Relaxed);
1476                    }
1477                    Ok(ProbeOutcome::Raced) => {
1478                        raced.fetch_add(1, Ordering::Relaxed);
1479                    }
1480                }
1481            }
1482        }));
1483    }
1484    drop(tx); // close the channel once all clones (workers) finish
1485
1486    // Writer: this thread. Batch by file count and accumulated art bytes.
1487    let mut scanned = 0u64;
1488    let mut batch: Vec<Unit> = Vec::new();
1489    let mut batch_bytes = 0u64;
1490    let flush = |batch: &mut Vec<Unit>, batch_bytes: &mut u64, scanned: &mut u64| -> Result<()> {
1491        if batch.is_empty() {
1492            return Ok(());
1493        }
1494        let mut bw = db.bulk_writer()?;
1495        // Budget weights are released only after commit, and ingest_unit consumes
1496        // the Probed — capture each unit's weight before the move (#68).
1497        let mut released = 0u64;
1498        // `Ingested` reports committed files, so buffer the paths and emit only
1499        // after `bw.commit()` succeeds — a failed commit aborts the scan without
1500        // having advanced the progress bar past unpersisted files.
1501        let mut committed: Vec<String> = Vec::new();
1502        for unit in batch.drain(..) {
1503            released += unit.weight;
1504            committed.push(unit.abs_path.clone());
1505            ingest_unit(&mut bw, unit, strictness)?;
1506        }
1507        bw.commit()?;
1508        for abs_path in committed {
1509            *scanned += 1;
1510            if let Some(p) = progress {
1511                p.emit(ScanProgress::Ingested {
1512                    done: *scanned,
1513                    total,
1514                    path: &abs_path,
1515                });
1516            }
1517        }
1518        // Coalesce into one wakeup: the commit frees the whole batch, so a single
1519        // release avoids waking every blocked producer once per committed file.
1520        budget.release(released);
1521        *batch_bytes = 0;
1522        Ok(())
1523    };
1524
1525    // Drain the channel, batching by file count and accumulated art bytes. The
1526    // budget cap equals the byte-flush threshold, so a worker calling
1527    // `budget.acquire` (which it does *before* `send`) could block while the
1528    // writer's pending batch sits just below the threshold — if the writer then
1529    // parked on a blocking `recv`, neither side could make progress (the held
1530    // budget is never released, the batch never reaches the threshold). To avoid
1531    // that, whenever the channel momentarily drains we flush the pending batch —
1532    // releasing the budget so blocked producers proceed — *before* blocking on the
1533    // next item.
1534    loop {
1535        match rx.try_recv() {
1536            Ok(unit) => {
1537                batch_bytes += unit.weight;
1538                batch.push(unit);
1539                if batch.len() >= BATCH_FILES || batch_bytes >= cap {
1540                    flush(&mut batch, &mut batch_bytes, &mut scanned)?;
1541                }
1542            }
1543            Err(std::sync::mpsc::TryRecvError::Empty) => {
1544                flush(&mut batch, &mut batch_bytes, &mut scanned)?;
1545                match rx.recv() {
1546                    Ok(unit) => {
1547                        batch_bytes += unit.weight;
1548                        batch.push(unit);
1549                        if batch.len() >= BATCH_FILES || batch_bytes >= cap {
1550                            flush(&mut batch, &mut batch_bytes, &mut scanned)?;
1551                        }
1552                    }
1553                    Err(_) => break, // all workers finished; channel closed
1554                }
1555            }
1556            Err(std::sync::mpsc::TryRecvError::Disconnected) => break,
1557        }
1558    }
1559    flush(&mut batch, &mut batch_bytes, &mut scanned)?;
1560    // A fatal flush error above returns via `?` *before* this join, abandoning the
1561    // worker threads — acceptable because a DB-write failure aborts the whole scan.
1562    // On the success path every worker has already exited (the work queue drained
1563    // and `drop(tx)` closed the channel), so these joins return promptly.
1564    for w in workers {
1565        let _ = w.join();
1566    }
1567
1568    Ok(ScanStats {
1569        scanned,
1570        skipped: 0, // counted at walk time; filled in by scan_directory_with
1571        failed: failed.load(Ordering::Relaxed),
1572        raced: raced.load(Ordering::Relaxed),
1573    })
1574}
1575
1576/// Test/oracle only: scan using the legacy whole-file probe (`probe_full`). The
1577/// equivalence property compares this against the bounded `scan_directory`.
1578#[doc(hidden)]
1579pub fn scan_directory_full_oracle(db: &Db, root: &Path) -> Result<ScanStats> {
1580    let mut files = Vec::new();
1581    let mut skipped = 0u64;
1582    if root.is_file() {
1583        if is_supported_audio(root) {
1584            files.push(root.to_path_buf());
1585        } else {
1586            skipped += 1;
1587        }
1588    } else {
1589        skipped += collect_audio(root, &mut files, false)?.total;
1590    }
1591    let mut stats = ScanStats {
1592        scanned: 0,
1593        skipped,
1594        failed: 0,
1595        raced: 0,
1596    };
1597    for path in files {
1598        let bytes = std::fs::read(&path)?;
1599        let Some(probed) = probe_full(&path, &bytes) else {
1600            stats.failed += 1;
1601            continue;
1602        };
1603        let meta = std::fs::metadata(&path)?;
1604        let abs = std::fs::canonicalize(&path)?;
1605        ingest(db, &abs.to_string_lossy(), &meta, probed)?;
1606        stats.scanned += 1;
1607    }
1608    Ok(stats)
1609}
1610
1611/// Re-validate an already-scanned library root: re-probe only files whose
1612/// size/mtime/ctime changed since the last scan (skipping unchanged ones so external
1613/// tag edits in the DB are preserved), then delete tracks **under `root`** whose
1614/// backing file is gone (cascading tags/art links) and garbage-collect
1615/// now-unreferenced art. `root` may be a single audio file (only that file is
1616/// revalidated) or a directory (walked recursively). Pruning is scoped to
1617/// `root`, so revalidating one library root never removes tracks belonging to
1618/// another.
1619///
1620/// Uses `opts` to configure the probe pipeline (e.g. `jobs` for parallelism).
1621/// The skip-unchanged decision runs on the calling thread before workers are
1622/// dispatched, so workers remain DB-free. A `stat`/`canonicalize` failure on a
1623/// candidate during the skip pass is counted in `failed` (and the file is left
1624/// for the next revalidation) rather than re-probed or pruned.
1625pub fn revalidate_with(db: &Db, root: &Path, opts: &ScanOptions) -> Result<RevalidateStats> {
1626    // Canonicalize once; see scan_directory_with (#440). The prune pass below reuses
1627    // this canonical root for its `starts_with` scope check.
1628    let canon = std::fs::canonicalize(root)?;
1629    let root = canon.as_path();
1630    let mut files = Vec::new();
1631    if root.is_file() {
1632        if is_supported_audio(root) {
1633            files.push(root.to_path_buf());
1634        }
1635    } else {
1636        collect_audio_with(
1637            root,
1638            &mut files,
1639            opts.follow_symlinks,
1640            opts.progress.as_ref(),
1641        )?;
1642    }
1643    db.apply_bulk_pragmas_self()?;
1644
1645    // Main-thread pre-dispatch skip pass: load existing
1646    // (path -> stamp, id, format, has_fingerprint, has_content_hash) once,
1647    // stat each candidate, keep only changed files. Workers stay DB-free.
1648    let existing: HashMap<String, (crate::freshness::BackingStamp, i64, Format, bool, bool)> = db
1649        .list_tracks()?
1650        .into_iter()
1651        .map(|t| {
1652            (
1653                t.backing_path.clone(),
1654                (
1655                    crate::freshness::BackingStamp::from_track(&t),
1656                    t.id,
1657                    t.format,
1658                    t.fingerprint.is_some(),
1659                    t.content_hash.is_some(),
1660                ),
1661            )
1662        })
1663        .collect();
1664    // Legacy backfill (spec §1): FLAC tracks scanned under V1 have no structural
1665    // blocks. Re-scan them even when the backing file is unchanged so the V2
1666    // structural store + binary tags get populated by the ingest path.
1667    let have_structural = db.track_ids_with_structural_blocks()?;
1668
1669    let mut unchanged = 0u64;
1670    let mut skip_failed = 0u64;
1671    let mut changed: Vec<PathBuf> = Vec::new();
1672    for path in files {
1673        let meta = match std::fs::metadata(&path) {
1674            Ok(meta) => meta,
1675            Err(e) => {
1676                log::warn!("skipping {}: {e}", path.display());
1677                skip_failed += 1;
1678                continue;
1679            }
1680        };
1681        let key = if opts.follow_symlinks {
1682            match std::fs::canonicalize(&path) {
1683                Ok(abs) => abs.to_string_lossy().into_owned(),
1684                Err(e) => {
1685                    log::warn!("skipping {}: {e}", path.display());
1686                    skip_failed += 1;
1687                    continue;
1688                }
1689            }
1690        } else {
1691            path.to_string_lossy().into_owned()
1692        };
1693        if let Some((stamp, id, format, has_fingerprint, has_content_hash)) =
1694            existing.get(&key).copied()
1695        {
1696            let needs_backfill = format == Format::Flac && !have_structural.contains(&id);
1697            let needs_checksum = match opts.checksum {
1698                ChecksumTier::None => false,
1699                ChecksumTier::Fingerprint => !has_fingerprint,
1700                ChecksumTier::Full => !has_fingerprint || !has_content_hash,
1701            };
1702            if crate::freshness::BackingStamp::from_metadata(&meta) == stamp
1703                && !needs_backfill
1704                && !needs_checksum
1705            {
1706                unchanged += 1;
1707                continue;
1708            }
1709        }
1710        changed.push(path);
1711    }
1712
1713    if let Some(p) = &opts.progress {
1714        p.emit(ScanProgress::Walked {
1715            total: changed.len() as u64,
1716        });
1717    }
1718
1719    let scan = run_pipeline(db, changed, opts)?;
1720
1721    // Prune + GC on the writer connection (single-threaded), unchanged from before.
1722    let canon_root = root;
1723    let mut pruned = 0u64;
1724    for track in db.list_tracks()? {
1725        if !Path::new(&track.backing_path).starts_with(canon_root) {
1726            continue;
1727        }
1728        if let Err(e) = std::fs::metadata(&track.backing_path)
1729            && e.kind() == std::io::ErrorKind::NotFound
1730        {
1731            db.delete_track(track.id)?;
1732            pruned += 1;
1733        }
1734    }
1735    db.gc_orphan_art()?;
1736
1737    Ok(RevalidateStats {
1738        updated: scan.scanned,
1739        unchanged,
1740        pruned,
1741        failed: scan.failed + skip_failed,
1742        raced: scan.raced,
1743    })
1744}
1745
1746/// Back-compat shim used by the CLI and existing tests.
1747pub fn revalidate(db: &Db, root: &Path) -> Result<RevalidateStats> {
1748    revalidate_with(db, root, &ScanOptions::default())
1749}
1750
1751/// SHA-256 of the probe's parsed output, hex-encoded. This is the cheap content
1752/// fingerprint: deterministic per file (the parsed `Probed` is window- and
1753/// format-independent), and excludes every filesystem-stamp field. Length-prefix
1754/// every variable-length field so concatenation can't alias.
1755pub(crate) fn fingerprint_of(p: &Probed) -> String {
1756    use sha2::{Digest, Sha256};
1757    // Inner fn (not a closure) so it doesn't hold a borrow of `h` across the
1758    // direct `h.update(...)` calls below.
1759    fn feed(h: &mut Sha256, bytes: &[u8]) {
1760        h.update((bytes.len() as u64).to_le_bytes());
1761        h.update(bytes);
1762    }
1763    let mut h = Sha256::new();
1764    feed(&mut h, p.format.as_str().as_bytes());
1765    h.update(p.audio_offset.to_le_bytes());
1766    h.update(p.audio_length.to_le_bytes());
1767    h.update((p.tags.len() as u64).to_le_bytes());
1768    for (k, v) in &p.tags {
1769        feed(&mut h, k.as_bytes());
1770        feed(&mut h, v.as_bytes());
1771    }
1772    h.update((p.pictures.len() as u64).to_le_bytes());
1773    for pic in &p.pictures {
1774        feed(&mut h, pic.mime.as_bytes());
1775        h.update(u64::from(pic.picture_type.get()).to_le_bytes());
1776        feed(&mut h, pic.description.as_bytes());
1777        h.update(u64::from(pic.width).to_le_bytes());
1778        h.update(u64::from(pic.height).to_le_bytes());
1779        feed(&mut h, &pic.data);
1780    }
1781    h.update((p.binary_tags.len() as u64).to_le_bytes());
1782    for bt in &p.binary_tags {
1783        feed(&mut h, bt.key.as_bytes());
1784        feed(&mut h, &bt.payload);
1785    }
1786    h.update((p.structural_blocks.len() as u64).to_le_bytes());
1787    for (kind, body) in &p.structural_blocks {
1788        feed(&mut h, kind.as_bytes());
1789        feed(&mut h, body);
1790    }
1791    format!("{:x}", base16ct::HexDisplay(&h.finalize()))
1792}
1793
1794/// Streaming SHA-256 of an entire backing file, hex-encoded. The authoritative
1795/// content identity; reads the whole file, so callers gate it on the `Full` tier
1796/// or a strict-confirmation need.
1797pub(crate) fn full_file_hash(path: &std::path::Path) -> std::io::Result<String> {
1798    use sha2::{Digest, Sha256};
1799    let mut f = std::fs::File::open(path)?;
1800    let mut h = Sha256::new();
1801    let mut buf = vec![0u8; 1 << 16];
1802    loop {
1803        let n = std::io::Read::read(&mut f, &mut buf)?;
1804        if n == 0 {
1805            break;
1806        }
1807        h.update(&buf[..n]);
1808    }
1809    Ok(format!("{:x}", base16ct::HexDisplay(&h.finalize())))
1810}
1811
1812#[cfg(test)]
1813mod bounded_probe_tests;
1814#[cfg(test)]
1815mod hardening_tests;
1816#[cfg(test)]
1817mod ogg_probe_tests;
1818#[cfg(test)]
1819mod scan_unit_tests;
1820#[cfg(test)]
1821mod wav_probe_tests;