use std::collections::{BTreeMap, HashMap, HashSet};
use std::path::{Path, PathBuf};
use musefs_db::convert::usize_from;
use musefs_db::{Db, Format, NewArt, NewTrack, Tag, TrackArt};
use musefs_format::{EmbeddedBinaryTag, EmbeddedPicture, Extent, flac, mp3, mp4, ogg, wav};
use crate::byte_budget::ByteBudget;
use crate::error::Result;
use crate::freshness::BackingStamp;
use std::fmt;
use std::sync::Arc;
use std::sync::mpsc::sync_channel;
/// Maximum number of probed files buffered before a bulk-write flush.
const BATCH_FILES: usize = 256;
/// Default payload byte budget (64 MiB); also the default flush threshold.
const BATCH_BYTES: u64 = 64 * 1024 * 1024;
/// Default size (64 KiB) of the first prefix read when probing a file.
const WINDOW: usize = 64 * 1024;
/// Maximum number of prefix-widening re-reads in `probe_body`.
const MAX_WIDEN_RETRIES: usize = 8;
/// Ceiling (64 MiB) on how many leading bytes of a file are read while probing.
pub(crate) const MAX_PROBE_BYTES: u64 = 64 * 1024 * 1024;
/// Largest embedded picture accepted: 16 MiB minus 64 KiB.
pub(crate) const MAX_ART_BYTES: usize = 16 * 1024 * 1024 - 64 * 1024;
/// Largest binary tag payload accepted; shares the picture cap.
const MAX_BINARY_TAG_BYTES: usize = MAX_ART_BYTES;
/// Result of probing a single backing file (see `probe_file`).
#[derive(Debug)]
enum ProbeOutcome {
    /// Metadata was parsed; carries the stamp taken before the probe, which matched
    /// the stamp taken after it.
    Probed(Probed, BackingStamp),
    /// The file could not be parsed as a supported format. `probe_file_caught` also
    /// reports a panicking probe this way.
    Unparseable,
    /// The file's stamp changed between the start and the end of the probe.
    Raced,
}
// Test-only hook that `probe_file` fires right after the first stamp (`s1`) is taken.
// It lets tests mutate the backing file mid-probe and exercise the race check.
#[cfg(test)]
thread_local! {
    static AFTER_S1_HOOK: std::cell::RefCell<Option<Box<dyn FnMut()>>> =
        const { std::cell::RefCell::new(None) };
}

/// Runs this thread's installed after-s1 hook, if there is one.
///
/// The cell stays mutably borrowed while the hook runs. A hook must therefore not
/// install or clear hooks itself.
#[cfg(test)]
fn fire_after_s1() {
    AFTER_S1_HOOK.with(|slot| {
        let mut installed = slot.borrow_mut();
        if let Some(hook) = installed.as_mut() {
            hook();
        }
    });
}

/// Installs `f` as this thread's after-s1 hook, replacing any previous hook.
#[cfg(test)]
fn set_after_s1_hook(f: impl FnMut() + 'static) {
    let hook: Box<dyn FnMut()> = Box::new(f);
    AFTER_S1_HOOK.with(move |slot| *slot.borrow_mut() = Some(hook));
}

/// Removes this thread's after-s1 hook.
#[cfg(test)]
fn clear_after_s1_hook() {
    AFTER_S1_HOOK.with(|slot| {
        *slot.borrow_mut() = None;
    });
}
/// Progress events emitted during a scan.
#[derive(Debug, Clone, Copy)]
pub enum ScanProgress<'a> {
    /// The walker accepted another file; `found` is the running count.
    Discovered { found: u64 },
    /// Walking and already-present filtering finished; `total` files will be probed.
    Walked { total: u64 },
    /// A file's unit was committed to the database.
    Ingested {
        done: u64,
        total: u64,
        path: &'a str,
    },
}

/// Cheaply clonable, thread-safe callback that receives [`ScanProgress`] events.
#[derive(Clone)]
pub struct ProgressSink(Arc<dyn for<'a> Fn(ScanProgress<'a>) + Send + Sync>);

impl ProgressSink {
    /// Wraps `f` so the walker and the ingest loop can share it.
    pub fn new(f: impl for<'a> Fn(ScanProgress<'a>) + Send + Sync + 'static) -> Self {
        let shared: Arc<dyn for<'a> Fn(ScanProgress<'a>) + Send + Sync> = Arc::new(f);
        Self(shared)
    }

    /// Delivers one event to the wrapped callback.
    fn emit(&self, ev: ScanProgress<'_>) {
        let callback = &*self.0;
        callback(ev);
    }
}

// The callback is opaque, so Debug prints only the type name.
impl fmt::Debug for ProgressSink {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "ProgressSink")
    }
}
/// Counters reported by `scan_directory_with` / `scan_directory`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ScanStats {
    /// Units committed to the database by the ingest loop.
    pub scanned: u64,
    /// Walked entries that were not probed (unsupported extension or non-regular file).
    pub skipped: u64,
    /// Supported files dropped before probing because their path was already indexed
    /// (only counted when `force` is off).
    pub already_present: u64,
    /// Files that could not be opened, parsed, or canonicalized.
    pub failed: u64,
    /// Files whose stamp changed while they were being probed.
    pub raced: u64,
}
/// Counts walked entries that will not be probed, bucketed by lower-cased extension.
#[derive(Debug, Default)]
struct SkipTally {
    total: u64,
    by_ext: BTreeMap<String, u64>,
}

impl SkipTally {
    /// Records one skipped path. A path with no extension, or a non-UTF-8 one, is
    /// counted under `<none>`.
    fn record(&mut self, path: &Path) {
        let bucket = match path.extension().and_then(|raw| raw.to_str()) {
            Some(ext) => ext.to_ascii_lowercase(),
            None => String::from("<none>"),
        };
        *self.by_ext.entry(bucket).or_default() += 1;
        self.total += 1;
    }

    /// Builds a one-line summary such as `skipped 4: txt=2, <none>=1, jpg=1`.
    ///
    /// Buckets are ordered by count (descending), then by extension. Returns `None`
    /// when nothing was skipped.
    fn summary(&self) -> Option<String> {
        if self.total == 0 {
            return None;
        }
        let mut ranked: Vec<(&String, &u64)> = self.by_ext.iter().collect();
        ranked.sort_by(|(ext_a, n_a), (ext_b, n_b)| n_b.cmp(n_a).then(ext_a.cmp(ext_b)));
        let parts: Vec<String> = ranked
            .iter()
            .map(|(ext, n)| format!("{ext}={n}"))
            .collect();
        Some(format!("skipped {}: {}", self.total, parts.join(", ")))
    }
}
/// Counters reported by `revalidate_with`.
///
/// NOTE(review): `revalidate_with` is outside this view. The per-field meanings below
/// are inferred from the names and should be confirmed against it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RevalidateStats {
    /// Presumably tracks re-probed and rewritten because their backing file changed.
    pub updated: u64,
    /// Presumably tracks whose backing stamp still matched.
    pub unchanged: u64,
    /// Presumably tracks removed because their backing file is gone (see `prune`).
    pub pruned: u64,
    /// Presumably files that could not be re-probed.
    pub failed: u64,
    /// Presumably files that changed while being re-probed.
    pub raced: u64,
}
/// Returns `true` when `path`'s extension equals `ext`, ignoring ASCII case.
///
/// Paths without an extension, or with a non-UTF-8 extension, never match.
fn has_ext(path: &Path, ext: &str) -> bool {
    match path.extension().and_then(std::ffi::OsStr::to_str) {
        Some(actual) => actual.eq_ignore_ascii_case(ext),
        None => false,
    }
}

/// Returns `true` when `path` has one of the extensions this scanner knows how to probe.
fn is_supported_audio(path: &Path) -> bool {
    const AUDIO_EXTENSIONS: [&str; 8] = ["flac", "mp3", "m4a", "m4b", "ogg", "oga", "opus", "wav"];
    AUDIO_EXTENSIONS.iter().any(|ext| has_ext(path, ext))
}
/// Walks `root` and appends supported audio paths to `out`, without progress reporting.
fn collect_audio(
    root: &Path,
    out: &mut Vec<PathBuf>,
    follow_symlinks: bool,
) -> std::io::Result<SkipTally> {
    let no_progress: Option<&ProgressSink> = None;
    collect_audio_with(root, out, follow_symlinks, no_progress)
}

