Skip to main content

suno_core/
executor.rs

1//! The download executor: it applies a reconcile [`Plan`] to disk through ports.
2//!
3//! Reconcile decides *what* to do; the executor does it. It is async and pure
4//! orchestration: every side effect goes through a port ([`Http`] for the
5//! network, [`Filesystem`] for disk, [`Ffmpeg`] for transcoding, [`Clock`] for
6//! waiting), so the whole pipeline is exercised in tests with in-memory doubles
7//! and no real IO, network, or sleeping.
8//!
9//! Safety is the point of this module. A wrong write or delete damages the
10//! user's library, so the executor:
11//!
12//! - writes only atomically (SYNC-13): a failed write leaves the prior file
13//!   intact, because the [`Filesystem`] adapter stages a temp file and renames;
14//! - verifies size (SYNC-14): a download whose body disagrees with the
15//!   provider's `Content-Length` is treated as truncated and retried, and a
16//!   written file whose on-disk size disagrees with the bytes written is a
17//!   failure, never a recorded success;
18//! - classifies errors (SYNC-17): an auth failure or a full disk stops the
19//!   account run (with an auth or disk-full status) and is never retried;
20//!   transient failures (timeouts, 5xx,
21//!   transport, 429) are retried a bounded number of times then recorded and
22//!   skipped; permanent failures are recorded and skipped; and a single clip's
23//!   failure never aborts the run;
24//! - backs off on rate limits (SYNC-16) through the injected [`Clock`], honouring
25//!   a `Retry-After` hint.
26//!
27//! The executor only ever sets the manifest's [`preserve`](ManifestEntry::preserve)
28//! marker on an entry it writes, and only deletes a path whose removal the
29//! [`Filesystem`] confirms. Higher-level safety (empty-listing abort, the
30//! destructive-sync confirmation, exit codes) is the caller's job.
31
32use std::collections::BTreeMap;
33use std::collections::BTreeSet;
34use std::collections::HashMap;
35use std::time::Duration;
36
37use futures_util::lock::Mutex as AsyncMutex;
38use futures_util::stream::{self, StreamExt};
39
40use crate::backoff::{backoff_delay, retry_after};
41use crate::client::SunoClient;
42use crate::clock::Clock;
43use crate::config::AudioFormat;
44use crate::error::Error;
45use crate::ffmpeg::{Ffmpeg, WebpEncodeSettings};
46use crate::fs::Filesystem;
47use crate::graph::{AlbumArt, PlaylistState};
48use crate::http::{Http, HttpRequest};
49use crate::lineage::LineageContext;
50use crate::lyrics::AlignedLyrics;
51use crate::manifest::{ArtifactState, Manifest, ManifestEntry};
52use crate::model::Clip;
53use crate::reconcile::{Action, ArtifactKind, Desired, Plan, SourceMode, set_manifest_artifact};
54use crate::tag::{TrackMetadata, tag_flac, tag_mp3};
55
56/// The shared Suno client behind an async mutex, so concurrent audio work can
57/// serialise its order-sensitive API calls (JWT refresh, adaptive limiter)
58/// without a runtime-specific lock. Held only for the brief WAV-render calls;
59/// the heavy CDN/transcode/tag work runs unlocked.
60type ClientLock<'a, C> = AsyncMutex<&'a mut SunoClient<C>>;
61
62/// Tunables for one [`execute`] run.
63#[derive(Debug, Clone)]
64pub struct ExecOptions {
65    /// How many times a transient failure is retried before record-and-skip.
66    pub max_retries: u32,
67    /// How many times to poll for a server-side WAV render before giving up.
68    pub wav_poll_attempts: u32,
69    /// How long to wait between WAV render polls.
70    pub wav_poll_interval: Duration,
71    /// How many clips' audio to fetch, transcode, and tag concurrently. Clamped
72    /// to at least one, so a zero collapses to sequential rather than stalling.
73    pub concurrency: u32,
74}
75
76impl Default for ExecOptions {
77    fn default() -> Self {
78        Self {
79            max_retries: 3,
80            wav_poll_attempts: 24,
81            wav_poll_interval: Duration::from_secs(5),
82            concurrency: 4,
83        }
84    }
85}
86
87/// How an [`execute`] run ended.
88#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
89pub enum RunStatus {
90    /// Every action was attempted; some may have failed and been skipped.
91    #[default]
92    Completed,
93    /// An auth failure stopped the run early; remaining actions were not tried.
94    AuthAborted,
95    /// The disk filled; the run stopped early rather than failing every
96    /// remaining clip. Remaining actions were not tried.
97    DiskFull,
98}
99
100/// One action that could not be applied, for the run summary and failure log.
101#[derive(Debug, Clone, PartialEq, Eq)]
102pub struct Failure {
103    /// The clip the failed action concerned (or a path when no id applies).
104    pub clip_id: String,
105    /// A short, secret-free reason.
106    pub reason: String,
107}
108
109/// The result of applying a [`Plan`]: per-action counts and the failure list.
110#[derive(Debug, Clone, Default, PartialEq, Eq)]
111pub struct ExecOutcome {
112    pub downloaded: usize,
113    pub reformatted: usize,
114    pub retagged: usize,
115    pub renamed: usize,
116    pub deleted: usize,
117    pub skipped: usize,
118    pub artifacts_written: usize,
119    pub artifacts_deleted: usize,
120    /// Actions that failed and were skipped (auth, transient-exhausted, or
121    /// permanent). The run continued past each one unless it was an auth or
122    /// disk-full abort.
123    pub failures: Vec<Failure>,
124    /// How the run ended.
125    pub status: RunStatus,
126}
127
128impl ExecOutcome {
129    /// Number of failed actions.
130    pub fn failed(&self) -> usize {
131        self.failures.len()
132    }
133
134    fn record(&mut self, effect: Effect) {
135        match effect {
136            Effect::Downloaded => self.downloaded += 1,
137            Effect::Reformatted => self.reformatted += 1,
138            Effect::Retagged => self.retagged += 1,
139            Effect::Renamed => self.renamed += 1,
140            Effect::Deleted => self.deleted += 1,
141            Effect::Skipped => self.skipped += 1,
142            Effect::ArtifactWritten => self.artifacts_written += 1,
143            Effect::ArtifactDeleted => self.artifacts_deleted += 1,
144        }
145    }
146}
147
148/// The IO ports the executor drives, grouped so one value threads them through.
149///
150/// `client` is the only `&mut` port: it performs the authenticated WAV render
151/// flow and so mutates its cached session. The rest are shared references.
152pub struct Ports<'a, H, F, G, C> {
153    /// Performs the authenticated WAV render and poll flow.
154    pub client: &'a mut SunoClient<C>,
155    /// The public network port (CDN audio, rendered WAV, cover art).
156    pub http: &'a H,
157    /// The disk port.
158    pub fs: &'a F,
159    /// The transcode port (WAV to FLAC).
160    pub ffmpeg: &'a G,
161    /// The backoff and poll delay port.
162    pub clock: &'a C,
163}
164
165/// Apply `plan` to disk, updating `manifest` and `albums` in place, and return
166/// the outcome.
167///
168/// `desired` carries the per-clip metadata and art hashes plus the source modes
169/// that decide the [`preserve`](ManifestEntry::preserve) marker; it is indexed
170/// by clip id (and by target path, for renames) so each written entry records
171/// the right hashes and protection. `albums` is the album-art store, keyed by
172/// stable root id: folder-art writes and deletes record their state there rather
173/// than on the per-clip `manifest`. `ports` bundles the authenticated client and
174/// the network, disk, transcode, and backoff ports. A single clip's failure
175/// never aborts the run, except an auth failure or a full disk, which stop it
176/// with [`RunStatus::AuthAborted`] or [`RunStatus::DiskFull`].
177///
178/// The audio-producing actions ([`Download`](Action::Download) and
179/// [`Reformat`](Action::Reformat)) run concurrently, bounded by
180/// [`ExecOptions::concurrency`]: their slow parts (WAV render, CDN download,
181/// transcode, tag) overlap while the order-sensitive Suno API calls are
182/// serialised behind an async mutex over the shared [`SunoClient`], keeping the
183/// adaptive limiter and JWT refresh correct. The remaining actions (retag,
184/// rename, delete, and artifact writes/deletes) then run serially in plan order.
185///
186/// The outcome is deterministic regardless of completion order: concurrent audio
187/// results are committed to the manifest in plan-index order, so the same plan
188/// always yields the same manifest and counts whatever the concurrency level. A
189/// per-clip failure is recorded and the run continues; only an auth failure or a
190/// full disk aborts, and it does so promptly by stopping further audio work.
191///
192/// `synced` carries this run's fetched aligned (synced) lyrics keyed by clip id;
193/// it is the caller's IO result, not part of the pure plan. Audio tagging embeds
194/// a clip's entry as an MP3 `SYLT` frame and as the plain `USLT`/`LYRICS` text
195/// (FLAC), so a clip absent from the map (an instrumental, a WAV target, or a
196/// run with the feature off) is tagged exactly as before. The synced `.lrc`
197/// sidecar itself is a generated artifact whose body the caller has already
198/// resolved into the plan, so it is written like any other text sidecar.
199#[allow(clippy::too_many_arguments)]
200pub async fn execute<H, F, G, C>(
201    plan: &Plan,
202    manifest: &mut Manifest,
203    albums: &mut BTreeMap<String, AlbumArt>,
204    playlists: &mut BTreeMap<String, PlaylistState>,
205    desired: &[Desired],
206    synced: &HashMap<String, AlignedLyrics>,
207    ports: Ports<'_, H, F, G, C>,
208    opts: &ExecOptions,
209) -> ExecOutcome
210where
211    H: Http,
212    F: Filesystem,
213    G: Ffmpeg,
214    C: Clock,
215{
216    let Ports {
217        client,
218        http,
219        fs,
220        ffmpeg,
221        clock,
222    } = ports;
223    let by_id: HashMap<&str, &Desired> = desired.iter().map(|d| (d.clip.id.as_str(), d)).collect();
224    let by_path: HashMap<&str, &Desired> = desired.iter().map(|d| (d.path.as_str(), d)).collect();
225    // Every path this run writes, so the inline old-sidecar cleanup never removes
226    // a file another action produces this run (the non-planned twin of
227    // `suppress_path_aliasing`).
228    let write_targets: BTreeSet<String> = plan
229        .actions
230        .iter()
231        .filter_map(|a| match a {
232            Action::Download { path, .. }
233            | Action::Reformat { path, .. }
234            | Action::WriteArtifact { path, .. } => Some(path.clone()),
235            Action::Rename { to, .. } => Some(to.clone()),
236            _ => None,
237        })
238        .collect();
239    // How many tracked artifact slots reference each path. The inline old-path
240    // cleanup removes a path only once nothing else holds it: each slot that
241    // moves away decrements its reference, and the removal fires only when the
242    // count reaches zero and no action writes the path this run. This keeps a
243    // live file a co-referencing slot still owns (a prior failed swap can leave
244    // two clips sharing a path) while letting the last slot to leave reclaim it,
245    // so nothing is orphaned either (#76).
246    let mut tracked_paths: HashMap<String, u32> = HashMap::new();
247    for (_, entry) in manifest.iter() {
248        for path in entry.artifact_paths() {
249            *tracked_paths.entry(path.to_owned()).or_default() += 1;
250        }
251    }
252    for art in albums.values() {
253        for state in [art.folder_jpg.as_ref(), art.folder_webp.as_ref()]
254            .into_iter()
255            .flatten()
256        {
257            *tracked_paths.entry(state.path.clone()).or_default() += 1;
258        }
259    }
260    for playlist in playlists.values() {
261        *tracked_paths.entry(playlist.path.clone()).or_default() += 1;
262    }
263    let ctx = Ctx {
264        http,
265        fs,
266        ffmpeg,
267        clock,
268        opts,
269        by_id: &by_id,
270        by_path: &by_path,
271        synced,
272        write_targets: &write_targets,
273    };
274
275    let mut outcome = ExecOutcome::default();
276
277    // The audio-producing actions ([`Download`](Action::Download) /
278    // [`Reformat`](Action::Reformat)) render concurrently, but their work is
279    // deliberately split so that NO destination write, file removal, or manifest
280    // update happens off the plan's order:
281    //
282    // - the parallel producers ([`prepare_audio`](Ctx::prepare_audio)) do only
283    //   the slow, side-effect-free work (fetch the CDN/WAV bytes, transcode, and
284    //   tag), returning the tagged bytes; and
285    // - a single serial committer below writes those bytes to the destination,
286    //   removes any superseded file, and records the manifest entry, in strict
287    //   plan-index order, interleaved with the non-audio actions.
288    //
289    // The shared client is the only `&mut` port and its API calls must stay
290    // ordered, so it rides behind an async mutex; each producer locks it only for
291    // the brief WAV-render calls and runs the heavy work unlocked. Renders are
292    // yielded in plan order and bounded to `concurrency` in flight (and buffered),
293    // so at most about `concurrency` tagged payloads are ever held in memory -
294    // never the whole library.
295    let client_lock = AsyncMutex::new(client);
296    let concurrency = opts.concurrency.max(1) as usize;
297    let ctx_ref = &ctx;
298    let client_lock_ref = &client_lock;
299    let mut renders = stream::iter(
300        plan.actions
301            .iter()
302            .filter(|action| is_audio_action(action))
303            .map(|action| async move { ctx_ref.prepare_audio(client_lock_ref, action).await }),
304    )
305    .buffered(concurrency);
306
307    for action in &plan.actions {
308        // Audio actions pull their pre-rendered bytes (yielded in plan order) and
309        // commit them here; every other action applies its own effect. Both the
310        // audio commit and the non-audio apply run serially, so all destination
311        // and manifest effects keep the plan's order exactly as the sequential
312        // executor did.
313        let result = if is_audio_action(action) {
314            match renders.next().await {
315                Some(Ok(rendered)) => ctx.commit_audio(manifest, rendered),
316                Some(Err(fail)) => Err(fail),
317                None => unreachable!("buffered yields one result per audio action"),
318            }
319        } else {
320            ctx.apply(action, manifest, albums, playlists, &mut tracked_paths)
321                .await
322        };
323        match result {
324            Ok(effect) => outcome.record(effect),
325            Err(fail) => {
326                let abort = abort_status(fail.class);
327                outcome.failures.push(Failure {
328                    clip_id: fail.clip_id,
329                    reason: fail.reason,
330                });
331                if let Some(status) = abort {
332                    // A systemic abort stops the run. Dropping the render stream
333                    // cancels any in-flight or completed-but-uncommitted producer;
334                    // because producers touch nothing on disk, the destination and
335                    // manifest are left exactly as the committed prefix wrote them,
336                    // with no untracked files and no removed-but-referenced file.
337                    outcome.status = status;
338                    break;
339                }
340            }
341        }
342    }
343    drop(renders);
344
345    // Renames and deletes can leave an album directory empty; prune those ghost
346    // directories bottom-up. This runs on both the completed and the aborted
347    // paths, and is best-effort: a prune failure is only a missed tidy that the
348    // next run repeats, never a reason to fail the run.
349    let _ = fs.prune_empty_dirs("");
350    outcome
351}
352
353/// Whether an action produces audio: it fetches, transcodes, and tags a clip's
354/// file. Its slow render runs in the concurrent phase; its destination write and
355/// manifest update are committed serially in plan order. Everything else touches
356/// the manifest, album, or playlist stores directly and runs serially.
357fn is_audio_action(action: &Action) -> bool {
358    matches!(action, Action::Download { .. } | Action::Reformat { .. })
359}
360
361/// A rendered-but-uncommitted audio result: the tagged bytes plus what the serial
362/// committer needs to place them. Produced concurrently and side-effect-free (no
363/// destination write, no removal, no manifest touch); [`commit_audio`] applies
364/// all of those in plan order.
365struct RenderedAudio {
366    clip_id: String,
367    path: String,
368    format: AudioFormat,
369    /// The superseded file to remove after the new one lands (a [`Reformat`]),
370    /// or `None` for a plain [`Download`].
371    from_path: Option<String>,
372    effect: Effect,
373    bytes: Vec<u8>,
374}
375
376/// What an applied action did, for the outcome counters.
377enum Effect {
378    Downloaded,
379    Reformatted,
380    Retagged,
381    Renamed,
382    Deleted,
383    Skipped,
384    ArtifactWritten,
385    ArtifactDeleted,
386}
387
388/// How a failure should be handled (SYNC-17).
389#[derive(Debug, Clone, Copy)]
390enum Class {
391    /// Stop the account run; do not retry.
392    Auth,
393    /// Stop the account run: a full disk is systemic, like auth, so aborting
394    /// beats skipping every remaining clip (each of which would first burn a
395    /// server-side WAV-render budget before failing the same way).
396    Disk,
397    /// Retry a bounded number of times, then record and skip.
398    Transient,
399    /// Record and skip immediately.
400    Permanent,
401}
402
403/// A classified action failure attributed to a clip.
404struct Fail {
405    class: Class,
406    clip_id: String,
407    reason: String,
408}
409
410/// The run-ending status for a failure class, or `None` when the failure is
411/// per-clip and the run continues.
412fn abort_status(class: Class) -> Option<RunStatus> {
413    match class {
414        Class::Auth => Some(RunStatus::AuthAborted),
415        Class::Disk => Some(RunStatus::DiskFull),
416        Class::Transient | Class::Permanent => None,
417    }
418}
419
420fn auth_fail(clip_id: impl Into<String>, reason: impl Into<String>) -> Fail {
421    Fail {
422        class: Class::Auth,
423        clip_id: clip_id.into(),
424        reason: reason.into(),
425    }
426}
427
428fn transient_fail(clip_id: impl Into<String>, reason: impl Into<String>) -> Fail {
429    Fail {
430        class: Class::Transient,
431        clip_id: clip_id.into(),
432        reason: reason.into(),
433    }
434}
435
436fn permanent_fail(clip_id: impl Into<String>, reason: impl Into<String>) -> Fail {
437    Fail {
438        class: Class::Permanent,
439        clip_id: clip_id.into(),
440        reason: reason.into(),
441    }
442}
443
444fn disk_fail(clip_id: impl Into<String>, reason: impl Into<String>) -> Fail {
445    Fail {
446        class: Class::Disk,
447        clip_id: clip_id.into(),
448        reason: reason.into(),
449    }
450}
451
452/// Whether an artifact kind is album-scoped folder art (owned by a root id and
453/// recorded on the album store) rather than a per-clip sidecar (recorded on the
454/// manifest).
455fn is_album_kind(kind: ArtifactKind) -> bool {
456    matches!(kind, ArtifactKind::FolderJpg | ArtifactKind::FolderWebp)
457}
458
459/// True for the library-scoped playlist artifact, routed to the playlist store.
460fn is_playlist_kind(kind: ArtifactKind) -> bool {
461    matches!(kind, ArtifactKind::Playlist)
462}
463
464/// True for a per-song sidecar (`cover.jpg`/`cover.webp`), whose write requires
465/// the owning clip's manifest entry. Album and playlist kinds are keyed by a
466/// root/playlist id that is deliberately absent from the manifest.
467fn is_per_clip_kind(kind: ArtifactKind) -> bool {
468    matches!(
469        kind,
470        ArtifactKind::CoverJpg
471            | ArtifactKind::CoverWebp
472            | ArtifactKind::DetailsTxt
473            | ArtifactKind::LyricsTxt
474            | ArtifactKind::Lrc
475            | ArtifactKind::VideoMp4
476    )
477}
478
479/// Recover a playlist's display name from its `.m3u8` path's file stem.
480///
481/// The path is `<sanitised name>.m3u8` at the library root, so the stem is the
482/// sanitised name. Reconcile only ever reads a playlist's `path` and `hash`, so
483/// this recovered name is a convenience for humans and its lossiness (the
484/// sanitiser is not reversible) never affects a decision.
485fn playlist_name_from_path(path: &str) -> String {
486    std::path::Path::new(path)
487        .file_stem()
488        .map(|stem| stem.to_string_lossy().into_owned())
489        .unwrap_or_default()
490}
491
492/// A classified fetch failure, not yet attributed to a clip.
493struct FetchError {
494    class: Class,
495    reason: String,
496    retry_after: Option<Duration>,
497}
498
499impl FetchError {
500    fn transient(reason: impl Into<String>, retry_after: Option<Duration>) -> Self {
501        Self {
502            class: Class::Transient,
503            reason: reason.into(),
504            retry_after,
505        }
506    }
507
508    fn permanent(reason: impl Into<String>) -> Self {
509        Self {
510            class: Class::Permanent,
511            reason: reason.into(),
512            retry_after: None,
513        }
514    }
515
516    fn attribute(self, clip_id: &str) -> Fail {
517        Fail {
518            class: self.class,
519            clip_id: clip_id.to_owned(),
520            reason: self.reason,
521        }
522    }
523}
524
525/// The shared, read-only context threaded through every action handler.
526struct Ctx<'a, H, F, G, C> {
527    http: &'a H,
528    fs: &'a F,
529    ffmpeg: &'a G,
530    clock: &'a C,
531    opts: &'a ExecOptions,
532    by_id: &'a HashMap<&'a str, &'a Desired>,
533    by_path: &'a HashMap<&'a str, &'a Desired>,
534    /// This run's fetched aligned (synced) lyrics, keyed by clip id. Audio
535    /// tagging reads a clip's entry to embed an MP3 `SYLT` frame and the plain
536    /// lyric text; a clip absent here is tagged exactly as before. Populated by
537    /// the caller (the fetch is IO), so the engine stays free of direct IO.
538    synced: &'a HashMap<String, AlignedLyrics>,
539    /// Every destination path this run writes (audio downloads and reformats,
540    /// artifact writes, and rename targets). The inline old-sidecar cleanup in
541    /// [`write_artifact`](Ctx::write_artifact) skips any path in this set, so a
542    /// path swap between two clips can never delete a file the same run just
543    /// wrote. This mirrors [`suppress_path_aliasing`] for the one removal that
544    /// is not itself a planned action.
545    write_targets: &'a BTreeSet<String>,
546}
547
548impl<H, F, G, C> Ctx<'_, H, F, G, C>
549where
550    H: Http,
551    F: Filesystem,
552    G: Ffmpeg,
553    C: Clock,
554{
555    /// Apply one non-audio action, returning what it did or why it failed.
556    ///
557    /// Audio actions ([`Download`](Action::Download) /
558    /// [`Reformat`](Action::Reformat)) run in the concurrent phase through
559    /// [`prepare_audio`](Self::prepare_audio) and never reach here.
560    async fn apply(
561        &self,
562        action: &Action,
563        manifest: &mut Manifest,
564        albums: &mut BTreeMap<String, AlbumArt>,
565        playlists: &mut BTreeMap<String, PlaylistState>,
566        tracked_paths: &mut HashMap<String, u32>,
567    ) -> Result<Effect, Fail> {
568        match action {
569            Action::Download { .. } | Action::Reformat { .. } => {
570                unreachable!("audio actions are applied in the concurrent phase")
571            }
572            Action::Retag {
573                clip,
574                lineage,
575                path,
576            } => self.retag(manifest, clip, lineage, path).await,
577            Action::Rename { from, to } => self.rename(manifest, from, to),
578            Action::Delete { path, clip_id } => self.delete(manifest, path, clip_id),
579            Action::Skip { clip_id } => {
580                self.refresh_preserve(manifest, clip_id);
581                Ok(Effect::Skipped)
582            }
583            Action::WriteArtifact {
584                kind,
585                path,
586                source_url,
587                hash,
588                owner_id,
589                content,
590            } => {
591                self.write_artifact(
592                    manifest,
593                    albums,
594                    playlists,
595                    *kind,
596                    path,
597                    source_url,
598                    hash,
599                    owner_id,
600                    content.as_deref(),
601                    tracked_paths,
602                )
603                .await
604            }
605            Action::DeleteArtifact {
606                kind,
607                path,
608                owner_id,
609            } => self.delete_artifact(manifest, albums, playlists, *kind, path, owner_id),
610        }
611    }
612
613    /// Render one audio action's tagged bytes, side-effect-free.
614    ///
615    /// This is the concurrent part: it fetches, transcodes, and tags the file
616    /// (through shared ports, plus the client behind `client_lock`), then returns
617    /// the bytes and where they must go. It deliberately writes nothing, removes
618    /// nothing, and never touches `manifest`, so many run at once and an aborted
619    /// run can drop them with no destination or manifest effect. The serial
620    /// [`commit_audio`](Self::commit_audio) applies those effects in plan order.
621    async fn prepare_audio(
622        &self,
623        client_lock: &ClientLock<'_, C>,
624        action: &Action,
625    ) -> Result<RenderedAudio, Fail> {
626        match action {
627            Action::Download {
628                clip,
629                lineage,
630                path,
631                format,
632            } => {
633                let bytes = self
634                    .produce_audio(client_lock, clip, lineage, *format)
635                    .await?;
636                Ok(RenderedAudio {
637                    clip_id: clip.id.clone(),
638                    path: path.clone(),
639                    format: *format,
640                    from_path: None,
641                    effect: Effect::Downloaded,
642                    bytes,
643                })
644            }
645            Action::Reformat {
646                clip,
647                path,
648                from_path,
649                from: _,
650                to,
651            } => {
652                // A Reformat action carries no lineage, so recover it from the
653                // desired set (the same context that drove naming and the hash),
654                // falling back to a self-rooted context when the clip is not in
655                // the current selection.
656                let lineage = self
657                    .by_id
658                    .get(clip.id.as_str())
659                    .map(|d| d.lineage.clone())
660                    .unwrap_or_else(|| LineageContext::own_root(clip));
661                let bytes = self.produce_audio(client_lock, clip, &lineage, *to).await?;
662                Ok(RenderedAudio {
663                    clip_id: clip.id.clone(),
664                    path: path.clone(),
665                    format: *to,
666                    from_path: Some(from_path.clone()),
667                    effect: Effect::Reformatted,
668                    bytes,
669                })
670            }
671            _ => unreachable!("prepare_audio only handles audio actions"),
672        }
673    }
674
675    /// Commit one rendered audio result serially, in plan order.
676    ///
677    /// Writes the tagged bytes to the destination, then, for a [`Reformat`], drops
678    /// the superseded file, then records the manifest entry. Ordering the write
679    /// before the removal keeps a crash from losing both copies; keeping all of
680    /// this off the concurrent phase preserves the sequential executor's plan-order
681    /// guarantee for every destination and manifest effect.
682    fn commit_audio(
683        &self,
684        manifest: &mut Manifest,
685        rendered: RenderedAudio,
686    ) -> Result<Effect, Fail> {
687        let RenderedAudio {
688            clip_id,
689            path,
690            format,
691            from_path,
692            effect,
693            bytes,
694        } = rendered;
695        let size = self.write_verify(&clip_id, &path, &bytes)?;
696        if let Some(from) = from_path {
697            // The new file is safely in place; only now drop the old rendering.
698            self.fs.remove(&from).map_err(|err| {
699                permanent_fail(&clip_id, format!("could not remove old file: {err}"))
700            })?;
701        }
702        manifest.insert(clip_id.clone(), self.entry(&clip_id, &path, format, size));
703        Ok(effect)
704    }
705
706    /// Re-tag the existing file in place to match current metadata and art.
707    async fn retag(
708        &self,
709        manifest: &mut Manifest,
710        clip: &Clip,
711        lineage: &LineageContext,
712        path: &str,
713    ) -> Result<Effect, Fail> {
714        let Some(format) = manifest.get(&clip.id).map(|entry| entry.format) else {
715            return Err(permanent_fail(
716                &clip.id,
717                "retag target missing from manifest",
718            ));
719        };
720
721        if format == AudioFormat::Wav {
722            // WAV carries no embedded tags; just record the new hashes so the
723            // next run sees them as current and stops retagging.
724            self.refresh_hashes(manifest, &clip.id, None);
725            return Ok(Effect::Retagged);
726        }
727
728        let (meta, synced) = self.track_meta(clip, lineage);
729        let cover = self.fetch_cover(clip).await;
730        let existing = self
731            .fs
732            .read(path)
733            .map_err(|err| permanent_fail(&clip.id, format!("could not read for retag: {err}")))?;
734        let tagged = match format {
735            AudioFormat::Mp3 => tag_mp3(&existing, &meta, cover.as_deref(), synced),
736            AudioFormat::Flac => tag_flac(&existing, &meta, cover.as_deref()),
737            AudioFormat::Wav => unreachable!("WAV handled above"),
738        }
739        .map_err(|err| permanent_fail(&clip.id, err.to_string()))?;
740        let size = self.write_verify(&clip.id, path, &tagged)?;
741        self.refresh_hashes(manifest, &clip.id, Some(size));
742        Ok(Effect::Retagged)
743    }
744
745    /// Move the file and update the entry's path (and protection).
746    fn rename(&self, manifest: &mut Manifest, from: &str, to: &str) -> Result<Effect, Fail> {
747        let label = self
748            .by_path
749            .get(to)
750            .map(|d| d.clip.id.clone())
751            .unwrap_or_else(|| to.to_owned());
752        self.fs.rename(from, to).map_err(|err| {
753            if err.is_out_of_space() {
754                disk_fail(label, "disk full: no space left to rename")
755            } else {
756                permanent_fail(label, format!("rename failed: {err}"))
757            }
758        })?;
759
760        let clip_id = self.by_path.get(to).map(|d| d.clip.id.clone()).or_else(|| {
761            manifest
762                .entries
763                .iter()
764                .find(|(_, entry)| entry.path == from)
765                .map(|(id, _)| id.clone())
766        });
767        if let Some(id) = clip_id
768            && let Some(entry) = manifest.entries.get_mut(&id)
769        {
770            entry.path = to.to_owned();
771            if let Some(d) = self.by_path.get(to) {
772                entry.preserve = preserve_for(d);
773            }
774        }
775        Ok(Effect::Renamed)
776    }
777
778    /// Remove the file and drop the manifest entry.
779    fn delete(&self, manifest: &mut Manifest, path: &str, clip_id: &str) -> Result<Effect, Fail> {
780        self.fs
781            .remove(path)
782            .map_err(|err| permanent_fail(clip_id, format!("delete failed: {err}")))?;
783        manifest.remove(clip_id);
784        Ok(Effect::Deleted)
785    }
786
787    /// Fetch an artifact's bytes, write them atomically, then record the sidecar
788    /// on the owning manifest entry.
789    ///
790    /// The fetch and write share the audio path's resilience: `fetch_bytes`
791    /// retries transient failures and verifies `Content-Length`, and
792    /// `write_verify` confirms the on-disk size. A failure is attributed to the
793    /// owning clip and returned as a per-clip [`Fail`], so a bad sidecar never
794    /// aborts the whole run (only an auth failure or a full disk does, matching
795    /// audio).
796    ///
797    /// The bytes written depend on the kind: a static cover is the fetched image
798    /// verbatim, while an animated cover is the clip's MP4 preview transcoded to
799    /// WebP through the ffmpeg port (see [`artifact_bytes`](Self::artifact_bytes)).
800    ///
801    /// A sidecar is only ever written for a clip whose audio is present: a
802    /// successful `Download`/`Reformat` creates the manifest entry earlier in
803    /// this run, and a prior-run clip already has one. So an absent owning entry
804    /// means the audio failed or never existed this run; we skip (no fetch, no
805    /// write) rather than strand an untracked sidecar with no owning audio.
806    ///
807    /// Folder art ([`FolderJpg`](ArtifactKind::FolderJpg) /
808    /// [`FolderWebp`](ArtifactKind::FolderWebp)) is album-scoped: its `owner_id`
809    /// is the album's stable root id, not a manifest clip, so it skips the
810    /// manifest presence guard and records its state on the album store instead.
811    ///
812    /// When a title or album change moves the audio, reconcile re-emits this
813    /// write at the NEW path; this handler then removes the sidecar left at the
814    /// artifact's previously tracked path, moving it rather than orphaning it.
815    /// The removal happens only after the new file is safely written, and a
816    /// remove failure returns before the state slot advances, so the next run
817    /// re-plans the identical write and retries — self-healing, never an orphan.
818    #[allow(clippy::too_many_arguments)]
819    async fn write_artifact(
820        &self,
821        manifest: &mut Manifest,
822        albums: &mut BTreeMap<String, AlbumArt>,
823        playlists: &mut BTreeMap<String, PlaylistState>,
824        kind: ArtifactKind,
825        path: &str,
826        source_url: &str,
827        hash: &str,
828        owner_id: &str,
829        content: Option<&str>,
830        tracked_paths: &mut HashMap<String, u32>,
831    ) -> Result<Effect, Fail> {
832        // A per-song sidecar needs its owning clip's manifest entry; album and
833        // playlist kinds are keyed elsewhere and skip this guard.
834        if is_per_clip_kind(kind) && manifest.get(owner_id).is_none() {
835            return Ok(Effect::Skipped);
836        }
837        // Capture the path this artifact was last tracked at, BEFORE the slot is
838        // overwritten below, so a path-changing write (a title/album rename that
839        // moves the audio) can clean up the old sidecar it left behind. Cover
840        // kinds live on the manifest, folder kinds on the album store; playlists
841        // reconcile their own old-path delete and so opt out here.
842        let old_path = match kind {
843            ArtifactKind::CoverJpg => manifest
844                .get(owner_id)
845                .and_then(|e| e.cover_jpg.as_ref())
846                .map(|s| s.path.clone()),
847            ArtifactKind::CoverWebp => manifest
848                .get(owner_id)
849                .and_then(|e| e.cover_webp.as_ref())
850                .map(|s| s.path.clone()),
851            ArtifactKind::DetailsTxt => manifest
852                .get(owner_id)
853                .and_then(|e| e.details_txt.as_ref())
854                .map(|s| s.path.clone()),
855            ArtifactKind::LyricsTxt => manifest
856                .get(owner_id)
857                .and_then(|e| e.lyrics_txt.as_ref())
858                .map(|s| s.path.clone()),
859            ArtifactKind::Lrc => manifest
860                .get(owner_id)
861                .and_then(|e| e.lrc.as_ref())
862                .map(|s| s.path.clone()),
863            ArtifactKind::VideoMp4 => manifest
864                .get(owner_id)
865                .and_then(|e| e.video_mp4.as_ref())
866                .map(|s| s.path.clone()),
867            ArtifactKind::FolderJpg | ArtifactKind::FolderWebp => albums
868                .get(owner_id)
869                .and_then(|a| a.artifact(kind))
870                .map(|s| s.path.clone()),
871            ArtifactKind::Playlist => None,
872        };
873        // A generated artifact (a playlist) carries its body inline and never
874        // touches the network; a fetched one pulls (and transcodes) its source.
875        let bytes = match content {
876            Some(text) => text.as_bytes().to_vec(),
877            None => self.artifact_bytes(kind, source_url, owner_id).await?,
878        };
879        self.write_verify(owner_id, path, &bytes)?;
880        // The new sidecar is safely in place; only now drop a stale copy left at
881        // the previous path (the audio moved). `remove` is idempotent, so an
882        // already-absent old file is fine. On a genuine remove failure we return
883        // BEFORE updating the slot, leaving the manifest/album pointing at the
884        // old path: the next run sees the same path drift, re-plans this write,
885        // and retries the cleanup — convergent, no orphan persists.
886        //
887        // The removal is gated so it can never delete a live file (#76). This
888        // slot is releasing `old`, so drop its reference in `tracked_paths`; the
889        // file is removed only once nothing else holds it — no other tracked slot
890        // still references it (count now zero) and no action writes it this run
891        // (`write_targets`, the non-planned twin of `suppress_path_aliasing`).
892        // On a path swap (A: x -> y while B: y -> x) `write_targets` keeps each
893        // freshly written file; when two slots share a path after a prior failed
894        // swap, the first to move keeps it and the last to leave reclaims it, so
895        // a co-owned file is never deleted and a vacated one is never orphaned.
896        if let Some(old) = old_path.as_deref()
897            && !old.is_empty()
898            && old != path
899        {
900            let still_referenced = tracked_paths
901                .get_mut(old)
902                .map(|count| {
903                    *count = count.saturating_sub(1);
904                    *count > 0
905                })
906                .unwrap_or(false);
907            if !still_referenced && !self.write_targets.contains(old) {
908                self.fs.remove(old).map_err(|err| {
909                    permanent_fail(
910                        owner_id,
911                        format!("could not remove old sidecar {old}: {err}"),
912                    )
913                })?;
914            }
915        }
916        if is_album_kind(kind) {
917            albums.entry(owner_id.to_owned()).or_default().set(
918                kind,
919                Some(ArtifactState {
920                    path: path.to_owned(),
921                    hash: hash.to_owned(),
922                }),
923            );
924        } else if is_playlist_kind(kind) {
925            playlists.insert(
926                owner_id.to_owned(),
927                PlaylistState {
928                    name: playlist_name_from_path(path),
929                    path: path.to_owned(),
930                    hash: hash.to_owned(),
931                },
932            );
933        } else if let Some(entry) = manifest.entries.get_mut(owner_id) {
934            set_manifest_artifact(
935                entry,
936                kind,
937                Some(ArtifactState {
938                    path: path.to_owned(),
939                    hash: hash.to_owned(),
940                }),
941            );
942        }
943        Ok(Effect::ArtifactWritten)
944    }
945
946    /// Produce a sidecar's bytes from its source, branching on kind.
947    ///
948    /// An animated cover — a per-clip [`CoverWebp`](ArtifactKind::CoverWebp) or an
949    /// album [`FolderWebp`](ArtifactKind::FolderWebp) — fetches the clip's
950    /// `video_cover` MP4 preview and transcodes it to an animated WebP through the
951    /// ffmpeg port; every other kind is the fetched source verbatim (e.g. the
952    /// static [`CoverJpg`](ArtifactKind::CoverJpg) or album
953    /// [`FolderJpg`](ArtifactKind::FolderJpg) image). A fetch or transcode failure
954    /// is attributed to the owning clip and is a per-clip [`Fail`], except a
955    /// disk-full transcode, which aborts the run like the audio FLAC path.
956    async fn artifact_bytes(
957        &self,
958        kind: ArtifactKind,
959        source_url: &str,
960        owner_id: &str,
961    ) -> Result<Vec<u8>, Fail> {
962        let source = self
963            .fetch_bytes(source_url)
964            .await
965            .map_err(|err| err.attribute(owner_id))?;
966        match kind {
967            ArtifactKind::CoverWebp | ArtifactKind::FolderWebp => self
968                .ffmpeg
969                .mp4_to_webp(&source, WebpEncodeSettings::default())
970                .await
971                .map_err(|err| {
972                    if err.is_out_of_space() {
973                        disk_fail(owner_id, "disk full: no space left to transcode")
974                    } else {
975                        permanent_fail(owner_id, format!("cover transcode failed: {err}"))
976                    }
977                }),
978            // The text sidecars are generated and always carry inline content, so
979            // `write_artifact` never reaches this fetch path for them. Guard it so
980            // a future miswiring fails loudly rather than fetching a URL.
981            ArtifactKind::DetailsTxt | ArtifactKind::LyricsTxt | ArtifactKind::Lrc => Err(
982                permanent_fail(owner_id, "text sidecar requires inline content"),
983            ),
984            ArtifactKind::CoverJpg
985            | ArtifactKind::FolderJpg
986            | ArtifactKind::Playlist
987            | ArtifactKind::VideoMp4 => Ok(source),
988        }
989    }
990
991    /// Remove a sidecar file and clear its slot on the owning manifest entry.
992    ///
993    /// `remove` is idempotent, so an already-absent sidecar is not a failure.
994    /// When the owning entry is already gone (its audio was deleted earlier this
995    /// run, co-deleting the sidecar), there is no slot to clear and that is fine.
996    ///
997    /// Folder art is album-scoped: its slot is cleared on the album store keyed by
998    /// the album's root id, not on a manifest clip.
999    ///
1000    /// The audio `Delete` is applied before its sidecar `DeleteArtifact`. If the
1001    /// sidecar removal fails after the audio is already gone, the sidecar lingers
1002    /// untracked, but the design stays convergent rather than transactional: the
1003    /// next run re-plans the same removal and retries, and any directory it would
1004    /// have emptied is pruned once the file finally clears.
1005    fn delete_artifact(
1006        &self,
1007        manifest: &mut Manifest,
1008        albums: &mut BTreeMap<String, AlbumArt>,
1009        playlists: &mut BTreeMap<String, PlaylistState>,
1010        kind: ArtifactKind,
1011        path: &str,
1012        owner_id: &str,
1013    ) -> Result<Effect, Fail> {
1014        self.fs
1015            .remove(path)
1016            .map_err(|err| permanent_fail(owner_id, format!("artifact delete failed: {err}")))?;
1017        if is_album_kind(kind) {
1018            if let Some(art) = albums.get_mut(owner_id) {
1019                art.set(kind, None);
1020                if art.is_empty() {
1021                    albums.remove(owner_id);
1022                }
1023            }
1024        } else if is_playlist_kind(kind) {
1025            playlists.remove(owner_id);
1026        } else if let Some(entry) = manifest.entries.get_mut(owner_id) {
1027            set_manifest_artifact(entry, kind, None);
1028        }
1029        Ok(Effect::ArtifactDeleted)
1030    }
1031
1032    /// Download (and transcode/tag) the audio for `clip` in `format`.
1033    async fn produce_audio(
1034        &self,
1035        client_lock: &ClientLock<'_, C>,
1036        clip: &Clip,
1037        lineage: &LineageContext,
1038        format: AudioFormat,
1039    ) -> Result<Vec<u8>, Fail> {
1040        let (meta, synced) = self.track_meta(clip, lineage);
1041        match format {
1042            AudioFormat::Mp3 => {
1043                let url = clip.mp3_url();
1044                let audio = self
1045                    .fetch_bytes(&url)
1046                    .await
1047                    .map_err(|err| err.attribute(&clip.id))?;
1048                let cover = self.fetch_cover(clip).await;
1049                tag_mp3(&audio, &meta, cover.as_deref(), synced)
1050                    .map_err(|err| permanent_fail(&clip.id, err.to_string()))
1051            }
1052            AudioFormat::Flac => {
1053                let wav = self.fetch_wav(client_lock, clip).await?;
1054                let flac = self.ffmpeg.wav_to_flac(&wav).await.map_err(|err| {
1055                    if err.is_out_of_space() {
1056                        disk_fail(&clip.id, "disk full: no space left to transcode")
1057                    } else {
1058                        permanent_fail(&clip.id, format!("transcode failed: {err}"))
1059                    }
1060                })?;
1061                let cover = self.fetch_cover(clip).await;
1062                tag_flac(&flac, &meta, cover.as_deref())
1063                    .map_err(|err| permanent_fail(&clip.id, err.to_string()))
1064            }
1065            AudioFormat::Wav => self.fetch_wav(client_lock, clip).await,
1066        }
1067    }
1068
1069    /// This run's non-empty aligned lyrics for a clip, if any were fetched.
1070    fn synced_for(&self, clip_id: &str) -> Option<&AlignedLyrics> {
1071        self.synced
1072            .get(clip_id)
1073            .filter(|aligned| !aligned.is_empty())
1074    }
1075
1076    /// The track metadata for a clip, paired with its synced lyrics (if any).
1077    ///
1078    /// The feed omits per-clip lyrics, so when this run fetched aligned lyrics
1079    /// for the clip the plain text is folded into `lyrics` here, which the MP3
1080    /// `USLT` and FLAC `LYRICS` tags then carry. The returned [`AlignedLyrics`]
1081    /// is passed on to [`tag_mp3`] for the word-level `SYLT` frame.
1082    fn track_meta<'m>(
1083        &'m self,
1084        clip: &Clip,
1085        lineage: &LineageContext,
1086    ) -> (TrackMetadata, Option<&'m AlignedLyrics>) {
1087        let synced = self.synced_for(&clip.id);
1088        let mut meta = TrackMetadata::from_clip(clip, lineage);
1089        if let Some(aligned) = synced {
1090            meta.lyrics = aligned.plain_text();
1091        }
1092        (meta, synced)
1093    }
1094
1095    /// Resolve the rendered WAV URL and download it.
1096    async fn fetch_wav(
1097        &self,
1098        client_lock: &ClientLock<'_, C>,
1099        clip: &Clip,
1100    ) -> Result<Vec<u8>, Fail> {
1101        let url = match self.resolve_wav_url(client_lock, &clip.id).await? {
1102            Some(url) => url,
1103            None => return Err(transient_fail(&clip.id, "WAV render was not ready")),
1104        };
1105        self.fetch_bytes(&url)
1106            .await
1107            .map_err(|err| err.attribute(&clip.id))
1108    }
1109
1110    /// Read the WAV URL, requesting a render and polling if it is not ready.
1111    ///
1112    /// `None` means the render did not become ready within the poll budget; the
1113    /// caller treats that as a non-fatal transient failure, never a silent skip.
1114    ///
1115    /// Each client call briefly locks `client_lock`; the poll waits happen
1116    /// unlocked, so concurrent clips interleave their WAV renders rather than
1117    /// serialising behind one clip's whole poll budget.
1118    async fn resolve_wav_url(
1119        &self,
1120        client_lock: &ClientLock<'_, C>,
1121        id: &str,
1122    ) -> Result<Option<String>, Fail> {
1123        if let Some(url) = self.wav_url_retrying(client_lock, id).await? {
1124            return Ok(Some(url));
1125        }
1126        self.request_wav_retrying(client_lock, id).await?;
1127        for _ in 0..self.opts.wav_poll_attempts {
1128            self.clock.sleep(self.opts.wav_poll_interval).await;
1129            if let Some(url) = self.wav_url_retrying(client_lock, id).await? {
1130                return Ok(Some(url));
1131            }
1132        }
1133        Ok(None)
1134    }
1135
1136    /// Read the rendered WAV URL, retrying transient API failures with backoff
1137    /// (SYNC-16/17), so the default FLAC path is as resilient as the CDN path.
1138    async fn wav_url_retrying(
1139        &self,
1140        client_lock: &ClientLock<'_, C>,
1141        id: &str,
1142    ) -> Result<Option<String>, Fail> {
1143        let mut attempt: u32 = 0;
1144        loop {
1145            let result = {
1146                let mut client = client_lock.lock().await;
1147                client.wav_url(self.http, id).await
1148            };
1149            match result {
1150                Ok(url) => return Ok(url),
1151                Err(err) => match self.retry_core(id, err, &mut attempt).await {
1152                    Some(fail) => return Err(fail),
1153                    None => continue,
1154                },
1155            }
1156        }
1157    }
1158
1159    /// Ask Suno to render a WAV, retrying transient API failures with backoff.
1160    async fn request_wav_retrying(
1161        &self,
1162        client_lock: &ClientLock<'_, C>,
1163        id: &str,
1164    ) -> Result<(), Fail> {
1165        let mut attempt: u32 = 0;
1166        loop {
1167            let result = {
1168                let mut client = client_lock.lock().await;
1169                client.request_wav(self.http, id).await
1170            };
1171            match result {
1172                Ok(()) => return Ok(()),
1173                Err(err) => match self.retry_core(id, err, &mut attempt).await {
1174                    Some(fail) => return Err(fail),
1175                    None => continue,
1176                },
1177            }
1178        }
1179    }
1180
1181    /// Classify a core error from the authenticated WAV flow. On a transient
1182    /// class within budget, back off through the [`Clock`] and return `None` to
1183    /// retry; otherwise return the terminal [`Fail`].
1184    async fn retry_core(&self, id: &str, err: Error, attempt: &mut u32) -> Option<Fail> {
1185        let fail = classify_core(id, err);
1186        if matches!(fail.class, Class::Transient) && *attempt < self.opts.max_retries {
1187            self.clock.sleep(backoff_delay(*attempt, None)).await;
1188            *attempt += 1;
1189            None
1190        } else {
1191            Some(fail)
1192        }
1193    }
1194
1195    /// GET `url`, retrying transient failures with backoff, verifying size.
1196    async fn fetch_bytes(&self, url: &str) -> Result<Vec<u8>, FetchError> {
1197        let mut attempt: u32 = 0;
1198        loop {
1199            let result = self.http.send(HttpRequest::get(url)).await;
1200            match classify_response(result) {
1201                Ok(body) => return Ok(body),
1202                Err(err) => {
1203                    if matches!(err.class, Class::Transient) && attempt < self.opts.max_retries {
1204                        let delay = backoff_delay(attempt, err.retry_after);
1205                        self.clock.sleep(delay).await;
1206                        attempt += 1;
1207                        continue;
1208                    }
1209                    return Err(err);
1210                }
1211            }
1212        }
1213    }
1214
1215    /// Download cover art, trying each candidate URL in order; `None` is fine.
1216    async fn fetch_cover(&self, clip: &Clip) -> Option<Vec<u8>> {
1217        for url in clip.cover_candidates() {
1218            if let Ok(response) = self.http.send(HttpRequest::get(url)).await
1219                && (200..=299).contains(&response.status)
1220                && !response.body.is_empty()
1221            {
1222                return Some(response.body);
1223            }
1224        }
1225        None
1226    }
1227
1228    /// Write `bytes` atomically, then confirm the on-disk size (SYNC-13/14).
1229    fn write_verify(&self, clip_id: &str, path: &str, bytes: &[u8]) -> Result<u64, Fail> {
1230        self.fs.write_atomic(path, bytes).map_err(|err| {
1231            if err.is_out_of_space() {
1232                disk_fail(clip_id, format!("disk full: no space left to write {path}"))
1233            } else {
1234                permanent_fail(clip_id, format!("write failed: {err}"))
1235            }
1236        })?;
1237        match self.fs.metadata(path) {
1238            Some(stat) if stat.size == bytes.len() as u64 => Ok(stat.size),
1239            Some(stat) => Err(permanent_fail(
1240                clip_id,
1241                format!("wrote {} bytes, expected {}", stat.size, bytes.len()),
1242            )),
1243            None => Ok(bytes.len() as u64),
1244        }
1245    }
1246
1247    /// Build the manifest entry for a freshly written file.
1248    fn entry(&self, clip_id: &str, path: &str, format: AudioFormat, size: u64) -> ManifestEntry {
1249        match self.by_id.get(clip_id) {
1250            Some(d) => manifest_entry(d, size),
1251            None => ManifestEntry {
1252                path: path.to_owned(),
1253                format,
1254                size,
1255                ..ManifestEntry::default()
1256            },
1257        }
1258    }
1259
1260    /// Refresh an existing entry's hashes, protection, and (optionally) size.
1261    fn refresh_hashes(&self, manifest: &mut Manifest, clip_id: &str, size: Option<u64>) {
1262        let desired = self.by_id.get(clip_id).copied();
1263        if let Some(entry) = manifest.entries.get_mut(clip_id) {
1264            if let Some(d) = desired {
1265                entry.meta_hash = d.meta_hash.clone();
1266                entry.art_hash = d.art_hash.clone();
1267                entry.preserve = preserve_for(d);
1268            }
1269            if let Some(size) = size {
1270                entry.size = size;
1271            }
1272        }
1273    }
1274
1275    /// Refresh only an entry's preserve marker from the current desired state.
1276    ///
1277    /// A clip can gain or lose copy/private protection with no file change, which
1278    /// reconcile emits as a [`Skip`](Action::Skip). Refreshing here keeps the
1279    /// persisted marker a faithful image of live protection, so the cross-run
1280    /// delete guard (SYNC-8) never reads it stale.
1281    fn refresh_preserve(&self, manifest: &mut Manifest, clip_id: &str) {
1282        if let Some(d) = self.by_id.get(clip_id).copied()
1283            && let Some(entry) = manifest.entries.get_mut(clip_id)
1284        {
1285            entry.preserve = preserve_for(d);
1286        }
1287    }
1288}
1289
1290/// Build a manifest entry from the desired record (SYNC-8 preserve rule).
1291fn manifest_entry(d: &Desired, size: u64) -> ManifestEntry {
1292    ManifestEntry {
1293        path: d.path.clone(),
1294        format: d.format,
1295        meta_hash: d.meta_hash.clone(),
1296        art_hash: d.art_hash.clone(),
1297        size,
1298        preserve: preserve_for(d),
1299        ..Default::default()
1300    }
1301}
1302
1303/// Whether a written entry must be preserved across runs: held by any copy
1304/// source, or private. The reconcile delete guard reads this marker later.
1305fn preserve_for(d: &Desired) -> bool {
1306    d.private || d.modes.contains(&SourceMode::Copy)
1307}
1308
1309/// Classify one HTTP result into bytes or a [`FetchError`] (SYNC-14/17).
1310fn classify_response(
1311    result: Result<crate::http::HttpResponse, crate::http::TransportError>,
1312) -> Result<Vec<u8>, FetchError> {
1313    let response = match result {
1314        Ok(response) => response,
1315        Err(err) => {
1316            return Err(FetchError::transient(
1317                format!("transport error: {err}"),
1318                None,
1319            ));
1320        }
1321    };
1322    match response.status {
1323        200..=299 => {
1324            if let Some(expected) = content_length(&response) {
1325                let actual = response.body.len() as u64;
1326                if actual != expected {
1327                    return Err(FetchError::transient(
1328                        format!("truncated download: {actual} of {expected} bytes"),
1329                        None,
1330                    ));
1331                }
1332            }
1333            Ok(response.body)
1334        }
1335        401 | 403 => Err(FetchError::transient(
1336            format!("download rejected: status {}", response.status),
1337            None,
1338        )),
1339        408 => Err(FetchError::transient("request timed out", None)),
1340        429 => Err(FetchError::transient(
1341            "rate limited",
1342            retry_after(&response),
1343        )),
1344        500..=599 => Err(FetchError::transient(
1345            format!("server error {}", response.status),
1346            None,
1347        )),
1348        status => Err(FetchError::permanent(format!(
1349            "download failed: status {status}"
1350        ))),
1351    }
1352}
1353
1354/// Map a core [`Error`] from the authenticated WAV flow to a [`Fail`].
1355fn classify_core(id: &str, err: Error) -> Fail {
1356    let reason = err.to_string();
1357    match err {
1358        Error::Auth(_) => auth_fail(id, reason),
1359        Error::RateLimited { .. } | Error::Connection(_) => transient_fail(id, reason),
1360        Error::Api(_) | Error::NotFound(_) | Error::Tag(_) | Error::Config(_) => {
1361            permanent_fail(id, reason)
1362        }
1363    }
1364}
1365
1366/// The provider-reported body size from `Content-Length`, if present and valid.
1367fn content_length(response: &crate::http::HttpResponse) -> Option<u64> {
1368    response.header("content-length")?.trim().parse().ok()
1369}
1370
1371#[cfg(test)]
1372mod tests {
1373    use super::*;
1374    use crate::ClerkAuth;
1375    use crate::http::HttpResponse;
1376    use crate::testutil::{MemFs, RecordingClock, Reply, ScriptedHttp, StubFfmpeg};
1377
1378    fn clip(id: &str) -> Clip {
1379        Clip {
1380            id: id.to_owned(),
1381            title: "Song".to_owned(),
1382            audio_url: format!("https://cdn1.suno.ai/{id}.mp3"),
1383            ..Default::default()
1384        }
1385    }
1386
1387    fn art_clip(id: &str) -> Clip {
1388        Clip {
1389            image_large_url: format!("https://art.suno.ai/{id}/large.jpg"),
1390            image_url: format!("https://art.suno.ai/{id}/small.jpg"),
1391            ..clip(id)
1392        }
1393    }
1394
1395    fn ext(format: AudioFormat) -> &'static str {
1396        match format {
1397            AudioFormat::Mp3 => "mp3",
1398            AudioFormat::Flac => "flac",
1399            AudioFormat::Wav => "wav",
1400        }
1401    }
1402
1403    fn desired(clip: Clip, format: AudioFormat) -> Desired {
1404        Desired {
1405            path: format!("{}.{}", clip.id, ext(format)),
1406            lineage: LineageContext::own_root(&clip),
1407            clip,
1408            format,
1409            meta_hash: "m".to_owned(),
1410            art_hash: "art".to_owned(),
1411            modes: vec![SourceMode::Mirror],
1412            trashed: false,
1413            private: false,
1414            artifacts: Vec::new(),
1415        }
1416    }
1417
1418    fn entry(path: &str, format: AudioFormat) -> ManifestEntry {
1419        ManifestEntry {
1420            path: path.to_owned(),
1421            format,
1422            meta_hash: "old".to_owned(),
1423            art_hash: "old-art".to_owned(),
1424            size: 8,
1425            preserve: false,
1426            ..Default::default()
1427        }
1428    }
1429
1430    #[allow(clippy::too_many_arguments)]
1431    fn run(
1432        plan: &Plan,
1433        manifest: &mut Manifest,
1434        desired: &[Desired],
1435        http: &ScriptedHttp,
1436        fs: &MemFs,
1437        ffmpeg: &StubFfmpeg,
1438        clock: &RecordingClock,
1439        opts: &ExecOptions,
1440    ) -> ExecOutcome {
1441        let mut albums = BTreeMap::new();
1442        run_with_albums(
1443            plan,
1444            manifest,
1445            &mut albums,
1446            desired,
1447            http,
1448            fs,
1449            ffmpeg,
1450            clock,
1451            opts,
1452        )
1453    }
1454
1455    #[allow(clippy::too_many_arguments)]
1456    fn run_with_albums(
1457        plan: &Plan,
1458        manifest: &mut Manifest,
1459        albums: &mut BTreeMap<String, AlbumArt>,
1460        desired: &[Desired],
1461        http: &ScriptedHttp,
1462        fs: &MemFs,
1463        ffmpeg: &StubFfmpeg,
1464        clock: &RecordingClock,
1465        opts: &ExecOptions,
1466    ) -> ExecOutcome {
1467        let mut playlists = BTreeMap::new();
1468        run_full(
1469            plan,
1470            manifest,
1471            albums,
1472            &mut playlists,
1473            desired,
1474            http,
1475            fs,
1476            ffmpeg,
1477            clock,
1478            opts,
1479        )
1480    }
1481
1482    #[allow(clippy::too_many_arguments)]
1483    fn run_full(
1484        plan: &Plan,
1485        manifest: &mut Manifest,
1486        albums: &mut BTreeMap<String, AlbumArt>,
1487        playlists: &mut BTreeMap<String, PlaylistState>,
1488        desired: &[Desired],
1489        http: &ScriptedHttp,
1490        fs: &MemFs,
1491        ffmpeg: &StubFfmpeg,
1492        clock: &RecordingClock,
1493        opts: &ExecOptions,
1494    ) -> ExecOutcome {
1495        let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
1496        let synced = HashMap::new();
1497        pollster::block_on(execute(
1498            plan,
1499            manifest,
1500            albums,
1501            playlists,
1502            desired,
1503            &synced,
1504            Ports {
1505                client: &mut client,
1506                http,
1507                fs,
1508                ffmpeg,
1509                clock,
1510            },
1511            opts,
1512        ))
1513    }
1514
1515    fn small_poll() -> ExecOptions {
1516        ExecOptions {
1517            max_retries: 3,
1518            wav_poll_attempts: 2,
1519            wav_poll_interval: Duration::from_secs(5),
1520            concurrency: 4,
1521        }
1522    }
1523
1524    // ── Download: MP3 ───────────────────────────────────────────────
1525
1526    #[test]
1527    fn download_mp3_writes_tagged_file_and_records_manifest() {
1528        let c = art_clip("a");
1529        let d = desired(c.clone(), AudioFormat::Mp3);
1530        let plan = Plan {
1531            actions: vec![Action::Download {
1532                clip: c.clone(),
1533                lineage: LineageContext::own_root(&c),
1534                path: d.path.clone(),
1535                format: AudioFormat::Mp3,
1536            }],
1537        };
1538        let http = ScriptedHttp::new()
1539            .route("a.mp3", Reply::ok(b"mp3-body".to_vec()))
1540            .route("a/large.jpg", Reply::ok(b"art-bytes".to_vec()));
1541        let fs = MemFs::new();
1542        let ffmpeg = StubFfmpeg::flac();
1543        let clock = RecordingClock::new();
1544        let mut manifest = Manifest::new();
1545
1546        let outcome = run(
1547            &plan,
1548            &mut manifest,
1549            &[d],
1550            &http,
1551            &fs,
1552            &ffmpeg,
1553            &clock,
1554            &ExecOptions::default(),
1555        );
1556
1557        assert_eq!(outcome.downloaded, 1);
1558        assert_eq!(outcome.failed(), 0);
1559        assert_eq!(outcome.status, RunStatus::Completed);
1560        let written = fs.read_file("a.mp3").unwrap();
1561        assert_eq!(&written[..3], b"ID3");
1562        assert!(written.ends_with(b"mp3-body"));
1563        let entry = manifest.get("a").unwrap();
1564        assert_eq!(entry.path, "a.mp3");
1565        assert_eq!(entry.format, AudioFormat::Mp3);
1566        assert_eq!(entry.meta_hash, "m");
1567        assert_eq!(entry.art_hash, "art");
1568        assert_eq!(entry.size, written.len() as u64);
1569        assert!(!entry.preserve);
1570    }
1571
1572    #[test]
1573    fn download_mp3_embeds_sylt_and_lyrics_from_synced_map() {
1574        // A clip whose alignment was fetched this run gets a word-level SYLT frame
1575        // and its plain lyric text embedded (USLT), end to end through execute.
1576        let c = art_clip("a");
1577        let d = desired(c.clone(), AudioFormat::Mp3);
1578        let plan = Plan {
1579            actions: vec![Action::Download {
1580                clip: c.clone(),
1581                lineage: LineageContext::own_root(&c),
1582                path: d.path.clone(),
1583                format: AudioFormat::Mp3,
1584            }],
1585        };
1586        let http = ScriptedHttp::new()
1587            .route("a.mp3", Reply::ok(b"mp3-body".to_vec()))
1588            .route("a/large.jpg", Reply::ok(b"art-bytes".to_vec()));
1589        let fs = MemFs::new();
1590        let ffmpeg = StubFfmpeg::flac();
1591        let clock = RecordingClock::new();
1592        let mut manifest = Manifest::new();
1593        let mut albums = BTreeMap::new();
1594        let mut playlists = BTreeMap::new();
1595        let mut synced = HashMap::new();
1596        synced.insert(
1597            "a".to_string(),
1598            AlignedLyrics::from_json(&serde_json::json!({
1599                "aligned_words": [],
1600                "aligned_lyrics": [
1601                    {"text": "hi there", "start_s": 0.5, "end_s": 1.2, "section": "Verse 1",
1602                     "words": [
1603                         {"text": "hi", "start_s": 0.5, "end_s": 0.8},
1604                         {"text": "there", "start_s": 0.9, "end_s": 1.2}
1605                     ]}
1606                ]
1607            })),
1608        );
1609        let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
1610        let outcome = pollster::block_on(execute(
1611            &plan,
1612            &mut manifest,
1613            &mut albums,
1614            &mut playlists,
1615            &[d],
1616            &synced,
1617            Ports {
1618                client: &mut client,
1619                http: &http,
1620                fs: &fs,
1621                ffmpeg: &ffmpeg,
1622                clock: &clock,
1623            },
1624            &ExecOptions::default(),
1625        ));
1626
1627        assert_eq!(outcome.downloaded, 1);
1628        let written = fs.read_file("a.mp3").unwrap();
1629        let tag = id3::Tag::read_from2(std::io::Cursor::new(written)).unwrap();
1630        assert_eq!(
1631            tag.synchronised_lyrics().count(),
1632            1,
1633            "a SYLT frame is embedded"
1634        );
1635        // The plain lyric text is populated from the alignment for the USLT frame.
1636        assert_eq!(
1637            tag.lyrics().next().map(|frame| frame.text.as_str()),
1638            Some("hi there")
1639        );
1640    }
1641
1642    #[test]
1643    fn download_mp3_embeds_no_sylt_when_synced_map_empty() {
1644        // The synced map is empty when the feature is off (no alignment fetched),
1645        // so no SYLT frame and no lyric text are embedded.
1646        let c = art_clip("a");
1647        let d = desired(c.clone(), AudioFormat::Mp3);
1648        let plan = Plan {
1649            actions: vec![Action::Download {
1650                clip: c.clone(),
1651                lineage: LineageContext::own_root(&c),
1652                path: d.path.clone(),
1653                format: AudioFormat::Mp3,
1654            }],
1655        };
1656        let http = ScriptedHttp::new()
1657            .route("a.mp3", Reply::ok(b"mp3-body".to_vec()))
1658            .route("a/large.jpg", Reply::ok(b"art-bytes".to_vec()));
1659        let fs = MemFs::new();
1660        let ffmpeg = StubFfmpeg::flac();
1661        let clock = RecordingClock::new();
1662        let mut manifest = Manifest::new();
1663        let mut albums = BTreeMap::new();
1664        let mut playlists = BTreeMap::new();
1665        let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
1666        let outcome = pollster::block_on(execute(
1667            &plan,
1668            &mut manifest,
1669            &mut albums,
1670            &mut playlists,
1671            &[d],
1672            &HashMap::new(),
1673            Ports {
1674                client: &mut client,
1675                http: &http,
1676                fs: &fs,
1677                ffmpeg: &ffmpeg,
1678                clock: &clock,
1679            },
1680            &ExecOptions::default(),
1681        ));
1682        assert_eq!(outcome.downloaded, 1);
1683        let written = fs.read_file("a.mp3").unwrap();
1684        let tag = id3::Tag::read_from2(std::io::Cursor::new(written)).unwrap();
1685        assert_eq!(tag.synchronised_lyrics().count(), 0);
1686        assert_eq!(tag.lyrics().count(), 0);
1687    }
1688
1689    #[test]
1690    fn download_mp3_uses_cdn_fallback_when_audio_url_empty() {
1691        let mut c = clip("a");
1692        c.audio_url = String::new();
1693        let d = desired(c.clone(), AudioFormat::Mp3);
1694        let plan = Plan {
1695            actions: vec![Action::Download {
1696                clip: c.clone(),
1697                lineage: LineageContext::own_root(&c),
1698                path: d.path.clone(),
1699                format: AudioFormat::Mp3,
1700            }],
1701        };
1702        let http = ScriptedHttp::new().route("cdn1.suno.ai/a.mp3", Reply::ok(b"body".to_vec()));
1703        let fs = MemFs::new();
1704        let mut manifest = Manifest::new();
1705        let outcome = run(
1706            &plan,
1707            &mut manifest,
1708            &[d],
1709            &http,
1710            &fs,
1711            &StubFfmpeg::flac(),
1712            &RecordingClock::new(),
1713            &ExecOptions::default(),
1714        );
1715        assert_eq!(outcome.downloaded, 1);
1716        assert_eq!(http.count("cdn1.suno.ai/a.mp3"), 1);
1717    }
1718
1719    // ── Download: FLAC render + transcode ───────────────────────────
1720
1721    #[test]
1722    fn download_flac_renders_transcodes_and_records() {
1723        let c = clip("b");
1724        let d = desired(c.clone(), AudioFormat::Flac);
1725        let plan = Plan {
1726            actions: vec![Action::Download {
1727                clip: c.clone(),
1728                lineage: LineageContext::own_root(&c),
1729                path: d.path.clone(),
1730                format: AudioFormat::Flac,
1731            }],
1732        };
1733        let http = ScriptedHttp::new()
1734            .with_auth()
1735            .route(
1736                "/wav_file/",
1737                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/b.wav"}"#),
1738            )
1739            .route("b.wav", Reply::ok(b"wav-bytes".to_vec()));
1740        let fs = MemFs::new();
1741        let clock = RecordingClock::new();
1742        let mut manifest = Manifest::new();
1743
1744        let outcome = run(
1745            &plan,
1746            &mut manifest,
1747            &[d],
1748            &http,
1749            &fs,
1750            &StubFfmpeg::flac(),
1751            &clock,
1752            &ExecOptions::default(),
1753        );
1754
1755        assert_eq!(outcome.downloaded, 1);
1756        assert_eq!(outcome.failed(), 0);
1757        let written = fs.read_file("b.flac").unwrap();
1758        assert_eq!(&written[..4], b"fLaC");
1759        assert_eq!(manifest.get("b").unwrap().format, AudioFormat::Flac);
1760        // The URL was ready immediately, so no render request and no polling.
1761        assert_eq!(http.count("/convert_wav/"), 0);
1762        assert!(clock.sleeps().is_empty());
1763    }
1764
1765    #[test]
1766    fn download_flac_requests_render_then_polls_until_ready() {
1767        let c = clip("c");
1768        let d = desired(c.clone(), AudioFormat::Flac);
1769        let plan = Plan {
1770            actions: vec![Action::Download {
1771                clip: c.clone(),
1772                lineage: LineageContext::own_root(&c),
1773                path: d.path.clone(),
1774                format: AudioFormat::Flac,
1775            }],
1776        };
1777        let http = ScriptedHttp::new()
1778            .with_auth()
1779            .route_seq(
1780                "/wav_file/",
1781                vec![
1782                    Reply::json("{}"),
1783                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/c.wav"}"#),
1784                ],
1785            )
1786            .route("/convert_wav/", Reply::status(200))
1787            .route("c.wav", Reply::ok(b"wav".to_vec()));
1788        let clock = RecordingClock::new();
1789        let mut manifest = Manifest::new();
1790
1791        let outcome = run(
1792            &plan,
1793            &mut manifest,
1794            &[d],
1795            &http,
1796            &fs_new(),
1797            &StubFfmpeg::flac(),
1798            &clock,
1799            &small_poll(),
1800        );
1801
1802        assert_eq!(outcome.downloaded, 1);
1803        assert_eq!(http.count("/convert_wav/"), 1);
1804        assert_eq!(clock.sleeps(), vec![Duration::from_secs(5)]);
1805    }
1806
1807    #[test]
1808    fn download_flac_unavailable_render_is_a_nonfatal_failure() {
1809        let c = clip("d");
1810        let d = desired(c.clone(), AudioFormat::Flac);
1811        let plan = Plan {
1812            actions: vec![Action::Download {
1813                clip: c.clone(),
1814                lineage: LineageContext::own_root(&c),
1815                path: d.path.clone(),
1816                format: AudioFormat::Flac,
1817            }],
1818        };
1819        let http = ScriptedHttp::new()
1820            .with_auth()
1821            .route("/wav_file/", Reply::json("{}"))
1822            .route("/convert_wav/", Reply::status(200));
1823        let fs = MemFs::new();
1824        let clock = RecordingClock::new();
1825        let mut manifest = Manifest::new();
1826
1827        let outcome = run(
1828            &plan,
1829            &mut manifest,
1830            &[d],
1831            &http,
1832            &fs,
1833            &StubFfmpeg::flac(),
1834            &clock,
1835            &small_poll(),
1836        );
1837
1838        assert_eq!(outcome.downloaded, 0);
1839        assert_eq!(outcome.failed(), 1);
1840        assert_eq!(outcome.failures[0].clip_id, "d");
1841        assert_eq!(outcome.status, RunStatus::Completed);
1842        assert!(!fs.exists("d.flac"));
1843        assert_eq!(clock.sleeps().len(), 2);
1844    }
1845
1846    #[test]
1847    fn flac_transcode_failure_is_recorded_and_skipped() {
1848        let c = clip("t");
1849        let d = desired(c.clone(), AudioFormat::Flac);
1850        let plan = Plan {
1851            actions: vec![Action::Download {
1852                clip: c.clone(),
1853                lineage: LineageContext::own_root(&c),
1854                path: d.path.clone(),
1855                format: AudioFormat::Flac,
1856            }],
1857        };
1858        let http = ScriptedHttp::new()
1859            .with_auth()
1860            .route(
1861                "/wav_file/",
1862                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/t.wav"}"#),
1863            )
1864            .route("t.wav", Reply::ok(b"wav".to_vec()));
1865        let fs = MemFs::new();
1866        let mut manifest = Manifest::new();
1867
1868        let outcome = run(
1869            &plan,
1870            &mut manifest,
1871            &[d],
1872            &http,
1873            &fs,
1874            &StubFfmpeg::failing(),
1875            &RecordingClock::new(),
1876            &ExecOptions::default(),
1877        );
1878
1879        assert_eq!(outcome.downloaded, 0);
1880        assert_eq!(outcome.failed(), 1);
1881        assert!(!fs.exists("t.flac"));
1882        assert!(manifest.get("t").is_none());
1883    }
1884
1885    // ── Cover fallback ──────────────────────────────────────────────
1886
1887    #[test]
1888    fn cover_falls_back_when_large_image_is_missing() {
1889        let c = art_clip("e");
1890        let d = desired(c.clone(), AudioFormat::Mp3);
1891        let plan = Plan {
1892            actions: vec![Action::Download {
1893                clip: c.clone(),
1894                lineage: LineageContext::own_root(&c),
1895                path: d.path.clone(),
1896                format: AudioFormat::Mp3,
1897            }],
1898        };
1899        let http = ScriptedHttp::new()
1900            .route("e.mp3", Reply::ok(b"body".to_vec()))
1901            .route("e/large.jpg", Reply::status(404))
1902            .route("e/small.jpg", Reply::ok(b"the-art".to_vec()));
1903        let fs = MemFs::new();
1904        let mut manifest = Manifest::new();
1905
1906        let outcome = run(
1907            &plan,
1908            &mut manifest,
1909            &[d],
1910            &http,
1911            &fs,
1912            &StubFfmpeg::flac(),
1913            &RecordingClock::new(),
1914            &ExecOptions::default(),
1915        );
1916
1917        assert_eq!(outcome.downloaded, 1);
1918        let calls = http.calls();
1919        let large = calls
1920            .iter()
1921            .position(|u| u.contains("e/large.jpg"))
1922            .unwrap();
1923        let small = calls
1924            .iter()
1925            .position(|u| u.contains("e/small.jpg"))
1926            .unwrap();
1927        assert!(large < small, "large art tried before small");
1928    }
1929
1930    // ── Atomic write and size verification (SYNC-13/14) ─────────────
1931
1932    #[test]
1933    fn failed_write_leaves_the_prior_file_intact() {
1934        let c = clip("f");
1935        let d = desired(c.clone(), AudioFormat::Mp3);
1936        let plan = Plan {
1937            actions: vec![Action::Download {
1938                clip: c.clone(),
1939                lineage: LineageContext::own_root(&c),
1940                path: d.path.clone(),
1941                format: AudioFormat::Mp3,
1942            }],
1943        };
1944        let http = ScriptedHttp::new().route("f.mp3", Reply::ok(b"new-body".to_vec()));
1945        let fs = MemFs::new()
1946            .with_file("f.mp3", b"OLD-CONTENT".to_vec())
1947            .fail_write("f.mp3");
1948        let mut manifest = Manifest::new();
1949
1950        let outcome = run(
1951            &plan,
1952            &mut manifest,
1953            &[d],
1954            &http,
1955            &fs,
1956            &StubFfmpeg::flac(),
1957            &RecordingClock::new(),
1958            &ExecOptions::default(),
1959        );
1960
1961        assert_eq!(outcome.downloaded, 0);
1962        assert_eq!(outcome.failed(), 1);
1963        assert_eq!(fs.read_file("f.mp3").unwrap(), b"OLD-CONTENT");
1964        assert!(manifest.get("f").is_none());
1965    }
1966
1967    #[test]
1968    fn size_mismatch_after_write_is_a_failure() {
1969        let c = clip("g");
1970        let d = desired(c.clone(), AudioFormat::Mp3);
1971        let plan = Plan {
1972            actions: vec![Action::Download {
1973                clip: c.clone(),
1974                lineage: LineageContext::own_root(&c),
1975                path: d.path.clone(),
1976                format: AudioFormat::Mp3,
1977            }],
1978        };
1979        let http = ScriptedHttp::new().route("g.mp3", Reply::ok(b"body".to_vec()));
1980        let fs = MemFs::new().corrupt_write("g.mp3");
1981        let mut manifest = Manifest::new();
1982
1983        let outcome = run(
1984            &plan,
1985            &mut manifest,
1986            &[d],
1987            &http,
1988            &fs,
1989            &StubFfmpeg::flac(),
1990            &RecordingClock::new(),
1991            &ExecOptions::default(),
1992        );
1993
1994        assert_eq!(outcome.downloaded, 0);
1995        assert_eq!(outcome.failed(), 1);
1996        assert!(outcome.failures[0].reason.contains("expected"));
1997        assert!(manifest.get("g").is_none());
1998    }
1999
2000    // ── Reliability policy (SYNC-16/17) ─────────────────────────────
2001
2002    #[test]
2003    fn transient_failure_is_retried_then_skipped() {
2004        let c = clip("h");
2005        let d = desired(c.clone(), AudioFormat::Mp3);
2006        let plan = Plan {
2007            actions: vec![Action::Download {
2008                clip: c.clone(),
2009                lineage: LineageContext::own_root(&c),
2010                path: d.path.clone(),
2011                format: AudioFormat::Mp3,
2012            }],
2013        };
2014        let http = ScriptedHttp::new().route("h.mp3", Reply::status(500));
2015        let fs = MemFs::new();
2016        let clock = RecordingClock::new();
2017        let opts = ExecOptions {
2018            max_retries: 2,
2019            ..ExecOptions::default()
2020        };
2021        let mut manifest = Manifest::new();
2022
2023        let outcome = run(
2024            &plan,
2025            &mut manifest,
2026            &[d],
2027            &http,
2028            &fs,
2029            &StubFfmpeg::flac(),
2030            &clock,
2031            &opts,
2032        );
2033
2034        assert_eq!(outcome.downloaded, 0);
2035        assert_eq!(outcome.failed(), 1);
2036        assert_eq!(http.count("h.mp3"), 3);
2037        assert_eq!(clock.sleeps().len(), 2);
2038    }
2039
2040    #[test]
2041    fn truncated_download_is_retried_then_succeeds() {
2042        let c = clip("i");
2043        let d = desired(c.clone(), AudioFormat::Mp3);
2044        let plan = Plan {
2045            actions: vec![Action::Download {
2046                clip: c.clone(),
2047                lineage: LineageContext::own_root(&c),
2048                path: d.path.clone(),
2049                format: AudioFormat::Mp3,
2050            }],
2051        };
2052        let http = ScriptedHttp::new().route_seq(
2053            "i.mp3",
2054            vec![
2055                Reply::ok(b"short".to_vec()).with_content_length(999),
2056                Reply::ok(b"good-body".to_vec()),
2057            ],
2058        );
2059        let fs = MemFs::new();
2060        let clock = RecordingClock::new();
2061        let mut manifest = Manifest::new();
2062
2063        let outcome = run(
2064            &plan,
2065            &mut manifest,
2066            &[d],
2067            &http,
2068            &fs,
2069            &StubFfmpeg::flac(),
2070            &clock,
2071            &ExecOptions::default(),
2072        );
2073
2074        assert_eq!(outcome.downloaded, 1);
2075        assert_eq!(http.count("i.mp3"), 2);
2076        assert_eq!(clock.sleeps().len(), 1);
2077    }
2078
2079    #[test]
2080    fn rate_limit_backs_off_using_retry_after() {
2081        let c = clip("j");
2082        let d = desired(c.clone(), AudioFormat::Mp3);
2083        let plan = Plan {
2084            actions: vec![Action::Download {
2085                clip: c.clone(),
2086                lineage: LineageContext::own_root(&c),
2087                path: d.path.clone(),
2088                format: AudioFormat::Mp3,
2089            }],
2090        };
2091        let http = ScriptedHttp::new().route_seq(
2092            "j.mp3",
2093            vec![
2094                Reply::status(429).with_retry_after(7),
2095                Reply::ok(b"body".to_vec()),
2096            ],
2097        );
2098        let fs = MemFs::new();
2099        let clock = RecordingClock::new();
2100        let mut manifest = Manifest::new();
2101
2102        let outcome = run(
2103            &plan,
2104            &mut manifest,
2105            &[d],
2106            &http,
2107            &fs,
2108            &StubFfmpeg::flac(),
2109            &clock,
2110            &ExecOptions::default(),
2111        );
2112
2113        assert_eq!(outcome.downloaded, 1);
2114        assert_eq!(clock.sleeps(), vec![Duration::from_secs(7)]);
2115    }
2116
2117    #[test]
2118    fn auth_failure_aborts_the_run() {
2119        let c1 = clip("k1");
2120        let c2 = clip("k2");
2121        let d1 = desired(c1.clone(), AudioFormat::Flac);
2122        let d2 = desired(c2.clone(), AudioFormat::Flac);
2123        let plan = Plan {
2124            actions: vec![
2125                Action::Download {
2126                    clip: c1.clone(),
2127                    lineage: LineageContext::own_root(&c1),
2128                    path: d1.path.clone(),
2129                    format: AudioFormat::Flac,
2130                },
2131                Action::Download {
2132                    clip: c2.clone(),
2133                    lineage: LineageContext::own_root(&c2),
2134                    path: d2.path.clone(),
2135                    format: AudioFormat::Flac,
2136                },
2137            ],
2138        };
2139        // The authenticated WAV-render endpoint rejects auth even after a JWT
2140        // refresh: that is a bad token, so the whole run aborts rather than
2141        // hammering every clip. A CDN media rejection, by contrast, does not.
2142        let http = ScriptedHttp::new()
2143            .with_auth()
2144            .route("/wav_file/", Reply::status(401));
2145        let fs = MemFs::new();
2146        let mut manifest = Manifest::new();
2147
2148        let outcome = run(
2149            &plan,
2150            &mut manifest,
2151            &[d1, d2],
2152            &http,
2153            &fs,
2154            &StubFfmpeg::flac(),
2155            &RecordingClock::new(),
2156            &small_poll(),
2157        );
2158
2159        assert_eq!(outcome.status, RunStatus::AuthAborted);
2160        assert_eq!(outcome.failed(), 1);
2161        assert_eq!(outcome.failures[0].clip_id, "k1");
2162        assert_eq!(outcome.downloaded, 0);
2163    }
2164
2165    // ── Disk-full aborts the run (issue #17) ────────────────────────
2166
2167    #[test]
2168    fn disk_full_primary_write_aborts_the_run() {
2169        // Two MP3 downloads; the first write is out of space. That is systemic,
2170        // so the run aborts before the second is even attempted: exactly one
2171        // failure is recorded and its reason names the disk-full cause.
2172        let c1 = clip("d1");
2173        let c2 = clip("d2");
2174        let d1 = desired(c1.clone(), AudioFormat::Mp3);
2175        let d2 = desired(c2.clone(), AudioFormat::Mp3);
2176        let plan = Plan {
2177            actions: vec![
2178                Action::Download {
2179                    clip: c1.clone(),
2180                    lineage: LineageContext::own_root(&c1),
2181                    path: d1.path.clone(),
2182                    format: AudioFormat::Mp3,
2183                },
2184                Action::Download {
2185                    clip: c2.clone(),
2186                    lineage: LineageContext::own_root(&c2),
2187                    path: d2.path.clone(),
2188                    format: AudioFormat::Mp3,
2189                },
2190            ],
2191        };
2192        let http = ScriptedHttp::new()
2193            .route("d1.mp3", Reply::ok(b"body-1".to_vec()))
2194            .route("d2.mp3", Reply::ok(b"body-2".to_vec()));
2195        let fs = MemFs::new().fail_write_out_of_space("d1.mp3");
2196        let mut manifest = Manifest::new();
2197
2198        let outcome = run(
2199            &plan,
2200            &mut manifest,
2201            &[d1, d2],
2202            &http,
2203            &fs,
2204            &StubFfmpeg::flac(),
2205            &RecordingClock::new(),
2206            &ExecOptions::default(),
2207        );
2208
2209        assert_eq!(outcome.status, RunStatus::DiskFull);
2210        assert_eq!(outcome.failed(), 1);
2211        assert_eq!(outcome.failures[0].clip_id, "d1");
2212        assert!(outcome.failures[0].reason.contains("disk full"));
2213        assert_eq!(outcome.downloaded, 0);
2214        // The second clip was never fetched: the run aborted first.
2215        assert_eq!(http.count("d2.mp3"), 0);
2216        assert!(!fs.exists("d2.mp3"));
2217    }
2218
2219    #[test]
2220    fn disk_full_flac_transcode_aborts_the_run() {
2221        // The scratch disk fills during the FLAC re-encode; a WAV rendered, but
2222        // there is nowhere to stage the transcode, so the run aborts.
2223        let c1 = clip("d1");
2224        let c2 = clip("d2");
2225        let d1 = desired(c1.clone(), AudioFormat::Flac);
2226        let d2 = desired(c2.clone(), AudioFormat::Flac);
2227        let plan = Plan {
2228            actions: vec![
2229                Action::Download {
2230                    clip: c1.clone(),
2231                    lineage: LineageContext::own_root(&c1),
2232                    path: d1.path.clone(),
2233                    format: AudioFormat::Flac,
2234                },
2235                Action::Download {
2236                    clip: c2.clone(),
2237                    lineage: LineageContext::own_root(&c2),
2238                    path: d2.path.clone(),
2239                    format: AudioFormat::Flac,
2240                },
2241            ],
2242        };
2243        let http = ScriptedHttp::new()
2244            .with_auth()
2245            .route(
2246                "/wav_file/",
2247                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/d1.wav"}"#),
2248            )
2249            .route(".wav", Reply::ok(b"wav".to_vec()));
2250        let fs = MemFs::new();
2251        let mut manifest = Manifest::new();
2252
2253        let outcome = run(
2254            &plan,
2255            &mut manifest,
2256            &[d1, d2],
2257            &http,
2258            &fs,
2259            &StubFfmpeg::out_of_space(),
2260            &RecordingClock::new(),
2261            &ExecOptions::default(),
2262        );
2263
2264        assert_eq!(outcome.status, RunStatus::DiskFull);
2265        assert_eq!(outcome.failed(), 1);
2266        assert_eq!(outcome.failures[0].clip_id, "d1");
2267        assert!(outcome.failures[0].reason.contains("disk full"));
2268        assert_eq!(outcome.downloaded, 0);
2269    }
2270
2271    #[test]
2272    fn disk_full_artifact_write_aborts_the_run() {
2273        // A sidecar write (not a primary download) also aborts on a full disk:
2274        // the owning audio is present, the cover fetch succeeds, but the sidecar
2275        // cannot be written.
2276        let mut manifest = Manifest::new();
2277        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
2278        let plan = Plan {
2279            actions: vec![Action::WriteArtifact {
2280                kind: ArtifactKind::CoverJpg,
2281                path: "a/cover.jpg".to_owned(),
2282                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
2283                hash: "h1".to_owned(),
2284                owner_id: "a".to_owned(),
2285                content: None,
2286            }],
2287        };
2288        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"jpg-bytes".to_vec()));
2289        let fs = MemFs::new().fail_write_out_of_space("a/cover.jpg");
2290
2291        let outcome = run(
2292            &plan,
2293            &mut manifest,
2294            &[],
2295            &http,
2296            &fs,
2297            &StubFfmpeg::flac(),
2298            &RecordingClock::new(),
2299            &ExecOptions::default(),
2300        );
2301
2302        assert_eq!(outcome.status, RunStatus::DiskFull);
2303        assert_eq!(outcome.failed(), 1);
2304        assert!(outcome.failures[0].reason.contains("disk full"));
2305        assert_eq!(outcome.artifacts_written, 0);
2306        // The sidecar slot was never recorded: the write failed before it.
2307        assert_eq!(manifest.get("a").unwrap().cover_jpg, None);
2308    }
2309
2310    #[test]
2311    fn disk_full_leaves_the_failed_clips_manifest_entry_unchanged() {
2312        // write_verify fails before any manifest insert, so a re-download that
2313        // hits a full disk leaves the prior entry (and file) exactly as it was.
2314        let c = clip("m");
2315        let d = desired(c.clone(), AudioFormat::Mp3);
2316        let plan = Plan {
2317            actions: vec![Action::Download {
2318                clip: c.clone(),
2319                lineage: LineageContext::own_root(&c),
2320                path: d.path.clone(),
2321                format: AudioFormat::Mp3,
2322            }],
2323        };
2324        let http = ScriptedHttp::new().route("m.mp3", Reply::ok(b"new-body".to_vec()));
2325        let fs = MemFs::new()
2326            .with_file("m.mp3", b"OLD-CONTENT".to_vec())
2327            .fail_write_out_of_space("m.mp3");
2328        let mut manifest = Manifest::new();
2329        let before = entry("m.mp3", AudioFormat::Mp3);
2330        manifest.insert("m", before.clone());
2331
2332        let outcome = run(
2333            &plan,
2334            &mut manifest,
2335            &[d],
2336            &http,
2337            &fs,
2338            &StubFfmpeg::flac(),
2339            &RecordingClock::new(),
2340            &ExecOptions::default(),
2341        );
2342
2343        assert_eq!(outcome.status, RunStatus::DiskFull);
2344        assert_eq!(manifest.get("m"), Some(&before));
2345        assert_eq!(fs.read_file("m.mp3").unwrap(), b"OLD-CONTENT");
2346    }
2347
2348    #[test]
2349    fn cdn_download_rejection_skips_the_clip_without_aborting() {
2350        let c1 = clip("k1");
2351        let c2 = clip("k2");
2352        let d1 = desired(c1.clone(), AudioFormat::Mp3);
2353        let d2 = desired(c2.clone(), AudioFormat::Mp3);
2354        let plan = Plan {
2355            actions: vec![
2356                Action::Download {
2357                    clip: c1.clone(),
2358                    lineage: LineageContext::own_root(&c1),
2359                    path: d1.path.clone(),
2360                    format: AudioFormat::Mp3,
2361                },
2362                Action::Download {
2363                    clip: c2.clone(),
2364                    lineage: LineageContext::own_root(&c2),
2365                    path: d2.path.clone(),
2366                    format: AudioFormat::Mp3,
2367                },
2368            ],
2369        };
2370        // A CDN media fetch is unauthenticated, so a 403 is a per-asset
2371        // rejection (often transient), not a bad token: the clip is retried
2372        // then recorded and skipped, and the run carries on to the rest.
2373        let http = ScriptedHttp::new()
2374            .route("k1.mp3", Reply::status(403))
2375            .route("k2.mp3", Reply::ok(b"body".to_vec()));
2376        let fs = MemFs::new();
2377        let mut manifest = Manifest::new();
2378
2379        let outcome = run(
2380            &plan,
2381            &mut manifest,
2382            &[d1, d2],
2383            &http,
2384            &fs,
2385            &StubFfmpeg::flac(),
2386            &RecordingClock::new(),
2387            &ExecOptions::default(),
2388        );
2389
2390        assert_ne!(outcome.status, RunStatus::AuthAborted);
2391        assert_eq!(outcome.downloaded, 1);
2392        assert_eq!(outcome.failed(), 1);
2393        assert_eq!(outcome.failures[0].clip_id, "k1");
2394    }
2395
2396    #[test]
2397    fn one_clip_failure_does_not_abort_the_run() {
2398        let c1 = clip("l1");
2399        let c2 = clip("l2");
2400        let d1 = desired(c1.clone(), AudioFormat::Mp3);
2401        let d2 = desired(c2.clone(), AudioFormat::Mp3);
2402        let plan = Plan {
2403            actions: vec![
2404                Action::Download {
2405                    clip: c1.clone(),
2406                    lineage: LineageContext::own_root(&c1),
2407                    path: d1.path.clone(),
2408                    format: AudioFormat::Mp3,
2409                },
2410                Action::Download {
2411                    clip: c2.clone(),
2412                    lineage: LineageContext::own_root(&c2),
2413                    path: d2.path.clone(),
2414                    format: AudioFormat::Mp3,
2415                },
2416            ],
2417        };
2418        let http = ScriptedHttp::new()
2419            .route("l1.mp3", Reply::status(404))
2420            .route("l2.mp3", Reply::ok(b"body".to_vec()));
2421        let fs = MemFs::new();
2422        let mut manifest = Manifest::new();
2423
2424        let outcome = run(
2425            &plan,
2426            &mut manifest,
2427            &[d1, d2],
2428            &http,
2429            &fs,
2430            &StubFfmpeg::flac(),
2431            &RecordingClock::new(),
2432            &ExecOptions::default(),
2433        );
2434
2435        assert_eq!(outcome.status, RunStatus::Completed);
2436        assert_eq!(outcome.downloaded, 1);
2437        assert_eq!(outcome.failed(), 1);
2438        assert_eq!(outcome.failures[0].clip_id, "l1");
2439        assert!(fs.exists("l2.mp3"));
2440        assert!(manifest.get("l2").is_some());
2441        assert!(manifest.get("l1").is_none());
2442    }
2443
2444    // ── preserve marker (SYNC-8) ────────────────────────────────────
2445
2446    #[test]
2447    fn preserve_is_set_for_copy_held_and_private_clips() {
2448        let mut mirror = desired(clip("m1"), AudioFormat::Mp3);
2449        mirror.modes = vec![SourceMode::Mirror];
2450        let mut copy_held = desired(clip("m2"), AudioFormat::Mp3);
2451        copy_held.modes = vec![SourceMode::Mirror, SourceMode::Copy];
2452        let mut private = desired(clip("m3"), AudioFormat::Mp3);
2453        private.private = true;
2454
2455        let plan = Plan {
2456            actions: vec![
2457                Action::Download {
2458                    clip: mirror.clip.clone(),
2459                    lineage: LineageContext::own_root(&mirror.clip),
2460                    path: mirror.path.clone(),
2461                    format: AudioFormat::Mp3,
2462                },
2463                Action::Download {
2464                    clip: copy_held.clip.clone(),
2465                    lineage: LineageContext::own_root(&copy_held.clip),
2466                    path: copy_held.path.clone(),
2467                    format: AudioFormat::Mp3,
2468                },
2469                Action::Download {
2470                    clip: private.clip.clone(),
2471                    lineage: LineageContext::own_root(&private.clip),
2472                    path: private.path.clone(),
2473                    format: AudioFormat::Mp3,
2474                },
2475            ],
2476        };
2477        let http = ScriptedHttp::new()
2478            .route("m1.mp3", Reply::ok(b"a".to_vec()))
2479            .route("m2.mp3", Reply::ok(b"b".to_vec()))
2480            .route("m3.mp3", Reply::ok(b"c".to_vec()));
2481        let fs = MemFs::new();
2482        let mut manifest = Manifest::new();
2483
2484        let outcome = run(
2485            &plan,
2486            &mut manifest,
2487            &[mirror, copy_held, private],
2488            &http,
2489            &fs,
2490            &StubFfmpeg::flac(),
2491            &RecordingClock::new(),
2492            &ExecOptions::default(),
2493        );
2494
2495        assert_eq!(outcome.downloaded, 3);
2496        assert!(!manifest.get("m1").unwrap().preserve);
2497        assert!(manifest.get("m2").unwrap().preserve);
2498        assert!(manifest.get("m3").unwrap().preserve);
2499    }
2500
2501    // ── Reformat / Retag / Rename / Delete / Skip ───────────────────
2502
2503    #[test]
2504    fn reformat_writes_new_format_and_removes_old_file() {
2505        let c = clip("n");
2506        let d = desired(c.clone(), AudioFormat::Mp3);
2507        let plan = Plan {
2508            actions: vec![Action::Reformat {
2509                clip: c.clone(),
2510                path: "n.mp3".to_owned(),
2511                from_path: "n.flac".to_owned(),
2512                from: AudioFormat::Flac,
2513                to: AudioFormat::Mp3,
2514            }],
2515        };
2516        let http = ScriptedHttp::new().route("n.mp3", Reply::ok(b"body".to_vec()));
2517        let fs = MemFs::new().with_file("n.flac", b"OLD-FLAC".to_vec());
2518        let mut manifest = Manifest::new();
2519        manifest.insert("n", entry("n.flac", AudioFormat::Flac));
2520
2521        let outcome = run(
2522            &plan,
2523            &mut manifest,
2524            &[d],
2525            &http,
2526            &fs,
2527            &StubFfmpeg::flac(),
2528            &RecordingClock::new(),
2529            &ExecOptions::default(),
2530        );
2531
2532        assert_eq!(outcome.reformatted, 1);
2533        assert!(fs.exists("n.mp3"));
2534        assert!(!fs.exists("n.flac"));
2535        let updated = manifest.get("n").unwrap();
2536        assert_eq!(updated.path, "n.mp3");
2537        assert_eq!(updated.format, AudioFormat::Mp3);
2538        assert_eq!(updated.meta_hash, "m");
2539    }
2540
2541    #[test]
2542    fn retag_rewrites_file_and_updates_hashes() {
2543        let c = clip("o");
2544        let mut d = desired(c.clone(), AudioFormat::Mp3);
2545        d.meta_hash = "new".to_owned();
2546        d.art_hash = "new-art".to_owned();
2547        let existing = tag_mp3(
2548            b"audio",
2549            &TrackMetadata::from_clip(&c, &LineageContext::own_root(&c)),
2550            None,
2551            None,
2552        )
2553        .unwrap();
2554        let fs = MemFs::new().with_file("o.mp3", existing.clone());
2555        let mut manifest = Manifest::new();
2556        let mut start = entry("o.mp3", AudioFormat::Mp3);
2557        start.size = existing.len() as u64;
2558        manifest.insert("o", start);
2559        let plan = Plan {
2560            actions: vec![Action::Retag {
2561                clip: c.clone(),
2562                lineage: LineageContext::own_root(&c),
2563                path: "o.mp3".to_owned(),
2564            }],
2565        };
2566
2567        let outcome = run(
2568            &plan,
2569            &mut manifest,
2570            &[d],
2571            &ScriptedHttp::new(),
2572            &fs,
2573            &StubFfmpeg::flac(),
2574            &RecordingClock::new(),
2575            &ExecOptions::default(),
2576        );
2577
2578        assert_eq!(outcome.retagged, 1);
2579        let updated = manifest.get("o").unwrap();
2580        assert_eq!(updated.meta_hash, "new");
2581        assert_eq!(updated.art_hash, "new-art");
2582        assert_eq!(&fs.read_file("o.mp3").unwrap()[..3], b"ID3");
2583    }
2584
2585    #[test]
2586    fn rename_moves_file_and_updates_manifest_path() {
2587        let c = clip("p");
2588        let mut d = desired(c.clone(), AudioFormat::Mp3);
2589        d.path = "new/p.mp3".to_owned();
2590        let fs = MemFs::new().with_file("old/p.mp3", b"DATA".to_vec());
2591        let mut manifest = Manifest::new();
2592        manifest.insert("p", entry("old/p.mp3", AudioFormat::Mp3));
2593        let plan = Plan {
2594            actions: vec![Action::Rename {
2595                from: "old/p.mp3".to_owned(),
2596                to: "new/p.mp3".to_owned(),
2597            }],
2598        };
2599
2600        let outcome = run(
2601            &plan,
2602            &mut manifest,
2603            &[d],
2604            &ScriptedHttp::new(),
2605            &fs,
2606            &StubFfmpeg::flac(),
2607            &RecordingClock::new(),
2608            &ExecOptions::default(),
2609        );
2610
2611        assert_eq!(outcome.renamed, 1);
2612        assert!(fs.exists("new/p.mp3"));
2613        assert!(!fs.exists("old/p.mp3"));
2614        assert_eq!(manifest.get("p").unwrap().path, "new/p.mp3");
2615    }
2616
2617    #[test]
2618    fn disk_full_rename_aborts_the_run() {
2619        // A move onto a full disk is systemic like a full-disk write: the run
2620        // aborts with DiskFull and the source file is left untouched.
2621        let c = clip("p");
2622        let mut d = desired(c.clone(), AudioFormat::Mp3);
2623        d.path = "new/p.mp3".to_owned();
2624        let fs = MemFs::new()
2625            .with_file("old/p.mp3", b"DATA".to_vec())
2626            .fail_rename_out_of_space("new/p.mp3");
2627        let mut manifest = Manifest::new();
2628        manifest.insert("p", entry("old/p.mp3", AudioFormat::Mp3));
2629        let plan = Plan {
2630            actions: vec![Action::Rename {
2631                from: "old/p.mp3".to_owned(),
2632                to: "new/p.mp3".to_owned(),
2633            }],
2634        };
2635
2636        let outcome = run(
2637            &plan,
2638            &mut manifest,
2639            &[d],
2640            &ScriptedHttp::new(),
2641            &fs,
2642            &StubFfmpeg::flac(),
2643            &RecordingClock::new(),
2644            &ExecOptions::default(),
2645        );
2646
2647        assert_eq!(outcome.status, RunStatus::DiskFull);
2648        assert_eq!(outcome.renamed, 0);
2649        assert_eq!(outcome.failed(), 1);
2650        assert!(outcome.failures[0].reason.contains("disk full"));
2651        // The source is untouched: the move never happened.
2652        assert!(fs.exists("old/p.mp3"));
2653        assert!(!fs.exists("new/p.mp3"));
2654        assert_eq!(manifest.get("p").unwrap().path, "old/p.mp3");
2655    }
2656
2657    #[test]
2658    fn delete_removes_file_and_manifest_entry() {
2659        let fs = MemFs::new().with_file("q.mp3", b"DATA".to_vec());
2660        let mut manifest = Manifest::new();
2661        manifest.insert("q", entry("q.mp3", AudioFormat::Mp3));
2662        let plan = Plan {
2663            actions: vec![Action::Delete {
2664                path: "q.mp3".to_owned(),
2665                clip_id: "q".to_owned(),
2666            }],
2667        };
2668
2669        let outcome = run(
2670            &plan,
2671            &mut manifest,
2672            &[],
2673            &ScriptedHttp::new(),
2674            &fs,
2675            &StubFfmpeg::flac(),
2676            &RecordingClock::new(),
2677            &ExecOptions::default(),
2678        );
2679
2680        assert_eq!(outcome.deleted, 1);
2681        assert!(!fs.exists("q.mp3"));
2682        assert!(manifest.get("q").is_none());
2683    }
2684
2685    #[test]
2686    fn failed_delete_keeps_the_manifest_entry() {
2687        let fs = MemFs::new()
2688            .with_file("s.mp3", b"DATA".to_vec())
2689            .fail_remove("s.mp3");
2690        let mut manifest = Manifest::new();
2691        manifest.insert("s", entry("s.mp3", AudioFormat::Mp3));
2692        let plan = Plan {
2693            actions: vec![Action::Delete {
2694                path: "s.mp3".to_owned(),
2695                clip_id: "s".to_owned(),
2696            }],
2697        };
2698
2699        let outcome = run(
2700            &plan,
2701            &mut manifest,
2702            &[],
2703            &ScriptedHttp::new(),
2704            &fs,
2705            &StubFfmpeg::flac(),
2706            &RecordingClock::new(),
2707            &ExecOptions::default(),
2708        );
2709
2710        assert_eq!(outcome.deleted, 0);
2711        assert_eq!(outcome.failed(), 1);
2712        assert!(manifest.get("s").is_some());
2713        assert!(fs.exists("s.mp3"));
2714    }
2715
2716    #[test]
2717    fn skip_is_a_noop() {
2718        let mut manifest = Manifest::new();
2719        let plan = Plan {
2720            actions: vec![Action::Skip {
2721                clip_id: "r".to_owned(),
2722            }],
2723        };
2724        let outcome = run(
2725            &plan,
2726            &mut manifest,
2727            &[],
2728            &ScriptedHttp::new(),
2729            &MemFs::new(),
2730            &StubFfmpeg::flac(),
2731            &RecordingClock::new(),
2732            &ExecOptions::default(),
2733        );
2734        assert_eq!(outcome.skipped, 1);
2735        assert_eq!(outcome.failed(), 0);
2736    }
2737
2738    // ── Pure helpers ────────────────────────────────────────────────
2739
2740    #[test]
2741    fn header_helpers_parse_or_ignore() {
2742        let resp = HttpResponse {
2743            status: 200,
2744            headers: vec![("Content-Length".to_owned(), "42".to_owned())],
2745            body: Vec::new(),
2746        };
2747        assert_eq!(content_length(&resp), Some(42));
2748
2749        let bare = HttpResponse {
2750            status: 200,
2751            headers: Vec::new(),
2752            body: Vec::new(),
2753        };
2754        assert_eq!(content_length(&bare), None);
2755    }
2756
2757    #[test]
2758    fn preserve_rule_covers_copy_and_private() {
2759        let base = desired(clip("x"), AudioFormat::Mp3);
2760        assert!(!preserve_for(&base));
2761        let mut copy_held = base.clone();
2762        copy_held.modes = vec![SourceMode::Copy];
2763        assert!(preserve_for(&copy_held));
2764        let mut private = base.clone();
2765        private.private = true;
2766        assert!(preserve_for(&private));
2767    }
2768
2769    fn fs_new() -> MemFs {
2770        MemFs::new()
2771    }
2772
2773    // ── Skip refreshes the preserve marker (SYNC-8 cross-run) ────────
2774
2775    #[test]
2776    fn skip_sets_preserve_when_a_clip_becomes_copy_held() {
2777        let c = clip("s1");
2778        let mut d = desired(c.clone(), AudioFormat::Mp3);
2779        d.modes = vec![SourceMode::Copy];
2780        let plan = Plan {
2781            actions: vec![Action::Skip {
2782                clip_id: "s1".to_owned(),
2783            }],
2784        };
2785        let mut manifest = Manifest::new();
2786        manifest.insert("s1".to_owned(), entry("s1.mp3", AudioFormat::Mp3));
2787        assert!(!manifest.get("s1").unwrap().preserve);
2788
2789        let outcome = run(
2790            &plan,
2791            &mut manifest,
2792            &[d],
2793            &ScriptedHttp::new(),
2794            &fs_new(),
2795            &StubFfmpeg::flac(),
2796            &RecordingClock::new(),
2797            &ExecOptions::default(),
2798        );
2799
2800        assert_eq!(outcome.skipped, 1);
2801        assert!(
2802            manifest.get("s1").unwrap().preserve,
2803            "a copy-held skip must mark the entry preserved"
2804        );
2805    }
2806
2807    #[test]
2808    fn skip_clears_stale_preserve_when_a_clip_returns_to_mirror_only() {
2809        let c = clip("s2");
2810        let d = desired(c.clone(), AudioFormat::Mp3);
2811        let plan = Plan {
2812            actions: vec![Action::Skip {
2813                clip_id: "s2".to_owned(),
2814            }],
2815        };
2816        let mut manifest = Manifest::new();
2817        let mut stale = entry("s2.mp3", AudioFormat::Mp3);
2818        stale.preserve = true;
2819        manifest.insert("s2".to_owned(), stale);
2820
2821        run(
2822            &plan,
2823            &mut manifest,
2824            &[d],
2825            &ScriptedHttp::new(),
2826            &fs_new(),
2827            &StubFfmpeg::flac(),
2828            &RecordingClock::new(),
2829            &ExecOptions::default(),
2830        );
2831
2832        assert!(
2833            !manifest.get("s2").unwrap().preserve,
2834            "a mirror-only skip must clear a stale preserve marker"
2835        );
2836    }
2837
2838    #[test]
2839    fn flac_render_retries_a_rate_limited_wav_lookup() {
2840        let c = clip("rl");
2841        let d = desired(c.clone(), AudioFormat::Flac);
2842        let plan = Plan {
2843            actions: vec![Action::Download {
2844                clip: c.clone(),
2845                lineage: LineageContext::own_root(&c),
2846                path: d.path.clone(),
2847                format: AudioFormat::Flac,
2848            }],
2849        };
2850        let http = ScriptedHttp::new()
2851            .with_auth()
2852            .route_seq(
2853                "/wav_file/",
2854                vec![
2855                    Reply::status(429),
2856                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/rl.wav"}"#),
2857                ],
2858            )
2859            .route("rl.wav", Reply::ok(b"wav".to_vec()));
2860        let clock = RecordingClock::new();
2861        let mut manifest = Manifest::new();
2862
2863        let outcome = run(
2864            &plan,
2865            &mut manifest,
2866            &[d],
2867            &http,
2868            &fs_new(),
2869            &StubFfmpeg::flac(),
2870            &clock,
2871            &small_poll(),
2872        );
2873
2874        assert_eq!(outcome.downloaded, 1);
2875        assert_eq!(outcome.failed(), 0);
2876        // The render was ready on retry, so no fresh convert_wav was needed.
2877        assert_eq!(http.count("/convert_wav/"), 0);
2878        // One transient backoff (1s base), not the 5s poll interval.
2879        assert_eq!(clock.sleeps(), vec![Duration::from_secs(1)]);
2880    }
2881
2882    // ── Phase 6: artifact actions ───────────────────────────────────
2883
2884    #[test]
2885    fn write_artifact_fetches_writes_and_updates_manifest() {
2886        // The owning entry exists (its audio was kept this run); WriteArtifact
2887        // fetches the source, writes the sidecar, and records it on the entry.
2888        let mut manifest = Manifest::new();
2889        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
2890        let plan = Plan {
2891            actions: vec![Action::WriteArtifact {
2892                kind: ArtifactKind::CoverJpg,
2893                path: "a/cover.jpg".to_owned(),
2894                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
2895                hash: "h1".to_owned(),
2896                owner_id: "a".to_owned(),
2897                content: None,
2898            }],
2899        };
2900        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"jpg-bytes".to_vec()));
2901        let fs = MemFs::new();
2902
2903        let outcome = run(
2904            &plan,
2905            &mut manifest,
2906            &[],
2907            &http,
2908            &fs,
2909            &StubFfmpeg::flac(),
2910            &RecordingClock::new(),
2911            &ExecOptions::default(),
2912        );
2913
2914        assert_eq!(outcome.artifacts_written, 1);
2915        assert_eq!(outcome.failed(), 0);
2916        assert_eq!(outcome.status, RunStatus::Completed);
2917        assert_eq!(fs.read_file("a/cover.jpg").unwrap(), b"jpg-bytes");
2918        assert_eq!(
2919            manifest.get("a").unwrap().cover_jpg,
2920            Some(ArtifactState {
2921                path: "a/cover.jpg".to_owned(),
2922                hash: "h1".to_owned(),
2923            })
2924        );
2925    }
2926
2927    #[test]
2928    fn write_text_sidecar_records_slot_with_no_network_fetch() {
2929        // A generated text sidecar carries its body inline, so it is written
2930        // verbatim with NO HTTP fetch and the details slot records its state.
2931        let mut manifest = Manifest::new();
2932        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
2933        let plan = Plan {
2934            actions: vec![Action::WriteArtifact {
2935                kind: ArtifactKind::DetailsTxt,
2936                path: "a.details.txt".to_owned(),
2937                source_url: String::new(),
2938                hash: "dh".to_owned(),
2939                owner_id: "a".to_owned(),
2940                content: Some("Title: A\n".to_owned()),
2941            }],
2942        };
2943        // An empty HTTP script: any fetch would fail, proving none happens.
2944        let http = ScriptedHttp::new();
2945        let fs = MemFs::new();
2946
2947        let outcome = run(
2948            &plan,
2949            &mut manifest,
2950            &[],
2951            &http,
2952            &fs,
2953            &StubFfmpeg::flac(),
2954            &RecordingClock::new(),
2955            &ExecOptions::default(),
2956        );
2957
2958        assert_eq!(outcome.artifacts_written, 1);
2959        assert_eq!(outcome.failed(), 0);
2960        assert_eq!(fs.read_file("a.details.txt").unwrap(), b"Title: A\n");
2961        assert_eq!(
2962            manifest.get("a").unwrap().details_txt,
2963            Some(ArtifactState {
2964                path: "a.details.txt".to_owned(),
2965                hash: "dh".to_owned(),
2966            })
2967        );
2968    }
2969
2970    #[test]
2971    fn write_lyrics_sidecar_relocation_removes_old_file() {
2972        // The audio moved, so the lyrics sidecar is re-emitted at the new path;
2973        // the executor writes the new file and prunes the stale one.
2974        let mut manifest = Manifest::new();
2975        let mut e = entry("old/a.flac", AudioFormat::Flac);
2976        e.lyrics_txt = Some(ArtifactState {
2977            path: "old/a.lyrics.txt".to_owned(),
2978            hash: "lh".to_owned(),
2979        });
2980        manifest.insert("a", e);
2981        let fs = MemFs::new()
2982            .with_file("old/a.flac", b"AUDIO".to_vec())
2983            .with_file("old/a.lyrics.txt", b"old words\n".to_vec());
2984        let plan = Plan {
2985            actions: vec![Action::WriteArtifact {
2986                kind: ArtifactKind::LyricsTxt,
2987                path: "new/a.lyrics.txt".to_owned(),
2988                source_url: String::new(),
2989                hash: "lh".to_owned(),
2990                owner_id: "a".to_owned(),
2991                content: Some("new words\n".to_owned()),
2992            }],
2993        };
2994
2995        let outcome = run(
2996            &plan,
2997            &mut manifest,
2998            &[],
2999            &ScriptedHttp::new(),
3000            &fs,
3001            &StubFfmpeg::flac(),
3002            &RecordingClock::new(),
3003            &ExecOptions::default(),
3004        );
3005
3006        assert_eq!(outcome.failed(), 0);
3007        assert_eq!(fs.read_file("new/a.lyrics.txt").unwrap(), b"new words\n");
3008        assert!(!fs.exists("old/a.lyrics.txt"));
3009        assert_eq!(
3010            manifest.get("a").unwrap().lyrics_txt.as_ref().unwrap().path,
3011            "new/a.lyrics.txt"
3012        );
3013    }
3014
3015    #[test]
3016    fn sidecar_path_swap_never_deletes_a_file_written_this_run() {
3017        // Two clips swap sidecar paths in one run (A: x -> y while B: y -> x).
3018        // Each write's inline old-path cleanup must skip a path another action
3019        // writes this run, or the second write would delete the first's freshly
3020        // written file (issue #76). The guard is kind-agnostic; lyrics stands in
3021        // for every sidecar, including the .mp4 video.
3022        let mut manifest = Manifest::new();
3023        let mut a = entry("a.flac", AudioFormat::Flac);
3024        a.lyrics_txt = Some(ArtifactState {
3025            path: "x.lyrics.txt".to_owned(),
3026            hash: "ah".to_owned(),
3027        });
3028        manifest.insert("a", a);
3029        let mut b = entry("b.flac", AudioFormat::Flac);
3030        b.lyrics_txt = Some(ArtifactState {
3031            path: "y.lyrics.txt".to_owned(),
3032            hash: "bh".to_owned(),
3033        });
3034        manifest.insert("b", b);
3035        let fs = MemFs::new()
3036            .with_file("a.flac", b"A".to_vec())
3037            .with_file("b.flac", b"B".to_vec())
3038            .with_file("x.lyrics.txt", b"A words\n".to_vec())
3039            .with_file("y.lyrics.txt", b"B words\n".to_vec());
3040        // A moves its sidecar x -> y; B moves its sidecar y -> x (the swap).
3041        let plan = Plan {
3042            actions: vec![
3043                Action::WriteArtifact {
3044                    kind: ArtifactKind::LyricsTxt,
3045                    path: "y.lyrics.txt".to_owned(),
3046                    source_url: String::new(),
3047                    hash: "ah".to_owned(),
3048                    owner_id: "a".to_owned(),
3049                    content: Some("A words\n".to_owned()),
3050                },
3051                Action::WriteArtifact {
3052                    kind: ArtifactKind::LyricsTxt,
3053                    path: "x.lyrics.txt".to_owned(),
3054                    source_url: String::new(),
3055                    hash: "bh".to_owned(),
3056                    owner_id: "b".to_owned(),
3057                    content: Some("B words\n".to_owned()),
3058                },
3059            ],
3060        };
3061
3062        let outcome = run(
3063            &plan,
3064            &mut manifest,
3065            &[],
3066            &ScriptedHttp::new(),
3067            &fs,
3068            &StubFfmpeg::flac(),
3069            &RecordingClock::new(),
3070            &ExecOptions::default(),
3071        );
3072
3073        assert_eq!(outcome.failed(), 0);
3074        // Both freshly written files survive; neither cleanup clobbered the other.
3075        assert_eq!(fs.read_file("y.lyrics.txt").unwrap(), b"A words\n");
3076        assert_eq!(fs.read_file("x.lyrics.txt").unwrap(), b"B words\n");
3077        assert_eq!(
3078            manifest.get("a").unwrap().lyrics_txt.as_ref().unwrap().path,
3079            "y.lyrics.txt"
3080        );
3081        assert_eq!(
3082            manifest.get("b").unwrap().lyrics_txt.as_ref().unwrap().path,
3083            "x.lyrics.txt"
3084        );
3085    }
3086
3087    #[test]
3088    fn old_sidecar_kept_when_another_clip_still_references_it() {
3089        // A prior failed swap can leave two clips pointing at one path (A -> y and
3090        // B -> y). When B now moves y -> x, its cleanup must not delete y, which is
3091        // still A's live file (#76). tracked_paths counts two references to y, so
3092        // the removal is skipped even though y is not a write target this run.
3093        let mut manifest = Manifest::new();
3094        let mut a = entry("a.flac", AudioFormat::Flac);
3095        a.lyrics_txt = Some(ArtifactState {
3096            path: "y.lyrics.txt".to_owned(),
3097            hash: "ah".to_owned(),
3098        });
3099        manifest.insert("a", a);
3100        let mut b = entry("b.flac", AudioFormat::Flac);
3101        b.lyrics_txt = Some(ArtifactState {
3102            path: "y.lyrics.txt".to_owned(),
3103            hash: "bh".to_owned(),
3104        });
3105        manifest.insert("b", b);
3106        let fs = MemFs::new()
3107            .with_file("a.flac", b"A".to_vec())
3108            .with_file("b.flac", b"B".to_vec())
3109            .with_file("y.lyrics.txt", b"A words\n".to_vec());
3110        // Only B moves this run: y -> x. A is stable, so y is not a write target;
3111        // the tracked-reference count is what protects A's file.
3112        let plan = Plan {
3113            actions: vec![Action::WriteArtifact {
3114                kind: ArtifactKind::LyricsTxt,
3115                path: "x.lyrics.txt".to_owned(),
3116                source_url: String::new(),
3117                hash: "bh".to_owned(),
3118                owner_id: "b".to_owned(),
3119                content: Some("B words\n".to_owned()),
3120            }],
3121        };
3122
3123        let outcome = run(
3124            &plan,
3125            &mut manifest,
3126            &[],
3127            &ScriptedHttp::new(),
3128            &fs,
3129            &StubFfmpeg::flac(),
3130            &RecordingClock::new(),
3131            &ExecOptions::default(),
3132        );
3133
3134        assert_eq!(outcome.failed(), 0);
3135        assert!(
3136            fs.exists("y.lyrics.txt"),
3137            "A's live sidecar must not be deleted"
3138        );
3139        assert_eq!(fs.read_file("x.lyrics.txt").unwrap(), b"B words\n");
3140    }
3141
3142    #[test]
3143    fn shared_old_path_is_reclaimed_when_every_referencing_clip_moves_away() {
3144        // Two clips share one path (A -> s and B -> s, from a prior failed swap).
3145        // When BOTH move away this run, the path is no longer live, so the last
3146        // mover must reclaim it: it is neither kept as an orphan nor deleted while
3147        // still referenced. The dynamic reference count drops to zero only after
3148        // both moves, so exactly the final cleanup removes it (#76).
3149        let mut manifest = Manifest::new();
3150        let mut a = entry("a.flac", AudioFormat::Flac);
3151        a.lyrics_txt = Some(ArtifactState {
3152            path: "s.lyrics.txt".to_owned(),
3153            hash: "ah".to_owned(),
3154        });
3155        manifest.insert("a", a);
3156        let mut b = entry("b.flac", AudioFormat::Flac);
3157        b.lyrics_txt = Some(ArtifactState {
3158            path: "s.lyrics.txt".to_owned(),
3159            hash: "bh".to_owned(),
3160        });
3161        manifest.insert("b", b);
3162        let fs = MemFs::new()
3163            .with_file("a.flac", b"A".to_vec())
3164            .with_file("b.flac", b"B".to_vec())
3165            .with_file("s.lyrics.txt", b"shared\n".to_vec());
3166        let plan = Plan {
3167            actions: vec![
3168                Action::WriteArtifact {
3169                    kind: ArtifactKind::LyricsTxt,
3170                    path: "pa.lyrics.txt".to_owned(),
3171                    source_url: String::new(),
3172                    hash: "ah".to_owned(),
3173                    owner_id: "a".to_owned(),
3174                    content: Some("A words\n".to_owned()),
3175                },
3176                Action::WriteArtifact {
3177                    kind: ArtifactKind::LyricsTxt,
3178                    path: "pb.lyrics.txt".to_owned(),
3179                    source_url: String::new(),
3180                    hash: "bh".to_owned(),
3181                    owner_id: "b".to_owned(),
3182                    content: Some("B words\n".to_owned()),
3183                },
3184            ],
3185        };
3186
3187        let outcome = run(
3188            &plan,
3189            &mut manifest,
3190            &[],
3191            &ScriptedHttp::new(),
3192            &fs,
3193            &StubFfmpeg::flac(),
3194            &RecordingClock::new(),
3195            &ExecOptions::default(),
3196        );
3197
3198        assert_eq!(outcome.failed(), 0);
3199        assert_eq!(fs.read_file("pa.lyrics.txt").unwrap(), b"A words\n");
3200        assert_eq!(fs.read_file("pb.lyrics.txt").unwrap(), b"B words\n");
3201        assert!(
3202            !fs.exists("s.lyrics.txt"),
3203            "the vacated shared path must be reclaimed, not orphaned"
3204        );
3205    }
3206
3207    #[test]
3208    fn write_text_sidecar_skipped_when_owner_audio_absent() {
3209        // A text sidecar for a clip with no manifest entry (its audio download
3210        // failed) must be skipped, never writing an untracked file.
3211        let plan = Plan {
3212            actions: vec![Action::WriteArtifact {
3213                kind: ArtifactKind::DetailsTxt,
3214                path: "gone.details.txt".to_owned(),
3215                source_url: String::new(),
3216                hash: "dh".to_owned(),
3217                owner_id: "gone".to_owned(),
3218                content: Some("Title: Gone\n".to_owned()),
3219            }],
3220        };
3221        let fs = MemFs::new();
3222        let mut manifest = Manifest::new();
3223
3224        let outcome = run(
3225            &plan,
3226            &mut manifest,
3227            &[],
3228            &ScriptedHttp::new(),
3229            &fs,
3230            &StubFfmpeg::flac(),
3231            &RecordingClock::new(),
3232            &ExecOptions::default(),
3233        );
3234
3235        assert_eq!(outcome.artifacts_written, 0);
3236        assert_eq!(outcome.skipped, 1);
3237        assert!(!fs.exists("gone.details.txt"));
3238        assert!(manifest.get("gone").is_none());
3239    }
3240
3241    #[test]
3242    fn delete_artifact_removes_file_and_clears_slot() {
3243        let fs = MemFs::new().with_file("a/cover.jpg", b"jpg".to_vec());
3244        let mut manifest = Manifest::new();
3245        let mut e = entry("a.mp3", AudioFormat::Mp3);
3246        e.cover_jpg = Some(ArtifactState {
3247            path: "a/cover.jpg".to_owned(),
3248            hash: "h1".to_owned(),
3249        });
3250        manifest.insert("a", e);
3251        let plan = Plan {
3252            actions: vec![Action::DeleteArtifact {
3253                kind: ArtifactKind::CoverJpg,
3254                path: "a/cover.jpg".to_owned(),
3255                owner_id: "a".to_owned(),
3256            }],
3257        };
3258
3259        let outcome = run(
3260            &plan,
3261            &mut manifest,
3262            &[],
3263            &ScriptedHttp::new(),
3264            &fs,
3265            &StubFfmpeg::flac(),
3266            &RecordingClock::new(),
3267            &ExecOptions::default(),
3268        );
3269
3270        assert_eq!(outcome.artifacts_deleted, 1);
3271        assert!(!fs.exists("a/cover.jpg"));
3272        assert_eq!(manifest.get("a").unwrap().cover_jpg, None);
3273    }
3274
3275    #[test]
3276    fn delete_artifact_tolerates_already_absent_file() {
3277        // `remove` is idempotent, so co-deleting a sidecar that is already gone
3278        // is not a failure.
3279        let mut manifest = Manifest::new();
3280        let mut e = entry("a.mp3", AudioFormat::Mp3);
3281        e.cover_jpg = Some(ArtifactState {
3282            path: "a/cover.jpg".to_owned(),
3283            hash: "h1".to_owned(),
3284        });
3285        manifest.insert("a", e);
3286        let plan = Plan {
3287            actions: vec![Action::DeleteArtifact {
3288                kind: ArtifactKind::CoverJpg,
3289                path: "a/cover.jpg".to_owned(),
3290                owner_id: "a".to_owned(),
3291            }],
3292        };
3293
3294        let outcome = run(
3295            &plan,
3296            &mut manifest,
3297            &[],
3298            &ScriptedHttp::new(),
3299            &MemFs::new(),
3300            &StubFfmpeg::flac(),
3301            &RecordingClock::new(),
3302            &ExecOptions::default(),
3303        );
3304
3305        assert_eq!(outcome.artifacts_deleted, 1);
3306        assert_eq!(outcome.failed(), 0);
3307        assert_eq!(manifest.get("a").unwrap().cover_jpg, None);
3308    }
3309
3310    #[test]
3311    fn write_artifact_http_failure_is_a_per_clip_failure_not_a_run_abort() {
3312        // A permanent 404 on one sidecar fetch is recorded as a per-clip failure;
3313        // the run continues and the following WriteArtifact still succeeds.
3314        let mut manifest = Manifest::new();
3315        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
3316        manifest.insert("b", entry("b.mp3", AudioFormat::Mp3));
3317        let plan = Plan {
3318            actions: vec![
3319                Action::WriteArtifact {
3320                    kind: ArtifactKind::CoverJpg,
3321                    path: "a/cover.jpg".to_owned(),
3322                    source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
3323                    hash: "h1".to_owned(),
3324                    owner_id: "a".to_owned(),
3325                    content: None,
3326                },
3327                Action::WriteArtifact {
3328                    kind: ArtifactKind::CoverJpg,
3329                    path: "b/cover.jpg".to_owned(),
3330                    source_url: "https://art.suno.ai/b/large.jpg".to_owned(),
3331                    hash: "h2".to_owned(),
3332                    owner_id: "b".to_owned(),
3333                    content: None,
3334                },
3335            ],
3336        };
3337        let http = ScriptedHttp::new()
3338            .route("a/large.jpg", Reply::status(404))
3339            .route("b/large.jpg", Reply::ok(b"jpg-b".to_vec()));
3340        let fs = MemFs::new();
3341
3342        let outcome = run(
3343            &plan,
3344            &mut manifest,
3345            &[],
3346            &http,
3347            &fs,
3348            &StubFfmpeg::flac(),
3349            &RecordingClock::new(),
3350            &ExecOptions::default(),
3351        );
3352
3353        assert_eq!(outcome.status, RunStatus::Completed);
3354        assert_eq!(outcome.failed(), 1);
3355        assert_eq!(outcome.failures[0].clip_id, "a");
3356        assert_eq!(outcome.artifacts_written, 1);
3357        // The failed sidecar left no file and no manifest record.
3358        assert!(!fs.exists("a/cover.jpg"));
3359        assert_eq!(manifest.get("a").unwrap().cover_jpg, None);
3360        // The following sidecar was written and recorded.
3361        assert_eq!(fs.read_file("b/cover.jpg").unwrap(), b"jpg-b");
3362        assert!(manifest.get("b").unwrap().cover_jpg.is_some());
3363    }
3364
3365    #[test]
3366    fn co_delete_executes_audio_delete_then_artifact_delete() {
3367        // The plan orders the audio Delete before its sidecar DeleteArtifact.
3368        // The audio delete removes the manifest entry; the sidecar delete then
3369        // removes the file and tolerates the now-absent entry.
3370        let fs = MemFs::new()
3371            .with_file("gone.mp3", b"DATA".to_vec())
3372            .with_file("gone/cover.jpg", b"jpg".to_vec());
3373        let mut manifest = Manifest::new();
3374        let mut e = entry("gone.mp3", AudioFormat::Mp3);
3375        e.cover_jpg = Some(ArtifactState {
3376            path: "gone/cover.jpg".to_owned(),
3377            hash: "h1".to_owned(),
3378        });
3379        manifest.insert("gone", e);
3380        let plan = Plan {
3381            actions: vec![
3382                Action::Delete {
3383                    path: "gone.mp3".to_owned(),
3384                    clip_id: "gone".to_owned(),
3385                },
3386                Action::DeleteArtifact {
3387                    kind: ArtifactKind::CoverJpg,
3388                    path: "gone/cover.jpg".to_owned(),
3389                    owner_id: "gone".to_owned(),
3390                },
3391            ],
3392        };
3393
3394        let outcome = run(
3395            &plan,
3396            &mut manifest,
3397            &[],
3398            &ScriptedHttp::new(),
3399            &fs,
3400            &StubFfmpeg::flac(),
3401            &RecordingClock::new(),
3402            &ExecOptions::default(),
3403        );
3404
3405        assert_eq!(outcome.deleted, 1);
3406        assert_eq!(outcome.artifacts_deleted, 1);
3407        assert_eq!(outcome.failed(), 0);
3408        assert!(!fs.exists("gone.mp3"));
3409        assert!(!fs.exists("gone/cover.jpg"));
3410        assert!(manifest.get("gone").is_none());
3411    }
3412
3413    #[test]
3414    fn write_artifact_is_skipped_when_the_owner_audio_is_absent() {
3415        // A clip whose Download fails leaves no manifest entry, so its following
3416        // WriteArtifact must not strand an untracked sidecar: it is skipped with
3417        // no fetch and no write. A following healthy clip still succeeds.
3418        let ca = clip("a");
3419        let plan = Plan {
3420            actions: vec![
3421                Action::Download {
3422                    clip: ca.clone(),
3423                    lineage: LineageContext::own_root(&ca),
3424                    path: "a.mp3".to_owned(),
3425                    format: AudioFormat::Mp3,
3426                },
3427                Action::WriteArtifact {
3428                    kind: ArtifactKind::CoverJpg,
3429                    path: "a/cover.jpg".to_owned(),
3430                    source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
3431                    hash: "h1".to_owned(),
3432                    owner_id: "a".to_owned(),
3433                    content: None,
3434                },
3435                Action::WriteArtifact {
3436                    kind: ArtifactKind::CoverJpg,
3437                    path: "b/cover.jpg".to_owned(),
3438                    source_url: "https://art.suno.ai/b/large.jpg".to_owned(),
3439                    hash: "h2".to_owned(),
3440                    owner_id: "b".to_owned(),
3441                    content: None,
3442                },
3443            ],
3444        };
3445        // The Download's audio 404s (permanent), so no entry for "a" is created.
3446        let http = ScriptedHttp::new()
3447            .route("a.mp3", Reply::status(404))
3448            .route("a/large.jpg", Reply::ok(b"jpg-a".to_vec()))
3449            .route("b/large.jpg", Reply::ok(b"jpg-b".to_vec()));
3450        let fs = MemFs::new();
3451        let mut manifest = Manifest::new();
3452        // "b" already has audio (a prior-run clip), so its sidecar write proceeds.
3453        manifest.insert("b", entry("b.mp3", AudioFormat::Mp3));
3454
3455        let outcome = run(
3456            &plan,
3457            &mut manifest,
3458            &[],
3459            &http,
3460            &fs,
3461            &StubFfmpeg::flac(),
3462            &RecordingClock::new(),
3463            &ExecOptions::default(),
3464        );
3465
3466        assert_eq!(outcome.status, RunStatus::Completed);
3467        // The audio download is the only failure; the orphan artifact is skipped.
3468        assert_eq!(outcome.failed(), 1);
3469        assert_eq!(outcome.failures[0].clip_id, "a");
3470        assert_eq!(outcome.skipped, 1);
3471        // The orphan sidecar was neither fetched nor written, and left no record.
3472        assert_eq!(http.count("a/large.jpg"), 0);
3473        assert!(!fs.exists("a/cover.jpg"));
3474        assert!(manifest.get("a").is_none());
3475        // The healthy clip's sidecar still succeeded.
3476        assert_eq!(outcome.artifacts_written, 1);
3477        assert_eq!(fs.read_file("b/cover.jpg").unwrap(), b"jpg-b");
3478        assert!(manifest.get("b").unwrap().cover_jpg.is_some());
3479    }
3480
3481    #[test]
3482    fn write_artifact_transcodes_animated_cover_to_webp() {
3483        // A CoverWebp fetches the clip's MP4 preview, runs it through the ffmpeg
3484        // port, and writes the transcoded WebP (not the fetched MP4), recording
3485        // the sidecar on the owning entry.
3486        let mut manifest = Manifest::new();
3487        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
3488        let plan = Plan {
3489            actions: vec![Action::WriteArtifact {
3490                kind: ArtifactKind::CoverWebp,
3491                path: "a/cover.webp".to_owned(),
3492                source_url: "https://cdn.suno.ai/a/video.mp4".to_owned(),
3493                hash: "v1".to_owned(),
3494                owner_id: "a".to_owned(),
3495                content: None,
3496            }],
3497        };
3498        let http = ScriptedHttp::new().route("a/video.mp4", Reply::ok(b"mp4-bytes".to_vec()));
3499        let fs = MemFs::new();
3500        let ffmpeg = StubFfmpeg::webp();
3501
3502        let outcome = run(
3503            &plan,
3504            &mut manifest,
3505            &[],
3506            &http,
3507            &fs,
3508            &ffmpeg,
3509            &RecordingClock::new(),
3510            &ExecOptions::default(),
3511        );
3512
3513        assert_eq!(outcome.artifacts_written, 1);
3514        assert_eq!(outcome.failed(), 0);
3515        assert_eq!(outcome.status, RunStatus::Completed);
3516        // The fetched MP4 was transcoded: the file holds the ffmpeg WebP output.
3517        assert_eq!(http.count("a/video.mp4"), 1);
3518        let written = fs.read_file("a/cover.webp").unwrap();
3519        assert_ne!(written, b"mp4-bytes");
3520        assert!(written.starts_with(b"RIFF"));
3521        assert_eq!(
3522            manifest.get("a").unwrap().cover_webp,
3523            Some(ArtifactState {
3524                path: "a/cover.webp".to_owned(),
3525                hash: "v1".to_owned(),
3526            })
3527        );
3528    }
3529
3530    #[test]
3531    fn write_artifact_webp_transcode_failure_is_per_clip() {
3532        // A transcode failure is attributed to the owning clip: it is a per-clip
3533        // failure, the run completes, no sidecar is written, and the slot stays
3534        // empty. A healthy static cover in the same run still succeeds.
3535        let mut manifest = Manifest::new();
3536        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
3537        manifest.insert("b", entry("b.mp3", AudioFormat::Mp3));
3538        let plan = Plan {
3539            actions: vec![
3540                Action::WriteArtifact {
3541                    kind: ArtifactKind::CoverWebp,
3542                    path: "a/cover.webp".to_owned(),
3543                    source_url: "https://cdn.suno.ai/a/video.mp4".to_owned(),
3544                    hash: "v1".to_owned(),
3545                    owner_id: "a".to_owned(),
3546                    content: None,
3547                },
3548                Action::WriteArtifact {
3549                    kind: ArtifactKind::CoverJpg,
3550                    path: "b/cover.jpg".to_owned(),
3551                    source_url: "https://art.suno.ai/b/large.jpg".to_owned(),
3552                    hash: "h1".to_owned(),
3553                    owner_id: "b".to_owned(),
3554                    content: None,
3555                },
3556            ],
3557        };
3558        let http = ScriptedHttp::new()
3559            .route("a/video.mp4", Reply::ok(b"mp4-bytes".to_vec()))
3560            .route("b/large.jpg", Reply::ok(b"jpg-b".to_vec()));
3561        let fs = MemFs::new();
3562
3563        let outcome = run(
3564            &plan,
3565            &mut manifest,
3566            &[],
3567            &http,
3568            &fs,
3569            &StubFfmpeg::failing(),
3570            &RecordingClock::new(),
3571            &ExecOptions::default(),
3572        );
3573
3574        assert_eq!(outcome.status, RunStatus::Completed);
3575        assert_eq!(outcome.failed(), 1);
3576        assert_eq!(outcome.failures[0].clip_id, "a");
3577        // The animated cover failed to transcode: nothing written, slot empty.
3578        assert!(!fs.exists("a/cover.webp"));
3579        assert_eq!(manifest.get("a").unwrap().cover_webp, None);
3580        // The static cover in the same run still succeeded.
3581        assert_eq!(outcome.artifacts_written, 1);
3582        assert_eq!(fs.read_file("b/cover.jpg").unwrap(), b"jpg-b");
3583        assert!(manifest.get("b").unwrap().cover_jpg.is_some());
3584    }
3585
3586    // ── Phase 8: folder art routes to the album store ───────────────
3587
3588    #[test]
3589    fn folder_jpg_write_records_album_state_and_skips_manifest() {
3590        // Folder art is owned by the album root id, not a manifest clip: it
3591        // writes even with an empty manifest and records on the album store.
3592        let mut manifest = Manifest::new();
3593        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
3594        let plan = Plan {
3595            actions: vec![Action::WriteArtifact {
3596                kind: ArtifactKind::FolderJpg,
3597                path: "creator/album/folder.jpg".to_owned(),
3598                source_url: "https://art.suno.ai/root/large.jpg".to_owned(),
3599                hash: "jh".to_owned(),
3600                owner_id: "root".to_owned(),
3601                content: None,
3602            }],
3603        };
3604        let http = ScriptedHttp::new().route("root/large.jpg", Reply::ok(b"folder-jpg".to_vec()));
3605        let fs = MemFs::new();
3606
3607        let outcome = run_with_albums(
3608            &plan,
3609            &mut manifest,
3610            &mut albums,
3611            &[],
3612            &http,
3613            &fs,
3614            &StubFfmpeg::flac(),
3615            &RecordingClock::new(),
3616            &ExecOptions::default(),
3617        );
3618
3619        assert_eq!(outcome.artifacts_written, 1);
3620        assert_eq!(outcome.status, RunStatus::Completed);
3621        assert_eq!(
3622            fs.read_file("creator/album/folder.jpg").unwrap(),
3623            b"folder-jpg"
3624        );
3625        assert_eq!(
3626            albums.get("root").unwrap().folder_jpg,
3627            Some(ArtifactState {
3628                path: "creator/album/folder.jpg".to_owned(),
3629                hash: "jh".to_owned(),
3630            })
3631        );
3632        assert!(manifest.get("root").is_none());
3633    }
3634
3635    #[test]
3636    fn folder_webp_write_transcodes_and_records_album_state() {
3637        let mut manifest = Manifest::new();
3638        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
3639        let plan = Plan {
3640            actions: vec![Action::WriteArtifact {
3641                kind: ArtifactKind::FolderWebp,
3642                path: "creator/album/cover.webp".to_owned(),
3643                source_url: "https://cdn.suno.ai/root/video.mp4".to_owned(),
3644                hash: "wh".to_owned(),
3645                owner_id: "root".to_owned(),
3646                content: None,
3647            }],
3648        };
3649        let http = ScriptedHttp::new().route("root/video.mp4", Reply::ok(b"mp4-bytes".to_vec()));
3650        let fs = MemFs::new();
3651
3652        let outcome = run_with_albums(
3653            &plan,
3654            &mut manifest,
3655            &mut albums,
3656            &[],
3657            &http,
3658            &fs,
3659            &StubFfmpeg::webp(),
3660            &RecordingClock::new(),
3661            &ExecOptions::default(),
3662        );
3663
3664        assert_eq!(outcome.artifacts_written, 1);
3665        assert_eq!(outcome.failed(), 0);
3666        // The MP4 was transcoded to WebP, not written verbatim.
3667        let written = fs.read_file("creator/album/cover.webp").unwrap();
3668        assert_ne!(written, b"mp4-bytes");
3669        assert!(written.starts_with(b"RIFF"));
3670        assert_eq!(
3671            albums.get("root").unwrap().folder_webp,
3672            Some(ArtifactState {
3673                path: "creator/album/cover.webp".to_owned(),
3674                hash: "wh".to_owned(),
3675            })
3676        );
3677    }
3678
3679    #[test]
3680    fn folder_art_delete_clears_album_state() {
3681        let fs = MemFs::new().with_file("creator/album/folder.jpg", b"jpg".to_vec());
3682        let mut manifest = Manifest::new();
3683        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
3684        albums.insert(
3685            "root".to_owned(),
3686            AlbumArt {
3687                folder_jpg: Some(ArtifactState {
3688                    path: "creator/album/folder.jpg".to_owned(),
3689                    hash: "jh".to_owned(),
3690                }),
3691                folder_webp: None,
3692            },
3693        );
3694        let plan = Plan {
3695            actions: vec![Action::DeleteArtifact {
3696                kind: ArtifactKind::FolderJpg,
3697                path: "creator/album/folder.jpg".to_owned(),
3698                owner_id: "root".to_owned(),
3699            }],
3700        };
3701
3702        let outcome = run_with_albums(
3703            &plan,
3704            &mut manifest,
3705            &mut albums,
3706            &[],
3707            &ScriptedHttp::new(),
3708            &fs,
3709            &StubFfmpeg::flac(),
3710            &RecordingClock::new(),
3711            &ExecOptions::default(),
3712        );
3713
3714        assert_eq!(outcome.artifacts_deleted, 1);
3715        assert!(!fs.exists("creator/album/folder.jpg"));
3716        // The album row had only the one kind, so it is pruned entirely.
3717        assert!(!albums.contains_key("root"));
3718    }
3719
3720    // ── Phase 9: playlist artifacts ─────────────────────────────────
3721
3722    #[test]
3723    fn playlist_write_uses_inline_content_and_records_state() {
3724        // A playlist body is generated, carried inline. With an empty manifest
3725        // and NO http routes, the write still succeeds — proving it skipped the
3726        // network — and records the playlist store keyed by the playlist id.
3727        let mut manifest = Manifest::new();
3728        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
3729        let mut playlists: BTreeMap<String, PlaylistState> = BTreeMap::new();
3730        let body = "#EXTM3U\n#PLAYLIST:Road Trip\n#EXTINF:60,One\nA/One.flac\n";
3731        let plan = Plan {
3732            actions: vec![Action::WriteArtifact {
3733                kind: ArtifactKind::Playlist,
3734                path: "Road Trip.m3u8".to_owned(),
3735                source_url: String::new(),
3736                hash: "ph1".to_owned(),
3737                owner_id: "pl1".to_owned(),
3738                content: Some(body.to_owned()),
3739            }],
3740        };
3741        let fs = MemFs::new();
3742
3743        let outcome = run_full(
3744            &plan,
3745            &mut manifest,
3746            &mut albums,
3747            &mut playlists,
3748            &[],
3749            &ScriptedHttp::new(),
3750            &fs,
3751            &StubFfmpeg::flac(),
3752            &RecordingClock::new(),
3753            &ExecOptions::default(),
3754        );
3755
3756        assert_eq!(outcome.artifacts_written, 1);
3757        assert_eq!(outcome.failed(), 0);
3758        // The exact inline bytes were written, verbatim.
3759        assert_eq!(fs.read_file("Road Trip.m3u8").unwrap(), body.as_bytes());
3760        assert_eq!(
3761            playlists.get("pl1"),
3762            Some(&PlaylistState {
3763                name: "Road Trip".to_owned(),
3764                path: "Road Trip.m3u8".to_owned(),
3765                hash: "ph1".to_owned(),
3766            })
3767        );
3768    }
3769
3770    #[test]
3771    fn playlist_delete_removes_file_and_clears_state() {
3772        let fs = MemFs::new().with_file("Old.m3u8", b"#EXTM3U\n".to_vec());
3773        let mut manifest = Manifest::new();
3774        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
3775        let mut playlists: BTreeMap<String, PlaylistState> = BTreeMap::new();
3776        playlists.insert(
3777            "pl1".to_owned(),
3778            PlaylistState {
3779                name: "Old".to_owned(),
3780                path: "Old.m3u8".to_owned(),
3781                hash: "ph1".to_owned(),
3782            },
3783        );
3784        let plan = Plan {
3785            actions: vec![Action::DeleteArtifact {
3786                kind: ArtifactKind::Playlist,
3787                path: "Old.m3u8".to_owned(),
3788                owner_id: "pl1".to_owned(),
3789            }],
3790        };
3791
3792        let outcome = run_full(
3793            &plan,
3794            &mut manifest,
3795            &mut albums,
3796            &mut playlists,
3797            &[],
3798            &ScriptedHttp::new(),
3799            &fs,
3800            &StubFfmpeg::flac(),
3801            &RecordingClock::new(),
3802            &ExecOptions::default(),
3803        );
3804
3805        assert_eq!(outcome.artifacts_deleted, 1);
3806        assert!(!fs.exists("Old.m3u8"));
3807        assert!(
3808            !playlists.contains_key("pl1"),
3809            "the playlist row is cleared on delete"
3810        );
3811    }
3812
3813    // ── Phase 10: old-sidecar cleanup on move + empty-dir prune ──────
3814
3815    #[test]
3816    fn rename_move_relocates_cover_and_prunes_old_album() {
3817        // A title/album change moves the audio (Rename) and re-emits the cover
3818        // at the NEW path. The old cover must be removed and the now-empty old
3819        // album directory pruned, leaving no orphan sidecar and no ghost dir.
3820        let mut manifest = Manifest::new();
3821        let mut e = entry("Creator/AlbumA/song.flac", AudioFormat::Flac);
3822        e.cover_jpg = Some(ArtifactState {
3823            path: "Creator/AlbumA/cover.jpg".to_owned(),
3824            hash: "h1".to_owned(),
3825        });
3826        manifest.insert("a", e);
3827        let fs = MemFs::new()
3828            .with_file("Creator/AlbumA/song.flac", b"AUDIO".to_vec())
3829            .with_file("Creator/AlbumA/cover.jpg", b"old-jpg".to_vec());
3830        let plan = Plan {
3831            actions: vec![
3832                Action::Rename {
3833                    from: "Creator/AlbumA/song.flac".to_owned(),
3834                    to: "Creator/AlbumB/song.flac".to_owned(),
3835                },
3836                Action::WriteArtifact {
3837                    kind: ArtifactKind::CoverJpg,
3838                    path: "Creator/AlbumB/cover.jpg".to_owned(),
3839                    source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
3840                    hash: "h1".to_owned(),
3841                    owner_id: "a".to_owned(),
3842                    content: None,
3843                },
3844            ],
3845        };
3846        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"new-jpg".to_vec()));
3847
3848        let outcome = run(
3849            &plan,
3850            &mut manifest,
3851            &[],
3852            &http,
3853            &fs,
3854            &StubFfmpeg::flac(),
3855            &RecordingClock::new(),
3856            &ExecOptions::default(),
3857        );
3858
3859        assert_eq!(outcome.failed(), 0);
3860        // Audio moved, the new cover was written, the old cover removed.
3861        assert!(fs.exists("Creator/AlbumB/song.flac"));
3862        assert_eq!(
3863            fs.read_file("Creator/AlbumB/cover.jpg").unwrap(),
3864            b"new-jpg"
3865        );
3866        assert!(!fs.exists("Creator/AlbumA/cover.jpg"));
3867        assert!(!fs.exists("Creator/AlbumA/song.flac"));
3868        // The manifest cover slot now points at the new path.
3869        assert_eq!(
3870            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
3871            "Creator/AlbumB/cover.jpg"
3872        );
3873        // The emptied old album directory is pruned; the new one survives.
3874        assert!(!fs.has_dir("Creator/AlbumA"));
3875        assert!(fs.has_dir("Creator/AlbumB"));
3876    }
3877
3878    #[test]
3879    fn rename_move_relocates_folder_art_and_prunes_old_album() {
3880        // An album rename moves folder.jpg: the old file is removed, the album
3881        // store slot advanced to the new path, and the emptied dir pruned.
3882        let mut manifest = Manifest::new();
3883        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
3884        albums.insert(
3885            "root".to_owned(),
3886            AlbumArt {
3887                folder_jpg: Some(ArtifactState {
3888                    path: "Creator/AlbumA/folder.jpg".to_owned(),
3889                    hash: "jh".to_owned(),
3890                }),
3891                folder_webp: None,
3892            },
3893        );
3894        let fs = MemFs::new().with_file("Creator/AlbumA/folder.jpg", b"old-folder".to_vec());
3895        let plan = Plan {
3896            actions: vec![Action::WriteArtifact {
3897                kind: ArtifactKind::FolderJpg,
3898                path: "Creator/AlbumB/folder.jpg".to_owned(),
3899                source_url: "https://art.suno.ai/root/large.jpg".to_owned(),
3900                hash: "jh".to_owned(),
3901                owner_id: "root".to_owned(),
3902                content: None,
3903            }],
3904        };
3905        let http = ScriptedHttp::new().route("root/large.jpg", Reply::ok(b"new-folder".to_vec()));
3906
3907        let outcome = run_with_albums(
3908            &plan,
3909            &mut manifest,
3910            &mut albums,
3911            &[],
3912            &http,
3913            &fs,
3914            &StubFfmpeg::flac(),
3915            &RecordingClock::new(),
3916            &ExecOptions::default(),
3917        );
3918
3919        assert_eq!(outcome.failed(), 0);
3920        assert_eq!(
3921            fs.read_file("Creator/AlbumB/folder.jpg").unwrap(),
3922            b"new-folder"
3923        );
3924        assert!(!fs.exists("Creator/AlbumA/folder.jpg"));
3925        assert_eq!(
3926            albums
3927                .get("root")
3928                .unwrap()
3929                .folder_jpg
3930                .as_ref()
3931                .unwrap()
3932                .path,
3933            "Creator/AlbumB/folder.jpg"
3934        );
3935        assert!(!fs.has_dir("Creator/AlbumA"));
3936        assert!(fs.has_dir("Creator/AlbumB"));
3937    }
3938
3939    #[test]
3940    fn prune_empty_dirs_removes_only_empty_dirs() {
3941        // A direct exercise of the prune port's safety guarantees on a mixed
3942        // tree: nested empties go, anything holding a file (hidden ones too)
3943        // stays, and no file is touched.
3944        let fs = MemFs::new()
3945            .with_file("keep/full/song.flac", b"x".to_vec())
3946            .with_file("hidden/.suno-manifest.json", b"{}".to_vec())
3947            .with_dir("empty/leaf")
3948            .with_dir("nested/a/b/c");
3949
3950        fs.prune_empty_dirs("").unwrap();
3951
3952        // Every empty directory, however deeply nested, is pruned bottom-up.
3953        for gone in [
3954            "empty",
3955            "empty/leaf",
3956            "nested",
3957            "nested/a",
3958            "nested/a/b",
3959            "nested/a/b/c",
3960        ] {
3961            assert!(!fs.has_dir(gone), "empty dir {gone} should be pruned");
3962        }
3963        // A directory holding any file — including only a hidden dotfile — stays.
3964        assert!(fs.has_dir("keep"));
3965        assert!(fs.has_dir("keep/full"));
3966        assert!(fs.has_dir("hidden"));
3967        // No file was touched.
3968        assert!(fs.exists("keep/full/song.flac"));
3969        assert!(fs.exists("hidden/.suno-manifest.json"));
3970    }
3971
3972    #[test]
3973    fn prune_empty_dirs_never_removes_the_named_root() {
3974        // Pruning under a named root clears its empty children but keeps the
3975        // root itself, even when the root is now empty.
3976        let fs = MemFs::new().with_dir("empty/leaf");
3977        fs.prune_empty_dirs("empty").unwrap();
3978        assert!(fs.has_dir("empty"), "the named root is never removed");
3979        assert!(!fs.has_dir("empty/leaf"));
3980    }
3981
3982    #[test]
3983    fn old_sidecar_remove_failure_is_per_clip_and_converges_next_run() {
3984        // If removing the old sidecar fails, the write is a per-clip failure
3985        // that never aborts the run and does NOT advance the state slot, so the
3986        // next identical run re-attempts the cleanup and the tree converges.
3987        let mut manifest = Manifest::new();
3988        let mut e = entry("a.flac", AudioFormat::Flac);
3989        e.cover_jpg = Some(ArtifactState {
3990            path: "AlbumA/cover.jpg".to_owned(),
3991            hash: "h1".to_owned(),
3992        });
3993        manifest.insert("a", e);
3994        let fs = MemFs::new()
3995            .with_file("a.flac", b"AUDIO".to_vec())
3996            .with_file("AlbumA/cover.jpg", b"old".to_vec());
3997        let plan = Plan {
3998            actions: vec![Action::WriteArtifact {
3999                kind: ArtifactKind::CoverJpg,
4000                path: "AlbumB/cover.jpg".to_owned(),
4001                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
4002                hash: "h1".to_owned(),
4003                owner_id: "a".to_owned(),
4004                content: None,
4005            }],
4006        };
4007        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"new".to_vec()));
4008
4009        // Run 1: the old-cover remove is forced to fail.
4010        fs.arm_fail_remove("AlbumA/cover.jpg");
4011        let first = run(
4012            &plan,
4013            &mut manifest,
4014            &[],
4015            &http,
4016            &fs,
4017            &StubFfmpeg::flac(),
4018            &RecordingClock::new(),
4019            &ExecOptions::default(),
4020        );
4021        assert_eq!(
4022            first.status,
4023            RunStatus::Completed,
4024            "a remove failure never aborts the run"
4025        );
4026        assert_eq!(first.failed(), 1);
4027        // The new cover is written but the old one lingers and the slot is stale.
4028        assert!(fs.exists("AlbumB/cover.jpg"));
4029        assert!(fs.exists("AlbumA/cover.jpg"));
4030        assert_eq!(
4031            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
4032            "AlbumA/cover.jpg"
4033        );
4034        assert!(fs.has_dir("AlbumA"), "the orphan keeps its directory alive");
4035
4036        // Run 2: the same plan re-runs with the fault cleared and converges.
4037        fs.disarm_fail_remove("AlbumA/cover.jpg");
4038        let second = run(
4039            &plan,
4040            &mut manifest,
4041            &[],
4042            &http,
4043            &fs,
4044            &StubFfmpeg::flac(),
4045            &RecordingClock::new(),
4046            &ExecOptions::default(),
4047        );
4048        assert_eq!(second.failed(), 0);
4049        assert!(fs.exists("AlbumB/cover.jpg"));
4050        assert!(!fs.exists("AlbumA/cover.jpg"), "no orphan persists");
4051        assert_eq!(
4052            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
4053            "AlbumB/cover.jpg"
4054        );
4055        assert!(!fs.has_dir("AlbumA"), "the emptied directory is pruned");
4056    }
4057
4058    #[test]
4059    fn same_path_artifact_rewrite_does_no_remove_and_prunes_nothing() {
4060        // The idempotent case: a content-only cover rewrite (hash drift, path
4061        // unchanged) attempts no remove and prunes no live directory. A remove
4062        // failure is armed on the cover path, so any spurious remove would
4063        // surface as a failure — none does.
4064        let mut manifest = Manifest::new();
4065        let mut e = entry("Album/a.mp3", AudioFormat::Mp3);
4066        e.cover_jpg = Some(ArtifactState {
4067            path: "Album/cover.jpg".to_owned(),
4068            hash: "h1".to_owned(),
4069        });
4070        manifest.insert("a", e);
4071        let fs = MemFs::new()
4072            .with_file("Album/a.mp3", b"AUDIO".to_vec())
4073            .with_file("Album/cover.jpg", b"old".to_vec());
4074        fs.arm_fail_remove("Album/cover.jpg");
4075        let plan = Plan {
4076            actions: vec![Action::WriteArtifact {
4077                kind: ArtifactKind::CoverJpg,
4078                path: "Album/cover.jpg".to_owned(),
4079                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
4080                hash: "h2".to_owned(),
4081                owner_id: "a".to_owned(),
4082                content: None,
4083            }],
4084        };
4085        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"new".to_vec()));
4086
4087        let outcome = run(
4088            &plan,
4089            &mut manifest,
4090            &[],
4091            &http,
4092            &fs,
4093            &StubFfmpeg::flac(),
4094            &RecordingClock::new(),
4095            &ExecOptions::default(),
4096        );
4097
4098        assert_eq!(
4099            outcome.failed(),
4100            0,
4101            "no remove is attempted, so the armed failure never fires"
4102        );
4103        assert_eq!(outcome.artifacts_written, 1);
4104        assert_eq!(fs.read_file("Album/cover.jpg").unwrap(), b"new");
4105        assert_eq!(
4106            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().hash,
4107            "h2"
4108        );
4109        // The live directory is untouched by prune.
4110        assert!(fs.has_dir("Album"));
4111    }
4112
4113    // ── Concurrency (issue #22) ─────────────────────────────────────
4114
4115    mod concurrency {
4116        use super::*;
4117        use crate::ffmpeg::FfmpegError;
4118        use crate::fs::{FileStat, FsError};
4119        use crate::http::{HttpRequest, TransportError};
4120        use std::future::Future;
4121        use std::pin::Pin;
4122        use std::sync::Arc;
4123        use std::sync::atomic::{AtomicUsize, Ordering};
4124        use std::task::{Context, Poll};
4125
4126        /// A future that pends exactly once before resolving, waking itself so a
4127        /// single-threaded executor re-polls. It forces the [`Http`] port to
4128        /// yield, so [`buffer_unordered`](futures_util::stream::StreamExt) parks
4129        /// each in-flight request and the true overlap becomes observable.
4130        #[derive(Default)]
4131        struct YieldOnce {
4132            yielded: bool,
4133        }
4134
4135        impl Future for YieldOnce {
4136            type Output = ();
4137            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
4138                if self.yielded {
4139                    Poll::Ready(())
4140                } else {
4141                    self.yielded = true;
4142                    cx.waker().wake_by_ref();
4143                    Poll::Pending
4144                }
4145            }
4146        }
4147
4148        /// An [`Http`] double that wraps [`ScriptedHttp`] and records the peak
4149        /// number of concurrently in-flight requests. Each `send` bumps a live
4150        /// counter, yields once (so peers can start), then delegates.
4151        struct GatedHttp {
4152            inner: ScriptedHttp,
4153            inflight: Arc<AtomicUsize>,
4154            peak: Arc<AtomicUsize>,
4155        }
4156
4157        impl GatedHttp {
4158            fn new(inner: ScriptedHttp) -> Self {
4159                Self {
4160                    inner,
4161                    inflight: Arc::new(AtomicUsize::new(0)),
4162                    peak: Arc::new(AtomicUsize::new(0)),
4163                }
4164            }
4165
4166            fn peak(&self) -> usize {
4167                self.peak.load(Ordering::SeqCst)
4168            }
4169        }
4170
4171        impl Http for GatedHttp {
4172            async fn send(&self, request: HttpRequest) -> Result<HttpResponse, TransportError> {
4173                let now = self.inflight.fetch_add(1, Ordering::SeqCst) + 1;
4174                self.peak.fetch_max(now, Ordering::SeqCst);
4175                YieldOnce::default().await;
4176                let out = self.inner.send(request).await;
4177                self.inflight.fetch_sub(1, Ordering::SeqCst);
4178                out
4179            }
4180        }
4181
4182        fn download(id: &str, format: AudioFormat) -> (Clip, Desired, Action) {
4183            let c = clip(id);
4184            let d = desired(c.clone(), format);
4185            let action = Action::Download {
4186                clip: c.clone(),
4187                lineage: LineageContext::own_root(&c),
4188                path: d.path.clone(),
4189                format,
4190            };
4191            (c, d, action)
4192        }
4193
4194        fn opts_with(concurrency: u32) -> ExecOptions {
4195            ExecOptions {
4196                concurrency,
4197                ..small_poll()
4198            }
4199        }
4200
4201        #[test]
4202        fn concurrency_never_exceeds_the_configured_bound() {
4203            let count = 6;
4204            let concurrency = 3;
4205            let mut scripted = ScriptedHttp::new().with_auth();
4206            let mut actions = Vec::new();
4207            let mut desireds = Vec::new();
4208            for i in 0..count {
4209                let id = format!("c{i}");
4210                scripted = scripted.route(&format!("{id}.mp3"), Reply::ok(b"mp3-body".to_vec()));
4211                let (_c, d, action) = download(&id, AudioFormat::Mp3);
4212                actions.push(action);
4213                desireds.push(d);
4214            }
4215            let http = GatedHttp::new(scripted);
4216            let fs = MemFs::new();
4217            let plan = Plan { actions };
4218            let mut manifest = Manifest::new();
4219
4220            let outcome = run_gated_fs(
4221                &plan,
4222                &mut manifest,
4223                &desireds,
4224                &http,
4225                &fs,
4226                &opts_with(concurrency),
4227            );
4228
4229            assert_eq!(outcome.downloaded, count);
4230            assert!(
4231                http.peak() <= concurrency as usize,
4232                "peak {} exceeded the bound {concurrency}",
4233                http.peak()
4234            );
4235            assert_eq!(
4236                http.peak(),
4237                concurrency as usize,
4238                "expected the run to saturate the bound"
4239            );
4240        }
4241
4242        /// Run a gated plan against a caller-supplied [`MemFs`], returning the
4243        /// outcome. The client is built here so the limiter can be inspected by
4244        /// the caller-facing variant below.
4245        fn run_gated_fs(
4246            plan: &Plan,
4247            manifest: &mut Manifest,
4248            desired: &[Desired],
4249            http: &GatedHttp,
4250            fs: &MemFs,
4251            opts: &ExecOptions,
4252        ) -> ExecOutcome {
4253            let ffmpeg = StubFfmpeg::flac();
4254            let clock = RecordingClock::new();
4255            let mut albums = BTreeMap::new();
4256            let mut playlists = BTreeMap::new();
4257            let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
4258            pollster::block_on(execute(
4259                plan,
4260                manifest,
4261                &mut albums,
4262                &mut playlists,
4263                desired,
4264                &HashMap::new(),
4265                Ports {
4266                    client: &mut client,
4267                    http,
4268                    fs,
4269                    ffmpeg: &ffmpeg,
4270                    clock: &clock,
4271                },
4272                opts,
4273            ))
4274        }
4275
4276        #[test]
4277        fn a_failing_clip_does_not_abort_the_others() {
4278            let mut scripted = ScriptedHttp::new().with_auth();
4279            scripted = scripted
4280                .route("ok1.mp3", Reply::ok(b"one".to_vec()))
4281                .route("bad.mp3", Reply::status(404))
4282                .route("ok2.mp3", Reply::ok(b"two".to_vec()));
4283            let (_a, d1, a1) = download("ok1", AudioFormat::Mp3);
4284            let (_b, d2, a2) = download("bad", AudioFormat::Mp3);
4285            let (_c, d3, a3) = download("ok2", AudioFormat::Mp3);
4286            let http = GatedHttp::new(scripted);
4287            let fs = MemFs::new();
4288            let plan = Plan {
4289                actions: vec![a1, a2, a3],
4290            };
4291            let mut manifest = Manifest::new();
4292
4293            let outcome = run_gated_fs(
4294                &plan,
4295                &mut manifest,
4296                &[d1, d2, d3],
4297                &http,
4298                &fs,
4299                &opts_with(3),
4300            );
4301
4302            assert_eq!(outcome.downloaded, 2);
4303            assert_eq!(outcome.failed(), 1);
4304            assert_eq!(outcome.status, RunStatus::Completed);
4305            assert_eq!(outcome.failures[0].clip_id, "bad");
4306            assert!(manifest.get("ok1").is_some());
4307            assert!(manifest.get("ok2").is_some());
4308            assert!(manifest.get("bad").is_none());
4309        }
4310
4311        #[test]
4312        fn outcome_is_identical_across_concurrency_levels() {
4313            // A plan mixing successful and failing downloads with serial phase-2
4314            // actions (a skip and a delete), so both phases contribute.
4315            fn build() -> (Plan, Vec<Desired>) {
4316                let mut actions = Vec::new();
4317                let mut desireds = Vec::new();
4318                for id in ["a", "b", "c", "d"] {
4319                    let (_c, d, action) = download(id, AudioFormat::Mp3);
4320                    actions.push(action);
4321                    desireds.push(d);
4322                }
4323                // A failing download in the middle of the audio set.
4324                let (_e, de, ae) = download("fail", AudioFormat::Mp3);
4325                actions.insert(2, ae);
4326                desireds.push(de);
4327                // Phase-2 actions.
4328                actions.push(Action::Skip {
4329                    clip_id: "gone".to_owned(),
4330                });
4331                actions.push(Action::Delete {
4332                    path: "old.mp3".to_owned(),
4333                    clip_id: "old".to_owned(),
4334                });
4335                (Plan { actions }, desireds)
4336            }
4337
4338            fn http() -> ScriptedHttp {
4339                ScriptedHttp::new()
4340                    .with_auth()
4341                    .route("a.mp3", Reply::ok(b"a".to_vec()))
4342                    .route("b.mp3", Reply::ok(b"b".to_vec()))
4343                    .route("c.mp3", Reply::ok(b"c".to_vec()))
4344                    .route("d.mp3", Reply::ok(b"d".to_vec()))
4345                    .route("fail.mp3", Reply::status(404))
4346            }
4347
4348            fn seed_manifest() -> Manifest {
4349                let mut m = Manifest::new();
4350                m.insert("old".to_owned(), entry("old.mp3", AudioFormat::Mp3));
4351                m
4352            }
4353
4354            let (plan, desireds) = build();
4355
4356            let mut m1 = seed_manifest();
4357            let fs1 = MemFs::new().with_file("old.mp3", b"x".to_vec());
4358            let out1 = run_gated_fs(
4359                &plan,
4360                &mut m1,
4361                &desireds,
4362                &GatedHttp::new(http()),
4363                &fs1,
4364                &opts_with(1),
4365            );
4366
4367            let mut m8 = seed_manifest();
4368            let fs8 = MemFs::new().with_file("old.mp3", b"x".to_vec());
4369            let out8 = run_gated_fs(
4370                &plan,
4371                &mut m8,
4372                &desireds,
4373                &GatedHttp::new(http()),
4374                &fs8,
4375                &opts_with(8),
4376            );
4377
4378            assert_eq!(out1, out8, "outcome must not depend on concurrency");
4379            assert_eq!(m1, m8, "final manifest must not depend on concurrency");
4380            assert_eq!(out8.downloaded, 4);
4381            assert_eq!(out8.deleted, 1);
4382            assert_eq!(out8.skipped, 1);
4383            assert_eq!(out8.failed(), 1);
4384        }
4385
4386        #[test]
4387        fn a_systemic_disk_full_aborts_promptly() {
4388            let count = 8;
4389            let concurrency = 2;
4390            let mut scripted = ScriptedHttp::new().with_auth();
4391            let mut actions = Vec::new();
4392            let mut desireds = Vec::new();
4393            for i in 0..count {
4394                let id = format!("d{i}");
4395                scripted = scripted.route(&format!("{id}.mp3"), Reply::ok(b"mp3-body".to_vec()));
4396                let (_c, d, action) = download(&id, AudioFormat::Mp3);
4397                actions.push(action);
4398                desireds.push(d);
4399            }
4400            // The very first clip's write hits ENOSPC, a systemic failure.
4401            let fs = MemFs::new().fail_write_out_of_space("d0.mp3");
4402            let http = GatedHttp::new(scripted);
4403            let plan = Plan { actions };
4404            let mut manifest = Manifest::new();
4405
4406            let outcome = run_gated_fs(
4407                &plan,
4408                &mut manifest,
4409                &desireds,
4410                &http,
4411                &fs,
4412                &opts_with(concurrency),
4413            );
4414
4415            assert_eq!(outcome.status, RunStatus::DiskFull);
4416            assert!(
4417                outcome.downloaded < count,
4418                "a systemic abort must stop remaining work, downloaded {}",
4419                outcome.downloaded
4420            );
4421        }
4422
4423        #[test]
4424        fn limiter_records_a_rate_limit_under_concurrent_calls() {
4425            // Three concurrent FLAC renders; exactly one clip is throttled once
4426            // on its wav_file read. The shared limiter must record that single
4427            // 429 (halving 2.0 -> 1.0) with no lost or duplicated update, proving
4428            // the mutex keeps the AIMD state correct under concurrency.
4429            let scripted = ScriptedHttp::new()
4430                .with_auth()
4431                .route_seq(
4432                    "/gen/x/wav_file/",
4433                    vec![
4434                        Reply::status(429),
4435                        Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/x.wav"}"#),
4436                    ],
4437                )
4438                .route(
4439                    "/gen/y/wav_file/",
4440                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/y.wav"}"#),
4441                )
4442                .route(
4443                    "/gen/z/wav_file/",
4444                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/z.wav"}"#),
4445                )
4446                .route("x.wav", Reply::ok(b"wav-x".to_vec()))
4447                .route("y.wav", Reply::ok(b"wav-y".to_vec()))
4448                .route("z.wav", Reply::ok(b"wav-z".to_vec()));
4449
4450            let mut actions = Vec::new();
4451            let mut desireds = Vec::new();
4452            for id in ["x", "y", "z"] {
4453                let (_c, d, action) = download(id, AudioFormat::Flac);
4454                actions.push(action);
4455                desireds.push(d);
4456            }
4457            let plan = Plan { actions };
4458            let fs = MemFs::new();
4459            let ffmpeg = StubFfmpeg::flac();
4460            let clock = RecordingClock::new();
4461            let mut albums = BTreeMap::new();
4462            let mut playlists = BTreeMap::new();
4463            let mut manifest = Manifest::new();
4464            let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
4465
4466            let outcome = pollster::block_on(execute(
4467                &plan,
4468                &mut manifest,
4469                &mut albums,
4470                &mut playlists,
4471                &desireds,
4472                &HashMap::new(),
4473                Ports {
4474                    client: &mut client,
4475                    http: &scripted,
4476                    fs: &fs,
4477                    ffmpeg: &ffmpeg,
4478                    clock: &clock,
4479                },
4480                &opts_with(3),
4481            ));
4482
4483            assert_eq!(outcome.downloaded, 3);
4484            assert_eq!(outcome.failed(), 0);
4485            assert!(
4486                (client.limiter_rate() - 1.0).abs() < 1e-9,
4487                "one 429 must halve the rate to 1.0, got {}",
4488                client.limiter_rate()
4489            );
4490        }
4491
4492        #[test]
4493        fn a_download_is_committed_in_plan_order_around_a_rename() {
4494            // Plan order: rename "orig" away from shared.mp3 first, then download
4495            // a new clip into shared.mp3. A parallel executor that performed the
4496            // download's destination write off plan order would write shared.mp3
4497            // before the rename ran, letting the rename carry those fresh bytes
4498            // to moved.mp3 and stranding shared.mp3 - corrupting both clips.
4499            // Committing every destination effect serially in plan order keeps
4500            // moved.mp3 = the original and shared.mp3 = the new download.
4501            let c_new = clip("new");
4502            let mut d_new = desired(c_new.clone(), AudioFormat::Mp3);
4503            d_new.path = "shared.mp3".to_owned();
4504            let plan = Plan {
4505                actions: vec![
4506                    Action::Rename {
4507                        from: "shared.mp3".to_owned(),
4508                        to: "moved.mp3".to_owned(),
4509                    },
4510                    Action::Download {
4511                        clip: c_new.clone(),
4512                        lineage: LineageContext::own_root(&c_new),
4513                        path: "shared.mp3".to_owned(),
4514                        format: AudioFormat::Mp3,
4515                    },
4516                ],
4517            };
4518            let scripted = ScriptedHttp::new()
4519                .with_auth()
4520                .route("new.mp3", Reply::ok(b"NEW-BODY".to_vec()));
4521            let http = GatedHttp::new(scripted);
4522            let fs = MemFs::new().with_file("shared.mp3", b"ORIGINAL".to_vec());
4523            let mut manifest = Manifest::new();
4524            manifest.insert("orig", entry("shared.mp3", AudioFormat::Mp3));
4525
4526            let outcome = run_gated_fs(&plan, &mut manifest, &[d_new], &http, &fs, &opts_with(4));
4527
4528            assert_eq!(outcome.renamed, 1);
4529            assert_eq!(outcome.downloaded, 1);
4530            assert_eq!(
4531                fs.read_file("moved.mp3").as_deref(),
4532                Some(&b"ORIGINAL"[..]),
4533                "the rename must carry the original bytes, untouched by the download"
4534            );
4535            let landed = fs.read_file("shared.mp3").expect("new download must land");
4536            assert_ne!(
4537                landed, b"ORIGINAL",
4538                "the new download must replace the moved original, not corrupt it"
4539            );
4540            assert_eq!(manifest.get("orig").unwrap().path, "moved.mp3");
4541            assert_eq!(manifest.get("new").unwrap().path, "shared.mp3");
4542        }
4543
4544        #[test]
4545        fn an_aborted_reformat_leaves_the_old_file_and_manifest_consistent() {
4546            // A systemic disk-full abort strikes the download committed before the
4547            // reformat. Because the reformat's slow render is side-effect-free and
4548            // its destination write + old-file removal only happen in the serial
4549            // commit (which the abort skips), the old file survives and the
4550            // manifest still points at it: no removed-but-referenced file.
4551            let boom = clip("boom");
4552            let mut d_boom = desired(boom.clone(), AudioFormat::Mp3);
4553            d_boom.path = "boom.mp3".to_owned();
4554            let reformer = clip("r");
4555            let d_reformer = desired(reformer.clone(), AudioFormat::Mp3);
4556            let plan = Plan {
4557                actions: vec![
4558                    Action::Download {
4559                        clip: boom.clone(),
4560                        lineage: LineageContext::own_root(&boom),
4561                        path: "boom.mp3".to_owned(),
4562                        format: AudioFormat::Mp3,
4563                    },
4564                    Action::Reformat {
4565                        clip: reformer.clone(),
4566                        path: "r_new.mp3".to_owned(),
4567                        from_path: "r_old.flac".to_owned(),
4568                        from: AudioFormat::Flac,
4569                        to: AudioFormat::Mp3,
4570                    },
4571                ],
4572            };
4573            let scripted = ScriptedHttp::new()
4574                .with_auth()
4575                .route("boom.mp3", Reply::ok(b"boom-body".to_vec()))
4576                .route("r.mp3", Reply::ok(b"reformatted".to_vec()));
4577            let http = GatedHttp::new(scripted);
4578            // The download's write hits ENOSPC, a systemic abort.
4579            let fs = MemFs::new()
4580                .with_file("r_old.flac", b"OLD-FLAC".to_vec())
4581                .fail_write_out_of_space("boom.mp3");
4582            let mut manifest = Manifest::new();
4583            manifest.insert("r", entry("r_old.flac", AudioFormat::Flac));
4584
4585            let outcome = run_gated_fs(
4586                &plan,
4587                &mut manifest,
4588                &[d_boom, d_reformer],
4589                &http,
4590                &fs,
4591                &opts_with(4),
4592            );
4593
4594            assert_eq!(outcome.status, RunStatus::DiskFull);
4595            assert!(
4596                fs.exists("r_old.flac"),
4597                "the old file must survive the abort"
4598            );
4599            assert!(
4600                !fs.exists("r_new.mp3"),
4601                "no reformatted file may be written"
4602            );
4603            let still = manifest.get("r").expect("the manifest must still track r");
4604            assert_eq!(
4605                still.path, "r_old.flac",
4606                "the manifest must still point at the surviving old file"
4607            );
4608            assert_eq!(still.format, AudioFormat::Flac);
4609        }
4610
4611        #[test]
4612        fn a_systemic_abort_leaves_no_untracked_destination_files() {
4613            // Two clips commit, the third's write hits ENOSPC (a systemic abort),
4614            // and the rest never commit. Every file remaining on disk must be one
4615            // the manifest tracks: producers write nothing, so an abort cannot
4616            // strand an untracked file from an in-flight or buffered render.
4617            let mut scripted = ScriptedHttp::new().with_auth();
4618            let mut actions = Vec::new();
4619            let mut desireds = Vec::new();
4620            for id in ["a0", "a1", "boom", "a3", "a4"] {
4621                scripted = scripted.route(&format!("{id}.mp3"), Reply::ok(b"body".to_vec()));
4622                let (_c, d, action) = download(id, AudioFormat::Mp3);
4623                actions.push(action);
4624                desireds.push(d);
4625            }
4626            let http = GatedHttp::new(scripted);
4627            let fs = MemFs::new().fail_write_out_of_space("boom.mp3");
4628            let plan = Plan { actions };
4629            let mut manifest = Manifest::new();
4630
4631            let outcome = run_gated_fs(&plan, &mut manifest, &desireds, &http, &fs, &opts_with(2));
4632
4633            assert_eq!(outcome.status, RunStatus::DiskFull);
4634            let tracked: std::collections::BTreeSet<String> = manifest
4635                .entries
4636                .values()
4637                .map(|entry| entry.path.clone())
4638                .collect();
4639            for path in fs.paths() {
4640                assert!(
4641                    tracked.contains(&path),
4642                    "found an untracked destination file: {path}"
4643                );
4644            }
4645            assert!(
4646                !fs.exists("a3.mp3"),
4647                "uncommitted renders must not be on disk"
4648            );
4649            assert!(
4650                !fs.exists("a4.mp3"),
4651                "uncommitted renders must not be on disk"
4652            );
4653        }
4654
4655        /// An [`Ffmpeg`] double that counts how many rendered FLAC payloads are
4656        /// live: it bumps a shared counter (tracking the peak) when a transcode
4657        /// yields bytes, and [`CountingFs`] drops it back on the committing write.
4658        /// The [transcode, write] window is a superset of the true in-memory hold,
4659        /// so the observed peak upper-bounds the real one.
4660        struct CountingFfmpeg {
4661            inner: StubFfmpeg,
4662            held: Arc<AtomicUsize>,
4663            peak: Arc<AtomicUsize>,
4664        }
4665
4666        impl Ffmpeg for CountingFfmpeg {
4667            fn wav_to_flac(
4668                &self,
4669                wav: &[u8],
4670            ) -> impl Future<Output = Result<Vec<u8>, FfmpegError>> + Send {
4671                let fut = self.inner.wav_to_flac(wav);
4672                let held = self.held.clone();
4673                let peak = self.peak.clone();
4674                async move {
4675                    let out = fut.await;
4676                    if out.is_ok() {
4677                        let now = held.fetch_add(1, Ordering::SeqCst) + 1;
4678                        peak.fetch_max(now, Ordering::SeqCst);
4679                    }
4680                    out
4681                }
4682            }
4683
4684            fn mp4_to_webp(
4685                &self,
4686                mp4: &[u8],
4687                settings: WebpEncodeSettings,
4688            ) -> impl Future<Output = Result<Vec<u8>, FfmpegError>> + Send {
4689                self.inner.mp4_to_webp(mp4, settings)
4690            }
4691        }
4692
4693        /// A [`Filesystem`] double wrapping [`MemFs`] that decrements the live
4694        /// payload counter on each committing write, closing the window opened by
4695        /// [`CountingFfmpeg`].
4696        struct CountingFs {
4697            inner: MemFs,
4698            held: Arc<AtomicUsize>,
4699        }
4700
4701        impl Filesystem for CountingFs {
4702            fn write_atomic(&self, path: &str, bytes: &[u8]) -> Result<(), FsError> {
4703                let out = self.inner.write_atomic(path, bytes);
4704                self.held.fetch_sub(1, Ordering::SeqCst);
4705                out
4706            }
4707
4708            fn rename(&self, from: &str, to: &str) -> Result<(), FsError> {
4709                self.inner.rename(from, to)
4710            }
4711
4712            fn remove(&self, path: &str) -> Result<(), FsError> {
4713                self.inner.remove(path)
4714            }
4715
4716            fn prune_empty_dirs(&self, root: &str) -> Result<(), FsError> {
4717                self.inner.prune_empty_dirs(root)
4718            }
4719
4720            fn read(&self, path: &str) -> Result<Vec<u8>, FsError> {
4721                self.inner.read(path)
4722            }
4723
4724            fn metadata(&self, path: &str) -> Option<FileStat> {
4725                self.inner.metadata(path)
4726            }
4727        }
4728
4729        #[test]
4730        fn rendered_payloads_in_memory_stay_bounded_by_concurrency() {
4731            // Far more FLAC clips than the concurrency bound. The ordered buffered
4732            // render keeps at most about `concurrency` transcoded payloads live at
4733            // once (never the whole library), so peak held <= concurrency + 1.
4734            let count = 12;
4735            let concurrency = 3;
4736            let mut scripted = ScriptedHttp::new().with_auth();
4737            let mut actions = Vec::new();
4738            let mut desireds = Vec::new();
4739            for i in 0..count {
4740                let id = format!("f{i}");
4741                scripted = scripted
4742                    .route(
4743                        &format!("/gen/{id}/wav_file/"),
4744                        Reply::json(&format!(
4745                            r#"{{"wav_file_url": "https://cdn1.suno.ai/{id}.wav"}}"#
4746                        )),
4747                    )
4748                    .route(&format!("{id}.wav"), Reply::ok(b"wav-body".to_vec()));
4749                let (_c, d, action) = download(&id, AudioFormat::Flac);
4750                actions.push(action);
4751                desireds.push(d);
4752            }
4753            let http = GatedHttp::new(scripted);
4754            let held = Arc::new(AtomicUsize::new(0));
4755            let peak = Arc::new(AtomicUsize::new(0));
4756            let ffmpeg = CountingFfmpeg {
4757                inner: StubFfmpeg::flac(),
4758                held: held.clone(),
4759                peak: peak.clone(),
4760            };
4761            let fs = CountingFs {
4762                inner: MemFs::new(),
4763                held: held.clone(),
4764            };
4765            let clock = RecordingClock::new();
4766            let mut albums = BTreeMap::new();
4767            let mut playlists = BTreeMap::new();
4768            let mut manifest = Manifest::new();
4769            let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
4770            let plan = Plan { actions };
4771
4772            let outcome = pollster::block_on(execute(
4773                &plan,
4774                &mut manifest,
4775                &mut albums,
4776                &mut playlists,
4777                &desireds,
4778                &HashMap::new(),
4779                Ports {
4780                    client: &mut client,
4781                    http: &http,
4782                    fs: &fs,
4783                    ffmpeg: &ffmpeg,
4784                    clock: &clock,
4785                },
4786                &opts_with(concurrency),
4787            ));
4788
4789            assert_eq!(outcome.downloaded, count as usize);
4790            assert_eq!(
4791                held.load(Ordering::SeqCst),
4792                0,
4793                "every payload must be committed"
4794            );
4795            assert!(
4796                peak.load(Ordering::SeqCst) <= concurrency as usize + 1,
4797                "peak live payloads {} exceeded the bound {}",
4798                peak.load(Ordering::SeqCst),
4799                concurrency + 1
4800            );
4801            assert!(
4802                peak.load(Ordering::SeqCst) >= 2,
4803                "the render should genuinely overlap, peak was {}",
4804                peak.load(Ordering::SeqCst)
4805            );
4806        }
4807    }
4808}