Skip to main content

suno_core/
executor.rs

1//! The download executor: it applies a reconcile [`Plan`] to disk through ports.
2//!
3//! Reconcile decides *what* to do; the executor does it. It is async and pure
4//! orchestration: every side effect goes through a port ([`Http`] for the
5//! network, [`Filesystem`] for disk, [`Ffmpeg`] for transcoding, [`Clock`] for
6//! waiting), so the whole pipeline is exercised in tests with in-memory doubles
7//! and no real IO, network, or sleeping.
8//!
9//! Safety is the point of this module. A wrong write or delete damages the
10//! user's library, so the executor:
11//!
12//! - writes only atomically (SYNC-13): a failed write leaves the prior file
13//!   intact, because the [`Filesystem`] adapter stages a temp file and renames;
14//! - verifies size (SYNC-14): a download whose body disagrees with the
15//!   provider's `Content-Length` is treated as truncated and retried, and a
16//!   written file whose on-disk size disagrees with the bytes written is a
17//!   failure, never a recorded success;
18//! - classifies errors (SYNC-17): an auth failure or a full disk stops the
19//!   account run (with an auth or disk-full status) and is never retried;
20//!   transient failures (timeouts, 5xx,
21//!   transport, 429) are retried a bounded number of times then recorded and
22//!   skipped; permanent failures are recorded and skipped; and a single clip's
23//!   failure never aborts the run;
24//! - backs off on rate limits (SYNC-16) through the injected [`Clock`], honouring
25//!   a `Retry-After` hint.
26//!
27//! The executor only ever sets the manifest's [`preserve`](ManifestEntry::preserve)
28//! marker on an entry it writes, and only deletes a path whose removal the
29//! [`Filesystem`] confirms. Higher-level safety (empty-listing abort, the
30//! destructive-sync confirmation, exit codes) is the caller's job.
31
32use std::collections::BTreeMap;
33use std::collections::BTreeSet;
34use std::collections::HashMap;
35use std::time::Duration;
36
37use futures_util::lock::Mutex as AsyncMutex;
38use futures_util::stream::{self, StreamExt};
39
40use crate::backoff::{backoff_delay, retry_after};
41use crate::client::SunoClient;
42use crate::clock::Clock;
43use crate::config::AudioFormat;
44use crate::error::Error;
45use crate::ffmpeg::{Ffmpeg, WebpEncodeSettings};
46use crate::fs::Filesystem;
47use crate::graph::{AlbumArt, PlaylistState};
48use crate::http::{Http, HttpRequest};
49use crate::lineage::LineageContext;
50use crate::manifest::{ArtifactState, Manifest, ManifestEntry};
51use crate::model::Clip;
52use crate::reconcile::{Action, ArtifactKind, Desired, Plan, SourceMode, set_manifest_artifact};
53use crate::tag::{TrackMetadata, tag_flac, tag_mp3};
54
55/// The shared Suno client behind an async mutex, so concurrent audio work can
56/// serialise its order-sensitive API calls (JWT refresh, adaptive limiter)
57/// without a runtime-specific lock. Held only for the brief WAV-render calls;
58/// the heavy CDN/transcode/tag work runs unlocked.
59type ClientLock<'a, C> = AsyncMutex<&'a mut SunoClient<C>>;
60
61/// Tunables for one [`execute`] run.
62#[derive(Debug, Clone)]
63pub struct ExecOptions {
64    /// How many times a transient failure is retried before record-and-skip.
65    pub max_retries: u32,
66    /// How many times to poll for a server-side WAV render before giving up.
67    pub wav_poll_attempts: u32,
68    /// How long to wait between WAV render polls.
69    pub wav_poll_interval: Duration,
70    /// How many clips' audio to fetch, transcode, and tag concurrently. Clamped
71    /// to at least one, so a zero collapses to sequential rather than stalling.
72    pub concurrency: u32,
73}
74
75impl Default for ExecOptions {
76    fn default() -> Self {
77        Self {
78            max_retries: 3,
79            wav_poll_attempts: 24,
80            wav_poll_interval: Duration::from_secs(5),
81            concurrency: 4,
82        }
83    }
84}
85
86/// How an [`execute`] run ended.
87#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
88pub enum RunStatus {
89    /// Every action was attempted; some may have failed and been skipped.
90    #[default]
91    Completed,
92    /// An auth failure stopped the run early; remaining actions were not tried.
93    AuthAborted,
94    /// The disk filled; the run stopped early rather than failing every
95    /// remaining clip. Remaining actions were not tried.
96    DiskFull,
97}
98
99/// One action that could not be applied, for the run summary and failure log.
100#[derive(Debug, Clone, PartialEq, Eq)]
101pub struct Failure {
102    /// The clip the failed action concerned (or a path when no id applies).
103    pub clip_id: String,
104    /// A short, secret-free reason.
105    pub reason: String,
106}
107
108/// The result of applying a [`Plan`]: per-action counts and the failure list.
109#[derive(Debug, Clone, Default, PartialEq, Eq)]
110pub struct ExecOutcome {
111    pub downloaded: usize,
112    pub reformatted: usize,
113    pub retagged: usize,
114    pub renamed: usize,
115    pub deleted: usize,
116    pub skipped: usize,
117    pub artifacts_written: usize,
118    pub artifacts_deleted: usize,
119    /// Actions that failed and were skipped (auth, transient-exhausted, or
120    /// permanent). The run continued past each one unless it was an auth or
121    /// disk-full abort.
122    pub failures: Vec<Failure>,
123    /// How the run ended.
124    pub status: RunStatus,
125}
126
127impl ExecOutcome {
128    /// Number of failed actions.
129    pub fn failed(&self) -> usize {
130        self.failures.len()
131    }
132
133    fn record(&mut self, effect: Effect) {
134        match effect {
135            Effect::Downloaded => self.downloaded += 1,
136            Effect::Reformatted => self.reformatted += 1,
137            Effect::Retagged => self.retagged += 1,
138            Effect::Renamed => self.renamed += 1,
139            Effect::Deleted => self.deleted += 1,
140            Effect::Skipped => self.skipped += 1,
141            Effect::ArtifactWritten => self.artifacts_written += 1,
142            Effect::ArtifactDeleted => self.artifacts_deleted += 1,
143        }
144    }
145}
146
147/// The IO ports the executor drives, grouped so one value threads them through.
148///
149/// `client` is the only `&mut` port: it performs the authenticated WAV render
150/// flow and so mutates its cached session. The rest are shared references.
151pub struct Ports<'a, H, F, G, C> {
152    /// Performs the authenticated WAV render and poll flow.
153    pub client: &'a mut SunoClient<C>,
154    /// The public network port (CDN audio, rendered WAV, cover art).
155    pub http: &'a H,
156    /// The disk port.
157    pub fs: &'a F,
158    /// The transcode port (WAV to FLAC).
159    pub ffmpeg: &'a G,
160    /// The backoff and poll delay port.
161    pub clock: &'a C,
162}
163
164/// Apply `plan` to disk, updating `manifest` and `albums` in place, and return
165/// the outcome.
166///
167/// `desired` carries the per-clip metadata and art hashes plus the source modes
168/// that decide the [`preserve`](ManifestEntry::preserve) marker; it is indexed
169/// by clip id (and by target path, for renames) so each written entry records
170/// the right hashes and protection. `albums` is the album-art store, keyed by
171/// stable root id: folder-art writes and deletes record their state there rather
172/// than on the per-clip `manifest`. `ports` bundles the authenticated client and
173/// the network, disk, transcode, and backoff ports. A single clip's failure
174/// never aborts the run, except an auth failure or a full disk, which stop it
175/// with [`RunStatus::AuthAborted`] or [`RunStatus::DiskFull`].
176///
177/// The audio-producing actions ([`Download`](Action::Download) and
178/// [`Reformat`](Action::Reformat)) run concurrently, bounded by
179/// [`ExecOptions::concurrency`]: their slow parts (WAV render, CDN download,
180/// transcode, tag) overlap while the order-sensitive Suno API calls are
181/// serialised behind an async mutex over the shared [`SunoClient`], keeping the
182/// adaptive limiter and JWT refresh correct. The remaining actions (retag,
183/// rename, delete, and artifact writes/deletes) then run serially in plan order.
184///
185/// The outcome is deterministic regardless of completion order: concurrent audio
186/// results are committed to the manifest in plan-index order, so the same plan
187/// always yields the same manifest and counts whatever the concurrency level. A
188/// per-clip failure is recorded and the run continues; only an auth failure or a
189/// full disk aborts, and it does so promptly by stopping further audio work.
190pub async fn execute<H, F, G, C>(
191    plan: &Plan,
192    manifest: &mut Manifest,
193    albums: &mut BTreeMap<String, AlbumArt>,
194    playlists: &mut BTreeMap<String, PlaylistState>,
195    desired: &[Desired],
196    ports: Ports<'_, H, F, G, C>,
197    opts: &ExecOptions,
198) -> ExecOutcome
199where
200    H: Http,
201    F: Filesystem,
202    G: Ffmpeg,
203    C: Clock,
204{
205    let Ports {
206        client,
207        http,
208        fs,
209        ffmpeg,
210        clock,
211    } = ports;
212    let by_id: HashMap<&str, &Desired> = desired.iter().map(|d| (d.clip.id.as_str(), d)).collect();
213    let by_path: HashMap<&str, &Desired> = desired.iter().map(|d| (d.path.as_str(), d)).collect();
214    // Every path this run writes, so the inline old-sidecar cleanup never removes
215    // a file another action produces this run (the non-planned twin of
216    // `suppress_path_aliasing`).
217    let write_targets: BTreeSet<String> = plan
218        .actions
219        .iter()
220        .filter_map(|a| match a {
221            Action::Download { path, .. }
222            | Action::Reformat { path, .. }
223            | Action::WriteArtifact { path, .. } => Some(path.clone()),
224            Action::Rename { to, .. } => Some(to.clone()),
225            _ => None,
226        })
227        .collect();
228    // How many tracked artifact slots reference each path. The inline old-path
229    // cleanup removes a path only once nothing else holds it: each slot that
230    // moves away decrements its reference, and the removal fires only when the
231    // count reaches zero and no action writes the path this run. This keeps a
232    // live file a co-referencing slot still owns (a prior failed swap can leave
233    // two clips sharing a path) while letting the last slot to leave reclaim it,
234    // so nothing is orphaned either (#76).
235    let mut tracked_paths: HashMap<String, u32> = HashMap::new();
236    for (_, entry) in manifest.iter() {
237        for path in entry.artifact_paths() {
238            *tracked_paths.entry(path.to_owned()).or_default() += 1;
239        }
240    }
241    for art in albums.values() {
242        for state in [art.folder_jpg.as_ref(), art.folder_webp.as_ref()]
243            .into_iter()
244            .flatten()
245        {
246            *tracked_paths.entry(state.path.clone()).or_default() += 1;
247        }
248    }
249    for playlist in playlists.values() {
250        *tracked_paths.entry(playlist.path.clone()).or_default() += 1;
251    }
252    let ctx = Ctx {
253        http,
254        fs,
255        ffmpeg,
256        clock,
257        opts,
258        by_id: &by_id,
259        by_path: &by_path,
260        write_targets: &write_targets,
261    };
262
263    let mut outcome = ExecOutcome::default();
264
265    // The audio-producing actions ([`Download`](Action::Download) /
266    // [`Reformat`](Action::Reformat)) render concurrently, but their work is
267    // deliberately split so that NO destination write, file removal, or manifest
268    // update happens off the plan's order:
269    //
270    // - the parallel producers ([`prepare_audio`](Ctx::prepare_audio)) do only
271    //   the slow, side-effect-free work (fetch the CDN/WAV bytes, transcode, and
272    //   tag), returning the tagged bytes; and
273    // - a single serial committer below writes those bytes to the destination,
274    //   removes any superseded file, and records the manifest entry, in strict
275    //   plan-index order, interleaved with the non-audio actions.
276    //
277    // The shared client is the only `&mut` port and its API calls must stay
278    // ordered, so it rides behind an async mutex; each producer locks it only for
279    // the brief WAV-render calls and runs the heavy work unlocked. Renders are
280    // yielded in plan order and bounded to `concurrency` in flight (and buffered),
281    // so at most about `concurrency` tagged payloads are ever held in memory -
282    // never the whole library.
283    let client_lock = AsyncMutex::new(client);
284    let concurrency = opts.concurrency.max(1) as usize;
285    let ctx_ref = &ctx;
286    let client_lock_ref = &client_lock;
287    let mut renders = stream::iter(
288        plan.actions
289            .iter()
290            .filter(|action| is_audio_action(action))
291            .map(|action| async move { ctx_ref.prepare_audio(client_lock_ref, action).await }),
292    )
293    .buffered(concurrency);
294
295    for action in &plan.actions {
296        // Audio actions pull their pre-rendered bytes (yielded in plan order) and
297        // commit them here; every other action applies its own effect. Both the
298        // audio commit and the non-audio apply run serially, so all destination
299        // and manifest effects keep the plan's order exactly as the sequential
300        // executor did.
301        let result = if is_audio_action(action) {
302            match renders.next().await {
303                Some(Ok(rendered)) => ctx.commit_audio(manifest, rendered),
304                Some(Err(fail)) => Err(fail),
305                None => unreachable!("buffered yields one result per audio action"),
306            }
307        } else {
308            ctx.apply(action, manifest, albums, playlists, &mut tracked_paths)
309                .await
310        };
311        match result {
312            Ok(effect) => outcome.record(effect),
313            Err(fail) => {
314                let abort = abort_status(fail.class);
315                outcome.failures.push(Failure {
316                    clip_id: fail.clip_id,
317                    reason: fail.reason,
318                });
319                if let Some(status) = abort {
320                    // A systemic abort stops the run. Dropping the render stream
321                    // cancels any in-flight or completed-but-uncommitted producer;
322                    // because producers touch nothing on disk, the destination and
323                    // manifest are left exactly as the committed prefix wrote them,
324                    // with no untracked files and no removed-but-referenced file.
325                    outcome.status = status;
326                    break;
327                }
328            }
329        }
330    }
331    drop(renders);
332
333    // Renames and deletes can leave an album directory empty; prune those ghost
334    // directories bottom-up. This runs on both the completed and the aborted
335    // paths, and is best-effort: a prune failure is only a missed tidy that the
336    // next run repeats, never a reason to fail the run.
337    let _ = fs.prune_empty_dirs("");
338    outcome
339}
340
341/// Whether an action produces audio: it fetches, transcodes, and tags a clip's
342/// file. Its slow render runs in the concurrent phase; its destination write and
343/// manifest update are committed serially in plan order. Everything else touches
344/// the manifest, album, or playlist stores directly and runs serially.
345fn is_audio_action(action: &Action) -> bool {
346    matches!(action, Action::Download { .. } | Action::Reformat { .. })
347}
348
349/// A rendered-but-uncommitted audio result: the tagged bytes plus what the serial
350/// committer needs to place them. Produced concurrently and side-effect-free (no
351/// destination write, no removal, no manifest touch); [`commit_audio`] applies
352/// all of those in plan order.
353struct RenderedAudio {
354    clip_id: String,
355    path: String,
356    format: AudioFormat,
357    /// The superseded file to remove after the new one lands (a [`Reformat`]),
358    /// or `None` for a plain [`Download`].
359    from_path: Option<String>,
360    effect: Effect,
361    bytes: Vec<u8>,
362}
363
364/// What an applied action did, for the outcome counters.
365enum Effect {
366    Downloaded,
367    Reformatted,
368    Retagged,
369    Renamed,
370    Deleted,
371    Skipped,
372    ArtifactWritten,
373    ArtifactDeleted,
374}
375
376/// How a failure should be handled (SYNC-17).
377#[derive(Debug, Clone, Copy)]
378enum Class {
379    /// Stop the account run; do not retry.
380    Auth,
381    /// Stop the account run: a full disk is systemic, like auth, so aborting
382    /// beats skipping every remaining clip (each of which would first burn a
383    /// server-side WAV-render budget before failing the same way).
384    Disk,
385    /// Retry a bounded number of times, then record and skip.
386    Transient,
387    /// Record and skip immediately.
388    Permanent,
389}
390
391/// A classified action failure attributed to a clip.
392struct Fail {
393    class: Class,
394    clip_id: String,
395    reason: String,
396}
397
398/// The run-ending status for a failure class, or `None` when the failure is
399/// per-clip and the run continues.
400fn abort_status(class: Class) -> Option<RunStatus> {
401    match class {
402        Class::Auth => Some(RunStatus::AuthAborted),
403        Class::Disk => Some(RunStatus::DiskFull),
404        Class::Transient | Class::Permanent => None,
405    }
406}
407
408fn auth_fail(clip_id: impl Into<String>, reason: impl Into<String>) -> Fail {
409    Fail {
410        class: Class::Auth,
411        clip_id: clip_id.into(),
412        reason: reason.into(),
413    }
414}
415
416fn transient_fail(clip_id: impl Into<String>, reason: impl Into<String>) -> Fail {
417    Fail {
418        class: Class::Transient,
419        clip_id: clip_id.into(),
420        reason: reason.into(),
421    }
422}
423
424fn permanent_fail(clip_id: impl Into<String>, reason: impl Into<String>) -> Fail {
425    Fail {
426        class: Class::Permanent,
427        clip_id: clip_id.into(),
428        reason: reason.into(),
429    }
430}
431
432fn disk_fail(clip_id: impl Into<String>, reason: impl Into<String>) -> Fail {
433    Fail {
434        class: Class::Disk,
435        clip_id: clip_id.into(),
436        reason: reason.into(),
437    }
438}
439
440/// Whether an artifact kind is album-scoped folder art (owned by a root id and
441/// recorded on the album store) rather than a per-clip sidecar (recorded on the
442/// manifest).
443fn is_album_kind(kind: ArtifactKind) -> bool {
444    matches!(kind, ArtifactKind::FolderJpg | ArtifactKind::FolderWebp)
445}
446
447/// True for the library-scoped playlist artifact, routed to the playlist store.
448fn is_playlist_kind(kind: ArtifactKind) -> bool {
449    matches!(kind, ArtifactKind::Playlist)
450}
451
452/// True for a per-song sidecar (`cover.jpg`/`cover.webp`), whose write requires
453/// the owning clip's manifest entry. Album and playlist kinds are keyed by a
454/// root/playlist id that is deliberately absent from the manifest.
455fn is_per_clip_kind(kind: ArtifactKind) -> bool {
456    matches!(
457        kind,
458        ArtifactKind::CoverJpg
459            | ArtifactKind::CoverWebp
460            | ArtifactKind::DetailsTxt
461            | ArtifactKind::LyricsTxt
462            | ArtifactKind::Lrc
463            | ArtifactKind::VideoMp4
464    )
465}
466
467/// Recover a playlist's display name from its `.m3u8` path's file stem.
468///
469/// The path is `<sanitised name>.m3u8` at the library root, so the stem is the
470/// sanitised name. Reconcile only ever reads a playlist's `path` and `hash`, so
471/// this recovered name is a convenience for humans and its lossiness (the
472/// sanitiser is not reversible) never affects a decision.
473fn playlist_name_from_path(path: &str) -> String {
474    std::path::Path::new(path)
475        .file_stem()
476        .map(|stem| stem.to_string_lossy().into_owned())
477        .unwrap_or_default()
478}
479
480/// A classified fetch failure, not yet attributed to a clip.
481struct FetchError {
482    class: Class,
483    reason: String,
484    retry_after: Option<Duration>,
485}
486
487impl FetchError {
488    fn transient(reason: impl Into<String>, retry_after: Option<Duration>) -> Self {
489        Self {
490            class: Class::Transient,
491            reason: reason.into(),
492            retry_after,
493        }
494    }
495
496    fn permanent(reason: impl Into<String>) -> Self {
497        Self {
498            class: Class::Permanent,
499            reason: reason.into(),
500            retry_after: None,
501        }
502    }
503
504    fn attribute(self, clip_id: &str) -> Fail {
505        Fail {
506            class: self.class,
507            clip_id: clip_id.to_owned(),
508            reason: self.reason,
509        }
510    }
511}
512
513/// The shared, read-only context threaded through every action handler.
514struct Ctx<'a, H, F, G, C> {
515    http: &'a H,
516    fs: &'a F,
517    ffmpeg: &'a G,
518    clock: &'a C,
519    opts: &'a ExecOptions,
520    by_id: &'a HashMap<&'a str, &'a Desired>,
521    by_path: &'a HashMap<&'a str, &'a Desired>,
522    /// Every destination path this run writes (audio downloads and reformats,
523    /// artifact writes, and rename targets). The inline old-sidecar cleanup in
524    /// [`write_artifact`](Ctx::write_artifact) skips any path in this set, so a
525    /// path swap between two clips can never delete a file the same run just
526    /// wrote. This mirrors [`suppress_path_aliasing`] for the one removal that
527    /// is not itself a planned action.
528    write_targets: &'a BTreeSet<String>,
529}
530
531impl<H, F, G, C> Ctx<'_, H, F, G, C>
532where
533    H: Http,
534    F: Filesystem,
535    G: Ffmpeg,
536    C: Clock,
537{
538    /// Apply one non-audio action, returning what it did or why it failed.
539    ///
540    /// Audio actions ([`Download`](Action::Download) /
541    /// [`Reformat`](Action::Reformat)) run in the concurrent phase through
542    /// [`prepare_audio`](Self::prepare_audio) and never reach here.
543    async fn apply(
544        &self,
545        action: &Action,
546        manifest: &mut Manifest,
547        albums: &mut BTreeMap<String, AlbumArt>,
548        playlists: &mut BTreeMap<String, PlaylistState>,
549        tracked_paths: &mut HashMap<String, u32>,
550    ) -> Result<Effect, Fail> {
551        match action {
552            Action::Download { .. } | Action::Reformat { .. } => {
553                unreachable!("audio actions are applied in the concurrent phase")
554            }
555            Action::Retag {
556                clip,
557                lineage,
558                path,
559            } => self.retag(manifest, clip, lineage, path).await,
560            Action::Rename { from, to } => self.rename(manifest, from, to),
561            Action::Delete { path, clip_id } => self.delete(manifest, path, clip_id),
562            Action::Skip { clip_id } => {
563                self.refresh_preserve(manifest, clip_id);
564                Ok(Effect::Skipped)
565            }
566            Action::WriteArtifact {
567                kind,
568                path,
569                source_url,
570                hash,
571                owner_id,
572                content,
573            } => {
574                self.write_artifact(
575                    manifest,
576                    albums,
577                    playlists,
578                    *kind,
579                    path,
580                    source_url,
581                    hash,
582                    owner_id,
583                    content.as_deref(),
584                    tracked_paths,
585                )
586                .await
587            }
588            Action::DeleteArtifact {
589                kind,
590                path,
591                owner_id,
592            } => self.delete_artifact(manifest, albums, playlists, *kind, path, owner_id),
593        }
594    }
595
596    /// Render one audio action's tagged bytes, side-effect-free.
597    ///
598    /// This is the concurrent part: it fetches, transcodes, and tags the file
599    /// (through shared ports, plus the client behind `client_lock`), then returns
600    /// the bytes and where they must go. It deliberately writes nothing, removes
601    /// nothing, and never touches `manifest`, so many run at once and an aborted
602    /// run can drop them with no destination or manifest effect. The serial
603    /// [`commit_audio`](Self::commit_audio) applies those effects in plan order.
604    async fn prepare_audio(
605        &self,
606        client_lock: &ClientLock<'_, C>,
607        action: &Action,
608    ) -> Result<RenderedAudio, Fail> {
609        match action {
610            Action::Download {
611                clip,
612                lineage,
613                path,
614                format,
615            } => {
616                let bytes = self
617                    .produce_audio(client_lock, clip, lineage, *format)
618                    .await?;
619                Ok(RenderedAudio {
620                    clip_id: clip.id.clone(),
621                    path: path.clone(),
622                    format: *format,
623                    from_path: None,
624                    effect: Effect::Downloaded,
625                    bytes,
626                })
627            }
628            Action::Reformat {
629                clip,
630                path,
631                from_path,
632                from: _,
633                to,
634            } => {
635                // A Reformat action carries no lineage, so recover it from the
636                // desired set (the same context that drove naming and the hash),
637                // falling back to a self-rooted context when the clip is not in
638                // the current selection.
639                let lineage = self
640                    .by_id
641                    .get(clip.id.as_str())
642                    .map(|d| d.lineage.clone())
643                    .unwrap_or_else(|| LineageContext::own_root(clip));
644                let bytes = self.produce_audio(client_lock, clip, &lineage, *to).await?;
645                Ok(RenderedAudio {
646                    clip_id: clip.id.clone(),
647                    path: path.clone(),
648                    format: *to,
649                    from_path: Some(from_path.clone()),
650                    effect: Effect::Reformatted,
651                    bytes,
652                })
653            }
654            _ => unreachable!("prepare_audio only handles audio actions"),
655        }
656    }
657
658    /// Commit one rendered audio result serially, in plan order.
659    ///
660    /// Writes the tagged bytes to the destination, then, for a [`Reformat`], drops
661    /// the superseded file, then records the manifest entry. Ordering the write
662    /// before the removal keeps a crash from losing both copies; keeping all of
663    /// this off the concurrent phase preserves the sequential executor's plan-order
664    /// guarantee for every destination and manifest effect.
665    fn commit_audio(
666        &self,
667        manifest: &mut Manifest,
668        rendered: RenderedAudio,
669    ) -> Result<Effect, Fail> {
670        let RenderedAudio {
671            clip_id,
672            path,
673            format,
674            from_path,
675            effect,
676            bytes,
677        } = rendered;
678        let size = self.write_verify(&clip_id, &path, &bytes)?;
679        if let Some(from) = from_path {
680            // The new file is safely in place; only now drop the old rendering.
681            self.fs.remove(&from).map_err(|err| {
682                permanent_fail(&clip_id, format!("could not remove old file: {err}"))
683            })?;
684        }
685        manifest.insert(clip_id.clone(), self.entry(&clip_id, &path, format, size));
686        Ok(effect)
687    }
688
689    /// Re-tag the existing file in place to match current metadata and art.
690    async fn retag(
691        &self,
692        manifest: &mut Manifest,
693        clip: &Clip,
694        lineage: &LineageContext,
695        path: &str,
696    ) -> Result<Effect, Fail> {
697        let Some(format) = manifest.get(&clip.id).map(|entry| entry.format) else {
698            return Err(permanent_fail(
699                &clip.id,
700                "retag target missing from manifest",
701            ));
702        };
703
704        if format == AudioFormat::Wav {
705            // WAV carries no embedded tags; just record the new hashes so the
706            // next run sees them as current and stops retagging.
707            self.refresh_hashes(manifest, &clip.id, None);
708            return Ok(Effect::Retagged);
709        }
710
711        let meta = TrackMetadata::from_clip(clip, lineage);
712        let cover = self.fetch_cover(clip).await;
713        let existing = self
714            .fs
715            .read(path)
716            .map_err(|err| permanent_fail(&clip.id, format!("could not read for retag: {err}")))?;
717        let tagged = match format {
718            AudioFormat::Mp3 => tag_mp3(&existing, &meta, cover.as_deref()),
719            AudioFormat::Flac => tag_flac(&existing, &meta, cover.as_deref()),
720            AudioFormat::Wav => unreachable!("WAV handled above"),
721        }
722        .map_err(|err| permanent_fail(&clip.id, err.to_string()))?;
723        let size = self.write_verify(&clip.id, path, &tagged)?;
724        self.refresh_hashes(manifest, &clip.id, Some(size));
725        Ok(Effect::Retagged)
726    }
727
728    /// Move the file and update the entry's path (and protection).
729    fn rename(&self, manifest: &mut Manifest, from: &str, to: &str) -> Result<Effect, Fail> {
730        let label = self
731            .by_path
732            .get(to)
733            .map(|d| d.clip.id.clone())
734            .unwrap_or_else(|| to.to_owned());
735        self.fs.rename(from, to).map_err(|err| {
736            if err.is_out_of_space() {
737                disk_fail(label, "disk full: no space left to rename")
738            } else {
739                permanent_fail(label, format!("rename failed: {err}"))
740            }
741        })?;
742
743        let clip_id = self.by_path.get(to).map(|d| d.clip.id.clone()).or_else(|| {
744            manifest
745                .entries
746                .iter()
747                .find(|(_, entry)| entry.path == from)
748                .map(|(id, _)| id.clone())
749        });
750        if let Some(id) = clip_id
751            && let Some(entry) = manifest.entries.get_mut(&id)
752        {
753            entry.path = to.to_owned();
754            if let Some(d) = self.by_path.get(to) {
755                entry.preserve = preserve_for(d);
756            }
757        }
758        Ok(Effect::Renamed)
759    }
760
761    /// Remove the file and drop the manifest entry.
762    fn delete(&self, manifest: &mut Manifest, path: &str, clip_id: &str) -> Result<Effect, Fail> {
763        self.fs
764            .remove(path)
765            .map_err(|err| permanent_fail(clip_id, format!("delete failed: {err}")))?;
766        manifest.remove(clip_id);
767        Ok(Effect::Deleted)
768    }
769
770    /// Fetch an artifact's bytes, write them atomically, then record the sidecar
771    /// on the owning manifest entry.
772    ///
773    /// The fetch and write share the audio path's resilience: `fetch_bytes`
774    /// retries transient failures and verifies `Content-Length`, and
775    /// `write_verify` confirms the on-disk size. A failure is attributed to the
776    /// owning clip and returned as a per-clip [`Fail`], so a bad sidecar never
777    /// aborts the whole run (only an auth failure or a full disk does, matching
778    /// audio).
779    ///
780    /// The bytes written depend on the kind: a static cover is the fetched image
781    /// verbatim, while an animated cover is the clip's MP4 preview transcoded to
782    /// WebP through the ffmpeg port (see [`artifact_bytes`](Self::artifact_bytes)).
783    ///
784    /// A sidecar is only ever written for a clip whose audio is present: a
785    /// successful `Download`/`Reformat` creates the manifest entry earlier in
786    /// this run, and a prior-run clip already has one. So an absent owning entry
787    /// means the audio failed or never existed this run; we skip (no fetch, no
788    /// write) rather than strand an untracked sidecar with no owning audio.
789    ///
790    /// Folder art ([`FolderJpg`](ArtifactKind::FolderJpg) /
791    /// [`FolderWebp`](ArtifactKind::FolderWebp)) is album-scoped: its `owner_id`
792    /// is the album's stable root id, not a manifest clip, so it skips the
793    /// manifest presence guard and records its state on the album store instead.
794    ///
795    /// When a title or album change moves the audio, reconcile re-emits this
796    /// write at the NEW path; this handler then removes the sidecar left at the
797    /// artifact's previously tracked path, moving it rather than orphaning it.
798    /// The removal happens only after the new file is safely written, and a
799    /// remove failure returns before the state slot advances, so the next run
800    /// re-plans the identical write and retries — self-healing, never an orphan.
801    #[allow(clippy::too_many_arguments)]
802    async fn write_artifact(
803        &self,
804        manifest: &mut Manifest,
805        albums: &mut BTreeMap<String, AlbumArt>,
806        playlists: &mut BTreeMap<String, PlaylistState>,
807        kind: ArtifactKind,
808        path: &str,
809        source_url: &str,
810        hash: &str,
811        owner_id: &str,
812        content: Option<&str>,
813        tracked_paths: &mut HashMap<String, u32>,
814    ) -> Result<Effect, Fail> {
815        // A per-song sidecar needs its owning clip's manifest entry; album and
816        // playlist kinds are keyed elsewhere and skip this guard.
817        if is_per_clip_kind(kind) && manifest.get(owner_id).is_none() {
818            return Ok(Effect::Skipped);
819        }
820        // Capture the path this artifact was last tracked at, BEFORE the slot is
821        // overwritten below, so a path-changing write (a title/album rename that
822        // moves the audio) can clean up the old sidecar it left behind. Cover
823        // kinds live on the manifest, folder kinds on the album store; playlists
824        // reconcile their own old-path delete and so opt out here.
825        let old_path = match kind {
826            ArtifactKind::CoverJpg => manifest
827                .get(owner_id)
828                .and_then(|e| e.cover_jpg.as_ref())
829                .map(|s| s.path.clone()),
830            ArtifactKind::CoverWebp => manifest
831                .get(owner_id)
832                .and_then(|e| e.cover_webp.as_ref())
833                .map(|s| s.path.clone()),
834            ArtifactKind::DetailsTxt => manifest
835                .get(owner_id)
836                .and_then(|e| e.details_txt.as_ref())
837                .map(|s| s.path.clone()),
838            ArtifactKind::LyricsTxt => manifest
839                .get(owner_id)
840                .and_then(|e| e.lyrics_txt.as_ref())
841                .map(|s| s.path.clone()),
842            ArtifactKind::Lrc => manifest
843                .get(owner_id)
844                .and_then(|e| e.lrc.as_ref())
845                .map(|s| s.path.clone()),
846            ArtifactKind::VideoMp4 => manifest
847                .get(owner_id)
848                .and_then(|e| e.video_mp4.as_ref())
849                .map(|s| s.path.clone()),
850            ArtifactKind::FolderJpg | ArtifactKind::FolderWebp => albums
851                .get(owner_id)
852                .and_then(|a| a.artifact(kind))
853                .map(|s| s.path.clone()),
854            ArtifactKind::Playlist => None,
855        };
856        // A generated artifact (a playlist) carries its body inline and never
857        // touches the network; a fetched one pulls (and transcodes) its source.
858        let bytes = match content {
859            Some(text) => text.as_bytes().to_vec(),
860            None => self.artifact_bytes(kind, source_url, owner_id).await?,
861        };
862        self.write_verify(owner_id, path, &bytes)?;
863        // The new sidecar is safely in place; only now drop a stale copy left at
864        // the previous path (the audio moved). `remove` is idempotent, so an
865        // already-absent old file is fine. On a genuine remove failure we return
866        // BEFORE updating the slot, leaving the manifest/album pointing at the
867        // old path: the next run sees the same path drift, re-plans this write,
868        // and retries the cleanup — convergent, no orphan persists.
869        //
870        // The removal is gated so it can never delete a live file (#76). This
871        // slot is releasing `old`, so drop its reference in `tracked_paths`; the
872        // file is removed only once nothing else holds it — no other tracked slot
873        // still references it (count now zero) and no action writes it this run
874        // (`write_targets`, the non-planned twin of `suppress_path_aliasing`).
875        // On a path swap (A: x -> y while B: y -> x) `write_targets` keeps each
876        // freshly written file; when two slots share a path after a prior failed
877        // swap, the first to move keeps it and the last to leave reclaims it, so
878        // a co-owned file is never deleted and a vacated one is never orphaned.
879        if let Some(old) = old_path.as_deref()
880            && !old.is_empty()
881            && old != path
882        {
883            let still_referenced = tracked_paths
884                .get_mut(old)
885                .map(|count| {
886                    *count = count.saturating_sub(1);
887                    *count > 0
888                })
889                .unwrap_or(false);
890            if !still_referenced && !self.write_targets.contains(old) {
891                self.fs.remove(old).map_err(|err| {
892                    permanent_fail(
893                        owner_id,
894                        format!("could not remove old sidecar {old}: {err}"),
895                    )
896                })?;
897            }
898        }
899        if is_album_kind(kind) {
900            albums.entry(owner_id.to_owned()).or_default().set(
901                kind,
902                Some(ArtifactState {
903                    path: path.to_owned(),
904                    hash: hash.to_owned(),
905                }),
906            );
907        } else if is_playlist_kind(kind) {
908            playlists.insert(
909                owner_id.to_owned(),
910                PlaylistState {
911                    name: playlist_name_from_path(path),
912                    path: path.to_owned(),
913                    hash: hash.to_owned(),
914                },
915            );
916        } else if let Some(entry) = manifest.entries.get_mut(owner_id) {
917            set_manifest_artifact(
918                entry,
919                kind,
920                Some(ArtifactState {
921                    path: path.to_owned(),
922                    hash: hash.to_owned(),
923                }),
924            );
925        }
926        Ok(Effect::ArtifactWritten)
927    }
928
929    /// Produce a sidecar's bytes from its source, branching on kind.
930    ///
931    /// An animated cover — a per-clip [`CoverWebp`](ArtifactKind::CoverWebp) or an
932    /// album [`FolderWebp`](ArtifactKind::FolderWebp) — fetches the clip's
933    /// `video_cover` MP4 preview and transcodes it to an animated WebP through the
934    /// ffmpeg port; every other kind is the fetched source verbatim (e.g. the
935    /// static [`CoverJpg`](ArtifactKind::CoverJpg) or album
936    /// [`FolderJpg`](ArtifactKind::FolderJpg) image). A fetch or transcode failure
937    /// is attributed to the owning clip and is a per-clip [`Fail`], except a
938    /// disk-full transcode, which aborts the run like the audio FLAC path.
939    async fn artifact_bytes(
940        &self,
941        kind: ArtifactKind,
942        source_url: &str,
943        owner_id: &str,
944    ) -> Result<Vec<u8>, Fail> {
945        let source = self
946            .fetch_bytes(source_url)
947            .await
948            .map_err(|err| err.attribute(owner_id))?;
949        match kind {
950            ArtifactKind::CoverWebp | ArtifactKind::FolderWebp => self
951                .ffmpeg
952                .mp4_to_webp(&source, WebpEncodeSettings::default())
953                .await
954                .map_err(|err| {
955                    if err.is_out_of_space() {
956                        disk_fail(owner_id, "disk full: no space left to transcode")
957                    } else {
958                        permanent_fail(owner_id, format!("cover transcode failed: {err}"))
959                    }
960                }),
961            // The text sidecars are generated and always carry inline content, so
962            // `write_artifact` never reaches this fetch path for them. Guard it so
963            // a future miswiring fails loudly rather than fetching a URL.
964            ArtifactKind::DetailsTxt | ArtifactKind::LyricsTxt | ArtifactKind::Lrc => Err(
965                permanent_fail(owner_id, "text sidecar requires inline content"),
966            ),
967            ArtifactKind::CoverJpg
968            | ArtifactKind::FolderJpg
969            | ArtifactKind::Playlist
970            | ArtifactKind::VideoMp4 => Ok(source),
971        }
972    }
973
974    /// Remove a sidecar file and clear its slot on the owning manifest entry.
975    ///
976    /// `remove` is idempotent, so an already-absent sidecar is not a failure.
977    /// When the owning entry is already gone (its audio was deleted earlier this
978    /// run, co-deleting the sidecar), there is no slot to clear and that is fine.
979    ///
980    /// Folder art is album-scoped: its slot is cleared on the album store keyed by
981    /// the album's root id, not on a manifest clip.
982    ///
983    /// The audio `Delete` is applied before its sidecar `DeleteArtifact`. If the
984    /// sidecar removal fails after the audio is already gone, the sidecar lingers
985    /// untracked, but the design stays convergent rather than transactional: the
986    /// next run re-plans the same removal and retries, and any directory it would
987    /// have emptied is pruned once the file finally clears.
988    fn delete_artifact(
989        &self,
990        manifest: &mut Manifest,
991        albums: &mut BTreeMap<String, AlbumArt>,
992        playlists: &mut BTreeMap<String, PlaylistState>,
993        kind: ArtifactKind,
994        path: &str,
995        owner_id: &str,
996    ) -> Result<Effect, Fail> {
997        self.fs
998            .remove(path)
999            .map_err(|err| permanent_fail(owner_id, format!("artifact delete failed: {err}")))?;
1000        if is_album_kind(kind) {
1001            if let Some(art) = albums.get_mut(owner_id) {
1002                art.set(kind, None);
1003                if art.is_empty() {
1004                    albums.remove(owner_id);
1005                }
1006            }
1007        } else if is_playlist_kind(kind) {
1008            playlists.remove(owner_id);
1009        } else if let Some(entry) = manifest.entries.get_mut(owner_id) {
1010            set_manifest_artifact(entry, kind, None);
1011        }
1012        Ok(Effect::ArtifactDeleted)
1013    }
1014
1015    /// Download (and transcode/tag) the audio for `clip` in `format`.
1016    async fn produce_audio(
1017        &self,
1018        client_lock: &ClientLock<'_, C>,
1019        clip: &Clip,
1020        lineage: &LineageContext,
1021        format: AudioFormat,
1022    ) -> Result<Vec<u8>, Fail> {
1023        let meta = TrackMetadata::from_clip(clip, lineage);
1024        match format {
1025            AudioFormat::Mp3 => {
1026                let url = clip.mp3_url();
1027                let audio = self
1028                    .fetch_bytes(&url)
1029                    .await
1030                    .map_err(|err| err.attribute(&clip.id))?;
1031                let cover = self.fetch_cover(clip).await;
1032                tag_mp3(&audio, &meta, cover.as_deref())
1033                    .map_err(|err| permanent_fail(&clip.id, err.to_string()))
1034            }
1035            AudioFormat::Flac => {
1036                let wav = self.fetch_wav(client_lock, clip).await?;
1037                let flac = self.ffmpeg.wav_to_flac(&wav).await.map_err(|err| {
1038                    if err.is_out_of_space() {
1039                        disk_fail(&clip.id, "disk full: no space left to transcode")
1040                    } else {
1041                        permanent_fail(&clip.id, format!("transcode failed: {err}"))
1042                    }
1043                })?;
1044                let cover = self.fetch_cover(clip).await;
1045                tag_flac(&flac, &meta, cover.as_deref())
1046                    .map_err(|err| permanent_fail(&clip.id, err.to_string()))
1047            }
1048            AudioFormat::Wav => self.fetch_wav(client_lock, clip).await,
1049        }
1050    }
1051
1052    /// Resolve the rendered WAV URL and download it.
1053    async fn fetch_wav(
1054        &self,
1055        client_lock: &ClientLock<'_, C>,
1056        clip: &Clip,
1057    ) -> Result<Vec<u8>, Fail> {
1058        let url = match self.resolve_wav_url(client_lock, &clip.id).await? {
1059            Some(url) => url,
1060            None => return Err(transient_fail(&clip.id, "WAV render was not ready")),
1061        };
1062        self.fetch_bytes(&url)
1063            .await
1064            .map_err(|err| err.attribute(&clip.id))
1065    }
1066
1067    /// Read the WAV URL, requesting a render and polling if it is not ready.
1068    ///
1069    /// `None` means the render did not become ready within the poll budget; the
1070    /// caller treats that as a non-fatal transient failure, never a silent skip.
1071    ///
1072    /// Each client call briefly locks `client_lock`; the poll waits happen
1073    /// unlocked, so concurrent clips interleave their WAV renders rather than
1074    /// serialising behind one clip's whole poll budget.
1075    async fn resolve_wav_url(
1076        &self,
1077        client_lock: &ClientLock<'_, C>,
1078        id: &str,
1079    ) -> Result<Option<String>, Fail> {
1080        if let Some(url) = self.wav_url_retrying(client_lock, id).await? {
1081            return Ok(Some(url));
1082        }
1083        self.request_wav_retrying(client_lock, id).await?;
1084        for _ in 0..self.opts.wav_poll_attempts {
1085            self.clock.sleep(self.opts.wav_poll_interval).await;
1086            if let Some(url) = self.wav_url_retrying(client_lock, id).await? {
1087                return Ok(Some(url));
1088            }
1089        }
1090        Ok(None)
1091    }
1092
1093    /// Read the rendered WAV URL, retrying transient API failures with backoff
1094    /// (SYNC-16/17), so the default FLAC path is as resilient as the CDN path.
1095    async fn wav_url_retrying(
1096        &self,
1097        client_lock: &ClientLock<'_, C>,
1098        id: &str,
1099    ) -> Result<Option<String>, Fail> {
1100        let mut attempt: u32 = 0;
1101        loop {
1102            let result = {
1103                let mut client = client_lock.lock().await;
1104                client.wav_url(self.http, id).await
1105            };
1106            match result {
1107                Ok(url) => return Ok(url),
1108                Err(err) => match self.retry_core(id, err, &mut attempt).await {
1109                    Some(fail) => return Err(fail),
1110                    None => continue,
1111                },
1112            }
1113        }
1114    }
1115
1116    /// Ask Suno to render a WAV, retrying transient API failures with backoff.
1117    async fn request_wav_retrying(
1118        &self,
1119        client_lock: &ClientLock<'_, C>,
1120        id: &str,
1121    ) -> Result<(), Fail> {
1122        let mut attempt: u32 = 0;
1123        loop {
1124            let result = {
1125                let mut client = client_lock.lock().await;
1126                client.request_wav(self.http, id).await
1127            };
1128            match result {
1129                Ok(()) => return Ok(()),
1130                Err(err) => match self.retry_core(id, err, &mut attempt).await {
1131                    Some(fail) => return Err(fail),
1132                    None => continue,
1133                },
1134            }
1135        }
1136    }
1137
1138    /// Classify a core error from the authenticated WAV flow. On a transient
1139    /// class within budget, back off through the [`Clock`] and return `None` to
1140    /// retry; otherwise return the terminal [`Fail`].
1141    async fn retry_core(&self, id: &str, err: Error, attempt: &mut u32) -> Option<Fail> {
1142        let fail = classify_core(id, err);
1143        if matches!(fail.class, Class::Transient) && *attempt < self.opts.max_retries {
1144            self.clock.sleep(backoff_delay(*attempt, None)).await;
1145            *attempt += 1;
1146            None
1147        } else {
1148            Some(fail)
1149        }
1150    }
1151
1152    /// GET `url`, retrying transient failures with backoff, verifying size.
1153    async fn fetch_bytes(&self, url: &str) -> Result<Vec<u8>, FetchError> {
1154        let mut attempt: u32 = 0;
1155        loop {
1156            let result = self.http.send(HttpRequest::get(url)).await;
1157            match classify_response(result) {
1158                Ok(body) => return Ok(body),
1159                Err(err) => {
1160                    if matches!(err.class, Class::Transient) && attempt < self.opts.max_retries {
1161                        let delay = backoff_delay(attempt, err.retry_after);
1162                        self.clock.sleep(delay).await;
1163                        attempt += 1;
1164                        continue;
1165                    }
1166                    return Err(err);
1167                }
1168            }
1169        }
1170    }
1171
1172    /// Download cover art, trying each candidate URL in order; `None` is fine.
1173    async fn fetch_cover(&self, clip: &Clip) -> Option<Vec<u8>> {
1174        for url in clip.cover_candidates() {
1175            if let Ok(response) = self.http.send(HttpRequest::get(url)).await
1176                && (200..=299).contains(&response.status)
1177                && !response.body.is_empty()
1178            {
1179                return Some(response.body);
1180            }
1181        }
1182        None
1183    }
1184
1185    /// Write `bytes` atomically, then confirm the on-disk size (SYNC-13/14).
1186    fn write_verify(&self, clip_id: &str, path: &str, bytes: &[u8]) -> Result<u64, Fail> {
1187        self.fs.write_atomic(path, bytes).map_err(|err| {
1188            if err.is_out_of_space() {
1189                disk_fail(clip_id, format!("disk full: no space left to write {path}"))
1190            } else {
1191                permanent_fail(clip_id, format!("write failed: {err}"))
1192            }
1193        })?;
1194        match self.fs.metadata(path) {
1195            Some(stat) if stat.size == bytes.len() as u64 => Ok(stat.size),
1196            Some(stat) => Err(permanent_fail(
1197                clip_id,
1198                format!("wrote {} bytes, expected {}", stat.size, bytes.len()),
1199            )),
1200            None => Ok(bytes.len() as u64),
1201        }
1202    }
1203
1204    /// Build the manifest entry for a freshly written file.
1205    fn entry(&self, clip_id: &str, path: &str, format: AudioFormat, size: u64) -> ManifestEntry {
1206        match self.by_id.get(clip_id) {
1207            Some(d) => manifest_entry(d, size),
1208            None => ManifestEntry {
1209                path: path.to_owned(),
1210                format,
1211                size,
1212                ..ManifestEntry::default()
1213            },
1214        }
1215    }
1216
1217    /// Refresh an existing entry's hashes, protection, and (optionally) size.
1218    fn refresh_hashes(&self, manifest: &mut Manifest, clip_id: &str, size: Option<u64>) {
1219        let desired = self.by_id.get(clip_id).copied();
1220        if let Some(entry) = manifest.entries.get_mut(clip_id) {
1221            if let Some(d) = desired {
1222                entry.meta_hash = d.meta_hash.clone();
1223                entry.art_hash = d.art_hash.clone();
1224                entry.preserve = preserve_for(d);
1225            }
1226            if let Some(size) = size {
1227                entry.size = size;
1228            }
1229        }
1230    }
1231
1232    /// Refresh only an entry's preserve marker from the current desired state.
1233    ///
1234    /// A clip can gain or lose copy/private protection with no file change, which
1235    /// reconcile emits as a [`Skip`](Action::Skip). Refreshing here keeps the
1236    /// persisted marker a faithful image of live protection, so the cross-run
1237    /// delete guard (SYNC-8) never reads it stale.
1238    fn refresh_preserve(&self, manifest: &mut Manifest, clip_id: &str) {
1239        if let Some(d) = self.by_id.get(clip_id).copied()
1240            && let Some(entry) = manifest.entries.get_mut(clip_id)
1241        {
1242            entry.preserve = preserve_for(d);
1243        }
1244    }
1245}
1246
1247/// Build a manifest entry from the desired record (SYNC-8 preserve rule).
1248fn manifest_entry(d: &Desired, size: u64) -> ManifestEntry {
1249    ManifestEntry {
1250        path: d.path.clone(),
1251        format: d.format,
1252        meta_hash: d.meta_hash.clone(),
1253        art_hash: d.art_hash.clone(),
1254        size,
1255        preserve: preserve_for(d),
1256        ..Default::default()
1257    }
1258}
1259
1260/// Whether a written entry must be preserved across runs: held by any copy
1261/// source, or private. The reconcile delete guard reads this marker later.
1262fn preserve_for(d: &Desired) -> bool {
1263    d.private || d.modes.contains(&SourceMode::Copy)
1264}
1265
1266/// Classify one HTTP result into bytes or a [`FetchError`] (SYNC-14/17).
1267fn classify_response(
1268    result: Result<crate::http::HttpResponse, crate::http::TransportError>,
1269) -> Result<Vec<u8>, FetchError> {
1270    let response = match result {
1271        Ok(response) => response,
1272        Err(err) => {
1273            return Err(FetchError::transient(
1274                format!("transport error: {err}"),
1275                None,
1276            ));
1277        }
1278    };
1279    match response.status {
1280        200..=299 => {
1281            if let Some(expected) = content_length(&response) {
1282                let actual = response.body.len() as u64;
1283                if actual != expected {
1284                    return Err(FetchError::transient(
1285                        format!("truncated download: {actual} of {expected} bytes"),
1286                        None,
1287                    ));
1288                }
1289            }
1290            Ok(response.body)
1291        }
1292        401 | 403 => Err(FetchError::transient(
1293            format!("download rejected: status {}", response.status),
1294            None,
1295        )),
1296        408 => Err(FetchError::transient("request timed out", None)),
1297        429 => Err(FetchError::transient(
1298            "rate limited",
1299            retry_after(&response),
1300        )),
1301        500..=599 => Err(FetchError::transient(
1302            format!("server error {}", response.status),
1303            None,
1304        )),
1305        status => Err(FetchError::permanent(format!(
1306            "download failed: status {status}"
1307        ))),
1308    }
1309}
1310
1311/// Map a core [`Error`] from the authenticated WAV flow to a [`Fail`].
1312fn classify_core(id: &str, err: Error) -> Fail {
1313    let reason = err.to_string();
1314    match err {
1315        Error::Auth(_) => auth_fail(id, reason),
1316        Error::RateLimited { .. } | Error::Connection(_) => transient_fail(id, reason),
1317        Error::Api(_) | Error::NotFound(_) | Error::Tag(_) | Error::Config(_) => {
1318            permanent_fail(id, reason)
1319        }
1320    }
1321}
1322
1323/// The provider-reported body size from `Content-Length`, if present and valid.
1324fn content_length(response: &crate::http::HttpResponse) -> Option<u64> {
1325    response.header("content-length")?.trim().parse().ok()
1326}
1327
1328#[cfg(test)]
1329mod tests {
1330    use super::*;
1331    use crate::ClerkAuth;
1332    use crate::http::HttpResponse;
1333    use crate::testutil::{MemFs, RecordingClock, Reply, ScriptedHttp, StubFfmpeg};
1334
1335    fn clip(id: &str) -> Clip {
1336        Clip {
1337            id: id.to_owned(),
1338            title: "Song".to_owned(),
1339            audio_url: format!("https://cdn1.suno.ai/{id}.mp3"),
1340            ..Default::default()
1341        }
1342    }
1343
1344    fn art_clip(id: &str) -> Clip {
1345        Clip {
1346            image_large_url: format!("https://art.suno.ai/{id}/large.jpg"),
1347            image_url: format!("https://art.suno.ai/{id}/small.jpg"),
1348            ..clip(id)
1349        }
1350    }
1351
1352    fn ext(format: AudioFormat) -> &'static str {
1353        match format {
1354            AudioFormat::Mp3 => "mp3",
1355            AudioFormat::Flac => "flac",
1356            AudioFormat::Wav => "wav",
1357        }
1358    }
1359
1360    fn desired(clip: Clip, format: AudioFormat) -> Desired {
1361        Desired {
1362            path: format!("{}.{}", clip.id, ext(format)),
1363            lineage: LineageContext::own_root(&clip),
1364            clip,
1365            format,
1366            meta_hash: "m".to_owned(),
1367            art_hash: "art".to_owned(),
1368            modes: vec![SourceMode::Mirror],
1369            trashed: false,
1370            private: false,
1371            artifacts: Vec::new(),
1372        }
1373    }
1374
1375    fn entry(path: &str, format: AudioFormat) -> ManifestEntry {
1376        ManifestEntry {
1377            path: path.to_owned(),
1378            format,
1379            meta_hash: "old".to_owned(),
1380            art_hash: "old-art".to_owned(),
1381            size: 8,
1382            preserve: false,
1383            ..Default::default()
1384        }
1385    }
1386
1387    #[allow(clippy::too_many_arguments)]
1388    fn run(
1389        plan: &Plan,
1390        manifest: &mut Manifest,
1391        desired: &[Desired],
1392        http: &ScriptedHttp,
1393        fs: &MemFs,
1394        ffmpeg: &StubFfmpeg,
1395        clock: &RecordingClock,
1396        opts: &ExecOptions,
1397    ) -> ExecOutcome {
1398        let mut albums = BTreeMap::new();
1399        run_with_albums(
1400            plan,
1401            manifest,
1402            &mut albums,
1403            desired,
1404            http,
1405            fs,
1406            ffmpeg,
1407            clock,
1408            opts,
1409        )
1410    }
1411
1412    #[allow(clippy::too_many_arguments)]
1413    fn run_with_albums(
1414        plan: &Plan,
1415        manifest: &mut Manifest,
1416        albums: &mut BTreeMap<String, AlbumArt>,
1417        desired: &[Desired],
1418        http: &ScriptedHttp,
1419        fs: &MemFs,
1420        ffmpeg: &StubFfmpeg,
1421        clock: &RecordingClock,
1422        opts: &ExecOptions,
1423    ) -> ExecOutcome {
1424        let mut playlists = BTreeMap::new();
1425        run_full(
1426            plan,
1427            manifest,
1428            albums,
1429            &mut playlists,
1430            desired,
1431            http,
1432            fs,
1433            ffmpeg,
1434            clock,
1435            opts,
1436        )
1437    }
1438
1439    #[allow(clippy::too_many_arguments)]
1440    fn run_full(
1441        plan: &Plan,
1442        manifest: &mut Manifest,
1443        albums: &mut BTreeMap<String, AlbumArt>,
1444        playlists: &mut BTreeMap<String, PlaylistState>,
1445        desired: &[Desired],
1446        http: &ScriptedHttp,
1447        fs: &MemFs,
1448        ffmpeg: &StubFfmpeg,
1449        clock: &RecordingClock,
1450        opts: &ExecOptions,
1451    ) -> ExecOutcome {
1452        let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
1453        pollster::block_on(execute(
1454            plan,
1455            manifest,
1456            albums,
1457            playlists,
1458            desired,
1459            Ports {
1460                client: &mut client,
1461                http,
1462                fs,
1463                ffmpeg,
1464                clock,
1465            },
1466            opts,
1467        ))
1468    }
1469
1470    fn small_poll() -> ExecOptions {
1471        ExecOptions {
1472            max_retries: 3,
1473            wav_poll_attempts: 2,
1474            wav_poll_interval: Duration::from_secs(5),
1475            concurrency: 4,
1476        }
1477    }
1478
1479    // ── Download: MP3 ───────────────────────────────────────────────
1480
1481    #[test]
1482    fn download_mp3_writes_tagged_file_and_records_manifest() {
1483        let c = art_clip("a");
1484        let d = desired(c.clone(), AudioFormat::Mp3);
1485        let plan = Plan {
1486            actions: vec![Action::Download {
1487                clip: c.clone(),
1488                lineage: LineageContext::own_root(&c),
1489                path: d.path.clone(),
1490                format: AudioFormat::Mp3,
1491            }],
1492        };
1493        let http = ScriptedHttp::new()
1494            .route("a.mp3", Reply::ok(b"mp3-body".to_vec()))
1495            .route("a/large.jpg", Reply::ok(b"art-bytes".to_vec()));
1496        let fs = MemFs::new();
1497        let ffmpeg = StubFfmpeg::flac();
1498        let clock = RecordingClock::new();
1499        let mut manifest = Manifest::new();
1500
1501        let outcome = run(
1502            &plan,
1503            &mut manifest,
1504            &[d],
1505            &http,
1506            &fs,
1507            &ffmpeg,
1508            &clock,
1509            &ExecOptions::default(),
1510        );
1511
1512        assert_eq!(outcome.downloaded, 1);
1513        assert_eq!(outcome.failed(), 0);
1514        assert_eq!(outcome.status, RunStatus::Completed);
1515        let written = fs.read_file("a.mp3").unwrap();
1516        assert_eq!(&written[..3], b"ID3");
1517        assert!(written.ends_with(b"mp3-body"));
1518        let entry = manifest.get("a").unwrap();
1519        assert_eq!(entry.path, "a.mp3");
1520        assert_eq!(entry.format, AudioFormat::Mp3);
1521        assert_eq!(entry.meta_hash, "m");
1522        assert_eq!(entry.art_hash, "art");
1523        assert_eq!(entry.size, written.len() as u64);
1524        assert!(!entry.preserve);
1525    }
1526
1527    #[test]
1528    fn download_mp3_uses_cdn_fallback_when_audio_url_empty() {
1529        let mut c = clip("a");
1530        c.audio_url = String::new();
1531        let d = desired(c.clone(), AudioFormat::Mp3);
1532        let plan = Plan {
1533            actions: vec![Action::Download {
1534                clip: c.clone(),
1535                lineage: LineageContext::own_root(&c),
1536                path: d.path.clone(),
1537                format: AudioFormat::Mp3,
1538            }],
1539        };
1540        let http = ScriptedHttp::new().route("cdn1.suno.ai/a.mp3", Reply::ok(b"body".to_vec()));
1541        let fs = MemFs::new();
1542        let mut manifest = Manifest::new();
1543        let outcome = run(
1544            &plan,
1545            &mut manifest,
1546            &[d],
1547            &http,
1548            &fs,
1549            &StubFfmpeg::flac(),
1550            &RecordingClock::new(),
1551            &ExecOptions::default(),
1552        );
1553        assert_eq!(outcome.downloaded, 1);
1554        assert_eq!(http.count("cdn1.suno.ai/a.mp3"), 1);
1555    }
1556
1557    // ── Download: FLAC render + transcode ───────────────────────────
1558
1559    #[test]
1560    fn download_flac_renders_transcodes_and_records() {
1561        let c = clip("b");
1562        let d = desired(c.clone(), AudioFormat::Flac);
1563        let plan = Plan {
1564            actions: vec![Action::Download {
1565                clip: c.clone(),
1566                lineage: LineageContext::own_root(&c),
1567                path: d.path.clone(),
1568                format: AudioFormat::Flac,
1569            }],
1570        };
1571        let http = ScriptedHttp::new()
1572            .with_auth()
1573            .route(
1574                "/wav_file/",
1575                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/b.wav"}"#),
1576            )
1577            .route("b.wav", Reply::ok(b"wav-bytes".to_vec()));
1578        let fs = MemFs::new();
1579        let clock = RecordingClock::new();
1580        let mut manifest = Manifest::new();
1581
1582        let outcome = run(
1583            &plan,
1584            &mut manifest,
1585            &[d],
1586            &http,
1587            &fs,
1588            &StubFfmpeg::flac(),
1589            &clock,
1590            &ExecOptions::default(),
1591        );
1592
1593        assert_eq!(outcome.downloaded, 1);
1594        assert_eq!(outcome.failed(), 0);
1595        let written = fs.read_file("b.flac").unwrap();
1596        assert_eq!(&written[..4], b"fLaC");
1597        assert_eq!(manifest.get("b").unwrap().format, AudioFormat::Flac);
1598        // The URL was ready immediately, so no render request and no polling.
1599        assert_eq!(http.count("/convert_wav/"), 0);
1600        assert!(clock.sleeps().is_empty());
1601    }
1602
1603    #[test]
1604    fn download_flac_requests_render_then_polls_until_ready() {
1605        let c = clip("c");
1606        let d = desired(c.clone(), AudioFormat::Flac);
1607        let plan = Plan {
1608            actions: vec![Action::Download {
1609                clip: c.clone(),
1610                lineage: LineageContext::own_root(&c),
1611                path: d.path.clone(),
1612                format: AudioFormat::Flac,
1613            }],
1614        };
1615        let http = ScriptedHttp::new()
1616            .with_auth()
1617            .route_seq(
1618                "/wav_file/",
1619                vec![
1620                    Reply::json("{}"),
1621                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/c.wav"}"#),
1622                ],
1623            )
1624            .route("/convert_wav/", Reply::status(200))
1625            .route("c.wav", Reply::ok(b"wav".to_vec()));
1626        let clock = RecordingClock::new();
1627        let mut manifest = Manifest::new();
1628
1629        let outcome = run(
1630            &plan,
1631            &mut manifest,
1632            &[d],
1633            &http,
1634            &fs_new(),
1635            &StubFfmpeg::flac(),
1636            &clock,
1637            &small_poll(),
1638        );
1639
1640        assert_eq!(outcome.downloaded, 1);
1641        assert_eq!(http.count("/convert_wav/"), 1);
1642        assert_eq!(clock.sleeps(), vec![Duration::from_secs(5)]);
1643    }
1644
1645    #[test]
1646    fn download_flac_unavailable_render_is_a_nonfatal_failure() {
1647        let c = clip("d");
1648        let d = desired(c.clone(), AudioFormat::Flac);
1649        let plan = Plan {
1650            actions: vec![Action::Download {
1651                clip: c.clone(),
1652                lineage: LineageContext::own_root(&c),
1653                path: d.path.clone(),
1654                format: AudioFormat::Flac,
1655            }],
1656        };
1657        let http = ScriptedHttp::new()
1658            .with_auth()
1659            .route("/wav_file/", Reply::json("{}"))
1660            .route("/convert_wav/", Reply::status(200));
1661        let fs = MemFs::new();
1662        let clock = RecordingClock::new();
1663        let mut manifest = Manifest::new();
1664
1665        let outcome = run(
1666            &plan,
1667            &mut manifest,
1668            &[d],
1669            &http,
1670            &fs,
1671            &StubFfmpeg::flac(),
1672            &clock,
1673            &small_poll(),
1674        );
1675
1676        assert_eq!(outcome.downloaded, 0);
1677        assert_eq!(outcome.failed(), 1);
1678        assert_eq!(outcome.failures[0].clip_id, "d");
1679        assert_eq!(outcome.status, RunStatus::Completed);
1680        assert!(!fs.exists("d.flac"));
1681        assert_eq!(clock.sleeps().len(), 2);
1682    }
1683
1684    #[test]
1685    fn flac_transcode_failure_is_recorded_and_skipped() {
1686        let c = clip("t");
1687        let d = desired(c.clone(), AudioFormat::Flac);
1688        let plan = Plan {
1689            actions: vec![Action::Download {
1690                clip: c.clone(),
1691                lineage: LineageContext::own_root(&c),
1692                path: d.path.clone(),
1693                format: AudioFormat::Flac,
1694            }],
1695        };
1696        let http = ScriptedHttp::new()
1697            .with_auth()
1698            .route(
1699                "/wav_file/",
1700                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/t.wav"}"#),
1701            )
1702            .route("t.wav", Reply::ok(b"wav".to_vec()));
1703        let fs = MemFs::new();
1704        let mut manifest = Manifest::new();
1705
1706        let outcome = run(
1707            &plan,
1708            &mut manifest,
1709            &[d],
1710            &http,
1711            &fs,
1712            &StubFfmpeg::failing(),
1713            &RecordingClock::new(),
1714            &ExecOptions::default(),
1715        );
1716
1717        assert_eq!(outcome.downloaded, 0);
1718        assert_eq!(outcome.failed(), 1);
1719        assert!(!fs.exists("t.flac"));
1720        assert!(manifest.get("t").is_none());
1721    }
1722
1723    // ── Cover fallback ──────────────────────────────────────────────
1724
1725    #[test]
1726    fn cover_falls_back_when_large_image_is_missing() {
1727        let c = art_clip("e");
1728        let d = desired(c.clone(), AudioFormat::Mp3);
1729        let plan = Plan {
1730            actions: vec![Action::Download {
1731                clip: c.clone(),
1732                lineage: LineageContext::own_root(&c),
1733                path: d.path.clone(),
1734                format: AudioFormat::Mp3,
1735            }],
1736        };
1737        let http = ScriptedHttp::new()
1738            .route("e.mp3", Reply::ok(b"body".to_vec()))
1739            .route("e/large.jpg", Reply::status(404))
1740            .route("e/small.jpg", Reply::ok(b"the-art".to_vec()));
1741        let fs = MemFs::new();
1742        let mut manifest = Manifest::new();
1743
1744        let outcome = run(
1745            &plan,
1746            &mut manifest,
1747            &[d],
1748            &http,
1749            &fs,
1750            &StubFfmpeg::flac(),
1751            &RecordingClock::new(),
1752            &ExecOptions::default(),
1753        );
1754
1755        assert_eq!(outcome.downloaded, 1);
1756        let calls = http.calls();
1757        let large = calls
1758            .iter()
1759            .position(|u| u.contains("e/large.jpg"))
1760            .unwrap();
1761        let small = calls
1762            .iter()
1763            .position(|u| u.contains("e/small.jpg"))
1764            .unwrap();
1765        assert!(large < small, "large art tried before small");
1766    }
1767
1768    // ── Atomic write and size verification (SYNC-13/14) ─────────────
1769
1770    #[test]
1771    fn failed_write_leaves_the_prior_file_intact() {
1772        let c = clip("f");
1773        let d = desired(c.clone(), AudioFormat::Mp3);
1774        let plan = Plan {
1775            actions: vec![Action::Download {
1776                clip: c.clone(),
1777                lineage: LineageContext::own_root(&c),
1778                path: d.path.clone(),
1779                format: AudioFormat::Mp3,
1780            }],
1781        };
1782        let http = ScriptedHttp::new().route("f.mp3", Reply::ok(b"new-body".to_vec()));
1783        let fs = MemFs::new()
1784            .with_file("f.mp3", b"OLD-CONTENT".to_vec())
1785            .fail_write("f.mp3");
1786        let mut manifest = Manifest::new();
1787
1788        let outcome = run(
1789            &plan,
1790            &mut manifest,
1791            &[d],
1792            &http,
1793            &fs,
1794            &StubFfmpeg::flac(),
1795            &RecordingClock::new(),
1796            &ExecOptions::default(),
1797        );
1798
1799        assert_eq!(outcome.downloaded, 0);
1800        assert_eq!(outcome.failed(), 1);
1801        assert_eq!(fs.read_file("f.mp3").unwrap(), b"OLD-CONTENT");
1802        assert!(manifest.get("f").is_none());
1803    }
1804
1805    #[test]
1806    fn size_mismatch_after_write_is_a_failure() {
1807        let c = clip("g");
1808        let d = desired(c.clone(), AudioFormat::Mp3);
1809        let plan = Plan {
1810            actions: vec![Action::Download {
1811                clip: c.clone(),
1812                lineage: LineageContext::own_root(&c),
1813                path: d.path.clone(),
1814                format: AudioFormat::Mp3,
1815            }],
1816        };
1817        let http = ScriptedHttp::new().route("g.mp3", Reply::ok(b"body".to_vec()));
1818        let fs = MemFs::new().corrupt_write("g.mp3");
1819        let mut manifest = Manifest::new();
1820
1821        let outcome = run(
1822            &plan,
1823            &mut manifest,
1824            &[d],
1825            &http,
1826            &fs,
1827            &StubFfmpeg::flac(),
1828            &RecordingClock::new(),
1829            &ExecOptions::default(),
1830        );
1831
1832        assert_eq!(outcome.downloaded, 0);
1833        assert_eq!(outcome.failed(), 1);
1834        assert!(outcome.failures[0].reason.contains("expected"));
1835        assert!(manifest.get("g").is_none());
1836    }
1837
1838    // ── Reliability policy (SYNC-16/17) ─────────────────────────────
1839
1840    #[test]
1841    fn transient_failure_is_retried_then_skipped() {
1842        let c = clip("h");
1843        let d = desired(c.clone(), AudioFormat::Mp3);
1844        let plan = Plan {
1845            actions: vec![Action::Download {
1846                clip: c.clone(),
1847                lineage: LineageContext::own_root(&c),
1848                path: d.path.clone(),
1849                format: AudioFormat::Mp3,
1850            }],
1851        };
1852        let http = ScriptedHttp::new().route("h.mp3", Reply::status(500));
1853        let fs = MemFs::new();
1854        let clock = RecordingClock::new();
1855        let opts = ExecOptions {
1856            max_retries: 2,
1857            ..ExecOptions::default()
1858        };
1859        let mut manifest = Manifest::new();
1860
1861        let outcome = run(
1862            &plan,
1863            &mut manifest,
1864            &[d],
1865            &http,
1866            &fs,
1867            &StubFfmpeg::flac(),
1868            &clock,
1869            &opts,
1870        );
1871
1872        assert_eq!(outcome.downloaded, 0);
1873        assert_eq!(outcome.failed(), 1);
1874        assert_eq!(http.count("h.mp3"), 3);
1875        assert_eq!(clock.sleeps().len(), 2);
1876    }
1877
1878    #[test]
1879    fn truncated_download_is_retried_then_succeeds() {
1880        let c = clip("i");
1881        let d = desired(c.clone(), AudioFormat::Mp3);
1882        let plan = Plan {
1883            actions: vec![Action::Download {
1884                clip: c.clone(),
1885                lineage: LineageContext::own_root(&c),
1886                path: d.path.clone(),
1887                format: AudioFormat::Mp3,
1888            }],
1889        };
1890        let http = ScriptedHttp::new().route_seq(
1891            "i.mp3",
1892            vec![
1893                Reply::ok(b"short".to_vec()).with_content_length(999),
1894                Reply::ok(b"good-body".to_vec()),
1895            ],
1896        );
1897        let fs = MemFs::new();
1898        let clock = RecordingClock::new();
1899        let mut manifest = Manifest::new();
1900
1901        let outcome = run(
1902            &plan,
1903            &mut manifest,
1904            &[d],
1905            &http,
1906            &fs,
1907            &StubFfmpeg::flac(),
1908            &clock,
1909            &ExecOptions::default(),
1910        );
1911
1912        assert_eq!(outcome.downloaded, 1);
1913        assert_eq!(http.count("i.mp3"), 2);
1914        assert_eq!(clock.sleeps().len(), 1);
1915    }
1916
1917    #[test]
1918    fn rate_limit_backs_off_using_retry_after() {
1919        let c = clip("j");
1920        let d = desired(c.clone(), AudioFormat::Mp3);
1921        let plan = Plan {
1922            actions: vec![Action::Download {
1923                clip: c.clone(),
1924                lineage: LineageContext::own_root(&c),
1925                path: d.path.clone(),
1926                format: AudioFormat::Mp3,
1927            }],
1928        };
1929        let http = ScriptedHttp::new().route_seq(
1930            "j.mp3",
1931            vec![
1932                Reply::status(429).with_retry_after(7),
1933                Reply::ok(b"body".to_vec()),
1934            ],
1935        );
1936        let fs = MemFs::new();
1937        let clock = RecordingClock::new();
1938        let mut manifest = Manifest::new();
1939
1940        let outcome = run(
1941            &plan,
1942            &mut manifest,
1943            &[d],
1944            &http,
1945            &fs,
1946            &StubFfmpeg::flac(),
1947            &clock,
1948            &ExecOptions::default(),
1949        );
1950
1951        assert_eq!(outcome.downloaded, 1);
1952        assert_eq!(clock.sleeps(), vec![Duration::from_secs(7)]);
1953    }
1954
1955    #[test]
1956    fn auth_failure_aborts_the_run() {
1957        let c1 = clip("k1");
1958        let c2 = clip("k2");
1959        let d1 = desired(c1.clone(), AudioFormat::Flac);
1960        let d2 = desired(c2.clone(), AudioFormat::Flac);
1961        let plan = Plan {
1962            actions: vec![
1963                Action::Download {
1964                    clip: c1.clone(),
1965                    lineage: LineageContext::own_root(&c1),
1966                    path: d1.path.clone(),
1967                    format: AudioFormat::Flac,
1968                },
1969                Action::Download {
1970                    clip: c2.clone(),
1971                    lineage: LineageContext::own_root(&c2),
1972                    path: d2.path.clone(),
1973                    format: AudioFormat::Flac,
1974                },
1975            ],
1976        };
1977        // The authenticated WAV-render endpoint rejects auth even after a JWT
1978        // refresh: that is a bad token, so the whole run aborts rather than
1979        // hammering every clip. A CDN media rejection, by contrast, does not.
1980        let http = ScriptedHttp::new()
1981            .with_auth()
1982            .route("/wav_file/", Reply::status(401));
1983        let fs = MemFs::new();
1984        let mut manifest = Manifest::new();
1985
1986        let outcome = run(
1987            &plan,
1988            &mut manifest,
1989            &[d1, d2],
1990            &http,
1991            &fs,
1992            &StubFfmpeg::flac(),
1993            &RecordingClock::new(),
1994            &small_poll(),
1995        );
1996
1997        assert_eq!(outcome.status, RunStatus::AuthAborted);
1998        assert_eq!(outcome.failed(), 1);
1999        assert_eq!(outcome.failures[0].clip_id, "k1");
2000        assert_eq!(outcome.downloaded, 0);
2001    }
2002
2003    // ── Disk-full aborts the run (issue #17) ────────────────────────
2004
2005    #[test]
2006    fn disk_full_primary_write_aborts_the_run() {
2007        // Two MP3 downloads; the first write is out of space. That is systemic,
2008        // so the run aborts before the second is even attempted: exactly one
2009        // failure is recorded and its reason names the disk-full cause.
2010        let c1 = clip("d1");
2011        let c2 = clip("d2");
2012        let d1 = desired(c1.clone(), AudioFormat::Mp3);
2013        let d2 = desired(c2.clone(), AudioFormat::Mp3);
2014        let plan = Plan {
2015            actions: vec![
2016                Action::Download {
2017                    clip: c1.clone(),
2018                    lineage: LineageContext::own_root(&c1),
2019                    path: d1.path.clone(),
2020                    format: AudioFormat::Mp3,
2021                },
2022                Action::Download {
2023                    clip: c2.clone(),
2024                    lineage: LineageContext::own_root(&c2),
2025                    path: d2.path.clone(),
2026                    format: AudioFormat::Mp3,
2027                },
2028            ],
2029        };
2030        let http = ScriptedHttp::new()
2031            .route("d1.mp3", Reply::ok(b"body-1".to_vec()))
2032            .route("d2.mp3", Reply::ok(b"body-2".to_vec()));
2033        let fs = MemFs::new().fail_write_out_of_space("d1.mp3");
2034        let mut manifest = Manifest::new();
2035
2036        let outcome = run(
2037            &plan,
2038            &mut manifest,
2039            &[d1, d2],
2040            &http,
2041            &fs,
2042            &StubFfmpeg::flac(),
2043            &RecordingClock::new(),
2044            &ExecOptions::default(),
2045        );
2046
2047        assert_eq!(outcome.status, RunStatus::DiskFull);
2048        assert_eq!(outcome.failed(), 1);
2049        assert_eq!(outcome.failures[0].clip_id, "d1");
2050        assert!(outcome.failures[0].reason.contains("disk full"));
2051        assert_eq!(outcome.downloaded, 0);
2052        // The second clip was never fetched: the run aborted first.
2053        assert_eq!(http.count("d2.mp3"), 0);
2054        assert!(!fs.exists("d2.mp3"));
2055    }
2056
2057    #[test]
2058    fn disk_full_flac_transcode_aborts_the_run() {
2059        // The scratch disk fills during the FLAC re-encode; a WAV rendered, but
2060        // there is nowhere to stage the transcode, so the run aborts.
2061        let c1 = clip("d1");
2062        let c2 = clip("d2");
2063        let d1 = desired(c1.clone(), AudioFormat::Flac);
2064        let d2 = desired(c2.clone(), AudioFormat::Flac);
2065        let plan = Plan {
2066            actions: vec![
2067                Action::Download {
2068                    clip: c1.clone(),
2069                    lineage: LineageContext::own_root(&c1),
2070                    path: d1.path.clone(),
2071                    format: AudioFormat::Flac,
2072                },
2073                Action::Download {
2074                    clip: c2.clone(),
2075                    lineage: LineageContext::own_root(&c2),
2076                    path: d2.path.clone(),
2077                    format: AudioFormat::Flac,
2078                },
2079            ],
2080        };
2081        let http = ScriptedHttp::new()
2082            .with_auth()
2083            .route(
2084                "/wav_file/",
2085                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/d1.wav"}"#),
2086            )
2087            .route(".wav", Reply::ok(b"wav".to_vec()));
2088        let fs = MemFs::new();
2089        let mut manifest = Manifest::new();
2090
2091        let outcome = run(
2092            &plan,
2093            &mut manifest,
2094            &[d1, d2],
2095            &http,
2096            &fs,
2097            &StubFfmpeg::out_of_space(),
2098            &RecordingClock::new(),
2099            &ExecOptions::default(),
2100        );
2101
2102        assert_eq!(outcome.status, RunStatus::DiskFull);
2103        assert_eq!(outcome.failed(), 1);
2104        assert_eq!(outcome.failures[0].clip_id, "d1");
2105        assert!(outcome.failures[0].reason.contains("disk full"));
2106        assert_eq!(outcome.downloaded, 0);
2107    }
2108
2109    #[test]
2110    fn disk_full_artifact_write_aborts_the_run() {
2111        // A sidecar write (not a primary download) also aborts on a full disk:
2112        // the owning audio is present, the cover fetch succeeds, but the sidecar
2113        // cannot be written.
2114        let mut manifest = Manifest::new();
2115        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
2116        let plan = Plan {
2117            actions: vec![Action::WriteArtifact {
2118                kind: ArtifactKind::CoverJpg,
2119                path: "a/cover.jpg".to_owned(),
2120                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
2121                hash: "h1".to_owned(),
2122                owner_id: "a".to_owned(),
2123                content: None,
2124            }],
2125        };
2126        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"jpg-bytes".to_vec()));
2127        let fs = MemFs::new().fail_write_out_of_space("a/cover.jpg");
2128
2129        let outcome = run(
2130            &plan,
2131            &mut manifest,
2132            &[],
2133            &http,
2134            &fs,
2135            &StubFfmpeg::flac(),
2136            &RecordingClock::new(),
2137            &ExecOptions::default(),
2138        );
2139
2140        assert_eq!(outcome.status, RunStatus::DiskFull);
2141        assert_eq!(outcome.failed(), 1);
2142        assert!(outcome.failures[0].reason.contains("disk full"));
2143        assert_eq!(outcome.artifacts_written, 0);
2144        // The sidecar slot was never recorded: the write failed before it.
2145        assert_eq!(manifest.get("a").unwrap().cover_jpg, None);
2146    }
2147
2148    #[test]
2149    fn disk_full_leaves_the_failed_clips_manifest_entry_unchanged() {
2150        // write_verify fails before any manifest insert, so a re-download that
2151        // hits a full disk leaves the prior entry (and file) exactly as it was.
2152        let c = clip("m");
2153        let d = desired(c.clone(), AudioFormat::Mp3);
2154        let plan = Plan {
2155            actions: vec![Action::Download {
2156                clip: c.clone(),
2157                lineage: LineageContext::own_root(&c),
2158                path: d.path.clone(),
2159                format: AudioFormat::Mp3,
2160            }],
2161        };
2162        let http = ScriptedHttp::new().route("m.mp3", Reply::ok(b"new-body".to_vec()));
2163        let fs = MemFs::new()
2164            .with_file("m.mp3", b"OLD-CONTENT".to_vec())
2165            .fail_write_out_of_space("m.mp3");
2166        let mut manifest = Manifest::new();
2167        let before = entry("m.mp3", AudioFormat::Mp3);
2168        manifest.insert("m", before.clone());
2169
2170        let outcome = run(
2171            &plan,
2172            &mut manifest,
2173            &[d],
2174            &http,
2175            &fs,
2176            &StubFfmpeg::flac(),
2177            &RecordingClock::new(),
2178            &ExecOptions::default(),
2179        );
2180
2181        assert_eq!(outcome.status, RunStatus::DiskFull);
2182        assert_eq!(manifest.get("m"), Some(&before));
2183        assert_eq!(fs.read_file("m.mp3").unwrap(), b"OLD-CONTENT");
2184    }
2185
2186    #[test]
2187    fn cdn_download_rejection_skips_the_clip_without_aborting() {
2188        let c1 = clip("k1");
2189        let c2 = clip("k2");
2190        let d1 = desired(c1.clone(), AudioFormat::Mp3);
2191        let d2 = desired(c2.clone(), AudioFormat::Mp3);
2192        let plan = Plan {
2193            actions: vec![
2194                Action::Download {
2195                    clip: c1.clone(),
2196                    lineage: LineageContext::own_root(&c1),
2197                    path: d1.path.clone(),
2198                    format: AudioFormat::Mp3,
2199                },
2200                Action::Download {
2201                    clip: c2.clone(),
2202                    lineage: LineageContext::own_root(&c2),
2203                    path: d2.path.clone(),
2204                    format: AudioFormat::Mp3,
2205                },
2206            ],
2207        };
2208        // A CDN media fetch is unauthenticated, so a 403 is a per-asset
2209        // rejection (often transient), not a bad token: the clip is retried
2210        // then recorded and skipped, and the run carries on to the rest.
2211        let http = ScriptedHttp::new()
2212            .route("k1.mp3", Reply::status(403))
2213            .route("k2.mp3", Reply::ok(b"body".to_vec()));
2214        let fs = MemFs::new();
2215        let mut manifest = Manifest::new();
2216
2217        let outcome = run(
2218            &plan,
2219            &mut manifest,
2220            &[d1, d2],
2221            &http,
2222            &fs,
2223            &StubFfmpeg::flac(),
2224            &RecordingClock::new(),
2225            &ExecOptions::default(),
2226        );
2227
2228        assert_ne!(outcome.status, RunStatus::AuthAborted);
2229        assert_eq!(outcome.downloaded, 1);
2230        assert_eq!(outcome.failed(), 1);
2231        assert_eq!(outcome.failures[0].clip_id, "k1");
2232    }
2233
2234    #[test]
2235    fn one_clip_failure_does_not_abort_the_run() {
2236        let c1 = clip("l1");
2237        let c2 = clip("l2");
2238        let d1 = desired(c1.clone(), AudioFormat::Mp3);
2239        let d2 = desired(c2.clone(), AudioFormat::Mp3);
2240        let plan = Plan {
2241            actions: vec![
2242                Action::Download {
2243                    clip: c1.clone(),
2244                    lineage: LineageContext::own_root(&c1),
2245                    path: d1.path.clone(),
2246                    format: AudioFormat::Mp3,
2247                },
2248                Action::Download {
2249                    clip: c2.clone(),
2250                    lineage: LineageContext::own_root(&c2),
2251                    path: d2.path.clone(),
2252                    format: AudioFormat::Mp3,
2253                },
2254            ],
2255        };
2256        let http = ScriptedHttp::new()
2257            .route("l1.mp3", Reply::status(404))
2258            .route("l2.mp3", Reply::ok(b"body".to_vec()));
2259        let fs = MemFs::new();
2260        let mut manifest = Manifest::new();
2261
2262        let outcome = run(
2263            &plan,
2264            &mut manifest,
2265            &[d1, d2],
2266            &http,
2267            &fs,
2268            &StubFfmpeg::flac(),
2269            &RecordingClock::new(),
2270            &ExecOptions::default(),
2271        );
2272
2273        assert_eq!(outcome.status, RunStatus::Completed);
2274        assert_eq!(outcome.downloaded, 1);
2275        assert_eq!(outcome.failed(), 1);
2276        assert_eq!(outcome.failures[0].clip_id, "l1");
2277        assert!(fs.exists("l2.mp3"));
2278        assert!(manifest.get("l2").is_some());
2279        assert!(manifest.get("l1").is_none());
2280    }
2281
2282    // ── preserve marker (SYNC-8) ────────────────────────────────────
2283
2284    #[test]
2285    fn preserve_is_set_for_copy_held_and_private_clips() {
2286        let mut mirror = desired(clip("m1"), AudioFormat::Mp3);
2287        mirror.modes = vec![SourceMode::Mirror];
2288        let mut copy_held = desired(clip("m2"), AudioFormat::Mp3);
2289        copy_held.modes = vec![SourceMode::Mirror, SourceMode::Copy];
2290        let mut private = desired(clip("m3"), AudioFormat::Mp3);
2291        private.private = true;
2292
2293        let plan = Plan {
2294            actions: vec![
2295                Action::Download {
2296                    clip: mirror.clip.clone(),
2297                    lineage: LineageContext::own_root(&mirror.clip),
2298                    path: mirror.path.clone(),
2299                    format: AudioFormat::Mp3,
2300                },
2301                Action::Download {
2302                    clip: copy_held.clip.clone(),
2303                    lineage: LineageContext::own_root(&copy_held.clip),
2304                    path: copy_held.path.clone(),
2305                    format: AudioFormat::Mp3,
2306                },
2307                Action::Download {
2308                    clip: private.clip.clone(),
2309                    lineage: LineageContext::own_root(&private.clip),
2310                    path: private.path.clone(),
2311                    format: AudioFormat::Mp3,
2312                },
2313            ],
2314        };
2315        let http = ScriptedHttp::new()
2316            .route("m1.mp3", Reply::ok(b"a".to_vec()))
2317            .route("m2.mp3", Reply::ok(b"b".to_vec()))
2318            .route("m3.mp3", Reply::ok(b"c".to_vec()));
2319        let fs = MemFs::new();
2320        let mut manifest = Manifest::new();
2321
2322        let outcome = run(
2323            &plan,
2324            &mut manifest,
2325            &[mirror, copy_held, private],
2326            &http,
2327            &fs,
2328            &StubFfmpeg::flac(),
2329            &RecordingClock::new(),
2330            &ExecOptions::default(),
2331        );
2332
2333        assert_eq!(outcome.downloaded, 3);
2334        assert!(!manifest.get("m1").unwrap().preserve);
2335        assert!(manifest.get("m2").unwrap().preserve);
2336        assert!(manifest.get("m3").unwrap().preserve);
2337    }
2338
2339    // ── Reformat / Retag / Rename / Delete / Skip ───────────────────
2340
2341    #[test]
2342    fn reformat_writes_new_format_and_removes_old_file() {
2343        let c = clip("n");
2344        let d = desired(c.clone(), AudioFormat::Mp3);
2345        let plan = Plan {
2346            actions: vec![Action::Reformat {
2347                clip: c.clone(),
2348                path: "n.mp3".to_owned(),
2349                from_path: "n.flac".to_owned(),
2350                from: AudioFormat::Flac,
2351                to: AudioFormat::Mp3,
2352            }],
2353        };
2354        let http = ScriptedHttp::new().route("n.mp3", Reply::ok(b"body".to_vec()));
2355        let fs = MemFs::new().with_file("n.flac", b"OLD-FLAC".to_vec());
2356        let mut manifest = Manifest::new();
2357        manifest.insert("n", entry("n.flac", AudioFormat::Flac));
2358
2359        let outcome = run(
2360            &plan,
2361            &mut manifest,
2362            &[d],
2363            &http,
2364            &fs,
2365            &StubFfmpeg::flac(),
2366            &RecordingClock::new(),
2367            &ExecOptions::default(),
2368        );
2369
2370        assert_eq!(outcome.reformatted, 1);
2371        assert!(fs.exists("n.mp3"));
2372        assert!(!fs.exists("n.flac"));
2373        let updated = manifest.get("n").unwrap();
2374        assert_eq!(updated.path, "n.mp3");
2375        assert_eq!(updated.format, AudioFormat::Mp3);
2376        assert_eq!(updated.meta_hash, "m");
2377    }
2378
2379    #[test]
2380    fn retag_rewrites_file_and_updates_hashes() {
2381        let c = clip("o");
2382        let mut d = desired(c.clone(), AudioFormat::Mp3);
2383        d.meta_hash = "new".to_owned();
2384        d.art_hash = "new-art".to_owned();
2385        let existing = tag_mp3(
2386            b"audio",
2387            &TrackMetadata::from_clip(&c, &LineageContext::own_root(&c)),
2388            None,
2389        )
2390        .unwrap();
2391        let fs = MemFs::new().with_file("o.mp3", existing.clone());
2392        let mut manifest = Manifest::new();
2393        let mut start = entry("o.mp3", AudioFormat::Mp3);
2394        start.size = existing.len() as u64;
2395        manifest.insert("o", start);
2396        let plan = Plan {
2397            actions: vec![Action::Retag {
2398                clip: c.clone(),
2399                lineage: LineageContext::own_root(&c),
2400                path: "o.mp3".to_owned(),
2401            }],
2402        };
2403
2404        let outcome = run(
2405            &plan,
2406            &mut manifest,
2407            &[d],
2408            &ScriptedHttp::new(),
2409            &fs,
2410            &StubFfmpeg::flac(),
2411            &RecordingClock::new(),
2412            &ExecOptions::default(),
2413        );
2414
2415        assert_eq!(outcome.retagged, 1);
2416        let updated = manifest.get("o").unwrap();
2417        assert_eq!(updated.meta_hash, "new");
2418        assert_eq!(updated.art_hash, "new-art");
2419        assert_eq!(&fs.read_file("o.mp3").unwrap()[..3], b"ID3");
2420    }
2421
2422    #[test]
2423    fn rename_moves_file_and_updates_manifest_path() {
2424        let c = clip("p");
2425        let mut d = desired(c.clone(), AudioFormat::Mp3);
2426        d.path = "new/p.mp3".to_owned();
2427        let fs = MemFs::new().with_file("old/p.mp3", b"DATA".to_vec());
2428        let mut manifest = Manifest::new();
2429        manifest.insert("p", entry("old/p.mp3", AudioFormat::Mp3));
2430        let plan = Plan {
2431            actions: vec![Action::Rename {
2432                from: "old/p.mp3".to_owned(),
2433                to: "new/p.mp3".to_owned(),
2434            }],
2435        };
2436
2437        let outcome = run(
2438            &plan,
2439            &mut manifest,
2440            &[d],
2441            &ScriptedHttp::new(),
2442            &fs,
2443            &StubFfmpeg::flac(),
2444            &RecordingClock::new(),
2445            &ExecOptions::default(),
2446        );
2447
2448        assert_eq!(outcome.renamed, 1);
2449        assert!(fs.exists("new/p.mp3"));
2450        assert!(!fs.exists("old/p.mp3"));
2451        assert_eq!(manifest.get("p").unwrap().path, "new/p.mp3");
2452    }
2453
2454    #[test]
2455    fn disk_full_rename_aborts_the_run() {
2456        // A move onto a full disk is systemic like a full-disk write: the run
2457        // aborts with DiskFull and the source file is left untouched.
2458        let c = clip("p");
2459        let mut d = desired(c.clone(), AudioFormat::Mp3);
2460        d.path = "new/p.mp3".to_owned();
2461        let fs = MemFs::new()
2462            .with_file("old/p.mp3", b"DATA".to_vec())
2463            .fail_rename_out_of_space("new/p.mp3");
2464        let mut manifest = Manifest::new();
2465        manifest.insert("p", entry("old/p.mp3", AudioFormat::Mp3));
2466        let plan = Plan {
2467            actions: vec![Action::Rename {
2468                from: "old/p.mp3".to_owned(),
2469                to: "new/p.mp3".to_owned(),
2470            }],
2471        };
2472
2473        let outcome = run(
2474            &plan,
2475            &mut manifest,
2476            &[d],
2477            &ScriptedHttp::new(),
2478            &fs,
2479            &StubFfmpeg::flac(),
2480            &RecordingClock::new(),
2481            &ExecOptions::default(),
2482        );
2483
2484        assert_eq!(outcome.status, RunStatus::DiskFull);
2485        assert_eq!(outcome.renamed, 0);
2486        assert_eq!(outcome.failed(), 1);
2487        assert!(outcome.failures[0].reason.contains("disk full"));
2488        // The source is untouched: the move never happened.
2489        assert!(fs.exists("old/p.mp3"));
2490        assert!(!fs.exists("new/p.mp3"));
2491        assert_eq!(manifest.get("p").unwrap().path, "old/p.mp3");
2492    }
2493
2494    #[test]
2495    fn delete_removes_file_and_manifest_entry() {
2496        let fs = MemFs::new().with_file("q.mp3", b"DATA".to_vec());
2497        let mut manifest = Manifest::new();
2498        manifest.insert("q", entry("q.mp3", AudioFormat::Mp3));
2499        let plan = Plan {
2500            actions: vec![Action::Delete {
2501                path: "q.mp3".to_owned(),
2502                clip_id: "q".to_owned(),
2503            }],
2504        };
2505
2506        let outcome = run(
2507            &plan,
2508            &mut manifest,
2509            &[],
2510            &ScriptedHttp::new(),
2511            &fs,
2512            &StubFfmpeg::flac(),
2513            &RecordingClock::new(),
2514            &ExecOptions::default(),
2515        );
2516
2517        assert_eq!(outcome.deleted, 1);
2518        assert!(!fs.exists("q.mp3"));
2519        assert!(manifest.get("q").is_none());
2520    }
2521
2522    #[test]
2523    fn failed_delete_keeps_the_manifest_entry() {
2524        let fs = MemFs::new()
2525            .with_file("s.mp3", b"DATA".to_vec())
2526            .fail_remove("s.mp3");
2527        let mut manifest = Manifest::new();
2528        manifest.insert("s", entry("s.mp3", AudioFormat::Mp3));
2529        let plan = Plan {
2530            actions: vec![Action::Delete {
2531                path: "s.mp3".to_owned(),
2532                clip_id: "s".to_owned(),
2533            }],
2534        };
2535
2536        let outcome = run(
2537            &plan,
2538            &mut manifest,
2539            &[],
2540            &ScriptedHttp::new(),
2541            &fs,
2542            &StubFfmpeg::flac(),
2543            &RecordingClock::new(),
2544            &ExecOptions::default(),
2545        );
2546
2547        assert_eq!(outcome.deleted, 0);
2548        assert_eq!(outcome.failed(), 1);
2549        assert!(manifest.get("s").is_some());
2550        assert!(fs.exists("s.mp3"));
2551    }
2552
2553    #[test]
2554    fn skip_is_a_noop() {
2555        let mut manifest = Manifest::new();
2556        let plan = Plan {
2557            actions: vec![Action::Skip {
2558                clip_id: "r".to_owned(),
2559            }],
2560        };
2561        let outcome = run(
2562            &plan,
2563            &mut manifest,
2564            &[],
2565            &ScriptedHttp::new(),
2566            &MemFs::new(),
2567            &StubFfmpeg::flac(),
2568            &RecordingClock::new(),
2569            &ExecOptions::default(),
2570        );
2571        assert_eq!(outcome.skipped, 1);
2572        assert_eq!(outcome.failed(), 0);
2573    }
2574
2575    // ── Pure helpers ────────────────────────────────────────────────
2576
2577    #[test]
2578    fn header_helpers_parse_or_ignore() {
2579        let resp = HttpResponse {
2580            status: 200,
2581            headers: vec![("Content-Length".to_owned(), "42".to_owned())],
2582            body: Vec::new(),
2583        };
2584        assert_eq!(content_length(&resp), Some(42));
2585
2586        let bare = HttpResponse {
2587            status: 200,
2588            headers: Vec::new(),
2589            body: Vec::new(),
2590        };
2591        assert_eq!(content_length(&bare), None);
2592    }
2593
2594    #[test]
2595    fn preserve_rule_covers_copy_and_private() {
2596        let base = desired(clip("x"), AudioFormat::Mp3);
2597        assert!(!preserve_for(&base));
2598        let mut copy_held = base.clone();
2599        copy_held.modes = vec![SourceMode::Copy];
2600        assert!(preserve_for(&copy_held));
2601        let mut private = base.clone();
2602        private.private = true;
2603        assert!(preserve_for(&private));
2604    }
2605
2606    fn fs_new() -> MemFs {
2607        MemFs::new()
2608    }
2609
2610    // ── Skip refreshes the preserve marker (SYNC-8 cross-run) ────────
2611
2612    #[test]
2613    fn skip_sets_preserve_when_a_clip_becomes_copy_held() {
2614        let c = clip("s1");
2615        let mut d = desired(c.clone(), AudioFormat::Mp3);
2616        d.modes = vec![SourceMode::Copy];
2617        let plan = Plan {
2618            actions: vec![Action::Skip {
2619                clip_id: "s1".to_owned(),
2620            }],
2621        };
2622        let mut manifest = Manifest::new();
2623        manifest.insert("s1".to_owned(), entry("s1.mp3", AudioFormat::Mp3));
2624        assert!(!manifest.get("s1").unwrap().preserve);
2625
2626        let outcome = run(
2627            &plan,
2628            &mut manifest,
2629            &[d],
2630            &ScriptedHttp::new(),
2631            &fs_new(),
2632            &StubFfmpeg::flac(),
2633            &RecordingClock::new(),
2634            &ExecOptions::default(),
2635        );
2636
2637        assert_eq!(outcome.skipped, 1);
2638        assert!(
2639            manifest.get("s1").unwrap().preserve,
2640            "a copy-held skip must mark the entry preserved"
2641        );
2642    }
2643
2644    #[test]
2645    fn skip_clears_stale_preserve_when_a_clip_returns_to_mirror_only() {
2646        let c = clip("s2");
2647        let d = desired(c.clone(), AudioFormat::Mp3);
2648        let plan = Plan {
2649            actions: vec![Action::Skip {
2650                clip_id: "s2".to_owned(),
2651            }],
2652        };
2653        let mut manifest = Manifest::new();
2654        let mut stale = entry("s2.mp3", AudioFormat::Mp3);
2655        stale.preserve = true;
2656        manifest.insert("s2".to_owned(), stale);
2657
2658        run(
2659            &plan,
2660            &mut manifest,
2661            &[d],
2662            &ScriptedHttp::new(),
2663            &fs_new(),
2664            &StubFfmpeg::flac(),
2665            &RecordingClock::new(),
2666            &ExecOptions::default(),
2667        );
2668
2669        assert!(
2670            !manifest.get("s2").unwrap().preserve,
2671            "a mirror-only skip must clear a stale preserve marker"
2672        );
2673    }
2674
2675    #[test]
2676    fn flac_render_retries_a_rate_limited_wav_lookup() {
2677        let c = clip("rl");
2678        let d = desired(c.clone(), AudioFormat::Flac);
2679        let plan = Plan {
2680            actions: vec![Action::Download {
2681                clip: c.clone(),
2682                lineage: LineageContext::own_root(&c),
2683                path: d.path.clone(),
2684                format: AudioFormat::Flac,
2685            }],
2686        };
2687        let http = ScriptedHttp::new()
2688            .with_auth()
2689            .route_seq(
2690                "/wav_file/",
2691                vec![
2692                    Reply::status(429),
2693                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/rl.wav"}"#),
2694                ],
2695            )
2696            .route("rl.wav", Reply::ok(b"wav".to_vec()));
2697        let clock = RecordingClock::new();
2698        let mut manifest = Manifest::new();
2699
2700        let outcome = run(
2701            &plan,
2702            &mut manifest,
2703            &[d],
2704            &http,
2705            &fs_new(),
2706            &StubFfmpeg::flac(),
2707            &clock,
2708            &small_poll(),
2709        );
2710
2711        assert_eq!(outcome.downloaded, 1);
2712        assert_eq!(outcome.failed(), 0);
2713        // The render was ready on retry, so no fresh convert_wav was needed.
2714        assert_eq!(http.count("/convert_wav/"), 0);
2715        // One transient backoff (1s base), not the 5s poll interval.
2716        assert_eq!(clock.sleeps(), vec![Duration::from_secs(1)]);
2717    }
2718
2719    // ── Phase 6: artifact actions ───────────────────────────────────
2720
2721    #[test]
2722    fn write_artifact_fetches_writes_and_updates_manifest() {
2723        // The owning entry exists (its audio was kept this run); WriteArtifact
2724        // fetches the source, writes the sidecar, and records it on the entry.
2725        let mut manifest = Manifest::new();
2726        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
2727        let plan = Plan {
2728            actions: vec![Action::WriteArtifact {
2729                kind: ArtifactKind::CoverJpg,
2730                path: "a/cover.jpg".to_owned(),
2731                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
2732                hash: "h1".to_owned(),
2733                owner_id: "a".to_owned(),
2734                content: None,
2735            }],
2736        };
2737        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"jpg-bytes".to_vec()));
2738        let fs = MemFs::new();
2739
2740        let outcome = run(
2741            &plan,
2742            &mut manifest,
2743            &[],
2744            &http,
2745            &fs,
2746            &StubFfmpeg::flac(),
2747            &RecordingClock::new(),
2748            &ExecOptions::default(),
2749        );
2750
2751        assert_eq!(outcome.artifacts_written, 1);
2752        assert_eq!(outcome.failed(), 0);
2753        assert_eq!(outcome.status, RunStatus::Completed);
2754        assert_eq!(fs.read_file("a/cover.jpg").unwrap(), b"jpg-bytes");
2755        assert_eq!(
2756            manifest.get("a").unwrap().cover_jpg,
2757            Some(ArtifactState {
2758                path: "a/cover.jpg".to_owned(),
2759                hash: "h1".to_owned(),
2760            })
2761        );
2762    }
2763
2764    #[test]
2765    fn write_text_sidecar_records_slot_with_no_network_fetch() {
2766        // A generated text sidecar carries its body inline, so it is written
2767        // verbatim with NO HTTP fetch and the details slot records its state.
2768        let mut manifest = Manifest::new();
2769        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
2770        let plan = Plan {
2771            actions: vec![Action::WriteArtifact {
2772                kind: ArtifactKind::DetailsTxt,
2773                path: "a.details.txt".to_owned(),
2774                source_url: String::new(),
2775                hash: "dh".to_owned(),
2776                owner_id: "a".to_owned(),
2777                content: Some("Title: A\n".to_owned()),
2778            }],
2779        };
2780        // An empty HTTP script: any fetch would fail, proving none happens.
2781        let http = ScriptedHttp::new();
2782        let fs = MemFs::new();
2783
2784        let outcome = run(
2785            &plan,
2786            &mut manifest,
2787            &[],
2788            &http,
2789            &fs,
2790            &StubFfmpeg::flac(),
2791            &RecordingClock::new(),
2792            &ExecOptions::default(),
2793        );
2794
2795        assert_eq!(outcome.artifacts_written, 1);
2796        assert_eq!(outcome.failed(), 0);
2797        assert_eq!(fs.read_file("a.details.txt").unwrap(), b"Title: A\n");
2798        assert_eq!(
2799            manifest.get("a").unwrap().details_txt,
2800            Some(ArtifactState {
2801                path: "a.details.txt".to_owned(),
2802                hash: "dh".to_owned(),
2803            })
2804        );
2805    }
2806
2807    #[test]
2808    fn write_lyrics_sidecar_relocation_removes_old_file() {
2809        // The audio moved, so the lyrics sidecar is re-emitted at the new path;
2810        // the executor writes the new file and prunes the stale one.
2811        let mut manifest = Manifest::new();
2812        let mut e = entry("old/a.flac", AudioFormat::Flac);
2813        e.lyrics_txt = Some(ArtifactState {
2814            path: "old/a.lyrics.txt".to_owned(),
2815            hash: "lh".to_owned(),
2816        });
2817        manifest.insert("a", e);
2818        let fs = MemFs::new()
2819            .with_file("old/a.flac", b"AUDIO".to_vec())
2820            .with_file("old/a.lyrics.txt", b"old words\n".to_vec());
2821        let plan = Plan {
2822            actions: vec![Action::WriteArtifact {
2823                kind: ArtifactKind::LyricsTxt,
2824                path: "new/a.lyrics.txt".to_owned(),
2825                source_url: String::new(),
2826                hash: "lh".to_owned(),
2827                owner_id: "a".to_owned(),
2828                content: Some("new words\n".to_owned()),
2829            }],
2830        };
2831
2832        let outcome = run(
2833            &plan,
2834            &mut manifest,
2835            &[],
2836            &ScriptedHttp::new(),
2837            &fs,
2838            &StubFfmpeg::flac(),
2839            &RecordingClock::new(),
2840            &ExecOptions::default(),
2841        );
2842
2843        assert_eq!(outcome.failed(), 0);
2844        assert_eq!(fs.read_file("new/a.lyrics.txt").unwrap(), b"new words\n");
2845        assert!(!fs.exists("old/a.lyrics.txt"));
2846        assert_eq!(
2847            manifest.get("a").unwrap().lyrics_txt.as_ref().unwrap().path,
2848            "new/a.lyrics.txt"
2849        );
2850    }
2851
2852    #[test]
2853    fn sidecar_path_swap_never_deletes_a_file_written_this_run() {
2854        // Two clips swap sidecar paths in one run (A: x -> y while B: y -> x).
2855        // Each write's inline old-path cleanup must skip a path another action
2856        // writes this run, or the second write would delete the first's freshly
2857        // written file (issue #76). The guard is kind-agnostic; lyrics stands in
2858        // for every sidecar, including the .mp4 video.
2859        let mut manifest = Manifest::new();
2860        let mut a = entry("a.flac", AudioFormat::Flac);
2861        a.lyrics_txt = Some(ArtifactState {
2862            path: "x.lyrics.txt".to_owned(),
2863            hash: "ah".to_owned(),
2864        });
2865        manifest.insert("a", a);
2866        let mut b = entry("b.flac", AudioFormat::Flac);
2867        b.lyrics_txt = Some(ArtifactState {
2868            path: "y.lyrics.txt".to_owned(),
2869            hash: "bh".to_owned(),
2870        });
2871        manifest.insert("b", b);
2872        let fs = MemFs::new()
2873            .with_file("a.flac", b"A".to_vec())
2874            .with_file("b.flac", b"B".to_vec())
2875            .with_file("x.lyrics.txt", b"A words\n".to_vec())
2876            .with_file("y.lyrics.txt", b"B words\n".to_vec());
2877        // A moves its sidecar x -> y; B moves its sidecar y -> x (the swap).
2878        let plan = Plan {
2879            actions: vec![
2880                Action::WriteArtifact {
2881                    kind: ArtifactKind::LyricsTxt,
2882                    path: "y.lyrics.txt".to_owned(),
2883                    source_url: String::new(),
2884                    hash: "ah".to_owned(),
2885                    owner_id: "a".to_owned(),
2886                    content: Some("A words\n".to_owned()),
2887                },
2888                Action::WriteArtifact {
2889                    kind: ArtifactKind::LyricsTxt,
2890                    path: "x.lyrics.txt".to_owned(),
2891                    source_url: String::new(),
2892                    hash: "bh".to_owned(),
2893                    owner_id: "b".to_owned(),
2894                    content: Some("B words\n".to_owned()),
2895                },
2896            ],
2897        };
2898
2899        let outcome = run(
2900            &plan,
2901            &mut manifest,
2902            &[],
2903            &ScriptedHttp::new(),
2904            &fs,
2905            &StubFfmpeg::flac(),
2906            &RecordingClock::new(),
2907            &ExecOptions::default(),
2908        );
2909
2910        assert_eq!(outcome.failed(), 0);
2911        // Both freshly written files survive; neither cleanup clobbered the other.
2912        assert_eq!(fs.read_file("y.lyrics.txt").unwrap(), b"A words\n");
2913        assert_eq!(fs.read_file("x.lyrics.txt").unwrap(), b"B words\n");
2914        assert_eq!(
2915            manifest.get("a").unwrap().lyrics_txt.as_ref().unwrap().path,
2916            "y.lyrics.txt"
2917        );
2918        assert_eq!(
2919            manifest.get("b").unwrap().lyrics_txt.as_ref().unwrap().path,
2920            "x.lyrics.txt"
2921        );
2922    }
2923
2924    #[test]
2925    fn old_sidecar_kept_when_another_clip_still_references_it() {
2926        // A prior failed swap can leave two clips pointing at one path (A -> y and
2927        // B -> y). When B now moves y -> x, its cleanup must not delete y, which is
2928        // still A's live file (#76). tracked_paths counts two references to y, so
2929        // the removal is skipped even though y is not a write target this run.
2930        let mut manifest = Manifest::new();
2931        let mut a = entry("a.flac", AudioFormat::Flac);
2932        a.lyrics_txt = Some(ArtifactState {
2933            path: "y.lyrics.txt".to_owned(),
2934            hash: "ah".to_owned(),
2935        });
2936        manifest.insert("a", a);
2937        let mut b = entry("b.flac", AudioFormat::Flac);
2938        b.lyrics_txt = Some(ArtifactState {
2939            path: "y.lyrics.txt".to_owned(),
2940            hash: "bh".to_owned(),
2941        });
2942        manifest.insert("b", b);
2943        let fs = MemFs::new()
2944            .with_file("a.flac", b"A".to_vec())
2945            .with_file("b.flac", b"B".to_vec())
2946            .with_file("y.lyrics.txt", b"A words\n".to_vec());
2947        // Only B moves this run: y -> x. A is stable, so y is not a write target;
2948        // the tracked-reference count is what protects A's file.
2949        let plan = Plan {
2950            actions: vec![Action::WriteArtifact {
2951                kind: ArtifactKind::LyricsTxt,
2952                path: "x.lyrics.txt".to_owned(),
2953                source_url: String::new(),
2954                hash: "bh".to_owned(),
2955                owner_id: "b".to_owned(),
2956                content: Some("B words\n".to_owned()),
2957            }],
2958        };
2959
2960        let outcome = run(
2961            &plan,
2962            &mut manifest,
2963            &[],
2964            &ScriptedHttp::new(),
2965            &fs,
2966            &StubFfmpeg::flac(),
2967            &RecordingClock::new(),
2968            &ExecOptions::default(),
2969        );
2970
2971        assert_eq!(outcome.failed(), 0);
2972        assert!(
2973            fs.exists("y.lyrics.txt"),
2974            "A's live sidecar must not be deleted"
2975        );
2976        assert_eq!(fs.read_file("x.lyrics.txt").unwrap(), b"B words\n");
2977    }
2978
2979    #[test]
2980    fn shared_old_path_is_reclaimed_when_every_referencing_clip_moves_away() {
2981        // Two clips share one path (A -> s and B -> s, from a prior failed swap).
2982        // When BOTH move away this run, the path is no longer live, so the last
2983        // mover must reclaim it: it is neither kept as an orphan nor deleted while
2984        // still referenced. The dynamic reference count drops to zero only after
2985        // both moves, so exactly the final cleanup removes it (#76).
2986        let mut manifest = Manifest::new();
2987        let mut a = entry("a.flac", AudioFormat::Flac);
2988        a.lyrics_txt = Some(ArtifactState {
2989            path: "s.lyrics.txt".to_owned(),
2990            hash: "ah".to_owned(),
2991        });
2992        manifest.insert("a", a);
2993        let mut b = entry("b.flac", AudioFormat::Flac);
2994        b.lyrics_txt = Some(ArtifactState {
2995            path: "s.lyrics.txt".to_owned(),
2996            hash: "bh".to_owned(),
2997        });
2998        manifest.insert("b", b);
2999        let fs = MemFs::new()
3000            .with_file("a.flac", b"A".to_vec())
3001            .with_file("b.flac", b"B".to_vec())
3002            .with_file("s.lyrics.txt", b"shared\n".to_vec());
3003        let plan = Plan {
3004            actions: vec![
3005                Action::WriteArtifact {
3006                    kind: ArtifactKind::LyricsTxt,
3007                    path: "pa.lyrics.txt".to_owned(),
3008                    source_url: String::new(),
3009                    hash: "ah".to_owned(),
3010                    owner_id: "a".to_owned(),
3011                    content: Some("A words\n".to_owned()),
3012                },
3013                Action::WriteArtifact {
3014                    kind: ArtifactKind::LyricsTxt,
3015                    path: "pb.lyrics.txt".to_owned(),
3016                    source_url: String::new(),
3017                    hash: "bh".to_owned(),
3018                    owner_id: "b".to_owned(),
3019                    content: Some("B words\n".to_owned()),
3020                },
3021            ],
3022        };
3023
3024        let outcome = run(
3025            &plan,
3026            &mut manifest,
3027            &[],
3028            &ScriptedHttp::new(),
3029            &fs,
3030            &StubFfmpeg::flac(),
3031            &RecordingClock::new(),
3032            &ExecOptions::default(),
3033        );
3034
3035        assert_eq!(outcome.failed(), 0);
3036        assert_eq!(fs.read_file("pa.lyrics.txt").unwrap(), b"A words\n");
3037        assert_eq!(fs.read_file("pb.lyrics.txt").unwrap(), b"B words\n");
3038        assert!(
3039            !fs.exists("s.lyrics.txt"),
3040            "the vacated shared path must be reclaimed, not orphaned"
3041        );
3042    }
3043
3044    #[test]
3045    fn write_text_sidecar_skipped_when_owner_audio_absent() {
3046        // A text sidecar for a clip with no manifest entry (its audio download
3047        // failed) must be skipped, never writing an untracked file.
3048        let plan = Plan {
3049            actions: vec![Action::WriteArtifact {
3050                kind: ArtifactKind::DetailsTxt,
3051                path: "gone.details.txt".to_owned(),
3052                source_url: String::new(),
3053                hash: "dh".to_owned(),
3054                owner_id: "gone".to_owned(),
3055                content: Some("Title: Gone\n".to_owned()),
3056            }],
3057        };
3058        let fs = MemFs::new();
3059        let mut manifest = Manifest::new();
3060
3061        let outcome = run(
3062            &plan,
3063            &mut manifest,
3064            &[],
3065            &ScriptedHttp::new(),
3066            &fs,
3067            &StubFfmpeg::flac(),
3068            &RecordingClock::new(),
3069            &ExecOptions::default(),
3070        );
3071
3072        assert_eq!(outcome.artifacts_written, 0);
3073        assert_eq!(outcome.skipped, 1);
3074        assert!(!fs.exists("gone.details.txt"));
3075        assert!(manifest.get("gone").is_none());
3076    }
3077
3078    #[test]
3079    fn delete_artifact_removes_file_and_clears_slot() {
3080        let fs = MemFs::new().with_file("a/cover.jpg", b"jpg".to_vec());
3081        let mut manifest = Manifest::new();
3082        let mut e = entry("a.mp3", AudioFormat::Mp3);
3083        e.cover_jpg = Some(ArtifactState {
3084            path: "a/cover.jpg".to_owned(),
3085            hash: "h1".to_owned(),
3086        });
3087        manifest.insert("a", e);
3088        let plan = Plan {
3089            actions: vec![Action::DeleteArtifact {
3090                kind: ArtifactKind::CoverJpg,
3091                path: "a/cover.jpg".to_owned(),
3092                owner_id: "a".to_owned(),
3093            }],
3094        };
3095
3096        let outcome = run(
3097            &plan,
3098            &mut manifest,
3099            &[],
3100            &ScriptedHttp::new(),
3101            &fs,
3102            &StubFfmpeg::flac(),
3103            &RecordingClock::new(),
3104            &ExecOptions::default(),
3105        );
3106
3107        assert_eq!(outcome.artifacts_deleted, 1);
3108        assert!(!fs.exists("a/cover.jpg"));
3109        assert_eq!(manifest.get("a").unwrap().cover_jpg, None);
3110    }
3111
3112    #[test]
3113    fn delete_artifact_tolerates_already_absent_file() {
3114        // `remove` is idempotent, so co-deleting a sidecar that is already gone
3115        // is not a failure.
3116        let mut manifest = Manifest::new();
3117        let mut e = entry("a.mp3", AudioFormat::Mp3);
3118        e.cover_jpg = Some(ArtifactState {
3119            path: "a/cover.jpg".to_owned(),
3120            hash: "h1".to_owned(),
3121        });
3122        manifest.insert("a", e);
3123        let plan = Plan {
3124            actions: vec![Action::DeleteArtifact {
3125                kind: ArtifactKind::CoverJpg,
3126                path: "a/cover.jpg".to_owned(),
3127                owner_id: "a".to_owned(),
3128            }],
3129        };
3130
3131        let outcome = run(
3132            &plan,
3133            &mut manifest,
3134            &[],
3135            &ScriptedHttp::new(),
3136            &MemFs::new(),
3137            &StubFfmpeg::flac(),
3138            &RecordingClock::new(),
3139            &ExecOptions::default(),
3140        );
3141
3142        assert_eq!(outcome.artifacts_deleted, 1);
3143        assert_eq!(outcome.failed(), 0);
3144        assert_eq!(manifest.get("a").unwrap().cover_jpg, None);
3145    }
3146
3147    #[test]
3148    fn write_artifact_http_failure_is_a_per_clip_failure_not_a_run_abort() {
3149        // A permanent 404 on one sidecar fetch is recorded as a per-clip failure;
3150        // the run continues and the following WriteArtifact still succeeds.
3151        let mut manifest = Manifest::new();
3152        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
3153        manifest.insert("b", entry("b.mp3", AudioFormat::Mp3));
3154        let plan = Plan {
3155            actions: vec![
3156                Action::WriteArtifact {
3157                    kind: ArtifactKind::CoverJpg,
3158                    path: "a/cover.jpg".to_owned(),
3159                    source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
3160                    hash: "h1".to_owned(),
3161                    owner_id: "a".to_owned(),
3162                    content: None,
3163                },
3164                Action::WriteArtifact {
3165                    kind: ArtifactKind::CoverJpg,
3166                    path: "b/cover.jpg".to_owned(),
3167                    source_url: "https://art.suno.ai/b/large.jpg".to_owned(),
3168                    hash: "h2".to_owned(),
3169                    owner_id: "b".to_owned(),
3170                    content: None,
3171                },
3172            ],
3173        };
3174        let http = ScriptedHttp::new()
3175            .route("a/large.jpg", Reply::status(404))
3176            .route("b/large.jpg", Reply::ok(b"jpg-b".to_vec()));
3177        let fs = MemFs::new();
3178
3179        let outcome = run(
3180            &plan,
3181            &mut manifest,
3182            &[],
3183            &http,
3184            &fs,
3185            &StubFfmpeg::flac(),
3186            &RecordingClock::new(),
3187            &ExecOptions::default(),
3188        );
3189
3190        assert_eq!(outcome.status, RunStatus::Completed);
3191        assert_eq!(outcome.failed(), 1);
3192        assert_eq!(outcome.failures[0].clip_id, "a");
3193        assert_eq!(outcome.artifacts_written, 1);
3194        // The failed sidecar left no file and no manifest record.
3195        assert!(!fs.exists("a/cover.jpg"));
3196        assert_eq!(manifest.get("a").unwrap().cover_jpg, None);
3197        // The following sidecar was written and recorded.
3198        assert_eq!(fs.read_file("b/cover.jpg").unwrap(), b"jpg-b");
3199        assert!(manifest.get("b").unwrap().cover_jpg.is_some());
3200    }
3201
3202    #[test]
3203    fn co_delete_executes_audio_delete_then_artifact_delete() {
3204        // The plan orders the audio Delete before its sidecar DeleteArtifact.
3205        // The audio delete removes the manifest entry; the sidecar delete then
3206        // removes the file and tolerates the now-absent entry.
3207        let fs = MemFs::new()
3208            .with_file("gone.mp3", b"DATA".to_vec())
3209            .with_file("gone/cover.jpg", b"jpg".to_vec());
3210        let mut manifest = Manifest::new();
3211        let mut e = entry("gone.mp3", AudioFormat::Mp3);
3212        e.cover_jpg = Some(ArtifactState {
3213            path: "gone/cover.jpg".to_owned(),
3214            hash: "h1".to_owned(),
3215        });
3216        manifest.insert("gone", e);
3217        let plan = Plan {
3218            actions: vec![
3219                Action::Delete {
3220                    path: "gone.mp3".to_owned(),
3221                    clip_id: "gone".to_owned(),
3222                },
3223                Action::DeleteArtifact {
3224                    kind: ArtifactKind::CoverJpg,
3225                    path: "gone/cover.jpg".to_owned(),
3226                    owner_id: "gone".to_owned(),
3227                },
3228            ],
3229        };
3230
3231        let outcome = run(
3232            &plan,
3233            &mut manifest,
3234            &[],
3235            &ScriptedHttp::new(),
3236            &fs,
3237            &StubFfmpeg::flac(),
3238            &RecordingClock::new(),
3239            &ExecOptions::default(),
3240        );
3241
3242        assert_eq!(outcome.deleted, 1);
3243        assert_eq!(outcome.artifacts_deleted, 1);
3244        assert_eq!(outcome.failed(), 0);
3245        assert!(!fs.exists("gone.mp3"));
3246        assert!(!fs.exists("gone/cover.jpg"));
3247        assert!(manifest.get("gone").is_none());
3248    }
3249
3250    #[test]
3251    fn write_artifact_is_skipped_when_the_owner_audio_is_absent() {
3252        // A clip whose Download fails leaves no manifest entry, so its following
3253        // WriteArtifact must not strand an untracked sidecar: it is skipped with
3254        // no fetch and no write. A following healthy clip still succeeds.
3255        let ca = clip("a");
3256        let plan = Plan {
3257            actions: vec![
3258                Action::Download {
3259                    clip: ca.clone(),
3260                    lineage: LineageContext::own_root(&ca),
3261                    path: "a.mp3".to_owned(),
3262                    format: AudioFormat::Mp3,
3263                },
3264                Action::WriteArtifact {
3265                    kind: ArtifactKind::CoverJpg,
3266                    path: "a/cover.jpg".to_owned(),
3267                    source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
3268                    hash: "h1".to_owned(),
3269                    owner_id: "a".to_owned(),
3270                    content: None,
3271                },
3272                Action::WriteArtifact {
3273                    kind: ArtifactKind::CoverJpg,
3274                    path: "b/cover.jpg".to_owned(),
3275                    source_url: "https://art.suno.ai/b/large.jpg".to_owned(),
3276                    hash: "h2".to_owned(),
3277                    owner_id: "b".to_owned(),
3278                    content: None,
3279                },
3280            ],
3281        };
3282        // The Download's audio 404s (permanent), so no entry for "a" is created.
3283        let http = ScriptedHttp::new()
3284            .route("a.mp3", Reply::status(404))
3285            .route("a/large.jpg", Reply::ok(b"jpg-a".to_vec()))
3286            .route("b/large.jpg", Reply::ok(b"jpg-b".to_vec()));
3287        let fs = MemFs::new();
3288        let mut manifest = Manifest::new();
3289        // "b" already has audio (a prior-run clip), so its sidecar write proceeds.
3290        manifest.insert("b", entry("b.mp3", AudioFormat::Mp3));
3291
3292        let outcome = run(
3293            &plan,
3294            &mut manifest,
3295            &[],
3296            &http,
3297            &fs,
3298            &StubFfmpeg::flac(),
3299            &RecordingClock::new(),
3300            &ExecOptions::default(),
3301        );
3302
3303        assert_eq!(outcome.status, RunStatus::Completed);
3304        // The audio download is the only failure; the orphan artifact is skipped.
3305        assert_eq!(outcome.failed(), 1);
3306        assert_eq!(outcome.failures[0].clip_id, "a");
3307        assert_eq!(outcome.skipped, 1);
3308        // The orphan sidecar was neither fetched nor written, and left no record.
3309        assert_eq!(http.count("a/large.jpg"), 0);
3310        assert!(!fs.exists("a/cover.jpg"));
3311        assert!(manifest.get("a").is_none());
3312        // The healthy clip's sidecar still succeeded.
3313        assert_eq!(outcome.artifacts_written, 1);
3314        assert_eq!(fs.read_file("b/cover.jpg").unwrap(), b"jpg-b");
3315        assert!(manifest.get("b").unwrap().cover_jpg.is_some());
3316    }
3317
3318    #[test]
3319    fn write_artifact_transcodes_animated_cover_to_webp() {
3320        // A CoverWebp fetches the clip's MP4 preview, runs it through the ffmpeg
3321        // port, and writes the transcoded WebP (not the fetched MP4), recording
3322        // the sidecar on the owning entry.
3323        let mut manifest = Manifest::new();
3324        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
3325        let plan = Plan {
3326            actions: vec![Action::WriteArtifact {
3327                kind: ArtifactKind::CoverWebp,
3328                path: "a/cover.webp".to_owned(),
3329                source_url: "https://cdn.suno.ai/a/video.mp4".to_owned(),
3330                hash: "v1".to_owned(),
3331                owner_id: "a".to_owned(),
3332                content: None,
3333            }],
3334        };
3335        let http = ScriptedHttp::new().route("a/video.mp4", Reply::ok(b"mp4-bytes".to_vec()));
3336        let fs = MemFs::new();
3337        let ffmpeg = StubFfmpeg::webp();
3338
3339        let outcome = run(
3340            &plan,
3341            &mut manifest,
3342            &[],
3343            &http,
3344            &fs,
3345            &ffmpeg,
3346            &RecordingClock::new(),
3347            &ExecOptions::default(),
3348        );
3349
3350        assert_eq!(outcome.artifacts_written, 1);
3351        assert_eq!(outcome.failed(), 0);
3352        assert_eq!(outcome.status, RunStatus::Completed);
3353        // The fetched MP4 was transcoded: the file holds the ffmpeg WebP output.
3354        assert_eq!(http.count("a/video.mp4"), 1);
3355        let written = fs.read_file("a/cover.webp").unwrap();
3356        assert_ne!(written, b"mp4-bytes");
3357        assert!(written.starts_with(b"RIFF"));
3358        assert_eq!(
3359            manifest.get("a").unwrap().cover_webp,
3360            Some(ArtifactState {
3361                path: "a/cover.webp".to_owned(),
3362                hash: "v1".to_owned(),
3363            })
3364        );
3365    }
3366
3367    #[test]
3368    fn write_artifact_webp_transcode_failure_is_per_clip() {
3369        // A transcode failure is attributed to the owning clip: it is a per-clip
3370        // failure, the run completes, no sidecar is written, and the slot stays
3371        // empty. A healthy static cover in the same run still succeeds.
3372        let mut manifest = Manifest::new();
3373        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
3374        manifest.insert("b", entry("b.mp3", AudioFormat::Mp3));
3375        let plan = Plan {
3376            actions: vec![
3377                Action::WriteArtifact {
3378                    kind: ArtifactKind::CoverWebp,
3379                    path: "a/cover.webp".to_owned(),
3380                    source_url: "https://cdn.suno.ai/a/video.mp4".to_owned(),
3381                    hash: "v1".to_owned(),
3382                    owner_id: "a".to_owned(),
3383                    content: None,
3384                },
3385                Action::WriteArtifact {
3386                    kind: ArtifactKind::CoverJpg,
3387                    path: "b/cover.jpg".to_owned(),
3388                    source_url: "https://art.suno.ai/b/large.jpg".to_owned(),
3389                    hash: "h1".to_owned(),
3390                    owner_id: "b".to_owned(),
3391                    content: None,
3392                },
3393            ],
3394        };
3395        let http = ScriptedHttp::new()
3396            .route("a/video.mp4", Reply::ok(b"mp4-bytes".to_vec()))
3397            .route("b/large.jpg", Reply::ok(b"jpg-b".to_vec()));
3398        let fs = MemFs::new();
3399
3400        let outcome = run(
3401            &plan,
3402            &mut manifest,
3403            &[],
3404            &http,
3405            &fs,
3406            &StubFfmpeg::failing(),
3407            &RecordingClock::new(),
3408            &ExecOptions::default(),
3409        );
3410
3411        assert_eq!(outcome.status, RunStatus::Completed);
3412        assert_eq!(outcome.failed(), 1);
3413        assert_eq!(outcome.failures[0].clip_id, "a");
3414        // The animated cover failed to transcode: nothing written, slot empty.
3415        assert!(!fs.exists("a/cover.webp"));
3416        assert_eq!(manifest.get("a").unwrap().cover_webp, None);
3417        // The static cover in the same run still succeeded.
3418        assert_eq!(outcome.artifacts_written, 1);
3419        assert_eq!(fs.read_file("b/cover.jpg").unwrap(), b"jpg-b");
3420        assert!(manifest.get("b").unwrap().cover_jpg.is_some());
3421    }
3422
3423    // ── Phase 8: folder art routes to the album store ───────────────
3424
3425    #[test]
3426    fn folder_jpg_write_records_album_state_and_skips_manifest() {
3427        // Folder art is owned by the album root id, not a manifest clip: it
3428        // writes even with an empty manifest and records on the album store.
3429        let mut manifest = Manifest::new();
3430        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
3431        let plan = Plan {
3432            actions: vec![Action::WriteArtifact {
3433                kind: ArtifactKind::FolderJpg,
3434                path: "creator/album/folder.jpg".to_owned(),
3435                source_url: "https://art.suno.ai/root/large.jpg".to_owned(),
3436                hash: "jh".to_owned(),
3437                owner_id: "root".to_owned(),
3438                content: None,
3439            }],
3440        };
3441        let http = ScriptedHttp::new().route("root/large.jpg", Reply::ok(b"folder-jpg".to_vec()));
3442        let fs = MemFs::new();
3443
3444        let outcome = run_with_albums(
3445            &plan,
3446            &mut manifest,
3447            &mut albums,
3448            &[],
3449            &http,
3450            &fs,
3451            &StubFfmpeg::flac(),
3452            &RecordingClock::new(),
3453            &ExecOptions::default(),
3454        );
3455
3456        assert_eq!(outcome.artifacts_written, 1);
3457        assert_eq!(outcome.status, RunStatus::Completed);
3458        assert_eq!(
3459            fs.read_file("creator/album/folder.jpg").unwrap(),
3460            b"folder-jpg"
3461        );
3462        assert_eq!(
3463            albums.get("root").unwrap().folder_jpg,
3464            Some(ArtifactState {
3465                path: "creator/album/folder.jpg".to_owned(),
3466                hash: "jh".to_owned(),
3467            })
3468        );
3469        assert!(manifest.get("root").is_none());
3470    }
3471
3472    #[test]
3473    fn folder_webp_write_transcodes_and_records_album_state() {
3474        let mut manifest = Manifest::new();
3475        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
3476        let plan = Plan {
3477            actions: vec![Action::WriteArtifact {
3478                kind: ArtifactKind::FolderWebp,
3479                path: "creator/album/cover.webp".to_owned(),
3480                source_url: "https://cdn.suno.ai/root/video.mp4".to_owned(),
3481                hash: "wh".to_owned(),
3482                owner_id: "root".to_owned(),
3483                content: None,
3484            }],
3485        };
3486        let http = ScriptedHttp::new().route("root/video.mp4", Reply::ok(b"mp4-bytes".to_vec()));
3487        let fs = MemFs::new();
3488
3489        let outcome = run_with_albums(
3490            &plan,
3491            &mut manifest,
3492            &mut albums,
3493            &[],
3494            &http,
3495            &fs,
3496            &StubFfmpeg::webp(),
3497            &RecordingClock::new(),
3498            &ExecOptions::default(),
3499        );
3500
3501        assert_eq!(outcome.artifacts_written, 1);
3502        assert_eq!(outcome.failed(), 0);
3503        // The MP4 was transcoded to WebP, not written verbatim.
3504        let written = fs.read_file("creator/album/cover.webp").unwrap();
3505        assert_ne!(written, b"mp4-bytes");
3506        assert!(written.starts_with(b"RIFF"));
3507        assert_eq!(
3508            albums.get("root").unwrap().folder_webp,
3509            Some(ArtifactState {
3510                path: "creator/album/cover.webp".to_owned(),
3511                hash: "wh".to_owned(),
3512            })
3513        );
3514    }
3515
3516    #[test]
3517    fn folder_art_delete_clears_album_state() {
3518        let fs = MemFs::new().with_file("creator/album/folder.jpg", b"jpg".to_vec());
3519        let mut manifest = Manifest::new();
3520        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
3521        albums.insert(
3522            "root".to_owned(),
3523            AlbumArt {
3524                folder_jpg: Some(ArtifactState {
3525                    path: "creator/album/folder.jpg".to_owned(),
3526                    hash: "jh".to_owned(),
3527                }),
3528                folder_webp: None,
3529            },
3530        );
3531        let plan = Plan {
3532            actions: vec![Action::DeleteArtifact {
3533                kind: ArtifactKind::FolderJpg,
3534                path: "creator/album/folder.jpg".to_owned(),
3535                owner_id: "root".to_owned(),
3536            }],
3537        };
3538
3539        let outcome = run_with_albums(
3540            &plan,
3541            &mut manifest,
3542            &mut albums,
3543            &[],
3544            &ScriptedHttp::new(),
3545            &fs,
3546            &StubFfmpeg::flac(),
3547            &RecordingClock::new(),
3548            &ExecOptions::default(),
3549        );
3550
3551        assert_eq!(outcome.artifacts_deleted, 1);
3552        assert!(!fs.exists("creator/album/folder.jpg"));
3553        // The album row had only the one kind, so it is pruned entirely.
3554        assert!(!albums.contains_key("root"));
3555    }
3556
3557    // ── Phase 9: playlist artifacts ─────────────────────────────────
3558
3559    #[test]
3560    fn playlist_write_uses_inline_content_and_records_state() {
3561        // A playlist body is generated, carried inline. With an empty manifest
3562        // and NO http routes, the write still succeeds — proving it skipped the
3563        // network — and records the playlist store keyed by the playlist id.
3564        let mut manifest = Manifest::new();
3565        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
3566        let mut playlists: BTreeMap<String, PlaylistState> = BTreeMap::new();
3567        let body = "#EXTM3U\n#PLAYLIST:Road Trip\n#EXTINF:60,One\nA/One.flac\n";
3568        let plan = Plan {
3569            actions: vec![Action::WriteArtifact {
3570                kind: ArtifactKind::Playlist,
3571                path: "Road Trip.m3u8".to_owned(),
3572                source_url: String::new(),
3573                hash: "ph1".to_owned(),
3574                owner_id: "pl1".to_owned(),
3575                content: Some(body.to_owned()),
3576            }],
3577        };
3578        let fs = MemFs::new();
3579
3580        let outcome = run_full(
3581            &plan,
3582            &mut manifest,
3583            &mut albums,
3584            &mut playlists,
3585            &[],
3586            &ScriptedHttp::new(),
3587            &fs,
3588            &StubFfmpeg::flac(),
3589            &RecordingClock::new(),
3590            &ExecOptions::default(),
3591        );
3592
3593        assert_eq!(outcome.artifacts_written, 1);
3594        assert_eq!(outcome.failed(), 0);
3595        // The exact inline bytes were written, verbatim.
3596        assert_eq!(fs.read_file("Road Trip.m3u8").unwrap(), body.as_bytes());
3597        assert_eq!(
3598            playlists.get("pl1"),
3599            Some(&PlaylistState {
3600                name: "Road Trip".to_owned(),
3601                path: "Road Trip.m3u8".to_owned(),
3602                hash: "ph1".to_owned(),
3603            })
3604        );
3605    }
3606
3607    #[test]
3608    fn playlist_delete_removes_file_and_clears_state() {
3609        let fs = MemFs::new().with_file("Old.m3u8", b"#EXTM3U\n".to_vec());
3610        let mut manifest = Manifest::new();
3611        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
3612        let mut playlists: BTreeMap<String, PlaylistState> = BTreeMap::new();
3613        playlists.insert(
3614            "pl1".to_owned(),
3615            PlaylistState {
3616                name: "Old".to_owned(),
3617                path: "Old.m3u8".to_owned(),
3618                hash: "ph1".to_owned(),
3619            },
3620        );
3621        let plan = Plan {
3622            actions: vec![Action::DeleteArtifact {
3623                kind: ArtifactKind::Playlist,
3624                path: "Old.m3u8".to_owned(),
3625                owner_id: "pl1".to_owned(),
3626            }],
3627        };
3628
3629        let outcome = run_full(
3630            &plan,
3631            &mut manifest,
3632            &mut albums,
3633            &mut playlists,
3634            &[],
3635            &ScriptedHttp::new(),
3636            &fs,
3637            &StubFfmpeg::flac(),
3638            &RecordingClock::new(),
3639            &ExecOptions::default(),
3640        );
3641
3642        assert_eq!(outcome.artifacts_deleted, 1);
3643        assert!(!fs.exists("Old.m3u8"));
3644        assert!(
3645            !playlists.contains_key("pl1"),
3646            "the playlist row is cleared on delete"
3647        );
3648    }
3649
3650    // ── Phase 10: old-sidecar cleanup on move + empty-dir prune ──────
3651
3652    #[test]
3653    fn rename_move_relocates_cover_and_prunes_old_album() {
3654        // A title/album change moves the audio (Rename) and re-emits the cover
3655        // at the NEW path. The old cover must be removed and the now-empty old
3656        // album directory pruned, leaving no orphan sidecar and no ghost dir.
3657        let mut manifest = Manifest::new();
3658        let mut e = entry("Creator/AlbumA/song.flac", AudioFormat::Flac);
3659        e.cover_jpg = Some(ArtifactState {
3660            path: "Creator/AlbumA/cover.jpg".to_owned(),
3661            hash: "h1".to_owned(),
3662        });
3663        manifest.insert("a", e);
3664        let fs = MemFs::new()
3665            .with_file("Creator/AlbumA/song.flac", b"AUDIO".to_vec())
3666            .with_file("Creator/AlbumA/cover.jpg", b"old-jpg".to_vec());
3667        let plan = Plan {
3668            actions: vec![
3669                Action::Rename {
3670                    from: "Creator/AlbumA/song.flac".to_owned(),
3671                    to: "Creator/AlbumB/song.flac".to_owned(),
3672                },
3673                Action::WriteArtifact {
3674                    kind: ArtifactKind::CoverJpg,
3675                    path: "Creator/AlbumB/cover.jpg".to_owned(),
3676                    source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
3677                    hash: "h1".to_owned(),
3678                    owner_id: "a".to_owned(),
3679                    content: None,
3680                },
3681            ],
3682        };
3683        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"new-jpg".to_vec()));
3684
3685        let outcome = run(
3686            &plan,
3687            &mut manifest,
3688            &[],
3689            &http,
3690            &fs,
3691            &StubFfmpeg::flac(),
3692            &RecordingClock::new(),
3693            &ExecOptions::default(),
3694        );
3695
3696        assert_eq!(outcome.failed(), 0);
3697        // Audio moved, the new cover was written, the old cover removed.
3698        assert!(fs.exists("Creator/AlbumB/song.flac"));
3699        assert_eq!(
3700            fs.read_file("Creator/AlbumB/cover.jpg").unwrap(),
3701            b"new-jpg"
3702        );
3703        assert!(!fs.exists("Creator/AlbumA/cover.jpg"));
3704        assert!(!fs.exists("Creator/AlbumA/song.flac"));
3705        // The manifest cover slot now points at the new path.
3706        assert_eq!(
3707            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
3708            "Creator/AlbumB/cover.jpg"
3709        );
3710        // The emptied old album directory is pruned; the new one survives.
3711        assert!(!fs.has_dir("Creator/AlbumA"));
3712        assert!(fs.has_dir("Creator/AlbumB"));
3713    }
3714
3715    #[test]
3716    fn rename_move_relocates_folder_art_and_prunes_old_album() {
3717        // An album rename moves folder.jpg: the old file is removed, the album
3718        // store slot advanced to the new path, and the emptied dir pruned.
3719        let mut manifest = Manifest::new();
3720        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
3721        albums.insert(
3722            "root".to_owned(),
3723            AlbumArt {
3724                folder_jpg: Some(ArtifactState {
3725                    path: "Creator/AlbumA/folder.jpg".to_owned(),
3726                    hash: "jh".to_owned(),
3727                }),
3728                folder_webp: None,
3729            },
3730        );
3731        let fs = MemFs::new().with_file("Creator/AlbumA/folder.jpg", b"old-folder".to_vec());
3732        let plan = Plan {
3733            actions: vec![Action::WriteArtifact {
3734                kind: ArtifactKind::FolderJpg,
3735                path: "Creator/AlbumB/folder.jpg".to_owned(),
3736                source_url: "https://art.suno.ai/root/large.jpg".to_owned(),
3737                hash: "jh".to_owned(),
3738                owner_id: "root".to_owned(),
3739                content: None,
3740            }],
3741        };
3742        let http = ScriptedHttp::new().route("root/large.jpg", Reply::ok(b"new-folder".to_vec()));
3743
3744        let outcome = run_with_albums(
3745            &plan,
3746            &mut manifest,
3747            &mut albums,
3748            &[],
3749            &http,
3750            &fs,
3751            &StubFfmpeg::flac(),
3752            &RecordingClock::new(),
3753            &ExecOptions::default(),
3754        );
3755
3756        assert_eq!(outcome.failed(), 0);
3757        assert_eq!(
3758            fs.read_file("Creator/AlbumB/folder.jpg").unwrap(),
3759            b"new-folder"
3760        );
3761        assert!(!fs.exists("Creator/AlbumA/folder.jpg"));
3762        assert_eq!(
3763            albums
3764                .get("root")
3765                .unwrap()
3766                .folder_jpg
3767                .as_ref()
3768                .unwrap()
3769                .path,
3770            "Creator/AlbumB/folder.jpg"
3771        );
3772        assert!(!fs.has_dir("Creator/AlbumA"));
3773        assert!(fs.has_dir("Creator/AlbumB"));
3774    }
3775
3776    #[test]
3777    fn prune_empty_dirs_removes_only_empty_dirs() {
3778        // A direct exercise of the prune port's safety guarantees on a mixed
3779        // tree: nested empties go, anything holding a file (hidden ones too)
3780        // stays, and no file is touched.
3781        let fs = MemFs::new()
3782            .with_file("keep/full/song.flac", b"x".to_vec())
3783            .with_file("hidden/.suno-manifest.json", b"{}".to_vec())
3784            .with_dir("empty/leaf")
3785            .with_dir("nested/a/b/c");
3786
3787        fs.prune_empty_dirs("").unwrap();
3788
3789        // Every empty directory, however deeply nested, is pruned bottom-up.
3790        for gone in [
3791            "empty",
3792            "empty/leaf",
3793            "nested",
3794            "nested/a",
3795            "nested/a/b",
3796            "nested/a/b/c",
3797        ] {
3798            assert!(!fs.has_dir(gone), "empty dir {gone} should be pruned");
3799        }
3800        // A directory holding any file — including only a hidden dotfile — stays.
3801        assert!(fs.has_dir("keep"));
3802        assert!(fs.has_dir("keep/full"));
3803        assert!(fs.has_dir("hidden"));
3804        // No file was touched.
3805        assert!(fs.exists("keep/full/song.flac"));
3806        assert!(fs.exists("hidden/.suno-manifest.json"));
3807    }
3808
3809    #[test]
3810    fn prune_empty_dirs_never_removes_the_named_root() {
3811        // Pruning under a named root clears its empty children but keeps the
3812        // root itself, even when the root is now empty.
3813        let fs = MemFs::new().with_dir("empty/leaf");
3814        fs.prune_empty_dirs("empty").unwrap();
3815        assert!(fs.has_dir("empty"), "the named root is never removed");
3816        assert!(!fs.has_dir("empty/leaf"));
3817    }
3818
3819    #[test]
3820    fn old_sidecar_remove_failure_is_per_clip_and_converges_next_run() {
3821        // If removing the old sidecar fails, the write is a per-clip failure
3822        // that never aborts the run and does NOT advance the state slot, so the
3823        // next identical run re-attempts the cleanup and the tree converges.
3824        let mut manifest = Manifest::new();
3825        let mut e = entry("a.flac", AudioFormat::Flac);
3826        e.cover_jpg = Some(ArtifactState {
3827            path: "AlbumA/cover.jpg".to_owned(),
3828            hash: "h1".to_owned(),
3829        });
3830        manifest.insert("a", e);
3831        let fs = MemFs::new()
3832            .with_file("a.flac", b"AUDIO".to_vec())
3833            .with_file("AlbumA/cover.jpg", b"old".to_vec());
3834        let plan = Plan {
3835            actions: vec![Action::WriteArtifact {
3836                kind: ArtifactKind::CoverJpg,
3837                path: "AlbumB/cover.jpg".to_owned(),
3838                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
3839                hash: "h1".to_owned(),
3840                owner_id: "a".to_owned(),
3841                content: None,
3842            }],
3843        };
3844        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"new".to_vec()));
3845
3846        // Run 1: the old-cover remove is forced to fail.
3847        fs.arm_fail_remove("AlbumA/cover.jpg");
3848        let first = run(
3849            &plan,
3850            &mut manifest,
3851            &[],
3852            &http,
3853            &fs,
3854            &StubFfmpeg::flac(),
3855            &RecordingClock::new(),
3856            &ExecOptions::default(),
3857        );
3858        assert_eq!(
3859            first.status,
3860            RunStatus::Completed,
3861            "a remove failure never aborts the run"
3862        );
3863        assert_eq!(first.failed(), 1);
3864        // The new cover is written but the old one lingers and the slot is stale.
3865        assert!(fs.exists("AlbumB/cover.jpg"));
3866        assert!(fs.exists("AlbumA/cover.jpg"));
3867        assert_eq!(
3868            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
3869            "AlbumA/cover.jpg"
3870        );
3871        assert!(fs.has_dir("AlbumA"), "the orphan keeps its directory alive");
3872
3873        // Run 2: the same plan re-runs with the fault cleared and converges.
3874        fs.disarm_fail_remove("AlbumA/cover.jpg");
3875        let second = run(
3876            &plan,
3877            &mut manifest,
3878            &[],
3879            &http,
3880            &fs,
3881            &StubFfmpeg::flac(),
3882            &RecordingClock::new(),
3883            &ExecOptions::default(),
3884        );
3885        assert_eq!(second.failed(), 0);
3886        assert!(fs.exists("AlbumB/cover.jpg"));
3887        assert!(!fs.exists("AlbumA/cover.jpg"), "no orphan persists");
3888        assert_eq!(
3889            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
3890            "AlbumB/cover.jpg"
3891        );
3892        assert!(!fs.has_dir("AlbumA"), "the emptied directory is pruned");
3893    }
3894
3895    #[test]
3896    fn same_path_artifact_rewrite_does_no_remove_and_prunes_nothing() {
3897        // The idempotent case: a content-only cover rewrite (hash drift, path
3898        // unchanged) attempts no remove and prunes no live directory. A remove
3899        // failure is armed on the cover path, so any spurious remove would
3900        // surface as a failure — none does.
3901        let mut manifest = Manifest::new();
3902        let mut e = entry("Album/a.mp3", AudioFormat::Mp3);
3903        e.cover_jpg = Some(ArtifactState {
3904            path: "Album/cover.jpg".to_owned(),
3905            hash: "h1".to_owned(),
3906        });
3907        manifest.insert("a", e);
3908        let fs = MemFs::new()
3909            .with_file("Album/a.mp3", b"AUDIO".to_vec())
3910            .with_file("Album/cover.jpg", b"old".to_vec());
3911        fs.arm_fail_remove("Album/cover.jpg");
3912        let plan = Plan {
3913            actions: vec![Action::WriteArtifact {
3914                kind: ArtifactKind::CoverJpg,
3915                path: "Album/cover.jpg".to_owned(),
3916                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
3917                hash: "h2".to_owned(),
3918                owner_id: "a".to_owned(),
3919                content: None,
3920            }],
3921        };
3922        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"new".to_vec()));
3923
3924        let outcome = run(
3925            &plan,
3926            &mut manifest,
3927            &[],
3928            &http,
3929            &fs,
3930            &StubFfmpeg::flac(),
3931            &RecordingClock::new(),
3932            &ExecOptions::default(),
3933        );
3934
3935        assert_eq!(
3936            outcome.failed(),
3937            0,
3938            "no remove is attempted, so the armed failure never fires"
3939        );
3940        assert_eq!(outcome.artifacts_written, 1);
3941        assert_eq!(fs.read_file("Album/cover.jpg").unwrap(), b"new");
3942        assert_eq!(
3943            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().hash,
3944            "h2"
3945        );
3946        // The live directory is untouched by prune.
3947        assert!(fs.has_dir("Album"));
3948    }
3949
3950    // ── Concurrency (issue #22) ─────────────────────────────────────
3951
3952    mod concurrency {
3953        use super::*;
3954        use crate::ffmpeg::FfmpegError;
3955        use crate::fs::{FileStat, FsError};
3956        use crate::http::{HttpRequest, TransportError};
3957        use std::future::Future;
3958        use std::pin::Pin;
3959        use std::sync::Arc;
3960        use std::sync::atomic::{AtomicUsize, Ordering};
3961        use std::task::{Context, Poll};
3962
3963        /// A future that pends exactly once before resolving, waking itself so a
3964        /// single-threaded executor re-polls. It forces the [`Http`] port to
3965        /// yield, so [`buffer_unordered`](futures_util::stream::StreamExt) parks
3966        /// each in-flight request and the true overlap becomes observable.
3967        #[derive(Default)]
3968        struct YieldOnce {
3969            yielded: bool,
3970        }
3971
3972        impl Future for YieldOnce {
3973            type Output = ();
3974            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
3975                if self.yielded {
3976                    Poll::Ready(())
3977                } else {
3978                    self.yielded = true;
3979                    cx.waker().wake_by_ref();
3980                    Poll::Pending
3981                }
3982            }
3983        }
3984
3985        /// An [`Http`] double that wraps [`ScriptedHttp`] and records the peak
3986        /// number of concurrently in-flight requests. Each `send` bumps a live
3987        /// counter, yields once (so peers can start), then delegates.
3988        struct GatedHttp {
3989            inner: ScriptedHttp,
3990            inflight: Arc<AtomicUsize>,
3991            peak: Arc<AtomicUsize>,
3992        }
3993
3994        impl GatedHttp {
3995            fn new(inner: ScriptedHttp) -> Self {
3996                Self {
3997                    inner,
3998                    inflight: Arc::new(AtomicUsize::new(0)),
3999                    peak: Arc::new(AtomicUsize::new(0)),
4000                }
4001            }
4002
4003            fn peak(&self) -> usize {
4004                self.peak.load(Ordering::SeqCst)
4005            }
4006        }
4007
4008        impl Http for GatedHttp {
4009            async fn send(&self, request: HttpRequest) -> Result<HttpResponse, TransportError> {
4010                let now = self.inflight.fetch_add(1, Ordering::SeqCst) + 1;
4011                self.peak.fetch_max(now, Ordering::SeqCst);
4012                YieldOnce::default().await;
4013                let out = self.inner.send(request).await;
4014                self.inflight.fetch_sub(1, Ordering::SeqCst);
4015                out
4016            }
4017        }
4018
4019        fn download(id: &str, format: AudioFormat) -> (Clip, Desired, Action) {
4020            let c = clip(id);
4021            let d = desired(c.clone(), format);
4022            let action = Action::Download {
4023                clip: c.clone(),
4024                lineage: LineageContext::own_root(&c),
4025                path: d.path.clone(),
4026                format,
4027            };
4028            (c, d, action)
4029        }
4030
4031        fn opts_with(concurrency: u32) -> ExecOptions {
4032            ExecOptions {
4033                concurrency,
4034                ..small_poll()
4035            }
4036        }
4037
4038        #[test]
4039        fn concurrency_never_exceeds_the_configured_bound() {
4040            let count = 6;
4041            let concurrency = 3;
4042            let mut scripted = ScriptedHttp::new().with_auth();
4043            let mut actions = Vec::new();
4044            let mut desireds = Vec::new();
4045            for i in 0..count {
4046                let id = format!("c{i}");
4047                scripted = scripted.route(&format!("{id}.mp3"), Reply::ok(b"mp3-body".to_vec()));
4048                let (_c, d, action) = download(&id, AudioFormat::Mp3);
4049                actions.push(action);
4050                desireds.push(d);
4051            }
4052            let http = GatedHttp::new(scripted);
4053            let fs = MemFs::new();
4054            let plan = Plan { actions };
4055            let mut manifest = Manifest::new();
4056
4057            let outcome = run_gated_fs(
4058                &plan,
4059                &mut manifest,
4060                &desireds,
4061                &http,
4062                &fs,
4063                &opts_with(concurrency),
4064            );
4065
4066            assert_eq!(outcome.downloaded, count);
4067            assert!(
4068                http.peak() <= concurrency as usize,
4069                "peak {} exceeded the bound {concurrency}",
4070                http.peak()
4071            );
4072            assert_eq!(
4073                http.peak(),
4074                concurrency as usize,
4075                "expected the run to saturate the bound"
4076            );
4077        }
4078
4079        /// Run a gated plan against a caller-supplied [`MemFs`], returning the
4080        /// outcome. The client is built here so the limiter can be inspected by
4081        /// the caller-facing variant below.
4082        fn run_gated_fs(
4083            plan: &Plan,
4084            manifest: &mut Manifest,
4085            desired: &[Desired],
4086            http: &GatedHttp,
4087            fs: &MemFs,
4088            opts: &ExecOptions,
4089        ) -> ExecOutcome {
4090            let ffmpeg = StubFfmpeg::flac();
4091            let clock = RecordingClock::new();
4092            let mut albums = BTreeMap::new();
4093            let mut playlists = BTreeMap::new();
4094            let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
4095            pollster::block_on(execute(
4096                plan,
4097                manifest,
4098                &mut albums,
4099                &mut playlists,
4100                desired,
4101                Ports {
4102                    client: &mut client,
4103                    http,
4104                    fs,
4105                    ffmpeg: &ffmpeg,
4106                    clock: &clock,
4107                },
4108                opts,
4109            ))
4110        }
4111
4112        #[test]
4113        fn a_failing_clip_does_not_abort_the_others() {
4114            let mut scripted = ScriptedHttp::new().with_auth();
4115            scripted = scripted
4116                .route("ok1.mp3", Reply::ok(b"one".to_vec()))
4117                .route("bad.mp3", Reply::status(404))
4118                .route("ok2.mp3", Reply::ok(b"two".to_vec()));
4119            let (_a, d1, a1) = download("ok1", AudioFormat::Mp3);
4120            let (_b, d2, a2) = download("bad", AudioFormat::Mp3);
4121            let (_c, d3, a3) = download("ok2", AudioFormat::Mp3);
4122            let http = GatedHttp::new(scripted);
4123            let fs = MemFs::new();
4124            let plan = Plan {
4125                actions: vec![a1, a2, a3],
4126            };
4127            let mut manifest = Manifest::new();
4128
4129            let outcome = run_gated_fs(
4130                &plan,
4131                &mut manifest,
4132                &[d1, d2, d3],
4133                &http,
4134                &fs,
4135                &opts_with(3),
4136            );
4137
4138            assert_eq!(outcome.downloaded, 2);
4139            assert_eq!(outcome.failed(), 1);
4140            assert_eq!(outcome.status, RunStatus::Completed);
4141            assert_eq!(outcome.failures[0].clip_id, "bad");
4142            assert!(manifest.get("ok1").is_some());
4143            assert!(manifest.get("ok2").is_some());
4144            assert!(manifest.get("bad").is_none());
4145        }
4146
4147        #[test]
4148        fn outcome_is_identical_across_concurrency_levels() {
4149            // A plan mixing successful and failing downloads with serial phase-2
4150            // actions (a skip and a delete), so both phases contribute.
4151            fn build() -> (Plan, Vec<Desired>) {
4152                let mut actions = Vec::new();
4153                let mut desireds = Vec::new();
4154                for id in ["a", "b", "c", "d"] {
4155                    let (_c, d, action) = download(id, AudioFormat::Mp3);
4156                    actions.push(action);
4157                    desireds.push(d);
4158                }
4159                // A failing download in the middle of the audio set.
4160                let (_e, de, ae) = download("fail", AudioFormat::Mp3);
4161                actions.insert(2, ae);
4162                desireds.push(de);
4163                // Phase-2 actions.
4164                actions.push(Action::Skip {
4165                    clip_id: "gone".to_owned(),
4166                });
4167                actions.push(Action::Delete {
4168                    path: "old.mp3".to_owned(),
4169                    clip_id: "old".to_owned(),
4170                });
4171                (Plan { actions }, desireds)
4172            }
4173
4174            fn http() -> ScriptedHttp {
4175                ScriptedHttp::new()
4176                    .with_auth()
4177                    .route("a.mp3", Reply::ok(b"a".to_vec()))
4178                    .route("b.mp3", Reply::ok(b"b".to_vec()))
4179                    .route("c.mp3", Reply::ok(b"c".to_vec()))
4180                    .route("d.mp3", Reply::ok(b"d".to_vec()))
4181                    .route("fail.mp3", Reply::status(404))
4182            }
4183
4184            fn seed_manifest() -> Manifest {
4185                let mut m = Manifest::new();
4186                m.insert("old".to_owned(), entry("old.mp3", AudioFormat::Mp3));
4187                m
4188            }
4189
4190            let (plan, desireds) = build();
4191
4192            let mut m1 = seed_manifest();
4193            let fs1 = MemFs::new().with_file("old.mp3", b"x".to_vec());
4194            let out1 = run_gated_fs(
4195                &plan,
4196                &mut m1,
4197                &desireds,
4198                &GatedHttp::new(http()),
4199                &fs1,
4200                &opts_with(1),
4201            );
4202
4203            let mut m8 = seed_manifest();
4204            let fs8 = MemFs::new().with_file("old.mp3", b"x".to_vec());
4205            let out8 = run_gated_fs(
4206                &plan,
4207                &mut m8,
4208                &desireds,
4209                &GatedHttp::new(http()),
4210                &fs8,
4211                &opts_with(8),
4212            );
4213
4214            assert_eq!(out1, out8, "outcome must not depend on concurrency");
4215            assert_eq!(m1, m8, "final manifest must not depend on concurrency");
4216            assert_eq!(out8.downloaded, 4);
4217            assert_eq!(out8.deleted, 1);
4218            assert_eq!(out8.skipped, 1);
4219            assert_eq!(out8.failed(), 1);
4220        }
4221
4222        #[test]
4223        fn a_systemic_disk_full_aborts_promptly() {
4224            let count = 8;
4225            let concurrency = 2;
4226            let mut scripted = ScriptedHttp::new().with_auth();
4227            let mut actions = Vec::new();
4228            let mut desireds = Vec::new();
4229            for i in 0..count {
4230                let id = format!("d{i}");
4231                scripted = scripted.route(&format!("{id}.mp3"), Reply::ok(b"mp3-body".to_vec()));
4232                let (_c, d, action) = download(&id, AudioFormat::Mp3);
4233                actions.push(action);
4234                desireds.push(d);
4235            }
4236            // The very first clip's write hits ENOSPC, a systemic failure.
4237            let fs = MemFs::new().fail_write_out_of_space("d0.mp3");
4238            let http = GatedHttp::new(scripted);
4239            let plan = Plan { actions };
4240            let mut manifest = Manifest::new();
4241
4242            let outcome = run_gated_fs(
4243                &plan,
4244                &mut manifest,
4245                &desireds,
4246                &http,
4247                &fs,
4248                &opts_with(concurrency),
4249            );
4250
4251            assert_eq!(outcome.status, RunStatus::DiskFull);
4252            assert!(
4253                outcome.downloaded < count,
4254                "a systemic abort must stop remaining work, downloaded {}",
4255                outcome.downloaded
4256            );
4257        }
4258
4259        #[test]
4260        fn limiter_records_a_rate_limit_under_concurrent_calls() {
4261            // Three concurrent FLAC renders; exactly one clip is throttled once
4262            // on its wav_file read. The shared limiter must record that single
4263            // 429 (halving 2.0 -> 1.0) with no lost or duplicated update, proving
4264            // the mutex keeps the AIMD state correct under concurrency.
4265            let scripted = ScriptedHttp::new()
4266                .with_auth()
4267                .route_seq(
4268                    "/gen/x/wav_file/",
4269                    vec![
4270                        Reply::status(429),
4271                        Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/x.wav"}"#),
4272                    ],
4273                )
4274                .route(
4275                    "/gen/y/wav_file/",
4276                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/y.wav"}"#),
4277                )
4278                .route(
4279                    "/gen/z/wav_file/",
4280                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/z.wav"}"#),
4281                )
4282                .route("x.wav", Reply::ok(b"wav-x".to_vec()))
4283                .route("y.wav", Reply::ok(b"wav-y".to_vec()))
4284                .route("z.wav", Reply::ok(b"wav-z".to_vec()));
4285
4286            let mut actions = Vec::new();
4287            let mut desireds = Vec::new();
4288            for id in ["x", "y", "z"] {
4289                let (_c, d, action) = download(id, AudioFormat::Flac);
4290                actions.push(action);
4291                desireds.push(d);
4292            }
4293            let plan = Plan { actions };
4294            let fs = MemFs::new();
4295            let ffmpeg = StubFfmpeg::flac();
4296            let clock = RecordingClock::new();
4297            let mut albums = BTreeMap::new();
4298            let mut playlists = BTreeMap::new();
4299            let mut manifest = Manifest::new();
4300            let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
4301
4302            let outcome = pollster::block_on(execute(
4303                &plan,
4304                &mut manifest,
4305                &mut albums,
4306                &mut playlists,
4307                &desireds,
4308                Ports {
4309                    client: &mut client,
4310                    http: &scripted,
4311                    fs: &fs,
4312                    ffmpeg: &ffmpeg,
4313                    clock: &clock,
4314                },
4315                &opts_with(3),
4316            ));
4317
4318            assert_eq!(outcome.downloaded, 3);
4319            assert_eq!(outcome.failed(), 0);
4320            assert!(
4321                (client.limiter_rate() - 1.0).abs() < 1e-9,
4322                "one 429 must halve the rate to 1.0, got {}",
4323                client.limiter_rate()
4324            );
4325        }
4326
4327        #[test]
4328        fn a_download_is_committed_in_plan_order_around_a_rename() {
4329            // Plan order: rename "orig" away from shared.mp3 first, then download
4330            // a new clip into shared.mp3. A parallel executor that performed the
4331            // download's destination write off plan order would write shared.mp3
4332            // before the rename ran, letting the rename carry those fresh bytes
4333            // to moved.mp3 and stranding shared.mp3 - corrupting both clips.
4334            // Committing every destination effect serially in plan order keeps
4335            // moved.mp3 = the original and shared.mp3 = the new download.
4336            let c_new = clip("new");
4337            let mut d_new = desired(c_new.clone(), AudioFormat::Mp3);
4338            d_new.path = "shared.mp3".to_owned();
4339            let plan = Plan {
4340                actions: vec![
4341                    Action::Rename {
4342                        from: "shared.mp3".to_owned(),
4343                        to: "moved.mp3".to_owned(),
4344                    },
4345                    Action::Download {
4346                        clip: c_new.clone(),
4347                        lineage: LineageContext::own_root(&c_new),
4348                        path: "shared.mp3".to_owned(),
4349                        format: AudioFormat::Mp3,
4350                    },
4351                ],
4352            };
4353            let scripted = ScriptedHttp::new()
4354                .with_auth()
4355                .route("new.mp3", Reply::ok(b"NEW-BODY".to_vec()));
4356            let http = GatedHttp::new(scripted);
4357            let fs = MemFs::new().with_file("shared.mp3", b"ORIGINAL".to_vec());
4358            let mut manifest = Manifest::new();
4359            manifest.insert("orig", entry("shared.mp3", AudioFormat::Mp3));
4360
4361            let outcome = run_gated_fs(&plan, &mut manifest, &[d_new], &http, &fs, &opts_with(4));
4362
4363            assert_eq!(outcome.renamed, 1);
4364            assert_eq!(outcome.downloaded, 1);
4365            assert_eq!(
4366                fs.read_file("moved.mp3").as_deref(),
4367                Some(&b"ORIGINAL"[..]),
4368                "the rename must carry the original bytes, untouched by the download"
4369            );
4370            let landed = fs.read_file("shared.mp3").expect("new download must land");
4371            assert_ne!(
4372                landed, b"ORIGINAL",
4373                "the new download must replace the moved original, not corrupt it"
4374            );
4375            assert_eq!(manifest.get("orig").unwrap().path, "moved.mp3");
4376            assert_eq!(manifest.get("new").unwrap().path, "shared.mp3");
4377        }
4378
4379        #[test]
4380        fn an_aborted_reformat_leaves_the_old_file_and_manifest_consistent() {
4381            // A systemic disk-full abort strikes the download committed before the
4382            // reformat. Because the reformat's slow render is side-effect-free and
4383            // its destination write + old-file removal only happen in the serial
4384            // commit (which the abort skips), the old file survives and the
4385            // manifest still points at it: no removed-but-referenced file.
4386            let boom = clip("boom");
4387            let mut d_boom = desired(boom.clone(), AudioFormat::Mp3);
4388            d_boom.path = "boom.mp3".to_owned();
4389            let reformer = clip("r");
4390            let d_reformer = desired(reformer.clone(), AudioFormat::Mp3);
4391            let plan = Plan {
4392                actions: vec![
4393                    Action::Download {
4394                        clip: boom.clone(),
4395                        lineage: LineageContext::own_root(&boom),
4396                        path: "boom.mp3".to_owned(),
4397                        format: AudioFormat::Mp3,
4398                    },
4399                    Action::Reformat {
4400                        clip: reformer.clone(),
4401                        path: "r_new.mp3".to_owned(),
4402                        from_path: "r_old.flac".to_owned(),
4403                        from: AudioFormat::Flac,
4404                        to: AudioFormat::Mp3,
4405                    },
4406                ],
4407            };
4408            let scripted = ScriptedHttp::new()
4409                .with_auth()
4410                .route("boom.mp3", Reply::ok(b"boom-body".to_vec()))
4411                .route("r.mp3", Reply::ok(b"reformatted".to_vec()));
4412            let http = GatedHttp::new(scripted);
4413            // The download's write hits ENOSPC, a systemic abort.
4414            let fs = MemFs::new()
4415                .with_file("r_old.flac", b"OLD-FLAC".to_vec())
4416                .fail_write_out_of_space("boom.mp3");
4417            let mut manifest = Manifest::new();
4418            manifest.insert("r", entry("r_old.flac", AudioFormat::Flac));
4419
4420            let outcome = run_gated_fs(
4421                &plan,
4422                &mut manifest,
4423                &[d_boom, d_reformer],
4424                &http,
4425                &fs,
4426                &opts_with(4),
4427            );
4428
4429            assert_eq!(outcome.status, RunStatus::DiskFull);
4430            assert!(
4431                fs.exists("r_old.flac"),
4432                "the old file must survive the abort"
4433            );
4434            assert!(
4435                !fs.exists("r_new.mp3"),
4436                "no reformatted file may be written"
4437            );
4438            let still = manifest.get("r").expect("the manifest must still track r");
4439            assert_eq!(
4440                still.path, "r_old.flac",
4441                "the manifest must still point at the surviving old file"
4442            );
4443            assert_eq!(still.format, AudioFormat::Flac);
4444        }
4445
4446        #[test]
4447        fn a_systemic_abort_leaves_no_untracked_destination_files() {
4448            // Two clips commit, the third's write hits ENOSPC (a systemic abort),
4449            // and the rest never commit. Every file remaining on disk must be one
4450            // the manifest tracks: producers write nothing, so an abort cannot
4451            // strand an untracked file from an in-flight or buffered render.
4452            let mut scripted = ScriptedHttp::new().with_auth();
4453            let mut actions = Vec::new();
4454            let mut desireds = Vec::new();
4455            for id in ["a0", "a1", "boom", "a3", "a4"] {
4456                scripted = scripted.route(&format!("{id}.mp3"), Reply::ok(b"body".to_vec()));
4457                let (_c, d, action) = download(id, AudioFormat::Mp3);
4458                actions.push(action);
4459                desireds.push(d);
4460            }
4461            let http = GatedHttp::new(scripted);
4462            let fs = MemFs::new().fail_write_out_of_space("boom.mp3");
4463            let plan = Plan { actions };
4464            let mut manifest = Manifest::new();
4465
4466            let outcome = run_gated_fs(&plan, &mut manifest, &desireds, &http, &fs, &opts_with(2));
4467
4468            assert_eq!(outcome.status, RunStatus::DiskFull);
4469            let tracked: std::collections::BTreeSet<String> = manifest
4470                .entries
4471                .values()
4472                .map(|entry| entry.path.clone())
4473                .collect();
4474            for path in fs.paths() {
4475                assert!(
4476                    tracked.contains(&path),
4477                    "found an untracked destination file: {path}"
4478                );
4479            }
4480            assert!(
4481                !fs.exists("a3.mp3"),
4482                "uncommitted renders must not be on disk"
4483            );
4484            assert!(
4485                !fs.exists("a4.mp3"),
4486                "uncommitted renders must not be on disk"
4487            );
4488        }
4489
4490        /// An [`Ffmpeg`] double that counts how many rendered FLAC payloads are
4491        /// live: it bumps a shared counter (tracking the peak) when a transcode
4492        /// yields bytes, and [`CountingFs`] drops it back on the committing write.
4493        /// The [transcode, write] window is a superset of the true in-memory hold,
4494        /// so the observed peak upper-bounds the real one.
4495        struct CountingFfmpeg {
4496            inner: StubFfmpeg,
4497            held: Arc<AtomicUsize>,
4498            peak: Arc<AtomicUsize>,
4499        }
4500
4501        impl Ffmpeg for CountingFfmpeg {
4502            fn wav_to_flac(
4503                &self,
4504                wav: &[u8],
4505            ) -> impl Future<Output = Result<Vec<u8>, FfmpegError>> + Send {
4506                let fut = self.inner.wav_to_flac(wav);
4507                let held = self.held.clone();
4508                let peak = self.peak.clone();
4509                async move {
4510                    let out = fut.await;
4511                    if out.is_ok() {
4512                        let now = held.fetch_add(1, Ordering::SeqCst) + 1;
4513                        peak.fetch_max(now, Ordering::SeqCst);
4514                    }
4515                    out
4516                }
4517            }
4518
4519            fn mp4_to_webp(
4520                &self,
4521                mp4: &[u8],
4522                settings: WebpEncodeSettings,
4523            ) -> impl Future<Output = Result<Vec<u8>, FfmpegError>> + Send {
4524                self.inner.mp4_to_webp(mp4, settings)
4525            }
4526        }
4527
4528        /// A [`Filesystem`] double wrapping [`MemFs`] that decrements the live
4529        /// payload counter on each committing write, closing the window opened by
4530        /// [`CountingFfmpeg`].
4531        struct CountingFs {
4532            inner: MemFs,
4533            held: Arc<AtomicUsize>,
4534        }
4535
4536        impl Filesystem for CountingFs {
4537            fn write_atomic(&self, path: &str, bytes: &[u8]) -> Result<(), FsError> {
4538                let out = self.inner.write_atomic(path, bytes);
4539                self.held.fetch_sub(1, Ordering::SeqCst);
4540                out
4541            }
4542
4543            fn rename(&self, from: &str, to: &str) -> Result<(), FsError> {
4544                self.inner.rename(from, to)
4545            }
4546
4547            fn remove(&self, path: &str) -> Result<(), FsError> {
4548                self.inner.remove(path)
4549            }
4550
4551            fn prune_empty_dirs(&self, root: &str) -> Result<(), FsError> {
4552                self.inner.prune_empty_dirs(root)
4553            }
4554
4555            fn read(&self, path: &str) -> Result<Vec<u8>, FsError> {
4556                self.inner.read(path)
4557            }
4558
4559            fn metadata(&self, path: &str) -> Option<FileStat> {
4560                self.inner.metadata(path)
4561            }
4562        }
4563
4564        #[test]
4565        fn rendered_payloads_in_memory_stay_bounded_by_concurrency() {
4566            // Far more FLAC clips than the concurrency bound. The ordered buffered
4567            // render keeps at most about `concurrency` transcoded payloads live at
4568            // once (never the whole library), so peak held <= concurrency + 1.
4569            let count = 12;
4570            let concurrency = 3;
4571            let mut scripted = ScriptedHttp::new().with_auth();
4572            let mut actions = Vec::new();
4573            let mut desireds = Vec::new();
4574            for i in 0..count {
4575                let id = format!("f{i}");
4576                scripted = scripted
4577                    .route(
4578                        &format!("/gen/{id}/wav_file/"),
4579                        Reply::json(&format!(
4580                            r#"{{"wav_file_url": "https://cdn1.suno.ai/{id}.wav"}}"#
4581                        )),
4582                    )
4583                    .route(&format!("{id}.wav"), Reply::ok(b"wav-body".to_vec()));
4584                let (_c, d, action) = download(&id, AudioFormat::Flac);
4585                actions.push(action);
4586                desireds.push(d);
4587            }
4588            let http = GatedHttp::new(scripted);
4589            let held = Arc::new(AtomicUsize::new(0));
4590            let peak = Arc::new(AtomicUsize::new(0));
4591            let ffmpeg = CountingFfmpeg {
4592                inner: StubFfmpeg::flac(),
4593                held: held.clone(),
4594                peak: peak.clone(),
4595            };
4596            let fs = CountingFs {
4597                inner: MemFs::new(),
4598                held: held.clone(),
4599            };
4600            let clock = RecordingClock::new();
4601            let mut albums = BTreeMap::new();
4602            let mut playlists = BTreeMap::new();
4603            let mut manifest = Manifest::new();
4604            let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
4605            let plan = Plan { actions };
4606
4607            let outcome = pollster::block_on(execute(
4608                &plan,
4609                &mut manifest,
4610                &mut albums,
4611                &mut playlists,
4612                &desireds,
4613                Ports {
4614                    client: &mut client,
4615                    http: &http,
4616                    fs: &fs,
4617                    ffmpeg: &ffmpeg,
4618                    clock: &clock,
4619                },
4620                &opts_with(concurrency),
4621            ));
4622
4623            assert_eq!(outcome.downloaded, count as usize);
4624            assert_eq!(
4625                held.load(Ordering::SeqCst),
4626                0,
4627                "every payload must be committed"
4628            );
4629            assert!(
4630                peak.load(Ordering::SeqCst) <= concurrency as usize + 1,
4631                "peak live payloads {} exceeded the bound {}",
4632                peak.load(Ordering::SeqCst),
4633                concurrency + 1
4634            );
4635            assert!(
4636                peak.load(Ordering::SeqCst) >= 2,
4637                "the render should genuinely overlap, peak was {}",
4638                peak.load(Ordering::SeqCst)
4639            );
4640        }
4641    }
4642}