/// Walks `root` recursively and appends supported audio paths to `out`.
///
/// Returns a tally of the entries that were not collected. When `follow_symlinks` is
/// set, `root`'s own `(dev, ino)` is pre-seeded into the visited set, so a symlink that
/// leads back to it is recognised as a cycle. Each accepted file is reported to
/// `progress` as `Discovered`.
fn collect_audio_with(
    root: &Path,
    out: &mut Vec<PathBuf>,
    follow_symlinks: bool,
    progress: Option<&ProgressSink>,
) -> std::io::Result<SkipTally> {
    let mut dirs_seen: HashSet<(u64, u64)> = HashSet::new();
    let mut files_seen: HashSet<(u64, u64)> = HashSet::new();
    let mut tally = SkipTally::default();
    if follow_symlinks {
        if let Ok(root_meta) = std::fs::metadata(root) {
            dirs_seen.insert(dir_key(&root_meta));
        }
    }
    collect_audio_inner(
        root,
        out,
        follow_symlinks,
        &mut dirs_seen,
        &mut files_seen,
        &mut tally,
        progress,
    )?;
    Ok(tally)
}
/// Recursive worker for `collect_audio_with`: lists `root` and classifies each entry.
///
/// - Directories are recursed into through `descend`.
/// - Regular files with a supported extension go through `push_file`; other regular
///   files are recorded in `tally`.
/// - Symlinks are only followed when `follow_symlinks` is set. The target's metadata then
///   decides whether it is handled as a directory or a file. Targets that are neither are
///   ignored, and broken links are logged and ignored (not tallied).
/// - Any other entry type is recorded in `tally`.
///
/// Unreadable directories and entries are logged and skipped rather than aborting the
/// walk. As written, nothing in this function produces an `Err` itself; `?` only forwards
/// results from `descend`, which in turn only calls back into this function.
fn collect_audio_inner(
    root: &Path,
    out: &mut Vec<PathBuf>,
    follow_symlinks: bool,
    visited: &mut HashSet<(u64, u64)>,
    files_visited: &mut HashSet<(u64, u64)>,
    tally: &mut SkipTally,
    progress: Option<&ProgressSink>,
) -> std::io::Result<()> {
    let entries = match std::fs::read_dir(root) {
        Ok(entries) => entries,
        Err(e) => {
            // Best-effort walk: an unreadable directory is skipped, not fatal.
            log::warn!("skipping directory {}: {e}", root.display());
            return Ok(());
        }
    };
    for entry in entries {
        let entry = match entry {
            Ok(entry) => entry,
            Err(e) => {
                log::warn!("skipping unreadable entry in {}: {e}", root.display());
                continue;
            }
        };
        let path = entry.path();
        // `DirEntry::file_type` does not traverse symlinks, so a symlink lands in the
        // `is_symlink` branch below rather than in the dir/file branches.
        let ftype = match entry.file_type() {
            Ok(ftype) => ftype,
            Err(e) => {
                log::warn!("skipping {}: {e}", path.display());
                continue;
            }
        };
        if ftype.is_dir() {
            descend(
                &path,
                out,
                follow_symlinks,
                visited,
                files_visited,
                tally,
                progress,
            )?;
        } else if ftype.is_file() {
            if is_supported_audio(&path) {
                push_file(&path, out, follow_symlinks, files_visited, None, progress);
            } else {
                tally.record(&path);
            }
        } else if ftype.is_symlink() {
            if !follow_symlinks {
                log::debug!(
                    "skipping symlink {} (pass --follow-symlinks to scan it)",
                    path.display()
                );
                continue;
            }
            // `fs::metadata` follows the link, so `meta` describes the target.
            match std::fs::metadata(&path) {
                Ok(meta) if meta.is_dir() => {
                    descend(
                        &path,
                        out,
                        follow_symlinks,
                        visited,
                        files_visited,
                        tally,
                        progress,
                    )?;
                }
                Ok(meta) if meta.is_file() => {
                    if is_supported_audio(&path) {
                        // Pass the already-fetched target metadata so `push_file`
                        // does not stat it again.
                        push_file(
                            &path,
                            out,
                            follow_symlinks,
                            files_visited,
                            Some(&meta),
                            progress,
                        );
                    } else {
                        tally.record(&path);
                    }
                }
                // Target exists but is neither a directory nor a regular file: ignored.
                Ok(_) => {}
                Err(e) => {
                    log::warn!("skipping broken symlink {}: {e}", path.display());
                }
            }
        } else {
            tally.record(&path);
        }
    }
    Ok(())
}
/// Recurses into directory `path`.
///
/// With `follow_symlinks` set, the target's `(dev, ino)` is first recorded in
/// `visited`. A directory already seen is logged as a symlink cycle and skipped, and a
/// directory that cannot be stat'ed is logged and skipped. Without symlink following,
/// the walk simply recurses.
fn descend(
    path: &Path,
    out: &mut Vec<PathBuf>,
    follow_symlinks: bool,
    visited: &mut HashSet<(u64, u64)>,
    files_visited: &mut HashSet<(u64, u64)>,
    tally: &mut SkipTally,
    progress: Option<&ProgressSink>,
) -> std::io::Result<()> {
    if follow_symlinks {
        match std::fs::metadata(path) {
            Err(e) => {
                log::warn!("skipping directory {}: {e}", path.display());
                return Ok(());
            }
            Ok(meta) => {
                let first_visit = visited.insert(dir_key(&meta));
                if !first_visit {
                    log::warn!("skipping symlink cycle at {}", path.display());
                    return Ok(());
                }
            }
        }
    }
    collect_audio_inner(
        path,
        out,
        follow_symlinks,
        visited,
        files_visited,
        tally,
        progress,
    )
}
/// Identity of a filesystem object as `(device id, inode number)`.
///
/// Unix-only. The walker uses it both for directory-cycle detection and for
/// de-duplicating file targets.
fn dir_key(meta: &std::fs::Metadata) -> (u64, u64) {
    let device = std::os::unix::fs::MetadataExt::dev(meta);
    let inode = std::os::unix::fs::MetadataExt::ino(meta);
    (device, inode)
}
fn push_file(
path: &Path,
out: &mut Vec<PathBuf>,
follow_symlinks: bool,
files_visited: &mut HashSet<(u64, u64)>,
known_meta: Option<&std::fs::Metadata>,
progress: Option<&ProgressSink>,
) {
if !follow_symlinks {
out.push(path.to_path_buf());
if let Some(p) = progress {
p.emit(ScanProgress::Discovered {
found: out.len() as u64,
});
}
return;
}
let key = match known_meta {
Some(m) => Some(dir_key(m)),
None => std::fs::metadata(path).ok().map(|m| dir_key(&m)),
};
match key {
Some(k) if !files_visited.insert(k) => {
log::debug!("skipping duplicate backing target {}", path.display());
}
_ => {
out.push(path.to_path_buf());
if let Some(p) = progress {
p.emit(ScanProgress::Discovered {
found: out.len() as u64,
});
}
}
}
}
/// Everything extracted from one backing file by a probe, ready for ingest.
#[derive(Debug)]
pub(crate) struct Probed {
    /// Container/codec the file was recognised as.
    format: Format,
    /// Byte offset of the audio payload within the backing file.
    audio_offset: u64,
    /// Length in bytes of the audio payload.
    audio_length: u64,
    /// Text tags as `(key, value)` pairs in file order; keys may repeat.
    tags: Vec<(String, String)>,
    /// Embedded pictures, before the `MAX_ART_BYTES` filter in `accept_pictures`.
    pictures: Vec<EmbeddedPicture>,
    /// Binary tag payloads, before the filter in `accept_binary_tags`.
    binary_tags: Vec<EmbeddedBinaryTag>,
    /// Preserved `(kind, body)` blocks. Only the FLAC probes fill this in.
    structural_blocks: Vec<(String, Vec<u8>)>,
}
/// Builds a WAV `Probed` from the file prefix and already-located audio bounds.
///
/// Text tags promoted out of binary chunks are appended after the regular text tags.
fn wav_probed(prefix: &[u8], bounds: &wav::WavBounds) -> Probed {
    let (binary_tags, promoted_tags) = wav::read_binary_tags(prefix);
    let tags: Vec<(String, String)> = wav::read_tags(prefix)
        .into_iter()
        .chain(promoted_tags)
        .collect();
    let pictures = wav::read_pictures(prefix);
    Probed {
        format: Format::Wav,
        audio_offset: bounds.audio_offset,
        audio_length: bounds.audio_length,
        tags,
        pictures,
        binary_tags,
        structural_blocks: vec![],
    }
}
/// Probes a file from `bytes`, dispatching on `path`'s extension (ASCII case-insensitive).
///
/// Returns `None` for unsupported or missing extensions, and whenever the format's
/// `locate_audio` rejects the buffer. Tag and picture readers that fail degrade to empty
/// collections instead of failing the probe.
pub(crate) fn probe_full(path: &Path, bytes: &[u8]) -> Option<Probed> {
    let ext = path
        .extension()
        .and_then(|e| e.to_str())
        .map(str::to_ascii_lowercase);
    match ext.as_deref()? {
        "flac" => {
            let scan = flac::locate_audio(bytes).ok()?;
            let (structural_blocks, binary_tags) = flac::split_preserved(&scan.preserved);
            Some(Probed {
                format: Format::Flac,
                audio_offset: scan.audio_offset,
                audio_length: scan.audio_length,
                tags: flac::read_vorbis_comments(bytes).unwrap_or_default(),
                pictures: flac::read_pictures(bytes).unwrap_or_default(),
                binary_tags,
                structural_blocks,
            })
        }
        "mp3" => {
            let bounds = mp3::locate_audio(bytes).ok()?;
            let (binary_tags, promoted) = mp3::read_binary_tags(bytes);
            let mut tags = mp3::read_tags(bytes);
            tags.extend(promoted);
            Some(Probed {
                format: Format::Mp3,
                audio_offset: bounds.audio_offset,
                audio_length: bounds.audio_length,
                tags,
                pictures: mp3::read_pictures(bytes),
                binary_tags,
                structural_blocks: Vec::new(),
            })
        }
        "m4a" | "m4b" => {
            let bounds = mp4::locate_audio(bytes).ok()?;
            let (pictures, art_drops) = mp4::read_pictures_reporting(bytes, MAX_ART_BYTES);
            let (binary_tags, bin_drops) =
                mp4::read_binary_tags_reporting(bytes, MAX_BINARY_TAG_BYTES);
            log_mp4_oversize_drops(path, &art_drops, &bin_drops);
            Some(Probed {
                format: Format::M4a,
                audio_offset: bounds.audio_offset,
                audio_length: bounds.audio_length,
                tags: mp4::read_tags(bytes),
                pictures,
                binary_tags,
                structural_blocks: Vec::new(),
            })
        }
        "ogg" | "oga" | "opus" => {
            let scan = ogg::locate_audio(bytes).ok()?;
            let format = match scan.codec {
                ogg::Codec::Opus => Format::Opus,
                ogg::Codec::Vorbis => Format::Vorbis,
                ogg::Codec::OggFlac => Format::OggFlac,
            };
            Some(Probed {
                format,
                audio_offset: scan.audio_offset,
                audio_length: scan.audio_length,
                tags: ogg::read_tags(bytes).unwrap_or_default(),
                pictures: ogg::read_pictures(bytes).unwrap_or_default(),
                binary_tags: Vec::new(),
                structural_blocks: Vec::new(),
            })
        }
        "wav" => {
            let bounds = wav::locate_audio(bytes).ok()?;
            Some(wav_probed(bytes, &bounds))
        }
        _ => None,
    }
}
/// Reads up to `len` bytes from the start of `file` into a new buffer.
///
/// A single `pread` may legitimately return fewer bytes than requested without being at
/// EOF, and it may fail with `Interrupted`. This function therefore keeps reading until
/// the buffer is full or a read returns 0 (EOF), retrying on `Interrupted`. The returned
/// buffer is shorter than `len` only when the file itself is shorter. Other I/O errors
/// are propagated.
///
/// The number of bytes actually read is reported to `crate::metrics::on_scan_read`.
fn read_window(file: &std::fs::File, len: usize) -> std::io::Result<Vec<u8>> {
    use std::os::unix::fs::FileExt;
    let mut buf = vec![0u8; len];
    let mut filled = 0usize;
    while filled < len {
        match file.read_at(&mut buf[filled..], filled as u64) {
            Ok(0) => break,
            Ok(n) => filled += n,
            Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
            Err(e) => return Err(e),
        }
    }
    buf.truncate(filled);
    crate::metrics::on_scan_read(filled as u64);
    Ok(buf)
}
/// Reads the last 128 bytes of `file`; MP3 probing uses this as the ID3v1 tail.
///
/// Returns `Ok(None)` when `file_len` is under 128 bytes. A short read is an error here
/// (`read_exact_at`).
fn read_tail_128(file: &std::fs::File, file_len: u64) -> std::io::Result<Option<[u8; 128]>> {
    use std::os::unix::fs::FileExt;
    let Some(start) = file_len.checked_sub(128) else {
        return Ok(None);
    };
    let mut tail = [0u8; 128];
    file.read_exact_at(&mut tail, start)?;
    crate::metrics::on_scan_read(tail.len() as u64);
    Ok(Some(tail))
}
/// Opens and probes one file, guarding against concurrent modification.
///
/// The backing stamp is taken before and after `probe_body`. If the two differ, the
/// file is reported as `Raced` and the parse result is discarded. In test builds the
/// after-s1 hook fires between the first stamp and the probe.
fn probe_file(path: &Path, window: usize) -> std::io::Result<ProbeOutcome> {
    let file = std::fs::File::open(path)?;
    crate::metrics::on_scan_open();
    let before = BackingStamp::from_metadata(&file.metadata()?);
    #[cfg(test)]
    fire_after_s1();
    let parsed = probe_body(path, &file, before.size, window)?;
    let after = BackingStamp::from_metadata(&file.metadata()?);
    if before != after {
        log::warn!("skipping {}: changed during probe", path.display());
        return Ok(ProbeOutcome::Raced);
    }
    Ok(parsed.map_or(ProbeOutcome::Unparseable, |p| {
        ProbeOutcome::Probed(p, before)
    }))
}
/// Runs `probe_file` and converts a panic inside the parsers into `Unparseable`.
///
/// The panic message is logged when it is a `&str` or `String`. One malformed file
/// therefore cannot take down a scan worker.
fn probe_file_caught(path: &Path, window: usize) -> std::io::Result<ProbeOutcome> {
    let attempt =
        std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| probe_file(path, window)));
    let payload = match attempt {
        Ok(res) => return res,
        Err(payload) => payload,
    };
    let msg: &str = if let Some(s) = payload.downcast_ref::<&str>() {
        s
    } else if let Some(s) = payload.downcast_ref::<String>() {
        s
    } else {
        "<non-string panic>"
    };
    log::error!(
        "scan worker panicked probing {}: {msg}; counting as failed",
        path.display()
    );
    Ok(ProbeOutcome::Unparseable)
}
/// Reads just enough of `file` to extract its audio bounds and metadata.
///
/// MP4 files (`m4a`/`m4b`) are parsed structurally via `mp4::read_structure_from` and
/// never use the prefix window.
///
/// Every other format follows this sequence:
/// 1. Read a `window`-byte prefix, capped at `min(file_len, MAX_PROBE_BYTES)`.
/// 2. Ask `probe_prefix` whether that prefix is enough.
/// 3. On `NeedMore`, re-read a larger prefix, up to `MAX_WIDEN_RETRIES` times.
/// 4. If that does not settle it, hand the full capped prefix to `probe_full`.
/// 5. For WAV files larger than `MAX_PROBE_BYTES`, fall back to
///    `wav::locate_audio_at_ceiling`.
///
/// Returns `Ok(None)`, after logging, when the file cannot be parsed. I/O errors from the
/// reads are propagated.
///
/// NOTE(review): when `file_len > MAX_PROBE_BYTES`, `probe_full` only sees a truncated
/// prefix. It is not visible from this file whether the non-WAV `locate_audio` parsers
/// derive `audio_length` from the buffer length, which would under-report here. Confirm.
fn probe_body(
    path: &Path,
    file: &std::fs::File,
    file_len: u64,
    window: usize,
) -> std::io::Result<Option<Probed>> {
    if has_ext(path, "m4a") || has_ext(path, "m4b") {
        // `&File` implements Read/Seek, so the structural reader can drive the shared handle.
        let mut f = file;
        let scan = match mp4::read_structure_from(&mut f, file_len) {
            Ok(s) => s,
            Err(e) => {
                log::warn!("skipping {}: {e}", path.display());
                return Ok(None);
            }
        };
        let (pictures, art_drops) = mp4::read_pictures_reporting(&scan.moov, MAX_ART_BYTES);
        let (binary_tags, bin_drops) =
            mp4::read_binary_tags_reporting(&scan.moov, MAX_BINARY_TAG_BYTES);
        log_mp4_oversize_drops(path, &art_drops, &bin_drops);
        return Ok(Some(Probed {
            format: Format::M4a,
            audio_offset: scan.mdat_payload_offset,
            audio_length: scan.mdat_payload_len,
            tags: mp4::read_tags(&scan.moov),
            pictures,
            binary_tags,
            structural_blocks: Vec::new(),
        }));
    }
    // MP3 needs the last 128 bytes as well (handed to `locate_audio_bounded`).
    let tail = if has_ext(path, "mp3") {
        read_tail_128(file, file_len)?
    } else {
        None
    };
    let probe_cap = file_len.min(MAX_PROBE_BYTES);
    let mut want = usize_from((window as u64).min(probe_cap));
    let mut prefix = read_window(file, want)?;
    for _ in 0..MAX_WIDEN_RETRIES {
        match probe_prefix(path, &prefix, file_len, tail.as_ref()) {
            Probe::Done(p) => return Ok(Some(p)),
            Probe::Skip => {
                log::warn!("skipping {}: no parseable audio metadata", path.display());
                return Ok(None);
            }
            Probe::NeedMore(up_to) => {
                if want as u64 >= probe_cap {
                    break;
                }
                // Grow to the parser's requested size, always by at least one byte so
                // the loop makes progress, and never past the cap.
                want = usize_from(up_to.min(probe_cap))
                    .max(want + 1)
                    .min(usize_from(probe_cap));
                prefix = read_window(file, want)?;
            }
        }
    }
    // Retries exhausted (or cap reached): fall back to the whole capped prefix.
    if (prefix.len() as u64) < probe_cap {
        prefix = read_window(file, usize_from(probe_cap))?;
    }
    if let Some(p) = probe_full(path, &prefix) {
        return Ok(Some(p));
    }
    if has_ext(path, "wav")
        && file_len > MAX_PROBE_BYTES
        && let Ok(bounds) = wav::locate_audio_at_ceiling(&prefix, file_len)
    {
        return Ok(Some(wav_probed(&prefix, &bounds)));
    }
    if file_len > MAX_PROBE_BYTES {
        log::warn!(
            "skipping {}: no parseable metadata within first {MAX_PROBE_BYTES} bytes",
            path.display()
        );
    } else {
        log::warn!("skipping {}: no parseable audio metadata", path.display());
    }
    Ok(None)
}
/// Verdict of `probe_prefix` on a partial file prefix.
enum Probe {
    /// The prefix was sufficient; metadata is complete.
    Done(Probed),
    /// More bytes are required. `probe_body` uses the value as the next prefix length
    /// (clamped to the probe cap).
    NeedMore(u64),
    /// The prefix is not a parseable file of the expected format.
    Skip,
}
/// Tries to finish probing from the leading `prefix` of a file of total size `file_len`.
///
/// Dispatches on `path`'s extension. The format's bounded parser reports `Complete`
/// (→ `Done`), `NeedMore` (→ `NeedMore`), or an error (→ `Skip`). `tail` is the file's
/// last 128 bytes and is only used for MP3.
///
/// For FLAC and Ogg, the audio extends to EOF, so its length is `file_len - audio_offset`.
/// That subtraction is checked: a parser-reported offset past the end of the file yields
/// `Skip`. An unchecked subtraction would panic in debug builds and silently wrap to a
/// huge bogus length in release builds.
fn probe_prefix(path: &Path, prefix: &[u8], file_len: u64, tail: Option<&[u8; 128]>) -> Probe {
    if has_ext(path, "flac") {
        match flac::read_metadata_bounded(prefix) {
            Ok(Extent::Complete(meta)) => {
                let Some(audio_length) = file_len.checked_sub(meta.audio_offset) else {
                    return Probe::Skip;
                };
                let (structural_blocks, binary_tags) = flac::split_preserved(&meta.preserved);
                Probe::Done(Probed {
                    format: Format::Flac,
                    audio_offset: meta.audio_offset,
                    audio_length,
                    tags: flac::read_vorbis_comments(prefix).unwrap_or_default(),
                    pictures: flac::read_pictures(prefix).unwrap_or_default(),
                    binary_tags,
                    structural_blocks,
                })
            }
            Ok(Extent::NeedMore { up_to }) => Probe::NeedMore(up_to),
            Err(_) => Probe::Skip,
        }
    } else if has_ext(path, "mp3") {
        match mp3::locate_audio_bounded(prefix, file_len, tail) {
            Ok(Extent::Complete(b)) => {
                let (binary_tags, promoted) = mp3::read_binary_tags(prefix);
                let mut tags = mp3::read_tags(prefix);
                tags.extend(promoted);
                Probe::Done(Probed {
                    format: Format::Mp3,
                    audio_offset: b.audio_offset,
                    audio_length: b.audio_length,
                    tags,
                    pictures: mp3::read_pictures(prefix),
                    binary_tags,
                    structural_blocks: Vec::new(),
                })
            }
            Ok(Extent::NeedMore { up_to }) => Probe::NeedMore(up_to),
            Err(_) => Probe::Skip,
        }
    } else if has_ext(path, "ogg") || has_ext(path, "oga") || has_ext(path, "opus") {
        match ogg::read_metadata_bounded(prefix, file_len) {
            Ok(Extent::Complete(header)) => {
                let Some(audio_length) = file_len.checked_sub(header.audio_offset) else {
                    return Probe::Skip;
                };
                let format = match header.codec {
                    ogg::Codec::Opus => Format::Opus,
                    ogg::Codec::Vorbis => Format::Vorbis,
                    ogg::Codec::OggFlac => Format::OggFlac,
                };
                Probe::Done(Probed {
                    format,
                    audio_offset: header.audio_offset,
                    audio_length,
                    tags: ogg::read_tags(prefix).unwrap_or_default(),
                    pictures: ogg::read_pictures(prefix).unwrap_or_default(),
                    binary_tags: Vec::new(),
                    structural_blocks: Vec::new(),
                })
            }
            Ok(Extent::NeedMore { up_to }) => Probe::NeedMore(up_to),
            Err(_) => Probe::Skip,
        }
    } else if has_ext(path, "wav") {
        match wav::locate_audio_bounded(prefix, file_len) {
            Ok(Extent::Complete(b)) => Probe::Done(wav_probed(prefix, &b)),
            Ok(Extent::NeedMore { up_to }) => Probe::NeedMore(up_to),
            Err(_) => Probe::Skip,
        }
    } else {
        Probe::Skip
    }
}
/// How much checksumming the scan pipeline does per file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChecksumTier {
    /// No fingerprint and no content hash.
    None,
    /// A fingerprint of the probed data (`fingerprint_of`) only.
    Fingerprint,
    /// The fingerprint plus a whole-file content hash (`full_file_hash`).
    Full,
}

