Skip to main content

suno_core/
executor.rs

1//! The download executor: it applies a reconcile [`Plan`] to disk through ports.
2//!
3//! Reconcile decides *what* to do; the executor does it. It is async and pure
4//! orchestration: every side effect goes through a port ([`Http`] for the
5//! network, [`Filesystem`] for disk, [`Ffmpeg`] for transcoding, [`Clock`] for
6//! waiting), so the whole pipeline is exercised in tests with in-memory doubles
7//! and no real IO, network, or sleeping.
8//!
9//! Safety is the point of this module. A wrong write or delete damages the
10//! user's library, so the executor:
11//!
12//! - writes only atomically (SYNC-13): a failed write leaves the prior file
13//!   intact, because the [`Filesystem`] adapter stages a temp file and renames;
14//! - verifies size (SYNC-14): a download whose body disagrees with the
15//!   provider's `Content-Length` is treated as truncated and retried, and a
16//!   written file whose on-disk size disagrees with the bytes written is a
17//!   failure, never a recorded success;
18//! - classifies errors (SYNC-17): an auth failure or a full disk stops the
19//!   account run (with an auth or disk-full status) and is never retried;
20//!   transient failures (timeouts, 5xx,
21//!   transport, 429) are retried a bounded number of times then recorded and
22//!   skipped; permanent failures are recorded and skipped; and a single clip's
23//!   failure never aborts the run;
24//! - backs off on rate limits (SYNC-16) through the injected [`Clock`], honouring
25//!   a `Retry-After` hint.
26//!
27//! The executor only ever sets the manifest's [`preserve`](ManifestEntry::preserve)
28//! marker on an entry it writes, and only deletes a path whose removal the
29//! [`Filesystem`] confirms. Higher-level safety (empty-listing abort, the
30//! destructive-sync confirmation, exit codes) is the caller's job.
31
32use std::collections::BTreeMap;
33use std::collections::BTreeSet;
34use std::collections::HashMap;
35use std::collections::HashSet;
36use std::sync::Mutex;
37use std::time::Duration;
38
39use futures_util::lock::Mutex as AsyncMutex;
40use futures_util::stream::{self, StreamExt};
41
42use crate::backoff::{backoff_delay, retry_after};
43use crate::client::SunoClient;
44use crate::clock::Clock;
45use crate::config::{AudioFormat, StemFormat};
46use crate::error::Error;
47use crate::ffmpeg::{Ffmpeg, WebpEncodeSettings};
48use crate::fs::Filesystem;
49use crate::graph::{AlbumArt, PlaylistState};
50use crate::http::{Http, HttpRequest};
51use crate::lineage::LineageContext;
52use crate::lyrics::AlignedLyrics;
53use crate::manifest::{ArtifactState, Manifest, ManifestEntry};
54use crate::model::Clip;
55use crate::reconcile::{
56    Action, ArtifactKind, Desired, Plan, SourceMode, set_manifest_artifact, set_manifest_stem,
57};
58use crate::tag::{TrackMetadata, tag_flac, tag_mp3, tag_wav};
59
/// The shared Suno client behind an async mutex, so concurrent audio work can
/// serialise its order-sensitive API calls (JWT refresh, adaptive limiter)
/// without a runtime-specific lock. Held only for the brief WAV-render calls;
/// the heavy CDN/transcode/tag work runs unlocked.
///
/// The mutex wraps the `&mut SunoClient` borrowed from [`Ports::client`], so
/// the lock lives no longer than that borrow.
type ClientLock<'a, C> = AsyncMutex<&'a mut SunoClient<C>>;
65
/// Tunables for one [`execute`] run.
///
/// [`Default`] gives 3 retries, 24 WAV polls five seconds apart, a concurrency
/// of 4, and the default WebP encode settings.
#[derive(Debug, Clone)]
pub struct ExecOptions {
    /// How many times a transient failure is retried before record-and-skip.
    pub max_retries: u32,
    /// How many times to poll for a server-side WAV render before giving up.
    pub wav_poll_attempts: u32,
    /// How long to wait between WAV render polls.
    pub wav_poll_interval: Duration,
    /// How many clips' audio to fetch, transcode, and tag concurrently. Clamped
    /// to at least one, so a zero collapses to sequential rather than stalling.
    pub concurrency: u32,
    /// Settings used for animated WebP cover transcodes.
    pub cover_webp: WebpEncodeSettings,
}
81
82impl Default for ExecOptions {
83    fn default() -> Self {
84        Self {
85            max_retries: 3,
86            wav_poll_attempts: 24,
87            wav_poll_interval: Duration::from_secs(5),
88            concurrency: 4,
89            cover_webp: WebpEncodeSettings::default(),
90        }
91    }
92}
93
/// How an [`execute`] run ended.
///
/// [`Completed`](RunStatus::Completed) is the default; the other two variants
/// mark a systemic abort that stopped the run before every action was tried.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum RunStatus {
    /// Every action was attempted; some may have failed and been skipped.
    #[default]
    Completed,
    /// An auth failure stopped the run early; remaining actions were not tried.
    AuthAborted,
    /// The disk filled; the run stopped early rather than failing every
    /// remaining clip. Remaining actions were not tried.
    DiskFull,
}
106
/// One action that could not be applied, for the run summary and failure log.
///
/// [`execute`] pushes one of these onto [`ExecOutcome::failures`] for every
/// action that returns an error, whether or not the error aborts the run.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Failure {
    /// The clip the failed action concerned (or a path when no id applies).
    pub clip_id: String,
    /// A short, secret-free reason.
    pub reason: String,
}
115
/// The result of applying a [`Plan`]: per-action counts and the failure list.
///
/// Each counter is bumped once per successfully applied action that reported
/// the matching effect; failed actions appear only in `failures`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ExecOutcome {
    /// Actions that completed as a download.
    pub downloaded: usize,
    /// Actions that completed as a reformat.
    pub reformatted: usize,
    /// Actions that completed as a retag.
    pub retagged: usize,
    /// Actions that completed as a rename.
    pub renamed: usize,
    /// Actions that completed as a delete.
    pub deleted: usize,
    /// Actions that completed as a skip.
    pub skipped: usize,
    /// Actions that completed as an artifact write.
    pub artifacts_written: usize,
    /// Actions that completed as an artifact delete.
    pub artifacts_deleted: usize,
    /// Actions that failed and were skipped (auth, disk-full,
    /// transient-exhausted, or permanent). The run continued past each one
    /// unless it was an auth or disk-full abort.
    pub failures: Vec<Failure>,
    /// How the run ended.
    pub status: RunStatus,
}
134
135impl ExecOutcome {
136    /// Number of failed actions.
137    pub fn failed(&self) -> usize {
138        self.failures.len()
139    }
140
141    fn record(&mut self, effect: Effect) {
142        match effect {
143            Effect::Downloaded => self.downloaded += 1,
144            Effect::Reformatted => self.reformatted += 1,
145            Effect::Retagged => self.retagged += 1,
146            Effect::Renamed => self.renamed += 1,
147            Effect::Deleted => self.deleted += 1,
148            Effect::Skipped => self.skipped += 1,
149            Effect::ArtifactWritten => self.artifacts_written += 1,
150            Effect::ArtifactDeleted => self.artifacts_deleted += 1,
151        }
152    }
153}
154
/// The IO ports the executor drives, grouped so one value threads them through.
///
/// `client` is the only `&mut` port: it performs the authenticated WAV render
/// flow and so mutates its cached session. The rest are shared references.
///
/// The type parameters are the port implementations: `H` for the network,
/// `F` for disk, `G` for transcoding, and `C` for the clock (which also
/// parameterises the client).
pub struct Ports<'a, H, F, G, C> {
    /// Performs the authenticated WAV render and poll flow.
    pub client: &'a mut SunoClient<C>,
    /// The public network port (CDN audio, rendered WAV, cover art).
    pub http: &'a H,
    /// The disk port.
    pub fs: &'a F,
    /// The transcode port (WAV to FLAC).
    pub ffmpeg: &'a G,
    /// The backoff and poll delay port.
    pub clock: &'a C,
}
171
/// Apply `plan` to disk, updating `manifest`, `albums`, and `playlists` in
/// place, and return the outcome.
///
/// `desired` carries the per-clip metadata and art hashes plus the source modes
/// that decide the [`preserve`](ManifestEntry::preserve) marker; it is indexed
/// by clip id (and by target path, for renames) so each written entry records
/// the right hashes and protection. `albums` is the album-art store, keyed by
/// stable root id: folder-art writes and deletes record their state there rather
/// than on the per-clip `manifest`. `playlists` is the playlist store, handed to
/// the artifact handlers so the library-scoped playlist artifact is tracked
/// there instead of on the manifest. `ports` bundles the authenticated client
/// and the network, disk, transcode, and backoff ports. A single clip's failure
/// never aborts the run, except an auth failure or a full disk, which stop it
/// with [`RunStatus::AuthAborted`] or [`RunStatus::DiskFull`].
///
/// The audio-producing actions ([`Download`](Action::Download) and
/// [`Reformat`](Action::Reformat)) render concurrently, bounded by
/// [`ExecOptions::concurrency`]: their slow parts (WAV render, CDN download,
/// transcode, tag) overlap while the order-sensitive Suno API calls are
/// serialised behind an async mutex over the shared [`SunoClient`], keeping the
/// adaptive limiter and JWT refresh correct. Only the rendering overlaps: every
/// destination write, removal, and manifest update (the audio commits as well
/// as retag, rename, delete, and artifact/stem actions) is applied by a single
/// serial loop in plan order.
///
/// The outcome is deterministic regardless of completion order: concurrent audio
/// results are committed to the manifest in plan-index order, so the same plan
/// always yields the same manifest and counts whatever the concurrency level. A
/// per-clip failure is recorded and the run continues; only an auth failure or a
/// full disk aborts, and it does so promptly by stopping further audio work.
///
/// `synced` carries this run's fetched aligned (synced) lyrics keyed by clip id;
/// it is the caller's IO result, not part of the pure plan. Audio tagging embeds
/// a clip's entry as an MP3 `SYLT` frame and as the plain `USLT`/`LYRICS` text
/// (FLAC), so a clip absent from the map (an instrumental, a WAV target, or a
/// run with the feature off) is tagged exactly as before. The synced `.lrc`
/// sidecar itself is a generated artifact whose body the caller has already
/// resolved into the plan, so it is written like any other text sidecar.
///
/// After the loop (completed or aborted) empty directories are pruned on a
/// best-effort basis; a prune error is ignored.
#[allow(clippy::too_many_arguments)]
pub async fn execute<H, F, G, C>(
    plan: &Plan,
    manifest: &mut Manifest,
    albums: &mut BTreeMap<String, AlbumArt>,
    playlists: &mut BTreeMap<String, PlaylistState>,
    desired: &[Desired],
    synced: &HashMap<String, AlignedLyrics>,
    ports: Ports<'_, H, F, G, C>,
    opts: &ExecOptions,
) -> ExecOutcome
where
    H: Http,
    F: Filesystem,
    G: Ffmpeg,
    C: Clock,
{
    let Ports {
        client,
        http,
        fs,
        ffmpeg,
        clock,
    } = ports;
    // Lookup tables over `desired`: by clip id, and by target path.
    let by_id: HashMap<&str, &Desired> = desired.iter().map(|d| (d.clip.id.as_str(), d)).collect();
    let by_path: HashMap<&str, &Desired> = desired.iter().map(|d| (d.path.as_str(), d)).collect();
    // How many tracked artifact slots reference each path. The inline old-path
    // cleanup removes a path only once nothing else holds it: each slot that
    // moves away decrements its reference, and the removal fires only when the
    // count reaches zero and no action writes the path this run. This keeps a
    // live file a co-referencing slot still owns (a prior failed swap can leave
    // two clips sharing a path) while letting the last slot to leave reclaim it,
    // so nothing is orphaned either (#76).
    let mut tracked_paths: HashMap<String, u32> = HashMap::new();
    for (_, entry) in manifest.iter() {
        for path in entry.artifact_paths() {
            *tracked_paths.entry(path.to_owned()).or_default() += 1;
        }
    }
    for art in albums.values() {
        for state in [
            art.folder_jpg.as_ref(),
            art.folder_webp.as_ref(),
            art.folder_mp4.as_ref(),
        ]
        .into_iter()
        .flatten()
        {
            *tracked_paths.entry(state.path.clone()).or_default() += 1;
        }
    }
    for playlist in playlists.values() {
        *tracked_paths.entry(playlist.path.clone()).or_default() += 1;
    }
    // Static cover art is otherwise fetched twice per clip (#89): once to embed
    // in the audio tag and once for the per-song `.jpg` sidecar, both from the
    // same CDN URL. The audio producer caches each cover it embeds here, keyed by
    // URL, and the sidecar write drains it rather than re-fetching. Only URLs a
    // `CoverJpg` sidecar will fetch this run are cached, and the sidecar removes
    // its entry on use, so the map holds at most the covers for the clips in
    // flight (bounded by `concurrency`), never the whole library.
    let cover_wanted: HashSet<&str> = plan
        .actions
        .iter()
        .filter_map(|action| match action {
            Action::WriteArtifact {
                kind: ArtifactKind::CoverJpg,
                source_url,
                ..
            } if !source_url.is_empty() => Some(source_url.as_str()),
            _ => None,
        })
        .collect();
    let cover_cache: Mutex<HashMap<String, Vec<u8>>> = Mutex::new(HashMap::new());
    // The `both` video-cover retention keeps `cover.webp` (transcoded) and
    // `cover.mp4` (raw) for an album from the SAME `video_cover_url`. Cache that
    // source on its first fetch so the second folder artifact drains it rather
    // than fetching the same MP4 twice (#90 reuses the #89 fetch-once path).
    let mut folder_cover_uses: HashMap<&str, u32> = HashMap::new();
    for action in &plan.actions {
        if let Action::WriteArtifact {
            kind: ArtifactKind::FolderWebp | ArtifactKind::FolderMp4,
            source_url,
            ..
        } = action
            && !source_url.is_empty()
        {
            *folder_cover_uses.entry(source_url.as_str()).or_default() += 1;
        }
    }
    // Only URLs used by more than one folder artifact are worth caching.
    let shared_cover_urls: HashSet<&str> = folder_cover_uses
        .into_iter()
        .filter(|(_, uses)| *uses > 1)
        .map(|(url, _)| url)
        .collect();
    let ctx = Ctx {
        http,
        fs,
        ffmpeg,
        clock,
        opts,
        by_id: &by_id,
        by_path: &by_path,
        synced,
        cover_cache: &cover_cache,
        cover_wanted: &cover_wanted,
        shared_cover_urls: &shared_cover_urls,
    };

    let mut outcome = ExecOutcome::default();
    // Destinations whose write has actually committed this run, gating old-path
    // cleanup so a vacated sidecar/stem is kept only when a *successful* write
    // also targets it (#142). Serial commit order makes this a clean prefix.
    let mut committed: BTreeSet<String> = BTreeSet::new();

    // The audio-producing actions ([`Download`](Action::Download) /
    // [`Reformat`](Action::Reformat)) render concurrently, but their work is
    // deliberately split so that NO destination write, file removal, or manifest
    // update happens off the plan's order:
    //
    // - the parallel producers ([`prepare_audio`](Ctx::prepare_audio)) do only
    //   the slow, side-effect-free work (fetch the CDN/WAV bytes, transcode, and
    //   tag), returning the tagged bytes; and
    // - a single serial committer below writes those bytes to the destination,
    //   removes any superseded file, and records the manifest entry, in strict
    //   plan-index order, interleaved with the non-audio actions.
    //
    // The shared client is the only `&mut` port and its API calls must stay
    // ordered, so it rides behind an async mutex; each producer locks it only for
    // the brief WAV-render calls and runs the heavy work unlocked. Renders are
    // yielded in plan order and bounded to `concurrency` in flight (and buffered),
    // so at most about `concurrency` tagged payloads are ever held in memory -
    // never the whole library.
    let client_lock = AsyncMutex::new(client);
    let concurrency = opts.concurrency.max(1) as usize;
    let ctx_ref = &ctx;
    let client_lock_ref = &client_lock;
    let mut renders = stream::iter(
        plan.actions
            .iter()
            .filter(|action| is_audio_action(action))
            .map(|action| async move { ctx_ref.prepare_audio(client_lock_ref, action).await }),
    )
    .buffered(concurrency);

    for action in &plan.actions {
        // Audio actions pull their pre-rendered bytes (yielded in plan order) and
        // commit them here; every other action applies its own effect. Both the
        // audio commit and the non-audio apply run serially, so all destination
        // and manifest effects keep the plan's order exactly as the sequential
        // executor did.
        let result = if is_audio_action(action) {
            match renders.next().await {
                Some(Ok(rendered)) => ctx.commit_audio(manifest, rendered),
                Some(Err(fail)) => Err(fail),
                None => unreachable!("buffered yields one result per audio action"),
            }
        } else {
            ctx.apply(
                client_lock_ref,
                action,
                manifest,
                albums,
                playlists,
                &mut tracked_paths,
                &committed,
            )
            .await
        };
        match result {
            Ok(effect) => {
                outcome.record(effect);
                // Record this action's destination now that its write succeeded.
                // A later action vacating a path removes it only when no
                // *committed* write also targets it; commit is strictly serial in
                // plan order, so a planned-but-failed or not-yet-run write never
                // protects a stale file from cleanup (#142).
                if let Some(dest) = written_path(action) {
                    committed.insert(dest.to_owned());
                }
            }
            Err(fail) => {
                // Classify before moving the fields into the failure record.
                let abort = abort_status(fail.class);
                outcome.failures.push(Failure {
                    clip_id: fail.clip_id,
                    reason: fail.reason,
                });
                if let Some(status) = abort {
                    // A systemic abort stops the run. Dropping the render stream
                    // cancels any in-flight or completed-but-uncommitted producer;
                    // because producers touch nothing on disk, the destination and
                    // manifest are left exactly as the committed prefix wrote them,
                    // with no untracked files and no removed-but-referenced file.
                    outcome.status = status;
                    break;
                }
            }
        }
    }
    drop(renders);

    // Renames and deletes can leave an album directory empty; prune those ghost
    // directories bottom-up. This runs on both the completed and the aborted
    // paths, and is best-effort: a prune failure is only a missed tidy that the
    // next run repeats, never a reason to fail the run.
    let _ = fs.prune_empty_dirs("");
    outcome
}
414
415/// Whether an action produces audio: it fetches, transcodes, and tags a clip's
416/// file. Its slow render runs in the concurrent phase; its destination write and
417/// manifest update are committed serially in plan order. Everything else touches
418/// the manifest, album, or playlist stores directly and runs serially.
419fn is_audio_action(action: &Action) -> bool {
420    matches!(action, Action::Download { .. } | Action::Reformat { .. })
421}
422
423/// The destination path an action writes on success, or `None` for actions that
424/// write no file (skips, deletes). The serial committer records this once the
425/// action succeeds, so a later action vacating that same path keeps it rather
426/// than removing a freshly written file (#142, #76).
427fn written_path(action: &Action) -> Option<&str> {
428    match action {
429        Action::Download { path, .. }
430        | Action::Reformat { path, .. }
431        | Action::WriteArtifact { path, .. }
432        | Action::WriteStem { path, .. } => Some(path),
433        Action::Rename { to, .. }
434        | Action::MoveArtifact { to, .. }
435        | Action::MoveStem { to, .. } => Some(to),
436        _ => None,
437    }
438}
439
/// A rendered-but-uncommitted audio result: the tagged bytes plus what the serial
/// committer needs to place them. Produced concurrently and side-effect-free (no
/// destination write, no removal, no manifest touch); [`commit_audio`] applies
/// all of those in plan order.
struct RenderedAudio {
    /// Id of the clip the audio belongs to.
    clip_id: String,
    /// Destination path taken from the action.
    path: String,
    /// Audio format taken from the action.
    format: AudioFormat,
    /// The superseded file to remove after the new one lands (a [`Reformat`]),
    /// or `None` for a plain [`Download`].
    from_path: Option<String>,
    /// The effect to report once the commit succeeds.
    effect: Effect,
    /// The produced (fetched, transcoded, tagged) file contents.
    bytes: Vec<u8>,
}
454
/// What an applied action did, for the outcome counters.
///
/// Each variant maps one-to-one onto an [`ExecOutcome`] counter.
enum Effect {
    /// Counted in `downloaded`.
    Downloaded,
    /// Counted in `reformatted`.
    Reformatted,
    /// Counted in `retagged`.
    Retagged,
    /// Counted in `renamed`.
    Renamed,
    /// Counted in `deleted`.
    Deleted,
    /// Counted in `skipped`.
    Skipped,
    /// Counted in `artifacts_written`.
    ArtifactWritten,
    /// Counted in `artifacts_deleted`.
    ArtifactDeleted,
}
466
/// How a failure should be handled (SYNC-17).
///
/// `Auth` and `Disk` end the run; `Transient` and `Permanent` are per-clip and
/// the run continues past them.
#[derive(Debug, Clone, Copy)]
enum Class {
    /// Stop the account run; do not retry.
    Auth,
    /// Stop the account run: a full disk is systemic, like auth, so aborting
    /// beats skipping every remaining clip (each of which would first burn a
    /// server-side WAV-render budget before failing the same way).
    Disk,
    /// Retry a bounded number of times, then record and skip.
    Transient,
    /// Record and skip immediately.
    Permanent,
}
481
/// A classified action failure attributed to a clip.
struct Fail {
    /// How the failure is handled; decides whether the run aborts.
    class: Class,
    /// The clip the failure is attributed to; copied into the public failure
    /// record.
    clip_id: String,
    /// The reason text; copied into the public failure record.
    reason: String,
}
488
489/// The run-ending status for a failure class, or `None` when the failure is
490/// per-clip and the run continues.
491fn abort_status(class: Class) -> Option<RunStatus> {
492    match class {
493        Class::Auth => Some(RunStatus::AuthAborted),
494        Class::Disk => Some(RunStatus::DiskFull),
495        Class::Transient | Class::Permanent => None,
496    }
497}
498
499fn auth_fail(clip_id: impl Into<String>, reason: impl Into<String>) -> Fail {
500    Fail {
501        class: Class::Auth,
502        clip_id: clip_id.into(),
503        reason: reason.into(),
504    }
505}
506
507fn transient_fail(clip_id: impl Into<String>, reason: impl Into<String>) -> Fail {
508    Fail {
509        class: Class::Transient,
510        clip_id: clip_id.into(),
511        reason: reason.into(),
512    }
513}
514
515fn permanent_fail(clip_id: impl Into<String>, reason: impl Into<String>) -> Fail {
516    Fail {
517        class: Class::Permanent,
518        clip_id: clip_id.into(),
519        reason: reason.into(),
520    }
521}
522
523fn disk_fail(clip_id: impl Into<String>, reason: impl Into<String>) -> Fail {
524    Fail {
525        class: Class::Disk,
526        clip_id: clip_id.into(),
527        reason: reason.into(),
528    }
529}
530
531/// Whether an artifact kind is album-scoped folder art (owned by a root id and
532/// recorded on the album store) rather than a per-clip sidecar (recorded on the
533/// manifest).
534fn is_album_kind(kind: ArtifactKind) -> bool {
535    matches!(
536        kind,
537        ArtifactKind::FolderJpg | ArtifactKind::FolderWebp | ArtifactKind::FolderMp4
538    )
539}
540
541/// True for the library-scoped playlist artifact, routed to the playlist store.
542fn is_playlist_kind(kind: ArtifactKind) -> bool {
543    matches!(kind, ArtifactKind::Playlist)
544}
545
546/// True for a per-song sidecar (`cover.jpg`/`cover.webp`), whose write requires
547/// the owning clip's manifest entry. Album and playlist kinds are keyed by a
548/// root/playlist id that is deliberately absent from the manifest.
549fn is_per_clip_kind(kind: ArtifactKind) -> bool {
550    matches!(
551        kind,
552        ArtifactKind::CoverJpg
553            | ArtifactKind::CoverWebp
554            | ArtifactKind::DetailsTxt
555            | ArtifactKind::LyricsTxt
556            | ArtifactKind::Lrc
557            | ArtifactKind::VideoMp4
558    )
559}
560
/// Derive a playlist's display name from the file stem of its `.m3u8` path.
///
/// The playlist file is `<sanitised name>.m3u8` at the library root, so its
/// stem is the sanitised name. Reconcile only ever reads a playlist's `path`
/// and `hash`; the name recovered here is for humans, and the fact that the
/// sanitiser cannot be reversed never affects a decision. A path with no file
/// stem yields an empty string.
fn playlist_name_from_path(path: &str) -> String {
    match std::path::Path::new(path).file_stem() {
        Some(stem) => stem.to_string_lossy().into_owned(),
        None => String::new(),
    }
}
573
/// A classified fetch failure, not yet attributed to a clip.
struct FetchError {
    /// How the failure is handled once attributed.
    class: Class,
    /// The reason text carried into the attributed failure.
    reason: String,
    /// An optional delay hint for the retry; always `None` for a permanent
    /// failure, and dropped when the error is attributed to a clip.
    retry_after: Option<Duration>,
}
580
581impl FetchError {
582    fn transient(reason: impl Into<String>, retry_after: Option<Duration>) -> Self {
583        Self {
584            class: Class::Transient,
585            reason: reason.into(),
586            retry_after,
587        }
588    }
589
590    fn permanent(reason: impl Into<String>) -> Self {
591        Self {
592            class: Class::Permanent,
593            reason: reason.into(),
594            retry_after: None,
595        }
596    }
597
598    fn attribute(self, clip_id: &str) -> Fail {
599        Fail {
600            class: self.class,
601            clip_id: clip_id.to_owned(),
602            reason: self.reason,
603        }
604    }
605}
606
/// The shared, read-only context threaded through every action handler.
struct Ctx<'a, H, F, G, C> {
    /// The public network port.
    http: &'a H,
    /// The disk port.
    fs: &'a F,
    /// The transcode port.
    ffmpeg: &'a G,
    /// The backoff and poll delay port.
    clock: &'a C,
    /// This run's tunables.
    opts: &'a ExecOptions,
    /// The desired state indexed by clip id.
    by_id: &'a HashMap<&'a str, &'a Desired>,
    /// The desired state indexed by target path.
    by_path: &'a HashMap<&'a str, &'a Desired>,
    /// This run's fetched aligned (synced) lyrics, keyed by clip id. Audio
    /// tagging reads a clip's entry to embed an MP3 `SYLT` frame and the plain
    /// lyric text; a clip absent here is tagged exactly as before. Populated by
    /// the caller (the fetch is IO), so the engine stays free of direct IO.
    synced: &'a HashMap<String, AlignedLyrics>,
    /// Static cover art the audio producer already fetched to embed in the tag,
    /// keyed by CDN URL, so the matching per-song `.jpg` sidecar reuses it rather
    /// than fetching the same image again (#89). Only URLs a `CoverJpg` sidecar
    /// will fetch are inserted (see `cover_wanted`) and each is removed on use, so
    /// the map stays bounded to the clips in flight. A plain mutex guards it: the
    /// concurrent producers only ever insert, and the lock is never held across an
    /// await.
    cover_cache: &'a Mutex<HashMap<String, Vec<u8>>>,
    /// The cover URLs a `CoverJpg` sidecar will fetch this run. The producer caches
    /// a cover only when its URL is here, so a clip whose cover is embedded but
    /// never written as a sidecar leaves no bytes stranded in `cover_cache`.
    cover_wanted: &'a HashSet<&'a str>,
    /// Album video-cover source URLs fetched by more than one folder artifact
    /// this run. The `both` retention derives `cover.webp` (transcoded) and
    /// `cover.mp4` (raw) from the SAME `video_cover_url`; the first fetch caches
    /// the raw source here so the sibling drains it instead of re-fetching (#90
    /// reuses the #89 fetch-once path). `FolderWebp` sorts before `FolderMp4`, so
    /// the raw source is always cached before the raw sidecar reads it.
    shared_cover_urls: &'a HashSet<&'a str>,
}
641
642impl<H, F, G, C> Ctx<'_, H, F, G, C>
643where
644    H: Http,
645    F: Filesystem,
646    G: Ffmpeg,
647    C: Clock,
648{
649    /// Apply one non-audio action, returning what it did or why it failed.
650    ///
651    /// Audio actions ([`Download`](Action::Download) /
652    /// [`Reformat`](Action::Reformat)) run in the concurrent phase through
653    /// [`prepare_audio`](Self::prepare_audio) and never reach here.
654    #[allow(clippy::too_many_arguments)]
655    async fn apply(
656        &self,
657        client_lock: &ClientLock<'_, C>,
658        action: &Action,
659        manifest: &mut Manifest,
660        albums: &mut BTreeMap<String, AlbumArt>,
661        playlists: &mut BTreeMap<String, PlaylistState>,
662        tracked_paths: &mut HashMap<String, u32>,
663        committed: &BTreeSet<String>,
664    ) -> Result<Effect, Fail> {
665        match action {
666            Action::Download { .. } | Action::Reformat { .. } => {
667                unreachable!("audio actions are applied in the concurrent phase")
668            }
669            Action::Retag {
670                clip,
671                lineage,
672                path,
673            } => self.retag(manifest, clip, lineage, path).await,
674            Action::Rename { from, to } => self.rename(manifest, from, to),
675            Action::Delete { path, clip_id } => self.delete(manifest, path, clip_id),
676            Action::Skip { clip_id } => {
677                self.refresh_preserve(manifest, clip_id);
678                Ok(Effect::Skipped)
679            }
680            Action::WriteArtifact {
681                kind,
682                path,
683                source_url,
684                hash,
685                owner_id,
686                content,
687            } => {
688                self.write_artifact(
689                    manifest,
690                    albums,
691                    playlists,
692                    *kind,
693                    path,
694                    source_url,
695                    hash,
696                    owner_id,
697                    content.as_deref(),
698                    tracked_paths,
699                    committed,
700                )
701                .await
702            }
703            Action::DeleteArtifact {
704                kind,
705                path,
706                owner_id,
707            } => self.delete_artifact(manifest, albums, playlists, *kind, path, owner_id),
708            Action::MoveArtifact {
709                kind,
710                from,
711                to,
712                source_url,
713                hash,
714                owner_id,
715            } => {
716                self.move_artifact(
717                    manifest,
718                    albums,
719                    playlists,
720                    *kind,
721                    from,
722                    to,
723                    source_url,
724                    hash,
725                    owner_id,
726                    tracked_paths,
727                    committed,
728                )
729                .await
730            }
731            Action::WriteStem {
732                clip_id,
733                key,
734                stem_id,
735                path,
736                source_url,
737                format,
738                hash,
739            } => {
740                self.write_stem(
741                    client_lock,
742                    manifest,
743                    clip_id,
744                    key,
745                    stem_id,
746                    path,
747                    source_url,
748                    *format,
749                    hash,
750                    tracked_paths,
751                    committed,
752                )
753                .await
754            }
755            Action::DeleteStem { clip_id, key, path } => {
756                self.delete_stem(manifest, clip_id, key, path)
757            }
758            Action::MoveStem {
759                clip_id,
760                key,
761                stem_id,
762                from,
763                to,
764                source_url,
765                format,
766                hash,
767            } => {
768                self.move_stem(
769                    client_lock,
770                    manifest,
771                    clip_id,
772                    key,
773                    stem_id,
774                    from,
775                    to,
776                    source_url,
777                    *format,
778                    hash,
779                    tracked_paths,
780                    committed,
781                )
782                .await
783            }
784        }
785    }
786
787    /// Render one audio action's tagged bytes, side-effect-free.
788    ///
789    /// This is the concurrent part: it fetches, transcodes, and tags the file
790    /// (through shared ports, plus the client behind `client_lock`), then returns
791    /// the bytes and where they must go. It deliberately writes nothing, removes
792    /// nothing, and never touches `manifest`, so many run at once and an aborted
793    /// run can drop them with no destination or manifest effect. The serial
794    /// [`commit_audio`](Self::commit_audio) applies those effects in plan order.
795    async fn prepare_audio(
796        &self,
797        client_lock: &ClientLock<'_, C>,
798        action: &Action,
799    ) -> Result<RenderedAudio, Fail> {
800        match action {
801            Action::Download {
802                clip,
803                lineage,
804                path,
805                format,
806            } => {
807                let bytes = self
808                    .produce_audio(client_lock, clip, lineage, *format)
809                    .await?;
810                Ok(RenderedAudio {
811                    clip_id: clip.id.clone(),
812                    path: path.clone(),
813                    format: *format,
814                    from_path: None,
815                    effect: Effect::Downloaded,
816                    bytes,
817                })
818            }
819            Action::Reformat {
820                clip,
821                path,
822                from_path,
823                from: _,
824                to,
825            } => {
826                // A Reformat action carries no lineage, so recover it from the
827                // desired set (the same context that drove naming and the hash),
828                // falling back to a self-rooted context when the clip is not in
829                // the current selection.
830                let lineage = self
831                    .by_id
832                    .get(clip.id.as_str())
833                    .map(|d| d.lineage.clone())
834                    .unwrap_or_else(|| LineageContext::own_root(clip));
835                let bytes = self.produce_audio(client_lock, clip, &lineage, *to).await?;
836                Ok(RenderedAudio {
837                    clip_id: clip.id.clone(),
838                    path: path.clone(),
839                    format: *to,
840                    from_path: Some(from_path.clone()),
841                    effect: Effect::Reformatted,
842                    bytes,
843                })
844            }
845            _ => unreachable!("prepare_audio only handles audio actions"),
846        }
847    }
848
849    /// Commit one rendered audio result serially, in plan order.
850    ///
851    /// Writes the tagged bytes to the destination, then, for a [`Reformat`], drops
852    /// the superseded file, then records the manifest entry. Ordering the write
853    /// before the removal keeps a crash from losing both copies; keeping all of
854    /// this off the concurrent phase preserves the sequential executor's plan-order
855    /// guarantee for every destination and manifest effect.
856    fn commit_audio(
857        &self,
858        manifest: &mut Manifest,
859        rendered: RenderedAudio,
860    ) -> Result<Effect, Fail> {
861        let RenderedAudio {
862            clip_id,
863            path,
864            format,
865            from_path,
866            effect,
867            bytes,
868        } = rendered;
869        let size = self.write_verify(&clip_id, &path, &bytes)?;
870        if let Some(from) = from_path {
871            // The new file is safely in place; only now drop the old rendering.
872            self.fs.remove(&from).map_err(|err| {
873                permanent_fail(&clip_id, format!("could not remove old file: {err}"))
874            })?;
875        }
876        manifest.insert(clip_id.clone(), self.entry(&clip_id, &path, format, size));
877        Ok(effect)
878    }
879
880    /// Re-tag the existing file in place to match current metadata and art.
881    async fn retag(
882        &self,
883        manifest: &mut Manifest,
884        clip: &Clip,
885        lineage: &LineageContext,
886        path: &str,
887    ) -> Result<Effect, Fail> {
888        let Some(format) = manifest.get(&clip.id).map(|entry| entry.format) else {
889            return Err(permanent_fail(
890                &clip.id,
891                "retag target missing from manifest",
892            ));
893        };
894
895        if format == AudioFormat::Wav {
896            let (meta, synced) = self.track_meta(clip, lineage);
897            let cover = self.fetch_cover(clip).await;
898            let existing = self.fs.read(path).map_err(|err| {
899                permanent_fail(&clip.id, format!("could not read for retag: {err}"))
900            })?;
901            let tagged = tag_wav(&existing, &meta, cover.as_deref(), synced)
902                .map_err(|err| permanent_fail(&clip.id, err.to_string()))?;
903            let size = self.write_verify(&clip.id, path, &tagged)?;
904            self.refresh_hashes(manifest, &clip.id, Some(size));
905            return Ok(Effect::Retagged);
906        }
907
908        let (meta, synced) = self.track_meta(clip, lineage);
909        let cover = self.fetch_cover(clip).await;
910        let existing = self
911            .fs
912            .read(path)
913            .map_err(|err| permanent_fail(&clip.id, format!("could not read for retag: {err}")))?;
914        let tagged = match format {
915            AudioFormat::Mp3 => tag_mp3(&existing, &meta, cover.as_deref(), synced),
916            AudioFormat::Flac => tag_flac(&existing, &meta, cover.as_deref()),
917            AudioFormat::Wav => unreachable!("WAV handled above"),
918        }
919        .map_err(|err| permanent_fail(&clip.id, err.to_string()))?;
920        let size = self.write_verify(&clip.id, path, &tagged)?;
921        self.refresh_hashes(manifest, &clip.id, Some(size));
922        Ok(Effect::Retagged)
923    }
924
925    /// Move the file and update the entry's path (and protection).
926    fn rename(&self, manifest: &mut Manifest, from: &str, to: &str) -> Result<Effect, Fail> {
927        let label = self
928            .by_path
929            .get(to)
930            .map(|d| d.clip.id.clone())
931            .unwrap_or_else(|| to.to_owned());
932        self.fs.rename(from, to).map_err(|err| {
933            if err.is_out_of_space() {
934                disk_fail(label, "disk full: no space left to rename")
935            } else {
936                permanent_fail(label, format!("rename failed: {err}"))
937            }
938        })?;
939
940        let clip_id = self.by_path.get(to).map(|d| d.clip.id.clone()).or_else(|| {
941            manifest
942                .entries
943                .iter()
944                .find(|(_, entry)| entry.path == from)
945                .map(|(id, _)| id.clone())
946        });
947        if let Some(id) = clip_id
948            && let Some(entry) = manifest.entries.get_mut(&id)
949        {
950            entry.path = to.to_owned();
951            if let Some(d) = self.by_path.get(to) {
952                entry.preserve = preserve_for(d);
953            }
954        }
955        Ok(Effect::Renamed)
956    }
957
958    /// Remove the file and drop the manifest entry.
959    fn delete(&self, manifest: &mut Manifest, path: &str, clip_id: &str) -> Result<Effect, Fail> {
960        self.fs
961            .remove(path)
962            .map_err(|err| permanent_fail(clip_id, format!("delete failed: {err}")))?;
963        manifest.remove(clip_id);
964        Ok(Effect::Deleted)
965    }
966
967    /// Fetch an artifact's bytes, write them atomically, then record the sidecar
968    /// on the owning manifest entry.
969    ///
970    /// The fetch and write share the audio path's resilience: `fetch_bytes`
971    /// retries transient failures and verifies `Content-Length`, and
972    /// `write_verify` confirms the on-disk size. A failure is attributed to the
973    /// owning clip and returned as a per-clip [`Fail`], so a bad sidecar never
974    /// aborts the whole run (only an auth failure or a full disk does, matching
975    /// audio).
976    ///
977    /// The bytes written depend on the kind: a static cover is the fetched image
978    /// verbatim, while an animated cover is the clip's MP4 preview transcoded to
979    /// WebP through the ffmpeg port (see [`artifact_bytes`](Self::artifact_bytes)).
980    ///
981    /// A sidecar is only ever written for a clip whose audio is present: a
982    /// successful `Download`/`Reformat` creates the manifest entry earlier in
983    /// this run, and a prior-run clip already has one. So an absent owning entry
984    /// means the audio failed or never existed this run; we skip (no fetch, no
985    /// write) rather than strand an untracked sidecar with no owning audio.
986    ///
987    /// Folder art ([`FolderJpg`](ArtifactKind::FolderJpg),
988    /// [`FolderWebp`](ArtifactKind::FolderWebp), and
989    /// [`FolderMp4`](ArtifactKind::FolderMp4)) is album-scoped: its `owner_id`
990    /// is the album's stable root id, not a manifest clip, so it skips the
991    /// manifest presence guard and records its state on the album store instead.
992    ///
993    /// When a title or album change moves the audio, reconcile re-emits this
994    /// write at the NEW path; this handler then removes the sidecar left at the
995    /// artifact's previously tracked path, moving it rather than orphaning it.
996    /// The removal happens only after the new file is safely written, and a
997    /// remove failure returns before the state slot advances, so the next run
998    /// re-plans the identical write and retries — self-healing, never an orphan.
999    #[allow(clippy::too_many_arguments)]
1000    async fn write_artifact(
1001        &self,
1002        manifest: &mut Manifest,
1003        albums: &mut BTreeMap<String, AlbumArt>,
1004        playlists: &mut BTreeMap<String, PlaylistState>,
1005        kind: ArtifactKind,
1006        path: &str,
1007        source_url: &str,
1008        hash: &str,
1009        owner_id: &str,
1010        content: Option<&str>,
1011        tracked_paths: &mut HashMap<String, u32>,
1012        committed: &BTreeSet<String>,
1013    ) -> Result<Effect, Fail> {
1014        // A per-song sidecar needs its owning clip's manifest entry; album and
1015        // playlist kinds are keyed elsewhere and skip this guard.
1016        if is_per_clip_kind(kind) && manifest.get(owner_id).is_none() {
1017            // The owning audio never landed this run, so this sidecar is skipped
1018            // and will never drain a cover the producer cached for it. Drop that
1019            // entry now: an insert without a matching sidecar write must not
1020            // outlive its clip, keeping `cover_cache` bounded to the clips in
1021            // flight (#89). A non-cover kind has no entry here, so this is a
1022            // harmless no-op for them.
1023            self.cover_cache
1024                .lock()
1025                .expect("cover cache mutex poisoned")
1026                .remove(source_url);
1027            return Ok(Effect::Skipped);
1028        }
1029        // Capture the path this artifact was last tracked at, BEFORE the slot is
1030        // overwritten below, so a path-changing write (a title/album rename that
1031        // moves the audio) can clean up the old sidecar it left behind. Cover
1032        // kinds live on the manifest, folder kinds on the album store; playlists
1033        // reconcile their own old-path delete and so opt out here.
1034        let old_path = match kind {
1035            ArtifactKind::CoverJpg => manifest
1036                .get(owner_id)
1037                .and_then(|e| e.cover_jpg.as_ref())
1038                .map(|s| s.path.clone()),
1039            ArtifactKind::CoverWebp => manifest
1040                .get(owner_id)
1041                .and_then(|e| e.cover_webp.as_ref())
1042                .map(|s| s.path.clone()),
1043            ArtifactKind::DetailsTxt => manifest
1044                .get(owner_id)
1045                .and_then(|e| e.details_txt.as_ref())
1046                .map(|s| s.path.clone()),
1047            ArtifactKind::LyricsTxt => manifest
1048                .get(owner_id)
1049                .and_then(|e| e.lyrics_txt.as_ref())
1050                .map(|s| s.path.clone()),
1051            ArtifactKind::Lrc => manifest
1052                .get(owner_id)
1053                .and_then(|e| e.lrc.as_ref())
1054                .map(|s| s.path.clone()),
1055            ArtifactKind::VideoMp4 => manifest
1056                .get(owner_id)
1057                .and_then(|e| e.video_mp4.as_ref())
1058                .map(|s| s.path.clone()),
1059            ArtifactKind::FolderJpg | ArtifactKind::FolderWebp | ArtifactKind::FolderMp4 => albums
1060                .get(owner_id)
1061                .and_then(|a| a.artifact(kind))
1062                .map(|s| s.path.clone()),
1063            ArtifactKind::Playlist => None,
1064        };
1065        // A generated artifact (a playlist) carries its body inline and never
1066        // touches the network; a fetched one pulls (and transcodes) its source.
1067        let bytes = match content {
1068            Some(text) => text.as_bytes().to_vec(),
1069            None => self.artifact_bytes(kind, source_url, owner_id).await?,
1070        };
1071        self.write_verify(owner_id, path, &bytes)?;
1072        // The new sidecar is safely in place; only now drop a stale copy left at
1073        // the previous path (the audio moved). `remove` is idempotent, so an
1074        // already-absent old file is fine. On a genuine remove failure we return
1075        // BEFORE updating the slot, leaving the manifest/album pointing at the
1076        // old path: the next run sees the same path drift, re-plans this write,
1077        // and retries the cleanup — convergent, no orphan persists.
1078        //
1079        // The removal is gated so it can never delete a live file (#76). This
1080        // slot is releasing `old`, so drop its reference in `tracked_paths`; the
1081        // file is removed only once nothing else holds it — no other tracked slot
1082        // still references it (count now zero) and no *committed* write this run
1083        // has already placed a file there (`committed`, the commit-tracked twin of
1084        // `suppress_path_aliasing`). On a path swap (A: x -> y while B: y -> x)
1085        // the earlier write commits its path, so the later mover keeps it; when
1086        // two slots share a path after a prior failed swap, the reference count
1087        // keeps it. But a merely *planned* colliding write that later fails no
1088        // longer protects a stale file, so it is cleaned up rather than orphaned
1089        // (#142).
1090        if let Some(old) = old_path.as_deref()
1091            && !old.is_empty()
1092            && old != path
1093        {
1094            let still_referenced = tracked_paths
1095                .get_mut(old)
1096                .map(|count| {
1097                    *count = count.saturating_sub(1);
1098                    *count > 0
1099                })
1100                .unwrap_or(false);
1101            if !still_referenced && !committed.contains(old) {
1102                self.fs.remove(old).map_err(|err| {
1103                    permanent_fail(
1104                        owner_id,
1105                        format!("could not remove old sidecar {old}: {err}"),
1106                    )
1107                })?;
1108            }
1109        }
1110        if is_album_kind(kind) {
1111            albums.entry(owner_id.to_owned()).or_default().set(
1112                kind,
1113                Some(ArtifactState {
1114                    path: path.to_owned(),
1115                    hash: hash.to_owned(),
1116                }),
1117            );
1118        } else if is_playlist_kind(kind) {
1119            playlists.insert(
1120                owner_id.to_owned(),
1121                PlaylistState {
1122                    name: playlist_name_from_path(path),
1123                    path: path.to_owned(),
1124                    hash: hash.to_owned(),
1125                },
1126            );
1127        } else if let Some(entry) = manifest.entries.get_mut(owner_id) {
1128            set_manifest_artifact(
1129                entry,
1130                kind,
1131                Some(ArtifactState {
1132                    path: path.to_owned(),
1133                    hash: hash.to_owned(),
1134                }),
1135            );
1136        }
1137        Ok(Effect::ArtifactWritten)
1138    }
1139
1140    /// Relocate a fetched per-clip sidecar with a local rename, falling back to a
1141    /// fetch-and-write when the move is unsafe or the old file has vanished.
1142    ///
1143    /// Reconcile downgrades a pure path drift (same bytes, new path, old file
1144    /// present, fetched kind) to a `MoveArtifact`, so a retitle renames the file
1145    /// rather than re-downloading a cover or re-transcoding an animated WebP
1146    /// (#141). The in-place rename is taken only when `from` is this slot's alone
1147    /// to give up (no other tracked slot references it and no committed write has
1148    /// placed a file there); otherwise, or if the rename fails, the ordinary
1149    /// [`write_artifact`](Self::write_artifact) fetches fresh bytes and runs the
1150    /// gated old-path cleanup, so a swap or co-reference is handled exactly as
1151    /// before.
1152    #[allow(clippy::too_many_arguments)]
1153    async fn move_artifact(
1154        &self,
1155        manifest: &mut Manifest,
1156        albums: &mut BTreeMap<String, AlbumArt>,
1157        playlists: &mut BTreeMap<String, PlaylistState>,
1158        kind: ArtifactKind,
1159        from: &str,
1160        to: &str,
1161        source_url: &str,
1162        hash: &str,
1163        owner_id: &str,
1164        tracked_paths: &mut HashMap<String, u32>,
1165        committed: &BTreeSet<String>,
1166    ) -> Result<Effect, Fail> {
1167        // A per-clip sidecar needs its owning clip's audio present, exactly as
1168        // write_artifact requires.
1169        if is_per_clip_kind(kind) && manifest.get(owner_id).is_none() {
1170            return Ok(Effect::Skipped);
1171        }
1172        // Relocate in place only when `from` is ours alone to give up: no other
1173        // tracked slot still references it (a prior failed swap can share a path)
1174        // and no committed write this run has already placed a file there.
1175        // Otherwise the fetch-and-write fallback copies fresh bytes and runs the
1176        // gated old-path cleanup.
1177        let exclusive =
1178            tracked_paths.get(from).is_none_or(|count| *count <= 1) && !committed.contains(from);
1179        if from != to && exclusive {
1180            match self.fs.rename(from, to) {
1181                Ok(()) => {
1182                    if let Some(count) = tracked_paths.get_mut(from) {
1183                        *count = count.saturating_sub(1);
1184                    }
1185                    if let Some(entry) = manifest.entries.get_mut(owner_id) {
1186                        set_manifest_artifact(
1187                            entry,
1188                            kind,
1189                            Some(ArtifactState {
1190                                path: to.to_owned(),
1191                                hash: hash.to_owned(),
1192                            }),
1193                        );
1194                    }
1195                    return Ok(Effect::Renamed);
1196                }
1197                Err(err) if err.is_out_of_space() => {
1198                    return Err(disk_fail(
1199                        owner_id,
1200                        "disk full: no space left to move sidecar",
1201                    ));
1202                }
1203                // The old file has vanished, or the rename is unsupported: fall
1204                // through to a fetch-and-write at `to`.
1205                Err(_) => {}
1206            }
1207        }
1208        self.write_artifact(
1209            manifest,
1210            albums,
1211            playlists,
1212            kind,
1213            to,
1214            source_url,
1215            hash,
1216            owner_id,
1217            None,
1218            tracked_paths,
1219            committed,
1220        )
1221        .await
1222    }
1223    ///
1224    /// An animated cover — a per-clip [`CoverWebp`](ArtifactKind::CoverWebp) or an
1225    /// album [`FolderWebp`](ArtifactKind::FolderWebp) — fetches the clip's
1226    /// `video_cover` MP4 preview and transcodes it to an animated WebP through the
1227    /// ffmpeg port; every other kind is the fetched source verbatim (the static
1228    /// [`CoverJpg`](ArtifactKind::CoverJpg) / album [`FolderJpg`](ArtifactKind::FolderJpg)
1229    /// image, or the raw album [`FolderMp4`](ArtifactKind::FolderMp4) whose
1230    /// `video_cover_url` is kept untranscoded). A fetch or transcode failure
1231    /// is attributed to the owning clip and is a per-clip [`Fail`], except a
1232    /// disk-full transcode, which aborts the run like the audio FLAC path.
1233    async fn artifact_bytes(
1234        &self,
1235        kind: ArtifactKind,
1236        source_url: &str,
1237        owner_id: &str,
1238    ) -> Result<Vec<u8>, Fail> {
1239        // Reuse the cover the audio producer already fetched for the embedded tag
1240        // when it cached this exact URL (#89); otherwise fetch it now. The guard
1241        // is taken and dropped in its own statement so it never spans the await.
1242        let cached = self
1243            .cover_cache
1244            .lock()
1245            .expect("cover cache mutex poisoned")
1246            .remove(source_url);
1247        let source = match cached {
1248            Some(bytes) => bytes,
1249            None => {
1250                let fetched = self
1251                    .fetch_bytes(source_url)
1252                    .await
1253                    .map_err(|err| err.attribute(owner_id))?;
1254                // Cache the raw source when a sibling folder artifact will fetch
1255                // the same URL (the `both` retention: cover.webp + cover.mp4), so
1256                // it is fetched exactly once. Bounded to shared URLs and drained
1257                // on the sibling's use.
1258                if self.shared_cover_urls.contains(source_url) {
1259                    self.cover_cache
1260                        .lock()
1261                        .expect("cover cache mutex poisoned")
1262                        .insert(source_url.to_owned(), fetched.clone());
1263                }
1264                fetched
1265            }
1266        };
1267        match kind {
1268            ArtifactKind::CoverWebp | ArtifactKind::FolderWebp => self
1269                .ffmpeg
1270                .mp4_to_webp(&source, self.opts.cover_webp)
1271                .await
1272                .map_err(|err| {
1273                    if err.is_out_of_space() {
1274                        disk_fail(owner_id, "disk full: no space left to transcode")
1275                    } else {
1276                        permanent_fail(owner_id, format!("cover transcode failed: {err}"))
1277                    }
1278                }),
1279            // The text sidecars are generated and always carry inline content, so
1280            // `write_artifact` never reaches this fetch path for them. Guard it so
1281            // a future miswiring fails loudly rather than fetching a URL.
1282            ArtifactKind::DetailsTxt | ArtifactKind::LyricsTxt | ArtifactKind::Lrc => Err(
1283                permanent_fail(owner_id, "text sidecar requires inline content"),
1284            ),
1285            ArtifactKind::CoverJpg
1286            | ArtifactKind::FolderJpg
1287            | ArtifactKind::FolderMp4
1288            | ArtifactKind::Playlist
1289            | ArtifactKind::VideoMp4 => Ok(source),
1290        }
1291    }
1292
1293    /// Remove a sidecar file and clear its slot on the owning manifest entry.
1294    ///
1295    /// `remove` is idempotent, so an already-absent sidecar is not a failure.
1296    /// When the owning entry is already gone (its audio was deleted earlier this
1297    /// run, co-deleting the sidecar), there is no slot to clear and that is fine.
1298    ///
1299    /// Folder art is album-scoped: its slot is cleared on the album store keyed by
1300    /// the album's root id, not on a manifest clip.
1301    ///
1302    /// The audio `Delete` is applied before its sidecar `DeleteArtifact`. If the
1303    /// sidecar removal fails after the audio is already gone, the sidecar lingers
1304    /// untracked, but the design stays convergent rather than transactional: the
1305    /// next run re-plans the same removal and retries, and any directory it would
1306    /// have emptied is pruned once the file finally clears.
1307    fn delete_artifact(
1308        &self,
1309        manifest: &mut Manifest,
1310        albums: &mut BTreeMap<String, AlbumArt>,
1311        playlists: &mut BTreeMap<String, PlaylistState>,
1312        kind: ArtifactKind,
1313        path: &str,
1314        owner_id: &str,
1315    ) -> Result<Effect, Fail> {
1316        self.fs
1317            .remove(path)
1318            .map_err(|err| permanent_fail(owner_id, format!("artifact delete failed: {err}")))?;
1319        if is_album_kind(kind) {
1320            if let Some(art) = albums.get_mut(owner_id) {
1321                art.set(kind, None);
1322                if art.is_empty() {
1323                    albums.remove(owner_id);
1324                }
1325            }
1326        } else if is_playlist_kind(kind) {
1327            playlists.remove(owner_id);
1328        } else if let Some(entry) = manifest.entries.get_mut(owner_id) {
1329            set_manifest_artifact(entry, kind, None);
1330        }
1331        Ok(Effect::ArtifactDeleted)
1332    }
1333
1334    /// Fetch one stem's bytes, write them atomically, then record the stem on
1335    /// the owning clip's keyed stem map.
1336    ///
1337    /// Mirrors [`write_artifact`](Self::write_artifact) for the keyed-stem case,
1338    /// sharing the fetch resilience (`fetch_bytes` retries and verifies
1339    /// `Content-Length`) and the atomic size-verified write. A stem is only ever
1340    /// written for a clip whose audio is present, so an absent owning manifest
1341    /// entry means the audio failed or never existed this run; we skip rather
1342    /// than strand an untracked stem with no owning audio.
1343    ///
1344    /// Stems are stored RAW in their native container and are NEVER transcoded to
1345    /// FLAC, even when the song's own format is FLAC — they are the deliberate
1346    /// exception. A `Wav` stem is rendered through the free `convert_wav` flow
1347    /// (see [`fetch_stem_bytes`](Self::fetch_stem_bytes)); an `Mp3` stem is fetched
1348    /// straight from its public CDN url. Either way the bytes land verbatim at
1349    /// `path`, whose extension already matches the stem format.
1350    ///
1351    /// When a title/album change moves the song, reconcile re-emits this write at
1352    /// the NEW path; this handler then removes the stem left at the previously
1353    /// tracked path, moving it rather than orphaning it. The removal happens only
1354    /// after the new file is safely written and only when nothing else this run
1355    /// writes that path, and a remove failure returns before the slot advances so
1356    /// the next run re-plans the identical write and retries — self-healing.
1357    #[allow(clippy::too_many_arguments)]
1358    async fn write_stem(
1359        &self,
1360        client_lock: &ClientLock<'_, C>,
1361        manifest: &mut Manifest,
1362        clip_id: &str,
1363        key: &str,
1364        stem_id: &str,
1365        path: &str,
1366        source_url: &str,
1367        format: StemFormat,
1368        hash: &str,
1369        tracked_paths: &mut HashMap<String, u32>,
1370        committed: &BTreeSet<String>,
1371    ) -> Result<Effect, Fail> {
1372        // A stem needs its owning clip's manifest entry (its audio must exist).
1373        if manifest.get(clip_id).is_none() {
1374            return Ok(Effect::Skipped);
1375        }
1376        let old_path = manifest
1377            .get(clip_id)
1378            .and_then(|e| e.stems.get(key))
1379            .map(|s| s.path.clone());
1380        let bytes = self
1381            .fetch_stem_bytes(client_lock, clip_id, stem_id, source_url, format)
1382            .await?;
1383        self.write_verify(clip_id, path, &bytes)?;
1384        // The new stem is in place; only now drop a stale copy left at the old
1385        // path (the song moved, or the stem format changed). `remove` is
1386        // idempotent. On a genuine remove failure we return BEFORE updating the
1387        // slot, so the next run re-plans the same write and retries the cleanup.
1388        //
1389        // The removal is gated by the same twin guards as `write_artifact` (#76,
1390        // #142): this slot decrements its reference in `tracked_paths`, and the
1391        // file is removed only when nothing else holds it (count reaches zero)
1392        // and no committed write this run already placed a file there. This
1393        // keeps the shared file alive when two clips co-reference the same stem
1394        // path after a partially-failed swap.
1395        if let Some(old) = old_path.as_deref()
1396            && !old.is_empty()
1397            && old != path
1398        {
1399            let still_referenced = tracked_paths
1400                .get_mut(old)
1401                .map(|count| {
1402                    *count = count.saturating_sub(1);
1403                    *count > 0
1404                })
1405                .unwrap_or(false);
1406            if !still_referenced && !committed.contains(old) {
1407                self.fs.remove(old).map_err(|err| {
1408                    permanent_fail(clip_id, format!("could not remove old stem {old}: {err}"))
1409                })?;
1410            }
1411        }
1412        if let Some(entry) = manifest.entries.get_mut(clip_id) {
1413            set_manifest_stem(
1414                entry,
1415                key,
1416                Some(ArtifactState {
1417                    path: path.to_owned(),
1418                    hash: hash.to_owned(),
1419                }),
1420            );
1421        }
1422        Ok(Effect::ArtifactWritten)
1423    }
1424
1425    /// Relocate a stem with a local rename, falling back to a fetch-and-write
1426    /// when the move is unsafe or the old file has vanished (#141).
1427    ///
1428    /// Reconcile downgrades a pure stem path drift to a `MoveStem`, so a retitle
1429    /// renames the raw stem rather than re-rendering a WAV through `convert_wav`
1430    /// or re-fetching an MP3. The in-place rename is taken only when `from` is
1431    /// this slot's alone to give up (no other tracked slot references it — two
1432    /// same-base clips can share a stem path after a partially-failed swap — and
1433    /// no committed write this run already holds it); otherwise the
1434    /// fetch-and-write fallback re-fetches the correct bytes at `to`, so a
1435    /// co-referenced shared stem is never renamed away with mismatched content.
1436    #[allow(clippy::too_many_arguments)]
1437    async fn move_stem(
1438        &self,
1439        client_lock: &ClientLock<'_, C>,
1440        manifest: &mut Manifest,
1441        clip_id: &str,
1442        key: &str,
1443        stem_id: &str,
1444        from: &str,
1445        to: &str,
1446        source_url: &str,
1447        format: StemFormat,
1448        hash: &str,
1449        tracked_paths: &mut HashMap<String, u32>,
1450        committed: &BTreeSet<String>,
1451    ) -> Result<Effect, Fail> {
1452        if manifest.get(clip_id).is_none() {
1453            return Ok(Effect::Skipped);
1454        }
1455        let exclusive =
1456            tracked_paths.get(from).is_none_or(|count| *count <= 1) && !committed.contains(from);
1457        if from != to && exclusive {
1458            match self.fs.rename(from, to) {
1459                Ok(()) => {
1460                    if let Some(count) = tracked_paths.get_mut(from) {
1461                        *count = count.saturating_sub(1);
1462                    }
1463                    if let Some(entry) = manifest.entries.get_mut(clip_id) {
1464                        set_manifest_stem(
1465                            entry,
1466                            key,
1467                            Some(ArtifactState {
1468                                path: to.to_owned(),
1469                                hash: hash.to_owned(),
1470                            }),
1471                        );
1472                    }
1473                    return Ok(Effect::Renamed);
1474                }
1475                Err(err) if err.is_out_of_space() => {
1476                    return Err(disk_fail(clip_id, "disk full: no space left to move stem"));
1477                }
1478                // The old file has vanished, or the rename is unsupported: fall
1479                // through to a fetch-and-write at `to`.
1480                Err(_) => {}
1481            }
1482        }
1483        self.write_stem(
1484            client_lock,
1485            manifest,
1486            clip_id,
1487            key,
1488            stem_id,
1489            to,
1490            source_url,
1491            format,
1492            hash,
1493            tracked_paths,
1494            committed,
1495        )
1496        .await
1497    }
1498
1499    /// Resolve a stem's RAW bytes in its native container, never transcoding.
1500    ///
1501    /// A `Wav` stem renders the stem clip's lossless WAV through the very same
1502    /// free `convert_wav` + poll flow the main FLAC/WAV audio uses
1503    /// ([`resolve_wav_url`](Self::resolve_wav_url)), keyed on the stem's own
1504    /// `stem_id`, then downloads that WAV. An `Mp3` stem (or a degenerate `Wav`
1505    /// stem with no id to render) downloads its public CDN url directly. Stems
1506    /// are the deliberate exception to the source format: the bytes are returned
1507    /// exactly as delivered and are never re-encoded to FLAC.
1508    async fn fetch_stem_bytes(
1509        &self,
1510        client_lock: &ClientLock<'_, C>,
1511        clip_id: &str,
1512        stem_id: &str,
1513        source_url: &str,
1514        format: StemFormat,
1515    ) -> Result<Vec<u8>, Fail> {
1516        let url = match format {
1517            StemFormat::Wav if !stem_id.is_empty() => {
1518                match self.resolve_wav_url(client_lock, stem_id).await? {
1519                    Some(url) => url,
1520                    None => return Err(transient_fail(clip_id, "stem WAV render was not ready")),
1521                }
1522            }
1523            // Mp3, or a Wav stem with no id to render, downloads the CDN mp3.
1524            _ => source_url.to_owned(),
1525        };
1526        self.fetch_bytes(&url)
1527            .await
1528            .map_err(|err| err.attribute(clip_id))
1529    }
1530
1531    /// Remove one stem file and clear its slot in the owning clip's stem map.
1532    ///
1533    /// `remove` is idempotent, so an already-absent stem is not a failure. When
1534    /// the owning entry is already gone (its audio was deleted earlier this run,
1535    /// co-deleting the stem), there is no slot to clear and that is fine; the
1536    /// emptied `.stems` folder is pruned by the end-of-run directory sweep.
1537    fn delete_stem(
1538        &self,
1539        manifest: &mut Manifest,
1540        clip_id: &str,
1541        key: &str,
1542        path: &str,
1543    ) -> Result<Effect, Fail> {
1544        self.fs
1545            .remove(path)
1546            .map_err(|err| permanent_fail(clip_id, format!("stem delete failed: {err}")))?;
1547        if let Some(entry) = manifest.entries.get_mut(clip_id) {
1548            set_manifest_stem(entry, key, None);
1549        }
1550        Ok(Effect::ArtifactDeleted)
1551    }
1552
1553    /// Download (and transcode/tag) the audio for `clip` in `format`.
1554    async fn produce_audio(
1555        &self,
1556        client_lock: &ClientLock<'_, C>,
1557        clip: &Clip,
1558        lineage: &LineageContext,
1559        format: AudioFormat,
1560    ) -> Result<Vec<u8>, Fail> {
1561        let (meta, synced) = self.track_meta(clip, lineage);
1562        match format {
1563            AudioFormat::Mp3 => {
1564                let url = clip.mp3_url();
1565                let audio = self
1566                    .fetch_bytes(&url)
1567                    .await
1568                    .map_err(|err| err.attribute(&clip.id))?;
1569                let cover = self.fetch_cover(clip).await;
1570                tag_mp3(&audio, &meta, cover.as_deref(), synced)
1571                    .map_err(|err| permanent_fail(&clip.id, err.to_string()))
1572            }
1573            AudioFormat::Flac => {
1574                let wav = self.fetch_wav(client_lock, clip).await?;
1575                let flac = self.ffmpeg.wav_to_flac(&wav).await.map_err(|err| {
1576                    if err.is_out_of_space() {
1577                        disk_fail(&clip.id, "disk full: no space left to transcode")
1578                    } else {
1579                        permanent_fail(&clip.id, format!("transcode failed: {err}"))
1580                    }
1581                })?;
1582                let cover = self.fetch_cover(clip).await;
1583                tag_flac(&flac, &meta, cover.as_deref())
1584                    .map_err(|err| permanent_fail(&clip.id, err.to_string()))
1585            }
1586            AudioFormat::Wav => {
1587                let wav = self.fetch_wav(client_lock, clip).await?;
1588                let cover = self.fetch_cover(clip).await;
1589                tag_wav(&wav, &meta, cover.as_deref(), synced)
1590                    .map_err(|err| permanent_fail(&clip.id, err.to_string()))
1591            }
1592        }
1593    }
1594
1595    /// This run's non-empty aligned lyrics for a clip, if any were fetched.
1596    fn synced_for(&self, clip_id: &str) -> Option<&AlignedLyrics> {
1597        self.synced
1598            .get(clip_id)
1599            .filter(|aligned| !aligned.is_empty())
1600    }
1601
1602    /// The track metadata for a clip, paired with its synced lyrics (if any).
1603    ///
1604    /// The feed omits per-clip lyrics, so when this run fetched aligned lyrics
1605    /// for the clip the plain text is folded into `lyrics` here, which the MP3
1606    /// `USLT` and FLAC `LYRICS` tags then carry. The returned [`AlignedLyrics`]
1607    /// is passed on to [`tag_mp3`] for the word-level `SYLT` frame.
1608    fn track_meta<'m>(
1609        &'m self,
1610        clip: &Clip,
1611        lineage: &LineageContext,
1612    ) -> (TrackMetadata, Option<&'m AlignedLyrics>) {
1613        let synced = self.synced_for(&clip.id);
1614        let mut meta = TrackMetadata::from_clip(clip, lineage);
1615        if let Some(aligned) = synced {
1616            meta.lyrics = aligned.plain_text();
1617        }
1618        (meta, synced)
1619    }
1620
1621    /// Resolve the rendered WAV URL and download it.
1622    async fn fetch_wav(
1623        &self,
1624        client_lock: &ClientLock<'_, C>,
1625        clip: &Clip,
1626    ) -> Result<Vec<u8>, Fail> {
1627        let url = match self.resolve_wav_url(client_lock, &clip.id).await? {
1628            Some(url) => url,
1629            None => return Err(transient_fail(&clip.id, "WAV render was not ready")),
1630        };
1631        self.fetch_bytes(&url)
1632            .await
1633            .map_err(|err| err.attribute(&clip.id))
1634    }
1635
1636    /// Read the WAV URL, requesting a render and polling if it is not ready.
1637    ///
1638    /// `None` means the render did not become ready within the poll budget; the
1639    /// caller treats that as a non-fatal transient failure, never a silent skip.
1640    ///
1641    /// Each client call briefly locks `client_lock`; the poll waits happen
1642    /// unlocked, so concurrent clips interleave their WAV renders rather than
1643    /// serialising behind one clip's whole poll budget.
1644    async fn resolve_wav_url(
1645        &self,
1646        client_lock: &ClientLock<'_, C>,
1647        id: &str,
1648    ) -> Result<Option<String>, Fail> {
1649        if let Some(url) = self.wav_url_retrying(client_lock, id).await? {
1650            return Ok(Some(url));
1651        }
1652        self.request_wav_retrying(client_lock, id).await?;
1653        for _ in 0..self.opts.wav_poll_attempts {
1654            self.clock.sleep(self.opts.wav_poll_interval).await;
1655            if let Some(url) = self.wav_url_retrying(client_lock, id).await? {
1656                return Ok(Some(url));
1657            }
1658        }
1659        Ok(None)
1660    }
1661
1662    /// Read the rendered WAV URL, retrying transient API failures with backoff
1663    /// (SYNC-16/17), so the default FLAC path is as resilient as the CDN path.
1664    async fn wav_url_retrying(
1665        &self,
1666        client_lock: &ClientLock<'_, C>,
1667        id: &str,
1668    ) -> Result<Option<String>, Fail> {
1669        let mut attempt: u32 = 0;
1670        loop {
1671            let result = {
1672                let mut client = client_lock.lock().await;
1673                client.wav_url(self.http, id).await
1674            };
1675            match result {
1676                Ok(url) => return Ok(url),
1677                Err(err) => match self.retry_core(id, err, &mut attempt).await {
1678                    Some(fail) => return Err(fail),
1679                    None => continue,
1680                },
1681            }
1682        }
1683    }
1684
1685    /// Ask Suno to render a WAV, retrying transient API failures with backoff.
1686    async fn request_wav_retrying(
1687        &self,
1688        client_lock: &ClientLock<'_, C>,
1689        id: &str,
1690    ) -> Result<(), Fail> {
1691        let mut attempt: u32 = 0;
1692        loop {
1693            let result = {
1694                let mut client = client_lock.lock().await;
1695                client.request_wav(self.http, id).await
1696            };
1697            match result {
1698                Ok(()) => return Ok(()),
1699                Err(err) => match self.retry_core(id, err, &mut attempt).await {
1700                    Some(fail) => return Err(fail),
1701                    None => continue,
1702                },
1703            }
1704        }
1705    }
1706
1707    /// Classify a core error from the authenticated WAV flow. On a transient
1708    /// class within budget, back off through the [`Clock`] and return `None` to
1709    /// retry; otherwise return the terminal [`Fail`].
1710    async fn retry_core(&self, id: &str, err: Error, attempt: &mut u32) -> Option<Fail> {
1711        let fail = classify_core(id, err);
1712        if matches!(fail.class, Class::Transient) && *attempt < self.opts.max_retries {
1713            self.clock.sleep(backoff_delay(*attempt, None)).await;
1714            *attempt += 1;
1715            None
1716        } else {
1717            Some(fail)
1718        }
1719    }
1720
1721    /// GET `url`, retrying transient failures with backoff, verifying size.
1722    async fn fetch_bytes(&self, url: &str) -> Result<Vec<u8>, FetchError> {
1723        let mut attempt: u32 = 0;
1724        loop {
1725            let result = self.http.send(HttpRequest::get(url)).await;
1726            match classify_response(result) {
1727                Ok(body) => return Ok(body),
1728                Err(err) => {
1729                    if matches!(err.class, Class::Transient) && attempt < self.opts.max_retries {
1730                        let delay = backoff_delay(attempt, err.retry_after);
1731                        self.clock.sleep(delay).await;
1732                        attempt += 1;
1733                        continue;
1734                    }
1735                    return Err(err);
1736                }
1737            }
1738        }
1739    }
1740
1741    /// Download cover art, trying each candidate URL in order; `None` is fine.
1742    async fn fetch_cover(&self, clip: &Clip) -> Option<Vec<u8>> {
1743        for url in clip.cover_candidates() {
1744            if let Ok(response) = self.http.send(HttpRequest::get(url)).await
1745                && (200..=299).contains(&response.status)
1746                && !response.body.is_empty()
1747            {
1748                // A `CoverJpg` sidecar will fetch this exact URL this run; keep the
1749                // bytes so its write reuses them instead of fetching again (#89).
1750                // The lock guards only the insert, never the await above.
1751                if self.cover_wanted.contains(url) {
1752                    self.cover_cache
1753                        .lock()
1754                        .expect("cover cache mutex poisoned")
1755                        .insert(url.to_owned(), response.body.clone());
1756                }
1757                return Some(response.body);
1758            }
1759        }
1760        None
1761    }
1762
1763    /// Write `bytes` atomically, then confirm the on-disk size (SYNC-13/14).
1764    fn write_verify(&self, clip_id: &str, path: &str, bytes: &[u8]) -> Result<u64, Fail> {
1765        self.fs.write_atomic(path, bytes).map_err(|err| {
1766            if err.is_out_of_space() {
1767                disk_fail(clip_id, format!("disk full: no space left to write {path}"))
1768            } else {
1769                permanent_fail(clip_id, format!("write failed: {err}"))
1770            }
1771        })?;
1772        match self.fs.metadata(path) {
1773            Some(stat) if stat.size == bytes.len() as u64 => Ok(stat.size),
1774            Some(stat) => Err(permanent_fail(
1775                clip_id,
1776                format!("wrote {} bytes, expected {}", stat.size, bytes.len()),
1777            )),
1778            None => Ok(bytes.len() as u64),
1779        }
1780    }
1781
1782    /// Build the manifest entry for a freshly written file.
1783    fn entry(&self, clip_id: &str, path: &str, format: AudioFormat, size: u64) -> ManifestEntry {
1784        match self.by_id.get(clip_id) {
1785            Some(d) => manifest_entry(d, size),
1786            None => ManifestEntry {
1787                path: path.to_owned(),
1788                format,
1789                size,
1790                ..ManifestEntry::default()
1791            },
1792        }
1793    }
1794
1795    /// Refresh an existing entry's hashes, protection, and (optionally) size.
1796    fn refresh_hashes(&self, manifest: &mut Manifest, clip_id: &str, size: Option<u64>) {
1797        let desired = self.by_id.get(clip_id).copied();
1798        if let Some(entry) = manifest.entries.get_mut(clip_id) {
1799            if let Some(d) = desired {
1800                entry.meta_hash = d.meta_hash.clone();
1801                entry.art_hash = d.art_hash.clone();
1802                entry.preserve = preserve_for(d);
1803            }
1804            if let Some(size) = size {
1805                entry.size = size;
1806            }
1807        }
1808    }
1809
1810    /// Refresh only an entry's preserve marker from the current desired state.
1811    ///
1812    /// A clip can gain or lose copy/private protection with no file change, which
1813    /// reconcile emits as a [`Skip`](Action::Skip). Refreshing here keeps the
1814    /// persisted marker a faithful image of live protection, so the cross-run
1815    /// delete guard (SYNC-8) never reads it stale.
1816    fn refresh_preserve(&self, manifest: &mut Manifest, clip_id: &str) {
1817        if let Some(d) = self.by_id.get(clip_id).copied()
1818            && let Some(entry) = manifest.entries.get_mut(clip_id)
1819        {
1820            entry.preserve = preserve_for(d);
1821        }
1822    }
1823}
1824
1825/// Build a manifest entry from the desired record (SYNC-8 preserve rule).
1826fn manifest_entry(d: &Desired, size: u64) -> ManifestEntry {
1827    ManifestEntry {
1828        path: d.path.clone(),
1829        format: d.format,
1830        meta_hash: d.meta_hash.clone(),
1831        art_hash: d.art_hash.clone(),
1832        size,
1833        preserve: preserve_for(d),
1834        ..Default::default()
1835    }
1836}
1837
1838/// Whether a written entry must be preserved across runs: held by any copy
1839/// source, or private. The reconcile delete guard reads this marker later.
1840fn preserve_for(d: &Desired) -> bool {
1841    d.private || d.modes.contains(&SourceMode::Copy)
1842}
1843
1844/// Classify one HTTP result into bytes or a [`FetchError`] (SYNC-14/17).
1845fn classify_response(
1846    result: Result<crate::http::HttpResponse, crate::http::TransportError>,
1847) -> Result<Vec<u8>, FetchError> {
1848    let response = match result {
1849        Ok(response) => response,
1850        Err(err) => {
1851            return Err(FetchError::transient(
1852                format!("transport error: {err}"),
1853                None,
1854            ));
1855        }
1856    };
1857    match response.status {
1858        200..=299 => {
1859            if let Some(expected) = content_length(&response) {
1860                let actual = response.body.len() as u64;
1861                if actual != expected {
1862                    return Err(FetchError::transient(
1863                        format!("truncated download: {actual} of {expected} bytes"),
1864                        None,
1865                    ));
1866                }
1867            }
1868            Ok(response.body)
1869        }
1870        401 | 403 => Err(FetchError::transient(
1871            format!("download rejected: status {}", response.status),
1872            None,
1873        )),
1874        408 => Err(FetchError::transient("request timed out", None)),
1875        429 => Err(FetchError::transient(
1876            "rate limited",
1877            retry_after(&response),
1878        )),
1879        500..=599 => Err(FetchError::transient(
1880            format!("server error {}", response.status),
1881            None,
1882        )),
1883        status => Err(FetchError::permanent(format!(
1884            "download failed: status {status}"
1885        ))),
1886    }
1887}
1888
1889/// Map a core [`Error`] from the authenticated WAV flow to a [`Fail`].
1890fn classify_core(id: &str, err: Error) -> Fail {
1891    let reason = err.to_string();
1892    match err {
1893        Error::Auth(_) => auth_fail(id, reason),
1894        Error::RateLimited { .. } | Error::Connection(_) => transient_fail(id, reason),
1895        Error::Api(_)
1896        | Error::BadRequest(_)
1897        | Error::NotFound(_)
1898        | Error::Tag(_)
1899        | Error::Config(_)
1900        | Error::Refused(_) => permanent_fail(id, reason),
1901    }
1902}
1903
1904/// The provider-reported body size from `Content-Length`, if present and valid.
1905fn content_length(response: &crate::http::HttpResponse) -> Option<u64> {
1906    response.header("content-length")?.trim().parse().ok()
1907}
1908
1909#[cfg(test)]
1910mod tests {
1911    use super::*;
1912    use crate::ClerkAuth;
1913    use crate::http::HttpResponse;
1914    use crate::testutil::{MemFs, RecordingClock, Reply, ScriptedHttp, StubFfmpeg};
1915
1916    fn clip(id: &str) -> Clip {
1917        Clip {
1918            id: id.to_owned(),
1919            title: "Song".to_owned(),
1920            audio_url: format!("https://cdn1.suno.ai/{id}.mp3"),
1921            ..Default::default()
1922        }
1923    }
1924
1925    fn art_clip(id: &str) -> Clip {
1926        Clip {
1927            image_large_url: format!("https://art.suno.ai/{id}/large.jpg"),
1928            image_url: format!("https://art.suno.ai/{id}/small.jpg"),
1929            ..clip(id)
1930        }
1931    }
1932
1933    fn ext(format: AudioFormat) -> &'static str {
1934        match format {
1935            AudioFormat::Mp3 => "mp3",
1936            AudioFormat::Flac => "flac",
1937            AudioFormat::Wav => "wav",
1938        }
1939    }
1940
1941    fn desired(clip: Clip, format: AudioFormat) -> Desired {
1942        Desired {
1943            path: format!("{}.{}", clip.id, ext(format)),
1944            lineage: LineageContext::own_root(&clip),
1945            clip,
1946            format,
1947            meta_hash: "m".to_owned(),
1948            art_hash: "art".to_owned(),
1949            modes: vec![SourceMode::Mirror],
1950            trashed: false,
1951            private: false,
1952            artifacts: Vec::new(),
1953            stems: None,
1954        }
1955    }
1956
1957    fn entry(path: &str, format: AudioFormat) -> ManifestEntry {
1958        ManifestEntry {
1959            path: path.to_owned(),
1960            format,
1961            meta_hash: "old".to_owned(),
1962            art_hash: "old-art".to_owned(),
1963            size: 8,
1964            preserve: false,
1965            ..Default::default()
1966        }
1967    }
1968
1969    #[allow(clippy::too_many_arguments)]
1970    fn run<G: Ffmpeg>(
1971        plan: &Plan,
1972        manifest: &mut Manifest,
1973        desired: &[Desired],
1974        http: &ScriptedHttp,
1975        fs: &MemFs,
1976        ffmpeg: &G,
1977        clock: &RecordingClock,
1978        opts: &ExecOptions,
1979    ) -> ExecOutcome {
1980        let mut albums = BTreeMap::new();
1981        run_with_albums(
1982            plan,
1983            manifest,
1984            &mut albums,
1985            desired,
1986            http,
1987            fs,
1988            ffmpeg,
1989            clock,
1990            opts,
1991        )
1992    }
1993
1994    #[allow(clippy::too_many_arguments)]
1995    fn run_with_albums<G: Ffmpeg>(
1996        plan: &Plan,
1997        manifest: &mut Manifest,
1998        albums: &mut BTreeMap<String, AlbumArt>,
1999        desired: &[Desired],
2000        http: &ScriptedHttp,
2001        fs: &MemFs,
2002        ffmpeg: &G,
2003        clock: &RecordingClock,
2004        opts: &ExecOptions,
2005    ) -> ExecOutcome {
2006        let mut playlists = BTreeMap::new();
2007        run_full(
2008            plan,
2009            manifest,
2010            albums,
2011            &mut playlists,
2012            desired,
2013            http,
2014            fs,
2015            ffmpeg,
2016            clock,
2017            opts,
2018        )
2019    }
2020
2021    #[allow(clippy::too_many_arguments)]
2022    fn run_full<G: Ffmpeg>(
2023        plan: &Plan,
2024        manifest: &mut Manifest,
2025        albums: &mut BTreeMap<String, AlbumArt>,
2026        playlists: &mut BTreeMap<String, PlaylistState>,
2027        desired: &[Desired],
2028        http: &ScriptedHttp,
2029        fs: &MemFs,
2030        ffmpeg: &G,
2031        clock: &RecordingClock,
2032        opts: &ExecOptions,
2033    ) -> ExecOutcome {
2034        let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
2035        let synced = HashMap::new();
2036        pollster::block_on(execute(
2037            plan,
2038            manifest,
2039            albums,
2040            playlists,
2041            desired,
2042            &synced,
2043            Ports {
2044                client: &mut client,
2045                http,
2046                fs,
2047                ffmpeg,
2048                clock,
2049            },
2050            opts,
2051        ))
2052    }
2053
2054    fn small_poll() -> ExecOptions {
2055        ExecOptions {
2056            max_retries: 3,
2057            wav_poll_attempts: 2,
2058            wav_poll_interval: Duration::from_secs(5),
2059            concurrency: 4,
2060            cover_webp: WebpEncodeSettings::default(),
2061        }
2062    }
2063
2064    // ── Download: MP3 ───────────────────────────────────────────────
2065
2066    #[test]
2067    fn download_mp3_writes_tagged_file_and_records_manifest() {
2068        let c = art_clip("a");
2069        let d = desired(c.clone(), AudioFormat::Mp3);
2070        let plan = Plan {
2071            actions: vec![Action::Download {
2072                clip: c.clone(),
2073                lineage: LineageContext::own_root(&c),
2074                path: d.path.clone(),
2075                format: AudioFormat::Mp3,
2076            }],
2077        };
2078        let http = ScriptedHttp::new()
2079            .route("a.mp3", Reply::ok(b"mp3-body".to_vec()))
2080            .route("a/large.jpg", Reply::ok(b"art-bytes".to_vec()));
2081        let fs = MemFs::new();
2082        let ffmpeg = StubFfmpeg::flac();
2083        let clock = RecordingClock::new();
2084        let mut manifest = Manifest::new();
2085
2086        let outcome = run(
2087            &plan,
2088            &mut manifest,
2089            &[d],
2090            &http,
2091            &fs,
2092            &ffmpeg,
2093            &clock,
2094            &ExecOptions::default(),
2095        );
2096
2097        assert_eq!(outcome.downloaded, 1);
2098        assert_eq!(outcome.failed(), 0);
2099        assert_eq!(outcome.status, RunStatus::Completed);
2100        let written = fs.read_file("a.mp3").unwrap();
2101        assert_eq!(&written[..3], b"ID3");
2102        assert!(written.ends_with(b"mp3-body"));
2103        let entry = manifest.get("a").unwrap();
2104        assert_eq!(entry.path, "a.mp3");
2105        assert_eq!(entry.format, AudioFormat::Mp3);
2106        assert_eq!(entry.meta_hash, "m");
2107        assert_eq!(entry.art_hash, "art");
2108        assert_eq!(entry.size, written.len() as u64);
2109        assert!(!entry.preserve);
2110    }
2111
2112    #[test]
2113    fn download_mp3_embeds_sylt_and_lyrics_from_synced_map() {
2114        // A clip whose alignment was fetched this run gets a word-level SYLT frame
2115        // and its plain lyric text embedded (USLT), end to end through execute.
2116        let c = art_clip("a");
2117        let d = desired(c.clone(), AudioFormat::Mp3);
2118        let plan = Plan {
2119            actions: vec![Action::Download {
2120                clip: c.clone(),
2121                lineage: LineageContext::own_root(&c),
2122                path: d.path.clone(),
2123                format: AudioFormat::Mp3,
2124            }],
2125        };
2126        let http = ScriptedHttp::new()
2127            .route("a.mp3", Reply::ok(b"mp3-body".to_vec()))
2128            .route("a/large.jpg", Reply::ok(b"art-bytes".to_vec()));
2129        let fs = MemFs::new();
2130        let ffmpeg = StubFfmpeg::flac();
2131        let clock = RecordingClock::new();
2132        let mut manifest = Manifest::new();
2133        let mut albums = BTreeMap::new();
2134        let mut playlists = BTreeMap::new();
2135        let mut synced = HashMap::new();
2136        synced.insert(
2137            "a".to_string(),
2138            AlignedLyrics::from_json(&serde_json::json!({
2139                "aligned_words": [],
2140                "aligned_lyrics": [
2141                    {"text": "hi there", "start_s": 0.5, "end_s": 1.2, "section": "Verse 1",
2142                     "words": [
2143                         {"text": "hi", "start_s": 0.5, "end_s": 0.8},
2144                         {"text": "there", "start_s": 0.9, "end_s": 1.2}
2145                     ]}
2146                ]
2147            })),
2148        );
2149        let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
2150        let outcome = pollster::block_on(execute(
2151            &plan,
2152            &mut manifest,
2153            &mut albums,
2154            &mut playlists,
2155            &[d],
2156            &synced,
2157            Ports {
2158                client: &mut client,
2159                http: &http,
2160                fs: &fs,
2161                ffmpeg: &ffmpeg,
2162                clock: &clock,
2163            },
2164            &ExecOptions::default(),
2165        ));
2166
2167        assert_eq!(outcome.downloaded, 1);
2168        let written = fs.read_file("a.mp3").unwrap();
2169        let tag = id3::Tag::read_from2(std::io::Cursor::new(written)).unwrap();
2170        assert_eq!(
2171            tag.synchronised_lyrics().count(),
2172            1,
2173            "a SYLT frame is embedded"
2174        );
2175        // The plain lyric text is populated from the alignment for the USLT frame.
2176        assert_eq!(
2177            tag.lyrics().next().map(|frame| frame.text.as_str()),
2178            Some("hi there")
2179        );
2180    }
2181
2182    #[test]
2183    fn download_mp3_embeds_no_sylt_when_synced_map_empty() {
2184        // The synced map is empty when the feature is off (no alignment fetched),
2185        // so no SYLT frame and no lyric text are embedded.
2186        let c = art_clip("a");
2187        let d = desired(c.clone(), AudioFormat::Mp3);
2188        let plan = Plan {
2189            actions: vec![Action::Download {
2190                clip: c.clone(),
2191                lineage: LineageContext::own_root(&c),
2192                path: d.path.clone(),
2193                format: AudioFormat::Mp3,
2194            }],
2195        };
2196        let http = ScriptedHttp::new()
2197            .route("a.mp3", Reply::ok(b"mp3-body".to_vec()))
2198            .route("a/large.jpg", Reply::ok(b"art-bytes".to_vec()));
2199        let fs = MemFs::new();
2200        let ffmpeg = StubFfmpeg::flac();
2201        let clock = RecordingClock::new();
2202        let mut manifest = Manifest::new();
2203        let mut albums = BTreeMap::new();
2204        let mut playlists = BTreeMap::new();
2205        let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
2206        let outcome = pollster::block_on(execute(
2207            &plan,
2208            &mut manifest,
2209            &mut albums,
2210            &mut playlists,
2211            &[d],
2212            &HashMap::new(),
2213            Ports {
2214                client: &mut client,
2215                http: &http,
2216                fs: &fs,
2217                ffmpeg: &ffmpeg,
2218                clock: &clock,
2219            },
2220            &ExecOptions::default(),
2221        ));
2222        assert_eq!(outcome.downloaded, 1);
2223        let written = fs.read_file("a.mp3").unwrap();
2224        let tag = id3::Tag::read_from2(std::io::Cursor::new(written)).unwrap();
2225        assert_eq!(tag.synchronised_lyrics().count(), 0);
2226        assert_eq!(tag.lyrics().count(), 0);
2227    }
2228
2229    #[test]
2230    fn download_mp3_uses_cdn_fallback_when_audio_url_empty() {
2231        let mut c = clip("a");
2232        c.audio_url = String::new();
2233        let d = desired(c.clone(), AudioFormat::Mp3);
2234        let plan = Plan {
2235            actions: vec![Action::Download {
2236                clip: c.clone(),
2237                lineage: LineageContext::own_root(&c),
2238                path: d.path.clone(),
2239                format: AudioFormat::Mp3,
2240            }],
2241        };
2242        let http = ScriptedHttp::new().route("cdn1.suno.ai/a.mp3", Reply::ok(b"body".to_vec()));
2243        let fs = MemFs::new();
2244        let mut manifest = Manifest::new();
2245        let outcome = run(
2246            &plan,
2247            &mut manifest,
2248            &[d],
2249            &http,
2250            &fs,
2251            &StubFfmpeg::flac(),
2252            &RecordingClock::new(),
2253            &ExecOptions::default(),
2254        );
2255        assert_eq!(outcome.downloaded, 1);
2256        assert_eq!(http.count("cdn1.suno.ai/a.mp3"), 1);
2257    }
2258
2259    // ── Download: FLAC render + transcode ───────────────────────────
2260
2261    #[test]
2262    fn download_flac_renders_transcodes_and_records() {
2263        let c = clip("b");
2264        let d = desired(c.clone(), AudioFormat::Flac);
2265        let plan = Plan {
2266            actions: vec![Action::Download {
2267                clip: c.clone(),
2268                lineage: LineageContext::own_root(&c),
2269                path: d.path.clone(),
2270                format: AudioFormat::Flac,
2271            }],
2272        };
2273        let http = ScriptedHttp::new()
2274            .with_auth()
2275            .route(
2276                "/wav_file/",
2277                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/b.wav"}"#),
2278            )
2279            .route("b.wav", Reply::ok(b"wav-bytes".to_vec()));
2280        let fs = MemFs::new();
2281        let clock = RecordingClock::new();
2282        let mut manifest = Manifest::new();
2283
2284        let outcome = run(
2285            &plan,
2286            &mut manifest,
2287            &[d],
2288            &http,
2289            &fs,
2290            &StubFfmpeg::flac(),
2291            &clock,
2292            &ExecOptions::default(),
2293        );
2294
2295        assert_eq!(outcome.downloaded, 1);
2296        assert_eq!(outcome.failed(), 0);
2297        let written = fs.read_file("b.flac").unwrap();
2298        assert_eq!(&written[..4], b"fLaC");
2299        assert_eq!(manifest.get("b").unwrap().format, AudioFormat::Flac);
2300        // The URL was ready immediately, so no render request and no polling.
2301        assert_eq!(http.count("/convert_wav/"), 0);
2302        assert!(clock.sleeps().is_empty());
2303    }
2304
2305    #[test]
2306    fn download_flac_requests_render_then_polls_until_ready() {
2307        let c = clip("c");
2308        let d = desired(c.clone(), AudioFormat::Flac);
2309        let plan = Plan {
2310            actions: vec![Action::Download {
2311                clip: c.clone(),
2312                lineage: LineageContext::own_root(&c),
2313                path: d.path.clone(),
2314                format: AudioFormat::Flac,
2315            }],
2316        };
2317        let http = ScriptedHttp::new()
2318            .with_auth()
2319            .route_seq(
2320                "/wav_file/",
2321                vec![
2322                    Reply::json("{}"),
2323                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/c.wav"}"#),
2324                ],
2325            )
2326            .route("/convert_wav/", Reply::status(200))
2327            .route("c.wav", Reply::ok(b"wav".to_vec()));
2328        let clock = RecordingClock::new();
2329        let mut manifest = Manifest::new();
2330
2331        let outcome = run(
2332            &plan,
2333            &mut manifest,
2334            &[d],
2335            &http,
2336            &fs_new(),
2337            &StubFfmpeg::flac(),
2338            &clock,
2339            &small_poll(),
2340        );
2341
2342        assert_eq!(outcome.downloaded, 1);
2343        assert_eq!(http.count("/convert_wav/"), 1);
2344        assert_eq!(clock.sleeps(), vec![Duration::from_secs(5)]);
2345    }
2346
2347    #[test]
2348    fn download_flac_unavailable_render_is_a_nonfatal_failure() {
2349        let c = clip("d");
2350        let d = desired(c.clone(), AudioFormat::Flac);
2351        let plan = Plan {
2352            actions: vec![Action::Download {
2353                clip: c.clone(),
2354                lineage: LineageContext::own_root(&c),
2355                path: d.path.clone(),
2356                format: AudioFormat::Flac,
2357            }],
2358        };
2359        let http = ScriptedHttp::new()
2360            .with_auth()
2361            .route("/wav_file/", Reply::json("{}"))
2362            .route("/convert_wav/", Reply::status(200));
2363        let fs = MemFs::new();
2364        let clock = RecordingClock::new();
2365        let mut manifest = Manifest::new();
2366
2367        let outcome = run(
2368            &plan,
2369            &mut manifest,
2370            &[d],
2371            &http,
2372            &fs,
2373            &StubFfmpeg::flac(),
2374            &clock,
2375            &small_poll(),
2376        );
2377
2378        assert_eq!(outcome.downloaded, 0);
2379        assert_eq!(outcome.failed(), 1);
2380        assert_eq!(outcome.failures[0].clip_id, "d");
2381        assert_eq!(outcome.status, RunStatus::Completed);
2382        assert!(!fs.exists("d.flac"));
2383        assert_eq!(clock.sleeps().len(), 2);
2384    }
2385
2386    #[test]
2387    fn flac_transcode_failure_is_recorded_and_skipped() {
2388        let c = clip("t");
2389        let d = desired(c.clone(), AudioFormat::Flac);
2390        let plan = Plan {
2391            actions: vec![Action::Download {
2392                clip: c.clone(),
2393                lineage: LineageContext::own_root(&c),
2394                path: d.path.clone(),
2395                format: AudioFormat::Flac,
2396            }],
2397        };
2398        let http = ScriptedHttp::new()
2399            .with_auth()
2400            .route(
2401                "/wav_file/",
2402                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/t.wav"}"#),
2403            )
2404            .route("t.wav", Reply::ok(b"wav".to_vec()));
2405        let fs = MemFs::new();
2406        let mut manifest = Manifest::new();
2407
2408        let outcome = run(
2409            &plan,
2410            &mut manifest,
2411            &[d],
2412            &http,
2413            &fs,
2414            &StubFfmpeg::failing(),
2415            &RecordingClock::new(),
2416            &ExecOptions::default(),
2417        );
2418
2419        assert_eq!(outcome.downloaded, 0);
2420        assert_eq!(outcome.failed(), 1);
2421        assert!(!fs.exists("t.flac"));
2422        assert!(manifest.get("t").is_none());
2423    }
2424
2425    // ── Cover fallback ──────────────────────────────────────────────
2426
2427    #[test]
2428    fn cover_falls_back_when_large_image_is_missing() {
2429        let c = art_clip("e");
2430        let d = desired(c.clone(), AudioFormat::Mp3);
2431        let plan = Plan {
2432            actions: vec![Action::Download {
2433                clip: c.clone(),
2434                lineage: LineageContext::own_root(&c),
2435                path: d.path.clone(),
2436                format: AudioFormat::Mp3,
2437            }],
2438        };
2439        let http = ScriptedHttp::new()
2440            .route("e.mp3", Reply::ok(b"body".to_vec()))
2441            .route("e/large.jpg", Reply::status(404))
2442            .route("e/small.jpg", Reply::ok(b"the-art".to_vec()));
2443        let fs = MemFs::new();
2444        let mut manifest = Manifest::new();
2445
2446        let outcome = run(
2447            &plan,
2448            &mut manifest,
2449            &[d],
2450            &http,
2451            &fs,
2452            &StubFfmpeg::flac(),
2453            &RecordingClock::new(),
2454            &ExecOptions::default(),
2455        );
2456
2457        assert_eq!(outcome.downloaded, 1);
2458        let calls = http.calls();
2459        let large = calls
2460            .iter()
2461            .position(|u| u.contains("e/large.jpg"))
2462            .unwrap();
2463        let small = calls
2464            .iter()
2465            .position(|u| u.contains("e/small.jpg"))
2466            .unwrap();
2467        assert!(large < small, "large art tried before small");
2468    }
2469
2470    // ── Cover reuse: embed + sidecar share one fetch (#89) ──────────
2471
2472    #[test]
2473    fn download_reuses_the_embedded_cover_for_the_jpg_sidecar() {
2474        // The embedded tag and the `.jpg` sidecar want the same cover URL; it is
2475        // fetched once and the bytes serve both.
2476        let c = art_clip("a");
2477        let d = desired(c.clone(), AudioFormat::Mp3);
2478        let plan = Plan {
2479            actions: vec![
2480                Action::Download {
2481                    clip: c.clone(),
2482                    lineage: LineageContext::own_root(&c),
2483                    path: d.path.clone(),
2484                    format: AudioFormat::Mp3,
2485                },
2486                Action::WriteArtifact {
2487                    kind: ArtifactKind::CoverJpg,
2488                    path: "a/cover.jpg".to_owned(),
2489                    source_url: c.selected_image_url().unwrap().to_owned(),
2490                    hash: "art".to_owned(),
2491                    owner_id: "a".to_owned(),
2492                    content: None,
2493                },
2494            ],
2495        };
2496        let http = ScriptedHttp::new()
2497            .route("a.mp3", Reply::ok(b"mp3-body".to_vec()))
2498            .route("a/large.jpg", Reply::ok(b"the-art".to_vec()));
2499        let fs = MemFs::new();
2500        let mut manifest = Manifest::new();
2501
2502        let outcome = run(
2503            &plan,
2504            &mut manifest,
2505            &[d],
2506            &http,
2507            &fs,
2508            &StubFfmpeg::flac(),
2509            &RecordingClock::new(),
2510            &ExecOptions::default(),
2511        );
2512
2513        assert_eq!(outcome.downloaded, 1);
2514        assert_eq!(outcome.artifacts_written, 1);
2515        assert_eq!(outcome.failed(), 0);
2516        // Fetched once, not twice.
2517        assert_eq!(http.count("a/large.jpg"), 1);
2518        // The sidecar carries the fetched bytes, and the audio was tagged.
2519        assert_eq!(fs.read_file("a/cover.jpg").unwrap(), b"the-art");
2520        assert_eq!(&fs.read_file("a.mp3").unwrap()[..3], b"ID3");
2521    }
2522
2523    #[test]
2524    fn concurrent_downloads_reuse_each_clips_own_cover() {
2525        // Two clips render concurrently; each `.jpg` sidecar gets its own cover
2526        // (no cross-contamination) and each cover URL is fetched exactly once.
2527        let a = art_clip("a");
2528        let b = art_clip("b");
2529        let da = desired(a.clone(), AudioFormat::Mp3);
2530        let db = desired(b.clone(), AudioFormat::Mp3);
2531        let plan = Plan {
2532            actions: vec![
2533                Action::Download {
2534                    clip: a.clone(),
2535                    lineage: LineageContext::own_root(&a),
2536                    path: da.path.clone(),
2537                    format: AudioFormat::Mp3,
2538                },
2539                Action::WriteArtifact {
2540                    kind: ArtifactKind::CoverJpg,
2541                    path: "a/cover.jpg".to_owned(),
2542                    source_url: a.selected_image_url().unwrap().to_owned(),
2543                    hash: "art".to_owned(),
2544                    owner_id: "a".to_owned(),
2545                    content: None,
2546                },
2547                Action::Download {
2548                    clip: b.clone(),
2549                    lineage: LineageContext::own_root(&b),
2550                    path: db.path.clone(),
2551                    format: AudioFormat::Mp3,
2552                },
2553                Action::WriteArtifact {
2554                    kind: ArtifactKind::CoverJpg,
2555                    path: "b/cover.jpg".to_owned(),
2556                    source_url: b.selected_image_url().unwrap().to_owned(),
2557                    hash: "art".to_owned(),
2558                    owner_id: "b".to_owned(),
2559                    content: None,
2560                },
2561            ],
2562        };
2563        let http = ScriptedHttp::new()
2564            .route("a.mp3", Reply::ok(b"a-mp3".to_vec()))
2565            .route("b.mp3", Reply::ok(b"b-mp3".to_vec()))
2566            .route("a/large.jpg", Reply::ok(b"art-a".to_vec()))
2567            .route("b/large.jpg", Reply::ok(b"art-b".to_vec()));
2568        let fs = MemFs::new();
2569        let mut manifest = Manifest::new();
2570
2571        let outcome = run(
2572            &plan,
2573            &mut manifest,
2574            &[da, db],
2575            &http,
2576            &fs,
2577            &StubFfmpeg::flac(),
2578            &RecordingClock::new(),
2579            &small_poll(),
2580        );
2581
2582        assert_eq!(outcome.downloaded, 2);
2583        assert_eq!(outcome.artifacts_written, 2);
2584        assert_eq!(http.count("a/large.jpg"), 1);
2585        assert_eq!(http.count("b/large.jpg"), 1);
2586        assert_eq!(fs.read_file("a/cover.jpg").unwrap(), b"art-a");
2587        assert_eq!(fs.read_file("b/cover.jpg").unwrap(), b"art-b");
2588    }
2589
2590    #[test]
2591    fn cover_sidecar_refetches_when_embed_fell_back_to_another_url() {
2592        // The large image 404s so the embed falls back to the small image; the
2593        // sidecar still wants the (dead) large URL and must NOT be handed the
2594        // small bytes. Reuse is keyed on the exact URL, so nothing is cached and
2595        // the sidecar fetches the large URL itself (then fails on the 404).
2596        let c = art_clip("e");
2597        let d = desired(c.clone(), AudioFormat::Mp3);
2598        let plan = Plan {
2599            actions: vec![
2600                Action::Download {
2601                    clip: c.clone(),
2602                    lineage: LineageContext::own_root(&c),
2603                    path: d.path.clone(),
2604                    format: AudioFormat::Mp3,
2605                },
2606                Action::WriteArtifact {
2607                    kind: ArtifactKind::CoverJpg,
2608                    path: "e/cover.jpg".to_owned(),
2609                    source_url: "https://art.suno.ai/e/large.jpg".to_owned(),
2610                    hash: "art".to_owned(),
2611                    owner_id: "e".to_owned(),
2612                    content: None,
2613                },
2614            ],
2615        };
2616        let http = ScriptedHttp::new()
2617            .route("e.mp3", Reply::ok(b"body".to_vec()))
2618            .route("e/large.jpg", Reply::status(404))
2619            .route("e/small.jpg", Reply::ok(b"small-art".to_vec()));
2620        let fs = MemFs::new();
2621        let mut manifest = Manifest::new();
2622
2623        let outcome = run(
2624            &plan,
2625            &mut manifest,
2626            &[d],
2627            &http,
2628            &fs,
2629            &StubFfmpeg::flac(),
2630            &RecordingClock::new(),
2631            &ExecOptions::default(),
2632        );
2633
2634        assert_eq!(outcome.downloaded, 1);
2635        // The small image was fetched once (the embed fallback) and never reused
2636        // for the large-keyed sidecar; the sidecar went to the network itself.
2637        assert_eq!(http.count("e/small.jpg"), 1);
2638        assert!(
2639            http.count("e/large.jpg") >= 2,
2640            "sidecar refetched the large URL"
2641        );
2642        assert_eq!(manifest.get("e").unwrap().cover_jpg, None);
2643        assert!(!fs.exists("e/cover.jpg"));
2644    }
2645
2646    // ── Atomic write and size verification (SYNC-13/14) ─────────────
2647
2648    #[test]
2649    fn failed_write_leaves_the_prior_file_intact() {
2650        let c = clip("f");
2651        let d = desired(c.clone(), AudioFormat::Mp3);
2652        let plan = Plan {
2653            actions: vec![Action::Download {
2654                clip: c.clone(),
2655                lineage: LineageContext::own_root(&c),
2656                path: d.path.clone(),
2657                format: AudioFormat::Mp3,
2658            }],
2659        };
2660        let http = ScriptedHttp::new().route("f.mp3", Reply::ok(b"new-body".to_vec()));
2661        let fs = MemFs::new()
2662            .with_file("f.mp3", b"OLD-CONTENT".to_vec())
2663            .fail_write("f.mp3");
2664        let mut manifest = Manifest::new();
2665
2666        let outcome = run(
2667            &plan,
2668            &mut manifest,
2669            &[d],
2670            &http,
2671            &fs,
2672            &StubFfmpeg::flac(),
2673            &RecordingClock::new(),
2674            &ExecOptions::default(),
2675        );
2676
2677        assert_eq!(outcome.downloaded, 0);
2678        assert_eq!(outcome.failed(), 1);
2679        assert_eq!(fs.read_file("f.mp3").unwrap(), b"OLD-CONTENT");
2680        assert!(manifest.get("f").is_none());
2681    }
2682
2683    #[test]
2684    fn size_mismatch_after_write_is_a_failure() {
2685        let c = clip("g");
2686        let d = desired(c.clone(), AudioFormat::Mp3);
2687        let plan = Plan {
2688            actions: vec![Action::Download {
2689                clip: c.clone(),
2690                lineage: LineageContext::own_root(&c),
2691                path: d.path.clone(),
2692                format: AudioFormat::Mp3,
2693            }],
2694        };
2695        let http = ScriptedHttp::new().route("g.mp3", Reply::ok(b"body".to_vec()));
2696        let fs = MemFs::new().corrupt_write("g.mp3");
2697        let mut manifest = Manifest::new();
2698
2699        let outcome = run(
2700            &plan,
2701            &mut manifest,
2702            &[d],
2703            &http,
2704            &fs,
2705            &StubFfmpeg::flac(),
2706            &RecordingClock::new(),
2707            &ExecOptions::default(),
2708        );
2709
2710        assert_eq!(outcome.downloaded, 0);
2711        assert_eq!(outcome.failed(), 1);
2712        assert!(outcome.failures[0].reason.contains("expected"));
2713        assert!(manifest.get("g").is_none());
2714    }
2715
2716    // ── Reliability policy (SYNC-16/17) ─────────────────────────────
2717
2718    #[test]
2719    fn transient_failure_is_retried_then_skipped() {
2720        let c = clip("h");
2721        let d = desired(c.clone(), AudioFormat::Mp3);
2722        let plan = Plan {
2723            actions: vec![Action::Download {
2724                clip: c.clone(),
2725                lineage: LineageContext::own_root(&c),
2726                path: d.path.clone(),
2727                format: AudioFormat::Mp3,
2728            }],
2729        };
2730        let http = ScriptedHttp::new().route("h.mp3", Reply::status(500));
2731        let fs = MemFs::new();
2732        let clock = RecordingClock::new();
2733        let opts = ExecOptions {
2734            max_retries: 2,
2735            ..ExecOptions::default()
2736        };
2737        let mut manifest = Manifest::new();
2738
2739        let outcome = run(
2740            &plan,
2741            &mut manifest,
2742            &[d],
2743            &http,
2744            &fs,
2745            &StubFfmpeg::flac(),
2746            &clock,
2747            &opts,
2748        );
2749
2750        assert_eq!(outcome.downloaded, 0);
2751        assert_eq!(outcome.failed(), 1);
2752        assert_eq!(http.count("h.mp3"), 3);
2753        assert_eq!(clock.sleeps().len(), 2);
2754    }
2755
2756    #[test]
2757    fn truncated_download_is_retried_then_succeeds() {
2758        let c = clip("i");
2759        let d = desired(c.clone(), AudioFormat::Mp3);
2760        let plan = Plan {
2761            actions: vec![Action::Download {
2762                clip: c.clone(),
2763                lineage: LineageContext::own_root(&c),
2764                path: d.path.clone(),
2765                format: AudioFormat::Mp3,
2766            }],
2767        };
2768        let http = ScriptedHttp::new().route_seq(
2769            "i.mp3",
2770            vec![
2771                Reply::ok(b"short".to_vec()).with_content_length(999),
2772                Reply::ok(b"good-body".to_vec()),
2773            ],
2774        );
2775        let fs = MemFs::new();
2776        let clock = RecordingClock::new();
2777        let mut manifest = Manifest::new();
2778
2779        let outcome = run(
2780            &plan,
2781            &mut manifest,
2782            &[d],
2783            &http,
2784            &fs,
2785            &StubFfmpeg::flac(),
2786            &clock,
2787            &ExecOptions::default(),
2788        );
2789
2790        assert_eq!(outcome.downloaded, 1);
2791        assert_eq!(http.count("i.mp3"), 2);
2792        assert_eq!(clock.sleeps().len(), 1);
2793    }
2794
2795    #[test]
2796    fn rate_limit_backs_off_using_retry_after() {
2797        let c = clip("j");
2798        let d = desired(c.clone(), AudioFormat::Mp3);
2799        let plan = Plan {
2800            actions: vec![Action::Download {
2801                clip: c.clone(),
2802                lineage: LineageContext::own_root(&c),
2803                path: d.path.clone(),
2804                format: AudioFormat::Mp3,
2805            }],
2806        };
2807        let http = ScriptedHttp::new().route_seq(
2808            "j.mp3",
2809            vec![
2810                Reply::status(429).with_retry_after(7),
2811                Reply::ok(b"body".to_vec()),
2812            ],
2813        );
2814        let fs = MemFs::new();
2815        let clock = RecordingClock::new();
2816        let mut manifest = Manifest::new();
2817
2818        let outcome = run(
2819            &plan,
2820            &mut manifest,
2821            &[d],
2822            &http,
2823            &fs,
2824            &StubFfmpeg::flac(),
2825            &clock,
2826            &ExecOptions::default(),
2827        );
2828
2829        assert_eq!(outcome.downloaded, 1);
2830        assert_eq!(clock.sleeps(), vec![Duration::from_secs(7)]);
2831    }
2832
2833    #[test]
2834    fn auth_failure_aborts_the_run() {
2835        let c1 = clip("k1");
2836        let c2 = clip("k2");
2837        let d1 = desired(c1.clone(), AudioFormat::Flac);
2838        let d2 = desired(c2.clone(), AudioFormat::Flac);
2839        let plan = Plan {
2840            actions: vec![
2841                Action::Download {
2842                    clip: c1.clone(),
2843                    lineage: LineageContext::own_root(&c1),
2844                    path: d1.path.clone(),
2845                    format: AudioFormat::Flac,
2846                },
2847                Action::Download {
2848                    clip: c2.clone(),
2849                    lineage: LineageContext::own_root(&c2),
2850                    path: d2.path.clone(),
2851                    format: AudioFormat::Flac,
2852                },
2853            ],
2854        };
2855        // The authenticated WAV-render endpoint rejects auth even after a JWT
2856        // refresh: that is a bad token, so the whole run aborts rather than
2857        // hammering every clip. A CDN media rejection, by contrast, does not.
2858        let http = ScriptedHttp::new()
2859            .with_auth()
2860            .route("/wav_file/", Reply::status(401));
2861        let fs = MemFs::new();
2862        let mut manifest = Manifest::new();
2863
2864        let outcome = run(
2865            &plan,
2866            &mut manifest,
2867            &[d1, d2],
2868            &http,
2869            &fs,
2870            &StubFfmpeg::flac(),
2871            &RecordingClock::new(),
2872            &small_poll(),
2873        );
2874
2875        assert_eq!(outcome.status, RunStatus::AuthAborted);
2876        assert_eq!(outcome.failed(), 1);
2877        assert_eq!(outcome.failures[0].clip_id, "k1");
2878        assert_eq!(outcome.downloaded, 0);
2879    }
2880
2881    // ── Disk-full aborts the run (issue #17) ────────────────────────
2882
2883    #[test]
2884    fn disk_full_primary_write_aborts_the_run() {
2885        // Two MP3 downloads; the first write is out of space. That is systemic,
2886        // so the run aborts before the second is even attempted: exactly one
2887        // failure is recorded and its reason names the disk-full cause.
2888        let c1 = clip("d1");
2889        let c2 = clip("d2");
2890        let d1 = desired(c1.clone(), AudioFormat::Mp3);
2891        let d2 = desired(c2.clone(), AudioFormat::Mp3);
2892        let plan = Plan {
2893            actions: vec![
2894                Action::Download {
2895                    clip: c1.clone(),
2896                    lineage: LineageContext::own_root(&c1),
2897                    path: d1.path.clone(),
2898                    format: AudioFormat::Mp3,
2899                },
2900                Action::Download {
2901                    clip: c2.clone(),
2902                    lineage: LineageContext::own_root(&c2),
2903                    path: d2.path.clone(),
2904                    format: AudioFormat::Mp3,
2905                },
2906            ],
2907        };
2908        let http = ScriptedHttp::new()
2909            .route("d1.mp3", Reply::ok(b"body-1".to_vec()))
2910            .route("d2.mp3", Reply::ok(b"body-2".to_vec()));
2911        let fs = MemFs::new().fail_write_out_of_space("d1.mp3");
2912        let mut manifest = Manifest::new();
2913
2914        let outcome = run(
2915            &plan,
2916            &mut manifest,
2917            &[d1, d2],
2918            &http,
2919            &fs,
2920            &StubFfmpeg::flac(),
2921            &RecordingClock::new(),
2922            &ExecOptions::default(),
2923        );
2924
2925        assert_eq!(outcome.status, RunStatus::DiskFull);
2926        assert_eq!(outcome.failed(), 1);
2927        assert_eq!(outcome.failures[0].clip_id, "d1");
2928        assert!(outcome.failures[0].reason.contains("disk full"));
2929        assert_eq!(outcome.downloaded, 0);
2930        // The second clip was never fetched: the run aborted first.
2931        assert_eq!(http.count("d2.mp3"), 0);
2932        assert!(!fs.exists("d2.mp3"));
2933    }
2934
2935    #[test]
2936    fn disk_full_flac_transcode_aborts_the_run() {
2937        // The scratch disk fills during the FLAC re-encode; a WAV rendered, but
2938        // there is nowhere to stage the transcode, so the run aborts.
2939        let c1 = clip("d1");
2940        let c2 = clip("d2");
2941        let d1 = desired(c1.clone(), AudioFormat::Flac);
2942        let d2 = desired(c2.clone(), AudioFormat::Flac);
2943        let plan = Plan {
2944            actions: vec![
2945                Action::Download {
2946                    clip: c1.clone(),
2947                    lineage: LineageContext::own_root(&c1),
2948                    path: d1.path.clone(),
2949                    format: AudioFormat::Flac,
2950                },
2951                Action::Download {
2952                    clip: c2.clone(),
2953                    lineage: LineageContext::own_root(&c2),
2954                    path: d2.path.clone(),
2955                    format: AudioFormat::Flac,
2956                },
2957            ],
2958        };
2959        let http = ScriptedHttp::new()
2960            .with_auth()
2961            .route(
2962                "/wav_file/",
2963                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/d1.wav"}"#),
2964            )
2965            .route(".wav", Reply::ok(b"wav".to_vec()));
2966        let fs = MemFs::new();
2967        let mut manifest = Manifest::new();
2968
2969        let outcome = run(
2970            &plan,
2971            &mut manifest,
2972            &[d1, d2],
2973            &http,
2974            &fs,
2975            &StubFfmpeg::out_of_space(),
2976            &RecordingClock::new(),
2977            &ExecOptions::default(),
2978        );
2979
2980        assert_eq!(outcome.status, RunStatus::DiskFull);
2981        assert_eq!(outcome.failed(), 1);
2982        assert_eq!(outcome.failures[0].clip_id, "d1");
2983        assert!(outcome.failures[0].reason.contains("disk full"));
2984        assert_eq!(outcome.downloaded, 0);
2985    }
2986
2987    #[test]
2988    fn disk_full_artifact_write_aborts_the_run() {
2989        // A sidecar write (not a primary download) also aborts on a full disk:
2990        // the owning audio is present, the cover fetch succeeds, but the sidecar
2991        // cannot be written.
2992        let mut manifest = Manifest::new();
2993        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
2994        let plan = Plan {
2995            actions: vec![Action::WriteArtifact {
2996                kind: ArtifactKind::CoverJpg,
2997                path: "a/cover.jpg".to_owned(),
2998                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
2999                hash: "h1".to_owned(),
3000                owner_id: "a".to_owned(),
3001                content: None,
3002            }],
3003        };
3004        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"jpg-bytes".to_vec()));
3005        let fs = MemFs::new().fail_write_out_of_space("a/cover.jpg");
3006
3007        let outcome = run(
3008            &plan,
3009            &mut manifest,
3010            &[],
3011            &http,
3012            &fs,
3013            &StubFfmpeg::flac(),
3014            &RecordingClock::new(),
3015            &ExecOptions::default(),
3016        );
3017
3018        assert_eq!(outcome.status, RunStatus::DiskFull);
3019        assert_eq!(outcome.failed(), 1);
3020        assert!(outcome.failures[0].reason.contains("disk full"));
3021        assert_eq!(outcome.artifacts_written, 0);
3022        // The sidecar slot was never recorded: the write failed before it.
3023        assert_eq!(manifest.get("a").unwrap().cover_jpg, None);
3024    }
3025
3026    #[test]
3027    fn disk_full_leaves_the_failed_clips_manifest_entry_unchanged() {
3028        // write_verify fails before any manifest insert, so a re-download that
3029        // hits a full disk leaves the prior entry (and file) exactly as it was.
3030        let c = clip("m");
3031        let d = desired(c.clone(), AudioFormat::Mp3);
3032        let plan = Plan {
3033            actions: vec![Action::Download {
3034                clip: c.clone(),
3035                lineage: LineageContext::own_root(&c),
3036                path: d.path.clone(),
3037                format: AudioFormat::Mp3,
3038            }],
3039        };
3040        let http = ScriptedHttp::new().route("m.mp3", Reply::ok(b"new-body".to_vec()));
3041        let fs = MemFs::new()
3042            .with_file("m.mp3", b"OLD-CONTENT".to_vec())
3043            .fail_write_out_of_space("m.mp3");
3044        let mut manifest = Manifest::new();
3045        let before = entry("m.mp3", AudioFormat::Mp3);
3046        manifest.insert("m", before.clone());
3047
3048        let outcome = run(
3049            &plan,
3050            &mut manifest,
3051            &[d],
3052            &http,
3053            &fs,
3054            &StubFfmpeg::flac(),
3055            &RecordingClock::new(),
3056            &ExecOptions::default(),
3057        );
3058
3059        assert_eq!(outcome.status, RunStatus::DiskFull);
3060        assert_eq!(manifest.get("m"), Some(&before));
3061        assert_eq!(fs.read_file("m.mp3").unwrap(), b"OLD-CONTENT");
3062    }
3063
3064    #[test]
3065    fn cdn_download_rejection_skips_the_clip_without_aborting() {
3066        let c1 = clip("k1");
3067        let c2 = clip("k2");
3068        let d1 = desired(c1.clone(), AudioFormat::Mp3);
3069        let d2 = desired(c2.clone(), AudioFormat::Mp3);
3070        let plan = Plan {
3071            actions: vec![
3072                Action::Download {
3073                    clip: c1.clone(),
3074                    lineage: LineageContext::own_root(&c1),
3075                    path: d1.path.clone(),
3076                    format: AudioFormat::Mp3,
3077                },
3078                Action::Download {
3079                    clip: c2.clone(),
3080                    lineage: LineageContext::own_root(&c2),
3081                    path: d2.path.clone(),
3082                    format: AudioFormat::Mp3,
3083                },
3084            ],
3085        };
3086        // A CDN media fetch is unauthenticated, so a 403 is a per-asset
3087        // rejection (often transient), not a bad token: the clip is retried
3088        // then recorded and skipped, and the run carries on to the rest.
3089        let http = ScriptedHttp::new()
3090            .route("k1.mp3", Reply::status(403))
3091            .route("k2.mp3", Reply::ok(b"body".to_vec()));
3092        let fs = MemFs::new();
3093        let mut manifest = Manifest::new();
3094
3095        let outcome = run(
3096            &plan,
3097            &mut manifest,
3098            &[d1, d2],
3099            &http,
3100            &fs,
3101            &StubFfmpeg::flac(),
3102            &RecordingClock::new(),
3103            &ExecOptions::default(),
3104        );
3105
3106        assert_ne!(outcome.status, RunStatus::AuthAborted);
3107        assert_eq!(outcome.downloaded, 1);
3108        assert_eq!(outcome.failed(), 1);
3109        assert_eq!(outcome.failures[0].clip_id, "k1");
3110    }
3111
3112    #[test]
3113    fn one_clip_failure_does_not_abort_the_run() {
3114        let c1 = clip("l1");
3115        let c2 = clip("l2");
3116        let d1 = desired(c1.clone(), AudioFormat::Mp3);
3117        let d2 = desired(c2.clone(), AudioFormat::Mp3);
3118        let plan = Plan {
3119            actions: vec![
3120                Action::Download {
3121                    clip: c1.clone(),
3122                    lineage: LineageContext::own_root(&c1),
3123                    path: d1.path.clone(),
3124                    format: AudioFormat::Mp3,
3125                },
3126                Action::Download {
3127                    clip: c2.clone(),
3128                    lineage: LineageContext::own_root(&c2),
3129                    path: d2.path.clone(),
3130                    format: AudioFormat::Mp3,
3131                },
3132            ],
3133        };
3134        let http = ScriptedHttp::new()
3135            .route("l1.mp3", Reply::status(404))
3136            .route("l2.mp3", Reply::ok(b"body".to_vec()));
3137        let fs = MemFs::new();
3138        let mut manifest = Manifest::new();
3139
3140        let outcome = run(
3141            &plan,
3142            &mut manifest,
3143            &[d1, d2],
3144            &http,
3145            &fs,
3146            &StubFfmpeg::flac(),
3147            &RecordingClock::new(),
3148            &ExecOptions::default(),
3149        );
3150
3151        assert_eq!(outcome.status, RunStatus::Completed);
3152        assert_eq!(outcome.downloaded, 1);
3153        assert_eq!(outcome.failed(), 1);
3154        assert_eq!(outcome.failures[0].clip_id, "l1");
3155        assert!(fs.exists("l2.mp3"));
3156        assert!(manifest.get("l2").is_some());
3157        assert!(manifest.get("l1").is_none());
3158    }
3159
3160    // ── preserve marker (SYNC-8) ────────────────────────────────────
3161
3162    #[test]
3163    fn preserve_is_set_for_copy_held_and_private_clips() {
3164        let mut mirror = desired(clip("m1"), AudioFormat::Mp3);
3165        mirror.modes = vec![SourceMode::Mirror];
3166        let mut copy_held = desired(clip("m2"), AudioFormat::Mp3);
3167        copy_held.modes = vec![SourceMode::Mirror, SourceMode::Copy];
3168        let mut private = desired(clip("m3"), AudioFormat::Mp3);
3169        private.private = true;
3170
3171        let plan = Plan {
3172            actions: vec![
3173                Action::Download {
3174                    clip: mirror.clip.clone(),
3175                    lineage: LineageContext::own_root(&mirror.clip),
3176                    path: mirror.path.clone(),
3177                    format: AudioFormat::Mp3,
3178                },
3179                Action::Download {
3180                    clip: copy_held.clip.clone(),
3181                    lineage: LineageContext::own_root(&copy_held.clip),
3182                    path: copy_held.path.clone(),
3183                    format: AudioFormat::Mp3,
3184                },
3185                Action::Download {
3186                    clip: private.clip.clone(),
3187                    lineage: LineageContext::own_root(&private.clip),
3188                    path: private.path.clone(),
3189                    format: AudioFormat::Mp3,
3190                },
3191            ],
3192        };
3193        let http = ScriptedHttp::new()
3194            .route("m1.mp3", Reply::ok(b"a".to_vec()))
3195            .route("m2.mp3", Reply::ok(b"b".to_vec()))
3196            .route("m3.mp3", Reply::ok(b"c".to_vec()));
3197        let fs = MemFs::new();
3198        let mut manifest = Manifest::new();
3199
3200        let outcome = run(
3201            &plan,
3202            &mut manifest,
3203            &[mirror, copy_held, private],
3204            &http,
3205            &fs,
3206            &StubFfmpeg::flac(),
3207            &RecordingClock::new(),
3208            &ExecOptions::default(),
3209        );
3210
3211        assert_eq!(outcome.downloaded, 3);
3212        assert!(!manifest.get("m1").unwrap().preserve);
3213        assert!(manifest.get("m2").unwrap().preserve);
3214        assert!(manifest.get("m3").unwrap().preserve);
3215    }
3216
3217    // ── Reformat / Retag / Rename / Delete / Skip ───────────────────
3218
3219    #[test]
3220    fn reformat_writes_new_format_and_removes_old_file() {
3221        let c = clip("n");
3222        let d = desired(c.clone(), AudioFormat::Mp3);
3223        let plan = Plan {
3224            actions: vec![Action::Reformat {
3225                clip: c.clone(),
3226                path: "n.mp3".to_owned(),
3227                from_path: "n.flac".to_owned(),
3228                from: AudioFormat::Flac,
3229                to: AudioFormat::Mp3,
3230            }],
3231        };
3232        let http = ScriptedHttp::new().route("n.mp3", Reply::ok(b"body".to_vec()));
3233        let fs = MemFs::new().with_file("n.flac", b"OLD-FLAC".to_vec());
3234        let mut manifest = Manifest::new();
3235        manifest.insert("n", entry("n.flac", AudioFormat::Flac));
3236
3237        let outcome = run(
3238            &plan,
3239            &mut manifest,
3240            &[d],
3241            &http,
3242            &fs,
3243            &StubFfmpeg::flac(),
3244            &RecordingClock::new(),
3245            &ExecOptions::default(),
3246        );
3247
3248        assert_eq!(outcome.reformatted, 1);
3249        assert!(fs.exists("n.mp3"));
3250        assert!(!fs.exists("n.flac"));
3251        let updated = manifest.get("n").unwrap();
3252        assert_eq!(updated.path, "n.mp3");
3253        assert_eq!(updated.format, AudioFormat::Mp3);
3254        assert_eq!(updated.meta_hash, "m");
3255    }
3256
3257    #[test]
3258    fn retag_rewrites_file_and_updates_hashes() {
3259        let c = clip("o");
3260        let mut d = desired(c.clone(), AudioFormat::Mp3);
3261        d.meta_hash = "new".to_owned();
3262        d.art_hash = "new-art".to_owned();
3263        let existing = tag_mp3(
3264            b"audio",
3265            &TrackMetadata::from_clip(&c, &LineageContext::own_root(&c)),
3266            None,
3267            None,
3268        )
3269        .unwrap();
3270        let fs = MemFs::new().with_file("o.mp3", existing.clone());
3271        let mut manifest = Manifest::new();
3272        let mut start = entry("o.mp3", AudioFormat::Mp3);
3273        start.size = existing.len() as u64;
3274        manifest.insert("o", start);
3275        let plan = Plan {
3276            actions: vec![Action::Retag {
3277                clip: c.clone(),
3278                lineage: LineageContext::own_root(&c),
3279                path: "o.mp3".to_owned(),
3280            }],
3281        };
3282
3283        let outcome = run(
3284            &plan,
3285            &mut manifest,
3286            &[d],
3287            &ScriptedHttp::new(),
3288            &fs,
3289            &StubFfmpeg::flac(),
3290            &RecordingClock::new(),
3291            &ExecOptions::default(),
3292        );
3293
3294        assert_eq!(outcome.retagged, 1);
3295        let updated = manifest.get("o").unwrap();
3296        assert_eq!(updated.meta_hash, "new");
3297        assert_eq!(updated.art_hash, "new-art");
3298        assert_eq!(&fs.read_file("o.mp3").unwrap()[..3], b"ID3");
3299    }
3300
3301    #[test]
3302    fn rename_moves_file_and_updates_manifest_path() {
3303        let c = clip("p");
3304        let mut d = desired(c.clone(), AudioFormat::Mp3);
3305        d.path = "new/p.mp3".to_owned();
3306        let fs = MemFs::new().with_file("old/p.mp3", b"DATA".to_vec());
3307        let mut manifest = Manifest::new();
3308        manifest.insert("p", entry("old/p.mp3", AudioFormat::Mp3));
3309        let plan = Plan {
3310            actions: vec![Action::Rename {
3311                from: "old/p.mp3".to_owned(),
3312                to: "new/p.mp3".to_owned(),
3313            }],
3314        };
3315
3316        let outcome = run(
3317            &plan,
3318            &mut manifest,
3319            &[d],
3320            &ScriptedHttp::new(),
3321            &fs,
3322            &StubFfmpeg::flac(),
3323            &RecordingClock::new(),
3324            &ExecOptions::default(),
3325        );
3326
3327        assert_eq!(outcome.renamed, 1);
3328        assert!(fs.exists("new/p.mp3"));
3329        assert!(!fs.exists("old/p.mp3"));
3330        assert_eq!(manifest.get("p").unwrap().path, "new/p.mp3");
3331    }
3332
3333    #[test]
3334    fn disk_full_rename_aborts_the_run() {
3335        // A move onto a full disk is systemic like a full-disk write: the run
3336        // aborts with DiskFull and the source file is left untouched.
3337        let c = clip("p");
3338        let mut d = desired(c.clone(), AudioFormat::Mp3);
3339        d.path = "new/p.mp3".to_owned();
3340        let fs = MemFs::new()
3341            .with_file("old/p.mp3", b"DATA".to_vec())
3342            .fail_rename_out_of_space("new/p.mp3");
3343        let mut manifest = Manifest::new();
3344        manifest.insert("p", entry("old/p.mp3", AudioFormat::Mp3));
3345        let plan = Plan {
3346            actions: vec![Action::Rename {
3347                from: "old/p.mp3".to_owned(),
3348                to: "new/p.mp3".to_owned(),
3349            }],
3350        };
3351
3352        let outcome = run(
3353            &plan,
3354            &mut manifest,
3355            &[d],
3356            &ScriptedHttp::new(),
3357            &fs,
3358            &StubFfmpeg::flac(),
3359            &RecordingClock::new(),
3360            &ExecOptions::default(),
3361        );
3362
3363        assert_eq!(outcome.status, RunStatus::DiskFull);
3364        assert_eq!(outcome.renamed, 0);
3365        assert_eq!(outcome.failed(), 1);
3366        assert!(outcome.failures[0].reason.contains("disk full"));
3367        // The source is untouched: the move never happened.
3368        assert!(fs.exists("old/p.mp3"));
3369        assert!(!fs.exists("new/p.mp3"));
3370        assert_eq!(manifest.get("p").unwrap().path, "old/p.mp3");
3371    }
3372
3373    #[test]
3374    fn delete_removes_file_and_manifest_entry() {
3375        let fs = MemFs::new().with_file("q.mp3", b"DATA".to_vec());
3376        let mut manifest = Manifest::new();
3377        manifest.insert("q", entry("q.mp3", AudioFormat::Mp3));
3378        let plan = Plan {
3379            actions: vec![Action::Delete {
3380                path: "q.mp3".to_owned(),
3381                clip_id: "q".to_owned(),
3382            }],
3383        };
3384
3385        let outcome = run(
3386            &plan,
3387            &mut manifest,
3388            &[],
3389            &ScriptedHttp::new(),
3390            &fs,
3391            &StubFfmpeg::flac(),
3392            &RecordingClock::new(),
3393            &ExecOptions::default(),
3394        );
3395
3396        assert_eq!(outcome.deleted, 1);
3397        assert!(!fs.exists("q.mp3"));
3398        assert!(manifest.get("q").is_none());
3399    }
3400
3401    #[test]
3402    fn failed_delete_keeps_the_manifest_entry() {
3403        let fs = MemFs::new()
3404            .with_file("s.mp3", b"DATA".to_vec())
3405            .fail_remove("s.mp3");
3406        let mut manifest = Manifest::new();
3407        manifest.insert("s", entry("s.mp3", AudioFormat::Mp3));
3408        let plan = Plan {
3409            actions: vec![Action::Delete {
3410                path: "s.mp3".to_owned(),
3411                clip_id: "s".to_owned(),
3412            }],
3413        };
3414
3415        let outcome = run(
3416            &plan,
3417            &mut manifest,
3418            &[],
3419            &ScriptedHttp::new(),
3420            &fs,
3421            &StubFfmpeg::flac(),
3422            &RecordingClock::new(),
3423            &ExecOptions::default(),
3424        );
3425
3426        assert_eq!(outcome.deleted, 0);
3427        assert_eq!(outcome.failed(), 1);
3428        assert!(manifest.get("s").is_some());
3429        assert!(fs.exists("s.mp3"));
3430    }
3431
3432    #[test]
3433    fn skip_is_a_noop() {
3434        let mut manifest = Manifest::new();
3435        let plan = Plan {
3436            actions: vec![Action::Skip {
3437                clip_id: "r".to_owned(),
3438            }],
3439        };
3440        let outcome = run(
3441            &plan,
3442            &mut manifest,
3443            &[],
3444            &ScriptedHttp::new(),
3445            &MemFs::new(),
3446            &StubFfmpeg::flac(),
3447            &RecordingClock::new(),
3448            &ExecOptions::default(),
3449        );
3450        assert_eq!(outcome.skipped, 1);
3451        assert_eq!(outcome.failed(), 0);
3452    }
3453
3454    // ── Pure helpers ────────────────────────────────────────────────
3455
3456    #[test]
3457    fn header_helpers_parse_or_ignore() {
3458        let resp = HttpResponse {
3459            status: 200,
3460            headers: vec![("Content-Length".to_owned(), "42".to_owned())],
3461            body: Vec::new(),
3462        };
3463        assert_eq!(content_length(&resp), Some(42));
3464
3465        let bare = HttpResponse {
3466            status: 200,
3467            headers: Vec::new(),
3468            body: Vec::new(),
3469        };
3470        assert_eq!(content_length(&bare), None);
3471    }
3472
3473    #[test]
3474    fn preserve_rule_covers_copy_and_private() {
3475        let base = desired(clip("x"), AudioFormat::Mp3);
3476        assert!(!preserve_for(&base));
3477        let mut copy_held = base.clone();
3478        copy_held.modes = vec![SourceMode::Copy];
3479        assert!(preserve_for(&copy_held));
3480        let mut private = base.clone();
3481        private.private = true;
3482        assert!(preserve_for(&private));
3483    }
3484
    // Test helper: returns a fresh `MemFs` built with `MemFs::new()`, for
    // tests that need a filesystem double with no pre-seeded files.
    fn fs_new() -> MemFs {
        MemFs::new()
    }
