Skip to main content

suno_core/
executor.rs

1//! The download executor: it applies a reconcile [`Plan`] to disk through ports.
2//!
3//! Reconcile decides *what* to do; the executor does it. It is async and pure
4//! orchestration: every side effect goes through a port ([`Http`] for the
5//! network, [`Filesystem`] for disk, [`Ffmpeg`] for transcoding, [`Clock`] for
6//! waiting), so the whole pipeline is exercised in tests with in-memory doubles
7//! and no real IO, network, or sleeping.
8//!
9//! Safety is the point of this module. A wrong write or delete damages the
10//! user's library, so the executor:
11//!
12//! - writes only atomically (SYNC-13): a failed write leaves the prior file
13//!   intact, because the [`Filesystem`] adapter stages a temp file and renames;
14//! - verifies size (SYNC-14): a download whose body disagrees with the
15//!   provider's `Content-Length` is treated as truncated and retried, and a
16//!   written file whose on-disk size disagrees with the bytes written is a
17//!   failure, never a recorded success;
18//! - classifies errors (SYNC-17): an auth failure or a full disk stops the
19//!   account run (with an auth or disk-full status) and is never retried;
20//!   transient failures (timeouts, 5xx,
21//!   transport, 429) are retried a bounded number of times then recorded and
22//!   skipped; permanent failures are recorded and skipped; and a single clip's
23//!   failure never aborts the run;
24//! - backs off on rate limits (SYNC-16) through the injected [`Clock`], honouring
25//!   a `Retry-After` hint.
26//!
27//! The executor only ever sets the manifest's [`preserve`](ManifestEntry::preserve)
28//! marker on an entry it writes, and only deletes a path whose removal the
29//! [`Filesystem`] confirms. Higher-level safety (empty-listing abort, the
30//! destructive-sync confirmation, exit codes) is the caller's job.
31
32use std::collections::BTreeMap;
33use std::collections::BTreeSet;
34use std::collections::HashMap;
35use std::collections::HashSet;
36use std::sync::Mutex;
37use std::time::Duration;
38
39use futures_util::lock::Mutex as AsyncMutex;
40use futures_util::stream::{self, StreamExt};
41
42use crate::backoff::{backoff_delay, retry_after};
43use crate::client::SunoClient;
44use crate::clock::Clock;
45use crate::config::{AudioFormat, StemFormat};
46use crate::error::Error;
47use crate::ffmpeg::{Ffmpeg, WebpEncodeSettings};
48use crate::fs::Filesystem;
49use crate::graph::{AlbumArt, PlaylistState};
50use crate::http::{Http, HttpRequest};
51use crate::lineage::LineageContext;
52use crate::lyrics::AlignedLyrics;
53use crate::manifest::{ArtifactState, Manifest, ManifestEntry};
54use crate::model::Clip;
55use crate::reconcile::{
56    Action, ArtifactKind, Desired, Plan, SourceMode, set_manifest_artifact, set_manifest_stem,
57};
58use crate::tag::{TrackMetadata, tag_flac, tag_mp3};
59
/// The shared Suno client behind an async mutex, so concurrent audio work can
/// serialise its order-sensitive API calls (JWT refresh, adaptive limiter)
/// without a runtime-specific lock. Held only for the brief WAV-render calls;
/// the heavy CDN/transcode/tag work runs unlocked.
///
/// The mutex wraps a `&'a mut SunoClient<C>` borrow rather than an owned
/// client, so the lock lives no longer than that borrow.
type ClientLock<'a, C> = AsyncMutex<&'a mut SunoClient<C>>;
65
/// Tunables for one [`execute`] run.
///
/// The [`Default`] values are 3 retries, 24 WAV polls spaced 5 seconds apart,
/// a concurrency of 4, and the default [`WebpEncodeSettings`].
#[derive(Debug, Clone)]
pub struct ExecOptions {
    /// How many times a transient failure is retried before record-and-skip.
    pub max_retries: u32,
    /// How many times to poll for a server-side WAV render before giving up.
    pub wav_poll_attempts: u32,
    /// How long to wait between WAV render polls.
    pub wav_poll_interval: Duration,
    /// How many clips' audio to fetch, transcode, and tag concurrently. Clamped
    /// to at least one, so a zero collapses to sequential rather than stalling.
    pub concurrency: u32,
    /// Settings used for animated WebP cover transcodes.
    pub cover_webp: WebpEncodeSettings,
}
81
82impl Default for ExecOptions {
83    fn default() -> Self {
84        Self {
85            max_retries: 3,
86            wav_poll_attempts: 24,
87            wav_poll_interval: Duration::from_secs(5),
88            concurrency: 4,
89            cover_webp: WebpEncodeSettings::default(),
90        }
91    }
92}
93
/// How an [`execute`] run ended.
///
/// [`Completed`](RunStatus::Completed) is the default, so a fresh
/// [`ExecOutcome`] reports a completed run until an abort overwrites it. The
/// two abort variants are set when an action fails with the matching systemic
/// failure class (auth or disk); every other failure leaves the status alone.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum RunStatus {
    /// Every action was attempted; some may have failed and been skipped.
    #[default]
    Completed,
    /// An auth failure stopped the run early; remaining actions were not tried.
    AuthAborted,
    /// The disk filled; the run stopped early rather than failing every
    /// remaining clip. Remaining actions were not tried.
    DiskFull,
}
106
/// One action that could not be applied, for the run summary and failure log.
///
/// This is the public projection of the executor's internal classified
/// failure: [`execute`] copies the clip id and reason across and drops the
/// failure class, which is reflected only through [`ExecOutcome::status`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Failure {
    /// The clip the failed action concerned (or a path when no id applies).
    pub clip_id: String,
    /// A short, secret-free reason.
    pub reason: String,
}
115
/// The result of applying a [`Plan`]: per-action counts and the failure list.
///
/// Each counter is bumped once per successfully applied action, according to
/// the effect that action reported; a failed action bumps no counter and is
/// appended to `failures` instead.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ExecOutcome {
    /// Actions that reported a download.
    pub downloaded: usize,
    /// Actions that reported a reformat.
    pub reformatted: usize,
    /// Actions that reported a retag.
    pub retagged: usize,
    /// Actions that reported a rename.
    pub renamed: usize,
    /// Actions that reported a delete.
    pub deleted: usize,
    /// Actions that reported a skip (for example an [`Action::Skip`]).
    pub skipped: usize,
    /// Actions that reported an artifact write.
    pub artifacts_written: usize,
    /// Actions that reported an artifact delete.
    pub artifacts_deleted: usize,
    /// Actions that failed (auth, disk-full, transient-exhausted, or
    /// permanent). The run continued past each one unless it was an auth or
    /// disk-full abort, in which case it is the last entry recorded.
    pub failures: Vec<Failure>,
    /// How the run ended.
    pub status: RunStatus,
}
134
135impl ExecOutcome {
136    /// Number of failed actions.
137    pub fn failed(&self) -> usize {
138        self.failures.len()
139    }
140
141    fn record(&mut self, effect: Effect) {
142        match effect {
143            Effect::Downloaded => self.downloaded += 1,
144            Effect::Reformatted => self.reformatted += 1,
145            Effect::Retagged => self.retagged += 1,
146            Effect::Renamed => self.renamed += 1,
147            Effect::Deleted => self.deleted += 1,
148            Effect::Skipped => self.skipped += 1,
149            Effect::ArtifactWritten => self.artifacts_written += 1,
150            Effect::ArtifactDeleted => self.artifacts_deleted += 1,
151        }
152    }
153}
154
/// The IO ports the executor drives, grouped so one value threads them through.
///
/// `client` is the only `&mut` port: it performs the authenticated WAV render
/// flow and so mutates its cached session. The rest are shared references.
///
/// The type parameters name the port implementations: `H` is the [`Http`]
/// port, `F` the [`Filesystem`], `G` the [`Ffmpeg`] transcoder, and `C` the
/// [`Clock`] (which also parameterises the [`SunoClient`]). The trait bounds
/// live on [`execute`] rather than on this struct.
pub struct Ports<'a, H, F, G, C> {
    /// Performs the authenticated WAV render and poll flow.
    pub client: &'a mut SunoClient<C>,
    /// The public network port (CDN audio, rendered WAV, cover art).
    pub http: &'a H,
    /// The disk port.
    pub fs: &'a F,
    // NOTE(review): `ExecOptions::cover_webp` suggests this port also performs
    // the animated WebP cover transcodes - confirm and widen the description.
    /// The transcode port (WAV to FLAC).
    pub ffmpeg: &'a G,
    /// The backoff and poll delay port.
    pub clock: &'a C,
}
171
/// Apply `plan` to disk, updating `manifest`, `albums`, and `playlists` in
/// place, and return the outcome.
///
/// `desired` carries the per-clip metadata and art hashes plus the source modes
/// that decide the [`preserve`](ManifestEntry::preserve) marker; it is indexed
/// by clip id (and by target path, for renames) so each written entry records
/// the right hashes and protection. `albums` is the album-art store, keyed by
/// stable root id: folder-art writes and deletes record their state there rather
/// than on the per-clip `manifest`. `playlists` is the playlist store, handed to
/// the artifact handlers alongside `albums`; each playlist's `path` also counts
/// toward the tracked-path reference counts used by the old-path cleanup.
/// `ports` bundles the authenticated client and
/// the network, disk, transcode, and backoff ports. `opts` supplies the
/// tunables; its `concurrency` is clamped to at least one here. A single clip's
/// failure never aborts the run, except an auth failure or a full disk, which
/// stop it with [`RunStatus::AuthAborted`] or [`RunStatus::DiskFull`].
///
/// The audio-producing actions ([`Download`](Action::Download) and
/// [`Reformat`](Action::Reformat)) run concurrently, bounded by
/// [`ExecOptions::concurrency`]: their slow parts (WAV render, CDN download,
/// transcode, tag) overlap while the order-sensitive Suno API calls are
/// serialised behind an async mutex over the shared [`SunoClient`], keeping the
/// adaptive limiter and JWT refresh correct. The remaining actions (retag,
/// rename, delete, and artifact writes/deletes) then run serially in plan order.
///
/// The outcome is deterministic regardless of completion order: concurrent audio
/// results are committed to the manifest in plan-index order, so the same plan
/// always yields the same manifest and counts whatever the concurrency level. A
/// per-clip failure is recorded and the run continues; only an auth failure or a
/// full disk aborts, and it does so promptly by stopping further audio work.
///
/// `synced` carries this run's fetched aligned (synced) lyrics keyed by clip id;
/// it is the caller's IO result, not part of the pure plan. Audio tagging embeds
/// a clip's entry as an MP3 `SYLT` frame and as the plain `USLT`/`LYRICS` text
/// (FLAC), so a clip absent from the map (an instrumental, a WAV target, or a
/// run with the feature off) is tagged exactly as before. The synced `.lrc`
/// sidecar itself is a generated artifact whose body the caller has already
/// resolved into the plan, so it is written like any other text sidecar.
///
/// Returns the [`ExecOutcome`]: one counter bump per applied action, one
/// [`Failure`] per failed action, and the [`RunStatus`] the run ended with.
#[allow(clippy::too_many_arguments)]
pub async fn execute<H, F, G, C>(
    plan: &Plan,
    manifest: &mut Manifest,
    albums: &mut BTreeMap<String, AlbumArt>,
    playlists: &mut BTreeMap<String, PlaylistState>,
    desired: &[Desired],
    synced: &HashMap<String, AlignedLyrics>,
    ports: Ports<'_, H, F, G, C>,
    opts: &ExecOptions,
) -> ExecOutcome
where
    H: Http,
    F: Filesystem,
    G: Ffmpeg,
    C: Clock,
{
    let Ports {
        client,
        http,
        fs,
        ffmpeg,
        clock,
    } = ports;
    // Two lookups over the same desired set: by clip id and by target path.
    // Collecting into a map keeps the last entry when keys repeat.
    let by_id: HashMap<&str, &Desired> = desired.iter().map(|d| (d.clip.id.as_str(), d)).collect();
    let by_path: HashMap<&str, &Desired> = desired.iter().map(|d| (d.path.as_str(), d)).collect();
    // Every path this run writes, so the inline old-sidecar cleanup never removes
    // a file another action produces this run (the non-planned twin of
    // `suppress_path_aliasing`).
    let write_targets: BTreeSet<String> = plan
        .actions
        .iter()
        .filter_map(|a| match a {
            Action::Download { path, .. }
            | Action::Reformat { path, .. }
            | Action::WriteArtifact { path, .. }
            | Action::WriteStem { path, .. } => Some(path.clone()),
            Action::Rename { to, .. } => Some(to.clone()),
            _ => None,
        })
        .collect();
    // How many tracked artifact slots reference each path. The inline old-path
    // cleanup removes a path only once nothing else holds it: each slot that
    // moves away decrements its reference, and the removal fires only when the
    // count reaches zero and no action writes the path this run. This keeps a
    // live file a co-referencing slot still owns (a prior failed swap can leave
    // two clips sharing a path) while letting the last slot to leave reclaim it,
    // so nothing is orphaned either (#76).
    let mut tracked_paths: HashMap<String, u32> = HashMap::new();
    for (_, entry) in manifest.iter() {
        for path in entry.artifact_paths() {
            *tracked_paths.entry(path.to_owned()).or_default() += 1;
        }
    }
    // Album folder art counts too: one reference per populated folder slot.
    for art in albums.values() {
        for state in [
            art.folder_jpg.as_ref(),
            art.folder_webp.as_ref(),
            art.folder_mp4.as_ref(),
        ]
        .into_iter()
        .flatten()
        {
            *tracked_paths.entry(state.path.clone()).or_default() += 1;
        }
    }
    // And one reference per playlist file.
    for playlist in playlists.values() {
        *tracked_paths.entry(playlist.path.clone()).or_default() += 1;
    }
    // Static cover art is otherwise fetched twice per clip (#89): once to embed
    // in the audio tag and once for the per-song `.jpg` sidecar, both from the
    // same CDN URL. The audio producer caches each cover it embeds here, keyed by
    // URL, and the sidecar write drains it rather than re-fetching. Only URLs a
    // `CoverJpg` sidecar will fetch this run are cached, and the sidecar removes
    // its entry on use, so the map holds at most the covers for the clips in
    // flight (bounded by `concurrency`), never the whole library.
    let cover_wanted: HashSet<&str> = plan
        .actions
        .iter()
        .filter_map(|action| match action {
            Action::WriteArtifact {
                kind: ArtifactKind::CoverJpg,
                source_url,
                ..
            } if !source_url.is_empty() => Some(source_url.as_str()),
            _ => None,
        })
        .collect();
    let cover_cache: Mutex<HashMap<String, Vec<u8>>> = Mutex::new(HashMap::new());
    // The `both` video-cover retention keeps `cover.webp` (transcoded) and
    // `cover.mp4` (raw) for an album from the SAME `video_cover_url`. Cache that
    // source on its first fetch so the second folder artifact drains it rather
    // than fetching the same MP4 twice (#90 reuses the #89 fetch-once path).
    let mut folder_cover_uses: HashMap<&str, u32> = HashMap::new();
    for action in &plan.actions {
        if let Action::WriteArtifact {
            kind: ArtifactKind::FolderWebp | ArtifactKind::FolderMp4,
            source_url,
            ..
        } = action
            && !source_url.is_empty()
        {
            *folder_cover_uses.entry(source_url.as_str()).or_default() += 1;
        }
    }
    // Only a URL used by more than one folder artifact is worth caching.
    let shared_cover_urls: HashSet<&str> = folder_cover_uses
        .into_iter()
        .filter(|(_, uses)| *uses > 1)
        .map(|(url, _)| url)
        .collect();
    let ctx = Ctx {
        http,
        fs,
        ffmpeg,
        clock,
        opts,
        by_id: &by_id,
        by_path: &by_path,
        synced,
        write_targets: &write_targets,
        cover_cache: &cover_cache,
        cover_wanted: &cover_wanted,
        shared_cover_urls: &shared_cover_urls,
    };

    let mut outcome = ExecOutcome::default();

    // The audio-producing actions ([`Download`](Action::Download) /
    // [`Reformat`](Action::Reformat)) render concurrently, but their work is
    // deliberately split so that NO destination write, file removal, or manifest
    // update happens off the plan's order:
    //
    // - the parallel producers ([`prepare_audio`](Ctx::prepare_audio)) do only
    //   the slow, side-effect-free work (fetch the CDN/WAV bytes, transcode, and
    //   tag), returning the tagged bytes; and
    // - a single serial committer below writes those bytes to the destination,
    //   removes any superseded file, and records the manifest entry, in strict
    //   plan-index order, interleaved with the non-audio actions.
    //
    // The shared client is the only `&mut` port and its API calls must stay
    // ordered, so it rides behind an async mutex; each producer locks it only for
    // the brief WAV-render calls and runs the heavy work unlocked. Renders are
    // yielded in plan order and bounded to `concurrency` in flight (and buffered),
    // so at most about `concurrency` tagged payloads are ever held in memory -
    // never the whole library.
    let client_lock = AsyncMutex::new(client);
    let concurrency = opts.concurrency.max(1) as usize;
    let ctx_ref = &ctx;
    let client_lock_ref = &client_lock;
    let mut renders = stream::iter(
        plan.actions
            .iter()
            .filter(|action| is_audio_action(action))
            .map(|action| async move { ctx_ref.prepare_audio(client_lock_ref, action).await }),
    )
    .buffered(concurrency);

    // The buffered stream makes progress only while it is being polled, i.e.
    // inside `renders.next().await`; producers are suspended while a non-audio
    // `apply` below is awaited.
    // NOTE(review): `apply` is handed the same client lock (for `WriteStem`). If
    // `write_stem` awaits that lock while a suspended producer still holds the
    // guard, neither side can make progress - confirm the lock is never held by
    // a producer at that point, or that plan ordering rules it out.
    for action in &plan.actions {
        // Audio actions pull their pre-rendered bytes (yielded in plan order) and
        // commit them here; every other action applies its own effect. Both the
        // audio commit and the non-audio apply run serially, so all destination
        // and manifest effects keep the plan's order exactly as the sequential
        // executor did.
        let result = if is_audio_action(action) {
            match renders.next().await {
                Some(Ok(rendered)) => ctx.commit_audio(manifest, rendered),
                Some(Err(fail)) => Err(fail),
                // The stream was built with the same `is_audio_action` filter
                // over the same plan, so it cannot run dry before this loop does.
                None => unreachable!("buffered yields one result per audio action"),
            }
        } else {
            ctx.apply(
                client_lock_ref,
                action,
                manifest,
                albums,
                playlists,
                &mut tracked_paths,
            )
            .await
        };
        match result {
            Ok(effect) => outcome.record(effect),
            Err(fail) => {
                // Classify before moving the fields out; the failure is recorded
                // whether or not it aborts.
                let abort = abort_status(fail.class);
                outcome.failures.push(Failure {
                    clip_id: fail.clip_id,
                    reason: fail.reason,
                });
                if let Some(status) = abort {
                    // A systemic abort stops the run. Dropping the render stream
                    // cancels any in-flight or completed-but-uncommitted producer;
                    // because producers touch nothing on disk, the destination and
                    // manifest are left exactly as the committed prefix wrote them,
                    // with no untracked files and no removed-but-referenced file.
                    outcome.status = status;
                    break;
                }
            }
        }
    }
    drop(renders);

    // Renames and deletes can leave an album directory empty; prune those ghost
    // directories bottom-up. This runs on both the completed and the aborted
    // paths, and is best-effort: a prune failure is only a missed tidy that the
    // next run repeats, never a reason to fail the run.
    // NOTE(review): the empty path presumably denotes the library root - confirm
    // against the `Filesystem` port's contract.
    let _ = fs.prune_empty_dirs("");
    outcome
}
415
/// Whether an action produces audio: it fetches, transcodes, and tags a clip's
/// file. Its slow render runs in the concurrent phase; its destination write and
/// manifest update are committed serially in plan order. Everything else touches
/// the manifest, album, or playlist stores directly and runs serially.
///
/// [`execute`] uses this predicate twice: to select the actions fed to the
/// render stream and to route each action in the commit loop. Both uses must
/// agree so the stream yields exactly one result per audio action.
fn is_audio_action(action: &Action) -> bool {
    matches!(action, Action::Download { .. } | Action::Reformat { .. })
}
423
/// A rendered-but-uncommitted audio result: the tagged bytes plus what the serial
/// committer needs to place them. Produced concurrently and side-effect-free (no
/// destination write, no removal, no manifest touch);
/// [`commit_audio`](Ctx::commit_audio) applies all of those in plan order.
struct RenderedAudio {
    /// Id of the clip the audio belongs to.
    clip_id: String,
    /// Destination path copied from the action.
    path: String,
    /// The format the bytes were produced in: the download's format, or the
    /// target format of a reformat.
    format: AudioFormat,
    /// The superseded file to remove after the new one lands (a
    /// [`Reformat`](Action::Reformat)), or `None` for a plain
    /// [`Download`](Action::Download).
    from_path: Option<String>,
    /// What to count once committed: `Downloaded` or `Reformatted`.
    effect: Effect,
    /// The produced (fetched, transcoded, tagged) file contents.
    bytes: Vec<u8>,
}
438
/// What an applied action did, for the outcome counters.
///
/// Each variant maps to exactly one [`ExecOutcome`] counter.
enum Effect {
    /// Counted in [`ExecOutcome::downloaded`]; set for a
    /// [`Download`](Action::Download).
    Downloaded,
    /// Counted in [`ExecOutcome::reformatted`]; set for a
    /// [`Reformat`](Action::Reformat).
    Reformatted,
    /// Counted in [`ExecOutcome::retagged`].
    Retagged,
    /// Counted in [`ExecOutcome::renamed`].
    Renamed,
    /// Counted in [`ExecOutcome::deleted`].
    Deleted,
    /// Counted in [`ExecOutcome::skipped`]; reported for an
    /// [`Action::Skip`].
    Skipped,
    /// Counted in [`ExecOutcome::artifacts_written`].
    ArtifactWritten,
    /// Counted in [`ExecOutcome::artifacts_deleted`].
    ArtifactDeleted,
}
450
/// How a failure should be handled (SYNC-17).
///
/// `Auth` and `Disk` are the systemic classes: [`abort_status`] maps them to a
/// run-ending [`RunStatus`]. `Transient` and `Permanent` are per-clip and never
/// end the run.
#[derive(Debug, Clone, Copy)]
enum Class {
    /// Stop the account run; do not retry.
    Auth,
    /// Stop the account run: a full disk is systemic, like auth, so aborting
    /// beats skipping every remaining clip (each of which would first burn a
    /// server-side WAV-render budget before failing the same way).
    Disk,
    /// Retry a bounded number of times, then record and skip.
    Transient,
    /// Record and skip immediately.
    Permanent,
}
465
/// A classified action failure attributed to a clip.
///
/// [`execute`] turns this into a public [`Failure`] (dropping `class`) and uses
/// `class` to decide whether the run aborts.
struct Fail {
    /// How the failure is handled; decides whether the run stops.
    class: Class,
    /// The clip the failed action concerned.
    clip_id: String,
    /// The reason recorded on the public [`Failure`].
    reason: String,
}
472
473/// The run-ending status for a failure class, or `None` when the failure is
474/// per-clip and the run continues.
475fn abort_status(class: Class) -> Option<RunStatus> {
476    match class {
477        Class::Auth => Some(RunStatus::AuthAborted),
478        Class::Disk => Some(RunStatus::DiskFull),
479        Class::Transient | Class::Permanent => None,
480    }
481}
482
483fn auth_fail(clip_id: impl Into<String>, reason: impl Into<String>) -> Fail {
484    Fail {
485        class: Class::Auth,
486        clip_id: clip_id.into(),
487        reason: reason.into(),
488    }
489}
490
491fn transient_fail(clip_id: impl Into<String>, reason: impl Into<String>) -> Fail {
492    Fail {
493        class: Class::Transient,
494        clip_id: clip_id.into(),
495        reason: reason.into(),
496    }
497}
498
499fn permanent_fail(clip_id: impl Into<String>, reason: impl Into<String>) -> Fail {
500    Fail {
501        class: Class::Permanent,
502        clip_id: clip_id.into(),
503        reason: reason.into(),
504    }
505}
506
507fn disk_fail(clip_id: impl Into<String>, reason: impl Into<String>) -> Fail {
508    Fail {
509        class: Class::Disk,
510        clip_id: clip_id.into(),
511        reason: reason.into(),
512    }
513}
514
/// Whether an artifact kind is album-scoped folder art (owned by a root id and
/// recorded on the album store) rather than a per-clip sidecar (recorded on the
/// manifest).
///
/// True for exactly the three folder kinds: `FolderJpg`, `FolderWebp`, and
/// `FolderMp4`.
fn is_album_kind(kind: ArtifactKind) -> bool {
    matches!(
        kind,
        ArtifactKind::FolderJpg | ArtifactKind::FolderWebp | ArtifactKind::FolderMp4
    )
}
524
/// True for the library-scoped playlist artifact, routed to the playlist store.
///
/// Only `ArtifactKind::Playlist` qualifies; every other kind returns `false`.
fn is_playlist_kind(kind: ArtifactKind) -> bool {
    matches!(kind, ArtifactKind::Playlist)
}
529
/// True for a per-song sidecar, whose write requires the owning clip's
/// manifest entry: the static and animated covers (`CoverJpg`, `CoverWebp`),
/// the text sidecars (`DetailsTxt`, `LyricsTxt`, `Lrc`), and the clip video
/// (`VideoMp4`). Album and playlist kinds are keyed by a root/playlist id that
/// is deliberately absent from the manifest.
fn is_per_clip_kind(kind: ArtifactKind) -> bool {
    matches!(
        kind,
        ArtifactKind::CoverJpg
            | ArtifactKind::CoverWebp
            | ArtifactKind::DetailsTxt
            | ArtifactKind::LyricsTxt
            | ArtifactKind::Lrc
            | ArtifactKind::VideoMp4
    )
}
544
/// Derive a playlist's display name from the file stem of its `.m3u8` path.
///
/// A playlist lives at `<sanitised name>.m3u8` in the library root, so the
/// stem (the final component minus its last extension) is the sanitised name.
/// The sanitiser is not reversible, so the result is a human convenience only:
/// reconcile reads just a playlist's `path` and `hash`, never this name. A path
/// with no file stem (empty, or ending in `..`) yields an empty string, and
/// non-UTF-8 bytes are replaced lossily.
fn playlist_name_from_path(path: &str) -> String {
    match std::path::Path::new(path).file_stem() {
        Some(stem) => stem.to_string_lossy().into_owned(),
        None => String::new(),
    }
}
557
/// A classified fetch failure, not yet attributed to a clip.
///
/// [`attribute`](FetchError::attribute) turns it into a [`Fail`] once the clip
/// id is known; `retry_after` is dropped at that point.
struct FetchError {
    /// How the failure is handled.
    class: Class,
    /// The reason carried over to the [`Fail`].
    reason: String,
    /// An optional delay hint; always `None` for a permanent failure.
    // NOTE(review): presumably the provider's `Retry-After` value consulted by
    // the retry loop - confirm against the fetch helpers.
    retry_after: Option<Duration>,
}
564
565impl FetchError {
566    fn transient(reason: impl Into<String>, retry_after: Option<Duration>) -> Self {
567        Self {
568            class: Class::Transient,
569            reason: reason.into(),
570            retry_after,
571        }
572    }
573
574    fn permanent(reason: impl Into<String>) -> Self {
575        Self {
576            class: Class::Permanent,
577            reason: reason.into(),
578            retry_after: None,
579        }
580    }
581
582    fn attribute(self, clip_id: &str) -> Fail {
583        Fail {
584            class: self.class,
585            clip_id: clip_id.to_owned(),
586            reason: self.reason,
587        }
588    }
589}
590
/// The shared, read-only context threaded through every action handler.
///
/// Every field is a shared reference, so handlers take `&self`. The one piece
/// of state that changes during a run is `cover_cache`, which is mutated
/// through its own mutex.
struct Ctx<'a, H, F, G, C> {
    /// The public network port.
    http: &'a H,
    /// The disk port.
    fs: &'a F,
    /// The transcode port.
    ffmpeg: &'a G,
    /// The backoff and poll delay port.
    clock: &'a C,
    /// This run's tunables.
    opts: &'a ExecOptions,
    /// The desired set indexed by clip id.
    by_id: &'a HashMap<&'a str, &'a Desired>,
    /// The desired set indexed by target path.
    by_path: &'a HashMap<&'a str, &'a Desired>,
    /// This run's fetched aligned (synced) lyrics, keyed by clip id. Audio
    /// tagging reads a clip's entry to embed an MP3 `SYLT` frame and the plain
    /// lyric text; a clip absent here is tagged exactly as before. Populated by
    /// the caller (the fetch is IO), so the engine stays free of direct IO.
    synced: &'a HashMap<String, AlignedLyrics>,
    /// Every destination path this run writes (audio downloads and reformats,
    /// artifact writes, and rename targets). The inline old-sidecar cleanup in
    /// [`write_artifact`](Ctx::write_artifact) skips any path in this set, so a
    /// path swap between two clips can never delete a file the same run just
    /// wrote. This mirrors [`suppress_path_aliasing`] for the one removal that
    /// is not itself a planned action.
    write_targets: &'a BTreeSet<String>,
    /// Static cover art the audio producer already fetched to embed in the tag,
    /// keyed by CDN URL, so the matching per-song `.jpg` sidecar reuses it rather
    /// than fetching the same image again (#89). Only URLs a `CoverJpg` sidecar
    /// will fetch are inserted (see `cover_wanted`) and each is removed on use, so
    /// the map stays bounded to the clips in flight. A plain mutex guards it: the
    /// concurrent producers only ever insert, and the lock is never held across an
    /// await.
    cover_cache: &'a Mutex<HashMap<String, Vec<u8>>>,
    /// The cover URLs a `CoverJpg` sidecar will fetch this run. The producer caches
    /// a cover only when its URL is here, so a clip whose cover is embedded but
    /// never written as a sidecar leaves no bytes stranded in `cover_cache`.
    cover_wanted: &'a HashSet<&'a str>,
    /// Album video-cover source URLs fetched by more than one folder artifact
    /// this run. The `both` retention derives `cover.webp` (transcoded) and
    /// `cover.mp4` (raw) from the SAME `video_cover_url`; the first fetch caches
    /// the raw source here so the sibling drains it instead of re-fetching (#90
    /// reuses the #89 fetch-once path). `FolderWebp` sorts before `FolderMp4`, so
    /// the raw source is always cached before the raw sidecar reads it.
    shared_cover_urls: &'a HashSet<&'a str>,
}
632
633impl<H, F, G, C> Ctx<'_, H, F, G, C>
634where
635    H: Http,
636    F: Filesystem,
637    G: Ffmpeg,
638    C: Clock,
639{
    /// Apply one non-audio action, returning what it did or why it failed.
    ///
    /// Audio actions ([`Download`](Action::Download) /
    /// [`Reformat`](Action::Reformat)) run in the concurrent phase through
    /// [`prepare_audio`](Self::prepare_audio) and never reach here.
    ///
    /// This is a pure dispatcher: each arm forwards the action's fields to the
    /// matching handler. `client_lock` is passed on only to `write_stem`,
    /// `tracked_paths` only to `write_artifact`, and `albums`/`playlists` only
    /// to the artifact write and delete handlers.
    ///
    /// # Panics
    ///
    /// Panics if handed a `Download` or `Reformat` action.
    async fn apply(
        &self,
        client_lock: &ClientLock<'_, C>,
        action: &Action,
        manifest: &mut Manifest,
        albums: &mut BTreeMap<String, AlbumArt>,
        playlists: &mut BTreeMap<String, PlaylistState>,
        tracked_paths: &mut HashMap<String, u32>,
    ) -> Result<Effect, Fail> {
        match action {
            Action::Download { .. } | Action::Reformat { .. } => {
                unreachable!("audio actions are applied in the concurrent phase")
            }
            Action::Retag {
                clip,
                lineage,
                path,
            } => self.retag(manifest, clip, lineage, path).await,
            Action::Rename { from, to } => self.rename(manifest, from, to),
            Action::Delete { path, clip_id } => self.delete(manifest, path, clip_id),
            // A skip cannot fail: it only calls `refresh_preserve` for the clip
            // and is then counted as skipped.
            Action::Skip { clip_id } => {
                self.refresh_preserve(manifest, clip_id);
                Ok(Effect::Skipped)
            }
            Action::WriteArtifact {
                kind,
                path,
                source_url,
                hash,
                owner_id,
                content,
            } => {
                self.write_artifact(
                    manifest,
                    albums,
                    playlists,
                    *kind,
                    path,
                    source_url,
                    hash,
                    owner_id,
                    content.as_deref(),
                    tracked_paths,
                )
                .await
            }
            Action::DeleteArtifact {
                kind,
                path,
                owner_id,
            } => self.delete_artifact(manifest, albums, playlists, *kind, path, owner_id),
            // The only non-audio handler that is given the client lock.
            Action::WriteStem {
                clip_id,
                key,
                stem_id,
                path,
                source_url,
                format,
                hash,
            } => {
                self.write_stem(
                    client_lock,
                    manifest,
                    clip_id,
                    key,
                    stem_id,
                    path,
                    source_url,
                    *format,
                    hash,
                )
                .await
            }
            Action::DeleteStem { clip_id, key, path } => {
                self.delete_stem(manifest, clip_id, key, path)
            }
        }
    }
723
724    /// Render one audio action's tagged bytes, side-effect-free.
725    ///
726    /// This is the concurrent part: it fetches, transcodes, and tags the file
727    /// (through shared ports, plus the client behind `client_lock`), then returns
728    /// the bytes and where they must go. It deliberately writes nothing, removes
729    /// nothing, and never touches `manifest`, so many run at once and an aborted
730    /// run can drop them with no destination or manifest effect. The serial
731    /// [`commit_audio`](Self::commit_audio) applies those effects in plan order.
732    async fn prepare_audio(
733        &self,
734        client_lock: &ClientLock<'_, C>,
735        action: &Action,
736    ) -> Result<RenderedAudio, Fail> {
737        match action {
738            Action::Download {
739                clip,
740                lineage,
741                path,
742                format,
743            } => {
744                let bytes = self
745                    .produce_audio(client_lock, clip, lineage, *format)
746                    .await?;
747                Ok(RenderedAudio {
748                    clip_id: clip.id.clone(),
749                    path: path.clone(),
750                    format: *format,
751                    from_path: None,
752                    effect: Effect::Downloaded,
753                    bytes,
754                })
755            }
756            Action::Reformat {
757                clip,
758                path,
759                from_path,
760                from: _,
761                to,
762            } => {
763                // A Reformat action carries no lineage, so recover it from the
764                // desired set (the same context that drove naming and the hash),
765                // falling back to a self-rooted context when the clip is not in
766                // the current selection.
767                let lineage = self
768                    .by_id
769                    .get(clip.id.as_str())
770                    .map(|d| d.lineage.clone())
771                    .unwrap_or_else(|| LineageContext::own_root(clip));
772                let bytes = self.produce_audio(client_lock, clip, &lineage, *to).await?;
773                Ok(RenderedAudio {
774                    clip_id: clip.id.clone(),
775                    path: path.clone(),
776                    format: *to,
777                    from_path: Some(from_path.clone()),
778                    effect: Effect::Reformatted,
779                    bytes,
780                })
781            }
782            _ => unreachable!("prepare_audio only handles audio actions"),
783        }
784    }
785
786    /// Commit one rendered audio result serially, in plan order.
787    ///
788    /// Writes the tagged bytes to the destination, then, for a [`Reformat`], drops
789    /// the superseded file, then records the manifest entry. Ordering the write
790    /// before the removal keeps a crash from losing both copies; keeping all of
791    /// this off the concurrent phase preserves the sequential executor's plan-order
792    /// guarantee for every destination and manifest effect.
793    fn commit_audio(
794        &self,
795        manifest: &mut Manifest,
796        rendered: RenderedAudio,
797    ) -> Result<Effect, Fail> {
798        let RenderedAudio {
799            clip_id,
800            path,
801            format,
802            from_path,
803            effect,
804            bytes,
805        } = rendered;
806        let size = self.write_verify(&clip_id, &path, &bytes)?;
807        if let Some(from) = from_path {
808            // The new file is safely in place; only now drop the old rendering.
809            self.fs.remove(&from).map_err(|err| {
810                permanent_fail(&clip_id, format!("could not remove old file: {err}"))
811            })?;
812        }
813        manifest.insert(clip_id.clone(), self.entry(&clip_id, &path, format, size));
814        Ok(effect)
815    }
816
817    /// Re-tag the existing file in place to match current metadata and art.
818    async fn retag(
819        &self,
820        manifest: &mut Manifest,
821        clip: &Clip,
822        lineage: &LineageContext,
823        path: &str,
824    ) -> Result<Effect, Fail> {
825        let Some(format) = manifest.get(&clip.id).map(|entry| entry.format) else {
826            return Err(permanent_fail(
827                &clip.id,
828                "retag target missing from manifest",
829            ));
830        };
831
832        if format == AudioFormat::Wav {
833            // WAV carries no embedded tags; just record the new hashes so the
834            // next run sees them as current and stops retagging.
835            self.refresh_hashes(manifest, &clip.id, None);
836            return Ok(Effect::Retagged);
837        }
838
839        let (meta, synced) = self.track_meta(clip, lineage);
840        let cover = self.fetch_cover(clip).await;
841        let existing = self
842            .fs
843            .read(path)
844            .map_err(|err| permanent_fail(&clip.id, format!("could not read for retag: {err}")))?;
845        let tagged = match format {
846            AudioFormat::Mp3 => tag_mp3(&existing, &meta, cover.as_deref(), synced),
847            AudioFormat::Flac => tag_flac(&existing, &meta, cover.as_deref()),
848            AudioFormat::Wav => unreachable!("WAV handled above"),
849        }
850        .map_err(|err| permanent_fail(&clip.id, err.to_string()))?;
851        let size = self.write_verify(&clip.id, path, &tagged)?;
852        self.refresh_hashes(manifest, &clip.id, Some(size));
853        Ok(Effect::Retagged)
854    }
855
856    /// Move the file and update the entry's path (and protection).
857    fn rename(&self, manifest: &mut Manifest, from: &str, to: &str) -> Result<Effect, Fail> {
858        let label = self
859            .by_path
860            .get(to)
861            .map(|d| d.clip.id.clone())
862            .unwrap_or_else(|| to.to_owned());
863        self.fs.rename(from, to).map_err(|err| {
864            if err.is_out_of_space() {
865                disk_fail(label, "disk full: no space left to rename")
866            } else {
867                permanent_fail(label, format!("rename failed: {err}"))
868            }
869        })?;
870
871        let clip_id = self.by_path.get(to).map(|d| d.clip.id.clone()).or_else(|| {
872            manifest
873                .entries
874                .iter()
875                .find(|(_, entry)| entry.path == from)
876                .map(|(id, _)| id.clone())
877        });
878        if let Some(id) = clip_id
879            && let Some(entry) = manifest.entries.get_mut(&id)
880        {
881            entry.path = to.to_owned();
882            if let Some(d) = self.by_path.get(to) {
883                entry.preserve = preserve_for(d);
884            }
885        }
886        Ok(Effect::Renamed)
887    }
888
889    /// Remove the file and drop the manifest entry.
890    fn delete(&self, manifest: &mut Manifest, path: &str, clip_id: &str) -> Result<Effect, Fail> {
891        self.fs
892            .remove(path)
893            .map_err(|err| permanent_fail(clip_id, format!("delete failed: {err}")))?;
894        manifest.remove(clip_id);
895        Ok(Effect::Deleted)
896    }
897
898    /// Fetch an artifact's bytes, write them atomically, then record the sidecar
899    /// on the owning manifest entry.
900    ///
901    /// The fetch and write share the audio path's resilience: `fetch_bytes`
902    /// retries transient failures and verifies `Content-Length`, and
903    /// `write_verify` confirms the on-disk size. A failure is attributed to the
904    /// owning clip and returned as a per-clip [`Fail`], so a bad sidecar never
905    /// aborts the whole run (only an auth failure or a full disk does, matching
906    /// audio).
907    ///
908    /// The bytes written depend on the kind: a static cover is the fetched image
909    /// verbatim, while an animated cover is the clip's MP4 preview transcoded to
910    /// WebP through the ffmpeg port (see [`artifact_bytes`](Self::artifact_bytes)).
911    ///
912    /// A sidecar is only ever written for a clip whose audio is present: a
913    /// successful `Download`/`Reformat` creates the manifest entry earlier in
914    /// this run, and a prior-run clip already has one. So an absent owning entry
915    /// means the audio failed or never existed this run; we skip (no fetch, no
916    /// write) rather than strand an untracked sidecar with no owning audio.
917    ///
918    /// Folder art ([`FolderJpg`](ArtifactKind::FolderJpg),
919    /// [`FolderWebp`](ArtifactKind::FolderWebp), and
920    /// [`FolderMp4`](ArtifactKind::FolderMp4)) is album-scoped: its `owner_id`
921    /// is the album's stable root id, not a manifest clip, so it skips the
922    /// manifest presence guard and records its state on the album store instead.
923    ///
924    /// When a title or album change moves the audio, reconcile re-emits this
925    /// write at the NEW path; this handler then removes the sidecar left at the
926    /// artifact's previously tracked path, moving it rather than orphaning it.
927    /// The removal happens only after the new file is safely written, and a
928    /// remove failure returns before the state slot advances, so the next run
929    /// re-plans the identical write and retries — self-healing, never an orphan.
930    #[allow(clippy::too_many_arguments)]
931    async fn write_artifact(
932        &self,
933        manifest: &mut Manifest,
934        albums: &mut BTreeMap<String, AlbumArt>,
935        playlists: &mut BTreeMap<String, PlaylistState>,
936        kind: ArtifactKind,
937        path: &str,
938        source_url: &str,
939        hash: &str,
940        owner_id: &str,
941        content: Option<&str>,
942        tracked_paths: &mut HashMap<String, u32>,
943    ) -> Result<Effect, Fail> {
944        // A per-song sidecar needs its owning clip's manifest entry; album and
945        // playlist kinds are keyed elsewhere and skip this guard.
946        if is_per_clip_kind(kind) && manifest.get(owner_id).is_none() {
947            // The owning audio never landed this run, so this sidecar is skipped
948            // and will never drain a cover the producer cached for it. Drop that
949            // entry now: an insert without a matching sidecar write must not
950            // outlive its clip, keeping `cover_cache` bounded to the clips in
951            // flight (#89). A non-cover kind has no entry here, so this is a
952            // harmless no-op for them.
953            self.cover_cache
954                .lock()
955                .expect("cover cache mutex poisoned")
956                .remove(source_url);
957            return Ok(Effect::Skipped);
958        }
959        // Capture the path this artifact was last tracked at, BEFORE the slot is
960        // overwritten below, so a path-changing write (a title/album rename that
961        // moves the audio) can clean up the old sidecar it left behind. Cover
962        // kinds live on the manifest, folder kinds on the album store; playlists
963        // reconcile their own old-path delete and so opt out here.
964        let old_path = match kind {
965            ArtifactKind::CoverJpg => manifest
966                .get(owner_id)
967                .and_then(|e| e.cover_jpg.as_ref())
968                .map(|s| s.path.clone()),
969            ArtifactKind::CoverWebp => manifest
970                .get(owner_id)
971                .and_then(|e| e.cover_webp.as_ref())
972                .map(|s| s.path.clone()),
973            ArtifactKind::DetailsTxt => manifest
974                .get(owner_id)
975                .and_then(|e| e.details_txt.as_ref())
976                .map(|s| s.path.clone()),
977            ArtifactKind::LyricsTxt => manifest
978                .get(owner_id)
979                .and_then(|e| e.lyrics_txt.as_ref())
980                .map(|s| s.path.clone()),
981            ArtifactKind::Lrc => manifest
982                .get(owner_id)
983                .and_then(|e| e.lrc.as_ref())
984                .map(|s| s.path.clone()),
985            ArtifactKind::VideoMp4 => manifest
986                .get(owner_id)
987                .and_then(|e| e.video_mp4.as_ref())
988                .map(|s| s.path.clone()),
989            ArtifactKind::FolderJpg | ArtifactKind::FolderWebp | ArtifactKind::FolderMp4 => albums
990                .get(owner_id)
991                .and_then(|a| a.artifact(kind))
992                .map(|s| s.path.clone()),
993            ArtifactKind::Playlist => None,
994        };
995        // A generated artifact (a playlist) carries its body inline and never
996        // touches the network; a fetched one pulls (and transcodes) its source.
997        let bytes = match content {
998            Some(text) => text.as_bytes().to_vec(),
999            None => self.artifact_bytes(kind, source_url, owner_id).await?,
1000        };
1001        self.write_verify(owner_id, path, &bytes)?;
1002        // The new sidecar is safely in place; only now drop a stale copy left at
1003        // the previous path (the audio moved). `remove` is idempotent, so an
1004        // already-absent old file is fine. On a genuine remove failure we return
1005        // BEFORE updating the slot, leaving the manifest/album pointing at the
1006        // old path: the next run sees the same path drift, re-plans this write,
1007        // and retries the cleanup — convergent, no orphan persists.
1008        //
1009        // The removal is gated so it can never delete a live file (#76). This
1010        // slot is releasing `old`, so drop its reference in `tracked_paths`; the
1011        // file is removed only once nothing else holds it — no other tracked slot
1012        // still references it (count now zero) and no action writes it this run
1013        // (`write_targets`, the non-planned twin of `suppress_path_aliasing`).
1014        // On a path swap (A: x -> y while B: y -> x) `write_targets` keeps each
1015        // freshly written file; when two slots share a path after a prior failed
1016        // swap, the first to move keeps it and the last to leave reclaims it, so
1017        // a co-owned file is never deleted and a vacated one is never orphaned.
1018        if let Some(old) = old_path.as_deref()
1019            && !old.is_empty()
1020            && old != path
1021        {
1022            let still_referenced = tracked_paths
1023                .get_mut(old)
1024                .map(|count| {
1025                    *count = count.saturating_sub(1);
1026                    *count > 0
1027                })
1028                .unwrap_or(false);
1029            if !still_referenced && !self.write_targets.contains(old) {
1030                self.fs.remove(old).map_err(|err| {
1031                    permanent_fail(
1032                        owner_id,
1033                        format!("could not remove old sidecar {old}: {err}"),
1034                    )
1035                })?;
1036            }
1037        }
1038        if is_album_kind(kind) {
1039            albums.entry(owner_id.to_owned()).or_default().set(
1040                kind,
1041                Some(ArtifactState {
1042                    path: path.to_owned(),
1043                    hash: hash.to_owned(),
1044                }),
1045            );
1046        } else if is_playlist_kind(kind) {
1047            playlists.insert(
1048                owner_id.to_owned(),
1049                PlaylistState {
1050                    name: playlist_name_from_path(path),
1051                    path: path.to_owned(),
1052                    hash: hash.to_owned(),
1053                },
1054            );
1055        } else if let Some(entry) = manifest.entries.get_mut(owner_id) {
1056            set_manifest_artifact(
1057                entry,
1058                kind,
1059                Some(ArtifactState {
1060                    path: path.to_owned(),
1061                    hash: hash.to_owned(),
1062                }),
1063            );
1064        }
1065        Ok(Effect::ArtifactWritten)
1066    }
1067
1068    /// Produce a sidecar's bytes from its source, branching on kind.
1069    ///
1070    /// An animated cover — a per-clip [`CoverWebp`](ArtifactKind::CoverWebp) or an
1071    /// album [`FolderWebp`](ArtifactKind::FolderWebp) — fetches the clip's
1072    /// `video_cover` MP4 preview and transcodes it to an animated WebP through the
1073    /// ffmpeg port; every other kind is the fetched source verbatim (the static
1074    /// [`CoverJpg`](ArtifactKind::CoverJpg) / album [`FolderJpg`](ArtifactKind::FolderJpg)
1075    /// image, or the raw album [`FolderMp4`](ArtifactKind::FolderMp4) whose
1076    /// `video_cover_url` is kept untranscoded). A fetch or transcode failure
1077    /// is attributed to the owning clip and is a per-clip [`Fail`], except a
1078    /// disk-full transcode, which aborts the run like the audio FLAC path.
1079    async fn artifact_bytes(
1080        &self,
1081        kind: ArtifactKind,
1082        source_url: &str,
1083        owner_id: &str,
1084    ) -> Result<Vec<u8>, Fail> {
1085        // Reuse the cover the audio producer already fetched for the embedded tag
1086        // when it cached this exact URL (#89); otherwise fetch it now. The guard
1087        // is taken and dropped in its own statement so it never spans the await.
1088        let cached = self
1089            .cover_cache
1090            .lock()
1091            .expect("cover cache mutex poisoned")
1092            .remove(source_url);
1093        let source = match cached {
1094            Some(bytes) => bytes,
1095            None => {
1096                let fetched = self
1097                    .fetch_bytes(source_url)
1098                    .await
1099                    .map_err(|err| err.attribute(owner_id))?;
1100                // Cache the raw source when a sibling folder artifact will fetch
1101                // the same URL (the `both` retention: cover.webp + cover.mp4), so
1102                // it is fetched exactly once. Bounded to shared URLs and drained
1103                // on the sibling's use.
1104                if self.shared_cover_urls.contains(source_url) {
1105                    self.cover_cache
1106                        .lock()
1107                        .expect("cover cache mutex poisoned")
1108                        .insert(source_url.to_owned(), fetched.clone());
1109                }
1110                fetched
1111            }
1112        };
1113        match kind {
1114            ArtifactKind::CoverWebp | ArtifactKind::FolderWebp => self
1115                .ffmpeg
1116                .mp4_to_webp(&source, self.opts.cover_webp)
1117                .await
1118                .map_err(|err| {
1119                    if err.is_out_of_space() {
1120                        disk_fail(owner_id, "disk full: no space left to transcode")
1121                    } else {
1122                        permanent_fail(owner_id, format!("cover transcode failed: {err}"))
1123                    }
1124                }),
1125            // The text sidecars are generated and always carry inline content, so
1126            // `write_artifact` never reaches this fetch path for them. Guard it so
1127            // a future miswiring fails loudly rather than fetching a URL.
1128            ArtifactKind::DetailsTxt | ArtifactKind::LyricsTxt | ArtifactKind::Lrc => Err(
1129                permanent_fail(owner_id, "text sidecar requires inline content"),
1130            ),
1131            ArtifactKind::CoverJpg
1132            | ArtifactKind::FolderJpg
1133            | ArtifactKind::FolderMp4
1134            | ArtifactKind::Playlist
1135            | ArtifactKind::VideoMp4 => Ok(source),
1136        }
1137    }
1138
1139    /// Remove a sidecar file and clear its slot on the owning manifest entry.
1140    ///
1141    /// `remove` is idempotent, so an already-absent sidecar is not a failure.
1142    /// When the owning entry is already gone (its audio was deleted earlier this
1143    /// run, co-deleting the sidecar), there is no slot to clear and that is fine.
1144    ///
1145    /// Folder art is album-scoped: its slot is cleared on the album store keyed by
1146    /// the album's root id, not on a manifest clip.
1147    ///
1148    /// The audio `Delete` is applied before its sidecar `DeleteArtifact`. If the
1149    /// sidecar removal fails after the audio is already gone, the sidecar lingers
1150    /// untracked, but the design stays convergent rather than transactional: the
1151    /// next run re-plans the same removal and retries, and any directory it would
1152    /// have emptied is pruned once the file finally clears.
1153    fn delete_artifact(
1154        &self,
1155        manifest: &mut Manifest,
1156        albums: &mut BTreeMap<String, AlbumArt>,
1157        playlists: &mut BTreeMap<String, PlaylistState>,
1158        kind: ArtifactKind,
1159        path: &str,
1160        owner_id: &str,
1161    ) -> Result<Effect, Fail> {
1162        self.fs
1163            .remove(path)
1164            .map_err(|err| permanent_fail(owner_id, format!("artifact delete failed: {err}")))?;
1165        if is_album_kind(kind) {
1166            if let Some(art) = albums.get_mut(owner_id) {
1167                art.set(kind, None);
1168                if art.is_empty() {
1169                    albums.remove(owner_id);
1170                }
1171            }
1172        } else if is_playlist_kind(kind) {
1173            playlists.remove(owner_id);
1174        } else if let Some(entry) = manifest.entries.get_mut(owner_id) {
1175            set_manifest_artifact(entry, kind, None);
1176        }
1177        Ok(Effect::ArtifactDeleted)
1178    }
1179
1180    /// Fetch one stem's bytes, write them atomically, then record the stem on
1181    /// the owning clip's keyed stem map.
1182    ///
1183    /// Mirrors [`write_artifact`](Self::write_artifact) for the keyed-stem case,
1184    /// sharing the fetch resilience (`fetch_bytes` retries and verifies
1185    /// `Content-Length`) and the atomic size-verified write. A stem is only ever
1186    /// written for a clip whose audio is present, so an absent owning manifest
1187    /// entry means the audio failed or never existed this run; we skip rather
1188    /// than strand an untracked stem with no owning audio.
1189    ///
1190    /// Stems are stored RAW in their native container and are NEVER transcoded to
1191    /// FLAC, even when the song's own format is FLAC — they are the deliberate
1192    /// exception. A `Wav` stem is rendered through the free `convert_wav` flow
1193    /// (see [`fetch_stem_bytes`](Self::fetch_stem_bytes)); an `Mp3` stem is fetched
1194    /// straight from its public CDN url. Either way the bytes land verbatim at
1195    /// `path`, whose extension already matches the stem format.
1196    ///
1197    /// When a title/album change moves the song, reconcile re-emits this write at
1198    /// the NEW path; this handler then removes the stem left at the previously
1199    /// tracked path, moving it rather than orphaning it. The removal happens only
1200    /// after the new file is safely written and only when nothing else this run
1201    /// writes that path, and a remove failure returns before the slot advances so
1202    /// the next run re-plans the identical write and retries — self-healing.
1203    #[allow(clippy::too_many_arguments)]
1204    async fn write_stem(
1205        &self,
1206        client_lock: &ClientLock<'_, C>,
1207        manifest: &mut Manifest,
1208        clip_id: &str,
1209        key: &str,
1210        stem_id: &str,
1211        path: &str,
1212        source_url: &str,
1213        format: StemFormat,
1214        hash: &str,
1215    ) -> Result<Effect, Fail> {
1216        // A stem needs its owning clip's manifest entry (its audio must exist).
1217        if manifest.get(clip_id).is_none() {
1218            return Ok(Effect::Skipped);
1219        }
1220        let old_path = manifest
1221            .get(clip_id)
1222            .and_then(|e| e.stems.get(key))
1223            .map(|s| s.path.clone());
1224        let bytes = self
1225            .fetch_stem_bytes(client_lock, clip_id, stem_id, source_url, format)
1226            .await?;
1227        self.write_verify(clip_id, path, &bytes)?;
1228        // The new stem is in place; only now drop a stale copy left at the old
1229        // path (the song moved, or the stem format changed). `remove` is
1230        // idempotent. A path this run also writes is never removed (the
1231        // non-planned twin of `suppress_path_aliasing`). On a genuine remove
1232        // failure we return BEFORE updating the slot, so the next run re-plans the
1233        // same write and retries the cleanup — no orphan.
1234        if let Some(old) = old_path.as_deref()
1235            && !old.is_empty()
1236            && old != path
1237            && !self.write_targets.contains(old)
1238        {
1239            self.fs.remove(old).map_err(|err| {
1240                permanent_fail(clip_id, format!("could not remove old stem {old}: {err}"))
1241            })?;
1242        }
1243        if let Some(entry) = manifest.entries.get_mut(clip_id) {
1244            set_manifest_stem(
1245                entry,
1246                key,
1247                Some(ArtifactState {
1248                    path: path.to_owned(),
1249                    hash: hash.to_owned(),
1250                }),
1251            );
1252        }
1253        Ok(Effect::ArtifactWritten)
1254    }
1255
1256    /// Resolve a stem's RAW bytes in its native container, never transcoding.
1257    ///
1258    /// A `Wav` stem renders the stem clip's lossless WAV through the very same
1259    /// free `convert_wav` + poll flow the main FLAC/WAV audio uses
1260    /// ([`resolve_wav_url`](Self::resolve_wav_url)), keyed on the stem's own
1261    /// `stem_id`, then downloads that WAV. An `Mp3` stem (or a degenerate `Wav`
1262    /// stem with no id to render) downloads its public CDN url directly. Stems
1263    /// are the deliberate exception to the source format: the bytes are returned
1264    /// exactly as delivered and are never re-encoded to FLAC.
1265    async fn fetch_stem_bytes(
1266        &self,
1267        client_lock: &ClientLock<'_, C>,
1268        clip_id: &str,
1269        stem_id: &str,
1270        source_url: &str,
1271        format: StemFormat,
1272    ) -> Result<Vec<u8>, Fail> {
1273        let url = match format {
1274            StemFormat::Wav if !stem_id.is_empty() => {
1275                match self.resolve_wav_url(client_lock, stem_id).await? {
1276                    Some(url) => url,
1277                    None => return Err(transient_fail(clip_id, "stem WAV render was not ready")),
1278                }
1279            }
1280            // Mp3, or a Wav stem with no id to render, downloads the CDN mp3.
1281            _ => source_url.to_owned(),
1282        };
1283        self.fetch_bytes(&url)
1284            .await
1285            .map_err(|err| err.attribute(clip_id))
1286    }
1287
1288    /// Remove one stem file and clear its slot in the owning clip's stem map.
1289    ///
1290    /// `remove` is idempotent, so an already-absent stem is not a failure. When
1291    /// the owning entry is already gone (its audio was deleted earlier this run,
1292    /// co-deleting the stem), there is no slot to clear and that is fine; the
1293    /// emptied `.stems` folder is pruned by the end-of-run directory sweep.
1294    fn delete_stem(
1295        &self,
1296        manifest: &mut Manifest,
1297        clip_id: &str,
1298        key: &str,
1299        path: &str,
1300    ) -> Result<Effect, Fail> {
1301        self.fs
1302            .remove(path)
1303            .map_err(|err| permanent_fail(clip_id, format!("stem delete failed: {err}")))?;
1304        if let Some(entry) = manifest.entries.get_mut(clip_id) {
1305            set_manifest_stem(entry, key, None);
1306        }
1307        Ok(Effect::ArtifactDeleted)
1308    }
1309
1310    /// Download (and transcode/tag) the audio for `clip` in `format`.
1311    async fn produce_audio(
1312        &self,
1313        client_lock: &ClientLock<'_, C>,
1314        clip: &Clip,
1315        lineage: &LineageContext,
1316        format: AudioFormat,
1317    ) -> Result<Vec<u8>, Fail> {
1318        let (meta, synced) = self.track_meta(clip, lineage);
1319        match format {
1320            AudioFormat::Mp3 => {
1321                let url = clip.mp3_url();
1322                let audio = self
1323                    .fetch_bytes(&url)
1324                    .await
1325                    .map_err(|err| err.attribute(&clip.id))?;
1326                let cover = self.fetch_cover(clip).await;
1327                tag_mp3(&audio, &meta, cover.as_deref(), synced)
1328                    .map_err(|err| permanent_fail(&clip.id, err.to_string()))
1329            }
1330            AudioFormat::Flac => {
1331                let wav = self.fetch_wav(client_lock, clip).await?;
1332                let flac = self.ffmpeg.wav_to_flac(&wav).await.map_err(|err| {
1333                    if err.is_out_of_space() {
1334                        disk_fail(&clip.id, "disk full: no space left to transcode")
1335                    } else {
1336                        permanent_fail(&clip.id, format!("transcode failed: {err}"))
1337                    }
1338                })?;
1339                let cover = self.fetch_cover(clip).await;
1340                tag_flac(&flac, &meta, cover.as_deref())
1341                    .map_err(|err| permanent_fail(&clip.id, err.to_string()))
1342            }
1343            AudioFormat::Wav => self.fetch_wav(client_lock, clip).await,
1344        }
1345    }
1346
1347    /// This run's non-empty aligned lyrics for a clip, if any were fetched.
1348    fn synced_for(&self, clip_id: &str) -> Option<&AlignedLyrics> {
1349        self.synced
1350            .get(clip_id)
1351            .filter(|aligned| !aligned.is_empty())
1352    }
1353
1354    /// The track metadata for a clip, paired with its synced lyrics (if any).
1355    ///
1356    /// The feed omits per-clip lyrics, so when this run fetched aligned lyrics
1357    /// for the clip the plain text is folded into `lyrics` here, which the MP3
1358    /// `USLT` and FLAC `LYRICS` tags then carry. The returned [`AlignedLyrics`]
1359    /// is passed on to [`tag_mp3`] for the word-level `SYLT` frame.
1360    fn track_meta<'m>(
1361        &'m self,
1362        clip: &Clip,
1363        lineage: &LineageContext,
1364    ) -> (TrackMetadata, Option<&'m AlignedLyrics>) {
1365        let synced = self.synced_for(&clip.id);
1366        let mut meta = TrackMetadata::from_clip(clip, lineage);
1367        if let Some(aligned) = synced {
1368            meta.lyrics = aligned.plain_text();
1369        }
1370        (meta, synced)
1371    }
1372
1373    /// Resolve the rendered WAV URL and download it.
1374    async fn fetch_wav(
1375        &self,
1376        client_lock: &ClientLock<'_, C>,
1377        clip: &Clip,
1378    ) -> Result<Vec<u8>, Fail> {
1379        let url = match self.resolve_wav_url(client_lock, &clip.id).await? {
1380            Some(url) => url,
1381            None => return Err(transient_fail(&clip.id, "WAV render was not ready")),
1382        };
1383        self.fetch_bytes(&url)
1384            .await
1385            .map_err(|err| err.attribute(&clip.id))
1386    }
1387
1388    /// Read the WAV URL, requesting a render and polling if it is not ready.
1389    ///
1390    /// `None` means the render did not become ready within the poll budget; the
1391    /// caller treats that as a non-fatal transient failure, never a silent skip.
1392    ///
1393    /// Each client call briefly locks `client_lock`; the poll waits happen
1394    /// unlocked, so concurrent clips interleave their WAV renders rather than
1395    /// serialising behind one clip's whole poll budget.
1396    async fn resolve_wav_url(
1397        &self,
1398        client_lock: &ClientLock<'_, C>,
1399        id: &str,
1400    ) -> Result<Option<String>, Fail> {
1401        if let Some(url) = self.wav_url_retrying(client_lock, id).await? {
1402            return Ok(Some(url));
1403        }
1404        self.request_wav_retrying(client_lock, id).await?;
1405        for _ in 0..self.opts.wav_poll_attempts {
1406            self.clock.sleep(self.opts.wav_poll_interval).await;
1407            if let Some(url) = self.wav_url_retrying(client_lock, id).await? {
1408                return Ok(Some(url));
1409            }
1410        }
1411        Ok(None)
1412    }
1413
1414    /// Read the rendered WAV URL, retrying transient API failures with backoff
1415    /// (SYNC-16/17), so the default FLAC path is as resilient as the CDN path.
1416    async fn wav_url_retrying(
1417        &self,
1418        client_lock: &ClientLock<'_, C>,
1419        id: &str,
1420    ) -> Result<Option<String>, Fail> {
1421        let mut attempt: u32 = 0;
1422        loop {
1423            let result = {
1424                let mut client = client_lock.lock().await;
1425                client.wav_url(self.http, id).await
1426            };
1427            match result {
1428                Ok(url) => return Ok(url),
1429                Err(err) => match self.retry_core(id, err, &mut attempt).await {
1430                    Some(fail) => return Err(fail),
1431                    None => continue,
1432                },
1433            }
1434        }
1435    }
1436
1437    /// Ask Suno to render a WAV, retrying transient API failures with backoff.
1438    async fn request_wav_retrying(
1439        &self,
1440        client_lock: &ClientLock<'_, C>,
1441        id: &str,
1442    ) -> Result<(), Fail> {
1443        let mut attempt: u32 = 0;
1444        loop {
1445            let result = {
1446                let mut client = client_lock.lock().await;
1447                client.request_wav(self.http, id).await
1448            };
1449            match result {
1450                Ok(()) => return Ok(()),
1451                Err(err) => match self.retry_core(id, err, &mut attempt).await {
1452                    Some(fail) => return Err(fail),
1453                    None => continue,
1454                },
1455            }
1456        }
1457    }
1458
1459    /// Classify a core error from the authenticated WAV flow. On a transient
1460    /// class within budget, back off through the [`Clock`] and return `None` to
1461    /// retry; otherwise return the terminal [`Fail`].
1462    async fn retry_core(&self, id: &str, err: Error, attempt: &mut u32) -> Option<Fail> {
1463        let fail = classify_core(id, err);
1464        if matches!(fail.class, Class::Transient) && *attempt < self.opts.max_retries {
1465            self.clock.sleep(backoff_delay(*attempt, None)).await;
1466            *attempt += 1;
1467            None
1468        } else {
1469            Some(fail)
1470        }
1471    }
1472
1473    /// GET `url`, retrying transient failures with backoff, verifying size.
1474    async fn fetch_bytes(&self, url: &str) -> Result<Vec<u8>, FetchError> {
1475        let mut attempt: u32 = 0;
1476        loop {
1477            let result = self.http.send(HttpRequest::get(url)).await;
1478            match classify_response(result) {
1479                Ok(body) => return Ok(body),
1480                Err(err) => {
1481                    if matches!(err.class, Class::Transient) && attempt < self.opts.max_retries {
1482                        let delay = backoff_delay(attempt, err.retry_after);
1483                        self.clock.sleep(delay).await;
1484                        attempt += 1;
1485                        continue;
1486                    }
1487                    return Err(err);
1488                }
1489            }
1490        }
1491    }
1492
1493    /// Download cover art, trying each candidate URL in order; `None` is fine.
1494    async fn fetch_cover(&self, clip: &Clip) -> Option<Vec<u8>> {
1495        for url in clip.cover_candidates() {
1496            if let Ok(response) = self.http.send(HttpRequest::get(url)).await
1497                && (200..=299).contains(&response.status)
1498                && !response.body.is_empty()
1499            {
1500                // A `CoverJpg` sidecar will fetch this exact URL this run; keep the
1501                // bytes so its write reuses them instead of fetching again (#89).
1502                // The lock guards only the insert, never the await above.
1503                if self.cover_wanted.contains(url) {
1504                    self.cover_cache
1505                        .lock()
1506                        .expect("cover cache mutex poisoned")
1507                        .insert(url.to_owned(), response.body.clone());
1508                }
1509                return Some(response.body);
1510            }
1511        }
1512        None
1513    }
1514
1515    /// Write `bytes` atomically, then confirm the on-disk size (SYNC-13/14).
1516    fn write_verify(&self, clip_id: &str, path: &str, bytes: &[u8]) -> Result<u64, Fail> {
1517        self.fs.write_atomic(path, bytes).map_err(|err| {
1518            if err.is_out_of_space() {
1519                disk_fail(clip_id, format!("disk full: no space left to write {path}"))
1520            } else {
1521                permanent_fail(clip_id, format!("write failed: {err}"))
1522            }
1523        })?;
1524        match self.fs.metadata(path) {
1525            Some(stat) if stat.size == bytes.len() as u64 => Ok(stat.size),
1526            Some(stat) => Err(permanent_fail(
1527                clip_id,
1528                format!("wrote {} bytes, expected {}", stat.size, bytes.len()),
1529            )),
1530            None => Ok(bytes.len() as u64),
1531        }
1532    }
1533
1534    /// Build the manifest entry for a freshly written file.
1535    fn entry(&self, clip_id: &str, path: &str, format: AudioFormat, size: u64) -> ManifestEntry {
1536        match self.by_id.get(clip_id) {
1537            Some(d) => manifest_entry(d, size),
1538            None => ManifestEntry {
1539                path: path.to_owned(),
1540                format,
1541                size,
1542                ..ManifestEntry::default()
1543            },
1544        }
1545    }
1546
1547    /// Refresh an existing entry's hashes, protection, and (optionally) size.
1548    fn refresh_hashes(&self, manifest: &mut Manifest, clip_id: &str, size: Option<u64>) {
1549        let desired = self.by_id.get(clip_id).copied();
1550        if let Some(entry) = manifest.entries.get_mut(clip_id) {
1551            if let Some(d) = desired {
1552                entry.meta_hash = d.meta_hash.clone();
1553                entry.art_hash = d.art_hash.clone();
1554                entry.preserve = preserve_for(d);
1555            }
1556            if let Some(size) = size {
1557                entry.size = size;
1558            }
1559        }
1560    }
1561
1562    /// Refresh only an entry's preserve marker from the current desired state.
1563    ///
1564    /// A clip can gain or lose copy/private protection with no file change, which
1565    /// reconcile emits as a [`Skip`](Action::Skip). Refreshing here keeps the
1566    /// persisted marker a faithful image of live protection, so the cross-run
1567    /// delete guard (SYNC-8) never reads it stale.
1568    fn refresh_preserve(&self, manifest: &mut Manifest, clip_id: &str) {
1569        if let Some(d) = self.by_id.get(clip_id).copied()
1570            && let Some(entry) = manifest.entries.get_mut(clip_id)
1571        {
1572            entry.preserve = preserve_for(d);
1573        }
1574    }
1575}
1576
1577/// Build a manifest entry from the desired record (SYNC-8 preserve rule).
1578fn manifest_entry(d: &Desired, size: u64) -> ManifestEntry {
1579    ManifestEntry {
1580        path: d.path.clone(),
1581        format: d.format,
1582        meta_hash: d.meta_hash.clone(),
1583        art_hash: d.art_hash.clone(),
1584        size,
1585        preserve: preserve_for(d),
1586        ..Default::default()
1587    }
1588}
1589
1590/// Whether a written entry must be preserved across runs: held by any copy
1591/// source, or private. The reconcile delete guard reads this marker later.
1592fn preserve_for(d: &Desired) -> bool {
1593    d.private || d.modes.contains(&SourceMode::Copy)
1594}
1595
1596/// Classify one HTTP result into bytes or a [`FetchError`] (SYNC-14/17).
1597fn classify_response(
1598    result: Result<crate::http::HttpResponse, crate::http::TransportError>,
1599) -> Result<Vec<u8>, FetchError> {
1600    let response = match result {
1601        Ok(response) => response,
1602        Err(err) => {
1603            return Err(FetchError::transient(
1604                format!("transport error: {err}"),
1605                None,
1606            ));
1607        }
1608    };
1609    match response.status {
1610        200..=299 => {
1611            if let Some(expected) = content_length(&response) {
1612                let actual = response.body.len() as u64;
1613                if actual != expected {
1614                    return Err(FetchError::transient(
1615                        format!("truncated download: {actual} of {expected} bytes"),
1616                        None,
1617                    ));
1618                }
1619            }
1620            Ok(response.body)
1621        }
1622        401 | 403 => Err(FetchError::transient(
1623            format!("download rejected: status {}", response.status),
1624            None,
1625        )),
1626        408 => Err(FetchError::transient("request timed out", None)),
1627        429 => Err(FetchError::transient(
1628            "rate limited",
1629            retry_after(&response),
1630        )),
1631        500..=599 => Err(FetchError::transient(
1632            format!("server error {}", response.status),
1633            None,
1634        )),
1635        status => Err(FetchError::permanent(format!(
1636            "download failed: status {status}"
1637        ))),
1638    }
1639}
1640
1641/// Map a core [`Error`] from the authenticated WAV flow to a [`Fail`].
1642fn classify_core(id: &str, err: Error) -> Fail {
1643    let reason = err.to_string();
1644    match err {
1645        Error::Auth(_) => auth_fail(id, reason),
1646        Error::RateLimited { .. } | Error::Connection(_) => transient_fail(id, reason),
1647        Error::Api(_)
1648        | Error::NotFound(_)
1649        | Error::Tag(_)
1650        | Error::Config(_)
1651        | Error::Refused(_) => permanent_fail(id, reason),
1652    }
1653}
1654
1655/// The provider-reported body size from `Content-Length`, if present and valid.
1656fn content_length(response: &crate::http::HttpResponse) -> Option<u64> {
1657    response.header("content-length")?.trim().parse().ok()
1658}
1659
1660#[cfg(test)]
1661mod tests {
1662    use super::*;
1663    use crate::ClerkAuth;
1664    use crate::http::HttpResponse;
1665    use crate::testutil::{MemFs, RecordingClock, Reply, ScriptedHttp, StubFfmpeg};
1666
1667    fn clip(id: &str) -> Clip {
1668        Clip {
1669            id: id.to_owned(),
1670            title: "Song".to_owned(),
1671            audio_url: format!("https://cdn1.suno.ai/{id}.mp3"),
1672            ..Default::default()
1673        }
1674    }
1675
1676    fn art_clip(id: &str) -> Clip {
1677        Clip {
1678            image_large_url: format!("https://art.suno.ai/{id}/large.jpg"),
1679            image_url: format!("https://art.suno.ai/{id}/small.jpg"),
1680            ..clip(id)
1681        }
1682    }
1683
1684    fn ext(format: AudioFormat) -> &'static str {
1685        match format {
1686            AudioFormat::Mp3 => "mp3",
1687            AudioFormat::Flac => "flac",
1688            AudioFormat::Wav => "wav",
1689        }
1690    }
1691
1692    fn desired(clip: Clip, format: AudioFormat) -> Desired {
1693        Desired {
1694            path: format!("{}.{}", clip.id, ext(format)),
1695            lineage: LineageContext::own_root(&clip),
1696            clip,
1697            format,
1698            meta_hash: "m".to_owned(),
1699            art_hash: "art".to_owned(),
1700            modes: vec![SourceMode::Mirror],
1701            trashed: false,
1702            private: false,
1703            artifacts: Vec::new(),
1704            stems: None,
1705        }
1706    }
1707
1708    fn entry(path: &str, format: AudioFormat) -> ManifestEntry {
1709        ManifestEntry {
1710            path: path.to_owned(),
1711            format,
1712            meta_hash: "old".to_owned(),
1713            art_hash: "old-art".to_owned(),
1714            size: 8,
1715            preserve: false,
1716            ..Default::default()
1717        }
1718    }
1719
1720    #[allow(clippy::too_many_arguments)]
1721    fn run<G: Ffmpeg>(
1722        plan: &Plan,
1723        manifest: &mut Manifest,
1724        desired: &[Desired],
1725        http: &ScriptedHttp,
1726        fs: &MemFs,
1727        ffmpeg: &G,
1728        clock: &RecordingClock,
1729        opts: &ExecOptions,
1730    ) -> ExecOutcome {
1731        let mut albums = BTreeMap::new();
1732        run_with_albums(
1733            plan,
1734            manifest,
1735            &mut albums,
1736            desired,
1737            http,
1738            fs,
1739            ffmpeg,
1740            clock,
1741            opts,
1742        )
1743    }
1744
1745    #[allow(clippy::too_many_arguments)]
1746    fn run_with_albums<G: Ffmpeg>(
1747        plan: &Plan,
1748        manifest: &mut Manifest,
1749        albums: &mut BTreeMap<String, AlbumArt>,
1750        desired: &[Desired],
1751        http: &ScriptedHttp,
1752        fs: &MemFs,
1753        ffmpeg: &G,
1754        clock: &RecordingClock,
1755        opts: &ExecOptions,
1756    ) -> ExecOutcome {
1757        let mut playlists = BTreeMap::new();
1758        run_full(
1759            plan,
1760            manifest,
1761            albums,
1762            &mut playlists,
1763            desired,
1764            http,
1765            fs,
1766            ffmpeg,
1767            clock,
1768            opts,
1769        )
1770    }
1771
1772    #[allow(clippy::too_many_arguments)]
1773    fn run_full<G: Ffmpeg>(
1774        plan: &Plan,
1775        manifest: &mut Manifest,
1776        albums: &mut BTreeMap<String, AlbumArt>,
1777        playlists: &mut BTreeMap<String, PlaylistState>,
1778        desired: &[Desired],
1779        http: &ScriptedHttp,
1780        fs: &MemFs,
1781        ffmpeg: &G,
1782        clock: &RecordingClock,
1783        opts: &ExecOptions,
1784    ) -> ExecOutcome {
1785        let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
1786        let synced = HashMap::new();
1787        pollster::block_on(execute(
1788            plan,
1789            manifest,
1790            albums,
1791            playlists,
1792            desired,
1793            &synced,
1794            Ports {
1795                client: &mut client,
1796                http,
1797                fs,
1798                ffmpeg,
1799                clock,
1800            },
1801            opts,
1802        ))
1803    }
1804
1805    fn small_poll() -> ExecOptions {
1806        ExecOptions {
1807            max_retries: 3,
1808            wav_poll_attempts: 2,
1809            wav_poll_interval: Duration::from_secs(5),
1810            concurrency: 4,
1811            cover_webp: WebpEncodeSettings::default(),
1812        }
1813    }
1814
1815    // ── Download: MP3 ───────────────────────────────────────────────
1816
1817    #[test]
1818    fn download_mp3_writes_tagged_file_and_records_manifest() {
1819        let c = art_clip("a");
1820        let d = desired(c.clone(), AudioFormat::Mp3);
1821        let plan = Plan {
1822            actions: vec![Action::Download {
1823                clip: c.clone(),
1824                lineage: LineageContext::own_root(&c),
1825                path: d.path.clone(),
1826                format: AudioFormat::Mp3,
1827            }],
1828        };
1829        let http = ScriptedHttp::new()
1830            .route("a.mp3", Reply::ok(b"mp3-body".to_vec()))
1831            .route("a/large.jpg", Reply::ok(b"art-bytes".to_vec()));
1832        let fs = MemFs::new();
1833        let ffmpeg = StubFfmpeg::flac();
1834        let clock = RecordingClock::new();
1835        let mut manifest = Manifest::new();
1836
1837        let outcome = run(
1838            &plan,
1839            &mut manifest,
1840            &[d],
1841            &http,
1842            &fs,
1843            &ffmpeg,
1844            &clock,
1845            &ExecOptions::default(),
1846        );
1847
1848        assert_eq!(outcome.downloaded, 1);
1849        assert_eq!(outcome.failed(), 0);
1850        assert_eq!(outcome.status, RunStatus::Completed);
1851        let written = fs.read_file("a.mp3").unwrap();
1852        assert_eq!(&written[..3], b"ID3");
1853        assert!(written.ends_with(b"mp3-body"));
1854        let entry = manifest.get("a").unwrap();
1855        assert_eq!(entry.path, "a.mp3");
1856        assert_eq!(entry.format, AudioFormat::Mp3);
1857        assert_eq!(entry.meta_hash, "m");
1858        assert_eq!(entry.art_hash, "art");
1859        assert_eq!(entry.size, written.len() as u64);
1860        assert!(!entry.preserve);
1861    }
1862
1863    #[test]
1864    fn download_mp3_embeds_sylt_and_lyrics_from_synced_map() {
1865        // A clip whose alignment was fetched this run gets a word-level SYLT frame
1866        // and its plain lyric text embedded (USLT), end to end through execute.
1867        let c = art_clip("a");
1868        let d = desired(c.clone(), AudioFormat::Mp3);
1869        let plan = Plan {
1870            actions: vec![Action::Download {
1871                clip: c.clone(),
1872                lineage: LineageContext::own_root(&c),
1873                path: d.path.clone(),
1874                format: AudioFormat::Mp3,
1875            }],
1876        };
1877        let http = ScriptedHttp::new()
1878            .route("a.mp3", Reply::ok(b"mp3-body".to_vec()))
1879            .route("a/large.jpg", Reply::ok(b"art-bytes".to_vec()));
1880        let fs = MemFs::new();
1881        let ffmpeg = StubFfmpeg::flac();
1882        let clock = RecordingClock::new();
1883        let mut manifest = Manifest::new();
1884        let mut albums = BTreeMap::new();
1885        let mut playlists = BTreeMap::new();
1886        let mut synced = HashMap::new();
1887        synced.insert(
1888            "a".to_string(),
1889            AlignedLyrics::from_json(&serde_json::json!({
1890                "aligned_words": [],
1891                "aligned_lyrics": [
1892                    {"text": "hi there", "start_s": 0.5, "end_s": 1.2, "section": "Verse 1",
1893                     "words": [
1894                         {"text": "hi", "start_s": 0.5, "end_s": 0.8},
1895                         {"text": "there", "start_s": 0.9, "end_s": 1.2}
1896                     ]}
1897                ]
1898            })),
1899        );
1900        let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
1901        let outcome = pollster::block_on(execute(
1902            &plan,
1903            &mut manifest,
1904            &mut albums,
1905            &mut playlists,
1906            &[d],
1907            &synced,
1908            Ports {
1909                client: &mut client,
1910                http: &http,
1911                fs: &fs,
1912                ffmpeg: &ffmpeg,
1913                clock: &clock,
1914            },
1915            &ExecOptions::default(),
1916        ));
1917
1918        assert_eq!(outcome.downloaded, 1);
1919        let written = fs.read_file("a.mp3").unwrap();
1920        let tag = id3::Tag::read_from2(std::io::Cursor::new(written)).unwrap();
1921        assert_eq!(
1922            tag.synchronised_lyrics().count(),
1923            1,
1924            "a SYLT frame is embedded"
1925        );
1926        // The plain lyric text is populated from the alignment for the USLT frame.
1927        assert_eq!(
1928            tag.lyrics().next().map(|frame| frame.text.as_str()),
1929            Some("hi there")
1930        );
1931    }
1932
1933    #[test]
1934    fn download_mp3_embeds_no_sylt_when_synced_map_empty() {
1935        // The synced map is empty when the feature is off (no alignment fetched),
1936        // so no SYLT frame and no lyric text are embedded.
1937        let c = art_clip("a");
1938        let d = desired(c.clone(), AudioFormat::Mp3);
1939        let plan = Plan {
1940            actions: vec![Action::Download {
1941                clip: c.clone(),
1942                lineage: LineageContext::own_root(&c),
1943                path: d.path.clone(),
1944                format: AudioFormat::Mp3,
1945            }],
1946        };
1947        let http = ScriptedHttp::new()
1948            .route("a.mp3", Reply::ok(b"mp3-body".to_vec()))
1949            .route("a/large.jpg", Reply::ok(b"art-bytes".to_vec()));
1950        let fs = MemFs::new();
1951        let ffmpeg = StubFfmpeg::flac();
1952        let clock = RecordingClock::new();
1953        let mut manifest = Manifest::new();
1954        let mut albums = BTreeMap::new();
1955        let mut playlists = BTreeMap::new();
1956        let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
1957        let outcome = pollster::block_on(execute(
1958            &plan,
1959            &mut manifest,
1960            &mut albums,
1961            &mut playlists,
1962            &[d],
1963            &HashMap::new(),
1964            Ports {
1965                client: &mut client,
1966                http: &http,
1967                fs: &fs,
1968                ffmpeg: &ffmpeg,
1969                clock: &clock,
1970            },
1971            &ExecOptions::default(),
1972        ));
1973        assert_eq!(outcome.downloaded, 1);
1974        let written = fs.read_file("a.mp3").unwrap();
1975        let tag = id3::Tag::read_from2(std::io::Cursor::new(written)).unwrap();
1976        assert_eq!(tag.synchronised_lyrics().count(), 0);
1977        assert_eq!(tag.lyrics().count(), 0);
1978    }
1979
1980    #[test]
1981    fn download_mp3_uses_cdn_fallback_when_audio_url_empty() {
1982        let mut c = clip("a");
1983        c.audio_url = String::new();
1984        let d = desired(c.clone(), AudioFormat::Mp3);
1985        let plan = Plan {
1986            actions: vec![Action::Download {
1987                clip: c.clone(),
1988                lineage: LineageContext::own_root(&c),
1989                path: d.path.clone(),
1990                format: AudioFormat::Mp3,
1991            }],
1992        };
1993        let http = ScriptedHttp::new().route("cdn1.suno.ai/a.mp3", Reply::ok(b"body".to_vec()));
1994        let fs = MemFs::new();
1995        let mut manifest = Manifest::new();
1996        let outcome = run(
1997            &plan,
1998            &mut manifest,
1999            &[d],
2000            &http,
2001            &fs,
2002            &StubFfmpeg::flac(),
2003            &RecordingClock::new(),
2004            &ExecOptions::default(),
2005        );
2006        assert_eq!(outcome.downloaded, 1);
2007        assert_eq!(http.count("cdn1.suno.ai/a.mp3"), 1);
2008    }
2009
2010    // ── Download: FLAC render + transcode ───────────────────────────
2011
2012    #[test]
2013    fn download_flac_renders_transcodes_and_records() {
2014        let c = clip("b");
2015        let d = desired(c.clone(), AudioFormat::Flac);
2016        let plan = Plan {
2017            actions: vec![Action::Download {
2018                clip: c.clone(),
2019                lineage: LineageContext::own_root(&c),
2020                path: d.path.clone(),
2021                format: AudioFormat::Flac,
2022            }],
2023        };
2024        let http = ScriptedHttp::new()
2025            .with_auth()
2026            .route(
2027                "/wav_file/",
2028                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/b.wav"}"#),
2029            )
2030            .route("b.wav", Reply::ok(b"wav-bytes".to_vec()));
2031        let fs = MemFs::new();
2032        let clock = RecordingClock::new();
2033        let mut manifest = Manifest::new();
2034
2035        let outcome = run(
2036            &plan,
2037            &mut manifest,
2038            &[d],
2039            &http,
2040            &fs,
2041            &StubFfmpeg::flac(),
2042            &clock,
2043            &ExecOptions::default(),
2044        );
2045
2046        assert_eq!(outcome.downloaded, 1);
2047        assert_eq!(outcome.failed(), 0);
2048        let written = fs.read_file("b.flac").unwrap();
2049        assert_eq!(&written[..4], b"fLaC");
2050        assert_eq!(manifest.get("b").unwrap().format, AudioFormat::Flac);
2051        // The URL was ready immediately, so no render request and no polling.
2052        assert_eq!(http.count("/convert_wav/"), 0);
2053        assert!(clock.sleeps().is_empty());
2054    }
2055
2056    #[test]
2057    fn download_flac_requests_render_then_polls_until_ready() {
2058        let c = clip("c");
2059        let d = desired(c.clone(), AudioFormat::Flac);
2060        let plan = Plan {
2061            actions: vec![Action::Download {
2062                clip: c.clone(),
2063                lineage: LineageContext::own_root(&c),
2064                path: d.path.clone(),
2065                format: AudioFormat::Flac,
2066            }],
2067        };
2068        let http = ScriptedHttp::new()
2069            .with_auth()
2070            .route_seq(
2071                "/wav_file/",
2072                vec![
2073                    Reply::json("{}"),
2074                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/c.wav"}"#),
2075                ],
2076            )
2077            .route("/convert_wav/", Reply::status(200))
2078            .route("c.wav", Reply::ok(b"wav".to_vec()));
2079        let clock = RecordingClock::new();
2080        let mut manifest = Manifest::new();
2081
2082        let outcome = run(
2083            &plan,
2084            &mut manifest,
2085            &[d],
2086            &http,
2087            &fs_new(),
2088            &StubFfmpeg::flac(),
2089            &clock,
2090            &small_poll(),
2091        );
2092
2093        assert_eq!(outcome.downloaded, 1);
2094        assert_eq!(http.count("/convert_wav/"), 1);
2095        assert_eq!(clock.sleeps(), vec![Duration::from_secs(5)]);
2096    }
2097
2098    #[test]
2099    fn download_flac_unavailable_render_is_a_nonfatal_failure() {
2100        let c = clip("d");
2101        let d = desired(c.clone(), AudioFormat::Flac);
2102        let plan = Plan {
2103            actions: vec![Action::Download {
2104                clip: c.clone(),
2105                lineage: LineageContext::own_root(&c),
2106                path: d.path.clone(),
2107                format: AudioFormat::Flac,
2108            }],
2109        };
2110        let http = ScriptedHttp::new()
2111            .with_auth()
2112            .route("/wav_file/", Reply::json("{}"))
2113            .route("/convert_wav/", Reply::status(200));
2114        let fs = MemFs::new();
2115        let clock = RecordingClock::new();
2116        let mut manifest = Manifest::new();
2117
2118        let outcome = run(
2119            &plan,
2120            &mut manifest,
2121            &[d],
2122            &http,
2123            &fs,
2124            &StubFfmpeg::flac(),
2125            &clock,
2126            &small_poll(),
2127        );
2128
2129        assert_eq!(outcome.downloaded, 0);
2130        assert_eq!(outcome.failed(), 1);
2131        assert_eq!(outcome.failures[0].clip_id, "d");
2132        assert_eq!(outcome.status, RunStatus::Completed);
2133        assert!(!fs.exists("d.flac"));
2134        assert_eq!(clock.sleeps().len(), 2);
2135    }
2136
2137    #[test]
2138    fn flac_transcode_failure_is_recorded_and_skipped() {
2139        let c = clip("t");
2140        let d = desired(c.clone(), AudioFormat::Flac);
2141        let plan = Plan {
2142            actions: vec![Action::Download {
2143                clip: c.clone(),
2144                lineage: LineageContext::own_root(&c),
2145                path: d.path.clone(),
2146                format: AudioFormat::Flac,
2147            }],
2148        };
2149        let http = ScriptedHttp::new()
2150            .with_auth()
2151            .route(
2152                "/wav_file/",
2153                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/t.wav"}"#),
2154            )
2155            .route("t.wav", Reply::ok(b"wav".to_vec()));
2156        let fs = MemFs::new();
2157        let mut manifest = Manifest::new();
2158
2159        let outcome = run(
2160            &plan,
2161            &mut manifest,
2162            &[d],
2163            &http,
2164            &fs,
2165            &StubFfmpeg::failing(),
2166            &RecordingClock::new(),
2167            &ExecOptions::default(),
2168        );
2169
2170        assert_eq!(outcome.downloaded, 0);
2171        assert_eq!(outcome.failed(), 1);
2172        assert!(!fs.exists("t.flac"));
2173        assert!(manifest.get("t").is_none());
2174    }
2175
2176    // ── Cover fallback ──────────────────────────────────────────────
2177
2178    #[test]
2179    fn cover_falls_back_when_large_image_is_missing() {
2180        let c = art_clip("e");
2181        let d = desired(c.clone(), AudioFormat::Mp3);
2182        let plan = Plan {
2183            actions: vec![Action::Download {
2184                clip: c.clone(),
2185                lineage: LineageContext::own_root(&c),
2186                path: d.path.clone(),
2187                format: AudioFormat::Mp3,
2188            }],
2189        };
2190        let http = ScriptedHttp::new()
2191            .route("e.mp3", Reply::ok(b"body".to_vec()))
2192            .route("e/large.jpg", Reply::status(404))
2193            .route("e/small.jpg", Reply::ok(b"the-art".to_vec()));
2194        let fs = MemFs::new();
2195        let mut manifest = Manifest::new();
2196
2197        let outcome = run(
2198            &plan,
2199            &mut manifest,
2200            &[d],
2201            &http,
2202            &fs,
2203            &StubFfmpeg::flac(),
2204            &RecordingClock::new(),
2205            &ExecOptions::default(),
2206        );
2207
2208        assert_eq!(outcome.downloaded, 1);
2209        let calls = http.calls();
2210        let large = calls
2211            .iter()
2212            .position(|u| u.contains("e/large.jpg"))
2213            .unwrap();
2214        let small = calls
2215            .iter()
2216            .position(|u| u.contains("e/small.jpg"))
2217            .unwrap();
2218        assert!(large < small, "large art tried before small");
2219    }
2220
2221    // ── Cover reuse: embed + sidecar share one fetch (#89) ──────────
2222
2223    #[test]
2224    fn download_reuses_the_embedded_cover_for_the_jpg_sidecar() {
2225        // The embedded tag and the `.jpg` sidecar want the same cover URL; it is
2226        // fetched once and the bytes serve both.
2227        let c = art_clip("a");
2228        let d = desired(c.clone(), AudioFormat::Mp3);
2229        let plan = Plan {
2230            actions: vec![
2231                Action::Download {
2232                    clip: c.clone(),
2233                    lineage: LineageContext::own_root(&c),
2234                    path: d.path.clone(),
2235                    format: AudioFormat::Mp3,
2236                },
2237                Action::WriteArtifact {
2238                    kind: ArtifactKind::CoverJpg,
2239                    path: "a/cover.jpg".to_owned(),
2240                    source_url: c.selected_image_url().unwrap().to_owned(),
2241                    hash: "art".to_owned(),
2242                    owner_id: "a".to_owned(),
2243                    content: None,
2244                },
2245            ],
2246        };
2247        let http = ScriptedHttp::new()
2248            .route("a.mp3", Reply::ok(b"mp3-body".to_vec()))
2249            .route("a/large.jpg", Reply::ok(b"the-art".to_vec()));
2250        let fs = MemFs::new();
2251        let mut manifest = Manifest::new();
2252
2253        let outcome = run(
2254            &plan,
2255            &mut manifest,
2256            &[d],
2257            &http,
2258            &fs,
2259            &StubFfmpeg::flac(),
2260            &RecordingClock::new(),
2261            &ExecOptions::default(),
2262        );
2263
2264        assert_eq!(outcome.downloaded, 1);
2265        assert_eq!(outcome.artifacts_written, 1);
2266        assert_eq!(outcome.failed(), 0);
2267        // Fetched once, not twice.
2268        assert_eq!(http.count("a/large.jpg"), 1);
2269        // The sidecar carries the fetched bytes, and the audio was tagged.
2270        assert_eq!(fs.read_file("a/cover.jpg").unwrap(), b"the-art");
2271        assert_eq!(&fs.read_file("a.mp3").unwrap()[..3], b"ID3");
2272    }
2273
2274    #[test]
2275    fn concurrent_downloads_reuse_each_clips_own_cover() {
2276        // Two clips render concurrently; each `.jpg` sidecar gets its own cover
2277        // (no cross-contamination) and each cover URL is fetched exactly once.
2278        let a = art_clip("a");
2279        let b = art_clip("b");
2280        let da = desired(a.clone(), AudioFormat::Mp3);
2281        let db = desired(b.clone(), AudioFormat::Mp3);
2282        let plan = Plan {
2283            actions: vec![
2284                Action::Download {
2285                    clip: a.clone(),
2286                    lineage: LineageContext::own_root(&a),
2287                    path: da.path.clone(),
2288                    format: AudioFormat::Mp3,
2289                },
2290                Action::WriteArtifact {
2291                    kind: ArtifactKind::CoverJpg,
2292                    path: "a/cover.jpg".to_owned(),
2293                    source_url: a.selected_image_url().unwrap().to_owned(),
2294                    hash: "art".to_owned(),
2295                    owner_id: "a".to_owned(),
2296                    content: None,
2297                },
2298                Action::Download {
2299                    clip: b.clone(),
2300                    lineage: LineageContext::own_root(&b),
2301                    path: db.path.clone(),
2302                    format: AudioFormat::Mp3,
2303                },
2304                Action::WriteArtifact {
2305                    kind: ArtifactKind::CoverJpg,
2306                    path: "b/cover.jpg".to_owned(),
2307                    source_url: b.selected_image_url().unwrap().to_owned(),
2308                    hash: "art".to_owned(),
2309                    owner_id: "b".to_owned(),
2310                    content: None,
2311                },
2312            ],
2313        };
2314        let http = ScriptedHttp::new()
2315            .route("a.mp3", Reply::ok(b"a-mp3".to_vec()))
2316            .route("b.mp3", Reply::ok(b"b-mp3".to_vec()))
2317            .route("a/large.jpg", Reply::ok(b"art-a".to_vec()))
2318            .route("b/large.jpg", Reply::ok(b"art-b".to_vec()));
2319        let fs = MemFs::new();
2320        let mut manifest = Manifest::new();
2321
2322        let outcome = run(
2323            &plan,
2324            &mut manifest,
2325            &[da, db],
2326            &http,
2327            &fs,
2328            &StubFfmpeg::flac(),
2329            &RecordingClock::new(),
2330            &small_poll(),
2331        );
2332
2333        assert_eq!(outcome.downloaded, 2);
2334        assert_eq!(outcome.artifacts_written, 2);
2335        assert_eq!(http.count("a/large.jpg"), 1);
2336        assert_eq!(http.count("b/large.jpg"), 1);
2337        assert_eq!(fs.read_file("a/cover.jpg").unwrap(), b"art-a");
2338        assert_eq!(fs.read_file("b/cover.jpg").unwrap(), b"art-b");
2339    }
2340
2341    #[test]
2342    fn cover_sidecar_refetches_when_embed_fell_back_to_another_url() {
2343        // The large image 404s so the embed falls back to the small image; the
2344        // sidecar still wants the (dead) large URL and must NOT be handed the
2345        // small bytes. Reuse is keyed on the exact URL, so nothing is cached and
2346        // the sidecar fetches the large URL itself (then fails on the 404).
2347        let c = art_clip("e");
2348        let d = desired(c.clone(), AudioFormat::Mp3);
2349        let plan = Plan {
2350            actions: vec![
2351                Action::Download {
2352                    clip: c.clone(),
2353                    lineage: LineageContext::own_root(&c),
2354                    path: d.path.clone(),
2355                    format: AudioFormat::Mp3,
2356                },
2357                Action::WriteArtifact {
2358                    kind: ArtifactKind::CoverJpg,
2359                    path: "e/cover.jpg".to_owned(),
2360                    source_url: "https://art.suno.ai/e/large.jpg".to_owned(),
2361                    hash: "art".to_owned(),
2362                    owner_id: "e".to_owned(),
2363                    content: None,
2364                },
2365            ],
2366        };
2367        let http = ScriptedHttp::new()
2368            .route("e.mp3", Reply::ok(b"body".to_vec()))
2369            .route("e/large.jpg", Reply::status(404))
2370            .route("e/small.jpg", Reply::ok(b"small-art".to_vec()));
2371        let fs = MemFs::new();
2372        let mut manifest = Manifest::new();
2373
2374        let outcome = run(
2375            &plan,
2376            &mut manifest,
2377            &[d],
2378            &http,
2379            &fs,
2380            &StubFfmpeg::flac(),
2381            &RecordingClock::new(),
2382            &ExecOptions::default(),
2383        );
2384
2385        assert_eq!(outcome.downloaded, 1);
2386        // The small image was fetched once (the embed fallback) and never reused
2387        // for the large-keyed sidecar; the sidecar went to the network itself.
2388        assert_eq!(http.count("e/small.jpg"), 1);
2389        assert!(
2390            http.count("e/large.jpg") >= 2,
2391            "sidecar refetched the large URL"
2392        );
2393        assert_eq!(manifest.get("e").unwrap().cover_jpg, None);
2394        assert!(!fs.exists("e/cover.jpg"));
2395    }
2396
2397    // ── Atomic write and size verification (SYNC-13/14) ─────────────
2398
2399    #[test]
2400    fn failed_write_leaves_the_prior_file_intact() {
2401        let c = clip("f");
2402        let d = desired(c.clone(), AudioFormat::Mp3);
2403        let plan = Plan {
2404            actions: vec![Action::Download {
2405                clip: c.clone(),
2406                lineage: LineageContext::own_root(&c),
2407                path: d.path.clone(),
2408                format: AudioFormat::Mp3,
2409            }],
2410        };
2411        let http = ScriptedHttp::new().route("f.mp3", Reply::ok(b"new-body".to_vec()));
2412        let fs = MemFs::new()
2413            .with_file("f.mp3", b"OLD-CONTENT".to_vec())
2414            .fail_write("f.mp3");
2415        let mut manifest = Manifest::new();
2416
2417        let outcome = run(
2418            &plan,
2419            &mut manifest,
2420            &[d],
2421            &http,
2422            &fs,
2423            &StubFfmpeg::flac(),
2424            &RecordingClock::new(),
2425            &ExecOptions::default(),
2426        );
2427
2428        assert_eq!(outcome.downloaded, 0);
2429        assert_eq!(outcome.failed(), 1);
2430        assert_eq!(fs.read_file("f.mp3").unwrap(), b"OLD-CONTENT");
2431        assert!(manifest.get("f").is_none());
2432    }
2433
2434    #[test]
2435    fn size_mismatch_after_write_is_a_failure() {
2436        let c = clip("g");
2437        let d = desired(c.clone(), AudioFormat::Mp3);
2438        let plan = Plan {
2439            actions: vec![Action::Download {
2440                clip: c.clone(),
2441                lineage: LineageContext::own_root(&c),
2442                path: d.path.clone(),
2443                format: AudioFormat::Mp3,
2444            }],
2445        };
2446        let http = ScriptedHttp::new().route("g.mp3", Reply::ok(b"body".to_vec()));
2447        let fs = MemFs::new().corrupt_write("g.mp3");
2448        let mut manifest = Manifest::new();
2449
2450        let outcome = run(
2451            &plan,
2452            &mut manifest,
2453            &[d],
2454            &http,
2455            &fs,
2456            &StubFfmpeg::flac(),
2457            &RecordingClock::new(),
2458            &ExecOptions::default(),
2459        );
2460
2461        assert_eq!(outcome.downloaded, 0);
2462        assert_eq!(outcome.failed(), 1);
2463        assert!(outcome.failures[0].reason.contains("expected"));
2464        assert!(manifest.get("g").is_none());
2465    }
2466
2467    // ── Reliability policy (SYNC-16/17) ─────────────────────────────
2468
2469    #[test]
2470    fn transient_failure_is_retried_then_skipped() {
2471        let c = clip("h");
2472        let d = desired(c.clone(), AudioFormat::Mp3);
2473        let plan = Plan {
2474            actions: vec![Action::Download {
2475                clip: c.clone(),
2476                lineage: LineageContext::own_root(&c),
2477                path: d.path.clone(),
2478                format: AudioFormat::Mp3,
2479            }],
2480        };
2481        let http = ScriptedHttp::new().route("h.mp3", Reply::status(500));
2482        let fs = MemFs::new();
2483        let clock = RecordingClock::new();
2484        let opts = ExecOptions {
2485            max_retries: 2,
2486            ..ExecOptions::default()
2487        };
2488        let mut manifest = Manifest::new();
2489
2490        let outcome = run(
2491            &plan,
2492            &mut manifest,
2493            &[d],
2494            &http,
2495            &fs,
2496            &StubFfmpeg::flac(),
2497            &clock,
2498            &opts,
2499        );
2500
2501        assert_eq!(outcome.downloaded, 0);
2502        assert_eq!(outcome.failed(), 1);
2503        assert_eq!(http.count("h.mp3"), 3);
2504        assert_eq!(clock.sleeps().len(), 2);
2505    }
2506
2507    #[test]
2508    fn truncated_download_is_retried_then_succeeds() {
2509        let c = clip("i");
2510        let d = desired(c.clone(), AudioFormat::Mp3);
2511        let plan = Plan {
2512            actions: vec![Action::Download {
2513                clip: c.clone(),
2514                lineage: LineageContext::own_root(&c),
2515                path: d.path.clone(),
2516                format: AudioFormat::Mp3,
2517            }],
2518        };
2519        let http = ScriptedHttp::new().route_seq(
2520            "i.mp3",
2521            vec![
2522                Reply::ok(b"short".to_vec()).with_content_length(999),
2523                Reply::ok(b"good-body".to_vec()),
2524            ],
2525        );
2526        let fs = MemFs::new();
2527        let clock = RecordingClock::new();
2528        let mut manifest = Manifest::new();
2529
2530        let outcome = run(
2531            &plan,
2532            &mut manifest,
2533            &[d],
2534            &http,
2535            &fs,
2536            &StubFfmpeg::flac(),
2537            &clock,
2538            &ExecOptions::default(),
2539        );
2540
2541        assert_eq!(outcome.downloaded, 1);
2542        assert_eq!(http.count("i.mp3"), 2);
2543        assert_eq!(clock.sleeps().len(), 1);
2544    }
2545
2546    #[test]
2547    fn rate_limit_backs_off_using_retry_after() {
2548        let c = clip("j");
2549        let d = desired(c.clone(), AudioFormat::Mp3);
2550        let plan = Plan {
2551            actions: vec![Action::Download {
2552                clip: c.clone(),
2553                lineage: LineageContext::own_root(&c),
2554                path: d.path.clone(),
2555                format: AudioFormat::Mp3,
2556            }],
2557        };
2558        let http = ScriptedHttp::new().route_seq(
2559            "j.mp3",
2560            vec![
2561                Reply::status(429).with_retry_after(7),
2562                Reply::ok(b"body".to_vec()),
2563            ],
2564        );
2565        let fs = MemFs::new();
2566        let clock = RecordingClock::new();
2567        let mut manifest = Manifest::new();
2568
2569        let outcome = run(
2570            &plan,
2571            &mut manifest,
2572            &[d],
2573            &http,
2574            &fs,
2575            &StubFfmpeg::flac(),
2576            &clock,
2577            &ExecOptions::default(),
2578        );
2579
2580        assert_eq!(outcome.downloaded, 1);
2581        assert_eq!(clock.sleeps(), vec![Duration::from_secs(7)]);
2582    }
2583
2584    #[test]
2585    fn auth_failure_aborts_the_run() {
2586        let c1 = clip("k1");
2587        let c2 = clip("k2");
2588        let d1 = desired(c1.clone(), AudioFormat::Flac);
2589        let d2 = desired(c2.clone(), AudioFormat::Flac);
2590        let plan = Plan {
2591            actions: vec![
2592                Action::Download {
2593                    clip: c1.clone(),
2594                    lineage: LineageContext::own_root(&c1),
2595                    path: d1.path.clone(),
2596                    format: AudioFormat::Flac,
2597                },
2598                Action::Download {
2599                    clip: c2.clone(),
2600                    lineage: LineageContext::own_root(&c2),
2601                    path: d2.path.clone(),
2602                    format: AudioFormat::Flac,
2603                },
2604            ],
2605        };
2606        // The authenticated WAV-render endpoint rejects auth even after a JWT
2607        // refresh: that is a bad token, so the whole run aborts rather than
2608        // hammering every clip. A CDN media rejection, by contrast, does not.
2609        let http = ScriptedHttp::new()
2610            .with_auth()
2611            .route("/wav_file/", Reply::status(401));
2612        let fs = MemFs::new();
2613        let mut manifest = Manifest::new();
2614
2615        let outcome = run(
2616            &plan,
2617            &mut manifest,
2618            &[d1, d2],
2619            &http,
2620            &fs,
2621            &StubFfmpeg::flac(),
2622            &RecordingClock::new(),
2623            &small_poll(),
2624        );
2625
2626        assert_eq!(outcome.status, RunStatus::AuthAborted);
2627        assert_eq!(outcome.failed(), 1);
2628        assert_eq!(outcome.failures[0].clip_id, "k1");
2629        assert_eq!(outcome.downloaded, 0);
2630    }
2631
2632    // ── Disk-full aborts the run (issue #17) ────────────────────────
2633
2634    #[test]
2635    fn disk_full_primary_write_aborts_the_run() {
2636        // Two MP3 downloads; the first write is out of space. That is systemic,
2637        // so the run aborts before the second is even attempted: exactly one
2638        // failure is recorded and its reason names the disk-full cause.
2639        let c1 = clip("d1");
2640        let c2 = clip("d2");
2641        let d1 = desired(c1.clone(), AudioFormat::Mp3);
2642        let d2 = desired(c2.clone(), AudioFormat::Mp3);
2643        let plan = Plan {
2644            actions: vec![
2645                Action::Download {
2646                    clip: c1.clone(),
2647                    lineage: LineageContext::own_root(&c1),
2648                    path: d1.path.clone(),
2649                    format: AudioFormat::Mp3,
2650                },
2651                Action::Download {
2652                    clip: c2.clone(),
2653                    lineage: LineageContext::own_root(&c2),
2654                    path: d2.path.clone(),
2655                    format: AudioFormat::Mp3,
2656                },
2657            ],
2658        };
2659        let http = ScriptedHttp::new()
2660            .route("d1.mp3", Reply::ok(b"body-1".to_vec()))
2661            .route("d2.mp3", Reply::ok(b"body-2".to_vec()));
2662        let fs = MemFs::new().fail_write_out_of_space("d1.mp3");
2663        let mut manifest = Manifest::new();
2664
2665        let outcome = run(
2666            &plan,
2667            &mut manifest,
2668            &[d1, d2],
2669            &http,
2670            &fs,
2671            &StubFfmpeg::flac(),
2672            &RecordingClock::new(),
2673            &ExecOptions::default(),
2674        );
2675
2676        assert_eq!(outcome.status, RunStatus::DiskFull);
2677        assert_eq!(outcome.failed(), 1);
2678        assert_eq!(outcome.failures[0].clip_id, "d1");
2679        assert!(outcome.failures[0].reason.contains("disk full"));
2680        assert_eq!(outcome.downloaded, 0);
2681        // The second clip was never fetched: the run aborted first.
2682        assert_eq!(http.count("d2.mp3"), 0);
2683        assert!(!fs.exists("d2.mp3"));
2684    }
2685
2686    #[test]
2687    fn disk_full_flac_transcode_aborts_the_run() {
2688        // The scratch disk fills during the FLAC re-encode; a WAV rendered, but
2689        // there is nowhere to stage the transcode, so the run aborts.
2690        let c1 = clip("d1");
2691        let c2 = clip("d2");
2692        let d1 = desired(c1.clone(), AudioFormat::Flac);
2693        let d2 = desired(c2.clone(), AudioFormat::Flac);
2694        let plan = Plan {
2695            actions: vec![
2696                Action::Download {
2697                    clip: c1.clone(),
2698                    lineage: LineageContext::own_root(&c1),
2699                    path: d1.path.clone(),
2700                    format: AudioFormat::Flac,
2701                },
2702                Action::Download {
2703                    clip: c2.clone(),
2704                    lineage: LineageContext::own_root(&c2),
2705                    path: d2.path.clone(),
2706                    format: AudioFormat::Flac,
2707                },
2708            ],
2709        };
2710        let http = ScriptedHttp::new()
2711            .with_auth()
2712            .route(
2713                "/wav_file/",
2714                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/d1.wav"}"#),
2715            )
2716            .route(".wav", Reply::ok(b"wav".to_vec()));
2717        let fs = MemFs::new();
2718        let mut manifest = Manifest::new();
2719
2720        let outcome = run(
2721            &plan,
2722            &mut manifest,
2723            &[d1, d2],
2724            &http,
2725            &fs,
2726            &StubFfmpeg::out_of_space(),
2727            &RecordingClock::new(),
2728            &ExecOptions::default(),
2729        );
2730
2731        assert_eq!(outcome.status, RunStatus::DiskFull);
2732        assert_eq!(outcome.failed(), 1);
2733        assert_eq!(outcome.failures[0].clip_id, "d1");
2734        assert!(outcome.failures[0].reason.contains("disk full"));
2735        assert_eq!(outcome.downloaded, 0);
2736    }
2737
2738    #[test]
2739    fn disk_full_artifact_write_aborts_the_run() {
2740        // A sidecar write (not a primary download) also aborts on a full disk:
2741        // the owning audio is present, the cover fetch succeeds, but the sidecar
2742        // cannot be written.
2743        let mut manifest = Manifest::new();
2744        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
2745        let plan = Plan {
2746            actions: vec![Action::WriteArtifact {
2747                kind: ArtifactKind::CoverJpg,
2748                path: "a/cover.jpg".to_owned(),
2749                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
2750                hash: "h1".to_owned(),
2751                owner_id: "a".to_owned(),
2752                content: None,
2753            }],
2754        };
2755        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"jpg-bytes".to_vec()));
2756        let fs = MemFs::new().fail_write_out_of_space("a/cover.jpg");
2757
2758        let outcome = run(
2759            &plan,
2760            &mut manifest,
2761            &[],
2762            &http,
2763            &fs,
2764            &StubFfmpeg::flac(),
2765            &RecordingClock::new(),
2766            &ExecOptions::default(),
2767        );
2768
2769        assert_eq!(outcome.status, RunStatus::DiskFull);
2770        assert_eq!(outcome.failed(), 1);
2771        assert!(outcome.failures[0].reason.contains("disk full"));
2772        assert_eq!(outcome.artifacts_written, 0);
2773        // The sidecar slot was never recorded: the write failed before it.
2774        assert_eq!(manifest.get("a").unwrap().cover_jpg, None);
2775    }
2776
2777    #[test]
2778    fn disk_full_leaves_the_failed_clips_manifest_entry_unchanged() {
2779        // write_verify fails before any manifest insert, so a re-download that
2780        // hits a full disk leaves the prior entry (and file) exactly as it was.
2781        let c = clip("m");
2782        let d = desired(c.clone(), AudioFormat::Mp3);
2783        let plan = Plan {
2784            actions: vec![Action::Download {
2785                clip: c.clone(),
2786                lineage: LineageContext::own_root(&c),
2787                path: d.path.clone(),
2788                format: AudioFormat::Mp3,
2789            }],
2790        };
2791        let http = ScriptedHttp::new().route("m.mp3", Reply::ok(b"new-body".to_vec()));
2792        let fs = MemFs::new()
2793            .with_file("m.mp3", b"OLD-CONTENT".to_vec())
2794            .fail_write_out_of_space("m.mp3");
2795        let mut manifest = Manifest::new();
2796        let before = entry("m.mp3", AudioFormat::Mp3);
2797        manifest.insert("m", before.clone());
2798
2799        let outcome = run(
2800            &plan,
2801            &mut manifest,
2802            &[d],
2803            &http,
2804            &fs,
2805            &StubFfmpeg::flac(),
2806            &RecordingClock::new(),
2807            &ExecOptions::default(),
2808        );
2809
2810        assert_eq!(outcome.status, RunStatus::DiskFull);
2811        assert_eq!(manifest.get("m"), Some(&before));
2812        assert_eq!(fs.read_file("m.mp3").unwrap(), b"OLD-CONTENT");
2813    }
2814
2815    #[test]
2816    fn cdn_download_rejection_skips_the_clip_without_aborting() {
2817        let c1 = clip("k1");
2818        let c2 = clip("k2");
2819        let d1 = desired(c1.clone(), AudioFormat::Mp3);
2820        let d2 = desired(c2.clone(), AudioFormat::Mp3);
2821        let plan = Plan {
2822            actions: vec![
2823                Action::Download {
2824                    clip: c1.clone(),
2825                    lineage: LineageContext::own_root(&c1),
2826                    path: d1.path.clone(),
2827                    format: AudioFormat::Mp3,
2828                },
2829                Action::Download {
2830                    clip: c2.clone(),
2831                    lineage: LineageContext::own_root(&c2),
2832                    path: d2.path.clone(),
2833                    format: AudioFormat::Mp3,
2834                },
2835            ],
2836        };
2837        // A CDN media fetch is unauthenticated, so a 403 is a per-asset
2838        // rejection (often transient), not a bad token: the clip is retried
2839        // then recorded and skipped, and the run carries on to the rest.
2840        let http = ScriptedHttp::new()
2841            .route("k1.mp3", Reply::status(403))
2842            .route("k2.mp3", Reply::ok(b"body".to_vec()));
2843        let fs = MemFs::new();
2844        let mut manifest = Manifest::new();
2845
2846        let outcome = run(
2847            &plan,
2848            &mut manifest,
2849            &[d1, d2],
2850            &http,
2851            &fs,
2852            &StubFfmpeg::flac(),
2853            &RecordingClock::new(),
2854            &ExecOptions::default(),
2855        );
2856
2857        assert_ne!(outcome.status, RunStatus::AuthAborted);
2858        assert_eq!(outcome.downloaded, 1);
2859        assert_eq!(outcome.failed(), 1);
2860        assert_eq!(outcome.failures[0].clip_id, "k1");
2861    }
2862
2863    #[test]
2864    fn one_clip_failure_does_not_abort_the_run() {
2865        let c1 = clip("l1");
2866        let c2 = clip("l2");
2867        let d1 = desired(c1.clone(), AudioFormat::Mp3);
2868        let d2 = desired(c2.clone(), AudioFormat::Mp3);
2869        let plan = Plan {
2870            actions: vec![
2871                Action::Download {
2872                    clip: c1.clone(),
2873                    lineage: LineageContext::own_root(&c1),
2874                    path: d1.path.clone(),
2875                    format: AudioFormat::Mp3,
2876                },
2877                Action::Download {
2878                    clip: c2.clone(),
2879                    lineage: LineageContext::own_root(&c2),
2880                    path: d2.path.clone(),
2881                    format: AudioFormat::Mp3,
2882                },
2883            ],
2884        };
2885        let http = ScriptedHttp::new()
2886            .route("l1.mp3", Reply::status(404))
2887            .route("l2.mp3", Reply::ok(b"body".to_vec()));
2888        let fs = MemFs::new();
2889        let mut manifest = Manifest::new();
2890
2891        let outcome = run(
2892            &plan,
2893            &mut manifest,
2894            &[d1, d2],
2895            &http,
2896            &fs,
2897            &StubFfmpeg::flac(),
2898            &RecordingClock::new(),
2899            &ExecOptions::default(),
2900        );
2901
2902        assert_eq!(outcome.status, RunStatus::Completed);
2903        assert_eq!(outcome.downloaded, 1);
2904        assert_eq!(outcome.failed(), 1);
2905        assert_eq!(outcome.failures[0].clip_id, "l1");
2906        assert!(fs.exists("l2.mp3"));
2907        assert!(manifest.get("l2").is_some());
2908        assert!(manifest.get("l1").is_none());
2909    }
2910
2911    // ── preserve marker (SYNC-8) ────────────────────────────────────
2912
2913    #[test]
2914    fn preserve_is_set_for_copy_held_and_private_clips() {
2915        let mut mirror = desired(clip("m1"), AudioFormat::Mp3);
2916        mirror.modes = vec![SourceMode::Mirror];
2917        let mut copy_held = desired(clip("m2"), AudioFormat::Mp3);
2918        copy_held.modes = vec![SourceMode::Mirror, SourceMode::Copy];
2919        let mut private = desired(clip("m3"), AudioFormat::Mp3);
2920        private.private = true;
2921
2922        let plan = Plan {
2923            actions: vec![
2924                Action::Download {
2925                    clip: mirror.clip.clone(),
2926                    lineage: LineageContext::own_root(&mirror.clip),
2927                    path: mirror.path.clone(),
2928                    format: AudioFormat::Mp3,
2929                },
2930                Action::Download {
2931                    clip: copy_held.clip.clone(),
2932                    lineage: LineageContext::own_root(&copy_held.clip),
2933                    path: copy_held.path.clone(),
2934                    format: AudioFormat::Mp3,
2935                },
2936                Action::Download {
2937                    clip: private.clip.clone(),
2938                    lineage: LineageContext::own_root(&private.clip),
2939                    path: private.path.clone(),
2940                    format: AudioFormat::Mp3,
2941                },
2942            ],
2943        };
2944        let http = ScriptedHttp::new()
2945            .route("m1.mp3", Reply::ok(b"a".to_vec()))
2946            .route("m2.mp3", Reply::ok(b"b".to_vec()))
2947            .route("m3.mp3", Reply::ok(b"c".to_vec()));
2948        let fs = MemFs::new();
2949        let mut manifest = Manifest::new();
2950
2951        let outcome = run(
2952            &plan,
2953            &mut manifest,
2954            &[mirror, copy_held, private],
2955            &http,
2956            &fs,
2957            &StubFfmpeg::flac(),
2958            &RecordingClock::new(),
2959            &ExecOptions::default(),
2960        );
2961
2962        assert_eq!(outcome.downloaded, 3);
2963        assert!(!manifest.get("m1").unwrap().preserve);
2964        assert!(manifest.get("m2").unwrap().preserve);
2965        assert!(manifest.get("m3").unwrap().preserve);
2966    }
2967
2968    // ── Reformat / Retag / Rename / Delete / Skip ───────────────────
2969
2970    #[test]
2971    fn reformat_writes_new_format_and_removes_old_file() {
2972        let c = clip("n");
2973        let d = desired(c.clone(), AudioFormat::Mp3);
2974        let plan = Plan {
2975            actions: vec![Action::Reformat {
2976                clip: c.clone(),
2977                path: "n.mp3".to_owned(),
2978                from_path: "n.flac".to_owned(),
2979                from: AudioFormat::Flac,
2980                to: AudioFormat::Mp3,
2981            }],
2982        };
2983        let http = ScriptedHttp::new().route("n.mp3", Reply::ok(b"body".to_vec()));
2984        let fs = MemFs::new().with_file("n.flac", b"OLD-FLAC".to_vec());
2985        let mut manifest = Manifest::new();
2986        manifest.insert("n", entry("n.flac", AudioFormat::Flac));
2987
2988        let outcome = run(
2989            &plan,
2990            &mut manifest,
2991            &[d],
2992            &http,
2993            &fs,
2994            &StubFfmpeg::flac(),
2995            &RecordingClock::new(),
2996            &ExecOptions::default(),
2997        );
2998
2999        assert_eq!(outcome.reformatted, 1);
3000        assert!(fs.exists("n.mp3"));
3001        assert!(!fs.exists("n.flac"));
3002        let updated = manifest.get("n").unwrap();
3003        assert_eq!(updated.path, "n.mp3");
3004        assert_eq!(updated.format, AudioFormat::Mp3);
3005        assert_eq!(updated.meta_hash, "m");
3006    }
3007
3008    #[test]
3009    fn retag_rewrites_file_and_updates_hashes() {
3010        let c = clip("o");
3011        let mut d = desired(c.clone(), AudioFormat::Mp3);
3012        d.meta_hash = "new".to_owned();
3013        d.art_hash = "new-art".to_owned();
3014        let existing = tag_mp3(
3015            b"audio",
3016            &TrackMetadata::from_clip(&c, &LineageContext::own_root(&c)),
3017            None,
3018            None,
3019        )
3020        .unwrap();
3021        let fs = MemFs::new().with_file("o.mp3", existing.clone());
3022        let mut manifest = Manifest::new();
3023        let mut start = entry("o.mp3", AudioFormat::Mp3);
3024        start.size = existing.len() as u64;
3025        manifest.insert("o", start);
3026        let plan = Plan {
3027            actions: vec![Action::Retag {
3028                clip: c.clone(),
3029                lineage: LineageContext::own_root(&c),
3030                path: "o.mp3".to_owned(),
3031            }],
3032        };
3033
3034        let outcome = run(
3035            &plan,
3036            &mut manifest,
3037            &[d],
3038            &ScriptedHttp::new(),
3039            &fs,
3040            &StubFfmpeg::flac(),
3041            &RecordingClock::new(),
3042            &ExecOptions::default(),
3043        );
3044
3045        assert_eq!(outcome.retagged, 1);
3046        let updated = manifest.get("o").unwrap();
3047        assert_eq!(updated.meta_hash, "new");
3048        assert_eq!(updated.art_hash, "new-art");
3049        assert_eq!(&fs.read_file("o.mp3").unwrap()[..3], b"ID3");
3050    }
3051
3052    #[test]
3053    fn rename_moves_file_and_updates_manifest_path() {
3054        let c = clip("p");
3055        let mut d = desired(c.clone(), AudioFormat::Mp3);
3056        d.path = "new/p.mp3".to_owned();
3057        let fs = MemFs::new().with_file("old/p.mp3", b"DATA".to_vec());
3058        let mut manifest = Manifest::new();
3059        manifest.insert("p", entry("old/p.mp3", AudioFormat::Mp3));
3060        let plan = Plan {
3061            actions: vec![Action::Rename {
3062                from: "old/p.mp3".to_owned(),
3063                to: "new/p.mp3".to_owned(),
3064            }],
3065        };
3066
3067        let outcome = run(
3068            &plan,
3069            &mut manifest,
3070            &[d],
3071            &ScriptedHttp::new(),
3072            &fs,
3073            &StubFfmpeg::flac(),
3074            &RecordingClock::new(),
3075            &ExecOptions::default(),
3076        );
3077
3078        assert_eq!(outcome.renamed, 1);
3079        assert!(fs.exists("new/p.mp3"));
3080        assert!(!fs.exists("old/p.mp3"));
3081        assert_eq!(manifest.get("p").unwrap().path, "new/p.mp3");
3082    }
3083
3084    #[test]
3085    fn disk_full_rename_aborts_the_run() {
3086        // A move onto a full disk is systemic like a full-disk write: the run
3087        // aborts with DiskFull and the source file is left untouched.
3088        let c = clip("p");
3089        let mut d = desired(c.clone(), AudioFormat::Mp3);
3090        d.path = "new/p.mp3".to_owned();
3091        let fs = MemFs::new()
3092            .with_file("old/p.mp3", b"DATA".to_vec())
3093            .fail_rename_out_of_space("new/p.mp3");
3094        let mut manifest = Manifest::new();
3095        manifest.insert("p", entry("old/p.mp3", AudioFormat::Mp3));
3096        let plan = Plan {
3097            actions: vec![Action::Rename {
3098                from: "old/p.mp3".to_owned(),
3099                to: "new/p.mp3".to_owned(),
3100            }],
3101        };
3102
3103        let outcome = run(
3104            &plan,
3105            &mut manifest,
3106            &[d],
3107            &ScriptedHttp::new(),
3108            &fs,
3109            &StubFfmpeg::flac(),
3110            &RecordingClock::new(),
3111            &ExecOptions::default(),
3112        );
3113
3114        assert_eq!(outcome.status, RunStatus::DiskFull);
3115        assert_eq!(outcome.renamed, 0);
3116        assert_eq!(outcome.failed(), 1);
3117        assert!(outcome.failures[0].reason.contains("disk full"));
3118        // The source is untouched: the move never happened.
3119        assert!(fs.exists("old/p.mp3"));
3120        assert!(!fs.exists("new/p.mp3"));
3121        assert_eq!(manifest.get("p").unwrap().path, "old/p.mp3");
3122    }
3123
3124    #[test]
3125    fn delete_removes_file_and_manifest_entry() {
3126        let fs = MemFs::new().with_file("q.mp3", b"DATA".to_vec());
3127        let mut manifest = Manifest::new();
3128        manifest.insert("q", entry("q.mp3", AudioFormat::Mp3));
3129        let plan = Plan {
3130            actions: vec![Action::Delete {
3131                path: "q.mp3".to_owned(),
3132                clip_id: "q".to_owned(),
3133            }],
3134        };
3135
3136        let outcome = run(
3137            &plan,
3138            &mut manifest,
3139            &[],
3140            &ScriptedHttp::new(),
3141            &fs,
3142            &StubFfmpeg::flac(),
3143            &RecordingClock::new(),
3144            &ExecOptions::default(),
3145        );
3146
3147        assert_eq!(outcome.deleted, 1);
3148        assert!(!fs.exists("q.mp3"));
3149        assert!(manifest.get("q").is_none());
3150    }
3151
3152    #[test]
3153    fn failed_delete_keeps_the_manifest_entry() {
3154        let fs = MemFs::new()
3155            .with_file("s.mp3", b"DATA".to_vec())
3156            .fail_remove("s.mp3");
3157        let mut manifest = Manifest::new();
3158        manifest.insert("s", entry("s.mp3", AudioFormat::Mp3));
3159        let plan = Plan {
3160            actions: vec![Action::Delete {
3161                path: "s.mp3".to_owned(),
3162                clip_id: "s".to_owned(),
3163            }],
3164        };
3165
3166        let outcome = run(
3167            &plan,
3168            &mut manifest,
3169            &[],
3170            &ScriptedHttp::new(),
3171            &fs,
3172            &StubFfmpeg::flac(),
3173            &RecordingClock::new(),
3174            &ExecOptions::default(),
3175        );
3176
3177        assert_eq!(outcome.deleted, 0);
3178        assert_eq!(outcome.failed(), 1);
3179        assert!(manifest.get("s").is_some());
3180        assert!(fs.exists("s.mp3"));
3181    }
3182
3183    #[test]
3184    fn skip_is_a_noop() {
3185        let mut manifest = Manifest::new();
3186        let plan = Plan {
3187            actions: vec![Action::Skip {
3188                clip_id: "r".to_owned(),
3189            }],
3190        };
3191        let outcome = run(
3192            &plan,
3193            &mut manifest,
3194            &[],
3195            &ScriptedHttp::new(),
3196            &MemFs::new(),
3197            &StubFfmpeg::flac(),
3198            &RecordingClock::new(),
3199            &ExecOptions::default(),
3200        );
3201        assert_eq!(outcome.skipped, 1);
3202        assert_eq!(outcome.failed(), 0);
3203    }
3204
3205    // ── Pure helpers ────────────────────────────────────────────────
3206
3207    #[test]
3208    fn header_helpers_parse_or_ignore() {
3209        let resp = HttpResponse {
3210            status: 200,
3211            headers: vec![("Content-Length".to_owned(), "42".to_owned())],
3212            body: Vec::new(),
3213        };
3214        assert_eq!(content_length(&resp), Some(42));
3215
3216        let bare = HttpResponse {
3217            status: 200,
3218            headers: Vec::new(),
3219            body: Vec::new(),
3220        };
3221        assert_eq!(content_length(&bare), None);
3222    }
3223
3224    #[test]
3225    fn preserve_rule_covers_copy_and_private() {
3226        let base = desired(clip("x"), AudioFormat::Mp3);
3227        assert!(!preserve_for(&base));
3228        let mut copy_held = base.clone();
3229        copy_held.modes = vec![SourceMode::Copy];
3230        assert!(preserve_for(&copy_held));
3231        let mut private = base.clone();
3232        private.private = true;
3233        assert!(preserve_for(&private));
3234    }
3235
3236    fn fs_new() -> MemFs {
3237        MemFs::new()
3238    }
3239
3240    // ── Skip refreshes the preserve marker (SYNC-8 cross-run) ────────
3241
3242    #[test]
3243    fn skip_sets_preserve_when_a_clip_becomes_copy_held() {
3244        let c = clip("s1");
3245        let mut d = desired(c.clone(), AudioFormat::Mp3);
3246        d.modes = vec![SourceMode::Copy];
3247        let plan = Plan {
3248            actions: vec![Action::Skip {
3249                clip_id: "s1".to_owned(),
3250            }],
3251        };
3252        let mut manifest = Manifest::new();
3253        manifest.insert("s1".to_owned(), entry("s1.mp3", AudioFormat::Mp3));
3254        assert!(!manifest.get("s1").unwrap().preserve);
3255
3256        let outcome = run(
3257            &plan,
3258            &mut manifest,
3259            &[d],
3260            &ScriptedHttp::new(),
3261            &fs_new(),
3262            &StubFfmpeg::flac(),
3263            &RecordingClock::new(),
3264            &ExecOptions::default(),
3265        );
3266
3267        assert_eq!(outcome.skipped, 1);
3268        assert!(
3269            manifest.get("s1").unwrap().preserve,
3270            "a copy-held skip must mark the entry preserved"
3271        );
3272    }
3273
3274    #[test]
3275    fn skip_clears_stale_preserve_when_a_clip_returns_to_mirror_only() {
3276        let c = clip("s2");
3277        let d = desired(c.clone(), AudioFormat::Mp3);
3278        let plan = Plan {
3279            actions: vec![Action::Skip {
3280                clip_id: "s2".to_owned(),
3281            }],
3282        };
3283        let mut manifest = Manifest::new();
3284        let mut stale = entry("s2.mp3", AudioFormat::Mp3);
3285        stale.preserve = true;
3286        manifest.insert("s2".to_owned(), stale);
3287
3288        run(
3289            &plan,
3290            &mut manifest,
3291            &[d],
3292            &ScriptedHttp::new(),
3293            &fs_new(),
3294            &StubFfmpeg::flac(),
3295            &RecordingClock::new(),
3296            &ExecOptions::default(),
3297        );
3298
3299        assert!(
3300            !manifest.get("s2").unwrap().preserve,
3301            "a mirror-only skip must clear a stale preserve marker"
3302        );
3303    }
3304
3305    #[test]
3306    fn flac_render_retries_a_rate_limited_wav_lookup() {
3307        let c = clip("rl");
3308        let d = desired(c.clone(), AudioFormat::Flac);
3309        let plan = Plan {
3310            actions: vec![Action::Download {
3311                clip: c.clone(),
3312                lineage: LineageContext::own_root(&c),
3313                path: d.path.clone(),
3314                format: AudioFormat::Flac,
3315            }],
3316        };
3317        let http = ScriptedHttp::new()
3318            .with_auth()
3319            .route_seq(
3320                "/wav_file/",
3321                vec![
3322                    Reply::status(429),
3323                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/rl.wav"}"#),
3324                ],
3325            )
3326            .route("rl.wav", Reply::ok(b"wav".to_vec()));
3327        let clock = RecordingClock::new();
3328        let mut manifest = Manifest::new();
3329
3330        let outcome = run(
3331            &plan,
3332            &mut manifest,
3333            &[d],
3334            &http,
3335            &fs_new(),
3336            &StubFfmpeg::flac(),
3337            &clock,
3338            &small_poll(),
3339        );
3340
3341        assert_eq!(outcome.downloaded, 1);
3342        assert_eq!(outcome.failed(), 0);
3343        // The render was ready on retry, so no fresh convert_wav was needed.
3344        assert_eq!(http.count("/convert_wav/"), 0);
3345        // One transient backoff (1s base), not the 5s poll interval.
3346        assert_eq!(clock.sleeps(), vec![Duration::from_secs(1)]);
3347    }
3348
3349    // ── Phase 6: artifact actions ───────────────────────────────────
3350
3351    #[test]
3352    fn write_artifact_fetches_writes_and_updates_manifest() {
3353        // The owning entry exists (its audio was kept this run); WriteArtifact
3354        // fetches the source, writes the sidecar, and records it on the entry.
3355        let mut manifest = Manifest::new();
3356        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
3357        let plan = Plan {
3358            actions: vec![Action::WriteArtifact {
3359                kind: ArtifactKind::CoverJpg,
3360                path: "a/cover.jpg".to_owned(),
3361                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
3362                hash: "h1".to_owned(),
3363                owner_id: "a".to_owned(),
3364                content: None,
3365            }],
3366        };
3367        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"jpg-bytes".to_vec()));
3368        let fs = MemFs::new();
3369
3370        let outcome = run(
3371            &plan,
3372            &mut manifest,
3373            &[],
3374            &http,
3375            &fs,
3376            &StubFfmpeg::flac(),
3377            &RecordingClock::new(),
3378            &ExecOptions::default(),
3379        );
3380
3381        assert_eq!(outcome.artifacts_written, 1);
3382        assert_eq!(outcome.failed(), 0);
3383        assert_eq!(outcome.status, RunStatus::Completed);
3384        assert_eq!(fs.read_file("a/cover.jpg").unwrap(), b"jpg-bytes");
3385        assert_eq!(
3386            manifest.get("a").unwrap().cover_jpg,
3387            Some(ArtifactState {
3388                path: "a/cover.jpg".to_owned(),
3389                hash: "h1".to_owned(),
3390            })
3391        );
3392    }
3393
3394    #[test]
3395    fn write_text_sidecar_records_slot_with_no_network_fetch() {
3396        // A generated text sidecar carries its body inline, so it is written
3397        // verbatim with NO HTTP fetch and the details slot records its state.
3398        let mut manifest = Manifest::new();
3399        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
3400        let plan = Plan {
3401            actions: vec![Action::WriteArtifact {
3402                kind: ArtifactKind::DetailsTxt,
3403                path: "a.details.txt".to_owned(),
3404                source_url: String::new(),
3405                hash: "dh".to_owned(),
3406                owner_id: "a".to_owned(),
3407                content: Some("Title: A\n".to_owned()),
3408            }],
3409        };
3410        // An empty HTTP script: any fetch would fail, proving none happens.
3411        let http = ScriptedHttp::new();
3412        let fs = MemFs::new();
3413
3414        let outcome = run(
3415            &plan,
3416            &mut manifest,
3417            &[],
3418            &http,
3419            &fs,
3420            &StubFfmpeg::flac(),
3421            &RecordingClock::new(),
3422            &ExecOptions::default(),
3423        );
3424
3425        assert_eq!(outcome.artifacts_written, 1);
3426        assert_eq!(outcome.failed(), 0);
3427        assert_eq!(fs.read_file("a.details.txt").unwrap(), b"Title: A\n");
3428        assert_eq!(
3429            manifest.get("a").unwrap().details_txt,
3430            Some(ArtifactState {
3431                path: "a.details.txt".to_owned(),
3432                hash: "dh".to_owned(),
3433            })
3434        );
3435    }
3436
3437    #[test]
3438    fn write_lyrics_sidecar_relocation_removes_old_file() {
3439        // The audio moved, so the lyrics sidecar is re-emitted at the new path;
3440        // the executor writes the new file and prunes the stale one.
3441        let mut manifest = Manifest::new();
3442        let mut e = entry("old/a.flac", AudioFormat::Flac);
3443        e.lyrics_txt = Some(ArtifactState {
3444            path: "old/a.lyrics.txt".to_owned(),
3445            hash: "lh".to_owned(),
3446        });
3447        manifest.insert("a", e);
3448        let fs = MemFs::new()
3449            .with_file("old/a.flac", b"AUDIO".to_vec())
3450            .with_file("old/a.lyrics.txt", b"old words\n".to_vec());
3451        let plan = Plan {
3452            actions: vec![Action::WriteArtifact {
3453                kind: ArtifactKind::LyricsTxt,
3454                path: "new/a.lyrics.txt".to_owned(),
3455                source_url: String::new(),
3456                hash: "lh".to_owned(),
3457                owner_id: "a".to_owned(),
3458                content: Some("new words\n".to_owned()),
3459            }],
3460        };
3461
3462        let outcome = run(
3463            &plan,
3464            &mut manifest,
3465            &[],
3466            &ScriptedHttp::new(),
3467            &fs,
3468            &StubFfmpeg::flac(),
3469            &RecordingClock::new(),
3470            &ExecOptions::default(),
3471        );
3472
3473        assert_eq!(outcome.failed(), 0);
3474        assert_eq!(fs.read_file("new/a.lyrics.txt").unwrap(), b"new words\n");
3475        assert!(!fs.exists("old/a.lyrics.txt"));
3476        assert_eq!(
3477            manifest.get("a").unwrap().lyrics_txt.as_ref().unwrap().path,
3478            "new/a.lyrics.txt"
3479        );
3480    }
3481
3482    #[test]
3483    fn sidecar_path_swap_never_deletes_a_file_written_this_run() {
3484        // Two clips swap sidecar paths in one run (A: x -> y while B: y -> x).
3485        // Each write's inline old-path cleanup must skip a path another action
3486        // writes this run, or the second write would delete the first's freshly
3487        // written file (issue #76). The guard is kind-agnostic; lyrics stands in
3488        // for every sidecar, including the .mp4 video.
3489        let mut manifest = Manifest::new();
3490        let mut a = entry("a.flac", AudioFormat::Flac);
3491        a.lyrics_txt = Some(ArtifactState {
3492            path: "x.lyrics.txt".to_owned(),
3493            hash: "ah".to_owned(),
3494        });
3495        manifest.insert("a", a);
3496        let mut b = entry("b.flac", AudioFormat::Flac);
3497        b.lyrics_txt = Some(ArtifactState {
3498            path: "y.lyrics.txt".to_owned(),
3499            hash: "bh".to_owned(),
3500        });
3501        manifest.insert("b", b);
3502        let fs = MemFs::new()
3503            .with_file("a.flac", b"A".to_vec())
3504            .with_file("b.flac", b"B".to_vec())
3505            .with_file("x.lyrics.txt", b"A words\n".to_vec())
3506            .with_file("y.lyrics.txt", b"B words\n".to_vec());
3507        // A moves its sidecar x -> y; B moves its sidecar y -> x (the swap).
3508        let plan = Plan {
3509            actions: vec![
3510                Action::WriteArtifact {
3511                    kind: ArtifactKind::LyricsTxt,
3512                    path: "y.lyrics.txt".to_owned(),
3513                    source_url: String::new(),
3514                    hash: "ah".to_owned(),
3515                    owner_id: "a".to_owned(),
3516                    content: Some("A words\n".to_owned()),
3517                },
3518                Action::WriteArtifact {
3519                    kind: ArtifactKind::LyricsTxt,
3520                    path: "x.lyrics.txt".to_owned(),
3521                    source_url: String::new(),
3522                    hash: "bh".to_owned(),
3523                    owner_id: "b".to_owned(),
3524                    content: Some("B words\n".to_owned()),
3525                },
3526            ],
3527        };
3528
3529        let outcome = run(
3530            &plan,
3531            &mut manifest,
3532            &[],
3533            &ScriptedHttp::new(),
3534            &fs,
3535            &StubFfmpeg::flac(),
3536            &RecordingClock::new(),
3537            &ExecOptions::default(),
3538        );
3539
3540        assert_eq!(outcome.failed(), 0);
3541        // Both freshly written files survive; neither cleanup clobbered the other.
3542        assert_eq!(fs.read_file("y.lyrics.txt").unwrap(), b"A words\n");
3543        assert_eq!(fs.read_file("x.lyrics.txt").unwrap(), b"B words\n");
3544        assert_eq!(
3545            manifest.get("a").unwrap().lyrics_txt.as_ref().unwrap().path,
3546            "y.lyrics.txt"
3547        );
3548        assert_eq!(
3549            manifest.get("b").unwrap().lyrics_txt.as_ref().unwrap().path,
3550            "x.lyrics.txt"
3551        );
3552    }
3553
3554    #[test]
3555    fn old_sidecar_kept_when_another_clip_still_references_it() {
3556        // A prior failed swap can leave two clips pointing at one path (A -> y and
3557        // B -> y). When B now moves y -> x, its cleanup must not delete y, which is
3558        // still A's live file (#76). tracked_paths counts two references to y, so
3559        // the removal is skipped even though y is not a write target this run.
3560        let mut manifest = Manifest::new();
3561        let mut a = entry("a.flac", AudioFormat::Flac);
3562        a.lyrics_txt = Some(ArtifactState {
3563            path: "y.lyrics.txt".to_owned(),
3564            hash: "ah".to_owned(),
3565        });
3566        manifest.insert("a", a);
3567        let mut b = entry("b.flac", AudioFormat::Flac);
3568        b.lyrics_txt = Some(ArtifactState {
3569            path: "y.lyrics.txt".to_owned(),
3570            hash: "bh".to_owned(),
3571        });
3572        manifest.insert("b", b);
3573        let fs = MemFs::new()
3574            .with_file("a.flac", b"A".to_vec())
3575            .with_file("b.flac", b"B".to_vec())
3576            .with_file("y.lyrics.txt", b"A words\n".to_vec());
3577        // Only B moves this run: y -> x. A is stable, so y is not a write target;
3578        // the tracked-reference count is what protects A's file.
3579        let plan = Plan {
3580            actions: vec![Action::WriteArtifact {
3581                kind: ArtifactKind::LyricsTxt,
3582                path: "x.lyrics.txt".to_owned(),
3583                source_url: String::new(),
3584                hash: "bh".to_owned(),
3585                owner_id: "b".to_owned(),
3586                content: Some("B words\n".to_owned()),
3587            }],
3588        };
3589
3590        let outcome = run(
3591            &plan,
3592            &mut manifest,
3593            &[],
3594            &ScriptedHttp::new(),
3595            &fs,
3596            &StubFfmpeg::flac(),
3597            &RecordingClock::new(),
3598            &ExecOptions::default(),
3599        );
3600
3601        assert_eq!(outcome.failed(), 0);
3602        assert!(
3603            fs.exists("y.lyrics.txt"),
3604            "A's live sidecar must not be deleted"
3605        );
3606        assert_eq!(fs.read_file("x.lyrics.txt").unwrap(), b"B words\n");
3607    }
3608
3609    #[test]
3610    fn shared_old_path_is_reclaimed_when_every_referencing_clip_moves_away() {
3611        // Two clips share one path (A -> s and B -> s, from a prior failed swap).
3612        // When BOTH move away this run, the path is no longer live, so the last
3613        // mover must reclaim it: it is neither kept as an orphan nor deleted while
3614        // still referenced. The dynamic reference count drops to zero only after
3615        // both moves, so exactly the final cleanup removes it (#76).
3616        let mut manifest = Manifest::new();
3617        let mut a = entry("a.flac", AudioFormat::Flac);
3618        a.lyrics_txt = Some(ArtifactState {
3619            path: "s.lyrics.txt".to_owned(),
3620            hash: "ah".to_owned(),
3621        });
3622        manifest.insert("a", a);
3623        let mut b = entry("b.flac", AudioFormat::Flac);
3624        b.lyrics_txt = Some(ArtifactState {
3625            path: "s.lyrics.txt".to_owned(),
3626            hash: "bh".to_owned(),
3627        });
3628        manifest.insert("b", b);
3629        let fs = MemFs::new()
3630            .with_file("a.flac", b"A".to_vec())
3631            .with_file("b.flac", b"B".to_vec())
3632            .with_file("s.lyrics.txt", b"shared\n".to_vec());
3633        let plan = Plan {
3634            actions: vec![
3635                Action::WriteArtifact {
3636                    kind: ArtifactKind::LyricsTxt,
3637                    path: "pa.lyrics.txt".to_owned(),
3638                    source_url: String::new(),
3639                    hash: "ah".to_owned(),
3640                    owner_id: "a".to_owned(),
3641                    content: Some("A words\n".to_owned()),
3642                },
3643                Action::WriteArtifact {
3644                    kind: ArtifactKind::LyricsTxt,
3645                    path: "pb.lyrics.txt".to_owned(),
3646                    source_url: String::new(),
3647                    hash: "bh".to_owned(),
3648                    owner_id: "b".to_owned(),
3649                    content: Some("B words\n".to_owned()),
3650                },
3651            ],
3652        };
3653
3654        let outcome = run(
3655            &plan,
3656            &mut manifest,
3657            &[],
3658            &ScriptedHttp::new(),
3659            &fs,
3660            &StubFfmpeg::flac(),
3661            &RecordingClock::new(),
3662            &ExecOptions::default(),
3663        );
3664
3665        assert_eq!(outcome.failed(), 0);
3666        assert_eq!(fs.read_file("pa.lyrics.txt").unwrap(), b"A words\n");
3667        assert_eq!(fs.read_file("pb.lyrics.txt").unwrap(), b"B words\n");
3668        assert!(
3669            !fs.exists("s.lyrics.txt"),
3670            "the vacated shared path must be reclaimed, not orphaned"
3671        );
3672    }
3673
3674    #[test]
3675    fn write_text_sidecar_skipped_when_owner_audio_absent() {
3676        // A text sidecar for a clip with no manifest entry (its audio download
3677        // failed) must be skipped, never writing an untracked file.
3678        let plan = Plan {
3679            actions: vec![Action::WriteArtifact {
3680                kind: ArtifactKind::DetailsTxt,
3681                path: "gone.details.txt".to_owned(),
3682                source_url: String::new(),
3683                hash: "dh".to_owned(),
3684                owner_id: "gone".to_owned(),
3685                content: Some("Title: Gone\n".to_owned()),
3686            }],
3687        };
3688        let fs = MemFs::new();
3689        let mut manifest = Manifest::new();
3690
3691        let outcome = run(
3692            &plan,
3693            &mut manifest,
3694            &[],
3695            &ScriptedHttp::new(),
3696            &fs,
3697            &StubFfmpeg::flac(),
3698            &RecordingClock::new(),
3699            &ExecOptions::default(),
3700        );
3701
3702        assert_eq!(outcome.artifacts_written, 0);
3703        assert_eq!(outcome.skipped, 1);
3704        assert!(!fs.exists("gone.details.txt"));
3705        assert!(manifest.get("gone").is_none());
3706    }
3707
3708    #[test]
3709    fn delete_artifact_removes_file_and_clears_slot() {
3710        let fs = MemFs::new().with_file("a/cover.jpg", b"jpg".to_vec());
3711        let mut manifest = Manifest::new();
3712        let mut e = entry("a.mp3", AudioFormat::Mp3);
3713        e.cover_jpg = Some(ArtifactState {
3714            path: "a/cover.jpg".to_owned(),
3715            hash: "h1".to_owned(),
3716        });
3717        manifest.insert("a", e);
3718        let plan = Plan {
3719            actions: vec![Action::DeleteArtifact {
3720                kind: ArtifactKind::CoverJpg,
3721                path: "a/cover.jpg".to_owned(),
3722                owner_id: "a".to_owned(),
3723            }],
3724        };
3725
3726        let outcome = run(
3727            &plan,
3728            &mut manifest,
3729            &[],
3730            &ScriptedHttp::new(),
3731            &fs,
3732            &StubFfmpeg::flac(),
3733            &RecordingClock::new(),
3734            &ExecOptions::default(),
3735        );
3736
3737        assert_eq!(outcome.artifacts_deleted, 1);
3738        assert!(!fs.exists("a/cover.jpg"));
3739        assert_eq!(manifest.get("a").unwrap().cover_jpg, None);
3740    }
3741
3742    #[test]
3743    fn delete_artifact_tolerates_already_absent_file() {
3744        // `remove` is idempotent, so co-deleting a sidecar that is already gone
3745        // is not a failure.
3746        let mut manifest = Manifest::new();
3747        let mut e = entry("a.mp3", AudioFormat::Mp3);
3748        e.cover_jpg = Some(ArtifactState {
3749            path: "a/cover.jpg".to_owned(),
3750            hash: "h1".to_owned(),
3751        });
3752        manifest.insert("a", e);
3753        let plan = Plan {
3754            actions: vec![Action::DeleteArtifact {
3755                kind: ArtifactKind::CoverJpg,
3756                path: "a/cover.jpg".to_owned(),
3757                owner_id: "a".to_owned(),
3758            }],
3759        };
3760
3761        let outcome = run(
3762            &plan,
3763            &mut manifest,
3764            &[],
3765            &ScriptedHttp::new(),
3766            &MemFs::new(),
3767            &StubFfmpeg::flac(),
3768            &RecordingClock::new(),
3769            &ExecOptions::default(),
3770        );
3771
3772        assert_eq!(outcome.artifacts_deleted, 1);
3773        assert_eq!(outcome.failed(), 0);
3774        assert_eq!(manifest.get("a").unwrap().cover_jpg, None);
3775    }
3776
3777    #[test]
3778    fn write_artifact_http_failure_is_a_per_clip_failure_not_a_run_abort() {
3779        // A permanent 404 on one sidecar fetch is recorded as a per-clip failure;
3780        // the run continues and the following WriteArtifact still succeeds.
3781        let mut manifest = Manifest::new();
3782        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
3783        manifest.insert("b", entry("b.mp3", AudioFormat::Mp3));
3784        let plan = Plan {
3785            actions: vec![
3786                Action::WriteArtifact {
3787                    kind: ArtifactKind::CoverJpg,
3788                    path: "a/cover.jpg".to_owned(),
3789                    source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
3790                    hash: "h1".to_owned(),
3791                    owner_id: "a".to_owned(),
3792                    content: None,
3793                },
3794                Action::WriteArtifact {
3795                    kind: ArtifactKind::CoverJpg,
3796                    path: "b/cover.jpg".to_owned(),
3797                    source_url: "https://art.suno.ai/b/large.jpg".to_owned(),
3798                    hash: "h2".to_owned(),
3799                    owner_id: "b".to_owned(),
3800                    content: None,
3801                },
3802            ],
3803        };
3804        let http = ScriptedHttp::new()
3805            .route("a/large.jpg", Reply::status(404))
3806            .route("b/large.jpg", Reply::ok(b"jpg-b".to_vec()));
3807        let fs = MemFs::new();
3808
3809        let outcome = run(
3810            &plan,
3811            &mut manifest,
3812            &[],
3813            &http,
3814            &fs,
3815            &StubFfmpeg::flac(),
3816            &RecordingClock::new(),
3817            &ExecOptions::default(),
3818        );
3819
3820        assert_eq!(outcome.status, RunStatus::Completed);
3821        assert_eq!(outcome.failed(), 1);
3822        assert_eq!(outcome.failures[0].clip_id, "a");
3823        assert_eq!(outcome.artifacts_written, 1);
3824        // The failed sidecar left no file and no manifest record.
3825        assert!(!fs.exists("a/cover.jpg"));
3826        assert_eq!(manifest.get("a").unwrap().cover_jpg, None);
3827        // The following sidecar was written and recorded.
3828        assert_eq!(fs.read_file("b/cover.jpg").unwrap(), b"jpg-b");
3829        assert!(manifest.get("b").unwrap().cover_jpg.is_some());
3830    }
3831
3832    #[test]
3833    fn co_delete_executes_audio_delete_then_artifact_delete() {
3834        // The plan orders the audio Delete before its sidecar DeleteArtifact.
3835        // The audio delete removes the manifest entry; the sidecar delete then
3836        // removes the file and tolerates the now-absent entry.
3837        let fs = MemFs::new()
3838            .with_file("gone.mp3", b"DATA".to_vec())
3839            .with_file("gone/cover.jpg", b"jpg".to_vec());
3840        let mut manifest = Manifest::new();
3841        let mut e = entry("gone.mp3", AudioFormat::Mp3);
3842        e.cover_jpg = Some(ArtifactState {
3843            path: "gone/cover.jpg".to_owned(),
3844            hash: "h1".to_owned(),
3845        });
3846        manifest.insert("gone", e);
3847        let plan = Plan {
3848            actions: vec![
3849                Action::Delete {
3850                    path: "gone.mp3".to_owned(),
3851                    clip_id: "gone".to_owned(),
3852                },
3853                Action::DeleteArtifact {
3854                    kind: ArtifactKind::CoverJpg,
3855                    path: "gone/cover.jpg".to_owned(),
3856                    owner_id: "gone".to_owned(),
3857                },
3858            ],
3859        };
3860
3861        let outcome = run(
3862            &plan,
3863            &mut manifest,
3864            &[],
3865            &ScriptedHttp::new(),
3866            &fs,
3867            &StubFfmpeg::flac(),
3868            &RecordingClock::new(),
3869            &ExecOptions::default(),
3870        );
3871
3872        assert_eq!(outcome.deleted, 1);
3873        assert_eq!(outcome.artifacts_deleted, 1);
3874        assert_eq!(outcome.failed(), 0);
3875        assert!(!fs.exists("gone.mp3"));
3876        assert!(!fs.exists("gone/cover.jpg"));
3877        assert!(manifest.get("gone").is_none());
3878    }
3879
3880    #[test]
3881    fn write_stem_mp3_stores_raw_and_records_slot() {
3882        // An MP3 stem is downloaded straight from its CDN url and stored verbatim
3883        // (no transcode, no WAV render): the bytes land at the `.mp3` path and the
3884        // keyed slot records the path and hash.
3885        let mut manifest = Manifest::new();
3886        manifest.insert("a", entry("a.flac", AudioFormat::Flac));
3887        let plan = Plan {
3888            actions: vec![Action::WriteStem {
3889                clip_id: "a".to_owned(),
3890                key: "voc".to_owned(),
3891                stem_id: "voc".to_owned(),
3892                path: "a.stems/a - Vocals [voc].mp3".to_owned(),
3893                source_url: "https://cdn1.suno.ai/voc.mp3".to_owned(),
3894                format: StemFormat::Mp3,
3895                hash: "vh".to_owned(),
3896            }],
3897        };
3898        let http = ScriptedHttp::new().route("voc.mp3", Reply::ok(b"stem-bytes".to_vec()));
3899        let fs = MemFs::new();
3900
3901        let outcome = run(
3902            &plan,
3903            &mut manifest,
3904            &[],
3905            &http,
3906            &fs,
3907            &StubFfmpeg::flac(),
3908            &RecordingClock::new(),
3909            &ExecOptions::default(),
3910        );
3911
3912        assert_eq!(outcome.artifacts_written, 1);
3913        assert_eq!(outcome.failed(), 0);
3914        // Bytes are stored exactly as delivered (no transcode applied).
3915        assert_eq!(
3916            fs.read_file("a.stems/a - Vocals [voc].mp3").unwrap(),
3917            b"stem-bytes"
3918        );
3919        // An MP3 stem never renders WAV: no convert_wav, no generation.
3920        assert_eq!(http.count("convert_wav"), 0);
3921        assert_eq!(http.count("/api/gen/"), 0);
3922        assert_eq!(
3923            manifest.get("a").unwrap().stems.get("voc"),
3924            Some(&ArtifactState {
3925                path: "a.stems/a - Vocals [voc].mp3".to_owned(),
3926                hash: "vh".to_owned(),
3927            })
3928        );
3929    }
3930
3931    #[test]
3932    fn write_stem_wav_renders_via_convert_wav_and_stores_raw() {
3933        // A WAV stem (the default) renders the stem clip's lossless WAV through the
3934        // free convert_wav flow keyed on the stem id, then downloads and stores it
3935        // RAW as `.wav` — it is NEVER transcoded to FLAC, even for a FLAC song.
3936        let mut manifest = Manifest::new();
3937        manifest.insert("a", entry("a.flac", AudioFormat::Flac));
3938        let plan = Plan {
3939            actions: vec![Action::WriteStem {
3940                clip_id: "a".to_owned(),
3941                key: "voc".to_owned(),
3942                stem_id: "stemvoc".to_owned(),
3943                path: "a.stems/a - Vocals [stemvoc].wav".to_owned(),
3944                source_url: "https://cdn1.suno.ai/stemvoc.mp3".to_owned(),
3945                format: StemFormat::Wav,
3946                hash: "vh".to_owned(),
3947            }],
3948        };
3949        // wav_file is not ready on the first poll, so the flow POSTs convert_wav
3950        // (free) and polls again — exactly the main FLAC/WAV render path.
3951        let http = ScriptedHttp::new()
3952            .with_auth()
3953            .route_seq(
3954                "stemvoc/wav_file/",
3955                vec![
3956                    Reply::json("{}"),
3957                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/stemvoc.wav"}"#),
3958                ],
3959            )
3960            .route("stemvoc/convert_wav/", Reply::status(200))
3961            .route("stemvoc.wav", Reply::ok(b"RIFFwav-bytes".to_vec()));
3962        let fs = MemFs::new();
3963
3964        let outcome = run(
3965            &plan,
3966            &mut manifest,
3967            &[],
3968            &http,
3969            &fs,
3970            &StubFfmpeg::flac(),
3971            &RecordingClock::new(),
3972            &small_poll(),
3973        );
3974
3975        assert_eq!(outcome.artifacts_written, 1);
3976        assert_eq!(outcome.failed(), 0);
3977        // The rendered WAV is stored verbatim; ffmpeg (WAV->FLAC) is never invoked,
3978        // so the stored bytes are the raw WAV, not a FLAC transcode.
3979        assert_eq!(
3980            fs.read_file("a.stems/a - Vocals [stemvoc].wav").unwrap(),
3981            b"RIFFwav-bytes"
3982        );
3983        assert!(!fs.exists("a.stems/a - Vocals [stemvoc].flac"));
3984        // The free WAV render ran; no credit-spending generation endpoint did.
3985        assert_eq!(http.count("convert_wav"), 1);
3986        assert_eq!(http.count("stem_task"), 0);
3987        assert_eq!(http.count("separate"), 0);
3988        assert_eq!(
3989            manifest.get("a").unwrap().stems.get("voc").unwrap().path,
3990            "a.stems/a - Vocals [stemvoc].wav"
3991        );
3992    }
3993
3994    #[test]
3995    fn write_stem_is_skipped_when_owner_audio_is_absent() {
3996        // No owning manifest entry (audio failed or never existed) => skip with
3997        // no fetch and no write, so a stem is never stranded without its song.
3998        let mut manifest = Manifest::new();
3999        let plan = Plan {
4000            actions: vec![Action::WriteStem {
4001                clip_id: "ghost".to_owned(),
4002                key: "voc".to_owned(),
4003                stem_id: "voc".to_owned(),
4004                path: "ghost.stems/voc.mp3".to_owned(),
4005                source_url: "https://cdn1.suno.ai/voc.mp3".to_owned(),
4006                format: StemFormat::Mp3,
4007                hash: "vh".to_owned(),
4008            }],
4009        };
4010        // Empty HTTP script: any fetch would error, proving none happens.
4011        let http = ScriptedHttp::new();
4012        let fs = MemFs::new();
4013
4014        let outcome = run(
4015            &plan,
4016            &mut manifest,
4017            &[],
4018            &http,
4019            &fs,
4020            &StubFfmpeg::flac(),
4021            &RecordingClock::new(),
4022            &ExecOptions::default(),
4023        );
4024
4025        assert_eq!(outcome.skipped, 1);
4026        assert_eq!(outcome.artifacts_written, 0);
4027        assert_eq!(outcome.failed(), 0);
4028        assert!(!fs.exists("ghost.stems/voc.mp3"));
4029    }
4030
4031    #[test]
4032    fn write_stem_relocates_the_old_file_on_a_path_move() {
4033        // The song was renamed, so the stem moves: the new file is written and the
4034        // stale copy at the previously tracked path is removed (moved, not orphaned).
4035        let fs = MemFs::new().with_file("old.stems/voc.mp3", b"old".to_vec());
4036        let mut manifest = Manifest::new();
4037        let mut e = entry("new.flac", AudioFormat::Flac);
4038        e.stems.insert(
4039            "voc".to_owned(),
4040            ArtifactState {
4041                path: "old.stems/voc.mp3".to_owned(),
4042                hash: "vh".to_owned(),
4043            },
4044        );
4045        manifest.insert("a", e);
4046        let plan = Plan {
4047            actions: vec![Action::WriteStem {
4048                clip_id: "a".to_owned(),
4049                key: "voc".to_owned(),
4050                stem_id: "voc".to_owned(),
4051                path: "new.stems/voc.mp3".to_owned(),
4052                source_url: "https://cdn1.suno.ai/voc.mp3".to_owned(),
4053                format: StemFormat::Mp3,
4054                hash: "vh".to_owned(),
4055            }],
4056        };
4057        let http = ScriptedHttp::new().route("voc.mp3", Reply::ok(b"new".to_vec()));
4058
4059        let outcome = run(
4060            &plan,
4061            &mut manifest,
4062            &[],
4063            &http,
4064            &fs,
4065            &StubFfmpeg::flac(),
4066            &RecordingClock::new(),
4067            &ExecOptions::default(),
4068        );
4069
4070        assert_eq!(outcome.artifacts_written, 1);
4071        assert!(fs.exists("new.stems/voc.mp3"));
4072        assert!(
4073            !fs.exists("old.stems/voc.mp3"),
4074            "the old stem is moved, not left behind"
4075        );
4076        assert_eq!(
4077            manifest.get("a").unwrap().stems.get("voc").unwrap().path,
4078            "new.stems/voc.mp3"
4079        );
4080    }
4081
4082    #[test]
4083    fn delete_stem_removes_file_and_clears_slot() {
4084        let fs = MemFs::new().with_file("a.stems/voc.mp3", b"stem".to_vec());
4085        let mut manifest = Manifest::new();
4086        let mut e = entry("a.flac", AudioFormat::Flac);
4087        e.stems.insert(
4088            "voc".to_owned(),
4089            ArtifactState {
4090                path: "a.stems/voc.mp3".to_owned(),
4091                hash: "vh".to_owned(),
4092            },
4093        );
4094        manifest.insert("a", e);
4095        let plan = Plan {
4096            actions: vec![Action::DeleteStem {
4097                clip_id: "a".to_owned(),
4098                key: "voc".to_owned(),
4099                path: "a.stems/voc.mp3".to_owned(),
4100            }],
4101        };
4102
4103        let outcome = run(
4104            &plan,
4105            &mut manifest,
4106            &[],
4107            &ScriptedHttp::new(),
4108            &fs,
4109            &StubFfmpeg::flac(),
4110            &RecordingClock::new(),
4111            &ExecOptions::default(),
4112        );
4113
4114        assert_eq!(outcome.artifacts_deleted, 1);
4115        assert!(!fs.exists("a.stems/voc.mp3"));
4116        assert!(manifest.get("a").unwrap().stems.is_empty());
4117    }
4118
4119    #[test]
4120    fn co_deleting_the_last_stem_prunes_the_stems_folder() {
4121        // Deleting a song co-deletes its stems; the emptied `.stems` folder is
4122        // pruned by the end-of-run sweep, so it can never be orphaned.
4123        let fs = MemFs::new()
4124            .with_file("song.flac", b"DATA".to_vec())
4125            .with_file("song.stems/voc.mp3", b"stem".to_vec());
4126        assert!(fs.has_dir("song.stems"));
4127        let mut manifest = Manifest::new();
4128        let mut e = entry("song.flac", AudioFormat::Flac);
4129        e.stems.insert(
4130            "voc".to_owned(),
4131            ArtifactState {
4132                path: "song.stems/voc.mp3".to_owned(),
4133                hash: "vh".to_owned(),
4134            },
4135        );
4136        manifest.insert("a", e);
4137        let plan = Plan {
4138            actions: vec![
4139                Action::Delete {
4140                    path: "song.flac".to_owned(),
4141                    clip_id: "a".to_owned(),
4142                },
4143                Action::DeleteStem {
4144                    clip_id: "a".to_owned(),
4145                    key: "voc".to_owned(),
4146                    path: "song.stems/voc.mp3".to_owned(),
4147                },
4148            ],
4149        };
4150
4151        let outcome = run(
4152            &plan,
4153            &mut manifest,
4154            &[],
4155            &ScriptedHttp::new(),
4156            &fs,
4157            &StubFfmpeg::flac(),
4158            &RecordingClock::new(),
4159            &ExecOptions::default(),
4160        );
4161
4162        assert_eq!(outcome.deleted, 1);
4163        assert_eq!(outcome.artifacts_deleted, 1);
4164        assert!(!fs.exists("song.flac"));
4165        assert!(!fs.exists("song.stems/voc.mp3"));
4166        assert!(
4167            !fs.has_dir("song.stems"),
4168            "the emptied .stems folder is pruned"
4169        );
4170        assert!(manifest.get("a").is_none());
4171    }
4172
4173    #[test]
4174    fn write_stem_mp3_never_issues_a_generation_post() {
4175        // The MP3 stem path is GET-only: writing a stem fetches its CDN url and
4176        // never POSTs, let alone to any generation or WAV-render endpoint.
4177        let mut manifest = Manifest::new();
4178        manifest.insert("a", entry("a.flac", AudioFormat::Flac));
4179        let plan = Plan {
4180            actions: vec![Action::WriteStem {
4181                clip_id: "a".to_owned(),
4182                key: "voc".to_owned(),
4183                stem_id: "voc".to_owned(),
4184                path: "a.stems/voc.mp3".to_owned(),
4185                source_url: "https://cdn1.suno.ai/voc.mp3".to_owned(),
4186                format: StemFormat::Mp3,
4187                hash: "vh".to_owned(),
4188            }],
4189        };
4190        let http = ScriptedHttp::new().route("voc.mp3", Reply::ok(b"stem".to_vec()));
4191
4192        run(
4193            &plan,
4194            &mut manifest,
4195            &[],
4196            &http,
4197            &MemFs::new(),
4198            &StubFfmpeg::flac(),
4199            &RecordingClock::new(),
4200            &ExecOptions::default(),
4201        );
4202
4203        assert_eq!(
4204            http.count("stem_task"),
4205            0,
4206            "no generation endpoint is ever hit"
4207        );
4208        assert_eq!(http.count("convert_wav"), 0);
4209        assert_eq!(http.count("/api/gen/"), 0);
4210    }
4211
4212    #[test]
4213    fn full_stems_mirror_mp3_is_get_only_with_zero_gen_traffic() {
4214        // End-to-end #100 path with MP3 stems: list a clip's existing stems (free
4215        // GET over the live page-count + 0-indexed page shape), reconcile them into
4216        // WriteStem actions, and execute (download) them. With MP3 the whole flow
4217        // is GET-only and touches NO `/api/gen/` endpoint at all.
4218        let http = ScriptedHttp::new()
4219            .with_auth()
4220            .route("clip1/stems/pages", Reply::json(r#"{"pages": 1}"#))
4221            .route(
4222                "clip1/stems?page=0",
4223                Reply::json(
4224                    r#"{"stems":[
4225                        {"id":"s1","title":"Song (Vocals)","status":"complete","audio_url":"https://cdn1.suno.ai/s1.mp3"},
4226                        {"id":"s2","title":"Song (Drums)","status":"complete","audio_url":"https://cdn1.suno.ai/s2.mp3"}
4227                    ]}"#,
4228                ),
4229            )
4230            .route("s1.mp3", Reply::ok(b"vocals-bytes".to_vec()))
4231            .route("s2.mp3", Reply::ok(b"drums-bytes".to_vec()));
4232
4233        // List the existing stems through the client (GET-only, free).
4234        let mut auth = ClerkAuth::new("eyJtoken");
4235        pollster::block_on(auth.authenticate(&http)).unwrap();
4236        let mut client = SunoClient::new(auth, RecordingClock::new());
4237        let (stems, complete) = pollster::block_on(client.list_stems(&http, "clip1")).unwrap();
4238        assert!(complete);
4239        assert_eq!(stems.len(), 2);
4240        assert_eq!(stems[0].label, "Vocals");
4241
4242        // Reconcile the listed MP3 stems into a plan (audio already present -> Skip).
4243        let mut manifest = Manifest::new();
4244        manifest.insert("clip1", entry("clip1.flac", AudioFormat::Flac));
4245        let desired_stems: Vec<crate::reconcile::DesiredStem> = stems
4246            .iter()
4247            .map(|s| crate::reconcile::DesiredStem {
4248                key: s.id.clone(),
4249                stem_id: s.id.clone(),
4250                path: format!("clip1.stems/{}.mp3", s.id),
4251                source_url: s.url.clone(),
4252                format: StemFormat::Mp3,
4253                hash: crate::art_url_hash(&s.url),
4254            })
4255            .collect();
4256        let d = Desired {
4257            path: "clip1.flac".to_owned(),
4258            stems: Some(desired_stems),
4259            ..desired(clip("clip1"), AudioFormat::Flac)
4260        };
4261        let local: HashMap<String, crate::reconcile::LocalFile> = [(
4262            "clip1".to_owned(),
4263            crate::reconcile::LocalFile {
4264                exists: true,
4265                size: 100,
4266            },
4267        )]
4268        .into_iter()
4269        .collect();
4270        let sources = [crate::reconcile::SourceStatus {
4271            mode: SourceMode::Mirror,
4272            fully_enumerated: true,
4273        }];
4274        let plan =
4275            crate::reconcile::reconcile(&manifest, std::slice::from_ref(&d), &local, &sources);
4276        assert_eq!(plan.stem_writes(), 2);
4277
4278        let fs = MemFs::new();
4279        let outcome = run(
4280            &plan,
4281            &mut manifest,
4282            std::slice::from_ref(&d),
4283            &http,
4284            &fs,
4285            &StubFfmpeg::flac(),
4286            &RecordingClock::new(),
4287            &ExecOptions::default(),
4288        );
4289
4290        assert_eq!(outcome.artifacts_written, 2, "both stems downloaded");
4291        assert_eq!(fs.read_file("clip1.stems/s1.mp3").unwrap(), b"vocals-bytes");
4292        assert_eq!(fs.read_file("clip1.stems/s2.mp3").unwrap(), b"drums-bytes");
4293        // The MP3 mirror path never touches any /api/gen/ endpoint (no render, no
4294        // generation, no separation).
4295        assert_eq!(http.count("/api/gen/"), 0);
4296        assert_eq!(http.count("stem_task"), 0);
4297        assert_eq!(http.count("separate"), 0);
4298        assert_eq!(http.count("generate"), 0);
4299        // No stem is ever written as FLAC.
4300        assert!(!fs.exists("clip1.stems/s1.flac"));
4301    }
4302
4303    #[test]
4304    fn full_stems_mirror_wav_default_renders_free_wav_and_no_generation() {
4305        // End-to-end #100 path with WAV stems (the default): each stem's lossless
4306        // WAV is rendered through the FREE convert_wav flow and stored RAW as
4307        // `.wav`. The mirror makes NO credit-spending generation POST.
4308        let http = ScriptedHttp::new()
4309            .with_auth()
4310            .route("clip1/stems/pages", Reply::json(r#"{"pages": 1}"#))
4311            .route(
4312                "clip1/stems?page=0",
4313                Reply::json(
4314                    r#"{"stems":[
4315                        {"id":"s1","title":"Song (Vocals)","status":"complete","audio_url":"https://cdn1.suno.ai/s1.mp3"},
4316                        {"id":"s2","title":"Song (Drums)","status":"complete","audio_url":"https://cdn1.suno.ai/s2.mp3"}
4317                    ]}"#,
4318                ),
4319            )
4320            // Each stem's WAV is already rendered, so wav_file returns the url and
4321            // no convert_wav POST is even needed (still free either way).
4322            .route(
4323                "s1/wav_file/",
4324                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/s1.wav"}"#),
4325            )
4326            .route(
4327                "s2/wav_file/",
4328                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/s2.wav"}"#),
4329            )
4330            .route("s1.wav", Reply::ok(b"RIFFvocals".to_vec()))
4331            .route("s2.wav", Reply::ok(b"RIFFdrums".to_vec()));
4332
4333        let mut auth = ClerkAuth::new("eyJtoken");
4334        pollster::block_on(auth.authenticate(&http)).unwrap();
4335        let mut client = SunoClient::new(auth, RecordingClock::new());
4336        let (stems, _complete) = pollster::block_on(client.list_stems(&http, "clip1")).unwrap();
4337
4338        let mut manifest = Manifest::new();
4339        manifest.insert("clip1", entry("clip1.flac", AudioFormat::Flac));
4340        let desired_stems: Vec<crate::reconcile::DesiredStem> = stems
4341            .iter()
4342            .map(|s| crate::reconcile::DesiredStem {
4343                key: s.id.clone(),
4344                stem_id: s.id.clone(),
4345                path: format!("clip1.stems/{}.wav", s.id),
4346                source_url: s.url.clone(),
4347                format: StemFormat::Wav,
4348                hash: crate::art_url_hash(&s.url),
4349            })
4350            .collect();
4351        let d = Desired {
4352            path: "clip1.flac".to_owned(),
4353            stems: Some(desired_stems),
4354            ..desired(clip("clip1"), AudioFormat::Flac)
4355        };
4356        let local: HashMap<String, crate::reconcile::LocalFile> = [(
4357            "clip1".to_owned(),
4358            crate::reconcile::LocalFile {
4359                exists: true,
4360                size: 100,
4361            },
4362        )]
4363        .into_iter()
4364        .collect();
4365        let sources = [crate::reconcile::SourceStatus {
4366            mode: SourceMode::Mirror,
4367            fully_enumerated: true,
4368        }];
4369        let plan =
4370            crate::reconcile::reconcile(&manifest, std::slice::from_ref(&d), &local, &sources);
4371
4372        let fs = MemFs::new();
4373        let outcome = run(
4374            &plan,
4375            &mut manifest,
4376            std::slice::from_ref(&d),
4377            &http,
4378            &fs,
4379            &StubFfmpeg::flac(),
4380            &RecordingClock::new(),
4381            &small_poll(),
4382        );
4383
4384        assert_eq!(outcome.artifacts_written, 2);
4385        // Stems are stored RAW as WAV (no FLAC transcode, even for a FLAC song).
4386        assert_eq!(fs.read_file("clip1.stems/s1.wav").unwrap(), b"RIFFvocals");
4387        assert_eq!(fs.read_file("clip1.stems/s2.wav").unwrap(), b"RIFFdrums");
4388        assert!(!fs.exists("clip1.stems/s1.flac"));
4389        // No credit-spending generation/separation endpoint is ever hit.
4390        assert_eq!(http.count("stem_task"), 0);
4391        assert_eq!(http.count("separate"), 0);
4392        assert_eq!(http.count("generate"), 0);
4393    }
4394
4395    #[test]
4396    fn write_artifact_is_skipped_when_the_owner_audio_is_absent() {
4397        // A clip whose Download fails leaves no manifest entry, so its following
4398        // WriteArtifact must not strand an untracked sidecar: it is skipped with
4399        // no fetch and no write. A following healthy clip still succeeds.
4400        let ca = clip("a");
4401        let plan = Plan {
4402            actions: vec![
4403                Action::Download {
4404                    clip: ca.clone(),
4405                    lineage: LineageContext::own_root(&ca),
4406                    path: "a.mp3".to_owned(),
4407                    format: AudioFormat::Mp3,
4408                },
4409                Action::WriteArtifact {
4410                    kind: ArtifactKind::CoverJpg,
4411                    path: "a/cover.jpg".to_owned(),
4412                    source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
4413                    hash: "h1".to_owned(),
4414                    owner_id: "a".to_owned(),
4415                    content: None,
4416                },
4417                Action::WriteArtifact {
4418                    kind: ArtifactKind::CoverJpg,
4419                    path: "b/cover.jpg".to_owned(),
4420                    source_url: "https://art.suno.ai/b/large.jpg".to_owned(),
4421                    hash: "h2".to_owned(),
4422                    owner_id: "b".to_owned(),
4423                    content: None,
4424                },
4425            ],
4426        };
4427        // The Download's audio 404s (permanent), so no entry for "a" is created.
4428        let http = ScriptedHttp::new()
4429            .route("a.mp3", Reply::status(404))
4430            .route("a/large.jpg", Reply::ok(b"jpg-a".to_vec()))
4431            .route("b/large.jpg", Reply::ok(b"jpg-b".to_vec()));
4432        let fs = MemFs::new();
4433        let mut manifest = Manifest::new();
4434        // "b" already has audio (a prior-run clip), so its sidecar write proceeds.
4435        manifest.insert("b", entry("b.mp3", AudioFormat::Mp3));
4436
4437        let outcome = run(
4438            &plan,
4439            &mut manifest,
4440            &[],
4441            &http,
4442            &fs,
4443            &StubFfmpeg::flac(),
4444            &RecordingClock::new(),
4445            &ExecOptions::default(),
4446        );
4447
4448        assert_eq!(outcome.status, RunStatus::Completed);
4449        // The audio download is the only failure; the orphan artifact is skipped.
4450        assert_eq!(outcome.failed(), 1);
4451        assert_eq!(outcome.failures[0].clip_id, "a");
4452        assert_eq!(outcome.skipped, 1);
4453        // The orphan sidecar was neither fetched nor written, and left no record.
4454        assert_eq!(http.count("a/large.jpg"), 0);
4455        assert!(!fs.exists("a/cover.jpg"));
4456        assert!(manifest.get("a").is_none());
4457        // The healthy clip's sidecar still succeeded.
4458        assert_eq!(outcome.artifacts_written, 1);
4459        assert_eq!(fs.read_file("b/cover.jpg").unwrap(), b"jpg-b");
4460        assert!(manifest.get("b").unwrap().cover_jpg.is_some());
4461    }
4462
4463    #[test]
4464    fn write_artifact_transcodes_animated_cover_to_webp() {
4465        // A CoverWebp fetches the clip's MP4 preview, runs it through the ffmpeg
4466        // port, and writes the transcoded WebP (not the fetched MP4), recording
4467        // the sidecar on the owning entry.
4468        let mut manifest = Manifest::new();
4469        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
4470        let plan = Plan {
4471            actions: vec![Action::WriteArtifact {
4472                kind: ArtifactKind::CoverWebp,
4473                path: "a/cover.webp".to_owned(),
4474                source_url: "https://cdn.suno.ai/a/video.mp4".to_owned(),
4475                hash: "v1".to_owned(),
4476                owner_id: "a".to_owned(),
4477                content: None,
4478            }],
4479        };
4480        let http = ScriptedHttp::new().route("a/video.mp4", Reply::ok(b"mp4-bytes".to_vec()));
4481        let fs = MemFs::new();
4482        let ffmpeg = StubFfmpeg::webp();
4483
4484        let outcome = run(
4485            &plan,
4486            &mut manifest,
4487            &[],
4488            &http,
4489            &fs,
4490            &ffmpeg,
4491            &RecordingClock::new(),
4492            &ExecOptions::default(),
4493        );
4494
4495        assert_eq!(outcome.artifacts_written, 1);
4496        assert_eq!(outcome.failed(), 0);
4497        assert_eq!(outcome.status, RunStatus::Completed);
4498        // The fetched MP4 was transcoded: the file holds the ffmpeg WebP output.
4499        assert_eq!(http.count("a/video.mp4"), 1);
4500        let written = fs.read_file("a/cover.webp").unwrap();
4501        assert_ne!(written, b"mp4-bytes");
4502        assert!(written.starts_with(b"RIFF"));
4503        assert_eq!(
4504            manifest.get("a").unwrap().cover_webp,
4505            Some(ArtifactState {
4506                path: "a/cover.webp".to_owned(),
4507                hash: "v1".to_owned(),
4508            })
4509        );
4510    }
4511
4512    #[test]
4513    fn write_artifact_webp_transcode_failure_is_per_clip() {
4514        // A transcode failure is attributed to the owning clip: it is a per-clip
4515        // failure, the run completes, no sidecar is written, and the slot stays
4516        // empty. A healthy static cover in the same run still succeeds.
4517        let mut manifest = Manifest::new();
4518        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
4519        manifest.insert("b", entry("b.mp3", AudioFormat::Mp3));
4520        let plan = Plan {
4521            actions: vec![
4522                Action::WriteArtifact {
4523                    kind: ArtifactKind::CoverWebp,
4524                    path: "a/cover.webp".to_owned(),
4525                    source_url: "https://cdn.suno.ai/a/video.mp4".to_owned(),
4526                    hash: "v1".to_owned(),
4527                    owner_id: "a".to_owned(),
4528                    content: None,
4529                },
4530                Action::WriteArtifact {
4531                    kind: ArtifactKind::CoverJpg,
4532                    path: "b/cover.jpg".to_owned(),
4533                    source_url: "https://art.suno.ai/b/large.jpg".to_owned(),
4534                    hash: "h1".to_owned(),
4535                    owner_id: "b".to_owned(),
4536                    content: None,
4537                },
4538            ],
4539        };
4540        let http = ScriptedHttp::new()
4541            .route("a/video.mp4", Reply::ok(b"mp4-bytes".to_vec()))
4542            .route("b/large.jpg", Reply::ok(b"jpg-b".to_vec()));
4543        let fs = MemFs::new();
4544
4545        let outcome = run(
4546            &plan,
4547            &mut manifest,
4548            &[],
4549            &http,
4550            &fs,
4551            &StubFfmpeg::failing(),
4552            &RecordingClock::new(),
4553            &ExecOptions::default(),
4554        );
4555
4556        assert_eq!(outcome.status, RunStatus::Completed);
4557        assert_eq!(outcome.failed(), 1);
4558        assert_eq!(outcome.failures[0].clip_id, "a");
4559        // The animated cover failed to transcode: nothing written, slot empty.
4560        assert!(!fs.exists("a/cover.webp"));
4561        assert_eq!(manifest.get("a").unwrap().cover_webp, None);
4562        // The static cover in the same run still succeeded.
4563        assert_eq!(outcome.artifacts_written, 1);
4564        assert_eq!(fs.read_file("b/cover.jpg").unwrap(), b"jpg-b");
4565        assert!(manifest.get("b").unwrap().cover_jpg.is_some());
4566    }
4567
4568    #[test]
4569    fn write_artifact_uses_configured_webp_settings() {
4570        use std::sync::{Arc, Mutex};
4571
4572        struct RecordingWebpFfmpeg {
4573            seen: Arc<Mutex<Vec<WebpEncodeSettings>>>,
4574        }
4575
4576        impl Ffmpeg for RecordingWebpFfmpeg {
4577            async fn wav_to_flac(
4578                &self,
4579                _wav: &[u8],
4580            ) -> Result<Vec<u8>, crate::ffmpeg::FfmpegError> {
4581                Ok(Vec::new())
4582            }
4583
4584            async fn mp4_to_webp(
4585                &self,
4586                _mp4: &[u8],
4587                settings: WebpEncodeSettings,
4588            ) -> Result<Vec<u8>, crate::ffmpeg::FfmpegError> {
4589                let seen = Arc::clone(&self.seen);
4590                seen.lock().unwrap().push(settings);
4591                Ok(b"RIFF\x00\x00\x00\x00WEBP".to_vec())
4592            }
4593        }
4594
4595        let mut manifest = Manifest::new();
4596        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
4597        let plan = Plan {
4598            actions: vec![Action::WriteArtifact {
4599                kind: ArtifactKind::CoverWebp,
4600                path: "a/cover.webp".to_owned(),
4601                source_url: "https://cdn.suno.ai/a/video.mp4".to_owned(),
4602                hash: "v1".to_owned(),
4603                owner_id: "a".to_owned(),
4604                content: None,
4605            }],
4606        };
4607        let seen = Arc::new(Mutex::new(Vec::new()));
4608        let ffmpeg = RecordingWebpFfmpeg {
4609            seen: Arc::clone(&seen),
4610        };
4611        let opts = ExecOptions {
4612            cover_webp: WebpEncodeSettings {
4613                quality: 88,
4614                max_fps: 12,
4615                max_width: Some(720),
4616                lossless: false,
4617                compression_level: 4,
4618            },
4619            ..ExecOptions::default()
4620        };
4621
4622        let _ = run(
4623            &plan,
4624            &mut manifest,
4625            &[],
4626            &ScriptedHttp::new().route("a/video.mp4", Reply::ok(b"mp4-bytes".to_vec())),
4627            &MemFs::new(),
4628            &ffmpeg,
4629            &RecordingClock::new(),
4630            &opts,
4631        );
4632
4633        assert_eq!(
4634            seen.lock().unwrap().as_slice(),
4635            &[WebpEncodeSettings {
4636                quality: 88,
4637                max_fps: 12,
4638                max_width: Some(720),
4639                lossless: false,
4640                compression_level: 4,
4641            }]
4642        );
4643    }
4644
4645    // ── Phase 8: folder art routes to the album store ───────────────
4646
4647    #[test]
4648    fn folder_jpg_write_records_album_state_and_skips_manifest() {
4649        // Folder art is owned by the album root id, not a manifest clip: it
4650        // writes even with an empty manifest and records on the album store.
4651        let mut manifest = Manifest::new();
4652        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
4653        let plan = Plan {
4654            actions: vec![Action::WriteArtifact {
4655                kind: ArtifactKind::FolderJpg,
4656                path: "creator/album/folder.jpg".to_owned(),
4657                source_url: "https://art.suno.ai/root/large.jpg".to_owned(),
4658                hash: "jh".to_owned(),
4659                owner_id: "root".to_owned(),
4660                content: None,
4661            }],
4662        };
4663        let http = ScriptedHttp::new().route("root/large.jpg", Reply::ok(b"folder-jpg".to_vec()));
4664        let fs = MemFs::new();
4665
4666        let outcome = run_with_albums(
4667            &plan,
4668            &mut manifest,
4669            &mut albums,
4670            &[],
4671            &http,
4672            &fs,
4673            &StubFfmpeg::flac(),
4674            &RecordingClock::new(),
4675            &ExecOptions::default(),
4676        );
4677
4678        assert_eq!(outcome.artifacts_written, 1);
4679        assert_eq!(outcome.status, RunStatus::Completed);
4680        assert_eq!(
4681            fs.read_file("creator/album/folder.jpg").unwrap(),
4682            b"folder-jpg"
4683        );
4684        assert_eq!(
4685            albums.get("root").unwrap().folder_jpg,
4686            Some(ArtifactState {
4687                path: "creator/album/folder.jpg".to_owned(),
4688                hash: "jh".to_owned(),
4689            })
4690        );
4691        assert!(manifest.get("root").is_none());
4692    }
4693
4694    #[test]
4695    fn folder_webp_write_transcodes_and_records_album_state() {
4696        let mut manifest = Manifest::new();
4697        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
4698        let plan = Plan {
4699            actions: vec![Action::WriteArtifact {
4700                kind: ArtifactKind::FolderWebp,
4701                path: "creator/album/cover.webp".to_owned(),
4702                source_url: "https://cdn.suno.ai/root/video.mp4".to_owned(),
4703                hash: "wh".to_owned(),
4704                owner_id: "root".to_owned(),
4705                content: None,
4706            }],
4707        };
4708        let http = ScriptedHttp::new().route("root/video.mp4", Reply::ok(b"mp4-bytes".to_vec()));
4709        let fs = MemFs::new();
4710
4711        let outcome = run_with_albums(
4712            &plan,
4713            &mut manifest,
4714            &mut albums,
4715            &[],
4716            &http,
4717            &fs,
4718            &StubFfmpeg::webp(),
4719            &RecordingClock::new(),
4720            &ExecOptions::default(),
4721        );
4722
4723        assert_eq!(outcome.artifacts_written, 1);
4724        assert_eq!(outcome.failed(), 0);
4725        // The MP4 was transcoded to WebP, not written verbatim.
4726        let written = fs.read_file("creator/album/cover.webp").unwrap();
4727        assert_ne!(written, b"mp4-bytes");
4728        assert!(written.starts_with(b"RIFF"));
4729        assert_eq!(
4730            albums.get("root").unwrap().folder_webp,
4731            Some(ArtifactState {
4732                path: "creator/album/cover.webp".to_owned(),
4733                hash: "wh".to_owned(),
4734            })
4735        );
4736    }
4737
4738    #[test]
4739    fn folder_mp4_write_keeps_the_source_verbatim() {
4740        let mut manifest = Manifest::new();
4741        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
4742        let plan = Plan {
4743            actions: vec![Action::WriteArtifact {
4744                kind: ArtifactKind::FolderMp4,
4745                path: "creator/album/cover.mp4".to_owned(),
4746                source_url: "https://cdn.suno.ai/root/video.mp4".to_owned(),
4747                hash: "mh".to_owned(),
4748                owner_id: "root".to_owned(),
4749                content: None,
4750            }],
4751        };
4752        let http = ScriptedHttp::new().route("root/video.mp4", Reply::ok(b"mp4-bytes".to_vec()));
4753        let fs = MemFs::new();
4754
4755        let outcome = run_with_albums(
4756            &plan,
4757            &mut manifest,
4758            &mut albums,
4759            &[],
4760            &http,
4761            &fs,
4762            &StubFfmpeg::webp(),
4763            &RecordingClock::new(),
4764            &ExecOptions::default(),
4765        );
4766
4767        assert_eq!(outcome.artifacts_written, 1);
4768        assert_eq!(outcome.failed(), 0);
4769        // The raw MP4 is written byte-for-byte, never transcoded.
4770        assert_eq!(
4771            fs.read_file("creator/album/cover.mp4").unwrap(),
4772            b"mp4-bytes"
4773        );
4774        assert_eq!(
4775            albums.get("root").unwrap().folder_mp4,
4776            Some(ArtifactState {
4777                path: "creator/album/cover.mp4".to_owned(),
4778                hash: "mh".to_owned(),
4779            })
4780        );
4781    }
4782
4783    #[test]
4784    fn both_folder_covers_fetch_the_video_cover_once() {
4785        let mut manifest = Manifest::new();
4786        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
4787        // `both` retention keeps cover.webp (transcoded) and cover.mp4 (raw) from
4788        // the one video_cover_url. FolderWebp sorts first and caches the fetched
4789        // source; FolderMp4 drains it, so the source is fetched exactly once.
4790        let plan = Plan {
4791            actions: vec![
4792                Action::WriteArtifact {
4793                    kind: ArtifactKind::FolderWebp,
4794                    path: "creator/album/cover.webp".to_owned(),
4795                    source_url: "https://cdn.suno.ai/root/video.mp4".to_owned(),
4796                    hash: "wh".to_owned(),
4797                    owner_id: "root".to_owned(),
4798                    content: None,
4799                },
4800                Action::WriteArtifact {
4801                    kind: ArtifactKind::FolderMp4,
4802                    path: "creator/album/cover.mp4".to_owned(),
4803                    source_url: "https://cdn.suno.ai/root/video.mp4".to_owned(),
4804                    hash: "mh".to_owned(),
4805                    owner_id: "root".to_owned(),
4806                    content: None,
4807                },
4808            ],
4809        };
4810        let http = ScriptedHttp::new().route("root/video.mp4", Reply::ok(b"mp4-bytes".to_vec()));
4811        let fs = MemFs::new();
4812
4813        let outcome = run_with_albums(
4814            &plan,
4815            &mut manifest,
4816            &mut albums,
4817            &[],
4818            &http,
4819            &fs,
4820            &StubFfmpeg::webp(),
4821            &RecordingClock::new(),
4822            &ExecOptions::default(),
4823        );
4824
4825        assert_eq!(outcome.artifacts_written, 2);
4826        assert_eq!(outcome.failed(), 0);
4827        // Fetched exactly once despite two artifacts consuming it (#90 / #89).
4828        assert_eq!(http.count("root/video.mp4"), 1);
4829        // The webp is transcoded; the mp4 is the raw source verbatim.
4830        assert!(
4831            fs.read_file("creator/album/cover.webp")
4832                .unwrap()
4833                .starts_with(b"RIFF")
4834        );
4835        assert_eq!(
4836            fs.read_file("creator/album/cover.mp4").unwrap(),
4837            b"mp4-bytes"
4838        );
4839    }
4840
4841    #[test]
4842    fn folder_art_delete_clears_album_state() {
4843        let fs = MemFs::new().with_file("creator/album/folder.jpg", b"jpg".to_vec());
4844        let mut manifest = Manifest::new();
4845        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
4846        albums.insert(
4847            "root".to_owned(),
4848            AlbumArt {
4849                folder_jpg: Some(ArtifactState {
4850                    path: "creator/album/folder.jpg".to_owned(),
4851                    hash: "jh".to_owned(),
4852                }),
4853                folder_webp: None,
4854                folder_mp4: None,
4855            },
4856        );
4857        let plan = Plan {
4858            actions: vec![Action::DeleteArtifact {
4859                kind: ArtifactKind::FolderJpg,
4860                path: "creator/album/folder.jpg".to_owned(),
4861                owner_id: "root".to_owned(),
4862            }],
4863        };
4864
4865        let outcome = run_with_albums(
4866            &plan,
4867            &mut manifest,
4868            &mut albums,
4869            &[],
4870            &ScriptedHttp::new(),
4871            &fs,
4872            &StubFfmpeg::flac(),
4873            &RecordingClock::new(),
4874            &ExecOptions::default(),
4875        );
4876
4877        assert_eq!(outcome.artifacts_deleted, 1);
4878        assert!(!fs.exists("creator/album/folder.jpg"));
4879        // The album row had only the one kind, so it is pruned entirely.
4880        assert!(!albums.contains_key("root"));
4881    }
4882
4883    // ── Phase 9: playlist artifacts ─────────────────────────────────
4884
4885    #[test]
4886    fn playlist_write_uses_inline_content_and_records_state() {
4887        // A playlist body is generated, carried inline. With an empty manifest
4888        // and NO http routes, the write still succeeds — proving it skipped the
4889        // network — and records the playlist store keyed by the playlist id.
4890        let mut manifest = Manifest::new();
4891        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
4892        let mut playlists: BTreeMap<String, PlaylistState> = BTreeMap::new();
4893        let body = "#EXTM3U\n#PLAYLIST:Road Trip\n#EXTINF:60,One\nA/One.flac\n";
4894        let plan = Plan {
4895            actions: vec![Action::WriteArtifact {
4896                kind: ArtifactKind::Playlist,
4897                path: "Road Trip.m3u8".to_owned(),
4898                source_url: String::new(),
4899                hash: "ph1".to_owned(),
4900                owner_id: "pl1".to_owned(),
4901                content: Some(body.to_owned()),
4902            }],
4903        };
4904        let fs = MemFs::new();
4905
4906        let outcome = run_full(
4907            &plan,
4908            &mut manifest,
4909            &mut albums,
4910            &mut playlists,
4911            &[],
4912            &ScriptedHttp::new(),
4913            &fs,
4914            &StubFfmpeg::flac(),
4915            &RecordingClock::new(),
4916            &ExecOptions::default(),
4917        );
4918
4919        assert_eq!(outcome.artifacts_written, 1);
4920        assert_eq!(outcome.failed(), 0);
4921        // The exact inline bytes were written, verbatim.
4922        assert_eq!(fs.read_file("Road Trip.m3u8").unwrap(), body.as_bytes());
4923        assert_eq!(
4924            playlists.get("pl1"),
4925            Some(&PlaylistState {
4926                name: "Road Trip".to_owned(),
4927                path: "Road Trip.m3u8".to_owned(),
4928                hash: "ph1".to_owned(),
4929            })
4930        );
4931    }
4932
4933    #[test]
4934    fn playlist_delete_removes_file_and_clears_state() {
4935        let fs = MemFs::new().with_file("Old.m3u8", b"#EXTM3U\n".to_vec());
4936        let mut manifest = Manifest::new();
4937        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
4938        let mut playlists: BTreeMap<String, PlaylistState> = BTreeMap::new();
4939        playlists.insert(
4940            "pl1".to_owned(),
4941            PlaylistState {
4942                name: "Old".to_owned(),
4943                path: "Old.m3u8".to_owned(),
4944                hash: "ph1".to_owned(),
4945            },
4946        );
4947        let plan = Plan {
4948            actions: vec![Action::DeleteArtifact {
4949                kind: ArtifactKind::Playlist,
4950                path: "Old.m3u8".to_owned(),
4951                owner_id: "pl1".to_owned(),
4952            }],
4953        };
4954
4955        let outcome = run_full(
4956            &plan,
4957            &mut manifest,
4958            &mut albums,
4959            &mut playlists,
4960            &[],
4961            &ScriptedHttp::new(),
4962            &fs,
4963            &StubFfmpeg::flac(),
4964            &RecordingClock::new(),
4965            &ExecOptions::default(),
4966        );
4967
4968        assert_eq!(outcome.artifacts_deleted, 1);
4969        assert!(!fs.exists("Old.m3u8"));
4970        assert!(
4971            !playlists.contains_key("pl1"),
4972            "the playlist row is cleared on delete"
4973        );
4974    }
4975
4976    // ── Phase 10: old-sidecar cleanup on move + empty-dir prune ──────
4977
4978    #[test]
4979    fn rename_move_relocates_cover_and_prunes_old_album() {
4980        // A title/album change moves the audio (Rename) and re-emits the cover
4981        // at the NEW path. The old cover must be removed and the now-empty old
4982        // album directory pruned, leaving no orphan sidecar and no ghost dir.
4983        let mut manifest = Manifest::new();
4984        let mut e = entry("Creator/AlbumA/song.flac", AudioFormat::Flac);
4985        e.cover_jpg = Some(ArtifactState {
4986            path: "Creator/AlbumA/cover.jpg".to_owned(),
4987            hash: "h1".to_owned(),
4988        });
4989        manifest.insert("a", e);
4990        let fs = MemFs::new()
4991            .with_file("Creator/AlbumA/song.flac", b"AUDIO".to_vec())
4992            .with_file("Creator/AlbumA/cover.jpg", b"old-jpg".to_vec());
4993        let plan = Plan {
4994            actions: vec![
4995                Action::Rename {
4996                    from: "Creator/AlbumA/song.flac".to_owned(),
4997                    to: "Creator/AlbumB/song.flac".to_owned(),
4998                },
4999                Action::WriteArtifact {
5000                    kind: ArtifactKind::CoverJpg,
5001                    path: "Creator/AlbumB/cover.jpg".to_owned(),
5002                    source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
5003                    hash: "h1".to_owned(),
5004                    owner_id: "a".to_owned(),
5005                    content: None,
5006                },
5007            ],
5008        };
5009        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"new-jpg".to_vec()));
5010
5011        let outcome = run(
5012            &plan,
5013            &mut manifest,
5014            &[],
5015            &http,
5016            &fs,
5017            &StubFfmpeg::flac(),
5018            &RecordingClock::new(),
5019            &ExecOptions::default(),
5020        );
5021
5022        assert_eq!(outcome.failed(), 0);
5023        // Audio moved, the new cover was written, the old cover removed.
5024        assert!(fs.exists("Creator/AlbumB/song.flac"));
5025        assert_eq!(
5026            fs.read_file("Creator/AlbumB/cover.jpg").unwrap(),
5027            b"new-jpg"
5028        );
5029        assert!(!fs.exists("Creator/AlbumA/cover.jpg"));
5030        assert!(!fs.exists("Creator/AlbumA/song.flac"));
5031        // The manifest cover slot now points at the new path.
5032        assert_eq!(
5033            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
5034            "Creator/AlbumB/cover.jpg"
5035        );
5036        // The emptied old album directory is pruned; the new one survives.
5037        assert!(!fs.has_dir("Creator/AlbumA"));
5038        assert!(fs.has_dir("Creator/AlbumB"));
5039    }
5040
5041    #[test]
5042    fn rename_move_relocates_folder_art_and_prunes_old_album() {
5043        // An album rename moves folder.jpg: the old file is removed, the album
5044        // store slot advanced to the new path, and the emptied dir pruned.
5045        let mut manifest = Manifest::new();
5046        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
5047        albums.insert(
5048            "root".to_owned(),
5049            AlbumArt {
5050                folder_jpg: Some(ArtifactState {
5051                    path: "Creator/AlbumA/folder.jpg".to_owned(),
5052                    hash: "jh".to_owned(),
5053                }),
5054                folder_webp: None,
5055                folder_mp4: None,
5056            },
5057        );
5058        let fs = MemFs::new().with_file("Creator/AlbumA/folder.jpg", b"old-folder".to_vec());
5059        let plan = Plan {
5060            actions: vec![Action::WriteArtifact {
5061                kind: ArtifactKind::FolderJpg,
5062                path: "Creator/AlbumB/folder.jpg".to_owned(),
5063                source_url: "https://art.suno.ai/root/large.jpg".to_owned(),
5064                hash: "jh".to_owned(),
5065                owner_id: "root".to_owned(),
5066                content: None,
5067            }],
5068        };
5069        let http = ScriptedHttp::new().route("root/large.jpg", Reply::ok(b"new-folder".to_vec()));
5070
5071        let outcome = run_with_albums(
5072            &plan,
5073            &mut manifest,
5074            &mut albums,
5075            &[],
5076            &http,
5077            &fs,
5078            &StubFfmpeg::flac(),
5079            &RecordingClock::new(),
5080            &ExecOptions::default(),
5081        );
5082
5083        assert_eq!(outcome.failed(), 0);
5084        assert_eq!(
5085            fs.read_file("Creator/AlbumB/folder.jpg").unwrap(),
5086            b"new-folder"
5087        );
5088        assert!(!fs.exists("Creator/AlbumA/folder.jpg"));
5089        assert_eq!(
5090            albums
5091                .get("root")
5092                .unwrap()
5093                .folder_jpg
5094                .as_ref()
5095                .unwrap()
5096                .path,
5097            "Creator/AlbumB/folder.jpg"
5098        );
5099        assert!(!fs.has_dir("Creator/AlbumA"));
5100        assert!(fs.has_dir("Creator/AlbumB"));
5101    }
5102
5103    #[test]
5104    fn prune_empty_dirs_removes_only_empty_dirs() {
5105        // A direct exercise of the prune port's safety guarantees on a mixed
5106        // tree: nested empties go, anything holding a file (hidden ones too)
5107        // stays, and no file is touched.
5108        let fs = MemFs::new()
5109            .with_file("keep/full/song.flac", b"x".to_vec())
5110            .with_file("hidden/.suno-manifest.json", b"{}".to_vec())
5111            .with_dir("empty/leaf")
5112            .with_dir("nested/a/b/c");
5113
5114        fs.prune_empty_dirs("").unwrap();
5115
5116        // Every empty directory, however deeply nested, is pruned bottom-up.
5117        for gone in [
5118            "empty",
5119            "empty/leaf",
5120            "nested",
5121            "nested/a",
5122            "nested/a/b",
5123            "nested/a/b/c",
5124        ] {
5125            assert!(!fs.has_dir(gone), "empty dir {gone} should be pruned");
5126        }
5127        // A directory holding any file — including only a hidden dotfile — stays.
5128        assert!(fs.has_dir("keep"));
5129        assert!(fs.has_dir("keep/full"));
5130        assert!(fs.has_dir("hidden"));
5131        // No file was touched.
5132        assert!(fs.exists("keep/full/song.flac"));
5133        assert!(fs.exists("hidden/.suno-manifest.json"));
5134    }
5135
5136    #[test]
5137    fn prune_empty_dirs_never_removes_the_named_root() {
5138        // Pruning under a named root clears its empty children but keeps the
5139        // root itself, even when the root is now empty.
5140        let fs = MemFs::new().with_dir("empty/leaf");
5141        fs.prune_empty_dirs("empty").unwrap();
5142        assert!(fs.has_dir("empty"), "the named root is never removed");
5143        assert!(!fs.has_dir("empty/leaf"));
5144    }
5145
5146    #[test]
5147    fn old_sidecar_remove_failure_is_per_clip_and_converges_next_run() {
5148        // If removing the old sidecar fails, the write is a per-clip failure
5149        // that never aborts the run and does NOT advance the state slot, so the
5150        // next identical run re-attempts the cleanup and the tree converges.
5151        let mut manifest = Manifest::new();
5152        let mut e = entry("a.flac", AudioFormat::Flac);
5153        e.cover_jpg = Some(ArtifactState {
5154            path: "AlbumA/cover.jpg".to_owned(),
5155            hash: "h1".to_owned(),
5156        });
5157        manifest.insert("a", e);
5158        let fs = MemFs::new()
5159            .with_file("a.flac", b"AUDIO".to_vec())
5160            .with_file("AlbumA/cover.jpg", b"old".to_vec());
5161        let plan = Plan {
5162            actions: vec![Action::WriteArtifact {
5163                kind: ArtifactKind::CoverJpg,
5164                path: "AlbumB/cover.jpg".to_owned(),
5165                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
5166                hash: "h1".to_owned(),
5167                owner_id: "a".to_owned(),
5168                content: None,
5169            }],
5170        };
5171        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"new".to_vec()));
5172
5173        // Run 1: the old-cover remove is forced to fail.
5174        fs.arm_fail_remove("AlbumA/cover.jpg");
5175        let first = run(
5176            &plan,
5177            &mut manifest,
5178            &[],
5179            &http,
5180            &fs,
5181            &StubFfmpeg::flac(),
5182            &RecordingClock::new(),
5183            &ExecOptions::default(),
5184        );
5185        assert_eq!(
5186            first.status,
5187            RunStatus::Completed,
5188            "a remove failure never aborts the run"
5189        );
5190        assert_eq!(first.failed(), 1);
5191        // The new cover is written but the old one lingers and the slot is stale.
5192        assert!(fs.exists("AlbumB/cover.jpg"));
5193        assert!(fs.exists("AlbumA/cover.jpg"));
5194        assert_eq!(
5195            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
5196            "AlbumA/cover.jpg"
5197        );
5198        assert!(fs.has_dir("AlbumA"), "the orphan keeps its directory alive");
5199
5200        // Run 2: the same plan re-runs with the fault cleared and converges.
5201        fs.disarm_fail_remove("AlbumA/cover.jpg");
5202        let second = run(
5203            &plan,
5204            &mut manifest,
5205            &[],
5206            &http,
5207            &fs,
5208            &StubFfmpeg::flac(),
5209            &RecordingClock::new(),
5210            &ExecOptions::default(),
5211        );
5212        assert_eq!(second.failed(), 0);
5213        assert!(fs.exists("AlbumB/cover.jpg"));
5214        assert!(!fs.exists("AlbumA/cover.jpg"), "no orphan persists");
5215        assert_eq!(
5216            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
5217            "AlbumB/cover.jpg"
5218        );
5219        assert!(!fs.has_dir("AlbumA"), "the emptied directory is pruned");
5220    }
5221
5222    #[test]
5223    fn same_path_artifact_rewrite_does_no_remove_and_prunes_nothing() {
5224        // The idempotent case: a content-only cover rewrite (hash drift, path
5225        // unchanged) attempts no remove and prunes no live directory. A remove
5226        // failure is armed on the cover path, so any spurious remove would
5227        // surface as a failure — none does.
5228        let mut manifest = Manifest::new();
5229        let mut e = entry("Album/a.mp3", AudioFormat::Mp3);
5230        e.cover_jpg = Some(ArtifactState {
5231            path: "Album/cover.jpg".to_owned(),
5232            hash: "h1".to_owned(),
5233        });
5234        manifest.insert("a", e);
5235        let fs = MemFs::new()
5236            .with_file("Album/a.mp3", b"AUDIO".to_vec())
5237            .with_file("Album/cover.jpg", b"old".to_vec());
5238        fs.arm_fail_remove("Album/cover.jpg");
5239        let plan = Plan {
5240            actions: vec![Action::WriteArtifact {
5241                kind: ArtifactKind::CoverJpg,
5242                path: "Album/cover.jpg".to_owned(),
5243                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
5244                hash: "h2".to_owned(),
5245                owner_id: "a".to_owned(),
5246                content: None,
5247            }],
5248        };
5249        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"new".to_vec()));
5250
5251        let outcome = run(
5252            &plan,
5253            &mut manifest,
5254            &[],
5255            &http,
5256            &fs,
5257            &StubFfmpeg::flac(),
5258            &RecordingClock::new(),
5259            &ExecOptions::default(),
5260        );
5261
5262        assert_eq!(
5263            outcome.failed(),
5264            0,
5265            "no remove is attempted, so the armed failure never fires"
5266        );
5267        assert_eq!(outcome.artifacts_written, 1);
5268        assert_eq!(fs.read_file("Album/cover.jpg").unwrap(), b"new");
5269        assert_eq!(
5270            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().hash,
5271            "h2"
5272        );
5273        // The live directory is untouched by prune.
5274        assert!(fs.has_dir("Album"));
5275    }
5276
5277    // ── Concurrency (issue #22) ─────────────────────────────────────
5278
5279    mod concurrency {
5280        use super::*;
5281        use crate::ffmpeg::FfmpegError;
5282        use crate::fs::{FileStat, FsError};
5283        use crate::http::{HttpRequest, TransportError};
5284        use std::future::Future;
5285        use std::pin::Pin;
5286        use std::sync::Arc;
5287        use std::sync::atomic::{AtomicUsize, Ordering};
5288        use std::task::{Context, Poll};
5289
5290        /// A future that pends exactly once before resolving, waking itself so a
5291        /// single-threaded executor re-polls. It forces the [`Http`] port to
5292        /// yield, so [`buffer_unordered`](futures_util::stream::StreamExt) parks
5293        /// each in-flight request and the true overlap becomes observable.
5294        #[derive(Default)]
5295        struct YieldOnce {
5296            yielded: bool,
5297        }
5298
5299        impl Future for YieldOnce {
5300            type Output = ();
5301            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
5302                if self.yielded {
5303                    Poll::Ready(())
5304                } else {
5305                    self.yielded = true;
5306                    cx.waker().wake_by_ref();
5307                    Poll::Pending
5308                }
5309            }
5310        }
5311
5312        /// An [`Http`] double that wraps [`ScriptedHttp`] and records the peak
5313        /// number of concurrently in-flight requests. Each `send` bumps a live
5314        /// counter, yields once (so peers can start), then delegates.
5315        struct GatedHttp {
5316            inner: ScriptedHttp,
5317            inflight: Arc<AtomicUsize>,
5318            peak: Arc<AtomicUsize>,
5319        }
5320
5321        impl GatedHttp {
5322            fn new(inner: ScriptedHttp) -> Self {
5323                Self {
5324                    inner,
5325                    inflight: Arc::new(AtomicUsize::new(0)),
5326                    peak: Arc::new(AtomicUsize::new(0)),
5327                }
5328            }
5329
5330            fn peak(&self) -> usize {
5331                self.peak.load(Ordering::SeqCst)
5332            }
5333        }
5334
5335        impl Http for GatedHttp {
5336            async fn send(&self, request: HttpRequest) -> Result<HttpResponse, TransportError> {
5337                let now = self.inflight.fetch_add(1, Ordering::SeqCst) + 1;
5338                self.peak.fetch_max(now, Ordering::SeqCst);
5339                YieldOnce::default().await;
5340                let out = self.inner.send(request).await;
5341                self.inflight.fetch_sub(1, Ordering::SeqCst);
5342                out
5343            }
5344        }
5345
5346        fn download(id: &str, format: AudioFormat) -> (Clip, Desired, Action) {
5347            let c = clip(id);
5348            let d = desired(c.clone(), format);
5349            let action = Action::Download {
5350                clip: c.clone(),
5351                lineage: LineageContext::own_root(&c),
5352                path: d.path.clone(),
5353                format,
5354            };
5355            (c, d, action)
5356        }
5357
5358        fn opts_with(concurrency: u32) -> ExecOptions {
5359            ExecOptions {
5360                concurrency,
5361                ..small_poll()
5362            }
5363        }
5364
5365        #[test]
5366        fn concurrency_never_exceeds_the_configured_bound() {
5367            let count = 6;
5368            let concurrency = 3;
5369            let mut scripted = ScriptedHttp::new().with_auth();
5370            let mut actions = Vec::new();
5371            let mut desireds = Vec::new();
5372            for i in 0..count {
5373                let id = format!("c{i}");
5374                scripted = scripted.route(&format!("{id}.mp3"), Reply::ok(b"mp3-body".to_vec()));
5375                let (_c, d, action) = download(&id, AudioFormat::Mp3);
5376                actions.push(action);
5377                desireds.push(d);
5378            }
5379            let http = GatedHttp::new(scripted);
5380            let fs = MemFs::new();
5381            let plan = Plan { actions };
5382            let mut manifest = Manifest::new();
5383
5384            let outcome = run_gated_fs(
5385                &plan,
5386                &mut manifest,
5387                &desireds,
5388                &http,
5389                &fs,
5390                &opts_with(concurrency),
5391            );
5392
5393            assert_eq!(outcome.downloaded, count);
5394            assert!(
5395                http.peak() <= concurrency as usize,
5396                "peak {} exceeded the bound {concurrency}",
5397                http.peak()
5398            );
5399            assert_eq!(
5400                http.peak(),
5401                concurrency as usize,
5402                "expected the run to saturate the bound"
5403            );
5404        }
5405
5406        /// Run a gated plan against a caller-supplied [`MemFs`], returning the
5407        /// outcome. The client is built here so the limiter can be inspected by
5408        /// the caller-facing variant below.
5409        fn run_gated_fs(
5410            plan: &Plan,
5411            manifest: &mut Manifest,
5412            desired: &[Desired],
5413            http: &GatedHttp,
5414            fs: &MemFs,
5415            opts: &ExecOptions,
5416        ) -> ExecOutcome {
5417            let ffmpeg = StubFfmpeg::flac();
5418            let clock = RecordingClock::new();
5419            let mut albums = BTreeMap::new();
5420            let mut playlists = BTreeMap::new();
5421            let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
5422            pollster::block_on(execute(
5423                plan,
5424                manifest,
5425                &mut albums,
5426                &mut playlists,
5427                desired,
5428                &HashMap::new(),
5429                Ports {
5430                    client: &mut client,
5431                    http,
5432                    fs,
5433                    ffmpeg: &ffmpeg,
5434                    clock: &clock,
5435                },
5436                opts,
5437            ))
5438        }
5439
5440        #[test]
5441        fn a_failing_clip_does_not_abort_the_others() {
5442            let mut scripted = ScriptedHttp::new().with_auth();
5443            scripted = scripted
5444                .route("ok1.mp3", Reply::ok(b"one".to_vec()))
5445                .route("bad.mp3", Reply::status(404))
5446                .route("ok2.mp3", Reply::ok(b"two".to_vec()));
5447            let (_a, d1, a1) = download("ok1", AudioFormat::Mp3);
5448            let (_b, d2, a2) = download("bad", AudioFormat::Mp3);
5449            let (_c, d3, a3) = download("ok2", AudioFormat::Mp3);
5450            let http = GatedHttp::new(scripted);
5451            let fs = MemFs::new();
5452            let plan = Plan {
5453                actions: vec![a1, a2, a3],
5454            };
5455            let mut manifest = Manifest::new();
5456
5457            let outcome = run_gated_fs(
5458                &plan,
5459                &mut manifest,
5460                &[d1, d2, d3],
5461                &http,
5462                &fs,
5463                &opts_with(3),
5464            );
5465
5466            assert_eq!(outcome.downloaded, 2);
5467            assert_eq!(outcome.failed(), 1);
5468            assert_eq!(outcome.status, RunStatus::Completed);
5469            assert_eq!(outcome.failures[0].clip_id, "bad");
5470            assert!(manifest.get("ok1").is_some());
5471            assert!(manifest.get("ok2").is_some());
5472            assert!(manifest.get("bad").is_none());
5473        }
5474
#[test]
fn outcome_is_identical_across_concurrency_levels() {
    // One plan, executed twice: once with a concurrency of 1 and once with 8.
    // It holds four succeeding downloads, one failing download spliced into
    // the middle of the audio actions, and two serial phase-2 actions (a skip
    // and a delete), so both phases feed the outcome that is compared.
    let (mut actions, mut desireds): (Vec<_>, Vec<_>) = ["a", "b", "c", "d"]
        .into_iter()
        .map(|id| {
            let (_clip, wanted, action) = download(id, AudioFormat::Mp3);
            (action, wanted)
        })
        .unzip();

    // The failing download goes in the middle of the audio set.
    let (_clip, failing_wanted, failing_action) = download("fail", AudioFormat::Mp3);
    actions.insert(2, failing_action);
    desireds.push(failing_wanted);

    // Phase-2 actions close the plan.
    actions.extend([
        Action::Skip {
            clip_id: "gone".to_owned(),
        },
        Action::Delete {
            path: "old.mp3".to_owned(),
            clip_id: "old".to_owned(),
        },
    ]);
    let plan = Plan { actions };

    // Every run starts from an identical, freshly built HTTP script, in-memory
    // filesystem and seeded manifest; only the concurrency option differs.
    let [(serial_outcome, serial_manifest), (parallel_outcome, parallel_manifest)] =
        [1, 8].map(|workers| {
            let mut scripted = ScriptedHttp::new().with_auth();
            for id in ["a", "b", "c", "d"] {
                scripted = scripted.route(&format!("{id}.mp3"), Reply::ok(id.as_bytes().to_vec()));
            }
            let gated = GatedHttp::new(scripted.route("fail.mp3", Reply::status(404)));

            let fs = MemFs::new().with_file("old.mp3", b"x".to_vec());
            let mut manifest = Manifest::new();
            manifest.insert("old".to_owned(), entry("old.mp3", AudioFormat::Mp3));

            let outcome = run_gated_fs(
                &plan,
                &mut manifest,
                &desireds,
                &gated,
                &fs,
                &opts_with(workers),
            );
            (outcome, manifest)
        });

    assert_eq!(
        serial_outcome, parallel_outcome,
        "outcome must not depend on concurrency"
    );
    assert_eq!(
        serial_manifest, parallel_manifest,
        "final manifest must not depend on concurrency"
    );
    assert_eq!(parallel_outcome.downloaded, 4);
    assert_eq!(parallel_outcome.deleted, 1);
    assert_eq!(parallel_outcome.skipped, 1);
    assert_eq!(parallel_outcome.failed(), 1);
}
5549
#[test]
fn a_systemic_disk_full_aborts_promptly() {
    // Eight MP3 downloads run with a concurrency of two. The write of the very
    // first clip reports out-of-space, which is a systemic failure: the run
    // must end with a disk-full status before every clip has been downloaded.
    let total = 8;
    let workers = 2;
    let ids: Vec<String> = (0..total).map(|i| format!("d{i}")).collect();

    // Every clip has a servable body; only the filesystem misbehaves.
    let scripted = ids
        .iter()
        .fold(ScriptedHttp::new().with_auth(), |http, id| {
            http.route(&format!("{id}.mp3"), Reply::ok(b"mp3-body".to_vec()))
        });
    let (actions, desireds): (Vec<_>, Vec<_>) = ids
        .iter()
        .map(|id| {
            let (_clip, wanted, action) = download(id, AudioFormat::Mp3);
            (action, wanted)
        })
        .unzip();

    let gated = GatedHttp::new(scripted);
    let fs = MemFs::new().fail_write_out_of_space("d0.mp3");
    let mut manifest = Manifest::new();
    let plan = Plan { actions };

    let outcome = run_gated_fs(
        &plan,
        &mut manifest,
        &desireds,
        &gated,
        &fs,
        &opts_with(workers),
    );

    assert_eq!(outcome.status, RunStatus::DiskFull);
    assert!(
        outcome.downloaded < total,
        "a systemic abort must stop remaining work, downloaded {}",
        outcome.downloaded
    );
}
5586
#[test]
fn limiter_records_a_rate_limit_under_concurrent_calls() {
    // Three FLAC renders run with a concurrency of three. Only clip "x" is
    // throttled, and only once: its wav_file lookup answers 429 before it
    // succeeds. The limiter shared by the renders must end up having recorded
    // exactly that one 429 (rate halved from 2.0 to 1.0), with no update lost
    // or applied twice.
    let scripted = ScriptedHttp::new()
        .with_auth()
        .route_seq(
            "/gen/x/wav_file/",
            vec![
                Reply::status(429),
                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/x.wav"}"#),
            ],
        )
        .route(
            "/gen/y/wav_file/",
            Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/y.wav"}"#),
        )
        .route(
            "/gen/z/wav_file/",
            Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/z.wav"}"#),
        )
        .route("x.wav", Reply::ok(b"wav-x".to_vec()))
        .route("y.wav", Reply::ok(b"wav-y".to_vec()))
        .route("z.wav", Reply::ok(b"wav-z".to_vec()));

    let (actions, desireds): (Vec<_>, Vec<_>) = ["x", "y", "z"]
        .into_iter()
        .map(|id| {
            let (_clip, wanted, action) = download(id, AudioFormat::Flac);
            (action, wanted)
        })
        .unzip();
    let plan = Plan { actions };

    // Ports and mutable state handed to the executor.
    let fs = MemFs::new();
    let ffmpeg = StubFfmpeg::flac();
    let clock = RecordingClock::new();
    let mut manifest = Manifest::new();
    let mut albums = BTreeMap::new();
    let mut playlists = BTreeMap::new();
    let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());

    let ports = Ports {
        client: &mut client,
        http: &scripted,
        fs: &fs,
        ffmpeg: &ffmpeg,
        clock: &clock,
    };
    let outcome = pollster::block_on(execute(
        &plan,
        &mut manifest,
        &mut albums,
        &mut playlists,
        &desireds,
        &HashMap::new(),
        ports,
        &opts_with(3),
    ));

    assert_eq!(outcome.downloaded, 3);
    assert_eq!(outcome.failed(), 0);
    let rate = client.limiter_rate();
    assert!(
        (rate - 1.0).abs() < 1e-9,
        "one 429 must halve the rate to 1.0, got {}",
        rate
    );
}
5655
#[test]
fn a_download_is_committed_in_plan_order_around_a_rename() {
    // The plan first renames "orig" from shared.mp3 to moved.mp3, then
    // downloads a new clip into shared.mp3. If a parallel executor wrote the
    // download's destination out of plan order, shared.mp3 would receive the
    // fresh bytes before the rename ran; the rename would then carry those
    // bytes to moved.mp3 and leave shared.mp3 stranded, corrupting both clips.
    // With every destination effect committed serially in plan order,
    // moved.mp3 keeps the original and shared.mp3 receives the new download.
    let incoming = clip("new");
    let wanted = {
        let mut wanted = desired(incoming.clone(), AudioFormat::Mp3);
        wanted.path = "shared.mp3".to_owned();
        wanted
    };

    let move_original = Action::Rename {
        from: "shared.mp3".to_owned(),
        to: "moved.mp3".to_owned(),
    };
    let fetch_new = Action::Download {
        clip: incoming.clone(),
        lineage: LineageContext::own_root(&incoming),
        path: "shared.mp3".to_owned(),
        format: AudioFormat::Mp3,
    };
    let plan = Plan {
        actions: vec![move_original, fetch_new],
    };

    let gated = GatedHttp::new(
        ScriptedHttp::new()
            .with_auth()
            .route("new.mp3", Reply::ok(b"NEW-BODY".to_vec())),
    );
    let fs = MemFs::new().with_file("shared.mp3", b"ORIGINAL".to_vec());
    let mut manifest = Manifest::new();
    manifest.insert("orig", entry("shared.mp3", AudioFormat::Mp3));

    let outcome = run_gated_fs(
        &plan,
        &mut manifest,
        &[wanted],
        &gated,
        &fs,
        &opts_with(4),
    );

    assert_eq!(outcome.renamed, 1);
    assert_eq!(outcome.downloaded, 1);

    // The renamed file still holds exactly what was on disk before the run.
    assert_eq!(
        fs.read_file("moved.mp3").as_deref(),
        Some(&b"ORIGINAL"[..]),
        "the rename must carry the original bytes, untouched by the download"
    );
    // The shared path was rewritten by the download.
    let landed = fs.read_file("shared.mp3").expect("new download must land");
    assert_ne!(
        landed, b"ORIGINAL",
        "the new download must replace the moved original, not corrupt it"
    );

    // Both manifest entries follow their files.
    assert_eq!(manifest.get("orig").unwrap().path, "moved.mp3");
    assert_eq!(manifest.get("new").unwrap().path, "shared.mp3");
}
5707
#[test]
fn an_aborted_reformat_leaves_the_old_file_and_manifest_consistent() {
    // The download that commits ahead of the reformat fails with disk-full,
    // a systemic abort. The reformat's slow render has no side effects, and
    // its destination write plus old-file removal belong to the serial commit
    // that the abort skips. So the old file must still exist and the manifest
    // must still reference it: never a removed-but-referenced file.
    let boom = clip("boom");
    let wanted_boom = {
        let mut wanted = desired(boom.clone(), AudioFormat::Mp3);
        wanted.path = "boom.mp3".to_owned();
        wanted
    };
    let reformer = clip("r");
    let wanted_reformer = desired(reformer.clone(), AudioFormat::Mp3);

    let failing_download = Action::Download {
        clip: boom.clone(),
        lineage: LineageContext::own_root(&boom),
        path: "boom.mp3".to_owned(),
        format: AudioFormat::Mp3,
    };
    let reformat = Action::Reformat {
        clip: reformer.clone(),
        path: "r_new.mp3".to_owned(),
        from_path: "r_old.flac".to_owned(),
        from: AudioFormat::Flac,
        to: AudioFormat::Mp3,
    };
    let plan = Plan {
        actions: vec![failing_download, reformat],
    };

    let gated = GatedHttp::new(
        ScriptedHttp::new()
            .with_auth()
            .route("boom.mp3", Reply::ok(b"boom-body".to_vec()))
            .route("r.mp3", Reply::ok(b"reformatted".to_vec())),
    );
    // Writing the download's destination reports out-of-space.
    let fs = MemFs::new()
        .with_file("r_old.flac", b"OLD-FLAC".to_vec())
        .fail_write_out_of_space("boom.mp3");
    let mut manifest = Manifest::new();
    manifest.insert("r", entry("r_old.flac", AudioFormat::Flac));

    let outcome = run_gated_fs(
        &plan,
        &mut manifest,
        &[wanted_boom, wanted_reformer],
        &gated,
        &fs,
        &opts_with(4),
    );

    assert_eq!(outcome.status, RunStatus::DiskFull);
    assert!(
        fs.exists("r_old.flac"),
        "the old file must survive the abort"
    );
    assert!(
        !fs.exists("r_new.mp3"),
        "no reformatted file may be written"
    );

    let tracked = manifest.get("r").expect("the manifest must still track r");
    assert_eq!(
        tracked.path, "r_old.flac",
        "the manifest must still point at the surviving old file"
    );
    assert_eq!(tracked.format, AudioFormat::Flac);
}
5774
#[test]
fn a_systemic_abort_leaves_no_untracked_destination_files() {
    // Five downloads in plan order; the write for the third one ("boom")
    // reports out-of-space, a systemic abort, so the clips after it never
    // commit. Whatever is left on disk must be tracked by the manifest:
    // producers write nothing themselves, so an abort cannot strand a file
    // from an in-flight or buffered render.
    let ids = ["a0", "a1", "boom", "a3", "a4"];

    let scripted = ids
        .into_iter()
        .fold(ScriptedHttp::new().with_auth(), |http, id| {
            http.route(&format!("{id}.mp3"), Reply::ok(b"body".to_vec()))
        });
    let (actions, desireds): (Vec<_>, Vec<_>) = ids
        .into_iter()
        .map(|id| {
            let (_clip, wanted, action) = download(id, AudioFormat::Mp3);
            (action, wanted)
        })
        .unzip();

    let gated = GatedHttp::new(scripted);
    let fs = MemFs::new().fail_write_out_of_space("boom.mp3");
    let mut manifest = Manifest::new();
    let plan = Plan { actions };

    let outcome = run_gated_fs(
        &plan,
        &mut manifest,
        &desireds,
        &gated,
        &fs,
        &opts_with(2),
    );

    assert_eq!(outcome.status, RunStatus::DiskFull);

    // Every path present on disk has to appear among the manifest's paths.
    let mut tracked = std::collections::BTreeSet::<String>::new();
    for entry in manifest.entries.values() {
        tracked.insert(entry.path.clone());
    }
    for path in fs.paths() {
        assert!(
            tracked.contains(&path),
            "found an untracked destination file: {path}"
        );
    }

    // The clips planned after the failing write were never committed.
    for uncommitted in ["a3.mp3", "a4.mp3"] {
        assert!(
            !fs.exists(uncommitted),
            "uncommitted renders must not be on disk"
        );
    }
}
5818
5819        /// An [`Ffmpeg`] double that counts how many rendered FLAC payloads are
5820        /// live: it bumps a shared counter (tracking the peak) when a transcode
5821        /// yields bytes, and [`CountingFs`] drops it back on the committing write.
5822        /// The [transcode, write] window is a superset of the true in-memory hold,
5823        /// so the observed peak upper-bounds the real one.
5824        struct CountingFfmpeg {
5825            inner: StubFfmpeg,
5826            held: Arc<AtomicUsize>,
5827            peak: Arc<AtomicUsize>,
5828        }
5829
5830        impl Ffmpeg for CountingFfmpeg {
5831            fn wav_to_flac(
5832                &self,
5833                wav: &[u8],
5834            ) -> impl Future<Output = Result<Vec<u8>, FfmpegError>> + Send {
5835                let fut = self.inner.wav_to_flac(wav);
5836                let held = self.held.clone();
5837                let peak = self.peak.clone();
5838                async move {
5839                    let out = fut.await;
5840                    if out.is_ok() {
5841                        let now = held.fetch_add(1, Ordering::SeqCst) + 1;
5842                        peak.fetch_max(now, Ordering::SeqCst);
5843                    }
5844                    out
5845                }
5846            }
5847
5848            fn mp4_to_webp(
5849                &self,
5850                mp4: &[u8],
5851                settings: WebpEncodeSettings,
5852            ) -> impl Future<Output = Result<Vec<u8>, FfmpegError>> + Send {
5853                self.inner.mp4_to_webp(mp4, settings)
5854            }
5855        }
5856
5857        /// A [`Filesystem`] double wrapping [`MemFs`] that decrements the live
5858        /// payload counter on each committing write, closing the window opened by
5859        /// [`CountingFfmpeg`].
5860        struct CountingFs {
5861            inner: MemFs,
5862            held: Arc<AtomicUsize>,
5863        }
5864
5865        impl Filesystem for CountingFs {
5866            fn write_atomic(&self, path: &str, bytes: &[u8]) -> Result<(), FsError> {
5867                let out = self.inner.write_atomic(path, bytes);
5868                self.held.fetch_sub(1, Ordering::SeqCst);
5869                out
5870            }
5871
5872            fn rename(&self, from: &str, to: &str) -> Result<(), FsError> {
5873                self.inner.rename(from, to)
5874            }
5875
5876            fn remove(&self, path: &str) -> Result<(), FsError> {
5877                self.inner.remove(path)
5878            }
5879
5880            fn prune_empty_dirs(&self, root: &str) -> Result<(), FsError> {
5881                self.inner.prune_empty_dirs(root)
5882            }
5883
5884            fn read(&self, path: &str) -> Result<Vec<u8>, FsError> {
5885                self.inner.read(path)
5886            }
5887
5888            fn metadata(&self, path: &str) -> Option<FileStat> {
5889                self.inner.metadata(path)
5890            }
5891        }
5892
#[test]
fn rendered_payloads_in_memory_stay_bounded_by_concurrency() {
    // Twelve FLAC clips against a concurrency of three: far more work than
    // the bound. The ordered buffered render should keep only about
    // `concurrency` transcoded payloads live at any moment (never the whole
    // library), so the observed peak must not exceed `concurrency + 1`.
    let count = 12;
    let concurrency = 3;
    let ids: Vec<String> = (0..count).map(|i| format!("f{i}")).collect();

    // Each clip gets a wav_file lookup plus the WAV body it points at.
    let scripted = ids
        .iter()
        .fold(ScriptedHttp::new().with_auth(), |http, id| {
            http.route(
                &format!("/gen/{id}/wav_file/"),
                Reply::json(&format!(
                    r#"{{"wav_file_url": "https://cdn1.suno.ai/{id}.wav"}}"#
                )),
            )
            .route(&format!("{id}.wav"), Reply::ok(b"wav-body".to_vec()))
        });
    let (actions, desireds): (Vec<_>, Vec<_>) = ids
        .iter()
        .map(|id| {
            let (_clip, wanted, action) = download(id, AudioFormat::Flac);
            (action, wanted)
        })
        .unzip();
    let plan = Plan { actions };
    let gated = GatedHttp::new(scripted);

    // The transcoder raises `held` (and `peak`); the filesystem lowers `held`.
    let held = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));
    let ffmpeg = CountingFfmpeg {
        inner: StubFfmpeg::flac(),
        held: Arc::clone(&held),
        peak: Arc::clone(&peak),
    };
    let fs = CountingFs {
        inner: MemFs::new(),
        held: Arc::clone(&held),
    };

    let clock = RecordingClock::new();
    let mut manifest = Manifest::new();
    let mut albums = BTreeMap::new();
    let mut playlists = BTreeMap::new();
    let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());

    let ports = Ports {
        client: &mut client,
        http: &gated,
        fs: &fs,
        ffmpeg: &ffmpeg,
        clock: &clock,
    };
    let outcome = pollster::block_on(execute(
        &plan,
        &mut manifest,
        &mut albums,
        &mut playlists,
        &desireds,
        &HashMap::new(),
        ports,
        &opts_with(concurrency),
    ));

    assert_eq!(outcome.downloaded, count as usize);
    assert_eq!(
        held.load(Ordering::SeqCst),
        0,
        "every payload must be committed"
    );

    // The run has finished, so the peak is stable and can be read once.
    let observed_peak = peak.load(Ordering::SeqCst);
    let bound = concurrency as usize + 1;
    assert!(
        observed_peak <= bound,
        "peak live payloads {} exceeded the bound {}",
        observed_peak,
        bound
    );
    assert!(
        observed_peak >= 2,
        "the render should genuinely overlap, peak was {}",
        observed_peak
    );
}
5971    }
5972}