Skip to main content

suno_core/
executor.rs

1//! The download executor: it applies a reconcile [`Plan`] to disk through ports.
2//!
3//! Reconcile decides *what* to do; the executor does it. It is async and pure
4//! orchestration: every side effect goes through a port ([`Http`] for the
5//! network, [`Filesystem`] for disk, [`Ffmpeg`] for transcoding, [`Clock`] for
6//! waiting), so the whole pipeline is exercised in tests with in-memory doubles
7//! and no real IO, network, or sleeping.
8//!
9//! Safety is the point of this module. A wrong write or delete damages the
10//! user's library, so the executor:
11//!
12//! - writes only atomically (SYNC-13): a failed write leaves the prior file
13//!   intact, because the [`Filesystem`] adapter stages a temp file and renames;
14//! - verifies size (SYNC-14): a download whose body disagrees with the
15//!   provider's `Content-Length` is treated as truncated and retried, and a
16//!   written file whose on-disk size disagrees with the bytes written is a
17//!   failure, never a recorded success;
18//! - classifies errors (SYNC-17): an auth failure or a full disk stops the
19//!   account run (with an auth or disk-full status) and is never retried;
20//!   transient failures (timeouts, 5xx,
21//!   transport, 429) are retried a bounded number of times then recorded and
22//!   skipped; permanent failures are recorded and skipped; and a single clip's
23//!   failure never aborts the run;
24//! - backs off on rate limits (SYNC-16) through the injected [`Clock`], honouring
25//!   a `Retry-After` hint.
26//!
27//! The executor only ever sets the manifest's [`preserve`](ManifestEntry::preserve)
28//! marker on an entry it writes, and only deletes a path whose removal the
29//! [`Filesystem`] confirms. Higher-level safety (empty-listing abort, the
30//! destructive-sync confirmation, exit codes) is the caller's job.
31
32use std::collections::BTreeMap;
33use std::collections::BTreeSet;
34use std::collections::HashMap;
35use std::collections::HashSet;
36use std::sync::Mutex;
37use std::time::Duration;
38
39use futures_util::lock::Mutex as AsyncMutex;
40use futures_util::stream::{self, StreamExt};
41
42use crate::backoff::{backoff_delay, retry_after};
43use crate::client::SunoClient;
44use crate::clock::Clock;
45use crate::config::{AudioFormat, StemFormat};
46use crate::error::Error;
47use crate::ffmpeg::{Ffmpeg, WebpEncodeSettings};
48use crate::fs::Filesystem;
49use crate::graph::{AlbumArt, PlaylistState};
50use crate::http::{Http, HttpRequest};
51use crate::lineage::LineageContext;
52use crate::lyrics::AlignedLyrics;
53use crate::manifest::{ArtifactState, Manifest, ManifestEntry};
54use crate::model::Clip;
55use crate::reconcile::{
56    Action, ArtifactKind, Desired, Plan, SourceMode, set_manifest_artifact, set_manifest_stem,
57};
58use crate::tag::{TrackMetadata, tag_flac, tag_mp3};
59
/// The shared Suno client behind an async mutex, so concurrent audio work can
/// serialise its order-sensitive API calls (JWT refresh, adaptive limiter)
/// without a runtime-specific lock. Held only for the brief WAV-render calls;
/// the heavy CDN/transcode/tag work runs unlocked.
///
/// The mutex wraps the caller's `&mut SunoClient` rather than an owned client,
/// so the lock exists only while one [`execute`] call borrows the client.
///
/// NOTE(review): the lock is also handed to the stem-write path through
/// `Ctx::apply`; confirm that path holds it just as briefly.
type ClientLock<'a, C> = AsyncMutex<&'a mut SunoClient<C>>;
65
/// Tunables for one [`execute`] run.
///
/// The [`Default`] impl supplies 3 retries, 24 WAV polls 5 seconds apart, a
/// concurrency of 4, and the default WebP encode settings.
#[derive(Debug, Clone)]
pub struct ExecOptions {
    /// How many times a transient failure is retried before record-and-skip.
    pub max_retries: u32,
    /// How many times to poll for a server-side WAV render before giving up.
    pub wav_poll_attempts: u32,
    /// How long to wait between WAV render polls.
    pub wav_poll_interval: Duration,
    /// How many clips' audio to fetch, transcode, and tag concurrently. Clamped
    /// to at least one, so a zero collapses to sequential rather than stalling.
    pub concurrency: u32,
    /// Settings used for animated WebP cover transcodes.
    pub cover_webp: WebpEncodeSettings,
}
81
82impl Default for ExecOptions {
83    fn default() -> Self {
84        Self {
85            max_retries: 3,
86            wav_poll_attempts: 24,
87            wav_poll_interval: Duration::from_secs(5),
88            concurrency: 4,
89            cover_webp: WebpEncodeSettings::default(),
90        }
91    }
92}
93
/// How an [`execute`] run ended.
///
/// `Completed` is the default, so a fresh [`ExecOutcome`] reports a completed
/// run until an abort overwrites it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum RunStatus {
    /// Every action was attempted; some may have failed and been skipped.
    #[default]
    Completed,
    /// An auth failure stopped the run early; remaining actions were not tried.
    AuthAborted,
    /// The disk filled; the run stopped early rather than failing every
    /// remaining clip. Remaining actions were not tried.
    DiskFull,
}
106
/// One action that could not be applied, for the run summary and failure log.
///
/// [`execute`] pushes one of these for every failed action, including the one
/// that triggers an auth or disk-full abort.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Failure {
    /// The clip the failed action concerned (or a path when no id applies).
    pub clip_id: String,
    /// A short, secret-free reason.
    pub reason: String,
}
115
/// The result of applying a [`Plan`]: per-action counts and the failure list.
///
/// Each counter is bumped once per action that finished successfully with the
/// matching effect; a failed action bumps no counter and lands in `failures`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ExecOutcome {
    /// Audio actions committed as a download.
    pub downloaded: usize,
    /// Audio actions committed as a reformat.
    pub reformatted: usize,
    /// Actions that finished as a retag.
    pub retagged: usize,
    /// Actions that finished as a rename.
    pub renamed: usize,
    /// Actions that finished as a delete.
    pub deleted: usize,
    /// Actions that finished as a skip.
    pub skipped: usize,
    /// Actions that finished as an artifact write.
    pub artifacts_written: usize,
    /// Actions that finished as an artifact delete.
    pub artifacts_deleted: usize,
    /// Actions that failed and were skipped (auth, disk-full,
    /// transient-exhausted, or permanent). The run continued past each one
    /// unless it was an auth or disk-full abort.
    pub failures: Vec<Failure>,
    /// How the run ended.
    pub status: RunStatus,
}
134
135impl ExecOutcome {
136    /// Number of failed actions.
137    pub fn failed(&self) -> usize {
138        self.failures.len()
139    }
140
141    fn record(&mut self, effect: Effect) {
142        match effect {
143            Effect::Downloaded => self.downloaded += 1,
144            Effect::Reformatted => self.reformatted += 1,
145            Effect::Retagged => self.retagged += 1,
146            Effect::Renamed => self.renamed += 1,
147            Effect::Deleted => self.deleted += 1,
148            Effect::Skipped => self.skipped += 1,
149            Effect::ArtifactWritten => self.artifacts_written += 1,
150            Effect::ArtifactDeleted => self.artifacts_deleted += 1,
151        }
152    }
153}
154
/// The IO ports the executor drives, grouped so one value threads them through.
///
/// `client` is the only `&mut` port: it performs the authenticated WAV render
/// flow and so mutates its cached session. The rest are shared references.
///
/// [`execute`] requires `H: Http`, `F: Filesystem`, `G: Ffmpeg`, and `C: Clock`;
/// the clock type is shared with the client (`SunoClient<C>`).
pub struct Ports<'a, H, F, G, C> {
    /// Performs the authenticated WAV render and poll flow.
    pub client: &'a mut SunoClient<C>,
    /// The public network port (CDN audio, rendered WAV, cover art).
    pub http: &'a H,
    /// The disk port.
    pub fs: &'a F,
    /// The transcode port (WAV to FLAC).
    pub ffmpeg: &'a G,
    /// The backoff and poll delay port.
    pub clock: &'a C,
}
171
/// Apply `plan` to disk, updating `manifest`, `albums`, and `playlists` in
/// place, and return the outcome.
///
/// `desired` carries the per-clip metadata and art hashes plus the source modes
/// that decide the [`preserve`](ManifestEntry::preserve) marker; it is indexed
/// by clip id (and by target path, for renames) so each written entry records
/// the right hashes and protection. `albums` is the album-art store, keyed by
/// stable root id: folder-art writes and deletes record their state there rather
/// than on the per-clip `manifest`. `playlists` is the playlist store: its
/// entries' paths are counted alongside the manifest's and the album store's
/// when deciding whether an old path is still referenced, and it is handed to
/// every non-audio action so playlist artifacts can record their state there.
/// `ports` bundles the authenticated client and
/// the network, disk, transcode, and backoff ports. A single clip's failure
/// never aborts the run, except an auth failure or a full disk, which stop it
/// with [`RunStatus::AuthAborted`] or [`RunStatus::DiskFull`].
///
/// The audio-producing actions ([`Download`](Action::Download) and
/// [`Reformat`](Action::Reformat)) run concurrently, bounded by
/// [`ExecOptions::concurrency`]: their slow parts (WAV render, CDN download,
/// transcode, tag) overlap while the order-sensitive Suno API calls are
/// serialised behind an async mutex over the shared [`SunoClient`], keeping the
/// adaptive limiter and JWT refresh correct. The remaining actions (retag,
/// rename, delete, and artifact writes/deletes) then run serially in plan order.
///
/// The outcome is deterministic regardless of completion order: concurrent audio
/// results are committed to the manifest in plan-index order, so the same plan
/// always yields the same manifest and counts whatever the concurrency level. A
/// per-clip failure is recorded and the run continues; only an auth failure or a
/// full disk aborts, and it does so promptly by stopping further audio work.
///
/// `synced` carries this run's fetched aligned (synced) lyrics keyed by clip id;
/// it is the caller's IO result, not part of the pure plan. Audio tagging embeds
/// a clip's entry as an MP3 `SYLT` frame and as the plain `USLT`/`LYRICS` text
/// (FLAC), so a clip absent from the map (an instrumental, a WAV target, or a
/// run with the feature off) is tagged exactly as before. The synced `.lrc`
/// sidecar itself is a generated artifact whose body the caller has already
/// resolved into the plan, so it is written like any other text sidecar.
// The IO ports are already bundled in `Ports`; the remaining arguments are
// separate borrows (plan, three mutable stores, desired set, lyrics, options).
#[allow(clippy::too_many_arguments)]
pub async fn execute<H, F, G, C>(
    plan: &Plan,
    manifest: &mut Manifest,
    albums: &mut BTreeMap<String, AlbumArt>,
    playlists: &mut BTreeMap<String, PlaylistState>,
    desired: &[Desired],
    synced: &HashMap<String, AlignedLyrics>,
    ports: Ports<'_, H, F, G, C>,
    opts: &ExecOptions,
) -> ExecOutcome
where
    H: Http,
    F: Filesystem,
    G: Ffmpeg,
    C: Clock,
{
    let Ports {
        client,
        http,
        fs,
        ffmpeg,
        clock,
    } = ports;
    // Index the desired set twice: by clip id and by target path.
    let by_id: HashMap<&str, &Desired> = desired.iter().map(|d| (d.clip.id.as_str(), d)).collect();
    let by_path: HashMap<&str, &Desired> = desired.iter().map(|d| (d.path.as_str(), d)).collect();
    // Every path this run writes, so the inline old-sidecar cleanup never removes
    // a file another action produces this run (the non-planned twin of
    // `suppress_path_aliasing`).
    let write_targets: BTreeSet<String> = plan
        .actions
        .iter()
        .filter_map(|a| match a {
            Action::Download { path, .. }
            | Action::Reformat { path, .. }
            | Action::WriteArtifact { path, .. }
            | Action::WriteStem { path, .. } => Some(path.clone()),
            Action::Rename { to, .. } => Some(to.clone()),
            _ => None,
        })
        .collect();
    // How many tracked artifact slots reference each path. The inline old-path
    // cleanup removes a path only once nothing else holds it: each slot that
    // moves away decrements its reference, and the removal fires only when the
    // count reaches zero and no action writes the path this run. This keeps a
    // live file a co-referencing slot still owns (a prior failed swap can leave
    // two clips sharing a path) while letting the last slot to leave reclaim it,
    // so nothing is orphaned either (#76).
    let mut tracked_paths: HashMap<String, u32> = HashMap::new();
    for (_, entry) in manifest.iter() {
        for path in entry.artifact_paths() {
            *tracked_paths.entry(path.to_owned()).or_default() += 1;
        }
    }
    for art in albums.values() {
        for state in [art.folder_jpg.as_ref(), art.folder_webp.as_ref()]
            .into_iter()
            .flatten()
        {
            *tracked_paths.entry(state.path.clone()).or_default() += 1;
        }
    }
    for playlist in playlists.values() {
        *tracked_paths.entry(playlist.path.clone()).or_default() += 1;
    }
    // Static cover art is otherwise fetched twice per clip (#89): once to embed
    // in the audio tag and once for the per-song `.jpg` sidecar, both from the
    // same CDN URL. The audio producer caches each cover it embeds here, keyed by
    // URL, and the sidecar write drains it rather than re-fetching. Only URLs a
    // `CoverJpg` sidecar will fetch this run are cached, and the sidecar removes
    // its entry on use, so the map holds at most the covers for the clips in
    // flight (bounded by `concurrency`), never the whole library.
    let cover_wanted: HashSet<&str> = plan
        .actions
        .iter()
        .filter_map(|action| match action {
            Action::WriteArtifact {
                kind: ArtifactKind::CoverJpg,
                source_url,
                ..
            } if !source_url.is_empty() => Some(source_url.as_str()),
            _ => None,
        })
        .collect();
    let cover_cache: Mutex<HashMap<String, Vec<u8>>> = Mutex::new(HashMap::new());
    let ctx = Ctx {
        http,
        fs,
        ffmpeg,
        clock,
        opts,
        by_id: &by_id,
        by_path: &by_path,
        synced,
        write_targets: &write_targets,
        cover_cache: &cover_cache,
        cover_wanted: &cover_wanted,
    };

    let mut outcome = ExecOutcome::default();

    // The audio-producing actions ([`Download`](Action::Download) /
    // [`Reformat`](Action::Reformat)) render concurrently, but their work is
    // deliberately split so that NO destination write, file removal, or manifest
    // update happens off the plan's order:
    //
    // - the parallel producers ([`prepare_audio`](Ctx::prepare_audio)) do only
    //   the slow, side-effect-free work (fetch the CDN/WAV bytes, transcode, and
    //   tag), returning the tagged bytes; and
    // - a single serial committer below writes those bytes to the destination,
    //   removes any superseded file, and records the manifest entry, in strict
    //   plan-index order, interleaved with the non-audio actions.
    //
    // The shared client is the only `&mut` port and its API calls must stay
    // ordered, so it rides behind an async mutex; each producer locks it only for
    // the brief WAV-render calls and runs the heavy work unlocked. Renders are
    // yielded in plan order and bounded to `concurrency` in flight (and buffered),
    // so at most about `concurrency` tagged payloads are ever held in memory -
    // never the whole library.
    //
    // NOTE(review): `buffered` only drives its producers while `renders.next()`
    // is being polled, so they are parked while a non-audio `apply` is awaited
    // below. `apply` also receives `client_lock_ref` (for stem writes); if it
    // waits on that lock while a parked producer holds it, neither side can make
    // progress. Confirm the lock's hold points or the plan order rule this out.
    let client_lock = AsyncMutex::new(client);
    let concurrency = opts.concurrency.max(1) as usize;
    let ctx_ref = &ctx;
    let client_lock_ref = &client_lock;
    let mut renders = stream::iter(
        plan.actions
            .iter()
            .filter(|action| is_audio_action(action))
            .map(|action| async move { ctx_ref.prepare_audio(client_lock_ref, action).await }),
    )
    .buffered(concurrency);

    for action in &plan.actions {
        // Audio actions pull their pre-rendered bytes (yielded in plan order) and
        // commit them here; every other action applies its own effect. Both the
        // audio commit and the non-audio apply run serially, so all destination
        // and manifest effects keep the plan's order exactly as the sequential
        // executor did.
        let result = if is_audio_action(action) {
            match renders.next().await {
                Some(Ok(rendered)) => ctx.commit_audio(manifest, rendered),
                Some(Err(fail)) => Err(fail),
                // The stream was built from the same filter over the same plan,
                // so it cannot run dry before the audio actions do.
                None => unreachable!("buffered yields one result per audio action"),
            }
        } else {
            ctx.apply(
                client_lock_ref,
                action,
                manifest,
                albums,
                playlists,
                &mut tracked_paths,
            )
            .await
        };
        match result {
            Ok(effect) => outcome.record(effect),
            Err(fail) => {
                // Decide on the abort before `fail` is moved into the failure
                // list; the failure is recorded either way.
                let abort = abort_status(fail.class);
                outcome.failures.push(Failure {
                    clip_id: fail.clip_id,
                    reason: fail.reason,
                });
                if let Some(status) = abort {
                    // A systemic abort stops the run. Dropping the render stream
                    // cancels any in-flight or completed-but-uncommitted producer;
                    // because producers touch nothing on disk, the destination and
                    // manifest are left exactly as the committed prefix wrote them,
                    // with no untracked files and no removed-but-referenced file.
                    outcome.status = status;
                    break;
                }
            }
        }
    }
    drop(renders);

    // Renames and deletes can leave an album directory empty; prune those ghost
    // directories bottom-up. This runs on both the completed and the aborted
    // paths, and is best-effort: a prune failure is only a missed tidy that the
    // next run repeats, never a reason to fail the run.
    let _ = fs.prune_empty_dirs("");
    outcome
}
389
/// Whether an action produces audio: it fetches, transcodes, and tags a clip's
/// file. Its slow render runs in the concurrent phase; its destination write and
/// manifest update are committed serially in plan order. Everything else touches
/// the manifest, album, or playlist stores directly and runs serially.
///
/// Returns `true` only for [`Action::Download`] and [`Action::Reformat`].
/// [`execute`] uses this both to build the render stream and to route each
/// action, so the two must stay in agreement.
fn is_audio_action(action: &Action) -> bool {
    matches!(action, Action::Download { .. } | Action::Reformat { .. })
}
397
/// A rendered-but-uncommitted audio result: the tagged bytes plus what the serial
/// committer needs to place them. Produced concurrently and side-effect-free (no
/// destination write, no removal, no manifest touch);
/// [`commit_audio`](Ctx::commit_audio) applies all of those in plan order.
struct RenderedAudio {
    /// The clip the audio belongs to; used as the manifest key on commit.
    clip_id: String,
    /// The destination path the bytes are written to.
    path: String,
    /// The format recorded on the manifest entry: the download's format, or the
    /// reformat's target format.
    format: AudioFormat,
    /// The superseded file to remove after the new one lands (a
    /// [`Reformat`](Action::Reformat)), or `None` for a plain
    /// [`Download`](Action::Download).
    from_path: Option<String>,
    /// The effect reported to the outcome counters once the commit succeeds.
    effect: Effect,
    /// The tagged file contents to write.
    bytes: Vec<u8>,
}
412
/// What an applied action did, for the outcome counters.
///
/// Each variant maps to exactly one counter on the run outcome. The enum is a
/// plain, field-less tag, so it derives the usual value traits: it can be
/// copied freely, compared in assertions, and printed when debugging.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Effect {
    /// An audio file was fetched and written.
    Downloaded,
    /// An audio file was re-rendered in a different format.
    Reformatted,
    /// An existing file's tags (or recorded hashes) were refreshed.
    Retagged,
    /// A file was moved to a new path.
    Renamed,
    /// A file was removed.
    Deleted,
    /// Nothing needed doing for the clip.
    Skipped,
    /// An artifact was written.
    ArtifactWritten,
    /// An artifact was removed.
    ArtifactDeleted,
}
424
/// How a failure should be handled (SYNC-17).
///
/// `Auth` and `Disk` end the run (see [`abort_status`]); `Transient` and
/// `Permanent` are per-clip and let it continue.
#[derive(Debug, Clone, Copy)]
enum Class {
    /// Stop the account run; do not retry.
    Auth,
    /// Stop the account run: a full disk is systemic, like auth, so aborting
    /// beats skipping every remaining clip (each of which would first burn a
    /// server-side WAV-render budget before failing the same way).
    Disk,
    /// Retry a bounded number of times, then record and skip.
    Transient,
    /// Record and skip immediately.
    Permanent,
}
439
/// A classified action failure attributed to a clip.
///
/// Internal to the executor: [`execute`] copies `clip_id` and `reason` into the
/// public [`Failure`] and uses `class` only to decide whether the run aborts.
struct Fail {
    /// How the failure is handled; see [`abort_status`].
    class: Class,
    /// The clip the failure is attributed to.
    clip_id: String,
    /// The reason recorded for the failure.
    reason: String,
}
446
447/// The run-ending status for a failure class, or `None` when the failure is
448/// per-clip and the run continues.
449fn abort_status(class: Class) -> Option<RunStatus> {
450    match class {
451        Class::Auth => Some(RunStatus::AuthAborted),
452        Class::Disk => Some(RunStatus::DiskFull),
453        Class::Transient | Class::Permanent => None,
454    }
455}
456
457fn auth_fail(clip_id: impl Into<String>, reason: impl Into<String>) -> Fail {
458    Fail {
459        class: Class::Auth,
460        clip_id: clip_id.into(),
461        reason: reason.into(),
462    }
463}
464
465fn transient_fail(clip_id: impl Into<String>, reason: impl Into<String>) -> Fail {
466    Fail {
467        class: Class::Transient,
468        clip_id: clip_id.into(),
469        reason: reason.into(),
470    }
471}
472
473fn permanent_fail(clip_id: impl Into<String>, reason: impl Into<String>) -> Fail {
474    Fail {
475        class: Class::Permanent,
476        clip_id: clip_id.into(),
477        reason: reason.into(),
478    }
479}
480
481fn disk_fail(clip_id: impl Into<String>, reason: impl Into<String>) -> Fail {
482    Fail {
483        class: Class::Disk,
484        clip_id: clip_id.into(),
485        reason: reason.into(),
486    }
487}
488
/// Whether an artifact kind is album-scoped folder art (owned by a root id and
/// recorded on the album store) rather than a per-clip sidecar (recorded on the
/// manifest).
///
/// Returns `true` only for `FolderJpg` and `FolderWebp`.
fn is_album_kind(kind: ArtifactKind) -> bool {
    matches!(kind, ArtifactKind::FolderJpg | ArtifactKind::FolderWebp)
}
495
/// True for the library-scoped playlist artifact, routed to the playlist store.
///
/// Returns `true` only for `ArtifactKind::Playlist`.
fn is_playlist_kind(kind: ArtifactKind) -> bool {
    matches!(kind, ArtifactKind::Playlist)
}
500
/// True for a per-clip sidecar, whose write requires the owning clip's manifest
/// entry: the per-song covers (`CoverJpg`, `CoverWebp`), `DetailsTxt`,
/// `LyricsTxt`, `Lrc`, and `VideoMp4`. Album and playlist kinds are keyed by a
/// root/playlist id that is deliberately absent from the manifest.
fn is_per_clip_kind(kind: ArtifactKind) -> bool {
    matches!(
        kind,
        ArtifactKind::CoverJpg
            | ArtifactKind::CoverWebp
            | ArtifactKind::DetailsTxt
            | ArtifactKind::LyricsTxt
            | ArtifactKind::Lrc
            | ArtifactKind::VideoMp4
    )
}
515
/// Derive a playlist's display name from the file stem of its `.m3u8` path.
///
/// The playlist lives at `<sanitised name>.m3u8` in the library root, so the
/// stem is the sanitised name. The sanitiser cannot be reversed, which makes the
/// result lossy, but that is harmless: reconcile only reads a playlist's `path`
/// and `hash`, so the name is purely a convenience for humans.
///
/// A path with no file stem (for example an empty string) yields an empty name.
fn playlist_name_from_path(path: &str) -> String {
    let stem = std::path::Path::new(path).file_stem();
    stem.map_or_else(String::new, |name| name.to_string_lossy().into_owned())
}
528
/// A classified fetch failure, not yet attributed to a clip.
///
/// [`attribute`](FetchError::attribute) turns it into a [`Fail`] once the clip
/// is known.
struct FetchError {
    /// How the failure should be handled.
    class: Class,
    /// The reason carried into the resulting [`Fail`].
    reason: String,
    /// A wait hint attached to a transient failure, if any; always `None` for a
    /// permanent one. It is dropped when the error is attributed to a clip.
    retry_after: Option<Duration>,
}
535
536impl FetchError {
537    fn transient(reason: impl Into<String>, retry_after: Option<Duration>) -> Self {
538        Self {
539            class: Class::Transient,
540            reason: reason.into(),
541            retry_after,
542        }
543    }
544
545    fn permanent(reason: impl Into<String>) -> Self {
546        Self {
547            class: Class::Permanent,
548            reason: reason.into(),
549            retry_after: None,
550        }
551    }
552
553    fn attribute(self, clip_id: &str) -> Fail {
554        Fail {
555            class: self.class,
556            clip_id: clip_id.to_owned(),
557            reason: self.reason,
558        }
559    }
560}
561
/// The shared, read-only context threaded through every action handler.
///
/// Every field is a shared borrow, so one `Ctx` can be used by the concurrent
/// audio producers and the serial handlers alike. The only mutation that goes
/// through it is `cover_cache`, behind its mutex.
struct Ctx<'a, H, F, G, C> {
    /// The network port.
    http: &'a H,
    /// The disk port.
    fs: &'a F,
    /// The transcode port.
    ffmpeg: &'a G,
    /// The backoff and poll delay port.
    clock: &'a C,
    /// The run's tunables.
    opts: &'a ExecOptions,
    /// The desired set indexed by clip id.
    by_id: &'a HashMap<&'a str, &'a Desired>,
    /// The desired set indexed by target path.
    by_path: &'a HashMap<&'a str, &'a Desired>,
    /// This run's fetched aligned (synced) lyrics, keyed by clip id. Audio
    /// tagging reads a clip's entry to embed an MP3 `SYLT` frame and the plain
    /// lyric text; a clip absent here is tagged exactly as before. Populated by
    /// the caller (the fetch is IO), so the engine stays free of direct IO.
    synced: &'a HashMap<String, AlignedLyrics>,
    /// Every destination path this run writes (audio downloads and reformats,
    /// artifact writes, stem writes, and rename targets). The inline old-sidecar
    /// cleanup in [`write_artifact`](Ctx::write_artifact) skips any path in this
    /// set, so a path swap between two clips can never delete a file the same
    /// run just wrote. This mirrors [`suppress_path_aliasing`] for the one
    /// removal that is not itself a planned action.
    write_targets: &'a BTreeSet<String>,
    /// Static cover art the audio producer already fetched to embed in the tag,
    /// keyed by CDN URL, so the matching per-song `.jpg` sidecar reuses it rather
    /// than fetching the same image again (#89). Only URLs a `CoverJpg` sidecar
    /// will fetch are inserted (see `cover_wanted`) and each is removed on use, so
    /// the map stays bounded to the clips in flight. A plain mutex guards it: the
    /// concurrent producers only ever insert, and the lock is never held across an
    /// await.
    cover_cache: &'a Mutex<HashMap<String, Vec<u8>>>,
    /// The cover URLs a `CoverJpg` sidecar will fetch this run. The producer caches
    /// a cover only when its URL is here, so a clip whose cover is embedded but
    /// never written as a sidecar leaves no bytes stranded in `cover_cache`.
    cover_wanted: &'a HashSet<&'a str>,
}
596
597impl<H, F, G, C> Ctx<'_, H, F, G, C>
598where
599    H: Http,
600    F: Filesystem,
601    G: Ffmpeg,
602    C: Clock,
603{
    /// Apply one non-audio action, returning what it did or why it failed.
    ///
    /// Audio actions ([`Download`](Action::Download) /
    /// [`Reformat`](Action::Reformat)) run in the concurrent phase through
    /// [`prepare_audio`](Self::prepare_audio) and never reach here.
    ///
    /// This is a pure dispatcher: each arm forwards the action's fields to the
    /// handler of the matching name, lending it only the stores it takes.
    /// `manifest` goes to every handler; `albums` and `playlists` go only to the
    /// artifact handlers, `tracked_paths` only to the artifact write, and
    /// `client_lock` only to the stem write.
    ///
    /// # Panics
    ///
    /// Panics if handed a `Download` or `Reformat` action.
    async fn apply(
        &self,
        client_lock: &ClientLock<'_, C>,
        action: &Action,
        manifest: &mut Manifest,
        albums: &mut BTreeMap<String, AlbumArt>,
        playlists: &mut BTreeMap<String, PlaylistState>,
        tracked_paths: &mut HashMap<String, u32>,
    ) -> Result<Effect, Fail> {
        match action {
            // The caller routes these through the render stream instead.
            Action::Download { .. } | Action::Reformat { .. } => {
                unreachable!("audio actions are applied in the concurrent phase")
            }
            Action::Retag {
                clip,
                lineage,
                path,
            } => self.retag(manifest, clip, lineage, path).await,
            Action::Rename { from, to } => self.rename(manifest, from, to),
            Action::Delete { path, clip_id } => self.delete(manifest, path, clip_id),
            // A skip still refreshes the entry's preserve marker before it is
            // counted as skipped.
            Action::Skip { clip_id } => {
                self.refresh_preserve(manifest, clip_id);
                Ok(Effect::Skipped)
            }
            Action::WriteArtifact {
                kind,
                path,
                source_url,
                hash,
                owner_id,
                content,
            } => {
                self.write_artifact(
                    manifest,
                    albums,
                    playlists,
                    *kind,
                    path,
                    source_url,
                    hash,
                    owner_id,
                    content.as_deref(),
                    tracked_paths,
                )
                .await
            }
            Action::DeleteArtifact {
                kind,
                path,
                owner_id,
            } => self.delete_artifact(manifest, albums, playlists, *kind, path, owner_id),
            // The only non-audio handler that is given the client lock.
            Action::WriteStem {
                clip_id,
                key,
                stem_id,
                path,
                source_url,
                format,
                hash,
            } => {
                self.write_stem(
                    client_lock,
                    manifest,
                    clip_id,
                    key,
                    stem_id,
                    path,
                    source_url,
                    *format,
                    hash,
                )
                .await
            }
            Action::DeleteStem { clip_id, key, path } => {
                self.delete_stem(manifest, clip_id, key, path)
            }
        }
    }
687
688    /// Render one audio action's tagged bytes, side-effect-free.
689    ///
690    /// This is the concurrent part: it fetches, transcodes, and tags the file
691    /// (through shared ports, plus the client behind `client_lock`), then returns
692    /// the bytes and where they must go. It deliberately writes nothing, removes
693    /// nothing, and never touches `manifest`, so many run at once and an aborted
694    /// run can drop them with no destination or manifest effect. The serial
695    /// [`commit_audio`](Self::commit_audio) applies those effects in plan order.
696    async fn prepare_audio(
697        &self,
698        client_lock: &ClientLock<'_, C>,
699        action: &Action,
700    ) -> Result<RenderedAudio, Fail> {
701        match action {
702            Action::Download {
703                clip,
704                lineage,
705                path,
706                format,
707            } => {
708                let bytes = self
709                    .produce_audio(client_lock, clip, lineage, *format)
710                    .await?;
711                Ok(RenderedAudio {
712                    clip_id: clip.id.clone(),
713                    path: path.clone(),
714                    format: *format,
715                    from_path: None,
716                    effect: Effect::Downloaded,
717                    bytes,
718                })
719            }
720            Action::Reformat {
721                clip,
722                path,
723                from_path,
724                from: _,
725                to,
726            } => {
727                // A Reformat action carries no lineage, so recover it from the
728                // desired set (the same context that drove naming and the hash),
729                // falling back to a self-rooted context when the clip is not in
730                // the current selection.
731                let lineage = self
732                    .by_id
733                    .get(clip.id.as_str())
734                    .map(|d| d.lineage.clone())
735                    .unwrap_or_else(|| LineageContext::own_root(clip));
736                let bytes = self.produce_audio(client_lock, clip, &lineage, *to).await?;
737                Ok(RenderedAudio {
738                    clip_id: clip.id.clone(),
739                    path: path.clone(),
740                    format: *to,
741                    from_path: Some(from_path.clone()),
742                    effect: Effect::Reformatted,
743                    bytes,
744                })
745            }
746            _ => unreachable!("prepare_audio only handles audio actions"),
747        }
748    }
749
750    /// Commit one rendered audio result serially, in plan order.
751    ///
752    /// Writes the tagged bytes to the destination, then, for a [`Reformat`], drops
753    /// the superseded file, then records the manifest entry. Ordering the write
754    /// before the removal keeps a crash from losing both copies; keeping all of
755    /// this off the concurrent phase preserves the sequential executor's plan-order
756    /// guarantee for every destination and manifest effect.
757    fn commit_audio(
758        &self,
759        manifest: &mut Manifest,
760        rendered: RenderedAudio,
761    ) -> Result<Effect, Fail> {
762        let RenderedAudio {
763            clip_id,
764            path,
765            format,
766            from_path,
767            effect,
768            bytes,
769        } = rendered;
770        let size = self.write_verify(&clip_id, &path, &bytes)?;
771        if let Some(from) = from_path {
772            // The new file is safely in place; only now drop the old rendering.
773            self.fs.remove(&from).map_err(|err| {
774                permanent_fail(&clip_id, format!("could not remove old file: {err}"))
775            })?;
776        }
777        manifest.insert(clip_id.clone(), self.entry(&clip_id, &path, format, size));
778        Ok(effect)
779    }
780
781    /// Re-tag the existing file in place to match current metadata and art.
782    async fn retag(
783        &self,
784        manifest: &mut Manifest,
785        clip: &Clip,
786        lineage: &LineageContext,
787        path: &str,
788    ) -> Result<Effect, Fail> {
789        let Some(format) = manifest.get(&clip.id).map(|entry| entry.format) else {
790            return Err(permanent_fail(
791                &clip.id,
792                "retag target missing from manifest",
793            ));
794        };
795
796        if format == AudioFormat::Wav {
797            // WAV carries no embedded tags; just record the new hashes so the
798            // next run sees them as current and stops retagging.
799            self.refresh_hashes(manifest, &clip.id, None);
800            return Ok(Effect::Retagged);
801        }
802
803        let (meta, synced) = self.track_meta(clip, lineage);
804        let cover = self.fetch_cover(clip).await;
805        let existing = self
806            .fs
807            .read(path)
808            .map_err(|err| permanent_fail(&clip.id, format!("could not read for retag: {err}")))?;
809        let tagged = match format {
810            AudioFormat::Mp3 => tag_mp3(&existing, &meta, cover.as_deref(), synced),
811            AudioFormat::Flac => tag_flac(&existing, &meta, cover.as_deref()),
812            AudioFormat::Wav => unreachable!("WAV handled above"),
813        }
814        .map_err(|err| permanent_fail(&clip.id, err.to_string()))?;
815        let size = self.write_verify(&clip.id, path, &tagged)?;
816        self.refresh_hashes(manifest, &clip.id, Some(size));
817        Ok(Effect::Retagged)
818    }
819
820    /// Move the file and update the entry's path (and protection).
821    fn rename(&self, manifest: &mut Manifest, from: &str, to: &str) -> Result<Effect, Fail> {
822        let label = self
823            .by_path
824            .get(to)
825            .map(|d| d.clip.id.clone())
826            .unwrap_or_else(|| to.to_owned());
827        self.fs.rename(from, to).map_err(|err| {
828            if err.is_out_of_space() {
829                disk_fail(label, "disk full: no space left to rename")
830            } else {
831                permanent_fail(label, format!("rename failed: {err}"))
832            }
833        })?;
834
835        let clip_id = self.by_path.get(to).map(|d| d.clip.id.clone()).or_else(|| {
836            manifest
837                .entries
838                .iter()
839                .find(|(_, entry)| entry.path == from)
840                .map(|(id, _)| id.clone())
841        });
842        if let Some(id) = clip_id
843            && let Some(entry) = manifest.entries.get_mut(&id)
844        {
845            entry.path = to.to_owned();
846            if let Some(d) = self.by_path.get(to) {
847                entry.preserve = preserve_for(d);
848            }
849        }
850        Ok(Effect::Renamed)
851    }
852
853    /// Remove the file and drop the manifest entry.
854    fn delete(&self, manifest: &mut Manifest, path: &str, clip_id: &str) -> Result<Effect, Fail> {
855        self.fs
856            .remove(path)
857            .map_err(|err| permanent_fail(clip_id, format!("delete failed: {err}")))?;
858        manifest.remove(clip_id);
859        Ok(Effect::Deleted)
860    }
861
862    /// Fetch an artifact's bytes, write them atomically, then record the sidecar
863    /// on the owning manifest entry.
864    ///
865    /// The fetch and write share the audio path's resilience: `fetch_bytes`
866    /// retries transient failures and verifies `Content-Length`, and
867    /// `write_verify` confirms the on-disk size. A failure is attributed to the
868    /// owning clip and returned as a per-clip [`Fail`], so a bad sidecar never
869    /// aborts the whole run (only an auth failure or a full disk does, matching
870    /// audio).
871    ///
872    /// The bytes written depend on the kind: a static cover is the fetched image
873    /// verbatim, while an animated cover is the clip's MP4 preview transcoded to
874    /// WebP through the ffmpeg port (see [`artifact_bytes`](Self::artifact_bytes)).
875    ///
876    /// A sidecar is only ever written for a clip whose audio is present: a
877    /// successful `Download`/`Reformat` creates the manifest entry earlier in
878    /// this run, and a prior-run clip already has one. So an absent owning entry
879    /// means the audio failed or never existed this run; we skip (no fetch, no
880    /// write) rather than strand an untracked sidecar with no owning audio.
881    ///
882    /// Folder art ([`FolderJpg`](ArtifactKind::FolderJpg) /
883    /// [`FolderWebp`](ArtifactKind::FolderWebp)) is album-scoped: its `owner_id`
884    /// is the album's stable root id, not a manifest clip, so it skips the
885    /// manifest presence guard and records its state on the album store instead.
886    ///
887    /// When a title or album change moves the audio, reconcile re-emits this
888    /// write at the NEW path; this handler then removes the sidecar left at the
889    /// artifact's previously tracked path, moving it rather than orphaning it.
890    /// The removal happens only after the new file is safely written, and a
891    /// remove failure returns before the state slot advances, so the next run
892    /// re-plans the identical write and retries — self-healing, never an orphan.
893    #[allow(clippy::too_many_arguments)]
894    async fn write_artifact(
895        &self,
896        manifest: &mut Manifest,
897        albums: &mut BTreeMap<String, AlbumArt>,
898        playlists: &mut BTreeMap<String, PlaylistState>,
899        kind: ArtifactKind,
900        path: &str,
901        source_url: &str,
902        hash: &str,
903        owner_id: &str,
904        content: Option<&str>,
905        tracked_paths: &mut HashMap<String, u32>,
906    ) -> Result<Effect, Fail> {
907        // A per-song sidecar needs its owning clip's manifest entry; album and
908        // playlist kinds are keyed elsewhere and skip this guard.
909        if is_per_clip_kind(kind) && manifest.get(owner_id).is_none() {
910            // The owning audio never landed this run, so this sidecar is skipped
911            // and will never drain a cover the producer cached for it. Drop that
912            // entry now: an insert without a matching sidecar write must not
913            // outlive its clip, keeping `cover_cache` bounded to the clips in
914            // flight (#89). A non-cover kind has no entry here, so this is a
915            // harmless no-op for them.
916            self.cover_cache
917                .lock()
918                .expect("cover cache mutex poisoned")
919                .remove(source_url);
920            return Ok(Effect::Skipped);
921        }
922        // Capture the path this artifact was last tracked at, BEFORE the slot is
923        // overwritten below, so a path-changing write (a title/album rename that
924        // moves the audio) can clean up the old sidecar it left behind. Cover
925        // kinds live on the manifest, folder kinds on the album store; playlists
926        // reconcile their own old-path delete and so opt out here.
927        let old_path = match kind {
928            ArtifactKind::CoverJpg => manifest
929                .get(owner_id)
930                .and_then(|e| e.cover_jpg.as_ref())
931                .map(|s| s.path.clone()),
932            ArtifactKind::CoverWebp => manifest
933                .get(owner_id)
934                .and_then(|e| e.cover_webp.as_ref())
935                .map(|s| s.path.clone()),
936            ArtifactKind::DetailsTxt => manifest
937                .get(owner_id)
938                .and_then(|e| e.details_txt.as_ref())
939                .map(|s| s.path.clone()),
940            ArtifactKind::LyricsTxt => manifest
941                .get(owner_id)
942                .and_then(|e| e.lyrics_txt.as_ref())
943                .map(|s| s.path.clone()),
944            ArtifactKind::Lrc => manifest
945                .get(owner_id)
946                .and_then(|e| e.lrc.as_ref())
947                .map(|s| s.path.clone()),
948            ArtifactKind::VideoMp4 => manifest
949                .get(owner_id)
950                .and_then(|e| e.video_mp4.as_ref())
951                .map(|s| s.path.clone()),
952            ArtifactKind::FolderJpg | ArtifactKind::FolderWebp => albums
953                .get(owner_id)
954                .and_then(|a| a.artifact(kind))
955                .map(|s| s.path.clone()),
956            ArtifactKind::Playlist => None,
957        };
958        // A generated artifact (a playlist) carries its body inline and never
959        // touches the network; a fetched one pulls (and transcodes) its source.
960        let bytes = match content {
961            Some(text) => text.as_bytes().to_vec(),
962            None => self.artifact_bytes(kind, source_url, owner_id).await?,
963        };
964        self.write_verify(owner_id, path, &bytes)?;
965        // The new sidecar is safely in place; only now drop a stale copy left at
966        // the previous path (the audio moved). `remove` is idempotent, so an
967        // already-absent old file is fine. On a genuine remove failure we return
968        // BEFORE updating the slot, leaving the manifest/album pointing at the
969        // old path: the next run sees the same path drift, re-plans this write,
970        // and retries the cleanup — convergent, no orphan persists.
971        //
972        // The removal is gated so it can never delete a live file (#76). This
973        // slot is releasing `old`, so drop its reference in `tracked_paths`; the
974        // file is removed only once nothing else holds it — no other tracked slot
975        // still references it (count now zero) and no action writes it this run
976        // (`write_targets`, the non-planned twin of `suppress_path_aliasing`).
977        // On a path swap (A: x -> y while B: y -> x) `write_targets` keeps each
978        // freshly written file; when two slots share a path after a prior failed
979        // swap, the first to move keeps it and the last to leave reclaims it, so
980        // a co-owned file is never deleted and a vacated one is never orphaned.
981        if let Some(old) = old_path.as_deref()
982            && !old.is_empty()
983            && old != path
984        {
985            let still_referenced = tracked_paths
986                .get_mut(old)
987                .map(|count| {
988                    *count = count.saturating_sub(1);
989                    *count > 0
990                })
991                .unwrap_or(false);
992            if !still_referenced && !self.write_targets.contains(old) {
993                self.fs.remove(old).map_err(|err| {
994                    permanent_fail(
995                        owner_id,
996                        format!("could not remove old sidecar {old}: {err}"),
997                    )
998                })?;
999            }
1000        }
1001        if is_album_kind(kind) {
1002            albums.entry(owner_id.to_owned()).or_default().set(
1003                kind,
1004                Some(ArtifactState {
1005                    path: path.to_owned(),
1006                    hash: hash.to_owned(),
1007                }),
1008            );
1009        } else if is_playlist_kind(kind) {
1010            playlists.insert(
1011                owner_id.to_owned(),
1012                PlaylistState {
1013                    name: playlist_name_from_path(path),
1014                    path: path.to_owned(),
1015                    hash: hash.to_owned(),
1016                },
1017            );
1018        } else if let Some(entry) = manifest.entries.get_mut(owner_id) {
1019            set_manifest_artifact(
1020                entry,
1021                kind,
1022                Some(ArtifactState {
1023                    path: path.to_owned(),
1024                    hash: hash.to_owned(),
1025                }),
1026            );
1027        }
1028        Ok(Effect::ArtifactWritten)
1029    }
1030
1031    /// Produce a sidecar's bytes from its source, branching on kind.
1032    ///
1033    /// An animated cover — a per-clip [`CoverWebp`](ArtifactKind::CoverWebp) or an
1034    /// album [`FolderWebp`](ArtifactKind::FolderWebp) — fetches the clip's
1035    /// `video_cover` MP4 preview and transcodes it to an animated WebP through the
1036    /// ffmpeg port; every other kind is the fetched source verbatim (e.g. the
1037    /// static [`CoverJpg`](ArtifactKind::CoverJpg) or album
1038    /// [`FolderJpg`](ArtifactKind::FolderJpg) image). A fetch or transcode failure
1039    /// is attributed to the owning clip and is a per-clip [`Fail`], except a
1040    /// disk-full transcode, which aborts the run like the audio FLAC path.
1041    async fn artifact_bytes(
1042        &self,
1043        kind: ArtifactKind,
1044        source_url: &str,
1045        owner_id: &str,
1046    ) -> Result<Vec<u8>, Fail> {
1047        // Reuse the cover the audio producer already fetched for the embedded tag
1048        // when it cached this exact URL (#89); otherwise fetch it now. The guard
1049        // is taken and dropped in its own statement so it never spans the await.
1050        let cached = self
1051            .cover_cache
1052            .lock()
1053            .expect("cover cache mutex poisoned")
1054            .remove(source_url);
1055        let source = match cached {
1056            Some(bytes) => bytes,
1057            None => self
1058                .fetch_bytes(source_url)
1059                .await
1060                .map_err(|err| err.attribute(owner_id))?,
1061        };
1062        match kind {
1063            ArtifactKind::CoverWebp | ArtifactKind::FolderWebp => self
1064                .ffmpeg
1065                .mp4_to_webp(&source, self.opts.cover_webp)
1066                .await
1067                .map_err(|err| {
1068                    if err.is_out_of_space() {
1069                        disk_fail(owner_id, "disk full: no space left to transcode")
1070                    } else {
1071                        permanent_fail(owner_id, format!("cover transcode failed: {err}"))
1072                    }
1073                }),
1074            // The text sidecars are generated and always carry inline content, so
1075            // `write_artifact` never reaches this fetch path for them. Guard it so
1076            // a future miswiring fails loudly rather than fetching a URL.
1077            ArtifactKind::DetailsTxt | ArtifactKind::LyricsTxt | ArtifactKind::Lrc => Err(
1078                permanent_fail(owner_id, "text sidecar requires inline content"),
1079            ),
1080            ArtifactKind::CoverJpg
1081            | ArtifactKind::FolderJpg
1082            | ArtifactKind::Playlist
1083            | ArtifactKind::VideoMp4 => Ok(source),
1084        }
1085    }
1086
1087    /// Remove a sidecar file and clear its slot on the owning manifest entry.
1088    ///
1089    /// `remove` is idempotent, so an already-absent sidecar is not a failure.
1090    /// When the owning entry is already gone (its audio was deleted earlier this
1091    /// run, co-deleting the sidecar), there is no slot to clear and that is fine.
1092    ///
1093    /// Folder art is album-scoped: its slot is cleared on the album store keyed by
1094    /// the album's root id, not on a manifest clip.
1095    ///
1096    /// The audio `Delete` is applied before its sidecar `DeleteArtifact`. If the
1097    /// sidecar removal fails after the audio is already gone, the sidecar lingers
1098    /// untracked, but the design stays convergent rather than transactional: the
1099    /// next run re-plans the same removal and retries, and any directory it would
1100    /// have emptied is pruned once the file finally clears.
1101    fn delete_artifact(
1102        &self,
1103        manifest: &mut Manifest,
1104        albums: &mut BTreeMap<String, AlbumArt>,
1105        playlists: &mut BTreeMap<String, PlaylistState>,
1106        kind: ArtifactKind,
1107        path: &str,
1108        owner_id: &str,
1109    ) -> Result<Effect, Fail> {
1110        self.fs
1111            .remove(path)
1112            .map_err(|err| permanent_fail(owner_id, format!("artifact delete failed: {err}")))?;
1113        if is_album_kind(kind) {
1114            if let Some(art) = albums.get_mut(owner_id) {
1115                art.set(kind, None);
1116                if art.is_empty() {
1117                    albums.remove(owner_id);
1118                }
1119            }
1120        } else if is_playlist_kind(kind) {
1121            playlists.remove(owner_id);
1122        } else if let Some(entry) = manifest.entries.get_mut(owner_id) {
1123            set_manifest_artifact(entry, kind, None);
1124        }
1125        Ok(Effect::ArtifactDeleted)
1126    }
1127
1128    /// Fetch one stem's bytes, write them atomically, then record the stem on
1129    /// the owning clip's keyed stem map.
1130    ///
1131    /// Mirrors [`write_artifact`](Self::write_artifact) for the keyed-stem case,
1132    /// sharing the fetch resilience (`fetch_bytes` retries and verifies
1133    /// `Content-Length`) and the atomic size-verified write. A stem is only ever
1134    /// written for a clip whose audio is present, so an absent owning manifest
1135    /// entry means the audio failed or never existed this run; we skip rather
1136    /// than strand an untracked stem with no owning audio.
1137    ///
1138    /// Stems are stored RAW in their native container and are NEVER transcoded to
1139    /// FLAC, even when the song's own format is FLAC — they are the deliberate
1140    /// exception. A `Wav` stem is rendered through the free `convert_wav` flow
1141    /// (see [`fetch_stem_bytes`](Self::fetch_stem_bytes)); an `Mp3` stem is fetched
1142    /// straight from its public CDN url. Either way the bytes land verbatim at
1143    /// `path`, whose extension already matches the stem format.
1144    ///
1145    /// When a title/album change moves the song, reconcile re-emits this write at
1146    /// the NEW path; this handler then removes the stem left at the previously
1147    /// tracked path, moving it rather than orphaning it. The removal happens only
1148    /// after the new file is safely written and only when nothing else this run
1149    /// writes that path, and a remove failure returns before the slot advances so
1150    /// the next run re-plans the identical write and retries — self-healing.
1151    #[allow(clippy::too_many_arguments)]
1152    async fn write_stem(
1153        &self,
1154        client_lock: &ClientLock<'_, C>,
1155        manifest: &mut Manifest,
1156        clip_id: &str,
1157        key: &str,
1158        stem_id: &str,
1159        path: &str,
1160        source_url: &str,
1161        format: StemFormat,
1162        hash: &str,
1163    ) -> Result<Effect, Fail> {
1164        // A stem needs its owning clip's manifest entry (its audio must exist).
1165        if manifest.get(clip_id).is_none() {
1166            return Ok(Effect::Skipped);
1167        }
1168        let old_path = manifest
1169            .get(clip_id)
1170            .and_then(|e| e.stems.get(key))
1171            .map(|s| s.path.clone());
1172        let bytes = self
1173            .fetch_stem_bytes(client_lock, clip_id, stem_id, source_url, format)
1174            .await?;
1175        self.write_verify(clip_id, path, &bytes)?;
1176        // The new stem is in place; only now drop a stale copy left at the old
1177        // path (the song moved, or the stem format changed). `remove` is
1178        // idempotent. A path this run also writes is never removed (the
1179        // non-planned twin of `suppress_path_aliasing`). On a genuine remove
1180        // failure we return BEFORE updating the slot, so the next run re-plans the
1181        // same write and retries the cleanup — no orphan.
1182        if let Some(old) = old_path.as_deref()
1183            && !old.is_empty()
1184            && old != path
1185            && !self.write_targets.contains(old)
1186        {
1187            self.fs.remove(old).map_err(|err| {
1188                permanent_fail(clip_id, format!("could not remove old stem {old}: {err}"))
1189            })?;
1190        }
1191        if let Some(entry) = manifest.entries.get_mut(clip_id) {
1192            set_manifest_stem(
1193                entry,
1194                key,
1195                Some(ArtifactState {
1196                    path: path.to_owned(),
1197                    hash: hash.to_owned(),
1198                }),
1199            );
1200        }
1201        Ok(Effect::ArtifactWritten)
1202    }
1203
1204    /// Resolve a stem's RAW bytes in its native container, never transcoding.
1205    ///
1206    /// A `Wav` stem renders the stem clip's lossless WAV through the very same
1207    /// free `convert_wav` + poll flow the main FLAC/WAV audio uses
1208    /// ([`resolve_wav_url`](Self::resolve_wav_url)), keyed on the stem's own
1209    /// `stem_id`, then downloads that WAV. An `Mp3` stem (or a degenerate `Wav`
1210    /// stem with no id to render) downloads its public CDN url directly. Stems
1211    /// are the deliberate exception to the source format: the bytes are returned
1212    /// exactly as delivered and are never re-encoded to FLAC.
1213    async fn fetch_stem_bytes(
1214        &self,
1215        client_lock: &ClientLock<'_, C>,
1216        clip_id: &str,
1217        stem_id: &str,
1218        source_url: &str,
1219        format: StemFormat,
1220    ) -> Result<Vec<u8>, Fail> {
1221        let url = match format {
1222            StemFormat::Wav if !stem_id.is_empty() => {
1223                match self.resolve_wav_url(client_lock, stem_id).await? {
1224                    Some(url) => url,
1225                    None => return Err(transient_fail(clip_id, "stem WAV render was not ready")),
1226                }
1227            }
1228            // Mp3, or a Wav stem with no id to render, downloads the CDN mp3.
1229            _ => source_url.to_owned(),
1230        };
1231        self.fetch_bytes(&url)
1232            .await
1233            .map_err(|err| err.attribute(clip_id))
1234    }
1235
1236    /// Remove one stem file and clear its slot in the owning clip's stem map.
1237    ///
1238    /// `remove` is idempotent, so an already-absent stem is not a failure. When
1239    /// the owning entry is already gone (its audio was deleted earlier this run,
1240    /// co-deleting the stem), there is no slot to clear and that is fine; the
1241    /// emptied `.stems` folder is pruned by the end-of-run directory sweep.
1242    fn delete_stem(
1243        &self,
1244        manifest: &mut Manifest,
1245        clip_id: &str,
1246        key: &str,
1247        path: &str,
1248    ) -> Result<Effect, Fail> {
1249        self.fs
1250            .remove(path)
1251            .map_err(|err| permanent_fail(clip_id, format!("stem delete failed: {err}")))?;
1252        if let Some(entry) = manifest.entries.get_mut(clip_id) {
1253            set_manifest_stem(entry, key, None);
1254        }
1255        Ok(Effect::ArtifactDeleted)
1256    }
1257
1258    /// Download (and transcode/tag) the audio for `clip` in `format`.
1259    async fn produce_audio(
1260        &self,
1261        client_lock: &ClientLock<'_, C>,
1262        clip: &Clip,
1263        lineage: &LineageContext,
1264        format: AudioFormat,
1265    ) -> Result<Vec<u8>, Fail> {
1266        let (meta, synced) = self.track_meta(clip, lineage);
1267        match format {
1268            AudioFormat::Mp3 => {
1269                let url = clip.mp3_url();
1270                let audio = self
1271                    .fetch_bytes(&url)
1272                    .await
1273                    .map_err(|err| err.attribute(&clip.id))?;
1274                let cover = self.fetch_cover(clip).await;
1275                tag_mp3(&audio, &meta, cover.as_deref(), synced)
1276                    .map_err(|err| permanent_fail(&clip.id, err.to_string()))
1277            }
1278            AudioFormat::Flac => {
1279                let wav = self.fetch_wav(client_lock, clip).await?;
1280                let flac = self.ffmpeg.wav_to_flac(&wav).await.map_err(|err| {
1281                    if err.is_out_of_space() {
1282                        disk_fail(&clip.id, "disk full: no space left to transcode")
1283                    } else {
1284                        permanent_fail(&clip.id, format!("transcode failed: {err}"))
1285                    }
1286                })?;
1287                let cover = self.fetch_cover(clip).await;
1288                tag_flac(&flac, &meta, cover.as_deref())
1289                    .map_err(|err| permanent_fail(&clip.id, err.to_string()))
1290            }
1291            AudioFormat::Wav => self.fetch_wav(client_lock, clip).await,
1292        }
1293    }
1294
1295    /// This run's non-empty aligned lyrics for a clip, if any were fetched.
1296    fn synced_for(&self, clip_id: &str) -> Option<&AlignedLyrics> {
1297        self.synced
1298            .get(clip_id)
1299            .filter(|aligned| !aligned.is_empty())
1300    }
1301
1302    /// The track metadata for a clip, paired with its synced lyrics (if any).
1303    ///
1304    /// The feed omits per-clip lyrics, so when this run fetched aligned lyrics
1305    /// for the clip the plain text is folded into `lyrics` here, which the MP3
1306    /// `USLT` and FLAC `LYRICS` tags then carry. The returned [`AlignedLyrics`]
1307    /// is passed on to [`tag_mp3`] for the word-level `SYLT` frame.
1308    fn track_meta<'m>(
1309        &'m self,
1310        clip: &Clip,
1311        lineage: &LineageContext,
1312    ) -> (TrackMetadata, Option<&'m AlignedLyrics>) {
1313        let synced = self.synced_for(&clip.id);
1314        let mut meta = TrackMetadata::from_clip(clip, lineage);
1315        if let Some(aligned) = synced {
1316            meta.lyrics = aligned.plain_text();
1317        }
1318        (meta, synced)
1319    }
1320
1321    /// Resolve the rendered WAV URL and download it.
1322    async fn fetch_wav(
1323        &self,
1324        client_lock: &ClientLock<'_, C>,
1325        clip: &Clip,
1326    ) -> Result<Vec<u8>, Fail> {
1327        let url = match self.resolve_wav_url(client_lock, &clip.id).await? {
1328            Some(url) => url,
1329            None => return Err(transient_fail(&clip.id, "WAV render was not ready")),
1330        };
1331        self.fetch_bytes(&url)
1332            .await
1333            .map_err(|err| err.attribute(&clip.id))
1334    }
1335
1336    /// Read the WAV URL, requesting a render and polling if it is not ready.
1337    ///
1338    /// `None` means the render did not become ready within the poll budget; the
1339    /// caller treats that as a non-fatal transient failure, never a silent skip.
1340    ///
1341    /// Each client call briefly locks `client_lock`; the poll waits happen
1342    /// unlocked, so concurrent clips interleave their WAV renders rather than
1343    /// serialising behind one clip's whole poll budget.
1344    async fn resolve_wav_url(
1345        &self,
1346        client_lock: &ClientLock<'_, C>,
1347        id: &str,
1348    ) -> Result<Option<String>, Fail> {
1349        if let Some(url) = self.wav_url_retrying(client_lock, id).await? {
1350            return Ok(Some(url));
1351        }
1352        self.request_wav_retrying(client_lock, id).await?;
1353        for _ in 0..self.opts.wav_poll_attempts {
1354            self.clock.sleep(self.opts.wav_poll_interval).await;
1355            if let Some(url) = self.wav_url_retrying(client_lock, id).await? {
1356                return Ok(Some(url));
1357            }
1358        }
1359        Ok(None)
1360    }
1361
1362    /// Read the rendered WAV URL, retrying transient API failures with backoff
1363    /// (SYNC-16/17), so the default FLAC path is as resilient as the CDN path.
1364    async fn wav_url_retrying(
1365        &self,
1366        client_lock: &ClientLock<'_, C>,
1367        id: &str,
1368    ) -> Result<Option<String>, Fail> {
1369        let mut attempt: u32 = 0;
1370        loop {
1371            let result = {
1372                let mut client = client_lock.lock().await;
1373                client.wav_url(self.http, id).await
1374            };
1375            match result {
1376                Ok(url) => return Ok(url),
1377                Err(err) => match self.retry_core(id, err, &mut attempt).await {
1378                    Some(fail) => return Err(fail),
1379                    None => continue,
1380                },
1381            }
1382        }
1383    }
1384
1385    /// Ask Suno to render a WAV, retrying transient API failures with backoff.
1386    async fn request_wav_retrying(
1387        &self,
1388        client_lock: &ClientLock<'_, C>,
1389        id: &str,
1390    ) -> Result<(), Fail> {
1391        let mut attempt: u32 = 0;
1392        loop {
1393            let result = {
1394                let mut client = client_lock.lock().await;
1395                client.request_wav(self.http, id).await
1396            };
1397            match result {
1398                Ok(()) => return Ok(()),
1399                Err(err) => match self.retry_core(id, err, &mut attempt).await {
1400                    Some(fail) => return Err(fail),
1401                    None => continue,
1402                },
1403            }
1404        }
1405    }
1406
1407    /// Classify a core error from the authenticated WAV flow. On a transient
1408    /// class within budget, back off through the [`Clock`] and return `None` to
1409    /// retry; otherwise return the terminal [`Fail`].
1410    async fn retry_core(&self, id: &str, err: Error, attempt: &mut u32) -> Option<Fail> {
1411        let fail = classify_core(id, err);
1412        if matches!(fail.class, Class::Transient) && *attempt < self.opts.max_retries {
1413            self.clock.sleep(backoff_delay(*attempt, None)).await;
1414            *attempt += 1;
1415            None
1416        } else {
1417            Some(fail)
1418        }
1419    }
1420
1421    /// GET `url`, retrying transient failures with backoff, verifying size.
1422    async fn fetch_bytes(&self, url: &str) -> Result<Vec<u8>, FetchError> {
1423        let mut attempt: u32 = 0;
1424        loop {
1425            let result = self.http.send(HttpRequest::get(url)).await;
1426            match classify_response(result) {
1427                Ok(body) => return Ok(body),
1428                Err(err) => {
1429                    if matches!(err.class, Class::Transient) && attempt < self.opts.max_retries {
1430                        let delay = backoff_delay(attempt, err.retry_after);
1431                        self.clock.sleep(delay).await;
1432                        attempt += 1;
1433                        continue;
1434                    }
1435                    return Err(err);
1436                }
1437            }
1438        }
1439    }
1440
1441    /// Download cover art, trying each candidate URL in order; `None` is fine.
1442    async fn fetch_cover(&self, clip: &Clip) -> Option<Vec<u8>> {
1443        for url in clip.cover_candidates() {
1444            if let Ok(response) = self.http.send(HttpRequest::get(url)).await
1445                && (200..=299).contains(&response.status)
1446                && !response.body.is_empty()
1447            {
1448                // A `CoverJpg` sidecar will fetch this exact URL this run; keep the
1449                // bytes so its write reuses them instead of fetching again (#89).
1450                // The lock guards only the insert, never the await above.
1451                if self.cover_wanted.contains(url) {
1452                    self.cover_cache
1453                        .lock()
1454                        .expect("cover cache mutex poisoned")
1455                        .insert(url.to_owned(), response.body.clone());
1456                }
1457                return Some(response.body);
1458            }
1459        }
1460        None
1461    }
1462
1463    /// Write `bytes` atomically, then confirm the on-disk size (SYNC-13/14).
1464    fn write_verify(&self, clip_id: &str, path: &str, bytes: &[u8]) -> Result<u64, Fail> {
1465        self.fs.write_atomic(path, bytes).map_err(|err| {
1466            if err.is_out_of_space() {
1467                disk_fail(clip_id, format!("disk full: no space left to write {path}"))
1468            } else {
1469                permanent_fail(clip_id, format!("write failed: {err}"))
1470            }
1471        })?;
1472        match self.fs.metadata(path) {
1473            Some(stat) if stat.size == bytes.len() as u64 => Ok(stat.size),
1474            Some(stat) => Err(permanent_fail(
1475                clip_id,
1476                format!("wrote {} bytes, expected {}", stat.size, bytes.len()),
1477            )),
1478            None => Ok(bytes.len() as u64),
1479        }
1480    }
1481
1482    /// Build the manifest entry for a freshly written file.
1483    fn entry(&self, clip_id: &str, path: &str, format: AudioFormat, size: u64) -> ManifestEntry {
1484        match self.by_id.get(clip_id) {
1485            Some(d) => manifest_entry(d, size),
1486            None => ManifestEntry {
1487                path: path.to_owned(),
1488                format,
1489                size,
1490                ..ManifestEntry::default()
1491            },
1492        }
1493    }
1494
1495    /// Refresh an existing entry's hashes, protection, and (optionally) size.
1496    fn refresh_hashes(&self, manifest: &mut Manifest, clip_id: &str, size: Option<u64>) {
1497        let desired = self.by_id.get(clip_id).copied();
1498        if let Some(entry) = manifest.entries.get_mut(clip_id) {
1499            if let Some(d) = desired {
1500                entry.meta_hash = d.meta_hash.clone();
1501                entry.art_hash = d.art_hash.clone();
1502                entry.preserve = preserve_for(d);
1503            }
1504            if let Some(size) = size {
1505                entry.size = size;
1506            }
1507        }
1508    }
1509
1510    /// Refresh only an entry's preserve marker from the current desired state.
1511    ///
1512    /// A clip can gain or lose copy/private protection with no file change, which
1513    /// reconcile emits as a [`Skip`](Action::Skip). Refreshing here keeps the
1514    /// persisted marker a faithful image of live protection, so the cross-run
1515    /// delete guard (SYNC-8) never reads it stale.
1516    fn refresh_preserve(&self, manifest: &mut Manifest, clip_id: &str) {
1517        if let Some(d) = self.by_id.get(clip_id).copied()
1518            && let Some(entry) = manifest.entries.get_mut(clip_id)
1519        {
1520            entry.preserve = preserve_for(d);
1521        }
1522    }
1523}
1524
1525/// Build a manifest entry from the desired record (SYNC-8 preserve rule).
1526fn manifest_entry(d: &Desired, size: u64) -> ManifestEntry {
1527    ManifestEntry {
1528        path: d.path.clone(),
1529        format: d.format,
1530        meta_hash: d.meta_hash.clone(),
1531        art_hash: d.art_hash.clone(),
1532        size,
1533        preserve: preserve_for(d),
1534        ..Default::default()
1535    }
1536}
1537
1538/// Whether a written entry must be preserved across runs: held by any copy
1539/// source, or private. The reconcile delete guard reads this marker later.
1540fn preserve_for(d: &Desired) -> bool {
1541    d.private || d.modes.contains(&SourceMode::Copy)
1542}
1543
1544/// Classify one HTTP result into bytes or a [`FetchError`] (SYNC-14/17).
1545fn classify_response(
1546    result: Result<crate::http::HttpResponse, crate::http::TransportError>,
1547) -> Result<Vec<u8>, FetchError> {
1548    let response = match result {
1549        Ok(response) => response,
1550        Err(err) => {
1551            return Err(FetchError::transient(
1552                format!("transport error: {err}"),
1553                None,
1554            ));
1555        }
1556    };
1557    match response.status {
1558        200..=299 => {
1559            if let Some(expected) = content_length(&response) {
1560                let actual = response.body.len() as u64;
1561                if actual != expected {
1562                    return Err(FetchError::transient(
1563                        format!("truncated download: {actual} of {expected} bytes"),
1564                        None,
1565                    ));
1566                }
1567            }
1568            Ok(response.body)
1569        }
1570        401 | 403 => Err(FetchError::transient(
1571            format!("download rejected: status {}", response.status),
1572            None,
1573        )),
1574        408 => Err(FetchError::transient("request timed out", None)),
1575        429 => Err(FetchError::transient(
1576            "rate limited",
1577            retry_after(&response),
1578        )),
1579        500..=599 => Err(FetchError::transient(
1580            format!("server error {}", response.status),
1581            None,
1582        )),
1583        status => Err(FetchError::permanent(format!(
1584            "download failed: status {status}"
1585        ))),
1586    }
1587}
1588
1589/// Map a core [`Error`] from the authenticated WAV flow to a [`Fail`].
1590fn classify_core(id: &str, err: Error) -> Fail {
1591    let reason = err.to_string();
1592    match err {
1593        Error::Auth(_) => auth_fail(id, reason),
1594        Error::RateLimited { .. } | Error::Connection(_) => transient_fail(id, reason),
1595        Error::Api(_)
1596        | Error::NotFound(_)
1597        | Error::Tag(_)
1598        | Error::Config(_)
1599        | Error::Refused(_) => permanent_fail(id, reason),
1600    }
1601}
1602
1603/// The provider-reported body size from `Content-Length`, if present and valid.
1604fn content_length(response: &crate::http::HttpResponse) -> Option<u64> {
1605    response.header("content-length")?.trim().parse().ok()
1606}
1607
1608#[cfg(test)]
1609mod tests {
1610    use super::*;
1611    use crate::ClerkAuth;
1612    use crate::http::HttpResponse;
1613    use crate::testutil::{MemFs, RecordingClock, Reply, ScriptedHttp, StubFfmpeg};
1614
1615    fn clip(id: &str) -> Clip {
1616        Clip {
1617            id: id.to_owned(),
1618            title: "Song".to_owned(),
1619            audio_url: format!("https://cdn1.suno.ai/{id}.mp3"),
1620            ..Default::default()
1621        }
1622    }
1623
1624    fn art_clip(id: &str) -> Clip {
1625        Clip {
1626            image_large_url: format!("https://art.suno.ai/{id}/large.jpg"),
1627            image_url: format!("https://art.suno.ai/{id}/small.jpg"),
1628            ..clip(id)
1629        }
1630    }
1631
1632    fn ext(format: AudioFormat) -> &'static str {
1633        match format {
1634            AudioFormat::Mp3 => "mp3",
1635            AudioFormat::Flac => "flac",
1636            AudioFormat::Wav => "wav",
1637        }
1638    }
1639
1640    fn desired(clip: Clip, format: AudioFormat) -> Desired {
1641        Desired {
1642            path: format!("{}.{}", clip.id, ext(format)),
1643            lineage: LineageContext::own_root(&clip),
1644            clip,
1645            format,
1646            meta_hash: "m".to_owned(),
1647            art_hash: "art".to_owned(),
1648            modes: vec![SourceMode::Mirror],
1649            trashed: false,
1650            private: false,
1651            artifacts: Vec::new(),
1652            stems: None,
1653        }
1654    }
1655
1656    fn entry(path: &str, format: AudioFormat) -> ManifestEntry {
1657        ManifestEntry {
1658            path: path.to_owned(),
1659            format,
1660            meta_hash: "old".to_owned(),
1661            art_hash: "old-art".to_owned(),
1662            size: 8,
1663            preserve: false,
1664            ..Default::default()
1665        }
1666    }
1667
1668    #[allow(clippy::too_many_arguments)]
1669    fn run<G: Ffmpeg>(
1670        plan: &Plan,
1671        manifest: &mut Manifest,
1672        desired: &[Desired],
1673        http: &ScriptedHttp,
1674        fs: &MemFs,
1675        ffmpeg: &G,
1676        clock: &RecordingClock,
1677        opts: &ExecOptions,
1678    ) -> ExecOutcome {
1679        let mut albums = BTreeMap::new();
1680        run_with_albums(
1681            plan,
1682            manifest,
1683            &mut albums,
1684            desired,
1685            http,
1686            fs,
1687            ffmpeg,
1688            clock,
1689            opts,
1690        )
1691    }
1692
1693    #[allow(clippy::too_many_arguments)]
1694    fn run_with_albums<G: Ffmpeg>(
1695        plan: &Plan,
1696        manifest: &mut Manifest,
1697        albums: &mut BTreeMap<String, AlbumArt>,
1698        desired: &[Desired],
1699        http: &ScriptedHttp,
1700        fs: &MemFs,
1701        ffmpeg: &G,
1702        clock: &RecordingClock,
1703        opts: &ExecOptions,
1704    ) -> ExecOutcome {
1705        let mut playlists = BTreeMap::new();
1706        run_full(
1707            plan,
1708            manifest,
1709            albums,
1710            &mut playlists,
1711            desired,
1712            http,
1713            fs,
1714            ffmpeg,
1715            clock,
1716            opts,
1717        )
1718    }
1719
1720    #[allow(clippy::too_many_arguments)]
1721    fn run_full<G: Ffmpeg>(
1722        plan: &Plan,
1723        manifest: &mut Manifest,
1724        albums: &mut BTreeMap<String, AlbumArt>,
1725        playlists: &mut BTreeMap<String, PlaylistState>,
1726        desired: &[Desired],
1727        http: &ScriptedHttp,
1728        fs: &MemFs,
1729        ffmpeg: &G,
1730        clock: &RecordingClock,
1731        opts: &ExecOptions,
1732    ) -> ExecOutcome {
1733        let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
1734        let synced = HashMap::new();
1735        pollster::block_on(execute(
1736            plan,
1737            manifest,
1738            albums,
1739            playlists,
1740            desired,
1741            &synced,
1742            Ports {
1743                client: &mut client,
1744                http,
1745                fs,
1746                ffmpeg,
1747                clock,
1748            },
1749            opts,
1750        ))
1751    }
1752
1753    fn small_poll() -> ExecOptions {
1754        ExecOptions {
1755            max_retries: 3,
1756            wav_poll_attempts: 2,
1757            wav_poll_interval: Duration::from_secs(5),
1758            concurrency: 4,
1759            cover_webp: WebpEncodeSettings::default(),
1760        }
1761    }
1762
1763    // ── Download: MP3 ───────────────────────────────────────────────
1764
1765    #[test]
1766    fn download_mp3_writes_tagged_file_and_records_manifest() {
1767        let c = art_clip("a");
1768        let d = desired(c.clone(), AudioFormat::Mp3);
1769        let plan = Plan {
1770            actions: vec![Action::Download {
1771                clip: c.clone(),
1772                lineage: LineageContext::own_root(&c),
1773                path: d.path.clone(),
1774                format: AudioFormat::Mp3,
1775            }],
1776        };
1777        let http = ScriptedHttp::new()
1778            .route("a.mp3", Reply::ok(b"mp3-body".to_vec()))
1779            .route("a/large.jpg", Reply::ok(b"art-bytes".to_vec()));
1780        let fs = MemFs::new();
1781        let ffmpeg = StubFfmpeg::flac();
1782        let clock = RecordingClock::new();
1783        let mut manifest = Manifest::new();
1784
1785        let outcome = run(
1786            &plan,
1787            &mut manifest,
1788            &[d],
1789            &http,
1790            &fs,
1791            &ffmpeg,
1792            &clock,
1793            &ExecOptions::default(),
1794        );
1795
1796        assert_eq!(outcome.downloaded, 1);
1797        assert_eq!(outcome.failed(), 0);
1798        assert_eq!(outcome.status, RunStatus::Completed);
1799        let written = fs.read_file("a.mp3").unwrap();
1800        assert_eq!(&written[..3], b"ID3");
1801        assert!(written.ends_with(b"mp3-body"));
1802        let entry = manifest.get("a").unwrap();
1803        assert_eq!(entry.path, "a.mp3");
1804        assert_eq!(entry.format, AudioFormat::Mp3);
1805        assert_eq!(entry.meta_hash, "m");
1806        assert_eq!(entry.art_hash, "art");
1807        assert_eq!(entry.size, written.len() as u64);
1808        assert!(!entry.preserve);
1809    }
1810
1811    #[test]
1812    fn download_mp3_embeds_sylt_and_lyrics_from_synced_map() {
1813        // A clip whose alignment was fetched this run gets a word-level SYLT frame
1814        // and its plain lyric text embedded (USLT), end to end through execute.
1815        let c = art_clip("a");
1816        let d = desired(c.clone(), AudioFormat::Mp3);
1817        let plan = Plan {
1818            actions: vec![Action::Download {
1819                clip: c.clone(),
1820                lineage: LineageContext::own_root(&c),
1821                path: d.path.clone(),
1822                format: AudioFormat::Mp3,
1823            }],
1824        };
1825        let http = ScriptedHttp::new()
1826            .route("a.mp3", Reply::ok(b"mp3-body".to_vec()))
1827            .route("a/large.jpg", Reply::ok(b"art-bytes".to_vec()));
1828        let fs = MemFs::new();
1829        let ffmpeg = StubFfmpeg::flac();
1830        let clock = RecordingClock::new();
1831        let mut manifest = Manifest::new();
1832        let mut albums = BTreeMap::new();
1833        let mut playlists = BTreeMap::new();
1834        let mut synced = HashMap::new();
1835        synced.insert(
1836            "a".to_string(),
1837            AlignedLyrics::from_json(&serde_json::json!({
1838                "aligned_words": [],
1839                "aligned_lyrics": [
1840                    {"text": "hi there", "start_s": 0.5, "end_s": 1.2, "section": "Verse 1",
1841                     "words": [
1842                         {"text": "hi", "start_s": 0.5, "end_s": 0.8},
1843                         {"text": "there", "start_s": 0.9, "end_s": 1.2}
1844                     ]}
1845                ]
1846            })),
1847        );
1848        let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
1849        let outcome = pollster::block_on(execute(
1850            &plan,
1851            &mut manifest,
1852            &mut albums,
1853            &mut playlists,
1854            &[d],
1855            &synced,
1856            Ports {
1857                client: &mut client,
1858                http: &http,
1859                fs: &fs,
1860                ffmpeg: &ffmpeg,
1861                clock: &clock,
1862            },
1863            &ExecOptions::default(),
1864        ));
1865
1866        assert_eq!(outcome.downloaded, 1);
1867        let written = fs.read_file("a.mp3").unwrap();
1868        let tag = id3::Tag::read_from2(std::io::Cursor::new(written)).unwrap();
1869        assert_eq!(
1870            tag.synchronised_lyrics().count(),
1871            1,
1872            "a SYLT frame is embedded"
1873        );
1874        // The plain lyric text is populated from the alignment for the USLT frame.
1875        assert_eq!(
1876            tag.lyrics().next().map(|frame| frame.text.as_str()),
1877            Some("hi there")
1878        );
1879    }
1880
1881    #[test]
1882    fn download_mp3_embeds_no_sylt_when_synced_map_empty() {
1883        // The synced map is empty when the feature is off (no alignment fetched),
1884        // so no SYLT frame and no lyric text are embedded.
1885        let c = art_clip("a");
1886        let d = desired(c.clone(), AudioFormat::Mp3);
1887        let plan = Plan {
1888            actions: vec![Action::Download {
1889                clip: c.clone(),
1890                lineage: LineageContext::own_root(&c),
1891                path: d.path.clone(),
1892                format: AudioFormat::Mp3,
1893            }],
1894        };
1895        let http = ScriptedHttp::new()
1896            .route("a.mp3", Reply::ok(b"mp3-body".to_vec()))
1897            .route("a/large.jpg", Reply::ok(b"art-bytes".to_vec()));
1898        let fs = MemFs::new();
1899        let ffmpeg = StubFfmpeg::flac();
1900        let clock = RecordingClock::new();
1901        let mut manifest = Manifest::new();
1902        let mut albums = BTreeMap::new();
1903        let mut playlists = BTreeMap::new();
1904        let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
1905        let outcome = pollster::block_on(execute(
1906            &plan,
1907            &mut manifest,
1908            &mut albums,
1909            &mut playlists,
1910            &[d],
1911            &HashMap::new(),
1912            Ports {
1913                client: &mut client,
1914                http: &http,
1915                fs: &fs,
1916                ffmpeg: &ffmpeg,
1917                clock: &clock,
1918            },
1919            &ExecOptions::default(),
1920        ));
1921        assert_eq!(outcome.downloaded, 1);
1922        let written = fs.read_file("a.mp3").unwrap();
1923        let tag = id3::Tag::read_from2(std::io::Cursor::new(written)).unwrap();
1924        assert_eq!(tag.synchronised_lyrics().count(), 0);
1925        assert_eq!(tag.lyrics().count(), 0);
1926    }
1927
1928    #[test]
1929    fn download_mp3_uses_cdn_fallback_when_audio_url_empty() {
1930        let mut c = clip("a");
1931        c.audio_url = String::new();
1932        let d = desired(c.clone(), AudioFormat::Mp3);
1933        let plan = Plan {
1934            actions: vec![Action::Download {
1935                clip: c.clone(),
1936                lineage: LineageContext::own_root(&c),
1937                path: d.path.clone(),
1938                format: AudioFormat::Mp3,
1939            }],
1940        };
1941        let http = ScriptedHttp::new().route("cdn1.suno.ai/a.mp3", Reply::ok(b"body".to_vec()));
1942        let fs = MemFs::new();
1943        let mut manifest = Manifest::new();
1944        let outcome = run(
1945            &plan,
1946            &mut manifest,
1947            &[d],
1948            &http,
1949            &fs,
1950            &StubFfmpeg::flac(),
1951            &RecordingClock::new(),
1952            &ExecOptions::default(),
1953        );
1954        assert_eq!(outcome.downloaded, 1);
1955        assert_eq!(http.count("cdn1.suno.ai/a.mp3"), 1);
1956    }
1957
1958    // ── Download: FLAC render + transcode ───────────────────────────
1959
1960    #[test]
1961    fn download_flac_renders_transcodes_and_records() {
1962        let c = clip("b");
1963        let d = desired(c.clone(), AudioFormat::Flac);
1964        let plan = Plan {
1965            actions: vec![Action::Download {
1966                clip: c.clone(),
1967                lineage: LineageContext::own_root(&c),
1968                path: d.path.clone(),
1969                format: AudioFormat::Flac,
1970            }],
1971        };
1972        let http = ScriptedHttp::new()
1973            .with_auth()
1974            .route(
1975                "/wav_file/",
1976                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/b.wav"}"#),
1977            )
1978            .route("b.wav", Reply::ok(b"wav-bytes".to_vec()));
1979        let fs = MemFs::new();
1980        let clock = RecordingClock::new();
1981        let mut manifest = Manifest::new();
1982
1983        let outcome = run(
1984            &plan,
1985            &mut manifest,
1986            &[d],
1987            &http,
1988            &fs,
1989            &StubFfmpeg::flac(),
1990            &clock,
1991            &ExecOptions::default(),
1992        );
1993
1994        assert_eq!(outcome.downloaded, 1);
1995        assert_eq!(outcome.failed(), 0);
1996        let written = fs.read_file("b.flac").unwrap();
1997        assert_eq!(&written[..4], b"fLaC");
1998        assert_eq!(manifest.get("b").unwrap().format, AudioFormat::Flac);
1999        // The URL was ready immediately, so no render request and no polling.
2000        assert_eq!(http.count("/convert_wav/"), 0);
2001        assert!(clock.sleeps().is_empty());
2002    }
2003
2004    #[test]
2005    fn download_flac_requests_render_then_polls_until_ready() {
2006        let c = clip("c");
2007        let d = desired(c.clone(), AudioFormat::Flac);
2008        let plan = Plan {
2009            actions: vec![Action::Download {
2010                clip: c.clone(),
2011                lineage: LineageContext::own_root(&c),
2012                path: d.path.clone(),
2013                format: AudioFormat::Flac,
2014            }],
2015        };
2016        let http = ScriptedHttp::new()
2017            .with_auth()
2018            .route_seq(
2019                "/wav_file/",
2020                vec![
2021                    Reply::json("{}"),
2022                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/c.wav"}"#),
2023                ],
2024            )
2025            .route("/convert_wav/", Reply::status(200))
2026            .route("c.wav", Reply::ok(b"wav".to_vec()));
2027        let clock = RecordingClock::new();
2028        let mut manifest = Manifest::new();
2029
2030        let outcome = run(
2031            &plan,
2032            &mut manifest,
2033            &[d],
2034            &http,
2035            &fs_new(),
2036            &StubFfmpeg::flac(),
2037            &clock,
2038            &small_poll(),
2039        );
2040
2041        assert_eq!(outcome.downloaded, 1);
2042        assert_eq!(http.count("/convert_wav/"), 1);
2043        assert_eq!(clock.sleeps(), vec![Duration::from_secs(5)]);
2044    }
2045
2046    #[test]
2047    fn download_flac_unavailable_render_is_a_nonfatal_failure() {
2048        let c = clip("d");
2049        let d = desired(c.clone(), AudioFormat::Flac);
2050        let plan = Plan {
2051            actions: vec![Action::Download {
2052                clip: c.clone(),
2053                lineage: LineageContext::own_root(&c),
2054                path: d.path.clone(),
2055                format: AudioFormat::Flac,
2056            }],
2057        };
2058        let http = ScriptedHttp::new()
2059            .with_auth()
2060            .route("/wav_file/", Reply::json("{}"))
2061            .route("/convert_wav/", Reply::status(200));
2062        let fs = MemFs::new();
2063        let clock = RecordingClock::new();
2064        let mut manifest = Manifest::new();
2065
2066        let outcome = run(
2067            &plan,
2068            &mut manifest,
2069            &[d],
2070            &http,
2071            &fs,
2072            &StubFfmpeg::flac(),
2073            &clock,
2074            &small_poll(),
2075        );
2076
2077        assert_eq!(outcome.downloaded, 0);
2078        assert_eq!(outcome.failed(), 1);
2079        assert_eq!(outcome.failures[0].clip_id, "d");
2080        assert_eq!(outcome.status, RunStatus::Completed);
2081        assert!(!fs.exists("d.flac"));
2082        assert_eq!(clock.sleeps().len(), 2);
2083    }
2084
2085    #[test]
2086    fn flac_transcode_failure_is_recorded_and_skipped() {
2087        let c = clip("t");
2088        let d = desired(c.clone(), AudioFormat::Flac);
2089        let plan = Plan {
2090            actions: vec![Action::Download {
2091                clip: c.clone(),
2092                lineage: LineageContext::own_root(&c),
2093                path: d.path.clone(),
2094                format: AudioFormat::Flac,
2095            }],
2096        };
2097        let http = ScriptedHttp::new()
2098            .with_auth()
2099            .route(
2100                "/wav_file/",
2101                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/t.wav"}"#),
2102            )
2103            .route("t.wav", Reply::ok(b"wav".to_vec()));
2104        let fs = MemFs::new();
2105        let mut manifest = Manifest::new();
2106
2107        let outcome = run(
2108            &plan,
2109            &mut manifest,
2110            &[d],
2111            &http,
2112            &fs,
2113            &StubFfmpeg::failing(),
2114            &RecordingClock::new(),
2115            &ExecOptions::default(),
2116        );
2117
2118        assert_eq!(outcome.downloaded, 0);
2119        assert_eq!(outcome.failed(), 1);
2120        assert!(!fs.exists("t.flac"));
2121        assert!(manifest.get("t").is_none());
2122    }
2123
2124    // ── Cover fallback ──────────────────────────────────────────────
2125
2126    #[test]
2127    fn cover_falls_back_when_large_image_is_missing() {
2128        let c = art_clip("e");
2129        let d = desired(c.clone(), AudioFormat::Mp3);
2130        let plan = Plan {
2131            actions: vec![Action::Download {
2132                clip: c.clone(),
2133                lineage: LineageContext::own_root(&c),
2134                path: d.path.clone(),
2135                format: AudioFormat::Mp3,
2136            }],
2137        };
2138        let http = ScriptedHttp::new()
2139            .route("e.mp3", Reply::ok(b"body".to_vec()))
2140            .route("e/large.jpg", Reply::status(404))
2141            .route("e/small.jpg", Reply::ok(b"the-art".to_vec()));
2142        let fs = MemFs::new();
2143        let mut manifest = Manifest::new();
2144
2145        let outcome = run(
2146            &plan,
2147            &mut manifest,
2148            &[d],
2149            &http,
2150            &fs,
2151            &StubFfmpeg::flac(),
2152            &RecordingClock::new(),
2153            &ExecOptions::default(),
2154        );
2155
2156        assert_eq!(outcome.downloaded, 1);
2157        let calls = http.calls();
2158        let large = calls
2159            .iter()
2160            .position(|u| u.contains("e/large.jpg"))
2161            .unwrap();
2162        let small = calls
2163            .iter()
2164            .position(|u| u.contains("e/small.jpg"))
2165            .unwrap();
2166        assert!(large < small, "large art tried before small");
2167    }
2168
2169    // ── Cover reuse: embed + sidecar share one fetch (#89) ──────────
2170
2171    #[test]
2172    fn download_reuses_the_embedded_cover_for_the_jpg_sidecar() {
2173        // The embedded tag and the `.jpg` sidecar want the same cover URL; it is
2174        // fetched once and the bytes serve both.
2175        let c = art_clip("a");
2176        let d = desired(c.clone(), AudioFormat::Mp3);
2177        let plan = Plan {
2178            actions: vec![
2179                Action::Download {
2180                    clip: c.clone(),
2181                    lineage: LineageContext::own_root(&c),
2182                    path: d.path.clone(),
2183                    format: AudioFormat::Mp3,
2184                },
2185                Action::WriteArtifact {
2186                    kind: ArtifactKind::CoverJpg,
2187                    path: "a/cover.jpg".to_owned(),
2188                    source_url: c.selected_image_url().unwrap().to_owned(),
2189                    hash: "art".to_owned(),
2190                    owner_id: "a".to_owned(),
2191                    content: None,
2192                },
2193            ],
2194        };
2195        let http = ScriptedHttp::new()
2196            .route("a.mp3", Reply::ok(b"mp3-body".to_vec()))
2197            .route("a/large.jpg", Reply::ok(b"the-art".to_vec()));
2198        let fs = MemFs::new();
2199        let mut manifest = Manifest::new();
2200
2201        let outcome = run(
2202            &plan,
2203            &mut manifest,
2204            &[d],
2205            &http,
2206            &fs,
2207            &StubFfmpeg::flac(),
2208            &RecordingClock::new(),
2209            &ExecOptions::default(),
2210        );
2211
2212        assert_eq!(outcome.downloaded, 1);
2213        assert_eq!(outcome.artifacts_written, 1);
2214        assert_eq!(outcome.failed(), 0);
2215        // Fetched once, not twice.
2216        assert_eq!(http.count("a/large.jpg"), 1);
2217        // The sidecar carries the fetched bytes, and the audio was tagged.
2218        assert_eq!(fs.read_file("a/cover.jpg").unwrap(), b"the-art");
2219        assert_eq!(&fs.read_file("a.mp3").unwrap()[..3], b"ID3");
2220    }
2221
2222    #[test]
2223    fn concurrent_downloads_reuse_each_clips_own_cover() {
2224        // Two clips render concurrently; each `.jpg` sidecar gets its own cover
2225        // (no cross-contamination) and each cover URL is fetched exactly once.
2226        let a = art_clip("a");
2227        let b = art_clip("b");
2228        let da = desired(a.clone(), AudioFormat::Mp3);
2229        let db = desired(b.clone(), AudioFormat::Mp3);
2230        let plan = Plan {
2231            actions: vec![
2232                Action::Download {
2233                    clip: a.clone(),
2234                    lineage: LineageContext::own_root(&a),
2235                    path: da.path.clone(),
2236                    format: AudioFormat::Mp3,
2237                },
2238                Action::WriteArtifact {
2239                    kind: ArtifactKind::CoverJpg,
2240                    path: "a/cover.jpg".to_owned(),
2241                    source_url: a.selected_image_url().unwrap().to_owned(),
2242                    hash: "art".to_owned(),
2243                    owner_id: "a".to_owned(),
2244                    content: None,
2245                },
2246                Action::Download {
2247                    clip: b.clone(),
2248                    lineage: LineageContext::own_root(&b),
2249                    path: db.path.clone(),
2250                    format: AudioFormat::Mp3,
2251                },
2252                Action::WriteArtifact {
2253                    kind: ArtifactKind::CoverJpg,
2254                    path: "b/cover.jpg".to_owned(),
2255                    source_url: b.selected_image_url().unwrap().to_owned(),
2256                    hash: "art".to_owned(),
2257                    owner_id: "b".to_owned(),
2258                    content: None,
2259                },
2260            ],
2261        };
2262        let http = ScriptedHttp::new()
2263            .route("a.mp3", Reply::ok(b"a-mp3".to_vec()))
2264            .route("b.mp3", Reply::ok(b"b-mp3".to_vec()))
2265            .route("a/large.jpg", Reply::ok(b"art-a".to_vec()))
2266            .route("b/large.jpg", Reply::ok(b"art-b".to_vec()));
2267        let fs = MemFs::new();
2268        let mut manifest = Manifest::new();
2269
2270        let outcome = run(
2271            &plan,
2272            &mut manifest,
2273            &[da, db],
2274            &http,
2275            &fs,
2276            &StubFfmpeg::flac(),
2277            &RecordingClock::new(),
2278            &small_poll(),
2279        );
2280
2281        assert_eq!(outcome.downloaded, 2);
2282        assert_eq!(outcome.artifacts_written, 2);
2283        assert_eq!(http.count("a/large.jpg"), 1);
2284        assert_eq!(http.count("b/large.jpg"), 1);
2285        assert_eq!(fs.read_file("a/cover.jpg").unwrap(), b"art-a");
2286        assert_eq!(fs.read_file("b/cover.jpg").unwrap(), b"art-b");
2287    }
2288
2289    #[test]
2290    fn cover_sidecar_refetches_when_embed_fell_back_to_another_url() {
2291        // The large image 404s so the embed falls back to the small image; the
2292        // sidecar still wants the (dead) large URL and must NOT be handed the
2293        // small bytes. Reuse is keyed on the exact URL, so nothing is cached and
2294        // the sidecar fetches the large URL itself (then fails on the 404).
2295        let c = art_clip("e");
2296        let d = desired(c.clone(), AudioFormat::Mp3);
2297        let plan = Plan {
2298            actions: vec![
2299                Action::Download {
2300                    clip: c.clone(),
2301                    lineage: LineageContext::own_root(&c),
2302                    path: d.path.clone(),
2303                    format: AudioFormat::Mp3,
2304                },
2305                Action::WriteArtifact {
2306                    kind: ArtifactKind::CoverJpg,
2307                    path: "e/cover.jpg".to_owned(),
2308                    source_url: "https://art.suno.ai/e/large.jpg".to_owned(),
2309                    hash: "art".to_owned(),
2310                    owner_id: "e".to_owned(),
2311                    content: None,
2312                },
2313            ],
2314        };
2315        let http = ScriptedHttp::new()
2316            .route("e.mp3", Reply::ok(b"body".to_vec()))
2317            .route("e/large.jpg", Reply::status(404))
2318            .route("e/small.jpg", Reply::ok(b"small-art".to_vec()));
2319        let fs = MemFs::new();
2320        let mut manifest = Manifest::new();
2321
2322        let outcome = run(
2323            &plan,
2324            &mut manifest,
2325            &[d],
2326            &http,
2327            &fs,
2328            &StubFfmpeg::flac(),
2329            &RecordingClock::new(),
2330            &ExecOptions::default(),
2331        );
2332
2333        assert_eq!(outcome.downloaded, 1);
2334        // The small image was fetched once (the embed fallback) and never reused
2335        // for the large-keyed sidecar; the sidecar went to the network itself.
2336        assert_eq!(http.count("e/small.jpg"), 1);
2337        assert!(
2338            http.count("e/large.jpg") >= 2,
2339            "sidecar refetched the large URL"
2340        );
2341        assert_eq!(manifest.get("e").unwrap().cover_jpg, None);
2342        assert!(!fs.exists("e/cover.jpg"));
2343    }
2344
2345    // ── Atomic write and size verification (SYNC-13/14) ─────────────
2346
2347    #[test]
2348    fn failed_write_leaves_the_prior_file_intact() {
2349        let c = clip("f");
2350        let d = desired(c.clone(), AudioFormat::Mp3);
2351        let plan = Plan {
2352            actions: vec![Action::Download {
2353                clip: c.clone(),
2354                lineage: LineageContext::own_root(&c),
2355                path: d.path.clone(),
2356                format: AudioFormat::Mp3,
2357            }],
2358        };
2359        let http = ScriptedHttp::new().route("f.mp3", Reply::ok(b"new-body".to_vec()));
2360        let fs = MemFs::new()
2361            .with_file("f.mp3", b"OLD-CONTENT".to_vec())
2362            .fail_write("f.mp3");
2363        let mut manifest = Manifest::new();
2364
2365        let outcome = run(
2366            &plan,
2367            &mut manifest,
2368            &[d],
2369            &http,
2370            &fs,
2371            &StubFfmpeg::flac(),
2372            &RecordingClock::new(),
2373            &ExecOptions::default(),
2374        );
2375
2376        assert_eq!(outcome.downloaded, 0);
2377        assert_eq!(outcome.failed(), 1);
2378        assert_eq!(fs.read_file("f.mp3").unwrap(), b"OLD-CONTENT");
2379        assert!(manifest.get("f").is_none());
2380    }
2381
2382    #[test]
2383    fn size_mismatch_after_write_is_a_failure() {
2384        let c = clip("g");
2385        let d = desired(c.clone(), AudioFormat::Mp3);
2386        let plan = Plan {
2387            actions: vec![Action::Download {
2388                clip: c.clone(),
2389                lineage: LineageContext::own_root(&c),
2390                path: d.path.clone(),
2391                format: AudioFormat::Mp3,
2392            }],
2393        };
2394        let http = ScriptedHttp::new().route("g.mp3", Reply::ok(b"body".to_vec()));
2395        let fs = MemFs::new().corrupt_write("g.mp3");
2396        let mut manifest = Manifest::new();
2397
2398        let outcome = run(
2399            &plan,
2400            &mut manifest,
2401            &[d],
2402            &http,
2403            &fs,
2404            &StubFfmpeg::flac(),
2405            &RecordingClock::new(),
2406            &ExecOptions::default(),
2407        );
2408
2409        assert_eq!(outcome.downloaded, 0);
2410        assert_eq!(outcome.failed(), 1);
2411        assert!(outcome.failures[0].reason.contains("expected"));
2412        assert!(manifest.get("g").is_none());
2413    }
2414
2415    // ── Reliability policy (SYNC-16/17) ─────────────────────────────
2416
2417    #[test]
2418    fn transient_failure_is_retried_then_skipped() {
2419        let c = clip("h");
2420        let d = desired(c.clone(), AudioFormat::Mp3);
2421        let plan = Plan {
2422            actions: vec![Action::Download {
2423                clip: c.clone(),
2424                lineage: LineageContext::own_root(&c),
2425                path: d.path.clone(),
2426                format: AudioFormat::Mp3,
2427            }],
2428        };
2429        let http = ScriptedHttp::new().route("h.mp3", Reply::status(500));
2430        let fs = MemFs::new();
2431        let clock = RecordingClock::new();
2432        let opts = ExecOptions {
2433            max_retries: 2,
2434            ..ExecOptions::default()
2435        };
2436        let mut manifest = Manifest::new();
2437
2438        let outcome = run(
2439            &plan,
2440            &mut manifest,
2441            &[d],
2442            &http,
2443            &fs,
2444            &StubFfmpeg::flac(),
2445            &clock,
2446            &opts,
2447        );
2448
2449        assert_eq!(outcome.downloaded, 0);
2450        assert_eq!(outcome.failed(), 1);
2451        assert_eq!(http.count("h.mp3"), 3);
2452        assert_eq!(clock.sleeps().len(), 2);
2453    }
2454
2455    #[test]
2456    fn truncated_download_is_retried_then_succeeds() {
2457        let c = clip("i");
2458        let d = desired(c.clone(), AudioFormat::Mp3);
2459        let plan = Plan {
2460            actions: vec![Action::Download {
2461                clip: c.clone(),
2462                lineage: LineageContext::own_root(&c),
2463                path: d.path.clone(),
2464                format: AudioFormat::Mp3,
2465            }],
2466        };
2467        let http = ScriptedHttp::new().route_seq(
2468            "i.mp3",
2469            vec![
2470                Reply::ok(b"short".to_vec()).with_content_length(999),
2471                Reply::ok(b"good-body".to_vec()),
2472            ],
2473        );
2474        let fs = MemFs::new();
2475        let clock = RecordingClock::new();
2476        let mut manifest = Manifest::new();
2477
2478        let outcome = run(
2479            &plan,
2480            &mut manifest,
2481            &[d],
2482            &http,
2483            &fs,
2484            &StubFfmpeg::flac(),
2485            &clock,
2486            &ExecOptions::default(),
2487        );
2488
2489        assert_eq!(outcome.downloaded, 1);
2490        assert_eq!(http.count("i.mp3"), 2);
2491        assert_eq!(clock.sleeps().len(), 1);
2492    }
2493
2494    #[test]
2495    fn rate_limit_backs_off_using_retry_after() {
2496        let c = clip("j");
2497        let d = desired(c.clone(), AudioFormat::Mp3);
2498        let plan = Plan {
2499            actions: vec![Action::Download {
2500                clip: c.clone(),
2501                lineage: LineageContext::own_root(&c),
2502                path: d.path.clone(),
2503                format: AudioFormat::Mp3,
2504            }],
2505        };
2506        let http = ScriptedHttp::new().route_seq(
2507            "j.mp3",
2508            vec![
2509                Reply::status(429).with_retry_after(7),
2510                Reply::ok(b"body".to_vec()),
2511            ],
2512        );
2513        let fs = MemFs::new();
2514        let clock = RecordingClock::new();
2515        let mut manifest = Manifest::new();
2516
2517        let outcome = run(
2518            &plan,
2519            &mut manifest,
2520            &[d],
2521            &http,
2522            &fs,
2523            &StubFfmpeg::flac(),
2524            &clock,
2525            &ExecOptions::default(),
2526        );
2527
2528        assert_eq!(outcome.downloaded, 1);
2529        assert_eq!(clock.sleeps(), vec![Duration::from_secs(7)]);
2530    }
2531
2532    #[test]
2533    fn auth_failure_aborts_the_run() {
2534        let c1 = clip("k1");
2535        let c2 = clip("k2");
2536        let d1 = desired(c1.clone(), AudioFormat::Flac);
2537        let d2 = desired(c2.clone(), AudioFormat::Flac);
2538        let plan = Plan {
2539            actions: vec![
2540                Action::Download {
2541                    clip: c1.clone(),
2542                    lineage: LineageContext::own_root(&c1),
2543                    path: d1.path.clone(),
2544                    format: AudioFormat::Flac,
2545                },
2546                Action::Download {
2547                    clip: c2.clone(),
2548                    lineage: LineageContext::own_root(&c2),
2549                    path: d2.path.clone(),
2550                    format: AudioFormat::Flac,
2551                },
2552            ],
2553        };
2554        // The authenticated WAV-render endpoint rejects auth even after a JWT
2555        // refresh: that is a bad token, so the whole run aborts rather than
2556        // hammering every clip. A CDN media rejection, by contrast, does not.
2557        let http = ScriptedHttp::new()
2558            .with_auth()
2559            .route("/wav_file/", Reply::status(401));
2560        let fs = MemFs::new();
2561        let mut manifest = Manifest::new();
2562
2563        let outcome = run(
2564            &plan,
2565            &mut manifest,
2566            &[d1, d2],
2567            &http,
2568            &fs,
2569            &StubFfmpeg::flac(),
2570            &RecordingClock::new(),
2571            &small_poll(),
2572        );
2573
2574        assert_eq!(outcome.status, RunStatus::AuthAborted);
2575        assert_eq!(outcome.failed(), 1);
2576        assert_eq!(outcome.failures[0].clip_id, "k1");
2577        assert_eq!(outcome.downloaded, 0);
2578    }
2579
2580    // ── Disk-full aborts the run (issue #17) ────────────────────────
2581
2582    #[test]
2583    fn disk_full_primary_write_aborts_the_run() {
2584        // Two MP3 downloads; the first write is out of space. That is systemic,
2585        // so the run aborts before the second is even attempted: exactly one
2586        // failure is recorded and its reason names the disk-full cause.
2587        let c1 = clip("d1");
2588        let c2 = clip("d2");
2589        let d1 = desired(c1.clone(), AudioFormat::Mp3);
2590        let d2 = desired(c2.clone(), AudioFormat::Mp3);
2591        let plan = Plan {
2592            actions: vec![
2593                Action::Download {
2594                    clip: c1.clone(),
2595                    lineage: LineageContext::own_root(&c1),
2596                    path: d1.path.clone(),
2597                    format: AudioFormat::Mp3,
2598                },
2599                Action::Download {
2600                    clip: c2.clone(),
2601                    lineage: LineageContext::own_root(&c2),
2602                    path: d2.path.clone(),
2603                    format: AudioFormat::Mp3,
2604                },
2605            ],
2606        };
2607        let http = ScriptedHttp::new()
2608            .route("d1.mp3", Reply::ok(b"body-1".to_vec()))
2609            .route("d2.mp3", Reply::ok(b"body-2".to_vec()));
2610        let fs = MemFs::new().fail_write_out_of_space("d1.mp3");
2611        let mut manifest = Manifest::new();
2612
2613        let outcome = run(
2614            &plan,
2615            &mut manifest,
2616            &[d1, d2],
2617            &http,
2618            &fs,
2619            &StubFfmpeg::flac(),
2620            &RecordingClock::new(),
2621            &ExecOptions::default(),
2622        );
2623
2624        assert_eq!(outcome.status, RunStatus::DiskFull);
2625        assert_eq!(outcome.failed(), 1);
2626        assert_eq!(outcome.failures[0].clip_id, "d1");
2627        assert!(outcome.failures[0].reason.contains("disk full"));
2628        assert_eq!(outcome.downloaded, 0);
2629        // The second clip was never fetched: the run aborted first.
2630        assert_eq!(http.count("d2.mp3"), 0);
2631        assert!(!fs.exists("d2.mp3"));
2632    }
2633
2634    #[test]
2635    fn disk_full_flac_transcode_aborts_the_run() {
2636        // The scratch disk fills during the FLAC re-encode; a WAV rendered, but
2637        // there is nowhere to stage the transcode, so the run aborts.
2638        let c1 = clip("d1");
2639        let c2 = clip("d2");
2640        let d1 = desired(c1.clone(), AudioFormat::Flac);
2641        let d2 = desired(c2.clone(), AudioFormat::Flac);
2642        let plan = Plan {
2643            actions: vec![
2644                Action::Download {
2645                    clip: c1.clone(),
2646                    lineage: LineageContext::own_root(&c1),
2647                    path: d1.path.clone(),
2648                    format: AudioFormat::Flac,
2649                },
2650                Action::Download {
2651                    clip: c2.clone(),
2652                    lineage: LineageContext::own_root(&c2),
2653                    path: d2.path.clone(),
2654                    format: AudioFormat::Flac,
2655                },
2656            ],
2657        };
2658        let http = ScriptedHttp::new()
2659            .with_auth()
2660            .route(
2661                "/wav_file/",
2662                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/d1.wav"}"#),
2663            )
2664            .route(".wav", Reply::ok(b"wav".to_vec()));
2665        let fs = MemFs::new();
2666        let mut manifest = Manifest::new();
2667
2668        let outcome = run(
2669            &plan,
2670            &mut manifest,
2671            &[d1, d2],
2672            &http,
2673            &fs,
2674            &StubFfmpeg::out_of_space(),
2675            &RecordingClock::new(),
2676            &ExecOptions::default(),
2677        );
2678
2679        assert_eq!(outcome.status, RunStatus::DiskFull);
2680        assert_eq!(outcome.failed(), 1);
2681        assert_eq!(outcome.failures[0].clip_id, "d1");
2682        assert!(outcome.failures[0].reason.contains("disk full"));
2683        assert_eq!(outcome.downloaded, 0);
2684    }
2685
2686    #[test]
2687    fn disk_full_artifact_write_aborts_the_run() {
2688        // A sidecar write (not a primary download) also aborts on a full disk:
2689        // the owning audio is present, the cover fetch succeeds, but the sidecar
2690        // cannot be written.
2691        let mut manifest = Manifest::new();
2692        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
2693        let plan = Plan {
2694            actions: vec![Action::WriteArtifact {
2695                kind: ArtifactKind::CoverJpg,
2696                path: "a/cover.jpg".to_owned(),
2697                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
2698                hash: "h1".to_owned(),
2699                owner_id: "a".to_owned(),
2700                content: None,
2701            }],
2702        };
2703        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"jpg-bytes".to_vec()));
2704        let fs = MemFs::new().fail_write_out_of_space("a/cover.jpg");
2705
2706        let outcome = run(
2707            &plan,
2708            &mut manifest,
2709            &[],
2710            &http,
2711            &fs,
2712            &StubFfmpeg::flac(),
2713            &RecordingClock::new(),
2714            &ExecOptions::default(),
2715        );
2716
2717        assert_eq!(outcome.status, RunStatus::DiskFull);
2718        assert_eq!(outcome.failed(), 1);
2719        assert!(outcome.failures[0].reason.contains("disk full"));
2720        assert_eq!(outcome.artifacts_written, 0);
2721        // The sidecar slot was never recorded: the write failed before it.
2722        assert_eq!(manifest.get("a").unwrap().cover_jpg, None);
2723    }
2724
2725    #[test]
2726    fn disk_full_leaves_the_failed_clips_manifest_entry_unchanged() {
2727        // write_verify fails before any manifest insert, so a re-download that
2728        // hits a full disk leaves the prior entry (and file) exactly as it was.
2729        let c = clip("m");
2730        let d = desired(c.clone(), AudioFormat::Mp3);
2731        let plan = Plan {
2732            actions: vec![Action::Download {
2733                clip: c.clone(),
2734                lineage: LineageContext::own_root(&c),
2735                path: d.path.clone(),
2736                format: AudioFormat::Mp3,
2737            }],
2738        };
2739        let http = ScriptedHttp::new().route("m.mp3", Reply::ok(b"new-body".to_vec()));
2740        let fs = MemFs::new()
2741            .with_file("m.mp3", b"OLD-CONTENT".to_vec())
2742            .fail_write_out_of_space("m.mp3");
2743        let mut manifest = Manifest::new();
2744        let before = entry("m.mp3", AudioFormat::Mp3);
2745        manifest.insert("m", before.clone());
2746
2747        let outcome = run(
2748            &plan,
2749            &mut manifest,
2750            &[d],
2751            &http,
2752            &fs,
2753            &StubFfmpeg::flac(),
2754            &RecordingClock::new(),
2755            &ExecOptions::default(),
2756        );
2757
2758        assert_eq!(outcome.status, RunStatus::DiskFull);
2759        assert_eq!(manifest.get("m"), Some(&before));
2760        assert_eq!(fs.read_file("m.mp3").unwrap(), b"OLD-CONTENT");
2761    }
2762
2763    #[test]
2764    fn cdn_download_rejection_skips_the_clip_without_aborting() {
2765        let c1 = clip("k1");
2766        let c2 = clip("k2");
2767        let d1 = desired(c1.clone(), AudioFormat::Mp3);
2768        let d2 = desired(c2.clone(), AudioFormat::Mp3);
2769        let plan = Plan {
2770            actions: vec![
2771                Action::Download {
2772                    clip: c1.clone(),
2773                    lineage: LineageContext::own_root(&c1),
2774                    path: d1.path.clone(),
2775                    format: AudioFormat::Mp3,
2776                },
2777                Action::Download {
2778                    clip: c2.clone(),
2779                    lineage: LineageContext::own_root(&c2),
2780                    path: d2.path.clone(),
2781                    format: AudioFormat::Mp3,
2782                },
2783            ],
2784        };
2785        // A CDN media fetch is unauthenticated, so a 403 is a per-asset
2786        // rejection (often transient), not a bad token: the clip is retried
2787        // then recorded and skipped, and the run carries on to the rest.
2788        let http = ScriptedHttp::new()
2789            .route("k1.mp3", Reply::status(403))
2790            .route("k2.mp3", Reply::ok(b"body".to_vec()));
2791        let fs = MemFs::new();
2792        let mut manifest = Manifest::new();
2793
2794        let outcome = run(
2795            &plan,
2796            &mut manifest,
2797            &[d1, d2],
2798            &http,
2799            &fs,
2800            &StubFfmpeg::flac(),
2801            &RecordingClock::new(),
2802            &ExecOptions::default(),
2803        );
2804
2805        assert_ne!(outcome.status, RunStatus::AuthAborted);
2806        assert_eq!(outcome.downloaded, 1);
2807        assert_eq!(outcome.failed(), 1);
2808        assert_eq!(outcome.failures[0].clip_id, "k1");
2809    }
2810
2811    #[test]
2812    fn one_clip_failure_does_not_abort_the_run() {
2813        let c1 = clip("l1");
2814        let c2 = clip("l2");
2815        let d1 = desired(c1.clone(), AudioFormat::Mp3);
2816        let d2 = desired(c2.clone(), AudioFormat::Mp3);
2817        let plan = Plan {
2818            actions: vec![
2819                Action::Download {
2820                    clip: c1.clone(),
2821                    lineage: LineageContext::own_root(&c1),
2822                    path: d1.path.clone(),
2823                    format: AudioFormat::Mp3,
2824                },
2825                Action::Download {
2826                    clip: c2.clone(),
2827                    lineage: LineageContext::own_root(&c2),
2828                    path: d2.path.clone(),
2829                    format: AudioFormat::Mp3,
2830                },
2831            ],
2832        };
2833        let http = ScriptedHttp::new()
2834            .route("l1.mp3", Reply::status(404))
2835            .route("l2.mp3", Reply::ok(b"body".to_vec()));
2836        let fs = MemFs::new();
2837        let mut manifest = Manifest::new();
2838
2839        let outcome = run(
2840            &plan,
2841            &mut manifest,
2842            &[d1, d2],
2843            &http,
2844            &fs,
2845            &StubFfmpeg::flac(),
2846            &RecordingClock::new(),
2847            &ExecOptions::default(),
2848        );
2849
2850        assert_eq!(outcome.status, RunStatus::Completed);
2851        assert_eq!(outcome.downloaded, 1);
2852        assert_eq!(outcome.failed(), 1);
2853        assert_eq!(outcome.failures[0].clip_id, "l1");
2854        assert!(fs.exists("l2.mp3"));
2855        assert!(manifest.get("l2").is_some());
2856        assert!(manifest.get("l1").is_none());
2857    }
2858
2859    // ── preserve marker (SYNC-8) ────────────────────────────────────
2860
2861    #[test]
2862    fn preserve_is_set_for_copy_held_and_private_clips() {
2863        let mut mirror = desired(clip("m1"), AudioFormat::Mp3);
2864        mirror.modes = vec![SourceMode::Mirror];
2865        let mut copy_held = desired(clip("m2"), AudioFormat::Mp3);
2866        copy_held.modes = vec![SourceMode::Mirror, SourceMode::Copy];
2867        let mut private = desired(clip("m3"), AudioFormat::Mp3);
2868        private.private = true;
2869
2870        let plan = Plan {
2871            actions: vec![
2872                Action::Download {
2873                    clip: mirror.clip.clone(),
2874                    lineage: LineageContext::own_root(&mirror.clip),
2875                    path: mirror.path.clone(),
2876                    format: AudioFormat::Mp3,
2877                },
2878                Action::Download {
2879                    clip: copy_held.clip.clone(),
2880                    lineage: LineageContext::own_root(&copy_held.clip),
2881                    path: copy_held.path.clone(),
2882                    format: AudioFormat::Mp3,
2883                },
2884                Action::Download {
2885                    clip: private.clip.clone(),
2886                    lineage: LineageContext::own_root(&private.clip),
2887                    path: private.path.clone(),
2888                    format: AudioFormat::Mp3,
2889                },
2890            ],
2891        };
2892        let http = ScriptedHttp::new()
2893            .route("m1.mp3", Reply::ok(b"a".to_vec()))
2894            .route("m2.mp3", Reply::ok(b"b".to_vec()))
2895            .route("m3.mp3", Reply::ok(b"c".to_vec()));
2896        let fs = MemFs::new();
2897        let mut manifest = Manifest::new();
2898
2899        let outcome = run(
2900            &plan,
2901            &mut manifest,
2902            &[mirror, copy_held, private],
2903            &http,
2904            &fs,
2905            &StubFfmpeg::flac(),
2906            &RecordingClock::new(),
2907            &ExecOptions::default(),
2908        );
2909
2910        assert_eq!(outcome.downloaded, 3);
2911        assert!(!manifest.get("m1").unwrap().preserve);
2912        assert!(manifest.get("m2").unwrap().preserve);
2913        assert!(manifest.get("m3").unwrap().preserve);
2914    }
2915
2916    // ── Reformat / Retag / Rename / Delete / Skip ───────────────────
2917
2918    #[test]
2919    fn reformat_writes_new_format_and_removes_old_file() {
2920        let c = clip("n");
2921        let d = desired(c.clone(), AudioFormat::Mp3);
2922        let plan = Plan {
2923            actions: vec![Action::Reformat {
2924                clip: c.clone(),
2925                path: "n.mp3".to_owned(),
2926                from_path: "n.flac".to_owned(),
2927                from: AudioFormat::Flac,
2928                to: AudioFormat::Mp3,
2929            }],
2930        };
2931        let http = ScriptedHttp::new().route("n.mp3", Reply::ok(b"body".to_vec()));
2932        let fs = MemFs::new().with_file("n.flac", b"OLD-FLAC".to_vec());
2933        let mut manifest = Manifest::new();
2934        manifest.insert("n", entry("n.flac", AudioFormat::Flac));
2935
2936        let outcome = run(
2937            &plan,
2938            &mut manifest,
2939            &[d],
2940            &http,
2941            &fs,
2942            &StubFfmpeg::flac(),
2943            &RecordingClock::new(),
2944            &ExecOptions::default(),
2945        );
2946
2947        assert_eq!(outcome.reformatted, 1);
2948        assert!(fs.exists("n.mp3"));
2949        assert!(!fs.exists("n.flac"));
2950        let updated = manifest.get("n").unwrap();
2951        assert_eq!(updated.path, "n.mp3");
2952        assert_eq!(updated.format, AudioFormat::Mp3);
2953        assert_eq!(updated.meta_hash, "m");
2954    }
2955
2956    #[test]
2957    fn retag_rewrites_file_and_updates_hashes() {
2958        let c = clip("o");
2959        let mut d = desired(c.clone(), AudioFormat::Mp3);
2960        d.meta_hash = "new".to_owned();
2961        d.art_hash = "new-art".to_owned();
2962        let existing = tag_mp3(
2963            b"audio",
2964            &TrackMetadata::from_clip(&c, &LineageContext::own_root(&c)),
2965            None,
2966            None,
2967        )
2968        .unwrap();
2969        let fs = MemFs::new().with_file("o.mp3", existing.clone());
2970        let mut manifest = Manifest::new();
2971        let mut start = entry("o.mp3", AudioFormat::Mp3);
2972        start.size = existing.len() as u64;
2973        manifest.insert("o", start);
2974        let plan = Plan {
2975            actions: vec![Action::Retag {
2976                clip: c.clone(),
2977                lineage: LineageContext::own_root(&c),
2978                path: "o.mp3".to_owned(),
2979            }],
2980        };
2981
2982        let outcome = run(
2983            &plan,
2984            &mut manifest,
2985            &[d],
2986            &ScriptedHttp::new(),
2987            &fs,
2988            &StubFfmpeg::flac(),
2989            &RecordingClock::new(),
2990            &ExecOptions::default(),
2991        );
2992
2993        assert_eq!(outcome.retagged, 1);
2994        let updated = manifest.get("o").unwrap();
2995        assert_eq!(updated.meta_hash, "new");
2996        assert_eq!(updated.art_hash, "new-art");
2997        assert_eq!(&fs.read_file("o.mp3").unwrap()[..3], b"ID3");
2998    }
2999
3000    #[test]
3001    fn rename_moves_file_and_updates_manifest_path() {
3002        let c = clip("p");
3003        let mut d = desired(c.clone(), AudioFormat::Mp3);
3004        d.path = "new/p.mp3".to_owned();
3005        let fs = MemFs::new().with_file("old/p.mp3", b"DATA".to_vec());
3006        let mut manifest = Manifest::new();
3007        manifest.insert("p", entry("old/p.mp3", AudioFormat::Mp3));
3008        let plan = Plan {
3009            actions: vec![Action::Rename {
3010                from: "old/p.mp3".to_owned(),
3011                to: "new/p.mp3".to_owned(),
3012            }],
3013        };
3014
3015        let outcome = run(
3016            &plan,
3017            &mut manifest,
3018            &[d],
3019            &ScriptedHttp::new(),
3020            &fs,
3021            &StubFfmpeg::flac(),
3022            &RecordingClock::new(),
3023            &ExecOptions::default(),
3024        );
3025
3026        assert_eq!(outcome.renamed, 1);
3027        assert!(fs.exists("new/p.mp3"));
3028        assert!(!fs.exists("old/p.mp3"));
3029        assert_eq!(manifest.get("p").unwrap().path, "new/p.mp3");
3030    }
3031
3032    #[test]
3033    fn disk_full_rename_aborts_the_run() {
3034        // A move onto a full disk is systemic like a full-disk write: the run
3035        // aborts with DiskFull and the source file is left untouched.
3036        let c = clip("p");
3037        let mut d = desired(c.clone(), AudioFormat::Mp3);
3038        d.path = "new/p.mp3".to_owned();
3039        let fs = MemFs::new()
3040            .with_file("old/p.mp3", b"DATA".to_vec())
3041            .fail_rename_out_of_space("new/p.mp3");
3042        let mut manifest = Manifest::new();
3043        manifest.insert("p", entry("old/p.mp3", AudioFormat::Mp3));
3044        let plan = Plan {
3045            actions: vec![Action::Rename {
3046                from: "old/p.mp3".to_owned(),
3047                to: "new/p.mp3".to_owned(),
3048            }],
3049        };
3050
3051        let outcome = run(
3052            &plan,
3053            &mut manifest,
3054            &[d],
3055            &ScriptedHttp::new(),
3056            &fs,
3057            &StubFfmpeg::flac(),
3058            &RecordingClock::new(),
3059            &ExecOptions::default(),
3060        );
3061
3062        assert_eq!(outcome.status, RunStatus::DiskFull);
3063        assert_eq!(outcome.renamed, 0);
3064        assert_eq!(outcome.failed(), 1);
3065        assert!(outcome.failures[0].reason.contains("disk full"));
3066        // The source is untouched: the move never happened.
3067        assert!(fs.exists("old/p.mp3"));
3068        assert!(!fs.exists("new/p.mp3"));
3069        assert_eq!(manifest.get("p").unwrap().path, "old/p.mp3");
3070    }
3071
3072    #[test]
3073    fn delete_removes_file_and_manifest_entry() {
3074        let fs = MemFs::new().with_file("q.mp3", b"DATA".to_vec());
3075        let mut manifest = Manifest::new();
3076        manifest.insert("q", entry("q.mp3", AudioFormat::Mp3));
3077        let plan = Plan {
3078            actions: vec![Action::Delete {
3079                path: "q.mp3".to_owned(),
3080                clip_id: "q".to_owned(),
3081            }],
3082        };
3083
3084        let outcome = run(
3085            &plan,
3086            &mut manifest,
3087            &[],
3088            &ScriptedHttp::new(),
3089            &fs,
3090            &StubFfmpeg::flac(),
3091            &RecordingClock::new(),
3092            &ExecOptions::default(),
3093        );
3094
3095        assert_eq!(outcome.deleted, 1);
3096        assert!(!fs.exists("q.mp3"));
3097        assert!(manifest.get("q").is_none());
3098    }
3099
3100    #[test]
3101    fn failed_delete_keeps_the_manifest_entry() {
3102        let fs = MemFs::new()
3103            .with_file("s.mp3", b"DATA".to_vec())
3104            .fail_remove("s.mp3");
3105        let mut manifest = Manifest::new();
3106        manifest.insert("s", entry("s.mp3", AudioFormat::Mp3));
3107        let plan = Plan {
3108            actions: vec![Action::Delete {
3109                path: "s.mp3".to_owned(),
3110                clip_id: "s".to_owned(),
3111            }],
3112        };
3113
3114        let outcome = run(
3115            &plan,
3116            &mut manifest,
3117            &[],
3118            &ScriptedHttp::new(),
3119            &fs,
3120            &StubFfmpeg::flac(),
3121            &RecordingClock::new(),
3122            &ExecOptions::default(),
3123        );
3124
3125        assert_eq!(outcome.deleted, 0);
3126        assert_eq!(outcome.failed(), 1);
3127        assert!(manifest.get("s").is_some());
3128        assert!(fs.exists("s.mp3"));
3129    }
3130
#[test]
fn skip_is_a_noop() {
    // A lone Skip action against an empty manifest and an empty filesystem.
    let skip_r = Action::Skip {
        clip_id: "r".to_owned(),
    };
    let plan = Plan {
        actions: vec![skip_r],
    };
    let mut manifest = Manifest::new();
    let http = ScriptedHttp::new();
    let disk = MemFs::new();
    let ffmpeg = StubFfmpeg::flac();
    let clock = RecordingClock::new();
    let options = ExecOptions::default();

    let result = run(
        &plan,
        &mut manifest,
        &[],
        &http,
        &disk,
        &ffmpeg,
        &clock,
        &options,
    );

    // The skip is counted and nothing is reported as failed.
    assert_eq!(result.skipped, 1);
    assert_eq!(result.failed(), 0);
}
3152
3153    // ── Pure helpers ────────────────────────────────────────────────
3154
#[test]
fn header_helpers_parse_or_ignore() {
    // A numeric Content-Length header is parsed into its value.
    let length_header = ("Content-Length".to_owned(), "42".to_owned());
    let with_length = HttpResponse {
        status: 200,
        headers: vec![length_header],
        body: Vec::new(),
    };
    assert_eq!(content_length(&with_length), Some(42));

    // A response without any headers yields no length at all.
    let headerless = HttpResponse {
        status: 200,
        headers: Vec::new(),
        body: Vec::new(),
    };
    assert_eq!(content_length(&headerless), None);
}
3171
#[test]
fn preserve_rule_covers_copy_and_private() {
    // The desired clip exactly as `desired` builds it is not preserved.
    let plain = desired(clip("x"), AudioFormat::Mp3);
    assert!(!preserve_for(&plain));

    // Switching the modes to Copy makes it preserved.
    let via_copy = {
        let mut variant = plain.clone();
        variant.modes = vec![SourceMode::Copy];
        variant
    };
    assert!(preserve_for(&via_copy));

    // Setting the private flag alone also makes it preserved.
    let via_private = {
        let mut variant = plain.clone();
        variant.private = true;
        variant
    };
    assert!(preserve_for(&via_private));
}
3183
3184    fn fs_new() -> MemFs {
3185        MemFs::new()
3186    }
3187
3188    // ── Skip refreshes the preserve marker (SYNC-8 cross-run) ────────
3189
#[test]
fn skip_sets_preserve_when_a_clip_becomes_copy_held() {
    // The desired clip is switched to Copy mode, while the existing manifest
    // entry starts out without the preserve marker.
    let mut copy_held = desired(clip("s1"), AudioFormat::Mp3);
    copy_held.modes = vec![SourceMode::Copy];

    let mut manifest = Manifest::new();
    manifest.insert("s1".to_owned(), entry("s1.mp3", AudioFormat::Mp3));
    assert!(!manifest.get("s1").unwrap().preserve);

    let skip_s1 = Action::Skip {
        clip_id: "s1".to_owned(),
    };
    let plan = Plan {
        actions: vec![skip_s1],
    };
    let http = ScriptedHttp::new();
    let disk = fs_new();
    let ffmpeg = StubFfmpeg::flac();
    let clock = RecordingClock::new();
    let options = ExecOptions::default();

    let result = run(
        &plan,
        &mut manifest,
        &[copy_held],
        &http,
        &disk,
        &ffmpeg,
        &clock,
        &options,
    );

    // The skip is counted and the entry now carries the preserve marker.
    assert_eq!(result.skipped, 1);
    assert!(
        manifest.get("s1").unwrap().preserve,
        "a copy-held skip must mark the entry preserved"
    );
}
3221
#[test]
fn skip_clears_stale_preserve_when_a_clip_returns_to_mirror_only() {
    // The manifest entry starts with preserve already set; the desired clip
    // is used exactly as `desired` builds it, with no overrides.
    let wanted = desired(clip("s2"), AudioFormat::Mp3);

    let mut manifest = Manifest::new();
    manifest.insert("s2".to_owned(), {
        let mut stale = entry("s2.mp3", AudioFormat::Mp3);
        stale.preserve = true;
        stale
    });

    let skip_s2 = Action::Skip {
        clip_id: "s2".to_owned(),
    };
    let plan = Plan {
        actions: vec![skip_s2],
    };
    let http = ScriptedHttp::new();
    let disk = fs_new();
    let ffmpeg = StubFfmpeg::flac();
    let clock = RecordingClock::new();
    let options = ExecOptions::default();

    run(
        &plan,
        &mut manifest,
        &[wanted],
        &http,
        &disk,
        &ffmpeg,
        &clock,
        &options,
    );

    // After the skip the stale marker must be gone.
    assert!(
        !manifest.get("s2").unwrap().preserve,
        "a mirror-only skip must clear a stale preserve marker"
    );
}
3252
#[test]
fn flac_render_retries_a_rate_limited_wav_lookup() {
    // Arrange: one FLAC download whose wav_file lookup answers 429 first and
    // then returns the rendered WAV url.
    let source = clip("rl");
    let wanted = desired(source.clone(), AudioFormat::Flac);
    let download = Action::Download {
        clip: source.clone(),
        lineage: LineageContext::own_root(&source),
        path: wanted.path.clone(),
        format: AudioFormat::Flac,
    };
    let plan = Plan {
        actions: vec![download],
    };

    let wav_lookup_replies = vec![
        Reply::status(429),
        Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/rl.wav"}"#),
    ];
    let http = ScriptedHttp::new()
        .with_auth()
        .route_seq("/wav_file/", wav_lookup_replies)
        .route("rl.wav", Reply::ok(b"wav".to_vec()));
    let mut manifest = Manifest::new();
    let clock = RecordingClock::new();
    let disk = fs_new();
    let ffmpeg = StubFfmpeg::flac();
    let options = small_poll();

    let result = run(
        &plan,
        &mut manifest,
        &[wanted],
        &http,
        &disk,
        &ffmpeg,
        &clock,
        &options,
    );

    assert_eq!(result.downloaded, 1);
    assert_eq!(result.failed(), 0);
    // The retry found the render ready, so convert_wav was never requested.
    assert_eq!(http.count("/convert_wav/"), 0);
    // Exactly one sleep of the 1s transient backoff; the 5s poll interval
    // was not used.
    assert_eq!(clock.sleeps(), vec![Duration::from_secs(1)]);
}
3296
3297    // ── Phase 6: artifact actions ───────────────────────────────────
3298
#[test]
fn write_artifact_fetches_writes_and_updates_manifest() {
    // Owner "a" already has a manifest entry (its audio was kept this run),
    // so WriteArtifact fetches the source, writes the sidecar, and records
    // it on that entry.
    let write_cover = Action::WriteArtifact {
        kind: ArtifactKind::CoverJpg,
        path: "a/cover.jpg".to_owned(),
        source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
        hash: "h1".to_owned(),
        owner_id: "a".to_owned(),
        content: None,
    };
    let plan = Plan {
        actions: vec![write_cover],
    };
    let mut manifest = Manifest::new();
    manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
    let disk = MemFs::new();
    let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"jpg-bytes".to_vec()));
    let ffmpeg = StubFfmpeg::flac();
    let clock = RecordingClock::new();
    let options = ExecOptions::default();

    let result = run(
        &plan,
        &mut manifest,
        &[],
        &http,
        &disk,
        &ffmpeg,
        &clock,
        &options,
    );

    assert_eq!(result.artifacts_written, 1);
    assert_eq!(result.failed(), 0);
    assert_eq!(result.status, RunStatus::Completed);
    // The fetched bytes land at the sidecar path unchanged.
    assert_eq!(disk.read_file("a/cover.jpg").unwrap(), b"jpg-bytes");
    // The owner's cover slot records the path and hash from the action.
    let expected_slot = ArtifactState {
        path: "a/cover.jpg".to_owned(),
        hash: "h1".to_owned(),
    };
    assert_eq!(manifest.get("a").unwrap().cover_jpg, Some(expected_slot));
}
3341
#[test]
fn write_text_sidecar_records_slot_with_no_network_fetch() {
    // A generated text sidecar carries its body inline: it is written
    // verbatim without any HTTP fetch, and the details slot records it.
    let write_details = Action::WriteArtifact {
        kind: ArtifactKind::DetailsTxt,
        path: "a.details.txt".to_owned(),
        source_url: String::new(),
        hash: "dh".to_owned(),
        owner_id: "a".to_owned(),
        content: Some("Title: A\n".to_owned()),
    };
    let plan = Plan {
        actions: vec![write_details],
    };
    let mut manifest = Manifest::new();
    manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
    // No routes are scripted, so any fetch would fail; success therefore
    // proves that none was attempted.
    let http = ScriptedHttp::new();
    let disk = MemFs::new();
    let ffmpeg = StubFfmpeg::flac();
    let clock = RecordingClock::new();
    let options = ExecOptions::default();

    let result = run(
        &plan,
        &mut manifest,
        &[],
        &http,
        &disk,
        &ffmpeg,
        &clock,
        &options,
    );

    assert_eq!(result.artifacts_written, 1);
    assert_eq!(result.failed(), 0);
    assert_eq!(disk.read_file("a.details.txt").unwrap(), b"Title: A\n");
    let expected_slot = ArtifactState {
        path: "a.details.txt".to_owned(),
        hash: "dh".to_owned(),
    };
    assert_eq!(manifest.get("a").unwrap().details_txt, Some(expected_slot));
}
3384
#[test]
fn write_lyrics_sidecar_relocation_removes_old_file() {
    // The audio moved, so the lyrics sidecar is re-emitted at the new path:
    // the executor writes the new file and prunes the stale one.
    let disk = MemFs::new()
        .with_file("old/a.flac", b"AUDIO".to_vec())
        .with_file("old/a.lyrics.txt", b"old words\n".to_vec());

    let mut tracked = entry("old/a.flac", AudioFormat::Flac);
    tracked.lyrics_txt = Some(ArtifactState {
        path: "old/a.lyrics.txt".to_owned(),
        hash: "lh".to_owned(),
    });
    let mut manifest = Manifest::new();
    manifest.insert("a", tracked);

    let relocate = Action::WriteArtifact {
        kind: ArtifactKind::LyricsTxt,
        path: "new/a.lyrics.txt".to_owned(),
        source_url: String::new(),
        hash: "lh".to_owned(),
        owner_id: "a".to_owned(),
        content: Some("new words\n".to_owned()),
    };
    let plan = Plan {
        actions: vec![relocate],
    };
    let http = ScriptedHttp::new();
    let ffmpeg = StubFfmpeg::flac();
    let clock = RecordingClock::new();
    let options = ExecOptions::default();

    let result = run(
        &plan,
        &mut manifest,
        &[],
        &http,
        &disk,
        &ffmpeg,
        &clock,
        &options,
    );

    assert_eq!(result.failed(), 0);
    // New sidecar written, old one removed, manifest pointing at the new path.
    assert_eq!(disk.read_file("new/a.lyrics.txt").unwrap(), b"new words\n");
    assert!(!disk.exists("old/a.lyrics.txt"));
    assert_eq!(
        manifest.get("a").unwrap().lyrics_txt.as_ref().unwrap().path,
        "new/a.lyrics.txt"
    );
}
3429
#[test]
fn sidecar_path_swap_never_deletes_a_file_written_this_run() {
    // Two clips exchange sidecar paths within one run (A: x -> y, B: y -> x).
    // The old-path cleanup that follows each write has to leave alone any
    // path another action writes this run; otherwise the second write would
    // remove the file the first one just produced (issue #76). The guard does
    // not depend on the artifact kind: lyrics stands in for every sidecar,
    // the .mp4 video included.
    let slot = |path: &str, hash: &str| ArtifactState {
        path: path.to_owned(),
        hash: hash.to_owned(),
    };
    let write_lyrics = |path: &str, hash: &str, owner: &str, body: &str| Action::WriteArtifact {
        kind: ArtifactKind::LyricsTxt,
        path: path.to_owned(),
        source_url: String::new(),
        hash: hash.to_owned(),
        owner_id: owner.to_owned(),
        content: Some(body.to_owned()),
    };

    let mut manifest = Manifest::new();
    let mut clip_a = entry("a.flac", AudioFormat::Flac);
    clip_a.lyrics_txt = Some(slot("x.lyrics.txt", "ah"));
    manifest.insert("a", clip_a);
    let mut clip_b = entry("b.flac", AudioFormat::Flac);
    clip_b.lyrics_txt = Some(slot("y.lyrics.txt", "bh"));
    manifest.insert("b", clip_b);

    let disk = MemFs::new()
        .with_file("a.flac", b"A".to_vec())
        .with_file("b.flac", b"B".to_vec())
        .with_file("x.lyrics.txt", b"A words\n".to_vec())
        .with_file("y.lyrics.txt", b"B words\n".to_vec());

    // The swap itself: A's sidecar goes to y, then B's sidecar goes to x.
    let plan = Plan {
        actions: vec![
            write_lyrics("y.lyrics.txt", "ah", "a", "A words\n"),
            write_lyrics("x.lyrics.txt", "bh", "b", "B words\n"),
        ],
    };
    let http = ScriptedHttp::new();
    let ffmpeg = StubFfmpeg::flac();
    let clock = RecordingClock::new();
    let options = ExecOptions::default();

    let result = run(
        &plan,
        &mut manifest,
        &[],
        &http,
        &disk,
        &ffmpeg,
        &clock,
        &options,
    );

    assert_eq!(result.failed(), 0);
    // Each freshly written file is intact: neither cleanup removed the other.
    assert_eq!(disk.read_file("y.lyrics.txt").unwrap(), b"A words\n");
    assert_eq!(disk.read_file("x.lyrics.txt").unwrap(), b"B words\n");
    assert_eq!(
        manifest.get("a").unwrap().lyrics_txt.as_ref().unwrap().path,
        "y.lyrics.txt"
    );
    assert_eq!(
        manifest.get("b").unwrap().lyrics_txt.as_ref().unwrap().path,
        "x.lyrics.txt"
    );
}
3501
#[test]
fn old_sidecar_kept_when_another_clip_still_references_it() {
    // An earlier failed swap can leave two clips tracking the same path
    // (A -> y and B -> y). When B moves y -> x, the cleanup must leave y in
    // place because it is still A's live file (#76). tracked_paths sees two
    // references to y, so the removal is skipped even though y is not a write
    // target in this run.
    let slot = |hash: &str| ArtifactState {
        path: "y.lyrics.txt".to_owned(),
        hash: hash.to_owned(),
    };

    let mut manifest = Manifest::new();
    let mut clip_a = entry("a.flac", AudioFormat::Flac);
    clip_a.lyrics_txt = Some(slot("ah"));
    manifest.insert("a", clip_a);
    let mut clip_b = entry("b.flac", AudioFormat::Flac);
    clip_b.lyrics_txt = Some(slot("bh"));
    manifest.insert("b", clip_b);

    let disk = MemFs::new()
        .with_file("a.flac", b"A".to_vec())
        .with_file("b.flac", b"B".to_vec())
        .with_file("y.lyrics.txt", b"A words\n".to_vec());

    // Only B is moved (y -> x). A does not change, so y is never written;
    // the tracked-reference count alone has to protect A's file.
    let move_b = Action::WriteArtifact {
        kind: ArtifactKind::LyricsTxt,
        path: "x.lyrics.txt".to_owned(),
        source_url: String::new(),
        hash: "bh".to_owned(),
        owner_id: "b".to_owned(),
        content: Some("B words\n".to_owned()),
    };
    let plan = Plan {
        actions: vec![move_b],
    };
    let http = ScriptedHttp::new();
    let ffmpeg = StubFfmpeg::flac();
    let clock = RecordingClock::new();
    let options = ExecOptions::default();

    let result = run(
        &plan,
        &mut manifest,
        &[],
        &http,
        &disk,
        &ffmpeg,
        &clock,
        &options,
    );

    assert_eq!(result.failed(), 0);
    assert!(
        disk.exists("y.lyrics.txt"),
        "A's live sidecar must not be deleted"
    );
    assert_eq!(disk.read_file("x.lyrics.txt").unwrap(), b"B words\n");
}
3556
#[test]
fn shared_old_path_is_reclaimed_when_every_referencing_clip_moves_away() {
    // Two clips track one path (A -> s and B -> s, left by an earlier failed
    // swap). Once BOTH move away in this run the path is no longer live, so
    // the last mover has to reclaim it: it must not linger as an orphan, nor
    // be deleted while a clip still references it. The dynamic reference
    // count only reaches zero after both moves, so exactly the final cleanup
    // removes the file (#76).
    let shared_slot = |hash: &str| ArtifactState {
        path: "s.lyrics.txt".to_owned(),
        hash: hash.to_owned(),
    };
    let write_lyrics = |path: &str, hash: &str, owner: &str, body: &str| Action::WriteArtifact {
        kind: ArtifactKind::LyricsTxt,
        path: path.to_owned(),
        source_url: String::new(),
        hash: hash.to_owned(),
        owner_id: owner.to_owned(),
        content: Some(body.to_owned()),
    };

    let mut manifest = Manifest::new();
    let mut clip_a = entry("a.flac", AudioFormat::Flac);
    clip_a.lyrics_txt = Some(shared_slot("ah"));
    manifest.insert("a", clip_a);
    let mut clip_b = entry("b.flac", AudioFormat::Flac);
    clip_b.lyrics_txt = Some(shared_slot("bh"));
    manifest.insert("b", clip_b);

    let disk = MemFs::new()
        .with_file("a.flac", b"A".to_vec())
        .with_file("b.flac", b"B".to_vec())
        .with_file("s.lyrics.txt", b"shared\n".to_vec());

    let plan = Plan {
        actions: vec![
            write_lyrics("pa.lyrics.txt", "ah", "a", "A words\n"),
            write_lyrics("pb.lyrics.txt", "bh", "b", "B words\n"),
        ],
    };
    let http = ScriptedHttp::new();
    let ffmpeg = StubFfmpeg::flac();
    let clock = RecordingClock::new();
    let options = ExecOptions::default();

    let result = run(
        &plan,
        &mut manifest,
        &[],
        &http,
        &disk,
        &ffmpeg,
        &clock,
        &options,
    );

    assert_eq!(result.failed(), 0);
    assert_eq!(disk.read_file("pa.lyrics.txt").unwrap(), b"A words\n");
    assert_eq!(disk.read_file("pb.lyrics.txt").unwrap(), b"B words\n");
    assert!(
        !disk.exists("s.lyrics.txt"),
        "the vacated shared path must be reclaimed, not orphaned"
    );
}
3621
#[test]
fn write_text_sidecar_skipped_when_owner_audio_absent() {
    // The owning clip has no manifest entry (its audio download failed), so
    // the text sidecar has to be skipped instead of leaving an untracked file.
    let mut manifest = Manifest::new();
    let disk = MemFs::new();
    let orphan_details = Action::WriteArtifact {
        kind: ArtifactKind::DetailsTxt,
        path: "gone.details.txt".to_owned(),
        source_url: String::new(),
        hash: "dh".to_owned(),
        owner_id: "gone".to_owned(),
        content: Some("Title: Gone\n".to_owned()),
    };
    let plan = Plan {
        actions: vec![orphan_details],
    };
    let http = ScriptedHttp::new();
    let ffmpeg = StubFfmpeg::flac();
    let clock = RecordingClock::new();
    let options = ExecOptions::default();

    let result = run(
        &plan,
        &mut manifest,
        &[],
        &http,
        &disk,
        &ffmpeg,
        &clock,
        &options,
    );

    // Counted as a skip: no file was written and no entry was created.
    assert_eq!(result.artifacts_written, 0);
    assert_eq!(result.skipped, 1);
    assert!(!disk.exists("gone.details.txt"));
    assert!(manifest.get("gone").is_none());
}
3655
#[test]
fn delete_artifact_removes_file_and_clears_slot() {
    // Arrange: entry "a" tracks a cover sidecar that exists on disk.
    let mut owner = entry("a.mp3", AudioFormat::Mp3);
    owner.cover_jpg = Some(ArtifactState {
        path: "a/cover.jpg".to_owned(),
        hash: "h1".to_owned(),
    });
    let mut manifest = Manifest::new();
    manifest.insert("a", owner);
    let disk = MemFs::new().with_file("a/cover.jpg", b"jpg".to_vec());

    let drop_cover = Action::DeleteArtifact {
        kind: ArtifactKind::CoverJpg,
        path: "a/cover.jpg".to_owned(),
        owner_id: "a".to_owned(),
    };
    let plan = Plan {
        actions: vec![drop_cover],
    };
    let http = ScriptedHttp::new();
    let ffmpeg = StubFfmpeg::flac();
    let clock = RecordingClock::new();
    let options = ExecOptions::default();

    let result = run(
        &plan,
        &mut manifest,
        &[],
        &http,
        &disk,
        &ffmpeg,
        &clock,
        &options,
    );

    // Assert: the deletion is counted, the file is gone, the slot is empty.
    assert_eq!(result.artifacts_deleted, 1);
    assert!(!disk.exists("a/cover.jpg"));
    assert_eq!(manifest.get("a").unwrap().cover_jpg, None);
}
3689
#[test]
fn delete_artifact_tolerates_already_absent_file() {
    // `remove` is idempotent, so deleting a sidecar that is already missing
    // from disk must not count as a failure.
    let mut owner = entry("a.mp3", AudioFormat::Mp3);
    owner.cover_jpg = Some(ArtifactState {
        path: "a/cover.jpg".to_owned(),
        hash: "h1".to_owned(),
    });
    let mut manifest = Manifest::new();
    manifest.insert("a", owner);

    let drop_cover = Action::DeleteArtifact {
        kind: ArtifactKind::CoverJpg,
        path: "a/cover.jpg".to_owned(),
        owner_id: "a".to_owned(),
    };
    let plan = Plan {
        actions: vec![drop_cover],
    };
    // The filesystem is empty: the tracked cover file does not exist.
    let disk = MemFs::new();
    let http = ScriptedHttp::new();
    let ffmpeg = StubFfmpeg::flac();
    let clock = RecordingClock::new();
    let options = ExecOptions::default();

    let result = run(
        &plan,
        &mut manifest,
        &[],
        &http,
        &disk,
        &ffmpeg,
        &clock,
        &options,
    );

    assert_eq!(result.artifacts_deleted, 1);
    assert_eq!(result.failed(), 0);
    assert_eq!(manifest.get("a").unwrap().cover_jpg, None);
}
3724
#[test]
fn write_artifact_http_failure_is_a_per_clip_failure_not_a_run_abort() {
    // A permanent 404 on one sidecar fetch is recorded against that clip only;
    // the run carries on and the next WriteArtifact still succeeds.
    let write_cover = |owner: &str, hash: &str| Action::WriteArtifact {
        kind: ArtifactKind::CoverJpg,
        path: format!("{owner}/cover.jpg"),
        source_url: format!("https://art.suno.ai/{owner}/large.jpg"),
        hash: hash.to_owned(),
        owner_id: owner.to_owned(),
        content: None,
    };

    let mut manifest = Manifest::new();
    manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
    manifest.insert("b", entry("b.mp3", AudioFormat::Mp3));
    let plan = Plan {
        actions: vec![write_cover("a", "h1"), write_cover("b", "h2")],
    };
    // A's cover answers 404; B's cover is served normally.
    let http = ScriptedHttp::new()
        .route("a/large.jpg", Reply::status(404))
        .route("b/large.jpg", Reply::ok(b"jpg-b".to_vec()));
    let disk = MemFs::new();
    let ffmpeg = StubFfmpeg::flac();
    let clock = RecordingClock::new();
    let options = ExecOptions::default();

    let result = run(
        &plan,
        &mut manifest,
        &[],
        &http,
        &disk,
        &ffmpeg,
        &clock,
        &options,
    );

    assert_eq!(result.status, RunStatus::Completed);
    assert_eq!(result.failed(), 1);
    assert_eq!(result.failures[0].clip_id, "a");
    assert_eq!(result.artifacts_written, 1);
    // The failed sidecar produced neither a file nor a manifest record.
    assert!(!disk.exists("a/cover.jpg"));
    assert_eq!(manifest.get("a").unwrap().cover_jpg, None);
    // The sidecar after it was written and recorded as usual.
    assert_eq!(disk.read_file("b/cover.jpg").unwrap(), b"jpg-b");
    assert!(manifest.get("b").unwrap().cover_jpg.is_some());
}
3779
#[test]
fn co_delete_executes_audio_delete_then_artifact_delete() {
    // The plan lists the audio Delete ahead of its sidecar DeleteArtifact.
    // Deleting the audio drops the manifest entry; the sidecar delete then
    // removes its file and copes with the entry no longer being there.
    let mut owner = entry("gone.mp3", AudioFormat::Mp3);
    owner.cover_jpg = Some(ArtifactState {
        path: "gone/cover.jpg".to_owned(),
        hash: "h1".to_owned(),
    });
    let mut manifest = Manifest::new();
    manifest.insert("gone", owner);
    let disk = MemFs::new()
        .with_file("gone.mp3", b"DATA".to_vec())
        .with_file("gone/cover.jpg", b"jpg".to_vec());

    let delete_audio = Action::Delete {
        path: "gone.mp3".to_owned(),
        clip_id: "gone".to_owned(),
    };
    let delete_cover = Action::DeleteArtifact {
        kind: ArtifactKind::CoverJpg,
        path: "gone/cover.jpg".to_owned(),
        owner_id: "gone".to_owned(),
    };
    let plan = Plan {
        actions: vec![delete_audio, delete_cover],
    };
    let http = ScriptedHttp::new();
    let ffmpeg = StubFfmpeg::flac();
    let clock = RecordingClock::new();
    let options = ExecOptions::default();

    let result = run(
        &plan,
        &mut manifest,
        &[],
        &http,
        &disk,
        &ffmpeg,
        &clock,
        &options,
    );

    // Both deletions are counted, nothing failed, and no trace remains.
    assert_eq!(result.deleted, 1);
    assert_eq!(result.artifacts_deleted, 1);
    assert_eq!(result.failed(), 0);
    assert!(!disk.exists("gone.mp3"));
    assert!(!disk.exists("gone/cover.jpg"));
    assert!(manifest.get("gone").is_none());
}
3827
#[test]
fn write_stem_mp3_stores_raw_and_records_slot() {
    // An MP3 stem is fetched directly from its CDN url and kept verbatim (no
    // transcode, no WAV render): the bytes end up at the `.mp3` path and the
    // keyed slot records that path together with the hash.
    let stem_path = "a.stems/a - Vocals [voc].mp3";
    let write_vocals = Action::WriteStem {
        clip_id: "a".to_owned(),
        key: "voc".to_owned(),
        stem_id: "voc".to_owned(),
        path: stem_path.to_owned(),
        source_url: "https://cdn1.suno.ai/voc.mp3".to_owned(),
        format: StemFormat::Mp3,
        hash: "vh".to_owned(),
    };
    let plan = Plan {
        actions: vec![write_vocals],
    };
    let mut manifest = Manifest::new();
    manifest.insert("a", entry("a.flac", AudioFormat::Flac));
    let http = ScriptedHttp::new().route("voc.mp3", Reply::ok(b"stem-bytes".to_vec()));
    let disk = MemFs::new();
    let ffmpeg = StubFfmpeg::flac();
    let clock = RecordingClock::new();
    let options = ExecOptions::default();

    let result = run(
        &plan,
        &mut manifest,
        &[],
        &http,
        &disk,
        &ffmpeg,
        &clock,
        &options,
    );

    assert_eq!(result.artifacts_written, 1);
    assert_eq!(result.failed(), 0);
    // The stored bytes are exactly what was delivered (no transcode applied).
    assert_eq!(disk.read_file(stem_path).unwrap(), b"stem-bytes");
    // An MP3 stem never renders WAV: no convert_wav, no generation.
    assert_eq!(http.count("convert_wav"), 0);
    assert_eq!(http.count("/api/gen/"), 0);
    assert_eq!(
        manifest.get("a").unwrap().stems.get("voc"),
        Some(&ArtifactState {
            path: stem_path.to_owned(),
            hash: "vh".to_owned(),
        })
    );
}
3878
#[test]
fn write_stem_wav_renders_via_convert_wav_and_stores_raw() {
    // A WAV stem (the default) has its lossless WAV rendered through the free
    // convert_wav flow keyed on the stem id, and is then downloaded and kept
    // RAW as `.wav`. It is NEVER transcoded to FLAC, even for a FLAC song.
    let wav_path = "a.stems/a - Vocals [stemvoc].wav";
    let write_vocals = Action::WriteStem {
        clip_id: "a".to_owned(),
        key: "voc".to_owned(),
        stem_id: "stemvoc".to_owned(),
        path: wav_path.to_owned(),
        source_url: "https://cdn1.suno.ai/stemvoc.mp3".to_owned(),
        format: StemFormat::Wav,
        hash: "vh".to_owned(),
    };
    let plan = Plan {
        actions: vec![write_vocals],
    };
    let mut manifest = Manifest::new();
    manifest.insert("a", entry("a.flac", AudioFormat::Flac));

    // The first wav_file poll is not ready, so the flow POSTs convert_wav
    // (free) and polls again: the same path the main FLAC/WAV render takes.
    let wav_file_polls = vec![
        Reply::json("{}"),
        Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/stemvoc.wav"}"#),
    ];
    let http = ScriptedHttp::new()
        .with_auth()
        .route_seq("stemvoc/wav_file/", wav_file_polls)
        .route("stemvoc/convert_wav/", Reply::status(200))
        .route("stemvoc.wav", Reply::ok(b"RIFFwav-bytes".to_vec()));
    let disk = MemFs::new();
    let ffmpeg = StubFfmpeg::flac();
    let clock = RecordingClock::new();
    let options = small_poll();

    let result = run(
        &plan,
        &mut manifest,
        &[],
        &http,
        &disk,
        &ffmpeg,
        &clock,
        &options,
    );

    assert_eq!(result.artifacts_written, 1);
    assert_eq!(result.failed(), 0);
    // The rendered WAV is kept verbatim; ffmpeg (WAV->FLAC) is never invoked,
    // so what is stored is the raw WAV and not a FLAC transcode.
    assert_eq!(disk.read_file(wav_path).unwrap(), b"RIFFwav-bytes");
    assert!(!disk.exists("a.stems/a - Vocals [stemvoc].flac"));
    // The free WAV render ran once; no credit-spending generation endpoint did.
    assert_eq!(http.count("convert_wav"), 1);
    assert_eq!(http.count("stem_task"), 0);
    assert_eq!(http.count("separate"), 0);
    assert_eq!(
        manifest.get("a").unwrap().stems.get("voc").unwrap().path,
        wav_path
    );
}
3941
3942    #[test]
3943    fn write_stem_is_skipped_when_owner_audio_is_absent() {
3944        // No owning manifest entry (audio failed or never existed) => skip with
3945        // no fetch and no write, so a stem is never stranded without its song.
3946        let mut manifest = Manifest::new();
3947        let plan = Plan {
3948            actions: vec![Action::WriteStem {
3949                clip_id: "ghost".to_owned(),
3950                key: "voc".to_owned(),
3951                stem_id: "voc".to_owned(),
3952                path: "ghost.stems/voc.mp3".to_owned(),
3953                source_url: "https://cdn1.suno.ai/voc.mp3".to_owned(),
3954                format: StemFormat::Mp3,
3955                hash: "vh".to_owned(),
3956            }],
3957        };
3958        // Empty HTTP script: any fetch would error, proving none happens.
3959        let http = ScriptedHttp::new();
3960        let fs = MemFs::new();
3961
3962        let outcome = run(
3963            &plan,
3964            &mut manifest,
3965            &[],
3966            &http,
3967            &fs,
3968            &StubFfmpeg::flac(),
3969            &RecordingClock::new(),
3970            &ExecOptions::default(),
3971        );
3972
3973        assert_eq!(outcome.skipped, 1);
3974        assert_eq!(outcome.artifacts_written, 0);
3975        assert_eq!(outcome.failed(), 0);
3976        assert!(!fs.exists("ghost.stems/voc.mp3"));
3977    }
3978
3979    #[test]
3980    fn write_stem_relocates_the_old_file_on_a_path_move() {
3981        // The song was renamed, so the stem moves: the new file is written and the
3982        // stale copy at the previously tracked path is removed (moved, not orphaned).
3983        let fs = MemFs::new().with_file("old.stems/voc.mp3", b"old".to_vec());
3984        let mut manifest = Manifest::new();
3985        let mut e = entry("new.flac", AudioFormat::Flac);
3986        e.stems.insert(
3987            "voc".to_owned(),
3988            ArtifactState {
3989                path: "old.stems/voc.mp3".to_owned(),
3990                hash: "vh".to_owned(),
3991            },
3992        );
3993        manifest.insert("a", e);
3994        let plan = Plan {
3995            actions: vec![Action::WriteStem {
3996                clip_id: "a".to_owned(),
3997                key: "voc".to_owned(),
3998                stem_id: "voc".to_owned(),
3999                path: "new.stems/voc.mp3".to_owned(),
4000                source_url: "https://cdn1.suno.ai/voc.mp3".to_owned(),
4001                format: StemFormat::Mp3,
4002                hash: "vh".to_owned(),
4003            }],
4004        };
4005        let http = ScriptedHttp::new().route("voc.mp3", Reply::ok(b"new".to_vec()));
4006
4007        let outcome = run(
4008            &plan,
4009            &mut manifest,
4010            &[],
4011            &http,
4012            &fs,
4013            &StubFfmpeg::flac(),
4014            &RecordingClock::new(),
4015            &ExecOptions::default(),
4016        );
4017
4018        assert_eq!(outcome.artifacts_written, 1);
4019        assert!(fs.exists("new.stems/voc.mp3"));
4020        assert!(
4021            !fs.exists("old.stems/voc.mp3"),
4022            "the old stem is moved, not left behind"
4023        );
4024        assert_eq!(
4025            manifest.get("a").unwrap().stems.get("voc").unwrap().path,
4026            "new.stems/voc.mp3"
4027        );
4028    }
4029
4030    #[test]
4031    fn delete_stem_removes_file_and_clears_slot() {
4032        let fs = MemFs::new().with_file("a.stems/voc.mp3", b"stem".to_vec());
4033        let mut manifest = Manifest::new();
4034        let mut e = entry("a.flac", AudioFormat::Flac);
4035        e.stems.insert(
4036            "voc".to_owned(),
4037            ArtifactState {
4038                path: "a.stems/voc.mp3".to_owned(),
4039                hash: "vh".to_owned(),
4040            },
4041        );
4042        manifest.insert("a", e);
4043        let plan = Plan {
4044            actions: vec![Action::DeleteStem {
4045                clip_id: "a".to_owned(),
4046                key: "voc".to_owned(),
4047                path: "a.stems/voc.mp3".to_owned(),
4048            }],
4049        };
4050
4051        let outcome = run(
4052            &plan,
4053            &mut manifest,
4054            &[],
4055            &ScriptedHttp::new(),
4056            &fs,
4057            &StubFfmpeg::flac(),
4058            &RecordingClock::new(),
4059            &ExecOptions::default(),
4060        );
4061
4062        assert_eq!(outcome.artifacts_deleted, 1);
4063        assert!(!fs.exists("a.stems/voc.mp3"));
4064        assert!(manifest.get("a").unwrap().stems.is_empty());
4065    }
4066
4067    #[test]
4068    fn co_deleting_the_last_stem_prunes_the_stems_folder() {
4069        // Deleting a song co-deletes its stems; the emptied `.stems` folder is
4070        // pruned by the end-of-run sweep, so it can never be orphaned.
4071        let fs = MemFs::new()
4072            .with_file("song.flac", b"DATA".to_vec())
4073            .with_file("song.stems/voc.mp3", b"stem".to_vec());
4074        assert!(fs.has_dir("song.stems"));
4075        let mut manifest = Manifest::new();
4076        let mut e = entry("song.flac", AudioFormat::Flac);
4077        e.stems.insert(
4078            "voc".to_owned(),
4079            ArtifactState {
4080                path: "song.stems/voc.mp3".to_owned(),
4081                hash: "vh".to_owned(),
4082            },
4083        );
4084        manifest.insert("a", e);
4085        let plan = Plan {
4086            actions: vec![
4087                Action::Delete {
4088                    path: "song.flac".to_owned(),
4089                    clip_id: "a".to_owned(),
4090                },
4091                Action::DeleteStem {
4092                    clip_id: "a".to_owned(),
4093                    key: "voc".to_owned(),
4094                    path: "song.stems/voc.mp3".to_owned(),
4095                },
4096            ],
4097        };
4098
4099        let outcome = run(
4100            &plan,
4101            &mut manifest,
4102            &[],
4103            &ScriptedHttp::new(),
4104            &fs,
4105            &StubFfmpeg::flac(),
4106            &RecordingClock::new(),
4107            &ExecOptions::default(),
4108        );
4109
4110        assert_eq!(outcome.deleted, 1);
4111        assert_eq!(outcome.artifacts_deleted, 1);
4112        assert!(!fs.exists("song.flac"));
4113        assert!(!fs.exists("song.stems/voc.mp3"));
4114        assert!(
4115            !fs.has_dir("song.stems"),
4116            "the emptied .stems folder is pruned"
4117        );
4118        assert!(manifest.get("a").is_none());
4119    }
4120
4121    #[test]
4122    fn write_stem_mp3_never_issues_a_generation_post() {
4123        // The MP3 stem path is GET-only: writing a stem fetches its CDN url and
4124        // never POSTs, let alone to any generation or WAV-render endpoint.
4125        let mut manifest = Manifest::new();
4126        manifest.insert("a", entry("a.flac", AudioFormat::Flac));
4127        let plan = Plan {
4128            actions: vec![Action::WriteStem {
4129                clip_id: "a".to_owned(),
4130                key: "voc".to_owned(),
4131                stem_id: "voc".to_owned(),
4132                path: "a.stems/voc.mp3".to_owned(),
4133                source_url: "https://cdn1.suno.ai/voc.mp3".to_owned(),
4134                format: StemFormat::Mp3,
4135                hash: "vh".to_owned(),
4136            }],
4137        };
4138        let http = ScriptedHttp::new().route("voc.mp3", Reply::ok(b"stem".to_vec()));
4139
4140        run(
4141            &plan,
4142            &mut manifest,
4143            &[],
4144            &http,
4145            &MemFs::new(),
4146            &StubFfmpeg::flac(),
4147            &RecordingClock::new(),
4148            &ExecOptions::default(),
4149        );
4150
4151        assert_eq!(
4152            http.count("stem_task"),
4153            0,
4154            "no generation endpoint is ever hit"
4155        );
4156        assert_eq!(http.count("convert_wav"), 0);
4157        assert_eq!(http.count("/api/gen/"), 0);
4158    }
4159
4160    #[test]
4161    fn full_stems_mirror_mp3_is_get_only_with_zero_gen_traffic() {
4162        // End-to-end #100 path with MP3 stems: list a clip's existing stems (free
4163        // GET over the live page-count + 0-indexed page shape), reconcile them into
4164        // WriteStem actions, and execute (download) them. With MP3 the whole flow
4165        // is GET-only and touches NO `/api/gen/` endpoint at all.
4166        let http = ScriptedHttp::new()
4167            .with_auth()
4168            .route("clip1/stems/pages", Reply::json(r#"{"pages": 1}"#))
4169            .route(
4170                "clip1/stems?page=0",
4171                Reply::json(
4172                    r#"{"stems":[
4173                        {"id":"s1","title":"Song (Vocals)","status":"complete","audio_url":"https://cdn1.suno.ai/s1.mp3"},
4174                        {"id":"s2","title":"Song (Drums)","status":"complete","audio_url":"https://cdn1.suno.ai/s2.mp3"}
4175                    ]}"#,
4176                ),
4177            )
4178            .route("s1.mp3", Reply::ok(b"vocals-bytes".to_vec()))
4179            .route("s2.mp3", Reply::ok(b"drums-bytes".to_vec()));
4180
4181        // List the existing stems through the client (GET-only, free).
4182        let mut auth = ClerkAuth::new("eyJtoken");
4183        pollster::block_on(auth.authenticate(&http)).unwrap();
4184        let mut client = SunoClient::new(auth, RecordingClock::new());
4185        let (stems, complete) = pollster::block_on(client.list_stems(&http, "clip1")).unwrap();
4186        assert!(complete);
4187        assert_eq!(stems.len(), 2);
4188        assert_eq!(stems[0].label, "Vocals");
4189
4190        // Reconcile the listed MP3 stems into a plan (audio already present -> Skip).
4191        let mut manifest = Manifest::new();
4192        manifest.insert("clip1", entry("clip1.flac", AudioFormat::Flac));
4193        let desired_stems: Vec<crate::reconcile::DesiredStem> = stems
4194            .iter()
4195            .map(|s| crate::reconcile::DesiredStem {
4196                key: s.id.clone(),
4197                stem_id: s.id.clone(),
4198                path: format!("clip1.stems/{}.mp3", s.id),
4199                source_url: s.url.clone(),
4200                format: StemFormat::Mp3,
4201                hash: crate::art_url_hash(&s.url),
4202            })
4203            .collect();
4204        let d = Desired {
4205            path: "clip1.flac".to_owned(),
4206            stems: Some(desired_stems),
4207            ..desired(clip("clip1"), AudioFormat::Flac)
4208        };
4209        let local: HashMap<String, crate::reconcile::LocalFile> = [(
4210            "clip1".to_owned(),
4211            crate::reconcile::LocalFile {
4212                exists: true,
4213                size: 100,
4214            },
4215        )]
4216        .into_iter()
4217        .collect();
4218        let sources = [crate::reconcile::SourceStatus {
4219            mode: SourceMode::Mirror,
4220            fully_enumerated: true,
4221        }];
4222        let plan =
4223            crate::reconcile::reconcile(&manifest, std::slice::from_ref(&d), &local, &sources);
4224        assert_eq!(plan.stem_writes(), 2);
4225
4226        let fs = MemFs::new();
4227        let outcome = run(
4228            &plan,
4229            &mut manifest,
4230            std::slice::from_ref(&d),
4231            &http,
4232            &fs,
4233            &StubFfmpeg::flac(),
4234            &RecordingClock::new(),
4235            &ExecOptions::default(),
4236        );
4237
4238        assert_eq!(outcome.artifacts_written, 2, "both stems downloaded");
4239        assert_eq!(fs.read_file("clip1.stems/s1.mp3").unwrap(), b"vocals-bytes");
4240        assert_eq!(fs.read_file("clip1.stems/s2.mp3").unwrap(), b"drums-bytes");
4241        // The MP3 mirror path never touches any /api/gen/ endpoint (no render, no
4242        // generation, no separation).
4243        assert_eq!(http.count("/api/gen/"), 0);
4244        assert_eq!(http.count("stem_task"), 0);
4245        assert_eq!(http.count("separate"), 0);
4246        assert_eq!(http.count("generate"), 0);
4247        // No stem is ever written as FLAC.
4248        assert!(!fs.exists("clip1.stems/s1.flac"));
4249    }
4250
4251    #[test]
4252    fn full_stems_mirror_wav_default_renders_free_wav_and_no_generation() {
4253        // End-to-end #100 path with WAV stems (the default): each stem's lossless
4254        // WAV is rendered through the FREE convert_wav flow and stored RAW as
4255        // `.wav`. The mirror makes NO credit-spending generation POST.
4256        let http = ScriptedHttp::new()
4257            .with_auth()
4258            .route("clip1/stems/pages", Reply::json(r#"{"pages": 1}"#))
4259            .route(
4260                "clip1/stems?page=0",
4261                Reply::json(
4262                    r#"{"stems":[
4263                        {"id":"s1","title":"Song (Vocals)","status":"complete","audio_url":"https://cdn1.suno.ai/s1.mp3"},
4264                        {"id":"s2","title":"Song (Drums)","status":"complete","audio_url":"https://cdn1.suno.ai/s2.mp3"}
4265                    ]}"#,
4266                ),
4267            )
4268            // Each stem's WAV is already rendered, so wav_file returns the url and
4269            // no convert_wav POST is even needed (still free either way).
4270            .route(
4271                "s1/wav_file/",
4272                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/s1.wav"}"#),
4273            )
4274            .route(
4275                "s2/wav_file/",
4276                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/s2.wav"}"#),
4277            )
4278            .route("s1.wav", Reply::ok(b"RIFFvocals".to_vec()))
4279            .route("s2.wav", Reply::ok(b"RIFFdrums".to_vec()));
4280
4281        let mut auth = ClerkAuth::new("eyJtoken");
4282        pollster::block_on(auth.authenticate(&http)).unwrap();
4283        let mut client = SunoClient::new(auth, RecordingClock::new());
4284        let (stems, _complete) = pollster::block_on(client.list_stems(&http, "clip1")).unwrap();
4285
4286        let mut manifest = Manifest::new();
4287        manifest.insert("clip1", entry("clip1.flac", AudioFormat::Flac));
4288        let desired_stems: Vec<crate::reconcile::DesiredStem> = stems
4289            .iter()
4290            .map(|s| crate::reconcile::DesiredStem {
4291                key: s.id.clone(),
4292                stem_id: s.id.clone(),
4293                path: format!("clip1.stems/{}.wav", s.id),
4294                source_url: s.url.clone(),
4295                format: StemFormat::Wav,
4296                hash: crate::art_url_hash(&s.url),
4297            })
4298            .collect();
4299        let d = Desired {
4300            path: "clip1.flac".to_owned(),
4301            stems: Some(desired_stems),
4302            ..desired(clip("clip1"), AudioFormat::Flac)
4303        };
4304        let local: HashMap<String, crate::reconcile::LocalFile> = [(
4305            "clip1".to_owned(),
4306            crate::reconcile::LocalFile {
4307                exists: true,
4308                size: 100,
4309            },
4310        )]
4311        .into_iter()
4312        .collect();
4313        let sources = [crate::reconcile::SourceStatus {
4314            mode: SourceMode::Mirror,
4315            fully_enumerated: true,
4316        }];
4317        let plan =
4318            crate::reconcile::reconcile(&manifest, std::slice::from_ref(&d), &local, &sources);
4319
4320        let fs = MemFs::new();
4321        let outcome = run(
4322            &plan,
4323            &mut manifest,
4324            std::slice::from_ref(&d),
4325            &http,
4326            &fs,
4327            &StubFfmpeg::flac(),
4328            &RecordingClock::new(),
4329            &small_poll(),
4330        );
4331
4332        assert_eq!(outcome.artifacts_written, 2);
4333        // Stems are stored RAW as WAV (no FLAC transcode, even for a FLAC song).
4334        assert_eq!(fs.read_file("clip1.stems/s1.wav").unwrap(), b"RIFFvocals");
4335        assert_eq!(fs.read_file("clip1.stems/s2.wav").unwrap(), b"RIFFdrums");
4336        assert!(!fs.exists("clip1.stems/s1.flac"));
4337        // No credit-spending generation/separation endpoint is ever hit.
4338        assert_eq!(http.count("stem_task"), 0);
4339        assert_eq!(http.count("separate"), 0);
4340        assert_eq!(http.count("generate"), 0);
4341    }
4342
4343    #[test]
4344    fn write_artifact_is_skipped_when_the_owner_audio_is_absent() {
4345        // A clip whose Download fails leaves no manifest entry, so its following
4346        // WriteArtifact must not strand an untracked sidecar: it is skipped with
4347        // no fetch and no write. A following healthy clip still succeeds.
4348        let ca = clip("a");
4349        let plan = Plan {
4350            actions: vec![
4351                Action::Download {
4352                    clip: ca.clone(),
4353                    lineage: LineageContext::own_root(&ca),
4354                    path: "a.mp3".to_owned(),
4355                    format: AudioFormat::Mp3,
4356                },
4357                Action::WriteArtifact {
4358                    kind: ArtifactKind::CoverJpg,
4359                    path: "a/cover.jpg".to_owned(),
4360                    source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
4361                    hash: "h1".to_owned(),
4362                    owner_id: "a".to_owned(),
4363                    content: None,
4364                },
4365                Action::WriteArtifact {
4366                    kind: ArtifactKind::CoverJpg,
4367                    path: "b/cover.jpg".to_owned(),
4368                    source_url: "https://art.suno.ai/b/large.jpg".to_owned(),
4369                    hash: "h2".to_owned(),
4370                    owner_id: "b".to_owned(),
4371                    content: None,
4372                },
4373            ],
4374        };
4375        // The Download's audio 404s (permanent), so no entry for "a" is created.
4376        let http = ScriptedHttp::new()
4377            .route("a.mp3", Reply::status(404))
4378            .route("a/large.jpg", Reply::ok(b"jpg-a".to_vec()))
4379            .route("b/large.jpg", Reply::ok(b"jpg-b".to_vec()));
4380        let fs = MemFs::new();
4381        let mut manifest = Manifest::new();
4382        // "b" already has audio (a prior-run clip), so its sidecar write proceeds.
4383        manifest.insert("b", entry("b.mp3", AudioFormat::Mp3));
4384
4385        let outcome = run(
4386            &plan,
4387            &mut manifest,
4388            &[],
4389            &http,
4390            &fs,
4391            &StubFfmpeg::flac(),
4392            &RecordingClock::new(),
4393            &ExecOptions::default(),
4394        );
4395
4396        assert_eq!(outcome.status, RunStatus::Completed);
4397        // The audio download is the only failure; the orphan artifact is skipped.
4398        assert_eq!(outcome.failed(), 1);
4399        assert_eq!(outcome.failures[0].clip_id, "a");
4400        assert_eq!(outcome.skipped, 1);
4401        // The orphan sidecar was neither fetched nor written, and left no record.
4402        assert_eq!(http.count("a/large.jpg"), 0);
4403        assert!(!fs.exists("a/cover.jpg"));
4404        assert!(manifest.get("a").is_none());
4405        // The healthy clip's sidecar still succeeded.
4406        assert_eq!(outcome.artifacts_written, 1);
4407        assert_eq!(fs.read_file("b/cover.jpg").unwrap(), b"jpg-b");
4408        assert!(manifest.get("b").unwrap().cover_jpg.is_some());
4409    }
4410
4411    #[test]
4412    fn write_artifact_transcodes_animated_cover_to_webp() {
4413        // A CoverWebp fetches the clip's MP4 preview, runs it through the ffmpeg
4414        // port, and writes the transcoded WebP (not the fetched MP4), recording
4415        // the sidecar on the owning entry.
4416        let mut manifest = Manifest::new();
4417        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
4418        let plan = Plan {
4419            actions: vec![Action::WriteArtifact {
4420                kind: ArtifactKind::CoverWebp,
4421                path: "a/cover.webp".to_owned(),
4422                source_url: "https://cdn.suno.ai/a/video.mp4".to_owned(),
4423                hash: "v1".to_owned(),
4424                owner_id: "a".to_owned(),
4425                content: None,
4426            }],
4427        };
4428        let http = ScriptedHttp::new().route("a/video.mp4", Reply::ok(b"mp4-bytes".to_vec()));
4429        let fs = MemFs::new();
4430        let ffmpeg = StubFfmpeg::webp();
4431
4432        let outcome = run(
4433            &plan,
4434            &mut manifest,
4435            &[],
4436            &http,
4437            &fs,
4438            &ffmpeg,
4439            &RecordingClock::new(),
4440            &ExecOptions::default(),
4441        );
4442
4443        assert_eq!(outcome.artifacts_written, 1);
4444        assert_eq!(outcome.failed(), 0);
4445        assert_eq!(outcome.status, RunStatus::Completed);
4446        // The fetched MP4 was transcoded: the file holds the ffmpeg WebP output.
4447        assert_eq!(http.count("a/video.mp4"), 1);
4448        let written = fs.read_file("a/cover.webp").unwrap();
4449        assert_ne!(written, b"mp4-bytes");
4450        assert!(written.starts_with(b"RIFF"));
4451        assert_eq!(
4452            manifest.get("a").unwrap().cover_webp,
4453            Some(ArtifactState {
4454                path: "a/cover.webp".to_owned(),
4455                hash: "v1".to_owned(),
4456            })
4457        );
4458    }
4459
4460    #[test]
4461    fn write_artifact_webp_transcode_failure_is_per_clip() {
4462        // A transcode failure is attributed to the owning clip: it is a per-clip
4463        // failure, the run completes, no sidecar is written, and the slot stays
4464        // empty. A healthy static cover in the same run still succeeds.
4465        let mut manifest = Manifest::new();
4466        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
4467        manifest.insert("b", entry("b.mp3", AudioFormat::Mp3));
4468        let plan = Plan {
4469            actions: vec![
4470                Action::WriteArtifact {
4471                    kind: ArtifactKind::CoverWebp,
4472                    path: "a/cover.webp".to_owned(),
4473                    source_url: "https://cdn.suno.ai/a/video.mp4".to_owned(),
4474                    hash: "v1".to_owned(),
4475                    owner_id: "a".to_owned(),
4476                    content: None,
4477                },
4478                Action::WriteArtifact {
4479                    kind: ArtifactKind::CoverJpg,
4480                    path: "b/cover.jpg".to_owned(),
4481                    source_url: "https://art.suno.ai/b/large.jpg".to_owned(),
4482                    hash: "h1".to_owned(),
4483                    owner_id: "b".to_owned(),
4484                    content: None,
4485                },
4486            ],
4487        };
4488        let http = ScriptedHttp::new()
4489            .route("a/video.mp4", Reply::ok(b"mp4-bytes".to_vec()))
4490            .route("b/large.jpg", Reply::ok(b"jpg-b".to_vec()));
4491        let fs = MemFs::new();
4492
4493        let outcome = run(
4494            &plan,
4495            &mut manifest,
4496            &[],
4497            &http,
4498            &fs,
4499            &StubFfmpeg::failing(),
4500            &RecordingClock::new(),
4501            &ExecOptions::default(),
4502        );
4503
4504        assert_eq!(outcome.status, RunStatus::Completed);
4505        assert_eq!(outcome.failed(), 1);
4506        assert_eq!(outcome.failures[0].clip_id, "a");
4507        // The animated cover failed to transcode: nothing written, slot empty.
4508        assert!(!fs.exists("a/cover.webp"));
4509        assert_eq!(manifest.get("a").unwrap().cover_webp, None);
4510        // The static cover in the same run still succeeded.
4511        assert_eq!(outcome.artifacts_written, 1);
4512        assert_eq!(fs.read_file("b/cover.jpg").unwrap(), b"jpg-b");
4513        assert!(manifest.get("b").unwrap().cover_jpg.is_some());
4514    }
4515
4516    #[test]
4517    fn write_artifact_uses_configured_webp_settings() {
4518        use std::sync::{Arc, Mutex};
4519
4520        struct RecordingWebpFfmpeg {
4521            seen: Arc<Mutex<Vec<WebpEncodeSettings>>>,
4522        }
4523
4524        impl Ffmpeg for RecordingWebpFfmpeg {
4525            async fn wav_to_flac(
4526                &self,
4527                _wav: &[u8],
4528            ) -> Result<Vec<u8>, crate::ffmpeg::FfmpegError> {
4529                Ok(Vec::new())
4530            }
4531
4532            async fn mp4_to_webp(
4533                &self,
4534                _mp4: &[u8],
4535                settings: WebpEncodeSettings,
4536            ) -> Result<Vec<u8>, crate::ffmpeg::FfmpegError> {
4537                let seen = Arc::clone(&self.seen);
4538                seen.lock().unwrap().push(settings);
4539                Ok(b"RIFF\x00\x00\x00\x00WEBP".to_vec())
4540            }
4541        }
4542
4543        let mut manifest = Manifest::new();
4544        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
4545        let plan = Plan {
4546            actions: vec![Action::WriteArtifact {
4547                kind: ArtifactKind::CoverWebp,
4548                path: "a/cover.webp".to_owned(),
4549                source_url: "https://cdn.suno.ai/a/video.mp4".to_owned(),
4550                hash: "v1".to_owned(),
4551                owner_id: "a".to_owned(),
4552                content: None,
4553            }],
4554        };
4555        let seen = Arc::new(Mutex::new(Vec::new()));
4556        let ffmpeg = RecordingWebpFfmpeg {
4557            seen: Arc::clone(&seen),
4558        };
4559        let opts = ExecOptions {
4560            cover_webp: WebpEncodeSettings {
4561                quality: 88,
4562                max_fps: 12,
4563                max_width: Some(720),
4564                lossless: false,
4565                compression_level: 4,
4566            },
4567            ..ExecOptions::default()
4568        };
4569
4570        let _ = run(
4571            &plan,
4572            &mut manifest,
4573            &[],
4574            &ScriptedHttp::new().route("a/video.mp4", Reply::ok(b"mp4-bytes".to_vec())),
4575            &MemFs::new(),
4576            &ffmpeg,
4577            &RecordingClock::new(),
4578            &opts,
4579        );
4580
4581        assert_eq!(
4582            seen.lock().unwrap().as_slice(),
4583            &[WebpEncodeSettings {
4584                quality: 88,
4585                max_fps: 12,
4586                max_width: Some(720),
4587                lossless: false,
4588                compression_level: 4,
4589            }]
4590        );
4591    }
4592
4593    // ── Phase 8: folder art routes to the album store ───────────────
4594
4595    #[test]
4596    fn folder_jpg_write_records_album_state_and_skips_manifest() {
4597        // Folder art is owned by the album root id, not a manifest clip: it
4598        // writes even with an empty manifest and records on the album store.
4599        let mut manifest = Manifest::new();
4600        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
4601        let plan = Plan {
4602            actions: vec![Action::WriteArtifact {
4603                kind: ArtifactKind::FolderJpg,
4604                path: "creator/album/folder.jpg".to_owned(),
4605                source_url: "https://art.suno.ai/root/large.jpg".to_owned(),
4606                hash: "jh".to_owned(),
4607                owner_id: "root".to_owned(),
4608                content: None,
4609            }],
4610        };
4611        let http = ScriptedHttp::new().route("root/large.jpg", Reply::ok(b"folder-jpg".to_vec()));
4612        let fs = MemFs::new();
4613
4614        let outcome = run_with_albums(
4615            &plan,
4616            &mut manifest,
4617            &mut albums,
4618            &[],
4619            &http,
4620            &fs,
4621            &StubFfmpeg::flac(),
4622            &RecordingClock::new(),
4623            &ExecOptions::default(),
4624        );
4625
4626        assert_eq!(outcome.artifacts_written, 1);
4627        assert_eq!(outcome.status, RunStatus::Completed);
4628        assert_eq!(
4629            fs.read_file("creator/album/folder.jpg").unwrap(),
4630            b"folder-jpg"
4631        );
4632        assert_eq!(
4633            albums.get("root").unwrap().folder_jpg,
4634            Some(ArtifactState {
4635                path: "creator/album/folder.jpg".to_owned(),
4636                hash: "jh".to_owned(),
4637            })
4638        );
4639        assert!(manifest.get("root").is_none());
4640    }
4641
4642    #[test]
4643    fn folder_webp_write_transcodes_and_records_album_state() {
4644        let mut manifest = Manifest::new();
4645        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
4646        let plan = Plan {
4647            actions: vec![Action::WriteArtifact {
4648                kind: ArtifactKind::FolderWebp,
4649                path: "creator/album/cover.webp".to_owned(),
4650                source_url: "https://cdn.suno.ai/root/video.mp4".to_owned(),
4651                hash: "wh".to_owned(),
4652                owner_id: "root".to_owned(),
4653                content: None,
4654            }],
4655        };
4656        let http = ScriptedHttp::new().route("root/video.mp4", Reply::ok(b"mp4-bytes".to_vec()));
4657        let fs = MemFs::new();
4658
4659        let outcome = run_with_albums(
4660            &plan,
4661            &mut manifest,
4662            &mut albums,
4663            &[],
4664            &http,
4665            &fs,
4666            &StubFfmpeg::webp(),
4667            &RecordingClock::new(),
4668            &ExecOptions::default(),
4669        );
4670
4671        assert_eq!(outcome.artifacts_written, 1);
4672        assert_eq!(outcome.failed(), 0);
4673        // The MP4 was transcoded to WebP, not written verbatim.
4674        let written = fs.read_file("creator/album/cover.webp").unwrap();
4675        assert_ne!(written, b"mp4-bytes");
4676        assert!(written.starts_with(b"RIFF"));
4677        assert_eq!(
4678            albums.get("root").unwrap().folder_webp,
4679            Some(ArtifactState {
4680                path: "creator/album/cover.webp".to_owned(),
4681                hash: "wh".to_owned(),
4682            })
4683        );
4684    }
4685
4686    #[test]
4687    fn folder_art_delete_clears_album_state() {
4688        let fs = MemFs::new().with_file("creator/album/folder.jpg", b"jpg".to_vec());
4689        let mut manifest = Manifest::new();
4690        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
4691        albums.insert(
4692            "root".to_owned(),
4693            AlbumArt {
4694                folder_jpg: Some(ArtifactState {
4695                    path: "creator/album/folder.jpg".to_owned(),
4696                    hash: "jh".to_owned(),
4697                }),
4698                folder_webp: None,
4699            },
4700        );
4701        let plan = Plan {
4702            actions: vec![Action::DeleteArtifact {
4703                kind: ArtifactKind::FolderJpg,
4704                path: "creator/album/folder.jpg".to_owned(),
4705                owner_id: "root".to_owned(),
4706            }],
4707        };
4708
4709        let outcome = run_with_albums(
4710            &plan,
4711            &mut manifest,
4712            &mut albums,
4713            &[],
4714            &ScriptedHttp::new(),
4715            &fs,
4716            &StubFfmpeg::flac(),
4717            &RecordingClock::new(),
4718            &ExecOptions::default(),
4719        );
4720
4721        assert_eq!(outcome.artifacts_deleted, 1);
4722        assert!(!fs.exists("creator/album/folder.jpg"));
4723        // The album row had only the one kind, so it is pruned entirely.
4724        assert!(!albums.contains_key("root"));
4725    }
4726
4727    // ── Phase 9: playlist artifacts ─────────────────────────────────
4728
4729    #[test]
4730    fn playlist_write_uses_inline_content_and_records_state() {
4731        // A playlist body is generated, carried inline. With an empty manifest
4732        // and NO http routes, the write still succeeds — proving it skipped the
4733        // network — and records the playlist store keyed by the playlist id.
4734        let mut manifest = Manifest::new();
4735        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
4736        let mut playlists: BTreeMap<String, PlaylistState> = BTreeMap::new();
4737        let body = "#EXTM3U\n#PLAYLIST:Road Trip\n#EXTINF:60,One\nA/One.flac\n";
4738        let plan = Plan {
4739            actions: vec![Action::WriteArtifact {
4740                kind: ArtifactKind::Playlist,
4741                path: "Road Trip.m3u8".to_owned(),
4742                source_url: String::new(),
4743                hash: "ph1".to_owned(),
4744                owner_id: "pl1".to_owned(),
4745                content: Some(body.to_owned()),
4746            }],
4747        };
4748        let fs = MemFs::new();
4749
4750        let outcome = run_full(
4751            &plan,
4752            &mut manifest,
4753            &mut albums,
4754            &mut playlists,
4755            &[],
4756            &ScriptedHttp::new(),
4757            &fs,
4758            &StubFfmpeg::flac(),
4759            &RecordingClock::new(),
4760            &ExecOptions::default(),
4761        );
4762
4763        assert_eq!(outcome.artifacts_written, 1);
4764        assert_eq!(outcome.failed(), 0);
4765        // The exact inline bytes were written, verbatim.
4766        assert_eq!(fs.read_file("Road Trip.m3u8").unwrap(), body.as_bytes());
4767        assert_eq!(
4768            playlists.get("pl1"),
4769            Some(&PlaylistState {
4770                name: "Road Trip".to_owned(),
4771                path: "Road Trip.m3u8".to_owned(),
4772                hash: "ph1".to_owned(),
4773            })
4774        );
4775    }
4776
4777    #[test]
4778    fn playlist_delete_removes_file_and_clears_state() {
4779        let fs = MemFs::new().with_file("Old.m3u8", b"#EXTM3U\n".to_vec());
4780        let mut manifest = Manifest::new();
4781        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
4782        let mut playlists: BTreeMap<String, PlaylistState> = BTreeMap::new();
4783        playlists.insert(
4784            "pl1".to_owned(),
4785            PlaylistState {
4786                name: "Old".to_owned(),
4787                path: "Old.m3u8".to_owned(),
4788                hash: "ph1".to_owned(),
4789            },
4790        );
4791        let plan = Plan {
4792            actions: vec![Action::DeleteArtifact {
4793                kind: ArtifactKind::Playlist,
4794                path: "Old.m3u8".to_owned(),
4795                owner_id: "pl1".to_owned(),
4796            }],
4797        };
4798
4799        let outcome = run_full(
4800            &plan,
4801            &mut manifest,
4802            &mut albums,
4803            &mut playlists,
4804            &[],
4805            &ScriptedHttp::new(),
4806            &fs,
4807            &StubFfmpeg::flac(),
4808            &RecordingClock::new(),
4809            &ExecOptions::default(),
4810        );
4811
4812        assert_eq!(outcome.artifacts_deleted, 1);
4813        assert!(!fs.exists("Old.m3u8"));
4814        assert!(
4815            !playlists.contains_key("pl1"),
4816            "the playlist row is cleared on delete"
4817        );
4818    }
4819
4820    // ── Phase 10: old-sidecar cleanup on move + empty-dir prune ──────
4821
4822    #[test]
4823    fn rename_move_relocates_cover_and_prunes_old_album() {
4824        // A title/album change moves the audio (Rename) and re-emits the cover
4825        // at the NEW path. The old cover must be removed and the now-empty old
4826        // album directory pruned, leaving no orphan sidecar and no ghost dir.
4827        let mut manifest = Manifest::new();
4828        let mut e = entry("Creator/AlbumA/song.flac", AudioFormat::Flac);
4829        e.cover_jpg = Some(ArtifactState {
4830            path: "Creator/AlbumA/cover.jpg".to_owned(),
4831            hash: "h1".to_owned(),
4832        });
4833        manifest.insert("a", e);
4834        let fs = MemFs::new()
4835            .with_file("Creator/AlbumA/song.flac", b"AUDIO".to_vec())
4836            .with_file("Creator/AlbumA/cover.jpg", b"old-jpg".to_vec());
4837        let plan = Plan {
4838            actions: vec![
4839                Action::Rename {
4840                    from: "Creator/AlbumA/song.flac".to_owned(),
4841                    to: "Creator/AlbumB/song.flac".to_owned(),
4842                },
4843                Action::WriteArtifact {
4844                    kind: ArtifactKind::CoverJpg,
4845                    path: "Creator/AlbumB/cover.jpg".to_owned(),
4846                    source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
4847                    hash: "h1".to_owned(),
4848                    owner_id: "a".to_owned(),
4849                    content: None,
4850                },
4851            ],
4852        };
4853        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"new-jpg".to_vec()));
4854
4855        let outcome = run(
4856            &plan,
4857            &mut manifest,
4858            &[],
4859            &http,
4860            &fs,
4861            &StubFfmpeg::flac(),
4862            &RecordingClock::new(),
4863            &ExecOptions::default(),
4864        );
4865
4866        assert_eq!(outcome.failed(), 0);
4867        // Audio moved, the new cover was written, the old cover removed.
4868        assert!(fs.exists("Creator/AlbumB/song.flac"));
4869        assert_eq!(
4870            fs.read_file("Creator/AlbumB/cover.jpg").unwrap(),
4871            b"new-jpg"
4872        );
4873        assert!(!fs.exists("Creator/AlbumA/cover.jpg"));
4874        assert!(!fs.exists("Creator/AlbumA/song.flac"));
4875        // The manifest cover slot now points at the new path.
4876        assert_eq!(
4877            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
4878            "Creator/AlbumB/cover.jpg"
4879        );
4880        // The emptied old album directory is pruned; the new one survives.
4881        assert!(!fs.has_dir("Creator/AlbumA"));
4882        assert!(fs.has_dir("Creator/AlbumB"));
4883    }
4884
4885    #[test]
4886    fn rename_move_relocates_folder_art_and_prunes_old_album() {
4887        // An album rename moves folder.jpg: the old file is removed, the album
4888        // store slot advanced to the new path, and the emptied dir pruned.
4889        let mut manifest = Manifest::new();
4890        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
4891        albums.insert(
4892            "root".to_owned(),
4893            AlbumArt {
4894                folder_jpg: Some(ArtifactState {
4895                    path: "Creator/AlbumA/folder.jpg".to_owned(),
4896                    hash: "jh".to_owned(),
4897                }),
4898                folder_webp: None,
4899            },
4900        );
4901        let fs = MemFs::new().with_file("Creator/AlbumA/folder.jpg", b"old-folder".to_vec());
4902        let plan = Plan {
4903            actions: vec![Action::WriteArtifact {
4904                kind: ArtifactKind::FolderJpg,
4905                path: "Creator/AlbumB/folder.jpg".to_owned(),
4906                source_url: "https://art.suno.ai/root/large.jpg".to_owned(),
4907                hash: "jh".to_owned(),
4908                owner_id: "root".to_owned(),
4909                content: None,
4910            }],
4911        };
4912        let http = ScriptedHttp::new().route("root/large.jpg", Reply::ok(b"new-folder".to_vec()));
4913
4914        let outcome = run_with_albums(
4915            &plan,
4916            &mut manifest,
4917            &mut albums,
4918            &[],
4919            &http,
4920            &fs,
4921            &StubFfmpeg::flac(),
4922            &RecordingClock::new(),
4923            &ExecOptions::default(),
4924        );
4925
4926        assert_eq!(outcome.failed(), 0);
4927        assert_eq!(
4928            fs.read_file("Creator/AlbumB/folder.jpg").unwrap(),
4929            b"new-folder"
4930        );
4931        assert!(!fs.exists("Creator/AlbumA/folder.jpg"));
4932        assert_eq!(
4933            albums
4934                .get("root")
4935                .unwrap()
4936                .folder_jpg
4937                .as_ref()
4938                .unwrap()
4939                .path,
4940            "Creator/AlbumB/folder.jpg"
4941        );
4942        assert!(!fs.has_dir("Creator/AlbumA"));
4943        assert!(fs.has_dir("Creator/AlbumB"));
4944    }
4945
4946    #[test]
4947    fn prune_empty_dirs_removes_only_empty_dirs() {
4948        // A direct exercise of the prune port's safety guarantees on a mixed
4949        // tree: nested empties go, anything holding a file (hidden ones too)
4950        // stays, and no file is touched.
4951        let fs = MemFs::new()
4952            .with_file("keep/full/song.flac", b"x".to_vec())
4953            .with_file("hidden/.suno-manifest.json", b"{}".to_vec())
4954            .with_dir("empty/leaf")
4955            .with_dir("nested/a/b/c");
4956
4957        fs.prune_empty_dirs("").unwrap();
4958
4959        // Every empty directory, however deeply nested, is pruned bottom-up.
4960        for gone in [
4961            "empty",
4962            "empty/leaf",
4963            "nested",
4964            "nested/a",
4965            "nested/a/b",
4966            "nested/a/b/c",
4967        ] {
4968            assert!(!fs.has_dir(gone), "empty dir {gone} should be pruned");
4969        }
4970        // A directory holding any file — including only a hidden dotfile — stays.
4971        assert!(fs.has_dir("keep"));
4972        assert!(fs.has_dir("keep/full"));
4973        assert!(fs.has_dir("hidden"));
4974        // No file was touched.
4975        assert!(fs.exists("keep/full/song.flac"));
4976        assert!(fs.exists("hidden/.suno-manifest.json"));
4977    }
4978
4979    #[test]
4980    fn prune_empty_dirs_never_removes_the_named_root() {
4981        // Pruning under a named root clears its empty children but keeps the
4982        // root itself, even when the root is now empty.
4983        let fs = MemFs::new().with_dir("empty/leaf");
4984        fs.prune_empty_dirs("empty").unwrap();
4985        assert!(fs.has_dir("empty"), "the named root is never removed");
4986        assert!(!fs.has_dir("empty/leaf"));
4987    }
4988
4989    #[test]
4990    fn old_sidecar_remove_failure_is_per_clip_and_converges_next_run() {
4991        // If removing the old sidecar fails, the write is a per-clip failure
4992        // that never aborts the run and does NOT advance the state slot, so the
4993        // next identical run re-attempts the cleanup and the tree converges.
4994        let mut manifest = Manifest::new();
4995        let mut e = entry("a.flac", AudioFormat::Flac);
4996        e.cover_jpg = Some(ArtifactState {
4997            path: "AlbumA/cover.jpg".to_owned(),
4998            hash: "h1".to_owned(),
4999        });
5000        manifest.insert("a", e);
5001        let fs = MemFs::new()
5002            .with_file("a.flac", b"AUDIO".to_vec())
5003            .with_file("AlbumA/cover.jpg", b"old".to_vec());
5004        let plan = Plan {
5005            actions: vec![Action::WriteArtifact {
5006                kind: ArtifactKind::CoverJpg,
5007                path: "AlbumB/cover.jpg".to_owned(),
5008                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
5009                hash: "h1".to_owned(),
5010                owner_id: "a".to_owned(),
5011                content: None,
5012            }],
5013        };
5014        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"new".to_vec()));
5015
5016        // Run 1: the old-cover remove is forced to fail.
5017        fs.arm_fail_remove("AlbumA/cover.jpg");
5018        let first = run(
5019            &plan,
5020            &mut manifest,
5021            &[],
5022            &http,
5023            &fs,
5024            &StubFfmpeg::flac(),
5025            &RecordingClock::new(),
5026            &ExecOptions::default(),
5027        );
5028        assert_eq!(
5029            first.status,
5030            RunStatus::Completed,
5031            "a remove failure never aborts the run"
5032        );
5033        assert_eq!(first.failed(), 1);
5034        // The new cover is written but the old one lingers and the slot is stale.
5035        assert!(fs.exists("AlbumB/cover.jpg"));
5036        assert!(fs.exists("AlbumA/cover.jpg"));
5037        assert_eq!(
5038            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
5039            "AlbumA/cover.jpg"
5040        );
5041        assert!(fs.has_dir("AlbumA"), "the orphan keeps its directory alive");
5042
5043        // Run 2: the same plan re-runs with the fault cleared and converges.
5044        fs.disarm_fail_remove("AlbumA/cover.jpg");
5045        let second = run(
5046            &plan,
5047            &mut manifest,
5048            &[],
5049            &http,
5050            &fs,
5051            &StubFfmpeg::flac(),
5052            &RecordingClock::new(),
5053            &ExecOptions::default(),
5054        );
5055        assert_eq!(second.failed(), 0);
5056        assert!(fs.exists("AlbumB/cover.jpg"));
5057        assert!(!fs.exists("AlbumA/cover.jpg"), "no orphan persists");
5058        assert_eq!(
5059            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
5060            "AlbumB/cover.jpg"
5061        );
5062        assert!(!fs.has_dir("AlbumA"), "the emptied directory is pruned");
5063    }
5064
5065    #[test]
5066    fn same_path_artifact_rewrite_does_no_remove_and_prunes_nothing() {
5067        // The idempotent case: a content-only cover rewrite (hash drift, path
5068        // unchanged) attempts no remove and prunes no live directory. A remove
5069        // failure is armed on the cover path, so any spurious remove would
5070        // surface as a failure — none does.
5071        let mut manifest = Manifest::new();
5072        let mut e = entry("Album/a.mp3", AudioFormat::Mp3);
5073        e.cover_jpg = Some(ArtifactState {
5074            path: "Album/cover.jpg".to_owned(),
5075            hash: "h1".to_owned(),
5076        });
5077        manifest.insert("a", e);
5078        let fs = MemFs::new()
5079            .with_file("Album/a.mp3", b"AUDIO".to_vec())
5080            .with_file("Album/cover.jpg", b"old".to_vec());
5081        fs.arm_fail_remove("Album/cover.jpg");
5082        let plan = Plan {
5083            actions: vec![Action::WriteArtifact {
5084                kind: ArtifactKind::CoverJpg,
5085                path: "Album/cover.jpg".to_owned(),
5086                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
5087                hash: "h2".to_owned(),
5088                owner_id: "a".to_owned(),
5089                content: None,
5090            }],
5091        };
5092        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"new".to_vec()));
5093
5094        let outcome = run(
5095            &plan,
5096            &mut manifest,
5097            &[],
5098            &http,
5099            &fs,
5100            &StubFfmpeg::flac(),
5101            &RecordingClock::new(),
5102            &ExecOptions::default(),
5103        );
5104
5105        assert_eq!(
5106            outcome.failed(),
5107            0,
5108            "no remove is attempted, so the armed failure never fires"
5109        );
5110        assert_eq!(outcome.artifacts_written, 1);
5111        assert_eq!(fs.read_file("Album/cover.jpg").unwrap(), b"new");
5112        assert_eq!(
5113            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().hash,
5114            "h2"
5115        );
5116        // The live directory is untouched by prune.
5117        assert!(fs.has_dir("Album"));
5118    }
5119
5120    // ── Concurrency (issue #22) ─────────────────────────────────────
5121
5122    mod concurrency {
5123        use super::*;
5124        use crate::ffmpeg::FfmpegError;
5125        use crate::fs::{FileStat, FsError};
5126        use crate::http::{HttpRequest, TransportError};
5127        use std::future::Future;
5128        use std::pin::Pin;
5129        use std::sync::Arc;
5130        use std::sync::atomic::{AtomicUsize, Ordering};
5131        use std::task::{Context, Poll};
5132
5133        /// A future that pends exactly once before resolving, waking itself so a
5134        /// single-threaded executor re-polls. It forces the [`Http`] port to
5135        /// yield, so [`buffer_unordered`](futures_util::stream::StreamExt) parks
5136        /// each in-flight request and the true overlap becomes observable.
5137        #[derive(Default)]
5138        struct YieldOnce {
5139            yielded: bool,
5140        }
5141
5142        impl Future for YieldOnce {
5143            type Output = ();
5144            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
5145                if self.yielded {
5146                    Poll::Ready(())
5147                } else {
5148                    self.yielded = true;
5149                    cx.waker().wake_by_ref();
5150                    Poll::Pending
5151                }
5152            }
5153        }
5154
5155        /// An [`Http`] double that wraps [`ScriptedHttp`] and records the peak
5156        /// number of concurrently in-flight requests. Each `send` bumps a live
5157        /// counter, yields once (so peers can start), then delegates.
5158        struct GatedHttp {
5159            inner: ScriptedHttp,
5160            inflight: Arc<AtomicUsize>,
5161            peak: Arc<AtomicUsize>,
5162        }
5163
5164        impl GatedHttp {
5165            fn new(inner: ScriptedHttp) -> Self {
5166                Self {
5167                    inner,
5168                    inflight: Arc::new(AtomicUsize::new(0)),
5169                    peak: Arc::new(AtomicUsize::new(0)),
5170                }
5171            }
5172
5173            fn peak(&self) -> usize {
5174                self.peak.load(Ordering::SeqCst)
5175            }
5176        }
5177
5178        impl Http for GatedHttp {
5179            async fn send(&self, request: HttpRequest) -> Result<HttpResponse, TransportError> {
5180                let now = self.inflight.fetch_add(1, Ordering::SeqCst) + 1;
5181                self.peak.fetch_max(now, Ordering::SeqCst);
5182                YieldOnce::default().await;
5183                let out = self.inner.send(request).await;
5184                self.inflight.fetch_sub(1, Ordering::SeqCst);
5185                out
5186            }
5187        }
5188
5189        fn download(id: &str, format: AudioFormat) -> (Clip, Desired, Action) {
5190            let c = clip(id);
5191            let d = desired(c.clone(), format);
5192            let action = Action::Download {
5193                clip: c.clone(),
5194                lineage: LineageContext::own_root(&c),
5195                path: d.path.clone(),
5196                format,
5197            };
5198            (c, d, action)
5199        }
5200
5201        fn opts_with(concurrency: u32) -> ExecOptions {
5202            ExecOptions {
5203                concurrency,
5204                ..small_poll()
5205            }
5206        }
5207
5208        #[test]
5209        fn concurrency_never_exceeds_the_configured_bound() {
5210            let count = 6;
5211            let concurrency = 3;
5212            let mut scripted = ScriptedHttp::new().with_auth();
5213            let mut actions = Vec::new();
5214            let mut desireds = Vec::new();
5215            for i in 0..count {
5216                let id = format!("c{i}");
5217                scripted = scripted.route(&format!("{id}.mp3"), Reply::ok(b"mp3-body".to_vec()));
5218                let (_c, d, action) = download(&id, AudioFormat::Mp3);
5219                actions.push(action);
5220                desireds.push(d);
5221            }
5222            let http = GatedHttp::new(scripted);
5223            let fs = MemFs::new();
5224            let plan = Plan { actions };
5225            let mut manifest = Manifest::new();
5226
5227            let outcome = run_gated_fs(
5228                &plan,
5229                &mut manifest,
5230                &desireds,
5231                &http,
5232                &fs,
5233                &opts_with(concurrency),
5234            );
5235
5236            assert_eq!(outcome.downloaded, count);
5237            assert!(
5238                http.peak() <= concurrency as usize,
5239                "peak {} exceeded the bound {concurrency}",
5240                http.peak()
5241            );
5242            assert_eq!(
5243                http.peak(),
5244                concurrency as usize,
5245                "expected the run to saturate the bound"
5246            );
5247        }
5248
5249        /// Run a gated plan against a caller-supplied [`MemFs`], returning the
5250        /// outcome. The client is built here so the limiter can be inspected by
5251        /// the caller-facing variant below.
5252        fn run_gated_fs(
5253            plan: &Plan,
5254            manifest: &mut Manifest,
5255            desired: &[Desired],
5256            http: &GatedHttp,
5257            fs: &MemFs,
5258            opts: &ExecOptions,
5259        ) -> ExecOutcome {
5260            let ffmpeg = StubFfmpeg::flac();
5261            let clock = RecordingClock::new();
5262            let mut albums = BTreeMap::new();
5263            let mut playlists = BTreeMap::new();
5264            let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
5265            pollster::block_on(execute(
5266                plan,
5267                manifest,
5268                &mut albums,
5269                &mut playlists,
5270                desired,
5271                &HashMap::new(),
5272                Ports {
5273                    client: &mut client,
5274                    http,
5275                    fs,
5276                    ffmpeg: &ffmpeg,
5277                    clock: &clock,
5278                },
5279                opts,
5280            ))
5281        }
5282
5283        #[test]
5284        fn a_failing_clip_does_not_abort_the_others() {
5285            let mut scripted = ScriptedHttp::new().with_auth();
5286            scripted = scripted
5287                .route("ok1.mp3", Reply::ok(b"one".to_vec()))
5288                .route("bad.mp3", Reply::status(404))
5289                .route("ok2.mp3", Reply::ok(b"two".to_vec()));
5290            let (_a, d1, a1) = download("ok1", AudioFormat::Mp3);
5291            let (_b, d2, a2) = download("bad", AudioFormat::Mp3);
5292            let (_c, d3, a3) = download("ok2", AudioFormat::Mp3);
5293            let http = GatedHttp::new(scripted);
5294            let fs = MemFs::new();
5295            let plan = Plan {
5296                actions: vec![a1, a2, a3],
5297            };
5298            let mut manifest = Manifest::new();
5299
5300            let outcome = run_gated_fs(
5301                &plan,
5302                &mut manifest,
5303                &[d1, d2, d3],
5304                &http,
5305                &fs,
5306                &opts_with(3),
5307            );
5308
5309            assert_eq!(outcome.downloaded, 2);
5310            assert_eq!(outcome.failed(), 1);
5311            assert_eq!(outcome.status, RunStatus::Completed);
5312            assert_eq!(outcome.failures[0].clip_id, "bad");
5313            assert!(manifest.get("ok1").is_some());
5314            assert!(manifest.get("ok2").is_some());
5315            assert!(manifest.get("bad").is_none());
5316        }
5317
5318        #[test]
5319        fn outcome_is_identical_across_concurrency_levels() {
5320            // A plan mixing successful and failing downloads with serial phase-2
5321            // actions (a skip and a delete), so both phases contribute.
5322            fn build() -> (Plan, Vec<Desired>) {
5323                let mut actions = Vec::new();
5324                let mut desireds = Vec::new();
5325                for id in ["a", "b", "c", "d"] {
5326                    let (_c, d, action) = download(id, AudioFormat::Mp3);
5327                    actions.push(action);
5328                    desireds.push(d);
5329                }
5330                // A failing download in the middle of the audio set.
5331                let (_e, de, ae) = download("fail", AudioFormat::Mp3);
5332                actions.insert(2, ae);
5333                desireds.push(de);
5334                // Phase-2 actions.
5335                actions.push(Action::Skip {
5336                    clip_id: "gone".to_owned(),
5337                });
5338                actions.push(Action::Delete {
5339                    path: "old.mp3".to_owned(),
5340                    clip_id: "old".to_owned(),
5341                });
5342                (Plan { actions }, desireds)
5343            }
5344
5345            fn http() -> ScriptedHttp {
5346                ScriptedHttp::new()
5347                    .with_auth()
5348                    .route("a.mp3", Reply::ok(b"a".to_vec()))
5349                    .route("b.mp3", Reply::ok(b"b".to_vec()))
5350                    .route("c.mp3", Reply::ok(b"c".to_vec()))
5351                    .route("d.mp3", Reply::ok(b"d".to_vec()))
5352                    .route("fail.mp3", Reply::status(404))
5353            }
5354
5355            fn seed_manifest() -> Manifest {
5356                let mut m = Manifest::new();
5357                m.insert("old".to_owned(), entry("old.mp3", AudioFormat::Mp3));
5358                m
5359            }
5360
5361            let (plan, desireds) = build();
5362
5363            let mut m1 = seed_manifest();
5364            let fs1 = MemFs::new().with_file("old.mp3", b"x".to_vec());
5365            let out1 = run_gated_fs(
5366                &plan,
5367                &mut m1,
5368                &desireds,
5369                &GatedHttp::new(http()),
5370                &fs1,
5371                &opts_with(1),
5372            );
5373
5374            let mut m8 = seed_manifest();
5375            let fs8 = MemFs::new().with_file("old.mp3", b"x".to_vec());
5376            let out8 = run_gated_fs(
5377                &plan,
5378                &mut m8,
5379                &desireds,
5380                &GatedHttp::new(http()),
5381                &fs8,
5382                &opts_with(8),
5383            );
5384
5385            assert_eq!(out1, out8, "outcome must not depend on concurrency");
5386            assert_eq!(m1, m8, "final manifest must not depend on concurrency");
5387            assert_eq!(out8.downloaded, 4);
5388            assert_eq!(out8.deleted, 1);
5389            assert_eq!(out8.skipped, 1);
5390            assert_eq!(out8.failed(), 1);
5391        }
5392
5393        #[test]
5394        fn a_systemic_disk_full_aborts_promptly() {
5395            let count = 8;
5396            let concurrency = 2;
5397            let mut scripted = ScriptedHttp::new().with_auth();
5398            let mut actions = Vec::new();
5399            let mut desireds = Vec::new();
5400            for i in 0..count {
5401                let id = format!("d{i}");
5402                scripted = scripted.route(&format!("{id}.mp3"), Reply::ok(b"mp3-body".to_vec()));
5403                let (_c, d, action) = download(&id, AudioFormat::Mp3);
5404                actions.push(action);
5405                desireds.push(d);
5406            }
5407            // The very first clip's write hits ENOSPC, a systemic failure.
5408            let fs = MemFs::new().fail_write_out_of_space("d0.mp3");
5409            let http = GatedHttp::new(scripted);
5410            let plan = Plan { actions };
5411            let mut manifest = Manifest::new();
5412
5413            let outcome = run_gated_fs(
5414                &plan,
5415                &mut manifest,
5416                &desireds,
5417                &http,
5418                &fs,
5419                &opts_with(concurrency),
5420            );
5421
5422            assert_eq!(outcome.status, RunStatus::DiskFull);
5423            assert!(
5424                outcome.downloaded < count,
5425                "a systemic abort must stop remaining work, downloaded {}",
5426                outcome.downloaded
5427            );
5428        }
5429
5430        #[test]
5431        fn limiter_records_a_rate_limit_under_concurrent_calls() {
5432            // Three concurrent FLAC renders; exactly one clip is throttled once
5433            // on its wav_file read. The shared limiter must record that single
5434            // 429 (halving 2.0 -> 1.0) with no lost or duplicated update, proving
5435            // the mutex keeps the AIMD state correct under concurrency.
5436            let scripted = ScriptedHttp::new()
5437                .with_auth()
5438                .route_seq(
5439                    "/gen/x/wav_file/",
5440                    vec![
5441                        Reply::status(429),
5442                        Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/x.wav"}"#),
5443                    ],
5444                )
5445                .route(
5446                    "/gen/y/wav_file/",
5447                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/y.wav"}"#),
5448                )
5449                .route(
5450                    "/gen/z/wav_file/",
5451                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/z.wav"}"#),
5452                )
5453                .route("x.wav", Reply::ok(b"wav-x".to_vec()))
5454                .route("y.wav", Reply::ok(b"wav-y".to_vec()))
5455                .route("z.wav", Reply::ok(b"wav-z".to_vec()));
5456
5457            let mut actions = Vec::new();
5458            let mut desireds = Vec::new();
5459            for id in ["x", "y", "z"] {
5460                let (_c, d, action) = download(id, AudioFormat::Flac);
5461                actions.push(action);
5462                desireds.push(d);
5463            }
5464            let plan = Plan { actions };
5465            let fs = MemFs::new();
5466            let ffmpeg = StubFfmpeg::flac();
5467            let clock = RecordingClock::new();
5468            let mut albums = BTreeMap::new();
5469            let mut playlists = BTreeMap::new();
5470            let mut manifest = Manifest::new();
5471            let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
5472
5473            let outcome = pollster::block_on(execute(
5474                &plan,
5475                &mut manifest,
5476                &mut albums,
5477                &mut playlists,
5478                &desireds,
5479                &HashMap::new(),
5480                Ports {
5481                    client: &mut client,
5482                    http: &scripted,
5483                    fs: &fs,
5484                    ffmpeg: &ffmpeg,
5485                    clock: &clock,
5486                },
5487                &opts_with(3),
5488            ));
5489
5490            assert_eq!(outcome.downloaded, 3);
5491            assert_eq!(outcome.failed(), 0);
5492            assert!(
5493                (client.limiter_rate() - 1.0).abs() < 1e-9,
5494                "one 429 must halve the rate to 1.0, got {}",
5495                client.limiter_rate()
5496            );
5497        }
5498
5499        #[test]
5500        fn a_download_is_committed_in_plan_order_around_a_rename() {
5501            // Plan order: rename "orig" away from shared.mp3 first, then download
5502            // a new clip into shared.mp3. A parallel executor that performed the
5503            // download's destination write off plan order would write shared.mp3
5504            // before the rename ran, letting the rename carry those fresh bytes
5505            // to moved.mp3 and stranding shared.mp3 - corrupting both clips.
5506            // Committing every destination effect serially in plan order keeps
5507            // moved.mp3 = the original and shared.mp3 = the new download.
5508            let c_new = clip("new");
5509            let mut d_new = desired(c_new.clone(), AudioFormat::Mp3);
5510            d_new.path = "shared.mp3".to_owned();
5511            let plan = Plan {
5512                actions: vec![
5513                    Action::Rename {
5514                        from: "shared.mp3".to_owned(),
5515                        to: "moved.mp3".to_owned(),
5516                    },
5517                    Action::Download {
5518                        clip: c_new.clone(),
5519                        lineage: LineageContext::own_root(&c_new),
5520                        path: "shared.mp3".to_owned(),
5521                        format: AudioFormat::Mp3,
5522                    },
5523                ],
5524            };
5525            let scripted = ScriptedHttp::new()
5526                .with_auth()
5527                .route("new.mp3", Reply::ok(b"NEW-BODY".to_vec()));
5528            let http = GatedHttp::new(scripted);
5529            let fs = MemFs::new().with_file("shared.mp3", b"ORIGINAL".to_vec());
5530            let mut manifest = Manifest::new();
5531            manifest.insert("orig", entry("shared.mp3", AudioFormat::Mp3));
5532
5533            let outcome = run_gated_fs(&plan, &mut manifest, &[d_new], &http, &fs, &opts_with(4));
5534
5535            assert_eq!(outcome.renamed, 1);
5536            assert_eq!(outcome.downloaded, 1);
5537            assert_eq!(
5538                fs.read_file("moved.mp3").as_deref(),
5539                Some(&b"ORIGINAL"[..]),
5540                "the rename must carry the original bytes, untouched by the download"
5541            );
5542            let landed = fs.read_file("shared.mp3").expect("new download must land");
5543            assert_ne!(
5544                landed, b"ORIGINAL",
5545                "the new download must replace the moved original, not corrupt it"
5546            );
5547            assert_eq!(manifest.get("orig").unwrap().path, "moved.mp3");
5548            assert_eq!(manifest.get("new").unwrap().path, "shared.mp3");
5549        }
5550
5551        #[test]
5552        fn an_aborted_reformat_leaves_the_old_file_and_manifest_consistent() {
5553            // A systemic disk-full abort strikes the download committed before the
5554            // reformat. Because the reformat's slow render is side-effect-free and
5555            // its destination write + old-file removal only happen in the serial
5556            // commit (which the abort skips), the old file survives and the
5557            // manifest still points at it: no removed-but-referenced file.
5558            let boom = clip("boom");
5559            let mut d_boom = desired(boom.clone(), AudioFormat::Mp3);
5560            d_boom.path = "boom.mp3".to_owned();
5561            let reformer = clip("r");
5562            let d_reformer = desired(reformer.clone(), AudioFormat::Mp3);
5563            let plan = Plan {
5564                actions: vec![
5565                    Action::Download {
5566                        clip: boom.clone(),
5567                        lineage: LineageContext::own_root(&boom),
5568                        path: "boom.mp3".to_owned(),
5569                        format: AudioFormat::Mp3,
5570                    },
5571                    Action::Reformat {
5572                        clip: reformer.clone(),
5573                        path: "r_new.mp3".to_owned(),
5574                        from_path: "r_old.flac".to_owned(),
5575                        from: AudioFormat::Flac,
5576                        to: AudioFormat::Mp3,
5577                    },
5578                ],
5579            };
5580            let scripted = ScriptedHttp::new()
5581                .with_auth()
5582                .route("boom.mp3", Reply::ok(b"boom-body".to_vec()))
5583                .route("r.mp3", Reply::ok(b"reformatted".to_vec()));
5584            let http = GatedHttp::new(scripted);
5585            // The download's write hits ENOSPC, a systemic abort.
5586            let fs = MemFs::new()
5587                .with_file("r_old.flac", b"OLD-FLAC".to_vec())
5588                .fail_write_out_of_space("boom.mp3");
5589            let mut manifest = Manifest::new();
5590            manifest.insert("r", entry("r_old.flac", AudioFormat::Flac));
5591
5592            let outcome = run_gated_fs(
5593                &plan,
5594                &mut manifest,
5595                &[d_boom, d_reformer],
5596                &http,
5597                &fs,
5598                &opts_with(4),
5599            );
5600
5601            assert_eq!(outcome.status, RunStatus::DiskFull);
5602            assert!(
5603                fs.exists("r_old.flac"),
5604                "the old file must survive the abort"
5605            );
5606            assert!(
5607                !fs.exists("r_new.mp3"),
5608                "no reformatted file may be written"
5609            );
5610            let still = manifest.get("r").expect("the manifest must still track r");
5611            assert_eq!(
5612                still.path, "r_old.flac",
5613                "the manifest must still point at the surviving old file"
5614            );
5615            assert_eq!(still.format, AudioFormat::Flac);
5616        }
5617
5618        #[test]
5619        fn a_systemic_abort_leaves_no_untracked_destination_files() {
5620            // Two clips commit, the third's write hits ENOSPC (a systemic abort),
5621            // and the rest never commit. Every file remaining on disk must be one
5622            // the manifest tracks: producers write nothing, so an abort cannot
5623            // strand an untracked file from an in-flight or buffered render.
5624            let mut scripted = ScriptedHttp::new().with_auth();
5625            let mut actions = Vec::new();
5626            let mut desireds = Vec::new();
5627            for id in ["a0", "a1", "boom", "a3", "a4"] {
5628                scripted = scripted.route(&format!("{id}.mp3"), Reply::ok(b"body".to_vec()));
5629                let (_c, d, action) = download(id, AudioFormat::Mp3);
5630                actions.push(action);
5631                desireds.push(d);
5632            }
5633            let http = GatedHttp::new(scripted);
5634            let fs = MemFs::new().fail_write_out_of_space("boom.mp3");
5635            let plan = Plan { actions };
5636            let mut manifest = Manifest::new();
5637
5638            let outcome = run_gated_fs(&plan, &mut manifest, &desireds, &http, &fs, &opts_with(2));
5639
5640            assert_eq!(outcome.status, RunStatus::DiskFull);
5641            let tracked: std::collections::BTreeSet<String> = manifest
5642                .entries
5643                .values()
5644                .map(|entry| entry.path.clone())
5645                .collect();
5646            for path in fs.paths() {
5647                assert!(
5648                    tracked.contains(&path),
5649                    "found an untracked destination file: {path}"
5650                );
5651            }
5652            assert!(
5653                !fs.exists("a3.mp3"),
5654                "uncommitted renders must not be on disk"
5655            );
5656            assert!(
5657                !fs.exists("a4.mp3"),
5658                "uncommitted renders must not be on disk"
5659            );
5660        }
5661
5662        /// An [`Ffmpeg`] double that counts how many rendered FLAC payloads are
5663        /// live: it bumps a shared counter (tracking the peak) when a transcode
5664        /// yields bytes, and [`CountingFs`] drops it back on the committing write.
5665        /// The [transcode, write] window is a superset of the true in-memory hold,
5666        /// so the observed peak upper-bounds the real one.
5667        struct CountingFfmpeg {
5668            inner: StubFfmpeg,
5669            held: Arc<AtomicUsize>,
5670            peak: Arc<AtomicUsize>,
5671        }
5672
5673        impl Ffmpeg for CountingFfmpeg {
5674            fn wav_to_flac(
5675                &self,
5676                wav: &[u8],
5677            ) -> impl Future<Output = Result<Vec<u8>, FfmpegError>> + Send {
5678                let fut = self.inner.wav_to_flac(wav);
5679                let held = self.held.clone();
5680                let peak = self.peak.clone();
5681                async move {
5682                    let out = fut.await;
5683                    if out.is_ok() {
5684                        let now = held.fetch_add(1, Ordering::SeqCst) + 1;
5685                        peak.fetch_max(now, Ordering::SeqCst);
5686                    }
5687                    out
5688                }
5689            }
5690
5691            fn mp4_to_webp(
5692                &self,
5693                mp4: &[u8],
5694                settings: WebpEncodeSettings,
5695            ) -> impl Future<Output = Result<Vec<u8>, FfmpegError>> + Send {
5696                self.inner.mp4_to_webp(mp4, settings)
5697            }
5698        }
5699
5700        /// A [`Filesystem`] double wrapping [`MemFs`] that decrements the live
5701        /// payload counter on each committing write, closing the window opened by
5702        /// [`CountingFfmpeg`].
5703        struct CountingFs {
5704            inner: MemFs,
5705            held: Arc<AtomicUsize>,
5706        }
5707
5708        impl Filesystem for CountingFs {
5709            fn write_atomic(&self, path: &str, bytes: &[u8]) -> Result<(), FsError> {
5710                let out = self.inner.write_atomic(path, bytes);
5711                self.held.fetch_sub(1, Ordering::SeqCst);
5712                out
5713            }
5714
5715            fn rename(&self, from: &str, to: &str) -> Result<(), FsError> {
5716                self.inner.rename(from, to)
5717            }
5718
5719            fn remove(&self, path: &str) -> Result<(), FsError> {
5720                self.inner.remove(path)
5721            }
5722
5723            fn prune_empty_dirs(&self, root: &str) -> Result<(), FsError> {
5724                self.inner.prune_empty_dirs(root)
5725            }
5726
5727            fn read(&self, path: &str) -> Result<Vec<u8>, FsError> {
5728                self.inner.read(path)
5729            }
5730
5731            fn metadata(&self, path: &str) -> Option<FileStat> {
5732                self.inner.metadata(path)
5733            }
5734        }
5735
5736        #[test]
5737        fn rendered_payloads_in_memory_stay_bounded_by_concurrency() {
5738            // Far more FLAC clips than the concurrency bound. The ordered buffered
5739            // render keeps at most about `concurrency` transcoded payloads live at
5740            // once (never the whole library), so peak held <= concurrency + 1.
5741            let count = 12;
5742            let concurrency = 3;
5743            let mut scripted = ScriptedHttp::new().with_auth();
5744            let mut actions = Vec::new();
5745            let mut desireds = Vec::new();
5746            for i in 0..count {
5747                let id = format!("f{i}");
5748                scripted = scripted
5749                    .route(
5750                        &format!("/gen/{id}/wav_file/"),
5751                        Reply::json(&format!(
5752                            r#"{{"wav_file_url": "https://cdn1.suno.ai/{id}.wav"}}"#
5753                        )),
5754                    )
5755                    .route(&format!("{id}.wav"), Reply::ok(b"wav-body".to_vec()));
5756                let (_c, d, action) = download(&id, AudioFormat::Flac);
5757                actions.push(action);
5758                desireds.push(d);
5759            }
5760            let http = GatedHttp::new(scripted);
5761            let held = Arc::new(AtomicUsize::new(0));
5762            let peak = Arc::new(AtomicUsize::new(0));
5763            let ffmpeg = CountingFfmpeg {
5764                inner: StubFfmpeg::flac(),
5765                held: held.clone(),
5766                peak: peak.clone(),
5767            };
5768            let fs = CountingFs {
5769                inner: MemFs::new(),
5770                held: held.clone(),
5771            };
5772            let clock = RecordingClock::new();
5773            let mut albums = BTreeMap::new();
5774            let mut playlists = BTreeMap::new();
5775            let mut manifest = Manifest::new();
5776            let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
5777            let plan = Plan { actions };
5778
5779            let outcome = pollster::block_on(execute(
5780                &plan,
5781                &mut manifest,
5782                &mut albums,
5783                &mut playlists,
5784                &desireds,
5785                &HashMap::new(),
5786                Ports {
5787                    client: &mut client,
5788                    http: &http,
5789                    fs: &fs,
5790                    ffmpeg: &ffmpeg,
5791                    clock: &clock,
5792                },
5793                &opts_with(concurrency),
5794            ));
5795
5796            assert_eq!(outcome.downloaded, count as usize);
5797            assert_eq!(
5798                held.load(Ordering::SeqCst),
5799                0,
5800                "every payload must be committed"
5801            );
5802            assert!(
5803                peak.load(Ordering::SeqCst) <= concurrency as usize + 1,
5804                "peak live payloads {} exceeded the bound {}",
5805                peak.load(Ordering::SeqCst),
5806                concurrency + 1
5807            );
5808            assert!(
5809                peak.load(Ordering::SeqCst) >= 2,
5810                "the render should genuinely overlap, peak was {}",
5811                peak.load(Ordering::SeqCst)
5812            );
5813        }
5814    }
5815}