Skip to main content

suno_core/
executor.rs

//! The download executor: it applies a reconcile [`Plan`] to disk through ports.
//!
//! Reconcile decides *what* to do; the executor does it. It is async and pure
//! orchestration: every side effect goes through a port ([`Http`] for the
//! network, [`Filesystem`] for disk, [`Ffmpeg`] for transcoding, [`Clock`] for
//! waiting), so the whole pipeline is exercised in tests with in-memory doubles
//! and no real IO, network, or sleeping.
//!
//! Safety is the point of this module. A wrong write or delete damages the
//! user's library, so the executor:
//!
//! - writes only atomically (SYNC-13): a failed write leaves the prior file
//!   intact, because the [`Filesystem`] adapter stages a temp file and renames;
//! - verifies size (SYNC-14): a download whose body disagrees with the
//!   provider's `Content-Length` is treated as truncated and retried, and a
//!   written file whose on-disk size disagrees with the bytes written is a
//!   failure, never a recorded success;
//! - classifies errors (SYNC-17): an auth failure or a full disk stops the
//!   account run (with an auth or disk-full status) and is never retried;
//!   transient failures (timeouts, 5xx, transport, 429) are retried a bounded
//!   number of times, then recorded and skipped; permanent failures are
//!   recorded and skipped; and a single clip's failure never aborts the run;
//! - backs off on rate limits (SYNC-16) through the injected [`Clock`],
//!   honouring a `Retry-After` hint.
//!
//! The executor only ever sets the manifest's [`preserve`](ManifestEntry::preserve)
//! marker on an entry it writes, and only deletes a path whose removal the
//! [`Filesystem`] confirms. Higher-level safety (empty-listing abort, the
//! destructive-sync confirmation, exit codes) is the caller's job.
31
32use std::collections::BTreeMap;
33use std::collections::BTreeSet;
34use std::collections::HashMap;
35use std::collections::HashSet;
36use std::sync::Mutex;
37use std::time::Duration;
38
39use futures_util::lock::Mutex as AsyncMutex;
40use futures_util::stream::{self, StreamExt};
41
42use crate::backoff::{backoff_delay, retry_after};
43use crate::client::SunoClient;
44use crate::clock::Clock;
45use crate::config::{AudioFormat, StemFormat};
46use crate::error::Error;
47use crate::ffmpeg::{Ffmpeg, WebpEncodeSettings};
48use crate::fs::Filesystem;
49use crate::graph::{AlbumArt, PlaylistState};
50use crate::http::{Http, HttpRequest};
51use crate::lineage::LineageContext;
52use crate::lyrics::AlignedLyrics;
53use crate::manifest::{ArtifactState, Manifest, ManifestEntry};
54use crate::model::Clip;
55use crate::reconcile::{
56    Action, ArtifactKind, Desired, Plan, SourceMode, set_manifest_artifact, set_manifest_stem,
57};
58use crate::tag::{TrackMetadata, tag_flac, tag_mp3, tag_wav};
59
60/// The shared Suno client behind an async mutex, so concurrent audio work can
61/// serialise its order-sensitive API calls (JWT refresh, adaptive limiter)
62/// without a runtime-specific lock. Held only for the brief WAV-render calls;
63/// the heavy CDN/transcode/tag work runs unlocked.
64type ClientLock<'a, C> = AsyncMutex<&'a mut SunoClient<C>>;
65
66/// Tunables for one [`execute`] run.
67#[derive(Debug, Clone)]
68pub struct ExecOptions {
69    /// How many times a transient failure is retried before record-and-skip.
70    pub max_retries: u32,
71    /// How many times to poll for a server-side WAV render before giving up.
72    pub wav_poll_attempts: u32,
73    /// How long to wait between WAV render polls.
74    pub wav_poll_interval: Duration,
75    /// How many clips' audio to fetch, transcode, and tag concurrently. Clamped
76    /// to at least one, so a zero collapses to sequential rather than stalling.
77    pub concurrency: u32,
78    /// Settings used for animated WebP cover transcodes.
79    pub cover_webp: WebpEncodeSettings,
80}
81
82impl Default for ExecOptions {
83    fn default() -> Self {
84        Self {
85            max_retries: 3,
86            wav_poll_attempts: 24,
87            wav_poll_interval: Duration::from_secs(5),
88            concurrency: 4,
89            cover_webp: WebpEncodeSettings::default(),
90        }
91    }
92}
93
/// The way an [`execute`] run finished.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum RunStatus {
    /// The run reached the end of the plan. Individual actions may still have
    /// failed and been skipped along the way.
    #[default]
    Completed,
    /// The run stopped early on an auth failure; the remaining actions were
    /// never attempted.
    AuthAborted,
    /// The run stopped early because the disk filled up, instead of failing
    /// every remaining clip the same way. The remaining actions were never
    /// attempted.
    DiskFull,
}
106
/// A single action that could not be applied, as reported in the run summary
/// and the failure log.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Failure {
    /// Id of the clip the failed action was about, or a path when the action
    /// has no clip id.
    pub clip_id: String,
    /// Short explanation of the failure; must not contain secrets.
    pub reason: String,
}
115
116/// The result of applying a [`Plan`]: per-action counts and the failure list.
117#[derive(Debug, Clone, Default, PartialEq, Eq)]
118pub struct ExecOutcome {
119    pub downloaded: usize,
120    pub reformatted: usize,
121    pub retagged: usize,
122    pub renamed: usize,
123    pub deleted: usize,
124    pub skipped: usize,
125    pub artifacts_written: usize,
126    pub artifacts_deleted: usize,
127    /// Actions that failed and were skipped (auth, transient-exhausted, or
128    /// permanent). The run continued past each one unless it was an auth or
129    /// disk-full abort.
130    pub failures: Vec<Failure>,
131    /// How the run ended.
132    pub status: RunStatus,
133}
134
135impl ExecOutcome {
136    /// Number of failed actions.
137    pub fn failed(&self) -> usize {
138        self.failures.len()
139    }
140
141    fn record(&mut self, effect: Effect) {
142        match effect {
143            Effect::Downloaded => self.downloaded += 1,
144            Effect::Reformatted => self.reformatted += 1,
145            Effect::Retagged => self.retagged += 1,
146            Effect::Renamed => self.renamed += 1,
147            Effect::Deleted => self.deleted += 1,
148            Effect::Skipped => self.skipped += 1,
149            Effect::ArtifactWritten => self.artifacts_written += 1,
150            Effect::ArtifactDeleted => self.artifacts_deleted += 1,
151        }
152    }
153}
154
155/// The IO ports the executor drives, grouped so one value threads them through.
156///
157/// `client` is the only `&mut` port: it performs the authenticated WAV render
158/// flow and so mutates its cached session. The rest are shared references.
159pub struct Ports<'a, H, F, G, C> {
160    /// Performs the authenticated WAV render and poll flow.
161    pub client: &'a mut SunoClient<C>,
162    /// The public network port (CDN audio, rendered WAV, cover art).
163    pub http: &'a H,
164    /// The disk port.
165    pub fs: &'a F,
166    /// The transcode port (WAV to FLAC).
167    pub ffmpeg: &'a G,
168    /// The backoff and poll delay port.
169    pub clock: &'a C,
170}
171
172/// Apply `plan` to disk, updating `manifest` and `albums` in place, and return
173/// the outcome.
174///
175/// `desired` carries the per-clip metadata and art hashes plus the source modes
176/// that decide the [`preserve`](ManifestEntry::preserve) marker; it is indexed
177/// by clip id (and by target path, for renames) so each written entry records
178/// the right hashes and protection. `albums` is the album-art store, keyed by
179/// stable root id: folder-art writes and deletes record their state there rather
180/// than on the per-clip `manifest`. `ports` bundles the authenticated client and
181/// the network, disk, transcode, and backoff ports. A single clip's failure
182/// never aborts the run, except an auth failure or a full disk, which stop it
183/// with [`RunStatus::AuthAborted`] or [`RunStatus::DiskFull`].
184///
185/// The audio-producing actions ([`Download`](Action::Download) and
186/// [`Reformat`](Action::Reformat)) run concurrently, bounded by
187/// [`ExecOptions::concurrency`]: their slow parts (WAV render, CDN download,
188/// transcode, tag) overlap while the order-sensitive Suno API calls are
189/// serialised behind an async mutex over the shared [`SunoClient`], keeping the
190/// adaptive limiter and JWT refresh correct. The remaining actions (retag,
191/// rename, delete, and artifact writes/deletes) then run serially in plan order.
192///
193/// The outcome is deterministic regardless of completion order: concurrent audio
194/// results are committed to the manifest in plan-index order, so the same plan
195/// always yields the same manifest and counts whatever the concurrency level. A
196/// per-clip failure is recorded and the run continues; only an auth failure or a
197/// full disk aborts, and it does so promptly by stopping further audio work.
198///
199/// `synced` carries this run's fetched aligned (synced) lyrics keyed by clip id;
200/// it is the caller's IO result, not part of the pure plan. Audio tagging embeds
201/// a clip's entry as an MP3 `SYLT` frame and as the plain `USLT`/`LYRICS` text
202/// (FLAC), so a clip absent from the map (an instrumental, a WAV target, or a
203/// run with the feature off) is tagged exactly as before. The synced `.lrc`
204/// sidecar itself is a generated artifact whose body the caller has already
205/// resolved into the plan, so it is written like any other text sidecar.
206#[allow(clippy::too_many_arguments)]
207pub async fn execute<H, F, G, C>(
208    plan: &Plan,
209    manifest: &mut Manifest,
210    albums: &mut BTreeMap<String, AlbumArt>,
211    playlists: &mut BTreeMap<String, PlaylistState>,
212    desired: &[Desired],
213    synced: &HashMap<String, AlignedLyrics>,
214    ports: Ports<'_, H, F, G, C>,
215    opts: &ExecOptions,
216) -> ExecOutcome
217where
218    H: Http,
219    F: Filesystem,
220    G: Ffmpeg,
221    C: Clock,
222{
223    let Ports {
224        client,
225        http,
226        fs,
227        ffmpeg,
228        clock,
229    } = ports;
230    let by_id: HashMap<&str, &Desired> = desired.iter().map(|d| (d.clip.id.as_str(), d)).collect();
231    let by_path: HashMap<&str, &Desired> = desired.iter().map(|d| (d.path.as_str(), d)).collect();
232    // How many tracked artifact slots reference each path. The inline old-path
233    // cleanup removes a path only once nothing else holds it: each slot that
234    // moves away decrements its reference, and the removal fires only when the
235    // count reaches zero and no action writes the path this run. This keeps a
236    // live file a co-referencing slot still owns (a prior failed swap can leave
237    // two clips sharing a path) while letting the last slot to leave reclaim it,
238    // so nothing is orphaned either (#76).
239    let mut tracked_paths: HashMap<String, u32> = HashMap::new();
240    for (_, entry) in manifest.iter() {
241        for path in entry.artifact_paths() {
242            *tracked_paths.entry(path.to_owned()).or_default() += 1;
243        }
244    }
245    for art in albums.values() {
246        for state in [
247            art.folder_jpg.as_ref(),
248            art.folder_webp.as_ref(),
249            art.folder_mp4.as_ref(),
250        ]
251        .into_iter()
252        .flatten()
253        {
254            *tracked_paths.entry(state.path.clone()).or_default() += 1;
255        }
256    }
257    for playlist in playlists.values() {
258        *tracked_paths.entry(playlist.path.clone()).or_default() += 1;
259    }
260    // Static cover art is otherwise fetched twice per clip (#89): once to embed
261    // in the audio tag and once for the per-song `.jpg` sidecar, both from the
262    // same CDN URL. The audio producer caches each cover it embeds here, keyed by
263    // URL, and the sidecar write drains it rather than re-fetching. Only URLs a
264    // `CoverJpg` sidecar will fetch this run are cached, and the sidecar removes
265    // its entry on use, so the map holds at most the covers for the clips in
266    // flight (bounded by `concurrency`), never the whole library.
267    let cover_wanted: HashSet<&str> = plan
268        .actions
269        .iter()
270        .filter_map(|action| match action {
271            Action::WriteArtifact {
272                kind: ArtifactKind::CoverJpg,
273                source_url,
274                ..
275            } if !source_url.is_empty() => Some(source_url.as_str()),
276            _ => None,
277        })
278        .collect();
279    let cover_cache: Mutex<HashMap<String, Vec<u8>>> = Mutex::new(HashMap::new());
280    // The `both` video-cover retention keeps `cover.webp` (transcoded) and
281    // `cover.mp4` (raw) for an album from the SAME `video_cover_url`. Cache that
282    // source on its first fetch so the second folder artifact drains it rather
283    // than fetching the same MP4 twice (#90 reuses the #89 fetch-once path).
284    let mut folder_cover_uses: HashMap<&str, u32> = HashMap::new();
285    for action in &plan.actions {
286        if let Action::WriteArtifact {
287            kind: ArtifactKind::FolderWebp | ArtifactKind::FolderMp4,
288            source_url,
289            ..
290        } = action
291            && !source_url.is_empty()
292        {
293            *folder_cover_uses.entry(source_url.as_str()).or_default() += 1;
294        }
295    }
296    let shared_cover_urls: HashSet<&str> = folder_cover_uses
297        .into_iter()
298        .filter(|(_, uses)| *uses > 1)
299        .map(|(url, _)| url)
300        .collect();
301    let ctx = Ctx {
302        http,
303        fs,
304        ffmpeg,
305        clock,
306        opts,
307        by_id: &by_id,
308        by_path: &by_path,
309        synced,
310        cover_cache: &cover_cache,
311        cover_wanted: &cover_wanted,
312        shared_cover_urls: &shared_cover_urls,
313    };
314
315    let mut outcome = ExecOutcome::default();
316    // Destinations whose write has actually committed this run, gating old-path
317    // cleanup so a vacated sidecar/stem is kept only when a *successful* write
318    // also targets it (#142). Serial commit order makes this a clean prefix.
319    let mut committed: BTreeSet<String> = BTreeSet::new();
320
321    // The audio-producing actions ([`Download`](Action::Download) /
322    // [`Reformat`](Action::Reformat)) render concurrently, but their work is
323    // deliberately split so that NO destination write, file removal, or manifest
324    // update happens off the plan's order:
325    //
326    // - the parallel producers ([`prepare_audio`](Ctx::prepare_audio)) do only
327    //   the slow, side-effect-free work (fetch the CDN/WAV bytes, transcode, and
328    //   tag), returning the tagged bytes; and
329    // - a single serial committer below writes those bytes to the destination,
330    //   removes any superseded file, and records the manifest entry, in strict
331    //   plan-index order, interleaved with the non-audio actions.
332    //
333    // The shared client is the only `&mut` port and its API calls must stay
334    // ordered, so it rides behind an async mutex; each producer locks it only for
335    // the brief WAV-render calls and runs the heavy work unlocked. Renders are
336    // yielded in plan order and bounded to `concurrency` in flight (and buffered),
337    // so at most about `concurrency` tagged payloads are ever held in memory -
338    // never the whole library.
339    let client_lock = AsyncMutex::new(client);
340    let concurrency = opts.concurrency.max(1) as usize;
341    let ctx_ref = &ctx;
342    let client_lock_ref = &client_lock;
343    let mut renders = stream::iter(
344        plan.actions
345            .iter()
346            .filter(|action| is_audio_action(action))
347            .map(|action| async move { ctx_ref.prepare_audio(client_lock_ref, action).await }),
348    )
349    .buffered(concurrency);
350
351    for action in &plan.actions {
352        // Audio actions pull their pre-rendered bytes (yielded in plan order) and
353        // commit them here; every other action applies its own effect. Both the
354        // audio commit and the non-audio apply run serially, so all destination
355        // and manifest effects keep the plan's order exactly as the sequential
356        // executor did.
357        let result = if is_audio_action(action) {
358            match renders.next().await {
359                Some(Ok(rendered)) => ctx.commit_audio(manifest, rendered),
360                Some(Err(fail)) => Err(fail),
361                None => unreachable!("buffered yields one result per audio action"),
362            }
363        } else {
364            ctx.apply(
365                client_lock_ref,
366                action,
367                manifest,
368                albums,
369                playlists,
370                &mut tracked_paths,
371                &committed,
372            )
373            .await
374        };
375        match result {
376            Ok(effect) => {
377                outcome.record(effect);
378                // Record this action's destination now that its write succeeded.
379                // A later action vacating a path removes it only when no
380                // *committed* write also targets it; commit is strictly serial in
381                // plan order, so a planned-but-failed or not-yet-run write never
382                // protects a stale file from cleanup (#142).
383                if let Some(dest) = written_path(action) {
384                    committed.insert(dest.to_owned());
385                }
386            }
387            Err(fail) => {
388                let abort = abort_status(fail.class);
389                outcome.failures.push(Failure {
390                    clip_id: fail.clip_id,
391                    reason: fail.reason,
392                });
393                if let Some(status) = abort {
394                    // A systemic abort stops the run. Dropping the render stream
395                    // cancels any in-flight or completed-but-uncommitted producer;
396                    // because producers touch nothing on disk, the destination and
397                    // manifest are left exactly as the committed prefix wrote them,
398                    // with no untracked files and no removed-but-referenced file.
399                    outcome.status = status;
400                    break;
401                }
402            }
403        }
404    }
405    drop(renders);
406
407    // Renames and deletes can leave an album directory empty; prune those ghost
408    // directories bottom-up. This runs on both the completed and the aborted
409    // paths, and is best-effort: a prune failure is only a missed tidy that the
410    // next run repeats, never a reason to fail the run.
411    let _ = fs.prune_empty_dirs("");
412    outcome
413}
414
415/// Whether an action produces audio: it fetches, transcodes, and tags a clip's
416/// file. Its slow render runs in the concurrent phase; its destination write and
417/// manifest update are committed serially in plan order. Everything else touches
418/// the manifest, album, or playlist stores directly and runs serially.
419fn is_audio_action(action: &Action) -> bool {
420    matches!(action, Action::Download { .. } | Action::Reformat { .. })
421}
422
423/// The destination path an action writes on success, or `None` for actions that
424/// write no file (skips, deletes). The serial committer records this once the
425/// action succeeds, so a later action vacating that same path keeps it rather
426/// than removing a freshly written file (#142, #76).
427fn written_path(action: &Action) -> Option<&str> {
428    match action {
429        Action::Download { path, .. }
430        | Action::Reformat { path, .. }
431        | Action::WriteArtifact { path, .. }
432        | Action::WriteStem { path, .. } => Some(path),
433        Action::Rename { to, .. } => Some(to),
434        _ => None,
435    }
436}
437
438/// A rendered-but-uncommitted audio result: the tagged bytes plus what the serial
439/// committer needs to place them. Produced concurrently and side-effect-free (no
440/// destination write, no removal, no manifest touch); [`commit_audio`] applies
441/// all of those in plan order.
442struct RenderedAudio {
443    clip_id: String,
444    path: String,
445    format: AudioFormat,
446    /// The superseded file to remove after the new one lands (a [`Reformat`]),
447    /// or `None` for a plain [`Download`].
448    from_path: Option<String>,
449    effect: Effect,
450    bytes: Vec<u8>,
451}
452
/// The kind of work an applied action performed; each variant feeds one of the
/// outcome counters.
enum Effect {
    /// Counted as a download.
    Downloaded,
    /// Counted as a reformat.
    Reformatted,
    /// Counted as a retag.
    Retagged,
    /// Counted as a rename.
    Renamed,
    /// Counted as a delete.
    Deleted,
    /// Counted as a skip.
    Skipped,
    /// Counted as an artifact write.
    ArtifactWritten,
    /// Counted as an artifact delete.
    ArtifactDeleted,
}
464
/// The handling a failure calls for (SYNC-17).
#[derive(Clone, Copy, Debug)]
enum Class {
    /// Ends the account run and is never retried.
    Auth,
    /// Ends the account run. A full disk is systemic, like auth, so aborting
    /// beats skipping every remaining clip (each of which would first burn a
    /// server-side WAV-render budget before failing the same way).
    Disk,
    /// Retried a bounded number of times, then recorded and skipped.
    Transient,
    /// Recorded and skipped straight away.
    Permanent,
}
479
480/// A classified action failure attributed to a clip.
481struct Fail {
482    class: Class,
483    clip_id: String,
484    reason: String,
485}
486
487/// The run-ending status for a failure class, or `None` when the failure is
488/// per-clip and the run continues.
489fn abort_status(class: Class) -> Option<RunStatus> {
490    match class {
491        Class::Auth => Some(RunStatus::AuthAborted),
492        Class::Disk => Some(RunStatus::DiskFull),
493        Class::Transient | Class::Permanent => None,
494    }
495}
496
497fn auth_fail(clip_id: impl Into<String>, reason: impl Into<String>) -> Fail {
498    Fail {
499        class: Class::Auth,
500        clip_id: clip_id.into(),
501        reason: reason.into(),
502    }
503}
504
505fn transient_fail(clip_id: impl Into<String>, reason: impl Into<String>) -> Fail {
506    Fail {
507        class: Class::Transient,
508        clip_id: clip_id.into(),
509        reason: reason.into(),
510    }
511}
512
513fn permanent_fail(clip_id: impl Into<String>, reason: impl Into<String>) -> Fail {
514    Fail {
515        class: Class::Permanent,
516        clip_id: clip_id.into(),
517        reason: reason.into(),
518    }
519}
520
521fn disk_fail(clip_id: impl Into<String>, reason: impl Into<String>) -> Fail {
522    Fail {
523        class: Class::Disk,
524        clip_id: clip_id.into(),
525        reason: reason.into(),
526    }
527}
528
529/// Whether an artifact kind is album-scoped folder art (owned by a root id and
530/// recorded on the album store) rather than a per-clip sidecar (recorded on the
531/// manifest).
532fn is_album_kind(kind: ArtifactKind) -> bool {
533    matches!(
534        kind,
535        ArtifactKind::FolderJpg | ArtifactKind::FolderWebp | ArtifactKind::FolderMp4
536    )
537}
538
539/// True for the library-scoped playlist artifact, routed to the playlist store.
540fn is_playlist_kind(kind: ArtifactKind) -> bool {
541    matches!(kind, ArtifactKind::Playlist)
542}
543
544/// True for a per-song sidecar (`cover.jpg`/`cover.webp`), whose write requires
545/// the owning clip's manifest entry. Album and playlist kinds are keyed by a
546/// root/playlist id that is deliberately absent from the manifest.
547fn is_per_clip_kind(kind: ArtifactKind) -> bool {
548    matches!(
549        kind,
550        ArtifactKind::CoverJpg
551            | ArtifactKind::CoverWebp
552            | ArtifactKind::DetailsTxt
553            | ArtifactKind::LyricsTxt
554            | ArtifactKind::Lrc
555            | ArtifactKind::VideoMp4
556    )
557}
558
/// Derive a playlist's display name from the file stem of its `.m3u8` path.
///
/// The path is `<sanitised name>.m3u8` at the library root, so the stem is the
/// sanitised name. Reconcile only ever reads a playlist's `path` and `hash`, so
/// the recovered name is a convenience for humans: the sanitiser is not
/// reversible, but that lossiness never affects a decision. A path with no file
/// stem yields an empty string.
fn playlist_name_from_path(path: &str) -> String {
    match std::path::Path::new(path).file_stem() {
        Some(stem) => stem.to_string_lossy().into_owned(),
        None => String::new(),
    }
}
571
572/// A classified fetch failure, not yet attributed to a clip.
573struct FetchError {
574    class: Class,
575    reason: String,
576    retry_after: Option<Duration>,
577}
578
579impl FetchError {
580    fn transient(reason: impl Into<String>, retry_after: Option<Duration>) -> Self {
581        Self {
582            class: Class::Transient,
583            reason: reason.into(),
584            retry_after,
585        }
586    }
587
588    fn permanent(reason: impl Into<String>) -> Self {
589        Self {
590            class: Class::Permanent,
591            reason: reason.into(),
592            retry_after: None,
593        }
594    }
595
596    fn attribute(self, clip_id: &str) -> Fail {
597        Fail {
598            class: self.class,
599            clip_id: clip_id.to_owned(),
600            reason: self.reason,
601        }
602    }
603}
604
605/// The shared, read-only context threaded through every action handler.
606struct Ctx<'a, H, F, G, C> {
607    http: &'a H,
608    fs: &'a F,
609    ffmpeg: &'a G,
610    clock: &'a C,
611    opts: &'a ExecOptions,
612    by_id: &'a HashMap<&'a str, &'a Desired>,
613    by_path: &'a HashMap<&'a str, &'a Desired>,
614    /// This run's fetched aligned (synced) lyrics, keyed by clip id. Audio
615    /// tagging reads a clip's entry to embed an MP3 `SYLT` frame and the plain
616    /// lyric text; a clip absent here is tagged exactly as before. Populated by
617    /// the caller (the fetch is IO), so the engine stays free of direct IO.
618    synced: &'a HashMap<String, AlignedLyrics>,
619    /// Static cover art the audio producer already fetched to embed in the tag,
620    /// keyed by CDN URL, so the matching per-song `.jpg` sidecar reuses it rather
621    /// than fetching the same image again (#89). Only URLs a `CoverJpg` sidecar
622    /// will fetch are inserted (see `cover_wanted`) and each is removed on use, so
623    /// the map stays bounded to the clips in flight. A plain mutex guards it: the
624    /// concurrent producers only ever insert, and the lock is never held across an
625    /// await.
626    cover_cache: &'a Mutex<HashMap<String, Vec<u8>>>,
627    /// The cover URLs a `CoverJpg` sidecar will fetch this run. The producer caches
628    /// a cover only when its URL is here, so a clip whose cover is embedded but
629    /// never written as a sidecar leaves no bytes stranded in `cover_cache`.
630    cover_wanted: &'a HashSet<&'a str>,
631    /// Album video-cover source URLs fetched by more than one folder artifact
632    /// this run. The `both` retention derives `cover.webp` (transcoded) and
633    /// `cover.mp4` (raw) from the SAME `video_cover_url`; the first fetch caches
634    /// the raw source here so the sibling drains it instead of re-fetching (#90
635    /// reuses the #89 fetch-once path). `FolderWebp` sorts before `FolderMp4`, so
636    /// the raw source is always cached before the raw sidecar reads it.
637    shared_cover_urls: &'a HashSet<&'a str>,
638}
639
640impl<H, F, G, C> Ctx<'_, H, F, G, C>
641where
642    H: Http,
643    F: Filesystem,
644    G: Ffmpeg,
645    C: Clock,
646{
647    /// Apply one non-audio action, returning what it did or why it failed.
648    ///
649    /// Audio actions ([`Download`](Action::Download) /
650    /// [`Reformat`](Action::Reformat)) run in the concurrent phase through
651    /// [`prepare_audio`](Self::prepare_audio) and never reach here.
652    #[allow(clippy::too_many_arguments)]
653    async fn apply(
654        &self,
655        client_lock: &ClientLock<'_, C>,
656        action: &Action,
657        manifest: &mut Manifest,
658        albums: &mut BTreeMap<String, AlbumArt>,
659        playlists: &mut BTreeMap<String, PlaylistState>,
660        tracked_paths: &mut HashMap<String, u32>,
661        committed: &BTreeSet<String>,
662    ) -> Result<Effect, Fail> {
663        match action {
664            Action::Download { .. } | Action::Reformat { .. } => {
665                unreachable!("audio actions are applied in the concurrent phase")
666            }
667            Action::Retag {
668                clip,
669                lineage,
670                path,
671            } => self.retag(manifest, clip, lineage, path).await,
672            Action::Rename { from, to } => self.rename(manifest, from, to),
673            Action::Delete { path, clip_id } => self.delete(manifest, path, clip_id),
674            Action::Skip { clip_id } => {
675                self.refresh_preserve(manifest, clip_id);
676                Ok(Effect::Skipped)
677            }
678            Action::WriteArtifact {
679                kind,
680                path,
681                source_url,
682                hash,
683                owner_id,
684                content,
685            } => {
686                self.write_artifact(
687                    manifest,
688                    albums,
689                    playlists,
690                    *kind,
691                    path,
692                    source_url,
693                    hash,
694                    owner_id,
695                    content.as_deref(),
696                    tracked_paths,
697                    committed,
698                )
699                .await
700            }
701            Action::DeleteArtifact {
702                kind,
703                path,
704                owner_id,
705            } => self.delete_artifact(manifest, albums, playlists, *kind, path, owner_id),
706            Action::WriteStem {
707                clip_id,
708                key,
709                stem_id,
710                path,
711                source_url,
712                format,
713                hash,
714            } => {
715                self.write_stem(
716                    client_lock,
717                    manifest,
718                    clip_id,
719                    key,
720                    stem_id,
721                    path,
722                    source_url,
723                    *format,
724                    hash,
725                    committed,
726                )
727                .await
728            }
729            Action::DeleteStem { clip_id, key, path } => {
730                self.delete_stem(manifest, clip_id, key, path)
731            }
732        }
733    }
734
735    /// Render one audio action's tagged bytes, side-effect-free.
736    ///
737    /// This is the concurrent part: it fetches, transcodes, and tags the file
738    /// (through shared ports, plus the client behind `client_lock`), then returns
739    /// the bytes and where they must go. It deliberately writes nothing, removes
740    /// nothing, and never touches `manifest`, so many run at once and an aborted
741    /// run can drop them with no destination or manifest effect. The serial
742    /// [`commit_audio`](Self::commit_audio) applies those effects in plan order.
743    async fn prepare_audio(
744        &self,
745        client_lock: &ClientLock<'_, C>,
746        action: &Action,
747    ) -> Result<RenderedAudio, Fail> {
748        match action {
749            Action::Download {
750                clip,
751                lineage,
752                path,
753                format,
754            } => {
755                let bytes = self
756                    .produce_audio(client_lock, clip, lineage, *format)
757                    .await?;
758                Ok(RenderedAudio {
759                    clip_id: clip.id.clone(),
760                    path: path.clone(),
761                    format: *format,
762                    from_path: None,
763                    effect: Effect::Downloaded,
764                    bytes,
765                })
766            }
767            Action::Reformat {
768                clip,
769                path,
770                from_path,
771                from: _,
772                to,
773            } => {
774                // A Reformat action carries no lineage, so recover it from the
775                // desired set (the same context that drove naming and the hash),
776                // falling back to a self-rooted context when the clip is not in
777                // the current selection.
778                let lineage = self
779                    .by_id
780                    .get(clip.id.as_str())
781                    .map(|d| d.lineage.clone())
782                    .unwrap_or_else(|| LineageContext::own_root(clip));
783                let bytes = self.produce_audio(client_lock, clip, &lineage, *to).await?;
784                Ok(RenderedAudio {
785                    clip_id: clip.id.clone(),
786                    path: path.clone(),
787                    format: *to,
788                    from_path: Some(from_path.clone()),
789                    effect: Effect::Reformatted,
790                    bytes,
791                })
792            }
793            _ => unreachable!("prepare_audio only handles audio actions"),
794        }
795    }
796
797    /// Commit one rendered audio result serially, in plan order.
798    ///
799    /// Writes the tagged bytes to the destination, then, for a [`Reformat`], drops
800    /// the superseded file, then records the manifest entry. Ordering the write
801    /// before the removal keeps a crash from losing both copies; keeping all of
802    /// this off the concurrent phase preserves the sequential executor's plan-order
803    /// guarantee for every destination and manifest effect.
804    fn commit_audio(
805        &self,
806        manifest: &mut Manifest,
807        rendered: RenderedAudio,
808    ) -> Result<Effect, Fail> {
809        let RenderedAudio {
810            clip_id,
811            path,
812            format,
813            from_path,
814            effect,
815            bytes,
816        } = rendered;
817        let size = self.write_verify(&clip_id, &path, &bytes)?;
818        if let Some(from) = from_path {
819            // The new file is safely in place; only now drop the old rendering.
820            self.fs.remove(&from).map_err(|err| {
821                permanent_fail(&clip_id, format!("could not remove old file: {err}"))
822            })?;
823        }
824        manifest.insert(clip_id.clone(), self.entry(&clip_id, &path, format, size));
825        Ok(effect)
826    }
827
828    /// Re-tag the existing file in place to match current metadata and art.
829    async fn retag(
830        &self,
831        manifest: &mut Manifest,
832        clip: &Clip,
833        lineage: &LineageContext,
834        path: &str,
835    ) -> Result<Effect, Fail> {
836        let Some(format) = manifest.get(&clip.id).map(|entry| entry.format) else {
837            return Err(permanent_fail(
838                &clip.id,
839                "retag target missing from manifest",
840            ));
841        };
842
843        if format == AudioFormat::Wav {
844            let (meta, synced) = self.track_meta(clip, lineage);
845            let cover = self.fetch_cover(clip).await;
846            let existing = self.fs.read(path).map_err(|err| {
847                permanent_fail(&clip.id, format!("could not read for retag: {err}"))
848            })?;
849            let tagged = tag_wav(&existing, &meta, cover.as_deref(), synced)
850                .map_err(|err| permanent_fail(&clip.id, err.to_string()))?;
851            let size = self.write_verify(&clip.id, path, &tagged)?;
852            self.refresh_hashes(manifest, &clip.id, Some(size));
853            return Ok(Effect::Retagged);
854        }
855
856        let (meta, synced) = self.track_meta(clip, lineage);
857        let cover = self.fetch_cover(clip).await;
858        let existing = self
859            .fs
860            .read(path)
861            .map_err(|err| permanent_fail(&clip.id, format!("could not read for retag: {err}")))?;
862        let tagged = match format {
863            AudioFormat::Mp3 => tag_mp3(&existing, &meta, cover.as_deref(), synced),
864            AudioFormat::Flac => tag_flac(&existing, &meta, cover.as_deref()),
865            AudioFormat::Wav => unreachable!("WAV handled above"),
866        }
867        .map_err(|err| permanent_fail(&clip.id, err.to_string()))?;
868        let size = self.write_verify(&clip.id, path, &tagged)?;
869        self.refresh_hashes(manifest, &clip.id, Some(size));
870        Ok(Effect::Retagged)
871    }
872
873    /// Move the file and update the entry's path (and protection).
874    fn rename(&self, manifest: &mut Manifest, from: &str, to: &str) -> Result<Effect, Fail> {
875        let label = self
876            .by_path
877            .get(to)
878            .map(|d| d.clip.id.clone())
879            .unwrap_or_else(|| to.to_owned());
880        self.fs.rename(from, to).map_err(|err| {
881            if err.is_out_of_space() {
882                disk_fail(label, "disk full: no space left to rename")
883            } else {
884                permanent_fail(label, format!("rename failed: {err}"))
885            }
886        })?;
887
888        let clip_id = self.by_path.get(to).map(|d| d.clip.id.clone()).or_else(|| {
889            manifest
890                .entries
891                .iter()
892                .find(|(_, entry)| entry.path == from)
893                .map(|(id, _)| id.clone())
894        });
895        if let Some(id) = clip_id
896            && let Some(entry) = manifest.entries.get_mut(&id)
897        {
898            entry.path = to.to_owned();
899            if let Some(d) = self.by_path.get(to) {
900                entry.preserve = preserve_for(d);
901            }
902        }
903        Ok(Effect::Renamed)
904    }
905
906    /// Remove the file and drop the manifest entry.
907    fn delete(&self, manifest: &mut Manifest, path: &str, clip_id: &str) -> Result<Effect, Fail> {
908        self.fs
909            .remove(path)
910            .map_err(|err| permanent_fail(clip_id, format!("delete failed: {err}")))?;
911        manifest.remove(clip_id);
912        Ok(Effect::Deleted)
913    }
914
915    /// Fetch an artifact's bytes, write them atomically, then record the sidecar
916    /// on the owning manifest entry.
917    ///
918    /// The fetch and write share the audio path's resilience: `fetch_bytes`
919    /// retries transient failures and verifies `Content-Length`, and
920    /// `write_verify` confirms the on-disk size. A failure is attributed to the
921    /// owning clip and returned as a per-clip [`Fail`], so a bad sidecar never
922    /// aborts the whole run (only an auth failure or a full disk does, matching
923    /// audio).
924    ///
925    /// The bytes written depend on the kind: a static cover is the fetched image
926    /// verbatim, while an animated cover is the clip's MP4 preview transcoded to
927    /// WebP through the ffmpeg port (see [`artifact_bytes`](Self::artifact_bytes)).
928    ///
929    /// A sidecar is only ever written for a clip whose audio is present: a
930    /// successful `Download`/`Reformat` creates the manifest entry earlier in
931    /// this run, and a prior-run clip already has one. So an absent owning entry
932    /// means the audio failed or never existed this run; we skip (no fetch, no
933    /// write) rather than strand an untracked sidecar with no owning audio.
934    ///
935    /// Folder art ([`FolderJpg`](ArtifactKind::FolderJpg),
936    /// [`FolderWebp`](ArtifactKind::FolderWebp), and
937    /// [`FolderMp4`](ArtifactKind::FolderMp4)) is album-scoped: its `owner_id`
938    /// is the album's stable root id, not a manifest clip, so it skips the
939    /// manifest presence guard and records its state on the album store instead.
940    ///
941    /// When a title or album change moves the audio, reconcile re-emits this
942    /// write at the NEW path; this handler then removes the sidecar left at the
943    /// artifact's previously tracked path, moving it rather than orphaning it.
944    /// The removal happens only after the new file is safely written, and a
945    /// remove failure returns before the state slot advances, so the next run
946    /// re-plans the identical write and retries — self-healing, never an orphan.
947    #[allow(clippy::too_many_arguments)]
948    async fn write_artifact(
949        &self,
950        manifest: &mut Manifest,
951        albums: &mut BTreeMap<String, AlbumArt>,
952        playlists: &mut BTreeMap<String, PlaylistState>,
953        kind: ArtifactKind,
954        path: &str,
955        source_url: &str,
956        hash: &str,
957        owner_id: &str,
958        content: Option<&str>,
959        tracked_paths: &mut HashMap<String, u32>,
960        committed: &BTreeSet<String>,
961    ) -> Result<Effect, Fail> {
962        // A per-song sidecar needs its owning clip's manifest entry; album and
963        // playlist kinds are keyed elsewhere and skip this guard.
964        if is_per_clip_kind(kind) && manifest.get(owner_id).is_none() {
965            // The owning audio never landed this run, so this sidecar is skipped
966            // and will never drain a cover the producer cached for it. Drop that
967            // entry now: an insert without a matching sidecar write must not
968            // outlive its clip, keeping `cover_cache` bounded to the clips in
969            // flight (#89). A non-cover kind has no entry here, so this is a
970            // harmless no-op for them.
971            self.cover_cache
972                .lock()
973                .expect("cover cache mutex poisoned")
974                .remove(source_url);
975            return Ok(Effect::Skipped);
976        }
977        // Capture the path this artifact was last tracked at, BEFORE the slot is
978        // overwritten below, so a path-changing write (a title/album rename that
979        // moves the audio) can clean up the old sidecar it left behind. Cover
980        // kinds live on the manifest, folder kinds on the album store; playlists
981        // reconcile their own old-path delete and so opt out here.
982        let old_path = match kind {
983            ArtifactKind::CoverJpg => manifest
984                .get(owner_id)
985                .and_then(|e| e.cover_jpg.as_ref())
986                .map(|s| s.path.clone()),
987            ArtifactKind::CoverWebp => manifest
988                .get(owner_id)
989                .and_then(|e| e.cover_webp.as_ref())
990                .map(|s| s.path.clone()),
991            ArtifactKind::DetailsTxt => manifest
992                .get(owner_id)
993                .and_then(|e| e.details_txt.as_ref())
994                .map(|s| s.path.clone()),
995            ArtifactKind::LyricsTxt => manifest
996                .get(owner_id)
997                .and_then(|e| e.lyrics_txt.as_ref())
998                .map(|s| s.path.clone()),
999            ArtifactKind::Lrc => manifest
1000                .get(owner_id)
1001                .and_then(|e| e.lrc.as_ref())
1002                .map(|s| s.path.clone()),
1003            ArtifactKind::VideoMp4 => manifest
1004                .get(owner_id)
1005                .and_then(|e| e.video_mp4.as_ref())
1006                .map(|s| s.path.clone()),
1007            ArtifactKind::FolderJpg | ArtifactKind::FolderWebp | ArtifactKind::FolderMp4 => albums
1008                .get(owner_id)
1009                .and_then(|a| a.artifact(kind))
1010                .map(|s| s.path.clone()),
1011            ArtifactKind::Playlist => None,
1012        };
1013        // A generated artifact (a playlist) carries its body inline and never
1014        // touches the network; a fetched one pulls (and transcodes) its source.
1015        let bytes = match content {
1016            Some(text) => text.as_bytes().to_vec(),
1017            None => self.artifact_bytes(kind, source_url, owner_id).await?,
1018        };
1019        self.write_verify(owner_id, path, &bytes)?;
1020        // The new sidecar is safely in place; only now drop a stale copy left at
1021        // the previous path (the audio moved). `remove` is idempotent, so an
1022        // already-absent old file is fine. On a genuine remove failure we return
1023        // BEFORE updating the slot, leaving the manifest/album pointing at the
1024        // old path: the next run sees the same path drift, re-plans this write,
1025        // and retries the cleanup — convergent, no orphan persists.
1026        //
1027        // The removal is gated so it can never delete a live file (#76). This
1028        // slot is releasing `old`, so drop its reference in `tracked_paths`; the
1029        // file is removed only once nothing else holds it — no other tracked slot
1030        // still references it (count now zero) and no *committed* write this run
1031        // has already placed a file there (`committed`, the commit-tracked twin of
1032        // `suppress_path_aliasing`). On a path swap (A: x -> y while B: y -> x)
1033        // the earlier write commits its path, so the later mover keeps it; when
1034        // two slots share a path after a prior failed swap, the reference count
1035        // keeps it. But a merely *planned* colliding write that later fails no
1036        // longer protects a stale file, so it is cleaned up rather than orphaned
1037        // (#142).
1038        if let Some(old) = old_path.as_deref()
1039            && !old.is_empty()
1040            && old != path
1041        {
1042            let still_referenced = tracked_paths
1043                .get_mut(old)
1044                .map(|count| {
1045                    *count = count.saturating_sub(1);
1046                    *count > 0
1047                })
1048                .unwrap_or(false);
1049            if !still_referenced && !committed.contains(old) {
1050                self.fs.remove(old).map_err(|err| {
1051                    permanent_fail(
1052                        owner_id,
1053                        format!("could not remove old sidecar {old}: {err}"),
1054                    )
1055                })?;
1056            }
1057        }
1058        if is_album_kind(kind) {
1059            albums.entry(owner_id.to_owned()).or_default().set(
1060                kind,
1061                Some(ArtifactState {
1062                    path: path.to_owned(),
1063                    hash: hash.to_owned(),
1064                }),
1065            );
1066        } else if is_playlist_kind(kind) {
1067            playlists.insert(
1068                owner_id.to_owned(),
1069                PlaylistState {
1070                    name: playlist_name_from_path(path),
1071                    path: path.to_owned(),
1072                    hash: hash.to_owned(),
1073                },
1074            );
1075        } else if let Some(entry) = manifest.entries.get_mut(owner_id) {
1076            set_manifest_artifact(
1077                entry,
1078                kind,
1079                Some(ArtifactState {
1080                    path: path.to_owned(),
1081                    hash: hash.to_owned(),
1082                }),
1083            );
1084        }
1085        Ok(Effect::ArtifactWritten)
1086    }
1087
1088    /// Produce a sidecar's bytes from its source, branching on kind.
1089    ///
1090    /// An animated cover — a per-clip [`CoverWebp`](ArtifactKind::CoverWebp) or an
1091    /// album [`FolderWebp`](ArtifactKind::FolderWebp) — fetches the clip's
1092    /// `video_cover` MP4 preview and transcodes it to an animated WebP through the
1093    /// ffmpeg port; every other kind is the fetched source verbatim (the static
1094    /// [`CoverJpg`](ArtifactKind::CoverJpg) / album [`FolderJpg`](ArtifactKind::FolderJpg)
1095    /// image, or the raw album [`FolderMp4`](ArtifactKind::FolderMp4) whose
1096    /// `video_cover_url` is kept untranscoded). A fetch or transcode failure
1097    /// is attributed to the owning clip and is a per-clip [`Fail`], except a
1098    /// disk-full transcode, which aborts the run like the audio FLAC path.
1099    async fn artifact_bytes(
1100        &self,
1101        kind: ArtifactKind,
1102        source_url: &str,
1103        owner_id: &str,
1104    ) -> Result<Vec<u8>, Fail> {
1105        // Reuse the cover the audio producer already fetched for the embedded tag
1106        // when it cached this exact URL (#89); otherwise fetch it now. The guard
1107        // is taken and dropped in its own statement so it never spans the await.
1108        let cached = self
1109            .cover_cache
1110            .lock()
1111            .expect("cover cache mutex poisoned")
1112            .remove(source_url);
1113        let source = match cached {
1114            Some(bytes) => bytes,
1115            None => {
1116                let fetched = self
1117                    .fetch_bytes(source_url)
1118                    .await
1119                    .map_err(|err| err.attribute(owner_id))?;
1120                // Cache the raw source when a sibling folder artifact will fetch
1121                // the same URL (the `both` retention: cover.webp + cover.mp4), so
1122                // it is fetched exactly once. Bounded to shared URLs and drained
1123                // on the sibling's use.
1124                if self.shared_cover_urls.contains(source_url) {
1125                    self.cover_cache
1126                        .lock()
1127                        .expect("cover cache mutex poisoned")
1128                        .insert(source_url.to_owned(), fetched.clone());
1129                }
1130                fetched
1131            }
1132        };
1133        match kind {
1134            ArtifactKind::CoverWebp | ArtifactKind::FolderWebp => self
1135                .ffmpeg
1136                .mp4_to_webp(&source, self.opts.cover_webp)
1137                .await
1138                .map_err(|err| {
1139                    if err.is_out_of_space() {
1140                        disk_fail(owner_id, "disk full: no space left to transcode")
1141                    } else {
1142                        permanent_fail(owner_id, format!("cover transcode failed: {err}"))
1143                    }
1144                }),
1145            // The text sidecars are generated and always carry inline content, so
1146            // `write_artifact` never reaches this fetch path for them. Guard it so
1147            // a future miswiring fails loudly rather than fetching a URL.
1148            ArtifactKind::DetailsTxt | ArtifactKind::LyricsTxt | ArtifactKind::Lrc => Err(
1149                permanent_fail(owner_id, "text sidecar requires inline content"),
1150            ),
1151            ArtifactKind::CoverJpg
1152            | ArtifactKind::FolderJpg
1153            | ArtifactKind::FolderMp4
1154            | ArtifactKind::Playlist
1155            | ArtifactKind::VideoMp4 => Ok(source),
1156        }
1157    }
1158
1159    /// Remove a sidecar file and clear its slot on the owning manifest entry.
1160    ///
1161    /// `remove` is idempotent, so an already-absent sidecar is not a failure.
1162    /// When the owning entry is already gone (its audio was deleted earlier this
1163    /// run, co-deleting the sidecar), there is no slot to clear and that is fine.
1164    ///
1165    /// Folder art is album-scoped: its slot is cleared on the album store keyed by
1166    /// the album's root id, not on a manifest clip.
1167    ///
1168    /// The audio `Delete` is applied before its sidecar `DeleteArtifact`. If the
1169    /// sidecar removal fails after the audio is already gone, the sidecar lingers
1170    /// untracked, but the design stays convergent rather than transactional: the
1171    /// next run re-plans the same removal and retries, and any directory it would
1172    /// have emptied is pruned once the file finally clears.
1173    fn delete_artifact(
1174        &self,
1175        manifest: &mut Manifest,
1176        albums: &mut BTreeMap<String, AlbumArt>,
1177        playlists: &mut BTreeMap<String, PlaylistState>,
1178        kind: ArtifactKind,
1179        path: &str,
1180        owner_id: &str,
1181    ) -> Result<Effect, Fail> {
1182        self.fs
1183            .remove(path)
1184            .map_err(|err| permanent_fail(owner_id, format!("artifact delete failed: {err}")))?;
1185        if is_album_kind(kind) {
1186            if let Some(art) = albums.get_mut(owner_id) {
1187                art.set(kind, None);
1188                if art.is_empty() {
1189                    albums.remove(owner_id);
1190                }
1191            }
1192        } else if is_playlist_kind(kind) {
1193            playlists.remove(owner_id);
1194        } else if let Some(entry) = manifest.entries.get_mut(owner_id) {
1195            set_manifest_artifact(entry, kind, None);
1196        }
1197        Ok(Effect::ArtifactDeleted)
1198    }
1199
1200    /// Fetch one stem's bytes, write them atomically, then record the stem on
1201    /// the owning clip's keyed stem map.
1202    ///
1203    /// Mirrors [`write_artifact`](Self::write_artifact) for the keyed-stem case,
1204    /// sharing the fetch resilience (`fetch_bytes` retries and verifies
1205    /// `Content-Length`) and the atomic size-verified write. A stem is only ever
1206    /// written for a clip whose audio is present, so an absent owning manifest
1207    /// entry means the audio failed or never existed this run; we skip rather
1208    /// than strand an untracked stem with no owning audio.
1209    ///
1210    /// Stems are stored RAW in their native container and are NEVER transcoded to
1211    /// FLAC, even when the song's own format is FLAC — they are the deliberate
1212    /// exception. A `Wav` stem is rendered through the free `convert_wav` flow
1213    /// (see [`fetch_stem_bytes`](Self::fetch_stem_bytes)); an `Mp3` stem is fetched
1214    /// straight from its public CDN url. Either way the bytes land verbatim at
1215    /// `path`, whose extension already matches the stem format.
1216    ///
1217    /// When a title/album change moves the song, reconcile re-emits this write at
1218    /// the NEW path; this handler then removes the stem left at the previously
1219    /// tracked path, moving it rather than orphaning it. The removal happens only
1220    /// after the new file is safely written and only when nothing else this run
1221    /// writes that path, and a remove failure returns before the slot advances so
1222    /// the next run re-plans the identical write and retries — self-healing.
1223    #[allow(clippy::too_many_arguments)]
1224    async fn write_stem(
1225        &self,
1226        client_lock: &ClientLock<'_, C>,
1227        manifest: &mut Manifest,
1228        clip_id: &str,
1229        key: &str,
1230        stem_id: &str,
1231        path: &str,
1232        source_url: &str,
1233        format: StemFormat,
1234        hash: &str,
1235        committed: &BTreeSet<String>,
1236    ) -> Result<Effect, Fail> {
1237        // A stem needs its owning clip's manifest entry (its audio must exist).
1238        if manifest.get(clip_id).is_none() {
1239            return Ok(Effect::Skipped);
1240        }
1241        let old_path = manifest
1242            .get(clip_id)
1243            .and_then(|e| e.stems.get(key))
1244            .map(|s| s.path.clone());
1245        let bytes = self
1246            .fetch_stem_bytes(client_lock, clip_id, stem_id, source_url, format)
1247            .await?;
1248        self.write_verify(clip_id, path, &bytes)?;
1249        // The new stem is in place; only now drop a stale copy left at the old
1250        // path (the song moved, or the stem format changed). `remove` is
1251        // idempotent. A path a *committed* write this run has already placed is
1252        // never removed (the commit-tracked twin of `suppress_path_aliasing`); a
1253        // merely planned write that later fails no longer protects a stale file
1254        // (#142). On a genuine remove failure we return BEFORE updating the slot,
1255        // so the next run re-plans the same write and retries the cleanup.
1256        if let Some(old) = old_path.as_deref()
1257            && !old.is_empty()
1258            && old != path
1259            && !committed.contains(old)
1260        {
1261            self.fs.remove(old).map_err(|err| {
1262                permanent_fail(clip_id, format!("could not remove old stem {old}: {err}"))
1263            })?;
1264        }
1265        if let Some(entry) = manifest.entries.get_mut(clip_id) {
1266            set_manifest_stem(
1267                entry,
1268                key,
1269                Some(ArtifactState {
1270                    path: path.to_owned(),
1271                    hash: hash.to_owned(),
1272                }),
1273            );
1274        }
1275        Ok(Effect::ArtifactWritten)
1276    }
1277
1278    /// Resolve a stem's RAW bytes in its native container, never transcoding.
1279    ///
1280    /// A `Wav` stem renders the stem clip's lossless WAV through the very same
1281    /// free `convert_wav` + poll flow the main FLAC/WAV audio uses
1282    /// ([`resolve_wav_url`](Self::resolve_wav_url)), keyed on the stem's own
1283    /// `stem_id`, then downloads that WAV. An `Mp3` stem (or a degenerate `Wav`
1284    /// stem with no id to render) downloads its public CDN url directly. Stems
1285    /// are the deliberate exception to the source format: the bytes are returned
1286    /// exactly as delivered and are never re-encoded to FLAC.
1287    async fn fetch_stem_bytes(
1288        &self,
1289        client_lock: &ClientLock<'_, C>,
1290        clip_id: &str,
1291        stem_id: &str,
1292        source_url: &str,
1293        format: StemFormat,
1294    ) -> Result<Vec<u8>, Fail> {
1295        let url = match format {
1296            StemFormat::Wav if !stem_id.is_empty() => {
1297                match self.resolve_wav_url(client_lock, stem_id).await? {
1298                    Some(url) => url,
1299                    None => return Err(transient_fail(clip_id, "stem WAV render was not ready")),
1300                }
1301            }
1302            // Mp3, or a Wav stem with no id to render, downloads the CDN mp3.
1303            _ => source_url.to_owned(),
1304        };
1305        self.fetch_bytes(&url)
1306            .await
1307            .map_err(|err| err.attribute(clip_id))
1308    }
1309
1310    /// Remove one stem file and clear its slot in the owning clip's stem map.
1311    ///
1312    /// `remove` is idempotent, so an already-absent stem is not a failure. When
1313    /// the owning entry is already gone (its audio was deleted earlier this run,
1314    /// co-deleting the stem), there is no slot to clear and that is fine; the
1315    /// emptied `.stems` folder is pruned by the end-of-run directory sweep.
1316    fn delete_stem(
1317        &self,
1318        manifest: &mut Manifest,
1319        clip_id: &str,
1320        key: &str,
1321        path: &str,
1322    ) -> Result<Effect, Fail> {
1323        self.fs
1324            .remove(path)
1325            .map_err(|err| permanent_fail(clip_id, format!("stem delete failed: {err}")))?;
1326        if let Some(entry) = manifest.entries.get_mut(clip_id) {
1327            set_manifest_stem(entry, key, None);
1328        }
1329        Ok(Effect::ArtifactDeleted)
1330    }
1331
1332    /// Download (and transcode/tag) the audio for `clip` in `format`.
1333    async fn produce_audio(
1334        &self,
1335        client_lock: &ClientLock<'_, C>,
1336        clip: &Clip,
1337        lineage: &LineageContext,
1338        format: AudioFormat,
1339    ) -> Result<Vec<u8>, Fail> {
1340        let (meta, synced) = self.track_meta(clip, lineage);
1341        match format {
1342            AudioFormat::Mp3 => {
1343                let url = clip.mp3_url();
1344                let audio = self
1345                    .fetch_bytes(&url)
1346                    .await
1347                    .map_err(|err| err.attribute(&clip.id))?;
1348                let cover = self.fetch_cover(clip).await;
1349                tag_mp3(&audio, &meta, cover.as_deref(), synced)
1350                    .map_err(|err| permanent_fail(&clip.id, err.to_string()))
1351            }
1352            AudioFormat::Flac => {
1353                let wav = self.fetch_wav(client_lock, clip).await?;
1354                let flac = self.ffmpeg.wav_to_flac(&wav).await.map_err(|err| {
1355                    if err.is_out_of_space() {
1356                        disk_fail(&clip.id, "disk full: no space left to transcode")
1357                    } else {
1358                        permanent_fail(&clip.id, format!("transcode failed: {err}"))
1359                    }
1360                })?;
1361                let cover = self.fetch_cover(clip).await;
1362                tag_flac(&flac, &meta, cover.as_deref())
1363                    .map_err(|err| permanent_fail(&clip.id, err.to_string()))
1364            }
1365            AudioFormat::Wav => {
1366                let wav = self.fetch_wav(client_lock, clip).await?;
1367                let cover = self.fetch_cover(clip).await;
1368                tag_wav(&wav, &meta, cover.as_deref(), synced)
1369                    .map_err(|err| permanent_fail(&clip.id, err.to_string()))
1370            }
1371        }
1372    }
1373
1374    /// This run's non-empty aligned lyrics for a clip, if any were fetched.
1375    fn synced_for(&self, clip_id: &str) -> Option<&AlignedLyrics> {
1376        self.synced
1377            .get(clip_id)
1378            .filter(|aligned| !aligned.is_empty())
1379    }
1380
1381    /// The track metadata for a clip, paired with its synced lyrics (if any).
1382    ///
1383    /// The feed omits per-clip lyrics, so when this run fetched aligned lyrics
1384    /// for the clip the plain text is folded into `lyrics` here, which the MP3
1385    /// `USLT` and FLAC `LYRICS` tags then carry. The returned [`AlignedLyrics`]
1386    /// is passed on to [`tag_mp3`] for the word-level `SYLT` frame.
1387    fn track_meta<'m>(
1388        &'m self,
1389        clip: &Clip,
1390        lineage: &LineageContext,
1391    ) -> (TrackMetadata, Option<&'m AlignedLyrics>) {
1392        let synced = self.synced_for(&clip.id);
1393        let mut meta = TrackMetadata::from_clip(clip, lineage);
1394        if let Some(aligned) = synced {
1395            meta.lyrics = aligned.plain_text();
1396        }
1397        (meta, synced)
1398    }
1399
1400    /// Resolve the rendered WAV URL and download it.
1401    async fn fetch_wav(
1402        &self,
1403        client_lock: &ClientLock<'_, C>,
1404        clip: &Clip,
1405    ) -> Result<Vec<u8>, Fail> {
1406        let url = match self.resolve_wav_url(client_lock, &clip.id).await? {
1407            Some(url) => url,
1408            None => return Err(transient_fail(&clip.id, "WAV render was not ready")),
1409        };
1410        self.fetch_bytes(&url)
1411            .await
1412            .map_err(|err| err.attribute(&clip.id))
1413    }
1414
1415    /// Read the WAV URL, requesting a render and polling if it is not ready.
1416    ///
1417    /// `None` means the render did not become ready within the poll budget; the
1418    /// caller treats that as a non-fatal transient failure, never a silent skip.
1419    ///
1420    /// Each client call briefly locks `client_lock`; the poll waits happen
1421    /// unlocked, so concurrent clips interleave their WAV renders rather than
1422    /// serialising behind one clip's whole poll budget.
1423    async fn resolve_wav_url(
1424        &self,
1425        client_lock: &ClientLock<'_, C>,
1426        id: &str,
1427    ) -> Result<Option<String>, Fail> {
1428        if let Some(url) = self.wav_url_retrying(client_lock, id).await? {
1429            return Ok(Some(url));
1430        }
1431        self.request_wav_retrying(client_lock, id).await?;
1432        for _ in 0..self.opts.wav_poll_attempts {
1433            self.clock.sleep(self.opts.wav_poll_interval).await;
1434            if let Some(url) = self.wav_url_retrying(client_lock, id).await? {
1435                return Ok(Some(url));
1436            }
1437        }
1438        Ok(None)
1439    }
1440
1441    /// Read the rendered WAV URL, retrying transient API failures with backoff
1442    /// (SYNC-16/17), so the default FLAC path is as resilient as the CDN path.
1443    async fn wav_url_retrying(
1444        &self,
1445        client_lock: &ClientLock<'_, C>,
1446        id: &str,
1447    ) -> Result<Option<String>, Fail> {
1448        let mut attempt: u32 = 0;
1449        loop {
1450            let result = {
1451                let mut client = client_lock.lock().await;
1452                client.wav_url(self.http, id).await
1453            };
1454            match result {
1455                Ok(url) => return Ok(url),
1456                Err(err) => match self.retry_core(id, err, &mut attempt).await {
1457                    Some(fail) => return Err(fail),
1458                    None => continue,
1459                },
1460            }
1461        }
1462    }
1463
1464    /// Ask Suno to render a WAV, retrying transient API failures with backoff.
1465    async fn request_wav_retrying(
1466        &self,
1467        client_lock: &ClientLock<'_, C>,
1468        id: &str,
1469    ) -> Result<(), Fail> {
1470        let mut attempt: u32 = 0;
1471        loop {
1472            let result = {
1473                let mut client = client_lock.lock().await;
1474                client.request_wav(self.http, id).await
1475            };
1476            match result {
1477                Ok(()) => return Ok(()),
1478                Err(err) => match self.retry_core(id, err, &mut attempt).await {
1479                    Some(fail) => return Err(fail),
1480                    None => continue,
1481                },
1482            }
1483        }
1484    }
1485
1486    /// Classify a core error from the authenticated WAV flow. On a transient
1487    /// class within budget, back off through the [`Clock`] and return `None` to
1488    /// retry; otherwise return the terminal [`Fail`].
1489    async fn retry_core(&self, id: &str, err: Error, attempt: &mut u32) -> Option<Fail> {
1490        let fail = classify_core(id, err);
1491        if matches!(fail.class, Class::Transient) && *attempt < self.opts.max_retries {
1492            self.clock.sleep(backoff_delay(*attempt, None)).await;
1493            *attempt += 1;
1494            None
1495        } else {
1496            Some(fail)
1497        }
1498    }
1499
1500    /// GET `url`, retrying transient failures with backoff, verifying size.
1501    async fn fetch_bytes(&self, url: &str) -> Result<Vec<u8>, FetchError> {
1502        let mut attempt: u32 = 0;
1503        loop {
1504            let result = self.http.send(HttpRequest::get(url)).await;
1505            match classify_response(result) {
1506                Ok(body) => return Ok(body),
1507                Err(err) => {
1508                    if matches!(err.class, Class::Transient) && attempt < self.opts.max_retries {
1509                        let delay = backoff_delay(attempt, err.retry_after);
1510                        self.clock.sleep(delay).await;
1511                        attempt += 1;
1512                        continue;
1513                    }
1514                    return Err(err);
1515                }
1516            }
1517        }
1518    }
1519
1520    /// Download cover art, trying each candidate URL in order; `None` is fine.
1521    async fn fetch_cover(&self, clip: &Clip) -> Option<Vec<u8>> {
1522        for url in clip.cover_candidates() {
1523            if let Ok(response) = self.http.send(HttpRequest::get(url)).await
1524                && (200..=299).contains(&response.status)
1525                && !response.body.is_empty()
1526            {
1527                // A `CoverJpg` sidecar will fetch this exact URL this run; keep the
1528                // bytes so its write reuses them instead of fetching again (#89).
1529                // The lock guards only the insert, never the await above.
1530                if self.cover_wanted.contains(url) {
1531                    self.cover_cache
1532                        .lock()
1533                        .expect("cover cache mutex poisoned")
1534                        .insert(url.to_owned(), response.body.clone());
1535                }
1536                return Some(response.body);
1537            }
1538        }
1539        None
1540    }
1541
1542    /// Write `bytes` atomically, then confirm the on-disk size (SYNC-13/14).
1543    fn write_verify(&self, clip_id: &str, path: &str, bytes: &[u8]) -> Result<u64, Fail> {
1544        self.fs.write_atomic(path, bytes).map_err(|err| {
1545            if err.is_out_of_space() {
1546                disk_fail(clip_id, format!("disk full: no space left to write {path}"))
1547            } else {
1548                permanent_fail(clip_id, format!("write failed: {err}"))
1549            }
1550        })?;
1551        match self.fs.metadata(path) {
1552            Some(stat) if stat.size == bytes.len() as u64 => Ok(stat.size),
1553            Some(stat) => Err(permanent_fail(
1554                clip_id,
1555                format!("wrote {} bytes, expected {}", stat.size, bytes.len()),
1556            )),
1557            None => Ok(bytes.len() as u64),
1558        }
1559    }
1560
1561    /// Build the manifest entry for a freshly written file.
1562    fn entry(&self, clip_id: &str, path: &str, format: AudioFormat, size: u64) -> ManifestEntry {
1563        match self.by_id.get(clip_id) {
1564            Some(d) => manifest_entry(d, size),
1565            None => ManifestEntry {
1566                path: path.to_owned(),
1567                format,
1568                size,
1569                ..ManifestEntry::default()
1570            },
1571        }
1572    }
1573
1574    /// Refresh an existing entry's hashes, protection, and (optionally) size.
1575    fn refresh_hashes(&self, manifest: &mut Manifest, clip_id: &str, size: Option<u64>) {
1576        let desired = self.by_id.get(clip_id).copied();
1577        if let Some(entry) = manifest.entries.get_mut(clip_id) {
1578            if let Some(d) = desired {
1579                entry.meta_hash = d.meta_hash.clone();
1580                entry.art_hash = d.art_hash.clone();
1581                entry.preserve = preserve_for(d);
1582            }
1583            if let Some(size) = size {
1584                entry.size = size;
1585            }
1586        }
1587    }
1588
1589    /// Refresh only an entry's preserve marker from the current desired state.
1590    ///
1591    /// A clip can gain or lose copy/private protection with no file change, which
1592    /// reconcile emits as a [`Skip`](Action::Skip). Refreshing here keeps the
1593    /// persisted marker a faithful image of live protection, so the cross-run
1594    /// delete guard (SYNC-8) never reads it stale.
1595    fn refresh_preserve(&self, manifest: &mut Manifest, clip_id: &str) {
1596        if let Some(d) = self.by_id.get(clip_id).copied()
1597            && let Some(entry) = manifest.entries.get_mut(clip_id)
1598        {
1599            entry.preserve = preserve_for(d);
1600        }
1601    }
1602}
1603
1604/// Build a manifest entry from the desired record (SYNC-8 preserve rule).
1605fn manifest_entry(d: &Desired, size: u64) -> ManifestEntry {
1606    ManifestEntry {
1607        path: d.path.clone(),
1608        format: d.format,
1609        meta_hash: d.meta_hash.clone(),
1610        art_hash: d.art_hash.clone(),
1611        size,
1612        preserve: preserve_for(d),
1613        ..Default::default()
1614    }
1615}
1616
1617/// Whether a written entry must be preserved across runs: held by any copy
1618/// source, or private. The reconcile delete guard reads this marker later.
1619fn preserve_for(d: &Desired) -> bool {
1620    d.private || d.modes.contains(&SourceMode::Copy)
1621}
1622
1623/// Classify one HTTP result into bytes or a [`FetchError`] (SYNC-14/17).
1624fn classify_response(
1625    result: Result<crate::http::HttpResponse, crate::http::TransportError>,
1626) -> Result<Vec<u8>, FetchError> {
1627    let response = match result {
1628        Ok(response) => response,
1629        Err(err) => {
1630            return Err(FetchError::transient(
1631                format!("transport error: {err}"),
1632                None,
1633            ));
1634        }
1635    };
1636    match response.status {
1637        200..=299 => {
1638            if let Some(expected) = content_length(&response) {
1639                let actual = response.body.len() as u64;
1640                if actual != expected {
1641                    return Err(FetchError::transient(
1642                        format!("truncated download: {actual} of {expected} bytes"),
1643                        None,
1644                    ));
1645                }
1646            }
1647            Ok(response.body)
1648        }
1649        401 | 403 => Err(FetchError::transient(
1650            format!("download rejected: status {}", response.status),
1651            None,
1652        )),
1653        408 => Err(FetchError::transient("request timed out", None)),
1654        429 => Err(FetchError::transient(
1655            "rate limited",
1656            retry_after(&response),
1657        )),
1658        500..=599 => Err(FetchError::transient(
1659            format!("server error {}", response.status),
1660            None,
1661        )),
1662        status => Err(FetchError::permanent(format!(
1663            "download failed: status {status}"
1664        ))),
1665    }
1666}
1667
1668/// Map a core [`Error`] from the authenticated WAV flow to a [`Fail`].
1669fn classify_core(id: &str, err: Error) -> Fail {
1670    let reason = err.to_string();
1671    match err {
1672        Error::Auth(_) => auth_fail(id, reason),
1673        Error::RateLimited { .. } | Error::Connection(_) => transient_fail(id, reason),
1674        Error::Api(_)
1675        | Error::NotFound(_)
1676        | Error::Tag(_)
1677        | Error::Config(_)
1678        | Error::Refused(_) => permanent_fail(id, reason),
1679    }
1680}
1681
1682/// The provider-reported body size from `Content-Length`, if present and valid.
1683fn content_length(response: &crate::http::HttpResponse) -> Option<u64> {
1684    response.header("content-length")?.trim().parse().ok()
1685}
1686
1687#[cfg(test)]
1688mod tests {
1689    use super::*;
1690    use crate::ClerkAuth;
1691    use crate::http::HttpResponse;
1692    use crate::testutil::{MemFs, RecordingClock, Reply, ScriptedHttp, StubFfmpeg};
1693
1694    fn clip(id: &str) -> Clip {
1695        Clip {
1696            id: id.to_owned(),
1697            title: "Song".to_owned(),
1698            audio_url: format!("https://cdn1.suno.ai/{id}.mp3"),
1699            ..Default::default()
1700        }
1701    }
1702
1703    fn art_clip(id: &str) -> Clip {
1704        Clip {
1705            image_large_url: format!("https://art.suno.ai/{id}/large.jpg"),
1706            image_url: format!("https://art.suno.ai/{id}/small.jpg"),
1707            ..clip(id)
1708        }
1709    }
1710
1711    fn ext(format: AudioFormat) -> &'static str {
1712        match format {
1713            AudioFormat::Mp3 => "mp3",
1714            AudioFormat::Flac => "flac",
1715            AudioFormat::Wav => "wav",
1716        }
1717    }
1718
1719    fn desired(clip: Clip, format: AudioFormat) -> Desired {
1720        Desired {
1721            path: format!("{}.{}", clip.id, ext(format)),
1722            lineage: LineageContext::own_root(&clip),
1723            clip,
1724            format,
1725            meta_hash: "m".to_owned(),
1726            art_hash: "art".to_owned(),
1727            modes: vec![SourceMode::Mirror],
1728            trashed: false,
1729            private: false,
1730            artifacts: Vec::new(),
1731            stems: None,
1732        }
1733    }
1734
1735    fn entry(path: &str, format: AudioFormat) -> ManifestEntry {
1736        ManifestEntry {
1737            path: path.to_owned(),
1738            format,
1739            meta_hash: "old".to_owned(),
1740            art_hash: "old-art".to_owned(),
1741            size: 8,
1742            preserve: false,
1743            ..Default::default()
1744        }
1745    }
1746
1747    #[allow(clippy::too_many_arguments)]
1748    fn run<G: Ffmpeg>(
1749        plan: &Plan,
1750        manifest: &mut Manifest,
1751        desired: &[Desired],
1752        http: &ScriptedHttp,
1753        fs: &MemFs,
1754        ffmpeg: &G,
1755        clock: &RecordingClock,
1756        opts: &ExecOptions,
1757    ) -> ExecOutcome {
1758        let mut albums = BTreeMap::new();
1759        run_with_albums(
1760            plan,
1761            manifest,
1762            &mut albums,
1763            desired,
1764            http,
1765            fs,
1766            ffmpeg,
1767            clock,
1768            opts,
1769        )
1770    }
1771
1772    #[allow(clippy::too_many_arguments)]
1773    fn run_with_albums<G: Ffmpeg>(
1774        plan: &Plan,
1775        manifest: &mut Manifest,
1776        albums: &mut BTreeMap<String, AlbumArt>,
1777        desired: &[Desired],
1778        http: &ScriptedHttp,
1779        fs: &MemFs,
1780        ffmpeg: &G,
1781        clock: &RecordingClock,
1782        opts: &ExecOptions,
1783    ) -> ExecOutcome {
1784        let mut playlists = BTreeMap::new();
1785        run_full(
1786            plan,
1787            manifest,
1788            albums,
1789            &mut playlists,
1790            desired,
1791            http,
1792            fs,
1793            ffmpeg,
1794            clock,
1795            opts,
1796        )
1797    }
1798
1799    #[allow(clippy::too_many_arguments)]
1800    fn run_full<G: Ffmpeg>(
1801        plan: &Plan,
1802        manifest: &mut Manifest,
1803        albums: &mut BTreeMap<String, AlbumArt>,
1804        playlists: &mut BTreeMap<String, PlaylistState>,
1805        desired: &[Desired],
1806        http: &ScriptedHttp,
1807        fs: &MemFs,
1808        ffmpeg: &G,
1809        clock: &RecordingClock,
1810        opts: &ExecOptions,
1811    ) -> ExecOutcome {
1812        let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
1813        let synced = HashMap::new();
1814        pollster::block_on(execute(
1815            plan,
1816            manifest,
1817            albums,
1818            playlists,
1819            desired,
1820            &synced,
1821            Ports {
1822                client: &mut client,
1823                http,
1824                fs,
1825                ffmpeg,
1826                clock,
1827            },
1828            opts,
1829        ))
1830    }
1831
1832    fn small_poll() -> ExecOptions {
1833        ExecOptions {
1834            max_retries: 3,
1835            wav_poll_attempts: 2,
1836            wav_poll_interval: Duration::from_secs(5),
1837            concurrency: 4,
1838            cover_webp: WebpEncodeSettings::default(),
1839        }
1840    }
1841
1842    // ── Download: MP3 ───────────────────────────────────────────────
1843
1844    #[test]
1845    fn download_mp3_writes_tagged_file_and_records_manifest() {
1846        let c = art_clip("a");
1847        let d = desired(c.clone(), AudioFormat::Mp3);
1848        let plan = Plan {
1849            actions: vec![Action::Download {
1850                clip: c.clone(),
1851                lineage: LineageContext::own_root(&c),
1852                path: d.path.clone(),
1853                format: AudioFormat::Mp3,
1854            }],
1855        };
1856        let http = ScriptedHttp::new()
1857            .route("a.mp3", Reply::ok(b"mp3-body".to_vec()))
1858            .route("a/large.jpg", Reply::ok(b"art-bytes".to_vec()));
1859        let fs = MemFs::new();
1860        let ffmpeg = StubFfmpeg::flac();
1861        let clock = RecordingClock::new();
1862        let mut manifest = Manifest::new();
1863
1864        let outcome = run(
1865            &plan,
1866            &mut manifest,
1867            &[d],
1868            &http,
1869            &fs,
1870            &ffmpeg,
1871            &clock,
1872            &ExecOptions::default(),
1873        );
1874
1875        assert_eq!(outcome.downloaded, 1);
1876        assert_eq!(outcome.failed(), 0);
1877        assert_eq!(outcome.status, RunStatus::Completed);
1878        let written = fs.read_file("a.mp3").unwrap();
1879        assert_eq!(&written[..3], b"ID3");
1880        assert!(written.ends_with(b"mp3-body"));
1881        let entry = manifest.get("a").unwrap();
1882        assert_eq!(entry.path, "a.mp3");
1883        assert_eq!(entry.format, AudioFormat::Mp3);
1884        assert_eq!(entry.meta_hash, "m");
1885        assert_eq!(entry.art_hash, "art");
1886        assert_eq!(entry.size, written.len() as u64);
1887        assert!(!entry.preserve);
1888    }
1889
1890    #[test]
1891    fn download_mp3_embeds_sylt_and_lyrics_from_synced_map() {
1892        // A clip whose alignment was fetched this run gets a word-level SYLT frame
1893        // and its plain lyric text embedded (USLT), end to end through execute.
1894        let c = art_clip("a");
1895        let d = desired(c.clone(), AudioFormat::Mp3);
1896        let plan = Plan {
1897            actions: vec![Action::Download {
1898                clip: c.clone(),
1899                lineage: LineageContext::own_root(&c),
1900                path: d.path.clone(),
1901                format: AudioFormat::Mp3,
1902            }],
1903        };
1904        let http = ScriptedHttp::new()
1905            .route("a.mp3", Reply::ok(b"mp3-body".to_vec()))
1906            .route("a/large.jpg", Reply::ok(b"art-bytes".to_vec()));
1907        let fs = MemFs::new();
1908        let ffmpeg = StubFfmpeg::flac();
1909        let clock = RecordingClock::new();
1910        let mut manifest = Manifest::new();
1911        let mut albums = BTreeMap::new();
1912        let mut playlists = BTreeMap::new();
1913        let mut synced = HashMap::new();
1914        synced.insert(
1915            "a".to_string(),
1916            AlignedLyrics::from_json(&serde_json::json!({
1917                "aligned_words": [],
1918                "aligned_lyrics": [
1919                    {"text": "hi there", "start_s": 0.5, "end_s": 1.2, "section": "Verse 1",
1920                     "words": [
1921                         {"text": "hi", "start_s": 0.5, "end_s": 0.8},
1922                         {"text": "there", "start_s": 0.9, "end_s": 1.2}
1923                     ]}
1924                ]
1925            })),
1926        );
1927        let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
1928        let outcome = pollster::block_on(execute(
1929            &plan,
1930            &mut manifest,
1931            &mut albums,
1932            &mut playlists,
1933            &[d],
1934            &synced,
1935            Ports {
1936                client: &mut client,
1937                http: &http,
1938                fs: &fs,
1939                ffmpeg: &ffmpeg,
1940                clock: &clock,
1941            },
1942            &ExecOptions::default(),
1943        ));
1944
1945        assert_eq!(outcome.downloaded, 1);
1946        let written = fs.read_file("a.mp3").unwrap();
1947        let tag = id3::Tag::read_from2(std::io::Cursor::new(written)).unwrap();
1948        assert_eq!(
1949            tag.synchronised_lyrics().count(),
1950            1,
1951            "a SYLT frame is embedded"
1952        );
1953        // The plain lyric text is populated from the alignment for the USLT frame.
1954        assert_eq!(
1955            tag.lyrics().next().map(|frame| frame.text.as_str()),
1956            Some("hi there")
1957        );
1958    }
1959
1960    #[test]
1961    fn download_mp3_embeds_no_sylt_when_synced_map_empty() {
1962        // The synced map is empty when the feature is off (no alignment fetched),
1963        // so no SYLT frame and no lyric text are embedded.
1964        let c = art_clip("a");
1965        let d = desired(c.clone(), AudioFormat::Mp3);
1966        let plan = Plan {
1967            actions: vec![Action::Download {
1968                clip: c.clone(),
1969                lineage: LineageContext::own_root(&c),
1970                path: d.path.clone(),
1971                format: AudioFormat::Mp3,
1972            }],
1973        };
1974        let http = ScriptedHttp::new()
1975            .route("a.mp3", Reply::ok(b"mp3-body".to_vec()))
1976            .route("a/large.jpg", Reply::ok(b"art-bytes".to_vec()));
1977        let fs = MemFs::new();
1978        let ffmpeg = StubFfmpeg::flac();
1979        let clock = RecordingClock::new();
1980        let mut manifest = Manifest::new();
1981        let mut albums = BTreeMap::new();
1982        let mut playlists = BTreeMap::new();
1983        let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
1984        let outcome = pollster::block_on(execute(
1985            &plan,
1986            &mut manifest,
1987            &mut albums,
1988            &mut playlists,
1989            &[d],
1990            &HashMap::new(),
1991            Ports {
1992                client: &mut client,
1993                http: &http,
1994                fs: &fs,
1995                ffmpeg: &ffmpeg,
1996                clock: &clock,
1997            },
1998            &ExecOptions::default(),
1999        ));
2000        assert_eq!(outcome.downloaded, 1);
2001        let written = fs.read_file("a.mp3").unwrap();
2002        let tag = id3::Tag::read_from2(std::io::Cursor::new(written)).unwrap();
2003        assert_eq!(tag.synchronised_lyrics().count(), 0);
2004        assert_eq!(tag.lyrics().count(), 0);
2005    }
2006
2007    #[test]
2008    fn download_mp3_uses_cdn_fallback_when_audio_url_empty() {
2009        let mut c = clip("a");
2010        c.audio_url = String::new();
2011        let d = desired(c.clone(), AudioFormat::Mp3);
2012        let plan = Plan {
2013            actions: vec![Action::Download {
2014                clip: c.clone(),
2015                lineage: LineageContext::own_root(&c),
2016                path: d.path.clone(),
2017                format: AudioFormat::Mp3,
2018            }],
2019        };
2020        let http = ScriptedHttp::new().route("cdn1.suno.ai/a.mp3", Reply::ok(b"body".to_vec()));
2021        let fs = MemFs::new();
2022        let mut manifest = Manifest::new();
2023        let outcome = run(
2024            &plan,
2025            &mut manifest,
2026            &[d],
2027            &http,
2028            &fs,
2029            &StubFfmpeg::flac(),
2030            &RecordingClock::new(),
2031            &ExecOptions::default(),
2032        );
2033        assert_eq!(outcome.downloaded, 1);
2034        assert_eq!(http.count("cdn1.suno.ai/a.mp3"), 1);
2035    }
2036
2037    // ── Download: FLAC render + transcode ───────────────────────────
2038
2039    #[test]
2040    fn download_flac_renders_transcodes_and_records() {
2041        let c = clip("b");
2042        let d = desired(c.clone(), AudioFormat::Flac);
2043        let plan = Plan {
2044            actions: vec![Action::Download {
2045                clip: c.clone(),
2046                lineage: LineageContext::own_root(&c),
2047                path: d.path.clone(),
2048                format: AudioFormat::Flac,
2049            }],
2050        };
2051        let http = ScriptedHttp::new()
2052            .with_auth()
2053            .route(
2054                "/wav_file/",
2055                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/b.wav"}"#),
2056            )
2057            .route("b.wav", Reply::ok(b"wav-bytes".to_vec()));
2058        let fs = MemFs::new();
2059        let clock = RecordingClock::new();
2060        let mut manifest = Manifest::new();
2061
2062        let outcome = run(
2063            &plan,
2064            &mut manifest,
2065            &[d],
2066            &http,
2067            &fs,
2068            &StubFfmpeg::flac(),
2069            &clock,
2070            &ExecOptions::default(),
2071        );
2072
2073        assert_eq!(outcome.downloaded, 1);
2074        assert_eq!(outcome.failed(), 0);
2075        let written = fs.read_file("b.flac").unwrap();
2076        assert_eq!(&written[..4], b"fLaC");
2077        assert_eq!(manifest.get("b").unwrap().format, AudioFormat::Flac);
2078        // The URL was ready immediately, so no render request and no polling.
2079        assert_eq!(http.count("/convert_wav/"), 0);
2080        assert!(clock.sleeps().is_empty());
2081    }
2082
2083    #[test]
2084    fn download_flac_requests_render_then_polls_until_ready() {
2085        let c = clip("c");
2086        let d = desired(c.clone(), AudioFormat::Flac);
2087        let plan = Plan {
2088            actions: vec![Action::Download {
2089                clip: c.clone(),
2090                lineage: LineageContext::own_root(&c),
2091                path: d.path.clone(),
2092                format: AudioFormat::Flac,
2093            }],
2094        };
2095        let http = ScriptedHttp::new()
2096            .with_auth()
2097            .route_seq(
2098                "/wav_file/",
2099                vec![
2100                    Reply::json("{}"),
2101                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/c.wav"}"#),
2102                ],
2103            )
2104            .route("/convert_wav/", Reply::status(200))
2105            .route("c.wav", Reply::ok(b"wav".to_vec()));
2106        let clock = RecordingClock::new();
2107        let mut manifest = Manifest::new();
2108
2109        let outcome = run(
2110            &plan,
2111            &mut manifest,
2112            &[d],
2113            &http,
2114            &fs_new(),
2115            &StubFfmpeg::flac(),
2116            &clock,
2117            &small_poll(),
2118        );
2119
2120        assert_eq!(outcome.downloaded, 1);
2121        assert_eq!(http.count("/convert_wav/"), 1);
2122        assert_eq!(clock.sleeps(), vec![Duration::from_secs(5)]);
2123    }
2124
2125    #[test]
2126    fn download_flac_unavailable_render_is_a_nonfatal_failure() {
2127        let c = clip("d");
2128        let d = desired(c.clone(), AudioFormat::Flac);
2129        let plan = Plan {
2130            actions: vec![Action::Download {
2131                clip: c.clone(),
2132                lineage: LineageContext::own_root(&c),
2133                path: d.path.clone(),
2134                format: AudioFormat::Flac,
2135            }],
2136        };
2137        let http = ScriptedHttp::new()
2138            .with_auth()
2139            .route("/wav_file/", Reply::json("{}"))
2140            .route("/convert_wav/", Reply::status(200));
2141        let fs = MemFs::new();
2142        let clock = RecordingClock::new();
2143        let mut manifest = Manifest::new();
2144
2145        let outcome = run(
2146            &plan,
2147            &mut manifest,
2148            &[d],
2149            &http,
2150            &fs,
2151            &StubFfmpeg::flac(),
2152            &clock,
2153            &small_poll(),
2154        );
2155
2156        assert_eq!(outcome.downloaded, 0);
2157        assert_eq!(outcome.failed(), 1);
2158        assert_eq!(outcome.failures[0].clip_id, "d");
2159        assert_eq!(outcome.status, RunStatus::Completed);
2160        assert!(!fs.exists("d.flac"));
2161        assert_eq!(clock.sleeps().len(), 2);
2162    }
2163
2164    #[test]
2165    fn flac_transcode_failure_is_recorded_and_skipped() {
2166        let c = clip("t");
2167        let d = desired(c.clone(), AudioFormat::Flac);
2168        let plan = Plan {
2169            actions: vec![Action::Download {
2170                clip: c.clone(),
2171                lineage: LineageContext::own_root(&c),
2172                path: d.path.clone(),
2173                format: AudioFormat::Flac,
2174            }],
2175        };
2176        let http = ScriptedHttp::new()
2177            .with_auth()
2178            .route(
2179                "/wav_file/",
2180                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/t.wav"}"#),
2181            )
2182            .route("t.wav", Reply::ok(b"wav".to_vec()));
2183        let fs = MemFs::new();
2184        let mut manifest = Manifest::new();
2185
2186        let outcome = run(
2187            &plan,
2188            &mut manifest,
2189            &[d],
2190            &http,
2191            &fs,
2192            &StubFfmpeg::failing(),
2193            &RecordingClock::new(),
2194            &ExecOptions::default(),
2195        );
2196
2197        assert_eq!(outcome.downloaded, 0);
2198        assert_eq!(outcome.failed(), 1);
2199        assert!(!fs.exists("t.flac"));
2200        assert!(manifest.get("t").is_none());
2201    }
2202
2203    // ── Cover fallback ──────────────────────────────────────────────
2204
2205    #[test]
2206    fn cover_falls_back_when_large_image_is_missing() {
2207        let c = art_clip("e");
2208        let d = desired(c.clone(), AudioFormat::Mp3);
2209        let plan = Plan {
2210            actions: vec![Action::Download {
2211                clip: c.clone(),
2212                lineage: LineageContext::own_root(&c),
2213                path: d.path.clone(),
2214                format: AudioFormat::Mp3,
2215            }],
2216        };
2217        let http = ScriptedHttp::new()
2218            .route("e.mp3", Reply::ok(b"body".to_vec()))
2219            .route("e/large.jpg", Reply::status(404))
2220            .route("e/small.jpg", Reply::ok(b"the-art".to_vec()));
2221        let fs = MemFs::new();
2222        let mut manifest = Manifest::new();
2223
2224        let outcome = run(
2225            &plan,
2226            &mut manifest,
2227            &[d],
2228            &http,
2229            &fs,
2230            &StubFfmpeg::flac(),
2231            &RecordingClock::new(),
2232            &ExecOptions::default(),
2233        );
2234
2235        assert_eq!(outcome.downloaded, 1);
2236        let calls = http.calls();
2237        let large = calls
2238            .iter()
2239            .position(|u| u.contains("e/large.jpg"))
2240            .unwrap();
2241        let small = calls
2242            .iter()
2243            .position(|u| u.contains("e/small.jpg"))
2244            .unwrap();
2245        assert!(large < small, "large art tried before small");
2246    }
2247
2248    // ── Cover reuse: embed + sidecar share one fetch (#89) ──────────
2249
2250    #[test]
2251    fn download_reuses_the_embedded_cover_for_the_jpg_sidecar() {
2252        // The embedded tag and the `.jpg` sidecar want the same cover URL; it is
2253        // fetched once and the bytes serve both.
2254        let c = art_clip("a");
2255        let d = desired(c.clone(), AudioFormat::Mp3);
2256        let plan = Plan {
2257            actions: vec![
2258                Action::Download {
2259                    clip: c.clone(),
2260                    lineage: LineageContext::own_root(&c),
2261                    path: d.path.clone(),
2262                    format: AudioFormat::Mp3,
2263                },
2264                Action::WriteArtifact {
2265                    kind: ArtifactKind::CoverJpg,
2266                    path: "a/cover.jpg".to_owned(),
2267                    source_url: c.selected_image_url().unwrap().to_owned(),
2268                    hash: "art".to_owned(),
2269                    owner_id: "a".to_owned(),
2270                    content: None,
2271                },
2272            ],
2273        };
2274        let http = ScriptedHttp::new()
2275            .route("a.mp3", Reply::ok(b"mp3-body".to_vec()))
2276            .route("a/large.jpg", Reply::ok(b"the-art".to_vec()));
2277        let fs = MemFs::new();
2278        let mut manifest = Manifest::new();
2279
2280        let outcome = run(
2281            &plan,
2282            &mut manifest,
2283            &[d],
2284            &http,
2285            &fs,
2286            &StubFfmpeg::flac(),
2287            &RecordingClock::new(),
2288            &ExecOptions::default(),
2289        );
2290
2291        assert_eq!(outcome.downloaded, 1);
2292        assert_eq!(outcome.artifacts_written, 1);
2293        assert_eq!(outcome.failed(), 0);
2294        // Fetched once, not twice.
2295        assert_eq!(http.count("a/large.jpg"), 1);
2296        // The sidecar carries the fetched bytes, and the audio was tagged.
2297        assert_eq!(fs.read_file("a/cover.jpg").unwrap(), b"the-art");
2298        assert_eq!(&fs.read_file("a.mp3").unwrap()[..3], b"ID3");
2299    }
2300
2301    #[test]
2302    fn concurrent_downloads_reuse_each_clips_own_cover() {
2303        // Two clips render concurrently; each `.jpg` sidecar gets its own cover
2304        // (no cross-contamination) and each cover URL is fetched exactly once.
2305        let a = art_clip("a");
2306        let b = art_clip("b");
2307        let da = desired(a.clone(), AudioFormat::Mp3);
2308        let db = desired(b.clone(), AudioFormat::Mp3);
2309        let plan = Plan {
2310            actions: vec![
2311                Action::Download {
2312                    clip: a.clone(),
2313                    lineage: LineageContext::own_root(&a),
2314                    path: da.path.clone(),
2315                    format: AudioFormat::Mp3,
2316                },
2317                Action::WriteArtifact {
2318                    kind: ArtifactKind::CoverJpg,
2319                    path: "a/cover.jpg".to_owned(),
2320                    source_url: a.selected_image_url().unwrap().to_owned(),
2321                    hash: "art".to_owned(),
2322                    owner_id: "a".to_owned(),
2323                    content: None,
2324                },
2325                Action::Download {
2326                    clip: b.clone(),
2327                    lineage: LineageContext::own_root(&b),
2328                    path: db.path.clone(),
2329                    format: AudioFormat::Mp3,
2330                },
2331                Action::WriteArtifact {
2332                    kind: ArtifactKind::CoverJpg,
2333                    path: "b/cover.jpg".to_owned(),
2334                    source_url: b.selected_image_url().unwrap().to_owned(),
2335                    hash: "art".to_owned(),
2336                    owner_id: "b".to_owned(),
2337                    content: None,
2338                },
2339            ],
2340        };
2341        let http = ScriptedHttp::new()
2342            .route("a.mp3", Reply::ok(b"a-mp3".to_vec()))
2343            .route("b.mp3", Reply::ok(b"b-mp3".to_vec()))
2344            .route("a/large.jpg", Reply::ok(b"art-a".to_vec()))
2345            .route("b/large.jpg", Reply::ok(b"art-b".to_vec()));
2346        let fs = MemFs::new();
2347        let mut manifest = Manifest::new();
2348
2349        let outcome = run(
2350            &plan,
2351            &mut manifest,
2352            &[da, db],
2353            &http,
2354            &fs,
2355            &StubFfmpeg::flac(),
2356            &RecordingClock::new(),
2357            &small_poll(),
2358        );
2359
2360        assert_eq!(outcome.downloaded, 2);
2361        assert_eq!(outcome.artifacts_written, 2);
2362        assert_eq!(http.count("a/large.jpg"), 1);
2363        assert_eq!(http.count("b/large.jpg"), 1);
2364        assert_eq!(fs.read_file("a/cover.jpg").unwrap(), b"art-a");
2365        assert_eq!(fs.read_file("b/cover.jpg").unwrap(), b"art-b");
2366    }
2367
2368    #[test]
2369    fn cover_sidecar_refetches_when_embed_fell_back_to_another_url() {
2370        // The large image 404s so the embed falls back to the small image; the
2371        // sidecar still wants the (dead) large URL and must NOT be handed the
2372        // small bytes. Reuse is keyed on the exact URL, so nothing is cached and
2373        // the sidecar fetches the large URL itself (then fails on the 404).
2374        let c = art_clip("e");
2375        let d = desired(c.clone(), AudioFormat::Mp3);
2376        let plan = Plan {
2377            actions: vec![
2378                Action::Download {
2379                    clip: c.clone(),
2380                    lineage: LineageContext::own_root(&c),
2381                    path: d.path.clone(),
2382                    format: AudioFormat::Mp3,
2383                },
2384                Action::WriteArtifact {
2385                    kind: ArtifactKind::CoverJpg,
2386                    path: "e/cover.jpg".to_owned(),
2387                    source_url: "https://art.suno.ai/e/large.jpg".to_owned(),
2388                    hash: "art".to_owned(),
2389                    owner_id: "e".to_owned(),
2390                    content: None,
2391                },
2392            ],
2393        };
2394        let http = ScriptedHttp::new()
2395            .route("e.mp3", Reply::ok(b"body".to_vec()))
2396            .route("e/large.jpg", Reply::status(404))
2397            .route("e/small.jpg", Reply::ok(b"small-art".to_vec()));
2398        let fs = MemFs::new();
2399        let mut manifest = Manifest::new();
2400
2401        let outcome = run(
2402            &plan,
2403            &mut manifest,
2404            &[d],
2405            &http,
2406            &fs,
2407            &StubFfmpeg::flac(),
2408            &RecordingClock::new(),
2409            &ExecOptions::default(),
2410        );
2411
2412        assert_eq!(outcome.downloaded, 1);
2413        // The small image was fetched once (the embed fallback) and never reused
2414        // for the large-keyed sidecar; the sidecar went to the network itself.
2415        assert_eq!(http.count("e/small.jpg"), 1);
2416        assert!(
2417            http.count("e/large.jpg") >= 2,
2418            "sidecar refetched the large URL"
2419        );
2420        assert_eq!(manifest.get("e").unwrap().cover_jpg, None);
2421        assert!(!fs.exists("e/cover.jpg"));
2422    }
2423
2424    // ── Atomic write and size verification (SYNC-13/14) ─────────────
2425
2426    #[test]
2427    fn failed_write_leaves_the_prior_file_intact() {
2428        let c = clip("f");
2429        let d = desired(c.clone(), AudioFormat::Mp3);
2430        let plan = Plan {
2431            actions: vec![Action::Download {
2432                clip: c.clone(),
2433                lineage: LineageContext::own_root(&c),
2434                path: d.path.clone(),
2435                format: AudioFormat::Mp3,
2436            }],
2437        };
2438        let http = ScriptedHttp::new().route("f.mp3", Reply::ok(b"new-body".to_vec()));
2439        let fs = MemFs::new()
2440            .with_file("f.mp3", b"OLD-CONTENT".to_vec())
2441            .fail_write("f.mp3");
2442        let mut manifest = Manifest::new();
2443
2444        let outcome = run(
2445            &plan,
2446            &mut manifest,
2447            &[d],
2448            &http,
2449            &fs,
2450            &StubFfmpeg::flac(),
2451            &RecordingClock::new(),
2452            &ExecOptions::default(),
2453        );
2454
2455        assert_eq!(outcome.downloaded, 0);
2456        assert_eq!(outcome.failed(), 1);
2457        assert_eq!(fs.read_file("f.mp3").unwrap(), b"OLD-CONTENT");
2458        assert!(manifest.get("f").is_none());
2459    }
2460
2461    #[test]
2462    fn size_mismatch_after_write_is_a_failure() {
2463        let c = clip("g");
2464        let d = desired(c.clone(), AudioFormat::Mp3);
2465        let plan = Plan {
2466            actions: vec![Action::Download {
2467                clip: c.clone(),
2468                lineage: LineageContext::own_root(&c),
2469                path: d.path.clone(),
2470                format: AudioFormat::Mp3,
2471            }],
2472        };
2473        let http = ScriptedHttp::new().route("g.mp3", Reply::ok(b"body".to_vec()));
2474        let fs = MemFs::new().corrupt_write("g.mp3");
2475        let mut manifest = Manifest::new();
2476
2477        let outcome = run(
2478            &plan,
2479            &mut manifest,
2480            &[d],
2481            &http,
2482            &fs,
2483            &StubFfmpeg::flac(),
2484            &RecordingClock::new(),
2485            &ExecOptions::default(),
2486        );
2487
2488        assert_eq!(outcome.downloaded, 0);
2489        assert_eq!(outcome.failed(), 1);
2490        assert!(outcome.failures[0].reason.contains("expected"));
2491        assert!(manifest.get("g").is_none());
2492    }
2493
2494    // ── Reliability policy (SYNC-16/17) ─────────────────────────────
2495
2496    #[test]
2497    fn transient_failure_is_retried_then_skipped() {
2498        let c = clip("h");
2499        let d = desired(c.clone(), AudioFormat::Mp3);
2500        let plan = Plan {
2501            actions: vec![Action::Download {
2502                clip: c.clone(),
2503                lineage: LineageContext::own_root(&c),
2504                path: d.path.clone(),
2505                format: AudioFormat::Mp3,
2506            }],
2507        };
2508        let http = ScriptedHttp::new().route("h.mp3", Reply::status(500));
2509        let fs = MemFs::new();
2510        let clock = RecordingClock::new();
2511        let opts = ExecOptions {
2512            max_retries: 2,
2513            ..ExecOptions::default()
2514        };
2515        let mut manifest = Manifest::new();
2516
2517        let outcome = run(
2518            &plan,
2519            &mut manifest,
2520            &[d],
2521            &http,
2522            &fs,
2523            &StubFfmpeg::flac(),
2524            &clock,
2525            &opts,
2526        );
2527
2528        assert_eq!(outcome.downloaded, 0);
2529        assert_eq!(outcome.failed(), 1);
2530        assert_eq!(http.count("h.mp3"), 3);
2531        assert_eq!(clock.sleeps().len(), 2);
2532    }
2533
2534    #[test]
2535    fn truncated_download_is_retried_then_succeeds() {
2536        let c = clip("i");
2537        let d = desired(c.clone(), AudioFormat::Mp3);
2538        let plan = Plan {
2539            actions: vec![Action::Download {
2540                clip: c.clone(),
2541                lineage: LineageContext::own_root(&c),
2542                path: d.path.clone(),
2543                format: AudioFormat::Mp3,
2544            }],
2545        };
2546        let http = ScriptedHttp::new().route_seq(
2547            "i.mp3",
2548            vec![
2549                Reply::ok(b"short".to_vec()).with_content_length(999),
2550                Reply::ok(b"good-body".to_vec()),
2551            ],
2552        );
2553        let fs = MemFs::new();
2554        let clock = RecordingClock::new();
2555        let mut manifest = Manifest::new();
2556
2557        let outcome = run(
2558            &plan,
2559            &mut manifest,
2560            &[d],
2561            &http,
2562            &fs,
2563            &StubFfmpeg::flac(),
2564            &clock,
2565            &ExecOptions::default(),
2566        );
2567
2568        assert_eq!(outcome.downloaded, 1);
2569        assert_eq!(http.count("i.mp3"), 2);
2570        assert_eq!(clock.sleeps().len(), 1);
2571    }
2572
2573    #[test]
2574    fn rate_limit_backs_off_using_retry_after() {
2575        let c = clip("j");
2576        let d = desired(c.clone(), AudioFormat::Mp3);
2577        let plan = Plan {
2578            actions: vec![Action::Download {
2579                clip: c.clone(),
2580                lineage: LineageContext::own_root(&c),
2581                path: d.path.clone(),
2582                format: AudioFormat::Mp3,
2583            }],
2584        };
2585        let http = ScriptedHttp::new().route_seq(
2586            "j.mp3",
2587            vec![
2588                Reply::status(429).with_retry_after(7),
2589                Reply::ok(b"body".to_vec()),
2590            ],
2591        );
2592        let fs = MemFs::new();
2593        let clock = RecordingClock::new();
2594        let mut manifest = Manifest::new();
2595
2596        let outcome = run(
2597            &plan,
2598            &mut manifest,
2599            &[d],
2600            &http,
2601            &fs,
2602            &StubFfmpeg::flac(),
2603            &clock,
2604            &ExecOptions::default(),
2605        );
2606
2607        assert_eq!(outcome.downloaded, 1);
2608        assert_eq!(clock.sleeps(), vec![Duration::from_secs(7)]);
2609    }
2610
2611    #[test]
2612    fn auth_failure_aborts_the_run() {
2613        let c1 = clip("k1");
2614        let c2 = clip("k2");
2615        let d1 = desired(c1.clone(), AudioFormat::Flac);
2616        let d2 = desired(c2.clone(), AudioFormat::Flac);
2617        let plan = Plan {
2618            actions: vec![
2619                Action::Download {
2620                    clip: c1.clone(),
2621                    lineage: LineageContext::own_root(&c1),
2622                    path: d1.path.clone(),
2623                    format: AudioFormat::Flac,
2624                },
2625                Action::Download {
2626                    clip: c2.clone(),
2627                    lineage: LineageContext::own_root(&c2),
2628                    path: d2.path.clone(),
2629                    format: AudioFormat::Flac,
2630                },
2631            ],
2632        };
2633        // The authenticated WAV-render endpoint rejects auth even after a JWT
2634        // refresh: that is a bad token, so the whole run aborts rather than
2635        // hammering every clip. A CDN media rejection, by contrast, does not.
2636        let http = ScriptedHttp::new()
2637            .with_auth()
2638            .route("/wav_file/", Reply::status(401));
2639        let fs = MemFs::new();
2640        let mut manifest = Manifest::new();
2641
2642        let outcome = run(
2643            &plan,
2644            &mut manifest,
2645            &[d1, d2],
2646            &http,
2647            &fs,
2648            &StubFfmpeg::flac(),
2649            &RecordingClock::new(),
2650            &small_poll(),
2651        );
2652
2653        assert_eq!(outcome.status, RunStatus::AuthAborted);
2654        assert_eq!(outcome.failed(), 1);
2655        assert_eq!(outcome.failures[0].clip_id, "k1");
2656        assert_eq!(outcome.downloaded, 0);
2657    }
2658
2659    // ── Disk-full aborts the run (issue #17) ────────────────────────
2660
2661    #[test]
2662    fn disk_full_primary_write_aborts_the_run() {
2663        // Two MP3 downloads; the first write is out of space. That is systemic,
2664        // so the run aborts before the second is even attempted: exactly one
2665        // failure is recorded and its reason names the disk-full cause.
2666        let c1 = clip("d1");
2667        let c2 = clip("d2");
2668        let d1 = desired(c1.clone(), AudioFormat::Mp3);
2669        let d2 = desired(c2.clone(), AudioFormat::Mp3);
2670        let plan = Plan {
2671            actions: vec![
2672                Action::Download {
2673                    clip: c1.clone(),
2674                    lineage: LineageContext::own_root(&c1),
2675                    path: d1.path.clone(),
2676                    format: AudioFormat::Mp3,
2677                },
2678                Action::Download {
2679                    clip: c2.clone(),
2680                    lineage: LineageContext::own_root(&c2),
2681                    path: d2.path.clone(),
2682                    format: AudioFormat::Mp3,
2683                },
2684            ],
2685        };
2686        let http = ScriptedHttp::new()
2687            .route("d1.mp3", Reply::ok(b"body-1".to_vec()))
2688            .route("d2.mp3", Reply::ok(b"body-2".to_vec()));
2689        let fs = MemFs::new().fail_write_out_of_space("d1.mp3");
2690        let mut manifest = Manifest::new();
2691
2692        let outcome = run(
2693            &plan,
2694            &mut manifest,
2695            &[d1, d2],
2696            &http,
2697            &fs,
2698            &StubFfmpeg::flac(),
2699            &RecordingClock::new(),
2700            &ExecOptions::default(),
2701        );
2702
2703        assert_eq!(outcome.status, RunStatus::DiskFull);
2704        assert_eq!(outcome.failed(), 1);
2705        assert_eq!(outcome.failures[0].clip_id, "d1");
2706        assert!(outcome.failures[0].reason.contains("disk full"));
2707        assert_eq!(outcome.downloaded, 0);
2708        // The second clip was never fetched: the run aborted first.
2709        assert_eq!(http.count("d2.mp3"), 0);
2710        assert!(!fs.exists("d2.mp3"));
2711    }
2712
2713    #[test]
2714    fn disk_full_flac_transcode_aborts_the_run() {
2715        // The scratch disk fills during the FLAC re-encode; a WAV rendered, but
2716        // there is nowhere to stage the transcode, so the run aborts.
2717        let c1 = clip("d1");
2718        let c2 = clip("d2");
2719        let d1 = desired(c1.clone(), AudioFormat::Flac);
2720        let d2 = desired(c2.clone(), AudioFormat::Flac);
2721        let plan = Plan {
2722            actions: vec![
2723                Action::Download {
2724                    clip: c1.clone(),
2725                    lineage: LineageContext::own_root(&c1),
2726                    path: d1.path.clone(),
2727                    format: AudioFormat::Flac,
2728                },
2729                Action::Download {
2730                    clip: c2.clone(),
2731                    lineage: LineageContext::own_root(&c2),
2732                    path: d2.path.clone(),
2733                    format: AudioFormat::Flac,
2734                },
2735            ],
2736        };
2737        let http = ScriptedHttp::new()
2738            .with_auth()
2739            .route(
2740                "/wav_file/",
2741                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/d1.wav"}"#),
2742            )
2743            .route(".wav", Reply::ok(b"wav".to_vec()));
2744        let fs = MemFs::new();
2745        let mut manifest = Manifest::new();
2746
2747        let outcome = run(
2748            &plan,
2749            &mut manifest,
2750            &[d1, d2],
2751            &http,
2752            &fs,
2753            &StubFfmpeg::out_of_space(),
2754            &RecordingClock::new(),
2755            &ExecOptions::default(),
2756        );
2757
2758        assert_eq!(outcome.status, RunStatus::DiskFull);
2759        assert_eq!(outcome.failed(), 1);
2760        assert_eq!(outcome.failures[0].clip_id, "d1");
2761        assert!(outcome.failures[0].reason.contains("disk full"));
2762        assert_eq!(outcome.downloaded, 0);
2763    }
2764
2765    #[test]
2766    fn disk_full_artifact_write_aborts_the_run() {
2767        // A sidecar write (not a primary download) also aborts on a full disk:
2768        // the owning audio is present, the cover fetch succeeds, but the sidecar
2769        // cannot be written.
2770        let mut manifest = Manifest::new();
2771        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
2772        let plan = Plan {
2773            actions: vec![Action::WriteArtifact {
2774                kind: ArtifactKind::CoverJpg,
2775                path: "a/cover.jpg".to_owned(),
2776                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
2777                hash: "h1".to_owned(),
2778                owner_id: "a".to_owned(),
2779                content: None,
2780            }],
2781        };
2782        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"jpg-bytes".to_vec()));
2783        let fs = MemFs::new().fail_write_out_of_space("a/cover.jpg");
2784
2785        let outcome = run(
2786            &plan,
2787            &mut manifest,
2788            &[],
2789            &http,
2790            &fs,
2791            &StubFfmpeg::flac(),
2792            &RecordingClock::new(),
2793            &ExecOptions::default(),
2794        );
2795
2796        assert_eq!(outcome.status, RunStatus::DiskFull);
2797        assert_eq!(outcome.failed(), 1);
2798        assert!(outcome.failures[0].reason.contains("disk full"));
2799        assert_eq!(outcome.artifacts_written, 0);
2800        // The sidecar slot was never recorded: the write failed before it.
2801        assert_eq!(manifest.get("a").unwrap().cover_jpg, None);
2802    }
2803
2804    #[test]
2805    fn disk_full_leaves_the_failed_clips_manifest_entry_unchanged() {
2806        // write_verify fails before any manifest insert, so a re-download that
2807        // hits a full disk leaves the prior entry (and file) exactly as it was.
2808        let c = clip("m");
2809        let d = desired(c.clone(), AudioFormat::Mp3);
2810        let plan = Plan {
2811            actions: vec![Action::Download {
2812                clip: c.clone(),
2813                lineage: LineageContext::own_root(&c),
2814                path: d.path.clone(),
2815                format: AudioFormat::Mp3,
2816            }],
2817        };
2818        let http = ScriptedHttp::new().route("m.mp3", Reply::ok(b"new-body".to_vec()));
2819        let fs = MemFs::new()
2820            .with_file("m.mp3", b"OLD-CONTENT".to_vec())
2821            .fail_write_out_of_space("m.mp3");
2822        let mut manifest = Manifest::new();
2823        let before = entry("m.mp3", AudioFormat::Mp3);
2824        manifest.insert("m", before.clone());
2825
2826        let outcome = run(
2827            &plan,
2828            &mut manifest,
2829            &[d],
2830            &http,
2831            &fs,
2832            &StubFfmpeg::flac(),
2833            &RecordingClock::new(),
2834            &ExecOptions::default(),
2835        );
2836
2837        assert_eq!(outcome.status, RunStatus::DiskFull);
2838        assert_eq!(manifest.get("m"), Some(&before));
2839        assert_eq!(fs.read_file("m.mp3").unwrap(), b"OLD-CONTENT");
2840    }
2841
2842    #[test]
2843    fn cdn_download_rejection_skips_the_clip_without_aborting() {
2844        let c1 = clip("k1");
2845        let c2 = clip("k2");
2846        let d1 = desired(c1.clone(), AudioFormat::Mp3);
2847        let d2 = desired(c2.clone(), AudioFormat::Mp3);
2848        let plan = Plan {
2849            actions: vec![
2850                Action::Download {
2851                    clip: c1.clone(),
2852                    lineage: LineageContext::own_root(&c1),
2853                    path: d1.path.clone(),
2854                    format: AudioFormat::Mp3,
2855                },
2856                Action::Download {
2857                    clip: c2.clone(),
2858                    lineage: LineageContext::own_root(&c2),
2859                    path: d2.path.clone(),
2860                    format: AudioFormat::Mp3,
2861                },
2862            ],
2863        };
2864        // A CDN media fetch is unauthenticated, so a 403 is a per-asset
2865        // rejection (often transient), not a bad token: the clip is retried
2866        // then recorded and skipped, and the run carries on to the rest.
2867        let http = ScriptedHttp::new()
2868            .route("k1.mp3", Reply::status(403))
2869            .route("k2.mp3", Reply::ok(b"body".to_vec()));
2870        let fs = MemFs::new();
2871        let mut manifest = Manifest::new();
2872
2873        let outcome = run(
2874            &plan,
2875            &mut manifest,
2876            &[d1, d2],
2877            &http,
2878            &fs,
2879            &StubFfmpeg::flac(),
2880            &RecordingClock::new(),
2881            &ExecOptions::default(),
2882        );
2883
2884        assert_ne!(outcome.status, RunStatus::AuthAborted);
2885        assert_eq!(outcome.downloaded, 1);
2886        assert_eq!(outcome.failed(), 1);
2887        assert_eq!(outcome.failures[0].clip_id, "k1");
2888    }
2889
2890    #[test]
2891    fn one_clip_failure_does_not_abort_the_run() {
2892        let c1 = clip("l1");
2893        let c2 = clip("l2");
2894        let d1 = desired(c1.clone(), AudioFormat::Mp3);
2895        let d2 = desired(c2.clone(), AudioFormat::Mp3);
2896        let plan = Plan {
2897            actions: vec![
2898                Action::Download {
2899                    clip: c1.clone(),
2900                    lineage: LineageContext::own_root(&c1),
2901                    path: d1.path.clone(),
2902                    format: AudioFormat::Mp3,
2903                },
2904                Action::Download {
2905                    clip: c2.clone(),
2906                    lineage: LineageContext::own_root(&c2),
2907                    path: d2.path.clone(),
2908                    format: AudioFormat::Mp3,
2909                },
2910            ],
2911        };
2912        let http = ScriptedHttp::new()
2913            .route("l1.mp3", Reply::status(404))
2914            .route("l2.mp3", Reply::ok(b"body".to_vec()));
2915        let fs = MemFs::new();
2916        let mut manifest = Manifest::new();
2917
2918        let outcome = run(
2919            &plan,
2920            &mut manifest,
2921            &[d1, d2],
2922            &http,
2923            &fs,
2924            &StubFfmpeg::flac(),
2925            &RecordingClock::new(),
2926            &ExecOptions::default(),
2927        );
2928
2929        assert_eq!(outcome.status, RunStatus::Completed);
2930        assert_eq!(outcome.downloaded, 1);
2931        assert_eq!(outcome.failed(), 1);
2932        assert_eq!(outcome.failures[0].clip_id, "l1");
2933        assert!(fs.exists("l2.mp3"));
2934        assert!(manifest.get("l2").is_some());
2935        assert!(manifest.get("l1").is_none());
2936    }
2937
2938    // ── preserve marker (SYNC-8) ────────────────────────────────────
2939
2940    #[test]
2941    fn preserve_is_set_for_copy_held_and_private_clips() {
2942        let mut mirror = desired(clip("m1"), AudioFormat::Mp3);
2943        mirror.modes = vec![SourceMode::Mirror];
2944        let mut copy_held = desired(clip("m2"), AudioFormat::Mp3);
2945        copy_held.modes = vec![SourceMode::Mirror, SourceMode::Copy];
2946        let mut private = desired(clip("m3"), AudioFormat::Mp3);
2947        private.private = true;
2948
2949        let plan = Plan {
2950            actions: vec![
2951                Action::Download {
2952                    clip: mirror.clip.clone(),
2953                    lineage: LineageContext::own_root(&mirror.clip),
2954                    path: mirror.path.clone(),
2955                    format: AudioFormat::Mp3,
2956                },
2957                Action::Download {
2958                    clip: copy_held.clip.clone(),
2959                    lineage: LineageContext::own_root(&copy_held.clip),
2960                    path: copy_held.path.clone(),
2961                    format: AudioFormat::Mp3,
2962                },
2963                Action::Download {
2964                    clip: private.clip.clone(),
2965                    lineage: LineageContext::own_root(&private.clip),
2966                    path: private.path.clone(),
2967                    format: AudioFormat::Mp3,
2968                },
2969            ],
2970        };
2971        let http = ScriptedHttp::new()
2972            .route("m1.mp3", Reply::ok(b"a".to_vec()))
2973            .route("m2.mp3", Reply::ok(b"b".to_vec()))
2974            .route("m3.mp3", Reply::ok(b"c".to_vec()));
2975        let fs = MemFs::new();
2976        let mut manifest = Manifest::new();
2977
2978        let outcome = run(
2979            &plan,
2980            &mut manifest,
2981            &[mirror, copy_held, private],
2982            &http,
2983            &fs,
2984            &StubFfmpeg::flac(),
2985            &RecordingClock::new(),
2986            &ExecOptions::default(),
2987        );
2988
2989        assert_eq!(outcome.downloaded, 3);
2990        assert!(!manifest.get("m1").unwrap().preserve);
2991        assert!(manifest.get("m2").unwrap().preserve);
2992        assert!(manifest.get("m3").unwrap().preserve);
2993    }
2994
2995    // ── Reformat / Retag / Rename / Delete / Skip ───────────────────
2996
2997    #[test]
2998    fn reformat_writes_new_format_and_removes_old_file() {
2999        let c = clip("n");
3000        let d = desired(c.clone(), AudioFormat::Mp3);
3001        let plan = Plan {
3002            actions: vec![Action::Reformat {
3003                clip: c.clone(),
3004                path: "n.mp3".to_owned(),
3005                from_path: "n.flac".to_owned(),
3006                from: AudioFormat::Flac,
3007                to: AudioFormat::Mp3,
3008            }],
3009        };
3010        let http = ScriptedHttp::new().route("n.mp3", Reply::ok(b"body".to_vec()));
3011        let fs = MemFs::new().with_file("n.flac", b"OLD-FLAC".to_vec());
3012        let mut manifest = Manifest::new();
3013        manifest.insert("n", entry("n.flac", AudioFormat::Flac));
3014
3015        let outcome = run(
3016            &plan,
3017            &mut manifest,
3018            &[d],
3019            &http,
3020            &fs,
3021            &StubFfmpeg::flac(),
3022            &RecordingClock::new(),
3023            &ExecOptions::default(),
3024        );
3025
3026        assert_eq!(outcome.reformatted, 1);
3027        assert!(fs.exists("n.mp3"));
3028        assert!(!fs.exists("n.flac"));
3029        let updated = manifest.get("n").unwrap();
3030        assert_eq!(updated.path, "n.mp3");
3031        assert_eq!(updated.format, AudioFormat::Mp3);
3032        assert_eq!(updated.meta_hash, "m");
3033    }
3034
3035    #[test]
3036    fn retag_rewrites_file_and_updates_hashes() {
3037        let c = clip("o");
3038        let mut d = desired(c.clone(), AudioFormat::Mp3);
3039        d.meta_hash = "new".to_owned();
3040        d.art_hash = "new-art".to_owned();
3041        let existing = tag_mp3(
3042            b"audio",
3043            &TrackMetadata::from_clip(&c, &LineageContext::own_root(&c)),
3044            None,
3045            None,
3046        )
3047        .unwrap();
3048        let fs = MemFs::new().with_file("o.mp3", existing.clone());
3049        let mut manifest = Manifest::new();
3050        let mut start = entry("o.mp3", AudioFormat::Mp3);
3051        start.size = existing.len() as u64;
3052        manifest.insert("o", start);
3053        let plan = Plan {
3054            actions: vec![Action::Retag {
3055                clip: c.clone(),
3056                lineage: LineageContext::own_root(&c),
3057                path: "o.mp3".to_owned(),
3058            }],
3059        };
3060
3061        let outcome = run(
3062            &plan,
3063            &mut manifest,
3064            &[d],
3065            &ScriptedHttp::new(),
3066            &fs,
3067            &StubFfmpeg::flac(),
3068            &RecordingClock::new(),
3069            &ExecOptions::default(),
3070        );
3071
3072        assert_eq!(outcome.retagged, 1);
3073        let updated = manifest.get("o").unwrap();
3074        assert_eq!(updated.meta_hash, "new");
3075        assert_eq!(updated.art_hash, "new-art");
3076        assert_eq!(&fs.read_file("o.mp3").unwrap()[..3], b"ID3");
3077    }
3078
3079    #[test]
3080    fn rename_moves_file_and_updates_manifest_path() {
3081        let c = clip("p");
3082        let mut d = desired(c.clone(), AudioFormat::Mp3);
3083        d.path = "new/p.mp3".to_owned();
3084        let fs = MemFs::new().with_file("old/p.mp3", b"DATA".to_vec());
3085        let mut manifest = Manifest::new();
3086        manifest.insert("p", entry("old/p.mp3", AudioFormat::Mp3));
3087        let plan = Plan {
3088            actions: vec![Action::Rename {
3089                from: "old/p.mp3".to_owned(),
3090                to: "new/p.mp3".to_owned(),
3091            }],
3092        };
3093
3094        let outcome = run(
3095            &plan,
3096            &mut manifest,
3097            &[d],
3098            &ScriptedHttp::new(),
3099            &fs,
3100            &StubFfmpeg::flac(),
3101            &RecordingClock::new(),
3102            &ExecOptions::default(),
3103        );
3104
3105        assert_eq!(outcome.renamed, 1);
3106        assert!(fs.exists("new/p.mp3"));
3107        assert!(!fs.exists("old/p.mp3"));
3108        assert_eq!(manifest.get("p").unwrap().path, "new/p.mp3");
3109    }
3110
3111    #[test]
3112    fn disk_full_rename_aborts_the_run() {
3113        // A move onto a full disk is systemic like a full-disk write: the run
3114        // aborts with DiskFull and the source file is left untouched.
3115        let c = clip("p");
3116        let mut d = desired(c.clone(), AudioFormat::Mp3);
3117        d.path = "new/p.mp3".to_owned();
3118        let fs = MemFs::new()
3119            .with_file("old/p.mp3", b"DATA".to_vec())
3120            .fail_rename_out_of_space("new/p.mp3");
3121        let mut manifest = Manifest::new();
3122        manifest.insert("p", entry("old/p.mp3", AudioFormat::Mp3));
3123        let plan = Plan {
3124            actions: vec![Action::Rename {
3125                from: "old/p.mp3".to_owned(),
3126                to: "new/p.mp3".to_owned(),
3127            }],
3128        };
3129
3130        let outcome = run(
3131            &plan,
3132            &mut manifest,
3133            &[d],
3134            &ScriptedHttp::new(),
3135            &fs,
3136            &StubFfmpeg::flac(),
3137            &RecordingClock::new(),
3138            &ExecOptions::default(),
3139        );
3140
3141        assert_eq!(outcome.status, RunStatus::DiskFull);
3142        assert_eq!(outcome.renamed, 0);
3143        assert_eq!(outcome.failed(), 1);
3144        assert!(outcome.failures[0].reason.contains("disk full"));
3145        // The source is untouched: the move never happened.
3146        assert!(fs.exists("old/p.mp3"));
3147        assert!(!fs.exists("new/p.mp3"));
3148        assert_eq!(manifest.get("p").unwrap().path, "old/p.mp3");
3149    }
3150
3151    #[test]
3152    fn delete_removes_file_and_manifest_entry() {
3153        let fs = MemFs::new().with_file("q.mp3", b"DATA".to_vec());
3154        let mut manifest = Manifest::new();
3155        manifest.insert("q", entry("q.mp3", AudioFormat::Mp3));
3156        let plan = Plan {
3157            actions: vec![Action::Delete {
3158                path: "q.mp3".to_owned(),
3159                clip_id: "q".to_owned(),
3160            }],
3161        };
3162
3163        let outcome = run(
3164            &plan,
3165            &mut manifest,
3166            &[],
3167            &ScriptedHttp::new(),
3168            &fs,
3169            &StubFfmpeg::flac(),
3170            &RecordingClock::new(),
3171            &ExecOptions::default(),
3172        );
3173
3174        assert_eq!(outcome.deleted, 1);
3175        assert!(!fs.exists("q.mp3"));
3176        assert!(manifest.get("q").is_none());
3177    }
3178
3179    #[test]
3180    fn failed_delete_keeps_the_manifest_entry() {
3181        let fs = MemFs::new()
3182            .with_file("s.mp3", b"DATA".to_vec())
3183            .fail_remove("s.mp3");
3184        let mut manifest = Manifest::new();
3185        manifest.insert("s", entry("s.mp3", AudioFormat::Mp3));
3186        let plan = Plan {
3187            actions: vec![Action::Delete {
3188                path: "s.mp3".to_owned(),
3189                clip_id: "s".to_owned(),
3190            }],
3191        };
3192
3193        let outcome = run(
3194            &plan,
3195            &mut manifest,
3196            &[],
3197            &ScriptedHttp::new(),
3198            &fs,
3199            &StubFfmpeg::flac(),
3200            &RecordingClock::new(),
3201            &ExecOptions::default(),
3202        );
3203
3204        assert_eq!(outcome.deleted, 0);
3205        assert_eq!(outcome.failed(), 1);
3206        assert!(manifest.get("s").is_some());
3207        assert!(fs.exists("s.mp3"));
3208    }
3209
3210    #[test]
3211    fn skip_is_a_noop() {
3212        let mut manifest = Manifest::new();
3213        let plan = Plan {
3214            actions: vec![Action::Skip {
3215                clip_id: "r".to_owned(),
3216            }],
3217        };
3218        let outcome = run(
3219            &plan,
3220            &mut manifest,
3221            &[],
3222            &ScriptedHttp::new(),
3223            &MemFs::new(),
3224            &StubFfmpeg::flac(),
3225            &RecordingClock::new(),
3226            &ExecOptions::default(),
3227        );
3228        assert_eq!(outcome.skipped, 1);
3229        assert_eq!(outcome.failed(), 0);
3230    }
3231
3232    // ── Pure helpers ────────────────────────────────────────────────
3233
3234    #[test]
3235    fn header_helpers_parse_or_ignore() {
3236        let resp = HttpResponse {
3237            status: 200,
3238            headers: vec![("Content-Length".to_owned(), "42".to_owned())],
3239            body: Vec::new(),
3240        };
3241        assert_eq!(content_length(&resp), Some(42));
3242
3243        let bare = HttpResponse {
3244            status: 200,
3245            headers: Vec::new(),
3246            body: Vec::new(),
3247        };
3248        assert_eq!(content_length(&bare), None);
3249    }
3250
3251    #[test]
3252    fn preserve_rule_covers_copy_and_private() {
3253        let base = desired(clip("x"), AudioFormat::Mp3);
3254        assert!(!preserve_for(&base));
3255        let mut copy_held = base.clone();
3256        copy_held.modes = vec![SourceMode::Copy];
3257        assert!(preserve_for(&copy_held));
3258        let mut private = base.clone();
3259        private.private = true;
3260        assert!(preserve_for(&private));
3261    }
3262
3263    fn fs_new() -> MemFs {
3264        MemFs::new()
3265    }
3266
3267    // ── Skip refreshes the preserve marker (SYNC-8 cross-run) ────────
3268
3269    #[test]
3270    fn skip_sets_preserve_when_a_clip_becomes_copy_held() {
3271        let c = clip("s1");
3272        let mut d = desired(c.clone(), AudioFormat::Mp3);
3273        d.modes = vec![SourceMode::Copy];
3274        let plan = Plan {
3275            actions: vec![Action::Skip {
3276                clip_id: "s1".to_owned(),
3277            }],
3278        };
3279        let mut manifest = Manifest::new();
3280        manifest.insert("s1".to_owned(), entry("s1.mp3", AudioFormat::Mp3));
3281        assert!(!manifest.get("s1").unwrap().preserve);
3282
3283        let outcome = run(
3284            &plan,
3285            &mut manifest,
3286            &[d],
3287            &ScriptedHttp::new(),
3288            &fs_new(),
3289            &StubFfmpeg::flac(),
3290            &RecordingClock::new(),
3291            &ExecOptions::default(),
3292        );
3293
3294        assert_eq!(outcome.skipped, 1);
3295        assert!(
3296            manifest.get("s1").unwrap().preserve,
3297            "a copy-held skip must mark the entry preserved"
3298        );
3299    }
3300
3301    #[test]
3302    fn skip_clears_stale_preserve_when_a_clip_returns_to_mirror_only() {
3303        let c = clip("s2");
3304        let d = desired(c.clone(), AudioFormat::Mp3);
3305        let plan = Plan {
3306            actions: vec![Action::Skip {
3307                clip_id: "s2".to_owned(),
3308            }],
3309        };
3310        let mut manifest = Manifest::new();
3311        let mut stale = entry("s2.mp3", AudioFormat::Mp3);
3312        stale.preserve = true;
3313        manifest.insert("s2".to_owned(), stale);
3314
3315        run(
3316            &plan,
3317            &mut manifest,
3318            &[d],
3319            &ScriptedHttp::new(),
3320            &fs_new(),
3321            &StubFfmpeg::flac(),
3322            &RecordingClock::new(),
3323            &ExecOptions::default(),
3324        );
3325
3326        assert!(
3327            !manifest.get("s2").unwrap().preserve,
3328            "a mirror-only skip must clear a stale preserve marker"
3329        );
3330    }
3331
3332    #[test]
3333    fn flac_render_retries_a_rate_limited_wav_lookup() {
3334        let c = clip("rl");
3335        let d = desired(c.clone(), AudioFormat::Flac);
3336        let plan = Plan {
3337            actions: vec![Action::Download {
3338                clip: c.clone(),
3339                lineage: LineageContext::own_root(&c),
3340                path: d.path.clone(),
3341                format: AudioFormat::Flac,
3342            }],
3343        };
3344        let http = ScriptedHttp::new()
3345            .with_auth()
3346            .route_seq(
3347                "/wav_file/",
3348                vec![
3349                    Reply::status(429),
3350                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/rl.wav"}"#),
3351                ],
3352            )
3353            .route("rl.wav", Reply::ok(b"wav".to_vec()));
3354        let clock = RecordingClock::new();
3355        let mut manifest = Manifest::new();
3356
3357        let outcome = run(
3358            &plan,
3359            &mut manifest,
3360            &[d],
3361            &http,
3362            &fs_new(),
3363            &StubFfmpeg::flac(),
3364            &clock,
3365            &small_poll(),
3366        );
3367
3368        assert_eq!(outcome.downloaded, 1);
3369        assert_eq!(outcome.failed(), 0);
3370        // The render was ready on retry, so no fresh convert_wav was needed.
3371        assert_eq!(http.count("/convert_wav/"), 0);
3372        // One transient backoff (1s base), not the 5s poll interval.
3373        assert_eq!(clock.sleeps(), vec![Duration::from_secs(1)]);
3374    }
3375
3376    // ── Phase 6: artifact actions ───────────────────────────────────
3377
3378    #[test]
3379    fn write_artifact_fetches_writes_and_updates_manifest() {
3380        // The owning entry exists (its audio was kept this run); WriteArtifact
3381        // fetches the source, writes the sidecar, and records it on the entry.
3382        let mut manifest = Manifest::new();
3383        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
3384        let plan = Plan {
3385            actions: vec![Action::WriteArtifact {
3386                kind: ArtifactKind::CoverJpg,
3387                path: "a/cover.jpg".to_owned(),
3388                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
3389                hash: "h1".to_owned(),
3390                owner_id: "a".to_owned(),
3391                content: None,
3392            }],
3393        };
3394        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"jpg-bytes".to_vec()));
3395        let fs = MemFs::new();
3396
3397        let outcome = run(
3398            &plan,
3399            &mut manifest,
3400            &[],
3401            &http,
3402            &fs,
3403            &StubFfmpeg::flac(),
3404            &RecordingClock::new(),
3405            &ExecOptions::default(),
3406        );
3407
3408        assert_eq!(outcome.artifacts_written, 1);
3409        assert_eq!(outcome.failed(), 0);
3410        assert_eq!(outcome.status, RunStatus::Completed);
3411        assert_eq!(fs.read_file("a/cover.jpg").unwrap(), b"jpg-bytes");
3412        assert_eq!(
3413            manifest.get("a").unwrap().cover_jpg,
3414            Some(ArtifactState {
3415                path: "a/cover.jpg".to_owned(),
3416                hash: "h1".to_owned(),
3417            })
3418        );
3419    }
3420
3421    #[test]
3422    fn write_text_sidecar_records_slot_with_no_network_fetch() {
3423        // A generated text sidecar carries its body inline, so it is written
3424        // verbatim with NO HTTP fetch and the details slot records its state.
3425        let mut manifest = Manifest::new();
3426        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
3427        let plan = Plan {
3428            actions: vec![Action::WriteArtifact {
3429                kind: ArtifactKind::DetailsTxt,
3430                path: "a.details.txt".to_owned(),
3431                source_url: String::new(),
3432                hash: "dh".to_owned(),
3433                owner_id: "a".to_owned(),
3434                content: Some("Title: A\n".to_owned()),
3435            }],
3436        };
3437        // An empty HTTP script: any fetch would fail, proving none happens.
3438        let http = ScriptedHttp::new();
3439        let fs = MemFs::new();
3440
3441        let outcome = run(
3442            &plan,
3443            &mut manifest,
3444            &[],
3445            &http,
3446            &fs,
3447            &StubFfmpeg::flac(),
3448            &RecordingClock::new(),
3449            &ExecOptions::default(),
3450        );
3451
3452        assert_eq!(outcome.artifacts_written, 1);
3453        assert_eq!(outcome.failed(), 0);
3454        assert_eq!(fs.read_file("a.details.txt").unwrap(), b"Title: A\n");
3455        assert_eq!(
3456            manifest.get("a").unwrap().details_txt,
3457            Some(ArtifactState {
3458                path: "a.details.txt".to_owned(),
3459                hash: "dh".to_owned(),
3460            })
3461        );
3462    }
3463
3464    #[test]
3465    fn write_lyrics_sidecar_relocation_removes_old_file() {
3466        // The audio moved, so the lyrics sidecar is re-emitted at the new path;
3467        // the executor writes the new file and prunes the stale one.
3468        let mut manifest = Manifest::new();
3469        let mut e = entry("old/a.flac", AudioFormat::Flac);
3470        e.lyrics_txt = Some(ArtifactState {
3471            path: "old/a.lyrics.txt".to_owned(),
3472            hash: "lh".to_owned(),
3473        });
3474        manifest.insert("a", e);
3475        let fs = MemFs::new()
3476            .with_file("old/a.flac", b"AUDIO".to_vec())
3477            .with_file("old/a.lyrics.txt", b"old words\n".to_vec());
3478        let plan = Plan {
3479            actions: vec![Action::WriteArtifact {
3480                kind: ArtifactKind::LyricsTxt,
3481                path: "new/a.lyrics.txt".to_owned(),
3482                source_url: String::new(),
3483                hash: "lh".to_owned(),
3484                owner_id: "a".to_owned(),
3485                content: Some("new words\n".to_owned()),
3486            }],
3487        };
3488
3489        let outcome = run(
3490            &plan,
3491            &mut manifest,
3492            &[],
3493            &ScriptedHttp::new(),
3494            &fs,
3495            &StubFfmpeg::flac(),
3496            &RecordingClock::new(),
3497            &ExecOptions::default(),
3498        );
3499
3500        assert_eq!(outcome.failed(), 0);
3501        assert_eq!(fs.read_file("new/a.lyrics.txt").unwrap(), b"new words\n");
3502        assert!(!fs.exists("old/a.lyrics.txt"));
3503        assert_eq!(
3504            manifest.get("a").unwrap().lyrics_txt.as_ref().unwrap().path,
3505            "new/a.lyrics.txt"
3506        );
3507    }
3508
3509    #[test]
3510    fn sidecar_path_swap_never_deletes_a_file_written_this_run() {
3511        // Two clips swap sidecar paths in one run (A: x -> y while B: y -> x).
3512        // Each write's inline old-path cleanup must skip a path another action
3513        // writes this run, or the second write would delete the first's freshly
3514        // written file (issue #76). The guard is kind-agnostic; lyrics stands in
3515        // for every sidecar, including the .mp4 video.
3516        let mut manifest = Manifest::new();
3517        let mut a = entry("a.flac", AudioFormat::Flac);
3518        a.lyrics_txt = Some(ArtifactState {
3519            path: "x.lyrics.txt".to_owned(),
3520            hash: "ah".to_owned(),
3521        });
3522        manifest.insert("a", a);
3523        let mut b = entry("b.flac", AudioFormat::Flac);
3524        b.lyrics_txt = Some(ArtifactState {
3525            path: "y.lyrics.txt".to_owned(),
3526            hash: "bh".to_owned(),
3527        });
3528        manifest.insert("b", b);
3529        let fs = MemFs::new()
3530            .with_file("a.flac", b"A".to_vec())
3531            .with_file("b.flac", b"B".to_vec())
3532            .with_file("x.lyrics.txt", b"A words\n".to_vec())
3533            .with_file("y.lyrics.txt", b"B words\n".to_vec());
3534        // A moves its sidecar x -> y; B moves its sidecar y -> x (the swap).
3535        let plan = Plan {
3536            actions: vec![
3537                Action::WriteArtifact {
3538                    kind: ArtifactKind::LyricsTxt,
3539                    path: "y.lyrics.txt".to_owned(),
3540                    source_url: String::new(),
3541                    hash: "ah".to_owned(),
3542                    owner_id: "a".to_owned(),
3543                    content: Some("A words\n".to_owned()),
3544                },
3545                Action::WriteArtifact {
3546                    kind: ArtifactKind::LyricsTxt,
3547                    path: "x.lyrics.txt".to_owned(),
3548                    source_url: String::new(),
3549                    hash: "bh".to_owned(),
3550                    owner_id: "b".to_owned(),
3551                    content: Some("B words\n".to_owned()),
3552                },
3553            ],
3554        };
3555
3556        let outcome = run(
3557            &plan,
3558            &mut manifest,
3559            &[],
3560            &ScriptedHttp::new(),
3561            &fs,
3562            &StubFfmpeg::flac(),
3563            &RecordingClock::new(),
3564            &ExecOptions::default(),
3565        );
3566
3567        assert_eq!(outcome.failed(), 0);
3568        // Both freshly written files survive; neither cleanup clobbered the other.
3569        assert_eq!(fs.read_file("y.lyrics.txt").unwrap(), b"A words\n");
3570        assert_eq!(fs.read_file("x.lyrics.txt").unwrap(), b"B words\n");
3571        assert_eq!(
3572            manifest.get("a").unwrap().lyrics_txt.as_ref().unwrap().path,
3573            "y.lyrics.txt"
3574        );
3575        assert_eq!(
3576            manifest.get("b").unwrap().lyrics_txt.as_ref().unwrap().path,
3577            "x.lyrics.txt"
3578        );
3579    }
3580
3581    #[test]
3582    fn old_sidecar_kept_when_another_clip_still_references_it() {
3583        // A prior failed swap can leave two clips pointing at one path (A -> y and
3584        // B -> y). When B now moves y -> x, its cleanup must not delete y, which is
3585        // still A's live file (#76). tracked_paths counts two references to y, so
3586        // the removal is skipped even though y is not a write target this run.
3587        let mut manifest = Manifest::new();
3588        let mut a = entry("a.flac", AudioFormat::Flac);
3589        a.lyrics_txt = Some(ArtifactState {
3590            path: "y.lyrics.txt".to_owned(),
3591            hash: "ah".to_owned(),
3592        });
3593        manifest.insert("a", a);
3594        let mut b = entry("b.flac", AudioFormat::Flac);
3595        b.lyrics_txt = Some(ArtifactState {
3596            path: "y.lyrics.txt".to_owned(),
3597            hash: "bh".to_owned(),
3598        });
3599        manifest.insert("b", b);
3600        let fs = MemFs::new()
3601            .with_file("a.flac", b"A".to_vec())
3602            .with_file("b.flac", b"B".to_vec())
3603            .with_file("y.lyrics.txt", b"A words\n".to_vec());
3604        // Only B moves this run: y -> x. A is stable, so y is not a write target;
3605        // the tracked-reference count is what protects A's file.
3606        let plan = Plan {
3607            actions: vec![Action::WriteArtifact {
3608                kind: ArtifactKind::LyricsTxt,
3609                path: "x.lyrics.txt".to_owned(),
3610                source_url: String::new(),
3611                hash: "bh".to_owned(),
3612                owner_id: "b".to_owned(),
3613                content: Some("B words\n".to_owned()),
3614            }],
3615        };
3616
3617        let outcome = run(
3618            &plan,
3619            &mut manifest,
3620            &[],
3621            &ScriptedHttp::new(),
3622            &fs,
3623            &StubFfmpeg::flac(),
3624            &RecordingClock::new(),
3625            &ExecOptions::default(),
3626        );
3627
3628        assert_eq!(outcome.failed(), 0);
3629        assert!(
3630            fs.exists("y.lyrics.txt"),
3631            "A's live sidecar must not be deleted"
3632        );
3633        assert_eq!(fs.read_file("x.lyrics.txt").unwrap(), b"B words\n");
3634    }
3635
3636    #[test]
3637    fn shared_old_path_is_reclaimed_when_every_referencing_clip_moves_away() {
3638        // Two clips share one path (A -> s and B -> s, from a prior failed swap).
3639        // When BOTH move away this run, the path is no longer live, so the last
3640        // mover must reclaim it: it is neither kept as an orphan nor deleted while
3641        // still referenced. The dynamic reference count drops to zero only after
3642        // both moves, so exactly the final cleanup removes it (#76).
3643        let mut manifest = Manifest::new();
3644        let mut a = entry("a.flac", AudioFormat::Flac);
3645        a.lyrics_txt = Some(ArtifactState {
3646            path: "s.lyrics.txt".to_owned(),
3647            hash: "ah".to_owned(),
3648        });
3649        manifest.insert("a", a);
3650        let mut b = entry("b.flac", AudioFormat::Flac);
3651        b.lyrics_txt = Some(ArtifactState {
3652            path: "s.lyrics.txt".to_owned(),
3653            hash: "bh".to_owned(),
3654        });
3655        manifest.insert("b", b);
3656        let fs = MemFs::new()
3657            .with_file("a.flac", b"A".to_vec())
3658            .with_file("b.flac", b"B".to_vec())
3659            .with_file("s.lyrics.txt", b"shared\n".to_vec());
3660        let plan = Plan {
3661            actions: vec![
3662                Action::WriteArtifact {
3663                    kind: ArtifactKind::LyricsTxt,
3664                    path: "pa.lyrics.txt".to_owned(),
3665                    source_url: String::new(),
3666                    hash: "ah".to_owned(),
3667                    owner_id: "a".to_owned(),
3668                    content: Some("A words\n".to_owned()),
3669                },
3670                Action::WriteArtifact {
3671                    kind: ArtifactKind::LyricsTxt,
3672                    path: "pb.lyrics.txt".to_owned(),
3673                    source_url: String::new(),
3674                    hash: "bh".to_owned(),
3675                    owner_id: "b".to_owned(),
3676                    content: Some("B words\n".to_owned()),
3677                },
3678            ],
3679        };
3680
3681        let outcome = run(
3682            &plan,
3683            &mut manifest,
3684            &[],
3685            &ScriptedHttp::new(),
3686            &fs,
3687            &StubFfmpeg::flac(),
3688            &RecordingClock::new(),
3689            &ExecOptions::default(),
3690        );
3691
3692        assert_eq!(outcome.failed(), 0);
3693        assert_eq!(fs.read_file("pa.lyrics.txt").unwrap(), b"A words\n");
3694        assert_eq!(fs.read_file("pb.lyrics.txt").unwrap(), b"B words\n");
3695        assert!(
3696            !fs.exists("s.lyrics.txt"),
3697            "the vacated shared path must be reclaimed, not orphaned"
3698        );
3699    }
3700
3701    #[test]
3702    fn write_text_sidecar_skipped_when_owner_audio_absent() {
3703        // A text sidecar for a clip with no manifest entry (its audio download
3704        // failed) must be skipped, never writing an untracked file.
3705        let plan = Plan {
3706            actions: vec![Action::WriteArtifact {
3707                kind: ArtifactKind::DetailsTxt,
3708                path: "gone.details.txt".to_owned(),
3709                source_url: String::new(),
3710                hash: "dh".to_owned(),
3711                owner_id: "gone".to_owned(),
3712                content: Some("Title: Gone\n".to_owned()),
3713            }],
3714        };
3715        let fs = MemFs::new();
3716        let mut manifest = Manifest::new();
3717
3718        let outcome = run(
3719            &plan,
3720            &mut manifest,
3721            &[],
3722            &ScriptedHttp::new(),
3723            &fs,
3724            &StubFfmpeg::flac(),
3725            &RecordingClock::new(),
3726            &ExecOptions::default(),
3727        );
3728
3729        assert_eq!(outcome.artifacts_written, 0);
3730        assert_eq!(outcome.skipped, 1);
3731        assert!(!fs.exists("gone.details.txt"));
3732        assert!(manifest.get("gone").is_none());
3733    }
3734
3735    #[test]
3736    fn delete_artifact_removes_file_and_clears_slot() {
3737        let fs = MemFs::new().with_file("a/cover.jpg", b"jpg".to_vec());
3738        let mut manifest = Manifest::new();
3739        let mut e = entry("a.mp3", AudioFormat::Mp3);
3740        e.cover_jpg = Some(ArtifactState {
3741            path: "a/cover.jpg".to_owned(),
3742            hash: "h1".to_owned(),
3743        });
3744        manifest.insert("a", e);
3745        let plan = Plan {
3746            actions: vec![Action::DeleteArtifact {
3747                kind: ArtifactKind::CoverJpg,
3748                path: "a/cover.jpg".to_owned(),
3749                owner_id: "a".to_owned(),
3750            }],
3751        };
3752
3753        let outcome = run(
3754            &plan,
3755            &mut manifest,
3756            &[],
3757            &ScriptedHttp::new(),
3758            &fs,
3759            &StubFfmpeg::flac(),
3760            &RecordingClock::new(),
3761            &ExecOptions::default(),
3762        );
3763
3764        assert_eq!(outcome.artifacts_deleted, 1);
3765        assert!(!fs.exists("a/cover.jpg"));
3766        assert_eq!(manifest.get("a").unwrap().cover_jpg, None);
3767    }
3768
3769    #[test]
3770    fn delete_artifact_tolerates_already_absent_file() {
3771        // `remove` is idempotent, so co-deleting a sidecar that is already gone
3772        // is not a failure.
3773        let mut manifest = Manifest::new();
3774        let mut e = entry("a.mp3", AudioFormat::Mp3);
3775        e.cover_jpg = Some(ArtifactState {
3776            path: "a/cover.jpg".to_owned(),
3777            hash: "h1".to_owned(),
3778        });
3779        manifest.insert("a", e);
3780        let plan = Plan {
3781            actions: vec![Action::DeleteArtifact {
3782                kind: ArtifactKind::CoverJpg,
3783                path: "a/cover.jpg".to_owned(),
3784                owner_id: "a".to_owned(),
3785            }],
3786        };
3787
3788        let outcome = run(
3789            &plan,
3790            &mut manifest,
3791            &[],
3792            &ScriptedHttp::new(),
3793            &MemFs::new(),
3794            &StubFfmpeg::flac(),
3795            &RecordingClock::new(),
3796            &ExecOptions::default(),
3797        );
3798
3799        assert_eq!(outcome.artifacts_deleted, 1);
3800        assert_eq!(outcome.failed(), 0);
3801        assert_eq!(manifest.get("a").unwrap().cover_jpg, None);
3802    }
3803
3804    #[test]
3805    fn write_artifact_http_failure_is_a_per_clip_failure_not_a_run_abort() {
3806        // A permanent 404 on one sidecar fetch is recorded as a per-clip failure;
3807        // the run continues and the following WriteArtifact still succeeds.
3808        let mut manifest = Manifest::new();
3809        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
3810        manifest.insert("b", entry("b.mp3", AudioFormat::Mp3));
3811        let plan = Plan {
3812            actions: vec![
3813                Action::WriteArtifact {
3814                    kind: ArtifactKind::CoverJpg,
3815                    path: "a/cover.jpg".to_owned(),
3816                    source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
3817                    hash: "h1".to_owned(),
3818                    owner_id: "a".to_owned(),
3819                    content: None,
3820                },
3821                Action::WriteArtifact {
3822                    kind: ArtifactKind::CoverJpg,
3823                    path: "b/cover.jpg".to_owned(),
3824                    source_url: "https://art.suno.ai/b/large.jpg".to_owned(),
3825                    hash: "h2".to_owned(),
3826                    owner_id: "b".to_owned(),
3827                    content: None,
3828                },
3829            ],
3830        };
3831        let http = ScriptedHttp::new()
3832            .route("a/large.jpg", Reply::status(404))
3833            .route("b/large.jpg", Reply::ok(b"jpg-b".to_vec()));
3834        let fs = MemFs::new();
3835
3836        let outcome = run(
3837            &plan,
3838            &mut manifest,
3839            &[],
3840            &http,
3841            &fs,
3842            &StubFfmpeg::flac(),
3843            &RecordingClock::new(),
3844            &ExecOptions::default(),
3845        );
3846
3847        assert_eq!(outcome.status, RunStatus::Completed);
3848        assert_eq!(outcome.failed(), 1);
3849        assert_eq!(outcome.failures[0].clip_id, "a");
3850        assert_eq!(outcome.artifacts_written, 1);
3851        // The failed sidecar left no file and no manifest record.
3852        assert!(!fs.exists("a/cover.jpg"));
3853        assert_eq!(manifest.get("a").unwrap().cover_jpg, None);
3854        // The following sidecar was written and recorded.
3855        assert_eq!(fs.read_file("b/cover.jpg").unwrap(), b"jpg-b");
3856        assert!(manifest.get("b").unwrap().cover_jpg.is_some());
3857    }
3858
3859    #[test]
3860    fn stranded_old_sidecar_removed_when_colliding_writer_fails() {
3861        // #142: clip A moves its cover shared -> a/cover.jpg (fetch succeeds);
3862        // clip B is planned to write the vacated `shared` path but its fetch
3863        // fails. The old-path cleanup is gated on COMMITTED writes, not planned
3864        // ones, so B's failed write no longer protects the stale file: A's old
3865        // `shared` copy is removed rather than left as an untracked orphan.
3866        let mut manifest = Manifest::new();
3867        let mut a = entry("a.mp3", AudioFormat::Mp3);
3868        a.cover_jpg = Some(ArtifactState {
3869            path: "shared/cover.jpg".to_owned(),
3870            hash: "ha".to_owned(),
3871        });
3872        manifest.insert("a", a);
3873        manifest.insert("b", entry("b.mp3", AudioFormat::Mp3));
3874        let fs = MemFs::new().with_file("shared/cover.jpg", b"old-shared".to_vec());
3875        let plan = Plan {
3876            actions: vec![
3877                Action::WriteArtifact {
3878                    kind: ArtifactKind::CoverJpg,
3879                    path: "a/cover.jpg".to_owned(),
3880                    source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
3881                    hash: "ha".to_owned(),
3882                    owner_id: "a".to_owned(),
3883                    content: None,
3884                },
3885                Action::WriteArtifact {
3886                    kind: ArtifactKind::CoverJpg,
3887                    path: "shared/cover.jpg".to_owned(),
3888                    source_url: "https://art.suno.ai/b/large.jpg".to_owned(),
3889                    hash: "hb".to_owned(),
3890                    owner_id: "b".to_owned(),
3891                    content: None,
3892                },
3893            ],
3894        };
3895        let http = ScriptedHttp::new()
3896            .route("a/large.jpg", Reply::ok(b"jpg-a".to_vec()))
3897            .route("b/large.jpg", Reply::status(404));
3898
3899        let outcome = run(
3900            &plan,
3901            &mut manifest,
3902            &[],
3903            &http,
3904            &fs,
3905            &StubFfmpeg::flac(),
3906            &RecordingClock::new(),
3907            &ExecOptions::default(),
3908        );
3909
3910        assert_eq!(outcome.failed(), 1);
3911        assert_eq!(outcome.failures[0].clip_id, "b");
3912        // A's move committed; the vacated file is gone, not an orphan.
3913        assert_eq!(fs.read_file("a/cover.jpg").unwrap(), b"jpg-a");
3914        assert!(
3915            !fs.exists("shared/cover.jpg"),
3916            "the vacated file must be removed once the colliding writer failed"
3917        );
3918        assert_eq!(
3919            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
3920            "a/cover.jpg"
3921        );
3922    }
3923
3924    #[test]
3925    fn committed_write_at_old_path_is_preserved() {
3926        // #142: clip B writes `shared` and commits BEFORE clip A vacates it
3927        // (A moves shared -> a/cover.jpg). A's cleanup sees `shared` in the
3928        // committed set and keeps B's freshly written file rather than deleting
3929        // it. This is the successful-collision case the guard must still protect.
3930        let mut manifest = Manifest::new();
3931        let mut a = entry("a.mp3", AudioFormat::Mp3);
3932        a.cover_jpg = Some(ArtifactState {
3933            path: "shared/cover.jpg".to_owned(),
3934            hash: "ha".to_owned(),
3935        });
3936        manifest.insert("a", a);
3937        manifest.insert("b", entry("b.mp3", AudioFormat::Mp3));
3938        let fs = MemFs::new().with_file("shared/cover.jpg", b"old-shared".to_vec());
3939        let plan = Plan {
3940            actions: vec![
3941                Action::WriteArtifact {
3942                    kind: ArtifactKind::CoverJpg,
3943                    path: "shared/cover.jpg".to_owned(),
3944                    source_url: "https://art.suno.ai/b/large.jpg".to_owned(),
3945                    hash: "hb".to_owned(),
3946                    owner_id: "b".to_owned(),
3947                    content: None,
3948                },
3949                Action::WriteArtifact {
3950                    kind: ArtifactKind::CoverJpg,
3951                    path: "a/cover.jpg".to_owned(),
3952                    source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
3953                    hash: "ha".to_owned(),
3954                    owner_id: "a".to_owned(),
3955                    content: None,
3956                },
3957            ],
3958        };
3959        let http = ScriptedHttp::new()
3960            .route("b/large.jpg", Reply::ok(b"jpg-b".to_vec()))
3961            .route("a/large.jpg", Reply::ok(b"jpg-a".to_vec()));
3962
3963        let outcome = run(
3964            &plan,
3965            &mut manifest,
3966            &[],
3967            &http,
3968            &fs,
3969            &StubFfmpeg::flac(),
3970            &RecordingClock::new(),
3971            &ExecOptions::default(),
3972        );
3973
3974        assert_eq!(outcome.failed(), 0);
3975        // B's committed write survives A's subsequent move; both files are present.
3976        assert_eq!(fs.read_file("shared/cover.jpg").unwrap(), b"jpg-b");
3977        assert_eq!(fs.read_file("a/cover.jpg").unwrap(), b"jpg-a");
3978        assert_eq!(
3979            manifest.get("b").unwrap().cover_jpg.as_ref().unwrap().path,
3980            "shared/cover.jpg"
3981        );
3982        assert_eq!(
3983            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
3984            "a/cover.jpg"
3985        );
3986    }
3987
3988    #[test]
3989    fn co_delete_executes_audio_delete_then_artifact_delete() {
3990        // The plan orders the audio Delete before its sidecar DeleteArtifact.
3991        // The audio delete removes the manifest entry; the sidecar delete then
3992        // removes the file and tolerates the now-absent entry.
3993        let fs = MemFs::new()
3994            .with_file("gone.mp3", b"DATA".to_vec())
3995            .with_file("gone/cover.jpg", b"jpg".to_vec());
3996        let mut manifest = Manifest::new();
3997        let mut e = entry("gone.mp3", AudioFormat::Mp3);
3998        e.cover_jpg = Some(ArtifactState {
3999            path: "gone/cover.jpg".to_owned(),
4000            hash: "h1".to_owned(),
4001        });
4002        manifest.insert("gone", e);
4003        let plan = Plan {
4004            actions: vec![
4005                Action::Delete {
4006                    path: "gone.mp3".to_owned(),
4007                    clip_id: "gone".to_owned(),
4008                },
4009                Action::DeleteArtifact {
4010                    kind: ArtifactKind::CoverJpg,
4011                    path: "gone/cover.jpg".to_owned(),
4012                    owner_id: "gone".to_owned(),
4013                },
4014            ],
4015        };
4016
4017        let outcome = run(
4018            &plan,
4019            &mut manifest,
4020            &[],
4021            &ScriptedHttp::new(),
4022            &fs,
4023            &StubFfmpeg::flac(),
4024            &RecordingClock::new(),
4025            &ExecOptions::default(),
4026        );
4027
4028        assert_eq!(outcome.deleted, 1);
4029        assert_eq!(outcome.artifacts_deleted, 1);
4030        assert_eq!(outcome.failed(), 0);
4031        assert!(!fs.exists("gone.mp3"));
4032        assert!(!fs.exists("gone/cover.jpg"));
4033        assert!(manifest.get("gone").is_none());
4034    }
4035
4036    #[test]
4037    fn write_stem_mp3_stores_raw_and_records_slot() {
4038        // An MP3 stem is downloaded straight from its CDN url and stored verbatim
4039        // (no transcode, no WAV render): the bytes land at the `.mp3` path and the
4040        // keyed slot records the path and hash.
4041        let mut manifest = Manifest::new();
4042        manifest.insert("a", entry("a.flac", AudioFormat::Flac));
4043        let plan = Plan {
4044            actions: vec![Action::WriteStem {
4045                clip_id: "a".to_owned(),
4046                key: "voc".to_owned(),
4047                stem_id: "voc".to_owned(),
4048                path: "a.stems/a - Vocals [voc].mp3".to_owned(),
4049                source_url: "https://cdn1.suno.ai/voc.mp3".to_owned(),
4050                format: StemFormat::Mp3,
4051                hash: "vh".to_owned(),
4052            }],
4053        };
4054        let http = ScriptedHttp::new().route("voc.mp3", Reply::ok(b"stem-bytes".to_vec()));
4055        let fs = MemFs::new();
4056
4057        let outcome = run(
4058            &plan,
4059            &mut manifest,
4060            &[],
4061            &http,
4062            &fs,
4063            &StubFfmpeg::flac(),
4064            &RecordingClock::new(),
4065            &ExecOptions::default(),
4066        );
4067
4068        assert_eq!(outcome.artifacts_written, 1);
4069        assert_eq!(outcome.failed(), 0);
4070        // Bytes are stored exactly as delivered (no transcode applied).
4071        assert_eq!(
4072            fs.read_file("a.stems/a - Vocals [voc].mp3").unwrap(),
4073            b"stem-bytes"
4074        );
4075        // An MP3 stem never renders WAV: no convert_wav, no generation.
4076        assert_eq!(http.count("convert_wav"), 0);
4077        assert_eq!(http.count("/api/gen/"), 0);
4078        assert_eq!(
4079            manifest.get("a").unwrap().stems.get("voc"),
4080            Some(&ArtifactState {
4081                path: "a.stems/a - Vocals [voc].mp3".to_owned(),
4082                hash: "vh".to_owned(),
4083            })
4084        );
4085    }
4086
4087    #[test]
4088    fn write_stem_wav_renders_via_convert_wav_and_stores_raw() {
4089        // A WAV stem (the default) renders the stem clip's lossless WAV through the
4090        // free convert_wav flow keyed on the stem id, then downloads and stores it
4091        // RAW as `.wav` — it is NEVER transcoded to FLAC, even for a FLAC song.
4092        let mut manifest = Manifest::new();
4093        manifest.insert("a", entry("a.flac", AudioFormat::Flac));
4094        let plan = Plan {
4095            actions: vec![Action::WriteStem {
4096                clip_id: "a".to_owned(),
4097                key: "voc".to_owned(),
4098                stem_id: "stemvoc".to_owned(),
4099                path: "a.stems/a - Vocals [stemvoc].wav".to_owned(),
4100                source_url: "https://cdn1.suno.ai/stemvoc.mp3".to_owned(),
4101                format: StemFormat::Wav,
4102                hash: "vh".to_owned(),
4103            }],
4104        };
4105        // wav_file is not ready on the first poll, so the flow POSTs convert_wav
4106        // (free) and polls again — exactly the main FLAC/WAV render path.
4107        let http = ScriptedHttp::new()
4108            .with_auth()
4109            .route_seq(
4110                "stemvoc/wav_file/",
4111                vec![
4112                    Reply::json("{}"),
4113                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/stemvoc.wav"}"#),
4114                ],
4115            )
4116            .route("stemvoc/convert_wav/", Reply::status(200))
4117            .route("stemvoc.wav", Reply::ok(b"RIFFwav-bytes".to_vec()));
4118        let fs = MemFs::new();
4119
4120        let outcome = run(
4121            &plan,
4122            &mut manifest,
4123            &[],
4124            &http,
4125            &fs,
4126            &StubFfmpeg::flac(),
4127            &RecordingClock::new(),
4128            &small_poll(),
4129        );
4130
4131        assert_eq!(outcome.artifacts_written, 1);
4132        assert_eq!(outcome.failed(), 0);
4133        // The rendered WAV is stored verbatim; ffmpeg (WAV->FLAC) is never invoked,
4134        // so the stored bytes are the raw WAV, not a FLAC transcode.
4135        assert_eq!(
4136            fs.read_file("a.stems/a - Vocals [stemvoc].wav").unwrap(),
4137            b"RIFFwav-bytes"
4138        );
4139        assert!(!fs.exists("a.stems/a - Vocals [stemvoc].flac"));
4140        // The free WAV render ran; no credit-spending generation endpoint did.
4141        assert_eq!(http.count("convert_wav"), 1);
4142        assert_eq!(http.count("stem_task"), 0);
4143        assert_eq!(http.count("separate"), 0);
4144        assert_eq!(
4145            manifest.get("a").unwrap().stems.get("voc").unwrap().path,
4146            "a.stems/a - Vocals [stemvoc].wav"
4147        );
4148    }
4149
4150    #[test]
4151    fn write_stem_is_skipped_when_owner_audio_is_absent() {
4152        // No owning manifest entry (audio failed or never existed) => skip with
4153        // no fetch and no write, so a stem is never stranded without its song.
4154        let mut manifest = Manifest::new();
4155        let plan = Plan {
4156            actions: vec![Action::WriteStem {
4157                clip_id: "ghost".to_owned(),
4158                key: "voc".to_owned(),
4159                stem_id: "voc".to_owned(),
4160                path: "ghost.stems/voc.mp3".to_owned(),
4161                source_url: "https://cdn1.suno.ai/voc.mp3".to_owned(),
4162                format: StemFormat::Mp3,
4163                hash: "vh".to_owned(),
4164            }],
4165        };
4166        // Empty HTTP script: any fetch would error, proving none happens.
4167        let http = ScriptedHttp::new();
4168        let fs = MemFs::new();
4169
4170        let outcome = run(
4171            &plan,
4172            &mut manifest,
4173            &[],
4174            &http,
4175            &fs,
4176            &StubFfmpeg::flac(),
4177            &RecordingClock::new(),
4178            &ExecOptions::default(),
4179        );
4180
4181        assert_eq!(outcome.skipped, 1);
4182        assert_eq!(outcome.artifacts_written, 0);
4183        assert_eq!(outcome.failed(), 0);
4184        assert!(!fs.exists("ghost.stems/voc.mp3"));
4185    }
4186
4187    #[test]
4188    fn write_stem_relocates_the_old_file_on_a_path_move() {
4189        // The song was renamed, so the stem moves: the new file is written and the
4190        // stale copy at the previously tracked path is removed (moved, not orphaned).
4191        let fs = MemFs::new().with_file("old.stems/voc.mp3", b"old".to_vec());
4192        let mut manifest = Manifest::new();
4193        let mut e = entry("new.flac", AudioFormat::Flac);
4194        e.stems.insert(
4195            "voc".to_owned(),
4196            ArtifactState {
4197                path: "old.stems/voc.mp3".to_owned(),
4198                hash: "vh".to_owned(),
4199            },
4200        );
4201        manifest.insert("a", e);
4202        let plan = Plan {
4203            actions: vec![Action::WriteStem {
4204                clip_id: "a".to_owned(),
4205                key: "voc".to_owned(),
4206                stem_id: "voc".to_owned(),
4207                path: "new.stems/voc.mp3".to_owned(),
4208                source_url: "https://cdn1.suno.ai/voc.mp3".to_owned(),
4209                format: StemFormat::Mp3,
4210                hash: "vh".to_owned(),
4211            }],
4212        };
4213        let http = ScriptedHttp::new().route("voc.mp3", Reply::ok(b"new".to_vec()));
4214
4215        let outcome = run(
4216            &plan,
4217            &mut manifest,
4218            &[],
4219            &http,
4220            &fs,
4221            &StubFfmpeg::flac(),
4222            &RecordingClock::new(),
4223            &ExecOptions::default(),
4224        );
4225
4226        assert_eq!(outcome.artifacts_written, 1);
4227        assert!(fs.exists("new.stems/voc.mp3"));
4228        assert!(
4229            !fs.exists("old.stems/voc.mp3"),
4230            "the old stem is moved, not left behind"
4231        );
4232        assert_eq!(
4233            manifest.get("a").unwrap().stems.get("voc").unwrap().path,
4234            "new.stems/voc.mp3"
4235        );
4236    }
4237
4238    #[test]
4239    fn delete_stem_removes_file_and_clears_slot() {
4240        let fs = MemFs::new().with_file("a.stems/voc.mp3", b"stem".to_vec());
4241        let mut manifest = Manifest::new();
4242        let mut e = entry("a.flac", AudioFormat::Flac);
4243        e.stems.insert(
4244            "voc".to_owned(),
4245            ArtifactState {
4246                path: "a.stems/voc.mp3".to_owned(),
4247                hash: "vh".to_owned(),
4248            },
4249        );
4250        manifest.insert("a", e);
4251        let plan = Plan {
4252            actions: vec![Action::DeleteStem {
4253                clip_id: "a".to_owned(),
4254                key: "voc".to_owned(),
4255                path: "a.stems/voc.mp3".to_owned(),
4256            }],
4257        };
4258
4259        let outcome = run(
4260            &plan,
4261            &mut manifest,
4262            &[],
4263            &ScriptedHttp::new(),
4264            &fs,
4265            &StubFfmpeg::flac(),
4266            &RecordingClock::new(),
4267            &ExecOptions::default(),
4268        );
4269
4270        assert_eq!(outcome.artifacts_deleted, 1);
4271        assert!(!fs.exists("a.stems/voc.mp3"));
4272        assert!(manifest.get("a").unwrap().stems.is_empty());
4273    }
4274
4275    #[test]
4276    fn co_deleting_the_last_stem_prunes_the_stems_folder() {
4277        // Deleting a song co-deletes its stems; the emptied `.stems` folder is
4278        // pruned by the end-of-run sweep, so it can never be orphaned.
4279        let fs = MemFs::new()
4280            .with_file("song.flac", b"DATA".to_vec())
4281            .with_file("song.stems/voc.mp3", b"stem".to_vec());
4282        assert!(fs.has_dir("song.stems"));
4283        let mut manifest = Manifest::new();
4284        let mut e = entry("song.flac", AudioFormat::Flac);
4285        e.stems.insert(
4286            "voc".to_owned(),
4287            ArtifactState {
4288                path: "song.stems/voc.mp3".to_owned(),
4289                hash: "vh".to_owned(),
4290            },
4291        );
4292        manifest.insert("a", e);
4293        let plan = Plan {
4294            actions: vec![
4295                Action::Delete {
4296                    path: "song.flac".to_owned(),
4297                    clip_id: "a".to_owned(),
4298                },
4299                Action::DeleteStem {
4300                    clip_id: "a".to_owned(),
4301                    key: "voc".to_owned(),
4302                    path: "song.stems/voc.mp3".to_owned(),
4303                },
4304            ],
4305        };
4306
4307        let outcome = run(
4308            &plan,
4309            &mut manifest,
4310            &[],
4311            &ScriptedHttp::new(),
4312            &fs,
4313            &StubFfmpeg::flac(),
4314            &RecordingClock::new(),
4315            &ExecOptions::default(),
4316        );
4317
4318        assert_eq!(outcome.deleted, 1);
4319        assert_eq!(outcome.artifacts_deleted, 1);
4320        assert!(!fs.exists("song.flac"));
4321        assert!(!fs.exists("song.stems/voc.mp3"));
4322        assert!(
4323            !fs.has_dir("song.stems"),
4324            "the emptied .stems folder is pruned"
4325        );
4326        assert!(manifest.get("a").is_none());
4327    }
4328
4329    #[test]
4330    fn write_stem_mp3_never_issues_a_generation_post() {
4331        // The MP3 stem path is GET-only: writing a stem fetches its CDN url and
4332        // never POSTs, let alone to any generation or WAV-render endpoint.
4333        let mut manifest = Manifest::new();
4334        manifest.insert("a", entry("a.flac", AudioFormat::Flac));
4335        let plan = Plan {
4336            actions: vec![Action::WriteStem {
4337                clip_id: "a".to_owned(),
4338                key: "voc".to_owned(),
4339                stem_id: "voc".to_owned(),
4340                path: "a.stems/voc.mp3".to_owned(),
4341                source_url: "https://cdn1.suno.ai/voc.mp3".to_owned(),
4342                format: StemFormat::Mp3,
4343                hash: "vh".to_owned(),
4344            }],
4345        };
4346        let http = ScriptedHttp::new().route("voc.mp3", Reply::ok(b"stem".to_vec()));
4347
4348        run(
4349            &plan,
4350            &mut manifest,
4351            &[],
4352            &http,
4353            &MemFs::new(),
4354            &StubFfmpeg::flac(),
4355            &RecordingClock::new(),
4356            &ExecOptions::default(),
4357        );
4358
4359        assert_eq!(
4360            http.count("stem_task"),
4361            0,
4362            "no generation endpoint is ever hit"
4363        );
4364        assert_eq!(http.count("convert_wav"), 0);
4365        assert_eq!(http.count("/api/gen/"), 0);
4366    }
4367
4368    #[test]
4369    fn full_stems_mirror_mp3_is_get_only_with_zero_gen_traffic() {
4370        // End-to-end #100 path with MP3 stems: list a clip's existing stems (free
4371        // GET over the live page-count + 0-indexed page shape), reconcile them into
4372        // WriteStem actions, and execute (download) them. With MP3 the whole flow
4373        // is GET-only and touches NO `/api/gen/` endpoint at all.
4374        let http = ScriptedHttp::new()
4375            .with_auth()
4376            .route("clip1/stems/pages", Reply::json(r#"{"pages": 1}"#))
4377            .route(
4378                "clip1/stems?page=0",
4379                Reply::json(
4380                    r#"{"stems":[
4381                        {"id":"s1","title":"Song (Vocals)","status":"complete","audio_url":"https://cdn1.suno.ai/s1.mp3"},
4382                        {"id":"s2","title":"Song (Drums)","status":"complete","audio_url":"https://cdn1.suno.ai/s2.mp3"}
4383                    ]}"#,
4384                ),
4385            )
4386            .route("s1.mp3", Reply::ok(b"vocals-bytes".to_vec()))
4387            .route("s2.mp3", Reply::ok(b"drums-bytes".to_vec()));
4388
4389        // List the existing stems through the client (GET-only, free).
4390        let mut auth = ClerkAuth::new("eyJtoken");
4391        pollster::block_on(auth.authenticate(&http)).unwrap();
4392        let mut client = SunoClient::new(auth, RecordingClock::new());
4393        let (stems, complete) = pollster::block_on(client.list_stems(&http, "clip1")).unwrap();
4394        assert!(complete);
4395        assert_eq!(stems.len(), 2);
4396        assert_eq!(stems[0].label, "Vocals");
4397
4398        // Reconcile the listed MP3 stems into a plan (audio already present -> Skip).
4399        let mut manifest = Manifest::new();
4400        manifest.insert("clip1", entry("clip1.flac", AudioFormat::Flac));
4401        let desired_stems: Vec<crate::reconcile::DesiredStem> = stems
4402            .iter()
4403            .map(|s| crate::reconcile::DesiredStem {
4404                key: s.id.clone(),
4405                stem_id: s.id.clone(),
4406                path: format!("clip1.stems/{}.mp3", s.id),
4407                source_url: s.url.clone(),
4408                format: StemFormat::Mp3,
4409                hash: crate::art_url_hash(&s.url),
4410            })
4411            .collect();
4412        let d = Desired {
4413            path: "clip1.flac".to_owned(),
4414            stems: Some(desired_stems),
4415            ..desired(clip("clip1"), AudioFormat::Flac)
4416        };
4417        let local: HashMap<String, crate::reconcile::LocalFile> = [(
4418            "clip1".to_owned(),
4419            crate::reconcile::LocalFile {
4420                exists: true,
4421                size: 100,
4422            },
4423        )]
4424        .into_iter()
4425        .collect();
4426        let sources = [crate::reconcile::SourceStatus {
4427            mode: SourceMode::Mirror,
4428            fully_enumerated: true,
4429        }];
4430        let plan =
4431            crate::reconcile::reconcile(&manifest, std::slice::from_ref(&d), &local, &sources);
4432        assert_eq!(plan.stem_writes(), 2);
4433
4434        let fs = MemFs::new();
4435        let outcome = run(
4436            &plan,
4437            &mut manifest,
4438            std::slice::from_ref(&d),
4439            &http,
4440            &fs,
4441            &StubFfmpeg::flac(),
4442            &RecordingClock::new(),
4443            &ExecOptions::default(),
4444        );
4445
4446        assert_eq!(outcome.artifacts_written, 2, "both stems downloaded");
4447        assert_eq!(fs.read_file("clip1.stems/s1.mp3").unwrap(), b"vocals-bytes");
4448        assert_eq!(fs.read_file("clip1.stems/s2.mp3").unwrap(), b"drums-bytes");
4449        // The MP3 mirror path never touches any /api/gen/ endpoint (no render, no
4450        // generation, no separation).
4451        assert_eq!(http.count("/api/gen/"), 0);
4452        assert_eq!(http.count("stem_task"), 0);
4453        assert_eq!(http.count("separate"), 0);
4454        assert_eq!(http.count("generate"), 0);
4455        // No stem is ever written as FLAC.
4456        assert!(!fs.exists("clip1.stems/s1.flac"));
4457    }
4458
4459    #[test]
4460    fn full_stems_mirror_wav_default_renders_free_wav_and_no_generation() {
4461        // End-to-end #100 path with WAV stems (the default): each stem's lossless
4462        // WAV is rendered through the FREE convert_wav flow and stored RAW as
4463        // `.wav`. The mirror makes NO credit-spending generation POST.
4464        let http = ScriptedHttp::new()
4465            .with_auth()
4466            .route("clip1/stems/pages", Reply::json(r#"{"pages": 1}"#))
4467            .route(
4468                "clip1/stems?page=0",
4469                Reply::json(
4470                    r#"{"stems":[
4471                        {"id":"s1","title":"Song (Vocals)","status":"complete","audio_url":"https://cdn1.suno.ai/s1.mp3"},
4472                        {"id":"s2","title":"Song (Drums)","status":"complete","audio_url":"https://cdn1.suno.ai/s2.mp3"}
4473                    ]}"#,
4474                ),
4475            )
4476            // Each stem's WAV is already rendered, so wav_file returns the url and
4477            // no convert_wav POST is even needed (still free either way).
4478            .route(
4479                "s1/wav_file/",
4480                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/s1.wav"}"#),
4481            )
4482            .route(
4483                "s2/wav_file/",
4484                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/s2.wav"}"#),
4485            )
4486            .route("s1.wav", Reply::ok(b"RIFFvocals".to_vec()))
4487            .route("s2.wav", Reply::ok(b"RIFFdrums".to_vec()));
4488
4489        let mut auth = ClerkAuth::new("eyJtoken");
4490        pollster::block_on(auth.authenticate(&http)).unwrap();
4491        let mut client = SunoClient::new(auth, RecordingClock::new());
4492        let (stems, _complete) = pollster::block_on(client.list_stems(&http, "clip1")).unwrap();
4493
4494        let mut manifest = Manifest::new();
4495        manifest.insert("clip1", entry("clip1.flac", AudioFormat::Flac));
4496        let desired_stems: Vec<crate::reconcile::DesiredStem> = stems
4497            .iter()
4498            .map(|s| crate::reconcile::DesiredStem {
4499                key: s.id.clone(),
4500                stem_id: s.id.clone(),
4501                path: format!("clip1.stems/{}.wav", s.id),
4502                source_url: s.url.clone(),
4503                format: StemFormat::Wav,
4504                hash: crate::art_url_hash(&s.url),
4505            })
4506            .collect();
4507        let d = Desired {
4508            path: "clip1.flac".to_owned(),
4509            stems: Some(desired_stems),
4510            ..desired(clip("clip1"), AudioFormat::Flac)
4511        };
4512        let local: HashMap<String, crate::reconcile::LocalFile> = [(
4513            "clip1".to_owned(),
4514            crate::reconcile::LocalFile {
4515                exists: true,
4516                size: 100,
4517            },
4518        )]
4519        .into_iter()
4520        .collect();
4521        let sources = [crate::reconcile::SourceStatus {
4522            mode: SourceMode::Mirror,
4523            fully_enumerated: true,
4524        }];
4525        let plan =
4526            crate::reconcile::reconcile(&manifest, std::slice::from_ref(&d), &local, &sources);
4527
4528        let fs = MemFs::new();
4529        let outcome = run(
4530            &plan,
4531            &mut manifest,
4532            std::slice::from_ref(&d),
4533            &http,
4534            &fs,
4535            &StubFfmpeg::flac(),
4536            &RecordingClock::new(),
4537            &small_poll(),
4538        );
4539
4540        assert_eq!(outcome.artifacts_written, 2);
4541        // Stems are stored RAW as WAV (no FLAC transcode, even for a FLAC song).
4542        assert_eq!(fs.read_file("clip1.stems/s1.wav").unwrap(), b"RIFFvocals");
4543        assert_eq!(fs.read_file("clip1.stems/s2.wav").unwrap(), b"RIFFdrums");
4544        assert!(!fs.exists("clip1.stems/s1.flac"));
4545        // No credit-spending generation/separation endpoint is ever hit.
4546        assert_eq!(http.count("stem_task"), 0);
4547        assert_eq!(http.count("separate"), 0);
4548        assert_eq!(http.count("generate"), 0);
4549    }
4550
4551    #[test]
4552    fn write_artifact_is_skipped_when_the_owner_audio_is_absent() {
4553        // A clip whose Download fails leaves no manifest entry, so its following
4554        // WriteArtifact must not strand an untracked sidecar: it is skipped with
4555        // no fetch and no write. A following healthy clip still succeeds.
4556        let ca = clip("a");
4557        let plan = Plan {
4558            actions: vec![
4559                Action::Download {
4560                    clip: ca.clone(),
4561                    lineage: LineageContext::own_root(&ca),
4562                    path: "a.mp3".to_owned(),
4563                    format: AudioFormat::Mp3,
4564                },
4565                Action::WriteArtifact {
4566                    kind: ArtifactKind::CoverJpg,
4567                    path: "a/cover.jpg".to_owned(),
4568                    source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
4569                    hash: "h1".to_owned(),
4570                    owner_id: "a".to_owned(),
4571                    content: None,
4572                },
4573                Action::WriteArtifact {
4574                    kind: ArtifactKind::CoverJpg,
4575                    path: "b/cover.jpg".to_owned(),
4576                    source_url: "https://art.suno.ai/b/large.jpg".to_owned(),
4577                    hash: "h2".to_owned(),
4578                    owner_id: "b".to_owned(),
4579                    content: None,
4580                },
4581            ],
4582        };
4583        // The Download's audio 404s (permanent), so no entry for "a" is created.
4584        let http = ScriptedHttp::new()
4585            .route("a.mp3", Reply::status(404))
4586            .route("a/large.jpg", Reply::ok(b"jpg-a".to_vec()))
4587            .route("b/large.jpg", Reply::ok(b"jpg-b".to_vec()));
4588        let fs = MemFs::new();
4589        let mut manifest = Manifest::new();
4590        // "b" already has audio (a prior-run clip), so its sidecar write proceeds.
4591        manifest.insert("b", entry("b.mp3", AudioFormat::Mp3));
4592
4593        let outcome = run(
4594            &plan,
4595            &mut manifest,
4596            &[],
4597            &http,
4598            &fs,
4599            &StubFfmpeg::flac(),
4600            &RecordingClock::new(),
4601            &ExecOptions::default(),
4602        );
4603
4604        assert_eq!(outcome.status, RunStatus::Completed);
4605        // The audio download is the only failure; the orphan artifact is skipped.
4606        assert_eq!(outcome.failed(), 1);
4607        assert_eq!(outcome.failures[0].clip_id, "a");
4608        assert_eq!(outcome.skipped, 1);
4609        // The orphan sidecar was neither fetched nor written, and left no record.
4610        assert_eq!(http.count("a/large.jpg"), 0);
4611        assert!(!fs.exists("a/cover.jpg"));
4612        assert!(manifest.get("a").is_none());
4613        // The healthy clip's sidecar still succeeded.
4614        assert_eq!(outcome.artifacts_written, 1);
4615        assert_eq!(fs.read_file("b/cover.jpg").unwrap(), b"jpg-b");
4616        assert!(manifest.get("b").unwrap().cover_jpg.is_some());
4617    }
4618
4619    #[test]
4620    fn write_artifact_transcodes_animated_cover_to_webp() {
4621        // A CoverWebp fetches the clip's MP4 preview, runs it through the ffmpeg
4622        // port, and writes the transcoded WebP (not the fetched MP4), recording
4623        // the sidecar on the owning entry.
4624        let mut manifest = Manifest::new();
4625        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
4626        let plan = Plan {
4627            actions: vec![Action::WriteArtifact {
4628                kind: ArtifactKind::CoverWebp,
4629                path: "a/cover.webp".to_owned(),
4630                source_url: "https://cdn.suno.ai/a/video.mp4".to_owned(),
4631                hash: "v1".to_owned(),
4632                owner_id: "a".to_owned(),
4633                content: None,
4634            }],
4635        };
4636        let http = ScriptedHttp::new().route("a/video.mp4", Reply::ok(b"mp4-bytes".to_vec()));
4637        let fs = MemFs::new();
4638        let ffmpeg = StubFfmpeg::webp();
4639
4640        let outcome = run(
4641            &plan,
4642            &mut manifest,
4643            &[],
4644            &http,
4645            &fs,
4646            &ffmpeg,
4647            &RecordingClock::new(),
4648            &ExecOptions::default(),
4649        );
4650
4651        assert_eq!(outcome.artifacts_written, 1);
4652        assert_eq!(outcome.failed(), 0);
4653        assert_eq!(outcome.status, RunStatus::Completed);
4654        // The fetched MP4 was transcoded: the file holds the ffmpeg WebP output.
4655        assert_eq!(http.count("a/video.mp4"), 1);
4656        let written = fs.read_file("a/cover.webp").unwrap();
4657        assert_ne!(written, b"mp4-bytes");
4658        assert!(written.starts_with(b"RIFF"));
4659        assert_eq!(
4660            manifest.get("a").unwrap().cover_webp,
4661            Some(ArtifactState {
4662                path: "a/cover.webp".to_owned(),
4663                hash: "v1".to_owned(),
4664            })
4665        );
4666    }
4667
#[test]
fn write_artifact_webp_transcode_failure_is_per_clip() {
    // Two clips share one run: clip "a" asks for an animated cover while the
    // ffmpeg double is the failing one, and clip "b" asks for a static cover.
    // The failure must be charged to "a" only (no file, empty slot), the run
    // must still complete, and "b" must be written as usual.
    let animated_cover = Action::WriteArtifact {
        kind: ArtifactKind::CoverWebp,
        path: "a/cover.webp".to_owned(),
        source_url: "https://cdn.suno.ai/a/video.mp4".to_owned(),
        hash: "v1".to_owned(),
        owner_id: "a".to_owned(),
        content: None,
    };
    let static_cover = Action::WriteArtifact {
        kind: ArtifactKind::CoverJpg,
        path: "b/cover.jpg".to_owned(),
        source_url: "https://art.suno.ai/b/large.jpg".to_owned(),
        hash: "h1".to_owned(),
        owner_id: "b".to_owned(),
        content: None,
    };
    let plan = Plan {
        actions: vec![animated_cover, static_cover],
    };

    let mut manifest = Manifest::new();
    for (clip_id, audio_path) in [("a", "a.mp3"), ("b", "b.mp3")] {
        manifest.insert(clip_id, entry(audio_path, AudioFormat::Mp3));
    }

    let http = ScriptedHttp::new()
        .route("a/video.mp4", Reply::ok(b"mp4-bytes".to_vec()))
        .route("b/large.jpg", Reply::ok(b"jpg-b".to_vec()));
    let fs = MemFs::new();
    let broken_ffmpeg = StubFfmpeg::failing();
    let clock = RecordingClock::new();
    let opts = ExecOptions::default();

    let outcome = run(
        &plan,
        &mut manifest,
        &[],
        &http,
        &fs,
        &broken_ffmpeg,
        &clock,
        &opts,
    );

    // The run finishes and reports exactly one failure, attributed to "a".
    assert_eq!(outcome.status, RunStatus::Completed);
    assert_eq!(outcome.failed(), 1);
    assert_eq!(outcome.failures[0].clip_id, "a");

    // Nothing was written for the animated cover and its slot stays empty.
    assert!(!fs.exists("a/cover.webp"));
    assert!(manifest.get("a").unwrap().cover_webp.is_none());

    // The static cover from the same run is on disk and recorded.
    assert_eq!(outcome.artifacts_written, 1);
    assert_eq!(fs.read_file("b/cover.jpg").unwrap(), b"jpg-b");
    assert!(manifest.get("b").unwrap().cover_jpg.is_some());
}
4723
#[test]
fn write_artifact_uses_configured_webp_settings() {
    use std::sync::{Arc, Mutex};

    /// An [`Ffmpeg`] double that logs every settings value handed to
    /// `mp4_to_webp`, so the test can inspect what the executor passed on.
    struct RecordingWebpFfmpeg {
        seen: Arc<Mutex<Vec<WebpEncodeSettings>>>,
    }

    impl Ffmpeg for RecordingWebpFfmpeg {
        async fn wav_to_flac(
            &self,
            _wav: &[u8],
        ) -> Result<Vec<u8>, crate::ffmpeg::FfmpegError> {
            // Not the subject of this test: an empty success is enough.
            Ok(Vec::new())
        }

        async fn mp4_to_webp(
            &self,
            _mp4: &[u8],
            settings: WebpEncodeSettings,
        ) -> Result<Vec<u8>, crate::ffmpeg::FfmpegError> {
            // The guard is a statement temporary, released before returning.
            self.seen.lock().unwrap().push(settings);
            Ok(b"RIFF\x00\x00\x00\x00WEBP".to_vec())
        }
    }

    // The settings under test. A closure builds a fresh value each time so the
    // test does not depend on the type being `Copy` or `Clone`.
    let tuned = || WebpEncodeSettings {
        quality: 88,
        max_fps: 12,
        max_width: Some(720),
        lossless: false,
        compression_level: 4,
    };

    let log = Arc::new(Mutex::new(Vec::new()));
    let ffmpeg = RecordingWebpFfmpeg {
        seen: Arc::clone(&log),
    };
    let opts = ExecOptions {
        cover_webp: tuned(),
        ..ExecOptions::default()
    };

    let plan = Plan {
        actions: vec![Action::WriteArtifact {
            kind: ArtifactKind::CoverWebp,
            path: "a/cover.webp".to_owned(),
            source_url: "https://cdn.suno.ai/a/video.mp4".to_owned(),
            hash: "v1".to_owned(),
            owner_id: "a".to_owned(),
            content: None,
        }],
    };
    let mut manifest = Manifest::new();
    manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
    let http = ScriptedHttp::new().route("a/video.mp4", Reply::ok(b"mp4-bytes".to_vec()));
    let fs = MemFs::new();
    let clock = RecordingClock::new();

    // Only the recorded settings matter here, so the outcome is discarded.
    let _ = run(
        &plan,
        &mut manifest,
        &[],
        &http,
        &fs,
        &ffmpeg,
        &clock,
        &opts,
    );

    // Exactly one transcode happened, and it saw the configured settings.
    assert_eq!(log.lock().unwrap().as_slice(), &[tuned()]);
}
4800
4801    // ── Phase 8: folder art routes to the album store ───────────────
4802
#[test]
fn folder_jpg_write_records_album_state_and_skips_manifest() {
    // The owner of folder art is the album root id rather than a manifest
    // clip. With a completely empty manifest the write must still happen, be
    // recorded in the album store, and leave the manifest without a "root" row.
    let folder_path = "creator/album/folder.jpg";
    let plan = Plan {
        actions: vec![Action::WriteArtifact {
            kind: ArtifactKind::FolderJpg,
            path: folder_path.to_owned(),
            source_url: "https://art.suno.ai/root/large.jpg".to_owned(),
            hash: "jh".to_owned(),
            owner_id: "root".to_owned(),
            content: None,
        }],
    };
    let mut manifest = Manifest::new();
    let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
    let fs = MemFs::new();
    let http = ScriptedHttp::new().route("root/large.jpg", Reply::ok(b"folder-jpg".to_vec()));
    let ffmpeg = StubFfmpeg::flac();
    let clock = RecordingClock::new();
    let opts = ExecOptions::default();

    let outcome = run_with_albums(
        &plan,
        &mut manifest,
        &mut albums,
        &[],
        &http,
        &fs,
        &ffmpeg,
        &clock,
        &opts,
    );

    assert_eq!(outcome.artifacts_written, 1);
    assert_eq!(outcome.status, RunStatus::Completed);

    // The fetched bytes are on disk at the planned path.
    assert_eq!(fs.read_file(folder_path).unwrap(), b"folder-jpg");

    // The album store carries the new slot; the manifest was not touched.
    let expected_slot = Some(ArtifactState {
        path: folder_path.to_owned(),
        hash: "jh".to_owned(),
    });
    assert_eq!(albums.get("root").unwrap().folder_jpg, expected_slot);
    assert!(manifest.get("root").is_none());
}
4849
#[test]
fn folder_webp_write_transcodes_and_records_album_state() {
    // An animated folder cover is fetched as MP4. What lands on disk must
    // differ from the fetched bytes and start with `RIFF`, and the album store
    // must record the new sidecar under the root id.
    let webp_path = "creator/album/cover.webp";
    let plan = Plan {
        actions: vec![Action::WriteArtifact {
            kind: ArtifactKind::FolderWebp,
            path: webp_path.to_owned(),
            source_url: "https://cdn.suno.ai/root/video.mp4".to_owned(),
            hash: "wh".to_owned(),
            owner_id: "root".to_owned(),
            content: None,
        }],
    };
    let mut manifest = Manifest::new();
    let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
    let fs = MemFs::new();
    let http = ScriptedHttp::new().route("root/video.mp4", Reply::ok(b"mp4-bytes".to_vec()));
    let ffmpeg = StubFfmpeg::webp();
    let clock = RecordingClock::new();
    let opts = ExecOptions::default();

    let outcome = run_with_albums(
        &plan,
        &mut manifest,
        &mut albums,
        &[],
        &http,
        &fs,
        &ffmpeg,
        &clock,
        &opts,
    );

    assert_eq!(outcome.artifacts_written, 1);
    assert_eq!(outcome.failed(), 0);

    // Transcoded output, not the raw MP4 body.
    let on_disk = fs.read_file(webp_path).unwrap();
    assert_ne!(on_disk, b"mp4-bytes");
    assert!(on_disk.starts_with(b"RIFF"));

    let expected_slot = Some(ArtifactState {
        path: webp_path.to_owned(),
        hash: "wh".to_owned(),
    });
    assert_eq!(albums.get("root").unwrap().folder_webp, expected_slot);
}
4893
#[test]
fn folder_mp4_write_keeps_the_source_verbatim() {
    // A FolderMp4 artifact stores the fetched video untouched: even with the
    // WebP-producing ffmpeg double available, the file must equal the HTTP
    // body byte-for-byte, and the album store must record the slot.
    let mp4_path = "creator/album/cover.mp4";
    let plan = Plan {
        actions: vec![Action::WriteArtifact {
            kind: ArtifactKind::FolderMp4,
            path: mp4_path.to_owned(),
            source_url: "https://cdn.suno.ai/root/video.mp4".to_owned(),
            hash: "mh".to_owned(),
            owner_id: "root".to_owned(),
            content: None,
        }],
    };
    let mut manifest = Manifest::new();
    let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
    let fs = MemFs::new();
    let http = ScriptedHttp::new().route("root/video.mp4", Reply::ok(b"mp4-bytes".to_vec()));
    let ffmpeg = StubFfmpeg::webp();
    let clock = RecordingClock::new();
    let opts = ExecOptions::default();

    let outcome = run_with_albums(
        &plan,
        &mut manifest,
        &mut albums,
        &[],
        &http,
        &fs,
        &ffmpeg,
        &clock,
        &opts,
    );

    assert_eq!(outcome.artifacts_written, 1);
    assert_eq!(outcome.failed(), 0);

    // Raw source bytes, never passed through a transcode.
    assert_eq!(fs.read_file(mp4_path).unwrap(), b"mp4-bytes");

    let expected_slot = Some(ArtifactState {
        path: mp4_path.to_owned(),
        hash: "mh".to_owned(),
    });
    assert_eq!(albums.get("root").unwrap().folder_mp4, expected_slot);
}
4938
#[test]
fn both_folder_covers_fetch_the_video_cover_once() {
    // `both` retention keeps a transcoded cover.webp and a raw cover.mp4, and
    // both come from the same video cover URL. The plan lists FolderWebp first
    // and FolderMp4 second; the shared source must be requested only once.
    let video_cover = |kind: ArtifactKind, path: &str, hash: &str| Action::WriteArtifact {
        kind,
        path: path.to_owned(),
        source_url: "https://cdn.suno.ai/root/video.mp4".to_owned(),
        hash: hash.to_owned(),
        owner_id: "root".to_owned(),
        content: None,
    };
    let plan = Plan {
        actions: vec![
            video_cover(ArtifactKind::FolderWebp, "creator/album/cover.webp", "wh"),
            video_cover(ArtifactKind::FolderMp4, "creator/album/cover.mp4", "mh"),
        ],
    };
    let mut manifest = Manifest::new();
    let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
    let fs = MemFs::new();
    let http = ScriptedHttp::new().route("root/video.mp4", Reply::ok(b"mp4-bytes".to_vec()));
    let ffmpeg = StubFfmpeg::webp();
    let clock = RecordingClock::new();
    let opts = ExecOptions::default();

    let outcome = run_with_albums(
        &plan,
        &mut manifest,
        &mut albums,
        &[],
        &http,
        &fs,
        &ffmpeg,
        &clock,
        &opts,
    );

    assert_eq!(outcome.artifacts_written, 2);
    assert_eq!(outcome.failed(), 0);

    // One request served both artifacts (#90 / #89).
    assert_eq!(http.count("root/video.mp4"), 1);

    // cover.webp holds transcoded output; cover.mp4 holds the source verbatim.
    let webp_bytes = fs.read_file("creator/album/cover.webp").unwrap();
    assert!(webp_bytes.starts_with(b"RIFF"));
    assert_eq!(
        fs.read_file("creator/album/cover.mp4").unwrap(),
        b"mp4-bytes"
    );
}
4996
#[test]
fn folder_art_delete_clears_album_state() {
    // Deleting the only folder-art kind an album holds removes the file and
    // drops the album's row from the store altogether.
    let folder_path = "creator/album/folder.jpg";

    let only_folder_jpg = AlbumArt {
        folder_jpg: Some(ArtifactState {
            path: folder_path.to_owned(),
            hash: "jh".to_owned(),
        }),
        folder_webp: None,
        folder_mp4: None,
    };
    let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
    albums.insert("root".to_owned(), only_folder_jpg);
    let mut manifest = Manifest::new();

    let fs = MemFs::new().with_file(folder_path, b"jpg".to_vec());
    let plan = Plan {
        actions: vec![Action::DeleteArtifact {
            kind: ArtifactKind::FolderJpg,
            path: folder_path.to_owned(),
            owner_id: "root".to_owned(),
        }],
    };
    let http = ScriptedHttp::new();
    let ffmpeg = StubFfmpeg::flac();
    let clock = RecordingClock::new();
    let opts = ExecOptions::default();

    let outcome = run_with_albums(
        &plan,
        &mut manifest,
        &mut albums,
        &[],
        &http,
        &fs,
        &ffmpeg,
        &clock,
        &opts,
    );

    assert_eq!(outcome.artifacts_deleted, 1);
    assert!(!fs.exists(folder_path));
    // With its single slot gone, the album row itself is pruned.
    assert!(!albums.contains_key("root"));
}
5038
5039    // ── Phase 9: playlist artifacts ─────────────────────────────────
5040
#[test]
fn playlist_write_uses_inline_content_and_records_state() {
    // A playlist's body travels inline on the action. The HTTP double has no
    // routes and the manifest is empty, so a successful write shows the body
    // was taken from the action itself. The playlist store is keyed by the
    // playlist id.
    let playlist_file = "Road Trip.m3u8";
    let body = "#EXTM3U\n#PLAYLIST:Road Trip\n#EXTINF:60,One\nA/One.flac\n";
    let plan = Plan {
        actions: vec![Action::WriteArtifact {
            kind: ArtifactKind::Playlist,
            path: playlist_file.to_owned(),
            source_url: String::new(),
            hash: "ph1".to_owned(),
            owner_id: "pl1".to_owned(),
            content: Some(body.to_owned()),
        }],
    };
    let mut manifest = Manifest::new();
    let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
    let mut playlists: BTreeMap<String, PlaylistState> = BTreeMap::new();
    let fs = MemFs::new();
    let routeless_http = ScriptedHttp::new();
    let ffmpeg = StubFfmpeg::flac();
    let clock = RecordingClock::new();
    let opts = ExecOptions::default();

    let outcome = run_full(
        &plan,
        &mut manifest,
        &mut albums,
        &mut playlists,
        &[],
        &routeless_http,
        &fs,
        &ffmpeg,
        &clock,
        &opts,
    );

    assert_eq!(outcome.artifacts_written, 1);
    assert_eq!(outcome.failed(), 0);

    // The file holds the inline body, byte for byte.
    assert_eq!(fs.read_file(playlist_file).unwrap(), body.as_bytes());

    let expected_row = PlaylistState {
        name: "Road Trip".to_owned(),
        path: playlist_file.to_owned(),
        hash: "ph1".to_owned(),
    };
    assert_eq!(playlists.get("pl1"), Some(&expected_row));
}
5088
#[test]
fn playlist_delete_removes_file_and_clears_state() {
    // Deleting a playlist artifact removes its file from disk and drops its
    // row from the playlist store.
    let playlist_file = "Old.m3u8";

    let mut playlists: BTreeMap<String, PlaylistState> = BTreeMap::new();
    let existing_row = PlaylistState {
        name: "Old".to_owned(),
        path: playlist_file.to_owned(),
        hash: "ph1".to_owned(),
    };
    playlists.insert("pl1".to_owned(), existing_row);
    let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
    let mut manifest = Manifest::new();

    let fs = MemFs::new().with_file(playlist_file, b"#EXTM3U\n".to_vec());
    let plan = Plan {
        actions: vec![Action::DeleteArtifact {
            kind: ArtifactKind::Playlist,
            path: playlist_file.to_owned(),
            owner_id: "pl1".to_owned(),
        }],
    };
    let http = ScriptedHttp::new();
    let ffmpeg = StubFfmpeg::flac();
    let clock = RecordingClock::new();
    let opts = ExecOptions::default();

    let outcome = run_full(
        &plan,
        &mut manifest,
        &mut albums,
        &mut playlists,
        &[],
        &http,
        &fs,
        &ffmpeg,
        &clock,
        &opts,
    );

    assert_eq!(outcome.artifacts_deleted, 1);
    assert!(!fs.exists(playlist_file));
    assert!(
        !playlists.contains_key("pl1"),
        "the playlist row is cleared on delete"
    );
}
5131
5132    // ── Phase 10: old-sidecar cleanup on move + empty-dir prune ──────
5133
#[test]
fn rename_move_relocates_cover_and_prunes_old_album() {
    // The plan moves the audio from AlbumA to AlbumB (Rename) and writes the
    // cover at the AlbumB path while the manifest still points at the AlbumA
    // cover. Afterwards the old cover must be gone, the manifest slot must
    // name the new path, and the emptied AlbumA directory must be pruned.
    let old_audio = "Creator/AlbumA/song.flac";
    let new_audio = "Creator/AlbumB/song.flac";
    let old_cover = "Creator/AlbumA/cover.jpg";
    let new_cover = "Creator/AlbumB/cover.jpg";

    let mut clip_entry = entry(old_audio, AudioFormat::Flac);
    clip_entry.cover_jpg = Some(ArtifactState {
        path: old_cover.to_owned(),
        hash: "h1".to_owned(),
    });
    let mut manifest = Manifest::new();
    manifest.insert("a", clip_entry);

    let fs = MemFs::new()
        .with_file(old_audio, b"AUDIO".to_vec())
        .with_file(old_cover, b"old-jpg".to_vec());

    let move_audio = Action::Rename {
        from: old_audio.to_owned(),
        to: new_audio.to_owned(),
    };
    let write_cover = Action::WriteArtifact {
        kind: ArtifactKind::CoverJpg,
        path: new_cover.to_owned(),
        source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
        hash: "h1".to_owned(),
        owner_id: "a".to_owned(),
        content: None,
    };
    let plan = Plan {
        actions: vec![move_audio, write_cover],
    };
    let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"new-jpg".to_vec()));
    let ffmpeg = StubFfmpeg::flac();
    let clock = RecordingClock::new();
    let opts = ExecOptions::default();

    let outcome = run(
        &plan,
        &mut manifest,
        &[],
        &http,
        &fs,
        &ffmpeg,
        &clock,
        &opts,
    );

    assert_eq!(outcome.failed(), 0);

    // New location is populated; nothing is left at the old one.
    assert!(fs.exists(new_audio));
    assert_eq!(fs.read_file(new_cover).unwrap(), b"new-jpg");
    assert!(!fs.exists(old_cover));
    assert!(!fs.exists(old_audio));

    // The manifest's cover slot follows the move.
    let moved_entry = manifest.get("a").unwrap();
    assert_eq!(moved_entry.cover_jpg.as_ref().unwrap().path, new_cover);

    // AlbumA is now empty and pruned; AlbumB remains.
    assert!(!fs.has_dir("Creator/AlbumA"));
    assert!(fs.has_dir("Creator/AlbumB"));
}
5196
#[test]
fn rename_move_relocates_folder_art_and_prunes_old_album() {
    // The album store still points at AlbumA/folder.jpg while the plan writes
    // folder.jpg under AlbumB. The old file must be removed, the store slot
    // must advance to the new path, and the emptied AlbumA must be pruned.
    let old_folder_art = "Creator/AlbumA/folder.jpg";
    let new_folder_art = "Creator/AlbumB/folder.jpg";

    let stale_album = AlbumArt {
        folder_jpg: Some(ArtifactState {
            path: old_folder_art.to_owned(),
            hash: "jh".to_owned(),
        }),
        folder_webp: None,
        folder_mp4: None,
    };
    let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
    albums.insert("root".to_owned(), stale_album);
    let mut manifest = Manifest::new();

    let fs = MemFs::new().with_file(old_folder_art, b"old-folder".to_vec());
    let plan = Plan {
        actions: vec![Action::WriteArtifact {
            kind: ArtifactKind::FolderJpg,
            path: new_folder_art.to_owned(),
            source_url: "https://art.suno.ai/root/large.jpg".to_owned(),
            hash: "jh".to_owned(),
            owner_id: "root".to_owned(),
            content: None,
        }],
    };
    let http = ScriptedHttp::new().route("root/large.jpg", Reply::ok(b"new-folder".to_vec()));
    let ffmpeg = StubFfmpeg::flac();
    let clock = RecordingClock::new();
    let opts = ExecOptions::default();

    let outcome = run_with_albums(
        &plan,
        &mut manifest,
        &mut albums,
        &[],
        &http,
        &fs,
        &ffmpeg,
        &clock,
        &opts,
    );

    assert_eq!(outcome.failed(), 0);

    // New file written, old file gone.
    assert_eq!(fs.read_file(new_folder_art).unwrap(), b"new-folder");
    assert!(!fs.exists(old_folder_art));

    // The album store slot now names the new path.
    let album_row = albums.get("root").unwrap();
    assert_eq!(album_row.folder_jpg.as_ref().unwrap().path, new_folder_art);

    assert!(!fs.has_dir("Creator/AlbumA"));
    assert!(fs.has_dir("Creator/AlbumB"));
}
5258
#[test]
fn prune_empty_dirs_removes_only_empty_dirs() {
    // Exercises the prune port directly on a mixed tree: two chains of empty
    // directories, one directory holding a regular file, and one holding only
    // a hidden dotfile.
    let fs = MemFs::new()
        .with_file("keep/full/song.flac", b"x".to_vec())
        .with_file("hidden/.suno-manifest.json", b"{}".to_vec())
        .with_dir("empty/leaf")
        .with_dir("nested/a/b/c");

    fs.prune_empty_dirs("").unwrap();

    // Every empty directory disappears, at every nesting depth.
    let pruned = [
        "empty",
        "empty/leaf",
        "nested",
        "nested/a",
        "nested/a/b",
        "nested/a/b/c",
    ];
    for gone in pruned {
        assert!(!fs.has_dir(gone), "empty dir {gone} should be pruned");
    }

    // Any directory with a file beneath it survives, dotfile or not.
    for kept in ["keep", "keep/full", "hidden"] {
        assert!(fs.has_dir(kept));
    }

    // Files themselves are left alone.
    for file in ["keep/full/song.flac", "hidden/.suno-manifest.json"] {
        assert!(fs.exists(file));
    }
}
5291
#[test]
fn prune_empty_dirs_never_removes_the_named_root() {
    // When prune is given a named root, that root's empty children are
    // removed but the root directory stays, even once it has become empty.
    let root = "empty";
    let fs = MemFs::new().with_dir("empty/leaf");

    fs.prune_empty_dirs(root).unwrap();

    assert!(!fs.has_dir("empty/leaf"));
    assert!(fs.has_dir(root), "the named root is never removed");
}
5301
#[test]
fn old_sidecar_remove_failure_is_per_clip_and_converges_next_run() {
    // The manifest points at AlbumA/cover.jpg while the plan writes the cover
    // at AlbumB. In run 1 the removal of the old cover is forced to fail: that
    // is one per-clip failure, the run still completes, and the manifest slot
    // keeps the old path. In run 2 the same plan runs with the fault cleared
    // and the tree converges.
    let old_cover = "AlbumA/cover.jpg";
    let new_cover = "AlbumB/cover.jpg";

    let mut clip_entry = entry("a.flac", AudioFormat::Flac);
    clip_entry.cover_jpg = Some(ArtifactState {
        path: old_cover.to_owned(),
        hash: "h1".to_owned(),
    });
    let mut manifest = Manifest::new();
    manifest.insert("a", clip_entry);

    let fs = MemFs::new()
        .with_file("a.flac", b"AUDIO".to_vec())
        .with_file(old_cover, b"old".to_vec());
    let plan = Plan {
        actions: vec![Action::WriteArtifact {
            kind: ArtifactKind::CoverJpg,
            path: new_cover.to_owned(),
            source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
            hash: "h1".to_owned(),
            owner_id: "a".to_owned(),
            content: None,
        }],
    };
    let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"new".to_vec()));
    let opts = ExecOptions::default();

    // ── Run 1: removing the old cover fails ──
    fs.arm_fail_remove(old_cover);
    let first = run(
        &plan,
        &mut manifest,
        &[],
        &http,
        &fs,
        &StubFfmpeg::flac(),
        &RecordingClock::new(),
        &opts,
    );
    assert_eq!(
        first.status,
        RunStatus::Completed,
        "a remove failure never aborts the run"
    );
    assert_eq!(first.failed(), 1);
    // Both covers exist and the manifest slot still names the old path.
    assert!(fs.exists(new_cover));
    assert!(fs.exists(old_cover));
    let after_first = manifest.get("a").unwrap();
    assert_eq!(after_first.cover_jpg.as_ref().unwrap().path, old_cover);
    assert!(fs.has_dir("AlbumA"), "the orphan keeps its directory alive");

    // ── Run 2: identical plan, fault disarmed ──
    fs.disarm_fail_remove(old_cover);
    let second = run(
        &plan,
        &mut manifest,
        &[],
        &http,
        &fs,
        &StubFfmpeg::flac(),
        &RecordingClock::new(),
        &opts,
    );
    assert_eq!(second.failed(), 0);
    assert!(fs.exists(new_cover));
    assert!(!fs.exists(old_cover), "no orphan persists");
    let after_second = manifest.get("a").unwrap();
    assert_eq!(after_second.cover_jpg.as_ref().unwrap().path, new_cover);
    assert!(!fs.has_dir("AlbumA"), "the emptied directory is pruned");
}
5377
#[test]
fn same_path_artifact_rewrite_does_no_remove_and_prunes_nothing() {
    // Idempotent case: the cover's hash drifts (h1 -> h2) but its path does
    // not. A remove failure is armed on that very path, so if the executor
    // attempted a remove it would show up as a failure. None is expected,
    // and the live directory must survive.
    let cover_path = "Album/cover.jpg";

    let mut clip_entry = entry("Album/a.mp3", AudioFormat::Mp3);
    clip_entry.cover_jpg = Some(ArtifactState {
        path: cover_path.to_owned(),
        hash: "h1".to_owned(),
    });
    let mut manifest = Manifest::new();
    manifest.insert("a", clip_entry);

    let fs = MemFs::new()
        .with_file("Album/a.mp3", b"AUDIO".to_vec())
        .with_file(cover_path, b"old".to_vec());
    fs.arm_fail_remove(cover_path);

    let plan = Plan {
        actions: vec![Action::WriteArtifact {
            kind: ArtifactKind::CoverJpg,
            path: cover_path.to_owned(),
            source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
            hash: "h2".to_owned(),
            owner_id: "a".to_owned(),
            content: None,
        }],
    };
    let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"new".to_vec()));
    let ffmpeg = StubFfmpeg::flac();
    let clock = RecordingClock::new();
    let opts = ExecOptions::default();

    let outcome = run(
        &plan,
        &mut manifest,
        &[],
        &http,
        &fs,
        &ffmpeg,
        &clock,
        &opts,
    );

    assert_eq!(
        outcome.failed(),
        0,
        "no remove is attempted, so the armed failure never fires"
    );
    assert_eq!(outcome.artifacts_written, 1);

    // Content and recorded hash were both refreshed in place.
    assert_eq!(fs.read_file(cover_path).unwrap(), b"new");
    let refreshed = manifest.get("a").unwrap();
    assert_eq!(refreshed.cover_jpg.as_ref().unwrap().hash, "h2");

    // Prune left the directory that still holds files alone.
    assert!(fs.has_dir("Album"));
}
5432
5433    // ── Concurrency (issue #22) ─────────────────────────────────────
5434
5435    mod concurrency {
5436        use super::*;
5437        use crate::ffmpeg::FfmpegError;
5438        use crate::fs::{FileStat, FsError};
5439        use crate::http::{HttpRequest, TransportError};
5440        use std::future::Future;
5441        use std::pin::Pin;
5442        use std::sync::Arc;
5443        use std::sync::atomic::{AtomicUsize, Ordering};
5444        use std::task::{Context, Poll};
5445
/// A future that returns `Pending` on its first poll and `Ready` on every
/// poll after that. Before returning `Pending` it wakes its own waker, so
/// the executor driving it is told to poll again. Awaiting it inside the
/// [`Http`] double makes each request yield once, which lets
/// [`buffer_unordered`](futures_util::stream::StreamExt) start the other
/// in-flight requests and makes their overlap observable.
#[derive(Default)]
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // Record the yield up front; writing `true` again later is a no-op.
        let first_poll = !self.yielded;
        self.yielded = true;

        if first_poll {
            // Self-wake so the pending state is picked up again without
            // depending on any external event.
            cx.waker().wake_by_ref();
            Poll::Pending
        } else {
            Poll::Ready(())
        }
    }
}
5467
5468        /// An [`Http`] double that wraps [`ScriptedHttp`] and records the peak
5469        /// number of concurrently in-flight requests. Each `send` bumps a live
5470        /// counter, yields once (so peers can start), then delegates.
5471        struct GatedHttp {
5472            inner: ScriptedHttp,
5473            inflight: Arc<AtomicUsize>,
5474            peak: Arc<AtomicUsize>,
5475        }
5476
5477        impl GatedHttp {
5478            fn new(inner: ScriptedHttp) -> Self {
5479                Self {
5480                    inner,
5481                    inflight: Arc::new(AtomicUsize::new(0)),
5482                    peak: Arc::new(AtomicUsize::new(0)),
5483                }
5484            }
5485
5486            fn peak(&self) -> usize {
5487                self.peak.load(Ordering::SeqCst)
5488            }
5489        }
5490
5491        impl Http for GatedHttp {
5492            async fn send(&self, request: HttpRequest) -> Result<HttpResponse, TransportError> {
5493                let now = self.inflight.fetch_add(1, Ordering::SeqCst) + 1;
5494                self.peak.fetch_max(now, Ordering::SeqCst);
5495                YieldOnce::default().await;
5496                let out = self.inner.send(request).await;
5497                self.inflight.fetch_sub(1, Ordering::SeqCst);
5498                out
5499            }
5500        }
5501
5502        fn download(id: &str, format: AudioFormat) -> (Clip, Desired, Action) {
5503            let c = clip(id);
5504            let d = desired(c.clone(), format);
5505            let action = Action::Download {
5506                clip: c.clone(),
5507                lineage: LineageContext::own_root(&c),
5508                path: d.path.clone(),
5509                format,
5510            };
5511            (c, d, action)
5512        }
5513
5514        fn opts_with(concurrency: u32) -> ExecOptions {
5515            ExecOptions {
5516                concurrency,
5517                ..small_poll()
5518            }
5519        }
5520
5521        #[test]
5522        fn concurrency_never_exceeds_the_configured_bound() {
5523            let count = 6;
5524            let concurrency = 3;
5525            let mut scripted = ScriptedHttp::new().with_auth();
5526            let mut actions = Vec::new();
5527            let mut desireds = Vec::new();
5528            for i in 0..count {
5529                let id = format!("c{i}");
5530                scripted = scripted.route(&format!("{id}.mp3"), Reply::ok(b"mp3-body".to_vec()));
5531                let (_c, d, action) = download(&id, AudioFormat::Mp3);
5532                actions.push(action);
5533                desireds.push(d);
5534            }
5535            let http = GatedHttp::new(scripted);
5536            let fs = MemFs::new();
5537            let plan = Plan { actions };
5538            let mut manifest = Manifest::new();
5539
5540            let outcome = run_gated_fs(
5541                &plan,
5542                &mut manifest,
5543                &desireds,
5544                &http,
5545                &fs,
5546                &opts_with(concurrency),
5547            );
5548
5549            assert_eq!(outcome.downloaded, count);
5550            assert!(
5551                http.peak() <= concurrency as usize,
5552                "peak {} exceeded the bound {concurrency}",
5553                http.peak()
5554            );
5555            assert_eq!(
5556                http.peak(),
5557                concurrency as usize,
5558                "expected the run to saturate the bound"
5559            );
5560        }
5561
5562        /// Run a gated plan against a caller-supplied [`MemFs`], returning the
5563        /// outcome. The client is built here so the limiter can be inspected by
5564        /// the caller-facing variant below.
5565        fn run_gated_fs(
5566            plan: &Plan,
5567            manifest: &mut Manifest,
5568            desired: &[Desired],
5569            http: &GatedHttp,
5570            fs: &MemFs,
5571            opts: &ExecOptions,
5572        ) -> ExecOutcome {
5573            let ffmpeg = StubFfmpeg::flac();
5574            let clock = RecordingClock::new();
5575            let mut albums = BTreeMap::new();
5576            let mut playlists = BTreeMap::new();
5577            let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
5578            pollster::block_on(execute(
5579                plan,
5580                manifest,
5581                &mut albums,
5582                &mut playlists,
5583                desired,
5584                &HashMap::new(),
5585                Ports {
5586                    client: &mut client,
5587                    http,
5588                    fs,
5589                    ffmpeg: &ffmpeg,
5590                    clock: &clock,
5591                },
5592                opts,
5593            ))
5594        }
5595
5596        #[test]
5597        fn a_failing_clip_does_not_abort_the_others() {
5598            let mut scripted = ScriptedHttp::new().with_auth();
5599            scripted = scripted
5600                .route("ok1.mp3", Reply::ok(b"one".to_vec()))
5601                .route("bad.mp3", Reply::status(404))
5602                .route("ok2.mp3", Reply::ok(b"two".to_vec()));
5603            let (_a, d1, a1) = download("ok1", AudioFormat::Mp3);
5604            let (_b, d2, a2) = download("bad", AudioFormat::Mp3);
5605            let (_c, d3, a3) = download("ok2", AudioFormat::Mp3);
5606            let http = GatedHttp::new(scripted);
5607            let fs = MemFs::new();
5608            let plan = Plan {
5609                actions: vec![a1, a2, a3],
5610            };
5611            let mut manifest = Manifest::new();
5612
5613            let outcome = run_gated_fs(
5614                &plan,
5615                &mut manifest,
5616                &[d1, d2, d3],
5617                &http,
5618                &fs,
5619                &opts_with(3),
5620            );
5621
5622            assert_eq!(outcome.downloaded, 2);
5623            assert_eq!(outcome.failed(), 1);
5624            assert_eq!(outcome.status, RunStatus::Completed);
5625            assert_eq!(outcome.failures[0].clip_id, "bad");
5626            assert!(manifest.get("ok1").is_some());
5627            assert!(manifest.get("ok2").is_some());
5628            assert!(manifest.get("bad").is_none());
5629        }
5630
5631        #[test]
5632        fn outcome_is_identical_across_concurrency_levels() {
5633            // A plan mixing successful and failing downloads with serial phase-2
5634            // actions (a skip and a delete), so both phases contribute.
5635            fn build() -> (Plan, Vec<Desired>) {
5636                let mut actions = Vec::new();
5637                let mut desireds = Vec::new();
5638                for id in ["a", "b", "c", "d"] {
5639                    let (_c, d, action) = download(id, AudioFormat::Mp3);
5640                    actions.push(action);
5641                    desireds.push(d);
5642                }
5643                // A failing download in the middle of the audio set.
5644                let (_e, de, ae) = download("fail", AudioFormat::Mp3);
5645                actions.insert(2, ae);
5646                desireds.push(de);
5647                // Phase-2 actions.
5648                actions.push(Action::Skip {
5649                    clip_id: "gone".to_owned(),
5650                });
5651                actions.push(Action::Delete {
5652                    path: "old.mp3".to_owned(),
5653                    clip_id: "old".to_owned(),
5654                });
5655                (Plan { actions }, desireds)
5656            }
5657
5658            fn http() -> ScriptedHttp {
5659                ScriptedHttp::new()
5660                    .with_auth()
5661                    .route("a.mp3", Reply::ok(b"a".to_vec()))
5662                    .route("b.mp3", Reply::ok(b"b".to_vec()))
5663                    .route("c.mp3", Reply::ok(b"c".to_vec()))
5664                    .route("d.mp3", Reply::ok(b"d".to_vec()))
5665                    .route("fail.mp3", Reply::status(404))
5666            }
5667
5668            fn seed_manifest() -> Manifest {
5669                let mut m = Manifest::new();
5670                m.insert("old".to_owned(), entry("old.mp3", AudioFormat::Mp3));
5671                m
5672            }
5673
5674            let (plan, desireds) = build();
5675
5676            let mut m1 = seed_manifest();
5677            let fs1 = MemFs::new().with_file("old.mp3", b"x".to_vec());
5678            let out1 = run_gated_fs(
5679                &plan,
5680                &mut m1,
5681                &desireds,
5682                &GatedHttp::new(http()),
5683                &fs1,
5684                &opts_with(1),
5685            );
5686
5687            let mut m8 = seed_manifest();
5688            let fs8 = MemFs::new().with_file("old.mp3", b"x".to_vec());
5689            let out8 = run_gated_fs(
5690                &plan,
5691                &mut m8,
5692                &desireds,
5693                &GatedHttp::new(http()),
5694                &fs8,
5695                &opts_with(8),
5696            );
5697
5698            assert_eq!(out1, out8, "outcome must not depend on concurrency");
5699            assert_eq!(m1, m8, "final manifest must not depend on concurrency");
5700            assert_eq!(out8.downloaded, 4);
5701            assert_eq!(out8.deleted, 1);
5702            assert_eq!(out8.skipped, 1);
5703            assert_eq!(out8.failed(), 1);
5704        }
5705
5706        #[test]
5707        fn a_systemic_disk_full_aborts_promptly() {
5708            let count = 8;
5709            let concurrency = 2;
5710            let mut scripted = ScriptedHttp::new().with_auth();
5711            let mut actions = Vec::new();
5712            let mut desireds = Vec::new();
5713            for i in 0..count {
5714                let id = format!("d{i}");
5715                scripted = scripted.route(&format!("{id}.mp3"), Reply::ok(b"mp3-body".to_vec()));
5716                let (_c, d, action) = download(&id, AudioFormat::Mp3);
5717                actions.push(action);
5718                desireds.push(d);
5719            }
5720            // The very first clip's write hits ENOSPC, a systemic failure.
5721            let fs = MemFs::new().fail_write_out_of_space("d0.mp3");
5722            let http = GatedHttp::new(scripted);
5723            let plan = Plan { actions };
5724            let mut manifest = Manifest::new();
5725
5726            let outcome = run_gated_fs(
5727                &plan,
5728                &mut manifest,
5729                &desireds,
5730                &http,
5731                &fs,
5732                &opts_with(concurrency),
5733            );
5734
5735            assert_eq!(outcome.status, RunStatus::DiskFull);
5736            assert!(
5737                outcome.downloaded < count,
5738                "a systemic abort must stop remaining work, downloaded {}",
5739                outcome.downloaded
5740            );
5741        }
5742
5743        #[test]
5744        fn limiter_records_a_rate_limit_under_concurrent_calls() {
5745            // Three concurrent FLAC renders; exactly one clip is throttled once
5746            // on its wav_file read. The shared limiter must record that single
5747            // 429 (halving 2.0 -> 1.0) with no lost or duplicated update, proving
5748            // the mutex keeps the AIMD state correct under concurrency.
5749            let scripted = ScriptedHttp::new()
5750                .with_auth()
5751                .route_seq(
5752                    "/gen/x/wav_file/",
5753                    vec![
5754                        Reply::status(429),
5755                        Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/x.wav"}"#),
5756                    ],
5757                )
5758                .route(
5759                    "/gen/y/wav_file/",
5760                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/y.wav"}"#),
5761                )
5762                .route(
5763                    "/gen/z/wav_file/",
5764                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/z.wav"}"#),
5765                )
5766                .route("x.wav", Reply::ok(b"wav-x".to_vec()))
5767                .route("y.wav", Reply::ok(b"wav-y".to_vec()))
5768                .route("z.wav", Reply::ok(b"wav-z".to_vec()));
5769
5770            let mut actions = Vec::new();
5771            let mut desireds = Vec::new();
5772            for id in ["x", "y", "z"] {
5773                let (_c, d, action) = download(id, AudioFormat::Flac);
5774                actions.push(action);
5775                desireds.push(d);
5776            }
5777            let plan = Plan { actions };
5778            let fs = MemFs::new();
5779            let ffmpeg = StubFfmpeg::flac();
5780            let clock = RecordingClock::new();
5781            let mut albums = BTreeMap::new();
5782            let mut playlists = BTreeMap::new();
5783            let mut manifest = Manifest::new();
5784            let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
5785
5786            let outcome = pollster::block_on(execute(
5787                &plan,
5788                &mut manifest,
5789                &mut albums,
5790                &mut playlists,
5791                &desireds,
5792                &HashMap::new(),
5793                Ports {
5794                    client: &mut client,
5795                    http: &scripted,
5796                    fs: &fs,
5797                    ffmpeg: &ffmpeg,
5798                    clock: &clock,
5799                },
5800                &opts_with(3),
5801            ));
5802
5803            assert_eq!(outcome.downloaded, 3);
5804            assert_eq!(outcome.failed(), 0);
5805            assert!(
5806                (client.limiter_rate() - 1.0).abs() < 1e-9,
5807                "one 429 must halve the rate to 1.0, got {}",
5808                client.limiter_rate()
5809            );
5810        }
5811
5812        #[test]
5813        fn a_download_is_committed_in_plan_order_around_a_rename() {
5814            // Plan order: rename "orig" away from shared.mp3 first, then download
5815            // a new clip into shared.mp3. A parallel executor that performed the
5816            // download's destination write off plan order would write shared.mp3
5817            // before the rename ran, letting the rename carry those fresh bytes
5818            // to moved.mp3 and stranding shared.mp3 - corrupting both clips.
5819            // Committing every destination effect serially in plan order keeps
5820            // moved.mp3 = the original and shared.mp3 = the new download.
5821            let c_new = clip("new");
5822            let mut d_new = desired(c_new.clone(), AudioFormat::Mp3);
5823            d_new.path = "shared.mp3".to_owned();
5824            let plan = Plan {
5825                actions: vec![
5826                    Action::Rename {
5827                        from: "shared.mp3".to_owned(),
5828                        to: "moved.mp3".to_owned(),
5829                    },
5830                    Action::Download {
5831                        clip: c_new.clone(),
5832                        lineage: LineageContext::own_root(&c_new),
5833                        path: "shared.mp3".to_owned(),
5834                        format: AudioFormat::Mp3,
5835                    },
5836                ],
5837            };
5838            let scripted = ScriptedHttp::new()
5839                .with_auth()
5840                .route("new.mp3", Reply::ok(b"NEW-BODY".to_vec()));
5841            let http = GatedHttp::new(scripted);
5842            let fs = MemFs::new().with_file("shared.mp3", b"ORIGINAL".to_vec());
5843            let mut manifest = Manifest::new();
5844            manifest.insert("orig", entry("shared.mp3", AudioFormat::Mp3));
5845
5846            let outcome = run_gated_fs(&plan, &mut manifest, &[d_new], &http, &fs, &opts_with(4));
5847
5848            assert_eq!(outcome.renamed, 1);
5849            assert_eq!(outcome.downloaded, 1);
5850            assert_eq!(
5851                fs.read_file("moved.mp3").as_deref(),
5852                Some(&b"ORIGINAL"[..]),
5853                "the rename must carry the original bytes, untouched by the download"
5854            );
5855            let landed = fs.read_file("shared.mp3").expect("new download must land");
5856            assert_ne!(
5857                landed, b"ORIGINAL",
5858                "the new download must replace the moved original, not corrupt it"
5859            );
5860            assert_eq!(manifest.get("orig").unwrap().path, "moved.mp3");
5861            assert_eq!(manifest.get("new").unwrap().path, "shared.mp3");
5862        }
5863
5864        #[test]
5865        fn an_aborted_reformat_leaves_the_old_file_and_manifest_consistent() {
5866            // A systemic disk-full abort strikes the download committed before the
5867            // reformat. Because the reformat's slow render is side-effect-free and
5868            // its destination write + old-file removal only happen in the serial
5869            // commit (which the abort skips), the old file survives and the
5870            // manifest still points at it: no removed-but-referenced file.
5871            let boom = clip("boom");
5872            let mut d_boom = desired(boom.clone(), AudioFormat::Mp3);
5873            d_boom.path = "boom.mp3".to_owned();
5874            let reformer = clip("r");
5875            let d_reformer = desired(reformer.clone(), AudioFormat::Mp3);
5876            let plan = Plan {
5877                actions: vec![
5878                    Action::Download {
5879                        clip: boom.clone(),
5880                        lineage: LineageContext::own_root(&boom),
5881                        path: "boom.mp3".to_owned(),
5882                        format: AudioFormat::Mp3,
5883                    },
5884                    Action::Reformat {
5885                        clip: reformer.clone(),
5886                        path: "r_new.mp3".to_owned(),
5887                        from_path: "r_old.flac".to_owned(),
5888                        from: AudioFormat::Flac,
5889                        to: AudioFormat::Mp3,
5890                    },
5891                ],
5892            };
5893            let scripted = ScriptedHttp::new()
5894                .with_auth()
5895                .route("boom.mp3", Reply::ok(b"boom-body".to_vec()))
5896                .route("r.mp3", Reply::ok(b"reformatted".to_vec()));
5897            let http = GatedHttp::new(scripted);
5898            // The download's write hits ENOSPC, a systemic abort.
5899            let fs = MemFs::new()
5900                .with_file("r_old.flac", b"OLD-FLAC".to_vec())
5901                .fail_write_out_of_space("boom.mp3");
5902            let mut manifest = Manifest::new();
5903            manifest.insert("r", entry("r_old.flac", AudioFormat::Flac));
5904
5905            let outcome = run_gated_fs(
5906                &plan,
5907                &mut manifest,
5908                &[d_boom, d_reformer],
5909                &http,
5910                &fs,
5911                &opts_with(4),
5912            );
5913
5914            assert_eq!(outcome.status, RunStatus::DiskFull);
5915            assert!(
5916                fs.exists("r_old.flac"),
5917                "the old file must survive the abort"
5918            );
5919            assert!(
5920                !fs.exists("r_new.mp3"),
5921                "no reformatted file may be written"
5922            );
5923            let still = manifest.get("r").expect("the manifest must still track r");
5924            assert_eq!(
5925                still.path, "r_old.flac",
5926                "the manifest must still point at the surviving old file"
5927            );
5928            assert_eq!(still.format, AudioFormat::Flac);
5929        }
5930
5931        #[test]
5932        fn a_systemic_abort_leaves_no_untracked_destination_files() {
5933            // Two clips commit, the third's write hits ENOSPC (a systemic abort),
5934            // and the rest never commit. Every file remaining on disk must be one
5935            // the manifest tracks: producers write nothing, so an abort cannot
5936            // strand an untracked file from an in-flight or buffered render.
5937            let mut scripted = ScriptedHttp::new().with_auth();
5938            let mut actions = Vec::new();
5939            let mut desireds = Vec::new();
5940            for id in ["a0", "a1", "boom", "a3", "a4"] {
5941                scripted = scripted.route(&format!("{id}.mp3"), Reply::ok(b"body".to_vec()));
5942                let (_c, d, action) = download(id, AudioFormat::Mp3);
5943                actions.push(action);
5944                desireds.push(d);
5945            }
5946            let http = GatedHttp::new(scripted);
5947            let fs = MemFs::new().fail_write_out_of_space("boom.mp3");
5948            let plan = Plan { actions };
5949            let mut manifest = Manifest::new();
5950
5951            let outcome = run_gated_fs(&plan, &mut manifest, &desireds, &http, &fs, &opts_with(2));
5952
5953            assert_eq!(outcome.status, RunStatus::DiskFull);
5954            let tracked: std::collections::BTreeSet<String> = manifest
5955                .entries
5956                .values()
5957                .map(|entry| entry.path.clone())
5958                .collect();
5959            for path in fs.paths() {
5960                assert!(
5961                    tracked.contains(&path),
5962                    "found an untracked destination file: {path}"
5963                );
5964            }
5965            assert!(
5966                !fs.exists("a3.mp3"),
5967                "uncommitted renders must not be on disk"
5968            );
5969            assert!(
5970                !fs.exists("a4.mp3"),
5971                "uncommitted renders must not be on disk"
5972            );
5973        }
5974
5975        /// An [`Ffmpeg`] double that counts how many rendered FLAC payloads are
5976        /// live: it bumps a shared counter (tracking the peak) when a transcode
5977        /// yields bytes, and [`CountingFs`] drops it back on the committing write.
5978        /// The [transcode, write] window is a superset of the true in-memory hold,
5979        /// so the observed peak upper-bounds the real one.
5980        struct CountingFfmpeg {
5981            inner: StubFfmpeg,
5982            held: Arc<AtomicUsize>,
5983            peak: Arc<AtomicUsize>,
5984        }
5985
5986        impl Ffmpeg for CountingFfmpeg {
5987            fn wav_to_flac(
5988                &self,
5989                wav: &[u8],
5990            ) -> impl Future<Output = Result<Vec<u8>, FfmpegError>> + Send {
5991                let fut = self.inner.wav_to_flac(wav);
5992                let held = self.held.clone();
5993                let peak = self.peak.clone();
5994                async move {
5995                    let out = fut.await;
5996                    if out.is_ok() {
5997                        let now = held.fetch_add(1, Ordering::SeqCst) + 1;
5998                        peak.fetch_max(now, Ordering::SeqCst);
5999                    }
6000                    out
6001                }
6002            }
6003
6004            fn mp4_to_webp(
6005                &self,
6006                mp4: &[u8],
6007                settings: WebpEncodeSettings,
6008            ) -> impl Future<Output = Result<Vec<u8>, FfmpegError>> + Send {
6009                self.inner.mp4_to_webp(mp4, settings)
6010            }
6011        }
6012
6013        /// A [`Filesystem`] double wrapping [`MemFs`] that decrements the live
6014        /// payload counter on each committing write, closing the window opened by
6015        /// [`CountingFfmpeg`].
6016        struct CountingFs {
6017            inner: MemFs,
6018            held: Arc<AtomicUsize>,
6019        }
6020
6021        impl Filesystem for CountingFs {
6022            fn write_atomic(&self, path: &str, bytes: &[u8]) -> Result<(), FsError> {
6023                let out = self.inner.write_atomic(path, bytes);
6024                self.held.fetch_sub(1, Ordering::SeqCst);
6025                out
6026            }
6027
6028            fn rename(&self, from: &str, to: &str) -> Result<(), FsError> {
6029                self.inner.rename(from, to)
6030            }
6031
6032            fn remove(&self, path: &str) -> Result<(), FsError> {
6033                self.inner.remove(path)
6034            }
6035
6036            fn prune_empty_dirs(&self, root: &str) -> Result<(), FsError> {
6037                self.inner.prune_empty_dirs(root)
6038            }
6039
6040            fn read(&self, path: &str) -> Result<Vec<u8>, FsError> {
6041                self.inner.read(path)
6042            }
6043
6044            fn metadata(&self, path: &str) -> Option<FileStat> {
6045                self.inner.metadata(path)
6046            }
6047        }
6048
6049        #[test]
6050        fn rendered_payloads_in_memory_stay_bounded_by_concurrency() {
6051            // Far more FLAC clips than the concurrency bound. The ordered buffered
6052            // render keeps at most about `concurrency` transcoded payloads live at
6053            // once (never the whole library), so peak held <= concurrency + 1.
6054            let count = 12;
6055            let concurrency = 3;
6056            let mut scripted = ScriptedHttp::new().with_auth();
6057            let mut actions = Vec::new();
6058            let mut desireds = Vec::new();
6059            for i in 0..count {
6060                let id = format!("f{i}");
6061                scripted = scripted
6062                    .route(
6063                        &format!("/gen/{id}/wav_file/"),
6064                        Reply::json(&format!(
6065                            r#"{{"wav_file_url": "https://cdn1.suno.ai/{id}.wav"}}"#
6066                        )),
6067                    )
6068                    .route(&format!("{id}.wav"), Reply::ok(b"wav-body".to_vec()));
6069                let (_c, d, action) = download(&id, AudioFormat::Flac);
6070                actions.push(action);
6071                desireds.push(d);
6072            }
6073            let http = GatedHttp::new(scripted);
6074            let held = Arc::new(AtomicUsize::new(0));
6075            let peak = Arc::new(AtomicUsize::new(0));
6076            let ffmpeg = CountingFfmpeg {
6077                inner: StubFfmpeg::flac(),
6078                held: held.clone(),
6079                peak: peak.clone(),
6080            };
6081            let fs = CountingFs {
6082                inner: MemFs::new(),
6083                held: held.clone(),
6084            };
6085            let clock = RecordingClock::new();
6086            let mut albums = BTreeMap::new();
6087            let mut playlists = BTreeMap::new();
6088            let mut manifest = Manifest::new();
6089            let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
6090            let plan = Plan { actions };
6091
6092            let outcome = pollster::block_on(execute(
6093                &plan,
6094                &mut manifest,
6095                &mut albums,
6096                &mut playlists,
6097                &desireds,
6098                &HashMap::new(),
6099                Ports {
6100                    client: &mut client,
6101                    http: &http,
6102                    fs: &fs,
6103                    ffmpeg: &ffmpeg,
6104                    clock: &clock,
6105                },
6106                &opts_with(concurrency),
6107            ));
6108
6109            assert_eq!(outcome.downloaded, count as usize);
6110            assert_eq!(
6111                held.load(Ordering::SeqCst),
6112                0,
6113                "every payload must be committed"
6114            );
6115            assert!(
6116                peak.load(Ordering::SeqCst) <= concurrency as usize + 1,
6117                "peak live payloads {} exceeded the bound {}",
6118                peak.load(Ordering::SeqCst),
6119                concurrency + 1
6120            );
6121            assert!(
6122                peak.load(Ordering::SeqCst) >= 2,
6123                "the render should genuinely overlap, peak was {}",
6124                peak.load(Ordering::SeqCst)
6125            );
6126        }
6127    }
6128}