Skip to main content

suno_core/
executor.rs

1//! The download executor: it applies a reconcile [`Plan`] to disk through ports.
2//!
3//! Reconcile decides *what* to do; the executor does it. It is async and pure
4//! orchestration: every side effect goes through a port ([`Http`] for the
5//! network, [`Filesystem`] for disk, [`Ffmpeg`] for transcoding, [`Clock`] for
6//! waiting), so the whole pipeline is exercised in tests with in-memory doubles
7//! and no real IO, network, or sleeping.
8//!
9//! Safety is the point of this module. A wrong write or delete damages the
10//! user's library, so the executor:
11//!
12//! - writes only atomically (SYNC-13): a failed write leaves the prior file
13//!   intact, because the [`Filesystem`] adapter stages a temp file and renames;
14//! - verifies size (SYNC-14): a download whose body disagrees with the
15//!   provider's `Content-Length` is treated as truncated and retried, and a
16//!   written file whose on-disk size disagrees with the bytes written is a
17//!   failure, never a recorded success;
18//! - classifies errors (SYNC-17): an auth failure or a full disk stops the
19//!   account run (with an auth or disk-full status) and is never retried;
20//!   transient failures (timeouts, 5xx,
21//!   transport, 429) are retried a bounded number of times then recorded and
22//!   skipped; permanent failures are recorded and skipped; and a single clip's
23//!   failure never aborts the run;
24//! - backs off on rate limits (SYNC-16) through the injected [`Clock`], honouring
25//!   a `Retry-After` hint.
26//!
27//! The executor only ever sets the manifest's [`preserve`](ManifestEntry::preserve)
28//! marker on an entry it writes, and only deletes a path whose removal the
29//! [`Filesystem`] confirms. Higher-level safety (empty-listing abort, the
30//! destructive-sync confirmation, exit codes) is the caller's job.
31
32use std::collections::BTreeMap;
33use std::collections::BTreeSet;
34use std::collections::HashMap;
35use std::collections::HashSet;
36use std::sync::Mutex;
37use std::time::Duration;
38
39use futures_util::lock::Mutex as AsyncMutex;
40use futures_util::stream::{self, StreamExt};
41
42use crate::backoff::{backoff_delay, retry_after};
43use crate::client::SunoClient;
44use crate::clock::Clock;
45use crate::config::{AudioFormat, StemFormat};
46use crate::error::Error;
47use crate::ffmpeg::{Ffmpeg, WebpEncodeSettings};
48use crate::fs::Filesystem;
49use crate::graph::{AlbumArt, PlaylistState};
50use crate::http::{Http, HttpRequest};
51use crate::lineage::LineageContext;
52use crate::lyrics::AlignedLyrics;
53use crate::manifest::{ArtifactState, Manifest, ManifestEntry};
54use crate::model::Clip;
55use crate::reconcile::{
56    Action, ArtifactKind, Desired, Plan, SourceMode, set_manifest_artifact, set_manifest_stem,
57};
58use crate::tag::{TrackMetadata, tag_flac, tag_mp3, tag_wav};
59
/// The shared Suno client behind an async mutex, so concurrent audio work can
/// serialise its order-sensitive API calls (JWT refresh, adaptive limiter)
/// without a runtime-specific lock. Held only for the brief WAV-render calls;
/// the heavy CDN/transcode/tag work runs unlocked.
///
/// The serial stem handlers (`write_stem` / `move_stem`) are handed the same
/// lock by `apply`, so they share this one mutex with the audio producers.
type ClientLock<'a, C> = AsyncMutex<&'a mut SunoClient<C>>;
65
/// Tunables for one [`execute`] run.
///
/// The [`Default`] values are 3 retries, 24 WAV polls at 5-second intervals,
/// 4 concurrent clips, and the default WebP encode settings.
#[derive(Debug, Clone)]
pub struct ExecOptions {
    /// How many times a transient failure is retried before record-and-skip.
    pub max_retries: u32,
    /// How many times to poll for a server-side WAV render before giving up.
    pub wav_poll_attempts: u32,
    /// How long to wait between WAV render polls.
    pub wav_poll_interval: Duration,
    /// How many clips' audio to fetch, transcode, and tag concurrently. Clamped
    /// to at least one, so a zero collapses to sequential rather than stalling.
    pub concurrency: u32,
    /// Settings used for animated WebP cover transcodes.
    pub cover_webp: WebpEncodeSettings,
}
81
82impl Default for ExecOptions {
83    fn default() -> Self {
84        Self {
85            max_retries: 3,
86            wav_poll_attempts: 24,
87            wav_poll_interval: Duration::from_secs(5),
88            concurrency: 4,
89            cover_webp: WebpEncodeSettings::default(),
90        }
91    }
92}
93
/// How an [`execute`] run ended.
///
/// `Completed` is the default; the two abort variants are set when a failure
/// classified as auth or disk-full stops the action loop early.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum RunStatus {
    /// Every action was attempted; some may have failed and been skipped.
    #[default]
    Completed,
    /// An auth failure stopped the run early; remaining actions were not tried.
    AuthAborted,
    /// The disk filled; the run stopped early rather than failing every
    /// remaining clip. Remaining actions were not tried.
    DiskFull,
}
106
/// One action that could not be applied, for the run summary and failure log.
///
/// [`execute`] pushes one of these per failed action, including the failure
/// that triggers an abort.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Failure {
    /// The clip the failed action concerned (or a path when no id applies).
    pub clip_id: String,
    /// A short, secret-free reason.
    pub reason: String,
}
115
/// The result of applying a [`Plan`]: per-action counts and the failure list.
///
/// Each counter is bumped once per action that succeeded with the matching
/// effect; failed actions are counted only through `failures`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ExecOutcome {
    /// Audio actions that completed as a download.
    pub downloaded: usize,
    /// Audio actions that completed as a reformat.
    pub reformatted: usize,
    /// Actions that completed as a retag.
    pub retagged: usize,
    /// Actions that completed as a rename.
    pub renamed: usize,
    /// Actions that completed as a delete.
    pub deleted: usize,
    /// Actions that completed as a skip.
    pub skipped: usize,
    /// Actions that completed as an artifact write.
    pub artifacts_written: usize,
    /// Actions that completed as an artifact delete.
    pub artifacts_deleted: usize,
    /// Actions that failed and were skipped (auth, disk-full,
    /// transient-exhausted, or permanent). The run continued past each one
    /// unless it was an auth or disk-full abort.
    pub failures: Vec<Failure>,
    /// How the run ended.
    pub status: RunStatus,
}
134
135impl ExecOutcome {
136    /// Number of failed actions.
137    pub fn failed(&self) -> usize {
138        self.failures.len()
139    }
140
141    fn record(&mut self, effect: Effect) {
142        match effect {
143            Effect::Downloaded => self.downloaded += 1,
144            Effect::Reformatted => self.reformatted += 1,
145            Effect::Retagged => self.retagged += 1,
146            Effect::Renamed => self.renamed += 1,
147            Effect::Deleted => self.deleted += 1,
148            Effect::Skipped => self.skipped += 1,
149            Effect::ArtifactWritten => self.artifacts_written += 1,
150            Effect::ArtifactDeleted => self.artifacts_deleted += 1,
151        }
152    }
153}
154
/// The IO ports the executor drives, grouped so one value threads them through.
///
/// `client` is the only `&mut` port: it performs the authenticated WAV render
/// flow and so mutates its cached session. The rest are shared references.
/// [`execute`] destructures this value and wraps `client` in an async mutex for
/// the duration of the run.
pub struct Ports<'a, H, F, G, C> {
    /// Performs the authenticated WAV render and poll flow.
    pub client: &'a mut SunoClient<C>,
    /// The public network port (CDN audio, rendered WAV, cover art).
    pub http: &'a H,
    /// The disk port.
    pub fs: &'a F,
    /// The transcode port (WAV to FLAC).
    // NOTE(review): `ExecOptions::cover_webp` suggests this port also performs
    // the animated WebP cover transcodes - confirm and widen the doc if so.
    pub ffmpeg: &'a G,
    /// The backoff and poll delay port.
    pub clock: &'a C,
}
171
/// Apply `plan` to disk, updating `manifest`, `albums`, and `playlists` in
/// place, and return the outcome.
///
/// `desired` carries the per-clip metadata and art hashes plus the source modes
/// that decide the [`preserve`](ManifestEntry::preserve) marker; it is indexed
/// by clip id (and by target path, for renames) so each written entry records
/// the right hashes and protection. `albums` is the album-art store, keyed by
/// stable root id: folder-art writes and deletes record their state there rather
/// than on the per-clip `manifest`. `playlists` is the playlist store, handed to
/// the artifact handlers alongside `albums`. `ports` bundles the authenticated
/// client and the network, disk, transcode, and backoff ports. A single clip's
/// failure never aborts the run, except an auth failure or a full disk, which
/// stop it with [`RunStatus::AuthAborted`] or [`RunStatus::DiskFull`].
///
/// The audio-producing actions ([`Download`](Action::Download) and
/// [`Reformat`](Action::Reformat)) render concurrently, bounded by
/// [`ExecOptions::concurrency`]: their slow parts (WAV render, CDN download,
/// transcode, tag) overlap while the order-sensitive Suno API calls are
/// serialised behind an async mutex over the shared [`SunoClient`], keeping the
/// adaptive limiter and JWT refresh correct. The remaining actions (retag,
/// rename, delete, and artifact writes/deletes) run serially, and the audio
/// results are committed serially alongside them, all in plan order.
///
/// The outcome is deterministic regardless of completion order: concurrent audio
/// results are committed to the manifest in plan-index order, so the same plan
/// always yields the same manifest and counts whatever the concurrency level. A
/// per-clip failure is recorded and the run continues; only an auth failure or a
/// full disk aborts, and it does so promptly by stopping further audio work.
///
/// `synced` carries this run's fetched aligned (synced) lyrics keyed by clip id;
/// it is the caller's IO result, not part of the pure plan. Audio tagging embeds
/// a clip's entry as an MP3 `SYLT` frame and as the plain `USLT`/`LYRICS` text
/// (FLAC), so a clip absent from the map (an instrumental, a WAV target, or a
/// run with the feature off) is tagged exactly as before. The synced `.lrc`
/// sidecar itself is a generated artifact whose body the caller has already
/// resolved into the plan, so it is written like any other text sidecar.
// The stores, lookups, and ports are separate borrows with different
// mutability, so they are passed individually rather than bundled.
#[allow(clippy::too_many_arguments)]
pub async fn execute<H, F, G, C>(
    plan: &Plan,
    manifest: &mut Manifest,
    albums: &mut BTreeMap<String, AlbumArt>,
    playlists: &mut BTreeMap<String, PlaylistState>,
    desired: &[Desired],
    synced: &HashMap<String, AlignedLyrics>,
    ports: Ports<'_, H, F, G, C>,
    opts: &ExecOptions,
) -> ExecOutcome
where
    H: Http,
    F: Filesystem,
    G: Ffmpeg,
    C: Clock,
{
    let Ports {
        client,
        http,
        fs,
        ffmpeg,
        clock,
    } = ports;
    // Lookup tables over `desired`: by clip id and by target path.
    let by_id: HashMap<&str, &Desired> = desired.iter().map(|d| (d.clip.id.as_str(), d)).collect();
    let by_path: HashMap<&str, &Desired> = desired.iter().map(|d| (d.path.as_str(), d)).collect();
    // How many tracked artifact slots reference each path. The inline old-path
    // cleanup removes a path only once nothing else holds it: each slot that
    // moves away decrements its reference, and the removal fires only when the
    // count reaches zero and no action writes the path this run. This keeps a
    // live file a co-referencing slot still owns (a prior failed swap can leave
    // two clips sharing a path) while letting the last slot to leave reclaim it,
    // so nothing is orphaned either (#76).
    let mut tracked_paths: HashMap<String, u32> = HashMap::new();
    for (_, entry) in manifest.iter() {
        for path in entry.artifact_paths() {
            *tracked_paths.entry(path.to_owned()).or_default() += 1;
        }
    }
    for art in albums.values() {
        for state in [
            art.folder_jpg.as_ref(),
            art.folder_webp.as_ref(),
            art.folder_mp4.as_ref(),
        ]
        .into_iter()
        .flatten()
        {
            *tracked_paths.entry(state.path.clone()).or_default() += 1;
        }
    }
    for playlist in playlists.values() {
        *tracked_paths.entry(playlist.path.clone()).or_default() += 1;
    }
    // Static cover art is otherwise fetched twice per clip (#89): once to embed
    // in the audio tag and once for the per-song `.jpg` sidecar, both from the
    // same CDN URL. The audio producer caches each cover it embeds here, keyed by
    // URL, and the sidecar write drains it rather than re-fetching. Only URLs a
    // `CoverJpg` sidecar will fetch this run are cached, and the sidecar removes
    // its entry on use, so the map holds at most the covers for the clips in
    // flight (bounded by `concurrency`), never the whole library.
    let cover_wanted: HashSet<&str> = plan
        .actions
        .iter()
        .filter_map(|action| match action {
            Action::WriteArtifact {
                kind: ArtifactKind::CoverJpg,
                source_url,
                ..
            } if !source_url.is_empty() => Some(source_url.as_str()),
            _ => None,
        })
        .collect();
    let cover_cache: Mutex<HashMap<String, Vec<u8>>> = Mutex::new(HashMap::new());
    // The `both` video-cover retention keeps `cover.webp` (transcoded) and
    // `cover.mp4` (raw) for an album from the SAME `video_cover_url`. Cache that
    // source on its first fetch so the second folder artifact drains it rather
    // than fetching the same MP4 twice (#90 reuses the #89 fetch-once path).
    let mut folder_cover_uses: HashMap<&str, u32> = HashMap::new();
    for action in &plan.actions {
        if let Action::WriteArtifact {
            kind: ArtifactKind::FolderWebp | ArtifactKind::FolderMp4,
            source_url,
            ..
        } = action
            && !source_url.is_empty()
        {
            *folder_cover_uses.entry(source_url.as_str()).or_default() += 1;
        }
    }
    // Only a URL used by more than one folder artifact is worth caching.
    let shared_cover_urls: HashSet<&str> = folder_cover_uses
        .into_iter()
        .filter(|(_, uses)| *uses > 1)
        .map(|(url, _)| url)
        .collect();
    let ctx = Ctx {
        http,
        fs,
        ffmpeg,
        clock,
        opts,
        by_id: &by_id,
        by_path: &by_path,
        synced,
        cover_cache: &cover_cache,
        cover_wanted: &cover_wanted,
        shared_cover_urls: &shared_cover_urls,
    };

    let mut outcome = ExecOutcome::default();
    // Destinations whose write has actually committed this run, gating old-path
    // cleanup so a vacated sidecar/stem is kept only when a *successful* write
    // also targets it (#142). Serial commit order makes this a clean prefix.
    let mut committed: BTreeSet<String> = BTreeSet::new();

    // The audio-producing actions ([`Download`](Action::Download) /
    // [`Reformat`](Action::Reformat)) render concurrently, but their work is
    // deliberately split so that NO destination write, file removal, or manifest
    // update happens off the plan's order:
    //
    // - the parallel producers ([`prepare_audio`](Ctx::prepare_audio)) do only
    //   the slow, side-effect-free work (fetch the CDN/WAV bytes, transcode, and
    //   tag), returning the tagged bytes; and
    // - a single serial committer below writes those bytes to the destination,
    //   removes any superseded file, and records the manifest entry, in strict
    //   plan-index order, interleaved with the non-audio actions.
    //
    // The shared client is the only `&mut` port and its API calls must stay
    // ordered, so it rides behind an async mutex; each producer locks it only for
    // the brief WAV-render calls and runs the heavy work unlocked. Renders are
    // yielded in plan order and bounded to `concurrency` in flight (and buffered),
    // so at most about `concurrency` tagged payloads are ever held in memory -
    // never the whole library.
    //
    // NOTE(review): `buffered` only drives the in-flight producers while
    // `renders.next()` is being awaited; while the loop below awaits
    // `ctx.apply(..)` they are parked. If a parked producer can be holding
    // `client_lock` at that point and the applied handler (the stem handlers
    // receive the same lock) awaits it, neither side can make progress. Confirm
    // that cannot happen, or drive the stream alongside `apply`.
    let client_lock = AsyncMutex::new(client);
    let concurrency = opts.concurrency.max(1) as usize;
    let ctx_ref = &ctx;
    let client_lock_ref = &client_lock;
    let mut renders = stream::iter(
        plan.actions
            .iter()
            .filter(|action| is_audio_action(action))
            .map(|action| async move { ctx_ref.prepare_audio(client_lock_ref, action).await }),
    )
    .buffered(concurrency);

    for action in &plan.actions {
        // Audio actions pull their pre-rendered bytes (yielded in plan order) and
        // commit them here; every other action applies its own effect. Both the
        // audio commit and the non-audio apply run serially, so all destination
        // and manifest effects keep the plan's order exactly as the sequential
        // executor did.
        let result = if is_audio_action(action) {
            match renders.next().await {
                Some(Ok(rendered)) => ctx.commit_audio(manifest, rendered),
                Some(Err(fail)) => Err(fail),
                None => unreachable!("buffered yields one result per audio action"),
            }
        } else {
            ctx.apply(
                client_lock_ref,
                action,
                manifest,
                albums,
                playlists,
                &mut tracked_paths,
                &committed,
            )
            .await
        };
        match result {
            Ok(effect) => {
                outcome.record(effect);
                // Record this action's destination now that its write succeeded.
                // A later action vacating a path removes it only when no
                // *committed* write also targets it; commit is strictly serial in
                // plan order, so a planned-but-failed or not-yet-run write never
                // protects a stale file from cleanup (#142).
                if let Some(dest) = written_path(action) {
                    committed.insert(dest.to_owned());
                }
            }
            Err(fail) => {
                // Read the class before `fail` is moved into the failure record.
                let abort = abort_status(fail.class);
                outcome.failures.push(Failure {
                    clip_id: fail.clip_id,
                    reason: fail.reason,
                });
                if let Some(status) = abort {
                    // A systemic abort stops the run. Dropping the render stream
                    // cancels any in-flight or completed-but-uncommitted producer;
                    // because producers touch nothing on disk, the destination and
                    // manifest are left exactly as the committed prefix wrote them,
                    // with no untracked files and no removed-but-referenced file.
                    outcome.status = status;
                    break;
                }
            }
        }
    }
    drop(renders);

    // Renames and deletes can leave an album directory empty; prune those ghost
    // directories bottom-up. This runs on both the completed and the aborted
    // paths, and is best-effort: a prune failure is only a missed tidy that the
    // next run repeats, never a reason to fail the run.
    let _ = fs.prune_empty_dirs("");
    outcome
}
414
/// Whether an action produces audio: it fetches, transcodes, and tags a clip's
/// file. Its slow render runs in the concurrent phase; its destination write and
/// manifest update are committed serially in plan order. Everything else touches
/// the manifest, album, or playlist stores directly and runs serially.
///
/// Returns `true` only for [`Action::Download`] and [`Action::Reformat`].
/// [`execute`] uses it both to build the render stream and to route each action.
fn is_audio_action(action: &Action) -> bool {
    matches!(action, Action::Download { .. } | Action::Reformat { .. })
}
422
423/// The destination path an action writes on success, or `None` for actions that
424/// write no file (skips, deletes). The serial committer records this once the
425/// action succeeds, so a later action vacating that same path keeps it rather
426/// than removing a freshly written file (#142, #76).
427fn written_path(action: &Action) -> Option<&str> {
428    match action {
429        Action::Download { path, .. }
430        | Action::Reformat { path, .. }
431        | Action::WriteArtifact { path, .. }
432        | Action::WriteStem { path, .. } => Some(path),
433        Action::Rename { to, .. }
434        | Action::MoveArtifact { to, .. }
435        | Action::MoveStem { to, .. } => Some(to),
436        _ => None,
437    }
438}
439
/// A rendered-but-uncommitted audio result: the tagged bytes plus what the serial
/// committer needs to place them. Produced concurrently and side-effect-free (no
/// destination write, no removal, no manifest touch);
/// [`commit_audio`](Ctx::commit_audio) applies all of those in plan order.
struct RenderedAudio {
    /// The clip the audio belongs to.
    clip_id: String,
    /// The destination path the bytes must be written to.
    path: String,
    /// The audio format of the action that produced the bytes.
    format: AudioFormat,
    /// The superseded file to remove after the new one lands (a
    /// [`Reformat`](Action::Reformat)), or `None` for a plain
    /// [`Download`](Action::Download).
    from_path: Option<String>,
    /// The effect to report once the commit succeeds.
    effect: Effect,
    /// The fully tagged file contents.
    bytes: Vec<u8>,
}
454
/// What an applied action did, for the outcome counters.
///
/// Each variant maps to exactly one [`ExecOutcome`] counter.
enum Effect {
    /// Counted in `downloaded`.
    Downloaded,
    /// Counted in `reformatted`.
    Reformatted,
    /// Counted in `retagged`.
    Retagged,
    /// Counted in `renamed`.
    Renamed,
    /// Counted in `deleted`.
    Deleted,
    /// Counted in `skipped`.
    Skipped,
    /// Counted in `artifacts_written`.
    ArtifactWritten,
    /// Counted in `artifacts_deleted`.
    ArtifactDeleted,
}
466
/// How a failure should be handled (SYNC-17).
///
/// `Auth` and `Disk` end the run (see [`abort_status`]); `Transient` and
/// `Permanent` are per-clip and the run continues past them.
#[derive(Debug, Clone, Copy)]
enum Class {
    /// Stop the account run; do not retry.
    Auth,
    /// Stop the account run: a full disk is systemic, like auth, so aborting
    /// beats skipping every remaining clip (each of which would first burn a
    /// server-side WAV-render budget before failing the same way).
    Disk,
    /// Retry a bounded number of times, then record and skip.
    Transient,
    /// Record and skip immediately.
    Permanent,
}
481
/// A classified action failure attributed to a clip.
///
/// [`execute`] copies `clip_id` and `reason` into a public [`Failure`] and uses
/// `class` to decide whether the run aborts.
struct Fail {
    /// How the failure is handled: abort the run, or record and continue.
    class: Class,
    /// The clip (or path) the failed action concerned.
    clip_id: String,
    /// The reason recorded in the failure list.
    reason: String,
}
488
489/// The run-ending status for a failure class, or `None` when the failure is
490/// per-clip and the run continues.
491fn abort_status(class: Class) -> Option<RunStatus> {
492    match class {
493        Class::Auth => Some(RunStatus::AuthAborted),
494        Class::Disk => Some(RunStatus::DiskFull),
495        Class::Transient | Class::Permanent => None,
496    }
497}
498
499fn auth_fail(clip_id: impl Into<String>, reason: impl Into<String>) -> Fail {
500    Fail {
501        class: Class::Auth,
502        clip_id: clip_id.into(),
503        reason: reason.into(),
504    }
505}
506
507fn transient_fail(clip_id: impl Into<String>, reason: impl Into<String>) -> Fail {
508    Fail {
509        class: Class::Transient,
510        clip_id: clip_id.into(),
511        reason: reason.into(),
512    }
513}
514
515fn permanent_fail(clip_id: impl Into<String>, reason: impl Into<String>) -> Fail {
516    Fail {
517        class: Class::Permanent,
518        clip_id: clip_id.into(),
519        reason: reason.into(),
520    }
521}
522
523fn disk_fail(clip_id: impl Into<String>, reason: impl Into<String>) -> Fail {
524    Fail {
525        class: Class::Disk,
526        clip_id: clip_id.into(),
527        reason: reason.into(),
528    }
529}
530
/// Whether an artifact kind is album-scoped folder art (owned by a root id and
/// recorded on the album store) rather than a per-clip sidecar (recorded on the
/// manifest).
///
/// Returns `true` for the three folder kinds (`FolderJpg`, `FolderWebp`,
/// `FolderMp4`) and `false` for every other kind.
fn is_album_kind(kind: ArtifactKind) -> bool {
    matches!(
        kind,
        ArtifactKind::FolderJpg | ArtifactKind::FolderWebp | ArtifactKind::FolderMp4
    )
}
540
/// True for the library-scoped playlist artifact, routed to the playlist store.
///
/// Only `ArtifactKind::Playlist` qualifies; every other kind returns `false`.
fn is_playlist_kind(kind: ArtifactKind) -> bool {
    matches!(kind, ArtifactKind::Playlist)
}
545
/// True for a per-song sidecar, whose write requires the owning clip's manifest
/// entry: the cover image (`CoverJpg`/`CoverWebp`), the details and lyrics text
/// files (`DetailsTxt`/`LyricsTxt`), the synced lyrics (`Lrc`), and the video
/// (`VideoMp4`). Album and playlist kinds are keyed by a root/playlist id that
/// is deliberately absent from the manifest.
fn is_per_clip_kind(kind: ArtifactKind) -> bool {
    matches!(
        kind,
        ArtifactKind::CoverJpg
            | ArtifactKind::CoverWebp
            | ArtifactKind::DetailsTxt
            | ArtifactKind::LyricsTxt
            | ArtifactKind::Lrc
            | ArtifactKind::VideoMp4
    )
}
560
/// Derive a playlist's display name from the file stem of its `.m3u8` path.
///
/// Playlists live at `<sanitised name>.m3u8` in the library root, so the stem
/// is the sanitised name. Reconcile only ever reads a playlist's `path` and
/// `hash`; the name recovered here is for humans, and its lossiness (the
/// sanitiser cannot be reversed) never influences a decision.
///
/// Returns an empty string when the path has no file stem.
fn playlist_name_from_path(path: &str) -> String {
    match std::path::Path::new(path).file_stem() {
        Some(stem) => stem.to_string_lossy().into_owned(),
        None => String::new(),
    }
}
573
/// A classified fetch failure, not yet attributed to a clip.
///
/// Turned into a [`Fail`] by [`attribute`](FetchError::attribute) once the
/// owning clip is known.
struct FetchError {
    /// How the failure is handled.
    class: Class,
    /// The reason carried over to the attributed failure.
    reason: String,
    /// An optional delay hint; always `None` for permanent failures.
    // NOTE(review): presumably the provider's `Retry-After` value - confirm
    // against the fetch code that builds transient errors.
    retry_after: Option<Duration>,
}
580
581impl FetchError {
582    fn transient(reason: impl Into<String>, retry_after: Option<Duration>) -> Self {
583        Self {
584            class: Class::Transient,
585            reason: reason.into(),
586            retry_after,
587        }
588    }
589
590    fn permanent(reason: impl Into<String>) -> Self {
591        Self {
592            class: Class::Permanent,
593            reason: reason.into(),
594            retry_after: None,
595        }
596    }
597
598    fn attribute(self, clip_id: &str) -> Fail {
599        Fail {
600            class: self.class,
601            clip_id: clip_id.to_owned(),
602            reason: self.reason,
603        }
604    }
605}
606
/// The shared, read-only context threaded through every action handler.
///
/// Built once per [`execute`] run from the destructured [`Ports`] and the
/// lookups derived from the plan and `desired`.
struct Ctx<'a, H, F, G, C> {
    /// The public network port.
    http: &'a H,
    /// The disk port.
    fs: &'a F,
    /// The transcode port.
    ffmpeg: &'a G,
    /// The backoff and poll delay port.
    clock: &'a C,
    /// The run's tunables.
    opts: &'a ExecOptions,
    /// The desired state indexed by clip id.
    by_id: &'a HashMap<&'a str, &'a Desired>,
    /// The desired state indexed by target path.
    by_path: &'a HashMap<&'a str, &'a Desired>,
    /// This run's fetched aligned (synced) lyrics, keyed by clip id. Audio
    /// tagging reads a clip's entry to embed an MP3 `SYLT` frame and the plain
    /// lyric text; a clip absent here is tagged exactly as before. Populated by
    /// the caller (the fetch is IO), so the engine stays free of direct IO.
    synced: &'a HashMap<String, AlignedLyrics>,
    /// Static cover art the audio producer already fetched to embed in the tag,
    /// keyed by CDN URL, so the matching per-song `.jpg` sidecar reuses it rather
    /// than fetching the same image again (#89). Only URLs a `CoverJpg` sidecar
    /// will fetch are inserted (see `cover_wanted`) and each is removed on use, so
    /// the map stays bounded to the clips in flight. A plain mutex guards it: the
    /// concurrent producers only ever insert, and the lock is never held across an
    /// await.
    cover_cache: &'a Mutex<HashMap<String, Vec<u8>>>,
    /// The cover URLs a `CoverJpg` sidecar will fetch this run. The producer caches
    /// a cover only when its URL is here, so a clip whose cover is embedded but
    /// never written as a sidecar leaves no bytes stranded in `cover_cache`.
    cover_wanted: &'a HashSet<&'a str>,
    /// Album video-cover source URLs fetched by more than one folder artifact
    /// this run. The `both` retention derives `cover.webp` (transcoded) and
    /// `cover.mp4` (raw) from the SAME `video_cover_url`; the first fetch caches
    /// the raw source here so the sibling drains it instead of re-fetching (#90
    /// reuses the #89 fetch-once path). `FolderWebp` sorts before `FolderMp4`, so
    /// the raw source is always cached before the raw sidecar reads it.
    shared_cover_urls: &'a HashSet<&'a str>,
}
641
642impl<H, F, G, C> Ctx<'_, H, F, G, C>
643where
644    H: Http,
645    F: Filesystem,
646    G: Ffmpeg,
647    C: Clock,
648{
    /// Apply one non-audio action, returning what it did or why it failed.
    ///
    /// Audio actions ([`Download`](Action::Download) /
    /// [`Reformat`](Action::Reformat)) run in the concurrent phase through
    /// [`prepare_audio`](Self::prepare_audio) and never reach here.
    ///
    /// This is pure dispatch: each variant is destructured and handed to its
    /// handler together with the stores that handler updates. `client_lock` goes
    /// only to the stem write/move handlers. `tracked_paths` and `committed` go
    /// to the handlers that may vacate an old path, except `write_stem`, which
    /// receives `committed` only.
    ///
    /// # Panics
    ///
    /// Panics if called with an audio action, which is a caller bug.
    #[allow(clippy::too_many_arguments)]
    async fn apply(
        &self,
        client_lock: &ClientLock<'_, C>,
        action: &Action,
        manifest: &mut Manifest,
        albums: &mut BTreeMap<String, AlbumArt>,
        playlists: &mut BTreeMap<String, PlaylistState>,
        tracked_paths: &mut HashMap<String, u32>,
        committed: &BTreeSet<String>,
    ) -> Result<Effect, Fail> {
        match action {
            Action::Download { .. } | Action::Reformat { .. } => {
                unreachable!("audio actions are applied in the concurrent phase")
            }
            Action::Retag {
                clip,
                lineage,
                path,
            } => self.retag(manifest, clip, lineage, path).await,
            Action::Rename { from, to } => self.rename(manifest, from, to),
            Action::Delete { path, clip_id } => self.delete(manifest, path, clip_id),
            Action::Skip { clip_id } => {
                // A skip writes no file but still calls `refresh_preserve` for
                // the clip before reporting the skip.
                self.refresh_preserve(manifest, clip_id);
                Ok(Effect::Skipped)
            }
            Action::WriteArtifact {
                kind,
                path,
                source_url,
                hash,
                owner_id,
                content,
            } => {
                self.write_artifact(
                    manifest,
                    albums,
                    playlists,
                    *kind,
                    path,
                    source_url,
                    hash,
                    owner_id,
                    content.as_deref(),
                    tracked_paths,
                    committed,
                )
                .await
            }
            Action::DeleteArtifact {
                kind,
                path,
                owner_id,
            } => self.delete_artifact(manifest, albums, playlists, *kind, path, owner_id),
            Action::MoveArtifact {
                kind,
                from,
                to,
                source_url,
                hash,
                owner_id,
            } => {
                self.move_artifact(
                    manifest,
                    albums,
                    playlists,
                    *kind,
                    from,
                    to,
                    source_url,
                    hash,
                    owner_id,
                    tracked_paths,
                    committed,
                )
                .await
            }
            Action::WriteStem {
                clip_id,
                key,
                stem_id,
                path,
                source_url,
                format,
                hash,
            } => {
                self.write_stem(
                    client_lock,
                    manifest,
                    clip_id,
                    key,
                    stem_id,
                    path,
                    source_url,
                    *format,
                    hash,
                    committed,
                )
                .await
            }
            Action::DeleteStem { clip_id, key, path } => {
                self.delete_stem(manifest, clip_id, key, path)
            }
            Action::MoveStem {
                clip_id,
                key,
                stem_id,
                from,
                to,
                source_url,
                format,
                hash,
            } => {
                self.move_stem(
                    client_lock,
                    manifest,
                    clip_id,
                    key,
                    stem_id,
                    from,
                    to,
                    source_url,
                    *format,
                    hash,
                    tracked_paths,
                    committed,
                )
                .await
            }
        }
    }
785
786    /// Render one audio action's tagged bytes, side-effect-free.
787    ///
788    /// This is the concurrent part: it fetches, transcodes, and tags the file
789    /// (through shared ports, plus the client behind `client_lock`), then returns
790    /// the bytes and where they must go. It deliberately writes nothing, removes
791    /// nothing, and never touches `manifest`, so many run at once and an aborted
792    /// run can drop them with no destination or manifest effect. The serial
793    /// [`commit_audio`](Self::commit_audio) applies those effects in plan order.
794    async fn prepare_audio(
795        &self,
796        client_lock: &ClientLock<'_, C>,
797        action: &Action,
798    ) -> Result<RenderedAudio, Fail> {
799        match action {
800            Action::Download {
801                clip,
802                lineage,
803                path,
804                format,
805            } => {
806                let bytes = self
807                    .produce_audio(client_lock, clip, lineage, *format)
808                    .await?;
809                Ok(RenderedAudio {
810                    clip_id: clip.id.clone(),
811                    path: path.clone(),
812                    format: *format,
813                    from_path: None,
814                    effect: Effect::Downloaded,
815                    bytes,
816                })
817            }
818            Action::Reformat {
819                clip,
820                path,
821                from_path,
822                from: _,
823                to,
824            } => {
825                // A Reformat action carries no lineage, so recover it from the
826                // desired set (the same context that drove naming and the hash),
827                // falling back to a self-rooted context when the clip is not in
828                // the current selection.
829                let lineage = self
830                    .by_id
831                    .get(clip.id.as_str())
832                    .map(|d| d.lineage.clone())
833                    .unwrap_or_else(|| LineageContext::own_root(clip));
834                let bytes = self.produce_audio(client_lock, clip, &lineage, *to).await?;
835                Ok(RenderedAudio {
836                    clip_id: clip.id.clone(),
837                    path: path.clone(),
838                    format: *to,
839                    from_path: Some(from_path.clone()),
840                    effect: Effect::Reformatted,
841                    bytes,
842                })
843            }
844            _ => unreachable!("prepare_audio only handles audio actions"),
845        }
846    }
847
848    /// Commit one rendered audio result serially, in plan order.
849    ///
850    /// Writes the tagged bytes to the destination, then, for a [`Reformat`], drops
851    /// the superseded file, then records the manifest entry. Ordering the write
852    /// before the removal keeps a crash from losing both copies; keeping all of
853    /// this off the concurrent phase preserves the sequential executor's plan-order
854    /// guarantee for every destination and manifest effect.
855    fn commit_audio(
856        &self,
857        manifest: &mut Manifest,
858        rendered: RenderedAudio,
859    ) -> Result<Effect, Fail> {
860        let RenderedAudio {
861            clip_id,
862            path,
863            format,
864            from_path,
865            effect,
866            bytes,
867        } = rendered;
868        let size = self.write_verify(&clip_id, &path, &bytes)?;
869        if let Some(from) = from_path {
870            // The new file is safely in place; only now drop the old rendering.
871            self.fs.remove(&from).map_err(|err| {
872                permanent_fail(&clip_id, format!("could not remove old file: {err}"))
873            })?;
874        }
875        manifest.insert(clip_id.clone(), self.entry(&clip_id, &path, format, size));
876        Ok(effect)
877    }
878
879    /// Re-tag the existing file in place to match current metadata and art.
880    async fn retag(
881        &self,
882        manifest: &mut Manifest,
883        clip: &Clip,
884        lineage: &LineageContext,
885        path: &str,
886    ) -> Result<Effect, Fail> {
887        let Some(format) = manifest.get(&clip.id).map(|entry| entry.format) else {
888            return Err(permanent_fail(
889                &clip.id,
890                "retag target missing from manifest",
891            ));
892        };
893
894        if format == AudioFormat::Wav {
895            let (meta, synced) = self.track_meta(clip, lineage);
896            let cover = self.fetch_cover(clip).await;
897            let existing = self.fs.read(path).map_err(|err| {
898                permanent_fail(&clip.id, format!("could not read for retag: {err}"))
899            })?;
900            let tagged = tag_wav(&existing, &meta, cover.as_deref(), synced)
901                .map_err(|err| permanent_fail(&clip.id, err.to_string()))?;
902            let size = self.write_verify(&clip.id, path, &tagged)?;
903            self.refresh_hashes(manifest, &clip.id, Some(size));
904            return Ok(Effect::Retagged);
905        }
906
907        let (meta, synced) = self.track_meta(clip, lineage);
908        let cover = self.fetch_cover(clip).await;
909        let existing = self
910            .fs
911            .read(path)
912            .map_err(|err| permanent_fail(&clip.id, format!("could not read for retag: {err}")))?;
913        let tagged = match format {
914            AudioFormat::Mp3 => tag_mp3(&existing, &meta, cover.as_deref(), synced),
915            AudioFormat::Flac => tag_flac(&existing, &meta, cover.as_deref()),
916            AudioFormat::Wav => unreachable!("WAV handled above"),
917        }
918        .map_err(|err| permanent_fail(&clip.id, err.to_string()))?;
919        let size = self.write_verify(&clip.id, path, &tagged)?;
920        self.refresh_hashes(manifest, &clip.id, Some(size));
921        Ok(Effect::Retagged)
922    }
923
924    /// Move the file and update the entry's path (and protection).
925    fn rename(&self, manifest: &mut Manifest, from: &str, to: &str) -> Result<Effect, Fail> {
926        let label = self
927            .by_path
928            .get(to)
929            .map(|d| d.clip.id.clone())
930            .unwrap_or_else(|| to.to_owned());
931        self.fs.rename(from, to).map_err(|err| {
932            if err.is_out_of_space() {
933                disk_fail(label, "disk full: no space left to rename")
934            } else {
935                permanent_fail(label, format!("rename failed: {err}"))
936            }
937        })?;
938
939        let clip_id = self.by_path.get(to).map(|d| d.clip.id.clone()).or_else(|| {
940            manifest
941                .entries
942                .iter()
943                .find(|(_, entry)| entry.path == from)
944                .map(|(id, _)| id.clone())
945        });
946        if let Some(id) = clip_id
947            && let Some(entry) = manifest.entries.get_mut(&id)
948        {
949            entry.path = to.to_owned();
950            if let Some(d) = self.by_path.get(to) {
951                entry.preserve = preserve_for(d);
952            }
953        }
954        Ok(Effect::Renamed)
955    }
956
957    /// Remove the file and drop the manifest entry.
958    fn delete(&self, manifest: &mut Manifest, path: &str, clip_id: &str) -> Result<Effect, Fail> {
959        self.fs
960            .remove(path)
961            .map_err(|err| permanent_fail(clip_id, format!("delete failed: {err}")))?;
962        manifest.remove(clip_id);
963        Ok(Effect::Deleted)
964    }
965
    /// Fetch an artifact's bytes, write them atomically, then record the sidecar
    /// on the owning manifest entry.
    ///
    /// The fetch and write share the audio path's resilience: `fetch_bytes`
    /// retries transient failures and verifies `Content-Length`, and
    /// `write_verify` confirms the on-disk size. A failure is attributed to the
    /// owning clip and returned as a per-clip [`Fail`], so a bad sidecar never
    /// aborts the whole run (only an auth failure or a full disk does, matching
    /// audio).
    ///
    /// The bytes written depend on the kind: a static cover is the fetched image
    /// verbatim, while an animated cover is the clip's MP4 preview transcoded to
    /// WebP through the ffmpeg port (see [`artifact_bytes`](Self::artifact_bytes)).
    ///
    /// A sidecar is only ever written for a clip whose audio is present: a
    /// successful `Download`/`Reformat` creates the manifest entry earlier in
    /// this run, and a prior-run clip already has one. So an absent owning entry
    /// means the audio failed or never existed this run; we skip (no fetch, no
    /// write) rather than strand an untracked sidecar with no owning audio.
    ///
    /// Folder art ([`FolderJpg`](ArtifactKind::FolderJpg),
    /// [`FolderWebp`](ArtifactKind::FolderWebp), and
    /// [`FolderMp4`](ArtifactKind::FolderMp4)) is album-scoped: its `owner_id`
    /// is the album's stable root id, not a manifest clip, so it skips the
    /// manifest presence guard and records its state on the album store instead.
    ///
    /// When a title or album change moves the audio, reconcile re-emits this
    /// write at the NEW path; this handler then removes the sidecar left at the
    /// artifact's previously tracked path, moving it rather than orphaning it.
    /// The removal happens only after the new file is safely written, and a
    /// remove failure returns before the state slot advances, so the next run
    /// re-plans the identical write and retries — self-healing, never an orphan.
    ///
    /// # Parameters
    ///
    /// - `content`: an inline body. When `Some`, its bytes are written as-is and
    ///   nothing is fetched; when `None`, the bytes come from
    ///   [`artifact_bytes`](Self::artifact_bytes).
    /// - `tracked_paths`: per-path reference counts. When this write moves the
    ///   artifact, the count for its previous path is decremented, and the old
    ///   file is removed only if that count is then zero (or the path was never
    ///   counted).
    /// - `committed`: paths that must never be removed as a stale old copy.
    ///
    /// # Returns
    ///
    /// [`Effect::Skipped`] when a per-clip kind has no owning manifest entry,
    /// otherwise [`Effect::ArtifactWritten`].
    ///
    /// # Errors
    ///
    /// Propagates the [`Fail`] from `artifact_bytes` or `write_verify`; a failure
    /// to remove the old sidecar is a permanent [`Fail`] attributed to `owner_id`.
    #[allow(clippy::too_many_arguments)]
    async fn write_artifact(
        &self,
        manifest: &mut Manifest,
        albums: &mut BTreeMap<String, AlbumArt>,
        playlists: &mut BTreeMap<String, PlaylistState>,
        kind: ArtifactKind,
        path: &str,
        source_url: &str,
        hash: &str,
        owner_id: &str,
        content: Option<&str>,
        tracked_paths: &mut HashMap<String, u32>,
        committed: &BTreeSet<String>,
    ) -> Result<Effect, Fail> {
        // A per-song sidecar needs its owning clip's manifest entry; album and
        // playlist kinds are keyed elsewhere and skip this guard.
        if is_per_clip_kind(kind) && manifest.get(owner_id).is_none() {
            // The owning audio never landed this run, so this sidecar is skipped
            // and will never drain a cover the producer cached for it. Drop that
            // entry now: an insert without a matching sidecar write must not
            // outlive its clip, keeping `cover_cache` bounded to the clips in
            // flight (#89). A non-cover kind has no entry here, so this is a
            // harmless no-op for them.
            self.cover_cache
                .lock()
                .expect("cover cache mutex poisoned")
                .remove(source_url);
            return Ok(Effect::Skipped);
        }
        // Capture the path this artifact was last tracked at, BEFORE the slot is
        // overwritten below, so a path-changing write (a title/album rename that
        // moves the audio) can clean up the old sidecar it left behind. Cover
        // kinds live on the manifest, folder kinds on the album store; playlists
        // reconcile their own old-path delete and so opt out here.
        let old_path = match kind {
            ArtifactKind::CoverJpg => manifest
                .get(owner_id)
                .and_then(|e| e.cover_jpg.as_ref())
                .map(|s| s.path.clone()),
            ArtifactKind::CoverWebp => manifest
                .get(owner_id)
                .and_then(|e| e.cover_webp.as_ref())
                .map(|s| s.path.clone()),
            ArtifactKind::DetailsTxt => manifest
                .get(owner_id)
                .and_then(|e| e.details_txt.as_ref())
                .map(|s| s.path.clone()),
            ArtifactKind::LyricsTxt => manifest
                .get(owner_id)
                .and_then(|e| e.lyrics_txt.as_ref())
                .map(|s| s.path.clone()),
            ArtifactKind::Lrc => manifest
                .get(owner_id)
                .and_then(|e| e.lrc.as_ref())
                .map(|s| s.path.clone()),
            ArtifactKind::VideoMp4 => manifest
                .get(owner_id)
                .and_then(|e| e.video_mp4.as_ref())
                .map(|s| s.path.clone()),
            ArtifactKind::FolderJpg | ArtifactKind::FolderWebp | ArtifactKind::FolderMp4 => albums
                .get(owner_id)
                .and_then(|a| a.artifact(kind))
                .map(|s| s.path.clone()),
            ArtifactKind::Playlist => None,
        };
        // A generated artifact (a playlist) carries its body inline and never
        // touches the network; a fetched one pulls (and transcodes) its source.
        let bytes = match content {
            Some(text) => text.as_bytes().to_vec(),
            None => self.artifact_bytes(kind, source_url, owner_id).await?,
        };
        self.write_verify(owner_id, path, &bytes)?;
        // The new sidecar is safely in place; only now drop a stale copy left at
        // the previous path (the audio moved). `remove` is idempotent, so an
        // already-absent old file is fine. On a genuine remove failure we return
        // BEFORE updating the slot, leaving the manifest/album pointing at the
        // old path: the next run sees the same path drift, re-plans this write,
        // and retries the cleanup — convergent, no orphan persists.
        //
        // The removal is gated so it can never delete a live file (#76). This
        // slot is releasing `old`, so drop its reference in `tracked_paths`; the
        // file is removed only once nothing else holds it — no other tracked slot
        // still references it (count now zero) and no *committed* write this run
        // has already placed a file there (`committed`, the commit-tracked twin of
        // `suppress_path_aliasing`). On a path swap (A: x -> y while B: y -> x)
        // the earlier write commits its path, so the later mover keeps it; when
        // two slots share a path after a prior failed swap, the reference count
        // keeps it. But a merely *planned* colliding write that later fails no
        // longer protects a stale file, so it is cleaned up rather than orphaned
        // (#142).
        if let Some(old) = old_path.as_deref()
            && !old.is_empty()
            && old != path
        {
            // Decrement first, then test: a path absent from the map counts as
            // unreferenced.
            let still_referenced = tracked_paths
                .get_mut(old)
                .map(|count| {
                    *count = count.saturating_sub(1);
                    *count > 0
                })
                .unwrap_or(false);
            if !still_referenced && !committed.contains(old) {
                self.fs.remove(old).map_err(|err| {
                    permanent_fail(
                        owner_id,
                        format!("could not remove old sidecar {old}: {err}"),
                    )
                })?;
            }
        }
        // Record the new location on the store that owns this kind: the album
        // store, the playlist store, or the owning clip's manifest entry.
        if is_album_kind(kind) {
            albums.entry(owner_id.to_owned()).or_default().set(
                kind,
                Some(ArtifactState {
                    path: path.to_owned(),
                    hash: hash.to_owned(),
                }),
            );
        } else if is_playlist_kind(kind) {
            playlists.insert(
                owner_id.to_owned(),
                PlaylistState {
                    name: playlist_name_from_path(path),
                    path: path.to_owned(),
                    hash: hash.to_owned(),
                },
            );
        } else if let Some(entry) = manifest.entries.get_mut(owner_id) {
            set_manifest_artifact(
                entry,
                kind,
                Some(ArtifactState {
                    path: path.to_owned(),
                    hash: hash.to_owned(),
                }),
            );
        }
        Ok(Effect::ArtifactWritten)
    }
1138
1139    /// Relocate a fetched per-clip sidecar with a local rename, falling back to a
1140    /// fetch-and-write when the move is unsafe or the old file has vanished.
1141    ///
1142    /// Reconcile downgrades a pure path drift (same bytes, new path, old file
1143    /// present, fetched kind) to a `MoveArtifact`, so a retitle renames the file
1144    /// rather than re-downloading a cover or re-transcoding an animated WebP
1145    /// (#141). The in-place rename is taken only when `from` is this slot's alone
1146    /// to give up (no other tracked slot references it and no committed write has
1147    /// placed a file there); otherwise, or if the rename fails, the ordinary
1148    /// [`write_artifact`](Self::write_artifact) fetches fresh bytes and runs the
1149    /// gated old-path cleanup, so a swap or co-reference is handled exactly as
1150    /// before.
1151    #[allow(clippy::too_many_arguments)]
1152    async fn move_artifact(
1153        &self,
1154        manifest: &mut Manifest,
1155        albums: &mut BTreeMap<String, AlbumArt>,
1156        playlists: &mut BTreeMap<String, PlaylistState>,
1157        kind: ArtifactKind,
1158        from: &str,
1159        to: &str,
1160        source_url: &str,
1161        hash: &str,
1162        owner_id: &str,
1163        tracked_paths: &mut HashMap<String, u32>,
1164        committed: &BTreeSet<String>,
1165    ) -> Result<Effect, Fail> {
1166        // A per-clip sidecar needs its owning clip's audio present, exactly as
1167        // write_artifact requires.
1168        if is_per_clip_kind(kind) && manifest.get(owner_id).is_none() {
1169            return Ok(Effect::Skipped);
1170        }
1171        // Relocate in place only when `from` is ours alone to give up: no other
1172        // tracked slot still references it (a prior failed swap can share a path)
1173        // and no committed write this run has already placed a file there.
1174        // Otherwise the fetch-and-write fallback copies fresh bytes and runs the
1175        // gated old-path cleanup.
1176        let exclusive =
1177            tracked_paths.get(from).is_none_or(|count| *count <= 1) && !committed.contains(from);
1178        if from != to && exclusive {
1179            match self.fs.rename(from, to) {
1180                Ok(()) => {
1181                    if let Some(count) = tracked_paths.get_mut(from) {
1182                        *count = count.saturating_sub(1);
1183                    }
1184                    if let Some(entry) = manifest.entries.get_mut(owner_id) {
1185                        set_manifest_artifact(
1186                            entry,
1187                            kind,
1188                            Some(ArtifactState {
1189                                path: to.to_owned(),
1190                                hash: hash.to_owned(),
1191                            }),
1192                        );
1193                    }
1194                    return Ok(Effect::Renamed);
1195                }
1196                Err(err) if err.is_out_of_space() => {
1197                    return Err(disk_fail(
1198                        owner_id,
1199                        "disk full: no space left to move sidecar",
1200                    ));
1201                }
1202                // The old file has vanished, or the rename is unsupported: fall
1203                // through to a fetch-and-write at `to`.
1204                Err(_) => {}
1205            }
1206        }
1207        self.write_artifact(
1208            manifest,
1209            albums,
1210            playlists,
1211            kind,
1212            to,
1213            source_url,
1214            hash,
1215            owner_id,
1216            None,
1217            tracked_paths,
1218            committed,
1219        )
1220        .await
1221    }
1222    ///
1223    /// An animated cover — a per-clip [`CoverWebp`](ArtifactKind::CoverWebp) or an
1224    /// album [`FolderWebp`](ArtifactKind::FolderWebp) — fetches the clip's
1225    /// `video_cover` MP4 preview and transcodes it to an animated WebP through the
1226    /// ffmpeg port; every other kind is the fetched source verbatim (the static
1227    /// [`CoverJpg`](ArtifactKind::CoverJpg) / album [`FolderJpg`](ArtifactKind::FolderJpg)
1228    /// image, or the raw album [`FolderMp4`](ArtifactKind::FolderMp4) whose
1229    /// `video_cover_url` is kept untranscoded). A fetch or transcode failure
1230    /// is attributed to the owning clip and is a per-clip [`Fail`], except a
1231    /// disk-full transcode, which aborts the run like the audio FLAC path.
1232    async fn artifact_bytes(
1233        &self,
1234        kind: ArtifactKind,
1235        source_url: &str,
1236        owner_id: &str,
1237    ) -> Result<Vec<u8>, Fail> {
1238        // Reuse the cover the audio producer already fetched for the embedded tag
1239        // when it cached this exact URL (#89); otherwise fetch it now. The guard
1240        // is taken and dropped in its own statement so it never spans the await.
1241        let cached = self
1242            .cover_cache
1243            .lock()
1244            .expect("cover cache mutex poisoned")
1245            .remove(source_url);
1246        let source = match cached {
1247            Some(bytes) => bytes,
1248            None => {
1249                let fetched = self
1250                    .fetch_bytes(source_url)
1251                    .await
1252                    .map_err(|err| err.attribute(owner_id))?;
1253                // Cache the raw source when a sibling folder artifact will fetch
1254                // the same URL (the `both` retention: cover.webp + cover.mp4), so
1255                // it is fetched exactly once. Bounded to shared URLs and drained
1256                // on the sibling's use.
1257                if self.shared_cover_urls.contains(source_url) {
1258                    self.cover_cache
1259                        .lock()
1260                        .expect("cover cache mutex poisoned")
1261                        .insert(source_url.to_owned(), fetched.clone());
1262                }
1263                fetched
1264            }
1265        };
1266        match kind {
1267            ArtifactKind::CoverWebp | ArtifactKind::FolderWebp => self
1268                .ffmpeg
1269                .mp4_to_webp(&source, self.opts.cover_webp)
1270                .await
1271                .map_err(|err| {
1272                    if err.is_out_of_space() {
1273                        disk_fail(owner_id, "disk full: no space left to transcode")
1274                    } else {
1275                        permanent_fail(owner_id, format!("cover transcode failed: {err}"))
1276                    }
1277                }),
1278            // The text sidecars are generated and always carry inline content, so
1279            // `write_artifact` never reaches this fetch path for them. Guard it so
1280            // a future miswiring fails loudly rather than fetching a URL.
1281            ArtifactKind::DetailsTxt | ArtifactKind::LyricsTxt | ArtifactKind::Lrc => Err(
1282                permanent_fail(owner_id, "text sidecar requires inline content"),
1283            ),
1284            ArtifactKind::CoverJpg
1285            | ArtifactKind::FolderJpg
1286            | ArtifactKind::FolderMp4
1287            | ArtifactKind::Playlist
1288            | ArtifactKind::VideoMp4 => Ok(source),
1289        }
1290    }
1291
1292    /// Remove a sidecar file and clear its slot on the owning manifest entry.
1293    ///
1294    /// `remove` is idempotent, so an already-absent sidecar is not a failure.
1295    /// When the owning entry is already gone (its audio was deleted earlier this
1296    /// run, co-deleting the sidecar), there is no slot to clear and that is fine.
1297    ///
1298    /// Folder art is album-scoped: its slot is cleared on the album store keyed by
1299    /// the album's root id, not on a manifest clip.
1300    ///
1301    /// The audio `Delete` is applied before its sidecar `DeleteArtifact`. If the
1302    /// sidecar removal fails after the audio is already gone, the sidecar lingers
1303    /// untracked, but the design stays convergent rather than transactional: the
1304    /// next run re-plans the same removal and retries, and any directory it would
1305    /// have emptied is pruned once the file finally clears.
1306    fn delete_artifact(
1307        &self,
1308        manifest: &mut Manifest,
1309        albums: &mut BTreeMap<String, AlbumArt>,
1310        playlists: &mut BTreeMap<String, PlaylistState>,
1311        kind: ArtifactKind,
1312        path: &str,
1313        owner_id: &str,
1314    ) -> Result<Effect, Fail> {
1315        self.fs
1316            .remove(path)
1317            .map_err(|err| permanent_fail(owner_id, format!("artifact delete failed: {err}")))?;
1318        if is_album_kind(kind) {
1319            if let Some(art) = albums.get_mut(owner_id) {
1320                art.set(kind, None);
1321                if art.is_empty() {
1322                    albums.remove(owner_id);
1323                }
1324            }
1325        } else if is_playlist_kind(kind) {
1326            playlists.remove(owner_id);
1327        } else if let Some(entry) = manifest.entries.get_mut(owner_id) {
1328            set_manifest_artifact(entry, kind, None);
1329        }
1330        Ok(Effect::ArtifactDeleted)
1331    }
1332
1333    /// Fetch one stem's bytes, write them atomically, then record the stem on
1334    /// the owning clip's keyed stem map.
1335    ///
1336    /// Mirrors [`write_artifact`](Self::write_artifact) for the keyed-stem case,
1337    /// sharing the fetch resilience (`fetch_bytes` retries and verifies
1338    /// `Content-Length`) and the atomic size-verified write. A stem is only ever
1339    /// written for a clip whose audio is present, so an absent owning manifest
1340    /// entry means the audio failed or never existed this run; we skip rather
1341    /// than strand an untracked stem with no owning audio.
1342    ///
1343    /// Stems are stored RAW in their native container and are NEVER transcoded to
1344    /// FLAC, even when the song's own format is FLAC — they are the deliberate
1345    /// exception. A `Wav` stem is rendered through the free `convert_wav` flow
1346    /// (see [`fetch_stem_bytes`](Self::fetch_stem_bytes)); an `Mp3` stem is fetched
1347    /// straight from its public CDN url. Either way the bytes land verbatim at
1348    /// `path`, whose extension already matches the stem format.
1349    ///
1350    /// When a title/album change moves the song, reconcile re-emits this write at
1351    /// the NEW path; this handler then removes the stem left at the previously
1352    /// tracked path, moving it rather than orphaning it. The removal happens only
1353    /// after the new file is safely written and only when nothing else this run
1354    /// writes that path, and a remove failure returns before the slot advances so
1355    /// the next run re-plans the identical write and retries — self-healing.
1356    #[allow(clippy::too_many_arguments)]
1357    async fn write_stem(
1358        &self,
1359        client_lock: &ClientLock<'_, C>,
1360        manifest: &mut Manifest,
1361        clip_id: &str,
1362        key: &str,
1363        stem_id: &str,
1364        path: &str,
1365        source_url: &str,
1366        format: StemFormat,
1367        hash: &str,
1368        committed: &BTreeSet<String>,
1369    ) -> Result<Effect, Fail> {
1370        // A stem needs its owning clip's manifest entry (its audio must exist).
1371        if manifest.get(clip_id).is_none() {
1372            return Ok(Effect::Skipped);
1373        }
1374        let old_path = manifest
1375            .get(clip_id)
1376            .and_then(|e| e.stems.get(key))
1377            .map(|s| s.path.clone());
1378        let bytes = self
1379            .fetch_stem_bytes(client_lock, clip_id, stem_id, source_url, format)
1380            .await?;
1381        self.write_verify(clip_id, path, &bytes)?;
1382        // The new stem is in place; only now drop a stale copy left at the old
1383        // path (the song moved, or the stem format changed). `remove` is
1384        // idempotent. A path a *committed* write this run has already placed is
1385        // never removed (the commit-tracked twin of `suppress_path_aliasing`); a
1386        // merely planned write that later fails no longer protects a stale file
1387        // (#142). On a genuine remove failure we return BEFORE updating the slot,
1388        // so the next run re-plans the same write and retries the cleanup.
1389        if let Some(old) = old_path.as_deref()
1390            && !old.is_empty()
1391            && old != path
1392            && !committed.contains(old)
1393        {
1394            self.fs.remove(old).map_err(|err| {
1395                permanent_fail(clip_id, format!("could not remove old stem {old}: {err}"))
1396            })?;
1397        }
1398        if let Some(entry) = manifest.entries.get_mut(clip_id) {
1399            set_manifest_stem(
1400                entry,
1401                key,
1402                Some(ArtifactState {
1403                    path: path.to_owned(),
1404                    hash: hash.to_owned(),
1405                }),
1406            );
1407        }
1408        Ok(Effect::ArtifactWritten)
1409    }
1410
1411    /// Relocate a stem with a local rename, falling back to a fetch-and-write
1412    /// when the move is unsafe or the old file has vanished (#141).
1413    ///
1414    /// Reconcile downgrades a pure stem path drift to a `MoveStem`, so a retitle
1415    /// renames the raw stem rather than re-rendering a WAV through `convert_wav`
1416    /// or re-fetching an MP3. The in-place rename is taken only when `from` is
1417    /// this slot's alone to give up (no other tracked slot references it — two
1418    /// same-base clips can share a stem path after a partially-failed swap — and
1419    /// no committed write this run already holds it); otherwise the
1420    /// fetch-and-write fallback re-fetches the correct bytes at `to`, so a
1421    /// co-referenced shared stem is never renamed away with mismatched content.
1422    #[allow(clippy::too_many_arguments)]
1423    async fn move_stem(
1424        &self,
1425        client_lock: &ClientLock<'_, C>,
1426        manifest: &mut Manifest,
1427        clip_id: &str,
1428        key: &str,
1429        stem_id: &str,
1430        from: &str,
1431        to: &str,
1432        source_url: &str,
1433        format: StemFormat,
1434        hash: &str,
1435        tracked_paths: &mut HashMap<String, u32>,
1436        committed: &BTreeSet<String>,
1437    ) -> Result<Effect, Fail> {
1438        if manifest.get(clip_id).is_none() {
1439            return Ok(Effect::Skipped);
1440        }
1441        let exclusive =
1442            tracked_paths.get(from).is_none_or(|count| *count <= 1) && !committed.contains(from);
1443        if from != to && exclusive {
1444            match self.fs.rename(from, to) {
1445                Ok(()) => {
1446                    if let Some(count) = tracked_paths.get_mut(from) {
1447                        *count = count.saturating_sub(1);
1448                    }
1449                    if let Some(entry) = manifest.entries.get_mut(clip_id) {
1450                        set_manifest_stem(
1451                            entry,
1452                            key,
1453                            Some(ArtifactState {
1454                                path: to.to_owned(),
1455                                hash: hash.to_owned(),
1456                            }),
1457                        );
1458                    }
1459                    return Ok(Effect::Renamed);
1460                }
1461                Err(err) if err.is_out_of_space() => {
1462                    return Err(disk_fail(clip_id, "disk full: no space left to move stem"));
1463                }
1464                // The old file has vanished, or the rename is unsupported: fall
1465                // through to a fetch-and-write at `to`.
1466                Err(_) => {}
1467            }
1468        }
1469        self.write_stem(
1470            client_lock,
1471            manifest,
1472            clip_id,
1473            key,
1474            stem_id,
1475            to,
1476            source_url,
1477            format,
1478            hash,
1479            committed,
1480        )
1481        .await
1482    }
1483
1484    /// Resolve a stem's RAW bytes in its native container, never transcoding.
1485    ///
1486    /// A `Wav` stem renders the stem clip's lossless WAV through the very same
1487    /// free `convert_wav` + poll flow the main FLAC/WAV audio uses
1488    /// ([`resolve_wav_url`](Self::resolve_wav_url)), keyed on the stem's own
1489    /// `stem_id`, then downloads that WAV. An `Mp3` stem (or a degenerate `Wav`
1490    /// stem with no id to render) downloads its public CDN url directly. Stems
1491    /// are the deliberate exception to the source format: the bytes are returned
1492    /// exactly as delivered and are never re-encoded to FLAC.
1493    async fn fetch_stem_bytes(
1494        &self,
1495        client_lock: &ClientLock<'_, C>,
1496        clip_id: &str,
1497        stem_id: &str,
1498        source_url: &str,
1499        format: StemFormat,
1500    ) -> Result<Vec<u8>, Fail> {
1501        let url = match format {
1502            StemFormat::Wav if !stem_id.is_empty() => {
1503                match self.resolve_wav_url(client_lock, stem_id).await? {
1504                    Some(url) => url,
1505                    None => return Err(transient_fail(clip_id, "stem WAV render was not ready")),
1506                }
1507            }
1508            // Mp3, or a Wav stem with no id to render, downloads the CDN mp3.
1509            _ => source_url.to_owned(),
1510        };
1511        self.fetch_bytes(&url)
1512            .await
1513            .map_err(|err| err.attribute(clip_id))
1514    }
1515
1516    /// Remove one stem file and clear its slot in the owning clip's stem map.
1517    ///
1518    /// `remove` is idempotent, so an already-absent stem is not a failure. When
1519    /// the owning entry is already gone (its audio was deleted earlier this run,
1520    /// co-deleting the stem), there is no slot to clear and that is fine; the
1521    /// emptied `.stems` folder is pruned by the end-of-run directory sweep.
1522    fn delete_stem(
1523        &self,
1524        manifest: &mut Manifest,
1525        clip_id: &str,
1526        key: &str,
1527        path: &str,
1528    ) -> Result<Effect, Fail> {
1529        self.fs
1530            .remove(path)
1531            .map_err(|err| permanent_fail(clip_id, format!("stem delete failed: {err}")))?;
1532        if let Some(entry) = manifest.entries.get_mut(clip_id) {
1533            set_manifest_stem(entry, key, None);
1534        }
1535        Ok(Effect::ArtifactDeleted)
1536    }
1537
1538    /// Download (and transcode/tag) the audio for `clip` in `format`.
1539    async fn produce_audio(
1540        &self,
1541        client_lock: &ClientLock<'_, C>,
1542        clip: &Clip,
1543        lineage: &LineageContext,
1544        format: AudioFormat,
1545    ) -> Result<Vec<u8>, Fail> {
1546        let (meta, synced) = self.track_meta(clip, lineage);
1547        match format {
1548            AudioFormat::Mp3 => {
1549                let url = clip.mp3_url();
1550                let audio = self
1551                    .fetch_bytes(&url)
1552                    .await
1553                    .map_err(|err| err.attribute(&clip.id))?;
1554                let cover = self.fetch_cover(clip).await;
1555                tag_mp3(&audio, &meta, cover.as_deref(), synced)
1556                    .map_err(|err| permanent_fail(&clip.id, err.to_string()))
1557            }
1558            AudioFormat::Flac => {
1559                let wav = self.fetch_wav(client_lock, clip).await?;
1560                let flac = self.ffmpeg.wav_to_flac(&wav).await.map_err(|err| {
1561                    if err.is_out_of_space() {
1562                        disk_fail(&clip.id, "disk full: no space left to transcode")
1563                    } else {
1564                        permanent_fail(&clip.id, format!("transcode failed: {err}"))
1565                    }
1566                })?;
1567                let cover = self.fetch_cover(clip).await;
1568                tag_flac(&flac, &meta, cover.as_deref())
1569                    .map_err(|err| permanent_fail(&clip.id, err.to_string()))
1570            }
1571            AudioFormat::Wav => {
1572                let wav = self.fetch_wav(client_lock, clip).await?;
1573                let cover = self.fetch_cover(clip).await;
1574                tag_wav(&wav, &meta, cover.as_deref(), synced)
1575                    .map_err(|err| permanent_fail(&clip.id, err.to_string()))
1576            }
1577        }
1578    }
1579
1580    /// This run's non-empty aligned lyrics for a clip, if any were fetched.
1581    fn synced_for(&self, clip_id: &str) -> Option<&AlignedLyrics> {
1582        self.synced
1583            .get(clip_id)
1584            .filter(|aligned| !aligned.is_empty())
1585    }
1586
1587    /// The track metadata for a clip, paired with its synced lyrics (if any).
1588    ///
1589    /// The feed omits per-clip lyrics, so when this run fetched aligned lyrics
1590    /// for the clip the plain text is folded into `lyrics` here, which the MP3
1591    /// `USLT` and FLAC `LYRICS` tags then carry. The returned [`AlignedLyrics`]
1592    /// is passed on to [`tag_mp3`] for the word-level `SYLT` frame.
1593    fn track_meta<'m>(
1594        &'m self,
1595        clip: &Clip,
1596        lineage: &LineageContext,
1597    ) -> (TrackMetadata, Option<&'m AlignedLyrics>) {
1598        let synced = self.synced_for(&clip.id);
1599        let mut meta = TrackMetadata::from_clip(clip, lineage);
1600        if let Some(aligned) = synced {
1601            meta.lyrics = aligned.plain_text();
1602        }
1603        (meta, synced)
1604    }
1605
1606    /// Resolve the rendered WAV URL and download it.
1607    async fn fetch_wav(
1608        &self,
1609        client_lock: &ClientLock<'_, C>,
1610        clip: &Clip,
1611    ) -> Result<Vec<u8>, Fail> {
1612        let url = match self.resolve_wav_url(client_lock, &clip.id).await? {
1613            Some(url) => url,
1614            None => return Err(transient_fail(&clip.id, "WAV render was not ready")),
1615        };
1616        self.fetch_bytes(&url)
1617            .await
1618            .map_err(|err| err.attribute(&clip.id))
1619    }
1620
1621    /// Read the WAV URL, requesting a render and polling if it is not ready.
1622    ///
1623    /// `None` means the render did not become ready within the poll budget; the
1624    /// caller treats that as a non-fatal transient failure, never a silent skip.
1625    ///
1626    /// Each client call briefly locks `client_lock`; the poll waits happen
1627    /// unlocked, so concurrent clips interleave their WAV renders rather than
1628    /// serialising behind one clip's whole poll budget.
1629    async fn resolve_wav_url(
1630        &self,
1631        client_lock: &ClientLock<'_, C>,
1632        id: &str,
1633    ) -> Result<Option<String>, Fail> {
1634        if let Some(url) = self.wav_url_retrying(client_lock, id).await? {
1635            return Ok(Some(url));
1636        }
1637        self.request_wav_retrying(client_lock, id).await?;
1638        for _ in 0..self.opts.wav_poll_attempts {
1639            self.clock.sleep(self.opts.wav_poll_interval).await;
1640            if let Some(url) = self.wav_url_retrying(client_lock, id).await? {
1641                return Ok(Some(url));
1642            }
1643        }
1644        Ok(None)
1645    }
1646
1647    /// Read the rendered WAV URL, retrying transient API failures with backoff
1648    /// (SYNC-16/17), so the default FLAC path is as resilient as the CDN path.
1649    async fn wav_url_retrying(
1650        &self,
1651        client_lock: &ClientLock<'_, C>,
1652        id: &str,
1653    ) -> Result<Option<String>, Fail> {
1654        let mut attempt: u32 = 0;
1655        loop {
1656            let result = {
1657                let mut client = client_lock.lock().await;
1658                client.wav_url(self.http, id).await
1659            };
1660            match result {
1661                Ok(url) => return Ok(url),
1662                Err(err) => match self.retry_core(id, err, &mut attempt).await {
1663                    Some(fail) => return Err(fail),
1664                    None => continue,
1665                },
1666            }
1667        }
1668    }
1669
1670    /// Ask Suno to render a WAV, retrying transient API failures with backoff.
1671    async fn request_wav_retrying(
1672        &self,
1673        client_lock: &ClientLock<'_, C>,
1674        id: &str,
1675    ) -> Result<(), Fail> {
1676        let mut attempt: u32 = 0;
1677        loop {
1678            let result = {
1679                let mut client = client_lock.lock().await;
1680                client.request_wav(self.http, id).await
1681            };
1682            match result {
1683                Ok(()) => return Ok(()),
1684                Err(err) => match self.retry_core(id, err, &mut attempt).await {
1685                    Some(fail) => return Err(fail),
1686                    None => continue,
1687                },
1688            }
1689        }
1690    }
1691
1692    /// Classify a core error from the authenticated WAV flow. On a transient
1693    /// class within budget, back off through the [`Clock`] and return `None` to
1694    /// retry; otherwise return the terminal [`Fail`].
1695    async fn retry_core(&self, id: &str, err: Error, attempt: &mut u32) -> Option<Fail> {
1696        let fail = classify_core(id, err);
1697        if matches!(fail.class, Class::Transient) && *attempt < self.opts.max_retries {
1698            self.clock.sleep(backoff_delay(*attempt, None)).await;
1699            *attempt += 1;
1700            None
1701        } else {
1702            Some(fail)
1703        }
1704    }
1705
1706    /// GET `url`, retrying transient failures with backoff, verifying size.
1707    async fn fetch_bytes(&self, url: &str) -> Result<Vec<u8>, FetchError> {
1708        let mut attempt: u32 = 0;
1709        loop {
1710            let result = self.http.send(HttpRequest::get(url)).await;
1711            match classify_response(result) {
1712                Ok(body) => return Ok(body),
1713                Err(err) => {
1714                    if matches!(err.class, Class::Transient) && attempt < self.opts.max_retries {
1715                        let delay = backoff_delay(attempt, err.retry_after);
1716                        self.clock.sleep(delay).await;
1717                        attempt += 1;
1718                        continue;
1719                    }
1720                    return Err(err);
1721                }
1722            }
1723        }
1724    }
1725
1726    /// Download cover art, trying each candidate URL in order; `None` is fine.
1727    async fn fetch_cover(&self, clip: &Clip) -> Option<Vec<u8>> {
1728        for url in clip.cover_candidates() {
1729            if let Ok(response) = self.http.send(HttpRequest::get(url)).await
1730                && (200..=299).contains(&response.status)
1731                && !response.body.is_empty()
1732            {
1733                // A `CoverJpg` sidecar will fetch this exact URL this run; keep the
1734                // bytes so its write reuses them instead of fetching again (#89).
1735                // The lock guards only the insert, never the await above.
1736                if self.cover_wanted.contains(url) {
1737                    self.cover_cache
1738                        .lock()
1739                        .expect("cover cache mutex poisoned")
1740                        .insert(url.to_owned(), response.body.clone());
1741                }
1742                return Some(response.body);
1743            }
1744        }
1745        None
1746    }
1747
1748    /// Write `bytes` atomically, then confirm the on-disk size (SYNC-13/14).
1749    fn write_verify(&self, clip_id: &str, path: &str, bytes: &[u8]) -> Result<u64, Fail> {
1750        self.fs.write_atomic(path, bytes).map_err(|err| {
1751            if err.is_out_of_space() {
1752                disk_fail(clip_id, format!("disk full: no space left to write {path}"))
1753            } else {
1754                permanent_fail(clip_id, format!("write failed: {err}"))
1755            }
1756        })?;
1757        match self.fs.metadata(path) {
1758            Some(stat) if stat.size == bytes.len() as u64 => Ok(stat.size),
1759            Some(stat) => Err(permanent_fail(
1760                clip_id,
1761                format!("wrote {} bytes, expected {}", stat.size, bytes.len()),
1762            )),
1763            None => Ok(bytes.len() as u64),
1764        }
1765    }
1766
1767    /// Build the manifest entry for a freshly written file.
1768    fn entry(&self, clip_id: &str, path: &str, format: AudioFormat, size: u64) -> ManifestEntry {
1769        match self.by_id.get(clip_id) {
1770            Some(d) => manifest_entry(d, size),
1771            None => ManifestEntry {
1772                path: path.to_owned(),
1773                format,
1774                size,
1775                ..ManifestEntry::default()
1776            },
1777        }
1778    }
1779
1780    /// Refresh an existing entry's hashes, protection, and (optionally) size.
1781    fn refresh_hashes(&self, manifest: &mut Manifest, clip_id: &str, size: Option<u64>) {
1782        let desired = self.by_id.get(clip_id).copied();
1783        if let Some(entry) = manifest.entries.get_mut(clip_id) {
1784            if let Some(d) = desired {
1785                entry.meta_hash = d.meta_hash.clone();
1786                entry.art_hash = d.art_hash.clone();
1787                entry.preserve = preserve_for(d);
1788            }
1789            if let Some(size) = size {
1790                entry.size = size;
1791            }
1792        }
1793    }
1794
1795    /// Refresh only an entry's preserve marker from the current desired state.
1796    ///
1797    /// A clip can gain or lose copy/private protection with no file change, which
1798    /// reconcile emits as a [`Skip`](Action::Skip). Refreshing here keeps the
1799    /// persisted marker a faithful image of live protection, so the cross-run
1800    /// delete guard (SYNC-8) never reads it stale.
1801    fn refresh_preserve(&self, manifest: &mut Manifest, clip_id: &str) {
1802        if let Some(d) = self.by_id.get(clip_id).copied()
1803            && let Some(entry) = manifest.entries.get_mut(clip_id)
1804        {
1805            entry.preserve = preserve_for(d);
1806        }
1807    }
1808}
1809
1810/// Build a manifest entry from the desired record (SYNC-8 preserve rule).
1811fn manifest_entry(d: &Desired, size: u64) -> ManifestEntry {
1812    ManifestEntry {
1813        path: d.path.clone(),
1814        format: d.format,
1815        meta_hash: d.meta_hash.clone(),
1816        art_hash: d.art_hash.clone(),
1817        size,
1818        preserve: preserve_for(d),
1819        ..Default::default()
1820    }
1821}
1822
1823/// Whether a written entry must be preserved across runs: held by any copy
1824/// source, or private. The reconcile delete guard reads this marker later.
1825fn preserve_for(d: &Desired) -> bool {
1826    d.private || d.modes.contains(&SourceMode::Copy)
1827}
1828
1829/// Classify one HTTP result into bytes or a [`FetchError`] (SYNC-14/17).
1830fn classify_response(
1831    result: Result<crate::http::HttpResponse, crate::http::TransportError>,
1832) -> Result<Vec<u8>, FetchError> {
1833    let response = match result {
1834        Ok(response) => response,
1835        Err(err) => {
1836            return Err(FetchError::transient(
1837                format!("transport error: {err}"),
1838                None,
1839            ));
1840        }
1841    };
1842    match response.status {
1843        200..=299 => {
1844            if let Some(expected) = content_length(&response) {
1845                let actual = response.body.len() as u64;
1846                if actual != expected {
1847                    return Err(FetchError::transient(
1848                        format!("truncated download: {actual} of {expected} bytes"),
1849                        None,
1850                    ));
1851                }
1852            }
1853            Ok(response.body)
1854        }
1855        401 | 403 => Err(FetchError::transient(
1856            format!("download rejected: status {}", response.status),
1857            None,
1858        )),
1859        408 => Err(FetchError::transient("request timed out", None)),
1860        429 => Err(FetchError::transient(
1861            "rate limited",
1862            retry_after(&response),
1863        )),
1864        500..=599 => Err(FetchError::transient(
1865            format!("server error {}", response.status),
1866            None,
1867        )),
1868        status => Err(FetchError::permanent(format!(
1869            "download failed: status {status}"
1870        ))),
1871    }
1872}
1873
1874/// Map a core [`Error`] from the authenticated WAV flow to a [`Fail`].
1875fn classify_core(id: &str, err: Error) -> Fail {
1876    let reason = err.to_string();
1877    match err {
1878        Error::Auth(_) => auth_fail(id, reason),
1879        Error::RateLimited { .. } | Error::Connection(_) => transient_fail(id, reason),
1880        Error::Api(_)
1881        | Error::NotFound(_)
1882        | Error::Tag(_)
1883        | Error::Config(_)
1884        | Error::Refused(_) => permanent_fail(id, reason),
1885    }
1886}
1887
1888/// The provider-reported body size from `Content-Length`, if present and valid.
1889fn content_length(response: &crate::http::HttpResponse) -> Option<u64> {
1890    response.header("content-length")?.trim().parse().ok()
1891}
1892
1893#[cfg(test)]
1894mod tests {
1895    use super::*;
1896    use crate::ClerkAuth;
1897    use crate::http::HttpResponse;
1898    use crate::testutil::{MemFs, RecordingClock, Reply, ScriptedHttp, StubFfmpeg};
1899
1900    fn clip(id: &str) -> Clip {
1901        Clip {
1902            id: id.to_owned(),
1903            title: "Song".to_owned(),
1904            audio_url: format!("https://cdn1.suno.ai/{id}.mp3"),
1905            ..Default::default()
1906        }
1907    }
1908
1909    fn art_clip(id: &str) -> Clip {
1910        Clip {
1911            image_large_url: format!("https://art.suno.ai/{id}/large.jpg"),
1912            image_url: format!("https://art.suno.ai/{id}/small.jpg"),
1913            ..clip(id)
1914        }
1915    }
1916
1917    fn ext(format: AudioFormat) -> &'static str {
1918        match format {
1919            AudioFormat::Mp3 => "mp3",
1920            AudioFormat::Flac => "flac",
1921            AudioFormat::Wav => "wav",
1922        }
1923    }
1924
1925    fn desired(clip: Clip, format: AudioFormat) -> Desired {
1926        Desired {
1927            path: format!("{}.{}", clip.id, ext(format)),
1928            lineage: LineageContext::own_root(&clip),
1929            clip,
1930            format,
1931            meta_hash: "m".to_owned(),
1932            art_hash: "art".to_owned(),
1933            modes: vec![SourceMode::Mirror],
1934            trashed: false,
1935            private: false,
1936            artifacts: Vec::new(),
1937            stems: None,
1938        }
1939    }
1940
1941    fn entry(path: &str, format: AudioFormat) -> ManifestEntry {
1942        ManifestEntry {
1943            path: path.to_owned(),
1944            format,
1945            meta_hash: "old".to_owned(),
1946            art_hash: "old-art".to_owned(),
1947            size: 8,
1948            preserve: false,
1949            ..Default::default()
1950        }
1951    }
1952
1953    #[allow(clippy::too_many_arguments)]
1954    fn run<G: Ffmpeg>(
1955        plan: &Plan,
1956        manifest: &mut Manifest,
1957        desired: &[Desired],
1958        http: &ScriptedHttp,
1959        fs: &MemFs,
1960        ffmpeg: &G,
1961        clock: &RecordingClock,
1962        opts: &ExecOptions,
1963    ) -> ExecOutcome {
1964        let mut albums = BTreeMap::new();
1965        run_with_albums(
1966            plan,
1967            manifest,
1968            &mut albums,
1969            desired,
1970            http,
1971            fs,
1972            ffmpeg,
1973            clock,
1974            opts,
1975        )
1976    }
1977
1978    #[allow(clippy::too_many_arguments)]
1979    fn run_with_albums<G: Ffmpeg>(
1980        plan: &Plan,
1981        manifest: &mut Manifest,
1982        albums: &mut BTreeMap<String, AlbumArt>,
1983        desired: &[Desired],
1984        http: &ScriptedHttp,
1985        fs: &MemFs,
1986        ffmpeg: &G,
1987        clock: &RecordingClock,
1988        opts: &ExecOptions,
1989    ) -> ExecOutcome {
1990        let mut playlists = BTreeMap::new();
1991        run_full(
1992            plan,
1993            manifest,
1994            albums,
1995            &mut playlists,
1996            desired,
1997            http,
1998            fs,
1999            ffmpeg,
2000            clock,
2001            opts,
2002        )
2003    }
2004
2005    #[allow(clippy::too_many_arguments)]
2006    fn run_full<G: Ffmpeg>(
2007        plan: &Plan,
2008        manifest: &mut Manifest,
2009        albums: &mut BTreeMap<String, AlbumArt>,
2010        playlists: &mut BTreeMap<String, PlaylistState>,
2011        desired: &[Desired],
2012        http: &ScriptedHttp,
2013        fs: &MemFs,
2014        ffmpeg: &G,
2015        clock: &RecordingClock,
2016        opts: &ExecOptions,
2017    ) -> ExecOutcome {
2018        let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
2019        let synced = HashMap::new();
2020        pollster::block_on(execute(
2021            plan,
2022            manifest,
2023            albums,
2024            playlists,
2025            desired,
2026            &synced,
2027            Ports {
2028                client: &mut client,
2029                http,
2030                fs,
2031                ffmpeg,
2032                clock,
2033            },
2034            opts,
2035        ))
2036    }
2037
2038    fn small_poll() -> ExecOptions {
2039        ExecOptions {
2040            max_retries: 3,
2041            wav_poll_attempts: 2,
2042            wav_poll_interval: Duration::from_secs(5),
2043            concurrency: 4,
2044            cover_webp: WebpEncodeSettings::default(),
2045        }
2046    }
2047
2048    // ── Download: MP3 ───────────────────────────────────────────────
2049
2050    #[test]
2051    fn download_mp3_writes_tagged_file_and_records_manifest() {
2052        let c = art_clip("a");
2053        let d = desired(c.clone(), AudioFormat::Mp3);
2054        let plan = Plan {
2055            actions: vec![Action::Download {
2056                clip: c.clone(),
2057                lineage: LineageContext::own_root(&c),
2058                path: d.path.clone(),
2059                format: AudioFormat::Mp3,
2060            }],
2061        };
2062        let http = ScriptedHttp::new()
2063            .route("a.mp3", Reply::ok(b"mp3-body".to_vec()))
2064            .route("a/large.jpg", Reply::ok(b"art-bytes".to_vec()));
2065        let fs = MemFs::new();
2066        let ffmpeg = StubFfmpeg::flac();
2067        let clock = RecordingClock::new();
2068        let mut manifest = Manifest::new();
2069
2070        let outcome = run(
2071            &plan,
2072            &mut manifest,
2073            &[d],
2074            &http,
2075            &fs,
2076            &ffmpeg,
2077            &clock,
2078            &ExecOptions::default(),
2079        );
2080
2081        assert_eq!(outcome.downloaded, 1);
2082        assert_eq!(outcome.failed(), 0);
2083        assert_eq!(outcome.status, RunStatus::Completed);
2084        let written = fs.read_file("a.mp3").unwrap();
2085        assert_eq!(&written[..3], b"ID3");
2086        assert!(written.ends_with(b"mp3-body"));
2087        let entry = manifest.get("a").unwrap();
2088        assert_eq!(entry.path, "a.mp3");
2089        assert_eq!(entry.format, AudioFormat::Mp3);
2090        assert_eq!(entry.meta_hash, "m");
2091        assert_eq!(entry.art_hash, "art");
2092        assert_eq!(entry.size, written.len() as u64);
2093        assert!(!entry.preserve);
2094    }
2095
2096    #[test]
2097    fn download_mp3_embeds_sylt_and_lyrics_from_synced_map() {
2098        // A clip whose alignment was fetched this run gets a word-level SYLT frame
2099        // and its plain lyric text embedded (USLT), end to end through execute.
2100        let c = art_clip("a");
2101        let d = desired(c.clone(), AudioFormat::Mp3);
2102        let plan = Plan {
2103            actions: vec![Action::Download {
2104                clip: c.clone(),
2105                lineage: LineageContext::own_root(&c),
2106                path: d.path.clone(),
2107                format: AudioFormat::Mp3,
2108            }],
2109        };
2110        let http = ScriptedHttp::new()
2111            .route("a.mp3", Reply::ok(b"mp3-body".to_vec()))
2112            .route("a/large.jpg", Reply::ok(b"art-bytes".to_vec()));
2113        let fs = MemFs::new();
2114        let ffmpeg = StubFfmpeg::flac();
2115        let clock = RecordingClock::new();
2116        let mut manifest = Manifest::new();
2117        let mut albums = BTreeMap::new();
2118        let mut playlists = BTreeMap::new();
2119        let mut synced = HashMap::new();
2120        synced.insert(
2121            "a".to_string(),
2122            AlignedLyrics::from_json(&serde_json::json!({
2123                "aligned_words": [],
2124                "aligned_lyrics": [
2125                    {"text": "hi there", "start_s": 0.5, "end_s": 1.2, "section": "Verse 1",
2126                     "words": [
2127                         {"text": "hi", "start_s": 0.5, "end_s": 0.8},
2128                         {"text": "there", "start_s": 0.9, "end_s": 1.2}
2129                     ]}
2130                ]
2131            })),
2132        );
2133        let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
2134        let outcome = pollster::block_on(execute(
2135            &plan,
2136            &mut manifest,
2137            &mut albums,
2138            &mut playlists,
2139            &[d],
2140            &synced,
2141            Ports {
2142                client: &mut client,
2143                http: &http,
2144                fs: &fs,
2145                ffmpeg: &ffmpeg,
2146                clock: &clock,
2147            },
2148            &ExecOptions::default(),
2149        ));
2150
2151        assert_eq!(outcome.downloaded, 1);
2152        let written = fs.read_file("a.mp3").unwrap();
2153        let tag = id3::Tag::read_from2(std::io::Cursor::new(written)).unwrap();
2154        assert_eq!(
2155            tag.synchronised_lyrics().count(),
2156            1,
2157            "a SYLT frame is embedded"
2158        );
2159        // The plain lyric text is populated from the alignment for the USLT frame.
2160        assert_eq!(
2161            tag.lyrics().next().map(|frame| frame.text.as_str()),
2162            Some("hi there")
2163        );
2164    }
2165
2166    #[test]
2167    fn download_mp3_embeds_no_sylt_when_synced_map_empty() {
2168        // The synced map is empty when the feature is off (no alignment fetched),
2169        // so no SYLT frame and no lyric text are embedded.
2170        let c = art_clip("a");
2171        let d = desired(c.clone(), AudioFormat::Mp3);
2172        let plan = Plan {
2173            actions: vec![Action::Download {
2174                clip: c.clone(),
2175                lineage: LineageContext::own_root(&c),
2176                path: d.path.clone(),
2177                format: AudioFormat::Mp3,
2178            }],
2179        };
2180        let http = ScriptedHttp::new()
2181            .route("a.mp3", Reply::ok(b"mp3-body".to_vec()))
2182            .route("a/large.jpg", Reply::ok(b"art-bytes".to_vec()));
2183        let fs = MemFs::new();
2184        let ffmpeg = StubFfmpeg::flac();
2185        let clock = RecordingClock::new();
2186        let mut manifest = Manifest::new();
2187        let mut albums = BTreeMap::new();
2188        let mut playlists = BTreeMap::new();
2189        let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
2190        let outcome = pollster::block_on(execute(
2191            &plan,
2192            &mut manifest,
2193            &mut albums,
2194            &mut playlists,
2195            &[d],
2196            &HashMap::new(),
2197            Ports {
2198                client: &mut client,
2199                http: &http,
2200                fs: &fs,
2201                ffmpeg: &ffmpeg,
2202                clock: &clock,
2203            },
2204            &ExecOptions::default(),
2205        ));
2206        assert_eq!(outcome.downloaded, 1);
2207        let written = fs.read_file("a.mp3").unwrap();
2208        let tag = id3::Tag::read_from2(std::io::Cursor::new(written)).unwrap();
2209        assert_eq!(tag.synchronised_lyrics().count(), 0);
2210        assert_eq!(tag.lyrics().count(), 0);
2211    }
2212
2213    #[test]
2214    fn download_mp3_uses_cdn_fallback_when_audio_url_empty() {
2215        let mut c = clip("a");
2216        c.audio_url = String::new();
2217        let d = desired(c.clone(), AudioFormat::Mp3);
2218        let plan = Plan {
2219            actions: vec![Action::Download {
2220                clip: c.clone(),
2221                lineage: LineageContext::own_root(&c),
2222                path: d.path.clone(),
2223                format: AudioFormat::Mp3,
2224            }],
2225        };
2226        let http = ScriptedHttp::new().route("cdn1.suno.ai/a.mp3", Reply::ok(b"body".to_vec()));
2227        let fs = MemFs::new();
2228        let mut manifest = Manifest::new();
2229        let outcome = run(
2230            &plan,
2231            &mut manifest,
2232            &[d],
2233            &http,
2234            &fs,
2235            &StubFfmpeg::flac(),
2236            &RecordingClock::new(),
2237            &ExecOptions::default(),
2238        );
2239        assert_eq!(outcome.downloaded, 1);
2240        assert_eq!(http.count("cdn1.suno.ai/a.mp3"), 1);
2241    }
2242
2243    // ── Download: FLAC render + transcode ───────────────────────────
2244
2245    #[test]
2246    fn download_flac_renders_transcodes_and_records() {
2247        let c = clip("b");
2248        let d = desired(c.clone(), AudioFormat::Flac);
2249        let plan = Plan {
2250            actions: vec![Action::Download {
2251                clip: c.clone(),
2252                lineage: LineageContext::own_root(&c),
2253                path: d.path.clone(),
2254                format: AudioFormat::Flac,
2255            }],
2256        };
2257        let http = ScriptedHttp::new()
2258            .with_auth()
2259            .route(
2260                "/wav_file/",
2261                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/b.wav"}"#),
2262            )
2263            .route("b.wav", Reply::ok(b"wav-bytes".to_vec()));
2264        let fs = MemFs::new();
2265        let clock = RecordingClock::new();
2266        let mut manifest = Manifest::new();
2267
2268        let outcome = run(
2269            &plan,
2270            &mut manifest,
2271            &[d],
2272            &http,
2273            &fs,
2274            &StubFfmpeg::flac(),
2275            &clock,
2276            &ExecOptions::default(),
2277        );
2278
2279        assert_eq!(outcome.downloaded, 1);
2280        assert_eq!(outcome.failed(), 0);
2281        let written = fs.read_file("b.flac").unwrap();
2282        assert_eq!(&written[..4], b"fLaC");
2283        assert_eq!(manifest.get("b").unwrap().format, AudioFormat::Flac);
2284        // The URL was ready immediately, so no render request and no polling.
2285        assert_eq!(http.count("/convert_wav/"), 0);
2286        assert!(clock.sleeps().is_empty());
2287    }
2288
2289    #[test]
2290    fn download_flac_requests_render_then_polls_until_ready() {
2291        let c = clip("c");
2292        let d = desired(c.clone(), AudioFormat::Flac);
2293        let plan = Plan {
2294            actions: vec![Action::Download {
2295                clip: c.clone(),
2296                lineage: LineageContext::own_root(&c),
2297                path: d.path.clone(),
2298                format: AudioFormat::Flac,
2299            }],
2300        };
2301        let http = ScriptedHttp::new()
2302            .with_auth()
2303            .route_seq(
2304                "/wav_file/",
2305                vec![
2306                    Reply::json("{}"),
2307                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/c.wav"}"#),
2308                ],
2309            )
2310            .route("/convert_wav/", Reply::status(200))
2311            .route("c.wav", Reply::ok(b"wav".to_vec()));
2312        let clock = RecordingClock::new();
2313        let mut manifest = Manifest::new();
2314
2315        let outcome = run(
2316            &plan,
2317            &mut manifest,
2318            &[d],
2319            &http,
2320            &fs_new(),
2321            &StubFfmpeg::flac(),
2322            &clock,
2323            &small_poll(),
2324        );
2325
2326        assert_eq!(outcome.downloaded, 1);
2327        assert_eq!(http.count("/convert_wav/"), 1);
2328        assert_eq!(clock.sleeps(), vec![Duration::from_secs(5)]);
2329    }
2330
2331    #[test]
2332    fn download_flac_unavailable_render_is_a_nonfatal_failure() {
2333        let c = clip("d");
2334        let d = desired(c.clone(), AudioFormat::Flac);
2335        let plan = Plan {
2336            actions: vec![Action::Download {
2337                clip: c.clone(),
2338                lineage: LineageContext::own_root(&c),
2339                path: d.path.clone(),
2340                format: AudioFormat::Flac,
2341            }],
2342        };
2343        let http = ScriptedHttp::new()
2344            .with_auth()
2345            .route("/wav_file/", Reply::json("{}"))
2346            .route("/convert_wav/", Reply::status(200));
2347        let fs = MemFs::new();
2348        let clock = RecordingClock::new();
2349        let mut manifest = Manifest::new();
2350
2351        let outcome = run(
2352            &plan,
2353            &mut manifest,
2354            &[d],
2355            &http,
2356            &fs,
2357            &StubFfmpeg::flac(),
2358            &clock,
2359            &small_poll(),
2360        );
2361
2362        assert_eq!(outcome.downloaded, 0);
2363        assert_eq!(outcome.failed(), 1);
2364        assert_eq!(outcome.failures[0].clip_id, "d");
2365        assert_eq!(outcome.status, RunStatus::Completed);
2366        assert!(!fs.exists("d.flac"));
2367        assert_eq!(clock.sleeps().len(), 2);
2368    }
2369
2370    #[test]
2371    fn flac_transcode_failure_is_recorded_and_skipped() {
2372        let c = clip("t");
2373        let d = desired(c.clone(), AudioFormat::Flac);
2374        let plan = Plan {
2375            actions: vec![Action::Download {
2376                clip: c.clone(),
2377                lineage: LineageContext::own_root(&c),
2378                path: d.path.clone(),
2379                format: AudioFormat::Flac,
2380            }],
2381        };
2382        let http = ScriptedHttp::new()
2383            .with_auth()
2384            .route(
2385                "/wav_file/",
2386                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/t.wav"}"#),
2387            )
2388            .route("t.wav", Reply::ok(b"wav".to_vec()));
2389        let fs = MemFs::new();
2390        let mut manifest = Manifest::new();
2391
2392        let outcome = run(
2393            &plan,
2394            &mut manifest,
2395            &[d],
2396            &http,
2397            &fs,
2398            &StubFfmpeg::failing(),
2399            &RecordingClock::new(),
2400            &ExecOptions::default(),
2401        );
2402
2403        assert_eq!(outcome.downloaded, 0);
2404        assert_eq!(outcome.failed(), 1);
2405        assert!(!fs.exists("t.flac"));
2406        assert!(manifest.get("t").is_none());
2407    }
2408
2409    // ── Cover fallback ──────────────────────────────────────────────
2410
2411    #[test]
2412    fn cover_falls_back_when_large_image_is_missing() {
2413        let c = art_clip("e");
2414        let d = desired(c.clone(), AudioFormat::Mp3);
2415        let plan = Plan {
2416            actions: vec![Action::Download {
2417                clip: c.clone(),
2418                lineage: LineageContext::own_root(&c),
2419                path: d.path.clone(),
2420                format: AudioFormat::Mp3,
2421            }],
2422        };
2423        let http = ScriptedHttp::new()
2424            .route("e.mp3", Reply::ok(b"body".to_vec()))
2425            .route("e/large.jpg", Reply::status(404))
2426            .route("e/small.jpg", Reply::ok(b"the-art".to_vec()));
2427        let fs = MemFs::new();
2428        let mut manifest = Manifest::new();
2429
2430        let outcome = run(
2431            &plan,
2432            &mut manifest,
2433            &[d],
2434            &http,
2435            &fs,
2436            &StubFfmpeg::flac(),
2437            &RecordingClock::new(),
2438            &ExecOptions::default(),
2439        );
2440
2441        assert_eq!(outcome.downloaded, 1);
2442        let calls = http.calls();
2443        let large = calls
2444            .iter()
2445            .position(|u| u.contains("e/large.jpg"))
2446            .unwrap();
2447        let small = calls
2448            .iter()
2449            .position(|u| u.contains("e/small.jpg"))
2450            .unwrap();
2451        assert!(large < small, "large art tried before small");
2452    }
2453
2454    // ── Cover reuse: embed + sidecar share one fetch (#89) ──────────
2455
2456    #[test]
2457    fn download_reuses_the_embedded_cover_for_the_jpg_sidecar() {
2458        // The embedded tag and the `.jpg` sidecar want the same cover URL; it is
2459        // fetched once and the bytes serve both.
2460        let c = art_clip("a");
2461        let d = desired(c.clone(), AudioFormat::Mp3);
2462        let plan = Plan {
2463            actions: vec![
2464                Action::Download {
2465                    clip: c.clone(),
2466                    lineage: LineageContext::own_root(&c),
2467                    path: d.path.clone(),
2468                    format: AudioFormat::Mp3,
2469                },
2470                Action::WriteArtifact {
2471                    kind: ArtifactKind::CoverJpg,
2472                    path: "a/cover.jpg".to_owned(),
2473                    source_url: c.selected_image_url().unwrap().to_owned(),
2474                    hash: "art".to_owned(),
2475                    owner_id: "a".to_owned(),
2476                    content: None,
2477                },
2478            ],
2479        };
2480        let http = ScriptedHttp::new()
2481            .route("a.mp3", Reply::ok(b"mp3-body".to_vec()))
2482            .route("a/large.jpg", Reply::ok(b"the-art".to_vec()));
2483        let fs = MemFs::new();
2484        let mut manifest = Manifest::new();
2485
2486        let outcome = run(
2487            &plan,
2488            &mut manifest,
2489            &[d],
2490            &http,
2491            &fs,
2492            &StubFfmpeg::flac(),
2493            &RecordingClock::new(),
2494            &ExecOptions::default(),
2495        );
2496
2497        assert_eq!(outcome.downloaded, 1);
2498        assert_eq!(outcome.artifacts_written, 1);
2499        assert_eq!(outcome.failed(), 0);
2500        // Fetched once, not twice.
2501        assert_eq!(http.count("a/large.jpg"), 1);
2502        // The sidecar carries the fetched bytes, and the audio was tagged.
2503        assert_eq!(fs.read_file("a/cover.jpg").unwrap(), b"the-art");
2504        assert_eq!(&fs.read_file("a.mp3").unwrap()[..3], b"ID3");
2505    }
2506
2507    #[test]
2508    fn concurrent_downloads_reuse_each_clips_own_cover() {
2509        // Two clips render concurrently; each `.jpg` sidecar gets its own cover
2510        // (no cross-contamination) and each cover URL is fetched exactly once.
2511        let a = art_clip("a");
2512        let b = art_clip("b");
2513        let da = desired(a.clone(), AudioFormat::Mp3);
2514        let db = desired(b.clone(), AudioFormat::Mp3);
2515        let plan = Plan {
2516            actions: vec![
2517                Action::Download {
2518                    clip: a.clone(),
2519                    lineage: LineageContext::own_root(&a),
2520                    path: da.path.clone(),
2521                    format: AudioFormat::Mp3,
2522                },
2523                Action::WriteArtifact {
2524                    kind: ArtifactKind::CoverJpg,
2525                    path: "a/cover.jpg".to_owned(),
2526                    source_url: a.selected_image_url().unwrap().to_owned(),
2527                    hash: "art".to_owned(),
2528                    owner_id: "a".to_owned(),
2529                    content: None,
2530                },
2531                Action::Download {
2532                    clip: b.clone(),
2533                    lineage: LineageContext::own_root(&b),
2534                    path: db.path.clone(),
2535                    format: AudioFormat::Mp3,
2536                },
2537                Action::WriteArtifact {
2538                    kind: ArtifactKind::CoverJpg,
2539                    path: "b/cover.jpg".to_owned(),
2540                    source_url: b.selected_image_url().unwrap().to_owned(),
2541                    hash: "art".to_owned(),
2542                    owner_id: "b".to_owned(),
2543                    content: None,
2544                },
2545            ],
2546        };
2547        let http = ScriptedHttp::new()
2548            .route("a.mp3", Reply::ok(b"a-mp3".to_vec()))
2549            .route("b.mp3", Reply::ok(b"b-mp3".to_vec()))
2550            .route("a/large.jpg", Reply::ok(b"art-a".to_vec()))
2551            .route("b/large.jpg", Reply::ok(b"art-b".to_vec()));
2552        let fs = MemFs::new();
2553        let mut manifest = Manifest::new();
2554
2555        let outcome = run(
2556            &plan,
2557            &mut manifest,
2558            &[da, db],
2559            &http,
2560            &fs,
2561            &StubFfmpeg::flac(),
2562            &RecordingClock::new(),
2563            &small_poll(),
2564        );
2565
2566        assert_eq!(outcome.downloaded, 2);
2567        assert_eq!(outcome.artifacts_written, 2);
2568        assert_eq!(http.count("a/large.jpg"), 1);
2569        assert_eq!(http.count("b/large.jpg"), 1);
2570        assert_eq!(fs.read_file("a/cover.jpg").unwrap(), b"art-a");
2571        assert_eq!(fs.read_file("b/cover.jpg").unwrap(), b"art-b");
2572    }
2573
2574    #[test]
2575    fn cover_sidecar_refetches_when_embed_fell_back_to_another_url() {
2576        // The large image 404s so the embed falls back to the small image; the
2577        // sidecar still wants the (dead) large URL and must NOT be handed the
2578        // small bytes. Reuse is keyed on the exact URL, so nothing is cached and
2579        // the sidecar fetches the large URL itself (then fails on the 404).
2580        let c = art_clip("e");
2581        let d = desired(c.clone(), AudioFormat::Mp3);
2582        let plan = Plan {
2583            actions: vec![
2584                Action::Download {
2585                    clip: c.clone(),
2586                    lineage: LineageContext::own_root(&c),
2587                    path: d.path.clone(),
2588                    format: AudioFormat::Mp3,
2589                },
2590                Action::WriteArtifact {
2591                    kind: ArtifactKind::CoverJpg,
2592                    path: "e/cover.jpg".to_owned(),
2593                    source_url: "https://art.suno.ai/e/large.jpg".to_owned(),
2594                    hash: "art".to_owned(),
2595                    owner_id: "e".to_owned(),
2596                    content: None,
2597                },
2598            ],
2599        };
2600        let http = ScriptedHttp::new()
2601            .route("e.mp3", Reply::ok(b"body".to_vec()))
2602            .route("e/large.jpg", Reply::status(404))
2603            .route("e/small.jpg", Reply::ok(b"small-art".to_vec()));
2604        let fs = MemFs::new();
2605        let mut manifest = Manifest::new();
2606
2607        let outcome = run(
2608            &plan,
2609            &mut manifest,
2610            &[d],
2611            &http,
2612            &fs,
2613            &StubFfmpeg::flac(),
2614            &RecordingClock::new(),
2615            &ExecOptions::default(),
2616        );
2617
2618        assert_eq!(outcome.downloaded, 1);
2619        // The small image was fetched once (the embed fallback) and never reused
2620        // for the large-keyed sidecar; the sidecar went to the network itself.
2621        assert_eq!(http.count("e/small.jpg"), 1);
2622        assert!(
2623            http.count("e/large.jpg") >= 2,
2624            "sidecar refetched the large URL"
2625        );
2626        assert_eq!(manifest.get("e").unwrap().cover_jpg, None);
2627        assert!(!fs.exists("e/cover.jpg"));
2628    }
2629
2630    // ── Atomic write and size verification (SYNC-13/14) ─────────────
2631
2632    #[test]
2633    fn failed_write_leaves_the_prior_file_intact() {
2634        let c = clip("f");
2635        let d = desired(c.clone(), AudioFormat::Mp3);
2636        let plan = Plan {
2637            actions: vec![Action::Download {
2638                clip: c.clone(),
2639                lineage: LineageContext::own_root(&c),
2640                path: d.path.clone(),
2641                format: AudioFormat::Mp3,
2642            }],
2643        };
2644        let http = ScriptedHttp::new().route("f.mp3", Reply::ok(b"new-body".to_vec()));
2645        let fs = MemFs::new()
2646            .with_file("f.mp3", b"OLD-CONTENT".to_vec())
2647            .fail_write("f.mp3");
2648        let mut manifest = Manifest::new();
2649
2650        let outcome = run(
2651            &plan,
2652            &mut manifest,
2653            &[d],
2654            &http,
2655            &fs,
2656            &StubFfmpeg::flac(),
2657            &RecordingClock::new(),
2658            &ExecOptions::default(),
2659        );
2660
2661        assert_eq!(outcome.downloaded, 0);
2662        assert_eq!(outcome.failed(), 1);
2663        assert_eq!(fs.read_file("f.mp3").unwrap(), b"OLD-CONTENT");
2664        assert!(manifest.get("f").is_none());
2665    }
2666
2667    #[test]
2668    fn size_mismatch_after_write_is_a_failure() {
2669        let c = clip("g");
2670        let d = desired(c.clone(), AudioFormat::Mp3);
2671        let plan = Plan {
2672            actions: vec![Action::Download {
2673                clip: c.clone(),
2674                lineage: LineageContext::own_root(&c),
2675                path: d.path.clone(),
2676                format: AudioFormat::Mp3,
2677            }],
2678        };
2679        let http = ScriptedHttp::new().route("g.mp3", Reply::ok(b"body".to_vec()));
2680        let fs = MemFs::new().corrupt_write("g.mp3");
2681        let mut manifest = Manifest::new();
2682
2683        let outcome = run(
2684            &plan,
2685            &mut manifest,
2686            &[d],
2687            &http,
2688            &fs,
2689            &StubFfmpeg::flac(),
2690            &RecordingClock::new(),
2691            &ExecOptions::default(),
2692        );
2693
2694        assert_eq!(outcome.downloaded, 0);
2695        assert_eq!(outcome.failed(), 1);
2696        assert!(outcome.failures[0].reason.contains("expected"));
2697        assert!(manifest.get("g").is_none());
2698    }
2699
2700    // ── Reliability policy (SYNC-16/17) ─────────────────────────────
2701
2702    #[test]
2703    fn transient_failure_is_retried_then_skipped() {
2704        let c = clip("h");
2705        let d = desired(c.clone(), AudioFormat::Mp3);
2706        let plan = Plan {
2707            actions: vec![Action::Download {
2708                clip: c.clone(),
2709                lineage: LineageContext::own_root(&c),
2710                path: d.path.clone(),
2711                format: AudioFormat::Mp3,
2712            }],
2713        };
2714        let http = ScriptedHttp::new().route("h.mp3", Reply::status(500));
2715        let fs = MemFs::new();
2716        let clock = RecordingClock::new();
2717        let opts = ExecOptions {
2718            max_retries: 2,
2719            ..ExecOptions::default()
2720        };
2721        let mut manifest = Manifest::new();
2722
2723        let outcome = run(
2724            &plan,
2725            &mut manifest,
2726            &[d],
2727            &http,
2728            &fs,
2729            &StubFfmpeg::flac(),
2730            &clock,
2731            &opts,
2732        );
2733
2734        assert_eq!(outcome.downloaded, 0);
2735        assert_eq!(outcome.failed(), 1);
2736        assert_eq!(http.count("h.mp3"), 3);
2737        assert_eq!(clock.sleeps().len(), 2);
2738    }
2739
2740    #[test]
2741    fn truncated_download_is_retried_then_succeeds() {
2742        let c = clip("i");
2743        let d = desired(c.clone(), AudioFormat::Mp3);
2744        let plan = Plan {
2745            actions: vec![Action::Download {
2746                clip: c.clone(),
2747                lineage: LineageContext::own_root(&c),
2748                path: d.path.clone(),
2749                format: AudioFormat::Mp3,
2750            }],
2751        };
2752        let http = ScriptedHttp::new().route_seq(
2753            "i.mp3",
2754            vec![
2755                Reply::ok(b"short".to_vec()).with_content_length(999),
2756                Reply::ok(b"good-body".to_vec()),
2757            ],
2758        );
2759        let fs = MemFs::new();
2760        let clock = RecordingClock::new();
2761        let mut manifest = Manifest::new();
2762
2763        let outcome = run(
2764            &plan,
2765            &mut manifest,
2766            &[d],
2767            &http,
2768            &fs,
2769            &StubFfmpeg::flac(),
2770            &clock,
2771            &ExecOptions::default(),
2772        );
2773
2774        assert_eq!(outcome.downloaded, 1);
2775        assert_eq!(http.count("i.mp3"), 2);
2776        assert_eq!(clock.sleeps().len(), 1);
2777    }
2778
2779    #[test]
2780    fn rate_limit_backs_off_using_retry_after() {
2781        let c = clip("j");
2782        let d = desired(c.clone(), AudioFormat::Mp3);
2783        let plan = Plan {
2784            actions: vec![Action::Download {
2785                clip: c.clone(),
2786                lineage: LineageContext::own_root(&c),
2787                path: d.path.clone(),
2788                format: AudioFormat::Mp3,
2789            }],
2790        };
2791        let http = ScriptedHttp::new().route_seq(
2792            "j.mp3",
2793            vec![
2794                Reply::status(429).with_retry_after(7),
2795                Reply::ok(b"body".to_vec()),
2796            ],
2797        );
2798        let fs = MemFs::new();
2799        let clock = RecordingClock::new();
2800        let mut manifest = Manifest::new();
2801
2802        let outcome = run(
2803            &plan,
2804            &mut manifest,
2805            &[d],
2806            &http,
2807            &fs,
2808            &StubFfmpeg::flac(),
2809            &clock,
2810            &ExecOptions::default(),
2811        );
2812
2813        assert_eq!(outcome.downloaded, 1);
2814        assert_eq!(clock.sleeps(), vec![Duration::from_secs(7)]);
2815    }
2816
2817    #[test]
2818    fn auth_failure_aborts_the_run() {
2819        let c1 = clip("k1");
2820        let c2 = clip("k2");
2821        let d1 = desired(c1.clone(), AudioFormat::Flac);
2822        let d2 = desired(c2.clone(), AudioFormat::Flac);
2823        let plan = Plan {
2824            actions: vec![
2825                Action::Download {
2826                    clip: c1.clone(),
2827                    lineage: LineageContext::own_root(&c1),
2828                    path: d1.path.clone(),
2829                    format: AudioFormat::Flac,
2830                },
2831                Action::Download {
2832                    clip: c2.clone(),
2833                    lineage: LineageContext::own_root(&c2),
2834                    path: d2.path.clone(),
2835                    format: AudioFormat::Flac,
2836                },
2837            ],
2838        };
2839        // The authenticated WAV-render endpoint rejects auth even after a JWT
2840        // refresh: that is a bad token, so the whole run aborts rather than
2841        // hammering every clip. A CDN media rejection, by contrast, does not.
2842        let http = ScriptedHttp::new()
2843            .with_auth()
2844            .route("/wav_file/", Reply::status(401));
2845        let fs = MemFs::new();
2846        let mut manifest = Manifest::new();
2847
2848        let outcome = run(
2849            &plan,
2850            &mut manifest,
2851            &[d1, d2],
2852            &http,
2853            &fs,
2854            &StubFfmpeg::flac(),
2855            &RecordingClock::new(),
2856            &small_poll(),
2857        );
2858
2859        assert_eq!(outcome.status, RunStatus::AuthAborted);
2860        assert_eq!(outcome.failed(), 1);
2861        assert_eq!(outcome.failures[0].clip_id, "k1");
2862        assert_eq!(outcome.downloaded, 0);
2863    }
2864
2865    // ── Disk-full aborts the run (issue #17) ────────────────────────
2866
2867    #[test]
2868    fn disk_full_primary_write_aborts_the_run() {
2869        // Two MP3 downloads; the first write is out of space. That is systemic,
2870        // so the run aborts before the second is even attempted: exactly one
2871        // failure is recorded and its reason names the disk-full cause.
2872        let c1 = clip("d1");
2873        let c2 = clip("d2");
2874        let d1 = desired(c1.clone(), AudioFormat::Mp3);
2875        let d2 = desired(c2.clone(), AudioFormat::Mp3);
2876        let plan = Plan {
2877            actions: vec![
2878                Action::Download {
2879                    clip: c1.clone(),
2880                    lineage: LineageContext::own_root(&c1),
2881                    path: d1.path.clone(),
2882                    format: AudioFormat::Mp3,
2883                },
2884                Action::Download {
2885                    clip: c2.clone(),
2886                    lineage: LineageContext::own_root(&c2),
2887                    path: d2.path.clone(),
2888                    format: AudioFormat::Mp3,
2889                },
2890            ],
2891        };
2892        let http = ScriptedHttp::new()
2893            .route("d1.mp3", Reply::ok(b"body-1".to_vec()))
2894            .route("d2.mp3", Reply::ok(b"body-2".to_vec()));
2895        let fs = MemFs::new().fail_write_out_of_space("d1.mp3");
2896        let mut manifest = Manifest::new();
2897
2898        let outcome = run(
2899            &plan,
2900            &mut manifest,
2901            &[d1, d2],
2902            &http,
2903            &fs,
2904            &StubFfmpeg::flac(),
2905            &RecordingClock::new(),
2906            &ExecOptions::default(),
2907        );
2908
2909        assert_eq!(outcome.status, RunStatus::DiskFull);
2910        assert_eq!(outcome.failed(), 1);
2911        assert_eq!(outcome.failures[0].clip_id, "d1");
2912        assert!(outcome.failures[0].reason.contains("disk full"));
2913        assert_eq!(outcome.downloaded, 0);
2914        // The second clip was never fetched: the run aborted first.
2915        assert_eq!(http.count("d2.mp3"), 0);
2916        assert!(!fs.exists("d2.mp3"));
2917    }
2918
2919    #[test]
2920    fn disk_full_flac_transcode_aborts_the_run() {
2921        // The scratch disk fills during the FLAC re-encode; a WAV rendered, but
2922        // there is nowhere to stage the transcode, so the run aborts.
2923        let c1 = clip("d1");
2924        let c2 = clip("d2");
2925        let d1 = desired(c1.clone(), AudioFormat::Flac);
2926        let d2 = desired(c2.clone(), AudioFormat::Flac);
2927        let plan = Plan {
2928            actions: vec![
2929                Action::Download {
2930                    clip: c1.clone(),
2931                    lineage: LineageContext::own_root(&c1),
2932                    path: d1.path.clone(),
2933                    format: AudioFormat::Flac,
2934                },
2935                Action::Download {
2936                    clip: c2.clone(),
2937                    lineage: LineageContext::own_root(&c2),
2938                    path: d2.path.clone(),
2939                    format: AudioFormat::Flac,
2940                },
2941            ],
2942        };
2943        let http = ScriptedHttp::new()
2944            .with_auth()
2945            .route(
2946                "/wav_file/",
2947                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/d1.wav"}"#),
2948            )
2949            .route(".wav", Reply::ok(b"wav".to_vec()));
2950        let fs = MemFs::new();
2951        let mut manifest = Manifest::new();
2952
2953        let outcome = run(
2954            &plan,
2955            &mut manifest,
2956            &[d1, d2],
2957            &http,
2958            &fs,
2959            &StubFfmpeg::out_of_space(),
2960            &RecordingClock::new(),
2961            &ExecOptions::default(),
2962        );
2963
2964        assert_eq!(outcome.status, RunStatus::DiskFull);
2965        assert_eq!(outcome.failed(), 1);
2966        assert_eq!(outcome.failures[0].clip_id, "d1");
2967        assert!(outcome.failures[0].reason.contains("disk full"));
2968        assert_eq!(outcome.downloaded, 0);
2969    }
2970
2971    #[test]
2972    fn disk_full_artifact_write_aborts_the_run() {
2973        // A sidecar write (not a primary download) also aborts on a full disk:
2974        // the owning audio is present, the cover fetch succeeds, but the sidecar
2975        // cannot be written.
2976        let mut manifest = Manifest::new();
2977        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
2978        let plan = Plan {
2979            actions: vec![Action::WriteArtifact {
2980                kind: ArtifactKind::CoverJpg,
2981                path: "a/cover.jpg".to_owned(),
2982                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
2983                hash: "h1".to_owned(),
2984                owner_id: "a".to_owned(),
2985                content: None,
2986            }],
2987        };
2988        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"jpg-bytes".to_vec()));
2989        let fs = MemFs::new().fail_write_out_of_space("a/cover.jpg");
2990
2991        let outcome = run(
2992            &plan,
2993            &mut manifest,
2994            &[],
2995            &http,
2996            &fs,
2997            &StubFfmpeg::flac(),
2998            &RecordingClock::new(),
2999            &ExecOptions::default(),
3000        );
3001
3002        assert_eq!(outcome.status, RunStatus::DiskFull);
3003        assert_eq!(outcome.failed(), 1);
3004        assert!(outcome.failures[0].reason.contains("disk full"));
3005        assert_eq!(outcome.artifacts_written, 0);
3006        // The sidecar slot was never recorded: the write failed before it.
3007        assert_eq!(manifest.get("a").unwrap().cover_jpg, None);
3008    }
3009
3010    #[test]
3011    fn disk_full_leaves_the_failed_clips_manifest_entry_unchanged() {
3012        // write_verify fails before any manifest insert, so a re-download that
3013        // hits a full disk leaves the prior entry (and file) exactly as it was.
3014        let c = clip("m");
3015        let d = desired(c.clone(), AudioFormat::Mp3);
3016        let plan = Plan {
3017            actions: vec![Action::Download {
3018                clip: c.clone(),
3019                lineage: LineageContext::own_root(&c),
3020                path: d.path.clone(),
3021                format: AudioFormat::Mp3,
3022            }],
3023        };
3024        let http = ScriptedHttp::new().route("m.mp3", Reply::ok(b"new-body".to_vec()));
3025        let fs = MemFs::new()
3026            .with_file("m.mp3", b"OLD-CONTENT".to_vec())
3027            .fail_write_out_of_space("m.mp3");
3028        let mut manifest = Manifest::new();
3029        let before = entry("m.mp3", AudioFormat::Mp3);
3030        manifest.insert("m", before.clone());
3031
3032        let outcome = run(
3033            &plan,
3034            &mut manifest,
3035            &[d],
3036            &http,
3037            &fs,
3038            &StubFfmpeg::flac(),
3039            &RecordingClock::new(),
3040            &ExecOptions::default(),
3041        );
3042
3043        assert_eq!(outcome.status, RunStatus::DiskFull);
3044        assert_eq!(manifest.get("m"), Some(&before));
3045        assert_eq!(fs.read_file("m.mp3").unwrap(), b"OLD-CONTENT");
3046    }
3047
/// A 403 on one clip's CDN fetch is that clip's failure only.
///
/// CDN media fetches carry no credentials, so a 403 there is a per-asset
/// rejection (often transient) rather than a bad token: the clip is retried,
/// then recorded and skipped, and the remaining clip still downloads.
#[test]
fn cdn_download_rejection_skips_the_clip_without_aborting() {
    let rejected = clip("k1");
    let accepted = clip("k2");
    let want_rejected = desired(rejected.clone(), AudioFormat::Mp3);
    let want_accepted = desired(accepted.clone(), AudioFormat::Mp3);
    let plan = Plan {
        actions: [(&rejected, &want_rejected), (&accepted, &want_accepted)]
            .iter()
            .map(|&(source, want)| Action::Download {
                clip: source.clone(),
                lineage: LineageContext::own_root(source),
                path: want.path.clone(),
                format: AudioFormat::Mp3,
            })
            .collect(),
    };
    let http = ScriptedHttp::new()
        .route("k1.mp3", Reply::status(403))
        .route("k2.mp3", Reply::ok(b"body".to_vec()));
    let fs = MemFs::new();
    let mut manifest = Manifest::new();
    let ffmpeg = StubFfmpeg::flac();
    let clock = RecordingClock::new();
    let options = ExecOptions::default();

    let outcome = run(
        &plan,
        &mut manifest,
        &[want_rejected, want_accepted],
        &http,
        &fs,
        &ffmpeg,
        &clock,
        &options,
    );

    // The rejection must not be mistaken for an account-level auth failure.
    assert_ne!(outcome.status, RunStatus::AuthAborted);
    assert_eq!(outcome.downloaded, 1);
    assert_eq!(outcome.failed(), 1);
    assert_eq!(outcome.failures[0].clip_id, "k1");
}
3095
/// A 404 on one clip is recorded as a failure while the run completes and
/// the other clip is written to disk and to the manifest.
#[test]
fn one_clip_failure_does_not_abort_the_run() {
    let missing = clip("l1");
    let present = clip("l2");
    let want_missing = desired(missing.clone(), AudioFormat::Mp3);
    let want_present = desired(present.clone(), AudioFormat::Mp3);
    let plan = Plan {
        actions: [(&missing, &want_missing), (&present, &want_present)]
            .iter()
            .map(|&(source, want)| Action::Download {
                clip: source.clone(),
                lineage: LineageContext::own_root(source),
                path: want.path.clone(),
                format: AudioFormat::Mp3,
            })
            .collect(),
    };
    let http = ScriptedHttp::new()
        .route("l1.mp3", Reply::status(404))
        .route("l2.mp3", Reply::ok(b"body".to_vec()));
    let fs = MemFs::new();
    let mut manifest = Manifest::new();
    let ffmpeg = StubFfmpeg::flac();
    let clock = RecordingClock::new();
    let options = ExecOptions::default();

    let outcome = run(
        &plan,
        &mut manifest,
        &[want_missing, want_present],
        &http,
        &fs,
        &ffmpeg,
        &clock,
        &options,
    );

    assert_eq!(outcome.status, RunStatus::Completed);
    assert_eq!(outcome.downloaded, 1);
    assert_eq!(outcome.failed(), 1);
    assert_eq!(outcome.failures[0].clip_id, "l1");
    // Only the clip that downloaded leaves a file and a manifest entry.
    assert!(fs.exists("l2.mp3"));
    assert!(manifest.get("l2").is_some());
    assert!(manifest.get("l1").is_none());
}
3143
3144    // ── preserve marker (SYNC-8) ────────────────────────────────────
3145
/// Downloading three clips leaves `preserve` unset for the mirror-only clip
/// and set for both the copy-held clip and the private clip.
#[test]
fn preserve_is_set_for_copy_held_and_private_clips() {
    let mut mirror_only = desired(clip("m1"), AudioFormat::Mp3);
    mirror_only.modes = vec![SourceMode::Mirror];
    let mut held_by_copy = desired(clip("m2"), AudioFormat::Mp3);
    held_by_copy.modes = vec![SourceMode::Mirror, SourceMode::Copy];
    let mut private_clip = desired(clip("m3"), AudioFormat::Mp3);
    private_clip.private = true;

    // One Download per desired clip, in the order listed.
    let plan = Plan {
        actions: [&mirror_only, &held_by_copy, &private_clip]
            .iter()
            .map(|want| Action::Download {
                clip: want.clip.clone(),
                lineage: LineageContext::own_root(&want.clip),
                path: want.path.clone(),
                format: AudioFormat::Mp3,
            })
            .collect(),
    };
    let http = ScriptedHttp::new()
        .route("m1.mp3", Reply::ok(b"a".to_vec()))
        .route("m2.mp3", Reply::ok(b"b".to_vec()))
        .route("m3.mp3", Reply::ok(b"c".to_vec()));
    let fs = MemFs::new();
    let mut manifest = Manifest::new();
    let ffmpeg = StubFfmpeg::flac();
    let clock = RecordingClock::new();
    let options = ExecOptions::default();

    let outcome = run(
        &plan,
        &mut manifest,
        &[mirror_only, held_by_copy, private_clip],
        &http,
        &fs,
        &ffmpeg,
        &clock,
        &options,
    );

    assert_eq!(outcome.downloaded, 3);
    assert!(!manifest.get("m1").unwrap().preserve);
    assert!(manifest.get("m2").unwrap().preserve);
    assert!(manifest.get("m3").unwrap().preserve);
}
3200
3201    // ── Reformat / Retag / Rename / Delete / Skip ───────────────────
3202
/// A FLAC-to-MP3 reformat writes the new file, removes the old one, and
/// repoints the manifest entry at the new path and format.
#[test]
fn reformat_writes_new_format_and_removes_old_file() {
    let source = clip("n");
    let want = desired(source.clone(), AudioFormat::Mp3);
    let mut manifest = Manifest::new();
    manifest.insert("n", entry("n.flac", AudioFormat::Flac));
    let fs = MemFs::new().with_file("n.flac", b"OLD-FLAC".to_vec());
    let http = ScriptedHttp::new().route("n.mp3", Reply::ok(b"body".to_vec()));
    let plan = Plan {
        actions: vec![Action::Reformat {
            clip: source.clone(),
            path: String::from("n.mp3"),
            from_path: String::from("n.flac"),
            from: AudioFormat::Flac,
            to: AudioFormat::Mp3,
        }],
    };
    let ffmpeg = StubFfmpeg::flac();
    let clock = RecordingClock::new();
    let options = ExecOptions::default();

    let outcome = run(
        &plan,
        &mut manifest,
        &[want],
        &http,
        &fs,
        &ffmpeg,
        &clock,
        &options,
    );

    assert_eq!(outcome.reformatted, 1);
    assert!(fs.exists("n.mp3"));
    assert!(!fs.exists("n.flac"));
    let recorded = manifest.get("n").unwrap();
    assert_eq!(recorded.path, "n.mp3");
    assert_eq!(recorded.format, AudioFormat::Mp3);
    assert_eq!(recorded.meta_hash, "m");
}
3240
/// A retag of an existing tagged MP3 records the desired meta and art hashes
/// in the manifest and leaves a file that still begins with an ID3 header.
#[test]
fn retag_rewrites_file_and_updates_hashes() {
    let source = clip("o");
    let mut want = desired(source.clone(), AudioFormat::Mp3);
    want.meta_hash = String::from("new");
    want.art_hash = String::from("new-art");

    // Seed the disk with an already-tagged file for the same clip.
    let root = LineageContext::own_root(&source);
    let tags = TrackMetadata::from_clip(&source, &root);
    let tagged = tag_mp3(b"audio", &tags, None, None).unwrap();
    let fs = MemFs::new().with_file("o.mp3", tagged.clone());

    let mut seeded = entry("o.mp3", AudioFormat::Mp3);
    seeded.size = tagged.len() as u64;
    let mut manifest = Manifest::new();
    manifest.insert("o", seeded);

    let plan = Plan {
        actions: vec![Action::Retag {
            clip: source.clone(),
            lineage: LineageContext::own_root(&source),
            path: String::from("o.mp3"),
        }],
    };
    let http = ScriptedHttp::new();
    let ffmpeg = StubFfmpeg::flac();
    let clock = RecordingClock::new();
    let options = ExecOptions::default();

    let outcome = run(
        &plan,
        &mut manifest,
        &[want],
        &http,
        &fs,
        &ffmpeg,
        &clock,
        &options,
    );

    assert_eq!(outcome.retagged, 1);
    let recorded = manifest.get("o").unwrap();
    assert_eq!(recorded.meta_hash, "new");
    assert_eq!(recorded.art_hash, "new-art");
    let rewritten = fs.read_file("o.mp3").unwrap();
    assert_eq!(&rewritten[..3], b"ID3");
}
3284
/// A rename moves the file from the old path to the new one and updates the
/// path stored on the clip's manifest entry.
#[test]
fn rename_moves_file_and_updates_manifest_path() {
    let source = clip("p");
    let mut want = desired(source.clone(), AudioFormat::Mp3);
    want.path = String::from("new/p.mp3");
    let mut manifest = Manifest::new();
    manifest.insert("p", entry("old/p.mp3", AudioFormat::Mp3));
    let fs = MemFs::new().with_file("old/p.mp3", b"DATA".to_vec());
    let plan = Plan {
        actions: vec![Action::Rename {
            from: String::from("old/p.mp3"),
            to: String::from("new/p.mp3"),
        }],
    };
    let http = ScriptedHttp::new();
    let ffmpeg = StubFfmpeg::flac();
    let clock = RecordingClock::new();
    let options = ExecOptions::default();

    let outcome = run(
        &plan,
        &mut manifest,
        &[want],
        &http,
        &fs,
        &ffmpeg,
        &clock,
        &options,
    );

    assert_eq!(outcome.renamed, 1);
    assert!(fs.exists("new/p.mp3"));
    assert!(!fs.exists("old/p.mp3"));
    assert_eq!(manifest.get("p").unwrap().path, "new/p.mp3");
}
3316
/// A rename that fails for lack of space is systemic, like a full-disk
/// write: the run ends with `DiskFull`, the failure is recorded, and neither
/// the source file nor the manifest path changes.
#[test]
fn disk_full_rename_aborts_the_run() {
    let source = clip("p");
    let mut want = desired(source.clone(), AudioFormat::Mp3);
    want.path = String::from("new/p.mp3");
    let mut manifest = Manifest::new();
    manifest.insert("p", entry("old/p.mp3", AudioFormat::Mp3));
    let fs = MemFs::new()
        .with_file("old/p.mp3", b"DATA".to_vec())
        .fail_rename_out_of_space("new/p.mp3");
    let plan = Plan {
        actions: vec![Action::Rename {
            from: String::from("old/p.mp3"),
            to: String::from("new/p.mp3"),
        }],
    };
    let http = ScriptedHttp::new();
    let ffmpeg = StubFfmpeg::flac();
    let clock = RecordingClock::new();
    let options = ExecOptions::default();

    let outcome = run(
        &plan,
        &mut manifest,
        &[want],
        &http,
        &fs,
        &ffmpeg,
        &clock,
        &options,
    );

    assert_eq!(outcome.status, RunStatus::DiskFull);
    assert_eq!(outcome.renamed, 0);
    assert_eq!(outcome.failed(), 1);
    assert!(outcome.failures[0].reason.contains("disk full"));
    // Nothing moved: the old path still holds the file and the manifest agrees.
    assert!(fs.exists("old/p.mp3"));
    assert!(!fs.exists("new/p.mp3"));
    assert_eq!(manifest.get("p").unwrap().path, "old/p.mp3");
}
3356
/// A successful delete removes both the file and the clip's manifest entry.
#[test]
fn delete_removes_file_and_manifest_entry() {
    let mut manifest = Manifest::new();
    manifest.insert("q", entry("q.mp3", AudioFormat::Mp3));
    let fs = MemFs::new().with_file("q.mp3", b"DATA".to_vec());
    let plan = Plan {
        actions: vec![Action::Delete {
            path: String::from("q.mp3"),
            clip_id: String::from("q"),
        }],
    };
    let http = ScriptedHttp::new();
    let ffmpeg = StubFfmpeg::flac();
    let clock = RecordingClock::new();
    let options = ExecOptions::default();

    let outcome = run(
        &plan,
        &mut manifest,
        &[],
        &http,
        &fs,
        &ffmpeg,
        &clock,
        &options,
    );

    assert_eq!(outcome.deleted, 1);
    assert!(!fs.exists("q.mp3"));
    assert!(manifest.get("q").is_none());
}
3384
/// When the filesystem refuses the removal, the delete is counted as a
/// failure and both the file and its manifest entry are kept.
#[test]
fn failed_delete_keeps_the_manifest_entry() {
    let mut manifest = Manifest::new();
    manifest.insert("s", entry("s.mp3", AudioFormat::Mp3));
    let fs = MemFs::new()
        .with_file("s.mp3", b"DATA".to_vec())
        .fail_remove("s.mp3");
    let plan = Plan {
        actions: vec![Action::Delete {
            path: String::from("s.mp3"),
            clip_id: String::from("s"),
        }],
    };
    let http = ScriptedHttp::new();
    let ffmpeg = StubFfmpeg::flac();
    let clock = RecordingClock::new();
    let options = ExecOptions::default();

    let outcome = run(
        &plan,
        &mut manifest,
        &[],
        &http,
        &fs,
        &ffmpeg,
        &clock,
        &options,
    );

    assert_eq!(outcome.deleted, 0);
    assert_eq!(outcome.failed(), 1);
    assert!(manifest.get("s").is_some());
    assert!(fs.exists("s.mp3"));
}
3415
/// A lone Skip action is counted as skipped and produces no failures.
#[test]
fn skip_is_a_noop() {
    let plan = Plan {
        actions: vec![Action::Skip {
            clip_id: String::from("r"),
        }],
    };
    let mut manifest = Manifest::new();
    let http = ScriptedHttp::new();
    let fs = MemFs::new();
    let ffmpeg = StubFfmpeg::flac();
    let clock = RecordingClock::new();
    let options = ExecOptions::default();

    let outcome = run(
        &plan,
        &mut manifest,
        &[],
        &http,
        &fs,
        &ffmpeg,
        &clock,
        &options,
    );

    assert_eq!(outcome.skipped, 1);
    assert_eq!(outcome.failed(), 0);
}
3437
3438    // ── Pure helpers ────────────────────────────────────────────────
3439
/// `content_length` yields the parsed value when the header is present and
/// `None` when the response has no headers at all.
#[test]
fn header_helpers_parse_or_ignore() {
    let with_length = HttpResponse {
        status: 200,
        headers: vec![(String::from("Content-Length"), String::from("42"))],
        body: vec![],
    };
    let without_headers = HttpResponse {
        status: 200,
        headers: vec![],
        body: vec![],
    };

    assert_eq!(content_length(&with_length), Some(42));
    assert_eq!(content_length(&without_headers), None);
}
3456
/// `preserve_for` is false for the default desired clip and true once the
/// clip is either held by a Copy source or marked private.
#[test]
fn preserve_rule_covers_copy_and_private() {
    let plain = desired(clip("x"), AudioFormat::Mp3);
    let mut held_by_copy = plain.clone();
    held_by_copy.modes = vec![SourceMode::Copy];
    let mut private_clip = plain.clone();
    private_clip.private = true;

    assert!(!preserve_for(&plain));
    assert!(preserve_for(&held_by_copy));
    assert!(preserve_for(&private_clip));
}
3468
3469    fn fs_new() -> MemFs {
3470        MemFs::new()
3471    }
3472
3473    // ── Skip refreshes the preserve marker (SYNC-8 cross-run) ────────
3474
/// A Skip for a clip that is now held by a Copy source turns on the
/// `preserve` marker of its previously unpreserved manifest entry.
#[test]
fn skip_sets_preserve_when_a_clip_becomes_copy_held() {
    let source = clip("s1");
    let mut want = desired(source.clone(), AudioFormat::Mp3);
    want.modes = vec![SourceMode::Copy];
    let mut manifest = Manifest::new();
    manifest.insert(String::from("s1"), entry("s1.mp3", AudioFormat::Mp3));
    // Precondition: the entry starts out unpreserved.
    assert!(!manifest.get("s1").unwrap().preserve);
    let plan = Plan {
        actions: vec![Action::Skip {
            clip_id: String::from("s1"),
        }],
    };
    let http = ScriptedHttp::new();
    let fs = fs_new();
    let ffmpeg = StubFfmpeg::flac();
    let clock = RecordingClock::new();
    let options = ExecOptions::default();

    let outcome = run(
        &plan,
        &mut manifest,
        &[want],
        &http,
        &fs,
        &ffmpeg,
        &clock,
        &options,
    );

    assert_eq!(outcome.skipped, 1);
    assert!(
        manifest.get("s1").unwrap().preserve,
        "a copy-held skip must mark the entry preserved"
    );
}
3506
/// A Skip for a clip whose desired state is the helper's default clears a
/// `preserve` marker left on its manifest entry.
#[test]
fn skip_clears_stale_preserve_when_a_clip_returns_to_mirror_only() {
    let source = clip("s2");
    let want = desired(source.clone(), AudioFormat::Mp3);
    let mut leftover = entry("s2.mp3", AudioFormat::Mp3);
    leftover.preserve = true;
    let mut manifest = Manifest::new();
    manifest.insert(String::from("s2"), leftover);
    let plan = Plan {
        actions: vec![Action::Skip {
            clip_id: String::from("s2"),
        }],
    };
    let http = ScriptedHttp::new();
    let fs = fs_new();
    let ffmpeg = StubFfmpeg::flac();
    let clock = RecordingClock::new();
    let options = ExecOptions::default();

    // Only the manifest side effect matters here; the outcome is not inspected.
    run(
        &plan,
        &mut manifest,
        &[want],
        &http,
        &fs,
        &ffmpeg,
        &clock,
        &options,
    );

    assert!(
        !manifest.get("s2").unwrap().preserve,
        "a mirror-only skip must clear a stale preserve marker"
    );
}
3537
/// A FLAC download whose first `/wav_file/` lookup answers 429 retries the
/// lookup after a single one-second sleep, then succeeds without issuing any
/// `/convert_wav/` request.
#[test]
fn flac_render_retries_a_rate_limited_wav_lookup() {
    let source = clip("rl");
    let want = desired(source.clone(), AudioFormat::Flac);
    let plan = Plan {
        actions: vec![Action::Download {
            clip: source.clone(),
            lineage: LineageContext::own_root(&source),
            path: want.path.clone(),
            format: AudioFormat::Flac,
        }],
    };
    // First lookup is rate limited; the second returns the rendered WAV URL.
    let lookup_replies = vec![
        Reply::status(429),
        Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/rl.wav"}"#),
    ];
    let http = ScriptedHttp::new()
        .with_auth()
        .route_seq("/wav_file/", lookup_replies)
        .route("rl.wav", Reply::ok(b"wav".to_vec()));
    let clock = RecordingClock::new();
    let mut manifest = Manifest::new();
    let fs = fs_new();
    let ffmpeg = StubFfmpeg::flac();
    let options = small_poll();

    let outcome = run(
        &plan,
        &mut manifest,
        &[want],
        &http,
        &fs,
        &ffmpeg,
        &clock,
        &options,
    );

    assert_eq!(outcome.downloaded, 1);
    assert_eq!(outcome.failed(), 0);
    // The render was already available on the retry, so nothing was converted.
    assert_eq!(http.count("/convert_wav/"), 0);
    // Exactly one transient backoff (1s base) rather than the 5s poll interval.
    assert_eq!(clock.sleeps(), vec![Duration::from_secs(1)]);
}
3581
3582    // ── Phase 6: artifact actions ───────────────────────────────────
3583
/// With the owning entry already in the manifest, WriteArtifact fetches the
/// cover from its source URL, writes the sidecar, and records path and hash
/// in the entry's `cover_jpg` slot.
#[test]
fn write_artifact_fetches_writes_and_updates_manifest() {
    let mut manifest = Manifest::new();
    manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
    let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"jpg-bytes".to_vec()));
    let fs = MemFs::new();
    let plan = Plan {
        actions: vec![Action::WriteArtifact {
            kind: ArtifactKind::CoverJpg,
            path: String::from("a/cover.jpg"),
            source_url: String::from("https://art.suno.ai/a/large.jpg"),
            hash: String::from("h1"),
            owner_id: String::from("a"),
            content: None,
        }],
    };
    let ffmpeg = StubFfmpeg::flac();
    let clock = RecordingClock::new();
    let options = ExecOptions::default();

    let outcome = run(
        &plan,
        &mut manifest,
        &[],
        &http,
        &fs,
        &ffmpeg,
        &clock,
        &options,
    );

    assert_eq!(outcome.artifacts_written, 1);
    assert_eq!(outcome.failed(), 0);
    assert_eq!(outcome.status, RunStatus::Completed);
    assert_eq!(fs.read_file("a/cover.jpg").unwrap(), b"jpg-bytes");
    let expected_slot = Some(ArtifactState {
        path: String::from("a/cover.jpg"),
        hash: String::from("h1"),
    });
    assert_eq!(manifest.get("a").unwrap().cover_jpg, expected_slot);
}
3626
/// A generated text sidecar carries its body inline: it is written verbatim
/// with no HTTP fetch, and the entry's `details_txt` slot records its state.
#[test]
fn write_text_sidecar_records_slot_with_no_network_fetch() {
    let mut manifest = Manifest::new();
    manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
    // Nothing is scripted, so any fetch would fail; success proves none happened.
    let http = ScriptedHttp::new();
    let fs = MemFs::new();
    let plan = Plan {
        actions: vec![Action::WriteArtifact {
            kind: ArtifactKind::DetailsTxt,
            path: String::from("a.details.txt"),
            source_url: String::new(),
            hash: String::from("dh"),
            owner_id: String::from("a"),
            content: Some(String::from("Title: A\n")),
        }],
    };
    let ffmpeg = StubFfmpeg::flac();
    let clock = RecordingClock::new();
    let options = ExecOptions::default();

    let outcome = run(
        &plan,
        &mut manifest,
        &[],
        &http,
        &fs,
        &ffmpeg,
        &clock,
        &options,
    );

    assert_eq!(outcome.artifacts_written, 1);
    assert_eq!(outcome.failed(), 0);
    assert_eq!(fs.read_file("a.details.txt").unwrap(), b"Title: A\n");
    let expected_slot = Some(ArtifactState {
        path: String::from("a.details.txt"),
        hash: String::from("dh"),
    });
    assert_eq!(manifest.get("a").unwrap().details_txt, expected_slot);
}
3669
/// When the lyrics sidecar is re-emitted at a new path, the executor writes
/// the new file, removes the stale one, and updates the recorded path.
#[test]
fn write_lyrics_sidecar_relocation_removes_old_file() {
    let mut owner = entry("old/a.flac", AudioFormat::Flac);
    owner.lyrics_txt = Some(ArtifactState {
        path: String::from("old/a.lyrics.txt"),
        hash: String::from("lh"),
    });
    let mut manifest = Manifest::new();
    manifest.insert("a", owner);
    let fs = MemFs::new()
        .with_file("old/a.flac", b"AUDIO".to_vec())
        .with_file("old/a.lyrics.txt", b"old words\n".to_vec());
    let plan = Plan {
        actions: vec![Action::WriteArtifact {
            kind: ArtifactKind::LyricsTxt,
            path: String::from("new/a.lyrics.txt"),
            source_url: String::new(),
            hash: String::from("lh"),
            owner_id: String::from("a"),
            content: Some(String::from("new words\n")),
        }],
    };
    let http = ScriptedHttp::new();
    let ffmpeg = StubFfmpeg::flac();
    let clock = RecordingClock::new();
    let options = ExecOptions::default();

    let outcome = run(
        &plan,
        &mut manifest,
        &[],
        &http,
        &fs,
        &ffmpeg,
        &clock,
        &options,
    );

    assert_eq!(outcome.failed(), 0);
    assert_eq!(fs.read_file("new/a.lyrics.txt").unwrap(), b"new words\n");
    assert!(!fs.exists("old/a.lyrics.txt"));
    let slot = manifest.get("a").unwrap().lyrics_txt.as_ref().unwrap();
    assert_eq!(slot.path, "new/a.lyrics.txt");
}
3714
/// Two clips exchange sidecar paths in one run (A: x -> y while B: y -> x).
///
/// Each write's inline cleanup of its old path must leave alone any path that
/// another action writes this run; otherwise the second write would delete
/// the file the first one just produced (issue #76). The guard does not
/// depend on the artifact kind, so lyrics stands in for every sidecar,
/// including the .mp4 video.
#[test]
fn sidecar_path_swap_never_deletes_a_file_written_this_run() {
    // Local builders for the repeated lyrics slot and lyrics write action.
    let lyrics_slot = |path: &str, hash: &str| {
        Some(ArtifactState {
            path: path.to_owned(),
            hash: hash.to_owned(),
        })
    };
    let write_lyrics = |path: &str, hash: &str, owner: &str, words: &str| Action::WriteArtifact {
        kind: ArtifactKind::LyricsTxt,
        path: path.to_owned(),
        source_url: String::new(),
        hash: hash.to_owned(),
        owner_id: owner.to_owned(),
        content: Some(words.to_owned()),
    };

    let mut manifest = Manifest::new();
    let mut entry_a = entry("a.flac", AudioFormat::Flac);
    entry_a.lyrics_txt = lyrics_slot("x.lyrics.txt", "ah");
    manifest.insert("a", entry_a);
    let mut entry_b = entry("b.flac", AudioFormat::Flac);
    entry_b.lyrics_txt = lyrics_slot("y.lyrics.txt", "bh");
    manifest.insert("b", entry_b);
    let fs = MemFs::new()
        .with_file("a.flac", b"A".to_vec())
        .with_file("b.flac", b"B".to_vec())
        .with_file("x.lyrics.txt", b"A words\n".to_vec())
        .with_file("y.lyrics.txt", b"B words\n".to_vec());
    // The swap: A's sidecar goes x -> y, B's goes y -> x.
    let plan = Plan {
        actions: vec![
            write_lyrics("y.lyrics.txt", "ah", "a", "A words\n"),
            write_lyrics("x.lyrics.txt", "bh", "b", "B words\n"),
        ],
    };
    let http = ScriptedHttp::new();
    let ffmpeg = StubFfmpeg::flac();
    let clock = RecordingClock::new();
    let options = ExecOptions::default();

    let outcome = run(
        &plan,
        &mut manifest,
        &[],
        &http,
        &fs,
        &ffmpeg,
        &clock,
        &options,
    );

    assert_eq!(outcome.failed(), 0);
    // Both new files are intact: neither cleanup removed the other's write.
    assert_eq!(fs.read_file("y.lyrics.txt").unwrap(), b"A words\n");
    assert_eq!(fs.read_file("x.lyrics.txt").unwrap(), b"B words\n");
    let slot_a = manifest.get("a").unwrap().lyrics_txt.as_ref().unwrap();
    assert_eq!(slot_a.path, "y.lyrics.txt");
    let slot_b = manifest.get("b").unwrap().lyrics_txt.as_ref().unwrap();
    assert_eq!(slot_b.path, "x.lyrics.txt");
}
3786
/// An earlier failed swap can leave two clips recorded at one path (A -> y
/// and B -> y). When only B moves (y -> x), its cleanup must not delete y,
/// because y is still A's live file (#76). tracked_paths counts two
/// references to y, so the removal is skipped even though y is not written
/// this run.
#[test]
fn old_sidecar_kept_when_another_clip_still_references_it() {
    let lyrics_slot = |path: &str, hash: &str| {
        Some(ArtifactState {
            path: path.to_owned(),
            hash: hash.to_owned(),
        })
    };

    let mut manifest = Manifest::new();
    let mut entry_a = entry("a.flac", AudioFormat::Flac);
    entry_a.lyrics_txt = lyrics_slot("y.lyrics.txt", "ah");
    manifest.insert("a", entry_a);
    let mut entry_b = entry("b.flac", AudioFormat::Flac);
    entry_b.lyrics_txt = lyrics_slot("y.lyrics.txt", "bh");
    manifest.insert("b", entry_b);
    let fs = MemFs::new()
        .with_file("a.flac", b"A".to_vec())
        .with_file("b.flac", b"B".to_vec())
        .with_file("y.lyrics.txt", b"A words\n".to_vec());
    // B alone moves (y -> x). A stays put, so y is not a write target and only
    // the tracked-reference count stands between A's file and deletion.
    let plan = Plan {
        actions: vec![Action::WriteArtifact {
            kind: ArtifactKind::LyricsTxt,
            path: String::from("x.lyrics.txt"),
            source_url: String::new(),
            hash: String::from("bh"),
            owner_id: String::from("b"),
            content: Some(String::from("B words\n")),
        }],
    };
    let http = ScriptedHttp::new();
    let ffmpeg = StubFfmpeg::flac();
    let clock = RecordingClock::new();
    let options = ExecOptions::default();

    let outcome = run(
        &plan,
        &mut manifest,
        &[],
        &http,
        &fs,
        &ffmpeg,
        &clock,
        &options,
    );

    assert_eq!(outcome.failed(), 0);
    assert!(
        fs.exists("y.lyrics.txt"),
        "A's live sidecar must not be deleted"
    );
    assert_eq!(fs.read_file("x.lyrics.txt").unwrap(), b"B words\n");
}
3841
/// Two clips are recorded at one path (A -> s and B -> s, after an earlier
/// failed swap) and BOTH move away in this run.
///
/// Once both have moved the path is no longer live, so the last mover must
/// reclaim it: it is neither left behind as an orphan nor deleted while still
/// referenced. The dynamic reference count reaches zero only after both
/// moves, so exactly the final cleanup removes it (#76).
#[test]
fn shared_old_path_is_reclaimed_when_every_referencing_clip_moves_away() {
    // Local builders for the repeated lyrics slot and lyrics write action.
    let lyrics_slot = |path: &str, hash: &str| {
        Some(ArtifactState {
            path: path.to_owned(),
            hash: hash.to_owned(),
        })
    };
    let write_lyrics = |path: &str, hash: &str, owner: &str, words: &str| Action::WriteArtifact {
        kind: ArtifactKind::LyricsTxt,
        path: path.to_owned(),
        source_url: String::new(),
        hash: hash.to_owned(),
        owner_id: owner.to_owned(),
        content: Some(words.to_owned()),
    };

    let mut manifest = Manifest::new();
    let mut entry_a = entry("a.flac", AudioFormat::Flac);
    entry_a.lyrics_txt = lyrics_slot("s.lyrics.txt", "ah");
    manifest.insert("a", entry_a);
    let mut entry_b = entry("b.flac", AudioFormat::Flac);
    entry_b.lyrics_txt = lyrics_slot("s.lyrics.txt", "bh");
    manifest.insert("b", entry_b);
    let fs = MemFs::new()
        .with_file("a.flac", b"A".to_vec())
        .with_file("b.flac", b"B".to_vec())
        .with_file("s.lyrics.txt", b"shared\n".to_vec());
    let plan = Plan {
        actions: vec![
            write_lyrics("pa.lyrics.txt", "ah", "a", "A words\n"),
            write_lyrics("pb.lyrics.txt", "bh", "b", "B words\n"),
        ],
    };
    let http = ScriptedHttp::new();
    let ffmpeg = StubFfmpeg::flac();
    let clock = RecordingClock::new();
    let options = ExecOptions::default();

    let outcome = run(
        &plan,
        &mut manifest,
        &[],
        &http,
        &fs,
        &ffmpeg,
        &clock,
        &options,
    );

    assert_eq!(outcome.failed(), 0);
    assert_eq!(fs.read_file("pa.lyrics.txt").unwrap(), b"A words\n");
    assert_eq!(fs.read_file("pb.lyrics.txt").unwrap(), b"B words\n");
    assert!(
        !fs.exists("s.lyrics.txt"),
        "the vacated shared path must be reclaimed, not orphaned"
    );
}
3906
/// A text sidecar whose owner has no manifest entry (its audio download
/// failed) is skipped rather than written as an untracked file.
#[test]
fn write_text_sidecar_skipped_when_owner_audio_absent() {
    let mut manifest = Manifest::new();
    let fs = MemFs::new();
    let plan = Plan {
        actions: vec![Action::WriteArtifact {
            kind: ArtifactKind::DetailsTxt,
            path: String::from("gone.details.txt"),
            source_url: String::new(),
            hash: String::from("dh"),
            owner_id: String::from("gone"),
            content: Some(String::from("Title: Gone\n")),
        }],
    };
    let http = ScriptedHttp::new();
    let ffmpeg = StubFfmpeg::flac();
    let clock = RecordingClock::new();
    let options = ExecOptions::default();

    let outcome = run(
        &plan,
        &mut manifest,
        &[],
        &http,
        &fs,
        &ffmpeg,
        &clock,
        &options,
    );

    assert_eq!(outcome.artifacts_written, 0);
    assert_eq!(outcome.skipped, 1);
    assert!(!fs.exists("gone.details.txt"));
    assert!(manifest.get("gone").is_none());
}
3940
3941    #[test]
3942    fn delete_artifact_removes_file_and_clears_slot() {
3943        let fs = MemFs::new().with_file("a/cover.jpg", b"jpg".to_vec());
3944        let mut manifest = Manifest::new();
3945        let mut e = entry("a.mp3", AudioFormat::Mp3);
3946        e.cover_jpg = Some(ArtifactState {
3947            path: "a/cover.jpg".to_owned(),
3948            hash: "h1".to_owned(),
3949        });
3950        manifest.insert("a", e);
3951        let plan = Plan {
3952            actions: vec![Action::DeleteArtifact {
3953                kind: ArtifactKind::CoverJpg,
3954                path: "a/cover.jpg".to_owned(),
3955                owner_id: "a".to_owned(),
3956            }],
3957        };
3958
3959        let outcome = run(
3960            &plan,
3961            &mut manifest,
3962            &[],
3963            &ScriptedHttp::new(),
3964            &fs,
3965            &StubFfmpeg::flac(),
3966            &RecordingClock::new(),
3967            &ExecOptions::default(),
3968        );
3969
3970        assert_eq!(outcome.artifacts_deleted, 1);
3971        assert!(!fs.exists("a/cover.jpg"));
3972        assert_eq!(manifest.get("a").unwrap().cover_jpg, None);
3973    }
3974
3975    #[test]
3976    fn delete_artifact_tolerates_already_absent_file() {
3977        // `remove` is idempotent, so co-deleting a sidecar that is already gone
3978        // is not a failure.
3979        let mut manifest = Manifest::new();
3980        let mut e = entry("a.mp3", AudioFormat::Mp3);
3981        e.cover_jpg = Some(ArtifactState {
3982            path: "a/cover.jpg".to_owned(),
3983            hash: "h1".to_owned(),
3984        });
3985        manifest.insert("a", e);
3986        let plan = Plan {
3987            actions: vec![Action::DeleteArtifact {
3988                kind: ArtifactKind::CoverJpg,
3989                path: "a/cover.jpg".to_owned(),
3990                owner_id: "a".to_owned(),
3991            }],
3992        };
3993
3994        let outcome = run(
3995            &plan,
3996            &mut manifest,
3997            &[],
3998            &ScriptedHttp::new(),
3999            &MemFs::new(),
4000            &StubFfmpeg::flac(),
4001            &RecordingClock::new(),
4002            &ExecOptions::default(),
4003        );
4004
4005        assert_eq!(outcome.artifacts_deleted, 1);
4006        assert_eq!(outcome.failed(), 0);
4007        assert_eq!(manifest.get("a").unwrap().cover_jpg, None);
4008    }
4009
4010    #[test]
4011    fn write_artifact_http_failure_is_a_per_clip_failure_not_a_run_abort() {
4012        // A permanent 404 on one sidecar fetch is recorded as a per-clip failure;
4013        // the run continues and the following WriteArtifact still succeeds.
4014        let mut manifest = Manifest::new();
4015        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
4016        manifest.insert("b", entry("b.mp3", AudioFormat::Mp3));
4017        let plan = Plan {
4018            actions: vec![
4019                Action::WriteArtifact {
4020                    kind: ArtifactKind::CoverJpg,
4021                    path: "a/cover.jpg".to_owned(),
4022                    source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
4023                    hash: "h1".to_owned(),
4024                    owner_id: "a".to_owned(),
4025                    content: None,
4026                },
4027                Action::WriteArtifact {
4028                    kind: ArtifactKind::CoverJpg,
4029                    path: "b/cover.jpg".to_owned(),
4030                    source_url: "https://art.suno.ai/b/large.jpg".to_owned(),
4031                    hash: "h2".to_owned(),
4032                    owner_id: "b".to_owned(),
4033                    content: None,
4034                },
4035            ],
4036        };
4037        let http = ScriptedHttp::new()
4038            .route("a/large.jpg", Reply::status(404))
4039            .route("b/large.jpg", Reply::ok(b"jpg-b".to_vec()));
4040        let fs = MemFs::new();
4041
4042        let outcome = run(
4043            &plan,
4044            &mut manifest,
4045            &[],
4046            &http,
4047            &fs,
4048            &StubFfmpeg::flac(),
4049            &RecordingClock::new(),
4050            &ExecOptions::default(),
4051        );
4052
4053        assert_eq!(outcome.status, RunStatus::Completed);
4054        assert_eq!(outcome.failed(), 1);
4055        assert_eq!(outcome.failures[0].clip_id, "a");
4056        assert_eq!(outcome.artifacts_written, 1);
4057        // The failed sidecar left no file and no manifest record.
4058        assert!(!fs.exists("a/cover.jpg"));
4059        assert_eq!(manifest.get("a").unwrap().cover_jpg, None);
4060        // The following sidecar was written and recorded.
4061        assert_eq!(fs.read_file("b/cover.jpg").unwrap(), b"jpg-b");
4062        assert!(manifest.get("b").unwrap().cover_jpg.is_some());
4063    }
4064
4065    #[test]
4066    fn stranded_old_sidecar_removed_when_colliding_writer_fails() {
4067        // #142: clip A moves its cover shared -> a/cover.jpg (fetch succeeds);
4068        // clip B is planned to write the vacated `shared` path but its fetch
4069        // fails. The old-path cleanup is gated on COMMITTED writes, not planned
4070        // ones, so B's failed write no longer protects the stale file: A's old
4071        // `shared` copy is removed rather than left as an untracked orphan.
4072        let mut manifest = Manifest::new();
4073        let mut a = entry("a.mp3", AudioFormat::Mp3);
4074        a.cover_jpg = Some(ArtifactState {
4075            path: "shared/cover.jpg".to_owned(),
4076            hash: "ha".to_owned(),
4077        });
4078        manifest.insert("a", a);
4079        manifest.insert("b", entry("b.mp3", AudioFormat::Mp3));
4080        let fs = MemFs::new().with_file("shared/cover.jpg", b"old-shared".to_vec());
4081        let plan = Plan {
4082            actions: vec![
4083                Action::WriteArtifact {
4084                    kind: ArtifactKind::CoverJpg,
4085                    path: "a/cover.jpg".to_owned(),
4086                    source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
4087                    hash: "ha".to_owned(),
4088                    owner_id: "a".to_owned(),
4089                    content: None,
4090                },
4091                Action::WriteArtifact {
4092                    kind: ArtifactKind::CoverJpg,
4093                    path: "shared/cover.jpg".to_owned(),
4094                    source_url: "https://art.suno.ai/b/large.jpg".to_owned(),
4095                    hash: "hb".to_owned(),
4096                    owner_id: "b".to_owned(),
4097                    content: None,
4098                },
4099            ],
4100        };
4101        let http = ScriptedHttp::new()
4102            .route("a/large.jpg", Reply::ok(b"jpg-a".to_vec()))
4103            .route("b/large.jpg", Reply::status(404));
4104
4105        let outcome = run(
4106            &plan,
4107            &mut manifest,
4108            &[],
4109            &http,
4110            &fs,
4111            &StubFfmpeg::flac(),
4112            &RecordingClock::new(),
4113            &ExecOptions::default(),
4114        );
4115
4116        assert_eq!(outcome.failed(), 1);
4117        assert_eq!(outcome.failures[0].clip_id, "b");
4118        // A's move committed; the vacated file is gone, not an orphan.
4119        assert_eq!(fs.read_file("a/cover.jpg").unwrap(), b"jpg-a");
4120        assert!(
4121            !fs.exists("shared/cover.jpg"),
4122            "the vacated file must be removed once the colliding writer failed"
4123        );
4124        assert_eq!(
4125            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
4126            "a/cover.jpg"
4127        );
4128    }
4129
4130    #[test]
4131    fn committed_write_at_old_path_is_preserved() {
4132        // #142: clip B writes `shared` and commits BEFORE clip A vacates it
4133        // (A moves shared -> a/cover.jpg). A's cleanup sees `shared` in the
4134        // committed set and keeps B's freshly written file rather than deleting
4135        // it. This is the successful-collision case the guard must still protect.
4136        let mut manifest = Manifest::new();
4137        let mut a = entry("a.mp3", AudioFormat::Mp3);
4138        a.cover_jpg = Some(ArtifactState {
4139            path: "shared/cover.jpg".to_owned(),
4140            hash: "ha".to_owned(),
4141        });
4142        manifest.insert("a", a);
4143        manifest.insert("b", entry("b.mp3", AudioFormat::Mp3));
4144        let fs = MemFs::new().with_file("shared/cover.jpg", b"old-shared".to_vec());
4145        let plan = Plan {
4146            actions: vec![
4147                Action::WriteArtifact {
4148                    kind: ArtifactKind::CoverJpg,
4149                    path: "shared/cover.jpg".to_owned(),
4150                    source_url: "https://art.suno.ai/b/large.jpg".to_owned(),
4151                    hash: "hb".to_owned(),
4152                    owner_id: "b".to_owned(),
4153                    content: None,
4154                },
4155                Action::WriteArtifact {
4156                    kind: ArtifactKind::CoverJpg,
4157                    path: "a/cover.jpg".to_owned(),
4158                    source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
4159                    hash: "ha".to_owned(),
4160                    owner_id: "a".to_owned(),
4161                    content: None,
4162                },
4163            ],
4164        };
4165        let http = ScriptedHttp::new()
4166            .route("b/large.jpg", Reply::ok(b"jpg-b".to_vec()))
4167            .route("a/large.jpg", Reply::ok(b"jpg-a".to_vec()));
4168
4169        let outcome = run(
4170            &plan,
4171            &mut manifest,
4172            &[],
4173            &http,
4174            &fs,
4175            &StubFfmpeg::flac(),
4176            &RecordingClock::new(),
4177            &ExecOptions::default(),
4178        );
4179
4180        assert_eq!(outcome.failed(), 0);
4181        // B's committed write survives A's subsequent move; both files are present.
4182        assert_eq!(fs.read_file("shared/cover.jpg").unwrap(), b"jpg-b");
4183        assert_eq!(fs.read_file("a/cover.jpg").unwrap(), b"jpg-a");
4184        assert_eq!(
4185            manifest.get("b").unwrap().cover_jpg.as_ref().unwrap().path,
4186            "shared/cover.jpg"
4187        );
4188        assert_eq!(
4189            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
4190            "a/cover.jpg"
4191        );
4192    }
4193
4194    #[test]
4195    fn cover_move_renames_without_fetching() {
4196        // #141: a MoveArtifact relocates the cover with a local rename. The
4197        // ScriptedHttp has no route, so any fetch would fail the run; a clean
4198        // outcome proves the bytes were renamed, not re-downloaded.
4199        let mut manifest = Manifest::new();
4200        let mut e = entry("a.mp3", AudioFormat::Mp3);
4201        e.cover_jpg = Some(ArtifactState {
4202            path: "old/cover.jpg".to_owned(),
4203            hash: "h".to_owned(),
4204        });
4205        manifest.insert("a", e);
4206        let fs = MemFs::new().with_file("old/cover.jpg", b"JPGBYTES".to_vec());
4207        let plan = Plan {
4208            actions: vec![Action::MoveArtifact {
4209                kind: ArtifactKind::CoverJpg,
4210                from: "old/cover.jpg".to_owned(),
4211                to: "new/cover.jpg".to_owned(),
4212                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
4213                hash: "h".to_owned(),
4214                owner_id: "a".to_owned(),
4215            }],
4216        };
4217
4218        let outcome = run(
4219            &plan,
4220            &mut manifest,
4221            &[],
4222            &ScriptedHttp::new(),
4223            &fs,
4224            &StubFfmpeg::flac(),
4225            &RecordingClock::new(),
4226            &ExecOptions::default(),
4227        );
4228
4229        assert_eq!(outcome.failed(), 0);
4230        assert_eq!(outcome.renamed, 1, "counted as a rename, not a write");
4231        // Renamed in place: the new path carries the ORIGINAL bytes, old is gone.
4232        assert_eq!(fs.read_file("new/cover.jpg").unwrap(), b"JPGBYTES");
4233        assert!(!fs.exists("old/cover.jpg"));
4234        assert_eq!(
4235            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
4236            "new/cover.jpg"
4237        );
4238    }
4239
4240    #[test]
4241    fn cover_move_falls_back_to_fetch_when_old_file_missing() {
4242        // #141: the old file vanished before commit, so the rename fails and the
4243        // executor fetches fresh bytes at the new path rather than failing.
4244        let mut manifest = Manifest::new();
4245        let mut e = entry("a.mp3", AudioFormat::Mp3);
4246        e.cover_jpg = Some(ArtifactState {
4247            path: "old/cover.jpg".to_owned(),
4248            hash: "h".to_owned(),
4249        });
4250        manifest.insert("a", e);
4251        let fs = MemFs::new(); // old/cover.jpg is absent.
4252        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"FETCHED".to_vec()));
4253        let plan = Plan {
4254            actions: vec![Action::MoveArtifact {
4255                kind: ArtifactKind::CoverJpg,
4256                from: "old/cover.jpg".to_owned(),
4257                to: "new/cover.jpg".to_owned(),
4258                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
4259                hash: "h".to_owned(),
4260                owner_id: "a".to_owned(),
4261            }],
4262        };
4263
4264        let outcome = run(
4265            &plan,
4266            &mut manifest,
4267            &[],
4268            &http,
4269            &fs,
4270            &StubFfmpeg::flac(),
4271            &RecordingClock::new(),
4272            &ExecOptions::default(),
4273        );
4274
4275        assert_eq!(outcome.failed(), 0);
4276        assert_eq!(fs.read_file("new/cover.jpg").unwrap(), b"FETCHED");
4277        assert_eq!(
4278            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
4279            "new/cover.jpg"
4280        );
4281    }
4282
4283    #[test]
4284    fn cover_move_falls_back_when_source_co_referenced() {
4285        // Two clips' covers share old/cover.jpg after a prior failed swap. A move
4286        // for `a` must NOT rename the shared file away (that would strand `b`); it
4287        // falls back to a fetch, and `b`'s file survives.
4288        let mut manifest = Manifest::new();
4289        let mut a = entry("a.mp3", AudioFormat::Mp3);
4290        a.cover_jpg = Some(ArtifactState {
4291            path: "old/cover.jpg".to_owned(),
4292            hash: "h".to_owned(),
4293        });
4294        manifest.insert("a", a);
4295        let mut b = entry("b.mp3", AudioFormat::Mp3);
4296        b.cover_jpg = Some(ArtifactState {
4297            path: "old/cover.jpg".to_owned(),
4298            hash: "h".to_owned(),
4299        });
4300        manifest.insert("b", b);
4301        let fs = MemFs::new().with_file("old/cover.jpg", b"SHARED".to_vec());
4302        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"FETCHED-A".to_vec()));
4303        // Only `a` moves this run: old/cover.jpg -> a/cover.jpg.
4304        let plan = Plan {
4305            actions: vec![Action::MoveArtifact {
4306                kind: ArtifactKind::CoverJpg,
4307                from: "old/cover.jpg".to_owned(),
4308                to: "a/cover.jpg".to_owned(),
4309                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
4310                hash: "h".to_owned(),
4311                owner_id: "a".to_owned(),
4312            }],
4313        };
4314
4315        let outcome = run(
4316            &plan,
4317            &mut manifest,
4318            &[],
4319            &http,
4320            &fs,
4321            &StubFfmpeg::flac(),
4322            &RecordingClock::new(),
4323            &ExecOptions::default(),
4324        );
4325
4326        assert_eq!(outcome.failed(), 0);
4327        // `a` got a fresh fetched copy; `b`'s shared file is untouched.
4328        assert_eq!(fs.read_file("a/cover.jpg").unwrap(), b"FETCHED-A");
4329        assert_eq!(
4330            fs.read_file("old/cover.jpg").unwrap(),
4331            b"SHARED",
4332            "the co-referenced file must survive"
4333        );
4334    }
4335
4336    #[test]
4337    fn stem_move_renames_without_refetch() {
4338        // #141: a MoveStem relocates the raw stem with a rename; no route is set,
4339        // so a clean outcome proves it did not re-render or re-fetch.
4340        let mut manifest = Manifest::new();
4341        let mut e = entry("a.flac", AudioFormat::Flac);
4342        e.stems.insert(
4343            "voc".to_owned(),
4344            ArtifactState {
4345                path: "old.stems/voc.mp3".to_owned(),
4346                hash: "h1".to_owned(),
4347            },
4348        );
4349        manifest.insert("a", e);
4350        let fs = MemFs::new().with_file("old.stems/voc.mp3", b"STEMBYTES".to_vec());
4351        let plan = Plan {
4352            actions: vec![Action::MoveStem {
4353                clip_id: "a".to_owned(),
4354                key: "voc".to_owned(),
4355                stem_id: "voc".to_owned(),
4356                from: "old.stems/voc.mp3".to_owned(),
4357                to: "new.stems/voc.mp3".to_owned(),
4358                source_url: "https://cdn1.suno.ai/voc.mp3".to_owned(),
4359                format: StemFormat::Mp3,
4360                hash: "h1".to_owned(),
4361            }],
4362        };
4363
4364        let outcome = run(
4365            &plan,
4366            &mut manifest,
4367            &[],
4368            &ScriptedHttp::new(),
4369            &fs,
4370            &StubFfmpeg::flac(),
4371            &RecordingClock::new(),
4372            &ExecOptions::default(),
4373        );
4374
4375        assert_eq!(outcome.failed(), 0);
4376        assert_eq!(outcome.renamed, 1);
4377        assert_eq!(fs.read_file("new.stems/voc.mp3").unwrap(), b"STEMBYTES");
4378        assert!(!fs.exists("old.stems/voc.mp3"));
4379        assert_eq!(
4380            manifest.get("a").unwrap().stems.get("voc").unwrap().path,
4381            "new.stems/voc.mp3"
4382        );
4383    }
4384
4385    #[test]
4386    fn stem_move_falls_back_to_fetch_when_source_co_referenced() {
4387        // Two clips' stems share shared.stems/voc.mp3 after a partially-failed
4388        // swap (the file holds `a`'s bytes). When `b` moves it, move_stem must NOT
4389        // rename the shared file under `b`'s hash (that records `a`'s bytes as
4390        // `b`'s); it falls back to a fetch of `b`'s correct bytes.
4391        let mut manifest = Manifest::new();
4392        let mut a = entry("a.flac", AudioFormat::Flac);
4393        a.stems.insert(
4394            "voc".to_owned(),
4395            ArtifactState {
4396                path: "shared.stems/voc.mp3".to_owned(),
4397                hash: "h".to_owned(),
4398            },
4399        );
4400        manifest.insert("a", a);
4401        let mut b = entry("b.flac", AudioFormat::Flac);
4402        b.stems.insert(
4403            "voc".to_owned(),
4404            ArtifactState {
4405                path: "shared.stems/voc.mp3".to_owned(),
4406                hash: "h".to_owned(),
4407            },
4408        );
4409        manifest.insert("b", b);
4410        let fs = MemFs::new().with_file("shared.stems/voc.mp3", b"A-STEM".to_vec());
4411        let http = ScriptedHttp::new().route("bvoc.mp3", Reply::ok(b"B-STEM".to_vec()));
4412        let plan = Plan {
4413            actions: vec![Action::MoveStem {
4414                clip_id: "b".to_owned(),
4415                key: "voc".to_owned(),
4416                stem_id: "bvoc".to_owned(),
4417                from: "shared.stems/voc.mp3".to_owned(),
4418                to: "b.stems/voc.mp3".to_owned(),
4419                source_url: "https://cdn1.suno.ai/bvoc.mp3".to_owned(),
4420                format: StemFormat::Mp3,
4421                hash: "h".to_owned(),
4422            }],
4423        };
4424
4425        let outcome = run(
4426            &plan,
4427            &mut manifest,
4428            &[],
4429            &http,
4430            &fs,
4431            &StubFfmpeg::flac(),
4432            &RecordingClock::new(),
4433            &ExecOptions::default(),
4434        );
4435
4436        assert_eq!(outcome.failed(), 0);
4437        // b's new stem carries b's freshly fetched bytes, never a's renamed bytes.
4438        assert_eq!(fs.read_file("b.stems/voc.mp3").unwrap(), b"B-STEM");
4439    }
4440
4441    #[test]
4442    fn co_delete_executes_audio_delete_then_artifact_delete() {
4443        // The plan orders the audio Delete before its sidecar DeleteArtifact.
4444        // The audio delete removes the manifest entry; the sidecar delete then
4445        // removes the file and tolerates the now-absent entry.
4446        let fs = MemFs::new()
4447            .with_file("gone.mp3", b"DATA".to_vec())
4448            .with_file("gone/cover.jpg", b"jpg".to_vec());
4449        let mut manifest = Manifest::new();
4450        let mut e = entry("gone.mp3", AudioFormat::Mp3);
4451        e.cover_jpg = Some(ArtifactState {
4452            path: "gone/cover.jpg".to_owned(),
4453            hash: "h1".to_owned(),
4454        });
4455        manifest.insert("gone", e);
4456        let plan = Plan {
4457            actions: vec![
4458                Action::Delete {
4459                    path: "gone.mp3".to_owned(),
4460                    clip_id: "gone".to_owned(),
4461                },
4462                Action::DeleteArtifact {
4463                    kind: ArtifactKind::CoverJpg,
4464                    path: "gone/cover.jpg".to_owned(),
4465                    owner_id: "gone".to_owned(),
4466                },
4467            ],
4468        };
4469
4470        let outcome = run(
4471            &plan,
4472            &mut manifest,
4473            &[],
4474            &ScriptedHttp::new(),
4475            &fs,
4476            &StubFfmpeg::flac(),
4477            &RecordingClock::new(),
4478            &ExecOptions::default(),
4479        );
4480
4481        assert_eq!(outcome.deleted, 1);
4482        assert_eq!(outcome.artifacts_deleted, 1);
4483        assert_eq!(outcome.failed(), 0);
4484        assert!(!fs.exists("gone.mp3"));
4485        assert!(!fs.exists("gone/cover.jpg"));
4486        assert!(manifest.get("gone").is_none());
4487    }
4488
4489    #[test]
4490    fn write_stem_mp3_stores_raw_and_records_slot() {
4491        // An MP3 stem is downloaded straight from its CDN url and stored verbatim
4492        // (no transcode, no WAV render): the bytes land at the `.mp3` path and the
4493        // keyed slot records the path and hash.
4494        let mut manifest = Manifest::new();
4495        manifest.insert("a", entry("a.flac", AudioFormat::Flac));
4496        let plan = Plan {
4497            actions: vec![Action::WriteStem {
4498                clip_id: "a".to_owned(),
4499                key: "voc".to_owned(),
4500                stem_id: "voc".to_owned(),
4501                path: "a.stems/a - Vocals [voc].mp3".to_owned(),
4502                source_url: "https://cdn1.suno.ai/voc.mp3".to_owned(),
4503                format: StemFormat::Mp3,
4504                hash: "vh".to_owned(),
4505            }],
4506        };
4507        let http = ScriptedHttp::new().route("voc.mp3", Reply::ok(b"stem-bytes".to_vec()));
4508        let fs = MemFs::new();
4509
4510        let outcome = run(
4511            &plan,
4512            &mut manifest,
4513            &[],
4514            &http,
4515            &fs,
4516            &StubFfmpeg::flac(),
4517            &RecordingClock::new(),
4518            &ExecOptions::default(),
4519        );
4520
4521        assert_eq!(outcome.artifacts_written, 1);
4522        assert_eq!(outcome.failed(), 0);
4523        // Bytes are stored exactly as delivered (no transcode applied).
4524        assert_eq!(
4525            fs.read_file("a.stems/a - Vocals [voc].mp3").unwrap(),
4526            b"stem-bytes"
4527        );
4528        // An MP3 stem never renders WAV: no convert_wav, no generation.
4529        assert_eq!(http.count("convert_wav"), 0);
4530        assert_eq!(http.count("/api/gen/"), 0);
4531        assert_eq!(
4532            manifest.get("a").unwrap().stems.get("voc"),
4533            Some(&ArtifactState {
4534                path: "a.stems/a - Vocals [voc].mp3".to_owned(),
4535                hash: "vh".to_owned(),
4536            })
4537        );
4538    }
4539
4540    #[test]
4541    fn write_stem_wav_renders_via_convert_wav_and_stores_raw() {
4542        // A WAV stem (the default) renders the stem clip's lossless WAV through the
4543        // free convert_wav flow keyed on the stem id, then downloads and stores it
4544        // RAW as `.wav` — it is NEVER transcoded to FLAC, even for a FLAC song.
4545        let mut manifest = Manifest::new();
4546        manifest.insert("a", entry("a.flac", AudioFormat::Flac));
4547        let plan = Plan {
4548            actions: vec![Action::WriteStem {
4549                clip_id: "a".to_owned(),
4550                key: "voc".to_owned(),
4551                stem_id: "stemvoc".to_owned(),
4552                path: "a.stems/a - Vocals [stemvoc].wav".to_owned(),
4553                source_url: "https://cdn1.suno.ai/stemvoc.mp3".to_owned(),
4554                format: StemFormat::Wav,
4555                hash: "vh".to_owned(),
4556            }],
4557        };
4558        // wav_file is not ready on the first poll, so the flow POSTs convert_wav
4559        // (free) and polls again — exactly the main FLAC/WAV render path.
4560        let http = ScriptedHttp::new()
4561            .with_auth()
4562            .route_seq(
4563                "stemvoc/wav_file/",
4564                vec![
4565                    Reply::json("{}"),
4566                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/stemvoc.wav"}"#),
4567                ],
4568            )
4569            .route("stemvoc/convert_wav/", Reply::status(200))
4570            .route("stemvoc.wav", Reply::ok(b"RIFFwav-bytes".to_vec()));
4571        let fs = MemFs::new();
4572
4573        let outcome = run(
4574            &plan,
4575            &mut manifest,
4576            &[],
4577            &http,
4578            &fs,
4579            &StubFfmpeg::flac(),
4580            &RecordingClock::new(),
4581            &small_poll(),
4582        );
4583
4584        assert_eq!(outcome.artifacts_written, 1);
4585        assert_eq!(outcome.failed(), 0);
4586        // The rendered WAV is stored verbatim; ffmpeg (WAV->FLAC) is never invoked,
4587        // so the stored bytes are the raw WAV, not a FLAC transcode.
4588        assert_eq!(
4589            fs.read_file("a.stems/a - Vocals [stemvoc].wav").unwrap(),
4590            b"RIFFwav-bytes"
4591        );
4592        assert!(!fs.exists("a.stems/a - Vocals [stemvoc].flac"));
4593        // The free WAV render ran; no credit-spending generation endpoint did.
4594        assert_eq!(http.count("convert_wav"), 1);
4595        assert_eq!(http.count("stem_task"), 0);
4596        assert_eq!(http.count("separate"), 0);
4597        assert_eq!(
4598            manifest.get("a").unwrap().stems.get("voc").unwrap().path,
4599            "a.stems/a - Vocals [stemvoc].wav"
4600        );
4601    }
4602
4603    #[test]
4604    fn write_stem_is_skipped_when_owner_audio_is_absent() {
4605        // No owning manifest entry (audio failed or never existed) => skip with
4606        // no fetch and no write, so a stem is never stranded without its song.
4607        let mut manifest = Manifest::new();
4608        let plan = Plan {
4609            actions: vec![Action::WriteStem {
4610                clip_id: "ghost".to_owned(),
4611                key: "voc".to_owned(),
4612                stem_id: "voc".to_owned(),
4613                path: "ghost.stems/voc.mp3".to_owned(),
4614                source_url: "https://cdn1.suno.ai/voc.mp3".to_owned(),
4615                format: StemFormat::Mp3,
4616                hash: "vh".to_owned(),
4617            }],
4618        };
4619        // Empty HTTP script: any fetch would error, proving none happens.
4620        let http = ScriptedHttp::new();
4621        let fs = MemFs::new();
4622
4623        let outcome = run(
4624            &plan,
4625            &mut manifest,
4626            &[],
4627            &http,
4628            &fs,
4629            &StubFfmpeg::flac(),
4630            &RecordingClock::new(),
4631            &ExecOptions::default(),
4632        );
4633
4634        assert_eq!(outcome.skipped, 1);
4635        assert_eq!(outcome.artifacts_written, 0);
4636        assert_eq!(outcome.failed(), 0);
4637        assert!(!fs.exists("ghost.stems/voc.mp3"));
4638    }
4639
#[test]
fn write_stem_relocates_the_old_file_on_a_path_move() {
    // A renamed song moves its stem: the executor writes the stem at the new
    // path and removes the stale copy at the path the manifest was tracking,
    // so the old file is relocated rather than orphaned.
    const OLD_PATH: &str = "old.stems/voc.mp3";
    const NEW_PATH: &str = "new.stems/voc.mp3";

    // The manifest still points the `voc` stem at its pre-rename location.
    let mut tracked = entry("new.flac", AudioFormat::Flac);
    tracked.stems.insert(
        String::from("voc"),
        ArtifactState {
            path: String::from(OLD_PATH),
            hash: String::from("vh"),
        },
    );
    let mut manifest = Manifest::new();
    manifest.insert("a", tracked);

    let fs = MemFs::new().with_file(OLD_PATH, b"old".to_vec());
    let http = ScriptedHttp::new().route("voc.mp3", Reply::ok(b"new".to_vec()));

    let relocate = Action::WriteStem {
        clip_id: String::from("a"),
        key: String::from("voc"),
        stem_id: String::from("voc"),
        path: String::from(NEW_PATH),
        source_url: String::from("https://cdn1.suno.ai/voc.mp3"),
        format: StemFormat::Mp3,
        hash: String::from("vh"),
    };
    let plan = Plan {
        actions: vec![relocate],
    };

    let ffmpeg = StubFfmpeg::flac();
    let clock = RecordingClock::new();
    let outcome = run(
        &plan,
        &mut manifest,
        &[],
        &http,
        &fs,
        &ffmpeg,
        &clock,
        &ExecOptions::default(),
    );

    assert_eq!(outcome.artifacts_written, 1);
    assert!(fs.exists(NEW_PATH));
    assert!(
        !fs.exists(OLD_PATH),
        "the old stem is moved, not left behind"
    );
    // The manifest slot now tracks the new location.
    assert_eq!(
        manifest.get("a").unwrap().stems.get("voc").unwrap().path,
        NEW_PATH
    );
}
4690
#[test]
fn delete_stem_removes_file_and_clears_slot() {
    // A tracked stem that exists on disk: executing DeleteStem must remove the
    // file, count one deleted artifact, and leave the entry's stem map empty.
    const STEM_PATH: &str = "a.stems/voc.mp3";

    let mut tracked = entry("a.flac", AudioFormat::Flac);
    tracked.stems.insert(
        String::from("voc"),
        ArtifactState {
            path: String::from(STEM_PATH),
            hash: String::from("vh"),
        },
    );
    let mut manifest = Manifest::new();
    manifest.insert("a", tracked);
    let fs = MemFs::new().with_file(STEM_PATH, b"stem".to_vec());

    let removal = Action::DeleteStem {
        clip_id: String::from("a"),
        key: String::from("voc"),
        path: String::from(STEM_PATH),
    };
    let plan = Plan {
        actions: vec![removal],
    };

    // No routes are scripted on the HTTP double for this delete-only plan.
    let http = ScriptedHttp::new();
    let outcome = run(
        &plan,
        &mut manifest,
        &[],
        &http,
        &fs,
        &StubFfmpeg::flac(),
        &RecordingClock::new(),
        &ExecOptions::default(),
    );

    assert_eq!(outcome.artifacts_deleted, 1);
    assert!(!fs.exists(STEM_PATH));
    assert!(manifest.get("a").unwrap().stems.is_empty());
}
4727
#[test]
fn co_deleting_the_last_stem_prunes_the_stems_folder() {
    // Removing a song also removes its stems. Once the `.stems` folder is
    // empty the end-of-run sweep prunes it, so it is never left orphaned.
    const SONG: &str = "song.flac";
    const STEM: &str = "song.stems/voc.mp3";
    const STEM_DIR: &str = "song.stems";

    let fs = MemFs::new()
        .with_file(SONG, b"DATA".to_vec())
        .with_file(STEM, b"stem".to_vec());
    // Precondition: the folder exists before the run.
    assert!(fs.has_dir(STEM_DIR));

    let mut tracked = entry(SONG, AudioFormat::Flac);
    tracked.stems.insert(
        String::from("voc"),
        ArtifactState {
            path: String::from(STEM),
            hash: String::from("vh"),
        },
    );
    let mut manifest = Manifest::new();
    manifest.insert("a", tracked);

    let delete_song = Action::Delete {
        path: String::from(SONG),
        clip_id: String::from("a"),
    };
    let delete_stem = Action::DeleteStem {
        clip_id: String::from("a"),
        key: String::from("voc"),
        path: String::from(STEM),
    };
    let plan = Plan {
        actions: vec![delete_song, delete_stem],
    };

    let http = ScriptedHttp::new();
    let outcome = run(
        &plan,
        &mut manifest,
        &[],
        &http,
        &fs,
        &StubFfmpeg::flac(),
        &RecordingClock::new(),
        &ExecOptions::default(),
    );

    // One audio delete plus one stem delete.
    assert_eq!(outcome.deleted, 1);
    assert_eq!(outcome.artifacts_deleted, 1);
    assert!(!fs.exists(SONG));
    assert!(!fs.exists(STEM));
    assert!(
        !fs.has_dir(STEM_DIR),
        "the emptied .stems folder is pruned"
    );
    assert!(manifest.get("a").is_none());
}
4781
#[test]
fn write_stem_mp3_never_issues_a_generation_post() {
    // Writing an MP3 stem only GETs its CDN url; it never POSTs, and in
    // particular never reaches a generation or WAV-render endpoint.
    let stem_write = Action::WriteStem {
        clip_id: String::from("a"),
        key: String::from("voc"),
        stem_id: String::from("voc"),
        path: String::from("a.stems/voc.mp3"),
        source_url: String::from("https://cdn1.suno.ai/voc.mp3"),
        format: StemFormat::Mp3,
        hash: String::from("vh"),
    };
    let plan = Plan {
        actions: vec![stem_write],
    };

    let mut manifest = Manifest::new();
    manifest.insert("a", entry("a.flac", AudioFormat::Flac));

    let http = ScriptedHttp::new().route("voc.mp3", Reply::ok(b"stem".to_vec()));
    let fs = MemFs::new();

    run(
        &plan,
        &mut manifest,
        &[],
        &http,
        &fs,
        &StubFfmpeg::flac(),
        &RecordingClock::new(),
        &ExecOptions::default(),
    );

    // None of the generation-side endpoints saw any traffic.
    assert_eq!(
        http.count("stem_task"),
        0,
        "no generation endpoint is ever hit"
    );
    assert_eq!(http.count("convert_wav"), 0);
    assert_eq!(http.count("/api/gen/"), 0);
}
4820
#[test]
fn full_stems_mirror_mp3_is_get_only_with_zero_gen_traffic() {
    use crate::reconcile::{DesiredStem, LocalFile, SourceStatus};

    // End-to-end #100 path with MP3 stems. The clip's existing stems are
    // listed (a free GET over the live page-count + 0-indexed page shape),
    // reconciled into WriteStem actions, then downloaded. With MP3 the whole
    // flow is GET-only and touches NO `/api/gen/` endpoint at all.
    let listing_page = r#"{"stems":[
                        {"id":"s1","title":"Song (Vocals)","status":"complete","audio_url":"https://cdn1.suno.ai/s1.mp3"},
                        {"id":"s2","title":"Song (Drums)","status":"complete","audio_url":"https://cdn1.suno.ai/s2.mp3"}
                    ]}"#;
    let http = ScriptedHttp::new()
        .with_auth()
        .route("clip1/stems/pages", Reply::json(r#"{"pages": 1}"#))
        .route("clip1/stems?page=0", Reply::json(listing_page))
        .route("s1.mp3", Reply::ok(b"vocals-bytes".to_vec()))
        .route("s2.mp3", Reply::ok(b"drums-bytes".to_vec()));

    // Step 1: list the existing stems through the client (GET-only, free).
    let mut auth = ClerkAuth::new("eyJtoken");
    pollster::block_on(auth.authenticate(&http)).unwrap();
    let mut client = SunoClient::new(auth, RecordingClock::new());
    let (stems, complete) = pollster::block_on(client.list_stems(&http, "clip1")).unwrap();
    assert!(complete);
    assert_eq!(stems.len(), 2);
    assert_eq!(stems[0].label, "Vocals");

    // Step 2: reconcile the listed MP3 stems into a plan. The audio is already
    // present, so the song itself is a Skip.
    let mut manifest = Manifest::new();
    manifest.insert("clip1", entry("clip1.flac", AudioFormat::Flac));

    let mut desired_stems: Vec<DesiredStem> = Vec::with_capacity(stems.len());
    for stem in &stems {
        desired_stems.push(DesiredStem {
            key: stem.id.clone(),
            stem_id: stem.id.clone(),
            path: format!("clip1.stems/{}.mp3", stem.id),
            source_url: stem.url.clone(),
            format: StemFormat::Mp3,
            hash: crate::art_url_hash(&stem.url),
        });
    }
    let d = Desired {
        path: String::from("clip1.flac"),
        stems: Some(desired_stems),
        ..desired(clip("clip1"), AudioFormat::Flac)
    };
    let wanted = std::slice::from_ref(&d);

    let local: HashMap<String, LocalFile> = HashMap::from([(
        String::from("clip1"),
        LocalFile {
            exists: true,
            size: 100,
        },
    )]);
    let sources = [SourceStatus {
        mode: SourceMode::Mirror,
        fully_enumerated: true,
    }];
    let plan = crate::reconcile::reconcile(&manifest, wanted, &local, &sources);
    assert_eq!(plan.stem_writes(), 2);

    // Step 3: execute the plan against an empty in-memory filesystem.
    let fs = MemFs::new();
    let outcome = run(
        &plan,
        &mut manifest,
        wanted,
        &http,
        &fs,
        &StubFfmpeg::flac(),
        &RecordingClock::new(),
        &ExecOptions::default(),
    );

    assert_eq!(outcome.artifacts_written, 2, "both stems downloaded");
    assert_eq!(fs.read_file("clip1.stems/s1.mp3").unwrap(), b"vocals-bytes");
    assert_eq!(fs.read_file("clip1.stems/s2.mp3").unwrap(), b"drums-bytes");
    // The MP3 mirror path never touches any /api/gen/ endpoint: no render, no
    // generation, no separation.
    assert_eq!(http.count("/api/gen/"), 0);
    assert_eq!(http.count("stem_task"), 0);
    assert_eq!(http.count("separate"), 0);
    assert_eq!(http.count("generate"), 0);
    // No stem is ever written as FLAC.
    assert!(!fs.exists("clip1.stems/s1.flac"));
}
4911
#[test]
fn full_stems_mirror_wav_default_renders_free_wav_and_no_generation() {
    // End-to-end #100 path with WAV stems (the default): each stem's lossless
    // WAV is rendered through the FREE convert_wav flow and stored RAW as
    // `.wav`. The mirror makes NO credit-spending generation POST.
    let http = ScriptedHttp::new()
        .with_auth()
        .route("clip1/stems/pages", Reply::json(r#"{"pages": 1}"#))
        .route(
            "clip1/stems?page=0",
            Reply::json(
                r#"{"stems":[
                        {"id":"s1","title":"Song (Vocals)","status":"complete","audio_url":"https://cdn1.suno.ai/s1.mp3"},
                        {"id":"s2","title":"Song (Drums)","status":"complete","audio_url":"https://cdn1.suno.ai/s2.mp3"}
                    ]}"#,
            ),
        )
        // Each stem's WAV is already rendered, so wav_file returns the url and
        // no convert_wav POST is even needed (still free either way).
        .route(
            "s1/wav_file/",
            Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/s1.wav"}"#),
        )
        .route(
            "s2/wav_file/",
            Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/s2.wav"}"#),
        )
        .route("s1.wav", Reply::ok(b"RIFFvocals".to_vec()))
        .route("s2.wav", Reply::ok(b"RIFFdrums".to_vec()));

    let mut auth = ClerkAuth::new("eyJtoken");
    pollster::block_on(auth.authenticate(&http)).unwrap();
    let mut client = SunoClient::new(auth, RecordingClock::new());
    let (stems, complete) = pollster::block_on(client.list_stems(&http, "clip1")).unwrap();
    // Pin the listing preconditions, as the MP3 sibling test does, so a
    // partial or short listing fails here rather than as a confusing
    // download-count mismatch further down.
    assert!(complete);
    assert_eq!(stems.len(), 2);

    let mut manifest = Manifest::new();
    manifest.insert("clip1", entry("clip1.flac", AudioFormat::Flac));
    let desired_stems: Vec<crate::reconcile::DesiredStem> = stems
        .iter()
        .map(|s| crate::reconcile::DesiredStem {
            key: s.id.clone(),
            stem_id: s.id.clone(),
            path: format!("clip1.stems/{}.wav", s.id),
            source_url: s.url.clone(),
            format: StemFormat::Wav,
            hash: crate::art_url_hash(&s.url),
        })
        .collect();
    let d = Desired {
        path: "clip1.flac".to_owned(),
        stems: Some(desired_stems),
        ..desired(clip("clip1"), AudioFormat::Flac)
    };
    let local: HashMap<String, crate::reconcile::LocalFile> = [(
        "clip1".to_owned(),
        crate::reconcile::LocalFile {
            exists: true,
            size: 100,
        },
    )]
    .into_iter()
    .collect();
    let sources = [crate::reconcile::SourceStatus {
        mode: SourceMode::Mirror,
        fully_enumerated: true,
    }];
    let plan =
        crate::reconcile::reconcile(&manifest, std::slice::from_ref(&d), &local, &sources);
    // Both listed stems must have been planned before anything is executed.
    assert_eq!(plan.stem_writes(), 2);

    let fs = MemFs::new();
    let outcome = run(
        &plan,
        &mut manifest,
        std::slice::from_ref(&d),
        &http,
        &fs,
        &StubFfmpeg::flac(),
        &RecordingClock::new(),
        &small_poll(),
    );

    assert_eq!(outcome.artifacts_written, 2);
    // Stems are stored RAW as WAV (no FLAC transcode, even for a FLAC song).
    assert_eq!(fs.read_file("clip1.stems/s1.wav").unwrap(), b"RIFFvocals");
    assert_eq!(fs.read_file("clip1.stems/s2.wav").unwrap(), b"RIFFdrums");
    assert!(!fs.exists("clip1.stems/s1.flac"));
    // No credit-spending generation/separation endpoint is ever hit.
    assert_eq!(http.count("stem_task"), 0);
    assert_eq!(http.count("separate"), 0);
    assert_eq!(http.count("generate"), 0);
}
5003
#[test]
fn write_artifact_is_skipped_when_the_owner_audio_is_absent() {
    // When a clip's Download fails there is no manifest entry for it, so the
    // WriteArtifact that follows must not strand an untracked sidecar: it is
    // skipped with no fetch and no write. A healthy clip later in the same
    // plan still gets its sidecar.

    // Builds a static-cover write for `owner` carrying the given hash.
    let cover_jpg = |owner: &str, hash: &str| Action::WriteArtifact {
        kind: ArtifactKind::CoverJpg,
        path: format!("{owner}/cover.jpg"),
        source_url: format!("https://art.suno.ai/{owner}/large.jpg"),
        hash: String::from(hash),
        owner_id: String::from(owner),
        content: None,
    };

    let ca = clip("a");
    let download_a = Action::Download {
        clip: ca.clone(),
        lineage: LineageContext::own_root(&ca),
        path: String::from("a.mp3"),
        format: AudioFormat::Mp3,
    };
    let plan = Plan {
        actions: vec![download_a, cover_jpg("a", "h1"), cover_jpg("b", "h2")],
    };

    // "a"'s audio 404s (a permanent failure), so no entry for "a" is created.
    let http = ScriptedHttp::new()
        .route("a.mp3", Reply::status(404))
        .route("a/large.jpg", Reply::ok(b"jpg-a".to_vec()))
        .route("b/large.jpg", Reply::ok(b"jpg-b".to_vec()));

    // "b" already has audio from a prior run, so its sidecar write proceeds.
    let mut manifest = Manifest::new();
    manifest.insert("b", entry("b.mp3", AudioFormat::Mp3));
    let fs = MemFs::new();

    let outcome = run(
        &plan,
        &mut manifest,
        &[],
        &http,
        &fs,
        &StubFfmpeg::flac(),
        &RecordingClock::new(),
        &ExecOptions::default(),
    );

    assert_eq!(outcome.status, RunStatus::Completed);
    // Only the audio download failed; the orphan artifact counts as skipped.
    assert_eq!(outcome.failed(), 1);
    assert_eq!(outcome.failures[0].clip_id, "a");
    assert_eq!(outcome.skipped, 1);
    // The orphan sidecar was never fetched, never written, and left no record.
    assert_eq!(http.count("a/large.jpg"), 0);
    assert!(!fs.exists("a/cover.jpg"));
    assert!(manifest.get("a").is_none());
    // The healthy clip's sidecar was written and recorded.
    assert_eq!(outcome.artifacts_written, 1);
    assert_eq!(fs.read_file("b/cover.jpg").unwrap(), b"jpg-b");
    assert!(manifest.get("b").unwrap().cover_jpg.is_some());
}
5071
#[test]
fn write_artifact_transcodes_animated_cover_to_webp() {
    // For a CoverWebp the executor fetches the clip's MP4 preview, passes it
    // through the ffmpeg port, writes the transcoded WebP (not the MP4 it
    // fetched), and records the sidecar on the owning entry.
    const WEBP_PATH: &str = "a/cover.webp";

    let animated_cover = Action::WriteArtifact {
        kind: ArtifactKind::CoverWebp,
        path: String::from(WEBP_PATH),
        source_url: String::from("https://cdn.suno.ai/a/video.mp4"),
        hash: String::from("v1"),
        owner_id: String::from("a"),
        content: None,
    };
    let plan = Plan {
        actions: vec![animated_cover],
    };

    let mut manifest = Manifest::new();
    manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));

    let fs = MemFs::new();
    let ffmpeg = StubFfmpeg::webp();
    let http = ScriptedHttp::new().route("a/video.mp4", Reply::ok(b"mp4-bytes".to_vec()));

    let outcome = run(
        &plan,
        &mut manifest,
        &[],
        &http,
        &fs,
        &ffmpeg,
        &RecordingClock::new(),
        &ExecOptions::default(),
    );

    assert_eq!(outcome.artifacts_written, 1);
    assert_eq!(outcome.failed(), 0);
    assert_eq!(outcome.status, RunStatus::Completed);
    // The MP4 was fetched once and transcoded: the file on disk holds the
    // ffmpeg WebP output rather than the fetched bytes.
    assert_eq!(http.count("a/video.mp4"), 1);
    let written = fs.read_file(WEBP_PATH).unwrap();
    assert_ne!(written, b"mp4-bytes");
    assert!(written.starts_with(b"RIFF"));

    let expected_slot = Some(ArtifactState {
        path: String::from(WEBP_PATH),
        hash: String::from("v1"),
    });
    assert_eq!(manifest.get("a").unwrap().cover_webp, expected_slot);
}
5120
#[test]
fn write_artifact_webp_transcode_failure_is_per_clip() {
    // A failed transcode is charged to the owning clip only: it is recorded as
    // a per-clip failure, the run still completes, no sidecar is written, and
    // the slot stays empty. A healthy static cover in the same run succeeds.
    let failing_webp = Action::WriteArtifact {
        kind: ArtifactKind::CoverWebp,
        path: String::from("a/cover.webp"),
        source_url: String::from("https://cdn.suno.ai/a/video.mp4"),
        hash: String::from("v1"),
        owner_id: String::from("a"),
        content: None,
    };
    let healthy_jpg = Action::WriteArtifact {
        kind: ArtifactKind::CoverJpg,
        path: String::from("b/cover.jpg"),
        source_url: String::from("https://art.suno.ai/b/large.jpg"),
        hash: String::from("h1"),
        owner_id: String::from("b"),
        content: None,
    };
    let plan = Plan {
        actions: vec![failing_webp, healthy_jpg],
    };

    // Both owners already have audio tracked.
    let mut manifest = Manifest::new();
    for (id, audio) in [("a", "a.mp3"), ("b", "b.mp3")] {
        manifest.insert(id, entry(audio, AudioFormat::Mp3));
    }

    let fs = MemFs::new();
    let http = ScriptedHttp::new()
        .route("a/video.mp4", Reply::ok(b"mp4-bytes".to_vec()))
        .route("b/large.jpg", Reply::ok(b"jpg-b".to_vec()));
    // The ffmpeg double whose transcodes fail.
    let ffmpeg = StubFfmpeg::failing();

    let outcome = run(
        &plan,
        &mut manifest,
        &[],
        &http,
        &fs,
        &ffmpeg,
        &RecordingClock::new(),
        &ExecOptions::default(),
    );

    assert_eq!(outcome.status, RunStatus::Completed);
    assert_eq!(outcome.failed(), 1);
    assert_eq!(outcome.failures[0].clip_id, "a");
    // The animated cover did not transcode: nothing on disk, slot left empty.
    assert!(!fs.exists("a/cover.webp"));
    assert_eq!(manifest.get("a").unwrap().cover_webp, None);
    // The static cover in the same run was still written and recorded.
    assert_eq!(outcome.artifacts_written, 1);
    assert_eq!(fs.read_file("b/cover.jpg").unwrap(), b"jpg-b");
    assert!(manifest.get("b").unwrap().cover_jpg.is_some());
}
5176
#[test]
fn write_artifact_uses_configured_webp_settings() {
    use std::sync::{Arc, Mutex};

    // An ffmpeg double that records every settings value handed to the WebP
    // transcode, so the test can inspect what the executor passed through.
    struct RecordingWebpFfmpeg {
        seen: Arc<Mutex<Vec<WebpEncodeSettings>>>,
    }

    impl Ffmpeg for RecordingWebpFfmpeg {
        async fn wav_to_flac(
            &self,
            _wav: &[u8],
        ) -> Result<Vec<u8>, crate::ffmpeg::FfmpegError> {
            Ok(Vec::new())
        }

        async fn mp4_to_webp(
            &self,
            _mp4: &[u8],
            settings: WebpEncodeSettings,
        ) -> Result<Vec<u8>, crate::ffmpeg::FfmpegError> {
            // Note the call, then hand back a minimal RIFF/WEBP header.
            self.seen.lock().unwrap().push(settings);
            Ok(b"RIFF\x00\x00\x00\x00WEBP".to_vec())
        }
    }

    // The non-default encode settings under test, built fresh on each call so
    // the configured value and the expected value come from one definition.
    let configured = || WebpEncodeSettings {
        quality: 88,
        max_fps: 12,
        max_width: Some(720),
        lossless: false,
        compression_level: 4,
    };

    let calls = Arc::new(Mutex::new(Vec::new()));
    let ffmpeg = RecordingWebpFfmpeg {
        seen: Arc::clone(&calls),
    };
    let opts = ExecOptions {
        cover_webp: configured(),
        ..ExecOptions::default()
    };

    let mut manifest = Manifest::new();
    manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
    let animated_cover = Action::WriteArtifact {
        kind: ArtifactKind::CoverWebp,
        path: String::from("a/cover.webp"),
        source_url: String::from("https://cdn.suno.ai/a/video.mp4"),
        hash: String::from("v1"),
        owner_id: String::from("a"),
        content: None,
    };
    let plan = Plan {
        actions: vec![animated_cover],
    };

    let http = ScriptedHttp::new().route("a/video.mp4", Reply::ok(b"mp4-bytes".to_vec()));
    let fs = MemFs::new();
    let _ = run(
        &plan,
        &mut manifest,
        &[],
        &http,
        &fs,
        &ffmpeg,
        &RecordingClock::new(),
        &opts,
    );

    // Exactly one transcode happened, and it saw the configured settings.
    let recorded = calls.lock().unwrap();
    assert_eq!(recorded.as_slice(), &[configured()]);
}
5253
5254    // ── Phase 8: folder art routes to the album store ───────────────
5255
#[test]
fn folder_jpg_write_records_album_state_and_skips_manifest() {
    // Folder art belongs to the album root id rather than to a manifest clip,
    // so it is written even when the manifest is empty and its state is
    // recorded on the album store.
    const FOLDER_JPG: &str = "creator/album/folder.jpg";

    let folder_art = Action::WriteArtifact {
        kind: ArtifactKind::FolderJpg,
        path: String::from(FOLDER_JPG),
        source_url: String::from("https://art.suno.ai/root/large.jpg"),
        hash: String::from("jh"),
        owner_id: String::from("root"),
        content: None,
    };
    let plan = Plan {
        actions: vec![folder_art],
    };

    let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
    let mut manifest = Manifest::new();
    let fs = MemFs::new();
    let http = ScriptedHttp::new().route("root/large.jpg", Reply::ok(b"folder-jpg".to_vec()));

    let outcome = run_with_albums(
        &plan,
        &mut manifest,
        &mut albums,
        &[],
        &http,
        &fs,
        &StubFfmpeg::flac(),
        &RecordingClock::new(),
        &ExecOptions::default(),
    );

    assert_eq!(outcome.artifacts_written, 1);
    assert_eq!(outcome.status, RunStatus::Completed);
    assert_eq!(fs.read_file(FOLDER_JPG).unwrap(), b"folder-jpg");

    // The state lands on the album row, and no manifest entry is created.
    let expected_state = Some(ArtifactState {
        path: String::from(FOLDER_JPG),
        hash: String::from("jh"),
    });
    assert_eq!(albums.get("root").unwrap().folder_jpg, expected_state);
    assert!(manifest.get("root").is_none());
}
5302
#[test]
fn folder_webp_write_transcodes_and_records_album_state() {
    // A FolderWebp write fetches the album's MP4, stores the transcoded WebP,
    // and records the result on the album row.
    const COVER_WEBP: &str = "creator/album/cover.webp";

    let folder_webp = Action::WriteArtifact {
        kind: ArtifactKind::FolderWebp,
        path: String::from(COVER_WEBP),
        source_url: String::from("https://cdn.suno.ai/root/video.mp4"),
        hash: String::from("wh"),
        owner_id: String::from("root"),
        content: None,
    };
    let plan = Plan {
        actions: vec![folder_webp],
    };

    let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
    let mut manifest = Manifest::new();
    let fs = MemFs::new();
    let http = ScriptedHttp::new().route("root/video.mp4", Reply::ok(b"mp4-bytes".to_vec()));

    let outcome = run_with_albums(
        &plan,
        &mut manifest,
        &mut albums,
        &[],
        &http,
        &fs,
        &StubFfmpeg::webp(),
        &RecordingClock::new(),
        &ExecOptions::default(),
    );

    assert_eq!(outcome.artifacts_written, 1);
    assert_eq!(outcome.failed(), 0);
    // What is on disk is the WebP transcode, not the MP4 bytes verbatim.
    let written = fs.read_file(COVER_WEBP).unwrap();
    assert_ne!(written, b"mp4-bytes");
    assert!(written.starts_with(b"RIFF"));

    let expected_state = Some(ArtifactState {
        path: String::from(COVER_WEBP),
        hash: String::from("wh"),
    });
    assert_eq!(albums.get("root").unwrap().folder_webp, expected_state);
}
5346
#[test]
fn folder_mp4_write_keeps_the_source_verbatim() {
    // A FolderMp4 write stores the fetched MP4 untouched and records it on
    // the album row.
    const COVER_MP4: &str = "creator/album/cover.mp4";

    let folder_mp4 = Action::WriteArtifact {
        kind: ArtifactKind::FolderMp4,
        path: String::from(COVER_MP4),
        source_url: String::from("https://cdn.suno.ai/root/video.mp4"),
        hash: String::from("mh"),
        owner_id: String::from("root"),
        content: None,
    };
    let plan = Plan {
        actions: vec![folder_mp4],
    };

    let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
    let mut manifest = Manifest::new();
    let fs = MemFs::new();
    let http = ScriptedHttp::new().route("root/video.mp4", Reply::ok(b"mp4-bytes".to_vec()));

    let outcome = run_with_albums(
        &plan,
        &mut manifest,
        &mut albums,
        &[],
        &http,
        &fs,
        &StubFfmpeg::webp(),
        &RecordingClock::new(),
        &ExecOptions::default(),
    );

    assert_eq!(outcome.artifacts_written, 1);
    assert_eq!(outcome.failed(), 0);
    // Byte-for-byte copy of the source: the raw MP4 is never transcoded.
    assert_eq!(fs.read_file(COVER_MP4).unwrap(), b"mp4-bytes");

    let expected_state = Some(ArtifactState {
        path: String::from(COVER_MP4),
        hash: String::from("mh"),
    });
    assert_eq!(albums.get("root").unwrap().folder_mp4, expected_state);
}
5391
#[test]
fn both_folder_covers_fetch_the_video_cover_once() {
    // `both` retention keeps cover.webp (transcoded) and cover.mp4 (raw), both
    // sourced from the one video_cover_url. FolderWebp sorts first and caches
    // the fetched source; FolderMp4 drains that cache, so the source is
    // fetched exactly once.

    // Builds a folder-cover write of `kind` to `creator/album/<file>`.
    let folder_cover = |kind: ArtifactKind, file: &str, hash: &str| Action::WriteArtifact {
        kind,
        path: format!("creator/album/{file}"),
        source_url: String::from("https://cdn.suno.ai/root/video.mp4"),
        hash: String::from(hash),
        owner_id: String::from("root"),
        content: None,
    };
    let plan = Plan {
        actions: vec![
            folder_cover(ArtifactKind::FolderWebp, "cover.webp", "wh"),
            folder_cover(ArtifactKind::FolderMp4, "cover.mp4", "mh"),
        ],
    };

    let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
    let mut manifest = Manifest::new();
    let fs = MemFs::new();
    let http = ScriptedHttp::new().route("root/video.mp4", Reply::ok(b"mp4-bytes".to_vec()));

    let outcome = run_with_albums(
        &plan,
        &mut manifest,
        &mut albums,
        &[],
        &http,
        &fs,
        &StubFfmpeg::webp(),
        &RecordingClock::new(),
        &ExecOptions::default(),
    );

    assert_eq!(outcome.artifacts_written, 2);
    assert_eq!(outcome.failed(), 0);
    // Two artifacts consumed the source, yet it was fetched once (#90 / #89).
    assert_eq!(http.count("root/video.mp4"), 1);
    // The webp holds the transcode; the mp4 holds the raw source verbatim.
    let webp = fs.read_file("creator/album/cover.webp").unwrap();
    assert!(webp.starts_with(b"RIFF"));
    assert_eq!(
        fs.read_file("creator/album/cover.mp4").unwrap(),
        b"mp4-bytes"
    );
}
5449
#[test]
fn folder_art_delete_clears_album_state() {
    // Deleting an album's only folder-art kind removes the file and drops the
    // album row altogether.
    const FOLDER_JPG: &str = "creator/album/folder.jpg";

    // The album row tracks a folder.jpg and nothing else.
    let tracked_art = AlbumArt {
        folder_jpg: Some(ArtifactState {
            path: String::from(FOLDER_JPG),
            hash: String::from("jh"),
        }),
        folder_webp: None,
        folder_mp4: None,
    };
    let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
    albums.insert(String::from("root"), tracked_art);

    let mut manifest = Manifest::new();
    let fs = MemFs::new().with_file(FOLDER_JPG, b"jpg".to_vec());

    let removal = Action::DeleteArtifact {
        kind: ArtifactKind::FolderJpg,
        path: String::from(FOLDER_JPG),
        owner_id: String::from("root"),
    };
    let plan = Plan {
        actions: vec![removal],
    };

    let http = ScriptedHttp::new();
    let outcome = run_with_albums(
        &plan,
        &mut manifest,
        &mut albums,
        &[],
        &http,
        &fs,
        &StubFfmpeg::flac(),
        &RecordingClock::new(),
        &ExecOptions::default(),
    );

    assert_eq!(outcome.artifacts_deleted, 1);
    assert!(!fs.exists(FOLDER_JPG));
    // The row held only this one kind, so the whole row is pruned.
    assert!(!albums.contains_key("root"));
}
5491
5492    // ── Phase 9: playlist artifacts ─────────────────────────────────
5493
5494    #[test]
5495    fn playlist_write_uses_inline_content_and_records_state() {
5496        // A playlist body is generated, carried inline. With an empty manifest
5497        // and NO http routes, the write still succeeds — proving it skipped the
5498        // network — and records the playlist store keyed by the playlist id.
5499        let mut manifest = Manifest::new();
5500        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
5501        let mut playlists: BTreeMap<String, PlaylistState> = BTreeMap::new();
5502        let body = "#EXTM3U\n#PLAYLIST:Road Trip\n#EXTINF:60,One\nA/One.flac\n";
5503        let plan = Plan {
5504            actions: vec![Action::WriteArtifact {
5505                kind: ArtifactKind::Playlist,
5506                path: "Road Trip.m3u8".to_owned(),
5507                source_url: String::new(),
5508                hash: "ph1".to_owned(),
5509                owner_id: "pl1".to_owned(),
5510                content: Some(body.to_owned()),
5511            }],
5512        };
5513        let fs = MemFs::new();
5514
5515        let outcome = run_full(
5516            &plan,
5517            &mut manifest,
5518            &mut albums,
5519            &mut playlists,
5520            &[],
5521            &ScriptedHttp::new(),
5522            &fs,
5523            &StubFfmpeg::flac(),
5524            &RecordingClock::new(),
5525            &ExecOptions::default(),
5526        );
5527
5528        assert_eq!(outcome.artifacts_written, 1);
5529        assert_eq!(outcome.failed(), 0);
5530        // The exact inline bytes were written, verbatim.
5531        assert_eq!(fs.read_file("Road Trip.m3u8").unwrap(), body.as_bytes());
5532        assert_eq!(
5533            playlists.get("pl1"),
5534            Some(&PlaylistState {
5535                name: "Road Trip".to_owned(),
5536                path: "Road Trip.m3u8".to_owned(),
5537                hash: "ph1".to_owned(),
5538            })
5539        );
5540    }
5541
5542    #[test]
5543    fn playlist_delete_removes_file_and_clears_state() {
5544        let fs = MemFs::new().with_file("Old.m3u8", b"#EXTM3U\n".to_vec());
5545        let mut manifest = Manifest::new();
5546        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
5547        let mut playlists: BTreeMap<String, PlaylistState> = BTreeMap::new();
5548        playlists.insert(
5549            "pl1".to_owned(),
5550            PlaylistState {
5551                name: "Old".to_owned(),
5552                path: "Old.m3u8".to_owned(),
5553                hash: "ph1".to_owned(),
5554            },
5555        );
5556        let plan = Plan {
5557            actions: vec![Action::DeleteArtifact {
5558                kind: ArtifactKind::Playlist,
5559                path: "Old.m3u8".to_owned(),
5560                owner_id: "pl1".to_owned(),
5561            }],
5562        };
5563
5564        let outcome = run_full(
5565            &plan,
5566            &mut manifest,
5567            &mut albums,
5568            &mut playlists,
5569            &[],
5570            &ScriptedHttp::new(),
5571            &fs,
5572            &StubFfmpeg::flac(),
5573            &RecordingClock::new(),
5574            &ExecOptions::default(),
5575        );
5576
5577        assert_eq!(outcome.artifacts_deleted, 1);
5578        assert!(!fs.exists("Old.m3u8"));
5579        assert!(
5580            !playlists.contains_key("pl1"),
5581            "the playlist row is cleared on delete"
5582        );
5583    }
5584
5585    // ── Phase 10: old-sidecar cleanup on move + empty-dir prune ──────
5586
5587    #[test]
5588    fn rename_move_relocates_cover_and_prunes_old_album() {
5589        // A title/album change moves the audio (Rename) and re-emits the cover
5590        // at the NEW path. The old cover must be removed and the now-empty old
5591        // album directory pruned, leaving no orphan sidecar and no ghost dir.
5592        let mut manifest = Manifest::new();
5593        let mut e = entry("Creator/AlbumA/song.flac", AudioFormat::Flac);
5594        e.cover_jpg = Some(ArtifactState {
5595            path: "Creator/AlbumA/cover.jpg".to_owned(),
5596            hash: "h1".to_owned(),
5597        });
5598        manifest.insert("a", e);
5599        let fs = MemFs::new()
5600            .with_file("Creator/AlbumA/song.flac", b"AUDIO".to_vec())
5601            .with_file("Creator/AlbumA/cover.jpg", b"old-jpg".to_vec());
5602        let plan = Plan {
5603            actions: vec![
5604                Action::Rename {
5605                    from: "Creator/AlbumA/song.flac".to_owned(),
5606                    to: "Creator/AlbumB/song.flac".to_owned(),
5607                },
5608                Action::WriteArtifact {
5609                    kind: ArtifactKind::CoverJpg,
5610                    path: "Creator/AlbumB/cover.jpg".to_owned(),
5611                    source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
5612                    hash: "h1".to_owned(),
5613                    owner_id: "a".to_owned(),
5614                    content: None,
5615                },
5616            ],
5617        };
5618        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"new-jpg".to_vec()));
5619
5620        let outcome = run(
5621            &plan,
5622            &mut manifest,
5623            &[],
5624            &http,
5625            &fs,
5626            &StubFfmpeg::flac(),
5627            &RecordingClock::new(),
5628            &ExecOptions::default(),
5629        );
5630
5631        assert_eq!(outcome.failed(), 0);
5632        // Audio moved, the new cover was written, the old cover removed.
5633        assert!(fs.exists("Creator/AlbumB/song.flac"));
5634        assert_eq!(
5635            fs.read_file("Creator/AlbumB/cover.jpg").unwrap(),
5636            b"new-jpg"
5637        );
5638        assert!(!fs.exists("Creator/AlbumA/cover.jpg"));
5639        assert!(!fs.exists("Creator/AlbumA/song.flac"));
5640        // The manifest cover slot now points at the new path.
5641        assert_eq!(
5642            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
5643            "Creator/AlbumB/cover.jpg"
5644        );
5645        // The emptied old album directory is pruned; the new one survives.
5646        assert!(!fs.has_dir("Creator/AlbumA"));
5647        assert!(fs.has_dir("Creator/AlbumB"));
5648    }
5649
5650    #[test]
5651    fn rename_move_relocates_folder_art_and_prunes_old_album() {
5652        // An album rename moves folder.jpg: the old file is removed, the album
5653        // store slot advanced to the new path, and the emptied dir pruned.
5654        let mut manifest = Manifest::new();
5655        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
5656        albums.insert(
5657            "root".to_owned(),
5658            AlbumArt {
5659                folder_jpg: Some(ArtifactState {
5660                    path: "Creator/AlbumA/folder.jpg".to_owned(),
5661                    hash: "jh".to_owned(),
5662                }),
5663                folder_webp: None,
5664                folder_mp4: None,
5665            },
5666        );
5667        let fs = MemFs::new().with_file("Creator/AlbumA/folder.jpg", b"old-folder".to_vec());
5668        let plan = Plan {
5669            actions: vec![Action::WriteArtifact {
5670                kind: ArtifactKind::FolderJpg,
5671                path: "Creator/AlbumB/folder.jpg".to_owned(),
5672                source_url: "https://art.suno.ai/root/large.jpg".to_owned(),
5673                hash: "jh".to_owned(),
5674                owner_id: "root".to_owned(),
5675                content: None,
5676            }],
5677        };
5678        let http = ScriptedHttp::new().route("root/large.jpg", Reply::ok(b"new-folder".to_vec()));
5679
5680        let outcome = run_with_albums(
5681            &plan,
5682            &mut manifest,
5683            &mut albums,
5684            &[],
5685            &http,
5686            &fs,
5687            &StubFfmpeg::flac(),
5688            &RecordingClock::new(),
5689            &ExecOptions::default(),
5690        );
5691
5692        assert_eq!(outcome.failed(), 0);
5693        assert_eq!(
5694            fs.read_file("Creator/AlbumB/folder.jpg").unwrap(),
5695            b"new-folder"
5696        );
5697        assert!(!fs.exists("Creator/AlbumA/folder.jpg"));
5698        assert_eq!(
5699            albums
5700                .get("root")
5701                .unwrap()
5702                .folder_jpg
5703                .as_ref()
5704                .unwrap()
5705                .path,
5706            "Creator/AlbumB/folder.jpg"
5707        );
5708        assert!(!fs.has_dir("Creator/AlbumA"));
5709        assert!(fs.has_dir("Creator/AlbumB"));
5710    }
5711
5712    #[test]
5713    fn prune_empty_dirs_removes_only_empty_dirs() {
5714        // A direct exercise of the prune port's safety guarantees on a mixed
5715        // tree: nested empties go, anything holding a file (hidden ones too)
5716        // stays, and no file is touched.
5717        let fs = MemFs::new()
5718            .with_file("keep/full/song.flac", b"x".to_vec())
5719            .with_file("hidden/.suno-manifest.json", b"{}".to_vec())
5720            .with_dir("empty/leaf")
5721            .with_dir("nested/a/b/c");
5722
5723        fs.prune_empty_dirs("").unwrap();
5724
5725        // Every empty directory, however deeply nested, is pruned bottom-up.
5726        for gone in [
5727            "empty",
5728            "empty/leaf",
5729            "nested",
5730            "nested/a",
5731            "nested/a/b",
5732            "nested/a/b/c",
5733        ] {
5734            assert!(!fs.has_dir(gone), "empty dir {gone} should be pruned");
5735        }
5736        // A directory holding any file — including only a hidden dotfile — stays.
5737        assert!(fs.has_dir("keep"));
5738        assert!(fs.has_dir("keep/full"));
5739        assert!(fs.has_dir("hidden"));
5740        // No file was touched.
5741        assert!(fs.exists("keep/full/song.flac"));
5742        assert!(fs.exists("hidden/.suno-manifest.json"));
5743    }
5744
5745    #[test]
5746    fn prune_empty_dirs_never_removes_the_named_root() {
5747        // Pruning under a named root clears its empty children but keeps the
5748        // root itself, even when the root is now empty.
5749        let fs = MemFs::new().with_dir("empty/leaf");
5750        fs.prune_empty_dirs("empty").unwrap();
5751        assert!(fs.has_dir("empty"), "the named root is never removed");
5752        assert!(!fs.has_dir("empty/leaf"));
5753    }
5754
5755    #[test]
5756    fn old_sidecar_remove_failure_is_per_clip_and_converges_next_run() {
5757        // If removing the old sidecar fails, the write is a per-clip failure
5758        // that never aborts the run and does NOT advance the state slot, so the
5759        // next identical run re-attempts the cleanup and the tree converges.
5760        let mut manifest = Manifest::new();
5761        let mut e = entry("a.flac", AudioFormat::Flac);
5762        e.cover_jpg = Some(ArtifactState {
5763            path: "AlbumA/cover.jpg".to_owned(),
5764            hash: "h1".to_owned(),
5765        });
5766        manifest.insert("a", e);
5767        let fs = MemFs::new()
5768            .with_file("a.flac", b"AUDIO".to_vec())
5769            .with_file("AlbumA/cover.jpg", b"old".to_vec());
5770        let plan = Plan {
5771            actions: vec![Action::WriteArtifact {
5772                kind: ArtifactKind::CoverJpg,
5773                path: "AlbumB/cover.jpg".to_owned(),
5774                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
5775                hash: "h1".to_owned(),
5776                owner_id: "a".to_owned(),
5777                content: None,
5778            }],
5779        };
5780        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"new".to_vec()));
5781
5782        // Run 1: the old-cover remove is forced to fail.
5783        fs.arm_fail_remove("AlbumA/cover.jpg");
5784        let first = run(
5785            &plan,
5786            &mut manifest,
5787            &[],
5788            &http,
5789            &fs,
5790            &StubFfmpeg::flac(),
5791            &RecordingClock::new(),
5792            &ExecOptions::default(),
5793        );
5794        assert_eq!(
5795            first.status,
5796            RunStatus::Completed,
5797            "a remove failure never aborts the run"
5798        );
5799        assert_eq!(first.failed(), 1);
5800        // The new cover is written but the old one lingers and the slot is stale.
5801        assert!(fs.exists("AlbumB/cover.jpg"));
5802        assert!(fs.exists("AlbumA/cover.jpg"));
5803        assert_eq!(
5804            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
5805            "AlbumA/cover.jpg"
5806        );
5807        assert!(fs.has_dir("AlbumA"), "the orphan keeps its directory alive");
5808
5809        // Run 2: the same plan re-runs with the fault cleared and converges.
5810        fs.disarm_fail_remove("AlbumA/cover.jpg");
5811        let second = run(
5812            &plan,
5813            &mut manifest,
5814            &[],
5815            &http,
5816            &fs,
5817            &StubFfmpeg::flac(),
5818            &RecordingClock::new(),
5819            &ExecOptions::default(),
5820        );
5821        assert_eq!(second.failed(), 0);
5822        assert!(fs.exists("AlbumB/cover.jpg"));
5823        assert!(!fs.exists("AlbumA/cover.jpg"), "no orphan persists");
5824        assert_eq!(
5825            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
5826            "AlbumB/cover.jpg"
5827        );
5828        assert!(!fs.has_dir("AlbumA"), "the emptied directory is pruned");
5829    }
5830
5831    #[test]
5832    fn same_path_artifact_rewrite_does_no_remove_and_prunes_nothing() {
5833        // The idempotent case: a content-only cover rewrite (hash drift, path
5834        // unchanged) attempts no remove and prunes no live directory. A remove
5835        // failure is armed on the cover path, so any spurious remove would
5836        // surface as a failure — none does.
5837        let mut manifest = Manifest::new();
5838        let mut e = entry("Album/a.mp3", AudioFormat::Mp3);
5839        e.cover_jpg = Some(ArtifactState {
5840            path: "Album/cover.jpg".to_owned(),
5841            hash: "h1".to_owned(),
5842        });
5843        manifest.insert("a", e);
5844        let fs = MemFs::new()
5845            .with_file("Album/a.mp3", b"AUDIO".to_vec())
5846            .with_file("Album/cover.jpg", b"old".to_vec());
5847        fs.arm_fail_remove("Album/cover.jpg");
5848        let plan = Plan {
5849            actions: vec![Action::WriteArtifact {
5850                kind: ArtifactKind::CoverJpg,
5851                path: "Album/cover.jpg".to_owned(),
5852                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
5853                hash: "h2".to_owned(),
5854                owner_id: "a".to_owned(),
5855                content: None,
5856            }],
5857        };
5858        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"new".to_vec()));
5859
5860        let outcome = run(
5861            &plan,
5862            &mut manifest,
5863            &[],
5864            &http,
5865            &fs,
5866            &StubFfmpeg::flac(),
5867            &RecordingClock::new(),
5868            &ExecOptions::default(),
5869        );
5870
5871        assert_eq!(
5872            outcome.failed(),
5873            0,
5874            "no remove is attempted, so the armed failure never fires"
5875        );
5876        assert_eq!(outcome.artifacts_written, 1);
5877        assert_eq!(fs.read_file("Album/cover.jpg").unwrap(), b"new");
5878        assert_eq!(
5879            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().hash,
5880            "h2"
5881        );
5882        // The live directory is untouched by prune.
5883        assert!(fs.has_dir("Album"));
5884    }
5885
5886    // ── Concurrency (issue #22) ─────────────────────────────────────
5887
5888    mod concurrency {
5889        use super::*;
5890        use crate::ffmpeg::FfmpegError;
5891        use crate::fs::{FileStat, FsError};
5892        use crate::http::{HttpRequest, TransportError};
5893        use std::future::Future;
5894        use std::pin::Pin;
5895        use std::sync::Arc;
5896        use std::sync::atomic::{AtomicUsize, Ordering};
5897        use std::task::{Context, Poll};
5898
5899        /// A future that pends exactly once before resolving, waking itself so a
5900        /// single-threaded executor re-polls. It forces the [`Http`] port to
5901        /// yield, so [`buffer_unordered`](futures_util::stream::StreamExt) parks
5902        /// each in-flight request and the true overlap becomes observable.
5903        #[derive(Default)]
5904        struct YieldOnce {
5905            yielded: bool,
5906        }
5907
5908        impl Future for YieldOnce {
5909            type Output = ();
5910            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
5911                if self.yielded {
5912                    Poll::Ready(())
5913                } else {
5914                    self.yielded = true;
5915                    cx.waker().wake_by_ref();
5916                    Poll::Pending
5917                }
5918            }
5919        }
5920
5921        /// An [`Http`] double that wraps [`ScriptedHttp`] and records the peak
5922        /// number of concurrently in-flight requests. Each `send` bumps a live
5923        /// counter, yields once (so peers can start), then delegates.
5924        struct GatedHttp {
5925            inner: ScriptedHttp,
5926            inflight: Arc<AtomicUsize>,
5927            peak: Arc<AtomicUsize>,
5928        }
5929
5930        impl GatedHttp {
5931            fn new(inner: ScriptedHttp) -> Self {
5932                Self {
5933                    inner,
5934                    inflight: Arc::new(AtomicUsize::new(0)),
5935                    peak: Arc::new(AtomicUsize::new(0)),
5936                }
5937            }
5938
5939            fn peak(&self) -> usize {
5940                self.peak.load(Ordering::SeqCst)
5941            }
5942        }
5943
5944        impl Http for GatedHttp {
5945            async fn send(&self, request: HttpRequest) -> Result<HttpResponse, TransportError> {
5946                let now = self.inflight.fetch_add(1, Ordering::SeqCst) + 1;
5947                self.peak.fetch_max(now, Ordering::SeqCst);
5948                YieldOnce::default().await;
5949                let out = self.inner.send(request).await;
5950                self.inflight.fetch_sub(1, Ordering::SeqCst);
5951                out
5952            }
5953        }
5954
5955        fn download(id: &str, format: AudioFormat) -> (Clip, Desired, Action) {
5956            let c = clip(id);
5957            let d = desired(c.clone(), format);
5958            let action = Action::Download {
5959                clip: c.clone(),
5960                lineage: LineageContext::own_root(&c),
5961                path: d.path.clone(),
5962                format,
5963            };
5964            (c, d, action)
5965        }
5966
5967        fn opts_with(concurrency: u32) -> ExecOptions {
5968            ExecOptions {
5969                concurrency,
5970                ..small_poll()
5971            }
5972        }
5973
5974        #[test]
5975        fn concurrency_never_exceeds_the_configured_bound() {
5976            let count = 6;
5977            let concurrency = 3;
5978            let mut scripted = ScriptedHttp::new().with_auth();
5979            let mut actions = Vec::new();
5980            let mut desireds = Vec::new();
5981            for i in 0..count {
5982                let id = format!("c{i}");
5983                scripted = scripted.route(&format!("{id}.mp3"), Reply::ok(b"mp3-body".to_vec()));
5984                let (_c, d, action) = download(&id, AudioFormat::Mp3);
5985                actions.push(action);
5986                desireds.push(d);
5987            }
5988            let http = GatedHttp::new(scripted);
5989            let fs = MemFs::new();
5990            let plan = Plan { actions };
5991            let mut manifest = Manifest::new();
5992
5993            let outcome = run_gated_fs(
5994                &plan,
5995                &mut manifest,
5996                &desireds,
5997                &http,
5998                &fs,
5999                &opts_with(concurrency),
6000            );
6001
6002            assert_eq!(outcome.downloaded, count);
6003            assert!(
6004                http.peak() <= concurrency as usize,
6005                "peak {} exceeded the bound {concurrency}",
6006                http.peak()
6007            );
6008            assert_eq!(
6009                http.peak(),
6010                concurrency as usize,
6011                "expected the run to saturate the bound"
6012            );
6013        }
6014
6015        /// Run a gated plan against a caller-supplied [`MemFs`], returning the
6016        /// outcome. The client is built here so the limiter can be inspected by
6017        /// the caller-facing variant below.
6018        fn run_gated_fs(
6019            plan: &Plan,
6020            manifest: &mut Manifest,
6021            desired: &[Desired],
6022            http: &GatedHttp,
6023            fs: &MemFs,
6024            opts: &ExecOptions,
6025        ) -> ExecOutcome {
6026            let ffmpeg = StubFfmpeg::flac();
6027            let clock = RecordingClock::new();
6028            let mut albums = BTreeMap::new();
6029            let mut playlists = BTreeMap::new();
6030            let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
6031            pollster::block_on(execute(
6032                plan,
6033                manifest,
6034                &mut albums,
6035                &mut playlists,
6036                desired,
6037                &HashMap::new(),
6038                Ports {
6039                    client: &mut client,
6040                    http,
6041                    fs,
6042                    ffmpeg: &ffmpeg,
6043                    clock: &clock,
6044                },
6045                opts,
6046            ))
6047        }
6048
6049        #[test]
6050        fn a_failing_clip_does_not_abort_the_others() {
6051            let mut scripted = ScriptedHttp::new().with_auth();
6052            scripted = scripted
6053                .route("ok1.mp3", Reply::ok(b"one".to_vec()))
6054                .route("bad.mp3", Reply::status(404))
6055                .route("ok2.mp3", Reply::ok(b"two".to_vec()));
6056            let (_a, d1, a1) = download("ok1", AudioFormat::Mp3);
6057            let (_b, d2, a2) = download("bad", AudioFormat::Mp3);
6058            let (_c, d3, a3) = download("ok2", AudioFormat::Mp3);
6059            let http = GatedHttp::new(scripted);
6060            let fs = MemFs::new();
6061            let plan = Plan {
6062                actions: vec![a1, a2, a3],
6063            };
6064            let mut manifest = Manifest::new();
6065
6066            let outcome = run_gated_fs(
6067                &plan,
6068                &mut manifest,
6069                &[d1, d2, d3],
6070                &http,
6071                &fs,
6072                &opts_with(3),
6073            );
6074
6075            assert_eq!(outcome.downloaded, 2);
6076            assert_eq!(outcome.failed(), 1);
6077            assert_eq!(outcome.status, RunStatus::Completed);
6078            assert_eq!(outcome.failures[0].clip_id, "bad");
6079            assert!(manifest.get("ok1").is_some());
6080            assert!(manifest.get("ok2").is_some());
6081            assert!(manifest.get("bad").is_none());
6082        }
6083
6084        #[test]
6085        fn outcome_is_identical_across_concurrency_levels() {
6086            // A plan mixing successful and failing downloads with serial phase-2
6087            // actions (a skip and a delete), so both phases contribute.
6088            fn build() -> (Plan, Vec<Desired>) {
6089                let mut actions = Vec::new();
6090                let mut desireds = Vec::new();
6091                for id in ["a", "b", "c", "d"] {
6092                    let (_c, d, action) = download(id, AudioFormat::Mp3);
6093                    actions.push(action);
6094                    desireds.push(d);
6095                }
6096                // A failing download in the middle of the audio set.
6097                let (_e, de, ae) = download("fail", AudioFormat::Mp3);
6098                actions.insert(2, ae);
6099                desireds.push(de);
6100                // Phase-2 actions.
6101                actions.push(Action::Skip {
6102                    clip_id: "gone".to_owned(),
6103                });
6104                actions.push(Action::Delete {
6105                    path: "old.mp3".to_owned(),
6106                    clip_id: "old".to_owned(),
6107                });
6108                (Plan { actions }, desireds)
6109            }
6110
6111            fn http() -> ScriptedHttp {
6112                ScriptedHttp::new()
6113                    .with_auth()
6114                    .route("a.mp3", Reply::ok(b"a".to_vec()))
6115                    .route("b.mp3", Reply::ok(b"b".to_vec()))
6116                    .route("c.mp3", Reply::ok(b"c".to_vec()))
6117                    .route("d.mp3", Reply::ok(b"d".to_vec()))
6118                    .route("fail.mp3", Reply::status(404))
6119            }
6120
6121            fn seed_manifest() -> Manifest {
6122                let mut m = Manifest::new();
6123                m.insert("old".to_owned(), entry("old.mp3", AudioFormat::Mp3));
6124                m
6125            }
6126
6127            let (plan, desireds) = build();
6128
6129            let mut m1 = seed_manifest();
6130            let fs1 = MemFs::new().with_file("old.mp3", b"x".to_vec());
6131            let out1 = run_gated_fs(
6132                &plan,
6133                &mut m1,
6134                &desireds,
6135                &GatedHttp::new(http()),
6136                &fs1,
6137                &opts_with(1),
6138            );
6139
6140            let mut m8 = seed_manifest();
6141            let fs8 = MemFs::new().with_file("old.mp3", b"x".to_vec());
6142            let out8 = run_gated_fs(
6143                &plan,
6144                &mut m8,
6145                &desireds,
6146                &GatedHttp::new(http()),
6147                &fs8,
6148                &opts_with(8),
6149            );
6150
6151            assert_eq!(out1, out8, "outcome must not depend on concurrency");
6152            assert_eq!(m1, m8, "final manifest must not depend on concurrency");
6153            assert_eq!(out8.downloaded, 4);
6154            assert_eq!(out8.deleted, 1);
6155            assert_eq!(out8.skipped, 1);
6156            assert_eq!(out8.failed(), 1);
6157        }
6158
6159        #[test]
6160        fn a_systemic_disk_full_aborts_promptly() {
6161            let count = 8;
6162            let concurrency = 2;
6163            let mut scripted = ScriptedHttp::new().with_auth();
6164            let mut actions = Vec::new();
6165            let mut desireds = Vec::new();
6166            for i in 0..count {
6167                let id = format!("d{i}");
6168                scripted = scripted.route(&format!("{id}.mp3"), Reply::ok(b"mp3-body".to_vec()));
6169                let (_c, d, action) = download(&id, AudioFormat::Mp3);
6170                actions.push(action);
6171                desireds.push(d);
6172            }
6173            // The very first clip's write hits ENOSPC, a systemic failure.
6174            let fs = MemFs::new().fail_write_out_of_space("d0.mp3");
6175            let http = GatedHttp::new(scripted);
6176            let plan = Plan { actions };
6177            let mut manifest = Manifest::new();
6178
6179            let outcome = run_gated_fs(
6180                &plan,
6181                &mut manifest,
6182                &desireds,
6183                &http,
6184                &fs,
6185                &opts_with(concurrency),
6186            );
6187
6188            assert_eq!(outcome.status, RunStatus::DiskFull);
6189            assert!(
6190                outcome.downloaded < count,
6191                "a systemic abort must stop remaining work, downloaded {}",
6192                outcome.downloaded
6193            );
6194        }
6195
6196        #[test]
6197        fn limiter_records_a_rate_limit_under_concurrent_calls() {
6198            // Three concurrent FLAC renders; exactly one clip is throttled once
6199            // on its wav_file read. The shared limiter must record that single
6200            // 429 (halving 2.0 -> 1.0) with no lost or duplicated update, proving
6201            // the mutex keeps the AIMD state correct under concurrency.
6202            let scripted = ScriptedHttp::new()
6203                .with_auth()
6204                .route_seq(
6205                    "/gen/x/wav_file/",
6206                    vec![
6207                        Reply::status(429),
6208                        Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/x.wav"}"#),
6209                    ],
6210                )
6211                .route(
6212                    "/gen/y/wav_file/",
6213                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/y.wav"}"#),
6214                )
6215                .route(
6216                    "/gen/z/wav_file/",
6217                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/z.wav"}"#),
6218                )
6219                .route("x.wav", Reply::ok(b"wav-x".to_vec()))
6220                .route("y.wav", Reply::ok(b"wav-y".to_vec()))
6221                .route("z.wav", Reply::ok(b"wav-z".to_vec()));
6222
6223            let mut actions = Vec::new();
6224            let mut desireds = Vec::new();
6225            for id in ["x", "y", "z"] {
6226                let (_c, d, action) = download(id, AudioFormat::Flac);
6227                actions.push(action);
6228                desireds.push(d);
6229            }
6230            let plan = Plan { actions };
6231            let fs = MemFs::new();
6232            let ffmpeg = StubFfmpeg::flac();
6233            let clock = RecordingClock::new();
6234            let mut albums = BTreeMap::new();
6235            let mut playlists = BTreeMap::new();
6236            let mut manifest = Manifest::new();
6237            let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
6238
6239            let outcome = pollster::block_on(execute(
6240                &plan,
6241                &mut manifest,
6242                &mut albums,
6243                &mut playlists,
6244                &desireds,
6245                &HashMap::new(),
6246                Ports {
6247                    client: &mut client,
6248                    http: &scripted,
6249                    fs: &fs,
6250                    ffmpeg: &ffmpeg,
6251                    clock: &clock,
6252                },
6253                &opts_with(3),
6254            ));
6255
6256            assert_eq!(outcome.downloaded, 3);
6257            assert_eq!(outcome.failed(), 0);
6258            assert!(
6259                (client.limiter_rate() - 1.0).abs() < 1e-9,
6260                "one 429 must halve the rate to 1.0, got {}",
6261                client.limiter_rate()
6262            );
6263        }
6264
6265        #[test]
6266        fn a_download_is_committed_in_plan_order_around_a_rename() {
6267            // Plan order: rename "orig" away from shared.mp3 first, then download
6268            // a new clip into shared.mp3. A parallel executor that performed the
6269            // download's destination write off plan order would write shared.mp3
6270            // before the rename ran, letting the rename carry those fresh bytes
6271            // to moved.mp3 and stranding shared.mp3 - corrupting both clips.
6272            // Committing every destination effect serially in plan order keeps
6273            // moved.mp3 = the original and shared.mp3 = the new download.
6274            let c_new = clip("new");
6275            let mut d_new = desired(c_new.clone(), AudioFormat::Mp3);
6276            d_new.path = "shared.mp3".to_owned();
6277            let plan = Plan {
6278                actions: vec![
6279                    Action::Rename {
6280                        from: "shared.mp3".to_owned(),
6281                        to: "moved.mp3".to_owned(),
6282                    },
6283                    Action::Download {
6284                        clip: c_new.clone(),
6285                        lineage: LineageContext::own_root(&c_new),
6286                        path: "shared.mp3".to_owned(),
6287                        format: AudioFormat::Mp3,
6288                    },
6289                ],
6290            };
6291            let scripted = ScriptedHttp::new()
6292                .with_auth()
6293                .route("new.mp3", Reply::ok(b"NEW-BODY".to_vec()));
6294            let http = GatedHttp::new(scripted);
6295            let fs = MemFs::new().with_file("shared.mp3", b"ORIGINAL".to_vec());
6296            let mut manifest = Manifest::new();
6297            manifest.insert("orig", entry("shared.mp3", AudioFormat::Mp3));
6298
6299            let outcome = run_gated_fs(&plan, &mut manifest, &[d_new], &http, &fs, &opts_with(4));
6300
6301            assert_eq!(outcome.renamed, 1);
6302            assert_eq!(outcome.downloaded, 1);
6303            assert_eq!(
6304                fs.read_file("moved.mp3").as_deref(),
6305                Some(&b"ORIGINAL"[..]),
6306                "the rename must carry the original bytes, untouched by the download"
6307            );
6308            let landed = fs.read_file("shared.mp3").expect("new download must land");
6309            assert_ne!(
6310                landed, b"ORIGINAL",
6311                "the new download must replace the moved original, not corrupt it"
6312            );
6313            assert_eq!(manifest.get("orig").unwrap().path, "moved.mp3");
6314            assert_eq!(manifest.get("new").unwrap().path, "shared.mp3");
6315        }
6316
6317        #[test]
6318        fn an_aborted_reformat_leaves_the_old_file_and_manifest_consistent() {
6319            // A systemic disk-full abort strikes the download committed before the
6320            // reformat. Because the reformat's slow render is side-effect-free and
6321            // its destination write + old-file removal only happen in the serial
6322            // commit (which the abort skips), the old file survives and the
6323            // manifest still points at it: no removed-but-referenced file.
6324            let boom = clip("boom");
6325            let mut d_boom = desired(boom.clone(), AudioFormat::Mp3);
6326            d_boom.path = "boom.mp3".to_owned();
6327            let reformer = clip("r");
6328            let d_reformer = desired(reformer.clone(), AudioFormat::Mp3);
6329            let plan = Plan {
6330                actions: vec![
6331                    Action::Download {
6332                        clip: boom.clone(),
6333                        lineage: LineageContext::own_root(&boom),
6334                        path: "boom.mp3".to_owned(),
6335                        format: AudioFormat::Mp3,
6336                    },
6337                    Action::Reformat {
6338                        clip: reformer.clone(),
6339                        path: "r_new.mp3".to_owned(),
6340                        from_path: "r_old.flac".to_owned(),
6341                        from: AudioFormat::Flac,
6342                        to: AudioFormat::Mp3,
6343                    },
6344                ],
6345            };
6346            let scripted = ScriptedHttp::new()
6347                .with_auth()
6348                .route("boom.mp3", Reply::ok(b"boom-body".to_vec()))
6349                .route("r.mp3", Reply::ok(b"reformatted".to_vec()));
6350            let http = GatedHttp::new(scripted);
6351            // The download's write hits ENOSPC, a systemic abort.
6352            let fs = MemFs::new()
6353                .with_file("r_old.flac", b"OLD-FLAC".to_vec())
6354                .fail_write_out_of_space("boom.mp3");
6355            let mut manifest = Manifest::new();
6356            manifest.insert("r", entry("r_old.flac", AudioFormat::Flac));
6357
6358            let outcome = run_gated_fs(
6359                &plan,
6360                &mut manifest,
6361                &[d_boom, d_reformer],
6362                &http,
6363                &fs,
6364                &opts_with(4),
6365            );
6366
6367            assert_eq!(outcome.status, RunStatus::DiskFull);
6368            assert!(
6369                fs.exists("r_old.flac"),
6370                "the old file must survive the abort"
6371            );
6372            assert!(
6373                !fs.exists("r_new.mp3"),
6374                "no reformatted file may be written"
6375            );
6376            let still = manifest.get("r").expect("the manifest must still track r");
6377            assert_eq!(
6378                still.path, "r_old.flac",
6379                "the manifest must still point at the surviving old file"
6380            );
6381            assert_eq!(still.format, AudioFormat::Flac);
6382        }
6383
6384        #[test]
6385        fn a_systemic_abort_leaves_no_untracked_destination_files() {
6386            // Two clips commit, the third's write hits ENOSPC (a systemic abort),
6387            // and the rest never commit. Every file remaining on disk must be one
6388            // the manifest tracks: producers write nothing, so an abort cannot
6389            // strand an untracked file from an in-flight or buffered render.
6390            let mut scripted = ScriptedHttp::new().with_auth();
6391            let mut actions = Vec::new();
6392            let mut desireds = Vec::new();
6393            for id in ["a0", "a1", "boom", "a3", "a4"] {
6394                scripted = scripted.route(&format!("{id}.mp3"), Reply::ok(b"body".to_vec()));
6395                let (_c, d, action) = download(id, AudioFormat::Mp3);
6396                actions.push(action);
6397                desireds.push(d);
6398            }
6399            let http = GatedHttp::new(scripted);
6400            let fs = MemFs::new().fail_write_out_of_space("boom.mp3");
6401            let plan = Plan { actions };
6402            let mut manifest = Manifest::new();
6403
6404            let outcome = run_gated_fs(&plan, &mut manifest, &desireds, &http, &fs, &opts_with(2));
6405
6406            assert_eq!(outcome.status, RunStatus::DiskFull);
6407            let tracked: std::collections::BTreeSet<String> = manifest
6408                .entries
6409                .values()
6410                .map(|entry| entry.path.clone())
6411                .collect();
6412            for path in fs.paths() {
6413                assert!(
6414                    tracked.contains(&path),
6415                    "found an untracked destination file: {path}"
6416                );
6417            }
6418            assert!(
6419                !fs.exists("a3.mp3"),
6420                "uncommitted renders must not be on disk"
6421            );
6422            assert!(
6423                !fs.exists("a4.mp3"),
6424                "uncommitted renders must not be on disk"
6425            );
6426        }
6427
6428        /// An [`Ffmpeg`] double that counts how many rendered FLAC payloads are
6429        /// live: it bumps a shared counter (tracking the peak) when a transcode
6430        /// yields bytes, and [`CountingFs`] drops it back on the committing write.
6431        /// The [transcode, write] window is a superset of the true in-memory hold,
6432        /// so the observed peak upper-bounds the real one.
6433        struct CountingFfmpeg {
6434            inner: StubFfmpeg,
6435            held: Arc<AtomicUsize>,
6436            peak: Arc<AtomicUsize>,
6437        }
6438
6439        impl Ffmpeg for CountingFfmpeg {
6440            fn wav_to_flac(
6441                &self,
6442                wav: &[u8],
6443            ) -> impl Future<Output = Result<Vec<u8>, FfmpegError>> + Send {
6444                let fut = self.inner.wav_to_flac(wav);
6445                let held = self.held.clone();
6446                let peak = self.peak.clone();
6447                async move {
6448                    let out = fut.await;
6449                    if out.is_ok() {
6450                        let now = held.fetch_add(1, Ordering::SeqCst) + 1;
6451                        peak.fetch_max(now, Ordering::SeqCst);
6452                    }
6453                    out
6454                }
6455            }
6456
6457            fn mp4_to_webp(
6458                &self,
6459                mp4: &[u8],
6460                settings: WebpEncodeSettings,
6461            ) -> impl Future<Output = Result<Vec<u8>, FfmpegError>> + Send {
6462                self.inner.mp4_to_webp(mp4, settings)
6463            }
6464        }
6465
6466        /// A [`Filesystem`] double wrapping [`MemFs`] that decrements the live
6467        /// payload counter on each committing write, closing the window opened by
6468        /// [`CountingFfmpeg`].
6469        struct CountingFs {
6470            inner: MemFs,
6471            held: Arc<AtomicUsize>,
6472        }
6473
6474        impl Filesystem for CountingFs {
6475            fn write_atomic(&self, path: &str, bytes: &[u8]) -> Result<(), FsError> {
6476                let out = self.inner.write_atomic(path, bytes);
6477                self.held.fetch_sub(1, Ordering::SeqCst);
6478                out
6479            }
6480
6481            fn rename(&self, from: &str, to: &str) -> Result<(), FsError> {
6482                self.inner.rename(from, to)
6483            }
6484
6485            fn remove(&self, path: &str) -> Result<(), FsError> {
6486                self.inner.remove(path)
6487            }
6488
6489            fn prune_empty_dirs(&self, root: &str) -> Result<(), FsError> {
6490                self.inner.prune_empty_dirs(root)
6491            }
6492
6493            fn read(&self, path: &str) -> Result<Vec<u8>, FsError> {
6494                self.inner.read(path)
6495            }
6496
6497            fn metadata(&self, path: &str) -> Option<FileStat> {
6498                self.inner.metadata(path)
6499            }
6500        }
6501
6502        #[test]
6503        fn rendered_payloads_in_memory_stay_bounded_by_concurrency() {
6504            // Far more FLAC clips than the concurrency bound. The ordered buffered
6505            // render keeps at most about `concurrency` transcoded payloads live at
6506            // once (never the whole library), so peak held <= concurrency + 1.
6507            let count = 12;
6508            let concurrency = 3;
6509            let mut scripted = ScriptedHttp::new().with_auth();
6510            let mut actions = Vec::new();
6511            let mut desireds = Vec::new();
6512            for i in 0..count {
6513                let id = format!("f{i}");
6514                scripted = scripted
6515                    .route(
6516                        &format!("/gen/{id}/wav_file/"),
6517                        Reply::json(&format!(
6518                            r#"{{"wav_file_url": "https://cdn1.suno.ai/{id}.wav"}}"#
6519                        )),
6520                    )
6521                    .route(&format!("{id}.wav"), Reply::ok(b"wav-body".to_vec()));
6522                let (_c, d, action) = download(&id, AudioFormat::Flac);
6523                actions.push(action);
6524                desireds.push(d);
6525            }
6526            let http = GatedHttp::new(scripted);
6527            let held = Arc::new(AtomicUsize::new(0));
6528            let peak = Arc::new(AtomicUsize::new(0));
6529            let ffmpeg = CountingFfmpeg {
6530                inner: StubFfmpeg::flac(),
6531                held: held.clone(),
6532                peak: peak.clone(),
6533            };
6534            let fs = CountingFs {
6535                inner: MemFs::new(),
6536                held: held.clone(),
6537            };
6538            let clock = RecordingClock::new();
6539            let mut albums = BTreeMap::new();
6540            let mut playlists = BTreeMap::new();
6541            let mut manifest = Manifest::new();
6542            let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
6543            let plan = Plan { actions };
6544
6545            let outcome = pollster::block_on(execute(
6546                &plan,
6547                &mut manifest,
6548                &mut albums,
6549                &mut playlists,
6550                &desireds,
6551                &HashMap::new(),
6552                Ports {
6553                    client: &mut client,
6554                    http: &http,
6555                    fs: &fs,
6556                    ffmpeg: &ffmpeg,
6557                    clock: &clock,
6558                },
6559                &opts_with(concurrency),
6560            ));
6561
6562            assert_eq!(outcome.downloaded, count as usize);
6563            assert_eq!(
6564                held.load(Ordering::SeqCst),
6565                0,
6566                "every payload must be committed"
6567            );
6568            assert!(
6569                peak.load(Ordering::SeqCst) <= concurrency as usize + 1,
6570                "peak live payloads {} exceeded the bound {}",
6571                peak.load(Ordering::SeqCst),
6572                concurrency + 1
6573            );
6574            assert!(
6575                peak.load(Ordering::SeqCst) >= 2,
6576                "the render should genuinely overlap, peak was {}",
6577                peak.load(Ordering::SeqCst)
6578            );
6579        }
6580    }
6581}