Skip to main content

suno_core/
executor.rs

1//! The download executor: it applies a reconcile [`Plan`] to disk through ports.
2//!
3//! Reconcile decides *what* to do; the executor does it. It is async and pure
4//! orchestration: every side effect goes through a port ([`Http`] for the
5//! network, [`Filesystem`] for disk, [`Ffmpeg`] for transcoding, [`Clock`] for
6//! waiting), so the whole pipeline is exercised in tests with in-memory doubles
7//! and no real IO, network, or sleeping.
8//!
9//! Safety is the point of this module. A wrong write or delete damages the
10//! user's library, so the executor:
11//!
12//! - writes only atomically (SYNC-13): a failed write leaves the prior file
13//!   intact, because the [`Filesystem`] adapter stages a temp file and renames;
14//! - verifies size (SYNC-14): a download whose body disagrees with the
15//!   provider's `Content-Length` is treated as truncated and retried, and a
16//!   written file whose on-disk size disagrees with the bytes written is a
17//!   failure, never a recorded success;
18//! - classifies errors (SYNC-17): an auth failure or a full disk stops the
19//!   account run (with an auth or disk-full status) and is never retried;
20//!   transient failures (timeouts, 5xx,
21//!   transport, 429) are retried a bounded number of times then recorded and
22//!   skipped; permanent failures are recorded and skipped; and a single clip's
23//!   failure never aborts the run;
24//! - backs off on rate limits (SYNC-16) through the injected [`Clock`], honouring
25//!   a `Retry-After` hint.
26//!
27//! The executor only ever sets the manifest's [`preserve`](ManifestEntry::preserve)
28//! marker on an entry it writes, and only deletes a path whose removal the
29//! [`Filesystem`] confirms. Higher-level safety (empty-listing abort, the
30//! destructive-sync confirmation, exit codes) is the caller's job.
31
32use std::collections::BTreeMap;
33use std::collections::BTreeSet;
34use std::collections::HashMap;
35use std::collections::HashSet;
36use std::sync::Mutex;
37use std::time::Duration;
38
39use futures_util::lock::Mutex as AsyncMutex;
40use futures_util::stream::{self, StreamExt};
41
42use crate::backoff::{backoff_delay, retry_after};
43use crate::client::SunoClient;
44use crate::clock::Clock;
45use crate::config::{AudioFormat, StemFormat};
46use crate::error::Error;
47use crate::ffmpeg::{Ffmpeg, WebpEncodeSettings};
48use crate::fs::Filesystem;
49use crate::graph::{AlbumArt, PlaylistState};
50use crate::http::{Http, HttpRequest};
51use crate::lineage::LineageContext;
52use crate::lyrics::AlignedLyrics;
53use crate::manifest::{ArtifactState, Manifest, ManifestEntry};
54use crate::model::Clip;
55use crate::reconcile::{
56    Action, ArtifactKind, Desired, Plan, SourceMode, set_manifest_artifact, set_manifest_stem,
57};
58use crate::tag::{TrackMetadata, tag_flac, tag_mp3, tag_wav};
59
60/// The shared Suno client behind an async mutex, so concurrent audio work can
61/// serialise its order-sensitive API calls (JWT refresh, adaptive limiter)
62/// without a runtime-specific lock. Held only for the brief WAV-render calls;
63/// the heavy CDN/transcode/tag work runs unlocked.
64type ClientLock<'a, C> = AsyncMutex<&'a SunoClient<C>>;
65
66/// Tunables for one [`execute`] run.
67#[derive(Debug, Clone)]
68pub struct ExecOptions {
69    /// How many times a transient failure is retried before record-and-skip.
70    pub max_retries: u32,
71    /// How many times to poll for a server-side WAV render before giving up.
72    pub wav_poll_attempts: u32,
73    /// How long to wait between WAV render polls.
74    pub wav_poll_interval: Duration,
75    /// How many clips' audio to fetch, transcode, and tag concurrently. Clamped
76    /// to at least one, so a zero collapses to sequential rather than stalling.
77    pub concurrency: u32,
78    /// Settings used for animated WebP cover transcodes.
79    pub cover_webp: WebpEncodeSettings,
80}
81
82impl Default for ExecOptions {
83    fn default() -> Self {
84        Self {
85            max_retries: 3,
86            wav_poll_attempts: 24,
87            wav_poll_interval: Duration::from_secs(5),
88            concurrency: 4,
89            cover_webp: WebpEncodeSettings::default(),
90        }
91    }
92}
93
94/// How an [`execute`] run ended.
95#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
96pub enum RunStatus {
97    /// Every action was attempted; some may have failed and been skipped.
98    #[default]
99    Completed,
100    /// An auth failure stopped the run early; remaining actions were not tried.
101    AuthAborted,
102    /// The disk filled; the run stopped early rather than failing every
103    /// remaining clip. Remaining actions were not tried.
104    DiskFull,
105}
106
107/// One action that could not be applied, for the run summary and failure log.
108#[derive(Debug, Clone, PartialEq, Eq)]
109pub struct Failure {
110    /// The clip the failed action concerned (or a path when no id applies).
111    pub clip_id: String,
112    /// A short, secret-free reason.
113    pub reason: String,
114}
115
116/// The result of applying a [`Plan`]: per-action counts and the failure list.
117#[derive(Debug, Clone, Default, PartialEq, Eq)]
118pub struct ExecOutcome {
119    pub downloaded: usize,
120    pub reformatted: usize,
121    pub retagged: usize,
122    pub renamed: usize,
123    pub deleted: usize,
124    pub skipped: usize,
125    pub artifacts_written: usize,
126    pub artifacts_deleted: usize,
127    /// Actions that failed and were skipped (auth, transient-exhausted, or
128    /// permanent). The run continued past each one unless it was an auth or
129    /// disk-full abort.
130    pub failures: Vec<Failure>,
131    /// How the run ended.
132    pub status: RunStatus,
133}
134
135impl ExecOutcome {
136    /// Number of failed actions.
137    pub fn failed(&self) -> usize {
138        self.failures.len()
139    }
140
141    fn record(&mut self, effect: Effect) {
142        match effect {
143            Effect::Downloaded => self.downloaded += 1,
144            Effect::Reformatted => self.reformatted += 1,
145            Effect::Retagged => self.retagged += 1,
146            Effect::Renamed => self.renamed += 1,
147            Effect::Deleted => self.deleted += 1,
148            Effect::Skipped => self.skipped += 1,
149            Effect::ArtifactWritten => self.artifacts_written += 1,
150            Effect::ArtifactDeleted => self.artifacts_deleted += 1,
151        }
152    }
153}
154
155/// The IO ports the executor drives, grouped so one value threads them through.
156///
157/// `client` performs the authenticated WAV render flow. The rest are shared
158/// references.
159pub struct Ports<'a, H, F, G, C> {
160    /// Performs the authenticated WAV render and poll flow.
161    pub client: &'a SunoClient<C>,
162    /// The public network port (CDN audio, rendered WAV, cover art).
163    pub http: &'a H,
164    /// The disk port.
165    pub fs: &'a F,
166    /// The transcode port (WAV to FLAC).
167    pub ffmpeg: &'a G,
168    /// The backoff and poll delay port.
169    pub clock: &'a C,
170}
171
172/// Apply `plan` to disk, updating `manifest` and `albums` in place, and return
173/// the outcome.
174///
175/// `desired` carries the per-clip metadata and art hashes plus the source modes
176/// that decide the [`preserve`](ManifestEntry::preserve) marker; it is indexed
177/// by clip id (and by target path, for renames) so each written entry records
178/// the right hashes and protection. `albums` is the album-art store, keyed by
179/// stable root id: folder-art writes and deletes record their state there rather
180/// than on the per-clip `manifest`. `ports` bundles the authenticated client and
181/// the network, disk, transcode, and backoff ports. A single clip's failure
182/// never aborts the run, except an auth failure or a full disk, which stop it
183/// with [`RunStatus::AuthAborted`] or [`RunStatus::DiskFull`].
184///
185/// Audio-producing ([`Download`](Action::Download) /
186/// [`Reformat`](Action::Reformat)), fetched-artifact
187/// ([`WriteArtifact`](Action::WriteArtifact) with no inline content), and stem
188/// ([`WriteStem`](Action::WriteStem)) actions all run their slow,
189/// side-effect-free work concurrently, bounded by
190/// [`ExecOptions::concurrency`]: WAV render + CDN download + transcode + tag for
191/// audio; CDN fetch + optional WebP transcode for artifacts; WAV render + CDN
192/// download for stems. Order-sensitive Suno API calls (WAV render initiation and
193/// poll) are serialised behind an async mutex over the shared [`SunoClient`],
194/// keeping the adaptive limiter and JWT refresh correct. The remaining actions
195/// (retag, rename, delete, artifact deletes, and inline artifact writes) run
196/// serially in plan order.
197///
198/// The outcome is deterministic regardless of completion order: all prepared
199/// results are committed to the manifest in plan-index order, so the same plan
200/// always yields the same manifest and counts whatever the concurrency level. A
201/// per-clip failure is recorded and the run continues; only an auth failure or a
202/// full disk aborts, and it does so promptly by stopping further concurrent work.
203///
204/// `synced` carries this run's fetched aligned (synced) lyrics keyed by clip id;
205/// it is the caller's IO result, not part of the pure plan. Audio tagging embeds
206/// a clip's entry as an MP3 `SYLT` frame and as the plain `USLT`/`LYRICS` text
207/// (FLAC), so a clip absent from the map (an instrumental, a WAV target, or a
208/// run with the feature off) is tagged exactly as before. The synced `.lrc`
209/// sidecar itself is a generated artifact whose body the caller has already
210/// resolved into the plan, so it is written like any other text sidecar.
211#[allow(clippy::too_many_arguments)]
212pub async fn execute<H, F, G, C>(
213    plan: &Plan,
214    manifest: &mut Manifest,
215    albums: &mut BTreeMap<String, AlbumArt>,
216    playlists: &mut BTreeMap<String, PlaylistState>,
217    desired: &[Desired],
218    synced: &HashMap<String, AlignedLyrics>,
219    ports: Ports<'_, H, F, G, C>,
220    opts: &ExecOptions,
221) -> ExecOutcome
222where
223    H: Http,
224    F: Filesystem,
225    G: Ffmpeg,
226    C: Clock,
227{
228    let Ports {
229        client,
230        http,
231        fs,
232        ffmpeg,
233        clock,
234    } = ports;
235    let by_id: HashMap<&str, &Desired> = desired.iter().map(|d| (d.clip.id.as_str(), d)).collect();
236    let by_path: HashMap<&str, &Desired> = desired.iter().map(|d| (d.path.as_str(), d)).collect();
237    // How many tracked artifact slots reference each path. The inline old-path
238    // cleanup removes a path only once nothing else holds it: each slot that
239    // moves away decrements its reference, and the removal fires only when the
240    // count reaches zero and no action writes the path this run. This keeps a
241    // live file a co-referencing slot still owns (a prior failed swap can leave
242    // two clips sharing a path) while letting the last slot to leave reclaim it,
243    // so nothing is orphaned either (#76).
244    let mut tracked_paths: HashMap<String, u32> = HashMap::new();
245    for (_, entry) in manifest.iter() {
246        for path in entry.artifact_paths() {
247            *tracked_paths.entry(path.to_owned()).or_default() += 1;
248        }
249    }
250    for art in albums.values() {
251        for state in [
252            art.folder_jpg.as_ref(),
253            art.folder_webp.as_ref(),
254            art.folder_mp4.as_ref(),
255        ]
256        .into_iter()
257        .flatten()
258        {
259            *tracked_paths.entry(state.path.clone()).or_default() += 1;
260        }
261    }
262    for playlist in playlists.values() {
263        *tracked_paths.entry(playlist.path.clone()).or_default() += 1;
264    }
265    // Static cover art is otherwise fetched twice per clip (#89): once to embed
266    // in the audio tag and once for the per-song `.jpg` sidecar, both from the
267    // same CDN URL. The audio producer caches each cover it embeds here, keyed by
268    // URL, and the sidecar write drains it rather than re-fetching. Only URLs a
269    // `CoverJpg` sidecar will fetch this run are cached, and the sidecar removes
270    // its entry on use, so the map holds at most the covers for the clips in
271    // flight (bounded by `concurrency`), never the whole library.
272    let cover_wanted: HashSet<&str> = plan
273        .actions
274        .iter()
275        .filter_map(|action| match action {
276            Action::WriteArtifact {
277                kind: ArtifactKind::CoverJpg,
278                source_url,
279                ..
280            } if !source_url.is_empty() => Some(source_url.as_str()),
281            _ => None,
282        })
283        .collect();
284    let cover_cache: Mutex<HashMap<String, Vec<u8>>> = Mutex::new(HashMap::new());
285    // The `both` video-cover retention keeps `cover.webp` (transcoded) and
286    // `cover.mp4` (raw) for an album from the SAME `video_cover_url`. Cache that
287    // source on its first fetch so the second folder artifact drains it rather
288    // than fetching the same MP4 twice (#90 reuses the #89 fetch-once path).
289    let mut folder_cover_uses: HashMap<&str, u32> = HashMap::new();
290    for action in &plan.actions {
291        if let Action::WriteArtifact {
292            kind: ArtifactKind::FolderWebp | ArtifactKind::FolderMp4,
293            source_url,
294            ..
295        } = action
296            && !source_url.is_empty()
297        {
298            *folder_cover_uses.entry(source_url.as_str()).or_default() += 1;
299        }
300    }
301    let shared_cover_urls: HashSet<&str> = folder_cover_uses
302        .into_iter()
303        .filter(|(_, uses)| *uses > 1)
304        .map(|(url, _)| url)
305        .collect();
306    let ctx = Ctx {
307        http,
308        fs,
309        ffmpeg,
310        clock,
311        opts,
312        by_id: &by_id,
313        by_path: &by_path,
314        synced,
315        cover_cache: &cover_cache,
316        cover_wanted: &cover_wanted,
317        shared_cover_urls: &shared_cover_urls,
318    };
319
320    let mut outcome = ExecOutcome::default();
321    // Destinations whose write has actually committed this run, gating old-path
322    // cleanup so a vacated sidecar/stem is kept only when a *successful* write
323    // also targets it (#142). Serial commit order makes this a clean prefix.
324    let mut committed: BTreeSet<String> = BTreeSet::new();
325
326    // Audio (Download/Reformat), fetched-artifact (WriteArtifact with no inline
327    // content), and stem (WriteStem) actions all split their work to maintain the
328    // CRITICAL DELETION-SAFETY INVARIANT: NO destination write, file removal, or
329    // manifest/album/playlist mutation happens off plan order:
330    //
331    // - concurrent preparers ([`prepare`](Ctx::prepare)) do only the slow,
332    //   side-effect-free work — fetch CDN/WAV bytes, transcode, tag — returning
333    //   bytes and the routing metadata the committer needs; and
334    // - a single serial committer below writes those bytes to the destination,
335    //   removes any superseded file, and records the manifest/album/playlist
336    //   entry, in strict plan-index order, interleaved with the remaining serial
337    //   actions.
338    //
339    // The shared client is the only `&mut` port and its API calls must stay
340    // ordered, so it rides behind an async mutex; each producer locks it only for
341    // the brief WAV-render calls and runs the heavy work unlocked. Prepares are
342    // yielded in plan order and bounded to `concurrency` in flight (and buffered),
343    // so at most about `concurrency` payloads are ever held in memory — never the
344    // whole library.
345    let client_lock = AsyncMutex::new(client);
346    let concurrency = opts.concurrency.max(1) as usize;
347    let ctx_ref = &ctx;
348    let client_lock_ref = &client_lock;
349    // Clip IDs already in the manifest before this plan runs. Per-clip
350    // artifacts and stems for these clips are prepared concurrently; for new
351    // clips (not yet in the manifest) the serial apply path handles them after
352    // the audio commit, so the owner-absent guard fires correctly.
353    let pre_clip_ids: HashSet<String> = manifest.entries.keys().cloned().collect();
354    // Clip IDs with a concurrent audio (Download/Reformat) action this run.
355    // Used to keep CoverJpg serial when its audio producer will cache the same
356    // cover URL (#89); preparing both concurrently races the remove vs insert.
357    let audio_clip_ids: HashSet<&str> = plan
358        .actions
359        .iter()
360        .filter_map(|action| match action {
361            Action::Download { clip, .. } | Action::Reformat { clip, .. } => Some(clip.id.as_str()),
362            _ => None,
363        })
364        .collect();
365    let mut prepares = stream::iter(
366        plan.actions
367            .iter()
368            .filter(|action| is_prepareable(action, &pre_clip_ids, &audio_clip_ids))
369            .map(|action| async move { ctx_ref.prepare(client_lock_ref, action).await }),
370    )
371    .buffered(concurrency);
372
373    for action in &plan.actions {
374        // Prepareable actions pull their pre-fetched bytes (yielded in plan order)
375        // and commit them here; every other action applies its own effect. Both the
376        // serial commit and the serial apply run in the same serial loop, so all
377        // destination and manifest effects keep the plan's order exactly.
378        let result = if is_prepareable(action, &pre_clip_ids, &audio_clip_ids) {
379            match prepares.next().await {
380                Some(Ok(Prepared::Audio(rendered))) => ctx.commit_audio(manifest, rendered),
381                Some(Ok(Prepared::Artifact(prepared))) => ctx.commit_artifact(
382                    manifest,
383                    albums,
384                    playlists,
385                    prepared,
386                    &mut tracked_paths,
387                    &committed,
388                ),
389                Some(Ok(Prepared::Stem(prepared))) => {
390                    ctx.commit_stem(manifest, prepared, &mut tracked_paths, &committed)
391                }
392                Some(Err(fail)) => Err(fail),
393                None => unreachable!("buffered yields one result per prepareable action"),
394            }
395        } else {
396            ctx.apply(
397                client_lock_ref,
398                action,
399                manifest,
400                albums,
401                playlists,
402                &mut tracked_paths,
403                &committed,
404            )
405            .await
406        };
407        match result {
408            Ok(effect) => {
409                outcome.record(effect);
410                // Record this action's destination now that its write succeeded.
411                // A later action vacating a path removes it only when no
412                // *committed* write also targets it; commit is strictly serial in
413                // plan order, so a planned-but-failed or not-yet-run write never
414                // protects a stale file from cleanup (#142).
415                if let Some(dest) = written_path(action) {
416                    committed.insert(dest.to_owned());
417                }
418            }
419            Err(fail) => {
420                let abort = abort_status(fail.class);
421                outcome.failures.push(Failure {
422                    clip_id: fail.clip_id,
423                    reason: fail.reason,
424                });
425                if let Some(status) = abort {
426                    // A systemic abort stops the run. Dropping the prepare stream
427                    // cancels any in-flight or completed-but-uncommitted producer;
428                    // because producers touch nothing on disk, the destination and
429                    // manifest are left exactly as the committed prefix wrote them,
430                    // with no untracked files and no removed-but-referenced file.
431                    outcome.status = status;
432                    break;
433                }
434            }
435        }
436    }
437    drop(prepares);
438
439    // Renames and deletes can leave an album directory empty; prune those ghost
440    // directories bottom-up. This runs on both the completed and the aborted
441    // paths, and is best-effort: a prune failure is only a missed tidy that the
442    // next run repeats, never a reason to fail the run.
443    let _ = fs.prune_empty_dirs("");
444    outcome
445}
446
447/// Whether an action has a slow, side-effect-free network or transcode phase
448/// that benefits from concurrent preparation. Audio actions (Download/Reformat)
449/// are always prepareable. A fetched artifact (WriteArtifact with no inline
450/// content) or stem write (WriteStem) is prepareable only when its owning clip
451/// was already in the manifest before this plan started: a new clip's sidecar
452/// cannot be prepared concurrently because its audio has not committed yet (the
453/// manifest entry doesn't exist at prepare time), so it falls through to the
454/// serial apply path which checks the manifest after the audio commits.
455///
456/// Two additional cases stay serial to preserve fetch-once dedup:
457///
458/// - [`FolderWebp`](ArtifactKind::FolderWebp) / [`FolderMp4`](ArtifactKind::FolderMp4):
459///   the `both` retention shares one `video_cover_url`; serial ordering lets
460///   the first fetch insert into `cover_cache` and the second drain it (#90).
461/// - [`CoverJpg`](ArtifactKind::CoverJpg) whose owner clip also has an audio
462///   action this run: the audio producer caches the cover bytes in `cover_cache`
463///   (#89); a concurrent CoverJpg drains the cache before the insert, causing a
464///   double fetch and a leaked entry.
465fn is_prepareable(
466    action: &Action,
467    pre_clip_ids: &HashSet<String>,
468    audio_clip_ids: &HashSet<&str>,
469) -> bool {
470    match action {
471        Action::Download { .. } | Action::Reformat { .. } => true,
472        Action::WriteArtifact {
473            kind,
474            owner_id,
475            content: None,
476            ..
477        } => {
478            if matches!(kind, ArtifactKind::FolderWebp | ArtifactKind::FolderMp4) {
479                return false;
480            }
481            if *kind == ArtifactKind::CoverJpg && audio_clip_ids.contains(owner_id.as_str()) {
482                return false;
483            }
484            !is_per_clip_kind(*kind) || pre_clip_ids.contains(owner_id.as_str())
485        }
486        Action::WriteStem { clip_id, .. } => pre_clip_ids.contains(clip_id.as_str()),
487        _ => false,
488    }
489}
490
491/// The destination path an action writes on success, or `None` for actions that
492/// write no file (skips, deletes). The serial committer records this once the
493/// action succeeds, so a later action vacating that same path keeps it rather
494/// than removing a freshly written file (#142, #76).
495fn written_path(action: &Action) -> Option<&str> {
496    match action {
497        Action::Download { path, .. }
498        | Action::Reformat { path, .. }
499        | Action::WriteArtifact { path, .. }
500        | Action::WriteStem { path, .. } => Some(path),
501        Action::Rename { to, .. }
502        | Action::MoveArtifact { to, .. }
503        | Action::MoveStem { to, .. } => Some(to),
504        _ => None,
505    }
506}
507
508/// A rendered-but-uncommitted audio result: the tagged bytes plus what the serial
509/// committer needs to place them. Produced concurrently and side-effect-free (no
510/// destination write, no removal, no manifest touch); [`commit_audio`] applies
511/// all of those in plan order.
512struct RenderedAudio {
513    clip_id: String,
514    path: String,
515    format: AudioFormat,
516    /// The superseded file to remove after the new one lands (a [`Reformat`]),
517    /// or `None` for a plain [`Download`].
518    from_path: Option<String>,
519    effect: Effect,
520    bytes: Vec<u8>,
521}
522
523/// A fetched-but-uncommitted artifact result: bytes for one
524/// [`WriteArtifact`](Action::WriteArtifact) with no inline content. Produced
525/// concurrently and side-effect-free; [`commit_artifact`](Ctx::commit_artifact)
526/// applies all filesystem and manifest/album/playlist effects in plan order.
527struct PreparedArtifact {
528    kind: ArtifactKind,
529    path: String,
530    hash: String,
531    owner_id: String,
532    bytes: Vec<u8>,
533}
534
535/// A fetched-but-uncommitted stem result: bytes for one
536/// [`WriteStem`](Action::WriteStem) action (including any WAV render + poll).
537/// Produced concurrently and side-effect-free; [`commit_stem`](Ctx::commit_stem)
538/// applies all filesystem and manifest effects in plan order.
539struct PreparedStem {
540    clip_id: String,
541    key: String,
542    path: String,
543    hash: String,
544    bytes: Vec<u8>,
545}
546
547/// The result of one concurrent preparation: audio, an artifact, or a stem.
548enum Prepared {
549    Audio(RenderedAudio),
550    Artifact(PreparedArtifact),
551    Stem(PreparedStem),
552}
553
554/// What an applied action did, for the outcome counters.
555enum Effect {
556    Downloaded,
557    Reformatted,
558    Retagged,
559    Renamed,
560    Deleted,
561    Skipped,
562    ArtifactWritten,
563    ArtifactDeleted,
564}
565
566/// How a failure should be handled (SYNC-17).
567#[derive(Debug, Clone, Copy)]
568enum Class {
569    /// Stop the account run; do not retry.
570    Auth,
571    /// Stop the account run: a full disk is systemic, like auth, so aborting
572    /// beats skipping every remaining clip (each of which would first burn a
573    /// server-side WAV-render budget before failing the same way).
574    Disk,
575    /// Retry a bounded number of times, then record and skip.
576    Transient,
577    /// Record and skip immediately.
578    Permanent,
579}
580
581/// A classified action failure attributed to a clip.
582struct Fail {
583    class: Class,
584    clip_id: String,
585    reason: String,
586}
587
588/// The run-ending status for a failure class, or `None` when the failure is
589/// per-clip and the run continues.
590fn abort_status(class: Class) -> Option<RunStatus> {
591    match class {
592        Class::Auth => Some(RunStatus::AuthAborted),
593        Class::Disk => Some(RunStatus::DiskFull),
594        Class::Transient | Class::Permanent => None,
595    }
596}
597
598fn auth_fail(clip_id: impl Into<String>, reason: impl Into<String>) -> Fail {
599    Fail {
600        class: Class::Auth,
601        clip_id: clip_id.into(),
602        reason: reason.into(),
603    }
604}
605
606fn transient_fail(clip_id: impl Into<String>, reason: impl Into<String>) -> Fail {
607    Fail {
608        class: Class::Transient,
609        clip_id: clip_id.into(),
610        reason: reason.into(),
611    }
612}
613
614fn permanent_fail(clip_id: impl Into<String>, reason: impl Into<String>) -> Fail {
615    Fail {
616        class: Class::Permanent,
617        clip_id: clip_id.into(),
618        reason: reason.into(),
619    }
620}
621
622fn disk_fail(clip_id: impl Into<String>, reason: impl Into<String>) -> Fail {
623    Fail {
624        class: Class::Disk,
625        clip_id: clip_id.into(),
626        reason: reason.into(),
627    }
628}
629
630/// Whether an artifact kind is album-scoped folder art (owned by a root id and
631/// recorded on the album store) rather than a per-clip sidecar (recorded on the
632/// manifest).
633fn is_album_kind(kind: ArtifactKind) -> bool {
634    matches!(
635        kind,
636        ArtifactKind::FolderJpg | ArtifactKind::FolderWebp | ArtifactKind::FolderMp4
637    )
638}
639
640/// True for the library-scoped playlist artifact, routed to the playlist store.
641fn is_playlist_kind(kind: ArtifactKind) -> bool {
642    matches!(kind, ArtifactKind::Playlist)
643}
644
645/// True for a per-song sidecar (`cover.jpg`/`cover.webp`), whose write requires
646/// the owning clip's manifest entry. Album and playlist kinds are keyed by a
647/// root/playlist id that is deliberately absent from the manifest.
648fn is_per_clip_kind(kind: ArtifactKind) -> bool {
649    matches!(
650        kind,
651        ArtifactKind::CoverJpg
652            | ArtifactKind::CoverWebp
653            | ArtifactKind::DetailsTxt
654            | ArtifactKind::LyricsTxt
655            | ArtifactKind::Lrc
656            | ArtifactKind::VideoMp4
657    )
658}
659
660/// Recover a playlist's display name from its `.m3u8` path's file stem.
661///
662/// The path is `<sanitised name>.m3u8` at the library root, so the stem is the
663/// sanitised name. Reconcile only ever reads a playlist's `path` and `hash`, so
664/// this recovered name is a convenience for humans and its lossiness (the
665/// sanitiser is not reversible) never affects a decision.
666fn playlist_name_from_path(path: &str) -> String {
667    std::path::Path::new(path)
668        .file_stem()
669        .map(|stem| stem.to_string_lossy().into_owned())
670        .unwrap_or_default()
671}
672
673/// A classified fetch failure, not yet attributed to a clip.
674struct FetchError {
675    class: Class,
676    reason: String,
677    retry_after: Option<Duration>,
678}
679
680impl FetchError {
681    fn transient(reason: impl Into<String>, retry_after: Option<Duration>) -> Self {
682        Self {
683            class: Class::Transient,
684            reason: reason.into(),
685            retry_after,
686        }
687    }
688
689    fn permanent(reason: impl Into<String>) -> Self {
690        Self {
691            class: Class::Permanent,
692            reason: reason.into(),
693            retry_after: None,
694        }
695    }
696
697    fn attribute(self, clip_id: &str) -> Fail {
698        Fail {
699            class: self.class,
700            clip_id: clip_id.to_owned(),
701            reason: self.reason,
702        }
703    }
704}
705
706/// The shared, read-only context threaded through every action handler.
707struct Ctx<'a, H, F, G, C> {
708    http: &'a H,
709    fs: &'a F,
710    ffmpeg: &'a G,
711    clock: &'a C,
712    opts: &'a ExecOptions,
713    by_id: &'a HashMap<&'a str, &'a Desired>,
714    by_path: &'a HashMap<&'a str, &'a Desired>,
715    /// This run's fetched aligned (synced) lyrics, keyed by clip id. Audio
716    /// tagging reads a clip's entry to embed an MP3 `SYLT` frame and the plain
717    /// lyric text; a clip absent here is tagged exactly as before. Populated by
718    /// the caller (the fetch is IO), so the engine stays free of direct IO.
719    synced: &'a HashMap<String, AlignedLyrics>,
720    /// Static cover art the audio producer already fetched to embed in the tag,
721    /// keyed by CDN URL, so the matching per-song `.jpg` sidecar reuses it rather
722    /// than fetching the same image again (#89). Only URLs a `CoverJpg` sidecar
723    /// will fetch are inserted (see `cover_wanted`) and each is removed on use, so
724    /// the map stays bounded to the clips in flight. A plain mutex guards it: the
725    /// concurrent producers only ever insert, and the lock is never held across an
726    /// await.
727    cover_cache: &'a Mutex<HashMap<String, Vec<u8>>>,
728    /// The cover URLs a `CoverJpg` sidecar will fetch this run. The producer caches
729    /// a cover only when its URL is here, so a clip whose cover is embedded but
730    /// never written as a sidecar leaves no bytes stranded in `cover_cache`.
731    cover_wanted: &'a HashSet<&'a str>,
732    /// Album video-cover source URLs fetched by more than one folder artifact
733    /// this run. The `both` retention derives `cover.webp` (transcoded) and
734    /// `cover.mp4` (raw) from the SAME `video_cover_url`; the first fetch caches
735    /// the raw source here so the sibling drains it instead of re-fetching (#90
736    /// reuses the #89 fetch-once path). `FolderWebp` sorts before `FolderMp4`, so
737    /// the raw source is always cached before the raw sidecar reads it.
738    shared_cover_urls: &'a HashSet<&'a str>,
739}
740
741impl<H, F, G, C> Ctx<'_, H, F, G, C>
742where
743    H: Http,
744    F: Filesystem,
745    G: Ffmpeg,
746    C: Clock,
747{
748    /// Apply one serial action, returning what it did or why it failed.
749    ///
750    /// Audio actions ([`Download`](Action::Download) and
751    /// [`Reformat`](Action::Reformat)) are always prepared concurrently and never
752    /// reach here. Fetched [`WriteArtifact`](Action::WriteArtifact) and
753    /// [`WriteStem`](Action::WriteStem) actions reach here only when their owning
754    /// clip was NOT in the manifest at plan start (new clips); those for existing
755    /// clips are prepared concurrently and commit through the stream path.
756    #[allow(clippy::too_many_arguments)]
757    async fn apply(
758        &self,
759        client_lock: &ClientLock<'_, C>,
760        action: &Action,
761        manifest: &mut Manifest,
762        albums: &mut BTreeMap<String, AlbumArt>,
763        playlists: &mut BTreeMap<String, PlaylistState>,
764        tracked_paths: &mut HashMap<String, u32>,
765        committed: &BTreeSet<String>,
766    ) -> Result<Effect, Fail> {
767        match action {
768            Action::Download { .. } | Action::Reformat { .. } => {
769                unreachable!("audio actions are prepared concurrently")
770            }
771            Action::Retag {
772                clip,
773                lineage,
774                path,
775            } => self.retag(manifest, clip, lineage, path).await,
776            Action::Rename { from, to } => self.rename(manifest, from, to),
777            Action::Delete { path, clip_id } => self.delete(manifest, path, clip_id),
778            Action::Skip { clip_id } => {
779                self.refresh_preserve(manifest, clip_id);
780                Ok(Effect::Skipped)
781            }
782            Action::WriteArtifact {
783                kind,
784                path,
785                source_url,
786                hash,
787                owner_id,
788                content,
789            } => {
790                // Inline text sidecars carry their body in the plan.
791                // Fetched artifacts for clips already in the manifest are prepared
792                // concurrently and never reach here. Fetched artifacts for new clips
793                // (owner not in the manifest at plan start) are handled here in the
794                // serial path, with the owner-absent guard fired before any fetch.
795                let bytes = match content.as_deref() {
796                    Some(text) => text.as_bytes().to_vec(),
797                    None => {
798                        if is_per_clip_kind(*kind) && manifest.get(owner_id).is_none() {
799                            // Owner never landed (audio failed or never existed).
800                            // Drain any stale cache entry so it doesn't outlive
801                            // this clip, then skip without fetching.
802                            self.cover_cache_lock().remove(source_url);
803                            return Ok(Effect::Skipped);
804                        }
805                        self.artifact_bytes(*kind, source_url, owner_id).await?
806                    }
807                };
808                self.commit_artifact(
809                    manifest,
810                    albums,
811                    playlists,
812                    PreparedArtifact {
813                        kind: *kind,
814                        path: path.clone(),
815                        hash: hash.clone(),
816                        owner_id: owner_id.clone(),
817                        bytes,
818                    },
819                    tracked_paths,
820                    committed,
821                )
822            }
823            Action::DeleteArtifact {
824                kind,
825                path,
826                owner_id,
827            } => self.delete_artifact(manifest, albums, playlists, *kind, path, owner_id),
828            Action::MoveArtifact {
829                kind,
830                from,
831                to,
832                source_url,
833                hash,
834                owner_id,
835            } => {
836                self.move_artifact(
837                    manifest,
838                    albums,
839                    playlists,
840                    *kind,
841                    from,
842                    to,
843                    source_url,
844                    hash,
845                    owner_id,
846                    tracked_paths,
847                    committed,
848                )
849                .await
850            }
851            Action::WriteStem {
852                clip_id,
853                key,
854                stem_id,
855                path,
856                source_url,
857                format,
858                hash,
859            } => {
860                // Stems for clips already in the manifest at plan start are
861                // prepared concurrently and never reach here. Stems for new
862                // clips (owner not yet in the manifest) are fetched here in the
863                // serial path, after the audio commit, with the same owner-absent
864                // guard as the old serial write_stem.
865                if manifest.get(clip_id).is_none() {
866                    return Ok(Effect::Skipped);
867                }
868                let bytes = self
869                    .fetch_stem_bytes(client_lock, clip_id, stem_id, source_url, *format)
870                    .await?;
871                self.commit_stem(
872                    manifest,
873                    PreparedStem {
874                        clip_id: clip_id.clone(),
875                        key: key.clone(),
876                        path: path.clone(),
877                        hash: hash.clone(),
878                        bytes,
879                    },
880                    tracked_paths,
881                    committed,
882                )
883            }
884            Action::DeleteStem { clip_id, key, path } => {
885                self.delete_stem(manifest, clip_id, key, path)
886            }
887            Action::MoveStem {
888                clip_id,
889                key,
890                stem_id,
891                from,
892                to,
893                source_url,
894                format,
895                hash,
896            } => {
897                self.move_stem(
898                    client_lock,
899                    manifest,
900                    clip_id,
901                    key,
902                    stem_id,
903                    from,
904                    to,
905                    source_url,
906                    *format,
907                    hash,
908                    tracked_paths,
909                    committed,
910                )
911                .await
912            }
913        }
914    }
915
916    /// Render one audio action's tagged bytes, side-effect-free.
917    ///
918    /// This is the concurrent part: it fetches, transcodes, and tags the file
919    /// (through shared ports, plus the client behind `client_lock`), then returns
920    /// the bytes and where they must go. It deliberately writes nothing, removes
921    /// nothing, and never touches `manifest`, so many run at once and an aborted
922    /// run can drop them with no destination or manifest effect. The serial
923    /// [`commit_audio`](Self::commit_audio) applies those effects in plan order.
924    async fn prepare_audio(
925        &self,
926        client_lock: &ClientLock<'_, C>,
927        action: &Action,
928    ) -> Result<RenderedAudio, Fail> {
929        match action {
930            Action::Download {
931                clip,
932                lineage,
933                path,
934                format,
935            } => {
936                let bytes = self
937                    .produce_audio(client_lock, clip, lineage, *format)
938                    .await?;
939                Ok(RenderedAudio {
940                    clip_id: clip.id.clone(),
941                    path: path.clone(),
942                    format: *format,
943                    from_path: None,
944                    effect: Effect::Downloaded,
945                    bytes,
946                })
947            }
948            Action::Reformat {
949                clip,
950                path,
951                from_path,
952                from: _,
953                to,
954            } => {
955                // A Reformat action carries no lineage, so recover it from the
956                // desired set (the same context that drove naming and the hash),
957                // falling back to a self-rooted context when the clip is not in
958                // the current selection.
959                let lineage = self
960                    .by_id
961                    .get(clip.id.as_str())
962                    .map(|d| d.lineage.clone())
963                    .unwrap_or_else(|| LineageContext::own_root(clip));
964                let bytes = self.produce_audio(client_lock, clip, &lineage, *to).await?;
965                Ok(RenderedAudio {
966                    clip_id: clip.id.clone(),
967                    path: path.clone(),
968                    format: *to,
969                    from_path: Some(from_path.clone()),
970                    effect: Effect::Reformatted,
971                    bytes,
972                })
973            }
974            _ => unreachable!("prepare_audio only handles audio actions"),
975        }
976    }
977
978    /// Commit one rendered audio result serially, in plan order.
979    ///
980    /// Writes the tagged bytes to the destination, then, for a [`Reformat`], drops
981    /// the superseded file, then records the manifest entry. Ordering the write
982    /// before the removal keeps a crash from losing both copies; keeping all of
983    /// this off the concurrent phase preserves the sequential executor's plan-order
984    /// guarantee for every destination and manifest effect.
985    fn commit_audio(
986        &self,
987        manifest: &mut Manifest,
988        rendered: RenderedAudio,
989    ) -> Result<Effect, Fail> {
990        let RenderedAudio {
991            clip_id,
992            path,
993            format,
994            from_path,
995            effect,
996            bytes,
997        } = rendered;
998        let size = self.write_verify(&clip_id, &path, &bytes)?;
999        if let Some(from) = from_path {
1000            // The new file is safely in place; only now drop the old rendering.
1001            self.fs.remove(&from).map_err(|err| {
1002                permanent_fail(&clip_id, format!("could not remove old file: {err}"))
1003            })?;
1004        }
1005        manifest.insert(clip_id.clone(), self.entry(&clip_id, &path, format, size));
1006        Ok(effect)
1007    }
1008
1009    /// Lock the cover cache, panicking on poison (uniform access point, no repeated magic string).
1010    fn cover_cache_lock(&self) -> std::sync::MutexGuard<'_, HashMap<String, Vec<u8>>> {
1011        self.cover_cache.lock().expect("cover cache mutex poisoned")
1012    }
1013
1014    /// Prepare one concurrent action side-effect-free, returning the bytes and
1015    /// routing metadata the serial committer needs. Only actions that pass
1016    /// [`is_prepareable`] reach here.
1017    async fn prepare(
1018        &self,
1019        client_lock: &ClientLock<'_, C>,
1020        action: &Action,
1021    ) -> Result<Prepared, Fail> {
1022        match action {
1023            Action::Download { .. } | Action::Reformat { .. } => self
1024                .prepare_audio(client_lock, action)
1025                .await
1026                .map(Prepared::Audio),
1027            Action::WriteArtifact {
1028                kind,
1029                path,
1030                source_url,
1031                hash,
1032                owner_id,
1033                content: None,
1034            } => {
1035                let bytes = self.artifact_bytes(*kind, source_url, owner_id).await?;
1036                Ok(Prepared::Artifact(PreparedArtifact {
1037                    kind: *kind,
1038                    path: path.clone(),
1039                    hash: hash.clone(),
1040                    owner_id: owner_id.clone(),
1041                    bytes,
1042                }))
1043            }
1044            Action::WriteStem {
1045                clip_id,
1046                key,
1047                stem_id,
1048                path,
1049                source_url,
1050                format,
1051                hash,
1052            } => {
1053                let bytes = self
1054                    .fetch_stem_bytes(client_lock, clip_id, stem_id, source_url, *format)
1055                    .await?;
1056                Ok(Prepared::Stem(PreparedStem {
1057                    clip_id: clip_id.clone(),
1058                    key: key.clone(),
1059                    path: path.clone(),
1060                    hash: hash.clone(),
1061                    bytes,
1062                }))
1063            }
1064            _ => unreachable!("prepare only handles prepareable actions"),
1065        }
1066    }
1067
1068    /// Commit one prepared artifact result serially, in plan order.
1069    ///
1070    /// Writes the pre-fetched bytes, removes any stale copy left at the previously
1071    /// tracked path (when the audio moved), then records the slot on the manifest,
1072    /// album, or playlist store. All filesystem and state effects are identical to
1073    /// what the former serial [`write_artifact`] did; moving the slow fetch (and
1074    /// optional transcode) into [`prepare`] is the only change.
1075    ///
1076    /// A per-clip sidecar is skipped when its owning clip's audio is absent from
1077    /// the manifest: the audio failed or never existed this run, so the sidecar
1078    /// must not land without an owner (the preparation was speculative).
1079    fn commit_artifact(
1080        &self,
1081        manifest: &mut Manifest,
1082        albums: &mut BTreeMap<String, AlbumArt>,
1083        playlists: &mut BTreeMap<String, PlaylistState>,
1084        prepared: PreparedArtifact,
1085        tracked_paths: &mut HashMap<String, u32>,
1086        committed: &BTreeSet<String>,
1087    ) -> Result<Effect, Fail> {
1088        let PreparedArtifact {
1089            kind,
1090            path,
1091            hash,
1092            owner_id,
1093            bytes,
1094        } = prepared;
1095        if is_per_clip_kind(kind) && manifest.get(&owner_id).is_none() {
1096            return Ok(Effect::Skipped);
1097        }
1098        let old_path = match kind {
1099            ArtifactKind::CoverJpg => manifest
1100                .get(&owner_id)
1101                .and_then(|e| e.cover_jpg.as_ref())
1102                .map(|s| s.path.clone()),
1103            ArtifactKind::CoverWebp => manifest
1104                .get(&owner_id)
1105                .and_then(|e| e.cover_webp.as_ref())
1106                .map(|s| s.path.clone()),
1107            ArtifactKind::DetailsTxt => manifest
1108                .get(&owner_id)
1109                .and_then(|e| e.details_txt.as_ref())
1110                .map(|s| s.path.clone()),
1111            ArtifactKind::LyricsTxt => manifest
1112                .get(&owner_id)
1113                .and_then(|e| e.lyrics_txt.as_ref())
1114                .map(|s| s.path.clone()),
1115            ArtifactKind::Lrc => manifest
1116                .get(&owner_id)
1117                .and_then(|e| e.lrc.as_ref())
1118                .map(|s| s.path.clone()),
1119            ArtifactKind::VideoMp4 => manifest
1120                .get(&owner_id)
1121                .and_then(|e| e.video_mp4.as_ref())
1122                .map(|s| s.path.clone()),
1123            ArtifactKind::FolderJpg | ArtifactKind::FolderWebp | ArtifactKind::FolderMp4 => albums
1124                .get(&owner_id)
1125                .and_then(|a| a.artifact(kind))
1126                .map(|s| s.path.clone()),
1127            ArtifactKind::Playlist => None,
1128        };
1129        self.write_verify(&owner_id, &path, &bytes)?;
1130        if let Some(old) = old_path.as_deref()
1131            && !old.is_empty()
1132            && old != path
1133        {
1134            let still_referenced = tracked_paths
1135                .get_mut(old)
1136                .map(|count| {
1137                    *count = count.saturating_sub(1);
1138                    *count > 0
1139                })
1140                .unwrap_or(false);
1141            if !still_referenced && !committed.contains(old) {
1142                self.fs.remove(old).map_err(|err| {
1143                    permanent_fail(
1144                        &owner_id,
1145                        format!("could not remove old sidecar {old}: {err}"),
1146                    )
1147                })?;
1148            }
1149        }
1150        if is_album_kind(kind) {
1151            albums.entry(owner_id.to_owned()).or_default().set(
1152                kind,
1153                Some(ArtifactState {
1154                    path: path.to_owned(),
1155                    hash: hash.to_owned(),
1156                }),
1157            );
1158        } else if is_playlist_kind(kind) {
1159            playlists.insert(
1160                owner_id.to_owned(),
1161                PlaylistState {
1162                    name: playlist_name_from_path(&path),
1163                    path: path.to_owned(),
1164                    hash: hash.to_owned(),
1165                },
1166            );
1167        } else if let Some(entry) = manifest.entries.get_mut(&owner_id) {
1168            set_manifest_artifact(
1169                entry,
1170                kind,
1171                Some(ArtifactState {
1172                    path: path.to_owned(),
1173                    hash: hash.to_owned(),
1174                }),
1175            );
1176        }
1177        Ok(Effect::ArtifactWritten)
1178    }
1179
1180    /// Commit one prepared stem result serially, in plan order.
1181    ///
1182    /// Writes the pre-fetched bytes (including any WAV render), removes any stale
1183    /// copy left at the previously tracked path, and records the stem slot.
1184    /// All filesystem and manifest effects are identical to what the former serial
1185    /// [`write_stem`] did; moving the slow fetch into [`prepare`] is the only change.
1186    ///
1187    /// Skipped when the owning clip's audio is absent from the manifest.
1188    fn commit_stem(
1189        &self,
1190        manifest: &mut Manifest,
1191        prepared: PreparedStem,
1192        tracked_paths: &mut HashMap<String, u32>,
1193        committed: &BTreeSet<String>,
1194    ) -> Result<Effect, Fail> {
1195        let PreparedStem {
1196            clip_id,
1197            key,
1198            path,
1199            hash,
1200            bytes,
1201        } = prepared;
1202        if manifest.get(&clip_id).is_none() {
1203            return Ok(Effect::Skipped);
1204        }
1205        let old_path = manifest
1206            .get(&clip_id)
1207            .and_then(|e| e.stems.get(&key))
1208            .map(|s| s.path.clone());
1209        self.write_verify(&clip_id, &path, &bytes)?;
1210        if let Some(old) = old_path.as_deref()
1211            && !old.is_empty()
1212            && old != path
1213        {
1214            let still_referenced = tracked_paths
1215                .get_mut(old)
1216                .map(|count| {
1217                    *count = count.saturating_sub(1);
1218                    *count > 0
1219                })
1220                .unwrap_or(false);
1221            if !still_referenced && !committed.contains(old) {
1222                self.fs.remove(old).map_err(|err| {
1223                    permanent_fail(&clip_id, format!("could not remove old stem {old}: {err}"))
1224                })?;
1225            }
1226        }
1227        if let Some(entry) = manifest.entries.get_mut(&clip_id) {
1228            set_manifest_stem(
1229                entry,
1230                &key,
1231                Some(ArtifactState {
1232                    path: path.to_owned(),
1233                    hash: hash.to_owned(),
1234                }),
1235            );
1236        }
1237        Ok(Effect::ArtifactWritten)
1238    }
1239
1240    /// Re-tag the existing file in place to match current metadata and art.
1241    async fn retag(
1242        &self,
1243        manifest: &mut Manifest,
1244        clip: &Clip,
1245        lineage: &LineageContext,
1246        path: &str,
1247    ) -> Result<Effect, Fail> {
1248        let Some(format) = manifest.get(&clip.id).map(|entry| entry.format) else {
1249            return Err(permanent_fail(
1250                &clip.id,
1251                "retag target missing from manifest",
1252            ));
1253        };
1254
1255        if format == AudioFormat::Wav {
1256            let (meta, synced) = self.track_meta(clip, lineage);
1257            let cover = self.fetch_cover(clip).await;
1258            let existing = self.fs.read(path).map_err(|err| {
1259                permanent_fail(&clip.id, format!("could not read for retag: {err}"))
1260            })?;
1261            let tagged = tag_wav(&existing, &meta, cover.as_deref(), synced)
1262                .map_err(|err| permanent_fail(&clip.id, err.to_string()))?;
1263            let size = self.write_verify(&clip.id, path, &tagged)?;
1264            self.refresh_hashes(manifest, &clip.id, Some(size));
1265            return Ok(Effect::Retagged);
1266        }
1267
1268        let (meta, synced) = self.track_meta(clip, lineage);
1269        let cover = self.fetch_cover(clip).await;
1270        let existing = self
1271            .fs
1272            .read(path)
1273            .map_err(|err| permanent_fail(&clip.id, format!("could not read for retag: {err}")))?;
1274        let tagged = match format {
1275            AudioFormat::Mp3 => tag_mp3(&existing, &meta, cover.as_deref(), synced),
1276            AudioFormat::Flac => tag_flac(&existing, &meta, cover.as_deref()),
1277            AudioFormat::Wav => unreachable!("WAV handled above"),
1278        }
1279        .map_err(|err| permanent_fail(&clip.id, err.to_string()))?;
1280        let size = self.write_verify(&clip.id, path, &tagged)?;
1281        self.refresh_hashes(manifest, &clip.id, Some(size));
1282        Ok(Effect::Retagged)
1283    }
1284
1285    /// Move the file and update the entry's path (and protection).
1286    fn rename(&self, manifest: &mut Manifest, from: &str, to: &str) -> Result<Effect, Fail> {
1287        let label = self
1288            .by_path
1289            .get(to)
1290            .map(|d| d.clip.id.clone())
1291            .unwrap_or_else(|| to.to_owned());
1292        self.fs.rename(from, to).map_err(|err| {
1293            if err.is_out_of_space() {
1294                disk_fail(label, "disk full: no space left to rename")
1295            } else {
1296                permanent_fail(label, format!("rename failed: {err}"))
1297            }
1298        })?;
1299
1300        let clip_id = self.by_path.get(to).map(|d| d.clip.id.clone()).or_else(|| {
1301            manifest
1302                .entries
1303                .iter()
1304                .find(|(_, entry)| entry.path == from)
1305                .map(|(id, _)| id.clone())
1306        });
1307        if let Some(id) = clip_id
1308            && let Some(entry) = manifest.entries.get_mut(&id)
1309        {
1310            entry.path = to.to_owned();
1311            if let Some(d) = self.by_path.get(to) {
1312                entry.preserve = preserve_for(d);
1313            }
1314        }
1315        Ok(Effect::Renamed)
1316    }
1317
1318    /// Remove the file and drop the manifest entry.
1319    fn delete(&self, manifest: &mut Manifest, path: &str, clip_id: &str) -> Result<Effect, Fail> {
1320        self.fs
1321            .remove(path)
1322            .map_err(|err| permanent_fail(clip_id, format!("delete failed: {err}")))?;
1323        manifest.remove(clip_id);
1324        Ok(Effect::Deleted)
1325    }
1326
1327    /// Relocate a fetched per-clip sidecar with a local rename, falling back to a
1328    /// fetch-and-write when the move is unsafe or the old file has vanished.
1329    ///
1330    /// Reconcile downgrades a pure path drift (same bytes, new path, old file
1331    /// present, fetched kind) to a `MoveArtifact`, so a retitle renames the file
1332    /// rather than re-downloading a cover or re-transcoding an animated WebP
1333    /// (#141). The in-place rename is taken only when `from` is this slot's alone
1334    /// to give up (no other tracked slot references it and no committed write has
1335    /// placed a file there); otherwise, or if the rename fails, fresh bytes are
1336    /// fetched and [`commit_artifact`](Self::commit_artifact) runs the gated
1337    /// old-path cleanup, so a swap or co-reference is handled exactly as before.
1338    #[allow(clippy::too_many_arguments)]
1339    async fn move_artifact(
1340        &self,
1341        manifest: &mut Manifest,
1342        albums: &mut BTreeMap<String, AlbumArt>,
1343        playlists: &mut BTreeMap<String, PlaylistState>,
1344        kind: ArtifactKind,
1345        from: &str,
1346        to: &str,
1347        source_url: &str,
1348        hash: &str,
1349        owner_id: &str,
1350        tracked_paths: &mut HashMap<String, u32>,
1351        committed: &BTreeSet<String>,
1352    ) -> Result<Effect, Fail> {
1353        // A per-clip sidecar needs its owning clip's audio present.
1354        if is_per_clip_kind(kind) && manifest.get(owner_id).is_none() {
1355            return Ok(Effect::Skipped);
1356        }
1357        // Relocate in place only when `from` is ours alone to give up: no other
1358        // tracked slot still references it (a prior failed swap can share a path)
1359        // and no committed write this run has already placed a file there.
1360        // Otherwise the fetch-and-write fallback copies fresh bytes and runs the
1361        // gated old-path cleanup.
1362        let exclusive =
1363            tracked_paths.get(from).is_none_or(|count| *count <= 1) && !committed.contains(from);
1364        if from != to && exclusive {
1365            match self.fs.rename(from, to) {
1366                Ok(()) => {
1367                    if let Some(count) = tracked_paths.get_mut(from) {
1368                        *count = count.saturating_sub(1);
1369                    }
1370                    if let Some(entry) = manifest.entries.get_mut(owner_id) {
1371                        set_manifest_artifact(
1372                            entry,
1373                            kind,
1374                            Some(ArtifactState {
1375                                path: to.to_owned(),
1376                                hash: hash.to_owned(),
1377                            }),
1378                        );
1379                    }
1380                    return Ok(Effect::Renamed);
1381                }
1382                Err(err) if err.is_out_of_space() => {
1383                    return Err(disk_fail(
1384                        owner_id,
1385                        "disk full: no space left to move sidecar",
1386                    ));
1387                }
1388                // The old file has vanished, or the rename is unsupported: fall
1389                // through to a fetch-and-write at `to`.
1390                Err(_) => {}
1391            }
1392        }
1393        let bytes = self.artifact_bytes(kind, source_url, owner_id).await?;
1394        self.commit_artifact(
1395            manifest,
1396            albums,
1397            playlists,
1398            PreparedArtifact {
1399                kind,
1400                path: to.to_owned(),
1401                hash: hash.to_owned(),
1402                owner_id: owner_id.to_owned(),
1403                bytes,
1404            },
1405            tracked_paths,
1406            committed,
1407        )
1408    }
1409    ///
1410    /// An animated cover — a per-clip [`CoverWebp`](ArtifactKind::CoverWebp) or an
1411    /// album [`FolderWebp`](ArtifactKind::FolderWebp) — fetches the clip's
1412    /// `video_cover` MP4 preview and transcodes it to an animated WebP through the
1413    /// ffmpeg port; every other kind is the fetched source verbatim (the static
1414    /// [`CoverJpg`](ArtifactKind::CoverJpg) / album [`FolderJpg`](ArtifactKind::FolderJpg)
1415    /// image, or the raw album [`FolderMp4`](ArtifactKind::FolderMp4) whose
1416    /// `video_cover_url` is kept untranscoded). A fetch or transcode failure
1417    /// is attributed to the owning clip and is a per-clip [`Fail`], except a
1418    /// disk-full transcode, which aborts the run like the audio FLAC path.
1419    async fn artifact_bytes(
1420        &self,
1421        kind: ArtifactKind,
1422        source_url: &str,
1423        owner_id: &str,
1424    ) -> Result<Vec<u8>, Fail> {
1425        // Reuse the cover the audio producer already fetched for the embedded tag
1426        // when it cached this exact URL (#89); otherwise fetch it now. The guard
1427        // is taken and dropped in its own statement so it never spans the await.
1428        let cached = self.cover_cache_lock().remove(source_url);
1429        let source = match cached {
1430            Some(bytes) => bytes,
1431            None => {
1432                let fetched = self
1433                    .fetch_bytes(source_url)
1434                    .await
1435                    .map_err(|err| err.attribute(owner_id))?;
1436                // Cache the raw source when a sibling folder artifact will fetch
1437                // the same URL (the `both` retention: cover.webp + cover.mp4), so
1438                // it is fetched exactly once. Bounded to shared URLs and drained
1439                // on the sibling's use.
1440                if self.shared_cover_urls.contains(source_url) {
1441                    self.cover_cache_lock()
1442                        .insert(source_url.to_owned(), fetched.clone());
1443                }
1444                fetched
1445            }
1446        };
1447        match kind {
1448            ArtifactKind::CoverWebp | ArtifactKind::FolderWebp => self
1449                .ffmpeg
1450                .mp4_to_webp(&source, self.opts.cover_webp)
1451                .await
1452                .map_err(|err| {
1453                    if err.is_out_of_space() {
1454                        disk_fail(owner_id, "disk full: no space left to transcode")
1455                    } else {
1456                        permanent_fail(owner_id, format!("cover transcode failed: {err}"))
1457                    }
1458                }),
1459            // The text sidecars are generated and always carry inline content, so
1460            // `write_artifact` never reaches this fetch path for them. Guard it so
1461            // a future miswiring fails loudly rather than fetching a URL.
1462            ArtifactKind::DetailsTxt | ArtifactKind::LyricsTxt | ArtifactKind::Lrc => Err(
1463                permanent_fail(owner_id, "text sidecar requires inline content"),
1464            ),
1465            ArtifactKind::CoverJpg
1466            | ArtifactKind::FolderJpg
1467            | ArtifactKind::FolderMp4
1468            | ArtifactKind::Playlist
1469            | ArtifactKind::VideoMp4 => Ok(source),
1470        }
1471    }
1472
1473    /// Remove a sidecar file and clear its slot on the owning manifest entry.
1474    ///
1475    /// `remove` is idempotent, so an already-absent sidecar is not a failure.
1476    /// When the owning entry is already gone (its audio was deleted earlier this
1477    /// run, co-deleting the sidecar), there is no slot to clear and that is fine.
1478    ///
1479    /// Folder art is album-scoped: its slot is cleared on the album store keyed by
1480    /// the album's root id, not on a manifest clip.
1481    ///
1482    /// The audio `Delete` is applied before its sidecar `DeleteArtifact`. If the
1483    /// sidecar removal fails after the audio is already gone, the sidecar lingers
1484    /// untracked, but the design stays convergent rather than transactional: the
1485    /// next run re-plans the same removal and retries, and any directory it would
1486    /// have emptied is pruned once the file finally clears.
1487    fn delete_artifact(
1488        &self,
1489        manifest: &mut Manifest,
1490        albums: &mut BTreeMap<String, AlbumArt>,
1491        playlists: &mut BTreeMap<String, PlaylistState>,
1492        kind: ArtifactKind,
1493        path: &str,
1494        owner_id: &str,
1495    ) -> Result<Effect, Fail> {
1496        self.fs
1497            .remove(path)
1498            .map_err(|err| permanent_fail(owner_id, format!("artifact delete failed: {err}")))?;
1499        if is_album_kind(kind) {
1500            if let Some(art) = albums.get_mut(owner_id) {
1501                art.set(kind, None);
1502                if art.is_empty() {
1503                    albums.remove(owner_id);
1504                }
1505            }
1506        } else if is_playlist_kind(kind) {
1507            playlists.remove(owner_id);
1508        } else if let Some(entry) = manifest.entries.get_mut(owner_id) {
1509            set_manifest_artifact(entry, kind, None);
1510        }
1511        Ok(Effect::ArtifactDeleted)
1512    }
1513
1514    /// Relocate a stem with a local rename, falling back to a fetch-and-write
1515    /// when the move is unsafe or the old file has vanished (#141).
1516    ///
1517    /// Reconcile downgrades a pure stem path drift to a `MoveStem`, so a retitle
1518    /// renames the raw stem rather than re-rendering a WAV through `convert_wav`
1519    /// or re-fetching an MP3. The in-place rename is taken only when `from` is
1520    /// this slot's alone to give up (no other tracked slot references it — two
1521    /// same-base clips can share a stem path after a partially-failed swap — and
1522    /// no committed write this run already holds it); otherwise the
1523    /// fetch-and-write fallback re-fetches the correct bytes at `to`, so a
1524    /// co-referenced shared stem is never renamed away with mismatched content.
1525    #[allow(clippy::too_many_arguments)]
1526    async fn move_stem(
1527        &self,
1528        client_lock: &ClientLock<'_, C>,
1529        manifest: &mut Manifest,
1530        clip_id: &str,
1531        key: &str,
1532        stem_id: &str,
1533        from: &str,
1534        to: &str,
1535        source_url: &str,
1536        format: StemFormat,
1537        hash: &str,
1538        tracked_paths: &mut HashMap<String, u32>,
1539        committed: &BTreeSet<String>,
1540    ) -> Result<Effect, Fail> {
1541        if manifest.get(clip_id).is_none() {
1542            return Ok(Effect::Skipped);
1543        }
1544        let exclusive =
1545            tracked_paths.get(from).is_none_or(|count| *count <= 1) && !committed.contains(from);
1546        if from != to && exclusive {
1547            match self.fs.rename(from, to) {
1548                Ok(()) => {
1549                    if let Some(count) = tracked_paths.get_mut(from) {
1550                        *count = count.saturating_sub(1);
1551                    }
1552                    if let Some(entry) = manifest.entries.get_mut(clip_id) {
1553                        set_manifest_stem(
1554                            entry,
1555                            key,
1556                            Some(ArtifactState {
1557                                path: to.to_owned(),
1558                                hash: hash.to_owned(),
1559                            }),
1560                        );
1561                    }
1562                    return Ok(Effect::Renamed);
1563                }
1564                Err(err) if err.is_out_of_space() => {
1565                    return Err(disk_fail(clip_id, "disk full: no space left to move stem"));
1566                }
1567                // The old file has vanished, or the rename is unsupported: fall
1568                // through to a fetch-and-write at `to`.
1569                Err(_) => {}
1570            }
1571        }
1572        let bytes = self
1573            .fetch_stem_bytes(client_lock, clip_id, stem_id, source_url, format)
1574            .await?;
1575        self.commit_stem(
1576            manifest,
1577            PreparedStem {
1578                clip_id: clip_id.to_owned(),
1579                key: key.to_owned(),
1580                path: to.to_owned(),
1581                hash: hash.to_owned(),
1582                bytes,
1583            },
1584            tracked_paths,
1585            committed,
1586        )
1587    }
1588
1589    /// Resolve a stem's RAW bytes in its native container, never transcoding.
1590    ///
1591    /// A `Wav` stem renders the stem clip's lossless WAV through the very same
1592    /// free `convert_wav` + poll flow the main FLAC/WAV audio uses
1593    /// ([`resolve_wav_url`](Self::resolve_wav_url)), keyed on the stem's own
1594    /// `stem_id`, then downloads that WAV. An `Mp3` stem (or a degenerate `Wav`
1595    /// stem with no id to render) downloads its public CDN url directly. Stems
1596    /// are the deliberate exception to the source format: the bytes are returned
1597    /// exactly as delivered and are never re-encoded to FLAC.
1598    async fn fetch_stem_bytes(
1599        &self,
1600        client_lock: &ClientLock<'_, C>,
1601        clip_id: &str,
1602        stem_id: &str,
1603        source_url: &str,
1604        format: StemFormat,
1605    ) -> Result<Vec<u8>, Fail> {
1606        let url = match format {
1607            StemFormat::Wav if !stem_id.is_empty() => {
1608                match self.resolve_wav_url(client_lock, stem_id).await? {
1609                    Some(url) => url,
1610                    None => return Err(transient_fail(clip_id, "stem WAV render was not ready")),
1611                }
1612            }
1613            // Mp3, or a Wav stem with no id to render, downloads the CDN mp3.
1614            _ => source_url.to_owned(),
1615        };
1616        self.fetch_bytes(&url)
1617            .await
1618            .map_err(|err| err.attribute(clip_id))
1619    }
1620
1621    /// Remove one stem file and clear its slot in the owning clip's stem map.
1622    ///
1623    /// `remove` is idempotent, so an already-absent stem is not a failure. When
1624    /// the owning entry is already gone (its audio was deleted earlier this run,
1625    /// co-deleting the stem), there is no slot to clear and that is fine; the
1626    /// emptied `.stems` folder is pruned by the end-of-run directory sweep.
1627    fn delete_stem(
1628        &self,
1629        manifest: &mut Manifest,
1630        clip_id: &str,
1631        key: &str,
1632        path: &str,
1633    ) -> Result<Effect, Fail> {
1634        self.fs
1635            .remove(path)
1636            .map_err(|err| permanent_fail(clip_id, format!("stem delete failed: {err}")))?;
1637        if let Some(entry) = manifest.entries.get_mut(clip_id) {
1638            set_manifest_stem(entry, key, None);
1639        }
1640        Ok(Effect::ArtifactDeleted)
1641    }
1642
1643    /// Download (and transcode/tag) the audio for `clip` in `format`.
1644    async fn produce_audio(
1645        &self,
1646        client_lock: &ClientLock<'_, C>,
1647        clip: &Clip,
1648        lineage: &LineageContext,
1649        format: AudioFormat,
1650    ) -> Result<Vec<u8>, Fail> {
1651        let (meta, synced) = self.track_meta(clip, lineage);
1652        match format {
1653            AudioFormat::Mp3 => {
1654                let url = clip.mp3_url();
1655                let audio = self
1656                    .fetch_bytes(&url)
1657                    .await
1658                    .map_err(|err| err.attribute(&clip.id))?;
1659                let cover = self.fetch_cover(clip).await;
1660                tag_mp3(&audio, &meta, cover.as_deref(), synced)
1661                    .map_err(|err| permanent_fail(&clip.id, err.to_string()))
1662            }
1663            AudioFormat::Flac => {
1664                let wav = self.fetch_wav(client_lock, clip).await?;
1665                let flac = self.ffmpeg.wav_to_flac(&wav).await.map_err(|err| {
1666                    if err.is_out_of_space() {
1667                        disk_fail(&clip.id, "disk full: no space left to transcode")
1668                    } else {
1669                        permanent_fail(&clip.id, format!("transcode failed: {err}"))
1670                    }
1671                })?;
1672                let cover = self.fetch_cover(clip).await;
1673                tag_flac(&flac, &meta, cover.as_deref())
1674                    .map_err(|err| permanent_fail(&clip.id, err.to_string()))
1675            }
1676            AudioFormat::Wav => {
1677                let wav = self.fetch_wav(client_lock, clip).await?;
1678                let cover = self.fetch_cover(clip).await;
1679                tag_wav(&wav, &meta, cover.as_deref(), synced)
1680                    .map_err(|err| permanent_fail(&clip.id, err.to_string()))
1681            }
1682        }
1683    }
1684
1685    /// This run's non-empty aligned lyrics for a clip, if any were fetched.
1686    fn synced_for(&self, clip_id: &str) -> Option<&AlignedLyrics> {
1687        self.synced
1688            .get(clip_id)
1689            .filter(|aligned| !aligned.is_empty())
1690    }
1691
1692    /// The track metadata for a clip, paired with its synced lyrics (if any).
1693    ///
1694    /// The feed omits per-clip lyrics, so when this run fetched aligned lyrics
1695    /// for the clip the plain text is folded into `lyrics` here, which the MP3
1696    /// `USLT` and FLAC `LYRICS` tags then carry. The returned [`AlignedLyrics`]
1697    /// is passed on to [`tag_mp3`] for the word-level `SYLT` frame.
1698    fn track_meta<'m>(
1699        &'m self,
1700        clip: &Clip,
1701        lineage: &LineageContext,
1702    ) -> (TrackMetadata, Option<&'m AlignedLyrics>) {
1703        let synced = self.synced_for(&clip.id);
1704        let mut meta = TrackMetadata::from_clip(clip, lineage);
1705        if let Some(aligned) = synced {
1706            meta.lyrics = aligned.plain_text();
1707        }
1708        (meta, synced)
1709    }
1710
1711    /// Resolve the rendered WAV URL and download it.
1712    async fn fetch_wav(
1713        &self,
1714        client_lock: &ClientLock<'_, C>,
1715        clip: &Clip,
1716    ) -> Result<Vec<u8>, Fail> {
1717        let url = match self.resolve_wav_url(client_lock, &clip.id).await? {
1718            Some(url) => url,
1719            None => return Err(transient_fail(&clip.id, "WAV render was not ready")),
1720        };
1721        self.fetch_bytes(&url)
1722            .await
1723            .map_err(|err| err.attribute(&clip.id))
1724    }
1725
1726    /// Read the WAV URL, requesting a render and polling if it is not ready.
1727    ///
1728    /// `None` means the render did not become ready within the poll budget; the
1729    /// caller treats that as a non-fatal transient failure, never a silent skip.
1730    ///
1731    /// Each client call briefly locks `client_lock`; the poll waits happen
1732    /// unlocked, so concurrent clips interleave their WAV renders rather than
1733    /// serialising behind one clip's whole poll budget.
1734    async fn resolve_wav_url(
1735        &self,
1736        client_lock: &ClientLock<'_, C>,
1737        id: &str,
1738    ) -> Result<Option<String>, Fail> {
1739        if let Some(url) = self.wav_url_retrying(client_lock, id).await? {
1740            return Ok(Some(url));
1741        }
1742        self.request_wav_retrying(client_lock, id).await?;
1743        for _ in 0..self.opts.wav_poll_attempts {
1744            self.clock.sleep(self.opts.wav_poll_interval).await;
1745            if let Some(url) = self.wav_url_retrying(client_lock, id).await? {
1746                return Ok(Some(url));
1747            }
1748        }
1749        Ok(None)
1750    }
1751
1752    /// Read the rendered WAV URL, retrying transient API failures with backoff
1753    /// (SYNC-16/17), so the default FLAC path is as resilient as the CDN path.
1754    async fn wav_url_retrying(
1755        &self,
1756        client_lock: &ClientLock<'_, C>,
1757        id: &str,
1758    ) -> Result<Option<String>, Fail> {
1759        let mut attempt: u32 = 0;
1760        loop {
1761            let result = {
1762                let client = client_lock.lock().await;
1763                client.wav_url(self.http, id).await
1764            };
1765            match result {
1766                Ok(url) => return Ok(url),
1767                Err(err) => match self.retry_core(id, err, &mut attempt).await {
1768                    Some(fail) => return Err(fail),
1769                    None => continue,
1770                },
1771            }
1772        }
1773    }
1774
1775    /// Ask Suno to render a WAV, retrying transient API failures with backoff.
1776    async fn request_wav_retrying(
1777        &self,
1778        client_lock: &ClientLock<'_, C>,
1779        id: &str,
1780    ) -> Result<(), Fail> {
1781        let mut attempt: u32 = 0;
1782        loop {
1783            let result = {
1784                let client = client_lock.lock().await;
1785                client.request_wav(self.http, id).await
1786            };
1787            match result {
1788                Ok(()) => return Ok(()),
1789                Err(err) => match self.retry_core(id, err, &mut attempt).await {
1790                    Some(fail) => return Err(fail),
1791                    None => continue,
1792                },
1793            }
1794        }
1795    }
1796
1797    /// Classify a core error from the authenticated WAV flow. On a transient
1798    /// class within budget, back off through the [`Clock`] and return `None` to
1799    /// retry; otherwise return the terminal [`Fail`].
1800    async fn retry_core(&self, id: &str, err: Error, attempt: &mut u32) -> Option<Fail> {
1801        let fail = classify_core(id, err);
1802        if matches!(fail.class, Class::Transient) && *attempt < self.opts.max_retries {
1803            self.clock.sleep(backoff_delay(*attempt, None)).await;
1804            *attempt += 1;
1805            None
1806        } else {
1807            Some(fail)
1808        }
1809    }
1810
1811    /// GET `url`, retrying transient failures with backoff, verifying size.
1812    async fn fetch_bytes(&self, url: &str) -> Result<Vec<u8>, FetchError> {
1813        let mut attempt: u32 = 0;
1814        loop {
1815            let result = self.http.send(HttpRequest::get(url)).await;
1816            match classify_response(result) {
1817                Ok(body) => return Ok(body),
1818                Err(err) => {
1819                    if matches!(err.class, Class::Transient) && attempt < self.opts.max_retries {
1820                        let delay = backoff_delay(attempt, err.retry_after);
1821                        self.clock.sleep(delay).await;
1822                        attempt += 1;
1823                        continue;
1824                    }
1825                    return Err(err);
1826                }
1827            }
1828        }
1829    }
1830
1831    /// Download cover art, trying each candidate URL in order; `None` is fine.
1832    async fn fetch_cover(&self, clip: &Clip) -> Option<Vec<u8>> {
1833        for url in clip.cover_candidates() {
1834            if let Ok(response) = self.http.send(HttpRequest::get(url)).await
1835                && (200..=299).contains(&response.status)
1836                && !response.body.is_empty()
1837            {
1838                // A `CoverJpg` sidecar will fetch this exact URL this run; keep the
1839                // bytes so its write reuses them instead of fetching again (#89).
1840                // The lock guards only the insert, never the await above.
1841                if self.cover_wanted.contains(url) {
1842                    self.cover_cache_lock()
1843                        .insert(url.to_owned(), response.body.clone());
1844                }
1845                return Some(response.body);
1846            }
1847        }
1848        None
1849    }
1850
1851    /// Write `bytes` atomically, then confirm the on-disk size (SYNC-13/14).
1852    fn write_verify(&self, clip_id: &str, path: &str, bytes: &[u8]) -> Result<u64, Fail> {
1853        self.fs.write_atomic(path, bytes).map_err(|err| {
1854            if err.is_out_of_space() {
1855                disk_fail(clip_id, format!("disk full: no space left to write {path}"))
1856            } else {
1857                permanent_fail(clip_id, format!("write failed: {err}"))
1858            }
1859        })?;
1860        match self.fs.metadata(path) {
1861            Some(stat) if stat.size == bytes.len() as u64 => Ok(stat.size),
1862            Some(stat) => Err(permanent_fail(
1863                clip_id,
1864                format!("wrote {} bytes, expected {}", stat.size, bytes.len()),
1865            )),
1866            None => Ok(bytes.len() as u64),
1867        }
1868    }
1869
1870    /// Build the manifest entry for a freshly written file.
1871    fn entry(&self, clip_id: &str, path: &str, format: AudioFormat, size: u64) -> ManifestEntry {
1872        match self.by_id.get(clip_id) {
1873            Some(d) => manifest_entry(d, size),
1874            None => ManifestEntry {
1875                path: path.to_owned(),
1876                format,
1877                size,
1878                ..ManifestEntry::default()
1879            },
1880        }
1881    }
1882
1883    /// Refresh an existing entry's hashes, protection, and (optionally) size.
1884    fn refresh_hashes(&self, manifest: &mut Manifest, clip_id: &str, size: Option<u64>) {
1885        let desired = self.by_id.get(clip_id).copied();
1886        if let Some(entry) = manifest.entries.get_mut(clip_id) {
1887            if let Some(d) = desired {
1888                entry.meta_hash = d.meta_hash.clone();
1889                entry.art_hash = d.art_hash.clone();
1890                entry.preserve = preserve_for(d);
1891            }
1892            if let Some(size) = size {
1893                entry.size = size;
1894            }
1895        }
1896    }
1897
1898    /// Refresh only an entry's preserve marker from the current desired state.
1899    ///
1900    /// A clip can gain or lose copy/private protection with no file change, which
1901    /// reconcile emits as a [`Skip`](Action::Skip). Refreshing here keeps the
1902    /// persisted marker a faithful image of live protection, so the cross-run
1903    /// delete guard (SYNC-8) never reads it stale.
1904    fn refresh_preserve(&self, manifest: &mut Manifest, clip_id: &str) {
1905        if let Some(d) = self.by_id.get(clip_id).copied()
1906            && let Some(entry) = manifest.entries.get_mut(clip_id)
1907        {
1908            entry.preserve = preserve_for(d);
1909        }
1910    }
1911}
1912
1913/// Build a manifest entry from the desired record (SYNC-8 preserve rule).
1914fn manifest_entry(d: &Desired, size: u64) -> ManifestEntry {
1915    ManifestEntry {
1916        path: d.path.clone(),
1917        format: d.format,
1918        meta_hash: d.meta_hash.clone(),
1919        art_hash: d.art_hash.clone(),
1920        size,
1921        preserve: preserve_for(d),
1922        ..Default::default()
1923    }
1924}
1925
1926/// Whether a written entry must be preserved across runs: held by any copy
1927/// source, or private. The reconcile delete guard reads this marker later.
1928fn preserve_for(d: &Desired) -> bool {
1929    d.private || d.modes.contains(&SourceMode::Copy)
1930}
1931
1932/// Classify one HTTP result into bytes or a [`FetchError`] (SYNC-14/17).
1933fn classify_response(
1934    result: Result<crate::http::HttpResponse, crate::http::TransportError>,
1935) -> Result<Vec<u8>, FetchError> {
1936    let response = match result {
1937        Ok(response) => response,
1938        Err(err) => {
1939            return Err(FetchError::transient(
1940                format!("transport error: {err}"),
1941                None,
1942            ));
1943        }
1944    };
1945    match response.status {
1946        200..=299 => {
1947            if let Some(expected) = content_length(&response) {
1948                let actual = response.body.len() as u64;
1949                if actual != expected {
1950                    return Err(FetchError::transient(
1951                        format!("truncated download: {actual} of {expected} bytes"),
1952                        None,
1953                    ));
1954                }
1955            }
1956            Ok(response.body)
1957        }
1958        401 | 403 => Err(FetchError::transient(
1959            format!("download rejected: status {}", response.status),
1960            None,
1961        )),
1962        408 => Err(FetchError::transient("request timed out", None)),
1963        429 => Err(FetchError::transient(
1964            "rate limited",
1965            retry_after(&response),
1966        )),
1967        500..=599 => Err(FetchError::transient(
1968            format!("server error {}", response.status),
1969            None,
1970        )),
1971        status => Err(FetchError::permanent(format!(
1972            "download failed: status {status}"
1973        ))),
1974    }
1975}
1976
1977/// Map a core [`Error`] from the authenticated WAV flow to a [`Fail`].
1978fn classify_core(id: &str, err: Error) -> Fail {
1979    let reason = err.to_string();
1980    match err {
1981        Error::Auth(_) => auth_fail(id, reason),
1982        Error::RateLimited { .. } | Error::Connection(_) => transient_fail(id, reason),
1983        Error::Api(_)
1984        | Error::BadRequest(_)
1985        | Error::NotFound(_)
1986        | Error::Tag(_)
1987        | Error::Config(_)
1988        | Error::Refused(_) => permanent_fail(id, reason),
1989    }
1990}
1991
1992/// The provider-reported body size from `Content-Length`, if present and valid.
1993fn content_length(response: &crate::http::HttpResponse) -> Option<u64> {
1994    response.header("content-length")?.trim().parse().ok()
1995}
1996
1997#[cfg(test)]
1998mod tests {
1999    use super::*;
2000    use crate::ClerkAuth;
2001    use crate::http::HttpResponse;
2002    use crate::testutil::{MemFs, RecordingClock, Reply, ScriptedHttp, StubFfmpeg};
2003
2004    fn clip(id: &str) -> Clip {
2005        Clip {
2006            id: id.to_owned(),
2007            title: "Song".to_owned(),
2008            audio_url: format!("https://cdn1.suno.ai/{id}.mp3"),
2009            ..Default::default()
2010        }
2011    }
2012
2013    fn art_clip(id: &str) -> Clip {
2014        Clip {
2015            image_large_url: format!("https://art.suno.ai/{id}/large.jpg"),
2016            image_url: format!("https://art.suno.ai/{id}/small.jpg"),
2017            ..clip(id)
2018        }
2019    }
2020
2021    fn ext(format: AudioFormat) -> &'static str {
2022        match format {
2023            AudioFormat::Mp3 => "mp3",
2024            AudioFormat::Flac => "flac",
2025            AudioFormat::Wav => "wav",
2026        }
2027    }
2028
2029    fn desired(clip: Clip, format: AudioFormat) -> Desired {
2030        Desired {
2031            path: format!("{}.{}", clip.id, ext(format)),
2032            lineage: LineageContext::own_root(&clip),
2033            clip,
2034            format,
2035            meta_hash: "m".to_owned(),
2036            art_hash: "art".to_owned(),
2037            modes: vec![SourceMode::Mirror],
2038            trashed: false,
2039            private: false,
2040            artifacts: Vec::new(),
2041            stems: None,
2042        }
2043    }
2044
2045    fn entry(path: &str, format: AudioFormat) -> ManifestEntry {
2046        ManifestEntry {
2047            path: path.to_owned(),
2048            format,
2049            meta_hash: "old".to_owned(),
2050            art_hash: "old-art".to_owned(),
2051            size: 8,
2052            preserve: false,
2053            ..Default::default()
2054        }
2055    }
2056
2057    #[allow(clippy::too_many_arguments)]
2058    fn run<G: Ffmpeg>(
2059        plan: &Plan,
2060        manifest: &mut Manifest,
2061        desired: &[Desired],
2062        http: &ScriptedHttp,
2063        fs: &MemFs,
2064        ffmpeg: &G,
2065        clock: &RecordingClock,
2066        opts: &ExecOptions,
2067    ) -> ExecOutcome {
2068        let mut albums = BTreeMap::new();
2069        run_with_albums(
2070            plan,
2071            manifest,
2072            &mut albums,
2073            desired,
2074            http,
2075            fs,
2076            ffmpeg,
2077            clock,
2078            opts,
2079        )
2080    }
2081
2082    #[allow(clippy::too_many_arguments)]
2083    fn run_with_albums<G: Ffmpeg>(
2084        plan: &Plan,
2085        manifest: &mut Manifest,
2086        albums: &mut BTreeMap<String, AlbumArt>,
2087        desired: &[Desired],
2088        http: &ScriptedHttp,
2089        fs: &MemFs,
2090        ffmpeg: &G,
2091        clock: &RecordingClock,
2092        opts: &ExecOptions,
2093    ) -> ExecOutcome {
2094        let mut playlists = BTreeMap::new();
2095        run_full(
2096            plan,
2097            manifest,
2098            albums,
2099            &mut playlists,
2100            desired,
2101            http,
2102            fs,
2103            ffmpeg,
2104            clock,
2105            opts,
2106        )
2107    }
2108
2109    #[allow(clippy::too_many_arguments)]
2110    fn run_full<G: Ffmpeg>(
2111        plan: &Plan,
2112        manifest: &mut Manifest,
2113        albums: &mut BTreeMap<String, AlbumArt>,
2114        playlists: &mut BTreeMap<String, PlaylistState>,
2115        desired: &[Desired],
2116        http: &ScriptedHttp,
2117        fs: &MemFs,
2118        ffmpeg: &G,
2119        clock: &RecordingClock,
2120        opts: &ExecOptions,
2121    ) -> ExecOutcome {
2122        let client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
2123        let synced = HashMap::new();
2124        pollster::block_on(execute(
2125            plan,
2126            manifest,
2127            albums,
2128            playlists,
2129            desired,
2130            &synced,
2131            Ports {
2132                client: &client,
2133                http,
2134                fs,
2135                ffmpeg,
2136                clock,
2137            },
2138            opts,
2139        ))
2140    }
2141
2142    fn small_poll() -> ExecOptions {
2143        ExecOptions {
2144            max_retries: 3,
2145            wav_poll_attempts: 2,
2146            wav_poll_interval: Duration::from_secs(5),
2147            concurrency: 4,
2148            cover_webp: WebpEncodeSettings::default(),
2149        }
2150    }
2151
2152    // ── Download: MP3 ───────────────────────────────────────────────
2153
2154    #[test]
2155    fn download_mp3_writes_tagged_file_and_records_manifest() {
2156        let c = art_clip("a");
2157        let d = desired(c.clone(), AudioFormat::Mp3);
2158        let plan = Plan {
2159            actions: vec![Action::Download {
2160                clip: c.clone(),
2161                lineage: LineageContext::own_root(&c),
2162                path: d.path.clone(),
2163                format: AudioFormat::Mp3,
2164            }],
2165        };
2166        let http = ScriptedHttp::new()
2167            .route("a.mp3", Reply::ok(b"mp3-body".to_vec()))
2168            .route("a/large.jpg", Reply::ok(b"art-bytes".to_vec()));
2169        let fs = MemFs::new();
2170        let ffmpeg = StubFfmpeg::flac();
2171        let clock = RecordingClock::new();
2172        let mut manifest = Manifest::new();
2173
2174        let outcome = run(
2175            &plan,
2176            &mut manifest,
2177            &[d],
2178            &http,
2179            &fs,
2180            &ffmpeg,
2181            &clock,
2182            &ExecOptions::default(),
2183        );
2184
2185        assert_eq!(outcome.downloaded, 1);
2186        assert_eq!(outcome.failed(), 0);
2187        assert_eq!(outcome.status, RunStatus::Completed);
2188        let written = fs.read_file("a.mp3").unwrap();
2189        assert_eq!(&written[..3], b"ID3");
2190        assert!(written.ends_with(b"mp3-body"));
2191        let entry = manifest.get("a").unwrap();
2192        assert_eq!(entry.path, "a.mp3");
2193        assert_eq!(entry.format, AudioFormat::Mp3);
2194        assert_eq!(entry.meta_hash, "m");
2195        assert_eq!(entry.art_hash, "art");
2196        assert_eq!(entry.size, written.len() as u64);
2197        assert!(!entry.preserve);
2198    }
2199
2200    #[test]
2201    fn download_mp3_embeds_sylt_and_lyrics_from_synced_map() {
2202        // A clip whose alignment was fetched this run gets a word-level SYLT frame
2203        // and its plain lyric text embedded (USLT), end to end through execute.
2204        let c = art_clip("a");
2205        let d = desired(c.clone(), AudioFormat::Mp3);
2206        let plan = Plan {
2207            actions: vec![Action::Download {
2208                clip: c.clone(),
2209                lineage: LineageContext::own_root(&c),
2210                path: d.path.clone(),
2211                format: AudioFormat::Mp3,
2212            }],
2213        };
2214        let http = ScriptedHttp::new()
2215            .route("a.mp3", Reply::ok(b"mp3-body".to_vec()))
2216            .route("a/large.jpg", Reply::ok(b"art-bytes".to_vec()));
2217        let fs = MemFs::new();
2218        let ffmpeg = StubFfmpeg::flac();
2219        let clock = RecordingClock::new();
2220        let mut manifest = Manifest::new();
2221        let mut albums = BTreeMap::new();
2222        let mut playlists = BTreeMap::new();
2223        let mut synced = HashMap::new();
2224        synced.insert(
2225            "a".to_string(),
2226            AlignedLyrics::from_json(&serde_json::json!({
2227                "aligned_words": [],
2228                "aligned_lyrics": [
2229                    {"text": "hi there", "start_s": 0.5, "end_s": 1.2, "section": "Verse 1",
2230                     "words": [
2231                         {"text": "hi", "start_s": 0.5, "end_s": 0.8},
2232                         {"text": "there", "start_s": 0.9, "end_s": 1.2}
2233                     ]}
2234                ]
2235            })),
2236        );
2237        let client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
2238        let outcome = pollster::block_on(execute(
2239            &plan,
2240            &mut manifest,
2241            &mut albums,
2242            &mut playlists,
2243            &[d],
2244            &synced,
2245            Ports {
2246                client: &client,
2247                http: &http,
2248                fs: &fs,
2249                ffmpeg: &ffmpeg,
2250                clock: &clock,
2251            },
2252            &ExecOptions::default(),
2253        ));
2254
2255        assert_eq!(outcome.downloaded, 1);
2256        let written = fs.read_file("a.mp3").unwrap();
2257        let tag = id3::Tag::read_from2(std::io::Cursor::new(written)).unwrap();
2258        assert_eq!(
2259            tag.synchronised_lyrics().count(),
2260            1,
2261            "a SYLT frame is embedded"
2262        );
2263        // The plain lyric text is populated from the alignment for the USLT frame.
2264        assert_eq!(
2265            tag.lyrics().next().map(|frame| frame.text.as_str()),
2266            Some("hi there")
2267        );
2268    }
2269
2270    #[test]
2271    fn download_mp3_embeds_no_sylt_when_synced_map_empty() {
2272        // The synced map is empty when the feature is off (no alignment fetched),
2273        // so no SYLT frame and no lyric text are embedded.
2274        let c = art_clip("a");
2275        let d = desired(c.clone(), AudioFormat::Mp3);
2276        let plan = Plan {
2277            actions: vec![Action::Download {
2278                clip: c.clone(),
2279                lineage: LineageContext::own_root(&c),
2280                path: d.path.clone(),
2281                format: AudioFormat::Mp3,
2282            }],
2283        };
2284        let http = ScriptedHttp::new()
2285            .route("a.mp3", Reply::ok(b"mp3-body".to_vec()))
2286            .route("a/large.jpg", Reply::ok(b"art-bytes".to_vec()));
2287        let fs = MemFs::new();
2288        let ffmpeg = StubFfmpeg::flac();
2289        let clock = RecordingClock::new();
2290        let mut manifest = Manifest::new();
2291        let mut albums = BTreeMap::new();
2292        let mut playlists = BTreeMap::new();
2293        let client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
2294        let outcome = pollster::block_on(execute(
2295            &plan,
2296            &mut manifest,
2297            &mut albums,
2298            &mut playlists,
2299            &[d],
2300            &HashMap::new(),
2301            Ports {
2302                client: &client,
2303                http: &http,
2304                fs: &fs,
2305                ffmpeg: &ffmpeg,
2306                clock: &clock,
2307            },
2308            &ExecOptions::default(),
2309        ));
2310        assert_eq!(outcome.downloaded, 1);
2311        let written = fs.read_file("a.mp3").unwrap();
2312        let tag = id3::Tag::read_from2(std::io::Cursor::new(written)).unwrap();
2313        assert_eq!(tag.synchronised_lyrics().count(), 0);
2314        assert_eq!(tag.lyrics().count(), 0);
2315    }
2316
2317    #[test]
2318    fn download_mp3_uses_cdn_fallback_when_audio_url_empty() {
2319        let mut c = clip("a");
2320        c.audio_url = String::new();
2321        let d = desired(c.clone(), AudioFormat::Mp3);
2322        let plan = Plan {
2323            actions: vec![Action::Download {
2324                clip: c.clone(),
2325                lineage: LineageContext::own_root(&c),
2326                path: d.path.clone(),
2327                format: AudioFormat::Mp3,
2328            }],
2329        };
2330        let http = ScriptedHttp::new().route("cdn1.suno.ai/a.mp3", Reply::ok(b"body".to_vec()));
2331        let fs = MemFs::new();
2332        let mut manifest = Manifest::new();
2333        let outcome = run(
2334            &plan,
2335            &mut manifest,
2336            &[d],
2337            &http,
2338            &fs,
2339            &StubFfmpeg::flac(),
2340            &RecordingClock::new(),
2341            &ExecOptions::default(),
2342        );
2343        assert_eq!(outcome.downloaded, 1);
2344        assert_eq!(http.count("cdn1.suno.ai/a.mp3"), 1);
2345    }
2346
2347    // ── Download: FLAC render + transcode ───────────────────────────
2348
2349    #[test]
2350    fn download_flac_renders_transcodes_and_records() {
2351        let c = clip("b");
2352        let d = desired(c.clone(), AudioFormat::Flac);
2353        let plan = Plan {
2354            actions: vec![Action::Download {
2355                clip: c.clone(),
2356                lineage: LineageContext::own_root(&c),
2357                path: d.path.clone(),
2358                format: AudioFormat::Flac,
2359            }],
2360        };
2361        let http = ScriptedHttp::new()
2362            .with_auth()
2363            .route(
2364                "/wav_file/",
2365                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/b.wav"}"#),
2366            )
2367            .route("b.wav", Reply::ok(b"wav-bytes".to_vec()));
2368        let fs = MemFs::new();
2369        let clock = RecordingClock::new();
2370        let mut manifest = Manifest::new();
2371
2372        let outcome = run(
2373            &plan,
2374            &mut manifest,
2375            &[d],
2376            &http,
2377            &fs,
2378            &StubFfmpeg::flac(),
2379            &clock,
2380            &ExecOptions::default(),
2381        );
2382
2383        assert_eq!(outcome.downloaded, 1);
2384        assert_eq!(outcome.failed(), 0);
2385        let written = fs.read_file("b.flac").unwrap();
2386        assert_eq!(&written[..4], b"fLaC");
2387        assert_eq!(manifest.get("b").unwrap().format, AudioFormat::Flac);
2388        // The URL was ready immediately, so no render request and no polling.
2389        assert_eq!(http.count("/convert_wav/"), 0);
2390        assert!(clock.sleeps().is_empty());
2391    }
2392
2393    #[test]
2394    fn download_flac_requests_render_then_polls_until_ready() {
2395        let c = clip("c");
2396        let d = desired(c.clone(), AudioFormat::Flac);
2397        let plan = Plan {
2398            actions: vec![Action::Download {
2399                clip: c.clone(),
2400                lineage: LineageContext::own_root(&c),
2401                path: d.path.clone(),
2402                format: AudioFormat::Flac,
2403            }],
2404        };
2405        let http = ScriptedHttp::new()
2406            .with_auth()
2407            .route_seq(
2408                "/wav_file/",
2409                vec![
2410                    Reply::json("{}"),
2411                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/c.wav"}"#),
2412                ],
2413            )
2414            .route("/convert_wav/", Reply::status(200))
2415            .route("c.wav", Reply::ok(b"wav".to_vec()));
2416        let clock = RecordingClock::new();
2417        let mut manifest = Manifest::new();
2418
2419        let outcome = run(
2420            &plan,
2421            &mut manifest,
2422            &[d],
2423            &http,
2424            &fs_new(),
2425            &StubFfmpeg::flac(),
2426            &clock,
2427            &small_poll(),
2428        );
2429
2430        assert_eq!(outcome.downloaded, 1);
2431        assert_eq!(http.count("/convert_wav/"), 1);
2432        assert_eq!(clock.sleeps(), vec![Duration::from_secs(5)]);
2433    }
2434
2435    #[test]
2436    fn download_flac_unavailable_render_is_a_nonfatal_failure() {
2437        let c = clip("d");
2438        let d = desired(c.clone(), AudioFormat::Flac);
2439        let plan = Plan {
2440            actions: vec![Action::Download {
2441                clip: c.clone(),
2442                lineage: LineageContext::own_root(&c),
2443                path: d.path.clone(),
2444                format: AudioFormat::Flac,
2445            }],
2446        };
2447        let http = ScriptedHttp::new()
2448            .with_auth()
2449            .route("/wav_file/", Reply::json("{}"))
2450            .route("/convert_wav/", Reply::status(200));
2451        let fs = MemFs::new();
2452        let clock = RecordingClock::new();
2453        let mut manifest = Manifest::new();
2454
2455        let outcome = run(
2456            &plan,
2457            &mut manifest,
2458            &[d],
2459            &http,
2460            &fs,
2461            &StubFfmpeg::flac(),
2462            &clock,
2463            &small_poll(),
2464        );
2465
2466        assert_eq!(outcome.downloaded, 0);
2467        assert_eq!(outcome.failed(), 1);
2468        assert_eq!(outcome.failures[0].clip_id, "d");
2469        assert_eq!(outcome.status, RunStatus::Completed);
2470        assert!(!fs.exists("d.flac"));
2471        assert_eq!(clock.sleeps().len(), 2);
2472    }
2473
2474    #[test]
2475    fn flac_transcode_failure_is_recorded_and_skipped() {
2476        let c = clip("t");
2477        let d = desired(c.clone(), AudioFormat::Flac);
2478        let plan = Plan {
2479            actions: vec![Action::Download {
2480                clip: c.clone(),
2481                lineage: LineageContext::own_root(&c),
2482                path: d.path.clone(),
2483                format: AudioFormat::Flac,
2484            }],
2485        };
2486        let http = ScriptedHttp::new()
2487            .with_auth()
2488            .route(
2489                "/wav_file/",
2490                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/t.wav"}"#),
2491            )
2492            .route("t.wav", Reply::ok(b"wav".to_vec()));
2493        let fs = MemFs::new();
2494        let mut manifest = Manifest::new();
2495
2496        let outcome = run(
2497            &plan,
2498            &mut manifest,
2499            &[d],
2500            &http,
2501            &fs,
2502            &StubFfmpeg::failing(),
2503            &RecordingClock::new(),
2504            &ExecOptions::default(),
2505        );
2506
2507        assert_eq!(outcome.downloaded, 0);
2508        assert_eq!(outcome.failed(), 1);
2509        assert!(!fs.exists("t.flac"));
2510        assert!(manifest.get("t").is_none());
2511    }
2512
2513    // ── Cover fallback ──────────────────────────────────────────────
2514
2515    #[test]
2516    fn cover_falls_back_when_large_image_is_missing() {
2517        let c = art_clip("e");
2518        let d = desired(c.clone(), AudioFormat::Mp3);
2519        let plan = Plan {
2520            actions: vec![Action::Download {
2521                clip: c.clone(),
2522                lineage: LineageContext::own_root(&c),
2523                path: d.path.clone(),
2524                format: AudioFormat::Mp3,
2525            }],
2526        };
2527        let http = ScriptedHttp::new()
2528            .route("e.mp3", Reply::ok(b"body".to_vec()))
2529            .route("e/large.jpg", Reply::status(404))
2530            .route("e/small.jpg", Reply::ok(b"the-art".to_vec()));
2531        let fs = MemFs::new();
2532        let mut manifest = Manifest::new();
2533
2534        let outcome = run(
2535            &plan,
2536            &mut manifest,
2537            &[d],
2538            &http,
2539            &fs,
2540            &StubFfmpeg::flac(),
2541            &RecordingClock::new(),
2542            &ExecOptions::default(),
2543        );
2544
2545        assert_eq!(outcome.downloaded, 1);
2546        let calls = http.calls();
2547        let large = calls
2548            .iter()
2549            .position(|u| u.contains("e/large.jpg"))
2550            .unwrap();
2551        let small = calls
2552            .iter()
2553            .position(|u| u.contains("e/small.jpg"))
2554            .unwrap();
2555        assert!(large < small, "large art tried before small");
2556    }
2557
2558    // ── Cover reuse: embed + sidecar share one fetch (#89) ──────────
2559
2560    #[test]
2561    fn download_reuses_the_embedded_cover_for_the_jpg_sidecar() {
2562        // The embedded tag and the `.jpg` sidecar want the same cover URL; it is
2563        // fetched once and the bytes serve both.
2564        let c = art_clip("a");
2565        let d = desired(c.clone(), AudioFormat::Mp3);
2566        let plan = Plan {
2567            actions: vec![
2568                Action::Download {
2569                    clip: c.clone(),
2570                    lineage: LineageContext::own_root(&c),
2571                    path: d.path.clone(),
2572                    format: AudioFormat::Mp3,
2573                },
2574                Action::WriteArtifact {
2575                    kind: ArtifactKind::CoverJpg,
2576                    path: "a/cover.jpg".to_owned(),
2577                    source_url: c.selected_image_url().unwrap().to_owned(),
2578                    hash: "art".to_owned(),
2579                    owner_id: "a".to_owned(),
2580                    content: None,
2581                },
2582            ],
2583        };
2584        let http = ScriptedHttp::new()
2585            .route("a.mp3", Reply::ok(b"mp3-body".to_vec()))
2586            .route("a/large.jpg", Reply::ok(b"the-art".to_vec()));
2587        let fs = MemFs::new();
2588        let mut manifest = Manifest::new();
2589
2590        let outcome = run(
2591            &plan,
2592            &mut manifest,
2593            &[d],
2594            &http,
2595            &fs,
2596            &StubFfmpeg::flac(),
2597            &RecordingClock::new(),
2598            &ExecOptions::default(),
2599        );
2600
2601        assert_eq!(outcome.downloaded, 1);
2602        assert_eq!(outcome.artifacts_written, 1);
2603        assert_eq!(outcome.failed(), 0);
2604        // Fetched once, not twice.
2605        assert_eq!(http.count("a/large.jpg"), 1);
2606        // The sidecar carries the fetched bytes, and the audio was tagged.
2607        assert_eq!(fs.read_file("a/cover.jpg").unwrap(), b"the-art");
2608        assert_eq!(&fs.read_file("a.mp3").unwrap()[..3], b"ID3");
2609    }
2610
2611    #[test]
2612    fn concurrent_downloads_reuse_each_clips_own_cover() {
2613        // Two clips render concurrently; each `.jpg` sidecar gets its own cover
2614        // (no cross-contamination) and each cover URL is fetched exactly once.
2615        let a = art_clip("a");
2616        let b = art_clip("b");
2617        let da = desired(a.clone(), AudioFormat::Mp3);
2618        let db = desired(b.clone(), AudioFormat::Mp3);
2619        let plan = Plan {
2620            actions: vec![
2621                Action::Download {
2622                    clip: a.clone(),
2623                    lineage: LineageContext::own_root(&a),
2624                    path: da.path.clone(),
2625                    format: AudioFormat::Mp3,
2626                },
2627                Action::WriteArtifact {
2628                    kind: ArtifactKind::CoverJpg,
2629                    path: "a/cover.jpg".to_owned(),
2630                    source_url: a.selected_image_url().unwrap().to_owned(),
2631                    hash: "art".to_owned(),
2632                    owner_id: "a".to_owned(),
2633                    content: None,
2634                },
2635                Action::Download {
2636                    clip: b.clone(),
2637                    lineage: LineageContext::own_root(&b),
2638                    path: db.path.clone(),
2639                    format: AudioFormat::Mp3,
2640                },
2641                Action::WriteArtifact {
2642                    kind: ArtifactKind::CoverJpg,
2643                    path: "b/cover.jpg".to_owned(),
2644                    source_url: b.selected_image_url().unwrap().to_owned(),
2645                    hash: "art".to_owned(),
2646                    owner_id: "b".to_owned(),
2647                    content: None,
2648                },
2649            ],
2650        };
2651        let http = ScriptedHttp::new()
2652            .route("a.mp3", Reply::ok(b"a-mp3".to_vec()))
2653            .route("b.mp3", Reply::ok(b"b-mp3".to_vec()))
2654            .route("a/large.jpg", Reply::ok(b"art-a".to_vec()))
2655            .route("b/large.jpg", Reply::ok(b"art-b".to_vec()));
2656        let fs = MemFs::new();
2657        let mut manifest = Manifest::new();
2658
2659        let outcome = run(
2660            &plan,
2661            &mut manifest,
2662            &[da, db],
2663            &http,
2664            &fs,
2665            &StubFfmpeg::flac(),
2666            &RecordingClock::new(),
2667            &small_poll(),
2668        );
2669
2670        assert_eq!(outcome.downloaded, 2);
2671        assert_eq!(outcome.artifacts_written, 2);
2672        assert_eq!(http.count("a/large.jpg"), 1);
2673        assert_eq!(http.count("b/large.jpg"), 1);
2674        assert_eq!(fs.read_file("a/cover.jpg").unwrap(), b"art-a");
2675        assert_eq!(fs.read_file("b/cover.jpg").unwrap(), b"art-b");
2676    }
2677
2678    #[test]
2679    fn cover_sidecar_refetches_when_embed_fell_back_to_another_url() {
2680        // The large image 404s so the embed falls back to the small image; the
2681        // sidecar still wants the (dead) large URL and must NOT be handed the
2682        // small bytes. Reuse is keyed on the exact URL, so nothing is cached and
2683        // the sidecar fetches the large URL itself (then fails on the 404).
2684        let c = art_clip("e");
2685        let d = desired(c.clone(), AudioFormat::Mp3);
2686        let plan = Plan {
2687            actions: vec![
2688                Action::Download {
2689                    clip: c.clone(),
2690                    lineage: LineageContext::own_root(&c),
2691                    path: d.path.clone(),
2692                    format: AudioFormat::Mp3,
2693                },
2694                Action::WriteArtifact {
2695                    kind: ArtifactKind::CoverJpg,
2696                    path: "e/cover.jpg".to_owned(),
2697                    source_url: "https://art.suno.ai/e/large.jpg".to_owned(),
2698                    hash: "art".to_owned(),
2699                    owner_id: "e".to_owned(),
2700                    content: None,
2701                },
2702            ],
2703        };
2704        let http = ScriptedHttp::new()
2705            .route("e.mp3", Reply::ok(b"body".to_vec()))
2706            .route("e/large.jpg", Reply::status(404))
2707            .route("e/small.jpg", Reply::ok(b"small-art".to_vec()));
2708        let fs = MemFs::new();
2709        let mut manifest = Manifest::new();
2710
2711        let outcome = run(
2712            &plan,
2713            &mut manifest,
2714            &[d],
2715            &http,
2716            &fs,
2717            &StubFfmpeg::flac(),
2718            &RecordingClock::new(),
2719            &ExecOptions::default(),
2720        );
2721
2722        assert_eq!(outcome.downloaded, 1);
2723        // The small image was fetched once (the embed fallback) and never reused
2724        // for the large-keyed sidecar; the sidecar went to the network itself.
2725        assert_eq!(http.count("e/small.jpg"), 1);
2726        assert!(
2727            http.count("e/large.jpg") >= 2,
2728            "sidecar refetched the large URL"
2729        );
2730        assert_eq!(manifest.get("e").unwrap().cover_jpg, None);
2731        assert!(!fs.exists("e/cover.jpg"));
2732    }
2733
2734    // ── Atomic write and size verification (SYNC-13/14) ─────────────
2735
2736    #[test]
2737    fn failed_write_leaves_the_prior_file_intact() {
2738        let c = clip("f");
2739        let d = desired(c.clone(), AudioFormat::Mp3);
2740        let plan = Plan {
2741            actions: vec![Action::Download {
2742                clip: c.clone(),
2743                lineage: LineageContext::own_root(&c),
2744                path: d.path.clone(),
2745                format: AudioFormat::Mp3,
2746            }],
2747        };
2748        let http = ScriptedHttp::new().route("f.mp3", Reply::ok(b"new-body".to_vec()));
2749        let fs = MemFs::new()
2750            .with_file("f.mp3", b"OLD-CONTENT".to_vec())
2751            .fail_write("f.mp3");
2752        let mut manifest = Manifest::new();
2753
2754        let outcome = run(
2755            &plan,
2756            &mut manifest,
2757            &[d],
2758            &http,
2759            &fs,
2760            &StubFfmpeg::flac(),
2761            &RecordingClock::new(),
2762            &ExecOptions::default(),
2763        );
2764
2765        assert_eq!(outcome.downloaded, 0);
2766        assert_eq!(outcome.failed(), 1);
2767        assert_eq!(fs.read_file("f.mp3").unwrap(), b"OLD-CONTENT");
2768        assert!(manifest.get("f").is_none());
2769    }
2770
2771    #[test]
2772    fn size_mismatch_after_write_is_a_failure() {
2773        let c = clip("g");
2774        let d = desired(c.clone(), AudioFormat::Mp3);
2775        let plan = Plan {
2776            actions: vec![Action::Download {
2777                clip: c.clone(),
2778                lineage: LineageContext::own_root(&c),
2779                path: d.path.clone(),
2780                format: AudioFormat::Mp3,
2781            }],
2782        };
2783        let http = ScriptedHttp::new().route("g.mp3", Reply::ok(b"body".to_vec()));
2784        let fs = MemFs::new().corrupt_write("g.mp3");
2785        let mut manifest = Manifest::new();
2786
2787        let outcome = run(
2788            &plan,
2789            &mut manifest,
2790            &[d],
2791            &http,
2792            &fs,
2793            &StubFfmpeg::flac(),
2794            &RecordingClock::new(),
2795            &ExecOptions::default(),
2796        );
2797
2798        assert_eq!(outcome.downloaded, 0);
2799        assert_eq!(outcome.failed(), 1);
2800        assert!(outcome.failures[0].reason.contains("expected"));
2801        assert!(manifest.get("g").is_none());
2802    }
2803
2804    // ── Reliability policy (SYNC-16/17) ─────────────────────────────
2805
2806    #[test]
2807    fn transient_failure_is_retried_then_skipped() {
2808        let c = clip("h");
2809        let d = desired(c.clone(), AudioFormat::Mp3);
2810        let plan = Plan {
2811            actions: vec![Action::Download {
2812                clip: c.clone(),
2813                lineage: LineageContext::own_root(&c),
2814                path: d.path.clone(),
2815                format: AudioFormat::Mp3,
2816            }],
2817        };
2818        let http = ScriptedHttp::new().route("h.mp3", Reply::status(500));
2819        let fs = MemFs::new();
2820        let clock = RecordingClock::new();
2821        let opts = ExecOptions {
2822            max_retries: 2,
2823            ..ExecOptions::default()
2824        };
2825        let mut manifest = Manifest::new();
2826
2827        let outcome = run(
2828            &plan,
2829            &mut manifest,
2830            &[d],
2831            &http,
2832            &fs,
2833            &StubFfmpeg::flac(),
2834            &clock,
2835            &opts,
2836        );
2837
2838        assert_eq!(outcome.downloaded, 0);
2839        assert_eq!(outcome.failed(), 1);
2840        assert_eq!(http.count("h.mp3"), 3);
2841        assert_eq!(clock.sleeps().len(), 2);
2842    }
2843
2844    #[test]
2845    fn truncated_download_is_retried_then_succeeds() {
2846        let c = clip("i");
2847        let d = desired(c.clone(), AudioFormat::Mp3);
2848        let plan = Plan {
2849            actions: vec![Action::Download {
2850                clip: c.clone(),
2851                lineage: LineageContext::own_root(&c),
2852                path: d.path.clone(),
2853                format: AudioFormat::Mp3,
2854            }],
2855        };
2856        let http = ScriptedHttp::new().route_seq(
2857            "i.mp3",
2858            vec![
2859                Reply::ok(b"short".to_vec()).with_content_length(999),
2860                Reply::ok(b"good-body".to_vec()),
2861            ],
2862        );
2863        let fs = MemFs::new();
2864        let clock = RecordingClock::new();
2865        let mut manifest = Manifest::new();
2866
2867        let outcome = run(
2868            &plan,
2869            &mut manifest,
2870            &[d],
2871            &http,
2872            &fs,
2873            &StubFfmpeg::flac(),
2874            &clock,
2875            &ExecOptions::default(),
2876        );
2877
2878        assert_eq!(outcome.downloaded, 1);
2879        assert_eq!(http.count("i.mp3"), 2);
2880        assert_eq!(clock.sleeps().len(), 1);
2881    }
2882
2883    #[test]
2884    fn rate_limit_backs_off_using_retry_after() {
2885        let c = clip("j");
2886        let d = desired(c.clone(), AudioFormat::Mp3);
2887        let plan = Plan {
2888            actions: vec![Action::Download {
2889                clip: c.clone(),
2890                lineage: LineageContext::own_root(&c),
2891                path: d.path.clone(),
2892                format: AudioFormat::Mp3,
2893            }],
2894        };
2895        let http = ScriptedHttp::new().route_seq(
2896            "j.mp3",
2897            vec![
2898                Reply::status(429).with_retry_after(7),
2899                Reply::ok(b"body".to_vec()),
2900            ],
2901        );
2902        let fs = MemFs::new();
2903        let clock = RecordingClock::new();
2904        let mut manifest = Manifest::new();
2905
2906        let outcome = run(
2907            &plan,
2908            &mut manifest,
2909            &[d],
2910            &http,
2911            &fs,
2912            &StubFfmpeg::flac(),
2913            &clock,
2914            &ExecOptions::default(),
2915        );
2916
2917        assert_eq!(outcome.downloaded, 1);
2918        assert_eq!(clock.sleeps(), vec![Duration::from_secs(7)]);
2919    }
2920
2921    #[test]
2922    fn auth_failure_aborts_the_run() {
2923        let c1 = clip("k1");
2924        let c2 = clip("k2");
2925        let d1 = desired(c1.clone(), AudioFormat::Flac);
2926        let d2 = desired(c2.clone(), AudioFormat::Flac);
2927        let plan = Plan {
2928            actions: vec![
2929                Action::Download {
2930                    clip: c1.clone(),
2931                    lineage: LineageContext::own_root(&c1),
2932                    path: d1.path.clone(),
2933                    format: AudioFormat::Flac,
2934                },
2935                Action::Download {
2936                    clip: c2.clone(),
2937                    lineage: LineageContext::own_root(&c2),
2938                    path: d2.path.clone(),
2939                    format: AudioFormat::Flac,
2940                },
2941            ],
2942        };
2943        // The authenticated WAV-render endpoint rejects auth even after a JWT
2944        // refresh: that is a bad token, so the whole run aborts rather than
2945        // hammering every clip. A CDN media rejection, by contrast, does not.
2946        let http = ScriptedHttp::new()
2947            .with_auth()
2948            .route("/wav_file/", Reply::status(401));
2949        let fs = MemFs::new();
2950        let mut manifest = Manifest::new();
2951
2952        let outcome = run(
2953            &plan,
2954            &mut manifest,
2955            &[d1, d2],
2956            &http,
2957            &fs,
2958            &StubFfmpeg::flac(),
2959            &RecordingClock::new(),
2960            &small_poll(),
2961        );
2962
2963        assert_eq!(outcome.status, RunStatus::AuthAborted);
2964        assert_eq!(outcome.failed(), 1);
2965        assert_eq!(outcome.failures[0].clip_id, "k1");
2966        assert_eq!(outcome.downloaded, 0);
2967    }
2968
2969    // ── Disk-full aborts the run (issue #17) ────────────────────────
2970
2971    #[test]
2972    fn disk_full_primary_write_aborts_the_run() {
2973        // Two MP3 downloads; the first write is out of space. That is systemic,
2974        // so the run aborts before the second is even attempted: exactly one
2975        // failure is recorded and its reason names the disk-full cause.
2976        let c1 = clip("d1");
2977        let c2 = clip("d2");
2978        let d1 = desired(c1.clone(), AudioFormat::Mp3);
2979        let d2 = desired(c2.clone(), AudioFormat::Mp3);
2980        let plan = Plan {
2981            actions: vec![
2982                Action::Download {
2983                    clip: c1.clone(),
2984                    lineage: LineageContext::own_root(&c1),
2985                    path: d1.path.clone(),
2986                    format: AudioFormat::Mp3,
2987                },
2988                Action::Download {
2989                    clip: c2.clone(),
2990                    lineage: LineageContext::own_root(&c2),
2991                    path: d2.path.clone(),
2992                    format: AudioFormat::Mp3,
2993                },
2994            ],
2995        };
2996        let http = ScriptedHttp::new()
2997            .route("d1.mp3", Reply::ok(b"body-1".to_vec()))
2998            .route("d2.mp3", Reply::ok(b"body-2".to_vec()));
2999        let fs = MemFs::new().fail_write_out_of_space("d1.mp3");
3000        let mut manifest = Manifest::new();
3001
3002        let outcome = run(
3003            &plan,
3004            &mut manifest,
3005            &[d1, d2],
3006            &http,
3007            &fs,
3008            &StubFfmpeg::flac(),
3009            &RecordingClock::new(),
3010            &ExecOptions::default(),
3011        );
3012
3013        assert_eq!(outcome.status, RunStatus::DiskFull);
3014        assert_eq!(outcome.failed(), 1);
3015        assert_eq!(outcome.failures[0].clip_id, "d1");
3016        assert!(outcome.failures[0].reason.contains("disk full"));
3017        assert_eq!(outcome.downloaded, 0);
3018        // The second clip was never fetched: the run aborted first.
3019        assert_eq!(http.count("d2.mp3"), 0);
3020        assert!(!fs.exists("d2.mp3"));
3021    }
3022
3023    #[test]
3024    fn disk_full_flac_transcode_aborts_the_run() {
3025        // The scratch disk fills during the FLAC re-encode; a WAV rendered, but
3026        // there is nowhere to stage the transcode, so the run aborts.
3027        let c1 = clip("d1");
3028        let c2 = clip("d2");
3029        let d1 = desired(c1.clone(), AudioFormat::Flac);
3030        let d2 = desired(c2.clone(), AudioFormat::Flac);
3031        let plan = Plan {
3032            actions: vec![
3033                Action::Download {
3034                    clip: c1.clone(),
3035                    lineage: LineageContext::own_root(&c1),
3036                    path: d1.path.clone(),
3037                    format: AudioFormat::Flac,
3038                },
3039                Action::Download {
3040                    clip: c2.clone(),
3041                    lineage: LineageContext::own_root(&c2),
3042                    path: d2.path.clone(),
3043                    format: AudioFormat::Flac,
3044                },
3045            ],
3046        };
3047        let http = ScriptedHttp::new()
3048            .with_auth()
3049            .route(
3050                "/wav_file/",
3051                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/d1.wav"}"#),
3052            )
3053            .route(".wav", Reply::ok(b"wav".to_vec()));
3054        let fs = MemFs::new();
3055        let mut manifest = Manifest::new();
3056
3057        let outcome = run(
3058            &plan,
3059            &mut manifest,
3060            &[d1, d2],
3061            &http,
3062            &fs,
3063            &StubFfmpeg::out_of_space(),
3064            &RecordingClock::new(),
3065            &ExecOptions::default(),
3066        );
3067
3068        assert_eq!(outcome.status, RunStatus::DiskFull);
3069        assert_eq!(outcome.failed(), 1);
3070        assert_eq!(outcome.failures[0].clip_id, "d1");
3071        assert!(outcome.failures[0].reason.contains("disk full"));
3072        assert_eq!(outcome.downloaded, 0);
3073    }
3074
3075    #[test]
3076    fn disk_full_artifact_write_aborts_the_run() {
3077        // A sidecar write (not a primary download) also aborts on a full disk:
3078        // the owning audio is present, the cover fetch succeeds, but the sidecar
3079        // cannot be written.
3080        let mut manifest = Manifest::new();
3081        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
3082        let plan = Plan {
3083            actions: vec![Action::WriteArtifact {
3084                kind: ArtifactKind::CoverJpg,
3085                path: "a/cover.jpg".to_owned(),
3086                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
3087                hash: "h1".to_owned(),
3088                owner_id: "a".to_owned(),
3089                content: None,
3090            }],
3091        };
3092        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"jpg-bytes".to_vec()));
3093        let fs = MemFs::new().fail_write_out_of_space("a/cover.jpg");
3094
3095        let outcome = run(
3096            &plan,
3097            &mut manifest,
3098            &[],
3099            &http,
3100            &fs,
3101            &StubFfmpeg::flac(),
3102            &RecordingClock::new(),
3103            &ExecOptions::default(),
3104        );
3105
3106        assert_eq!(outcome.status, RunStatus::DiskFull);
3107        assert_eq!(outcome.failed(), 1);
3108        assert!(outcome.failures[0].reason.contains("disk full"));
3109        assert_eq!(outcome.artifacts_written, 0);
3110        // The sidecar slot was never recorded: the write failed before it.
3111        assert_eq!(manifest.get("a").unwrap().cover_jpg, None);
3112    }
3113
3114    #[test]
3115    fn disk_full_leaves_the_failed_clips_manifest_entry_unchanged() {
3116        // write_verify fails before any manifest insert, so a re-download that
3117        // hits a full disk leaves the prior entry (and file) exactly as it was.
3118        let c = clip("m");
3119        let d = desired(c.clone(), AudioFormat::Mp3);
3120        let plan = Plan {
3121            actions: vec![Action::Download {
3122                clip: c.clone(),
3123                lineage: LineageContext::own_root(&c),
3124                path: d.path.clone(),
3125                format: AudioFormat::Mp3,
3126            }],
3127        };
3128        let http = ScriptedHttp::new().route("m.mp3", Reply::ok(b"new-body".to_vec()));
3129        let fs = MemFs::new()
3130            .with_file("m.mp3", b"OLD-CONTENT".to_vec())
3131            .fail_write_out_of_space("m.mp3");
3132        let mut manifest = Manifest::new();
3133        let before = entry("m.mp3", AudioFormat::Mp3);
3134        manifest.insert("m", before.clone());
3135
3136        let outcome = run(
3137            &plan,
3138            &mut manifest,
3139            &[d],
3140            &http,
3141            &fs,
3142            &StubFfmpeg::flac(),
3143            &RecordingClock::new(),
3144            &ExecOptions::default(),
3145        );
3146
3147        assert_eq!(outcome.status, RunStatus::DiskFull);
3148        assert_eq!(manifest.get("m"), Some(&before));
3149        assert_eq!(fs.read_file("m.mp3").unwrap(), b"OLD-CONTENT");
3150    }
3151
3152    #[test]
3153    fn cdn_download_rejection_skips_the_clip_without_aborting() {
3154        let c1 = clip("k1");
3155        let c2 = clip("k2");
3156        let d1 = desired(c1.clone(), AudioFormat::Mp3);
3157        let d2 = desired(c2.clone(), AudioFormat::Mp3);
3158        let plan = Plan {
3159            actions: vec![
3160                Action::Download {
3161                    clip: c1.clone(),
3162                    lineage: LineageContext::own_root(&c1),
3163                    path: d1.path.clone(),
3164                    format: AudioFormat::Mp3,
3165                },
3166                Action::Download {
3167                    clip: c2.clone(),
3168                    lineage: LineageContext::own_root(&c2),
3169                    path: d2.path.clone(),
3170                    format: AudioFormat::Mp3,
3171                },
3172            ],
3173        };
3174        // A CDN media fetch is unauthenticated, so a 403 is a per-asset
3175        // rejection (often transient), not a bad token: the clip is retried
3176        // then recorded and skipped, and the run carries on to the rest.
3177        let http = ScriptedHttp::new()
3178            .route("k1.mp3", Reply::status(403))
3179            .route("k2.mp3", Reply::ok(b"body".to_vec()));
3180        let fs = MemFs::new();
3181        let mut manifest = Manifest::new();
3182
3183        let outcome = run(
3184            &plan,
3185            &mut manifest,
3186            &[d1, d2],
3187            &http,
3188            &fs,
3189            &StubFfmpeg::flac(),
3190            &RecordingClock::new(),
3191            &ExecOptions::default(),
3192        );
3193
3194        assert_ne!(outcome.status, RunStatus::AuthAborted);
3195        assert_eq!(outcome.downloaded, 1);
3196        assert_eq!(outcome.failed(), 1);
3197        assert_eq!(outcome.failures[0].clip_id, "k1");
3198    }
3199
3200    #[test]
3201    fn one_clip_failure_does_not_abort_the_run() {
3202        let c1 = clip("l1");
3203        let c2 = clip("l2");
3204        let d1 = desired(c1.clone(), AudioFormat::Mp3);
3205        let d2 = desired(c2.clone(), AudioFormat::Mp3);
3206        let plan = Plan {
3207            actions: vec![
3208                Action::Download {
3209                    clip: c1.clone(),
3210                    lineage: LineageContext::own_root(&c1),
3211                    path: d1.path.clone(),
3212                    format: AudioFormat::Mp3,
3213                },
3214                Action::Download {
3215                    clip: c2.clone(),
3216                    lineage: LineageContext::own_root(&c2),
3217                    path: d2.path.clone(),
3218                    format: AudioFormat::Mp3,
3219                },
3220            ],
3221        };
3222        let http = ScriptedHttp::new()
3223            .route("l1.mp3", Reply::status(404))
3224            .route("l2.mp3", Reply::ok(b"body".to_vec()));
3225        let fs = MemFs::new();
3226        let mut manifest = Manifest::new();
3227
3228        let outcome = run(
3229            &plan,
3230            &mut manifest,
3231            &[d1, d2],
3232            &http,
3233            &fs,
3234            &StubFfmpeg::flac(),
3235            &RecordingClock::new(),
3236            &ExecOptions::default(),
3237        );
3238
3239        assert_eq!(outcome.status, RunStatus::Completed);
3240        assert_eq!(outcome.downloaded, 1);
3241        assert_eq!(outcome.failed(), 1);
3242        assert_eq!(outcome.failures[0].clip_id, "l1");
3243        assert!(fs.exists("l2.mp3"));
3244        assert!(manifest.get("l2").is_some());
3245        assert!(manifest.get("l1").is_none());
3246    }
3247
3248    // ── preserve marker (SYNC-8) ────────────────────────────────────
3249
3250    #[test]
3251    fn preserve_is_set_for_copy_held_and_private_clips() {
3252        let mut mirror = desired(clip("m1"), AudioFormat::Mp3);
3253        mirror.modes = vec![SourceMode::Mirror];
3254        let mut copy_held = desired(clip("m2"), AudioFormat::Mp3);
3255        copy_held.modes = vec![SourceMode::Mirror, SourceMode::Copy];
3256        let mut private = desired(clip("m3"), AudioFormat::Mp3);
3257        private.private = true;
3258
3259        let plan = Plan {
3260            actions: vec![
3261                Action::Download {
3262                    clip: mirror.clip.clone(),
3263                    lineage: LineageContext::own_root(&mirror.clip),
3264                    path: mirror.path.clone(),
3265                    format: AudioFormat::Mp3,
3266                },
3267                Action::Download {
3268                    clip: copy_held.clip.clone(),
3269                    lineage: LineageContext::own_root(&copy_held.clip),
3270                    path: copy_held.path.clone(),
3271                    format: AudioFormat::Mp3,
3272                },
3273                Action::Download {
3274                    clip: private.clip.clone(),
3275                    lineage: LineageContext::own_root(&private.clip),
3276                    path: private.path.clone(),
3277                    format: AudioFormat::Mp3,
3278                },
3279            ],
3280        };
3281        let http = ScriptedHttp::new()
3282            .route("m1.mp3", Reply::ok(b"a".to_vec()))
3283            .route("m2.mp3", Reply::ok(b"b".to_vec()))
3284            .route("m3.mp3", Reply::ok(b"c".to_vec()));
3285        let fs = MemFs::new();
3286        let mut manifest = Manifest::new();
3287
3288        let outcome = run(
3289            &plan,
3290            &mut manifest,
3291            &[mirror, copy_held, private],
3292            &http,
3293            &fs,
3294            &StubFfmpeg::flac(),
3295            &RecordingClock::new(),
3296            &ExecOptions::default(),
3297        );
3298
3299        assert_eq!(outcome.downloaded, 3);
3300        assert!(!manifest.get("m1").unwrap().preserve);
3301        assert!(manifest.get("m2").unwrap().preserve);
3302        assert!(manifest.get("m3").unwrap().preserve);
3303    }
3304
3305    // ── Reformat / Retag / Rename / Delete / Skip ───────────────────
3306
3307    #[test]
3308    fn reformat_writes_new_format_and_removes_old_file() {
3309        let c = clip("n");
3310        let d = desired(c.clone(), AudioFormat::Mp3);
3311        let plan = Plan {
3312            actions: vec![Action::Reformat {
3313                clip: c.clone(),
3314                path: "n.mp3".to_owned(),
3315                from_path: "n.flac".to_owned(),
3316                from: AudioFormat::Flac,
3317                to: AudioFormat::Mp3,
3318            }],
3319        };
3320        let http = ScriptedHttp::new().route("n.mp3", Reply::ok(b"body".to_vec()));
3321        let fs = MemFs::new().with_file("n.flac", b"OLD-FLAC".to_vec());
3322        let mut manifest = Manifest::new();
3323        manifest.insert("n", entry("n.flac", AudioFormat::Flac));
3324
3325        let outcome = run(
3326            &plan,
3327            &mut manifest,
3328            &[d],
3329            &http,
3330            &fs,
3331            &StubFfmpeg::flac(),
3332            &RecordingClock::new(),
3333            &ExecOptions::default(),
3334        );
3335
3336        assert_eq!(outcome.reformatted, 1);
3337        assert!(fs.exists("n.mp3"));
3338        assert!(!fs.exists("n.flac"));
3339        let updated = manifest.get("n").unwrap();
3340        assert_eq!(updated.path, "n.mp3");
3341        assert_eq!(updated.format, AudioFormat::Mp3);
3342        assert_eq!(updated.meta_hash, "m");
3343    }
3344
3345    #[test]
3346    fn retag_rewrites_file_and_updates_hashes() {
3347        let c = clip("o");
3348        let mut d = desired(c.clone(), AudioFormat::Mp3);
3349        d.meta_hash = "new".to_owned();
3350        d.art_hash = "new-art".to_owned();
3351        let existing = tag_mp3(
3352            b"audio",
3353            &TrackMetadata::from_clip(&c, &LineageContext::own_root(&c)),
3354            None,
3355            None,
3356        )
3357        .unwrap();
3358        let fs = MemFs::new().with_file("o.mp3", existing.clone());
3359        let mut manifest = Manifest::new();
3360        let mut start = entry("o.mp3", AudioFormat::Mp3);
3361        start.size = existing.len() as u64;
3362        manifest.insert("o", start);
3363        let plan = Plan {
3364            actions: vec![Action::Retag {
3365                clip: c.clone(),
3366                lineage: LineageContext::own_root(&c),
3367                path: "o.mp3".to_owned(),
3368            }],
3369        };
3370
3371        let outcome = run(
3372            &plan,
3373            &mut manifest,
3374            &[d],
3375            &ScriptedHttp::new(),
3376            &fs,
3377            &StubFfmpeg::flac(),
3378            &RecordingClock::new(),
3379            &ExecOptions::default(),
3380        );
3381
3382        assert_eq!(outcome.retagged, 1);
3383        let updated = manifest.get("o").unwrap();
3384        assert_eq!(updated.meta_hash, "new");
3385        assert_eq!(updated.art_hash, "new-art");
3386        assert_eq!(&fs.read_file("o.mp3").unwrap()[..3], b"ID3");
3387    }
3388
3389    #[test]
3390    fn rename_moves_file_and_updates_manifest_path() {
3391        let c = clip("p");
3392        let mut d = desired(c.clone(), AudioFormat::Mp3);
3393        d.path = "new/p.mp3".to_owned();
3394        let fs = MemFs::new().with_file("old/p.mp3", b"DATA".to_vec());
3395        let mut manifest = Manifest::new();
3396        manifest.insert("p", entry("old/p.mp3", AudioFormat::Mp3));
3397        let plan = Plan {
3398            actions: vec![Action::Rename {
3399                from: "old/p.mp3".to_owned(),
3400                to: "new/p.mp3".to_owned(),
3401            }],
3402        };
3403
3404        let outcome = run(
3405            &plan,
3406            &mut manifest,
3407            &[d],
3408            &ScriptedHttp::new(),
3409            &fs,
3410            &StubFfmpeg::flac(),
3411            &RecordingClock::new(),
3412            &ExecOptions::default(),
3413        );
3414
3415        assert_eq!(outcome.renamed, 1);
3416        assert!(fs.exists("new/p.mp3"));
3417        assert!(!fs.exists("old/p.mp3"));
3418        assert_eq!(manifest.get("p").unwrap().path, "new/p.mp3");
3419    }
3420
3421    #[test]
3422    fn disk_full_rename_aborts_the_run() {
3423        // A move onto a full disk is systemic like a full-disk write: the run
3424        // aborts with DiskFull and the source file is left untouched.
3425        let c = clip("p");
3426        let mut d = desired(c.clone(), AudioFormat::Mp3);
3427        d.path = "new/p.mp3".to_owned();
3428        let fs = MemFs::new()
3429            .with_file("old/p.mp3", b"DATA".to_vec())
3430            .fail_rename_out_of_space("new/p.mp3");
3431        let mut manifest = Manifest::new();
3432        manifest.insert("p", entry("old/p.mp3", AudioFormat::Mp3));
3433        let plan = Plan {
3434            actions: vec![Action::Rename {
3435                from: "old/p.mp3".to_owned(),
3436                to: "new/p.mp3".to_owned(),
3437            }],
3438        };
3439
3440        let outcome = run(
3441            &plan,
3442            &mut manifest,
3443            &[d],
3444            &ScriptedHttp::new(),
3445            &fs,
3446            &StubFfmpeg::flac(),
3447            &RecordingClock::new(),
3448            &ExecOptions::default(),
3449        );
3450
3451        assert_eq!(outcome.status, RunStatus::DiskFull);
3452        assert_eq!(outcome.renamed, 0);
3453        assert_eq!(outcome.failed(), 1);
3454        assert!(outcome.failures[0].reason.contains("disk full"));
3455        // The source is untouched: the move never happened.
3456        assert!(fs.exists("old/p.mp3"));
3457        assert!(!fs.exists("new/p.mp3"));
3458        assert_eq!(manifest.get("p").unwrap().path, "old/p.mp3");
3459    }
3460
3461    #[test]
3462    fn delete_removes_file_and_manifest_entry() {
3463        let fs = MemFs::new().with_file("q.mp3", b"DATA".to_vec());
3464        let mut manifest = Manifest::new();
3465        manifest.insert("q", entry("q.mp3", AudioFormat::Mp3));
3466        let plan = Plan {
3467            actions: vec![Action::Delete {
3468                path: "q.mp3".to_owned(),
3469                clip_id: "q".to_owned(),
3470            }],
3471        };
3472
3473        let outcome = run(
3474            &plan,
3475            &mut manifest,
3476            &[],
3477            &ScriptedHttp::new(),
3478            &fs,
3479            &StubFfmpeg::flac(),
3480            &RecordingClock::new(),
3481            &ExecOptions::default(),
3482        );
3483
3484        assert_eq!(outcome.deleted, 1);
3485        assert!(!fs.exists("q.mp3"));
3486        assert!(manifest.get("q").is_none());
3487    }
3488
3489    #[test]
3490    fn failed_delete_keeps_the_manifest_entry() {
3491        let fs = MemFs::new()
3492            .with_file("s.mp3", b"DATA".to_vec())
3493            .fail_remove("s.mp3");
3494        let mut manifest = Manifest::new();
3495        manifest.insert("s", entry("s.mp3", AudioFormat::Mp3));
3496        let plan = Plan {
3497            actions: vec![Action::Delete {
3498                path: "s.mp3".to_owned(),
3499                clip_id: "s".to_owned(),
3500            }],
3501        };
3502
3503        let outcome = run(
3504            &plan,
3505            &mut manifest,
3506            &[],
3507            &ScriptedHttp::new(),
3508            &fs,
3509            &StubFfmpeg::flac(),
3510            &RecordingClock::new(),
3511            &ExecOptions::default(),
3512        );
3513
3514        assert_eq!(outcome.deleted, 0);
3515        assert_eq!(outcome.failed(), 1);
3516        assert!(manifest.get("s").is_some());
3517        assert!(fs.exists("s.mp3"));
3518    }
3519
3520    #[test]
3521    fn skip_is_a_noop() {
3522        let mut manifest = Manifest::new();
3523        let plan = Plan {
3524            actions: vec![Action::Skip {
3525                clip_id: "r".to_owned(),
3526            }],
3527        };
3528        let outcome = run(
3529            &plan,
3530            &mut manifest,
3531            &[],
3532            &ScriptedHttp::new(),
3533            &MemFs::new(),
3534            &StubFfmpeg::flac(),
3535            &RecordingClock::new(),
3536            &ExecOptions::default(),
3537        );
3538        assert_eq!(outcome.skipped, 1);
3539        assert_eq!(outcome.failed(), 0);
3540    }
3541
3542    // ── Pure helpers ────────────────────────────────────────────────
3543
3544    #[test]
3545    fn header_helpers_parse_or_ignore() {
3546        let resp = HttpResponse {
3547            status: 200,
3548            headers: vec![("Content-Length".to_owned(), "42".to_owned())],
3549            body: Vec::new(),
3550        };
3551        assert_eq!(content_length(&resp), Some(42));
3552
3553        let bare = HttpResponse {
3554            status: 200,
3555            headers: Vec::new(),
3556            body: Vec::new(),
3557        };
3558        assert_eq!(content_length(&bare), None);
3559    }
3560
3561    #[test]
3562    fn preserve_rule_covers_copy_and_private() {
3563        let base = desired(clip("x"), AudioFormat::Mp3);
3564        assert!(!preserve_for(&base));
3565        let mut copy_held = base.clone();
3566        copy_held.modes = vec![SourceMode::Copy];
3567        assert!(preserve_for(&copy_held));
3568        let mut private = base.clone();
3569        private.private = true;
3570        assert!(preserve_for(&private));
3571    }
3572
3573    fn fs_new() -> MemFs {
3574        MemFs::new()
3575    }
3576
3577    // ── Skip refreshes the preserve marker (SYNC-8 cross-run) ────────
3578
3579    #[test]
3580    fn skip_sets_preserve_when_a_clip_becomes_copy_held() {
3581        let c = clip("s1");
3582        let mut d = desired(c.clone(), AudioFormat::Mp3);
3583        d.modes = vec![SourceMode::Copy];
3584        let plan = Plan {
3585            actions: vec![Action::Skip {
3586                clip_id: "s1".to_owned(),
3587            }],
3588        };
3589        let mut manifest = Manifest::new();
3590        manifest.insert("s1".to_owned(), entry("s1.mp3", AudioFormat::Mp3));
3591        assert!(!manifest.get("s1").unwrap().preserve);
3592
3593        let outcome = run(
3594            &plan,
3595            &mut manifest,
3596            &[d],
3597            &ScriptedHttp::new(),
3598            &fs_new(),
3599            &StubFfmpeg::flac(),
3600            &RecordingClock::new(),
3601            &ExecOptions::default(),
3602        );
3603
3604        assert_eq!(outcome.skipped, 1);
3605        assert!(
3606            manifest.get("s1").unwrap().preserve,
3607            "a copy-held skip must mark the entry preserved"
3608        );
3609    }
3610
3611    #[test]
3612    fn skip_clears_stale_preserve_when_a_clip_returns_to_mirror_only() {
3613        let c = clip("s2");
3614        let d = desired(c.clone(), AudioFormat::Mp3);
3615        let plan = Plan {
3616            actions: vec![Action::Skip {
3617                clip_id: "s2".to_owned(),
3618            }],
3619        };
3620        let mut manifest = Manifest::new();
3621        let mut stale = entry("s2.mp3", AudioFormat::Mp3);
3622        stale.preserve = true;
3623        manifest.insert("s2".to_owned(), stale);
3624
3625        run(
3626            &plan,
3627            &mut manifest,
3628            &[d],
3629            &ScriptedHttp::new(),
3630            &fs_new(),
3631            &StubFfmpeg::flac(),
3632            &RecordingClock::new(),
3633            &ExecOptions::default(),
3634        );
3635
3636        assert!(
3637            !manifest.get("s2").unwrap().preserve,
3638            "a mirror-only skip must clear a stale preserve marker"
3639        );
3640    }
3641
3642    #[test]
3643    fn flac_render_retries_a_rate_limited_wav_lookup() {
3644        let c = clip("rl");
3645        let d = desired(c.clone(), AudioFormat::Flac);
3646        let plan = Plan {
3647            actions: vec![Action::Download {
3648                clip: c.clone(),
3649                lineage: LineageContext::own_root(&c),
3650                path: d.path.clone(),
3651                format: AudioFormat::Flac,
3652            }],
3653        };
3654        let http = ScriptedHttp::new()
3655            .with_auth()
3656            .route_seq(
3657                "/wav_file/",
3658                vec![
3659                    Reply::status(429),
3660                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/rl.wav"}"#),
3661                ],
3662            )
3663            .route("rl.wav", Reply::ok(b"wav".to_vec()));
3664        let clock = RecordingClock::new();
3665        let mut manifest = Manifest::new();
3666
3667        let outcome = run(
3668            &plan,
3669            &mut manifest,
3670            &[d],
3671            &http,
3672            &fs_new(),
3673            &StubFfmpeg::flac(),
3674            &clock,
3675            &small_poll(),
3676        );
3677
3678        assert_eq!(outcome.downloaded, 1);
3679        assert_eq!(outcome.failed(), 0);
3680        // The render was ready on retry, so no fresh convert_wav was needed.
3681        assert_eq!(http.count("/convert_wav/"), 0);
3682        // One transient backoff (1s base), not the 5s poll interval.
3683        assert_eq!(clock.sleeps(), vec![Duration::from_secs(1)]);
3684    }
3685
3686    // ── Phase 6: artifact actions ───────────────────────────────────
3687
3688    #[test]
3689    fn write_artifact_fetches_writes_and_updates_manifest() {
3690        // The owning entry exists (its audio was kept this run); WriteArtifact
3691        // fetches the source, writes the sidecar, and records it on the entry.
3692        let mut manifest = Manifest::new();
3693        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
3694        let plan = Plan {
3695            actions: vec![Action::WriteArtifact {
3696                kind: ArtifactKind::CoverJpg,
3697                path: "a/cover.jpg".to_owned(),
3698                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
3699                hash: "h1".to_owned(),
3700                owner_id: "a".to_owned(),
3701                content: None,
3702            }],
3703        };
3704        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"jpg-bytes".to_vec()));
3705        let fs = MemFs::new();
3706
3707        let outcome = run(
3708            &plan,
3709            &mut manifest,
3710            &[],
3711            &http,
3712            &fs,
3713            &StubFfmpeg::flac(),
3714            &RecordingClock::new(),
3715            &ExecOptions::default(),
3716        );
3717
3718        assert_eq!(outcome.artifacts_written, 1);
3719        assert_eq!(outcome.failed(), 0);
3720        assert_eq!(outcome.status, RunStatus::Completed);
3721        assert_eq!(fs.read_file("a/cover.jpg").unwrap(), b"jpg-bytes");
3722        assert_eq!(
3723            manifest.get("a").unwrap().cover_jpg,
3724            Some(ArtifactState {
3725                path: "a/cover.jpg".to_owned(),
3726                hash: "h1".to_owned(),
3727            })
3728        );
3729    }
3730
3731    #[test]
3732    fn write_text_sidecar_records_slot_with_no_network_fetch() {
3733        // A generated text sidecar carries its body inline, so it is written
3734        // verbatim with NO HTTP fetch and the details slot records its state.
3735        let mut manifest = Manifest::new();
3736        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
3737        let plan = Plan {
3738            actions: vec![Action::WriteArtifact {
3739                kind: ArtifactKind::DetailsTxt,
3740                path: "a.details.txt".to_owned(),
3741                source_url: String::new(),
3742                hash: "dh".to_owned(),
3743                owner_id: "a".to_owned(),
3744                content: Some("Title: A\n".to_owned()),
3745            }],
3746        };
3747        // An empty HTTP script: any fetch would fail, proving none happens.
3748        let http = ScriptedHttp::new();
3749        let fs = MemFs::new();
3750
3751        let outcome = run(
3752            &plan,
3753            &mut manifest,
3754            &[],
3755            &http,
3756            &fs,
3757            &StubFfmpeg::flac(),
3758            &RecordingClock::new(),
3759            &ExecOptions::default(),
3760        );
3761
3762        assert_eq!(outcome.artifacts_written, 1);
3763        assert_eq!(outcome.failed(), 0);
3764        assert_eq!(fs.read_file("a.details.txt").unwrap(), b"Title: A\n");
3765        assert_eq!(
3766            manifest.get("a").unwrap().details_txt,
3767            Some(ArtifactState {
3768                path: "a.details.txt".to_owned(),
3769                hash: "dh".to_owned(),
3770            })
3771        );
3772    }
3773
3774    #[test]
3775    fn write_lyrics_sidecar_relocation_removes_old_file() {
3776        // The audio moved, so the lyrics sidecar is re-emitted at the new path;
3777        // the executor writes the new file and prunes the stale one.
3778        let mut manifest = Manifest::new();
3779        let mut e = entry("old/a.flac", AudioFormat::Flac);
3780        e.lyrics_txt = Some(ArtifactState {
3781            path: "old/a.lyrics.txt".to_owned(),
3782            hash: "lh".to_owned(),
3783        });
3784        manifest.insert("a", e);
3785        let fs = MemFs::new()
3786            .with_file("old/a.flac", b"AUDIO".to_vec())
3787            .with_file("old/a.lyrics.txt", b"old words\n".to_vec());
3788        let plan = Plan {
3789            actions: vec![Action::WriteArtifact {
3790                kind: ArtifactKind::LyricsTxt,
3791                path: "new/a.lyrics.txt".to_owned(),
3792                source_url: String::new(),
3793                hash: "lh".to_owned(),
3794                owner_id: "a".to_owned(),
3795                content: Some("new words\n".to_owned()),
3796            }],
3797        };
3798
3799        let outcome = run(
3800            &plan,
3801            &mut manifest,
3802            &[],
3803            &ScriptedHttp::new(),
3804            &fs,
3805            &StubFfmpeg::flac(),
3806            &RecordingClock::new(),
3807            &ExecOptions::default(),
3808        );
3809
3810        assert_eq!(outcome.failed(), 0);
3811        assert_eq!(fs.read_file("new/a.lyrics.txt").unwrap(), b"new words\n");
3812        assert!(!fs.exists("old/a.lyrics.txt"));
3813        assert_eq!(
3814            manifest.get("a").unwrap().lyrics_txt.as_ref().unwrap().path,
3815            "new/a.lyrics.txt"
3816        );
3817    }
3818
3819    #[test]
3820    fn sidecar_path_swap_never_deletes_a_file_written_this_run() {
3821        // Two clips swap sidecar paths in one run (A: x -> y while B: y -> x).
3822        // Each write's inline old-path cleanup must skip a path another action
3823        // writes this run, or the second write would delete the first's freshly
3824        // written file (issue #76). The guard is kind-agnostic; lyrics stands in
3825        // for every sidecar, including the .mp4 video.
3826        let mut manifest = Manifest::new();
3827        let mut a = entry("a.flac", AudioFormat::Flac);
3828        a.lyrics_txt = Some(ArtifactState {
3829            path: "x.lyrics.txt".to_owned(),
3830            hash: "ah".to_owned(),
3831        });
3832        manifest.insert("a", a);
3833        let mut b = entry("b.flac", AudioFormat::Flac);
3834        b.lyrics_txt = Some(ArtifactState {
3835            path: "y.lyrics.txt".to_owned(),
3836            hash: "bh".to_owned(),
3837        });
3838        manifest.insert("b", b);
3839        let fs = MemFs::new()
3840            .with_file("a.flac", b"A".to_vec())
3841            .with_file("b.flac", b"B".to_vec())
3842            .with_file("x.lyrics.txt", b"A words\n".to_vec())
3843            .with_file("y.lyrics.txt", b"B words\n".to_vec());
3844        // A moves its sidecar x -> y; B moves its sidecar y -> x (the swap).
3845        let plan = Plan {
3846            actions: vec![
3847                Action::WriteArtifact {
3848                    kind: ArtifactKind::LyricsTxt,
3849                    path: "y.lyrics.txt".to_owned(),
3850                    source_url: String::new(),
3851                    hash: "ah".to_owned(),
3852                    owner_id: "a".to_owned(),
3853                    content: Some("A words\n".to_owned()),
3854                },
3855                Action::WriteArtifact {
3856                    kind: ArtifactKind::LyricsTxt,
3857                    path: "x.lyrics.txt".to_owned(),
3858                    source_url: String::new(),
3859                    hash: "bh".to_owned(),
3860                    owner_id: "b".to_owned(),
3861                    content: Some("B words\n".to_owned()),
3862                },
3863            ],
3864        };
3865
3866        let outcome = run(
3867            &plan,
3868            &mut manifest,
3869            &[],
3870            &ScriptedHttp::new(),
3871            &fs,
3872            &StubFfmpeg::flac(),
3873            &RecordingClock::new(),
3874            &ExecOptions::default(),
3875        );
3876
3877        assert_eq!(outcome.failed(), 0);
3878        // Both freshly written files survive; neither cleanup clobbered the other.
3879        assert_eq!(fs.read_file("y.lyrics.txt").unwrap(), b"A words\n");
3880        assert_eq!(fs.read_file("x.lyrics.txt").unwrap(), b"B words\n");
3881        assert_eq!(
3882            manifest.get("a").unwrap().lyrics_txt.as_ref().unwrap().path,
3883            "y.lyrics.txt"
3884        );
3885        assert_eq!(
3886            manifest.get("b").unwrap().lyrics_txt.as_ref().unwrap().path,
3887            "x.lyrics.txt"
3888        );
3889    }
3890
3891    #[test]
3892    fn old_sidecar_kept_when_another_clip_still_references_it() {
3893        // A prior failed swap can leave two clips pointing at one path (A -> y and
3894        // B -> y). When B now moves y -> x, its cleanup must not delete y, which is
3895        // still A's live file (#76). tracked_paths counts two references to y, so
3896        // the removal is skipped even though y is not a write target this run.
3897        let mut manifest = Manifest::new();
3898        let mut a = entry("a.flac", AudioFormat::Flac);
3899        a.lyrics_txt = Some(ArtifactState {
3900            path: "y.lyrics.txt".to_owned(),
3901            hash: "ah".to_owned(),
3902        });
3903        manifest.insert("a", a);
3904        let mut b = entry("b.flac", AudioFormat::Flac);
3905        b.lyrics_txt = Some(ArtifactState {
3906            path: "y.lyrics.txt".to_owned(),
3907            hash: "bh".to_owned(),
3908        });
3909        manifest.insert("b", b);
3910        let fs = MemFs::new()
3911            .with_file("a.flac", b"A".to_vec())
3912            .with_file("b.flac", b"B".to_vec())
3913            .with_file("y.lyrics.txt", b"A words\n".to_vec());
3914        // Only B moves this run: y -> x. A is stable, so y is not a write target;
3915        // the tracked-reference count is what protects A's file.
3916        let plan = Plan {
3917            actions: vec![Action::WriteArtifact {
3918                kind: ArtifactKind::LyricsTxt,
3919                path: "x.lyrics.txt".to_owned(),
3920                source_url: String::new(),
3921                hash: "bh".to_owned(),
3922                owner_id: "b".to_owned(),
3923                content: Some("B words\n".to_owned()),
3924            }],
3925        };
3926
3927        let outcome = run(
3928            &plan,
3929            &mut manifest,
3930            &[],
3931            &ScriptedHttp::new(),
3932            &fs,
3933            &StubFfmpeg::flac(),
3934            &RecordingClock::new(),
3935            &ExecOptions::default(),
3936        );
3937
3938        assert_eq!(outcome.failed(), 0);
3939        assert!(
3940            fs.exists("y.lyrics.txt"),
3941            "A's live sidecar must not be deleted"
3942        );
3943        assert_eq!(fs.read_file("x.lyrics.txt").unwrap(), b"B words\n");
3944    }
3945
3946    #[test]
3947    fn shared_old_path_is_reclaimed_when_every_referencing_clip_moves_away() {
3948        // Two clips share one path (A -> s and B -> s, from a prior failed swap).
3949        // When BOTH move away this run, the path is no longer live, so the last
3950        // mover must reclaim it: it is neither kept as an orphan nor deleted while
3951        // still referenced. The dynamic reference count drops to zero only after
3952        // both moves, so exactly the final cleanup removes it (#76).
3953        let mut manifest = Manifest::new();
3954        let mut a = entry("a.flac", AudioFormat::Flac);
3955        a.lyrics_txt = Some(ArtifactState {
3956            path: "s.lyrics.txt".to_owned(),
3957            hash: "ah".to_owned(),
3958        });
3959        manifest.insert("a", a);
3960        let mut b = entry("b.flac", AudioFormat::Flac);
3961        b.lyrics_txt = Some(ArtifactState {
3962            path: "s.lyrics.txt".to_owned(),
3963            hash: "bh".to_owned(),
3964        });
3965        manifest.insert("b", b);
3966        let fs = MemFs::new()
3967            .with_file("a.flac", b"A".to_vec())
3968            .with_file("b.flac", b"B".to_vec())
3969            .with_file("s.lyrics.txt", b"shared\n".to_vec());
3970        let plan = Plan {
3971            actions: vec![
3972                Action::WriteArtifact {
3973                    kind: ArtifactKind::LyricsTxt,
3974                    path: "pa.lyrics.txt".to_owned(),
3975                    source_url: String::new(),
3976                    hash: "ah".to_owned(),
3977                    owner_id: "a".to_owned(),
3978                    content: Some("A words\n".to_owned()),
3979                },
3980                Action::WriteArtifact {
3981                    kind: ArtifactKind::LyricsTxt,
3982                    path: "pb.lyrics.txt".to_owned(),
3983                    source_url: String::new(),
3984                    hash: "bh".to_owned(),
3985                    owner_id: "b".to_owned(),
3986                    content: Some("B words\n".to_owned()),
3987                },
3988            ],
3989        };
3990
3991        let outcome = run(
3992            &plan,
3993            &mut manifest,
3994            &[],
3995            &ScriptedHttp::new(),
3996            &fs,
3997            &StubFfmpeg::flac(),
3998            &RecordingClock::new(),
3999            &ExecOptions::default(),
4000        );
4001
4002        assert_eq!(outcome.failed(), 0);
4003        assert_eq!(fs.read_file("pa.lyrics.txt").unwrap(), b"A words\n");
4004        assert_eq!(fs.read_file("pb.lyrics.txt").unwrap(), b"B words\n");
4005        assert!(
4006            !fs.exists("s.lyrics.txt"),
4007            "the vacated shared path must be reclaimed, not orphaned"
4008        );
4009    }
4010
4011    #[test]
4012    fn write_text_sidecar_skipped_when_owner_audio_absent() {
4013        // A text sidecar for a clip with no manifest entry (its audio download
4014        // failed) must be skipped, never writing an untracked file.
4015        let plan = Plan {
4016            actions: vec![Action::WriteArtifact {
4017                kind: ArtifactKind::DetailsTxt,
4018                path: "gone.details.txt".to_owned(),
4019                source_url: String::new(),
4020                hash: "dh".to_owned(),
4021                owner_id: "gone".to_owned(),
4022                content: Some("Title: Gone\n".to_owned()),
4023            }],
4024        };
4025        let fs = MemFs::new();
4026        let mut manifest = Manifest::new();
4027
4028        let outcome = run(
4029            &plan,
4030            &mut manifest,
4031            &[],
4032            &ScriptedHttp::new(),
4033            &fs,
4034            &StubFfmpeg::flac(),
4035            &RecordingClock::new(),
4036            &ExecOptions::default(),
4037        );
4038
4039        assert_eq!(outcome.artifacts_written, 0);
4040        assert_eq!(outcome.skipped, 1);
4041        assert!(!fs.exists("gone.details.txt"));
4042        assert!(manifest.get("gone").is_none());
4043    }
4044
4045    #[test]
4046    fn delete_artifact_removes_file_and_clears_slot() {
4047        let fs = MemFs::new().with_file("a/cover.jpg", b"jpg".to_vec());
4048        let mut manifest = Manifest::new();
4049        let mut e = entry("a.mp3", AudioFormat::Mp3);
4050        e.cover_jpg = Some(ArtifactState {
4051            path: "a/cover.jpg".to_owned(),
4052            hash: "h1".to_owned(),
4053        });
4054        manifest.insert("a", e);
4055        let plan = Plan {
4056            actions: vec![Action::DeleteArtifact {
4057                kind: ArtifactKind::CoverJpg,
4058                path: "a/cover.jpg".to_owned(),
4059                owner_id: "a".to_owned(),
4060            }],
4061        };
4062
4063        let outcome = run(
4064            &plan,
4065            &mut manifest,
4066            &[],
4067            &ScriptedHttp::new(),
4068            &fs,
4069            &StubFfmpeg::flac(),
4070            &RecordingClock::new(),
4071            &ExecOptions::default(),
4072        );
4073
4074        assert_eq!(outcome.artifacts_deleted, 1);
4075        assert!(!fs.exists("a/cover.jpg"));
4076        assert_eq!(manifest.get("a").unwrap().cover_jpg, None);
4077    }
4078
4079    #[test]
4080    fn delete_artifact_tolerates_already_absent_file() {
4081        // `remove` is idempotent, so co-deleting a sidecar that is already gone
4082        // is not a failure.
4083        let mut manifest = Manifest::new();
4084        let mut e = entry("a.mp3", AudioFormat::Mp3);
4085        e.cover_jpg = Some(ArtifactState {
4086            path: "a/cover.jpg".to_owned(),
4087            hash: "h1".to_owned(),
4088        });
4089        manifest.insert("a", e);
4090        let plan = Plan {
4091            actions: vec![Action::DeleteArtifact {
4092                kind: ArtifactKind::CoverJpg,
4093                path: "a/cover.jpg".to_owned(),
4094                owner_id: "a".to_owned(),
4095            }],
4096        };
4097
4098        let outcome = run(
4099            &plan,
4100            &mut manifest,
4101            &[],
4102            &ScriptedHttp::new(),
4103            &MemFs::new(),
4104            &StubFfmpeg::flac(),
4105            &RecordingClock::new(),
4106            &ExecOptions::default(),
4107        );
4108
4109        assert_eq!(outcome.artifacts_deleted, 1);
4110        assert_eq!(outcome.failed(), 0);
4111        assert_eq!(manifest.get("a").unwrap().cover_jpg, None);
4112    }
4113
4114    #[test]
4115    fn write_artifact_http_failure_is_a_per_clip_failure_not_a_run_abort() {
4116        // A permanent 404 on one sidecar fetch is recorded as a per-clip failure;
4117        // the run continues and the following WriteArtifact still succeeds.
4118        let mut manifest = Manifest::new();
4119        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
4120        manifest.insert("b", entry("b.mp3", AudioFormat::Mp3));
4121        let plan = Plan {
4122            actions: vec![
4123                Action::WriteArtifact {
4124                    kind: ArtifactKind::CoverJpg,
4125                    path: "a/cover.jpg".to_owned(),
4126                    source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
4127                    hash: "h1".to_owned(),
4128                    owner_id: "a".to_owned(),
4129                    content: None,
4130                },
4131                Action::WriteArtifact {
4132                    kind: ArtifactKind::CoverJpg,
4133                    path: "b/cover.jpg".to_owned(),
4134                    source_url: "https://art.suno.ai/b/large.jpg".to_owned(),
4135                    hash: "h2".to_owned(),
4136                    owner_id: "b".to_owned(),
4137                    content: None,
4138                },
4139            ],
4140        };
4141        let http = ScriptedHttp::new()
4142            .route("a/large.jpg", Reply::status(404))
4143            .route("b/large.jpg", Reply::ok(b"jpg-b".to_vec()));
4144        let fs = MemFs::new();
4145
4146        let outcome = run(
4147            &plan,
4148            &mut manifest,
4149            &[],
4150            &http,
4151            &fs,
4152            &StubFfmpeg::flac(),
4153            &RecordingClock::new(),
4154            &ExecOptions::default(),
4155        );
4156
4157        assert_eq!(outcome.status, RunStatus::Completed);
4158        assert_eq!(outcome.failed(), 1);
4159        assert_eq!(outcome.failures[0].clip_id, "a");
4160        assert_eq!(outcome.artifacts_written, 1);
4161        // The failed sidecar left no file and no manifest record.
4162        assert!(!fs.exists("a/cover.jpg"));
4163        assert_eq!(manifest.get("a").unwrap().cover_jpg, None);
4164        // The following sidecar was written and recorded.
4165        assert_eq!(fs.read_file("b/cover.jpg").unwrap(), b"jpg-b");
4166        assert!(manifest.get("b").unwrap().cover_jpg.is_some());
4167    }
4168
4169    #[test]
4170    fn stranded_old_sidecar_removed_when_colliding_writer_fails() {
4171        // #142: clip A moves its cover shared -> a/cover.jpg (fetch succeeds);
4172        // clip B is planned to write the vacated `shared` path but its fetch
4173        // fails. The old-path cleanup is gated on COMMITTED writes, not planned
4174        // ones, so B's failed write no longer protects the stale file: A's old
4175        // `shared` copy is removed rather than left as an untracked orphan.
4176        let mut manifest = Manifest::new();
4177        let mut a = entry("a.mp3", AudioFormat::Mp3);
4178        a.cover_jpg = Some(ArtifactState {
4179            path: "shared/cover.jpg".to_owned(),
4180            hash: "ha".to_owned(),
4181        });
4182        manifest.insert("a", a);
4183        manifest.insert("b", entry("b.mp3", AudioFormat::Mp3));
4184        let fs = MemFs::new().with_file("shared/cover.jpg", b"old-shared".to_vec());
4185        let plan = Plan {
4186            actions: vec![
4187                Action::WriteArtifact {
4188                    kind: ArtifactKind::CoverJpg,
4189                    path: "a/cover.jpg".to_owned(),
4190                    source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
4191                    hash: "ha".to_owned(),
4192                    owner_id: "a".to_owned(),
4193                    content: None,
4194                },
4195                Action::WriteArtifact {
4196                    kind: ArtifactKind::CoverJpg,
4197                    path: "shared/cover.jpg".to_owned(),
4198                    source_url: "https://art.suno.ai/b/large.jpg".to_owned(),
4199                    hash: "hb".to_owned(),
4200                    owner_id: "b".to_owned(),
4201                    content: None,
4202                },
4203            ],
4204        };
4205        let http = ScriptedHttp::new()
4206            .route("a/large.jpg", Reply::ok(b"jpg-a".to_vec()))
4207            .route("b/large.jpg", Reply::status(404));
4208
4209        let outcome = run(
4210            &plan,
4211            &mut manifest,
4212            &[],
4213            &http,
4214            &fs,
4215            &StubFfmpeg::flac(),
4216            &RecordingClock::new(),
4217            &ExecOptions::default(),
4218        );
4219
4220        assert_eq!(outcome.failed(), 1);
4221        assert_eq!(outcome.failures[0].clip_id, "b");
4222        // A's move committed; the vacated file is gone, not an orphan.
4223        assert_eq!(fs.read_file("a/cover.jpg").unwrap(), b"jpg-a");
4224        assert!(
4225            !fs.exists("shared/cover.jpg"),
4226            "the vacated file must be removed once the colliding writer failed"
4227        );
4228        assert_eq!(
4229            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
4230            "a/cover.jpg"
4231        );
4232    }
4233
4234    #[test]
4235    fn committed_write_at_old_path_is_preserved() {
4236        // #142: clip B writes `shared` and commits BEFORE clip A vacates it
4237        // (A moves shared -> a/cover.jpg). A's cleanup sees `shared` in the
4238        // committed set and keeps B's freshly written file rather than deleting
4239        // it. This is the successful-collision case the guard must still protect.
4240        let mut manifest = Manifest::new();
4241        let mut a = entry("a.mp3", AudioFormat::Mp3);
4242        a.cover_jpg = Some(ArtifactState {
4243            path: "shared/cover.jpg".to_owned(),
4244            hash: "ha".to_owned(),
4245        });
4246        manifest.insert("a", a);
4247        manifest.insert("b", entry("b.mp3", AudioFormat::Mp3));
4248        let fs = MemFs::new().with_file("shared/cover.jpg", b"old-shared".to_vec());
4249        let plan = Plan {
4250            actions: vec![
4251                Action::WriteArtifact {
4252                    kind: ArtifactKind::CoverJpg,
4253                    path: "shared/cover.jpg".to_owned(),
4254                    source_url: "https://art.suno.ai/b/large.jpg".to_owned(),
4255                    hash: "hb".to_owned(),
4256                    owner_id: "b".to_owned(),
4257                    content: None,
4258                },
4259                Action::WriteArtifact {
4260                    kind: ArtifactKind::CoverJpg,
4261                    path: "a/cover.jpg".to_owned(),
4262                    source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
4263                    hash: "ha".to_owned(),
4264                    owner_id: "a".to_owned(),
4265                    content: None,
4266                },
4267            ],
4268        };
4269        let http = ScriptedHttp::new()
4270            .route("b/large.jpg", Reply::ok(b"jpg-b".to_vec()))
4271            .route("a/large.jpg", Reply::ok(b"jpg-a".to_vec()));
4272
4273        let outcome = run(
4274            &plan,
4275            &mut manifest,
4276            &[],
4277            &http,
4278            &fs,
4279            &StubFfmpeg::flac(),
4280            &RecordingClock::new(),
4281            &ExecOptions::default(),
4282        );
4283
4284        assert_eq!(outcome.failed(), 0);
4285        // B's committed write survives A's subsequent move; both files are present.
4286        assert_eq!(fs.read_file("shared/cover.jpg").unwrap(), b"jpg-b");
4287        assert_eq!(fs.read_file("a/cover.jpg").unwrap(), b"jpg-a");
4288        assert_eq!(
4289            manifest.get("b").unwrap().cover_jpg.as_ref().unwrap().path,
4290            "shared/cover.jpg"
4291        );
4292        assert_eq!(
4293            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
4294            "a/cover.jpg"
4295        );
4296    }
4297
4298    #[test]
4299    fn cover_move_renames_without_fetching() {
4300        // #141: a MoveArtifact relocates the cover with a local rename. The
4301        // ScriptedHttp has no route, so any fetch would fail the run; a clean
4302        // outcome proves the bytes were renamed, not re-downloaded.
4303        let mut manifest = Manifest::new();
4304        let mut e = entry("a.mp3", AudioFormat::Mp3);
4305        e.cover_jpg = Some(ArtifactState {
4306            path: "old/cover.jpg".to_owned(),
4307            hash: "h".to_owned(),
4308        });
4309        manifest.insert("a", e);
4310        let fs = MemFs::new().with_file("old/cover.jpg", b"JPGBYTES".to_vec());
4311        let plan = Plan {
4312            actions: vec![Action::MoveArtifact {
4313                kind: ArtifactKind::CoverJpg,
4314                from: "old/cover.jpg".to_owned(),
4315                to: "new/cover.jpg".to_owned(),
4316                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
4317                hash: "h".to_owned(),
4318                owner_id: "a".to_owned(),
4319            }],
4320        };
4321
4322        let outcome = run(
4323            &plan,
4324            &mut manifest,
4325            &[],
4326            &ScriptedHttp::new(),
4327            &fs,
4328            &StubFfmpeg::flac(),
4329            &RecordingClock::new(),
4330            &ExecOptions::default(),
4331        );
4332
4333        assert_eq!(outcome.failed(), 0);
4334        assert_eq!(outcome.renamed, 1, "counted as a rename, not a write");
4335        // Renamed in place: the new path carries the ORIGINAL bytes, old is gone.
4336        assert_eq!(fs.read_file("new/cover.jpg").unwrap(), b"JPGBYTES");
4337        assert!(!fs.exists("old/cover.jpg"));
4338        assert_eq!(
4339            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
4340            "new/cover.jpg"
4341        );
4342    }
4343
4344    #[test]
4345    fn cover_move_falls_back_to_fetch_when_old_file_missing() {
4346        // #141: the old file vanished before commit, so the rename fails and the
4347        // executor fetches fresh bytes at the new path rather than failing.
4348        let mut manifest = Manifest::new();
4349        let mut e = entry("a.mp3", AudioFormat::Mp3);
4350        e.cover_jpg = Some(ArtifactState {
4351            path: "old/cover.jpg".to_owned(),
4352            hash: "h".to_owned(),
4353        });
4354        manifest.insert("a", e);
4355        let fs = MemFs::new(); // old/cover.jpg is absent.
4356        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"FETCHED".to_vec()));
4357        let plan = Plan {
4358            actions: vec![Action::MoveArtifact {
4359                kind: ArtifactKind::CoverJpg,
4360                from: "old/cover.jpg".to_owned(),
4361                to: "new/cover.jpg".to_owned(),
4362                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
4363                hash: "h".to_owned(),
4364                owner_id: "a".to_owned(),
4365            }],
4366        };
4367
4368        let outcome = run(
4369            &plan,
4370            &mut manifest,
4371            &[],
4372            &http,
4373            &fs,
4374            &StubFfmpeg::flac(),
4375            &RecordingClock::new(),
4376            &ExecOptions::default(),
4377        );
4378
4379        assert_eq!(outcome.failed(), 0);
4380        assert_eq!(fs.read_file("new/cover.jpg").unwrap(), b"FETCHED");
4381        assert_eq!(
4382            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
4383            "new/cover.jpg"
4384        );
4385    }
4386
4387    #[test]
4388    fn cover_move_falls_back_when_source_co_referenced() {
4389        // Two clips' covers share old/cover.jpg after a prior failed swap. A move
4390        // for `a` must NOT rename the shared file away (that would strand `b`); it
4391        // falls back to a fetch, and `b`'s file survives.
4392        let mut manifest = Manifest::new();
4393        let mut a = entry("a.mp3", AudioFormat::Mp3);
4394        a.cover_jpg = Some(ArtifactState {
4395            path: "old/cover.jpg".to_owned(),
4396            hash: "h".to_owned(),
4397        });
4398        manifest.insert("a", a);
4399        let mut b = entry("b.mp3", AudioFormat::Mp3);
4400        b.cover_jpg = Some(ArtifactState {
4401            path: "old/cover.jpg".to_owned(),
4402            hash: "h".to_owned(),
4403        });
4404        manifest.insert("b", b);
4405        let fs = MemFs::new().with_file("old/cover.jpg", b"SHARED".to_vec());
4406        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"FETCHED-A".to_vec()));
4407        // Only `a` moves this run: old/cover.jpg -> a/cover.jpg.
4408        let plan = Plan {
4409            actions: vec![Action::MoveArtifact {
4410                kind: ArtifactKind::CoverJpg,
4411                from: "old/cover.jpg".to_owned(),
4412                to: "a/cover.jpg".to_owned(),
4413                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
4414                hash: "h".to_owned(),
4415                owner_id: "a".to_owned(),
4416            }],
4417        };
4418
4419        let outcome = run(
4420            &plan,
4421            &mut manifest,
4422            &[],
4423            &http,
4424            &fs,
4425            &StubFfmpeg::flac(),
4426            &RecordingClock::new(),
4427            &ExecOptions::default(),
4428        );
4429
4430        assert_eq!(outcome.failed(), 0);
4431        // `a` got a fresh fetched copy; `b`'s shared file is untouched.
4432        assert_eq!(fs.read_file("a/cover.jpg").unwrap(), b"FETCHED-A");
4433        assert_eq!(
4434            fs.read_file("old/cover.jpg").unwrap(),
4435            b"SHARED",
4436            "the co-referenced file must survive"
4437        );
4438    }
4439
4440    #[test]
4441    fn stem_move_renames_without_refetch() {
4442        // #141: a MoveStem relocates the raw stem with a rename; no route is set,
4443        // so a clean outcome proves it did not re-render or re-fetch.
4444        let mut manifest = Manifest::new();
4445        let mut e = entry("a.flac", AudioFormat::Flac);
4446        e.stems.insert(
4447            "voc".to_owned(),
4448            ArtifactState {
4449                path: "old.stems/voc.mp3".to_owned(),
4450                hash: "h1".to_owned(),
4451            },
4452        );
4453        manifest.insert("a", e);
4454        let fs = MemFs::new().with_file("old.stems/voc.mp3", b"STEMBYTES".to_vec());
4455        let plan = Plan {
4456            actions: vec![Action::MoveStem {
4457                clip_id: "a".to_owned(),
4458                key: "voc".to_owned(),
4459                stem_id: "voc".to_owned(),
4460                from: "old.stems/voc.mp3".to_owned(),
4461                to: "new.stems/voc.mp3".to_owned(),
4462                source_url: "https://cdn1.suno.ai/voc.mp3".to_owned(),
4463                format: StemFormat::Mp3,
4464                hash: "h1".to_owned(),
4465            }],
4466        };
4467
4468        let outcome = run(
4469            &plan,
4470            &mut manifest,
4471            &[],
4472            &ScriptedHttp::new(),
4473            &fs,
4474            &StubFfmpeg::flac(),
4475            &RecordingClock::new(),
4476            &ExecOptions::default(),
4477        );
4478
4479        assert_eq!(outcome.failed(), 0);
4480        assert_eq!(outcome.renamed, 1);
4481        assert_eq!(fs.read_file("new.stems/voc.mp3").unwrap(), b"STEMBYTES");
4482        assert!(!fs.exists("old.stems/voc.mp3"));
4483        assert_eq!(
4484            manifest.get("a").unwrap().stems.get("voc").unwrap().path,
4485            "new.stems/voc.mp3"
4486        );
4487    }
4488
4489    #[test]
4490    fn stem_move_falls_back_to_fetch_when_source_co_referenced() {
4491        // Two clips' stems share shared.stems/voc.mp3 after a partially-failed
4492        // swap (the file holds `a`'s bytes). When `b` moves it, move_stem must NOT
4493        // rename the shared file under `b`'s hash (that records `a`'s bytes as
4494        // `b`'s); it falls back to a fetch of `b`'s correct bytes.
4495        let mut manifest = Manifest::new();
4496        let mut a = entry("a.flac", AudioFormat::Flac);
4497        a.stems.insert(
4498            "voc".to_owned(),
4499            ArtifactState {
4500                path: "shared.stems/voc.mp3".to_owned(),
4501                hash: "h".to_owned(),
4502            },
4503        );
4504        manifest.insert("a", a);
4505        let mut b = entry("b.flac", AudioFormat::Flac);
4506        b.stems.insert(
4507            "voc".to_owned(),
4508            ArtifactState {
4509                path: "shared.stems/voc.mp3".to_owned(),
4510                hash: "h".to_owned(),
4511            },
4512        );
4513        manifest.insert("b", b);
4514        let fs = MemFs::new().with_file("shared.stems/voc.mp3", b"A-STEM".to_vec());
4515        let http = ScriptedHttp::new().route("bvoc.mp3", Reply::ok(b"B-STEM".to_vec()));
4516        let plan = Plan {
4517            actions: vec![Action::MoveStem {
4518                clip_id: "b".to_owned(),
4519                key: "voc".to_owned(),
4520                stem_id: "bvoc".to_owned(),
4521                from: "shared.stems/voc.mp3".to_owned(),
4522                to: "b.stems/voc.mp3".to_owned(),
4523                source_url: "https://cdn1.suno.ai/bvoc.mp3".to_owned(),
4524                format: StemFormat::Mp3,
4525                hash: "h".to_owned(),
4526            }],
4527        };
4528
4529        let outcome = run(
4530            &plan,
4531            &mut manifest,
4532            &[],
4533            &http,
4534            &fs,
4535            &StubFfmpeg::flac(),
4536            &RecordingClock::new(),
4537            &ExecOptions::default(),
4538        );
4539
4540        assert_eq!(outcome.failed(), 0);
4541        // b's new stem carries b's freshly fetched bytes, never a's renamed bytes.
4542        assert_eq!(fs.read_file("b.stems/voc.mp3").unwrap(), b"B-STEM");
4543        assert_eq!(
4544            fs.read_file("shared.stems/voc.mp3").unwrap(),
4545            b"A-STEM",
4546            "the co-referenced stem must survive"
4547        );
4548    }
4549
4550    #[test]
4551    fn write_stem_keeps_shared_stem_when_co_referenced() {
4552        // Two clips share shared.stems/voc.mp3 after a prior partially-failed swap.
4553        // When `b` writes to a new path, write_stem must NOT remove the shared file;
4554        // clip `a` still references it and its stem must survive.
4555        let mut manifest = Manifest::new();
4556        let mut a = entry("a.flac", AudioFormat::Flac);
4557        a.stems.insert(
4558            "voc".to_owned(),
4559            ArtifactState {
4560                path: "shared.stems/voc.mp3".to_owned(),
4561                hash: "h".to_owned(),
4562            },
4563        );
4564        manifest.insert("a", a);
4565        let mut b = entry("b.flac", AudioFormat::Flac);
4566        b.stems.insert(
4567            "voc".to_owned(),
4568            ArtifactState {
4569                path: "shared.stems/voc.mp3".to_owned(),
4570                hash: "h".to_owned(),
4571            },
4572        );
4573        manifest.insert("b", b);
4574        let fs = MemFs::new().with_file("shared.stems/voc.mp3", b"A-STEM".to_vec());
4575        let http = ScriptedHttp::new().route("bvoc.mp3", Reply::ok(b"B-STEM".to_vec()));
4576        let plan = Plan {
4577            actions: vec![Action::WriteStem {
4578                clip_id: "b".to_owned(),
4579                key: "voc".to_owned(),
4580                stem_id: "bvoc".to_owned(),
4581                path: "b.stems/voc.mp3".to_owned(),
4582                source_url: "https://cdn1.suno.ai/bvoc.mp3".to_owned(),
4583                format: StemFormat::Mp3,
4584                hash: "bh".to_owned(),
4585            }],
4586        };
4587
4588        let outcome = run(
4589            &plan,
4590            &mut manifest,
4591            &[],
4592            &http,
4593            &fs,
4594            &StubFfmpeg::flac(),
4595            &RecordingClock::new(),
4596            &ExecOptions::default(),
4597        );
4598
4599        assert_eq!(outcome.failed(), 0);
4600        assert_eq!(fs.read_file("b.stems/voc.mp3").unwrap(), b"B-STEM");
4601        assert_eq!(
4602            fs.read_file("shared.stems/voc.mp3").unwrap(),
4603            b"A-STEM",
4604            "the co-referenced stem must survive"
4605        );
4606    }
4607
4608    #[test]
4609    fn co_delete_executes_audio_delete_then_artifact_delete() {
4610        // The plan orders the audio Delete before its sidecar DeleteArtifact.
4611        // The audio delete removes the manifest entry; the sidecar delete then
4612        // removes the file and tolerates the now-absent entry.
4613        let fs = MemFs::new()
4614            .with_file("gone.mp3", b"DATA".to_vec())
4615            .with_file("gone/cover.jpg", b"jpg".to_vec());
4616        let mut manifest = Manifest::new();
4617        let mut e = entry("gone.mp3", AudioFormat::Mp3);
4618        e.cover_jpg = Some(ArtifactState {
4619            path: "gone/cover.jpg".to_owned(),
4620            hash: "h1".to_owned(),
4621        });
4622        manifest.insert("gone", e);
4623        let plan = Plan {
4624            actions: vec![
4625                Action::Delete {
4626                    path: "gone.mp3".to_owned(),
4627                    clip_id: "gone".to_owned(),
4628                },
4629                Action::DeleteArtifact {
4630                    kind: ArtifactKind::CoverJpg,
4631                    path: "gone/cover.jpg".to_owned(),
4632                    owner_id: "gone".to_owned(),
4633                },
4634            ],
4635        };
4636
4637        let outcome = run(
4638            &plan,
4639            &mut manifest,
4640            &[],
4641            &ScriptedHttp::new(),
4642            &fs,
4643            &StubFfmpeg::flac(),
4644            &RecordingClock::new(),
4645            &ExecOptions::default(),
4646        );
4647
4648        assert_eq!(outcome.deleted, 1);
4649        assert_eq!(outcome.artifacts_deleted, 1);
4650        assert_eq!(outcome.failed(), 0);
4651        assert!(!fs.exists("gone.mp3"));
4652        assert!(!fs.exists("gone/cover.jpg"));
4653        assert!(manifest.get("gone").is_none());
4654    }
4655
4656    #[test]
4657    fn write_stem_mp3_stores_raw_and_records_slot() {
4658        // An MP3 stem is downloaded straight from its CDN url and stored verbatim
4659        // (no transcode, no WAV render): the bytes land at the `.mp3` path and the
4660        // keyed slot records the path and hash.
4661        let mut manifest = Manifest::new();
4662        manifest.insert("a", entry("a.flac", AudioFormat::Flac));
4663        let plan = Plan {
4664            actions: vec![Action::WriteStem {
4665                clip_id: "a".to_owned(),
4666                key: "voc".to_owned(),
4667                stem_id: "voc".to_owned(),
4668                path: "a.stems/a - Vocals [voc].mp3".to_owned(),
4669                source_url: "https://cdn1.suno.ai/voc.mp3".to_owned(),
4670                format: StemFormat::Mp3,
4671                hash: "vh".to_owned(),
4672            }],
4673        };
4674        let http = ScriptedHttp::new().route("voc.mp3", Reply::ok(b"stem-bytes".to_vec()));
4675        let fs = MemFs::new();
4676
4677        let outcome = run(
4678            &plan,
4679            &mut manifest,
4680            &[],
4681            &http,
4682            &fs,
4683            &StubFfmpeg::flac(),
4684            &RecordingClock::new(),
4685            &ExecOptions::default(),
4686        );
4687
4688        assert_eq!(outcome.artifacts_written, 1);
4689        assert_eq!(outcome.failed(), 0);
4690        // Bytes are stored exactly as delivered (no transcode applied).
4691        assert_eq!(
4692            fs.read_file("a.stems/a - Vocals [voc].mp3").unwrap(),
4693            b"stem-bytes"
4694        );
4695        // An MP3 stem never renders WAV: no convert_wav, no generation.
4696        assert_eq!(http.count("convert_wav"), 0);
4697        assert_eq!(http.count("/api/gen/"), 0);
4698        assert_eq!(
4699            manifest.get("a").unwrap().stems.get("voc"),
4700            Some(&ArtifactState {
4701                path: "a.stems/a - Vocals [voc].mp3".to_owned(),
4702                hash: "vh".to_owned(),
4703            })
4704        );
4705    }
4706
4707    #[test]
4708    fn write_stem_wav_renders_via_convert_wav_and_stores_raw() {
4709        // A WAV stem (the default) renders the stem clip's lossless WAV through the
4710        // free convert_wav flow keyed on the stem id, then downloads and stores it
4711        // RAW as `.wav` — it is NEVER transcoded to FLAC, even for a FLAC song.
4712        let mut manifest = Manifest::new();
4713        manifest.insert("a", entry("a.flac", AudioFormat::Flac));
4714        let plan = Plan {
4715            actions: vec![Action::WriteStem {
4716                clip_id: "a".to_owned(),
4717                key: "voc".to_owned(),
4718                stem_id: "stemvoc".to_owned(),
4719                path: "a.stems/a - Vocals [stemvoc].wav".to_owned(),
4720                source_url: "https://cdn1.suno.ai/stemvoc.mp3".to_owned(),
4721                format: StemFormat::Wav,
4722                hash: "vh".to_owned(),
4723            }],
4724        };
4725        // wav_file is not ready on the first poll, so the flow POSTs convert_wav
4726        // (free) and polls again — exactly the main FLAC/WAV render path.
4727        let http = ScriptedHttp::new()
4728            .with_auth()
4729            .route_seq(
4730                "stemvoc/wav_file/",
4731                vec![
4732                    Reply::json("{}"),
4733                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/stemvoc.wav"}"#),
4734                ],
4735            )
4736            .route("stemvoc/convert_wav/", Reply::status(200))
4737            .route("stemvoc.wav", Reply::ok(b"RIFFwav-bytes".to_vec()));
4738        let fs = MemFs::new();
4739
4740        let outcome = run(
4741            &plan,
4742            &mut manifest,
4743            &[],
4744            &http,
4745            &fs,
4746            &StubFfmpeg::flac(),
4747            &RecordingClock::new(),
4748            &small_poll(),
4749        );
4750
4751        assert_eq!(outcome.artifacts_written, 1);
4752        assert_eq!(outcome.failed(), 0);
4753        // The rendered WAV is stored verbatim; ffmpeg (WAV->FLAC) is never invoked,
4754        // so the stored bytes are the raw WAV, not a FLAC transcode.
4755        assert_eq!(
4756            fs.read_file("a.stems/a - Vocals [stemvoc].wav").unwrap(),
4757            b"RIFFwav-bytes"
4758        );
4759        assert!(!fs.exists("a.stems/a - Vocals [stemvoc].flac"));
4760        // The free WAV render ran; no credit-spending generation endpoint did.
4761        assert_eq!(http.count("convert_wav"), 1);
4762        assert_eq!(http.count("stem_task"), 0);
4763        assert_eq!(http.count("separate"), 0);
4764        assert_eq!(
4765            manifest.get("a").unwrap().stems.get("voc").unwrap().path,
4766            "a.stems/a - Vocals [stemvoc].wav"
4767        );
4768    }
4769
4770    #[test]
4771    fn write_stem_is_skipped_when_owner_audio_is_absent() {
4772        // No owning manifest entry (audio failed or never existed) => skip with
4773        // no fetch and no write, so a stem is never stranded without its song.
4774        let mut manifest = Manifest::new();
4775        let plan = Plan {
4776            actions: vec![Action::WriteStem {
4777                clip_id: "ghost".to_owned(),
4778                key: "voc".to_owned(),
4779                stem_id: "voc".to_owned(),
4780                path: "ghost.stems/voc.mp3".to_owned(),
4781                source_url: "https://cdn1.suno.ai/voc.mp3".to_owned(),
4782                format: StemFormat::Mp3,
4783                hash: "vh".to_owned(),
4784            }],
4785        };
4786        // Empty HTTP script: any fetch would error, proving none happens.
4787        let http = ScriptedHttp::new();
4788        let fs = MemFs::new();
4789
4790        let outcome = run(
4791            &plan,
4792            &mut manifest,
4793            &[],
4794            &http,
4795            &fs,
4796            &StubFfmpeg::flac(),
4797            &RecordingClock::new(),
4798            &ExecOptions::default(),
4799        );
4800
4801        assert_eq!(outcome.skipped, 1);
4802        assert_eq!(outcome.artifacts_written, 0);
4803        assert_eq!(outcome.failed(), 0);
4804        assert!(!fs.exists("ghost.stems/voc.mp3"));
4805    }
4806
4807    #[test]
4808    fn write_stem_relocates_the_old_file_on_a_path_move() {
4809        // The song was renamed, so the stem moves: the new file is written and the
4810        // stale copy at the previously tracked path is removed (moved, not orphaned).
4811        let fs = MemFs::new().with_file("old.stems/voc.mp3", b"old".to_vec());
4812        let mut manifest = Manifest::new();
4813        let mut e = entry("new.flac", AudioFormat::Flac);
4814        e.stems.insert(
4815            "voc".to_owned(),
4816            ArtifactState {
4817                path: "old.stems/voc.mp3".to_owned(),
4818                hash: "vh".to_owned(),
4819            },
4820        );
4821        manifest.insert("a", e);
4822        let plan = Plan {
4823            actions: vec![Action::WriteStem {
4824                clip_id: "a".to_owned(),
4825                key: "voc".to_owned(),
4826                stem_id: "voc".to_owned(),
4827                path: "new.stems/voc.mp3".to_owned(),
4828                source_url: "https://cdn1.suno.ai/voc.mp3".to_owned(),
4829                format: StemFormat::Mp3,
4830                hash: "vh".to_owned(),
4831            }],
4832        };
4833        let http = ScriptedHttp::new().route("voc.mp3", Reply::ok(b"new".to_vec()));
4834
4835        let outcome = run(
4836            &plan,
4837            &mut manifest,
4838            &[],
4839            &http,
4840            &fs,
4841            &StubFfmpeg::flac(),
4842            &RecordingClock::new(),
4843            &ExecOptions::default(),
4844        );
4845
4846        assert_eq!(outcome.artifacts_written, 1);
4847        assert!(fs.exists("new.stems/voc.mp3"));
4848        assert!(
4849            !fs.exists("old.stems/voc.mp3"),
4850            "the old stem is moved, not left behind"
4851        );
4852        assert_eq!(
4853            manifest.get("a").unwrap().stems.get("voc").unwrap().path,
4854            "new.stems/voc.mp3"
4855        );
4856    }
4857
4858    #[test]
4859    fn delete_stem_removes_file_and_clears_slot() {
4860        let fs = MemFs::new().with_file("a.stems/voc.mp3", b"stem".to_vec());
4861        let mut manifest = Manifest::new();
4862        let mut e = entry("a.flac", AudioFormat::Flac);
4863        e.stems.insert(
4864            "voc".to_owned(),
4865            ArtifactState {
4866                path: "a.stems/voc.mp3".to_owned(),
4867                hash: "vh".to_owned(),
4868            },
4869        );
4870        manifest.insert("a", e);
4871        let plan = Plan {
4872            actions: vec![Action::DeleteStem {
4873                clip_id: "a".to_owned(),
4874                key: "voc".to_owned(),
4875                path: "a.stems/voc.mp3".to_owned(),
4876            }],
4877        };
4878
4879        let outcome = run(
4880            &plan,
4881            &mut manifest,
4882            &[],
4883            &ScriptedHttp::new(),
4884            &fs,
4885            &StubFfmpeg::flac(),
4886            &RecordingClock::new(),
4887            &ExecOptions::default(),
4888        );
4889
4890        assert_eq!(outcome.artifacts_deleted, 1);
4891        assert!(!fs.exists("a.stems/voc.mp3"));
4892        assert!(manifest.get("a").unwrap().stems.is_empty());
4893    }
4894
4895    #[test]
4896    fn co_deleting_the_last_stem_prunes_the_stems_folder() {
4897        // Deleting a song co-deletes its stems; the emptied `.stems` folder is
4898        // pruned by the end-of-run sweep, so it can never be orphaned.
4899        let fs = MemFs::new()
4900            .with_file("song.flac", b"DATA".to_vec())
4901            .with_file("song.stems/voc.mp3", b"stem".to_vec());
4902        assert!(fs.has_dir("song.stems"));
4903        let mut manifest = Manifest::new();
4904        let mut e = entry("song.flac", AudioFormat::Flac);
4905        e.stems.insert(
4906            "voc".to_owned(),
4907            ArtifactState {
4908                path: "song.stems/voc.mp3".to_owned(),
4909                hash: "vh".to_owned(),
4910            },
4911        );
4912        manifest.insert("a", e);
4913        let plan = Plan {
4914            actions: vec![
4915                Action::Delete {
4916                    path: "song.flac".to_owned(),
4917                    clip_id: "a".to_owned(),
4918                },
4919                Action::DeleteStem {
4920                    clip_id: "a".to_owned(),
4921                    key: "voc".to_owned(),
4922                    path: "song.stems/voc.mp3".to_owned(),
4923                },
4924            ],
4925        };
4926
4927        let outcome = run(
4928            &plan,
4929            &mut manifest,
4930            &[],
4931            &ScriptedHttp::new(),
4932            &fs,
4933            &StubFfmpeg::flac(),
4934            &RecordingClock::new(),
4935            &ExecOptions::default(),
4936        );
4937
4938        assert_eq!(outcome.deleted, 1);
4939        assert_eq!(outcome.artifacts_deleted, 1);
4940        assert!(!fs.exists("song.flac"));
4941        assert!(!fs.exists("song.stems/voc.mp3"));
4942        assert!(
4943            !fs.has_dir("song.stems"),
4944            "the emptied .stems folder is pruned"
4945        );
4946        assert!(manifest.get("a").is_none());
4947    }
4948
4949    #[test]
4950    fn full_stems_mirror_mp3_is_get_only_with_zero_gen_traffic() {
4951        // End-to-end #100 path with MP3 stems: list a clip's existing stems (free
4952        // GET over the live page-count + 0-indexed page shape), reconcile them into
4953        // WriteStem actions, and execute (download) them. With MP3 the whole flow
4954        // is GET-only and touches NO `/api/gen/` endpoint at all.
4955        let http = ScriptedHttp::new()
4956            .with_auth()
4957            .route("clip1/stems/pages", Reply::json(r#"{"pages": 1}"#))
4958            .route(
4959                "clip1/stems?page=0",
4960                Reply::json(
4961                    r#"{"stems":[
4962                        {"id":"s1","title":"Song (Vocals)","status":"complete","audio_url":"https://cdn1.suno.ai/s1.mp3"},
4963                        {"id":"s2","title":"Song (Drums)","status":"complete","audio_url":"https://cdn1.suno.ai/s2.mp3"}
4964                    ]}"#,
4965                ),
4966            )
4967            .route("s1.mp3", Reply::ok(b"vocals-bytes".to_vec()))
4968            .route("s2.mp3", Reply::ok(b"drums-bytes".to_vec()));
4969
4970        // List the existing stems through the client (GET-only, free).
4971        let auth = ClerkAuth::new("eyJtoken");
4972        pollster::block_on(auth.authenticate(&http)).unwrap();
4973        let client = SunoClient::new(auth, RecordingClock::new());
4974        let (stems, complete) = pollster::block_on(client.list_stems(&http, "clip1")).unwrap();
4975        assert!(complete);
4976        assert_eq!(stems.len(), 2);
4977        assert_eq!(stems[0].label, "Vocals");
4978
4979        // Reconcile the listed MP3 stems into a plan (audio already present -> Skip).
4980        let mut manifest = Manifest::new();
4981        manifest.insert("clip1", entry("clip1.flac", AudioFormat::Flac));
4982        let desired_stems: Vec<crate::reconcile::DesiredStem> = stems
4983            .iter()
4984            .map(|s| crate::reconcile::DesiredStem {
4985                key: s.id.clone(),
4986                stem_id: s.id.clone(),
4987                path: format!("clip1.stems/{}.mp3", s.id),
4988                source_url: s.url.clone(),
4989                format: StemFormat::Mp3,
4990                hash: crate::art_url_hash(&s.url),
4991            })
4992            .collect();
4993        let d = Desired {
4994            path: "clip1.flac".to_owned(),
4995            stems: Some(desired_stems),
4996            ..desired(clip("clip1"), AudioFormat::Flac)
4997        };
4998        let local: HashMap<String, crate::reconcile::LocalFile> = [(
4999            "clip1".to_owned(),
5000            crate::reconcile::LocalFile {
5001                exists: true,
5002                size: 100,
5003            },
5004        )]
5005        .into_iter()
5006        .collect();
5007        let sources = [crate::reconcile::SourceStatus {
5008            mode: SourceMode::Mirror,
5009            fully_enumerated: true,
5010        }];
5011        let plan =
5012            crate::reconcile::reconcile(&manifest, std::slice::from_ref(&d), &local, &sources);
5013        assert_eq!(plan.stem_writes(), 2);
5014
5015        let fs = MemFs::new();
5016        let outcome = run(
5017            &plan,
5018            &mut manifest,
5019            std::slice::from_ref(&d),
5020            &http,
5021            &fs,
5022            &StubFfmpeg::flac(),
5023            &RecordingClock::new(),
5024            &ExecOptions::default(),
5025        );
5026
5027        assert_eq!(outcome.artifacts_written, 2, "both stems downloaded");
5028        assert_eq!(fs.read_file("clip1.stems/s1.mp3").unwrap(), b"vocals-bytes");
5029        assert_eq!(fs.read_file("clip1.stems/s2.mp3").unwrap(), b"drums-bytes");
5030        // The MP3 mirror path never touches any /api/gen/ endpoint (no render, no
5031        // generation, no separation).
5032        assert_eq!(http.count("/api/gen/"), 0);
5033        assert_eq!(http.count("stem_task"), 0);
5034        assert_eq!(http.count("separate"), 0);
5035        assert_eq!(http.count("generate"), 0);
5036        // No stem is ever written as FLAC.
5037        assert!(!fs.exists("clip1.stems/s1.flac"));
5038    }
5039
5040    #[test]
5041    fn full_stems_mirror_wav_default_renders_free_wav_and_no_generation() {
5042        // End-to-end #100 path with WAV stems (the default): each stem's lossless
5043        // WAV is rendered through the FREE convert_wav flow and stored RAW as
5044        // `.wav`. The mirror makes NO credit-spending generation POST.
5045        let http = ScriptedHttp::new()
5046            .with_auth()
5047            .route("clip1/stems/pages", Reply::json(r#"{"pages": 1}"#))
5048            .route(
5049                "clip1/stems?page=0",
5050                Reply::json(
5051                    r#"{"stems":[
5052                        {"id":"s1","title":"Song (Vocals)","status":"complete","audio_url":"https://cdn1.suno.ai/s1.mp3"},
5053                        {"id":"s2","title":"Song (Drums)","status":"complete","audio_url":"https://cdn1.suno.ai/s2.mp3"}
5054                    ]}"#,
5055                ),
5056            )
5057            // Each stem's WAV is already rendered, so wav_file returns the url and
5058            // no convert_wav POST is even needed (still free either way).
5059            .route(
5060                "s1/wav_file/",
5061                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/s1.wav"}"#),
5062            )
5063            .route(
5064                "s2/wav_file/",
5065                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/s2.wav"}"#),
5066            )
5067            .route("s1.wav", Reply::ok(b"RIFFvocals".to_vec()))
5068            .route("s2.wav", Reply::ok(b"RIFFdrums".to_vec()));
5069
5070        let auth = ClerkAuth::new("eyJtoken");
5071        pollster::block_on(auth.authenticate(&http)).unwrap();
5072        let client = SunoClient::new(auth, RecordingClock::new());
5073        let (stems, _complete) = pollster::block_on(client.list_stems(&http, "clip1")).unwrap();
5074
5075        let mut manifest = Manifest::new();
5076        manifest.insert("clip1", entry("clip1.flac", AudioFormat::Flac));
5077        let desired_stems: Vec<crate::reconcile::DesiredStem> = stems
5078            .iter()
5079            .map(|s| crate::reconcile::DesiredStem {
5080                key: s.id.clone(),
5081                stem_id: s.id.clone(),
5082                path: format!("clip1.stems/{}.wav", s.id),
5083                source_url: s.url.clone(),
5084                format: StemFormat::Wav,
5085                hash: crate::art_url_hash(&s.url),
5086            })
5087            .collect();
5088        let d = Desired {
5089            path: "clip1.flac".to_owned(),
5090            stems: Some(desired_stems),
5091            ..desired(clip("clip1"), AudioFormat::Flac)
5092        };
5093        let local: HashMap<String, crate::reconcile::LocalFile> = [(
5094            "clip1".to_owned(),
5095            crate::reconcile::LocalFile {
5096                exists: true,
5097                size: 100,
5098            },
5099        )]
5100        .into_iter()
5101        .collect();
5102        let sources = [crate::reconcile::SourceStatus {
5103            mode: SourceMode::Mirror,
5104            fully_enumerated: true,
5105        }];
5106        let plan =
5107            crate::reconcile::reconcile(&manifest, std::slice::from_ref(&d), &local, &sources);
5108
5109        let fs = MemFs::new();
5110        let outcome = run(
5111            &plan,
5112            &mut manifest,
5113            std::slice::from_ref(&d),
5114            &http,
5115            &fs,
5116            &StubFfmpeg::flac(),
5117            &RecordingClock::new(),
5118            &small_poll(),
5119        );
5120
5121        assert_eq!(outcome.artifacts_written, 2);
5122        // Stems are stored RAW as WAV (no FLAC transcode, even for a FLAC song).
5123        assert_eq!(fs.read_file("clip1.stems/s1.wav").unwrap(), b"RIFFvocals");
5124        assert_eq!(fs.read_file("clip1.stems/s2.wav").unwrap(), b"RIFFdrums");
5125        assert!(!fs.exists("clip1.stems/s1.flac"));
5126        // No credit-spending generation/separation endpoint is ever hit.
5127        assert_eq!(http.count("stem_task"), 0);
5128        assert_eq!(http.count("separate"), 0);
5129        assert_eq!(http.count("generate"), 0);
5130    }
5131
5132    #[test]
5133    fn write_artifact_is_skipped_when_the_owner_audio_is_absent() {
5134        // A clip whose Download fails leaves no manifest entry, so its following
5135        // WriteArtifact must not strand an untracked sidecar: it is skipped with
5136        // no fetch and no write. A following healthy clip still succeeds.
5137        let ca = clip("a");
5138        let plan = Plan {
5139            actions: vec![
5140                Action::Download {
5141                    clip: ca.clone(),
5142                    lineage: LineageContext::own_root(&ca),
5143                    path: "a.mp3".to_owned(),
5144                    format: AudioFormat::Mp3,
5145                },
5146                Action::WriteArtifact {
5147                    kind: ArtifactKind::CoverJpg,
5148                    path: "a/cover.jpg".to_owned(),
5149                    source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
5150                    hash: "h1".to_owned(),
5151                    owner_id: "a".to_owned(),
5152                    content: None,
5153                },
5154                Action::WriteArtifact {
5155                    kind: ArtifactKind::CoverJpg,
5156                    path: "b/cover.jpg".to_owned(),
5157                    source_url: "https://art.suno.ai/b/large.jpg".to_owned(),
5158                    hash: "h2".to_owned(),
5159                    owner_id: "b".to_owned(),
5160                    content: None,
5161                },
5162            ],
5163        };
5164        // The Download's audio 404s (permanent), so no entry for "a" is created.
5165        let http = ScriptedHttp::new()
5166            .route("a.mp3", Reply::status(404))
5167            .route("a/large.jpg", Reply::ok(b"jpg-a".to_vec()))
5168            .route("b/large.jpg", Reply::ok(b"jpg-b".to_vec()));
5169        let fs = MemFs::new();
5170        let mut manifest = Manifest::new();
5171        // "b" already has audio (a prior-run clip), so its sidecar write proceeds.
5172        manifest.insert("b", entry("b.mp3", AudioFormat::Mp3));
5173
5174        let outcome = run(
5175            &plan,
5176            &mut manifest,
5177            &[],
5178            &http,
5179            &fs,
5180            &StubFfmpeg::flac(),
5181            &RecordingClock::new(),
5182            &ExecOptions::default(),
5183        );
5184
5185        assert_eq!(outcome.status, RunStatus::Completed);
5186        // The audio download is the only failure; the orphan artifact is skipped.
5187        assert_eq!(outcome.failed(), 1);
5188        assert_eq!(outcome.failures[0].clip_id, "a");
5189        assert_eq!(outcome.skipped, 1);
5190        // The orphan sidecar was neither fetched nor written, and left no record.
5191        assert_eq!(http.count("a/large.jpg"), 0);
5192        assert!(!fs.exists("a/cover.jpg"));
5193        assert!(manifest.get("a").is_none());
5194        // The healthy clip's sidecar still succeeded.
5195        assert_eq!(outcome.artifacts_written, 1);
5196        assert_eq!(fs.read_file("b/cover.jpg").unwrap(), b"jpg-b");
5197        assert!(manifest.get("b").unwrap().cover_jpg.is_some());
5198    }
5199
5200    #[test]
5201    fn write_artifact_transcodes_animated_cover_to_webp() {
5202        // A CoverWebp fetches the clip's MP4 preview, runs it through the ffmpeg
5203        // port, and writes the transcoded WebP (not the fetched MP4), recording
5204        // the sidecar on the owning entry.
5205        let mut manifest = Manifest::new();
5206        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
5207        let plan = Plan {
5208            actions: vec![Action::WriteArtifact {
5209                kind: ArtifactKind::CoverWebp,
5210                path: "a/cover.webp".to_owned(),
5211                source_url: "https://cdn.suno.ai/a/video.mp4".to_owned(),
5212                hash: "v1".to_owned(),
5213                owner_id: "a".to_owned(),
5214                content: None,
5215            }],
5216        };
5217        let http = ScriptedHttp::new().route("a/video.mp4", Reply::ok(b"mp4-bytes".to_vec()));
5218        let fs = MemFs::new();
5219        let ffmpeg = StubFfmpeg::webp();
5220
5221        let outcome = run(
5222            &plan,
5223            &mut manifest,
5224            &[],
5225            &http,
5226            &fs,
5227            &ffmpeg,
5228            &RecordingClock::new(),
5229            &ExecOptions::default(),
5230        );
5231
5232        assert_eq!(outcome.artifacts_written, 1);
5233        assert_eq!(outcome.failed(), 0);
5234        assert_eq!(outcome.status, RunStatus::Completed);
5235        // The fetched MP4 was transcoded: the file holds the ffmpeg WebP output.
5236        assert_eq!(http.count("a/video.mp4"), 1);
5237        let written = fs.read_file("a/cover.webp").unwrap();
5238        assert_ne!(written, b"mp4-bytes");
5239        assert!(written.starts_with(b"RIFF"));
5240        assert_eq!(
5241            manifest.get("a").unwrap().cover_webp,
5242            Some(ArtifactState {
5243                path: "a/cover.webp".to_owned(),
5244                hash: "v1".to_owned(),
5245            })
5246        );
5247    }
5248
5249    #[test]
5250    fn write_artifact_webp_transcode_failure_is_per_clip() {
5251        // A transcode failure is attributed to the owning clip: it is a per-clip
5252        // failure, the run completes, no sidecar is written, and the slot stays
5253        // empty. A healthy static cover in the same run still succeeds.
5254        let mut manifest = Manifest::new();
5255        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
5256        manifest.insert("b", entry("b.mp3", AudioFormat::Mp3));
5257        let plan = Plan {
5258            actions: vec![
5259                Action::WriteArtifact {
5260                    kind: ArtifactKind::CoverWebp,
5261                    path: "a/cover.webp".to_owned(),
5262                    source_url: "https://cdn.suno.ai/a/video.mp4".to_owned(),
5263                    hash: "v1".to_owned(),
5264                    owner_id: "a".to_owned(),
5265                    content: None,
5266                },
5267                Action::WriteArtifact {
5268                    kind: ArtifactKind::CoverJpg,
5269                    path: "b/cover.jpg".to_owned(),
5270                    source_url: "https://art.suno.ai/b/large.jpg".to_owned(),
5271                    hash: "h1".to_owned(),
5272                    owner_id: "b".to_owned(),
5273                    content: None,
5274                },
5275            ],
5276        };
5277        let http = ScriptedHttp::new()
5278            .route("a/video.mp4", Reply::ok(b"mp4-bytes".to_vec()))
5279            .route("b/large.jpg", Reply::ok(b"jpg-b".to_vec()));
5280        let fs = MemFs::new();
5281
5282        let outcome = run(
5283            &plan,
5284            &mut manifest,
5285            &[],
5286            &http,
5287            &fs,
5288            &StubFfmpeg::failing(),
5289            &RecordingClock::new(),
5290            &ExecOptions::default(),
5291        );
5292
5293        assert_eq!(outcome.status, RunStatus::Completed);
5294        assert_eq!(outcome.failed(), 1);
5295        assert_eq!(outcome.failures[0].clip_id, "a");
5296        // The animated cover failed to transcode: nothing written, slot empty.
5297        assert!(!fs.exists("a/cover.webp"));
5298        assert_eq!(manifest.get("a").unwrap().cover_webp, None);
5299        // The static cover in the same run still succeeded.
5300        assert_eq!(outcome.artifacts_written, 1);
5301        assert_eq!(fs.read_file("b/cover.jpg").unwrap(), b"jpg-b");
5302        assert!(manifest.get("b").unwrap().cover_jpg.is_some());
5303    }
5304
5305    #[test]
5306    fn write_artifact_uses_configured_webp_settings() {
5307        use std::sync::{Arc, Mutex};
5308
5309        struct RecordingWebpFfmpeg {
5310            seen: Arc<Mutex<Vec<WebpEncodeSettings>>>,
5311        }
5312
5313        impl Ffmpeg for RecordingWebpFfmpeg {
5314            async fn wav_to_flac(
5315                &self,
5316                _wav: &[u8],
5317            ) -> Result<Vec<u8>, crate::ffmpeg::FfmpegError> {
5318                Ok(Vec::new())
5319            }
5320
5321            async fn mp4_to_webp(
5322                &self,
5323                _mp4: &[u8],
5324                settings: WebpEncodeSettings,
5325            ) -> Result<Vec<u8>, crate::ffmpeg::FfmpegError> {
5326                let seen = Arc::clone(&self.seen);
5327                seen.lock().unwrap().push(settings);
5328                Ok(b"RIFF\x00\x00\x00\x00WEBP".to_vec())
5329            }
5330        }
5331
5332        let mut manifest = Manifest::new();
5333        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
5334        let plan = Plan {
5335            actions: vec![Action::WriteArtifact {
5336                kind: ArtifactKind::CoverWebp,
5337                path: "a/cover.webp".to_owned(),
5338                source_url: "https://cdn.suno.ai/a/video.mp4".to_owned(),
5339                hash: "v1".to_owned(),
5340                owner_id: "a".to_owned(),
5341                content: None,
5342            }],
5343        };
5344        let seen = Arc::new(Mutex::new(Vec::new()));
5345        let ffmpeg = RecordingWebpFfmpeg {
5346            seen: Arc::clone(&seen),
5347        };
5348        let opts = ExecOptions {
5349            cover_webp: WebpEncodeSettings {
5350                quality: 88,
5351                max_fps: 12,
5352                max_width: Some(720),
5353                lossless: false,
5354                compression_level: 4,
5355            },
5356            ..ExecOptions::default()
5357        };
5358
5359        let _ = run(
5360            &plan,
5361            &mut manifest,
5362            &[],
5363            &ScriptedHttp::new().route("a/video.mp4", Reply::ok(b"mp4-bytes".to_vec())),
5364            &MemFs::new(),
5365            &ffmpeg,
5366            &RecordingClock::new(),
5367            &opts,
5368        );
5369
5370        assert_eq!(
5371            seen.lock().unwrap().as_slice(),
5372            &[WebpEncodeSettings {
5373                quality: 88,
5374                max_fps: 12,
5375                max_width: Some(720),
5376                lossless: false,
5377                compression_level: 4,
5378            }]
5379        );
5380    }
5381
5382    // ── Phase 8: folder art routes to the album store ───────────────
5383
5384    #[test]
5385    fn folder_jpg_write_records_album_state_and_skips_manifest() {
5386        // Folder art is owned by the album root id, not a manifest clip: it
5387        // writes even with an empty manifest and records on the album store.
5388        let mut manifest = Manifest::new();
5389        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
5390        let plan = Plan {
5391            actions: vec![Action::WriteArtifact {
5392                kind: ArtifactKind::FolderJpg,
5393                path: "creator/album/folder.jpg".to_owned(),
5394                source_url: "https://art.suno.ai/root/large.jpg".to_owned(),
5395                hash: "jh".to_owned(),
5396                owner_id: "root".to_owned(),
5397                content: None,
5398            }],
5399        };
5400        let http = ScriptedHttp::new().route("root/large.jpg", Reply::ok(b"folder-jpg".to_vec()));
5401        let fs = MemFs::new();
5402
5403        let outcome = run_with_albums(
5404            &plan,
5405            &mut manifest,
5406            &mut albums,
5407            &[],
5408            &http,
5409            &fs,
5410            &StubFfmpeg::flac(),
5411            &RecordingClock::new(),
5412            &ExecOptions::default(),
5413        );
5414
5415        assert_eq!(outcome.artifacts_written, 1);
5416        assert_eq!(outcome.status, RunStatus::Completed);
5417        assert_eq!(
5418            fs.read_file("creator/album/folder.jpg").unwrap(),
5419            b"folder-jpg"
5420        );
5421        assert_eq!(
5422            albums.get("root").unwrap().folder_jpg,
5423            Some(ArtifactState {
5424                path: "creator/album/folder.jpg".to_owned(),
5425                hash: "jh".to_owned(),
5426            })
5427        );
5428        assert!(manifest.get("root").is_none());
5429    }
5430
5431    #[test]
5432    fn folder_webp_write_transcodes_and_records_album_state() {
5433        let mut manifest = Manifest::new();
5434        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
5435        let plan = Plan {
5436            actions: vec![Action::WriteArtifact {
5437                kind: ArtifactKind::FolderWebp,
5438                path: "creator/album/cover.webp".to_owned(),
5439                source_url: "https://cdn.suno.ai/root/video.mp4".to_owned(),
5440                hash: "wh".to_owned(),
5441                owner_id: "root".to_owned(),
5442                content: None,
5443            }],
5444        };
5445        let http = ScriptedHttp::new().route("root/video.mp4", Reply::ok(b"mp4-bytes".to_vec()));
5446        let fs = MemFs::new();
5447
5448        let outcome = run_with_albums(
5449            &plan,
5450            &mut manifest,
5451            &mut albums,
5452            &[],
5453            &http,
5454            &fs,
5455            &StubFfmpeg::webp(),
5456            &RecordingClock::new(),
5457            &ExecOptions::default(),
5458        );
5459
5460        assert_eq!(outcome.artifacts_written, 1);
5461        assert_eq!(outcome.failed(), 0);
5462        // The MP4 was transcoded to WebP, not written verbatim.
5463        let written = fs.read_file("creator/album/cover.webp").unwrap();
5464        assert_ne!(written, b"mp4-bytes");
5465        assert!(written.starts_with(b"RIFF"));
5466        assert_eq!(
5467            albums.get("root").unwrap().folder_webp,
5468            Some(ArtifactState {
5469                path: "creator/album/cover.webp".to_owned(),
5470                hash: "wh".to_owned(),
5471            })
5472        );
5473    }
5474
5475    #[test]
5476    fn folder_mp4_write_keeps_the_source_verbatim() {
5477        let mut manifest = Manifest::new();
5478        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
5479        let plan = Plan {
5480            actions: vec![Action::WriteArtifact {
5481                kind: ArtifactKind::FolderMp4,
5482                path: "creator/album/cover.mp4".to_owned(),
5483                source_url: "https://cdn.suno.ai/root/video.mp4".to_owned(),
5484                hash: "mh".to_owned(),
5485                owner_id: "root".to_owned(),
5486                content: None,
5487            }],
5488        };
5489        let http = ScriptedHttp::new().route("root/video.mp4", Reply::ok(b"mp4-bytes".to_vec()));
5490        let fs = MemFs::new();
5491
5492        let outcome = run_with_albums(
5493            &plan,
5494            &mut manifest,
5495            &mut albums,
5496            &[],
5497            &http,
5498            &fs,
5499            &StubFfmpeg::webp(),
5500            &RecordingClock::new(),
5501            &ExecOptions::default(),
5502        );
5503
5504        assert_eq!(outcome.artifacts_written, 1);
5505        assert_eq!(outcome.failed(), 0);
5506        // The raw MP4 is written byte-for-byte, never transcoded.
5507        assert_eq!(
5508            fs.read_file("creator/album/cover.mp4").unwrap(),
5509            b"mp4-bytes"
5510        );
5511        assert_eq!(
5512            albums.get("root").unwrap().folder_mp4,
5513            Some(ArtifactState {
5514                path: "creator/album/cover.mp4".to_owned(),
5515                hash: "mh".to_owned(),
5516            })
5517        );
5518    }
5519
5520    #[test]
5521    fn both_folder_covers_fetch_the_video_cover_once() {
5522        let mut manifest = Manifest::new();
5523        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
5524        // `both` retention keeps cover.webp (transcoded) and cover.mp4 (raw) from
5525        // the one video_cover_url. FolderWebp sorts first and caches the fetched
5526        // source; FolderMp4 drains it, so the source is fetched exactly once.
5527        let plan = Plan {
5528            actions: vec![
5529                Action::WriteArtifact {
5530                    kind: ArtifactKind::FolderWebp,
5531                    path: "creator/album/cover.webp".to_owned(),
5532                    source_url: "https://cdn.suno.ai/root/video.mp4".to_owned(),
5533                    hash: "wh".to_owned(),
5534                    owner_id: "root".to_owned(),
5535                    content: None,
5536                },
5537                Action::WriteArtifact {
5538                    kind: ArtifactKind::FolderMp4,
5539                    path: "creator/album/cover.mp4".to_owned(),
5540                    source_url: "https://cdn.suno.ai/root/video.mp4".to_owned(),
5541                    hash: "mh".to_owned(),
5542                    owner_id: "root".to_owned(),
5543                    content: None,
5544                },
5545            ],
5546        };
5547        let http = ScriptedHttp::new().route("root/video.mp4", Reply::ok(b"mp4-bytes".to_vec()));
5548        let fs = MemFs::new();
5549
5550        let outcome = run_with_albums(
5551            &plan,
5552            &mut manifest,
5553            &mut albums,
5554            &[],
5555            &http,
5556            &fs,
5557            &StubFfmpeg::webp(),
5558            &RecordingClock::new(),
5559            &ExecOptions::default(),
5560        );
5561
5562        assert_eq!(outcome.artifacts_written, 2);
5563        assert_eq!(outcome.failed(), 0);
5564        // Fetched exactly once despite two artifacts consuming it (#90 / #89).
5565        assert_eq!(http.count("root/video.mp4"), 1);
5566        // The webp is transcoded; the mp4 is the raw source verbatim.
5567        assert!(
5568            fs.read_file("creator/album/cover.webp")
5569                .unwrap()
5570                .starts_with(b"RIFF")
5571        );
5572        assert_eq!(
5573            fs.read_file("creator/album/cover.mp4").unwrap(),
5574            b"mp4-bytes"
5575        );
5576    }
5577
5578    #[test]
5579    fn folder_art_delete_clears_album_state() {
5580        let fs = MemFs::new().with_file("creator/album/folder.jpg", b"jpg".to_vec());
5581        let mut manifest = Manifest::new();
5582        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
5583        albums.insert(
5584            "root".to_owned(),
5585            AlbumArt {
5586                folder_jpg: Some(ArtifactState {
5587                    path: "creator/album/folder.jpg".to_owned(),
5588                    hash: "jh".to_owned(),
5589                }),
5590                folder_webp: None,
5591                folder_mp4: None,
5592            },
5593        );
5594        let plan = Plan {
5595            actions: vec![Action::DeleteArtifact {
5596                kind: ArtifactKind::FolderJpg,
5597                path: "creator/album/folder.jpg".to_owned(),
5598                owner_id: "root".to_owned(),
5599            }],
5600        };
5601
5602        let outcome = run_with_albums(
5603            &plan,
5604            &mut manifest,
5605            &mut albums,
5606            &[],
5607            &ScriptedHttp::new(),
5608            &fs,
5609            &StubFfmpeg::flac(),
5610            &RecordingClock::new(),
5611            &ExecOptions::default(),
5612        );
5613
5614        assert_eq!(outcome.artifacts_deleted, 1);
5615        assert!(!fs.exists("creator/album/folder.jpg"));
5616        // The album row had only the one kind, so it is pruned entirely.
5617        assert!(!albums.contains_key("root"));
5618    }
5619
5620    // ── Phase 9: playlist artifacts ─────────────────────────────────
5621
5622    #[test]
5623    fn playlist_write_uses_inline_content_and_records_state() {
5624        // A playlist body is generated, carried inline. With an empty manifest
5625        // and NO http routes, the write still succeeds — proving it skipped the
5626        // network — and records the playlist store keyed by the playlist id.
5627        let mut manifest = Manifest::new();
5628        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
5629        let mut playlists: BTreeMap<String, PlaylistState> = BTreeMap::new();
5630        let body = "#EXTM3U\n#PLAYLIST:Road Trip\n#EXTINF:60,One\nA/One.flac\n";
5631        let plan = Plan {
5632            actions: vec![Action::WriteArtifact {
5633                kind: ArtifactKind::Playlist,
5634                path: "Road Trip.m3u8".to_owned(),
5635                source_url: String::new(),
5636                hash: "ph1".to_owned(),
5637                owner_id: "pl1".to_owned(),
5638                content: Some(body.to_owned()),
5639            }],
5640        };
5641        let fs = MemFs::new();
5642
5643        let outcome = run_full(
5644            &plan,
5645            &mut manifest,
5646            &mut albums,
5647            &mut playlists,
5648            &[],
5649            &ScriptedHttp::new(),
5650            &fs,
5651            &StubFfmpeg::flac(),
5652            &RecordingClock::new(),
5653            &ExecOptions::default(),
5654        );
5655
5656        assert_eq!(outcome.artifacts_written, 1);
5657        assert_eq!(outcome.failed(), 0);
5658        // The exact inline bytes were written, verbatim.
5659        assert_eq!(fs.read_file("Road Trip.m3u8").unwrap(), body.as_bytes());
5660        assert_eq!(
5661            playlists.get("pl1"),
5662            Some(&PlaylistState {
5663                name: "Road Trip".to_owned(),
5664                path: "Road Trip.m3u8".to_owned(),
5665                hash: "ph1".to_owned(),
5666            })
5667        );
5668    }
5669
5670    #[test]
5671    fn playlist_delete_removes_file_and_clears_state() {
5672        let fs = MemFs::new().with_file("Old.m3u8", b"#EXTM3U\n".to_vec());
5673        let mut manifest = Manifest::new();
5674        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
5675        let mut playlists: BTreeMap<String, PlaylistState> = BTreeMap::new();
5676        playlists.insert(
5677            "pl1".to_owned(),
5678            PlaylistState {
5679                name: "Old".to_owned(),
5680                path: "Old.m3u8".to_owned(),
5681                hash: "ph1".to_owned(),
5682            },
5683        );
5684        let plan = Plan {
5685            actions: vec![Action::DeleteArtifact {
5686                kind: ArtifactKind::Playlist,
5687                path: "Old.m3u8".to_owned(),
5688                owner_id: "pl1".to_owned(),
5689            }],
5690        };
5691
5692        let outcome = run_full(
5693            &plan,
5694            &mut manifest,
5695            &mut albums,
5696            &mut playlists,
5697            &[],
5698            &ScriptedHttp::new(),
5699            &fs,
5700            &StubFfmpeg::flac(),
5701            &RecordingClock::new(),
5702            &ExecOptions::default(),
5703        );
5704
5705        assert_eq!(outcome.artifacts_deleted, 1);
5706        assert!(!fs.exists("Old.m3u8"));
5707        assert!(
5708            !playlists.contains_key("pl1"),
5709            "the playlist row is cleared on delete"
5710        );
5711    }
5712
5713    // ── Phase 10: old-sidecar cleanup on move + empty-dir prune ──────
5714
5715    #[test]
5716    fn rename_move_relocates_cover_and_prunes_old_album() {
5717        // A title/album change moves the audio (Rename) and re-emits the cover
5718        // at the NEW path. The old cover must be removed and the now-empty old
5719        // album directory pruned, leaving no orphan sidecar and no ghost dir.
5720        let mut manifest = Manifest::new();
5721        let mut e = entry("Creator/AlbumA/song.flac", AudioFormat::Flac);
5722        e.cover_jpg = Some(ArtifactState {
5723            path: "Creator/AlbumA/cover.jpg".to_owned(),
5724            hash: "h1".to_owned(),
5725        });
5726        manifest.insert("a", e);
5727        let fs = MemFs::new()
5728            .with_file("Creator/AlbumA/song.flac", b"AUDIO".to_vec())
5729            .with_file("Creator/AlbumA/cover.jpg", b"old-jpg".to_vec());
5730        let plan = Plan {
5731            actions: vec![
5732                Action::Rename {
5733                    from: "Creator/AlbumA/song.flac".to_owned(),
5734                    to: "Creator/AlbumB/song.flac".to_owned(),
5735                },
5736                Action::WriteArtifact {
5737                    kind: ArtifactKind::CoverJpg,
5738                    path: "Creator/AlbumB/cover.jpg".to_owned(),
5739                    source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
5740                    hash: "h1".to_owned(),
5741                    owner_id: "a".to_owned(),
5742                    content: None,
5743                },
5744            ],
5745        };
5746        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"new-jpg".to_vec()));
5747
5748        let outcome = run(
5749            &plan,
5750            &mut manifest,
5751            &[],
5752            &http,
5753            &fs,
5754            &StubFfmpeg::flac(),
5755            &RecordingClock::new(),
5756            &ExecOptions::default(),
5757        );
5758
5759        assert_eq!(outcome.failed(), 0);
5760        // Audio moved, the new cover was written, the old cover removed.
5761        assert!(fs.exists("Creator/AlbumB/song.flac"));
5762        assert_eq!(
5763            fs.read_file("Creator/AlbumB/cover.jpg").unwrap(),
5764            b"new-jpg"
5765        );
5766        assert!(!fs.exists("Creator/AlbumA/cover.jpg"));
5767        assert!(!fs.exists("Creator/AlbumA/song.flac"));
5768        // The manifest cover slot now points at the new path.
5769        assert_eq!(
5770            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
5771            "Creator/AlbumB/cover.jpg"
5772        );
5773        // The emptied old album directory is pruned; the new one survives.
5774        assert!(!fs.has_dir("Creator/AlbumA"));
5775        assert!(fs.has_dir("Creator/AlbumB"));
5776    }
5777
5778    #[test]
5779    fn rename_move_relocates_folder_art_and_prunes_old_album() {
5780        // An album rename moves folder.jpg: the old file is removed, the album
5781        // store slot advanced to the new path, and the emptied dir pruned.
5782        let mut manifest = Manifest::new();
5783        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
5784        albums.insert(
5785            "root".to_owned(),
5786            AlbumArt {
5787                folder_jpg: Some(ArtifactState {
5788                    path: "Creator/AlbumA/folder.jpg".to_owned(),
5789                    hash: "jh".to_owned(),
5790                }),
5791                folder_webp: None,
5792                folder_mp4: None,
5793            },
5794        );
5795        let fs = MemFs::new().with_file("Creator/AlbumA/folder.jpg", b"old-folder".to_vec());
5796        let plan = Plan {
5797            actions: vec![Action::WriteArtifact {
5798                kind: ArtifactKind::FolderJpg,
5799                path: "Creator/AlbumB/folder.jpg".to_owned(),
5800                source_url: "https://art.suno.ai/root/large.jpg".to_owned(),
5801                hash: "jh".to_owned(),
5802                owner_id: "root".to_owned(),
5803                content: None,
5804            }],
5805        };
5806        let http = ScriptedHttp::new().route("root/large.jpg", Reply::ok(b"new-folder".to_vec()));
5807
5808        let outcome = run_with_albums(
5809            &plan,
5810            &mut manifest,
5811            &mut albums,
5812            &[],
5813            &http,
5814            &fs,
5815            &StubFfmpeg::flac(),
5816            &RecordingClock::new(),
5817            &ExecOptions::default(),
5818        );
5819
5820        assert_eq!(outcome.failed(), 0);
5821        assert_eq!(
5822            fs.read_file("Creator/AlbumB/folder.jpg").unwrap(),
5823            b"new-folder"
5824        );
5825        assert!(!fs.exists("Creator/AlbumA/folder.jpg"));
5826        assert_eq!(
5827            albums
5828                .get("root")
5829                .unwrap()
5830                .folder_jpg
5831                .as_ref()
5832                .unwrap()
5833                .path,
5834            "Creator/AlbumB/folder.jpg"
5835        );
5836        assert!(!fs.has_dir("Creator/AlbumA"));
5837        assert!(fs.has_dir("Creator/AlbumB"));
5838    }
5839
5840    #[test]
5841    fn prune_empty_dirs_removes_only_empty_dirs() {
5842        // A direct exercise of the prune port's safety guarantees on a mixed
5843        // tree: nested empties go, anything holding a file (hidden ones too)
5844        // stays, and no file is touched.
5845        let fs = MemFs::new()
5846            .with_file("keep/full/song.flac", b"x".to_vec())
5847            .with_file("hidden/.suno-manifest.json", b"{}".to_vec())
5848            .with_dir("empty/leaf")
5849            .with_dir("nested/a/b/c");
5850
5851        fs.prune_empty_dirs("").unwrap();
5852
5853        // Every empty directory, however deeply nested, is pruned bottom-up.
5854        for gone in [
5855            "empty",
5856            "empty/leaf",
5857            "nested",
5858            "nested/a",
5859            "nested/a/b",
5860            "nested/a/b/c",
5861        ] {
5862            assert!(!fs.has_dir(gone), "empty dir {gone} should be pruned");
5863        }
5864        // A directory holding any file — including only a hidden dotfile — stays.
5865        assert!(fs.has_dir("keep"));
5866        assert!(fs.has_dir("keep/full"));
5867        assert!(fs.has_dir("hidden"));
5868        // No file was touched.
5869        assert!(fs.exists("keep/full/song.flac"));
5870        assert!(fs.exists("hidden/.suno-manifest.json"));
5871    }
5872
5873    #[test]
5874    fn prune_empty_dirs_never_removes_the_named_root() {
5875        // Pruning under a named root clears its empty children but keeps the
5876        // root itself, even when the root is now empty.
5877        let fs = MemFs::new().with_dir("empty/leaf");
5878        fs.prune_empty_dirs("empty").unwrap();
5879        assert!(fs.has_dir("empty"), "the named root is never removed");
5880        assert!(!fs.has_dir("empty/leaf"));
5881    }
5882
5883    #[test]
5884    fn old_sidecar_remove_failure_is_per_clip_and_converges_next_run() {
5885        // If removing the old sidecar fails, the write is a per-clip failure
5886        // that never aborts the run and does NOT advance the state slot, so the
5887        // next identical run re-attempts the cleanup and the tree converges.
5888        let mut manifest = Manifest::new();
5889        let mut e = entry("a.flac", AudioFormat::Flac);
5890        e.cover_jpg = Some(ArtifactState {
5891            path: "AlbumA/cover.jpg".to_owned(),
5892            hash: "h1".to_owned(),
5893        });
5894        manifest.insert("a", e);
5895        let fs = MemFs::new()
5896            .with_file("a.flac", b"AUDIO".to_vec())
5897            .with_file("AlbumA/cover.jpg", b"old".to_vec());
5898        let plan = Plan {
5899            actions: vec![Action::WriteArtifact {
5900                kind: ArtifactKind::CoverJpg,
5901                path: "AlbumB/cover.jpg".to_owned(),
5902                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
5903                hash: "h1".to_owned(),
5904                owner_id: "a".to_owned(),
5905                content: None,
5906            }],
5907        };
5908        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"new".to_vec()));
5909
5910        // Run 1: the old-cover remove is forced to fail.
5911        fs.arm_fail_remove("AlbumA/cover.jpg");
5912        let first = run(
5913            &plan,
5914            &mut manifest,
5915            &[],
5916            &http,
5917            &fs,
5918            &StubFfmpeg::flac(),
5919            &RecordingClock::new(),
5920            &ExecOptions::default(),
5921        );
5922        assert_eq!(
5923            first.status,
5924            RunStatus::Completed,
5925            "a remove failure never aborts the run"
5926        );
5927        assert_eq!(first.failed(), 1);
5928        // The new cover is written but the old one lingers and the slot is stale.
5929        assert!(fs.exists("AlbumB/cover.jpg"));
5930        assert!(fs.exists("AlbumA/cover.jpg"));
5931        assert_eq!(
5932            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
5933            "AlbumA/cover.jpg"
5934        );
5935        assert!(fs.has_dir("AlbumA"), "the orphan keeps its directory alive");
5936
5937        // Run 2: the same plan re-runs with the fault cleared and converges.
5938        fs.disarm_fail_remove("AlbumA/cover.jpg");
5939        let second = run(
5940            &plan,
5941            &mut manifest,
5942            &[],
5943            &http,
5944            &fs,
5945            &StubFfmpeg::flac(),
5946            &RecordingClock::new(),
5947            &ExecOptions::default(),
5948        );
5949        assert_eq!(second.failed(), 0);
5950        assert!(fs.exists("AlbumB/cover.jpg"));
5951        assert!(!fs.exists("AlbumA/cover.jpg"), "no orphan persists");
5952        assert_eq!(
5953            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
5954            "AlbumB/cover.jpg"
5955        );
5956        assert!(!fs.has_dir("AlbumA"), "the emptied directory is pruned");
5957    }
5958
5959    #[test]
5960    fn same_path_artifact_rewrite_does_no_remove_and_prunes_nothing() {
5961        // The idempotent case: a content-only cover rewrite (hash drift, path
5962        // unchanged) attempts no remove and prunes no live directory. A remove
5963        // failure is armed on the cover path, so any spurious remove would
5964        // surface as a failure — none does.
5965        let mut manifest = Manifest::new();
5966        let mut e = entry("Album/a.mp3", AudioFormat::Mp3);
5967        e.cover_jpg = Some(ArtifactState {
5968            path: "Album/cover.jpg".to_owned(),
5969            hash: "h1".to_owned(),
5970        });
5971        manifest.insert("a", e);
5972        let fs = MemFs::new()
5973            .with_file("Album/a.mp3", b"AUDIO".to_vec())
5974            .with_file("Album/cover.jpg", b"old".to_vec());
5975        fs.arm_fail_remove("Album/cover.jpg");
5976        let plan = Plan {
5977            actions: vec![Action::WriteArtifact {
5978                kind: ArtifactKind::CoverJpg,
5979                path: "Album/cover.jpg".to_owned(),
5980                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
5981                hash: "h2".to_owned(),
5982                owner_id: "a".to_owned(),
5983                content: None,
5984            }],
5985        };
5986        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"new".to_vec()));
5987
5988        let outcome = run(
5989            &plan,
5990            &mut manifest,
5991            &[],
5992            &http,
5993            &fs,
5994            &StubFfmpeg::flac(),
5995            &RecordingClock::new(),
5996            &ExecOptions::default(),
5997        );
5998
5999        assert_eq!(
6000            outcome.failed(),
6001            0,
6002            "no remove is attempted, so the armed failure never fires"
6003        );
6004        assert_eq!(outcome.artifacts_written, 1);
6005        assert_eq!(fs.read_file("Album/cover.jpg").unwrap(), b"new");
6006        assert_eq!(
6007            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().hash,
6008            "h2"
6009        );
6010        // The live directory is untouched by prune.
6011        assert!(fs.has_dir("Album"));
6012    }
6013
6014    // ── Concurrency (issue #22) ─────────────────────────────────────
6015
6016    mod concurrency {
6017        use super::*;
6018        use crate::ffmpeg::FfmpegError;
6019        use crate::fs::{FileStat, FsError};
6020        use crate::http::{HttpRequest, TransportError};
6021        use std::future::Future;
6022        use std::pin::Pin;
6023        use std::sync::Arc;
6024        use std::sync::atomic::{AtomicUsize, Ordering};
6025        use std::task::{Context, Poll};
6026
6027        /// A future that pends exactly once before resolving, waking itself so a
6028        /// single-threaded executor re-polls. It forces the [`Http`] port to
6029        /// yield, so [`buffer_unordered`](futures_util::stream::StreamExt) parks
6030        /// each in-flight request and the true overlap becomes observable.
6031        #[derive(Default)]
6032        struct YieldOnce {
6033            yielded: bool,
6034        }
6035
6036        impl Future for YieldOnce {
6037            type Output = ();
6038            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
6039                if self.yielded {
6040                    Poll::Ready(())
6041                } else {
6042                    self.yielded = true;
6043                    cx.waker().wake_by_ref();
6044                    Poll::Pending
6045                }
6046            }
6047        }
6048
6049        /// An [`Http`] double that wraps [`ScriptedHttp`] and records the peak
6050        /// number of concurrently in-flight requests. Each `send` bumps a live
6051        /// counter, yields once (so peers can start), then delegates.
6052        struct GatedHttp {
6053            inner: ScriptedHttp,
6054            inflight: Arc<AtomicUsize>,
6055            peak: Arc<AtomicUsize>,
6056        }
6057
6058        impl GatedHttp {
6059            fn new(inner: ScriptedHttp) -> Self {
6060                Self {
6061                    inner,
6062                    inflight: Arc::new(AtomicUsize::new(0)),
6063                    peak: Arc::new(AtomicUsize::new(0)),
6064                }
6065            }
6066
6067            fn peak(&self) -> usize {
6068                self.peak.load(Ordering::SeqCst)
6069            }
6070
6071            fn count(&self, needle: &str) -> usize {
6072                self.inner.count(needle)
6073            }
6074        }
6075
6076        impl Http for GatedHttp {
6077            async fn send(&self, request: HttpRequest) -> Result<HttpResponse, TransportError> {
6078                let now = self.inflight.fetch_add(1, Ordering::SeqCst) + 1;
6079                self.peak.fetch_max(now, Ordering::SeqCst);
6080                YieldOnce::default().await;
6081                let out = self.inner.send(request).await;
6082                self.inflight.fetch_sub(1, Ordering::SeqCst);
6083                out
6084            }
6085        }
6086
6087        fn download(id: &str, format: AudioFormat) -> (Clip, Desired, Action) {
6088            let c = clip(id);
6089            let d = desired(c.clone(), format);
6090            let action = Action::Download {
6091                clip: c.clone(),
6092                lineage: LineageContext::own_root(&c),
6093                path: d.path.clone(),
6094                format,
6095            };
6096            (c, d, action)
6097        }
6098
6099        fn opts_with(concurrency: u32) -> ExecOptions {
6100            ExecOptions {
6101                concurrency,
6102                ..small_poll()
6103            }
6104        }
6105
6106        #[test]
6107        fn concurrency_never_exceeds_the_configured_bound() {
6108            let count = 6;
6109            let concurrency = 3;
6110            let mut scripted = ScriptedHttp::new().with_auth();
6111            let mut actions = Vec::new();
6112            let mut desireds = Vec::new();
6113            for i in 0..count {
6114                let id = format!("c{i}");
6115                scripted = scripted.route(&format!("{id}.mp3"), Reply::ok(b"mp3-body".to_vec()));
6116                let (_c, d, action) = download(&id, AudioFormat::Mp3);
6117                actions.push(action);
6118                desireds.push(d);
6119            }
6120            let http = GatedHttp::new(scripted);
6121            let fs = MemFs::new();
6122            let plan = Plan { actions };
6123            let mut manifest = Manifest::new();
6124
6125            let outcome = run_gated_fs(
6126                &plan,
6127                &mut manifest,
6128                &desireds,
6129                &http,
6130                &fs,
6131                &opts_with(concurrency),
6132            );
6133
6134            assert_eq!(outcome.downloaded, count);
6135            assert!(
6136                http.peak() <= concurrency as usize,
6137                "peak {} exceeded the bound {concurrency}",
6138                http.peak()
6139            );
6140            assert_eq!(
6141                http.peak(),
6142                concurrency as usize,
6143                "expected the run to saturate the bound"
6144            );
6145        }
6146
6147        /// Run a gated plan against a caller-supplied [`MemFs`], returning the
6148        /// outcome. The client is built here so the limiter can be inspected by
6149        /// the caller-facing variant below.
6150        fn run_gated_fs(
6151            plan: &Plan,
6152            manifest: &mut Manifest,
6153            desired: &[Desired],
6154            http: &GatedHttp,
6155            fs: &MemFs,
6156            opts: &ExecOptions,
6157        ) -> ExecOutcome {
6158            let ffmpeg = StubFfmpeg::flac();
6159            let clock = RecordingClock::new();
6160            let mut albums = BTreeMap::new();
6161            let mut playlists = BTreeMap::new();
6162            let client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
6163            pollster::block_on(execute(
6164                plan,
6165                manifest,
6166                &mut albums,
6167                &mut playlists,
6168                desired,
6169                &HashMap::new(),
6170                Ports {
6171                    client: &client,
6172                    http,
6173                    fs,
6174                    ffmpeg: &ffmpeg,
6175                    clock: &clock,
6176                },
6177                opts,
6178            ))
6179        }
6180
6181        #[test]
6182        fn a_failing_clip_does_not_abort_the_others() {
6183            let mut scripted = ScriptedHttp::new().with_auth();
6184            scripted = scripted
6185                .route("ok1.mp3", Reply::ok(b"one".to_vec()))
6186                .route("bad.mp3", Reply::status(404))
6187                .route("ok2.mp3", Reply::ok(b"two".to_vec()));
6188            let (_a, d1, a1) = download("ok1", AudioFormat::Mp3);
6189            let (_b, d2, a2) = download("bad", AudioFormat::Mp3);
6190            let (_c, d3, a3) = download("ok2", AudioFormat::Mp3);
6191            let http = GatedHttp::new(scripted);
6192            let fs = MemFs::new();
6193            let plan = Plan {
6194                actions: vec![a1, a2, a3],
6195            };
6196            let mut manifest = Manifest::new();
6197
6198            let outcome = run_gated_fs(
6199                &plan,
6200                &mut manifest,
6201                &[d1, d2, d3],
6202                &http,
6203                &fs,
6204                &opts_with(3),
6205            );
6206
6207            assert_eq!(outcome.downloaded, 2);
6208            assert_eq!(outcome.failed(), 1);
6209            assert_eq!(outcome.status, RunStatus::Completed);
6210            assert_eq!(outcome.failures[0].clip_id, "bad");
6211            assert!(manifest.get("ok1").is_some());
6212            assert!(manifest.get("ok2").is_some());
6213            assert!(manifest.get("bad").is_none());
6214        }
6215
6216        #[test]
6217        fn outcome_is_identical_across_concurrency_levels() {
6218            // A plan mixing successful and failing downloads with serial phase-2
6219            // actions (a skip and a delete), so both phases contribute.
6220            fn build() -> (Plan, Vec<Desired>) {
6221                let mut actions = Vec::new();
6222                let mut desireds = Vec::new();
6223                for id in ["a", "b", "c", "d"] {
6224                    let (_c, d, action) = download(id, AudioFormat::Mp3);
6225                    actions.push(action);
6226                    desireds.push(d);
6227                }
6228                // A failing download in the middle of the audio set.
6229                let (_e, de, ae) = download("fail", AudioFormat::Mp3);
6230                actions.insert(2, ae);
6231                desireds.push(de);
6232                // Phase-2 actions.
6233                actions.push(Action::Skip {
6234                    clip_id: "gone".to_owned(),
6235                });
6236                actions.push(Action::Delete {
6237                    path: "old.mp3".to_owned(),
6238                    clip_id: "old".to_owned(),
6239                });
6240                (Plan { actions }, desireds)
6241            }
6242
6243            fn http() -> ScriptedHttp {
6244                ScriptedHttp::new()
6245                    .with_auth()
6246                    .route("a.mp3", Reply::ok(b"a".to_vec()))
6247                    .route("b.mp3", Reply::ok(b"b".to_vec()))
6248                    .route("c.mp3", Reply::ok(b"c".to_vec()))
6249                    .route("d.mp3", Reply::ok(b"d".to_vec()))
6250                    .route("fail.mp3", Reply::status(404))
6251            }
6252
6253            fn seed_manifest() -> Manifest {
6254                let mut m = Manifest::new();
6255                m.insert("old".to_owned(), entry("old.mp3", AudioFormat::Mp3));
6256                m
6257            }
6258
6259            let (plan, desireds) = build();
6260
6261            let mut m1 = seed_manifest();
6262            let fs1 = MemFs::new().with_file("old.mp3", b"x".to_vec());
6263            let out1 = run_gated_fs(
6264                &plan,
6265                &mut m1,
6266                &desireds,
6267                &GatedHttp::new(http()),
6268                &fs1,
6269                &opts_with(1),
6270            );
6271
6272            let mut m8 = seed_manifest();
6273            let fs8 = MemFs::new().with_file("old.mp3", b"x".to_vec());
6274            let out8 = run_gated_fs(
6275                &plan,
6276                &mut m8,
6277                &desireds,
6278                &GatedHttp::new(http()),
6279                &fs8,
6280                &opts_with(8),
6281            );
6282
6283            assert_eq!(out1, out8, "outcome must not depend on concurrency");
6284            assert_eq!(m1, m8, "final manifest must not depend on concurrency");
6285            assert_eq!(out8.downloaded, 4);
6286            assert_eq!(out8.deleted, 1);
6287            assert_eq!(out8.skipped, 1);
6288            assert_eq!(out8.failed(), 1);
6289        }
6290
6291        #[test]
6292        fn a_systemic_disk_full_aborts_promptly() {
6293            let count = 8;
6294            let concurrency = 2;
6295            let mut scripted = ScriptedHttp::new().with_auth();
6296            let mut actions = Vec::new();
6297            let mut desireds = Vec::new();
6298            for i in 0..count {
6299                let id = format!("d{i}");
6300                scripted = scripted.route(&format!("{id}.mp3"), Reply::ok(b"mp3-body".to_vec()));
6301                let (_c, d, action) = download(&id, AudioFormat::Mp3);
6302                actions.push(action);
6303                desireds.push(d);
6304            }
6305            // The very first clip's write hits ENOSPC, a systemic failure.
6306            let fs = MemFs::new().fail_write_out_of_space("d0.mp3");
6307            let http = GatedHttp::new(scripted);
6308            let plan = Plan { actions };
6309            let mut manifest = Manifest::new();
6310
6311            let outcome = run_gated_fs(
6312                &plan,
6313                &mut manifest,
6314                &desireds,
6315                &http,
6316                &fs,
6317                &opts_with(concurrency),
6318            );
6319
6320            assert_eq!(outcome.status, RunStatus::DiskFull);
6321            assert!(
6322                outcome.downloaded < count,
6323                "a systemic abort must stop remaining work, downloaded {}",
6324                outcome.downloaded
6325            );
6326        }
6327
6328        #[test]
6329        fn limiter_records_a_rate_limit_under_concurrent_calls() {
6330            // Three concurrent FLAC renders; exactly one clip is throttled once
6331            // on its wav_file read. The shared limiter must record that single
6332            // 429 (halving 2.0 -> 1.0) with no lost or duplicated update, proving
6333            // the mutex keeps the AIMD state correct under concurrency.
6334            let scripted = ScriptedHttp::new()
6335                .with_auth()
6336                .route_seq(
6337                    "/gen/x/wav_file/",
6338                    vec![
6339                        Reply::status(429),
6340                        Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/x.wav"}"#),
6341                    ],
6342                )
6343                .route(
6344                    "/gen/y/wav_file/",
6345                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/y.wav"}"#),
6346                )
6347                .route(
6348                    "/gen/z/wav_file/",
6349                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/z.wav"}"#),
6350                )
6351                .route("x.wav", Reply::ok(b"wav-x".to_vec()))
6352                .route("y.wav", Reply::ok(b"wav-y".to_vec()))
6353                .route("z.wav", Reply::ok(b"wav-z".to_vec()));
6354
6355            let mut actions = Vec::new();
6356            let mut desireds = Vec::new();
6357            for id in ["x", "y", "z"] {
6358                let (_c, d, action) = download(id, AudioFormat::Flac);
6359                actions.push(action);
6360                desireds.push(d);
6361            }
6362            let plan = Plan { actions };
6363            let fs = MemFs::new();
6364            let ffmpeg = StubFfmpeg::flac();
6365            let clock = RecordingClock::new();
6366            let mut albums = BTreeMap::new();
6367            let mut playlists = BTreeMap::new();
6368            let mut manifest = Manifest::new();
6369            let client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
6370
6371            let outcome = pollster::block_on(execute(
6372                &plan,
6373                &mut manifest,
6374                &mut albums,
6375                &mut playlists,
6376                &desireds,
6377                &HashMap::new(),
6378                Ports {
6379                    client: &client,
6380                    http: &scripted,
6381                    fs: &fs,
6382                    ffmpeg: &ffmpeg,
6383                    clock: &clock,
6384                },
6385                &opts_with(3),
6386            ));
6387
6388            assert_eq!(outcome.downloaded, 3);
6389            assert_eq!(outcome.failed(), 0);
6390            assert!(
6391                (client.limiter_rate() - 1.0).abs() < 1e-9,
6392                "one 429 must halve the rate to 1.0, got {}",
6393                client.limiter_rate()
6394            );
6395        }
6396
6397        #[test]
6398        fn a_download_is_committed_in_plan_order_around_a_rename() {
6399            // Plan order: rename "orig" away from shared.mp3 first, then download
6400            // a new clip into shared.mp3. A parallel executor that performed the
6401            // download's destination write off plan order would write shared.mp3
6402            // before the rename ran, letting the rename carry those fresh bytes
6403            // to moved.mp3 and stranding shared.mp3 - corrupting both clips.
6404            // Committing every destination effect serially in plan order keeps
6405            // moved.mp3 = the original and shared.mp3 = the new download.
6406            let c_new = clip("new");
6407            let mut d_new = desired(c_new.clone(), AudioFormat::Mp3);
6408            d_new.path = "shared.mp3".to_owned();
6409            let plan = Plan {
6410                actions: vec![
6411                    Action::Rename {
6412                        from: "shared.mp3".to_owned(),
6413                        to: "moved.mp3".to_owned(),
6414                    },
6415                    Action::Download {
6416                        clip: c_new.clone(),
6417                        lineage: LineageContext::own_root(&c_new),
6418                        path: "shared.mp3".to_owned(),
6419                        format: AudioFormat::Mp3,
6420                    },
6421                ],
6422            };
6423            let scripted = ScriptedHttp::new()
6424                .with_auth()
6425                .route("new.mp3", Reply::ok(b"NEW-BODY".to_vec()));
6426            let http = GatedHttp::new(scripted);
6427            let fs = MemFs::new().with_file("shared.mp3", b"ORIGINAL".to_vec());
6428            let mut manifest = Manifest::new();
6429            manifest.insert("orig", entry("shared.mp3", AudioFormat::Mp3));
6430
6431            let outcome = run_gated_fs(&plan, &mut manifest, &[d_new], &http, &fs, &opts_with(4));
6432
6433            assert_eq!(outcome.renamed, 1);
6434            assert_eq!(outcome.downloaded, 1);
6435            assert_eq!(
6436                fs.read_file("moved.mp3").as_deref(),
6437                Some(&b"ORIGINAL"[..]),
6438                "the rename must carry the original bytes, untouched by the download"
6439            );
6440            let landed = fs.read_file("shared.mp3").expect("new download must land");
6441            assert_ne!(
6442                landed, b"ORIGINAL",
6443                "the new download must replace the moved original, not corrupt it"
6444            );
6445            assert_eq!(manifest.get("orig").unwrap().path, "moved.mp3");
6446            assert_eq!(manifest.get("new").unwrap().path, "shared.mp3");
6447        }
6448
6449        #[test]
6450        fn an_aborted_reformat_leaves_the_old_file_and_manifest_consistent() {
6451            // A systemic disk-full abort strikes the download committed before the
6452            // reformat. Because the reformat's slow render is side-effect-free and
6453            // its destination write + old-file removal only happen in the serial
6454            // commit (which the abort skips), the old file survives and the
6455            // manifest still points at it: no removed-but-referenced file.
6456            let boom = clip("boom");
6457            let mut d_boom = desired(boom.clone(), AudioFormat::Mp3);
6458            d_boom.path = "boom.mp3".to_owned();
6459            let reformer = clip("r");
6460            let d_reformer = desired(reformer.clone(), AudioFormat::Mp3);
6461            let plan = Plan {
6462                actions: vec![
6463                    Action::Download {
6464                        clip: boom.clone(),
6465                        lineage: LineageContext::own_root(&boom),
6466                        path: "boom.mp3".to_owned(),
6467                        format: AudioFormat::Mp3,
6468                    },
6469                    Action::Reformat {
6470                        clip: reformer.clone(),
6471                        path: "r_new.mp3".to_owned(),
6472                        from_path: "r_old.flac".to_owned(),
6473                        from: AudioFormat::Flac,
6474                        to: AudioFormat::Mp3,
6475                    },
6476                ],
6477            };
6478            let scripted = ScriptedHttp::new()
6479                .with_auth()
6480                .route("boom.mp3", Reply::ok(b"boom-body".to_vec()))
6481                .route("r.mp3", Reply::ok(b"reformatted".to_vec()));
6482            let http = GatedHttp::new(scripted);
6483            // The download's write hits ENOSPC, a systemic abort.
6484            let fs = MemFs::new()
6485                .with_file("r_old.flac", b"OLD-FLAC".to_vec())
6486                .fail_write_out_of_space("boom.mp3");
6487            let mut manifest = Manifest::new();
6488            manifest.insert("r", entry("r_old.flac", AudioFormat::Flac));
6489
6490            let outcome = run_gated_fs(
6491                &plan,
6492                &mut manifest,
6493                &[d_boom, d_reformer],
6494                &http,
6495                &fs,
6496                &opts_with(4),
6497            );
6498
6499            assert_eq!(outcome.status, RunStatus::DiskFull);
6500            assert!(
6501                fs.exists("r_old.flac"),
6502                "the old file must survive the abort"
6503            );
6504            assert!(
6505                !fs.exists("r_new.mp3"),
6506                "no reformatted file may be written"
6507            );
6508            let still = manifest.get("r").expect("the manifest must still track r");
6509            assert_eq!(
6510                still.path, "r_old.flac",
6511                "the manifest must still point at the surviving old file"
6512            );
6513            assert_eq!(still.format, AudioFormat::Flac);
6514        }
6515
6516        #[test]
6517        fn a_systemic_abort_leaves_no_untracked_destination_files() {
6518            // Two clips commit, the third's write hits ENOSPC (a systemic abort),
6519            // and the rest never commit. Every file remaining on disk must be one
6520            // the manifest tracks: producers write nothing, so an abort cannot
6521            // strand an untracked file from an in-flight or buffered render.
6522            let mut scripted = ScriptedHttp::new().with_auth();
6523            let mut actions = Vec::new();
6524            let mut desireds = Vec::new();
6525            for id in ["a0", "a1", "boom", "a3", "a4"] {
6526                scripted = scripted.route(&format!("{id}.mp3"), Reply::ok(b"body".to_vec()));
6527                let (_c, d, action) = download(id, AudioFormat::Mp3);
6528                actions.push(action);
6529                desireds.push(d);
6530            }
6531            let http = GatedHttp::new(scripted);
6532            let fs = MemFs::new().fail_write_out_of_space("boom.mp3");
6533            let plan = Plan { actions };
6534            let mut manifest = Manifest::new();
6535
6536            let outcome = run_gated_fs(&plan, &mut manifest, &desireds, &http, &fs, &opts_with(2));
6537
6538            assert_eq!(outcome.status, RunStatus::DiskFull);
6539            let tracked: std::collections::BTreeSet<String> = manifest
6540                .entries
6541                .values()
6542                .map(|entry| entry.path.clone())
6543                .collect();
6544            for path in fs.paths() {
6545                assert!(
6546                    tracked.contains(&path),
6547                    "found an untracked destination file: {path}"
6548                );
6549            }
6550            assert!(
6551                !fs.exists("a3.mp3"),
6552                "uncommitted renders must not be on disk"
6553            );
6554            assert!(
6555                !fs.exists("a4.mp3"),
6556                "uncommitted renders must not be on disk"
6557            );
6558        }
6559
6560        /// An [`Ffmpeg`] double that counts how many rendered FLAC payloads are
6561        /// live: it bumps a shared counter (tracking the peak) when a transcode
6562        /// yields bytes, and [`CountingFs`] drops it back on the committing write.
6563        /// The [transcode, write] window is a superset of the true in-memory hold,
6564        /// so the observed peak upper-bounds the real one.
6565        struct CountingFfmpeg {
6566            inner: StubFfmpeg,
6567            held: Arc<AtomicUsize>,
6568            peak: Arc<AtomicUsize>,
6569        }
6570
6571        impl Ffmpeg for CountingFfmpeg {
6572            fn wav_to_flac(
6573                &self,
6574                wav: &[u8],
6575            ) -> impl Future<Output = Result<Vec<u8>, FfmpegError>> + Send {
6576                let fut = self.inner.wav_to_flac(wav);
6577                let held = self.held.clone();
6578                let peak = self.peak.clone();
6579                async move {
6580                    let out = fut.await;
6581                    if out.is_ok() {
6582                        let now = held.fetch_add(1, Ordering::SeqCst) + 1;
6583                        peak.fetch_max(now, Ordering::SeqCst);
6584                    }
6585                    out
6586                }
6587            }
6588
6589            fn mp4_to_webp(
6590                &self,
6591                mp4: &[u8],
6592                settings: WebpEncodeSettings,
6593            ) -> impl Future<Output = Result<Vec<u8>, FfmpegError>> + Send {
6594                self.inner.mp4_to_webp(mp4, settings)
6595            }
6596        }
6597
6598        /// A [`Filesystem`] double wrapping [`MemFs`] that decrements the live
6599        /// payload counter on each committing write, closing the window opened by
6600        /// [`CountingFfmpeg`].
6601        struct CountingFs {
6602            inner: MemFs,
6603            held: Arc<AtomicUsize>,
6604        }
6605
6606        impl Filesystem for CountingFs {
6607            fn write_atomic(&self, path: &str, bytes: &[u8]) -> Result<(), FsError> {
6608                let out = self.inner.write_atomic(path, bytes);
6609                self.held.fetch_sub(1, Ordering::SeqCst);
6610                out
6611            }
6612
6613            fn rename(&self, from: &str, to: &str) -> Result<(), FsError> {
6614                self.inner.rename(from, to)
6615            }
6616
6617            fn remove(&self, path: &str) -> Result<(), FsError> {
6618                self.inner.remove(path)
6619            }
6620
6621            fn prune_empty_dirs(&self, root: &str) -> Result<(), FsError> {
6622                self.inner.prune_empty_dirs(root)
6623            }
6624
6625            fn read(&self, path: &str) -> Result<Vec<u8>, FsError> {
6626                self.inner.read(path)
6627            }
6628
6629            fn metadata(&self, path: &str) -> Option<FileStat> {
6630                self.inner.metadata(path)
6631            }
6632        }
6633
6634        #[test]
6635        fn rendered_payloads_in_memory_stay_bounded_by_concurrency() {
6636            // Far more FLAC clips than the concurrency bound. The ordered buffered
6637            // render keeps at most about `concurrency` transcoded payloads live at
6638            // once (never the whole library), so peak held <= concurrency + 1.
6639            let count = 12;
6640            let concurrency = 3;
6641            let mut scripted = ScriptedHttp::new().with_auth();
6642            let mut actions = Vec::new();
6643            let mut desireds = Vec::new();
6644            for i in 0..count {
6645                let id = format!("f{i}");
6646                scripted = scripted
6647                    .route(
6648                        &format!("/gen/{id}/wav_file/"),
6649                        Reply::json(&format!(
6650                            r#"{{"wav_file_url": "https://cdn1.suno.ai/{id}.wav"}}"#
6651                        )),
6652                    )
6653                    .route(&format!("{id}.wav"), Reply::ok(b"wav-body".to_vec()));
6654                let (_c, d, action) = download(&id, AudioFormat::Flac);
6655                actions.push(action);
6656                desireds.push(d);
6657            }
6658            let http = GatedHttp::new(scripted);
6659            let held = Arc::new(AtomicUsize::new(0));
6660            let peak = Arc::new(AtomicUsize::new(0));
6661            let ffmpeg = CountingFfmpeg {
6662                inner: StubFfmpeg::flac(),
6663                held: held.clone(),
6664                peak: peak.clone(),
6665            };
6666            let fs = CountingFs {
6667                inner: MemFs::new(),
6668                held: held.clone(),
6669            };
6670            let clock = RecordingClock::new();
6671            let mut albums = BTreeMap::new();
6672            let mut playlists = BTreeMap::new();
6673            let mut manifest = Manifest::new();
6674            let client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
6675            let plan = Plan { actions };
6676
6677            let outcome = pollster::block_on(execute(
6678                &plan,
6679                &mut manifest,
6680                &mut albums,
6681                &mut playlists,
6682                &desireds,
6683                &HashMap::new(),
6684                Ports {
6685                    client: &client,
6686                    http: &http,
6687                    fs: &fs,
6688                    ffmpeg: &ffmpeg,
6689                    clock: &clock,
6690                },
6691                &opts_with(concurrency),
6692            ));
6693
6694            assert_eq!(outcome.downloaded, count as usize);
6695            assert_eq!(
6696                held.load(Ordering::SeqCst),
6697                0,
6698                "every payload must be committed"
6699            );
6700            assert!(
6701                peak.load(Ordering::SeqCst) <= concurrency as usize + 1,
6702                "peak live payloads {} exceeded the bound {}",
6703                peak.load(Ordering::SeqCst),
6704                concurrency + 1
6705            );
6706            assert!(
6707                peak.load(Ordering::SeqCst) >= 2,
6708                "the render should genuinely overlap, peak was {}",
6709                peak.load(Ordering::SeqCst)
6710            );
6711        }
6712
6713        #[test]
6714        fn artifact_fetches_run_concurrently() {
6715            // Four CoverJpg sidecars whose owning clips are already in the manifest.
6716            // With concurrency=2 the two HTTP fetches should overlap, so the peak
6717            // in-flight count must reach at least 2.
6718            let count = 4usize;
6719            let concurrency = 2u32;
6720            let mut scripted = ScriptedHttp::new().with_auth();
6721            let mut actions = Vec::new();
6722            let mut manifest = Manifest::new();
6723            for i in 0..count {
6724                let id = format!("a{i}");
6725                scripted = scripted.route(&format!("{id}.jpg"), Reply::ok(b"jpg-bytes".to_vec()));
6726                manifest.insert(&id, entry(&format!("{id}.mp3"), AudioFormat::Mp3));
6727                actions.push(Action::WriteArtifact {
6728                    kind: ArtifactKind::CoverJpg,
6729                    path: format!("{id}/cover.jpg"),
6730                    source_url: format!("https://art.suno.ai/{id}.jpg"),
6731                    hash: format!("h{i}"),
6732                    owner_id: id,
6733                    content: None,
6734                });
6735            }
6736            let http = GatedHttp::new(scripted);
6737            let fs = MemFs::new();
6738            let plan = Plan { actions };
6739
6740            let outcome = run_gated_fs(
6741                &plan,
6742                &mut manifest,
6743                &[],
6744                &http,
6745                &fs,
6746                &opts_with(concurrency),
6747            );
6748
6749            assert_eq!(outcome.artifacts_written, count);
6750            assert_eq!(outcome.failed(), 0);
6751            assert!(
6752                http.peak() >= concurrency as usize,
6753                "artifact fetches must overlap: peak {} < concurrency {}",
6754                http.peak(),
6755                concurrency,
6756            );
6757        }
6758
6759        #[test]
6760        fn stem_fetches_run_concurrently() {
6761            // Four Mp3 stem fetches whose owning clips are in the manifest.
6762            // With concurrency=2 the peak in-flight HTTP count must reach at least 2.
6763            let count = 4usize;
6764            let concurrency = 2u32;
6765            let mut scripted = ScriptedHttp::new().with_auth();
6766            let mut actions = Vec::new();
6767            let mut manifest = Manifest::new();
6768            for i in 0..count {
6769                let id = format!("s{i}");
6770                scripted =
6771                    scripted.route(&format!("{id}voc.mp3"), Reply::ok(b"stem-bytes".to_vec()));
6772                manifest.insert(&id, entry(&format!("{id}.mp3"), AudioFormat::Mp3));
6773                actions.push(Action::WriteStem {
6774                    clip_id: id.clone(),
6775                    key: "voc".to_owned(),
6776                    stem_id: format!("{id}voc"),
6777                    path: format!("{id}.stems/voc.mp3"),
6778                    source_url: format!("https://cdn1.suno.ai/{id}voc.mp3"),
6779                    format: StemFormat::Mp3,
6780                    hash: format!("h{i}"),
6781                });
6782            }
6783            let http = GatedHttp::new(scripted);
6784            let fs = MemFs::new();
6785            let plan = Plan { actions };
6786
6787            let outcome = run_gated_fs(
6788                &plan,
6789                &mut manifest,
6790                &[],
6791                &http,
6792                &fs,
6793                &opts_with(concurrency),
6794            );
6795
6796            assert_eq!(outcome.artifacts_written, count);
6797            assert_eq!(outcome.failed(), 0);
6798            assert!(
6799                http.peak() >= concurrency as usize,
6800                "stem fetches must overlap: peak {} < concurrency {}",
6801                http.peak(),
6802                concurrency,
6803            );
6804        }
6805
6806        #[test]
6807        fn prepareable_outcome_is_identical_across_concurrency_levels_with_artifacts_and_stems() {
6808            // A plan mixing downloads, artifact writes, and stem writes. Both a
6809            // failing clip and a serial-only action (delete) are included so all
6810            // code paths contribute. Outcome and final manifest must be the same
6811            // whether concurrency is 1 or 8, proving commits remain serial and
6812            // deterministic while preparation runs in parallel.
6813            fn build() -> (Plan, Vec<Desired>) {
6814                let mut actions = Vec::new();
6815                let mut desireds = Vec::new();
6816                for id in ["x", "y", "z"] {
6817                    let (_c, d, action) = download(id, AudioFormat::Mp3);
6818                    desireds.push(d);
6819                    actions.push(action);
6820                    // A CoverJpg sidecar for each clip.
6821                    actions.push(Action::WriteArtifact {
6822                        kind: ArtifactKind::CoverJpg,
6823                        path: format!("{id}/cover.jpg"),
6824                        source_url: format!("https://art.suno.ai/{id}.jpg"),
6825                        hash: format!("art-{id}"),
6826                        owner_id: id.to_owned(),
6827                        content: None,
6828                    });
6829                    // An Mp3 stem for each clip.
6830                    actions.push(Action::WriteStem {
6831                        clip_id: id.to_owned(),
6832                        key: "voc".to_owned(),
6833                        stem_id: format!("{id}voc"),
6834                        path: format!("{id}.stems/voc.mp3"),
6835                        source_url: format!("https://cdn1.suno.ai/{id}voc.mp3"),
6836                        format: StemFormat::Mp3,
6837                        hash: format!("stem-{id}"),
6838                    });
6839                }
6840                // A failing download in the middle.
6841                let (_f, df, af) = download("fail", AudioFormat::Mp3);
6842                desireds.push(df);
6843                actions.insert(3, af);
6844                // A serial-only delete.
6845                actions.push(Action::Delete {
6846                    path: "old.mp3".to_owned(),
6847                    clip_id: "old".to_owned(),
6848                });
6849                (Plan { actions }, desireds)
6850            }
6851
6852            fn http() -> ScriptedHttp {
6853                ScriptedHttp::new()
6854                    .with_auth()
6855                    .route("x.mp3", Reply::ok(b"x-audio".to_vec()))
6856                    .route("y.mp3", Reply::ok(b"y-audio".to_vec()))
6857                    .route("z.mp3", Reply::ok(b"z-audio".to_vec()))
6858                    .route("fail.mp3", Reply::status(404))
6859                    .route("x.jpg", Reply::ok(b"x-jpg".to_vec()))
6860                    .route("y.jpg", Reply::ok(b"y-jpg".to_vec()))
6861                    .route("z.jpg", Reply::ok(b"z-jpg".to_vec()))
6862                    .route("xvoc.mp3", Reply::ok(b"x-voc".to_vec()))
6863                    .route("yvoc.mp3", Reply::ok(b"y-voc".to_vec()))
6864                    .route("zvoc.mp3", Reply::ok(b"z-voc".to_vec()))
6865            }
6866
6867            fn seed_manifest() -> Manifest {
6868                let mut m = Manifest::new();
6869                m.insert("old".to_owned(), entry("old.mp3", AudioFormat::Mp3));
6870                m
6871            }
6872
6873            let (plan, desireds) = build();
6874
6875            let mut m1 = seed_manifest();
6876            let fs1 = MemFs::new().with_file("old.mp3", b"x".to_vec());
6877            let out1 = run_gated_fs(
6878                &plan,
6879                &mut m1,
6880                &desireds,
6881                &GatedHttp::new(http()),
6882                &fs1,
6883                &opts_with(1),
6884            );
6885
6886            let mut m8 = seed_manifest();
6887            let fs8 = MemFs::new().with_file("old.mp3", b"x".to_vec());
6888            let out8 = run_gated_fs(
6889                &plan,
6890                &mut m8,
6891                &desireds,
6892                &GatedHttp::new(http()),
6893                &fs8,
6894                &opts_with(8),
6895            );
6896
6897            assert_eq!(out1, out8, "outcome must not depend on concurrency");
6898            assert_eq!(m1, m8, "final manifest must not depend on concurrency");
6899            assert_eq!(out8.downloaded, 3);
6900            assert_eq!(out8.deleted, 1);
6901            assert_eq!(out8.failed(), 1);
6902            // Covers and stems for the 3 successful clips.
6903            assert_eq!(out8.artifacts_written, 6);
6904        }
6905
6906        #[test]
6907        fn both_folder_covers_fetch_video_cover_once_under_concurrency() {
6908            // FolderWebp and FolderMp4 share a source_url (the `both` retention).
6909            // Even with other downloads running concurrently, they must stay serial
6910            // so the first fetch inserts into cover_cache and the second drains it
6911            // (#90), fetching the video_cover_url exactly once.
6912            let scripted = ScriptedHttp::new()
6913                .with_auth()
6914                .route("root/video.mp4", Reply::ok(b"mp4-bytes".to_vec()))
6915                .route("d0.mp3", Reply::ok(b"audio".to_vec()))
6916                .route("d1.mp3", Reply::ok(b"audio".to_vec()));
6917            let mut actions = vec![
6918                Action::WriteArtifact {
6919                    kind: ArtifactKind::FolderWebp,
6920                    path: "album/cover.webp".to_owned(),
6921                    source_url: "https://cdn.suno.ai/root/video.mp4".to_owned(),
6922                    hash: "wh".to_owned(),
6923                    owner_id: "root".to_owned(),
6924                    content: None,
6925                },
6926                Action::WriteArtifact {
6927                    kind: ArtifactKind::FolderMp4,
6928                    path: "album/cover.mp4".to_owned(),
6929                    source_url: "https://cdn.suno.ai/root/video.mp4".to_owned(),
6930                    hash: "mh".to_owned(),
6931                    owner_id: "root".to_owned(),
6932                    content: None,
6933                },
6934            ];
6935            let mut desireds = vec![];
6936            for id in ["d0", "d1"] {
6937                let (_c, d, a) = download(id, AudioFormat::Mp3);
6938                actions.push(a);
6939                desireds.push(d);
6940            }
6941            let plan = Plan { actions };
6942            let http = GatedHttp::new(scripted);
6943            let ffmpeg = StubFfmpeg::webp();
6944            let clock = RecordingClock::new();
6945            let mut manifest = Manifest::new();
6946            let mut albums = BTreeMap::new();
6947            let mut playlists = BTreeMap::new();
6948            let client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
6949            pollster::block_on(execute(
6950                &plan,
6951                &mut manifest,
6952                &mut albums,
6953                &mut playlists,
6954                &desireds,
6955                &HashMap::new(),
6956                Ports {
6957                    client: &client,
6958                    http: &http,
6959                    fs: &MemFs::new(),
6960                    ffmpeg: &ffmpeg,
6961                    clock: &clock,
6962                },
6963                &opts_with(4),
6964            ));
6965
6966            assert_eq!(
6967                http.count("root/video.mp4"),
6968                1,
6969                "video_cover_url must be fetched exactly once even under concurrency"
6970            );
6971        }
6972
6973        #[test]
6974        fn existing_clip_audio_and_cover_sidecar_share_cover_fetch() {
6975            // Clip "e" is already in the manifest; this run reformats its audio
6976            // AND updates its CoverJpg sidecar. The audio producer caches the
6977            // cover; the sidecar drains it. Even under concurrency the cover must
6978            // be fetched exactly once and cover_cache must not accumulate a
6979            // leaked entry.
6980            let c = art_clip("e");
6981            let cover_url = c.image_large_url.clone();
6982            let d = desired(c.clone(), AudioFormat::Mp3);
6983            let scripted = ScriptedHttp::new()
6984                .with_auth()
6985                .route("e.mp3", Reply::ok(b"audio".to_vec()))
6986                .route("e/large.jpg", Reply::ok(b"cover-jpg".to_vec()));
6987            let plan = Plan {
6988                actions: vec![
6989                    Action::Reformat {
6990                        clip: c,
6991                        path: "e.mp3".to_owned(),
6992                        from_path: "e-old.mp3".to_owned(),
6993                        from: AudioFormat::Mp3,
6994                        to: AudioFormat::Mp3,
6995                    },
6996                    Action::WriteArtifact {
6997                        kind: ArtifactKind::CoverJpg,
6998                        path: "e/cover.jpg".to_owned(),
6999                        source_url: cover_url,
7000                        hash: "new-art".to_owned(),
7001                        owner_id: "e".to_owned(),
7002                        content: None,
7003                    },
7004                ],
7005            };
7006            let mut manifest = Manifest::new();
7007            manifest.insert("e".to_owned(), entry("e-old.mp3", AudioFormat::Mp3));
7008            let fs = MemFs::new().with_file("e-old.mp3", b"old-audio".to_vec());
7009            let http = GatedHttp::new(scripted);
7010            let outcome = run_gated_fs(&plan, &mut manifest, &[d], &http, &fs, &opts_with(4));
7011
7012            assert_eq!(outcome.reformatted, 1);
7013            assert_eq!(outcome.failed(), 0);
7014            assert_eq!(
7015                http.count("e/large.jpg"),
7016                1,
7017                "cover must be fetched exactly once, not once per concurrent action"
7018            );
7019        }
7020    }
7021}