/// How a fingerprint match against a track whose backing file is missing must be
/// confirmed before that track is retargeted to the new path (see `ingest_unit`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MatchStrictness {
    /// If the candidate has a stored content hash, require the new file's hash to match
    /// (computing it if needed); if it has none, accept the fingerprint match.
    Auto,
    /// Accept a unique fingerprint match without any content-hash check.
    Fast,
    /// Require a stored content hash on the candidate and a matching hash for the new file.
    Strict,
}

/// What `ingest_unit` writes for each unit.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WritePolicy {
    /// Full ingest: track row, checksums, tags, binary tags, structural blocks, art
    /// (or a retarget of a moved track).
    Full,
    /// Only the track row, checksums, and structural blocks (`refresh_structural_into`).
    StructuralOnly,
}
/// Tuning knobs for a scan.
#[derive(Debug, Clone)]
pub struct ScanOptions {
    /// Number of probe worker threads; `0` means `available_parallelism()` (falling back to 1).
    pub jobs: usize,
    /// Size in bytes of the first prefix read per file (grown on demand up to the probe cap).
    pub window: usize,
    /// Byte budget for in-flight payloads; also the batch size that triggers a flush.
    pub batch_bytes: u64,
    /// Follow symlinks while walking (with cycle and duplicate-target detection).
    pub follow_symlinks: bool,
    /// Optional progress callback.
    pub progress: Option<ProgressSink>,
    /// Which checksums to compute per file.
    pub checksum: ChecksumTier,
    /// How to confirm fingerprint-based retargets of moved files.
    pub strictness: MatchStrictness,
    /// When `true`, paths already present in the database are re-probed and re-ingested
    /// instead of being filtered out before probing.
    pub force: bool,
    /// NOTE(review): not read by the scan path in this view; presumably consulted by
    /// `revalidate_with` to remove tracks whose backing file is gone. Confirm.
    pub prune: bool,
}
impl Default for ScanOptions {
fn default() -> Self {
Self {
jobs: 0,
window: WINDOW,
batch_bytes: BATCH_BYTES,
follow_symlinks: false,
progress: None,
checksum: ChecksumTier::Fingerprint,
strictness: MatchStrictness::Auto,
force: false,
prune: false,
}
}
}
/// Resolves the requested worker count.
///
/// A non-zero `jobs` is used as-is. `0` means "use `available_parallelism()`", falling
/// back to 1 when that is unknown.
fn effective_jobs(jobs: usize) -> usize {
    match jobs {
        0 => std::thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(1),
        explicit => explicit,
    }
}
/// One probed file travelling from a worker thread to the ingest loop.
struct Unit {
    /// Path stored as the track's backing path (canonicalized when following symlinks).
    abs_path: String,
    /// Backing stamp taken before the probe (and confirmed unchanged after it).
    stamp: BackingStamp,
    /// Probe result to ingest.
    probed: Probed,
    /// `payload_weight(&probed)`: budget bytes held until the unit's batch is flushed.
    weight: u64,
    /// Set unless the checksum tier is `None`.
    fingerprint: Option<String>,
    /// Set only for the `Full` tier, and only when hashing succeeded.
    content_hash: Option<String>,
}
/// Total bytes of heavy payload carried by `p`: picture data, binary tag payloads, and
/// structural block bodies. Text tags are not counted.
fn payload_weight(p: &Probed) -> u64 {
    let mut total = 0u64;
    for picture in &p.pictures {
        total += picture.data.len() as u64;
    }
    for tag in &p.binary_tags {
        total += tag.payload.len() as u64;
    }
    for (_, body) in &p.structural_blocks {
        total += body.len() as u64;
    }
    total
}
/// Minimal sanity check for a tag key.
///
/// The key must be non-empty and contain no byte below 0x20. Bytes at or above 0x20,
/// including DEL and UTF-8 multi-byte sequences, are accepted.
fn key_passes_floor(key: &str) -> bool {
    if key.is_empty() {
        return false;
    }
    !key.bytes().any(|byte| byte < b' ')
}
/// Drops (with a warning naming `abs_path`) every picture whose data exceeds
/// `MAX_ART_BYTES`. Survivors keep their original order.
fn accept_pictures(abs_path: &str, mut pictures: Vec<EmbeddedPicture>) -> Vec<EmbeddedPicture> {
    pictures.retain(|pic| {
        let oversize = pic.data.len() > MAX_ART_BYTES;
        if oversize {
            log::warn!(
                "{abs_path}: dropping embedded {} art ({} bytes), over the {MAX_ART_BYTES}-byte cap",
                pic.mime,
                pic.data.len(),
            );
        }
        !oversize
    });
    pictures
}
/// Converts probed binary tags into DB rows.
///
/// Oversize payloads (over `MAX_BINARY_TAG_BYTES`) are dropped with a warning, and empty
/// payloads are dropped silently. Ordinals are assigned `0, 1, 2, ...` over the
/// survivors, in input order.
fn accept_binary_tags(abs_path: &str, tags: Vec<EmbeddedBinaryTag>) -> Vec<musefs_db::BinaryTag> {
    let mut kept: Vec<musefs_db::BinaryTag> = Vec::with_capacity(tags.len());
    for tag in tags {
        let size = tag.payload.len();
        if size > MAX_BINARY_TAG_BYTES {
            log::warn!(
                "{abs_path}: dropping binary tag {} ({} bytes), over the {MAX_BINARY_TAG_BYTES}-byte cap",
                tag.key,
                size,
            );
            continue;
        }
        if size == 0 {
            continue;
        }
        let ordinal = kept.len() as u64;
        kept.push(musefs_db::BinaryTag {
            key: tag.key,
            payload: tag.payload,
            ordinal,
        });
    }
    kept
}
/// Logs, per item, the MP4 pictures and binary tags that the size-capped readers
/// refused to load.
fn log_mp4_oversize_drops(path: &Path, art: &[mp4::OversizeDrop], binary: &[mp4::OversizeDrop]) {
    let shown = path.display();
    art.iter().for_each(|dropped| {
        log::warn!(
            "{}: dropping embedded {} art ({} bytes), over the {MAX_ART_BYTES}-byte cap",
            shown,
            dropped.descriptor,
            dropped.bytes,
        );
    });
    binary.iter().for_each(|dropped| {
        log::warn!(
            "{}: dropping binary tag {} ({} bytes), over the {MAX_BINARY_TAG_BYTES}-byte cap",
            shown,
            dropped.descriptor,
            dropped.bytes,
        );
    });
}
/// Converts `(kind, body)` pairs into DB rows.
///
/// Each block's ordinal counts its position among earlier blocks of the same `kind`
/// (`0, 1, ...` per kind). Input order is preserved.
fn structural_blocks_from(blocks: Vec<(String, Vec<u8>)>) -> Vec<musefs_db::StructuralBlock> {
    let mut next_ordinal: HashMap<String, u64> = HashMap::new();
    let mut rows = Vec::with_capacity(blocks.len());
    for (kind, body) in blocks {
        let slot = next_ordinal.entry(kind.clone()).or_default();
        let ordinal = *slot;
        *slot += 1;
        rows.push(musefs_db::StructuralBlock {
            kind,
            ordinal,
            body,
        });
    }
    rows
}
/// Write-side abstraction over a plain `&Db` and a batched `&mut BulkWriter`.
///
/// It lets `ingest_into`, `refresh_structural_into`, and `ingest_unit` be written once
/// for both the one-off and the bulk ingest paths.
trait TrackSink {
    /// Upserts the track row described by `t` and returns the id the other setters take.
    fn upsert_track(&mut self, t: &NewTrack) -> musefs_db::Result<i64>;
    /// Stores `tags` as the text tags of `track_id`.
    fn replace_tags(&mut self, track_id: i64, tags: &[Tag]) -> musefs_db::Result<()>;
    /// Stores the binary tag payloads of `track_id`.
    fn set_binary_tags(
        &mut self,
        track_id: i64,
        tags: &[musefs_db::BinaryTag],
    ) -> musefs_db::Result<()>;
    /// Stores the preserved structural blocks of `track_id`.
    fn set_structural_blocks(
        &mut self,
        track_id: i64,
        blocks: &[musefs_db::StructuralBlock],
    ) -> musefs_db::Result<()>;
    /// Upserts one piece of artwork and returns its id.
    fn upsert_art(&mut self, a: &NewArt) -> musefs_db::Result<i64>;
    /// Links `track_id` to the given artwork entries.
    fn set_track_art(&mut self, track_id: i64, items: &[TrackArt]) -> musefs_db::Result<()>;
    /// Stores the fingerprint and content hash (either may be absent) of `track_id`.
    fn set_track_checksums(
        &mut self,
        track_id: i64,
        fingerprint: Option<&str>,
        content_hash: Option<&str>,
    ) -> musefs_db::Result<()>;
    /// Whether a track with backing path `path` is already stored.
    fn track_exists_at(&mut self, path: &str) -> musefs_db::Result<bool>;
    /// All stored tracks carrying fingerprint `fp`.
    fn tracks_by_fingerprint(&mut self, fp: &str) -> musefs_db::Result<Vec<musefs_db::Track>>;
    /// Points existing track `id` at a new backing file, updating its stamp, audio
    /// bounds, and checksums.
    #[allow(clippy::too_many_arguments)]
    fn retarget_track(
        &mut self,
        id: i64,
        new_backing_path: &str,
        stamp: BackingStamp,
        audio_offset: u64,
        audio_length: u64,
        fingerprint: Option<&str>,
        content_hash: Option<&str>,
    ) -> musefs_db::Result<()>;
}
/// `TrackSink` over a shared `&Db`: each method forwards to the `Db` method of the same
/// name.
///
/// Two methods differ slightly: `track_exists_at` is answered via `get_track_by_path`,
/// and `retarget_track` unpacks the `BackingStamp` into size/mtime/ctime.
impl TrackSink for &Db {
    fn upsert_track(&mut self, t: &NewTrack) -> musefs_db::Result<i64> {
        Db::upsert_track(self, t)
    }
    fn replace_tags(&mut self, track_id: i64, tags: &[Tag]) -> musefs_db::Result<()> {
        Db::replace_tags(self, track_id, tags)
    }
    fn set_binary_tags(
        &mut self,
        track_id: i64,
        tags: &[musefs_db::BinaryTag],
    ) -> musefs_db::Result<()> {
        Db::set_binary_tags(self, track_id, tags)
    }
    fn set_structural_blocks(
        &mut self,
        track_id: i64,
        blocks: &[musefs_db::StructuralBlock],
    ) -> musefs_db::Result<()> {
        Db::set_structural_blocks(self, track_id, blocks)
    }
    fn upsert_art(&mut self, a: &NewArt) -> musefs_db::Result<i64> {
        Db::upsert_art(self, a)
    }
    fn set_track_art(&mut self, track_id: i64, items: &[TrackArt]) -> musefs_db::Result<()> {
        Db::set_track_art(self, track_id, items)
    }
    fn set_track_checksums(
        &mut self,
        track_id: i64,
        fingerprint: Option<&str>,
        content_hash: Option<&str>,
    ) -> musefs_db::Result<()> {
        Db::set_track_checksums(self, track_id, fingerprint, content_hash)
    }
    fn track_exists_at(&mut self, path: &str) -> musefs_db::Result<bool> {
        Ok(Db::get_track_by_path(self, path)?.is_some())
    }
    fn tracks_by_fingerprint(&mut self, fp: &str) -> musefs_db::Result<Vec<musefs_db::Track>> {
        Db::tracks_by_fingerprint(self, fp)
    }
    fn retarget_track(
        &mut self,
        id: i64,
        new_backing_path: &str,
        stamp: BackingStamp,
        audio_offset: u64,
        audio_length: u64,
        fingerprint: Option<&str>,
        content_hash: Option<&str>,
    ) -> musefs_db::Result<()> {
        Db::retarget_track(
            self,
            id,
            new_backing_path,
            stamp.size,
            stamp.mtime_ns,
            stamp.ctime_ns,
            audio_offset,
            audio_length,
            fingerprint,
            content_hash,
        )
    }
}
/// `TrackSink` over a batching `BulkWriter`: each method forwards to the `BulkWriter`
/// method of the same name.
///
/// As with the `&Db` impl, `track_exists_at` goes through `get_track_by_path`, and
/// `retarget_track` unpacks the `BackingStamp`. Writes become durable only when the
/// caller commits the writer (see `run_pipeline`).
impl TrackSink for &mut musefs_db::BulkWriter<'_> {
    fn upsert_track(&mut self, t: &NewTrack) -> musefs_db::Result<i64> {
        musefs_db::BulkWriter::upsert_track(self, t)
    }
    fn replace_tags(&mut self, track_id: i64, tags: &[Tag]) -> musefs_db::Result<()> {
        musefs_db::BulkWriter::replace_tags(self, track_id, tags)
    }
    fn set_binary_tags(
        &mut self,
        track_id: i64,
        tags: &[musefs_db::BinaryTag],
    ) -> musefs_db::Result<()> {
        musefs_db::BulkWriter::set_binary_tags(self, track_id, tags)
    }
    fn set_structural_blocks(
        &mut self,
        track_id: i64,
        blocks: &[musefs_db::StructuralBlock],
    ) -> musefs_db::Result<()> {
        musefs_db::BulkWriter::set_structural_blocks(self, track_id, blocks)
    }
    fn upsert_art(&mut self, a: &NewArt) -> musefs_db::Result<i64> {
        musefs_db::BulkWriter::upsert_art(self, a)
    }
    fn set_track_art(&mut self, track_id: i64, items: &[TrackArt]) -> musefs_db::Result<()> {
        musefs_db::BulkWriter::set_track_art(self, track_id, items)
    }
    fn set_track_checksums(
        &mut self,
        track_id: i64,
        fingerprint: Option<&str>,
        content_hash: Option<&str>,
    ) -> musefs_db::Result<()> {
        musefs_db::BulkWriter::set_track_checksums(self, track_id, fingerprint, content_hash)
    }
    fn track_exists_at(&mut self, path: &str) -> musefs_db::Result<bool> {
        Ok(musefs_db::BulkWriter::get_track_by_path(self, path)?.is_some())
    }
    fn tracks_by_fingerprint(&mut self, fp: &str) -> musefs_db::Result<Vec<musefs_db::Track>> {
        musefs_db::BulkWriter::tracks_by_fingerprint(self, fp)
    }
    fn retarget_track(
        &mut self,
        id: i64,
        new_backing_path: &str,
        stamp: BackingStamp,
        audio_offset: u64,
        audio_length: u64,
        fingerprint: Option<&str>,
        content_hash: Option<&str>,
    ) -> musefs_db::Result<()> {
        musefs_db::BulkWriter::retarget_track(
            self,
            id,
            new_backing_path,
            stamp.size,
            stamp.mtime_ns,
            stamp.ctime_ns,
            audio_offset,
            audio_length,
            fingerprint,
            content_hash,
        )
    }
}
fn ingest_into(
mut w: impl TrackSink,
abs_path: &str,
stamp: BackingStamp,
probed: Probed,
fingerprint: Option<&str>,
content_hash: Option<&str>,
) -> Result<()> {
let track_id = w.upsert_track(&NewTrack {
backing_path: abs_path.to_string(),
format: probed.format,
audio_offset: probed.audio_offset,
audio_length: probed.audio_length,
backing_size: stamp.size,
backing_mtime_ns: stamp.mtime_ns,
backing_ctime_ns: stamp.ctime_ns,
})?;
w.set_track_checksums(track_id, fingerprint, content_hash)?;
let mut tags = Vec::new();
let mut ordinals: HashMap<String, u64> = HashMap::new();
for (key, value) in probed.tags {
if !key_passes_floor(&key) {
continue;
}
let ord = ordinals.entry(key.clone()).or_insert(0);
tags.push(Tag::new(&key, &value, *ord));
*ord += 1;
}
w.replace_tags(track_id, &tags)?;
let binary_tags = accept_binary_tags(abs_path, probed.binary_tags);
w.set_binary_tags(track_id, &binary_tags)?;
let structural_blocks = structural_blocks_from(probed.structural_blocks);
w.set_structural_blocks(track_id, &structural_blocks)?;
let mut track_arts = Vec::new();
for (ordinal, pic) in accept_pictures(abs_path, probed.pictures)
.into_iter()
.enumerate()
{
let art_id = w.upsert_art(&NewArt {
mime: pic.mime,
width: (pic.width != 0).then_some(pic.width),
height: (pic.height != 0).then_some(pic.height),
data: pic.data,
})?;
let picture_type = pic.picture_type.get();
track_arts.push(TrackArt {
art_id,
picture_type,
description: pic.description,
ordinal: ordinal as u64,
});
}
w.set_track_art(track_id, &track_arts)?;
Ok(())
}
/// Refreshes only the track row, checksums, and structural blocks for `abs_path`.
///
/// Text tags, binary tags, and artwork are left as stored.
fn refresh_structural_into(
    mut w: impl TrackSink,
    abs_path: &str,
    stamp: BackingStamp,
    probed: Probed,
    fingerprint: Option<&str>,
    content_hash: Option<&str>,
) -> Result<()> {
    let track = NewTrack {
        backing_path: abs_path.to_owned(),
        format: probed.format,
        audio_offset: probed.audio_offset,
        audio_length: probed.audio_length,
        backing_size: stamp.size,
        backing_mtime_ns: stamp.mtime_ns,
        backing_ctime_ns: stamp.ctime_ns,
    };
    let track_id = w.upsert_track(&track)?;
    w.set_track_checksums(track_id, fingerprint, content_hash)?;
    let block_rows = structural_blocks_from(probed.structural_blocks);
    w.set_structural_blocks(track_id, &block_rows)?;
    Ok(())
}
/// Writes one probed unit according to `policy`, detecting moved files when possible.
///
/// - `StructuralOnly`: only refreshes the track row, checksums, and structural blocks.
/// - Path already stored: full re-ingest at that path.
/// - Otherwise, if the unit has a fingerprint, look for stored tracks with the same
///   fingerprint whose backing path no longer exists.
///   - Exactly one such candidate, confirmed per `strictness`: the candidate is
///     retargeted to the new path. Only its path, stamp, audio bounds, and checksums are
///     updated; its tags and art are left as stored.
///   - Zero candidates, several candidates, or an unconfirmed match: falls through to a
///     fresh full ingest.
fn ingest_unit(
    mut w: impl TrackSink,
    unit: Unit,
    strictness: MatchStrictness,
    policy: WritePolicy,
) -> Result<()> {
    if policy == WritePolicy::StructuralOnly {
        return refresh_structural_into(
            w,
            &unit.abs_path,
            unit.stamp,
            unit.probed,
            unit.fingerprint.as_deref(),
            unit.content_hash.as_deref(),
        );
    }
    if w.track_exists_at(&unit.abs_path)? {
        return ingest_into(
            w,
            &unit.abs_path,
            unit.stamp,
            unit.probed,
            unit.fingerprint.as_deref(),
            unit.content_hash.as_deref(),
        );
    }
    if let Some(fp) = unit.fingerprint.as_deref() {
        // Only tracks whose backing file is definitively gone (NotFound) are move
        // candidates; any other stat error disqualifies the candidate with a warning.
        let candidates: Vec<musefs_db::Track> = w
            .tracks_by_fingerprint(fp)?
            .into_iter()
            .filter(|t| match std::fs::metadata(&t.backing_path) {
                Err(e) if e.kind() == std::io::ErrorKind::NotFound => true,
                Ok(_) => false,
                Err(e) => {
                    log::warn!(
                        "skipping retarget candidate {}: cannot stat backing path ({e})",
                        t.backing_path
                    );
                    false
                }
            })
            .collect();
        if candidates.len() == 1 {
            let cand = &candidates[0];
            // A whole-file hash is needed to confirm only when the candidate stored
            // one and strictness is not `Fast`.
            let needs_full = match strictness {
                MatchStrictness::Fast => false,
                MatchStrictness::Auto | MatchStrictness::Strict => cand.content_hash.is_some(),
            };
            // Reuse the hash already computed by the worker (Full tier); otherwise
            // compute it here only if confirmation needs it.
            let new_hash: Option<String> = match (&unit.content_hash, needs_full) {
                (Some(h), _) => Some(h.clone()),
                (None, true) => match full_file_hash(std::path::Path::new(&unit.abs_path)) {
                    Ok(h) => Some(h),
                    Err(e) => {
                        log::warn!(
                            "hash confirm failed for {}: {e}; inserting fresh",
                            unit.abs_path
                        );
                        None
                    }
                },
                (None, false) => None,
            };
            let confirmed = match strictness {
                MatchStrictness::Fast => true,
                MatchStrictness::Auto | MatchStrictness::Strict => match &cand.content_hash {
                    // No stored hash: `Auto` trusts the fingerprint, `Strict` refuses.
                    None => matches!(strictness, MatchStrictness::Auto),
                    Some(stored) => new_hash.as_deref() == Some(stored.as_str()),
                },
            };
            if confirmed && !w.track_exists_at(&unit.abs_path)? {
                w.retarget_track(
                    cand.id,
                    &unit.abs_path,
                    unit.stamp,
                    unit.probed.audio_offset,
                    unit.probed.audio_length,
                    unit.fingerprint.as_deref(),
                    new_hash.as_deref(),
                )?;
                return Ok(());
            }
            if !confirmed {
                log::warn!(
                    "fingerprint match for {} not confirmed (strictness {:?}); inserting fresh",
                    unit.abs_path,
                    strictness,
                );
            }
        } else if candidates.len() > 1 {
            log::warn!(
                "ambiguous fingerprint match for {} ({} missing candidates); inserting fresh",
                unit.abs_path,
                candidates.len(),
            );
        }
    }
    // Fresh insert. Note that a hash computed above only for confirmation is not stored;
    // the unit's own `content_hash` is.
    ingest_into(
        w,
        &unit.abs_path,
        unit.stamp,
        unit.probed,
        unit.fingerprint.as_deref(),
        unit.content_hash.as_deref(),
    )
}
/// One-off full ingest through `db`, stamping the track from `meta` and storing no
/// checksums.
fn ingest(db: &Db, abs_path: &str, meta: &std::fs::Metadata, probed: Probed) -> Result<()> {
    let stamp = BackingStamp::from_metadata(meta);
    ingest_into(db, abs_path, stamp, probed, None, None)
}
/// Test helper: full ingest through an open `BulkWriter`, storing no checksums.
#[cfg(test)]
fn ingest_bulk(
    bw: &mut musefs_db::BulkWriter<'_>,
    abs_path: &str,
    stamp: BackingStamp,
    probed: Probed,
) -> Result<()> {
    let (no_fingerprint, no_hash) = (None, None);
    ingest_into(bw, abs_path, stamp, probed, no_fingerprint, no_hash)
}
pub fn scan_directory_with(db: &Db, root: &Path, opts: &ScanOptions) -> Result<ScanStats> {
let canon = std::fs::canonicalize(root)?;
let root = canon.as_path();
let mut files = Vec::new();
let mut tally = SkipTally::default();
if root.is_file() {
if is_supported_audio(root) {
files.push(root.to_path_buf());
} else {
tally.record(root);
}
} else {
tally = collect_audio_with(
root,
&mut files,
opts.follow_symlinks,
opts.progress.as_ref(),
)?;
}
let mut already_present = 0u64;
if !opts.force {
let existing: HashSet<String> = db
.list_tracks()?
.into_iter()
.map(|t| t.backing_path)
.collect();
let before = files.len();
files.retain(|path| {
let key = if opts.follow_symlinks {
match std::fs::canonicalize(path) {
Ok(abs) => abs.to_string_lossy().into_owned(),
Err(_) => return true,
}
} else {
path.to_string_lossy().into_owned()
};
!existing.contains(&key)
});
already_present = (before - files.len()) as u64;
}
if let Some(p) = &opts.progress {
p.emit(ScanProgress::Walked {
total: files.len() as u64,
});
}
db.apply_bulk_pragmas_self()?; let mut stats = run_pipeline(db, files, opts, WritePolicy::Full)?;
stats.skipped = tally.total;
stats.already_present = already_present;
if let Some(summary) = tally.summary() {
log::warn!("{summary}");
}
Ok(stats)
}
/// Scans `root` with `ScanOptions::default()`. See `scan_directory_with`.
pub fn scan_directory(db: &Db, root: &Path) -> Result<ScanStats> {
    let defaults = ScanOptions::default();
    scan_directory_with(db, root, &defaults)
}
fn run_pipeline(
db: &Db,
files: Vec<PathBuf>,
opts: &ScanOptions,
policy: WritePolicy,
) -> Result<ScanStats> {
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
let jobs = effective_jobs(opts.jobs);
let total = files.len() as u64;
let progress = opts.progress.as_ref();
let window = opts.window;
let follow_symlinks = opts.follow_symlinks;
let tier = opts.checksum;
let strictness = opts.strictness;
let cap = opts.batch_bytes;
let budget = Arc::new(ByteBudget::new(cap));
let failed = Arc::new(AtomicU64::new(0));
let raced = Arc::new(AtomicU64::new(0));
let files = Arc::new(files);
let cursor = Arc::new(AtomicUsize::new(0));
let (tx, rx) = sync_channel::<Unit>(jobs * 2);
let mut workers = Vec::with_capacity(jobs);
for _ in 0..jobs {
let files = Arc::clone(&files);
let cursor = Arc::clone(&cursor);
let tx = tx.clone();
let budget = Arc::clone(&budget);
let failed = Arc::clone(&failed);
let raced = Arc::clone(&raced);
workers.push(std::thread::spawn(move || {
loop {
let i = cursor.fetch_add(1, Ordering::Relaxed);
let Some(path) = files.get(i) else { break };
match probe_file_caught(path, window) {
Ok(ProbeOutcome::Probed(probed, stamp)) => {
let abs_path = if follow_symlinks {
match std::fs::canonicalize(path) {
Ok(abs) => abs.to_string_lossy().into_owned(),
Err(e) => {
log::warn!("skipping {}: {e}", path.display());
failed.fetch_add(1, Ordering::Relaxed);
continue;
}
}
} else {
path.to_string_lossy().into_owned()
};
let weight = payload_weight(&probed);
budget.acquire(weight); let fingerprint = match tier {
ChecksumTier::None => None,
ChecksumTier::Fingerprint | ChecksumTier::Full => {
Some(fingerprint_of(&probed))
}
};
let content_hash = match tier {
ChecksumTier::Full => {
match full_file_hash(std::path::Path::new(&abs_path)) {
Ok(h) => Some(h),
Err(e) => {
log::warn!("content hash failed for {abs_path}: {e}");
None
}
}
}
_ => None,
};
let unit = Unit {
abs_path,
stamp,
probed,
weight,
fingerprint,
content_hash,
};
if tx.send(unit).is_err() {
budget.release(weight);
break;
}
}
Ok(ProbeOutcome::Unparseable) => {
failed.fetch_add(1, Ordering::Relaxed);
}
Err(e) => {
log::warn!("skipping {}: {e}", path.display());
failed.fetch_add(1, Ordering::Relaxed);
}
Ok(ProbeOutcome::Raced) => {
raced.fetch_add(1, Ordering::Relaxed);
}
}
}
}));
}
drop(tx);
let mut scanned = 0u64;
let mut batch: Vec<Unit> = Vec::new();
let mut batch_bytes = 0u64;
let flush = |batch: &mut Vec<Unit>, batch_bytes: &mut u64, scanned: &mut u64| -> Result<()> {
if batch.is_empty() {
return Ok(());
}
let mut bw = db.bulk_writer()?;
let mut released = 0u64;
let mut committed: Vec<String> = Vec::new();
for unit in batch.drain(..) {
released += unit.weight;
committed.push(unit.abs_path.clone());
ingest_unit(&mut bw, unit, strictness, policy)?;
}
bw.commit()?;
for abs_path in committed {
*scanned += 1;
if let Some(p) = progress {
p.emit(ScanProgress::Ingested {
done: *scanned,
total,
path: &abs_path,
});
}
}
budget.release(released);
*batch_bytes = 0;
Ok(())
};
loop {
match rx.try_recv() {
Ok(unit) => {
batch_bytes += unit.weight;
batch.push(unit);
if batch.len() >= BATCH_FILES || batch_bytes >= cap {
flush(&mut batch, &mut batch_bytes, &mut scanned)?;
}
}
Err(std::sync::mpsc::TryRecvError::Empty) => {
flush(&mut batch, &mut batch_bytes, &mut scanned)?;
match rx.recv() {
Ok(unit) => {
batch_bytes += unit.weight;
batch.push(unit);
if batch.len() >= BATCH_FILES || batch_bytes >= cap {
flush(&mut batch, &mut batch_bytes, &mut scanned)?;
}
}
Err(_) => break, }
}
Err(std::sync::mpsc::TryRecvError::Disconnected) => break,
}
}
flush(&mut batch, &mut batch_bytes, &mut scanned)?;
for w in workers {
let _ = w.join();
}
Ok(ScanStats {
scanned,
skipped: 0, already_present: 0,
failed: failed.load(Ordering::Relaxed),
raced: raced.load(Ordering::Relaxed),
})
}
#[doc(hidden)]
pub fn scan_directory_full_oracle(db: &Db, root: &Path) -> Result<ScanStats> {
let mut files = Vec::new();
let mut skipped = 0u64;
if root.is_file() {
if is_supported_audio(root) {
files.push(root.to_path_buf());
} else {
skipped += 1;
}
} else {
skipped += collect_audio(root, &mut files, false)?.total;
}
let mut stats = ScanStats {
scanned: 0,
skipped,
already_present: 0,
failed: 0,
raced: 0,
};
for path in files {
let bytes = std::fs::read(&path)?;
let Some(probed) = probe_full(&path, &bytes) else {
stats.failed += 1;
continue;
};
let meta = std::fs::metadata(&path)?;
let abs = std::fs::canonicalize(&path)?;
ingest(db, &abs.to_string_lossy(), &meta, probed)?;
stats.scanned += 1;
}
Ok(stats)
}
/// Re-checks the tracks already in the database against the files currently under `root`.
///
/// `root` is canonicalised first. It is then either used as a single file (kept only when
/// `is_supported_audio` accepts it) or walked with `collect_audio_with`. Each discovered file is
/// looked up by key: its canonical path when `opts.follow_symlinks` is set, otherwise the walked
/// path. A file with an existing track is re-ingested through `run_pipeline` with
/// `WritePolicy::StructuralOnly` when any of the following holds:
///
/// * its on-disk `BackingStamp` differs from the stored one;
/// * it is a FLAC track with no structural blocks recorded yet;
/// * `opts.checksum` asks for a fingerprint or content hash the track does not have yet.
///
/// Otherwise the file counts as `unchanged`. Discovered files with no existing track are not
/// ingested and not counted. Files whose metadata (or canonical path) cannot be read are logged
/// and added to `failed`. A `ScanProgress::Walked` event reports how many files were queued.
///
/// When `opts.prune` is set, tracks whose backing path lies under `root` and no longer exists
/// (`ErrorKind::NotFound`) are deleted, and `gc_orphan_art` runs afterwards.
///
/// # Errors
/// Propagates errors from canonicalising or walking `root`, from the pipeline, and from
/// database calls.
pub fn revalidate_with(db: &Db, root: &Path, opts: &ScanOptions) -> Result<RevalidateStats> {
    let canonical_root = std::fs::canonicalize(root)?;
    let root = canonical_root.as_path();

    let mut discovered: Vec<PathBuf> = Vec::new();
    if !root.is_file() {
        collect_audio_with(
            root,
            &mut discovered,
            opts.follow_symlinks,
            opts.progress.as_ref(),
        )?;
    } else if is_supported_audio(root) {
        discovered.push(root.to_path_buf());
    }

    db.apply_bulk_pragmas_self()?;

    // backing path -> (stored stamp, track id, format, has fingerprint, has content hash)
    let known: HashMap<String, (crate::freshness::BackingStamp, i64, Format, bool, bool)> = db
        .list_tracks()?
        .into_iter()
        .map(|track| {
            let facts = (
                crate::freshness::BackingStamp::from_track(&track),
                track.id,
                track.format,
                track.fingerprint.is_some(),
                track.content_hash.is_some(),
            );
            (track.backing_path.clone(), facts)
        })
        .collect();
    let structural_ids = db.track_ids_with_structural_blocks()?;

    let mut unchanged = 0u64;
    let mut skip_failed = 0u64;
    let mut stale: Vec<PathBuf> = Vec::new();
    for path in discovered {
        let meta = match std::fs::metadata(&path) {
            Ok(meta) => meta,
            Err(e) => {
                log::warn!("skipping {}: {e}", path.display());
                skip_failed += 1;
                continue;
            }
        };
        // Must match the key the pipeline stores as the backing path.
        let key = if !opts.follow_symlinks {
            path.to_string_lossy().into_owned()
        } else {
            match std::fs::canonicalize(&path) {
                Ok(abs) => abs.to_string_lossy().into_owned(),
                Err(e) => {
                    log::warn!("skipping {}: {e}", path.display());
                    skip_failed += 1;
                    continue;
                }
            }
        };
        // Files with no existing track are left alone here.
        let Some(&(stamp, id, format, has_fingerprint, has_content_hash)) = known.get(&key)
        else {
            continue;
        };

        let fresh = crate::freshness::BackingStamp::from_metadata(&meta) == stamp;
        let needs_backfill = format == Format::Flac && !structural_ids.contains(&id);
        let needs_checksum = match opts.checksum {
            ChecksumTier::None => false,
            ChecksumTier::Fingerprint => !has_fingerprint,
            ChecksumTier::Full => !(has_fingerprint && has_content_hash),
        };
        if fresh && !needs_backfill && !needs_checksum {
            unchanged += 1;
        } else {
            stale.push(path);
        }
    }

    if let Some(sink) = opts.progress.as_ref() {
        sink.emit(ScanProgress::Walked {
            total: stale.len() as u64,
        });
    }

    let scan = run_pipeline(db, stale, opts, WritePolicy::StructuralOnly)?;

    let pruned = if opts.prune {
        let mut removed = 0u64;
        for track in db.list_tracks()? {
            if !Path::new(&track.backing_path).starts_with(root) {
                continue;
            }
            // Only a definite NotFound deletes; other metadata errors keep the track.
            let vanished = std::fs::metadata(&track.backing_path)
                .err()
                .is_some_and(|e| e.kind() == std::io::ErrorKind::NotFound);
            if vanished {
                db.delete_track(track.id)?;
                removed += 1;
            }
        }
        db.gc_orphan_art()?;
        removed
    } else {
        0
    };

    Ok(RevalidateStats {
        updated: scan.scanned,
        unchanged,
        pruned,
        failed: scan.failed + skip_failed,
        raced: scan.raced,
    })
}
/// Revalidates `root` using `ScanOptions::default()`; see [`revalidate_with`].
pub fn revalidate(db: &Db, root: &Path) -> Result<RevalidateStats> {
    let defaults = ScanOptions::default();
    revalidate_with(db, root, &defaults)
}
/// Computes a lowercase-hex SHA-256 fingerprint of a probed file's parsed content.
///
/// The digest covers, in order:
/// 1. the format name;
/// 2. `audio_offset` and `audio_length`;
/// 3. the tags;
/// 4. the pictures (mime, picture type, description, width, height, data);
/// 5. the binary tags (key, payload);
/// 6. the structural blocks (kind, body).
///
/// Every variable-length byte string is preceded by its length as a little-endian `u64`, and
/// every collection by its element count, so neighbouring fields cannot run into each other.
/// The byte layout is therefore part of the fingerprint: reordering anything changes every
/// fingerprint.
pub(crate) fn fingerprint_of(p: &Probed) -> String {
    use sha2::{Digest, Sha256};

    // Appends a little-endian u64 word.
    fn put_u64(hasher: &mut Sha256, value: u64) {
        hasher.update(value.to_le_bytes());
    }
    // Appends a byte string preceded by its length.
    fn put_bytes(hasher: &mut Sha256, bytes: &[u8]) {
        put_u64(hasher, bytes.len() as u64);
        hasher.update(bytes);
    }

    let mut hasher = Sha256::new();

    put_bytes(&mut hasher, p.format.as_str().as_bytes());
    hasher.update(p.audio_offset.to_le_bytes());
    hasher.update(p.audio_length.to_le_bytes());

    put_u64(&mut hasher, p.tags.len() as u64);
    for (key, value) in &p.tags {
        put_bytes(&mut hasher, key.as_bytes());
        put_bytes(&mut hasher, value.as_bytes());
    }

    put_u64(&mut hasher, p.pictures.len() as u64);
    for picture in &p.pictures {
        put_bytes(&mut hasher, picture.mime.as_bytes());
        put_u64(&mut hasher, u64::from(picture.picture_type.get()));
        put_bytes(&mut hasher, picture.description.as_bytes());
        put_u64(&mut hasher, u64::from(picture.width));
        put_u64(&mut hasher, u64::from(picture.height));
        put_bytes(&mut hasher, &picture.data);
    }

    put_u64(&mut hasher, p.binary_tags.len() as u64);
    for tag in &p.binary_tags {
        put_bytes(&mut hasher, tag.key.as_bytes());
        put_bytes(&mut hasher, &tag.payload);
    }

    put_u64(&mut hasher, p.structural_blocks.len() as u64);
    for (kind, body) in &p.structural_blocks {
        put_bytes(&mut hasher, kind.as_bytes());
        put_bytes(&mut hasher, body);
    }

    let digest = hasher.finalize();
    format!("{:x}", base16ct::HexDisplay(&digest))
}
pub(crate) fn full_file_hash(path: &std::path::Path) -> std::io::Result<String> {
use sha2::{Digest, Sha256};
let mut f = std::fs::File::open(path)?;
let mut h = Sha256::new();
let mut buf = vec![0u8; 1 << 16];
loop {
let n = std::io::Read::read(&mut f, &mut buf)?;
if n == 0 {
break;
}
h.update(&buf[..n]);
}
Ok(format!("{:x}", base16ct::HexDisplay(&h.finalize())))
}
#[cfg(test)]
mod bounded_probe_tests;
#[cfg(test)]
mod hardening_tests;
#[cfg(test)]
mod ogg_probe_tests;
#[cfg(test)]
mod scan_unit_tests;
#[cfg(test)]
mod wav_probe_tests;