Skip to main content

suno_core/
executor.rs

//! The download executor: it applies a reconcile [`Plan`] to disk through ports.
//!
//! Reconcile decides *what* to do; the executor does it. It is async and pure
//! orchestration: every side effect goes through a port ([`Http`] for the
//! network, [`Filesystem`] for disk, [`Ffmpeg`] for transcoding, [`Clock`] for
//! waiting), so the whole pipeline is exercised in tests with in-memory doubles
//! and no real IO, network, or sleeping.
//!
//! Safety is the point of this module. A wrong write or delete damages the
//! user's library, so the executor:
//!
//! - writes only atomically (SYNC-13): a failed write leaves the prior file
//!   intact, because the [`Filesystem`] adapter stages a temp file and renames;
//! - verifies size (SYNC-14): a download whose body disagrees with the
//!   provider's `Content-Length` is treated as truncated and retried, and a
//!   written file whose on-disk size disagrees with the bytes written is a
//!   failure, never a recorded success;
//! - classifies errors (SYNC-17): an auth failure or a full disk stops the
//!   account run (with an auth or disk-full status) and is never retried;
//!   transient failures (timeouts, 5xx, transport, 429) are retried a bounded
//!   number of times, then recorded and skipped; permanent failures are
//!   recorded and skipped; and no other single-clip failure aborts the run;
//! - backs off on rate limits (SYNC-16) through the injected [`Clock`], honouring
//!   a `Retry-After` hint.
//!
//! The executor only ever sets the manifest's [`preserve`](ManifestEntry::preserve)
//! marker on an entry it writes, and only deletes a path whose removal the
//! [`Filesystem`] confirms. Higher-level safety (empty-listing abort, the
//! destructive-sync confirmation, exit codes) is the caller's job.
31
32use std::collections::BTreeMap;
33use std::collections::BTreeSet;
34use std::collections::HashMap;
35use std::collections::HashSet;
36use std::sync::Mutex;
37use std::time::Duration;
38
39use futures_util::lock::Mutex as AsyncMutex;
40use futures_util::stream::{self, StreamExt};
41
42use crate::backoff::{backoff_delay, retry_after};
43use crate::client::SunoClient;
44use crate::clock::Clock;
45use crate::config::{AudioFormat, StemFormat};
46use crate::error::Error;
47use crate::ffmpeg::{Ffmpeg, WebpEncodeSettings};
48use crate::fs::Filesystem;
49use crate::graph::{AlbumArt, PlaylistState};
50use crate::http::{Http, HttpRequest};
51use crate::lineage::LineageContext;
52use crate::lyrics::AlignedLyrics;
53use crate::manifest::{ArtifactState, Manifest, ManifestEntry};
54use crate::model::Clip;
55use crate::reconcile::{
56    Action, ArtifactKind, Desired, Plan, SourceMode, set_manifest_artifact, set_manifest_stem,
57};
58use crate::tag::{TrackMetadata, tag_flac, tag_mp3, tag_wav};
59
60/// The shared Suno client behind an async mutex, so concurrent audio work can
61/// serialise its order-sensitive API calls (JWT refresh, adaptive limiter)
62/// without a runtime-specific lock. Held only for the brief WAV-render calls;
63/// the heavy CDN/transcode/tag work runs unlocked.
64type ClientLock<'a, C> = AsyncMutex<&'a mut SunoClient<C>>;
65
66/// Tunables for one [`execute`] run.
67#[derive(Debug, Clone)]
68pub struct ExecOptions {
69    /// How many times a transient failure is retried before record-and-skip.
70    pub max_retries: u32,
71    /// How many times to poll for a server-side WAV render before giving up.
72    pub wav_poll_attempts: u32,
73    /// How long to wait between WAV render polls.
74    pub wav_poll_interval: Duration,
75    /// How many clips' audio to fetch, transcode, and tag concurrently. Clamped
76    /// to at least one, so a zero collapses to sequential rather than stalling.
77    pub concurrency: u32,
78    /// Settings used for animated WebP cover transcodes.
79    pub cover_webp: WebpEncodeSettings,
80}
81
82impl Default for ExecOptions {
83    fn default() -> Self {
84        Self {
85            max_retries: 3,
86            wav_poll_attempts: 24,
87            wav_poll_interval: Duration::from_secs(5),
88            concurrency: 4,
89            cover_webp: WebpEncodeSettings::default(),
90        }
91    }
92}
93
/// How an [`execute`] run ended.
///
/// [`Completed`](RunStatus::Completed) is the default, so a fresh
/// [`ExecOutcome`] reports a completed run until an abort overwrites it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum RunStatus {
    /// Every action was attempted; some may have failed and been skipped.
    #[default]
    Completed,
    /// An auth failure stopped the run early; remaining actions were not tried.
    AuthAborted,
    /// The disk filled; the run stopped early rather than failing every
    /// remaining clip. Remaining actions were not tried.
    DiskFull,
}
106
/// One action that could not be applied, for the run summary and failure log.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Failure {
    /// The clip the failed action concerned (or a path when no id applies).
    pub clip_id: String,
    /// A short, secret-free reason.
    pub reason: String,
}
115
116/// The result of applying a [`Plan`]: per-action counts and the failure list.
117#[derive(Debug, Clone, Default, PartialEq, Eq)]
118pub struct ExecOutcome {
119    pub downloaded: usize,
120    pub reformatted: usize,
121    pub retagged: usize,
122    pub renamed: usize,
123    pub deleted: usize,
124    pub skipped: usize,
125    pub artifacts_written: usize,
126    pub artifacts_deleted: usize,
127    /// Actions that failed and were skipped (auth, transient-exhausted, or
128    /// permanent). The run continued past each one unless it was an auth or
129    /// disk-full abort.
130    pub failures: Vec<Failure>,
131    /// How the run ended.
132    pub status: RunStatus,
133}
134
135impl ExecOutcome {
136    /// Number of failed actions.
137    pub fn failed(&self) -> usize {
138        self.failures.len()
139    }
140
141    fn record(&mut self, effect: Effect) {
142        match effect {
143            Effect::Downloaded => self.downloaded += 1,
144            Effect::Reformatted => self.reformatted += 1,
145            Effect::Retagged => self.retagged += 1,
146            Effect::Renamed => self.renamed += 1,
147            Effect::Deleted => self.deleted += 1,
148            Effect::Skipped => self.skipped += 1,
149            Effect::ArtifactWritten => self.artifacts_written += 1,
150            Effect::ArtifactDeleted => self.artifacts_deleted += 1,
151        }
152    }
153}
154
155/// The IO ports the executor drives, grouped so one value threads them through.
156///
157/// `client` is the only `&mut` port: it performs the authenticated WAV render
158/// flow and so mutates its cached session. The rest are shared references.
159pub struct Ports<'a, H, F, G, C> {
160    /// Performs the authenticated WAV render and poll flow.
161    pub client: &'a mut SunoClient<C>,
162    /// The public network port (CDN audio, rendered WAV, cover art).
163    pub http: &'a H,
164    /// The disk port.
165    pub fs: &'a F,
166    /// The transcode port (WAV to FLAC).
167    pub ffmpeg: &'a G,
168    /// The backoff and poll delay port.
169    pub clock: &'a C,
170}
171
172/// Apply `plan` to disk, updating `manifest` and `albums` in place, and return
173/// the outcome.
174///
175/// `desired` carries the per-clip metadata and art hashes plus the source modes
176/// that decide the [`preserve`](ManifestEntry::preserve) marker; it is indexed
177/// by clip id (and by target path, for renames) so each written entry records
178/// the right hashes and protection. `albums` is the album-art store, keyed by
179/// stable root id: folder-art writes and deletes record their state there rather
180/// than on the per-clip `manifest`. `ports` bundles the authenticated client and
181/// the network, disk, transcode, and backoff ports. A single clip's failure
182/// never aborts the run, except an auth failure or a full disk, which stop it
183/// with [`RunStatus::AuthAborted`] or [`RunStatus::DiskFull`].
184///
185/// The audio-producing actions ([`Download`](Action::Download) and
186/// [`Reformat`](Action::Reformat)) run concurrently, bounded by
187/// [`ExecOptions::concurrency`]: their slow parts (WAV render, CDN download,
188/// transcode, tag) overlap while the order-sensitive Suno API calls are
189/// serialised behind an async mutex over the shared [`SunoClient`], keeping the
190/// adaptive limiter and JWT refresh correct. The remaining actions (retag,
191/// rename, delete, and artifact writes/deletes) then run serially in plan order.
192///
193/// The outcome is deterministic regardless of completion order: concurrent audio
194/// results are committed to the manifest in plan-index order, so the same plan
195/// always yields the same manifest and counts whatever the concurrency level. A
196/// per-clip failure is recorded and the run continues; only an auth failure or a
197/// full disk aborts, and it does so promptly by stopping further audio work.
198///
199/// `synced` carries this run's fetched aligned (synced) lyrics keyed by clip id;
200/// it is the caller's IO result, not part of the pure plan. Audio tagging embeds
201/// a clip's entry as an MP3 `SYLT` frame and as the plain `USLT`/`LYRICS` text
202/// (FLAC), so a clip absent from the map (an instrumental, a WAV target, or a
203/// run with the feature off) is tagged exactly as before. The synced `.lrc`
204/// sidecar itself is a generated artifact whose body the caller has already
205/// resolved into the plan, so it is written like any other text sidecar.
206#[allow(clippy::too_many_arguments)]
207pub async fn execute<H, F, G, C>(
208    plan: &Plan,
209    manifest: &mut Manifest,
210    albums: &mut BTreeMap<String, AlbumArt>,
211    playlists: &mut BTreeMap<String, PlaylistState>,
212    desired: &[Desired],
213    synced: &HashMap<String, AlignedLyrics>,
214    ports: Ports<'_, H, F, G, C>,
215    opts: &ExecOptions,
216) -> ExecOutcome
217where
218    H: Http,
219    F: Filesystem,
220    G: Ffmpeg,
221    C: Clock,
222{
223    let Ports {
224        client,
225        http,
226        fs,
227        ffmpeg,
228        clock,
229    } = ports;
230    let by_id: HashMap<&str, &Desired> = desired.iter().map(|d| (d.clip.id.as_str(), d)).collect();
231    let by_path: HashMap<&str, &Desired> = desired.iter().map(|d| (d.path.as_str(), d)).collect();
232    // How many tracked artifact slots reference each path. The inline old-path
233    // cleanup removes a path only once nothing else holds it: each slot that
234    // moves away decrements its reference, and the removal fires only when the
235    // count reaches zero and no action writes the path this run. This keeps a
236    // live file a co-referencing slot still owns (a prior failed swap can leave
237    // two clips sharing a path) while letting the last slot to leave reclaim it,
238    // so nothing is orphaned either (#76).
239    let mut tracked_paths: HashMap<String, u32> = HashMap::new();
240    for (_, entry) in manifest.iter() {
241        for path in entry.artifact_paths() {
242            *tracked_paths.entry(path.to_owned()).or_default() += 1;
243        }
244    }
245    for art in albums.values() {
246        for state in [
247            art.folder_jpg.as_ref(),
248            art.folder_webp.as_ref(),
249            art.folder_mp4.as_ref(),
250        ]
251        .into_iter()
252        .flatten()
253        {
254            *tracked_paths.entry(state.path.clone()).or_default() += 1;
255        }
256    }
257    for playlist in playlists.values() {
258        *tracked_paths.entry(playlist.path.clone()).or_default() += 1;
259    }
260    // Static cover art is otherwise fetched twice per clip (#89): once to embed
261    // in the audio tag and once for the per-song `.jpg` sidecar, both from the
262    // same CDN URL. The audio producer caches each cover it embeds here, keyed by
263    // URL, and the sidecar write drains it rather than re-fetching. Only URLs a
264    // `CoverJpg` sidecar will fetch this run are cached, and the sidecar removes
265    // its entry on use, so the map holds at most the covers for the clips in
266    // flight (bounded by `concurrency`), never the whole library.
267    let cover_wanted: HashSet<&str> = plan
268        .actions
269        .iter()
270        .filter_map(|action| match action {
271            Action::WriteArtifact {
272                kind: ArtifactKind::CoverJpg,
273                source_url,
274                ..
275            } if !source_url.is_empty() => Some(source_url.as_str()),
276            _ => None,
277        })
278        .collect();
279    let cover_cache: Mutex<HashMap<String, Vec<u8>>> = Mutex::new(HashMap::new());
280    // The `both` video-cover retention keeps `cover.webp` (transcoded) and
281    // `cover.mp4` (raw) for an album from the SAME `video_cover_url`. Cache that
282    // source on its first fetch so the second folder artifact drains it rather
283    // than fetching the same MP4 twice (#90 reuses the #89 fetch-once path).
284    let mut folder_cover_uses: HashMap<&str, u32> = HashMap::new();
285    for action in &plan.actions {
286        if let Action::WriteArtifact {
287            kind: ArtifactKind::FolderWebp | ArtifactKind::FolderMp4,
288            source_url,
289            ..
290        } = action
291            && !source_url.is_empty()
292        {
293            *folder_cover_uses.entry(source_url.as_str()).or_default() += 1;
294        }
295    }
296    let shared_cover_urls: HashSet<&str> = folder_cover_uses
297        .into_iter()
298        .filter(|(_, uses)| *uses > 1)
299        .map(|(url, _)| url)
300        .collect();
301    let ctx = Ctx {
302        http,
303        fs,
304        ffmpeg,
305        clock,
306        opts,
307        by_id: &by_id,
308        by_path: &by_path,
309        synced,
310        cover_cache: &cover_cache,
311        cover_wanted: &cover_wanted,
312        shared_cover_urls: &shared_cover_urls,
313    };
314
315    let mut outcome = ExecOutcome::default();
316    // Destinations whose write has actually committed this run, gating old-path
317    // cleanup so a vacated sidecar/stem is kept only when a *successful* write
318    // also targets it (#142). Serial commit order makes this a clean prefix.
319    let mut committed: BTreeSet<String> = BTreeSet::new();
320
321    // The audio-producing actions ([`Download`](Action::Download) /
322    // [`Reformat`](Action::Reformat)) render concurrently, but their work is
323    // deliberately split so that NO destination write, file removal, or manifest
324    // update happens off the plan's order:
325    //
326    // - the parallel producers ([`prepare_audio`](Ctx::prepare_audio)) do only
327    //   the slow, side-effect-free work (fetch the CDN/WAV bytes, transcode, and
328    //   tag), returning the tagged bytes; and
329    // - a single serial committer below writes those bytes to the destination,
330    //   removes any superseded file, and records the manifest entry, in strict
331    //   plan-index order, interleaved with the non-audio actions.
332    //
333    // The shared client is the only `&mut` port and its API calls must stay
334    // ordered, so it rides behind an async mutex; each producer locks it only for
335    // the brief WAV-render calls and runs the heavy work unlocked. Renders are
336    // yielded in plan order and bounded to `concurrency` in flight (and buffered),
337    // so at most about `concurrency` tagged payloads are ever held in memory -
338    // never the whole library.
339    let client_lock = AsyncMutex::new(client);
340    let concurrency = opts.concurrency.max(1) as usize;
341    let ctx_ref = &ctx;
342    let client_lock_ref = &client_lock;
343    let mut renders = stream::iter(
344        plan.actions
345            .iter()
346            .filter(|action| is_audio_action(action))
347            .map(|action| async move { ctx_ref.prepare_audio(client_lock_ref, action).await }),
348    )
349    .buffered(concurrency);
350
351    for action in &plan.actions {
352        // Audio actions pull their pre-rendered bytes (yielded in plan order) and
353        // commit them here; every other action applies its own effect. Both the
354        // audio commit and the non-audio apply run serially, so all destination
355        // and manifest effects keep the plan's order exactly as the sequential
356        // executor did.
357        let result = if is_audio_action(action) {
358            match renders.next().await {
359                Some(Ok(rendered)) => ctx.commit_audio(manifest, rendered),
360                Some(Err(fail)) => Err(fail),
361                None => unreachable!("buffered yields one result per audio action"),
362            }
363        } else {
364            ctx.apply(
365                client_lock_ref,
366                action,
367                manifest,
368                albums,
369                playlists,
370                &mut tracked_paths,
371                &committed,
372            )
373            .await
374        };
375        match result {
376            Ok(effect) => {
377                outcome.record(effect);
378                // Record this action's destination now that its write succeeded.
379                // A later action vacating a path removes it only when no
380                // *committed* write also targets it; commit is strictly serial in
381                // plan order, so a planned-but-failed or not-yet-run write never
382                // protects a stale file from cleanup (#142).
383                if let Some(dest) = written_path(action) {
384                    committed.insert(dest.to_owned());
385                }
386            }
387            Err(fail) => {
388                let abort = abort_status(fail.class);
389                outcome.failures.push(Failure {
390                    clip_id: fail.clip_id,
391                    reason: fail.reason,
392                });
393                if let Some(status) = abort {
394                    // A systemic abort stops the run. Dropping the render stream
395                    // cancels any in-flight or completed-but-uncommitted producer;
396                    // because producers touch nothing on disk, the destination and
397                    // manifest are left exactly as the committed prefix wrote them,
398                    // with no untracked files and no removed-but-referenced file.
399                    outcome.status = status;
400                    break;
401                }
402            }
403        }
404    }
405    drop(renders);
406
407    // Renames and deletes can leave an album directory empty; prune those ghost
408    // directories bottom-up. This runs on both the completed and the aborted
409    // paths, and is best-effort: a prune failure is only a missed tidy that the
410    // next run repeats, never a reason to fail the run.
411    let _ = fs.prune_empty_dirs("");
412    outcome
413}
414
415/// Whether an action produces audio: it fetches, transcodes, and tags a clip's
416/// file. Its slow render runs in the concurrent phase; its destination write and
417/// manifest update are committed serially in plan order. Everything else touches
418/// the manifest, album, or playlist stores directly and runs serially.
419fn is_audio_action(action: &Action) -> bool {
420    matches!(action, Action::Download { .. } | Action::Reformat { .. })
421}
422
423/// The destination path an action writes on success, or `None` for actions that
424/// write no file (skips, deletes). The serial committer records this once the
425/// action succeeds, so a later action vacating that same path keeps it rather
426/// than removing a freshly written file (#142, #76).
427fn written_path(action: &Action) -> Option<&str> {
428    match action {
429        Action::Download { path, .. }
430        | Action::Reformat { path, .. }
431        | Action::WriteArtifact { path, .. }
432        | Action::WriteStem { path, .. } => Some(path),
433        Action::Rename { to, .. }
434        | Action::MoveArtifact { to, .. }
435        | Action::MoveStem { to, .. } => Some(to),
436        _ => None,
437    }
438}
439
440/// A rendered-but-uncommitted audio result: the tagged bytes plus what the serial
441/// committer needs to place them. Produced concurrently and side-effect-free (no
442/// destination write, no removal, no manifest touch); [`commit_audio`] applies
443/// all of those in plan order.
444struct RenderedAudio {
445    clip_id: String,
446    path: String,
447    format: AudioFormat,
448    /// The superseded file to remove after the new one lands (a [`Reformat`]),
449    /// or `None` for a plain [`Download`].
450    from_path: Option<String>,
451    effect: Effect,
452    bytes: Vec<u8>,
453}
454
/// What an applied action did, for the outcome counters.
///
/// Each variant maps to exactly one counter on [`ExecOutcome`].
enum Effect {
    Downloaded,
    Reformatted,
    Retagged,
    Renamed,
    Deleted,
    Skipped,
    ArtifactWritten,
    ArtifactDeleted,
}
466
/// How a failure should be handled (SYNC-17).
#[derive(Debug, Clone, Copy)]
enum Class {
    /// Stop the account run; do not retry.
    Auth,
    /// Stop the account run: a full disk is systemic, like auth, so aborting
    /// beats skipping every remaining clip (each of which would first burn a
    /// server-side WAV-render budget before failing the same way).
    Disk,
    /// Retry a bounded number of times, then record and skip.
    Transient,
    /// Record and skip immediately.
    Permanent,
}
481
482/// A classified action failure attributed to a clip.
483struct Fail {
484    class: Class,
485    clip_id: String,
486    reason: String,
487}
488
489/// The run-ending status for a failure class, or `None` when the failure is
490/// per-clip and the run continues.
491fn abort_status(class: Class) -> Option<RunStatus> {
492    match class {
493        Class::Auth => Some(RunStatus::AuthAborted),
494        Class::Disk => Some(RunStatus::DiskFull),
495        Class::Transient | Class::Permanent => None,
496    }
497}
498
499fn auth_fail(clip_id: impl Into<String>, reason: impl Into<String>) -> Fail {
500    Fail {
501        class: Class::Auth,
502        clip_id: clip_id.into(),
503        reason: reason.into(),
504    }
505}
506
507fn transient_fail(clip_id: impl Into<String>, reason: impl Into<String>) -> Fail {
508    Fail {
509        class: Class::Transient,
510        clip_id: clip_id.into(),
511        reason: reason.into(),
512    }
513}
514
515fn permanent_fail(clip_id: impl Into<String>, reason: impl Into<String>) -> Fail {
516    Fail {
517        class: Class::Permanent,
518        clip_id: clip_id.into(),
519        reason: reason.into(),
520    }
521}
522
523fn disk_fail(clip_id: impl Into<String>, reason: impl Into<String>) -> Fail {
524    Fail {
525        class: Class::Disk,
526        clip_id: clip_id.into(),
527        reason: reason.into(),
528    }
529}
530
531/// Whether an artifact kind is album-scoped folder art (owned by a root id and
532/// recorded on the album store) rather than a per-clip sidecar (recorded on the
533/// manifest).
534fn is_album_kind(kind: ArtifactKind) -> bool {
535    matches!(
536        kind,
537        ArtifactKind::FolderJpg | ArtifactKind::FolderWebp | ArtifactKind::FolderMp4
538    )
539}
540
541/// True for the library-scoped playlist artifact, routed to the playlist store.
542fn is_playlist_kind(kind: ArtifactKind) -> bool {
543    matches!(kind, ArtifactKind::Playlist)
544}
545
546/// True for a per-song sidecar (`cover.jpg`/`cover.webp`), whose write requires
547/// the owning clip's manifest entry. Album and playlist kinds are keyed by a
548/// root/playlist id that is deliberately absent from the manifest.
549fn is_per_clip_kind(kind: ArtifactKind) -> bool {
550    matches!(
551        kind,
552        ArtifactKind::CoverJpg
553            | ArtifactKind::CoverWebp
554            | ArtifactKind::DetailsTxt
555            | ArtifactKind::LyricsTxt
556            | ArtifactKind::Lrc
557            | ArtifactKind::VideoMp4
558    )
559}
560
/// Recover a playlist's display name from its `.m3u8` path's file stem.
///
/// The path is `<sanitised name>.m3u8` at the library root, so the stem is the
/// sanitised name. Reconcile only ever reads a playlist's `path` and `hash`, so
/// this recovered name is a convenience for humans and its lossiness (the
/// sanitiser is not reversible) never affects a decision.
///
/// Only the final extension is stripped, and a path with no file name (such as
/// the empty string) yields an empty name.
fn playlist_name_from_path(path: &str) -> String {
    std::path::Path::new(path)
        .file_stem()
        .map(|stem| stem.to_string_lossy().into_owned())
        .unwrap_or_default()
}
573
574/// A classified fetch failure, not yet attributed to a clip.
575struct FetchError {
576    class: Class,
577    reason: String,
578    retry_after: Option<Duration>,
579}
580
581impl FetchError {
582    fn transient(reason: impl Into<String>, retry_after: Option<Duration>) -> Self {
583        Self {
584            class: Class::Transient,
585            reason: reason.into(),
586            retry_after,
587        }
588    }
589
590    fn permanent(reason: impl Into<String>) -> Self {
591        Self {
592            class: Class::Permanent,
593            reason: reason.into(),
594            retry_after: None,
595        }
596    }
597
598    fn attribute(self, clip_id: &str) -> Fail {
599        Fail {
600            class: self.class,
601            clip_id: clip_id.to_owned(),
602            reason: self.reason,
603        }
604    }
605}
606
607/// The shared, read-only context threaded through every action handler.
608struct Ctx<'a, H, F, G, C> {
609    http: &'a H,
610    fs: &'a F,
611    ffmpeg: &'a G,
612    clock: &'a C,
613    opts: &'a ExecOptions,
614    by_id: &'a HashMap<&'a str, &'a Desired>,
615    by_path: &'a HashMap<&'a str, &'a Desired>,
616    /// This run's fetched aligned (synced) lyrics, keyed by clip id. Audio
617    /// tagging reads a clip's entry to embed an MP3 `SYLT` frame and the plain
618    /// lyric text; a clip absent here is tagged exactly as before. Populated by
619    /// the caller (the fetch is IO), so the engine stays free of direct IO.
620    synced: &'a HashMap<String, AlignedLyrics>,
621    /// Static cover art the audio producer already fetched to embed in the tag,
622    /// keyed by CDN URL, so the matching per-song `.jpg` sidecar reuses it rather
623    /// than fetching the same image again (#89). Only URLs a `CoverJpg` sidecar
624    /// will fetch are inserted (see `cover_wanted`) and each is removed on use, so
625    /// the map stays bounded to the clips in flight. A plain mutex guards it: the
626    /// concurrent producers only ever insert, and the lock is never held across an
627    /// await.
628    cover_cache: &'a Mutex<HashMap<String, Vec<u8>>>,
629    /// The cover URLs a `CoverJpg` sidecar will fetch this run. The producer caches
630    /// a cover only when its URL is here, so a clip whose cover is embedded but
631    /// never written as a sidecar leaves no bytes stranded in `cover_cache`.
632    cover_wanted: &'a HashSet<&'a str>,
633    /// Album video-cover source URLs fetched by more than one folder artifact
634    /// this run. The `both` retention derives `cover.webp` (transcoded) and
635    /// `cover.mp4` (raw) from the SAME `video_cover_url`; the first fetch caches
636    /// the raw source here so the sibling drains it instead of re-fetching (#90
637    /// reuses the #89 fetch-once path). `FolderWebp` sorts before `FolderMp4`, so
638    /// the raw source is always cached before the raw sidecar reads it.
639    shared_cover_urls: &'a HashSet<&'a str>,
640}
641
642impl<H, F, G, C> Ctx<'_, H, F, G, C>
643where
644    H: Http,
645    F: Filesystem,
646    G: Ffmpeg,
647    C: Clock,
648{
649    /// Apply one non-audio action, returning what it did or why it failed.
650    ///
651    /// Audio actions ([`Download`](Action::Download) /
652    /// [`Reformat`](Action::Reformat)) run in the concurrent phase through
653    /// [`prepare_audio`](Self::prepare_audio) and never reach here.
654    #[allow(clippy::too_many_arguments)]
655    async fn apply(
656        &self,
657        client_lock: &ClientLock<'_, C>,
658        action: &Action,
659        manifest: &mut Manifest,
660        albums: &mut BTreeMap<String, AlbumArt>,
661        playlists: &mut BTreeMap<String, PlaylistState>,
662        tracked_paths: &mut HashMap<String, u32>,
663        committed: &BTreeSet<String>,
664    ) -> Result<Effect, Fail> {
665        match action {
666            Action::Download { .. } | Action::Reformat { .. } => {
667                unreachable!("audio actions are applied in the concurrent phase")
668            }
669            Action::Retag {
670                clip,
671                lineage,
672                path,
673            } => self.retag(manifest, clip, lineage, path).await,
674            Action::Rename { from, to } => self.rename(manifest, from, to),
675            Action::Delete { path, clip_id } => self.delete(manifest, path, clip_id),
676            Action::Skip { clip_id } => {
677                self.refresh_preserve(manifest, clip_id);
678                Ok(Effect::Skipped)
679            }
680            Action::WriteArtifact {
681                kind,
682                path,
683                source_url,
684                hash,
685                owner_id,
686                content,
687            } => {
688                self.write_artifact(
689                    manifest,
690                    albums,
691                    playlists,
692                    *kind,
693                    path,
694                    source_url,
695                    hash,
696                    owner_id,
697                    content.as_deref(),
698                    tracked_paths,
699                    committed,
700                )
701                .await
702            }
703            Action::DeleteArtifact {
704                kind,
705                path,
706                owner_id,
707            } => self.delete_artifact(manifest, albums, playlists, *kind, path, owner_id),
708            Action::MoveArtifact {
709                kind,
710                from,
711                to,
712                source_url,
713                hash,
714                owner_id,
715            } => {
716                self.move_artifact(
717                    manifest,
718                    albums,
719                    playlists,
720                    *kind,
721                    from,
722                    to,
723                    source_url,
724                    hash,
725                    owner_id,
726                    tracked_paths,
727                    committed,
728                )
729                .await
730            }
731            Action::WriteStem {
732                clip_id,
733                key,
734                stem_id,
735                path,
736                source_url,
737                format,
738                hash,
739            } => {
740                self.write_stem(
741                    client_lock,
742                    manifest,
743                    clip_id,
744                    key,
745                    stem_id,
746                    path,
747                    source_url,
748                    *format,
749                    hash,
750                    tracked_paths,
751                    committed,
752                )
753                .await
754            }
755            Action::DeleteStem { clip_id, key, path } => {
756                self.delete_stem(manifest, clip_id, key, path)
757            }
758            Action::MoveStem {
759                clip_id,
760                key,
761                stem_id,
762                from,
763                to,
764                source_url,
765                format,
766                hash,
767            } => {
768                self.move_stem(
769                    client_lock,
770                    manifest,
771                    clip_id,
772                    key,
773                    stem_id,
774                    from,
775                    to,
776                    source_url,
777                    *format,
778                    hash,
779                    tracked_paths,
780                    committed,
781                )
782                .await
783            }
784        }
785    }
786
787    /// Render one audio action's tagged bytes, side-effect-free.
788    ///
789    /// This is the concurrent part: it fetches, transcodes, and tags the file
790    /// (through shared ports, plus the client behind `client_lock`), then returns
791    /// the bytes and where they must go. It deliberately writes nothing, removes
792    /// nothing, and never touches `manifest`, so many run at once and an aborted
793    /// run can drop them with no destination or manifest effect. The serial
794    /// [`commit_audio`](Self::commit_audio) applies those effects in plan order.
795    async fn prepare_audio(
796        &self,
797        client_lock: &ClientLock<'_, C>,
798        action: &Action,
799    ) -> Result<RenderedAudio, Fail> {
800        match action {
801            Action::Download {
802                clip,
803                lineage,
804                path,
805                format,
806            } => {
807                let bytes = self
808                    .produce_audio(client_lock, clip, lineage, *format)
809                    .await?;
810                Ok(RenderedAudio {
811                    clip_id: clip.id.clone(),
812                    path: path.clone(),
813                    format: *format,
814                    from_path: None,
815                    effect: Effect::Downloaded,
816                    bytes,
817                })
818            }
819            Action::Reformat {
820                clip,
821                path,
822                from_path,
823                from: _,
824                to,
825            } => {
826                // A Reformat action carries no lineage, so recover it from the
827                // desired set (the same context that drove naming and the hash),
828                // falling back to a self-rooted context when the clip is not in
829                // the current selection.
830                let lineage = self
831                    .by_id
832                    .get(clip.id.as_str())
833                    .map(|d| d.lineage.clone())
834                    .unwrap_or_else(|| LineageContext::own_root(clip));
835                let bytes = self.produce_audio(client_lock, clip, &lineage, *to).await?;
836                Ok(RenderedAudio {
837                    clip_id: clip.id.clone(),
838                    path: path.clone(),
839                    format: *to,
840                    from_path: Some(from_path.clone()),
841                    effect: Effect::Reformatted,
842                    bytes,
843                })
844            }
845            _ => unreachable!("prepare_audio only handles audio actions"),
846        }
847    }
848
849    /// Commit one rendered audio result serially, in plan order.
850    ///
851    /// Writes the tagged bytes to the destination, then, for a [`Reformat`], drops
852    /// the superseded file, then records the manifest entry. Ordering the write
853    /// before the removal keeps a crash from losing both copies; keeping all of
854    /// this off the concurrent phase preserves the sequential executor's plan-order
855    /// guarantee for every destination and manifest effect.
856    fn commit_audio(
857        &self,
858        manifest: &mut Manifest,
859        rendered: RenderedAudio,
860    ) -> Result<Effect, Fail> {
861        let RenderedAudio {
862            clip_id,
863            path,
864            format,
865            from_path,
866            effect,
867            bytes,
868        } = rendered;
869        let size = self.write_verify(&clip_id, &path, &bytes)?;
870        if let Some(from) = from_path {
871            // The new file is safely in place; only now drop the old rendering.
872            self.fs.remove(&from).map_err(|err| {
873                permanent_fail(&clip_id, format!("could not remove old file: {err}"))
874            })?;
875        }
876        manifest.insert(clip_id.clone(), self.entry(&clip_id, &path, format, size));
877        Ok(effect)
878    }
879
880    /// Re-tag the existing file in place to match current metadata and art.
881    async fn retag(
882        &self,
883        manifest: &mut Manifest,
884        clip: &Clip,
885        lineage: &LineageContext,
886        path: &str,
887    ) -> Result<Effect, Fail> {
888        let Some(format) = manifest.get(&clip.id).map(|entry| entry.format) else {
889            return Err(permanent_fail(
890                &clip.id,
891                "retag target missing from manifest",
892            ));
893        };
894
895        if format == AudioFormat::Wav {
896            let (meta, synced) = self.track_meta(clip, lineage);
897            let cover = self.fetch_cover(clip).await;
898            let existing = self.fs.read(path).map_err(|err| {
899                permanent_fail(&clip.id, format!("could not read for retag: {err}"))
900            })?;
901            let tagged = tag_wav(&existing, &meta, cover.as_deref(), synced)
902                .map_err(|err| permanent_fail(&clip.id, err.to_string()))?;
903            let size = self.write_verify(&clip.id, path, &tagged)?;
904            self.refresh_hashes(manifest, &clip.id, Some(size));
905            return Ok(Effect::Retagged);
906        }
907
908        let (meta, synced) = self.track_meta(clip, lineage);
909        let cover = self.fetch_cover(clip).await;
910        let existing = self
911            .fs
912            .read(path)
913            .map_err(|err| permanent_fail(&clip.id, format!("could not read for retag: {err}")))?;
914        let tagged = match format {
915            AudioFormat::Mp3 => tag_mp3(&existing, &meta, cover.as_deref(), synced),
916            AudioFormat::Flac => tag_flac(&existing, &meta, cover.as_deref()),
917            AudioFormat::Wav => unreachable!("WAV handled above"),
918        }
919        .map_err(|err| permanent_fail(&clip.id, err.to_string()))?;
920        let size = self.write_verify(&clip.id, path, &tagged)?;
921        self.refresh_hashes(manifest, &clip.id, Some(size));
922        Ok(Effect::Retagged)
923    }
924
925    /// Move the file and update the entry's path (and protection).
926    fn rename(&self, manifest: &mut Manifest, from: &str, to: &str) -> Result<Effect, Fail> {
927        let label = self
928            .by_path
929            .get(to)
930            .map(|d| d.clip.id.clone())
931            .unwrap_or_else(|| to.to_owned());
932        self.fs.rename(from, to).map_err(|err| {
933            if err.is_out_of_space() {
934                disk_fail(label, "disk full: no space left to rename")
935            } else {
936                permanent_fail(label, format!("rename failed: {err}"))
937            }
938        })?;
939
940        let clip_id = self.by_path.get(to).map(|d| d.clip.id.clone()).or_else(|| {
941            manifest
942                .entries
943                .iter()
944                .find(|(_, entry)| entry.path == from)
945                .map(|(id, _)| id.clone())
946        });
947        if let Some(id) = clip_id
948            && let Some(entry) = manifest.entries.get_mut(&id)
949        {
950            entry.path = to.to_owned();
951            if let Some(d) = self.by_path.get(to) {
952                entry.preserve = preserve_for(d);
953            }
954        }
955        Ok(Effect::Renamed)
956    }
957
958    /// Remove the file and drop the manifest entry.
959    fn delete(&self, manifest: &mut Manifest, path: &str, clip_id: &str) -> Result<Effect, Fail> {
960        self.fs
961            .remove(path)
962            .map_err(|err| permanent_fail(clip_id, format!("delete failed: {err}")))?;
963        manifest.remove(clip_id);
964        Ok(Effect::Deleted)
965    }
966
    /// Fetch an artifact's bytes, write them atomically, then record the sidecar
    /// on the owning manifest entry.
    ///
    /// The fetch and write share the audio path's resilience: `fetch_bytes`
    /// retries transient failures and verifies `Content-Length`, and
    /// `write_verify` confirms the on-disk size. A failure is attributed to the
    /// owning clip and returned as a per-clip [`Fail`], so a bad sidecar never
    /// aborts the whole run (only an auth failure or a full disk does, matching
    /// audio).
    ///
    /// The bytes written depend on the kind: a static cover is the fetched image
    /// verbatim, while an animated cover is the clip's MP4 preview transcoded to
    /// WebP through the ffmpeg port (see [`artifact_bytes`](Self::artifact_bytes)).
    ///
    /// A sidecar is only ever written for a clip whose audio is present: a
    /// successful `Download`/`Reformat` creates the manifest entry earlier in
    /// this run, and a prior-run clip already has one. So an absent owning entry
    /// means the audio failed or never existed this run; we skip (no fetch, no
    /// write) rather than strand an untracked sidecar with no owning audio.
    ///
    /// Folder art ([`FolderJpg`](ArtifactKind::FolderJpg),
    /// [`FolderWebp`](ArtifactKind::FolderWebp), and
    /// [`FolderMp4`](ArtifactKind::FolderMp4)) is album-scoped: its `owner_id`
    /// is the album's stable root id, not a manifest clip, so it skips the
    /// manifest presence guard and records its state on the album store instead.
    ///
    /// When a title or album change moves the audio, reconcile re-emits this
    /// write at the NEW path; this handler then removes the sidecar left at the
    /// artifact's previously tracked path, moving it rather than orphaning it.
    /// The removal happens only after the new file is safely written, and a
    /// remove failure returns before the state slot advances, so the next run
    /// re-plans the identical write and retries — self-healing, never an orphan.
    ///
    /// `content` is the inline body of a generated artifact; when it is `None`
    /// the bytes come from [`artifact_bytes`](Self::artifact_bytes).
    /// `tracked_paths` holds a reference count per tracked path and `committed`
    /// the paths already written this run; together they gate the old-path
    /// removal below.
    ///
    /// Returns [`Effect::Skipped`] when a per-clip kind has no owning manifest
    /// entry, and [`Effect::ArtifactWritten`] once the file is written and its
    /// slot recorded.
    #[allow(clippy::too_many_arguments)]
    async fn write_artifact(
        &self,
        manifest: &mut Manifest,
        albums: &mut BTreeMap<String, AlbumArt>,
        playlists: &mut BTreeMap<String, PlaylistState>,
        kind: ArtifactKind,
        path: &str,
        source_url: &str,
        hash: &str,
        owner_id: &str,
        content: Option<&str>,
        tracked_paths: &mut HashMap<String, u32>,
        committed: &BTreeSet<String>,
    ) -> Result<Effect, Fail> {
        // A per-song sidecar needs its owning clip's manifest entry; album and
        // playlist kinds are keyed elsewhere and skip this guard.
        if is_per_clip_kind(kind) && manifest.get(owner_id).is_none() {
            // The owning audio never landed this run, so this sidecar is skipped
            // and will never drain a cover the producer cached for it. Drop that
            // entry now: an insert without a matching sidecar write must not
            // outlive its clip, keeping `cover_cache` bounded to the clips in
            // flight (#89). A non-cover kind has no entry here, so this is a
            // harmless no-op for them.
            self.cover_cache
                .lock()
                .expect("cover cache mutex poisoned")
                .remove(source_url);
            return Ok(Effect::Skipped);
        }
        // Capture the path this artifact was last tracked at, BEFORE the slot is
        // overwritten below, so a path-changing write (a title/album rename that
        // moves the audio) can clean up the old sidecar it left behind. Cover
        // kinds live on the manifest, folder kinds on the album store; playlists
        // reconcile their own old-path delete and so opt out here.
        let old_path = match kind {
            ArtifactKind::CoverJpg => manifest
                .get(owner_id)
                .and_then(|e| e.cover_jpg.as_ref())
                .map(|s| s.path.clone()),
            ArtifactKind::CoverWebp => manifest
                .get(owner_id)
                .and_then(|e| e.cover_webp.as_ref())
                .map(|s| s.path.clone()),
            ArtifactKind::DetailsTxt => manifest
                .get(owner_id)
                .and_then(|e| e.details_txt.as_ref())
                .map(|s| s.path.clone()),
            ArtifactKind::LyricsTxt => manifest
                .get(owner_id)
                .and_then(|e| e.lyrics_txt.as_ref())
                .map(|s| s.path.clone()),
            ArtifactKind::Lrc => manifest
                .get(owner_id)
                .and_then(|e| e.lrc.as_ref())
                .map(|s| s.path.clone()),
            ArtifactKind::VideoMp4 => manifest
                .get(owner_id)
                .and_then(|e| e.video_mp4.as_ref())
                .map(|s| s.path.clone()),
            ArtifactKind::FolderJpg | ArtifactKind::FolderWebp | ArtifactKind::FolderMp4 => albums
                .get(owner_id)
                .and_then(|a| a.artifact(kind))
                .map(|s| s.path.clone()),
            ArtifactKind::Playlist => None,
        };
        // A generated artifact (a playlist) carries its body inline and never
        // touches the network; a fetched one pulls (and transcodes) its source.
        let bytes = match content {
            Some(text) => text.as_bytes().to_vec(),
            None => self.artifact_bytes(kind, source_url, owner_id).await?,
        };
        // Any write failure returns here, before the old copy is touched and
        // before the slot is updated.
        self.write_verify(owner_id, path, &bytes)?;
        // The new sidecar is safely in place; only now drop a stale copy left at
        // the previous path (the audio moved). `remove` is idempotent, so an
        // already-absent old file is fine. On a genuine remove failure we return
        // BEFORE updating the slot, leaving the manifest/album pointing at the
        // old path: the next run sees the same path drift, re-plans this write,
        // and retries the cleanup — convergent, no orphan persists.
        //
        // The removal is gated so it can never delete a live file (#76). This
        // slot is releasing `old`, so drop its reference in `tracked_paths`; the
        // file is removed only once nothing else holds it — no other tracked slot
        // still references it (count now zero) and no *committed* write this run
        // has already placed a file there (`committed`, the commit-tracked twin of
        // `suppress_path_aliasing`). On a path swap (A: x -> y while B: y -> x)
        // the earlier write commits its path, so the later mover keeps it; when
        // two slots share a path after a prior failed swap, the reference count
        // keeps it. But a merely *planned* colliding write that later fails no
        // longer protects a stale file, so it is cleaned up rather than orphaned
        // (#142).
        if let Some(old) = old_path.as_deref()
            && !old.is_empty()
            && old != path
        {
            // Release this slot's reference. A path with no counter at all is
            // treated as unreferenced.
            let still_referenced = tracked_paths
                .get_mut(old)
                .map(|count| {
                    *count = count.saturating_sub(1);
                    *count > 0
                })
                .unwrap_or(false);
            if !still_referenced && !committed.contains(old) {
                self.fs.remove(old).map_err(|err| {
                    permanent_fail(
                        owner_id,
                        format!("could not remove old sidecar {old}: {err}"),
                    )
                })?;
            }
        }
        // Record the new path and hash on the store that owns this kind: the
        // album store, the playlist store, or the owning clip's manifest entry.
        if is_album_kind(kind) {
            albums.entry(owner_id.to_owned()).or_default().set(
                kind,
                Some(ArtifactState {
                    path: path.to_owned(),
                    hash: hash.to_owned(),
                }),
            );
        } else if is_playlist_kind(kind) {
            playlists.insert(
                owner_id.to_owned(),
                PlaylistState {
                    name: playlist_name_from_path(path),
                    path: path.to_owned(),
                    hash: hash.to_owned(),
                },
            );
        } else if let Some(entry) = manifest.entries.get_mut(owner_id) {
            set_manifest_artifact(
                entry,
                kind,
                Some(ArtifactState {
                    path: path.to_owned(),
                    hash: hash.to_owned(),
                }),
            );
        }
        Ok(Effect::ArtifactWritten)
    }
1139
    /// Relocate a fetched per-clip sidecar with a local rename, falling back to a
    /// fetch-and-write when the move is unsafe or the old file has vanished.
    ///
    /// Reconcile downgrades a pure path drift (same bytes, new path, old file
    /// present, fetched kind) to a `MoveArtifact`, so a retitle renames the file
    /// rather than re-downloading a cover or re-transcoding an animated WebP
    /// (#141). The in-place rename is taken only when `from` is this slot's alone
    /// to give up (no other tracked slot references it and no committed write has
    /// placed a file there); otherwise, or if the rename fails, the ordinary
    /// [`write_artifact`](Self::write_artifact) fetches fresh bytes and runs the
    /// gated old-path cleanup, so a swap or co-reference is handled exactly as
    /// before.
    ///
    /// Returns [`Effect::Skipped`] when a per-clip kind has no owning manifest
    /// entry, [`Effect::Renamed`] when the in-place rename succeeds, and
    /// otherwise whatever [`write_artifact`](Self::write_artifact) returns. A
    /// rename that fails because the filesystem is out of space is returned as a
    /// disk-full failure instead of falling back.
    #[allow(clippy::too_many_arguments)]
    async fn move_artifact(
        &self,
        manifest: &mut Manifest,
        albums: &mut BTreeMap<String, AlbumArt>,
        playlists: &mut BTreeMap<String, PlaylistState>,
        kind: ArtifactKind,
        from: &str,
        to: &str,
        source_url: &str,
        hash: &str,
        owner_id: &str,
        tracked_paths: &mut HashMap<String, u32>,
        committed: &BTreeSet<String>,
    ) -> Result<Effect, Fail> {
        // A per-clip sidecar needs its owning clip's audio present, exactly as
        // write_artifact requires.
        if is_per_clip_kind(kind) && manifest.get(owner_id).is_none() {
            return Ok(Effect::Skipped);
        }
        // Relocate in place only when `from` is ours alone to give up: no other
        // tracked slot still references it (a prior failed swap can share a path)
        // and no committed write this run has already placed a file there.
        // Otherwise the fetch-and-write fallback copies fresh bytes and runs the
        // gated old-path cleanup.
        let exclusive =
            tracked_paths.get(from).is_none_or(|count| *count <= 1) && !committed.contains(from);
        if from != to && exclusive {
            match self.fs.rename(from, to) {
                Ok(()) => {
                    // The file now lives at `to`, so this slot no longer holds
                    // a reference to `from`.
                    if let Some(count) = tracked_paths.get_mut(from) {
                        *count = count.saturating_sub(1);
                    }
                    // NOTE(review): only the manifest slot is updated here;
                    // `albums` and `playlists` are not. That is fine if
                    // reconcile emits `MoveArtifact` for per-clip kinds only —
                    // confirm, since an album-kind move would rename the file
                    // without recording the new path on the album store.
                    if let Some(entry) = manifest.entries.get_mut(owner_id) {
                        set_manifest_artifact(
                            entry,
                            kind,
                            Some(ArtifactState {
                                path: to.to_owned(),
                                hash: hash.to_owned(),
                            }),
                        );
                    }
                    return Ok(Effect::Renamed);
                }
                Err(err) if err.is_out_of_space() => {
                    return Err(disk_fail(
                        owner_id,
                        "disk full: no space left to move sidecar",
                    ));
                }
                // The old file has vanished, or the rename is unsupported: fall
                // through to a fetch-and-write at `to`.
                Err(_) => {}
            }
        }
        // Fallback: no inline content is passed, so write_artifact fetches the
        // bytes and writes them at `to`.
        self.write_artifact(
            manifest,
            albums,
            playlists,
            kind,
            to,
            source_url,
            hash,
            owner_id,
            None,
            tracked_paths,
            committed,
        )
        .await
    }
1223    ///
1224    /// An animated cover — a per-clip [`CoverWebp`](ArtifactKind::CoverWebp) or an
1225    /// album [`FolderWebp`](ArtifactKind::FolderWebp) — fetches the clip's
1226    /// `video_cover` MP4 preview and transcodes it to an animated WebP through the
1227    /// ffmpeg port; every other kind is the fetched source verbatim (the static
1228    /// [`CoverJpg`](ArtifactKind::CoverJpg) / album [`FolderJpg`](ArtifactKind::FolderJpg)
1229    /// image, or the raw album [`FolderMp4`](ArtifactKind::FolderMp4) whose
1230    /// `video_cover_url` is kept untranscoded). A fetch or transcode failure
1231    /// is attributed to the owning clip and is a per-clip [`Fail`], except a
1232    /// disk-full transcode, which aborts the run like the audio FLAC path.
1233    async fn artifact_bytes(
1234        &self,
1235        kind: ArtifactKind,
1236        source_url: &str,
1237        owner_id: &str,
1238    ) -> Result<Vec<u8>, Fail> {
1239        // Reuse the cover the audio producer already fetched for the embedded tag
1240        // when it cached this exact URL (#89); otherwise fetch it now. The guard
1241        // is taken and dropped in its own statement so it never spans the await.
1242        let cached = self
1243            .cover_cache
1244            .lock()
1245            .expect("cover cache mutex poisoned")
1246            .remove(source_url);
1247        let source = match cached {
1248            Some(bytes) => bytes,
1249            None => {
1250                let fetched = self
1251                    .fetch_bytes(source_url)
1252                    .await
1253                    .map_err(|err| err.attribute(owner_id))?;
1254                // Cache the raw source when a sibling folder artifact will fetch
1255                // the same URL (the `both` retention: cover.webp + cover.mp4), so
1256                // it is fetched exactly once. Bounded to shared URLs and drained
1257                // on the sibling's use.
1258                if self.shared_cover_urls.contains(source_url) {
1259                    self.cover_cache
1260                        .lock()
1261                        .expect("cover cache mutex poisoned")
1262                        .insert(source_url.to_owned(), fetched.clone());
1263                }
1264                fetched
1265            }
1266        };
1267        match kind {
1268            ArtifactKind::CoverWebp | ArtifactKind::FolderWebp => self
1269                .ffmpeg
1270                .mp4_to_webp(&source, self.opts.cover_webp)
1271                .await
1272                .map_err(|err| {
1273                    if err.is_out_of_space() {
1274                        disk_fail(owner_id, "disk full: no space left to transcode")
1275                    } else {
1276                        permanent_fail(owner_id, format!("cover transcode failed: {err}"))
1277                    }
1278                }),
1279            // The text sidecars are generated and always carry inline content, so
1280            // `write_artifact` never reaches this fetch path for them. Guard it so
1281            // a future miswiring fails loudly rather than fetching a URL.
1282            ArtifactKind::DetailsTxt | ArtifactKind::LyricsTxt | ArtifactKind::Lrc => Err(
1283                permanent_fail(owner_id, "text sidecar requires inline content"),
1284            ),
1285            ArtifactKind::CoverJpg
1286            | ArtifactKind::FolderJpg
1287            | ArtifactKind::FolderMp4
1288            | ArtifactKind::Playlist
1289            | ArtifactKind::VideoMp4 => Ok(source),
1290        }
1291    }
1292
1293    /// Remove a sidecar file and clear its slot on the owning manifest entry.
1294    ///
1295    /// `remove` is idempotent, so an already-absent sidecar is not a failure.
1296    /// When the owning entry is already gone (its audio was deleted earlier this
1297    /// run, co-deleting the sidecar), there is no slot to clear and that is fine.
1298    ///
1299    /// Folder art is album-scoped: its slot is cleared on the album store keyed by
1300    /// the album's root id, not on a manifest clip.
1301    ///
1302    /// The audio `Delete` is applied before its sidecar `DeleteArtifact`. If the
1303    /// sidecar removal fails after the audio is already gone, the sidecar lingers
1304    /// untracked, but the design stays convergent rather than transactional: the
1305    /// next run re-plans the same removal and retries, and any directory it would
1306    /// have emptied is pruned once the file finally clears.
1307    fn delete_artifact(
1308        &self,
1309        manifest: &mut Manifest,
1310        albums: &mut BTreeMap<String, AlbumArt>,
1311        playlists: &mut BTreeMap<String, PlaylistState>,
1312        kind: ArtifactKind,
1313        path: &str,
1314        owner_id: &str,
1315    ) -> Result<Effect, Fail> {
1316        self.fs
1317            .remove(path)
1318            .map_err(|err| permanent_fail(owner_id, format!("artifact delete failed: {err}")))?;
1319        if is_album_kind(kind) {
1320            if let Some(art) = albums.get_mut(owner_id) {
1321                art.set(kind, None);
1322                if art.is_empty() {
1323                    albums.remove(owner_id);
1324                }
1325            }
1326        } else if is_playlist_kind(kind) {
1327            playlists.remove(owner_id);
1328        } else if let Some(entry) = manifest.entries.get_mut(owner_id) {
1329            set_manifest_artifact(entry, kind, None);
1330        }
1331        Ok(Effect::ArtifactDeleted)
1332    }
1333
    /// Fetch one stem's bytes, write them atomically, then record the stem on
    /// the owning clip's keyed stem map.
    ///
    /// Mirrors [`write_artifact`](Self::write_artifact) for the keyed-stem case,
    /// sharing the fetch resilience (`fetch_bytes` retries and verifies
    /// `Content-Length`) and the atomic size-verified write. A stem is only ever
    /// written for a clip whose audio is present, so an absent owning manifest
    /// entry means the audio failed or never existed this run; we skip rather
    /// than strand an untracked stem with no owning audio.
    ///
    /// Stems are stored RAW in their native container and are NEVER transcoded to
    /// FLAC, even when the song's own format is FLAC — they are the deliberate
    /// exception. A `Wav` stem is rendered through the free `convert_wav` flow
    /// (see [`fetch_stem_bytes`](Self::fetch_stem_bytes)); an `Mp3` stem is fetched
    /// straight from its public CDN url. Either way the bytes land verbatim at
    /// `path`, whose extension already matches the stem format.
    ///
    /// When a title/album change moves the song, reconcile re-emits this write at
    /// the NEW path; this handler then removes the stem left at the previously
    /// tracked path, moving it rather than orphaning it. The removal happens only
    /// after the new file is safely written and only when nothing else this run
    /// writes that path, and a remove failure returns before the slot advances so
    /// the next run re-plans the identical write and retries — self-healing.
    ///
    /// Returns [`Effect::Skipped`] when the owning clip has no manifest entry,
    /// and [`Effect::ArtifactWritten`] once the stem is written and recorded
    /// under `key`.
    #[allow(clippy::too_many_arguments)]
    async fn write_stem(
        &self,
        client_lock: &ClientLock<'_, C>,
        manifest: &mut Manifest,
        clip_id: &str,
        key: &str,
        stem_id: &str,
        path: &str,
        source_url: &str,
        format: StemFormat,
        hash: &str,
        tracked_paths: &mut HashMap<String, u32>,
        committed: &BTreeSet<String>,
    ) -> Result<Effect, Fail> {
        // A stem needs its owning clip's manifest entry (its audio must exist).
        if manifest.get(clip_id).is_none() {
            return Ok(Effect::Skipped);
        }
        // Remember where this stem was last tracked, before the slot is
        // overwritten below, so a stale copy at that path can be cleaned up.
        let old_path = manifest
            .get(clip_id)
            .and_then(|e| e.stems.get(key))
            .map(|s| s.path.clone());
        let bytes = self
            .fetch_stem_bytes(client_lock, clip_id, stem_id, source_url, format)
            .await?;
        // Any write failure returns here, before the old copy is touched and
        // before the slot is updated.
        self.write_verify(clip_id, path, &bytes)?;
        // The new stem is in place; only now drop a stale copy left at the old
        // path (the song moved, or the stem format changed). `remove` is
        // idempotent. On a genuine remove failure we return BEFORE updating the
        // slot, so the next run re-plans the same write and retries the cleanup.
        //
        // The removal is gated by the same twin guards as `write_artifact` (#76,
        // #142): this slot decrements its reference in `tracked_paths`, and the
        // file is removed only when nothing else holds it (count reaches zero)
        // and no committed write this run already placed a file there. This
        // keeps the shared file alive when two clips co-reference the same stem
        // path after a partially-failed swap.
        if let Some(old) = old_path.as_deref()
            && !old.is_empty()
            && old != path
        {
            // Release this slot's reference. A path with no counter at all is
            // treated as unreferenced.
            let still_referenced = tracked_paths
                .get_mut(old)
                .map(|count| {
                    *count = count.saturating_sub(1);
                    *count > 0
                })
                .unwrap_or(false);
            if !still_referenced && !committed.contains(old) {
                self.fs.remove(old).map_err(|err| {
                    permanent_fail(clip_id, format!("could not remove old stem {old}: {err}"))
                })?;
            }
        }
        // Record the new path and hash under `key` on the owning clip's entry.
        if let Some(entry) = manifest.entries.get_mut(clip_id) {
            set_manifest_stem(
                entry,
                key,
                Some(ArtifactState {
                    path: path.to_owned(),
                    hash: hash.to_owned(),
                }),
            );
        }
        Ok(Effect::ArtifactWritten)
    }
1424
1425    /// Relocate a stem with a local rename, falling back to a fetch-and-write
1426    /// when the move is unsafe or the old file has vanished (#141).
1427    ///
1428    /// Reconcile downgrades a pure stem path drift to a `MoveStem`, so a retitle
1429    /// renames the raw stem rather than re-rendering a WAV through `convert_wav`
1430    /// or re-fetching an MP3. The in-place rename is taken only when `from` is
1431    /// this slot's alone to give up (no other tracked slot references it — two
1432    /// same-base clips can share a stem path after a partially-failed swap — and
1433    /// no committed write this run already holds it); otherwise the
1434    /// fetch-and-write fallback re-fetches the correct bytes at `to`, so a
1435    /// co-referenced shared stem is never renamed away with mismatched content.
1436    #[allow(clippy::too_many_arguments)]
1437    async fn move_stem(
1438        &self,
1439        client_lock: &ClientLock<'_, C>,
1440        manifest: &mut Manifest,
1441        clip_id: &str,
1442        key: &str,
1443        stem_id: &str,
1444        from: &str,
1445        to: &str,
1446        source_url: &str,
1447        format: StemFormat,
1448        hash: &str,
1449        tracked_paths: &mut HashMap<String, u32>,
1450        committed: &BTreeSet<String>,
1451    ) -> Result<Effect, Fail> {
1452        if manifest.get(clip_id).is_none() {
1453            return Ok(Effect::Skipped);
1454        }
1455        let exclusive =
1456            tracked_paths.get(from).is_none_or(|count| *count <= 1) && !committed.contains(from);
1457        if from != to && exclusive {
1458            match self.fs.rename(from, to) {
1459                Ok(()) => {
1460                    if let Some(count) = tracked_paths.get_mut(from) {
1461                        *count = count.saturating_sub(1);
1462                    }
1463                    if let Some(entry) = manifest.entries.get_mut(clip_id) {
1464                        set_manifest_stem(
1465                            entry,
1466                            key,
1467                            Some(ArtifactState {
1468                                path: to.to_owned(),
1469                                hash: hash.to_owned(),
1470                            }),
1471                        );
1472                    }
1473                    return Ok(Effect::Renamed);
1474                }
1475                Err(err) if err.is_out_of_space() => {
1476                    return Err(disk_fail(clip_id, "disk full: no space left to move stem"));
1477                }
1478                // The old file has vanished, or the rename is unsupported: fall
1479                // through to a fetch-and-write at `to`.
1480                Err(_) => {}
1481            }
1482        }
1483        self.write_stem(
1484            client_lock,
1485            manifest,
1486            clip_id,
1487            key,
1488            stem_id,
1489            to,
1490            source_url,
1491            format,
1492            hash,
1493            tracked_paths,
1494            committed,
1495        )
1496        .await
1497    }
1498
1499    /// Resolve a stem's RAW bytes in its native container, never transcoding.
1500    ///
1501    /// A `Wav` stem renders the stem clip's lossless WAV through the very same
1502    /// free `convert_wav` + poll flow the main FLAC/WAV audio uses
1503    /// ([`resolve_wav_url`](Self::resolve_wav_url)), keyed on the stem's own
1504    /// `stem_id`, then downloads that WAV. An `Mp3` stem (or a degenerate `Wav`
1505    /// stem with no id to render) downloads its public CDN url directly. Stems
1506    /// are the deliberate exception to the source format: the bytes are returned
1507    /// exactly as delivered and are never re-encoded to FLAC.
1508    async fn fetch_stem_bytes(
1509        &self,
1510        client_lock: &ClientLock<'_, C>,
1511        clip_id: &str,
1512        stem_id: &str,
1513        source_url: &str,
1514        format: StemFormat,
1515    ) -> Result<Vec<u8>, Fail> {
1516        let url = match format {
1517            StemFormat::Wav if !stem_id.is_empty() => {
1518                match self.resolve_wav_url(client_lock, stem_id).await? {
1519                    Some(url) => url,
1520                    None => return Err(transient_fail(clip_id, "stem WAV render was not ready")),
1521                }
1522            }
1523            // Mp3, or a Wav stem with no id to render, downloads the CDN mp3.
1524            _ => source_url.to_owned(),
1525        };
1526        self.fetch_bytes(&url)
1527            .await
1528            .map_err(|err| err.attribute(clip_id))
1529    }
1530
1531    /// Remove one stem file and clear its slot in the owning clip's stem map.
1532    ///
1533    /// `remove` is idempotent, so an already-absent stem is not a failure. When
1534    /// the owning entry is already gone (its audio was deleted earlier this run,
1535    /// co-deleting the stem), there is no slot to clear and that is fine; the
1536    /// emptied `.stems` folder is pruned by the end-of-run directory sweep.
1537    fn delete_stem(
1538        &self,
1539        manifest: &mut Manifest,
1540        clip_id: &str,
1541        key: &str,
1542        path: &str,
1543    ) -> Result<Effect, Fail> {
1544        self.fs
1545            .remove(path)
1546            .map_err(|err| permanent_fail(clip_id, format!("stem delete failed: {err}")))?;
1547        if let Some(entry) = manifest.entries.get_mut(clip_id) {
1548            set_manifest_stem(entry, key, None);
1549        }
1550        Ok(Effect::ArtifactDeleted)
1551    }
1552
1553    /// Download (and transcode/tag) the audio for `clip` in `format`.
1554    async fn produce_audio(
1555        &self,
1556        client_lock: &ClientLock<'_, C>,
1557        clip: &Clip,
1558        lineage: &LineageContext,
1559        format: AudioFormat,
1560    ) -> Result<Vec<u8>, Fail> {
1561        let (meta, synced) = self.track_meta(clip, lineage);
1562        match format {
1563            AudioFormat::Mp3 => {
1564                let url = clip.mp3_url();
1565                let audio = self
1566                    .fetch_bytes(&url)
1567                    .await
1568                    .map_err(|err| err.attribute(&clip.id))?;
1569                let cover = self.fetch_cover(clip).await;
1570                tag_mp3(&audio, &meta, cover.as_deref(), synced)
1571                    .map_err(|err| permanent_fail(&clip.id, err.to_string()))
1572            }
1573            AudioFormat::Flac => {
1574                let wav = self.fetch_wav(client_lock, clip).await?;
1575                let flac = self.ffmpeg.wav_to_flac(&wav).await.map_err(|err| {
1576                    if err.is_out_of_space() {
1577                        disk_fail(&clip.id, "disk full: no space left to transcode")
1578                    } else {
1579                        permanent_fail(&clip.id, format!("transcode failed: {err}"))
1580                    }
1581                })?;
1582                let cover = self.fetch_cover(clip).await;
1583                tag_flac(&flac, &meta, cover.as_deref())
1584                    .map_err(|err| permanent_fail(&clip.id, err.to_string()))
1585            }
1586            AudioFormat::Wav => {
1587                let wav = self.fetch_wav(client_lock, clip).await?;
1588                let cover = self.fetch_cover(clip).await;
1589                tag_wav(&wav, &meta, cover.as_deref(), synced)
1590                    .map_err(|err| permanent_fail(&clip.id, err.to_string()))
1591            }
1592        }
1593    }
1594
1595    /// This run's non-empty aligned lyrics for a clip, if any were fetched.
1596    fn synced_for(&self, clip_id: &str) -> Option<&AlignedLyrics> {
1597        self.synced
1598            .get(clip_id)
1599            .filter(|aligned| !aligned.is_empty())
1600    }
1601
1602    /// The track metadata for a clip, paired with its synced lyrics (if any).
1603    ///
1604    /// The feed omits per-clip lyrics, so when this run fetched aligned lyrics
1605    /// for the clip the plain text is folded into `lyrics` here, which the MP3
1606    /// `USLT` and FLAC `LYRICS` tags then carry. The returned [`AlignedLyrics`]
1607    /// is passed on to [`tag_mp3`] for the word-level `SYLT` frame.
1608    fn track_meta<'m>(
1609        &'m self,
1610        clip: &Clip,
1611        lineage: &LineageContext,
1612    ) -> (TrackMetadata, Option<&'m AlignedLyrics>) {
1613        let synced = self.synced_for(&clip.id);
1614        let mut meta = TrackMetadata::from_clip(clip, lineage);
1615        if let Some(aligned) = synced {
1616            meta.lyrics = aligned.plain_text();
1617        }
1618        (meta, synced)
1619    }
1620
1621    /// Resolve the rendered WAV URL and download it.
1622    async fn fetch_wav(
1623        &self,
1624        client_lock: &ClientLock<'_, C>,
1625        clip: &Clip,
1626    ) -> Result<Vec<u8>, Fail> {
1627        let url = match self.resolve_wav_url(client_lock, &clip.id).await? {
1628            Some(url) => url,
1629            None => return Err(transient_fail(&clip.id, "WAV render was not ready")),
1630        };
1631        self.fetch_bytes(&url)
1632            .await
1633            .map_err(|err| err.attribute(&clip.id))
1634    }
1635
1636    /// Read the WAV URL, requesting a render and polling if it is not ready.
1637    ///
1638    /// `None` means the render did not become ready within the poll budget; the
1639    /// caller treats that as a non-fatal transient failure, never a silent skip.
1640    ///
1641    /// Each client call briefly locks `client_lock`; the poll waits happen
1642    /// unlocked, so concurrent clips interleave their WAV renders rather than
1643    /// serialising behind one clip's whole poll budget.
1644    async fn resolve_wav_url(
1645        &self,
1646        client_lock: &ClientLock<'_, C>,
1647        id: &str,
1648    ) -> Result<Option<String>, Fail> {
1649        if let Some(url) = self.wav_url_retrying(client_lock, id).await? {
1650            return Ok(Some(url));
1651        }
1652        self.request_wav_retrying(client_lock, id).await?;
1653        for _ in 0..self.opts.wav_poll_attempts {
1654            self.clock.sleep(self.opts.wav_poll_interval).await;
1655            if let Some(url) = self.wav_url_retrying(client_lock, id).await? {
1656                return Ok(Some(url));
1657            }
1658        }
1659        Ok(None)
1660    }
1661
1662    /// Read the rendered WAV URL, retrying transient API failures with backoff
1663    /// (SYNC-16/17), so the default FLAC path is as resilient as the CDN path.
1664    async fn wav_url_retrying(
1665        &self,
1666        client_lock: &ClientLock<'_, C>,
1667        id: &str,
1668    ) -> Result<Option<String>, Fail> {
1669        let mut attempt: u32 = 0;
1670        loop {
1671            let result = {
1672                let mut client = client_lock.lock().await;
1673                client.wav_url(self.http, id).await
1674            };
1675            match result {
1676                Ok(url) => return Ok(url),
1677                Err(err) => match self.retry_core(id, err, &mut attempt).await {
1678                    Some(fail) => return Err(fail),
1679                    None => continue,
1680                },
1681            }
1682        }
1683    }
1684
1685    /// Ask Suno to render a WAV, retrying transient API failures with backoff.
1686    async fn request_wav_retrying(
1687        &self,
1688        client_lock: &ClientLock<'_, C>,
1689        id: &str,
1690    ) -> Result<(), Fail> {
1691        let mut attempt: u32 = 0;
1692        loop {
1693            let result = {
1694                let mut client = client_lock.lock().await;
1695                client.request_wav(self.http, id).await
1696            };
1697            match result {
1698                Ok(()) => return Ok(()),
1699                Err(err) => match self.retry_core(id, err, &mut attempt).await {
1700                    Some(fail) => return Err(fail),
1701                    None => continue,
1702                },
1703            }
1704        }
1705    }
1706
1707    /// Classify a core error from the authenticated WAV flow. On a transient
1708    /// class within budget, back off through the [`Clock`] and return `None` to
1709    /// retry; otherwise return the terminal [`Fail`].
1710    async fn retry_core(&self, id: &str, err: Error, attempt: &mut u32) -> Option<Fail> {
1711        let fail = classify_core(id, err);
1712        if matches!(fail.class, Class::Transient) && *attempt < self.opts.max_retries {
1713            self.clock.sleep(backoff_delay(*attempt, None)).await;
1714            *attempt += 1;
1715            None
1716        } else {
1717            Some(fail)
1718        }
1719    }
1720
1721    /// GET `url`, retrying transient failures with backoff, verifying size.
1722    async fn fetch_bytes(&self, url: &str) -> Result<Vec<u8>, FetchError> {
1723        let mut attempt: u32 = 0;
1724        loop {
1725            let result = self.http.send(HttpRequest::get(url)).await;
1726            match classify_response(result) {
1727                Ok(body) => return Ok(body),
1728                Err(err) => {
1729                    if matches!(err.class, Class::Transient) && attempt < self.opts.max_retries {
1730                        let delay = backoff_delay(attempt, err.retry_after);
1731                        self.clock.sleep(delay).await;
1732                        attempt += 1;
1733                        continue;
1734                    }
1735                    return Err(err);
1736                }
1737            }
1738        }
1739    }
1740
1741    /// Download cover art, trying each candidate URL in order; `None` is fine.
1742    async fn fetch_cover(&self, clip: &Clip) -> Option<Vec<u8>> {
1743        for url in clip.cover_candidates() {
1744            if let Ok(response) = self.http.send(HttpRequest::get(url)).await
1745                && (200..=299).contains(&response.status)
1746                && !response.body.is_empty()
1747            {
1748                // A `CoverJpg` sidecar will fetch this exact URL this run; keep the
1749                // bytes so its write reuses them instead of fetching again (#89).
1750                // The lock guards only the insert, never the await above.
1751                if self.cover_wanted.contains(url) {
1752                    self.cover_cache
1753                        .lock()
1754                        .expect("cover cache mutex poisoned")
1755                        .insert(url.to_owned(), response.body.clone());
1756                }
1757                return Some(response.body);
1758            }
1759        }
1760        None
1761    }
1762
1763    /// Write `bytes` atomically, then confirm the on-disk size (SYNC-13/14).
1764    fn write_verify(&self, clip_id: &str, path: &str, bytes: &[u8]) -> Result<u64, Fail> {
1765        self.fs.write_atomic(path, bytes).map_err(|err| {
1766            if err.is_out_of_space() {
1767                disk_fail(clip_id, format!("disk full: no space left to write {path}"))
1768            } else {
1769                permanent_fail(clip_id, format!("write failed: {err}"))
1770            }
1771        })?;
1772        match self.fs.metadata(path) {
1773            Some(stat) if stat.size == bytes.len() as u64 => Ok(stat.size),
1774            Some(stat) => Err(permanent_fail(
1775                clip_id,
1776                format!("wrote {} bytes, expected {}", stat.size, bytes.len()),
1777            )),
1778            None => Ok(bytes.len() as u64),
1779        }
1780    }
1781
1782    /// Build the manifest entry for a freshly written file.
1783    fn entry(&self, clip_id: &str, path: &str, format: AudioFormat, size: u64) -> ManifestEntry {
1784        match self.by_id.get(clip_id) {
1785            Some(d) => manifest_entry(d, size),
1786            None => ManifestEntry {
1787                path: path.to_owned(),
1788                format,
1789                size,
1790                ..ManifestEntry::default()
1791            },
1792        }
1793    }
1794
1795    /// Refresh an existing entry's hashes, protection, and (optionally) size.
1796    fn refresh_hashes(&self, manifest: &mut Manifest, clip_id: &str, size: Option<u64>) {
1797        let desired = self.by_id.get(clip_id).copied();
1798        if let Some(entry) = manifest.entries.get_mut(clip_id) {
1799            if let Some(d) = desired {
1800                entry.meta_hash = d.meta_hash.clone();
1801                entry.art_hash = d.art_hash.clone();
1802                entry.preserve = preserve_for(d);
1803            }
1804            if let Some(size) = size {
1805                entry.size = size;
1806            }
1807        }
1808    }
1809
1810    /// Refresh only an entry's preserve marker from the current desired state.
1811    ///
1812    /// A clip can gain or lose copy/private protection with no file change, which
1813    /// reconcile emits as a [`Skip`](Action::Skip). Refreshing here keeps the
1814    /// persisted marker a faithful image of live protection, so the cross-run
1815    /// delete guard (SYNC-8) never reads it stale.
1816    fn refresh_preserve(&self, manifest: &mut Manifest, clip_id: &str) {
1817        if let Some(d) = self.by_id.get(clip_id).copied()
1818            && let Some(entry) = manifest.entries.get_mut(clip_id)
1819        {
1820            entry.preserve = preserve_for(d);
1821        }
1822    }
1823}
1824
1825/// Build a manifest entry from the desired record (SYNC-8 preserve rule).
1826fn manifest_entry(d: &Desired, size: u64) -> ManifestEntry {
1827    ManifestEntry {
1828        path: d.path.clone(),
1829        format: d.format,
1830        meta_hash: d.meta_hash.clone(),
1831        art_hash: d.art_hash.clone(),
1832        size,
1833        preserve: preserve_for(d),
1834        ..Default::default()
1835    }
1836}
1837
1838/// Whether a written entry must be preserved across runs: held by any copy
1839/// source, or private. The reconcile delete guard reads this marker later.
1840fn preserve_for(d: &Desired) -> bool {
1841    d.private || d.modes.contains(&SourceMode::Copy)
1842}
1843
1844/// Classify one HTTP result into bytes or a [`FetchError`] (SYNC-14/17).
1845fn classify_response(
1846    result: Result<crate::http::HttpResponse, crate::http::TransportError>,
1847) -> Result<Vec<u8>, FetchError> {
1848    let response = match result {
1849        Ok(response) => response,
1850        Err(err) => {
1851            return Err(FetchError::transient(
1852                format!("transport error: {err}"),
1853                None,
1854            ));
1855        }
1856    };
1857    match response.status {
1858        200..=299 => {
1859            if let Some(expected) = content_length(&response) {
1860                let actual = response.body.len() as u64;
1861                if actual != expected {
1862                    return Err(FetchError::transient(
1863                        format!("truncated download: {actual} of {expected} bytes"),
1864                        None,
1865                    ));
1866                }
1867            }
1868            Ok(response.body)
1869        }
1870        401 | 403 => Err(FetchError::transient(
1871            format!("download rejected: status {}", response.status),
1872            None,
1873        )),
1874        408 => Err(FetchError::transient("request timed out", None)),
1875        429 => Err(FetchError::transient(
1876            "rate limited",
1877            retry_after(&response),
1878        )),
1879        500..=599 => Err(FetchError::transient(
1880            format!("server error {}", response.status),
1881            None,
1882        )),
1883        status => Err(FetchError::permanent(format!(
1884            "download failed: status {status}"
1885        ))),
1886    }
1887}
1888
1889/// Map a core [`Error`] from the authenticated WAV flow to a [`Fail`].
1890fn classify_core(id: &str, err: Error) -> Fail {
1891    let reason = err.to_string();
1892    match err {
1893        Error::Auth(_) => auth_fail(id, reason),
1894        Error::RateLimited { .. } | Error::Connection(_) => transient_fail(id, reason),
1895        Error::Api(_)
1896        | Error::NotFound(_)
1897        | Error::Tag(_)
1898        | Error::Config(_)
1899        | Error::Refused(_) => permanent_fail(id, reason),
1900    }
1901}
1902
1903/// The provider-reported body size from `Content-Length`, if present and valid.
1904fn content_length(response: &crate::http::HttpResponse) -> Option<u64> {
1905    response.header("content-length")?.trim().parse().ok()
1906}
1907
1908#[cfg(test)]
1909mod tests {
1910    use super::*;
1911    use crate::ClerkAuth;
1912    use crate::http::HttpResponse;
1913    use crate::testutil::{MemFs, RecordingClock, Reply, ScriptedHttp, StubFfmpeg};
1914
1915    fn clip(id: &str) -> Clip {
1916        Clip {
1917            id: id.to_owned(),
1918            title: "Song".to_owned(),
1919            audio_url: format!("https://cdn1.suno.ai/{id}.mp3"),
1920            ..Default::default()
1921        }
1922    }
1923
1924    fn art_clip(id: &str) -> Clip {
1925        Clip {
1926            image_large_url: format!("https://art.suno.ai/{id}/large.jpg"),
1927            image_url: format!("https://art.suno.ai/{id}/small.jpg"),
1928            ..clip(id)
1929        }
1930    }
1931
1932    fn ext(format: AudioFormat) -> &'static str {
1933        match format {
1934            AudioFormat::Mp3 => "mp3",
1935            AudioFormat::Flac => "flac",
1936            AudioFormat::Wav => "wav",
1937        }
1938    }
1939
1940    fn desired(clip: Clip, format: AudioFormat) -> Desired {
1941        Desired {
1942            path: format!("{}.{}", clip.id, ext(format)),
1943            lineage: LineageContext::own_root(&clip),
1944            clip,
1945            format,
1946            meta_hash: "m".to_owned(),
1947            art_hash: "art".to_owned(),
1948            modes: vec![SourceMode::Mirror],
1949            trashed: false,
1950            private: false,
1951            artifacts: Vec::new(),
1952            stems: None,
1953        }
1954    }
1955
1956    fn entry(path: &str, format: AudioFormat) -> ManifestEntry {
1957        ManifestEntry {
1958            path: path.to_owned(),
1959            format,
1960            meta_hash: "old".to_owned(),
1961            art_hash: "old-art".to_owned(),
1962            size: 8,
1963            preserve: false,
1964            ..Default::default()
1965        }
1966    }
1967
1968    #[allow(clippy::too_many_arguments)]
1969    fn run<G: Ffmpeg>(
1970        plan: &Plan,
1971        manifest: &mut Manifest,
1972        desired: &[Desired],
1973        http: &ScriptedHttp,
1974        fs: &MemFs,
1975        ffmpeg: &G,
1976        clock: &RecordingClock,
1977        opts: &ExecOptions,
1978    ) -> ExecOutcome {
1979        let mut albums = BTreeMap::new();
1980        run_with_albums(
1981            plan,
1982            manifest,
1983            &mut albums,
1984            desired,
1985            http,
1986            fs,
1987            ffmpeg,
1988            clock,
1989            opts,
1990        )
1991    }
1992
1993    #[allow(clippy::too_many_arguments)]
1994    fn run_with_albums<G: Ffmpeg>(
1995        plan: &Plan,
1996        manifest: &mut Manifest,
1997        albums: &mut BTreeMap<String, AlbumArt>,
1998        desired: &[Desired],
1999        http: &ScriptedHttp,
2000        fs: &MemFs,
2001        ffmpeg: &G,
2002        clock: &RecordingClock,
2003        opts: &ExecOptions,
2004    ) -> ExecOutcome {
2005        let mut playlists = BTreeMap::new();
2006        run_full(
2007            plan,
2008            manifest,
2009            albums,
2010            &mut playlists,
2011            desired,
2012            http,
2013            fs,
2014            ffmpeg,
2015            clock,
2016            opts,
2017        )
2018    }
2019
2020    #[allow(clippy::too_many_arguments)]
2021    fn run_full<G: Ffmpeg>(
2022        plan: &Plan,
2023        manifest: &mut Manifest,
2024        albums: &mut BTreeMap<String, AlbumArt>,
2025        playlists: &mut BTreeMap<String, PlaylistState>,
2026        desired: &[Desired],
2027        http: &ScriptedHttp,
2028        fs: &MemFs,
2029        ffmpeg: &G,
2030        clock: &RecordingClock,
2031        opts: &ExecOptions,
2032    ) -> ExecOutcome {
2033        let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
2034        let synced = HashMap::new();
2035        pollster::block_on(execute(
2036            plan,
2037            manifest,
2038            albums,
2039            playlists,
2040            desired,
2041            &synced,
2042            Ports {
2043                client: &mut client,
2044                http,
2045                fs,
2046                ffmpeg,
2047                clock,
2048            },
2049            opts,
2050        ))
2051    }
2052
2053    fn small_poll() -> ExecOptions {
2054        ExecOptions {
2055            max_retries: 3,
2056            wav_poll_attempts: 2,
2057            wav_poll_interval: Duration::from_secs(5),
2058            concurrency: 4,
2059            cover_webp: WebpEncodeSettings::default(),
2060        }
2061    }
2062
2063    // ── Download: MP3 ───────────────────────────────────────────────
2064
2065    #[test]
2066    fn download_mp3_writes_tagged_file_and_records_manifest() {
2067        let c = art_clip("a");
2068        let d = desired(c.clone(), AudioFormat::Mp3);
2069        let plan = Plan {
2070            actions: vec![Action::Download {
2071                clip: c.clone(),
2072                lineage: LineageContext::own_root(&c),
2073                path: d.path.clone(),
2074                format: AudioFormat::Mp3,
2075            }],
2076        };
2077        let http = ScriptedHttp::new()
2078            .route("a.mp3", Reply::ok(b"mp3-body".to_vec()))
2079            .route("a/large.jpg", Reply::ok(b"art-bytes".to_vec()));
2080        let fs = MemFs::new();
2081        let ffmpeg = StubFfmpeg::flac();
2082        let clock = RecordingClock::new();
2083        let mut manifest = Manifest::new();
2084
2085        let outcome = run(
2086            &plan,
2087            &mut manifest,
2088            &[d],
2089            &http,
2090            &fs,
2091            &ffmpeg,
2092            &clock,
2093            &ExecOptions::default(),
2094        );
2095
2096        assert_eq!(outcome.downloaded, 1);
2097        assert_eq!(outcome.failed(), 0);
2098        assert_eq!(outcome.status, RunStatus::Completed);
2099        let written = fs.read_file("a.mp3").unwrap();
2100        assert_eq!(&written[..3], b"ID3");
2101        assert!(written.ends_with(b"mp3-body"));
2102        let entry = manifest.get("a").unwrap();
2103        assert_eq!(entry.path, "a.mp3");
2104        assert_eq!(entry.format, AudioFormat::Mp3);
2105        assert_eq!(entry.meta_hash, "m");
2106        assert_eq!(entry.art_hash, "art");
2107        assert_eq!(entry.size, written.len() as u64);
2108        assert!(!entry.preserve);
2109    }
2110
2111    #[test]
2112    fn download_mp3_embeds_sylt_and_lyrics_from_synced_map() {
2113        // A clip whose alignment was fetched this run gets a word-level SYLT frame
2114        // and its plain lyric text embedded (USLT), end to end through execute.
2115        let c = art_clip("a");
2116        let d = desired(c.clone(), AudioFormat::Mp3);
2117        let plan = Plan {
2118            actions: vec![Action::Download {
2119                clip: c.clone(),
2120                lineage: LineageContext::own_root(&c),
2121                path: d.path.clone(),
2122                format: AudioFormat::Mp3,
2123            }],
2124        };
2125        let http = ScriptedHttp::new()
2126            .route("a.mp3", Reply::ok(b"mp3-body".to_vec()))
2127            .route("a/large.jpg", Reply::ok(b"art-bytes".to_vec()));
2128        let fs = MemFs::new();
2129        let ffmpeg = StubFfmpeg::flac();
2130        let clock = RecordingClock::new();
2131        let mut manifest = Manifest::new();
2132        let mut albums = BTreeMap::new();
2133        let mut playlists = BTreeMap::new();
2134        let mut synced = HashMap::new();
2135        synced.insert(
2136            "a".to_string(),
2137            AlignedLyrics::from_json(&serde_json::json!({
2138                "aligned_words": [],
2139                "aligned_lyrics": [
2140                    {"text": "hi there", "start_s": 0.5, "end_s": 1.2, "section": "Verse 1",
2141                     "words": [
2142                         {"text": "hi", "start_s": 0.5, "end_s": 0.8},
2143                         {"text": "there", "start_s": 0.9, "end_s": 1.2}
2144                     ]}
2145                ]
2146            })),
2147        );
2148        let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
2149        let outcome = pollster::block_on(execute(
2150            &plan,
2151            &mut manifest,
2152            &mut albums,
2153            &mut playlists,
2154            &[d],
2155            &synced,
2156            Ports {
2157                client: &mut client,
2158                http: &http,
2159                fs: &fs,
2160                ffmpeg: &ffmpeg,
2161                clock: &clock,
2162            },
2163            &ExecOptions::default(),
2164        ));
2165
2166        assert_eq!(outcome.downloaded, 1);
2167        let written = fs.read_file("a.mp3").unwrap();
2168        let tag = id3::Tag::read_from2(std::io::Cursor::new(written)).unwrap();
2169        assert_eq!(
2170            tag.synchronised_lyrics().count(),
2171            1,
2172            "a SYLT frame is embedded"
2173        );
2174        // The plain lyric text is populated from the alignment for the USLT frame.
2175        assert_eq!(
2176            tag.lyrics().next().map(|frame| frame.text.as_str()),
2177            Some("hi there")
2178        );
2179    }
2180
2181    #[test]
2182    fn download_mp3_embeds_no_sylt_when_synced_map_empty() {
2183        // The synced map is empty when the feature is off (no alignment fetched),
2184        // so no SYLT frame and no lyric text are embedded.
2185        let c = art_clip("a");
2186        let d = desired(c.clone(), AudioFormat::Mp3);
2187        let plan = Plan {
2188            actions: vec![Action::Download {
2189                clip: c.clone(),
2190                lineage: LineageContext::own_root(&c),
2191                path: d.path.clone(),
2192                format: AudioFormat::Mp3,
2193            }],
2194        };
2195        let http = ScriptedHttp::new()
2196            .route("a.mp3", Reply::ok(b"mp3-body".to_vec()))
2197            .route("a/large.jpg", Reply::ok(b"art-bytes".to_vec()));
2198        let fs = MemFs::new();
2199        let ffmpeg = StubFfmpeg::flac();
2200        let clock = RecordingClock::new();
2201        let mut manifest = Manifest::new();
2202        let mut albums = BTreeMap::new();
2203        let mut playlists = BTreeMap::new();
2204        let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
2205        let outcome = pollster::block_on(execute(
2206            &plan,
2207            &mut manifest,
2208            &mut albums,
2209            &mut playlists,
2210            &[d],
2211            &HashMap::new(),
2212            Ports {
2213                client: &mut client,
2214                http: &http,
2215                fs: &fs,
2216                ffmpeg: &ffmpeg,
2217                clock: &clock,
2218            },
2219            &ExecOptions::default(),
2220        ));
2221        assert_eq!(outcome.downloaded, 1);
2222        let written = fs.read_file("a.mp3").unwrap();
2223        let tag = id3::Tag::read_from2(std::io::Cursor::new(written)).unwrap();
2224        assert_eq!(tag.synchronised_lyrics().count(), 0);
2225        assert_eq!(tag.lyrics().count(), 0);
2226    }
2227
2228    #[test]
2229    fn download_mp3_uses_cdn_fallback_when_audio_url_empty() {
2230        let mut c = clip("a");
2231        c.audio_url = String::new();
2232        let d = desired(c.clone(), AudioFormat::Mp3);
2233        let plan = Plan {
2234            actions: vec![Action::Download {
2235                clip: c.clone(),
2236                lineage: LineageContext::own_root(&c),
2237                path: d.path.clone(),
2238                format: AudioFormat::Mp3,
2239            }],
2240        };
2241        let http = ScriptedHttp::new().route("cdn1.suno.ai/a.mp3", Reply::ok(b"body".to_vec()));
2242        let fs = MemFs::new();
2243        let mut manifest = Manifest::new();
2244        let outcome = run(
2245            &plan,
2246            &mut manifest,
2247            &[d],
2248            &http,
2249            &fs,
2250            &StubFfmpeg::flac(),
2251            &RecordingClock::new(),
2252            &ExecOptions::default(),
2253        );
2254        assert_eq!(outcome.downloaded, 1);
2255        assert_eq!(http.count("cdn1.suno.ai/a.mp3"), 1);
2256    }
2257
2258    // ── Download: FLAC render + transcode ───────────────────────────
2259
2260    #[test]
2261    fn download_flac_renders_transcodes_and_records() {
2262        let c = clip("b");
2263        let d = desired(c.clone(), AudioFormat::Flac);
2264        let plan = Plan {
2265            actions: vec![Action::Download {
2266                clip: c.clone(),
2267                lineage: LineageContext::own_root(&c),
2268                path: d.path.clone(),
2269                format: AudioFormat::Flac,
2270            }],
2271        };
2272        let http = ScriptedHttp::new()
2273            .with_auth()
2274            .route(
2275                "/wav_file/",
2276                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/b.wav"}"#),
2277            )
2278            .route("b.wav", Reply::ok(b"wav-bytes".to_vec()));
2279        let fs = MemFs::new();
2280        let clock = RecordingClock::new();
2281        let mut manifest = Manifest::new();
2282
2283        let outcome = run(
2284            &plan,
2285            &mut manifest,
2286            &[d],
2287            &http,
2288            &fs,
2289            &StubFfmpeg::flac(),
2290            &clock,
2291            &ExecOptions::default(),
2292        );
2293
2294        assert_eq!(outcome.downloaded, 1);
2295        assert_eq!(outcome.failed(), 0);
2296        let written = fs.read_file("b.flac").unwrap();
2297        assert_eq!(&written[..4], b"fLaC");
2298        assert_eq!(manifest.get("b").unwrap().format, AudioFormat::Flac);
2299        // The URL was ready immediately, so no render request and no polling.
2300        assert_eq!(http.count("/convert_wav/"), 0);
2301        assert!(clock.sleeps().is_empty());
2302    }
2303
2304    #[test]
2305    fn download_flac_requests_render_then_polls_until_ready() {
2306        let c = clip("c");
2307        let d = desired(c.clone(), AudioFormat::Flac);
2308        let plan = Plan {
2309            actions: vec![Action::Download {
2310                clip: c.clone(),
2311                lineage: LineageContext::own_root(&c),
2312                path: d.path.clone(),
2313                format: AudioFormat::Flac,
2314            }],
2315        };
2316        let http = ScriptedHttp::new()
2317            .with_auth()
2318            .route_seq(
2319                "/wav_file/",
2320                vec![
2321                    Reply::json("{}"),
2322                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/c.wav"}"#),
2323                ],
2324            )
2325            .route("/convert_wav/", Reply::status(200))
2326            .route("c.wav", Reply::ok(b"wav".to_vec()));
2327        let clock = RecordingClock::new();
2328        let mut manifest = Manifest::new();
2329
2330        let outcome = run(
2331            &plan,
2332            &mut manifest,
2333            &[d],
2334            &http,
2335            &fs_new(),
2336            &StubFfmpeg::flac(),
2337            &clock,
2338            &small_poll(),
2339        );
2340
2341        assert_eq!(outcome.downloaded, 1);
2342        assert_eq!(http.count("/convert_wav/"), 1);
2343        assert_eq!(clock.sleeps(), vec![Duration::from_secs(5)]);
2344    }
2345
2346    #[test]
2347    fn download_flac_unavailable_render_is_a_nonfatal_failure() {
2348        let c = clip("d");
2349        let d = desired(c.clone(), AudioFormat::Flac);
2350        let plan = Plan {
2351            actions: vec![Action::Download {
2352                clip: c.clone(),
2353                lineage: LineageContext::own_root(&c),
2354                path: d.path.clone(),
2355                format: AudioFormat::Flac,
2356            }],
2357        };
2358        let http = ScriptedHttp::new()
2359            .with_auth()
2360            .route("/wav_file/", Reply::json("{}"))
2361            .route("/convert_wav/", Reply::status(200));
2362        let fs = MemFs::new();
2363        let clock = RecordingClock::new();
2364        let mut manifest = Manifest::new();
2365
2366        let outcome = run(
2367            &plan,
2368            &mut manifest,
2369            &[d],
2370            &http,
2371            &fs,
2372            &StubFfmpeg::flac(),
2373            &clock,
2374            &small_poll(),
2375        );
2376
2377        assert_eq!(outcome.downloaded, 0);
2378        assert_eq!(outcome.failed(), 1);
2379        assert_eq!(outcome.failures[0].clip_id, "d");
2380        assert_eq!(outcome.status, RunStatus::Completed);
2381        assert!(!fs.exists("d.flac"));
2382        assert_eq!(clock.sleeps().len(), 2);
2383    }
2384
2385    #[test]
2386    fn flac_transcode_failure_is_recorded_and_skipped() {
2387        let c = clip("t");
2388        let d = desired(c.clone(), AudioFormat::Flac);
2389        let plan = Plan {
2390            actions: vec![Action::Download {
2391                clip: c.clone(),
2392                lineage: LineageContext::own_root(&c),
2393                path: d.path.clone(),
2394                format: AudioFormat::Flac,
2395            }],
2396        };
2397        let http = ScriptedHttp::new()
2398            .with_auth()
2399            .route(
2400                "/wav_file/",
2401                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/t.wav"}"#),
2402            )
2403            .route("t.wav", Reply::ok(b"wav".to_vec()));
2404        let fs = MemFs::new();
2405        let mut manifest = Manifest::new();
2406
2407        let outcome = run(
2408            &plan,
2409            &mut manifest,
2410            &[d],
2411            &http,
2412            &fs,
2413            &StubFfmpeg::failing(),
2414            &RecordingClock::new(),
2415            &ExecOptions::default(),
2416        );
2417
2418        assert_eq!(outcome.downloaded, 0);
2419        assert_eq!(outcome.failed(), 1);
2420        assert!(!fs.exists("t.flac"));
2421        assert!(manifest.get("t").is_none());
2422    }
2423
2424    // ── Cover fallback ──────────────────────────────────────────────
2425
2426    #[test]
2427    fn cover_falls_back_when_large_image_is_missing() {
2428        let c = art_clip("e");
2429        let d = desired(c.clone(), AudioFormat::Mp3);
2430        let plan = Plan {
2431            actions: vec![Action::Download {
2432                clip: c.clone(),
2433                lineage: LineageContext::own_root(&c),
2434                path: d.path.clone(),
2435                format: AudioFormat::Mp3,
2436            }],
2437        };
2438        let http = ScriptedHttp::new()
2439            .route("e.mp3", Reply::ok(b"body".to_vec()))
2440            .route("e/large.jpg", Reply::status(404))
2441            .route("e/small.jpg", Reply::ok(b"the-art".to_vec()));
2442        let fs = MemFs::new();
2443        let mut manifest = Manifest::new();
2444
2445        let outcome = run(
2446            &plan,
2447            &mut manifest,
2448            &[d],
2449            &http,
2450            &fs,
2451            &StubFfmpeg::flac(),
2452            &RecordingClock::new(),
2453            &ExecOptions::default(),
2454        );
2455
2456        assert_eq!(outcome.downloaded, 1);
2457        let calls = http.calls();
2458        let large = calls
2459            .iter()
2460            .position(|u| u.contains("e/large.jpg"))
2461            .unwrap();
2462        let small = calls
2463            .iter()
2464            .position(|u| u.contains("e/small.jpg"))
2465            .unwrap();
2466        assert!(large < small, "large art tried before small");
2467    }
2468
2469    // ── Cover reuse: embed + sidecar share one fetch (#89) ──────────
2470
2471    #[test]
2472    fn download_reuses_the_embedded_cover_for_the_jpg_sidecar() {
2473        // The embedded tag and the `.jpg` sidecar want the same cover URL; it is
2474        // fetched once and the bytes serve both.
2475        let c = art_clip("a");
2476        let d = desired(c.clone(), AudioFormat::Mp3);
2477        let plan = Plan {
2478            actions: vec![
2479                Action::Download {
2480                    clip: c.clone(),
2481                    lineage: LineageContext::own_root(&c),
2482                    path: d.path.clone(),
2483                    format: AudioFormat::Mp3,
2484                },
2485                Action::WriteArtifact {
2486                    kind: ArtifactKind::CoverJpg,
2487                    path: "a/cover.jpg".to_owned(),
2488                    source_url: c.selected_image_url().unwrap().to_owned(),
2489                    hash: "art".to_owned(),
2490                    owner_id: "a".to_owned(),
2491                    content: None,
2492                },
2493            ],
2494        };
2495        let http = ScriptedHttp::new()
2496            .route("a.mp3", Reply::ok(b"mp3-body".to_vec()))
2497            .route("a/large.jpg", Reply::ok(b"the-art".to_vec()));
2498        let fs = MemFs::new();
2499        let mut manifest = Manifest::new();
2500
2501        let outcome = run(
2502            &plan,
2503            &mut manifest,
2504            &[d],
2505            &http,
2506            &fs,
2507            &StubFfmpeg::flac(),
2508            &RecordingClock::new(),
2509            &ExecOptions::default(),
2510        );
2511
2512        assert_eq!(outcome.downloaded, 1);
2513        assert_eq!(outcome.artifacts_written, 1);
2514        assert_eq!(outcome.failed(), 0);
2515        // Fetched once, not twice.
2516        assert_eq!(http.count("a/large.jpg"), 1);
2517        // The sidecar carries the fetched bytes, and the audio was tagged.
2518        assert_eq!(fs.read_file("a/cover.jpg").unwrap(), b"the-art");
2519        assert_eq!(&fs.read_file("a.mp3").unwrap()[..3], b"ID3");
2520    }
2521
2522    #[test]
2523    fn concurrent_downloads_reuse_each_clips_own_cover() {
2524        // Two clips render concurrently; each `.jpg` sidecar gets its own cover
2525        // (no cross-contamination) and each cover URL is fetched exactly once.
2526        let a = art_clip("a");
2527        let b = art_clip("b");
2528        let da = desired(a.clone(), AudioFormat::Mp3);
2529        let db = desired(b.clone(), AudioFormat::Mp3);
2530        let plan = Plan {
2531            actions: vec![
2532                Action::Download {
2533                    clip: a.clone(),
2534                    lineage: LineageContext::own_root(&a),
2535                    path: da.path.clone(),
2536                    format: AudioFormat::Mp3,
2537                },
2538                Action::WriteArtifact {
2539                    kind: ArtifactKind::CoverJpg,
2540                    path: "a/cover.jpg".to_owned(),
2541                    source_url: a.selected_image_url().unwrap().to_owned(),
2542                    hash: "art".to_owned(),
2543                    owner_id: "a".to_owned(),
2544                    content: None,
2545                },
2546                Action::Download {
2547                    clip: b.clone(),
2548                    lineage: LineageContext::own_root(&b),
2549                    path: db.path.clone(),
2550                    format: AudioFormat::Mp3,
2551                },
2552                Action::WriteArtifact {
2553                    kind: ArtifactKind::CoverJpg,
2554                    path: "b/cover.jpg".to_owned(),
2555                    source_url: b.selected_image_url().unwrap().to_owned(),
2556                    hash: "art".to_owned(),
2557                    owner_id: "b".to_owned(),
2558                    content: None,
2559                },
2560            ],
2561        };
2562        let http = ScriptedHttp::new()
2563            .route("a.mp3", Reply::ok(b"a-mp3".to_vec()))
2564            .route("b.mp3", Reply::ok(b"b-mp3".to_vec()))
2565            .route("a/large.jpg", Reply::ok(b"art-a".to_vec()))
2566            .route("b/large.jpg", Reply::ok(b"art-b".to_vec()));
2567        let fs = MemFs::new();
2568        let mut manifest = Manifest::new();
2569
2570        let outcome = run(
2571            &plan,
2572            &mut manifest,
2573            &[da, db],
2574            &http,
2575            &fs,
2576            &StubFfmpeg::flac(),
2577            &RecordingClock::new(),
2578            &small_poll(),
2579        );
2580
2581        assert_eq!(outcome.downloaded, 2);
2582        assert_eq!(outcome.artifacts_written, 2);
2583        assert_eq!(http.count("a/large.jpg"), 1);
2584        assert_eq!(http.count("b/large.jpg"), 1);
2585        assert_eq!(fs.read_file("a/cover.jpg").unwrap(), b"art-a");
2586        assert_eq!(fs.read_file("b/cover.jpg").unwrap(), b"art-b");
2587    }
2588
2589    #[test]
2590    fn cover_sidecar_refetches_when_embed_fell_back_to_another_url() {
2591        // The large image 404s so the embed falls back to the small image; the
2592        // sidecar still wants the (dead) large URL and must NOT be handed the
2593        // small bytes. Reuse is keyed on the exact URL, so nothing is cached and
2594        // the sidecar fetches the large URL itself (then fails on the 404).
2595        let c = art_clip("e");
2596        let d = desired(c.clone(), AudioFormat::Mp3);
2597        let plan = Plan {
2598            actions: vec![
2599                Action::Download {
2600                    clip: c.clone(),
2601                    lineage: LineageContext::own_root(&c),
2602                    path: d.path.clone(),
2603                    format: AudioFormat::Mp3,
2604                },
2605                Action::WriteArtifact {
2606                    kind: ArtifactKind::CoverJpg,
2607                    path: "e/cover.jpg".to_owned(),
2608                    source_url: "https://art.suno.ai/e/large.jpg".to_owned(),
2609                    hash: "art".to_owned(),
2610                    owner_id: "e".to_owned(),
2611                    content: None,
2612                },
2613            ],
2614        };
2615        let http = ScriptedHttp::new()
2616            .route("e.mp3", Reply::ok(b"body".to_vec()))
2617            .route("e/large.jpg", Reply::status(404))
2618            .route("e/small.jpg", Reply::ok(b"small-art".to_vec()));
2619        let fs = MemFs::new();
2620        let mut manifest = Manifest::new();
2621
2622        let outcome = run(
2623            &plan,
2624            &mut manifest,
2625            &[d],
2626            &http,
2627            &fs,
2628            &StubFfmpeg::flac(),
2629            &RecordingClock::new(),
2630            &ExecOptions::default(),
2631        );
2632
2633        assert_eq!(outcome.downloaded, 1);
2634        // The small image was fetched once (the embed fallback) and never reused
2635        // for the large-keyed sidecar; the sidecar went to the network itself.
2636        assert_eq!(http.count("e/small.jpg"), 1);
2637        assert!(
2638            http.count("e/large.jpg") >= 2,
2639            "sidecar refetched the large URL"
2640        );
2641        assert_eq!(manifest.get("e").unwrap().cover_jpg, None);
2642        assert!(!fs.exists("e/cover.jpg"));
2643    }
2644
2645    // ── Atomic write and size verification (SYNC-13/14) ─────────────
2646
2647    #[test]
2648    fn failed_write_leaves_the_prior_file_intact() {
2649        let c = clip("f");
2650        let d = desired(c.clone(), AudioFormat::Mp3);
2651        let plan = Plan {
2652            actions: vec![Action::Download {
2653                clip: c.clone(),
2654                lineage: LineageContext::own_root(&c),
2655                path: d.path.clone(),
2656                format: AudioFormat::Mp3,
2657            }],
2658        };
2659        let http = ScriptedHttp::new().route("f.mp3", Reply::ok(b"new-body".to_vec()));
2660        let fs = MemFs::new()
2661            .with_file("f.mp3", b"OLD-CONTENT".to_vec())
2662            .fail_write("f.mp3");
2663        let mut manifest = Manifest::new();
2664
2665        let outcome = run(
2666            &plan,
2667            &mut manifest,
2668            &[d],
2669            &http,
2670            &fs,
2671            &StubFfmpeg::flac(),
2672            &RecordingClock::new(),
2673            &ExecOptions::default(),
2674        );
2675
2676        assert_eq!(outcome.downloaded, 0);
2677        assert_eq!(outcome.failed(), 1);
2678        assert_eq!(fs.read_file("f.mp3").unwrap(), b"OLD-CONTENT");
2679        assert!(manifest.get("f").is_none());
2680    }
2681
2682    #[test]
2683    fn size_mismatch_after_write_is_a_failure() {
2684        let c = clip("g");
2685        let d = desired(c.clone(), AudioFormat::Mp3);
2686        let plan = Plan {
2687            actions: vec![Action::Download {
2688                clip: c.clone(),
2689                lineage: LineageContext::own_root(&c),
2690                path: d.path.clone(),
2691                format: AudioFormat::Mp3,
2692            }],
2693        };
2694        let http = ScriptedHttp::new().route("g.mp3", Reply::ok(b"body".to_vec()));
2695        let fs = MemFs::new().corrupt_write("g.mp3");
2696        let mut manifest = Manifest::new();
2697
2698        let outcome = run(
2699            &plan,
2700            &mut manifest,
2701            &[d],
2702            &http,
2703            &fs,
2704            &StubFfmpeg::flac(),
2705            &RecordingClock::new(),
2706            &ExecOptions::default(),
2707        );
2708
2709        assert_eq!(outcome.downloaded, 0);
2710        assert_eq!(outcome.failed(), 1);
2711        assert!(outcome.failures[0].reason.contains("expected"));
2712        assert!(manifest.get("g").is_none());
2713    }
2714
2715    // ── Reliability policy (SYNC-16/17) ─────────────────────────────
2716
2717    #[test]
2718    fn transient_failure_is_retried_then_skipped() {
2719        let c = clip("h");
2720        let d = desired(c.clone(), AudioFormat::Mp3);
2721        let plan = Plan {
2722            actions: vec![Action::Download {
2723                clip: c.clone(),
2724                lineage: LineageContext::own_root(&c),
2725                path: d.path.clone(),
2726                format: AudioFormat::Mp3,
2727            }],
2728        };
2729        let http = ScriptedHttp::new().route("h.mp3", Reply::status(500));
2730        let fs = MemFs::new();
2731        let clock = RecordingClock::new();
2732        let opts = ExecOptions {
2733            max_retries: 2,
2734            ..ExecOptions::default()
2735        };
2736        let mut manifest = Manifest::new();
2737
2738        let outcome = run(
2739            &plan,
2740            &mut manifest,
2741            &[d],
2742            &http,
2743            &fs,
2744            &StubFfmpeg::flac(),
2745            &clock,
2746            &opts,
2747        );
2748
2749        assert_eq!(outcome.downloaded, 0);
2750        assert_eq!(outcome.failed(), 1);
2751        assert_eq!(http.count("h.mp3"), 3);
2752        assert_eq!(clock.sleeps().len(), 2);
2753    }
2754
2755    #[test]
2756    fn truncated_download_is_retried_then_succeeds() {
2757        let c = clip("i");
2758        let d = desired(c.clone(), AudioFormat::Mp3);
2759        let plan = Plan {
2760            actions: vec![Action::Download {
2761                clip: c.clone(),
2762                lineage: LineageContext::own_root(&c),
2763                path: d.path.clone(),
2764                format: AudioFormat::Mp3,
2765            }],
2766        };
2767        let http = ScriptedHttp::new().route_seq(
2768            "i.mp3",
2769            vec![
2770                Reply::ok(b"short".to_vec()).with_content_length(999),
2771                Reply::ok(b"good-body".to_vec()),
2772            ],
2773        );
2774        let fs = MemFs::new();
2775        let clock = RecordingClock::new();
2776        let mut manifest = Manifest::new();
2777
2778        let outcome = run(
2779            &plan,
2780            &mut manifest,
2781            &[d],
2782            &http,
2783            &fs,
2784            &StubFfmpeg::flac(),
2785            &clock,
2786            &ExecOptions::default(),
2787        );
2788
2789        assert_eq!(outcome.downloaded, 1);
2790        assert_eq!(http.count("i.mp3"), 2);
2791        assert_eq!(clock.sleeps().len(), 1);
2792    }
2793
2794    #[test]
2795    fn rate_limit_backs_off_using_retry_after() {
2796        let c = clip("j");
2797        let d = desired(c.clone(), AudioFormat::Mp3);
2798        let plan = Plan {
2799            actions: vec![Action::Download {
2800                clip: c.clone(),
2801                lineage: LineageContext::own_root(&c),
2802                path: d.path.clone(),
2803                format: AudioFormat::Mp3,
2804            }],
2805        };
2806        let http = ScriptedHttp::new().route_seq(
2807            "j.mp3",
2808            vec![
2809                Reply::status(429).with_retry_after(7),
2810                Reply::ok(b"body".to_vec()),
2811            ],
2812        );
2813        let fs = MemFs::new();
2814        let clock = RecordingClock::new();
2815        let mut manifest = Manifest::new();
2816
2817        let outcome = run(
2818            &plan,
2819            &mut manifest,
2820            &[d],
2821            &http,
2822            &fs,
2823            &StubFfmpeg::flac(),
2824            &clock,
2825            &ExecOptions::default(),
2826        );
2827
2828        assert_eq!(outcome.downloaded, 1);
2829        assert_eq!(clock.sleeps(), vec![Duration::from_secs(7)]);
2830    }
2831
2832    #[test]
2833    fn auth_failure_aborts_the_run() {
2834        let c1 = clip("k1");
2835        let c2 = clip("k2");
2836        let d1 = desired(c1.clone(), AudioFormat::Flac);
2837        let d2 = desired(c2.clone(), AudioFormat::Flac);
2838        let plan = Plan {
2839            actions: vec![
2840                Action::Download {
2841                    clip: c1.clone(),
2842                    lineage: LineageContext::own_root(&c1),
2843                    path: d1.path.clone(),
2844                    format: AudioFormat::Flac,
2845                },
2846                Action::Download {
2847                    clip: c2.clone(),
2848                    lineage: LineageContext::own_root(&c2),
2849                    path: d2.path.clone(),
2850                    format: AudioFormat::Flac,
2851                },
2852            ],
2853        };
2854        // The authenticated WAV-render endpoint rejects auth even after a JWT
2855        // refresh: that is a bad token, so the whole run aborts rather than
2856        // hammering every clip. A CDN media rejection, by contrast, does not.
2857        let http = ScriptedHttp::new()
2858            .with_auth()
2859            .route("/wav_file/", Reply::status(401));
2860        let fs = MemFs::new();
2861        let mut manifest = Manifest::new();
2862
2863        let outcome = run(
2864            &plan,
2865            &mut manifest,
2866            &[d1, d2],
2867            &http,
2868            &fs,
2869            &StubFfmpeg::flac(),
2870            &RecordingClock::new(),
2871            &small_poll(),
2872        );
2873
2874        assert_eq!(outcome.status, RunStatus::AuthAborted);
2875        assert_eq!(outcome.failed(), 1);
2876        assert_eq!(outcome.failures[0].clip_id, "k1");
2877        assert_eq!(outcome.downloaded, 0);
2878    }
2879
2880    // ── Disk-full aborts the run (issue #17) ────────────────────────
2881
2882    #[test]
2883    fn disk_full_primary_write_aborts_the_run() {
2884        // Two MP3 downloads; the first write is out of space. That is systemic,
2885        // so the run aborts before the second is even attempted: exactly one
2886        // failure is recorded and its reason names the disk-full cause.
2887        let c1 = clip("d1");
2888        let c2 = clip("d2");
2889        let d1 = desired(c1.clone(), AudioFormat::Mp3);
2890        let d2 = desired(c2.clone(), AudioFormat::Mp3);
2891        let plan = Plan {
2892            actions: vec![
2893                Action::Download {
2894                    clip: c1.clone(),
2895                    lineage: LineageContext::own_root(&c1),
2896                    path: d1.path.clone(),
2897                    format: AudioFormat::Mp3,
2898                },
2899                Action::Download {
2900                    clip: c2.clone(),
2901                    lineage: LineageContext::own_root(&c2),
2902                    path: d2.path.clone(),
2903                    format: AudioFormat::Mp3,
2904                },
2905            ],
2906        };
2907        let http = ScriptedHttp::new()
2908            .route("d1.mp3", Reply::ok(b"body-1".to_vec()))
2909            .route("d2.mp3", Reply::ok(b"body-2".to_vec()));
2910        let fs = MemFs::new().fail_write_out_of_space("d1.mp3");
2911        let mut manifest = Manifest::new();
2912
2913        let outcome = run(
2914            &plan,
2915            &mut manifest,
2916            &[d1, d2],
2917            &http,
2918            &fs,
2919            &StubFfmpeg::flac(),
2920            &RecordingClock::new(),
2921            &ExecOptions::default(),
2922        );
2923
2924        assert_eq!(outcome.status, RunStatus::DiskFull);
2925        assert_eq!(outcome.failed(), 1);
2926        assert_eq!(outcome.failures[0].clip_id, "d1");
2927        assert!(outcome.failures[0].reason.contains("disk full"));
2928        assert_eq!(outcome.downloaded, 0);
2929        // The second clip was never fetched: the run aborted first.
2930        assert_eq!(http.count("d2.mp3"), 0);
2931        assert!(!fs.exists("d2.mp3"));
2932    }
2933
2934    #[test]
2935    fn disk_full_flac_transcode_aborts_the_run() {
2936        // The scratch disk fills during the FLAC re-encode; a WAV rendered, but
2937        // there is nowhere to stage the transcode, so the run aborts.
2938        let c1 = clip("d1");
2939        let c2 = clip("d2");
2940        let d1 = desired(c1.clone(), AudioFormat::Flac);
2941        let d2 = desired(c2.clone(), AudioFormat::Flac);
2942        let plan = Plan {
2943            actions: vec![
2944                Action::Download {
2945                    clip: c1.clone(),
2946                    lineage: LineageContext::own_root(&c1),
2947                    path: d1.path.clone(),
2948                    format: AudioFormat::Flac,
2949                },
2950                Action::Download {
2951                    clip: c2.clone(),
2952                    lineage: LineageContext::own_root(&c2),
2953                    path: d2.path.clone(),
2954                    format: AudioFormat::Flac,
2955                },
2956            ],
2957        };
2958        let http = ScriptedHttp::new()
2959            .with_auth()
2960            .route(
2961                "/wav_file/",
2962                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/d1.wav"}"#),
2963            )
2964            .route(".wav", Reply::ok(b"wav".to_vec()));
2965        let fs = MemFs::new();
2966        let mut manifest = Manifest::new();
2967
2968        let outcome = run(
2969            &plan,
2970            &mut manifest,
2971            &[d1, d2],
2972            &http,
2973            &fs,
2974            &StubFfmpeg::out_of_space(),
2975            &RecordingClock::new(),
2976            &ExecOptions::default(),
2977        );
2978
2979        assert_eq!(outcome.status, RunStatus::DiskFull);
2980        assert_eq!(outcome.failed(), 1);
2981        assert_eq!(outcome.failures[0].clip_id, "d1");
2982        assert!(outcome.failures[0].reason.contains("disk full"));
2983        assert_eq!(outcome.downloaded, 0);
2984    }
2985
2986    #[test]
2987    fn disk_full_artifact_write_aborts_the_run() {
2988        // A sidecar write (not a primary download) also aborts on a full disk:
2989        // the owning audio is present, the cover fetch succeeds, but the sidecar
2990        // cannot be written.
2991        let mut manifest = Manifest::new();
2992        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
2993        let plan = Plan {
2994            actions: vec![Action::WriteArtifact {
2995                kind: ArtifactKind::CoverJpg,
2996                path: "a/cover.jpg".to_owned(),
2997                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
2998                hash: "h1".to_owned(),
2999                owner_id: "a".to_owned(),
3000                content: None,
3001            }],
3002        };
3003        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"jpg-bytes".to_vec()));
3004        let fs = MemFs::new().fail_write_out_of_space("a/cover.jpg");
3005
3006        let outcome = run(
3007            &plan,
3008            &mut manifest,
3009            &[],
3010            &http,
3011            &fs,
3012            &StubFfmpeg::flac(),
3013            &RecordingClock::new(),
3014            &ExecOptions::default(),
3015        );
3016
3017        assert_eq!(outcome.status, RunStatus::DiskFull);
3018        assert_eq!(outcome.failed(), 1);
3019        assert!(outcome.failures[0].reason.contains("disk full"));
3020        assert_eq!(outcome.artifacts_written, 0);
3021        // The sidecar slot was never recorded: the write failed before it.
3022        assert_eq!(manifest.get("a").unwrap().cover_jpg, None);
3023    }
3024
3025    #[test]
3026    fn disk_full_leaves_the_failed_clips_manifest_entry_unchanged() {
3027        // write_verify fails before any manifest insert, so a re-download that
3028        // hits a full disk leaves the prior entry (and file) exactly as it was.
3029        let c = clip("m");
3030        let d = desired(c.clone(), AudioFormat::Mp3);
3031        let plan = Plan {
3032            actions: vec![Action::Download {
3033                clip: c.clone(),
3034                lineage: LineageContext::own_root(&c),
3035                path: d.path.clone(),
3036                format: AudioFormat::Mp3,
3037            }],
3038        };
3039        let http = ScriptedHttp::new().route("m.mp3", Reply::ok(b"new-body".to_vec()));
3040        let fs = MemFs::new()
3041            .with_file("m.mp3", b"OLD-CONTENT".to_vec())
3042            .fail_write_out_of_space("m.mp3");
3043        let mut manifest = Manifest::new();
3044        let before = entry("m.mp3", AudioFormat::Mp3);
3045        manifest.insert("m", before.clone());
3046
3047        let outcome = run(
3048            &plan,
3049            &mut manifest,
3050            &[d],
3051            &http,
3052            &fs,
3053            &StubFfmpeg::flac(),
3054            &RecordingClock::new(),
3055            &ExecOptions::default(),
3056        );
3057
3058        assert_eq!(outcome.status, RunStatus::DiskFull);
3059        assert_eq!(manifest.get("m"), Some(&before));
3060        assert_eq!(fs.read_file("m.mp3").unwrap(), b"OLD-CONTENT");
3061    }
3062
3063    #[test]
3064    fn cdn_download_rejection_skips_the_clip_without_aborting() {
3065        let c1 = clip("k1");
3066        let c2 = clip("k2");
3067        let d1 = desired(c1.clone(), AudioFormat::Mp3);
3068        let d2 = desired(c2.clone(), AudioFormat::Mp3);
3069        let plan = Plan {
3070            actions: vec![
3071                Action::Download {
3072                    clip: c1.clone(),
3073                    lineage: LineageContext::own_root(&c1),
3074                    path: d1.path.clone(),
3075                    format: AudioFormat::Mp3,
3076                },
3077                Action::Download {
3078                    clip: c2.clone(),
3079                    lineage: LineageContext::own_root(&c2),
3080                    path: d2.path.clone(),
3081                    format: AudioFormat::Mp3,
3082                },
3083            ],
3084        };
3085        // A CDN media fetch is unauthenticated, so a 403 is a per-asset
3086        // rejection (often transient), not a bad token: the clip is retried
3087        // then recorded and skipped, and the run carries on to the rest.
3088        let http = ScriptedHttp::new()
3089            .route("k1.mp3", Reply::status(403))
3090            .route("k2.mp3", Reply::ok(b"body".to_vec()));
3091        let fs = MemFs::new();
3092        let mut manifest = Manifest::new();
3093
3094        let outcome = run(
3095            &plan,
3096            &mut manifest,
3097            &[d1, d2],
3098            &http,
3099            &fs,
3100            &StubFfmpeg::flac(),
3101            &RecordingClock::new(),
3102            &ExecOptions::default(),
3103        );
3104
3105        assert_ne!(outcome.status, RunStatus::AuthAborted);
3106        assert_eq!(outcome.downloaded, 1);
3107        assert_eq!(outcome.failed(), 1);
3108        assert_eq!(outcome.failures[0].clip_id, "k1");
3109    }
3110
3111    #[test]
3112    fn one_clip_failure_does_not_abort_the_run() {
3113        let c1 = clip("l1");
3114        let c2 = clip("l2");
3115        let d1 = desired(c1.clone(), AudioFormat::Mp3);
3116        let d2 = desired(c2.clone(), AudioFormat::Mp3);
3117        let plan = Plan {
3118            actions: vec![
3119                Action::Download {
3120                    clip: c1.clone(),
3121                    lineage: LineageContext::own_root(&c1),
3122                    path: d1.path.clone(),
3123                    format: AudioFormat::Mp3,
3124                },
3125                Action::Download {
3126                    clip: c2.clone(),
3127                    lineage: LineageContext::own_root(&c2),
3128                    path: d2.path.clone(),
3129                    format: AudioFormat::Mp3,
3130                },
3131            ],
3132        };
3133        let http = ScriptedHttp::new()
3134            .route("l1.mp3", Reply::status(404))
3135            .route("l2.mp3", Reply::ok(b"body".to_vec()));
3136        let fs = MemFs::new();
3137        let mut manifest = Manifest::new();
3138
3139        let outcome = run(
3140            &plan,
3141            &mut manifest,
3142            &[d1, d2],
3143            &http,
3144            &fs,
3145            &StubFfmpeg::flac(),
3146            &RecordingClock::new(),
3147            &ExecOptions::default(),
3148        );
3149
3150        assert_eq!(outcome.status, RunStatus::Completed);
3151        assert_eq!(outcome.downloaded, 1);
3152        assert_eq!(outcome.failed(), 1);
3153        assert_eq!(outcome.failures[0].clip_id, "l1");
3154        assert!(fs.exists("l2.mp3"));
3155        assert!(manifest.get("l2").is_some());
3156        assert!(manifest.get("l1").is_none());
3157    }
3158
3159    // ── preserve marker (SYNC-8) ────────────────────────────────────
3160
3161    #[test]
3162    fn preserve_is_set_for_copy_held_and_private_clips() {
3163        let mut mirror = desired(clip("m1"), AudioFormat::Mp3);
3164        mirror.modes = vec![SourceMode::Mirror];
3165        let mut copy_held = desired(clip("m2"), AudioFormat::Mp3);
3166        copy_held.modes = vec![SourceMode::Mirror, SourceMode::Copy];
3167        let mut private = desired(clip("m3"), AudioFormat::Mp3);
3168        private.private = true;
3169
3170        let plan = Plan {
3171            actions: vec![
3172                Action::Download {
3173                    clip: mirror.clip.clone(),
3174                    lineage: LineageContext::own_root(&mirror.clip),
3175                    path: mirror.path.clone(),
3176                    format: AudioFormat::Mp3,
3177                },
3178                Action::Download {
3179                    clip: copy_held.clip.clone(),
3180                    lineage: LineageContext::own_root(&copy_held.clip),
3181                    path: copy_held.path.clone(),
3182                    format: AudioFormat::Mp3,
3183                },
3184                Action::Download {
3185                    clip: private.clip.clone(),
3186                    lineage: LineageContext::own_root(&private.clip),
3187                    path: private.path.clone(),
3188                    format: AudioFormat::Mp3,
3189                },
3190            ],
3191        };
3192        let http = ScriptedHttp::new()
3193            .route("m1.mp3", Reply::ok(b"a".to_vec()))
3194            .route("m2.mp3", Reply::ok(b"b".to_vec()))
3195            .route("m3.mp3", Reply::ok(b"c".to_vec()));
3196        let fs = MemFs::new();
3197        let mut manifest = Manifest::new();
3198
3199        let outcome = run(
3200            &plan,
3201            &mut manifest,
3202            &[mirror, copy_held, private],
3203            &http,
3204            &fs,
3205            &StubFfmpeg::flac(),
3206            &RecordingClock::new(),
3207            &ExecOptions::default(),
3208        );
3209
3210        assert_eq!(outcome.downloaded, 3);
3211        assert!(!manifest.get("m1").unwrap().preserve);
3212        assert!(manifest.get("m2").unwrap().preserve);
3213        assert!(manifest.get("m3").unwrap().preserve);
3214    }
3215
3216    // ── Reformat / Retag / Rename / Delete / Skip ───────────────────
3217
3218    #[test]
3219    fn reformat_writes_new_format_and_removes_old_file() {
3220        let c = clip("n");
3221        let d = desired(c.clone(), AudioFormat::Mp3);
3222        let plan = Plan {
3223            actions: vec![Action::Reformat {
3224                clip: c.clone(),
3225                path: "n.mp3".to_owned(),
3226                from_path: "n.flac".to_owned(),
3227                from: AudioFormat::Flac,
3228                to: AudioFormat::Mp3,
3229            }],
3230        };
3231        let http = ScriptedHttp::new().route("n.mp3", Reply::ok(b"body".to_vec()));
3232        let fs = MemFs::new().with_file("n.flac", b"OLD-FLAC".to_vec());
3233        let mut manifest = Manifest::new();
3234        manifest.insert("n", entry("n.flac", AudioFormat::Flac));
3235
3236        let outcome = run(
3237            &plan,
3238            &mut manifest,
3239            &[d],
3240            &http,
3241            &fs,
3242            &StubFfmpeg::flac(),
3243            &RecordingClock::new(),
3244            &ExecOptions::default(),
3245        );
3246
3247        assert_eq!(outcome.reformatted, 1);
3248        assert!(fs.exists("n.mp3"));
3249        assert!(!fs.exists("n.flac"));
3250        let updated = manifest.get("n").unwrap();
3251        assert_eq!(updated.path, "n.mp3");
3252        assert_eq!(updated.format, AudioFormat::Mp3);
3253        assert_eq!(updated.meta_hash, "m");
3254    }
3255
3256    #[test]
3257    fn retag_rewrites_file_and_updates_hashes() {
3258        let c = clip("o");
3259        let mut d = desired(c.clone(), AudioFormat::Mp3);
3260        d.meta_hash = "new".to_owned();
3261        d.art_hash = "new-art".to_owned();
3262        let existing = tag_mp3(
3263            b"audio",
3264            &TrackMetadata::from_clip(&c, &LineageContext::own_root(&c)),
3265            None,
3266            None,
3267        )
3268        .unwrap();
3269        let fs = MemFs::new().with_file("o.mp3", existing.clone());
3270        let mut manifest = Manifest::new();
3271        let mut start = entry("o.mp3", AudioFormat::Mp3);
3272        start.size = existing.len() as u64;
3273        manifest.insert("o", start);
3274        let plan = Plan {
3275            actions: vec![Action::Retag {
3276                clip: c.clone(),
3277                lineage: LineageContext::own_root(&c),
3278                path: "o.mp3".to_owned(),
3279            }],
3280        };
3281
3282        let outcome = run(
3283            &plan,
3284            &mut manifest,
3285            &[d],
3286            &ScriptedHttp::new(),
3287            &fs,
3288            &StubFfmpeg::flac(),
3289            &RecordingClock::new(),
3290            &ExecOptions::default(),
3291        );
3292
3293        assert_eq!(outcome.retagged, 1);
3294        let updated = manifest.get("o").unwrap();
3295        assert_eq!(updated.meta_hash, "new");
3296        assert_eq!(updated.art_hash, "new-art");
3297        assert_eq!(&fs.read_file("o.mp3").unwrap()[..3], b"ID3");
3298    }
3299
3300    #[test]
3301    fn rename_moves_file_and_updates_manifest_path() {
3302        let c = clip("p");
3303        let mut d = desired(c.clone(), AudioFormat::Mp3);
3304        d.path = "new/p.mp3".to_owned();
3305        let fs = MemFs::new().with_file("old/p.mp3", b"DATA".to_vec());
3306        let mut manifest = Manifest::new();
3307        manifest.insert("p", entry("old/p.mp3", AudioFormat::Mp3));
3308        let plan = Plan {
3309            actions: vec![Action::Rename {
3310                from: "old/p.mp3".to_owned(),
3311                to: "new/p.mp3".to_owned(),
3312            }],
3313        };
3314
3315        let outcome = run(
3316            &plan,
3317            &mut manifest,
3318            &[d],
3319            &ScriptedHttp::new(),
3320            &fs,
3321            &StubFfmpeg::flac(),
3322            &RecordingClock::new(),
3323            &ExecOptions::default(),
3324        );
3325
3326        assert_eq!(outcome.renamed, 1);
3327        assert!(fs.exists("new/p.mp3"));
3328        assert!(!fs.exists("old/p.mp3"));
3329        assert_eq!(manifest.get("p").unwrap().path, "new/p.mp3");
3330    }
3331
3332    #[test]
3333    fn disk_full_rename_aborts_the_run() {
3334        // A move onto a full disk is systemic like a full-disk write: the run
3335        // aborts with DiskFull and the source file is left untouched.
3336        let c = clip("p");
3337        let mut d = desired(c.clone(), AudioFormat::Mp3);
3338        d.path = "new/p.mp3".to_owned();
3339        let fs = MemFs::new()
3340            .with_file("old/p.mp3", b"DATA".to_vec())
3341            .fail_rename_out_of_space("new/p.mp3");
3342        let mut manifest = Manifest::new();
3343        manifest.insert("p", entry("old/p.mp3", AudioFormat::Mp3));
3344        let plan = Plan {
3345            actions: vec![Action::Rename {
3346                from: "old/p.mp3".to_owned(),
3347                to: "new/p.mp3".to_owned(),
3348            }],
3349        };
3350
3351        let outcome = run(
3352            &plan,
3353            &mut manifest,
3354            &[d],
3355            &ScriptedHttp::new(),
3356            &fs,
3357            &StubFfmpeg::flac(),
3358            &RecordingClock::new(),
3359            &ExecOptions::default(),
3360        );
3361
3362        assert_eq!(outcome.status, RunStatus::DiskFull);
3363        assert_eq!(outcome.renamed, 0);
3364        assert_eq!(outcome.failed(), 1);
3365        assert!(outcome.failures[0].reason.contains("disk full"));
3366        // The source is untouched: the move never happened.
3367        assert!(fs.exists("old/p.mp3"));
3368        assert!(!fs.exists("new/p.mp3"));
3369        assert_eq!(manifest.get("p").unwrap().path, "old/p.mp3");
3370    }
3371
3372    #[test]
3373    fn delete_removes_file_and_manifest_entry() {
3374        let fs = MemFs::new().with_file("q.mp3", b"DATA".to_vec());
3375        let mut manifest = Manifest::new();
3376        manifest.insert("q", entry("q.mp3", AudioFormat::Mp3));
3377        let plan = Plan {
3378            actions: vec![Action::Delete {
3379                path: "q.mp3".to_owned(),
3380                clip_id: "q".to_owned(),
3381            }],
3382        };
3383
3384        let outcome = run(
3385            &plan,
3386            &mut manifest,
3387            &[],
3388            &ScriptedHttp::new(),
3389            &fs,
3390            &StubFfmpeg::flac(),
3391            &RecordingClock::new(),
3392            &ExecOptions::default(),
3393        );
3394
3395        assert_eq!(outcome.deleted, 1);
3396        assert!(!fs.exists("q.mp3"));
3397        assert!(manifest.get("q").is_none());
3398    }
3399
3400    #[test]
3401    fn failed_delete_keeps_the_manifest_entry() {
3402        let fs = MemFs::new()
3403            .with_file("s.mp3", b"DATA".to_vec())
3404            .fail_remove("s.mp3");
3405        let mut manifest = Manifest::new();
3406        manifest.insert("s", entry("s.mp3", AudioFormat::Mp3));
3407        let plan = Plan {
3408            actions: vec![Action::Delete {
3409                path: "s.mp3".to_owned(),
3410                clip_id: "s".to_owned(),
3411            }],
3412        };
3413
3414        let outcome = run(
3415            &plan,
3416            &mut manifest,
3417            &[],
3418            &ScriptedHttp::new(),
3419            &fs,
3420            &StubFfmpeg::flac(),
3421            &RecordingClock::new(),
3422            &ExecOptions::default(),
3423        );
3424
3425        assert_eq!(outcome.deleted, 0);
3426        assert_eq!(outcome.failed(), 1);
3427        assert!(manifest.get("s").is_some());
3428        assert!(fs.exists("s.mp3"));
3429    }
3430
3431    #[test]
3432    fn skip_is_a_noop() {
3433        let mut manifest = Manifest::new();
3434        let plan = Plan {
3435            actions: vec![Action::Skip {
3436                clip_id: "r".to_owned(),
3437            }],
3438        };
3439        let outcome = run(
3440            &plan,
3441            &mut manifest,
3442            &[],
3443            &ScriptedHttp::new(),
3444            &MemFs::new(),
3445            &StubFfmpeg::flac(),
3446            &RecordingClock::new(),
3447            &ExecOptions::default(),
3448        );
3449        assert_eq!(outcome.skipped, 1);
3450        assert_eq!(outcome.failed(), 0);
3451    }
3452
3453    // ── Pure helpers ────────────────────────────────────────────────
3454
3455    #[test]
3456    fn header_helpers_parse_or_ignore() {
3457        let resp = HttpResponse {
3458            status: 200,
3459            headers: vec![("Content-Length".to_owned(), "42".to_owned())],
3460            body: Vec::new(),
3461        };
3462        assert_eq!(content_length(&resp), Some(42));
3463
3464        let bare = HttpResponse {
3465            status: 200,
3466            headers: Vec::new(),
3467            body: Vec::new(),
3468        };
3469        assert_eq!(content_length(&bare), None);
3470    }
3471
3472    #[test]
3473    fn preserve_rule_covers_copy_and_private() {
3474        let base = desired(clip("x"), AudioFormat::Mp3);
3475        assert!(!preserve_for(&base));
3476        let mut copy_held = base.clone();
3477        copy_held.modes = vec![SourceMode::Copy];
3478        assert!(preserve_for(&copy_held));
3479        let mut private = base.clone();
3480        private.private = true;
3481        assert!(preserve_for(&private));
3482    }
3483
    /// Returns a fresh in-memory filesystem double from [`MemFs::new`], for
    /// tests that only need to pass a filesystem to `run` and never inspect it.
    fn fs_new() -> MemFs {
        MemFs::new()
    }
