Skip to main content

suno_core/
executor.rs

1//! The download executor: it applies a reconcile [`Plan`] to disk through ports.
2//!
3//! Reconcile decides *what* to do; the executor does it. It is async and pure
4//! orchestration: every side effect goes through a port ([`Http`] for the
5//! network, [`Filesystem`] for disk, [`Ffmpeg`] for transcoding, [`Clock`] for
6//! waiting), so the whole pipeline is exercised in tests with in-memory doubles
7//! and no real IO, network, or sleeping.
8//!
9//! Safety is the point of this module. A wrong write or delete damages the
10//! user's library, so the executor:
11//!
12//! - writes only atomically (SYNC-13): a failed write leaves the prior file
13//!   intact, because the [`Filesystem`] adapter stages a temp file and renames;
14//! - verifies size (SYNC-14): a download whose body disagrees with the
15//!   provider's `Content-Length` is treated as truncated and retried, and a
16//!   written file whose on-disk size disagrees with the bytes written is a
17//!   failure, never a recorded success;
18//! - classifies errors (SYNC-17): an auth failure or a full disk stops the
19//!   account run (with an auth or disk-full status) and is never retried;
20//!   transient failures (timeouts, 5xx,
21//!   transport, 429) are retried a bounded number of times then recorded and
22//!   skipped; permanent failures are recorded and skipped; and a single clip's
23//!   failure never aborts the run;
24//! - backs off on rate limits (SYNC-16) through the injected [`Clock`], honouring
25//!   a `Retry-After` hint.
26//!
27//! The executor only ever sets the manifest's [`preserve`](ManifestEntry::preserve)
28//! marker on an entry it writes, and only deletes a path whose removal the
29//! [`Filesystem`] confirms. Higher-level safety (empty-listing abort, the
30//! destructive-sync confirmation, exit codes) is the caller's job.
31
32use std::collections::BTreeMap;
33use std::collections::BTreeSet;
34use std::collections::HashMap;
35use std::collections::HashSet;
36use std::sync::Mutex;
37use std::time::Duration;
38
39use futures_util::lock::Mutex as AsyncMutex;
40use futures_util::stream::{self, StreamExt};
41
42use crate::backoff::{backoff_delay, retry_after};
43use crate::client::SunoClient;
44use crate::clock::Clock;
45use crate::config::{AudioFormat, StemFormat};
46use crate::error::Error;
47use crate::ffmpeg::{Ffmpeg, WebpEncodeSettings};
48use crate::fs::Filesystem;
49use crate::graph::{AlbumArt, PlaylistState};
50use crate::http::{Http, HttpRequest};
51use crate::lineage::LineageContext;
52use crate::lyrics::AlignedLyrics;
53use crate::manifest::{ArtifactState, Manifest, ManifestEntry};
54use crate::model::Clip;
55use crate::reconcile::{
56    Action, ArtifactKind, Desired, Plan, SourceMode, set_manifest_artifact, set_manifest_stem,
57};
58use crate::tag::{TrackMetadata, tag_flac, tag_mp3, tag_wav};
59
60/// The shared Suno client behind an async mutex, so concurrent audio work can
61/// serialise its order-sensitive API calls (JWT refresh, adaptive limiter)
62/// without a runtime-specific lock. Held only for the brief WAV-render calls;
63/// the heavy CDN/transcode/tag work runs unlocked.
64type ClientLock<'a, C> = AsyncMutex<&'a mut SunoClient<C>>;
65
66/// Tunables for one [`execute`] run.
67#[derive(Debug, Clone)]
68pub struct ExecOptions {
69    /// How many times a transient failure is retried before record-and-skip.
70    pub max_retries: u32,
71    /// How many times to poll for a server-side WAV render before giving up.
72    pub wav_poll_attempts: u32,
73    /// How long to wait between WAV render polls.
74    pub wav_poll_interval: Duration,
75    /// How many clips' audio to fetch, transcode, and tag concurrently. Clamped
76    /// to at least one, so a zero collapses to sequential rather than stalling.
77    pub concurrency: u32,
78    /// Settings used for animated WebP cover transcodes.
79    pub cover_webp: WebpEncodeSettings,
80}
81
82impl Default for ExecOptions {
83    fn default() -> Self {
84        Self {
85            max_retries: 3,
86            wav_poll_attempts: 24,
87            wav_poll_interval: Duration::from_secs(5),
88            concurrency: 4,
89            cover_webp: WebpEncodeSettings::default(),
90        }
91    }
92}
93
94/// How an [`execute`] run ended.
95#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
96pub enum RunStatus {
97    /// Every action was attempted; some may have failed and been skipped.
98    #[default]
99    Completed,
100    /// An auth failure stopped the run early; remaining actions were not tried.
101    AuthAborted,
102    /// The disk filled; the run stopped early rather than failing every
103    /// remaining clip. Remaining actions were not tried.
104    DiskFull,
105}
106
107/// One action that could not be applied, for the run summary and failure log.
108#[derive(Debug, Clone, PartialEq, Eq)]
109pub struct Failure {
110    /// The clip the failed action concerned (or a path when no id applies).
111    pub clip_id: String,
112    /// A short, secret-free reason.
113    pub reason: String,
114}
115
116/// The result of applying a [`Plan`]: per-action counts and the failure list.
117#[derive(Debug, Clone, Default, PartialEq, Eq)]
118pub struct ExecOutcome {
119    pub downloaded: usize,
120    pub reformatted: usize,
121    pub retagged: usize,
122    pub renamed: usize,
123    pub deleted: usize,
124    pub skipped: usize,
125    pub artifacts_written: usize,
126    pub artifacts_deleted: usize,
127    /// Actions that failed and were skipped (auth, transient-exhausted, or
128    /// permanent). The run continued past each one unless it was an auth or
129    /// disk-full abort.
130    pub failures: Vec<Failure>,
131    /// How the run ended.
132    pub status: RunStatus,
133}
134
135impl ExecOutcome {
136    /// Number of failed actions.
137    pub fn failed(&self) -> usize {
138        self.failures.len()
139    }
140
141    fn record(&mut self, effect: Effect) {
142        match effect {
143            Effect::Downloaded => self.downloaded += 1,
144            Effect::Reformatted => self.reformatted += 1,
145            Effect::Retagged => self.retagged += 1,
146            Effect::Renamed => self.renamed += 1,
147            Effect::Deleted => self.deleted += 1,
148            Effect::Skipped => self.skipped += 1,
149            Effect::ArtifactWritten => self.artifacts_written += 1,
150            Effect::ArtifactDeleted => self.artifacts_deleted += 1,
151        }
152    }
153}
154
155/// The IO ports the executor drives, grouped so one value threads them through.
156///
157/// `client` is the only `&mut` port: it performs the authenticated WAV render
158/// flow and so mutates its cached session. The rest are shared references.
159pub struct Ports<'a, H, F, G, C> {
160    /// Performs the authenticated WAV render and poll flow.
161    pub client: &'a mut SunoClient<C>,
162    /// The public network port (CDN audio, rendered WAV, cover art).
163    pub http: &'a H,
164    /// The disk port.
165    pub fs: &'a F,
166    /// The transcode port (WAV to FLAC).
167    pub ffmpeg: &'a G,
168    /// The backoff and poll delay port.
169    pub clock: &'a C,
170}
171
172/// Apply `plan` to disk, updating `manifest` and `albums` in place, and return
173/// the outcome.
174///
175/// `desired` carries the per-clip metadata and art hashes plus the source modes
176/// that decide the [`preserve`](ManifestEntry::preserve) marker; it is indexed
177/// by clip id (and by target path, for renames) so each written entry records
178/// the right hashes and protection. `albums` is the album-art store, keyed by
179/// stable root id: folder-art writes and deletes record their state there rather
180/// than on the per-clip `manifest`. `ports` bundles the authenticated client and
181/// the network, disk, transcode, and backoff ports. A single clip's failure
182/// never aborts the run, except an auth failure or a full disk, which stop it
183/// with [`RunStatus::AuthAborted`] or [`RunStatus::DiskFull`].
184///
185/// The audio-producing actions ([`Download`](Action::Download) and
186/// [`Reformat`](Action::Reformat)) run concurrently, bounded by
187/// [`ExecOptions::concurrency`]: their slow parts (WAV render, CDN download,
188/// transcode, tag) overlap while the order-sensitive Suno API calls are
189/// serialised behind an async mutex over the shared [`SunoClient`], keeping the
190/// adaptive limiter and JWT refresh correct. The remaining actions (retag,
191/// rename, delete, and artifact writes/deletes) then run serially in plan order.
192///
193/// The outcome is deterministic regardless of completion order: concurrent audio
194/// results are committed to the manifest in plan-index order, so the same plan
195/// always yields the same manifest and counts whatever the concurrency level. A
196/// per-clip failure is recorded and the run continues; only an auth failure or a
197/// full disk aborts, and it does so promptly by stopping further audio work.
198///
199/// `synced` carries this run's fetched aligned (synced) lyrics keyed by clip id;
200/// it is the caller's IO result, not part of the pure plan. Audio tagging embeds
201/// a clip's entry as an MP3 `SYLT` frame and as the plain `USLT`/`LYRICS` text
202/// (FLAC), so a clip absent from the map (an instrumental, a WAV target, or a
203/// run with the feature off) is tagged exactly as before. The synced `.lrc`
204/// sidecar itself is a generated artifact whose body the caller has already
205/// resolved into the plan, so it is written like any other text sidecar.
206#[allow(clippy::too_many_arguments)]
207pub async fn execute<H, F, G, C>(
208    plan: &Plan,
209    manifest: &mut Manifest,
210    albums: &mut BTreeMap<String, AlbumArt>,
211    playlists: &mut BTreeMap<String, PlaylistState>,
212    desired: &[Desired],
213    synced: &HashMap<String, AlignedLyrics>,
214    ports: Ports<'_, H, F, G, C>,
215    opts: &ExecOptions,
216) -> ExecOutcome
217where
218    H: Http,
219    F: Filesystem,
220    G: Ffmpeg,
221    C: Clock,
222{
223    let Ports {
224        client,
225        http,
226        fs,
227        ffmpeg,
228        clock,
229    } = ports;
230    let by_id: HashMap<&str, &Desired> = desired.iter().map(|d| (d.clip.id.as_str(), d)).collect();
231    let by_path: HashMap<&str, &Desired> = desired.iter().map(|d| (d.path.as_str(), d)).collect();
232    // Every path this run writes, so the inline old-sidecar cleanup never removes
233    // a file another action produces this run (the non-planned twin of
234    // `suppress_path_aliasing`).
235    let write_targets: BTreeSet<String> = plan
236        .actions
237        .iter()
238        .filter_map(|a| match a {
239            Action::Download { path, .. }
240            | Action::Reformat { path, .. }
241            | Action::WriteArtifact { path, .. }
242            | Action::WriteStem { path, .. } => Some(path.clone()),
243            Action::Rename { to, .. } => Some(to.clone()),
244            _ => None,
245        })
246        .collect();
247    // How many tracked artifact slots reference each path. The inline old-path
248    // cleanup removes a path only once nothing else holds it: each slot that
249    // moves away decrements its reference, and the removal fires only when the
250    // count reaches zero and no action writes the path this run. This keeps a
251    // live file a co-referencing slot still owns (a prior failed swap can leave
252    // two clips sharing a path) while letting the last slot to leave reclaim it,
253    // so nothing is orphaned either (#76).
254    let mut tracked_paths: HashMap<String, u32> = HashMap::new();
255    for (_, entry) in manifest.iter() {
256        for path in entry.artifact_paths() {
257            *tracked_paths.entry(path.to_owned()).or_default() += 1;
258        }
259    }
260    for art in albums.values() {
261        for state in [
262            art.folder_jpg.as_ref(),
263            art.folder_webp.as_ref(),
264            art.folder_mp4.as_ref(),
265        ]
266        .into_iter()
267        .flatten()
268        {
269            *tracked_paths.entry(state.path.clone()).or_default() += 1;
270        }
271    }
272    for playlist in playlists.values() {
273        *tracked_paths.entry(playlist.path.clone()).or_default() += 1;
274    }
275    // Static cover art is otherwise fetched twice per clip (#89): once to embed
276    // in the audio tag and once for the per-song `.jpg` sidecar, both from the
277    // same CDN URL. The audio producer caches each cover it embeds here, keyed by
278    // URL, and the sidecar write drains it rather than re-fetching. Only URLs a
279    // `CoverJpg` sidecar will fetch this run are cached, and the sidecar removes
280    // its entry on use, so the map holds at most the covers for the clips in
281    // flight (bounded by `concurrency`), never the whole library.
282    let cover_wanted: HashSet<&str> = plan
283        .actions
284        .iter()
285        .filter_map(|action| match action {
286            Action::WriteArtifact {
287                kind: ArtifactKind::CoverJpg,
288                source_url,
289                ..
290            } if !source_url.is_empty() => Some(source_url.as_str()),
291            _ => None,
292        })
293        .collect();
294    let cover_cache: Mutex<HashMap<String, Vec<u8>>> = Mutex::new(HashMap::new());
295    // The `both` video-cover retention keeps `cover.webp` (transcoded) and
296    // `cover.mp4` (raw) for an album from the SAME `video_cover_url`. Cache that
297    // source on its first fetch so the second folder artifact drains it rather
298    // than fetching the same MP4 twice (#90 reuses the #89 fetch-once path).
299    let mut folder_cover_uses: HashMap<&str, u32> = HashMap::new();
300    for action in &plan.actions {
301        if let Action::WriteArtifact {
302            kind: ArtifactKind::FolderWebp | ArtifactKind::FolderMp4,
303            source_url,
304            ..
305        } = action
306            && !source_url.is_empty()
307        {
308            *folder_cover_uses.entry(source_url.as_str()).or_default() += 1;
309        }
310    }
311    let shared_cover_urls: HashSet<&str> = folder_cover_uses
312        .into_iter()
313        .filter(|(_, uses)| *uses > 1)
314        .map(|(url, _)| url)
315        .collect();
316    let ctx = Ctx {
317        http,
318        fs,
319        ffmpeg,
320        clock,
321        opts,
322        by_id: &by_id,
323        by_path: &by_path,
324        synced,
325        write_targets: &write_targets,
326        cover_cache: &cover_cache,
327        cover_wanted: &cover_wanted,
328        shared_cover_urls: &shared_cover_urls,
329    };
330
331    let mut outcome = ExecOutcome::default();
332
333    // The audio-producing actions ([`Download`](Action::Download) /
334    // [`Reformat`](Action::Reformat)) render concurrently, but their work is
335    // deliberately split so that NO destination write, file removal, or manifest
336    // update happens off the plan's order:
337    //
338    // - the parallel producers ([`prepare_audio`](Ctx::prepare_audio)) do only
339    //   the slow, side-effect-free work (fetch the CDN/WAV bytes, transcode, and
340    //   tag), returning the tagged bytes; and
341    // - a single serial committer below writes those bytes to the destination,
342    //   removes any superseded file, and records the manifest entry, in strict
343    //   plan-index order, interleaved with the non-audio actions.
344    //
345    // The shared client is the only `&mut` port and its API calls must stay
346    // ordered, so it rides behind an async mutex; each producer locks it only for
347    // the brief WAV-render calls and runs the heavy work unlocked. Renders are
348    // yielded in plan order and bounded to `concurrency` in flight (and buffered),
349    // so at most about `concurrency` tagged payloads are ever held in memory -
350    // never the whole library.
351    let client_lock = AsyncMutex::new(client);
352    let concurrency = opts.concurrency.max(1) as usize;
353    let ctx_ref = &ctx;
354    let client_lock_ref = &client_lock;
355    let mut renders = stream::iter(
356        plan.actions
357            .iter()
358            .filter(|action| is_audio_action(action))
359            .map(|action| async move { ctx_ref.prepare_audio(client_lock_ref, action).await }),
360    )
361    .buffered(concurrency);
362
363    for action in &plan.actions {
364        // Audio actions pull their pre-rendered bytes (yielded in plan order) and
365        // commit them here; every other action applies its own effect. Both the
366        // audio commit and the non-audio apply run serially, so all destination
367        // and manifest effects keep the plan's order exactly as the sequential
368        // executor did.
369        let result = if is_audio_action(action) {
370            match renders.next().await {
371                Some(Ok(rendered)) => ctx.commit_audio(manifest, rendered),
372                Some(Err(fail)) => Err(fail),
373                None => unreachable!("buffered yields one result per audio action"),
374            }
375        } else {
376            ctx.apply(
377                client_lock_ref,
378                action,
379                manifest,
380                albums,
381                playlists,
382                &mut tracked_paths,
383            )
384            .await
385        };
386        match result {
387            Ok(effect) => outcome.record(effect),
388            Err(fail) => {
389                let abort = abort_status(fail.class);
390                outcome.failures.push(Failure {
391                    clip_id: fail.clip_id,
392                    reason: fail.reason,
393                });
394                if let Some(status) = abort {
395                    // A systemic abort stops the run. Dropping the render stream
396                    // cancels any in-flight or completed-but-uncommitted producer;
397                    // because producers touch nothing on disk, the destination and
398                    // manifest are left exactly as the committed prefix wrote them,
399                    // with no untracked files and no removed-but-referenced file.
400                    outcome.status = status;
401                    break;
402                }
403            }
404        }
405    }
406    drop(renders);
407
408    // Renames and deletes can leave an album directory empty; prune those ghost
409    // directories bottom-up. This runs on both the completed and the aborted
410    // paths, and is best-effort: a prune failure is only a missed tidy that the
411    // next run repeats, never a reason to fail the run.
412    let _ = fs.prune_empty_dirs("");
413    outcome
414}
415
416/// Whether an action produces audio: it fetches, transcodes, and tags a clip's
417/// file. Its slow render runs in the concurrent phase; its destination write and
418/// manifest update are committed serially in plan order. Everything else touches
419/// the manifest, album, or playlist stores directly and runs serially.
420fn is_audio_action(action: &Action) -> bool {
421    matches!(action, Action::Download { .. } | Action::Reformat { .. })
422}
423
424/// A rendered-but-uncommitted audio result: the tagged bytes plus what the serial
425/// committer needs to place them. Produced concurrently and side-effect-free (no
426/// destination write, no removal, no manifest touch); [`commit_audio`] applies
427/// all of those in plan order.
428struct RenderedAudio {
429    clip_id: String,
430    path: String,
431    format: AudioFormat,
432    /// The superseded file to remove after the new one lands (a [`Reformat`]),
433    /// or `None` for a plain [`Download`].
434    from_path: Option<String>,
435    effect: Effect,
436    bytes: Vec<u8>,
437}
438
439/// What an applied action did, for the outcome counters.
440enum Effect {
441    Downloaded,
442    Reformatted,
443    Retagged,
444    Renamed,
445    Deleted,
446    Skipped,
447    ArtifactWritten,
448    ArtifactDeleted,
449}
450
451/// How a failure should be handled (SYNC-17).
452#[derive(Debug, Clone, Copy)]
453enum Class {
454    /// Stop the account run; do not retry.
455    Auth,
456    /// Stop the account run: a full disk is systemic, like auth, so aborting
457    /// beats skipping every remaining clip (each of which would first burn a
458    /// server-side WAV-render budget before failing the same way).
459    Disk,
460    /// Retry a bounded number of times, then record and skip.
461    Transient,
462    /// Record and skip immediately.
463    Permanent,
464}
465
466/// A classified action failure attributed to a clip.
467struct Fail {
468    class: Class,
469    clip_id: String,
470    reason: String,
471}
472
473/// The run-ending status for a failure class, or `None` when the failure is
474/// per-clip and the run continues.
475fn abort_status(class: Class) -> Option<RunStatus> {
476    match class {
477        Class::Auth => Some(RunStatus::AuthAborted),
478        Class::Disk => Some(RunStatus::DiskFull),
479        Class::Transient | Class::Permanent => None,
480    }
481}
482
483fn auth_fail(clip_id: impl Into<String>, reason: impl Into<String>) -> Fail {
484    Fail {
485        class: Class::Auth,
486        clip_id: clip_id.into(),
487        reason: reason.into(),
488    }
489}
490
491fn transient_fail(clip_id: impl Into<String>, reason: impl Into<String>) -> Fail {
492    Fail {
493        class: Class::Transient,
494        clip_id: clip_id.into(),
495        reason: reason.into(),
496    }
497}
498
499fn permanent_fail(clip_id: impl Into<String>, reason: impl Into<String>) -> Fail {
500    Fail {
501        class: Class::Permanent,
502        clip_id: clip_id.into(),
503        reason: reason.into(),
504    }
505}
506
507fn disk_fail(clip_id: impl Into<String>, reason: impl Into<String>) -> Fail {
508    Fail {
509        class: Class::Disk,
510        clip_id: clip_id.into(),
511        reason: reason.into(),
512    }
513}
514
515/// Whether an artifact kind is album-scoped folder art (owned by a root id and
516/// recorded on the album store) rather than a per-clip sidecar (recorded on the
517/// manifest).
518fn is_album_kind(kind: ArtifactKind) -> bool {
519    matches!(
520        kind,
521        ArtifactKind::FolderJpg | ArtifactKind::FolderWebp | ArtifactKind::FolderMp4
522    )
523}
524
525/// True for the library-scoped playlist artifact, routed to the playlist store.
526fn is_playlist_kind(kind: ArtifactKind) -> bool {
527    matches!(kind, ArtifactKind::Playlist)
528}
529
530/// True for a per-song sidecar (`cover.jpg`/`cover.webp`), whose write requires
531/// the owning clip's manifest entry. Album and playlist kinds are keyed by a
532/// root/playlist id that is deliberately absent from the manifest.
533fn is_per_clip_kind(kind: ArtifactKind) -> bool {
534    matches!(
535        kind,
536        ArtifactKind::CoverJpg
537            | ArtifactKind::CoverWebp
538            | ArtifactKind::DetailsTxt
539            | ArtifactKind::LyricsTxt
540            | ArtifactKind::Lrc
541            | ArtifactKind::VideoMp4
542    )
543}
544
545/// Recover a playlist's display name from its `.m3u8` path's file stem.
546///
547/// The path is `<sanitised name>.m3u8` at the library root, so the stem is the
548/// sanitised name. Reconcile only ever reads a playlist's `path` and `hash`, so
549/// this recovered name is a convenience for humans and its lossiness (the
550/// sanitiser is not reversible) never affects a decision.
551fn playlist_name_from_path(path: &str) -> String {
552    std::path::Path::new(path)
553        .file_stem()
554        .map(|stem| stem.to_string_lossy().into_owned())
555        .unwrap_or_default()
556}
557
558/// A classified fetch failure, not yet attributed to a clip.
559struct FetchError {
560    class: Class,
561    reason: String,
562    retry_after: Option<Duration>,
563}
564
565impl FetchError {
566    fn transient(reason: impl Into<String>, retry_after: Option<Duration>) -> Self {
567        Self {
568            class: Class::Transient,
569            reason: reason.into(),
570            retry_after,
571        }
572    }
573
574    fn permanent(reason: impl Into<String>) -> Self {
575        Self {
576            class: Class::Permanent,
577            reason: reason.into(),
578            retry_after: None,
579        }
580    }
581
582    fn attribute(self, clip_id: &str) -> Fail {
583        Fail {
584            class: self.class,
585            clip_id: clip_id.to_owned(),
586            reason: self.reason,
587        }
588    }
589}
590
591/// The shared, read-only context threaded through every action handler.
592struct Ctx<'a, H, F, G, C> {
593    http: &'a H,
594    fs: &'a F,
595    ffmpeg: &'a G,
596    clock: &'a C,
597    opts: &'a ExecOptions,
598    by_id: &'a HashMap<&'a str, &'a Desired>,
599    by_path: &'a HashMap<&'a str, &'a Desired>,
600    /// This run's fetched aligned (synced) lyrics, keyed by clip id. Audio
601    /// tagging reads a clip's entry to embed an MP3 `SYLT` frame and the plain
602    /// lyric text; a clip absent here is tagged exactly as before. Populated by
603    /// the caller (the fetch is IO), so the engine stays free of direct IO.
604    synced: &'a HashMap<String, AlignedLyrics>,
605    /// Every destination path this run writes (audio downloads and reformats,
606    /// artifact writes, and rename targets). The inline old-sidecar cleanup in
607    /// [`write_artifact`](Ctx::write_artifact) skips any path in this set, so a
608    /// path swap between two clips can never delete a file the same run just
609    /// wrote. This mirrors [`suppress_path_aliasing`] for the one removal that
610    /// is not itself a planned action.
611    write_targets: &'a BTreeSet<String>,
612    /// Static cover art the audio producer already fetched to embed in the tag,
613    /// keyed by CDN URL, so the matching per-song `.jpg` sidecar reuses it rather
614    /// than fetching the same image again (#89). Only URLs a `CoverJpg` sidecar
615    /// will fetch are inserted (see `cover_wanted`) and each is removed on use, so
616    /// the map stays bounded to the clips in flight. A plain mutex guards it: the
617    /// concurrent producers only ever insert, and the lock is never held across an
618    /// await.
619    cover_cache: &'a Mutex<HashMap<String, Vec<u8>>>,
620    /// The cover URLs a `CoverJpg` sidecar will fetch this run. The producer caches
621    /// a cover only when its URL is here, so a clip whose cover is embedded but
622    /// never written as a sidecar leaves no bytes stranded in `cover_cache`.
623    cover_wanted: &'a HashSet<&'a str>,
624    /// Album video-cover source URLs fetched by more than one folder artifact
625    /// this run. The `both` retention derives `cover.webp` (transcoded) and
626    /// `cover.mp4` (raw) from the SAME `video_cover_url`; the first fetch caches
627    /// the raw source here so the sibling drains it instead of re-fetching (#90
628    /// reuses the #89 fetch-once path). `FolderWebp` sorts before `FolderMp4`, so
629    /// the raw source is always cached before the raw sidecar reads it.
630    shared_cover_urls: &'a HashSet<&'a str>,
631}
632
633impl<H, F, G, C> Ctx<'_, H, F, G, C>
634where
635    H: Http,
636    F: Filesystem,
637    G: Ffmpeg,
638    C: Clock,
639{
640    /// Apply one non-audio action, returning what it did or why it failed.
641    ///
642    /// Audio actions ([`Download`](Action::Download) /
643    /// [`Reformat`](Action::Reformat)) run in the concurrent phase through
644    /// [`prepare_audio`](Self::prepare_audio) and never reach here.
645    async fn apply(
646        &self,
647        client_lock: &ClientLock<'_, C>,
648        action: &Action,
649        manifest: &mut Manifest,
650        albums: &mut BTreeMap<String, AlbumArt>,
651        playlists: &mut BTreeMap<String, PlaylistState>,
652        tracked_paths: &mut HashMap<String, u32>,
653    ) -> Result<Effect, Fail> {
654        match action {
655            Action::Download { .. } | Action::Reformat { .. } => {
656                unreachable!("audio actions are applied in the concurrent phase")
657            }
658            Action::Retag {
659                clip,
660                lineage,
661                path,
662            } => self.retag(manifest, clip, lineage, path).await,
663            Action::Rename { from, to } => self.rename(manifest, from, to),
664            Action::Delete { path, clip_id } => self.delete(manifest, path, clip_id),
665            Action::Skip { clip_id } => {
666                self.refresh_preserve(manifest, clip_id);
667                Ok(Effect::Skipped)
668            }
669            Action::WriteArtifact {
670                kind,
671                path,
672                source_url,
673                hash,
674                owner_id,
675                content,
676            } => {
677                self.write_artifact(
678                    manifest,
679                    albums,
680                    playlists,
681                    *kind,
682                    path,
683                    source_url,
684                    hash,
685                    owner_id,
686                    content.as_deref(),
687                    tracked_paths,
688                )
689                .await
690            }
691            Action::DeleteArtifact {
692                kind,
693                path,
694                owner_id,
695            } => self.delete_artifact(manifest, albums, playlists, *kind, path, owner_id),
696            Action::WriteStem {
697                clip_id,
698                key,
699                stem_id,
700                path,
701                source_url,
702                format,
703                hash,
704            } => {
705                self.write_stem(
706                    client_lock,
707                    manifest,
708                    clip_id,
709                    key,
710                    stem_id,
711                    path,
712                    source_url,
713                    *format,
714                    hash,
715                )
716                .await
717            }
718            Action::DeleteStem { clip_id, key, path } => {
719                self.delete_stem(manifest, clip_id, key, path)
720            }
721        }
722    }
723
724    /// Render one audio action's tagged bytes, side-effect-free.
725    ///
726    /// This is the concurrent part: it fetches, transcodes, and tags the file
727    /// (through shared ports, plus the client behind `client_lock`), then returns
728    /// the bytes and where they must go. It deliberately writes nothing, removes
729    /// nothing, and never touches `manifest`, so many run at once and an aborted
730    /// run can drop them with no destination or manifest effect. The serial
731    /// [`commit_audio`](Self::commit_audio) applies those effects in plan order.
732    async fn prepare_audio(
733        &self,
734        client_lock: &ClientLock<'_, C>,
735        action: &Action,
736    ) -> Result<RenderedAudio, Fail> {
737        match action {
738            Action::Download {
739                clip,
740                lineage,
741                path,
742                format,
743            } => {
744                let bytes = self
745                    .produce_audio(client_lock, clip, lineage, *format)
746                    .await?;
747                Ok(RenderedAudio {
748                    clip_id: clip.id.clone(),
749                    path: path.clone(),
750                    format: *format,
751                    from_path: None,
752                    effect: Effect::Downloaded,
753                    bytes,
754                })
755            }
756            Action::Reformat {
757                clip,
758                path,
759                from_path,
760                from: _,
761                to,
762            } => {
763                // A Reformat action carries no lineage, so recover it from the
764                // desired set (the same context that drove naming and the hash),
765                // falling back to a self-rooted context when the clip is not in
766                // the current selection.
767                let lineage = self
768                    .by_id
769                    .get(clip.id.as_str())
770                    .map(|d| d.lineage.clone())
771                    .unwrap_or_else(|| LineageContext::own_root(clip));
772                let bytes = self.produce_audio(client_lock, clip, &lineage, *to).await?;
773                Ok(RenderedAudio {
774                    clip_id: clip.id.clone(),
775                    path: path.clone(),
776                    format: *to,
777                    from_path: Some(from_path.clone()),
778                    effect: Effect::Reformatted,
779                    bytes,
780                })
781            }
782            _ => unreachable!("prepare_audio only handles audio actions"),
783        }
784    }
785
786    /// Commit one rendered audio result serially, in plan order.
787    ///
788    /// Writes the tagged bytes to the destination, then, for a [`Reformat`], drops
789    /// the superseded file, then records the manifest entry. Ordering the write
790    /// before the removal keeps a crash from losing both copies; keeping all of
791    /// this off the concurrent phase preserves the sequential executor's plan-order
792    /// guarantee for every destination and manifest effect.
793    fn commit_audio(
794        &self,
795        manifest: &mut Manifest,
796        rendered: RenderedAudio,
797    ) -> Result<Effect, Fail> {
798        let RenderedAudio {
799            clip_id,
800            path,
801            format,
802            from_path,
803            effect,
804            bytes,
805        } = rendered;
806        let size = self.write_verify(&clip_id, &path, &bytes)?;
807        if let Some(from) = from_path {
808            // The new file is safely in place; only now drop the old rendering.
809            self.fs.remove(&from).map_err(|err| {
810                permanent_fail(&clip_id, format!("could not remove old file: {err}"))
811            })?;
812        }
813        manifest.insert(clip_id.clone(), self.entry(&clip_id, &path, format, size));
814        Ok(effect)
815    }
816
817    /// Re-tag the existing file in place to match current metadata and art.
818    async fn retag(
819        &self,
820        manifest: &mut Manifest,
821        clip: &Clip,
822        lineage: &LineageContext,
823        path: &str,
824    ) -> Result<Effect, Fail> {
825        let Some(format) = manifest.get(&clip.id).map(|entry| entry.format) else {
826            return Err(permanent_fail(
827                &clip.id,
828                "retag target missing from manifest",
829            ));
830        };
831
832        if format == AudioFormat::Wav {
833            let (meta, synced) = self.track_meta(clip, lineage);
834            let cover = self.fetch_cover(clip).await;
835            let existing = self.fs.read(path).map_err(|err| {
836                permanent_fail(&clip.id, format!("could not read for retag: {err}"))
837            })?;
838            let tagged = tag_wav(&existing, &meta, cover.as_deref(), synced)
839                .map_err(|err| permanent_fail(&clip.id, err.to_string()))?;
840            let size = self.write_verify(&clip.id, path, &tagged)?;
841            self.refresh_hashes(manifest, &clip.id, Some(size));
842            return Ok(Effect::Retagged);
843        }
844
845        let (meta, synced) = self.track_meta(clip, lineage);
846        let cover = self.fetch_cover(clip).await;
847        let existing = self
848            .fs
849            .read(path)
850            .map_err(|err| permanent_fail(&clip.id, format!("could not read for retag: {err}")))?;
851        let tagged = match format {
852            AudioFormat::Mp3 => tag_mp3(&existing, &meta, cover.as_deref(), synced),
853            AudioFormat::Flac => tag_flac(&existing, &meta, cover.as_deref()),
854            AudioFormat::Wav => unreachable!("WAV handled above"),
855        }
856        .map_err(|err| permanent_fail(&clip.id, err.to_string()))?;
857        let size = self.write_verify(&clip.id, path, &tagged)?;
858        self.refresh_hashes(manifest, &clip.id, Some(size));
859        Ok(Effect::Retagged)
860    }
861
862    /// Move the file and update the entry's path (and protection).
863    fn rename(&self, manifest: &mut Manifest, from: &str, to: &str) -> Result<Effect, Fail> {
864        let label = self
865            .by_path
866            .get(to)
867            .map(|d| d.clip.id.clone())
868            .unwrap_or_else(|| to.to_owned());
869        self.fs.rename(from, to).map_err(|err| {
870            if err.is_out_of_space() {
871                disk_fail(label, "disk full: no space left to rename")
872            } else {
873                permanent_fail(label, format!("rename failed: {err}"))
874            }
875        })?;
876
877        let clip_id = self.by_path.get(to).map(|d| d.clip.id.clone()).or_else(|| {
878            manifest
879                .entries
880                .iter()
881                .find(|(_, entry)| entry.path == from)
882                .map(|(id, _)| id.clone())
883        });
884        if let Some(id) = clip_id
885            && let Some(entry) = manifest.entries.get_mut(&id)
886        {
887            entry.path = to.to_owned();
888            if let Some(d) = self.by_path.get(to) {
889                entry.preserve = preserve_for(d);
890            }
891        }
892        Ok(Effect::Renamed)
893    }
894
895    /// Remove the file and drop the manifest entry.
896    fn delete(&self, manifest: &mut Manifest, path: &str, clip_id: &str) -> Result<Effect, Fail> {
897        self.fs
898            .remove(path)
899            .map_err(|err| permanent_fail(clip_id, format!("delete failed: {err}")))?;
900        manifest.remove(clip_id);
901        Ok(Effect::Deleted)
902    }
903
904    /// Fetch an artifact's bytes, write them atomically, then record the sidecar
905    /// on the owning manifest entry.
906    ///
907    /// The fetch and write share the audio path's resilience: `fetch_bytes`
908    /// retries transient failures and verifies `Content-Length`, and
909    /// `write_verify` confirms the on-disk size. A failure is attributed to the
910    /// owning clip and returned as a per-clip [`Fail`], so a bad sidecar never
911    /// aborts the whole run (only an auth failure or a full disk does, matching
912    /// audio).
913    ///
914    /// The bytes written depend on the kind: a static cover is the fetched image
915    /// verbatim, while an animated cover is the clip's MP4 preview transcoded to
916    /// WebP through the ffmpeg port (see [`artifact_bytes`](Self::artifact_bytes)).
917    ///
918    /// A sidecar is only ever written for a clip whose audio is present: a
919    /// successful `Download`/`Reformat` creates the manifest entry earlier in
920    /// this run, and a prior-run clip already has one. So an absent owning entry
921    /// means the audio failed or never existed this run; we skip (no fetch, no
922    /// write) rather than strand an untracked sidecar with no owning audio.
923    ///
924    /// Folder art ([`FolderJpg`](ArtifactKind::FolderJpg),
925    /// [`FolderWebp`](ArtifactKind::FolderWebp), and
926    /// [`FolderMp4`](ArtifactKind::FolderMp4)) is album-scoped: its `owner_id`
927    /// is the album's stable root id, not a manifest clip, so it skips the
928    /// manifest presence guard and records its state on the album store instead.
929    ///
930    /// When a title or album change moves the audio, reconcile re-emits this
931    /// write at the NEW path; this handler then removes the sidecar left at the
932    /// artifact's previously tracked path, moving it rather than orphaning it.
933    /// The removal happens only after the new file is safely written, and a
934    /// remove failure returns before the state slot advances, so the next run
935    /// re-plans the identical write and retries — self-healing, never an orphan.
936    #[allow(clippy::too_many_arguments)]
937    async fn write_artifact(
938        &self,
939        manifest: &mut Manifest,
940        albums: &mut BTreeMap<String, AlbumArt>,
941        playlists: &mut BTreeMap<String, PlaylistState>,
942        kind: ArtifactKind,
943        path: &str,
944        source_url: &str,
945        hash: &str,
946        owner_id: &str,
947        content: Option<&str>,
948        tracked_paths: &mut HashMap<String, u32>,
949    ) -> Result<Effect, Fail> {
950        // A per-song sidecar needs its owning clip's manifest entry; album and
951        // playlist kinds are keyed elsewhere and skip this guard.
952        if is_per_clip_kind(kind) && manifest.get(owner_id).is_none() {
953            // The owning audio never landed this run, so this sidecar is skipped
954            // and will never drain a cover the producer cached for it. Drop that
955            // entry now: an insert without a matching sidecar write must not
956            // outlive its clip, keeping `cover_cache` bounded to the clips in
957            // flight (#89). A non-cover kind has no entry here, so this is a
958            // harmless no-op for them.
959            self.cover_cache
960                .lock()
961                .expect("cover cache mutex poisoned")
962                .remove(source_url);
963            return Ok(Effect::Skipped);
964        }
965        // Capture the path this artifact was last tracked at, BEFORE the slot is
966        // overwritten below, so a path-changing write (a title/album rename that
967        // moves the audio) can clean up the old sidecar it left behind. Cover
968        // kinds live on the manifest, folder kinds on the album store; playlists
969        // reconcile their own old-path delete and so opt out here.
970        let old_path = match kind {
971            ArtifactKind::CoverJpg => manifest
972                .get(owner_id)
973                .and_then(|e| e.cover_jpg.as_ref())
974                .map(|s| s.path.clone()),
975            ArtifactKind::CoverWebp => manifest
976                .get(owner_id)
977                .and_then(|e| e.cover_webp.as_ref())
978                .map(|s| s.path.clone()),
979            ArtifactKind::DetailsTxt => manifest
980                .get(owner_id)
981                .and_then(|e| e.details_txt.as_ref())
982                .map(|s| s.path.clone()),
983            ArtifactKind::LyricsTxt => manifest
984                .get(owner_id)
985                .and_then(|e| e.lyrics_txt.as_ref())
986                .map(|s| s.path.clone()),
987            ArtifactKind::Lrc => manifest
988                .get(owner_id)
989                .and_then(|e| e.lrc.as_ref())
990                .map(|s| s.path.clone()),
991            ArtifactKind::VideoMp4 => manifest
992                .get(owner_id)
993                .and_then(|e| e.video_mp4.as_ref())
994                .map(|s| s.path.clone()),
995            ArtifactKind::FolderJpg | ArtifactKind::FolderWebp | ArtifactKind::FolderMp4 => albums
996                .get(owner_id)
997                .and_then(|a| a.artifact(kind))
998                .map(|s| s.path.clone()),
999            ArtifactKind::Playlist => None,
1000        };
1001        // A generated artifact (a playlist) carries its body inline and never
1002        // touches the network; a fetched one pulls (and transcodes) its source.
1003        let bytes = match content {
1004            Some(text) => text.as_bytes().to_vec(),
1005            None => self.artifact_bytes(kind, source_url, owner_id).await?,
1006        };
1007        self.write_verify(owner_id, path, &bytes)?;
1008        // The new sidecar is safely in place; only now drop a stale copy left at
1009        // the previous path (the audio moved). `remove` is idempotent, so an
1010        // already-absent old file is fine. On a genuine remove failure we return
1011        // BEFORE updating the slot, leaving the manifest/album pointing at the
1012        // old path: the next run sees the same path drift, re-plans this write,
1013        // and retries the cleanup — convergent, no orphan persists.
1014        //
1015        // The removal is gated so it can never delete a live file (#76). This
1016        // slot is releasing `old`, so drop its reference in `tracked_paths`; the
1017        // file is removed only once nothing else holds it — no other tracked slot
1018        // still references it (count now zero) and no action writes it this run
1019        // (`write_targets`, the non-planned twin of `suppress_path_aliasing`).
1020        // On a path swap (A: x -> y while B: y -> x) `write_targets` keeps each
1021        // freshly written file; when two slots share a path after a prior failed
1022        // swap, the first to move keeps it and the last to leave reclaims it, so
1023        // a co-owned file is never deleted and a vacated one is never orphaned.
1024        if let Some(old) = old_path.as_deref()
1025            && !old.is_empty()
1026            && old != path
1027        {
1028            let still_referenced = tracked_paths
1029                .get_mut(old)
1030                .map(|count| {
1031                    *count = count.saturating_sub(1);
1032                    *count > 0
1033                })
1034                .unwrap_or(false);
1035            if !still_referenced && !self.write_targets.contains(old) {
1036                self.fs.remove(old).map_err(|err| {
1037                    permanent_fail(
1038                        owner_id,
1039                        format!("could not remove old sidecar {old}: {err}"),
1040                    )
1041                })?;
1042            }
1043        }
1044        if is_album_kind(kind) {
1045            albums.entry(owner_id.to_owned()).or_default().set(
1046                kind,
1047                Some(ArtifactState {
1048                    path: path.to_owned(),
1049                    hash: hash.to_owned(),
1050                }),
1051            );
1052        } else if is_playlist_kind(kind) {
1053            playlists.insert(
1054                owner_id.to_owned(),
1055                PlaylistState {
1056                    name: playlist_name_from_path(path),
1057                    path: path.to_owned(),
1058                    hash: hash.to_owned(),
1059                },
1060            );
1061        } else if let Some(entry) = manifest.entries.get_mut(owner_id) {
1062            set_manifest_artifact(
1063                entry,
1064                kind,
1065                Some(ArtifactState {
1066                    path: path.to_owned(),
1067                    hash: hash.to_owned(),
1068                }),
1069            );
1070        }
1071        Ok(Effect::ArtifactWritten)
1072    }
1073
1074    /// Produce a sidecar's bytes from its source, branching on kind.
1075    ///
1076    /// An animated cover — a per-clip [`CoverWebp`](ArtifactKind::CoverWebp) or an
1077    /// album [`FolderWebp`](ArtifactKind::FolderWebp) — fetches the clip's
1078    /// `video_cover` MP4 preview and transcodes it to an animated WebP through the
1079    /// ffmpeg port; every other kind is the fetched source verbatim (the static
1080    /// [`CoverJpg`](ArtifactKind::CoverJpg) / album [`FolderJpg`](ArtifactKind::FolderJpg)
1081    /// image, or the raw album [`FolderMp4`](ArtifactKind::FolderMp4) whose
1082    /// `video_cover_url` is kept untranscoded). A fetch or transcode failure
1083    /// is attributed to the owning clip and is a per-clip [`Fail`], except a
1084    /// disk-full transcode, which aborts the run like the audio FLAC path.
1085    async fn artifact_bytes(
1086        &self,
1087        kind: ArtifactKind,
1088        source_url: &str,
1089        owner_id: &str,
1090    ) -> Result<Vec<u8>, Fail> {
1091        // Reuse the cover the audio producer already fetched for the embedded tag
1092        // when it cached this exact URL (#89); otherwise fetch it now. The guard
1093        // is taken and dropped in its own statement so it never spans the await.
1094        let cached = self
1095            .cover_cache
1096            .lock()
1097            .expect("cover cache mutex poisoned")
1098            .remove(source_url);
1099        let source = match cached {
1100            Some(bytes) => bytes,
1101            None => {
1102                let fetched = self
1103                    .fetch_bytes(source_url)
1104                    .await
1105                    .map_err(|err| err.attribute(owner_id))?;
1106                // Cache the raw source when a sibling folder artifact will fetch
1107                // the same URL (the `both` retention: cover.webp + cover.mp4), so
1108                // it is fetched exactly once. Bounded to shared URLs and drained
1109                // on the sibling's use.
1110                if self.shared_cover_urls.contains(source_url) {
1111                    self.cover_cache
1112                        .lock()
1113                        .expect("cover cache mutex poisoned")
1114                        .insert(source_url.to_owned(), fetched.clone());
1115                }
1116                fetched
1117            }
1118        };
1119        match kind {
1120            ArtifactKind::CoverWebp | ArtifactKind::FolderWebp => self
1121                .ffmpeg
1122                .mp4_to_webp(&source, self.opts.cover_webp)
1123                .await
1124                .map_err(|err| {
1125                    if err.is_out_of_space() {
1126                        disk_fail(owner_id, "disk full: no space left to transcode")
1127                    } else {
1128                        permanent_fail(owner_id, format!("cover transcode failed: {err}"))
1129                    }
1130                }),
1131            // The text sidecars are generated and always carry inline content, so
1132            // `write_artifact` never reaches this fetch path for them. Guard it so
1133            // a future miswiring fails loudly rather than fetching a URL.
1134            ArtifactKind::DetailsTxt | ArtifactKind::LyricsTxt | ArtifactKind::Lrc => Err(
1135                permanent_fail(owner_id, "text sidecar requires inline content"),
1136            ),
1137            ArtifactKind::CoverJpg
1138            | ArtifactKind::FolderJpg
1139            | ArtifactKind::FolderMp4
1140            | ArtifactKind::Playlist
1141            | ArtifactKind::VideoMp4 => Ok(source),
1142        }
1143    }
1144
1145    /// Remove a sidecar file and clear its slot on the owning manifest entry.
1146    ///
1147    /// `remove` is idempotent, so an already-absent sidecar is not a failure.
1148    /// When the owning entry is already gone (its audio was deleted earlier this
1149    /// run, co-deleting the sidecar), there is no slot to clear and that is fine.
1150    ///
1151    /// Folder art is album-scoped: its slot is cleared on the album store keyed by
1152    /// the album's root id, not on a manifest clip.
1153    ///
1154    /// The audio `Delete` is applied before its sidecar `DeleteArtifact`. If the
1155    /// sidecar removal fails after the audio is already gone, the sidecar lingers
1156    /// untracked, but the design stays convergent rather than transactional: the
1157    /// next run re-plans the same removal and retries, and any directory it would
1158    /// have emptied is pruned once the file finally clears.
1159    fn delete_artifact(
1160        &self,
1161        manifest: &mut Manifest,
1162        albums: &mut BTreeMap<String, AlbumArt>,
1163        playlists: &mut BTreeMap<String, PlaylistState>,
1164        kind: ArtifactKind,
1165        path: &str,
1166        owner_id: &str,
1167    ) -> Result<Effect, Fail> {
1168        self.fs
1169            .remove(path)
1170            .map_err(|err| permanent_fail(owner_id, format!("artifact delete failed: {err}")))?;
1171        if is_album_kind(kind) {
1172            if let Some(art) = albums.get_mut(owner_id) {
1173                art.set(kind, None);
1174                if art.is_empty() {
1175                    albums.remove(owner_id);
1176                }
1177            }
1178        } else if is_playlist_kind(kind) {
1179            playlists.remove(owner_id);
1180        } else if let Some(entry) = manifest.entries.get_mut(owner_id) {
1181            set_manifest_artifact(entry, kind, None);
1182        }
1183        Ok(Effect::ArtifactDeleted)
1184    }
1185
1186    /// Fetch one stem's bytes, write them atomically, then record the stem on
1187    /// the owning clip's keyed stem map.
1188    ///
1189    /// Mirrors [`write_artifact`](Self::write_artifact) for the keyed-stem case,
1190    /// sharing the fetch resilience (`fetch_bytes` retries and verifies
1191    /// `Content-Length`) and the atomic size-verified write. A stem is only ever
1192    /// written for a clip whose audio is present, so an absent owning manifest
1193    /// entry means the audio failed or never existed this run; we skip rather
1194    /// than strand an untracked stem with no owning audio.
1195    ///
1196    /// Stems are stored RAW in their native container and are NEVER transcoded to
1197    /// FLAC, even when the song's own format is FLAC — they are the deliberate
1198    /// exception. A `Wav` stem is rendered through the free `convert_wav` flow
1199    /// (see [`fetch_stem_bytes`](Self::fetch_stem_bytes)); an `Mp3` stem is fetched
1200    /// straight from its public CDN url. Either way the bytes land verbatim at
1201    /// `path`, whose extension already matches the stem format.
1202    ///
1203    /// When a title/album change moves the song, reconcile re-emits this write at
1204    /// the NEW path; this handler then removes the stem left at the previously
1205    /// tracked path, moving it rather than orphaning it. The removal happens only
1206    /// after the new file is safely written and only when nothing else this run
1207    /// writes that path, and a remove failure returns before the slot advances so
1208    /// the next run re-plans the identical write and retries — self-healing.
1209    #[allow(clippy::too_many_arguments)]
1210    async fn write_stem(
1211        &self,
1212        client_lock: &ClientLock<'_, C>,
1213        manifest: &mut Manifest,
1214        clip_id: &str,
1215        key: &str,
1216        stem_id: &str,
1217        path: &str,
1218        source_url: &str,
1219        format: StemFormat,
1220        hash: &str,
1221    ) -> Result<Effect, Fail> {
1222        // A stem needs its owning clip's manifest entry (its audio must exist).
1223        if manifest.get(clip_id).is_none() {
1224            return Ok(Effect::Skipped);
1225        }
1226        let old_path = manifest
1227            .get(clip_id)
1228            .and_then(|e| e.stems.get(key))
1229            .map(|s| s.path.clone());
1230        let bytes = self
1231            .fetch_stem_bytes(client_lock, clip_id, stem_id, source_url, format)
1232            .await?;
1233        self.write_verify(clip_id, path, &bytes)?;
1234        // The new stem is in place; only now drop a stale copy left at the old
1235        // path (the song moved, or the stem format changed). `remove` is
1236        // idempotent. A path this run also writes is never removed (the
1237        // non-planned twin of `suppress_path_aliasing`). On a genuine remove
1238        // failure we return BEFORE updating the slot, so the next run re-plans the
1239        // same write and retries the cleanup — no orphan.
1240        if let Some(old) = old_path.as_deref()
1241            && !old.is_empty()
1242            && old != path
1243            && !self.write_targets.contains(old)
1244        {
1245            self.fs.remove(old).map_err(|err| {
1246                permanent_fail(clip_id, format!("could not remove old stem {old}: {err}"))
1247            })?;
1248        }
1249        if let Some(entry) = manifest.entries.get_mut(clip_id) {
1250            set_manifest_stem(
1251                entry,
1252                key,
1253                Some(ArtifactState {
1254                    path: path.to_owned(),
1255                    hash: hash.to_owned(),
1256                }),
1257            );
1258        }
1259        Ok(Effect::ArtifactWritten)
1260    }
1261
1262    /// Resolve a stem's RAW bytes in its native container, never transcoding.
1263    ///
1264    /// A `Wav` stem renders the stem clip's lossless WAV through the very same
1265    /// free `convert_wav` + poll flow the main FLAC/WAV audio uses
1266    /// ([`resolve_wav_url`](Self::resolve_wav_url)), keyed on the stem's own
1267    /// `stem_id`, then downloads that WAV. An `Mp3` stem (or a degenerate `Wav`
1268    /// stem with no id to render) downloads its public CDN url directly. Stems
1269    /// are the deliberate exception to the source format: the bytes are returned
1270    /// exactly as delivered and are never re-encoded to FLAC.
1271    async fn fetch_stem_bytes(
1272        &self,
1273        client_lock: &ClientLock<'_, C>,
1274        clip_id: &str,
1275        stem_id: &str,
1276        source_url: &str,
1277        format: StemFormat,
1278    ) -> Result<Vec<u8>, Fail> {
1279        let url = match format {
1280            StemFormat::Wav if !stem_id.is_empty() => {
1281                match self.resolve_wav_url(client_lock, stem_id).await? {
1282                    Some(url) => url,
1283                    None => return Err(transient_fail(clip_id, "stem WAV render was not ready")),
1284                }
1285            }
1286            // Mp3, or a Wav stem with no id to render, downloads the CDN mp3.
1287            _ => source_url.to_owned(),
1288        };
1289        self.fetch_bytes(&url)
1290            .await
1291            .map_err(|err| err.attribute(clip_id))
1292    }
1293
1294    /// Remove one stem file and clear its slot in the owning clip's stem map.
1295    ///
1296    /// `remove` is idempotent, so an already-absent stem is not a failure. When
1297    /// the owning entry is already gone (its audio was deleted earlier this run,
1298    /// co-deleting the stem), there is no slot to clear and that is fine; the
1299    /// emptied `.stems` folder is pruned by the end-of-run directory sweep.
1300    fn delete_stem(
1301        &self,
1302        manifest: &mut Manifest,
1303        clip_id: &str,
1304        key: &str,
1305        path: &str,
1306    ) -> Result<Effect, Fail> {
1307        self.fs
1308            .remove(path)
1309            .map_err(|err| permanent_fail(clip_id, format!("stem delete failed: {err}")))?;
1310        if let Some(entry) = manifest.entries.get_mut(clip_id) {
1311            set_manifest_stem(entry, key, None);
1312        }
1313        Ok(Effect::ArtifactDeleted)
1314    }
1315
1316    /// Download (and transcode/tag) the audio for `clip` in `format`.
1317    async fn produce_audio(
1318        &self,
1319        client_lock: &ClientLock<'_, C>,
1320        clip: &Clip,
1321        lineage: &LineageContext,
1322        format: AudioFormat,
1323    ) -> Result<Vec<u8>, Fail> {
1324        let (meta, synced) = self.track_meta(clip, lineage);
1325        match format {
1326            AudioFormat::Mp3 => {
1327                let url = clip.mp3_url();
1328                let audio = self
1329                    .fetch_bytes(&url)
1330                    .await
1331                    .map_err(|err| err.attribute(&clip.id))?;
1332                let cover = self.fetch_cover(clip).await;
1333                tag_mp3(&audio, &meta, cover.as_deref(), synced)
1334                    .map_err(|err| permanent_fail(&clip.id, err.to_string()))
1335            }
1336            AudioFormat::Flac => {
1337                let wav = self.fetch_wav(client_lock, clip).await?;
1338                let flac = self.ffmpeg.wav_to_flac(&wav).await.map_err(|err| {
1339                    if err.is_out_of_space() {
1340                        disk_fail(&clip.id, "disk full: no space left to transcode")
1341                    } else {
1342                        permanent_fail(&clip.id, format!("transcode failed: {err}"))
1343                    }
1344                })?;
1345                let cover = self.fetch_cover(clip).await;
1346                tag_flac(&flac, &meta, cover.as_deref())
1347                    .map_err(|err| permanent_fail(&clip.id, err.to_string()))
1348            }
1349            AudioFormat::Wav => {
1350                let wav = self.fetch_wav(client_lock, clip).await?;
1351                let cover = self.fetch_cover(clip).await;
1352                tag_wav(&wav, &meta, cover.as_deref(), synced)
1353                    .map_err(|err| permanent_fail(&clip.id, err.to_string()))
1354            }
1355        }
1356    }
1357
1358    /// This run's non-empty aligned lyrics for a clip, if any were fetched.
1359    fn synced_for(&self, clip_id: &str) -> Option<&AlignedLyrics> {
1360        self.synced
1361            .get(clip_id)
1362            .filter(|aligned| !aligned.is_empty())
1363    }
1364
1365    /// The track metadata for a clip, paired with its synced lyrics (if any).
1366    ///
1367    /// The feed omits per-clip lyrics, so when this run fetched aligned lyrics
1368    /// for the clip the plain text is folded into `lyrics` here, which the MP3
1369    /// `USLT` and FLAC `LYRICS` tags then carry. The returned [`AlignedLyrics`]
1370    /// is passed on to [`tag_mp3`] for the word-level `SYLT` frame.
1371    fn track_meta<'m>(
1372        &'m self,
1373        clip: &Clip,
1374        lineage: &LineageContext,
1375    ) -> (TrackMetadata, Option<&'m AlignedLyrics>) {
1376        let synced = self.synced_for(&clip.id);
1377        let mut meta = TrackMetadata::from_clip(clip, lineage);
1378        if let Some(aligned) = synced {
1379            meta.lyrics = aligned.plain_text();
1380        }
1381        (meta, synced)
1382    }
1383
1384    /// Resolve the rendered WAV URL and download it.
1385    async fn fetch_wav(
1386        &self,
1387        client_lock: &ClientLock<'_, C>,
1388        clip: &Clip,
1389    ) -> Result<Vec<u8>, Fail> {
1390        let url = match self.resolve_wav_url(client_lock, &clip.id).await? {
1391            Some(url) => url,
1392            None => return Err(transient_fail(&clip.id, "WAV render was not ready")),
1393        };
1394        self.fetch_bytes(&url)
1395            .await
1396            .map_err(|err| err.attribute(&clip.id))
1397    }
1398
1399    /// Read the WAV URL, requesting a render and polling if it is not ready.
1400    ///
1401    /// `None` means the render did not become ready within the poll budget; the
1402    /// caller treats that as a non-fatal transient failure, never a silent skip.
1403    ///
1404    /// Each client call briefly locks `client_lock`; the poll waits happen
1405    /// unlocked, so concurrent clips interleave their WAV renders rather than
1406    /// serialising behind one clip's whole poll budget.
1407    async fn resolve_wav_url(
1408        &self,
1409        client_lock: &ClientLock<'_, C>,
1410        id: &str,
1411    ) -> Result<Option<String>, Fail> {
1412        if let Some(url) = self.wav_url_retrying(client_lock, id).await? {
1413            return Ok(Some(url));
1414        }
1415        self.request_wav_retrying(client_lock, id).await?;
1416        for _ in 0..self.opts.wav_poll_attempts {
1417            self.clock.sleep(self.opts.wav_poll_interval).await;
1418            if let Some(url) = self.wav_url_retrying(client_lock, id).await? {
1419                return Ok(Some(url));
1420            }
1421        }
1422        Ok(None)
1423    }
1424
1425    /// Read the rendered WAV URL, retrying transient API failures with backoff
1426    /// (SYNC-16/17), so the default FLAC path is as resilient as the CDN path.
1427    async fn wav_url_retrying(
1428        &self,
1429        client_lock: &ClientLock<'_, C>,
1430        id: &str,
1431    ) -> Result<Option<String>, Fail> {
1432        let mut attempt: u32 = 0;
1433        loop {
1434            let result = {
1435                let mut client = client_lock.lock().await;
1436                client.wav_url(self.http, id).await
1437            };
1438            match result {
1439                Ok(url) => return Ok(url),
1440                Err(err) => match self.retry_core(id, err, &mut attempt).await {
1441                    Some(fail) => return Err(fail),
1442                    None => continue,
1443                },
1444            }
1445        }
1446    }
1447
1448    /// Ask Suno to render a WAV, retrying transient API failures with backoff.
1449    async fn request_wav_retrying(
1450        &self,
1451        client_lock: &ClientLock<'_, C>,
1452        id: &str,
1453    ) -> Result<(), Fail> {
1454        let mut attempt: u32 = 0;
1455        loop {
1456            let result = {
1457                let mut client = client_lock.lock().await;
1458                client.request_wav(self.http, id).await
1459            };
1460            match result {
1461                Ok(()) => return Ok(()),
1462                Err(err) => match self.retry_core(id, err, &mut attempt).await {
1463                    Some(fail) => return Err(fail),
1464                    None => continue,
1465                },
1466            }
1467        }
1468    }
1469
1470    /// Classify a core error from the authenticated WAV flow. On a transient
1471    /// class within budget, back off through the [`Clock`] and return `None` to
1472    /// retry; otherwise return the terminal [`Fail`].
1473    async fn retry_core(&self, id: &str, err: Error, attempt: &mut u32) -> Option<Fail> {
1474        let fail = classify_core(id, err);
1475        if matches!(fail.class, Class::Transient) && *attempt < self.opts.max_retries {
1476            self.clock.sleep(backoff_delay(*attempt, None)).await;
1477            *attempt += 1;
1478            None
1479        } else {
1480            Some(fail)
1481        }
1482    }
1483
1484    /// GET `url`, retrying transient failures with backoff, verifying size.
1485    async fn fetch_bytes(&self, url: &str) -> Result<Vec<u8>, FetchError> {
1486        let mut attempt: u32 = 0;
1487        loop {
1488            let result = self.http.send(HttpRequest::get(url)).await;
1489            match classify_response(result) {
1490                Ok(body) => return Ok(body),
1491                Err(err) => {
1492                    if matches!(err.class, Class::Transient) && attempt < self.opts.max_retries {
1493                        let delay = backoff_delay(attempt, err.retry_after);
1494                        self.clock.sleep(delay).await;
1495                        attempt += 1;
1496                        continue;
1497                    }
1498                    return Err(err);
1499                }
1500            }
1501        }
1502    }
1503
1504    /// Download cover art, trying each candidate URL in order; `None` is fine.
1505    async fn fetch_cover(&self, clip: &Clip) -> Option<Vec<u8>> {
1506        for url in clip.cover_candidates() {
1507            if let Ok(response) = self.http.send(HttpRequest::get(url)).await
1508                && (200..=299).contains(&response.status)
1509                && !response.body.is_empty()
1510            {
1511                // A `CoverJpg` sidecar will fetch this exact URL this run; keep the
1512                // bytes so its write reuses them instead of fetching again (#89).
1513                // The lock guards only the insert, never the await above.
1514                if self.cover_wanted.contains(url) {
1515                    self.cover_cache
1516                        .lock()
1517                        .expect("cover cache mutex poisoned")
1518                        .insert(url.to_owned(), response.body.clone());
1519                }
1520                return Some(response.body);
1521            }
1522        }
1523        None
1524    }
1525
1526    /// Write `bytes` atomically, then confirm the on-disk size (SYNC-13/14).
1527    fn write_verify(&self, clip_id: &str, path: &str, bytes: &[u8]) -> Result<u64, Fail> {
1528        self.fs.write_atomic(path, bytes).map_err(|err| {
1529            if err.is_out_of_space() {
1530                disk_fail(clip_id, format!("disk full: no space left to write {path}"))
1531            } else {
1532                permanent_fail(clip_id, format!("write failed: {err}"))
1533            }
1534        })?;
1535        match self.fs.metadata(path) {
1536            Some(stat) if stat.size == bytes.len() as u64 => Ok(stat.size),
1537            Some(stat) => Err(permanent_fail(
1538                clip_id,
1539                format!("wrote {} bytes, expected {}", stat.size, bytes.len()),
1540            )),
1541            None => Ok(bytes.len() as u64),
1542        }
1543    }
1544
1545    /// Build the manifest entry for a freshly written file.
1546    fn entry(&self, clip_id: &str, path: &str, format: AudioFormat, size: u64) -> ManifestEntry {
1547        match self.by_id.get(clip_id) {
1548            Some(d) => manifest_entry(d, size),
1549            None => ManifestEntry {
1550                path: path.to_owned(),
1551                format,
1552                size,
1553                ..ManifestEntry::default()
1554            },
1555        }
1556    }
1557
1558    /// Refresh an existing entry's hashes, protection, and (optionally) size.
1559    fn refresh_hashes(&self, manifest: &mut Manifest, clip_id: &str, size: Option<u64>) {
1560        let desired = self.by_id.get(clip_id).copied();
1561        if let Some(entry) = manifest.entries.get_mut(clip_id) {
1562            if let Some(d) = desired {
1563                entry.meta_hash = d.meta_hash.clone();
1564                entry.art_hash = d.art_hash.clone();
1565                entry.preserve = preserve_for(d);
1566            }
1567            if let Some(size) = size {
1568                entry.size = size;
1569            }
1570        }
1571    }
1572
1573    /// Refresh only an entry's preserve marker from the current desired state.
1574    ///
1575    /// A clip can gain or lose copy/private protection with no file change, which
1576    /// reconcile emits as a [`Skip`](Action::Skip). Refreshing here keeps the
1577    /// persisted marker a faithful image of live protection, so the cross-run
1578    /// delete guard (SYNC-8) never reads it stale.
1579    fn refresh_preserve(&self, manifest: &mut Manifest, clip_id: &str) {
1580        if let Some(d) = self.by_id.get(clip_id).copied()
1581            && let Some(entry) = manifest.entries.get_mut(clip_id)
1582        {
1583            entry.preserve = preserve_for(d);
1584        }
1585    }
1586}
1587
1588/// Build a manifest entry from the desired record (SYNC-8 preserve rule).
1589fn manifest_entry(d: &Desired, size: u64) -> ManifestEntry {
1590    ManifestEntry {
1591        path: d.path.clone(),
1592        format: d.format,
1593        meta_hash: d.meta_hash.clone(),
1594        art_hash: d.art_hash.clone(),
1595        size,
1596        preserve: preserve_for(d),
1597        ..Default::default()
1598    }
1599}
1600
1601/// Whether a written entry must be preserved across runs: held by any copy
1602/// source, or private. The reconcile delete guard reads this marker later.
1603fn preserve_for(d: &Desired) -> bool {
1604    d.private || d.modes.contains(&SourceMode::Copy)
1605}
1606
1607/// Classify one HTTP result into bytes or a [`FetchError`] (SYNC-14/17).
1608fn classify_response(
1609    result: Result<crate::http::HttpResponse, crate::http::TransportError>,
1610) -> Result<Vec<u8>, FetchError> {
1611    let response = match result {
1612        Ok(response) => response,
1613        Err(err) => {
1614            return Err(FetchError::transient(
1615                format!("transport error: {err}"),
1616                None,
1617            ));
1618        }
1619    };
1620    match response.status {
1621        200..=299 => {
1622            if let Some(expected) = content_length(&response) {
1623                let actual = response.body.len() as u64;
1624                if actual != expected {
1625                    return Err(FetchError::transient(
1626                        format!("truncated download: {actual} of {expected} bytes"),
1627                        None,
1628                    ));
1629                }
1630            }
1631            Ok(response.body)
1632        }
1633        401 | 403 => Err(FetchError::transient(
1634            format!("download rejected: status {}", response.status),
1635            None,
1636        )),
1637        408 => Err(FetchError::transient("request timed out", None)),
1638        429 => Err(FetchError::transient(
1639            "rate limited",
1640            retry_after(&response),
1641        )),
1642        500..=599 => Err(FetchError::transient(
1643            format!("server error {}", response.status),
1644            None,
1645        )),
1646        status => Err(FetchError::permanent(format!(
1647            "download failed: status {status}"
1648        ))),
1649    }
1650}
1651
1652/// Map a core [`Error`] from the authenticated WAV flow to a [`Fail`].
1653fn classify_core(id: &str, err: Error) -> Fail {
1654    let reason = err.to_string();
1655    match err {
1656        Error::Auth(_) => auth_fail(id, reason),
1657        Error::RateLimited { .. } | Error::Connection(_) => transient_fail(id, reason),
1658        Error::Api(_)
1659        | Error::NotFound(_)
1660        | Error::Tag(_)
1661        | Error::Config(_)
1662        | Error::Refused(_) => permanent_fail(id, reason),
1663    }
1664}
1665
1666/// The provider-reported body size from `Content-Length`, if present and valid.
1667fn content_length(response: &crate::http::HttpResponse) -> Option<u64> {
1668    response.header("content-length")?.trim().parse().ok()
1669}
1670
1671#[cfg(test)]
1672mod tests {
1673    use super::*;
1674    use crate::ClerkAuth;
1675    use crate::http::HttpResponse;
1676    use crate::testutil::{MemFs, RecordingClock, Reply, ScriptedHttp, StubFfmpeg};
1677
1678    fn clip(id: &str) -> Clip {
1679        Clip {
1680            id: id.to_owned(),
1681            title: "Song".to_owned(),
1682            audio_url: format!("https://cdn1.suno.ai/{id}.mp3"),
1683            ..Default::default()
1684        }
1685    }
1686
1687    fn art_clip(id: &str) -> Clip {
1688        Clip {
1689            image_large_url: format!("https://art.suno.ai/{id}/large.jpg"),
1690            image_url: format!("https://art.suno.ai/{id}/small.jpg"),
1691            ..clip(id)
1692        }
1693    }
1694
1695    fn ext(format: AudioFormat) -> &'static str {
1696        match format {
1697            AudioFormat::Mp3 => "mp3",
1698            AudioFormat::Flac => "flac",
1699            AudioFormat::Wav => "wav",
1700        }
1701    }
1702
1703    fn desired(clip: Clip, format: AudioFormat) -> Desired {
1704        Desired {
1705            path: format!("{}.{}", clip.id, ext(format)),
1706            lineage: LineageContext::own_root(&clip),
1707            clip,
1708            format,
1709            meta_hash: "m".to_owned(),
1710            art_hash: "art".to_owned(),
1711            modes: vec![SourceMode::Mirror],
1712            trashed: false,
1713            private: false,
1714            artifacts: Vec::new(),
1715            stems: None,
1716        }
1717    }
1718
1719    fn entry(path: &str, format: AudioFormat) -> ManifestEntry {
1720        ManifestEntry {
1721            path: path.to_owned(),
1722            format,
1723            meta_hash: "old".to_owned(),
1724            art_hash: "old-art".to_owned(),
1725            size: 8,
1726            preserve: false,
1727            ..Default::default()
1728        }
1729    }
1730
1731    #[allow(clippy::too_many_arguments)]
1732    fn run<G: Ffmpeg>(
1733        plan: &Plan,
1734        manifest: &mut Manifest,
1735        desired: &[Desired],
1736        http: &ScriptedHttp,
1737        fs: &MemFs,
1738        ffmpeg: &G,
1739        clock: &RecordingClock,
1740        opts: &ExecOptions,
1741    ) -> ExecOutcome {
1742        let mut albums = BTreeMap::new();
1743        run_with_albums(
1744            plan,
1745            manifest,
1746            &mut albums,
1747            desired,
1748            http,
1749            fs,
1750            ffmpeg,
1751            clock,
1752            opts,
1753        )
1754    }
1755
1756    #[allow(clippy::too_many_arguments)]
1757    fn run_with_albums<G: Ffmpeg>(
1758        plan: &Plan,
1759        manifest: &mut Manifest,
1760        albums: &mut BTreeMap<String, AlbumArt>,
1761        desired: &[Desired],
1762        http: &ScriptedHttp,
1763        fs: &MemFs,
1764        ffmpeg: &G,
1765        clock: &RecordingClock,
1766        opts: &ExecOptions,
1767    ) -> ExecOutcome {
1768        let mut playlists = BTreeMap::new();
1769        run_full(
1770            plan,
1771            manifest,
1772            albums,
1773            &mut playlists,
1774            desired,
1775            http,
1776            fs,
1777            ffmpeg,
1778            clock,
1779            opts,
1780        )
1781    }
1782
1783    #[allow(clippy::too_many_arguments)]
1784    fn run_full<G: Ffmpeg>(
1785        plan: &Plan,
1786        manifest: &mut Manifest,
1787        albums: &mut BTreeMap<String, AlbumArt>,
1788        playlists: &mut BTreeMap<String, PlaylistState>,
1789        desired: &[Desired],
1790        http: &ScriptedHttp,
1791        fs: &MemFs,
1792        ffmpeg: &G,
1793        clock: &RecordingClock,
1794        opts: &ExecOptions,
1795    ) -> ExecOutcome {
1796        let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
1797        let synced = HashMap::new();
1798        pollster::block_on(execute(
1799            plan,
1800            manifest,
1801            albums,
1802            playlists,
1803            desired,
1804            &synced,
1805            Ports {
1806                client: &mut client,
1807                http,
1808                fs,
1809                ffmpeg,
1810                clock,
1811            },
1812            opts,
1813        ))
1814    }
1815
1816    fn small_poll() -> ExecOptions {
1817        ExecOptions {
1818            max_retries: 3,
1819            wav_poll_attempts: 2,
1820            wav_poll_interval: Duration::from_secs(5),
1821            concurrency: 4,
1822            cover_webp: WebpEncodeSettings::default(),
1823        }
1824    }
1825
1826    // ── Download: MP3 ───────────────────────────────────────────────
1827
1828    #[test]
1829    fn download_mp3_writes_tagged_file_and_records_manifest() {
1830        let c = art_clip("a");
1831        let d = desired(c.clone(), AudioFormat::Mp3);
1832        let plan = Plan {
1833            actions: vec![Action::Download {
1834                clip: c.clone(),
1835                lineage: LineageContext::own_root(&c),
1836                path: d.path.clone(),
1837                format: AudioFormat::Mp3,
1838            }],
1839        };
1840        let http = ScriptedHttp::new()
1841            .route("a.mp3", Reply::ok(b"mp3-body".to_vec()))
1842            .route("a/large.jpg", Reply::ok(b"art-bytes".to_vec()));
1843        let fs = MemFs::new();
1844        let ffmpeg = StubFfmpeg::flac();
1845        let clock = RecordingClock::new();
1846        let mut manifest = Manifest::new();
1847
1848        let outcome = run(
1849            &plan,
1850            &mut manifest,
1851            &[d],
1852            &http,
1853            &fs,
1854            &ffmpeg,
1855            &clock,
1856            &ExecOptions::default(),
1857        );
1858
1859        assert_eq!(outcome.downloaded, 1);
1860        assert_eq!(outcome.failed(), 0);
1861        assert_eq!(outcome.status, RunStatus::Completed);
1862        let written = fs.read_file("a.mp3").unwrap();
1863        assert_eq!(&written[..3], b"ID3");
1864        assert!(written.ends_with(b"mp3-body"));
1865        let entry = manifest.get("a").unwrap();
1866        assert_eq!(entry.path, "a.mp3");
1867        assert_eq!(entry.format, AudioFormat::Mp3);
1868        assert_eq!(entry.meta_hash, "m");
1869        assert_eq!(entry.art_hash, "art");
1870        assert_eq!(entry.size, written.len() as u64);
1871        assert!(!entry.preserve);
1872    }
1873
1874    #[test]
1875    fn download_mp3_embeds_sylt_and_lyrics_from_synced_map() {
1876        // A clip whose alignment was fetched this run gets a word-level SYLT frame
1877        // and its plain lyric text embedded (USLT), end to end through execute.
1878        let c = art_clip("a");
1879        let d = desired(c.clone(), AudioFormat::Mp3);
1880        let plan = Plan {
1881            actions: vec![Action::Download {
1882                clip: c.clone(),
1883                lineage: LineageContext::own_root(&c),
1884                path: d.path.clone(),
1885                format: AudioFormat::Mp3,
1886            }],
1887        };
1888        let http = ScriptedHttp::new()
1889            .route("a.mp3", Reply::ok(b"mp3-body".to_vec()))
1890            .route("a/large.jpg", Reply::ok(b"art-bytes".to_vec()));
1891        let fs = MemFs::new();
1892        let ffmpeg = StubFfmpeg::flac();
1893        let clock = RecordingClock::new();
1894        let mut manifest = Manifest::new();
1895        let mut albums = BTreeMap::new();
1896        let mut playlists = BTreeMap::new();
1897        let mut synced = HashMap::new();
1898        synced.insert(
1899            "a".to_string(),
1900            AlignedLyrics::from_json(&serde_json::json!({
1901                "aligned_words": [],
1902                "aligned_lyrics": [
1903                    {"text": "hi there", "start_s": 0.5, "end_s": 1.2, "section": "Verse 1",
1904                     "words": [
1905                         {"text": "hi", "start_s": 0.5, "end_s": 0.8},
1906                         {"text": "there", "start_s": 0.9, "end_s": 1.2}
1907                     ]}
1908                ]
1909            })),
1910        );
1911        let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
1912        let outcome = pollster::block_on(execute(
1913            &plan,
1914            &mut manifest,
1915            &mut albums,
1916            &mut playlists,
1917            &[d],
1918            &synced,
1919            Ports {
1920                client: &mut client,
1921                http: &http,
1922                fs: &fs,
1923                ffmpeg: &ffmpeg,
1924                clock: &clock,
1925            },
1926            &ExecOptions::default(),
1927        ));
1928
1929        assert_eq!(outcome.downloaded, 1);
1930        let written = fs.read_file("a.mp3").unwrap();
1931        let tag = id3::Tag::read_from2(std::io::Cursor::new(written)).unwrap();
1932        assert_eq!(
1933            tag.synchronised_lyrics().count(),
1934            1,
1935            "a SYLT frame is embedded"
1936        );
1937        // The plain lyric text is populated from the alignment for the USLT frame.
1938        assert_eq!(
1939            tag.lyrics().next().map(|frame| frame.text.as_str()),
1940            Some("hi there")
1941        );
1942    }
1943
1944    #[test]
1945    fn download_mp3_embeds_no_sylt_when_synced_map_empty() {
1946        // The synced map is empty when the feature is off (no alignment fetched),
1947        // so no SYLT frame and no lyric text are embedded.
1948        let c = art_clip("a");
1949        let d = desired(c.clone(), AudioFormat::Mp3);
1950        let plan = Plan {
1951            actions: vec![Action::Download {
1952                clip: c.clone(),
1953                lineage: LineageContext::own_root(&c),
1954                path: d.path.clone(),
1955                format: AudioFormat::Mp3,
1956            }],
1957        };
1958        let http = ScriptedHttp::new()
1959            .route("a.mp3", Reply::ok(b"mp3-body".to_vec()))
1960            .route("a/large.jpg", Reply::ok(b"art-bytes".to_vec()));
1961        let fs = MemFs::new();
1962        let ffmpeg = StubFfmpeg::flac();
1963        let clock = RecordingClock::new();
1964        let mut manifest = Manifest::new();
1965        let mut albums = BTreeMap::new();
1966        let mut playlists = BTreeMap::new();
1967        let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
1968        let outcome = pollster::block_on(execute(
1969            &plan,
1970            &mut manifest,
1971            &mut albums,
1972            &mut playlists,
1973            &[d],
1974            &HashMap::new(),
1975            Ports {
1976                client: &mut client,
1977                http: &http,
1978                fs: &fs,
1979                ffmpeg: &ffmpeg,
1980                clock: &clock,
1981            },
1982            &ExecOptions::default(),
1983        ));
1984        assert_eq!(outcome.downloaded, 1);
1985        let written = fs.read_file("a.mp3").unwrap();
1986        let tag = id3::Tag::read_from2(std::io::Cursor::new(written)).unwrap();
1987        assert_eq!(tag.synchronised_lyrics().count(), 0);
1988        assert_eq!(tag.lyrics().count(), 0);
1989    }
1990
1991    #[test]
1992    fn download_mp3_uses_cdn_fallback_when_audio_url_empty() {
1993        let mut c = clip("a");
1994        c.audio_url = String::new();
1995        let d = desired(c.clone(), AudioFormat::Mp3);
1996        let plan = Plan {
1997            actions: vec![Action::Download {
1998                clip: c.clone(),
1999                lineage: LineageContext::own_root(&c),
2000                path: d.path.clone(),
2001                format: AudioFormat::Mp3,
2002            }],
2003        };
2004        let http = ScriptedHttp::new().route("cdn1.suno.ai/a.mp3", Reply::ok(b"body".to_vec()));
2005        let fs = MemFs::new();
2006        let mut manifest = Manifest::new();
2007        let outcome = run(
2008            &plan,
2009            &mut manifest,
2010            &[d],
2011            &http,
2012            &fs,
2013            &StubFfmpeg::flac(),
2014            &RecordingClock::new(),
2015            &ExecOptions::default(),
2016        );
2017        assert_eq!(outcome.downloaded, 1);
2018        assert_eq!(http.count("cdn1.suno.ai/a.mp3"), 1);
2019    }
2020
2021    // ── Download: FLAC render + transcode ───────────────────────────
2022
2023    #[test]
2024    fn download_flac_renders_transcodes_and_records() {
2025        let c = clip("b");
2026        let d = desired(c.clone(), AudioFormat::Flac);
2027        let plan = Plan {
2028            actions: vec![Action::Download {
2029                clip: c.clone(),
2030                lineage: LineageContext::own_root(&c),
2031                path: d.path.clone(),
2032                format: AudioFormat::Flac,
2033            }],
2034        };
2035        let http = ScriptedHttp::new()
2036            .with_auth()
2037            .route(
2038                "/wav_file/",
2039                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/b.wav"}"#),
2040            )
2041            .route("b.wav", Reply::ok(b"wav-bytes".to_vec()));
2042        let fs = MemFs::new();
2043        let clock = RecordingClock::new();
2044        let mut manifest = Manifest::new();
2045
2046        let outcome = run(
2047            &plan,
2048            &mut manifest,
2049            &[d],
2050            &http,
2051            &fs,
2052            &StubFfmpeg::flac(),
2053            &clock,
2054            &ExecOptions::default(),
2055        );
2056
2057        assert_eq!(outcome.downloaded, 1);
2058        assert_eq!(outcome.failed(), 0);
2059        let written = fs.read_file("b.flac").unwrap();
2060        assert_eq!(&written[..4], b"fLaC");
2061        assert_eq!(manifest.get("b").unwrap().format, AudioFormat::Flac);
2062        // The URL was ready immediately, so no render request and no polling.
2063        assert_eq!(http.count("/convert_wav/"), 0);
2064        assert!(clock.sleeps().is_empty());
2065    }
2066
2067    #[test]
2068    fn download_flac_requests_render_then_polls_until_ready() {
2069        let c = clip("c");
2070        let d = desired(c.clone(), AudioFormat::Flac);
2071        let plan = Plan {
2072            actions: vec![Action::Download {
2073                clip: c.clone(),
2074                lineage: LineageContext::own_root(&c),
2075                path: d.path.clone(),
2076                format: AudioFormat::Flac,
2077            }],
2078        };
2079        let http = ScriptedHttp::new()
2080            .with_auth()
2081            .route_seq(
2082                "/wav_file/",
2083                vec![
2084                    Reply::json("{}"),
2085                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/c.wav"}"#),
2086                ],
2087            )
2088            .route("/convert_wav/", Reply::status(200))
2089            .route("c.wav", Reply::ok(b"wav".to_vec()));
2090        let clock = RecordingClock::new();
2091        let mut manifest = Manifest::new();
2092
2093        let outcome = run(
2094            &plan,
2095            &mut manifest,
2096            &[d],
2097            &http,
2098            &fs_new(),
2099            &StubFfmpeg::flac(),
2100            &clock,
2101            &small_poll(),
2102        );
2103
2104        assert_eq!(outcome.downloaded, 1);
2105        assert_eq!(http.count("/convert_wav/"), 1);
2106        assert_eq!(clock.sleeps(), vec![Duration::from_secs(5)]);
2107    }
2108
2109    #[test]
2110    fn download_flac_unavailable_render_is_a_nonfatal_failure() {
2111        let c = clip("d");
2112        let d = desired(c.clone(), AudioFormat::Flac);
2113        let plan = Plan {
2114            actions: vec![Action::Download {
2115                clip: c.clone(),
2116                lineage: LineageContext::own_root(&c),
2117                path: d.path.clone(),
2118                format: AudioFormat::Flac,
2119            }],
2120        };
2121        let http = ScriptedHttp::new()
2122            .with_auth()
2123            .route("/wav_file/", Reply::json("{}"))
2124            .route("/convert_wav/", Reply::status(200));
2125        let fs = MemFs::new();
2126        let clock = RecordingClock::new();
2127        let mut manifest = Manifest::new();
2128
2129        let outcome = run(
2130            &plan,
2131            &mut manifest,
2132            &[d],
2133            &http,
2134            &fs,
2135            &StubFfmpeg::flac(),
2136            &clock,
2137            &small_poll(),
2138        );
2139
2140        assert_eq!(outcome.downloaded, 0);
2141        assert_eq!(outcome.failed(), 1);
2142        assert_eq!(outcome.failures[0].clip_id, "d");
2143        assert_eq!(outcome.status, RunStatus::Completed);
2144        assert!(!fs.exists("d.flac"));
2145        assert_eq!(clock.sleeps().len(), 2);
2146    }
2147
2148    #[test]
2149    fn flac_transcode_failure_is_recorded_and_skipped() {
2150        let c = clip("t");
2151        let d = desired(c.clone(), AudioFormat::Flac);
2152        let plan = Plan {
2153            actions: vec![Action::Download {
2154                clip: c.clone(),
2155                lineage: LineageContext::own_root(&c),
2156                path: d.path.clone(),
2157                format: AudioFormat::Flac,
2158            }],
2159        };
2160        let http = ScriptedHttp::new()
2161            .with_auth()
2162            .route(
2163                "/wav_file/",
2164                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/t.wav"}"#),
2165            )
2166            .route("t.wav", Reply::ok(b"wav".to_vec()));
2167        let fs = MemFs::new();
2168        let mut manifest = Manifest::new();
2169
2170        let outcome = run(
2171            &plan,
2172            &mut manifest,
2173            &[d],
2174            &http,
2175            &fs,
2176            &StubFfmpeg::failing(),
2177            &RecordingClock::new(),
2178            &ExecOptions::default(),
2179        );
2180
2181        assert_eq!(outcome.downloaded, 0);
2182        assert_eq!(outcome.failed(), 1);
2183        assert!(!fs.exists("t.flac"));
2184        assert!(manifest.get("t").is_none());
2185    }
2186
2187    // ── Cover fallback ──────────────────────────────────────────────
2188
2189    #[test]
2190    fn cover_falls_back_when_large_image_is_missing() {
2191        let c = art_clip("e");
2192        let d = desired(c.clone(), AudioFormat::Mp3);
2193        let plan = Plan {
2194            actions: vec![Action::Download {
2195                clip: c.clone(),
2196                lineage: LineageContext::own_root(&c),
2197                path: d.path.clone(),
2198                format: AudioFormat::Mp3,
2199            }],
2200        };
2201        let http = ScriptedHttp::new()
2202            .route("e.mp3", Reply::ok(b"body".to_vec()))
2203            .route("e/large.jpg", Reply::status(404))
2204            .route("e/small.jpg", Reply::ok(b"the-art".to_vec()));
2205        let fs = MemFs::new();
2206        let mut manifest = Manifest::new();
2207
2208        let outcome = run(
2209            &plan,
2210            &mut manifest,
2211            &[d],
2212            &http,
2213            &fs,
2214            &StubFfmpeg::flac(),
2215            &RecordingClock::new(),
2216            &ExecOptions::default(),
2217        );
2218
2219        assert_eq!(outcome.downloaded, 1);
2220        let calls = http.calls();
2221        let large = calls
2222            .iter()
2223            .position(|u| u.contains("e/large.jpg"))
2224            .unwrap();
2225        let small = calls
2226            .iter()
2227            .position(|u| u.contains("e/small.jpg"))
2228            .unwrap();
2229        assert!(large < small, "large art tried before small");
2230    }
2231
2232    // ── Cover reuse: embed + sidecar share one fetch (#89) ──────────
2233
2234    #[test]
2235    fn download_reuses_the_embedded_cover_for_the_jpg_sidecar() {
2236        // The embedded tag and the `.jpg` sidecar want the same cover URL; it is
2237        // fetched once and the bytes serve both.
2238        let c = art_clip("a");
2239        let d = desired(c.clone(), AudioFormat::Mp3);
2240        let plan = Plan {
2241            actions: vec![
2242                Action::Download {
2243                    clip: c.clone(),
2244                    lineage: LineageContext::own_root(&c),
2245                    path: d.path.clone(),
2246                    format: AudioFormat::Mp3,
2247                },
2248                Action::WriteArtifact {
2249                    kind: ArtifactKind::CoverJpg,
2250                    path: "a/cover.jpg".to_owned(),
2251                    source_url: c.selected_image_url().unwrap().to_owned(),
2252                    hash: "art".to_owned(),
2253                    owner_id: "a".to_owned(),
2254                    content: None,
2255                },
2256            ],
2257        };
2258        let http = ScriptedHttp::new()
2259            .route("a.mp3", Reply::ok(b"mp3-body".to_vec()))
2260            .route("a/large.jpg", Reply::ok(b"the-art".to_vec()));
2261        let fs = MemFs::new();
2262        let mut manifest = Manifest::new();
2263
2264        let outcome = run(
2265            &plan,
2266            &mut manifest,
2267            &[d],
2268            &http,
2269            &fs,
2270            &StubFfmpeg::flac(),
2271            &RecordingClock::new(),
2272            &ExecOptions::default(),
2273        );
2274
2275        assert_eq!(outcome.downloaded, 1);
2276        assert_eq!(outcome.artifacts_written, 1);
2277        assert_eq!(outcome.failed(), 0);
2278        // Fetched once, not twice.
2279        assert_eq!(http.count("a/large.jpg"), 1);
2280        // The sidecar carries the fetched bytes, and the audio was tagged.
2281        assert_eq!(fs.read_file("a/cover.jpg").unwrap(), b"the-art");
2282        assert_eq!(&fs.read_file("a.mp3").unwrap()[..3], b"ID3");
2283    }
2284
2285    #[test]
2286    fn concurrent_downloads_reuse_each_clips_own_cover() {
2287        // Two clips render concurrently; each `.jpg` sidecar gets its own cover
2288        // (no cross-contamination) and each cover URL is fetched exactly once.
2289        let a = art_clip("a");
2290        let b = art_clip("b");
2291        let da = desired(a.clone(), AudioFormat::Mp3);
2292        let db = desired(b.clone(), AudioFormat::Mp3);
2293        let plan = Plan {
2294            actions: vec![
2295                Action::Download {
2296                    clip: a.clone(),
2297                    lineage: LineageContext::own_root(&a),
2298                    path: da.path.clone(),
2299                    format: AudioFormat::Mp3,
2300                },
2301                Action::WriteArtifact {
2302                    kind: ArtifactKind::CoverJpg,
2303                    path: "a/cover.jpg".to_owned(),
2304                    source_url: a.selected_image_url().unwrap().to_owned(),
2305                    hash: "art".to_owned(),
2306                    owner_id: "a".to_owned(),
2307                    content: None,
2308                },
2309                Action::Download {
2310                    clip: b.clone(),
2311                    lineage: LineageContext::own_root(&b),
2312                    path: db.path.clone(),
2313                    format: AudioFormat::Mp3,
2314                },
2315                Action::WriteArtifact {
2316                    kind: ArtifactKind::CoverJpg,
2317                    path: "b/cover.jpg".to_owned(),
2318                    source_url: b.selected_image_url().unwrap().to_owned(),
2319                    hash: "art".to_owned(),
2320                    owner_id: "b".to_owned(),
2321                    content: None,
2322                },
2323            ],
2324        };
2325        let http = ScriptedHttp::new()
2326            .route("a.mp3", Reply::ok(b"a-mp3".to_vec()))
2327            .route("b.mp3", Reply::ok(b"b-mp3".to_vec()))
2328            .route("a/large.jpg", Reply::ok(b"art-a".to_vec()))
2329            .route("b/large.jpg", Reply::ok(b"art-b".to_vec()));
2330        let fs = MemFs::new();
2331        let mut manifest = Manifest::new();
2332
2333        let outcome = run(
2334            &plan,
2335            &mut manifest,
2336            &[da, db],
2337            &http,
2338            &fs,
2339            &StubFfmpeg::flac(),
2340            &RecordingClock::new(),
2341            &small_poll(),
2342        );
2343
2344        assert_eq!(outcome.downloaded, 2);
2345        assert_eq!(outcome.artifacts_written, 2);
2346        assert_eq!(http.count("a/large.jpg"), 1);
2347        assert_eq!(http.count("b/large.jpg"), 1);
2348        assert_eq!(fs.read_file("a/cover.jpg").unwrap(), b"art-a");
2349        assert_eq!(fs.read_file("b/cover.jpg").unwrap(), b"art-b");
2350    }
2351
2352    #[test]
2353    fn cover_sidecar_refetches_when_embed_fell_back_to_another_url() {
2354        // The large image 404s so the embed falls back to the small image; the
2355        // sidecar still wants the (dead) large URL and must NOT be handed the
2356        // small bytes. Reuse is keyed on the exact URL, so nothing is cached and
2357        // the sidecar fetches the large URL itself (then fails on the 404).
2358        let c = art_clip("e");
2359        let d = desired(c.clone(), AudioFormat::Mp3);
2360        let plan = Plan {
2361            actions: vec![
2362                Action::Download {
2363                    clip: c.clone(),
2364                    lineage: LineageContext::own_root(&c),
2365                    path: d.path.clone(),
2366                    format: AudioFormat::Mp3,
2367                },
2368                Action::WriteArtifact {
2369                    kind: ArtifactKind::CoverJpg,
2370                    path: "e/cover.jpg".to_owned(),
2371                    source_url: "https://art.suno.ai/e/large.jpg".to_owned(),
2372                    hash: "art".to_owned(),
2373                    owner_id: "e".to_owned(),
2374                    content: None,
2375                },
2376            ],
2377        };
2378        let http = ScriptedHttp::new()
2379            .route("e.mp3", Reply::ok(b"body".to_vec()))
2380            .route("e/large.jpg", Reply::status(404))
2381            .route("e/small.jpg", Reply::ok(b"small-art".to_vec()));
2382        let fs = MemFs::new();
2383        let mut manifest = Manifest::new();
2384
2385        let outcome = run(
2386            &plan,
2387            &mut manifest,
2388            &[d],
2389            &http,
2390            &fs,
2391            &StubFfmpeg::flac(),
2392            &RecordingClock::new(),
2393            &ExecOptions::default(),
2394        );
2395
2396        assert_eq!(outcome.downloaded, 1);
2397        // The small image was fetched once (the embed fallback) and never reused
2398        // for the large-keyed sidecar; the sidecar went to the network itself.
2399        assert_eq!(http.count("e/small.jpg"), 1);
2400        assert!(
2401            http.count("e/large.jpg") >= 2,
2402            "sidecar refetched the large URL"
2403        );
2404        assert_eq!(manifest.get("e").unwrap().cover_jpg, None);
2405        assert!(!fs.exists("e/cover.jpg"));
2406    }
2407
2408    // ── Atomic write and size verification (SYNC-13/14) ─────────────
2409
2410    #[test]
2411    fn failed_write_leaves_the_prior_file_intact() {
2412        let c = clip("f");
2413        let d = desired(c.clone(), AudioFormat::Mp3);
2414        let plan = Plan {
2415            actions: vec![Action::Download {
2416                clip: c.clone(),
2417                lineage: LineageContext::own_root(&c),
2418                path: d.path.clone(),
2419                format: AudioFormat::Mp3,
2420            }],
2421        };
2422        let http = ScriptedHttp::new().route("f.mp3", Reply::ok(b"new-body".to_vec()));
2423        let fs = MemFs::new()
2424            .with_file("f.mp3", b"OLD-CONTENT".to_vec())
2425            .fail_write("f.mp3");
2426        let mut manifest = Manifest::new();
2427
2428        let outcome = run(
2429            &plan,
2430            &mut manifest,
2431            &[d],
2432            &http,
2433            &fs,
2434            &StubFfmpeg::flac(),
2435            &RecordingClock::new(),
2436            &ExecOptions::default(),
2437        );
2438
2439        assert_eq!(outcome.downloaded, 0);
2440        assert_eq!(outcome.failed(), 1);
2441        assert_eq!(fs.read_file("f.mp3").unwrap(), b"OLD-CONTENT");
2442        assert!(manifest.get("f").is_none());
2443    }
2444
2445    #[test]
2446    fn size_mismatch_after_write_is_a_failure() {
2447        let c = clip("g");
2448        let d = desired(c.clone(), AudioFormat::Mp3);
2449        let plan = Plan {
2450            actions: vec![Action::Download {
2451                clip: c.clone(),
2452                lineage: LineageContext::own_root(&c),
2453                path: d.path.clone(),
2454                format: AudioFormat::Mp3,
2455            }],
2456        };
2457        let http = ScriptedHttp::new().route("g.mp3", Reply::ok(b"body".to_vec()));
2458        let fs = MemFs::new().corrupt_write("g.mp3");
2459        let mut manifest = Manifest::new();
2460
2461        let outcome = run(
2462            &plan,
2463            &mut manifest,
2464            &[d],
2465            &http,
2466            &fs,
2467            &StubFfmpeg::flac(),
2468            &RecordingClock::new(),
2469            &ExecOptions::default(),
2470        );
2471
2472        assert_eq!(outcome.downloaded, 0);
2473        assert_eq!(outcome.failed(), 1);
2474        assert!(outcome.failures[0].reason.contains("expected"));
2475        assert!(manifest.get("g").is_none());
2476    }
2477
2478    // ── Reliability policy (SYNC-16/17) ─────────────────────────────
2479
2480    #[test]
2481    fn transient_failure_is_retried_then_skipped() {
2482        let c = clip("h");
2483        let d = desired(c.clone(), AudioFormat::Mp3);
2484        let plan = Plan {
2485            actions: vec![Action::Download {
2486                clip: c.clone(),
2487                lineage: LineageContext::own_root(&c),
2488                path: d.path.clone(),
2489                format: AudioFormat::Mp3,
2490            }],
2491        };
2492        let http = ScriptedHttp::new().route("h.mp3", Reply::status(500));
2493        let fs = MemFs::new();
2494        let clock = RecordingClock::new();
2495        let opts = ExecOptions {
2496            max_retries: 2,
2497            ..ExecOptions::default()
2498        };
2499        let mut manifest = Manifest::new();
2500
2501        let outcome = run(
2502            &plan,
2503            &mut manifest,
2504            &[d],
2505            &http,
2506            &fs,
2507            &StubFfmpeg::flac(),
2508            &clock,
2509            &opts,
2510        );
2511
2512        assert_eq!(outcome.downloaded, 0);
2513        assert_eq!(outcome.failed(), 1);
2514        assert_eq!(http.count("h.mp3"), 3);
2515        assert_eq!(clock.sleeps().len(), 2);
2516    }
2517
2518    #[test]
2519    fn truncated_download_is_retried_then_succeeds() {
2520        let c = clip("i");
2521        let d = desired(c.clone(), AudioFormat::Mp3);
2522        let plan = Plan {
2523            actions: vec![Action::Download {
2524                clip: c.clone(),
2525                lineage: LineageContext::own_root(&c),
2526                path: d.path.clone(),
2527                format: AudioFormat::Mp3,
2528            }],
2529        };
2530        let http = ScriptedHttp::new().route_seq(
2531            "i.mp3",
2532            vec![
2533                Reply::ok(b"short".to_vec()).with_content_length(999),
2534                Reply::ok(b"good-body".to_vec()),
2535            ],
2536        );
2537        let fs = MemFs::new();
2538        let clock = RecordingClock::new();
2539        let mut manifest = Manifest::new();
2540
2541        let outcome = run(
2542            &plan,
2543            &mut manifest,
2544            &[d],
2545            &http,
2546            &fs,
2547            &StubFfmpeg::flac(),
2548            &clock,
2549            &ExecOptions::default(),
2550        );
2551
2552        assert_eq!(outcome.downloaded, 1);
2553        assert_eq!(http.count("i.mp3"), 2);
2554        assert_eq!(clock.sleeps().len(), 1);
2555    }
2556
2557    #[test]
2558    fn rate_limit_backs_off_using_retry_after() {
2559        let c = clip("j");
2560        let d = desired(c.clone(), AudioFormat::Mp3);
2561        let plan = Plan {
2562            actions: vec![Action::Download {
2563                clip: c.clone(),
2564                lineage: LineageContext::own_root(&c),
2565                path: d.path.clone(),
2566                format: AudioFormat::Mp3,
2567            }],
2568        };
2569        let http = ScriptedHttp::new().route_seq(
2570            "j.mp3",
2571            vec![
2572                Reply::status(429).with_retry_after(7),
2573                Reply::ok(b"body".to_vec()),
2574            ],
2575        );
2576        let fs = MemFs::new();
2577        let clock = RecordingClock::new();
2578        let mut manifest = Manifest::new();
2579
2580        let outcome = run(
2581            &plan,
2582            &mut manifest,
2583            &[d],
2584            &http,
2585            &fs,
2586            &StubFfmpeg::flac(),
2587            &clock,
2588            &ExecOptions::default(),
2589        );
2590
2591        assert_eq!(outcome.downloaded, 1);
2592        assert_eq!(clock.sleeps(), vec![Duration::from_secs(7)]);
2593    }
2594
2595    #[test]
2596    fn auth_failure_aborts_the_run() {
2597        let c1 = clip("k1");
2598        let c2 = clip("k2");
2599        let d1 = desired(c1.clone(), AudioFormat::Flac);
2600        let d2 = desired(c2.clone(), AudioFormat::Flac);
2601        let plan = Plan {
2602            actions: vec![
2603                Action::Download {
2604                    clip: c1.clone(),
2605                    lineage: LineageContext::own_root(&c1),
2606                    path: d1.path.clone(),
2607                    format: AudioFormat::Flac,
2608                },
2609                Action::Download {
2610                    clip: c2.clone(),
2611                    lineage: LineageContext::own_root(&c2),
2612                    path: d2.path.clone(),
2613                    format: AudioFormat::Flac,
2614                },
2615            ],
2616        };
2617        // The authenticated WAV-render endpoint rejects auth even after a JWT
2618        // refresh: that is a bad token, so the whole run aborts rather than
2619        // hammering every clip. A CDN media rejection, by contrast, does not.
2620        let http = ScriptedHttp::new()
2621            .with_auth()
2622            .route("/wav_file/", Reply::status(401));
2623        let fs = MemFs::new();
2624        let mut manifest = Manifest::new();
2625
2626        let outcome = run(
2627            &plan,
2628            &mut manifest,
2629            &[d1, d2],
2630            &http,
2631            &fs,
2632            &StubFfmpeg::flac(),
2633            &RecordingClock::new(),
2634            &small_poll(),
2635        );
2636
2637        assert_eq!(outcome.status, RunStatus::AuthAborted);
2638        assert_eq!(outcome.failed(), 1);
2639        assert_eq!(outcome.failures[0].clip_id, "k1");
2640        assert_eq!(outcome.downloaded, 0);
2641    }
2642
2643    // ── Disk-full aborts the run (issue #17) ────────────────────────
2644
2645    #[test]
2646    fn disk_full_primary_write_aborts_the_run() {
2647        // Two MP3 downloads; the first write is out of space. That is systemic,
2648        // so the run aborts before the second is even attempted: exactly one
2649        // failure is recorded and its reason names the disk-full cause.
2650        let c1 = clip("d1");
2651        let c2 = clip("d2");
2652        let d1 = desired(c1.clone(), AudioFormat::Mp3);
2653        let d2 = desired(c2.clone(), AudioFormat::Mp3);
2654        let plan = Plan {
2655            actions: vec![
2656                Action::Download {
2657                    clip: c1.clone(),
2658                    lineage: LineageContext::own_root(&c1),
2659                    path: d1.path.clone(),
2660                    format: AudioFormat::Mp3,
2661                },
2662                Action::Download {
2663                    clip: c2.clone(),
2664                    lineage: LineageContext::own_root(&c2),
2665                    path: d2.path.clone(),
2666                    format: AudioFormat::Mp3,
2667                },
2668            ],
2669        };
2670        let http = ScriptedHttp::new()
2671            .route("d1.mp3", Reply::ok(b"body-1".to_vec()))
2672            .route("d2.mp3", Reply::ok(b"body-2".to_vec()));
2673        let fs = MemFs::new().fail_write_out_of_space("d1.mp3");
2674        let mut manifest = Manifest::new();
2675
2676        let outcome = run(
2677            &plan,
2678            &mut manifest,
2679            &[d1, d2],
2680            &http,
2681            &fs,
2682            &StubFfmpeg::flac(),
2683            &RecordingClock::new(),
2684            &ExecOptions::default(),
2685        );
2686
2687        assert_eq!(outcome.status, RunStatus::DiskFull);
2688        assert_eq!(outcome.failed(), 1);
2689        assert_eq!(outcome.failures[0].clip_id, "d1");
2690        assert!(outcome.failures[0].reason.contains("disk full"));
2691        assert_eq!(outcome.downloaded, 0);
2692        // The second clip was never fetched: the run aborted first.
2693        assert_eq!(http.count("d2.mp3"), 0);
2694        assert!(!fs.exists("d2.mp3"));
2695    }
2696
2697    #[test]
2698    fn disk_full_flac_transcode_aborts_the_run() {
2699        // The scratch disk fills during the FLAC re-encode; a WAV rendered, but
2700        // there is nowhere to stage the transcode, so the run aborts.
2701        let c1 = clip("d1");
2702        let c2 = clip("d2");
2703        let d1 = desired(c1.clone(), AudioFormat::Flac);
2704        let d2 = desired(c2.clone(), AudioFormat::Flac);
2705        let plan = Plan {
2706            actions: vec![
2707                Action::Download {
2708                    clip: c1.clone(),
2709                    lineage: LineageContext::own_root(&c1),
2710                    path: d1.path.clone(),
2711                    format: AudioFormat::Flac,
2712                },
2713                Action::Download {
2714                    clip: c2.clone(),
2715                    lineage: LineageContext::own_root(&c2),
2716                    path: d2.path.clone(),
2717                    format: AudioFormat::Flac,
2718                },
2719            ],
2720        };
2721        let http = ScriptedHttp::new()
2722            .with_auth()
2723            .route(
2724                "/wav_file/",
2725                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/d1.wav"}"#),
2726            )
2727            .route(".wav", Reply::ok(b"wav".to_vec()));
2728        let fs = MemFs::new();
2729        let mut manifest = Manifest::new();
2730
2731        let outcome = run(
2732            &plan,
2733            &mut manifest,
2734            &[d1, d2],
2735            &http,
2736            &fs,
2737            &StubFfmpeg::out_of_space(),
2738            &RecordingClock::new(),
2739            &ExecOptions::default(),
2740        );
2741
2742        assert_eq!(outcome.status, RunStatus::DiskFull);
2743        assert_eq!(outcome.failed(), 1);
2744        assert_eq!(outcome.failures[0].clip_id, "d1");
2745        assert!(outcome.failures[0].reason.contains("disk full"));
2746        assert_eq!(outcome.downloaded, 0);
2747    }
2748
2749    #[test]
2750    fn disk_full_artifact_write_aborts_the_run() {
2751        // A sidecar write (not a primary download) also aborts on a full disk:
2752        // the owning audio is present, the cover fetch succeeds, but the sidecar
2753        // cannot be written.
2754        let mut manifest = Manifest::new();
2755        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
2756        let plan = Plan {
2757            actions: vec![Action::WriteArtifact {
2758                kind: ArtifactKind::CoverJpg,
2759                path: "a/cover.jpg".to_owned(),
2760                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
2761                hash: "h1".to_owned(),
2762                owner_id: "a".to_owned(),
2763                content: None,
2764            }],
2765        };
2766        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"jpg-bytes".to_vec()));
2767        let fs = MemFs::new().fail_write_out_of_space("a/cover.jpg");
2768
2769        let outcome = run(
2770            &plan,
2771            &mut manifest,
2772            &[],
2773            &http,
2774            &fs,
2775            &StubFfmpeg::flac(),
2776            &RecordingClock::new(),
2777            &ExecOptions::default(),
2778        );
2779
2780        assert_eq!(outcome.status, RunStatus::DiskFull);
2781        assert_eq!(outcome.failed(), 1);
2782        assert!(outcome.failures[0].reason.contains("disk full"));
2783        assert_eq!(outcome.artifacts_written, 0);
2784        // The sidecar slot was never recorded: the write failed before it.
2785        assert_eq!(manifest.get("a").unwrap().cover_jpg, None);
2786    }
2787
2788    #[test]
2789    fn disk_full_leaves_the_failed_clips_manifest_entry_unchanged() {
2790        // write_verify fails before any manifest insert, so a re-download that
2791        // hits a full disk leaves the prior entry (and file) exactly as it was.
2792        let c = clip("m");
2793        let d = desired(c.clone(), AudioFormat::Mp3);
2794        let plan = Plan {
2795            actions: vec![Action::Download {
2796                clip: c.clone(),
2797                lineage: LineageContext::own_root(&c),
2798                path: d.path.clone(),
2799                format: AudioFormat::Mp3,
2800            }],
2801        };
2802        let http = ScriptedHttp::new().route("m.mp3", Reply::ok(b"new-body".to_vec()));
2803        let fs = MemFs::new()
2804            .with_file("m.mp3", b"OLD-CONTENT".to_vec())
2805            .fail_write_out_of_space("m.mp3");
2806        let mut manifest = Manifest::new();
2807        let before = entry("m.mp3", AudioFormat::Mp3);
2808        manifest.insert("m", before.clone());
2809
2810        let outcome = run(
2811            &plan,
2812            &mut manifest,
2813            &[d],
2814            &http,
2815            &fs,
2816            &StubFfmpeg::flac(),
2817            &RecordingClock::new(),
2818            &ExecOptions::default(),
2819        );
2820
2821        assert_eq!(outcome.status, RunStatus::DiskFull);
2822        assert_eq!(manifest.get("m"), Some(&before));
2823        assert_eq!(fs.read_file("m.mp3").unwrap(), b"OLD-CONTENT");
2824    }
2825
2826    #[test]
2827    fn cdn_download_rejection_skips_the_clip_without_aborting() {
2828        let c1 = clip("k1");
2829        let c2 = clip("k2");
2830        let d1 = desired(c1.clone(), AudioFormat::Mp3);
2831        let d2 = desired(c2.clone(), AudioFormat::Mp3);
2832        let plan = Plan {
2833            actions: vec![
2834                Action::Download {
2835                    clip: c1.clone(),
2836                    lineage: LineageContext::own_root(&c1),
2837                    path: d1.path.clone(),
2838                    format: AudioFormat::Mp3,
2839                },
2840                Action::Download {
2841                    clip: c2.clone(),
2842                    lineage: LineageContext::own_root(&c2),
2843                    path: d2.path.clone(),
2844                    format: AudioFormat::Mp3,
2845                },
2846            ],
2847        };
2848        // A CDN media fetch is unauthenticated, so a 403 is a per-asset
2849        // rejection (often transient), not a bad token: the clip is retried
2850        // then recorded and skipped, and the run carries on to the rest.
2851        let http = ScriptedHttp::new()
2852            .route("k1.mp3", Reply::status(403))
2853            .route("k2.mp3", Reply::ok(b"body".to_vec()));
2854        let fs = MemFs::new();
2855        let mut manifest = Manifest::new();
2856
2857        let outcome = run(
2858            &plan,
2859            &mut manifest,
2860            &[d1, d2],
2861            &http,
2862            &fs,
2863            &StubFfmpeg::flac(),
2864            &RecordingClock::new(),
2865            &ExecOptions::default(),
2866        );
2867
2868        assert_ne!(outcome.status, RunStatus::AuthAborted);
2869        assert_eq!(outcome.downloaded, 1);
2870        assert_eq!(outcome.failed(), 1);
2871        assert_eq!(outcome.failures[0].clip_id, "k1");
2872    }
2873
2874    #[test]
2875    fn one_clip_failure_does_not_abort_the_run() {
2876        let c1 = clip("l1");
2877        let c2 = clip("l2");
2878        let d1 = desired(c1.clone(), AudioFormat::Mp3);
2879        let d2 = desired(c2.clone(), AudioFormat::Mp3);
2880        let plan = Plan {
2881            actions: vec![
2882                Action::Download {
2883                    clip: c1.clone(),
2884                    lineage: LineageContext::own_root(&c1),
2885                    path: d1.path.clone(),
2886                    format: AudioFormat::Mp3,
2887                },
2888                Action::Download {
2889                    clip: c2.clone(),
2890                    lineage: LineageContext::own_root(&c2),
2891                    path: d2.path.clone(),
2892                    format: AudioFormat::Mp3,
2893                },
2894            ],
2895        };
2896        let http = ScriptedHttp::new()
2897            .route("l1.mp3", Reply::status(404))
2898            .route("l2.mp3", Reply::ok(b"body".to_vec()));
2899        let fs = MemFs::new();
2900        let mut manifest = Manifest::new();
2901
2902        let outcome = run(
2903            &plan,
2904            &mut manifest,
2905            &[d1, d2],
2906            &http,
2907            &fs,
2908            &StubFfmpeg::flac(),
2909            &RecordingClock::new(),
2910            &ExecOptions::default(),
2911        );
2912
2913        assert_eq!(outcome.status, RunStatus::Completed);
2914        assert_eq!(outcome.downloaded, 1);
2915        assert_eq!(outcome.failed(), 1);
2916        assert_eq!(outcome.failures[0].clip_id, "l1");
2917        assert!(fs.exists("l2.mp3"));
2918        assert!(manifest.get("l2").is_some());
2919        assert!(manifest.get("l1").is_none());
2920    }
2921
2922    // ── preserve marker (SYNC-8) ────────────────────────────────────
2923
2924    #[test]
2925    fn preserve_is_set_for_copy_held_and_private_clips() {
2926        let mut mirror = desired(clip("m1"), AudioFormat::Mp3);
2927        mirror.modes = vec![SourceMode::Mirror];
2928        let mut copy_held = desired(clip("m2"), AudioFormat::Mp3);
2929        copy_held.modes = vec![SourceMode::Mirror, SourceMode::Copy];
2930        let mut private = desired(clip("m3"), AudioFormat::Mp3);
2931        private.private = true;
2932
2933        let plan = Plan {
2934            actions: vec![
2935                Action::Download {
2936                    clip: mirror.clip.clone(),
2937                    lineage: LineageContext::own_root(&mirror.clip),
2938                    path: mirror.path.clone(),
2939                    format: AudioFormat::Mp3,
2940                },
2941                Action::Download {
2942                    clip: copy_held.clip.clone(),
2943                    lineage: LineageContext::own_root(&copy_held.clip),
2944                    path: copy_held.path.clone(),
2945                    format: AudioFormat::Mp3,
2946                },
2947                Action::Download {
2948                    clip: private.clip.clone(),
2949                    lineage: LineageContext::own_root(&private.clip),
2950                    path: private.path.clone(),
2951                    format: AudioFormat::Mp3,
2952                },
2953            ],
2954        };
2955        let http = ScriptedHttp::new()
2956            .route("m1.mp3", Reply::ok(b"a".to_vec()))
2957            .route("m2.mp3", Reply::ok(b"b".to_vec()))
2958            .route("m3.mp3", Reply::ok(b"c".to_vec()));
2959        let fs = MemFs::new();
2960        let mut manifest = Manifest::new();
2961
2962        let outcome = run(
2963            &plan,
2964            &mut manifest,
2965            &[mirror, copy_held, private],
2966            &http,
2967            &fs,
2968            &StubFfmpeg::flac(),
2969            &RecordingClock::new(),
2970            &ExecOptions::default(),
2971        );
2972
2973        assert_eq!(outcome.downloaded, 3);
2974        assert!(!manifest.get("m1").unwrap().preserve);
2975        assert!(manifest.get("m2").unwrap().preserve);
2976        assert!(manifest.get("m3").unwrap().preserve);
2977    }
2978
2979    // ── Reformat / Retag / Rename / Delete / Skip ───────────────────
2980
2981    #[test]
2982    fn reformat_writes_new_format_and_removes_old_file() {
2983        let c = clip("n");
2984        let d = desired(c.clone(), AudioFormat::Mp3);
2985        let plan = Plan {
2986            actions: vec![Action::Reformat {
2987                clip: c.clone(),
2988                path: "n.mp3".to_owned(),
2989                from_path: "n.flac".to_owned(),
2990                from: AudioFormat::Flac,
2991                to: AudioFormat::Mp3,
2992            }],
2993        };
2994        let http = ScriptedHttp::new().route("n.mp3", Reply::ok(b"body".to_vec()));
2995        let fs = MemFs::new().with_file("n.flac", b"OLD-FLAC".to_vec());
2996        let mut manifest = Manifest::new();
2997        manifest.insert("n", entry("n.flac", AudioFormat::Flac));
2998
2999        let outcome = run(
3000            &plan,
3001            &mut manifest,
3002            &[d],
3003            &http,
3004            &fs,
3005            &StubFfmpeg::flac(),
3006            &RecordingClock::new(),
3007            &ExecOptions::default(),
3008        );
3009
3010        assert_eq!(outcome.reformatted, 1);
3011        assert!(fs.exists("n.mp3"));
3012        assert!(!fs.exists("n.flac"));
3013        let updated = manifest.get("n").unwrap();
3014        assert_eq!(updated.path, "n.mp3");
3015        assert_eq!(updated.format, AudioFormat::Mp3);
3016        assert_eq!(updated.meta_hash, "m");
3017    }
3018
3019    #[test]
3020    fn retag_rewrites_file_and_updates_hashes() {
3021        let c = clip("o");
3022        let mut d = desired(c.clone(), AudioFormat::Mp3);
3023        d.meta_hash = "new".to_owned();
3024        d.art_hash = "new-art".to_owned();
3025        let existing = tag_mp3(
3026            b"audio",
3027            &TrackMetadata::from_clip(&c, &LineageContext::own_root(&c)),
3028            None,
3029            None,
3030        )
3031        .unwrap();
3032        let fs = MemFs::new().with_file("o.mp3", existing.clone());
3033        let mut manifest = Manifest::new();
3034        let mut start = entry("o.mp3", AudioFormat::Mp3);
3035        start.size = existing.len() as u64;
3036        manifest.insert("o", start);
3037        let plan = Plan {
3038            actions: vec![Action::Retag {
3039                clip: c.clone(),
3040                lineage: LineageContext::own_root(&c),
3041                path: "o.mp3".to_owned(),
3042            }],
3043        };
3044
3045        let outcome = run(
3046            &plan,
3047            &mut manifest,
3048            &[d],
3049            &ScriptedHttp::new(),
3050            &fs,
3051            &StubFfmpeg::flac(),
3052            &RecordingClock::new(),
3053            &ExecOptions::default(),
3054        );
3055
3056        assert_eq!(outcome.retagged, 1);
3057        let updated = manifest.get("o").unwrap();
3058        assert_eq!(updated.meta_hash, "new");
3059        assert_eq!(updated.art_hash, "new-art");
3060        assert_eq!(&fs.read_file("o.mp3").unwrap()[..3], b"ID3");
3061    }
3062
3063    #[test]
3064    fn rename_moves_file_and_updates_manifest_path() {
3065        let c = clip("p");
3066        let mut d = desired(c.clone(), AudioFormat::Mp3);
3067        d.path = "new/p.mp3".to_owned();
3068        let fs = MemFs::new().with_file("old/p.mp3", b"DATA".to_vec());
3069        let mut manifest = Manifest::new();
3070        manifest.insert("p", entry("old/p.mp3", AudioFormat::Mp3));
3071        let plan = Plan {
3072            actions: vec![Action::Rename {
3073                from: "old/p.mp3".to_owned(),
3074                to: "new/p.mp3".to_owned(),
3075            }],
3076        };
3077
3078        let outcome = run(
3079            &plan,
3080            &mut manifest,
3081            &[d],
3082            &ScriptedHttp::new(),
3083            &fs,
3084            &StubFfmpeg::flac(),
3085            &RecordingClock::new(),
3086            &ExecOptions::default(),
3087        );
3088
3089        assert_eq!(outcome.renamed, 1);
3090        assert!(fs.exists("new/p.mp3"));
3091        assert!(!fs.exists("old/p.mp3"));
3092        assert_eq!(manifest.get("p").unwrap().path, "new/p.mp3");
3093    }
3094
3095    #[test]
3096    fn disk_full_rename_aborts_the_run() {
3097        // A move onto a full disk is systemic like a full-disk write: the run
3098        // aborts with DiskFull and the source file is left untouched.
3099        let c = clip("p");
3100        let mut d = desired(c.clone(), AudioFormat::Mp3);
3101        d.path = "new/p.mp3".to_owned();
3102        let fs = MemFs::new()
3103            .with_file("old/p.mp3", b"DATA".to_vec())
3104            .fail_rename_out_of_space("new/p.mp3");
3105        let mut manifest = Manifest::new();
3106        manifest.insert("p", entry("old/p.mp3", AudioFormat::Mp3));
3107        let plan = Plan {
3108            actions: vec![Action::Rename {
3109                from: "old/p.mp3".to_owned(),
3110                to: "new/p.mp3".to_owned(),
3111            }],
3112        };
3113
3114        let outcome = run(
3115            &plan,
3116            &mut manifest,
3117            &[d],
3118            &ScriptedHttp::new(),
3119            &fs,
3120            &StubFfmpeg::flac(),
3121            &RecordingClock::new(),
3122            &ExecOptions::default(),
3123        );
3124
3125        assert_eq!(outcome.status, RunStatus::DiskFull);
3126        assert_eq!(outcome.renamed, 0);
3127        assert_eq!(outcome.failed(), 1);
3128        assert!(outcome.failures[0].reason.contains("disk full"));
3129        // The source is untouched: the move never happened.
3130        assert!(fs.exists("old/p.mp3"));
3131        assert!(!fs.exists("new/p.mp3"));
3132        assert_eq!(manifest.get("p").unwrap().path, "old/p.mp3");
3133    }
3134
3135    #[test]
3136    fn delete_removes_file_and_manifest_entry() {
3137        let fs = MemFs::new().with_file("q.mp3", b"DATA".to_vec());
3138        let mut manifest = Manifest::new();
3139        manifest.insert("q", entry("q.mp3", AudioFormat::Mp3));
3140        let plan = Plan {
3141            actions: vec![Action::Delete {
3142                path: "q.mp3".to_owned(),
3143                clip_id: "q".to_owned(),
3144            }],
3145        };
3146
3147        let outcome = run(
3148            &plan,
3149            &mut manifest,
3150            &[],
3151            &ScriptedHttp::new(),
3152            &fs,
3153            &StubFfmpeg::flac(),
3154            &RecordingClock::new(),
3155            &ExecOptions::default(),
3156        );
3157
3158        assert_eq!(outcome.deleted, 1);
3159        assert!(!fs.exists("q.mp3"));
3160        assert!(manifest.get("q").is_none());
3161    }
3162
3163    #[test]
3164    fn failed_delete_keeps_the_manifest_entry() {
3165        let fs = MemFs::new()
3166            .with_file("s.mp3", b"DATA".to_vec())
3167            .fail_remove("s.mp3");
3168        let mut manifest = Manifest::new();
3169        manifest.insert("s", entry("s.mp3", AudioFormat::Mp3));
3170        let plan = Plan {
3171            actions: vec![Action::Delete {
3172                path: "s.mp3".to_owned(),
3173                clip_id: "s".to_owned(),
3174            }],
3175        };
3176
3177        let outcome = run(
3178            &plan,
3179            &mut manifest,
3180            &[],
3181            &ScriptedHttp::new(),
3182            &fs,
3183            &StubFfmpeg::flac(),
3184            &RecordingClock::new(),
3185            &ExecOptions::default(),
3186        );
3187
3188        assert_eq!(outcome.deleted, 0);
3189        assert_eq!(outcome.failed(), 1);
3190        assert!(manifest.get("s").is_some());
3191        assert!(fs.exists("s.mp3"));
3192    }
3193
3194    #[test]
3195    fn skip_is_a_noop() {
3196        let mut manifest = Manifest::new();
3197        let plan = Plan {
3198            actions: vec![Action::Skip {
3199                clip_id: "r".to_owned(),
3200            }],
3201        };
3202        let outcome = run(
3203            &plan,
3204            &mut manifest,
3205            &[],
3206            &ScriptedHttp::new(),
3207            &MemFs::new(),
3208            &StubFfmpeg::flac(),
3209            &RecordingClock::new(),
3210            &ExecOptions::default(),
3211        );
3212        assert_eq!(outcome.skipped, 1);
3213        assert_eq!(outcome.failed(), 0);
3214    }
3215
3216    // ── Pure helpers ────────────────────────────────────────────────
3217
3218    #[test]
3219    fn header_helpers_parse_or_ignore() {
3220        let resp = HttpResponse {
3221            status: 200,
3222            headers: vec![("Content-Length".to_owned(), "42".to_owned())],
3223            body: Vec::new(),
3224        };
3225        assert_eq!(content_length(&resp), Some(42));
3226
3227        let bare = HttpResponse {
3228            status: 200,
3229            headers: Vec::new(),
3230            body: Vec::new(),
3231        };
3232        assert_eq!(content_length(&bare), None);
3233    }
3234
3235    #[test]
3236    fn preserve_rule_covers_copy_and_private() {
3237        let base = desired(clip("x"), AudioFormat::Mp3);
3238        assert!(!preserve_for(&base));
3239        let mut copy_held = base.clone();
3240        copy_held.modes = vec![SourceMode::Copy];
3241        assert!(preserve_for(&copy_held));
3242        let mut private = base.clone();
3243        private.private = true;
3244        assert!(preserve_for(&private));
3245    }
3246
3247    fn fs_new() -> MemFs {
3248        MemFs::new()
3249    }
3250
3251    // ── Skip refreshes the preserve marker (SYNC-8 cross-run) ────────
3252
3253    #[test]
3254    fn skip_sets_preserve_when_a_clip_becomes_copy_held() {
3255        let c = clip("s1");
3256        let mut d = desired(c.clone(), AudioFormat::Mp3);
3257        d.modes = vec![SourceMode::Copy];
3258        let plan = Plan {
3259            actions: vec![Action::Skip {
3260                clip_id: "s1".to_owned(),
3261            }],
3262        };
3263        let mut manifest = Manifest::new();
3264        manifest.insert("s1".to_owned(), entry("s1.mp3", AudioFormat::Mp3));
3265        assert!(!manifest.get("s1").unwrap().preserve);
3266
3267        let outcome = run(
3268            &plan,
3269            &mut manifest,
3270            &[d],
3271            &ScriptedHttp::new(),
3272            &fs_new(),
3273            &StubFfmpeg::flac(),
3274            &RecordingClock::new(),
3275            &ExecOptions::default(),
3276        );
3277
3278        assert_eq!(outcome.skipped, 1);
3279        assert!(
3280            manifest.get("s1").unwrap().preserve,
3281            "a copy-held skip must mark the entry preserved"
3282        );
3283    }
3284
3285    #[test]
3286    fn skip_clears_stale_preserve_when_a_clip_returns_to_mirror_only() {
3287        let c = clip("s2");
3288        let d = desired(c.clone(), AudioFormat::Mp3);
3289        let plan = Plan {
3290            actions: vec![Action::Skip {
3291                clip_id: "s2".to_owned(),
3292            }],
3293        };
3294        let mut manifest = Manifest::new();
3295        let mut stale = entry("s2.mp3", AudioFormat::Mp3);
3296        stale.preserve = true;
3297        manifest.insert("s2".to_owned(), stale);
3298
3299        run(
3300            &plan,
3301            &mut manifest,
3302            &[d],
3303            &ScriptedHttp::new(),
3304            &fs_new(),
3305            &StubFfmpeg::flac(),
3306            &RecordingClock::new(),
3307            &ExecOptions::default(),
3308        );
3309
3310        assert!(
3311            !manifest.get("s2").unwrap().preserve,
3312            "a mirror-only skip must clear a stale preserve marker"
3313        );
3314    }
3315
3316    #[test]
3317    fn flac_render_retries_a_rate_limited_wav_lookup() {
3318        let c = clip("rl");
3319        let d = desired(c.clone(), AudioFormat::Flac);
3320        let plan = Plan {
3321            actions: vec![Action::Download {
3322                clip: c.clone(),
3323                lineage: LineageContext::own_root(&c),
3324                path: d.path.clone(),
3325                format: AudioFormat::Flac,
3326            }],
3327        };
3328        let http = ScriptedHttp::new()
3329            .with_auth()
3330            .route_seq(
3331                "/wav_file/",
3332                vec![
3333                    Reply::status(429),
3334                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/rl.wav"}"#),
3335                ],
3336            )
3337            .route("rl.wav", Reply::ok(b"wav".to_vec()));
3338        let clock = RecordingClock::new();
3339        let mut manifest = Manifest::new();
3340
3341        let outcome = run(
3342            &plan,
3343            &mut manifest,
3344            &[d],
3345            &http,
3346            &fs_new(),
3347            &StubFfmpeg::flac(),
3348            &clock,
3349            &small_poll(),
3350        );
3351
3352        assert_eq!(outcome.downloaded, 1);
3353        assert_eq!(outcome.failed(), 0);
3354        // The render was ready on retry, so no fresh convert_wav was needed.
3355        assert_eq!(http.count("/convert_wav/"), 0);
3356        // One transient backoff (1s base), not the 5s poll interval.
3357        assert_eq!(clock.sleeps(), vec![Duration::from_secs(1)]);
3358    }
3359
3360    // ── Phase 6: artifact actions ───────────────────────────────────
3361
3362    #[test]
3363    fn write_artifact_fetches_writes_and_updates_manifest() {
3364        // The owning entry exists (its audio was kept this run); WriteArtifact
3365        // fetches the source, writes the sidecar, and records it on the entry.
3366        let mut manifest = Manifest::new();
3367        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
3368        let plan = Plan {
3369            actions: vec![Action::WriteArtifact {
3370                kind: ArtifactKind::CoverJpg,
3371                path: "a/cover.jpg".to_owned(),
3372                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
3373                hash: "h1".to_owned(),
3374                owner_id: "a".to_owned(),
3375                content: None,
3376            }],
3377        };
3378        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"jpg-bytes".to_vec()));
3379        let fs = MemFs::new();
3380
3381        let outcome = run(
3382            &plan,
3383            &mut manifest,
3384            &[],
3385            &http,
3386            &fs,
3387            &StubFfmpeg::flac(),
3388            &RecordingClock::new(),
3389            &ExecOptions::default(),
3390        );
3391
3392        assert_eq!(outcome.artifacts_written, 1);
3393        assert_eq!(outcome.failed(), 0);
3394        assert_eq!(outcome.status, RunStatus::Completed);
3395        assert_eq!(fs.read_file("a/cover.jpg").unwrap(), b"jpg-bytes");
3396        assert_eq!(
3397            manifest.get("a").unwrap().cover_jpg,
3398            Some(ArtifactState {
3399                path: "a/cover.jpg".to_owned(),
3400                hash: "h1".to_owned(),
3401            })
3402        );
3403    }
3404
3405    #[test]
3406    fn write_text_sidecar_records_slot_with_no_network_fetch() {
3407        // A generated text sidecar carries its body inline, so it is written
3408        // verbatim with NO HTTP fetch and the details slot records its state.
3409        let mut manifest = Manifest::new();
3410        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
3411        let plan = Plan {
3412            actions: vec![Action::WriteArtifact {
3413                kind: ArtifactKind::DetailsTxt,
3414                path: "a.details.txt".to_owned(),
3415                source_url: String::new(),
3416                hash: "dh".to_owned(),
3417                owner_id: "a".to_owned(),
3418                content: Some("Title: A\n".to_owned()),
3419            }],
3420        };
3421        // An empty HTTP script: any fetch would fail, proving none happens.
3422        let http = ScriptedHttp::new();
3423        let fs = MemFs::new();
3424
3425        let outcome = run(
3426            &plan,
3427            &mut manifest,
3428            &[],
3429            &http,
3430            &fs,
3431            &StubFfmpeg::flac(),
3432            &RecordingClock::new(),
3433            &ExecOptions::default(),
3434        );
3435
3436        assert_eq!(outcome.artifacts_written, 1);
3437        assert_eq!(outcome.failed(), 0);
3438        assert_eq!(fs.read_file("a.details.txt").unwrap(), b"Title: A\n");
3439        assert_eq!(
3440            manifest.get("a").unwrap().details_txt,
3441            Some(ArtifactState {
3442                path: "a.details.txt".to_owned(),
3443                hash: "dh".to_owned(),
3444            })
3445        );
3446    }
3447
3448    #[test]
3449    fn write_lyrics_sidecar_relocation_removes_old_file() {
3450        // The audio moved, so the lyrics sidecar is re-emitted at the new path;
3451        // the executor writes the new file and prunes the stale one.
3452        let mut manifest = Manifest::new();
3453        let mut e = entry("old/a.flac", AudioFormat::Flac);
3454        e.lyrics_txt = Some(ArtifactState {
3455            path: "old/a.lyrics.txt".to_owned(),
3456            hash: "lh".to_owned(),
3457        });
3458        manifest.insert("a", e);
3459        let fs = MemFs::new()
3460            .with_file("old/a.flac", b"AUDIO".to_vec())
3461            .with_file("old/a.lyrics.txt", b"old words\n".to_vec());
3462        let plan = Plan {
3463            actions: vec![Action::WriteArtifact {
3464                kind: ArtifactKind::LyricsTxt,
3465                path: "new/a.lyrics.txt".to_owned(),
3466                source_url: String::new(),
3467                hash: "lh".to_owned(),
3468                owner_id: "a".to_owned(),
3469                content: Some("new words\n".to_owned()),
3470            }],
3471        };
3472
3473        let outcome = run(
3474            &plan,
3475            &mut manifest,
3476            &[],
3477            &ScriptedHttp::new(),
3478            &fs,
3479            &StubFfmpeg::flac(),
3480            &RecordingClock::new(),
3481            &ExecOptions::default(),
3482        );
3483
3484        assert_eq!(outcome.failed(), 0);
3485        assert_eq!(fs.read_file("new/a.lyrics.txt").unwrap(), b"new words\n");
3486        assert!(!fs.exists("old/a.lyrics.txt"));
3487        assert_eq!(
3488            manifest.get("a").unwrap().lyrics_txt.as_ref().unwrap().path,
3489            "new/a.lyrics.txt"
3490        );
3491    }
3492
3493    #[test]
3494    fn sidecar_path_swap_never_deletes_a_file_written_this_run() {
3495        // Two clips swap sidecar paths in one run (A: x -> y while B: y -> x).
3496        // Each write's inline old-path cleanup must skip a path another action
3497        // writes this run, or the second write would delete the first's freshly
3498        // written file (issue #76). The guard is kind-agnostic; lyrics stands in
3499        // for every sidecar, including the .mp4 video.
3500        let mut manifest = Manifest::new();
3501        let mut a = entry("a.flac", AudioFormat::Flac);
3502        a.lyrics_txt = Some(ArtifactState {
3503            path: "x.lyrics.txt".to_owned(),
3504            hash: "ah".to_owned(),
3505        });
3506        manifest.insert("a", a);
3507        let mut b = entry("b.flac", AudioFormat::Flac);
3508        b.lyrics_txt = Some(ArtifactState {
3509            path: "y.lyrics.txt".to_owned(),
3510            hash: "bh".to_owned(),
3511        });
3512        manifest.insert("b", b);
3513        let fs = MemFs::new()
3514            .with_file("a.flac", b"A".to_vec())
3515            .with_file("b.flac", b"B".to_vec())
3516            .with_file("x.lyrics.txt", b"A words\n".to_vec())
3517            .with_file("y.lyrics.txt", b"B words\n".to_vec());
3518        // A moves its sidecar x -> y; B moves its sidecar y -> x (the swap).
3519        let plan = Plan {
3520            actions: vec![
3521                Action::WriteArtifact {
3522                    kind: ArtifactKind::LyricsTxt,
3523                    path: "y.lyrics.txt".to_owned(),
3524                    source_url: String::new(),
3525                    hash: "ah".to_owned(),
3526                    owner_id: "a".to_owned(),
3527                    content: Some("A words\n".to_owned()),
3528                },
3529                Action::WriteArtifact {
3530                    kind: ArtifactKind::LyricsTxt,
3531                    path: "x.lyrics.txt".to_owned(),
3532                    source_url: String::new(),
3533                    hash: "bh".to_owned(),
3534                    owner_id: "b".to_owned(),
3535                    content: Some("B words\n".to_owned()),
3536                },
3537            ],
3538        };
3539
3540        let outcome = run(
3541            &plan,
3542            &mut manifest,
3543            &[],
3544            &ScriptedHttp::new(),
3545            &fs,
3546            &StubFfmpeg::flac(),
3547            &RecordingClock::new(),
3548            &ExecOptions::default(),
3549        );
3550
3551        assert_eq!(outcome.failed(), 0);
3552        // Both freshly written files survive; neither cleanup clobbered the other.
3553        assert_eq!(fs.read_file("y.lyrics.txt").unwrap(), b"A words\n");
3554        assert_eq!(fs.read_file("x.lyrics.txt").unwrap(), b"B words\n");
3555        assert_eq!(
3556            manifest.get("a").unwrap().lyrics_txt.as_ref().unwrap().path,
3557            "y.lyrics.txt"
3558        );
3559        assert_eq!(
3560            manifest.get("b").unwrap().lyrics_txt.as_ref().unwrap().path,
3561            "x.lyrics.txt"
3562        );
3563    }
3564
3565    #[test]
3566    fn old_sidecar_kept_when_another_clip_still_references_it() {
3567        // A prior failed swap can leave two clips pointing at one path (A -> y and
3568        // B -> y). When B now moves y -> x, its cleanup must not delete y, which is
3569        // still A's live file (#76). tracked_paths counts two references to y, so
3570        // the removal is skipped even though y is not a write target this run.
3571        let mut manifest = Manifest::new();
3572        let mut a = entry("a.flac", AudioFormat::Flac);
3573        a.lyrics_txt = Some(ArtifactState {
3574            path: "y.lyrics.txt".to_owned(),
3575            hash: "ah".to_owned(),
3576        });
3577        manifest.insert("a", a);
3578        let mut b = entry("b.flac", AudioFormat::Flac);
3579        b.lyrics_txt = Some(ArtifactState {
3580            path: "y.lyrics.txt".to_owned(),
3581            hash: "bh".to_owned(),
3582        });
3583        manifest.insert("b", b);
3584        let fs = MemFs::new()
3585            .with_file("a.flac", b"A".to_vec())
3586            .with_file("b.flac", b"B".to_vec())
3587            .with_file("y.lyrics.txt", b"A words\n".to_vec());
3588        // Only B moves this run: y -> x. A is stable, so y is not a write target;
3589        // the tracked-reference count is what protects A's file.
3590        let plan = Plan {
3591            actions: vec![Action::WriteArtifact {
3592                kind: ArtifactKind::LyricsTxt,
3593                path: "x.lyrics.txt".to_owned(),
3594                source_url: String::new(),
3595                hash: "bh".to_owned(),
3596                owner_id: "b".to_owned(),
3597                content: Some("B words\n".to_owned()),
3598            }],
3599        };
3600
3601        let outcome = run(
3602            &plan,
3603            &mut manifest,
3604            &[],
3605            &ScriptedHttp::new(),
3606            &fs,
3607            &StubFfmpeg::flac(),
3608            &RecordingClock::new(),
3609            &ExecOptions::default(),
3610        );
3611
3612        assert_eq!(outcome.failed(), 0);
3613        assert!(
3614            fs.exists("y.lyrics.txt"),
3615            "A's live sidecar must not be deleted"
3616        );
3617        assert_eq!(fs.read_file("x.lyrics.txt").unwrap(), b"B words\n");
3618    }
3619
3620    #[test]
3621    fn shared_old_path_is_reclaimed_when_every_referencing_clip_moves_away() {
3622        // Two clips share one path (A -> s and B -> s, from a prior failed swap).
3623        // When BOTH move away this run, the path is no longer live, so the last
3624        // mover must reclaim it: it is neither kept as an orphan nor deleted while
3625        // still referenced. The dynamic reference count drops to zero only after
3626        // both moves, so exactly the final cleanup removes it (#76).
3627        let mut manifest = Manifest::new();
3628        let mut a = entry("a.flac", AudioFormat::Flac);
3629        a.lyrics_txt = Some(ArtifactState {
3630            path: "s.lyrics.txt".to_owned(),
3631            hash: "ah".to_owned(),
3632        });
3633        manifest.insert("a", a);
3634        let mut b = entry("b.flac", AudioFormat::Flac);
3635        b.lyrics_txt = Some(ArtifactState {
3636            path: "s.lyrics.txt".to_owned(),
3637            hash: "bh".to_owned(),
3638        });
3639        manifest.insert("b", b);
3640        let fs = MemFs::new()
3641            .with_file("a.flac", b"A".to_vec())
3642            .with_file("b.flac", b"B".to_vec())
3643            .with_file("s.lyrics.txt", b"shared\n".to_vec());
3644        let plan = Plan {
3645            actions: vec![
3646                Action::WriteArtifact {
3647                    kind: ArtifactKind::LyricsTxt,
3648                    path: "pa.lyrics.txt".to_owned(),
3649                    source_url: String::new(),
3650                    hash: "ah".to_owned(),
3651                    owner_id: "a".to_owned(),
3652                    content: Some("A words\n".to_owned()),
3653                },
3654                Action::WriteArtifact {
3655                    kind: ArtifactKind::LyricsTxt,
3656                    path: "pb.lyrics.txt".to_owned(),
3657                    source_url: String::new(),
3658                    hash: "bh".to_owned(),
3659                    owner_id: "b".to_owned(),
3660                    content: Some("B words\n".to_owned()),
3661                },
3662            ],
3663        };
3664
3665        let outcome = run(
3666            &plan,
3667            &mut manifest,
3668            &[],
3669            &ScriptedHttp::new(),
3670            &fs,
3671            &StubFfmpeg::flac(),
3672            &RecordingClock::new(),
3673            &ExecOptions::default(),
3674        );
3675
3676        assert_eq!(outcome.failed(), 0);
3677        assert_eq!(fs.read_file("pa.lyrics.txt").unwrap(), b"A words\n");
3678        assert_eq!(fs.read_file("pb.lyrics.txt").unwrap(), b"B words\n");
3679        assert!(
3680            !fs.exists("s.lyrics.txt"),
3681            "the vacated shared path must be reclaimed, not orphaned"
3682        );
3683    }
3684
3685    #[test]
3686    fn write_text_sidecar_skipped_when_owner_audio_absent() {
3687        // A text sidecar for a clip with no manifest entry (its audio download
3688        // failed) must be skipped, never writing an untracked file.
3689        let plan = Plan {
3690            actions: vec![Action::WriteArtifact {
3691                kind: ArtifactKind::DetailsTxt,
3692                path: "gone.details.txt".to_owned(),
3693                source_url: String::new(),
3694                hash: "dh".to_owned(),
3695                owner_id: "gone".to_owned(),
3696                content: Some("Title: Gone\n".to_owned()),
3697            }],
3698        };
3699        let fs = MemFs::new();
3700        let mut manifest = Manifest::new();
3701
3702        let outcome = run(
3703            &plan,
3704            &mut manifest,
3705            &[],
3706            &ScriptedHttp::new(),
3707            &fs,
3708            &StubFfmpeg::flac(),
3709            &RecordingClock::new(),
3710            &ExecOptions::default(),
3711        );
3712
3713        assert_eq!(outcome.artifacts_written, 0);
3714        assert_eq!(outcome.skipped, 1);
3715        assert!(!fs.exists("gone.details.txt"));
3716        assert!(manifest.get("gone").is_none());
3717    }
3718
3719    #[test]
3720    fn delete_artifact_removes_file_and_clears_slot() {
3721        let fs = MemFs::new().with_file("a/cover.jpg", b"jpg".to_vec());
3722        let mut manifest = Manifest::new();
3723        let mut e = entry("a.mp3", AudioFormat::Mp3);
3724        e.cover_jpg = Some(ArtifactState {
3725            path: "a/cover.jpg".to_owned(),
3726            hash: "h1".to_owned(),
3727        });
3728        manifest.insert("a", e);
3729        let plan = Plan {
3730            actions: vec![Action::DeleteArtifact {
3731                kind: ArtifactKind::CoverJpg,
3732                path: "a/cover.jpg".to_owned(),
3733                owner_id: "a".to_owned(),
3734            }],
3735        };
3736
3737        let outcome = run(
3738            &plan,
3739            &mut manifest,
3740            &[],
3741            &ScriptedHttp::new(),
3742            &fs,
3743            &StubFfmpeg::flac(),
3744            &RecordingClock::new(),
3745            &ExecOptions::default(),
3746        );
3747
3748        assert_eq!(outcome.artifacts_deleted, 1);
3749        assert!(!fs.exists("a/cover.jpg"));
3750        assert_eq!(manifest.get("a").unwrap().cover_jpg, None);
3751    }
3752
3753    #[test]
3754    fn delete_artifact_tolerates_already_absent_file() {
3755        // `remove` is idempotent, so co-deleting a sidecar that is already gone
3756        // is not a failure.
3757        let mut manifest = Manifest::new();
3758        let mut e = entry("a.mp3", AudioFormat::Mp3);
3759        e.cover_jpg = Some(ArtifactState {
3760            path: "a/cover.jpg".to_owned(),
3761            hash: "h1".to_owned(),
3762        });
3763        manifest.insert("a", e);
3764        let plan = Plan {
3765            actions: vec![Action::DeleteArtifact {
3766                kind: ArtifactKind::CoverJpg,
3767                path: "a/cover.jpg".to_owned(),
3768                owner_id: "a".to_owned(),
3769            }],
3770        };
3771
3772        let outcome = run(
3773            &plan,
3774            &mut manifest,
3775            &[],
3776            &ScriptedHttp::new(),
3777            &MemFs::new(),
3778            &StubFfmpeg::flac(),
3779            &RecordingClock::new(),
3780            &ExecOptions::default(),
3781        );
3782
3783        assert_eq!(outcome.artifacts_deleted, 1);
3784        assert_eq!(outcome.failed(), 0);
3785        assert_eq!(manifest.get("a").unwrap().cover_jpg, None);
3786    }
3787
3788    #[test]
3789    fn write_artifact_http_failure_is_a_per_clip_failure_not_a_run_abort() {
3790        // A permanent 404 on one sidecar fetch is recorded as a per-clip failure;
3791        // the run continues and the following WriteArtifact still succeeds.
3792        let mut manifest = Manifest::new();
3793        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
3794        manifest.insert("b", entry("b.mp3", AudioFormat::Mp3));
3795        let plan = Plan {
3796            actions: vec![
3797                Action::WriteArtifact {
3798                    kind: ArtifactKind::CoverJpg,
3799                    path: "a/cover.jpg".to_owned(),
3800                    source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
3801                    hash: "h1".to_owned(),
3802                    owner_id: "a".to_owned(),
3803                    content: None,
3804                },
3805                Action::WriteArtifact {
3806                    kind: ArtifactKind::CoverJpg,
3807                    path: "b/cover.jpg".to_owned(),
3808                    source_url: "https://art.suno.ai/b/large.jpg".to_owned(),
3809                    hash: "h2".to_owned(),
3810                    owner_id: "b".to_owned(),
3811                    content: None,
3812                },
3813            ],
3814        };
3815        let http = ScriptedHttp::new()
3816            .route("a/large.jpg", Reply::status(404))
3817            .route("b/large.jpg", Reply::ok(b"jpg-b".to_vec()));
3818        let fs = MemFs::new();
3819
3820        let outcome = run(
3821            &plan,
3822            &mut manifest,
3823            &[],
3824            &http,
3825            &fs,
3826            &StubFfmpeg::flac(),
3827            &RecordingClock::new(),
3828            &ExecOptions::default(),
3829        );
3830
3831        assert_eq!(outcome.status, RunStatus::Completed);
3832        assert_eq!(outcome.failed(), 1);
3833        assert_eq!(outcome.failures[0].clip_id, "a");
3834        assert_eq!(outcome.artifacts_written, 1);
3835        // The failed sidecar left no file and no manifest record.
3836        assert!(!fs.exists("a/cover.jpg"));
3837        assert_eq!(manifest.get("a").unwrap().cover_jpg, None);
3838        // The following sidecar was written and recorded.
3839        assert_eq!(fs.read_file("b/cover.jpg").unwrap(), b"jpg-b");
3840        assert!(manifest.get("b").unwrap().cover_jpg.is_some());
3841    }
3842
3843    #[test]
3844    fn co_delete_executes_audio_delete_then_artifact_delete() {
3845        // The plan orders the audio Delete before its sidecar DeleteArtifact.
3846        // The audio delete removes the manifest entry; the sidecar delete then
3847        // removes the file and tolerates the now-absent entry.
3848        let fs = MemFs::new()
3849            .with_file("gone.mp3", b"DATA".to_vec())
3850            .with_file("gone/cover.jpg", b"jpg".to_vec());
3851        let mut manifest = Manifest::new();
3852        let mut e = entry("gone.mp3", AudioFormat::Mp3);
3853        e.cover_jpg = Some(ArtifactState {
3854            path: "gone/cover.jpg".to_owned(),
3855            hash: "h1".to_owned(),
3856        });
3857        manifest.insert("gone", e);
3858        let plan = Plan {
3859            actions: vec![
3860                Action::Delete {
3861                    path: "gone.mp3".to_owned(),
3862                    clip_id: "gone".to_owned(),
3863                },
3864                Action::DeleteArtifact {
3865                    kind: ArtifactKind::CoverJpg,
3866                    path: "gone/cover.jpg".to_owned(),
3867                    owner_id: "gone".to_owned(),
3868                },
3869            ],
3870        };
3871
3872        let outcome = run(
3873            &plan,
3874            &mut manifest,
3875            &[],
3876            &ScriptedHttp::new(),
3877            &fs,
3878            &StubFfmpeg::flac(),
3879            &RecordingClock::new(),
3880            &ExecOptions::default(),
3881        );
3882
3883        assert_eq!(outcome.deleted, 1);
3884        assert_eq!(outcome.artifacts_deleted, 1);
3885        assert_eq!(outcome.failed(), 0);
3886        assert!(!fs.exists("gone.mp3"));
3887        assert!(!fs.exists("gone/cover.jpg"));
3888        assert!(manifest.get("gone").is_none());
3889    }
3890
3891    #[test]
3892    fn write_stem_mp3_stores_raw_and_records_slot() {
3893        // An MP3 stem is downloaded straight from its CDN url and stored verbatim
3894        // (no transcode, no WAV render): the bytes land at the `.mp3` path and the
3895        // keyed slot records the path and hash.
3896        let mut manifest = Manifest::new();
3897        manifest.insert("a", entry("a.flac", AudioFormat::Flac));
3898        let plan = Plan {
3899            actions: vec![Action::WriteStem {
3900                clip_id: "a".to_owned(),
3901                key: "voc".to_owned(),
3902                stem_id: "voc".to_owned(),
3903                path: "a.stems/a - Vocals [voc].mp3".to_owned(),
3904                source_url: "https://cdn1.suno.ai/voc.mp3".to_owned(),
3905                format: StemFormat::Mp3,
3906                hash: "vh".to_owned(),
3907            }],
3908        };
3909        let http = ScriptedHttp::new().route("voc.mp3", Reply::ok(b"stem-bytes".to_vec()));
3910        let fs = MemFs::new();
3911
3912        let outcome = run(
3913            &plan,
3914            &mut manifest,
3915            &[],
3916            &http,
3917            &fs,
3918            &StubFfmpeg::flac(),
3919            &RecordingClock::new(),
3920            &ExecOptions::default(),
3921        );
3922
3923        assert_eq!(outcome.artifacts_written, 1);
3924        assert_eq!(outcome.failed(), 0);
3925        // Bytes are stored exactly as delivered (no transcode applied).
3926        assert_eq!(
3927            fs.read_file("a.stems/a - Vocals [voc].mp3").unwrap(),
3928            b"stem-bytes"
3929        );
3930        // An MP3 stem never renders WAV: no convert_wav, no generation.
3931        assert_eq!(http.count("convert_wav"), 0);
3932        assert_eq!(http.count("/api/gen/"), 0);
3933        assert_eq!(
3934            manifest.get("a").unwrap().stems.get("voc"),
3935            Some(&ArtifactState {
3936                path: "a.stems/a - Vocals [voc].mp3".to_owned(),
3937                hash: "vh".to_owned(),
3938            })
3939        );
3940    }
3941
3942    #[test]
3943    fn write_stem_wav_renders_via_convert_wav_and_stores_raw() {
3944        // A WAV stem (the default) renders the stem clip's lossless WAV through the
3945        // free convert_wav flow keyed on the stem id, then downloads and stores it
3946        // RAW as `.wav` — it is NEVER transcoded to FLAC, even for a FLAC song.
3947        let mut manifest = Manifest::new();
3948        manifest.insert("a", entry("a.flac", AudioFormat::Flac));
3949        let plan = Plan {
3950            actions: vec![Action::WriteStem {
3951                clip_id: "a".to_owned(),
3952                key: "voc".to_owned(),
3953                stem_id: "stemvoc".to_owned(),
3954                path: "a.stems/a - Vocals [stemvoc].wav".to_owned(),
3955                source_url: "https://cdn1.suno.ai/stemvoc.mp3".to_owned(),
3956                format: StemFormat::Wav,
3957                hash: "vh".to_owned(),
3958            }],
3959        };
3960        // wav_file is not ready on the first poll, so the flow POSTs convert_wav
3961        // (free) and polls again — exactly the main FLAC/WAV render path.
3962        let http = ScriptedHttp::new()
3963            .with_auth()
3964            .route_seq(
3965                "stemvoc/wav_file/",
3966                vec![
3967                    Reply::json("{}"),
3968                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/stemvoc.wav"}"#),
3969                ],
3970            )
3971            .route("stemvoc/convert_wav/", Reply::status(200))
3972            .route("stemvoc.wav", Reply::ok(b"RIFFwav-bytes".to_vec()));
3973        let fs = MemFs::new();
3974
3975        let outcome = run(
3976            &plan,
3977            &mut manifest,
3978            &[],
3979            &http,
3980            &fs,
3981            &StubFfmpeg::flac(),
3982            &RecordingClock::new(),
3983            &small_poll(),
3984        );
3985
3986        assert_eq!(outcome.artifacts_written, 1);
3987        assert_eq!(outcome.failed(), 0);
3988        // The rendered WAV is stored verbatim; ffmpeg (WAV->FLAC) is never invoked,
3989        // so the stored bytes are the raw WAV, not a FLAC transcode.
3990        assert_eq!(
3991            fs.read_file("a.stems/a - Vocals [stemvoc].wav").unwrap(),
3992            b"RIFFwav-bytes"
3993        );
3994        assert!(!fs.exists("a.stems/a - Vocals [stemvoc].flac"));
3995        // The free WAV render ran; no credit-spending generation endpoint did.
3996        assert_eq!(http.count("convert_wav"), 1);
3997        assert_eq!(http.count("stem_task"), 0);
3998        assert_eq!(http.count("separate"), 0);
3999        assert_eq!(
4000            manifest.get("a").unwrap().stems.get("voc").unwrap().path,
4001            "a.stems/a - Vocals [stemvoc].wav"
4002        );
4003    }
4004
4005    #[test]
4006    fn write_stem_is_skipped_when_owner_audio_is_absent() {
4007        // No owning manifest entry (audio failed or never existed) => skip with
4008        // no fetch and no write, so a stem is never stranded without its song.
4009        let mut manifest = Manifest::new();
4010        let plan = Plan {
4011            actions: vec![Action::WriteStem {
4012                clip_id: "ghost".to_owned(),
4013                key: "voc".to_owned(),
4014                stem_id: "voc".to_owned(),
4015                path: "ghost.stems/voc.mp3".to_owned(),
4016                source_url: "https://cdn1.suno.ai/voc.mp3".to_owned(),
4017                format: StemFormat::Mp3,
4018                hash: "vh".to_owned(),
4019            }],
4020        };
4021        // Empty HTTP script: any fetch would error, proving none happens.
4022        let http = ScriptedHttp::new();
4023        let fs = MemFs::new();
4024
4025        let outcome = run(
4026            &plan,
4027            &mut manifest,
4028            &[],
4029            &http,
4030            &fs,
4031            &StubFfmpeg::flac(),
4032            &RecordingClock::new(),
4033            &ExecOptions::default(),
4034        );
4035
4036        assert_eq!(outcome.skipped, 1);
4037        assert_eq!(outcome.artifacts_written, 0);
4038        assert_eq!(outcome.failed(), 0);
4039        assert!(!fs.exists("ghost.stems/voc.mp3"));
4040    }
4041
4042    #[test]
4043    fn write_stem_relocates_the_old_file_on_a_path_move() {
4044        // The song was renamed, so the stem moves: the new file is written and the
4045        // stale copy at the previously tracked path is removed (moved, not orphaned).
4046        let fs = MemFs::new().with_file("old.stems/voc.mp3", b"old".to_vec());
4047        let mut manifest = Manifest::new();
4048        let mut e = entry("new.flac", AudioFormat::Flac);
4049        e.stems.insert(
4050            "voc".to_owned(),
4051            ArtifactState {
4052                path: "old.stems/voc.mp3".to_owned(),
4053                hash: "vh".to_owned(),
4054            },
4055        );
4056        manifest.insert("a", e);
4057        let plan = Plan {
4058            actions: vec![Action::WriteStem {
4059                clip_id: "a".to_owned(),
4060                key: "voc".to_owned(),
4061                stem_id: "voc".to_owned(),
4062                path: "new.stems/voc.mp3".to_owned(),
4063                source_url: "https://cdn1.suno.ai/voc.mp3".to_owned(),
4064                format: StemFormat::Mp3,
4065                hash: "vh".to_owned(),
4066            }],
4067        };
4068        let http = ScriptedHttp::new().route("voc.mp3", Reply::ok(b"new".to_vec()));
4069
4070        let outcome = run(
4071            &plan,
4072            &mut manifest,
4073            &[],
4074            &http,
4075            &fs,
4076            &StubFfmpeg::flac(),
4077            &RecordingClock::new(),
4078            &ExecOptions::default(),
4079        );
4080
4081        assert_eq!(outcome.artifacts_written, 1);
4082        assert!(fs.exists("new.stems/voc.mp3"));
4083        assert!(
4084            !fs.exists("old.stems/voc.mp3"),
4085            "the old stem is moved, not left behind"
4086        );
4087        assert_eq!(
4088            manifest.get("a").unwrap().stems.get("voc").unwrap().path,
4089            "new.stems/voc.mp3"
4090        );
4091    }
4092
4093    #[test]
4094    fn delete_stem_removes_file_and_clears_slot() {
4095        let fs = MemFs::new().with_file("a.stems/voc.mp3", b"stem".to_vec());
4096        let mut manifest = Manifest::new();
4097        let mut e = entry("a.flac", AudioFormat::Flac);
4098        e.stems.insert(
4099            "voc".to_owned(),
4100            ArtifactState {
4101                path: "a.stems/voc.mp3".to_owned(),
4102                hash: "vh".to_owned(),
4103            },
4104        );
4105        manifest.insert("a", e);
4106        let plan = Plan {
4107            actions: vec![Action::DeleteStem {
4108                clip_id: "a".to_owned(),
4109                key: "voc".to_owned(),
4110                path: "a.stems/voc.mp3".to_owned(),
4111            }],
4112        };
4113
4114        let outcome = run(
4115            &plan,
4116            &mut manifest,
4117            &[],
4118            &ScriptedHttp::new(),
4119            &fs,
4120            &StubFfmpeg::flac(),
4121            &RecordingClock::new(),
4122            &ExecOptions::default(),
4123        );
4124
4125        assert_eq!(outcome.artifacts_deleted, 1);
4126        assert!(!fs.exists("a.stems/voc.mp3"));
4127        assert!(manifest.get("a").unwrap().stems.is_empty());
4128    }
4129
4130    #[test]
4131    fn co_deleting_the_last_stem_prunes_the_stems_folder() {
4132        // Deleting a song co-deletes its stems; the emptied `.stems` folder is
4133        // pruned by the end-of-run sweep, so it can never be orphaned.
4134        let fs = MemFs::new()
4135            .with_file("song.flac", b"DATA".to_vec())
4136            .with_file("song.stems/voc.mp3", b"stem".to_vec());
4137        assert!(fs.has_dir("song.stems"));
4138        let mut manifest = Manifest::new();
4139        let mut e = entry("song.flac", AudioFormat::Flac);
4140        e.stems.insert(
4141            "voc".to_owned(),
4142            ArtifactState {
4143                path: "song.stems/voc.mp3".to_owned(),
4144                hash: "vh".to_owned(),
4145            },
4146        );
4147        manifest.insert("a", e);
4148        let plan = Plan {
4149            actions: vec![
4150                Action::Delete {
4151                    path: "song.flac".to_owned(),
4152                    clip_id: "a".to_owned(),
4153                },
4154                Action::DeleteStem {
4155                    clip_id: "a".to_owned(),
4156                    key: "voc".to_owned(),
4157                    path: "song.stems/voc.mp3".to_owned(),
4158                },
4159            ],
4160        };
4161
4162        let outcome = run(
4163            &plan,
4164            &mut manifest,
4165            &[],
4166            &ScriptedHttp::new(),
4167            &fs,
4168            &StubFfmpeg::flac(),
4169            &RecordingClock::new(),
4170            &ExecOptions::default(),
4171        );
4172
4173        assert_eq!(outcome.deleted, 1);
4174        assert_eq!(outcome.artifacts_deleted, 1);
4175        assert!(!fs.exists("song.flac"));
4176        assert!(!fs.exists("song.stems/voc.mp3"));
4177        assert!(
4178            !fs.has_dir("song.stems"),
4179            "the emptied .stems folder is pruned"
4180        );
4181        assert!(manifest.get("a").is_none());
4182    }
4183
4184    #[test]
4185    fn write_stem_mp3_never_issues_a_generation_post() {
4186        // The MP3 stem path is GET-only: writing a stem fetches its CDN url and
4187        // never POSTs, let alone to any generation or WAV-render endpoint.
4188        let mut manifest = Manifest::new();
4189        manifest.insert("a", entry("a.flac", AudioFormat::Flac));
4190        let plan = Plan {
4191            actions: vec![Action::WriteStem {
4192                clip_id: "a".to_owned(),
4193                key: "voc".to_owned(),
4194                stem_id: "voc".to_owned(),
4195                path: "a.stems/voc.mp3".to_owned(),
4196                source_url: "https://cdn1.suno.ai/voc.mp3".to_owned(),
4197                format: StemFormat::Mp3,
4198                hash: "vh".to_owned(),
4199            }],
4200        };
4201        let http = ScriptedHttp::new().route("voc.mp3", Reply::ok(b"stem".to_vec()));
4202
4203        run(
4204            &plan,
4205            &mut manifest,
4206            &[],
4207            &http,
4208            &MemFs::new(),
4209            &StubFfmpeg::flac(),
4210            &RecordingClock::new(),
4211            &ExecOptions::default(),
4212        );
4213
4214        assert_eq!(
4215            http.count("stem_task"),
4216            0,
4217            "no generation endpoint is ever hit"
4218        );
4219        assert_eq!(http.count("convert_wav"), 0);
4220        assert_eq!(http.count("/api/gen/"), 0);
4221    }
4222
4223    #[test]
4224    fn full_stems_mirror_mp3_is_get_only_with_zero_gen_traffic() {
4225        // End-to-end #100 path with MP3 stems: list a clip's existing stems (free
4226        // GET over the live page-count + 0-indexed page shape), reconcile them into
4227        // WriteStem actions, and execute (download) them. With MP3 the whole flow
4228        // is GET-only and touches NO `/api/gen/` endpoint at all.
4229        let http = ScriptedHttp::new()
4230            .with_auth()
4231            .route("clip1/stems/pages", Reply::json(r#"{"pages": 1}"#))
4232            .route(
4233                "clip1/stems?page=0",
4234                Reply::json(
4235                    r#"{"stems":[
4236                        {"id":"s1","title":"Song (Vocals)","status":"complete","audio_url":"https://cdn1.suno.ai/s1.mp3"},
4237                        {"id":"s2","title":"Song (Drums)","status":"complete","audio_url":"https://cdn1.suno.ai/s2.mp3"}
4238                    ]}"#,
4239                ),
4240            )
4241            .route("s1.mp3", Reply::ok(b"vocals-bytes".to_vec()))
4242            .route("s2.mp3", Reply::ok(b"drums-bytes".to_vec()));
4243
4244        // List the existing stems through the client (GET-only, free).
4245        let mut auth = ClerkAuth::new("eyJtoken");
4246        pollster::block_on(auth.authenticate(&http)).unwrap();
4247        let mut client = SunoClient::new(auth, RecordingClock::new());
4248        let (stems, complete) = pollster::block_on(client.list_stems(&http, "clip1")).unwrap();
4249        assert!(complete);
4250        assert_eq!(stems.len(), 2);
4251        assert_eq!(stems[0].label, "Vocals");
4252
4253        // Reconcile the listed MP3 stems into a plan (audio already present -> Skip).
4254        let mut manifest = Manifest::new();
4255        manifest.insert("clip1", entry("clip1.flac", AudioFormat::Flac));
4256        let desired_stems: Vec<crate::reconcile::DesiredStem> = stems
4257            .iter()
4258            .map(|s| crate::reconcile::DesiredStem {
4259                key: s.id.clone(),
4260                stem_id: s.id.clone(),
4261                path: format!("clip1.stems/{}.mp3", s.id),
4262                source_url: s.url.clone(),
4263                format: StemFormat::Mp3,
4264                hash: crate::art_url_hash(&s.url),
4265            })
4266            .collect();
4267        let d = Desired {
4268            path: "clip1.flac".to_owned(),
4269            stems: Some(desired_stems),
4270            ..desired(clip("clip1"), AudioFormat::Flac)
4271        };
4272        let local: HashMap<String, crate::reconcile::LocalFile> = [(
4273            "clip1".to_owned(),
4274            crate::reconcile::LocalFile {
4275                exists: true,
4276                size: 100,
4277            },
4278        )]
4279        .into_iter()
4280        .collect();
4281        let sources = [crate::reconcile::SourceStatus {
4282            mode: SourceMode::Mirror,
4283            fully_enumerated: true,
4284        }];
4285        let plan =
4286            crate::reconcile::reconcile(&manifest, std::slice::from_ref(&d), &local, &sources);
4287        assert_eq!(plan.stem_writes(), 2);
4288
4289        let fs = MemFs::new();
4290        let outcome = run(
4291            &plan,
4292            &mut manifest,
4293            std::slice::from_ref(&d),
4294            &http,
4295            &fs,
4296            &StubFfmpeg::flac(),
4297            &RecordingClock::new(),
4298            &ExecOptions::default(),
4299        );
4300
4301        assert_eq!(outcome.artifacts_written, 2, "both stems downloaded");
4302        assert_eq!(fs.read_file("clip1.stems/s1.mp3").unwrap(), b"vocals-bytes");
4303        assert_eq!(fs.read_file("clip1.stems/s2.mp3").unwrap(), b"drums-bytes");
4304        // The MP3 mirror path never touches any /api/gen/ endpoint (no render, no
4305        // generation, no separation).
4306        assert_eq!(http.count("/api/gen/"), 0);
4307        assert_eq!(http.count("stem_task"), 0);
4308        assert_eq!(http.count("separate"), 0);
4309        assert_eq!(http.count("generate"), 0);
4310        // No stem is ever written as FLAC.
4311        assert!(!fs.exists("clip1.stems/s1.flac"));
4312    }
4313
4314    #[test]
4315    fn full_stems_mirror_wav_default_renders_free_wav_and_no_generation() {
4316        // End-to-end #100 path with WAV stems (the default): each stem's lossless
4317        // WAV is rendered through the FREE convert_wav flow and stored RAW as
4318        // `.wav`. The mirror makes NO credit-spending generation POST.
4319        let http = ScriptedHttp::new()
4320            .with_auth()
4321            .route("clip1/stems/pages", Reply::json(r#"{"pages": 1}"#))
4322            .route(
4323                "clip1/stems?page=0",
4324                Reply::json(
4325                    r#"{"stems":[
4326                        {"id":"s1","title":"Song (Vocals)","status":"complete","audio_url":"https://cdn1.suno.ai/s1.mp3"},
4327                        {"id":"s2","title":"Song (Drums)","status":"complete","audio_url":"https://cdn1.suno.ai/s2.mp3"}
4328                    ]}"#,
4329                ),
4330            )
4331            // Each stem's WAV is already rendered, so wav_file returns the url and
4332            // no convert_wav POST is even needed (still free either way).
4333            .route(
4334                "s1/wav_file/",
4335                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/s1.wav"}"#),
4336            )
4337            .route(
4338                "s2/wav_file/",
4339                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/s2.wav"}"#),
4340            )
4341            .route("s1.wav", Reply::ok(b"RIFFvocals".to_vec()))
4342            .route("s2.wav", Reply::ok(b"RIFFdrums".to_vec()));
4343
4344        let mut auth = ClerkAuth::new("eyJtoken");
4345        pollster::block_on(auth.authenticate(&http)).unwrap();
4346        let mut client = SunoClient::new(auth, RecordingClock::new());
4347        let (stems, _complete) = pollster::block_on(client.list_stems(&http, "clip1")).unwrap();
4348
4349        let mut manifest = Manifest::new();
4350        manifest.insert("clip1", entry("clip1.flac", AudioFormat::Flac));
4351        let desired_stems: Vec<crate::reconcile::DesiredStem> = stems
4352            .iter()
4353            .map(|s| crate::reconcile::DesiredStem {
4354                key: s.id.clone(),
4355                stem_id: s.id.clone(),
4356                path: format!("clip1.stems/{}.wav", s.id),
4357                source_url: s.url.clone(),
4358                format: StemFormat::Wav,
4359                hash: crate::art_url_hash(&s.url),
4360            })
4361            .collect();
4362        let d = Desired {
4363            path: "clip1.flac".to_owned(),
4364            stems: Some(desired_stems),
4365            ..desired(clip("clip1"), AudioFormat::Flac)
4366        };
4367        let local: HashMap<String, crate::reconcile::LocalFile> = [(
4368            "clip1".to_owned(),
4369            crate::reconcile::LocalFile {
4370                exists: true,
4371                size: 100,
4372            },
4373        )]
4374        .into_iter()
4375        .collect();
4376        let sources = [crate::reconcile::SourceStatus {
4377            mode: SourceMode::Mirror,
4378            fully_enumerated: true,
4379        }];
4380        let plan =
4381            crate::reconcile::reconcile(&manifest, std::slice::from_ref(&d), &local, &sources);
4382
4383        let fs = MemFs::new();
4384        let outcome = run(
4385            &plan,
4386            &mut manifest,
4387            std::slice::from_ref(&d),
4388            &http,
4389            &fs,
4390            &StubFfmpeg::flac(),
4391            &RecordingClock::new(),
4392            &small_poll(),
4393        );
4394
4395        assert_eq!(outcome.artifacts_written, 2);
4396        // Stems are stored RAW as WAV (no FLAC transcode, even for a FLAC song).
4397        assert_eq!(fs.read_file("clip1.stems/s1.wav").unwrap(), b"RIFFvocals");
4398        assert_eq!(fs.read_file("clip1.stems/s2.wav").unwrap(), b"RIFFdrums");
4399        assert!(!fs.exists("clip1.stems/s1.flac"));
4400        // No credit-spending generation/separation endpoint is ever hit.
4401        assert_eq!(http.count("stem_task"), 0);
4402        assert_eq!(http.count("separate"), 0);
4403        assert_eq!(http.count("generate"), 0);
4404    }
4405
4406    #[test]
4407    fn write_artifact_is_skipped_when_the_owner_audio_is_absent() {
4408        // A clip whose Download fails leaves no manifest entry, so its following
4409        // WriteArtifact must not strand an untracked sidecar: it is skipped with
4410        // no fetch and no write. A following healthy clip still succeeds.
4411        let ca = clip("a");
4412        let plan = Plan {
4413            actions: vec![
4414                Action::Download {
4415                    clip: ca.clone(),
4416                    lineage: LineageContext::own_root(&ca),
4417                    path: "a.mp3".to_owned(),
4418                    format: AudioFormat::Mp3,
4419                },
4420                Action::WriteArtifact {
4421                    kind: ArtifactKind::CoverJpg,
4422                    path: "a/cover.jpg".to_owned(),
4423                    source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
4424                    hash: "h1".to_owned(),
4425                    owner_id: "a".to_owned(),
4426                    content: None,
4427                },
4428                Action::WriteArtifact {
4429                    kind: ArtifactKind::CoverJpg,
4430                    path: "b/cover.jpg".to_owned(),
4431                    source_url: "https://art.suno.ai/b/large.jpg".to_owned(),
4432                    hash: "h2".to_owned(),
4433                    owner_id: "b".to_owned(),
4434                    content: None,
4435                },
4436            ],
4437        };
4438        // The Download's audio 404s (permanent), so no entry for "a" is created.
4439        let http = ScriptedHttp::new()
4440            .route("a.mp3", Reply::status(404))
4441            .route("a/large.jpg", Reply::ok(b"jpg-a".to_vec()))
4442            .route("b/large.jpg", Reply::ok(b"jpg-b".to_vec()));
4443        let fs = MemFs::new();
4444        let mut manifest = Manifest::new();
4445        // "b" already has audio (a prior-run clip), so its sidecar write proceeds.
4446        manifest.insert("b", entry("b.mp3", AudioFormat::Mp3));
4447
4448        let outcome = run(
4449            &plan,
4450            &mut manifest,
4451            &[],
4452            &http,
4453            &fs,
4454            &StubFfmpeg::flac(),
4455            &RecordingClock::new(),
4456            &ExecOptions::default(),
4457        );
4458
4459        assert_eq!(outcome.status, RunStatus::Completed);
4460        // The audio download is the only failure; the orphan artifact is skipped.
4461        assert_eq!(outcome.failed(), 1);
4462        assert_eq!(outcome.failures[0].clip_id, "a");
4463        assert_eq!(outcome.skipped, 1);
4464        // The orphan sidecar was neither fetched nor written, and left no record.
4465        assert_eq!(http.count("a/large.jpg"), 0);
4466        assert!(!fs.exists("a/cover.jpg"));
4467        assert!(manifest.get("a").is_none());
4468        // The healthy clip's sidecar still succeeded.
4469        assert_eq!(outcome.artifacts_written, 1);
4470        assert_eq!(fs.read_file("b/cover.jpg").unwrap(), b"jpg-b");
4471        assert!(manifest.get("b").unwrap().cover_jpg.is_some());
4472    }
4473
4474    #[test]
4475    fn write_artifact_transcodes_animated_cover_to_webp() {
4476        // A CoverWebp fetches the clip's MP4 preview, runs it through the ffmpeg
4477        // port, and writes the transcoded WebP (not the fetched MP4), recording
4478        // the sidecar on the owning entry.
4479        let mut manifest = Manifest::new();
4480        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
4481        let plan = Plan {
4482            actions: vec![Action::WriteArtifact {
4483                kind: ArtifactKind::CoverWebp,
4484                path: "a/cover.webp".to_owned(),
4485                source_url: "https://cdn.suno.ai/a/video.mp4".to_owned(),
4486                hash: "v1".to_owned(),
4487                owner_id: "a".to_owned(),
4488                content: None,
4489            }],
4490        };
4491        let http = ScriptedHttp::new().route("a/video.mp4", Reply::ok(b"mp4-bytes".to_vec()));
4492        let fs = MemFs::new();
4493        let ffmpeg = StubFfmpeg::webp();
4494
4495        let outcome = run(
4496            &plan,
4497            &mut manifest,
4498            &[],
4499            &http,
4500            &fs,
4501            &ffmpeg,
4502            &RecordingClock::new(),
4503            &ExecOptions::default(),
4504        );
4505
4506        assert_eq!(outcome.artifacts_written, 1);
4507        assert_eq!(outcome.failed(), 0);
4508        assert_eq!(outcome.status, RunStatus::Completed);
4509        // The fetched MP4 was transcoded: the file holds the ffmpeg WebP output.
4510        assert_eq!(http.count("a/video.mp4"), 1);
4511        let written = fs.read_file("a/cover.webp").unwrap();
4512        assert_ne!(written, b"mp4-bytes");
4513        assert!(written.starts_with(b"RIFF"));
4514        assert_eq!(
4515            manifest.get("a").unwrap().cover_webp,
4516            Some(ArtifactState {
4517                path: "a/cover.webp".to_owned(),
4518                hash: "v1".to_owned(),
4519            })
4520        );
4521    }
4522
4523    #[test]
4524    fn write_artifact_webp_transcode_failure_is_per_clip() {
4525        // A transcode failure is attributed to the owning clip: it is a per-clip
4526        // failure, the run completes, no sidecar is written, and the slot stays
4527        // empty. A healthy static cover in the same run still succeeds.
4528        let mut manifest = Manifest::new();
4529        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
4530        manifest.insert("b", entry("b.mp3", AudioFormat::Mp3));
4531        let plan = Plan {
4532            actions: vec![
4533                Action::WriteArtifact {
4534                    kind: ArtifactKind::CoverWebp,
4535                    path: "a/cover.webp".to_owned(),
4536                    source_url: "https://cdn.suno.ai/a/video.mp4".to_owned(),
4537                    hash: "v1".to_owned(),
4538                    owner_id: "a".to_owned(),
4539                    content: None,
4540                },
4541                Action::WriteArtifact {
4542                    kind: ArtifactKind::CoverJpg,
4543                    path: "b/cover.jpg".to_owned(),
4544                    source_url: "https://art.suno.ai/b/large.jpg".to_owned(),
4545                    hash: "h1".to_owned(),
4546                    owner_id: "b".to_owned(),
4547                    content: None,
4548                },
4549            ],
4550        };
4551        let http = ScriptedHttp::new()
4552            .route("a/video.mp4", Reply::ok(b"mp4-bytes".to_vec()))
4553            .route("b/large.jpg", Reply::ok(b"jpg-b".to_vec()));
4554        let fs = MemFs::new();
4555
4556        let outcome = run(
4557            &plan,
4558            &mut manifest,
4559            &[],
4560            &http,
4561            &fs,
4562            &StubFfmpeg::failing(),
4563            &RecordingClock::new(),
4564            &ExecOptions::default(),
4565        );
4566
4567        assert_eq!(outcome.status, RunStatus::Completed);
4568        assert_eq!(outcome.failed(), 1);
4569        assert_eq!(outcome.failures[0].clip_id, "a");
4570        // The animated cover failed to transcode: nothing written, slot empty.
4571        assert!(!fs.exists("a/cover.webp"));
4572        assert_eq!(manifest.get("a").unwrap().cover_webp, None);
4573        // The static cover in the same run still succeeded.
4574        assert_eq!(outcome.artifacts_written, 1);
4575        assert_eq!(fs.read_file("b/cover.jpg").unwrap(), b"jpg-b");
4576        assert!(manifest.get("b").unwrap().cover_jpg.is_some());
4577    }
4578
4579    #[test]
4580    fn write_artifact_uses_configured_webp_settings() {
4581        use std::sync::{Arc, Mutex};
4582
4583        struct RecordingWebpFfmpeg {
4584            seen: Arc<Mutex<Vec<WebpEncodeSettings>>>,
4585        }
4586
4587        impl Ffmpeg for RecordingWebpFfmpeg {
4588            async fn wav_to_flac(
4589                &self,
4590                _wav: &[u8],
4591            ) -> Result<Vec<u8>, crate::ffmpeg::FfmpegError> {
4592                Ok(Vec::new())
4593            }
4594
4595            async fn mp4_to_webp(
4596                &self,
4597                _mp4: &[u8],
4598                settings: WebpEncodeSettings,
4599            ) -> Result<Vec<u8>, crate::ffmpeg::FfmpegError> {
4600                let seen = Arc::clone(&self.seen);
4601                seen.lock().unwrap().push(settings);
4602                Ok(b"RIFF\x00\x00\x00\x00WEBP".to_vec())
4603            }
4604        }
4605
4606        let mut manifest = Manifest::new();
4607        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
4608        let plan = Plan {
4609            actions: vec![Action::WriteArtifact {
4610                kind: ArtifactKind::CoverWebp,
4611                path: "a/cover.webp".to_owned(),
4612                source_url: "https://cdn.suno.ai/a/video.mp4".to_owned(),
4613                hash: "v1".to_owned(),
4614                owner_id: "a".to_owned(),
4615                content: None,
4616            }],
4617        };
4618        let seen = Arc::new(Mutex::new(Vec::new()));
4619        let ffmpeg = RecordingWebpFfmpeg {
4620            seen: Arc::clone(&seen),
4621        };
4622        let opts = ExecOptions {
4623            cover_webp: WebpEncodeSettings {
4624                quality: 88,
4625                max_fps: 12,
4626                max_width: Some(720),
4627                lossless: false,
4628                compression_level: 4,
4629            },
4630            ..ExecOptions::default()
4631        };
4632
4633        let _ = run(
4634            &plan,
4635            &mut manifest,
4636            &[],
4637            &ScriptedHttp::new().route("a/video.mp4", Reply::ok(b"mp4-bytes".to_vec())),
4638            &MemFs::new(),
4639            &ffmpeg,
4640            &RecordingClock::new(),
4641            &opts,
4642        );
4643
4644        assert_eq!(
4645            seen.lock().unwrap().as_slice(),
4646            &[WebpEncodeSettings {
4647                quality: 88,
4648                max_fps: 12,
4649                max_width: Some(720),
4650                lossless: false,
4651                compression_level: 4,
4652            }]
4653        );
4654    }
4655
4656    // ── Phase 8: folder art routes to the album store ───────────────
4657
4658    #[test]
4659    fn folder_jpg_write_records_album_state_and_skips_manifest() {
4660        // Folder art is owned by the album root id, not a manifest clip: it
4661        // writes even with an empty manifest and records on the album store.
4662        let mut manifest = Manifest::new();
4663        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
4664        let plan = Plan {
4665            actions: vec![Action::WriteArtifact {
4666                kind: ArtifactKind::FolderJpg,
4667                path: "creator/album/folder.jpg".to_owned(),
4668                source_url: "https://art.suno.ai/root/large.jpg".to_owned(),
4669                hash: "jh".to_owned(),
4670                owner_id: "root".to_owned(),
4671                content: None,
4672            }],
4673        };
4674        let http = ScriptedHttp::new().route("root/large.jpg", Reply::ok(b"folder-jpg".to_vec()));
4675        let fs = MemFs::new();
4676
4677        let outcome = run_with_albums(
4678            &plan,
4679            &mut manifest,
4680            &mut albums,
4681            &[],
4682            &http,
4683            &fs,
4684            &StubFfmpeg::flac(),
4685            &RecordingClock::new(),
4686            &ExecOptions::default(),
4687        );
4688
4689        assert_eq!(outcome.artifacts_written, 1);
4690        assert_eq!(outcome.status, RunStatus::Completed);
4691        assert_eq!(
4692            fs.read_file("creator/album/folder.jpg").unwrap(),
4693            b"folder-jpg"
4694        );
4695        assert_eq!(
4696            albums.get("root").unwrap().folder_jpg,
4697            Some(ArtifactState {
4698                path: "creator/album/folder.jpg".to_owned(),
4699                hash: "jh".to_owned(),
4700            })
4701        );
4702        assert!(manifest.get("root").is_none());
4703    }
4704
4705    #[test]
4706    fn folder_webp_write_transcodes_and_records_album_state() {
4707        let mut manifest = Manifest::new();
4708        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
4709        let plan = Plan {
4710            actions: vec![Action::WriteArtifact {
4711                kind: ArtifactKind::FolderWebp,
4712                path: "creator/album/cover.webp".to_owned(),
4713                source_url: "https://cdn.suno.ai/root/video.mp4".to_owned(),
4714                hash: "wh".to_owned(),
4715                owner_id: "root".to_owned(),
4716                content: None,
4717            }],
4718        };
4719        let http = ScriptedHttp::new().route("root/video.mp4", Reply::ok(b"mp4-bytes".to_vec()));
4720        let fs = MemFs::new();
4721
4722        let outcome = run_with_albums(
4723            &plan,
4724            &mut manifest,
4725            &mut albums,
4726            &[],
4727            &http,
4728            &fs,
4729            &StubFfmpeg::webp(),
4730            &RecordingClock::new(),
4731            &ExecOptions::default(),
4732        );
4733
4734        assert_eq!(outcome.artifacts_written, 1);
4735        assert_eq!(outcome.failed(), 0);
4736        // The MP4 was transcoded to WebP, not written verbatim.
4737        let written = fs.read_file("creator/album/cover.webp").unwrap();
4738        assert_ne!(written, b"mp4-bytes");
4739        assert!(written.starts_with(b"RIFF"));
4740        assert_eq!(
4741            albums.get("root").unwrap().folder_webp,
4742            Some(ArtifactState {
4743                path: "creator/album/cover.webp".to_owned(),
4744                hash: "wh".to_owned(),
4745            })
4746        );
4747    }
4748
4749    #[test]
4750    fn folder_mp4_write_keeps_the_source_verbatim() {
4751        let mut manifest = Manifest::new();
4752        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
4753        let plan = Plan {
4754            actions: vec![Action::WriteArtifact {
4755                kind: ArtifactKind::FolderMp4,
4756                path: "creator/album/cover.mp4".to_owned(),
4757                source_url: "https://cdn.suno.ai/root/video.mp4".to_owned(),
4758                hash: "mh".to_owned(),
4759                owner_id: "root".to_owned(),
4760                content: None,
4761            }],
4762        };
4763        let http = ScriptedHttp::new().route("root/video.mp4", Reply::ok(b"mp4-bytes".to_vec()));
4764        let fs = MemFs::new();
4765
4766        let outcome = run_with_albums(
4767            &plan,
4768            &mut manifest,
4769            &mut albums,
4770            &[],
4771            &http,
4772            &fs,
4773            &StubFfmpeg::webp(),
4774            &RecordingClock::new(),
4775            &ExecOptions::default(),
4776        );
4777
4778        assert_eq!(outcome.artifacts_written, 1);
4779        assert_eq!(outcome.failed(), 0);
4780        // The raw MP4 is written byte-for-byte, never transcoded.
4781        assert_eq!(
4782            fs.read_file("creator/album/cover.mp4").unwrap(),
4783            b"mp4-bytes"
4784        );
4785        assert_eq!(
4786            albums.get("root").unwrap().folder_mp4,
4787            Some(ArtifactState {
4788                path: "creator/album/cover.mp4".to_owned(),
4789                hash: "mh".to_owned(),
4790            })
4791        );
4792    }
4793
4794    #[test]
4795    fn both_folder_covers_fetch_the_video_cover_once() {
4796        let mut manifest = Manifest::new();
4797        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
4798        // `both` retention keeps cover.webp (transcoded) and cover.mp4 (raw) from
4799        // the one video_cover_url. FolderWebp sorts first and caches the fetched
4800        // source; FolderMp4 drains it, so the source is fetched exactly once.
4801        let plan = Plan {
4802            actions: vec![
4803                Action::WriteArtifact {
4804                    kind: ArtifactKind::FolderWebp,
4805                    path: "creator/album/cover.webp".to_owned(),
4806                    source_url: "https://cdn.suno.ai/root/video.mp4".to_owned(),
4807                    hash: "wh".to_owned(),
4808                    owner_id: "root".to_owned(),
4809                    content: None,
4810                },
4811                Action::WriteArtifact {
4812                    kind: ArtifactKind::FolderMp4,
4813                    path: "creator/album/cover.mp4".to_owned(),
4814                    source_url: "https://cdn.suno.ai/root/video.mp4".to_owned(),
4815                    hash: "mh".to_owned(),
4816                    owner_id: "root".to_owned(),
4817                    content: None,
4818                },
4819            ],
4820        };
4821        let http = ScriptedHttp::new().route("root/video.mp4", Reply::ok(b"mp4-bytes".to_vec()));
4822        let fs = MemFs::new();
4823
4824        let outcome = run_with_albums(
4825            &plan,
4826            &mut manifest,
4827            &mut albums,
4828            &[],
4829            &http,
4830            &fs,
4831            &StubFfmpeg::webp(),
4832            &RecordingClock::new(),
4833            &ExecOptions::default(),
4834        );
4835
4836        assert_eq!(outcome.artifacts_written, 2);
4837        assert_eq!(outcome.failed(), 0);
4838        // Fetched exactly once despite two artifacts consuming it (#90 / #89).
4839        assert_eq!(http.count("root/video.mp4"), 1);
4840        // The webp is transcoded; the mp4 is the raw source verbatim.
4841        assert!(
4842            fs.read_file("creator/album/cover.webp")
4843                .unwrap()
4844                .starts_with(b"RIFF")
4845        );
4846        assert_eq!(
4847            fs.read_file("creator/album/cover.mp4").unwrap(),
4848            b"mp4-bytes"
4849        );
4850    }
4851
4852    #[test]
4853    fn folder_art_delete_clears_album_state() {
4854        let fs = MemFs::new().with_file("creator/album/folder.jpg", b"jpg".to_vec());
4855        let mut manifest = Manifest::new();
4856        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
4857        albums.insert(
4858            "root".to_owned(),
4859            AlbumArt {
4860                folder_jpg: Some(ArtifactState {
4861                    path: "creator/album/folder.jpg".to_owned(),
4862                    hash: "jh".to_owned(),
4863                }),
4864                folder_webp: None,
4865                folder_mp4: None,
4866            },
4867        );
4868        let plan = Plan {
4869            actions: vec![Action::DeleteArtifact {
4870                kind: ArtifactKind::FolderJpg,
4871                path: "creator/album/folder.jpg".to_owned(),
4872                owner_id: "root".to_owned(),
4873            }],
4874        };
4875
4876        let outcome = run_with_albums(
4877            &plan,
4878            &mut manifest,
4879            &mut albums,
4880            &[],
4881            &ScriptedHttp::new(),
4882            &fs,
4883            &StubFfmpeg::flac(),
4884            &RecordingClock::new(),
4885            &ExecOptions::default(),
4886        );
4887
4888        assert_eq!(outcome.artifacts_deleted, 1);
4889        assert!(!fs.exists("creator/album/folder.jpg"));
4890        // The album row had only the one kind, so it is pruned entirely.
4891        assert!(!albums.contains_key("root"));
4892    }
4893
4894    // ── Phase 9: playlist artifacts ─────────────────────────────────
4895
4896    #[test]
4897    fn playlist_write_uses_inline_content_and_records_state() {
4898        // A playlist body is generated, carried inline. With an empty manifest
4899        // and NO http routes, the write still succeeds — proving it skipped the
4900        // network — and records the playlist store keyed by the playlist id.
4901        let mut manifest = Manifest::new();
4902        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
4903        let mut playlists: BTreeMap<String, PlaylistState> = BTreeMap::new();
4904        let body = "#EXTM3U\n#PLAYLIST:Road Trip\n#EXTINF:60,One\nA/One.flac\n";
4905        let plan = Plan {
4906            actions: vec![Action::WriteArtifact {
4907                kind: ArtifactKind::Playlist,
4908                path: "Road Trip.m3u8".to_owned(),
4909                source_url: String::new(),
4910                hash: "ph1".to_owned(),
4911                owner_id: "pl1".to_owned(),
4912                content: Some(body.to_owned()),
4913            }],
4914        };
4915        let fs = MemFs::new();
4916
4917        let outcome = run_full(
4918            &plan,
4919            &mut manifest,
4920            &mut albums,
4921            &mut playlists,
4922            &[],
4923            &ScriptedHttp::new(),
4924            &fs,
4925            &StubFfmpeg::flac(),
4926            &RecordingClock::new(),
4927            &ExecOptions::default(),
4928        );
4929
4930        assert_eq!(outcome.artifacts_written, 1);
4931        assert_eq!(outcome.failed(), 0);
4932        // The exact inline bytes were written, verbatim.
4933        assert_eq!(fs.read_file("Road Trip.m3u8").unwrap(), body.as_bytes());
4934        assert_eq!(
4935            playlists.get("pl1"),
4936            Some(&PlaylistState {
4937                name: "Road Trip".to_owned(),
4938                path: "Road Trip.m3u8".to_owned(),
4939                hash: "ph1".to_owned(),
4940            })
4941        );
4942    }
4943
4944    #[test]
4945    fn playlist_delete_removes_file_and_clears_state() {
4946        let fs = MemFs::new().with_file("Old.m3u8", b"#EXTM3U\n".to_vec());
4947        let mut manifest = Manifest::new();
4948        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
4949        let mut playlists: BTreeMap<String, PlaylistState> = BTreeMap::new();
4950        playlists.insert(
4951            "pl1".to_owned(),
4952            PlaylistState {
4953                name: "Old".to_owned(),
4954                path: "Old.m3u8".to_owned(),
4955                hash: "ph1".to_owned(),
4956            },
4957        );
4958        let plan = Plan {
4959            actions: vec![Action::DeleteArtifact {
4960                kind: ArtifactKind::Playlist,
4961                path: "Old.m3u8".to_owned(),
4962                owner_id: "pl1".to_owned(),
4963            }],
4964        };
4965
4966        let outcome = run_full(
4967            &plan,
4968            &mut manifest,
4969            &mut albums,
4970            &mut playlists,
4971            &[],
4972            &ScriptedHttp::new(),
4973            &fs,
4974            &StubFfmpeg::flac(),
4975            &RecordingClock::new(),
4976            &ExecOptions::default(),
4977        );
4978
4979        assert_eq!(outcome.artifacts_deleted, 1);
4980        assert!(!fs.exists("Old.m3u8"));
4981        assert!(
4982            !playlists.contains_key("pl1"),
4983            "the playlist row is cleared on delete"
4984        );
4985    }
4986
4987    // ── Phase 10: old-sidecar cleanup on move + empty-dir prune ──────
4988
4989    #[test]
4990    fn rename_move_relocates_cover_and_prunes_old_album() {
4991        // A title/album change moves the audio (Rename) and re-emits the cover
4992        // at the NEW path. The old cover must be removed and the now-empty old
4993        // album directory pruned, leaving no orphan sidecar and no ghost dir.
4994        let mut manifest = Manifest::new();
4995        let mut e = entry("Creator/AlbumA/song.flac", AudioFormat::Flac);
4996        e.cover_jpg = Some(ArtifactState {
4997            path: "Creator/AlbumA/cover.jpg".to_owned(),
4998            hash: "h1".to_owned(),
4999        });
5000        manifest.insert("a", e);
5001        let fs = MemFs::new()
5002            .with_file("Creator/AlbumA/song.flac", b"AUDIO".to_vec())
5003            .with_file("Creator/AlbumA/cover.jpg", b"old-jpg".to_vec());
5004        let plan = Plan {
5005            actions: vec![
5006                Action::Rename {
5007                    from: "Creator/AlbumA/song.flac".to_owned(),
5008                    to: "Creator/AlbumB/song.flac".to_owned(),
5009                },
5010                Action::WriteArtifact {
5011                    kind: ArtifactKind::CoverJpg,
5012                    path: "Creator/AlbumB/cover.jpg".to_owned(),
5013                    source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
5014                    hash: "h1".to_owned(),
5015                    owner_id: "a".to_owned(),
5016                    content: None,
5017                },
5018            ],
5019        };
5020        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"new-jpg".to_vec()));
5021
5022        let outcome = run(
5023            &plan,
5024            &mut manifest,
5025            &[],
5026            &http,
5027            &fs,
5028            &StubFfmpeg::flac(),
5029            &RecordingClock::new(),
5030            &ExecOptions::default(),
5031        );
5032
5033        assert_eq!(outcome.failed(), 0);
5034        // Audio moved, the new cover was written, the old cover removed.
5035        assert!(fs.exists("Creator/AlbumB/song.flac"));
5036        assert_eq!(
5037            fs.read_file("Creator/AlbumB/cover.jpg").unwrap(),
5038            b"new-jpg"
5039        );
5040        assert!(!fs.exists("Creator/AlbumA/cover.jpg"));
5041        assert!(!fs.exists("Creator/AlbumA/song.flac"));
5042        // The manifest cover slot now points at the new path.
5043        assert_eq!(
5044            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
5045            "Creator/AlbumB/cover.jpg"
5046        );
5047        // The emptied old album directory is pruned; the new one survives.
5048        assert!(!fs.has_dir("Creator/AlbumA"));
5049        assert!(fs.has_dir("Creator/AlbumB"));
5050    }
5051
5052    #[test]
5053    fn rename_move_relocates_folder_art_and_prunes_old_album() {
5054        // An album rename moves folder.jpg: the old file is removed, the album
5055        // store slot advanced to the new path, and the emptied dir pruned.
5056        let mut manifest = Manifest::new();
5057        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
5058        albums.insert(
5059            "root".to_owned(),
5060            AlbumArt {
5061                folder_jpg: Some(ArtifactState {
5062                    path: "Creator/AlbumA/folder.jpg".to_owned(),
5063                    hash: "jh".to_owned(),
5064                }),
5065                folder_webp: None,
5066                folder_mp4: None,
5067            },
5068        );
5069        let fs = MemFs::new().with_file("Creator/AlbumA/folder.jpg", b"old-folder".to_vec());
5070        let plan = Plan {
5071            actions: vec![Action::WriteArtifact {
5072                kind: ArtifactKind::FolderJpg,
5073                path: "Creator/AlbumB/folder.jpg".to_owned(),
5074                source_url: "https://art.suno.ai/root/large.jpg".to_owned(),
5075                hash: "jh".to_owned(),
5076                owner_id: "root".to_owned(),
5077                content: None,
5078            }],
5079        };
5080        let http = ScriptedHttp::new().route("root/large.jpg", Reply::ok(b"new-folder".to_vec()));
5081
5082        let outcome = run_with_albums(
5083            &plan,
5084            &mut manifest,
5085            &mut albums,
5086            &[],
5087            &http,
5088            &fs,
5089            &StubFfmpeg::flac(),
5090            &RecordingClock::new(),
5091            &ExecOptions::default(),
5092        );
5093
5094        assert_eq!(outcome.failed(), 0);
5095        assert_eq!(
5096            fs.read_file("Creator/AlbumB/folder.jpg").unwrap(),
5097            b"new-folder"
5098        );
5099        assert!(!fs.exists("Creator/AlbumA/folder.jpg"));
5100        assert_eq!(
5101            albums
5102                .get("root")
5103                .unwrap()
5104                .folder_jpg
5105                .as_ref()
5106                .unwrap()
5107                .path,
5108            "Creator/AlbumB/folder.jpg"
5109        );
5110        assert!(!fs.has_dir("Creator/AlbumA"));
5111        assert!(fs.has_dir("Creator/AlbumB"));
5112    }
5113
5114    #[test]
5115    fn prune_empty_dirs_removes_only_empty_dirs() {
5116        // A direct exercise of the prune port's safety guarantees on a mixed
5117        // tree: nested empties go, anything holding a file (hidden ones too)
5118        // stays, and no file is touched.
5119        let fs = MemFs::new()
5120            .with_file("keep/full/song.flac", b"x".to_vec())
5121            .with_file("hidden/.suno-manifest.json", b"{}".to_vec())
5122            .with_dir("empty/leaf")
5123            .with_dir("nested/a/b/c");
5124
5125        fs.prune_empty_dirs("").unwrap();
5126
5127        // Every empty directory, however deeply nested, is pruned bottom-up.
5128        for gone in [
5129            "empty",
5130            "empty/leaf",
5131            "nested",
5132            "nested/a",
5133            "nested/a/b",
5134            "nested/a/b/c",
5135        ] {
5136            assert!(!fs.has_dir(gone), "empty dir {gone} should be pruned");
5137        }
5138        // A directory holding any file — including only a hidden dotfile — stays.
5139        assert!(fs.has_dir("keep"));
5140        assert!(fs.has_dir("keep/full"));
5141        assert!(fs.has_dir("hidden"));
5142        // No file was touched.
5143        assert!(fs.exists("keep/full/song.flac"));
5144        assert!(fs.exists("hidden/.suno-manifest.json"));
5145    }
5146
5147    #[test]
5148    fn prune_empty_dirs_never_removes_the_named_root() {
5149        // Pruning under a named root clears its empty children but keeps the
5150        // root itself, even when the root is now empty.
5151        let fs = MemFs::new().with_dir("empty/leaf");
5152        fs.prune_empty_dirs("empty").unwrap();
5153        assert!(fs.has_dir("empty"), "the named root is never removed");
5154        assert!(!fs.has_dir("empty/leaf"));
5155    }
5156
5157    #[test]
5158    fn old_sidecar_remove_failure_is_per_clip_and_converges_next_run() {
5159        // If removing the old sidecar fails, the write is a per-clip failure
5160        // that never aborts the run and does NOT advance the state slot, so the
5161        // next identical run re-attempts the cleanup and the tree converges.
5162        let mut manifest = Manifest::new();
5163        let mut e = entry("a.flac", AudioFormat::Flac);
5164        e.cover_jpg = Some(ArtifactState {
5165            path: "AlbumA/cover.jpg".to_owned(),
5166            hash: "h1".to_owned(),
5167        });
5168        manifest.insert("a", e);
5169        let fs = MemFs::new()
5170            .with_file("a.flac", b"AUDIO".to_vec())
5171            .with_file("AlbumA/cover.jpg", b"old".to_vec());
5172        let plan = Plan {
5173            actions: vec![Action::WriteArtifact {
5174                kind: ArtifactKind::CoverJpg,
5175                path: "AlbumB/cover.jpg".to_owned(),
5176                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
5177                hash: "h1".to_owned(),
5178                owner_id: "a".to_owned(),
5179                content: None,
5180            }],
5181        };
5182        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"new".to_vec()));
5183
5184        // Run 1: the old-cover remove is forced to fail.
5185        fs.arm_fail_remove("AlbumA/cover.jpg");
5186        let first = run(
5187            &plan,
5188            &mut manifest,
5189            &[],
5190            &http,
5191            &fs,
5192            &StubFfmpeg::flac(),
5193            &RecordingClock::new(),
5194            &ExecOptions::default(),
5195        );
5196        assert_eq!(
5197            first.status,
5198            RunStatus::Completed,
5199            "a remove failure never aborts the run"
5200        );
5201        assert_eq!(first.failed(), 1);
5202        // The new cover is written but the old one lingers and the slot is stale.
5203        assert!(fs.exists("AlbumB/cover.jpg"));
5204        assert!(fs.exists("AlbumA/cover.jpg"));
5205        assert_eq!(
5206            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
5207            "AlbumA/cover.jpg"
5208        );
5209        assert!(fs.has_dir("AlbumA"), "the orphan keeps its directory alive");
5210
5211        // Run 2: the same plan re-runs with the fault cleared and converges.
5212        fs.disarm_fail_remove("AlbumA/cover.jpg");
5213        let second = run(
5214            &plan,
5215            &mut manifest,
5216            &[],
5217            &http,
5218            &fs,
5219            &StubFfmpeg::flac(),
5220            &RecordingClock::new(),
5221            &ExecOptions::default(),
5222        );
5223        assert_eq!(second.failed(), 0);
5224        assert!(fs.exists("AlbumB/cover.jpg"));
5225        assert!(!fs.exists("AlbumA/cover.jpg"), "no orphan persists");
5226        assert_eq!(
5227            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
5228            "AlbumB/cover.jpg"
5229        );
5230        assert!(!fs.has_dir("AlbumA"), "the emptied directory is pruned");
5231    }
5232
5233    #[test]
5234    fn same_path_artifact_rewrite_does_no_remove_and_prunes_nothing() {
5235        // The idempotent case: a content-only cover rewrite (hash drift, path
5236        // unchanged) attempts no remove and prunes no live directory. A remove
5237        // failure is armed on the cover path, so any spurious remove would
5238        // surface as a failure — none does.
5239        let mut manifest = Manifest::new();
5240        let mut e = entry("Album/a.mp3", AudioFormat::Mp3);
5241        e.cover_jpg = Some(ArtifactState {
5242            path: "Album/cover.jpg".to_owned(),
5243            hash: "h1".to_owned(),
5244        });
5245        manifest.insert("a", e);
5246        let fs = MemFs::new()
5247            .with_file("Album/a.mp3", b"AUDIO".to_vec())
5248            .with_file("Album/cover.jpg", b"old".to_vec());
5249        fs.arm_fail_remove("Album/cover.jpg");
5250        let plan = Plan {
5251            actions: vec![Action::WriteArtifact {
5252                kind: ArtifactKind::CoverJpg,
5253                path: "Album/cover.jpg".to_owned(),
5254                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
5255                hash: "h2".to_owned(),
5256                owner_id: "a".to_owned(),
5257                content: None,
5258            }],
5259        };
5260        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"new".to_vec()));
5261
5262        let outcome = run(
5263            &plan,
5264            &mut manifest,
5265            &[],
5266            &http,
5267            &fs,
5268            &StubFfmpeg::flac(),
5269            &RecordingClock::new(),
5270            &ExecOptions::default(),
5271        );
5272
5273        assert_eq!(
5274            outcome.failed(),
5275            0,
5276            "no remove is attempted, so the armed failure never fires"
5277        );
5278        assert_eq!(outcome.artifacts_written, 1);
5279        assert_eq!(fs.read_file("Album/cover.jpg").unwrap(), b"new");
5280        assert_eq!(
5281            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().hash,
5282            "h2"
5283        );
5284        // The live directory is untouched by prune.
5285        assert!(fs.has_dir("Album"));
5286    }
5287
5288    // ── Concurrency (issue #22) ─────────────────────────────────────
5289
5290    mod concurrency {
5291        use super::*;
5292        use crate::ffmpeg::FfmpegError;
5293        use crate::fs::{FileStat, FsError};
5294        use crate::http::{HttpRequest, TransportError};
5295        use std::future::Future;
5296        use std::pin::Pin;
5297        use std::sync::Arc;
5298        use std::sync::atomic::{AtomicUsize, Ordering};
5299        use std::task::{Context, Poll};
5300
5301        /// A future that pends exactly once before resolving, waking itself so a
5302        /// single-threaded executor re-polls. It forces the [`Http`] port to
5303        /// yield, so [`buffer_unordered`](futures_util::stream::StreamExt) parks
5304        /// each in-flight request and the true overlap becomes observable.
5305        #[derive(Default)]
5306        struct YieldOnce {
5307            yielded: bool,
5308        }
5309
5310        impl Future for YieldOnce {
5311            type Output = ();
5312            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
5313                if self.yielded {
5314                    Poll::Ready(())
5315                } else {
5316                    self.yielded = true;
5317                    cx.waker().wake_by_ref();
5318                    Poll::Pending
5319                }
5320            }
5321        }
5322
5323        /// An [`Http`] double that wraps [`ScriptedHttp`] and records the peak
5324        /// number of concurrently in-flight requests. Each `send` bumps a live
5325        /// counter, yields once (so peers can start), then delegates.
5326        struct GatedHttp {
5327            inner: ScriptedHttp,
5328            inflight: Arc<AtomicUsize>,
5329            peak: Arc<AtomicUsize>,
5330        }
5331
5332        impl GatedHttp {
5333            fn new(inner: ScriptedHttp) -> Self {
5334                Self {
5335                    inner,
5336                    inflight: Arc::new(AtomicUsize::new(0)),
5337                    peak: Arc::new(AtomicUsize::new(0)),
5338                }
5339            }
5340
5341            fn peak(&self) -> usize {
5342                self.peak.load(Ordering::SeqCst)
5343            }
5344        }
5345
5346        impl Http for GatedHttp {
5347            async fn send(&self, request: HttpRequest) -> Result<HttpResponse, TransportError> {
5348                let now = self.inflight.fetch_add(1, Ordering::SeqCst) + 1;
5349                self.peak.fetch_max(now, Ordering::SeqCst);
5350                YieldOnce::default().await;
5351                let out = self.inner.send(request).await;
5352                self.inflight.fetch_sub(1, Ordering::SeqCst);
5353                out
5354            }
5355        }
5356
5357        fn download(id: &str, format: AudioFormat) -> (Clip, Desired, Action) {
5358            let c = clip(id);
5359            let d = desired(c.clone(), format);
5360            let action = Action::Download {
5361                clip: c.clone(),
5362                lineage: LineageContext::own_root(&c),
5363                path: d.path.clone(),
5364                format,
5365            };
5366            (c, d, action)
5367        }
5368
5369        fn opts_with(concurrency: u32) -> ExecOptions {
5370            ExecOptions {
5371                concurrency,
5372                ..small_poll()
5373            }
5374        }
5375
5376        #[test]
5377        fn concurrency_never_exceeds_the_configured_bound() {
5378            let count = 6;
5379            let concurrency = 3;
5380            let mut scripted = ScriptedHttp::new().with_auth();
5381            let mut actions = Vec::new();
5382            let mut desireds = Vec::new();
5383            for i in 0..count {
5384                let id = format!("c{i}");
5385                scripted = scripted.route(&format!("{id}.mp3"), Reply::ok(b"mp3-body".to_vec()));
5386                let (_c, d, action) = download(&id, AudioFormat::Mp3);
5387                actions.push(action);
5388                desireds.push(d);
5389            }
5390            let http = GatedHttp::new(scripted);
5391            let fs = MemFs::new();
5392            let plan = Plan { actions };
5393            let mut manifest = Manifest::new();
5394
5395            let outcome = run_gated_fs(
5396                &plan,
5397                &mut manifest,
5398                &desireds,
5399                &http,
5400                &fs,
5401                &opts_with(concurrency),
5402            );
5403
5404            assert_eq!(outcome.downloaded, count);
5405            assert!(
5406                http.peak() <= concurrency as usize,
5407                "peak {} exceeded the bound {concurrency}",
5408                http.peak()
5409            );
5410            assert_eq!(
5411                http.peak(),
5412                concurrency as usize,
5413                "expected the run to saturate the bound"
5414            );
5415        }
5416
5417        /// Run a gated plan against a caller-supplied [`MemFs`], returning the
5418        /// outcome. The client is built here so the limiter can be inspected by
5419        /// the caller-facing variant below.
5420        fn run_gated_fs(
5421            plan: &Plan,
5422            manifest: &mut Manifest,
5423            desired: &[Desired],
5424            http: &GatedHttp,
5425            fs: &MemFs,
5426            opts: &ExecOptions,
5427        ) -> ExecOutcome {
5428            let ffmpeg = StubFfmpeg::flac();
5429            let clock = RecordingClock::new();
5430            let mut albums = BTreeMap::new();
5431            let mut playlists = BTreeMap::new();
5432            let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
5433            pollster::block_on(execute(
5434                plan,
5435                manifest,
5436                &mut albums,
5437                &mut playlists,
5438                desired,
5439                &HashMap::new(),
5440                Ports {
5441                    client: &mut client,
5442                    http,
5443                    fs,
5444                    ffmpeg: &ffmpeg,
5445                    clock: &clock,
5446                },
5447                opts,
5448            ))
5449        }
5450
5451        #[test]
5452        fn a_failing_clip_does_not_abort_the_others() {
5453            let mut scripted = ScriptedHttp::new().with_auth();
5454            scripted = scripted
5455                .route("ok1.mp3", Reply::ok(b"one".to_vec()))
5456                .route("bad.mp3", Reply::status(404))
5457                .route("ok2.mp3", Reply::ok(b"two".to_vec()));
5458            let (_a, d1, a1) = download("ok1", AudioFormat::Mp3);
5459            let (_b, d2, a2) = download("bad", AudioFormat::Mp3);
5460            let (_c, d3, a3) = download("ok2", AudioFormat::Mp3);
5461            let http = GatedHttp::new(scripted);
5462            let fs = MemFs::new();
5463            let plan = Plan {
5464                actions: vec![a1, a2, a3],
5465            };
5466            let mut manifest = Manifest::new();
5467
5468            let outcome = run_gated_fs(
5469                &plan,
5470                &mut manifest,
5471                &[d1, d2, d3],
5472                &http,
5473                &fs,
5474                &opts_with(3),
5475            );
5476
5477            assert_eq!(outcome.downloaded, 2);
5478            assert_eq!(outcome.failed(), 1);
5479            assert_eq!(outcome.status, RunStatus::Completed);
5480            assert_eq!(outcome.failures[0].clip_id, "bad");
5481            assert!(manifest.get("ok1").is_some());
5482            assert!(manifest.get("ok2").is_some());
5483            assert!(manifest.get("bad").is_none());
5484        }
5485
5486        #[test]
5487        fn outcome_is_identical_across_concurrency_levels() {
5488            // A plan mixing successful and failing downloads with serial phase-2
5489            // actions (a skip and a delete), so both phases contribute.
5490            fn build() -> (Plan, Vec<Desired>) {
5491                let mut actions = Vec::new();
5492                let mut desireds = Vec::new();
5493                for id in ["a", "b", "c", "d"] {
5494                    let (_c, d, action) = download(id, AudioFormat::Mp3);
5495                    actions.push(action);
5496                    desireds.push(d);
5497                }
5498                // A failing download in the middle of the audio set.
5499                let (_e, de, ae) = download("fail", AudioFormat::Mp3);
5500                actions.insert(2, ae);
5501                desireds.push(de);
5502                // Phase-2 actions.
5503                actions.push(Action::Skip {
5504                    clip_id: "gone".to_owned(),
5505                });
5506                actions.push(Action::Delete {
5507                    path: "old.mp3".to_owned(),
5508                    clip_id: "old".to_owned(),
5509                });
5510                (Plan { actions }, desireds)
5511            }
5512
5513            fn http() -> ScriptedHttp {
5514                ScriptedHttp::new()
5515                    .with_auth()
5516                    .route("a.mp3", Reply::ok(b"a".to_vec()))
5517                    .route("b.mp3", Reply::ok(b"b".to_vec()))
5518                    .route("c.mp3", Reply::ok(b"c".to_vec()))
5519                    .route("d.mp3", Reply::ok(b"d".to_vec()))
5520                    .route("fail.mp3", Reply::status(404))
5521            }
5522
5523            fn seed_manifest() -> Manifest {
5524                let mut m = Manifest::new();
5525                m.insert("old".to_owned(), entry("old.mp3", AudioFormat::Mp3));
5526                m
5527            }
5528
5529            let (plan, desireds) = build();
5530
5531            let mut m1 = seed_manifest();
5532            let fs1 = MemFs::new().with_file("old.mp3", b"x".to_vec());
5533            let out1 = run_gated_fs(
5534                &plan,
5535                &mut m1,
5536                &desireds,
5537                &GatedHttp::new(http()),
5538                &fs1,
5539                &opts_with(1),
5540            );
5541
5542            let mut m8 = seed_manifest();
5543            let fs8 = MemFs::new().with_file("old.mp3", b"x".to_vec());
5544            let out8 = run_gated_fs(
5545                &plan,
5546                &mut m8,
5547                &desireds,
5548                &GatedHttp::new(http()),
5549                &fs8,
5550                &opts_with(8),
5551            );
5552
5553            assert_eq!(out1, out8, "outcome must not depend on concurrency");
5554            assert_eq!(m1, m8, "final manifest must not depend on concurrency");
5555            assert_eq!(out8.downloaded, 4);
5556            assert_eq!(out8.deleted, 1);
5557            assert_eq!(out8.skipped, 1);
5558            assert_eq!(out8.failed(), 1);
5559        }
5560
5561        #[test]
5562        fn a_systemic_disk_full_aborts_promptly() {
5563            let count = 8;
5564            let concurrency = 2;
5565            let mut scripted = ScriptedHttp::new().with_auth();
5566            let mut actions = Vec::new();
5567            let mut desireds = Vec::new();
5568            for i in 0..count {
5569                let id = format!("d{i}");
5570                scripted = scripted.route(&format!("{id}.mp3"), Reply::ok(b"mp3-body".to_vec()));
5571                let (_c, d, action) = download(&id, AudioFormat::Mp3);
5572                actions.push(action);
5573                desireds.push(d);
5574            }
5575            // The very first clip's write hits ENOSPC, a systemic failure.
5576            let fs = MemFs::new().fail_write_out_of_space("d0.mp3");
5577            let http = GatedHttp::new(scripted);
5578            let plan = Plan { actions };
5579            let mut manifest = Manifest::new();
5580
5581            let outcome = run_gated_fs(
5582                &plan,
5583                &mut manifest,
5584                &desireds,
5585                &http,
5586                &fs,
5587                &opts_with(concurrency),
5588            );
5589
5590            assert_eq!(outcome.status, RunStatus::DiskFull);
5591            assert!(
5592                outcome.downloaded < count,
5593                "a systemic abort must stop remaining work, downloaded {}",
5594                outcome.downloaded
5595            );
5596        }
5597
5598        #[test]
5599        fn limiter_records_a_rate_limit_under_concurrent_calls() {
5600            // Three concurrent FLAC renders; exactly one clip is throttled once
5601            // on its wav_file read. The shared limiter must record that single
5602            // 429 (halving 2.0 -> 1.0) with no lost or duplicated update, proving
5603            // the mutex keeps the AIMD state correct under concurrency.
5604            let scripted = ScriptedHttp::new()
5605                .with_auth()
5606                .route_seq(
5607                    "/gen/x/wav_file/",
5608                    vec![
5609                        Reply::status(429),
5610                        Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/x.wav"}"#),
5611                    ],
5612                )
5613                .route(
5614                    "/gen/y/wav_file/",
5615                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/y.wav"}"#),
5616                )
5617                .route(
5618                    "/gen/z/wav_file/",
5619                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/z.wav"}"#),
5620                )
5621                .route("x.wav", Reply::ok(b"wav-x".to_vec()))
5622                .route("y.wav", Reply::ok(b"wav-y".to_vec()))
5623                .route("z.wav", Reply::ok(b"wav-z".to_vec()));
5624
5625            let mut actions = Vec::new();
5626            let mut desireds = Vec::new();
5627            for id in ["x", "y", "z"] {
5628                let (_c, d, action) = download(id, AudioFormat::Flac);
5629                actions.push(action);
5630                desireds.push(d);
5631            }
5632            let plan = Plan { actions };
5633            let fs = MemFs::new();
5634            let ffmpeg = StubFfmpeg::flac();
5635            let clock = RecordingClock::new();
5636            let mut albums = BTreeMap::new();
5637            let mut playlists = BTreeMap::new();
5638            let mut manifest = Manifest::new();
5639            let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
5640
5641            let outcome = pollster::block_on(execute(
5642                &plan,
5643                &mut manifest,
5644                &mut albums,
5645                &mut playlists,
5646                &desireds,
5647                &HashMap::new(),
5648                Ports {
5649                    client: &mut client,
5650                    http: &scripted,
5651                    fs: &fs,
5652                    ffmpeg: &ffmpeg,
5653                    clock: &clock,
5654                },
5655                &opts_with(3),
5656            ));
5657
5658            assert_eq!(outcome.downloaded, 3);
5659            assert_eq!(outcome.failed(), 0);
5660            assert!(
5661                (client.limiter_rate() - 1.0).abs() < 1e-9,
5662                "one 429 must halve the rate to 1.0, got {}",
5663                client.limiter_rate()
5664            );
5665        }
5666
5667        #[test]
5668        fn a_download_is_committed_in_plan_order_around_a_rename() {
5669            // Plan order: rename "orig" away from shared.mp3 first, then download
5670            // a new clip into shared.mp3. A parallel executor that performed the
5671            // download's destination write off plan order would write shared.mp3
5672            // before the rename ran, letting the rename carry those fresh bytes
5673            // to moved.mp3 and stranding shared.mp3 - corrupting both clips.
5674            // Committing every destination effect serially in plan order keeps
5675            // moved.mp3 = the original and shared.mp3 = the new download.
5676            let c_new = clip("new");
5677            let mut d_new = desired(c_new.clone(), AudioFormat::Mp3);
5678            d_new.path = "shared.mp3".to_owned();
5679            let plan = Plan {
5680                actions: vec![
5681                    Action::Rename {
5682                        from: "shared.mp3".to_owned(),
5683                        to: "moved.mp3".to_owned(),
5684                    },
5685                    Action::Download {
5686                        clip: c_new.clone(),
5687                        lineage: LineageContext::own_root(&c_new),
5688                        path: "shared.mp3".to_owned(),
5689                        format: AudioFormat::Mp3,
5690                    },
5691                ],
5692            };
5693            let scripted = ScriptedHttp::new()
5694                .with_auth()
5695                .route("new.mp3", Reply::ok(b"NEW-BODY".to_vec()));
5696            let http = GatedHttp::new(scripted);
5697            let fs = MemFs::new().with_file("shared.mp3", b"ORIGINAL".to_vec());
5698            let mut manifest = Manifest::new();
5699            manifest.insert("orig", entry("shared.mp3", AudioFormat::Mp3));
5700
5701            let outcome = run_gated_fs(&plan, &mut manifest, &[d_new], &http, &fs, &opts_with(4));
5702
5703            assert_eq!(outcome.renamed, 1);
5704            assert_eq!(outcome.downloaded, 1);
5705            assert_eq!(
5706                fs.read_file("moved.mp3").as_deref(),
5707                Some(&b"ORIGINAL"[..]),
5708                "the rename must carry the original bytes, untouched by the download"
5709            );
5710            let landed = fs.read_file("shared.mp3").expect("new download must land");
5711            assert_ne!(
5712                landed, b"ORIGINAL",
5713                "the new download must replace the moved original, not corrupt it"
5714            );
5715            assert_eq!(manifest.get("orig").unwrap().path, "moved.mp3");
5716            assert_eq!(manifest.get("new").unwrap().path, "shared.mp3");
5717        }
5718
5719        #[test]
5720        fn an_aborted_reformat_leaves_the_old_file_and_manifest_consistent() {
5721            // A systemic disk-full abort strikes the download committed before the
5722            // reformat. Because the reformat's slow render is side-effect-free and
5723            // its destination write + old-file removal only happen in the serial
5724            // commit (which the abort skips), the old file survives and the
5725            // manifest still points at it: no removed-but-referenced file.
5726            let boom = clip("boom");
5727            let mut d_boom = desired(boom.clone(), AudioFormat::Mp3);
5728            d_boom.path = "boom.mp3".to_owned();
5729            let reformer = clip("r");
5730            let d_reformer = desired(reformer.clone(), AudioFormat::Mp3);
5731            let plan = Plan {
5732                actions: vec![
5733                    Action::Download {
5734                        clip: boom.clone(),
5735                        lineage: LineageContext::own_root(&boom),
5736                        path: "boom.mp3".to_owned(),
5737                        format: AudioFormat::Mp3,
5738                    },
5739                    Action::Reformat {
5740                        clip: reformer.clone(),
5741                        path: "r_new.mp3".to_owned(),
5742                        from_path: "r_old.flac".to_owned(),
5743                        from: AudioFormat::Flac,
5744                        to: AudioFormat::Mp3,
5745                    },
5746                ],
5747            };
5748            let scripted = ScriptedHttp::new()
5749                .with_auth()
5750                .route("boom.mp3", Reply::ok(b"boom-body".to_vec()))
5751                .route("r.mp3", Reply::ok(b"reformatted".to_vec()));
5752            let http = GatedHttp::new(scripted);
5753            // The download's write hits ENOSPC, a systemic abort.
5754            let fs = MemFs::new()
5755                .with_file("r_old.flac", b"OLD-FLAC".to_vec())
5756                .fail_write_out_of_space("boom.mp3");
5757            let mut manifest = Manifest::new();
5758            manifest.insert("r", entry("r_old.flac", AudioFormat::Flac));
5759
5760            let outcome = run_gated_fs(
5761                &plan,
5762                &mut manifest,
5763                &[d_boom, d_reformer],
5764                &http,
5765                &fs,
5766                &opts_with(4),
5767            );
5768
5769            assert_eq!(outcome.status, RunStatus::DiskFull);
5770            assert!(
5771                fs.exists("r_old.flac"),
5772                "the old file must survive the abort"
5773            );
5774            assert!(
5775                !fs.exists("r_new.mp3"),
5776                "no reformatted file may be written"
5777            );
5778            let still = manifest.get("r").expect("the manifest must still track r");
5779            assert_eq!(
5780                still.path, "r_old.flac",
5781                "the manifest must still point at the surviving old file"
5782            );
5783            assert_eq!(still.format, AudioFormat::Flac);
5784        }
5785
5786        #[test]
5787        fn a_systemic_abort_leaves_no_untracked_destination_files() {
5788            // Two clips commit, the third's write hits ENOSPC (a systemic abort),
5789            // and the rest never commit. Every file remaining on disk must be one
5790            // the manifest tracks: producers write nothing, so an abort cannot
5791            // strand an untracked file from an in-flight or buffered render.
5792            let mut scripted = ScriptedHttp::new().with_auth();
5793            let mut actions = Vec::new();
5794            let mut desireds = Vec::new();
5795            for id in ["a0", "a1", "boom", "a3", "a4"] {
5796                scripted = scripted.route(&format!("{id}.mp3"), Reply::ok(b"body".to_vec()));
5797                let (_c, d, action) = download(id, AudioFormat::Mp3);
5798                actions.push(action);
5799                desireds.push(d);
5800            }
5801            let http = GatedHttp::new(scripted);
5802            let fs = MemFs::new().fail_write_out_of_space("boom.mp3");
5803            let plan = Plan { actions };
5804            let mut manifest = Manifest::new();
5805
5806            let outcome = run_gated_fs(&plan, &mut manifest, &desireds, &http, &fs, &opts_with(2));
5807
5808            assert_eq!(outcome.status, RunStatus::DiskFull);
5809            let tracked: std::collections::BTreeSet<String> = manifest
5810                .entries
5811                .values()
5812                .map(|entry| entry.path.clone())
5813                .collect();
5814            for path in fs.paths() {
5815                assert!(
5816                    tracked.contains(&path),
5817                    "found an untracked destination file: {path}"
5818                );
5819            }
5820            assert!(
5821                !fs.exists("a3.mp3"),
5822                "uncommitted renders must not be on disk"
5823            );
5824            assert!(
5825                !fs.exists("a4.mp3"),
5826                "uncommitted renders must not be on disk"
5827            );
5828        }
5829
5830        /// An [`Ffmpeg`] double that counts how many rendered FLAC payloads are
5831        /// live: it bumps a shared counter (tracking the peak) when a transcode
5832        /// yields bytes, and [`CountingFs`] drops it back on the committing write.
5833        /// The [transcode, write] window is a superset of the true in-memory hold,
5834        /// so the observed peak upper-bounds the real one.
5835        struct CountingFfmpeg {
5836            inner: StubFfmpeg,
5837            held: Arc<AtomicUsize>,
5838            peak: Arc<AtomicUsize>,
5839        }
5840
5841        impl Ffmpeg for CountingFfmpeg {
5842            fn wav_to_flac(
5843                &self,
5844                wav: &[u8],
5845            ) -> impl Future<Output = Result<Vec<u8>, FfmpegError>> + Send {
5846                let fut = self.inner.wav_to_flac(wav);
5847                let held = self.held.clone();
5848                let peak = self.peak.clone();
5849                async move {
5850                    let out = fut.await;
5851                    if out.is_ok() {
5852                        let now = held.fetch_add(1, Ordering::SeqCst) + 1;
5853                        peak.fetch_max(now, Ordering::SeqCst);
5854                    }
5855                    out
5856                }
5857            }
5858
5859            fn mp4_to_webp(
5860                &self,
5861                mp4: &[u8],
5862                settings: WebpEncodeSettings,
5863            ) -> impl Future<Output = Result<Vec<u8>, FfmpegError>> + Send {
5864                self.inner.mp4_to_webp(mp4, settings)
5865            }
5866        }
5867
5868        /// A [`Filesystem`] double wrapping [`MemFs`] that decrements the live
5869        /// payload counter on each committing write, closing the window opened by
5870        /// [`CountingFfmpeg`].
5871        struct CountingFs {
5872            inner: MemFs,
5873            held: Arc<AtomicUsize>,
5874        }
5875
5876        impl Filesystem for CountingFs {
5877            fn write_atomic(&self, path: &str, bytes: &[u8]) -> Result<(), FsError> {
5878                let out = self.inner.write_atomic(path, bytes);
5879                self.held.fetch_sub(1, Ordering::SeqCst);
5880                out
5881            }
5882
5883            fn rename(&self, from: &str, to: &str) -> Result<(), FsError> {
5884                self.inner.rename(from, to)
5885            }
5886
5887            fn remove(&self, path: &str) -> Result<(), FsError> {
5888                self.inner.remove(path)
5889            }
5890
5891            fn prune_empty_dirs(&self, root: &str) -> Result<(), FsError> {
5892                self.inner.prune_empty_dirs(root)
5893            }
5894
5895            fn read(&self, path: &str) -> Result<Vec<u8>, FsError> {
5896                self.inner.read(path)
5897            }
5898
5899            fn metadata(&self, path: &str) -> Option<FileStat> {
5900                self.inner.metadata(path)
5901            }
5902        }
5903
5904        #[test]
5905        fn rendered_payloads_in_memory_stay_bounded_by_concurrency() {
5906            // Far more FLAC clips than the concurrency bound. The ordered buffered
5907            // render keeps at most about `concurrency` transcoded payloads live at
5908            // once (never the whole library), so peak held <= concurrency + 1.
5909            let count = 12;
5910            let concurrency = 3;
5911            let mut scripted = ScriptedHttp::new().with_auth();
5912            let mut actions = Vec::new();
5913            let mut desireds = Vec::new();
5914            for i in 0..count {
5915                let id = format!("f{i}");
5916                scripted = scripted
5917                    .route(
5918                        &format!("/gen/{id}/wav_file/"),
5919                        Reply::json(&format!(
5920                            r#"{{"wav_file_url": "https://cdn1.suno.ai/{id}.wav"}}"#
5921                        )),
5922                    )
5923                    .route(&format!("{id}.wav"), Reply::ok(b"wav-body".to_vec()));
5924                let (_c, d, action) = download(&id, AudioFormat::Flac);
5925                actions.push(action);
5926                desireds.push(d);
5927            }
5928            let http = GatedHttp::new(scripted);
5929            let held = Arc::new(AtomicUsize::new(0));
5930            let peak = Arc::new(AtomicUsize::new(0));
5931            let ffmpeg = CountingFfmpeg {
5932                inner: StubFfmpeg::flac(),
5933                held: held.clone(),
5934                peak: peak.clone(),
5935            };
5936            let fs = CountingFs {
5937                inner: MemFs::new(),
5938                held: held.clone(),
5939            };
5940            let clock = RecordingClock::new();
5941            let mut albums = BTreeMap::new();
5942            let mut playlists = BTreeMap::new();
5943            let mut manifest = Manifest::new();
5944            let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
5945            let plan = Plan { actions };
5946
5947            let outcome = pollster::block_on(execute(
5948                &plan,
5949                &mut manifest,
5950                &mut albums,
5951                &mut playlists,
5952                &desireds,
5953                &HashMap::new(),
5954                Ports {
5955                    client: &mut client,
5956                    http: &http,
5957                    fs: &fs,
5958                    ffmpeg: &ffmpeg,
5959                    clock: &clock,
5960                },
5961                &opts_with(concurrency),
5962            ));
5963
5964            assert_eq!(outcome.downloaded, count as usize);
5965            assert_eq!(
5966                held.load(Ordering::SeqCst),
5967                0,
5968                "every payload must be committed"
5969            );
5970            assert!(
5971                peak.load(Ordering::SeqCst) <= concurrency as usize + 1,
5972                "peak live payloads {} exceeded the bound {}",
5973                peak.load(Ordering::SeqCst),
5974                concurrency + 1
5975            );
5976            assert!(
5977                peak.load(Ordering::SeqCst) >= 2,
5978                "the render should genuinely overlap, peak was {}",
5979                peak.load(Ordering::SeqCst)
5980            );
5981        }
5982    }
5983}