3488
3489    // ── Skip refreshes the preserve marker (SYNC-8 cross-run) ────────
3490
3491    #[test]
3492    fn skip_sets_preserve_when_a_clip_becomes_copy_held() {
3493        let c = clip("s1");
3494        let mut d = desired(c.clone(), AudioFormat::Mp3);
3495        d.modes = vec![SourceMode::Copy];
3496        let plan = Plan {
3497            actions: vec![Action::Skip {
3498                clip_id: "s1".to_owned(),
3499            }],
3500        };
3501        let mut manifest = Manifest::new();
3502        manifest.insert("s1".to_owned(), entry("s1.mp3", AudioFormat::Mp3));
3503        assert!(!manifest.get("s1").unwrap().preserve);
3504
3505        let outcome = run(
3506            &plan,
3507            &mut manifest,
3508            &[d],
3509            &ScriptedHttp::new(),
3510            &fs_new(),
3511            &StubFfmpeg::flac(),
3512            &RecordingClock::new(),
3513            &ExecOptions::default(),
3514        );
3515
3516        assert_eq!(outcome.skipped, 1);
3517        assert!(
3518            manifest.get("s1").unwrap().preserve,
3519            "a copy-held skip must mark the entry preserved"
3520        );
3521    }
3522
3523    #[test]
3524    fn skip_clears_stale_preserve_when_a_clip_returns_to_mirror_only() {
3525        let c = clip("s2");
3526        let d = desired(c.clone(), AudioFormat::Mp3);
3527        let plan = Plan {
3528            actions: vec![Action::Skip {
3529                clip_id: "s2".to_owned(),
3530            }],
3531        };
3532        let mut manifest = Manifest::new();
3533        let mut stale = entry("s2.mp3", AudioFormat::Mp3);
3534        stale.preserve = true;
3535        manifest.insert("s2".to_owned(), stale);
3536
3537        run(
3538            &plan,
3539            &mut manifest,
3540            &[d],
3541            &ScriptedHttp::new(),
3542            &fs_new(),
3543            &StubFfmpeg::flac(),
3544            &RecordingClock::new(),
3545            &ExecOptions::default(),
3546        );
3547
3548        assert!(
3549            !manifest.get("s2").unwrap().preserve,
3550            "a mirror-only skip must clear a stale preserve marker"
3551        );
3552    }
3553
3554    #[test]
3555    fn flac_render_retries_a_rate_limited_wav_lookup() {
3556        let c = clip("rl");
3557        let d = desired(c.clone(), AudioFormat::Flac);
3558        let plan = Plan {
3559            actions: vec![Action::Download {
3560                clip: c.clone(),
3561                lineage: LineageContext::own_root(&c),
3562                path: d.path.clone(),
3563                format: AudioFormat::Flac,
3564            }],
3565        };
3566        let http = ScriptedHttp::new()
3567            .with_auth()
3568            .route_seq(
3569                "/wav_file/",
3570                vec![
3571                    Reply::status(429),
3572                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/rl.wav"}"#),
3573                ],
3574            )
3575            .route("rl.wav", Reply::ok(b"wav".to_vec()));
3576        let clock = RecordingClock::new();
3577        let mut manifest = Manifest::new();
3578
3579        let outcome = run(
3580            &plan,
3581            &mut manifest,
3582            &[d],
3583            &http,
3584            &fs_new(),
3585            &StubFfmpeg::flac(),
3586            &clock,
3587            &small_poll(),
3588        );
3589
3590        assert_eq!(outcome.downloaded, 1);
3591        assert_eq!(outcome.failed(), 0);
3592        // The render was ready on retry, so no fresh convert_wav was needed.
3593        assert_eq!(http.count("/convert_wav/"), 0);
3594        // One transient backoff (1s base), not the 5s poll interval.
3595        assert_eq!(clock.sleeps(), vec![Duration::from_secs(1)]);
3596    }
3597
3598    // ── Phase 6: artifact actions ───────────────────────────────────
3599
3600    #[test]
3601    fn write_artifact_fetches_writes_and_updates_manifest() {
3602        // The owning entry exists (its audio was kept this run); WriteArtifact
3603        // fetches the source, writes the sidecar, and records it on the entry.
3604        let mut manifest = Manifest::new();
3605        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
3606        let plan = Plan {
3607            actions: vec![Action::WriteArtifact {
3608                kind: ArtifactKind::CoverJpg,
3609                path: "a/cover.jpg".to_owned(),
3610                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
3611                hash: "h1".to_owned(),
3612                owner_id: "a".to_owned(),
3613                content: None,
3614            }],
3615        };
3616        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"jpg-bytes".to_vec()));
3617        let fs = MemFs::new();
3618
3619        let outcome = run(
3620            &plan,
3621            &mut manifest,
3622            &[],
3623            &http,
3624            &fs,
3625            &StubFfmpeg::flac(),
3626            &RecordingClock::new(),
3627            &ExecOptions::default(),
3628        );
3629
3630        assert_eq!(outcome.artifacts_written, 1);
3631        assert_eq!(outcome.failed(), 0);
3632        assert_eq!(outcome.status, RunStatus::Completed);
3633        assert_eq!(fs.read_file("a/cover.jpg").unwrap(), b"jpg-bytes");
3634        assert_eq!(
3635            manifest.get("a").unwrap().cover_jpg,
3636            Some(ArtifactState {
3637                path: "a/cover.jpg".to_owned(),
3638                hash: "h1".to_owned(),
3639            })
3640        );
3641    }
3642
3643    #[test]
3644    fn write_text_sidecar_records_slot_with_no_network_fetch() {
3645        // A generated text sidecar carries its body inline, so it is written
3646        // verbatim with NO HTTP fetch and the details slot records its state.
3647        let mut manifest = Manifest::new();
3648        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
3649        let plan = Plan {
3650            actions: vec![Action::WriteArtifact {
3651                kind: ArtifactKind::DetailsTxt,
3652                path: "a.details.txt".to_owned(),
3653                source_url: String::new(),
3654                hash: "dh".to_owned(),
3655                owner_id: "a".to_owned(),
3656                content: Some("Title: A\n".to_owned()),
3657            }],
3658        };
3659        // An empty HTTP script: any fetch would fail, proving none happens.
3660        let http = ScriptedHttp::new();
3661        let fs = MemFs::new();
3662
3663        let outcome = run(
3664            &plan,
3665            &mut manifest,
3666            &[],
3667            &http,
3668            &fs,
3669            &StubFfmpeg::flac(),
3670            &RecordingClock::new(),
3671            &ExecOptions::default(),
3672        );
3673
3674        assert_eq!(outcome.artifacts_written, 1);
3675        assert_eq!(outcome.failed(), 0);
3676        assert_eq!(fs.read_file("a.details.txt").unwrap(), b"Title: A\n");
3677        assert_eq!(
3678            manifest.get("a").unwrap().details_txt,
3679            Some(ArtifactState {
3680                path: "a.details.txt".to_owned(),
3681                hash: "dh".to_owned(),
3682            })
3683        );
3684    }
3685
3686    #[test]
3687    fn write_lyrics_sidecar_relocation_removes_old_file() {
3688        // The audio moved, so the lyrics sidecar is re-emitted at the new path;
3689        // the executor writes the new file and prunes the stale one.
3690        let mut manifest = Manifest::new();
3691        let mut e = entry("old/a.flac", AudioFormat::Flac);
3692        e.lyrics_txt = Some(ArtifactState {
3693            path: "old/a.lyrics.txt".to_owned(),
3694            hash: "lh".to_owned(),
3695        });
3696        manifest.insert("a", e);
3697        let fs = MemFs::new()
3698            .with_file("old/a.flac", b"AUDIO".to_vec())
3699            .with_file("old/a.lyrics.txt", b"old words\n".to_vec());
3700        let plan = Plan {
3701            actions: vec![Action::WriteArtifact {
3702                kind: ArtifactKind::LyricsTxt,
3703                path: "new/a.lyrics.txt".to_owned(),
3704                source_url: String::new(),
3705                hash: "lh".to_owned(),
3706                owner_id: "a".to_owned(),
3707                content: Some("new words\n".to_owned()),
3708            }],
3709        };
3710
3711        let outcome = run(
3712            &plan,
3713            &mut manifest,
3714            &[],
3715            &ScriptedHttp::new(),
3716            &fs,
3717            &StubFfmpeg::flac(),
3718            &RecordingClock::new(),
3719            &ExecOptions::default(),
3720        );
3721
3722        assert_eq!(outcome.failed(), 0);
3723        assert_eq!(fs.read_file("new/a.lyrics.txt").unwrap(), b"new words\n");
3724        assert!(!fs.exists("old/a.lyrics.txt"));
3725        assert_eq!(
3726            manifest.get("a").unwrap().lyrics_txt.as_ref().unwrap().path,
3727            "new/a.lyrics.txt"
3728        );
3729    }
3730
3731    #[test]
3732    fn sidecar_path_swap_never_deletes_a_file_written_this_run() {
3733        // Two clips swap sidecar paths in one run (A: x -> y while B: y -> x).
3734        // Each write's inline old-path cleanup must skip a path another action
3735        // writes this run, or the second write would delete the first's freshly
3736        // written file (issue #76). The guard is kind-agnostic; lyrics stands in
3737        // for every sidecar, including the .mp4 video.
3738        let mut manifest = Manifest::new();
3739        let mut a = entry("a.flac", AudioFormat::Flac);
3740        a.lyrics_txt = Some(ArtifactState {
3741            path: "x.lyrics.txt".to_owned(),
3742            hash: "ah".to_owned(),
3743        });
3744        manifest.insert("a", a);
3745        let mut b = entry("b.flac", AudioFormat::Flac);
3746        b.lyrics_txt = Some(ArtifactState {
3747            path: "y.lyrics.txt".to_owned(),
3748            hash: "bh".to_owned(),
3749        });
3750        manifest.insert("b", b);
3751        let fs = MemFs::new()
3752            .with_file("a.flac", b"A".to_vec())
3753            .with_file("b.flac", b"B".to_vec())
3754            .with_file("x.lyrics.txt", b"A words\n".to_vec())
3755            .with_file("y.lyrics.txt", b"B words\n".to_vec());
3756        // A moves its sidecar x -> y; B moves its sidecar y -> x (the swap).
3757        let plan = Plan {
3758            actions: vec![
3759                Action::WriteArtifact {
3760                    kind: ArtifactKind::LyricsTxt,
3761                    path: "y.lyrics.txt".to_owned(),
3762                    source_url: String::new(),
3763                    hash: "ah".to_owned(),
3764                    owner_id: "a".to_owned(),
3765                    content: Some("A words\n".to_owned()),
3766                },
3767                Action::WriteArtifact {
3768                    kind: ArtifactKind::LyricsTxt,
3769                    path: "x.lyrics.txt".to_owned(),
3770                    source_url: String::new(),
3771                    hash: "bh".to_owned(),
3772                    owner_id: "b".to_owned(),
3773                    content: Some("B words\n".to_owned()),
3774                },
3775            ],
3776        };
3777
3778        let outcome = run(
3779            &plan,
3780            &mut manifest,
3781            &[],
3782            &ScriptedHttp::new(),
3783            &fs,
3784            &StubFfmpeg::flac(),
3785            &RecordingClock::new(),
3786            &ExecOptions::default(),
3787        );
3788
3789        assert_eq!(outcome.failed(), 0);
3790        // Both freshly written files survive; neither cleanup clobbered the other.
3791        assert_eq!(fs.read_file("y.lyrics.txt").unwrap(), b"A words\n");
3792        assert_eq!(fs.read_file("x.lyrics.txt").unwrap(), b"B words\n");
3793        assert_eq!(
3794            manifest.get("a").unwrap().lyrics_txt.as_ref().unwrap().path,
3795            "y.lyrics.txt"
3796        );
3797        assert_eq!(
3798            manifest.get("b").unwrap().lyrics_txt.as_ref().unwrap().path,
3799            "x.lyrics.txt"
3800        );
3801    }
3802
3803    #[test]
3804    fn old_sidecar_kept_when_another_clip_still_references_it() {
3805        // A prior failed swap can leave two clips pointing at one path (A -> y and
3806        // B -> y). When B now moves y -> x, its cleanup must not delete y, which is
3807        // still A's live file (#76). tracked_paths counts two references to y, so
3808        // the removal is skipped even though y is not a write target this run.
3809        let mut manifest = Manifest::new();
3810        let mut a = entry("a.flac", AudioFormat::Flac);
3811        a.lyrics_txt = Some(ArtifactState {
3812            path: "y.lyrics.txt".to_owned(),
3813            hash: "ah".to_owned(),
3814        });
3815        manifest.insert("a", a);
3816        let mut b = entry("b.flac", AudioFormat::Flac);
3817        b.lyrics_txt = Some(ArtifactState {
3818            path: "y.lyrics.txt".to_owned(),
3819            hash: "bh".to_owned(),
3820        });
3821        manifest.insert("b", b);
3822        let fs = MemFs::new()
3823            .with_file("a.flac", b"A".to_vec())
3824            .with_file("b.flac", b"B".to_vec())
3825            .with_file("y.lyrics.txt", b"A words\n".to_vec());
3826        // Only B moves this run: y -> x. A is stable, so y is not a write target;
3827        // the tracked-reference count is what protects A's file.
3828        let plan = Plan {
3829            actions: vec![Action::WriteArtifact {
3830                kind: ArtifactKind::LyricsTxt,
3831                path: "x.lyrics.txt".to_owned(),
3832                source_url: String::new(),
3833                hash: "bh".to_owned(),
3834                owner_id: "b".to_owned(),
3835                content: Some("B words\n".to_owned()),
3836            }],
3837        };
3838
3839        let outcome = run(
3840            &plan,
3841            &mut manifest,
3842            &[],
3843            &ScriptedHttp::new(),
3844            &fs,
3845            &StubFfmpeg::flac(),
3846            &RecordingClock::new(),
3847            &ExecOptions::default(),
3848        );
3849
3850        assert_eq!(outcome.failed(), 0);
3851        assert!(
3852            fs.exists("y.lyrics.txt"),
3853            "A's live sidecar must not be deleted"
3854        );
3855        assert_eq!(fs.read_file("x.lyrics.txt").unwrap(), b"B words\n");
3856    }
3857
3858    #[test]
3859    fn shared_old_path_is_reclaimed_when_every_referencing_clip_moves_away() {
3860        // Two clips share one path (A -> s and B -> s, from a prior failed swap).
3861        // When BOTH move away this run, the path is no longer live, so the last
3862        // mover must reclaim it: it is neither kept as an orphan nor deleted while
3863        // still referenced. The dynamic reference count drops to zero only after
3864        // both moves, so exactly the final cleanup removes it (#76).
3865        let mut manifest = Manifest::new();
3866        let mut a = entry("a.flac", AudioFormat::Flac);
3867        a.lyrics_txt = Some(ArtifactState {
3868            path: "s.lyrics.txt".to_owned(),
3869            hash: "ah".to_owned(),
3870        });
3871        manifest.insert("a", a);
3872        let mut b = entry("b.flac", AudioFormat::Flac);
3873        b.lyrics_txt = Some(ArtifactState {
3874            path: "s.lyrics.txt".to_owned(),
3875            hash: "bh".to_owned(),
3876        });
3877        manifest.insert("b", b);
3878        let fs = MemFs::new()
3879            .with_file("a.flac", b"A".to_vec())
3880            .with_file("b.flac", b"B".to_vec())
3881            .with_file("s.lyrics.txt", b"shared\n".to_vec());
3882        let plan = Plan {
3883            actions: vec![
3884                Action::WriteArtifact {
3885                    kind: ArtifactKind::LyricsTxt,
3886                    path: "pa.lyrics.txt".to_owned(),
3887                    source_url: String::new(),
3888                    hash: "ah".to_owned(),
3889                    owner_id: "a".to_owned(),
3890                    content: Some("A words\n".to_owned()),
3891                },
3892                Action::WriteArtifact {
3893                    kind: ArtifactKind::LyricsTxt,
3894                    path: "pb.lyrics.txt".to_owned(),
3895                    source_url: String::new(),
3896                    hash: "bh".to_owned(),
3897                    owner_id: "b".to_owned(),
3898                    content: Some("B words\n".to_owned()),
3899                },
3900            ],
3901        };
3902
3903        let outcome = run(
3904            &plan,
3905            &mut manifest,
3906            &[],
3907            &ScriptedHttp::new(),
3908            &fs,
3909            &StubFfmpeg::flac(),
3910            &RecordingClock::new(),
3911            &ExecOptions::default(),
3912        );
3913
3914        assert_eq!(outcome.failed(), 0);
3915        assert_eq!(fs.read_file("pa.lyrics.txt").unwrap(), b"A words\n");
3916        assert_eq!(fs.read_file("pb.lyrics.txt").unwrap(), b"B words\n");
3917        assert!(
3918            !fs.exists("s.lyrics.txt"),
3919            "the vacated shared path must be reclaimed, not orphaned"
3920        );
3921    }
3922
3923    #[test]
3924    fn write_text_sidecar_skipped_when_owner_audio_absent() {
3925        // A text sidecar for a clip with no manifest entry (its audio download
3926        // failed) must be skipped, never writing an untracked file.
3927        let plan = Plan {
3928            actions: vec![Action::WriteArtifact {
3929                kind: ArtifactKind::DetailsTxt,
3930                path: "gone.details.txt".to_owned(),
3931                source_url: String::new(),
3932                hash: "dh".to_owned(),
3933                owner_id: "gone".to_owned(),
3934                content: Some("Title: Gone\n".to_owned()),
3935            }],
3936        };
3937        let fs = MemFs::new();
3938        let mut manifest = Manifest::new();
3939
3940        let outcome = run(
3941            &plan,
3942            &mut manifest,
3943            &[],
3944            &ScriptedHttp::new(),
3945            &fs,
3946            &StubFfmpeg::flac(),
3947            &RecordingClock::new(),
3948            &ExecOptions::default(),
3949        );
3950
3951        assert_eq!(outcome.artifacts_written, 0);
3952        assert_eq!(outcome.skipped, 1);
3953        assert!(!fs.exists("gone.details.txt"));
3954        assert!(manifest.get("gone").is_none());
3955    }
3956
3957    #[test]
3958    fn delete_artifact_removes_file_and_clears_slot() {
3959        let fs = MemFs::new().with_file("a/cover.jpg", b"jpg".to_vec());
3960        let mut manifest = Manifest::new();
3961        let mut e = entry("a.mp3", AudioFormat::Mp3);
3962        e.cover_jpg = Some(ArtifactState {
3963            path: "a/cover.jpg".to_owned(),
3964            hash: "h1".to_owned(),
3965        });
3966        manifest.insert("a", e);
3967        let plan = Plan {
3968            actions: vec![Action::DeleteArtifact {
3969                kind: ArtifactKind::CoverJpg,
3970                path: "a/cover.jpg".to_owned(),
3971                owner_id: "a".to_owned(),
3972            }],
3973        };
3974
3975        let outcome = run(
3976            &plan,
3977            &mut manifest,
3978            &[],
3979            &ScriptedHttp::new(),
3980            &fs,
3981            &StubFfmpeg::flac(),
3982            &RecordingClock::new(),
3983            &ExecOptions::default(),
3984        );
3985
3986        assert_eq!(outcome.artifacts_deleted, 1);
3987        assert!(!fs.exists("a/cover.jpg"));
3988        assert_eq!(manifest.get("a").unwrap().cover_jpg, None);
3989    }
3990
3991    #[test]
3992    fn delete_artifact_tolerates_already_absent_file() {
3993        // `remove` is idempotent, so co-deleting a sidecar that is already gone
3994        // is not a failure.
3995        let mut manifest = Manifest::new();
3996        let mut e = entry("a.mp3", AudioFormat::Mp3);
3997        e.cover_jpg = Some(ArtifactState {
3998            path: "a/cover.jpg".to_owned(),
3999            hash: "h1".to_owned(),
4000        });
4001        manifest.insert("a", e);
4002        let plan = Plan {
4003            actions: vec![Action::DeleteArtifact {
4004                kind: ArtifactKind::CoverJpg,
4005                path: "a/cover.jpg".to_owned(),
4006                owner_id: "a".to_owned(),
4007            }],
4008        };
4009
4010        let outcome = run(
4011            &plan,
4012            &mut manifest,
4013            &[],
4014            &ScriptedHttp::new(),
4015            &MemFs::new(),
4016            &StubFfmpeg::flac(),
4017            &RecordingClock::new(),
4018            &ExecOptions::default(),
4019        );
4020
4021        assert_eq!(outcome.artifacts_deleted, 1);
4022        assert_eq!(outcome.failed(), 0);
4023        assert_eq!(manifest.get("a").unwrap().cover_jpg, None);
4024    }
4025
4026    #[test]
4027    fn write_artifact_http_failure_is_a_per_clip_failure_not_a_run_abort() {
4028        // A permanent 404 on one sidecar fetch is recorded as a per-clip failure;
4029        // the run continues and the following WriteArtifact still succeeds.
4030        let mut manifest = Manifest::new();
4031        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
4032        manifest.insert("b", entry("b.mp3", AudioFormat::Mp3));
4033        let plan = Plan {
4034            actions: vec![
4035                Action::WriteArtifact {
4036                    kind: ArtifactKind::CoverJpg,
4037                    path: "a/cover.jpg".to_owned(),
4038                    source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
4039                    hash: "h1".to_owned(),
4040                    owner_id: "a".to_owned(),
4041                    content: None,
4042                },
4043                Action::WriteArtifact {
4044                    kind: ArtifactKind::CoverJpg,
4045                    path: "b/cover.jpg".to_owned(),
4046                    source_url: "https://art.suno.ai/b/large.jpg".to_owned(),
4047                    hash: "h2".to_owned(),
4048                    owner_id: "b".to_owned(),
4049                    content: None,
4050                },
4051            ],
4052        };
4053        let http = ScriptedHttp::new()
4054            .route("a/large.jpg", Reply::status(404))
4055            .route("b/large.jpg", Reply::ok(b"jpg-b".to_vec()));
4056        let fs = MemFs::new();
4057
4058        let outcome = run(
4059            &plan,
4060            &mut manifest,
4061            &[],
4062            &http,
4063            &fs,
4064            &StubFfmpeg::flac(),
4065            &RecordingClock::new(),
4066            &ExecOptions::default(),
4067        );
4068
4069        assert_eq!(outcome.status, RunStatus::Completed);
4070        assert_eq!(outcome.failed(), 1);
4071        assert_eq!(outcome.failures[0].clip_id, "a");
4072        assert_eq!(outcome.artifacts_written, 1);
4073        // The failed sidecar left no file and no manifest record.
4074        assert!(!fs.exists("a/cover.jpg"));
4075        assert_eq!(manifest.get("a").unwrap().cover_jpg, None);
4076        // The following sidecar was written and recorded.
4077        assert_eq!(fs.read_file("b/cover.jpg").unwrap(), b"jpg-b");
4078        assert!(manifest.get("b").unwrap().cover_jpg.is_some());
4079    }
4080
4081    #[test]
4082    fn stranded_old_sidecar_removed_when_colliding_writer_fails() {
4083        // #142: clip A moves its cover shared -> a/cover.jpg (fetch succeeds);
4084        // clip B is planned to write the vacated `shared` path but its fetch
4085        // fails. The old-path cleanup is gated on COMMITTED writes, not planned
4086        // ones, so B's failed write no longer protects the stale file: A's old
4087        // `shared` copy is removed rather than left as an untracked orphan.
4088        let mut manifest = Manifest::new();
4089        let mut a = entry("a.mp3", AudioFormat::Mp3);
4090        a.cover_jpg = Some(ArtifactState {
4091            path: "shared/cover.jpg".to_owned(),
4092            hash: "ha".to_owned(),
4093        });
4094        manifest.insert("a", a);
4095        manifest.insert("b", entry("b.mp3", AudioFormat::Mp3));
4096        let fs = MemFs::new().with_file("shared/cover.jpg", b"old-shared".to_vec());
4097        let plan = Plan {
4098            actions: vec![
4099                Action::WriteArtifact {
4100                    kind: ArtifactKind::CoverJpg,
4101                    path: "a/cover.jpg".to_owned(),
4102                    source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
4103                    hash: "ha".to_owned(),
4104                    owner_id: "a".to_owned(),
4105                    content: None,
4106                },
4107                Action::WriteArtifact {
4108                    kind: ArtifactKind::CoverJpg,
4109                    path: "shared/cover.jpg".to_owned(),
4110                    source_url: "https://art.suno.ai/b/large.jpg".to_owned(),
4111                    hash: "hb".to_owned(),
4112                    owner_id: "b".to_owned(),
4113                    content: None,
4114                },
4115            ],
4116        };
4117        let http = ScriptedHttp::new()
4118            .route("a/large.jpg", Reply::ok(b"jpg-a".to_vec()))
4119            .route("b/large.jpg", Reply::status(404));
4120
4121        let outcome = run(
4122            &plan,
4123            &mut manifest,
4124            &[],
4125            &http,
4126            &fs,
4127            &StubFfmpeg::flac(),
4128            &RecordingClock::new(),
4129            &ExecOptions::default(),
4130        );
4131
4132        assert_eq!(outcome.failed(), 1);
4133        assert_eq!(outcome.failures[0].clip_id, "b");
4134        // A's move committed; the vacated file is gone, not an orphan.
4135        assert_eq!(fs.read_file("a/cover.jpg").unwrap(), b"jpg-a");
4136        assert!(
4137            !fs.exists("shared/cover.jpg"),
4138            "the vacated file must be removed once the colliding writer failed"
4139        );
4140        assert_eq!(
4141            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
4142            "a/cover.jpg"
4143        );
4144    }
4145
4146    #[test]
4147    fn committed_write_at_old_path_is_preserved() {
4148        // #142: clip B writes `shared` and commits BEFORE clip A vacates it
4149        // (A moves shared -> a/cover.jpg). A's cleanup sees `shared` in the
4150        // committed set and keeps B's freshly written file rather than deleting
4151        // it. This is the successful-collision case the guard must still protect.
4152        let mut manifest = Manifest::new();
4153        let mut a = entry("a.mp3", AudioFormat::Mp3);
4154        a.cover_jpg = Some(ArtifactState {
4155            path: "shared/cover.jpg".to_owned(),
4156            hash: "ha".to_owned(),
4157        });
4158        manifest.insert("a", a);
4159        manifest.insert("b", entry("b.mp3", AudioFormat::Mp3));
4160        let fs = MemFs::new().with_file("shared/cover.jpg", b"old-shared".to_vec());
4161        let plan = Plan {
4162            actions: vec![
4163                Action::WriteArtifact {
4164                    kind: ArtifactKind::CoverJpg,
4165                    path: "shared/cover.jpg".to_owned(),
4166                    source_url: "https://art.suno.ai/b/large.jpg".to_owned(),
4167                    hash: "hb".to_owned(),
4168                    owner_id: "b".to_owned(),
4169                    content: None,
4170                },
4171                Action::WriteArtifact {
4172                    kind: ArtifactKind::CoverJpg,
4173                    path: "a/cover.jpg".to_owned(),
4174                    source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
4175                    hash: "ha".to_owned(),
4176                    owner_id: "a".to_owned(),
4177                    content: None,
4178                },
4179            ],
4180        };
4181        let http = ScriptedHttp::new()
4182            .route("b/large.jpg", Reply::ok(b"jpg-b".to_vec()))
4183            .route("a/large.jpg", Reply::ok(b"jpg-a".to_vec()));
4184
4185        let outcome = run(
4186            &plan,
4187            &mut manifest,
4188            &[],
4189            &http,
4190            &fs,
4191            &StubFfmpeg::flac(),
4192            &RecordingClock::new(),
4193            &ExecOptions::default(),
4194        );
4195
4196        assert_eq!(outcome.failed(), 0);
4197        // B's committed write survives A's subsequent move; both files are present.
4198        assert_eq!(fs.read_file("shared/cover.jpg").unwrap(), b"jpg-b");
4199        assert_eq!(fs.read_file("a/cover.jpg").unwrap(), b"jpg-a");
4200        assert_eq!(
4201            manifest.get("b").unwrap().cover_jpg.as_ref().unwrap().path,
4202            "shared/cover.jpg"
4203        );
4204        assert_eq!(
4205            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
4206            "a/cover.jpg"
4207        );
4208    }
4209
4210    #[test]
4211    fn cover_move_renames_without_fetching() {
4212        // #141: a MoveArtifact relocates the cover with a local rename. The
4213        // ScriptedHttp has no route, so any fetch would fail the run; a clean
4214        // outcome proves the bytes were renamed, not re-downloaded.
4215        let mut manifest = Manifest::new();
4216        let mut e = entry("a.mp3", AudioFormat::Mp3);
4217        e.cover_jpg = Some(ArtifactState {
4218            path: "old/cover.jpg".to_owned(),
4219            hash: "h".to_owned(),
4220        });
4221        manifest.insert("a", e);
4222        let fs = MemFs::new().with_file("old/cover.jpg", b"JPGBYTES".to_vec());
4223        let plan = Plan {
4224            actions: vec![Action::MoveArtifact {
4225                kind: ArtifactKind::CoverJpg,
4226                from: "old/cover.jpg".to_owned(),
4227                to: "new/cover.jpg".to_owned(),
4228                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
4229                hash: "h".to_owned(),
4230                owner_id: "a".to_owned(),
4231            }],
4232        };
4233
4234        let outcome = run(
4235            &plan,
4236            &mut manifest,
4237            &[],
4238            &ScriptedHttp::new(),
4239            &fs,
4240            &StubFfmpeg::flac(),
4241            &RecordingClock::new(),
4242            &ExecOptions::default(),
4243        );
4244
4245        assert_eq!(outcome.failed(), 0);
4246        assert_eq!(outcome.renamed, 1, "counted as a rename, not a write");
4247        // Renamed in place: the new path carries the ORIGINAL bytes, old is gone.
4248        assert_eq!(fs.read_file("new/cover.jpg").unwrap(), b"JPGBYTES");
4249        assert!(!fs.exists("old/cover.jpg"));
4250        assert_eq!(
4251            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
4252            "new/cover.jpg"
4253        );
4254    }
4255
4256    #[test]
4257    fn cover_move_falls_back_to_fetch_when_old_file_missing() {
4258        // #141: the old file vanished before commit, so the rename fails and the
4259        // executor fetches fresh bytes at the new path rather than failing.
4260        let mut manifest = Manifest::new();
4261        let mut e = entry("a.mp3", AudioFormat::Mp3);
4262        e.cover_jpg = Some(ArtifactState {
4263            path: "old/cover.jpg".to_owned(),
4264            hash: "h".to_owned(),
4265        });
4266        manifest.insert("a", e);
4267        let fs = MemFs::new(); // old/cover.jpg is absent.
4268        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"FETCHED".to_vec()));
4269        let plan = Plan {
4270            actions: vec![Action::MoveArtifact {
4271                kind: ArtifactKind::CoverJpg,
4272                from: "old/cover.jpg".to_owned(),
4273                to: "new/cover.jpg".to_owned(),
4274                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
4275                hash: "h".to_owned(),
4276                owner_id: "a".to_owned(),
4277            }],
4278        };
4279
4280        let outcome = run(
4281            &plan,
4282            &mut manifest,
4283            &[],
4284            &http,
4285            &fs,
4286            &StubFfmpeg::flac(),
4287            &RecordingClock::new(),
4288            &ExecOptions::default(),
4289        );
4290
4291        assert_eq!(outcome.failed(), 0);
4292        assert_eq!(fs.read_file("new/cover.jpg").unwrap(), b"FETCHED");
4293        assert_eq!(
4294            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
4295            "new/cover.jpg"
4296        );
4297    }
4298
4299    #[test]
4300    fn cover_move_falls_back_when_source_co_referenced() {
4301        // Two clips' covers share old/cover.jpg after a prior failed swap. A move
4302        // for `a` must NOT rename the shared file away (that would strand `b`); it
4303        // falls back to a fetch, and `b`'s file survives.
4304        let mut manifest = Manifest::new();
4305        let mut a = entry("a.mp3", AudioFormat::Mp3);
4306        a.cover_jpg = Some(ArtifactState {
4307            path: "old/cover.jpg".to_owned(),
4308            hash: "h".to_owned(),
4309        });
4310        manifest.insert("a", a);
4311        let mut b = entry("b.mp3", AudioFormat::Mp3);
4312        b.cover_jpg = Some(ArtifactState {
4313            path: "old/cover.jpg".to_owned(),
4314            hash: "h".to_owned(),
4315        });
4316        manifest.insert("b", b);
4317        let fs = MemFs::new().with_file("old/cover.jpg", b"SHARED".to_vec());
4318        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"FETCHED-A".to_vec()));
4319        // Only `a` moves this run: old/cover.jpg -> a/cover.jpg.
4320        let plan = Plan {
4321            actions: vec![Action::MoveArtifact {
4322                kind: ArtifactKind::CoverJpg,
4323                from: "old/cover.jpg".to_owned(),
4324                to: "a/cover.jpg".to_owned(),
4325                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
4326                hash: "h".to_owned(),
4327                owner_id: "a".to_owned(),
4328            }],
4329        };
4330
4331        let outcome = run(
4332            &plan,
4333            &mut manifest,
4334            &[],
4335            &http,
4336            &fs,
4337            &StubFfmpeg::flac(),
4338            &RecordingClock::new(),
4339            &ExecOptions::default(),
4340        );
4341
4342        assert_eq!(outcome.failed(), 0);
4343        // `a` got a fresh fetched copy; `b`'s shared file is untouched.
4344        assert_eq!(fs.read_file("a/cover.jpg").unwrap(), b"FETCHED-A");
4345        assert_eq!(
4346            fs.read_file("old/cover.jpg").unwrap(),
4347            b"SHARED",
4348            "the co-referenced file must survive"
4349        );
4350    }
4351
4352    #[test]
4353    fn stem_move_renames_without_refetch() {
4354        // #141: a MoveStem relocates the raw stem with a rename; no route is set,
4355        // so a clean outcome proves it did not re-render or re-fetch.
4356        let mut manifest = Manifest::new();
4357        let mut e = entry("a.flac", AudioFormat::Flac);
4358        e.stems.insert(
4359            "voc".to_owned(),
4360            ArtifactState {
4361                path: "old.stems/voc.mp3".to_owned(),
4362                hash: "h1".to_owned(),
4363            },
4364        );
4365        manifest.insert("a", e);
4366        let fs = MemFs::new().with_file("old.stems/voc.mp3", b"STEMBYTES".to_vec());
4367        let plan = Plan {
4368            actions: vec![Action::MoveStem {
4369                clip_id: "a".to_owned(),
4370                key: "voc".to_owned(),
4371                stem_id: "voc".to_owned(),
4372                from: "old.stems/voc.mp3".to_owned(),
4373                to: "new.stems/voc.mp3".to_owned(),
4374                source_url: "https://cdn1.suno.ai/voc.mp3".to_owned(),
4375                format: StemFormat::Mp3,
4376                hash: "h1".to_owned(),
4377            }],
4378        };
4379
4380        let outcome = run(
4381            &plan,
4382            &mut manifest,
4383            &[],
4384            &ScriptedHttp::new(),
4385            &fs,
4386            &StubFfmpeg::flac(),
4387            &RecordingClock::new(),
4388            &ExecOptions::default(),
4389        );
4390
4391        assert_eq!(outcome.failed(), 0);
4392        assert_eq!(outcome.renamed, 1);
4393        assert_eq!(fs.read_file("new.stems/voc.mp3").unwrap(), b"STEMBYTES");
4394        assert!(!fs.exists("old.stems/voc.mp3"));
4395        assert_eq!(
4396            manifest.get("a").unwrap().stems.get("voc").unwrap().path,
4397            "new.stems/voc.mp3"
4398        );
4399    }
4400
4401    #[test]
4402    fn stem_move_falls_back_to_fetch_when_source_co_referenced() {
4403        // Two clips' stems share shared.stems/voc.mp3 after a partially-failed
4404        // swap (the file holds `a`'s bytes). When `b` moves it, move_stem must NOT
4405        // rename the shared file under `b`'s hash (that records `a`'s bytes as
4406        // `b`'s); it falls back to a fetch of `b`'s correct bytes.
4407        let mut manifest = Manifest::new();
4408        let mut a = entry("a.flac", AudioFormat::Flac);
4409        a.stems.insert(
4410            "voc".to_owned(),
4411            ArtifactState {
4412                path: "shared.stems/voc.mp3".to_owned(),
4413                hash: "h".to_owned(),
4414            },
4415        );
4416        manifest.insert("a", a);
4417        let mut b = entry("b.flac", AudioFormat::Flac);
4418        b.stems.insert(
4419            "voc".to_owned(),
4420            ArtifactState {
4421                path: "shared.stems/voc.mp3".to_owned(),
4422                hash: "h".to_owned(),
4423            },
4424        );
4425        manifest.insert("b", b);
4426        let fs = MemFs::new().with_file("shared.stems/voc.mp3", b"A-STEM".to_vec());
4427        let http = ScriptedHttp::new().route("bvoc.mp3", Reply::ok(b"B-STEM".to_vec()));
4428        let plan = Plan {
4429            actions: vec![Action::MoveStem {
4430                clip_id: "b".to_owned(),
4431                key: "voc".to_owned(),
4432                stem_id: "bvoc".to_owned(),
4433                from: "shared.stems/voc.mp3".to_owned(),
4434                to: "b.stems/voc.mp3".to_owned(),
4435                source_url: "https://cdn1.suno.ai/bvoc.mp3".to_owned(),
4436                format: StemFormat::Mp3,
4437                hash: "h".to_owned(),
4438            }],
4439        };
4440
4441        let outcome = run(
4442            &plan,
4443            &mut manifest,
4444            &[],
4445            &http,
4446            &fs,
4447            &StubFfmpeg::flac(),
4448            &RecordingClock::new(),
4449            &ExecOptions::default(),
4450        );
4451
4452        assert_eq!(outcome.failed(), 0);
4453        // b's new stem carries b's freshly fetched bytes, never a's renamed bytes.
4454        assert_eq!(fs.read_file("b.stems/voc.mp3").unwrap(), b"B-STEM");
4455        assert_eq!(
4456            fs.read_file("shared.stems/voc.mp3").unwrap(),
4457            b"A-STEM",
4458            "the co-referenced stem must survive"
4459        );
4460    }
4461
4462    #[test]
4463    fn write_stem_keeps_shared_stem_when_co_referenced() {
4464        // Two clips share shared.stems/voc.mp3 after a prior partially-failed swap.
4465        // When `b` writes to a new path, write_stem must NOT remove the shared file;
4466        // clip `a` still references it and its stem must survive.
4467        let mut manifest = Manifest::new();
4468        let mut a = entry("a.flac", AudioFormat::Flac);
4469        a.stems.insert(
4470            "voc".to_owned(),
4471            ArtifactState {
4472                path: "shared.stems/voc.mp3".to_owned(),
4473                hash: "h".to_owned(),
4474            },
4475        );
4476        manifest.insert("a", a);
4477        let mut b = entry("b.flac", AudioFormat::Flac);
4478        b.stems.insert(
4479            "voc".to_owned(),
4480            ArtifactState {
4481                path: "shared.stems/voc.mp3".to_owned(),
4482                hash: "h".to_owned(),
4483            },
4484        );
4485        manifest.insert("b", b);
4486        let fs = MemFs::new().with_file("shared.stems/voc.mp3", b"A-STEM".to_vec());
4487        let http = ScriptedHttp::new().route("bvoc.mp3", Reply::ok(b"B-STEM".to_vec()));
4488        let plan = Plan {
4489            actions: vec![Action::WriteStem {
4490                clip_id: "b".to_owned(),
4491                key: "voc".to_owned(),
4492                stem_id: "bvoc".to_owned(),
4493                path: "b.stems/voc.mp3".to_owned(),
4494                source_url: "https://cdn1.suno.ai/bvoc.mp3".to_owned(),
4495                format: StemFormat::Mp3,
4496                hash: "bh".to_owned(),
4497            }],
4498        };
4499
4500        let outcome = run(
4501            &plan,
4502            &mut manifest,
4503            &[],
4504            &http,
4505            &fs,
4506            &StubFfmpeg::flac(),
4507            &RecordingClock::new(),
4508            &ExecOptions::default(),
4509        );
4510
4511        assert_eq!(outcome.failed(), 0);
4512        assert_eq!(fs.read_file("b.stems/voc.mp3").unwrap(), b"B-STEM");
4513        assert_eq!(
4514            fs.read_file("shared.stems/voc.mp3").unwrap(),
4515            b"A-STEM",
4516            "the co-referenced stem must survive"
4517        );
4518    }
4519
4520    #[test]
4521    fn co_delete_executes_audio_delete_then_artifact_delete() {
4522        // The plan orders the audio Delete before its sidecar DeleteArtifact.
4523        // The audio delete removes the manifest entry; the sidecar delete then
4524        // removes the file and tolerates the now-absent entry.
4525        let fs = MemFs::new()
4526            .with_file("gone.mp3", b"DATA".to_vec())
4527            .with_file("gone/cover.jpg", b"jpg".to_vec());
4528        let mut manifest = Manifest::new();
4529        let mut e = entry("gone.mp3", AudioFormat::Mp3);
4530        e.cover_jpg = Some(ArtifactState {
4531            path: "gone/cover.jpg".to_owned(),
4532            hash: "h1".to_owned(),
4533        });
4534        manifest.insert("gone", e);
4535        let plan = Plan {
4536            actions: vec![
4537                Action::Delete {
4538                    path: "gone.mp3".to_owned(),
4539                    clip_id: "gone".to_owned(),
4540                },
4541                Action::DeleteArtifact {
4542                    kind: ArtifactKind::CoverJpg,
4543                    path: "gone/cover.jpg".to_owned(),
4544                    owner_id: "gone".to_owned(),
4545                },
4546            ],
4547        };
4548
4549        let outcome = run(
4550            &plan,
4551            &mut manifest,
4552            &[],
4553            &ScriptedHttp::new(),
4554            &fs,
4555            &StubFfmpeg::flac(),
4556            &RecordingClock::new(),
4557            &ExecOptions::default(),
4558        );
4559
4560        assert_eq!(outcome.deleted, 1);
4561        assert_eq!(outcome.artifacts_deleted, 1);
4562        assert_eq!(outcome.failed(), 0);
4563        assert!(!fs.exists("gone.mp3"));
4564        assert!(!fs.exists("gone/cover.jpg"));
4565        assert!(manifest.get("gone").is_none());
4566    }
4567
4568    #[test]
4569    fn write_stem_mp3_stores_raw_and_records_slot() {
4570        // An MP3 stem is downloaded straight from its CDN url and stored verbatim
4571        // (no transcode, no WAV render): the bytes land at the `.mp3` path and the
4572        // keyed slot records the path and hash.
4573        let mut manifest = Manifest::new();
4574        manifest.insert("a", entry("a.flac", AudioFormat::Flac));
4575        let plan = Plan {
4576            actions: vec![Action::WriteStem {
4577                clip_id: "a".to_owned(),
4578                key: "voc".to_owned(),
4579                stem_id: "voc".to_owned(),
4580                path: "a.stems/a - Vocals [voc].mp3".to_owned(),
4581                source_url: "https://cdn1.suno.ai/voc.mp3".to_owned(),
4582                format: StemFormat::Mp3,
4583                hash: "vh".to_owned(),
4584            }],
4585        };
4586        let http = ScriptedHttp::new().route("voc.mp3", Reply::ok(b"stem-bytes".to_vec()));
4587        let fs = MemFs::new();
4588
4589        let outcome = run(
4590            &plan,
4591            &mut manifest,
4592            &[],
4593            &http,
4594            &fs,
4595            &StubFfmpeg::flac(),
4596            &RecordingClock::new(),
4597            &ExecOptions::default(),
4598        );
4599
4600        assert_eq!(outcome.artifacts_written, 1);
4601        assert_eq!(outcome.failed(), 0);
4602        // Bytes are stored exactly as delivered (no transcode applied).
4603        assert_eq!(
4604            fs.read_file("a.stems/a - Vocals [voc].mp3").unwrap(),
4605            b"stem-bytes"
4606        );
4607        // An MP3 stem never renders WAV: no convert_wav, no generation.
4608        assert_eq!(http.count("convert_wav"), 0);
4609        assert_eq!(http.count("/api/gen/"), 0);
4610        assert_eq!(
4611            manifest.get("a").unwrap().stems.get("voc"),
4612            Some(&ArtifactState {
4613                path: "a.stems/a - Vocals [voc].mp3".to_owned(),
4614                hash: "vh".to_owned(),
4615            })
4616        );
4617    }
4618
4619    #[test]
4620    fn write_stem_wav_renders_via_convert_wav_and_stores_raw() {
4621        // A WAV stem (the default) renders the stem clip's lossless WAV through the
4622        // free convert_wav flow keyed on the stem id, then downloads and stores it
4623        // RAW as `.wav` — it is NEVER transcoded to FLAC, even for a FLAC song.
4624        let mut manifest = Manifest::new();
4625        manifest.insert("a", entry("a.flac", AudioFormat::Flac));
4626        let plan = Plan {
4627            actions: vec![Action::WriteStem {
4628                clip_id: "a".to_owned(),
4629                key: "voc".to_owned(),
4630                stem_id: "stemvoc".to_owned(),
4631                path: "a.stems/a - Vocals [stemvoc].wav".to_owned(),
4632                source_url: "https://cdn1.suno.ai/stemvoc.mp3".to_owned(),
4633                format: StemFormat::Wav,
4634                hash: "vh".to_owned(),
4635            }],
4636        };
4637        // wav_file is not ready on the first poll, so the flow POSTs convert_wav
4638        // (free) and polls again — exactly the main FLAC/WAV render path.
4639        let http = ScriptedHttp::new()
4640            .with_auth()
4641            .route_seq(
4642                "stemvoc/wav_file/",
4643                vec![
4644                    Reply::json("{}"),
4645                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/stemvoc.wav"}"#),
4646                ],
4647            )
4648            .route("stemvoc/convert_wav/", Reply::status(200))
4649            .route("stemvoc.wav", Reply::ok(b"RIFFwav-bytes".to_vec()));
4650        let fs = MemFs::new();
4651
4652        let outcome = run(
4653            &plan,
4654            &mut manifest,
4655            &[],
4656            &http,
4657            &fs,
4658            &StubFfmpeg::flac(),
4659            &RecordingClock::new(),
4660            &small_poll(),
4661        );
4662
4663        assert_eq!(outcome.artifacts_written, 1);
4664        assert_eq!(outcome.failed(), 0);
4665        // The rendered WAV is stored verbatim; ffmpeg (WAV->FLAC) is never invoked,
4666        // so the stored bytes are the raw WAV, not a FLAC transcode.
4667        assert_eq!(
4668            fs.read_file("a.stems/a - Vocals [stemvoc].wav").unwrap(),
4669            b"RIFFwav-bytes"
4670        );
4671        assert!(!fs.exists("a.stems/a - Vocals [stemvoc].flac"));
4672        // The free WAV render ran; no credit-spending generation endpoint did.
4673        assert_eq!(http.count("convert_wav"), 1);
4674        assert_eq!(http.count("stem_task"), 0);
4675        assert_eq!(http.count("separate"), 0);
4676        assert_eq!(
4677            manifest.get("a").unwrap().stems.get("voc").unwrap().path,
4678            "a.stems/a - Vocals [stemvoc].wav"
4679        );
4680    }
4681
4682    #[test]
4683    fn write_stem_is_skipped_when_owner_audio_is_absent() {
4684        // No owning manifest entry (audio failed or never existed) => skip with
4685        // no fetch and no write, so a stem is never stranded without its song.
4686        let mut manifest = Manifest::new();
4687        let plan = Plan {
4688            actions: vec![Action::WriteStem {
4689                clip_id: "ghost".to_owned(),
4690                key: "voc".to_owned(),
4691                stem_id: "voc".to_owned(),
4692                path: "ghost.stems/voc.mp3".to_owned(),
4693                source_url: "https://cdn1.suno.ai/voc.mp3".to_owned(),
4694                format: StemFormat::Mp3,
4695                hash: "vh".to_owned(),
4696            }],
4697        };
4698        // Empty HTTP script: any fetch would error, proving none happens.
4699        let http = ScriptedHttp::new();
4700        let fs = MemFs::new();
4701
4702        let outcome = run(
4703            &plan,
4704            &mut manifest,
4705            &[],
4706            &http,
4707            &fs,
4708            &StubFfmpeg::flac(),
4709            &RecordingClock::new(),
4710            &ExecOptions::default(),
4711        );
4712
4713        assert_eq!(outcome.skipped, 1);
4714        assert_eq!(outcome.artifacts_written, 0);
4715        assert_eq!(outcome.failed(), 0);
4716        assert!(!fs.exists("ghost.stems/voc.mp3"));
4717    }
4718
4719    #[test]
4720    fn write_stem_relocates_the_old_file_on_a_path_move() {
4721        // The song was renamed, so the stem moves: the new file is written and the
4722        // stale copy at the previously tracked path is removed (moved, not orphaned).
4723        let fs = MemFs::new().with_file("old.stems/voc.mp3", b"old".to_vec());
4724        let mut manifest = Manifest::new();
4725        let mut e = entry("new.flac", AudioFormat::Flac);
4726        e.stems.insert(
4727            "voc".to_owned(),
4728            ArtifactState {
4729                path: "old.stems/voc.mp3".to_owned(),
4730                hash: "vh".to_owned(),
4731            },
4732        );
4733        manifest.insert("a", e);
4734        let plan = Plan {
4735            actions: vec![Action::WriteStem {
4736                clip_id: "a".to_owned(),
4737                key: "voc".to_owned(),
4738                stem_id: "voc".to_owned(),
4739                path: "new.stems/voc.mp3".to_owned(),
4740                source_url: "https://cdn1.suno.ai/voc.mp3".to_owned(),
4741                format: StemFormat::Mp3,
4742                hash: "vh".to_owned(),
4743            }],
4744        };
4745        let http = ScriptedHttp::new().route("voc.mp3", Reply::ok(b"new".to_vec()));
4746
4747        let outcome = run(
4748            &plan,
4749            &mut manifest,
4750            &[],
4751            &http,
4752            &fs,
4753            &StubFfmpeg::flac(),
4754            &RecordingClock::new(),
4755            &ExecOptions::default(),
4756        );
4757
4758        assert_eq!(outcome.artifacts_written, 1);
4759        assert!(fs.exists("new.stems/voc.mp3"));
4760        assert!(
4761            !fs.exists("old.stems/voc.mp3"),
4762            "the old stem is moved, not left behind"
4763        );
4764        assert_eq!(
4765            manifest.get("a").unwrap().stems.get("voc").unwrap().path,
4766            "new.stems/voc.mp3"
4767        );
4768    }
4769
4770    #[test]
4771    fn delete_stem_removes_file_and_clears_slot() {
4772        let fs = MemFs::new().with_file("a.stems/voc.mp3", b"stem".to_vec());
4773        let mut manifest = Manifest::new();
4774        let mut e = entry("a.flac", AudioFormat::Flac);
4775        e.stems.insert(
4776            "voc".to_owned(),
4777            ArtifactState {
4778                path: "a.stems/voc.mp3".to_owned(),
4779                hash: "vh".to_owned(),
4780            },
4781        );
4782        manifest.insert("a", e);
4783        let plan = Plan {
4784            actions: vec![Action::DeleteStem {
4785                clip_id: "a".to_owned(),
4786                key: "voc".to_owned(),
4787                path: "a.stems/voc.mp3".to_owned(),
4788            }],
4789        };
4790
4791        let outcome = run(
4792            &plan,
4793            &mut manifest,
4794            &[],
4795            &ScriptedHttp::new(),
4796            &fs,
4797            &StubFfmpeg::flac(),
4798            &RecordingClock::new(),
4799            &ExecOptions::default(),
4800        );
4801
4802        assert_eq!(outcome.artifacts_deleted, 1);
4803        assert!(!fs.exists("a.stems/voc.mp3"));
4804        assert!(manifest.get("a").unwrap().stems.is_empty());
4805    }
4806
4807    #[test]
4808    fn co_deleting_the_last_stem_prunes_the_stems_folder() {
4809        // Deleting a song co-deletes its stems; the emptied `.stems` folder is
4810        // pruned by the end-of-run sweep, so it can never be orphaned.
4811        let fs = MemFs::new()
4812            .with_file("song.flac", b"DATA".to_vec())
4813            .with_file("song.stems/voc.mp3", b"stem".to_vec());
4814        assert!(fs.has_dir("song.stems"));
4815        let mut manifest = Manifest::new();
4816        let mut e = entry("song.flac", AudioFormat::Flac);
4817        e.stems.insert(
4818            "voc".to_owned(),
4819            ArtifactState {
4820                path: "song.stems/voc.mp3".to_owned(),
4821                hash: "vh".to_owned(),
4822            },
4823        );
4824        manifest.insert("a", e);
4825        let plan = Plan {
4826            actions: vec![
4827                Action::Delete {
4828                    path: "song.flac".to_owned(),
4829                    clip_id: "a".to_owned(),
4830                },
4831                Action::DeleteStem {
4832                    clip_id: "a".to_owned(),
4833                    key: "voc".to_owned(),
4834                    path: "song.stems/voc.mp3".to_owned(),
4835                },
4836            ],
4837        };
4838
4839        let outcome = run(
4840            &plan,
4841            &mut manifest,
4842            &[],
4843            &ScriptedHttp::new(),
4844            &fs,
4845            &StubFfmpeg::flac(),
4846            &RecordingClock::new(),
4847            &ExecOptions::default(),
4848        );
4849
4850        assert_eq!(outcome.deleted, 1);
4851        assert_eq!(outcome.artifacts_deleted, 1);
4852        assert!(!fs.exists("song.flac"));
4853        assert!(!fs.exists("song.stems/voc.mp3"));
4854        assert!(
4855            !fs.has_dir("song.stems"),
4856            "the emptied .stems folder is pruned"
4857        );
4858        assert!(manifest.get("a").is_none());
4859    }
4860
4861    #[test]
4862    fn write_stem_mp3_never_issues_a_generation_post() {
4863        // The MP3 stem path is GET-only: writing a stem fetches its CDN url and
4864        // never POSTs, let alone to any generation or WAV-render endpoint.
4865        let mut manifest = Manifest::new();
4866        manifest.insert("a", entry("a.flac", AudioFormat::Flac));
4867        let plan = Plan {
4868            actions: vec![Action::WriteStem {
4869                clip_id: "a".to_owned(),
4870                key: "voc".to_owned(),
4871                stem_id: "voc".to_owned(),
4872                path: "a.stems/voc.mp3".to_owned(),
4873                source_url: "https://cdn1.suno.ai/voc.mp3".to_owned(),
4874                format: StemFormat::Mp3,
4875                hash: "vh".to_owned(),
4876            }],
4877        };
4878        let http = ScriptedHttp::new().route("voc.mp3", Reply::ok(b"stem".to_vec()));
4879
4880        run(
4881            &plan,
4882            &mut manifest,
4883            &[],
4884            &http,
4885            &MemFs::new(),
4886            &StubFfmpeg::flac(),
4887            &RecordingClock::new(),
4888            &ExecOptions::default(),
4889        );
4890
4891        assert_eq!(
4892            http.count("stem_task"),
4893            0,
4894            "no generation endpoint is ever hit"
4895        );
4896        assert_eq!(http.count("convert_wav"), 0);
4897        assert_eq!(http.count("/api/gen/"), 0);
4898    }
4899
4900    #[test]
4901    fn full_stems_mirror_mp3_is_get_only_with_zero_gen_traffic() {
4902        // End-to-end #100 path with MP3 stems: list a clip's existing stems (free
4903        // GET over the live page-count + 0-indexed page shape), reconcile them into
4904        // WriteStem actions, and execute (download) them. With MP3 the whole flow
4905        // is GET-only and touches NO `/api/gen/` endpoint at all.
4906        let http = ScriptedHttp::new()
4907            .with_auth()
4908            .route("clip1/stems/pages", Reply::json(r#"{"pages": 1}"#))
4909            .route(
4910                "clip1/stems?page=0",
4911                Reply::json(
4912                    r#"{"stems":[
4913                        {"id":"s1","title":"Song (Vocals)","status":"complete","audio_url":"https://cdn1.suno.ai/s1.mp3"},
4914                        {"id":"s2","title":"Song (Drums)","status":"complete","audio_url":"https://cdn1.suno.ai/s2.mp3"}
4915                    ]}"#,
4916                ),
4917            )
4918            .route("s1.mp3", Reply::ok(b"vocals-bytes".to_vec()))
4919            .route("s2.mp3", Reply::ok(b"drums-bytes".to_vec()));
4920
4921        // List the existing stems through the client (GET-only, free).
4922        let mut auth = ClerkAuth::new("eyJtoken");
4923        pollster::block_on(auth.authenticate(&http)).unwrap();
4924        let mut client = SunoClient::new(auth, RecordingClock::new());
4925        let (stems, complete) = pollster::block_on(client.list_stems(&http, "clip1")).unwrap();
4926        assert!(complete);
4927        assert_eq!(stems.len(), 2);
4928        assert_eq!(stems[0].label, "Vocals");
4929
4930        // Reconcile the listed MP3 stems into a plan (audio already present -> Skip).
4931        let mut manifest = Manifest::new();
4932        manifest.insert("clip1", entry("clip1.flac", AudioFormat::Flac));
4933        let desired_stems: Vec<crate::reconcile::DesiredStem> = stems
4934            .iter()
4935            .map(|s| crate::reconcile::DesiredStem {
4936                key: s.id.clone(),
4937                stem_id: s.id.clone(),
4938                path: format!("clip1.stems/{}.mp3", s.id),
4939                source_url: s.url.clone(),
4940                format: StemFormat::Mp3,
4941                hash: crate::art_url_hash(&s.url),
4942            })
4943            .collect();
4944        let d = Desired {
4945            path: "clip1.flac".to_owned(),
4946            stems: Some(desired_stems),
4947            ..desired(clip("clip1"), AudioFormat::Flac)
4948        };
4949        let local: HashMap<String, crate::reconcile::LocalFile> = [(
4950            "clip1".to_owned(),
4951            crate::reconcile::LocalFile {
4952                exists: true,
4953                size: 100,
4954            },
4955        )]
4956        .into_iter()
4957        .collect();
4958        let sources = [crate::reconcile::SourceStatus {
4959            mode: SourceMode::Mirror,
4960            fully_enumerated: true,
4961        }];
4962        let plan =
4963            crate::reconcile::reconcile(&manifest, std::slice::from_ref(&d), &local, &sources);
4964        assert_eq!(plan.stem_writes(), 2);
4965
4966        let fs = MemFs::new();
4967        let outcome = run(
4968            &plan,
4969            &mut manifest,
4970            std::slice::from_ref(&d),
4971            &http,
4972            &fs,
4973            &StubFfmpeg::flac(),
4974            &RecordingClock::new(),
4975            &ExecOptions::default(),
4976        );
4977
4978        assert_eq!(outcome.artifacts_written, 2, "both stems downloaded");
4979        assert_eq!(fs.read_file("clip1.stems/s1.mp3").unwrap(), b"vocals-bytes");
4980        assert_eq!(fs.read_file("clip1.stems/s2.mp3").unwrap(), b"drums-bytes");
4981        // The MP3 mirror path never touches any /api/gen/ endpoint (no render, no
4982        // generation, no separation).
4983        assert_eq!(http.count("/api/gen/"), 0);
4984        assert_eq!(http.count("stem_task"), 0);
4985        assert_eq!(http.count("separate"), 0);
4986        assert_eq!(http.count("generate"), 0);
4987        // No stem is ever written as FLAC.
4988        assert!(!fs.exists("clip1.stems/s1.flac"));
4989    }
4990
4991    #[test]
4992    fn full_stems_mirror_wav_default_renders_free_wav_and_no_generation() {
4993        // End-to-end #100 path with WAV stems (the default): each stem's lossless
4994        // WAV is rendered through the FREE convert_wav flow and stored RAW as
4995        // `.wav`. The mirror makes NO credit-spending generation POST.
4996        let http = ScriptedHttp::new()
4997            .with_auth()
4998            .route("clip1/stems/pages", Reply::json(r#"{"pages": 1}"#))
4999            .route(
5000                "clip1/stems?page=0",
5001                Reply::json(
5002                    r#"{"stems":[
5003                        {"id":"s1","title":"Song (Vocals)","status":"complete","audio_url":"https://cdn1.suno.ai/s1.mp3"},
5004                        {"id":"s2","title":"Song (Drums)","status":"complete","audio_url":"https://cdn1.suno.ai/s2.mp3"}
5005                    ]}"#,
5006                ),
5007            )
5008            // Each stem's WAV is already rendered, so wav_file returns the url and
5009            // no convert_wav POST is even needed (still free either way).
5010            .route(
5011                "s1/wav_file/",
5012                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/s1.wav"}"#),
5013            )
5014            .route(
5015                "s2/wav_file/",
5016                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/s2.wav"}"#),
5017            )
5018            .route("s1.wav", Reply::ok(b"RIFFvocals".to_vec()))
5019            .route("s2.wav", Reply::ok(b"RIFFdrums".to_vec()));
5020
5021        let mut auth = ClerkAuth::new("eyJtoken");
5022        pollster::block_on(auth.authenticate(&http)).unwrap();
5023        let mut client = SunoClient::new(auth, RecordingClock::new());
5024        let (stems, _complete) = pollster::block_on(client.list_stems(&http, "clip1")).unwrap();
5025
5026        let mut manifest = Manifest::new();
5027        manifest.insert("clip1", entry("clip1.flac", AudioFormat::Flac));
5028        let desired_stems: Vec<crate::reconcile::DesiredStem> = stems
5029            .iter()
5030            .map(|s| crate::reconcile::DesiredStem {
5031                key: s.id.clone(),
5032                stem_id: s.id.clone(),
5033                path: format!("clip1.stems/{}.wav", s.id),
5034                source_url: s.url.clone(),
5035                format: StemFormat::Wav,
5036                hash: crate::art_url_hash(&s.url),
5037            })
5038            .collect();
5039        let d = Desired {
5040            path: "clip1.flac".to_owned(),
5041            stems: Some(desired_stems),
5042            ..desired(clip("clip1"), AudioFormat::Flac)
5043        };
5044        let local: HashMap<String, crate::reconcile::LocalFile> = [(
5045            "clip1".to_owned(),
5046            crate::reconcile::LocalFile {
5047                exists: true,
5048                size: 100,
5049            },
5050        )]
5051        .into_iter()
5052        .collect();
5053        let sources = [crate::reconcile::SourceStatus {
5054            mode: SourceMode::Mirror,
5055            fully_enumerated: true,
5056        }];
5057        let plan =
5058            crate::reconcile::reconcile(&manifest, std::slice::from_ref(&d), &local, &sources);
5059
5060        let fs = MemFs::new();
5061        let outcome = run(
5062            &plan,
5063            &mut manifest,
5064            std::slice::from_ref(&d),
5065            &http,
5066            &fs,
5067            &StubFfmpeg::flac(),
5068            &RecordingClock::new(),
5069            &small_poll(),
5070        );
5071
5072        assert_eq!(outcome.artifacts_written, 2);
5073        // Stems are stored RAW as WAV (no FLAC transcode, even for a FLAC song).
5074        assert_eq!(fs.read_file("clip1.stems/s1.wav").unwrap(), b"RIFFvocals");
5075        assert_eq!(fs.read_file("clip1.stems/s2.wav").unwrap(), b"RIFFdrums");
5076        assert!(!fs.exists("clip1.stems/s1.flac"));
5077        // No credit-spending generation/separation endpoint is ever hit.
5078        assert_eq!(http.count("stem_task"), 0);
5079        assert_eq!(http.count("separate"), 0);
5080        assert_eq!(http.count("generate"), 0);
5081    }
5082
5083    #[test]
5084    fn write_artifact_is_skipped_when_the_owner_audio_is_absent() {
5085        // A clip whose Download fails leaves no manifest entry, so its following
5086        // WriteArtifact must not strand an untracked sidecar: it is skipped with
5087        // no fetch and no write. A following healthy clip still succeeds.
5088        let ca = clip("a");
5089        let plan = Plan {
5090            actions: vec![
5091                Action::Download {
5092                    clip: ca.clone(),
5093                    lineage: LineageContext::own_root(&ca),
5094                    path: "a.mp3".to_owned(),
5095                    format: AudioFormat::Mp3,
5096                },
5097                Action::WriteArtifact {
5098                    kind: ArtifactKind::CoverJpg,
5099                    path: "a/cover.jpg".to_owned(),
5100                    source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
5101                    hash: "h1".to_owned(),
5102                    owner_id: "a".to_owned(),
5103                    content: None,
5104                },
5105                Action::WriteArtifact {
5106                    kind: ArtifactKind::CoverJpg,
5107                    path: "b/cover.jpg".to_owned(),
5108                    source_url: "https://art.suno.ai/b/large.jpg".to_owned(),
5109                    hash: "h2".to_owned(),
5110                    owner_id: "b".to_owned(),
5111                    content: None,
5112                },
5113            ],
5114        };
5115        // The Download's audio 404s (permanent), so no entry for "a" is created.
5116        let http = ScriptedHttp::new()
5117            .route("a.mp3", Reply::status(404))
5118            .route("a/large.jpg", Reply::ok(b"jpg-a".to_vec()))
5119            .route("b/large.jpg", Reply::ok(b"jpg-b".to_vec()));
5120        let fs = MemFs::new();
5121        let mut manifest = Manifest::new();
5122        // "b" already has audio (a prior-run clip), so its sidecar write proceeds.
5123        manifest.insert("b", entry("b.mp3", AudioFormat::Mp3));
5124
5125        let outcome = run(
5126            &plan,
5127            &mut manifest,
5128            &[],
5129            &http,
5130            &fs,
5131            &StubFfmpeg::flac(),
5132            &RecordingClock::new(),
5133            &ExecOptions::default(),
5134        );
5135
5136        assert_eq!(outcome.status, RunStatus::Completed);
5137        // The audio download is the only failure; the orphan artifact is skipped.
5138        assert_eq!(outcome.failed(), 1);
5139        assert_eq!(outcome.failures[0].clip_id, "a");
5140        assert_eq!(outcome.skipped, 1);
5141        // The orphan sidecar was neither fetched nor written, and left no record.
5142        assert_eq!(http.count("a/large.jpg"), 0);
5143        assert!(!fs.exists("a/cover.jpg"));
5144        assert!(manifest.get("a").is_none());
5145        // The healthy clip's sidecar still succeeded.
5146        assert_eq!(outcome.artifacts_written, 1);
5147        assert_eq!(fs.read_file("b/cover.jpg").unwrap(), b"jpg-b");
5148        assert!(manifest.get("b").unwrap().cover_jpg.is_some());
5149    }
5150
5151    #[test]
5152    fn write_artifact_transcodes_animated_cover_to_webp() {
5153        // A CoverWebp fetches the clip's MP4 preview, runs it through the ffmpeg
5154        // port, and writes the transcoded WebP (not the fetched MP4), recording
5155        // the sidecar on the owning entry.
5156        let mut manifest = Manifest::new();
5157        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
5158        let plan = Plan {
5159            actions: vec![Action::WriteArtifact {
5160                kind: ArtifactKind::CoverWebp,
5161                path: "a/cover.webp".to_owned(),
5162                source_url: "https://cdn.suno.ai/a/video.mp4".to_owned(),
5163                hash: "v1".to_owned(),
5164                owner_id: "a".to_owned(),
5165                content: None,
5166            }],
5167        };
5168        let http = ScriptedHttp::new().route("a/video.mp4", Reply::ok(b"mp4-bytes".to_vec()));
5169        let fs = MemFs::new();
5170        let ffmpeg = StubFfmpeg::webp();
5171
5172        let outcome = run(
5173            &plan,
5174            &mut manifest,
5175            &[],
5176            &http,
5177            &fs,
5178            &ffmpeg,
5179            &RecordingClock::new(),
5180            &ExecOptions::default(),
5181        );
5182
5183        assert_eq!(outcome.artifacts_written, 1);
5184        assert_eq!(outcome.failed(), 0);
5185        assert_eq!(outcome.status, RunStatus::Completed);
5186        // The fetched MP4 was transcoded: the file holds the ffmpeg WebP output.
5187        assert_eq!(http.count("a/video.mp4"), 1);
5188        let written = fs.read_file("a/cover.webp").unwrap();
5189        assert_ne!(written, b"mp4-bytes");
5190        assert!(written.starts_with(b"RIFF"));
5191        assert_eq!(
5192            manifest.get("a").unwrap().cover_webp,
5193            Some(ArtifactState {
5194                path: "a/cover.webp".to_owned(),
5195                hash: "v1".to_owned(),
5196            })
5197        );
5198    }
5199
5200    #[test]
5201    fn write_artifact_webp_transcode_failure_is_per_clip() {
5202        // A transcode failure is attributed to the owning clip: it is a per-clip
5203        // failure, the run completes, no sidecar is written, and the slot stays
5204        // empty. A healthy static cover in the same run still succeeds.
5205        let mut manifest = Manifest::new();
5206        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
5207        manifest.insert("b", entry("b.mp3", AudioFormat::Mp3));
5208        let plan = Plan {
5209            actions: vec![
5210                Action::WriteArtifact {
5211                    kind: ArtifactKind::CoverWebp,
5212                    path: "a/cover.webp".to_owned(),
5213                    source_url: "https://cdn.suno.ai/a/video.mp4".to_owned(),
5214                    hash: "v1".to_owned(),
5215                    owner_id: "a".to_owned(),
5216                    content: None,
5217                },
5218                Action::WriteArtifact {
5219                    kind: ArtifactKind::CoverJpg,
5220                    path: "b/cover.jpg".to_owned(),
5221                    source_url: "https://art.suno.ai/b/large.jpg".to_owned(),
5222                    hash: "h1".to_owned(),
5223                    owner_id: "b".to_owned(),
5224                    content: None,
5225                },
5226            ],
5227        };
5228        let http = ScriptedHttp::new()
5229            .route("a/video.mp4", Reply::ok(b"mp4-bytes".to_vec()))
5230            .route("b/large.jpg", Reply::ok(b"jpg-b".to_vec()));
5231        let fs = MemFs::new();
5232
5233        let outcome = run(
5234            &plan,
5235            &mut manifest,
5236            &[],
5237            &http,
5238            &fs,
5239            &StubFfmpeg::failing(),
5240            &RecordingClock::new(),
5241            &ExecOptions::default(),
5242        );
5243
5244        assert_eq!(outcome.status, RunStatus::Completed);
5245        assert_eq!(outcome.failed(), 1);
5246        assert_eq!(outcome.failures[0].clip_id, "a");
5247        // The animated cover failed to transcode: nothing written, slot empty.
5248        assert!(!fs.exists("a/cover.webp"));
5249        assert_eq!(manifest.get("a").unwrap().cover_webp, None);
5250        // The static cover in the same run still succeeded.
5251        assert_eq!(outcome.artifacts_written, 1);
5252        assert_eq!(fs.read_file("b/cover.jpg").unwrap(), b"jpg-b");
5253        assert!(manifest.get("b").unwrap().cover_jpg.is_some());
5254    }
5255
5256    #[test]
5257    fn write_artifact_uses_configured_webp_settings() {
5258        use std::sync::{Arc, Mutex};
5259
5260        struct RecordingWebpFfmpeg {
5261            seen: Arc<Mutex<Vec<WebpEncodeSettings>>>,
5262        }
5263
5264        impl Ffmpeg for RecordingWebpFfmpeg {
5265            async fn wav_to_flac(
5266                &self,
5267                _wav: &[u8],
5268            ) -> Result<Vec<u8>, crate::ffmpeg::FfmpegError> {
5269                Ok(Vec::new())
5270            }
5271
5272            async fn mp4_to_webp(
5273                &self,
5274                _mp4: &[u8],
5275                settings: WebpEncodeSettings,
5276            ) -> Result<Vec<u8>, crate::ffmpeg::FfmpegError> {
5277                let seen = Arc::clone(&self.seen);
5278                seen.lock().unwrap().push(settings);
5279                Ok(b"RIFF\x00\x00\x00\x00WEBP".to_vec())
5280            }
5281        }
5282
5283        let mut manifest = Manifest::new();
5284        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
5285        let plan = Plan {
5286            actions: vec![Action::WriteArtifact {
5287                kind: ArtifactKind::CoverWebp,
5288                path: "a/cover.webp".to_owned(),
5289                source_url: "https://cdn.suno.ai/a/video.mp4".to_owned(),
5290                hash: "v1".to_owned(),
5291                owner_id: "a".to_owned(),
5292                content: None,
5293            }],
5294        };
5295        let seen = Arc::new(Mutex::new(Vec::new()));
5296        let ffmpeg = RecordingWebpFfmpeg {
5297            seen: Arc::clone(&seen),
5298        };
5299        let opts = ExecOptions {
5300            cover_webp: WebpEncodeSettings {
5301                quality: 88,
5302                max_fps: 12,
5303                max_width: Some(720),
5304                lossless: false,
5305                compression_level: 4,
5306            },
5307            ..ExecOptions::default()
5308        };
5309
5310        let _ = run(
5311            &plan,
5312            &mut manifest,
5313            &[],
5314            &ScriptedHttp::new().route("a/video.mp4", Reply::ok(b"mp4-bytes".to_vec())),
5315            &MemFs::new(),
5316            &ffmpeg,
5317            &RecordingClock::new(),
5318            &opts,
5319        );
5320
5321        assert_eq!(
5322            seen.lock().unwrap().as_slice(),
5323            &[WebpEncodeSettings {
5324                quality: 88,
5325                max_fps: 12,
5326                max_width: Some(720),
5327                lossless: false,
5328                compression_level: 4,
5329            }]
5330        );
5331    }
5332
5333    // ── Phase 8: folder art routes to the album store ───────────────
5334
5335    #[test]
5336    fn folder_jpg_write_records_album_state_and_skips_manifest() {
5337        // Folder art is owned by the album root id, not a manifest clip: it
5338        // writes even with an empty manifest and records on the album store.
5339        let mut manifest = Manifest::new();
5340        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
5341        let plan = Plan {
5342            actions: vec![Action::WriteArtifact {
5343                kind: ArtifactKind::FolderJpg,
5344                path: "creator/album/folder.jpg".to_owned(),
5345                source_url: "https://art.suno.ai/root/large.jpg".to_owned(),
5346                hash: "jh".to_owned(),
5347                owner_id: "root".to_owned(),
5348                content: None,
5349            }],
5350        };
5351        let http = ScriptedHttp::new().route("root/large.jpg", Reply::ok(b"folder-jpg".to_vec()));
5352        let fs = MemFs::new();
5353
5354        let outcome = run_with_albums(
5355            &plan,
5356            &mut manifest,
5357            &mut albums,
5358            &[],
5359            &http,
5360            &fs,
5361            &StubFfmpeg::flac(),
5362            &RecordingClock::new(),
5363            &ExecOptions::default(),
5364        );
5365
5366        assert_eq!(outcome.artifacts_written, 1);
5367        assert_eq!(outcome.status, RunStatus::Completed);
5368        assert_eq!(
5369            fs.read_file("creator/album/folder.jpg").unwrap(),
5370            b"folder-jpg"
5371        );
5372        assert_eq!(
5373            albums.get("root").unwrap().folder_jpg,
5374            Some(ArtifactState {
5375                path: "creator/album/folder.jpg".to_owned(),
5376                hash: "jh".to_owned(),
5377            })
5378        );
5379        assert!(manifest.get("root").is_none());
5380    }
5381
5382    #[test]
5383    fn folder_webp_write_transcodes_and_records_album_state() {
5384        let mut manifest = Manifest::new();
5385        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
5386        let plan = Plan {
5387            actions: vec![Action::WriteArtifact {
5388                kind: ArtifactKind::FolderWebp,
5389                path: "creator/album/cover.webp".to_owned(),
5390                source_url: "https://cdn.suno.ai/root/video.mp4".to_owned(),
5391                hash: "wh".to_owned(),
5392                owner_id: "root".to_owned(),
5393                content: None,
5394            }],
5395        };
5396        let http = ScriptedHttp::new().route("root/video.mp4", Reply::ok(b"mp4-bytes".to_vec()));
5397        let fs = MemFs::new();
5398
5399        let outcome = run_with_albums(
5400            &plan,
5401            &mut manifest,
5402            &mut albums,
5403            &[],
5404            &http,
5405            &fs,
5406            &StubFfmpeg::webp(),
5407            &RecordingClock::new(),
5408            &ExecOptions::default(),
5409        );
5410
5411        assert_eq!(outcome.artifacts_written, 1);
5412        assert_eq!(outcome.failed(), 0);
5413        // The MP4 was transcoded to WebP, not written verbatim.
5414        let written = fs.read_file("creator/album/cover.webp").unwrap();
5415        assert_ne!(written, b"mp4-bytes");
5416        assert!(written.starts_with(b"RIFF"));
5417        assert_eq!(
5418            albums.get("root").unwrap().folder_webp,
5419            Some(ArtifactState {
5420                path: "creator/album/cover.webp".to_owned(),
5421                hash: "wh".to_owned(),
5422            })
5423        );
5424    }
5425
5426    #[test]
5427    fn folder_mp4_write_keeps_the_source_verbatim() {
5428        let mut manifest = Manifest::new();
5429        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
5430        let plan = Plan {
5431            actions: vec![Action::WriteArtifact {
5432                kind: ArtifactKind::FolderMp4,
5433                path: "creator/album/cover.mp4".to_owned(),
5434                source_url: "https://cdn.suno.ai/root/video.mp4".to_owned(),
5435                hash: "mh".to_owned(),
5436                owner_id: "root".to_owned(),
5437                content: None,
5438            }],
5439        };
5440        let http = ScriptedHttp::new().route("root/video.mp4", Reply::ok(b"mp4-bytes".to_vec()));
5441        let fs = MemFs::new();
5442
5443        let outcome = run_with_albums(
5444            &plan,
5445            &mut manifest,
5446            &mut albums,
5447            &[],
5448            &http,
5449            &fs,
5450            &StubFfmpeg::webp(),
5451            &RecordingClock::new(),
5452            &ExecOptions::default(),
5453        );
5454
5455        assert_eq!(outcome.artifacts_written, 1);
5456        assert_eq!(outcome.failed(), 0);
5457        // The raw MP4 is written byte-for-byte, never transcoded.
5458        assert_eq!(
5459            fs.read_file("creator/album/cover.mp4").unwrap(),
5460            b"mp4-bytes"
5461        );
5462        assert_eq!(
5463            albums.get("root").unwrap().folder_mp4,
5464            Some(ArtifactState {
5465                path: "creator/album/cover.mp4".to_owned(),
5466                hash: "mh".to_owned(),
5467            })
5468        );
5469    }
5470
5471    #[test]
5472    fn both_folder_covers_fetch_the_video_cover_once() {
5473        let mut manifest = Manifest::new();
5474        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
5475        // `both` retention keeps cover.webp (transcoded) and cover.mp4 (raw) from
5476        // the one video_cover_url. FolderWebp sorts first and caches the fetched
5477        // source; FolderMp4 drains it, so the source is fetched exactly once.
5478        let plan = Plan {
5479            actions: vec![
5480                Action::WriteArtifact {
5481                    kind: ArtifactKind::FolderWebp,
5482                    path: "creator/album/cover.webp".to_owned(),
5483                    source_url: "https://cdn.suno.ai/root/video.mp4".to_owned(),
5484                    hash: "wh".to_owned(),
5485                    owner_id: "root".to_owned(),
5486                    content: None,
5487                },
5488                Action::WriteArtifact {
5489                    kind: ArtifactKind::FolderMp4,
5490                    path: "creator/album/cover.mp4".to_owned(),
5491                    source_url: "https://cdn.suno.ai/root/video.mp4".to_owned(),
5492                    hash: "mh".to_owned(),
5493                    owner_id: "root".to_owned(),
5494                    content: None,
5495                },
5496            ],
5497        };
5498        let http = ScriptedHttp::new().route("root/video.mp4", Reply::ok(b"mp4-bytes".to_vec()));
5499        let fs = MemFs::new();
5500
5501        let outcome = run_with_albums(
5502            &plan,
5503            &mut manifest,
5504            &mut albums,
5505            &[],
5506            &http,
5507            &fs,
5508            &StubFfmpeg::webp(),
5509            &RecordingClock::new(),
5510            &ExecOptions::default(),
5511        );
5512
5513        assert_eq!(outcome.artifacts_written, 2);
5514        assert_eq!(outcome.failed(), 0);
5515        // Fetched exactly once despite two artifacts consuming it (#90 / #89).
5516        assert_eq!(http.count("root/video.mp4"), 1);
5517        // The webp is transcoded; the mp4 is the raw source verbatim.
5518        assert!(
5519            fs.read_file("creator/album/cover.webp")
5520                .unwrap()
5521                .starts_with(b"RIFF")
5522        );
5523        assert_eq!(
5524            fs.read_file("creator/album/cover.mp4").unwrap(),
5525            b"mp4-bytes"
5526        );
5527    }
5528
5529    #[test]
5530    fn folder_art_delete_clears_album_state() {
5531        let fs = MemFs::new().with_file("creator/album/folder.jpg", b"jpg".to_vec());
5532        let mut manifest = Manifest::new();
5533        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
5534        albums.insert(
5535            "root".to_owned(),
5536            AlbumArt {
5537                folder_jpg: Some(ArtifactState {
5538                    path: "creator/album/folder.jpg".to_owned(),
5539                    hash: "jh".to_owned(),
5540                }),
5541                folder_webp: None,
5542                folder_mp4: None,
5543            },
5544        );
5545        let plan = Plan {
5546            actions: vec![Action::DeleteArtifact {
5547                kind: ArtifactKind::FolderJpg,
5548                path: "creator/album/folder.jpg".to_owned(),
5549                owner_id: "root".to_owned(),
5550            }],
5551        };
5552
5553        let outcome = run_with_albums(
5554            &plan,
5555            &mut manifest,
5556            &mut albums,
5557            &[],
5558            &ScriptedHttp::new(),
5559            &fs,
5560            &StubFfmpeg::flac(),
5561            &RecordingClock::new(),
5562            &ExecOptions::default(),
5563        );
5564
5565        assert_eq!(outcome.artifacts_deleted, 1);
5566        assert!(!fs.exists("creator/album/folder.jpg"));
5567        // The album row had only the one kind, so it is pruned entirely.
5568        assert!(!albums.contains_key("root"));
5569    }
5570
5571    // ── Phase 9: playlist artifacts ─────────────────────────────────
5572
5573    #[test]
5574    fn playlist_write_uses_inline_content_and_records_state() {
5575        // A playlist body is generated, carried inline. With an empty manifest
5576        // and NO http routes, the write still succeeds — proving it skipped the
5577        // network — and records the playlist store keyed by the playlist id.
5578        let mut manifest = Manifest::new();
5579        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
5580        let mut playlists: BTreeMap<String, PlaylistState> = BTreeMap::new();
5581        let body = "#EXTM3U\n#PLAYLIST:Road Trip\n#EXTINF:60,One\nA/One.flac\n";
5582        let plan = Plan {
5583            actions: vec![Action::WriteArtifact {
5584                kind: ArtifactKind::Playlist,
5585                path: "Road Trip.m3u8".to_owned(),
5586                source_url: String::new(),
5587                hash: "ph1".to_owned(),
5588                owner_id: "pl1".to_owned(),
5589                content: Some(body.to_owned()),
5590            }],
5591        };
5592        let fs = MemFs::new();
5593
5594        let outcome = run_full(
5595            &plan,
5596            &mut manifest,
5597            &mut albums,
5598            &mut playlists,
5599            &[],
5600            &ScriptedHttp::new(),
5601            &fs,
5602            &StubFfmpeg::flac(),
5603            &RecordingClock::new(),
5604            &ExecOptions::default(),
5605        );
5606
5607        assert_eq!(outcome.artifacts_written, 1);
5608        assert_eq!(outcome.failed(), 0);
5609        // The exact inline bytes were written, verbatim.
5610        assert_eq!(fs.read_file("Road Trip.m3u8").unwrap(), body.as_bytes());
5611        assert_eq!(
5612            playlists.get("pl1"),
5613            Some(&PlaylistState {
5614                name: "Road Trip".to_owned(),
5615                path: "Road Trip.m3u8".to_owned(),
5616                hash: "ph1".to_owned(),
5617            })
5618        );
5619    }
5620
5621    #[test]
5622    fn playlist_delete_removes_file_and_clears_state() {
5623        let fs = MemFs::new().with_file("Old.m3u8", b"#EXTM3U\n".to_vec());
5624        let mut manifest = Manifest::new();
5625        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
5626        let mut playlists: BTreeMap<String, PlaylistState> = BTreeMap::new();
5627        playlists.insert(
5628            "pl1".to_owned(),
5629            PlaylistState {
5630                name: "Old".to_owned(),
5631                path: "Old.m3u8".to_owned(),
5632                hash: "ph1".to_owned(),
5633            },
5634        );
5635        let plan = Plan {
5636            actions: vec![Action::DeleteArtifact {
5637                kind: ArtifactKind::Playlist,
5638                path: "Old.m3u8".to_owned(),
5639                owner_id: "pl1".to_owned(),
5640            }],
5641        };
5642
5643        let outcome = run_full(
5644            &plan,
5645            &mut manifest,
5646            &mut albums,
5647            &mut playlists,
5648            &[],
5649            &ScriptedHttp::new(),
5650            &fs,
5651            &StubFfmpeg::flac(),
5652            &RecordingClock::new(),
5653            &ExecOptions::default(),
5654        );
5655
5656        assert_eq!(outcome.artifacts_deleted, 1);
5657        assert!(!fs.exists("Old.m3u8"));
5658        assert!(
5659            !playlists.contains_key("pl1"),
5660            "the playlist row is cleared on delete"
5661        );
5662    }
5663
5664    // ── Phase 10: old-sidecar cleanup on move + empty-dir prune ──────
5665
5666    #[test]
5667    fn rename_move_relocates_cover_and_prunes_old_album() {
5668        // A title/album change moves the audio (Rename) and re-emits the cover
5669        // at the NEW path. The old cover must be removed and the now-empty old
5670        // album directory pruned, leaving no orphan sidecar and no ghost dir.
5671        let mut manifest = Manifest::new();
5672        let mut e = entry("Creator/AlbumA/song.flac", AudioFormat::Flac);
5673        e.cover_jpg = Some(ArtifactState {
5674            path: "Creator/AlbumA/cover.jpg".to_owned(),
5675            hash: "h1".to_owned(),
5676        });
5677        manifest.insert("a", e);
5678        let fs = MemFs::new()
5679            .with_file("Creator/AlbumA/song.flac", b"AUDIO".to_vec())
5680            .with_file("Creator/AlbumA/cover.jpg", b"old-jpg".to_vec());
5681        let plan = Plan {
5682            actions: vec![
5683                Action::Rename {
5684                    from: "Creator/AlbumA/song.flac".to_owned(),
5685                    to: "Creator/AlbumB/song.flac".to_owned(),
5686                },
5687                Action::WriteArtifact {
5688                    kind: ArtifactKind::CoverJpg,
5689                    path: "Creator/AlbumB/cover.jpg".to_owned(),
5690                    source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
5691                    hash: "h1".to_owned(),
5692                    owner_id: "a".to_owned(),
5693                    content: None,
5694                },
5695            ],
5696        };
5697        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"new-jpg".to_vec()));
5698
5699        let outcome = run(
5700            &plan,
5701            &mut manifest,
5702            &[],
5703            &http,
5704            &fs,
5705            &StubFfmpeg::flac(),
5706            &RecordingClock::new(),
5707            &ExecOptions::default(),
5708        );
5709
5710        assert_eq!(outcome.failed(), 0);
5711        // Audio moved, the new cover was written, the old cover removed.
5712        assert!(fs.exists("Creator/AlbumB/song.flac"));
5713        assert_eq!(
5714            fs.read_file("Creator/AlbumB/cover.jpg").unwrap(),
5715            b"new-jpg"
5716        );
5717        assert!(!fs.exists("Creator/AlbumA/cover.jpg"));
5718        assert!(!fs.exists("Creator/AlbumA/song.flac"));
5719        // The manifest cover slot now points at the new path.
5720        assert_eq!(
5721            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
5722            "Creator/AlbumB/cover.jpg"
5723        );
5724        // The emptied old album directory is pruned; the new one survives.
5725        assert!(!fs.has_dir("Creator/AlbumA"));
5726        assert!(fs.has_dir("Creator/AlbumB"));
5727    }
5728
5729    #[test]
5730    fn rename_move_relocates_folder_art_and_prunes_old_album() {
5731        // An album rename moves folder.jpg: the old file is removed, the album
5732        // store slot advanced to the new path, and the emptied dir pruned.
5733        let mut manifest = Manifest::new();
5734        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
5735        albums.insert(
5736            "root".to_owned(),
5737            AlbumArt {
5738                folder_jpg: Some(ArtifactState {
5739                    path: "Creator/AlbumA/folder.jpg".to_owned(),
5740                    hash: "jh".to_owned(),
5741                }),
5742                folder_webp: None,
5743                folder_mp4: None,
5744            },
5745        );
5746        let fs = MemFs::new().with_file("Creator/AlbumA/folder.jpg", b"old-folder".to_vec());
5747        let plan = Plan {
5748            actions: vec![Action::WriteArtifact {
5749                kind: ArtifactKind::FolderJpg,
5750                path: "Creator/AlbumB/folder.jpg".to_owned(),
5751                source_url: "https://art.suno.ai/root/large.jpg".to_owned(),
5752                hash: "jh".to_owned(),
5753                owner_id: "root".to_owned(),
5754                content: None,
5755            }],
5756        };
5757        let http = ScriptedHttp::new().route("root/large.jpg", Reply::ok(b"new-folder".to_vec()));
5758
5759        let outcome = run_with_albums(
5760            &plan,
5761            &mut manifest,
5762            &mut albums,
5763            &[],
5764            &http,
5765            &fs,
5766            &StubFfmpeg::flac(),
5767            &RecordingClock::new(),
5768            &ExecOptions::default(),
5769        );
5770
5771        assert_eq!(outcome.failed(), 0);
5772        assert_eq!(
5773            fs.read_file("Creator/AlbumB/folder.jpg").unwrap(),
5774            b"new-folder"
5775        );
5776        assert!(!fs.exists("Creator/AlbumA/folder.jpg"));
5777        assert_eq!(
5778            albums
5779                .get("root")
5780                .unwrap()
5781                .folder_jpg
5782                .as_ref()
5783                .unwrap()
5784                .path,
5785            "Creator/AlbumB/folder.jpg"
5786        );
5787        assert!(!fs.has_dir("Creator/AlbumA"));
5788        assert!(fs.has_dir("Creator/AlbumB"));
5789    }
5790
5791    #[test]
5792    fn prune_empty_dirs_removes_only_empty_dirs() {
5793        // A direct exercise of the prune port's safety guarantees on a mixed
5794        // tree: nested empties go, anything holding a file (hidden ones too)
5795        // stays, and no file is touched.
5796        let fs = MemFs::new()
5797            .with_file("keep/full/song.flac", b"x".to_vec())
5798            .with_file("hidden/.suno-manifest.json", b"{}".to_vec())
5799            .with_dir("empty/leaf")
5800            .with_dir("nested/a/b/c");
5801
5802        fs.prune_empty_dirs("").unwrap();
5803
5804        // Every empty directory, however deeply nested, is pruned bottom-up.
5805        for gone in [
5806            "empty",
5807            "empty/leaf",
5808            "nested",
5809            "nested/a",
5810            "nested/a/b",
5811            "nested/a/b/c",
5812        ] {
5813            assert!(!fs.has_dir(gone), "empty dir {gone} should be pruned");
5814        }
5815        // A directory holding any file — including only a hidden dotfile — stays.
5816        assert!(fs.has_dir("keep"));
5817        assert!(fs.has_dir("keep/full"));
5818        assert!(fs.has_dir("hidden"));
5819        // No file was touched.
5820        assert!(fs.exists("keep/full/song.flac"));
5821        assert!(fs.exists("hidden/.suno-manifest.json"));
5822    }
5823
5824    #[test]
5825    fn prune_empty_dirs_never_removes_the_named_root() {
5826        // Pruning under a named root clears its empty children but keeps the
5827        // root itself, even when the root is now empty.
5828        let fs = MemFs::new().with_dir("empty/leaf");
5829        fs.prune_empty_dirs("empty").unwrap();
5830        assert!(fs.has_dir("empty"), "the named root is never removed");
5831        assert!(!fs.has_dir("empty/leaf"));
5832    }
5833
5834    #[test]
5835    fn old_sidecar_remove_failure_is_per_clip_and_converges_next_run() {
5836        // If removing the old sidecar fails, the write is a per-clip failure
5837        // that never aborts the run and does NOT advance the state slot, so the
5838        // next identical run re-attempts the cleanup and the tree converges.
5839        let mut manifest = Manifest::new();
5840        let mut e = entry("a.flac", AudioFormat::Flac);
5841        e.cover_jpg = Some(ArtifactState {
5842            path: "AlbumA/cover.jpg".to_owned(),
5843            hash: "h1".to_owned(),
5844        });
5845        manifest.insert("a", e);
5846        let fs = MemFs::new()
5847            .with_file("a.flac", b"AUDIO".to_vec())
5848            .with_file("AlbumA/cover.jpg", b"old".to_vec());
5849        let plan = Plan {
5850            actions: vec![Action::WriteArtifact {
5851                kind: ArtifactKind::CoverJpg,
5852                path: "AlbumB/cover.jpg".to_owned(),
5853                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
5854                hash: "h1".to_owned(),
5855                owner_id: "a".to_owned(),
5856                content: None,
5857            }],
5858        };
5859        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"new".to_vec()));
5860
5861        // Run 1: the old-cover remove is forced to fail.
5862        fs.arm_fail_remove("AlbumA/cover.jpg");
5863        let first = run(
5864            &plan,
5865            &mut manifest,
5866            &[],
5867            &http,
5868            &fs,
5869            &StubFfmpeg::flac(),
5870            &RecordingClock::new(),
5871            &ExecOptions::default(),
5872        );
5873        assert_eq!(
5874            first.status,
5875            RunStatus::Completed,
5876            "a remove failure never aborts the run"
5877        );
5878        assert_eq!(first.failed(), 1);
5879        // The new cover is written but the old one lingers and the slot is stale.
5880        assert!(fs.exists("AlbumB/cover.jpg"));
5881        assert!(fs.exists("AlbumA/cover.jpg"));
5882        assert_eq!(
5883            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
5884            "AlbumA/cover.jpg"
5885        );
5886        assert!(fs.has_dir("AlbumA"), "the orphan keeps its directory alive");
5887
5888        // Run 2: the same plan re-runs with the fault cleared and converges.
5889        fs.disarm_fail_remove("AlbumA/cover.jpg");
5890        let second = run(
5891            &plan,
5892            &mut manifest,
5893            &[],
5894            &http,
5895            &fs,
5896            &StubFfmpeg::flac(),
5897            &RecordingClock::new(),
5898            &ExecOptions::default(),
5899        );
5900        assert_eq!(second.failed(), 0);
5901        assert!(fs.exists("AlbumB/cover.jpg"));
5902        assert!(!fs.exists("AlbumA/cover.jpg"), "no orphan persists");
5903        assert_eq!(
5904            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
5905            "AlbumB/cover.jpg"
5906        );
5907        assert!(!fs.has_dir("AlbumA"), "the emptied directory is pruned");
5908    }
5909
5910    #[test]
5911    fn same_path_artifact_rewrite_does_no_remove_and_prunes_nothing() {
5912        // The idempotent case: a content-only cover rewrite (hash drift, path
5913        // unchanged) attempts no remove and prunes no live directory. A remove
5914        // failure is armed on the cover path, so any spurious remove would
5915        // surface as a failure — none does.
5916        let mut manifest = Manifest::new();
5917        let mut e = entry("Album/a.mp3", AudioFormat::Mp3);
5918        e.cover_jpg = Some(ArtifactState {
5919            path: "Album/cover.jpg".to_owned(),
5920            hash: "h1".to_owned(),
5921        });
5922        manifest.insert("a", e);
5923        let fs = MemFs::new()
5924            .with_file("Album/a.mp3", b"AUDIO".to_vec())
5925            .with_file("Album/cover.jpg", b"old".to_vec());
5926        fs.arm_fail_remove("Album/cover.jpg");
5927        let plan = Plan {
5928            actions: vec![Action::WriteArtifact {
5929                kind: ArtifactKind::CoverJpg,
5930                path: "Album/cover.jpg".to_owned(),
5931                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
5932                hash: "h2".to_owned(),
5933                owner_id: "a".to_owned(),
5934                content: None,
5935            }],
5936        };
5937        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"new".to_vec()));
5938
5939        let outcome = run(
5940            &plan,
5941            &mut manifest,
5942            &[],
5943            &http,
5944            &fs,
5945            &StubFfmpeg::flac(),
5946            &RecordingClock::new(),
5947            &ExecOptions::default(),
5948        );
5949
5950        assert_eq!(
5951            outcome.failed(),
5952            0,
5953            "no remove is attempted, so the armed failure never fires"
5954        );
5955        assert_eq!(outcome.artifacts_written, 1);
5956        assert_eq!(fs.read_file("Album/cover.jpg").unwrap(), b"new");
5957        assert_eq!(
5958            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().hash,
5959            "h2"
5960        );
5961        // The live directory is untouched by prune.
5962        assert!(fs.has_dir("Album"));
5963    }
5964
5965    // ── Concurrency (issue #22) ─────────────────────────────────────
5966
5967    mod concurrency {
5968        use super::*;
5969        use crate::ffmpeg::FfmpegError;
5970        use crate::fs::{FileStat, FsError};
5971        use crate::http::{HttpRequest, TransportError};
5972        use std::future::Future;
5973        use std::pin::Pin;
5974        use std::sync::Arc;
5975        use std::sync::atomic::{AtomicUsize, Ordering};
5976        use std::task::{Context, Poll};
5977
5978        /// A future that pends exactly once before resolving, waking itself so a
5979        /// single-threaded executor re-polls. It forces the [`Http`] port to
5980        /// yield, so [`buffer_unordered`](futures_util::stream::StreamExt) parks
5981        /// each in-flight request and the true overlap becomes observable.
5982        #[derive(Default)]
5983        struct YieldOnce {
5984            yielded: bool,
5985        }
5986
5987        impl Future for YieldOnce {
5988            type Output = ();
5989            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
5990                if self.yielded {
5991                    Poll::Ready(())
5992                } else {
5993                    self.yielded = true;
5994                    cx.waker().wake_by_ref();
5995                    Poll::Pending
5996                }
5997            }
5998        }
5999
6000        /// An [`Http`] double that wraps [`ScriptedHttp`] and records the peak
6001        /// number of concurrently in-flight requests. Each `send` bumps a live
6002        /// counter, yields once (so peers can start), then delegates.
6003        struct GatedHttp {
6004            inner: ScriptedHttp,
6005            inflight: Arc<AtomicUsize>,
6006            peak: Arc<AtomicUsize>,
6007        }
6008
6009        impl GatedHttp {
6010            fn new(inner: ScriptedHttp) -> Self {
6011                Self {
6012                    inner,
6013                    inflight: Arc::new(AtomicUsize::new(0)),
6014                    peak: Arc::new(AtomicUsize::new(0)),
6015                }
6016            }
6017
6018            fn peak(&self) -> usize {
6019                self.peak.load(Ordering::SeqCst)
6020            }
6021        }
6022
6023        impl Http for GatedHttp {
6024            async fn send(&self, request: HttpRequest) -> Result<HttpResponse, TransportError> {
6025                let now = self.inflight.fetch_add(1, Ordering::SeqCst) + 1;
6026                self.peak.fetch_max(now, Ordering::SeqCst);
6027                YieldOnce::default().await;
6028                let out = self.inner.send(request).await;
6029                self.inflight.fetch_sub(1, Ordering::SeqCst);
6030                out
6031            }
6032        }
6033
6034        fn download(id: &str, format: AudioFormat) -> (Clip, Desired, Action) {
6035            let c = clip(id);
6036            let d = desired(c.clone(), format);
6037            let action = Action::Download {
6038                clip: c.clone(),
6039                lineage: LineageContext::own_root(&c),
6040                path: d.path.clone(),
6041                format,
6042            };
6043            (c, d, action)
6044        }
6045
6046        fn opts_with(concurrency: u32) -> ExecOptions {
6047            ExecOptions {
6048                concurrency,
6049                ..small_poll()
6050            }
6051        }
6052
6053        #[test]
6054        fn concurrency_never_exceeds_the_configured_bound() {
6055            let count = 6;
6056            let concurrency = 3;
6057            let mut scripted = ScriptedHttp::new().with_auth();
6058            let mut actions = Vec::new();
6059            let mut desireds = Vec::new();
6060            for i in 0..count {
6061                let id = format!("c{i}");
6062                scripted = scripted.route(&format!("{id}.mp3"), Reply::ok(b"mp3-body".to_vec()));
6063                let (_c, d, action) = download(&id, AudioFormat::Mp3);
6064                actions.push(action);
6065                desireds.push(d);
6066            }
6067            let http = GatedHttp::new(scripted);
6068            let fs = MemFs::new();
6069            let plan = Plan { actions };
6070            let mut manifest = Manifest::new();
6071
6072            let outcome = run_gated_fs(
6073                &plan,
6074                &mut manifest,
6075                &desireds,
6076                &http,
6077                &fs,
6078                &opts_with(concurrency),
6079            );
6080
6081            assert_eq!(outcome.downloaded, count);
6082            assert!(
6083                http.peak() <= concurrency as usize,
6084                "peak {} exceeded the bound {concurrency}",
6085                http.peak()
6086            );
6087            assert_eq!(
6088                http.peak(),
6089                concurrency as usize,
6090                "expected the run to saturate the bound"
6091            );
6092        }
6093
6094        /// Run a gated plan against a caller-supplied [`MemFs`], returning the
6095        /// outcome. The client is built here so the limiter can be inspected by
6096        /// the caller-facing variant below.
6097        fn run_gated_fs(
6098            plan: &Plan,
6099            manifest: &mut Manifest,
6100            desired: &[Desired],
6101            http: &GatedHttp,
6102            fs: &MemFs,
6103            opts: &ExecOptions,
6104        ) -> ExecOutcome {
6105            let ffmpeg = StubFfmpeg::flac();
6106            let clock = RecordingClock::new();
6107            let mut albums = BTreeMap::new();
6108            let mut playlists = BTreeMap::new();
6109            let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
6110            pollster::block_on(execute(
6111                plan,
6112                manifest,
6113                &mut albums,
6114                &mut playlists,
6115                desired,
6116                &HashMap::new(),
6117                Ports {
6118                    client: &mut client,
6119                    http,
6120                    fs,
6121                    ffmpeg: &ffmpeg,
6122                    clock: &clock,
6123                },
6124                opts,
6125            ))
6126        }
6127
6128        #[test]
6129        fn a_failing_clip_does_not_abort_the_others() {
6130            let mut scripted = ScriptedHttp::new().with_auth();
6131            scripted = scripted
6132                .route("ok1.mp3", Reply::ok(b"one".to_vec()))
6133                .route("bad.mp3", Reply::status(404))
6134                .route("ok2.mp3", Reply::ok(b"two".to_vec()));
6135            let (_a, d1, a1) = download("ok1", AudioFormat::Mp3);
6136            let (_b, d2, a2) = download("bad", AudioFormat::Mp3);
6137            let (_c, d3, a3) = download("ok2", AudioFormat::Mp3);
6138            let http = GatedHttp::new(scripted);
6139            let fs = MemFs::new();
6140            let plan = Plan {
6141                actions: vec![a1, a2, a3],
6142            };
6143            let mut manifest = Manifest::new();
6144
6145            let outcome = run_gated_fs(
6146                &plan,
6147                &mut manifest,
6148                &[d1, d2, d3],
6149                &http,
6150                &fs,
6151                &opts_with(3),
6152            );
6153
6154            assert_eq!(outcome.downloaded, 2);
6155            assert_eq!(outcome.failed(), 1);
6156            assert_eq!(outcome.status, RunStatus::Completed);
6157            assert_eq!(outcome.failures[0].clip_id, "bad");
6158            assert!(manifest.get("ok1").is_some());
6159            assert!(manifest.get("ok2").is_some());
6160            assert!(manifest.get("bad").is_none());
6161        }
6162
6163        #[test]
6164        fn outcome_is_identical_across_concurrency_levels() {
6165            // A plan mixing successful and failing downloads with serial phase-2
6166            // actions (a skip and a delete), so both phases contribute.
6167            fn build() -> (Plan, Vec<Desired>) {
6168                let mut actions = Vec::new();
6169                let mut desireds = Vec::new();
6170                for id in ["a", "b", "c", "d"] {
6171                    let (_c, d, action) = download(id, AudioFormat::Mp3);
6172                    actions.push(action);
6173                    desireds.push(d);
6174                }
6175                // A failing download in the middle of the audio set.
6176                let (_e, de, ae) = download("fail", AudioFormat::Mp3);
6177                actions.insert(2, ae);
6178                desireds.push(de);
6179                // Phase-2 actions.
6180                actions.push(Action::Skip {
6181                    clip_id: "gone".to_owned(),
6182                });
6183                actions.push(Action::Delete {
6184                    path: "old.mp3".to_owned(),
6185                    clip_id: "old".to_owned(),
6186                });
6187                (Plan { actions }, desireds)
6188            }
6189
6190            fn http() -> ScriptedHttp {
6191                ScriptedHttp::new()
6192                    .with_auth()
6193                    .route("a.mp3", Reply::ok(b"a".to_vec()))
6194                    .route("b.mp3", Reply::ok(b"b".to_vec()))
6195                    .route("c.mp3", Reply::ok(b"c".to_vec()))
6196                    .route("d.mp3", Reply::ok(b"d".to_vec()))
6197                    .route("fail.mp3", Reply::status(404))
6198            }
6199
6200            fn seed_manifest() -> Manifest {
6201                let mut m = Manifest::new();
6202                m.insert("old".to_owned(), entry("old.mp3", AudioFormat::Mp3));
6203                m
6204            }
6205
6206            let (plan, desireds) = build();
6207
6208            let mut m1 = seed_manifest();
6209            let fs1 = MemFs::new().with_file("old.mp3", b"x".to_vec());
6210            let out1 = run_gated_fs(
6211                &plan,
6212                &mut m1,
6213                &desireds,
6214                &GatedHttp::new(http()),
6215                &fs1,
6216                &opts_with(1),
6217            );
6218
6219            let mut m8 = seed_manifest();
6220            let fs8 = MemFs::new().with_file("old.mp3", b"x".to_vec());
6221            let out8 = run_gated_fs(
6222                &plan,
6223                &mut m8,
6224                &desireds,
6225                &GatedHttp::new(http()),
6226                &fs8,
6227                &opts_with(8),
6228            );
6229
6230            assert_eq!(out1, out8, "outcome must not depend on concurrency");
6231            assert_eq!(m1, m8, "final manifest must not depend on concurrency");
6232            assert_eq!(out8.downloaded, 4);
6233            assert_eq!(out8.deleted, 1);
6234            assert_eq!(out8.skipped, 1);
6235            assert_eq!(out8.failed(), 1);
6236        }
6237
6238        #[test]
6239        fn a_systemic_disk_full_aborts_promptly() {
6240            let count = 8;
6241            let concurrency = 2;
6242            let mut scripted = ScriptedHttp::new().with_auth();
6243            let mut actions = Vec::new();
6244            let mut desireds = Vec::new();
6245            for i in 0..count {
6246                let id = format!("d{i}");
6247                scripted = scripted.route(&format!("{id}.mp3"), Reply::ok(b"mp3-body".to_vec()));
6248                let (_c, d, action) = download(&id, AudioFormat::Mp3);
6249                actions.push(action);
6250                desireds.push(d);
6251            }
6252            // The very first clip's write hits ENOSPC, a systemic failure.
6253            let fs = MemFs::new().fail_write_out_of_space("d0.mp3");
6254            let http = GatedHttp::new(scripted);
6255            let plan = Plan { actions };
6256            let mut manifest = Manifest::new();
6257
6258            let outcome = run_gated_fs(
6259                &plan,
6260                &mut manifest,
6261                &desireds,
6262                &http,
6263                &fs,
6264                &opts_with(concurrency),
6265            );
6266
6267            assert_eq!(outcome.status, RunStatus::DiskFull);
6268            assert!(
6269                outcome.downloaded < count,
6270                "a systemic abort must stop remaining work, downloaded {}",
6271                outcome.downloaded
6272            );
6273        }
6274
6275        #[test]
6276        fn limiter_records_a_rate_limit_under_concurrent_calls() {
6277            // Three concurrent FLAC renders; exactly one clip is throttled once
6278            // on its wav_file read. The shared limiter must record that single
6279            // 429 (halving 2.0 -> 1.0) with no lost or duplicated update, proving
6280            // the mutex keeps the AIMD state correct under concurrency.
6281            let scripted = ScriptedHttp::new()
6282                .with_auth()
6283                .route_seq(
6284                    "/gen/x/wav_file/",
6285                    vec![
6286                        Reply::status(429),
6287                        Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/x.wav"}"#),
6288                    ],
6289                )
6290                .route(
6291                    "/gen/y/wav_file/",
6292                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/y.wav"}"#),
6293                )
6294                .route(
6295                    "/gen/z/wav_file/",
6296                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/z.wav"}"#),
6297                )
6298                .route("x.wav", Reply::ok(b"wav-x".to_vec()))
6299                .route("y.wav", Reply::ok(b"wav-y".to_vec()))
6300                .route("z.wav", Reply::ok(b"wav-z".to_vec()));
6301
6302            let mut actions = Vec::new();
6303            let mut desireds = Vec::new();
6304            for id in ["x", "y", "z"] {
6305                let (_c, d, action) = download(id, AudioFormat::Flac);
6306                actions.push(action);
6307                desireds.push(d);
6308            }
6309            let plan = Plan { actions };
6310            let fs = MemFs::new();
6311            let ffmpeg = StubFfmpeg::flac();
6312            let clock = RecordingClock::new();
6313            let mut albums = BTreeMap::new();
6314            let mut playlists = BTreeMap::new();
6315            let mut manifest = Manifest::new();
6316            let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
6317
6318            let outcome = pollster::block_on(execute(
6319                &plan,
6320                &mut manifest,
6321                &mut albums,
6322                &mut playlists,
6323                &desireds,
6324                &HashMap::new(),
6325                Ports {
6326                    client: &mut client,
6327                    http: &scripted,
6328                    fs: &fs,
6329                    ffmpeg: &ffmpeg,
6330                    clock: &clock,
6331                },
6332                &opts_with(3),
6333            ));
6334
6335            assert_eq!(outcome.downloaded, 3);
6336            assert_eq!(outcome.failed(), 0);
6337            assert!(
6338                (client.limiter_rate() - 1.0).abs() < 1e-9,
6339                "one 429 must halve the rate to 1.0, got {}",
6340                client.limiter_rate()
6341            );
6342        }
6343
6344        #[test]
6345        fn a_download_is_committed_in_plan_order_around_a_rename() {
6346            // Plan order: rename "orig" away from shared.mp3 first, then download
6347            // a new clip into shared.mp3. A parallel executor that performed the
6348            // download's destination write off plan order would write shared.mp3
6349            // before the rename ran, letting the rename carry those fresh bytes
6350            // to moved.mp3 and stranding shared.mp3 - corrupting both clips.
6351            // Committing every destination effect serially in plan order keeps
6352            // moved.mp3 = the original and shared.mp3 = the new download.
6353            let c_new = clip("new");
6354            let mut d_new = desired(c_new.clone(), AudioFormat::Mp3);
6355            d_new.path = "shared.mp3".to_owned();
6356            let plan = Plan {
6357                actions: vec![
6358                    Action::Rename {
6359                        from: "shared.mp3".to_owned(),
6360                        to: "moved.mp3".to_owned(),
6361                    },
6362                    Action::Download {
6363                        clip: c_new.clone(),
6364                        lineage: LineageContext::own_root(&c_new),
6365                        path: "shared.mp3".to_owned(),
6366                        format: AudioFormat::Mp3,
6367                    },
6368                ],
6369            };
6370            let scripted = ScriptedHttp::new()
6371                .with_auth()
6372                .route("new.mp3", Reply::ok(b"NEW-BODY".to_vec()));
6373            let http = GatedHttp::new(scripted);
6374            let fs = MemFs::new().with_file("shared.mp3", b"ORIGINAL".to_vec());
6375            let mut manifest = Manifest::new();
6376            manifest.insert("orig", entry("shared.mp3", AudioFormat::Mp3));
6377
6378            let outcome = run_gated_fs(&plan, &mut manifest, &[d_new], &http, &fs, &opts_with(4));
6379
6380            assert_eq!(outcome.renamed, 1);
6381            assert_eq!(outcome.downloaded, 1);
6382            assert_eq!(
6383                fs.read_file("moved.mp3").as_deref(),
6384                Some(&b"ORIGINAL"[..]),
6385                "the rename must carry the original bytes, untouched by the download"
6386            );
6387            let landed = fs.read_file("shared.mp3").expect("new download must land");
6388            assert_ne!(
6389                landed, b"ORIGINAL",
6390                "the new download must replace the moved original, not corrupt it"
6391            );
6392            assert_eq!(manifest.get("orig").unwrap().path, "moved.mp3");
6393            assert_eq!(manifest.get("new").unwrap().path, "shared.mp3");
6394        }
6395
6396        #[test]
6397        fn an_aborted_reformat_leaves_the_old_file_and_manifest_consistent() {
6398            // A systemic disk-full abort strikes the download committed before the
6399            // reformat. Because the reformat's slow render is side-effect-free and
6400            // its destination write + old-file removal only happen in the serial
6401            // commit (which the abort skips), the old file survives and the
6402            // manifest still points at it: no removed-but-referenced file.
6403            let boom = clip("boom");
6404            let mut d_boom = desired(boom.clone(), AudioFormat::Mp3);
6405            d_boom.path = "boom.mp3".to_owned();
6406            let reformer = clip("r");
6407            let d_reformer = desired(reformer.clone(), AudioFormat::Mp3);
6408            let plan = Plan {
6409                actions: vec![
6410                    Action::Download {
6411                        clip: boom.clone(),
6412                        lineage: LineageContext::own_root(&boom),
6413                        path: "boom.mp3".to_owned(),
6414                        format: AudioFormat::Mp3,
6415                    },
6416                    Action::Reformat {
6417                        clip: reformer.clone(),
6418                        path: "r_new.mp3".to_owned(),
6419                        from_path: "r_old.flac".to_owned(),
6420                        from: AudioFormat::Flac,
6421                        to: AudioFormat::Mp3,
6422                    },
6423                ],
6424            };
6425            let scripted = ScriptedHttp::new()
6426                .with_auth()
6427                .route("boom.mp3", Reply::ok(b"boom-body".to_vec()))
6428                .route("r.mp3", Reply::ok(b"reformatted".to_vec()));
6429            let http = GatedHttp::new(scripted);
6430            // The download's write hits ENOSPC, a systemic abort.
6431            let fs = MemFs::new()
6432                .with_file("r_old.flac", b"OLD-FLAC".to_vec())
6433                .fail_write_out_of_space("boom.mp3");
6434            let mut manifest = Manifest::new();
6435            manifest.insert("r", entry("r_old.flac", AudioFormat::Flac));
6436
6437            let outcome = run_gated_fs(
6438                &plan,
6439                &mut manifest,
6440                &[d_boom, d_reformer],
6441                &http,
6442                &fs,
6443                &opts_with(4),
6444            );
6445
6446            assert_eq!(outcome.status, RunStatus::DiskFull);
6447            assert!(
6448                fs.exists("r_old.flac"),
6449                "the old file must survive the abort"
6450            );
6451            assert!(
6452                !fs.exists("r_new.mp3"),
6453                "no reformatted file may be written"
6454            );
6455            let still = manifest.get("r").expect("the manifest must still track r");
6456            assert_eq!(
6457                still.path, "r_old.flac",
6458                "the manifest must still point at the surviving old file"
6459            );
6460            assert_eq!(still.format, AudioFormat::Flac);
6461        }
6462
6463        #[test]
6464        fn a_systemic_abort_leaves_no_untracked_destination_files() {
6465            // Two clips commit, the third's write hits ENOSPC (a systemic abort),
6466            // and the rest never commit. Every file remaining on disk must be one
6467            // the manifest tracks: producers write nothing, so an abort cannot
6468            // strand an untracked file from an in-flight or buffered render.
6469            let mut scripted = ScriptedHttp::new().with_auth();
6470            let mut actions = Vec::new();
6471            let mut desireds = Vec::new();
6472            for id in ["a0", "a1", "boom", "a3", "a4"] {
6473                scripted = scripted.route(&format!("{id}.mp3"), Reply::ok(b"body".to_vec()));
6474                let (_c, d, action) = download(id, AudioFormat::Mp3);
6475                actions.push(action);
6476                desireds.push(d);
6477            }
6478            let http = GatedHttp::new(scripted);
6479            let fs = MemFs::new().fail_write_out_of_space("boom.mp3");
6480            let plan = Plan { actions };
6481            let mut manifest = Manifest::new();
6482
6483            let outcome = run_gated_fs(&plan, &mut manifest, &desireds, &http, &fs, &opts_with(2));
6484
6485            assert_eq!(outcome.status, RunStatus::DiskFull);
6486            let tracked: std::collections::BTreeSet<String> = manifest
6487                .entries
6488                .values()
6489                .map(|entry| entry.path.clone())
6490                .collect();
6491            for path in fs.paths() {
6492                assert!(
6493                    tracked.contains(&path),
6494                    "found an untracked destination file: {path}"
6495                );
6496            }
6497            assert!(
6498                !fs.exists("a3.mp3"),
6499                "uncommitted renders must not be on disk"
6500            );
6501            assert!(
6502                !fs.exists("a4.mp3"),
6503                "uncommitted renders must not be on disk"
6504            );
6505        }
6506
6507        /// An [`Ffmpeg`] double that counts how many rendered FLAC payloads are
6508        /// live: it bumps a shared counter (tracking the peak) when a transcode
6509        /// yields bytes, and [`CountingFs`] drops it back on the committing write.
6510        /// The [transcode, write] window is a superset of the true in-memory hold,
6511        /// so the observed peak upper-bounds the real one.
6512        struct CountingFfmpeg {
6513            inner: StubFfmpeg,
6514            held: Arc<AtomicUsize>,
6515            peak: Arc<AtomicUsize>,
6516        }
6517
6518        impl Ffmpeg for CountingFfmpeg {
6519            fn wav_to_flac(
6520                &self,
6521                wav: &[u8],
6522            ) -> impl Future<Output = Result<Vec<u8>, FfmpegError>> + Send {
6523                let fut = self.inner.wav_to_flac(wav);
6524                let held = self.held.clone();
6525                let peak = self.peak.clone();
6526                async move {
6527                    let out = fut.await;
6528                    if out.is_ok() {
6529                        let now = held.fetch_add(1, Ordering::SeqCst) + 1;
6530                        peak.fetch_max(now, Ordering::SeqCst);
6531                    }
6532                    out
6533                }
6534            }
6535
6536            fn mp4_to_webp(
6537                &self,
6538                mp4: &[u8],
6539                settings: WebpEncodeSettings,
6540            ) -> impl Future<Output = Result<Vec<u8>, FfmpegError>> + Send {
6541                self.inner.mp4_to_webp(mp4, settings)
6542            }
6543        }
6544
6545        /// A [`Filesystem`] double wrapping [`MemFs`] that decrements the live
6546        /// payload counter on each committing write, closing the window opened by
6547        /// [`CountingFfmpeg`].
6548        struct CountingFs {
6549            inner: MemFs,
6550            held: Arc<AtomicUsize>,
6551        }
6552
6553        impl Filesystem for CountingFs {
6554            fn write_atomic(&self, path: &str, bytes: &[u8]) -> Result<(), FsError> {
6555                let out = self.inner.write_atomic(path, bytes);
6556                self.held.fetch_sub(1, Ordering::SeqCst);
6557                out
6558            }
6559
6560            fn rename(&self, from: &str, to: &str) -> Result<(), FsError> {
6561                self.inner.rename(from, to)
6562            }
6563
6564            fn remove(&self, path: &str) -> Result<(), FsError> {
6565                self.inner.remove(path)
6566            }
6567
6568            fn prune_empty_dirs(&self, root: &str) -> Result<(), FsError> {
6569                self.inner.prune_empty_dirs(root)
6570            }
6571
6572            fn read(&self, path: &str) -> Result<Vec<u8>, FsError> {
6573                self.inner.read(path)
6574            }
6575
6576            fn metadata(&self, path: &str) -> Option<FileStat> {
6577                self.inner.metadata(path)
6578            }
6579        }
6580
6581        #[test]
6582        fn rendered_payloads_in_memory_stay_bounded_by_concurrency() {
6583            // Far more FLAC clips than the concurrency bound. The ordered buffered
6584            // render keeps at most about `concurrency` transcoded payloads live at
6585            // once (never the whole library), so peak held <= concurrency + 1.
6586            let count = 12;
6587            let concurrency = 3;
6588            let mut scripted = ScriptedHttp::new().with_auth();
6589            let mut actions = Vec::new();
6590            let mut desireds = Vec::new();
6591            for i in 0..count {
6592                let id = format!("f{i}");
6593                scripted = scripted
6594                    .route(
6595                        &format!("/gen/{id}/wav_file/"),
6596                        Reply::json(&format!(
6597                            r#"{{"wav_file_url": "https://cdn1.suno.ai/{id}.wav"}}"#
6598                        )),
6599                    )
6600                    .route(&format!("{id}.wav"), Reply::ok(b"wav-body".to_vec()));
6601                let (_c, d, action) = download(&id, AudioFormat::Flac);
6602                actions.push(action);
6603                desireds.push(d);
6604            }
6605            let http = GatedHttp::new(scripted);
6606            let held = Arc::new(AtomicUsize::new(0));
6607            let peak = Arc::new(AtomicUsize::new(0));
6608            let ffmpeg = CountingFfmpeg {
6609                inner: StubFfmpeg::flac(),
6610                held: held.clone(),
6611                peak: peak.clone(),
6612            };
6613            let fs = CountingFs {
6614                inner: MemFs::new(),
6615                held: held.clone(),
6616            };
6617            let clock = RecordingClock::new();
6618            let mut albums = BTreeMap::new();
6619            let mut playlists = BTreeMap::new();
6620            let mut manifest = Manifest::new();
6621            let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
6622            let plan = Plan { actions };
6623
6624            let outcome = pollster::block_on(execute(
6625                &plan,
6626                &mut manifest,
6627                &mut albums,
6628                &mut playlists,
6629                &desireds,
6630                &HashMap::new(),
6631                Ports {
6632                    client: &mut client,
6633                    http: &http,
6634                    fs: &fs,
6635                    ffmpeg: &ffmpeg,
6636                    clock: &clock,
6637                },
6638                &opts_with(concurrency),
6639            ));
6640
6641            assert_eq!(outcome.downloaded, count as usize);
6642            assert_eq!(
6643                held.load(Ordering::SeqCst),
6644                0,
6645                "every payload must be committed"
6646            );
6647            assert!(
6648                peak.load(Ordering::SeqCst) <= concurrency as usize + 1,
6649                "peak live payloads {} exceeded the bound {}",
6650                peak.load(Ordering::SeqCst),
6651                concurrency + 1
6652            );
6653            assert!(
6654                peak.load(Ordering::SeqCst) >= 2,
6655                "the render should genuinely overlap, peak was {}",
6656                peak.load(Ordering::SeqCst)
6657            );
6658        }
6659    }
6660}