3487
3488    // ── Skip refreshes the preserve marker (SYNC-8 cross-run) ────────
3489
3490    #[test]
3491    fn skip_sets_preserve_when_a_clip_becomes_copy_held() {
3492        let c = clip("s1");
3493        let mut d = desired(c.clone(), AudioFormat::Mp3);
3494        d.modes = vec![SourceMode::Copy];
3495        let plan = Plan {
3496            actions: vec![Action::Skip {
3497                clip_id: "s1".to_owned(),
3498            }],
3499        };
3500        let mut manifest = Manifest::new();
3501        manifest.insert("s1".to_owned(), entry("s1.mp3", AudioFormat::Mp3));
3502        assert!(!manifest.get("s1").unwrap().preserve);
3503
3504        let outcome = run(
3505            &plan,
3506            &mut manifest,
3507            &[d],
3508            &ScriptedHttp::new(),
3509            &fs_new(),
3510            &StubFfmpeg::flac(),
3511            &RecordingClock::new(),
3512            &ExecOptions::default(),
3513        );
3514
3515        assert_eq!(outcome.skipped, 1);
3516        assert!(
3517            manifest.get("s1").unwrap().preserve,
3518            "a copy-held skip must mark the entry preserved"
3519        );
3520    }
3521
3522    #[test]
3523    fn skip_clears_stale_preserve_when_a_clip_returns_to_mirror_only() {
3524        let c = clip("s2");
3525        let d = desired(c.clone(), AudioFormat::Mp3);
3526        let plan = Plan {
3527            actions: vec![Action::Skip {
3528                clip_id: "s2".to_owned(),
3529            }],
3530        };
3531        let mut manifest = Manifest::new();
3532        let mut stale = entry("s2.mp3", AudioFormat::Mp3);
3533        stale.preserve = true;
3534        manifest.insert("s2".to_owned(), stale);
3535
3536        run(
3537            &plan,
3538            &mut manifest,
3539            &[d],
3540            &ScriptedHttp::new(),
3541            &fs_new(),
3542            &StubFfmpeg::flac(),
3543            &RecordingClock::new(),
3544            &ExecOptions::default(),
3545        );
3546
3547        assert!(
3548            !manifest.get("s2").unwrap().preserve,
3549            "a mirror-only skip must clear a stale preserve marker"
3550        );
3551    }
3552
3553    #[test]
3554    fn flac_render_retries_a_rate_limited_wav_lookup() {
3555        let c = clip("rl");
3556        let d = desired(c.clone(), AudioFormat::Flac);
3557        let plan = Plan {
3558            actions: vec![Action::Download {
3559                clip: c.clone(),
3560                lineage: LineageContext::own_root(&c),
3561                path: d.path.clone(),
3562                format: AudioFormat::Flac,
3563            }],
3564        };
3565        let http = ScriptedHttp::new()
3566            .with_auth()
3567            .route_seq(
3568                "/wav_file/",
3569                vec![
3570                    Reply::status(429),
3571                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/rl.wav"}"#),
3572                ],
3573            )
3574            .route("rl.wav", Reply::ok(b"wav".to_vec()));
3575        let clock = RecordingClock::new();
3576        let mut manifest = Manifest::new();
3577
3578        let outcome = run(
3579            &plan,
3580            &mut manifest,
3581            &[d],
3582            &http,
3583            &fs_new(),
3584            &StubFfmpeg::flac(),
3585            &clock,
3586            &small_poll(),
3587        );
3588
3589        assert_eq!(outcome.downloaded, 1);
3590        assert_eq!(outcome.failed(), 0);
3591        // The render was ready on retry, so no fresh convert_wav was needed.
3592        assert_eq!(http.count("/convert_wav/"), 0);
3593        // One transient backoff (1s base), not the 5s poll interval.
3594        assert_eq!(clock.sleeps(), vec![Duration::from_secs(1)]);
3595    }
3596
3597    // ── Phase 6: artifact actions ───────────────────────────────────
3598
3599    #[test]
3600    fn write_artifact_fetches_writes_and_updates_manifest() {
3601        // The owning entry exists (its audio was kept this run); WriteArtifact
3602        // fetches the source, writes the sidecar, and records it on the entry.
3603        let mut manifest = Manifest::new();
3604        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
3605        let plan = Plan {
3606            actions: vec![Action::WriteArtifact {
3607                kind: ArtifactKind::CoverJpg,
3608                path: "a/cover.jpg".to_owned(),
3609                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
3610                hash: "h1".to_owned(),
3611                owner_id: "a".to_owned(),
3612                content: None,
3613            }],
3614        };
3615        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"jpg-bytes".to_vec()));
3616        let fs = MemFs::new();
3617
3618        let outcome = run(
3619            &plan,
3620            &mut manifest,
3621            &[],
3622            &http,
3623            &fs,
3624            &StubFfmpeg::flac(),
3625            &RecordingClock::new(),
3626            &ExecOptions::default(),
3627        );
3628
3629        assert_eq!(outcome.artifacts_written, 1);
3630        assert_eq!(outcome.failed(), 0);
3631        assert_eq!(outcome.status, RunStatus::Completed);
3632        assert_eq!(fs.read_file("a/cover.jpg").unwrap(), b"jpg-bytes");
3633        assert_eq!(
3634            manifest.get("a").unwrap().cover_jpg,
3635            Some(ArtifactState {
3636                path: "a/cover.jpg".to_owned(),
3637                hash: "h1".to_owned(),
3638            })
3639        );
3640    }
3641
3642    #[test]
3643    fn write_text_sidecar_records_slot_with_no_network_fetch() {
3644        // A generated text sidecar carries its body inline, so it is written
3645        // verbatim with NO HTTP fetch and the details slot records its state.
3646        let mut manifest = Manifest::new();
3647        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
3648        let plan = Plan {
3649            actions: vec![Action::WriteArtifact {
3650                kind: ArtifactKind::DetailsTxt,
3651                path: "a.details.txt".to_owned(),
3652                source_url: String::new(),
3653                hash: "dh".to_owned(),
3654                owner_id: "a".to_owned(),
3655                content: Some("Title: A\n".to_owned()),
3656            }],
3657        };
3658        // An empty HTTP script: any fetch would fail, proving none happens.
3659        let http = ScriptedHttp::new();
3660        let fs = MemFs::new();
3661
3662        let outcome = run(
3663            &plan,
3664            &mut manifest,
3665            &[],
3666            &http,
3667            &fs,
3668            &StubFfmpeg::flac(),
3669            &RecordingClock::new(),
3670            &ExecOptions::default(),
3671        );
3672
3673        assert_eq!(outcome.artifacts_written, 1);
3674        assert_eq!(outcome.failed(), 0);
3675        assert_eq!(fs.read_file("a.details.txt").unwrap(), b"Title: A\n");
3676        assert_eq!(
3677            manifest.get("a").unwrap().details_txt,
3678            Some(ArtifactState {
3679                path: "a.details.txt".to_owned(),
3680                hash: "dh".to_owned(),
3681            })
3682        );
3683    }
3684
3685    #[test]
3686    fn write_lyrics_sidecar_relocation_removes_old_file() {
3687        // The audio moved, so the lyrics sidecar is re-emitted at the new path;
3688        // the executor writes the new file and prunes the stale one.
3689        let mut manifest = Manifest::new();
3690        let mut e = entry("old/a.flac", AudioFormat::Flac);
3691        e.lyrics_txt = Some(ArtifactState {
3692            path: "old/a.lyrics.txt".to_owned(),
3693            hash: "lh".to_owned(),
3694        });
3695        manifest.insert("a", e);
3696        let fs = MemFs::new()
3697            .with_file("old/a.flac", b"AUDIO".to_vec())
3698            .with_file("old/a.lyrics.txt", b"old words\n".to_vec());
3699        let plan = Plan {
3700            actions: vec![Action::WriteArtifact {
3701                kind: ArtifactKind::LyricsTxt,
3702                path: "new/a.lyrics.txt".to_owned(),
3703                source_url: String::new(),
3704                hash: "lh".to_owned(),
3705                owner_id: "a".to_owned(),
3706                content: Some("new words\n".to_owned()),
3707            }],
3708        };
3709
3710        let outcome = run(
3711            &plan,
3712            &mut manifest,
3713            &[],
3714            &ScriptedHttp::new(),
3715            &fs,
3716            &StubFfmpeg::flac(),
3717            &RecordingClock::new(),
3718            &ExecOptions::default(),
3719        );
3720
3721        assert_eq!(outcome.failed(), 0);
3722        assert_eq!(fs.read_file("new/a.lyrics.txt").unwrap(), b"new words\n");
3723        assert!(!fs.exists("old/a.lyrics.txt"));
3724        assert_eq!(
3725            manifest.get("a").unwrap().lyrics_txt.as_ref().unwrap().path,
3726            "new/a.lyrics.txt"
3727        );
3728    }
3729
3730    #[test]
3731    fn sidecar_path_swap_never_deletes_a_file_written_this_run() {
3732        // Two clips swap sidecar paths in one run (A: x -> y while B: y -> x).
3733        // Each write's inline old-path cleanup must skip a path another action
3734        // writes this run, or the second write would delete the first's freshly
3735        // written file (issue #76). The guard is kind-agnostic; lyrics stands in
3736        // for every sidecar, including the .mp4 video.
3737        let mut manifest = Manifest::new();
3738        let mut a = entry("a.flac", AudioFormat::Flac);
3739        a.lyrics_txt = Some(ArtifactState {
3740            path: "x.lyrics.txt".to_owned(),
3741            hash: "ah".to_owned(),
3742        });
3743        manifest.insert("a", a);
3744        let mut b = entry("b.flac", AudioFormat::Flac);
3745        b.lyrics_txt = Some(ArtifactState {
3746            path: "y.lyrics.txt".to_owned(),
3747            hash: "bh".to_owned(),
3748        });
3749        manifest.insert("b", b);
3750        let fs = MemFs::new()
3751            .with_file("a.flac", b"A".to_vec())
3752            .with_file("b.flac", b"B".to_vec())
3753            .with_file("x.lyrics.txt", b"A words\n".to_vec())
3754            .with_file("y.lyrics.txt", b"B words\n".to_vec());
3755        // A moves its sidecar x -> y; B moves its sidecar y -> x (the swap).
3756        let plan = Plan {
3757            actions: vec![
3758                Action::WriteArtifact {
3759                    kind: ArtifactKind::LyricsTxt,
3760                    path: "y.lyrics.txt".to_owned(),
3761                    source_url: String::new(),
3762                    hash: "ah".to_owned(),
3763                    owner_id: "a".to_owned(),
3764                    content: Some("A words\n".to_owned()),
3765                },
3766                Action::WriteArtifact {
3767                    kind: ArtifactKind::LyricsTxt,
3768                    path: "x.lyrics.txt".to_owned(),
3769                    source_url: String::new(),
3770                    hash: "bh".to_owned(),
3771                    owner_id: "b".to_owned(),
3772                    content: Some("B words\n".to_owned()),
3773                },
3774            ],
3775        };
3776
3777        let outcome = run(
3778            &plan,
3779            &mut manifest,
3780            &[],
3781            &ScriptedHttp::new(),
3782            &fs,
3783            &StubFfmpeg::flac(),
3784            &RecordingClock::new(),
3785            &ExecOptions::default(),
3786        );
3787
3788        assert_eq!(outcome.failed(), 0);
3789        // Both freshly written files survive; neither cleanup clobbered the other.
3790        assert_eq!(fs.read_file("y.lyrics.txt").unwrap(), b"A words\n");
3791        assert_eq!(fs.read_file("x.lyrics.txt").unwrap(), b"B words\n");
3792        assert_eq!(
3793            manifest.get("a").unwrap().lyrics_txt.as_ref().unwrap().path,
3794            "y.lyrics.txt"
3795        );
3796        assert_eq!(
3797            manifest.get("b").unwrap().lyrics_txt.as_ref().unwrap().path,
3798            "x.lyrics.txt"
3799        );
3800    }
3801
3802    #[test]
3803    fn old_sidecar_kept_when_another_clip_still_references_it() {
3804        // A prior failed swap can leave two clips pointing at one path (A -> y and
3805        // B -> y). When B now moves y -> x, its cleanup must not delete y, which is
3806        // still A's live file (#76). tracked_paths counts two references to y, so
3807        // the removal is skipped even though y is not a write target this run.
3808        let mut manifest = Manifest::new();
3809        let mut a = entry("a.flac", AudioFormat::Flac);
3810        a.lyrics_txt = Some(ArtifactState {
3811            path: "y.lyrics.txt".to_owned(),
3812            hash: "ah".to_owned(),
3813        });
3814        manifest.insert("a", a);
3815        let mut b = entry("b.flac", AudioFormat::Flac);
3816        b.lyrics_txt = Some(ArtifactState {
3817            path: "y.lyrics.txt".to_owned(),
3818            hash: "bh".to_owned(),
3819        });
3820        manifest.insert("b", b);
3821        let fs = MemFs::new()
3822            .with_file("a.flac", b"A".to_vec())
3823            .with_file("b.flac", b"B".to_vec())
3824            .with_file("y.lyrics.txt", b"A words\n".to_vec());
3825        // Only B moves this run: y -> x. A is stable, so y is not a write target;
3826        // the tracked-reference count is what protects A's file.
3827        let plan = Plan {
3828            actions: vec![Action::WriteArtifact {
3829                kind: ArtifactKind::LyricsTxt,
3830                path: "x.lyrics.txt".to_owned(),
3831                source_url: String::new(),
3832                hash: "bh".to_owned(),
3833                owner_id: "b".to_owned(),
3834                content: Some("B words\n".to_owned()),
3835            }],
3836        };
3837
3838        let outcome = run(
3839            &plan,
3840            &mut manifest,
3841            &[],
3842            &ScriptedHttp::new(),
3843            &fs,
3844            &StubFfmpeg::flac(),
3845            &RecordingClock::new(),
3846            &ExecOptions::default(),
3847        );
3848
3849        assert_eq!(outcome.failed(), 0);
3850        assert!(
3851            fs.exists("y.lyrics.txt"),
3852            "A's live sidecar must not be deleted"
3853        );
3854        assert_eq!(fs.read_file("x.lyrics.txt").unwrap(), b"B words\n");
3855    }
3856
3857    #[test]
3858    fn shared_old_path_is_reclaimed_when_every_referencing_clip_moves_away() {
3859        // Two clips share one path (A -> s and B -> s, from a prior failed swap).
3860        // When BOTH move away this run, the path is no longer live, so the last
3861        // mover must reclaim it: it is neither kept as an orphan nor deleted while
3862        // still referenced. The dynamic reference count drops to zero only after
3863        // both moves, so exactly the final cleanup removes it (#76).
3864        let mut manifest = Manifest::new();
3865        let mut a = entry("a.flac", AudioFormat::Flac);
3866        a.lyrics_txt = Some(ArtifactState {
3867            path: "s.lyrics.txt".to_owned(),
3868            hash: "ah".to_owned(),
3869        });
3870        manifest.insert("a", a);
3871        let mut b = entry("b.flac", AudioFormat::Flac);
3872        b.lyrics_txt = Some(ArtifactState {
3873            path: "s.lyrics.txt".to_owned(),
3874            hash: "bh".to_owned(),
3875        });
3876        manifest.insert("b", b);
3877        let fs = MemFs::new()
3878            .with_file("a.flac", b"A".to_vec())
3879            .with_file("b.flac", b"B".to_vec())
3880            .with_file("s.lyrics.txt", b"shared\n".to_vec());
3881        let plan = Plan {
3882            actions: vec![
3883                Action::WriteArtifact {
3884                    kind: ArtifactKind::LyricsTxt,
3885                    path: "pa.lyrics.txt".to_owned(),
3886                    source_url: String::new(),
3887                    hash: "ah".to_owned(),
3888                    owner_id: "a".to_owned(),
3889                    content: Some("A words\n".to_owned()),
3890                },
3891                Action::WriteArtifact {
3892                    kind: ArtifactKind::LyricsTxt,
3893                    path: "pb.lyrics.txt".to_owned(),
3894                    source_url: String::new(),
3895                    hash: "bh".to_owned(),
3896                    owner_id: "b".to_owned(),
3897                    content: Some("B words\n".to_owned()),
3898                },
3899            ],
3900        };
3901
3902        let outcome = run(
3903            &plan,
3904            &mut manifest,
3905            &[],
3906            &ScriptedHttp::new(),
3907            &fs,
3908            &StubFfmpeg::flac(),
3909            &RecordingClock::new(),
3910            &ExecOptions::default(),
3911        );
3912
3913        assert_eq!(outcome.failed(), 0);
3914        assert_eq!(fs.read_file("pa.lyrics.txt").unwrap(), b"A words\n");
3915        assert_eq!(fs.read_file("pb.lyrics.txt").unwrap(), b"B words\n");
3916        assert!(
3917            !fs.exists("s.lyrics.txt"),
3918            "the vacated shared path must be reclaimed, not orphaned"
3919        );
3920    }
3921
3922    #[test]
3923    fn write_text_sidecar_skipped_when_owner_audio_absent() {
3924        // A text sidecar for a clip with no manifest entry (its audio download
3925        // failed) must be skipped, never writing an untracked file.
3926        let plan = Plan {
3927            actions: vec![Action::WriteArtifact {
3928                kind: ArtifactKind::DetailsTxt,
3929                path: "gone.details.txt".to_owned(),
3930                source_url: String::new(),
3931                hash: "dh".to_owned(),
3932                owner_id: "gone".to_owned(),
3933                content: Some("Title: Gone\n".to_owned()),
3934            }],
3935        };
3936        let fs = MemFs::new();
3937        let mut manifest = Manifest::new();
3938
3939        let outcome = run(
3940            &plan,
3941            &mut manifest,
3942            &[],
3943            &ScriptedHttp::new(),
3944            &fs,
3945            &StubFfmpeg::flac(),
3946            &RecordingClock::new(),
3947            &ExecOptions::default(),
3948        );
3949
3950        assert_eq!(outcome.artifacts_written, 0);
3951        assert_eq!(outcome.skipped, 1);
3952        assert!(!fs.exists("gone.details.txt"));
3953        assert!(manifest.get("gone").is_none());
3954    }
3955
3956    #[test]
3957    fn delete_artifact_removes_file_and_clears_slot() {
3958        let fs = MemFs::new().with_file("a/cover.jpg", b"jpg".to_vec());
3959        let mut manifest = Manifest::new();
3960        let mut e = entry("a.mp3", AudioFormat::Mp3);
3961        e.cover_jpg = Some(ArtifactState {
3962            path: "a/cover.jpg".to_owned(),
3963            hash: "h1".to_owned(),
3964        });
3965        manifest.insert("a", e);
3966        let plan = Plan {
3967            actions: vec![Action::DeleteArtifact {
3968                kind: ArtifactKind::CoverJpg,
3969                path: "a/cover.jpg".to_owned(),
3970                owner_id: "a".to_owned(),
3971            }],
3972        };
3973
3974        let outcome = run(
3975            &plan,
3976            &mut manifest,
3977            &[],
3978            &ScriptedHttp::new(),
3979            &fs,
3980            &StubFfmpeg::flac(),
3981            &RecordingClock::new(),
3982            &ExecOptions::default(),
3983        );
3984
3985        assert_eq!(outcome.artifacts_deleted, 1);
3986        assert!(!fs.exists("a/cover.jpg"));
3987        assert_eq!(manifest.get("a").unwrap().cover_jpg, None);
3988    }
3989
3990    #[test]
3991    fn delete_artifact_tolerates_already_absent_file() {
3992        // `remove` is idempotent, so co-deleting a sidecar that is already gone
3993        // is not a failure.
3994        let mut manifest = Manifest::new();
3995        let mut e = entry("a.mp3", AudioFormat::Mp3);
3996        e.cover_jpg = Some(ArtifactState {
3997            path: "a/cover.jpg".to_owned(),
3998            hash: "h1".to_owned(),
3999        });
4000        manifest.insert("a", e);
4001        let plan = Plan {
4002            actions: vec![Action::DeleteArtifact {
4003                kind: ArtifactKind::CoverJpg,
4004                path: "a/cover.jpg".to_owned(),
4005                owner_id: "a".to_owned(),
4006            }],
4007        };
4008
4009        let outcome = run(
4010            &plan,
4011            &mut manifest,
4012            &[],
4013            &ScriptedHttp::new(),
4014            &MemFs::new(),
4015            &StubFfmpeg::flac(),
4016            &RecordingClock::new(),
4017            &ExecOptions::default(),
4018        );
4019
4020        assert_eq!(outcome.artifacts_deleted, 1);
4021        assert_eq!(outcome.failed(), 0);
4022        assert_eq!(manifest.get("a").unwrap().cover_jpg, None);
4023    }
4024
4025    #[test]
4026    fn write_artifact_http_failure_is_a_per_clip_failure_not_a_run_abort() {
4027        // A permanent 404 on one sidecar fetch is recorded as a per-clip failure;
4028        // the run continues and the following WriteArtifact still succeeds.
4029        let mut manifest = Manifest::new();
4030        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
4031        manifest.insert("b", entry("b.mp3", AudioFormat::Mp3));
4032        let plan = Plan {
4033            actions: vec![
4034                Action::WriteArtifact {
4035                    kind: ArtifactKind::CoverJpg,
4036                    path: "a/cover.jpg".to_owned(),
4037                    source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
4038                    hash: "h1".to_owned(),
4039                    owner_id: "a".to_owned(),
4040                    content: None,
4041                },
4042                Action::WriteArtifact {
4043                    kind: ArtifactKind::CoverJpg,
4044                    path: "b/cover.jpg".to_owned(),
4045                    source_url: "https://art.suno.ai/b/large.jpg".to_owned(),
4046                    hash: "h2".to_owned(),
4047                    owner_id: "b".to_owned(),
4048                    content: None,
4049                },
4050            ],
4051        };
4052        let http = ScriptedHttp::new()
4053            .route("a/large.jpg", Reply::status(404))
4054            .route("b/large.jpg", Reply::ok(b"jpg-b".to_vec()));
4055        let fs = MemFs::new();
4056
4057        let outcome = run(
4058            &plan,
4059            &mut manifest,
4060            &[],
4061            &http,
4062            &fs,
4063            &StubFfmpeg::flac(),
4064            &RecordingClock::new(),
4065            &ExecOptions::default(),
4066        );
4067
4068        assert_eq!(outcome.status, RunStatus::Completed);
4069        assert_eq!(outcome.failed(), 1);
4070        assert_eq!(outcome.failures[0].clip_id, "a");
4071        assert_eq!(outcome.artifacts_written, 1);
4072        // The failed sidecar left no file and no manifest record.
4073        assert!(!fs.exists("a/cover.jpg"));
4074        assert_eq!(manifest.get("a").unwrap().cover_jpg, None);
4075        // The following sidecar was written and recorded.
4076        assert_eq!(fs.read_file("b/cover.jpg").unwrap(), b"jpg-b");
4077        assert!(manifest.get("b").unwrap().cover_jpg.is_some());
4078    }
4079
4080    #[test]
4081    fn stranded_old_sidecar_removed_when_colliding_writer_fails() {
4082        // #142: clip A moves its cover shared -> a/cover.jpg (fetch succeeds);
4083        // clip B is planned to write the vacated `shared` path but its fetch
4084        // fails. The old-path cleanup is gated on COMMITTED writes, not planned
4085        // ones, so B's failed write no longer protects the stale file: A's old
4086        // `shared` copy is removed rather than left as an untracked orphan.
4087        let mut manifest = Manifest::new();
4088        let mut a = entry("a.mp3", AudioFormat::Mp3);
4089        a.cover_jpg = Some(ArtifactState {
4090            path: "shared/cover.jpg".to_owned(),
4091            hash: "ha".to_owned(),
4092        });
4093        manifest.insert("a", a);
4094        manifest.insert("b", entry("b.mp3", AudioFormat::Mp3));
4095        let fs = MemFs::new().with_file("shared/cover.jpg", b"old-shared".to_vec());
4096        let plan = Plan {
4097            actions: vec![
4098                Action::WriteArtifact {
4099                    kind: ArtifactKind::CoverJpg,
4100                    path: "a/cover.jpg".to_owned(),
4101                    source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
4102                    hash: "ha".to_owned(),
4103                    owner_id: "a".to_owned(),
4104                    content: None,
4105                },
4106                Action::WriteArtifact {
4107                    kind: ArtifactKind::CoverJpg,
4108                    path: "shared/cover.jpg".to_owned(),
4109                    source_url: "https://art.suno.ai/b/large.jpg".to_owned(),
4110                    hash: "hb".to_owned(),
4111                    owner_id: "b".to_owned(),
4112                    content: None,
4113                },
4114            ],
4115        };
4116        let http = ScriptedHttp::new()
4117            .route("a/large.jpg", Reply::ok(b"jpg-a".to_vec()))
4118            .route("b/large.jpg", Reply::status(404));
4119
4120        let outcome = run(
4121            &plan,
4122            &mut manifest,
4123            &[],
4124            &http,
4125            &fs,
4126            &StubFfmpeg::flac(),
4127            &RecordingClock::new(),
4128            &ExecOptions::default(),
4129        );
4130
4131        assert_eq!(outcome.failed(), 1);
4132        assert_eq!(outcome.failures[0].clip_id, "b");
4133        // A's move committed; the vacated file is gone, not an orphan.
4134        assert_eq!(fs.read_file("a/cover.jpg").unwrap(), b"jpg-a");
4135        assert!(
4136            !fs.exists("shared/cover.jpg"),
4137            "the vacated file must be removed once the colliding writer failed"
4138        );
4139        assert_eq!(
4140            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
4141            "a/cover.jpg"
4142        );
4143    }
4144
4145    #[test]
4146    fn committed_write_at_old_path_is_preserved() {
4147        // #142: clip B writes `shared` and commits BEFORE clip A vacates it
4148        // (A moves shared -> a/cover.jpg). A's cleanup sees `shared` in the
4149        // committed set and keeps B's freshly written file rather than deleting
4150        // it. This is the successful-collision case the guard must still protect.
4151        let mut manifest = Manifest::new();
4152        let mut a = entry("a.mp3", AudioFormat::Mp3);
4153        a.cover_jpg = Some(ArtifactState {
4154            path: "shared/cover.jpg".to_owned(),
4155            hash: "ha".to_owned(),
4156        });
4157        manifest.insert("a", a);
4158        manifest.insert("b", entry("b.mp3", AudioFormat::Mp3));
4159        let fs = MemFs::new().with_file("shared/cover.jpg", b"old-shared".to_vec());
4160        let plan = Plan {
4161            actions: vec![
4162                Action::WriteArtifact {
4163                    kind: ArtifactKind::CoverJpg,
4164                    path: "shared/cover.jpg".to_owned(),
4165                    source_url: "https://art.suno.ai/b/large.jpg".to_owned(),
4166                    hash: "hb".to_owned(),
4167                    owner_id: "b".to_owned(),
4168                    content: None,
4169                },
4170                Action::WriteArtifact {
4171                    kind: ArtifactKind::CoverJpg,
4172                    path: "a/cover.jpg".to_owned(),
4173                    source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
4174                    hash: "ha".to_owned(),
4175                    owner_id: "a".to_owned(),
4176                    content: None,
4177                },
4178            ],
4179        };
4180        let http = ScriptedHttp::new()
4181            .route("b/large.jpg", Reply::ok(b"jpg-b".to_vec()))
4182            .route("a/large.jpg", Reply::ok(b"jpg-a".to_vec()));
4183
4184        let outcome = run(
4185            &plan,
4186            &mut manifest,
4187            &[],
4188            &http,
4189            &fs,
4190            &StubFfmpeg::flac(),
4191            &RecordingClock::new(),
4192            &ExecOptions::default(),
4193        );
4194
4195        assert_eq!(outcome.failed(), 0);
4196        // B's committed write survives A's subsequent move; both files are present.
4197        assert_eq!(fs.read_file("shared/cover.jpg").unwrap(), b"jpg-b");
4198        assert_eq!(fs.read_file("a/cover.jpg").unwrap(), b"jpg-a");
4199        assert_eq!(
4200            manifest.get("b").unwrap().cover_jpg.as_ref().unwrap().path,
4201            "shared/cover.jpg"
4202        );
4203        assert_eq!(
4204            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
4205            "a/cover.jpg"
4206        );
4207    }
4208
4209    #[test]
4210    fn cover_move_renames_without_fetching() {
4211        // #141: a MoveArtifact relocates the cover with a local rename. The
4212        // ScriptedHttp has no route, so any fetch would fail the run; a clean
4213        // outcome proves the bytes were renamed, not re-downloaded.
4214        let mut manifest = Manifest::new();
4215        let mut e = entry("a.mp3", AudioFormat::Mp3);
4216        e.cover_jpg = Some(ArtifactState {
4217            path: "old/cover.jpg".to_owned(),
4218            hash: "h".to_owned(),
4219        });
4220        manifest.insert("a", e);
4221        let fs = MemFs::new().with_file("old/cover.jpg", b"JPGBYTES".to_vec());
4222        let plan = Plan {
4223            actions: vec![Action::MoveArtifact {
4224                kind: ArtifactKind::CoverJpg,
4225                from: "old/cover.jpg".to_owned(),
4226                to: "new/cover.jpg".to_owned(),
4227                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
4228                hash: "h".to_owned(),
4229                owner_id: "a".to_owned(),
4230            }],
4231        };
4232
4233        let outcome = run(
4234            &plan,
4235            &mut manifest,
4236            &[],
4237            &ScriptedHttp::new(),
4238            &fs,
4239            &StubFfmpeg::flac(),
4240            &RecordingClock::new(),
4241            &ExecOptions::default(),
4242        );
4243
4244        assert_eq!(outcome.failed(), 0);
4245        assert_eq!(outcome.renamed, 1, "counted as a rename, not a write");
4246        // Renamed in place: the new path carries the ORIGINAL bytes, old is gone.
4247        assert_eq!(fs.read_file("new/cover.jpg").unwrap(), b"JPGBYTES");
4248        assert!(!fs.exists("old/cover.jpg"));
4249        assert_eq!(
4250            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
4251            "new/cover.jpg"
4252        );
4253    }
4254
4255    #[test]
4256    fn cover_move_falls_back_to_fetch_when_old_file_missing() {
4257        // #141: the old file vanished before commit, so the rename fails and the
4258        // executor fetches fresh bytes at the new path rather than failing.
4259        let mut manifest = Manifest::new();
4260        let mut e = entry("a.mp3", AudioFormat::Mp3);
4261        e.cover_jpg = Some(ArtifactState {
4262            path: "old/cover.jpg".to_owned(),
4263            hash: "h".to_owned(),
4264        });
4265        manifest.insert("a", e);
4266        let fs = MemFs::new(); // old/cover.jpg is absent.
4267        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"FETCHED".to_vec()));
4268        let plan = Plan {
4269            actions: vec![Action::MoveArtifact {
4270                kind: ArtifactKind::CoverJpg,
4271                from: "old/cover.jpg".to_owned(),
4272                to: "new/cover.jpg".to_owned(),
4273                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
4274                hash: "h".to_owned(),
4275                owner_id: "a".to_owned(),
4276            }],
4277        };
4278
4279        let outcome = run(
4280            &plan,
4281            &mut manifest,
4282            &[],
4283            &http,
4284            &fs,
4285            &StubFfmpeg::flac(),
4286            &RecordingClock::new(),
4287            &ExecOptions::default(),
4288        );
4289
4290        assert_eq!(outcome.failed(), 0);
4291        assert_eq!(fs.read_file("new/cover.jpg").unwrap(), b"FETCHED");
4292        assert_eq!(
4293            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
4294            "new/cover.jpg"
4295        );
4296    }
4297
4298    #[test]
4299    fn cover_move_falls_back_when_source_co_referenced() {
4300        // Two clips' covers share old/cover.jpg after a prior failed swap. A move
4301        // for `a` must NOT rename the shared file away (that would strand `b`); it
4302        // falls back to a fetch, and `b`'s file survives.
4303        let mut manifest = Manifest::new();
4304        let mut a = entry("a.mp3", AudioFormat::Mp3);
4305        a.cover_jpg = Some(ArtifactState {
4306            path: "old/cover.jpg".to_owned(),
4307            hash: "h".to_owned(),
4308        });
4309        manifest.insert("a", a);
4310        let mut b = entry("b.mp3", AudioFormat::Mp3);
4311        b.cover_jpg = Some(ArtifactState {
4312            path: "old/cover.jpg".to_owned(),
4313            hash: "h".to_owned(),
4314        });
4315        manifest.insert("b", b);
4316        let fs = MemFs::new().with_file("old/cover.jpg", b"SHARED".to_vec());
4317        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"FETCHED-A".to_vec()));
4318        // Only `a` moves this run: old/cover.jpg -> a/cover.jpg.
4319        let plan = Plan {
4320            actions: vec![Action::MoveArtifact {
4321                kind: ArtifactKind::CoverJpg,
4322                from: "old/cover.jpg".to_owned(),
4323                to: "a/cover.jpg".to_owned(),
4324                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
4325                hash: "h".to_owned(),
4326                owner_id: "a".to_owned(),
4327            }],
4328        };
4329
4330        let outcome = run(
4331            &plan,
4332            &mut manifest,
4333            &[],
4334            &http,
4335            &fs,
4336            &StubFfmpeg::flac(),
4337            &RecordingClock::new(),
4338            &ExecOptions::default(),
4339        );
4340
4341        assert_eq!(outcome.failed(), 0);
4342        // `a` got a fresh fetched copy; `b`'s shared file is untouched.
4343        assert_eq!(fs.read_file("a/cover.jpg").unwrap(), b"FETCHED-A");
4344        assert_eq!(
4345            fs.read_file("old/cover.jpg").unwrap(),
4346            b"SHARED",
4347            "the co-referenced file must survive"
4348        );
4349    }
4350
4351    #[test]
4352    fn stem_move_renames_without_refetch() {
4353        // #141: a MoveStem relocates the raw stem with a rename; no route is set,
4354        // so a clean outcome proves it did not re-render or re-fetch.
4355        let mut manifest = Manifest::new();
4356        let mut e = entry("a.flac", AudioFormat::Flac);
4357        e.stems.insert(
4358            "voc".to_owned(),
4359            ArtifactState {
4360                path: "old.stems/voc.mp3".to_owned(),
4361                hash: "h1".to_owned(),
4362            },
4363        );
4364        manifest.insert("a", e);
4365        let fs = MemFs::new().with_file("old.stems/voc.mp3", b"STEMBYTES".to_vec());
4366        let plan = Plan {
4367            actions: vec![Action::MoveStem {
4368                clip_id: "a".to_owned(),
4369                key: "voc".to_owned(),
4370                stem_id: "voc".to_owned(),
4371                from: "old.stems/voc.mp3".to_owned(),
4372                to: "new.stems/voc.mp3".to_owned(),
4373                source_url: "https://cdn1.suno.ai/voc.mp3".to_owned(),
4374                format: StemFormat::Mp3,
4375                hash: "h1".to_owned(),
4376            }],
4377        };
4378
4379        let outcome = run(
4380            &plan,
4381            &mut manifest,
4382            &[],
4383            &ScriptedHttp::new(),
4384            &fs,
4385            &StubFfmpeg::flac(),
4386            &RecordingClock::new(),
4387            &ExecOptions::default(),
4388        );
4389
4390        assert_eq!(outcome.failed(), 0);
4391        assert_eq!(outcome.renamed, 1);
4392        assert_eq!(fs.read_file("new.stems/voc.mp3").unwrap(), b"STEMBYTES");
4393        assert!(!fs.exists("old.stems/voc.mp3"));
4394        assert_eq!(
4395            manifest.get("a").unwrap().stems.get("voc").unwrap().path,
4396            "new.stems/voc.mp3"
4397        );
4398    }
4399
4400    #[test]
4401    fn stem_move_falls_back_to_fetch_when_source_co_referenced() {
4402        // Two clips' stems share shared.stems/voc.mp3 after a partially-failed
4403        // swap (the file holds `a`'s bytes). When `b` moves it, move_stem must NOT
4404        // rename the shared file under `b`'s hash (that records `a`'s bytes as
4405        // `b`'s); it falls back to a fetch of `b`'s correct bytes.
4406        let mut manifest = Manifest::new();
4407        let mut a = entry("a.flac", AudioFormat::Flac);
4408        a.stems.insert(
4409            "voc".to_owned(),
4410            ArtifactState {
4411                path: "shared.stems/voc.mp3".to_owned(),
4412                hash: "h".to_owned(),
4413            },
4414        );
4415        manifest.insert("a", a);
4416        let mut b = entry("b.flac", AudioFormat::Flac);
4417        b.stems.insert(
4418            "voc".to_owned(),
4419            ArtifactState {
4420                path: "shared.stems/voc.mp3".to_owned(),
4421                hash: "h".to_owned(),
4422            },
4423        );
4424        manifest.insert("b", b);
4425        let fs = MemFs::new().with_file("shared.stems/voc.mp3", b"A-STEM".to_vec());
4426        let http = ScriptedHttp::new().route("bvoc.mp3", Reply::ok(b"B-STEM".to_vec()));
4427        let plan = Plan {
4428            actions: vec![Action::MoveStem {
4429                clip_id: "b".to_owned(),
4430                key: "voc".to_owned(),
4431                stem_id: "bvoc".to_owned(),
4432                from: "shared.stems/voc.mp3".to_owned(),
4433                to: "b.stems/voc.mp3".to_owned(),
4434                source_url: "https://cdn1.suno.ai/bvoc.mp3".to_owned(),
4435                format: StemFormat::Mp3,
4436                hash: "h".to_owned(),
4437            }],
4438        };
4439
4440        let outcome = run(
4441            &plan,
4442            &mut manifest,
4443            &[],
4444            &http,
4445            &fs,
4446            &StubFfmpeg::flac(),
4447            &RecordingClock::new(),
4448            &ExecOptions::default(),
4449        );
4450
4451        assert_eq!(outcome.failed(), 0);
4452        // b's new stem carries b's freshly fetched bytes, never a's renamed bytes.
4453        assert_eq!(fs.read_file("b.stems/voc.mp3").unwrap(), b"B-STEM");
4454        assert_eq!(
4455            fs.read_file("shared.stems/voc.mp3").unwrap(),
4456            b"A-STEM",
4457            "the co-referenced stem must survive"
4458        );
4459    }
4460
4461    #[test]
4462    fn write_stem_keeps_shared_stem_when_co_referenced() {
4463        // Two clips share shared.stems/voc.mp3 after a prior partially-failed swap.
4464        // When `b` writes to a new path, write_stem must NOT remove the shared file;
4465        // clip `a` still references it and its stem must survive.
4466        let mut manifest = Manifest::new();
4467        let mut a = entry("a.flac", AudioFormat::Flac);
4468        a.stems.insert(
4469            "voc".to_owned(),
4470            ArtifactState {
4471                path: "shared.stems/voc.mp3".to_owned(),
4472                hash: "h".to_owned(),
4473            },
4474        );
4475        manifest.insert("a", a);
4476        let mut b = entry("b.flac", AudioFormat::Flac);
4477        b.stems.insert(
4478            "voc".to_owned(),
4479            ArtifactState {
4480                path: "shared.stems/voc.mp3".to_owned(),
4481                hash: "h".to_owned(),
4482            },
4483        );
4484        manifest.insert("b", b);
4485        let fs = MemFs::new().with_file("shared.stems/voc.mp3", b"A-STEM".to_vec());
4486        let http = ScriptedHttp::new().route("bvoc.mp3", Reply::ok(b"B-STEM".to_vec()));
4487        let plan = Plan {
4488            actions: vec![Action::WriteStem {
4489                clip_id: "b".to_owned(),
4490                key: "voc".to_owned(),
4491                stem_id: "bvoc".to_owned(),
4492                path: "b.stems/voc.mp3".to_owned(),
4493                source_url: "https://cdn1.suno.ai/bvoc.mp3".to_owned(),
4494                format: StemFormat::Mp3,
4495                hash: "bh".to_owned(),
4496            }],
4497        };
4498
4499        let outcome = run(
4500            &plan,
4501            &mut manifest,
4502            &[],
4503            &http,
4504            &fs,
4505            &StubFfmpeg::flac(),
4506            &RecordingClock::new(),
4507            &ExecOptions::default(),
4508        );
4509
4510        assert_eq!(outcome.failed(), 0);
4511        assert_eq!(fs.read_file("b.stems/voc.mp3").unwrap(), b"B-STEM");
4512        assert_eq!(
4513            fs.read_file("shared.stems/voc.mp3").unwrap(),
4514            b"A-STEM",
4515            "the co-referenced stem must survive"
4516        );
4517    }
4518
4519    #[test]
4520    fn co_delete_executes_audio_delete_then_artifact_delete() {
4521        // The plan orders the audio Delete before its sidecar DeleteArtifact.
4522        // The audio delete removes the manifest entry; the sidecar delete then
4523        // removes the file and tolerates the now-absent entry.
4524        let fs = MemFs::new()
4525            .with_file("gone.mp3", b"DATA".to_vec())
4526            .with_file("gone/cover.jpg", b"jpg".to_vec());
4527        let mut manifest = Manifest::new();
4528        let mut e = entry("gone.mp3", AudioFormat::Mp3);
4529        e.cover_jpg = Some(ArtifactState {
4530            path: "gone/cover.jpg".to_owned(),
4531            hash: "h1".to_owned(),
4532        });
4533        manifest.insert("gone", e);
4534        let plan = Plan {
4535            actions: vec![
4536                Action::Delete {
4537                    path: "gone.mp3".to_owned(),
4538                    clip_id: "gone".to_owned(),
4539                },
4540                Action::DeleteArtifact {
4541                    kind: ArtifactKind::CoverJpg,
4542                    path: "gone/cover.jpg".to_owned(),
4543                    owner_id: "gone".to_owned(),
4544                },
4545            ],
4546        };
4547
4548        let outcome = run(
4549            &plan,
4550            &mut manifest,
4551            &[],
4552            &ScriptedHttp::new(),
4553            &fs,
4554            &StubFfmpeg::flac(),
4555            &RecordingClock::new(),
4556            &ExecOptions::default(),
4557        );
4558
4559        assert_eq!(outcome.deleted, 1);
4560        assert_eq!(outcome.artifacts_deleted, 1);
4561        assert_eq!(outcome.failed(), 0);
4562        assert!(!fs.exists("gone.mp3"));
4563        assert!(!fs.exists("gone/cover.jpg"));
4564        assert!(manifest.get("gone").is_none());
4565    }
4566
4567    #[test]
4568    fn write_stem_mp3_stores_raw_and_records_slot() {
4569        // An MP3 stem is downloaded straight from its CDN url and stored verbatim
4570        // (no transcode, no WAV render): the bytes land at the `.mp3` path and the
4571        // keyed slot records the path and hash.
4572        let mut manifest = Manifest::new();
4573        manifest.insert("a", entry("a.flac", AudioFormat::Flac));
4574        let plan = Plan {
4575            actions: vec![Action::WriteStem {
4576                clip_id: "a".to_owned(),
4577                key: "voc".to_owned(),
4578                stem_id: "voc".to_owned(),
4579                path: "a.stems/a - Vocals [voc].mp3".to_owned(),
4580                source_url: "https://cdn1.suno.ai/voc.mp3".to_owned(),
4581                format: StemFormat::Mp3,
4582                hash: "vh".to_owned(),
4583            }],
4584        };
4585        let http = ScriptedHttp::new().route("voc.mp3", Reply::ok(b"stem-bytes".to_vec()));
4586        let fs = MemFs::new();
4587
4588        let outcome = run(
4589            &plan,
4590            &mut manifest,
4591            &[],
4592            &http,
4593            &fs,
4594            &StubFfmpeg::flac(),
4595            &RecordingClock::new(),
4596            &ExecOptions::default(),
4597        );
4598
4599        assert_eq!(outcome.artifacts_written, 1);
4600        assert_eq!(outcome.failed(), 0);
4601        // Bytes are stored exactly as delivered (no transcode applied).
4602        assert_eq!(
4603            fs.read_file("a.stems/a - Vocals [voc].mp3").unwrap(),
4604            b"stem-bytes"
4605        );
4606        // An MP3 stem never renders WAV: no convert_wav, no generation.
4607        assert_eq!(http.count("convert_wav"), 0);
4608        assert_eq!(http.count("/api/gen/"), 0);
4609        assert_eq!(
4610            manifest.get("a").unwrap().stems.get("voc"),
4611            Some(&ArtifactState {
4612                path: "a.stems/a - Vocals [voc].mp3".to_owned(),
4613                hash: "vh".to_owned(),
4614            })
4615        );
4616    }
4617
4618    #[test]
4619    fn write_stem_wav_renders_via_convert_wav_and_stores_raw() {
4620        // A WAV stem (the default) renders the stem clip's lossless WAV through the
4621        // free convert_wav flow keyed on the stem id, then downloads and stores it
4622        // RAW as `.wav` — it is NEVER transcoded to FLAC, even for a FLAC song.
4623        let mut manifest = Manifest::new();
4624        manifest.insert("a", entry("a.flac", AudioFormat::Flac));
4625        let plan = Plan {
4626            actions: vec![Action::WriteStem {
4627                clip_id: "a".to_owned(),
4628                key: "voc".to_owned(),
4629                stem_id: "stemvoc".to_owned(),
4630                path: "a.stems/a - Vocals [stemvoc].wav".to_owned(),
4631                source_url: "https://cdn1.suno.ai/stemvoc.mp3".to_owned(),
4632                format: StemFormat::Wav,
4633                hash: "vh".to_owned(),
4634            }],
4635        };
4636        // wav_file is not ready on the first poll, so the flow POSTs convert_wav
4637        // (free) and polls again — exactly the main FLAC/WAV render path.
4638        let http = ScriptedHttp::new()
4639            .with_auth()
4640            .route_seq(
4641                "stemvoc/wav_file/",
4642                vec![
4643                    Reply::json("{}"),
4644                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/stemvoc.wav"}"#),
4645                ],
4646            )
4647            .route("stemvoc/convert_wav/", Reply::status(200))
4648            .route("stemvoc.wav", Reply::ok(b"RIFFwav-bytes".to_vec()));
4649        let fs = MemFs::new();
4650
4651        let outcome = run(
4652            &plan,
4653            &mut manifest,
4654            &[],
4655            &http,
4656            &fs,
4657            &StubFfmpeg::flac(),
4658            &RecordingClock::new(),
4659            &small_poll(),
4660        );
4661
4662        assert_eq!(outcome.artifacts_written, 1);
4663        assert_eq!(outcome.failed(), 0);
4664        // The rendered WAV is stored verbatim; ffmpeg (WAV->FLAC) is never invoked,
4665        // so the stored bytes are the raw WAV, not a FLAC transcode.
4666        assert_eq!(
4667            fs.read_file("a.stems/a - Vocals [stemvoc].wav").unwrap(),
4668            b"RIFFwav-bytes"
4669        );
4670        assert!(!fs.exists("a.stems/a - Vocals [stemvoc].flac"));
4671        // The free WAV render ran; no credit-spending generation endpoint did.
4672        assert_eq!(http.count("convert_wav"), 1);
4673        assert_eq!(http.count("stem_task"), 0);
4674        assert_eq!(http.count("separate"), 0);
4675        assert_eq!(
4676            manifest.get("a").unwrap().stems.get("voc").unwrap().path,
4677            "a.stems/a - Vocals [stemvoc].wav"
4678        );
4679    }
4680
4681    #[test]
4682    fn write_stem_is_skipped_when_owner_audio_is_absent() {
4683        // No owning manifest entry (audio failed or never existed) => skip with
4684        // no fetch and no write, so a stem is never stranded without its song.
4685        let mut manifest = Manifest::new();
4686        let plan = Plan {
4687            actions: vec![Action::WriteStem {
4688                clip_id: "ghost".to_owned(),
4689                key: "voc".to_owned(),
4690                stem_id: "voc".to_owned(),
4691                path: "ghost.stems/voc.mp3".to_owned(),
4692                source_url: "https://cdn1.suno.ai/voc.mp3".to_owned(),
4693                format: StemFormat::Mp3,
4694                hash: "vh".to_owned(),
4695            }],
4696        };
4697        // Empty HTTP script: any fetch would error, proving none happens.
4698        let http = ScriptedHttp::new();
4699        let fs = MemFs::new();
4700
4701        let outcome = run(
4702            &plan,
4703            &mut manifest,
4704            &[],
4705            &http,
4706            &fs,
4707            &StubFfmpeg::flac(),
4708            &RecordingClock::new(),
4709            &ExecOptions::default(),
4710        );
4711
4712        assert_eq!(outcome.skipped, 1);
4713        assert_eq!(outcome.artifacts_written, 0);
4714        assert_eq!(outcome.failed(), 0);
4715        assert!(!fs.exists("ghost.stems/voc.mp3"));
4716    }
4717
4718    #[test]
4719    fn write_stem_relocates_the_old_file_on_a_path_move() {
4720        // The song was renamed, so the stem moves: the new file is written and the
4721        // stale copy at the previously tracked path is removed (moved, not orphaned).
4722        let fs = MemFs::new().with_file("old.stems/voc.mp3", b"old".to_vec());
4723        let mut manifest = Manifest::new();
4724        let mut e = entry("new.flac", AudioFormat::Flac);
4725        e.stems.insert(
4726            "voc".to_owned(),
4727            ArtifactState {
4728                path: "old.stems/voc.mp3".to_owned(),
4729                hash: "vh".to_owned(),
4730            },
4731        );
4732        manifest.insert("a", e);
4733        let plan = Plan {
4734            actions: vec![Action::WriteStem {
4735                clip_id: "a".to_owned(),
4736                key: "voc".to_owned(),
4737                stem_id: "voc".to_owned(),
4738                path: "new.stems/voc.mp3".to_owned(),
4739                source_url: "https://cdn1.suno.ai/voc.mp3".to_owned(),
4740                format: StemFormat::Mp3,
4741                hash: "vh".to_owned(),
4742            }],
4743        };
4744        let http = ScriptedHttp::new().route("voc.mp3", Reply::ok(b"new".to_vec()));
4745
4746        let outcome = run(
4747            &plan,
4748            &mut manifest,
4749            &[],
4750            &http,
4751            &fs,
4752            &StubFfmpeg::flac(),
4753            &RecordingClock::new(),
4754            &ExecOptions::default(),
4755        );
4756
4757        assert_eq!(outcome.artifacts_written, 1);
4758        assert!(fs.exists("new.stems/voc.mp3"));
4759        assert!(
4760            !fs.exists("old.stems/voc.mp3"),
4761            "the old stem is moved, not left behind"
4762        );
4763        assert_eq!(
4764            manifest.get("a").unwrap().stems.get("voc").unwrap().path,
4765            "new.stems/voc.mp3"
4766        );
4767    }
4768
4769    #[test]
4770    fn delete_stem_removes_file_and_clears_slot() {
4771        let fs = MemFs::new().with_file("a.stems/voc.mp3", b"stem".to_vec());
4772        let mut manifest = Manifest::new();
4773        let mut e = entry("a.flac", AudioFormat::Flac);
4774        e.stems.insert(
4775            "voc".to_owned(),
4776            ArtifactState {
4777                path: "a.stems/voc.mp3".to_owned(),
4778                hash: "vh".to_owned(),
4779            },
4780        );
4781        manifest.insert("a", e);
4782        let plan = Plan {
4783            actions: vec![Action::DeleteStem {
4784                clip_id: "a".to_owned(),
4785                key: "voc".to_owned(),
4786                path: "a.stems/voc.mp3".to_owned(),
4787            }],
4788        };
4789
4790        let outcome = run(
4791            &plan,
4792            &mut manifest,
4793            &[],
4794            &ScriptedHttp::new(),
4795            &fs,
4796            &StubFfmpeg::flac(),
4797            &RecordingClock::new(),
4798            &ExecOptions::default(),
4799        );
4800
4801        assert_eq!(outcome.artifacts_deleted, 1);
4802        assert!(!fs.exists("a.stems/voc.mp3"));
4803        assert!(manifest.get("a").unwrap().stems.is_empty());
4804    }
4805
4806    #[test]
4807    fn co_deleting_the_last_stem_prunes_the_stems_folder() {
4808        // Deleting a song co-deletes its stems; the emptied `.stems` folder is
4809        // pruned by the end-of-run sweep, so it can never be orphaned.
4810        let fs = MemFs::new()
4811            .with_file("song.flac", b"DATA".to_vec())
4812            .with_file("song.stems/voc.mp3", b"stem".to_vec());
4813        assert!(fs.has_dir("song.stems"));
4814        let mut manifest = Manifest::new();
4815        let mut e = entry("song.flac", AudioFormat::Flac);
4816        e.stems.insert(
4817            "voc".to_owned(),
4818            ArtifactState {
4819                path: "song.stems/voc.mp3".to_owned(),
4820                hash: "vh".to_owned(),
4821            },
4822        );
4823        manifest.insert("a", e);
4824        let plan = Plan {
4825            actions: vec![
4826                Action::Delete {
4827                    path: "song.flac".to_owned(),
4828                    clip_id: "a".to_owned(),
4829                },
4830                Action::DeleteStem {
4831                    clip_id: "a".to_owned(),
4832                    key: "voc".to_owned(),
4833                    path: "song.stems/voc.mp3".to_owned(),
4834                },
4835            ],
4836        };
4837
4838        let outcome = run(
4839            &plan,
4840            &mut manifest,
4841            &[],
4842            &ScriptedHttp::new(),
4843            &fs,
4844            &StubFfmpeg::flac(),
4845            &RecordingClock::new(),
4846            &ExecOptions::default(),
4847        );
4848
4849        assert_eq!(outcome.deleted, 1);
4850        assert_eq!(outcome.artifacts_deleted, 1);
4851        assert!(!fs.exists("song.flac"));
4852        assert!(!fs.exists("song.stems/voc.mp3"));
4853        assert!(
4854            !fs.has_dir("song.stems"),
4855            "the emptied .stems folder is pruned"
4856        );
4857        assert!(manifest.get("a").is_none());
4858    }
4859
4860    #[test]
4861    fn write_stem_mp3_never_issues_a_generation_post() {
4862        // The MP3 stem path is GET-only: writing a stem fetches its CDN url and
4863        // never POSTs, let alone to any generation or WAV-render endpoint.
4864        let mut manifest = Manifest::new();
4865        manifest.insert("a", entry("a.flac", AudioFormat::Flac));
4866        let plan = Plan {
4867            actions: vec![Action::WriteStem {
4868                clip_id: "a".to_owned(),
4869                key: "voc".to_owned(),
4870                stem_id: "voc".to_owned(),
4871                path: "a.stems/voc.mp3".to_owned(),
4872                source_url: "https://cdn1.suno.ai/voc.mp3".to_owned(),
4873                format: StemFormat::Mp3,
4874                hash: "vh".to_owned(),
4875            }],
4876        };
4877        let http = ScriptedHttp::new().route("voc.mp3", Reply::ok(b"stem".to_vec()));
4878
4879        run(
4880            &plan,
4881            &mut manifest,
4882            &[],
4883            &http,
4884            &MemFs::new(),
4885            &StubFfmpeg::flac(),
4886            &RecordingClock::new(),
4887            &ExecOptions::default(),
4888        );
4889
4890        assert_eq!(
4891            http.count("stem_task"),
4892            0,
4893            "no generation endpoint is ever hit"
4894        );
4895        assert_eq!(http.count("convert_wav"), 0);
4896        assert_eq!(http.count("/api/gen/"), 0);
4897    }
4898
4899    #[test]
4900    fn full_stems_mirror_mp3_is_get_only_with_zero_gen_traffic() {
4901        // End-to-end #100 path with MP3 stems: list a clip's existing stems (free
4902        // GET over the live page-count + 0-indexed page shape), reconcile them into
4903        // WriteStem actions, and execute (download) them. With MP3 the whole flow
4904        // is GET-only and touches NO `/api/gen/` endpoint at all.
4905        let http = ScriptedHttp::new()
4906            .with_auth()
4907            .route("clip1/stems/pages", Reply::json(r#"{"pages": 1}"#))
4908            .route(
4909                "clip1/stems?page=0",
4910                Reply::json(
4911                    r#"{"stems":[
4912                        {"id":"s1","title":"Song (Vocals)","status":"complete","audio_url":"https://cdn1.suno.ai/s1.mp3"},
4913                        {"id":"s2","title":"Song (Drums)","status":"complete","audio_url":"https://cdn1.suno.ai/s2.mp3"}
4914                    ]}"#,
4915                ),
4916            )
4917            .route("s1.mp3", Reply::ok(b"vocals-bytes".to_vec()))
4918            .route("s2.mp3", Reply::ok(b"drums-bytes".to_vec()));
4919
4920        // List the existing stems through the client (GET-only, free).
4921        let mut auth = ClerkAuth::new("eyJtoken");
4922        pollster::block_on(auth.authenticate(&http)).unwrap();
4923        let mut client = SunoClient::new(auth, RecordingClock::new());
4924        let (stems, complete) = pollster::block_on(client.list_stems(&http, "clip1")).unwrap();
4925        assert!(complete);
4926        assert_eq!(stems.len(), 2);
4927        assert_eq!(stems[0].label, "Vocals");
4928
4929        // Reconcile the listed MP3 stems into a plan (audio already present -> Skip).
4930        let mut manifest = Manifest::new();
4931        manifest.insert("clip1", entry("clip1.flac", AudioFormat::Flac));
4932        let desired_stems: Vec<crate::reconcile::DesiredStem> = stems
4933            .iter()
4934            .map(|s| crate::reconcile::DesiredStem {
4935                key: s.id.clone(),
4936                stem_id: s.id.clone(),
4937                path: format!("clip1.stems/{}.mp3", s.id),
4938                source_url: s.url.clone(),
4939                format: StemFormat::Mp3,
4940                hash: crate::art_url_hash(&s.url),
4941            })
4942            .collect();
4943        let d = Desired {
4944            path: "clip1.flac".to_owned(),
4945            stems: Some(desired_stems),
4946            ..desired(clip("clip1"), AudioFormat::Flac)
4947        };
4948        let local: HashMap<String, crate::reconcile::LocalFile> = [(
4949            "clip1".to_owned(),
4950            crate::reconcile::LocalFile {
4951                exists: true,
4952                size: 100,
4953            },
4954        )]
4955        .into_iter()
4956        .collect();
4957        let sources = [crate::reconcile::SourceStatus {
4958            mode: SourceMode::Mirror,
4959            fully_enumerated: true,
4960        }];
4961        let plan =
4962            crate::reconcile::reconcile(&manifest, std::slice::from_ref(&d), &local, &sources);
4963        assert_eq!(plan.stem_writes(), 2);
4964
4965        let fs = MemFs::new();
4966        let outcome = run(
4967            &plan,
4968            &mut manifest,
4969            std::slice::from_ref(&d),
4970            &http,
4971            &fs,
4972            &StubFfmpeg::flac(),
4973            &RecordingClock::new(),
4974            &ExecOptions::default(),
4975        );
4976
4977        assert_eq!(outcome.artifacts_written, 2, "both stems downloaded");
4978        assert_eq!(fs.read_file("clip1.stems/s1.mp3").unwrap(), b"vocals-bytes");
4979        assert_eq!(fs.read_file("clip1.stems/s2.mp3").unwrap(), b"drums-bytes");
4980        // The MP3 mirror path never touches any /api/gen/ endpoint (no render, no
4981        // generation, no separation).
4982        assert_eq!(http.count("/api/gen/"), 0);
4983        assert_eq!(http.count("stem_task"), 0);
4984        assert_eq!(http.count("separate"), 0);
4985        assert_eq!(http.count("generate"), 0);
4986        // No stem is ever written as FLAC.
4987        assert!(!fs.exists("clip1.stems/s1.flac"));
4988    }
4989
4990    #[test]
4991    fn full_stems_mirror_wav_default_renders_free_wav_and_no_generation() {
4992        // End-to-end #100 path with WAV stems (the default): each stem's lossless
4993        // WAV is rendered through the FREE convert_wav flow and stored RAW as
4994        // `.wav`. The mirror makes NO credit-spending generation POST.
4995        let http = ScriptedHttp::new()
4996            .with_auth()
4997            .route("clip1/stems/pages", Reply::json(r#"{"pages": 1}"#))
4998            .route(
4999                "clip1/stems?page=0",
5000                Reply::json(
5001                    r#"{"stems":[
5002                        {"id":"s1","title":"Song (Vocals)","status":"complete","audio_url":"https://cdn1.suno.ai/s1.mp3"},
5003                        {"id":"s2","title":"Song (Drums)","status":"complete","audio_url":"https://cdn1.suno.ai/s2.mp3"}
5004                    ]}"#,
5005                ),
5006            )
5007            // Each stem's WAV is already rendered, so wav_file returns the url and
5008            // no convert_wav POST is even needed (still free either way).
5009            .route(
5010                "s1/wav_file/",
5011                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/s1.wav"}"#),
5012            )
5013            .route(
5014                "s2/wav_file/",
5015                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/s2.wav"}"#),
5016            )
5017            .route("s1.wav", Reply::ok(b"RIFFvocals".to_vec()))
5018            .route("s2.wav", Reply::ok(b"RIFFdrums".to_vec()));
5019
5020        let mut auth = ClerkAuth::new("eyJtoken");
5021        pollster::block_on(auth.authenticate(&http)).unwrap();
5022        let mut client = SunoClient::new(auth, RecordingClock::new());
5023        let (stems, _complete) = pollster::block_on(client.list_stems(&http, "clip1")).unwrap();
5024
5025        let mut manifest = Manifest::new();
5026        manifest.insert("clip1", entry("clip1.flac", AudioFormat::Flac));
5027        let desired_stems: Vec<crate::reconcile::DesiredStem> = stems
5028            .iter()
5029            .map(|s| crate::reconcile::DesiredStem {
5030                key: s.id.clone(),
5031                stem_id: s.id.clone(),
5032                path: format!("clip1.stems/{}.wav", s.id),
5033                source_url: s.url.clone(),
5034                format: StemFormat::Wav,
5035                hash: crate::art_url_hash(&s.url),
5036            })
5037            .collect();
5038        let d = Desired {
5039            path: "clip1.flac".to_owned(),
5040            stems: Some(desired_stems),
5041            ..desired(clip("clip1"), AudioFormat::Flac)
5042        };
5043        let local: HashMap<String, crate::reconcile::LocalFile> = [(
5044            "clip1".to_owned(),
5045            crate::reconcile::LocalFile {
5046                exists: true,
5047                size: 100,
5048            },
5049        )]
5050        .into_iter()
5051        .collect();
5052        let sources = [crate::reconcile::SourceStatus {
5053            mode: SourceMode::Mirror,
5054            fully_enumerated: true,
5055        }];
5056        let plan =
5057            crate::reconcile::reconcile(&manifest, std::slice::from_ref(&d), &local, &sources);
5058
5059        let fs = MemFs::new();
5060        let outcome = run(
5061            &plan,
5062            &mut manifest,
5063            std::slice::from_ref(&d),
5064            &http,
5065            &fs,
5066            &StubFfmpeg::flac(),
5067            &RecordingClock::new(),
5068            &small_poll(),
5069        );
5070
5071        assert_eq!(outcome.artifacts_written, 2);
5072        // Stems are stored RAW as WAV (no FLAC transcode, even for a FLAC song).
5073        assert_eq!(fs.read_file("clip1.stems/s1.wav").unwrap(), b"RIFFvocals");
5074        assert_eq!(fs.read_file("clip1.stems/s2.wav").unwrap(), b"RIFFdrums");
5075        assert!(!fs.exists("clip1.stems/s1.flac"));
5076        // No credit-spending generation/separation endpoint is ever hit.
5077        assert_eq!(http.count("stem_task"), 0);
5078        assert_eq!(http.count("separate"), 0);
5079        assert_eq!(http.count("generate"), 0);
5080    }
5081
5082    #[test]
5083    fn write_artifact_is_skipped_when_the_owner_audio_is_absent() {
5084        // A clip whose Download fails leaves no manifest entry, so its following
5085        // WriteArtifact must not strand an untracked sidecar: it is skipped with
5086        // no fetch and no write. A following healthy clip still succeeds.
5087        let ca = clip("a");
5088        let plan = Plan {
5089            actions: vec![
5090                Action::Download {
5091                    clip: ca.clone(),
5092                    lineage: LineageContext::own_root(&ca),
5093                    path: "a.mp3".to_owned(),
5094                    format: AudioFormat::Mp3,
5095                },
5096                Action::WriteArtifact {
5097                    kind: ArtifactKind::CoverJpg,
5098                    path: "a/cover.jpg".to_owned(),
5099                    source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
5100                    hash: "h1".to_owned(),
5101                    owner_id: "a".to_owned(),
5102                    content: None,
5103                },
5104                Action::WriteArtifact {
5105                    kind: ArtifactKind::CoverJpg,
5106                    path: "b/cover.jpg".to_owned(),
5107                    source_url: "https://art.suno.ai/b/large.jpg".to_owned(),
5108                    hash: "h2".to_owned(),
5109                    owner_id: "b".to_owned(),
5110                    content: None,
5111                },
5112            ],
5113        };
5114        // The Download's audio 404s (permanent), so no entry for "a" is created.
5115        let http = ScriptedHttp::new()
5116            .route("a.mp3", Reply::status(404))
5117            .route("a/large.jpg", Reply::ok(b"jpg-a".to_vec()))
5118            .route("b/large.jpg", Reply::ok(b"jpg-b".to_vec()));
5119        let fs = MemFs::new();
5120        let mut manifest = Manifest::new();
5121        // "b" already has audio (a prior-run clip), so its sidecar write proceeds.
5122        manifest.insert("b", entry("b.mp3", AudioFormat::Mp3));
5123
5124        let outcome = run(
5125            &plan,
5126            &mut manifest,
5127            &[],
5128            &http,
5129            &fs,
5130            &StubFfmpeg::flac(),
5131            &RecordingClock::new(),
5132            &ExecOptions::default(),
5133        );
5134
5135        assert_eq!(outcome.status, RunStatus::Completed);
5136        // The audio download is the only failure; the orphan artifact is skipped.
5137        assert_eq!(outcome.failed(), 1);
5138        assert_eq!(outcome.failures[0].clip_id, "a");
5139        assert_eq!(outcome.skipped, 1);
5140        // The orphan sidecar was neither fetched nor written, and left no record.
5141        assert_eq!(http.count("a/large.jpg"), 0);
5142        assert!(!fs.exists("a/cover.jpg"));
5143        assert!(manifest.get("a").is_none());
5144        // The healthy clip's sidecar still succeeded.
5145        assert_eq!(outcome.artifacts_written, 1);
5146        assert_eq!(fs.read_file("b/cover.jpg").unwrap(), b"jpg-b");
5147        assert!(manifest.get("b").unwrap().cover_jpg.is_some());
5148    }
5149
5150    #[test]
5151    fn write_artifact_transcodes_animated_cover_to_webp() {
5152        // A CoverWebp fetches the clip's MP4 preview, runs it through the ffmpeg
5153        // port, and writes the transcoded WebP (not the fetched MP4), recording
5154        // the sidecar on the owning entry.
5155        let mut manifest = Manifest::new();
5156        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
5157        let plan = Plan {
5158            actions: vec![Action::WriteArtifact {
5159                kind: ArtifactKind::CoverWebp,
5160                path: "a/cover.webp".to_owned(),
5161                source_url: "https://cdn.suno.ai/a/video.mp4".to_owned(),
5162                hash: "v1".to_owned(),
5163                owner_id: "a".to_owned(),
5164                content: None,
5165            }],
5166        };
5167        let http = ScriptedHttp::new().route("a/video.mp4", Reply::ok(b"mp4-bytes".to_vec()));
5168        let fs = MemFs::new();
5169        let ffmpeg = StubFfmpeg::webp();
5170
5171        let outcome = run(
5172            &plan,
5173            &mut manifest,
5174            &[],
5175            &http,
5176            &fs,
5177            &ffmpeg,
5178            &RecordingClock::new(),
5179            &ExecOptions::default(),
5180        );
5181
5182        assert_eq!(outcome.artifacts_written, 1);
5183        assert_eq!(outcome.failed(), 0);
5184        assert_eq!(outcome.status, RunStatus::Completed);
5185        // The fetched MP4 was transcoded: the file holds the ffmpeg WebP output.
5186        assert_eq!(http.count("a/video.mp4"), 1);
5187        let written = fs.read_file("a/cover.webp").unwrap();
5188        assert_ne!(written, b"mp4-bytes");
5189        assert!(written.starts_with(b"RIFF"));
5190        assert_eq!(
5191            manifest.get("a").unwrap().cover_webp,
5192            Some(ArtifactState {
5193                path: "a/cover.webp".to_owned(),
5194                hash: "v1".to_owned(),
5195            })
5196        );
5197    }
5198
5199    #[test]
5200    fn write_artifact_webp_transcode_failure_is_per_clip() {
5201        // A transcode failure is attributed to the owning clip: it is a per-clip
5202        // failure, the run completes, no sidecar is written, and the slot stays
5203        // empty. A healthy static cover in the same run still succeeds.
5204        let mut manifest = Manifest::new();
5205        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
5206        manifest.insert("b", entry("b.mp3", AudioFormat::Mp3));
5207        let plan = Plan {
5208            actions: vec![
5209                Action::WriteArtifact {
5210                    kind: ArtifactKind::CoverWebp,
5211                    path: "a/cover.webp".to_owned(),
5212                    source_url: "https://cdn.suno.ai/a/video.mp4".to_owned(),
5213                    hash: "v1".to_owned(),
5214                    owner_id: "a".to_owned(),
5215                    content: None,
5216                },
5217                Action::WriteArtifact {
5218                    kind: ArtifactKind::CoverJpg,
5219                    path: "b/cover.jpg".to_owned(),
5220                    source_url: "https://art.suno.ai/b/large.jpg".to_owned(),
5221                    hash: "h1".to_owned(),
5222                    owner_id: "b".to_owned(),
5223                    content: None,
5224                },
5225            ],
5226        };
5227        let http = ScriptedHttp::new()
5228            .route("a/video.mp4", Reply::ok(b"mp4-bytes".to_vec()))
5229            .route("b/large.jpg", Reply::ok(b"jpg-b".to_vec()));
5230        let fs = MemFs::new();
5231
5232        let outcome = run(
5233            &plan,
5234            &mut manifest,
5235            &[],
5236            &http,
5237            &fs,
5238            &StubFfmpeg::failing(),
5239            &RecordingClock::new(),
5240            &ExecOptions::default(),
5241        );
5242
5243        assert_eq!(outcome.status, RunStatus::Completed);
5244        assert_eq!(outcome.failed(), 1);
5245        assert_eq!(outcome.failures[0].clip_id, "a");
5246        // The animated cover failed to transcode: nothing written, slot empty.
5247        assert!(!fs.exists("a/cover.webp"));
5248        assert_eq!(manifest.get("a").unwrap().cover_webp, None);
5249        // The static cover in the same run still succeeded.
5250        assert_eq!(outcome.artifacts_written, 1);
5251        assert_eq!(fs.read_file("b/cover.jpg").unwrap(), b"jpg-b");
5252        assert!(manifest.get("b").unwrap().cover_jpg.is_some());
5253    }
5254
5255    #[test]
5256    fn write_artifact_uses_configured_webp_settings() {
5257        use std::sync::{Arc, Mutex};
5258
5259        struct RecordingWebpFfmpeg {
5260            seen: Arc<Mutex<Vec<WebpEncodeSettings>>>,
5261        }
5262
5263        impl Ffmpeg for RecordingWebpFfmpeg {
5264            async fn wav_to_flac(
5265                &self,
5266                _wav: &[u8],
5267            ) -> Result<Vec<u8>, crate::ffmpeg::FfmpegError> {
5268                Ok(Vec::new())
5269            }
5270
5271            async fn mp4_to_webp(
5272                &self,
5273                _mp4: &[u8],
5274                settings: WebpEncodeSettings,
5275            ) -> Result<Vec<u8>, crate::ffmpeg::FfmpegError> {
5276                let seen = Arc::clone(&self.seen);
5277                seen.lock().unwrap().push(settings);
5278                Ok(b"RIFF\x00\x00\x00\x00WEBP".to_vec())
5279            }
5280        }
5281
5282        let mut manifest = Manifest::new();
5283        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
5284        let plan = Plan {
5285            actions: vec![Action::WriteArtifact {
5286                kind: ArtifactKind::CoverWebp,
5287                path: "a/cover.webp".to_owned(),
5288                source_url: "https://cdn.suno.ai/a/video.mp4".to_owned(),
5289                hash: "v1".to_owned(),
5290                owner_id: "a".to_owned(),
5291                content: None,
5292            }],
5293        };
5294        let seen = Arc::new(Mutex::new(Vec::new()));
5295        let ffmpeg = RecordingWebpFfmpeg {
5296            seen: Arc::clone(&seen),
5297        };
5298        let opts = ExecOptions {
5299            cover_webp: WebpEncodeSettings {
5300                quality: 88,
5301                max_fps: 12,
5302                max_width: Some(720),
5303                lossless: false,
5304                compression_level: 4,
5305            },
5306            ..ExecOptions::default()
5307        };
5308
5309        let _ = run(
5310            &plan,
5311            &mut manifest,
5312            &[],
5313            &ScriptedHttp::new().route("a/video.mp4", Reply::ok(b"mp4-bytes".to_vec())),
5314            &MemFs::new(),
5315            &ffmpeg,
5316            &RecordingClock::new(),
5317            &opts,
5318        );
5319
5320        assert_eq!(
5321            seen.lock().unwrap().as_slice(),
5322            &[WebpEncodeSettings {
5323                quality: 88,
5324                max_fps: 12,
5325                max_width: Some(720),
5326                lossless: false,
5327                compression_level: 4,
5328            }]
5329        );
5330    }
5331
5332    // ── Phase 8: folder art routes to the album store ───────────────
5333
5334    #[test]
5335    fn folder_jpg_write_records_album_state_and_skips_manifest() {
5336        // Folder art is owned by the album root id, not a manifest clip: it
5337        // writes even with an empty manifest and records on the album store.
5338        let mut manifest = Manifest::new();
5339        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
5340        let plan = Plan {
5341            actions: vec![Action::WriteArtifact {
5342                kind: ArtifactKind::FolderJpg,
5343                path: "creator/album/folder.jpg".to_owned(),
5344                source_url: "https://art.suno.ai/root/large.jpg".to_owned(),
5345                hash: "jh".to_owned(),
5346                owner_id: "root".to_owned(),
5347                content: None,
5348            }],
5349        };
5350        let http = ScriptedHttp::new().route("root/large.jpg", Reply::ok(b"folder-jpg".to_vec()));
5351        let fs = MemFs::new();
5352
5353        let outcome = run_with_albums(
5354            &plan,
5355            &mut manifest,
5356            &mut albums,
5357            &[],
5358            &http,
5359            &fs,
5360            &StubFfmpeg::flac(),
5361            &RecordingClock::new(),
5362            &ExecOptions::default(),
5363        );
5364
5365        assert_eq!(outcome.artifacts_written, 1);
5366        assert_eq!(outcome.status, RunStatus::Completed);
5367        assert_eq!(
5368            fs.read_file("creator/album/folder.jpg").unwrap(),
5369            b"folder-jpg"
5370        );
5371        assert_eq!(
5372            albums.get("root").unwrap().folder_jpg,
5373            Some(ArtifactState {
5374                path: "creator/album/folder.jpg".to_owned(),
5375                hash: "jh".to_owned(),
5376            })
5377        );
5378        assert!(manifest.get("root").is_none());
5379    }
5380
5381    #[test]
5382    fn folder_webp_write_transcodes_and_records_album_state() {
5383        let mut manifest = Manifest::new();
5384        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
5385        let plan = Plan {
5386            actions: vec![Action::WriteArtifact {
5387                kind: ArtifactKind::FolderWebp,
5388                path: "creator/album/cover.webp".to_owned(),
5389                source_url: "https://cdn.suno.ai/root/video.mp4".to_owned(),
5390                hash: "wh".to_owned(),
5391                owner_id: "root".to_owned(),
5392                content: None,
5393            }],
5394        };
5395        let http = ScriptedHttp::new().route("root/video.mp4", Reply::ok(b"mp4-bytes".to_vec()));
5396        let fs = MemFs::new();
5397
5398        let outcome = run_with_albums(
5399            &plan,
5400            &mut manifest,
5401            &mut albums,
5402            &[],
5403            &http,
5404            &fs,
5405            &StubFfmpeg::webp(),
5406            &RecordingClock::new(),
5407            &ExecOptions::default(),
5408        );
5409
5410        assert_eq!(outcome.artifacts_written, 1);
5411        assert_eq!(outcome.failed(), 0);
5412        // The MP4 was transcoded to WebP, not written verbatim.
5413        let written = fs.read_file("creator/album/cover.webp").unwrap();
5414        assert_ne!(written, b"mp4-bytes");
5415        assert!(written.starts_with(b"RIFF"));
5416        assert_eq!(
5417            albums.get("root").unwrap().folder_webp,
5418            Some(ArtifactState {
5419                path: "creator/album/cover.webp".to_owned(),
5420                hash: "wh".to_owned(),
5421            })
5422        );
5423    }
5424
5425    #[test]
5426    fn folder_mp4_write_keeps_the_source_verbatim() {
5427        let mut manifest = Manifest::new();
5428        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
5429        let plan = Plan {
5430            actions: vec![Action::WriteArtifact {
5431                kind: ArtifactKind::FolderMp4,
5432                path: "creator/album/cover.mp4".to_owned(),
5433                source_url: "https://cdn.suno.ai/root/video.mp4".to_owned(),
5434                hash: "mh".to_owned(),
5435                owner_id: "root".to_owned(),
5436                content: None,
5437            }],
5438        };
5439        let http = ScriptedHttp::new().route("root/video.mp4", Reply::ok(b"mp4-bytes".to_vec()));
5440        let fs = MemFs::new();
5441
5442        let outcome = run_with_albums(
5443            &plan,
5444            &mut manifest,
5445            &mut albums,
5446            &[],
5447            &http,
5448            &fs,
5449            &StubFfmpeg::webp(),
5450            &RecordingClock::new(),
5451            &ExecOptions::default(),
5452        );
5453
5454        assert_eq!(outcome.artifacts_written, 1);
5455        assert_eq!(outcome.failed(), 0);
5456        // The raw MP4 is written byte-for-byte, never transcoded.
5457        assert_eq!(
5458            fs.read_file("creator/album/cover.mp4").unwrap(),
5459            b"mp4-bytes"
5460        );
5461        assert_eq!(
5462            albums.get("root").unwrap().folder_mp4,
5463            Some(ArtifactState {
5464                path: "creator/album/cover.mp4".to_owned(),
5465                hash: "mh".to_owned(),
5466            })
5467        );
5468    }
5469
5470    #[test]
5471    fn both_folder_covers_fetch_the_video_cover_once() {
5472        let mut manifest = Manifest::new();
5473        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
5474        // `both` retention keeps cover.webp (transcoded) and cover.mp4 (raw) from
5475        // the one video_cover_url. FolderWebp sorts first and caches the fetched
5476        // source; FolderMp4 drains it, so the source is fetched exactly once.
5477        let plan = Plan {
5478            actions: vec![
5479                Action::WriteArtifact {
5480                    kind: ArtifactKind::FolderWebp,
5481                    path: "creator/album/cover.webp".to_owned(),
5482                    source_url: "https://cdn.suno.ai/root/video.mp4".to_owned(),
5483                    hash: "wh".to_owned(),
5484                    owner_id: "root".to_owned(),
5485                    content: None,
5486                },
5487                Action::WriteArtifact {
5488                    kind: ArtifactKind::FolderMp4,
5489                    path: "creator/album/cover.mp4".to_owned(),
5490                    source_url: "https://cdn.suno.ai/root/video.mp4".to_owned(),
5491                    hash: "mh".to_owned(),
5492                    owner_id: "root".to_owned(),
5493                    content: None,
5494                },
5495            ],
5496        };
5497        let http = ScriptedHttp::new().route("root/video.mp4", Reply::ok(b"mp4-bytes".to_vec()));
5498        let fs = MemFs::new();
5499
5500        let outcome = run_with_albums(
5501            &plan,
5502            &mut manifest,
5503            &mut albums,
5504            &[],
5505            &http,
5506            &fs,
5507            &StubFfmpeg::webp(),
5508            &RecordingClock::new(),
5509            &ExecOptions::default(),
5510        );
5511
5512        assert_eq!(outcome.artifacts_written, 2);
5513        assert_eq!(outcome.failed(), 0);
5514        // Fetched exactly once despite two artifacts consuming it (#90 / #89).
5515        assert_eq!(http.count("root/video.mp4"), 1);
5516        // The webp is transcoded; the mp4 is the raw source verbatim.
5517        assert!(
5518            fs.read_file("creator/album/cover.webp")
5519                .unwrap()
5520                .starts_with(b"RIFF")
5521        );
5522        assert_eq!(
5523            fs.read_file("creator/album/cover.mp4").unwrap(),
5524            b"mp4-bytes"
5525        );
5526    }
5527
5528    #[test]
5529    fn folder_art_delete_clears_album_state() {
5530        let fs = MemFs::new().with_file("creator/album/folder.jpg", b"jpg".to_vec());
5531        let mut manifest = Manifest::new();
5532        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
5533        albums.insert(
5534            "root".to_owned(),
5535            AlbumArt {
5536                folder_jpg: Some(ArtifactState {
5537                    path: "creator/album/folder.jpg".to_owned(),
5538                    hash: "jh".to_owned(),
5539                }),
5540                folder_webp: None,
5541                folder_mp4: None,
5542            },
5543        );
5544        let plan = Plan {
5545            actions: vec![Action::DeleteArtifact {
5546                kind: ArtifactKind::FolderJpg,
5547                path: "creator/album/folder.jpg".to_owned(),
5548                owner_id: "root".to_owned(),
5549            }],
5550        };
5551
5552        let outcome = run_with_albums(
5553            &plan,
5554            &mut manifest,
5555            &mut albums,
5556            &[],
5557            &ScriptedHttp::new(),
5558            &fs,
5559            &StubFfmpeg::flac(),
5560            &RecordingClock::new(),
5561            &ExecOptions::default(),
5562        );
5563
5564        assert_eq!(outcome.artifacts_deleted, 1);
5565        assert!(!fs.exists("creator/album/folder.jpg"));
5566        // The album row had only the one kind, so it is pruned entirely.
5567        assert!(!albums.contains_key("root"));
5568    }
5569
5570    // ── Phase 9: playlist artifacts ─────────────────────────────────
5571
5572    #[test]
5573    fn playlist_write_uses_inline_content_and_records_state() {
5574        // A playlist body is generated, carried inline. With an empty manifest
5575        // and NO http routes, the write still succeeds — proving it skipped the
5576        // network — and records the playlist store keyed by the playlist id.
5577        let mut manifest = Manifest::new();
5578        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
5579        let mut playlists: BTreeMap<String, PlaylistState> = BTreeMap::new();
5580        let body = "#EXTM3U\n#PLAYLIST:Road Trip\n#EXTINF:60,One\nA/One.flac\n";
5581        let plan = Plan {
5582            actions: vec![Action::WriteArtifact {
5583                kind: ArtifactKind::Playlist,
5584                path: "Road Trip.m3u8".to_owned(),
5585                source_url: String::new(),
5586                hash: "ph1".to_owned(),
5587                owner_id: "pl1".to_owned(),
5588                content: Some(body.to_owned()),
5589            }],
5590        };
5591        let fs = MemFs::new();
5592
5593        let outcome = run_full(
5594            &plan,
5595            &mut manifest,
5596            &mut albums,
5597            &mut playlists,
5598            &[],
5599            &ScriptedHttp::new(),
5600            &fs,
5601            &StubFfmpeg::flac(),
5602            &RecordingClock::new(),
5603            &ExecOptions::default(),
5604        );
5605
5606        assert_eq!(outcome.artifacts_written, 1);
5607        assert_eq!(outcome.failed(), 0);
5608        // The exact inline bytes were written, verbatim.
5609        assert_eq!(fs.read_file("Road Trip.m3u8").unwrap(), body.as_bytes());
5610        assert_eq!(
5611            playlists.get("pl1"),
5612            Some(&PlaylistState {
5613                name: "Road Trip".to_owned(),
5614                path: "Road Trip.m3u8".to_owned(),
5615                hash: "ph1".to_owned(),
5616            })
5617        );
5618    }
5619
5620    #[test]
5621    fn playlist_delete_removes_file_and_clears_state() {
5622        let fs = MemFs::new().with_file("Old.m3u8", b"#EXTM3U\n".to_vec());
5623        let mut manifest = Manifest::new();
5624        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
5625        let mut playlists: BTreeMap<String, PlaylistState> = BTreeMap::new();
5626        playlists.insert(
5627            "pl1".to_owned(),
5628            PlaylistState {
5629                name: "Old".to_owned(),
5630                path: "Old.m3u8".to_owned(),
5631                hash: "ph1".to_owned(),
5632            },
5633        );
5634        let plan = Plan {
5635            actions: vec![Action::DeleteArtifact {
5636                kind: ArtifactKind::Playlist,
5637                path: "Old.m3u8".to_owned(),
5638                owner_id: "pl1".to_owned(),
5639            }],
5640        };
5641
5642        let outcome = run_full(
5643            &plan,
5644            &mut manifest,
5645            &mut albums,
5646            &mut playlists,
5647            &[],
5648            &ScriptedHttp::new(),
5649            &fs,
5650            &StubFfmpeg::flac(),
5651            &RecordingClock::new(),
5652            &ExecOptions::default(),
5653        );
5654
5655        assert_eq!(outcome.artifacts_deleted, 1);
5656        assert!(!fs.exists("Old.m3u8"));
5657        assert!(
5658            !playlists.contains_key("pl1"),
5659            "the playlist row is cleared on delete"
5660        );
5661    }
5662
5663    // ── Phase 10: old-sidecar cleanup on move + empty-dir prune ──────
5664
5665    #[test]
5666    fn rename_move_relocates_cover_and_prunes_old_album() {
5667        // A title/album change moves the audio (Rename) and re-emits the cover
5668        // at the NEW path. The old cover must be removed and the now-empty old
5669        // album directory pruned, leaving no orphan sidecar and no ghost dir.
5670        let mut manifest = Manifest::new();
5671        let mut e = entry("Creator/AlbumA/song.flac", AudioFormat::Flac);
5672        e.cover_jpg = Some(ArtifactState {
5673            path: "Creator/AlbumA/cover.jpg".to_owned(),
5674            hash: "h1".to_owned(),
5675        });
5676        manifest.insert("a", e);
5677        let fs = MemFs::new()
5678            .with_file("Creator/AlbumA/song.flac", b"AUDIO".to_vec())
5679            .with_file("Creator/AlbumA/cover.jpg", b"old-jpg".to_vec());
5680        let plan = Plan {
5681            actions: vec![
5682                Action::Rename {
5683                    from: "Creator/AlbumA/song.flac".to_owned(),
5684                    to: "Creator/AlbumB/song.flac".to_owned(),
5685                },
5686                Action::WriteArtifact {
5687                    kind: ArtifactKind::CoverJpg,
5688                    path: "Creator/AlbumB/cover.jpg".to_owned(),
5689                    source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
5690                    hash: "h1".to_owned(),
5691                    owner_id: "a".to_owned(),
5692                    content: None,
5693                },
5694            ],
5695        };
5696        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"new-jpg".to_vec()));
5697
5698        let outcome = run(
5699            &plan,
5700            &mut manifest,
5701            &[],
5702            &http,
5703            &fs,
5704            &StubFfmpeg::flac(),
5705            &RecordingClock::new(),
5706            &ExecOptions::default(),
5707        );
5708
5709        assert_eq!(outcome.failed(), 0);
5710        // Audio moved, the new cover was written, the old cover removed.
5711        assert!(fs.exists("Creator/AlbumB/song.flac"));
5712        assert_eq!(
5713            fs.read_file("Creator/AlbumB/cover.jpg").unwrap(),
5714            b"new-jpg"
5715        );
5716        assert!(!fs.exists("Creator/AlbumA/cover.jpg"));
5717        assert!(!fs.exists("Creator/AlbumA/song.flac"));
5718        // The manifest cover slot now points at the new path.
5719        assert_eq!(
5720            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
5721            "Creator/AlbumB/cover.jpg"
5722        );
5723        // The emptied old album directory is pruned; the new one survives.
5724        assert!(!fs.has_dir("Creator/AlbumA"));
5725        assert!(fs.has_dir("Creator/AlbumB"));
5726    }
5727
5728    #[test]
5729    fn rename_move_relocates_folder_art_and_prunes_old_album() {
5730        // An album rename moves folder.jpg: the old file is removed, the album
5731        // store slot advanced to the new path, and the emptied dir pruned.
5732        let mut manifest = Manifest::new();
5733        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
5734        albums.insert(
5735            "root".to_owned(),
5736            AlbumArt {
5737                folder_jpg: Some(ArtifactState {
5738                    path: "Creator/AlbumA/folder.jpg".to_owned(),
5739                    hash: "jh".to_owned(),
5740                }),
5741                folder_webp: None,
5742                folder_mp4: None,
5743            },
5744        );
5745        let fs = MemFs::new().with_file("Creator/AlbumA/folder.jpg", b"old-folder".to_vec());
5746        let plan = Plan {
5747            actions: vec![Action::WriteArtifact {
5748                kind: ArtifactKind::FolderJpg,
5749                path: "Creator/AlbumB/folder.jpg".to_owned(),
5750                source_url: "https://art.suno.ai/root/large.jpg".to_owned(),
5751                hash: "jh".to_owned(),
5752                owner_id: "root".to_owned(),
5753                content: None,
5754            }],
5755        };
5756        let http = ScriptedHttp::new().route("root/large.jpg", Reply::ok(b"new-folder".to_vec()));
5757
5758        let outcome = run_with_albums(
5759            &plan,
5760            &mut manifest,
5761            &mut albums,
5762            &[],
5763            &http,
5764            &fs,
5765            &StubFfmpeg::flac(),
5766            &RecordingClock::new(),
5767            &ExecOptions::default(),
5768        );
5769
5770        assert_eq!(outcome.failed(), 0);
5771        assert_eq!(
5772            fs.read_file("Creator/AlbumB/folder.jpg").unwrap(),
5773            b"new-folder"
5774        );
5775        assert!(!fs.exists("Creator/AlbumA/folder.jpg"));
5776        assert_eq!(
5777            albums
5778                .get("root")
5779                .unwrap()
5780                .folder_jpg
5781                .as_ref()
5782                .unwrap()
5783                .path,
5784            "Creator/AlbumB/folder.jpg"
5785        );
5786        assert!(!fs.has_dir("Creator/AlbumA"));
5787        assert!(fs.has_dir("Creator/AlbumB"));
5788    }
5789
5790    #[test]
5791    fn prune_empty_dirs_removes_only_empty_dirs() {
5792        // A direct exercise of the prune port's safety guarantees on a mixed
5793        // tree: nested empties go, anything holding a file (hidden ones too)
5794        // stays, and no file is touched.
5795        let fs = MemFs::new()
5796            .with_file("keep/full/song.flac", b"x".to_vec())
5797            .with_file("hidden/.suno-manifest.json", b"{}".to_vec())
5798            .with_dir("empty/leaf")
5799            .with_dir("nested/a/b/c");
5800
5801        fs.prune_empty_dirs("").unwrap();
5802
5803        // Every empty directory, however deeply nested, is pruned bottom-up.
5804        for gone in [
5805            "empty",
5806            "empty/leaf",
5807            "nested",
5808            "nested/a",
5809            "nested/a/b",
5810            "nested/a/b/c",
5811        ] {
5812            assert!(!fs.has_dir(gone), "empty dir {gone} should be pruned");
5813        }
5814        // A directory holding any file — including only a hidden dotfile — stays.
5815        assert!(fs.has_dir("keep"));
5816        assert!(fs.has_dir("keep/full"));
5817        assert!(fs.has_dir("hidden"));
5818        // No file was touched.
5819        assert!(fs.exists("keep/full/song.flac"));
5820        assert!(fs.exists("hidden/.suno-manifest.json"));
5821    }
5822
5823    #[test]
5824    fn prune_empty_dirs_never_removes_the_named_root() {
5825        // Pruning under a named root clears its empty children but keeps the
5826        // root itself, even when the root is now empty.
5827        let fs = MemFs::new().with_dir("empty/leaf");
5828        fs.prune_empty_dirs("empty").unwrap();
5829        assert!(fs.has_dir("empty"), "the named root is never removed");
5830        assert!(!fs.has_dir("empty/leaf"));
5831    }
5832
5833    #[test]
5834    fn old_sidecar_remove_failure_is_per_clip_and_converges_next_run() {
5835        // If removing the old sidecar fails, the write is a per-clip failure
5836        // that never aborts the run and does NOT advance the state slot, so the
5837        // next identical run re-attempts the cleanup and the tree converges.
5838        let mut manifest = Manifest::new();
5839        let mut e = entry("a.flac", AudioFormat::Flac);
5840        e.cover_jpg = Some(ArtifactState {
5841            path: "AlbumA/cover.jpg".to_owned(),
5842            hash: "h1".to_owned(),
5843        });
5844        manifest.insert("a", e);
5845        let fs = MemFs::new()
5846            .with_file("a.flac", b"AUDIO".to_vec())
5847            .with_file("AlbumA/cover.jpg", b"old".to_vec());
5848        let plan = Plan {
5849            actions: vec![Action::WriteArtifact {
5850                kind: ArtifactKind::CoverJpg,
5851                path: "AlbumB/cover.jpg".to_owned(),
5852                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
5853                hash: "h1".to_owned(),
5854                owner_id: "a".to_owned(),
5855                content: None,
5856            }],
5857        };
5858        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"new".to_vec()));
5859
5860        // Run 1: the old-cover remove is forced to fail.
5861        fs.arm_fail_remove("AlbumA/cover.jpg");
5862        let first = run(
5863            &plan,
5864            &mut manifest,
5865            &[],
5866            &http,
5867            &fs,
5868            &StubFfmpeg::flac(),
5869            &RecordingClock::new(),
5870            &ExecOptions::default(),
5871        );
5872        assert_eq!(
5873            first.status,
5874            RunStatus::Completed,
5875            "a remove failure never aborts the run"
5876        );
5877        assert_eq!(first.failed(), 1);
5878        // The new cover is written but the old one lingers and the slot is stale.
5879        assert!(fs.exists("AlbumB/cover.jpg"));
5880        assert!(fs.exists("AlbumA/cover.jpg"));
5881        assert_eq!(
5882            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
5883            "AlbumA/cover.jpg"
5884        );
5885        assert!(fs.has_dir("AlbumA"), "the orphan keeps its directory alive");
5886
5887        // Run 2: the same plan re-runs with the fault cleared and converges.
5888        fs.disarm_fail_remove("AlbumA/cover.jpg");
5889        let second = run(
5890            &plan,
5891            &mut manifest,
5892            &[],
5893            &http,
5894            &fs,
5895            &StubFfmpeg::flac(),
5896            &RecordingClock::new(),
5897            &ExecOptions::default(),
5898        );
5899        assert_eq!(second.failed(), 0);
5900        assert!(fs.exists("AlbumB/cover.jpg"));
5901        assert!(!fs.exists("AlbumA/cover.jpg"), "no orphan persists");
5902        assert_eq!(
5903            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
5904            "AlbumB/cover.jpg"
5905        );
5906        assert!(!fs.has_dir("AlbumA"), "the emptied directory is pruned");
5907    }
5908
5909    #[test]
5910    fn same_path_artifact_rewrite_does_no_remove_and_prunes_nothing() {
5911        // The idempotent case: a content-only cover rewrite (hash drift, path
5912        // unchanged) attempts no remove and prunes no live directory. A remove
5913        // failure is armed on the cover path, so any spurious remove would
5914        // surface as a failure — none does.
5915        let mut manifest = Manifest::new();
5916        let mut e = entry("Album/a.mp3", AudioFormat::Mp3);
5917        e.cover_jpg = Some(ArtifactState {
5918            path: "Album/cover.jpg".to_owned(),
5919            hash: "h1".to_owned(),
5920        });
5921        manifest.insert("a", e);
5922        let fs = MemFs::new()
5923            .with_file("Album/a.mp3", b"AUDIO".to_vec())
5924            .with_file("Album/cover.jpg", b"old".to_vec());
5925        fs.arm_fail_remove("Album/cover.jpg");
5926        let plan = Plan {
5927            actions: vec![Action::WriteArtifact {
5928                kind: ArtifactKind::CoverJpg,
5929                path: "Album/cover.jpg".to_owned(),
5930                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
5931                hash: "h2".to_owned(),
5932                owner_id: "a".to_owned(),
5933                content: None,
5934            }],
5935        };
5936        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"new".to_vec()));
5937
5938        let outcome = run(
5939            &plan,
5940            &mut manifest,
5941            &[],
5942            &http,
5943            &fs,
5944            &StubFfmpeg::flac(),
5945            &RecordingClock::new(),
5946            &ExecOptions::default(),
5947        );
5948
5949        assert_eq!(
5950            outcome.failed(),
5951            0,
5952            "no remove is attempted, so the armed failure never fires"
5953        );
5954        assert_eq!(outcome.artifacts_written, 1);
5955        assert_eq!(fs.read_file("Album/cover.jpg").unwrap(), b"new");
5956        assert_eq!(
5957            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().hash,
5958            "h2"
5959        );
5960        // The live directory is untouched by prune.
5961        assert!(fs.has_dir("Album"));
5962    }
5963
5964    // ── Concurrency (issue #22) ─────────────────────────────────────
5965
5966    mod concurrency {
5967        use super::*;
5968        use crate::ffmpeg::FfmpegError;
5969        use crate::fs::{FileStat, FsError};
5970        use crate::http::{HttpRequest, TransportError};
5971        use std::future::Future;
5972        use std::pin::Pin;
5973        use std::sync::Arc;
5974        use std::sync::atomic::{AtomicUsize, Ordering};
5975        use std::task::{Context, Poll};
5976
5977        /// A future that pends exactly once before resolving, waking itself so a
5978        /// single-threaded executor re-polls. It forces the [`Http`] port to
5979        /// yield, so [`buffer_unordered`](futures_util::stream::StreamExt) parks
5980        /// each in-flight request and the true overlap becomes observable.
5981        #[derive(Default)]
5982        struct YieldOnce {
5983            yielded: bool,
5984        }
5985
5986        impl Future for YieldOnce {
5987            type Output = ();
5988            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
5989                if self.yielded {
5990                    Poll::Ready(())
5991                } else {
5992                    self.yielded = true;
5993                    cx.waker().wake_by_ref();
5994                    Poll::Pending
5995                }
5996            }
5997        }
5998
5999        /// An [`Http`] double that wraps [`ScriptedHttp`] and records the peak
6000        /// number of concurrently in-flight requests. Each `send` bumps a live
6001        /// counter, yields once (so peers can start), then delegates.
6002        struct GatedHttp {
6003            inner: ScriptedHttp,
6004            inflight: Arc<AtomicUsize>,
6005            peak: Arc<AtomicUsize>,
6006        }
6007
6008        impl GatedHttp {
6009            fn new(inner: ScriptedHttp) -> Self {
6010                Self {
6011                    inner,
6012                    inflight: Arc::new(AtomicUsize::new(0)),
6013                    peak: Arc::new(AtomicUsize::new(0)),
6014                }
6015            }
6016
6017            fn peak(&self) -> usize {
6018                self.peak.load(Ordering::SeqCst)
6019            }
6020        }
6021
6022        impl Http for GatedHttp {
6023            async fn send(&self, request: HttpRequest) -> Result<HttpResponse, TransportError> {
6024                let now = self.inflight.fetch_add(1, Ordering::SeqCst) + 1;
6025                self.peak.fetch_max(now, Ordering::SeqCst);
6026                YieldOnce::default().await;
6027                let out = self.inner.send(request).await;
6028                self.inflight.fetch_sub(1, Ordering::SeqCst);
6029                out
6030            }
6031        }
6032
6033        fn download(id: &str, format: AudioFormat) -> (Clip, Desired, Action) {
6034            let c = clip(id);
6035            let d = desired(c.clone(), format);
6036            let action = Action::Download {
6037                clip: c.clone(),
6038                lineage: LineageContext::own_root(&c),
6039                path: d.path.clone(),
6040                format,
6041            };
6042            (c, d, action)
6043        }
6044
6045        fn opts_with(concurrency: u32) -> ExecOptions {
6046            ExecOptions {
6047                concurrency,
6048                ..small_poll()
6049            }
6050        }
6051
6052        #[test]
6053        fn concurrency_never_exceeds_the_configured_bound() {
6054            let count = 6;
6055            let concurrency = 3;
6056            let mut scripted = ScriptedHttp::new().with_auth();
6057            let mut actions = Vec::new();
6058            let mut desireds = Vec::new();
6059            for i in 0..count {
6060                let id = format!("c{i}");
6061                scripted = scripted.route(&format!("{id}.mp3"), Reply::ok(b"mp3-body".to_vec()));
6062                let (_c, d, action) = download(&id, AudioFormat::Mp3);
6063                actions.push(action);
6064                desireds.push(d);
6065            }
6066            let http = GatedHttp::new(scripted);
6067            let fs = MemFs::new();
6068            let plan = Plan { actions };
6069            let mut manifest = Manifest::new();
6070
6071            let outcome = run_gated_fs(
6072                &plan,
6073                &mut manifest,
6074                &desireds,
6075                &http,
6076                &fs,
6077                &opts_with(concurrency),
6078            );
6079
6080            assert_eq!(outcome.downloaded, count);
6081            assert!(
6082                http.peak() <= concurrency as usize,
6083                "peak {} exceeded the bound {concurrency}",
6084                http.peak()
6085            );
6086            assert_eq!(
6087                http.peak(),
6088                concurrency as usize,
6089                "expected the run to saturate the bound"
6090            );
6091        }
6092
6093        /// Run a gated plan against a caller-supplied [`MemFs`], returning the
6094        /// outcome. The client is built here so the limiter can be inspected by
6095        /// the caller-facing variant below.
6096        fn run_gated_fs(
6097            plan: &Plan,
6098            manifest: &mut Manifest,
6099            desired: &[Desired],
6100            http: &GatedHttp,
6101            fs: &MemFs,
6102            opts: &ExecOptions,
6103        ) -> ExecOutcome {
6104            let ffmpeg = StubFfmpeg::flac();
6105            let clock = RecordingClock::new();
6106            let mut albums = BTreeMap::new();
6107            let mut playlists = BTreeMap::new();
6108            let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
6109            pollster::block_on(execute(
6110                plan,
6111                manifest,
6112                &mut albums,
6113                &mut playlists,
6114                desired,
6115                &HashMap::new(),
6116                Ports {
6117                    client: &mut client,
6118                    http,
6119                    fs,
6120                    ffmpeg: &ffmpeg,
6121                    clock: &clock,
6122                },
6123                opts,
6124            ))
6125        }
6126
6127        #[test]
6128        fn a_failing_clip_does_not_abort_the_others() {
6129            let mut scripted = ScriptedHttp::new().with_auth();
6130            scripted = scripted
6131                .route("ok1.mp3", Reply::ok(b"one".to_vec()))
6132                .route("bad.mp3", Reply::status(404))
6133                .route("ok2.mp3", Reply::ok(b"two".to_vec()));
6134            let (_a, d1, a1) = download("ok1", AudioFormat::Mp3);
6135            let (_b, d2, a2) = download("bad", AudioFormat::Mp3);
6136            let (_c, d3, a3) = download("ok2", AudioFormat::Mp3);
6137            let http = GatedHttp::new(scripted);
6138            let fs = MemFs::new();
6139            let plan = Plan {
6140                actions: vec![a1, a2, a3],
6141            };
6142            let mut manifest = Manifest::new();
6143
6144            let outcome = run_gated_fs(
6145                &plan,
6146                &mut manifest,
6147                &[d1, d2, d3],
6148                &http,
6149                &fs,
6150                &opts_with(3),
6151            );
6152
6153            assert_eq!(outcome.downloaded, 2);
6154            assert_eq!(outcome.failed(), 1);
6155            assert_eq!(outcome.status, RunStatus::Completed);
6156            assert_eq!(outcome.failures[0].clip_id, "bad");
6157            assert!(manifest.get("ok1").is_some());
6158            assert!(manifest.get("ok2").is_some());
6159            assert!(manifest.get("bad").is_none());
6160        }
6161
6162        #[test]
6163        fn outcome_is_identical_across_concurrency_levels() {
6164            // A plan mixing successful and failing downloads with serial phase-2
6165            // actions (a skip and a delete), so both phases contribute.
6166            fn build() -> (Plan, Vec<Desired>) {
6167                let mut actions = Vec::new();
6168                let mut desireds = Vec::new();
6169                for id in ["a", "b", "c", "d"] {
6170                    let (_c, d, action) = download(id, AudioFormat::Mp3);
6171                    actions.push(action);
6172                    desireds.push(d);
6173                }
6174                // A failing download in the middle of the audio set.
6175                let (_e, de, ae) = download("fail", AudioFormat::Mp3);
6176                actions.insert(2, ae);
6177                desireds.push(de);
6178                // Phase-2 actions.
6179                actions.push(Action::Skip {
6180                    clip_id: "gone".to_owned(),
6181                });
6182                actions.push(Action::Delete {
6183                    path: "old.mp3".to_owned(),
6184                    clip_id: "old".to_owned(),
6185                });
6186                (Plan { actions }, desireds)
6187            }
6188
6189            fn http() -> ScriptedHttp {
6190                ScriptedHttp::new()
6191                    .with_auth()
6192                    .route("a.mp3", Reply::ok(b"a".to_vec()))
6193                    .route("b.mp3", Reply::ok(b"b".to_vec()))
6194                    .route("c.mp3", Reply::ok(b"c".to_vec()))
6195                    .route("d.mp3", Reply::ok(b"d".to_vec()))
6196                    .route("fail.mp3", Reply::status(404))
6197            }
6198
6199            fn seed_manifest() -> Manifest {
6200                let mut m = Manifest::new();
6201                m.insert("old".to_owned(), entry("old.mp3", AudioFormat::Mp3));
6202                m
6203            }
6204
6205            let (plan, desireds) = build();
6206
6207            let mut m1 = seed_manifest();
6208            let fs1 = MemFs::new().with_file("old.mp3", b"x".to_vec());
6209            let out1 = run_gated_fs(
6210                &plan,
6211                &mut m1,
6212                &desireds,
6213                &GatedHttp::new(http()),
6214                &fs1,
6215                &opts_with(1),
6216            );
6217
6218            let mut m8 = seed_manifest();
6219            let fs8 = MemFs::new().with_file("old.mp3", b"x".to_vec());
6220            let out8 = run_gated_fs(
6221                &plan,
6222                &mut m8,
6223                &desireds,
6224                &GatedHttp::new(http()),
6225                &fs8,
6226                &opts_with(8),
6227            );
6228
6229            assert_eq!(out1, out8, "outcome must not depend on concurrency");
6230            assert_eq!(m1, m8, "final manifest must not depend on concurrency");
6231            assert_eq!(out8.downloaded, 4);
6232            assert_eq!(out8.deleted, 1);
6233            assert_eq!(out8.skipped, 1);
6234            assert_eq!(out8.failed(), 1);
6235        }
6236
6237        #[test]
6238        fn a_systemic_disk_full_aborts_promptly() {
6239            let count = 8;
6240            let concurrency = 2;
6241            let mut scripted = ScriptedHttp::new().with_auth();
6242            let mut actions = Vec::new();
6243            let mut desireds = Vec::new();
6244            for i in 0..count {
6245                let id = format!("d{i}");
6246                scripted = scripted.route(&format!("{id}.mp3"), Reply::ok(b"mp3-body".to_vec()));
6247                let (_c, d, action) = download(&id, AudioFormat::Mp3);
6248                actions.push(action);
6249                desireds.push(d);
6250            }
6251            // The very first clip's write hits ENOSPC, a systemic failure.
6252            let fs = MemFs::new().fail_write_out_of_space("d0.mp3");
6253            let http = GatedHttp::new(scripted);
6254            let plan = Plan { actions };
6255            let mut manifest = Manifest::new();
6256
6257            let outcome = run_gated_fs(
6258                &plan,
6259                &mut manifest,
6260                &desireds,
6261                &http,
6262                &fs,
6263                &opts_with(concurrency),
6264            );
6265
6266            assert_eq!(outcome.status, RunStatus::DiskFull);
6267            assert!(
6268                outcome.downloaded < count,
6269                "a systemic abort must stop remaining work, downloaded {}",
6270                outcome.downloaded
6271            );
6272        }
6273
6274        #[test]
6275        fn limiter_records_a_rate_limit_under_concurrent_calls() {
6276            // Three concurrent FLAC renders; exactly one clip is throttled once
6277            // on its wav_file read. The shared limiter must record that single
6278            // 429 (halving 2.0 -> 1.0) with no lost or duplicated update, proving
6279            // the mutex keeps the AIMD state correct under concurrency.
6280            let scripted = ScriptedHttp::new()
6281                .with_auth()
6282                .route_seq(
6283                    "/gen/x/wav_file/",
6284                    vec![
6285                        Reply::status(429),
6286                        Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/x.wav"}"#),
6287                    ],
6288                )
6289                .route(
6290                    "/gen/y/wav_file/",
6291                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/y.wav"}"#),
6292                )
6293                .route(
6294                    "/gen/z/wav_file/",
6295                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/z.wav"}"#),
6296                )
6297                .route("x.wav", Reply::ok(b"wav-x".to_vec()))
6298                .route("y.wav", Reply::ok(b"wav-y".to_vec()))
6299                .route("z.wav", Reply::ok(b"wav-z".to_vec()));
6300
6301            let mut actions = Vec::new();
6302            let mut desireds = Vec::new();
6303            for id in ["x", "y", "z"] {
6304                let (_c, d, action) = download(id, AudioFormat::Flac);
6305                actions.push(action);
6306                desireds.push(d);
6307            }
6308            let plan = Plan { actions };
6309            let fs = MemFs::new();
6310            let ffmpeg = StubFfmpeg::flac();
6311            let clock = RecordingClock::new();
6312            let mut albums = BTreeMap::new();
6313            let mut playlists = BTreeMap::new();
6314            let mut manifest = Manifest::new();
6315            let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
6316
6317            let outcome = pollster::block_on(execute(
6318                &plan,
6319                &mut manifest,
6320                &mut albums,
6321                &mut playlists,
6322                &desireds,
6323                &HashMap::new(),
6324                Ports {
6325                    client: &mut client,
6326                    http: &scripted,
6327                    fs: &fs,
6328                    ffmpeg: &ffmpeg,
6329                    clock: &clock,
6330                },
6331                &opts_with(3),
6332            ));
6333
6334            assert_eq!(outcome.downloaded, 3);
6335            assert_eq!(outcome.failed(), 0);
6336            assert!(
6337                (client.limiter_rate() - 1.0).abs() < 1e-9,
6338                "one 429 must halve the rate to 1.0, got {}",
6339                client.limiter_rate()
6340            );
6341        }
6342
6343        #[test]
6344        fn a_download_is_committed_in_plan_order_around_a_rename() {
6345            // Plan order: rename "orig" away from shared.mp3 first, then download
6346            // a new clip into shared.mp3. A parallel executor that performed the
6347            // download's destination write off plan order would write shared.mp3
6348            // before the rename ran, letting the rename carry those fresh bytes
6349            // to moved.mp3 and stranding shared.mp3 - corrupting both clips.
6350            // Committing every destination effect serially in plan order keeps
6351            // moved.mp3 = the original and shared.mp3 = the new download.
6352            let c_new = clip("new");
6353            let mut d_new = desired(c_new.clone(), AudioFormat::Mp3);
6354            d_new.path = "shared.mp3".to_owned();
6355            let plan = Plan {
6356                actions: vec![
6357                    Action::Rename {
6358                        from: "shared.mp3".to_owned(),
6359                        to: "moved.mp3".to_owned(),
6360                    },
6361                    Action::Download {
6362                        clip: c_new.clone(),
6363                        lineage: LineageContext::own_root(&c_new),
6364                        path: "shared.mp3".to_owned(),
6365                        format: AudioFormat::Mp3,
6366                    },
6367                ],
6368            };
6369            let scripted = ScriptedHttp::new()
6370                .with_auth()
6371                .route("new.mp3", Reply::ok(b"NEW-BODY".to_vec()));
6372            let http = GatedHttp::new(scripted);
6373            let fs = MemFs::new().with_file("shared.mp3", b"ORIGINAL".to_vec());
6374            let mut manifest = Manifest::new();
6375            manifest.insert("orig", entry("shared.mp3", AudioFormat::Mp3));
6376
6377            let outcome = run_gated_fs(&plan, &mut manifest, &[d_new], &http, &fs, &opts_with(4));
6378
6379            assert_eq!(outcome.renamed, 1);
6380            assert_eq!(outcome.downloaded, 1);
6381            assert_eq!(
6382                fs.read_file("moved.mp3").as_deref(),
6383                Some(&b"ORIGINAL"[..]),
6384                "the rename must carry the original bytes, untouched by the download"
6385            );
6386            let landed = fs.read_file("shared.mp3").expect("new download must land");
6387            assert_ne!(
6388                landed, b"ORIGINAL",
6389                "the new download must replace the moved original, not corrupt it"
6390            );
6391            assert_eq!(manifest.get("orig").unwrap().path, "moved.mp3");
6392            assert_eq!(manifest.get("new").unwrap().path, "shared.mp3");
6393        }
6394
6395        #[test]
6396        fn an_aborted_reformat_leaves_the_old_file_and_manifest_consistent() {
6397            // A systemic disk-full abort strikes the download committed before the
6398            // reformat. Because the reformat's slow render is side-effect-free and
6399            // its destination write + old-file removal only happen in the serial
6400            // commit (which the abort skips), the old file survives and the
6401            // manifest still points at it: no removed-but-referenced file.
6402            let boom = clip("boom");
6403            let mut d_boom = desired(boom.clone(), AudioFormat::Mp3);
6404            d_boom.path = "boom.mp3".to_owned();
6405            let reformer = clip("r");
6406            let d_reformer = desired(reformer.clone(), AudioFormat::Mp3);
6407            let plan = Plan {
6408                actions: vec![
6409                    Action::Download {
6410                        clip: boom.clone(),
6411                        lineage: LineageContext::own_root(&boom),
6412                        path: "boom.mp3".to_owned(),
6413                        format: AudioFormat::Mp3,
6414                    },
6415                    Action::Reformat {
6416                        clip: reformer.clone(),
6417                        path: "r_new.mp3".to_owned(),
6418                        from_path: "r_old.flac".to_owned(),
6419                        from: AudioFormat::Flac,
6420                        to: AudioFormat::Mp3,
6421                    },
6422                ],
6423            };
6424            let scripted = ScriptedHttp::new()
6425                .with_auth()
6426                .route("boom.mp3", Reply::ok(b"boom-body".to_vec()))
6427                .route("r.mp3", Reply::ok(b"reformatted".to_vec()));
6428            let http = GatedHttp::new(scripted);
6429            // The download's write hits ENOSPC, a systemic abort.
6430            let fs = MemFs::new()
6431                .with_file("r_old.flac", b"OLD-FLAC".to_vec())
6432                .fail_write_out_of_space("boom.mp3");
6433            let mut manifest = Manifest::new();
6434            manifest.insert("r", entry("r_old.flac", AudioFormat::Flac));
6435
6436            let outcome = run_gated_fs(
6437                &plan,
6438                &mut manifest,
6439                &[d_boom, d_reformer],
6440                &http,
6441                &fs,
6442                &opts_with(4),
6443            );
6444
6445            assert_eq!(outcome.status, RunStatus::DiskFull);
6446            assert!(
6447                fs.exists("r_old.flac"),
6448                "the old file must survive the abort"
6449            );
6450            assert!(
6451                !fs.exists("r_new.mp3"),
6452                "no reformatted file may be written"
6453            );
6454            let still = manifest.get("r").expect("the manifest must still track r");
6455            assert_eq!(
6456                still.path, "r_old.flac",
6457                "the manifest must still point at the surviving old file"
6458            );
6459            assert_eq!(still.format, AudioFormat::Flac);
6460        }
6461
6462        #[test]
6463        fn a_systemic_abort_leaves_no_untracked_destination_files() {
6464            // Two clips commit, the third's write hits ENOSPC (a systemic abort),
6465            // and the rest never commit. Every file remaining on disk must be one
6466            // the manifest tracks: producers write nothing, so an abort cannot
6467            // strand an untracked file from an in-flight or buffered render.
6468            let mut scripted = ScriptedHttp::new().with_auth();
6469            let mut actions = Vec::new();
6470            let mut desireds = Vec::new();
6471            for id in ["a0", "a1", "boom", "a3", "a4"] {
6472                scripted = scripted.route(&format!("{id}.mp3"), Reply::ok(b"body".to_vec()));
6473                let (_c, d, action) = download(id, AudioFormat::Mp3);
6474                actions.push(action);
6475                desireds.push(d);
6476            }
6477            let http = GatedHttp::new(scripted);
6478            let fs = MemFs::new().fail_write_out_of_space("boom.mp3");
6479            let plan = Plan { actions };
6480            let mut manifest = Manifest::new();
6481
6482            let outcome = run_gated_fs(&plan, &mut manifest, &desireds, &http, &fs, &opts_with(2));
6483
6484            assert_eq!(outcome.status, RunStatus::DiskFull);
6485            let tracked: std::collections::BTreeSet<String> = manifest
6486                .entries
6487                .values()
6488                .map(|entry| entry.path.clone())
6489                .collect();
6490            for path in fs.paths() {
6491                assert!(
6492                    tracked.contains(&path),
6493                    "found an untracked destination file: {path}"
6494                );
6495            }
6496            assert!(
6497                !fs.exists("a3.mp3"),
6498                "uncommitted renders must not be on disk"
6499            );
6500            assert!(
6501                !fs.exists("a4.mp3"),
6502                "uncommitted renders must not be on disk"
6503            );
6504        }
6505
6506        /// An [`Ffmpeg`] double that counts how many rendered FLAC payloads are
6507        /// live: it bumps a shared counter (tracking the peak) when a transcode
6508        /// yields bytes, and [`CountingFs`] drops it back on the committing write.
6509        /// The [transcode, write] window is a superset of the true in-memory hold,
6510        /// so the observed peak upper-bounds the real one.
6511        struct CountingFfmpeg {
6512            inner: StubFfmpeg,
6513            held: Arc<AtomicUsize>,
6514            peak: Arc<AtomicUsize>,
6515        }
6516
6517        impl Ffmpeg for CountingFfmpeg {
6518            fn wav_to_flac(
6519                &self,
6520                wav: &[u8],
6521            ) -> impl Future<Output = Result<Vec<u8>, FfmpegError>> + Send {
6522                let fut = self.inner.wav_to_flac(wav);
6523                let held = self.held.clone();
6524                let peak = self.peak.clone();
6525                async move {
6526                    let out = fut.await;
6527                    if out.is_ok() {
6528                        let now = held.fetch_add(1, Ordering::SeqCst) + 1;
6529                        peak.fetch_max(now, Ordering::SeqCst);
6530                    }
6531                    out
6532                }
6533            }
6534
6535            fn mp4_to_webp(
6536                &self,
6537                mp4: &[u8],
6538                settings: WebpEncodeSettings,
6539            ) -> impl Future<Output = Result<Vec<u8>, FfmpegError>> + Send {
6540                self.inner.mp4_to_webp(mp4, settings)
6541            }
6542        }
6543
6544        /// A [`Filesystem`] double wrapping [`MemFs`] that decrements the live
6545        /// payload counter on each committing write, closing the window opened by
6546        /// [`CountingFfmpeg`].
6547        struct CountingFs {
6548            inner: MemFs,
6549            held: Arc<AtomicUsize>,
6550        }
6551
6552        impl Filesystem for CountingFs {
6553            fn write_atomic(&self, path: &str, bytes: &[u8]) -> Result<(), FsError> {
6554                let out = self.inner.write_atomic(path, bytes);
6555                self.held.fetch_sub(1, Ordering::SeqCst);
6556                out
6557            }
6558
6559            fn rename(&self, from: &str, to: &str) -> Result<(), FsError> {
6560                self.inner.rename(from, to)
6561            }
6562
6563            fn remove(&self, path: &str) -> Result<(), FsError> {
6564                self.inner.remove(path)
6565            }
6566
6567            fn prune_empty_dirs(&self, root: &str) -> Result<(), FsError> {
6568                self.inner.prune_empty_dirs(root)
6569            }
6570
6571            fn read(&self, path: &str) -> Result<Vec<u8>, FsError> {
6572                self.inner.read(path)
6573            }
6574
6575            fn metadata(&self, path: &str) -> Option<FileStat> {
6576                self.inner.metadata(path)
6577            }
6578        }
6579
6580        #[test]
6581        fn rendered_payloads_in_memory_stay_bounded_by_concurrency() {
6582            // Far more FLAC clips than the concurrency bound. The ordered buffered
6583            // render keeps at most about `concurrency` transcoded payloads live at
6584            // once (never the whole library), so peak held <= concurrency + 1.
6585            let count = 12;
6586            let concurrency = 3;
6587            let mut scripted = ScriptedHttp::new().with_auth();
6588            let mut actions = Vec::new();
6589            let mut desireds = Vec::new();
6590            for i in 0..count {
6591                let id = format!("f{i}");
6592                scripted = scripted
6593                    .route(
6594                        &format!("/gen/{id}/wav_file/"),
6595                        Reply::json(&format!(
6596                            r#"{{"wav_file_url": "https://cdn1.suno.ai/{id}.wav"}}"#
6597                        )),
6598                    )
6599                    .route(&format!("{id}.wav"), Reply::ok(b"wav-body".to_vec()));
6600                let (_c, d, action) = download(&id, AudioFormat::Flac);
6601                actions.push(action);
6602                desireds.push(d);
6603            }
6604            let http = GatedHttp::new(scripted);
6605            let held = Arc::new(AtomicUsize::new(0));
6606            let peak = Arc::new(AtomicUsize::new(0));
6607            let ffmpeg = CountingFfmpeg {
6608                inner: StubFfmpeg::flac(),
6609                held: held.clone(),
6610                peak: peak.clone(),
6611            };
6612            let fs = CountingFs {
6613                inner: MemFs::new(),
6614                held: held.clone(),
6615            };
6616            let clock = RecordingClock::new();
6617            let mut albums = BTreeMap::new();
6618            let mut playlists = BTreeMap::new();
6619            let mut manifest = Manifest::new();
6620            let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
6621            let plan = Plan { actions };
6622
6623            let outcome = pollster::block_on(execute(
6624                &plan,
6625                &mut manifest,
6626                &mut albums,
6627                &mut playlists,
6628                &desireds,
6629                &HashMap::new(),
6630                Ports {
6631                    client: &mut client,
6632                    http: &http,
6633                    fs: &fs,
6634                    ffmpeg: &ffmpeg,
6635                    clock: &clock,
6636                },
6637                &opts_with(concurrency),
6638            ));
6639
6640            assert_eq!(outcome.downloaded, count as usize);
6641            assert_eq!(
6642                held.load(Ordering::SeqCst),
6643                0,
6644                "every payload must be committed"
6645            );
6646            assert!(
6647                peak.load(Ordering::SeqCst) <= concurrency as usize + 1,
6648                "peak live payloads {} exceeded the bound {}",
6649                peak.load(Ordering::SeqCst),
6650                concurrency + 1
6651            );
6652            assert!(
6653                peak.load(Ordering::SeqCst) >= 2,
6654                "the render should genuinely overlap, peak was {}",
6655                peak.load(Ordering::SeqCst)
6656            );
6657        }
6658    }
6659}